Amazon Web Services brings you the latest updates and news on Big Data. The AWS mission is to enable developers and businesses to use web services to easily build sophisticated, scalable applications.
Amazon EMR empowers many customers to build big data processing applications quickly and cost-effectively, using popular distributed frameworks such as Apache Spark, Apache HBase, Presto, and Apache Flink. For organizations that are crafting their analytical applications on Amazon EMR, there is a growing need to keep their data assets organized in an automated fashion. Because datasets tend to grow exponentially, using cataloging tools is essential to automating data discovery and organizing data assets.
AWS Glue Data Catalog provides this essential capability, allowing you to automatically discover and catalog metadata about your data stores in a central repository. Since Amazon EMR 5.8.0, customers have been using the AWS Glue Data Catalog as a metadata store for Apache Hive and Spark SQL applications that are running on Amazon EMR. Starting with Amazon EMR 5.10.0, you can catalog datasets using AWS Glue and run queries using Presto on Amazon EMR from the Hue (Hadoop User Experience) and Apache Zeppelin UIs.
You might wonder what scenarios warrant using Presto running on Amazon EMR and when to choose Amazon Athena (which uses Presto as the query engine under the hood). Both are excellent tools for querying massive amounts of data, and they address different needs and use cases.
Amazon Athena provides the easiest way to run interactive queries for data in Amazon S3 without needing to set up or manage any servers. Presto running on Amazon EMR gives you much more flexibility in how you configure and run your queries, providing the ability to federate to other data sources if needed. For example, you might have a use case that requires LDAP authentication for clients such as the Presto CLI or JDBC/ODBC drivers. Or you might have a workflow where you need to join data between different systems like MySQL/Amazon Redshift/Apache Cassandra and Hive. In these examples, Presto running on Amazon EMR is the right tool to use because it can be configured to enable LDAP authentication in addition to the desired database connectors at cluster launch.
Now, let’s look at how metadata management for Presto works with AWS Glue.
Using an AWS Glue crawler to discover datasets
The AWS Glue Data Catalog is a reference to the location, schema, and runtime metrics of your datasets. To create this reference metadata, AWS Glue needs to crawl your datasets. In this exercise, we use an AWS Glue crawler to populate tables in the Data Catalog for the NYC taxi rides dataset.
The following are the steps for adding a crawler:
Sign in to the AWS Management Console, and open the AWS Glue console. In the navigation pane, choose Crawlers. Then choose Add crawler.
On the Add a data store page, specify the location of the NYC taxi rides dataset.
In the next step, choose an existing IAM role if one is available, or create a new role. Then choose Next.
On the scheduling page, for Frequency, choose Run on demand.
On the Configure the crawler’s output page, choose Add database. Specify blog-db as the database name. (You can specify a name of your choice, but be sure to choose the correct database name when running queries.)
Follow the remaining steps using the default values to create a crawler. Then select the crawler and choose Run crawler.
When the crawler returns to the Ready state, choose Databases in the navigation pane. Choose blog-db from the list of databases, or search for it by specifying it as a filter, as shown in the following screenshot. Then choose Tables. You should see the three tables created by the crawler, as follows.
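If you prefer to script the crawler instead of clicking through the console, the console steps map onto the parameters of the AWS Glue CreateCrawler API. The following is a minimal sketch that assumes boto3 as the client library; the crawler name, IAM role ARN, and S3 path are hypothetical placeholders. No Schedule is set, so the crawler runs on demand, as in the console steps.

```python
from typing import Any, Dict


def build_crawler_request(name: str, role_arn: str, database: str,
                          s3_path: str) -> Dict[str, Any]:
    """Build keyword arguments for the AWS Glue CreateCrawler API.

    Omitting "Schedule" leaves the crawler in run-on-demand mode.
    """
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,  # must match the database you query later
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


if __name__ == "__main__":
    params = build_crawler_request(
        name="nyc-taxi-crawler",                                    # hypothetical
        role_arn="arn:aws:iam::123456789012:role/GlueCrawlerRole",  # hypothetical
        database="blog-db",
        s3_path="s3://your-bucket/nyc-taxi/",                       # hypothetical
    )
    print(params)
    # With boto3 installed and credentials configured, you could then run:
    #   import boto3
    #   glue = boto3.client("glue")
    #   glue.create_crawler(**params)
    #   glue.start_crawler(Name=params["Name"])
```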
After you’ve set up the Amazon EMR cluster with Presto, the AWS Glue Data Catalog is available through a default “hive” catalog. To change between the Hive and Glue metastores, you have to manually update hive.properties and restart the Presto server. Connect to the master node on your EMR cluster using SSH, and run the Presto CLI to start running queries interactively.
$ presto-cli --catalog hive
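The metastore switch mentioned above comes down to one property in the Presto Hive catalog file. On Amazon EMR this file is typically /etc/presto/conf/catalog/hive.properties, where hive.metastore=glue selects the AWS Glue Data Catalog and hive.metastore=thrift selects the Hive metastore; verify the path and values against the EMR documentation for your release. The following minimal sketch rewrites that property in the file contents (you still need to restart the Presto server afterward). It also shows, as an assumption to verify, the launch-time equivalent as an EMR configuration using the presto-connector-hive classification.

```python
from typing import Dict, List

# Launch-time equivalent as an EMR configuration classification.
# Verify the classification name for your EMR release before using it.
GLUE_METASTORE_CONFIG: List[Dict[str, object]] = [
    {
        "Classification": "presto-connector-hive",
        "Properties": {"hive.metastore": "glue"},
    }
]


def set_metastore(properties_text: str, metastore: str) -> str:
    """Return hive.properties content with hive.metastore set to `metastore`.

    An existing hive.metastore line is replaced (duplicates are dropped);
    if none exists, one is appended. All other lines are kept unchanged.
    """
    out = []
    found = False
    for line in properties_text.splitlines():
        key = line.split("=", 1)[0].strip()
        if key == "hive.metastore":
            if not found:
                out.append(f"hive.metastore={metastore}")
            found = True
        else:
            out.append(line)
    if not found:
        out.append(f"hive.metastore={metastore}")
    return "\n".join(out) + "\n"
```

For example, set_metastore("connector.name=hive-hadoop2\nhive.metastore=thrift\n", "glue") returns the same file with the metastore line changed to hive.metastore=glue.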
Begin with a simple query to sample a few rows:
presto> SELECT * FROM "blog-db".taxi LIMIT 10;
The query shows a few sample rows as follows:
Next, query the average fare for trips at each hour of the day on the Parquet version of the taxi dataset:
presto> SELECT EXTRACT(HOUR FROM pickup_datetime) AS hour, avg(fare_amount) AS average_fare FROM "blog-db".taxi_parquet GROUP BY 1 ORDER BY 1;
The following image shows the results:
More interestingly, you can count the trips in each tip category, where the tip is under 10 percent, 10 to 15 percent, 15 to 20 percent, or 20 percent or more of the total amount:
presto> -- Tip Percent Category
SELECT TipPrctCtgry
, COUNT (DISTINCT TripID) TripCt
FROM
(SELECT TripID
, (CASE
WHEN fare_prct < 0.7 THEN 'FL70'
WHEN fare_prct < 0.8 THEN 'FL80'
WHEN fare_prct < 0.9 THEN 'FL90'
ELSE 'FL100' -- assumed label
END) AS FarePrctCtgry
, (CASE
WHEN tip_prct < 0.1 THEN 'TL10'
WHEN tip_prct < 0.15 THEN 'TL15'
WHEN tip_prct < 0.2 THEN 'TL20'
ELSE 'TG20' -- assumed label
END) AS TipPrctCtgry
FROM
(SELECT TripID
, (fare_amount / total_amount) as fare_prct
, (extra / total_amount) as extra_prct
, (tip_amount / total_amount) as tip_prct
, (mta_tax / total_amount) as mta_taxprct
, (tolls_amount / total_amount) as tolls_prct
, (improvement_surcharge / total_amount) as imprv_suchrgprct
FROM
(SELECT *
, (cast(pickup_longitude AS VARCHAR(100)) || '_' || cast(pickup_latitude AS VARCHAR(100))) as TripID
FROM "blog-db".taxi_parquet -- assumed: same table as the previous query
WHERE total_amount > 0
) as t
) as t
) as ct
GROUP BY TipPrctCtgry;
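To make the bucketing in the query easier to follow, here is the same tip-category logic expressed in Python over a few in-memory rows. This is an illustrative sketch only. It assumes each row is a dict keyed by the column names used in the query, and TG20 is an assumed label for tips of 20 percent or more.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping, Set


def tip_category(tip_amount: float, total_amount: float) -> str:
    """Mirror the CASE expression on tip_prct = tip_amount / total_amount."""
    tip_prct = tip_amount / total_amount
    if tip_prct < 0.1:
        return "TL10"
    if tip_prct < 0.15:
        return "TL15"
    if tip_prct < 0.2:
        return "TL20"
    return "TG20"  # assumed label for 20 percent or more


def count_trips_by_tip_category(rows: Iterable[Mapping[str, float]]) -> Dict[str, int]:
    """Count distinct TripIDs per tip category, like COUNT(DISTINCT TripID)."""
    trips: Dict[str, Set[str]] = defaultdict(set)
    for row in rows:
        if row["total_amount"] <= 0:  # WHERE total_amount > 0
            continue
        # TripID is built from the pickup coordinates, as in the query.
        trip_id = f"{row['pickup_longitude']}_{row['pickup_latitude']}"
        category = tip_category(row["tip_amount"], row["total_amount"])
        trips[category].add(trip_id)
    return {category: len(ids) for category, ids in trips.items()}
```

Note that, as in the SQL, a TripID built from pickup coordinates is not a true trip key: two trips from the same pickup point in the same category are counted once.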
The results are as follows:
While the preceding query is running, navigate to the web interface for Presto on Amazon EMR at http://master-public-dns-name:8889/. Here you can look at query metrics such as active worker nodes, number of rows read per second, reserved memory, and parallelism.
Running queries in the Presto Editor on Hue
If you installed Hue with your Amazon EMR launch, you can also run queries on Hue’s Presto Editor. On the Amazon EMR Cluster console, choose Enable Web Connection, and follow the instructions to access the web interfaces for Hue and Zeppelin.
After the web connection is enabled, choose the Hue link to open the web interface. At the login screen, if you are the administrator logging in for the first time, type a user name and password to create your Hue superuser account, and then choose Create account. Otherwise, type your user name and password and choose Sign in, or use the credentials provided by your administrator.
Choose the Presto Editor from the menu. You can run Presto queries against your tables in the AWS Glue Data Catalog.
Having a shared data catalog for applications on Amazon EMR alleviates a myriad of data-related challenges that organizations face today—including discovery, governance, auditability, and collaboration. In this post, we explored how the AWS Glue Data Catalog addresses discoverability and manageability for table metadata for Presto on Amazon EMR. Go ahead, give this a try, and share your experience with us!
Radhika Ravirala is a Solutions Architect at Amazon Web Services where she helps customers craft distributed big data applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley. She holds an M.S. in computer science from San Jose State University.
A customer has been successfully creating and running multiple Amazon Elasticsearch Service (Amazon ES) domains to support their business users’ search needs across products, orders, support documentation, and a growing suite of similar needs. The service has become heavily used across the organization. This led to some domains running at 100% capacity during peak times, while others began to run low on storage space. Because of this increased usage, the technical teams were in danger of missing their service level agreements. They contacted me for help.
This post shows how you can set up automated alarms to warn when domains need attention.
Amazon ES is a fully managed service that delivers Elasticsearch’s easy-to-use APIs and real-time analytics capabilities along with the availability, scalability, and security that production workloads require. The service offers built-in integrations with a number of other components and AWS services, enabling customers to go from raw data to actionable insights quickly and securely.
One of these other integrated services is Amazon CloudWatch. CloudWatch is a monitoring service for AWS Cloud resources and the applications that you run on AWS. You can use CloudWatch to collect and track metrics, collect and monitor log files, set alarms, and automatically react to changes in your AWS resources.
CloudWatch collects metrics for Amazon ES. You can use these metrics to monitor the state of your Amazon ES domains, and set alarms to notify you about high utilization of system resources. For more information, see Amazon Elasticsearch Service Metrics and Dimensions.
While the metrics are automatically collected, the missing piece is how to set alarms on these metrics at appropriate levels for each of your domains. This post includes sample Python code to evaluate the current state of your Amazon ES environment, and to set up alarms according to AWS recommendations and best practices.
There are two components to the sample solution:
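es-check-cwalarms.py: This Python script checks the CloudWatch alarms that have been set, for all Amazon ES domains in a given account and region.
es-create-cwalarms.py: This Python script creates (or updates) a recommended set of CloudWatch alarms for a specified Amazon ES domain.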
The sample code can also be found in the amazon-es-check-cw-alarms GitHub repo. The scripts are easy to extend or combine, as described in the section “Extensions and Adaptations”.
Assessing the current state
The first script, es-check-cwalarms.py, is used to give an overview of the configurations and alarm settings for all the Amazon ES domains in the given region. The script takes the following parameters:
python es-check-cwalarms.py -h
usage: es-check-cwalarms.py [-h] [-e ESPREFIX] [-n NOTIFY] [-f FREE] [-p PROFILE] [-r REGION]
Checks a set of recommended CloudWatch alarms for Amazon Elasticsearch Service domains (optionally, those beginning with a given prefix).
-h, --help show this help message and exit
-e ESPREFIX, --esprefix ESPREFIX Only check Amazon Elasticsearch Service domains that begin with this prefix.
-n NOTIFY, --notify NOTIFY List of CloudWatch alarm actions; e.g. ['arn:aws:sns:xxxx']
-f FREE, --free FREE Minimum free storage (MB) on which to alarm
-p PROFILE, --profile PROFILE IAM profile name to use
-r REGION, --region REGION AWS region for the domain. Default: us-east-1
The script first identifies all the domains in the given region (or, optionally, limits them to the subset that begins with a given prefix). It then starts running a set of checks against each one.
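The domain discovery step can be sketched as follows. This is a minimal sketch, not the code from the repo: it assumes a boto3 Amazon ES client (boto3.client("es", region_name=...)), whose list_domain_names call returns the domains in that client's region, and filters them by the optional prefix passed with -e.

```python
from typing import Any, List, Optional


def find_domains(es_client: Any, prefix: Optional[str] = None) -> List[str]:
    """Return the names of all Amazon ES domains, optionally filtered by prefix.

    `es_client` is expected to behave like boto3.client("es", region_name=...).
    """
    response = es_client.list_domain_names()
    names = [d["DomainName"] for d in response.get("DomainNames", [])]
    if prefix:
        names = [n for n in names if n.startswith(prefix)]
    return sorted(names)
```

For example, find_domains(boto3.client("es", region_name="us-west-2"), prefix="iot") would return only the domains whose names begin with iot. Passing the client in, rather than creating it inside the function, also makes the logic easy to test with a stand-in object.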
The script can be run from the command line or set up as a scheduled Lambda function. For example, one customer decided to run the script regularly to check that alarms were correctly set for all domains. In addition, because configuration changes (most commonly, increasing cluster size to accommodate larger workloads) might require alarm updates, this approach automatically identified alarms that were no longer appropriate as domain configurations changed.
The following output is for one domain in my account.
Starting checks for Elasticsearch domain iotfleet , version is 53
iotfleet Automated snapshot hour (UTC): 0
iotfleet Instance configuration: 1 instances; type:m3.medium.elasticsearch
iotfleet Instance storage definition is: 4 GB; free storage calced to: 819.2 MB
iotfleet Desired free storage set to (in MB): 819.2
iotfleet WARNING: Not using VPC Endpoint
iotfleet WARNING: Does not have Zone Awareness enabled
iotfleet WARNING: Instance count is ODD. Best practice is for an even number of data nodes and zone awareness.
iotfleet WARNING: Does not have Dedicated Masters.
iotfleet WARNING: Neither index nor search slow logs are enabled.
iotfleet WARNING: EBS not in use. Using instance storage only.
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-ClusterStatus.yellow-Alarm ClusterStatus.yellow
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-ClusterStatus.red-Alarm ClusterStatus.red
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-CPUUtilization-Alarm CPUUtilization
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-JVMMemoryPressure-Alarm JVMMemoryPressure
iotfleet WARNING: Missing alarm!! ('ClusterIndexWritesBlocked', 'Maximum', 60, 5, 'GreaterThanOrEqualToThreshold', 1.0)
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-AutomatedSnapshotFailure-Alarm AutomatedSnapshotFailure
iotfleet Alarm: Threshold does not match: Test-Elasticsearch-iotfleet-FreeStorageSpace-Alarm Should be: 819.2 ; is 3000.0
The output messages fall into the following categories:
System overview, Informational: The Amazon ES version and configuration, including instance type and number, storage, automated snapshot hour, etc.
Free storage: A calculation for the appropriate amount of free storage, based on the recommended 20% of total storage.
Warnings: best practices that are not being followed for this domain. (For more about this, read on.)
Alarms: An assessment of the CloudWatch alarms currently set for this domain, against a recommended set.
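The free-storage figure in the output follows from the 20 percent rule: 4 GB is 4,096 MB, and 20 percent of that is 819.2 MB. The same rule gives 2,048 MB for the 10-GB EBS volume used later in this post. A minimal sketch of the calculation, assuming storage is given in GB and converted with 1 GB = 1,024 MB as in the script output:

```python
def desired_free_storage_mb(total_storage_gb: float, fraction: float = 0.2) -> float:
    """Return the free-storage alarm threshold in MB (default: 20% of total)."""
    total_mb = total_storage_gb * 1024
    return total_mb * fraction


if __name__ == "__main__":
    print(desired_free_storage_mb(4))   # the 4-GB iotfleet domain
    print(desired_free_storage_mb(10))  # a 10-GB EBS volume
```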
The script contains an array of recommended CloudWatch alarms, based on best practices for these metrics and statistics. Using the array allows alarm parameters (such as free space) to be updated within the code based on current domain statistics and configurations.
For a given domain, the script checks if each alarm has been set. If the alarm is set, it checks whether the values match those in the array esAlarms. In the output above, you can see three different situations being reported:
Alarm ok; definition matches. The alarm set for the domain matches the settings in the array.
Alarm: Threshold does not match. An alarm exists, but the threshold value at which the alarm is triggered does not match.
WARNING: Missing alarm!! The recommended alarm is missing.
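The three outcomes above can be expressed as a small comparison. The following is a minimal sketch, not the repo's implementation. It assumes each recommended alarm is a tuple in the order shown in the Missing alarm!! message (metric name, statistic, period in seconds, evaluation periods, comparison operator, threshold), and that existing alarms are dicts shaped like the MetricAlarms entries returned by the CloudWatch DescribeAlarms API. The fourth result, for an alarm whose non-threshold settings differ, is an added case that does not appear in the sample output.

```python
from typing import Dict, List, Tuple

# (MetricName, Statistic, Period, EvaluationPeriods, ComparisonOperator, Threshold)
Recommended = Tuple[str, str, int, int, str, float]


def check_alarm(recommended: Recommended, existing: List[Dict]) -> str:
    """Classify one recommended alarm against a domain's existing alarms.

    `existing` holds dicts shaped like CloudWatch DescribeAlarms MetricAlarms.
    """
    metric, statistic, period, periods, operator, threshold = recommended

    def same_shape(alarm: Dict) -> bool:
        # Everything except the threshold must match the recommendation.
        return (alarm.get("Statistic") == statistic
                and alarm.get("Period") == period
                and alarm.get("EvaluationPeriods") == periods
                and alarm.get("ComparisonOperator") == operator)

    candidates = [a for a in existing if a.get("MetricName") == metric]
    if not candidates:
        return "WARNING: Missing alarm!!"
    if any(same_shape(a) and a.get("Threshold") == threshold for a in candidates):
        return "Alarm ok; definition matches."
    if any(same_shape(a) for a in candidates):
        return "Alarm: Threshold does not match."
    # Not one of the three messages in the sample output; added for completeness.
    return "Alarm: definition does not match."
```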
All in all, the list above shows that this domain does not have a configuration that adheres to best practices, nor does it have all the recommended alarms.
Setting up alarms
Now that you know that the domains in their current state are missing critical alarms, you can correct the situation.
To demonstrate the script, set up a new domain named “ver” in us-west-2, with 1 node and a 10-GB EBS disk. Also, create an SNS topic named “sendnotification” in us-west-2 that sends you an email.
Run the second script, es-create-cwalarms.py, from the command line. This script creates (or updates) the desired CloudWatch alarms for the specified Amazon ES domain, “ver”.
python es-create-cwalarms.py -r us-west-2 -e test -c ver -n "['arn:aws:sns:us-west-2:xxxxxxxxxx:sendnotification']"
EBS enabled: True type: gp2 size (GB): 10 No Iops 10240 total storage (MB)
Desired free storage set to (in MB): 2048.0
Successfully finished creating alarms!
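Creating an alarm ultimately means calling the CloudWatch PutMetricAlarm API once per recommended alarm. The following minimal sketch builds the request parameters for one alarm. It assumes the Amazon ES metrics live in the AWS/ES namespace with DomainName and ClientId (your AWS account ID) dimensions, and it reuses the alarm-name pattern visible in the earlier sample output. The actual boto3 call, cloudwatch.put_metric_alarm(**params), is left to you; the account ID and SNS topic ARN in the test values are placeholders.

```python
from typing import Any, Dict, List, Tuple

# (MetricName, Statistic, Period, EvaluationPeriods, ComparisonOperator, Threshold)
AlarmSpec = Tuple[str, str, int, int, str, float]


def build_alarm_request(env: str, domain: str, account_id: str,
                        alarm: AlarmSpec, actions: List[str]) -> Dict[str, Any]:
    """Build PutMetricAlarm parameters for one Amazon ES alarm."""
    metric, statistic, period, periods, operator, threshold = alarm
    return {
        # Same naming pattern as Test-Elasticsearch-iotfleet-CPUUtilization-Alarm.
        "AlarmName": f"{env}-Elasticsearch-{domain}-{metric}-Alarm",
        "AlarmActions": actions,  # e.g. SNS topic ARNs passed with -n
        "Namespace": "AWS/ES",
        "MetricName": metric,
        "Dimensions": [
            {"Name": "DomainName", "Value": domain},
            {"Name": "ClientId", "Value": account_id},
        ],
        "Statistic": statistic,
        "Period": period,
        "EvaluationPeriods": periods,
        "ComparisonOperator": operator,
        "Threshold": threshold,
    }
```

Because PutMetricAlarm overwrites an alarm with the same name, rerunning such a script updates existing alarms in place rather than duplicating them.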
As with the first script, this script contains an array of recommended CloudWatch alarms, based on best practices for these metrics and statistics. This approach allows you to add or modify alarms based on your use case (more on that below).
After running the script, navigate to Alarms on the CloudWatch console. You can see the set of alarms set up on your domain.
Because the “ver” domain has only a single node, its replica shards cannot be allocated, so the cluster status is yellow and that alarm is in the “ALARM” state. CloudWatch has already sent a notification that the alarm has been triggered.
In most cases, the alarm triggers due to an increased workload. The likely action is to reconfigure the system to handle the increased workload, rather than reducing the incoming workload. Reconfiguring any backend store—a category of systems that includes Elasticsearch—is best performed when the system is quiescent or lightly loaded. Reconfigurations such as setting zone awareness or modifying the disk type cause Amazon ES to enter a “processing” state, potentially disrupting client access.
Other changes, such as increasing the number of data nodes, may cause Elasticsearch to begin moving shards, potentially impacting search performance on those shards while this is happening. Consider these actions in the context of your production usage. For the same reason, I also do not recommend running a script that resets all domains to match best practices.
Avoid the need to reconfigure during heavy workload by setting alarms at a level that allows a considered approach to making the needed changes. For example, if you identify that each weekly peak is increasing, you can reconfigure during a weekly quiet period.
Although Elasticsearch can be reconfigured without being quiesced, automatically scaling it up and down based on usage patterns is not a best practice. So, unlike with some other AWS services, I recommend against setting a CloudWatch action that automatically reconfigures the system when alarms are triggered.
There are other situations where the planned reconfiguration approach may not work, such as low or zero free disk space causing the domain to reject writes. If the business is dependent on the domain continuing to accept incoming writes and deleting data is not an option, the team may choose to reconfigure immediately.
Extensions and adaptations
You may wish to modify the best practices encoded in the scripts for your own environment or workloads. It’s always better to avoid situations where alerts are generated but routinely ignored. All alerts should trigger a review and one or more actions, either immediately or at a planned date. The following is a list of common situations where you may wish to set different alarms for different domains:
Dev/test vs. production: You may have a different set of configuration rules and alarms for your dev environment than for production. For example, you may require zone awareness and dedicated masters for your production environment, but not for your development domains. Or, you may not have any alarms set in dev. For test environments that mirror your potential peak load, test to ensure that the alarms are appropriately triggered.
Differing workloads or SLAs for different domains: You may have one domain with a requirement for superfast search performance, and another domain with a heavy ingest load that tolerates slower search response. Your reaction to slow response for these two workloads is likely to be different, so perhaps the thresholds for these two domains should be set at a different level. In this case, you might add a “max CPU utilization” alarm at 100% for 1 minute for the fast search domain, while the other domain only triggers an alarm when the average has been higher than 60% for 5 minutes. You might also add a “free space” rule with a higher threshold to reflect the need for more space for the heavy ingest load if there is danger that it could fill the available disk quickly.
“Normal” alarms versus “emergency” alarms: If, for example, free disk space drops to 25% of total capacity, an alarm is triggered that indicates action should be taken as soon as possible, such as cleaning up old indexes or reconfiguring at the next quiet period for this domain. However, if free space drops below a critical level (20% free space), action must be taken immediately in order to prevent Amazon ES from setting the domain to read-only. Similarly, if the “ClusterIndexWritesBlocked” alarm triggers, the domain has already stopped accepting writes, so immediate action is needed. In this case, you may wish to set “laddered” alarms, where one threshold causes an alarm to be triggered to review the current workload for a planned reconfiguration, but a different threshold raises a “DefCon 3” alarm that immediate action is required.
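Laddered alarms can be sketched as two FreeStorageSpace thresholds derived from the same total storage. The following minimal sketch uses the 25 percent (plan a change) and 20 percent (act now) levels from the example above; the tier names and the action descriptions are illustrative assumptions. In practice, each tier would become its own CloudWatch alarm, possibly routed to a different SNS topic.

```python
from typing import Dict, List


def laddered_free_storage_thresholds(total_storage_gb: float,
                                     warn_fraction: float = 0.25,
                                     critical_fraction: float = 0.20) -> List[Dict[str, object]]:
    """Return two free-storage thresholds in MB, from least to most urgent."""
    if not 0 < critical_fraction < warn_fraction < 1:
        raise ValueError("expected 0 < critical_fraction < warn_fraction < 1")
    total_mb = total_storage_gb * 1024
    return [
        {"tier": "review", "threshold_mb": total_mb * warn_fraction,
         "action": "plan a reconfiguration or clean up old indexes"},
        {"tier": "emergency", "threshold_mb": total_mb * critical_fraction,
         "action": "act immediately before the domain stops accepting writes"},
    ]
```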
The sample scripts provided here are a starting point, intended for you to adapt to your own environment and needs.
Running the scripts one time can identify how far your current state is from your desired state, and create an initial set of alarms. Regularly rerunning these scripts captures changes over time and adjusts your alarms as your environment and configurations change. One customer has set them up to run nightly, and to automatically create and update alarms to match their preferred settings.
Removing unwanted alarms
Each CloudWatch alarm costs approximately $0.10 per month. You can remove unwanted alarms in the CloudWatch console, under Alarms. If you set up a “ver” domain above, remember to remove it to avoid continuing charges.
Setting CloudWatch alarms appropriately for your Amazon ES domains can help you avoid suboptimal performance and allow you to respond to workload growth or configuration issues well before they become urgent. This post gives you a starting point for doing so. The additional sleep you’ll get knowing you don’t need to be concerned about Elasticsearch domain performance will allow you to focus on building creative solutions for your business and solving problems for your customers.
Dr. Veronika Megler is a senior consultant at Amazon Web Services. She works with our customers to implement innovative big data, AI and ML projects, helping them accelerate their time-to-value when using AWS.
This post was written in partnership with Intuit to share learnings, best practices, and recommendations for running an Apache Kafka cluster on AWS. Thanks to Vaishak Suresh and his colleagues at Intuit for their contribution and support.
Intuit, in their own words: Intuit, a leading enterprise customer for AWS, is a creator of business and financial management solutions. For more information on how Intuit partners with AWS, see our previous blog post, Real-time Stream Processing Using Apache Spark Streaming and Apache Kafka on AWS. Apache Kafka is an open-source, distributed streaming platform that enables you to build real-time streaming applications.
The best practices described in this post are based on our experience in running and operating large-scale Kafka clusters on AWS for more than two years. Our intent for this post is to help AWS customers who are currently running Kafka on AWS, and also customers who are considering migrating on-premises Kafka deployments to AWS.
Running your Kafka deployment on Amazon EC2 provides a high-performance, scalable solution for ingesting streaming data. AWS offers many different instance types and storage option combinations for Kafka deployments. However, given the number of possible deployment topologies, selecting the strategy best suited to your use case is not always trivial.
In this blog post, we cover the following aspects of running Kafka clusters on AWS:
Deployment considerations and patterns
Storage options
Instance types
Networking
Upgrades
Performance tuning
Monitoring
Security
Backup and restore
Note: While implementing Kafka clusters in a production environment, make sure also to consider factors like your number of messages, message size, monitoring, failure handling, and any operational issues.
Deployment considerations and patterns
In this section, we discuss various deployment options available for Kafka on AWS, along with pros and cons of each option. A successful deployment starts with thoughtful consideration of these options. Considering availability, consistency, and operational overhead of the deployment helps when choosing the right option.
Single AWS Region, Three Availability Zones, All Active
One typical deployment pattern (all active) is in a single AWS Region with three Availability Zones (AZs). One Kafka cluster is deployed in each AZ along with Apache ZooKeeper and Kafka producer and consumer instances, as shown in the following illustration.
In this pattern, the Kafka cluster is deployed as follows:
Kafka producers and a Kafka cluster are deployed in each AZ.
Data is distributed evenly across the three Kafka clusters by using Elastic Load Balancing.
Kafka consumers aggregate data from all three Kafka clusters.
Kafka cluster failover occurs this way:
Mark down all Kafka producers
Debug and restack Kafka
Restart Kafka producers
Following are the pros and cons of this pattern.
Pros:
Can sustain the failure of two AZs
No message loss during failover
Cons:
Very high operational overhead:
All changes need to be deployed three times, one for each Kafka cluster
Maintaining and monitoring three Kafka clusters
Maintaining and monitoring three consumer clusters
A restart is required for patching and upgrading brokers in a Kafka cluster. In this approach, a rolling upgrade is done separately for each cluster.
Single Region, Three Availability Zones, Active-Standby
Another typical deployment pattern (active-standby) is in a single AWS Region with a single Kafka cluster, with Kafka brokers and ZooKeeper nodes distributed across three AZs. Another similar Kafka cluster acts as a standby, as shown in the following illustration. You can use Kafka mirroring with MirrorMaker to replicate messages between any two clusters.
In this pattern, the Kafka cluster is deployed as follows:
Kafka producers are deployed on all three AZs.
Only one Kafka cluster is deployed across three AZs (active).
ZooKeeper instances are deployed on each AZ.
Brokers are spread evenly across all three AZs.
Kafka consumers can be deployed across all three AZs.
Standby Kafka producers and a Multi-AZ Kafka cluster are part of the deployment.
Kafka cluster failover occurs this way:
Switch traffic to the standby Kafka producers and the standby Kafka cluster.
Restart consumers to consume from standby Kafka cluster.
Following are the pros and cons of this pattern.
Pros:
Less operational overhead when compared to the first option
Only one Kafka cluster to manage and consume data from
Can handle single AZ failures without activating a standby Kafka cluster
Cons:
Added latency due to cross-AZ data transfer among Kafka brokers
For Kafka versions before 0.10, replicas for topic partitions have to be assigned so they’re distributed to the brokers on different AZs (rack-awareness)
The cluster can become unavailable in case of a network glitch, where ZooKeeper does not see Kafka brokers
Possibility of in-transit message loss during failover
Intuit recommends using a single Kafka cluster in one AWS Region, with brokers distributed across three AZs (single region, three AZs). This approach offers strong fault tolerance, because the failure of one AZ won't cause Kafka downtime.
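Spreading brokers across three AZs works best when Kafka knows which AZ each broker is in. Kafka 0.10 and later support rack awareness through the broker.rack setting in each broker's server.properties; setting it to the broker's AZ lets Kafka place the replicas of a partition in different AZs. The following minimal sketch assigns brokers round-robin across AZs and emits the per-broker settings. The AZ names and broker count used in the test are hypothetical.

```python
from typing import Dict, List


def rack_aware_broker_config(broker_count: int, azs: List[str]) -> Dict[int, Dict[str, str]]:
    """Map broker.id to server.properties overrides, spreading brokers over AZs."""
    if not azs:
        raise ValueError("at least one Availability Zone is required")
    config: Dict[int, Dict[str, str]] = {}
    for broker_id in range(broker_count):
        config[broker_id] = {
            "broker.id": str(broker_id),
            "broker.rack": azs[broker_id % len(azs)],  # round-robin by AZ
        }
    return config


def brokers_per_az(config: Dict[int, Dict[str, str]]) -> Dict[str, int]:
    """Count brokers in each AZ to confirm that the spread is even."""
    counts: Dict[str, int] = {}
    for props in config.values():
        rack = props["broker.rack"]
        counts[rack] = counts.get(rack, 0) + 1
    return counts
```

Keeping the broker count a multiple of the number of AZs keeps the spread even, so losing one AZ removes the same share of brokers no matter which AZ fails.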
Storage options
There are two storage options for file storage in Amazon EC2: ephemeral storage (instance store) and Amazon Elastic Block Store (Amazon EBS).
Ephemeral storage is local to the Amazon EC2 instance. It can provide high IOPS based on the instance type. On the other hand, Amazon EBS volumes offer higher resiliency and you can configure IOPS based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. Your choice of storage is closely related to the type of workload supported by your Kafka cluster.
Kafka provides built-in fault tolerance by replicating data partitions across a configurable number of instances. If a broker fails, you can recover it by fetching all the data from other brokers in the cluster that host the other replicas. Depending on the amount of data transferred, this can lengthen recovery and increase network traffic, which in turn can degrade the cluster's performance.
The following table contrasts the benefits of using an instance store versus using EBS for storage.
Instance storage is recommended for large- and medium-sized Kafka clusters. For a large cluster, read/write traffic is distributed across a high number of brokers, so the loss of a broker has less of an impact. For smaller clusters, however, quick recovery of a failed node is important, and recovering a failed broker takes longer and requires relatively more network traffic.
Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.
The primary advantage of using EBS in a Kafka deployment is that it significantly reduces data-transfer traffic when a broker fails or must be replaced. The replacement broker joins the cluster much faster.
Data stored on EBS is persisted in case of an instance failure or termination. The broker’s data stored on an EBS volume remains intact, and you can mount the EBS volume to a new EC2 instance. Most of the replicated data for the replacement broker is already available in the EBS volume and need not be copied over the network from another broker. Only the changes made after the original broker failure need to be transferred across the network. That makes this process much faster.
Intuit chose EBS because of their frequent instance restacking requirements and also other benefits provided by EBS.
Generally, Kafka deployments use a replication factor of three. Because EBS volumes are already replicated within their Availability Zone, Intuit chose a replication factor of two instead of three.
Instance types
The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral storage, h1, i3, and d2 instances are your best option.
Networking
The network plays a very important role in a distributed system like Kafka. A fast and reliable network ensures that nodes can communicate with each other easily. The available network throughput controls the maximum amount of traffic that Kafka can handle. Network throughput, combined with disk storage, is often the governing factor for cluster sizing.
If you expect your cluster to receive high read/write traffic, select an instance type that offers 10-Gb/s performance.
In addition, choose an option that keeps interbroker network traffic on the private subnet, because this approach allows clients to connect to the brokers. Communication between brokers and clients uses the same network interface and port. For more details, see the documentation about IP addressing for EC2 instances.
If you are deploying in more than one AWS Region, you can connect the VPCs in the two Regions using cross-region VPC peering. However, be aware of the data transfer costs associated with cross-Region and cross-AZ deployments.
Upgrades
Kafka has a history of not being backward compatible, but its support for backward compatibility is improving. During a Kafka upgrade, keep your producer and consumer clients on a version equal to or lower than the version you are upgrading from. After the upgrade is finished, you can start using a new protocol version and any new features it supports. There are three upgrade approaches available, discussed in the following sections.
Rolling or in-place upgrade
In a rolling or in-place upgrade scenario, upgrade one Kafka broker at a time. Take into consideration the recommendations for doing rolling restarts to avoid downtime for end users.
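The Kafka upgrade documentation describes a rolling upgrade as a series of rolling restarts. First, upgrade each broker's code while pinning inter.broker.protocol.version (and, where the message format changes, log.message.format.version) to the version you are upgrading from. Then bump the protocol version and restart each broker again. Finally, after clients are upgraded, bump the message format version. The following minimal sketch generates the per-pass property overrides; the version strings in the test are examples, and the exact properties to pin depend on the versions involved, so check the upgrade notes for your target release.

```python
from typing import Dict, List


def rolling_upgrade_passes(current: str, target: str) -> List[Dict[str, str]]:
    """Return server.properties overrides for each rolling-restart pass."""
    return [
        # Pass 1: install the new Kafka binaries, still speaking the old protocol.
        {"inter.broker.protocol.version": current,
         "log.message.format.version": current},
        # Pass 2: every broker is upgraded; switch brokers to the new protocol.
        {"inter.broker.protocol.version": target,
         "log.message.format.version": current},
        # Pass 3: after clients are upgraded, move to the new message format.
        {"inter.broker.protocol.version": target,
         "log.message.format.version": target},
    ]
```

Within each pass, restart one broker at a time and wait for it to rejoin the cluster before moving to the next, so that partitions stay available throughout.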
Downtime upgrade
If you can afford the downtime, you can take your entire cluster down, upgrade each Kafka broker, and then restart the cluster.
Blue/green upgrade
Intuit followed the blue/green deployment model for their workloads, as described in this section.
If you can afford to create a separate Kafka cluster and upgrade it, we highly recommend the blue/green upgrade scenario. In this scenario, we recommend that you keep your clusters up to date with the latest Kafka version. For more details on Kafka version upgrades, see the Kafka upgrade documentation.
The following illustration shows a blue/green upgrade.
In this scenario, the upgrade plan works like this:
Create a new Kafka cluster on AWS.
Create a new Kafka producers stack to point to the new Kafka cluster.
Create topics on the new Kafka cluster.
Test the green deployment end to end (sanity check).
Using Amazon Route 53, switch traffic to the new Kafka producers stack, which points to the new green Kafka environment that you created.
The roll-back plan works like this:
Switch the Amazon Route 53 record back to the old Kafka producers stack, which points to the old Kafka environment.
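Both the cutover and the roll-back amount to repointing one DNS name. The following minimal sketch builds the ChangeBatch argument for the Route 53 ChangeResourceRecordSets API as an UPSERT of a CNAME record. The record name, target host names, and TTL are hypothetical; with boto3, you would pass the result to route53.change_resource_record_sets(HostedZoneId=..., ChangeBatch=batch).

```python
from typing import Any, Dict


def producer_cutover_batch(record_name: str, target_dns: str,
                           ttl: int = 60) -> Dict[str, Any]:
    """Build a Route 53 ChangeBatch that points record_name at target_dns.

    Use the green producers' DNS name to cut over, or the blue one to roll back.
    """
    return {
        "Comment": f"Point {record_name} at {target_dns}",
        "Changes": [
            {
                "Action": "UPSERT",  # create the record or replace its value
                "ResourceRecordSet": {
                    "Name": record_name,
                    "Type": "CNAME",
                    "TTL": ttl,
                    "ResourceRecords": [{"Value": target_dns}],
                },
            }
        ],
    }
```

Because the roll-back is the same call with the blue target, it is fast and low risk. Keeping the TTL short means clients re-resolve the name sooner after either change.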
Performance tuning
You can tune Kafka performance in multiple dimensions. Following are some best practices for performance tuning.
These are some general performance tuning techniques:
If throughput is less than network capacity, try the following:
Add more threads
Increase batch size
Add more producer instances
Add more partitions
To improve latency when acks=-1 (acks=all), increase your num.replica.fetchers value.
For cross-AZ data transfer, tune your buffer settings for sockets and for OS TCP.
Make sure that num.io.threads is greater than the number of disks dedicated for Kafka.
Adjust num.network.threads based on the number of producers plus the number of consumers plus the replication factor.
Your message size affects your network bandwidth. To get higher performance from a Kafka cluster, select an instance type that offers 10 Gb/s performance.
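The two thread-count heuristics above can be turned into starting values for server.properties. In this minimal sketch, the function names and the choice of "disks + 1" as the smallest value greater than the disk count are assumptions; treat the output as a starting point for load testing, not a tuned configuration.

```python
from typing import Dict


def suggested_thread_settings(num_disks: int, num_producers: int,
                              num_consumers: int, replication_factor: int) -> Dict[str, int]:
    """Starting values for two broker thread pools, following the heuristics above."""
    if num_disks < 1 or replication_factor < 1:
        raise ValueError("need at least one disk and a replication factor of at least 1")
    if num_producers < 0 or num_consumers < 0:
        raise ValueError("producer and consumer counts cannot be negative")
    return {
        # Must be greater than the number of disks dedicated to Kafka;
        # disks + 1 is the smallest value that satisfies the rule.
        "num.io.threads": num_disks + 1,
        # Producers + consumers + replication factor.
        "num.network.threads": num_producers + num_consumers + replication_factor,
    }


def to_properties(settings: Dict[str, int]) -> str:
    """Render settings as server.properties lines, sorted by key."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))
```

For a broker with 4 data disks, 3 producers, 5 consumers, and a replication factor of 3, this yields `num.io.threads=5` and `num.network.threads=11`.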
For Java and JVM tuning, try the following:
Minimize GC pauses by using the Oracle JDK with the G1 (garbage-first) collector.
Try to keep the Kafka heap size below 4 GB.
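Kafka's startup scripts read the heap and JVM flags from the KAFKA_HEAP_OPTS and KAFKA_JVM_PERFORMANCE_OPTS environment variables. The sketch below builds both under the two guidelines above; the function name, the 3 GB default, and the 20 ms pause-time goal are assumed starting points rather than recommendations from this post.

```python
from typing import Dict

MAX_HEAP_MB = 4096  # keep the heap below 4 GB, per the guideline above


def kafka_jvm_env(heap_mb: int = 3072, max_gc_pause_ms: int = 20) -> Dict[str, str]:
    """Build the environment variables read by Kafka's startup scripts."""
    if not 0 < heap_mb < MAX_HEAP_MB:
        raise ValueError(f"heap_mb must be between 1 and {MAX_HEAP_MB - 1}")
    return {
        # Equal -Xms and -Xmx fix the heap size so the JVM does not resize it at runtime.
        "KAFKA_HEAP_OPTS": f"-Xms{heap_mb}m -Xmx{heap_mb}m",
        # Explicitly select G1 and give it a pause-time goal.
        "KAFKA_JVM_PERFORMANCE_OPTS": f"-server -XX:+UseG1GC -XX:MaxGCPauseMillis={max_gc_pause_ms}",
    }
```

Printing `f'export {name}="{value}"'` for each entry gives lines you can place in the broker's environment. Note that setting KAFKA_JVM_PERFORMANCE_OPTS replaces the script's built-in defaults rather than adding to them.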
Knowing whether a Kafka cluster is working correctly in a production environment is critical. Sometimes, just knowing that the cluster is up is enough, but Kafka applications have many moving parts to monitor. In fact, it can easily become confusing to understand what’s important to watch and what you can set aside. Items to monitor range from simple metrics about the overall rate of traffic, to producers, consumers, brokers, controller, ZooKeeper, topics, partitions, messages, and so on.
For monitoring, Intuit used several tools, including New Relic, Wavefront, Amazon CloudWatch, and AWS CloudTrail. Our recommended monitoring approach follows.
For system metrics, we recommend that you monitor:
File handle usage
Disk I/O performance
For producers, we recommend that you monitor:
For consumers, we recommend that you monitor:
Like most distributed systems, Kafka provides the mechanisms to transfer data with relatively high security across the components involved. Depending on your setup, security might involve different services such as encryption, Kerberos, Transport Layer Security (TLS) certificates, and advanced access control list (ACL) setup in brokers and ZooKeeper. The following describes the Intuit approach. For details on Kafka security not covered in this section, see the Kafka documentation.
Kafka can use TLS to encrypt client and internode communications.
Kafka authenticates connections to brokers from clients (producers and consumers), from other brokers, and from tools by using either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL).
Kafka supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration.
In Kafka, authorization is pluggable, and integration with external authorization services is supported.
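On the client side, the choice between TLS and SASL with Kerberos (the GSSAPI mechanism) shows up as a handful of standard Kafka client properties. This sketch renders a client.properties file; the file paths, passwords, and the Kerberos service name `kafka` are placeholders, and the JAAS configuration that Kerberos clients also need is not shown.

```python
from typing import Dict, Optional


def client_security_properties(truststore_path: str, truststore_password: str,
                               use_kerberos: bool = False,
                               keystore_path: Optional[str] = None,
                               keystore_password: Optional[str] = None) -> str:
    """Render Kafka client security settings as client.properties lines."""
    props: Dict[str, str] = {}
    if use_kerberos:
        # SASL/GSSAPI (Kerberos) authentication over a TLS-encrypted connection.
        props["security.protocol"] = "SASL_SSL"
        props["sasl.mechanism"] = "GSSAPI"
        props["sasl.kerberos.service.name"] = "kafka"  # must match the brokers' principal
    else:
        props["security.protocol"] = "SSL"
        if keystore_path is not None:
            # A client keystore is only needed when brokers require TLS client authentication.
            props["ssl.keystore.location"] = keystore_path
            props["ssl.keystore.password"] = keystore_password or ""
    # The truststore lets the client verify the brokers' certificates.
    props["ssl.truststore.location"] = truststore_path
    props["ssl.truststore.password"] = truststore_password
    return "\n".join(f"{key}={value}" for key, value in props.items())
```

Producers and consumers can then load the file, for example through the `--producer.config` or `--consumer.config` options of the console tools.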
Backup and restore
The type of storage used in your deployment dictates your backup and restore strategy.
The best way to back up a Kafka cluster based on instance storage is to set up a second cluster and replicate messages using MirrorMaker. Kafka’s mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.
For EBS-based deployments, you can enable automatic snapshots of EBS volumes to back up volumes. You can easily create new EBS volumes from these snapshots to restore. We recommend storing backup files in Amazon S3.
For more information on how to back up in Kafka, see the Kafka documentation.
In this post, we discussed several patterns for running Kafka in the AWS Cloud. AWS also provides an alternative managed solution, Amazon Kinesis Data Streams. With Kinesis Data Streams, there are no servers to manage or scaling cliffs to worry about, and you can scale the size of your streaming pipeline in seconds without downtime. Data replication across Availability Zones is automatic, and you benefit from security out of the box. Kinesis Data Streams is tightly integrated with a wide variety of AWS services, such as AWS Lambda, Amazon Redshift, and Amazon Elasticsearch Service, and it supports open source frameworks such as Storm, Spark, Flink, and more. You might also refer to the Kafka-Kinesis connector.
If you have questions or suggestions, please comment below.
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable big data, machine learning, artificial intelligence, and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as advanced edge computing and machine learning at the edge. In his spare time, he enjoys spending time with his family.
Apache Cassandra is a commonly used, high performance NoSQL database. AWS customers that currently maintain Cassandra on-premises may want to take advantage of the scalability, reliability, security, and economic benefits of running Cassandra on Amazon EC2.
Amazon EC2 and Amazon Elastic Block Store (Amazon EBS) provide secure, resizable compute capacity and storage in the AWS Cloud. Together, they let you deploy Cassandra and scale capacity according to your requirements. Given the number of possible deployment topologies, it's not always trivial to select the most appropriate strategy for your use case.
In this post, we outline three Cassandra deployment options, as well as provide guidance about determining the best practices for your use case in the following areas:
Cassandra resource overview
High availability and resiliency
Before we jump into best practices for running Cassandra on AWS, we should mention that we have many customers who decided to use DynamoDB instead of managing their own Cassandra cluster. DynamoDB is fully managed, serverless, and provides multi-master cross-region replication, encryption at rest, and managed backup and restore. Integration with AWS Identity and Access Management (IAM) enables DynamoDB customers to implement fine-grained access control for their data security needs.
AWS provides options, so you’re covered whether you want to run your own NoSQL Cassandra database, or move to a fully managed, serverless DynamoDB database.
Cassandra resource overview
Here’s a short introduction to standard Cassandra resources and how they are implemented with AWS infrastructure. If you’re already familiar with Cassandra or AWS deployments, this can serve as a refresher.
Cluster: A single Cassandra deployment. This typically consists of multiple physical locations, keyspaces, and physical servers. On AWS: a logical deployment construct that maps to an AWS CloudFormation StackSet, which consists of one or many CloudFormation stacks to deploy Cassandra.
Datacenter: A group of nodes configured as a single replication group. On AWS: a logical deployment construct. A datacenter is deployed with a single CloudFormation stack consisting of Amazon EC2 instances, networking, storage, and security resources.
Rack: A collection of servers. A datacenter consists of at least one rack. Cassandra tries to place the replicas on different racks. On AWS: a single Availability Zone.
Server (node): A physical or virtual machine running Cassandra software. On AWS: an EC2 instance.
Token: Conceptually, the data managed by a cluster is represented as a ring. The ring is divided into ranges, and each node is responsible for one or more ranges of the data. Each node is assigned a token, which is essentially a random number from the range. The token value determines the node's position in the ring and its range of data. On AWS: managed within Cassandra.
Virtual node (vnode): Responsible for storing a range of data. Each vnode receives one token in the ring. By default, each node owns 256 vnodes (num_tokens: 256 in cassandra.yaml), and their tokens are distributed across all servers in the Cassandra datacenter. On AWS: managed within Cassandra.
Replication factor: The number of replicas of each piece of data across the cluster. On AWS: managed within Cassandra.
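The token and vnode descriptions above amount to consistent hashing. The toy model below illustrates the idea: each node gets several tokens, a key belongs to the first token at or after its position (wrapping around the ring), and further replicas go to the next distinct nodes clockwise. It is a sketch, not Cassandra's implementation: Cassandra's default Murmur3Partitioner uses Murmur3 tokens and chooses vnode tokens itself, whereas this example derives positions from SHA-256 purely for illustration, ignores racks, and uses hypothetical node names.

```python
import bisect
import hashlib
from typing import List, Tuple

Ring = List[Tuple[int, str]]


def _position(value: str) -> int:
    """Map a string to a 64-bit position on the ring (SHA-256 here, for illustration)."""
    return int.from_bytes(hashlib.sha256(value.encode("utf-8")).digest()[:8], "big")


def build_ring(nodes: List[str], vnodes_per_node: int) -> Ring:
    """Give every node vnodes_per_node tokens and sort them into a ring."""
    ring = [(_position(f"{node}#vnode{i}"), node)
            for node in nodes for i in range(vnodes_per_node)]
    ring.sort()
    return ring


def replicas(ring: Ring, key: str, replication_factor: int) -> List[str]:
    """Walk clockwise from the key's position and collect distinct nodes.

    The first node found owns the key; the next distinct nodes hold the other
    replicas, similar to SimpleStrategy. Rack (Availability Zone) awareness is
    deliberately left out.
    """
    if replication_factor > len({node for _, node in ring}):
        raise ValueError("replication factor exceeds the number of nodes")
    tokens = [token for token, _ in ring]
    start = bisect.bisect_left(tokens, _position(key))
    chosen: List[str] = []
    for step in range(len(ring)):
        node = ring[(start + step) % len(ring)][1]  # modulo wraps past the last token
        if node not in chosen:
            chosen.append(node)
            if len(chosen) == replication_factor:
                break
    return chosen


def owner(ring: Ring, key: str) -> str:
    """Node that owns the token range containing key."""
    return replicas(ring, key, 1)[0]
```

With many small ranges per node, adding or removing a node moves many small slices of data instead of one large contiguous range, which is the main motivation for vnodes.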
One of the many benefits of deploying Cassandra on Amazon EC2 is that you can automate many deployment tasks. In addition, AWS includes services, such as CloudFormation, that allow you to describe and provision all your infrastructure resources in your cloud environment.
We recommend orchestrating each Cassandra ring with one CloudFormation template. If you are deploying in multiple AWS Regions, you can use a CloudFormation StackSet to manage those stacks. All the maintenance actions (scaling, upgrading, and backing up) should be scripted with an AWS SDK. These may live as standalone AWS Lambda functions that can be invoked on demand during maintenance.
You can get started by following the Cassandra Quick Start deployment guide. Keep in mind that this guide does not address the requirements to operate a production deployment and should be used only for learning more about Cassandra.
In this section, we discuss various deployment options available for Cassandra in Amazon EC2. A successful deployment starts with thoughtful consideration of these options. Consider the amount of data, network environment, throughput, and availability.
Single AWS Region, 3 Availability Zones
In this pattern, you deploy the Cassandra cluster in one AWS Region and three Availability Zones. There is only one ring in the cluster. By using EC2 instances in three zones, you ensure that the replicas are distributed uniformly in all zones.
To ensure the even distribution of data across all Availability Zones, we recommend that you distribute the EC2 instances evenly in all three Availability Zones. The number of EC2 instances in the cluster should therefore be a multiple of three (the replication factor).
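A quick way to check this rule before launching instances is to assign nodes to zones round-robin and reject counts that do not divide evenly. Assuming each Availability Zone acts as a rack and receives one replica of every row, unequal node counts per zone would leave the nodes in the smaller zone holding more data each. The zone and node names in this sketch are hypothetical.

```python
from typing import Dict, List


def place_nodes(node_count: int, zones: List[str]) -> Dict[str, List[str]]:
    """Assign hypothetical node names to zones round-robin, refusing uneven layouts."""
    if not zones:
        raise ValueError("at least one zone is required")
    if node_count % len(zones) != 0:
        raise ValueError(
            f"{node_count} nodes cannot be spread evenly across {len(zones)} zones")
    placement: Dict[str, List[str]] = {zone: [] for zone in zones}
    for i in range(node_count):
        placement[zones[i % len(zones)]].append(f"cassandra-{i + 1}")
    return placement
```

For example, six nodes across `us-east-1a`, `us-east-1b`, and `us-east-1c` yield two nodes per zone, and doubling to twelve keeps the layout even, while seven nodes are rejected.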
This pattern is suitable when the application is deployed in one Region, or when data must stay within a single Region because of data privacy or other legal requirements.
Pros:
● Highly available; can sustain the failure of one Availability Zone.
● Simple deployment.
Cons:
● Does not protect against situations in which many of the resources in a Region experience intermittent failure.
Active-active, multi-Region
In this pattern, you deploy two rings in two different Regions and link them. The VPCs in the two Regions are peered so that data can be replicated between the two rings.
We recommend that the two rings in the two Regions be identical in nature, having the same number of nodes, instance types, and storage configuration.
This pattern is most suitable when the applications using the Cassandra cluster are deployed in more than one Region.
Pros:
● No data loss during failover.
● Highly available; can sustain situations in which many of the resources in a Region experience intermittent failures.
● Read/write traffic can be localized to the Region closest to the user for lower latency and higher performance.
Cons:
● High operational overhead.
● The second Region effectively doubles the cost.
Active-standby, multi-Region
In this pattern, you deploy two rings in two different Regions and link them. The VPCs in the two Regions are peered so that data can be replicated between the two rings.
However, the second Region does not receive traffic from the applications. It only functions as a secondary location for disaster recovery reasons. If the primary Region is not available, the second Region receives traffic.
We recommend that the two rings in the two Regions be identical in nature, having the same number of nodes, instance types, and storage configuration.
This pattern is most suitable when the applications using the Cassandra cluster require low recovery point objective (RPO) and recovery time objective (RTO).
Pros:
● No data loss during failover.
● Highly available; can sustain failure or partitioning of one whole Region.
Cons:
● High operational overhead.
● High write latency for eventual consistency.
● The second Region effectively doubles the cost.
On premises, Cassandra deployments use local disks to store data. There are two storage options for EC2 instances: instance store (ephemeral storage) and Amazon EBS.
Your choice of storage is closely related to the type of workload supported by the Cassandra cluster. Instance store works best for most general purpose Cassandra deployments. However, in certain read-heavy clusters, Amazon EBS is a better choice.
The choice of instance type is generally driven by the type of storage:
If ephemeral storage is required for your application, a storage-optimized (I3) instance is the best option.
If your workload requires Amazon EBS, it is best to go with compute-optimized (C5) instances.
Burstable instance types (T2) don’t offer good performance for Cassandra deployments.
Ephemeral storage is local to the EC2 instance. It can provide high input/output operations per second (IOPS), depending on the instance type. An SSD-based instance store can support up to 3.3M IOPS in I3 instances. This high performance makes it an ideal choice for transactional or write-intensive applications such as Cassandra.
In general, instance storage is recommended for transactional, large, and medium-size Cassandra clusters. For a large cluster, read/write traffic is distributed across a higher number of nodes, so the loss of one node has less of an impact. However, for smaller clusters, a quick recovery for the failed node is important.
As an example, with a replication factor of 3, each node in an N-node cluster holds roughly 3/N of the data. In a cluster with 100 nodes, the loss of 1 node therefore affects about 3% of the data; in a cluster with 10 nodes, the loss of 1 node affects about 30% of the data.
The following compares the two storage options:
IOPS (translates to higher query performance): up to 3.3M on I3 with instance store. This results in higher query performance on each host. However, Cassandra scales well horizontally. In general, we recommend scaling horizontally first, and then scaling vertically to mitigate specific issues.
Note: 3.3M IOPS is observed with 100% random read with a 4-KB block size on Amazon Linux.
AWS instance types: storage optimized (I3) for instance store; compute optimized (C5) for Amazon EBS.
Scaling: being able to choose between different instance types is an advantage in terms of CPU, memory, and so on, for horizontal and vertical scaling.
Backup and recovery: for instance store, basic building blocks are available from AWS. Amazon EBS offers a distinct advantage here, because establishing a backup and restore strategy takes only a small engineering effort:
a) If an instance fails, the EBS volumes from the failing instance are attached to a new instance.
b) If an EBS volume fails, the data is restored by creating a new EBS volume from the last snapshot.
EBS volumes offer higher resiliency, and IOPS can be configured based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. EBS volumes can support up to 32K IOPS per volume and up to 80K IOPS per instance in RAID configuration. They have an annualized failure rate (AFR) of 0.1–0.2%, which makes EBS volumes 20 times more reliable than typical commodity disk drives.
The primary advantage of using Amazon EBS in a Cassandra deployment is that it reduces data-transfer traffic significantly when a node fails or must be replaced. The replacement node joins the cluster much faster. However, Amazon EBS could be more expensive, depending on your data storage needs.
Cassandra has built-in fault tolerance: it replicates data across a configurable number of nodes. Not only can it withstand node failures, but when a node fails, it can also recover by copying data from other replicas into a new node. Depending on your application, this could mean copying tens of gigabytes of data. This adds delay to the recovery process, increases network traffic, and could affect the performance of the Cassandra cluster during recovery.
Data stored on Amazon EBS is persisted in case of an instance failure or termination. The node’s data stored on an EBS volume remains intact and the EBS volume can be mounted to a new EC2 instance. Most of the replicated data for the replacement node is already available in the EBS volume and won’t need to be copied over the network from another node. Only the changes made after the original node failed need to be transferred across the network. That makes this process much faster.
You can snapshot EBS volumes periodically. If a volume fails, you can create a new volume from the last known good snapshot and attach it to a new instance. This is faster than creating a new volume and copying all the data to it.
Most Cassandra deployments use a replication factor of three. However, Amazon EBS does its own replication under the covers for fault tolerance. In practice, EBS volumes are about 20 times more reliable than typical disk drives. So, it is possible to go with a replication factor of two. This not only saves cost, but also enables deployments in a region that has two Availability Zones.
EBS volumes are recommended for read-heavy, small clusters (fewer nodes) that require storage of a large amount of data. Keep in mind that Amazon EBS Provisioned IOPS volumes can get expensive. General Purpose EBS volumes work best when sized for the required performance.
If your cluster is expected to receive high read/write traffic, select an instance type that offers 10 Gb/s performance. As an example, i3.8xlarge and c5.9xlarge both offer 10 Gb/s networking performance. A smaller instance type in the same family leads to relatively lower networking throughput.
Cassandra generates a universally unique identifier (UUID) for each node based on the instance's IP address. This UUID is used for distributing vnodes on the ring.
In the case of an AWS deployment, IP addresses are assigned automatically to the instance when an EC2 instance is created. With the new IP address, the data distribution changes and the whole ring has to be rebalanced. This is not desirable.
To preserve the assigned IP address, use a secondary elastic network interface with a fixed IP address. Before swapping an EC2 instance with a new one, detach the secondary network interface from the old instance and attach it to the new one. This way, the UUID remains the same and there is no change in the way that data is distributed in the cluster.
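The interface swap can be scripted with the EC2 API. This minimal sketch assumes a boto3 EC2 client is passed in and that the secondary interface belongs at device index 1; the interface and instance IDs are hypothetical. It waits for the interface to become available (through the `network_interface_available` waiter) before re-attaching it.

```python
def move_secondary_eni(ec2_client, eni_id: str, new_instance_id: str,
                       device_index: int = 1) -> str:
    """Detach eni_id from its current instance and attach it to new_instance_id.

    Returns the new attachment ID.
    """
    response = ec2_client.describe_network_interfaces(NetworkInterfaceIds=[eni_id])
    attachment = response["NetworkInterfaces"][0].get("Attachment")
    if attachment:
        ec2_client.detach_network_interface(AttachmentId=attachment["AttachmentId"])
        # Wait until the interface is free before attaching it elsewhere.
        ec2_client.get_waiter("network_interface_available").wait(
            NetworkInterfaceIds=[eni_id])
    result = ec2_client.attach_network_interface(
        NetworkInterfaceId=eni_id, InstanceId=new_instance_id, DeviceIndex=device_index)
    return result["AttachmentId"]
```

Stop Cassandra on the old node before the swap. On the new instance, the operating system must bring up the secondary interface, and cassandra.yaml must use its fixed IP address (for example, as listen_address) so that the node rejoins with the same identity.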
If you are deploying in more than one Region, you can connect the two VPCs in the two Regions using cross-Region VPC peering.
High availability and resiliency
Cassandra is designed to be fault-tolerant and highly available during multiple node failures. In the patterns described earlier in this post, you deploy Cassandra to three Availability Zones with a replication factor of three. Even though it limits the AWS Region choices to the Regions with three or more Availability Zones, it offers protection for the cases of one-zone failure and network partitioning within a single Region. The multi-Region deployments described earlier in this post protect when many of the resources in a Region are experiencing intermittent failure.
Resiliency is ensured through infrastructure automation. The deployment patterns all require a quick replacement of the failing nodes. In the case of a Region-wide failure, when you deploy with the multi-Region option, traffic can be directed to the other active Region while the infrastructure is recovering in the failing Region. In the case of unforeseen data corruption, the standby cluster can be restored with point-in-time backups stored in Amazon S3.
In this section, we look at ways to ensure that your Cassandra cluster is healthy:
Scaling
Upgrades
Backup and restore
Scaling
Cassandra is horizontally scaled by adding more instances to the ring. We recommend doubling the number of nodes in a cluster to scale up in one scale operation. This leaves the data homogeneously distributed across Availability Zones. Similarly, when scaling down, it's best to halve the number of instances to keep the data homogeneously distributed.
Cassandra is vertically scaled by increasing the compute power of each node. Larger instance types have proportionally bigger memory. Use deployment automation to swap instances for bigger instances without downtime or data loss.
Upgrades
All three types of upgrades (Cassandra, operating system patching, and instance type changes) follow the same rolling upgrade pattern.
In this process, you start with a new EC2 instance and install software and patches on it. Thereafter, remove one node from the ring. For more information, see Cassandra cluster Rolling upgrade. Then, you detach the secondary network interface from one of the EC2 instances in the ring and attach it to the new EC2 instance. Restart the Cassandra service and wait for it to sync. Repeat this process for all nodes in the cluster.
Backup and restore
Your backup and restore strategy is dependent on the type of storage used in the deployment. Cassandra supports snapshots and incremental backups. When using instance store, a file-based backup tool works best. Customers use rsync or other third-party products to copy data backups from the instance to long-term storage. For more information, see Backing up and restoring data in the DataStax documentation. This process has to be repeated for all instances in the cluster for a complete backup. These backup files are copied back to new instances to restore. We recommend using S3 to durably store backup files for long-term storage.
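For instance store, a file-based backup usually starts with `nodetool snapshot -t <tag>`, which hard-links the current SSTables into a `snapshots/<tag>` directory under each table's data directory. The sketch below walks those directories and uploads the files through an S3 client you pass in (boto3's S3 client provides `upload_file(Filename, Bucket, Key)`). The data directory, bucket name, node name, and key layout are assumptions; run it on every node, because each node holds only its own share of the data.

```python
import os
from typing import List


def upload_snapshot_files(s3_client, data_dir: str, tag: str, bucket: str,
                          node_name: str) -> List[str]:
    """Upload every file under <data_dir>/<keyspace>/<table>/snapshots/<tag>/.

    Keys follow the hypothetical layout <node>/<tag>/<keyspace>/<table>/<file>.
    Returns the uploaded keys.
    """
    uploaded = []
    for keyspace in sorted(os.listdir(data_dir)):
        keyspace_path = os.path.join(data_dir, keyspace)
        if not os.path.isdir(keyspace_path):
            continue
        for table_dir in sorted(os.listdir(keyspace_path)):
            snapshot_dir = os.path.join(keyspace_path, table_dir, "snapshots", tag)
            if not os.path.isdir(snapshot_dir):
                continue  # this table has no snapshot with the requested tag
            for root, _dirs, files in os.walk(snapshot_dir):
                for name in sorted(files):
                    local_path = os.path.join(root, name)
                    relative = os.path.relpath(local_path, snapshot_dir).replace(os.sep, "/")
                    key = "/".join([node_name, tag, keyspace, table_dir, relative])
                    s3_client.upload_file(local_path, bucket, key)
                    uploaded.append(key)
    return uploaded
```

Restoring reverses the process: download the files for a node into the matching table directories on the new instance before starting Cassandra, or load them with Cassandra's own tools.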
For Amazon EBS based deployments, you can enable automated snapshots of EBS volumes to back up volumes. New EBS volumes can be easily created from these snapshots for restoration.
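The snapshot-and-restore cycle maps onto two EC2 API calls, `create_snapshot` and `create_volume`. This is a minimal sketch with a boto3 EC2 client passed in; the volume ID, tag key, Availability Zone, and gp2 volume type are placeholders, and a production script would also wait for the snapshot to complete (for example, with the `snapshot_completed` waiter) before relying on it.

```python
def snapshot_volume(ec2_client, volume_id: str, node_name: str) -> str:
    """Start a snapshot of a Cassandra data volume and return its snapshot ID."""
    response = ec2_client.create_snapshot(
        VolumeId=volume_id,
        Description=f"Cassandra data volume backup for {node_name}",
        TagSpecifications=[{
            "ResourceType": "snapshot",
            "Tags": [{"Key": "cassandra-node", "Value": node_name}],  # hypothetical tag key
        }],
    )
    return response["SnapshotId"]


def restore_volume(ec2_client, snapshot_id: str, availability_zone: str,
                   volume_type: str = "gp2") -> str:
    """Create a new volume from a snapshot and return its volume ID."""
    response = ec2_client.create_volume(
        SnapshotId=snapshot_id, AvailabilityZone=availability_zone, VolumeType=volume_type)
    return response["VolumeId"]
```

The restored volume must be created in the same Availability Zone as the instance it will be attached to.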
We recommend that you think about security in all aspects of deployment. The first step is to ensure that the data is encrypted at rest and in transit. The second step is to prevent access by unauthorized users. For more information about security, see the Cassandra documentation.
Encryption at rest
Encryption at rest can be achieved by using EBS volumes with encryption enabled. Amazon EBS uses AWS KMS for encryption. For more information, see Amazon EBS Encryption.
Instance store–based deployments require using an encrypted file system or an AWS partner solution. If you are using DataStax Enterprise, it supports transparent data encryption.
Encryption in transit
Cassandra can use Transport Layer Security (TLS) for client and internode communications.
The security mechanism is pluggable, which means that you can easily swap out one authentication method for another. You can also provide your own authentication method, for example to authenticate with a Kerberos ticket or to store passwords in a different location, such as an LDAP directory.
The authorizer that's plugged in by default is org.apache.cassandra.auth.AllowAllAuthorizer. Cassandra also provides a role-based access control (RBAC) capability, which allows you to create roles and assign permissions to these roles.
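With `authenticator: PasswordAuthenticator` and `authorizer: CassandraAuthorizer` set in cassandra.yaml, roles and permissions are managed with CQL. The helper below only generates statements for you to run with cqlsh or a driver; the role, keyspace, and permission values are hypothetical. It validates identifiers instead of quoting them (so unquoted names are folded to lowercase by CQL) and escapes single quotes in the password by doubling them.

```python
import re
from typing import List

_IDENTIFIER = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")
_KEYSPACE_PERMISSIONS = {"ALL", "ALTER", "AUTHORIZE", "CREATE", "DROP", "MODIFY", "SELECT"}


def role_statements(role: str, password: str, keyspace: str,
                    permissions: List[str]) -> List[str]:
    """Build CQL that creates a login role and grants it keyspace permissions."""
    for name in (role, keyspace):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsupported identifier: {name!r}")
    escaped = password.replace("'", "''")  # CQL escapes a single quote by doubling it
    statements = [
        f"CREATE ROLE IF NOT EXISTS {role} WITH PASSWORD = '{escaped}' AND LOGIN = true;"
    ]
    for permission in permissions:
        perm = permission.upper()
        if perm not in _KEYSPACE_PERMISSIONS:
            raise ValueError(f"unsupported permission: {permission!r}")
        statements.append(f"GRANT {perm} ON KEYSPACE {keyspace} TO {role};")
    return statements
```

For example, `role_statements("analyst", "s3cret", "analytics", ["select"])` produces a `CREATE ROLE` statement followed by `GRANT SELECT ON KEYSPACE analytics TO analyst;`.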
In this post, we discussed several patterns for running Cassandra in the AWS Cloud. This post describes how you can manage Cassandra databases running on Amazon EC2. AWS also provides managed offerings for a number of databases. To learn more, see Purpose-built databases for all your..
We have been busy adding new features and capabilities to Amazon Redshift, and we wanted to give you a glimpse of what we’ve been doing over the past year. In this article, we recap a few of our enhancements and provide a set of resources that you can use to learn more and get the most out of your Amazon Redshift implementation.
In 2017, we made more than 30 announcements about Amazon Redshift. We listened to you, our customers, and delivered Redshift Spectrum, a feature of Amazon Redshift, that gives you the ability to extend analytics to your data lake—without moving data. We launched new DC2 nodes, doubling performance at the same price. We also announced many new features that provide greater scalability, better performance, more automation, and easier ways to manage your analytics workloads.
To see a full list of our launches, visit our what’s new page—and be sure to subscribe to our RSS feed.
Major launches in 2017
Amazon Redshift Spectrum—extend analytics to your data lake, without moving data
We launched Amazon Redshift Spectrum to give you the freedom to store data in Amazon S3, in open file formats, and have it available for analytics without the need to load it into your Amazon Redshift cluster. It enables you to easily join datasets across Redshift clusters and S3 to provide unique insights that you would not be able to obtain by querying independent data silos.
With Redshift Spectrum, you can run SQL queries against data in an Amazon S3 data lake as easily as you analyze data stored in Amazon Redshift. And you can do it without loading data or resizing the Amazon Redshift cluster based on growing data volumes. Redshift Spectrum separates compute and storage to meet workload demands for data size, concurrency, and performance. Redshift Spectrum scales processing across thousands of nodes, so results are fast, even with massive datasets and complex queries. You can query open file formats that you already use—such as Apache Avro, CSV, Grok, ORC, Apache Parquet, RCFile, RegexSerDe, SequenceFile, TextFile, and TSV—directly in Amazon S3, without any data movement.
“For complex queries, Redshift Spectrum provided a 67 percent performance gain,” said Rafi Ton, CEO, NUVIAD. “Using the Parquet data format, Redshift Spectrum delivered an 80 percent performance improvement. For us, this was substantial.”
DC2 nodes—twice the performance of DC1 at the same price
We launched second-generation Dense Compute (DC2) nodes to provide low latency and high throughput for demanding data warehousing workloads. DC2 nodes feature powerful Intel E5-2686 v4 (Broadwell) CPUs, fast DDR4 memory, and NVMe-based solid state disks (SSDs). We’ve tuned Amazon Redshift to take advantage of the better CPU, network, and disk on DC2 nodes, providing up to twice the performance of DC1 at the same price. Our DC2.8xlarge instances now provide twice the memory per slice of data and an optimized storage layout with 30 percent better storage utilization.
“Redshift allows us to quickly spin up clusters and provide our data scientists with a fast and easy method to access data and generate insights,” said Bradley Todd, technology architect at Liberty Mutual. “We saw a 9x reduction in month-end reporting time with Redshift DC2 nodes as compared to DC1.”
On average, our customers are seeing 3x to 5x performance gains for most of their critical workloads.
We introduced short query acceleration to speed up execution of queries such as reports, dashboards, and interactive analysis. Short query acceleration uses machine learning to predict the execution time of a query, and to move short running queries to an express short query queue for faster processing.
We launched results caching to deliver sub-second response times for queries that are repeated, such as dashboards, visualizations, and those from BI tools. Results caching has an added benefit of freeing up resources to improve the performance of all other queries.
We also introduced late materialization, which reduces the amount of data scanned for queries with predicate filters: Amazon Redshift evaluates the predicate filters first and then fetches data blocks for the remaining columns only where rows qualify. For example, if only 10 percent of the table rows satisfy the predicate filters, Amazon Redshift can potentially save 90 percent of the I/O for the remaining columns to improve query performance.
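The 10 percent example can be reproduced with a toy columnar model that counts block reads. This illustrates the general late-materialization idea under simplifying assumptions (fixed-size blocks, one filter column, and I/O counted in whole blocks); it is not a description of how Amazon Redshift implements the feature.

```python
from typing import Callable, Dict, List


def blocks_read(columns: Dict[str, List[int]], filter_column: str,
                predicate: Callable[[int], bool], block_size: int) -> Dict[str, int]:
    """Compare block reads for eager and late materialization of one filtered scan."""
    num_rows = len(columns[filter_column])
    blocks_per_column = -(-num_rows // block_size)  # ceiling division
    other_columns = [name for name in columns if name != filter_column]

    # Eager: read every block of every column, then apply the filter.
    eager = blocks_per_column * len(columns)

    # Late: read the filter column, then only those blocks of the other
    # columns that contain at least one qualifying row.
    matching_rows = [i for i, value in enumerate(columns[filter_column]) if predicate(value)]
    needed_blocks = {i // block_size for i in matching_rows}
    late = blocks_per_column + len(needed_blocks) * len(other_columns)
    return {"eager": eager, "late": late}
```

With 1,000 rows, three columns, 100-row blocks, and a predicate that keeps the first 10 percent of rows, the other two columns need 2 blocks instead of 20, a 90 percent saving for those columns. In this model, the saving depends on the qualifying rows being clustered: if they are scattered so that every block contains one, late materialization reads as many blocks as the eager approach.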
We launched query monitoring rules and pre-defined rule templates. These features make it easier for you to set metrics-based performance boundaries for workload management (WLM) queries, and specify what action to take when a query goes beyond those boundaries. For example, for a queue that’s dedicated to short-running queries, you might create a rule that aborts queries that run for more than 60 seconds. To track poorly designed queries, you might have another rule that logs queries that contain nested loops.
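Query monitoring rules are defined in the wlm_json_configuration parameter of the cluster's parameter group. The sketch below builds the two rules from the example for a manual WLM setup. The query group, concurrency, rule names, and the nested-loop row threshold are hypothetical, and the metric names (query_execution_time, in seconds, and nested_loop_join_row_count) should be checked against the current Amazon Redshift documentation before use.

```python
import json


def wlm_config_with_rules(max_seconds: int = 60, nested_loop_rows: int = 100) -> str:
    """Return a wlm_json_configuration value with two query monitoring rules."""
    short_queue = {
        "query_group": ["short"],  # hypothetical query group for short-running queries
        "query_concurrency": 5,
        "rules": [
            {   # Abort queries in this queue that run longer than max_seconds.
                "rule_name": "abort_long_running",
                "predicate": [{"metric_name": "query_execution_time",
                               "operator": ">", "value": max_seconds}],
                "action": "abort",
            },
            {   # Log queries whose nested loop joins produce many rows.
                "rule_name": "log_nested_loops",
                "predicate": [{"metric_name": "nested_loop_join_row_count",
                               "operator": ">", "value": nested_loop_rows}],
                "action": "log",
            },
        ],
    }
    default_queue = {"query_concurrency": 5}  # the last queue acts as the default queue
    return json.dumps([short_queue, default_queue])
```

The resulting string can be applied as the value of wlm_json_configuration, for example with the `modify_cluster_parameter_group` call in boto3 or through the console.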
Amazon Redshift and Redshift Spectrum serve customers across a variety of industries and sizes, from startups to large enterprises. Visit our customer page to see the success that customers are having with our recent enhancements. Learn how companies like Liberty Mutual Insurance saw a 9x reduction in month-end reporting time using DC2 nodes. On this page, you can find case studies, videos, and other content that show how our customers are using Amazon Redshift to drive innovation and business results.
In addition, check out these resources to learn about the success our customers are having building out a data warehouse and data lake integration solution with Amazon Redshift:
You can enhance your Amazon Redshift data warehouse by working with industry-leading experts. Our AWS Partner Network (APN) Partners have certified their solutions to work with Amazon Redshift. They offer software, tools, integration, and consulting services to help you at every step. Visit our Amazon Redshift Partner page and choose an APN Partner. Or, use AWS Marketplace to find and immediately start using third-party software.
To see what our Partners are saying about Amazon Redshift Spectrum and our DC2 nodes mentioned earlier, read these blog posts:
If you are evaluating or considering a proof of concept with Amazon Redshift, or you need assistance migrating your on-premises or other cloud-based data warehouse to Amazon Redshift, our team of product experts and solutions architects can help you with architecting, sizing, and optimizing your data warehouse. Contact us using this support request form, and let us know how we can assist you.
If you are an Amazon Redshift customer, we offer a no-cost health check program. Our team of database engineers and solutions architects give you recommendations for optimizing Amazon Redshift and Amazon Redshift Spectrum for your specific workloads. To learn more, email us at email@example.com.
Larry Heathcote is a Principal Product Marketing Manager at Amazon Web Services for data warehousing and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys family time, home projects, grilling out, and the taste of classic barbecue.
This is a customer post by Ajay Rathod, a Staff Data Engineer at Realtor.com.
Realtor.com, in their own words: Realtor.com®, operated by Move, Inc., is a trusted resource for home buyers, sellers, and dreamers. It offers the most comprehensive database of for-sale properties, among competing national sites, and the information, tools, and professional expertise to help people move confidently through every step of their home journey.
Move, Inc. processes hundreds of terabytes of data partitioned by day and hour. Various teams run hundreds of queries on this data. Using AWS services, Move, Inc. has built an infrastructure for gathering and analyzing data:
To make storage and subsequent querying more efficient, the data is converted into Parquet format and stored again in S3.
Amazon Athena is used as the SQL (Structured Query Language) engine to query the data in S3. Athena is easy to use and is often quickly adopted by various teams.
Teams visualize query results in Amazon QuickSight. Amazon QuickSight is a business analytics service that allows you to quickly and easily visualize data and collaborate with other users in your account.
This architecture is known as the data platform and is shared by the data science, data engineering, and the data operations teams within the organization. Move, Inc. also enables other cross-functional teams to use Athena. When many users use Athena, it helps to monitor its usage to ensure cost-effectiveness. This leads to a strong need for Athena metrics that can give details about the following:
Amount of data scanned (to monitor the cost of AWS service usage)
The databases used for queries
Actual queries that teams run
Currently, the Move, Inc. team does not have an easy way of obtaining all these metrics from a single tool. Having a way to do this would greatly simplify monitoring efforts. For example, the data operations team wants to collect several metrics every day obtained from queries run on Athena for their data. They require the following metrics:
Amount of data scanned by each user
Number of queries by each user
Databases accessed by each user
In this post, I discuss how to build a solution for monitoring Athena usage. To build this solution, you rely on AWS CloudTrail. CloudTrail is a web service that records AWS API calls for your AWS account and delivers log files to an S3 bucket.
Here is the high-level overview:
Use the CloudTrail API to audit the user queries, and then use Athena to create a table from the CloudTrail logs.
Query the Athena API with the AWS CLI to gather metrics about the data scanned by the user queries and put this information into another table in Athena.
Combine the information from these two sources by joining the two tables.
Use the resulting data to analyze, build insights, and create a dashboard that shows the usage of Athena by users within different teams in the organization.
The architecture of this solution is shown in the following diagram.
Step 1: Create a table in Athena for data in CloudTrail
The CloudTrail API records all Athena queries run by different teams within the organization. These logs are saved in S3. The fields of most interest are:
Start time of the API call
Source IP address
Response elements returned by the service
You can use the following CREATE TABLE statement to create the cloudtrail_logs table in Athena. For more information, see Querying CloudTrail Logs in the Athena documentation.
Step 2: Create a table in Amazon Athena for data from API output
Athena provides an API that returns information about a specific query ID. It also provides an API that returns information about a batch of query IDs, with up to 50 query IDs per batch.
You can use this API call to obtain information about the Athena queries that you are interested in and store this information in an S3 location. Create an Athena table to represent this data in S3. For the purpose of this post, the response fields that are of interest are as follows:
The CREATE TABLE statement for athena_api_output is as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS athena_api_output(
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = ',',
'field.delim' = ','
) LOCATION 's3://<s3 location of the output from the API calls>'
You can inspect the query IDs and user information for the last day. The query is as follows:
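with data AS (
SELECT
json_extract(responseelements,
'$.queryExecutionId') AS query_id,
(useridentity.arn) AS uid,
(useridentity.sessioncontext.sessionIssuer.userName) AS role,
from_iso8601_timestamp(eventtime) AS dt
FROM cloudtrail_logs
WHERE eventsource = 'athena.amazonaws.com'
AND eventname = 'StartQueryExecution'
AND json_extract(responseelements, '$.queryExecutionId') is NOT null)
SELECT *
FROM data
WHERE dt > date_add('day',-1,now() )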
Step 3: Obtain Query Statistics from Athena API
You can write a simple Python script to loop through queries in batches of 50 and query the Athena API for query statistics. You can use the Boto 3 library, the AWS SDK for Python, for these lookups; it provides an easy way to interact with and automate AWS services. The response from the Boto 3 API can be parsed to extract the fields that you need, as described in Step 2.
An example Python script is available in the AthenaMetrics GitHub repo.
Format these fields for each query ID as CSV strings, and store them for the entire batch response in an S3 bucket. This S3 location is the one represented by the table created in Step 2, athena_api_output.
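A minimal sketch of this formatting step, assuming the Boto 3 QueryExecution response shape (QueryExecutionId, Status, Statistics, QueryExecutionContext). Because the post's field list is not reproduced here, the columns and their order in COLUMNS are hypothetical; adjust them to your athena_api_output definition. The S3 client is passed in so the functions can be exercised without AWS credentials.

```python
import io

# Hypothetical column order for athena_api_output; match it to the column
# list in your own CREATE TABLE statement.
COLUMNS = ("query_id", "database", "state", "data_scanned_bytes",
           "execution_time_ms", "submitted")


def to_row(query_execution):
    """Flatten one QueryExecution dict from the Athena API into a tuple."""
    status = query_execution.get("Status", {})
    stats = query_execution.get("Statistics", {})
    context = query_execution.get("QueryExecutionContext", {})
    submitted = status.get("SubmissionDateTime")  # a datetime in Boto 3 responses
    return (
        query_execution["QueryExecutionId"],
        context.get("Database", ""),
        status.get("State", ""),
        stats.get("DataScannedInBytes", 0),
        stats.get("EngineExecutionTimeInMillis", 0),
        submitted.isoformat() if submitted is not None else "",
    )


def to_csv(query_executions):
    """Render a batch of QueryExecution dicts as comma-delimited lines.

    LazySimpleSerDe with field.delim=',' does not honor CSV quoting, so commas
    and newlines inside values are replaced with spaces instead of quoted.
    """
    buf = io.StringIO()
    for qe in query_executions:
        values = (str(v).replace(",", " ").replace("\n", " ") for v in to_row(qe))
        buf.write(",".join(values) + "\n")
    return buf.getvalue()


def upload_batch(s3_client, bucket, key, query_executions):
    """Write one batch as a single object under the table's S3 LOCATION."""
    s3_client.put_object(Bucket=bucket, Key=key,
                         Body=to_csv(query_executions).encode("utf-8"))
```

With a real client, the call would look like upload_batch(boto3.client("s3"), "<bucket>", "<prefix>/batch-0001.csv", executions). Stripping commas rather than quoting keeps each line readable by the LazySimpleSerDe table from Step 2; OpenCSVSerde is an alternative if you need quoted fields.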
In your Python code, create a variable named sql_query, and assign it a string representing the SQL query defined in Step 2. The s3_query_folder is the location in S3 that is used by Athena for storing results of the query. The code is as follows:
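import time
import boto3
sql_query = """
WITH data AS (
SELECT json_extract(responseelements, '$.queryExecutionId') AS query_id,
(useridentity.arn) AS uid,
(useridentity.sessioncontext.sessionIssuer.userName) AS role,
from_iso8601_timestamp(eventtime) AS dt
FROM cloudtrail_logs
WHERE eventsource = 'athena.amazonaws.com' AND eventname = 'StartQueryExecution'
AND json_extract(responseelements, '$.queryExecutionId') IS NOT NULL)
SELECT * FROM data WHERE dt > date_add('day', -1, now())
"""
s3_query_folder = 's3://<s3 location for Athena query results>/'
athena_client = boto3.client('athena')
query_execution = athena_client.start_query_execution(
    QueryString=sql_query,
    ResultConfiguration={'OutputLocation': s3_query_folder})
query_execution_id = query_execution['QueryExecutionId']
# Allow the query to complete: poll response["QueryExecution"]["Status"]["State"]
while True:
    response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
    state = response['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)
if state == 'SUCCEEDED':
    results = athena_client.get_query_results(QueryExecutionId=query_execution_id)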
You can iterate through the results in the response object and consolidate them in batches of 50 results. For each batch, you can invoke the Athena API, batch-get-query-execution.
Store the output in the S3 location pointed to by the CREATE TABLE definition for the table athena_api_output, in Step 2. The SQL statement above returns only queries run in the last 24 hours. You may want to increase that to get usage over a longer period of time. The code snippet for this API call is as follows:
The batchqueryids value is an array of 50 query IDs extracted from the result set of the SELECT query. This script creates the data needed by your second table, athena_api_output, and you are now ready to join both tables in Athena.
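A sketch of this batching step, assuming the query ID is the first column of the Step 2 result set and that the results fit on a single GetQueryResults page (up to 1,000 rows; larger result sets need NextToken pagination or the get_query_results paginator). For SELECT queries, Athena returns the column headers as the first row, and json_extract yields a JSON value, so IDs may come back wrapped in double quotes; the helper strips them. The Athena client is passed in rather than created here.

```python
BATCH_SIZE = 50  # BatchGetQueryExecution accepts at most 50 query IDs per call


def query_ids_from_results(results):
    """Pull the query_id column (first column) out of a GetQueryResults response.

    The first row holds the column headers. json_extract returns a JSON value,
    so IDs may arrive wrapped in double quotes; those are stripped.
    """
    rows = results["ResultSet"]["Rows"][1:]
    return [row["Data"][0]["VarCharValue"].strip('"')
            for row in rows if row["Data"][0].get("VarCharValue")]


def chunks(items, size=BATCH_SIZE):
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_query_executions(athena_client, query_ids):
    """Look up statistics for every query ID, one API call per batch of 50.

    Returns (executions, unprocessed): the QueryExecution dicts returned and
    the IDs Athena reported as unprocessed.
    """
    executions, unprocessed = [], []
    for batchqueryids in chunks(list(query_ids)):
        response = athena_client.batch_get_query_execution(
            QueryExecutionIds=batchqueryids)
        executions.extend(response.get("QueryExecutions", []))
        unprocessed.extend(item["QueryExecutionId"] for item in
                           response.get("UnprocessedQueryExecutionIds", []))
    return executions, unprocessed
```

Used with the script above, this would be fetch_query_executions(athena_client, query_ids_from_results(results)); the returned executions can then be formatted and written to the athena_api_output location.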
Step 4: Join the CloudTrail and Athena API data
Now that both tables contain the data that you need, you can run the following Athena query to look at usage by user. The query limits its output to the most recent five days.
SELECT
json_extract(c.responseelements, '$.queryExecutionId') qid,
c.useridentity.arn AS uid,
a.*
FROM cloudtrail_logs c
JOIN athena_api_output a
ON cast(json_extract(c.responseelements, '$.queryExecutionId') as varchar) = a.queryid
WHERE eventsource = 'athena.amazonaws.com'
AND eventname = 'StartQueryExecution'
AND from_iso8601_timestamp(eventtime) > date_add('day',-5 ,now() )
Step 5: Analyze and visualize the results
In this step, using QuickSight, you can create a dashboard that shows the following metrics:
Average amount of data scanned (MB) by a user and database
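Before building the QuickSight visuals, it can help to sanity-check the numbers. The sketch below computes the per-user metrics listed earlier (data scanned, number of queries, databases accessed) and the average MB scanned per user and database. It assumes the Step 4 join has been reduced to (user, database, data_scanned_bytes) tuples, and treating MB as 1,048,576 bytes is an assumption.

```python
from collections import defaultdict

BYTES_PER_MB = 1024 * 1024  # assumption: MB here means mebibytes


def usage_by_user(rows):
    """Summarize (user, database, data_scanned_bytes) rows per user.

    Returns {user: {"queries": n, "bytes": total, "databases": set_of_names}}.
    """
    summary = defaultdict(lambda: {"queries": 0, "bytes": 0, "databases": set()})
    for user, database, scanned in rows:
        entry = summary[user]
        entry["queries"] += 1
        entry["bytes"] += scanned
        entry["databases"].add(database)
    return dict(summary)


def avg_mb_by_user_and_database(rows):
    """Average data scanned (MB) for each (user, database) pair."""
    totals = defaultdict(lambda: [0, 0])  # [total bytes, query count]
    for user, database, scanned in rows:
        totals[(user, database)][0] += scanned
        totals[(user, database)][1] += 1
    return {key: b / n / BYTES_PER_MB for key, (b, n) in totals.items()}
```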
Using the solution described in this post, you can continuously monitor the usage of Athena by various teams. Taking this a step further, you can automate and set user limits for how much data the Athena users in your team can query within a given period of time. You may also choose to add notifications when the usage by a particular user crosses a specified threshold. This helps you manage costs incurred by different teams in your organization.
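One way to add such notifications, sketched under assumptions: per-user byte totals (for example, summed from the Step 4 join) are compared against a single limit, and each user over it triggers an Amazon SNS message. The topic ARN, the limit, and the message text are hypothetical, and scheduling the check (for example, from a daily Lambda function) is left out.

```python
def users_over_limit(bytes_by_user, limit_bytes):
    """Users whose scanned bytes exceed limit_bytes, in sorted order."""
    return sorted(user for user, scanned in bytes_by_user.items()
                  if scanned > limit_bytes)


def notify_over_limit(sns_client, topic_arn, bytes_by_user, limit_bytes):
    """Publish one SNS message per user over the limit; return those users."""
    offenders = users_over_limit(bytes_by_user, limit_bytes)
    for user in offenders:
        sns_client.publish(
            TopicArn=topic_arn,
            Subject="Athena usage threshold exceeded",
            Message=(f"{user} scanned {bytes_by_user[user]} bytes "
                     f"(limit {limit_bytes})."),
        )
    return offenders
```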
Realtor.com would like to acknowledge the tremendous support and guidance provided by Hemant Borole, Senior Consultant, Big Data & Analytics with AWS Professional Services in helping to author this post.
Ajay Rathod is Staff Data Engineer at Realtor.com. With a deep background in AWS Cloud Platform and Data Infrastructure, Ajay leads the Data Engineering and automation aspect of Data Operations at Realtor.com. He has designed and deployed many ETL pipelines and workflows for the Realtor Data Analytics Platform using AWS services like Data Pipeline, Athena, Batch, Glue and Boto3. He has created various operational metrics to monitor ETL Pipelines and Resource Usage.
Cerberus Technologies, in their own words: Cerberus is a company founded in 2017 by a team of visionary iGaming veterans. Our mission is simple – to offer the best tech solutions through a data-driven and a customer-first approach, delivering innovative solutions that go against traditional forms of working and process. This mission is based on the solid foundations of reliability, flexibility and security, and we intend to fundamentally change the way iGaming and other industries interact with technology.
Over the years, I have developed and created a number of data warehouses from scratch. Recently, I built a data warehouse for the iGaming industry single-handedly. To do it, I used the power and flexibility of Amazon Redshift and the wider AWS data management ecosystem. In this post, I explain how I was able to build a robust and scalable data warehouse without the large team of experts typically needed.
In two of my recent projects, I ran into challenges when scaling our data warehouse using on-premises infrastructure. Data was growing by tens of gigabytes per day, and query performance was suffering. Scaling required major capital investment in hardware and software licenses, as well as significant operational costs for maintenance and technical staff to keep the system running and performing well. Unfortunately, I couldn't get the resources needed to scale the infrastructure with data growth, and these projects were abandoned. Thanks to cloud data warehousing, the bottlenecks of infrastructure resources, capital expense, and operational costs have been significantly reduced or have gone away entirely. There is no longer any excuse for letting obstacles of the past delay timely insights to decision makers, no matter how much data you have.
With Amazon Redshift and AWS, I delivered a cloud data warehouse to the business very quickly, and with a small team: me. I didn’t have to order hardware or software, and I no longer needed to install, configure, tune, or keep up with patches and version updates. Instead, I easily set up a robust data processing pipeline and we were quickly ingesting and analyzing data. Now, my data warehouse team can be extremely lean, and focus more time on bringing in new data and delivering insights. In this post, I show you the AWS services and the architecture that I used.
Handling data feeds
I have several different data sources that provide everything needed to run the business. The data includes activity from our iGaming platform, social media posts, clickstream data, marketing and campaign performance, and customer support engagements.
To handle the diversity of data feeds, I developed abstract integration applications using Docker that run on Amazon EC2 Container Service (Amazon ECS) and feed data to Amazon Kinesis Data Streams. These data streams can be used for real time analytics. In my system, each record in Kinesis is preprocessed by an AWS Lambda function to cleanse and aggregate information. My system then routes it to be stored where I need on Amazon S3 by Amazon Kinesis Data Firehose. Suppose that you used an on-premises architecture to accomplish the same task. A team of data engineers would be required to maintain and monitor a Kafka cluster, develop applications to stream data, and maintain a Hadoop cluster and the infrastructure underneath it for data storage. With my stream processing architecture, there are no servers to manage, no disk drives to replace, and no service monitoring to write.
Setting up a Kinesis stream takes only a few clicks, and the same is true for Kinesis Data Firehose. Firehose can be configured to consume data automatically from a Kinesis data stream and then write compressed data to Amazon S3 every N minutes. When I want to process a Kinesis data stream, it's very easy to set up a Lambda function to be executed on each message received. I can just set a trigger from the AWS Lambda Management Console, as shown in the following figure.
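A sketch of such a preprocessing function, assuming it is attached to the stream as a Lambda trigger and forwards cleansed records to a Firehose delivery stream with PutRecordBatch. The cleansing rule, the DELIVERY_STREAM environment variable, and forwarding to Firehose from Lambda (rather than having Firehose read the stream directly) are illustrative assumptions; aggregation logic is business-specific and omitted, and partial failures reported in FailedPutCount are not retried.

```python
import base64
import json
import os

MAX_BATCH = 500  # PutRecordBatch accepts at most 500 records per call


def cleanse(record):
    """Hypothetical cleansing rule: drop empty fields and lower-case the keys."""
    return {k.lower(): v for k, v in record.items() if v not in (None, "")}


def decode_records(event):
    """Yield the JSON payloads of a Kinesis-triggered Lambda event."""
    for rec in event.get("Records", []):
        yield json.loads(base64.b64decode(rec["kinesis"]["data"]))


def process(event, firehose_client, delivery_stream):
    """Cleanse each record and forward it to Firehose as newline-delimited JSON."""
    records = [{"Data": (json.dumps(cleanse(r)) + "\n").encode("utf-8")}
               for r in decode_records(event)]
    for start in range(0, len(records), MAX_BATCH):
        firehose_client.put_record_batch(
            DeliveryStreamName=delivery_stream,
            Records=records[start:start + MAX_BATCH])
    return len(records)


def lambda_handler(event, context):
    import boto3  # imported here so the helpers can be tested without Boto 3
    return process(event, boto3.client("firehose"),
                   os.environ["DELIVERY_STREAM"])  # hypothetical variable name
```

Writing one JSON object per line keeps the S3 output easy for AWS Glue crawlers and Athena to read.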
Regardless of the format in which I receive data from our partners, I can send it to Kinesis as JSON using my own formatters. After Firehose writes it to Amazon S3, I have everything in nearly the same structure as I received it, but compressed, encrypted, and optimized for reading.
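The author's own formatters are not shown, so here is an illustration of the idea: parse a partner CSV feed, wrap each record in a JSON envelope, and write it with the Kinesis PutRecord API. The envelope fields, the customer_id partition key, and the stream name are hypothetical.

```python
import csv
import io
import json


def records_from_csv(text):
    """Example formatter: parse a partner CSV feed with a header row into dicts."""
    return [dict(row) for row in csv.DictReader(io.StringIO(text))]


def to_kinesis_record(partner_record, source):
    """Wrap a partner record in a JSON envelope and choose a partition key."""
    envelope = {"source": source, "payload": partner_record}
    data = json.dumps(envelope, sort_keys=True, default=str).encode("utf-8")
    # Hypothetical choice: partition by customer_id so one customer's events
    # stay ordered within a shard; fall back to the source name.
    partition_key = str(partner_record.get("customer_id", source))
    return data, partition_key


def send(kinesis_client, stream_name, partner_record, source):
    """Put one formatted record on the stream."""
    data, key = to_kinesis_record(partner_record, source)
    return kinesis_client.put_record(StreamName=stream_name, Data=data,
                                     PartitionKey=key)
```

Each partner format only needs its own records_from_* parser; the envelope and the write path stay the same.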
This data is automatically crawled by AWS Glue and placed into the AWS Glue Data Catalog. This means that I can immediately query the data directly on S3 using Amazon Athena or through Amazon Redshift Spectrum. Previously, I used Amazon EMR and an Amazon RDS–based metastore in Apache Hive for catalog management. Now I can avoid the complexity of maintaining Hive Metastore catalogs. Glue takes care of high availability and the operations side so that I know that end users can always be productive.
Working with Amazon Athena and Amazon Redshift for analysis
I found Amazon Athena extremely useful out of the box for ad hoc analysis. Our engineers (me) use Athena to understand new datasets that we receive and to understand what transformations will be needed for long-term query efficiency.
For our data analysts and data scientists, we’ve selected Amazon Redshift. Amazon Redshift has proven to be the right tool for us over and over again. It easily processes 20+ million transactions per day, regardless of the footprint of the tables and the type of analytics required by the business. Latency is low and query performance expectations have been more than met. We use Redshift Spectrum for long-term data retention, which enables me to extend the analytic power of Amazon Redshift beyond local data to anything stored in S3, and without requiring me to load any data. Redshift Spectrum gives me the freedom to store data where I want, in the format I want, and have it available for processing when I need it.
To load data directly into Amazon Redshift, I use AWS Data Pipeline to orchestrate data workflows. I create Amazon EMR clusters on an intra-day basis, which I can easily adjust to run more or less frequently as needed throughout the day. EMR clusters are used together with Amazon RDS, Apache Spark 2.0, and S3 storage. The data pipeline application loads ETL configurations from Spring RESTful services hosted on AWS Elastic Beanstalk. The application then loads data from S3 into memory, aggregates and cleans the data, and then writes the final version of the data to Amazon Redshift. This data is then ready to use for analysis. Spark on EMR also helps with recommendations and personalization use cases for various business users, and I find this easy to set up and deliver what users want. Finally, business users use Amazon QuickSight for self-service BI to slice, dice, and visualize the data depending on their requirements.
Each AWS service in this architecture plays its part in saving precious time that's crucial for delivery and for getting different departments in the business on board. I found the services easy to set up and use, and all have proven highly reliable in our production environments. Once the architecture was in place, scaling out was either handled completely by the service or a matter of a simple API call, and, crucially, it doesn't require me to change a single line of code. Increasing shards for Kinesis can be done in a minute by editing a stream. Increasing capacity for Lambda functions can be accomplished by editing the memory (in megabytes) allocated for processing, and concurrency is handled automatically. EMR cluster capacity can easily be increased by changing the master and slave node types in Data Pipeline, or by using Auto Scaling. Lastly, RDS and Amazon Redshift can be easily upgraded without any major tasks to be performed by our team (again, me).
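To make the "simple API call" concrete, here is a sketch of the Kinesis and Lambda resizing calls through Boto 3, with the clients passed in. The stream and function names are hypothetical, and Kinesis limits how often and by how much UpdateShardCount can change a stream, so check the service quotas before scripting it.

```python
def scale_stream(kinesis_client, stream_name, target_shards):
    """Reshard a Kinesis data stream to target_shards shards."""
    return kinesis_client.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=target_shards,
        ScalingType="UNIFORM_SCALING",  # the only scaling type the API accepts
    )


def resize_function(lambda_client, function_name, memory_mb):
    """Change the memory allocated to a Lambda function (CPU scales with it)."""
    return lambda_client.update_function_configuration(
        FunctionName=function_name, MemorySize=memory_mb)
```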
In the end, using AWS services including Kinesis, Lambda, Data Pipeline, and Amazon Redshift allows me to keep my team lean and highly productive. I eliminated the cost and delays of capital infrastructure, as well as the late night and weekend calls for support. I can now give maximum value to the business while keeping operational costs down. My team pushed out an agile and highly responsive data warehouse solution in record time and we can handle changing business requirements rapidly, and quickly adapt to new data and new user requests.
Stephen Borg is the Head of Big Data and BI at Cerberus Technologies. He has a background in platform software engineering and first became involved in data warehousing using the typical RDBMS, SQL, ETL, and BI tools. He quickly became passionate about providing insight to help others optimize the business and add personalization to products.
One of the challenges faced by our customers—especially those in highly regulated industries—is balancing the need for security with flexibility. In this post, we cover how to enable multi-tenancy and increase security by using EMRFS (EMR File System) authorization, the Amazon S3 storage-level authorization on Amazon EMR.
Amazon EMR is an easy, fast, and scalable analytics platform enabling large-scale data processing. EMRFS authorization provides Amazon S3 storage-level authorization by configuring EMRFS with multiple IAM roles. With this functionality enabled, different users and groups can share the same cluster and assume their own IAM roles respectively.
Simply put, on Amazon EMR, we can now have an Amazon EC2 role per user assumed at run time instead of one general EC2 role at the cluster level. When the user is trying to access Amazon S3 resources, Amazon EMR evaluates against a predefined mappings list in EMRFS configurations and picks up the right role for the user.
In this post, we will discuss what EMRFS authorization is (Amazon S3 storage-level access control) and show how to configure the role mappings with detailed examples. You will then have the desired permissions in a multi-tenant environment. We also demo Amazon S3 access from HDFS command line, Apache Hive on Hue, and Apache Spark.
EMRFS authorization for Amazon S3
There are two prerequisites for using this feature:
Users must be authenticated, because EMRFS needs to map the current user/group/prefix to a predefined user/group/prefix. There are several authentication options. In this post, we launch a Kerberos-enabled cluster that manages the Key Distribution Center (KDC) on the master node, and enable a one-way trust from the KDC to a Microsoft Active Directory domain.
The application must support accessing Amazon S3 via EMRFS. Applications that have their own S3FileSystem APIs (for example, Presto) are not supported at this time.
EMRFS supports three types of mapping entries: user, group, and Amazon S3 prefix. Let’s use an example to show how this works.
Assume that you have the following three identities in your organization, and they are defined in the Active Directory:
To enable all these groups and users to share the EMR cluster, you need to define the following IAM roles:
In this case, you create a separate Amazon EC2 role that doesn't grant any permissions to Amazon S3. Call this role the base role (the EC2 role attached to the EMR cluster); in this example, it is named EMR_EC2_RestrictedRole. Then, you define all the Amazon S3 permissions for each specific user or group in their own roles. The restricted role serves as the fallback role when the user doesn't match any user or group mapping and isn't trying to access any of the Amazon S3 prefixes on the list.
Important: For all other roles, like emrfs_auth_group_role_data_eng, you need to add the base role (EMR_EC2_RestrictedRole) as the trusted entity so that it can assume other roles. See the following example:
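The trust relationship is a standard IAM trust policy. A sketch, assuming a placeholder account ID and that each role's S3 permissions policy is attached separately (for example, with put_role_policy):

```python
import json


def trust_policy(account_id, base_role="EMR_EC2_RestrictedRole"):
    """Trust policy that lets the cluster's base EC2 role assume this role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": f"arn:aws:iam::{account_id}:role/{base_role}"},
            "Action": "sts:AssumeRole",
        }],
    }


def create_mapped_role(iam_client, role_name, account_id):
    """Create a role such as emrfs_auth_group_role_data_eng that trusts the base role."""
    return iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy(account_id)))
```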
This role grants all Amazon S3 permissions to the emrfs-auth-data-science-bucket-demo bucket and all the objects in it. Similarly, the policy for the role emrfs_auth_group_role_data_eng is shown below:
To configure EMRFS authorization, you use an EMR security configuration. Here is the configuration we use in this post:
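The configuration itself is not reproduced here. The sketch below reconstructs a plausible authorization section from the scenario that follows, using the EMR security configuration format (AuthorizationConfiguration, then EmrFsConfiguration, then RoleMappings entries with Role, IdentifierType, and Identifiers). The account ID and the data science role name are placeholders, and the entry order is inferred from the walkthrough.

```python
import json

ACCOUNT_ID = "123456789012"  # placeholder; use your own account ID


def role_arn(name):
    return f"arn:aws:iam::{ACCOUNT_ID}:role/{name}"


# EMRFS evaluates these entries top to bottom and uses the first match.
ROLE_MAPPINGS = [
    {"Role": role_arn("emrfs_auth_user_role_admin_user"),
     "IdentifierType": "User", "Identifiers": ["admin1"]},
    {"Role": role_arn("emrfs_auth_group_role_data_eng"),
     "IdentifierType": "Group", "Identifiers": ["grp_data_engineering"]},
    {"Role": role_arn("emrfs_auth_group_role_data_sci"),  # hypothetical name
     "IdentifierType": "Group", "Identifiers": ["grp_data_science"]},
    {"Role": role_arn("emrfs_auth_prefix_role_default_s3_prefix"),
     "IdentifierType": "Prefix",
     "Identifiers": ["s3://emrfs-auth-default-bucket-demo/"]},
]


def security_configuration_json():
    """Authorization section of an EMR security configuration, as JSON text."""
    return json.dumps({
        "AuthorizationConfiguration": {
            "EmrFsConfiguration": {"RoleMappings": ROLE_MAPPINGS}
        }
    }, indent=2)
```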
Consider the following scenario.
First, the admin user admin1 tries to log in and run a command to access Amazon S3 data through EMRFS. The first role emrfs_auth_user_role_admin_user on the mapping list, which is a user role, is mapped and picked up. Then admin1 has access to the Amazon S3 locations that are defined in this role.
Then a user from the data engineer group (grp_data_engineering) tries to access a data bucket to run some jobs. When EMRFS sees that the user is a member of the grp_data_engineering group, the group role emrfs_auth_group_role_data_eng is assumed, and the user has proper access to Amazon S3 that is defined in the emrfs_auth_group_role_data_eng role.
Next, a third user who is not an admin and doesn't belong to any of the groups tries to access Amazon S3. After failing evaluation of the top three entries, EMRFS evaluates whether the user is trying to access a certain Amazon S3 prefix defined in the last mapping entry. This type of mapping entry is called the prefix type. If the user is trying to access s3://emrfs-auth-default-bucket-demo/, then the prefix mapping is in effect, and the prefix role emrfs_auth_prefix_role_default_s3_prefix is assumed.
If the user is not trying to access any of the Amazon S3 paths on the list, which means that none of the entries matched, the user has only the permissions defined in EMR_EC2_RestrictedRole. This role is assumed by the EC2 instances in the cluster.
In this process, all the mappings defined are evaluated in the defined order, and the first role that is mapped is assumed, and the rest of the list is skipped.
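The first-match evaluation can be modeled in a few lines. This is a simplified model for reasoning about a mapping list, not the EMRFS implementation: it assumes the user's group memberships are already known and treats prefix matching as a plain string-prefix test.

```python
def resolve_role(mappings, user, groups, s3_path, base_role):
    """Simplified model of EMRFS role selection: the first matching entry wins.

    mappings: entries shaped like EMRFS RoleMappings
    (Role, IdentifierType, Identifiers), evaluated in list order.
    """
    for entry in mappings:
        kind, identifiers = entry["IdentifierType"], entry["Identifiers"]
        if kind == "User" and user in identifiers:
            return entry["Role"]
        if kind == "Group" and any(g in identifiers for g in groups):
            return entry["Role"]
        if kind == "Prefix" and any(s3_path.startswith(p) for p in identifiers):
            return entry["Role"]
    return base_role  # nothing matched: only the cluster's EC2 role applies
```

Walking the scenario through this model: admin1 matches the user entry even if admin1 also belongs to a group, a data engineer matches the group entry before the prefix entry is considered, and an unmapped user reading s3://emrfs-auth-default-bucket-demo/ gets the prefix role.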
Setting up an EMR cluster and mapping Active Directory users and groups
Now that we know how EMRFS authorization role mapping works, the next thing we need to think about is how we can use this feature in an easy and manageable way.
Active Directory setup
Many customers manage their users and groups using Microsoft Active Directory or other tools like OpenLDAP. In this post, we create the Active Directory on an Amazon EC2 instance running Windows Server and create the users and groups used in the example below. After setting up Active Directory, we use the Amazon EMR Kerberos auto-join capability to establish a one-way trust from the KDC running on the EMR master node to the Active Directory domain on the EC2 instance. You can use your own directory service as long as it supports LDAP (Lightweight Directory Access Protocol).
After configuring Active Directory, you can create all the users and groups using the Active Directory tools and add the users to the appropriate groups. In this example, we created users such as admin1, dataeng1, and datascientist1 and the groups grp_data_engineering and grp_data_science, and then added the users to the right groups.
Join the EMR cluster to an Active Directory domain
For clusters with Kerberos, Amazon EMR now supports automated Active Directory domain joins. You can use the security configuration to configure the one-way trust from the KDC to the Active Directory domain. You also configure the EMRFS role mappings in the same security configuration.
The following is an example of the EMR security configuration with a trusted Active Directory domain EMRKRB.TEST.COM and the EMRFS role mappings as we discussed earlier:
The EMRFS role mapping configuration is shown in this example:
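A sketch of how the two parts fit together, assuming the security configuration JSON format for a cluster-dedicated KDC with a cross-realm trust (the Realm, Domain, AdminServer, and KdcServer fields). The AD server host name and the configuration name are hypothetical, and role_mappings is a RoleMappings list like the one described earlier. The result is passed to the EMR CreateSecurityConfiguration API as a JSON string.

```python
import json


def kerberos_security_configuration(role_mappings, ad_domain, ad_server,
                                    ticket_lifetime_hours=24):
    """Security configuration combining a cluster-dedicated KDC, a one-way
    cross-realm trust to an Active Directory domain, and EMRFS role mappings."""
    return {
        "AuthenticationConfiguration": {
            "KerberosConfiguration": {
                "Provider": "ClusterDedicatedKdc",
                "ClusterDedicatedKdcConfiguration": {
                    "TicketLifetimeInHours": ticket_lifetime_hours,
                    "CrossRealmTrustConfiguration": {
                        "Realm": ad_domain.upper(),   # e.g. EMRKRB.TEST.COM
                        "Domain": ad_domain.lower(),  # e.g. emrkrb.test.com
                        "AdminServer": ad_server,
                        "KdcServer": ad_server,
                    },
                },
            }
        },
        "AuthorizationConfiguration": {
            "EmrFsConfiguration": {"RoleMappings": role_mappings}
        },
    }


def create_security_configuration(emr_client, name, config):
    """Register the configuration with EMR under `name`."""
    return emr_client.create_security_configuration(
        Name=name, SecurityConfiguration=json.dumps(config))
```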
We will also provide an example AWS CLI command that you can run.
Launching the EMR cluster and running the tests
Now you have configured Kerberos and EMRFS authorization for Amazon S3.
Additionally, you need to configure Hue with Active Directory using the Amazon EMR configuration API in order to log in using the AD users created before. The following is an example of Hue AD configuration.
Note: In the preceding configuration JSON file, change the values as required before pasting it into the software setting section in the Amazon EMR console.
Now let’s use this configuration and the security configuration you created before to launch the cluster.
In the Amazon EMR console, choose Create cluster. Then choose Go to advanced options. On the Step 1: Software and Steps page, under Edit software settings (optional), paste the configuration in the box.
The rest of the setup is the same as for an ordinary cluster, except for the Security Options section. In Step 4: Security, under Permissions, choose Custom, and then choose the restricted role that you created before (EMR_EC2_RestrictedRole).
Choose the appropriate subnets (these should meet the base requirement in order for a successful Active Directory join—see the Amazon EMR Management Guide for more details), and choose the appropriate security groups to make sure it talks to the Active Directory. Choose a key so that you can log in and configure the cluster.
Most importantly, choose the security configuration that you created earlier to enable Kerberos and EMRFS authorization for Amazon S3.
You can use the following AWS CLI command to create a cluster.
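As a Boto 3 alternative to the CLI command, the following sketch calls the EMR RunJobFlow API with the same ingredients: the restricted base role, the security configuration, and the Kerberos attributes needed for the Active Directory join. The instance types and count, application list, release label, and service role name are assumptions; JobFlowRole expects an instance profile name, assumed here to match the role name.

```python
def launch_cluster(emr_client, *, name, release_label, subnet_id, key_name,
                   security_configuration, kerberos_attributes, configurations):
    """Start a Kerberized cluster whose EC2 role is the restricted base role.

    kerberos_attributes uses the RunJobFlow KerberosAttributes shape: Realm,
    KdcAdminPassword, CrossRealmTrustPrincipalPassword, ADDomainJoinUser,
    ADDomainJoinPassword. Returns the new cluster ID.
    """
    response = emr_client.run_job_flow(
        Name=name,
        ReleaseLabel=release_label,
        Applications=[{"Name": app} for app in ("Hadoop", "Hive", "Hue", "Spark")],
        Instances={
            "MasterInstanceType": "m5.xlarge",  # assumed instance types and count
            "SlaveInstanceType": "m5.xlarge",
            "InstanceCount": 3,
            "Ec2SubnetId": subnet_id,
            "Ec2KeyName": key_name,
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        Configurations=configurations,  # e.g. the Hue Active Directory settings
        # Instance profile name, assumed to match the restricted role's name.
        JobFlowRole="EMR_EC2_RestrictedRole",
        ServiceRole="EMR_DefaultRole",
        SecurityConfiguration=security_configuration,
        KerberosAttributes=kerberos_attributes,
    )
    return response["JobFlowId"]
```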
It successfully returns the listing results. Next we will test Apache Hive and then Apache Spark.
To run jobs successfully, you need to create a home directory for every user in HDFS for staging data under /user/<username>. Users can configure a step to create a home directory at cluster launch time for every user who has access to the cluster. In this example, you use Hue since Hue will create the home directory in HDFS for the user at the first login. Here Hue also needs to be integrated with the same Active Directory as explained in the example configuration described earlier.
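A sketch of such a step, assuming it runs on the master node through command-runner.jar and that the step's user can run sudo -u hdfs (verify this on your release). On a Kerberos-enabled cluster, the hdfs user also needs a valid Kerberos ticket before these commands succeed, which this sketch does not obtain. The user names come from this post's example; the step name is hypothetical.

```python
import shlex


def home_dir_script(usernames):
    """Shell commands that create /user/<name> in HDFS and hand it to <name>."""
    commands = []
    for name in usernames:
        home = shlex.quote(f"/user/{name}")
        commands.append(f"sudo -u hdfs hdfs dfs -mkdir -p {home}")
        commands.append(f"sudo -u hdfs hdfs dfs -chown {shlex.quote(name)} {home}")
    return " && ".join(commands)


def home_dir_step(usernames):
    """EMR step definition that runs the script through command-runner.jar."""
    return {
        "Name": "Create HDFS home directories",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["bash", "-c", home_dir_script(usernames)],
        },
    }
```

The step dictionary can be passed in the Steps list of RunJobFlow at launch, or later with AddJobFlowSteps.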
First, log in to Hue as a data engineer user, and open a Hive Notebook in Hue. Then run a query to create a new table pointing to the data engineer bucket, s3://emrfs-auth-data-engineering-bucket-demo/table1_data_eng/.
You can see that the table was created successfully. Now try to create another table pointing to the data science group’s bucket, where the data engineer group doesn’t have access.
It failed and threw an Amazon S3 Access Denied error.
Now insert one row of data into the table you successfully created.
Next, log out, switch to a data science group user, and create another table, test2_datasci_tb.
The creation is successful.
The last task is to test Spark (it requires the user directory, but Hue created one in the previous step).
Now let’s come back to the command line and run some Spark commands.
Log in to the master node as the datascientist1 user:
Start the SparkSQL interactive shell by typing spark-sql, and run the show tables command. It should list the tables that you created using Hive.
As a data science group user, try running SELECT against both tables. You will find that you can query only the table whose location your group has access to.
EMRFS authorization for Amazon S3 enables you to have multiple roles on the same cluster, providing flexibility to configure a shared cluster for different teams to achieve better efficiency. The Active Directory integration and group mapping make it much easier for you to manage your users and groups, and provide better auditability in a multi-tenant environment.
Amazon EMR enables data analysts and scientists to deploy a cluster of any size running popular frameworks such as Spark, HBase, Presto, and Flink in minutes. When you launch a cluster, Amazon EMR automatically configures the underlying Amazon EC2 instances with the frameworks and applications that you choose for your cluster. This can include popular web interfaces such as Hue workbench, Zeppelin notebook, and Ganglia monitoring dashboards and tools.
These web interfaces are hosted on the EMR master node and must be accessed using the public DNS name of the master node (master public DNS value). The master public DNS value is dynamically created, not very user friendly, and hard to remember; it looks something like ip-###-###-###-###.us-west-2.compute.internal. Without a friendly URL for the popular workbench or notebook interfaces, workflows can suffer and some of the agility you gained is lost.
Some customers have addressed this challenge through custom bootstrap actions, steps, or external scripts that periodically check for new clusters and register a friendlier name in DNS. These approaches either put additional burden on the data practitioners or require additional resources to execute the scripts. In addition, there is typically some lag time associated with such scripts. They often don’t do a great job cleaning up the DNS records after the cluster has terminated, potentially resulting in a security risk.
The solution in this post provides an automated, serverless approach to registering a friendly master node name for easy access to the web interfaces.
Before I dive deeper, I review these key services and how they are part of this solution.
CloudWatch Events delivers a near real-time stream of system events that describe changes in AWS resources. Using simple rules, you can match events and route them to one or more target functions or streams. An event can be generated in one of four ways:
From an AWS service when resources change state
From API calls that are delivered via AWS CloudTrail
From your own code that can generate application-level events
On a scheduled basis
In this solution, I cover the first type of event, which EMR emits automatically when the cluster state changes. Based on the state in this event, the solution either creates or updates the DNS record in Route 53 when the cluster state changes to STARTING, or deletes the DNS record when the cluster is no longer needed and the state changes to TERMINATED. For more information about all EMR event details, see Monitor CloudWatch Events.
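A sketch of an equivalent rule created with Boto 3, assuming a hypothetical rule name and an existing function ARN. Matching TERMINATED_WITH_ERRORS as well goes beyond the post; it is included as an assumption so that clusters that fail also get their records cleaned up. The Lambda resource-based permission that lets CloudWatch Events invoke the function (Lambda AddPermission) is not shown.

```python
import json

# Route only the EMR cluster state changes the DNS function acts on.
EVENT_PATTERN = {
    "source": ["aws.emr"],
    "detail-type": ["EMR Cluster State Change"],
    "detail": {"state": ["STARTING", "TERMINATED", "TERMINATED_WITH_ERRORS"]},
}


def create_rule(events_client, rule_name, function_arn):
    """Create or update the rule and make the Lambda function its target."""
    events_client.put_rule(Name=rule_name,
                           EventPattern=json.dumps(EVENT_PATTERN),
                           State="ENABLED")
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "emr-dns-function", "Arn": function_arn}])
```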
Route 53 private hosted zones
A private hosted zone is a container that holds information about how to route traffic for a domain and its subdomains within one or more VPCs. Private hosted zones enable you to use custom DNS names for your internal resources without exposing the names or IP addresses to the internet.
Route 53 supports resource record sets with a wide range of record types. In this solution, you use a CNAME record, which specifies a domain name as an alias for another domain (the ‘canonical’ domain). You use a friendly name of the cluster as the CNAME for the EMR master public DNS value.
Lambda is a compute service that lets you run code without provisioning or managing servers. Lambda executes your code only when needed and scales automatically to thousands of requests per second. Lambda takes care of high availability, and server and OS maintenance and patching. You pay only for the consumed compute time. There is no charge when your code is not running.
Lambda provides the ability to invoke your code in response to events, such as when an object is put to an Amazon S3 bucket or as in this case, when a CloudWatch event is emitted. As part of this solution, you deploy a Lambda function as a target that is invoked by CloudWatch Events when the event matches your rule. You also configure the necessary permissions based on the Lambda permissions model, including a Lambda function policy and Lambda execution role.
Putting it all together
Now that you have all of the pieces, you can put together a complete solution. The following diagram illustrates how the solution works:
Start with a user activity such as launching or terminating an EMR cluster.
EMR automatically sends events to the CloudWatch Events stream.
A CloudWatch Events rule matches the specified event and routes it to a target, which in this case is a Lambda function. Here, the rule matches the EMR Cluster State Change event.
The Lambda function performs the following key steps:
Get the clusterId value from the event detail and use it to call the EMR DescribeCluster API to retrieve the following data points:
MasterPublicDnsName – public DNS name of the master node
Locate the tag containing the friendly name to use as the CNAME for the cluster. The key name containing the friendly name should be The value should be specified as host.domain.com, where domain is the private hosted zone in which to update the DNS record.
Update DNS based on the state in the event detail.
If the state is STARTING, the function calls the Route 53 API to create or update a resource record set in the private hosted zone specified by the domain tag. This is a CNAME record mapped to MasterPublicDnsName.
Conversely, if the state is TERMINATED, the function calls the Route 53 API to delete the associated resource record set from the private hosted zone.
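The steps above can be sketched as a Python Lambda function. Assumptions: the friendly-name tag key (FriendlyName here, overridable through a hypothetical FRIENDLY_NAME_TAG environment variable), a 300-second TTL, and treating TERMINATED_WITH_ERRORS like TERMINATED. Route 53 rejects a DELETE that does not exactly match an existing record, so the delete reuses the same name, TTL, and value as the upsert. If DescribeCluster does not report a master DNS name yet, the sketch skips the update.

```python
import os

# Hypothetical tag key holding the friendly name (host.domain.com).
TAG_KEY = os.environ.get("FRIENDLY_NAME_TAG", "FriendlyName")
TTL_SECONDS = 300


def find_private_zone_id(route53_client, domain):
    """Return the ID of the private hosted zone named `domain`."""
    zones = route53_client.list_hosted_zones_by_name(DNSName=domain)["HostedZones"]
    for zone in zones:
        if zone["Name"].rstrip(".") == domain and zone["Config"].get("PrivateZone"):
            return zone["Id"]
    raise LookupError(f"no private hosted zone named {domain}")


def handle(event, emr_client, route53_client):
    """Upsert or delete the CNAME for one EMR Cluster State Change event."""
    state = event["detail"]["state"]
    if state == "STARTING":
        action = "UPSERT"
    elif state.startswith("TERMINATED"):  # TERMINATED or TERMINATED_WITH_ERRORS
        action = "DELETE"
    else:
        return None
    cluster = emr_client.describe_cluster(
        ClusterId=event["detail"]["clusterId"])["Cluster"]
    tags = {t["Key"]: t["Value"] for t in cluster.get("Tags", [])}
    friendly = tags.get(TAG_KEY)
    master_dns = cluster.get("MasterPublicDnsName")
    if not friendly or "." not in friendly or not master_dns:
        return None  # nothing to register (yet)
    domain = friendly.split(".", 1)[1]  # host.domain.com -> domain.com
    route53_client.change_resource_record_sets(
        HostedZoneId=find_private_zone_id(route53_client, domain),
        ChangeBatch={"Changes": [{
            "Action": action,
            "ResourceRecordSet": {
                "Name": friendly, "Type": "CNAME", "TTL": TTL_SECONDS,
                "ResourceRecords": [{"Value": master_dns}],
            },
        }]})
    return action


def lambda_handler(event, context):
    import boto3  # imported here so handle() can be tested with stand-in clients
    return handle(event, boto3.client("emr"), boto3.client("route53"))
```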
Deploying the solution
Because all of the components of this solution are serverless, use the AWS Serverless Application Model (AWS SAM) template to deploy the solution. AWS SAM is natively supported by AWS CloudFormation and provides a simplified syntax for expressing serverless resources, resulting in fewer lines of code.
Overview of the SAM template
For this solution, the SAM template has 76 lines of text, compared to 142 lines without SAM resources (and a YAML version would be slightly shorter still). The solution can be deployed using the AWS Management Console, AWS Command Line Interface (AWS CLI), or AWS SAM Local.
CloudFormation transforms help simplify template authoring by condensing a multiple-line resource declaration into a single line in your template. To inform CloudFormation that your template defines a serverless application, add a line under the template format version as follows:
"Transform": "AWS::Serverless-2016-10-31",
Before SAM, you would use the AWS::Lambda::Function resource type to define your Lambda function. You would then need a resource to define the permissions for the function (AWS::Lambda::Permission), another resource to define a Lambda execution role (AWS::IAM::Role), and finally a CloudWatch Events resource (AWS::Events::Rule) that triggers this function.
With SAM, you need to define just a single resource for your function, AWS::Serverless::Function. Using this single resource type, you can define everything that you need, including function properties such as function handler, runtime, and code URI, as well as the required IAM policies and the CloudWatch event.
A few additional things to note in the code example:
CodeUri – Before you can deploy a SAM template, first upload your Lambda function code zip file to S3. You can do this manually or use the aws cloudformation package CLI command to automate the task of uploading local artifacts to an S3 bucket, as shown later.
Lambda execution role and permissions – You are not specifying a Lambda execution role in the template. Rather, you are providing the required permissions as IAM policy documents. When the template is submitted, CloudFormation expands the AWS::Serverless::Function resource, declaring a Lambda function and an execution role. The created role has two attached policies: a default AWSLambdaBasicExecutionRole and the inline policy specified in the template.
CloudWatch Events rule – Instead of specifying a CloudWatch Events resource type, you are defining an event source object as a property of the function itself. When the template is submitted, CloudFormation expands this into a CloudWatch Events rule resource and automatically creates the Lambda resource-based permissions to allow the CloudWatch Events rule to trigger the function.
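To make the single-resource idea concrete, here is a sketch that builds a SAM template as a Python dict and prints it as JSON. This is an illustration under assumptions, not the post's actual 76-line template. The logical ID EmrDnsSetterFunction, the handler index.handler, the python3.12 runtime, the S3 URI, and the exact IAM actions are all hypothetical choices. The event pattern matches EMR cluster state changes for the two states the function acts on.

```python
import json


def build_sam_template(code_uri: str) -> dict:
    """Hypothetical SAM template: one AWS::Serverless::Function with inline policy and event."""
    return {
        "AWSTemplateFormatVersion": "2010-09-09",
        # Tells CloudFormation to expand AWS::Serverless::* resources.
        "Transform": "AWS::Serverless-2016-10-31",
        "Resources": {
            "EmrDnsSetterFunction": {
                "Type": "AWS::Serverless::Function",
                "Properties": {
                    "Handler": "index.handler",   # hypothetical module.function
                    "Runtime": "python3.12",      # assumed runtime
                    "CodeUri": code_uri,          # zip already uploaded to S3
                    "Timeout": 30,
                    # Inline policy document; SAM also attaches AWSLambdaBasicExecutionRole.
                    "Policies": [{
                        "Version": "2012-10-17",
                        "Statement": [{
                            "Effect": "Allow",
                            "Action": [
                                "elasticmapreduce:DescribeCluster",
                                "route53:ListHostedZonesByName",
                                "route53:ChangeResourceRecordSets",
                            ],
                            "Resource": "*",
                        }],
                    }],
                    # Event source declared on the function; SAM creates the rule and permission.
                    "Events": {
                        "EmrStateChange": {
                            "Type": "CloudWatchEvent",
                            "Properties": {
                                "Pattern": {
                                    "source": ["aws.emr"],
                                    "detail-type": ["EMR Cluster State Change"],
                                    "detail": {"state": ["STARTING", "TERMINATED"]},
                                }
                            },
                        }
                    },
                },
            }
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_sam_template("s3://my-artifact-bucket/emr-dns-setter.zip"), indent=2))
```

When this template is submitted, the transform expands the single function resource into the Lambda function, its execution role, the events rule, and the invoke permission described above.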
NOTE: If you are trying this solution outside of us-east-1, download the necessary files, upload them to buckets in your Region, edit the template as appropriate, and then launch it. Alternatively, use the CLI deployment method below.
3.) Choose Next.
4.) On the Specify Details page, keep or modify the stack name and choose Next.
5.) On the Options page, choose Next.
6.) On the Review page, take the following steps:
Acknowledge the two Transform access capabilities. This allows the CloudFormation transform to create the required IAM resources with custom names.
Under Transforms, choose Create Change Set.
Wait a few seconds for the change set to be created before proceeding. The change set should look as follows:
7.) Choose Execute to deploy the template.
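After the template is deployed, you should see four resources created: the Lambda function, its execution role, the CloudWatch Events rule, and the Lambda permission that allows the rule to invoke the function.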
You should see the following output after the stack has been successfully created:
Waiting for changeset to be created...
Waiting for stack create/update to complete
Successfully created/updated stack – EmrDnsSetterCli
To test the solution, launch an EMR cluster. The Lambda function looks for the cluster_name tag associated with the EMR cluster. Make sure to specify the friendly name of your cluster as host.domain.com where the domain is the private hosted zone in which to create the CNAME record.
Here is a sample CLI command to launch a cluster within a specific subnet in a VPC with the required tag cluster_name.
After the cluster is launched, open the Route 53 console. In the left navigation pane, choose Hosted Zones to view the list of private and public zones currently configured in Route 53. Select the hosted zone that matches the domain you specified in the cluster_name tag when you launched the cluster, and verify that the resource records were created.
You can also monitor the CloudWatch Events metrics that are published to CloudWatch every minute, such as the number of TriggeredRules and Invocations.
Now that you’ve verified that the Lambda function successfully updated the Route 53 resource records in the zone file, terminate the EMR cluster and verify that the records are removed by the same function.
This solution provides a serverless approach to automatically assigning a friendly name for your EMR cluster for easy access to popular notebooks and other web interfaces. CloudWatch Events also supports cross-account event delivery, so if you are running EMR clusters in multiple AWS accounts, all cluster state events across accounts can be consolidated into a single account.
I hope that this solution provides a small glimpse into the power of CloudWatch Events and Lambda and how they can be leveraged with EMR and other AWS big data services. For example, by using the EMR step state change event, you can chain various pieces of your analytics pipeline. You may have a transient cluster perform data ingest and, when the task successfully completes, spin up an ETL cluster for transformation and upload to Amazon Redshift. The possibilities are truly endless.
An ETL (Extract, Transform, Load) process enables you to load data from source systems into your data warehouse. This is typically executed as a batch or near-real-time ingest process to keep the data warehouse current and provide up-to-date analytical data to end users.
Amazon Redshift is a fast, petabyte-scale data warehouse that enables you to make data-driven decisions easily. With Amazon Redshift, you can get insights into your big data in a cost-effective fashion using standard SQL. You can set up any type of data model, from star and snowflake schemas to simple denormalized tables, for running analytical queries.
To operate a robust ETL platform and deliver data to Amazon Redshift in a timely manner, design your ETL processes to take account of Amazon Redshift’s architecture. When migrating from a legacy data warehouse to Amazon Redshift, it is tempting to adopt a lift-and-shift approach, but this can result in performance and scale issues long term. This post guides you through the following best practices for ensuring optimal, consistent runtimes for your ETL processes:
COPY data from multiple, evenly sized files.
Use workload management to improve ETL runtimes.
Perform table maintenance regularly.
Perform multiple steps in a single transaction.
Load data in bulk.
Use UNLOAD to extract large result sets.
Use Amazon Redshift Spectrum for ad hoc ETL processing.
Monitor daily ETL health using diagnostic queries.
1. COPY data from multiple, evenly sized files
Amazon Redshift is an MPP (massively parallel processing) database, where all the compute nodes divide and parallelize the work of ingesting data. Each node is further subdivided into slices, with each slice having one or more dedicated cores, equally dividing the processing capacity. The number of slices per node depends on the node type of the cluster. For example, each DS2.XLARGE compute node has two slices, whereas each DS2.8XLARGE compute node has 16 slices.
When you load data into Amazon Redshift, you should aim to have each slice do an equal amount of work. When you load the data from a single large file or from files split into uneven sizes, some slices do more work than others. As a result, the process runs only as fast as the slowest, or most heavily loaded, slice. In the example shown below, a single large file is loaded into a two-node cluster, resulting in only one of the nodes, “Compute-0”, performing all the data ingestion:
When splitting your data files, ensure that they are of approximately equal size – between 1 MB and 1 GB after compression. The number of files should be a multiple of the number of slices in your cluster. Also, I strongly recommend that you individually compress the load files using gzip, lzop, or bzip2 to efficiently load large datasets.
When loading multiple files into a single table, use a single COPY command for the table, rather than multiple COPY commands. Amazon Redshift automatically parallelizes the data ingestion. Using a single COPY command to bulk load data into a table ensures optimal use of cluster resources, and quickest possible throughput.
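The sizing rule above can be turned into a small helper. This is a sketch, not an AWS tool. It assumes you already have an estimate of the total compressed size, and that rows are similar in length, so that equal row counts give roughly equal file sizes. The function names are hypothetical. It picks the smallest multiple of the slice count that keeps each file under 1 GB, then gzips each chunk individually.

```python
import gzip
import math
from typing import List

MB = 1024 ** 2
GB = 1024 ** 3


def choose_file_count(total_compressed_bytes: int, slices: int,
                      max_file_bytes: int = GB) -> int:
    """Smallest multiple of `slices` that keeps each file at or below max_file_bytes."""
    if slices <= 0:
        raise ValueError("slices must be positive")
    rounds = max(1, math.ceil(total_compressed_bytes / (max_file_bytes * slices)))
    return rounds * slices


def split_evenly(rows: List[bytes], n: int) -> List[List[bytes]]:
    """Contiguous chunks whose row counts differ by at most one."""
    base, extra = divmod(len(rows), n)
    chunks, start = [], 0
    for i in range(n):
        size = base + (1 if i < extra else 0)
        chunks.append(rows[start:start + size])
        start += size
    return chunks


def gzip_parts(rows: List[bytes], n: int) -> List[bytes]:
    """Compress each chunk individually, producing one load file per chunk."""
    return [gzip.compress(b"".join(chunk)) for chunk in split_evenly(rows, n)]
```

For example, a two-node DS2.XLARGE cluster has 4 slices. For about 10 GB of compressed data, choose_file_count(10 * GB, 4) returns 12 files of roughly 850 MB each, and all 12 can then be loaded with a single COPY.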
2. Use workload management to improve ETL runtimes
Use Amazon Redshift’s workload management (WLM) to define multiple queues dedicated to different workloads (for example, ETL versus reporting) and to manage the runtimes of queries. As you migrate more workloads into Amazon Redshift, your ETL runtimes can become inconsistent if WLM is not appropriately set up.
I recommend limiting the overall concurrency of WLM across all queues to around 15 or less. This WLM guide helps you organize and monitor the different queues for your Amazon Redshift cluster.
When managing different workloads on your Amazon Redshift cluster, consider the following for the queue setup:
Create a queue dedicated to your ETL processes. Configure this queue with a small number of slots (5 or fewer). Amazon Redshift is designed for analytics queries, rather than transaction processing. The cost of COMMIT is relatively high, and excessive use of COMMIT can result in queries waiting for access to the commit queue. Because ETL is a commit-intensive process, having a separate queue with a small number of slots helps mitigate this issue.
Claim extra memory available in a queue. When executing an ETL query, you can take advantage of the wlm_query_slot_count to claim the extra memory available in a particular queue. For example, a typical ETL process might involve COPYing raw data into a staging table so that downstream ETL jobs can run transformations that calculate daily, weekly, and monthly aggregates. To speed up the COPY process (so that the downstream tasks can start in parallel sooner), the wlm_query_slot_count can be increased for this step.
Create a separate queue for reporting queries. Configure query monitoring rules on this queue to further manage long-running and expensive queries.
Take advantage of the dynamic memory parameters. Because WLM memory allocation can be changed without a cluster reboot, you can shift memory from your ETL queue to your reporting queue after the ETL job has completed.
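The queue setup above can be sketched as a generator for a manual WLM configuration, shaped like the list that the wlm_json_configuration parameter accepts. Treat it as an assumption-laden illustration: the query group etl, the user group reporting_users, the rule name, the 1,800-second abort threshold, and the memory split are hypothetical. The checks encode only this post's guidance (ETL queue at 5 slots or fewer, about 15 slots in total).

```python
import json
from typing import List

MAX_TOTAL_SLOTS = 15  # post's guidance: about 15 slots or fewer across all queues
MAX_ETL_SLOTS = 5     # post's guidance: 5 or fewer slots in the ETL queue


def build_wlm_config(etl_slots: int = 5, etl_memory_pct: int = 60,
                     report_slots: int = 8, report_memory_pct: int = 35,
                     default_slots: int = 2,
                     report_timeout_sec: int = 1800) -> List[dict]:
    """Return a manual WLM queue list (hypothetical groups and thresholds)."""
    if etl_slots > MAX_ETL_SLOTS:
        raise ValueError("keep the ETL queue at 5 slots or fewer")
    if etl_slots + report_slots + default_slots > MAX_TOTAL_SLOTS:
        raise ValueError("total WLM concurrency should stay at about 15 or less")
    default_memory_pct = 100 - etl_memory_pct - report_memory_pct
    if default_memory_pct <= 0:
        raise ValueError("leave some memory for the default queue")
    return [
        # ETL queue: few slots to limit commit-queue contention (SET query_group TO 'etl').
        {"query_group": ["etl"], "query_concurrency": etl_slots,
         "memory_percent_to_use": etl_memory_pct},
        # Reporting queue with a query monitoring rule for long-running queries.
        {"user_group": ["reporting_users"], "query_concurrency": report_slots,
         "memory_percent_to_use": report_memory_pct,
         "rules": [{
             "rule_name": "abort_long_reports",
             "predicate": [{"metric_name": "query_execution_time",
                            "operator": ">", "value": report_timeout_sec}],
             "action": "abort",
         }]},
        # Default queue: listed last, with no user or query groups.
        {"query_concurrency": default_slots, "memory_percent_to_use": default_memory_pct},
    ]


if __name__ == "__main__":
    # The parameter value is the JSON-encoded list.
    print(json.dumps(build_wlm_config()))
```

After the ETL window, you could call the same function with, for example, etl_memory_pct=20 and report_memory_pct=75, then apply the result to the cluster's parameter group to move memory toward reporting. Check the WLM documentation to confirm which properties are dynamic for your configuration.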
3. Perform table maintenance regularly
Amazon Redshift is a columnar database, which enables fast transformations for aggregating data. Performing regular table maintenance keeps transformation ETLs predictable and performant. To get the best performance from your Amazon Redshift database, you must ensure that database tables are regularly VACUUMed and ANALYZEd. The Analyze & Vacuum schema utility helps you automate this table maintenance so that VACUUM and ANALYZE run on a regular schedule.
Use VACUUM to sort tables and remove deleted blocks
During a typical ETL refresh process, tables receive new incoming records using COPY, and unneeded data (cold data) is removed using DELETE. New rows are added to the unsorted region in a table. Deleted rows are simply marked for deletion.
DELETE does not automatically reclaim the space occupied by the deleted rows. Adding and removing large numbers of rows can therefore cause the unsorted region and the number of deleted blocks to grow. This can degrade the performance of queries executed against these tables.
After an ETL process completes, perform VACUUM to ensure that user queries execute in a consistent manner. You can find the complete list of tables that need VACUUMing with the table_info script from the Amazon Redshift Utils.
Use the following approaches to ensure that VACUUM is completed in a timely manner:
Use wlm_query_slot_count to claim all the memory allocated in the ETL WLM queue during the VACUUM process.
DROP or TRUNCATE intermediate or staging tables, thereby eliminating the need to VACUUM them.
If your table has a compound sort key with only one sort column, try to load your data in sort key order. This helps reduce or eliminate the need to VACUUM the table.
Consider using time series tables. This helps reduce the amount of data you need to VACUUM.
Use ANALYZE to update database statistics
Amazon Redshift uses a cost-based query planner and optimizer that relies on table statistics to choose good query plans for SQL statements. Collecting statistics regularly after the ETL completes ensures that user queries run fast and that daily ETL processes are performant. The Amazon Redshift Utils table_info script shows how fresh the statistics are. Keeping the percentage of stale statistics (pct_stats_off) below 20% helps ensure effective query plans for your SQL queries.
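The maintenance guidance above can be summarized as a small planner that turns per-table statistics into VACUUM, ANALYZE, and TRUNCATE statements. This is a hypothetical sketch. It assumes you have already fetched each table's unsorted percentage and stale-statistics percentage, for example from the table_info script or the SVV_TABLE_INFO view. The 20% statistics threshold comes from this post, but the 10% unsorted threshold is an arbitrary placeholder.

```python
from dataclasses import dataclass
from typing import List

STATS_OFF_LIMIT = 20.0  # from the post: keep pct_stats_off below 20%
UNSORTED_LIMIT = 10.0   # hypothetical threshold, not from the post


@dataclass
class TableStats:
    name: str             # schema-qualified table name
    unsorted_pct: float   # percent of rows in the unsorted region
    stats_off_pct: float  # how stale planner statistics are, in percent
    is_staging: bool = False


def plan_maintenance(tables: List[TableStats], etl_queue_slots: int) -> List[str]:
    """Return SQL statements for post-ETL maintenance, run as separate statements."""
    # Claim all slots in the ETL queue so VACUUM gets the queue's memory.
    stmts = [f"SET wlm_query_slot_count TO {etl_queue_slots};"]
    for t in tables:
        if t.is_staging:
            # Empty staging tables instead of vacuuming them.
            stmts.append(f"TRUNCATE {t.name};")
            continue
        if t.unsorted_pct > UNSORTED_LIMIT:
            stmts.append(f"VACUUM {t.name};")
        if t.stats_off_pct >= STATS_OFF_LIMIT:
            stmts.append(f"ANALYZE {t.name};")
    stmts.append("SET wlm_query_slot_count TO 1;")  # restore the default slot count
    return stmts
```

VACUUM can't run inside a transaction block, so the statements are meant to run one at a time rather than wrapped in BEGIN/END.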
4. Perform multiple steps in a single transaction
ETL transformation logic often spans multiple steps. Because commits in Amazon Redshift are expensive, if each ETL step performs a commit, multiple concurrent ETL processes can take a long time to execute.
To minimize the number of commits in a process, surround the steps in an ETL script with a BEGIN…END statement so that a single commit is performed only after all the transformation logic has been executed. Here is an example multi-step ETL script that performs one commit at the end:
BEGIN;
CREATE TEMPORARY TABLE staging_table (..);
INSERT INTO staging_table SELECT .. FROM source (transformation logic);
DELETE FROM daily_table WHERE dataset_date = ?;
INSERT INTO daily_table SELECT .. FROM staging_table (daily aggregate);
DELETE FROM weekly_table WHERE weekending_date = ?;
INSERT INTO weekly_table SELECT .. FROM staging_table (weekly aggregate);
END;
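If the steps are driven from an orchestration script, the same single-commit pattern looks like this with a Python DB-API connection. This is a sketch: sqlite3 stands in for a Redshift driver only so the example runs anywhere. With a real Redshift driver, the connection setup and placeholder style (%s rather than ?) would differ. The function name and the daily_table data are hypothetical.

```python
import sqlite3
from typing import Any, Sequence, Tuple

Step = Tuple[str, Sequence[Any]]


def run_in_one_transaction(conn: sqlite3.Connection, steps: Sequence[Step]) -> None:
    """Execute every step, then COMMIT once; ROLLBACK all of them if any step fails."""
    conn.execute("BEGIN")
    try:
        for sql, params in steps:
            conn.execute(sql, params)
    except Exception:
        conn.execute("ROLLBACK")  # undo every step, not just the failing one
        raise
    conn.execute("COMMIT")  # the only commit for the whole multi-step job


if __name__ == "__main__":
    # isolation_level=None lets the explicit BEGIN/COMMIT control the transaction.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE daily_table (dataset_date TEXT, total INTEGER)")
    run_in_one_transaction(conn, [
        ("DELETE FROM daily_table WHERE dataset_date = ?", ("2017-07-02",)),
        ("INSERT INTO daily_table VALUES (?, ?)", ("2017-07-02", 42)),
    ])
    print(conn.execute("SELECT * FROM daily_table").fetchall())
```

Because the DELETE and the INSERT are committed together, readers never see a day whose old rows were removed but whose new aggregate is missing.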
5. Load data in bulk
Amazon Redshift is designed to store and query petabyte-scale datasets. Using Amazon S3, you can stage and accumulate data from multiple source systems before executing a bulk COPY operation. The following methods allow efficient and fast transfer of these bulk datasets into Amazon Redshift:
Use temporary staging tables to hold the data for transformation. These tables are automatically dropped after the ETL session is complete. Temporary tables can be created using the CREATE TEMPORARY TABLE syntax, or by issuing a SELECT … INTO #TEMP_TABLE query. Explicitly specifying the CREATE TEMPORARY TABLE statement allows you to control the DISTRIBUTION KEY, SORT KEY, and compression settings to further improve performance.
Use ALTER TABLE APPEND to move data from the staging tables to the target table. Data in the source table is moved to matching columns in the target table, and column order doesn't matter. After data is successfully appended to the target table, the source table is empty. ALTER TABLE APPEND is usually much faster than a similar CREATE TABLE AS or INSERT INTO operation because it moves the data rather than duplicating it.
6. Use UNLOAD to extract large result sets
Fetching a large number of rows using SELECT is expensive and takes a long time. When a large amount of data is fetched from the Amazon Redshift cluster, the leader node has to hold the data temporarily until the fetches are complete. Further, data is streamed out sequentially, which results in longer elapsed time. As a result, the leader node can become hot, which not only affects the SELECT that is being executed, but also throttles resources for creating execution plans and managing the overall cluster resources. Here is an example of a large SELECT statement. Notice that the leader node is doing most of the work to stream out the rows:
Use UNLOAD to extract large result sets directly to S3. After the data is in S3, it can be shared with multiple downstream systems. By default, UNLOAD writes data in parallel to multiple files according to the number of slices in the cluster. All the compute nodes participate to quickly offload the data into S3.
If you are extracting data for use with Amazon Redshift Spectrum, use the MAXFILESIZE parameter to keep files at about 150 MB. As with item 1 above, having many evenly sized files ensures that Redshift Spectrum can do the maximum amount of work in parallel.
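As a rough planning aid, you can estimate how many files a parallel UNLOAD with a 150 MB cap will produce. This is back-of-the-envelope arithmetic under a stated assumption: the result is spread evenly across slices, and each slice writes ceil(slice_bytes / max_file_bytes) files. Actual file counts and sizes also depend on data skew and compression. The helper names are hypothetical.

```python
import math

MB = 1024 ** 2


def estimate_unload_files(result_bytes: int, slices: int,
                          max_file_bytes: int = 150 * MB) -> int:
    """Rough file count for a parallel UNLOAD, assuming an even spread across slices."""
    per_slice = result_bytes / slices
    files_per_slice = max(1, math.ceil(per_slice / max_file_bytes))
    return files_per_slice * slices


def maxfilesize_clause(max_file_bytes: int = 150 * MB) -> str:
    """Format the cap as an UNLOAD option, expressed in MB."""
    return f"MAXFILESIZE {max_file_bytes // MB} MB"
```

For example, a 6 GB result on a 16-slice cluster gives 384 MB per slice, or 3 files per slice under a 150 MB cap, so estimate_unload_files(6 * 1024 * MB, 16) returns 48.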
7. Use Redshift Spectrum for ad hoc ETL processing
Events such as data backfill, promotional activity, and special calendar days can trigger additional data volumes that affect the data refresh times in your Amazon Redshift cluster. To help address these spikes in data volumes and throughput, I recommend staging data in S3. After data is organized in S3, Redshift Spectrum enables you to query it directly using standard SQL. In this way, you gain the benefits of additional capacity without having to resize your cluster.
8. Monitor daily ETL health using diagnostic queries
Monitoring the health of your ETL processes on a regular basis helps identify the early onset of performance issues before they have a significant impact on your cluster. The following monitoring scripts can be used to provide insights into the health of your ETL processes:
Analyze the top transformation SQL and use EXPLAIN to find opportunities for tuning the query plan.
There are several other useful scripts available in the amazon-redshift-utils repository. The AWS Lambda Utility Runner runs a subset of these scripts on a scheduled basis, allowing you to automate much of the monitoring of your ETL processes.
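One simple way to catch the early onset of ETL slowdowns is to compare each run's duration against a recent baseline. The following sketch illustrates that idea. It is not one of the amazon-redshift-utils scripts. It assumes you record per-run durations somewhere, such as a hypothetical ETL_LOG table, and the 7-run window and 1.5x factor are arbitrary defaults. A run is flagged when it is slower than 1.5 times the median of the previous seven runs.

```python
from statistics import median
from typing import List


def flag_slow_runs(runtimes_sec: List[float], window: int = 7,
                   factor: float = 1.5) -> List[int]:
    """Return indexes of runs slower than `factor` x the median of the prior `window` runs."""
    flagged = []
    for i in range(window, len(runtimes_sec)):
        baseline = median(runtimes_sec[i - window:i])  # median resists one-off spikes
        if runtimes_sec[i] > factor * baseline:
            flagged.append(i)
    return flagged
```

Using a median rather than a mean keeps a single slow run from inflating the baseline and masking the next regression.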
Example ETL process
The following ETL process reinforces some of the best practices discussed in this post. Consider the following four-step daily ETL workflow where data from an RDBMS source system is staged in S3 and then loaded into Amazon Redshift. Amazon Redshift is used to calculate daily, weekly, and monthly aggregations, which are then unloaded to S3, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.
Step 1: Extract from the RDBMS source to an S3 bucket
In this ETL process, the data extract job fetches change data every hour and stages it into multiple hourly files. For example, the staged S3 folder looks like the following:
Organizing the data into multiple, evenly sized files enables the COPY command to ingest this data using all available resources in the Amazon Redshift cluster. The files are also compressed (gzipped) to reduce COPY times further.
Step 2: Stage data to the Amazon Redshift table for cleansing
You can ingest the data using a JSON-based manifest file. A manifest avoids the S3 eventual consistency issues that can occur when COPY lists objects by prefix, and it also gives you an opportunity to dedupe any files if needed. A sample manifest20170702.json file looks like the following:
The data can be ingested using the following command:
SET wlm_query_slot_count TO <<max available concurrency in the ETL queue>>;
COPY stage_tbl FROM 's3://<<S3 Bucket>>/batch/manifest20170702.json' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole' manifest gzip;
Because the downstream ETL processes depend on this COPY command to complete, the wlm_query_slot_count is used to claim all the memory available to the queue. This helps the COPY command complete as quickly as possible.
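A manifest like manifest20170702.json can be generated instead of written by hand. COPY manifests are JSON objects with an "entries" list, where each entry has a "url" and a "mandatory" flag (with "mandatory": true, COPY fails if the file is missing). The following sketch dedupes keys while building that list. The bucket and key names are hypothetical.

```python
import json
from typing import Iterable


def build_copy_manifest(bucket: str, keys: Iterable[str], mandatory: bool = True) -> str:
    """Build a COPY manifest listing each S3 object once, in first-seen order."""
    seen = set()
    entries = []
    for key in keys:
        if key in seen:
            continue  # dedupe files that were staged twice
        seen.add(key)
        entries.append({"url": f"s3://{bucket}/{key}", "mandatory": mandatory})
    return json.dumps({"entries": entries}, indent=2)


if __name__ == "__main__":
    print(build_copy_manifest("my-etl-bucket", [
        "batch/20170702/hour00.gz",
        "batch/20170702/hour01.gz",
        "batch/20170702/hour00.gz",  # duplicate, listed only once
    ]))
```

Upload the resulting file to S3 and reference it in the COPY command with the manifest option.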
Step 3: Transform data to create daily, weekly, and monthly datasets and load into target tables
Data is staged in the “stage_tbl” from where it can be transformed into the daily, weekly, and monthly aggregates and loaded into target tables. The following job illustrates a typical weekly process:
BEGIN;
INSERT into ETL_LOG (..) values (..);
DELETE from weekly_tbl where dataset_week = <<current week>>;
INSERT into weekly_tbl (..)
SELECT date_trunc('week', dataset_day) AS week_begin_dataset_date, SUM(C1) AS C1, SUM(C2) AS C2
FROM stage_tbl
GROUP BY date_trunc('week', dataset_day);
INSERT into AUDIT_LOG values (..);
END;
As shown above, multiple steps are combined into one transaction to perform a single commit, reducing contention on the commit queue.
Step 4: Unload the daily dataset to populate the S3 data lake bucket
The transformed results are now unloaded into another S3 bucket, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.
unload ('SELECT * FROM weekly_tbl WHERE dataset_week = <<current week>>') TO 's3://<<S3 Bucket>>/datalake/weekly/20170526/' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole';
Amazon Redshift lets you easily operate petabyte-scale data warehouses in the cloud. This post summarized the best practices for operating scalable ETL natively within Amazon Redshift. I demonstrated efficient ways to ingest and transform data, along with ways to monitor the process closely. I also showed how these best practices apply to a typical sample ETL workload that transforms data in Amazon Redshift.
If you have questions or suggestions, please comment below.