Spark 2.0: From quasi to proper-streaming?

This post attempts to follow the recent Spark release – Spark 2.0 – and understand what changed for streaming applications. Why streaming in particular, you may ask? Well, streaming is the ideal scenario most companies would like to use, and the competition landscape is definitely heating up, especially with Apache Flink and Apache Beam (originally donated by Google).

Why is streaming so difficult

There are three main problems when it comes to building real time applications based on streaming data:

  • Data consistency: due to the distributed nature of the architecture, at any given point in time some events may have been processed on some nodes and not on others, even though these events might actually have occurred before others. For example, you may see a Logout event without a Login event, a close-app event without an open-app event, etc.
  • Fault tolerance (FT): related to the first problem, on node failure, processing engines may try to reprocess an event that had actually already been processed by the failed node, leading to duplicated records and/or inaccurate metrics.
  • Dealing with late/out-of-order data: whether because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync data hours later, developers must tackle this challenge in application code.
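To make the last two problems more concrete, here is a minimal single-process sketch in plain Python. It does not use Spark's API. `windowed_counts`, the tuple layout and the thresholds are all hypothetical. The function counts events per event-time tumbling window and ignores replayed duplicates by event id. It also drops events that arrive behind a simple watermark, defined here as the highest event time seen so far minus an allowed lateness:

```python
from collections import defaultdict


def windowed_counts(events, window_secs, allowed_lateness_secs):
    """Count events per event-time tumbling window.

    `events` is an iterable of (event_id, event_time_secs) tuples in
    *arrival* order, which may differ from event-time order.
    Returns (counts_per_window_start, ids_dropped_as_too_late).
    """
    seen_ids = set()
    counts = defaultdict(int)
    dropped = []
    max_event_time = float("-inf")
    for event_id, event_time in events:
        if event_id in seen_ids:
            continue  # replayed duplicate (e.g. after a node failure): count once
        # the watermark trails the highest event time observed so far
        watermark = max_event_time - allowed_lateness_secs
        if event_time < watermark:
            dropped.append(event_id)  # too late to be included in its window
            continue
        seen_ids.add(event_id)
        max_event_time = max(max_event_time, event_time)
        window_start = event_time - (event_time % window_secs)
        counts[window_start] += 1
    return dict(counts), dropped


# "a" is replayed, "c" arrives out of order but within the allowed lateness,
# and "e" arrives far too late
arrivals = [("a", 5), ("b", 12), ("a", 5), ("c", 3), ("d", 25), ("e", 1)]
counts, dropped = windowed_counts(arrivals, window_secs=10, allowed_lateness_secs=10)
print(counts)   # {0: 2, 10: 1, 20: 1}
print(dropped)  # ['e']
```

A real engine has to do this bookkeeping in a distributed and fault-tolerant way, across nodes that can fail. That is exactly what makes the problem hard.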

See this post for an excellent detailed explanation.

Overview HP Vertica vs AWS Redshift

While working at HP some years ago, I was exposed not only to internal training materials, but also to a demo environment. I still remember the excitement when HP acquired Vertica Systems in 2011 and we had a new toy to play with… Come on, you can’t blame me: distributed DBs were something only the cool kids were doing.

Bottom line: it’s been a while since I laid eyes on it… Recently, while considering possible architectural solutions, I had the pleasure of revisiting Vertica. And since AWS Redshift has been gaining a lot of popularity, and we’re also using it at some of our clients, I thought I might write a short summary to help others.

Now, if you’re expecting a smackdown post, then I’m afraid I’ll disappoint you – for that you have the internet. If experience has taught me anything, it is that, in the case of top-notch solutions, there are only use cases, and one should find the best-fitting one.

Check out OpenFace

If you’re interested in using machine learning (ML) on image and video datasets, then you might want to have a look at a relatively new project called OpenFace (first released in October 2015), authored by Brandon Amos, Bartosz Ludwiczuk and Mahadev Satyanarayanan.

TL;DR: For the impatient

  • Pitch me: an open source project (aka free for you to use) developed at Carnegie Mellon University for face recognition with deep neural networks, with a Python API
  • What do I get from it: improved accuracy and reduced training time
  • Need to see to believe (and so one should)? You can start playing with it using Docker, and check out the provided demos

What about it

Even though face recognition research started back in the 1970s, the field is still far from stagnant. The usual strategy for solving the problem is divided into three main steps. Given an image with a set of faces, first run a face detection algorithm to isolate the faces from the rest of the image. Then preprocess each cropped face to reduce its high dimensionality. Finally, classify it. What makes this whole process so challenging is that many factors introduce noise: images can be taken from different angles and under different lighting conditions, and the face itself changes over time (for example due to age or style).
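The three-step structure can be sketched in plain Python, but only as a toy illustration. The "detector" is a stub that crops a bounding box assumed to come from a real detector. Dimensionality is reduced by block averaging, and classification is a 1-nearest-neighbour lookup against a gallery of labelled vectors. None of these hypothetical functions belong to OpenFace, which relies on deep networks rather than block averaging. The sketch needs Python 3.8+ for `math.dist`:

```python
import math


def detect_face(image, box):
    """Step 1 (stub): crop the face region.

    `image` is a 2D list of grey levels; `box` = (top, left, height, width)
    is assumed to be supplied by a real face detector.
    """
    top, left, height, width = box
    return [row[left:left + width] for row in image[top:top + height]]


def preprocess(face, block=2):
    """Step 2: reduce dimensionality by averaging block x block patches,
    then flatten the result into a feature vector."""
    features = []
    for r in range(0, len(face) - block + 1, block):
        for c in range(0, len(face[0]) - block + 1, block):
            patch = [face[r + i][c + j] for i in range(block) for j in range(block)]
            features.append(sum(patch) / len(patch))
    return features


def classify(features, gallery):
    """Step 3: return the label whose reference vector is closest (Euclidean)."""
    return min(gallery, key=lambda label: math.dist(features, gallery[label]))


image = [
    [0, 0, 9, 9],
    [0, 0, 9, 9],
    [5, 5, 1, 1],
    [5, 5, 1, 1],
]
features = preprocess(detect_face(image, (0, 0, 4, 4)))  # 16 pixels -> 4 features
gallery = {"alice": [0.0, 9.0, 5.0, 1.0], "bob": [9.0, 0.0, 1.0, 5.0]}
print(features)                     # [0.0, 9.0, 5.0, 1.0]
print(classify(features, gallery))  # alice
```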

One important fact to point out is that the state-of-the-art, top-performing algorithms use convolutional neural networks. OpenFace is inspired by Facebook’s DeepFace and (mainly) Google’s FaceNet systems. The authors present a performance comparison using the “Labeled Faces in the Wild” (LFW) dataset for evaluation, with some interesting results.

Another interesting point is that, as the authors state, the implementation is tuned for using the model on mobile devices, so the “[…] key design consideration is a system that gives high accuracy with low training and prediction times”.

Note: in case you are wondering how it differs from OpenBiometrics (OpenBR): as stated by the authors of OpenFace on Hacker News, the main difference lies in the approach taken (deep convolutional networks), and OpenFace could potentially be integrated into OpenBR’s pipeline.

Internal Guts

As you might imagine (as with any image/video processing package), the dependencies are complex and time-consuming to set up, so prepare yourself for some dependency troubleshooting in case your machine is still new to this world.

The project’s API is written in Python 2 – entry point here – given its dependencies on OpenCV and dlib. OpenCV provides the computer vision base, dlib enhances OpenCV’s face detection ability, numpy is used for matrix algebra operations and scikit-learn for classification.

For training the convolutional network, OpenFace uses Torch, a scientific computing framework scripted in Lua and running on LuaJIT. Torch allows the neural networks to be executed either on CPUs or on CUDA-enabled GPUs.

The following illustration was extracted from the recent technical report “OpenFace: A general-purpose face recognition library with mobile applications”, by the authors, and provides interesting insight:


Also important to note is that you have the option to use pretrained models (trained on the CASIA-WebFace and FaceScrub databases), which you can find in the models directory; the provided bash script downloads them.

Where to get started

To set it up either locally or with Docker, check the provided documentation.

Finally, you might also be interested in having a look at other projects using deep neural networks for face recognition: the Visual Geometry Group (VGG) Face Descriptor, and Lightened Convolutional Neural Networks (CNNs).


Spark – Redshift: AWS Roles to the rescue

If you are using AWS to host your applications, you have probably heard that you can also apply IAM roles to EC2 instances. In many cases this is a really neat way to avoid passing AWS credentials to your applications. It also spares you the pain of managing key distribution among servers, as well as of setting up key rotation mechanisms for security purposes.

This post is about a simple trick on how to take advantage of this feature when your Spark job needs to interact with AWS Redshift.

As described in the Databricks repo for the spark-redshift library, there are three strategies for setting up AWS credentials:
  • setting them in the Hadoop configuration (what many people are used to so far with Cloudera or Hortonworks)
  • encoding the keys in the tempdir URI (by far not the best option, if you ask me)
  • using temporary keys

The last strategy is the one discussed here, and it is based on AWS’s own documentation on how to use temporary keys.
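Here is a rough sketch of the idea. It is my own assumption of how the pieces fit together, not necessarily the approach taken in the full post. On an EC2 instance with a role attached, temporary credentials can be read from the instance metadata service and handed to spark-redshift. The metadata path and its `AccessKeyId`/`SecretAccessKey`/`Token` fields come from AWS. The `temporary_aws_*` option names are the ones listed in the spark-redshift README for recent versions, so double-check them against the version you use. The helper names are hypothetical:

```python
import json
import urllib.request

# EC2 instance metadata path that serves the role's temporary credentials
# (plain GET, i.e. assumes the instance allows IMDSv1-style requests)
CREDENTIALS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"


def fetch_role_credentials(role_name, timeout_secs=2):
    """Return the metadata JSON for `role_name`; only works on an EC2 instance."""
    with urllib.request.urlopen(CREDENTIALS_URL + role_name, timeout=timeout_secs) as resp:
        return json.loads(resp.read().decode("utf-8"))


def redshift_temp_credential_options(creds):
    """Map the metadata fields onto spark-redshift's temporary-key options."""
    return {
        "temporary_aws_access_key_id": creds["AccessKeyId"],
        "temporary_aws_secret_access_key": creds["SecretAccessKey"],
        "temporary_aws_session_token": creds["Token"],
    }


# Offline example with a fake payload shaped like the metadata response
sample = json.loads("""{"Code": "Success", "AccessKeyId": "ASIAEXAMPLE",
                        "SecretAccessKey": "not-a-real-secret",
                        "Token": "not-a-real-token",
                        "Expiration": "2016-01-01T00:00:00Z"}""")
print(redshift_temp_credential_options(sample))
```

On a real instance you would call `fetch_role_credentials('YOUR-ROLE-NAME')` and pass the resulting dict to the spark-redshift reader or writer through `.options(**opts)`. Since the keys expire, fetch them right before the job runs.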

Distcp S3-hdfs for eu-central-1 (AWS Frankfurt) – details “behind the trenches”


Distcp (distributed copy) is a fairly old tool used to move large quantities of files, usually within HDFS. It does so with a MapReduce job: the source files are listed up front and split among map tasks, which do the copying heavy lifting. Another useful feature is that it can also handle file migrations between HDFS and AWS S3. Meanwhile, S3DistCp came along as a specific enhancement for speeding up this last kind of migration.

However, what might be new to you are the specific requirements for using either of these tools to move data from/to a bucket located in the S3 Frankfurt region. So this post intends to save you that half an hour of painful Google research trying to figure out why your Oozie job is failing.

Without further ado: before you dive into your Cloudera or Hortonworks manager to edit Hadoop properties in core-site.xml, I would suggest that you SSH into one of your Hadoop nodes and start by testing the distcp command. Here is the syntax for moving data from S3 to HDFS:

hadoop distcp -Dfs.s3a.awsAccessKeyId=XXX \
-Dfs.s3a.awsSecretAccessKey=XX \
-Dfs.s3a.endpoint=s3.eu-central-1.amazonaws.com \
s3a://YOUR-BUCKET-NAME/ \
/YOUR-HDFS-TARGET-PATH/

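So yeah, in case you haven’t noticed, the black magic pieces that make this command work just like in other regions are the “fs.s3a.endpoint” property and the “a” after “s3”. They matter because regions launched after January 2014, such as Frankfurt, only accept AWS Signature Version 4 requests.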
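If you end up scripting this (for example from an Oozie shell action), a small helper that derives the regional endpoint keeps the black magic in one place. This is a minimal sketch, and `distcp_command` is a hypothetical helper. The property names are copied from the command above; current Hadoop documentation names the key properties `fs.s3a.access.key` and `fs.s3a.secret.key`. The endpoint follows the `s3.<region>.amazonaws.com` pattern:

```python
import shlex


def distcp_command(src, dest, region, access_key, secret_key):
    """Build the argument list for a `hadoop distcp` run through the s3a
    connector against a regional S3 endpoint (e.g. eu-central-1)."""
    endpoint = "s3.{}.amazonaws.com".format(region)
    return [
        "hadoop", "distcp",
        "-Dfs.s3a.awsAccessKeyId={}".format(access_key),
        "-Dfs.s3a.awsSecretAccessKey={}".format(secret_key),
        "-Dfs.s3a.endpoint={}".format(endpoint),
        src, dest,
    ]


cmd = distcp_command("s3a://YOUR-BUCKET-NAME/", "/YOUR-HDFS-TARGET-PATH/",
                     "eu-central-1", "XXX", "XX")
# print a copy-pasteable command; pass `cmd` to subprocess.run() to execute it
print(" ".join(shlex.quote(arg) for arg in cmd))
```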

Also, if you are moving data from HDFS to S3, make sure the AWS policy attached to the given user account grants delete permissions on that S3 bucket (or specific path), in addition to write access. This is because DistCp writes temporary files next to the target files in your bucket, and it needs to be able to remove them at the end. Here is an example of an AWS policy to that effect:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [ "s3:ListAllMyBuckets" ],
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Effect": "Allow",
            "Action": [ "s3:ListBucket", "s3:GetObject", "s3:PutObject", "s3:DeleteObject" ],
            "Resource": [
                "arn:aws:s3:::YOUR-BUCKET",
                "arn:aws:s3:::YOUR-BUCKET/*" ]
        }
    ]
}

Alternatively, you might also want to test it using the S3DistCp tool. Here is another example:

 hadoop jar s3distcp.jar --src /data/ \
--dest s3a://YOUR-BUCKET-NAME/ \

Note that the s3distcp jar needs to be on the local file system of the host from which you are running the command. If you have the jar in HDFS, here is an example of how you can fetch it:

hadoop fs -copyToLocal /YOUR-JARS-LOCATION-IN-HDFS/s3distcp.jar .

You might also run into some headaches with the S3DistCp tool; in that case, this might prove handy.

Hope this helped! Where to go next? AWS has a handy best practice guide listing different strategies for data migration here.



Getting started with Spark (part 2) – SparkSQL

Update: this tutorial has been updated mainly up to Spark 1.6.2 (with a minor detail regarding Spark 2.0), which is not the most recent version of Spark at the time of this update. Nonetheless, for the operations exemplified, you can pretty much rest assured that the API has not changed substantially. I will do my best to update this post and point out these differences when they occur.

SparkSQL has become one of the core modules of the Spark ecosystem, with its API already available for all currently supported languages, namely Scala, Java, Python, and R. It lets you structure data into named columns, instead of the raw, schema-less data you get with RDDs. That data can then be queried with SQL syntax, taking advantage of a distributed query engine running in the background.

In practical terms, you can think of it as an attempt to emulate the APIs of popular dataframes in R or Python pandas, as well as SQL tables. However, if I do my job correctly, I will succeed in showing you that they are not the same (and that Spark dataframes hopefully offer more).

In order to use SparkSQL, you need to import SQLContext. Let’s start with how you would do it in Python:

# import required libs
from pyspark import SparkConf, SparkContext
# here is the import that allows you to use dataframes
from pyspark.sql import SQLContext

# initialize context in Spark 1.6.2
conf = SparkConf().setMaster('local[2]').setAppName('SparkSQL-demo')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

Note: in the new Spark 2.0, this changes as follows:

# import required libs
from pyspark.sql import SparkSession
# initialize context in Spark 2.0
spark = SparkSession \
    .builder \
    .appName('SparkSQL-demo') \
    .getOrCreate()

Good news, Scala does not vary much:

//import required libs
import org.apache.spark.{SparkConf, SparkContext}
//here is the import that allows you to use dataframes
import org.apache.spark.sql.SQLContext

// initialize context in Spark 1.6.2
val conf = new SparkConf().setMaster("local[2]").setAppName("SparkSQL-demo")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)


A dataframe can be created from an existing RDD or, similarly to RDDs, from a variety of sources such as text files, HDFS, S3, Hive tables, external databases, etc. Here’s an example of how to build a dataframe by reading from S3 (in Python):

df = sqlContext.read.json('s3n://my-bucket/example/file.json')


Note: we use the “s3n” prefix, which is valid for a number of AWS regions, but not for all of them. For example, to access S3 buckets in the Frankfurt region you would rather use “s3a” as a prefix.

What sets Spark Dataframes apart

However, even though the goal is to hide as many disparities as possible, there are nonetheless big conceptual differences between a Spark Dataframe and a Pandas dataframe. The first is that immutability has (thank God) not been removed. In other words, when you call a transformation on a Dataframe, you get a new Dataframe back, which you need to save into a variable. I’ve seen a lot of data scientists stumble on this in their first Spark days, so let us illustrate it with a very simple example:

# Create a Dataframe with pseudo data.
# Note that we create an RDD, and then cast it to a
# Dataframe by providing the column names (types are inferred)
column_names = ["customer_name", "date", "category",
"product_name", "quantity", "price"]
df01 = sc.parallelize(
[("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00)]).toDF(column_names)

df02 = df01.select("customer_name", "date", "category", "product_name", "price", (df01['quantity']+1).alias("quantity"))
# df01 itself remains unchanged:
df01.show()

|customer_name|      date|category|product_name|quantity|price|
|     Geoffrey|2016-04-22|       A|      apples|       1| 50.0|
|         Yann|2016-04-22|       B|        Lamp|       1| 38.0|

# ... whereas df02 is, as expected:
df02.show()

|customer_name|      date|category|product_name|price|quantity|
|     Geoffrey|2016-04-22|       A|      apples| 50.0|       2|
|         Yann|2016-04-22|       B|        Lamp| 38.0|       2|

The second most obvious difference (compared to pandas dataframes) is the distributed nature of a Spark Dataframe, just like RDDs. As a matter of fact, in previous versions of Spark, Dataframes were even called SchemaRDD. So internally the same resilience principles of RDDs apply, and you should still keep in mind the Driver – Executor programming model, as well as the concept of Actions and Transformations.

Also important to note is that Spark Dataframes internally use a query optimizer (Catalyst). This is one of the reasons why the performance gap between native Scala Spark code and Python or Java code shrinks significantly, as can be seen in the following illustration provided by Databricks:




Let us look at some useful operations one can perform with Spark dataframes. For the sake of brevity, I will continue this tutorial mainly using the Python API, as the syntax does not really vary that much.

Let us create a bigger fake (and completely nonsensical) dataset to make operations more interesting:

customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27", "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])


The dataset is minimal for the example, so caching it in memory really does not make a difference. But my point is to illustrate that, similarly to RDDs, you should also care about memory persistence, and can indeed use a similar API. Moreover, we are about to run several distinct operations of type “Action” (yielding results back to the driver). It is therefore good practice to signal that this dataframe should be kept in memory for further operations.

print(customers.is_cached)
# False
customers.cache()
print(customers.is_cached)
# True

OK, so yes, there are some exceptions: as you can see, the cache() method does not obey the laws of immutability. Moving on, let’s take a peek at it:

customers.show(10)

|customer_name|      date|category|   product_name|quantity|price|
|     Geoffrey|2016-04-22|       A|         apples|       1| 50.0|
|     Geoffrey|2016-05-03|       B|           Lamp|       2| 38.0|
|     Geoffrey|2016-05-03|       D|   Solar Pannel|       1| 29.0|
|     Geoffrey|2016-05-03|       A|         apples|       3| 50.0|
|     Geoffrey|2016-05-03|       C|           Rice|       5| 15.0|
|     Geoffrey|2016-06-05|       A|         apples|       5| 50.0|
|     Geoffrey|2016-06-05|       A|        bananas|       5| 55.0|
|     Geoffrey|2016-06-15|       Y|    Motor skate|       7| 68.0|
|     Geoffrey|2016-06-15|       E|Book: The noose|       1|125.0|
|         Yann|2016-04-22|       B|           Lamp|       1| 38.0|
only showing top 10 rows


As previously mentioned, one advantage over RDDs is the ability to use standard SQL syntax to slice and dice a Spark Dataframe. Let’s check, for example, all the distinct products that we have:

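spark_sql_table = 'customers'
# register the dataframe as a temporary table (createOrReplaceTempView in Spark 2.0)
customers.registerTempTable(spark_sql_table)
sql_qry = "select distinct(product_name) from {table}".format(table=spark_sql_table)
distinct_product_names_df01 = sqlContext.sql(sql_qry)
distinct_product_names_df01.show(truncate=False)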

|product_name              |
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |


Similarly to how we work with RDDs, with Spark Dataframes you are not restricted to SQL, and can indeed call methods on dataframes, each of which yields a new dataframe (remember the immutability story, right?).

Let’s redo the previous example of getting all distinct product names:

distinct_product_names_df02 = customers.select('product_name').distinct()
distinct_product_names_df02.show(truncate=False)

|product_name              |
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |


Note that you can also work on a dataframe just as you would on an RDD (e.g. by mapping over its rows), which causes the dataframe to be converted into an RDD in the background:

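# in Spark 1.6, DataFrame.map returns an RDD (in Spark 2.0, use customers.rdd.map)
distinct_product_names_rdd = customers.map(lambda row: row[3])
try:
    # calling .show() will yield an exception, since this is no longer a dataframe
    distinct_product_names_rdd.show()
except Exception as err:
    print('Error: {}'.format(err))
    print('BUT calling .take() this works, as it is an RDD now: {}' \
          .format(distinct_product_names_rdd.take(3)))

distinct_product_names = distinct_product_names_rdd.distinct().collect()
print('Distinct product names are: {}'.format(distinct_product_names))
print('\n... And, as expected, they are {} in total.' \
      .format(len(distinct_product_names)))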

Error: 'PipelinedRDD' object has no attribute 'show'
BUT calling .take() this works, as it is an RDD now: ['apples', 'Lamp', 'Solar Pannel']
Distinct product names are: ['Rice', 'bananas', 'space ship', 'Book: The noose', 'Solar Pannel', 'Recycle bin', 'apples', 'Lamp', 'Book: Crime and Punishment', 'Motor skate']

... And, as expected, they are 10 in total.


Similarly to R or pandas, you can compute summary statistics. Let’s get statistics for the category, quantity and price columns:

customers.describe('category', 'quantity', 'price').show()
|summary|category|          quantity|             price|
|  count|      39|                39|                39|
|   mean|    null| 4.153846153846154| 68.94871794871794|
| stddev|    null|2.9694803863945936|59.759583559965165|
|    min|       A|                 1|              15.0|
|    max|       Z|                15|             227.0|


Since ‘category’ is a categorical variable, its summary does not make much sense, as you can see. We’ll work on this later, but for now, let’s do something simple and look into product_name and category.

Let’s suppose we are interested in finding out how purchasing behavior varies among categories, namely what the average frequency of purchases per category is.

cat_freq_per_cust = customers.stat.crosstab('customer_name', 'category')
cat_freq_per_cust.show()

|customer_name_category|  Y|  Z|  A|  B|  C|  D|  E|
|              Geoffrey|  1|  0|  4|  1|  1|  1|  1|
|                Yoshua|  1|  2|  3|  1|  1|  1|  0|
|                  Yann|  2|  1|  2|  2|  1|  3|  2|
|                Jurgen|  2|  1|  3|  0|  1|  1|  0|


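cols = cat_freq_per_cust.columns
# skip the first column (customer names) and summarize the category counts
cat_freq_per_cust.describe(*cols[1:]).show(truncate=False)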

|summary|Y                 |Z                |A                |B                |C  |D                 |E                 |
|count  |4                 |4                |4                |4                |4  |4                 |4                 |
|mean   |1.5               |1.0              |3.0              |1.0              |1.0|1.5               |0.75              |
|stddev |0.5773502691896257|0.816496580927726|0.816496580927726|0.816496580927726|0.0|0.9999999999999999|0.9574271077563381|
|min    |1                 |0                |2                |0                |1  |1                 |0                 |
|max    |2                 |2                |4                |2                |1  |3                 |2                 |


Note: the “count” row does not have much meaning here. It simply counts the number of rows blindly, and since combinations without a value are filled with zero instead of null, it counts those too.
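To double-check what crosstab followed by describe computes, here is the same calculation in plain Python, with no Spark needed. The `purchases` dict is just the customer/category columns of the `customers` dataset above, one letter per row. `crosstab` is a hypothetical helper that mimics `DataFrame.stat.crosstab` by filling absent pairs with 0. The sample standard deviation (`statistics.stdev`) is used because it is what reproduces the stddev row in the output above:

```python
from collections import Counter
from statistics import mean, stdev

# (customer_name, category) columns of the `customers` dataset, one letter per row
purchases = {
    "Geoffrey": "ABDACAAYE",
    "Yann": "BYDCABEEDDAYZ",
    "Yoshua": "ZABAZYDCA",
    "Jurgen": "ZAAYACYD",
}
rows = [(name, category) for name, cats in purchases.items() for category in cats]


def crosstab(pairs):
    """Contingency table {row_value: {col_value: count}}; absent pairs count 0."""
    counts = Counter(pairs)
    row_values = sorted({r for r, _ in pairs})
    col_values = sorted({c for _, c in pairs})
    return {r: {c: counts[(r, c)] for c in col_values} for r in row_values}


table = crosstab(rows)
categories = sorted({c for _, c in rows})
# describe() on each crosstab column: mean and *sample* standard deviation
summary = {
    cat: (mean(table[name][cat] for name in table),
          stdev(table[name][cat] for name in table))
    for cat in categories
}
for cat in categories:
    print(cat, summary[cat])
```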

Let’s check the same for products.

# Let's view products vertically, for formatting reasons
customers.stat.crosstab('product_name', 'customer_name') \
    .show(truncate=False)

|product_name_customer_name|Geoffrey|Jurgen|Yann|Yoshua|
|Book: Crime and Punishment|0       |0     |1   |0     |
|Recycle bin               |0       |1     |2   |1     |
|Solar Pannel              |1       |0     |1   |0     |
|space ship                |0       |1     |1   |2     |
|Motor skate               |1       |2     |2   |1     |
|Rice                      |1       |1     |1   |1     |
|apples                    |3       |0     |0   |1     |
|Lamp                      |1       |0     |2   |1     |
|bananas                   |1       |3     |2   |2     |
|Book: The noose           |1       |0     |1   |0     |


freq_products = customers.stat.crosstab('customer_name', 'product_name')
freq_prod_cols = freq_products.columns
freq_products.describe(*freq_prod_cols[1:]).show()

|summary|          bananas|       space ship|   Book: The noose|Rice|       Motor skate|             Lamp|            apples|Book: Crime and Punishment|      Recycle bin|      Solar Pannel|
|  count|                4|                4|                 4|   4|                 4|                4|                 4|                         4|                4|                 4|
|   mean|              2.0|              1.0|               0.5| 1.0|               1.5|              1.0|               1.0|                      0.25|              1.0|               0.5|
| stddev|0.816496580927726|0.816496580927726|0.5773502691896257| 0.0|0.5773502691896257|0.816496580927726|1.4142135623730951|                       0.5|0.816496580927726|0.5773502691896257|
|    min|                1|                0|                 0|   1|                 1|                0|                 0|                         0|                0|                 0|
|    max|                3|                2|                 1|   1|                 2|                2|                 3|                         1|                2|                 1|

Alternatively, we could have answered using a more SQL-like approach:

from pyspark.sql import functions as func

cust_prod = customers.groupBy('customer_name', 'product_name') \
    .agg(func.count('product_name'))
cust_prod.orderBy('product_name', 'customer_name') \
    .show(cust_prod.count(), truncate=False)

|customer_name|product_name              |count(product_name)|
|Yann         |Book: Crime and Punishment|1                  |
|Geoffrey     |Book: The noose           |1                  |
|Yann         |Book: The noose           |1                  |
|Geoffrey     |Lamp                      |1                  |
|Yann         |Lamp                      |2                  |
|Yoshua       |Lamp                      |1                  |
|Geoffrey     |Motor skate               |1                  |
|Jurgen       |Motor skate               |2                  |
|Yann         |Motor skate               |2                  |
|Yoshua       |Motor skate               |1                  |
|Jurgen       |Recycle bin               |1                  |
|Yann         |Recycle bin               |2                  |
|Yoshua       |Recycle bin               |1                  |
|Geoffrey     |Rice                      |1                  |
|Jurgen       |Rice                      |1                  |
|Yann         |Rice                      |1                  |
|Yoshua       |Rice                      |1                  |
|Geoffrey     |Solar Pannel              |1                  |
|Yann         |Solar Pannel              |1                  |
|Geoffrey     |apples                    |3                  |
|Yoshua       |apples                    |1                  |
|Geoffrey     |bananas                   |1                  |
|Jurgen       |bananas                   |3                  |
|Yann         |bananas                   |2                  |
|Yoshua       |bananas                   |2                  |
|Jurgen       |space ship                |1                  |
|Yann         |space ship                |1                  |
|Yoshua       |space ship                |2                  |


cust_prod.groupBy('product_name') \
    .agg(func.avg('count(product_name)')) \
    .show(truncate=False)
|product_name              |avg(count(product_name))|
|space ship                |1.3333333333333333      |
|Book: The noose           |1.0                     |
|Rice                      |1.0                     |
|Motor skate               |1.5                     |
|Book: Crime and Punishment|1.0                     |
|Recycle bin               |1.3333333333333333      |
|Lamp                      |1.3333333333333333      |
|bananas                   |2.0                     |
|Solar Pannel              |1.0                     |
|apples                    |2.0                     |

Note that this does not take into account the quantity of each product bought, just how many times a client bought a given item. For our last question that may be OK, because we understand the different nature of the products.
But let us assume it is not OK, and we do need to count by quantity. For that we can use pivot, a feature introduced in Spark 1.6:

total_products_bought_by_customer = customers.groupBy('customer_name') \
    .pivot('product_name') \
    .sum('quantity')
total_products_bought_by_customer.show()

|customer_name|Book: Crime and Punishment|Book: The noose|Lamp|Motor skate|Recycle bin|Rice|Solar Pannel|apples|bananas|space ship|
|         Yann|                         5|              5|   3|          2|         10|  15|           5|  null|      6|         1|
|       Yoshua|                      null|           null|   2|          4|          5|   5|        null|    10|     18|         7|
|     Geoffrey|                      null|              1|   2|          7|       null|   5|           1|     9|      5|      null|
|       Jurgen|                      null|           null|null|          3|          5|   5|        null|  null|     15|         1|
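The pivot semantics can be sketched in plain Python as well, using a hypothetical `pivot_sum` helper rather than Spark's implementation. Quantities are summed per (customer, product) pair, and the pivot columns are the sorted distinct product names. Combinations that never occur become None, which Spark displays as null:

```python
from collections import defaultdict


def pivot_sum(rows, group_col, pivot_col, value_col):
    """Sum `value_col` per (group, pivot value); missing combinations -> None."""
    sums = defaultdict(int)
    for row in rows:
        sums[(row[group_col], row[pivot_col])] += row[value_col]
    pivot_values = sorted({row[pivot_col] for row in rows})
    groups = sorted({row[group_col] for row in rows})
    # dict.get does not trigger defaultdict's factory, so absent keys give None
    return {g: {p: sums.get((g, p)) for p in pivot_values} for g in groups}


# a few rows from the `customers` dataset above
rows = [
    {"customer_name": "Yann", "product_name": "Lamp", "quantity": 1},
    {"customer_name": "Yann", "product_name": "Lamp", "quantity": 2},
    {"customer_name": "Yann", "product_name": "Rice", "quantity": 15},
    {"customer_name": "Geoffrey", "product_name": "apples", "quantity": 1},
    {"customer_name": "Geoffrey", "product_name": "apples", "quantity": 3},
]
print(pivot_sum(rows, "customer_name", "product_name", "quantity"))
```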


Hope this has been a good introduction. I suggest that you further check out this great post from Databricks covering similar operations.

For the last part of this tutorial series, I plan to cover window functions, something that, until Spark 2.0, was only possible thanks to HiveContext.

Getting started with Spark in Python/Scala

This is part of a series of introductory posts about Spark, meant to help beginners get started with it. Hope it helps!

So what’s that funky business people call Spark?

Essentially, Apache Spark is a framework for distributing parallel computational work (often inherently iterative) across many nodes in a cluster of commodity servers, while maintaining high performance and High Availability (HA). It abstracts the core complexities of distributed computing, such as resource scheduling, job submission and execution, tracking, and message passing between nodes. On top of that, it gives developers a higher-level API – in Java, Python, R and Scala – to manipulate and work with data.

Why is everyone going crazy about it? Well, thanks to some cool features (such as in-memory caching, lazy evaluation of transformations, etc.), it has been doing pretty well lately in performance benchmarks.

In this article I want to avoid getting into more engineering details about Spark, and focus more on basic developer concepts; but it’s always useful to have a general idea. Spark is written in Scala and runs on the Java Virtual Machine, so (as you probably guessed) you need the Java Runtime Environment (JRE) installed, plus the Java Development Kit (JDK) for development.

A Spark cluster is made up of two main types of processes: a driver program, and worker programs (where executors run). At its core, the programming model is that the driver passes functions to be executed on the worker nodes, which eventually return a value to the driver. The worker programs can run either on cluster nodes or on local threads, and they perform compute operations on data.
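As a loose analogy only (this is not how Spark schedules work), the driver/worker model can be mimicked in plain Python. The "driver" splits the data into partitions, ships the same function to a pool of workers, and combines the results they send back. Threads are used here, much like Spark's local mode runs everything inside one multithreaded process. `run_job` and `split_into_partitions` are hypothetical names:

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver side: split the dataset into roughly equal, contiguous chunks."""
    size = max(1, -(-len(data) // num_partitions))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def run_job(data, func, num_workers=4):
    """Ship `func` to the workers (one partition each) and combine their results."""
    partitions = split_into_partitions(data, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(func, partitions))  # results return to the driver
    return sum(partial_results)


# sum of squares of 1..100, computed partition by partition
total = run_job(list(range(1, 101)), lambda part: sum(x * x for x in part))
print(total)  # 338350
```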

Bootstrapping env

OK, enough chit-chat, let’s get our hands dirty. Maybe the procrastinator side of you is preparing to hit ctrl+w with the pseudo-sophisticated argument that you can’t run Spark except on a cluster. Well, I’ve got bad news: Spark can be run in local mode (i.e. driver and executor running within the same JVM, as a single multithreaded instance).

The first thing a Spark program does is create a SparkContext object (for Scala/Python; JavaSparkContext for Java, and SparkR.init for R), which tells Spark how to access the cluster resources. Here is an example in Python:

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
        .setMaster('local[4]')  # local mode with 4 worker threads
        .setAppName('My first app'))
sc = SparkContext(conf=conf)

This example shows how to create a context running in local mode (using 4 threads). Let’s do the same in Scala, but now in cluster mode (i.e. connecting to an existing cluster) instead of local mode (usually your PC):

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("spark://{HOST_IP}:{PORT}")  // placeholder: your standalone cluster master
  .setAppName("My first app")
val sc = new SparkContext(conf)
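What really changes between the two snippets is the master setting. As a hedged illustration, the hypothetical helper describe_master below classifies the common master URL forms (local, local[N], local[*], spark://HOST:PORT for a standalone cluster, and yarn) and reports how many local threads a local master implies. It is not part of Spark’s API; Spark accepts other forms too, which this sketch deliberately rejects.

```python
import os
import re

def describe_master(master):
    """Hypothetical helper: explain a Spark master URL as (mode, local threads)."""
    if master == "local":
        return ("local", 1)                      # a single worker thread
    match = re.fullmatch(r"local\[(\d+|\*)\]", master)
    if match:
        # local[N] uses N threads; local[*] uses one per logical core.
        threads = os.cpu_count() if match.group(1) == "*" else int(match.group(1))
        return ("local", threads)
    if master.startswith("spark://"):
        return ("standalone cluster", None)      # resources come from the cluster
    if master == "yarn":
        return ("yarn cluster", None)
    raise ValueError(f"unrecognized master URL: {master!r}")

for url in ["local", "local[4]", "spark://{HOST_IP}:{PORT}"]:
    print(url, "->", describe_master(url))
```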

Last, but not least, to access Spark from the shell, navigate to the Spark base directory and run ./bin/spark-shell for Scala:


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

Or, for PySpark, run ./bin/pyspark (at the time of writing this post, I’m using Spark 1.5.1 locally):


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 3.4.3 (default, Mar 6 2015 12:06:10)
SparkContext available as sc, HiveContext available as sqlContext.

Please note that when you access Spark from the shell, the SparkContext is bootstrapped automatically for you (in local mode by default), and in both Python and Scala you access this object by referencing “sc” (which, if you look carefully, the shell also tells you).

El Kabong time

The first core concept to learn is Spark’s own distributed data collection, the Resilient Distributed Dataset (RDD). RDDs are immutable collections of records that are split into partitions, spread across cluster nodes, and stored either in memory or on disk. Another interesting property is that RDDs are fault-tolerant: if a node fails, its lost partitions can be rebuilt from the RDD’s lineage (the recorded steps that produced them).
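The fault-tolerance property is easier to picture with a toy model. The sketch below is plain Python, not Spark: each hypothetical ToyRDD keeps a reference to its parent and to the function that derived it (its lineage), so a “lost” partition is recomputed from the source rather than restored from a copy. It is a deliberate simplification, assuming the source data is still readable after the failure.

```python
class ToyRDD:
    """Hypothetical, simplified RDD: partitioned, immutable, rebuilt from lineage."""

    def __init__(self, partitions=None, parent=None, func=None):
        self._source = partitions   # only the base dataset holds raw data
        self._parent = parent       # lineage: which dataset this came from
        self._func = func           # lineage: how each partition was derived
        self._materialized = {}     # partitions currently held by (simulated) nodes

    def map(self, f):
        # A transformation never changes self; it returns a new ToyRDD.
        return ToyRDD(parent=self, func=lambda part: [f(x) for x in part])

    def num_partitions(self):
        if self._parent is None:
            return len(self._source)
        return self._parent.num_partitions()

    def partition(self, i):
        """Return partition i, recomputing it from lineage if it was lost."""
        if i not in self._materialized:
            if self._parent is None:
                self._materialized[i] = list(self._source[i])
            else:
                self._materialized[i] = self._func(self._parent.partition(i))
        return self._materialized[i]

    def lose_partition(self, i):
        # Simulate the node holding partition i going down.
        self._materialized.pop(i, None)

    def collect(self):
        return [x for i in range(self.num_partitions()) for x in self.partition(i)]

base = ToyRDD([[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]])
squares = base.map(lambda x: x * x)
before = squares.collect()
squares.lose_partition(1)           # a node fails...
print(squares.collect() == before)  # True: partition 1 was rebuilt from lineage
```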

Please give a warm welcome to RDDs, because if you want to “talk parallel lingo”, RDDs are your new best friends and currency when dealing with Spark.


The first option to create an RDD in PySpark is from a list:

# sc already exists in the PySpark shell (or was created as shown above)
data = list(range(10))
distData = sc.parallelize(data)

Please keep in mind that although a Python list is a mutable object, passing it to parallelize() has no effect on the RDD’s immutable nature. Much like with tuples in Python, operations you perform on an RDD create a new object instead of modifying the existing one. This means that our second option for creating an RDD is... from another, already existing RDD.

This was obviously a silly example, but the important thing to take into consideration is that, depending on the number of nodes in your cluster (or, if running locally, on the number of threads specified when creating the SparkContext: local[4] in our example), Spark automatically takes care of splitting the RDD’s data among workers (i.e. nodes or threads).

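In case you are wondering whether you can change, per RDD, how many partitions are created, the answer is yes. Both parallelize() and textFile() (as we will see next) accept an optional second parameter where you specify an integer: the number of partitions (numSlices) for parallelize(), and the minimum number of partitions (minPartitions) for textFile().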

Let’s look at the same example in Scala, this time specifying the number of partitions:

val data = 0 to 9
val distData = sc.parallelize(data, 4)
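To get a feel for how a collection is split into partitions, the sketch below uses, in plain Python, the boundary formula that, as far as I can tell, Scala’s parallelize() applies to a sequence: partition i gets the elements from index i*n/k up to (i+1)*n/k, using integer division. Treat the exact boundaries as an assumption (PySpark may split a Python list differently, because it serializes the data in batches) and check them on your own install with glom().collect(), which returns each partition as a list.

```python
def slice_positions(length, num_slices):
    """(start, end) indices of each partition for a sequence of `length` items."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]

def split_into_partitions(data, num_slices):
    """Split `data` into num_slices contiguous chunks (a sketch, not Spark code)."""
    if num_slices < 1:
        raise ValueError("num_slices must be positive")
    return [list(data[start:end])
            for start, end in slice_positions(len(data), num_slices)]

print(split_into_partitions(range(10), 4))
# [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```

Note that the partitions are contiguous and their sizes differ by at most one element.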

Besides collections (created in the driver program) and transformations of other RDDs, you can also create RDDs by referencing external sources, such as the local file system, distributed file systems (HDFS), cloud storage (S3), Cassandra, HBase, etc.

Here are some simple examples in Scala:

val localFileDist = sc.textFile("myfile.txt")
val hdfsFileDist = sc.textFile("hdfs://{HOST_IP}:{PORT}/path/to/file")
val s3FileDist = sc.textFile("s3n://bucket/...")

Though you can also use the SparkContext textFile() method to grab files from AWS S3, be aware of some possible issues.

You should also keep in mind that RDDs are lazily evaluated. In other words, Spark only computes an RDD when you actually perform an action on it (we will get to actions in just a few seconds). As a matter of fact, even when loading a file with the textFile() method, all you are doing is storing a pointer to the file location.

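This becomes very clear if you are using the shell and run an action such as take(1), which returns one element from the RDD to the driver: for example, calling textFile() on a path that does not exist returns immediately, and the error only shows up once take(1) runs.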
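The “pointer to the file location” behavior can be mimicked in plain Python. In this sketch, the hypothetical LazyTextFile class only records the path when it is created; the file is opened when the take() “action” runs, so a wrong path is only reported at that point. It imitates the idea, not Spark’s actual I/O code.

```python
from itertools import islice

class LazyTextFile:
    """Hypothetical stand-in for textFile(): remembers where the data is."""

    def __init__(self, path):
        self.path = path  # just a pointer; no file is opened here

    def take(self, n):
        """'Action': only now is the file opened and the first n lines read."""
        with open(self.path, encoding="utf-8") as f:
            return [line.rstrip("\n") for line in islice(f, n)]

missing = LazyTextFile("no/such/file.txt")  # succeeds: nothing is read yet
try:
    missing.take(1)
except FileNotFoundError:
    print("the bad path is only detected when the action runs")
```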


Transformations & Actions

Now, before you get all excited to perform operations on RDDs, there is yet another important aspect to grasp. There are two main types of operations that can be applied on RDDs, namely: actions and transformations.

The key distinction between the two is that transformations are lazily evaluated and create new RDDs from existing ones, whereas actions trigger the computation and return values to the driver program. In other words, you can think of transformations as just recipes for data, and actions as the real cooking.
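A tiny plain-Python model of the “recipe vs cooking” idea: the hypothetical Recipe class below only records transformations (map, filter), returning a new object each time, and nothing is evaluated until an action (collect, reduce) runs. It is an analogy, not Spark’s implementation; the times_cooked counter exists purely to show when work happens.

```python
import functools

class Recipe:
    """Hypothetical lazy dataset: transformations are recorded, actions cook."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = tuple(steps)  # the recorded recipe, never mutated
        self.times_cooked = 0      # how many actions actually ran the recipe

    # Transformations: return a new Recipe and compute nothing.
    def map(self, f):
        return Recipe(self.source, self.steps + (("map", f),))

    def filter(self, pred):
        return Recipe(self.source, self.steps + (("filter", pred),))

    # Actions: run every recorded step and hand a value back to the caller.
    def collect(self):
        self.times_cooked += 1
        items = iter(self.source)
        for kind, fn in self.steps:
            # Inside methods, map/filter refer to Python's lazy built-ins.
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)

    def reduce(self, f):
        return functools.reduce(f, self.collect())

words = Recipe(["spark", "rdd", "driver"])
long_upper = words.map(str.upper).filter(lambda w: len(w) > 3)
print(long_upper.times_cooked)  # 0: only a recipe so far
print(long_upper.collect())     # ['SPARK', 'DRIVER']
print(long_upper.times_cooked)  # 1
```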

Yes, in our previous example, the take() method was indeed an action. Let’s continue with our silly example to illustrate this aspect, starting with Python:

derivateData = distData.map(lambda x: x**2).filter(lambda x: x % 2 == 0).map(lambda x: x + 1)
result = derivateData.reduce(lambda x,y: x+y)

...and last, but not least, in Scala:

val derivateData = distData.map(x => x * x).filter(x => x % 2 == 0).map(x => x + 1)
val result = derivateData.reduce(_ + _)
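If you want to sanity-check the value the shell prints for result, the same pipeline can be run with Python’s built-in map and filter plus functools.reduce, no Spark involved. Assuming distData holds 0 to 9 as in the earlier examples, the squares are 0, 1, 4, ..., 81; the even ones are 0, 4, 16, 36 and 64; adding 1 gives 1, 5, 17, 37 and 65; and their sum is 125.

```python
from functools import reduce

data = range(10)  # same contents as distData in the examples above

# The same transformations, applied to a local sequence instead of an RDD.
squares = map(lambda x: x ** 2, data)
evens = filter(lambda x: x % 2 == 0, squares)
plus_one = map(lambda x: x + 1, evens)

# Like Spark's reduce, functools.reduce folds the values with a binary function.
result = reduce(lambda x, y: x + y, plus_one)
print(result)  # 125
```

One difference worth knowing: Spark’s reduce() expects a commutative and associative function, because partial results from different partitions are combined in no guaranteed order. Addition qualifies.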

On our first line we are simply performing transformations on our base RDD (distData), namely map and filter operations.

Furthermore, if you are following along in the Spark shell, you will notice that Spark only “wakes up” when you execute the second line, with the reduce action: it then performs the desired transformations on each worker node plus the reduce action, and finally returns the result to the driver.

As you can see, you can conveniently chain several transformations on RDDs, which are only computed on the worker nodes once an action is called. Moreover, all of these operations, both the transformations (map + filter) and the action (reduce), ran in memory, without having to store intermediate results on disk. This is one of the main reasons why benchmarks of Spark against Hadoop MapReduce jobs show such a disproportionate difference.
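To see what “no intermediate results” means, here is a plain-Python sketch built from generators: each element flows through square, filter and add-one before the next element is read, so no intermediate list is ever built. Spark pipelines narrow transformations such as map and filter within a partition in a similar iterator-chained way; the function names are hypothetical and the trace list exists only to make the order visible.

```python
def traced_pipeline(data, trace):
    """Chain square -> keep_even -> add_one lazily, logging each step to `trace`."""
    def square(xs):
        for x in xs:
            trace.append(f"square {x}")
            yield x * x

    def keep_even(xs):
        for x in xs:
            trace.append(f"filter {x}")
            if x % 2 == 0:
                yield x

    def add_one(xs):
        for x in xs:
            trace.append(f"add_one {x}")
            yield x + 1

    return add_one(keep_even(square(data)))

trace = []
out = traced_pipeline(range(3), trace)
print(trace)      # []: nothing has run yet
print(list(out))  # [1, 5]
print(trace)
# ['square 0', 'filter 0', 'add_one 0', 'square 1', 'filter 1',
#  'square 2', 'filter 4', 'add_one 4']
```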

However, Spark does not keep these RDDs in memory by default: once an action has returned its value to the driver, running another action on derivateData recomputes it from its source. Now, before you panic, the good folks at Berkeley anticipated your wish, and there is a method you can call on an RDD to persist it in cache. In our example, to persist derivateData in memory, we should call its .cache() (or .persist()) method before calling any action.
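The effect of caching can be imitated in plain Python with a counter. In the sketch below, the hypothetical Dataset class recomputes its values on every action unless cache() was called, in which case the first action stores the result and later actions reuse it. In PySpark itself the corresponding step would simply be derivateData.cache() before the first action; everything else here is illustrative.

```python
class Dataset:
    """Hypothetical dataset that recomputes on every action unless cached."""

    def __init__(self, compute_fn):
        self._compute_fn = compute_fn
        self._use_cache = False
        self._cached = None
        self.computations = 0   # how often the (expensive) pipeline really ran

    def cache(self):
        # Like RDD.cache(), this only marks the dataset; nothing runs yet.
        self._use_cache = True
        return self

    def _values(self):
        if self._cached is not None:
            return self._cached           # served from 'memory'
        self.computations += 1
        values = list(self._compute_fn())
        if self._use_cache:
            self._cached = values         # the first action fills the cache
        return values

    # Two 'actions'.
    def count(self):
        return len(self._values())

    def total(self):
        return sum(self._values())

def derivate_data():
    # Same pipeline as derivateData: square, keep even values, add 1.
    return (x * x + 1 for x in range(10) if (x * x) % 2 == 0)

plain = Dataset(derivate_data)
plain.count()
plain.total()
cached = Dataset(derivate_data).cache()
cached.count()
cached.total()
print(plain.computations, cached.computations)  # 2 1
```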

In part 2, I will cover Spark DataFrames, which are equally important.

In the meantime, where to go next? Here’s a suggestion: the Berkeley AMP Camp introduction exercises.

Hope this helped!

Resilience measures with HP IRF: ISSU, GR and MAD (Part I)

In an effort to come down to earth and cover a topic that can be useful for the majority of today’s enterprises with HP gear, I will cover resilience features one can (and should) use in an HP Networking environment along with IRF.

Though I don’t claim that this is a best practice for all cases, clustering HP (formerly 3Com) switches with IRF can be a great solution to a lot of problems. How? Basically by using a simple, and therefore effective, ingredient that speaks for itself: a drastic reduction in topology complexity. Aggregating devices to function as one can be an early Christmas in many cases: you get to use many more LAGs (preferably MLAGs), your Spanning Tree becomes much simpler (I’m not that big a fan of the HPN marketing papers that issue STP’s death certificate because of IRF; you have to allow for human error!), your link-state database gets much simpler, and you have fewer devices to configure and manage (since you get centralized control and management planes).

Creating a Non-stop network environment
I’m not going to focus on HP’s claims about convergence time; that should be left for a demo by the HP folks. What I want to focus on are the software features that can be used along with IRF to create an even more resilient network, and that require manual configuration.
These features are: In-Service Software Upgrade (ISSU), and yes, you get this bonus feature on standalone switches (where it is not natively present) when you set up an IRF cluster; Graceful Restart (GR); and Multi-Active Detection (MAD), commonly known as split-brain detection.

First, keep in mind that when you virtually cluster standalone switches with IRF, the cluster works exactly the way a chassis-based switch does: the master MPU controls the management and control planes and synchronizes in real time with the standby MPU, while the forwarding plane is active on all LPUs. The difference is that, with standalone switches, one of the switches acts as the master and all the other members act as standby “MPUs”.
So when you start a software upgrade on an IRF stack, the first members to be upgraded are the standby nodes. After this job is completed, one of the standby nodes is elected as the new master, and failover occurs. While the newly elected node acts as master, the former master is upgraded. After this job is done, a preemption-like behavior occurs, and the former master is re-elected to the master role.
Note that the virtual cluster runs with a virtual bridge MAC address, so the L2 destination remains the same, and the only changes that might occur are in link forwarding inside MLAGs. This should be negligible if you have a solid config. Note also that the routing process (if any) running on the master will not be restarted during the in-service upgrade.


Comment Relative To former post “HP MSM Controllers Initial Setup Considerations”

This post is an answer to a comment on my former post “HP MSM Controllers Initial Setup Considerations”. (I wanted to add some drawings to make the answer clearer, so I ended up writing a separate post.) Thank you for your comment. Please let me know whether I understood it correctly, or got it all wrong.

George P Isaac’s comment was:

As per my understanding..In option 2 we have to do following config.

1.We should tag particular VLAN in either internet port or access port
2.we should assign IP address to particular VLAN and gateway..
3.Nothing is required in VSC mapping in AP group.

then Where will I specify VLAN mapping..??

“extending the ingress interface to the egress interface” – is this option used when internet port and tunneled network in same VLAN??

OK, to be fair, my post could have been clearer (probably because I’m not an expert on HP’s MSM solution). I took the following figure from an MSM Controller configuration manual (section 4-30); in my opinion, it summarizes pretty well the options for access-controlled traffic.

Access-controlled Flow of traffic

So let me recap: “Option 2” should actually have been split into Option 2 A) and Option 2 B). Essentially, these options are:

2. A) Access-controlled clients do Web Auth on the HP MSM Controller and are then sent straight to the default gateway of a different interface that bypasses the corporate network. In summary, what the egress VLAN does is define a new default gateway (for instance, the routing device connected to the Internet) that the controller uses specifically for the clients assigned to that VLAN. The reasoning behind this option might be that the network admin is worried that routing these clients through the corporate network may let them access corporate resources, that the controller’s default gateway might not have the appropriate ACLs for handling client traffic, or simply that you prefer to spare that router’s CPU. In any case, the main goal is to bypass the corporate network.

In this option, clients still have an IP address in a subnet different from the egress VLAN subnet. This is why you should enable NAT on that interface (to keep things simple): clients are placed on a subnet to which the gateway has no route. Alternatively, you can configure a route to that subnet on the gateway.
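The routing problem behind option 2 A) can be checked with Python’s standard ipaddress module. The subnets and addresses below are made-up documentation examples, and needs_nat_or_route() is a hypothetical helper: if the client’s address is not inside the egress VLAN subnet and the gateway has no route back to the client’s subnet, replies have nowhere to go, so you either enable NAT on the controller’s egress interface or add a route on the gateway.

```python
import ipaddress

def needs_nat_or_route(client_ip, egress_subnet, routes_back_to_controller):
    """Hypothetical check: can the egress gateway send replies to this client?

    client_ip: address the client received, e.g. '192.0.2.10'.
    egress_subnet: subnet of the egress VLAN, e.g. '198.51.100.0/24'.
    routes_back_to_controller: client subnets the gateway routes via the
        controller (do not include a default route pointing elsewhere).
    """
    ip = ipaddress.ip_address(client_ip)
    if ip in ipaddress.ip_network(egress_subnet):
        return False  # client sits on the egress VLAN itself (as in option 2 B)
    # Otherwise replies only find their way back if a route covers the client.
    return not any(ip in ipaddress.ip_network(net) for net in routes_back_to_controller)

print(needs_nat_or_route("192.0.2.10", "198.51.100.0/24", []))                # True
print(needs_nat_or_route("192.0.2.10", "198.51.100.0/24", ["192.0.2.0/24"]))  # False
```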

2. B) Access-controlled clients do Web Auth on the HP MSM Controller and are then sent to a restricted VLAN, receiving IP addresses on the egress VLAN; hence the term “extending the egress VLAN to clients”. It works somewhat like non-access-controlled scenarios, but in this case the controller is actually routing in the background, even though clients can’t notice it.

Please note that options 2 A) and B) essentially have one thing in common: clients bypass corporate resources. However, in option B, clients actually receive an IP address in the VLAN they are sent to, assigned either by a DHCP server residing on that VLAN or through the egress VLAN’s gateway acting as a DHCP relay.

“then Where will I specify VLAN mapping..??”

So in both cases, you have two alternatives for specifying the VLAN mapping: at the VSC level or at the user level. At the VSC level, you simply take all authenticated users and forward them on the same VLAN. With user-based egress VLANs, you get more granularity by assigning custom VLAN IDs to user account profiles. Or you can use both together, in which case user-based settings override the VSC-level egress VLAN definitions.
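The precedence rule (a user-based egress VLAN, when defined, overrides the VSC-level one) is simple enough to express in a few lines. This is a hypothetical model of the decision, not MSM configuration syntax or an MSM API; the VSC names, usernames and VLAN IDs are invented for illustration.

```python
from typing import Dict, Optional

def egress_vlan(user: str, vsc: str,
                vsc_vlans: Dict[str, int],
                user_vlans: Dict[str, int]) -> Optional[int]:
    """Pick the egress VLAN for an authenticated user (hypothetical model).

    A user-based mapping from the user's account profile wins; otherwise the
    VSC-level mapping applies; None means no egress VLAN is configured.
    """
    if user in user_vlans:
        return user_vlans[user]
    return vsc_vlans.get(vsc)

vsc_vlans = {"guest-vsc": 30}        # every authenticated guest -> VLAN 30
user_vlans = {"contractor-01": 40}   # this account profile -> VLAN 40

print(egress_vlan("alice", "guest-vsc", vsc_vlans, user_vlans))          # 30
print(egress_vlan("contractor-01", "guest-vsc", vsc_vlans, user_vlans))  # 40
```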

However, in option B (where you extend the egress VLAN IP addressing to the clients), there are some additional settings to configure, which I listed in the previous post:

  • In the global DHCP relay settings, select the checkbox for extending the ingress interface to the egress interface. After this is enabled, you will no longer be able to specify the IP address and subnet mask settings on VSCs.
  • Disable NAT on the egress VLAN IP interface.
  • Set the MSM controller’s IP address as the default gateway and DNS server (the controller will forward traffic and DNS queries to the correct ones).

“extending the ingress interface to the egress interface” – is this option used when internet port and tunneled network in same VLAN??

Well, the egress VLAN (whether or not it is extended to the client) can be implemented on either port (LAN or Internet). I would rather say that this option makes sense when you want the “bypass the main corporate network” feature and the “non-access-controlled-like” behavior together. This solution might greatly simplify your manual setup when, for instance, you want certain clients to access some corporate resources: having the same subnet as those corporate resources might be advantageous for your setup.

Hope this helps. Cheers!