In this post, I build on the knowledge shared in the post on creating Data Pipelines with Airflow and introduce new technologies that help with the Extraction part of the process, with cost and performance in mind. I’ll go through the options available and then introduce a specific solution using AWS Athena. First we’ll establish the dataset and organise our data in S3 buckets. Afterwards, you’ll learn how to make this information queryable through AWS Athena, while making sure it is updated daily.
Data dump files of not-so-structured data are a common byproduct of Data Pipelines that include extraction. This happens by design: business-wise, and as Data Engineers, there is never too much data. From an investment standpoint, however, object-relational database systems can become increasingly costly to keep, especially if we aim to maintain performance as the data grows.
That said, this is not a new problem. Both Apache and Facebook have developed open source software that is extremely efficient at dealing with extreme amounts of data. While such software is written in Java, it maintains an abstracted interface to the data that relies on traditional SQL to query data stored on filesystem storage, such as S3 in our example, and in a wide range of different formats, from HDFS to CSV.
Today we have many options to tackle this problem, and I’m going to show how to approach it in today’s serverless world with AWS Athena. For this we need to quickly rewind back in time and go through the technologies that marked a significant shift in how we solve such problems, while integrating these solutions with a popular Data Pipeline platform such as Airflow.
Apache Hive was the first of the family. It works with Apache Hadoop and relies heavily on MapReduce to do the work behind the scenes. SQL queries are translated into MapReduce stages, and intermediate results are stored on filesystems, including S3 buckets. This made Hive extremely appealing: it was much faster than traditional alternatives and also very reliable, thanks to its batch approach to processing jobs – you know jobs are bound to fail at some point in development and production.
Apache Hive depends on something called the Hive Metastore. As the name suggests, this is a store that keeps the metadata of where the information lives in your filesystem and the structure it has – it’s essentially a pointer to data. We’ll come back to this, as it is an essential component of this introduction, but for now it’s important to know of its existence.
Fast forward to 2013: Facebook releases Presto, also for Apache Hadoop, which relies on in-memory processing to achieve the same goal, resulting in much faster query execution, and also ships with connectors to S3. This also means, however, that your infrastructure needs to be specifically tuned to the jobs you are running so they don’t constantly run out of memory and, thus, give you the despair of failed jobs. Of course, this is a problem only when astronomical amounts of data are being processed – and still, Facebook was using it daily to scan around 1 Petabyte of data every day (“was”, as the welcome excerpt on Presto’s official site appears to have last been updated in 2016). Apart from this, Presto also relies on a Hive Metastore to understand how and where the data is stored.
There are several more differences between the two, but if I had to lightly boil it down to the use cases that distinguish them, I could say: you use Hive for heavy processing on a flexible time schedule, and Presto when you need more frequent interaction with the data.
This context of differences (batch writes to the filesystem vs. in-memory processing) is extremely relevant if you are an IaaS provider with the capacity to direct virtually any amount of power at a designated service you deem worthy. Combine this with the popularity of the S3 storage service and the speed of Presto, and you get AWS Athena: a serverless service that allows queries over data stored in S3 buckets in several different formats, including CSV, JSON, ORC, Avro, and Parquet.
Moreover, the pricing strategy adopted by AWS means you only pay per data scanned, letting you off the hook in regards to computational power. Not only do you bypass the headaches of configuration and setup, you also bypass the costs of having a full-blown EMR cluster running data-intensive queries.
Establish a Dataset
For this tutorial, assume there is a daily dump of .CSV files into an S3 bucket called s3://data. The first order of business is making sure our data is organised in some way. A generic approach, which applies to most time-related data, is to organise it in a folder tree separated by Year, Month and Day.
Presto also lets you query your data based on this structure, transforming the directory tree into fields you can use in your SQL filters. For this to work, you specifically need to use a keyword=value relation in the literal name of each directory, such as s3://data/year=2018/month=6/day=1/dump.csv.
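As a minimal sketch, here is how a daily job could build such Hive-style partitioned paths for its dumps (the function name and the helper itself are just illustrative, assuming the s3://data layout above):

```python
from datetime import date

def partition_path(base, day, filename="dump.csv"):
    """Build a Hive-style partitioned S3 key: one keyword=value per level."""
    return f"{base}/year={day.year}/month={day.month}/day={day.day}/{filename}"

# Where the dump for 1 June 2018 would land:
print(partition_path("s3://data", date(2018, 6, 1)))
# → s3://data/year=2018/month=6/day=1/dump.csv
```

Because the partition values live in the path itself, a `WHERE year = 2018 AND month = 6` filter never has to touch objects under other prefixes.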
For the sake of simplicity, I’ll use this dataset of US Cities Population in 2014; in this case, the file should be stored in an S3 path like s3://data/year=2014/dump.csv. The name of the file really doesn’t matter: Athena will pick up any files matching the format you define at the Hive Metastore table level.
Top Tip: If you go through the AWS Athena tutorial, you’ll notice that you could just point at the base directory, e.g. s3://data, and run a manual query for Athena to scan the files inside that directory tree. You might conclude you could do this the first time and then again every time a new dump file lands in the filesystem. While this is true, it is highly inadvisable, for two main reasons. The first is that you would have Athena crawl/refresh the Hive Metastore completely every time you add a new file (e.g. daily, with a manual command), which results in a scan. The second is that although AWS doesn’t charge for partition detection, the process often times out, and they do charge for S3 GET requests – and the command equivalent to “refresh all” does generate GET requests to S3, as clarified in this post on the AWS forum. This is what we’ll use Airflow for in the next tutorial, as a Data Pipeline.
Now that your data is organised, head over to the query section of AWS Athena and select sampledb, which is where we’ll create our very first Hive Metastore table for this tutorial.
In line with the previous comment, we’ll create the table pointing at the root folder, but will add each file location (or partition, as Hive calls it) manually, for each file or set of files.
Creating an empty table:
Note how we define the specifics of our file in the SERDEPROPERTIES (specific to this SerDe). Also note that we are creating an external table, which means that deleting this table at a later stage will not delete the underlying data.
CREATE EXTERNAL TABLE `sampledb.us_cities_pop` (
  `name` string,
  `pop`  bigint,
  `lat`  decimal,
  `long` decimal
)
PARTITIONED BY (
  `year` int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar'     = '\"',
  'escapeChar'    = '\\'
)
LOCATION 's3://data/';
We could make the first query using the traditional
SELECT * FROM sampledb.us_cities_pop;
But the query will return no results, since we haven’t added any partition or explicitly told Athena to scan for files. Remember, you pay based on the amount of data scanned.
You could also check this by running the command:
SHOW PARTITIONS sampledb.us_cities_pop;
Let’s add the 2014 partition
Adding partitions is relatively simple, for the purposes of this simple directory tree and dataset:
ALTER TABLE sampledb.us_cities_pop ADD IF NOT EXISTS
  PARTITION ( year = 2014 )
  LOCATION 's3://data/year=2014/';
Now if you run the previous command to show partitions, you’ll see this very same one. The idea is that you would do this for every new dump file, e.g. every day: manually adding a partition.
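Since this statement will eventually be issued on a schedule rather than by hand, a small sketch of how the daily ALTER TABLE could be rendered from a date (the helper name is a hypothetical for illustration; the year-only partition matches the table we created above):

```python
from datetime import date

def add_partition_sql(table, base, day):
    """Render the ALTER TABLE statement that registers one dump's
    location as a partition in the Hive Metastore (illustrative helper)."""
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (year = {day.year}) "
        f"LOCATION '{base}/year={day.year}/'"
    )

# A scheduled job could submit this string through the Athena API
# (e.g. boto3's start_query_execution) instead of a "refresh all".
print(add_partition_sql("sampledb.us_cities_pop", "s3://data", date(2014, 1, 1)))
```

Note how the statement only registers a pointer in the Metastore; no data is scanned until you actually query the partition.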
In the next post, I’ll show how to integrate Athena into an Airflow pipeline and add partitions to the Hive Metastore table on a schedule, without being charged just for updating the partitions. This way you make sure you only incur costs when effectively running a query that aims at giving answers back to you.