Build a Data Pipeline with AWS Athena and Airflow (part 1)

In this post, I build on the knowledge shared in the post on creating Data Pipelines with Airflow and introduce new technologies that help with the Extraction part of the process, with cost and performance in mind. I’ll go through the options available and then introduce a specific solution using AWS Athena. First we’ll establish the dataset and organize our data in S3 buckets. Afterwards, you’ll learn how to make this information queryable through AWS Athena, while making sure it is updated daily.

Dump files of not-so-structured data are a common byproduct of Data Pipelines that include extraction. This happens by design: business-wise, and for us as Data Engineers, there is never too much data. From an investment standpoint, object-relational database systems can become increasingly costly to maintain, especially if we aim to keep performance up while the data grows.

That said, this is not a new problem. Both Apache and Facebook have developed open-source software that is extremely efficient at dealing with huge amounts of data. While this software is written in Java, it exposes an abstracted interface that relies on traditional SQL to query data stored on file-system storage (such as S3 in our example), across a wide range of storage systems and formats, from HDFS to CSV.

Today we have many options to tackle this problem, and I’m going to go through how to approach it in today’s serverless world with AWS Athena. For this we need to quickly rewind back in time and go through the technology …

Airflow: create and manage Data Pipelines easily

This bootstrap guide was originally published at GoSmarten, but as the use cases continue to increase, it’s a good idea to share it here as well.

What is Airflow

The need to perform operations or tasks, either simple and isolated or complex and sequential, is present in all things data nowadays. If you or your team work with lots of data on a daily basis, there is a good chance you’ve struggled with the need to implement some sort of pipeline to structure these routines. To make this process more efficient, Airbnb developed an internal project conveniently called Airflow, which was later fostered under the Apache Incubator program. In 2015 Airbnb open-sourced the code to the community and, although its trustworthy origin played a role in its popularity, there are many other reasons why it became widely adopted in the engineering community. It allows tasks to be set up purely in Python (or as Bash commands/scripts).

What you’ll find in this tutorial

Not only will we walk you through setting up Airflow locally, but you’ll do so using Docker, which optimizes the conditions for learning locally while minimizing the effort of transitioning into production. Docker is a subject in itself, and we could dedicate a few dozen posts to it; however, it is also simple enough, and has all the magic needed, to show off some of its advantages when combined with Airflow.

Using Akka Streaming for “saving alerts” – part 2

This blog post is the second and final part of the post Using akka streaming for “saving alerts”. Part 1 is available here. In this part we go into the details of how the application was designed.

Full disclosure: this post was initially published on the Bonial tech blog here. If you are looking for positions in tech, I would definitely recommend checking their career page.

Application Actor System

The following illustration gives you a schematic view of all the actors used in our application, and (hopefully) some of the mechanics of their interaction:


As previously mentioned, one could logically divide the application lifecycle into three main stages. We will get into more detail about each one of them next, but for now let us briefly walk through them and map them to our application.

Main Actors

The main actors in our application are KafkaConsumerActor, CouponRouterActor and PushNotificationRouterActor. They perform the core business tasks of this application:

  • Consume events from Kafka and validate them – this is done by KafkaConsumerActor. This is also the actor that controls the whole Akka Streaming pipeline/flow. The flow is controlled so that we can be sure not to overflow the main downstream Akka actors – CouponRouterActor and PushNotificationRouterActor – with more events than they can handle.
  • Query the Coupon API for results – for a given merchant and a given user, we query the Coupon API for available coupons. Those results are sent back to the Akka Streaming pipeline.
  • Apply business rules and decide whether to fire a push notification – the last key stage sends the returned results to PushNotificationRouterActor, which applies a given set of business rules. If those rules consider the event valid, a push notification may be fired, provided none has been sent in the last X hours.
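To make the three stages more concrete, here is a minimal plain-Python sketch of how a single event could flow through them. It is not the Akka implementation: the event fields (`user_id`, `merchant_id`), the in-memory `COUPONS` dictionary standing in for the Coupon API, and the 6-hour default standing in for "X hours" are all hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for the Coupon API: (merchant_id, user_id) -> available coupons
COUPONS: Dict[Tuple[str, str], List[str]] = {
    ("merchant-1", "user-1"): ["10% off"],
}


def validate(event: dict) -> bool:
    """Stage 1: reject events that lack the fields the later stages need."""
    return bool(event.get("user_id")) and bool(event.get("merchant_id"))


def query_coupons(event: dict) -> List[str]:
    """Stage 2: look up available coupons for this merchant and user."""
    return COUPONS.get((event["merchant_id"], event["user_id"]), [])


def should_push(user_id: str, now: datetime, last_push: Dict[str, datetime],
                min_hours: int = 6) -> bool:
    """Stage 3 (one example rule): push only if none was sent in the last `min_hours` hours."""
    previous = last_push.get(user_id)
    return previous is None or now - previous >= timedelta(hours=min_hours)


def process(event: dict, now: datetime, last_push: Dict[str, datetime]) -> Optional[str]:
    """Run one event through the three stages; return the coupon pushed, or None."""
    if not validate(event):
        return None
    coupons = query_coupons(event)
    if not coupons or not should_push(event["user_id"], now, last_push):
        return None
    last_push[event["user_id"]] = now  # remember when we last notified this user
    return coupons[0]


sent: Dict[str, datetime] = {}
event = {"user_id": "user-1", "merchant_id": "merchant-1"}
first = process(event, datetime(2016, 5, 3, 12), sent)    # coupon is pushed
second = process(event, datetime(2016, 5, 3, 13), sent)   # suppressed: too soon after the first
```

In the real application each stage lives in a different actor; keeping them as separate functions here mirrors that split, so any single stage could be swapped out without touching the others.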

Not mentioned yet is MetaInfoRouterActor. It is used with the sole purpose of monitoring statistics throughout the whole Akka Streaming pipeline flow. As noted in the illustration, given that it is not a core feature of the application itself, we send all messages to our monitoring service in a “fire and forget” manner – that is, we do not wait for acknowledgement. This of course implies that messages may not be delivered, and ultimately may not land in our monitoring system. However, this was considered a minor and negligible risk.

Secondary Actors

On the sidelines, as secondary services supporting the main actors, we have three actors: AppMasterActor, MetaInfoActor and RulesActor.

AppMasterActor has two main functions: controlling the discovery protocol that we implemented, and hosting the health-check endpoint used for external application monitoring.

The so-called discovery protocol basically makes sure that all actors know where – on which servers – other actors are, so that, theoretically speaking, we could separate each actor onto different servers in a scale-out fashion. As a side note, we would like to highlight that this discovery protocol could have been implemented using the Distributed PubSub module from Akka, which would definitely be more advisable if our application grew in the number of actors. Full disclosure: at the time, given the simplicity of our app, it seemed easier to implement it ourselves to keep the project smaller, which might be a questionable technical architecture decision.

Technically speaking, MetaInfoActor and RulesActor are almost identical in their implementation: they basically have a scheduled timer reminding them to check an S3 bucket for a given key, stream-load it into memory, and broadcast it to their respective client actors.
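The timer-driven "load and broadcast" behaviour can be sketched in plain Python as follows. This is an analogy rather than the Akka code: the class name `PeriodicReloader` is hypothetical, the `loader` callable stands in for reading the key from S3 (for example a function wrapping boto3's `get_object`), and the subscriber callables stand in for the client actors.

```python
import threading
from typing import Callable, List, Optional


class PeriodicReloader:
    """Hypothetical sketch: on every tick, load a resource and broadcast it to subscribers."""

    def __init__(self, loader: Callable[[], bytes], interval_seconds: float):
        # `loader` stands in for reading the key from S3, e.g. a function wrapping
        # boto3's s3.get_object(Bucket=..., Key=...)["Body"].read()
        self._loader = loader
        self._interval = interval_seconds
        self._subscribers: List[Callable[[bytes], None]] = []
        self._timer: Optional[threading.Timer] = None
        self._stopped = False

    def subscribe(self, callback: Callable[[bytes], None]) -> None:
        """Register a 'client actor' that should receive every reload."""
        self._subscribers.append(callback)

    def check_once(self) -> None:
        """One timer tick: load the current content and send it to every subscriber."""
        content = self._loader()
        for callback in self._subscribers:
            callback(content)

    def _tick(self) -> None:
        if self._stopped:
            return
        self.check_once()
        self.start()  # re-arm the timer for the next check

    def start(self) -> None:
        """Schedule the next check `interval_seconds` from now."""
        if self._stopped:
            return
        self._timer = threading.Timer(self._interval, self._tick)
        self._timer.daemon = True
        self._timer.start()

    def stop(self) -> None:
        """Stop rescheduling and cancel any pending timer."""
        self._stopped = True
        if self._timer is not None:
            self._timer.cancel()
```

Exposing `check_once()` separately from the timer keeps the reload logic testable without waiting for real time to pass.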

As explained in the previous section, routers host many workers (so-called “routees”) behind them, serving as a … well, yes, a router in front that directs traffic to them. All actors that are routers have it explicitly referenced in their name. Thus, when we say MetaInfoActor or RulesActor broadcasts a message, we are in fact sending one single message to the respective router wrapped in a Broadcast() case class; the router then knows that it should broadcast the intended message to all its routees.
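The router idea, including the special `Broadcast` wrapper, can be illustrated with a small plain-Python sketch. It only mimics the concept: `RoundRobinRouter` is a hypothetical class, and the round-robin strategy for ordinary messages is an assumption (Akka offers several routing strategies, and the post does not say which one is used).

```python
import itertools
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass(frozen=True)
class Broadcast:
    """Wrapper meaning 'deliver this message to every routee', mirroring Akka's Broadcast idea."""
    message: Any


class RoundRobinRouter:
    """Hypothetical sketch of a router: one entry point in front of many routees."""

    def __init__(self, routees: List[Callable[[Any], None]]):
        self._routees = routees
        self._next = itertools.cycle(range(len(routees)))

    def tell(self, message: Any) -> None:
        if isinstance(message, Broadcast):
            # Unwrap and deliver the same payload to all routees
            for routee in self._routees:
                routee(message.message)
        else:
            # Ordinary messages go to exactly one routee, in round-robin order
            self._routees[next(self._next)](message)


inboxes: List[List[Any]] = [[], [], []]
router = RoundRobinRouter([box.append for box in inboxes])
for event_id in range(4):
    router.tell(event_id)
router.tell(Broadcast("new-rules"))
```

The sender only ever talks to the router; whether a message reaches one routee or all of them is decided purely by whether it is wrapped in `Broadcast`.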

Scalability & HA

All the actors depicted in the illustration live on the same server. As a matter of fact, for the time being we are scaling out the application in a kind of “schizophrenic manner”: we deploy the application on different servers, and each server runs a completely isolated application, unaware of the existence of its twin applications. In other words, actors living on server 1 do not communicate with any actor living on server 2. Thus we like to call our current deployment “Pod mode”. We are able to achieve this because all the application “Pods” consume events from Kafka using the same consumer group. Kafka assigns partition IDs to the individual consumers; in other words, Kafka controls the distribution of partitions to each Pod, which allows us to scale out in a very simple manner:


To increase performance, we can scale out the number of KafkaConsumerActors up to the same number of Kafka partitions. So, for example, if we had a topic with three (3) partitions, we could improve consumption performance by scaling up to three (3) KafkaConsumerActors.

To address High Availability (HA), we could theoretically add N+1 KafkaConsumerActors, where N is the number of partitions, if our application were mission-critical and very sensitive to performance. This would, however, only potentially improve the HA of the application, as the additional KafkaConsumerActor would sit idle (i.e., not consuming anything from Kafka) until another KafkaConsumerActor node failed.

Moreover, in case you are wondering, not having N+1 KafkaConsumerActors does not severely harm the HA of our application, as Kafka will reassign partition IDs among the remaining consumers in the event of a consumer failure. However, this means the consumers will be unbalanced, as one of them will be consuming from two partitions simultaneously.
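The three scenarios above (one consumer per partition, a spare consumer, and a failed consumer) can be illustrated with a simplified assignment function. `assign_partitions` is a hypothetical helper loosely modelled on Kafka's range-style strategy for a single topic; it is not Kafka's actual assignor code, and real assignments also depend on the configured strategy and member IDs.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified range-style assignment of one topic's partitions to a consumer group.

    Consumers are sorted by name; each gets a contiguous block of partitions, and the
    first (len(partitions) % len(consumers)) consumers receive one extra partition.
    """
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    ordered = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(ordered))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, consumer in enumerate(ordered):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


partitions = [0, 1, 2]
# One consumer per partition: perfectly balanced
balanced = assign_partitions(partitions, ["pod-1", "pod-2", "pod-3"])
# N+1 consumers: the extra one gets no partition and sits idle
with_spare = assign_partitions(partitions, ["pod-1", "pod-2", "pod-3", "pod-4"])
# pod-2 failed: the survivors are rebalanced and one of them reads two partitions
after_failure = assign_partitions(partitions, ["pod-1", "pod-3"])
```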

Now, you may ask: what happens if the node processing a given event fails? At the end of the Akka Streaming pipeline, each KafkaConsumerActor commits the message offset back to Kafka, thus acknowledging consumption. So in this case, after the default TTL of message consumption configured in Kafka passes, we know that the message was not successfully processed (no acknowledgement), and another KafkaConsumerActor will read that same message from Kafka again, thereby reprocessing it.

As mentioned previously, when an event has been processed by the system, KafkaConsumerActor commits that event’s offset back to Kafka, thereby acknowledging to Kafka that the message has been successfully consumed for its consumer group. We can’t stress this enough (hence the repetition): this is how we are able to guarantee at-least-once semantics when processing a given message. Note that, since we store the offsets in Kafka, our implementation cannot guarantee exactly-once semantics. Nonetheless, this does not constitute a problem, as we later use a Redis cache to guard against acting on the same event twice. For more information about the Akka Kafka consumer, please check here.
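The commit-after-processing idea, and why it yields at-least-once rather than exactly-once delivery, can be simulated in plain Python. Everything here is hypothetical: the list stands in for a Kafka partition, the returned integer for the committed offset, and the `seen` set for the Redis cache; the message string itself doubles as the event ID.

```python
from typing import Callable, List, Set


def consume(log: List[str], committed_offset: int, seen: Set[str],
            handle: Callable[[str], None], crash_before_commit_at: int = -1) -> int:
    """Process messages from `committed_offset` onwards, committing only after handling.

    Returns the new committed offset (the next offset to read). If `crash_before_commit_at`
    equals an offset, we simulate a crash after handling that message but before
    committing it, so the next consumer will read it again.
    """
    offset = committed_offset
    while offset < len(log):
        message = log[offset]
        if message not in seen:      # dedup check: the role the Redis cache plays in the post
            handle(message)
            seen.add(message)
        if offset == crash_before_commit_at:
            return offset            # crash: this offset was never committed
        offset += 1                  # commit: acknowledge this message
    return offset


log = ["e1", "e2", "e3"]
handled: List[str] = []
seen: Set[str] = set()
# First consumer handles e1 and e2 but crashes before committing e2
committed = consume(log, 0, seen, handled.append, crash_before_commit_at=1)
# Another consumer resumes from the committed offset and re-reads e2
committed = consume(log, committed, seen, handled.append)
```

Note that e2 is delivered twice but only acted upon once. The sketch glosses over the window between `handle()` and `seen.add()`: a crash exactly there would still produce a duplicate, which is why exactly-once needs stronger machinery than this.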

Let us address scalability in the rest of the application by taking the CouponRouterActor architecture into consideration.



As shown in the previous illustration, performance is scaled by using Akka “routees” behind CouponRouterActor (as well as behind PushNotificationRouterActor). One of the beauties of Akka is that it allows us to code CouponRouterActor 99% the same as if it were not operating as an Akka router: we simply declare its router nature when instantiating the actor, and the rest is handled by Akka.

Final remarks

We will dive into each stage in more detail next. However, we would like to highlight the importance of the Akka Streaming pipeline. It is able to control how many messages should be read from Kafka because it sends messages to CouponRouterActor and PushNotificationRouterActor using the Ask Pattern, which waits for a response (up to a given time-to-live, or TTL).
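The throttling effect of asking (rather than telling) can be approximated in Python with asyncio: only a bounded number of requests are in flight at any time, and each one waits, with a timeout, for its reply. This is an analogy, not Akka Streams: `worker`, `ask` and `run_pipeline` are hypothetical names, `asyncio.wait_for` plays the role of the ask timeout, and the semaphore plays the role of the parallelism limit of a stage such as Akka Streams' `mapAsync`.

```python
import asyncio
from typing import List, Optional


async def worker(event: int) -> str:
    """Stands in for a routee (e.g. a coupon lookup) that replies after some work."""
    await asyncio.sleep(0.01)
    return f"processed-{event}"


async def ask(event: int, timeout: float) -> Optional[str]:
    """Ask-pattern analogue: wait for the reply, but give up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(worker(event), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # no reply in time: treat as failed


async def run_pipeline(events: List[int], parallelism: int = 2,
                       timeout: float = 1.0) -> List[Optional[str]]:
    """Keep at most `parallelism` asks in flight so downstream workers are not flooded."""
    semaphore = asyncio.Semaphore(parallelism)

    async def bounded_ask(event: int) -> Optional[str]:
        async with semaphore:
            return await ask(event, timeout)

    # gather returns results in input order, similar to mapAsync preserving order
    return await asyncio.gather(*(bounded_ask(e) for e in events))
```

For example, `asyncio.run(run_pipeline([1, 2, 3]))` processes the three events two at a time and returns one reply per event, in order.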

Also note that no matter how far an event goes down the flow (an event may, for example, be filtered out right at the beginning if it is considered invalid), we always log to Datadog that a given message was read from Kafka and successfully processed. Note that “successfully processed” can have different meanings: the event was considered invalid right at the beginning of the streaming pipeline, the Coupon API returned no available coupons, or the business rules decided that the system should not send a push notification to the Kepler API because it was deemed unfit.

Moreover, please note that when the processing of an event is finished – again, no matter how far it goes down the stream pipeline – KafkaConsumerActor has the final task of committing that event’s offset back to Kafka. In other words, we acknowledge back to Kafka that a given message has been processed. This is an important detail: if processing a given event fails (say, one of the application servers crashes), after the default TTL of message consumption configured in Kafka passes, another KafkaConsumerActor will read that same message from Kafka again, thus reprocessing it.


Docker environment

Currently we are only using Docker for local development, although this application would fit quite well in, say, a Kubernetes cluster.

We have set up a complete emulation of the production environment locally via Docker:

This is (extremely) useful not only to get a better grip on how the system works in day-to-day development, but also to run behavioral tests that are harder to emulate, such as High Availability (HA) tests.

Final Notes

Like any application, there are a number of things that could have been done better but, due to practical constraints (mainly time), were not. Let us start with some of the things we do not regret:

  • Using Akka: there are many ways we could have implemented this application. Overall, Akka is a mature, full-fledged framework – it contains modules for almost anything you may require while building distributed, highly available, asynchronous applications – with very satisfactory performance.
  • Using Akka Streaming: there are many blogs out there with horror stories about constant performance issues with pure Akka implementations. The Akka Streaming module not only increases stability via back-pressure, it also provides a very intuitive and fun-to-work-with API.
  • Using Docker locally: this allowed us to test rarer scenarios very easily, and especially rapidly, on our local machines, such as simulating failures at every point in the application: Kafka nodes, Redis, S3 and, of course, the Akka application itself.

Some open topics for further reflection:

  • Using our own discovery protocol ended up being a questionable technical decision. One possible alternative would have been the Akka module “DistributedPubSub”.
  • Ideally, this application would be a very nice initial use case for adopting container orchestration tools, such as Kubernetes.


And … that’s all folks. We hope that this post was useful to you.

Getting Started with Spark (part 3) – UDFs & Window functions

This post continues the previous introductory series “Getting started with Spark in Python” with the topics of UDFs and Window Functions. Unfortunately, it stayed marinating in my WordPress for quite a while (it was already about 70% complete), and only now have I had the opportunity to finish it.

Note: For this post I’m using Spark 1.6.1. There are some minor differences compared to the upcoming Spark 2.0, such as using a SparkSession object to initialize the Spark context instead of the HiveContext I use here. Nonetheless, the important parts are common to both.


Up until Spark 1.6.2, the only way to enrich your SQL queries was to use a HiveContext instead of a Spark SQLContext.

With a HiveContext you get the same features as a SQLContext, plus some additional advantages, such as the ability to use window functions, access to Hive UDFs, and the ability to read data from Hive tables. For more detail, please refer here for a concise, well-explained answer on the differences between SQLContext and HiveContext.

OK, let us start by importing all required dependencies for this tutorial:

# python dependencies
import sys
from datetime import datetime as dt
# pyspark dependencies
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.dataframe import DataFrame

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

… and initialize the HiveContext:

conf = SparkConf().setMaster("local[4]").setAppName("window-demo") \
      .set("spark.driver.memory", "4g")
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc)

As a reminder, our (extremely) dummy dataset is composed of the following (nonsense) data:

# note: to keep things simple we do not provide a schema; the Spark DataFrame API will infer it
customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27",  "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])


What if we wanted to answer a question such as: what is the cumulative sum of each customer’s spending over time?
One way of thinking about a cumulative sum is as a recursive call where, for every new period, you add the current value to everything accumulated so far. One way to solve this is by using Window Functions, a functionality added back in Spark 1.4. However, let us start by adding a column with the amount spent, using Spark User Defined Functions (UDFs). These functions basically apply a given function to every row, on one or more columns.

# create the general function
def amount_spent(quantity, price):
    """
    Calculates the product between two variables
    :param quantity: (float/int)
    :param price: (float/int)
    :return: (float) quantity * price
    """
    return quantity * price

# create the general UDF
amount_spent_udf = udf(amount_spent, DoubleType())
# Note: DoubleType corresponds to Python's float (64-bit); FloatType() is only 32-bit single precision

# Apply our UDF to the dataframe and display the first rows
customers02 = customers.withColumn('amount_spent', amount_spent_udf(customers['quantity'], customers['price'])).cache()
customers02.show(3, truncate=False)

|customer_name|date      |category|product_name|quantity|price|amount_spent|
|Geoffrey     |2016-04-22|A       |apples      |1       |50.0 |50.0        |
|Geoffrey     |2016-05-03|B       |Lamp        |2       |38.0 |76.0        |
|Geoffrey     |2016-05-03|D       |Solar Pannel|1       |29.0 |29.0        |
only showing top 3 rows

To compute a cumulative sum over time, we need to build a window object and specify how it should be partitioned (i.e., which column groups the rows that each aggregation is computed over), how the rows are ordered within each partition, and, optionally, the frame of rows (relative to the current row) to aggregate over.

window_01 = Window.partitionBy("customer_name").orderBy("date", "category").rowsBetween(-sys.maxsize, 0)
# note: func is the alias we gave to pyspark.sql.functions, Spark's suite of convenience functions
win_customers01 = customers02.withColumn("cumulative_sum", func.sum(customers02['amount_spent']).over(window_01))
win_customers01.show(10, truncate=False)

|customer_name|date      |category|product_name              |quantity|price|amount_spent|cumulative_sum|
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0       |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |76.0        |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |
|Yann         |2016-04-13|E       |Book: The noose           |5       |125.0|625.0       |1366.0        |
|Yann         |2016-04-22|B       |Lamp                      |1       |38.0 |38.0        |1404.0        |
|Yann         |2016-04-27|D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |
|Yann         |2016-05-01|Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |
|Yann         |2016-05-03|C       |Rice                      |15      |15.0 |225.0       |1842.0        |
|Yann         |2016-05-03|D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |
|Yann         |2016-05-03|Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |
only showing top 10 rows

The cumulative sum calculation is partitioned by customer, covers the interval from the beginning of the partition (-sys.maxsize effectively means starting at the very first row of that partition) up to the current row (offset 0), and is ordered by date and category, ascending (the default).
So, just to be perfectly clear:
cumulative_sum row zero = amount_spent row zero;
cumulative_sum row one = cumulative_sum row zero + amount_spent row one;
cumulative_sum row n = cumulative_sum row (n - 1) + amount_spent row n.
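To double-check these window semantics outside Spark, here is a plain-Python sketch that partitions rows by customer, orders them by date and category, and accumulates quantity * price, which is what the `rowsBetween(-sys.maxsize, 0)` frame computes. `cumulative_sums` is a hypothetical helper (not a Spark API), and the input is a small hand-copied subset of the sample dataset.

```python
from collections import defaultdict
from itertools import accumulate
from typing import Dict, List, Tuple

Row = Tuple[str, str, str, int, float]  # (customer_name, date, category, quantity, price)


def cumulative_sums(rows: List[Row]) -> Dict[str, List[float]]:
    """Per customer, running total of quantity * price ordered by (date, category)."""
    partitions: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[0]].append(row)              # partitionBy("customer_name")
    result: Dict[str, List[float]] = {}
    for customer, part in partitions.items():
        part.sort(key=lambda r: (r[1], r[2]))       # orderBy("date", "category")
        amounts = [qty * price for _, _, _, qty, price in part]
        result[customer] = list(accumulate(amounts))  # frame: first row .. current row
    return result


sample = [
    ("Yann", "2016-04-03", "E", 5, 100.0),
    ("Yann", "2016-04-02", "B", 2, 38.0),
    ("Geoffrey", "2016-04-22", "A", 1, 50.0),
    ("Yann", "2016-04-02", "A", 3, 55.0),
    ("Yann", "2016-04-13", "E", 5, 125.0),
]
sums = cumulative_sums(sample)
```

For Yann this yields 165.0, 241.0, 741.0 and 1366.0, matching the first rows of the Spark output; Geoffrey's partition is accumulated independently.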
Also, as a side note, instead of defining a UDF we could directly apply the sum function over the product of the two columns, since both the column arithmetic and func.sum are built-in Spark expressions.

# Note: instead of defining a UDF, you can compute the product of the two columns directly inside the aggregation
window_01 = Window.partitionBy("customer_name").orderBy("date").rowsBetween(-sys.maxsize, 0)
win_customers01_B = customers.withColumn("cumulative_sum", func.sum(customers['price'] * customers['quantity']).over(window_01))
win_customers01_B.show(3, truncate=False)

|customer_name|date      |category|product_name              |quantity|price|cumulative_sum|
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|741.0         |
only showing top 3 rows

If the rowsBetween() method still smells a bit funky, no worries, it will become clearer in the next example.

What if we want to understand how much customers spend on average overall (not grouped by product) over time? In other words, how does the average spending vary across a given time periodicity, i.e., a moving average?


window_02 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-3, 0)
win_customers02 = win_customers01.withColumn("movingAvg", func.avg(win_customers01['amount_spent']).over(window_02))
win_customers02.show()

|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|        movingAvg|
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|            165.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|            120.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|            247.0|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|            341.5|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|           309.75|
|         Yann|2016-04-27|       D|        Solar Pannel|       5| 29.0|       145.0|        1549.0|            327.0|
|         Yann|2016-05-01|       Y|         Motor skate|       1| 68.0|        68.0|        1617.0|            219.0|
|         Yann|2016-05-03|       C|                Rice|      15| 15.0|       225.0|        1842.0|            119.0|
|         Yann|2016-05-03|       D|         Recycle bin|       5| 27.0|       135.0|        1977.0|           143.25|
|         Yann|2016-05-03|       Y|         Motor skate|       1| 68.0|        68.0|        2045.0|            124.0|
|         Yann|2016-05-27|       A|             bananas|       3| 55.0|       165.0|        2210.0|           148.25|
|         Yann|2016-05-27|       D|         Recycle bin|       5| 27.0|       135.0|        2345.0|           125.75|
|         Yann|2016-06-07|       Z|          space ship|       1|227.0|       227.0|        2572.0|           148.75|
|       Yoshua|2016-02-07|       Z|          space ship|       2|227.0|       454.0|         454.0|            454.0|
|       Yoshua|2016-02-14|       A|             bananas|       9| 55.0|       495.0|         949.0|            474.5|
|       Yoshua|2016-02-14|       A|              apples|      10| 55.0|       550.0|        1499.0|499.6666666666667|
|       Yoshua|2016-02-14|       B|                Lamp|       2| 38.0|        76.0|        1575.0|           393.75|
|       Yoshua|2016-03-07|       Z|          space ship|       5|227.0|      1135.0|        2710.0|            564.0|
|       Yoshua|2016-04-07|       A|             bananas|       9| 55.0|       495.0|        3205.0|            564.0|
|       Yoshua|2016-04-07|       C|                Rice|       5| 15.0|        75.0|        3280.0|           445.25|
only showing top 20 rows

Before explaining how this works, let us revisit the rowsBetween() method. Here we specify a frame that spans from up to 3 rows before the current row to the current row itself (so at most 4 rows). Alternatively, we could, for example, use an interval of two rows behind and two rows ahead:

window_03 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-2, 2)
win_customers03 = win_customers01.withColumn("movingAvg", func.avg(win_customers01['amount_spent']).over(window_03))
win_customers03.show(5)

|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|movingAvg|
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|    247.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|    341.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|    280.8|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|    276.8|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|    275.2|
only showing top 5 rows

The first row (247.0) is simply the current value plus the next two, divided by the number of rows in the frame (there are no rows before it):
(165.0 + 76.0 + 500.0)/3 = 247.0
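The same frame arithmetic can be reproduced in plain Python for any `rowsBetween(start, end)` pair. `frame_avg` is a hypothetical helper, not a Spark function: for each row it averages the rows from `start` to `end` positions relative to it, clipped at the partition boundaries. The input is Yann's first seven `amount_spent` values, in the same order as the tables above.

```python
from typing import List


def frame_avg(values: List[float], start: int, end: int) -> List[float]:
    """Average over rows [i + start, i + end] for each row i, clipped to the list bounds."""
    averages = []
    for i in range(len(values)):
        lo = max(0, i + start)
        hi = min(len(values) - 1, i + end)
        frame = values[lo:hi + 1]
        averages.append(sum(frame) / len(frame))
    return averages


yann_amounts = [165.0, 76.0, 500.0, 625.0, 38.0, 145.0, 68.0]
trailing = frame_avg(yann_amounts, -3, 0)   # like rowsBetween(-3, 0)
centered = frame_avg(yann_amounts, -2, 2)   # like rowsBetween(-2, 2)
```

`trailing` starts with 165.0, 120.5, 247.0, 341.5, 309.75 and `centered` with 247.0, 341.5, 280.8, 276.8, 275.2, matching the Spark outputs. Near the edges of a partition the frame simply contains fewer rows, which is why the first averages use only one, two or three values.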

Simple, right?

Going back to how the computation is partitioned, the way we structured it computes a moving average per customer, iterating over each event.
However, let’s say we want to know how customer spending varies on average on a daily/weekly/monthly basis. For that, let’s extract those units from our date column.

 |-- customer_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)
 |-- amount_spent: double (nullable = true)
 |-- cumulative_sum: double (nullable = true)

Spark automatically inferred the type of our date column as String (since we did not specify the schema when we created the DataFrame). Let’s use a UDF with an anonymous function (lambda) to convert it to a date:

from datetime import datetime as dt
# create the general UDF; .date() returns a datetime.date, the Python type that DateType maps to
string_to_datetime = udf(lambda x: dt.strptime(x, '%Y-%m-%d').date(), DateType())

# Create a new column called datetime, and drop the date column
win_customers01_B = win_customers01.withColumn('datetime', string_to_datetime(win_customers01['date'])).drop('date')
# Add year, month and week columns
win_customers01_C = win_customers01_B.withColumn('year', func.year(win_customers01_B['datetime'])) \
    .withColumn('month', func.month(win_customers01_B['datetime'])) \
    .withColumn('week', func.weekofyear(win_customers01_B['datetime']))
win_customers01_C.show(10, truncate=False)

|customer_name|category|product_name              |quantity|price|amount_spent|cumulative_sum|datetime  |year|month|week|
|Yann         |A       |bananas                   |3       |55.0 |165.0       |165.0         |2016-04-02|2016|4    |13  |
|Yann         |B       |Lamp                      |2       |38.0 |76.0        |241.0         |2016-04-02|2016|4    |13  |
|Yann         |E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |2016-04-03|2016|4    |13  |
|Yann         |E       |Book: The noose           |5       |125.0|625.0       |1366.0        |2016-04-13|2016|4    |15  |
|Yann         |B       |Lamp                      |1       |38.0 |38.0        |1404.0        |2016-04-22|2016|4    |16  |
|Yann         |D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |2016-04-27|2016|4    |17  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |2016-05-01|2016|5    |17  |
|Yann         |C       |Rice                      |15      |15.0 |225.0       |1842.0        |2016-05-03|2016|5    |18  |
|Yann         |D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |2016-05-03|2016|5    |18  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |2016-05-03|2016|5    |18  |
only showing top 10 rows
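The parsing and week extraction can be sanity-checked with Python's standard library. The week numbers in the table follow ISO-8601 numbering (weeks start on Monday, and week 1 is the first week with at least four days in the new year), which is also what `date.isocalendar()` returns. `date_parts` is a hypothetical helper used only for this check.

```python
from datetime import date, datetime
from typing import Tuple


def date_parts(value: str) -> Tuple[date, int, int, int]:
    """Parse 'YYYY-MM-DD' and return (date, calendar year, month, ISO week number)."""
    d = datetime.strptime(value, "%Y-%m-%d").date()
    iso_week = d.isocalendar()[1]  # index 1 is the ISO week number
    return d, d.year, d.month, iso_week


april_second = date_parts("2016-04-02")
may_first = date_parts("2016-05-01")
may_third = date_parts("2016-05-03")
```

2016-04-02 falls in week 13 and 2016-05-01 (a Sunday) still belongs to week 17, while 2016-05-03 starts week 18, as in the Spark output.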

Let us now aggregate each customer’s spending per day:

customer_grp_by_day = win_customers01_C.groupBy('customer_name', 'datetime', 'year') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'datetime')
customer_grp_by_day.show()

|customer_name|  datetime|year|amount_spent|
|     Geoffrey|2016-04-22|2016|        50.0|
|     Geoffrey|2016-05-03|2016|       330.0|
|     Geoffrey|2016-06-05|2016|       525.0|
|     Geoffrey|2016-06-15|2016|       601.0|
|       Jurgen|2016-05-01|2016|       502.0|
|       Jurgen|2016-05-08|2016|       343.0|
|       Jurgen|2016-06-05|2016|       621.0|
|         Yann|2016-04-02|2016|       241.0|
|         Yann|2016-04-03|2016|       500.0|
|         Yann|2016-04-13|2016|       625.0|
|         Yann|2016-04-22|2016|        38.0|
|         Yann|2016-04-27|2016|       145.0|
|         Yann|2016-05-01|2016|        68.0|
|         Yann|2016-05-03|2016|       428.0|
|         Yann|2016-05-27|2016|       300.0|
|         Yann|2016-06-07|2016|       227.0|
|       Yoshua|2016-02-07|2016|       454.0|
|       Yoshua|2016-02-14|2016|      1121.0|
|       Yoshua|2016-03-07|2016|      1135.0|
|       Yoshua|2016-04-07|2016|       977.0|

Next, let’s check, per customer visit (assuming each customer visits the store at most once per day), how the customer’s spending progresses, using a moving average over the current visit and up to 7 previous ones:

window_04 = Window.partitionBy("customer_name").orderBy("customer_name", "datetime").rowsBetween(-7, 0)
win_customers04 = customer_grp_by_day.withColumn("movingAvg", func.avg(customer_grp_by_day['amount_spent']).over(window_04))
|customer_name|  datetime|year|amount_spent|         movingAvg|
|         Yann|2016-04-02|2016|       241.0|             241.0|
|         Yann|2016-04-03|2016|       500.0|             370.5|
|         Yann|2016-04-13|2016|       625.0| 455.3333333333333|
|         Yann|2016-04-22|2016|        38.0|             351.0|
|         Yann|2016-04-27|2016|       145.0|             309.8|
|         Yann|2016-05-01|2016|        68.0|             269.5|
|         Yann|2016-05-03|2016|       428.0|292.14285714285717|
|         Yann|2016-05-27|2016|       300.0|           293.125|
|         Yann|2016-06-07|2016|       227.0|           291.375|
|       Yoshua|2016-02-07|2016|       454.0|             454.0|
|       Yoshua|2016-02-14|2016|      1121.0|             787.5|
|       Yoshua|2016-03-07|2016|      1135.0| 903.3333333333334|
|       Yoshua|2016-04-07|2016|       977.0|            921.75|
|     Geoffrey|2016-04-22|2016|        50.0|              50.0|
|     Geoffrey|2016-05-03|2016|       330.0|             190.0|
|     Geoffrey|2016-06-05|2016|       525.0| 301.6666666666667|
|     Geoffrey|2016-06-15|2016|       601.0|             376.5|
|       Jurgen|2016-05-01|2016|       502.0|             502.0|
|       Jurgen|2016-05-08|2016|       343.0|             422.5|
|       Jurgen|2016-06-05|2016|       621.0| 488.6666666666667|
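To see exactly what rowsBetween(-7, 0) computes, here is a plain-Python sketch (not Spark code; the trailing_moving_avg helper is just for illustration) that reproduces Yann’s movingAvg column from the table above. Each row is averaged with up to 7 preceding rows of the same customer, however many days apart those rows are.

```python
from typing import List


def trailing_moving_avg(values: List[float], rows_back: int) -> List[float]:
    """Average of each value and up to `rows_back` preceding values,
    mimicking rowsBetween(-rows_back, 0) within a single partition."""
    result = []
    for i in range(len(values)):
        # current row plus at most `rows_back` earlier rows
        window = values[max(0, i - rows_back):i + 1]
        result.append(sum(window) / len(window))
    return result


# Yann's daily totals, already sorted by date (copied from the table above)
yann_daily = [241.0, 500.0, 625.0, 38.0, 145.0, 68.0, 428.0, 300.0, 227.0]
print(trailing_moving_avg(yann_daily, rows_back=7))
```

The last value (291.375) is the first one where the window is full, so Yann’s first visit (241.0) drops out of the average.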

Let us group customers by weekly spending:

customer_grp_by_week = win_customers01_C.groupBy('customer_name', 'year', 'week') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'week')
|customer_name|year|week|amount_spent|
|     Geoffrey|2016|  16|        50.0|
|     Geoffrey|2016|  18|       330.0|
|     Geoffrey|2016|  22|       525.0|
|     Geoffrey|2016|  24|       601.0|
|       Jurgen|2016|  17|       502.0|
|       Jurgen|2016|  18|       343.0|
|       Jurgen|2016|  22|       621.0|
|         Yann|2016|  13|       741.0|
|         Yann|2016|  15|       625.0|
|         Yann|2016|  16|        38.0|
|         Yann|2016|  17|       213.0|
|         Yann|2016|  18|       428.0|
|         Yann|2016|  21|       300.0|
|         Yann|2016|  23|       227.0|
|       Yoshua|2016|   5|       454.0|
|       Yoshua|2016|   6|      1121.0|
|       Yoshua|2016|  10|      1135.0|
|       Yoshua|2016|  14|       977.0|

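And computing the weekly moving average over the current row and up to 4 preceding rows. Note that rowsBetween counts rows, not calendar weeks, so weeks without purchases are simply skipped rather than counted as zero: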

window_05 = Window.partitionBy('customer_name').orderBy('year', 'week').rowsBetween(-4, 0)
win_customers05 = customer_grp_by_week.withColumn("movingAvg", func.avg(customer_grp_by_week['amount_spent']).over(window_05))
|customer_name|year|week|amount_spent|        movingAvg|
|         Yann|2016|  13|       741.0|            741.0|
|         Yann|2016|  15|       625.0|            683.0|
|         Yann|2016|  16|        38.0|            468.0|
|         Yann|2016|  17|       213.0|           404.25|
|         Yann|2016|  18|       428.0|            409.0|
|         Yann|2016|  21|       300.0|            320.8|
|         Yann|2016|  23|       227.0|            241.2|
|       Yoshua|2016|   5|       454.0|            454.0|
|       Yoshua|2016|   6|      1121.0|            787.5|
|       Yoshua|2016|  10|      1135.0|903.3333333333334|
|       Yoshua|2016|  14|       977.0|           921.75|
|     Geoffrey|2016|  16|        50.0|             50.0|
|     Geoffrey|2016|  18|       330.0|            190.0|
|     Geoffrey|2016|  22|       525.0|301.6666666666667|
|     Geoffrey|2016|  24|       601.0|            376.5|
|       Jurgen|2016|  17|       502.0|            502.0|
|       Jurgen|2016|  18|       343.0|            422.5|
|       Jurgen|2016|  22|       621.0|488.6666666666667|
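Because rowsBetween counts rows, Yann’s week-21 average still reaches back to week 15, even though that is more than 4 calendar weeks earlier. If a calendar-based window is what you want, PySpark also offers Window.rangeBetween over a numeric ordering column. The plain-Python sketch below (hypothetical helpers, assuming a single year so the week number alone can serve as the ordering key) contrasts the two on Yann’s weekly totals:

```python
from typing import List, Tuple


def rows_avg(points: List[Tuple[int, float]], rows_back: int) -> List[float]:
    """Average over the current row and up to rows_back previous rows (like rowsBetween)."""
    out = []
    for i in range(len(points)):
        chunk = [v for _, v in points[max(0, i - rows_back):i + 1]]
        out.append(sum(chunk) / len(chunk))
    return out


def range_avg(points: List[Tuple[int, float]], span: int) -> List[float]:
    """Average over rows whose key lies in [key - span, key] (like rangeBetween)."""
    out = []
    for key, _ in points:
        chunk = [v for k, v in points if key - span <= k <= key]
        out.append(sum(chunk) / len(chunk))
    return out


# Yann's (week, amount_spent) pairs from the weekly table above
yann_weekly = [(13, 741.0), (15, 625.0), (16, 38.0), (17, 213.0),
               (18, 428.0), (21, 300.0), (23, 227.0)]
print(rows_avg(yann_weekly, 4))
print(range_avg(yann_weekly, 4))
```

For week 18, the row-based average is 409.0 (all five rows so far), while the range-based one is 326.0, because week 13 falls outside the [14, 18] range.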

Finally, let us move on to monthly grouping and calculations:

customer_grp_by_month = win_customers01_C.groupBy('customer_name', 'year', 'month')\
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'month')
|customer_name|year|month|amount_spent|
|     Geoffrey|2016|    4|        50.0|
|     Geoffrey|2016|    5|       330.0|
|     Geoffrey|2016|    6|      1126.0|
|       Jurgen|2016|    5|       845.0|
|       Jurgen|2016|    6|       621.0|
|         Yann|2016|    4|      1549.0|
|         Yann|2016|    5|       796.0|
|         Yann|2016|    6|       227.0|
|       Yoshua|2016|    2|      1575.0|
|       Yoshua|2016|    3|      1135.0|
|       Yoshua|2016|    4|       977.0|


Next, let’s see how each customer’s spending progresses across months, using a moving average over the current month and up to 3 previous ones:

window_06 = Window.partitionBy('customer_name').orderBy('year', 'month').rowsBetween(-3, 0)
win_customers06 = customer_grp_by_month.withColumn("movingAvg", func.avg(customer_grp_by_month['amount_spent']).over(window_06))
|customer_name|year|month|amount_spent|        movingAvg|
|         Yann|2016|    4|      1549.0|           1549.0|
|         Yann|2016|    5|       796.0|           1172.5|
|         Yann|2016|    6|       227.0|857.3333333333334|
|       Yoshua|2016|    2|      1575.0|           1575.0|
|       Yoshua|2016|    3|      1135.0|           1355.0|
|       Yoshua|2016|    4|       977.0|           1229.0|
|     Geoffrey|2016|    4|        50.0|             50.0|
|     Geoffrey|2016|    5|       330.0|            190.0|
|     Geoffrey|2016|    6|      1126.0|            502.0|
|       Jurgen|2016|    5|       845.0|            845.0|
|       Jurgen|2016|    6|       621.0|            733.0|




Spark 2.0: From quasi to proper-streaming?

This post attempts to follow the relatively recent Spark release, Spark 2.0, and understand what changed for streaming applications. Why streaming in particular, you may ask? Well, streaming is the ideal scenario most companies would like to use, and the competitive landscape is definitely heating up, especially with Apache Flink and Google’s Apache Beam.

Why is streaming so difficult?

There are three main problems when it comes to building real time applications based on streaming data:

  • Data consistency: due to the distributed nature of the architecture, at any given point in time some events may have been processed on some nodes but not on others, even though these events might actually have occurred before others. For example, you may see a Logout event without a Login event, a close-app event without an open-app event, etc.
  • Fault tolerance (FT): related to the first problem, when a node fails, processing engines may try to reprocess an event that had already been processed by the failed node, leading to duplicated records and/or inaccurate metrics.
  • Dealing with late/out-of-order data: whether because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync data hours later, developers must tackle this challenge in application code.
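A toy, plain-Python illustration of the first and third problems (the event fields and the reconcile helper are made up for this example): events arrive in processing order rather than event-time order, and a replay after a failure delivers one of them twice. Keeping a set of seen ids and sorting by event time fixes both here, but only because the whole input fits in memory; in a real unbounded stream you never know whether a later event is still on its way, which is exactly what makes streaming hard.

```python
from typing import Dict, List


def reconcile(received: List[Dict]) -> List[Dict]:
    """Drop replayed duplicates (same event id) and restore event-time order."""
    seen = set()
    unique = []
    for event in received:
        if event["id"] in seen:  # duplicate delivered by a replay after a node failure
            continue
        seen.add(event["id"])
        unique.append(event)
    return sorted(unique, key=lambda e: e["event_time"])


# Arrival (processing) order: the logout shows up before the login,
# and event 2 is delivered twice.
received = [
    {"id": 2, "type": "logout", "event_time": 105},
    {"id": 1, "type": "login", "event_time": 100},
    {"id": 2, "type": "logout", "event_time": 105},
]
print([e["type"] for e in reconcile(received)])  # ['login', 'logout']
```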

See this post for an excellent detailed explanation.

Rewind to Spark 1.6.2

Unlike Apache Flink, where batch jobs are the exception and streaming the default, Apache Spark uses the exact opposite logic: streaming is a special case of batch. To make this clearer, Spark Streaming works by dividing live data streams into batches (sometimes called micro-batches). The continuous stream is represented by a high-level abstraction called a discretized stream, or DStream, which internally is represented as a sequence of RDDs.

One thing DStreams have in common with RDDs is lazy execution: DStreams are only executed when an output operation (the equivalent of an action on RDDs) is called. For example, in the classic word count example, the print method would be the output operation.
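To make the micro-batch idea concrete, here is a plain-Python analogy (not the Spark API; micro_batches and word_count are made-up names): a stream of lines is cut into small batches, and the same word count runs on each one. Spark cuts batches by time interval rather than by record count, and in PySpark the output operation of the classic example is pprint(); the generator here is lazy in a similar spirit, so nothing is computed until the loop consumes it.

```python
from collections import Counter
from typing import Iterable, Iterator, List


def micro_batches(stream: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Cut a (potentially endless) stream of lines into small batches, lazily."""
    batch = []
    for line in stream:
        batch.append(line)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the last, partial batch
        yield batch


def word_count(batch: List[str]) -> Counter:
    """The per-batch job: the same code you would run in batch mode."""
    return Counter(word for line in batch for word in line.split())


lines = ["to be", "or not", "to be"]
for batch in micro_batches(lines, batch_size=2):
    print(word_count(batch))  # plays the role of the "output operation"
```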

Nonetheless, even though the main abstraction to work with was DStreams on top of RDDs, it was already possible to work with DataFrames (and thus SQL) on streaming data before.


Changes in Spark 2.0 – introducing “Structured Streaming”

So the new kid on the block is Structured Streaming, a feature still in alpha in Spark 2.0. Its marketing slogan: “Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming”. In other words, this is an attempt to simplify the lives of developers struggling with the three general problems of streaming, while bringing a streaming API to DataFrames and Datasets.

Note that the streaming API has now been unified with the batch API, which means you should expect to use (almost always) exactly the same method calls as you would in the normal batch API. Of course, there are some minor exceptions: reading from and writing to files use a different method call. Here is a copy-paste of the examples shown at Spark Summit in the “A Deep Dive into Structured Streaming” presentation (highly recommended).

Syntax for batch mode:

input = spark.read.format("json").load("source-path")
result = input.select("device", "signal").where("signal > 15")

Now for structured streaming:

input = spark.read.format("json").stream("source-path")
result = input.select("device", "signal").where("signal > 15")

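In case you didn’t notice the difference (which is a good sign here): instead of load()-ing a file, you continuously stream() it, and the same logic applies to writing, with startStream(). Pretty cool, right? (These were the preview-era method names; the final Spark 2.0 release exposes the same idea as spark.readStream for reading and writeStream with start() for writing.)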

Also note that the basic premise of Spark Streaming discussed before still holds: under the hood, it is still micro-batching.

Repeated Queries on input table

So besides unifying the API, where’s the secret sauce? The main concept is a new abstraction in which streaming data is treated much like a table in a relational DB: each new piece of incoming data is a new row appended to an unbounded input table.

The developer has three tasks in the middle of this whole streaming party.

  1. The first is to define a query that acts repeatedly on this input table and produces a new table, called the result table, which will be written to an output sink. Under the hood, Spark’s query planner turns this query into an execution plan (as explained below), and Spark figures out on its own what needs to be maintained from the input table in order to update the result table.
  2. The second is to specify the triggers that control when to update the results.
  3. The third is to define an output mode, so that Spark knows how to write to an external system (such as S3, HDFS or a database). There are three alternatives: append (only new rows of the result table are written), complete (the full result table is flushed), and update (only rows that were updated since the last trigger are changed).
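The result-table model and the three output modes can be mimicked in a few lines of plain Python. This is only a conceptual sketch: the names are invented, the query is a simple count per device, and each list of new rows plays the role of one trigger. It naively re-runs the query over the whole input table, which is exactly what Spark avoids doing, as the next section explains; real Spark also restricts which output modes a given query may use.

```python
from collections import Counter
from typing import Dict, List, Tuple


def run_query(input_table: List[str]) -> Dict[str, int]:
    """The user-defined query, conceptually evaluated over the whole input table."""
    return dict(Counter(input_table))


def simulate(chunks: List[List[str]]) -> List[Tuple[Dict[str, int], Dict[str, int], Dict[str, int]]]:
    """For each trigger, return what complete, update and append mode would emit."""
    input_table: List[str] = []      # unbounded input table, grows on every trigger
    previous: Dict[str, int] = {}    # result table as of the previous trigger
    outputs = []
    for new_rows in chunks:
        input_table.extend(new_rows)  # new data = rows appended to the input table
        result = run_query(input_table)
        complete = dict(result)                                              # whole result table
        update = {k: v for k, v in result.items() if previous.get(k) != v}  # changed rows only
        append = {k: v for k, v in result.items() if k not in previous}     # brand-new rows only
        outputs.append((complete, update, append))
        previous = result
    return outputs


for i, (complete, update, append) in enumerate(simulate([["a", "b"], ["a", "c"]]), start=1):
    print(f"trigger {i}: complete={complete} update={update} append={append}")
```

On the second trigger, complete mode rewrites all three devices, update mode only emits a and c, and append mode only emits the new device c.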

Structured Streaming under the hood

Under the hood (and by “under the hood” I mean what is done for you, so you don’t need to implement it in your code; but yes, you should nonetheless care about it), Structured Streaming is made possible by some considerable changes. The first is that the query planner (the same one used in batch mode) generates a continuous series of incremental execution plans, one for each new chunk of streaming data (for example, when polling from Kafka, this would be the new range of offsets).

Now this can get particularly tricky, because each execution plan needs to take into account the aggregations computed by previous executions. So the state of these continuous aggregations is maintained internally in memory and backed by a Write Ahead Log (WAL) in the file system (for fault tolerance). So yes, the query planner is fault tolerant, as it keeps track of the offsets of each execution in a distributed file system such as HDFS.
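A rough plain-Python sketch of the idea, under simplifying assumptions: a running count stands in for the aggregation state, and one append-only JSON file on local disk stands in for both the offset log and the state checkpoint (Spark keeps these separately, on a distributed file system). Each batch only processes the new range of offsets on top of the previous state, and a restarted job picks up from the last logged entry instead of starting over. The IncrementalCounter class is hypothetical.

```python
import json
import os
import tempfile
from typing import Dict, List


class IncrementalCounter:
    """Running counts kept in memory, with each batch's offset range and
    resulting state appended to a log file so a restart can resume."""

    def __init__(self, log_path: str):
        self.log_path = log_path
        self.state: Dict[str, int] = {}
        self.committed_offset = 0
        if os.path.exists(log_path):  # recovery: reload the last logged entry
            with open(log_path) as f:
                entries = f.read().splitlines()
            if entries:
                last = json.loads(entries[-1])
                self.state = last["state"]
                self.committed_offset = last["end"]

    def process(self, source: List[str], end: int) -> None:
        """Process only source[committed_offset:end], on top of the previous state."""
        for key in source[self.committed_offset:end]:
            self.state[key] = self.state.get(key, 0) + 1
        entry = {"start": self.committed_offset, "end": end, "state": self.state}
        with open(self.log_path, "a") as f:
            f.write(json.dumps(entry) + "\n")
        self.committed_offset = end


source = ["a", "b", "a", "c", "a"]  # stands in for, e.g., one Kafka partition
log = os.path.join(tempfile.mkdtemp(), "offsets.log")
job = IncrementalCounter(log)
job.process(source, end=2)
job.process(source, end=4)
restarted = IncrementalCounter(log)  # simulate a crash and restart
restarted.process(source, end=5)
print(restarted.state)  # {'a': 3, 'b': 1, 'c': 1}
```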

Moreover, regarding fault tolerance, the sinks are also idempotent, which means they are expected to handle re-executions without committing the same output twice.
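The idempotent-sink contract can be sketched the same way (again plain Python with made-up names, not Spark’s sink interface): each batch’s output is stored under its batch id, so replaying a batch after a failure does not write it twice.

```python
from typing import Dict, List


class IdempotentSink:
    """Writes each batch's output at most once, keyed by batch id."""

    def __init__(self) -> None:
        self.committed: Dict[int, List[str]] = {}

    def write(self, batch_id: int, rows: List[str]) -> bool:
        """Return True if the batch was written, False if it was already committed."""
        if batch_id in self.committed:  # re-execution after a failure: skip
            return False
        self.committed[batch_id] = list(rows)
        return True

    def all_rows(self) -> List[str]:
        return [row for bid in sorted(self.committed) for row in self.committed[bid]]


sink = IdempotentSink()
sink.write(0, ["x", "y"])
sink.write(1, ["z"])
sink.write(1, ["z"])    # batch 1 replayed after a crash
print(sink.all_rows())  # ['x', 'y', 'z']
```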

Finally, before you get all excited, I do want to emphasize that at the moment, this is still in alpha mode, and is explicitly not recommended for production environments.



Overview HP Vertica vs AWS Redshift

While working at HP some years ago, I was exposed not only to internal training materials but also to a demo environment. I still remember the excitement when HP acquired Vertica Systems in 2011, and we had a new toy to play with… Come on, you can’t blame me, distributed DBs were something only the cool kids were doing.

Bottom line: it’s been a while since I laid eyes on it… Well, recently, while considering possible architectural solutions, I had the pleasure of revisiting Vertica. And since AWS Redshift has been gaining a lot of popularity and we’re also using it at some of our clients, I thought I might write a quick summary to help others.

Now, if you’re expecting a smackdown post, I’m afraid I’ll disappoint you; for that you have the internet. If experience has taught me something, it is that with top-notch solutions there are only use cases, and you pick the best-fitting one.

They share several properties in terms of architecture and internal engine:

  • Massively Parallel Processing (MPP) architecture: data is distributed among distinct nodes in a shared-nothing architecture, leveraging scale-out; in case you’re wondering how it compares to Hadoop Hive, Airbnb did a smackdown comparison, concluding that Redshift had roughly a 5x advantage over Hive;
  • High availability (HA): this follows from the first, thanks to data replication mechanisms; Vertica calls its replication factor “k-safety”, and you may also want to check fault groups to control how data is replicated according to physical distribution (such as server rack location, power circuits, etc.); the same automatic data replication among nodes happens in Redshift under the hood (plus more goodies such as backups);
  • Columnar data store: for analytical applications/OLAP (where, as opposed to OLTP, queries usually select only specific columns), it is usually much faster, mainly because a) it does not need to scan the whole row and then discard the unnecessary content, and b) compression/encoding is more efficient, since each column holds data of the same type; for a more detailed explanation, I suggest this article. Both Vertica and Redshift are built on this architecture;
  • Data compression: Vertica mixes encoding strategies depending on column data type, table cardinality, and sort order; it distinguishes encoding from compression, since it can operate directly on encoded data whenever possible, which does not hold true for compressed data; Redshift does not make such a distinction and recommends leaving compression in auto mode, although you can choose the encoding type;
  • Standard SQL interface: as always, there are minor differences, but bottom line, you can use the SQL syntax you’re already accustomed to;
  • User Defined Functions (UDFs): both Redshift and Vertica give you room for customization;
  • In-memory DB: nope, neither of them is like SAP HANA, Oracle TimesTen or IBM’s SolidDB.
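A tiny plain-Python illustration of the two columnar advantages above (the table and the run_length_encode helper are invented for the example): summing one column only touches that column’s values, and a sorted, low-cardinality column collapses nicely under run-length encoding, one of the encodings both Vertica and Redshift offer.

```python
from itertools import groupby
from typing import List, Tuple

# The same three-column table, stored row-wise and column-wise
rows = [("2016-04-01", "PT", 10.0), ("2016-04-01", "PT", 12.5),
        ("2016-04-02", "PT", 7.0), ("2016-04-02", "DE", 3.5)]
columns = {
    "day": [r[0] for r in rows],
    "country": [r[1] for r in rows],
    "amount": [r[2] for r in rows],
}

# SELECT SUM(amount): the row store walks every full row and ignores two fields,
# while the column store reads only the one column it needs.
total_row_store = sum(r[2] for r in rows)
total_column_store = sum(columns["amount"])


def run_length_encode(values: List[str]) -> List[Tuple[str, int]]:
    """Collapse runs of equal consecutive values into (value, count) pairs."""
    return [(value, len(list(run))) for value, run in groupby(values)]


print(total_row_store, total_column_store)    # 33.0 33.0
print(run_length_encode(columns["country"]))  # [('PT', 3), ('DE', 1)]
print(run_length_encode(columns["day"]))      # [('2016-04-01', 2), ('2016-04-02', 2)]
```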

Where they differ:

  • Architecture: in Vertica all nodes are “created equal”, meaning they share similar functions; Redshift has the concept of a leader node, a dedicated node which manages workload and query coordination among the worker (compute) nodes;
  • Management: (this is a key differentiator that will most likely carry the biggest weight in the final decision) with Vertica you have to do all the ops work (install, upgrade/update, configure nodes, etc.); Redshift is a fully managed cloud solution, where you only have purely database-related ops work. Note: yes, HP provides an AMI to easily kickstart projects in the AWS cloud, but come on, this is still not the same thing;
  • Freedom of environment: with Redshift you’re locked in to AWS; with Vertica you can run it wherever you like;
  • Schema design: Vertica provides a Database Designer tool to ease migration from traditional RDBMS systems based on their schema (not saying this saves the world, but it can be helpful); this is especially important in the beginning, since columnar data warehouses don’t support indexes; so in Vertica you play with projections, and in Redshift with distribution and sort keys (and you’d better get them right, as they are key for performance and for keeping things balanced);
  • Payment scheme: in Vertica (for data bigger than 1TB, which is most certainly your case) you pay upfront licensing plus the cost of the machines you run it on; with Redshift, all costs are folded into an hourly rate, and that’s it;
  • Compiled code: Redshift claims that the leader node compiles the code for optimal performance at execution time, which the guys at Cake also confirm in this excellent post;
  • Free trial/usage: Vertica lets you use up to 3 nodes and 1TB of data for free; with Redshift, on the other hand, if your account is still eligible for the free usage tier (in the first year), you can try a total of 750 normalized instance hours per month, enough to run a single DC1.Large node continuously, with 160GB of SSD storage;
  • Add-ons: Vertica offers Pulse for sentiment analysis and Place for geospatial data analysis.
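On the schema-design point, the reason distribution keys matter for balance can be sketched in plain Python. This assumes a simple hash-modulo placement (crc32 here, which is not the hash function either product actually uses) and hypothetical keys: a high-cardinality key spreads rows evenly across slices, while a skewed, low-cardinality key piles most rows onto one slice, which then becomes the bottleneck for every query.

```python
import zlib
from collections import Counter
from typing import List, Sequence


def slice_for(key: str, n_slices: int) -> int:
    """Pick a slice by hashing the distribution key (crc32 keeps it deterministic)."""
    return zlib.crc32(key.encode("utf-8")) % n_slices


def rows_per_slice(keys: Sequence[str], n_slices: int) -> List[int]:
    """Count how many rows land on each slice."""
    counts = Counter(slice_for(k, n_slices) for k in keys)
    return [counts.get(s, 0) for s in range(n_slices)]


n_slices = 4
# A high-cardinality key (e.g. an order id) spreads rows out...
order_ids = ["order-{}".format(i) for i in range(1000)]
# ...while a low-cardinality, skewed key (e.g. a dominant country) piles them up.
countries = ["PT"] * 900 + ["DE"] * 100

print(rows_per_slice(order_ids, n_slices))
print(rows_per_slice(countries, n_slices))
```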

Finally, you might want to go deeper. Again, I really recommend this excellent post by Cake, which provides performance benchmarks. Benchmarks are always debatable, but they remain an interesting and important comparison method.

Spark – Redshift: AWS Roles to the rescue

If you are using AWS to host your applications, you have probably heard that you can also assign IAM roles to EC2 instances. In a lot of cases, this can be a really cool way to avoid passing AWS credentials to your applications, and to spare yourself the pain of managing key distribution among servers, as well as ensuring key rotation mechanisms for security purposes.

This post is about a simple trick on how to take advantage of this feature when your Spark job needs to interact with AWS Redshift.

As described in the Databricks repository for the spark-redshift library, there are three strategies for setting up AWS credentials: setting them in the Hadoop configuration (what many people used to Cloudera or Hortonworks are accustomed to), encoding the keys in the tempdir URI (by far not the best option, if you ask me), or using temporary keys. The last strategy is the one discussed here, and it is based on AWS’s own documentation on how to use temporary keys.

So let us start with the Spark-Redshift code.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName(opts.get('app_name'))
sc = SparkContext(conf=conf)
ssc = SQLContext(sc) 

rs_query = "select * from my_table limit 10"
rs_tmp_dir = 's3n://path/for/temp/data'
rs_url = 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'

# Create a Spark DataFrame through a Redshift query
# (sts_credentials holds the temporary keys; see get_temp_credentials below)
df = ssc.read \
    .format('com.databricks.spark.redshift') \
    .option('url', rs_url) \
    .option('query', rs_query) \
    .option('tempdir', rs_tmp_dir) \
    .option('temporary_aws_access_key_id', sts_credentials.get('AccessKeyId')) \
    .option('temporary_aws_secret_access_key', sts_credentials.get('SecretAccessKey')) \
    .option('temporary_aws_session_token', sts_credentials.get('Token')) \
    .load()


OK, up to here it’s almost only Spark-related code in Python. Now, the only thing you might be wondering is: what the hell is that “sts_credentials” map? Well spotted. The following snippet will reveal it.

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

# Optionally define a default EC2 instance role for EMR instances
# (placeholder name; replace it with your own role)
DEFAULT_INSTANCE_ROLE = 'my-emr-cluster-role'


def get_temp_credentials(role=DEFAULT_INSTANCE_ROLE):
    """ Retrieves temp AWS credentials; returns a dict, or None on failure """
    query_uri = '{}'.format(role)
    print('Querying AWS for credentials - {}'.format(query_uri))
    try:
        response = requests.get(query_uri, timeout=5)
        response.raise_for_status()  # turn 4xx/5xx answers into HTTPError
        sts_credentials = response.json()
        if isinstance(sts_credentials, dict) and \
                sts_credentials.get('Code') == 'Success':
            print('Successfully retrieved temp AWS credentials.')
            return sts_credentials
        msg = ('There was a problem when retrieving temp credentials '
               'from AWS. Here\'s the response: \n{}'.format(sts_credentials))
    except (HTTPError, ConnectionError, Timeout) as err:
        msg = 'Unable to query AWS for credentials: {}'.format(err)
    except ValueError as err:
        msg = 'Error: unable to decode json from \'None\' value - {} ' \
              '(hint: most likely the role you are using is wrong)'.format(err)
    except Exception as err:
        msg = 'Failed to get AWS role temp credentials: {}'.format(err)
    print(msg)
    return None


# to use this function, you might do something like the following:
role = 'my-emr-cluster-role'
sts_credentials = get_temp_credentials(role=role) or dict()
aws_access_key_id = sts_credentials.get('AccessKeyId')
aws_secret_access_key = sts_credentials.get('SecretAccessKey')
token = sts_credentials.get('Token')


Yes, essentially this piece of code is just making an HTTP request to an AWS service. In case you’re getting suspicious of the black-magic-looking address, let me tell you in advance that it belongs to the EC2 instance metadata service, a handy service that provides meta-information about your instances (including the temporary credentials of the role assigned to them).

As a side note, if you are indeed running PySpark jobs, you might want the flexibility to either test your code locally or actually run the job on an AWS cluster. Given that the previous snippet will ONLY work if you run it on an AWS EC2 instance AND that instance is assigned the correct IAM role, here is a simple function which either uses credentials passed in directly (when running locally) or uses the role to ask AWS for temporary credentials.

def get_redshift_credentials(role=DEFAULT_INSTANCE_ROLE,
                             local_aws_access_key_id=None,
                             local_aws_secret_access_key=None):
    """ Returns a Redshift credentials string, built from the temp AWS
    credentials present in an AWS instance, unless local keys are given
    Note: the temp-credentials path only works on AWS machines!
    :param role: (str) AWS instance role
    :param local_aws_access_key_id: (str) optional param for local testing/dev
    :param local_aws_secret_access_key: (str) optional param for local testing/dev
    :return: (str) temp credentials to be used in query
    """
    if not role:
        role = DEFAULT_INSTANCE_ROLE  # fall back to the default instance role
    sts_credentials = get_temp_credentials(role=role) or dict()
    aws_access_key_id = local_aws_access_key_id or sts_credentials.get('AccessKeyId')
    aws_secret_access_key = local_aws_secret_access_key or sts_credentials.get('SecretAccessKey')
    token = sts_credentials.get('Token')
    redshift_credentials = 'aws_access_key_id={0};aws_secret_access_key={1}' \
        .format(aws_access_key_id, aws_secret_access_key)
    # temporary keys are only valid together with their session token
    if token and not all((local_aws_access_key_id, local_aws_secret_access_key)):
        redshift_credentials = '{0};token={1}'.format(redshift_credentials, token)
    return redshift_credentials



Finally, a potential gotcha: by default, the temporary keys have a 30-minute limit, which you can extend when you initially request them. In any case, you might want to take that into consideration for jobs that can potentially be long-running.
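For long-running jobs, one way to handle this is to check how close the keys are to expiring before each Redshift read, and call get_temp_credentials again when needed. The sketch below assumes the credentials dict carries an Expiration timestamp in YYYY-MM-DDTHH:MM:SSZ form, as the instance metadata response does; the expires_soon helper, the 5-minute default margin and the sample values are all made up for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def expires_soon(sts_credentials: Dict[str, str],
                 margin: timedelta = timedelta(minutes=5),
                 now: Optional[datetime] = None) -> bool:
    """True if the credentials' 'Expiration' timestamp falls within `margin` of now."""
    expiration = datetime.strptime(sts_credentials["Expiration"], "%Y-%m-%dT%H:%M:%SZ")
    expiration = expiration.replace(tzinfo=timezone.utc)  # the trailing Z means UTC
    now = now or datetime.now(timezone.utc)
    return expiration - now <= margin


creds = {"AccessKeyId": "AKIAEXAMPLE", "Expiration": "2016-08-31T18:30:00Z"}  # made-up values
check_time = datetime(2016, 8, 31, 18, 0, tzinfo=timezone.utc)
print(expires_soon(creds, now=check_time))                                # False (30 min left)
print(expires_soon(creds, margin=timedelta(minutes=45), now=check_time))  # True
```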