Getting Started with Spark (part 4) – Unit Testing

Alright, quite a while ago (already counting in years), I published a tutorial series focused on helping people get started with Spark. Here is an outline of the previous posts:

In the meanwhile Spark has not decreased in popularity, so I thought I would continue updating the same series. In this post we cover an essential part of any ETL project, namely unit testing.

For that I created a sample repository, which is meant to serve as boilerplate code for any new Python Spark project.

Let us browse through the main job script. Please note that the repository might contain an updated version, which might differ in details from the following gist.

# create the general function
def _amount_spent(quantity: int, price: float) -> float:
"""
Calculates the product between two variables
:param quantity: (float/int)
:param price: (float/int)
:return:
(float/int)
"""
return quantity * price
def amount_spent_udf(data: DataFrame) -> DataFrame:
    """
    Return *data* with an extra 'amount_spent' column (quantity * price).

    :param data: DataFrame containing numeric 'quantity' and 'price' columns
    :return: a new DataFrame with the additional 'amount_spent' column
    """
    # Note: DoubleType in Java/Scala is equal to Python float; thus you can
    # alternatively specify FloatType().
    # Use a distinct local name — the original shadowed this function's own name.
    spent_udf = F.udf(_amount_spent, DoubleType())
    # Apply our UDF to the dataframe
    return data.withColumn('amount_spent', spent_udf(F.col('quantity'), F.col('price')))
def main(conf: ConfigParser, spark: SparkSession) -> None:
"""
Build a mock purchases DataFrame, enrich it with 'amount_spent', and preview it.

:param conf: job configuration  # NOTE(review): unused in this body — confirm whether it is needed
:param spark: active SparkSession used to create the mock data
:return: None; side effect is printing the first 10 rows to stdout
"""
# mock data
customers = spark.createDataFrame([
Row(customer_name="Geoffrey", date="2016-04-22", category="A", product_name="apples", quantity=1, price=50.00),
Row(customer_name="Geoffrey", date="2016-05-03", category="B", product_name="Lamp", quantity=2, price=38.00),
Row(customer_name="Geoffrey", date="2016-05-03", category="D", product_name="Solar Pannel", quantity=1, price=29.00),
Row(customer_name="Geoffrey", date="2016-05-03", category="A", product_name="apples", quantity=3, price=50.00),
Row(customer_name="Geoffrey", date="2016-05-03", category="C", product_name="Rice", quantity=5, price=15.00),
Row(customer_name="Geoffrey", date="2016-06-05", category="A", product_name="apples", quantity=5, price=50.00),
Row(customer_name="Geoffrey", date="2016-06-05", category="A", product_name="bananas", quantity=5, price=55.00),
Row(customer_name="Geoffrey", date="2016-06-15", category="Y", product_name="Motor skate", quantity=7, price=68.00),
Row(customer_name="Geoffrey", date="2016-06-15", category="E", product_name="Book: The noose", quantity=1, price=125.00),
Row(customer_name="Yann", date="2016-04-22", category="B", product_name="Lamp", quantity=1, price=38.00),
Row(customer_name="Yann", date="2016-05-03", category="Y", product_name="Motor skate", quantity=1, price=68.00),
Row(customer_name="Yann", date="2016-05-03", category="D", product_name="Recycle bin", quantity=5, price=27.00),
Row(customer_name="Yann", date="2016-05-03", category="C", product_name="Rice", quantity=15, price=15.00),
Row(customer_name="Yann", date="2016-04-02", category="A", product_name="bananas", quantity=3, price=55.00),
Row(customer_name="Yann", date="2016-04-02", category="B", product_name="Lamp", quantity=2, price=38.00),
Row(customer_name="Yann", date="2016-04-03", category="E", product_name="Book: Crime and Punishment", quantity=5, price=100.00),
Row(customer_name="Yann", date="2016-04-13", category="E", product_name="Book: The noose", quantity=5, price=125.00),
Row(customer_name="Yann", date="2016-04-27", category="D", product_name="Solar Pannel", quantity=5, price=29.00),
Row(customer_name="Yann", date="2016-05-27", category="D", product_name="Recycle bin", quantity=5, price=27.00),
Row(customer_name="Yann", date="2016-05-27", category="A", product_name="bananas", quantity=3, price=55.00),
Row(customer_name="Yann", date="2016-05-01", category="Y", product_name="Motor skate", quantity=1, price=68.00),
Row(customer_name="Yann", date="2016-06-07", category="Z", product_name="space ship", quantity=1, price=227.00),
Row(customer_name="Yoshua", date="2016-02-07", category="Z", product_name="space ship", quantity=2, price=227.00),
Row(customer_name="Yoshua", date="2016-02-14", category="A", product_name="bananas", quantity=9, price=55.00),
Row(customer_name="Yoshua", date="2016-02-14", category="B", product_name="Lamp", quantity=2, price=38.00),
Row(customer_name="Yoshua", date="2016-02-14", category="A", product_name="apples", quantity=10, price=55.00),
Row(customer_name="Yoshua", date="2016-03-07", category="Z", product_name="space ship", quantity=5, price=227.00),
Row(customer_name="Yoshua", date="2016-04-07", category="Y", product_name="Motor skate", quantity=4, price=68.00),
Row(customer_name="Yoshua", date="2016-04-07", category="D", product_name="Recycle bin", quantity=5, price=27.00),
Row(customer_name="Yoshua", date="2016-04-07", category="C", product_name="Rice", quantity=5, price=15.00),
Row(customer_name="Yoshua", date="2016-04-07",category= "A", product_name="bananas", quantity=9, price=55.00),
Row(customer_name="Jurgen", date="2016-05-01", category="Z", product_name="space ship", quantity=1, price=227.00),
Row(customer_name="Jurgen", date="2016-05-01", category="A", product_name="bananas", quantity=5, price=55.00),
Row(customer_name="Jurgen", date="2016-05-08", category="A", product_name="bananas", quantity=5, price=55.00),
Row(customer_name="Jurgen", date="2016-05-08", category="Y", product_name="Motor skate", quantity=1, price=68.00),
Row(customer_name="Jurgen", date="2016-06-05", category="A", product_name="bananas", quantity=5, price=55.00),
Row(customer_name="Jurgen", date="2016-06-05", category="C", product_name="Rice", quantity=5, price=15.00),
Row(customer_name="Jurgen", date="2016-06-05", category="Y", product_name="Motor skate", quantity=2, price=68.00),
Row(customer_name="Jurgen", date="2016-06-05", category="D", product_name="Recycle bin", quantity=5, price=27.00),
])
# Enrich with the computed 'amount_spent' column and preview the first rows.
result = amount_spent_udf(data=customers)
result.show(10)

