
3M Health Information Systems (HIS), a business of 3M Health Care, works with providers, payers, and government agencies to anticipate and navigate a changing healthcare landscape. 3M provides healthcare performance measurement and management solutions, analysis and strategic services that help clients move from volume to value-based health care, resulting in millions of dollars in savings, improved provider performance and higher quality care. 3M’s innovative software is designed to raise the bar for computer-assisted coding, clinical documentation improvement, performance monitoring, quality outcomes reporting, and terminology management.

There was an ongoing initiative at 3M HIS to migrate applications installed on premises or with other cloud hosting providers to Amazon Web Services (AWS). 3M HIS began the migration to AWS to take advantage of compute, storage, and network elasticity. We wanted to build on a solid foundation that would help us focus more of our efforts on delivering customer value while also scaling to support the business growth we expected over the next several years. 3M HIS was already processing complex healthcare data for many customers, and that data requires many transformations to get it into a format useful for analytics or machine learning.

After reviewing many solutions, 3M HIS chose Amazon Redshift as the appropriate data warehouse solution. We concluded that Amazon Redshift met our needs: a fast, fully managed, petabyte-scale data warehouse that uses columnar storage to minimize I/O, provides high data compression rates, and offers fast performance. We quickly spun up a cluster in our development environment, built out the dimensional model, loaded data, and made it available for benchmarking and testing with user data. An extract, transform, load (ETL) tool was used to process and load the data from various sources into Amazon Redshift.

3M legacy implementation

3M HIS processes a large volume of data through this data warehouse. We ingest healthcare data from clients, representing millions of procedure codes, diagnosis codes, and all the associated metadata for each of those codes. The legacy process loaded this data into the data warehouse once every two weeks.

For reporting, we published 25 static reports and 6 static HTML reports for over 1000 customers every week. To provide business intelligence reports, we built analytical cubes on a legacy relational database and provided reports from those cubes.

With the ever-increasing amount of data to be processed, meeting the SLAs was a challenge. It was time to replace our SQL database with a modern architecture and tooling that would automatically scale based on the data to be processed and remain performant.

How 3M modernized the data warehouse with Amazon Redshift

When choosing a new solution, first we had to ensure that we could load data in near real time. Second, the solution had to be scalable, because the amount of data stored in the data warehouse would be 10x larger than in the existing solution. Third, the solution needed to serve massive reporting queries in a reasonable amount of time without impacting the ETL processing that would be running 24/7. Last, we needed a cost-effective data warehouse that could integrate with the other analytics services that were part of the overall solution.

We evaluated data warehouses such as Amazon Redshift and Snowflake. We chose Amazon Redshift because it fulfilled the preceding criteria and aligned with our preference for native AWS managed services, and because it could keep pace with the growth of the business in an economically sustainable way.

To build the reporting tool, we migrated our multi-terabyte data warehouse to Amazon Redshift. The data was processed through the ETL workflow into an Amazon S3 bucket and then bulk-copied into Amazon Redshift. The GitHub repository provided by AWS, a collection of scripts and utilities, helped us set up the cluster and get the best performance possible from Amazon Redshift.

Top lessons that we learned during the implementation

During the initial development, we encountered challenges loading data into the Amazon Redshift tables because we were trying to load it from an Amazon RDS staging instance into Amazon Redshift. After some research, we identified that a bulk load from an Amazon S3 bucket was the best practice for getting large amounts of data into Amazon Redshift tables.
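
As an illustration, a bulk load from Amazon S3 uses the Amazon Redshift COPY command. The table name, bucket, and IAM role below are placeholders rather than the ones we used:

COPY dim_diagnosis_code
FROM 's3://example-staging-bucket/dim_diagnosis_code/'
IAM_ROLE 'arn:aws:iam::123456789012:role/ExampleRedshiftCopyRole'
FORMAT AS CSV
GZIP;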

The second challenge was that the Amazon Redshift VACUUM and ANALYZE operations were holding up our ETL pipeline, because these operations had been baked into the ETL process. We performed frequent data loads into the Amazon Redshift tables and many DELETE operations as part of the ETL processing. Together, these meant that VACUUM and ANALYZE had to run frequently, locking the tables for the operations' duration and conflicting with the ETL process. Triggering the maintenance only after all the loads were complete eliminated the performance issues we were encountering. VACUUM and ANALYZE have recently been automated in Amazon Redshift, which we expect to prevent such issues from arising in the future.
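
For example, once the day's loads and DELETE operations have finished, the maintenance can run as its own step. The table name here is illustrative:

VACUUM fact_procedure_codes;
ANALYZE fact_procedure_codes;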

The final challenge was finding a way to reproduce the windowing functions that previously resided in the Analysis Services cube layer, whose role Amazon Redshift now fulfilled. Fortunately, most of the windowing functions that we need are built into Amazon Redshift, which made it straightforward to port the existing functionality and produce the same results.
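
For instance, a ranking that previously lived in the cube layer maps directly onto a window function in Amazon Redshift. The table and column names here are made up for illustration:

SELECT provider_id,
       discharge_month,
       case_count,
       RANK() OVER (PARTITION BY discharge_month ORDER BY case_count DESC) AS provider_rank
FROM monthly_provider_volume;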

During the port, we used Amazon Redshift’s comprehensive best practices guide and tuning techniques. We found these helped us set up the Amazon Redshift cluster for optimal performance.

Flow diagram of the new implementation

Benefits of the updated implementation

With the legacy solution, our implementation had grown complex, and we found it difficult to support the growing volume of new data we needed to incorporate into the database and then report on in near real time. Reports executing on the data were slowly drifting away from the initial SLA requirements. With Amazon Redshift, we're working to solve those problems with less manual maintenance and care and feeding of the solution. First, it has the potential to let us store a larger quantity of data for a longer time. Second, adding nodes to the cluster when needed is simple, and we can do it in minutes with the Elastic Resize feature; likewise, we can scale back nodes when cost sensitivity is an issue. Third, Amazon Redshift gives better support than our previous solution for computing analytics over large sets of grouped data. Often, we want to look at how recent data compares to historical data. In some cases, we want more than a year or two of historical data to rule out seasonality, and we have found that Amazon Redshift is a more scalable solution for this type of operation.

Conclusion

At 3M HIS, we're transforming health care from a system that treats disease to one that improves health and wellness with accurate health and clinical information from the start. 3M's nearly 40 years of clinical knowledge, from data management and nosology to reimbursement and risk adjustment, opens the door for our provider and payer clients to find innovative solutions that improve outcomes across the continuum of care. We help our clients ensure accurate and compliant reimbursement, and we leverage 3M's analytical capabilities, powered by AWS, to improve health system and health plan performance while lowering costs.

About the Author

Dhanraj Shriyan is an Enterprise Data Architect at 3M Health Information Systems with a Masters in Predictive Analytics from Northwestern University, Chicago. He loves helping customers explore their data, providing valuable insights, and implementing scalable solutions using the right database technology for their needs. He has several years of experience building large-scale data warehouse and business intelligence solutions in the cloud as well as on premises. He is currently exploring graph technologies and AWS Lake Formation.


Businesses worldwide are discovering the power of new big data processing and analytics frameworks like Apache Hadoop and Apache Spark, but they are also discovering some of the challenges of operating these technologies in on-premises data lake environments. They may also have concerns about the future of their current distribution vendor.

To address this, we've introduced the Amazon EMR Migration Guide (first published in June 2019). This comprehensive guide offers sound technical advice to help customers plan their move from on-premises big data deployments to Amazon EMR.

Common problems of on-premises big data environments include a lack of agility, excessive costs, and administrative headaches, as IT organizations wrestle with the effort of provisioning resources, handling uneven workloads at large scale, and keeping up with the pace of rapidly changing, community-driven, open-source software innovation. Many big data initiatives suffer from the delay and burden of evaluating, selecting, purchasing, receiving, deploying, integrating, provisioning, patching, maintaining, upgrading, and supporting the underlying hardware and software infrastructure.

A subtler, if equally critical, problem is the way companies' data center deployments of Apache Hadoop and Apache Spark directly tie together the compute and storage resources in the same servers, creating an inflexible model where they must scale in lock step. This means that almost any on-premises environment pays for high amounts of under-used disk capacity, processing power, or system memory, as each workload has different requirements for these components. Typical workloads run on different types of clusters, at differing frequencies and times of day. These big data workloads should be freed to run whenever and however is most efficient, while still accessing the same shared underlying storage or data lake. See Figure 1 below for an illustration.

How can smart businesses find success with their big data initiatives? Migrating big data (and machine learning) workloads to the cloud offers many advantages. Cloud infrastructure service providers such as AWS offer a broad choice of on-demand and elastic compute resources, resilient and inexpensive persistent storage, and managed services that provide up-to-date, familiar environments in which to develop and operate big data applications. Data engineers, developers, data scientists, and IT personnel can focus their efforts on preparing data and extracting valuable insights.

Services like Amazon EMR, AWS Glue, and Amazon S3 enable you to decouple and scale your compute and storage independently, while providing an integrated, well-managed, highly resilient environment, immediately reducing many of the problems of on-premises approaches. This approach leads to faster, more agile, easier-to-use, and more cost-efficient big data and data lake initiatives.

However, the conventional wisdom of traditional on-premises Apache Hadoop and Apache Spark isn’t always the best strategy in cloud-based deployments. A simple lift and shift approach to running cluster nodes in the cloud is conceptually easy but suboptimal in practice. Different design decisions go a long way towards maximizing your gains as you migrate big data to a cloud architecture.

This guide provides the best practices for:

  • Migrating data, applications, and catalogs
  • Using persistent and transient resources
  • Configuring security policies, access controls, and audit logs
  • Estimating and minimizing costs, while maximizing value
  • Leveraging the AWS Cloud for high availability (HA) and disaster recovery (DR)
  • Automating common administrative tasks

Although not intended as a replacement for professional services, this guide covers a wide range of common questions and scenarios as you migrate your big data and data lake initiatives to the cloud.

When starting your journey to migrate your big data platform to the cloud, you must first decide how to approach the migration. One approach is to re-architect your platform to maximize the benefits of the cloud. Another approach, known as lift and shift, is to take your existing architecture and complete a straight migration to the cloud. A final option is a hybrid approach that blends lift and shift with re-architecture. The decision is not straightforward, as there are advantages and disadvantages to each approach.

A lift and shift approach is usually simpler, with less ambiguity and risk. It is also a better fit when you are working against tight deadlines, such as when your data center lease is expiring. However, the disadvantage of a lift and shift is that it is not always the most cost-effective option, and the existing architecture may not readily map to a solution in the cloud.

A re-architecture unlocks many advantages, including optimization of costs and efficiencies. With re-architecture, you move to the latest and greatest software, have better integration with native cloud tools, and lower operational burden by leveraging native cloud products and services.

This paper discusses the advantages and disadvantages of each migration approach from the perspective of the Apache Spark and Hadoop ecosystems. To read the paper, download the Amazon EMR Migration Guide now.

For a more general resource on deciding which approach is ideal for your workflow, see An E-Book of Cloud Best Practices for Your Enterprise, which outlines the best practices for performing migrations to the cloud at a higher level.

About the Author

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.


Today, we are pleased to announce a set of updates to Amazon QuickSight:

  • Richer dashboards with multiple sheets in your regular and embedded dashboards
  • Multiple axis label orientation options for better readability of dashboards
  • More calculations such as standard deviation, variance and conditional string functions on SPICE
  • Enhanced URL actions for supporting a broader set of interaction scenarios
  • One-click duplication of visuals for faster authoring

Multiple sheets in Dashboards

First, let’s look at the update to dashboards that allows multiple sheets, accessed via a new tab control. This allows better organization of information within the dashboard, where visuals related to specific subject areas or topics can be organized in separate sheets and distinctly identified through the tab name. As a reader, you can then navigate to a single dashboard to get a comprehensive view of all the insights related to a topic.

Each sheet within a dashboard has a distinct URL, which allows you to bookmark the specific sheet or share a link to it with others. Additionally, dashboards can be set up to filter content at the dashboard level or at a per-sheet level, with the filters being passed either via on-screen controls or through the URL.

Filtering through on-screen controls

Navigating to another dashboard and passing filters via URL
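
As a sketch, a parameter value can be appended to a dashboard URL as a fragment; the Region, dashboard ID, and parameter name below are placeholders:

https://us-east-1.quicksight.aws.amazon.com/sn/dashboards/<DASHBOARD_ID>#p.region=EMEA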

If you are authoring a dashboard, you will find that every dashboard now starts off as a single sheet. You can use the “add sheet” icon in the tabs section to add a new sheet to your dashboard. Note that the tab control only shows up on a dashboard if there is more than one sheet. When authoring, you will also be able to rename a sheet, change the order of sheets or delete a sheet. The first sheet will always be the one presented to your readers by default, so this would be where you want to provide a summary of the dashboard with KPIs, ML-Insights, or high level trends. Email reports will send the first sheet of the dashboard to all your viewers; we are working on additional rendering options for email reports.

Previously, on the Big Data Blog, we covered ways to add parameters and on-screen controls to dashboards. Parameter values are global across a dashboard, which allows you to maintain user inputs and filter content according to user selections across the dashboard. On-screen controls are specific to each sheet, since the context of these controls is often confined to the sheet itself. If you want to discreetly filter data based on user- or group-level attributes, you can also use row-level security to do so.

Multi-sheet dashboards can be shared through the regular website, or can be embedded in your custom application. To embed a dashboard in your application, visit Embed interactive dashboards in your application with Amazon QuickSight.

Axis label orientation

When authoring a dashboard, you will see that Amazon QuickSight now automatically selects a horizontal or angled layout to provide better readability. You can also manually navigate to a visual’s options menu and choose “Format visual”, and then choose the axis label orientation that is most suited to your dashboard.

Duplicate visual option

When authoring a dashboard, there are often situations where you want to quickly clone a visual and make changes. With the new “Duplicate visual” option, Amazon QuickSight allows you to achieve this via a one-click action from the visual’s options menu. This new option creates a copy of the visual, along with its filters and other settings such as visual size and format, within the same sheet or a different sheet within the dashboard, thus removing the overhead of creating visuals from scratch each time.

URL action open options

URL actions allow you to configure actions outside of QuickSight that can be triggered by a user viewing a QuickSight dashboard. With today's release, you can choose how these URLs are opened: replacing the viewer's current QuickSight browser tab, loading in a new tab, or opening a new browser window altogether.

In addition to these features that improve author and reader experience, QuickSight now supports additional statistical capabilities through new functions in calculated fields. These statistical functions include standard deviation and variance calculations and conditional functions on string fields. With standard deviation and variance, you can now determine dispersion of a metric relative to the mean. You can do this across your entire data set or partition by a select dimension. For more information, see Functions by Category in the Amazon QuickSight User Guide.

With conditional string functions on SPICE, you can now use coalesce, ifelse, isNotNull, isNull, and nullif while building reports on SPICE datasets. For more information, see Calculated Field Function and Operator Reference for Amazon QuickSight in the Amazon QuickSight User Guide.
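
As a quick sketch, with made-up field names, these functions appear in calculated field expressions such as the following:

stdevp({sales})
coalesce({preferred_name}, {legal_name})
ifelse(isNull({coupon_code}), 'NONE', {coupon_code})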

All of these new features are now available in both Standard and Enterprise Editions in all supported Amazon QuickSight regions – US East (N. Virginia and Ohio), US West (Oregon), EU (Ireland), and Asia Pacific (Singapore, Sydney and Tokyo).

About the Author

Sahitya Pandiri is a technical program manager with Amazon Web Services. Sahitya has been in product and program management for five years and has built multiple products in the retail, healthcare, and analytics spaces. She enjoys problem solving and leveraging technology to simplify processes.


For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge. Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis. Amazon S3 is a natural landing spot for data of all types. However, the way data is stored in Amazon S3 can make a significant difference in the efficiency and cost of downstream data processing. Specifically, Apache Spark can be over-burdened with file operations if it is processing a large number of small files rather than fewer, larger files. Each file has its own overhead of a few milliseconds for opening, reading metadata, and closing, and across a large number of files this overhead adds up to slow processing. This blog post shows how to use Amazon Kinesis Data Firehose to merge many small messages into larger messages for delivery to Amazon S3, which results in faster processing with Amazon EMR running Spark.

Like Amazon Kinesis Data Streams, Kinesis Data Firehose accepts a maximum incoming message size of 1 MB. If a single message is greater than 1 MB, it can be compressed before being placed on the stream. However, at large volumes, a message or file size of 1 MB or less is usually too small. Although there is no single right answer for file size, 1 MB would, for many datasets, yield too many files and too many file operations.
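
For instance, a producer can gzip a large payload before calling PutRecord. A minimal Python sketch, with an illustrative message and the default stream name used later in this post:

import gzip
import json
import boto3

kinesis = boto3.client('kinesis')

# Illustrative message; real payloads would be device or sensor readings
message = {'device_id': 'sensor-42', 'readings': [1.2, 3.4, 5.6]}
compressed = gzip.compress(json.dumps(message).encode('utf-8'))

kinesis.put_record(
    StreamName='AWS-Blog-BaseKinesisStream',
    Data=compressed,
    PartitionKey=message['device_id']
)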

This post also shows how to use Apache Spark to read compressed files in Amazon S3 that do not have a proper file name extension, and how to store the results back in Amazon S3 in Parquet format.

Solution overview

The steps we follow in this blog post are:

  1. Create a virtual private cloud (VPC) and an Amazon S3 bucket.
  2. Provision a Kinesis data stream, and an AWS Lambda function to process the messages from the Kinesis data stream.
  3. Provision Kinesis Data Firehose to deliver messages to Amazon S3 sent from the Lambda function in step 2. This step also provisions an Amazon EMR cluster to process the data in Amazon S3.
  4. Generate test data with custom code running on an Amazon EC2 instance.
  5. Run a sample Spark program from the Amazon EMR cluster’s master instance to read the files from Amazon S3, convert them into parquet format and write back to an Amazon S3 destination.

The following diagram explains how the services work together:

The AWS Lambda function in the diagram reads the messages, appends additional data to them, and compresses them with gzip before sending them to Amazon Kinesis Data Firehose. We do this because most customers need some enrichment of their data before it arrives in Amazon S3.
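
The function deployed by the CloudFormation templates implements this in Java; conceptually, the handler does something like the following Python sketch, in which the enrichment field and delivery stream name are illustrative:

import base64
import gzip
import json
import boto3

firehose = boto3.client('firehose')

def handler(event, context):
    for record in event['Records']:
        # Kinesis delivers the producer's payload base64-encoded (and gzipped by our producer)
        message = json.loads(gzip.decompress(base64.b64decode(record['kinesis']['data'])))

        # Enrich the message before it lands in Amazon S3
        message['ingest_partition_key'] = record['kinesis']['partitionKey']

        # Recompress and forward to the Firehose delivery stream
        firehose.put_record(
            DeliveryStreamName='AWSBlogs-LambdaToFireHose',
            Record={'Data': gzip.compress((json.dumps(message) + '\n').encode('utf-8'))}
        )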

Amazon Kinesis Data Firehose can buffer incoming messages into larger records before delivering them to your Amazon S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.

An Apache Spark job reads the messages from Amazon S3, and stores them in parquet format. With parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like Amazon Athena.

Considerations

The maximum size of a record sent to Kinesis Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Kinesis Data Firehose is the best approach. Kinesis Data Firehose also offers compression of messages after they are written to the Kinesis Data Firehose data stream. Unfortunately, this does not overcome the message size limitation, because that compression happens after the message is written. When Kinesis Data Firehose delivers a previously compressed message to Amazon S3, it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Kinesis Data Firehose, it is delivered to Amazon S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing, because a .gz extension is required for Spark to recognize the compression.

Later in this post, we show how to overcome this issue by reading the files using Amazon S3 API operations.
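
As a preview, the idea is to list the objects with the S3 API, decompress them explicitly, and hand the resulting records to Spark, which then writes Parquet. A minimal sketch, with the bucket name and prefix as placeholders, that decompresses in the driver for simplicity:

import gzip
import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ConvertToParquet').getOrCreate()
s3 = boto3.client('s3')

bucket = 'your-s3-bucket-name'   # the bucket created by the first template
prefix = 'firehose-output/'      # illustrative prefix where Kinesis Data Firehose delivers objects

# List every delivered object, regardless of the missing file extension
keys = [obj['Key']
        for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get('Contents', [])]

def read_lines(key):
    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    return gzip.decompress(body).decode('utf-8').splitlines()

# A production job would distribute this step instead of decompressing in the driver
lines = [line for key in keys for line in read_lines(key)]
df = spark.read.json(spark.sparkContext.parallelize(lines))
df.write.mode('overwrite').parquet('s3://your-s3-bucket-name/output-parquet/')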

Prerequisites and assumptions

To follow the steps outlined in this blog post, you need the following:

  • An AWS account that provides access to AWS services.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
  • The templates and code are intended to work in the US East (N. Virginia) Region only.

Additionally, be aware of the following:

  • We configure all services in the same VPC to simplify networking considerations.
  • Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren't intended for production use without modification.

Implementing the solution

You can use this downloadable template for single-click deployment. This template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region. The template is designed to work only in the US East (N. Virginia) Region. To launch directly through the console, choose the Launch Stack button.

This template takes the following parameters. Some of the parameters have default values that you can't edit; these predefined names are hardcoded in the code. For the other parameters, you must provide values. The following table provides additional details.

For this parameter Provide this
StackName Provide the stack name.
ClientIP The IP address range of the client that is allowed to connect to the cluster using SSH.
FirehoseDeliveryStreamName The name of the Amazon Kinesis Data Firehose delivery stream. Default value is set to "AWSBlogs-LambdaToFireHose".
InstanceType The EC2 instance type.
KeyName The name of an existing EC2 key pair to enable login access to the instances.
KinesisStreamName The name of the Amazon Kinesis data stream. Default value is set to "AWS-Blog-BaseKinesisStream".
Region The AWS Region. By default it is us-east-1 — US East (N. Virginia). Do not change this, as the scripts are developed to work in this Region only.
EMRClusterName A name for the EMR cluster.
S3BucketName The name of the bucket that is created in your account. Provide a unique name for this bucket. It is used for storing the messages and the output from the Spark code.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check boxes for I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND. Then choose Create.

If you use this one-step solution, you can skip to Step 7: Generate test dataset and load into Kinesis Data Streams.

To create each component individually, use the following steps.

1. Use the AWS CloudFormation template to configure Amazon VPC and create an Amazon S3 bucket

In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules. The first inbound rule allows access to TCP port 22 (SSH) from the provided client IP CIDR range, and the second inbound rule allows access to any TCP port from any host within the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we also create a standard Amazon S3 bucket with the provided bucket name to store the incoming and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter Do this
StackName Provide the stack name.
S3BucketName Provide a unique Amazon S3 bucket. This bucket is created in your account.
ClientIp Provide a CIDR IP address range that is added to the inbound rule of the security group. You can get your current IP address from the "checkip.amazon.com" URL.

After you specify the template details, choose Next. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key Value
StackName Name
VPCID Vpc-xxxxxxx
SubnetID subnet-xxxxxxxx
SecurityGroup sg-xxxxxxxxxx
S3BucketDomain <S3_BUCKET_NAME>.s3.amazonaws.com
S3BucketARN arn:aws:s3:::<S3_BUCKET_NAME>

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

2. Use the AWS CloudFormation template to create the necessary IAM roles

In this step, we set up two AWS IAM roles. One of the IAM roles is used by an AWS Lambda function to allow access to Amazon S3, Amazon Kinesis Data Firehose, Amazon CloudWatch Logs, and Amazon EC2 instances. The second IAM role is used by Amazon Kinesis Data Firehose to access Amazon S3. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter Do this
StackName Provide the stack name.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key Value
LambdaRoleArn arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
FirehoseRoleArn arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3. Use an AWS CloudFormation template to configure the Amazon Kinesis Data Firehose data stream

In this step, we set up Amazon Kinesis Data Firehose with Amazon S3 as the destination for the incoming messages. We select the Uncompressed option for the compression format and configure buffering with a size of 128 MB and an interval of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter Do this
StackName Provide the stack name.
FirehoseDeliveryStreamName Provide the name of the Amazon Kinesis Data Firehose delivery stream. The default value is set to “AWSBlogs-LambdaToFirehose”
Role Provide the Kinesis Data Firehose IAM role ARN that was created as part of step 2.
S3BucketARN Select the S3BucketARN. You can get this from the step 1 AWS CloudFormation output.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function

In this step, we set up a Kinesis data stream and an AWS Lambda function. We use the AWS Lambda function to process incoming messages in the Kinesis data stream. An event source mapping is also created as part of this template; it adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mappings, see Creating an Event Source Mapping. The Kinesis data stream is created with 10 shards, and the Lambda function is created with the Java 8 runtime. We allocate 1,920 MB of memory and a timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
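
If you were wiring this up by hand instead of through the template, the event source mapping could be created with a command along these lines; the function name and account ID are placeholders:

$ aws lambda create-event-source-mapping \
    --function-name KinesisToFirehoseFunction \
    --event-source-arn arn:aws:kinesis:us-east-1:111111111111:stream/AWS-Blog-BaseKinesisStream \
    --starting-position LATEST \
    --batch-size 100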

This template takes the following parameters. The following table provides details.

For this parameter Do this
StackName Provide the stack name.
KinesisStreamName Provide the name of the Amazon Kinesis stream. Default value is set to ‘AWS-Blog-BaseKinesisStream’
Role Provide the IAM role created for the Lambda function as part of the second AWS CloudFormation template. Get the value from the output of the second AWS CloudFormation template.
S3Bucket Provide the name of the existing Amazon S3 bucket that was created using the first AWS CloudFormation template. Do not use the domain name. Provide the bucket name only.
Region Select the AWS Region. By default it is us-east-1 — US East (N. Virginia).

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

5. Use an AWS CloudFormation template to configure the Amazon EMR cluster

In this step, we set up an Amazon EMR 5.16.0 cluster with the Spark, Ganglia, and Hive applications. We create this cluster with one master and two core nodes, and use the r4.xlarge instance type. The template uses the AWS Glue Data Catalog as the Hive metastore for the Amazon EMR cluster. This Amazon EMR cluster is used to process the messages in the Amazon S3 bucket that are created by the Amazon Kinesis Data Firehose delivery stream. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter Do this
EMRClusterName Provide the name for the EMR cluster.
ClusterSecurityGroup Select the security group ID that was created as part of the first AWS CloudFormation template.
ClusterSubnetID Select the subnet ID that was created as part of the first AWS CloudFormation template.
AllowedCIDR Provide the IP address range of the client that is allowed to connect to the cluster.
KeyName Provide the name of an existing EC2 key pair to access the Amazon EMR cluster.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key Value
EMRClusterMaster ssh hadoop@ec2-XX-XXX-XXX-XXX.us-east-1.compute.amazonaws.com -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view..


In typical AWS fashion, not a week had gone by after I published How Goodreads offloads Amazon DynamoDB tables to Amazon S3 and queries them using Amazon Athena on the AWS Big Data blog when the AWS Glue team released the ability for AWS Glue crawlers and AWS Glue ETL jobs to read from DynamoDB tables natively. I was actually pretty excited about this. Less code means fewer bugs. The original architecture had been around for at least 18 months and could be simplified significantly with a little bit of work.

Refactoring the data pipeline

The AWS Data Pipeline architecture outlined in my previous blog post is just under two years old now. We had used data pipelines as a way to back up Amazon DynamoDB data to Amazon S3 in case of a catastrophic developer error. However, with DynamoDB point-in-time recovery we have a better, native mechanism for disaster recovery. Additionally, with data pipelines we still own the operations associated with the clusters themselves, even if they are transient. A common challenge is keeping our clusters up with recent releases of Amazon EMR to help mitigate any outstanding bugs. Another is the inefficiency of needing to spin up an EMR cluster for each DynamoDB table.

I decided to take a step back and list the capabilities I wanted to have in the next iteration:

  • Export tables using AWS Glue instead of EMR.
    • AWS Glue provides a serverless ETL environment where I don’t have to worry about the underlying infrastructure. This minimizes operational tasks like keeping up with the EMR release tags.
  • Use a workflow solution that works across services like AWS Glue and Amazon Athena.
    • In the first iteration, the workflow was spread across various services. Unless you had the entire pipeline in your head, it was difficult to get a bird’s-eye view of how the pipeline was progressing.
  • Ability to select different formats.
    • For data engineering, I prefer Apache Parquet. However, customers might prefer a different format.
  • Add exported data to Athena.
    • I find that the easier it is for the data to be queried, the more likely it’s used.

Architecture overview

At a high level, this is the architecture:

  • We’re using AWS Step Functions as the workflow engine.
    • Each step is either a built-in Step Functions state, a service integration, or a simple Python AWS Lambda function. For example, GlueStartJobRun uses the synchronous job run service integration, as discussed in the documentation.
    • We get a visual representation of the entire pipeline.
    • It’s quick to onboard new developers.
  • An event in Amazon CloudWatch Events, which is disabled to start, triggers a Step Functions state machine with a JSON payload that contains the following:
    • AWS Glue job name
    • Export destination
    • DynamoDB table name
    • Desired read percentage
    • AWS Glue crawler name
  • AWS Glue exports a DynamoDB table in your preferred format to S3 as snapshots_your_table_name. The data is partitioned by the snapshot_timestamp column.
  • An AWS Glue crawler adds or updates your data’s schema and partitions in the AWS Glue Data Catalog.
  • Finally, we create an Athena view that only has data from the latest export snapshot.

A simple AWS Glue ETL job

The script that I created accepts AWS Glue ETL job arguments for the table name, read throughput, output, and format. Behind the scenes, AWS Glue scans the DynamoDB table. AWS Glue makes sure that every top-level attribute makes it into the schema, no matter how sparse your attributes are (as discussed in the DynamoDB documentation).

Here’s the script:

import sys
import datetime
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

ARG_TABLE_NAME = "table_name"
ARG_READ_PERCENT = "read_percentage"
ARG_OUTPUT = "output_prefix"
ARG_FORMAT = "output_format"

PARTITION = "snapshot_timestamp"

args = getResolvedOptions(sys.argv,
  [
    'JOB_NAME',
    ARG_TABLE_NAME,
    ARG_READ_PERCENT,
    ARG_OUTPUT,
    ARG_FORMAT
  ]
)

table_name = args[ARG_TABLE_NAME]
read = args[ARG_READ_PERCENT]
output_prefix = args[ARG_OUTPUT]
fmt = args[ARG_FORMAT]

print("Table name:", table_name)
print("Read percentage:", read)
print("Output prefix:", output_prefix)
print("Format:", fmt)

date_str = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M')
output = "%s/%s=%s" % (output_prefix, PARTITION, date_str)

sc = SparkContext()
glueContext = GlueContext(sc)

table = glueContext.create_dynamic_frame.from_options(
  "dynamodb",
  connection_options={
    "dynamodb.input.tableName": table_name,
    "dynamodb.throughput.read.percent": read
  }
)

glueContext.write_dynamic_frame.from_options(
  frame=table,
  connection_type="s3",
  connection_options={
    "path": output
  },
  format=fmt,
  transformation_ctx="datasink"
)

There’s not a lot here. We’re creating a DynamicFrameReader of connection type dynamodb and passing in the table name and desired maximum read throughput consumption. We pass that data frame to a DynamicFrameWriter that writes the table to S3 in the specified format.

Athena views

Most teams at Amazon own applications that have multiple DynamoDB tables, including my own team. Our current application uses five primary tables. Ideally, at the end of an export workflow you can write simple, obvious queries across a consistent view of your tables. However, each exported table is partitioned by the timestamp from when the table was exported. This makes querying across one or more tables very cumbersome, because you have to add a WHERE snapshot_timestamp = clause to every table reference in your query. Additionally, each table might have a different snapshot_timestamp value for any given day!

The final step in this export workflow creates an Athena view that adds that WHERE clause for you. This means that you can interact with your DynamoDB exports as if they were one sane view of your exported DynamoDB tables.
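
For a hypothetical snapshots_reviews table, the generated view is conceptually equivalent to the following:

CREATE OR REPLACE VIEW reviews AS
SELECT *
FROM snapshots_reviews
WHERE snapshot_timestamp = (
    SELECT max(snapshot_timestamp) FROM snapshots_reviews
);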

Setting up the infrastructure

The AWS CloudFormation stacks I create are split into two types: a common stack and table stacks. The common stack contains shared infrastructure, and you need only one of these per AWS Region. The table stacks are designed so that you can create one per table-format combination in any given AWS Region. Each table stack contains the CloudWatch event logic and AWS Glue components needed to export and transform a DynamoDB table.

Creating the common stack

The common stack contains the majority of the infrastructure. That includes the Step Functions state machine and Lambda functions to trigger and check the state of asynchronous jobs. It also includes IAM roles that the export stacks use, and the S3 bucket to store the exports.

To create the common stack, do the following:

  1. Choose this Launch Stack
  2. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create Stack.

Creating the table export stack

If you don’t have a DynamoDB table to export, follow the original blog post. Start with the Working with the Reviews stack section and continue until you’ve added the two Items to the table. Otherwise, feel free to point this CloudFormation stack at your favorite DynamoDB table that is using provisioned throughput. Tables that use on-demand throughput are not currently supported.

Because so much of this architecture is shareable, there’s not much in the table export stack. This stack defines the CloudWatch event used to trigger the Step Functions state machine with a JSON payload containing all the necessary metadata. Additionally, it contains the AWS Glue ETL job that exports the table and the AWS Glue Crawler that updates metadata in the AWS Glue Data Catalog.

Technically, you can define the AWS Glue ETL job in the common stack because it’s already parameterized. However, the default limit for concurrent runs for an AWS Glue job is three. This is a soft limit, but with this architecture you have headroom to export up to 25 tables before asking for a limit increase.

To create the table export stack, do the following:

  1. Choose this Launch Stack
  2. Choose an output format from the list. All the available formats are supported by Athena natively.
  3. Enter your DynamoDB table name.
  4. Enter the percentage of Read Capacity Units (RCUs) that the job should consume from your table’s currently provisioned throughput. This percentage is expressed as a float between 0.1 and 1.0 inclusive. The default is 0.25 (25 percent).

As an example, suppose that your table's RCUs are set to 100 and you use the default of 0.25 (25 percent). The AWS Glue job then consumes 25 RCUs while running.

  1. Choose Create.

Kicking off a state machine execution

To demonstrate how this works, we run the DynamoDB export state machine manually by passing it the JSON payload that the CloudWatch event would pass to Step Functions.
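
The payload carries the metadata listed in the architecture overview; the key names in this sketch are hypothetical rather than the exact ones the stack generates:

{
  "glue_job_name": "DynamoDBExportReviews",
  "output_prefix": "s3://example-export-bucket/snapshots_reviews",
  "table_name": "Reviews",
  "read_percentage": "0.25",
  "crawler_name": "DynamoDBExportReviewsCrawler"
}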

Getting the JSON payload from CloudWatch Events

To get the JSON payload, do the following:

  1. Open CloudWatch in the AWS Management Console.
  2. In the left column under Events, choose Rules.
  3. Choose your rule from the list. It is prefixed by AWSBigDataBlog-.
  4. For Actions, choose Edit.
  5. Copy the JSON payload from the Configure input section of Targets.
  6. Choose Cancel to exit edit mode.

Starting a state machine execution

To start an execution of the state machine, take the following steps:

  1. Open Step Functions in the console.
  2. Choose the DynamoDBExportAndAthenaLoad state machine.
  3. Choose Start execution.
  4. Paste the JSON payload into the Input field.
  5. Choose Start execution.

There are a few ways to follow along with the execution. As steps are entered and exited, entries are added to the Execution event history list. This is a great way to see what state (event in Lambda speak) is passed to each step, in case you need to debug.

You can also expand the Visual workflow. It’s a great high-level view to see how the workflow is progressing.

After the workflow is finished, you see two new tables under the dynamodb_exports database in your AWS Glue Data Catalog. Your DynamoDB snapshots table name is prefixed with snapshots_. The schema is formatted for the AWS Glue Data Catalog (lowercase and hyphens transformed to underscores). You also have a view table with the same table name formatted for AWS Glue Data Catalog but without the snapshots_ prefix.

Querying your data

To showcase how having a separate view table of the most recent snapshot of a table is useful, I use the Reviews table from the previous blog post. The table has two items. I have also run the export workflow twice. As you can see when you preview the table, there are four items total. That’s because each snapshot contains two items.

From the items, the latest snapshot_timestamp is 2019-01-11T23:26. When I run the same preview query against the view table reviews, we see that there are only two items, which is what we expect. The view takes care of specifying the where snapshot_timestamp=… clause so you don’t have to.

Wrapping up

In this post, I showed you how to use AWS Glue's DynamoDB integration and AWS Step Functions to create a workflow to export your DynamoDB tables to S3 in Parquet format. I also showed how to create an Athena view for each table's latest snapshot, giving you a consistent view of your DynamoDB table exports.

About the Author

Joe Feeney is a Software Engineer at Amazon Go, where he does secret stuff and he’s quite chuffed with that. He enjoys embarrassing his family by taking Mario Kart entirely too seriously.


In Amazon Simple Storage Service (Amazon S3), you can use cross-region replication (CRR) to copy objects automatically and asynchronously across buckets in different AWS Regions. CRR is a bucket-level configuration, and it can help you meet compliance requirements and minimize latency by keeping copies of your data in different Regions. CRR replicates all objects in the source bucket, or optionally a subset, controlled by prefix and tags.

Objects that exist before you enable CRR (pre-existing objects) are not replicated. Similarly, objects might fail to replicate (failed objects) if permissions aren’t in place, either on the IAM role used for replication or the bucket policy (if the buckets are in different AWS accounts).

In our work with customers, we have seen situations where large numbers of objects aren’t replicated for the previously mentioned reasons. In this post, we show you how to trigger cross-region replication for pre-existing and failed objects.

Methodology

At a high level, our strategy is to perform a copy-in-place operation on pre-existing and failed objects. This operation uses the Amazon S3 API to copy the objects over the top of themselves, preserving tags, access control lists (ACLs), metadata, and encryption keys. The operation also resets the Replication_Status flag on the objects. This triggers cross-region replication, which then copies the objects to the destination bucket.

To accomplish this, we use the following:

  • Amazon S3 inventory to identify objects to copy in place. These objects don’t have a replication status, or they have a status of FAILED.
  • Amazon Athena and AWS Glue to expose the S3 inventory files as a table.
  • Amazon EMR to execute an Apache Spark job that queries the AWS Glue table and performs the copy-in-place operation.

Object filtering

To reduce the size of the problem (we’ve seen buckets with billions of objects!) and eliminate S3 List operations, we use Amazon S3 inventory. S3 inventory is enabled at the bucket level, and it provides a report of S3 objects. The inventory files contain the objects’ replication status: PENDING, COMPLETED, FAILED, or REPLICA. Pre-existing objects do not have a replication status in the inventory.

Interactive analysis

To simplify working with the files that are created by S3 inventory, we create a table in the AWS Glue Data Catalog. You can query this table using Amazon Athena and analyze the objects.  You can also use this table in the Spark job running on Amazon EMR to identify the objects to copy in place.

Copy-in-place execution

We use a Spark job running on Amazon EMR to perform concurrent copy-in-place operations of the S3 objects. This step allows the number of simultaneous copy operations to be scaled up. This improves performance on a large number of objects compared to doing the copy operations consecutively with a single-threaded application.

Account setup

For the purpose of this example, we created three S3 buckets. The buckets are specific to our demonstration. If you’re following along, you need to create your own buckets (with different names).

We’re using a source bucket named crr-preexisting-demo-source and a destination bucket named crr-preexisting-demo-destination. The source bucket contains the pre-existing objects and the objects with the replication status of FAILED. We store the S3 inventory files in a third bucket named crr-preexisting-demo-inventory.

The following diagram illustrates the basic setup.

You can use any bucket to store the inventory, but the bucket policy must include the following statement (change Resource and aws:SourceAccount to match yours).

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}

In our example, we uploaded six objects to crr-preexisting-demo-source. We added three objects (preexisting-*.txt) before CRR was enabled. We also added three objects (failed-*.txt) after permissions were removed from the CRR IAM role, causing CRR to fail.

Enable S3 inventory

You need to enable S3 inventory on the source bucket. You can do this on the Amazon S3 console as follows:

On the Management tab for the source bucket, choose Inventory.

Choose Add new, and complete the settings as shown, choosing the CSV format and selecting the Replication status check box. For detailed instructions for creating an inventory, see How Do I Configure Amazon S3 Inventory? in the Amazon S3 Console User Guide.
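
If you prefer to script this step, a roughly equivalent configuration can be applied with the AWS CLI; the bucket names below match the demo, so adjust them to yours:

$ aws s3api put-bucket-inventory-configuration \
    --bucket crr-preexisting-demo-source \
    --id crr-preexisting-demo \
    --inventory-configuration '{
        "Id": "crr-preexisting-demo",
        "IsEnabled": true,
        "IncludedObjectVersions": "Current",
        "Schedule": {"Frequency": "Daily"},
        "OptionalFields": ["ReplicationStatus"],
        "Destination": {
            "S3BucketDestination": {
                "Bucket": "arn:aws:s3:::crr-preexisting-demo-inventory",
                "Format": "CSV"
            }
        }
    }'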

After enabling S3 inventory, you need to wait for the inventory files to be delivered. It can take up to 48 hours to deliver the first report. If you’re following the demo, ensure that the inventory report is delivered before proceeding.

Here’s what our example inventory file looks like:

You can also look on the S3 console on the objects’ Overview tab. The pre-existing objects do not have a replication status, but the failed objects show the following:

Register the table in the AWS Glue Data Catalog using Amazon Athena

To be able to query the inventory files using SQL, first you need to create an external table in the AWS Glue Data Catalog. Open the Amazon Athena console at https://console.aws.amazon.com/athena/home.

On the Query Editor tab, run the following SQL statement. This statement registers the external table in the AWS Glue Data Catalog.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

After creating the table, you need to make the AWS Glue Data Catalog aware of any existing data and partitions by adding partition metadata to the table. To do this, you use the Metastore Consistency Check utility to scan for and add partition metadata to the AWS Glue Data Catalog.

MSCK REPAIR TABLE crr_preexisting_demo;

To learn more about why this is required, see the documentation on MSCK REPAIR TABLE and data partitioning in the Amazon Athena User Guide.

Now that the table and partitions are registered in the Data Catalog, you can query the inventory files with Amazon Athena.

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

The results of the query are as follows.

The query returns all rows in the S3 inventory for a specific delivery date. You’re now ready to launch an EMR cluster to copy in place the pre-existing and failed objects.

Note: If your goal is to fix FAILED objects, make sure that you correct what caused the failure (IAM permissions or S3 bucket policies) before proceeding to the next step.

Create an EMR cluster to copy objects

To parallelize the copy-in-place operations, run a Spark job on Amazon EMR. To facilitate EMR cluster creation and EMR step submission, we wrote a bash script (available in this GitHub repository).

To run the script, clone the GitHub repo. Then launch the EMR cluster as follows:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ ./launch_emr.sh

Note: Running the bash script results in AWS charges. By default, it creates two Amazon EC2 instances, one m4.xlarge and one m4.2xlarge. Auto-termination is enabled so when the cluster is finished with the in-place copies, it terminates.

The script performs the following tasks:

  1. Creates the default EMR roles (EMR_EC2_DefaultRole and EMR_DefaultRole).
  2. Uploads the files used for bootstrap actions and steps to Amazon S3 (we use crr-preexisting-demo-inventory to store these files).
  3. Creates an EMR cluster with Apache Spark installed using the create-cluster command.

After the cluster is provisioned:

  1. A bootstrap action installs boto3 and awscli.
  2. Two steps execute, copying the Spark application to the master node and then running the application.

The following are highlights from the Spark application. You can find the complete code for this example in the amazon-s3-crr-preexisting-objects repo on GitHub.

Here we select records from the table registered with the AWS Glue Data Catalog, filtering for objects with a replication_status of "FAILED" or an empty value ("").

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)

We call the copy_object function for each key returned by the previous query.

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata, 
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself, 
        # with the Storage Class, updated Metadata, 
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

Note: The Spark application adds a forcedreplication key to the objects’ metadata. It does this because Amazon S3 doesn’t allow you to copy in place without changing the object or its metadata.

Verify the success of the EMR job by running a query in Amazon Athena

The Spark application outputs its results to S3. You can create another external table with Amazon Athena and register it with the AWS Glue Data Catalog. You can then query the table with Athena to ensure that the copy-in-place operation was successful.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

The results appear as follows on the console.

Although this shows that the copy-in-place operation was successful, CRR still needs to replicate the objects. Subsequent inventory files show the objects’ replication status as COMPLETED. You can also verify on the console that preexisting-*.txt and failed-*.txt are COMPLETED.

It is worth noting that because CRR requires versioned buckets, the copy-in-place operation produces another version of the objects. You can use S3 lifecycle policies to manage noncurrent versions.
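For example, the following is a minimal boto3 sketch of a lifecycle rule that expires noncurrent versions after 30 days. The bucket name and retention period are assumptions; adjust them for your environment.

import boto3

s3 = boto3.client("s3")

s3.put_bucket_lifecycle_configuration(
    Bucket="crr-preexisting-demo",  # assumed source bucket name
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-noncurrent-versions",
                "Filter": {"Prefix": ""},  # apply to the whole bucket
                "Status": "Enabled",
                "NoncurrentVersionExpiration": {"NoncurrentDays": 30},
            }
        ]
    },
)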

Conclusion

In this post, we showed how to use Amazon S3 inventory, Amazon Athena, the AWS Glue Data Catalog, and Amazon EMR to perform copy-in-place operations on pre-existing and failed objects at scale.

Note: Amazon S3 batch operations is an alternative for copying objects. The difference is that S3 batch operations will not check each object’s existing properties and set object ACLs, storage class, and encryption on an object-by-object basis. For more information, see Introduction to Amazon S3 Batch Operations in the Amazon S3 Console User Guide.

About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

Chauncy McCaughey is a senior data architect at AWS. His current side project is using statistical analysis of driving habits and traffic patterns to understand how he always ends up in the slow lane.


Many organizations use Amazon Simple Storage Service (Amazon S3) as a primary storage destination for a wide variety of logs including AWS service logs. One of the benefits of storing log data in Amazon S3 is that you can access it in any number of ways. One popular option is to query it using Amazon Athena, a serverless query engine for data on S3. Common use cases for querying logs are service and application troubleshooting, performance analysis, and security audits. To get the best performance and reduce query costs in Athena, we recommend following common best practices, as outlined in Top 10 Performance Tuning Tips for Amazon Athena on the AWS Big Data Blog. These best practices include converting the data to a columnar format like Apache Parquet and partitioning the resulting data in S3.

In this post, we’re open-sourcing a Python library known as Athena Glue Service Logs (AGSlogger). This library has predefined templates for parsing and optimizing the most popular log formats. The library provides a mechanism for defining schemas, managing partitions, and transforming data within an extract, transform, load (ETL) job in AWS Glue. AWS Glue is a serverless data transformation and cataloging service. You can use this library in conjunction with AWS Glue ETL jobs to enable a common framework for processing log data.

Using Python libraries with AWS Glue ETL

One of the features of AWS Glue ETL is the ability to import Python libraries into a job (as described in the documentation). We take advantage of this feature in our approach. With this capability, you first provide a link to a .zip file in Amazon S3 containing selected Python modules to AWS Glue. Then AWS Glue imports them at runtime.

We want our AWS Glue jobs to be as simple as possible while enabling the ability to easily roll out new versions of the library. To accomplish this, all of the setup, configuration, and transformation logic is contained in the library and AWS Glue simply executes the job. As new log formats are added or updated in the library, a new version of the .zip file can be deployed to S3. It’s then automatically imported by the relevant AWS Glue job. Here is an example ETL script:

from athena_glue_service_logs.job import JobRunner
 
job_run = JobRunner(service_name='s3_access')
job_run.convert_and_partition()
About the AGSlogger library

The library is available on GitHub in the athena-glue-service-logs repository. It’s designed to do an initial conversion of AWS Service logs and also perform ongoing conversion as new logs are delivered to S3. The following log types are supported:

  • Application Load Balancer
  • Classic Load Balancer
  • AWS CloudTrail
  • Amazon CloudFront
  • S3 Access
  • Amazon VPC Flow

To convert additional logs, update the service_name variable in the script, and also the different job parameters that point to your desired table names and Amazon S3 locations.

There are some limitations of the script:

  • The script has not been tested with large volumes of log data (greater than 100 GiB).
  • If you have a large number of log files, you might need to increase your Apache Spark executor settings. Edit the AWS Glue job and add the following job parameter:

key: --conf
value: spark.yarn.executor.memoryOverhead=1G

  • If you do not have any recent logs (less than 30 days old) for certain log types like S3 Access, the script may not be able to properly populate the optimized table.
  • Several CloudTrail fields such as requestParameters and responseElements are left as JSON strings; you can use Athena to extract data from this JSON at the time of query (a hedged example follows this list).
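The following is a minimal sketch of that pattern, run through the Athena API with boto3. The aws_service_logs.cloudtrail table name, the $.bucketName JSON path, and the query output location are assumptions; substitute the table created by your own AWS Glue job.

import boto3

athena = boto3.client("athena")

# Pull a single value out of the requestParameters JSON string at query time.
query = """
SELECT eventname,
       json_extract_scalar(requestparameters, '$.bucketName') AS bucket_name
FROM aws_service_logs.cloudtrail
LIMIT 10
"""

athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "aws_service_logs"},
    ResultConfiguration={"OutputLocation": "s3://<bucket>/athena-results/"},
)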
Before you begin

There are a few prerequisites before you get started:

  1. Create an IAM role to use with AWS Glue. For more information, see Create an IAM Role for AWS Glue in the AWS Glue documentation.
  2. Ensure that you have access to Athena from your account.
  3. We use Amazon S3 server access logs as our example for this script, so enable access logging on an Amazon S3 bucket. For more information, see How to Enable Server Access Logging in the Amazon S3 documentation.
  4. Download and store the Python library in an Amazon S3 bucket in the same AWS Region in which you run the AWS Glue ETL job. Download the latest release from https://github.com/awslabs/athena-glue-service-logs/releases. Then, copy the .zip file to your Amazon S3 bucket, as follows:

aws s3 cp athena_glue_converter_v5.3.0.zip s3://<bucket>/glue_scripts/

Now, you are ready to create the AWS Glue ETL job.

Create an AWS Glue ETL job using the library

For this post, we focus on Amazon S3 server access logs (described in the documentation). By default, these logs are delivered to a single location in Amazon S3. Converting to Parquet and partitioning these logs can significantly improve query performance and decrease query costs.

If you’ve cloned the repository associated with this release, you can use a “make” command to automate the job creation. We also walk through the job creation process in the AWS Glue console. There are a few specific settings on the Job properties page we need to set.

To create the AWS Glue ETL job
  1. In the AWS Glue console, choose Jobs under ETL on the navigation pane, and then choose Add Job. Follow the job creation wizard. Ensure that "A new script to be authored by you" is selected. We provide the code for it later. Our ETL language is Python. Under advanced properties, enable the Job bookmark. Job metrics can also be useful when monitoring your job, but they are not required.
  2. Under Script libraries in the Python library path section, put the full path to the .zip file that you uploaded to your Amazon S3 bucket as shown previously:
    s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip

    You can adjust the DPUs if you think you need more or less processing power. For our purposes, you can leave it at 10.
  3. Specify a few different types of parameters, described in detail following:
  • The source of your Amazon S3 Server Access Logs.
  • The destination where to save the converted logs.

AWS service logs can be stored in a number of different locations, as discussed in Service Log Specifics. For storing Amazon S3 server access logs, specify the bucket and prefix matching those that you configured on the S3 bucket where you enabled access logging.

  • The names of the databases and tables that are created in the AWS Glue Data Catalog.

By default, the converted logs are partitioned by date. The script creates the necessary tables and keeps the partitions up-to-date on subsequent runs of the job. You don’t need to use AWS Glue crawlers, although they can provide similar functionality. Here are the different properties you need to configure:

Key Value
--raw_database_name source_logs
--raw_table_name s3_access
--converted_database_name aws_service_logs
--converted_table_name s3_access
--s3_converted_target s3://<bucket>/converted/s3_access
--s3_source_location s3://<bucket>/s3_access

  4. Continue with the rest of the wizard, finishing the job creation flow. The script editor opens. Replace all the code in the script editor, even the import lines, with these lines:
    from athena_glue_service_logs.job import JobRunner
     
    job_run = JobRunner(service_name='s3_access')
    job_run.convert_and_partition()
  5. Save the script and choose Run Job. When the job begins, you see log output from the job scrolling under the script.

The script you just created is saved to S3 in a standard bucket. You can also use the AWS Command Line Interface (AWS CLI) to create the AWS Glue ETL job. Copy the preceding script to S3 first and provide that location as the ScriptLocation parameter.

aws glue create-job --name S3AccessLogConvertor \
--description "Convert and partition S3 Access logs" \
--role AWSGlueServiceRoleDefault \
--command Name=glueetl,ScriptLocation=s3://<bucket>/glue_scripts/s3_access_job.py \
--default-arguments '{
  "--extra-py-files":"s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip",
  "--job-bookmark-option":"job-bookmark-enable",
  "--raw_database_name":"source_logs",
  "--raw_table_name":"s3_access",
  "--converted_database_name":"aws_service_logs",
  "--converted_table_name":"s3_access",
  "--TempDir":"s3://<bucket>/tmp",
  "--s3_converted_target":"s3://<bucket>/converted/s3_access",
  "--s3_source_location":"s3://<bucket>/s3_access/"
}'
Scheduling future runs

By default, this job is configured to run on a manual basis. To run it on a regular basis, set up a new schedule trigger in AWS Glue to run the job at your desired frequency. We recommend scheduling it hourly so that recent logs are readily available for your optimized queries.
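If you prefer to create the trigger programmatically, here is a minimal boto3 sketch. The trigger name is an assumption, and the job name matches the CLI example earlier in this post.

import boto3

glue = boto3.client("glue")

# Run the conversion job at five minutes past every hour.
glue.create_trigger(
    Name="S3AccessLogConvertor-hourly",  # assumed trigger name
    Type="SCHEDULED",
    Schedule="cron(5 * * * ? *)",
    Actions=[{"JobName": "S3AccessLogConvertor"}],
    StartOnCreation=True,
)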

On every run of the job, the script looks for new log data and converts it to Parquet format. The script then adds any new partitions that might have been added as a result of the conversion. The script uses AWS Glue job bookmarks to ensure that it processes only newly delivered data. For more information about bookmarks, see Tracking Processed Data Using Job Bookmarks in the AWS Glue documentation.

Querying your optimized data in Athena: examples

Now that you’ve converted your data from row-based log files to columnar-based Parquet, you can write queries against this data using Athena. After the first run of the script, the tables specified in the AWS Glue ETL job properties are created for you. Here are several sample queries to get you started.

Example 1: Most requested S3 keys
SELECT key, COUNT(*) AS count
FROM "aws_service_logs"."s3_access"
WHERE operation IN ('REST.GET.OBJECT', 'REST.COPY.OBJECT', 'REST.COPY.OBJECT_GET')
GROUP BY 1
ORDER BY 2 DESC
limit 100;
Example 2: Top IP addresses that accessed the bucket yesterday
SELECT remote_ip, COUNT(*) FROM "aws_service_logs"."s3_access"
WHERE year=date_format(current_date - interval '1' day, '%Y') AND month=date_format(current_date - interval '1' day, '%m') AND day=date_format(current_date - interval '1' day, '%d')
GROUP BY 1
ORDER BY 2 DESC
limit 100;

Note the use of numbers instead of strings in the GROUP BY and ORDER BY operations. This is one of the optimizations for Athena queries. For other optimizations, be sure to check out the Top 10 Performance Tuning Tips blog post.

In addition, we use the year, month, and day partition columns to limit the amount of data scanned and decrease the cost of the query.

Summary

This post introduces a new open-source library that you can use to efficiently process various types of AWS service logs using AWS Glue. The library automates the application of common best practices to allow high-performing and cost-effective querying of the data using Amazon Athena and Amazon Redshift. We hope this library comes in handy, and we’re open to pull requests. If you want to add a new log type, check out the code in the AWS Labs athena-glue-service-logs repository!

About the Author

Damon Cortesi is a big data architect with Amazon Web Services.


Notebooks are increasingly becoming the standard tool for interactively developing big data applications. It’s easy to see why. Their flexible architecture allows you to experiment with data in multiple languages, test code interactively, and visualize large datasets. To help scientists and developers easily access notebook tools, we launched Amazon EMR Notebooks, a managed notebook environment that is based on the popular open-source Jupyter notebook application. EMR Notebooks support Spark Magic kernels, which allows you to submit jobs remotely on your EMR cluster using languages like PySpark, Spark SQL, Spark R, and Scala. The kernels submit your Spark code through Apache Livy, which is a REST server for Spark running on your cluster.

EMR Notebooks is designed to make it easy for you to experiment and build applications with Apache Spark. In this blog post, I first cover some of the benefits that EMR Notebooks offers. Then I introduce you to some of its capabilities such as detaching and attaching a notebook to different EMR clusters, monitoring Spark activity from within the notebook, using tags to control user permissions, and setting up user-impersonation to track notebook users and their actions. To learn about creating and using EMR Notebooks, you can visit Using Amazon EMR Notebooks or follow along with the AWS Online Tech Talks webinar.

Benefits of EMR Notebooks

One of the useful features of EMR Notebooks is the separation of the notebook environment from your underlying cluster infrastructure. The separation makes it easy for you to execute notebook code against transient clusters without worrying about deploying or configuring your notebook infrastructure every time you bring up a new cluster. You can create multiple serverless notebooks from the AWS Management Console for EMR and access the notebook UI without spending time setting up SSH access or configuring your browser for port-forwarding. Each notebook you create is launched instantly with its own Spark context. This capability enables you to attach multiple notebooks to a single shared cluster and submit parallel jobs without fear of job conflicts in a multi-tenant environment. This way you make efficient use of your clusters.

You can also connect EMR Notebooks to an EMR cluster as small as a single node. This gives you a budget-friendly sandbox environment to develop your Spark application.

Finally, with EMR Notebooks, you don't have to spend time manually configuring your notebook to store files persistently. Your notebook files are saved automatically to a chosen Amazon S3 bucket periodically, so you don't have to worry about losing your work if your cluster is shut down. You can retrieve your saved notebooks from the console or download them locally from your S3 bucket in Jupyter "ipynb" format.
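For example, here is a minimal boto3 sketch of downloading a saved notebook file. The bucket and key are placeholders for the S3 location you chose when creating the notebook.

import boto3

s3 = boto3.client("s3")

# Copy the saved notebook file from the notebook's S3 location to local disk.
s3.download_file(
    Bucket="my-emr-notebooks-bucket",                 # placeholder bucket
    Key="notebooks/e-XXXXXXXXXXXX/MyNotebook.ipynb",  # placeholder key
    Filename="MyNotebook.ipynb",
)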

Detaching and attaching EMR Notebooks to different clusters

With EMR Notebooks, you can detach an active notebook from a cluster and attach it to a different cluster and promptly resume your work. This capability can be useful in scenarios where you want to move your notebook from a sandbox development cluster to a production environment or attach to a different cluster with appropriate CPU or memory resources and library packages required to execute your notebook against large datasets. To detach an active notebook:

First select the notebook name and then choose Stop.

Wait for the notebook status shown next to the notebook name to change from Ready to Stopped and then choose Change Cluster.

After you stop the notebook, you can choose to attach it to a different cluster in the same VPC or create a new cluster. EMR Notebooks automatically attaches the notebook to the cluster and restarts the notebook.

Monitoring and debugging Spark jobs

EMR Notebooks supports a built-in Jupyter notebook widget called SparkMonitor that allows you to monitor the status of all your Spark jobs launched from the notebook without connecting to the Spark web UI server.

A widget appears and automatically integrates within the cell structure of your notebook and displays detailed status about the job submitted from each cell of your notebook, providing you with real-time progress of the different stages of the job. For any failed jobs, this widget also offers embedded links to the container logs in Amazon S3, allowing you to get to the relevant logs and debug your jobs.

Additionally, if you have configured your cluster to accept SSH connections, then you can access the Spark application web UI and Hadoop jobs history server from within the notebook. This allows you to view the event timelines, visualize Directed Acyclic Graphs (DAG) of each job, and view the detailed system and runtime information to inspect and debug your code. These web UIs are available automatically the first time you start to run your Spark code from a notebook.

Writing tag-based policies to control user permissions for notebooks and users

EMR Notebooks are by default shared resources that anyone from your organization with access to your AWS account can open, edit, or even delete. If you want more control over your notebook, you can use tags to label your notebook and write IAM policies that control access for other users. To get you started, when you create a notebook, a default tag with a key string, creatorUserId, is set to the value of the IAM User ID of the user who created the notebook.

You can use this tag to limit allowed actions on the notebook to only the creator of the notebook. For example, the permissions policy statement below, when attached to a role or user, enables the IAM user to view, start, stop, edit, or delete only those notebooks that they have created. This policy statement uses the default tag that is applied by EMR Notebooks when you create the notebook.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:DescribeEditor",
                "elasticmapreduce:StartEditor",
                "elasticmapreduce:StopEditor",
                "elasticmapreduce:DeleteEditor",
                "elasticmapreduce:OpenEditorInConsole"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/creatorUserId": "${aws:userId}"
                }
            }
        }
    ]
}

You can write policies that enforce the creation of tags before starting a notebook. For example, the policy below requires that the user not change or delete the creatorUserId tag that is added by default. The variable ${aws:userId} specifies the currently active user's user ID, which is the default value of the tag.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:CreateEditor"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:RequestTag/creatorUserId": "${aws:userid}"
                }
            }
        }
    ]
}

You can use the notebook tags along with EMR cluster tags to control notebook user access to your cluster. Tagging your notebooks and clusters, in addition to securing your resources, also allows you to categorize, track, and allocate your EMR cluster costs across your different lines of business. For example, the policy below allows a user to start a notebook only if the notebook has a tag with the key string "department" and the value "Analytics", and only if the notebook is attached to an EMR cluster that has a tag with the key string "cost-center" and the value "12345."

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:editor/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/department": [
                        "Analytics"
                    ]
                }
            }
        },
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:cluster/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/cost-center": [
                        "12345"
                    ]
                }
            }
        }
    ]
}

You can learn more about notebook and cluster tags by visiting Using Tags to Control User Permissions and Tag Clusters. Also, visit Using Cost Allocation Tags to learn more about using tags to generate cost allocation reports in the AWS Billing and Cost Management console.

Tracking notebook users by enforcing user-impersonation

EMR Notebooks enables multiple users to execute their notebooks' code concurrently on a shared EMR cluster, improving cluster utilization. By default, all Spark jobs spawned by these different users from their notebooks run as the same user (the livy user) on your EMR cluster. If your corporate policy requires you to set up an audit trail and track individual notebook user actions on shared clusters, you can use the user-impersonation feature of EMR Notebooks. This capability allows you to distinguish and audit notebook users by associating the jobs they executed from their notebook with each user's IAM identity. To use this feature, enable Livy user-impersonation by configuring the core-site and livy-conf configuration classifications when you create and launch an EMR cluster, as follows:

[
    {
        "Classification": "core-site",
        "Properties": {
          "hadoop.proxyuser.livy.groups": "*",
          "hadoop.proxyuser.livy.hosts": "*"
        }
    },
    {
        "Classification": "livy-conf",
        "Properties": {
          "livy.impersonation.enabled": "true"
        }
    }
]

Visit Configuring Applications to learn more about configuring applications. After this feature is enabled, EMR Notebooks creates HDFS user directories on the master node for each user identity. This means all the Spark jobs from the notebook run as the IAM user instead of the indistinct user livy. For example, if user NB_User1 runs code from the notebook editor, then a user directory named user_NB_User1 is created on the master node and all Spark jobs run as user_NB_User1. You can then use a service like AWS CloudTrail to audit the record of actions by the user NB_User1 by creating a trail. To learn more about setting up audit trails, see Logging Amazon EMR API Calls in AWS CloudTrail.

Conclusion

In this post, I highlighted some of the capabilities of EMR Notebooks such as the ability to change clusters, monitor Spark jobs from each notebook cell, control user permission, and categorize resource costs. You can do this by using notebook and cluster tags and setting up user-impersonation to track notebook user actions.

Oh, and by the way, there is no additional charge for using EMR Notebooks; you pay only for the use of the EMR cluster, as usual!

If you have questions or suggestions, feel free to leave a comment.

About the Authors

Vignesh Rajamani is a senior product manager for EMR at AWS.

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.


You generally write unit tests for your code, but do you also test your data? Incorrect or malformed data can have a large impact on production systems. Examples of data quality issues are:

  • Missing values can lead to failures in production systems that require non-null values (for example, a NullPointerException).
  • Changes in the distribution of data can lead to unexpected outputs of machine learning models.
  • Aggregations of incorrect data can lead to wrong business decisions.

In this blog post, we introduce Deequ, an open source tool developed and used at Amazon. Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (think billions of rows) that typically live in a distributed filesystem or a data warehouse.

Deequ at Amazon

Deequ is being used internally at Amazon for verifying the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues do not propagate to consumer data pipelines, reducing their blast radius.

Overview of Deequ

To use Deequ, let’s look at its main components (also shown in Figure 1).

  • Metrics Computation — Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon S3, and to compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint Verification — As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint Suggestion — You can choose to define your own custom data quality constraints, or use the automated constraint suggestion methods that profile the data to infer useful constraints.

Figure 1: Overview of Deequ components

Setup: Launch the Spark cluster

This section shows the steps to use Deequ on your own data. First, set up Spark and Deequ on an Amazon EMR cluster. Then, load a sample dataset provided by AWS, run some analysis, and then run data tests.

Deequ is built on top of Apache Spark to support fast, distributed calculations on large datasets. Deequ depends on Spark version 2.2.0 or later. As a first step, create a cluster with Spark on Amazon EMR. Amazon EMR takes care of the configuration of Spark for you. Also, you can use the EMR File System (EMRFS) to directly access data in Amazon S3. For testing, you can also install Spark on a single machine in standalone mode.

Connect to the Amazon EMR master node using SSH. Load the latest Deequ JAR from Maven Repository. To load the JAR of version 1.0.1, use the following:

wget http://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.1.jar

Launch Spark Shell and use the spark.jars argument for referencing the Deequ JAR file:

spark-shell --conf spark.jars=deequ-1.0.1.jar

For more information about how to set up Spark, see the Spark Quick Start guide, and the overview of Spark configuration options.

Load data

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. Let’s load the dataset containing reviews for the category “Electronics” in Spark. Make sure to enter the code in the Spark shell:

val dataset = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")

You can see the following selected attributes if you run dataset.printSchema() in the Spark shell:

root
|-- marketplace: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: integer (nullable = true)
|-- helpful_votes: integer (nullable = true)
|-- total_votes: integer (nullable = true)
|-- vine: string (nullable = true)
|-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. Deequ supports the following metrics (they are defined in this Deequ package):

Metric Description Usage Example

ApproxCountDistinct Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. ApproxCountDistinct("review_id")
ApproxQuantile Approximate quantile of a distribution. ApproxQuantile("star_rating", quantile = 0.5)
ApproxQuantiles Approximate quantiles of a distribution. ApproxQuantiles("star_rating", quantiles = Seq(0.1, 0.5, 0.9))
Completeness Fraction of non-null values in a column. Completeness("review_id")
Compliance Fraction of rows that comply with the given column constraint. Compliance("top star_rating", "star_rating >= 4.0")
Correlation Pearson correlation coefficient, measures the linear correlation between two columns. The result is in the range [-1, 1], where 1 means positive linear correlation, -1 means negative linear correlation, and 0 means no correlation. Correlation("total_votes", "star_rating")
CountDistinct Number of distinct values. CountDistinct("review_id")
DataType Distribution of data types such as Boolean, Fractional, Integral, and String. The resulting histogram allows filtering by relative or absolute fractions. DataType("year")
Distinctness Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least once. Example: [a, a, b] contains two distinct values a and b, so distinctness is 2/3. Distinctness("review_id")
Entropy Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). Example: [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. Entropy("star_rating")
Maximum Maximum value. Maximum("star_rating")
Mean Mean value; null values are excluded. Mean("star_rating")
Minimum Minimum value. Minimum("star_rating")
MutualInformation Mutual information describes how much information about one column (one random variable) can be inferred from another column (another random variable). If the two columns are independent, mutual information is zero. If one column is a function of the other column, mutual information is the entropy of the column. Mutual information is symmetric and nonnegative. MutualInformation(Seq("total_votes", "star_rating"))
PatternMatch Fraction of rows that comply with a given regular expression. PatternMatch("marketplace", pattern = raw"\w{2}".r)
Size Number of rows in a DataFrame. Size()
Sum Sum of all values of a column. Sum("total_votes")
UniqueValueRatio Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly once; distinct values occur at least once. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2. UniqueValueRatio("star_rating")
Uniqueness Fraction of unique values over the number of all values of a column. Unique values occur exactly once. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. Uniqueness("star_rating")

In the following example, we show how to use the AnalysisRunner to define the metrics you are interested in. You can run the following code in the Spark shell by either just pasting it in the shell or by saving it in a local file on the master node and loading it in the Spark shell with the following command:

:load PATH_TO_FILE

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

val analysisResult: AnalyzerContext = { AnalysisRunner
  // data to run the analysis on
  .onData(dataset)
  // define analyzers that compute metrics
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("review_id"))
  .addAnalyzer(ApproxCountDistinct("review_id"))
  .addAnalyzer(Mean("star_rating"))
  .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))
  .addAnalyzer(Correlation("total_votes", "star_rating"))
  .addAnalyzer(Correlation("total_votes", "helpful_votes"))
  // compute metrics
  .run()
}

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)

The resulting data frame contains the calculated metrics (call metrics.show() in the Spark shell):

name instance value
ApproxCountDistinct review_id 3010972
Completeness review_id 1
Compliance top star_rating 0.74941
Correlation helpful_votes,total_votes 0.99365
Correlation total_votes,star_rating -0.03451
Mean star_rating 4.03614
Size * 3120938

We can learn that:

  • review_id has no missing values and approximately 3,010,972 unique values.
  • 74.9 % of reviews have a star_rating of 4 or higher.
  • total_votes and star_rating are not correlated.
  • helpful_votes and total_votes are strongly correlated.
  • The average star_rating is 4.0.
  • The dataset contains 3,120,938 reviews.
Define and Run Tests for Data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add Checks on attributes of the data. In this example, we test for the following properties of our data:

  • There are at least 3 million rows in total.
  • review_id is never NULL.
  • review_id is unique.
  • star_rating has a minimum of 1.0 and a maximum of 5.0.
  • marketplace only contains “US”, “UK”, “DE”, “JP”, or “FR”.
  • year does not contain negative values.

This is the code that reflects the previous statements. For information about all available checks, see this GitHub repository. You can run this directly in the Spark shell as previously explained:

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult: VerificationResult = { VerificationSuite()
  // data to run the verification on
  .onData(dataset)
  // define a data quality check
  .addCheck(
    Check(CheckLevel.Error, "Review Check") 
      .hasSize(_ >= 3000000) // at least 3 million rows
      .hasMin("star_rating", _ == 1.0) // min is 1.0
      .hasMax("star_rating", _ == 5.0) // max is 5.0
      .isComplete("review_id") // should never be NULL
      .isUnique("review_id") // should not contain duplicates
      .isComplete("marketplace") // should never be NULL
      // contains only the listed values
      .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
      .isNonNegative("year")) // should not contain negative values
  // compute metrics and verify check conditions
  .run()
}

// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)

After calling run, Deequ translates your test description into a series of Spark jobs, which are executed to compute metrics on the data. Afterwards, it invokes your assertion functions (e.g., _ == 1.0 for the minimum star-rating check) on these metrics to see if the constraints hold on the data.

Call resultDataFrame.show(truncate=false) in the Spark shell to inspect the result. The resulting table shows the verification result for every test, for example:

constraint constraint_status constraint_message
SizeConstraint(Size(None)) Success
MinimumConstraint(Minimum(star_rating,None)) Success
MaximumConstraint(Maximum(star_rating,None)) Success
CompletenessConstraint(Completeness(review_id,None)) Success
UniquenessConstraint(Uniqueness(List(review_id))) Failure Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) Success
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN (‘US’,’UK’,’DE’,’JP’,’FR’),None)) Success
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) Success

Interestingly, the review_id column is not unique, which resulted in a failure of the check on uniqueness.

We can also look at all the metrics that Deequ computed for this check:

VerificationResult.successMetricsAsDataFrame(spark, verificationResult).show(truncate=false)

Result:

name instance value
Completeness review_id 1
Completeness marketplace 1
Compliance marketplace contained in US,UK,DE,JP,FR 1
Compliance year is non-negative 1
Maximum star_rating 5
Minimum star_rating 1
Size * 3120938
Uniqueness review_id 0.99266
Automated Constraint Suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see this GitHub repository.

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method

// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
  // data to suggest constraints for
  .onData(dataset)
  // default set of rules for constraint suggestion
  .addConstraintRules(Rules.DEFAULT)
  // run data profiling and constraint suggestion
  .run()
}

// We can now investigate the constraints that Deequ suggested. 
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap { 
  case (column, suggestions) => 
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    } 
}.toSeq.toDS()

The result contains a list of constraints with descriptions and Scala code, so that you can directly apply it in your data quality checks. Call suggestionDataFrame.show(truncate=false) in the Spark shell to inspect the suggested constraints; here we show a subset:

column constraint scala code
customer_id ‘customer_id’ is not null .isComplete("customer_id")
customer_id ‘customer_id’ has type Integral .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id ‘customer_id’ has no negative values .isNonNegative("customer_id")
helpful_votes ‘helpful_votes’ is not null .isComplete("helpful_votes")
helpful_votes ‘helpful_votes’ has no negative values .isNonNegative("helpful_votes")
marketplace ‘marketplace’ has value range ‘US’, ‘UK’, ‘DE’, ‘JP’, ‘FR’ .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
product_title ‘product_title’ is not null .isComplete("product_title")
star_rating ‘star_rating’ is not null .isComplete("star_rating")
star_rating ‘star_rating’ has no negative values .isNonNegative("star_rating")
vine ‘vine’ has value range ‘N’, ‘Y’ .isContainedIn("vine", Array("N", "Y"))

Note that the constraint suggestion is based on heuristic rules and assumes that the data it is shown is correct, which might not be the case. We recommend reviewing the suggestions before applying them in production.

More Examples on GitHub

You can find examples of more advanced features at Deequ’s GitHub page:

  • Deequ not only provides data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the..

Many customers use Amazon EMR to run big data workloads, such as Apache Spark and Apache Hive queries, in their development environment. Data analysts and data scientists frequently use these types of clusters, known as analytics EMR clusters. Users often forget to terminate the clusters after their work is done. This leads to clusters running idle and, in turn, adds unnecessary cost.

To avoid this overhead, you must track the idleness of the EMR cluster and terminate it if it is running idle for long hours. There is the Amazon EMR native IsIdle Amazon CloudWatch metric, which determines the idleness of the cluster by checking whether there’s a YARN job running. However, you should consider additional metrics, such as SSH users connected or Presto jobs running, to determine whether the cluster is idle. Also, when you execute any Spark jobs in Apache Zeppelin, the IsIdle metric remains active (1) for long hours, even after the job is finished executing. In such cases, the IsIdle metric is not ideal in deciding the inactivity of a cluster.

In this blog post, we propose a solution to cut down this overhead cost. We implemented a bash script to be installed in the master node of the EMR cluster, and the script is scheduled to run every 5 minutes. The script monitors the clusters and sends a CUSTOM metric EMR-INUSE (0=inactive; 1=active) to CloudWatch every 5 minutes. If CloudWatch receives 0 (inactive) for some predefined set of data points, it triggers an alarm, which in turn executes an AWS Lambda function that terminates the cluster.

Prerequisites

You must have the following before you can create and deploy this framework:

Note: This solution is designed as an additional feature. It can be applied to any existing EMR clusters by executing the scheduler script (explained later in the post) as an EMR step. If you want to implement this solution as a mandatory feature for your future clusters, you can include the EMR step as part of your cluster deployment. You can also apply this solution to EMR clusters that are spun up through AWS CloudFormation, the AWS CLI, and even the AWS Management Console.

Key components

The following are the key components of the solution.

Analytics EMR cluster

Amazon EMR provides a managed Apache Hadoop framework that lets you easily process large amounts of data across dynamically scalable Amazon EC2 instances. Data scientists use analytics EMR clusters for data analysis, machine learning using notebook applications (such as Apache Zeppelin or JupyterHub), and running big data workloads based on Apache Spark, Presto, etc.

Scheduler script

The schedule_script.sh is the shell script to be executed as an Amazon EMR step. When executed, it copies the monitoring script from the Amazon S3 artifacts folder and schedules the monitoring script to run every 5 minutes. The S3 location of the monitoring script should be passed as an argument.

Monitoring script

The pushShutDownMetrin.sh script is a monitoring script that is implemented using shell commands. It should be installed in the master node of the EMR cluster as an Amazon EMR step. The script is scheduled to run every 5 minutes and sends the cluster activity status to CloudWatch.  

JupyterHub API token script

The jupyterhub_addAdminToken.sh script is a shell script to be executed as an Amazon EMR step if JupyterHub is enabled on the cluster. In our design, the monitoring script uses REST APIs provided by JupyterHub to check whether the application is in use.

To send the request to JupyterHub, you must pass an API token along with the request. By default, the application does not generate API tokens. This script generates the API token and assigns it to the admin user, which is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.

Custom CloudWatch metric

All Amazon EMR clusters send data for several metrics to CloudWatch. Metrics are updated every 5 minutes, automatically collected, and pushed to CloudWatch. For this use case, we created the Amazon EMR metric EMR-INUSE. This metric represents the active status of the cluster based on the module checks implemented in the monitoring script. The metric is set to 0 when the cluster is inactive and 1 when active.
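The monitoring script itself is a shell script, but the CloudWatch call it makes can be sketched in Python as follows. The namespace matches the one used in the alarm section of this post; the dimension name is an assumption.

import boto3

cloudwatch = boto3.client("cloudwatch")

def push_inuse_metric(job_flow_id, in_use):
    # Publish 1 when the cluster is active, 0 when it is idle.
    cloudwatch.put_metric_data(
        Namespace="EMRShutdown/Cluster-Metric",
        MetricData=[
            {
                "MetricName": "EMR-INUSE",
                "Dimensions": [{"Name": "JobFlowId", "Value": job_flow_id}],  # assumed dimension
                "Value": 1 if in_use else 0,
            }
        ],
    )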

Amazon CloudWatch

CloudWatch is a monitoring service that you can use to set high-resolution alarms to take automated actions. In this case, CloudWatch triggers an alarm if it receives 0 continuously for the configured number of hours.

AWS Lambda

Lambda is a serverless technology that lets you run code without provisioning or managing servers. With Lambda, you can run code for virtually any type of application or backend service—all with zero administration. You can set up your code to automatically trigger from other AWS services. In this case, the triggered CloudWatch alarm mentioned earlier signals Lambda to terminate the cluster.

Architectural diagram

The following diagram illustrates the sequence of events when the solution is enabled, showing what happens to the EMR cluster that is spun up via AWS CloudFormation.

 

The diagram shows the following steps:

  1. The AWS CloudFormation stack is launched to spin up an EMR cluster.
  2. The Amazon EMR step is executed (installs the pushShutDownMetric.sh and then schedules it as a cron job to run every 5 minutes).
  3. If the EMR cluster is active (executing jobs), the master node sets the EMR-INUSE metric to 1 and sends it to CloudWatch.
  4. If the EMR cluster is inactive, the master node sets the EMR-INUSE metric to 0 and sends it to CloudWatch.
  5. On receiving 0 for a predefined number of data points, CloudWatch triggers a CloudWatch alarm.
  6. The CloudWatch alarm sends notification to AWS Lambda to terminate the cluster.
  7. AWS Lambda executes the Lambda function.
  8. The Lambda function then deletes all the stack resources associated with the cluster.
  9. Finally, the EMR cluster is terminated, and the Stack ID is removed from AWS CloudFormation.
Modules in the monitoring script

Following are the different activity checks that are implemented in the monitoring script (pushShutDownMetric.sh). The script is designed in a modular fashion so that you can easily include new modules without modifying the core functionality.

ActiveSSHCheck

The ActiveSSHCheck module checks whether there are any active SSH connections to the master node. If there is an active SSH connection, and it’s idle for less than 10 minutes, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch.

YARNCheck

Apache Hadoop YARN is the resource manager of the EMR Hadoop ecosystem. All Spark submits and Hive queries reach YARN initially, and it then schedules and processes these jobs. The YARNCheck module checks whether there are any running jobs in YARN or jobs completed within the last 5 minutes. If it finds any, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The checks are performed by calling REST APIs exposed by YARN.

The API to fetch the running jobs is http://localhost:8088/ws/v1/cluster/apps?state=RUNNING.

The API to fetch the completed jobs is

http://localhost:8088/ws/v1/cluster/apps?state=FINISHED.
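As a rough illustration, the running-jobs check can be expressed in Python as follows. The monitoring script implements the same idea in shell, and the check for recently finished jobs is omitted here for brevity.

import json
import urllib.request

def yarn_has_running_jobs():
    # YARN returns {"apps": null} when nothing is running.
    url = "http://localhost:8088/ws/v1/cluster/apps?state=RUNNING"
    with urllib.request.urlopen(url) as response:
        body = json.loads(response.read().decode("utf-8"))
    apps = body.get("apps") or {}
    return bool(apps.get("app"))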

PRESTOCheck

Presto is an open-source distributed query engine for running interactive analytic queries. It is included in EMR release version 5.0.0 and later. The PRESTOCheck module checks whether there are any running Presto queries or if any queries have been completed within the last 5 minutes. If there are, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling REST APIs exposed by the Presto server.

The API to fetch the Presto jobs is http://localhost:8889/v1/query.

ZeppelinCheck

Amazon EMR users use Apache Zeppelin as a notebook for interactive data exploration. The ZeppelinCheck module checks whether there are any jobs running or if any have been completed within the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling the REST APIs exposed by Zeppelin.

The API to fetch the list of notebook IDs is http://localhost:8890/api/notebook.

The API to fetch the status of each cell inside each notebook ID is http://localhost:8890/api/notebook/job/$notebookID.

JupyterHubCheck

Jupyter Notebook is an open-source web application that you can use to create and share documents that contain live code, equations, visualizations, and narrative text. JupyterHub allows you to host multiple instances of a single-user Jupyter notebook server. The JupyterHubCheck module checks whether any Jupyter notebook is currently in use.

The function uses REST APIs exposed by JupyterHub to fetch the list of Jupyter notebook users and gathers the data about individual notebook servers. From the response, it extracts the last activity time of the servers and checks whether any server was used in the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The jupyterhub_addAdminToken.sh script needs to be executed as an EMR step before enabling the scheduler script.

The API to fetch the list of notebook users is https://localhost:9443/hub/api/users -H "Authorization: token $admin_token".

The API to fetch individual server information is https://localhost:9443/hub/api/users/$user -H "Authorization: token $admin_token".

If none of these checks detects activity, the cluster is considered inactive, and the monitoring script sets the EMR-INUSE metric to 0 and pushes it to CloudWatch.

Note:

The scheduler script schedules the monitoring script (pushShutDownMetric.sh) to run every 5 minutes. Internal cron jobs that run for only a few minutes are not considered when calibrating the EMR-INUSE metric.

Deploying each component

Follow the steps in this section to deploy each component of the proposed design.

Step 1. Create the Lambda function and SNS subscription

The Lambda function and the SNS subscription are the core components of the design. You must set up these components initially, and they are common for every cluster. The following are the AWS resources to be created for these components:

  • Execution role for the Lambda function
  • Terminate Idle EMR Lambda function
  • SNS topic and Lambda subscription

For one-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

Parameter Default Description
s3Bucket emr-shutdown-blogartifacts The name of the S3 bucket that contains the Lambda file
s3Key EMRTerminate.zip The Amazon S3 key of the Lambda file

For manual deployment, follow these steps on the AWS Management Console.

Execution role for the Lambda function
  1. Open the AWS Identity and Access Management (IAM) console and choose Policies, Create policy.
  2. Choose the JSON tab, paste the following policy text, and then choose Review policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:HeadBucket",
                "s3:ListObjects",
                "s3:GetObject",
                "cloudformation:ListStacks",
                "cloudformation:DeleteStack",
                "cloudformation:DescribeStacks",
                "cloudformation:ListStackResources",
                "elasticmapreduce:TerminateJobFlows"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "GenericAccess"
        },
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*",
            "Effect": "Allow",
            "Sid": "LogAccess"
        }
    ]
}
  3. For Name, enter TerminateEMRPolicy and choose Create policy.
  4. Choose Roles, Create role.
  5. Under Choose the service that will use this role, choose Lambda, and then choose Next: Permissions.
  6. For Attach permissions policies, choose the arrow next to Filter policies and choose Customer managed in the drop-down list.
  7. Attach the TerminateEMRPolicy policy that you just created, and choose Review.
  8. For Role name, enter TerminateEMRLambdaRole and then choose Create role.
Terminate idle EMR – Lambda function

I created a deployment package to use with this function.

  1. Open the Lambda console and choose Create function.
  2. Choose Author from scratch, and provide the details as shown in the following screenshot:
  • Name: lambdaTerminateEMR
  • Runtime: Python 2.7
  • Role: Choose an existing role
  • Existing role: TerminateEMRLambdaRole

  3. Choose Create function.
  4. In the Function code section, for Code entry type, choose Upload a file from Amazon S3, and for Runtime, choose Python 2.7.

The Lambda function S3 link URL is

s3://emr-shutdown-blogartifacts/EMRTerminate.zip.

Link to the function: https://s3.amazonaws.com/emr-shutdown-blogartifacts/EMRTerminate.zip

This Lambda function is triggered by a CloudWatch alarm. It parses the input event, retrieves the JobFlowId, and deletes the AWS CloudFormation stack of the corresponding JobFlowId.
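A hedged sketch of that behavior is shown below. The published EMRTerminate.zip package may differ in its details; in particular, matching the stack by a JobFlowId output is an assumption.

import json
import boto3

cloudformation = boto3.client("cloudformation")

def lambda_handler(event, context):
    # The CloudWatch alarm notification arrives wrapped in an SNS message.
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    dimensions = message["Trigger"]["Dimensions"]
    job_flow_id = next((d["value"] for d in dimensions if d["name"] == "JobFlowId"), None)

    # Delete the stack whose outputs reference this cluster, which also
    # terminates the EMR cluster that the stack owns.
    for stack in cloudformation.describe_stacks()["Stacks"]:
        outputs = stack.get("Outputs", [])
        if any(o.get("OutputValue") == job_flow_id for o in outputs):
            cloudformation.delete_stack(StackName=stack["StackName"])
            return {"deleted_stack": stack["StackName"]}
    return {"deleted_stack": None}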

SNS topic and Lambda subscription

For setting the CloudWatch alarm in the further stages, you must create an Amazon SNS topic that notifies the preceding Lambda function to execute. Follow these steps to create an SNS topic and configure the Lambda endpoint.

  1. Navigate to the Amazon SNS console, and choose Create topic.
  2. Enter the Topic name and Display name, and choose Create topic.
  3. The topic is created and displayed in the Topics list.
  4. Select the topic and choose Actions, Subscribe to topic.
  5. In the Create subscription dialog box, choose AWS Lambda as the protocol, choose lambdaTerminateEMR as the endpoint, and choose Create subscription.
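
These console steps can also be scripted. The boto3 sketch below creates the topic, grants SNS permission to invoke the function, and subscribes the function to the topic; the topic name matches the one used later in this post, and the function ARN is a placeholder.

import boto3

sns = boto3.client("sns")
lambda_client = boto3.client("lambda")

function_arn = "arn:aws:lambda:<region>:<account-id>:function:lambdaTerminateEMR"

# Create the topic (shutDownEMRTest is the topic name used later in this post).
topic_arn = sns.create_topic(Name="shutDownEMRTest")["TopicArn"]

# Allow SNS to invoke the Lambda function.
lambda_client.add_permission(
    FunctionName="lambdaTerminateEMR",
    StatementId="AllowSNSInvoke",
    Action="lambda:InvokeFunction",
    Principal="sns.amazonaws.com",
    SourceArn=topic_arn,
)

# Subscribe the function to the topic.
sns.subscribe(TopicArn=topic_arn, Protocol="lambda", Endpoint=function_arn)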
Step 2. Execute the JupyterHub API token script as an EMR step

This step is required only when JupyterHub is enabled in the cluster.

Navigate to the EMR cluster to be monitored, and execute the scheduler script as an EMR step.

Command: s3://emr-shutdown-blogartifacts/jupyterhub_addAdminToken.sh

This script generates an API token and assigns it to the admin user. The token is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.
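
One way to submit the script as an EMR step without the console is through boto3. The script-runner pattern below is a common way to run a shell script from S3 and is an assumption here, not something spelled out in the post; the JobFlowId is a placeholder, and the script-runner.jar region prefix should match the cluster's region.

import boto3

emr = boto3.client("emr")

emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # the cluster to be monitored
    Steps=[{
        "Name": "Add JupyterHub admin API token",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            # script-runner.jar downloads and executes a shell script from S3.
            "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
            "Args": ["s3://emr-shutdown-blogartifacts/jupyterhub_addAdminToken.sh"],
        },
    }],
)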

Step 3. Execute the scheduler script as an EMR step

Navigate to the EMR cluster to be monitored and execute the scheduler script as an EMR step.

Note: Ensure that termination protection is disabled on the cluster. If termination protection is enabled, the Lambda function fails when it tries to terminate the cluster.

Command: s3://emr-shutdown-blogartifacts/schedule_script.sh

Parameter: s3://emr-shutdown-blogartifacts/pushShutDownMetrin.sh

This step copies the pushShutDownMetrin.sh script to the master node and schedules it to run every 5 minutes.

The schedule_script.sh script is available at https://s3.amazonaws.com/emr-shutdown-blogartifacts/schedule_script.sh, and the pushShutDownMetrin.sh script is available at https://s3.amazonaws.com/emr-shutdown-blogartifacts/pushShutDownMetrin.sh.
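
To make the next step concrete, this is roughly how a script running on the master node could publish the custom metric that the alarm watches. The namespace, metric name, and dimension match what appears in Step 4; everything else is an illustrative assumption about what pushShutDownMetrin.sh does, not its actual contents.

import boto3

cloudwatch = boto3.client("cloudwatch")

def push_idle_metric(job_flow_id, is_emr_used):
    """Publish 1 when the cluster is busy, 0 when it is idle."""
    cloudwatch.put_metric_data(
        Namespace="EMRShutdown/Cluster-Metric",
        MetricData=[{
            "MetricName": "isEMRUsed",
            "Dimensions": [{"Name": "JobFlowId", "Value": job_flow_id}],
            "Value": 1 if is_emr_used else 0,
            "Unit": "Count",
        }],
    )

# Example: report that cluster j-XXXXXXXXXXXXX is currently idle.
push_idle_metric("j-XXXXXXXXXXXXX", is_emr_used=False)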

Step 4. Create a CloudWatch alarm

For single-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

  • AlarmName (default: TerminateIDLE-EMRAlarm) - The name of the alarm.
  • EMRJobFlowID (requires input) - The JobFlowId of the cluster.
  • EvaluationPeriod (requires input) - The idle timeout, expressed in data points (1 data point equals 5 minutes). For example, to terminate the cluster after it has been idle for 20 minutes, enter 4.
  • SNSSubscribeTopic (requires input) - The Amazon Resource Name (ARN) of the SNS topic to trigger when the alarm fires.

The AWS CloudFormation CLI command is as follows:

aws cloudformation create-stack --stack-name EMRAlarmStack \
      --template-url https://s3.amazonaws.com/emr-shutdown-blogartifacts/Cloudformation/alarm.json \
      --parameters ParameterKey=AlarmName,ParameterValue=TerminateIDLE-EMRAlarm \
                   ParameterKey=EMRJobFlowID,ParameterValue=<Input> \
                   ParameterKey=EvaluationPeriod,ParameterValue=4 \
                   ParameterKey=SNSSubscribeTopic,ParameterValue=<Input>

For manual deployment, follow these steps to create the alarm.

  1. Open the Amazon CloudWatch console and choose Alarms.
  2. Choose Create Alarm.
  3. On the Select Metric page, under Custom Metrics, choose EMRShutdown/Cluster-Metric.
  4. Choose the isEMRUsed metric for the EMR JobFlowId, and then choose Next.
  5. Define the alarm as required. In this case, the alarm is set to send a notification to the SNS topic shutDownEMRTest when CloudWatch receives the isEMRUsed metric as 0 for every data point in the last 2 hours.
  6. Choose Create Alarm.
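
The same alarm can also be created with a single boto3 call. The sketch below mirrors the configuration just described (isEMRUsed equal to 0 for every 5-minute data point over 2 hours, notifying the shutDownEMRTest topic); the statistic, threshold, and ARN placeholders are reasonable assumptions rather than an exact copy of the post's CloudFormation template.

import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="TerminateIDLE-EMRAlarm",
    Namespace="EMRShutdown/Cluster-Metric",
    MetricName="isEMRUsed",
    Dimensions=[{"Name": "JobFlowId", "Value": "j-XXXXXXXXXXXXX"}],
    Statistic="Maximum",                       # assumption: any activity in the period counts as "used"
    Period=300,                                # one data point every 5 minutes
    EvaluationPeriods=24,                      # 24 data points = 2 hours of idleness
    Threshold=0,
    ComparisonOperator="LessThanOrEqualToThreshold",
    AlarmActions=["arn:aws:sns:<region>:<account-id>:shutDownEMRTest"],
)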
Summary

In this post, we focused on building a framework to cut down the additional cost that you might incur when an EMR cluster sits idle. The modules implemented in the shell script, which track the execution status of Spark jobs and Hive/Presto queries through lightweight REST API calls, make this approach an efficient solution.

If you have questions or suggestions, please comment below.

About the Author

Praveen Krishnamoorthy Ravikumar is an associate big data consultant with Amazon Web Services.
