Getting Started with Spark (part 3) – UDFs & Window functions

This post continues the previous introductory series “Getting started with Spark in Python”, covering UDFs and window functions. Unfortunately it sat marinating in my WordPress drafts for quite a while (it was already roughly 70% complete) and only now did I get the chance to finish it.

Note: For this post I’m using Spark 1.6.1. There are some minor differences compared to the upcoming Spark 2.0, such as using a SparkSession object to initialize the Spark context instead of a HiveContext as I do here. Nonetheless, the important parts are common to both.

Continue reading “Getting Started with Spark (part 3) – UDFs & Window functions”

Spark 2.0: From quasi to proper-streaming?

This post looks at the relatively recent new Spark release – Spark 2.0 – and the differences it brings for streaming applications. Why streaming in particular, you may ask? Well, streaming is the ideal scenario most companies would like to support, and the competitive landscape is definitely heating up, especially with Apache Flink and Google’s Apache Beam.

Why is streaming so difficult?

There are three main problems when it comes to building real-time applications based on streaming data:

  • Data consistency: due to the distributed nature of the architecture, at any given point in time some events may have been processed on some nodes but not yet on others, even though those events might actually have occurred before others. For example, you may see a Logout event without a Login event, a close-app event without an open-app event, etc.
  • Fault tolerance (FT): related to the first problem, on node failure a processing engine may try to reprocess an event that had actually already been processed by the failed node, leading to duplicated records and/or inaccurate metrics.
  • Dealing with late/out-of-order data: whether because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync its data hours later, developers must tackle this challenge in application code.

See this post for an excellent detailed explanation. Continue reading “Spark 2.0: From quasi to proper-streaming?”

Spark – Redshift: AWS Roles to the rescue

If you are using AWS to host your applications, you have probably heard that you can apply IAM roles to EC2 instances as well. In many cases this is a really neat way to avoid passing AWS credentials to your applications, sparing you the pain of managing key distribution among servers and of ensuring key-rotation mechanisms for security purposes.

This post is about a simple trick to take advantage of this feature when your Spark job needs to interact with AWS Redshift.

As can be read in the Databricks repo for the spark-redshift library, there are three strategies for setting up AWS credentials: setting them in the Hadoop configuration (what many people are used to from Cloudera or Hortonworks setups), encoding the keys in the tempdir URI (by far not the best option, if you ask me), or using temporary keys. The last strategy is the one discussed here, and it is based on AWS’s own documentation on how to use temporary keys. Continue reading “Spark – Redshift: AWS Roles to the rescue”
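To make the idea a bit more concrete, here is a minimal, hypothetical sketch of the temporary-keys strategy in PySpark. It assumes the EC2 instance running the driver has an IAM role attached, uses boto3 to resolve that role’s temporary credentials, and passes them to spark-redshift via the temporary_aws_* options as I recall them from the library’s README (table, bucket and connection details are made up; double-check the option names against the library version you use):

import boto3

# resolve the temporary credentials of the IAM role attached to this instance
creds = boto3.Session().get_credentials().get_frozen_credentials()

# hypothetical Redshift/S3 names; the temporary_aws_* option names follow the
# spark-redshift README as I remember it - verify for your version
df = sqlContext.read \
    .format('com.databricks.spark.redshift') \
    .option('url', 'jdbc:redshift://my-cluster:5439/mydb?user=me&password=secret') \
    .option('dbtable', 'my_table') \
    .option('tempdir', 's3n://my-bucket/tmp/') \
    .option('temporary_aws_access_key_id', creds.access_key) \
    .option('temporary_aws_secret_access_key', creds.secret_key) \
    .option('temporary_aws_session_token', creds.token) \
    .load()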

Getting started with Spark (part 2) – SparkSQL

Update: this tutorial has been updated mainly up to Spark 1.6.2 (with a minor detail regarding Spark 2.0), which is not the most recent version of Spark at the moment of this update. Nonetheless, for the operations exemplified here you can pretty much rest assured that the API has not changed substantially. I will do my best to point out these differences when they occur.

SparkSQL has become one of the core modules of the Spark ecosystem (with its API already available in all currently supported languages, namely Scala, Java, Python, and R). It is intended to let you structure data into named columns (instead of the raw, schema-less data of RDDs), which can then be queried with SQL syntax, taking advantage of a distributed query engine running in the background.

In practical terms, you can very much think of it as an attempt to emulate the APIs of popular dataframes in R or Python’s pandas, as well as SQL tables. However, if I do my job correctly, I will succeed in showing you that they are not quite the same (and are hopefully more).

In order to use SparkSQL you need to import SQLContext. Let’s start with how you would do it in Python:

# import required libs
from pyspark import SparkConf, SparkContext
# here is the import that allows you to use dataframes
from pyspark.sql import SQLContext

# initialize context in Spark 1.6.2
conf = SparkConf().setMaster('local[2]').setAppName('SparkSQL-demo')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

Note: in the new Spark 2.0, this would indeed vary as such:

# import required libs
from pyspark.sql import SparkSession

# initialize context in Spark 2.0
spark = SparkSession \
    .builder \
    .appName('SparkSQL-demo') \
    .getOrCreate()

Good news, Scala does not vary much:

// import required libs
import org.apache.spark.{SparkConf, SparkContext}
// here is the import that allows you to use dataframes
import org.apache.spark.sql.SQLContext

// initialize context in Spark 1.6.2
val conf = new SparkConf().setMaster("local[2]").setAppName("SparkSQL-demo")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

 

A dataframe can be created from an existing RDD, as well as, similarly to RDDs, from a variety of different sources such as text files, HDFS, S3, Hive tables, external databases, etc. Here’s an example of how to build a dataframe by reading from S3 (in Python):

df = sqlContext.read.json('s3n://my-bucket/example/file.json')

 

Note: we use the “s3n” prefix, which is valid for a number of AWS regions, but not for all of them. For example, to access S3 buckets in the Frankfurt region you would rather use “s3a” as a prefix.
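As an illustration (bucket name and region are made up), switching to s3a and pointing the Hadoop configuration at the regional endpoint might look something like this:

# use the s3a filesystem for newer regions such as Frankfurt (eu-central-1);
# fs.s3a.endpoint is a standard Hadoop s3a property
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.eu-central-1.amazonaws.com')

df = sqlContext.read.json('s3a://my-bucket/example/file.json')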

What sets Spark Dataframes apart

However, even though the goal is to hide as many disparities as possible, there are nonetheless big conceptual differences between a Spark DataFrame and a pandas dataframe. The first is that immutability has (thank God) not been removed. In other words, when you call a transformation on a DataFrame, a new DataFrame is returned, which you need to assign to a variable if you want to keep it. I’ve seen a lot of data scientists stumble on this in their first Spark days, so let us illustrate this with a very simple example:


# Create a Dataframe with pseudo data:
# Note that we create an RDD, and then cast it to
# a Dataframe by providing a schema
column_names = ["customer_name", "date", "category",
                "product_name", "quantity", "price"]
df01 = sc.parallelize(
    [("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
     ("Yann", "2016-04-22", "B", "Lamp", 1, 38.00)]).toDF(column_names)

df02 = df01.select("customer_name", "date", "category", "product_name", "price", (df01['quantity']+1).alias("quantity"))
df01.show()
df02.show()

+-------------+----------+--------+------------+--------+-----+
|customer_name|      date|category|product_name|quantity|price|
+-------------+----------+--------+------------+--------+-----+
|     Geoffrey|2016-04-22|       A|      apples|       1| 50.0|
|         Yann|2016-04-22|       B|        Lamp|       1| 38.0|
+-------------+----------+--------+------------+--------+-----+

# .. whereas df02.show() yields, as expected:

+-------------+----------+--------+------------+-----+--------+
|customer_name|      date|category|product_name|price|quantity|
+-------------+----------+--------+------------+-----+--------+
|     Geoffrey|2016-04-22|       A|      apples| 50.0|       2|
|         Yann|2016-04-22|       B|        Lamp| 38.0|       2|
+-------------+----------+--------+------------+-----+--------+

The second most obvious difference (compared to a pandas df) is the distributed nature of a Spark DataFrame, just like RDDs. As a matter of fact, in previous versions of Spark, DataFrames were even called SchemaRDD. So internally the same resilience principles of RDDs apply, and you should in fact still keep in mind the driver-executor programming model, as well as the concepts of actions and transformations.
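For example (a small check of my own, not from the original post), you can peek at the RDD backing a dataframe and see how many partitions it is split into:

# the dataframe is backed by a partitioned RDD spread across the executors
print(df01.rdd.getNumPartitions())
# e.g. 2 when running with the local[2] master configured above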

Also important to note is that Spark DataFrames use a query optimizer internally, which is one of the reasons why the performance gap between Spark code written natively in Scala and code written in Python or Java drops significantly, as can be seen in the next illustration provided by Databricks:

[Figure: performance of DataFrames vs RDDs across languages, provided by Databricks]
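If you are curious about what the optimizer actually does with a query, a quick way to inspect it (my own aside) is to print the query plans of a dataframe:

# prints the parsed, analyzed and optimized logical plans plus the physical plan
df02.explain(True)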

 

API

Let us view some useful operations one can perform with Spark DataFrames. For the sake of brevity I will continue this tutorial mainly using the Python API, as the syntax does not really vary that much.

Let us create a bigger fake (and utterly nonsensical) dataset to make operations more interesting:

customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27", "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])

 

The dataset is minimal for this example, so it really does not make a difference whether we cache it in memory or not. But my point is to illustrate that, similarly to RDDs, you should also care about memory persistence and can indeed use a similar API. Moreover, since next we are going to use several distinct operations of type “action” (yielding results back to the driver), it is good practice to signal that this dataframe should be kept in memory for further operations.

print(customers.is_cached)
customers.cache()
print(customers.is_cached)
# False
# True

OK, so yes, there are some exceptions, and in this case, as you can see, the cache() method does not obey the laws of immutability (it changes the persistence state of the existing dataframe in place). Moving on, let’s take a peek at it:

customers.show(10)

+-------------+----------+--------+---------------+--------+-----+
|customer_name|      date|category|   product_name|quantity|price|
+-------------+----------+--------+---------------+--------+-----+
|     Geoffrey|2016-04-22|       A|         apples|       1| 50.0|
|     Geoffrey|2016-05-03|       B|           Lamp|       2| 38.0|
|     Geoffrey|2016-05-03|       D|   Solar Pannel|       1| 29.0|
|     Geoffrey|2016-05-03|       A|         apples|       3| 50.0|
|     Geoffrey|2016-05-03|       C|           Rice|       5| 15.0|
|     Geoffrey|2016-06-05|       A|         apples|       5| 50.0|
|     Geoffrey|2016-06-05|       A|        bananas|       5| 55.0|
|     Geoffrey|2016-06-15|       Y|    Motor skate|       7| 68.0|
|     Geoffrey|2016-06-15|       E|Book: The noose|       1|125.0|
|         Yann|2016-04-22|       B|           Lamp|       1| 38.0|
+-------------+----------+--------+---------------+--------+-----+
only showing top 10 rows

 

As previously mentioned, one advantage over RDDs is the ability to use standard SQL syntax to slice and dice a Spark DataFrame. Let’s check, for example, all the distinct products that we have:

spark_sql_table = 'customers'
sql_qry = "select distinct(product_name) from {table}".format(table=spark_sql_table)
customers.registerTempTable(spark_sql_table)
distinct_product_names_df01 = sqlContext.sql(sql_qry)
distinct_product_names_df01.show(truncate=False)

+--------------------------+
|product_name              |
+--------------------------+
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |
+--------------------------+

 

Similarly to how we work with RDDs, with Spark DataFrames you are not restricted to SQL: you can also call methods on dataframes, each of which yields a new dataframe (remember the immutability story, right?).

Let’s repeat the previous example of getting all distinct product names, this time with DataFrame methods:

distinct_product_names_df02 = customers.select('product_name').distinct()
distinct_product_names_df02.show(truncate=False)
distinct_product_names_df02.count()

+--------------------------+
|product_name              |
+--------------------------+
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |
+--------------------------+
10

 

Note that you can also work on a dataframe just like you iterate over an RDD, which causes the dataframe to be converted into an RDD in the background:


distinct_product_names_rdd = customers.map(lambda row: row[3])
# calling distinct_product_names_rdd.show() will yield an exception
try:
    distinct_product_names_rdd.show()
except Exception as err:
    print('Error: {}'.format(err))
    print('BUT calling .take() this works, as it is an RDD now: {}'
          .format(distinct_product_names_rdd.take(3)))

distinct_product_names = distinct_product_names_rdd.distinct().collect()
print('Distinct product names are: {}'.format(distinct_product_names))
print('\n... And, as expected, they are {} in total.' \
 .format(len(distinct_product_names)))

Error: 'PipelinedRDD' object has no attribute 'show'
BUT calling .take() this works, as it is an RDD now: ['apples', 'Lamp', 'Solar Pannel']
Distinct product names are: ['Rice', 'bananas', 'space ship', 'Book: The noose', 'Solar Pannel', 'Recycle bin', 'apples', 'Lamp', 'Book: Crime and Punishment', 'Motor skate']

... And, as expected, they are 10 in total.

 

Similarly to R or Python’s pandas, you can get summary statistics. Let’s get statistics for the category, quantity and price columns:

customers.describe('category', 'quantity', 'price').show()
+-------+--------+------------------+------------------+
|summary|category|          quantity|             price|
+-------+--------+------------------+------------------+
|  count|      39|                39|                39|
|   mean|    null| 4.153846153846154| 68.94871794871794|
| stddev|    null|2.9694803863945936|59.759583559965165|
|    min|       A|                 1|              15.0|
|    max|       Z|                15|             227.0|
+-------+--------+------------------+------------------+

 

Since ‘category’ is a categorical variable, as you can see its summary does not make much sense. We’ll work on improving this later, but for now let’s do something simple and look into product_name and category.

Let’s suppose we are interested in finding out how purchasing behavior varies among categories, namely what the average frequency of purchases per category is.

cat_freq_per_cust = customers.stat.crosstab('customer_name', 'category')
cat_freq_per_cust.show()

+----------------------+---+---+---+---+---+---+---+
|customer_name_category|  Y|  Z|  A|  B|  C|  D|  E|
+----------------------+---+---+---+---+---+---+---+
|              Geoffrey|  1|  0|  4|  1|  1|  1|  1|
|                Yoshua|  1|  2|  3|  1|  1|  1|  0|
|                  Yann|  2|  1|  2|  2|  1|  3|  2|
|                Jurgen|  2|  1|  3|  0|  1|  1|  0|
+----------------------+---+---+---+---+---+---+---+

 

cols = cat_freq_per_cust.columns
cat_freq_per_cust.describe(cols[1::]).show(truncate=False)

+-------+------------------+-----------------+-----------------+-----------------+---+------------------+------------------+
|summary|Y                 |Z                |A                |B                |C  |D                 |E                 |
+-------+------------------+-----------------+-----------------+-----------------+---+------------------+------------------+
|count  |4                 |4                |4                |4                |4  |4                 |4                 |
|mean   |1.5               |1.0              |3.0              |1.0              |1.0|1.5               |0.75              |
|stddev |0.5773502691896257|0.816496580927726|0.816496580927726|0.816496580927726|0.0|0.9999999999999999|0.9574271077563381|
|min    |1                 |0                |2                |0                |1  |1                 |0                 |
|max    |2                 |2                |4                |2                |1  |3                 |2                 |
+-------+------------------+-----------------+-----------------+-----------------+---+------------------+------------------+

 

Note: the “count” statistic does not have much meaning here; it simply counts the number of rows blindly, and, as expected, since the cells without a given value are filled with zero instead of null, it counts those too.
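If what you actually wanted was, say, how many distinct customers bought in each category (ignoring the zero-filled cells), a more direct route (my own example, not part of the crosstab output above) would be:

from pyspark.sql import functions as func

# count, per category, how many distinct customers actually purchased in it
customers.groupBy('category') \
    .agg(func.countDistinct('customer_name').alias('n_customers')) \
    .show()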

Let’s check the same for products.

# Let's view products vertically, for formatting reasons
customers.stat.crosstab('product_name', 'customer_name' )\
.show(truncate=False)
+--------------------------+--------+------+----+------+
|product_name_customer_name|Geoffrey|Jurgen|Yann|Yoshua|
+--------------------------+--------+------+----+------+
|Book: Crime and Punishment|0       |0     |1   |0     |
|Recycle bin               |0       |1     |2   |1     |
|Solar Pannel              |1       |0     |1   |0     |
|space ship                |0       |1     |1   |2     |
|Motor skate               |1       |2     |2   |1     |
|Rice                      |1       |1     |1   |1     |
|apples                    |3       |0     |0   |1     |
|Lamp                      |1       |0     |2   |1     |
|bananas                   |1       |3     |2   |2     |
|Book: The noose           |1       |0     |1   |0     |
+--------------------------+--------+------+----+------+

 


freq_products = customers.stat.crosstab('customer_name', 'product_name')
freq_prod_cols = freq_products.columns
freq_products.describe(freq_prod_cols[1::]).show()

+-------+-----------------+-----------------+------------------+----+------------------+-----------------+------------------+--------------------------+-----------------+------------------+
|summary|          bananas|       space ship|   Book: The noose|Rice|       Motor skate|             Lamp|            apples|Book: Crime and Punishment|      Recycle bin|      Solar Pannel|
+-------+-----------------+-----------------+------------------+----+------------------+-----------------+------------------+--------------------------+-----------------+------------------+
|  count|                4|                4|                 4|   4|                 4|                4|                 4|                         4|                4|                 4|
|   mean|              2.0|              1.0|               0.5| 1.0|               1.5|              1.0|               1.0|                      0.25|              1.0|               0.5|
| stddev|0.816496580927726|0.816496580927726|0.5773502691896257| 0.0|0.5773502691896257|0.816496580927726|1.4142135623730951|                       0.5|0.816496580927726|0.5773502691896257|
|    min|                1|                0|                 0|   1|                 1|                0|                 0|                         0|                0|                 0|
|    max|                3|                2|                 1|   1|                 2|                2|                 3|                         1|                2|                 1|
+-------+-----------------+-----------------+------------------+----+------------------+-----------------+------------------+--------------------------+-----------------+------------------+

Alternatively, we could have answered this using a more SQL-like approach:

from pyspark.sql import functions as func

cust_prod = customers.groupBy('customer_name', 'product_name') \
    .agg(func.count('product_name')) \
    .orderBy('product_name', 'customer_name')
cust_prod.show(cust_prod.count(), truncate=False)

+-------------+--------------------------+-------------------+
|customer_name|product_name              |count(product_name)|
+-------------+--------------------------+-------------------+
|Yann         |Book: Crime and Punishment|1                  |
|Geoffrey     |Book: The noose           |1                  |
|Yann         |Book: The noose           |1                  |
|Geoffrey     |Lamp                      |1                  |
|Yann         |Lamp                      |2                  |
|Yoshua       |Lamp                      |1                  |
|Geoffrey     |Motor skate               |1                  |
|Jurgen       |Motor skate               |2                  |
|Yann         |Motor skate               |2                  |
|Yoshua       |Motor skate               |1                  |
|Jurgen       |Recycle bin               |1                  |
|Yann         |Recycle bin               |2                  |
|Yoshua       |Recycle bin               |1                  |
|Geoffrey     |Rice                      |1                  |
|Jurgen       |Rice                      |1                  |
|Yann         |Rice                      |1                  |
|Yoshua       |Rice                      |1                  |
|Geoffrey     |Solar Pannel              |1                  |
|Yann         |Solar Pannel              |1                  |
|Geoffrey     |apples                    |3                  |
|Yoshua       |apples                    |1                  |
|Geoffrey     |bananas                   |1                  |
|Jurgen       |bananas                   |3                  |
|Yann         |bananas                   |2                  |
|Yoshua       |bananas                   |2                  |
|Jurgen       |space ship                |1                  |
|Yann         |space ship                |1                  |
|Yoshua       |space ship                |2                  |
+-------------+--------------------------+-------------------+

 

cust_prod.groupBy('product_name') \
 .agg(func.avg('count(product_name)')) \
 .show(truncate=False)
+--------------------------+------------------------+
|product_name              |avg(count(product_name))|
+--------------------------+------------------------+
|space ship                |1.3333333333333333      |
|Book: The noose           |1.0                     |
|Rice                      |1.0                     |
|Motor skate               |1.5                     |
|Book: Crime and Punishment|1.0                     |
|Recycle bin               |1.3333333333333333      |
|Lamp                      |1.3333333333333333      |
|bananas                   |2.0                     |
|Solar Pannel              |1.0                     |
|apples                    |2.0                     |
+--------------------------+------------------------+

Note that this does not take into account the quantity of each product bought, just how many times a client came to buy a given item. For our last question that may be OK, because we understand the different nature of the products.
But let us assume it is not OK, and we do need to count by quantity. For that we can use pivot dataframes, a feature introduced in Spark 1.6:

total_products_bought_by_customer = customers.groupBy('customer_name')\
 .pivot('product_name').sum('quantity')
total_products_bought_by_customer.show()

+-------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+
|customer_name|Book: Crime and Punishment|Book: The noose|Lamp|Motor skate|Recycle bin|Rice|Solar Pannel|apples|bananas|space ship|
+-------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+
|         Yann|                         5|              5|   3|          2|         10|  15|           5|  null|      6|         1|
|       Yoshua|                      null|           null|   2|          4|          5|   5|        null|    10|     18|         7|
|     Geoffrey|                      null|              1|   2|          7|       null|   5|           1|     9|      5|      null|
|       Jurgen|                      null|           null|null|          3|          5|   5|        null|  null|     15|         1|
+-------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+

 

Hope this has been a good introduction, and I suggest that you also check out this great post from Databricks covering similar operations.

For the last part of this tutorial series I plan to cover window functions, something that, until the more recent Spark 2.0, was only possible thanks to the HiveContext.
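As a small teaser of what is to come, here is a minimal sketch (my own example) of a window function: a running total of what each customer has spent, ordered by date. Note that prior to Spark 2.0 this requires sqlContext to be a HiveContext rather than a plain SQLContext:

from pyspark.sql.window import Window
from pyspark.sql import functions as func

# window over each customer's purchases, ordered by purchase date
w = Window.partitionBy('customer_name').orderBy('date')

customers \
    .withColumn('row_total', customers['quantity'] * customers['price']) \
    .withColumn('running_total', func.sum('row_total').over(w)) \
    .show(5)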

Getting started with Spark in Python/Scala

This is part of a series of introductory posts about Spark, meant to help beginners get started with it. Hope it helps!

So what’s that funky business people call Spark?

Essentially, Apache Spark is a framework for distributing parallel computational (inherently iterative) work across many nodes in a cluster of servers, maintaining high performance and high availability (HA) while working with commodity servers. It abstracts away core complexities that distributed computing activities are subject to (such as resource scheduling, job submission and execution, tracking, message passing between nodes, etc.), and provides developers a higher-level API – in Java, Python, R and Scala – to manipulate and work with data.

Why is everyone going crazy about it? Well, thanks to some cool features (such as in-memory caching, lazy evaluation of some operations, etc.), it has been doing pretty well lately in performance benchmarks.

In this article I want to avoid getting into deeper engineering details about Spark, and focus more on basic developer concepts; but it’s always useful to have some general idea. Spark is written in Scala and runs on the JVM, so (as you probably guessed) you need a Java Runtime Environment (JRE) and Java Development Kit (JDK) installed.

A Spark cluster is made up of two main types of processes: a driver program and worker programs (where executors run). At its core, the programming model is that the driver passes functions to be executed on the worker nodes, which eventually return a value to the driver. The worker programs can run either on cluster nodes or on local threads, and they perform compute operations on data.

[Figure: Spark cluster overview]

Bootstrapping env

OK, enough chit-chat, let’s get our hands dirty. In case the procrastinator side of you is preparing to hit ctrl+w with the pseudo-sophisticated argument that you can’t run Spark except on a cluster, well, I’ve got bad news: Spark can be run using the built-in standalone cluster scheduler in local mode (i.e. the driver and executor processes running within the same JVM, in a single multithreaded instance).

The first thing a Spark program does is create a SparkContext object (for Scala/Python; a JavaSparkContext for Java, and SparkR.init for R), which tells Spark how to access the cluster resources. Here is an example in Python:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster('local[4]')
        .setAppName('My first app'))
sc = SparkContext(conf=conf)

This example shows how to create a context running in local mode (using 4 threads). Let’s exemplify in Scala, but this time in cluster mode (i.e. connecting to an existing cluster) instead of local mode (usually, your own PC):

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf()
.setMaster("spark://{MASTER_HOST_IP}:{PORT}")
.setAppName("My first app")
val sc = new SparkContext(conf)

Last, but not least, to access Spark from the shell, navigate to the Spark base directory and, for Scala:

./bin/spark-shell

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala>

.. Or for PySpark (at the time of writing this post, I’m using Spark 1.5.1 locally):

./bin/pyspark

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 3.4.3 (default, Mar 6 2015 12:06:10)
SparkContext available as sc, HiveContext available as sqlContext.

Please note that when you access Spark from the shell (only available for Scala & Python), the SparkContext is bootstrapped automatically for you (in local mode), and in both Python and Scala you access this object by referencing “sc” (which, if you look carefully, the shell also tells you).

El Kabong time

The first core concept to learn is Spark’s own data collection structure, the Resilient Distributed Dataset (RDD). RDDs are immutable collections of records that are split and spread across cluster nodes, and saved either in memory or on disk. Another interesting property is that RDDs are fault-tolerant: data can be rebuilt even on node failure.

Please give a warm welcome to RDDs, because if you want to “talk parallel lingo”, RDDs are your new best friends and currency when dealing with Spark.

[Image: buzzlight_rdds]

The first option to create an RDD in PySpark is from a list:

my_app_name = 'hello spark'
conf = SparkConf().setAppName(my_app_name)

data = range(10)
distData = sc.parallelize(data)

Please keep in mind that although a list in Python is a mutable object, passing it to the RDD creation will obviously have no bearing on the RDD’s immutable nature. Much like with tuples in Python, operations you perform on RDDs will create a new object. This means that our second option for creating an RDD is.. from another, already existing, RDD.

This was obviously a silly example, but the important part to take into consideration is that, depending on the number of nodes in your cluster (or, if running locally, the number of threads specified initially in the SparkContext constructor - local[4] in our example), Spark will automatically take care of splitting the RDD’s data among the workers (i.e. nodes or threads).

In case you are wondering whether you can change, per RDD, how many partitions/splits are created, the answer is yes. Both methods parallelize() and textFile() (as we will see next) accept an additional parameter where you can specify an integer.

Let’s look at the same example, but in Scala and specifying the parallelism:

val data = 0 to 9
val distData = sc.parallelize(data, 4)

Besides collections (created in the driver program) and transformations on other RDDs, you can also create RDDs by referencing external sources, such as the local file system, distributed file systems (HDFS), cloud storage (S3), Cassandra, HBase, etc.

Here are some simple examples in Scala:

val localFileDist = sc.textFile("myfile.txt")
val hdfsFileDist = sc.textFile("hdfs://{HOST_IP}:{PORT}/path/to/file")
val s3FileDist = sc.textFile("s3n://bucket/...")

Though you can also use the SparkContext textFile() method for grabbing files from AWS S3, heads up for some possible issues.

You should also keep in mind that RDDs are lazily evaluated. In other words, computation only happens when you actually perform an action on them (we will get into that in just a few seconds). As a matter of fact, even in the case of loading a file with the textFile() method, all you are doing is storing a pointer to the file location.

This becomes very clear if you are using the shell and, for example, perform the following action to return one element from the RDD to the driver:

distData.take(1)

Transformations & Actions

Now, before you get all excited to perform operations on RDDs, there is yet another important aspect to grasp. There are two main types of operations that can be applied to RDDs, namely: actions and transformations.

The key distinction between the two is that transformations are lazily evaluated and create new RDDs from existing ones, whereas actions are immediately evaluated and return values to the driver program. In other words, you can think of transformations as just the recipes for the data, and actions as the real cooking.

Yes, in our previous example, the take() method was indeed an action. Let’s continue with our silly example to illustrate this aspect, starting with Python:

derivateData = distData.map(lambda x: x**2).filter(lambda x: x%2 == 0).map(lambda x: x+1)
result = derivateData.reduce(lambda x,y: x+y)
result

..last, but not least, in Scala:

val derivateData = distData.map(x => x*x).filter(x => x%2 == 0).map(x => x+1)
val result = derivateData.reduce(_ + _)
result

On our first line we are simply performing transformations on our base RDD (distData), namely map and filter operations.

Furthermore, if you are following along in the Spark shell, it will be clear to you that only when you execute the second line (with the action reduce) does Spark “wake up” and perform the desired transformations on each worker node, plus the reduce action, finally returning the result to the driver.

As you can see, you can conveniently chain several transformations on RDDs, which will, upon an action, be computed in memory on the worker nodes. Moreover, all of these operations, both the transformations (map + filter) and the action (reduce), occurred in memory, without having to store intermediate results on disk. This is one of the main secrets that make benchmarks of Spark against Hadoop’s MapReduce jobs so disproportionately different.

However, after returning values to the driver, these RDDs will be cleared from memory for further computations. Now, before you panic, the good folks at Berkeley anticipated your wish, and there is a method you can call on an RDD to persist it in cache. In our example, to persist derivateData in memory, we should call the .cache() or .persist() method on it before calling any action.
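A minimal illustration of that, continuing the same example:

# persist the derived RDD before triggering any action, so that subsequent
# actions reuse the in-memory copy instead of recomputing the whole lineage
derivateData.cache()

first_result = derivateData.reduce(lambda x, y: x + y)  # computes and caches
second_count = derivateData.count()                     # served from the cache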

In part 2, I will cover Spark DataFrames, also equally important.

In the meanwhile, where to go next? Here’s a suggestion: the Berkeley AMPCamp introduction exercises.

Hope this helped!