Consuming Kinesis Streams with Lambda functions locally

This blog post was originally published at GoSmarten website. As the number of projects where we use it was increasing, we thought we might as well share it. Let me know if it was helpful!


AWS offers the cool possibility to consume from Kinesis streams in real time, in a serverless fashion, via AWS Lambda. However, it can become extremely annoying to have to deploy a Lambda function to AWS just to test it. Moreover, it is also expensive to keep a Kinesis stream (i.e. a queue) up and running just to test code.

Thus, by combining the Kinesis Client Library (KCL) with AWS Kinesis and DynamoDB docker containers, we were able to recreate locally everything that happens in the background when you plug a Lambda function into a Kinesis stream on AWS. Besides saving costs, this allows developers to substantially reduce development time, as well as to produce higher quality code thanks to the ease and flexibility of testing different scenarios locally.

Feel free to check out the code supporting this blog post on our repository.

Context: Event Sourcing/CQRS


Event sourcing is not a new concept, but as the available streaming technologies have evolved, its widespread use has gained the attention it deserves. Thanks to “publish-subscribe” types of queues, it has become much easier to build streams of events available to multiple consumers at the same time. This democratization of access to an immutable, append-only stream of events is essential, as it decouples the responsibility of modelling the event schema from any particular consumer logic. It is also the reason why so many people argue that CQRS and event sourcing are either the same thing or have a symbiotic relationship.

The consumer, on its side, can then choose how to represent a given fact and ingest it in light of its own business framing. Now, it is important to note that although consumers use events to constantly mutate a state representation and store data in a database, that does not mean they are locked into that interpretation forever. As raw events are stored in an immutable fashion and decoupled from the consumers' interpretation, state can always be replayed and reconstructed up to any given point in time.

This is where AWS Kinesis shines, offering a queue as a service, offloading a lot of the maintenance effort and complexities from your development and/or operations teams.


Furthermore, having serverless streaming consumption can offload a further substantial chunk of work from streaming projects. Lambda functions obviously have limitations, and cannot stand by themselves compared to proper streaming engines such as Apache Flink, Apache Spark, etc. The first limitation is that each execution is stateless and independent of the previous one. The implication is that, unless you query some external data source, you are left alone in the party with a collection of events, completely blind to what happened in the previous execution. On top of that, at the time of writing this blog post, lambda functions can only run for up to 5 minutes.

However, depending on your requirements, there are potential ways around these limitations. One example would be using materialized views, supported in all main relational databases. Depending on your write load, some people even consider database triggers, though this might be a ticking bomb in the medium/long term. Another strategy could be leveraging further AWS goodies, using DynamoDB as a stateful layer and DynamoDB streams to progressively evolve state as potentially out-of-order events arrive.

Thus, plugging AWS Lambda functions into your streaming pipeline can be a viable solution, depending on the complexity of the application you are building. Next, we dive into more detail on how AWS actually implements the plugging of Lambda functions into Kinesis streams.

AWS Lambda for streaming

The AWS Lambda service has come a long way since it was launched, and it integrates with numerous other services. One of the ways it integrates with other services is by allowing you to specify them as triggers for lambda execution. In our case, the trigger service is Kinesis streams. The way this works is by having the AWS Lambda service constantly poll the stream and invoke a particular lambda function.

When using AWS services as a trigger for lambda invocation, the invocation type is obviously predetermined by the service. As stated in the AWS documentation, in the case of stream-based services – at the time of writing this blog post that means Kinesis streams and DynamoDB streams – the invocation will always be synchronous. However, the polling from the stream is done in parallel, where the parallelism level is determined by the number of shards a given stream has. In practice, that will potentially result in as many lambda functions running simultaneously as your stream has shards. If this represents a potential issue for you, one way to minimize ordering issues is to adapt the partition key in your producer to group events conveniently.

Last but not least, one of the cool things about this solution is that the polling from the stream is done in the background by AWS. Every second AWS will poll the stream and, if there are records, pass that collection of records to your lambda function. However, don't fear: you can and should customize this batch size of records, given that, as previously mentioned, lambda functions can only run for up to 5 minutes.
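On the real AWS side, this polling behaviour is configured via the Lambda event source mapping, which is also where the batch size lives. As a hedged sketch (the stream ARN and function name below are made up), creating such a mapping with the AWS SDK for Java (v1) from Scala could look roughly like this:

import com.amazonaws.services.lambda.AWSLambdaClientBuilder
import com.amazonaws.services.lambda.model.{CreateEventSourceMappingRequest, EventSourcePosition}

object CreateMapping extends App {
  val lambda = AWSLambdaClientBuilder.defaultClient()

  // BatchSize caps how many records a single invocation receives, so that the
  // function stays comfortably within the 5 minute execution limit
  lambda.createEventSourceMapping(new CreateEventSourceMappingRequest()
    .withEventSourceArn("arn:aws:kinesis:eu-central-1:123456789012:stream/my-stream") // hypothetical ARN
    .withFunctionName("my-stream-consumer")                                           // hypothetical function name
    .withStartingPosition(EventSourcePosition.TRIM_HORIZON)
    .withBatchSize(100))
}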

Running locally

The following steps assume that you have cloned our public repo locally.

1) start docker env

Although we are big fans of docker compose, we chose instead to bootstrap the docker containers via bash scripts, for two main reasons:
a) We wanted to give developers the flexibility to choose which containers to start. For example, the Java consumer implementation requires a local DynamoDB, whereas the Python one doesn't;
b) We wanted the flexibility to automate additional functionality, such as creating Kinesis streams and DynamoDB tables, listing them after boot, creating a local AWS CLI profile, etc.
To bootstrap all containers:

cd docker/bootstrapEnv && bash +x

If you check the bash script, you will see that it will:
a) start the DynamoDB docker container
b) set up a fake local AWS profile to interact with the local Kinesis
c) start the Kinesis container
d) create a new Kinesis stream
Note that we are not persisting any data from these containers, so every time you start any of them it will be “brand new”.

Also relevant to point out is that we are creating the stream with three shards. In AWS Kinesis terminology, shards are how the queue is partitioned, and also how one improves the read/write capacity of a given stream. However, in reality this is completely mocked, since we are running a single docker container, which simply “pretends” to have X amount of shards/partitions.
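For reference, what the bootstrap script does via the AWS CLI can also be done programmatically against the local endpoint. A minimal sketch with the AWS SDK for Java (v1) from Scala – the endpoint/port and stream name are assumptions, adjust them to whatever your Kinesis container exposes:

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder

object LocalKinesis {
  // fake credentials: a local Kinesis container does not validate them
  val kinesis = AmazonKinesisClientBuilder.standard()
    .withEndpointConfiguration(new EndpointConfiguration("http://localhost:4567", "eu-central-1"))
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("fake-key", "fake-secret")))
    .build()
}

object CreateLocalStream extends App {
  LocalKinesis.kinesis.createStream("my-stream", 3) // three shards, mirroring what the bootstrap script does
}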

2) Publish fake data to Kinesis stream

To help you get started, we provide yet another bash script that pushes mock data (JSON encoded) to the same Kinesis stream created previously.

To start producing events:

cd docker/common && bash +x

This will continuously publish events to the Kinesis stream until you decide to stop it (for example by hitting Ctrl+C). Note also that we are randomly publishing to any of the 3 available shards. For future reference, besides adapting our mock event data to your requirements, the choice of partition key might also be something you want to enforce, depending on how your producers are publishing data into Kinesis.
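To make the partition key point concrete, here is a hedged sketch of a single put, reusing the LocalKinesis client from the snippet above (stream name and payload are made up). Using e.g. the user id as partition key pins all events of that user to the same shard, preserving their relative order; a random key spreads the load across shards instead:

import com.amazonaws.services.kinesis.model.PutRecordRequest
import java.nio.ByteBuffer

object PublishOneEvent extends App {
  val event = """{"userId": "42", "type": "mock-event"}"""

  LocalKinesis.kinesis.putRecord(new PutRecordRequest()
    .withStreamName("my-stream")
    .withPartitionKey("42") // e.g. the user id; a random key would instead spread events across shards
    .withData(ByteBuffer.wrap(event.getBytes("UTF-8"))))
}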

3) Start consuming from kinesis stream

At this point, you have everything needed to test the execution of a Lambda function. We have provided an example of a basic Lambda – com.gosmarten.mockcloud.localrunner.TestLambda – that just prints each event. To actually test it running, you need to run the com.gosmarten.mockcloud.localrunner.RawEventsProcessorLambdaRunner class. This class continuously iterates over each stream shard and polls for data, which it then passes to our lambda as a collection of Records.

4) How to test your own Lambda functions

final KinesisConsumerProcessorManager recordProcessorFactory = new KinesisConsumerProcessorManager<>(TestLambda::new);
recordProcessorFactory.setAwsProfile(AWS_PROFILE).runWorker(STREAM_NAME, APP_NAME);

Instead of “TestLambda”, specify your own. And … that’s it!

Last but not least, stay tuned as we plan to update the original repo with a Python version. Happy coding!




Using Akka Streaming “saving alerts” – part 1

Full disclosure: this post was initially published on the Bonial tech blog. Bonial is one of my favorite companies at the heart of Berlin, where I have been fortunate enough to work for 2+ years as a freelance Data Engineer. If you are looking for positions in tech, I can't help but recommend checking their career page.


Some months ago I was working on an internal project at Bonial using Akka Streaming (in Scala) to provide additional features to our current push notification system. The main goal of the project was to improve the speed with which the client is able to notify its end users of available discount coupons. In this case, we wanted to notify users in real time of coupons available in store, so that they could use them more effectively on the spot and save money. Hence our project code name: “saving alerts”!

After some architectural discussions where we compared several technical options, we decided to give akka streaming a go. It has been a fun ride, so we thought we might as well share some of the lessons learned.

This post has been divided into two parts:

  1. Part 1 – we provide an overview about the main tech building blocks being used in this project (mainly focusing on Akka)
  2. Part 2 – details how the system was designed and implemented


Without further ado, let us start with an overview of our “saving-alerts” application.


Logically speaking one can group the tasks executed by our application into three (3) main stages:

  1. Read events from a given Kafka topic (these are collected from our own tracking API) and perform basic validation; each event belongs to a unique user and is triggered by our mobile app;
  2. Query an internal service – the so-called “Coupon API” – to check if there are any available coupons for that given user;
  3. Apply a set of business logic rules – which at the moment are defined by our Product Managers – that determine whether, in the end, to send or not to send a push notification to that user's mobile app.

Besides these main logical tasks, we also run some other “offline” scheduled housekeeping processes, namely: loading updated versions of meta information about our retailers and the business logic rules from an S3 bucket into memory, renewing an auth token used to talk internally with the client's APIs, and obviously logging app statistics for monitoring purposes.

In terms of tech stack, what is relevant for this project is simply the Akka actor system, a dedicated single-node Redis instance, and some dedicated S3 buckets. All the rest – such as the tracking API, Kafka queue, Authentication API, Coupon API, Monitoring Service, Push Notification API, etc. – is viewed as external services from the app's point of view, even though most of them belong inside the company.

Though not particularly relevant for this project, the whole Akka application was deployed on AWS EC2 instances. As we state in our final conclusion notes, a good fit for this application would also be some Docker container orchestration service such as Kubernetes.

Before we dive deep into how we implemented this system, let us first review the main technical building block concepts used in the project.

System Architecture

The main building block of the current application is the Akka framework. Hopefully this section will guide you through some of the rationale behind our decisions, and ideally why we chose to use Akka for this project.

About Akka

Why akka

Let’s start from the very basics: building concurrent and distributed applications is far from being a trivial task. In short, Akka comes to the rescue for this exact problem: it is an open source project that provides a simple and high-level abstraction layer in the form of the Actor model, greatly simplifying the development of concurrent, distributed and fault-tolerant applications on the JVM. Here is a summary of Akka's purpose:

  • provide concurrency (scale up) and remoting (scale out)
  • easily get rid of race conditions and multi-threading locking problems, such as deadlocks (“[…] when several participants are waiting on each other to reach a specific state to be able to progress”), starvation (“[…] when there are participants that can make progress, but there might be one or more that cannot”), and livelocks (when several participants keep granting each other a given resource and none ends up using it) – please see the Akka documentation, which does a fantastic job explaining these;
  • provide an easy programming model for coding a multitude of complex tasks
  • allow you to scale your application horizontally


There are three main building blocks from the Akka framework that we are using in this project:

  • Akka actors
  • Akka remote & clustering
  • Akka streaming

Akka Actors

The actor system provides an asynchronous, non-blocking, highly performant, message-driven programming model for distributed environments. This is achieved via the Actor concept.

An Actor is a sort of safety container – a lightweight, isolated computation unit which encapsulates state and behaviour.

In fact, actor methods are private by default – one cannot call methods on actors from the outside. The only way to interact with actors is by sending messages – this also holds true for inter-actor communication. Moreover, as stated in the Akka documentation: “This method (the “receive” method, which has to be overridden by every actor implementation) is responsible for receiving and handling one message. Let’s reiterate what we already said: each and every actor can handle at most one message at a time, thus the receive method is never called concurrently. If the actor is already handling some message, the remaining messages are kept in a queue dedicated to each actor. Thanks to this rigorous rule, we can avoid any synchronization inside the actor, which is always thread-safe.”

Here is an example of a basic Actor class (written in Scala), retrieved from Akka's documentation and changed in minor details:
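A minimal sketch along those lines (classic, untyped actors; the logging details are illustrative):

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}

class MyActor extends Actor with ActorLogging {
  var counter = 0 // internal state: safe to mutate, since messages are handled one at a time

  def receive: Receive = {
    case "test"   => log.info("received test")
    case "mutate" => counter += 1
    case other    => log.info("received unknown message: {}", other)
  }
}

object Main extends App {
  val system = ActorSystem("demo")
  val myActor = system.actorOf(Props[MyActor], "myActor")
  myActor ! "test"
  myActor ! "mutate"
}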


The receive method

In order to create an actor, one needs to extend the Actor trait (sort of like a Java interface), which mandates the implementation of the “receive” method – basically where all the action happens. In our example, in case “MyActor” receives the message “test”, the actor will log “received test”, and if it receives the message “mutate”, it will mutate its local variable by incrementing it by one (1). As each message is handled sequentially, and there is no other way to interact with an actor, it follows that you do not have to synchronize access to internal actor state variables, as they are protected from state corruption via isolation – this is what is meant when one says that actors encapsulate state and behaviour.

As mentioned before, the receive method needs to be implemented by every actor. The receive method is a PartialFunction, which accepts the “Any” type and returns Unit (void). You can confirm this in Akka's source code, namely in the Actor object implementation:
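In essence, it boils down to this (simplified from the akka.actor.Actor companion object):

object Actor {
  // a Receive is just a partial function from any message type to Unit
  type Receive = PartialFunction[Any, Unit]
}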

By the way, as a side note, the receive method being an untyped PartialFunction is also one of the main points of criticism raised by proponents of Akka Streams, due to the lack of type safety.

In the provided example we are using simple strings as messages (“test” and “mutate”). However, usually one uses Scala case classes to send messages since, as a best practice, messages should be immutable objects which do not hold anything mutable. Akka will take care of serialization in the background. However, you can also implement custom serializers – recommended especially in the case of remoting – in order to optimize performance or handle complex cases. Here is an example of how two actors can communicate with each other:
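A minimal sketch, assuming classic actors – DudeA greets DudeB with “hallo”, and DudeB replies to whoever sent the message:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

class DudeB extends Actor {
  def receive: Receive = {
    case "hallo" => sender() ! "hallo back at you" // reply to whoever sent the message
  }
}

class DudeA(dudeB: ActorRef) extends Actor {
  dudeB ! "hallo" // fire and forget: no delivery acknowledgement

  def receive: Receive = {
    case reply: String => println(s"DudeB answered: $reply")
  }
}

object Demo extends App {
  val system = ActorSystem("dudes")
  val dudeB = system.actorOf(Props[DudeB], "dudeB")
  system.actorOf(Props(new DudeA(dudeB)), "dudeA")
}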

To send a message, one uses the exclamation mark “!” notation. This is a “fire and forget” way of sending a message (meaning there is no acknowledgement from the receiver that the message was indeed received). In order to have an acknowledgement, one can use the ask pattern instead, with the interrogation mark “?”.

Also note that to retrieve the underlying message sender we call the “sender” method, which returns a so-called “ActorRef” – a reference to the underlying address of the sender actor. Thus, if actor DudeB receives the message “hallo” from actor DudeA, it is able to reply just by calling the sender() method, which is provided by the Actor trait (as in the snippet above).

Finally, messages are stored in the recipient's Mailbox. Though there are exceptions (such as routers, which we will see later), every actor has a dedicated Mailbox. A Mailbox is a queue where messages are stored.

Message ordering

Important to note is that message order is not guaranteed across senders. That is, if Actor B sends a message to Actor A at a given point in time, and later Actor C sends a message to the same Actor A, Akka does not provide any guarantee that Actor B's message will be delivered before Actor C's message (even though Actor B sent its message first). This would be fairly difficult for Akka to control, especially in the case where actors are not co-located on the same server (as we will discuss later) – if Actor B is experiencing high jitter on its network, for example, it might happen that Actor C gets its message through first.

Though order between different actors is not guaranteed, Akka does guarantee that messages from one given actor to another actor are ordered. So, if Actor B sends one message to Actor A, and then later sends a second message to Actor A, one has the guarantee that, assuming both messages are successfully delivered, the first message will be processed before the second.


Besides being processed sequentially, it is also relevant to note that messages are processed asynchronously, to avoid blocking the current thread where the actor resides. Each actor behaves like a lightweight thread – you can have several million actors per GB of heap memory – completely shielded from other actors. This is the first fundamental advantage of Akka: providing a lightning-fast asynchronous paradigm/API for building applications.

As we will see next, Akka provides many more building blocks which enhance its capabilities. We will focus on how Akka benefits this application specifically, namely how it provides an optimized multi-threading scale-up (inside the same server) and scale-out (across several remote servers) environment for free.

Akka Routers (scale-up)

Router actors are a special kind of actor that make it easy to scale up. That is, with exactly the same code, one can simply launch an actor of a Router type, and it automatically starts child actors – so-called “routees” in Akka terminology.

The child actors will have their own Mailboxes; however, the Router itself will not. A router serves as a fast proxy, which just forwards messages to its own routees according to a given algorithm. For example, in our application we are simply using a round-robin policy. However, other (more complex) algorithms could be used, such as load balancing by routee CPU and memory statistics, a scatter-gather approach (for low latency requirements, for example), or even simply broadcasting to all routees.


The main advantage of Routers is that they provide a very easy way to scale up the multi-threading environment. With the same class code, and simply by changing the way we instantiate the actor, we can transform an actor into a Router.
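As a sketch, turning the MyActor class from the earlier example into a pool of round-robin routees is just a matter of wrapping its Props (the pool size here is arbitrary):

import akka.actor.{ActorSystem, Props}
import akka.routing.RoundRobinPool

object RouterDemo extends App {
  val system = ActorSystem("router-demo")

  // a plain actor...
  val single = system.actorOf(Props[MyActor], "single")

  // ...and the same actor class behind a round-robin router with 5 routees
  val router = system.actorOf(RoundRobinPool(5).props(Props[MyActor]), "workerRouter")

  router ! "test" // forwarded to one of the routees
}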

Akka Remote & clustering modules (scale-out)

To distribute actors across different servers one has two modules available: Akka Remoting and, dependent on the first, Akka Clustering. Akka Remoting provides location transparency to actors, so that the application code (almost) does not have to change whether an actor is communicating with another actor on the same server or on a different one.

Akka Clustering, on the other hand, goes one step further and builds on top of Akka Remoting, providing failure detection and potentially failover mechanisms. The way clustering works is by having a decentralized peer-to-peer membership service with no single point of failure nor single point of bottleneck. Membership is handled via a gossip protocol, inspired by Amazon's Dynamo system.

As we will see later, given the way we scale this application, the clustering advantages are not currently being used. That is, we are not extending a specific actor system across more than one node. However, the application is written in a way that is completely prepared for it.

Akka Streaming (back-pressure for streaming consumption)

Akka Streaming is yet another module from Akka, relatively recently released. The main point of it is to hide away the complexities of creating a streaming consumption environment, providing back-pressure for free. Akka has really good documentation explaining back-pressure in more detail but, in short, back-pressure ensures that producers slow down their production speed in case consumers are lagging behind (for example, due to some performance issue that prevents them from consuming fast enough).

Last but not least, it is important to highlight that Akka Streaming works kind of like a black box (in a good way), doing all the heavy lifting in the background and freeing you to focus on other business-critical logic. The API is also intuitive to use, following a nice functional programming style. However, we should warn that, as the complexity of your operations graph grows, you will be forced to dive into more advanced topics.
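To give a feeling for the API, here is a tiny hedged sketch (Akka Streams 2.5-era, with an ActorMaterializer; the numbers are arbitrary). The sink only signals demand for as many elements as it can handle, and that demand propagates upstream automatically – that is the back-pressure:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object StreamDemo extends App {
  implicit val system = ActorSystem("stream-demo")
  implicit val materializer = ActorMaterializer()

  Source(1 to 1000000)              // producer stage
    .map(_ * 2)                     // processing stage
    .runWith(Sink.foreach(println)) // consumer stage: demand flows upstream, so the source never overwhelms it
}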

About Kafka

Kafka is a complex system – more specifically, in this case, a distributed publish-subscribe messaging system – where one of the many use cases is messaging. It is provided to the Akka application as a service, so from the application's standpoint one does not need to care much about it. However, it is beneficial to understand how the application scales and how it ingests data. The following summary attempts to highlight some of the core differences that set Kafka apart from other messaging systems, such as RabbitMQ:
  • Kafka implements a “dumb broker, smart consumer” philosophy; consumers are responsible for knowing from where they should consume data – Kafka does not keep track; this is a major distinction compared to, for example, RabbitMQ, where many sophisticated delivery checks are available to deal with dead letter messages; in regards to our application, given Akka Streaming's back-pressure mechanism, our application will halt consumption in case consumers are not able to keep up with producers;
  • Persistent storage for X amount of time; many clients can read from the same topic, for as long as Kafka is configured to persist the data;
  • Topics can be partitioned for scalability – in practice this means that we can distribute and store data for the same topic among several servers;
  • Data in each partition is added in append-only mode, creating an immutable sequence of records – a structured commit log; records are stored in a key-value structure, and in any format, such as String, JSON, Avro, etc.;
  • It follows that order is only guaranteed on a partition basis; that is, inside the same partition, if event A was appended before event B, it will also be consumed first by the consumer assigned to that partition. However, across partitions order is not guaranteed; the “anatomy of a topic” illustration on Kafka's own project page illustrates this concept well, and the producer sketch after this list shows how keys are used to pin related events to the same partition;



  • Besides possibly being partitioned, topics can also be replicated among several nodes, thus guaranteeing high availability (HA);
  • Consumers can be assigned to consumer groups, thus scaling how many times the same topic can be consumed independently.
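Here is the keyed-producer sketch referenced above (plain kafka-clients used from Scala; the topic name and broker address are made up). Records sharing a key always hash to the same partition, which is exactly what preserves per-user ordering:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KeyedProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // same key ("user-42") => same partition => these two events keep their relative order
  producer.send(new ProducerRecord[String, String]("tracking-events", "user-42", """{"event":"coupon_view"}"""))
  producer.send(new ProducerRecord[String, String]("tracking-events", "user-42", """{"event":"coupon_click"}"""))

  producer.close()
}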

For more detail about Kafka, we recommend Kafka's own page, which has a really good intro. Finally, if you are indeed familiar with RabbitMQ, we would recommend reading the following article comparing Kafka with RabbitMQ, especially regarding which use cases fit each best.


Summary: Terraform vs Spinnaker

I recently needed to build this summary, so I thought I might as well share it with more people. Please feel free to add any points you see fitting.


Rather than putting one versus the other and assuming mutual exclusivity, many companies are adopting both tools simultaneously. Terraform is usually used for static cloud infrastructure setup and updates, such as networks/VLANs, firewalls, load balancers, storage buckets, etc. Spinnaker is used for setting up more complex deployment pipelines, mainly orchestration of software packages and application code onto servers. Though there is an intersection (Spinnaker can also deploy the app environment), Terraform provides an easy and clean way to set up Infrastructure-as-Code.


Terraform

  • Hashicorp product focused on allowing you to cleanly describe and deploy infrastructure in cloud and on-premise environments;
  • Allows you to describe your infrastructure as code, as well as to deploy it. Although more complex deployments relying on server images are possible, the man-power effort starts growing exponentially;
  • The syntax used to describe resources is Hashicorp's own Configuration Language (HCL), which may be a turn-off at first sight; however, after seeing how clear and human readable it is, you won't regret it;
  • Deployment form varies, from immutable-infrastructure style deployments (e.g. a new update = a complete new resource) to in-place updates, depending on the nature of the resources. For example, server instances require the immutable approach, whereas firewall rules do not;
  • Failure to deploy resources does not roll back and destroy “zombie” resources; instead they are marked as tainted and left as they are. On the next execution plan run, tainted resources will be removed, to give place to the new intended resources. More detail here;
  • Multi-cloud deployments, currently supporting AWS, GCP, Azure, OpenStack, Heroku, Atlas, DNSimple, CloudFlare;
  • Doesn't require server deployment, e.g. it can be launched from your own local machine. However, Hashicorp provides Atlas for remote central deployments.


Spinnaker

  • A Netflix open source tool for managing deployment of complex workflows (Continuous Delivery). It is kind of a second generation/evolution of Asgard;
  • An orchestrator for deploying full application environments, from the surrounding infrastructure (networks, firewalls and load balancers) to the server images;
  • Works hand-in-hand with tools such as Jenkins and Packer;
  • As an orchestrator, it focuses on pipelines, each a sequence of stages. A stage is an atomic building block; examples of stages:
    • Build an image (for example an AMI, in the case of AWS) in a specific region
    • Deploy the image
    • Run a bash script
    • Resize a server group
  • Multi-cloud deployments, currently supporting AWS, GCP, Azure, OpenStack, Kubernetes, and CloudFoundry;
  • It is by itself a product with several pieces of functionality, requiring its own server deployment. Images for easy deployment are available on AWS, Azure, GCP, and Kubernetes;
  • Provides a Web GUI for full pipeline configuration, as well as an API;
  • Allows setting up webhooks for notifications via email, SMS and chat clients (HipChat/Slack).


More Resources

Overview HP Vertica vs AWS Redshift

While working at HP some years ago, I was exposed not only to internal training materials, but also to a demo environment. I still remember the excitement when HP acquired Vertica Systems in 2011, and we had a new toy to play with… Come on, you can't blame me: distributed DBs were something only the cool kids were doing.

Bottom line is that it's been a while since I laid eyes on it… Well, recently, while considering possible architectural solutions, I had the pleasure to revisit Vertica. And since AWS Redshift has been gaining a lot of popularity and we're also using it at some of our clients, I thought I might give an easy summary to help others.

Now, if you're expecting a smack-down post, then I'm afraid I'll disappoint you – for that you have the internet. If experience has taught me something, it is that in the case of top-notch solutions there are only use cases, and one finds the best fitting one.

They share some properties in terms of architecture in internal engine:

  • Massively Parallel Processing (MPP) architecture: data is distributed among distinct nodes in a shared-nothing architecture, leveraging scale-out; in case you're wondering how it compares to Hadoop Hive, AirBnB did a smack-down comparison, concluding around a 5x advantage of Redshift over Hive;
  • High availability (HA): this follows from the first, thanks to the data replication mechanism; in the case of Vertica, they call it “k-safety” for measuring the replication factor; and you may also want to check fault groups to control how data is replicated according to physical distribution (such as server rack location, power circuits, etc.); the same automatic data replication among nodes happens with Redshift under the hood (besides more goodies such as backups);
  • Columnar-oriented data store: for analytical applications/OLAP (where queries usually select only specific columns, in opposition to OLTP), it is usually much faster, mainly because a) it does not need to scan the whole row and then discard the unnecessary content, and b) it achieves higher efficiency in compression/encoding mechanisms thanks to similar data types being stored together; for a more detailed explanation, I suggest here. Both Vertica and Redshift are built with this architecture;
  • Data compression: Vertica mixes encoding strategies, depending on column data type, table cardinality, and sort order; they do distinguish between encoding and compression, since Vertica will operate directly on encoded data whenever possible, which does not hold true for compression; Redshift does not make such a distinction, and recommends leaving compression in auto mode, although you can choose the encoding type;
  • SQL standard interface: as always, minor differences are present, but bottom line you can use the SQL syntax that you're already accustomed to;
  • User Defined Functions (UDFs): both Redshift and Vertica give you space for customization;
  • In-memory DB: nope, neither of these is like SAP HANA, Oracle TimesTen or IBM's SolidDB.

Where they differ:

  • Architecture: in Vertica all nodes are “created equal”, meaning they share similar functions; Redshift has the concept of a leader node, a dedicated node which manages workload and query coordination among worker nodes;
  • Management: (this is a key differentiator that most likely has the biggest weight in the final decision) with Vertica you have to do all the ops work (install, upgrade/update, configure nodes, etc.); Redshift is a fully managed cloud solution, where you only have pure database-related ops work; note: yes, HP provides an AMI to easily kickstart projects in the AWS cloud, but come on, this is still not the same thing;
  • Freedom of environment: with Redshift you're locked in to AWS; with Vertica you can run it wherever you feel like;
  • Schema design: Vertica provides you with a Designer Tool to easily migrate from traditional RDBMS systems based on their schema (not saying this saves the world, but it can be helpful); this is especially important in the beginning, since columnar data warehouses don't support indexes; so in Vertica you play with the projection concept, in Redshift with distribution and sort keys (and you'd better do it well, as it will be key for performance and keeping things balanced);
  • Payment scheme: in Vertica (for data bigger than 1TB, which most certainly is the case) you pay upfront licensing, plus the cost of the machines where you're running it; with Redshift costs are all diluted into an hourly cost, and that's it;
  • Compiled code: Redshift claims that the leader node compiles the code for optimal performance at execution time, which the guys at Cake also confirm with their excellent post;
  • Free trial/usage: Vertica lets you have up to 3 nodes and 1TB of data; Redshift, on the other hand, in case your account is still eligible for the free usage tier (in the first year), lets you try a total of 750 normalized instance hours per month, enough for running one DC1.Large single node continuously, with 160GB of SSD storage;
  • Add-ons: Vertica has Pulse for sentiment analysis and Place for geospatial data analysis;

Finally, you might want to go deeper. Again, I really suggest the excellent post by Cake, which provides performance benchmarks. Benchmarks are always disputable; but still, they are an interesting and important comparison method.

Spark – Redshift: AWS Roles to the rescue

If you are using AWS to host your applications, you have probably heard that you can apply IAM Roles to EC2 instances as well. In a lot of cases this can be a really cool way to avoid passing AWS credentials to your applications, removing the pain of having to manage key distribution among servers, as well as ensuring key rotation mechanisms for security purposes.

This post is about a simple trick on how to take advantage of this feature when your Spark job needs to interact with AWS Redshift.

As can be read in the Databricks repo for the spark-redshift library, there are three (3) strategies for setting up AWS credentials: either setting them in the Hadoop configuration (what many people are used to so far with Cloudera or Hortonworks), encoding the keys in the tempdir URI (by far not the best option if you ask me), or using temporary keys. The last strategy is the one discussed here, and it is based on AWS's own documentation on how to use temporary keys.

So let us start on Spark Redshift methods.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName(opts.get('app_name'))
sc = SparkContext(conf=conf)
ssc = SQLContext(sc) 

rs_query = "select * from my_table limit 10"
rs_tmp_dir = 's3n://path/for/temp/data'
rs_url = 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'

# Create spark dataframe through a Redshift query
df = ssc.read \
    .format('com.databricks.spark.redshift') \
    .option('url', rs_url) \
    .option('query', rs_query) \
    .option('tempdir', rs_tmp_dir) \
    .option('temporary_aws_access_key_id', sts_credentials.get('AccessKeyId')) \
    .option('temporary_aws_secret_access_key', sts_credentials.get('SecretAccessKey')) \
    .option('temporary_aws_session_token', sts_credentials.get('Token')) \
    .load()

OK, until here it is almost only Spark-related code in Python. Now the only thing you might be wondering is: what the hell is that “sts_credentials” map? Well spotted. The following snippet will reveal it.

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

# Optionally define a default ec2 instance role for EMR instances (placeholder value)
DEFAULT_INSTANCE_ROLE = 'my-default-instance-role'

# standard EC2 instance metadata endpoint exposing the temp credentials of the attached role
CREDENTIALS_URI = 'http://169.254.169.254/latest/meta-data/iam/security-credentials/'


def get_temp_credentials(role=DEFAULT_INSTANCE_ROLE):
    """ Retrieves temp AWS credentials """
    query_uri = '{}{}'.format(CREDENTIALS_URI, role)
    print('Querying AWS for credentials - {}'.format(query_uri))
    try:
        sts_credentials = requests.get(query_uri).json()
        if isinstance(sts_credentials, dict) and \
                sts_credentials.get('Code') == 'Success':
            print('Successfully retrieved temp AWS credentials.')
            return sts_credentials
        msg = 'There was a problem when retrieving temp credentials ' \
              'from AWS. Here\'s the response: \n{}'.format(sts_credentials)
    except (HTTPError, ConnectionError, Timeout) as err:
        msg = 'Unable to query AWS for credentials: {}'.format(err)
    except ValueError as err:
        msg = 'Error: unable to decode json from \'None\' value - {} ' \
              '(hint: most likely the role you are using is wrong)'.format(err)
    except Exception as err:
        msg = 'Failed to get AWS role temp credentials: {}'.format(err)
    print(msg)


# to use this function, you might do something like the following:
role = 'my-emr-cluster-role'
sts_credentials = get_temp_credentials(role=role)
aws_access_key_id = sts_credentials.get('AccessKeyId')
aws_secret_access_key = sts_credentials.get('SecretAccessKey')
token = sts_credentials.get('Token')


Yes, essentially this piece of code is just doing an HTTP request to an AWS service. In case you're getting suspicious of the black-magic-looking address – the instance metadata endpoint at 169.254.169.254 – let me tell you in advance that this is a handy service that provides meta-information about your instances.

As a side note, if you are indeed using pyspark jobs, you might want the flexibility of either testing your code locally or actually running the job in an AWS cluster. So, given the fact that the previous snippet will ONLY work if you run it on an AWS EC2 instance AND that instance is assigned the correct IAM role, here is a simple function which either passes credentials directly (if local) or uses the role to ask AWS for temp credentials.

def get_redshift_credentials(role=DEFAULT_INSTANCE_ROLE,
                             local_aws_access_key_id=None,
                             local_aws_secret_access_key=None,
                             **opts):
    """ Returns temp AWS credentials present in an AWS instance
    Note: only works on AWS machines!
    :param role: (str) AWS instance role
    :param local_aws_access_key_id: (str) optional param for local testing/dev
    :param local_aws_secret_access_key: (str) optional param for local testing/dev
    :param opts: () extra options (unused here)
    :return: (str) temp credentials to be used in query
    """
    if not role:
        role = DEFAULT_INSTANCE_ROLE
    sts_credentials = get_temp_credentials(role=role) or dict()
    aws_access_key_id = local_aws_access_key_id or sts_credentials.get('AccessKeyId')
    aws_secret_access_key = local_aws_secret_access_key or sts_credentials.get('SecretAccessKey')
    token = sts_credentials.get('Token')
    redshift_credentials = 'aws_access_key_id={0};aws_secret_access_key={1}'.format(
        aws_access_key_id, aws_secret_access_key)
    if token and not all((local_aws_access_key_id, local_aws_secret_access_key)):
        redshift_credentials = '{0};token={1}'.format(redshift_credentials, token)
    return redshift_credentials


Here is a complete gist of this code:

Finally, some potential gotchas: the temporary keys have by default a 30-minute limit, which you can extend initially when you request them. In any case, you might want to take that into consideration for jobs that can potentially be long-running.




Distcp S3-hdfs for eu-central-1 (AWS Frankfurt) – details “behind the trenches”


Distcp (distributed copy) is a fairly old tool used to move a large quantity of files, usually within HDFS, using a MapReduce job to do so, where mappers list the source files and reducers do the copy heavy lifting. Another useful integration is that it can also deal with file migrations between HDFS and AWS S3. In the meanwhile, S3DistCp came along as a specific enhancement for speeding up this last kind of migration.

However, what might be new for you is what is required to use either one of these tools to move data from/to a bucket located in the S3 Frankfurt region. So this post intends to save you that half an hour of painful Google research trying to figure out why your Oozie job is failing.

Without further ado, before you dive into your Cloudera or Hortonworks Manager to edit hadoop properties in core-site.xml, I would suggest that you SSH into one of your hadoop nodes and start by testing the distcp command itself. Here is the syntax for moving data from S3 to HDFS:

hadoop distcp -Dfs.s3a.awsAccessKeyId=XXX -Dfs.s3a.awsSecretAccessKey=XXX \
  -Dfs.s3a.endpoint=s3.eu-central-1.amazonaws.com \
  s3a://YOUR-BUCKET-NAME/ /YOUR/HDFS/TARGET/PATH

So yeah, in case you haven't noticed, the black magic pieces in this command that make things work just like in other regions are: the “fs.s3a.endpoint” property, pointing at the Frankfurt endpoint, as well as the “a” after “s3”.

Also, in case you are moving data from HDFS to S3, besides write access, make sure you also grant remove permissions in the AWS policy attached to the given user account for that S3 bucket (or specific path). This is because Distcp produces temporary files adjacent to your bucket target file migration, which you want to allow the reducers to remove in the end. Here is an example of roughly what such an AWS policy could look like:

    "Version": "2012-10-17",
    "Statement": [
            "Effect": "Allow",
            "Action": [
            "Resource": "arn:aws:s3:::*"
            "Effect": "Allow",
            "Action": [
            "Resource": [
                "arn:aws:s3:::YOUR-BUCKET" ]

Alternatively, you might also want to test it using the S3DistCp tool. Here is another example:

hadoop jar s3distcp.jar --src /data/ \
  --dest s3a://YOUR-BUCKET-NAME/ \
  --s3Endpoint=s3.eu-central-1.amazonaws.com

Note that the s3distcp jar needs to be present locally on the host file system from which you are running the command. If you have the jar in HDFS, here is an example of how you can fetch it:

hadoop fs -copyToLocal /YOUR-JARS-LOCATION-IN-HDFS/s3distcp.jar \
  /YOUR-LOCAL-DESTINATION-PATH/

Also, there is the possibility that you might stumble upon some headaches with the S3DistCp tool. This might prove itself to be handy.

Hope this helped! Where to go next? AWS has a handy best practice guide listing different strategies for data migration here.