Getting through Deep Learning – CNNs (part 1)

The number of available open source libraries making Deep Learning easier to use is growing fast as the hype continues to build. However, without understanding the underlying principles, it just feels like poking around a black box.

In this post (or several, most likely) I will try to give an introduction to Convolutional Neural Networks (CNNs). Note that, for the sake of brevity, I assume that you already know the basics about Neural Networks. If not, I would suggest you go through the following introduction.

This post is part of a tutorial series:

  1. Getting through Deep Learning – CNNs (part 1)
  2. Getting through Deep Learning – TensorFlow intro (part 2)
  3. Getting through Deep Learning – TensorFlow intro (part 3)

Disclaimer: this post uses images and formulas from distinct sources. I would suggest having a look at the complete list of sources at the end of the post, as usual.


In 1958 and 1959 David H. Hubel and Torsten Wiesel performed a series of experiments, from which they concluded that many neurons in the visual cortex focus on a limited region of the visual field.

This insight provided the notion of a local receptive field – a narrow sub-region of the whole visual field which serves as input – thus giving rise to a different architecture than the previously fully connected neural network architecture.

Basics – Convolution Layer

The first thing to realize is that convolutional networks are simply the application of “mini-neural networks” to segments of the input space. In the case of images, this means that neurons in the first convolutional layer are not connected to every single pixel, but only to those in their Receptive Field (RF). The following image (source) shows an illustration of how a convolution layer is built using an image from the famous MNIST dataset – whereby the goal consists in identifying the digits from pictures of handwritten numbers.



OK, let’s break this down. In the MNIST dataset, each image is 28 by 28 pixels – respectively height and width. For a fully connected neural network, the input space for the first layer would be 28 x 28 = 784 pixels, if we were only to include height and width.

However, in a so-called convolution, you would instead apply a mini-neural network to just a single portion/segment of the image – let’s say a 3×3 px (width and height) rectangle. This 3×3 receptive field is also often referred to as a filter or kernel in Deep Learning (DL) lingo.

Next you would slide that kernel over the image, let us say 1 px to the right at each step until we reach the far right end, and then 1 px down until we reach the lower bound. The following animated illustration – credits go to Vincent Dumoulin and Francesco Visin, with an awesome overview of CNN arithmetic available here – shows, step by step, how a convolution layer is built using this 3×3 Receptive Field/Filter/Kernel, producing what is called a Feature Map – a layer full of neurons using the same filter.



Thus a Feature Map can also be thought of as a multitude of these mini-neural networks – whereby each filter – the dark blue rectangle – has its own weights, bias term, and activation function, and produces one output (darker green square).

The following illustration shows a detailed view of the progressive application of these mini-neural networks across filters of the initial input space – again credits go to Vincent Dumoulin and Francesco Visin – producing a Feature Map.


Note that the illustrations used an iterative sliding of a 3×3 kernel in steps of 1 px. However this is not a requirement, as one can use for example a step size of 2, reducing the dimension of the output space. By now you should be realizing that this step size – called the stride in DL lingo – is yet another hyperparameter, just like the filter size.
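To make the filter and stride mechanics concrete, here is a minimal numpy sketch (purely illustrative, not how DL libraries actually implement it) of sliding a 3×3 kernel over a 28×28 input with a configurable stride:

import numpy as np

def convolve2d(image, kernel, stride=1):
    """Naive 2-D convolution (no padding): slide the kernel over the image."""
    k_h, k_w = kernel.shape
    out_h = (image.shape[0] - k_h) // stride + 1
    out_w = (image.shape[1] - k_w) // stride + 1
    feature_map = np.zeros((out_h, out_w))
    for i in range(out_h):
        for j in range(out_w):
            patch = image[i * stride:i * stride + k_h, j * stride:j * stride + k_w]
            feature_map[i, j] = np.sum(patch * kernel)
    return feature_map

image = np.random.rand(28, 28)   # a fake MNIST-sized input
kernel = np.random.rand(3, 3)    # one 3x3 filter (in a CNN its weights would be learned)
print(convolve2d(image, kernel, stride=1).shape)  # (26, 26)
print(convolve2d(image, kernel, stride=2).shape)  # (13, 13)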

OK, we are almost done with the basic concepts related to CNNs: we take a local receptive field – which we call a filter/kernel – slide it over an image in a given step size – which we call the stride – and produce a set of mini neural networks – which we call a feature map.

The missing detail that builds upon the previous knowledge is the fact that we can simultaneously apply different filters to the same input space, thus producing several feature maps as a result. The only restriction is that all feature maps in a given layer have the same dimension. The following illustration from the excellent book “Hands-On Machine Learning with Scikit-Learn and TensorFlow” gives more insight into the intuition of stacking multiple feature maps.


So far we have represented a convolution layer as a thin 2-D output space (a single feature map). However, if one produces several feature maps in one layer, the result is a 3-D volume. And this is where the notion of convolution comes into place: the output of this convolution will have a new height, width and depth.



It is thus frequent to see convolutional networks illustrated as “tall and skinny” boxes (that is, with large height and small depth), progressively getting “shorter and fatter” (smaller height and bigger depth).



Basics – Pooling Layer

Before we go into more detail about the different architectures a convolutional network may have (in the next post of this series), it is important to cover some ground aspects. To start, you are not restricted to using Convolution Layers when creating a new hidden layer. In fact there are two more main types of layers: the Pooling Layer (which we will cover in just a bit) and the Fully-Connected layer (exactly as in a regular Neural Network).

Also note that, similarly to regular Neural Networks, where you can have as many hidden layers as you want, the same goes for CNNs. That is, you can build convolutions that serve as the input space for a next convolution layer or a pooling layer, for example, and so on.

Finally, and again similarly to Neural Networks, the last fully-connected layer will always contain as many neurons as the number of classes to be predicted.

Typical Activation functions

Again, to be perfectly clear, what we do in each step of the sliding filter in a convolution is to apply the dot product of a given set of weights (which we are trying to tune with training), plus a given bias. This is effectively a linear function represented in the following form:

z = W · X + b = Σi wi xi + b

where the weights are a vector represented by W, the Xi would be the pixel values inside a given filter in our example, and b the bias. So what usually happens (except for pooling) is that we pass the output of this function to a neuron, which will then apply a given activation function. The typical activation functions that are usually implemented are:

Softmax – a generalized form of the logistic/sigmoid function, which turns the outputs into probabilities (thus lying in the interval between 0 and 1).


ReLU – the Rectified Linear Unit function outputs max(0, x); that is, it passes positive values through unchanged and clips negative values to zero, so the output is never smaller than zero.


Tanh – the hyperbolic tangent function, which squashes the activations into the range -1 to +1.
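For reference, here is a minimal numpy sketch of these three activations – just the formulas themselves, not any library implementation:

import numpy as np

def softmax(z):
    # subtract the max for numerical stability; outputs sum to 1
    e = np.exp(z - np.max(z))
    return e / e.sum()

def relu(z):
    # element-wise max(0, z)
    return np.maximum(0, z)

def tanh(z):
    # squashes values into the (-1, 1) range
    return np.tanh(z)

z = np.array([-2.0, 0.0, 3.0])
print(softmax(z))  # approximately [0.006, 0.047, 0.946]
print(relu(z))     # [0. 0. 3.]
print(tanh(z))     # approximately [-0.964, 0.0, 0.995]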


Typical Cost Functions

As you know, cost functions are what make the whole training of models possible. Here are three of the main ones, of which cross entropy is probably the most frequently used.

Mean Squared Error –  used to train linear regression models


Hinge Loss – used to train Support Vector Machines (SVM) models



Cross entropy – used to train logistic regression models
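Below is a minimal numpy sketch of these three cost functions in their most common per-sample forms, just to fix the intuition; real library implementations add averaging conventions, regularization and numerical-stability tricks:

import numpy as np

def mean_squared_error(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)

def hinge_loss(y_true, y_pred):
    # y_true expected in {-1, +1}, y_pred is the raw (signed) score
    return np.mean(np.maximum(0, 1 - y_true * y_pred))

def cross_entropy(y_true, y_pred, eps=1e-12):
    # binary form: y_true in {0, 1}, y_pred is a predicted probability
    y_pred = np.clip(y_pred, eps, 1 - eps)
    return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

y = np.array([1, 0, 1])
p = np.array([0.9, 0.2, 0.6])
print(mean_squared_error(y, p))  # ~0.07
print(cross_entropy(y, p))       # ~0.28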



Beyond activation functions applied to convolutions, there are some other useful tricks used when building a Deep Neural Network (DNN), which address the well-known problem of over-fitting.

Pooling Layer – bottom line, pooling layers are used to reduce dimensionality. They sample from the input space, also using a filter/kernel with a given output dimension, and simply apply a reduce function – usually max() (so-called max pooling) or mean() (average pooling).

For example, if one of the kernels covered a 2×2 matrix with the values [[1, 2], [3, 4]], then max pooling would yield 4 as output, whereas average pooling would yield 2.5.
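The same toy example in numpy, just to confirm the arithmetic:

import numpy as np

patch = np.array([[1, 2],
                  [3, 4]])
print(patch.max())   # 4   -> max pooling
print(patch.mean())  # 2.5 -> average pooling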

Dropout – its goal is exactly the same as regularization (it is, after all, a regularization technique); that is, it is intended to reduce over-fitting on out-of-sample data. Initially it was used to simply turn off a portion of the neurons’ outputs at every iteration during training. That is, instead of passing on all the weight dot products computed against the input layer, we randomly (with a given specifiable probability) decide not to pass a given set of outputs on to the next layer.

In case you are wondering whether this is a similar trick to the bagging that Random Forests apply to Decision Trees, the answer is: not quite. Averaging over lots of Decision Trees (which have a high propensity to over-fit data) trained on samples drawn with replacement is computationally doable (by today’s standards). However, the same does not hold true for training many distinct DNNs. So the dropout technique is a practical method to internally average the outputs among layers of a network.
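Here is a minimal numpy sketch of the idea, in the so-called inverted dropout formulation where the surviving outputs are rescaled so their expected value stays unchanged:

import numpy as np

def dropout(layer_output, drop_prob=0.5):
    """Inverted dropout: randomly zero a fraction of the outputs during training."""
    keep_prob = 1.0 - drop_prob
    mask = np.random.binomial(1, keep_prob, size=layer_output.shape)
    # dividing by keep_prob keeps the expected value of the output unchanged
    return layer_output * mask / keep_prob

activations = np.random.rand(1, 8)
print(dropout(activations, drop_prob=0.5))  # roughly half the values zeroed, the rest scaled up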

Final Notes

In part 2 I plan to get into more detail about convolution architectures, as well as provide a coding example in order to bring all these concepts home.

However, I didn’t want to end this post without any code snippet. So even though I am not going to go through it, and just as a demonstration that with the concepts covered before you can already build some intuition around it, here is a code snippet where a training session with Google’s deep learning library TensorFlow is defined with two convolution layers:
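Since the original embedded snippet is not reproduced here, below is a minimal sketch in the spirit of the classical TensorFlow 1.x MNIST tutorials (layer sizes and hyperparameters are illustrative choices of mine, not the exact original code), defining a graph with two convolution layers, max pooling, and a final fully-connected layer trained with cross entropy:

import tensorflow as tf

# Placeholders for MNIST images (28x28 = 784 pixels) and their 10 digit classes
x = tf.placeholder(tf.float32, [None, 784])
y_true = tf.placeholder(tf.float32, [None, 10])
x_image = tf.reshape(x, [-1, 28, 28, 1])

def conv_layer(inputs, kernel_shape):
    """3x3 convolution (stride 1) + ReLU + 2x2 max pooling."""
    weights = tf.Variable(tf.truncated_normal(kernel_shape, stddev=0.1))
    bias = tf.Variable(tf.constant(0.1, shape=[kernel_shape[-1]]))
    conv = tf.nn.conv2d(inputs, weights, strides=[1, 1, 1, 1], padding='SAME')
    activation = tf.nn.relu(conv + bias)
    return tf.nn.max_pool(activation, ksize=[1, 2, 2, 1],
                          strides=[1, 2, 2, 1], padding='SAME')

# Two convolution layers: 1 -> 32 -> 64 feature maps (illustrative sizes)
conv1 = conv_layer(x_image, [3, 3, 1, 32])   # output: 14x14x32 after pooling
conv2 = conv_layer(conv1, [3, 3, 32, 64])    # output: 7x7x64 after pooling

# Final fully-connected layer with as many neurons as classes (10 digits)
flat = tf.reshape(conv2, [-1, 7 * 7 * 64])
w_fc = tf.Variable(tf.truncated_normal([7 * 7 * 64, 10], stddev=0.1))
b_fc = tf.Variable(tf.constant(0.1, shape=[10]))
logits = tf.matmul(flat, w_fc) + b_fc

# Cross entropy cost + optimizer define the training step
cross_entropy = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(labels=y_true, logits=logits))
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

From here you would open a session and repeatedly run train_step on mini-batches of images and labels.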

As usual, here are the sources used for this post:

Spark 2.0: From quasi to proper-streaming?

This post attempts to follow the relatively recent new Spark release – Spark 2.0 – and understand the differences regarding Streaming applications. Why streaming in particular, you may ask? Well, Streaming is the ideal scenario most companies would like to use, and the competition landscape is definitely heating up, especially with Apache Flink and Google’s Apache Beam.

Why is streaming so difficult

There are three main problems when it comes to building real time applications based on streaming data:

  • Data consistency: due to the distributed nature of the architecture, it is possible that at any given point in time some events have been processed in some nodes and not in others, even though these events might actually have occurred before others. For example, you may have a Logout event without a Login event, a close app event without an open app event, etc.
  • Fault tolerance (FT): related to the first problem, on node failure processing engines may try to reprocess an event that had actually already been processed by the failed node, leading to duplicated records and/or inaccurate metrics
  • Dealing with late data/out-of-order data: either because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync data hours later, developers simply must tackle this challenge in application code.

See this post for an excellent detailed explanation.

Rewind to Spark – 1.6.2

Unlike Apache Flink, where batch jobs are the exception and Streaming the default, Apache Spark uses the exact opposite logic: Streaming is the exception and batch the default. To make this clearer, the way Spark Streaming works is by dividing live data streams into batches (sometimes called micro-batches). The continuous streams are represented as a high-level abstraction called a discretized stream or DStream, which internally is represented as a sequence of RDDs.

One thing DStreams have in common with RDDs is that they are executed lazily by the output operations – the equivalent of Actions on RDDs. For example, in the classical word count example, the print method would be the output operation.
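For illustration, here is roughly what that classical word count looks like with the PySpark Streaming API; this is a sketch assuming a hypothetical socket source on localhost:9999, and note that in PySpark the output operation is called pprint():

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='dstream-wordcount')
ssc = StreamingContext(sc, 10)   # micro-batches of 10 seconds

# DStream of lines read from a (hypothetical) socket source
lines = ssc.socketTextStream('localhost', 9999)
counts = lines.flatMap(lambda line: line.split(' ')) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

# pprint() is the output operation that triggers the lazy execution
counts.pprint()

ssc.start()
ssc.awaitTermination()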

Nonetheless, even though the main abstraction to work with was DStreams on top of RDDs, it was already possible to work with Dataframes (and thus SQL) on streaming data before.


Changes in Spark 2.0 – introducing “Structured Streaming”

So the new kid on the block is structured streaming, a feature still in alpha for Spark 2.0. Its marketing slogan: “Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming”. In other words, this is an attempt to simplify the life of all developers struggling with the three main general problems in streaming, while bringing a streaming API to DataFrames and DataSets.

Note that the streaming API has now been unified with the batch API, which means that you should expect (almost always) to use the same method calls exactly as you would in the normal batch API. Of course, some minor exceptions, such as reading from and writing to files, require an extra method call. Here is a copy-paste from the examples shown at Spark Summit in the “A deep dive into structured streaming” presentation (highly recommendable).

Syntax for batch mode:

input ="json").load("source-path")
result ="device", "signal").where("signal > 15")

Now for structured streaming:

input ="json").stream("source-path")
result ="device", "signal").where("signal > 15")

In case you didn’t notice the difference (which is a good sign in this case), what changed is that instead of load()-ing a file, you continuously stream() it, and the same logic applies to writing with startStream(). Pretty cool, right?

Also note that the base premise of Spark streaming discussed before still holds; that is, processing is still based on micro-batching.

Repeated Queries on input table

So besides unifying the API, where’s the magic sauce? The main concept is a new abstraction where streaming data is viewed much like a table in a relational DB. In other words, new incoming data is treated as a new row inserted into an unbounded input table.

The developer has three tasks in the middle of this whole streaming party.

  1. One is to define a query that acts repeatedly on this input table and produces a new table, called the result table, that will be written to an output sink. This query will be analyzed under the hood by Spark (as explained below): the query planner turns it into an execution plan, and Spark will figure out on its own what needs to be maintained in the input table to update the result table.
  2. The second is to specify the triggers that control when to update the results.
  3. The third is to define an output mode, so that Spark knows how it should write to an external system (such as S3, HDFS or a database). There are three alternatives: append (only new rows of the result table will be written), complete (the full result table will be flushed), and update (only rows that were updated since the last trigger will be changed). A small sketch putting these three pieces together follows right after this list.
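Here is that sketch, using the structured streaming API as it later stabilized (readStream/writeStream with outputMode and trigger); the exact method names in the 2.0 alpha differ slightly, as the stream()/startStream() snippets above show, and all paths are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName('structured-streaming-demo').getOrCreate()

# 1. the repeated query over the unbounded input table
# (file-based streaming sources require an explicit schema)
schema = StructType([StructField('device', StringType()),
                     StructField('signal', IntegerType())])
input_stream = spark.readStream.schema(schema).format('json').load('source-path')
result ='device', 'signal').where('signal > 15')

# 2. + 3. the trigger and the output mode, writing to a parquet file sink
query = result.writeStream \
    .format('parquet') \
    .option('path', 'dest-path') \
    .option('checkpointLocation', 'checkpoint-path') \
    .outputMode('append') \
    .trigger(processingTime='10 seconds') \
    .start()

query.awaitTermination()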

Structured Streaming under the hood

Under the hood (and when I say “under the hood” I mean what is done for you that you don’t need to implement in your code – but yes, you should nonetheless care about it) structured streaming is possible due to some considerable changes. The first is that the query planner (the same as for batch mode) generates a continuous series of incremental execution plans, one for each processing of the next chunk of streaming data (for example, when polling from Kafka this would mean the new range of offsets).

Now this can get particularly tricky, because each execution plan needs to take into consideration the aggregations that were held by previous executions. The state of these continuous aggregations is maintained internally in memory, and backed by a Write Ahead Log (WAL) in the file system (for Fault Tolerance). So yes, the query planner is FT by storing the offsets of each execution in a distributed file system such as HDFS.

Moreover, on the FT side, the sinks are also idempotent, which means that one should expect them to handle re-executions and avoid double committing the output.

Finally, before you get all excited, I do want to emphasize that at the moment, this is still in alpha mode, and is explicitly not recommended for production environments.

Not enough? Here are more resources:


Overview HP Vertica vs AWS Redshift

While working at HP some years ago, I was exposed not only to internal training materials, but also to a demo environment. I still remember the excitement when HP acquired Vertica Systems in 2011, and we had a new toy to play with… Come on, you can’t blame me, distributed DBs were something only the cool kids were doing.

Bottom line is that it’s been a while since I laid eyes on it… Well, recently, while considering possible architectural solutions, I had the pleasure to revisit Vertica. And since AWS Redshift has been gaining a lot of popularity and we’re also using it at some of our clients, I thought I might write an easy summary to help others.

Now if you’re expecting a smack-down post, then I’m afraid I’ll disappoint you – for that you have the internet. If experience has taught me anything, it is that with top-notch solutions there are only use cases, and one finds the best fitting one.

They share some properties in terms of architecture and internal engine:

  • Massively Parallel Processing (MPP) architecture: data is distributed among distinct nodes in a shared-nothing architecture, leveraging scale-out; in case you’re wondering how it compares to hadoop Hive, AirBnB did a smack-down comparison, concluding around a 5x advantage of Redshift over Hive;
  • High availability (HA): this follows from the first, thanks to the data replication mechanism; in the case of Vertica, they call it “k-safety” for measuring the replication factor; and you may also want to check fault groups to control how data is replicated according to physical distribution (such as server rack location, power circuits, etc.); the same automatic data replication among nodes happens with Redshift under the hood (besides more goodies such as backups)
  • Columnar oriented data store: for analytical applications/OLAP (where queries usually select only specific columns, in opposition to OLTP), it is usually much faster, mainly because a) it does not need to scan the whole row and then discard the unnecessary content, and b) it achieves higher efficiency in compression/encoding mechanisms due to similar data types; for a more detailed explanation, I suggest here. Both Vertica and Redshift are built with this architecture;
  • Data compression: Vertica mixes encoding strategies, depending on column data type, table cardinality, and sort order; they do distinguish between encoding and compression, since Vertica operates directly on encoded data whenever possible, which does not hold true for compression; Redshift does not make such a distinction, and recommends leaving compression in auto mode, although you can choose the encoding type;
  • SQL standard interface: as always, minor differences are present, but bottom line you can use the SQL syntax that you’re already accustomed to
  • User Defined Functions (UDFs): both Redshift and Vertica give you space for customization;
  • In-memory DB: Nope, neither of these is like SAP HANA, Oracle TimesTen or IBM’s SolidDB

Where they differ:

  • Architecture: in Vertica all nodes are “created equal”, meaning they share similar functions; Redshift has the concept of a leader node, a dedicated node which manages workload and query coordination among worker nodes;
  • Management: (this is a key differentiator that most likely has the biggest weight in the final decision) with Vertica you have to do all the ops work (install, upgrade/update, configure nodes, etc.); Redshift is a fully managed Cloud solution, where you only have pure Database-related ops work; Note: yes, HP provides an AMI to easily kickstart projects in the AWS cloud, but come on, this is still not the same thing;
  • Freedom of environment: with Redshift you’re locked in to AWS; with Vertica you can run it wherever you feel like;
  • Schema Design: Vertica provides you with a Designer Tool to easily migrate from traditional RDBMS systems based on their schema (not saying this saves the world, but it can be helpful); this is especially important in the beginning, since columnar data warehouses don’t support indexes; so in Vertica you play with the projection concept, in Redshift with distribution and sort keys (and you better do it well, as it will be key for performance and for keeping things balanced)
  • Payment scheme: in Vertica (for data bigger than 1TB, which most certainly is the case) you pay upfront licensing, plus the cost of the machines where you’re running it; with Redshift, costs are all diluted into an hourly cost, and that’s it;
  • Compiled code: Redshift claims that the leader node compiles the code for optimal performance on execution time, which the guys at Cake also confirm with this excellent post;
  • Free Trial/usage: Vertica lets you have up to 3 nodes and 1TB of data; Redshift, on the other hand, in case your account is still eligible for the free usage tier (in the first year), lets you try a total of 750 normalized instance hours per month, enough for running one DC1.Large single node continuously, with 160GB of SSD storage
  • Add-Ons: Vertica Pulse for sentiment analysis and Place for geospatial data analysis;

Finally, you might want to go deeper. Again, I really suggest this excellent post by Cake, which provides performance benchmarks. Benchmarks are always disputable; but still, they are an interesting and important comparison method.

Check out OpenFace

If you’re interested in using machine learning (ML) on image and video datasets, then you might be interested in having a look at a relatively new project called OpenFace (first released in October 2015), with Brandon Amos, Bartosz Ludwiczuk and Mahadev Satyanarayanan as authors.

TL;DR: For the impatient

  • Pitch me: Open source project (aka free for you to use) developed inside Carnegie Mellon University  for face recognition with deep neural networks, with a Python API
  • What do I get from it: improved accuracy and reduced training time
  • Need to see to believe (and so one should)? You can start playing with it with Docker, and check the provided demos

What about it

Even though face recognition research started back in the 1970s, it is still far from stagnant. The usual strategy for solving the problem has been divided into three main steps: given an image with a set of faces, first run a face detection algorithm to isolate the faces from the rest, then preprocess the cropped parts to reduce the high dimensionality, and finally classify them. However, what makes this whole process so challenging is that many factors can create noise around it, such as: images can be taken from different angles and in different lighting conditions, the face itself also suffers changes throughout time (for example due to age or style), etc.

Now one important fact to point out is that the state-of-the-art, top-performing algorithms are using convolutional neural networks. OpenFace is inspired by Facebook’s DeepFace and (mainly) Google’s FaceNet systems. The authors present a performance smack-down using the “Labeled Faces in the Wild” (LFW) dataset for evaluation, and achieved some interesting results.

Another interesting point is that, as the authors state, the implementation is tuned for using the model in mobile devices, so the  “[…] key design consideration is a system that gives high accuracy with low training and prediction times“.

Note: in case you are wondering what the difference to OpenBiometrics (OpenBR) is: as stated by the authors of OpenFace on HackerNews, the main difference lies in the approach taken – deep convolutional networks – and OpenFace could potentially be integrated into OpenBR’s pipeline.

Internal Guts

As you might imagine (as with any image/video processing package), the dependencies are complex and time consuming to install, so prepare yourself for some dependency troubleshooting in case your machine is still new to this world.

The project’s API is written in Python 2 – entry point here – given its dependencies on OpenCV and DLib. OpenCV provides the computer vision base, DLib enhances OpenCV’s face detection ability, numpy is used for matrix algebra operations and scikit-learn for classification.

For training the convolutional network OpenFace uses Torch, which is written in Lua and runs on LuaJIT. Torch allows the neural networks to be executed either on CPUs or on CUDA-enabled GPUs.

The following illustration was extracted from the recent technical report “OpenFace: A general-purpose face recognition library with mobile applications”, by the authors, and provides interesting insight:


It is also important to note that you do have the option to use already pretrained models (which use the CASIA-WebFace and FaceScrub databases) to help with face detection; you can find them in the models directory, and the provided bash script downloads them.
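To give a feel for the Python API, here is a minimal sketch loosely adapted from the project’s demos; the method names and pretrained model file names below reflect my reading of the repository at the time, so double-check them against the current documentation:

import cv2
import openface

# Paths to dlib's landmark predictor and the pretrained Torch model
# (both downloaded by the models/ directory script mentioned above; paths are placeholders)
align = openface.AlignDlib('models/dlib/shape_predictor_68_face_landmarks.dat')
net = openface.TorchNeuralNet('models/openface/nn4.small2.v1.t7', imgDim=96)

# Load an image, detect the largest face, align it, and compute its 128-d embedding
bgr_img = cv2.imread('some_face_image.jpg')
rgb_img = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2RGB)
bounding_box = align.getLargestFaceBoundingBox(rgb_img)
aligned_face = align.align(96, rgb_img, bounding_box,
                           landmarkIndices=openface.AlignDlib.OUTER_EYES_AND_NOSE)
embedding = net.forward(aligned_face)   # 128-dimensional face representation

# The embedding can then be fed to any scikit-learn classifier for recognition
print(embedding.shape)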

Where to get started

To setup either locally or with Docker you can check the provided documentation.

Finally, you might also be interested in having a look at other projects using deep neural networks for face recognition:  Visual Geometry Group (VGG) Face Descriptor, and Lightened Convolutional Neural Networks (CNNs)


Spark – Redshift: AWS Roles to the rescue

If you are using AWS to host your applications, you have probably heard that you can apply IAM Roles also to EC2 instances. In a lot of cases this can be a really cool way to avoid passing AWS credentials to your applications, and to avoid the pain of managing key distribution among servers, as well as ensuring key rotation mechanisms for security purposes.

This post is about a simple trick on how to take advantage of this feature when your Spark job needs to interact with AWS Redshift.

As can be read in the Databricks repo for the Spark-Redshift library, there are three (3) strategies for setting up AWS credentials: either set them up in the hadoop configuration (which many people are used to by now with Cloudera or HortonWorks), encode the keys in the tempdir path (by far not the best option if you ask me), or use temporary keys. The last strategy is the one being discussed here, and it’s based on AWS’s own documentation on how to use temporary keys.

So let us start on Spark Redshift methods.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName(opts.get('app_name'))  # opts: assumed to be your own dict of app options
sc = SparkContext(conf=conf)
ssc = SQLContext(sc) 

rs_query = "select * from my_table limit 10"
rs_tmp_dir = 's3n://path/for/temp/data'
rs_url = 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'

# Create a Spark dataframe through a Redshift query
df = \
    .format('com.databricks.spark.redshift') \
    .option('url', rs_url) \
    .option('query', rs_query) \
    .option('tempdir', rs_tmp_dir) \
    .option('temporary_aws_access_key_id', sts_credentials.get('AccessKeyId')) \
    .option('temporary_aws_secret_access_key', sts_credentials.get('SecretAccessKey')) \
    .option('temporary_aws_session_token', sts_credentials.get('Token')) \
    .load()


OK, up until here it is almost only Spark-related code in Python. Now the only thing you might be wondering is: what the hell is that “sts_credentials” map? Well spotted. The following snippet will reveal it.

import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout

# Optionally define a default ec2 instance role for EMR instances
# (the role name below is just a placeholder - use your own)
DEFAULT_INSTANCE_ROLE = 'my-default-emr-instance-role'

def get_temp_credentials(role=DEFAULT_INSTANCE_ROLE):
    """ Retrieves temp AWS credentials from the EC2 instance metadata service """
    query_uri = '{}'.format(role)
    print('Querying AWS for credentials - {}'.format(query_uri))
    try:
        sts_credentials = requests.get(query_uri).json()
        if isinstance(sts_credentials, dict) and \
                sts_credentials.get('Code') == 'Success':
            print('Successfully retrieved temp AWS credentials.')
            return sts_credentials
        print('There was a problem when retrieving temp credentials '
              'from AWS. Here\'s the response: \n{}'.format(sts_credentials))
    except (HTTPError, ConnectionError, Timeout) as err:
        print('Unable to query AWS for credentials: {}'.format(err))
    except ValueError as err:
        print('Error: unable to decode json from \'None\' value - {} '
              '(hint: most likely the role you are using is wrong)'.format(err))
    except Exception as err:
        print('Failed to get AWS role temp credentials: {}'.format(err))

# to use this function, you might do something like the following:
role = 'my-emr-cluster-role'
sts_credentials = get_temp_credentials(role=role)
aws_access_key_id = sts_credentials.get('AccessKeyId')
aws_secret_access_key = sts_credentials.get('SecretAccessKey')
token = sts_credentials.get('Token')


Yes, essentially this piece of code is just doing an HTTP request to an AWS service. In case you’re getting suspicious of the black-magic-looking address – “” – let me tell you in advance that this is a handy service providing meta-information about your instances.

As a side note, if you are indeed using pyspark jobs, you might want to allow more flexibility depending on whether you are testing your code locally or actually running the job in an AWS cluster. So, given the fact that the previous snippet will ONLY work if you run it on an AWS EC2 instance AND that instance is assigned the correct IAM role, here is a simple function which either passes credentials directly (if local) or uses the role to ask AWS for temp credentials.

def get_redshift_credentials(role=DEFAULT_INSTANCE_ROLE,
                             local_aws_access_key_id=None,
                             local_aws_secret_access_key=None,
                             opts=None):
    """ Returns temp AWS credentials present in an AWS instance
    Note: only works on AWS machines!
    :param role: (str) AWS instance role
    :param local_aws_access_key_id: (str) optional param for local testing/dev
    :param local_aws_secret_access_key: (str) optional param for local testing/dev
    :param opts: () optional extra options
    :return: (str) temp credentials to be used in query
    """
    if not role:
        role = DEFAULT_INSTANCE_ROLE
    sts_credentials = get_temp_credentials(role=role) or dict()
    aws_access_key_id = local_aws_access_key_id or sts_credentials.get('AccessKeyId')
    aws_secret_access_key = local_aws_secret_access_key or sts_credentials.get('SecretAccessKey')
    token = sts_credentials.get('Token')
    redshift_credentials = 'aws_access_key_id={0};aws_secret_access_key={1}' \
        .format(aws_access_key_id, aws_secret_access_key)
    if token and not all((local_aws_access_key_id, local_aws_secret_access_key)):
        redshift_credentials = '{0};token={1}'.format(redshift_credentials, token)
    return redshift_credentials


Here is a complete gist of this code:

Finally, some potential gotchas: the temporary keys have by default a 30-minute limit, which you can extend when you initially request them. In any case, you might want to take that into consideration for jobs that can potentially be long running.




Distcp S3-hdfs for eu-central-1 (AWS Frankfurt) – details “behind the trenches”


Distcp (distributed copy) is a fairly old tool used to move a large quantity of files, usually within hdfs, using a MapReduce job to parallelize the copy heavy lifting. Another useful integration is that it can also deal with file migrations between hdfs and AWS S3. In the meanwhile, S3DistCp came along as a specific enhancement for speeding up this last kind of migration.

However, what might be new to you is what is required to use either one of these tools to move data from/to a bucket located in the S3 Frankfurt region. So this post intends to save you that half hour of painful google research trying to figure out why your Oozie job is failing.

Without further ado, before you dive into your Cloudera or Hortonworks Manager to edit hadoop properties in core-site.xml, I would suggest that you SSH into one of your hadoop nodes and start by testing the distcp command. Here is the syntax for moving data from S3 to HDFS:

hadoop distcp \
-Dfs.s3a.awsAccessKeyId=XXX \
-Dfs.s3a.awsSecretAccessKey=XXX \ \
s3a://YOUR-BUCKET-NAME/ \
/YOUR-HDFS-DESTINATION-PATH/

So yeah, in case you haven’t noticed, the black magic pieces in this command that make things work just like in other regions are: the “fs.s3a.endpoint” property, as well as the “a” after “s3”.

Also, in case you are moving data from hdfs to S3, besides write access, make sure you also grant delete permissions in the AWS policy attached to the given user account for that S3 bucket (or specific path). This is because Distcp produces temporary files adjacent to your bucket target during the migration, which you want to allow the reducers to remove in the end. Here is an example of an AWS policy for that effect:

    "Version": "2012-10-17",
    "Statement": [
            "Effect": "Allow",
            "Action": [
            "Resource": "arn:aws:s3:::*"
            "Effect": "Allow",
            "Action": [
            "Resource": [
                "arn:aws:s3:::YOUR-BUCKET" ]

Alternatively you might also want to test it using the S3DistCp tool. Here is another example:

hadoop jar s3distcp.jar --src /data/ \
--dest s3a://YOUR-BUCKET-NAME/ \

Note that the s3distcp jar needs to be locally on the host file system from which you are running the command. If you have the jar in hdfs, here is an example how you can fetch it:

hadoop fs -copyToLocal /YOUR-JARS-LOCATION-IN-HDFS/s3distcp.jar \
/YOUR-LOCAL-DESTINATION-PATH/
There is also the possibility that you might stumble upon some headaches with the S3DistCp tool. This might prove handy.

Hope this helped! Where to go next? AWS has a handy best practice guide listing different strategies for data migration here.



Getting started with Spark (part 2) – SparkSQL

Update: this tutorial has been updated mainly for Spark 1.6.2 (with a minor detail regarding Spark 2.0), which is not the most recent version of Spark at the time of updating this post. Nonetheless, for the operations exemplified you can pretty much rest assured that the API has not changed substantially. I will do my best to update and point out these differences when they occur.

SparkSQL has become one of the core modules of the Spark ecosystem (with its API already available for all currently supported languages, namely Scala, Java, Python, and R), and is intended to allow you to structure data into named columns (instead of raw, schema-less data as with RDDs), which can be queried with SQL syntax taking advantage of a distributed query engine running in the background.

In practical terms, you can very much think of it as an attempt to emulate the APIs of the popular dataframes in R or Python pandas, as well as SQL tables. However, if I do my job correctly, I will succeed in showing you that they are not quite the same (and hopefully more).

In order for you to use SparkSQL you need to import SQLContext. Let’s start how you would do it with Python:

# import required libs
from pyspark import SparkConf, SparkContext
# here is the import that allows you to use dataframes
from pyspark.sql import SQLContext

# initialize context in Spark 1.6.2
conf = SparkConf().setMaster('local[2]').setAppName('SparkSQL-demo')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

Note: in the new Spark 2.0, this would indeed vary as such:

# import required libs
from pyspark.sql import SparkSession
# initialize context in Spark 2.0
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\
    <span class="o">.</span><span class="n">builder</span>\
    <span class="o">.</span><span class="n">appName</span><span class="p">(</span>'SparkSQL-demo')<span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>

Good news, Scala does not vary much:

//import required libs
import org.apache.spark.{SparkConf, SparkContext}
//here is the import that allows you to use dataframes
import org.apache.spark.sql.SQLContext

// initialize context in Spark 1.6.2
val conf = new SparkConf().setMaster("local[2]").setAppName("SparkSQL-demo")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)


A dataframe can be created from an existing RDD, as well as, similarly to RDDs, from a variety of different sources such as text files, HDFS, S3, Hive tables, external databases, etc. Here’s an example of how to build a dataframe by reading from S3 (in Python):

df ='s3n://my-bucket/example/file.json')


Note: we use the “s3n” prefix, which is valid for a series of regions in AWS, but not for all of them. For example, to access S3 buckets in the Frankfurt region you would rather use “s3a” as a prefix.
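As a hedged example (the property names are the standard hadoop s3a ones and the keys are placeholders, so adapt them to your distribution), you could point the s3a connector at the Frankfurt endpoint through the hadoop configuration before reading:

# configure the s3a endpoint for Frankfurt (eu-central-1) plus credentials
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.endpoint', '')
hadoop_conf.set('fs.s3a.access.key', 'YOUR-ACCESS-KEY')
hadoop_conf.set('fs.s3a.secret.key', 'YOUR-SECRET-KEY')

df ='s3a://my-bucket/example/file.json')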

What sets Spark Dataframes apart

However, and even though the goal is to hide the maximum amount of disparities, there are nonetheless big conceptual differences between a Spark Dataframe and a Pandas dataframe. The first is that immutability has (thank God) not been removed. In other words, when you apply a transformation on a Dataframe, it returns a new Dataframe that you need to save into a variable. I’ve seen a lot of data scientists stumble on this in their first Spark days, so let us illustrate this in a very simple example:

# Create a Dataframe with pseudo data:
# Note that we create an RDD, and then cast it to
# Dataframe by providing a schema
column_names = ["customer_name", "date", "category",
"product_name", "quantity", "price"]
df01 = sc.parallelize(
[("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00)]).toDF(column_names)

df02 ="customer_name", "date", "category", "product_name", "price",
                   (df01['quantity']+1).alias("quantity"))

# df01 remains unchanged:

|customer_name|      date|category|product_name|quantity|price|
|     Geoffrey|2016-04-22|       A|      apples|       1| 50.0|
|         Yann|2016-04-22|       B|        Lamp|       1| 38.0|

# .. where df02 is, as expected:

|customer_name|      date|category|product_name|price|quantity|
|     Geoffrey|2016-04-22|       A|      apples| 50.0|       2|
|         Yann|2016-04-22|       B|        Lamp| 38.0|       2|

The second most obvious difference (compared to a pandas df) is the distributed nature of a Spark Dataframe, just as with RDDs. As a matter of fact, in previous versions of Spark, Dataframes were even called SchemaRDD. So internally the same redundancy principles of RDDs apply, and you should in fact still keep in mind the Driver – Executor programming model, as well as the Actions and Transformations concept.

Also important to note is that Spark Dataframes use internally a query optimizer, one of the reasons why the performance difference between Spark code written natively in Scala vs Python or Java drops significantly, as can be seen in the next illustration provided by databricks:




Let us view some useful operations one can perform with Spark dataframes. For the sake of brevity I will continue this tutorial mainly using the Python API, as the syntax does not really vary that much.

Let us create a bigger fake (and utterly nonsensical) dataset to make operations more interesting:

customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27", "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])


The dataset is minimal for this example, so it really does not make a difference to cache it in memory. But my point is to illustrate that, similarly to RDDs, you should also care about memory persistence, and can indeed use a similar API. Moreover, since next we are going to use several distinct operations of type “Action” (yielding back results to the driver), it is good practice to signal that this dataframe should be kept in memory for further operations.

print(customers.is_cached)
# False
print(customers.is_cached)
# True

OK, so yes, there are some exceptions, and in this case as you can see the cache() method does not obey the laws of immutability. Moving on, let’s take a peek at it:

|customer_name|      date|category|   product_name|quantity|price|
|     Geoffrey|2016-04-22|       A|         apples|       1| 50.0|
|     Geoffrey|2016-05-03|       B|           Lamp|       2| 38.0|
|     Geoffrey|2016-05-03|       D|   Solar Pannel|       1| 29.0|
|     Geoffrey|2016-05-03|       A|         apples|       3| 50.0|
|     Geoffrey|2016-05-03|       C|           Rice|       5| 15.0|
|     Geoffrey|2016-06-05|       A|         apples|       5| 50.0|
|     Geoffrey|2016-06-05|       A|        bananas|       5| 55.0|
|     Geoffrey|2016-06-15|       Y|    Motor skate|       7| 68.0|
|     Geoffrey|2016-06-15|       E|Book: The noose|       1|125.0|
|         Yann|2016-04-22|       B|           Lamp|       1| 38.0|
only showing top 10 rows


As previously mentioned, one advantage over RDDs is the ability to use standard SQL syntax to slice and dice over a Spark Dataframe. Let’s check for example all the distinct products that we have:

spark_sql_table = 'customers'
sql_qry = "select distinct(product_name) from {table}".format(table=spark_sql_table)
distinct_product_names_df01 = sqlContext.sql(sql_qry), truncate=False)

|product_name              |
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |


Similarly to how we work with RDDs, with Spark DataFrames you are not restricted to SQL, and can indeed call methods on dataframes, which yields a new dataframe (remember the immutability story, right?)

Let’s repeat the previous example of getting all distinct product names, this time with dataframe methods:

distinct_product_names_df02 ='product_name').distinct(), truncate=False)

|product_name |
|space ship |
|Book: The noose |
|Rice |
|Motor skate |
|Book: Crime and Punishment|
|Recycle bin |
|Lamp |
|bananas |
|Solar Pannel |
|apples |


Note that you can as well work on a dataframe just like you iterate on an RDD, which causes the dataframe to be cast into a RDD in the background:

distinct_product_names_rdd = row: row[3])
# calling will yield an exception
except Exception as err:
    print('Error: {}'.format(err))
    print('BUT calling .take() this works, as it is an RDD now: {}' \
        .format(distinct_product_names_rdd.take(3)))

distinct_product_names = distinct_product_names_rdd.distinct().collect()
print('Distinct product names are: {}'.format(distinct_product_names))
print('\n... And, as expected, they are {} in total.' \
    .format(len(distinct_product_names)))
Error: 'PipelinedRDD' object has no attribute 'show'
BUT calling .take() this works, as it is an RDD now: ['apples', 'Lamp', 'Solar Pannel']
Distinct product names are: ['Rice', 'bananas', 'space ship', 'Book: The noose', 'Solar Pannel', 'Recycle bin', 'apples', 'Lamp', 'Book: Crime and Punishment', 'Motor skate']

... And, as expected, they are 10 in total.


Similarly to R or Python pandas, you can get summary statistics. Let’s get statistics for the category, quantity and price columns:

customers.describe('category', 'quantity', 'price').show()
|summary|category|          quantity|             price|
|  count|      39|                39|                39|
|   mean|    null| 4.153846153846154| 68.94871794871794|
| stddev|    null|2.9694803863945936|59.759583559965165|
|    min|       A|                 1|              15.0|
|    max|       Z|                15|             227.0|


Being a categorical variable, as you can see, the summary regarding the ‘category’ column does not make much sense. We’ll improve on this later, but for now let’s do something simple and try to look into product_name and category.

Let’s suppose we are interested in finding out how purchasing behavior varies among categories, namely what the average frequency of purchases per category is.

cat_freq_per_cust = customers.stat.crosstab('customer_name', 'category')

|customer_name_category|  Y|  Z|  A|  B|  C|  D|  E|
|              Geoffrey|  1|  0|  4|  1|  1|  1|  1|
|                Yoshua|  1|  2|  3|  1|  1|  1|  0|
|                  Yann|  2|  1|  2|  2|  1|  3|  2|
|                Jurgen|  2|  1|  3|  0|  1|  1|  0|


cols = cat_freq_per_cust.columns

|summary|Y                 |Z                |A                |B                |C  |D                 |E                 |
|count  |4                 |4                |4                |4                |4  |4                 |4                 |
|mean   |1.5               |1.0              |3.0              |1.0              |1.0|1.5               |0.75              |
|stddev |0.5773502691896257|0.816496580927726|0.816496580927726|0.816496580927726|0.0|0.9999999999999999|0.9574271077563381|
|min    |1                 |0                |2                |0                |1  |1                 |0                 |
|max    |2                 |2                |4                |2                |1  |3                 |2                 |


Note: the “count” column does not have much meaning here; it simply counts the number of rows blindly, and as expected, since rows without a given value are filled with zero instead of Null, it counts them too.

Let’s check the same for products.

# Let's view products vertically, for formatting reasons
customers.stat.crosstab('product_name', 'customer_name' )\
    .show(truncate=False)

|product_name_customer_name|Geoffrey|Jurgen|Yann|Yoshua|
|Book: Crime and Punishment|0       |0     |1   |0     |
|Recycle bin               |0       |1     |2   |1     |
|Solar Pannel              |1       |0     |1   |0     |
|space ship                |0       |1     |1   |2     |
|Motor skate               |1       |2     |2   |1     |
|Rice                      |1       |1     |1   |1     |
|apples                    |3       |0     |0   |1     |
|Lamp                      |1       |0     |2   |1     |
|bananas                   |1       |3     |2   |2     |
|Book: The noose           |1       |0     |1   |0     |


freq_products = customers.stat.crosstab('customer_name', 'product_name')
freq_prod_cols = freq_products.columns

|summary|          bananas|       space ship|   Book: The noose|Rice|       Motor skate|             Lamp|            apples|Book: Crime and Punishment|      Recycle bin|      Solar Pannel|
|  count|                4|                4|                 4|   4|                 4|                4|                 4|                         4|                4|                 4|
|   mean|              2.0|              1.0|               0.5| 1.0|               1.5|              1.0|               1.0|                      0.25|              1.0|               0.5|
| stddev|0.816496580927726|0.816496580927726|0.5773502691896257| 0.0|0.5773502691896257|0.816496580927726|1.4142135623730951|                       0.5|0.816496580927726|0.5773502691896257|
|    min|                1|                0|                 0|   1|                 1|                0|                 0|                         0|                0|                 0|
|    max|                3|                2|                 1|   1|                 2|                2|                 3|                         1|                2|                 1|

Alternatively we could have answered this using a more SQL-like approach:

# we need the functions module for the aggregation functions
from pyspark.sql import functions as func

cust_prod = customers.groupBy('customer_name', 'product_name') \
    .agg(func.count('product_name')) \
    .orderBy('product_name', 'customer_name'), truncate=False)

|customer_name|product_name              |count(product_name)|
|Yann         |Book: Crime and Punishment|1                  |
|Geoffrey     |Book: The noose           |1                  |
|Yann         |Book: The noose           |1                  |
|Geoffrey     |Lamp                      |1                  |
|Yann         |Lamp                      |2                  |
|Yoshua       |Lamp                      |1                  |
|Geoffrey     |Motor skate               |1                  |
|Jurgen       |Motor skate               |2                  |
|Yann         |Motor skate               |2                  |
|Yoshua       |Motor skate               |1                  |
|Jurgen       |Recycle bin               |1                  |
|Yann         |Recycle bin               |2                  |
|Yoshua       |Recycle bin               |1                  |
|Geoffrey     |Rice                      |1                  |
|Jurgen       |Rice                      |1                  |
|Yann         |Rice                      |1                  |
|Yoshua       |Rice                      |1                  |
|Geoffrey     |Solar Pannel              |1                  |
|Yann         |Solar Pannel              |1                  |
|Geoffrey     |apples                    |3                  |
|Yoshua       |apples                    |1                  |
|Geoffrey     |bananas                   |1                  |
|Jurgen       |bananas                   |3                  |
|Yann         |bananas                   |2                  |
|Yoshua       |bananas                   |2                  |
|Jurgen       |space ship                |1                  |
|Yann         |space ship                |1                  |
|Yoshua       |space ship                |2                  |


cust_prod.groupBy('product_name') \
    .agg(func.avg('count(product_name)')) \
    .show(truncate=False)
|product_name              |avg(count(product_name))|
|space ship                |1.3333333333333333      |
|Book: The noose           |1.0                     |
|Rice                      |1.0                     |
|Motor skate               |1.5                     |
|Book: Crime and Punishment|1.0                     |
|Recycle bin               |1.3333333333333333      |
|Lamp                      |1.3333333333333333      |
|bananas                   |2.0                     |
|Solar Pannel              |1.0                     |
|apples                    |2.0                     |

Note that this does not take into account the quantity of each product bought, just how many times a client was there to buy a given item. For our last question that may be OK, because we understand the different nature of the products.
But let us assume it is not OK, and we do need to count by quantity. For that we can use pivot dataframes, a feature introduced in Spark 1.6:

total_products_bought_by_customer = customers.groupBy('customer_name')\
    .pivot('product_name').sum('quantity')

|customer_name|Book: Crime and Punishment|Book: The noose|Lamp|Motor skate|Recycle bin|Rice|Solar Pannel|apples|bananas|space ship|
|         Yann|                         5|              5|   3|          2|         10|  15|           5|  null|      6|         1|
|       Yoshua|                      null|           null|   2|          4|          5|   5|        null|    10|     18|         7|
|     Geoffrey|                      null|              1|   2|          7|       null|   5|           1|     9|      5|      null|
|       Jurgen|                      null|           null|null|          3|          5|   5|        null|  null|     15|         1|


Hope this has been a good introduction, and I suggest that you further check out this great post from databricks covering similar operations.

For the last part of this tutorial series I plan to cover window functions, something that, until the more recent Spark 2.0, was only possible thanks to HiveContext.