Spark 2.0: From quasi to proper-streaming?

This post looks at the relatively recent Spark release – Spark 2.0 – and what it changes for streaming applications. Why streaming in particular, you may ask? Well, streaming is the scenario most companies would ideally like to run, and the competitive landscape is definitely heating up, especially with Apache Flink and the Google-backed Apache Beam.

Why is streaming so difficult?

There are three main problems when it comes to building real-time applications on top of streaming data:

  • Data consistency: because of the distributed nature of the architecture, at any given point in time some events may have been processed on some nodes but not yet on others, even though those events might actually have occurred earlier than ones already processed. For example, you may see a Logout event without a Login event, a close-app event without an open-app event, etc.
  • Fault tolerance (FT): related to the first problem, on node failure the processing engine may try to reprocess an event that had actually already been processed by the failed node, leading to duplicated records and/or inaccurate metrics.
  • Dealing with late data/out-of-order data: whether because you are reading from a message bus system such as Kafka or Kinesis, or simply because a mobile app might only be able to sync its data hours later, developers simply must tackle this challenge in application code (see the sketch right after this list).
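
To make that last point a bit more concrete, here is a minimal Structured Streaming sketch in Scala (the input path, schema and column names are made-up placeholders, not taken from any real project) showing how Spark 2.x lets you group events by the time they actually happened rather than the time they arrived:

// Minimal sketch, assuming events land as JSON files carrying an event-time field.
// All paths, schema fields and window sizes below are illustrative placeholders.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types._

object LateEventsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("late-events-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical schema: one JSON event per line
    val schema = new StructType()
      .add("userId", StringType)
      .add("eventType", StringType)     // e.g. "login", "logout"
      .add("eventTime", TimestampType)  // when the event actually happened

    val events = spark.readStream
      .schema(schema)
      .json("/data/incoming-events/")   // hypothetical landing directory

    // Count events per user in 10-minute event-time windows: a late event is
    // folded into the window it belongs to, not the one in which it arrived.
    val counts = events
      .groupBy(window($"eventTime", "10 minutes"), $"userId")
      .count()

    counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}

Because the aggregation is keyed on the event-time window, an event that syncs hours late is still counted in the window it belongs to rather than the one in which it showed up.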

See this post for an excellent detailed explanation.

Distcp S3-hdfs for eu-central-1 (AWS Frankfurt) – details “behind the trenches”

 

Distcp (distributed copy) is a fairly old tool used to move large numbers of files, usually within HDFS, by running a MapReduce job in which the driver builds the list of source files and the map tasks do the copy heavy lifting. Another useful feature is that it can also handle file migrations between HDFS and AWS S3. In the meantime, S3DistCp came along as a specific enhancement for speeding up this last kind of migration.

However, what might be new to you is what it takes to use either of these tools to move data from/to a bucket located in the S3 Frankfurt region (eu-central-1). So this post intends to save you the half hour of painful Google research trying to figure out why your Oozie job is failing.

Without further ado, before you dive into your Cloudera or Hortonworks manager to edit Hadoop properties in core-site.xml, I would suggest that you SSH into one of your Hadoop nodes and start by testing the distcp command directly. Here is the syntax for moving data from S3 to HDFS:

hadoop distcp -Dfs.s3a.awsAccessKeyId=XXX \
-Dfs.s3a.awsSecretAccessKey=XX \
-Dfs.s3a.endpoint=s3-eu-central-1.amazonaws.com \
s3a://YOUR-BUCKET-NAME/ \
hdfs:///data/

So yeah, in case you haven’t noticed, the black-magic pieces that make this command work just like in other regions are the “fs.s3a.endpoint” property and the use of the “s3a://” scheme (the “a” after “s3”). The Frankfurt region only accepts requests signed with AWS Signature Version 4, which the older s3n connector does not support, so you need the s3a filesystem pointed explicitly at the regional endpoint.

Also, in case you are moving data from HDFS to S3, besides write access make sure that the AWS policy attached to the given user account also grants delete permission on that S3 bucket (or specific path). This is because distcp produces temporary files next to the target files in your bucket, which the copy tasks need to be able to remove at the end. Here is an example of an AWS policy for that effect:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::YOUR-BUCKET/*",
                "arn:aws:s3:::YOUR-BUCKET"
            ]
        }
    ]
}

Alternatively, you might also want to test it using the S3DistCp tool. Here is another example:

 hadoop jar s3distcp.jar --src /data/ \
--dest s3a://YOUR-BUCKET-NAME/ \
--s3Endpoint s3-eu-central-1.amazonaws.com

Note that the s3distcp jar needs to be available locally on the file system of the host from which you are running the command. If you have the jar in HDFS, here is an example of how you can fetch it:

hadoop fs -copyToLocal /YOUR-JARS-LOCATION-IN-HDFS/s3distcp.jar \
/var/lib/hadoop-hdfs/ 

There is also the possibility that you will stumble upon some headaches with the S3DistCp tool, in which case this might prove handy.

Hope this helped! Where to go next? AWS has a handy best practice guide listing different strategies for data migration here.