Exploratory data analysis of genomic datasets using ADAM and Mango with Apache Spark on Amazon EMR

As the cost of genomic sequencing has rapidly decreased, the amount of publicly available genomic data has soared over the past couple of years. New cohorts and studies have produced massive datasets consisting of over 100,000 individuals. Simultaneously, these datasets have been processed to extract genetic variation across populations, producing mass amounts of variation data for each cohort.

In this era of big data, tools like Apache Spark have provided a user-friendly platform for batch processing of large datasets. However, to use such tools as a sufficient replacement for current bioinformatics pipelines, we need more accessible and comprehensive APIs for processing genomic data. We also need support for interactive exploration of these processed datasets.

ADAM and Mango provide a unified environment for processing, filtering, and visualizing large genomic datasets on Apache Spark. ADAM allows you to programmatically load, process, and select raw genomic and variation data using Spark SQL, an SQL interface for aggregating and selecting data in Apache Spark. Mango supports the visualization of both raw and aggregated genomic data in a Jupyter notebook environment, allowing you to draw conclusions from large datasets at multiple resolutions.

With the combined power of ADAM and Mango, you can load, query, and explore datasets in a unified environment. You can interactively explore genomic data at a scale previously impossible using single node bioinformatics tools. In this post, we describe how to set up and run ADAM and Mango on Amazon EMR. We demonstrate how you can use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, which is publicly available in Amazon S3 as a public dataset.

Configuring ADAM and Mango on Amazon EMR

First, you launch and configure an EMR cluster. Mango uses Docker containers to easily run on Amazon EMR. Upon cluster startup, EMR uses the following bootstrap action to install Docker and the required startup scripts. The scripts are available at /home/hadoop/mango-scripts.

aws emr create-cluster \
  --release-label emr-5.14.0 \
  --name 'emr-5.14.0 Mango example' \
  --applications Name=Hadoop Name=Hive Name=Spark \
  --ec2-attributes KeyName=<your-ec2-key>,InstanceProfile=EMR_EC2_DefaultRole \
  --service-role EMR_DefaultRole \
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c5.4xlarge \
    InstanceGroupType=CORE,InstanceCount=4,InstanceType=c5.4xlarge \
  --region <your-aws-region> \
  --log-uri s3://<your-s3-bucket>/emr-logs/ \
  --bootstrap-actions \
    Name='Install Mango',Path="s3://aws-bigdata-blog/artifacts/mango-emr/install-bdg-mango-emr5.sh"

To start the Mango notebook, run the following:

/home/hadoop/mango-scripts/run-notebook.sh

This file sets up all of the environment variables that are needed to run Mango in Docker on Amazon EMR. In your terminal, you will see the port and Jupyter notebook token for the Mango notebook session. Navigate to this port on the public DNS URL of the master node for your EMR cluster.

Loading data from the 1000 Genomes Project

Now that you have a working environment, you can use ADAM and Mango to discover interesting variants in the child from the genome sequencing data of a trio (data from a mother, father, and child). This data is available from the 1000 Genomes Project AWS public dataset. In this analysis, you will view a trio (NA19685, NA19661, and NA19660) and search for variants that are present in the child but not present in the parents.

In particular, we want to identify genetic variants that are found in the child but not in the parents, known as de novo variants. These are interesting regions, as they can indicate sites of de novo variation that might contribute to multiple disorders.

You can find the Jupyter notebook containing these examples in Mango’s GitHub repository, or at /opt/cgl-docker-lib/mango/example-files/notebooks/aws-1000genomes.ipynb in the running Docker container for Mango.

First, import the ADAM and Mango modules and any Spark modules that you need:

# Import ADAM modules
from bdgenomics.adam.adamContext import ADAMContext
from bdgenomics.adam.rdd import AlignmentRecordRDD, CoverageRDD
from bdgenomics.adam.stringency import LENIENT, _toJava

# Import Mango modules
from bdgenomics.mango.rdd import GenomicVizRDD
from bdgenomics.mango.QC import CoverageDistribution

# Import Spark modules
from pyspark.sql import functions as sf

Next, create an ADAM context and a Mango visualization object from the notebook's Spark session (spark). You will use the Spark session to run SQL queries on variants.

# Create ADAM Context
ac = ADAMContext(spark)
genomicRDD = GenomicVizRDD(spark)

Variant analysis with Spark SQL

Load in a subset of variant data from chromosome 17:

genotypesPath = 's3://1000genomes/phase1/analysis_results/integrated_call_sets/ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz'
genotypes = ac.loadGenotypes(genotypesPath)

# convert genotypes to a Spark dataframe
genotypes_df  = genotypes.toDF()

You can take a look at the schema by printing the columns in the dataframe.

# show the schema (column names)
genotypes_df.columns

This genotypes dataset contains all samples from the 1000 Genomes Project. Therefore, you will next filter genotypes to only consider samples that are in the NA19685 trio, and cache the results in memory.

# trio IDs
IDs = ['NA19685', 'NA19661','NA19660']

# Filter by individuals in the trio
trio_df = genotypes_df.filter(genotypes_df["sampleId"].isin(IDs))

trio_df.cache()
trio_df.count()

Next, add a new column to your dataframe that determines the genomic location of each variant. This is defined by the chromosome (contigName) and the start and end position of the variant.

# Add ReferenceRegion column and group by referenceRegion
trios_with_referenceRegion = trio_df.withColumn('ReferenceRegion', 
                    sf.concat(sf.col('contigName'),sf.lit(':'), sf.col('start'), sf.lit('-'), sf.col('end')))

Now, you can query your dataset to find de novo variants. But first, you must register your dataframe with Spark SQL.

#  Register df with Spark SQL
trios_with_referenceRegion.createOrReplaceTempView("trios")

Now that your dataframe is registered, you can run SQL queries on it. For the first query, select the names of variants belonging to sample NA19685 that have at least one alternative (ALT) allele.

# filter by alleles. This is a list of variant names that have an alternate allele for the child
alternate_variant_sites = spark.sql("SELECT variant.names[0] AS snp FROM trios \
                                    WHERE array_contains(alleles, 'ALT') AND sampleId == 'NA19685'") 

# build a real list (not a lazy map object) so it can be passed to isin()
collected_sites = [row.snp for row in alternate_variant_sites.collect()]

For your next query, filter sites in which the parents have both reference alleles. Then filter these variants by the set produced previously from the child.

# get parent records and filter by only REF locations for variant names that were found in the child with an ALT
# the parentheses matter: AND binds tighter than OR, so without them only NA19660 would be filtered by REF
filtered1 = spark.sql("SELECT * FROM trios WHERE (sampleId == 'NA19661' OR sampleId == 'NA19660') \
            AND NOT array_contains(alleles, 'ALT')")
filtered2 = filtered1.filter(filtered1["variant.names"][0].isin(collected_sites))
snp_counts = filtered2.groupBy("variant.names").count().collect()

# collect snp names as a list
snp_names = [row.names for row in snp_counts]
denovo_snps = [item for sublist in snp_names for item in sublist]
denovo_snps[:10]
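
To make the filter logic easier to follow, here is a minimal plain-Python sketch of the same idea on a handful of made-up records. It does not use Spark or ADAM, the records and the find_denovo helper are hypothetical, and it is slightly stricter than the queries above because it requires a REF-only record from both parents.

```python
# Toy genotype records; the field names loosely mirror the dataframe columns used above.
records = [
    {"sampleId": "NA19685", "name": "rs1", "alleles": ["REF", "ALT"]},  # child
    {"sampleId": "NA19661", "name": "rs1", "alleles": ["REF", "REF"]},  # parent
    {"sampleId": "NA19660", "name": "rs1", "alleles": ["REF", "REF"]},  # parent
    {"sampleId": "NA19685", "name": "rs2", "alleles": ["REF", "ALT"]},  # child
    {"sampleId": "NA19661", "name": "rs2", "alleles": ["REF", "ALT"]},  # inherited ALT
    {"sampleId": "NA19660", "name": "rs2", "alleles": ["REF", "REF"]},  # parent
]


def find_denovo(records, child, parents):
    """Return variant names with an ALT allele in the child and REF-only calls in every parent."""
    child_alt = {
        r["name"] for r in records
        if r["sampleId"] == child and "ALT" in r["alleles"]
    }
    # For each variant name, collect the parents whose record has no ALT allele.
    ref_only_parents = {}
    for r in records:
        if r["sampleId"] in parents and "ALT" not in r["alleles"]:
            ref_only_parents.setdefault(r["name"], set()).add(r["sampleId"])
    return sorted(
        name for name in child_alt
        if ref_only_parents.get(name) == set(parents)
    )


candidates = find_denovo(records, "NA19685", ["NA19661", "NA19660"])
print(candidates)
```

Here rs2 is excluded because one parent also carries the ALT allele, so only rs1 remains as a candidate.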

Now that you have found some interesting variants, you can unpersist your genotypes from memory.

trio_df.unpersist()

Working with alignment data

You have found a lot of potential de novo variant sites. Next, you can visually verify some of these sites to see if the raw alignments match up with these de novo hits.

First, load in the alignment data for the NA19685 trio:

# load in NA19685 exome from s3a

childReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19685.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent1ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19660.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent2ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19661.mapped.illumina.mosaik.MXL.exome.20110411.bam'

childReads = ac.loadAlignments(childReadsPath, stringency=LENIENT)
parent1Reads = ac.loadAlignments(parent1ReadsPath, stringency=LENIENT)
parent2Reads = ac.loadAlignments(parent2ReadsPath, stringency=LENIENT)

Note that this example uses s3a:// instead of s3:// style URLs. The reason for this is that the ADAM formats use Java NIO to access BAM files. To do this, we are using a JSR 203 implementation for the Hadoop Distributed File System to access these files. This itself requires the s3a:// protocol. You can view that implementation in this GitHub repository.

You now have alignment data for the three individuals in your trio. However, the data has not yet been loaded into memory. To cache these datasets for fast subsequent access, run the cache() function:

# cache child RDD and count records
# takes about 2 minutes, on 4 c3.4xlarge worker nodes 
childReads.cache()

# Count reads in the child
childReads.toDF().count()
# Output should be 95634679

Quality control of alignment data

A common way to visually check the quality of genomic alignment data is to view its coverage distribution. Coverage distribution gives you an idea of the read coverage that you have across a sample.
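
As a rough illustration of what a coverage distribution is, the following plain-Python sketch counts how many sites have each read depth and normalizes the counts to fractions. The depths are made up and the coverage_distribution helper is hypothetical; it is not how Mango computes or plots the distribution.

```python
from collections import Counter


def coverage_distribution(per_site_coverage, normalize=True):
    """Map each read depth to the number (or fraction) of sites with that depth."""
    counts = Counter(per_site_coverage)
    total = sum(counts.values())
    return {
        depth: (n / total if normalize else n)
        for depth, n in sorted(counts.items())
    }


# Made-up depths for eight sites: mostly low coverage, one deeply covered site.
site_depths = [1, 1, 1, 2, 2, 10, 10, 120]
distribution = coverage_distribution(site_depths)
for depth, fraction in distribution.items():
    print(depth, fraction)
```

Plotting these fractions against depth gives the kind of curve that the notebook produces for the real data.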

Next, generate a sample coverage distribution plot for the child alignment data on chromosome 17:

# Calculate read coverage
# Takes 2-3 minutes
childCoverage = childReads.transform(lambda x: x.filter(x.contigName == "17")).toCoverage()

childCoverage.cache()
childCoverage.toDF().count()

# Output should be 51252612

Now that coverage data is calculated and cached, compute the coverage distribution of chromosome 17 and plot the coverage distribution:

# Calculate coverage distribution

# You can check the progress in the SparkUI by navigating to 
# <PUBLIC_MASTER_DNS>:8088 and clicking on the currently running Spark application.
cd = CoverageDistribution(sc, childCoverage)
x = cd.plot(normalize=True, cumulative=False, xScaleLog=True, labels="NA19685")

This looks pretty standard because the data you are viewing is exome data. Therefore, you can see a high number of sites with low coverage and a smaller number of genomic positions with more than 100 reads. Now that you are done with coverage, you can unpersist these datasets to clear space in memory for the next analysis.

childCoverage.unpersist()

Viewing sites with missense variants in the proband

After verifying alignment data and filtering variants, you have four genes with potential missense mutations in the proband, including YBX2, ZNF286B, KSR1, and GNA13. You can visually verify these sites by filtering and viewing the raw reads of the child and parents.

First, view the child reads. If you zoom in to the location of the GNA13 variant (63052580-63052581), you can see a heterozygous T to A call:

# missense variant at GNA13: 63052580-63052581 (SNP rs201316886)
# Takes about 2 minutes to collect data from workers
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(childReads, contig, start, end)

It looks like there indeed is a variant at this position, possibly a heterozygous SNP with alternate allele A. Look at the parent data to verify that this variant does not appear in the parents:

# view missense variant at GNA13: 63052580-63052581 in parent 1
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(parent1Reads, contig, start, end)

This confirms the filter result: the variant is present only in the proband, not in the parents.

Summary

To summarize, this post demonstrated how to set up and run ADAM and Mango in Amazon EMR. We demonstrated how to use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, a publicly available dataset on Amazon S3. We used these tools to inspect 1000 Genomes data quality, query for interesting variants in the genome, and validate results through the visualization of raw data.

For more information about Mango, see the Mango User Guide. If you have questions or suggestions, please comment below.

Additional Reading

If you found this post useful, be sure to check out Genomic Analysis with Hail on Amazon EMR and Amazon Athena, Interactive Analysis of Genomic Datasets Using Amazon Athena, and, on the AWS Compute Blog, Building High-Throughput Genomics Batch Workflows on AWS: Introduction (Part 1 of 4).

About the Author

Alyssa Marrow is a graduate student in the RISELab and Yosef Lab at the University of California Berkeley. Her research interests lie at the intersection of systems and computational biology. This involves building scalable systems and easily parallelized algorithms to process and compute on all that ‘omics data.

At Goodreads, we’re currently in the process of decomposing our monolithic Rails application into microservices. For the vast majority of those services, we’ve decided to use Amazon DynamoDB as the primary data store. We like DynamoDB because it gives us consistent, single-digit-millisecond performance across a variety of our storage and throughput needs.

Although DynamoDB excels at high-throughput read and write workloads, it’s not optimized to support one-time, ad hoc queries or data warehouse workloads. However, by combining AWS Data Pipeline, Amazon S3, AWS Glue, and Amazon Athena we can export our dataset from DynamoDB to S3 and use Athena to run SQL queries against that dataset.

Architecture overview

Our architecture for this process is as follows:

  • AWS Data Pipeline scans data from a DynamoDB table and writes it to S3 as JSON. It broadcasts on an Amazon SNS topic when it is finished.
  • An AWS Glue job invoked by AWS Lambda converts the JSON data into Parquet.
  • An AWS Glue crawler adds the Parquet data in S3 to the AWS Glue Data Catalog, making it available to Athena for queries.

We could have queried the data in the JSON format, thus removing the extra step of the Parquet conversion. However, we decided to make the additional effort because Parquet is space-efficient and high-performing. For larger datasets, this approach translates not only into faster queries, but also cost savings.

Major architecture components

The major components of our architecture are described below.

Data Pipeline is an orchestration service for spinning up Amazon EMR clusters and running fault-tolerant jobs using big data technology like Apache Pig, Apache Hive, or Apache Spark. Data Pipeline provides a template for exporting data from an arbitrary DynamoDB table to Amazon S3. We use a slightly modified version of the standard export template.

In this version, we add the ability to send success or failure messages on Amazon SNS. Doing this lets us use Lambda to kick off further processing outside of the Data Pipeline service.

AWS Glue is used in three different ways in this architecture:

  • A serverless Apache Spark environment runs a job that converts the JSON export from Data Pipeline into the Apache Parquet format.
  • An AWS Glue crawler automatically crawls and infers the schema of our dataset and adds it to the AWS Glue Data Catalog.
  • The AWS Glue Data Catalog is the metadata store for our dataset so we can query the data with Athena.

Athena is used after the data is in the AWS Glue Data Catalog. At this point, you can query it in Athena with ANSI SQL.

Setting up infrastructure

In this process, we use AWS CloudFormation to manage our AWS resources. We’ve split the various AWS resources across three stacks to make them more composable.

The reviews.yaml template defines an example DynamoDB table called Reviews. The common.yaml template contains IAM and S3 resources that are shared across stacks. The dynamodb-exports.yaml template defines a Data Pipeline, Lambda function, AWS Glue job, and AWS Glue crawlers.

Working with the Reviews stack

The reviews.yaml CloudFormation template contains a simple DynamoDB table definition for storing user reviews on books. We’re using a hash key and sort key structure that nests each review on a book under a particular user. This structure allows an application to check if a user has a review on a book in a simple get operation and also to list all reviews by a user.

Working with the DynamoDB table schema

The table defined in reviews.yaml is a hash key and sort key table. The User attribute is the hash key, and the Book attribute is the sort key. If you build an application on this table, you might add additional Global Secondary Indexes (GSIs) to accommodate other access patterns, for example showing the highest weighted reviews for a book.
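
To see why this key structure supports both access patterns, here is a toy in-memory model in Python. It is only a sketch of the data structure: the ReviewsTable class and its method names are hypothetical and are not the DynamoDB API.

```python
from collections import defaultdict


class ReviewsTable:
    """Toy stand-in for a table with hash key User and sort key Book."""

    def __init__(self):
        # hash key -> {sort key -> item}
        self._items = defaultdict(dict)

    def put_item(self, item):
        self._items[item["User"]][item["Book"]] = item

    def get_item(self, user, book):
        """Single lookup: does this user have a review on this book?"""
        return self._items.get(user, {}).get(book)

    def query(self, user):
        """All reviews by one user, ordered by the sort key."""
        books = self._items.get(user, {})
        return [books[title] for title in sorted(books)]


table = ReviewsTable()
table.put_item({"User": "Tristan", "Book": "Harry Potter and the Philosopher's Stone", "Rating": 5})
table.put_item({"User": "Adeline", "Book": "Harry Potter and the Sorcerer's Stone", "Rating": 4})

print(table.get_item("Tristan", "Harry Potter and the Philosopher's Stone"))
print(table.query("Adeline"))
```

Both lookups start from the user, which is why a question such as "all reviews for one author" needs either an index or a different tool.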

First, you create a CloudFormation stack:

  1. Choose this Launch Stack button:
  2. Choose Next at the bottom of the screen.
  3. On the Options screen, leave everything set to the default and choose Next at the bottom of the screen.
  4. Choose Create in the Review section.

Next, you create test items in the Reviews table. After the ReviewsStack status is CREATE_COMPLETE, you can open up the DynamoDB console and explore the table. Let’s add a few items to the table:

  1. Open DynamoDB in the AWS Management Console.
  2. Choose Tables from the left navigation pane.
  3. Choose the Reviews table, and then choose the Items tab.
  4. Choose Create item, and in the Create item box, for Tree choose Text.
  5. Remove the existing text, and copy and paste the following item into the text box.

{
  "User": "Tristan",
  "Book": "Harry Potter and the Philosopher's Stone",
  "Rating": 5,
  "Review": "A thrilling journey through the world of Hogwarts",
  "Author": "J.K. Rowling"
}

  6. Choose Save.

Now let’s add one more item.

{
  "User": "Adeline",
  "Book": "Harry Potter and the Sorcerer's Stone",
  "Rating": 4,
  "Review": "Harry is pretty brave, but Hermione is the clear hero",
  "Author": "J.K. Rowling"
}

You can see that we’ve added a few different fields that were not specified in the table schema. Notably, these are: Rating, Review, and Author. Because DynamoDB is a NoSQL database, we can add new attributes as our application evolves. However, to aggregate against those attributes efficiently, they either need to be a part of the primary key schema at table creation or defined on an index.

The Goodreads reviews table is not dissimilar from our example table. However, we have used our maximum of five Global Secondary Indexes (GSIs) to support the access patterns that our users need the most. It’s no longer an option for us to create short-lived GSIs to answer arbitrary questions we have about our data. Even if we could, we have so much data that creating a GSI takes a few days.

Now imagine that our product team wants to run queries over the reviews data for arbitrary authors. We can’t add an additional GSI, and the access pattern isn’t required in production. However, by using the architecture described in this blog post we can unlock our dataset for our product team.

Feel free to add more items to the table, because the more data you have in the table when we export it the more interesting SQL queries you can run in Athena.

Creating the common stack

The common.yaml CloudFormation template creates a variety of IAM and EC2 permissions that Data Pipeline, Lambda, and AWS Glue use. In addition, the template creates an S3 bucket to store DynamoDB exports. The resources that need to be referenced across stacks are declared in the Outputs section.

Create the CloudFormation stack as follows:

  1. Click on this Launch Stack button:
  2. Choose Next at the bottom of the screen.
  3. On the Options screen, leave everything set to the default and choose Next at the bottom of the screen.
  4. In the Capabilities section of Review, choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create.

Creating the DynamoDB exports stack

The dynamodb-exports.yaml template is a self-contained template to create a Data Pipeline, SNS topics, Lambda trigger, AWS Glue job, and an AWS Glue crawler for any given DynamoDB table. If you have multiple DynamoDB tables you want to export, you can reuse the dynamodb-exports.yaml template and create a stack for each table.

The most interesting part of this stack is the AWS Glue job script that converts an arbitrary DynamoDB export file created by the Data Pipeline task into Parquet. It also removes DynamoDB type information from the raw JSON by using Boto3, which is available in the PySpark AWS Glue environment. The code is well-documented, so don’t hesitate to dive in here if you’re interested in how to write a custom AWS Glue job.
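
To give a feel for what removing DynamoDB type information means, here is a minimal, hand-rolled sketch in plain Python. It is not the job script from the stack, which uses Boto3. It assumes each attribute value is a single-key object whose key is one of the standard DynamoDB type tags (S, N, BOOL, NULL, M, L, SS, NS), matched case-insensitively; an export that spells its tags differently would need extra cases. The strip_types and strip_item names are hypothetical.

```python
from decimal import Decimal


def strip_types(value):
    """Convert one typed attribute value, such as {"S": "abc"}, into a plain Python value."""
    type_tag, inner = next(iter(value.items()))
    tag = type_tag.upper()  # assumption: tags may arrive in lower case
    if tag == "S":
        return inner
    if tag == "N":
        return Decimal(inner)  # numbers are serialized as strings
    if tag == "BOOL":
        return bool(inner)
    if tag == "NULL":
        return None
    if tag == "M":
        return {key: strip_types(val) for key, val in inner.items()}
    if tag == "L":
        return [strip_types(val) for val in inner]
    if tag == "SS":
        return list(inner)
    if tag == "NS":
        return [Decimal(number) for number in inner]
    raise ValueError("Unsupported type tag: {}".format(type_tag))


def strip_item(item):
    """Remove the type wrappers from every attribute of one exported item."""
    return {name: strip_types(value) for name, value in item.items()}


raw_item = {
    "User": {"s": "Tristan"},
    "Rating": {"n": "5"},
    "Tags": {"l": [{"s": "fantasy"}, {"s": "classic"}]},
}
print(strip_item(raw_item))
```

After this step each record is a flat mapping of attribute names to values, which is a much better fit for a columnar format like Parquet.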

Create the CloudFormation stack as follows:

  1. Click on this Launch Stack button:
  2. For parameters, enter the following and then choose Next:

ExportTimeout: 1

MaxConsumedReadThroughput: 0.5

TableName: Reviews

  3. On the Options screen, leave everything set to default and then choose Next.
  4. In the Review section, scroll to the bottom and choose Create.

Watching your data flow from DynamoDB to the AWS Glue Data Catalog

The pipeline from DynamoDB to the AWS Glue Data Catalog is fully automated. After the CloudFormation stack to export Reviews is deployed, the data pipeline begins, and the data is soon available to query in Athena.

Monitor the data pipeline:

  1. Open AWS Data Pipeline in the console.
  2. Choose the pipeline with the name ReviewsExport.
  3. Monitor the pipeline as it goes through the various stages from provisioning a cluster to running your job.
  4. When the status is Finished, the data is in S3.

The pipeline sends a message on the export success SNS topic. Doing so triggers Lambda to invoke the AWS Glue job to convert the JSON export into Parquet.
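
The hand-off from SNS to AWS Glue can be sketched as a small Lambda handler. This is an illustrative sketch, not the function deployed by the stack: the --sns_message argument name is hypothetical, and the optional glue parameter exists only so that the handler can be exercised with a stand-in client. The job name is the one shown in the AWS Glue console steps in this post.

```python
GLUE_JOB_NAME = "ReviewsExportToParquet"


def handler(event, context, glue=None):
    """Start one AWS Glue job run for every SNS record delivered to the function."""
    if glue is None:
        import boto3  # imported lazily so the sketch can run without AWS access
        glue = boto3.client("glue")

    run_ids = []
    for record in event["Records"]:
        message = record["Sns"]["Message"]
        response = glue.start_job_run(
            JobName=GLUE_JOB_NAME,
            # Hypothetical job argument carrying the SNS message through to the job.
            Arguments={"--sns_message": message},
        )
        run_ids.append(response["JobRunId"])
    return run_ids


class FakeGlue:
    """Stand-in client that records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def start_job_run(self, **kwargs):
        self.calls.append(kwargs)
        return {"JobRunId": "jr_{}".format(len(self.calls))}


fake_glue = FakeGlue()
sample_event = {"Records": [{"Sns": {"Message": "export finished"}}]}
print(handler(sample_event, None, glue=fake_glue))
```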

Let’s monitor the AWS Glue job:

  1. Open AWS Glue in the console.
  2. Choose Jobs under the ETL header in the left navigation pane.
  3. Choose the check box next to ReviewsExportToParquet to view the job’s run history and other details. At this point, Run status is in the Running state.
  4. The AWS Glue job is finished when Run status reaches the Succeeded state.

Next, run the AWS Glue crawler:

  1. From the AWS Glue console page, choose Crawlers on the left navigation pane.
  2. Choose the check box next to ReviewsParquetCrawler.
  3. Choose Run crawler at the top of the page.

The first time the crawler runs, it adds the reviews table to the dynamodb-exports database in the AWS Glue Data Catalog. If you accumulate more export snapshots after you run the crawler, subsequent runs of the crawler add new partitions to the table.

Inspecting the reviews table in the AWS Glue Data Catalog

Next, look at the reviews table:

  1. From the AWS Glue console page, choose Tables.
  2. Choose reviews.

The AWS Glue Data Catalog is an Apache Hive–compatible metastore that stores the schema of the dataset. It stores properties such as object count and dataset location in S3, among other data.

Taking a look at the schema, you might notice the ddb_export_timestamp column, which wasn’t originally a part of the attributes that we added to the items in DynamoDB. Under the key column, ddb_export_timestamp is marked as Partition (0).  Partition columns are just like regular columns, and when they are used in WHERE clauses in Athena they allow you to restrict the amount of data scanned. The Athena partitions documentation is a great place to start if you want to know more.

The Lambda function that invokes the Parquet conversion script provides this extra metadata. So, when the AWS Glue crawler infers the schema, the partition is given a useful name, as opposed to the default partition_N name that is given if no partition name is present.
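
The naming difference can be illustrated with a simplified model of how partition names are derived from S3 object keys. This is only a sketch of the convention (name=value folders versus unnamed folders), not the crawler's actual algorithm; the infer_partitions helper and the example keys are hypothetical, and keys are assumed to be relative to the table's root folder.

```python
def infer_partitions(relative_key):
    """Return (name, value) pairs for each folder in a key relative to the table root."""
    directories = relative_key.split("/")[:-1]  # the last segment is the file name
    partitions = []
    for index, directory in enumerate(directories):
        if "=" in directory:
            # Hive-style folder: the name is given explicitly.
            name, value = directory.split("=", 1)
        else:
            # Unnamed folder: fall back to a positional default name.
            name, value = "partition_{}".format(index), directory
        partitions.append((name, value))
    return partitions


print(infer_partitions("ddb_export_timestamp=2018-07-01T00:00:00/part-00000.parquet"))
print(infer_partitions("2018-07-01T00:00:00/part-00000.parquet"))
```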

Using Athena to query the dataset

To use Athena to query the dataset, take these steps:

  1. Open Athena on the console.
  2. If you haven’t done so already, upgrade Athena to use the Hive Catalog.
  3. For Database on the left navigation pane, choose dynamodb-exports.
  4. Under Tables, you can see reviews.
  5. Choose the ellipsis at right of reviews, and choose Preview table.

You just ran a SQL query over your DynamoDB dataset! Let’s run an aggregation to count how many reviews J.K. Rowling has. As you might recall, this access pattern isn’t well-supported by our DynamoDB table design.

SELECT COUNT(author) as num_reviews FROM "dynamodb-exports"."reviews"
WHERE author = 'J.K. Rowling';

You might see different results if you added more items, but here are the results we see in our table.

With Athena, as your data grows in size or complexity, you can pull insights out of data from DynamoDB using ANSI SQL.

Next steps

Here are a few ways that you can extend this work:

  • Modify the Data Pipeline to run the DynamoDB export every night at midnight in your local time.
  • Run the AWS Glue Crawler every day at 4 a.m. local time so you always have the latest snapshot of your data in DynamoDB.
  • Use the export success topic to trigger more complex pipelines or aggregations.
  • Combine this approach with building a data lake in S3.

Conclusion

In this post, we show you how to export data from a DynamoDB table, convert it into a more efficient format with AWS Glue, and query the data with Athena. This approach gives you a way to pull insights from your data stored in DynamoDB.

About the Author

Joe Feeney is a Software Engineer on the Amazon Author team where he leverages all of Amazon’s data to provide Authors with unique, actionable insights. He enjoys losing to his wife and kids at Mario Kart, and building and modifying guitars.

Many enterprises have highly regulated policies around cloud security. Those policies might be even more restrictive for Amazon EMR where sensitive data is processed.

EMR provides security configurations that allow you to set up encryption for data at rest stored on Amazon S3 and local Amazon EBS volumes. It also allows the setup of Transport Layer Security (TLS) certificates for the encryption of data in transit.

When in-transit encryption is enabled, EMR supports the following components by default:

  • Hadoop MapReduce Encrypted Shuffle.
  • Secure Hadoop RPC set to Privacy and using SASL. This is activated in EMR when data at rest encryption is enabled in the security configuration.
  • Presto internal communication between nodes using SSL/TLS. This applies only to EMR version 5.6.0 and later.
  • Tez Shuffle Handler using TLS.
  • Internal RPC communication between Apache Spark
  • HTTP protocol communication with user interfaces, such as Spark History Server and HTTPS-enabled file servers encrypted using Spark’s Secure Sockets Layer (SSL) configuration.

For more information about EMR in-transit encryption, see Encryption Options.

A security configuration provides the following options to specify TLS certificates:

  1. As a path to a .zip file in an S3 bucket that contains all certificates
  2. Through a custom certificate provider as a Java class

In many cases, company security policies prohibit storing any type of sensitive information in an S3 bucket, including certificate private keys. For that reason, the only remaining option to secure data in transit on EMR is to configure the custom certificate provider.

In this post, I guide you through the configuration process, showing you how to secure data in transit on EMR using the TLS custom certificate provider.

Required knowledge

To walk through this solution, you should know or be familiar with:

Solution overview

The custom certificate provider is a Java class that implements the TLSArtifactsProvider interface and is compiled into a JAR file. The TLSArtifactsProvider interface is available in the AWS SDK for Java version 1.11.+.

The TLSArtifactsProvider interface provides the TLSArtifacts method, which expects certificates as its arguments.

To make this solution work, you need a secure place to store certificates that can also be accessed by Java code.

In this example, use Parameter Store, which supports encryption using the AWS Key Management Service (AWS KMS) key.

Another way would be to store encrypted certificates in Amazon DynamoDB.

The following diagram and steps show the configuration process from the Java standpoint:

  1. During bootstrap, EMR downloads the Java JAR file from the S3 bucket, and runs it on each node.
  2. Java invokes the Lambda function, requesting the value of a specific parameter key.
  3. Lambda calls Parameter Store to get the value. The value returned by Systems Manager remains encrypted.
  4. Lambda returns the encrypted value back to Java.
  5. Java decrypts the value using an AWS KMS API call.
  6. The decrypted value is converted to the correct format of the certificate.
  7. The process repeats for all certificates.
  8. Certificates are returned back to EMR through the TLSArtifactsProvider interface.
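
Steps 2 through 7 can be sketched in a few lines. The actual provider is a Java class; the Python below only illustrates the flow, with the Lambda call and the AWS KMS call passed in as plain functions. The function names are hypothetical, the stand-ins at the bottom use a reversible toy transformation in place of real encryption, and the assumption that the encrypted value arrives base64-encoded is mine.

```python
import base64


def fetch_certificate(parameter_name, invoke_lambda, kms_decrypt):
    """Fetch one encrypted parameter, decrypt it, and return it as PEM text."""
    encrypted_b64 = invoke_lambda(parameter_name)  # steps 2-4: value is still encrypted
    ciphertext = base64.b64decode(encrypted_b64)   # assumption: value is base64-encoded
    plaintext = kms_decrypt(ciphertext)            # step 5: decrypt through AWS KMS
    return plaintext.decode("utf-8")               # step 6: convert to certificate text


def fetch_all_certificates(parameter_names, invoke_lambda, kms_decrypt):
    """Step 7: repeat the process for every certificate and key."""
    return {
        name: fetch_certificate(name, invoke_lambda, kms_decrypt)
        for name in parameter_names
    }


# Stand-ins so that the sketch runs without AWS; reversing bytes is NOT encryption.
_fake_store = {
    "/emr/certificate": b"-----BEGIN CERTIFICATE-----\n(placeholder)\n-----END CERTIFICATE-----\n",
}


def fake_invoke_lambda(parameter_name):
    return base64.b64encode(_fake_store[parameter_name][::-1]).decode("ascii")


def fake_kms_decrypt(ciphertext):
    return ciphertext[::-1]


certificates = fetch_all_certificates(["/emr/certificate"], fake_invoke_lambda, fake_kms_decrypt)
print(certificates["/emr/certificate"])
```

Keeping the two AWS calls behind function parameters mirrors the separation in the steps above: the broker returns only encrypted values, and decryption happens on the node.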

In this example, for the master node, I used a certificate signed by a certificate authority (CA), and a wildcard self-signed certificate for slave nodes. Depending on requirements, you can use CA certificates for all nodes or only a self-signed wildcard certificate.

Implementing in-transit encryption

This section walks you through all aspects of implementation and configuration for in-transit encryption using a custom certificate provider.

Create a self-signed wildcard certificate

To create a self-signed wildcard certificate, you can use OpenSSL:

openssl req -x509 -newkey rsa:4096 -keyout inter-nodes.key -out inter-nodes.crt -days 365 -subj "/C=US/ST=MA/L=Boston/O=EMR/OU=EMR/CN=*.ec2.internal" -nodes


This command creates a self-signed, 4096-bit certificate.

Explanation of parameters:

-keyout – The output file in which to store the private key.

-out – The output file in which to store the certificate.

-days – The number of days for which to certify the certificate.

-subj – The subject name for a new request.  The CN must match the domain name specified in DHCP that is assigned to the virtual private cloud (VPC). The default is ec2.internal. The “*” prefix is the wildcard certificate.

-nodes – Allows you to create a private key without a password, that is, without encryption.

For more information, see req command.
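
Before you create the certificate, it can help to check that the CN pattern actually covers your node host names. The following Python sketch applies the usual rule that a leading "*" stands for exactly one DNS label. The wildcard_matches helper and the host names are hypothetical examples, and partial wildcards such as "ip-*" are deliberately not handled.

```python
def wildcard_matches(pattern, hostname):
    """Return True if hostname is covered by pattern, where a leading '*' matches one label."""
    pattern_labels = pattern.lower().split(".")
    host_labels = hostname.lower().split(".")
    if len(pattern_labels) != len(host_labels):
        return False  # '*' never spans more than one label
    first_ok = pattern_labels[0] in ("*", host_labels[0])
    return first_ok and pattern_labels[1:] == host_labels[1:]


print(wildcard_matches("*.ec2.internal", "ip-10-0-0-12.ec2.internal"))
print(wildcard_matches("*.ec2.internal", "ip-10-0-0-12.example.internal"))
```

If the second kind of host name is what your VPC assigns, the CN in the openssl command needs to use that domain instead.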

Upload certificates

To upload certificates to the Parameter Store, run the following AWS Command Line Interface (AWS CLI) command for each certificate file, including private keys:

aws ssm put-parameter --name <parameter key name> --key-id <KMS key ID> --type SecureString --value file://<path to certificate file>

The following are examples of uploaded CA and self-signed certificate files:

aws ssm put-parameter --name /emr/certificate --value fileb://emr-ca-certificate.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb://emr-ca-private-key.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

The following are examples of uploading certificates when the wildcard certificate is used on all nodes:

aws ssm put-parameter --name /emr/certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1
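
The same uploads can be scripted with boto3 instead of the AWS CLI. The sketch below is an illustration under assumptions: the helper names are hypothetical, the PEM strings are placeholders, and the upload function is only defined, not called, so that the request-building part runs without AWS credentials.

```python
def build_put_parameter_requests(contents_by_name, kms_key_id):
    """Build one put_parameter keyword-argument dict per certificate or key."""
    return [
        {
            "Name": name,
            "Value": pem_text,
            "Type": "SecureString",
            "KeyId": kms_key_id,
            "Overwrite": True,
        }
        for name, pem_text in sorted(contents_by_name.items())
    ]


def upload(put_requests, region="us-east-1"):
    """Send the requests to Parameter Store (needs boto3 and credentials)."""
    import boto3  # imported lazily so the builder above works without boto3
    ssm = boto3.client("ssm", region_name=region)
    for kwargs in put_requests:
        ssm.put_parameter(**kwargs)


# Placeholder values; in practice, read the text of the .crt and .key files.
put_requests = build_put_parameter_requests(
    {
        "/emr/inter-nodes-certificate": "<PEM text of inter-nodes.crt>",
        "/emr/inter-nodes-private-key": "<PEM text of inter-nodes.key>",
    },
    kms_key_id="00000000-0000-0000-0000-000000000000",
)
print([r["Name"] for r in put_requests])
```

Keeping the request construction separate from the API call makes it easy to review exactly which parameter names will be overwritten before anything is sent.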

Using the Lambda function

The Lambda function in this solution is a broker that allows the Java JAR to retrieve certificates from Parameter Store.

Create a new role for the Lambda function, using the following command:

aws iam create-role --role-name lambda-ssm-parameter-store-role --assume-role-policy-document "{\"Version\": \"2012-10-17\", \"Statement\": [{\"Effect\": \"Allow\",\"Principal\": {\"Service\": \"lambda.amazonaws.com\"},\"Action\": \"sts:AssumeRole\"}]}"

Grant permissions to Parameter Store, using the following command:

aws iam put-role-policy --role-name lambda-ssm-parameter-store-role --policy-name ssm --policy-document "{\"Version\": \"2012-10-17\",\"Statement\": [{\"Effect\": \"Allow\",\"Action\": \"ssm:GetParameter\",\"Resource\": \"*\"}]}"

Create a new Lambda function:

To create a new Lambda function, open the AWS Management Console, choose Lambda, and choose Create function. On the Create function page, complete the form as shown in the following screenshot:

For Runtime, choose Python 2.7, and specify the role that you created for the Lambda function.

When the new function is created, add the following code in the Function code section:

import json
import boto3

ssm = boto3.client('ssm')

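def lambda_handler(event, context):
    # Fetch the parameter named in the event. WithDecryption=False returns the
    # SecureString value still encrypted; the caller decrypts it with AWS KMS.
    ssmResp = ssm.get_parameter(
        Name=event['ParameterName'],
        WithDecryption=False
    )

    paramValue = ssmResp['Parameter']['Value']
    return paramValue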

Change the timeout to 1 minute, and then save the function.
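
Before wiring the function into EMR, you might want to check the handler logic locally. The following sketch is an assumption-laden stand-in, not the deployed function: FakeSsm and make_handler are hypothetical names, and the fake client returns a canned placeholder instead of calling Parameter Store.

```python
class FakeSsm:
    """Stand-in for the boto3 SSM client; returns canned values."""

    def __init__(self, values):
        self.values = values
        self.calls = []

    def get_parameter(self, Name, WithDecryption):
        # Record the call so the test can check how the handler used it.
        self.calls.append((Name, WithDecryption))
        return {"Parameter": {"Name": Name, "Value": self.values[Name]}}


def make_handler(ssm):
    """Build a handler with the same body as the Lambda function above."""
    def lambda_handler(event, context):
        resp = ssm.get_parameter(Name=event["ParameterName"],
                                 WithDecryption=False)
        return resp["Parameter"]["Value"]
    return lambda_handler


fake = FakeSsm({"/emr/certificate": "<encrypted-value>"})
handler = make_handler(fake)
result = handler({"ParameterName": "/emr/certificate"}, None)
print(result)
```

Because WithDecryption is False, the function only ever handles the encrypted value; decryption is left to the caller, which is why the instance profile later in this post needs kms:Decrypt.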

Tag resources

For the Java class to call the Lambda function, you must provide information about the function name and names of parameter keys under which the certificates are stored.

To reuse the same Java JAR with different certificates and configurations, provide those values to Java through EMR tags, rather than embedding them in Java code.

In this example, I used the following tags:

  • ssm:ssl:certificate – The name of the Systems Manager parameter key storing the CA-signed certificate.
  • ssm:ssl:private-key – The name of the Systems Manager parameter key storing the CA-signed certificate private key.
  • ssm:ssl:inter-node-certificate – The name of the Systems Manager parameter key storing the self-signed certificate.
  • ssm:ssl:inter-node-private-key – The name of the Systems Manager parameter key storing the self-signed certificate private key.
  • tls:lambda-fn-name – The name of the Lambda function. In this example, this is get-ssm-parameter-lambda.
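
To make the role of the tags concrete, here is a small Python sketch of turning a cluster's tag list into settings. It only illustrates the idea behind the Java readTags() method and is not a port of it; the field names on the right-hand side are hypothetical, and the input is assumed to be a list of Key/Value pairs.

```python
# Tag key -> hypothetical settings field name.
REQUIRED_TAGS = {
    "ssm:ssl:certificate": "certificate",
    "ssm:ssl:private-key": "private_key",
    "ssm:ssl:inter-node-certificate": "inter_node_certificate",
    "ssm:ssl:inter-node-private-key": "inter_node_private_key",
    "tls:lambda-fn-name": "lambda_function",
}


def settings_from_tags(tags):
    """Map a list of {"Key": ..., "Value": ...} tags to a settings dict."""
    by_key = {tag["Key"]: tag["Value"] for tag in tags}
    missing = sorted(key for key in REQUIRED_TAGS if key not in by_key)
    if missing:
        raise ValueError("missing EMR tags: " + ", ".join(missing))
    return {field: by_key[key] for key, field in REQUIRED_TAGS.items()}


tags = [
    {"Key": "ssm:ssl:certificate", "Value": "/emr/certificate"},
    {"Key": "ssm:ssl:private-key", "Value": "/emr/private-key"},
    {"Key": "ssm:ssl:inter-node-certificate", "Value": "/emr/inter-nodes-certificate"},
    {"Key": "ssm:ssl:inter-node-private-key", "Value": "/emr/inter-nodes-private-key"},
    {"Key": "tls:lambda-fn-name", "Value": "get-ssm-parameter-lambda"},
    {"Key": "Name", "Value": "unrelated tag"},
]
settings = settings_from_tags(tags)
print(settings["lambda_function"])
```

Failing fast on a missing tag gives a clearer error than a certificate lookup that fails later during cluster startup.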

Use the Java class flow

This section describes the flow in the Java code only. You can download the full code along with the compiled JAR file from GitHub. For more information, see the Java folder in the emr-tls-security GitHub repo.

Important

Because of EMR dependencies, all other methods must be implemented based on the AWS SDK for Java version 1.10.75. These dependencies do not include the TLSArtifactsProvider interface that should be imported from the AWS SDK for Java version 1.11.170 (aws-java-sdk-emr-1.11.170.jar).

All necessary dependencies are included in the example project.

The following is an example of the basic structure of the Java class, with an implementation of the TLSArtifactsProvider interface:

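import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.elasticmapreduce.spi.security.TLSArtifacts;
import com.amazonaws.services.elasticmapreduce.spi.security.TLSArtifactsProvider;

public class emrtls extends TLSArtifactsProvider {

    public emrtls() {
    }

    @Override
    public TLSArtifacts getTlsArtifacts() {
        // Certificates and private key that are handed to EMR.
        List<Certificate> crt = new ArrayList<Certificate>();
        List<Certificate> crtCA = new ArrayList<Certificate>();
        PrivateKey privkey = null;

        // Here: code to retrieve certificates from a secure location
        // and assign them to the local variables above.

        TLSArtifacts tls = new TLSArtifacts(privkey, crt, crtCA);
        return tls;
    }
}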

The code to add is related to getting certificates from a secure location.

In the provided code example from GitHub, the following logic was implemented. I’ve listed the methods used in each step.

  1. Read the names of the Systems Manager parameter keys and the name of the AWS Lambda function from the EMR tags (see the “Tag resources” section) – readTags().
  2. Invoke the Lambda function to download certificates from Parameter Store – callLambda():
    • Decrypt the values returned by Lambda using the KMS API call – decryptValue().
    • Assign decrypted values to local variables.
  3. If needed, save CA-signed certificates to a local disk – createDirectoryForCerts() and writeCert(). For more information, see the Hue section later in this post.
  4. Convert certificates to an X509 format – getX509FromString().
  5. Convert the private key to the correct format – getPrivateKey().
  6. Call the getTlsArtifacts() method to provide certificates in arguments.
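
Steps 1 and 2 of this flow can be sketched in Python as follows. This is an illustration of the control flow only, not a port of the Java code: load_tls_material is a hypothetical name, the Lambda call and the KMS decryption are passed in as plain callables, and fetching each distinct parameter only once is a design choice of this sketch.

```python
ROLES = ("certificate", "private_key",
         "inter_node_certificate", "inter_node_private_key")


def load_tls_material(settings, invoke_lambda, decrypt):
    """Fetch and decrypt each distinct parameter once.

    settings      -- dict mapping each role in ROLES to a parameter name
    invoke_lambda -- callable(parameter_name) -> encrypted text
    decrypt       -- callable(encrypted text) -> decrypted PEM text
    """
    cache = {}
    material = {}
    for role in ROLES:
        name = settings[role]
        if name not in cache:
            cache[name] = decrypt(invoke_lambda(name))
        material[role] = cache[name]
    return material


# Fakes standing in for the Lambda invocation and the KMS Decrypt call.
calls = []


def fake_invoke(name):
    calls.append(name)
    return "enc:" + name


def fake_decrypt(blob):
    return blob[len("enc:"):].upper()


# Wildcard setup: both certificate roles point at the same parameters.
wildcard = {
    "certificate": "/emr/inter-nodes-certificate",
    "private_key": "/emr/inter-nodes-private-key",
    "inter_node_certificate": "/emr/inter-nodes-certificate",
    "inter_node_private_key": "/emr/inter-nodes-private-key",
}
material = load_tls_material(wildcard, fake_invoke, fake_decrypt)
print(len(calls))
```

With a wildcard certificate, the four roles resolve to two parameters, so the sketch makes two Lambda calls instead of four.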

You can use a wildcard certificate for all nodes without changing code. Reference the same Systems Manager parameter key in ssm:ssl:certificate and ssm:ssl:private-key as in ssm:ssl:inter-node-certificate and ssm:ssl:inter-node-private-key in the EMR tags.

If the implemented methods in the example code meet your requirements, you can use the provided Java JAR file in the EMR security configuration, as described in the next section. Otherwise, any change in the code requires recompiling the Java code into a JAR file.

Create the EMR security configuration

Before creating the security configuration, upload the compiled Java JAR file to an S3 bucket.

To create the security configuration:

  1. Log in to the Amazon EMR console.
  2. Choose Security configurations, Create.
  3. Type a name for your new security configuration; for example, emr-tls-ssm
  4. Select In-transit encryption.
  5. Under TLS certificate provider, for Certificate provider type, choose Custom.
  6. For S3 object, type the path to the uploaded Java JAR file.
  7. For Certificate provider class, type the name of the Java class. In the example code, the name is emrtls.
  8. Configure the At-rest encryption, as required.
  9. Choose Create.

Modify the instance profile role

Applications running on EMR assume and use the EMR role for EC2 to interact with other AWS services.

To grant Java permissions to invoke Lambda, and to decrypt certificates, add the following policy to your EC2 instance profile role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "TLS",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:xxxxxxxxxx:function:get-ssm-parameter-lambda",
                "arn:aws:kms:us-east-1:xxxxxxxxxx:key/<your KMS key used to encrypt certificates in AWS Systems Manager>"
            ]
        }
    ]
}

Note

Before creating the policy, update resources with the correct Amazon Resource Name (ARN) for the Lambda function and KMS key.
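
To avoid hand-editing the ARNs, the policy document can be generated. The sketch below is one possible variant, not the policy from this post verbatim: it splits the single statement into one statement per service, which should grant the same effective access, and the function name, Sid values, and account ID are placeholders.

```python
import json


def build_tls_policy(region, account_id, function_name, kms_key_id):
    """Return an IAM policy document for the EC2 instance profile role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "InvokeBroker",
                "Effect": "Allow",
                "Action": "lambda:InvokeFunction",
                "Resource": f"arn:aws:lambda:{region}:{account_id}:function:{function_name}",
            },
            {
                "Sid": "DecryptCertificates",
                "Effect": "Allow",
                "Action": "kms:Decrypt",
                "Resource": f"arn:aws:kms:{region}:{account_id}:key/{kms_key_id}",
            },
        ],
    }


policy = build_tls_policy(
    region="us-east-1",
    account_id="123456789012",  # placeholder account ID
    function_name="get-ssm-parameter-lambda",
    kms_key_id="00000000-0000-0000-0000-000000000000",
)
policy_json = json.dumps(policy, indent=4)
print(policy_json)
```

Pairing each action with only the resource it applies to makes the intent of the policy easier to audit.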

Other available configurations

In addition to the applications that natively support in-transit encryption in EMR, the custom TLS certificate provider can also be used to secure communication (HTTPS) for other applications like Presto, Hue, and Zeppelin.

The sections that follow describe the configuration of each application that works with the certificates set up by the TLS security configuration.

Presto

For Presto, most configuration is done by EMR when TLS certificates are applied.

Depending on the type of certificates used, there are two additional configurations that must be added:

  1. When the CA-signed certificate with a single common name (not wildcard) is set on the master node, additional configurations are required:
    • The certificate common name must be registered in DNS. The EMR cluster must be able to resolve that name to the IP address of the master node. One solution would be to run a script on the bootstrap action to register the IP address of the EMR master node and name in DNS.
    • The Discovery URI in the Presto configuration file must match the certificate common name. The value of uri must be changed on all nodes. This can be accomplished by two provided scripts.

      Each script must be uploaded to an S3 bucket to which the EMR cluster has permission.
      The first script, emr-presto-conf.sh, must be run on the EMR bootstrap action, as follows where the value of “Args” is the certificate common name:

      {
        "Name": "PrestoConfiguration",
        "ScriptBootstrapAction": {
          "Path": "s3://xxxxx/emr-presto-conf.sh",
          "Args": [ "emr.mycluster.com" ]
        }
      }

      The PrestoConfiguration bootstrap action downloads and runs a script (presto-update-dicovery-uri.sh) as a background process. This script waits for the Presto server to be installed and then modifies the configuration files.
      Before uploading the emr-presto-conf.sh script to the Amazon S3 bucket, change the path to “presto-update-dicovery-uri.sh”.
      Both scripts can be downloaded from GitHub:
      https://github.com/aws-samples/emr-tls-security/tree/master/scripts

  2. When a self-signed wildcard certificate is used on the master node, the certificate must be added to the Java default truststore. This can be accomplished by running the following script:

      #!/bin/bash
      
      truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)
      
      sudo keytool -importkeystore -srckeystore /usr/share/aws/emr/security/conf/truststore.jks -destkeystore /usr/lib/jvm/java/jre/lib/security/cacerts -deststorepass changeit -srcstorepass "$truststorePass"

      The previous script can be run as an EMR step. The following is an AWS CloudFormation snippet:

      "EMRPrestoTrustedStorStep": {
            "Type": "AWS::EMR::Step",
            "Properties": {
              "ActionOnFailure": "CONTINUE",
              "HadoopJarStep": {
                "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
                "Args": [
                  "s3://xxxxxx/presto-update-trusted-store.sh"
                ]
              },
              "JobFlowId": {
                "Ref": "EMRCluster"
              },
              "Name": "EMR-Setup-Presto-Trusted-Store"
            }
          }
      
      Hue

      To configure access to the Hue UI over HTTPS, the path to the certificate and private key files must be specified in the hue.ini file. Because our Java class has methods, createDirectoryForCerts() and writeCert(), which support exporting TLS certificates to the local disk, the remaining configuration should point to those files in the hue.ini file.

      This configuration can be applied by adding the following configuration to the EMR cluster:

      [{
        "Classification": "hue-ini",
        "Configurations": [
          {
            "Classification": "desktop",
            "ConfigurationProperties": {
              "ssl_certificate": "/etc/certs/public.crt",
              "ssl_private_key": "/etc/certs/private.key"
            }
          }
        ]
      }]

      The port for the HTTPS connection to Hue remains the same. The default is: 8888

      Zeppelin

      Unlike Hue, Zeppelin configuration files reference certificates from the Java keystore.

      Because EMR already added all certificates to the Java keystore, the only modification needed is to reference the same Java keystore files and password in zeppelin-site.xml.

      The path to the Java keystore and the password can be read directly from the Presto configuration file.
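
Reading those values amounts to parsing “key = value” lines. The following Python sketch shows the idea on sample text; read_properties is a hypothetical helper, the keystore path and both passwords in the sample are made-up placeholders, and on a cluster you would read /etc/presto/conf/config.properties instead.

```python
def read_properties(text):
    """Parse 'key = value' lines, ignoring blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first '=' only, so values may contain '='.
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


# Sample content; paths and passwords are placeholders for illustration.
sample = """\
# Presto TLS settings
http-server.https.keystore.path = /example/path/keystore.jks
http-server.https.keystore.key = example-keystore-pass
internal-communication.https.keystore.path = /usr/share/aws/emr/security/conf/truststore.jks
internal-communication.https.keystore.key = example=truststore=pass
"""

props = read_properties(sample)
keystore_path = props["http-server.https.keystore.path"]
truststore_pass = props["internal-communication.https.keystore.key"]
print(keystore_path)
```

Splitting on the first “=” only matters for generated passwords, which can contain “=” characters themselves.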

      This configuration can be done by running the following script on EMR:

      #!/bin/bash
      sudo cp /etc/zeppelin/conf/zeppelin-site.xml.template /etc/zeppelin/conf/zeppelin-site.xml
      truststorePath=$(grep -Po "(?<=^internal-communication.https.keystore.path = ).*" /etc/presto/conf/config.properties)
      truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)
      keystorePath=$(grep -Po "(?<=^http-server.https.keystore.path = ).*" /etc/presto/conf/config.properties)
      keystorePass=$(grep -Po "(?<=^http-server.https.keystore.key = ).*" /etc/presto/conf/config.properties)
      keymanager=$(grep -Po "(?<=^http-server.https.keymanager.password = ).*" /etc/presto/conf/config.properties)
      sudo sed -i '/<name>zeppelin.server.port<\/name>/!b;n;c<value>8890<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
      sudo sed -i '/<name>zeppelin.server.ssl.port<\/name>/!b;n;c<value>7773<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
      sudo sed -i '/<name>zeppelin.ssl<\/name>/!b;n;c<value>true<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
      sudo sed -i '/<name>zeppelin.ssl.keystore.path<\/name>/!b;n;c<value>'"$keystorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
      sudo sed -i '/<name>zeppelin.ssl.keystore.password<\/name>/!b;n;c<value>'"$keystorePass"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
      sudo sed -i '/<name>zeppelin.ssl.truststore.path<\/name>/!b;n;c<value>'"$truststorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
      CONTENT1="<property>\n  <name>zeppelin.ssl.truststore.password</name>\n  <value>${truststorePass}</value>\n</property>"
      sudo sed -i '/<\/configuration>/i'"$CONTENT1" /etc/zeppelin/conf/zeppelin-site.xml
      CONTENT2="<property>\n  <name>zeppelin.ssl.key.manager.password</name>\n  <value>${keymanager}</value>\n</property>"
      sudo sed -i '/<\/configuration>/i'"$CONTENT2" /etc/zeppelin/conf/zeppelin-site.xml
      sudo stop zeppelin
      sudo..

You can increase your savings by taking advantage of the dynamic scaling feature set available in Amazon EMR. The ability to scale the number of nodes in your cluster up and down on the fly is among the major features that make Amazon EMR elastic. You can take advantage of scaling in EMR by resizing your cluster down when you have little or no workload. You can also scale your cluster up to add processing power when the job gets too slow. This allows you to spend just enough to cover the cost of your job and little more.

Knowing the complex logic behind this feature can help you take advantage of it to save on cluster costs. In this post, I detail how EMR clusters resize, and I present some best practices for getting the maximum benefit and resulting cost savings for your own cluster through this feature.

EMR scaling is more complex than simply adding or removing nodes from the cluster. One common misconception is that scaling in Amazon EMR works exactly like Amazon EC2 scaling. With EC2 scaling, you can add/remove nodes almost instantly and without worry, but EMR has more complexity to it, especially when scaling a cluster down. This is because important data or jobs could be running on your nodes.

To prevent data loss, Amazon EMR scaling ensures that your node has no running Apache Hadoop tasks or unique data that could be lost before removing your node. It is worth considering this decommissioning delay when resizing your EMR cluster. By understanding and accounting for how this process works, you can avoid issues that have plagued others, such as slow cluster resizes and inefficient automatic scaling policies.

When an EMR cluster is scaled down, two different decommission processes are triggered on the nodes that will be terminated. The first process is the decommissioning of Hadoop YARN, which is the Hadoop resource manager. Hadoop tasks that are submitted to Amazon EMR generally run through YARN, so EMR must ensure that any running YARN tasks are complete before removing the node. If for some reason the YARN task is stuck, there is a configurable timeout to ensure that the decommissioning still finishes. When this timeout happens, the YARN task is terminated and is instead rescheduled to a different node so that the task can finish.

The second decommission process is that of the Hadoop Distributed File System or HDFS. HDFS stores data in blocks that are spread through the EMR cluster on any nodes that are running HDFS. When an HDFS node is decommissioning, it must replicate those data blocks to other HDFS nodes so that they are not lost when the node is terminated.

So how can you use this knowledge in Amazon EMR?

Tips for resizing clusters

The following are some issues to consider when resizing your clusters.

EMR clusters can use two types of nodes for Hadoop tasks: core nodes and task nodes. Core nodes host persistent data by running the HDFS DataNode process and run Hadoop tasks through YARN’s resource manager. Task nodes only run Hadoop tasks through YARN and DO NOT store data in HDFS.

When scaling down task nodes on a running cluster, expect a short delay for any running Hadoop task on the cluster to decommission. This allows you to get the best usage of your task node by not losing task progress through interruption. However, if your job allows for this interruption, you can adjust the one hour default timeout on the resize by adjusting the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml. When this process times out, your task node is shut down regardless of any running tasks. This process is usually relatively quick, which makes it fast to scale down task nodes.
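
One way to set this property is through a yarn-site configuration classification supplied when the cluster is created. The sketch below builds that JSON in Python; the function name is hypothetical, the 600-second value is only an example, and the "Properties" key is the form used by the EMR API and AWS CLI (AWS CloudFormation uses "ConfigurationProperties" instead).

```python
import json

TIMEOUT_PROPERTY = (
    "yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs"
)


def yarn_decommission_timeout_config(timeout_secs):
    """Build a yarn-site classification that sets the graceful timeout."""
    if timeout_secs < 0:
        raise ValueError("timeout must not be negative")
    return [
        {
            "Classification": "yarn-site",
            # EMR expects property values as strings.
            "Properties": {TIMEOUT_PROPERTY: str(timeout_secs)},
        }
    ]


# Example: allow running tasks 10 minutes instead of the one-hour default.
config = yarn_decommission_timeout_config(600)
print(json.dumps(config, indent=2))
```

A shorter timeout makes task-node scale-down faster, at the cost of rerunning any task that is still in flight when the timeout expires.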

When you’re scaling down core nodes, Amazon EMR must also wait for HDFS to decommission to protect your data. HDFS can take a relatively long time to decommission. This is because HDFS block replication is throttled by design through configurations located in hdfs-site.xml. This in turn means that HDFS decommissioning is throttled. This protects your cluster from a spiked workload if a node goes down, but it slows down decommissioning. When scaling down a large number of core nodes, consider adjusting these configurations beforehand so that you can scale down more quickly.

For example, consider this exercise with HDFS and resizing speed.

The following HDFS configurations, located in hdfs-site.xml, have some of the most significant impact on throttling block replication:

  • dfs.datanode.balance.bandwidthPerSec: Bandwidth for each node’s replication
  • dfs.namenode.replication.max-streams: Max streams running for block replication
  • dfs.namenode.replication.max-streams-hard-limit: Hard limit on max streams
  • dfs.datanode.balance.max.concurrent.moves: Number of threads used by the block balancer for pending moves
  • dfs.namenode.replication.work.multiplier.per.iteration: Used to determine the number of blocks to begin transfers immediately during each replication interval

(Beware when modifying: Changing these configurations improperly, especially on a cluster with high load, can seriously degrade cluster performance.)

Cluster resizing speed exercise

Modifying these configurations can speed up the decommissioning time significantly. Try the following exercise to see this difference for yourself.

  1. Create an EMR cluster with the following hardware configuration:
    • Master: 1 node – m3.xlarge
    • Core: 6 nodes – m3.xlarge
  2. Connect to the master node of your cluster using SSH (Secure Shell). For more information, see Connect to the Master Node Using SSH in the Amazon EMR documentation.
  3. Load data into HDFS by using the following jobs:
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar teragen 1000000000 /user/hadoop/data1/
$ s3-dist-cp --src s3://aws-bigdata-blog/artifacts/ClusterResize/smallfiles25k/ --dest hdfs:///user/hadoop/data2/
  4. Edit your hdfs-site.xml configs:
$ sudo vim /etc/hadoop/conf/hdfs-site.xml

Then paste in the following configuration setup in the hdfs-site properties.

Disclaimer: These values are relatively high for example purposes and should not necessarily be used in production. Be sure to test config values for production clusters under load before modifying them.

<property>
  <name>dfs.datanode.balance.bandwidthPerSec</name>
  <value>100m</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>100</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>200</value>
</property>

<property>
  <name>dfs.datanode.balance.max.concurrent.moves</name>
  <value>500</value>
</property>

<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>30</value>
</property>
  5. Resize your EMR cluster from six to five core nodes, and look in the EMR events tab to see how long the resize took.
  6. Repeat the previous steps without modifying the configurations, and check the difference in resize time.

While performing this exercise, I saw the resize time drop from 45+ minutes (without config changes) to about 6 minutes (with the modified hdfs-site configs). This exercise demonstrates how much HDFS is throttled under default configurations. Although removing these throttles is dangerous, and performance with the new values should be tested first, raising them can significantly speed up decommissioning and therefore resizing.
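
If you repeat the exercise often, editing hdfs-site.xml by hand gets tedious. The following Python sketch applies the five values from the exercise to an hdfs-site document, updating properties that already exist and appending the rest. It is an assumption-laden helper (set_properties is a hypothetical name, and the sample XML is made up); note that the standard-library parser used here does not preserve XML comments or the XML declaration.

```python
import xml.etree.ElementTree as ET

# Values from the exercise above; not production recommendations.
OVERRIDES = {
    "dfs.datanode.balance.bandwidthPerSec": "100m",
    "dfs.namenode.replication.max-streams": "100",
    "dfs.namenode.replication.max-streams-hard-limit": "200",
    "dfs.datanode.balance.max.concurrent.moves": "500",
    "dfs.namenode.replication.work.multiplier.per.iteration": "30",
}


def set_properties(xml_text, overrides):
    """Update or append <property> entries and return the new XML text."""
    root = ET.fromstring(xml_text)
    existing = {p.findtext("name"): p for p in root.findall("property")}
    for name, value in overrides.items():
        prop = existing.get(name)
        if prop is None:
            prop = ET.SubElement(root, "property")
            ET.SubElement(prop, "name").text = name
        value_el = prop.find("value")
        if value_el is None:
            value_el = ET.SubElement(prop, "value")
        value_el.text = str(value)
    return ET.tostring(root, encoding="unicode")


# Made-up starting file with one unrelated and one overlapping property.
sample = (
    "<configuration>"
    "<property><name>dfs.replication</name><value>2</value></property>"
    "<property><name>dfs.namenode.replication.max-streams</name>"
    "<value>2</value></property>"
    "</configuration>"
)
updated = set_properties(sample, OVERRIDES)
print(updated)
```

Updating in place rather than appending blindly avoids ending up with two conflicting entries for the same property name.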

The following are some additional tips for resizing clusters:

  • Shrink resizing timeouts. You can configure EMR nodes in two ways: instance groups or instance fleets. For more information, see Create a Cluster with Instance Fleets or Uniform Instance Groups. EMR has implemented shrink resize timeouts when nodes are configured in instance fleets. This timeout prevents an instance fleet from attempting to resize forever if something goes wrong during the resize. It currently defaults to one day, so keep it in mind when you are resizing an instance fleet down.

If an instance fleet shrink request takes longer than one day, it finishes and pauses at however many instances are currently running. On the other hand, instance groups have no default shrink resize timeout. However, both types have the one-hour YARN timeout described earlier in the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml.

  • Watch out for high frequency HDFS writes when resizing core nodes. If HDFS is receiving a lot of writes, it will modify a large number of blocks that require replication. This replication can interfere with the block replication from any decommissioning core nodes and significantly slow down the resizing process.

Setting up policies for automatic scaling

Although manual scaling is useful, a majority of the time cluster resizes are executed dynamically through Amazon EMR automatic scaling. Generally, the details of the automatic scaling policy must be tailored to the specific Hadoop job, so I won’t go into detail there. Instead, I provide some general guidelines for setting up your cluster’s auto scaling policies.

The following are some considerations when setting up your auto scaling policy.

Metrics for scaling

Choose the right metrics for your node types to trigger scaling. For example, scaling core nodes solely on the YARNMemoryAvailablePercentage metric doesn’t make sense. This is because you would be increasing or decreasing the total HDFS size when really you only need more processing power. Scaling task nodes on HDFSUtilization also doesn’t make sense because you would want more HDFS storage space, which does not come with task nodes. A common automatic scaling metric for core nodes is HDFSUtilization. Common auto scaling metrics for task nodes include ContainerPending-Out and YARNMemoryAvailablePercentage.

Note: Keep in mind that Amazon EMR currently requires HDFS, so you must have at least one core node in your cluster. Core nodes can also provide CPU and memory resources. But if you don’t need to scale HDFS, and you just need more CPU or memory resources for your job, we recommend that you use task nodes for that purpose.

Scaling core nodes

As described earlier, one of the two EMR node types in your cluster is the core node. Core nodes run HDFS, so they have a longer decommissioning delay. This means that they are slow to scale and should not be aggressively scaled. Only adding and removing a few core nodes at a time will help you avoid scaling issues. Unless you need the HDFS storage, scaling task nodes is usually a better option. If you find that you have to scale large numbers of core nodes, consider changing hdfs-site.xml configurations to allow faster decommission time and faster scale down.

Scaling task nodes

Task nodes don’t run HDFS, which makes them perfect for aggressively scaling with a dynamic job. When your Hadoop task has spikes of work between periods of downtime, this is the node type that you want to use.

You can set up task nodes with a very aggressive auto scaling policy, and they can be scaled up or down easily. If you don’t need HDFS space, you can use task nodes in your cluster.

Using Spot Instances

Automatic scaling is a perfect time to use EMR Spot Instance types. The tendency of Spot Instances to disappear and reappear makes them perfect for task nodes. Because these task nodes are already used to scale in and out aggressively, Spot Instances can have very little disadvantage here. However, for time-sensitive Hadoop tasks, On-Demand Instances might be prioritized for the guaranteed availability.

Scale-in vs. scale-out policies for core nodes

Don’t fall into the trap of making your scale-in policy the exact opposite of your scale-out policy, especially for core nodes. Many times, scaling in results in additional delays for decommissioning. Take this into account and allow your scale-in policy to be more forgiving than your scale-out policy. This means longer cooldowns and higher metric requirements to trigger resizes.

You can think of scale-out policies as easily triggered with a low cooldown and small node increments. Scale-in policies should be hard to trigger, with larger cooldowns and node increments.
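
As a rough illustration of an asymmetric policy, the following Python sketch builds an automatic scaling policy document for core nodes based on HDFSUtilization. All thresholds, cooldowns, evaluation periods, rule names, and capacity limits are assumptions chosen for the example, not recommendations, and the helper names are hypothetical.

```python
import json


def scaling_rule(name, operator, threshold, adjustment, cooldown_secs, periods):
    """Build one rule that scales on the HDFSUtilization metric."""
    return {
        "Name": name,
        "Action": {
            "SimpleScalingPolicyConfiguration": {
                "AdjustmentType": "CHANGE_IN_CAPACITY",
                "ScalingAdjustment": adjustment,
                "CoolDown": cooldown_secs,
            }
        },
        "Trigger": {
            "CloudWatchAlarmDefinition": {
                "ComparisonOperator": operator,
                "EvaluationPeriods": periods,
                "MetricName": "HDFSUtilization",
                "Namespace": "AWS/ElasticMapReduce",
                "Period": 300,
                "Statistic": "AVERAGE",
                "Threshold": threshold,
                "Unit": "PERCENT",
                "Dimensions": [
                    {"Key": "JobFlowId", "Value": "${emr.clusterId}"}
                ],
            }
        },
    }


def core_node_policy(min_nodes, max_nodes):
    """Scale out quickly, scale in reluctantly."""
    return {
        "Constraints": {"MinCapacity": min_nodes, "MaxCapacity": max_nodes},
        "Rules": [
            # Easy to trigger: one 5-minute period, short cooldown.
            scaling_rule("HDFS-ScaleOut", "GREATER_THAN_OR_EQUAL", 80.0, 1, 300, 1),
            # Hard to trigger: three periods, 30-minute cooldown.
            scaling_rule("HDFS-ScaleIn", "LESS_THAN", 30.0, -1, 1800, 3),
        ],
    }


policy = core_node_policy(min_nodes=3, max_nodes=10)
print(json.dumps(policy, indent=2))
```

The wide gap between the two thresholds, together with the longer scale-in cooldown, is what keeps the cluster from oscillating while HDFS is still decommissioning a node.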

Minimum nodes for core node auto scaling

One last thing to consider when scaling core nodes is the yarn.app.mapreduce.am.labels property located in yarn-site.xml. In Amazon EMR, yarn.app.mapreduce.am.labels is set to “CORE” by default, which means that the application master always runs on core nodes and not task nodes. This is to help prevent application failure in a scenario where Spot Instances are used for the task nodes.

This means that when setting a minimum number of core nodes, you should choose a number that is greater than or at least equal to the number of simultaneous application masters that you plan to have running on your cluster. If you want the application master to also run on task nodes, you should modify this property to include “TASK.” However, as a best practice, don’t set the yarn.app.mapreduce.am.labels property to TASK if Spot Instances are used for task nodes.

Aggregating data using S3DistCp

Before wrapping up this post, I have one last piece of information to share about cluster resizing. When resizing core nodes, you might notice that HDFS decommissioning takes a very long time. Often this is the result of storing many small files in your cluster’s HDFS. Having many small files within HDFS (files smaller than the HDFS block size of 128 MB) adds lots of metadata overhead and can cause slowdowns in both decommissioning and Hadoop tasks.
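
To get a feel for the overhead, you can compare how many HDFS blocks a set of files occupies now with how many it would occupy if the same bytes were aggregated into large files. The sketch below is a back-of-the-envelope estimate with hypothetical helper names; the example input of 25,000 files of 1 MiB each is an assumption for illustration.

```python
HDFS_BLOCK_SIZE = 128 * 1024 * 1024  # 128 MB, the block size quoted above


def block_count(size_bytes, block_size=HDFS_BLOCK_SIZE):
    """Number of HDFS blocks a file of this size occupies (ceiling division)."""
    return -(-size_bytes // block_size)


def small_file_report(sizes_bytes, block_size=HDFS_BLOCK_SIZE):
    """Summarize small files and the block count before and after merging."""
    total = sum(sizes_bytes)
    return {
        "files": len(sizes_bytes),
        "small_files": sum(1 for s in sizes_bytes if s < block_size),
        "blocks_now": sum(block_count(s, block_size) for s in sizes_bytes),
        "blocks_if_merged": block_count(total, block_size),
    }


one_mib = 1024 * 1024
report = small_file_report([one_mib] * 25000)
print(report)
```

In this example, the same data needs 25,000 blocks as small files but only 196 blocks when aggregated, which is far less for the NameNode to track and re-replicate during decommissioning.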

Keeping your small files to a minimum by aggregating your data can help your cluster and jobs run more smoothly. For information about how to aggregate files, see the post Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3.

Summary

In this post, you read about how Amazon EMR resizing logic works to protect your data and Hadoop tasks. I also provided some additional considerations for EMR resizing and automatic scaling. Keeping these practices in mind can help you maximize cluster savings by allowing you to use only the required cluster resources.

If you have questions or suggestions, please leave a comment below.

Additional Reading

If you found this post useful, be sure to check out Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3 and Dynamically Scale Applications on Amazon EMR with Auto Scaling.

About the Author

Brandon Scheller is a software development engineer for Amazon EMR. His passion lies in developing and advancing the applications of the Hadoop ecosystem and working with the open source community. He enjoys mountaineering in the Cascades in his free time.


Extract, transform, and load (ETL) operations collectively form the backbone of any modern enterprise data lake. They transform raw data into useful datasets and, ultimately, into actionable insight. An ETL job typically reads data from one or more data sources, applies various transformations to the data, and then writes the results to a target where data is ready for consumption. The sources and targets of an ETL job could be relational databases in Amazon Relational Database Service (Amazon RDS) or on-premises, a data warehouse such as Amazon Redshift, or object storage such as Amazon Simple Storage Service (Amazon S3) buckets. Amazon S3 as a target is especially commonplace in the context of building a data lake in AWS.

AWS offers AWS Glue, which is a service that helps author and deploy ETL jobs. AWS Glue is a fully managed extract, transform, and load service that makes it easy for customers to prepare and load their data for analytics. Other AWS services can also be used to implement and manage ETL jobs. They include AWS Database Migration Service (AWS DMS), Amazon EMR (using the Steps API), and even Amazon Athena.

The challenge of orchestrating an ETL workflow

How can we orchestrate an ETL workflow that involves a diverse set of ETL technologies? AWS Glue, AWS DMS, Amazon EMR, and other services support Amazon CloudWatch Events, which we could use to chain ETL jobs together. Amazon S3, the central data lake store, also supports CloudWatch Events. But relying on CloudWatch Events alone means that there’s no single visual representation of the ETL workflow. Also, tracing the overall ETL workflow’s execution status and handling error scenarios can become a challenge.

In this post, I show you how to use AWS Step Functions and AWS Lambda for orchestrating multiple ETL jobs involving a diverse set of technologies in an arbitrarily-complex ETL workflow. AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. You build applications from individual components. Each component performs a discrete function, or task, allowing you to scale and change applications quickly.

Let’s look at an example ETL workflow.

Example datasets for the ETL workflow

For our example, we’ll use two publicly available Amazon QuickSight datasets.

The first dataset is a sales pipeline dataset, which contains a list of slightly more than 20,000 sales opportunity records for a fictitious business. Each record has fields that specify:

  • A date, potentially when an opportunity was identified.
  • The salesperson’s name.
  • A market segment to which the opportunity belongs.
  • Forecasted monthly revenue.

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this example we’ll assume that they are related.

The example ETL workflow requirements

Imagine there’s a business user who needs to answer questions based on both datasets. Perhaps the user wants to explore the correlations between online user engagement metrics on the one hand, and forecasted sales revenue and opportunities generated on the other hand. The user engagement metrics include website visits, mobile users, and desktop users.

The steps in the ETL workflow are:

Process the Sales dataset (PSD). Read the Sales dataset. Group records by day, aggregating the Forecasted Monthly Revenue field. Rename fields to replace white space with underscores. Output the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.

Process the Marketing dataset (PMD). Read the Marketing dataset. Rename fields to replace white space with underscores. Send the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.

Join Marketing and Sales datasets (JMSD). Read the output of the processed Sales and Marketing datasets. Perform an inner join of both datasets on the date field. Sort in ascending order by date. Send the final joined dataset to Amazon S3, and overwrite any previous output.
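
To make the three steps concrete, here is a minimal in-memory sketch in plain Python. It is not the AWS Glue job code from the sample: the field names ("Date", "Website Visits") and the sample rows are assumptions for illustration, and the sketch skips reading from and writing Parquet to Amazon S3.

```python
from collections import defaultdict


def rename_fields(record):
    """Replace white space in field names with underscores."""
    return {key.replace(" ", "_"): value for key, value in record.items()}


def process_sales(records):
    """PSD: sum Forecasted Monthly Revenue per day, then rename fields."""
    totals = defaultdict(float)
    for record in records:
        totals[record["Date"]] += record["Forecasted Monthly Revenue"]
    return [
        rename_fields({"Date": day, "Forecasted Monthly Revenue": total})
        for day, total in totals.items()
    ]


def process_marketing(records):
    """PMD: rename fields only; the data is already aggregated by day."""
    return [rename_fields(record) for record in records]


def join_marketing_and_sales(sales, marketing):
    """JMSD: inner join on Date, sorted in ascending order by date."""
    sales_by_day = {row["Date"]: row for row in sales}
    joined = [
        {**row, **sales_by_day[row["Date"]]}
        for row in marketing
        if row["Date"] in sales_by_day
    ]
    return sorted(joined, key=lambda row: row["Date"])


# Hypothetical sample rows; ISO-formatted dates sort correctly as strings.
sales_rows = [
    {"Date": "2018-01-02", "Forecasted Monthly Revenue": 100.0},
    {"Date": "2018-01-01", "Forecasted Monthly Revenue": 50.0},
    {"Date": "2018-01-02", "Forecasted Monthly Revenue": 25.0},
]
marketing_rows = [
    {"Date": "2018-01-01", "Website Visits": 10},
    {"Date": "2018-01-02", "Website Visits": 20},
    {"Date": "2018-01-03", "Website Visits": 30},
]
final_rows = join_marketing_and_sales(
    process_sales(sales_rows), process_marketing(marketing_rows)
)
```

Because the join is an inner join, the marketing row for 2018-01-03 is dropped: it has no matching sales record.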

So far, this ETL workflow can be implemented with AWS Glue, with the ETL jobs being chained by using job triggers. But you might have other requirements outside of AWS Glue that are part of your end-to-end data processing workflow, such as the following:

  • Both Sales and Marketing datasets are uploaded to an S3 bucket at random times in an interval of up to a week. The PSD job should start as soon as the Sales dataset file is uploaded. The PMD job should start as soon as the Marketing dataset file is uploaded. Parallel ETL jobs can start and finish anytime, but the final JMSD job can start only after all parallel ETL jobs are complete.
  • In addition to PSD and PMD jobs, the orchestration must support more parallel ETL jobs in the future that contribute to the final dataset aggregated by the JMSD job. The additional ETL jobs could be managed by AWS services, such as AWS Database Migration Service, Amazon EMR, Amazon Athena or other non-AWS services.

The data engineer takes these requirements and builds the following ETL workflow chart.

To fulfill the requirements, we need a generic ETL orchestration solution. A serverless solution is even better.

The ETL orchestration architecture and events

Let’s see how we can orchestrate an ETL workflow to fulfill the requirements using AWS Step Functions and AWS Lambda. The following diagram shows the ETL orchestration architecture and the main flow of events.

The main flow of events starts with an AWS Step Functions state machine. This state machine defines the steps in the orchestrated ETL workflow. A state machine can be triggered through Amazon CloudWatch based on a schedule, through the AWS Command Line Interface (AWS CLI), or using the various AWS SDKs in an AWS Lambda function or some other execution environment.

As the state machine execution progresses, it invokes the ETL jobs. As shown in the diagram, the invocation happens indirectly through intermediary AWS Lambda functions that you author and set up in your account. We’ll call this type of function an ETL Runner.

While the architecture in the diagram shows Amazon Athena, Amazon EMR, and AWS Glue, the accompanying code sample (aws-etl-orchestrator) includes a single ETL Runner, labeled AWS Glue Runner Function in the diagram. You can use this ETL Runner to orchestrate AWS Glue jobs. You can also follow the pattern and implement more ETL Runners to orchestrate other AWS services or non-AWS tools.

ETL Runners are invoked by activity tasks in Step Functions. Because of the way AWS Step Functions’ activity tasks work, ETL Runners need to periodically poll the AWS Step Functions state machine for tasks. The state machine responds by providing a Task object. The Task object contains inputs which enable an ETL Runner to run an ETL job.

As soon as an ETL Runner receives a task, it starts the respective ETL job. An ETL Runner maintains a state of active jobs in an Amazon DynamoDB table. Periodically, the ETL Runner checks the status of active jobs. When an active ETL job completes, the ETL Runner notifies the AWS Step Functions state machine. This allows the ETL workflow in AWS Step Functions to proceed to the next step.
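
The poll, start, and notify cycle can be sketched as follows. This is an illustration rather than the runner shipped in aws-etl-orchestrator: the clients are passed in as arguments, a plain dict stands in for the DynamoDB table, and the "GlueJobName" input key and the "glue-runner" worker name are assumptions.

```python
import json


def poll_and_start_job(sfn_client, glue_client, activity_arn, active_jobs):
    """Poll one activity for a task and start the matching AWS Glue job.

    active_jobs is a plain dict standing in for the DynamoDB table.
    """
    task = sfn_client.get_activity_task(
        activityArn=activity_arn, workerName="glue-runner"
    )
    token = task.get("taskToken")
    if not token:  # No task is scheduled for this activity right now.
        return None
    job_name = json.loads(task["input"])["GlueJobName"]  # hypothetical input key
    run_id = glue_client.start_job_run(JobName=job_name)["JobRunId"]
    active_jobs[run_id] = {"job_name": job_name, "task_token": token}
    return run_id


def check_active_jobs(sfn_client, glue_client, active_jobs):
    """Report finished jobs back to Step Functions and forget them."""
    for run_id, job in list(active_jobs.items()):
        run = glue_client.get_job_run(JobName=job["job_name"], RunId=run_id)
        state = run["JobRun"]["JobRunState"]
        if state == "SUCCEEDED":
            sfn_client.send_task_success(
                taskToken=job["task_token"],
                output=json.dumps({"GlueJobName": job["job_name"]}),
            )
        elif state in ("FAILED", "STOPPED", "TIMEOUT"):
            sfn_client.send_task_failure(
                taskToken=job["task_token"],
                error="GlueJobFailed",
                cause=f"Job run {run_id} ended in state {state}",
            )
        else:
            continue  # Still running; check again on the next invocation.
        del active_jobs[run_id]
```

In a Lambda function, you might pass boto3.client("stepfunctions") and boto3.client("glue") as the two clients. Injecting them also makes the logic easy to exercise with stand-in objects.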

An important question may come up. Why does an ETL Runner run independently from your Step Functions state machine and poll for tasks? Can’t we instead directly invoke an AWS Lambda function from the Step Functions state machine? Then can’t we have that function start and monitor an ETL job until it completes?

The answer is that, at the time of writing, AWS Lambda functions have a maximum execution duration per request of 300 seconds, or 5 minutes. For more information, see AWS Lambda Limits. ETL jobs typically take more than 5 minutes to complete. If an ETL Runner function is invoked directly, it will likely time out before the ETL job completes. Thus, we follow the long-running worker approach with activity tasks. The worker in this code sample, the ETL Runner, is an AWS Lambda function that gets triggered on a schedule using CloudWatch Events. If you want to avoid managing the polling schedule through CloudWatch Events, you can implement a polling loop in your ETL workflow’s state machine. Check the AWS Big Data blog post Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy for an example.

Finally, let’s discuss how we fulfill the requirement of waiting for Sales and Marketing datasets to arrive in an S3 bucket at random times. We implement these waits as two separate activity tasks: Wait for Sales Data and Wait for Marketing Data. A state machine halts execution when it encounters either of these activity tasks. A CloudWatch Events event handler is then configured on an Amazon S3 bucket, so that when Sales or Marketing dataset files are uploaded to the bucket, Amazon S3 invokes an AWS Lambda function. The Lambda function then signals the waiting state machine to exit the activity task corresponding to the uploaded dataset. The subsequent ETL job is then invoked by the state machine.
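
A minimal sketch of such a signaling function is shown below. It assumes the function receives a standard Amazon S3 event notification and that the dataset can be recognized from a key prefix. The prefixes, account ID, and activity ARNs are placeholders, not values from the code sample.

```python
import json

# Hypothetical mapping from an uploaded file's key prefix to the ARN of the
# activity the state machine is waiting on; both values are placeholders.
WAIT_ACTIVITIES = {
    "sales/": "arn:aws:states:us-east-1:123456789012:activity:WaitForSalesData",
    "marketing/": "arn:aws:states:us-east-1:123456789012:activity:WaitForMarketingData",
}


def activity_for_key(object_key):
    """Return the wait activity ARN for an uploaded object, or None."""
    for prefix, activity_arn in WAIT_ACTIVITIES.items():
        if object_key.startswith(prefix):
            return activity_arn
    return None


def handle_upload(event, sfn_client):
    """Complete the wait task for each dataset named in an S3 notification."""
    released = []
    for record in event["Records"]:
        key = record["s3"]["object"]["key"]
        activity_arn = activity_for_key(key)
        if activity_arn is None:
            continue  # Not a dataset this workflow is waiting for.
        task = sfn_client.get_activity_task(activityArn=activity_arn)
        if task.get("taskToken"):
            sfn_client.send_task_success(
                taskToken=task["taskToken"], output=json.dumps({"key": key})
            )
            released.append(key)
    return released
```

If no execution is currently waiting on the activity, get_activity_task returns no task token and the upload is simply ignored by this sketch.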

Set up your own ETL orchestration

The aws-etl-orchestrator GitHub repository provides source code you can customize to set up the ETL orchestration architecture in your AWS account. The following steps show what you need to do to start orchestrating your ETL jobs using the architecture shown in this post:

  1. Model the ETL orchestration workflow in AWS Step Functions
  2. Build your ETL Runners (or use an existing AWS Glue ETL Runner)
  3. Customize AWS CloudFormation templates and create stacks
  4. Invoke the ETL orchestration state machine
  5. Upload sample Sales and Marketing datasets to Amazon S3

Model the ETL orchestration workflow in AWS Step Functions.  Use AWS Step Functions to model the ETL workflow described in this post as a state machine. A state machine in Step Functions consists of a set of states and the transitions between these states. A state machine is defined in Amazon States Language, which is a JSON-based notation. For a few examples of state machine definitions, see Sample Projects.

The following snapshot from the AWS Step Functions console shows our example ETL workflow modeled as a state machine. This workflow is what we provide you in the code sample.

When you start an execution of this state machine, it will branch to run two ETL jobs in parallel: Process Sales Data (PSD) and Process Marketing Data (PMD). But, according to the requirements, both ETL jobs should not start until their respective datasets are uploaded to Amazon S3. Hence, we implement Wait activity tasks before both PSD and PMD. When a dataset file is uploaded to Amazon S3, this triggers an AWS Lambda function that notifies the state machine to exit the Wait states. When both PMD and PSD jobs are successful, the JMSD job runs to produce the final dataset.
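
The shape of such a state machine can be sketched by building the Amazon States Language document as a Python dictionary. The state names, Region, and account ID below are assumptions for illustration; the definition shipped in the code sample may be organized differently.

```python
import json


def activity_task(activity_name, next_state=None):
    """Build a Task state that hands work to a Step Functions activity."""
    state = {
        "Type": "Task",
        # Placeholder Region and account ID; substitute your own activity ARN.
        "Resource": f"arn:aws:states:us-east-1:123456789012:activity:{activity_name}",
    }
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


def branch(wait_name, job_name):
    """One parallel branch: wait for a dataset, then process it."""
    return {
        "StartAt": wait_name,
        "States": {
            wait_name: activity_task(wait_name, next_state=job_name),
            job_name: activity_task(job_name),
        },
    }


definition = {
    "StartAt": "ProcessDatasets",
    "States": {
        "ProcessDatasets": {
            "Type": "Parallel",
            "Branches": [
                branch("WaitForSalesData", "ProcessSalesData"),
                branch("WaitForMarketingData", "ProcessMarketingData"),
            ],
            # The join runs only after every branch has finished.
            "Next": "JoinMarketingAndSalesData",
        },
        "JoinMarketingAndSalesData": activity_task("JoinMarketingAndSalesData"),
    },
}

print(json.dumps(definition, indent=2))
```

Adding another parallel ETL job then amounts to appending one more branch to the Parallel state.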

Finally, to have this ETL workflow execute once per week, you will need to configure a state machine execution to start once per week using a CloudWatch Event.

Build your ETL Runners (or use an existing AWS Glue ETL Runner). The code sample includes an AWS Glue ETL Runner. For simplicity, we implemented the ETL workflow using only AWS Glue jobs. However, nothing prevents you from using a different ETL technology to implement PMD or PSD jobs. You’ll need to build an ETL Runner for that technology, following the AWS Glue ETL Runner example.

Customize AWS CloudFormation templates and create stacks. The sample published in the aws-etl-orchestrator repository includes three separate AWS CloudFormation templates. We organized resources into three templates following AWS CloudFormation best practices. The three resource groups are logically distinct and likely to have separate lifecycles and ownerships. Each template has an associated AWS CloudFormation parameters file (“*-params.json” files). Parameters in those files must be customized. The details about the three AWS CloudFormation templates are as follows:

  1. A template responsible for setting up AWS Glue resources. For our example ETL workflow, the sample template creates three AWS Glue jobs: PSD, PMD, and JMSD. The scripts for these jobs are pulled by AWS CloudFormation from an Amazon S3 bucket that you own.
  2. A template where the AWS Step Functions state machine is defined. The state machine definition in Amazon States Language is embedded in a StateMachine resource within the Step Functions template.
  3. A template that sets up resources required by the ETL Runner for AWS Glue. The AWS Glue ETL Runner is a Python script that is written to be run as an AWS Lambda function.

Invoke the ETL orchestration state machine. Finally, it is time to start a new state machine execution in AWS Step Functions. For our ETL example, the AWS CloudFormation template creates a state machine named MarketingAndSalesETLOrchestrator. You can start an execution from the AWS Step Functions console, or through an AWS CLI command. When you start an execution, the state machine will immediately enter Wait for Data states, waiting for datasets to be uploaded to Amazon S3.
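
If you prefer to start the execution from code rather than the console or the AWS CLI, a sketch along these lines could work. The helper name and the "etl-" execution name prefix are assumptions; the client is expected to behave like a boto3 Step Functions client.

```python
import json
from datetime import datetime, timezone


def start_etl_execution(sfn_client, state_machine_arn, payload=None):
    """Start one execution with a timestamped name and return its ARN."""
    name = "etl-" + datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        name=name,
        input=json.dumps(payload or {}),
    )
    return response["executionArn"]
```

You would pass boto3.client("stepfunctions") and the ARN of the MarketingAndSalesETLOrchestrator state machine from your own account.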

Upload sample Sales and Marketing datasets to Amazon S3

Upload the provided datasets to the S3 bucket that you specified in the code sample configuration. The uploaded datasets signal the state machine to continue execution.

The state machine may take a while to complete execution. You can monitor progress in the AWS Step Functions console. If the execution is successful, the output shown in the following diagram appears.

Congratulations! You’ve orchestrated the example ETL workflow to a successful completion.

Handling failed ETL jobs

What if a job in the ETL workflow fails? In such a case, there are error-handling strategies available to the ETL workflow developer, from simply notifying an administrator, to fully undoing the effects of the previous jobs through compensating ETL jobs. Detecting and responding to a failed ETL job can be implemented using the AWS Step Functions’ Catch mechanism. For more information, see Handling Error Conditions Using a State Machine. In the sample state machine, errors are handled by a do-nothing Pass state.

Try it out. Stop any of the example ETL workflow’s jobs while executing through the AWS Glue console or the AWS CLI. You’ll notice the state machine transitioning to the ETL Job Failed Fallback state.
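
As a rough illustration of the Catch mechanism, the following sketch attaches a catch-all handler to a task state and routes errors to a do-nothing Pass state. The state names and the activity ARN are placeholders rather than the exact ones in the sample template.

```python
def with_catch(task_state, fallback_state):
    """Return a copy of a Task state that routes any error to a fallback."""
    guarded = dict(task_state)
    guarded["Catch"] = [{"ErrorEquals": ["States.ALL"], "Next": fallback_state}]
    return guarded


process_sales_data = {
    "Type": "Task",
    # Placeholder Region and account ID.
    "Resource": "arn:aws:states:us-east-1:123456789012:activity:ProcessSalesData",
    "End": True,
}

states = {
    "ProcessSalesData": with_catch(process_sales_data, "ETLJobFailedFallback"),
    # A do-nothing Pass state; replace it with a notification step or a
    # compensating ETL job if you need a stronger error-handling strategy.
    "ETLJobFailedFallback": {"Type": "Pass", "End": True},
}
```

States.ALL matches every error name, so a narrower ErrorEquals list is the place to start if different failures need different handling.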

Conclusion

In this post, I showed you how to implement your ETL logic as an orchestrated workflow. I presented a serverless solution for ETL orchestration that allows you to control ETL job execution using AWS Step Functions and AWS Lambda.  You can use the concepts and the code described in this post to build arbitrarily complex ETL state machines.

For more information and to download the source code, see the aws-etl-orchestrator GitHub repository. If you have questions about this post, send them our way in the Comments section below.

Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

About the Author

Moataz Anany is a senior solutions architect with AWS. He enjoys partnering with customers to help them leverage AWS and the cloud in creative ways. He dedicates most of his spare time to his wife and little ones. The rest is spent building and breaking things in side projects.

There are many potential benefits to using a blockchain. A blockchain is a distributed data structure that can record transactions in a verifiable and immutable manner. Depending upon the use case, there are opportunities for reducing costs, improving speed and efficiency, stronger regulatory compliance, and greater resilience and scalability.

Early adopters of the blockchain are finding innovative ways of using it in such areas as finance, healthcare, eGovernment, and non-profit organizations. The blockchain was even initially pioneered as the key technology behind the cryptocurrency Bitcoin.

Many of the opportunities to use blockchains arise from their design. They are typically large-scale distributed systems that often consist of many thousands of nodes. It can be challenging to gain insight into user activity, events, anomalies, and other state changes on a blockchain. But AWS analytics services provide the ability to analyze blockchain applications and provide meaningful information about these areas.

Walkthrough

In this post, we’ll show you how to:

You can readily adapt this Ethereum deployment and the blockchain analytics for use with a wide range of blockchain scenarios.

Prerequisites

This post assumes that you are familiar with AWS and Ethereum. The following documentation provides background reading to help you perform the steps described in this post:

Additionally, it’s useful to be familiar with Amazon Kinesis, AWS Lambda, Amazon QuickSight, and Amazon Athena to get the most out of this blog post. For more information, see:

For an introduction to serverless computing with AWS Lambda, see Introduction to AWS Lambda – Serverless Compute on Amazon Web Services.

Blockchain 101

Before we proceed with the solution in this post, we’ll provide a short discussion regarding blockchains and Ethereum, which is the blockchain implementation used in this solution.

In short, blockchains are a means for achieving consensus. The motivation behind blockchain was in allowing the Bitcoin network to agree upon the order of financial transactions while resisting vulnerability, malicious threats, and omission errors. Other blockchain implementations are used to agree upon the state of generic computation. This is achieved through a process called mining, whereby an arbitrary computational problem is solved to make falsifying transactions computationally challenging.

Ethereum is a major blockchain implementation. Unlike Bitcoin and other earlier blockchain systems, Ethereum is not solely a cryptocurrency platform, though it does have its own cryptocurrency called Ether. Ethereum extends the blockchain concept by building an Ethereum virtual machine (VM) that is Turing-complete on top of the blockchain. This allows for the development of smart contracts, which are programs that run on the blockchain. The appeal of smart contracts is the ability to translate natural language contracts, such as insurance contracts, into code that can run on Ethereum. This allows contractual agreements to be built without the need for a centralized authority, such as a bank or notary, potentially decreasing time to market and reducing costs.

An overview of the blockchain solution

The following is an overview of the solution provided in this post. The solution consists of:

  • An Ethereum blockchain running on Amazon Elastic Container Service (Amazon ECS) via the AWS Blockchain Template
  • An Application Load Balancer, providing access to the various Ethereum APIs.
  • A Lambda function, which deploys a smart contract to the blockchain
  • A Lambda function, which runs transactions against the smart contract
  • A Lambda function, which listens for events on the smart contract and pushes those events to Amazon Kinesis
  • An Amazon DynamoDB table used to share the blockchain state between Lambda functions
  • A blockchain analytics pipeline that uses Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, Amazon Kinesis Data Streams, and Amazon Athena.
  • An analytics dashboard built using Amazon QuickSight

The solution is presented in the following architectural diagram:

As shown, the solution consists of two main portions:

  • The blockchain hosted on Amazon Elastic Compute Cloud (Amazon EC2) and the Lambda functions that interact with the blockchain.
  • The analytics pipeline based around Kinesis that consumes data from the blockchain.

The AWS CloudFormation template we provide deploys the left side of that architecture diagram up to and including Kinesis Data Streams. It is the right side of the diagram that we’re going to build in this post.

Create the initial resources
  1. First, download the AWS CloudFormation template from: https://s3.amazonaws.com/blockchainblog/blockchainblogpost.template
  2. Use AWS CloudFormation to launch the template. The AWS CloudFormation stack deploys a virtual private cloud (VPC), two subnets, and a series of Lambda functions, which interact with the blockchain. This provides a foundation on which to build the analytics pipeline. You can either provide your own CIDR blocks or use the default parameters. Each subnet must have at least eight IP addresses.
  3. Deploy the AWS Blockchain Templates. The AWS Blockchain Templates make it efficient to deploy Ethereum and Hyperledger blockchain networks on AWS. In this case, we’re deploying an Ethereum network into the VPC created by the AWS CloudFormation template in step 2.
  4. Launch the following AWS CloudFormation template: https://aws-blockchain-templates-us-east-1.s3.us-east-1.amazonaws.com/ethereum/templates/latest/ethereum-network.template.yaml This template requires a number of parameters:
  • Set the Initial List of Accounts to the following predefined accounts the Lambda functions use:
0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3,0xB3Bbce5d76aF28EcE4318c28479565F802f96808,0x877108a8825222cf669Ca9bFA3397D6973fE1640,0xb272056E07C94C7E762F642685bE822df6d08D03,0x0c00e92343f7AA255e0BBC17b21a02f188b53D6C,0xaDf205a5fcb846C4f8D5e9f5228196e3c157e8E0,0x1373a92b9BEbBCda6B87a4B5F94137Bc64E47261,0x9038284431F878f17F4387943169d5263eA55650,0xe1cd3399F6b0A1Ef6ac8Cebe228D7609B601ca8a,0x0A67cCC3FD9d664D815D229CEA7EF215d4C00A0a
  • In VPC Network Configuration:
    • Set the VPC ID to the blockchainblog VPC created by the first AWS CloudFormation template.
    • Add the blockchainblog-public subnet to the list of subnets to use.
    • Add blockchainblog-public and blockchainblog-private to the list of ALB subnets.
  • In Security Configuration:
    • Choose your Amazon EC2 key pair.
    • Provide the blockchainblog security group.
    • Provide the blockchainblog-ec2-role for the Amazon EC2 role.
    • Provide the blockchainblog-ecs-role for the Amazon ECS role.
    • Set the ALB security group to the blockchainblog security group.
  5. Leave all other variables unchanged, create the stack, and wait for all resources to be deployed. This deploys an Ethereum blockchain, starts the mining process, and exposes the Web3 API through an Application Load Balancer.

After the resources are created, move on to deploying the smart contract.

Deploy a smart contract

To use the blockchain, deploy a smart contract to it. This smart contract is not complex — it provides the functions for holding an auction.

The auction contract represents a public auction, which is an auction whereby all parties involved can be identified. The user offering the item to be auctioned deploys the contract and other users can bid using the contract. The auction is considered completed after a pre-defined number of blocks have been mined. When the auction ends, losing bids can then be withdrawn and the funds returned to the bidders. Later, the user who created the auction can withdraw the funds of the winning bid.

Note that the contract does nothing to ensure that the winner receives the commodity in question. In fact, this contract is entirely separate from what is being auctioned. The contract could be extended to provide this functionality, but for the scope of this post, we’re keeping the contract simple.

The auction contract is located at https://s3.amazonaws.com/blockchainblog/Auction.sol.

Examine the auction contract

The auction contract is automatically pulled by our Lambda function and deployed to our blockchain. This contract is written in a domain-specific language called Solidity. The syntax is inspired by the C family of languages; however, unlike C it doesn’t compile to object code. Instead, it compiles to bytecode, which runs on the Ethereum VM.

This smart contract has two functions: bid and withdraw. Bid allows users to bid in the auction, and withdraw allows users to withdraw funds from the contract when the auction has finished. The auction owner can obtain the winning bid and the losers can recoup their funds. Note that the data structure BidEvent is similar to a C struct, and is how we’ll trigger Solidity events. The Solidity events are captured and sent to our analytics pipeline.
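
To make the auction rules easier to follow, here is a toy in-memory model in Python. It is not a translation of Auction.sol: the rule that a bid must beat the current highest bid, the method signatures, and the event field names are assumptions made for illustration.

```python
class Auction:
    """Toy in-memory model of the auction rules described in this post."""

    def __init__(self, owner, start_block, duration_blocks):
        self.owner = owner
        self.end_block = start_block + duration_blocks
        self.bids = {}  # bidder address -> total funds held for that bidder
        self.highest_bidder = None
        self.events = []  # stands in for emitted BidEvent records

    def bid(self, bidder, amount, current_block):
        if current_block >= self.end_block:
            raise ValueError("auction has ended")
        total = self.bids.get(bidder, 0) + amount
        # Assumption: a bid must beat the current highest bid.
        if self.highest_bidder is not None and total <= self.bids[self.highest_bidder]:
            raise ValueError("bid too low")
        self.bids[bidder] = total
        self.highest_bidder = bidder
        self.events.append({"Bidder": bidder, "Amount": amount, "Block": current_block})

    def withdraw(self, caller, current_block):
        if current_block < self.end_block:
            raise ValueError("auction still running")
        if caller == self.owner:
            # The owner collects the winning bid.
            return self.bids.pop(self.highest_bidder, 0)
        if caller == self.highest_bidder:
            raise ValueError("winning bid cannot be withdrawn")
        # Losing bidders recoup their funds.
        return self.bids.pop(caller, 0)
```

The events list plays the role of the Solidity events that the analytics pipeline later consumes.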

Now it’s time to deploy our smart contract, run transactions against it, and listen for events by using pre-built Lambda functions. The following diagram shows the interactions of these Lambda functions:

DeployContract is a Lambda function created by the AWS CloudFormation stack that we deployed earlier. This function takes our Solidity source code from the Amazon Simple Storage Service (Amazon S3) bucket, compiles it to EVM bytecode using the solc compiler, deploys that to our blockchain, and stores the blockchain address of the contract in a DynamoDB table. The function interacts with the Ethereum blockchain on our Amazon EC2 instance via the web3 1.0.0 API. You can see the source code for this function at https://s3.amazonaws.com/blockchainblog/DeployContract.zip.

After deploying the AWS CloudFormation template, wait about 5 minutes before deploying the contract to give the blockchain time to start the mining process. The majority of this time is the blockchain generating the initial directed acyclic graph (DAG).

DeployContract can be invoked in the Lambda console by testing it with an empty test event. Before invoking the function, provide it with the address of the blockchain. To do this, locate the output of the AWS Blockchain Template and obtain the EthJSONRPCURL value from the output. Then, provide this value in an environment variable named BLOCKCHAIN_HOST for the DeployContract function, as shown in the following example:

Now invoke the DeployContract function. It should print various states, including the blockchain address of the deployed contract and the JSON ABI of the contract. After the function completes, the contract is deployed to our private blockchain and available for use. If the function produces an error, it’s likely because the blockchain has not yet been initialized. Wait a few minutes after creating the AWS CloudFormation template before invoking DeployContract.

Execute Transactions

To generate some transaction data to analyze, we must first have some transactions. To get transactions, we are using a second Lambda function named ExecuteTransactions.

In the smart contract, an event is specified at the start of the file. Events are a useful mechanism in Solidity that can be used as a callback to code outside of the blockchain. The final Lambda function, ListenForTransactions, listens for events occurring against the contract and then sends those events to Kinesis for analysis.

Ethereum currently does not support sending events directly to Kinesis. So we’ll run the ListenForTransactions function to pull events from the blockchain. We can do this manually by invoking the function with an empty test event. ListenForTransactions pulls all events from the blockchain since the last time it was run. However, if we wanted transactions to be pulled from the blockchain in real time, we’d want the function running perpetually. The following section shows how you can optionally schedule the Lambda function to run at a regular interval. Once again, provide the address of the Ethereum RPC endpoint via the BLOCKCHAIN_HOST environment variable, as you did for DeployContract, for both ListenForTransactions and ExecuteTransactions.

Optional: Use an Amazon CloudWatch event to schedule ListenForTransactions

To have ListenForTransactions run continually, we’ll use Amazon CloudWatch Events as a trigger for our Lambda function. In the Lambda console, choose the Triggers tab, and add a new Amazon CloudWatch Events trigger with the schedule expression rate(5 minutes). This keeps the function running continually, so that events are sent to Kinesis soon after they occur. This allows us to do near real-time processing of events against our smart contract. However, if we want to reduce costs, or if real-time analytics isn’t a key objective, we could run our ListenForTransactions function less frequently. Each run fetches all events since the last time the function ran; however, this is less timely than having it wait for events to occur.

To configure a CloudWatch event to trigger ListenForTransactions:

  1. In the designer section on the Lambda console for ListenForTransactions, select CloudWatch events
  2. Click on configure and scroll down to the CloudWatch event configuration
  3. Select Create New Rule from the rule dropdown menu
  4. Name the rule and provide a description
  5. Select schedule expression
  6. Provide the expression: rate(5 minutes)
  7. Select enable trigger
  8. Click add
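
The same schedule can also be created programmatically. The sketch below assumes a client that behaves like a boto3 CloudWatch Events client; the rule name and target ID are hypothetical. Schedule expressions take a value and a unit, so the sketch uses rate(5 minutes).

```python
def schedule_listener(events_client, function_arn,
                      rule_name="ListenForTransactionsSchedule"):
    """Create an enabled schedule rule and point it at the Lambda function."""
    rule = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression="rate(5 minutes)",
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "ListenForTransactions", "Arn": function_arn}],
    )
    return rule["RuleArn"]
```

You would pass boto3.client("events") and the ARN of your ListenForTransactions function. When the rule is created this way instead of through the Lambda console, the function also needs a resource-based permission that allows CloudWatch Events to invoke it.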

After the function is scheduled, we can then generate some events against our contract. We can run ExecuteTransactions, with an empty test event. We can do this any number of times to generate more transactions to send to our analytics pipeline. ExecuteTransactions produces batches of 10 transactions at a time.

Analyze Transactions with Kinesis Data Analytics

Because our Lambda function is listening to events on our smart contract, all bidding activity is sent to a Kinesis data stream called BlockchainBlogEvents, which was already created by the AWS CloudFormation template.

Right now, all events go to Kinesis but no further. We’ll persist our events for analysis with Athena later on. To do so, navigate to the Kinesis Data Streams console and choose the BlockchainBlog stream that was created for you.

  1. In the upper right-hand corner, choose Connect to Firehose. This forwards all events to a Kinesis Data Firehose stream, which delivers them to an S3 bucket.
  2. Name the delivery stream, choose Next, and don’t enable record transformation.
  3. Provide an S3 bucket in which to store your results. Remember the bucket name so you can use it later with Athena.

All events coming from the blockchain should now be persisted to Amazon S3.

Now that our events are being persisted, we’ll use Kinesis Data Analytics to perform a series of real-time analytics on the Kinesis Data Stream. Later, we’ll perform batch analytics on the data stored in Amazon S3 via Athena.

First, look at Kinesis Data Analytics. Our ListenForTransactions Lambda function sends a message to a stream each time a transaction is run against our Auction smart contract.

The message is a JSON object. It contains the address of the bidder who initiated the transaction, how much they bid, the contract they bid on, when the transaction was run, and which block the transaction was added to.
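
For illustration, a message of this kind could be built and sent as follows. "Bidder", "Amount", and "Block" match the column names used in the SQL in this post, while "Contract" and "Timestamp" are assumed names. The client is expected to behave like a boto3 Kinesis client.

```python
import json


def build_bid_message(bidder, amount, contract, timestamp, block):
    """Serialize one bid event as a JSON document."""
    return json.dumps(
        {
            "Bidder": bidder,
            "Amount": amount,
            "Contract": contract,    # assumed field name
            "Timestamp": timestamp,  # assumed field name
            "Block": block,
        }
    )


def send_bid_event(kinesis_client, stream_name, message):
    """Put one message on the stream, partitioned by bidder address."""
    return kinesis_client.put_record(
        StreamName=stream_name,
        Data=message.encode("utf-8"),
        PartitionKey=json.loads(message)["Bidder"],
    )
```

Partitioning by bidder address is a design choice of this sketch; any key that spreads records evenly across shards would do.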

Kinesis Data Analytics processes each incoming message to the stream and lets us perform analysis over the stream. In this example, we use Kinesis Data Analytics to:

  1. Calculate the amount of Ether being bid in each block within a tumbling window of 15 seconds.
  2. Detect the number of unique bidders occurring within a tumbling window of 15 seconds.

Our window is 15 seconds because this is the Ethereum target block time, which is the measure of how long it takes to add a block to the blockchain. By setting the window to 15 seconds, we can gain insight into transactions occurring within the mining interval. We could also set the window to be longer to learn how it pertains to our auction application.

To start with our real time analytics, we must create a Kinesis data analytics application. To do so:

  1. Navigate to the Kinesis data analytics application console on the AWS Management Console.
  2. Create a new Kinesis data analytics application with appropriate name and description, then specify the pre-made blockchainblog Kinesis Data Stream as the source.
  3. Run ExecuteTransactions to send a set of transactions to Kinesis and automatically discover the schema.
  4. Open the SQL editor for the application.

Next, we’re going to add SQL to our Kinesis data analytics application to find out the amount of Ether being sent in each block. This includes all bids sent to the contract and all funds withdrawn from a completed auction.

Copy the following SQL, paste it into the SQL editor in Kinesis Data Analytics, then execute it.

-- In-application stream that receives the per-block totals.
CREATE OR REPLACE STREAM "SPEND_PER_BLOCK_STREAM" (block INTEGER, spend INTEGER);

-- Pump that sums "Amount" per "Block" over 15-second windows.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "SPEND_PER_BLOCK_STREAM"
SELECT STREAM "Block", SUM("Amount") AS block_sum
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "Block", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '15' SECOND);

This simple piece of SQL provides some insight into our smart contract. The output of SPEND_PER_BLOCK_STREAM yields the block number and the volume of funds, from our contract, in that block. This output explains how much cryptocurrency is spent in relation to our smart contract and when it’s spent.
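
To see what this aggregation computes, here is a small Python emulation of the same grouping on a handful of made-up events. The "rowtime" values are seconds and purely illustrative; in Kinesis Data Analytics the ROWTIME column is a timestamp assigned by the service.

```python
from collections import defaultdict

WINDOW_SECONDS = 15


def spend_per_block(events):
    """Sum Amount per (window start, Block), mirroring the GROUP BY above."""
    totals = defaultdict(int)
    for event in events:
        # Round the row time down to the start of its 15-second window.
        window_start = event["rowtime"] - event["rowtime"] % WINDOW_SECONDS
        totals[(window_start, event["Block"])] += event["Amount"]
    return dict(totals)


# Hypothetical events for illustration only.
events = [
    {"rowtime": 0, "Block": 100, "Amount": 5},
    {"rowtime": 7, "Block": 100, "Amount": 3},
    {"rowtime": 14, "Block": 101, "Amount": 2},
    {"rowtime": 16, "Block": 101, "Amount": 4},
]
print(spend_per_block(events))
```

Block 101 appears in two windows here, so it produces two output rows, one per window.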

Make sure that there is data for the Kinesis data analytics application to process by running the ExecuteTransactions and ListenForTransactions functions. You can run these functions either with an Amazon CloudWatch event or manually.

Now, we’ll modify our application to detect the number of unique bidders placing bids within a 15-second window. This is about the time required to add a block to the blockchain. To do so, add the following code to our Kinesis data analytics application:

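-- In-application stream that receives the distinct-bidder counts.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    NUMBER_OF_DISTINCT_ITEMS BIGINT
);

-- A distinct pump name keeps this pump from replacing "STREAM_PUMP" above.
CREATE OR REPLACE PUMP "DISTINCT_BIDDERS_PUMP" AS
   INSERT INTO "DESTINATION_SQL_STREAM"
      SELECT STREAM *
      FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
          CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            'Bidder',  -- column to count distinct values of
            15         -- tumbling window size in seconds
      )
);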

The resulting output of this code is the count of unique bidders occurring within the 15-second window. This is useful in helping us understand who is running transactions against our contract. For example, it shows whether a large number of blockchain addresses are responsible for the bids or only a small number of addresses are bidding.
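
As a point of comparison, an exact version of this count can be emulated in a few lines of Python. The "rowtime" field (in seconds) is hypothetical. Note that COUNT_DISTINCT_ITEMS_TUMBLING is documented as returning an approximate count, so on large streams its output may differ slightly from an exact count like this one.

```python
from collections import defaultdict


def distinct_bidders_per_window(events, window_seconds=15):
    """Count unique Bidder values in each tumbling window of row times."""
    bidders = defaultdict(set)
    for event in events:
        window_start = event["rowtime"] - event["rowtime"] % window_seconds
        bidders[window_start].add(event["Bidder"])
    return {start: len(seen) for start, seen in sorted(bidders.items())}
```

A window in which one address places many bids still counts as a single bidder, which is what distinguishes this metric from a plain transaction count.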

Finally, as denoted in our architectural diagram, we can add a destination stream to our Kinesis data analytics application. We’ll send the output of our application to Kinesis Data Firehose to persist the results. Then we’ll enable the resulting data to be used in batch analytics with Athena or other tools. To send the output, create a destination for the analytics output stream and point it at a Kinesis Data Firehose stream.

This section has shown real-time analytics that can be run on blockchain transactions. The next section shows how to use Athena to run batch analytics against our stored transactions.

Analyze Transactions with Athena

In this section, we’ll create a table in Athena so we can query our transaction data in batch form.

  1. Create a database in Athena and then create a table, specifying the location that you provided earlier to Kinesis Data Firehose. Your configuration should look like the following example:

  2. Choose Next, choose JSON as the input format, then choose Next.
  3. In Columns, provide the data types for each of our columns. Choose..

Last year, we released Amazon Connect, a cloud-based contact center service that enables any business to deliver better customer service at low cost. This service is built on the same technology that empowers Amazon customer service associates. Using this system, associates have millions of conversations with customers when they inquire about their shipping or order information. Because we made it available as an AWS service, you can now enable your contact center agents to make or receive calls in a matter of minutes. You can do this without having to provision any kind of hardware.

There are several advantages of building your contact center in the AWS Cloud, as described in our documentation. In addition, customers can extend Amazon Connect capabilities by using AWS products and the breadth of AWS services. In this blog post, we focus on how to get analytics out of the rich set of data published by Amazon Connect. We make use of an Amazon Connect data stream and create an end-to-end workflow to offer an analytical solution that can be customized based on need.

Solution overview

The following diagram illustrates the solution.

In this solution, Amazon Connect exports its contact trace records (CTRs) using Amazon Kinesis. CTRs are data streams in JSON format, and each has information about individual contacts. For example, this information might include the start and end time of a call, which agent handled the call, which queue the user chose, queue wait times, number of holds, and so on. You can enable this feature by reviewing our documentation.

In this architecture, we use Kinesis Firehose to capture Amazon Connect CTRs as raw data in an Amazon S3 bucket. We don’t use the recent feature added by Kinesis Firehose to save the data in S3 as Apache Parquet format. We use AWS Glue functionality to automatically detect the schema on the fly from an Amazon Connect data stream.

The primary reason for this approach is that it allows us to use attributes and enables an Amazon Connect administrator to dynamically add more fields as needed. Also, converting the data to Parquet in batches (every couple of hours) can achieve higher compression. However, if your requirement is to ingest the data in Parquet format in real time, we recommend using the recently launched Kinesis Firehose feature. You can review this blog post for further information.

By default, Firehose puts these records in time-series format. To make it easy for AWS Glue crawlers to capture information from new records, we use AWS Lambda to move all new records to a single S3 prefix called flatfiles. Our Lambda function is configured using S3 event notification. To comply with AWS Glue and Athena best practices, the Lambda function also converts all column names to lowercase. Finally, we also use the Lambda function to start AWS Glue crawlers. AWS Glue crawlers identify the data schema and update the AWS Glue Data Catalog, which is used by extract, transform, load (ETL) jobs in AWS Glue in the latter half of the workflow.

You can see our approach in the following Lambda code.

import json
import re
import urllib.parse

import boto3

s3 = boto3.resource('s3')
client = boto3.client('s3')


def convertColumntoLowwerCaps(obj):
    # Lowercase every key and strip non-word characters, following
    # AWS Glue and Athena naming best practices.
    for key in list(obj.keys()):  # copy the keys: the dict changes in the loop
        new_key = re.sub(r'[\W]+', '', key.lower())
        v = obj[key]
        if isinstance(v, dict):
            if len(v) > 0:
                convertColumntoLowwerCaps(v)
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    # Object keys arrive URL-encoded in S3 event notifications.
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    try:
        client.download_file(bucket, key, '/tmp/file.json')
        with open('/tmp/out.json', 'w') as output, open('/tmp/file.json', 'r') as source:
            i = 0
            for line in source:
                # Records are written back to back; put each one on its own line.
                for chunk in line.replace("}{", "}\n{").split("\n"):
                    if not chunk.strip():
                        continue  # skip empty pieces, such as a trailing newline
                    record = json.loads(chunk, object_hook=convertColumntoLowwerCaps)
                    if i != 0:
                        output.write("\n")
                    output.write(json.dumps(record))
                    i += 1
        # Flatten the time-series prefix into one object name under flatfiles/.
        newkey = 'flatfiles/' + key.replace("/", "")
        client.upload_file('/tmp/out.json', bucket, newkey)
        s3.Object(bucket, key).delete()
        return "success"
    except Exception as e:
        print(e)
        print('Error copying object {} from bucket {}'.format(key, bucket))
        raise e
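To see what the record splitting and key normalization do to a payload, here is a small self-contained sketch that can be run locally without AWS. The helper names and the sample record are hypothetical. It reproduces only the string handling, not the S3 download and upload.

```python
import json
import re


def normalize_keys(obj):
    """Lowercase keys and strip non-word characters.

    json.loads calls object_hook on nested objects first, so inner
    dictionaries are already normalized when the outer one is processed.
    """
    return {re.sub(r"\W+", "", key.lower()): value for key, value in obj.items()}


def split_records(raw):
    """Split back-to-back JSON objects ("}{") into normalized records."""
    chunks = raw.replace("}{", "}\n{").split("\n")
    return [json.loads(chunk, object_hook=normalize_keys) for chunk in chunks if chunk.strip()]


# Two hypothetical records written without a separator between them.
raw = '{"ContactId": "a1", "Agent": {"Username": "jdoe"}}{"ContactId": "b2", "Agent": null}'
for record in split_records(raw):
    print(json.dumps(record))
```

Note that splitting on "}{" is a simple heuristic: it would also split inside a string value that happens to contain those two characters.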

We trigger AWS Glue crawlers based on events because this approach lets us capture any new fields that appear in the data, which is dynamic in nature. CTR attributes are designed to offer multiple custom options based on a particular call flow. Attributes are essentially key-value pairs in nested JSON format. With the help of event-based AWS Glue crawlers, you can easily identify newer attributes automatically.
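The crawler start itself is not shown in the Lambda code above. A minimal sketch of how it could look with boto3 follows, assuming a crawler already exists. The crawler name is hypothetical, and the function takes the client as a parameter so that it can be tried without AWS credentials.

```python
def start_crawler_if_idle(glue_client, crawler_name):
    """Start the named crawler; return False if it is already running.

    glue_client is expected to be boto3.client("glue"). A second S3 event
    arriving during a crawl raises CrawlerRunningException, which is
    treated here as "nothing to do".
    """
    try:
        glue_client.start_crawler(Name=crawler_name)
        return True
    except glue_client.exceptions.CrawlerRunningException:
        return False


# Usage inside the Lambda handler (the crawler name is an assumption):
#   import boto3
#   start_crawler_if_idle(boto3.client("glue"), "connect-ctr-crawler")
```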

We recommend setting up an S3 lifecycle policy on the flatfiles folder that keeps records only for 24 hours. Doing this optimizes AWS Glue ETL jobs to process a subset of files rather than the entire set of records.
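One way to express that lifecycle policy with boto3 is sketched below. The rule ID is a made-up label, and the prefix assumes that the folder is named flatfiles/ as in this post. S3 expiration is configured in whole days, so one day is the closest match to 24 hours.

```python
def flatfiles_lifecycle(prefix="flatfiles/", days=1):
    """Build a lifecycle configuration that expires objects under prefix."""
    return {
        "Rules": [
            {
                "ID": "expire-flatfiles",  # hypothetical rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


# Applying it (the bucket name is a placeholder):
#   import boto3
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket="<<your-bucket>>",
#       LifecycleConfiguration=flatfiles_lifecycle(),
#   )
print(flatfiles_lifecycle())
```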

After we have data in the flatfiles folder, we use AWS Glue to catalog the data and transform it into Parquet format inside a folder called parquet/ctr/. The AWS Glue job performs the ETL that transforms the data from JSON to Parquet format. We use AWS Glue crawlers to capture any new fields inside the JSON records, because the schema is dynamic. This means that when you add new attributes to an Amazon Connect instance, the solution automatically recognizes them and incorporates them in the schema of the results.

After AWS Glue stores the results in Parquet format, you can perform analytics using Amazon Redshift Spectrum, Amazon Athena, or any third-party data warehouse platform. To keep this solution simple, we have used Amazon Athena for analytics. Amazon Athena allows us to query data without having to set up and manage any servers or data warehouse platforms. Additionally, we only pay for the queries that are executed.

Try it out!

You can get started with our sample AWS CloudFormation template. This template creates the components starting from the Kinesis stream and finishes up with S3 buckets, the AWS Glue job, and crawlers. To deploy the template, open the AWS Management Console by clicking the following link.

In the console, specify the following parameters:

  • BucketName: The name for the bucket to store all the solution files. This name must be unique; if it’s not, template creation fails.
  • etlJobSchedule: The schedule in cron format indicating how often the AWS Glue job runs. The default value is every hour.
  • KinesisStreamName: The name of the Kinesis stream to receive data from Amazon Connect. This name must be different from any other Kinesis stream created in your AWS account.
  • s3interval: The interval in seconds for Kinesis Firehose to save data inside the flatfiles folder on S3. The value must be between 60 and 900 seconds.
  • sampledata: When this parameter is set to true, sample CTR records are used. Doing this lets you try this solution without setting up an Amazon Connect instance. All examples in this walkthrough use this sample data.

Select the “I acknowledge that AWS CloudFormation might create IAM resources.” check box, and then choose Create. After the template finishes creating resources, you can see the stream name on the stack Outputs tab.

If you haven’t created your Amazon Connect instance, you can do so by following the Getting Started Guide. When you are done creating, choose your Amazon Connect instance in the console, which takes you to instance settings. Choose Data streaming to enable streaming for CTR records. Here, you can choose the Kinesis stream (defined in the KinesisStreamName parameter) that was created by the CloudFormation template.

Now it’s time to generate the data by making or receiving calls by using Amazon Connect. You can go to Amazon Connect Cloud Control Panel (CCP) to make or receive calls using a software phone or desktop phone. After a few minutes, we should see data inside the flatfiles folder. To make it easier to try this solution, we provide sample data that you can enable by setting the sampledata parameter to true in your CloudFormation template.

In the AWS Glue console, choose Jobs in the left navigation pane and select the job. In my case, the job created by CloudFormation is called glueJob-i3TULzVtP1W0; yours should be similar. Run the job by choosing Run job for Action.

After that, we wait for the AWS Glue job to run and to finish successfully. We can track the status of the job by checking the History tab.

When the job finishes running, we can check the Database section. There should be a new table created called ctr in Parquet format.

To query the data with Athena, we can select the ctr table, and for Action choose View data.

Doing this takes us to the Athena console. If you run a query, Athena shows a preview of the data.

When we can query the data using Athena, we can visualize it using Amazon QuickSight. Before connecting Amazon QuickSight to Athena, we must make sure to grant Amazon QuickSight access to Athena and the associated S3 buckets in the account. For more information on doing this, see Managing Amazon QuickSight Permissions to AWS Resources in the Amazon QuickSight User Guide. We can then create a new data set in Amazon QuickSight based on the Athena table that was created.

After setting up permissions, we can create a new analysis in Amazon QuickSight by choosing New analysis.

Then we add a new data set.

We choose Athena as the source and give the data source a name (in this case, I named it connectctr).

Choose the name of the database and the table referencing the Parquet results.

Then choose Visualize.

After that, we should see the following screen.

Now we can create some visualizations. First, search for the agent.username column, and drag it to the AutoGraph section.

We can see the agents and the number of calls for each, so we can easily see which agents have taken the largest number of calls. If we want to see which queues the calls came from for each agent, we can add the queue.arn column to the visual.
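The same per-agent count can be cross-checked directly in Athena. The sketch below builds the query and the request parameters for the boto3 Athena client. The database name and result location are placeholders, and it assumes that the ctr table exposes agent as a struct with a username field, as the column name above suggests.

```python
def calls_per_agent_query(table="ctr"):
    """Return SQL that counts contact records per agent user name."""
    return (
        "SELECT agent.username AS agent, COUNT(*) AS calls "
        f"FROM {table} "
        "GROUP BY agent.username "
        "ORDER BY calls DESC"
    )


def athena_request(database, output_location, table="ctr"):
    """Build keyword arguments for Athena's start_query_execution call."""
    return {
        "QueryString": calls_per_agent_query(table),
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


# Running it (both values are placeholders):
#   import boto3
#   boto3.client("athena").start_query_execution(
#       **athena_request("<<your-database>>", "s3://<<your-bucket>>/athena-results/"))
print(calls_per_agent_query())
```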

After following all these steps, you can use Amazon QuickSight to add different columns from the call records and perform different types of visualizations. You can build dashboards that continuously monitor your Amazon Connect instance. You can share those dashboards with others in your organization who might need to see this data.

Conclusion

In this post, you see how you can use services like AWS Lambda, AWS Glue, and Amazon Athena to process Amazon Connect call records. The post also demonstrates how to use AWS Lambda to preprocess files in Amazon S3 and transform them into a format that is recognized by AWS Glue crawlers. Finally, the post shows how to use Amazon QuickSight to perform visualizations.

You can use the provided template to analyze your own contact center instance. Or you can take the CloudFormation template and modify it to process other data streams that can be ingested using Amazon Kinesis or stored on Amazon S3.

Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift and Visualize AWS CloudTrail Logs Using AWS Glue and Amazon QuickSight.

About the Authors

Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.

Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting a variety of customer workloads.


The adoption of Apache Spark has increased significantly over the past few years, and running Spark-based application pipelines is the new normal. Spark jobs that are in an ETL (extract, transform, and load) pipeline have different requirements—you must handle dependencies in the jobs, maintain order during executions, and run multiple jobs in parallel. In most of these cases, you can use workflow scheduler tools like Apache Oozie, Apache Airflow, and even Cron to fulfill these requirements.

Apache Oozie is a widely used workflow scheduler system for Hadoop-based jobs. However, its limited UI capabilities, lack of integration with other services, and heavy XML dependency might not be suitable for some users. On the other hand, Apache Airflow comes with a lot of neat features, along with powerful UI and monitoring capabilities and integration with several AWS and third-party services. However, with Airflow, you do need to provision and manage the Airflow server. The Cron utility is a powerful job scheduler. But it doesn’t give you much visibility into the job details, and creating a workflow using Cron jobs can be challenging.

What if you have a simple use case, in which you want to run a few Spark jobs in a specific order, but you don’t want to spend time orchestrating those jobs or maintaining a separate application? You can do that today in a serverless fashion using AWS Step Functions. You can create the entire workflow in AWS Step Functions and interact with Spark on Amazon EMR through Apache Livy.

In this post, I walk you through a list of steps to orchestrate a serverless Spark-based ETL pipeline using AWS Step Functions and Apache Livy.

Input data

For the source data for this post, I use the New York City Taxi and Limousine Commission (TLC) trip record data. For a description of the data, see this detailed dictionary of the taxi data. In this example, we’ll work mainly with the following three columns for the Spark jobs.

Column name | Column description
RateCodeID | Represents the rate code in effect at the end of the trip (for example, 1 for standard rate, 2 for JFK airport, 3 for Newark airport, and so on).
FareAmount | Represents the time-and-distance fare calculated by the meter.
TripDistance | Represents the elapsed trip distance in miles reported by the taxi meter.

The trip data is in comma-separated values (CSV) format with the first row as a header. To shorten the Spark execution time, I trimmed the large input data to only 20,000 rows. During the deployment phase, the input file tripdata.csv is stored in Amazon S3 in the <<your-bucket>>/emr-step-functions/input/ folder.

The following image shows a sample of the trip data:

Solution overview

The next few sections describe how Spark jobs are created for this solution, how you can interact with Spark using Apache Livy, and how you can use AWS Step Functions to create orchestrations for these Spark applications.

At a high level, the solution includes the following steps:

  1. Trigger the AWS Step Functions state machine by passing the input file path.
  2. The first stage in the state machine triggers an AWS Lambda function.
  3. The Lambda function interacts with Apache Spark running on Amazon EMR using Apache Livy, and submits a Spark job.
  4. The state machine waits a few seconds before checking the Spark job status.
  5. Based on the job status, the state machine moves to the success or failure state.
  6. Subsequent Spark jobs are submitted using the same approach.
  7. The state machine waits a few seconds for the job to finish.
  8. The job finishes, and the state machine updates with its final status.
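The steps above amount to a submit, wait, check, and branch loop. The following plain-Python sketch simulates that control flow locally. It is not how Step Functions executes a state machine. The function names are hypothetical, and the "success" and "dead" status strings are the ones the state machine in this post compares against.

```python
import time


def run_job(submit, get_status, wait_seconds=10, sleep=time.sleep):
    """Submit one job, then poll until it reports success or dead."""
    job_id = submit()
    while True:
        sleep(wait_seconds)          # Wait state
        status = get_status(job_id)  # Task state that queries the job status
        if status == "success":      # Choice state
            return "succeeded"
        if status == "dead":
            return "failed"
        # Any other status: loop back and wait again.


def run_pipeline(jobs, **kwargs):
    """Run (name, submit, get_status) triples in order; stop at the first failure."""
    for name, submit, get_status in jobs:
        if run_job(submit, get_status, **kwargs) == "failed":
            return f"{name} job failed"
    return "pipeline finished"
```

Passing sleep as a parameter keeps the sketch easy to try with canned statuses, without actually waiting between checks.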

Let’s take a look at the Spark application that is used for this solution.

Spark jobs

For this example, I built a Spark jar named spark-taxi.jar. It has two different Spark applications:

  1. MilesPerRateCode – The first job that runs on the Amazon EMR cluster. This job reads the trip data from an input source and computes the total trip distance for each rate code. The output of this job consists of two columns and is stored in Apache Parquet format in the output path.

The following are the expected output columns:

  • rate_code – Represents the rate code for the trip.
  • total_distance – Represents the total trip distance for that rate code (that is, sum(trip_distance)).
  2. RateCodeStatus – The second job that runs on the EMR cluster, but only if the first job finishes successfully. This job depends on two different input sets:
  • tripdata.csv – The same trip data that is used for the first Spark job.
  • miles-per-rate – The output of the first job.

This job first reads the tripdata.csv file and aggregates the fare_amount by the rate_code. After this point, you have two different datasets, both aggregated by rate_code. Finally, the job uses the rate_code field to join the two datasets and writes the entire rate code status to a single CSV file.

The output columns are as follows:

  • rate_code_id – Represents the rate code type.
  • total_distance – Derived from first Spark job and represents the total trip distance.
  • total_fare_amount – A new field that is generated during the second Spark application, representing the total fare amount by the rate code type.

Note that in this case, you don’t need to run two different Spark jobs to generate that output. The goal of setting up the jobs in this way is just to create a dependency between the two jobs and use them within AWS Step Functions.
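The jar's source is not shown in this post, so the following is only a plain-Python illustration of the two aggregations and the join described above, run on a few made-up rows. The real jobs use Spark and read from and write to Amazon S3.

```python
from collections import defaultdict


def total_by_rate_code(rows, value_column):
    """Sum value_column for each rate_code."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["rate_code"]] += row[value_column]
    return dict(totals)


def rate_code_status(rows):
    """Join total distance and total fare on the rate code."""
    distance = total_by_rate_code(rows, "trip_distance")  # what the first job computes
    fare = total_by_rate_code(rows, "fare_amount")        # what the second job adds
    return [
        {
            "rate_code_id": code,
            "total_distance": distance[code],
            "total_fare_amount": fare[code],
        }
        for code in sorted(distance.keys() & fare.keys())
    ]


# Hypothetical sample rows, not taken from the TLC dataset.
trips = [
    {"rate_code": 1, "trip_distance": 2.0, "fare_amount": 9.5},
    {"rate_code": 1, "trip_distance": 1.0, "fare_amount": 6.0},
    {"rate_code": 2, "trip_distance": 17.5, "fare_amount": 52.0},
]
for row in rate_code_status(trips):
    print(row)
```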

Both Spark applications take one input argument called rootPath. It’s the S3 location where the Spark job is stored along with input and output data. Here is a sample of the final output:

The next section discusses how you can use Apache Livy to interact with Spark applications that are running on Amazon EMR.

Using Apache Livy to interact with Apache Spark

Apache Livy provides a REST interface to interact with Spark running on an EMR cluster. Livy is included in Amazon EMR release version 5.9.0 and later. In this post, I use Livy to submit Spark jobs and retrieve job status. When Amazon EMR is launched with Livy installed, the EMR master node becomes the endpoint for Livy, and it starts listening on port 8998 by default. Livy provides APIs to interact with Spark.

Let’s look at a couple of examples of how you can interact with Spark running on Amazon EMR using Livy.

To list the active Livy sessions, you can execute the following from the EMR master node:

curl localhost:8998/sessions

If you want to do the same from a remote instance, just change localhost to the EMR hostname, as in the following (port 8998 must be open to that remote instance through the security group):

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/sessions

To submit a Spark job through Livy pointing to the application jar in S3, you can do the following:

curl -X POST --data '{"file": "s3://<<bucket-location>>/spark.jar", "className": "com.example.SparkApp"}' -H "Content-Type: application/json" http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches

Spark submit through Livy returns a session ID that starts from 0. Using that session ID, you can retrieve the status of the job as shown in the following example:

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches/0 | python -m json.tool

Through Spark submit, you can pass multiple arguments for the Spark job and Spark configuration settings. You can also do that using Livy, by passing the S3 path through the args parameter, as shown in the following example:

curl -X POST --data '{"file": "s3://<<bucket-location>>/spark.jar", "className": "com.example.SparkApp", "args": ["s3://bucket-path"]}' -H "Content-Type: application/json" http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches

All Apache Livy REST calls return a response as JSON, as shown in the following image:

If you want to pretty-print that JSON response, you can pipe the command’s output to Python’s JSON tool as follows:

curl http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:8998/batches/3 | python -m json.tool

For a detailed list of Livy APIs, see the Apache Livy REST API page. This post uses POST /batches to submit a job and GET /batches/{batchId} to check its status.
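The same two calls can be made from Python with only the standard library. This is a minimal sketch, assuming that Livy is reachable at the URL below (for example, when run on the EMR master node). The helper names are hypothetical, and building the request is kept separate from sending it so that the payload can be inspected first.

```python
import json
import urllib.request

LIVY_URL = "http://localhost:8998"  # assumption: run on the EMR master node


def batch_request(jar_path, class_name, args=(), livy_url=LIVY_URL):
    """Build the POST /batches request that submits a Spark job."""
    payload = {"file": jar_path, "className": class_name, "args": list(args)}
    return urllib.request.Request(
        livy_url + "/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send a request to Livy and decode the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def batch_state(batch_id, livy_url=LIVY_URL):
    """Return the "state" field from GET /batches/{batchId}."""
    return send(urllib.request.Request(f"{livy_url}/batches/{batch_id}")).get("state")


# Usage (needs a reachable Livy endpoint):
#   batch = send(batch_request("s3://<<bucket-location>>/spark.jar",
#                              "com.example.SparkApp", ["s3://bucket-path"]))
#   print(batch_state(batch["id"]))
```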

In the next section, you create a state machine and orchestrate Spark applications using AWS Step Functions.

Using AWS Step Functions to create a Spark job workflow

AWS Step Functions automatically triggers and tracks each step and retries when it encounters errors, so your application executes in order and as expected every time. To create a Spark job workflow using AWS Step Functions, you first create a state machine that uses different types of states to build the entire workflow.

First, you use the Task state—a simple state in AWS Step Functions that performs a single unit of work. You also use the Wait state to delay the state machine from continuing for a specified time. Later, you use the Choice state to add branching logic to a state machine.

The following is a quick summary of how to use different states in the state machine to create the Spark ETL pipeline:

  • Task state – Invokes a Lambda function. The first Task state submits the Spark job on Amazon EMR, and the next Task state is used to retrieve the previous Spark job status.
  • Wait state – Pauses the state machine for a fixed interval before the job status is checked again, giving the job time to complete.
  • Choice state – Each Spark job execution can return a failure, an error, or a success state. So, in the state machine, you use the Choice state to create a rule that specifies the next action or step based on the success or failure of the previous step.

Here is one of my Task states, MilesPerRate Job, which simply submits a Spark job:

"MilesPerRate Job": {
      "Type": "Task",
      "Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:blog-miles-per-rate-job-submit-function",
      "ResultPath": "$.jobId",
      "Next": "Wait for MilesPerRate job to complete"
 }

This Task state configuration specifies the Lambda function to execute. Inside the Lambda function, it submits a Spark job through Livy using Livy’s POST API. Using ResultPath, it tells the state machine where to place the result of the executing task. As discussed in the previous section, Spark submit returns the session ID, which is captured with $.jobId and used in a later state. The Next field tells the state machine which state to go to next.

The following code section shows the Lambda function, which is used to submit the MilesPerRateCode job. It uses the Python requests library to submit a POST against the Livy endpoint hosted on Amazon EMR and passes the required parameters in JSON format through payload. It then parses the response, grabs id from the response, and returns it.

from botocore.vendored import requests
import json

def lambda_handler(event, context):
  headers = { "content-type": "application/json" }
  url = 'http://xxxxxx.compute-1.amazonaws.com:8998/batches'
  payload = {
    'file' : 's3://<<s3-location>>/emr-step-functions/spark-taxi.jar',
    'className' : 'com.example.MilesPerRateCode',
    'args' : [event.get('rootPath')]
  }
  res = requests.post(url, data = json.dumps(payload), headers = headers, verify = False)
  json_data = json.loads(res.text)
  return json_data.get('id')

Just like in the MilesPerRate job, another state submits the RateCodeStatus job, but it executes only when all previous jobs have completed successfully.

Here is the Task state in the state machine that checks the Spark job status:

"Query RateCodeStatus job status": {
   "Type": "Task",
   "Resource": "arn:aws:lambda:us-east-1:xxxxx:function:blog-spark-job-status-function",
   "Next": "RateCodeStatus job complete?",
   "ResultPath": "$.jobStatus"
}

Just like other states, the preceding Task executes a Lambda function, captures the result (represented by jobStatus), and passes it to the next state. The following is the Lambda function that checks the Spark job status based on a given session ID:

from botocore.vendored import requests
import json

def lambda_handler(event, context):
    jobid = event.get('jobId')
    url     = 'http://xxxx.compute-1.amazonaws.com:8998/batches/' + str(jobid)
    res = requests.get(url)
    json_data = json.loads(res.text)
    return json_data.get('state')

In the Choice state, it checks the Spark job status value, compares it with a predefined state status, and transitions the state based on the result. For example, if the status is success, it moves to the next state (RateCodeStatus Job), and if it is dead, it moves to the MilesPerRate job failed state. Any other status sends it back to the Wait state.

"MilesPerRate job complete?": {
   "Type": "Choice",
   "Choices": [{
       "Variable": "$.jobStatus",
       "StringEquals": "success",
       "Next": "RateCodeStatus Job"
   },{
       "Variable": "$.jobStatus",
       "StringEquals": "dead",
       "Next": "MilesPerRate job failed"
   }],
   "Default": "Wait for MilesPerRate job to complete"
}
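The Wait and failure states that this Choice state points to are not shown in this post. As a hedged sketch, they could be generated as follows. The 10-second delay, the Cause text, and the state name "Query MilesPerRate job status" are assumptions, the last one modeled on the RateCodeStatus equivalent shown earlier.

```python
import json


def wait_state(next_state, seconds=10):
    """A Wait state that pauses for a fixed number of seconds."""
    return {"Type": "Wait", "Seconds": seconds, "Next": next_state}


def fail_state(cause):
    """A terminal Fail state that ends the execution with a Failed status."""
    return {"Type": "Fail", "Cause": cause}


states = {
    "Wait for MilesPerRate job to complete": wait_state("Query MilesPerRate job status"),
    "MilesPerRate job failed": fail_state("The MilesPerRate Spark job ended in the dead state"),
}
print(json.dumps(states, indent=2))
```

The printed JSON can be merged into the States section of a state machine definition next to the Task and Choice states.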

Walkthrough using AWS CloudFormation

To set up this entire solution, you need to create a few AWS resources. To make it easier, I have created an AWS CloudFormation template. This template creates all the required AWS resources and configures all the resources that are needed to create a Spark-based ETL pipeline on AWS Step Functions.

This CloudFormation template requires you to pass the following four parameters during initiation.

Parameter | Description
ClusterSubnetID | The subnet where the Amazon EMR cluster is deployed. Lambda is configured to talk to this subnet.
KeyName | The name of the existing EC2 key pair to access the Amazon EMR cluster.
VPCID | The ID of the virtual private cloud (VPC) where the EMR cluster is deployed. Lambda is configured to talk to this VPC.
S3RootPath | The Amazon S3 path where all required files (input file, Spark job, and so on) are stored and the resulting data is written.

IMPORTANT: These templates are designed only to show how you can create a Spark-based ETL pipeline on AWS Step Functions using Apache Livy. They are not intended for production use without modification. And if you try this solution outside of the us-east-1 Region, download the necessary files from s3://aws-data-analytics-blog/emr-step-functions, upload the files to the buckets in your Region, edit the script as appropriate, and then run it.

To launch the CloudFormation stack, choose Launch Stack:

Launching this stack creates the following list of AWS resources.

Logical ID | Resource Type | Description
StepFunctionsStateExecutionRole | IAM role | IAM role to execute the state machine and have a trust relationship with the states service.
SparkETLStateMachine | AWS Step Functions state machine | State machine in AWS Step Functions for the Spark ETL workflow.
LambdaSecurityGroup | Amazon EC2 security group | Security group that is used for the Lambda function to call the Livy API.
RateCodeStatusJobSubmitFunction | AWS Lambda function | Lambda function to submit the RateCodeStatus job.
MilesPerRateJobSubmitFunction | AWS Lambda function | Lambda function to submit the MilesPerRate job.
SparkJobStatusFunction | AWS Lambda function | Lambda function to check the Spark job status.
LambdaStateMachineRole | IAM role | IAM role for all Lambda functions to use the lambda trust relationship.
EMRCluster | Amazon EMR cluster | EMR cluster where Livy is running and where the job is placed.

During the deployment phase, the AWS CloudFormation template sets up S3 paths for input and output. Input files are stored in the <<s3-root-path>>/emr-step-functions/input/ path, whereas spark-taxi.jar is copied under <<s3-root-path>>/emr-step-functions/.

The following screenshot shows how the S3 paths are configured after deployment. In this example, I passed a bucket that I created in the AWS account s3://tm-app-demos for the S3 root path.

If the CloudFormation template completed successfully, you will see Spark-ETL-State-Machine in the AWS Step Functions dashboard, as follows:

Choose the Spark-ETL-State-Machine state machine to take a look at this implementation. The AWS CloudFormation template built the entire state machine along with its dependent Lambda functions, which are now ready to be executed.

On the dashboard, choose the newly created state machine, and then choose New execution to initiate the state machine. It asks you to pass input in JSON format. This input goes to the first state MilesPerRate Job, which eventually executes the Lambda function blog-miles-per-rate-job-submit-function.

Pass the S3 root path as input:

{
  "rootPath": "s3://tm-app-demos"
}

Then choose Start Execution:

The rootPath value is the same value that was passed when creating the CloudFormation stack. It can be an S3 bucket location or a bucket with prefixes, but it should be the same value that is used for AWS CloudFormation. This value tells the state machine where it can find the Spark jar and input file, and where it will write output files. After the state machine starts, each state/task is executed based on its definition in the state machine.
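If you prefer to start the execution programmatically rather than from the console, a sketch with boto3 could look like the following. The state machine ARN is a placeholder that you would copy from the console, and the helper name is hypothetical.

```python
import json


def execution_request(state_machine_arn, root_path, name=None):
    """Build keyword arguments for the Step Functions start_execution call."""
    request = {
        "stateMachineArn": state_machine_arn,
        # The execution input is passed as a JSON string, not as a dict.
        "input": json.dumps({"rootPath": root_path}),
    }
    if name:
        request["name"] = name  # optional execution name
    return request


# Usage (the ARN is a placeholder):
#   import boto3
#   boto3.client("stepfunctions").start_execution(
#       **execution_request("<<state-machine-arn>>", "s3://tm-app-demos"))
print(execution_request("<<state-machine-arn>>", "s3://tm-app-demos"))
```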

At a high level, the following represents the flow of events:

  1. Execute the first Spark job, MilesPerRate.
  2. The Spark job reads the input file from the location <<rootPath>>/emr-step-functions/input/tripdata.csv. If the job finishes successfully, it writes the output data to <<rootPath>>/emr-step-functions/miles-per-rate.
  3. If the Spark job fails, it transitions to the error state MilesPerRate job failed, and the state machine stops. If the Spark job finishes successfully, it transitions to the RateCodeStatus Job state, and the second Spark job is executed.
  4. If the second Spark job fails, it transitions to the error state RateCodeStatus job failed, and the state machine stops with the Failed status.
  5. If this Spark job completes successfully, it writes the final output data to <<rootPath>>/emr-step-functions/rate-code-status/. It also transitions to the RateCodeStatus job finished state, and the state machine ends its execution with the Success status.

The following screenshot shows a successfully completed Spark ETL state machine:

The right side of the state machine diagram shows the details of individual states with their input and output.

When you execute the state machine for the second time, it fails because the S3 path already exists. The state machine turns red and stops at MilesPerRate job failed. The following image represents that failed execution of the state machine:

You can also..


Regardless of your career path, there’s no denying that attending industry events can provide helpful career development opportunities — not only for improving and expanding your skill sets, but for networking as well. According to this article from PayScale.com, experts estimate that somewhere between 70-85% of new positions are landed through networking.

Narrowing our focus to networking opportunities with cloud computing professionals who’re working on tackling some of today’s most innovative and exciting big data solutions, attending big data-focused sessions at an AWS Global Summit is a great place to start.

AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. As the name suggests, these summits are held in major cities around the world, and attract technologists from all industries and skill levels who’re interested in hearing from AWS leaders, experts, partners, and customers.

In addition to networking opportunities with top cloud technology providers, consultants and your peers in our Partner and Solutions Expo, you’ll also hone your AWS skills by attending and participating in a multitude of education and training opportunities.

Here’s a brief sampling of some of the upcoming sessions relevant to big data professionals:

May 31st: Big Data Architectural Patterns and Best Practices on AWS | AWS Summit – Mexico City

June 6th-7th: Various (click on the “Big Data & Analytics” header) | AWS Summit – Berlin

June 20th-21st: BigData@Scale | Public Sector Summit – Washington DC

June 21st: Enabling Self Service for Data Scientists with AWS Service Catalog | AWS Summit – Sao Paulo

Be sure to check out the main page for AWS Global Summits, where you can see which cities have AWS Summits planned for 2018, register to attend an upcoming event, or provide your information to be notified when registration opens for a future event.


Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:

The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that many times, incoming events contain all or some of the expected fields based on which values the producers are advertising. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.
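To make the idea of a predefined schema concrete, here is a minimal Python sketch of fitting an incoming event into a fixed set of columns. It is not how Kinesis Data Firehose is implemented. The function name is hypothetical, the column names mirror the table defined in this step, and the lowercasing of top-level keys and the serialization of nested objects to JSON strings are assumptions made for illustration.

```python
import json
from typing import Any, Dict, Optional

# Column names mirror the table created in this step.
SCHEMA_COLUMNS = [
    "awsaccountid",
    "agentarn",
    "currentagentsnapshot",
    "eventid",
    "eventtimestamp",
    "eventtype",
    "instancearn",
    "previousagentsnapshot",
    "version",
]


def apply_schema(event: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Fit an event into the fixed columns; missing fields become None."""
    # Assumption: match top-level keys to columns case-insensitively.
    lowered = {key.lower(): value for key, value in event.items()}
    row: Dict[str, Optional[str]] = {}
    for column in SCHEMA_COLUMNS:
        value = lowered.get(column)
        if value is None:
            row[column] = None
        elif isinstance(value, str):
            row[column] = value
        else:
            # Nested structures are kept as JSON text in a string column.
            row[column] = json.dumps(value)
    return row


if __name__ == "__main__":
    sample = {"EventType": "LOGIN", "CurrentAgentSnapshot": {"AgentStatus": {"Name": "Available"}}}
    print(apply_schema(sample))
```

Every output row has the same nine columns regardless of which fields the producer sent, which is the consistency that a batch ETL job would otherwise provide.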

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.
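If you prefer scripting over the console, the same step could look roughly like the following Python sketch using boto3. The stream names are hypothetical placeholders, and the sketch assumes that AWS credentials and a default Region are already configured.

```python
from typing import Dict, List, Union


def build_stream_requests(
    stream_names: List[str], shard_count: int = 1
) -> List[Dict[str, Union[str, int]]]:
    """Build one CreateStream request per stream, each with the same shard count."""
    return [{"StreamName": name, "ShardCount": shard_count} for name in stream_names]


def create_streams(requests: List[Dict[str, Union[str, int]]]) -> None:
    """Send the requests to Kinesis Data Streams (requires AWS credentials)."""
    import boto3  # imported here so that building requests needs no AWS SDK

    kinesis = boto3.client("kinesis")
    for request in requests:
        kinesis.create_stream(**request)


if __name__ == "__main__":
    # Hypothetical stream names; substitute your own.
    requests = build_stream_requests(["connect-events-1", "connect-events-2"])
    print(requests)
    # create_streams(requests)  # uncomment to create the streams in your account
```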

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash (“/”) to the prefix so that Data Firehose creates the date partitions inside that prefix.
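The effect of the trailing slash can be illustrated with a short Python sketch. It assumes the default Kinesis Data Firehose behavior of appending a UTC time prefix in the form YYYY/MM/DD/HH/ directly to the configured prefix. The function name is hypothetical.

```python
from datetime import datetime, timezone


def object_key_prefix(prefix: str, arrival: datetime) -> str:
    """Append a UTC YYYY/MM/DD/HH/ time prefix directly to the configured prefix.

    `arrival` must be a timezone-aware datetime.
    """
    utc_time = arrival.astimezone(timezone.utc)
    return prefix + utc_time.strftime("%Y/%m/%d/%H/")


if __name__ == "__main__":
    arrival = datetime(2018, 5, 31, 14, 5, tzinfo=timezone.utc)
    # With the trailing slash, the date folders sit inside the prefix.
    print(object_key_prefix("kfhconnectblog/", arrival))
    # Without it, the year is glued onto the prefix name.
    print(object_key_prefix("kfhconnectblog", arrival))
```

Without the slash, the objects land under a prefix such as kfhconnectblog2018/..., which no longer matches the LOCATION of the table.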

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can change the compression algorithm by using the Kinesis Data Firehose API to update the OutputFormatConfiguration.
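As a rough sketch of such an update with boto3, the following Python builds an OutputFormatConfiguration for Parquet and passes it to update_destination. The helper names are hypothetical, and the sketch assumes the delivery stream has a single extended S3 destination. Whether a partial DataFormatConversionConfiguration is accepted is also an assumption. You might need to resend the schema and input format settings as well, so check the API reference before relying on this.

```python
from typing import Any, Dict

# Compression values accepted by the Parquet serializer.
PARQUET_COMPRESSIONS = {"UNCOMPRESSED", "GZIP", "SNAPPY"}


def build_parquet_output_format(compression: str) -> Dict[str, Any]:
    """Build an OutputFormatConfiguration that selects Parquet compression."""
    if compression not in PARQUET_COMPRESSIONS:
        raise ValueError(f"unsupported Parquet compression: {compression}")
    return {"Serializer": {"ParquetSerDe": {"Compression": compression}}}


def update_compression(stream_name: str, compression: str) -> None:
    """Apply the new compression setting (requires AWS credentials)."""
    import boto3  # imported here so that building the config needs no AWS SDK

    firehose = boto3.client("firehose")
    description = firehose.describe_delivery_stream(
        DeliveryStreamName=stream_name
    )["DeliveryStreamDescription"]
    firehose.update_destination(
        DeliveryStreamName=stream_name,
        CurrentDeliveryStreamVersionId=description["VersionId"],
        # Assumption: the stream has exactly one destination.
        DestinationId=description["Destinations"][0]["DestinationId"],
        ExtendedS3DestinationUpdate={
            "DataFormatConversionConfiguration": {
                "OutputFormatConfiguration": build_parquet_output_format(compression),
            }
        },
    )


if __name__ == "__main__":
    print(build_parquet_output_format("GZIP"))
```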

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as strings even though the documentation describes them as complex structures. The reason for doing that was simply to show Athena’s flexibility in parsing JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.
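To show what pulling a scalar value out of a JSON string column involves, here is a minimal Python approximation of that kind of lookup. It is not Athena's implementation: the function name and the list-based path are hypothetical, and the sample snapshot is made-up data shaped like the lowercase paths used in this post.

```python
import json
from typing import Optional, Sequence, Union


def extract_scalar(
    json_text: Optional[str], path: Sequence[Union[str, int]]
) -> Optional[str]:
    """Walk `path` through the parsed JSON; return a scalar as text, else None."""
    if not json_text:
        return None
    try:
        node = json.loads(json_text)
    except json.JSONDecodeError:
        return None
    for step in path:
        if isinstance(step, int):
            # Integer steps index into arrays.
            if not isinstance(node, list) or not 0 <= step < len(node):
                return None
            node = node[step]
        else:
            # String steps look up object keys.
            if not isinstance(node, dict) or step not in node:
                return None
            node = node[step]
    if node is None or isinstance(node, (dict, list)):
        return None  # objects and arrays are not scalars
    if isinstance(node, bool):
        return "true" if node else "false"
    return str(node)


if __name__ == "__main__":
    snapshot = '{"agentstatus": {"name": "Available"}, "configuration": {"username": "agent.one"}}'
    print(extract_scalar(snapshot, ["agentstatus", "name"]))
    print(extract_scalar(snapshot, ["configuration"]))
```

Asking for a path that ends on an object or array yields None, which is why the queries address leaf fields such as the status name or the user name.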

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
        '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions that each agent engaged in. It tells us whether the calls were incoming or outgoing, whether they were completed, and whether any were missed or failed.

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC
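The pairing done by CROSS JOIN UNNEST (c, p) together with the final WHERE clause can be approximated in Python as follows. This sketch assumes that unnesting two arrays pairs their elements by position and pads the shorter array with nulls. The function name and the sample contacts are hypothetical.

```python
from itertools import zip_longest
from typing import Any, Dict, List, Optional, Tuple

Contact = Dict[str, Any]


def pair_contacts(
    current: Optional[List[Contact]], previous: Optional[List[Contact]]
) -> List[Tuple[Contact, Contact]]:
    """Pair contacts by position and keep pairs that share a contact ID."""
    # Elements are paired by position; the shorter list is padded with None.
    pairs = zip_longest(current or [], previous or [], fillvalue=None)
    matched = []
    for c_item, p_item in pairs:
        if c_item is None or p_item is None:
            continue  # a padded side can never match
        contact_id = c_item.get("contactid")
        # Like SQL equality, a missing ID on either side does not match.
        if contact_id is not None and contact_id == p_item.get("contactid"):
            matched.append((c_item, p_item))
    return matched


if __name__ == "__main__":
    current = [{"contactid": "a", "state": "CONNECTED"}, {"contactid": "b", "state": "MISSED"}]
    previous = [{"contactid": "a", "state": "CONNECTING"}]
    for c_item, p_item in pair_contacts(current, previous):
        print(c_item["contactid"], p_item["state"], "->", c_item["state"])
```

Because the pairing is positional, a contact only appears in the result when it occupies the same index in both snapshots.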

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest data and convert it to a columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This feature enables the level of cost and performance optimization that you need when storing and analyzing large amounts of data. It is equally important if you are investing in building data lakes on AWS.

About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.
