Getting Started with Spark (part 3) – UDFs & Window functions

This post attempts to continue the previous introductory series “Getting started with Spark in Python” with the topics UDFs and Window Functions. Unfortunately it stayed marinating in my WordPress drafts for quite a while (it was already more or less 70% complete), and only now have I had the opportunity to finish it.

Note: For this post I’m using Spark 1.6.1. There are some minor differences in comparison to the upcoming Spark 2.0, such as using a SparkSession object to initialize the Spark context, instead of the HiveContext as I do here. Nonetheless, the important parts are common to both.


Now, up until Spark 1.6.2, the only way for you to enrich your SQL queries was to use a HiveContext instead of the Spark SQLContext.

With a HiveContext you get the same features as with a SQLContext, plus some additional advantages, such as the ability to use window functions and access to Hive UDFs, besides the ability to read data from Hive tables. For more detail, please refer here for a concise, well-explained answer on the differences between SQLContext and HiveContext.

OK, let us start by importing all required dependencies for this tutorial:

# python dependencies
import sys
from datetime import datetime as dt
# pyspark dependencies
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.dataframe import DataFrame

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

… and initialize the HiveContext:

conf = SparkConf().setMaster("local[4]").setAppName("window-demo") \
      .set("spark.driver.memory", "4g")
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc)

As a reminder, our (extremely) dummy dataset is comprised of the following (nonsensical) data:

# note: to simplify, not providing the schema, Spark Df api will infer it
customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27",  "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])


What if we wanted to answer a question such as: What is the cumulative sum of spending of each customer throughout time?
A way of thinking of a cumulative sum is as a recursive call where for every new period you sum the current value plus all the previously accumulated ones. One way to solve this is with Window Functions, a functionality added back in Spark 1.4. But first, let us add a column with the amount spent, using Spark User Defined Functions (UDFs) for that. These functions basically apply a given function to every row, on one or more columns.

# create the general function
def amount_spent(quantity, price):
    """
    Calculates the product between two variables
    :param quantity: (float/int)
    :param price: (float/int)
    :return: (float) amount spent
    """
    return quantity * price

# create the general UDF
amount_spent_udf = udf(amount_spent, DoubleType())
# Note: DoubleType in Java/Scala is equal to Python float; thus you can alternatively specify FloatType()

# Apply our UDF to the dataframe
customers02 = customers.withColumn('amount_spent', amount_spent_udf(customers['quantity'], customers['price'])).cache(), truncate=False)

|customer_name|date      |category|product_name|quantity|price|amount_spent|
|Geoffrey     |2016-04-22|A       |apples      |1       |50.0 |50.0        |
|Geoffrey     |2016-05-03|B       |Lamp        |2       |38.0 |76.0        |
|Geoffrey     |2016-05-03|D       |Solar Pannel|1       |29.0 |29.0        |
only showing top 3 rows
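As a side note, the same UDF could equally have been defined with an anonymous function; a minimal sketch, equivalent to the amount_spent function above:

amount_spent_udf = udf(lambda quantity, price: quantity * price, DoubleType())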

To compute a cumulative sum over time, we need to build a window object and specify how it should be partitioned (that is, which column determines the groups over which the aggregation is computed), and optionally the row interval that forms the window.

window_01 = Window.partitionBy("customer_name").orderBy("date", "category").rowsBetween(-sys.maxsize, 0)
# note: func is the alias we gave to pyspark.sql.functions, a Spark API with a suite of convenience functions
win_customers01 = customers02.withColumn("cumulative_sum", func.sum(customers02['amount_spent']).over(window_01)), truncate=False)

|customer_name|date      |category|product_name              |quantity|price|amount_spent|cumulative_sum|
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0       |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |76.0        |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |
|Yann         |2016-04-13|E       |Book: The noose           |5       |125.0|625.0       |1366.0        |
|Yann         |2016-04-22|B       |Lamp                      |1       |38.0 |38.0        |1404.0        |
|Yann         |2016-04-27|D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |
|Yann         |2016-05-01|Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |
|Yann         |2016-05-03|C       |Rice                      |15      |15.0 |225.0       |1842.0        |
|Yann         |2016-05-03|D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |
|Yann         |2016-05-03|Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |
only showing top 10 rows

The cumulative sum calculation is partitioned by each customer, in an interval from the beginning (-sys.maxsize effectively means starting at the very first row in that partition) until the current row (which, as the aggregation window slides, has an index of zero), and finally ordered by date and category, ascending (the default).
So just to be sure we’re perfectly clear:
cumulative_sum row zero = (amount_spent row zero);
cumulative_sum row one = (cumulative_sum row zero + amount_spent row one);
Also, as a side note: rather than defining a UDF, we could apply the sum function directly over the product of two columns (func.sum is, after all, also a UDF).

# Note: instead of defining a UDF, you can apply the aggregation directly over the product of two columns
window_01 = Window.partitionBy("customer_name").orderBy("date").rowsBetween(-sys.maxsize, 0)
win_customers01_B = customers.withColumn("cumulative_sum", func.sum(customers['price']*customers['quantity']).over(window_01)), truncate=False)

|customer_name|date      |category|product_name              |quantity|price|cumulative_sum|
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|741.0         |
only showing top 3 rows

If the rowsBetween() method still smells a bit funky, no worries, it will become clearer in the next example.

What if we want to understand how much customers spend on average overall/in total (not grouped by product) throughout time? In other words, how does the average spending vary across a given time periodicity – aka a moving average?

window_02 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-3, 0)
win_customers02 = win_customers01.withColumn("movingAvg", func.avg(customers02['amount_spent']).over(window_02))

|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|        movingAvg|
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|            165.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|            120.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|            247.0|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|            341.5|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|           309.75|
|         Yann|2016-04-27|       D|        Solar Pannel|       5| 29.0|       145.0|        1549.0|            327.0|
|         Yann|2016-05-01|       Y|         Motor skate|       1| 68.0|        68.0|        1617.0|            219.0|
|         Yann|2016-05-03|       C|                Rice|      15| 15.0|       225.0|        1842.0|            119.0|
|         Yann|2016-05-03|       D|         Recycle bin|       5| 27.0|       135.0|        1977.0|           143.25|
|         Yann|2016-05-03|       Y|         Motor skate|       1| 68.0|        68.0|        2045.0|            124.0|
|         Yann|2016-05-27|       A|             bananas|       3| 55.0|       165.0|        2210.0|           148.25|
|         Yann|2016-05-27|       D|         Recycle bin|       5| 27.0|       135.0|        2345.0|           125.75|
|         Yann|2016-06-07|       Z|          space ship|       1|227.0|       227.0|        2572.0|           148.75|
|       Yoshua|2016-02-07|       Z|          space ship|       2|227.0|       454.0|         454.0|            454.0|
|       Yoshua|2016-02-14|       A|             bananas|       9| 55.0|       495.0|         949.0|            474.5|
|       Yoshua|2016-02-14|       A|              apples|      10| 55.0|       550.0|        1499.0|499.6666666666667|
|       Yoshua|2016-02-14|       B|                Lamp|       2| 38.0|        76.0|        1575.0|           393.75|
|       Yoshua|2016-03-07|       Z|          space ship|       5|227.0|      1135.0|        2710.0|            564.0|
|       Yoshua|2016-04-07|       A|             bananas|       9| 55.0|       495.0|        3205.0|            564.0|
|       Yoshua|2016-04-07|       C|                Rice|       5| 15.0|        75.0|        3280.0|           445.25|
only showing top 20 rows

Before explaining how this works, let us revisit the rowsBetween() method. Note that here we specified an interval of at most 3 rows behind the current one (plus the current row). Alternatively, we could specify, for example, an interval of two rows behind and two rows ahead:

window_03 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-2, 2)
win_customers03 = win_customers01.withColumn("movingAvg", func.avg(customers02['amount_spent']).over(window_03))

|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|movingAvg|
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|    247.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|    341.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|    280.8|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|    276.8|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|    275.2|
only showing top 5 rows

The first row (247.0) is simply the sum of the current value and the next two, divided by the number of rows in the window (three, since there are no preceding rows for the first row):
(165.0 + 76.0 + 500.0)/3 = 247.0
The second row already has one preceding row available, so it averages four values: (165.0 + 76.0 + 500.0 + 625.0)/4 = 341.5.

Simple, right?

Going back to how the computation is partitioned: the way we structured this is to compute a moving average per customer, iterating over each purchase event.
However, let’s say we want to know how customer spending varies on average on a daily/weekly/monthly basis. For that, let’s extract those periods from our date column.
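For reference, the schema printed below can be obtained with printSchema(); a minimal sketch:
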

 |-- customer_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)
 |-- amount_spent: double (nullable = true)
 |-- cumulative_sum: double (nullable = true)

Spark automatically inferred the type of our date column as String (as we did not specify the schema when we created the Dataframe). Let’s use a UDF to cast it to a date type (using an anonymous function – a lambda):

from datetime import datetime as dt
# create the general UDF
string_to_datetime = udf(lambda x: dt.strptime(x, '%Y-%m-%d'), DateType())

# Create a new column called datetime, and drop the date column
win_customers01_B = win_customers01.withColumn('datetime', string_to_datetime( win_customers01['date'])).drop('date')
# Add month and Week columns
win_customers01_C = win_customers01_B.withColumn('year', func.year( win_customers01_B['datetime'] )) \
.withColumn('month', func.month( win_customers01_B['datetime'] )) \
.withColumn('week', func.weekofyear( win_customers01_B['datetime'] )), truncate=False)

|customer_name|category|product_name              |quantity|price|amount_spent|cumulative_sum|datetime  |year|month|week|
|Yann         |A       |bananas                   |3       |55.0 |165.0       |165.0         |2016-04-02|2016|4    |13  |
|Yann         |B       |Lamp                      |2       |38.0 |76.0        |241.0         |2016-04-02|2016|4    |13  |
|Yann         |E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |2016-04-03|2016|4    |13  |
|Yann         |E       |Book: The noose           |5       |125.0|625.0       |1366.0        |2016-04-13|2016|4    |15  |
|Yann         |B       |Lamp                      |1       |38.0 |38.0        |1404.0        |2016-04-22|2016|4    |16  |
|Yann         |D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |2016-04-27|2016|4    |17  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |2016-05-01|2016|5    |17  |
|Yann         |C       |Rice                      |15      |15.0 |225.0       |1842.0        |2016-05-03|2016|5    |18  |
|Yann         |D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |2016-05-03|2016|5    |18  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |2016-05-03|2016|5    |18  |
only showing top 10 rows

Let us group customer spending by day:

customer_grp_by_day = win_customers01_C.groupBy('customer_name', 'datetime', 'year') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'datetime')

|customer_name|  datetime|year|amount_spent|
|     Geoffrey|2016-04-22|2016|        50.0|
|     Geoffrey|2016-05-03|2016|       330.0|
|     Geoffrey|2016-06-05|2016|       525.0|
|     Geoffrey|2016-06-15|2016|       601.0|
|       Jurgen|2016-05-01|2016|       502.0|
|       Jurgen|2016-05-08|2016|       343.0|
|       Jurgen|2016-06-05|2016|       621.0|
|         Yann|2016-04-02|2016|       241.0|
|         Yann|2016-04-03|2016|       500.0|
|         Yann|2016-04-13|2016|       625.0|
|         Yann|2016-04-22|2016|        38.0|
|         Yann|2016-04-27|2016|       145.0|
|         Yann|2016-05-01|2016|        68.0|
|         Yann|2016-05-03|2016|       428.0|
|         Yann|2016-05-27|2016|       300.0|
|         Yann|2016-06-07|2016|       227.0|
|       Yoshua|2016-02-07|2016|       454.0|
|       Yoshua|2016-02-14|2016|      1121.0|
|       Yoshua|2016-03-07|2016|      1135.0|
|       Yoshua|2016-04-07|2016|       977.0|

Next, let’s check, per customer visit (assuming each customer does not visit the store more than once a day), how the customer’s spending progresses, with a moving average over the current and up to 7 previous visits:

window_04 = Window.partitionBy("customer_name").orderBy("customer_name", "datetime").rowsBetween(-7, 0)
win_customers04 = customer_grp_by_day.withColumn("movingAvg", func.avg(customer_grp_by_day['amount_spent']).over(window_04))
|customer_name|  datetime|year|amount_spent|         movingAvg|
|         Yann|2016-04-02|2016|       241.0|             241.0|
|         Yann|2016-04-03|2016|       500.0|             370.5|
|         Yann|2016-04-13|2016|       625.0| 455.3333333333333|
|         Yann|2016-04-22|2016|        38.0|             351.0|
|         Yann|2016-04-27|2016|       145.0|             309.8|
|         Yann|2016-05-01|2016|        68.0|             269.5|
|         Yann|2016-05-03|2016|       428.0|292.14285714285717|
|         Yann|2016-05-27|2016|       300.0|           293.125|
|         Yann|2016-06-07|2016|       227.0|           291.375|
|       Yoshua|2016-02-07|2016|       454.0|             454.0|
|       Yoshua|2016-02-14|2016|      1121.0|             787.5|
|       Yoshua|2016-03-07|2016|      1135.0| 903.3333333333334|
|       Yoshua|2016-04-07|2016|       977.0|            921.75|
|     Geoffrey|2016-04-22|2016|        50.0|              50.0|
|     Geoffrey|2016-05-03|2016|       330.0|             190.0|
|     Geoffrey|2016-06-05|2016|       525.0| 301.6666666666667|
|     Geoffrey|2016-06-15|2016|       601.0|             376.5|
|       Jurgen|2016-05-01|2016|       502.0|             502.0|
|       Jurgen|2016-05-08|2016|       343.0|             422.5|
|       Jurgen|2016-06-05|2016|       621.0| 488.6666666666667|

Let us group customers by weekly spending:

customer_grp_by_week = win_customers01_C.groupBy('customer_name', 'year', 'week') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'week')
|customer_name|year|week|amount_spent|
|     Geoffrey|2016|  16|        50.0|
|     Geoffrey|2016|  18|       330.0|
|     Geoffrey|2016|  22|       525.0|
|     Geoffrey|2016|  24|       601.0|
|       Jurgen|2016|  17|       502.0|
|       Jurgen|2016|  18|       343.0|
|       Jurgen|2016|  22|       621.0|
|         Yann|2016|  13|       741.0|
|         Yann|2016|  15|       625.0|
|         Yann|2016|  16|        38.0|
|         Yann|2016|  17|       213.0|
|         Yann|2016|  18|       428.0|
|         Yann|2016|  21|       300.0|
|         Yann|2016|  23|       227.0|
|       Yoshua|2016|   5|       454.0|
|       Yoshua|2016|   6|      1121.0|
|       Yoshua|2016|  10|      1135.0|
|       Yoshua|2016|  14|       977.0|

And computing the weekly moving average:

window_05 = Window.partitionBy('customer_name').orderBy('customer_name', 'week', 'year').rowsBetween(-4, 0)
win_customers05 = customer_grp_by_week.withColumn("movingAvg", func.avg(customer_grp_by_week['amount_spent']).over(window_05))
|customer_name|year|week|amount_spent|        movingAvg|
|         Yann|2016|  13|       741.0|            741.0|
|         Yann|2016|  15|       625.0|            683.0|
|         Yann|2016|  16|        38.0|            468.0|
|         Yann|2016|  17|       213.0|           404.25|
|         Yann|2016|  18|       428.0|            409.0|
|         Yann|2016|  21|       300.0|            320.8|
|         Yann|2016|  23|       227.0|            241.2|
|       Yoshua|2016|   5|       454.0|            454.0|
|       Yoshua|2016|   6|      1121.0|            787.5|
|       Yoshua|2016|  10|      1135.0|903.3333333333334|
|       Yoshua|2016|  14|       977.0|           921.75|
|     Geoffrey|2016|  16|        50.0|             50.0|
|     Geoffrey|2016|  18|       330.0|            190.0|
|     Geoffrey|2016|  22|       525.0|301.6666666666667|
|     Geoffrey|2016|  24|       601.0|            376.5|
|       Jurgen|2016|  17|       502.0|            502.0|
|       Jurgen|2016|  18|       343.0|            422.5|
|       Jurgen|2016|  22|       621.0|488.6666666666667|

Finally, let us move to monthly grouping and calculations.

customer_grp_by_month = win_customers01_C.groupBy('customer_name', 'year', 'month')\
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'month')
|customer_name|year|month|amount_spent|
|     Geoffrey|2016|    4|        50.0|
|     Geoffrey|2016|    5|       330.0|
|     Geoffrey|2016|    6|      1126.0|
|       Jurgen|2016|    5|       845.0|
|       Jurgen|2016|    6|       621.0|
|         Yann|2016|    4|      1549.0|
|         Yann|2016|    5|       796.0|
|         Yann|2016|    6|       227.0|
|       Yoshua|2016|    2|      1575.0|
|       Yoshua|2016|    3|      1135.0|
|       Yoshua|2016|    4|       977.0|


# This shows how the customer's spending progresses across months, with a moving average over the current and up to 3 previous months

window_06 = Window.partitionBy('customer_name').orderBy('customer_name', 'month', 'year').rowsBetween(-3, 0)
win_customers06 = customer_grp_by_month.withColumn("movingAvg", func.avg(customer_grp_by_month['amount_spent']).over(window_06))
|customer_name|year|month|amount_spent|        movingAvg|
|         Yann|2016|    4|      1549.0|           1549.0|
|         Yann|2016|    5|       796.0|           1172.5|
|         Yann|2016|    6|       227.0|857.3333333333334|
|       Yoshua|2016|    2|      1575.0|           1575.0|
|       Yoshua|2016|    3|      1135.0|           1355.0|
|       Yoshua|2016|    4|       977.0|           1229.0|
|     Geoffrey|2016|    4|        50.0|             50.0|
|     Geoffrey|2016|    5|       330.0|            190.0|
|     Geoffrey|2016|    6|      1126.0|            502.0|
|       Jurgen|2016|    5|       845.0|            845.0|
|       Jurgen|2016|    6|       621.0|            733.0|

As usual, I suggest further checking these sources:



Spark 2.0: From quasi to proper-streaming?

This post attempts to follow the relatively recent Spark release – Spark 2.0 – and understand the differences regarding streaming applications. Why streaming in particular, you may ask? Well, streaming is the ideal scenario most companies would like to use, and the competition landscape is definitely heating up, especially with Apache Flink and Google’s Apache Beam.

Why is streaming so difficult?

There are three main problems when it comes to building real time applications based on streaming data:

  • Data consistency: due to the distributed nature of the architecture, it is possible that at any given point in time some events have been processed in some nodes but not in others, even though those events might actually have occurred before others. For example, you may have a Logout event without a Login event, a close-app event without an open-app event, etc.
  • Fault tolerance (FT): related to the first problem, on node failure processing engines may try to reprocess an event that had actually already been processed by the failed node, leading to duplicated records and/or inaccurate metrics.
  • Dealing with late/out-of-order data: whether because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync data hours later, developers simply must tackle this challenge in application code.

See this post for an excellent detailed explanation.

Rewind to Spark – 1.6.2

Unlike Apache Flink, where batch jobs are the exception and streaming is the default, Apache Spark uses the exact opposite logic: streaming is the exception to batch. To make this clearer, the way Spark Streaming works is by dividing live data streams into batches (sometimes called micro-batches). The continuous streams are represented as a high-level abstraction called a discretized stream, or DStream, which internally is represented as a sequence of RDDs.

One thing RDDs and DStreams have in common is that DStreams are executed lazily by output operations – the equivalent of actions on RDDs. For example, in the classical word count example, the print method would be the output operation.

Nonetheless, even though the main abstraction to work with was DStreams on top of RDDs, it was already possible to work with Dataframes (and thus SQL) on streaming data before.


Changes in Spark 2.0 – introducing “Structured Streaming”

So the new kid on the block is Structured Streaming, a feature still in alpha in Spark 2.0. Its marketing slogan: “Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming”. In other words, this is an attempt to simplify the life of all developers struggling with the three main general problems in streaming, while bringing a streaming API to DataFrames and Datasets.

Note that the streaming API has now been unified with the batch API, which means that you should expect (almost always) to use exactly the same method calls you would use in the normal batch API. Of course, some minor exceptions, such as reading from and writing to files, require an extra method call. Here is a copy-paste of the examples shown at Spark Summit in the “A deep dive into structured streaming” presentation (highly recommended).

Syntax for batch mode:

# note: 'spark' here stands for the Spark 2.0 SparkSession entry point
input ="json").load("source-path")
result ="device", "signal").where("signal > 15")
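The talk pairs this with a write step, roughly along these lines (the parquet sink and dest-path are just illustrative assumptions):

result.write.format("parquet").save("dest-path")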

Now for structured streaming:

input ="json").stream("source-path")
result ="device", "signal").where("signal > 15")
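And the corresponding streaming write, per the same talk, would look roughly like:

result.write.format("parquet").startStream("dest-path")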

In case you didn’t notice the difference (which, in this case, is a good thing), what changed is that instead of load()-ing a file once, you continuously stream() it, and the same logic applies to writing, with startStream(). Pretty cool, right?

Also note that the base premise of Spark streaming discussed before still holds; that is, it is still micro-batching under the hood.

Repeated Queries on input table

So, besides unifying the API, where’s the magic sauce? The main concept is a new abstraction where streaming data is viewed much like a table in a relational DB. In other words, new incoming data is treated as a new row appended to an unbounded input table.

The developer has three tasks in the middle of this whole streaming party.

  1. One is to define a query that acts repeatedly on this input table and produces a new table, called the result table, which will be written to an output sink. This query will be analyzed under the hood by Spark (as explained below): the query planner turns it into an execution plan, and Spark will figure out on its own what needs to be maintained in the input table to update the result table.
  2. The second is to specify the triggers that control when to update the results.
  3. Define an output mode, so that Spark knows how it should write to an external system (such as S3, HDFS or a database). Three alternatives: append (only new rows of the result table will be written), complete (the full result table will be flushed), and update (only rows that were updated since the last trigger will be changed).

Structured Streaming under the hood

Under the hood (and when I say “under the hood” I mean what is done for you that you don’t need to implement in your code – but yes, you should nonetheless care about it), structured streaming is possible thanks to some considerable changes. The first is that the query planner (the same one used for batch mode) generates a continuous series of incremental execution plans, one for each processing of the next chunk of streaming data (for example, when polling from Kafka this would mean the next range of offsets).

Now, this can get particularly tricky because each execution plan needs to take into consideration the previous aggregations held by previous executions. So the state of these continuous aggregations is maintained internally in memory, backed by a Write Ahead Log (WAL) in the file system (for fault tolerance). So yes, the query planner is fault-tolerant, storing the offsets of each execution in a distributed file system such as HDFS.

Moreover, on the FT side, the sinks are also expected to be idempotent, which means they should handle re-executions so as to avoid double-committing the output.

Finally, before you get all excited, I do want to emphasize that at the moment, this is still in alpha mode, and is explicitly not recommended for production environments.

Not enough? Bring on more resources:


Spark – Redshift: AWS Roles to the rescue

If you are using AWS to host your applications, you have probably heard that you can apply IAM Roles to EC2 instances as well. In a lot of cases this can be a really cool way to avoid passing AWS credentials to your applications, and to spare yourself the pain of managing key distribution among servers, as well as ensuring key-rotation mechanisms for security purposes.

This post is about a simple trick on how to take advantage of this feature when your Spark job needs to interact with AWS Redshift.

As can be read in the Databricks repo for the spark-redshift library, there are three (3) strategies for setting up AWS credentials: setting them in the Hadoop configuration (what many people are used to so far with Cloudera or Hortonworks), encoding the keys in the tempdir URI (by far not the best option, if you ask me), or using temporary keys. The last strategy is the one discussed here, and it is based on AWS’s own documentation on how to use temporary keys.
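For reference, the first strategy (Hadoop configuration) in PySpark typically looks something like the sketch below; the s3n property names are an assumption and depend on which S3 filesystem you use:

# a minimal sketch of the Hadoop-configuration strategy (not the approach used in this post)
sc._jsc.hadoopConfiguration().set('fs.s3n.awsAccessKeyId', 'YOUR_ACCESS_KEY_ID')
sc._jsc.hadoopConfiguration().set('fs.s3n.awsSecretAccessKey', 'YOUR_SECRET_ACCESS_KEY')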

So let us start with the Spark-Redshift read code.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName(opts.get('app_name'))
sc = SparkContext(conf=conf)
ssc = SQLContext(sc) 

rs_query = "select * from my_table limit 10"
rs_tmp_dir = 's3n://path/for/temp/data'
rs_url = 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'

# Create spark dataframe through a Redshift query
df = \
 .format('com.databricks.spark.redshift') \
 .option('url', rs_url) \
 .option('query', rs_query) \
 .option('tempdir', rs_tmp_dir) \
 .option('temporary_aws_access_key_id', sts_credentials.get('AccessKeyId')) \
 .option('temporary_aws_secret_access_key', sts_credentials.get('SecretAccessKey')) \
 .option('temporary_aws_session_token', sts_credentials.get('Token')) \
 .load()


OK, until here it is almost only Spark-related code in Python. Now the only thing you might be wondering is: what the hell is that “sts_credentials” map? Well spotted. The following snippet will reveal it.

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

# Optionally define a default ec2 instance role for EMR instances
DEFAULT_INSTANCE_ROLE = 'my-default-emr-instance-role'  # placeholder: set this to your own instance role name

def get_temp_credentials(role=DEFAULT_INSTANCE_ROLE):
    """ Retrieves temp AWS credentials from the EC2 instance metadata service """
    query_uri = '{}'.format(role)
    print('Querying AWS for credentials - {}'.format(query_uri))
    try:
        sts_credentials = requests.get(query_uri).json()
        if isinstance(sts_credentials, dict) and \
                sts_credentials.get('Code') == 'Success':
            print('Successfully retrieved temp AWS credentials.')
            return sts_credentials
        print('There was a problem when retrieving temp credentials '
              'from AWS. Here\'s the response: \n{}'.format(sts_credentials))
    except (HTTPError, ConnectionError, Timeout) as err:
        msg = 'Unable to query AWS for credentials: {}'.format(err)
        print(msg)
    except ValueError as err:
        msg = 'Error: unable to decode json from \'None\' value - {} ' \
              '(hint: most likely the role you are using is wrong)'.format(err)
        print(msg)
    except Exception as err:
        msg = 'Failed to get AWS role temp credentials: {}'.format(err)
        print(msg)

# to use this function, you might do something like the following:
role = 'my-emr-cluster-role'
sts_credentials = get_temp_credentials(role=role)
aws_access_key_id = sts_credentials.get('AccessKeyId')
aws_secret_access_key = sts_credentials.get('SecretAccessKey')
token = sts_credentials.get('Token')


Yes, essentially this piece of code is just doing an HTTP request to an AWS service. In case you’re getting suspicious of the black-magic-looking address – “” – let me tell you in advance that this is a handy service that provides meta-information about your instances.

As a side note, if you are indeed using pyspark jobs, you might want some flexibility depending on whether you are testing your code locally or actually running the job in an AWS cluster. So, given the fact that the previous snippet will ONLY work if you run it on an AWS EC2 instance AND that instance is assigned the correct IAM role, here is a simple function which either passes credentials directly (if running locally) or uses the role to ask AWS for temporary credentials.

def get_redshift_credentials(role=DEFAULT_INSTANCE_ROLE,
                             local_aws_access_key_id=None,
                             local_aws_secret_access_key=None):
    """ Returns temp AWS credentials present in an AWS instance
    Note: only works on AWS machines!
    :param role: (str) AWS instance role
    :param local_aws_access_key_id: (str) optional param for local testing/dev
    :param local_aws_secret_access_key: (str) optional param for local testing/dev
    :return: (str) temp credentials to be used in query
    """
    if not role:
        role = DEFAULT_INSTANCE_ROLE
    sts_credentials = get_temp_credentials(role=role) or dict()
    aws_access_key_id = local_aws_access_key_id or sts_credentials.get('AccessKeyId')
    aws_secret_access_key = local_aws_secret_access_key or sts_credentials.get('SecretAccessKey')
    token = sts_credentials.get('Token')
    redshift_credentials = 'aws_access_key_id={0};aws_secret_access_key={1}' \
        .format(aws_access_key_id, aws_secret_access_key)
    if token and not all((local_aws_access_key_id, local_aws_secret_access_key)):
        redshift_credentials = '{0};token={1}'.format(redshift_credentials, token)
    return redshift_credentials
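To make the local-versus-cluster switch concrete, usage might look something like this (the key values below are placeholders):

# on an EMR/EC2 instance with the right IAM role attached
redshift_credentials = get_redshift_credentials(role='my-emr-cluster-role')

# on your local machine, passing keys explicitly instead
redshift_credentials = get_redshift_credentials(local_aws_access_key_id='YOUR_ACCESS_KEY_ID',
                                                local_aws_secret_access_key='YOUR_SECRET_ACCESS_KEY')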


Here is a complete gist of this code:

Finally, some potential gotchas: the temporary keys have by default a 30-minute lifetime, which you can extend initially when you request them. In any case, you might want to take that into consideration for jobs that can potentially be long-running.
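If you request the temporary keys yourself through STS (instead of relying on the instance metadata service), that duration is set at request time; a minimal sketch with boto3, where the role ARN and session name are placeholders, could look like:

import boto3

sts = boto3.client('sts')
response = sts.assume_role(RoleArn='arn:aws:iam::123456789012:role/my-redshift-role',
                           RoleSessionName='spark-redshift-job',
                           DurationSeconds=3600)  # lifetime of the temp keys, in seconds
temp_credentials = response['Credentials']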




Getting started with Spark (part 2) – SparkSQL

Update: this tutorial has been updated mainly up to Spark 1.6.2 (with a minor detail regarding Spark 2.0), which is not the most recent version of Spark at the moment of updating this post. Nonetheless, for the operations exemplified you can pretty much rest assured that the API has not changed substantially. I will do my best to update and point out these differences when they occur.

SparkSQL has become one of the core modules of the Spark ecosystem (with its API already available for all currently supported languages, namely Scala, Java, Python, and R), and is intended to allow you to structure data into named columns (instead of working with raw, schema-less data as with RDDs), which can be queried with SQL syntax, taking advantage of a distributed query engine running in the background.

In practical terms, you can very much think of it as an attempt to emulate the APIs of popular dataframes in R or Python pandas, as well as SQL tables. However, if I do my job correctly, I will succeed in showing you that they are not quite the same (and hopefully more).

In order to use SparkSQL you need to import SQLContext. Let’s start with how you would do it in Python:

# import required libs
from pyspark import SparkConf, SparkContext
# here is the import that allows you to use dataframes
from pyspark.sql import SQLContext

# initialize context in Spark 1.6.2
conf = SparkConf().setMaster('local[2]').setAppName('SparkSQL-demo')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

Note: in the new Spark 2.0, this would indeed vary as such:

# import required libs
from pyspark.sql import SparkSession
# initialize context in Spark 2.0
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\
    <span class="o">.</span><span class="n">builder</span>\
    <span class="o">.</span><span class="n">appName</span><span class="p">(</span>'SparkSQL-demo')<span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>

Good news, Scala does not vary much:

//import required libs
import org.apache.spark.{SparkConf, SparkContext}
//here is the import that allows you to use dataframes
import org.apache.spark.sql.SQLContext

// initialize context in Spark 1.6.2
val conf = new SparkConf().setMaster("local[2]").setAppName("SparkSQL-demo")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)


A dataframe can be created from an existing RDD, as well as – similarly to RDDs – from a variety of different sources such as text files, HDFS, S3, Hive tables, external databases, etc. Here’s an example of how to build a dataframe by reading from S3 (in Python):

df ='s3n://my-bucket/example/file.json')


Note: we use the “s3n” prefix, which is valid for a series of AWS regions, but not for all of them. For example, to access S3 buckets in the Frankfurt region you would rather use “s3a” as a prefix.

What sets Spark Dataframes apart

However, even though the goal is to hide as many disparities as possible, there are nonetheless big conceptual differences between a Spark Dataframe and a Pandas dataframe. The first is that immutability has (thank God) not been removed. In other words, when you apply an operation to a Dataframe, you need to save the result into a new variable. I’ve seen a lot of data scientists stumble on this in their first Spark days, so let us illustrate this with a very simple example:

# Create a Dataframe with pseudo data:
# Note that we create an RDD, and then cast it to
# a Dataframe by providing a schema
column_names = ["customer_name", "date", "category",
"product_name", "quantity", "price"]
df01 = sc.parallelize(
[("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00)]).toDF(column_names)

df02 ="customer_name", "date", "category", "product_name", "price", (df01['quantity']+1).alias("quantity"))

# df01 remains unchanged:

|customer_name|      date|category|product_name|quantity|price|
|     Geoffrey|2016-04-22|       A|      apples|       1| 50.0|
|         Yann|2016-04-22|       B|        Lamp|       1| 38.0|

# .. where df02 is, as expected

|customer_name|      date|category|product_name|price|quantity|
|     Geoffrey|2016-04-22|       A|      apples| 50.0|       2|
|         Yann|2016-04-22|       B|        Lamp| 38.0|       2|

The second most obvious difference (compared to a pandas df) is the distributed nature of a Spark Dataframe, just as with RDDs. As a matter of fact, in previous versions of Spark, Dataframes were even called SchemaRDD. So internally the same resilience principles of RDDs apply, and you should in fact still keep in mind the Driver – Executor programming model, as well as the concepts of Actions and Transformations.

Also important to note is that Spark Dataframes use a query optimizer internally, which is one of the reasons why the performance difference between Spark code written natively in Scala versus Python or Java drops significantly, as can be seen in the next illustration provided by Databricks:




Let us view some useful operations one can perform with Spark dataframes. For the sake of brevity I will continue this tutorial mainly using the Python API, as the syntax does not really vary that much.

Let us create a bigger fake (and completely nonsensical) dataset to make operations more interesting:

customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27", "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])


The dataset is minimal for the example, so it really does not make a difference to cache it in memory. But my point is to illustrate that, similarly to RDDs, you should also care about memory persistence, and can indeed use a similar API. Moreover, since next we are going to use several distinct operations of type “Action” (yielding results back to the driver), it is good practice to signal that this dataframe should be kept in memory for further operations:

print(customers.is_cached)
# False
print(customers.is_cached)
# True

OK, so yes, there are some exceptions, and in this case, as you can see, the cache() method does not obey the laws of immutability (it changes the persistence state of the very same dataframe). Moving on, let’s take a peek at it:


|customer_name|      date|category|   product_name|quantity|price|
|     Geoffrey|2016-04-22|       A|         apples|       1| 50.0|
|     Geoffrey|2016-05-03|       B|           Lamp|       2| 38.0|
|     Geoffrey|2016-05-03|       D|   Solar Pannel|       1| 29.0|
|     Geoffrey|2016-05-03|       A|         apples|       3| 50.0|
|     Geoffrey|2016-05-03|       C|           Rice|       5| 15.0|
|     Geoffrey|2016-06-05|       A|         apples|       5| 50.0|
|     Geoffrey|2016-06-05|       A|        bananas|       5| 55.0|
|     Geoffrey|2016-06-15|       Y|    Motor skate|       7| 68.0|
|     Geoffrey|2016-06-15|       E|Book: The noose|       1|125.0|
|         Yann|2016-04-22|       B|           Lamp|       1| 38.0|
only showing top 10 rows


As previously mentioned, one advantage over RDDs is the ability to use standard SQL syntax to slice and dice a Spark Dataframe. Let’s check, for example, all the distinct products that we have:

spark_sql_table = 'customers'
# first register the dataframe as a temporary table, so it can be referenced in SQL queries
customers.registerTempTable(spark_sql_table)
sql_qry = "select distinct(product_name) from {table}".format(table=spark_sql_table)
distinct_product_names_df01 = sqlContext.sql(sql_qry), truncate=False)

|product_name              |
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |


Similarly to how we work with RDDs, with Spark DataFrames you are not restricted to SQL; you can also call methods on dataframes, which yields a new dataframe (remember the immutability story, right?).

Let’s repeat the previous example, this time getting all distinct product names through dataframe methods:

distinct_product_names_df02 ='product_name').distinct(), truncate=False)

|product_name              |
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |


Note that you can also work on a dataframe much like you iterate over an RDD, which causes the dataframe to be converted to an RDD in the background:

distinct_product_names_rdd = row: row[3])
# calling will yield an exception
    except Exception as err:
    print('Error: {}'.format(err))
    print('BUT calling .take() this works, as it is an RDD now: {}' \
        .format(distinct_product_names_rdd.take(3)))

distinct_product_names = distinct_product_names_rdd.distinct().collect()
print('Distinct product names are: {}'.format(distinct_product_names))
print('\n... And, as expected, they are {} in total.' \
    .format(len(distinct_product_names)))

Error: 'PipelinedRDD' object has no attribute 'show'
BUT calling .take() this works, as it is an RDD now: ['apples', 'Lamp', 'Solar Pannel']
Distinct product names are: ['Rice', 'bananas', 'space ship', 'Book: The noose', 'Solar Pannel', 'Recycle bin', 'apples', 'Lamp', 'Book: Crime and Punishment', 'Motor skate']

... And, as expected, they are 10 in total.


Similar to R or Python pandas, you can get summary statistics. Let’s get statistics for the category, quantity and price columns:

customers.describe('category', 'quantity', 'price').show()
|summary|category|          quantity|             price|
|  count|      39|                39|                39|
|   mean|    null| 4.153846153846154| 68.94871794871794|
| stddev|    null|2.9694803863945936|59.759583559965165|
|    min|       A|                 1|              15.0|
|    max|       Z|                15|             227.0|


Being a categorical variable, the summary regarding the ‘category’ column does not, as you can see, make much sense. We’ll work on this better later, but for now let’s do something simple and try to look into product_name and category.

Let’s suppose we are interested in finding out how purchasing behavior varies among categories, namely what the average frequency of purchases per category is.

cat_freq_per_cust = customers.stat.crosstab('customer_name', 'category'), truncate=False)

|customer_name_category|  Y|  Z|  A|  B|  C|  D|  E|
|              Geoffrey|  1|  0|  4|  1|  1|  1|  1|
|                Yoshua|  1|  2|  3|  1|  1|  1|  0|
|                  Yann|  2|  1|  2|  2|  1|  3|  2|
|                Jurgen|  2|  1|  3|  0|  1|  1|  0|


cols = cat_freq_per_cust.columns
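The summary table below was then presumably produced along these lines (a sketch; cols[1:] skips the non-numeric customer_name_category column):

cat_freq_per_cust.describe(*cols[1:]), truncate=False)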

|summary|Y                 |Z                |A                |B                |C  |D                 |E                 |
|count  |4                 |4                |4                |4                |4  |4                 |4                 |
|mean   |1.5               |1.0              |3.0              |1.0              |1.0|1.5               |0.75              |
|stddev |0.5773502691896257|0.816496580927726|0.816496580927726|0.816496580927726|0.0|0.9999999999999999|0.9574271077563381|
|min    |1                 |0                |2                |0                |1  |1                 |0                 |
|max    |2                 |2                |4                |2                |1  |3                 |2                 |


Note: the “count” row does not have much meaning here; it simply counts the number of rows blindly and, as expected, since rows without a given value are filled with zero instead of null, it counts them too.

Let’s check the same for products.

# Let's view products vertically, for formatting reasons
customers.stat.crosstab('product_name', 'customer_name')\
, truncate=False)
|product_name_customer_name|Geoffrey|Jurgen|Yann|Yoshua|
|Book: Crime and Punishment|0       |0     |1   |0     |
|Recycle bin               |0       |1     |2   |1     |
|Solar Pannel              |1       |0     |1   |0     |
|space ship                |0       |1     |1   |2     |
|Motor skate               |1       |2     |2   |1     |
|Rice                      |1       |1     |1   |1     |
|apples                    |3       |0     |0   |1     |
|Lamp                      |1       |0     |2   |1     |
|bananas                   |1       |3     |2   |2     |
|Book: The noose           |1       |0     |1   |0     |


freq_products = customers.stat.crosstab('customer_name', 'product_name')
freq_prod_cols = freq_products.columns
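Again, the describe output that follows was presumably generated with something along these lines (a sketch):

freq_products.describe(*freq_prod_cols[1:]), truncate=False)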

|summary|          bananas|       space ship|   Book: The noose|Rice|       Motor skate|             Lamp|            apples|Book: Crime and Punishment|      Recycle bin|      Solar Pannel|
|  count|                4|                4|                 4|   4|                 4|                4|                 4|                         4|                4|                 4|
|   mean|              2.0|              1.0|               0.5| 1.0|               1.5|              1.0|               1.0|                      0.25|              1.0|               0.5|
| stddev|0.816496580927726|0.816496580927726|0.5773502691896257| 0.0|0.5773502691896257|0.816496580927726|1.4142135623730951|                       0.5|0.816496580927726|0.5773502691896257|
|    min|                1|                0|                 0|   1|                 1|                0|                 0|                         0|                0|                 0|
|    max|                3|                2|                 1|   1|                 2|                2|                 3|                         1|                2|                 1|

Alternatively, we could have answered this using a more SQL-like approach:

cust_prod = customers.groupBy('customer_name', 'product_name') \
 .agg(func.count('product_name')) \
 .orderBy('product_name', 'customer_name')
, truncate=False)

|customer_name|product_name              |count(product_name)|
|Yann         |Book: Crime and Punishment|1                  |
|Geoffrey     |Book: The noose           |1                  |
|Yann         |Book: The noose           |1                  |
|Geoffrey     |Lamp                      |1                  |
|Yann         |Lamp                      |2                  |
|Yoshua       |Lamp                      |1                  |
|Geoffrey     |Motor skate               |1                  |
|Jurgen       |Motor skate               |2                  |
|Yann         |Motor skate               |2                  |
|Yoshua       |Motor skate               |1                  |
|Jurgen       |Recycle bin               |1                  |
|Yann         |Recycle bin               |2                  |
|Yoshua       |Recycle bin               |1                  |
|Geoffrey     |Rice                      |1                  |
|Jurgen       |Rice                      |1                  |
|Yann         |Rice                      |1                  |
|Yoshua       |Rice                      |1                  |
|Geoffrey     |Solar Pannel              |1                  |
|Yann         |Solar Pannel              |1                  |
|Geoffrey     |apples                    |3                  |
|Yoshua       |apples                    |1                  |
|Geoffrey     |bananas                   |1                  |
|Jurgen       |bananas                   |3                  |
|Yann         |bananas                   |2                  |
|Yoshua       |bananas                   |2                  |
|Jurgen       |space ship                |1                  |
|Yann         |space ship                |1                  |
|Yoshua       |space ship                |2                  |


cust_prod.groupBy('product_name') \
 .agg(func.avg('count(product_name)')) \
 .show(truncate=False)
|product_name              |avg(count(product_name))|
|space ship                |1.3333333333333333      |
|Book: The noose           |1.0                     |
|Rice                      |1.0                     |
|Motor skate               |1.5                     |
|Book: Crime and Punishment|1.0                     |
|Recycle bin               |1.3333333333333333      |
|Lamp                      |1.3333333333333333      |
|bananas                   |2.0                     |
|Solar Pannel              |1.0                     |
|apples                    |2.0                     |

Note that this does not take into account the quantity of each product bought, just how many times the client was there to buy a given item. For our last question that may be OK, because we understand the different nature of the products.
But let us assume it is not OK, and we do need to count by quantity. For that we can use pivot dataframes, a feature introduced in Spark 1.6.

total_products_bought_by_customer = customers.groupBy('customer_name')\
    .pivot('product_name')\
    .sum('quantity')
|customer_name|Book: Crime and Punishment|Book: The noose|Lamp|Motor skate|Recycle bin|Rice|Solar Pannel|apples|bananas|space ship|
|         Yann|                         5|              5|   3|          2|         10|  15|           5|  null|      6|         1|
|       Yoshua|                      null|           null|   2|          4|          5|   5|        null|    10|     18|         7|
|     Geoffrey|                      null|              1|   2|          7|       null|   5|           1|     9|      5|      null|
|       Jurgen|                      null|           null|null|          3|          5|   5|        null|  null|     15|         1|


Hope this has been a good introduction, and I suggest that you further check out this great post from databricks covering similar operations.

For the last part of this tutorial series I plan to cover window functions, something that until the more recent Spark 2.0 was possible thanks to HiveContext.

Getting started with Spark in Python/Scala

This is part of a series of introductory posts about Spark, meant to help beginners get started with it. Hope it helps!

So what’s that funky business people call Spark?

Essentially, Apache Spark is a framework for distributing parallel (inherently iterative) computational work across many nodes in a cluster of servers, maintaining high performance and High Availability (HA) while working with commodity servers. It abstracts away core complexities that distributed computing activities are subject to (such as resource scheduling, job submission and execution, tracking, message passing between nodes, etc.), and provides developers a higher-level API – in Java, Python, R and Scala – to manipulate and work with data.

Why is everyone going crazy about it? Well, thanks to some cool features (such as in-memory caching, lazy evaluation of some operations, etc.), it has been doing pretty well lately in performance benchmarks.

In this article I want to avoid getting into deeper engineering details about Spark, and focus more on basic developer concepts; but it’s always useful to have some general idea. Spark requires the Scala programming language to run, and (as you probably guessed) a Java Runtime Environment (JRE) and Java Development Kit (JDK) installed.

A Spark cluster is made up of two main types of processes: a driver program, and worker programs (where executors run). At its core, the programming model is that the driver passes functions to be executed on the worker nodes, which eventually return a value to the driver. The worker programs can run either on cluster nodes or on local threads, and they perform compute operations on data.

[figure: spark-cluster-overview]

Bootstrapping env

OK, enough chit-chat, let’s get our hands dirty. In case the procrastinator side of you is preparing to hit ctrl+w with the pseudo-sophisticated argument that you can’t run Spark except on a cluster, well, I’ve got bad news: Spark can be run using the built-in standalone cluster scheduler in local mode (e.g. driver and executor processes running within the same JVM – a single multithreaded instance).

The first thing a Spark program does is to create a SparkContext object (for Scala/Python, JavaSparkContext for Java, and SparkR.init for R), which tells Spark how to use the cluster resources.  Here is an example in Python:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster('local[4]')       # local mode, 4 threads
        .setAppName('My first app'))
sc = SparkContext(conf=conf)

This example shows how to create a context running in local mode (using 4 threads). Let's do the same in Scala, but now in cluster mode (i.e. connecting to an existing cluster) instead of local mode (usually, your own PC):

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("spark://{MASTER_IP}:{PORT}")  // point to an existing cluster's master
  .setAppName("My first app")
val sc = new SparkContext(conf)

Last, but not least, to access Spark from the shell, navigate to the Spark base directory and, for Scala, launch:

./bin/spark-shell

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

.. or for PySpark (at the time of writing this post, I'm running Spark 1.5.1 locally):

./bin/pyspark

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 3.4.3 (default, Mar 6 2015 12:06:10)
SparkContext available as sc, HiveContext available as sqlContext.

Please note that when you access Spark from the shell (only available for Scala & Python), the SparkContext is bootstrapped automatically for you (in local mode), and in both Python and Scala you access it through the variable "sc" (which, if you look carefully, the shell also tells you).

El Kabong time

The first core concept to learn is Spark's own data collection structure, the Resilient Distributed Dataset (RDD). RDDs are immutable collections of records that are split and spread across cluster nodes, and saved either in memory or on disk. Another interesting property is that RDDs are fault-tolerant: data can be rebuilt even after node failure.

Please give a warm welcome to RDDs, because if you want to “talk parallel lingo”, RDDs are your new best friends and currency when dealing with Spark.


The first option to create an RDD in PySpark is from a list:

my_app_name = 'hello spark'
conf = SparkConf().setMaster('local[4]').setAppName(my_app_name)
sc = SparkContext(conf=conf)

data = range(10)                  # a plain Python collection living in the driver
distData = sc.parallelize(data)   # ...now distributed across workers as an RDD

Please keep in mind that although a list in Python is a mutable object, passing it to the RDD creation will obviously have no effect on the RDD's immutable nature. Much like with tuples in Python, operations you perform on RDDs create new objects. This means that our second option on how to create an RDD is.. from another already existing RDD.
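For instance, a minimal sketch, assuming the distData RDD created above:

# map() does not modify distData; it returns a brand new RDD
squaredData = x: x * x)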

This was obviously a silly example, but the important thing to take into consideration is that, depending on the number of nodes in your cluster (or, if running locally, the number of threads specified initially in the SparkContext constructor – local[4] in our example), Spark will automatically take care of splitting the RDD's data among workers (i.e. nodes or threads).

In case you are wondering whether you can change, per RDD, how many partitions/splits are created, the answer is yes. Both methods – parallelize() and textFile() (as we will see next) – accept an optional parameter where you can specify the number of partitions as an integer.

Let's look at the same example, but in Scala and specifying the parallelism:

val data = 0 to 9
val distData = sc.parallelize(data, 4)
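For reference, a rough PySpark equivalent, with getNumPartitions() as a quick way to confirm how the data was split:

distData = sc.parallelize(range(10), 4)   # explicitly ask for 4 partitions
distData.getNumPartitions()               # returns 4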

Besides collections (created in the driver program) and transformations on other RDDs, you can also create RDDs by referencing external sources, such as the local file system, distributed file systems (HDFS), cloud storage (S3), Cassandra, HBase, etc.

Here are some simple examples in Scala:

val localFileDist = sc.textFile("myfile.txt")
val hdfsFileDist = sc.textFile("hdfs://{HOST_IP}:{PORT}/path/to/file")
val s3FileDist = sc.textFile("s3n://bucket/...")

Though you can also use the SparkContext textFile() method for grabbing files from AWS S3, heads up: there are some possible issues.

You should also keep in mind that RDDs are lazily evaluated. In other words, computation only happens when you actually perform an action on them (we will get to that in just a few seconds). As a matter of fact, even in the case of loading a file with the textFile() method, all you are doing is storing a pointer to the file location.

This becomes very clear if you are using the shell and perform an action that returns one element from the RDD to the driver.
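A minimal sketch, assuming the distData RDD created earlier (take() is the action here):

distData.take(1)
# [0]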


Transformations & Actions

Now, before you get all excited to perform operations on RDDs, there is yet another important aspect to grasp. There are two main types of operations that can be applied on RDDs, namely: actions and transformations.

The key distinction between the two is that transformations are lazily evaluated and create new RDDs from existing ones, whereas actions are immediately evaluated and return values to the driver program. In other words, you can think of transformations as just the recipes, and actions as the actual cooking.

Yes, in our previous example, the take() method was indeed an action. Let’s continue with our silly example to illustrate this aspect, starting with Python:

derivateData = x: x ** 2).filter(lambda x: x % 2 == 0).map(lambda x: x + 1)
result = derivateData.reduce(lambda x, y: x + y)

..last, but not least, in Scala:

val derivateData = => x * x).filter(x => x % 2 == 0).map(x => x + 1)
val result = derivateData.reduce(_ + _)

On our first line we are simply performing transformations on our base RDD (distData), namely map and filter operations.

Furthermore, if you are following along in the Spark shell, it will be clear to you that only when you execute the second line – with the reduce action – does Spark "wake up" and perform the desired transformations on each worker node, plus the reduce action, finally returning the result to the driver.

As you can see, you can conveniently chain several transformations on RDDs, which will, upon an action, be computed in memory on the worker nodes. Moreover, all of these operations – both the transformations (map + filter) and the action (reduce) – occurred in memory, without having to store intermediate results on disk. This is one of the main secrets that makes benchmarks of Spark against Hadoop's MapReduce jobs so disproportionately different.

However, after returning values to the driver, these RDDs will not be kept in memory for further computations. Now, before you panic, the good folks at Berkeley anticipated your wish, and there is a method you can call on an RDD to persist it in cache. In our example, to keep derivateData in memory, we should call the .cache() or .persist() method on it before calling any action.
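A minimal sketch, reusing derivateData from the example above:

derivateData.cache()                               # lazily marks the RDD for in-memory persistence
result = derivateData.reduce(lambda x, y: x + y)   # the first action computes and caches it
count = derivateData.count()                       # subsequent actions reuse the cached data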

In part 2, I will cover Spark DataFrames, which are equally important.

In the meanwhile, where to go next? Here's a suggestion: the Berkeley AMPCamp introduction exercises.

Hope this helped!