Getting Started with Spark (part 3) – UDFs & Window functions

This post continues the previous introductory series “Getting started with Spark in Python” with the topics UDFs and Window Functions. Unfortunately it sat marinating in my WordPress drafts for quite a while (it was already more or less 70% complete), and only now have I had the opportunity to finish it.

Note: For this post I’m using Spark 1.6.1. There are some minor differences in comparison to the upcoming Spark 2.0, such as using a SparkSession object to initialize the Spark Context, instead of HiveContext as I do here. Nonetheless, the important parts are common to both.


Now up until Spark 1.6.2, the only way for you to enrich your SQL queries would be to use a HiveContext instead of Spark SQLContext.

With a HiveContext you get the same features as a SQLContext, plus some additional advantages, such as the ability to use window functions, access to Hive UDFs, and the ability to read data from Hive tables. For more detail, please refer here for a concise, well-explained answer on the differences between SQLContext and HiveContext.

OK, let us start by importing all required dependencies for this tutorial:

# python dependencies
import sys
from datetime import datetime as dt
# pyspark dependencies
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.dataframe import DataFrame

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

… and initialize the HiveContext:

conf = SparkConf().setMaster("local[4]").setAppName("window-demo") \
      .set("spark.driver.memory", "4g")
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc)

As a reminder, our (extremely) dummy dataset consists of the following (nonsense) data:

# note: to simplify, not providing the schema, Spark Df api will infer it
customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27",  "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])


What if we wanted to answer a question such as: what is the cumulative sum of each customer's spending over time?
A way of thinking about a cumulative sum is as a recursive call where, for every new period, you add the current value to everything accumulated so far. One way to solve this is with Window Functions, a functionality added back in Spark 1.4. However, let us start by adding a column with the amount spent, using Spark User Defined Functions (UDFs) for that. A UDF applies a given Python function to every row, taking one or more columns as input.

# create the general function
def amount_spent(quantity, price):
   """
   Calculates the product between two variables
   :param quantity: (float/int)
   :param price: (float/int)
   :return: (float/int) quantity multiplied by price
   """
   return quantity * price

# create the general UDF
amount_spent_udf = udf(amount_spent, DoubleType())
# Note: Spark's DoubleType maps to Python's float (both are double precision); FloatType() would also work, but it is single precision

# Apply our UDF to the dataframe
customers02 = customers.withColumn('amount_spent', amount_spent_udf(customers['quantity'], customers['price'])).cache()
customers02.show(3, truncate=False)

|customer_name|date      |category|product_name|quantity|price|amount_spent|
|Geoffrey     |2016-04-22|A       |apples      |1       |50.0 |50.0        |
|Geoffrey     |2016-05-03|B       |Lamp        |2       |38.0 |76.0        |
|Geoffrey     |2016-05-03|D       |Solar Pannel|1       |29.0 |29.0        |
only showing top 3 rows
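
Conceptually, the UDF just runs the Python function once per row. Here is a plain-Python sketch of the same calculation, with no Spark involved; the `rows` list is a hypothetical stand-in for the first three rows of the DataFrame:

```python
def amount_spent(quantity, price):
    """Return the amount spent: quantity multiplied by unit price."""
    return quantity * price


# (customer_name, quantity, price) for the first three rows of the dataset
rows = [("Geoffrey", 1, 50.00), ("Geoffrey", 2, 38.00), ("Geoffrey", 1, 29.00)]

# Apply the function to every row, one row at a time
amounts = [amount_spent(quantity, price) for _, quantity, price in rows]
print(amounts)  # [50.0, 76.0, 29.0]
```

The values match the amount_spent column shown above.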

To compute a cumulative sum over time, we need to build a window object and specify how it should be partitioned (i.e. which column determines the groups of rows the aggregation runs over), how rows are ordered within each partition, and optionally the frame, i.e. the interval of rows around the current row to include.

window_01 = Window.partitionBy("customer_name").orderBy("date", "category").rowsBetween(-sys.maxsize, 0)
# note: func was the name given to functions, a Spark API for a suite of convenience functions 
win_customers01 = customers02.withColumn("cumulative_sum", func.sum(customers02['amount_spent']).over(window_01))
win_customers01.show(10, truncate=False)

|customer_name|date      |category|product_name              |quantity|price|amount_spent|cumulative_sum|
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0       |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |76.0        |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |
|Yann         |2016-04-13|E       |Book: The noose           |5       |125.0|625.0       |1366.0        |
|Yann         |2016-04-22|B       |Lamp                      |1       |38.0 |38.0        |1404.0        |
|Yann         |2016-04-27|D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |
|Yann         |2016-05-01|Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |
|Yann         |2016-05-03|C       |Rice                      |15      |15.0 |225.0       |1842.0        |
|Yann         |2016-05-03|D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |
|Yann         |2016-05-03|Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |
only showing top 10 rows

The cumulative sum is partitioned by customer, over an interval from the beginning of the partition (-sys.maxsize effectively means start at the very first row in that partition) up to the current row (which, as the aggregation slides along, always has index zero), with rows ordered by date and category ascending (the default).
So just to be sure we’re perfectly clear:
cumulative_sum row zero = (amount_spent row zero);
cumulative_sum row one = (cumulative_sum row zero + amount_spent row one);
Also, as a side note: instead of defining a UDF, we could apply the sum function directly over the product of the two columns, since func.sum accepts any column expression.

# Note: instead of defining an UDF, you could alternatively specify directly 
window_01 = Window.partitionBy("customer_name").orderBy("date").rowsBetween(-sys.maxsize, 0)
win_customers01_B = customers.withColumn("cumulative_sum", func.sum(customers['price']*customers['quantity']).over(window_01))
win_customers01_B.show(3, truncate=False)

|customer_name|date      |category|product_name              |quantity|price|cumulative_sum|
|Yann         |2016-04-02|A       |bananas                   |3       |55.0 |165.0         |
|Yann         |2016-04-02|B       |Lamp                      |2       |38.0 |241.0         |
|Yann         |2016-04-03|E       |Book: Crime and Punishment|5       |100.0|741.0         |
only showing top 3 rows
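
To make the running-total frame concrete, here is a minimal plain-Python sketch (no Spark) of what the window computes inside a single partition. The `rows` list is a hypothetical, deliberately shuffled copy of Yann's first four purchases:

```python
from itertools import accumulate

# (date, category, amount_spent) for one customer, in arbitrary arrival order
rows = [
    ("2016-04-03", "E", 500.0),
    ("2016-04-02", "B", 76.0),
    ("2016-04-13", "E", 625.0),
    ("2016-04-02", "A", 165.0),
]

# orderBy("date", "category"): sort the rows inside the partition
ordered = sorted(rows, key=lambda row: (row[0], row[1]))

# Frame from the first row up to the current row: a running total
cumulative = list(accumulate(amount for _, _, amount in ordered))
print(cumulative)  # [165.0, 241.0, 741.0, 1366.0]
```

These are the same values as the first four cumulative_sum entries for Yann in the table further up.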

If the rowsBetween() method still smells a bit funky, no worries, it will become clearer in the next example.

What if we want to understand how much customers spend on average overall (not grouped by product) over time? In other words, how does the average spending vary across a given time period, aka a moving average?

window_02 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-3, 0)
win_customers02 = win_customers01.withColumn("movingAvg", func.avg(customers02['amount_spent']).over(window_02) )

|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|        movingAvg|
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|            165.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|            120.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|            247.0|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|            341.5|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|           309.75|
|         Yann|2016-04-27|       D|        Solar Pannel|       5| 29.0|       145.0|        1549.0|            327.0|
|         Yann|2016-05-01|       Y|         Motor skate|       1| 68.0|        68.0|        1617.0|            219.0|
|         Yann|2016-05-03|       C|                Rice|      15| 15.0|       225.0|        1842.0|            119.0|
|         Yann|2016-05-03|       D|         Recycle bin|       5| 27.0|       135.0|        1977.0|           143.25|
|         Yann|2016-05-03|       Y|         Motor skate|       1| 68.0|        68.0|        2045.0|            124.0|
|         Yann|2016-05-27|       A|             bananas|       3| 55.0|       165.0|        2210.0|           148.25|
|         Yann|2016-05-27|       D|         Recycle bin|       5| 27.0|       135.0|        2345.0|           125.75|
|         Yann|2016-06-07|       Z|          space ship|       1|227.0|       227.0|        2572.0|           148.75|
|       Yoshua|2016-02-07|       Z|          space ship|       2|227.0|       454.0|         454.0|            454.0|
|       Yoshua|2016-02-14|       A|             bananas|       9| 55.0|       495.0|         949.0|            474.5|
|       Yoshua|2016-02-14|       A|              apples|      10| 55.0|       550.0|        1499.0|499.6666666666667|
|       Yoshua|2016-02-14|       B|                Lamp|       2| 38.0|        76.0|        1575.0|           393.75|
|       Yoshua|2016-03-07|       Z|          space ship|       5|227.0|      1135.0|        2710.0|            564.0|
|       Yoshua|2016-04-07|       A|             bananas|       9| 55.0|       495.0|        3205.0|            564.0|
|       Yoshua|2016-04-07|       C|                Rice|       5| 15.0|        75.0|        3280.0|           445.25|
only showing top 20 rows

Before explaining how this works, let us revisit the rowsBetween() method. Here we specify a frame that spans at most 3 rows behind the current one, plus the current row itself. Alternatively, we could use, for example, an interval of two rows behind and two rows ahead:

window_03 = Window.partitionBy("customer_name").orderBy("customer_name", "date").rowsBetween(-2, 2)
win_customers03 = win_customers01.withColumn("movingAvg", func.avg(customers02['amount_spent']).over(window_03) )

|customer_name|      date|category|        product_name|quantity|price|amount_spent|cumulative_sum|movingAvg|
|         Yann|2016-04-02|       A|             bananas|       3| 55.0|       165.0|         165.0|    247.0|
|         Yann|2016-04-02|       B|                Lamp|       2| 38.0|        76.0|         241.0|    341.5|
|         Yann|2016-04-03|       E|Book: Crime and P...|       5|100.0|       500.0|         741.0|    280.8|
|         Yann|2016-04-13|       E|     Book: The noose|       5|125.0|       625.0|        1366.0|    276.8|
|         Yann|2016-04-22|       B|                Lamp|       1| 38.0|        38.0|        1404.0|    275.2|
only showing top 5 rows

The first row (247.0) is simply the current value plus the next two (there are no rows behind it), divided by the number of rows in the frame:
(165.0 + 76.0 + 500.0)/3 = 247.0

Simple, right?
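
If you want to check the numbers by hand, the following plain-Python sketch mimics rowsBetween(start, end) within one partition. The helper name `moving_avg` is my own, and the values are Yann's first seven amount_spent entries in window order:

```python
def moving_avg(values, start, end):
    """Average over rows [i + start, i + end], clipped to the partition bounds."""
    result = []
    for i in range(len(values)):
        first = max(0, i + start)
        last = min(len(values) - 1, i + end)
        frame = values[first:last + 1]
        result.append(sum(frame) / len(frame))
    return result


amounts = [165.0, 76.0, 500.0, 625.0, 38.0, 145.0, 68.0]

trailing = moving_avg(amounts, -3, 0)   # like rowsBetween(-3, 0)
centered = moving_avg(amounts, -2, 2)   # like rowsBetween(-2, 2)

print(trailing[:5])  # [165.0, 120.5, 247.0, 341.5, 309.75]
print(centered[:5])  # [247.0, 341.5, 280.8, 276.8, 275.2]
```

Note how the frame simply shrinks near the edges of the partition: the first row of the centered window averages only three values.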

Going back to how the computation is partitioned: the way we structured this computes a moving average per customer, but iterating over each individual purchase event.
However, let’s say we want to know how customer spending varies on average on a daily/weekly/monthly basis. For that, let’s extract those periods from our date column.

 |-- customer_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)
 |-- amount_spent: double (nullable = true)
 |-- cumulative_sum: double (nullable = true)

Spark automatically inferred the type of our date column as String (as we did not specify the schema when we created the DataFrame). Let’s use a UDF to convert it to a date (using an anonymous function, i.e. a lambda):

from datetime import datetime as dt
# create the general UDF
string_to_datetime = udf(lambda x: dt.strptime(x, '%Y-%m-%d'), DateType())

# Create a new column called datetime, and drop the date column
win_customers01_B = win_customers01.withColumn('datetime', string_to_datetime( win_customers01['date'])).drop('date')
# Add month and Week columns
win_customers01_C = win_customers01_B.withColumn('year', func.year( win_customers01_B['datetime'] )) \
.withColumn('month', func.month( win_customers01_B['datetime'] )) \
.withColumn('week', func.weekofyear( win_customers01_B['datetime']))
win_customers01_C.show(10, truncate=False)

|customer_name|category|product_name              |quantity|price|amount_spent|cumulative_sum|datetime  |year|month|week|
|Yann         |A       |bananas                   |3       |55.0 |165.0       |165.0         |2016-04-02|2016|4    |13  |
|Yann         |B       |Lamp                      |2       |38.0 |76.0        |241.0         |2016-04-02|2016|4    |13  |
|Yann         |E       |Book: Crime and Punishment|5       |100.0|500.0       |741.0         |2016-04-03|2016|4    |13  |
|Yann         |E       |Book: The noose           |5       |125.0|625.0       |1366.0        |2016-04-13|2016|4    |15  |
|Yann         |B       |Lamp                      |1       |38.0 |38.0        |1404.0        |2016-04-22|2016|4    |16  |
|Yann         |D       |Solar Pannel              |5       |29.0 |145.0       |1549.0        |2016-04-27|2016|4    |17  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |1617.0        |2016-05-01|2016|5    |17  |
|Yann         |C       |Rice                      |15      |15.0 |225.0       |1842.0        |2016-05-03|2016|5    |18  |
|Yann         |D       |Recycle bin               |5       |27.0 |135.0       |1977.0        |2016-05-03|2016|5    |18  |
|Yann         |Y       |Motor skate               |1       |68.0 |68.0        |2045.0        |2016-05-03|2016|5    |18  |
only showing top 10 rows
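
The year, month and week values can be cross-checked without Spark. This sketch assumes the week number follows the ISO 8601 convention (weeks start on Monday), which is consistent with the output above; `date_parts` is a hypothetical helper name:

```python
from datetime import datetime


def date_parts(date_string):
    """Parse a 'YYYY-MM-DD' string and return (year, month, ISO week number)."""
    parsed = datetime.strptime(date_string, "%Y-%m-%d").date()
    _, iso_week, _ = parsed.isocalendar()
    return parsed.year, parsed.month, iso_week


parts = [date_parts(d) for d in ("2016-04-02", "2016-04-13", "2016-05-01")]
print(parts)  # [(2016, 4, 13), (2016, 4, 15), (2016, 5, 17)]
```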

Let us group customer spending by day:

customer_grp_by_day = win_customers01_C.groupBy('customer_name', 'datetime', 'year') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'datetime')

|customer_name|  datetime|year|amount_spent|
|     Geoffrey|2016-04-22|2016|        50.0|
|     Geoffrey|2016-05-03|2016|       330.0|
|     Geoffrey|2016-06-05|2016|       525.0|
|     Geoffrey|2016-06-15|2016|       601.0|
|       Jurgen|2016-05-01|2016|       502.0|
|       Jurgen|2016-05-08|2016|       343.0|
|       Jurgen|2016-06-05|2016|       621.0|
|         Yann|2016-04-02|2016|       241.0|
|         Yann|2016-04-03|2016|       500.0|
|         Yann|2016-04-13|2016|       625.0|
|         Yann|2016-04-22|2016|        38.0|
|         Yann|2016-04-27|2016|       145.0|
|         Yann|2016-05-01|2016|        68.0|
|         Yann|2016-05-03|2016|       428.0|
|         Yann|2016-05-27|2016|       300.0|
|         Yann|2016-06-07|2016|       227.0|
|       Yoshua|2016-02-07|2016|       454.0|
|       Yoshua|2016-02-14|2016|      1121.0|
|       Yoshua|2016-03-07|2016|      1135.0|
|       Yoshua|2016-04-07|2016|       977.0|
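
For reference, the groupBy/agg step boils down to summing amounts per (customer, day) key. A minimal plain-Python sketch, using a hypothetical `purchases` list holding Geoffrey's first five rows:

```python
from collections import defaultdict

# (customer_name, date, quantity, price)
purchases = [
    ("Geoffrey", "2016-04-22", 1, 50.00),
    ("Geoffrey", "2016-05-03", 2, 38.00),
    ("Geoffrey", "2016-05-03", 1, 29.00),
    ("Geoffrey", "2016-05-03", 3, 50.00),
    ("Geoffrey", "2016-05-03", 5, 15.00),
]

# Sum the amount spent per (customer, date) key
daily_totals = defaultdict(float)
for customer, date, quantity, price in purchases:
    daily_totals[(customer, date)] += quantity * price

for key in sorted(daily_totals):
    print(key, daily_totals[key])
# ('Geoffrey', '2016-04-22') 50.0
# ('Geoffrey', '2016-05-03') 330.0
```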

Next, let’s check, per customer visit (assuming each customer does not visit the store more than once a day), how the customer’s spending progresses, using a moving average over the current visit and up to 7 previous ones:

window_04 = Window.partitionBy("customer_name").orderBy("customer_name", "datetime").rowsBetween(-7, 0)
win_customers04 = customer_grp_by_day.withColumn("movingAvg", func.avg(customer_grp_by_day['amount_spent']).over(window_04))
|customer_name|  datetime|year|amount_spent|         movingAvg|
|         Yann|2016-04-02|2016|       241.0|             241.0|
|         Yann|2016-04-03|2016|       500.0|             370.5|
|         Yann|2016-04-13|2016|       625.0| 455.3333333333333|
|         Yann|2016-04-22|2016|        38.0|             351.0|
|         Yann|2016-04-27|2016|       145.0|             309.8|
|         Yann|2016-05-01|2016|        68.0|             269.5|
|         Yann|2016-05-03|2016|       428.0|292.14285714285717|
|         Yann|2016-05-27|2016|       300.0|           293.125|
|         Yann|2016-06-07|2016|       227.0|           291.375|
|       Yoshua|2016-02-07|2016|       454.0|             454.0|
|       Yoshua|2016-02-14|2016|      1121.0|             787.5|
|       Yoshua|2016-03-07|2016|      1135.0| 903.3333333333334|
|       Yoshua|2016-04-07|2016|       977.0|            921.75|
|     Geoffrey|2016-04-22|2016|        50.0|              50.0|
|     Geoffrey|2016-05-03|2016|       330.0|             190.0|
|     Geoffrey|2016-06-05|2016|       525.0| 301.6666666666667|
|     Geoffrey|2016-06-15|2016|       601.0|             376.5|
|       Jurgen|2016-05-01|2016|       502.0|             502.0|
|       Jurgen|2016-05-08|2016|       343.0|             422.5|
|       Jurgen|2016-06-05|2016|       621.0| 488.6666666666667|

Let us group customers by weekly spending:

customer_grp_by_week = win_customers01_C.groupBy('customer_name', 'year', 'week') \
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'week')
|     Geoffrey|2016|  16|        50.0|
|     Geoffrey|2016|  18|       330.0|
|     Geoffrey|2016|  22|       525.0|
|     Geoffrey|2016|  24|       601.0|
|       Jurgen|2016|  17|       502.0|
|       Jurgen|2016|  18|       343.0|
|       Jurgen|2016|  22|       621.0|
|         Yann|2016|  13|       741.0|
|         Yann|2016|  15|       625.0|
|         Yann|2016|  16|        38.0|
|         Yann|2016|  17|       213.0|
|         Yann|2016|  18|       428.0|
|         Yann|2016|  21|       300.0|
|         Yann|2016|  23|       227.0|
|       Yoshua|2016|   5|       454.0|
|       Yoshua|2016|   6|      1121.0|
|       Yoshua|2016|  10|      1135.0|
|       Yoshua|2016|  14|       977.0|

And computing the weekly moving average:

window_05 = Window.partitionBy('customer_name').orderBy('customer_name', 'year', 'week').rowsBetween(-4, 0)
win_customers05 = customer_grp_by_week.withColumn("movingAvg", func.avg(customer_grp_by_week['amount_spent']).over(window_05))
|customer_name|year|week|amount_spent|        movingAvg|
|         Yann|2016|  13|       741.0|            741.0|
|         Yann|2016|  15|       625.0|            683.0|
|         Yann|2016|  16|        38.0|            468.0|
|         Yann|2016|  17|       213.0|           404.25|
|         Yann|2016|  18|       428.0|            409.0|
|         Yann|2016|  21|       300.0|            320.8|
|         Yann|2016|  23|       227.0|            241.2|
|       Yoshua|2016|   5|       454.0|            454.0|
|       Yoshua|2016|   6|      1121.0|            787.5|
|       Yoshua|2016|  10|      1135.0|903.3333333333334|
|       Yoshua|2016|  14|       977.0|           921.75|
|     Geoffrey|2016|  16|        50.0|             50.0|
|     Geoffrey|2016|  18|       330.0|            190.0|
|     Geoffrey|2016|  22|       525.0|301.6666666666667|
|     Geoffrey|2016|  24|       601.0|            376.5|
|       Jurgen|2016|  17|       502.0|            502.0|
|       Jurgen|2016|  18|       343.0|            422.5|
|       Jurgen|2016|  22|       621.0|488.6666666666667|

Finally, let us move to monthly grouping and calculations.

customer_grp_by_month = win_customers01_C.groupBy('customer_name', 'year', 'month')\
.agg({'amount_spent': 'sum'}) \
.withColumnRenamed('sum(amount_spent)', 'amount_spent') \
.orderBy('customer_name', 'month')
|     Geoffrey|2016|    4|        50.0|
|     Geoffrey|2016|    5|       330.0|
|     Geoffrey|2016|    6|      1126.0|
|       Jurgen|2016|    5|       845.0|
|       Jurgen|2016|    6|       621.0|
|         Yann|2016|    4|      1549.0|
|         Yann|2016|    5|       796.0|
|         Yann|2016|    6|       227.0|
|       Yoshua|2016|    2|      1575.0|
|       Yoshua|2016|    3|      1135.0|
|       Yoshua|2016|    4|       977.0|


# This shows how the customer's spending progresses across months, using a moving average over the current month and up to 3 previous ones

window_06 = Window.partitionBy('customer_name').orderBy('customer_name', 'year', 'month').rowsBetween(-3, 0)
win_customers06 = customer_grp_by_month.withColumn("movingAvg", func.avg(customer_grp_by_month['amount_spent']).over(window_06))
|customer_name|year|month|amount_spent|        movingAvg|
|         Yann|2016|    4|      1549.0|           1549.0|
|         Yann|2016|    5|       796.0|           1172.5|
|         Yann|2016|    6|       227.0|857.3333333333334|
|       Yoshua|2016|    2|      1575.0|           1575.0|
|       Yoshua|2016|    3|      1135.0|           1355.0|
|       Yoshua|2016|    4|       977.0|           1229.0|
|     Geoffrey|2016|    4|        50.0|             50.0|
|     Geoffrey|2016|    5|       330.0|            190.0|
|     Geoffrey|2016|    6|      1126.0|            502.0|
|       Jurgen|2016|    5|       845.0|            845.0|
|       Jurgen|2016|    6|       621.0|            733.0|

As usual, I suggest further checking these sources:



Getting through Deep Learning – CNNs (part 1)

The number of available open source libraries making Deep Learning easier to use is growing fast as the hype continues to build. However, without understanding the background principles, it just feels like poking around a black box.

In this post (or several, most likely) I will try to give an introduction to Convolutional Neural Networks (CNNs). Note that, for the sake of brevity, I assume that you already know the basics about Neural Networks. If not, I would suggest you go through the following introduction.

This post is part of a tutorial series:

  1. Getting through Deep Learning – CNNs (part 1)
  2. Getting through Deep Learning – TensorFlow intro (part 2)
  3. Getting through Deep Learning – TensorFlow intro (part 3)

Disclaimer: this post uses images and formulas from distinct sources. I would suggest to have a look over the complete list of sources at the end of the post, as usual.


In 1958 and 1959 David H. Hubel and Torsten Wiesel performed a series of experiments, whereby they concluded that many neurons in the visual cortex focus on a limited region in the vision field.

This insight provided the notion of a local receptive field – a narrow sub-region of what is available in the whole visual field which serves as input – thus giving rise to a different architecture than the earlier fully connected neural network architecture.

Basics – Convolution Layer

The first thing to realize is that Convolution networks are simply the application of “mini-neural networks” to segments of the input space. In the case of images, this means that neurons in the first convolutional layer are not connected to every single pixel of the image, but only to the pixels in their Receptive Field (RF). The following image (source) shows an illustration of how a convolution layer is built using an image from the famous MNIST dataset, where the goal is to identify the digits in pictures of handwritten numbers.



OK, let’s break this down. In the MNIST dataset, each image is 28 by 28 pixels (height and width, respectively). For a fully connected neural network, the input space for the first layer would be 28 x 28 = 784px, if we were only to include height and width.

However, in a so-called convolution, you would instead apply a mini-neural network to just a single portion/segment of the image – let’s say a 3×3 px (width and height) rectangle. This 3×3 receptive field is also often referred to as a filter or kernel in Deep Learning (DL) lingo.

Next you would slide that kernel over the image, let us say 1px to the right in each step until we reach the far right end, and then 1px down, repeating until we reach the lower bound. The following animated illustration (credits go to Vincent Dumoulin and Francesco Visin, with an awesome CNN architectural overview available here) shows the building of a convolution layer using this 3×3 Receptive Field/Filter/Kernel, building step by step what is called a Feature Map: a layer full of neurons using the same filter.



Thus a Feature Map can also be thought of as a multitude of these mini-Neural Networks, whereby each application of the filter (the dark blue rectangle) uses the filter's weights, bias term, and activation function, and produces one output (the darker green square).
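
The sliding described above can be sketched in a few lines of plain Python. This is only an illustration under simplifying assumptions (stride of 1, no padding, no activation function); the function name `convolve2d` and the toy 4×4 image are my own:

```python
def convolve2d(image, kernel, bias=0.0):
    """Slide a square kernel over the image (stride 1, no padding)."""
    k = len(kernel)
    out_height = len(image) - k + 1
    out_width = len(image[0]) - k + 1
    feature_map = []
    for top in range(out_height):
        row = []
        for left in range(out_width):
            # Dot product of the shared kernel weights with the current patch
            total = bias
            for i in range(k):
                for j in range(k):
                    total += image[top + i][left + j] * kernel[i][j]
            row.append(total)
        feature_map.append(row)
    return feature_map


image = [
    [1, 2, 3, 0],
    [4, 5, 6, 0],
    [7, 8, 9, 0],
    [0, 0, 0, 0],
]
# A 3x3 kernel that simply copies the centre pixel of each patch
kernel = [
    [0, 0, 0],
    [0, 1, 0],
    [0, 0, 0],
]

feature_map = convolve2d(image, kernel)
print(feature_map)  # [[5.0, 6.0], [8.0, 9.0]]
```

A 4×4 input with a 3×3 kernel yields a 2×2 feature map, and the same nine weights are reused at every position.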

The following illustration shows a detailed view of the progressive application of these mini neural networks across patches of the initial input space (again, credits go to Vincent Dumoulin and Francesco Visin), producing a Feature Map.


Note that the illustrations used an iterative sliding of a 3×3 kernel in steps of 1 px. However, this is not a requirement, as one can use for example a step size of 2, reducing the dimension of the output space. By now you should be realizing that this step size – called the stride in DL lingo – is yet another hyperparameter, just like the filter size.
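
How much the stride shrinks the output can be worked out with a one-line formula. The sketch below assumes no padding around the image (an assumption on my side, since padding is not covered here); `conv_output_size` is a hypothetical helper name:

```python
def conv_output_size(input_size, kernel_size, stride=1):
    """Number of kernel positions along one dimension, assuming no padding."""
    return (input_size - kernel_size) // stride + 1


# A 28px-wide MNIST image with a 3px-wide kernel
print(conv_output_size(28, 3, stride=1))  # 26
print(conv_output_size(28, 3, stride=2))  # 13
```

So with a stride of 1 the feature map is 26×26, while a stride of 2 halves it to 13×13.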

OK, we are almost done with the basic concepts related to CNNs: we take a local receptive field (which we call filter/kernel), slide it across an image with a given step size (which we call stride), and produce a set of mini neural networks (which we call a feature map).

The missing detail that builds upon the previous knowledge is the fact that we can simultaneously apply different filters to the same input space, thus producing several feature maps as a result. The only restriction is that feature maps in a given layer have the same dimension. The following illustration from the excellent book “Hands-On Machine Learning with Scikit-Learn and TensorFlow” gives more insight into the intuition of stacking multiple feature maps.


So far we have represented a convolution layer in a thin 2-D output space (1 single feature map). However, if one produces several feature maps in one layer, the result is a 3-D volume. And this is where the notion of convolution comes into play: the output of this convolution will have a new height, width and depth.



It is thus frequent to see convolutional networks illustrated as “tall and skinny” boxes (aka with high values of height and low of depth), and progressively getting “shorter and fatter” (smaller height and bigger depth).



Basics – Pooling Layer

Before we go into more detail about different architectures a Convolution Layer may have (in the next post of this series), it is important to cover some ground aspects. To start, you are not restricted to using Convolution Layers when creating a new hidden layer. In fact there are two more main types of layers: the Pooling Layer (which we will cover in just a bit) and the Fully-Connected layer (exactly as in a regular Neural Network).

Also note that, similarly to regular Neural Networks, where you can have as many hidden layers as you want, the same goes for CNNs. That is, you can build convolutions that serve as input space for a next convolution layer or a pooling layer, for example, and so on.

Finally, and again similarly to Neural Networks, the last fully-connected layer will always contain as many neurons as the number of classes to be predicted.

Typical Activation functions

Again, to be perfectly clear, what we do in each step of the sliding filter in a convolution is apply the dot product of a given set of weights (which we are trying to tune with training) with the input, plus a given bias. This is effectively a linear function represented in the following form:

$$y = \sum_i W_i X_i + b$$

where the weights are a vector represented by W, Xi are the pixel values (in our example) inside a given filter, and b is the bias. So what usually happens (except for pooling) is that we pass the output of this function to a neuron, which will then apply a given activation function. The typical activation functions that are usually implemented are:

Softmax – a generalized form of the logistic/sigmoid function, which turns the outputs into probabilities (each lying in the interval between 0 and 1, and all summing to 1).


ReLU – Rectified Linear Unit functions clip negative inputs to zero and pass positive inputs through unchanged, so the result is never negative.


Tanh – hyperbolic tangent function, which squashes the output into the range from -1 to +1.
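
As a quick reference, the three activation functions can be written with nothing but the standard library. This is a minimal sketch using the textbook definitions; subtracting the maximum score inside softmax is a common numerical-stability trick that does not change the result:

```python
import math


def softmax(scores):
    """Turn a list of raw scores into probabilities that sum to 1."""
    largest = max(scores)
    exps = [math.exp(score - largest) for score in scores]
    total = sum(exps)
    return [value / total for value in exps]


def relu(x):
    """Rectified Linear Unit: zero for negative inputs, identity otherwise."""
    return max(0.0, x)


def tanh(x):
    """Hyperbolic tangent: squashes any input into the range (-1, 1)."""
    return math.tanh(x)


probabilities = softmax([2.0, 1.0, 0.1])
print(sum(probabilities))      # sums to 1 (up to floating point rounding)
print(relu(-2.0), relu(3.0))   # 0.0 3.0
print(tanh(0.0))               # 0.0
```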


Typical Cost Functions

As you know, cost functions are what make the whole training of models possible. Here are three of the main ones, of which cross entropy is probably the most frequently used.

Mean Squared Error –  used to train linear regression models


Hinge Loss – used to train Support Vector Machines (SVM) models



Cross entropy – used to train logistic regression models
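
Since the formulas are easy to forget, here is a minimal plain-Python sketch of the three cost functions, using their standard textbook definitions (hinge loss assumes labels of -1 or +1; cross entropy assumes a one-hot target and predicted probabilities). The function names are my own:

```python
import math


def mean_squared_error(targets, predictions):
    """Average of the squared differences between targets and predictions."""
    squared = [(t - p) ** 2 for t, p in zip(targets, predictions)]
    return sum(squared) / len(squared)


def hinge_loss(label, score):
    """Hinge loss for one example; label must be -1 or +1."""
    return max(0.0, 1.0 - label * score)


def cross_entropy(target_one_hot, probabilities):
    """Negative log-probability assigned to the true class."""
    return -sum(t * math.log(p)
                for t, p in zip(target_one_hot, probabilities) if t > 0)


print(mean_squared_error([1.0, 2.0], [1.0, 4.0]))  # 2.0
print(hinge_loss(-1, -2.0))                        # 0.0
print(cross_entropy([0, 1], [0.5, 0.5]))           # about 0.693, i.e. ln(2)
```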



Beyond activation functions applied to convolutions, there are also some other useful tricks applied to build a Deep Neural Network (DNN), which address the well known problem of over-fitting.

Pooling Layer – bottom line, pooling layers are used to reduce dimension. They sample from the input space, also using a filter/kernel with a given output dimension, and simply apply a reduce function: usually max() (called max pooling) or mean() (average pooling).

For example, if the values under a 2×2 kernel were [[1,2], [3,4]], then max pooling would yield 4 as an output, whereas average pooling would yield 2.5.
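
The same example in plain Python, as a minimal sketch; `pool` is a hypothetical helper that flattens one kernel window and applies the chosen reduce function:

```python
def pool(window, reduce_fn):
    """Flatten one kernel window and reduce it to a single value."""
    flat = [value for row in window for value in row]
    return reduce_fn(flat)


window = [[1, 2], [3, 4]]

max_pooled = pool(window, max)
avg_pooled = pool(window, lambda values: sum(values) / len(values))

print(max_pooled)  # 4
print(avg_pooled)  # 2.5
```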

Dropouts – the goal of dropout is exactly the same as regularization (it is, after all, a regularization technique); that is, it is intended to reduce over-fitting so the model performs better out of sample. Initially it was used to simply turn off a portion of the neurons' outputs at every iteration during training. That is, instead of passing on every dot product of the weights computed against the input layer, we randomly (with a given specifiable probability) decide not to pass a given set of outputs on to the next layer.

In case you are wondering whether this is a similar trick to the bagging that Random Forests apply to Decision Trees, the answer would be not quite. Averaging over lots of Decision Trees (which have a high propensity to over-fit data) using sampling with replacement is computationally doable (by today’s standards). However, the same does not hold true for training many distinct DNNs. So the dropout technique is a practical method to average internally the outputs among layers of a network.
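
To give a feel for the mechanics, here is a minimal sketch of dropout applied to one layer's outputs during training. Rescaling the surviving outputs by 1/keep_prob (often called inverted dropout) is an assumption on my side and not something covered above; the function name and sample values are hypothetical:

```python
import random


def dropout(activations, drop_prob, rng):
    """Zero each activation with probability drop_prob; rescale the survivors."""
    keep_prob = 1.0 - drop_prob
    return [
        value / keep_prob if rng.random() < keep_prob else 0.0
        for value in activations
    ]


rng = random.Random(42)  # fixed seed so the run is repeatable
activations = [0.5, 1.0, 1.5, 2.0]
dropped = dropout(activations, drop_prob=0.5, rng=rng)
print(dropped)  # each entry is either 0.0 or twice the original value
```

At prediction time no outputs are dropped; the rescaling during training keeps the expected magnitude of the layer's output comparable between the two modes.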

Final Notes

In part 2 I plan to get into more detail about convolution architectures, as well as provide a coding example in order to bring all these concepts home.

However, I didn’t want to end this post without any code snippet. So, even though I am not going to go through it, and just to show that the concepts covered so far already give you some intuition around it, here is a code snippet where a training session with Google’s deep learning library TensorFlow is defined with two convolution layers:

As usual, here are the sources used for this post:

Spark 2.0: From quasi to proper-streaming?

This post attempts to follow the relatively recent Spark release, Spark 2.0, and understand what changed for Streaming applications. Why streaming in particular, you may ask? Well, streaming is the ideal scenario most companies would like to reach, and the competitive landscape is definitely heating up, especially with Apache Flink and Google’s Apache Beam.

Why is streaming so difficult

There are three main problems when it comes to building real time applications based on streaming data:

  • Data consistency: due to the distributed nature of the architecture, at any given point in time some events may have been processed on some nodes and not on others, even though these events might actually have occurred before others. For example, you may have a Logout event without a Login event, a close app event without an open app event, etc.
  • Fault tolerance (FT): related to the first problem, on node failure processing engines may try to reprocess an event that had actually already been processed by the failed node, leading to duplicated records and/or inaccurate metrics.
  • Dealing with late/out-of-order data: either because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync data hours later, developers simply must tackle this challenge in application code.

See this post for an excellent detailed explanation.

Rewind to Spark – 1.6.2

Unlike Apache Flink, where batch jobs are the exception and streaming the default, Apache Spark uses the exact opposite logic: streaming is a special case of batch. To make this clearer, Spark Streaming works by dividing live data streams into batches (sometimes called microbatches). The continuous streams are represented by a high-level abstraction called discretized stream, or DStream, which internally is represented as a sequence of RDDs.

One thing DStreams have in common with RDDs is that they are executed lazily: work is only triggered by the output operations, the equivalent of Actions in RDDs. For example, in the classical word count example, the print method would be the output operation.
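To get a feeling for the micro-batch idea without a cluster, here is a toy sketch in plain Python. To be clear, this is not the Spark API: the function names, the (timestamp, line) event format and the one second batch interval are all assumptions made for illustration.

```python
from collections import Counter


def micro_batches(events, batch_interval):
    """Group (timestamp, line) events into consecutive time windows.

    Each window of batch_interval seconds plays the role of one batch
    (one RDD in the sequence behind a DStream).
    """
    batches = {}
    for timestamp, line in events:
        batch_id = int(timestamp // batch_interval)
        batches.setdefault(batch_id, []).append(line)
    return [batches[batch_id] for batch_id in sorted(batches)]


def word_count(batch):
    """The classical word count, applied to a single batch."""
    counts = Counter()
    for line in batch:
        counts.update(line.split())
    return dict(counts)


events = [(0.2, "spark streaming"), (0.9, "spark"),
          (1.4, "flink streaming"), (2.1, "spark")]
per_batch = [word_count(batch)
             for batch in micro_batches(events, batch_interval=1.0)]
```

The same batch function is simply applied again and again to each small chunk of the stream, which is why streaming in Spark 1.x is best thought of as a series of tiny batch jobs.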

Nonetheless, even though the main abstraction to work around was DStreams on top of RDDs, it was already possible to work with Dataframes (and thus SQL) on streaming data before.


Changes in Spark 2.0 – introducing “Structured Streaming”

So the new kid on the block is structured streaming, a feature still in alpha for Spark 2.0. Its marketing slogan: “Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming“. In other words, this is an attempt to simplify the life of all developers struggling with the three main general problems in streaming, while bringing a streaming API for DataFrames and DataSets.

Note that the streaming API has now been unified with the batch API, which means that you should expect (almost always) to use exactly the same method calls as you would in the normal batch API. Of course, there are some minor exceptions, such as reading from a file and writing to a file, which require an extra method call. Here is a copy-paste from the examples shown at Spark Summit in the “A deep dive into structured streaming” presentation (highly recommended).

Syntax for batch mode:

input = spark.read.format("json").load("source-path")
result = input.select("device", "signal").where("signal > 15")
result.write.format("parquet").save("dest-path")

Now for structured streaming:

input = spark.read.format("json").stream("source-path")
result = input.select("device", "signal").where("signal > 15")
result.write.format("parquet").startStream("dest-path")

In case you didn’t notice the difference (that’s good in this case), what changed is that instead of doing a load() of a file, you continuously stream() it, and the same logic applies to writing with startStream(). Pretty cool, right?

Also note that the base premise of Spark’s streaming discussed before still holds; that is, micro-batching is still there.

Repeated Queries on input table

So besides unifying the API, where’s the magic sauce? The main concept is this new abstraction where streaming data is viewed as if it were much like a table in a relational DB. In other words new incoming data is treated as a new row inserted on an unbounded input table.

The developer has three tasks in the middle of this whole streaming party.

  1. One is to define a query that acts repeatedly on this input table, and produces a new table, called the result table, that will be written to an output sink. Under the hood (as explained below), Spark’s query planner will turn this query into an execution plan, and Spark will figure out on its own what needs to be maintained from the input table to update the result table.
  2. The second is to specify the triggers that control when to update the results.
  3. The third is to define an output mode, so that Spark knows how it should write to an external system (such as S3, HDFS or a database). There are three alternatives: append (only new rows of the result table will be written), complete (the full result table will be flushed), and update (only rows that were updated since the last trigger will be changed).
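The difference between output modes is easier to see with a toy simulation. The following sketch is plain Python, not Spark code: the function name, the "count events per device" query and the list-of-batches input are assumptions for illustration. It only covers the complete and update modes, since with a running aggregation existing rows keep changing, which does not fit the "only new rows" idea of append.

```python
def run_repeated_query(batches, mode):
    """Maintain a running count per device and report what each trigger writes.

    batches: list of lists of device ids, one inner list per trigger.
    mode: "complete" writes the whole result table at every trigger,
          "update" writes only the rows changed since the last trigger.
    """
    result_table = {}
    written = []
    for batch in batches:
        changed = set()
        for device in batch:  # new rows appended to the input table
            result_table[device] = result_table.get(device, 0) + 1
            changed.add(device)
        if mode == "complete":
            written.append(dict(result_table))
        elif mode == "update":
            written.append({d: result_table[d] for d in changed})
        else:
            raise ValueError("unsupported mode in this sketch: {}".format(mode))
    return written


batches = [["a", "b"], ["a"], ["c", "a"]]
complete = run_repeated_query(batches, "complete")
update = run_repeated_query(batches, "update")
```

Note how the result table itself is identical in both cases; the output mode only decides how much of it is pushed to the sink at each trigger.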

Structured Streaming under the hood

Under the hood (and when I say “under the hood” I mean what is done for you that you don’t need to implement in your code, but yes, you should nonetheless care about it) structured streaming is possible due to some considerable changes. The first is that the query planner (the same one used for batch mode) generates a continuous series of incremental execution plans, one for each processing of the next chunk of streaming data (for example, when polling from Kafka this would mean the new range of offsets).

Now this can get particularly tricky, because each execution plan needs to take into consideration the aggregations computed by previous executions. So the state of these continuous aggregations is maintained internally as in-memory state, backed by a Write Ahead Log (WAL) in the file system (for fault tolerance). So yes, the query planner is FT, as it keeps track of the offsets of each execution in a distributed file system such as HDFS.

Also regarding FT, the sinks are idempotent, which means that they are expected to handle re-executions without double committing the output.
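Here is a toy sketch of how logged offsets and an idempotent sink work together on a replay. Again this is plain Python and not Spark internals: the class, the function, the dict standing in for the WAL and the list standing in for the source are all hypothetical.

```python
class IdempotentSink(object):
    """Sink that commits each batch id at most once."""

    def __init__(self):
        self.committed = {}  # batch_id -> rows

    def write(self, batch_id, rows):
        if batch_id in self.committed:
            return False  # re-execution: output was already committed
        self.committed[batch_id] = list(rows)
        return True


def run_batch(batch_id, offsets, source, wal, sink):
    """Log the offset range first, then process it and write to the sink."""
    wal[batch_id] = offsets  # stand-in for the write ahead log
    start, end = offsets
    return sink.write(batch_id, source[start:end])


source = ["e0", "e1", "e2", "e3"]
wal, sink = {}, IdempotentSink()
first_run = run_batch(0, (0, 2), source, wal, sink)

# Simulate a failure right after the commit: on recovery the engine reads
# the last logged offsets and re-executes exactly the same batch.
replayed_run = run_batch(0, wal[0], source, wal, sink)
```

Because the replay uses the same batch id and the same offsets, the sink can recognise it and skip the second commit, which is the essence of avoiding duplicated output.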

Finally, before you get all excited, I do want to emphasize that at the moment, this is still in alpha mode, and is explicitly not recommended for production environments.

Not enough? Bring on more resources:


Overview HP Vertica vs AWS Redshift

While working at HP some years ago, I was exposed not only to internal training materials, but also to a demo environment. I still remember the excitement when HP acquired Vertica Systems in 2011, and we had a new toy to play with… Come on, you can’t blame me, distributed DBs were something only the cool kids were doing.

Bottom line is that it’s been a while since I laid eyes on it… Well, recently, while considering possible architectural solutions, I had the pleasure of revisiting Vertica. And since AWS Redshift has been gaining a lot of popularity and we’re also using it at some of our clients, I thought I might put together an easy summary to help others.

Now if you’re expecting a smack-down post, then I’m afraid I’ll disappoint you; for that you have the internet. If experience has taught me something, it is that with top-notch solutions there are only use cases, and one picks the best-fitting one.

They share some properties in terms of architecture and internal engine:

  • Massively Parallel Processing (MPP) architecture: data is distributed among distinct nodes in a shared-nothing architecture, which allows scaling out; in case you’re wondering how it compares to Hadoop Hive, Airbnb did a smack-down comparison, concluding around a 5x advantage of Redshift over Hive;
  • High availability (HA): this follows from the first, thanks to the data replication mechanism; in the case of Vertica, they call it “k-safety”, which measures the replication factor; you may also want to check fault groups to control how data is replicated according to physical distribution (such as server rack location, power circuits, etc.); the same automatic data replication among nodes happens with Redshift under the hood (besides more goodies such as backups);
  • Columnar oriented data store: for analytical applications/OLAP (where queries usually select only specific columns, as opposed to OLTP), it is usually much faster, mainly because a) it does not need to scan the whole row and then discard the unnecessary content, and b) compression/encoding mechanisms are more efficient on data of similar type; for a more detailed explanation, I suggest here. Both Vertica and Redshift are built with this architecture;
  • Data compression: Vertica mixes encoding strategies, depending on column data type, table cardinality, and sort order; they do distinguish between encoding and compression, since Vertica will operate directly on encoded data whenever possible, which does not hold true for compressed data; Redshift does not make such a distinction, and recommends leaving compression in auto mode, although you can choose the encoding type;
  • SQL standard interface: as always, minor differences are present, but bottom line you can use the SQL syntax that you’re already accustomed to;
  • User Defined Functions (UDFs): both Redshift and Vertica give you space for customization;
  • In-memory DB: Nope, neither of these is like SAP HANA, Oracle TimesTen or IBM’s SolidDB.
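The two arguments made above for columnar storage (read only the columns you need, and encode similar values together) can be sketched in plain Python. This is only a toy model, not how Vertica or Redshift store data internally; the sample rows, the helper names and the use of run-length encoding as the example encoding are assumptions for illustration.

```python
from itertools import groupby

# Hypothetical fact table, stored row by row: (day, country, amount)
rows = [
    ("2016-04-22", "DE", 10),
    ("2016-04-22", "DE", 12),
    ("2016-04-22", "PT", 7),
    ("2016-04-23", "PT", 3),
]


def to_columns(rows, names):
    """Pivot row tuples into one list per column (the columnar layout)."""
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


def run_length_encode(values):
    """Collapse runs of equal neighbouring values into (value, count) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]


columns = to_columns(rows, ("day", "country", "amount"))

# a) An aggregate over one column touches only that column's values.
total_amount = sum(columns["amount"])

# b) Sorted, low-cardinality columns shrink a lot once encoded.
encoded_day = run_length_encode(columns["day"])
```

This also hints at why sort order matters so much in both products: the better a column is sorted, the longer the runs and the better the encoding.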

Where they differ:

  • Architecture: in Vertica all nodes are “created equally”, meaning they share similar functions; Redshift has the concept of a leader node, a dedicated node which manages workload and query coordination among worker nodes;
  • Management: (this is a key differentiator that will most likely carry the biggest weight in the final decision) with Vertica you have to do all the ops work (install, upgrade/update, configure nodes, etc.); Redshift is a fully managed cloud solution, where you are left only with purely database-related ops work; Note: yes, HP provides an AMI to easily kickstart projects in the AWS cloud, but come on, this is still not the same thing;
  • Freedom of environment: with Redshift you’re locked in to AWS; with Vertica you can run it wherever you feel like;
  • Schema Design: Vertica provides you with a Designer Tool to easily migrate from traditional RDBMS systems based on their schema (not saying this saves the world, but it can be helpful); this is especially important in the beginning, since columnar data warehouses don’t support indexes; so in Vertica you play with the projection concept, in Redshift with distribution and sort keys (and you better do it well, as it will be key for performance and keeping things balanced);
  • Payment scheme: in Vertica (for data bigger than 1TB, which most certainly is the case) you pay upfront licensing, plus the cost of the machines where you’re running it; with Redshift the costs are all diluted into an hourly price, and that’s it;
  • Compiled code: Redshift claims that the leader node compiles the code for optimal performance at execution time, which the guys at Cake also confirm with this excellent post;
  • Free Trial/usage: Vertica lets you have up to 3 nodes and 1TB of data; Redshift, on the other hand, in case your account is still eligible for the free usage tier (in the first year), lets you try a total of 750 normalized instance hours per month, enough for continuously running one DC1.Large single node, with 160GB of SSD storage;
  • Add-Ons: Vertica Pulse for sentiment analysis and Place for geospatial data analysis;

Finally, you might want to go deeper. Again, I really suggest this excellent post by Cake, which provides performance benchmarks. Benchmarks are always disputable; still, they are always an interesting and important comparison method.

Check out OpenFace

If you’re interested in using machine learning (ML) on image and video datasets, then you might be interested in having a look at a relatively new project called OpenFace (first released in October 2015), with Brandon Amos, Bartosz Ludwiczuk and Mahadev Satyanarayanan as authors.

TL;DR: For the impatient

  • Pitch me: Open source project (aka free for you to use) developed inside Carnegie Mellon University  for face recognition with deep neural networks, with a Python API
  • What do I get from it: improved accuracy and reduced training time
  • Need to see to believe (and so one should)? You can start playing with it with Docker, and check the provided demos

What about it

Even though face recognition research started back in the 1970s, it is still far from stagnant. The usual strategy for solving the problem is divided into three main steps: given an image with a set of faces, first run a face detection algorithm to isolate the faces from the rest, then preprocess each cropped part to reduce its high dimensionality, and finally classify it. However, what makes this whole process so challenging is that many factors add noise along the way: images can be taken from different angles or under different lighting conditions, the face itself changes over time (for example due to age or style), etc.

Now one important fact to point out is that state-of-the-art, top-performing algorithms use convolutional neural networks. OpenFace is inspired by Facebook’s DeepFace and (mainly) Google’s FaceNet systems. The authors present a performance smack-down using the “Labeled Faces in the Wild” (LFW) dataset for evaluation, and achieved some interesting results.

Another interesting point is that, as the authors state, the implementation is tuned for using the model in mobile devices, so the  “[…] key design consideration is a system that gives high accuracy with low training and prediction times“.

Note: In case you are wondering what the difference to OpenBiometrics (OpenBR) is: as stated by the authors of OpenFace on Hacker News, the main difference lies in the approach taken (deep convolutional networks), and OpenFace could potentially be integrated into OpenBR’s pipeline.

Internal Guts

As you might imagine (as with any image/video processing package), dependencies are complex and time consuming to set up, so prepare yourself for some dependency troubleshooting in case your machine is still new to this world.

The project’s API is written in Python 2 – entry point here – given its dependencies on OpenCV and DLib. OpenCV provides the computer vision base, DLib enhances OpenCV face detection ability, numpy for matrix algebra operations and scikit-learn for classification operations.

For training the convolutional network, OpenFace uses Torch, which is scripted in the Lua programming language and runs on LuaJIT. Torch allows the neural networks to be executed either on CPUs or on CUDA-enabled GPUs.

The following illustration was extracted from the recent technical report “OpenFace: A general-purpose face recognition library with mobile applications”, by the authors, and provides interesting insight:


It is important to note that you do have the option of using already pretrained models (which use the CASIA-WebFace and FaceScrub databases) to help with face detection, which you can find in the models directory. The provided bash script downloads them.

Where to get started

To set it up either locally or with Docker, you can check the provided documentation.

Finally, you might also be interested in having a look at other projects using deep neural networks for face recognition:  Visual Geometry Group (VGG) Face Descriptor, and Lightened Convolutional Neural Networks (CNNs)


Spark – Redshift: AWS Roles to the rescue

If you are using AWS to host your applications, you have probably heard that you can also apply IAM Roles to EC2 instances. In a lot of cases this can be a really cool way to avoid passing AWS credentials to your applications, and to spare yourself the pain of managing key distribution among servers and ensuring key rotation mechanisms for security purposes.

This post is about a simple trick on how to take advantage of this feature when your Spark job needs to interact with AWS Redshift.

As can be read in the Databricks repo for the spark-redshift library, there are three (3) strategies for setting up AWS credentials: setting them in the Hadoop configuration (what many people are used to so far with Cloudera or Hortonworks), encoding the keys in the tempdir URI (by far not the best option if you ask me), or using temporary keys. The last strategy is the one discussed here, and it is based on AWS’s own documentation on how to use temporary keys.

So let us start with the Spark Redshift methods.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName(opts.get('app_name'))
sc = SparkContext(conf=conf)
ssc = SQLContext(sc) 

rs_query = "select * from my_table limit 10"
rs_tmp_dir = 's3n://path/for/temp/data'
rs_url = 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'

# Create spark dataframe through a Redshift query
df = ssc.read \
 .format('com.databricks.spark.redshift') \
 .option('url', rs_url) \
 .option('query', rs_query) \
 .option('tempdir', rs_tmp_dir) \
 .option('temporary_aws_access_key_id', sts_credentials.get('AccessKeyId')) \
 .option('temporary_aws_secret_access_key', sts_credentials.get('SecretAccessKey')) \
 .option('temporary_aws_session_token', sts_credentials.get('Token')) \
 .load()


OK, until here almost only spark related code in Python.  Now the only thing you might be wondering is what the hell is that “sts_credentials” map?  Well spotted. The following snippet will reveal it.

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

# Optionally define a default ec2 instance role for EMR instances
# (assumption: EMR's default instance profile name; replace with your own role)
DEFAULT_INSTANCE_ROLE = 'EMR_EC2_DefaultRole'

# EC2 instance metadata endpoint that serves a role's temporary credentials
METADATA_URI = ('http://169.254.169.254/latest/meta-data/'
                'iam/security-credentials/{}')


def get_temp_credentials(role=DEFAULT_INSTANCE_ROLE):
    """ Retrieves temp AWS credentials """
    query_uri = METADATA_URI.format(role)
    print('Querying AWS for credentials - {}'.format(query_uri))
    try:
        sts_credentials = requests.get(query_uri).json()
        if isinstance(sts_credentials, dict) and \
                sts_credentials.get('Code') == 'Success':
            print('Successfully retrieved temp AWS credentials.')
            return sts_credentials
        msg = ('There was a problem when retrieving temp credentials '
               'from AWS. Here\'s the response: \n{}'.format(sts_credentials))
    except (HTTPError, ConnectionError, Timeout) as err:
        msg = 'Unable to query AWS for credentials: {}'.format(err)
    except ValueError as err:
        msg = 'Error: unable to decode json from \'None\' value - {} ' \
              '(hint: most likely the role you are using is wrong)'.format(err)
    except Exception as err:
        msg = 'Failed to get AWS role temp credentials: {}'.format(err)
    # every failure path ends here: report the problem and return nothing
    print(msg)
    return None

# to use this function, you might do something like the following:
role = 'my-emr-cluster-role'
sts_credentials = get_temp_credentials(role=role)
aws_access_key_id = sts_credentials.get('AccessKeyId')
aws_secret_access_key = sts_credentials.get('SecretAccessKey')
token = sts_credentials.get('Token')


Yes, essentially this piece of code is just doing an HTTP request to an AWS service. In case you’re getting suspicious of the black-magic-looking address – “169.254.169.254” – let me tell you in advance that this is a handy service that provides meta-information about your instances.

As a side note, if you are indeed using pyspark jobs, you might want more flexibility depending on whether you’re testing your code locally or actually running the job in an AWS cluster. So, given that the previous snippet will ONLY work if you run it on an AWS EC2 instance AND that instance is assigned the correct IAM role, here is a simple function which either passes credentials directly (if running locally) or uses the role to ask AWS for temp credentials.

def get_redshift_credentials(role=DEFAULT_INSTANCE_ROLE,
                             local_aws_access_key_id=None,
                             local_aws_secret_access_key=None):
    """ Returns temp AWS credentials present in an AWS instance
    Note: only works on AWS machines!
    :param role: (str) AWS instance role
    :param local_aws_access_key_id: (str) optional param for local testing/dev
    :param local_aws_secret_access_key: (str) optional param for local testing/dev
    :return: (str) temp credentials to be used in query
    """
    if not role:
        # assumption: fall back to the default role when none is given
        role = DEFAULT_INSTANCE_ROLE
    sts_credentials = get_temp_credentials(role=role) or dict()
    aws_access_key_id = local_aws_access_key_id or sts_credentials.get('AccessKeyId')
    aws_secret_access_key = local_aws_secret_access_key or sts_credentials.get('SecretAccessKey')
    token = sts_credentials.get('Token')
    redshift_credentials = 'aws_access_key_id={0};aws_secret_access_key={1}' \
        .format(aws_access_key_id, aws_secret_access_key)
    # the session token is only needed when the temp (role) keys are in use
    if token and not all((local_aws_access_key_id, local_aws_secret_access_key)):
        redshift_credentials = '{0};token={1}'.format(redshift_credentials, token)
    return redshift_credentials


Here is a complete gist of this code:

Finally, a potential gotcha: the temporary keys have by default a 30-minute limit, which you can extend when you initially request the temp keys. In any case, you might want to take that into consideration for jobs that can potentially be long running.
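For long running jobs, one simple way to deal with this is to check how much time the keys have left before using them. Below is a minimal sketch; the helper names and the 5 minute safety margin are hypothetical, and it assumes the credentials map carries an 'Expiration' entry formatted like '2016-06-01T12:30:00Z', which is what the instance metadata service returns next to 'AccessKeyId', 'SecretAccessKey' and 'Token'.

```python
from datetime import datetime


def seconds_until_expiry(sts_credentials, now=None):
    """Seconds left before the temp credentials expire.

    Returns None when the map has no 'Expiration' entry.
    """
    expiration = sts_credentials.get('Expiration')
    if not expiration:
        return None
    expires_at = datetime.strptime(expiration, '%Y-%m-%dT%H:%M:%SZ')
    now = now or datetime.utcnow()
    return (expires_at - now).total_seconds()


def needs_refresh(sts_credentials, margin_seconds=300, now=None):
    """True when credentials are missing an expiry or are about to expire."""
    remaining = seconds_until_expiry(sts_credentials, now=now)
    return remaining is None or remaining < margin_seconds


# made-up example values, only to show the calls
example_credentials = {'Expiration': '2016-06-01T12:30:00Z'}
remaining = seconds_until_expiry(example_credentials,
                                 now=datetime(2016, 6, 1, 12, 0, 0))
```

A job could call needs_refresh() before each Redshift read or write and simply request new temp credentials whenever it returns True.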




Distcp S3-hdfs for eu-central-1 (AWS Frankfurt) – details “behind the trenches”


Distcp (distributed copy) is a fairly old tool used to move large quantities of files, usually within HDFS; it uses a MapReduce job to do so, in which the list of source files is split among map tasks that do the copy heavy lifting. Another useful feature is that it can also deal with file migrations between HDFS and AWS S3. In the meantime, S3Distcp came along as a specific enhancement for speeding up this last kind of migration.

However, what might be new for you is the requirement to use either one of these tools to move data from/to a bucket located in S3 Frankfurt region.   So this post intends to save you that half an hour of painful google research trying to figure out why your Oozie job is failing.

Without further ado, before you dive into your Cloudera or Hortonworks Manager to edit Hadoop properties in core-site.xml, I would suggest that you SSH into one of your Hadoop nodes and start by test-running the distcp command. Here is the syntax for moving data from S3 to HDFS:

hadoop distcp -Dfs.s3a.awsAccessKeyId=XXX \
-Dfs.s3a.awsSecretAccessKey=XX \
-Dfs.s3a.endpoint=s3.eu-central-1.amazonaws.com \
s3a://YOUR-BUCKET-NAME/ \

So yeah, in case you haven’t noticed, the black magic pieces in this command that make things work just like in other regions are the “fs.s3a.endpoint” property, as well as the “a” after “s3”.
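If you end up launching this from a script (for instance before wiring it into Oozie), here is a small sketch that assembles the full command as an argument list. The function name and the HDFS destination placeholder are hypothetical; the endpoint default is the public S3 endpoint for eu-central-1, and the property names are the ones used in the command above.

```python
def build_distcp_command(access_key, secret_key, bucket, hdfs_dest,
                         endpoint='s3.eu-central-1.amazonaws.com'):
    """Return the distcp call as an argument list (S3 bucket -> HDFS path)."""
    return [
        'hadoop', 'distcp',
        '-Dfs.s3a.awsAccessKeyId={}'.format(access_key),
        '-Dfs.s3a.awsSecretAccessKey={}'.format(secret_key),
        '-Dfs.s3a.endpoint={}'.format(endpoint),  # the Frankfurt-specific part
        's3a://{}/'.format(bucket),               # note the "a" after "s3"
        hdfs_dest,
    ]


# placeholder values, in the same spirit as the command above
command = build_distcp_command('XXX', 'XX', 'YOUR-BUCKET-NAME',
                               'hdfs:///YOUR-HDFS-PATH/')
printable = ' '.join(command)
```

On a Hadoop node the list could then be handed to subprocess.check_call(command); keeping it as a list avoids shell quoting issues with the keys.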

Also, in case you are moving data from HDFS to S3, besides write access, make sure you also grant delete permissions in the AWS policy attached to the given user account for that S3 bucket (or specific path). This is because Distcp produces temporary files next to the target of your migration in the bucket, which the job needs to be able to remove at the end. Here is an example of an AWS policy to that effect:

    "Version": "2012-10-17",
    "Statement": [
            "Effect": "Allow",
            "Action": [
            "Resource": "arn:aws:s3:::*"
            "Effect": "Allow",
            "Action": [
            "Resource": [
                "arn:aws:s3:::YOUR-BUCKET" ]
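As a complement, here is a minimal sketch that builds a complete policy document of this shape in Python and prints it as JSON. Mind the assumptions: the function name is hypothetical, and the action lists are my own choice to match the permissions described above (list, read, write and delete); adjust them to what your account really needs.

```python
import json


def build_distcp_policy(bucket):
    """Build an IAM policy dict allowing list/read/write/delete on a bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # assumption: lets the user see which buckets exist
                "Effect": "Allow",
                "Action": ["s3:ListAllMyBuckets"],
                "Resource": "arn:aws:s3:::*",
            },
            {
                # assumption: minimum set for copying plus temp file cleanup
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                ],
                "Resource": [
                    "arn:aws:s3:::{}".format(bucket),
                    "arn:aws:s3:::{}/*".format(bucket),
                ],
            },
        ],
    }


policy_json = json.dumps(build_distcp_policy("YOUR-BUCKET"),
                         indent=4, sort_keys=True)
```

The "/*" resource is what makes the object-level actions (get, put, delete) apply to the files inside the bucket, while the bare bucket ARN covers listing.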

Alternatively you might also want to test it using S3Distcp tool. Here is another example:

 hadoop jar s3distcp.jar --src /data/ \
--dest s3a://YOUR-BUCKET-NAME/ \

Note that the s3distcp jar needs to be available locally, on the file system of the host from which you are running the command. If you have the jar in HDFS, here is an example of how you can fetch it:

hadoop fs -copyToLocal /YOUR-JARS-LOCATION-IN-HDFS/s3distcp.jar \

There is also the possibility that you might stumble upon some headaches with the S3Distcp tool. This might prove itself to be handy.

Hope this helped! Where to go next? AWS has a handy best practice guide listing different strategies for data migration here.