view raw
pyspark_demo_app.py
hosted with ❤ by GitHub

The previous gist revisits the same example used in the previous post on UDFs and Window Functions.

Here is an example how we could test our “amount_spent_udf” function:

from tests.test_utils.test_spark import spark_session
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import col
from src.job import amount_spent_udf
def test_amount_spent_udf(spark_session: SparkSession) -> None:
    """Verify amount_spent_udf appends a correctly computed 'amount_spent' column."""
    source = spark_session.createDataFrame([
        Row(customer_name="Geoffrey", date="2016-04-22", category="Foo", product_name="Bar", quantity=1, price=2.00),
    ])

    transformed = amount_spent_udf(data=source)

    # The transformation must return a DataFrame with the same row count
    # plus exactly one extra column.
    assert isinstance(transformed, DataFrame)
    assert transformed.count() == source.count()
    expected_columns = ['amount_spent', 'category', 'customer_name', 'date', 'price',
                        'product_name', 'quantity']
    assert sorted(transformed.columns) == expected_columns
    # quantity=1 * price=2.00 -> 2.00
    assert transformed.collect()[0].amount_spent == 2.00

Now note the first line of the unit test script, which is the secret sauce for loading a Spark context into your unit tests. Below is the code that creates the “spark_session” object passed as an argument to the “test_amount_spent_udf” function.

"""
Utilities common to all tests using spark
"""
import pytest
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import logging
def quiet_py4j():
    """Turn down Spark's py4j logging noise for the test context."""
    logging.getLogger('py4j').setLevel(logging.WARN)
@pytest.fixture(scope="session")
def spark_context(request):
    """
    Session-scoped fixture creating a local SparkContext for tests.

    Note: despite the original docstring, this creates a SparkContext,
    not a SparkSession — see the spark_session fixture for the latter.

    Args:
        request: pytest.FixtureRequest object, used to register teardown
    Returns:
        SparkContext running locally with two worker threads
    """
    conf = SparkConf() \
        .setMaster("local[2]") \
        .setAppName("pytest-pyspark-local-testing")
    sc = SparkContext(conf=conf)
    # Make sure the context is stopped when the test session finishes.
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc
@pytest.fixture(scope="session")
def spark_session(request):
    """
    Session-scoped fixture providing a local SparkSession for tests.

    Args:
        request: pytest.FixtureRequest object, used to register teardown
    """
    config = (
        SparkConf()
        .setMaster("local[2]")
        .setAppName("pytest-pyspark2.+-local-testing")
    )
    session = SparkSession.builder.config(conf=config).getOrCreate()
    # Stop the session once the whole test session is done.
    request.addfinalizer(lambda: session.stop())
    quiet_py4j()
    return session

view raw
pyspark_fixtures.py
hosted with ❤ by GitHub

And that is it. We strongly encourage you to have a look at the corresponding git repository, where we provide detailed instructions on how to run it locally.

And that is it for today, hope it helped!

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s