Build a Data Pipeline with AWS Athena and Airflow (part 1)

In this post, I build on the knowledge shared in the previous post on creating Data Pipelines with Airflow and introduce new technologies that help with the Extraction part of the process, with cost and performance in mind. I’ll go through the options available and then introduce a specific solution using AWS Athena. First we’ll establish the dataset and organize our data in S3 buckets. Afterwards, you’ll learn how to make this information queryable through AWS Athena, while making sure it is updated daily.

Data dump files of not-so-structured data are a common byproduct of Data Pipelines that include an extraction step. This happens by design: business-wise, and as Data Engineers, there is never too much data. From an investment standpoint, however, object-relational database systems can become increasingly costly to maintain, especially if we aim to keep performance up as the data grows.

That said, this is not a new problem. Both Apache and Facebook have developed open-source software that is extremely efficient at dealing with extreme amounts of data. While such software is written in Java, it maintains an abstracted interface to the data, relying on traditional SQL to query data stored on filesystem storage, such as S3 in our example or HDFS, and in a wide range of different formats, such as CSV.

Today we have many options to tackle this problem, and I’m going to go through how to approach it in today’s serverless world with AWS Athena. For this we need to quickly rewind back in time and go through the technology…

Airflow: create and manage Data Pipelines easily

This bootstrap guide was originally published at GoSmarten but as the use cases continue to increase, it’s a good idea to share it here as well.

What is Airflow

The need to perform operations or tasks, either simple and isolated or complex and sequential, is present in all things data nowadays. If you or your team work with lots of data on a daily basis, there is a good chance you’ve struggled with the need to implement some sort of pipeline to structure these routines. To make this process more efficient, Airbnb developed an internal project conveniently called Airflow, which was later fostered under the Apache Incubator program. In 2015 Airbnb open-sourced the code to the community and, although its trustworthy origin played a role in its popularity, there are many other reasons why it became widely adopted in the engineering community. It allows tasks to be set up purely in Python (or as Bash commands/scripts).
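For illustration purposes only, here is a minimal sketch of what defining such tasks in Python might look like (the DAG name, schedule and commands below are made up for this example, using Airflow 1.x-style imports):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def transform():
    print("transforming data...")

# a hypothetical daily pipeline with one Bash task and one Python task
dag = DAG(dag_id="example_pipeline",
          start_date=datetime(2017, 1, 1),
          schedule_interval="@daily")

extract = BashOperator(task_id="extract",
                       bash_command='echo "extracting..."',
                       dag=dag)

transform_task = PythonOperator(task_id="transform",
                                python_callable=transform,
                                dag=dag)

extract >> transform_task  # transform runs only after extract succeeds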

What you’ll find in this tutorial

Not only will we walk you through setting up Airflow locally, but you’ll also do so using Docker, which will optimize the conditions for learning locally while minimizing the effort of transitioning into production. Docker is a subject in itself, and we could dedicate a few dozen posts to it alone; however, it is also simple enough and has all the magic needed to show off some of its advantages when combined with Airflow…

Getting through Deep Learning – Tensorflow intro (part 3)

This post is part of a tutorial series:

  1. Getting through Deep Learning – CNNs (part 1)
  2. Getting through Deep Learning – TensorFlow intro (part 2)
  3. Getting through Deep Learning – TensorFlow intro (part 3)

Alright, let’s move on to more interesting stuff: linear regression. Since the main focus here is TensorFlow, and given the abundance of online resources on the subject, I’ll just assume you are familiar with Linear Regression.

As previously mentioned, a linear regression has the following formula:

Y = b0 + b1 * X

Where Y is the dependent variable, X is the independent variable, and b0 and b1 are the parameters we want to adjust.

Let us generate random data and feed it into a linear function. Then, as opposed to using the closed-form solution, we use an iterative algorithm to progressively get closer to the minimal cost, in this case using gradient descent to fit a linear regression.

We start by initializing the weights – b0 and b1 – with random values, which naturally results in a poor fit. However, as one can see through the print statements as the model trains, b0 approaches the target value of 3, and b1 the target value of 5, with the last printed step being [3.0229037, 4.9730182].
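The full listing is not reproduced here, but a minimal sketch of such a fit could look like the following (TensorFlow 1.x API; the data size, learning rate and variable names are my own assumptions):

import numpy as np
import tensorflow as tf

# generate random data and feed it through a linear function (plus some noise),
# with b0 targeting 3 and b1 targeting 5
x_data = np.random.rand(1000).astype(np.float32)
y_data = 3.0 + 5.0 * x_data + np.random.normal(0.0, 0.1, 1000).astype(np.float32)

# b0 and b1 start as random values, hence the initial poor fit
b0 = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
b1 = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
y = b0 + b1 * x_data

# mean squared error cost, minimized iteratively with gradient descent
loss = tf.reduce_mean(tf.square(y - y_data))
train_op = tf.train.GradientDescentOptimizer(learning_rate=0.5).minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for step in range(201):
        sess.run(train_op)
        if step % 20 == 0:
            print(step, sess.run([b0, b1]))  # b0 approaches 3, b1 approaches 5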

The next figure illustrates the progressive fitting of the model’s lines as the weights change:

[Figure: lr_fitting, showing the fitted lines converging to the data as training progresses]


Getting through Deep Learning – Tensorflow intro (part 2)

Yes, I kind of jumped the gun on my initial Deep Learning post by going straight into CNNs. For me this learning path works best, as I dive straight into the fun part, and eventually stumble upon the fact that maybe I’m not that good a swimmer, and it might be good to practice a bit before going out into deep waters. This post attempts to be exactly that: going back to the basics.

This post is part of a tutorial series:

  1. Getting through Deep Learning – CNNs (part 1)
  2. Getting through Deep Learning – TensorFlow intro (part 2)

TensorFlow is a great starting point for Deep Learning/ Machine Learning, as it provides a very concise yet extremely powerful API. It is an open-source project created by Google initially with numerical computation tasks in mind, and used for Machine Learning/Deep Learning.

TensorFlow provides APIs for both Python and C++, but its backend is written in C/C++, allowing it to achieve much better performance. Moreover, it supports CPU, GPU, as well as distributed computing in a cluster.

The first thing to realize is that TensorFlow uses the concept of a session. A session is nothing more than a series of operations that manipulate tensors, organized in the structure of a data flow graph. This graph-building activity works pretty much like building with Lego, matching nodes and edges. Nodes represent mathematical operations, and edges multi-dimensional arrays – aka tensors. As the name hints, a tensor is the central data structure in TensorFlow, and it is described by its shape. For example, one would characterize a 2-row by 3-column matrix as a tensor with shape [2,3].

Important also to note is that the graph is lazily evaluated, meaning that computation will only be triggered by an explicit run order for that session’s graph. OK, enough talking, let us get into coding by exemplifying how a basic graph and Session are built:
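The original embedded snippet is not shown here; a minimal sketch of such a graph (TensorFlow 1.x API, with made-up constant values) might be:

import tensorflow as tf

# construction phase: two constant nodes and the operation connecting them
a = tf.constant(2, name="a")
b = tf.constant(3, name="b")
total = tf.add(a, b)

# execution phase: nothing is computed until the session runs the graph
with tf.Session() as sess:
    print(sess.run(total))  # 5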

In the previous example, “a” and “b” are the nodes in the graph, and the summation is the operation connecting both of them.

A TensorFlow program is typically split into two parts: a construction phase – where the graph is built – and an execution phase, when resources (CPU and/or GPU, RAM and disk) are actually allocated until the session is closed.

Typically, machine learning applications strive to iteratively update model weights. So, of course, one can also specify tensors of variable type, and even combine those with constants.
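A small sketch of combining a variable with constants (again TensorFlow 1.x API, values made up):

import tensorflow as tf

# a variable node (mutable state) combined with constant nodes
w = tf.Variable(10, name="w")
c = tf.constant(5, name="c")
op = w * c + 2

with tf.Session() as sess:
    sess.run(w.initializer)  # variables must be explicitly initialized
    print(sess.run(op))      # 52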

By defining the dtype of a node, one can gain/lose precision, and at the same time impact memory utilization and computation times.
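For example (a sketch; the values are arbitrary):

import tensorflow as tf

# float32 uses half the memory of float64, at the cost of precision
t32 = tf.constant(3.141592653589793, dtype=tf.float32)
t64 = tf.constant(3.141592653589793, dtype=tf.float64)

with tf.Session() as sess:
    print(sess.run(t32))  # 3.1415927
    print(sess.run(t64))  # 3.141592653589793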

Now note the following three lines (lines 35 to 37 in the original listing):

r1 = op1.eval()
r2 = op2.eval()
result = f.eval()

TensorFlow automatically detects which operations depend on each other. In this case, TensorFlow will know that op1 depends on the evaluation of x and y, op2 on the evaluation of a, b and c, and finally that f depends on both op1 and op2. Thus, internally, the lazy evaluation is also aligned with the computation graph. So far so good.
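Since the original listing is not reproduced here, a hypothetical reconstruction of the graph being described (node names taken from the text, values made up) could be:

import tensorflow as tf

x = tf.constant(3)
y = tf.constant(4)
a = tf.constant(1)
b = tf.constant(2)
c = tf.constant(5)

op1 = x * y      # depends on x and y
op2 = a + b + c  # depends on a, b and c
f = op1 + op2    # depends on both op1 and op2

with tf.Session() as sess:
    r1 = op1.eval()    # first graph run
    r2 = op2.eval()    # second graph run
    result = f.eval()  # third graph run: re-evaluates op1 and op2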

However, all node values are dropped between graph runs, except for variable values, which are maintained by the session across graph runs. This has the important implication that the op1 and op2 evaluations will not be reused for the f graph run – meaning the code will evaluate op1 and op2 twice.

To overcome this limitation, one needs to instruct TensorFlow to evaluate those operations in a single graph run:
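A minimal sketch, assuming the op1/op2/f graph from above:

# a single graph run evaluates all three nodes, so op1 and op2
# are computed only once
with tf.Session() as sess:
    r1, r2, result = sess.run([op1, op2, f])
    print(result)  # 20 = (3 * 4) + (1 + 2 + 5)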

And yes, that is all for today. I want to blog more frequently, and instead of writing just once every couple of months (and in the meanwhile piling up a lot of draft posts that never see the light of day), I decided to keep it simple. See you soon 🙂


Getting started with Spark (part 2) – SparkSQL

Update: this tutorial has been updated mainly up to Spark 1.6.2 (with a minor detail regarding Spark 2.0), which is not the most recent version of Spark at the time this post was updated. Nonetheless, for the operations exemplified you can pretty much rest assured that the API has not changed substantially. I will do my best to update and point out these differences when they occur.

SparkSQL has become one of the core modules of the Spark ecosystem (with its API already available for all currently supported languages, namely Scala, Java, Python, and R). It is intended to allow you to structure data into named columns (instead of the raw, schema-less data of RDDs), which can then be queried with SQL syntax, taking advantage of a distributed query engine running in the background.

In practical terms, you can very much think of it as an attempt to emulate the APIs of popular dataframes in R or Python pandas, as well as SQL tables. However, if I do my job correctly, I will succeed in showing you that they are not quite the same (and hopefully more).

In order for you to use SparkSQL you need to import SQLContext. Let’s start with how you would do it in Python:

# import required libs
from pyspark import SparkConf, SparkContext
# here is the import that allows you to use dataframes
from pyspark.sql import SQLContext

# initialize context in Spark 1.6.2
conf = SparkConf().setMaster('local[2]').setAppName('SparkSQL-demo')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

Note: in the new Spark 2.0, this would indeed vary as such:

# import required libs
from pyspark.sql import SparkSession
# initialize context in Spark 2.0
spark = SparkSession\
    .builder\
    .appName('SparkSQL-demo').getOrCreate()

Good news, Scala does not vary much:

//import required libs
import org.apache.spark.{SparkConf, SparkContext}
//here is the import that allows you to use dataframes
import org.apache.spark.sql.SQLContext

// initialize context in Spark 1.6.2
val conf = new SparkConf().setMaster("local[2]").setAppName("SparkSQL-demo")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)


A dataframe can be created from an existing RDD, as well as – similarly to RDDs – from a variety of different sources such as text files, HDFS, S3, Hive tables, external databases, etc. Here’s an example of how to build a dataframe by reading from S3 (in Python):

df = sqlContext.read.json('s3n://my-bucket/example/file.json')


Note: we use the “s3n” prefix, which is valid for a series of AWS regions, but not for all of them. For example, to access S3 buckets in the Frankfurt region you would rather use “s3a” as a prefix.

What sets Spark Dataframes apart

However, even though the goal is to hide as many disparities as possible, there are nonetheless big conceptual differences between a Spark Dataframe and a Pandas dataframe. The first is that immutability has (thank God) not been removed. In other words, when you call an operation on a Dataframe, it returns a new Dataframe, which you do need to save into a variable. I’ve seen a lot of data scientists stumble on this in their first Spark days, so let us illustrate this with a very simple example:


# Create a Dataframe with pseudo data.
# Note that we create an RDD, and then cast it to
# a Dataframe by providing the column names as a schema
column_names = ["customer_name", "date", "category",
                "product_name", "quantity", "price"]
df01 = sc.parallelize(
    [("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
     ("Yann", "2016-04-22", "B", "Lamp", 1, 38.00)]).toDF(column_names)

df02 = df01.select("customer_name", "date", "category", "product_name", "price",
                   (df01['quantity'] + 1).alias("quantity"))
df01.show()
df02.show()

+-------------+----------+--------+------------+--------+-----+
|customer_name|      date|category|product_name|quantity|price|
+-------------+----------+--------+------------+--------+-----+
|     Geoffrey|2016-04-22|       A|      apples|       1| 50.0|
|         Yann|2016-04-22|       B|        Lamp|       1| 38.0|
+-------------+----------+--------+------------+--------+-----+

# .. and df02.show() yields, as expected:

+-------------+----------+--------+------------+-----+--------+
|customer_name|      date|category|product_name|price|quantity|
+-------------+----------+--------+------------+-----+--------+
|     Geoffrey|2016-04-22|       A|      apples| 50.0|       2|
|         Yann|2016-04-22|       B|        Lamp| 38.0|       2|
+-------------+----------+--------+------------+-----+--------+

The second most obvious difference (compared to a pandas df) is the distributed nature of a Spark Dataframe, just as with RDDs. As a matter of fact, in previous versions of Spark, Dataframes were even called SchemaRDD. So internally the same redundancy principles of RDDs apply, and you should in fact still keep in mind the Driver – Executor programming model, as well as the concepts of Actions and Transformations.

Also important to note is that Spark Dataframes internally use a query optimizer, one of the reasons why the performance difference between Spark code written in native Scala versus Python or Java drops significantly, as can be seen in the next illustration provided by Databricks:

[Figure: performance differences of DataFrames vs RDDs across languages (source: Databricks)]


API

Let us view some useful operations one can perform with Spark dataframes. For the sake of brevity I will continue this tutorial mainly using the Python API, as the syntax does not really vary that much.

Let us create a bigger fake (and utterly nonsensical) dataset to make operations more interesting:

customers = sc.parallelize([("Geoffrey", "2016-04-22", "A", "apples", 1, 50.00),
("Geoffrey", "2016-05-03", "B", "Lamp", 2, 38.00),
("Geoffrey", "2016-05-03", "D", "Solar Pannel", 1, 29.00),
("Geoffrey", "2016-05-03", "A", "apples", 3, 50.00),
("Geoffrey", "2016-05-03", "C", "Rice", 5, 15.00),
("Geoffrey", "2016-06-05", "A", "apples", 5, 50.00),
("Geoffrey", "2016-06-05", "A", "bananas", 5, 55.00),
("Geoffrey", "2016-06-15", "Y", "Motor skate", 7, 68.00),
("Geoffrey", "2016-06-15", "E", "Book: The noose", 1, 125.00),
("Yann", "2016-04-22", "B", "Lamp", 1, 38.00),
("Yann", "2016-05-03", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-05-03", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-03", "C", "Rice", 15, 15.00),
("Yann", "2016-04-02", "A", "bananas", 3, 55.00),
("Yann", "2016-04-02", "B", "Lamp", 2, 38.00),
("Yann", "2016-04-03", "E", "Book: Crime and Punishment", 5, 100.00),
("Yann", "2016-04-13", "E", "Book: The noose", 5, 125.00),
("Yann", "2016-04-27", "D", "Solar Pannel", 5, 29.00),
("Yann", "2016-05-27", "D", "Recycle bin", 5, 27.00),
("Yann", "2016-05-27", "A", "bananas", 3, 55.00),
("Yann", "2016-05-01", "Y", "Motor skate", 1, 68.00),
("Yann", "2016-06-07", "Z", "space ship", 1, 227.00),
("Yoshua", "2016-02-07", "Z", "space ship", 2, 227.00),
("Yoshua", "2016-02-14", "A", "bananas", 9, 55.00),
("Yoshua", "2016-02-14", "B", "Lamp", 2, 38.00),
("Yoshua", "2016-02-14", "A", "apples", 10, 55.00),
("Yoshua", "2016-03-07", "Z", "space ship", 5, 227.00),
("Yoshua", "2016-04-07", "Y", "Motor skate", 4, 68.00),
("Yoshua", "2016-04-07", "D", "Recycle bin", 5, 27.00),
("Yoshua", "2016-04-07", "C", "Rice", 5, 15.00),
("Yoshua", "2016-04-07", "A", "bananas", 9, 55.00),
("Jurgen", "2016-05-01", "Z", "space ship", 1, 227.00),
("Jurgen", "2016-05-01", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "A", "bananas", 5, 55.00),
("Jurgen", "2016-05-08", "Y", "Motor skate", 1, 68.00),
("Jurgen", "2016-06-05", "A", "bananas", 5, 55.00),
("Jurgen", "2016-06-05", "C", "Rice", 5, 15.00),
("Jurgen", "2016-06-05", "Y", "Motor skate", 2, 68.00),
("Jurgen", "2016-06-05", "D", "Recycle bin", 5, 27.00),
]).toDF(["customer_name", "date", "category", "product_name", "quantity", "price"])


The dataset is minimal for this example, so it really does not make a difference to cache it in memory. But my point is to illustrate that, similarly to RDDs, you should also care about memory persistence, and that you can indeed use a similar API. Moreover, since next we are going to use several distinct operations of type “Action” (yielding results back to the driver), it is good practice to signal that this dataframe should be kept in memory for further operations.

print(customers.is_cached)
customers.cache()
print(customers.is_cached)
# False
# True

OK, so yes, there are some exceptions, and in this case, as you can see, the cache() method does not obey the laws of immutability. Moving on, let’s take a peek at it:

customers.show(10)

+-------------+----------+--------+---------------+--------+-----+
|customer_name|      date|category|   product_name|quantity|price|
+-------------+----------+--------+---------------+--------+-----+
|     Geoffrey|2016-04-22|       A|         apples|       1| 50.0|
|     Geoffrey|2016-05-03|       B|           Lamp|       2| 38.0|
|     Geoffrey|2016-05-03|       D|   Solar Pannel|       1| 29.0|
|     Geoffrey|2016-05-03|       A|         apples|       3| 50.0|
|     Geoffrey|2016-05-03|       C|           Rice|       5| 15.0|
|     Geoffrey|2016-06-05|       A|         apples|       5| 50.0|
|     Geoffrey|2016-06-05|       A|        bananas|       5| 55.0|
|     Geoffrey|2016-06-15|       Y|    Motor skate|       7| 68.0|
|     Geoffrey|2016-06-15|       E|Book: The noose|       1|125.0|
|         Yann|2016-04-22|       B|           Lamp|       1| 38.0|
+-------------+----------+--------+---------------+--------+-----+
only showing top 10 rows


As previously mentioned, one advantage over RDDs is the ability to use standard SQL syntax to slice and dice a Spark Dataframe. Let’s check, for example, all distinct products that we have:

spark_sql_table = 'customers'
sql_qry = "select distinct(product_name) from {table}".format(table=spark_sql_table)
customers.registerTempTable(spark_sql_table)
distinct_product_names_df01 = sqlContext.sql(sql_qry)
distinct_product_names_df01.show(truncate=False)

+--------------------------+
|product_name              |
+--------------------------+
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |
+--------------------------+


Similarly to how we work with RDDs, with Spark DataFrames you are not restricted to SQL; you can indeed call methods on dataframes, which yield a new dataframe (remember the immutability story, right?).

Let’s repeat the previous example of getting all distinct product names, this time using dataframe methods:

distinct_product_names_df02 = customers.select('product_name').distinct()
distinct_product_names_df02.show(truncate=False)
distinct_product_names_df02.count()

+--------------------------+
|product_name              |
+--------------------------+
|space ship                |
|Book: The noose           |
|Rice                      |
|Motor skate               |
|Book: Crime and Punishment|
|Recycle bin               |
|Lamp                      |
|bananas                   |
|Solar Pannel              |
|apples                    |
+--------------------------+
10


Note that you can also work on a dataframe just as you would iterate over an RDD, which causes the dataframe to be cast into an RDD in the background:


distinct_product_names_rdd = customers.map(lambda row: row[3])
# calling distinct_product_names_rdd.show() will yield an exception
try:
    distinct_product_names_rdd.show()
except Exception as err:
    print('Error: {}'.format(err))
    print('BUT calling .take() this works, as it is an RDD now: {}' \
        .format(distinct_product_names_rdd.take(3)))

distinct_product_names = distinct_product_names_rdd.distinct().collect()
print('Distinct product names are: {}'.format(distinct_product_names))
print('\n... And, as expected, they are {} in total.' \
    .format(len(distinct_product_names)))

Error: 'PipelinedRDD' object has no attribute 'show'
BUT calling .take() this works, as it is an RDD now: ['apples', 'Lamp', 'Solar Pannel']
Distinct product names are: ['Rice', 'bananas', 'space ship', 'Book: The noose', 'Solar Pannel', 'Recycle bin', 'apples', 'Lamp', 'Book: Crime and Punishment', 'Motor skate']

... And, as expected, they are 10 in total.


Similarly to R or Python pandas, you can get summary statistics. Let’s get statistics for the category, quantity and price columns:

customers.describe('category', 'quantity', 'price').show()
+-------+--------+------------------+------------------+
|summary|category|          quantity|             price|
+-------+--------+------------------+------------------+
|  count|      39|                39|                39|
|   mean|    null| 4.153846153846154| 68.94871794871794|
| stddev|    null|2.9694803863945936|59.759583559965165|
|    min|       A|                 1|              15.0|
|    max|       Z|                15|             227.0|
+-------+--------+------------------+------------------+


Being a categorical variable, the ‘category’ column yields a summary that, as you can see, does not make much sense. We’ll improve on this later, but for now, let’s do something simple and try to look into product_name and category.

Let’s suppose we are interested in finding out how purchasing behavior varies among categories, namely what the average frequency of purchases per category is.

cat_freq_per_cust = customers.stat.crosstab('customer_name', 'category')
cat_freq_per_cust.show()

+----------------------+---+---+---+---+---+---+---+
|customer_name_category|  Y|  Z|  A|  B|  C|  D|  E|
+----------------------+---+---+---+---+---+---+---+
|              Geoffrey|  1|  0|  4|  1|  1|  1|  1|
|                Yoshua|  1|  2|  3|  1|  1|  1|  0|
|                  Yann|  2|  1|  2|  2|  1|  3|  2|
|                Jurgen|  2|  1|  3|  0|  1|  1|  0|
+----------------------+---+---+---+---+---+---+---+


cols = cat_freq_per_cust.columns
cat_freq_per_cust.describe(cols[1::]).show(truncate=False)

+-------+------------------+-----------------+-----------------+-----------------+---+------------------+------------------+
|summary|Y                 |Z                |A                |B                |C  |D                 |E                 |
+-------+------------------+-----------------+-----------------+-----------------+---+------------------+------------------+
|count  |4                 |4                |4                |4                |4  |4                 |4                 |
|mean   |1.5               |1.0              |3.0              |1.0              |1.0|1.5               |0.75              |
|stddev |0.5773502691896257|0.816496580927726|0.816496580927726|0.816496580927726|0.0|0.9999999999999999|0.9574271077563381|
|min    |1                 |0                |2                |0                |1  |1                 |0                 |
|max    |2                 |2                |4                |2                |1  |3                 |2                 |
+-------+------------------+-----------------+-----------------+-----------------+---+------------------+------------------+


Note: the “count” values do not have much meaning here; they simply count the number of rows blindly, and since rows without a given value are filled with zero instead of Null, as expected those rows are counted too.

Let’s check the same for products.

# Let's view products vertically, for formatting reasons
customers.stat.crosstab('product_name', 'customer_name' )\
.show(truncate=False)
+--------------------------+--------+------+----+------+
|product_name_customer_name|Geoffrey|Jurgen|Yann|Yoshua|
+--------------------------+--------+------+----+------+
|Book: Crime and Punishment|0       |0     |1   |0     |
|Recycle bin               |0       |1     |2   |1     |
|Solar Pannel              |1       |0     |1   |0     |
|space ship                |0       |1     |1   |2     |
|Motor skate               |1       |2     |2   |1     |
|Rice                      |1       |1     |1   |1     |
|apples                    |3       |0     |0   |1     |
|Lamp                      |1       |0     |2   |1     |
|bananas                   |1       |3     |2   |2     |
|Book: The noose           |1       |0     |1   |0     |
+--------------------------+--------+------+----+------+



freq_products = customers.stat.crosstab('customer_name', 'product_name')
freq_prod_cols = freq_products.columns
freq_products.describe(freq_prod_cols[1::]).show()

+-------+-----------------+-----------------+------------------+----+------------------+-----------------+------------------+--------------------------+-----------------+------------------+
|summary|          bananas|       space ship|   Book: The noose|Rice|       Motor skate|             Lamp|            apples|Book: Crime and Punishment|      Recycle bin|      Solar Pannel|
+-------+-----------------+-----------------+------------------+----+------------------+-----------------+------------------+--------------------------+-----------------+------------------+
|  count|                4|                4|                 4|   4|                 4|                4|                 4|                         4|                4|                 4|
|   mean|              2.0|              1.0|               0.5| 1.0|               1.5|              1.0|               1.0|                      0.25|              1.0|               0.5|
| stddev|0.816496580927726|0.816496580927726|0.5773502691896257| 0.0|0.5773502691896257|0.816496580927726|1.4142135623730951|                       0.5|0.816496580927726|0.5773502691896257|
|    min|                1|                0|                 0|   1|                 1|                0|                 0|                         0|                0|                 0|
|    max|                3|                2|                 1|   1|                 2|                2|                 3|                         1|                2|                 1|
+-------+-----------------+-----------------+------------------+----+------------------+-----------------+------------------+--------------------------+-----------------+------------------+

Alternatively, we could have answered this using a more SQL-like approach:

# 'func' refers to pyspark.sql.functions
from pyspark.sql import functions as func

cust_prod = customers.groupBy('customer_name', 'product_name') \
    .agg(func.count('product_name')) \
    .orderBy('product_name', 'customer_name')
cust_prod.show(cust_prod.count(), truncate=False)

+-------------+--------------------------+-------------------+
|customer_name|product_name              |count(product_name)|
+-------------+--------------------------+-------------------+
|Yann         |Book: Crime and Punishment|1                  |
|Geoffrey     |Book: The noose           |1                  |
|Yann         |Book: The noose           |1                  |
|Geoffrey     |Lamp                      |1                  |
|Yann         |Lamp                      |2                  |
|Yoshua       |Lamp                      |1                  |
|Geoffrey     |Motor skate               |1                  |
|Jurgen       |Motor skate               |2                  |
|Yann         |Motor skate               |2                  |
|Yoshua       |Motor skate               |1                  |
|Jurgen       |Recycle bin               |1                  |
|Yann         |Recycle bin               |2                  |
|Yoshua       |Recycle bin               |1                  |
|Geoffrey     |Rice                      |1                  |
|Jurgen       |Rice                      |1                  |
|Yann         |Rice                      |1                  |
|Yoshua       |Rice                      |1                  |
|Geoffrey     |Solar Pannel              |1                  |
|Yann         |Solar Pannel              |1                  |
|Geoffrey     |apples                    |3                  |
|Yoshua       |apples                    |1                  |
|Geoffrey     |bananas                   |1                  |
|Jurgen       |bananas                   |3                  |
|Yann         |bananas                   |2                  |
|Yoshua       |bananas                   |2                  |
|Jurgen       |space ship                |1                  |
|Yann         |space ship                |1                  |
|Yoshua       |space ship                |2                  |
+-------------+--------------------------+-------------------+


cust_prod.groupBy('product_name') \
 .agg(func.avg('count(product_name)')) \
 .show(truncate=False)
+--------------------------+------------------------+
|product_name              |avg(count(product_name))|
+--------------------------+------------------------+
|space ship                |1.3333333333333333      |
|Book: The noose           |1.0                     |
|Rice                      |1.0                     |
|Motor skate               |1.5                     |
|Book: Crime and Punishment|1.0                     |
|Recycle bin               |1.3333333333333333      |
|Lamp                      |1.3333333333333333      |
|bananas                   |2.0                     |
|Solar Pannel              |1.0                     |
|apples                    |2.0                     |
+--------------------------+------------------------+

Note that this does not take into account the quantity of each product bought, just how many times a client was there to buy a given item. For our last question that may be OK, because we understand the different nature of the products.
But let us assume it is not OK, and we do need to count by quantity. For that we can pivot the dataframe, a feature introduced in Spark 1.6:

total_products_bought_by_customer = customers.groupBy('customer_name')\
 .pivot('product_name').sum('quantity')
total_products_bought_by_customer.show()

+-------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+
|customer_name|Book: Crime and Punishment|Book: The noose|Lamp|Motor skate|Recycle bin|Rice|Solar Pannel|apples|bananas|space ship|
+-------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+
|         Yann|                         5|              5|   3|          2|         10|  15|           5|  null|      6|         1|
|       Yoshua|                      null|           null|   2|          4|          5|   5|        null|    10|     18|         7|
|     Geoffrey|                      null|              1|   2|          7|       null|   5|           1|     9|      5|      null|
|       Jurgen|                      null|           null|null|          3|          5|   5|        null|  null|     15|         1|
+-------------+--------------------------+---------------+----+-----------+-----------+----+------------+------+-------+----------+


Hope this has been a good introduction, and I suggest that you further check out this great post from databricks covering similar operations.

For the last part of this tutorial series I plan to cover window functions, something that, until the more recent Spark 2.0, was only possible thanks to HiveContext.