Apache Kafka has become an essential component of enterprise data pipelines. It is used for tracking clickstream event data, collecting logs, gathering metrics, and serving as the enterprise data bus in microservices-based architectures. Kafka is essentially a highly available and highly scalable distributed log of all the messages flowing through an enterprise data pipeline. Kafka supports internal replication to provide data availability within a cluster. However, enterprises require data availability and durability guarantees that survive the failure of an entire cluster or site.

The solution thus far in the Apache Kafka community has been MirrorMaker, an external utility that replicates data between two Kafka clusters within or across data centers. MirrorMaker is essentially a Kafka high-level consumer and producer pair that efficiently moves data from the source cluster to the destination cluster and offers little else. The initial use case MirrorMaker was designed for was moving data from multiple clusters to an aggregate cluster, within a data center or in another data center, to feed batch or streaming analytics pipelines. Enterprises have a much broader set of use cases and requirements on replication guarantees.

Multiple vendors and Internet service companies have built their own solutions for cross-cluster replication (Brooklin MirrorMaker from LinkedIn, Mirus from Salesforce, uReplicator from Uber, Confluent Replicator from Confluent), which points to the need for community Apache Kafka to have an enterprise-ready cross-cluster replication solution too.

Typical MirrorMaker Use Cases

There are many use cases in which data in one Kafka cluster needs to be replicated to another cluster. Some of the common ones are:

Aggregation for Analytics

A common use case is to aggregate data from multiple streaming pipelines, possibly across multiple data centers, to run batch analytics jobs that provide a holistic view across the enterprise, for example, a completeness check that all customer requests have been processed.

Data Deployment after Analytics

This is the opposite of the aggregation use case: the data generated by the analytics application in one cluster (say the aggregate cluster) is broadcast to multiple clusters, possibly across data centers, for end user consumption.

Isolation

Sometimes access to data in a production environment is restricted for performance or security reasons, and data is replicated between different environments to isolate access. In many deployments the ingestion cluster is isolated from the consumption clusters.

Disaster Recovery

One of the most common enterprise use cases for cross-cluster replication is guaranteeing business continuity in the presence of cluster-wide or data center-wide outages. This requires the applications, including the producers and consumers of the Kafka cluster, to fail over to the replica cluster.

Geo Proximity

In geographically distributed access patterns where low latency is required, replication is used to move data closer to the access location.

Cloud Migration

As more enterprises have both an on-premises and a cloud presence, Kafka replication can be used to migrate data to the public or private cloud and back.

Legal and Compliance

Much like the isolation use case, policy-driven replication is used to limit what data is accessible in a cluster to meet legal and compliance requirements.

Limitations of MirrorMaker v1

MirrorMaker is widely deployed in production but has serious limitations for enterprises looking for a flexible, high-performing and resilient mirroring pipeline. Here are some of the concerns:

Static Whitelists and Blacklists

To control which topics get replicated between the source and destination cluster, MirrorMaker uses whitelists and blacklists with regular expressions or explicit topic listings. But these are statically configured. In most cases, when a new topic is created that matches the whitelist, the topic gets created at the destination and replication happens automatically. However, when the whitelist itself has to be updated, the MirrorMaker instances must be bounced. Restarting MirrorMaker each time the list changes creates backlogs in the replication pipeline, causing operational pain points.

No Syncing of Topic Properties

Using MMv1, a new or existing topic at the source cluster is automatically created at the destination cluster, either directly by the Kafka broker if auto.create.topics.enable is set, or by MirrorMaker enhancements that use the Kafka admin client API. The problem lies in the configuration of the topic at the destination. MMv1 does not promise that the topic properties from the source will be maintained, because it relies on the cluster defaults at the destination. Say topic A has a partition count of 10 on the source cluster and the destination cluster default is 8; topic A will then be created on the destination with 8 partitions. If an application relies on message ordering within a partition being carried over after replication, all hell breaks loose. Similarly, the replication factor could be different on the destination cluster, changing the availability guarantees of the replicated data. Even if an admin duplicated the initial topic configuration, any dynamic changes to the topic properties are not automatically reflected. These differences become an operational nightmare very quickly.
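To make the partition-count mismatch concrete, here is a minimal sketch of a hash-based partitioner applied to the same keys with 10 and with 8 partitions. It is an illustration only: Kafka's default partitioner uses murmur2, while this sketch uses crc32 from the standard library as a stand-in, and the key names are made up.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in for a hash partitioner: hash(key) modulo the partition count.

    crc32 is an assumption made so the sketch runs with the standard
    library only; it is not the hash Kafka itself uses.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical message keys.
keys = [f"customer-{i}" for i in range(20)]

# Keys whose partition number differs between a 10-partition source topic
# and an 8-partition destination topic.
moved = [k for k in keys if partition_for(k, 10) != partition_for(k, 8)]

print(f"{len(moved)} of {len(keys)} keys land on a different partition number")
```

Any application that assumes the same partition layout on both clusters will see records grouped differently at the destination.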

Manual Topic Naming to avoid Cycles

By default, MirrorMaker creates a topic on the destination cluster with the same name as that on the source cluster. This works fine if the replication is a simple unidirectional pipeline between a source and destination cluster. A bidirectional active-active setup, where all topics in cluster A are replicated to cluster B and vice versa, would create an infinite loop which MirrorMaker cannot prevent without explicit naming conventions to break the cycle. Typically the cluster name is added to each topic name as a prefix, with a blacklist that excludes topics carrying the same prefix as the destination cluster. In large enterprises with multiple clusters in multiple data centers it is easy to create a loop in the pipeline if care is not taken to set the naming conventions correctly.
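The prefix convention described above can be sketched in a few lines. The function names and the `.` separator are assumptions chosen for illustration; they are not part of MirrorMaker.

```python
def remote_topic_name(source_cluster: str, topic: str) -> str:
    """Name given to a mirrored topic: the source cluster name as a prefix."""
    return f"{source_cluster}.{topic}"


def should_replicate(topic: str, dest_cluster: str) -> bool:
    """Blacklist rule: skip topics that originated in the destination cluster."""
    return not topic.startswith(f"{dest_cluster}.")


# Topics present on cluster A; "B.payments" was mirrored from cluster B earlier.
topics_on_a = ["orders", "B.payments"]

for topic in topics_on_a:
    if should_replicate(topic, dest_cluster="B"):
        print(f"{topic} -> {remote_topic_name('A', topic)}")
    else:
        print(f"{topic} skipped (would loop back to B)")
```

Without the filter, "B.payments" would be copied back to cluster B, mirrored to A again, and so on.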

Scalability and Throughput Limitations due to Rebalances

Internally, MirrorMaker uses the high-level consumer to fetch data from the source cluster, where the partitions are assigned to the consumers within a consumer group via a group coordinator (or earlier via ZooKeeper). Each time there is a change in topics (a new topic is created, an old topic is deleted, or a partition count is changed) or MirrorMaker itself is bounced for a software upgrade, a consumer rebalance is triggered. The rebalance stalls the mirroring process, creates a backlog in the pipeline and increases the end-to-end latency observed by the downstream application. Such constant hiccups violate any latency-driven SLAs that a service dependent on the mirrored pipeline needs to offer.

Lack of Monitoring and Operational Support

MirrorMaker provides minimal monitoring and management functions to configure, launch and monitor the state of the pipeline, and has no ability to trigger alerts when there is a problem. Most enterprises require more than just the basic scripts to start and stop a replication pipeline.

No Disaster Recovery Support

A common enterprise requirement is to maintain service availability in the event of a catastrophic failure such as the loss of an entire cluster or an entire data center. Ideally in such an event, the consumers and producers reading from and writing to a cluster should seamlessly fail over to the destination cluster and fail back when the source cluster comes back up. MirrorMaker doesn’t support this seamless switch due to a fundamental limitation in offset management. The offsets of a topic in the source cluster and the offsets of the replica topic can be completely different, depending on the point in the topic lifetime at which replication began. Thus the committed offsets in the consumer offsets topic track a completely different location at the source than at the destination. If the consumers switch to the destination cluster, they cannot simply use the value of the last committed offset at the source to continue. One approach to deal with this offset mismatch is to rely on timestamps (assuming time is relatively in sync across clusters). But timestamps get messy too, and we will discuss that at length in the next blog in the series, “A Look inside MirrorMaker 2”.
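The offset mismatch is easy to see in a toy model. The sketch below treats a topic partition as a Python list whose index is the offset and assumes replication started when the source was already at offset 4. The names and numbers are illustrative only.

```python
def mirror(source_log, start_offset):
    """Copy records from source_log[start_offset:] into a fresh destination log.

    The destination assigns its own offsets starting at 0.
    """
    return list(source_log[start_offset:])


source = [f"event-{i}" for i in range(10)]   # source offsets 0..9
dest = mirror(source, start_offset=4)        # destination offsets 0..5

# The consumer's committed position at the source: next record to read.
committed_at_source = 7

print(source[committed_at_source])           # event-7
print(len(dest))                             # 6, so offset 7 does not exist here
print(dest.index("event-7"))                 # 3, where the same record lives
```

A consumer that fails over and reuses offset 7 would point past the end of the destination log, which is why a translation between source and destination offsets is needed.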

Lack of Exactly Once Guarantees

MirrorMaker is not set up to use the support for exactly-once processing semantics in Kafka and follows the default at-least-once semantics provided by Kafka. Thus duplicate messages can show up in the replicated topic, especially after failures, because the produce to the replicated topic at the destination cluster and the update to the __consumer_offsets topic at the source cluster are not executed together in one transaction to get exactly-once replication. In most cases, handling duplicates correctly is left to the downstream application.
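A small simulation shows how the missing transaction produces duplicates. This is a toy model of a consume-produce-commit loop, not MirrorMaker code; the `crash_before_commit` flag is a hypothetical switch that stands in for a failure between the produce and the offset commit.

```python
def replicate(source, dest, committed, batch_size, crash_before_commit=False):
    """Copy one batch and return the new committed offset.

    The produce to the destination and the offset commit are separate steps,
    so a crash in between leaves the batch written but not committed.
    """
    batch = source[committed:committed + batch_size]
    dest.extend(batch)                      # step 1: produce to the destination
    if crash_before_commit:
        return committed                    # step 2 never happened
    return committed + len(batch)           # step 2: commit the source offset


source = ["m0", "m1", "m2", "m3"]
dest = []
committed = 0

committed = replicate(source, dest, committed, 2, crash_before_commit=True)
committed = replicate(source, dest, committed, 2)   # restart re-reads from offset 0
committed = replicate(source, dest, committed, 2)

print(dest)   # ['m0', 'm1', 'm0', 'm1', 'm2', 'm3']
```

Nothing is lost, but "m0" and "m1" arrive twice, which is exactly the at-least-once behavior described above.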

Too many MirrorMaker Clusters

Traditionally a MirrorMaker cluster is paired with the destination cluster. Thus there is a mirroring cluster for each destination cluster, following a remote-consume and local-produce pattern. For example, for 2 data centers with 8 clusters each and 8 bidirectional replication pairs, there are 16 MirrorMaker clusters. For large data centers this can significantly increase the operational cost. Ideally there should be one MirrorMaker cluster per destination data center. In the above example there would then be 2 MirrorMaker clusters, one in each data center.

What is coming in MirrorMaker 2

MirrorMaker 2 was designed to address the limitations of MirrorMaker 1 listed above. MM2 is based on the Kafka Connect framework and has the ability to dynamically change configurations, keep the topic properties in sync across clusters and improve performance significantly by reducing rebalances to a minimum. Moreover, handling active-active clusters and disaster recovery are use cases that MM2 supports out of the box. MM2 (KIP-382) has been accepted as part of Apache Kafka. If you’re interested in learning more, take a look at Ryanne Dolan’s talk at Kafka Summit, and stand by for the next blog in this series, “A Look inside MirrorMaker 2”.

The post Kafka Replication: The case for Mirrormaker 2.0 appeared first on Cloudera Engineering Blog.

Small files are a common challenge in the Apache Hadoop world and, when not handled with care, they can lead to a number of complications. The Apache Hadoop Distributed File System (HDFS) was developed to store and process large data sets in the range of terabytes and petabytes. However, HDFS stores small files inefficiently, leading to inefficient NameNode memory utilization and RPC calls, block scanning throughput degradation, and reduced application layer performance. In this blog post, we will define the issue of small file storage and examine ways to tackle it while keeping the complications at bay.

What are Small Files?

A small file is one which is significantly smaller than the HDFS block size (128MB by default in CDH). One should note that it is expected and inevitable to have some small files on HDFS. These are files like library jars, XML configuration files, temporary staging files, and so on. But when small files become a significant part of datasets, problems arise. Hence, in this section, we shall discuss why it is a good goal to have a file size as close to a multiple of the HDFS block size as possible.

Hadoop’s storage and application layers are not designed to function efficiently with a large number of small files. Before we get to the implications of this, let’s review how HDFS stores files.

In HDFS, data and metadata are separate entities. Files are split into blocks that are stored and replicated on the DataNodes’ local file systems across the cluster. The HDFS namespace tree and associated metadata are maintained as objects in the NameNode’s memory (and backed up to disk), each of which occupies approximately 150 bytes, as a rule of thumb. This arrangement is described in more detail in the public documentation here.

The two scenarios below illustrate the small files issue:

Scenario 1 (1 large file of 192MiB):

Scenario 2 (192 small files, 1MiB each):

Scenario 1 has one 192MB file, which is broken down into 2 blocks of 128MB and 64MB. After replication, the total memory required to store the metadata of a file = 150 bytes x (1 file inode + (No. of blocks x Replication Factor)).

According to this calculation, the total memory required to store the metadata of this file on the NameNode = 150 x (1 + (2 x 3)) = 1050 bytes.

In contrast, scenario 2 has 192 files of 1MB each. These files are then replicated across the cluster. The total memory required by the NameNode to store the metadata of these files = 150 x (192 + (192 x 3)) = 115200 bytes.

Hence, we can see that we require more than 100x the memory on the NameNode heap to store the multiple small files as opposed to one big 192MB file.
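The same rule-of-thumb calculation can be written as a short function for trying other file layouts. This is a sketch of the formula above, not a measurement of real NameNode heap usage; the function name and parameters are made up for illustration.

```python
import math

BYTES_PER_OBJECT = 150  # rule-of-thumb size of one namespace object


def namenode_metadata_bytes(file_sizes_mb, block_size_mb=128, replication=3):
    """Estimate metadata memory: 150 bytes x (1 inode + blocks x replication) per file."""
    total_objects = 0
    for size_mb in file_sizes_mb:
        blocks = math.ceil(size_mb / block_size_mb)  # a 0-byte file has no blocks
        total_objects += 1 + blocks * replication
    return BYTES_PER_OBJECT * total_objects


one_large = namenode_metadata_bytes([192])        # scenario 1
many_small = namenode_metadata_bytes([1] * 192)   # scenario 2

print(one_large)                          # 1050
print(many_small)                         # 115200
print(round(many_small / one_large, 1))   # 109.7
```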

Effects on the Storage Layer

When a NameNode restarts, it must load the filesystem metadata from local disk into memory. This means that if the NameNode metadata is large, restarts will be slower. The NameNode must also track changes in the block locations on the cluster. Too many small files can also cause the NameNode to run out of metadata space in memory before the DataNodes run out of data space on disk. The DataNodes also report block changes to the NameNode over the network; more blocks means more changes to report over the network.

More files mean more read requests that need to be served by the NameNode, which may end up clogging NameNode’s capacity to do so. This will increase the RPC queue and processing latency, which will then lead to degraded performance and responsiveness. An overall RPC workload of close to 40K~50K RPCs/s is considered high.

Effects on Application Layer

 In general, having a large number of small files results in more disk seeks while running computations through an analytical SQL engine like Impala or an application framework like MapReduce or Spark.

MapReduce/Spark

In Hadoop, a block is the most granular unit of data on which computation can be performed. Thus, it affects the throughput of an application. In MapReduce, an individual Map task is spawned for each block that must be read. Hence, blocks with very little data degrade performance and increase Application Master bookkeeping, task scheduling, and task creation overhead, since each task requires its own JVM process.

This concept is similar for Spark, in which each “map” equivalent task within an executor reads and processes one partition at a time. Each partition is one HDFS block by default. Hence, a single concurrent task can run for every partition in a Spark RDD. This means that if you have a lot of small files, each file is read in a different partition and this will cause a substantial task scheduling overhead compounded by lower throughput per CPU core.

MapReduce jobs also create 0 byte files such as _SUCCESS and _FAILURE. These files do not account for any HDFS blocks, but they still register as an inode entry in the NameNode heap, which uses 150 bytes each as described earlier. An easy and effective way to clear these files is by using the below HDFS command:

hdfs dfs -ls -R <path> | awk '$1 !~ /^d/ && $5 == "0" { print $8 }' | xargs -n100 hdfs dfs -rm

This will move those files to the .Trash location, from where they will be cleared out automatically once the trash retention policy takes effect.

Note: This should not be done while your workloads are running on the specified path since it may cause applications to fail if they have dependencies on these files to know when the jobs complete or fail.

Impala—Effects on the Catalog Server

Impala is a high-performance query engine, which caches the HDFS namespace information in the Catalog Server for faster metadata access. Below is an architecture diagram detailing the way the Impala catalog is maintained and distributed across the service.

As seen with complications around NameNode metadata management, a similar issue arises with the metadata that Impala needs to maintain in the Catalog Server. The catalog size is a function of the number and size of objects maintained in the Catalog Server. These objects with their estimated average memory usage are described in the table below:

Object              Memory Usage
Table               5KB
Partition           2KB
Column              100B
Incremental Stats   400B* (per column per partition)
File                750B
File Block          300B

*Can go as high as 1.4KB/Column/Partition

Example: If there are 1000 tables with 200 partitions each and 10 files per partition, the Impala Catalog Size will be at least (excluding table stats and table width):

#tables * 5KB + #partitions * 2KB + #files * 750B + #file_blocks * 300B = 5MB + 400MB + 1.5GB + 600MB = ~2.5GB
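The estimate can be reproduced with a few lines of code, which makes it easy to plug in your own table counts. The sketch assumes decimal units (1KB = 1000 bytes) and one block per file, which is what the 600MB term in the example implies; the function name is hypothetical.

```python
KB = 1000
MB = 1000 * KB
GB = 1000 * MB


def catalog_size_bytes(tables, partitions_per_table, files_per_partition,
                       blocks_per_file=1):
    """Lower-bound estimate of the Impala catalog size from the per-object averages."""
    partitions = tables * partitions_per_table
    files = partitions * files_per_partition
    blocks = files * blocks_per_file
    return (tables * 5 * KB          # tables
            + partitions * 2 * KB    # partitions
            + files * 750            # files
            + blocks * 300)          # file blocks


size = catalog_size_bytes(tables=1000, partitions_per_table=200,
                          files_per_partition=10)
print(size)        # 2505000000
print(size / GB)   # 2.505
```

Column and incremental stats entries are left out here, as in the example above, so the real footprint will be higher.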

The larger the Impala Catalog Size the higher its memory footprint. Large metadata in the HMS for Hive/Impala is not advised as it needs to keep track of more files, causing:

  • Longer Metadata loading time
  • Longer StateStore topic update time
  • Slow DDL statement operations
  • Longer query plan distribution time

In addition to the issues related to the metadata, each disk read is single threaded by default in Impala, which can cause a significant overhead in I/O with small files. Further, if the table is stored in the Parquet file format, each physical file needs to be opened and closed twice: once to read the footer and again to read the column data.

How Do Small Files Originate?

Let us discuss some of the common mistakes that may give birth to insidious small files.

Streaming Data

Data ingested incrementally and in small batches can end up creating a large number of small files over a period of time. Near-real-time requirements for streaming data, with small windows (every few minutes or hours) that do not create much data will cause this problem. Below is a typical streaming ETL ingest pipeline into HDFS.

Large Number of Mappers/Reducers

MapReduce jobs and Hive queries with a large number of mappers or reducers can generate a number of files on HDFS proportional to the number of mappers (for Map-Only jobs) or reducers (for MapReduce jobs). A large number of reducers with not enough data being written to HDFS will dilute the result set into small files, because each reducer writes one file. Along the same lines, data skew can have a similar effect: most of the data is routed to one or a few reducers, leaving the other reducers with little data to write, resulting in small files.

Over-Partitioned Tables

An over-partitioned table is a partitioned Hive table with a small amount of data (< 256 MB) per partition. The Hive Metastore Server (HMS) API call overhead increases with the number of partitions that a table maintains. This in turn leads to deteriorated performance. In these cases, consider reviewing the partition design and reducing the partition granularity, for example from daily to monthly partitions.

Over-Parallelizing

In a Spark job, a new file gets written per partition, depending on the number of partitions specified in a write task. This is similar to a new file being created for each reduce task in the MapReduce framework. The more Spark partitions, the more files are written. Control the number of partitions to curb the generation of small files.

File Formats and Compression

Using inefficient file formats, for example the TextFile format, and storing data without compression compounds the small file issue, affecting performance and scalability in different ways:

  • Reading data from very wide tables (tables with a large number of columns) stored as non-columnar formats (TextFile, SequenceFile, Avro) requires that each record be completely read from disk, even if only a few columns are required. Columnar formats, like Parquet, allow the reading of only the required columns from disk, which can significantly improve performance
  • Use of inefficient file formats, especially uncompressed ones, increases the HDFS space usage and the number of blocks that need to be tracked by the NameNode. If the files are small in size, it means the data is split into a larger number of files thereby increasing the amount of associated metadata to be stored.

Identifying Small Files

FSImage and fsck

The NameNode stores all the metadata related to the files and keeps the entire namespace image in RAM. The fsimage is the persistent record of that image, stored in the NameNode’s local native filesystem. We can therefore analyze the fsimage or the fsck output to identify paths with small files.

The fields available in the fsimage are:

Path, Replication, ModificationTime, AccessTime, PreferredBlockSize, BlocksCount, FileSize, NSQUOTA, DSQUOTA, Permission, UserName, GroupName

The fsimage can be processed in an application framework like MapReduce or Spark and even loaded into a Hive table for easy SQL access.

Another approach is using the fsck output and parsing that to load it into a Hive table for analysis. There are a few variants of this approach; here is a public project that uses PySpark and Hive to achieve this. It aggregates the total number of blocks, average block size and total file size at each HDFS path which can then be queried in Hive or Impala.
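As a minimal sketch of the fsimage approach, the snippet below counts small files per directory from a delimited text dump that has the fields listed above. It assumes a tab-delimited export such as the one the HDFS Offline Image Viewer produces with its Delimited processor; the 64MiB threshold, the function name and the sample rows are made up for illustration.

```python
import csv
from collections import Counter
from pathlib import PurePosixPath

FIELDS = ["Path", "Replication", "ModificationTime", "AccessTime",
          "PreferredBlockSize", "BlocksCount", "FileSize", "NSQUOTA",
          "DSQUOTA", "Permission", "UserName", "GroupName"]


def small_files_per_dir(lines, threshold_bytes=64 * 1024 * 1024, delimiter="\t"):
    """Count files smaller than threshold_bytes, grouped by parent directory."""
    counts = Counter()
    for row in csv.DictReader(lines, fieldnames=FIELDS, delimiter=delimiter):
        if row["Path"] == "Path":                 # skip a header row if present
            continue
        if row["Permission"].startswith("d"):     # skip directories
            continue
        if int(row["FileSize"]) < threshold_bytes:
            counts[str(PurePosixPath(row["Path"]).parent)] += 1
    return counts


def row(path, size, perm):
    """Build one hypothetical delimited fsimage row for the demo."""
    return "\t".join([path, "3", "2019-04-13 16:00", "2019-04-13 16:00",
                      "134217728", "1", str(size), "0", "0", perm, "etl", "hadoop"])


sample = [
    row("/data/a/part-0", 1_048_576, "-rw-r--r--"),
    row("/data/a/part-1", 2_097_152, "-rw-r--r--"),
    row("/data/b/big", 268_435_456, "-rw-r--r--"),
    row("/data/a", 0, "drwxr-xr-x"),
]
print(small_files_per_dir(sample))   # Counter({'/data/a': 2})
```

For a real fsimage with millions of rows, the same grouping is better done in Hive, Impala or Spark as described above; the logic stays the same.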

Cloudera Navigator

Cloudera Navigator is a data governance product with audit, lineage, metadata management, data stewardship and policy enforcement features.

The Navigator Search and Analytics tabs can be used to identify small files easily. The HDFS search filters in the left panel allow you to filter for files under a specific size or within a size range. The new version of Cloudera Navigator (2.14.x) even has a built-in Dashboard widget to identify small files, as shown below.

Ways to Tackle Small Files

Preventative

Streaming Ingest Use-Case

As mentioned earlier, ingesting streaming data usually leads to creating small files. Tweaking the rate of ingest, window, or dstream size (Spark) can help alleviate some of the issues. But usually, to meet near-real-time analytics demands, some architectural changes need to be introduced in the HDFS ingestion pipeline with respect to intermediate compaction jobs, maintaining multiple landing directories, and active/passive versions of table data. This is discussed in more detail in this Cloudera Engineering blog.

For near-real-time analytical processing, HBase and Kudu are better choices for storage layers, based on the data type (unstructured vs structured), append/update frequency and data usage patterns (random reads vs aggregations).

Batch Ingest Use-Case

For batch ingest pipelines, a good choice is a regularly scheduled compaction job, which compacts files after landing into HDFS. The file compaction tools mentioned later in this blog would be good candidates for this.

Over-Partitioned Tables

We should aim to have partitions with a significant volume of data so that the files within each partition are large. While deciding on the granularity of the partitions, consider the volume of data that will be stored per partition. Plan for partitions that have large files (~256MB or larger with Parquet), even if it means having less granular partitions, such as monthly instead of daily. For example, keeping the number of partitions within 10K-30K during the lifetime of a table is a good guideline to follow.

For tables that have small data volumes (a few hundred MBs), consider creating a non-partitioned table. It can be more efficient to scan all of a small table’s data stored in a single file than to deal with thousands of files, each holding a tiny number of bytes, scattered throughout multiple partitions.

Creating buckets for your table can also reduce the number of small files by essentially fixing the number of reducers and output files generated.

Spark Over-Parallelizing

When writing data to HDFS in Spark, repartition or coalesce the partitions before writing to disk. The number of partitions defined in those statements will determine the number of output files. Checking the output of the Spark Job and verifying the number of files created and throughput achieved is highly recommended.
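One way to pick the partition count for repartition or coalesce is to divide the expected output volume by a target file size. The helper below is a sketch under that assumption. The 256MiB target and the function name are illustrative, and the estimate is only as good as your knowledge of the compressed output size.

```python
import math


def target_partitions(total_bytes, target_file_bytes=256 * 1024 * 1024):
    """Number of output partitions so that each file is about target_file_bytes."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


n = target_partitions(10 * 1024 ** 3)   # about 10GiB of output
print(n)                                # 40

# In a PySpark job this count would be used before the write, for example
# (assumes an existing DataFrame `df` and a hypothetical output path):
#   df.repartition(n).write.parquet("/path/to/output")
```

coalesce avoids a full shuffle when reducing the partition count, while repartition shuffles the data and tends to give more evenly sized files, so the choice depends on the job.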

Prescriptive

HDFS File Compaction Tools

The most obvious solution to small files is to run a file compaction job that rewrites the files into larger files in HDFS. A popular tool for this is FileCrush. There are also other public projects available such as the Spark compaction tool.

Re-Create Table in Hive

To ensure a good balance between performance and efficient storage, create tables using the PARQUET file format and ensure that data compression is enabled when writing data to them.

If you have an existing Hive table that has a large number of small files, you can re-write the table with the below configuration settings applied before re-writing:

set hive.exec.compress.output=true;
set hive.exec.parallel=true;
set parquet.compression=snappy;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize=134217728;       --128M
set hive.merge.size.per.task=268435456;            --256M
set hive.optimize.sort.dynamic.partition=true;
set parquet.blocksize=268435456;                   --256M
set dfs.block.size=268435456;                      --256M

Note: The average size and parquet block sizes specified here are for representation purposes only and should be changed based on the application and needs. Details on the Hive configuration properties can be found on the official Apache Hive page.

There are two ways to do this:

  1. You can run a CREATE TABLE AS SELECT (CTAS) statement to create the target table, as long as the target table is not partitioned, is not external, and is not bucketed.
  2. To overcome those limitations, instead of a direct CTAS, you can run a CREATE TABLE LIKE (CTL) statement to copy the source table schema to create the target table and then use an INSERT OVERWRITE SELECT statement to load the data from the source table to the target table.
    Note: you will need to enable non-strict dynamic partition mode in Hive if the data is being inserted without a static partition name defined. This can be done by setting
    hive.exec.dynamic.partition.mode=nonstrict
    The partition column(s) must be the last column(s) in the select statement for dynamic partitions to work in this context.

Consider the following simplified example:

create external table target_tbl like source_tbl
stored as parquet
location '<hdfs_path>';

set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table target_tbl partition (partition_col)
select * from source_tbl;

A similar CTAS can be executed in Impala as well, but if the query runs with multiple fragments on different nodes you will get one file per fragment. To avoid this, you could restrict Impala to run the query on a single node using set num_nodes=1, but this approach is not recommended: it removes parallelism, causes slow inserts that degrade performance, and could cause the daemon to run out of memory when writing a large table.

Additionally, the number of reducers can be configured directly as well using the mapred.reduce.tasks setting. The number of files created will be equal to the number of reducers used. Setting an optimal reducer value depends on the volume of the data being written.

Conclusion

Prevention is better than cure. Hence, it is critical to review application design and catch users in the act of creating small files. Having a reasonable number of small files might be acceptable, but too many of them can be detrimental to your cluster, eventually leading to irritation, tears, and extended hours at work. Therefore, Happy Cluster, Happy Life!

Have any questions or want to connect with other users? Check out the Cloudera Community.

Shashank Naik is a Senior Solutions Consultant at Cloudera.
Bhagya Gummalla is a Solutions Consultant at Cloudera.

The post Small Files, Big Foils: Addressing the Associated Metadata and Application Challenges appeared first on Cloudera Engineering Blog.

Cloudera by Shelby Khan - 2w ago

Guest blog post written by Adir Mashiach

In this post I’ll talk about the problem of Hive tables with a lot of small partitions and files and describe my solution in detail.

A little background

In my organization, we keep a lot of our data in HDFS. Most of it is raw data, but a significant amount is the final product of many data enrichment processes. In order to manage all the data pipelines conveniently, the default partitioning method of all the Hive tables is hourly DateTime partitioning (for example: dt=’2019041316’).

My personal opinion about the decision to save so many final-product tables in the HDFS is that it’s a bad practice. But assuming those tables have to be stored in the HDFS — we need to face some issues regarding the subject of storage management, or what we call: “partition management”.

The many-small-files problem

As I’ve written in a couple of my previous posts, one of the major problems of Hadoop is the “many-small-files” problem. When a data process adds a new partition to a table every hour and has been running for more than 2 years, we need to start managing that table. There are 24 * 365 * 2 = 17,520 hours in 2 years, so we end up with a table of almost 20k partitions. I should add that the volume we’re talking about here is around 1MB per hour. Now imagine having 500 tables of that sort.

I don’t know if any of you have tried to scan 20,000 partitions (i.e. files) just to read 20GB of data, but the overhead is enormous. No matter the technology (Spark, Hive, MapReduce, Impala, Presto), they all suffer extremely bad performance when there are too many small partitions. Now imagine having thousands of queries every day, scanning thousands of partitions per table.

The problem with HDFS is that it’s simply a distributed filesystem and nothing more. It’s not a storage layer that lets you ingest data and handles everything in the background. It’s a filesystem, plain and simple. That’s why I personally suggest storing your final-product tables in a decent store like Apache Kudu, or an RDBMS like MySQL or PostgreSQL. But if for some reason you keep your data in HDFS, you need to write your own storage management layer.

Partition Management

Well then, what exactly this storage management layer should do is up to your specific problems. For instance, in our case there are 3 goals:

1. Merge partitions on selected tables

I want the “Partition Manager” to merge hourly partitions into monthly ones on a regular basis. I chose monthly resolution as the merging standard because it generates optimal-sized partitions (100MB-1GB). I don’t want one table to be partitioned monthly and another yearly, for example, because I want to keep it simple for our users (both analysts and data developers). The merging process will be described in detail later.
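A quick back-of-the-envelope check shows why monthly merging fits the numbers mentioned earlier (hourly partitions of around 1MB, kept for 2 years). The function names are made up and a 30-day month is assumed.

```python
def hourly_partitions(years):
    """Number of hourly partitions accumulated over the given number of years."""
    return 24 * 365 * years


def merged_partition_mb(mb_per_hour, days_in_month=30):
    """Approximate size of one monthly partition built from hourly ones."""
    return mb_per_hour * 24 * days_in_month


print(hourly_partitions(2))     # 17520 hourly partitions before merging
print(12 * 2)                   # 24 monthly partitions after merging
print(merged_partition_mb(1))   # 720 (MB), inside the 100MB-1GB range
```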

2. Archiving cold data

Sometimes, we want to keep certain tables’ data for years, even though old data will probably be used much less. Therefore, I want my storage management layer to “archive” partitions that are older than 2 or 3 years (depending on your use case, of course). That is done by moving the data to another version of the table with a more aggressive compression algorithm like GZIP (compared to SNAPPY in the “hot” tables).

3. Delete partitions

And of course, for some tables we might want to choose a certain threshold (most probably a time threshold) beyond which old data is deleted from HDFS. That is a really basic (but necessary) feature we would want from our storage management layer.

All 3 of those features are important, but I think the first one is the trickiest, and the actual reason I started writing this post is to explain how I think it should be done.

Partition Merging

Well first of all, I’ll have to say that the complexity of this task really depends on your situation. In our situation, the task of merging partitions on a regular basis was not simple because of the following requirements:

  1. Production tables with zero tolerance to downtime. Especially not if it’s more than a few seconds.
  2. Not losing time resolution — we found out some tables are partitioned by DT but there is no other matching time column. It means that if we are going to merge “2018041312” to “201804”, the users lose the daily and hourly time resolution on those partitions.
  3. As seamless as possible — the goal was to make the partition merging seamless to the users. We found out that in some cases it’s simply impossible with the current partitioning method (flat string DT), but in a different partitioning method, it’s quite possible. More on that later.

So now that we realize it may not be a simple problem to solve, let’s see how we solved it.

How to merge partitions in Hive tables

The process is quite simple. All the queries described here are Impala queries, but the syntax is quite similar (and sometimes identical) in other technologies like Hive, SparkSQL, Presto, etc. Another thing to remember is any setting you may need to apply before the “merge queries” so that they generate optimal-sized files. For example, in Impala you might want to execute:

set num_nodes=1;

  1. Perform a merge query — the following example demonstrates merging all the small partitions of April 2019 to a single monthly partition:
    INSERT OVERWRITE tbl PARTITION(dt) SELECT t.col1, t.col2, ..., SUBSTR(t.dt, 1, 6) AS dt FROM tbl t WHERE t.dt LIKE '201904%';
  2. Drop the old partitions from the metastore (if it’s an external table, only the partition metadata will be deleted), for example:
    ALTER TABLE tbl DROP PARTITION(dt='2019040101');
  3. Delete the directory of each of the old partitions from the HDFS:

    curl -i -X DELETE "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=DELETE&recursive=true"
  4. If you’re using Impala, you should invalidate the table’s metadata cache:
    INVALIDATE METADATA tbl;

Pretty simple, isn’t it? Well, this way we solve the first problem (no downtime), but the trade-off is that there is a window of time in which the users will see duplicated data (until all the DROP PARTITION statements finish). We decided it’s fine.
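The merge and drop steps above lend themselves to being generated rather than typed by hand. Here is a minimal sketch of a statement generator. The function name `merge_statements` is hypothetical, the table and column names are placeholders, and it assumes hourly partitions are named `yyyyMMddHH` with hours running from 00 to 23. It only builds the SQL strings; running them against Impala (and deleting the HDFS directories) is left out.

```python
import calendar


def merge_statements(table, columns, year, month):
    """Build the merge query and the per-hour DROP PARTITION statements for one month."""
    month_key = f"{year:04d}{month:02d}"
    select_list = ", ".join(f"t.{c}" for c in columns)
    merge = (
        f"INSERT OVERWRITE {table} PARTITION(dt) "
        f"SELECT {select_list}, SUBSTR(t.dt, 1, 6) AS dt "
        f"FROM {table} t WHERE t.dt LIKE '{month_key}%';"
    )
    days_in_month = calendar.monthrange(year, month)[1]
    # One DROP per hourly partition; assumes hours are numbered 00..23.
    drops = [
        f"ALTER TABLE {table} DROP PARTITION(dt='{month_key}{day:02d}{hour:02d}');"
        for day in range(1, days_in_month + 1)
        for hour in range(24)
    ]
    return merge, drops


merge, drops = merge_statements("tbl", ["col1", "col2"], 2019, 4)
print(merge)
print(len(drops), "DROP statements, first:", drops[0])
```

For April 2019 this yields one merge query and 720 DROP statements, which gives a feel for how long the window of duplicated data can last.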

Alright then, now instead of flat hourly DT partitions, we have flat monthly DT partitions. It’s not seamless at all and we may lose time resolution in certain tables. Let’s deal with the seamlessness problem before we handle the resolution loss.

Merged partitions in a user view

Our users are divided into “common” users, analysts, data scientists and data engineers. The data scientists and the data engineers are a small and technical group of people, so they can adapt to the change quite easily. The problem is the analysts and the common users, and we have a lot of them.

Why is it hard to adapt to this simple change anyway? It’s simply 201801 instead of 2018010101. Well, the reason is that if a user queries a table in the following way:

SELECT * FROM tbl WHERE dt > '2018011220' AND dt < '2018022015';

It will scan only the ‘201802’ partition. Not only that, it will also return the whole month instead of just the dates they wanted. To get correct results, they will have to change the DT filter and add a filter on another time column (“insertion_time” in the following example):

SELECT * FROM tbl WHERE (dt BETWEEN '201801' AND '201802') AND (from_timestamp(insertion_time, 'yyyyMMddHH') BETWEEN '2018011220' AND '2018022015');
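The reason the naive filter picks up only ‘201802’ is that DT values are compared as strings. The sketch below reproduces that comparison in Python, on the assumption that the query engine orders string partition keys lexicographically (which is how Python orders these digit-only strings). The partition list is made up for illustration.

```python
# Merged (monthly) partitions that exist after the merge; illustrative values.
partitions = ["201712", "201801", "201802", "201803"]


def naive_filter(dt, low="2018011220", high="2018022015"):
    """The user's original predicate: dt > low AND dt < high, compared as strings."""
    return low < dt < high


def month_filter(dt, low="201801", high="201802"):
    """The adjusted predicate: dt BETWEEN low AND high (inclusive)."""
    return low <= dt <= high


print([p for p in partitions if naive_filter(p)])  # only the February partition
print([p for p in partitions if month_filter(p)])  # January and February
```

‘201801’ is a prefix of ‘2018011220’, so it sorts before it and is dropped by the naive filter. That is why the hourly bounds must move to a separate time column.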

But we don’t merge partitions for all the tables, just for the problematic ones (those which consist of many small files and are queried often). So I can’t tell my users that from now on all of the tables are managed that way (last 2 months hourly, and older than that monthly). Only some of the tables are handled like that, so users are required to check before they query. It’s the opposite of seamless.

For the common users, we solved the problem with a little effort: we changed the query templates in the BI systems they use to fit the new partitioning. The common user is querying our datalake through a certain BI tool. That way, the query itself is transparent to the user. I can easily change the template from:

... WHERE (dt BETWEEN from_timestamp({from}, 'yyyyMMddHH') AND from_timestamp({to}, 'yyyyMMddHH')) AND (insertion_time BETWEEN {from} and {to});

to:

... WHERE (dt BETWEEN from_timestamp({from}, 'yyyyMM') AND from_timestamp({to}, 'yyyyMMddHH')) AND (insertion_time BETWEEN {from} and {to});

Notice that I keep the {to} format hourly (yyyyMMddHH). If, for instance, a user wants to query the last 6 months, I don’t want them to miss the last month, as the following query would:

SELECT * FROM tbl WHERE dt BETWEEN '201810' AND '201904';

This query will miss all the partitions of April 2019, because they are still in an hourly format. Instead, I would like the query to look like this:

SELECT * FROM tbl WHERE dt BETWEEN '201810' AND '2019041519';

The nice thing is, even if the users want to query only old months, say October-December 2018, it will still work and get all the relevant monthly partitions.
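Here is a small sketch of what the adjusted template does to the DT bounds, again relying on string ordering of the partition keys. The helper names and the partition list are hypothetical. The table is assumed to hold monthly partitions up to March 2019 and hourly ones for April.

```python
from datetime import datetime


def dt_bounds(start, end):
    """Lower bound in monthly format, upper bound in hourly format, as in the template."""
    return start.strftime("%Y%m"), end.strftime("%Y%m%d%H")


def select_partitions(partitions, start, end):
    low, high = dt_bounds(start, end)
    # BETWEEN is inclusive on both sides; values are compared as strings.
    return [p for p in partitions if low <= p <= high]


# Illustrative mix: merged months plus a few hot hourly partitions.
partitions = ["201809", "201810", "201811", "201812", "201901", "201902",
              "201903", "2019040100", "2019041519", "2019041520"]

last_six_months = select_partitions(
    partitions, datetime(2018, 10, 3, 7), datetime(2019, 4, 15, 19))
old_months_only = select_partitions(
    partitions, datetime(2018, 10, 1, 0), datetime(2018, 12, 31, 23))

print(last_six_months)
print(old_months_only)
```

The first call picks up the merged months from October onward plus the hourly April partitions up to the requested hour. The second returns only the three merged months, because ‘201812’ sorts before ‘2018123123’.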

Well, that was actually good enough for us, because the vast majority of our users use BI tools and don’t write SQL themselves. As for the analysts who do write SQL, we decided they will have to check if a table is managed in the partition manager and adjust their queries accordingly. Of course we have to be supportive and help them adapt to this methodology.

How we handle the resolution loss

That simply requires a change in the table itself: adding an “original_dt” column, and making sure the data process that populates that table is “aware” of it. Of course we need to apply the same process of changing the related query templates in the BI systems, and letting the analysts know about the change.

Nested DateTime Partitions

The easiest way to perform a seamless partition merging is when you have nested DT partitions instead of flat DT:

year=2019/month=04/day=16/hour=19

In that case, merging the small partitions while adding the omitted resolution as columns in the table, will be completely transparent to the users. For example, if I would like to merge hourly partitions to monthly ones, I’ll operate according to the following steps:

  1. Create a merged version of the original table, like so:
    CREATE EXTERNAL TABLE tbl_merged_nested (
      col1 STRING,
      col2 STRING,
      ...,
      day STRING,
      hour STRING
    ) PARTITIONED BY (
      year STRING,
      month STRING
    )
    STORED AS PARQUET;
  2. Perform a merge query:

    INSERT OVERWRITE TABLE tbl_merged_nested PARTITION(year, month)
    SELECT col1, col2, ..., day, hour, year, month FROM tbl_original_nested WHERE year='2019' AND month='04';
  3. Drop the old partitions of the original table from the metastore (and of course afterwards delete the directory from the HDFS):
    ALTER TABLE tbl_original_nested DROP PARTITION(year='2019', month='04', day='17', hour='20');
  4. Then, I can create a view that unions the 2 tables if I want to have a “hot” one and a “merged” one, and it will be completely seamless to the user who will not care if the year, month, day or hour columns are partitions or not:
    CREATE VIEW union_view AS SELECT col1, col2, ..., year, month, day, hour FROM tbl_original_nested UNION ALL SELECT col1, col2, ..., year, month, day, hour FROM tbl_merged_nested;

Therefore, the most important condition for merging DT partitions seamlessly is to have them nested and not flat.
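One detail worth checking when building the union view is column order: UNION ALL matches columns by position, and the two tables list year, month, day and hour in different orders (partition columns come last). The sketch below illustrates this with SQLite as a stand-in. SQLite has no partitions, so the partition columns are modeled as ordinary trailing columns, and the single data column and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- "Hot" table: year/month/day/hour would all be partition columns.
CREATE TABLE tbl_original_nested (col1 TEXT, year TEXT, month TEXT, day TEXT, hour TEXT);
-- Merged table: day/hour are regular columns, year/month would be partitions.
CREATE TABLE tbl_merged_nested (col1 TEXT, day TEXT, hour TEXT, year TEXT, month TEXT);

INSERT INTO tbl_original_nested VALUES ('hot', '2019', '05', '02', '10');
INSERT INTO tbl_merged_nested VALUES ('cold', '17', '20', '2019', '04');

-- Naming the columns keeps both branches aligned regardless of physical order.
CREATE VIEW union_view AS
  SELECT col1, year, month, day, hour FROM tbl_original_nested
  UNION ALL
  SELECT col1, year, month, day, hour FROM tbl_merged_nested;
""")

rows = conn.execute(
    "SELECT col1, year, month, day, hour FROM union_view "
    "WHERE year = '2019' ORDER BY month"
).fetchall()
print(rows)
```

A user filtering on year, month, day or hour through the view gets consistent results from both tables, without knowing which of those columns are partitions.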

Summary
  • Don’t store small, frequently-queried tables in HDFS, especially not if they consist of thousands of files. Store them in another place like an RDBMS (MySQL, PostgreSQL, etc.) or in Apache Kudu if you want to stay in the Hadoop ecosystem. Of course you’ll have to provide a solution to perform join queries between those tables and the Hive tables; I recommend Presto (if you’re using Kudu, Impala can work too).
  • If you have to store them in HDFS, make sure to have a storage management layer (“partition manager”) that handles partition merging and prevents situations of tables with many small files.
  • Partition merging can be difficult if you want it to be transparent to the users. But compared to flat DT partitions, nested ones make seamless merging much easier.

I hope the post was helpful to some of you and I invite you to comment and share your thoughts about partition management in Hadoop.

Adir is a big data architect who specializes in the Hadoop ecosystem. He is experienced at designing solutions in on-premise clusters with limited resources to efficiently serve thousands of users and analysts.
Adir’s technical posts can also be found on https://medium.com/@adirmashiach

The post Partition Management in Hadoop appeared first on Cloudera Engineering Blog.


Apache Impala supports fine-grained authorization via Apache Sentry on all of the tables it manages including Apache Kudu tables. Given Impala is a very common way to access the data stored in Kudu, this capability allows users deploying Impala and Kudu to fully secure the Kudu data in multi-tenant clusters even though Kudu does not yet have native fine-grained authorization of its own. This solution works because Kudu natively supports coarse-grained (all or nothing) authorization which enables blocking all access to Kudu directly except for the impala user and an optional whitelist of other trusted users. This post will describe how to use Apache Impala’s fine-grained authorization support along with Apache Kudu’s coarse-grained authorization to achieve a secure multi-tenant deployment.

Sample Workflow

The examples in this post enable a workflow that uses Apache Spark to ingest data directly into Kudu and Impala to run analytic queries on that data. The Spark job, run as the etl_service user, is permitted to access the Kudu data via coarse-grained authorization. Even though this gives access to all the data in Kudu, the etl_service user is only used for scheduled jobs or by an administrator. All queries on the data, from a wide array of users, will use Impala and leverage Impala’s fine-grained authorization. Impala’s GRANT statements allow you to flexibly control the privileges on the Kudu storage tables. Impala’s fine-grained privileges along with support for SELECT, INSERT, UPDATE, UPSERT, and DELETE statements, allow you to finely control who can read and write data to your Kudu tables while using Impala. Below is a diagram showing the workflow described:

Note: The examples below assume that authorization has already been configured for Kudu, Impala, and Spark. For help configuring authorization see the Cloudera authorization documentation.

Configuring Kudu’s Coarse-Grained Authorization

Kudu supports coarse-grained authorization of client requests based on the authenticated client Kerberos principal. The two levels of access which can be configured are:

  • Superuser – principals authorized as a superuser are able to perform certain administrative functionality such as using the kudu command line tool to diagnose or repair cluster issues.
  • User – principals authorized as a user are able to access and modify all data in the Kudu cluster. This includes the ability to create, drop, and alter tables as well as read, insert, update, and delete data.

Access levels are granted using whitelist-style Access Control Lists (ACLs), one for each of the two levels. Each access control list either specifies a comma-separated list of users, or may be set to * to indicate that all authenticated users are able to gain access at the specified level.

Note: The default value for the User ACL is *, which allows all users access to the cluster.

Example Configuration

The first and most important step is to remove the default ACL of * from Kudu’s --user_acl configuration. This will ensure only the users you list will have access to the Kudu cluster. Then, to allow the Impala service to access all of the data in Kudu, the Impala service user, usually impala, should be added to the Kudu --user_acl configuration. Any user that is not using Impala will also need to be added to this list. For example, an Apache Spark job might be used to load data directly into Kudu. Generally, a single user is used to run scheduled jobs of applications that do not support fine-grained authorization on their own. For this example, that user is etl_service. The full --user_acl configuration is:

--user_acl=impala,etl_service

For more details see the Kudu authorization documentation.
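To illustrate the whitelist semantics described above, here is a small model of how such an ACL value could be evaluated. This is not Kudu’s implementation. The helper names are hypothetical, and it only mirrors the two rules stated in this post: a comma-separated list of users, or * for all authenticated users.

```python
def parse_acl(value):
    """Split a comma-separated ACL value into a set of entries."""
    return {entry.strip() for entry in value.split(",") if entry.strip()}


def is_allowed(user, acl_value):
    """True if the authenticated user passes the whitelist ('*' admits everyone)."""
    acl = parse_acl(acl_value)
    return "*" in acl or user in acl


user_acl = "impala,etl_service"          # the value configured in this example
print(is_allowed("impala", user_acl))       # the Impala service user
print(is_allowed("etl_service", user_acl))  # the scheduled Spark job user
print(is_allowed("alice", user_acl))        # everyone else must go through Impala
print(is_allowed("alice", "*"))             # the default ACL admits any authenticated user
```

With the restricted list, an ordinary user can no longer reach Kudu directly, so the only way to the data is through Impala and its fine-grained privileges.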

Using Impala’s Fine-Grained Authorization

Follow Impala’s authorization documentation to configure fine-grained authorization. Once configured, you can use Impala’s GRANT statements to control the privileges of Kudu tables. These fine-grained privileges can be set at the database, table and column level. Additionally, you can individually control SELECT, INSERT, CREATE, ALTER, and DROP privileges.

Note: A user needs the ALL privilege in order to run DELETE, UPDATE, or UPSERT statements against a Kudu table.

Below is a brief example with a couple tables stored in Kudu:

CREATE TABLE messages
(
  name STRING,
  time TIMESTAMP,
  message STRING,
  PRIMARY KEY(name, time)
)
PARTITION BY HASH(name) PARTITIONS 4
STORED AS KUDU;

GRANT ALL ON TABLE messages TO userA;




CREATE TABLE metrics
(
  host STRING NOT NULL,
  metric STRING NOT NULL,
  time BIGINT NOT NULL,
  value DOUBLE NOT NULL,
  PRIMARY KEY (host, metric, time)
)
PARTITION BY HASH(host) PARTITIONS 4
STORED AS KUDU;

GRANT ALL ON TABLE messages TO userB;

Conclusion

This brief example that combines Kudu’s coarse-grained authorization and Impala’s fine-grained authorization should enable you to meet the security needs of your data workflow today. The pattern described here can be applied to other services and workflows using Kudu as well. For greater authorization flexibility, you can look forward to the near future when Kudu supports native fine-grained authorization on its own. The Apache Kudu contributors understand the importance of native fine-grained authorization and they are working on integrations with Apache Sentry and Apache Ranger.

To read more about Apache Kudu, check out Transparent Hierarchical Storage Management with Apache Kudu and Impala.

Grant Henke is a Software Engineer at Cloudera

The post Fine-Grained Authorization with Apache Kudu and Impala appeared first on Cloudera Engineering Blog.


Apache Spark is one of the most popular engines for distributed data processing on Big Data clusters. Spark jobs come in all shapes, sizes and cluster form factors, ranging from 10’s to 1000’s of nodes and executors, seconds to hours or even days for job duration, megabytes to petabytes of data and simple data scans to complicated analytical workloads. Throw in a growing number of streaming workloads on top of a huge body of batch and machine learning jobs, and we can see the significant amount of infrastructure expenditure on running Spark jobs. These are costs that could be optimized by reducing wastage and improving the efficiency of Spark jobs. Another hidden but meaningful cost is developer productivity that is lost in trying to understand why Spark jobs failed or are not running within desired latency or resource requirements. There is a lot of data scattered across logs, metrics, Spark UI etc. that needs to be collected, parsed and correlated to get some insights, but not every developer has the deep expertise needed for that analysis. And the sheer scale of Spark jobs, with 1000’s of tasks across 100’s of machines, can make that effort overwhelming even for experts. To validate this hypothesis, we interviewed a diverse set of our users, and indeed found that their top-of-mind issue was getting easy-to-understand and actionable visibility into their Spark jobs. So we decided to do something about it.

Our objective was to build a system that provides intuitive insight into Spark jobs, one that not only gives visibility but also codifies the best practices and deep experience we have gained after years of debugging and optimizing Spark jobs. The main design objectives were to be:

  • Intuitive and easy – Big data practitioners should be able to navigate and ramp quickly
  • Concise and focused – Hide the complexity and scale but present all necessary information in a way that does not overwhelm the end user
  • Batteries included – Provide actionable recommendations for a self service experience, especially for users who are less familiar with Spark
  • Extensible – To enable additions of deep dives for the most common and difficult scenarios as we come across them

Here is a sneak preview of what we have been building. We will try to analyze a run of TPC-DS query 64 on a cloud provider and see if we can identify potential areas of improvement.

A bird’s eye view of the entire Spark application execution

When first looking at an application, we often struggle with where to begin because of the multitude of angles to look at. To help with that problem, we designed a timeline based DAG view. It tries to capture a lot of summarized information that provides a concise, yet powerful view into what happened through the lifetime of the job. The intent is to quickly identify problem areas that deserve a closer look with the concept of navigational debugging.

Spark execution primer

Let’s start with a brief refresher on how Spark runs jobs. It does that by taking the user code (Dataframe, RDD or SQL) and breaking that up into stages of computation, where a stage does a specific part of the work using multiple tasks. Stages depend on each other for input data and start after their data becomes available. After all stages finish successfully the job is completed. These stages logically produce a DAG (directed acyclic graph) of execution. For example, a simple wordcount job is a 2 stage DAG: the first stage reads the words and the second stage counts them. However, for most Spark jobs it’s not easy to determine the structure of this DAG and how its stages got executed during the lifetime of the job. Being able to construct and visualize that DAG is foundational to understanding Spark jobs.
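To make the "stages start after their input becomes available" idea concrete, here is a toy model of stage scheduling using Python’s standard graphlib module. It is not Spark code. The stage names and the join-shaped graph are made up, and real Spark scheduling also depends on available executors and task-level state.

```python
from graphlib import TopologicalSorter


def execution_waves(stage_inputs):
    """Group stages into waves: a stage becomes runnable once all its input stages are done."""
    sorter = TopologicalSorter(stage_inputs)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # stages whose inputs are all available
        waves.append(ready)
        sorter.done(*ready)                 # mark them finished, unlocking dependents
    return waves


# The wordcount example: stage-1 (count) reads the output of stage-0 (read words).
wordcount = {"stage-0": set(), "stage-1": {"stage-0"}}

# A hypothetical join: two independent scans feed a join, followed by an aggregation.
join_job = {
    "stage-0": set(),
    "stage-1": set(),
    "stage-2": {"stage-0", "stage-1"},
    "stage-3": {"stage-2"},
}

print(execution_waves(wordcount))
print(execution_waves(join_job))
```

In the join example the two scans can run concurrently, while the join has to wait for both. This is the kind of structure a timeline view of the DAG makes visible.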

We start with the DAG view of the Spark application that shows the structure of the DAG and how it executed over time along with key metrics for scheduling and resources. The horizontal axes on all charts are aligned with each other and span the timeline of the job from its start to its end. This immediately shows which stages of the job are using the most time and how they correlate with key metrics. Above, we see that the initial stages of execution spent most of their time waiting for resources. Scanning vertically down to the scheduling stats, we see that the number of active tasks is much higher than the number of execution cores allocated to the job. That explains the waiting time, and the best way to speed up these stages would be to add more executors. The CPU metrics show fairly good utilization of the Spark CPU cores at about 100% throughout the job, and it’s matched closely by actual CPU occupancy, showing that Spark used its allocated compute effectively. The memory metrics group shows how memory was allocated and used for various purposes (off-heap, storage, execution etc.) along the timeline of the application. We can clearly see a lot of memory being wasted because the allocation is around 168GB throughout but the utilization maxes out at 64GB. We can reduce the memory allocation and use the savings to acquire more executors, thereby improving the performance while maintaining or decreasing the spend. Flexible infra choices from cloud providers enable that choice. Thus, we see that we can quickly get a lot of actionable information from this intuitive and time correlated bird’s eye view.

Concise Stage Summary

Clicking on a stage in the DAG pops up a concise summary of the relevant details about a stage including input and output data sizes and their distributions, tasks executed and failures. The DAG edges provide quick visual cues of the magnitude and skew of data moved across them. Hint – Thicker edges mean larger data transfers. A quick look at the summary for stage-15 shows uniform data distribution while reading about 65GB of primary input and writing about 16GB of shuffle output.

Did you ever wonder which part of the query ran in which stage?

On the Apache Spark UI, the SQL tab shows what the Spark job will do overall logically and the stage view shows how the job was divided into tasks for execution. But it takes a Spark SQL expert to correlate which fragment of the SQL plan actually ran in a particular stage. We did the hard work to uncover that elusive connection for you and it’s available in the SQL tab for a given stage. You are welcome!

Where exactly is the skew in the job…

Data skew is one of the most common problems that frustrate Spark developers. We saw earlier how the DAG view can show large skews across the full data set. But often skews are present within partitions of a data set and they can be across the key space or the value space of the partition. We will identify the potential skewed stages for you and let you jump into a skew deep dive view. Here, we present per-partition runtimes, data, key and value distributions, all correlated by partition id on the horizontal axis. Now there is no place left for those pesky skews to hide! Below, we analyse the join stage-17 for potential issues and we can see that the join inputs are very different in overall size (65GB vs 1GB) and the stage is doing a shuffle join. Analyzing the stage further, we can see that skewed tasks have already been identified. Further, we can look at per-partition correlated metrics that clearly show that all partitions have skewed inputs with one side much larger than the other. We may conclude that this join could be significantly improved by using a broadcast strategy.
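As a rough illustration of what "pre-identifying skewed tasks" can mean, the sketch below flags partitions whose input is much larger than the typical partition. This is only one simple heuristic and not the one used by the tool described here. The function name, the factor of 2 and the sample sizes are all assumptions.

```python
from statistics import median


def skewed_partitions(sizes, factor=2.0):
    """Return the ids of partitions whose size exceeds `factor` times the median size."""
    if not sizes:
        return []
    typical = median(sizes.values())
    return sorted(pid for pid, size in sizes.items() if size > factor * typical)


# Hypothetical per-partition input sizes in MB, keyed by partition id.
input_mb = {0: 100, 1: 110, 2: 95, 3: 900, 4: 105}
print(skewed_partitions(input_mb))
```

The median is used as the baseline because, unlike the mean, it is not pulled upward by the very outliers we are trying to find.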

Which code occupied the most time?

Another common strategy that can help optimize Spark jobs is to understand which parts of the code occupied most of the processing time on the threads of the executors. Flame graphs are a popular way to visualize that information. But the difficulty in applying that for Spark jobs is that tasks for different stages can run across multiple executors and in fact, tasks from different stages could be running concurrently across different threads in a particular executor. And all that needs to get properly handled before an accurate flame graph can be generated to visualize how time was spent running code in a particular stage. We are happy to help do that heavy lifting so you can focus on where to optimize your code. Not only that, we pre-identify outliers in your job so you can focus on them directly. Below, in the DAG summary we can see that stage-15 spent a lot of its time running code with a significant IO overhead. It did do a lot of IO – about 65GB of reads and 16GB of writes. Analyzing stage-15 for CPU shows the aggregate flame graph with some interesting information. About 20% of the time is spent in LZO compression of the outputs which could be optimized by using a different codec. Another 35% was spent reading inputs from cloud storage. This could be for various reasons like avoidable seeks in the data access or throttling because we read too much data. These issues are worth investigating in order to improve the query performance.

What about failures?

Jobs often fail and we are left wondering how exactly they failed. Even if the job does not fail outright, it may have task or stage level failures and re-executions that can make it run slower. It turns out that our DAG timeline view provides fantastic visibility into when and where failures happened and how Spark responded to them. Take a look here at a failed execution for a different query. It almost looks like the same job ran 4 times right? We can quickly see that stage-10 failed 4 times and each time it caused the re-execution of a number of predecessor stages. Eventually after 4 attempts Spark gave up and failed the job. Correlating stage-10 with the scheduling chart shows task failures as well as a reduction in executor cores, implying executors were lost. Correlating that on the CPU chart shows high JVM GC and memory chart shows huge memory usage. Using this, we could conclude that stage-10 used a lot of memory that eventually caused executor loss or random failures in the tasks. Thus, we have identified the root cause of the failure! We can assess the cost of the re-executions by seeing that the first execution of Stage-9 ran 71 tasks while its last re-execution re-ran 24 tasks – a massive penalty.

Looking ahead

Visualizing the above data for a wide variety of jobs showed that we are able to diagnose a fairly large number of patterns of issues and optimizations around Spark jobs. The next logical step would be to encode such pattern identification into the product itself such that they are available out of the box and reduce the analysis burden on the user. E.g. such rules could be used to provide alerts or recommendations for the cases we described above. This would be particularly attractive for newer users who are less familiar with Spark and also serve as a precursor for more automated job management systems – say alerting users about GC spikes in their jobs that might cause failures down the road. In fact, adding such a system to the CI/CD pipeline for Spark jobs could help prevent problematic jobs from making it to production.

We have made our own lives easier and better supported our customers with this – and have received great feedback as we have tried to productize it all in the above form. This is just the beginning of the journey and we’ve just scratched the surface of how Spark workloads can be effectively managed and optimized – thereby improving developer productivity and reducing infrastructure costs.

Stay up to date and learn more about Spark workloads with Workload XM.

Kudos to the team effort by Arun Iyer, Bikas Saha, Marco Gaido, Mohammed Shahbaz Hussain, Mridul Murlidharan, Prabhjyot Singh, Renjith Kamath, Sameer Shaikh, Shane Marotical, Subhrajit Das, Supreeth Sharma and many others who chipped in with code, critique, ideas and support.

The post Demystifying Spark Jobs to Optimize for Cost and Performance appeared first on Cloudera Engineering Blog.


This was originally published on the Fast Forward Labs blog

We are excited to release Learning with Limited Labeled Data, the latest report and prototype from Cloudera Fast Forward Labs.

Being able to learn with limited labeled data relaxes the stringent labeled data requirement for supervised machine learning. Our report focuses on active learning, a technique that relies on collaboration between machines and humans to label smartly.

Active learning makes it possible to build applications using a small set of labeled data, and enables enterprises to leverage their large pools of unlabeled data. In this blog post, we explore how active learning works. (For a higher level introduction, please see our previous blogpost.)

The active learning loop

Active learning takes advantage of the collaboration between humans and machines to smartly select a small subset of datapoints for which to obtain labels. It is an iterative process, and ideally access is available to some initial labels to start. These initial labels allow a human to build a baseline machine learning model, and use it to predict outputs for all the unlabeled datapoints. The model then looks through all its predictions, flags the one with which it has the most difficulty, and requests a label for it. A human steps in to provide the label, and the newly labeled data is combined with the initial labeled data to improve the model. Model performance is recorded, and the process repeats.

The active learning loop
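The loop can be sketched end to end on a toy problem. Everything below is an illustrative assumption rather than the prototype from the report: the data is one-dimensional, the "model" is a single threshold, the human is simulated by an `oracle` function, and "most difficulty" is taken to mean the point closest to the current threshold.

```python
def fit_threshold(labeled):
    """Learner: a 1-D classifier that predicts 1 when x is above the threshold.

    The threshold sits halfway between the largest point labeled 0 and the
    smallest point labeled 1, so both classes must be present in `labeled`.
    """
    highest_zero = max(x for x, y in labeled if y == 0)
    lowest_one = min(x for x, y in labeled if y == 1)
    return (highest_zero + lowest_one) / 2


def accuracy(threshold, examples):
    """Share of (x, y) examples the threshold classifier gets right."""
    return sum(int(x > threshold) == y for x, y in examples) / len(examples)


def active_learning(initial_labeled, pool, oracle, rounds, holdout):
    labeled, pool, history = list(initial_labeled), list(pool), []
    for _ in range(rounds):
        if not pool:
            break
        threshold = fit_threshold(labeled)                     # (re)build the model
        history.append(accuracy(threshold, holdout))           # record performance
        hardest = min(pool, key=lambda x: abs(x - threshold))  # least certain prediction
        pool.remove(hardest)
        labeled.append((hardest, oracle(hardest)))             # the "human" supplies the label
    return fit_threshold(labeled), labeled, history


def oracle(x):
    """Stand-in for the human labeler: the true boundary is at 0.6."""
    return int(x > 0.6)


holdout = [(i / 100, oracle(i / 100)) for i in range(101)]
threshold, labeled, history = active_learning(
    initial_labeled=[(0.0, 0), (1.0, 1)],
    pool=[0.1, 0.3, 0.4, 0.55, 0.58, 0.62, 0.7, 0.9],
    oracle=oracle, rounds=4, holdout=holdout,
)
print(round(threshold, 3), len(labeled), [round(a, 3) for a in history])
```

With only four requested labels the threshold ends up next to the true boundary, because each request goes to the point the current model is least sure about instead of to an arbitrary one.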

How to select datapoints

At the heart of active learning is a machine (learner) that requests labels for datapoints that it finds particularly hard to predict. The learner follows a strategy, and uses it to identify these datapoints. To evaluate the effectiveness of the strategy, a simple approach for choosing datapoints needs to be defined. A good starting point is to remove the intelligence of the learner; the datapoints are chosen independently of what the learner thinks.

Random sampling

When we take the learner out of the picture, what is left is a pool of unlabeled data and some labeled data from which a model can be built. To improve the model, the only reasonable option is to randomly start labeling more data. This strategy is known as random sampling, and selects unlabeled datapoints from the pool according to no particular criteria. You can think of it as being akin to picking a card from the top of a shuffled deck, then reshuffling the deck without the previously chosen card and repeating the action. Because the learner does not help with the selection process, random sampling is also known as passive learning.

Random sampling is like picking the top card from a shuffled deck
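As a baseline, random sampling takes only a few lines. This is a minimal sketch with a hypothetical helper name. It draws without replacement, which matches the "reshuffle the deck without the previously chosen card" picture.

```python
import random


def random_sampling(pool, n, seed=None):
    """Pick n datapoints to label, uniformly at random and without replacement."""
    rng = random.Random(seed)  # a seed makes the selection reproducible
    return rng.sample(pool, n)


unlabeled_pool = list(range(100))  # stand-in ids for unlabeled datapoints
to_label = random_sampling(unlabeled_pool, 5, seed=0)
print(to_label)
```

Note that the model never appears in this function, which is exactly what makes the strategy "passive".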

Uncertainty sampling

A slightly more complex strategy is to select datapoints that the model is uncertain about. In uncertainty sampling, the learner looks at all unlabeled datapoints and surfaces the ones about which it is uncertain. Labels are then provided by a human, and fed back into the model to refine it.

But how do we quantify uncertainty? One way is to use the distance between the datapoint and the decision boundary. Datapoints far away from the decision boundary are safe from changes in the decision boundary. This implies that the model has high certainty in these classifications. Datapoints close to the boundary, however, can easily be affected by small changes in the boundary. The model (learner) is not certain about them; a slight shift in the decision boundary will cause them to be classified differently. The margin sampling strategy therefore dictates that we surface the datapoint closest to the boundary and obtain a label for it.
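For a linear classifier, the distance to the decision boundary has a closed form, so margin sampling can be sketched directly. The sketch assumes a boundary of the form w·x + b = 0. The weights and points are made up, and the helper names are hypothetical.

```python
import math


def distance_to_boundary(x, w, b):
    """Distance from point x to the hyperplane w.x + b = 0."""
    score = sum(wi * xi for wi, xi in zip(w, x)) + b
    return abs(score) / math.sqrt(sum(wi * wi for wi in w))


def margin_sampling(pool, w, b):
    """Surface the unlabeled datapoint closest to the decision boundary."""
    return min(pool, key=lambda x: distance_to_boundary(x, w, b))


# Hypothetical model: the boundary is the line x0 = x1.
w, b = (1.0, -1.0), 0.0
pool = [(3.0, 0.0), (1.0, 0.9), (-2.0, 2.0)]
print(margin_sampling(pool, w, b))
```

The point (1.0, 0.9) sits almost on the boundary, so a small shift of the line would flip its predicted class. That makes its label the most informative one to request.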

There are many other selection strategies that can be used with active learning. Our report explores some of them in detail.

When to stop

Because active learning is an iterative process, when should we stop? Each label comes with a cost of acquisition: the amount of money and time it takes to acquire the label. With this cost in mind, the stopping criteria can be either static or dynamic. A static criterion sets a budget limit or performance target at the beginning. A dynamic criterion looks at the incremental gain in performance over each round of active learning and stops when it is no longer worthwhile to acquire more labels (the incremental performance plateaus).

Stopping criteria for active learning
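Both kinds of criteria can be expressed as one small check run after every round. This is a sketch under assumed names and defaults (`budget`, `target`, `min_gain` and `patience` are not from the report). The plateau rule used here, "the last few rounds each improved by less than `min_gain`", is just one way to define a plateau.

```python
def should_stop(scores, labels_bought, budget=None, target=None,
                min_gain=None, patience=2):
    """Decide whether to stop; `scores` holds model performance per round, oldest first."""
    # Static criteria: fixed up front.
    if budget is not None and labels_bought >= budget:
        return True
    if target is not None and scores and scores[-1] >= target:
        return True
    # Dynamic criterion: each of the last `patience` rounds gained less than min_gain.
    if min_gain is not None and len(scores) > patience:
        recent = scores[-(patience + 1):]
        gains = [after - before for before, after in zip(recent, recent[1:])]
        return all(gain < min_gain for gain in gains)
    return False


print(should_stop([0.60, 0.70], labels_bought=50, budget=50))                  # budget used up
print(should_stop([0.60, 0.70, 0.80], labels_bought=10, target=0.80))          # target reached
print(should_stop([0.60, 0.70, 0.705, 0.707], labels_bought=10, min_gain=0.01))  # plateau
print(should_stop([0.60, 0.70, 0.80, 0.90], labels_bought=10, min_gain=0.01))    # still improving
```

Requiring several consecutive small gains, rather than a single one, avoids stopping on one noisy round.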

Does it work for deep learning?

Deep learning introduces a couple of wrinkles that make direct application of active learning ineffective. The most obvious issue is that adding a single labeled datapoint does not have much impact on deep learning models, which train on batches of data. In addition, because the models need to be retrained until convergence after each point is added, this can become an expensive undertaking – especially when viewed in terms of the performance improvement vs. acquisition cost (time and money) trade-off. One straightforward solution is to select a very large subset of datapoints to label. But depending on the type of heuristics used, this could result in correlated datapoints. Obtaining labels for these datapoints is not ideal – datapoints that are independent and diverse are much more effective at capturing the relationship between input and output.

The second problem is that existing criteria used to help select datapoints do not translate to deep learning easily. Some require computation that does not scale to models with high-dimensional parameters, which makes them impractical for deep learning. The criteria that remain computationally viable need to be reinterpreted in light of deep learning.

In our report, we take the idea of uncertainty and examine it in the context of deep learning.

Practical considerations

Active learning sounds tempting – with this approach, it is possible to build applications previously constrained by lack of labeled data. But active learning is not a silver bullet.

Choosing a learner and a strategy

Active learning relies on a small subset of labeled data at the beginning to choose both the learner and strategy. The learner is used to make predictions for all the unlabeled data and the strategy selects the datapoints that are difficult. Choosing a learner (or model) for any machine learning problem is difficult, but it is made even more difficult with active learning for two reasons. First, the choice of a learner needs to be made very early on when we only have a small subset of labeled data. Second, the learner is not just used to make predictions, it is used in conjunction with the strategy to surface datapoints that will help refine itself. This tight feedback loop amplifies the effect of a wrong learner.

In addition, some selection strategies result in a labeled dataset that is biased. Margin sampling, for example, surfaces datapoints right around the decision boundary to be labeled. Most datapoints far from the boundary might not even be used in building the model, resulting in a labeled dataset that may not be representative of the entire pool of unlabeled data.

Human biases

Because a human needs to step in to provide labels, this restricts the type of use cases to which active learning can be applied. Humans can label images and annotate text, but we cannot tell if a financial transaction is fraudulent just by looking at the data.

In addition, the data that requires human labeling is by definition more difficult. Under these circumstances, it is easy for a human to inject their own bias and judgement when making labeling decisions.

A pause between iterations

When applying active learning in real life, surfaced datapoints will need to be sent to a human for labeling. The next round of active learning cannot proceed until the newly labeled datapoints are ready.

The length of time between each active learning iteration varies depending on who provides the label. In a research scenario, a data scientist who builds the model and also creates labels will be able to iterate through each round of active learning quickly. In a production scenario, an outsourced labeling team will need more time for data exchange and label (knowledge) transfer to occur.

For active learning to be successful, the pause between iterations should be as small as practically possible. In addition to considering different types of labeling workforce, an efficient pipeline needs to be set up. This pipeline should include a platform for exchanging unlabeled datapoints, a user interface for creating labels, and a platform for transferring the labeled datapoints.

Active Learner

Visit the Active Learner prototype we built to accompany this report. 

Conclusion

Active learning makes it possible to build machine learning models with a small set of labeled data. It offers one way for enterprises to leverage their large pools of unlabeled data for building new products, but it is not the only solution to learning with limited labeled data.

Our report goes into much more detail (including strategies specific to deep learning, resources and recommendations for setting up an active learning production environment, and technical and ethical implications). Join our webinar to learn more, explore the prototype and get in touch if you are interested in accessing the full report (which is available by subscription to our research and advising services).

The post A Guide to Learning with Limited Labeled Data appeared first on Cloudera Engineering Blog.


Cloudera Altus Director helps you deploy, scale, and manage Cloudera clusters on AWS, Microsoft Azure, or Google Cloud Platform. Altus Director both enables and enforces the best practices of big data deployments and cloud infrastructure. Altus Director’s enterprise-grade features deliver a mechanism for establishing production-ready clusters in the cloud for big data workloads and applications in a simple, reliable, automated fashion. In this post, you will learn about new functionality and changes in release 6.2.

Cloudera Altus Director Overview

For those new to Altus Director, let’s revisit what it does.

  • On-demand creation and termination of clusters: Altus Director can allocate and configure Cloudera Manager instances and highly available CDH clusters in various cloud providers. A single Altus Director instance can manage multiple cloud provider environments as well as the separate lifecycles of multiple Cloudera Manager instances and clusters.
  • Multi-cloud support: Altus Director supports creating clusters in Amazon Web Services (AWS), Microsoft Azure, and Google Cloud Platform (GCP) through its cloud provider plugin architecture. A single Altus Director instance can work with multiple cloud providers at once. Because the plugin specification is open source, you can create a plugin to support other providers, either in-house or public.
  • On-demand grow and shrink of clusters: One of the main benefits of running CDH clusters in the cloud is being able to provision additional instances when demand increases, and to terminate instances when demand decreases. Altus Director, in concert with Cloudera Manager, does the work required to add new instances to and remove existing ones from your CDH clusters.
  • Programmatic and repeatable instantiation of clusters: Altus Director can consume cluster definitions specified in HOCON configuration files submitted through the Altus Director CLI or in JSON or HOCON input sent to the Altus Director API. A cluster definition can include custom scripts to run after instance provisioning and cluster setup, or before cluster termination, to perform tasks like installing additional packages, configuring system settings, or saving important data.
  • Long-running cluster support: Long-running clusters often require actions like upgrading CDH and Cloudera Manager, changing the topology of the cluster, and reconfiguring the cluster. Altus Director supports such modifications when using Cloudera Manager 5.11 and above.
  • Usage-based billing for Cloudera services: Usage-based billing can help you optimize your expenditures for transient clusters. With a pay-as-you-go billing ID from Cloudera, you can use your Cloudera Enterprise license as usual, but you are only charged for CDH services when they are running.
  • Security: Altus Director, like other Cloudera offerings, is committed to enabling secure deployments and applications. Altus Director’s own database is automatically encrypted, and Altus Director helps you configure Cloudera Manager and CDH clusters with Kerberos authentication, TLS for on-the-wire privacy, and Cloudera Navigator for auditing, data lineage, and data discovery.
  • Powerful web user interface: Altus Director’s user interface provides a single dashboard to assess the health of all your clusters across all cloud providers and all Cloudera Manager deployments. It can also be used to bootstrap new clusters, grow and shrink existing clusters, and terminate clusters that are no longer needed. Exploring the web user interface is a great stepping stone to using the configuration file or API to deploy production-ready clusters.

Changes in Cloudera Altus Director 6.2

Automatic Instance Groups

Director now supports launching instances in an EC2 Auto Scaling Group (ASG) or Azure Virtual Machine Scale Set (VMSS). These instances typically launch more reliably, and in the case of Azure, often support new features earlier than traditional Virtual Machine (VM) instances. Director does not yet support external changes to the size of the ASG or VMSS (e.g. through the cloud provider portal), so continue to use Director’s grow/shrink cluster updates if you need to change the size of such an instance group. Please refer to the Cloudera Director documentation for more information.

Improved Azure functionality

Azure MySQL Database Support

Azure MySQL databases can now be provisioned through Cloudera Altus Director and used by Cloudera Manager (for its own database) and by cluster services such as the Hive Metastore and Sentry. Please refer to the Cloudera Director documentation for more information.

New Azure Instance Types

Azure’s Fsv2-series, Dsv3-series, and Esv3-series VM sizes are now supported.

Proxy support

Cloudera Director can now provision Azure instances through an HTTP proxy.

Cloudera Manager Configuration Sync

Cloudera Altus Director has expanded how it monitors Cloudera Manager configurations and synchronizes them with Altus Director’s internal state. The values of configurations for Cloudera Manager and the management service and roles are copied back into the deployment template, so the refreshed values will be present in any subsequent export of the deployment (direct or in the deployment section of a cluster export). Sensitive values are stored as “REDACTED” so they are not exposed via the UI or in the exported configuration.

Cloudera Manager License Update

In addition to monitoring Cloudera Manager configurations, Altus Director will monitor Cloudera Manager for license changes and reflect those as well. This will help new and prospective usage-based billing users. Users who need to update their existing Cloudera Manager license will be able to do so without interruption, and users will be able to add PAYG billing on clusters that were initially set up with a trial license and later migrated to a full license.

Using Cloudera Altus Director

For more information on what’s new in this release, check out the Altus Director 6.2 section of the New Features and Changes in Altus Director page in the Altus Director documentation. If you’re ready to give the latest version of Altus Director a try, here are the ways you can get started.

Send questions or feedback to the Altus Director community forum.

Michael Wilson is a software engineer at Cloudera.

The post What’s New in Cloudera Altus Director 6.2? appeared first on Cloudera Engineering Blog.


Self-service exploratory analytics is one of the most common use cases we see among our customers running on Cloudera’s Data Warehouse solution.

With the recent release of Cloudera 6.2, we continue to improve the end user query experience with Hue, focusing on easier SQL query troubleshooting and increased compatibility with Hive. Read on to learn more and try it out in one click at demo.gethue.com.

Easier Self-Service Query Troubleshooting

Hue has great assistance for finding tables in the Data Catalog and getting recommendations on how to write (better) queries with the smart autocomplete, providing popular values and notifying of dangerous operations. When executing queries, however, it might be difficult to understand why they would be slow.

A new feature in 6.2 introduces a more readable display of the SQL Query Profile, which helps you understand where the query bottlenecks are and how to optimize the query.

Example of how the query plan was displayed in the previous release:

And what it looks like now:


Note that in addition to the much simpler visualization, tips are provided when available:


Please read more about this feature in this complete self-troubleshooting scenario.

Additionally, one of the most requested fixes was implemented: releasing query resources after the query has finished and they are no longer needed.

First, on the Apache Impala side, the query execution status will properly say if the query is actively running (“processing” data) or just “open but finished” (meaning just “keeping” the results but not using resources). In addition, the new parameter NUM_ROWS_PRODUCED_LIMIT will even notify Impala to truncate any query execution as soon as this maximum number of result rows has been returned. This will release resources early on large SELECT operations where only the first few rows are actually displayed (which is the primary use case in Hue).

Better compatibility with Hive in HDP

Apache Hive has typically been very innovative in the Hortonworks distribution. Upstream, the support for Hive on Tez and Hive LLAP was improved. Now:

  • The jobs will show up in Job Browser
  • The query ID is printed
  • The progress is displayed

You can read more details in the Hue and Hive 3 integration improvements post.

Note that currently Hue is not officially supported in HDP. However, if you want to experiment, you can learn how to configure Hue in HDP and set it up on your own, or get help from Cloudera Professional Services to do it for you.

Misc Improvements

More than 80 bugs were fixed to improve the supportability and stability of Hue. The full list is in the release notes but here are the top ones:

  • HUE-7474 [core] Add ability to enable/disable Hue data/file “download” options globally
  • HUE-7128 [core] Apply config ENABLE_DOWNLOAD to search dashboard
  • HUE-8680 [core] Fill in Impalad WEBUI username passwords automatically
  • HUE-8585 [useradmin] Bubbling up errors for Add Sync Ldap Users
  • HUE-8690 [backend] Fix Hue allows unsigned SAML assertions
  • HUE-8140 [editor] Improve multi-statement execution
  • HUE-8662 [core] Fix missing static URLs

In addition, the Hue Docker image was simplified, so that it is easier to quickly get started and play/test the latest features.

Last but not least, the upstream and downstream documentation just got the first pass of a revamp, with a better table of contents, restyling, and updated instructions. In particular, on the upstream docs, reporting issues or sending a suggestion is one click away via GitHub, so feel free to send some pull requests!

Thank you to everybody using the product and who contributed to this release. Now off to the next one!

If you have any questions or feedback, feel free to comment here, on the community forum, or via @gethue!

The post What’s new in the Hue Data Warehouse Editor in Cloudera 6.2 appeared first on Cloudera Engineering Blog.


Although the Kudu server is written in C++ for performance and efficiency, developers can write client applications in C++, Java, or Python. To make it easier for Java developers to create reliable client applications, we’ve added new utilities in Kudu 1.9.0 that allow you to write tests using a Kudu cluster without needing to build Kudu yourself, without any knowledge of C++, and without any complicated coordination around starting and stopping Kudu clusters for each test. This post describes how the new testing utilities work and how you can use them in your application tests.

User Guide

Note: It is possible this blog post could become outdated – for the latest documentation on using the JVM testing utilities see the Kudu documentation.

Requirements

In order to use the new testing utilities, the following requirements must be met:

  • OS
    • macOS El Capitan (10.11) or later
    • CentOS 6.6+, Ubuntu 14.04+, or another recent distribution of Linux supported by Kudu
  • JVM
    • Java 8+
    • Note: Java 7 is deprecated, but still supported
  • Build Tool

Build Configuration

In order to use the Kudu testing utilities, add two dependencies to your classpath:

  • The kudu-test-utils dependency
  • The kudu-binary dependency

The kudu-test-utils dependency has useful utilities for testing applications that use Kudu. Primarily, it provides the KuduTestHarness class to manage the lifecycle of a Kudu cluster for each test. The KuduTestHarness is a JUnit TestRule that not only starts and stops a Kudu cluster for each test, but also has methods to manage the cluster and get pre-configured KuduClient instances for use while testing.

The kudu-binary dependency contains the native Kudu (server and command-line tool) binaries for the specified operating system. To download the right artifact for the running operating system, it is easiest to use a plugin, such as the os-maven-plugin or osdetector-gradle-plugin, to detect the current runtime environment. The KuduTestHarness will automatically find and use the kudu-binary jar on the classpath.

WARNING: The kudu-binary module should only be used to run Kudu for integration testing purposes. It should never be used to run an actual Kudu service, in production or development, because the kudu-binary module includes native security-related dependencies that have been copied from the build system and will not be patched when the operating system on the runtime host is patched.

Maven Configuration

If you are using Maven to build your project, add the following entries to your project’s pom.xml file:

<build>
  <extensions>
    <!-- Used to find the right kudu-binary artifact with the Maven
         property ${os.detected.classifier} -->
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.6.2</version>
    </extension>
  </extensions>
</build>

<dependencies>
  <dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-test-utils</artifactId>
    <version>1.9.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-binary</artifactId>
    <version>1.9.0</version>
    <classifier>${os.detected.classifier}</classifier>
    <scope>test</scope>
  </dependency>
</dependencies>

Gradle Configuration

If you are using Gradle to build your project, add the following entries to your project’s build.gradle file:

plugins {
  // Used to find the right kudu-binary artifact with the Gradle
  // property ${osdetector.classifier}
  id "com.google.osdetector" version "1.6.2"
}

dependencies {
   testCompile "org.apache.kudu:kudu-test-utils:1.9.0"
   testCompile "org.apache.kudu:kudu-binary:1.9.0:${osdetector.classifier}"
}

Test Setup

Once your project is configured correctly, you can start writing tests using the kudu-test-utils and kudu-binary artifacts. One line of code will ensure that each test automatically starts and stops a real Kudu cluster and that cluster logging is output through slf4j:

@Rule public KuduTestHarness harness = new KuduTestHarness();

The KuduTestHarness has methods to get pre-configured clients, start and stop servers, and more. Below is an example test to showcase some of the capabilities:

import org.apache.kudu.*;
import org.apache.kudu.client.*;
import org.apache.kudu.test.KuduTestHarness;
import org.junit.*;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MyKuduTest {
  
    @Rule
    public KuduTestHarness harness = new KuduTestHarness();

    @Test
    public void test() throws Exception {
        // Get a KuduClient configured to talk to the running mini cluster.
        KuduClient client = harness.getClient();

        // Some of the other most common KuduTestHarness methods include:
        AsyncKuduClient asyncClient = harness.getAsyncClient();
        String masterAddresses = harness.getMasterAddressesAsString();
        List<HostAndPort> masterServers = harness.getMasterServers();
        List<HostAndPort> tabletServers = harness.getTabletServers();
        harness.killLeaderMasterServer();
        harness.killAllMasterServers();
        harness.startAllMasterServers();
        harness.killAllTabletServers();
        harness.startAllTabletServers();

        // Create a new Kudu table.
        String tableName = "myTable";
        Schema schema = new Schema(Arrays.asList(
            new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
            new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).key(true).build()
        ));
        CreateTableOptions opts = new CreateTableOptions()
            .setRangePartitionColumns(Collections.singletonList("key"));
        client.createTable(tableName, schema, opts);
        KuduTable table = client.openTable(tableName);

        // Write a few rows to the table
        KuduSession session = client.newSession();
        for(int i = 0; i < 10; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            row.addInt("key", i);
            row.addString("value", String.valueOf(i));
            session.apply(insert);
        }
        session.close();

        // ... Continue the test. Read and validate the rows, alter the table, etc.
    }
}

For a complete example of a project using the KuduTestHarness, see the java-example project in the Kudu source code repository. The Kudu project itself uses the KuduTestHarness for all of its own integration tests. For more complex examples, you can explore the various Kudu integration tests in the Kudu source code repository.

Feedback

Kudu 1.9.0 is the first release to have these testing utilities available. Although these utilities simplify testing of Kudu applications, there is always room for improvement. Please report any issues, ideas, or feedback to the Kudu user mailing list, Jira, or Slack channel and we will try to incorporate your feedback quickly. See the Kudu community page for details.

Thank you

We would like to give a special thank you to everyone who helped contribute to the kudu-test-utils and kudu-binary artifacts. We would especially like to thank Brian McDevitt at phData and Tim Robertson at GBIF who helped us tremendously.

Grant Henke is a Software Engineer at Cloudera
Mike Percy is a Software Engineer at Cloudera

The post Testing Apache Kudu Applications on the JVM appeared first on Cloudera Engineering Blog.


When picking a storage option for an application it is common to pick a single storage option which has the most applicable features to your use case. For mutability and real-time analytics workloads you may want to use Apache Kudu, but for massive scalability at a low cost you may want to use HDFS. For that reason, there is a need for a solution that allows you to leverage the best features of multiple storage options. This post describes the sliding window pattern using Apache Impala with data stored in Apache Kudu and Apache HDFS. With this pattern you get all of the benefits of multiple storage layers in a way that is transparent to users.

Apache Kudu is designed for fast analytics on rapidly changing data. Kudu provides a combination of fast inserts/updates and efficient columnar scans to enable multiple real-time analytic workloads across a single storage layer. For that reason, Kudu fits well into a data pipeline as the place to store real-time data that needs to be queryable immediately. Additionally, Kudu supports updating and deleting rows in real-time allowing support for late arriving data and data correction.

Apache HDFS is designed to allow for limitless scalability at a low cost. It is optimized for batch oriented use cases where data is immutable. When paired with the Apache Parquet file format, structured data can be accessed with extremely high throughput and efficiency.

For situations in which the data is small and ever-changing, like dimension tables, it is common to keep all of the data in Kudu. It is even common to keep large tables in Kudu when the data fits within Kudu’s scaling limits and can benefit from Kudu’s unique features. In cases where the data is massive, batch oriented, and unlikely to change, storing the data in HDFS using the Parquet format is preferred. When you need the benefits of both storage layers, the sliding window pattern is a useful solution.

The Sliding Window Pattern

In this pattern, matching Kudu and Parquet formatted HDFS tables are created in Impala. These tables are partitioned by a unit of time based on how frequently the data is moved between the Kudu and HDFS table. It is common to use daily, monthly, or yearly partitions. A unified view is created and a WHERE clause is used to define a boundary that separates which data is read from the Kudu table and which is read from the HDFS table. The defined boundary is important so that you can move data between Kudu and HDFS without exposing duplicate records to the view. Once the data is moved, an atomic ALTER VIEW statement can be used to move the boundary forward.

Note: This pattern works best with somewhat sequential data organized into range partitions, because having a sliding window of time and dropping partitions is very efficient.

This pattern results in a sliding window of time where mutable data is stored in Kudu and immutable data is stored in the Parquet format on HDFS. Leveraging both Kudu and HDFS via Impala provides the benefits of both storage systems:

  • Streaming data is immediately queryable
  • Updates for late arriving data or manual corrections can be made
  • Data stored in HDFS is optimally sized, which increases performance and prevents small files
  • Reduced cost

Impala also supports cloud storage options such as S3 and ADLS. This capability allows convenient access to a storage system that is remotely managed, accessible from anywhere, and integrated with various cloud-based services. Because this data is remote, queries against S3 data are less performant, making S3 suitable for holding “cold” data that is only queried occasionally. This pattern can be extended to use cloud storage for cold data by creating a third matching table and adding another boundary to the unified view.

Note: For simplicity only Kudu and HDFS are illustrated in the examples below.

The process for moving data from Kudu to HDFS is broken into two phases. The first phase is the data migration, and the second phase is the metadata change. These ongoing steps should be scheduled to run automatically on a regular basis.

In the first phase, the now immutable data is copied from Kudu to HDFS. Even though data is duplicated from Kudu into HDFS, the boundary defined in the view will prevent duplicate data from being shown to users. This step can include any validation and retries as needed to ensure the data offload is successful.

In the second phase, now that the data is safely copied to HDFS, the metadata is changed to adjust how the offloaded partition is exposed. This includes shifting the boundary forward, adding a new Kudu partition for the next period, and dropping the old Kudu partition.
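The two phases can be simulated in plain Python to show why the boundary keeps duplicates out of the view at every step. This is only an in-memory sketch: the lists stand in for the Kudu and HDFS tables, and `unified_view` is a hypothetical helper that mimics the WHERE clauses of the view rather than anything Impala provides.

```python
from datetime import date


def unified_view(kudu_rows, hdfs_rows, boundary):
    """Mimic the view: Kudu serves rows at or after the boundary, HDFS rows before it."""
    return ([r for r in kudu_rows if r["time"] >= boundary] +
            [r for r in hdfs_rows if r["time"] < boundary])


kudu = [{"name": "a", "time": date(2018, 1, 15)},
        {"name": "b", "time": date(2018, 2, 10)}]
hdfs = [{"name": "z", "time": date(2017, 12, 20)}]
old_boundary, new_boundary = date(2018, 1, 1), date(2018, 2, 1)

before = unified_view(kudu, hdfs, old_boundary)

# Phase 1: copy the now-immutable January rows to HDFS; the boundary is unchanged,
# so the copies in HDFS stay hidden behind the view.
hdfs = hdfs + [r for r in kudu if old_boundary <= r["time"] < new_boundary]
after_copy = unified_view(kudu, hdfs, old_boundary)

# Phase 2: shift the boundary forward, then drop the offloaded rows from Kudu.
after_shift = unified_view(kudu, hdfs, new_boundary)
kudu = [r for r in kudu if r["time"] >= new_boundary]
after_drop = unified_view(kudu, hdfs, new_boundary)
```

At each of the four points the view returns the same three rows exactly once, which is the property the ordering of the two phases is designed to preserve.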

Building Blocks

To implement the sliding window pattern, a few Impala fundamentals are required. Each fundamental building block of the pattern is described below.

Moving Data

Moving data among storage systems via Impala is straightforward provided you have matching tables defined using each of the storage formats. To keep this post brief, not all of the options available when creating an Impala table are described. However, Impala’s CREATE TABLE documentation can be referenced to find the correct syntax for Kudu, HDFS, and cloud storage tables. A few examples are shown further below where the sliding window pattern is illustrated.

Once the tables are created, moving the data is as simple as an INSERT…SELECT statement:

INSERT INTO table_foo
SELECT * FROM table_bar;

All of the features of the SELECT statement can be used to select the specific data you would like to move.

Note: If moving data to Kudu, an UPSERT INTO statement can be used to handle duplicate keys.

Unified Querying

Querying data from multiple tables and data sources in Impala is also straightforward. For the sake of brevity, not all of the options available when creating an Impala view are described. However, see Impala’s CREATE VIEW documentation for more in-depth details.

Creating a view for unified querying is as simple as a CREATE VIEW statement using two SELECT clauses combined with a UNION ALL:

CREATE VIEW foo_view AS
SELECT col1, col2, col3 FROM foo_parquet
UNION ALL
SELECT col1, col2, col3 FROM foo_kudu;

WARNING: Be sure to use UNION ALL and not UNION. The UNION keyword by itself is the same as UNION DISTINCT and can have significant performance impact. More information can be found in the Impala UNION documentation.

All of the features of the SELECT statement can be used to expose the correct data and columns from each of the underlying tables. It is important to use the WHERE clause to pass through and pushdown any predicates that need special handling or transformations. More examples will follow below in the discussion of the sliding window pattern.

Additionally, views can be altered via the ALTER VIEW statement. This is useful when combined with the SELECT statement because it can be used to atomically update what data is being accessed by the view.

An Example Implementation

Below are sample steps to implement the sliding window pattern using a monthly period with three months of active mutable data. Data older than three months will be offloaded to HDFS using the Parquet format.

Create the Kudu Table

First, create a Kudu table which will hold three months of active mutable data. The table is range partitioned by the time column with each range containing one period of data. It is important to have partitions that match the period because dropping Kudu partitions is much more efficient than removing the data via the DELETE clause. The table is also hash partitioned by the other key column to ensure that all of the data is not written to a single partition.

Note: Your schema design should vary based on your data and read/write performance considerations. This example schema is intended for demonstration purposes and not as an “optimal” schema. See the Kudu schema design documentation for more guidance on choosing your schema. For example, you may not need any hash partitioning if your data input rate is low. Alternatively, you may need more hash buckets if your data input rate is very high.

CREATE TABLE my_table_kudu
(  
  name STRING,
  time TIMESTAMP,
  message STRING,
  PRIMARY KEY(name, time)
)
PARTITION BY
   HASH(name) PARTITIONS 4,
   RANGE(time) (
      PARTITION '2018-01-01' <= VALUES < '2018-02-01', --January
      PARTITION '2018-02-01' <= VALUES < '2018-03-01', --February
      PARTITION '2018-03-01' <= VALUES < '2018-04-01', --March
      PARTITION '2018-04-01' <= VALUES < '2018-05-01'  --April
)
STORED AS KUDU;

Note: There is an extra month partition to provide a buffer of time for the data to be moved into the immutable table.

Create the HDFS Table

Create the matching Parquet formatted HDFS table which will hold the older immutable data. This table is partitioned by year, month, and day for efficient access even though you can’t partition by the time column itself. This is addressed further in the view step below. See Impala’s partitioning documentation for more details.

CREATE TABLE my_table_parquet
(  
  name STRING,
  time TIMESTAMP,
  message STRING
)
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET;

Create the Unified View

Now create the unified view which will be used to query all of the data seamlessly:

CREATE VIEW my_table_view AS
SELECT name, time, message
FROM my_table_kudu
WHERE time >= "2018-01-01"
UNION ALL
SELECT name, time, message
FROM my_table_parquet
WHERE time < "2018-01-01"
AND year = year(time)
AND month = month(time)
AND day = day(time);

Each SELECT clause explicitly lists all of the columns to expose. This ensures that the year, month, and day columns that are unique to the Parquet table are not exposed. If needed, it also allows any necessary column or type mapping to be handled.

The initial WHERE clauses applied to both my_table_kudu and my_table_parquet define the boundary between Kudu and HDFS to ensure duplicate data is not read while in the process of offloading data.

The additional AND clauses applied to my_table_parquet are used to ensure good predicate pushdown on the individual year, month, and day columns.
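To see why the boundary predicates prevent duplicates, the view can be modeled in a few lines of Python. This is only an in-memory illustration, assuming rows are (name, time, message) tuples; the function unified_view is hypothetical and merely mimics the two WHERE clauses.

```python
from datetime import datetime


def unified_view(kudu_rows, parquet_rows, boundary):
    """Mimic the view: Kudu rows at or after the boundary, Parquet rows before it."""
    recent = [row for row in kudu_rows if row[1] >= boundary]
    older = [row for row in parquet_rows if row[1] < boundary]
    return recent + older  # UNION ALL: plain concatenation


jan = ("joey", datetime(2018, 1, 1), "hello")
feb = ("ross", datetime(2018, 2, 1), "goodbye")

# Mid-offload state: January was copied to Parquet but not yet dropped from Kudu.
kudu = [jan, feb]
parquet = [jan]

# Old boundary: the January row is served from Kudu only.
before_shift = unified_view(kudu, parquet, datetime(2018, 1, 1))
# New boundary: the January row is served from Parquet only.
after_shift = unified_view(kudu, parquet, datetime(2018, 2, 1))

print(before_shift)
print(after_shift)
```

In both cases each row appears exactly once, even though the January row physically exists in both tables.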

WARNING: As stated earlier, be sure to use UNION ALL and not UNION. The UNION keyword by itself is the same as UNION DISTINCT and can have a significant performance impact. More information can be found in the Impala UNION documentation.
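The semantic difference between the two operators can be illustrated with a small Python sketch. It does not reflect how Impala executes either operator; it only shows that the distinct form has to compare rows, and that it would also collapse rows that are legitimately repeated.

```python
def union_all(left, right):
    """UNION ALL: concatenate both inputs without comparing rows."""
    return list(left) + list(right)


def union_distinct(left, right):
    """UNION (DISTINCT): each row is checked against the rows already emitted."""
    seen = set()
    result = []
    for row in list(left) + list(right):
        if row not in seen:
            seen.add(row)
            result.append(row)
    return result


kudu_side = [("ross", "2018-02-01", "goodbye")]
# The Parquet table has no primary key, so repeated rows are possible there.
parquet_side = [("joey", "2018-01-01", "hello"), ("joey", "2018-01-01", "hello")]

print(len(union_all(kudu_side, parquet_side)))
print(len(union_distinct(kudu_side, parquet_side)))
```

Because the WHERE clauses in the view already keep the two sides disjoint, the extra comparison work done by the distinct form buys nothing here.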

Ongoing Steps

Now that the base tables and view are created, prepare the ongoing steps to maintain the sliding window. Because these ongoing steps should be scheduled to run on a regular basis, the examples below are shown using .sql files that take variables which can be passed from your scripts and scheduling tool of choice.

Create the window_data_move.sql file to move the data from the oldest partition to HDFS:

INSERT INTO ${var:hdfs_table} PARTITION (year, month, day)
SELECT *, year(time), month(time), day(time)
FROM ${var:kudu_table}
WHERE time >= add_months("${var:new_boundary_time}", -1)
AND time < "${var:new_boundary_time}";
COMPUTE INCREMENTAL STATS ${var:hdfs_table};

Note: The COMPUTE INCREMENTAL STATS statement is not required but helps Impala to optimize queries.

To run the SQL statement, use the Impala shell and pass the required variables. Below is an example:

impala-shell -i <impalad:port> -f window_data_move.sql \
  --var=kudu_table=my_table_kudu \
  --var=hdfs_table=my_table_parquet \
  --var=new_boundary_time="2018-02-01"

Note: You can adjust the WHERE clause to match the period and cadence of your offload. Here the add_months function is called with an argument of -1 so that the one month of data immediately preceding the new boundary time is moved.
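The range selected by that WHERE clause can be worked out ahead of time. Below is a minimal Python sketch, assuming boundaries always fall on the first day of a month; add_months here is a simplified stand-in for the Impala function of the same name, and offload_range and period_months are hypothetical names.

```python
from datetime import date


def add_months(first_of_month: date, months: int) -> date:
    """Shift a first-of-month date by whole months (simplified stand-in)."""
    if first_of_month.day != 1:
        raise ValueError("this sketch only handles first-of-month boundaries")
    index = first_of_month.year * 12 + (first_of_month.month - 1) + months
    return date(index // 12, index % 12 + 1, 1)


def offload_range(new_boundary: date, period_months: int = 1):
    """Return the half-open [start, end) range that the move would select."""
    return add_months(new_boundary, -period_months), new_boundary


# Monthly cadence, as in window_data_move.sql.
monthly = offload_range(date(2018, 2, 1))
# A quarterly cadence would use -3 in place of -1.
quarterly = offload_range(date(2018, 4, 1), period_months=3)

print(monthly)
print(quarterly)
```

If you change the cadence, the Kudu range partitions should be sized to the same period so that whole partitions can still be dropped.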

Create the window_view_alter.sql file to shift the time boundary forward by altering the unified view:

ALTER VIEW ${var:view_name} AS
SELECT name, time, message
FROM ${var:kudu_table}
WHERE time >= "${var:new_boundary_time}"
UNION ALL
SELECT name, time, message
FROM ${var:hdfs_table}
WHERE time < "${var:new_boundary_time}"
AND year = year(time)
AND month = month(time)
AND day = day(time);

To run the SQL statement, use the Impala shell and pass the required variables. Below is an example:

impala-shell -i <impalad:port> -f window_view_alter.sql \
  --var=view_name=my_table_view \
  --var=kudu_table=my_table_kudu \
  --var=hdfs_table=my_table_parquet \
  --var=new_boundary_time="2018-02-01"

Create the window_partition_shift.sql file to shift the Kudu partitions forward:

ALTER TABLE ${var:kudu_table}
ADD RANGE PARTITION add_months("${var:new_boundary_time}", ${var:window_length})
  <= VALUES < add_months("${var:new_boundary_time}", ${var:window_length} + 1);

ALTER TABLE ${var:kudu_table}
DROP RANGE PARTITION add_months("${var:new_boundary_time}", -1)
  <= VALUES < "${var:new_boundary_time}";

To run the SQL statement, use the Impala shell and pass the required variables. Below is an example:

impala-shell -i <impalad:port> -f window_partition_shift.sql \
  --var=kudu_table=my_table_kudu \
  --var=new_boundary_time="2018-02-01" \
  --var=window_length=3
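As a sanity check on the arithmetic in window_partition_shift.sql, the effect of one shift on the partition list can be traced in Python. This is a sketch under the assumption of monthly, first-of-month boundaries; add_months is a simplified stand-in for the Impala function and shift_partitions is a hypothetical helper.

```python
from datetime import date


def add_months(first_of_month: date, months: int) -> date:
    """Shift a first-of-month date by whole months (simplified stand-in)."""
    index = first_of_month.year * 12 + (first_of_month.month - 1) + months
    return date(index // 12, index % 12 + 1, 1)


def shift_partitions(partitions, new_boundary: date, window_length: int):
    """Apply the ADD and DROP from window_partition_shift.sql to a list of ranges."""
    added = (
        add_months(new_boundary, window_length),
        add_months(new_boundary, window_length + 1),
    )
    dropped = (add_months(new_boundary, -1), new_boundary)
    if dropped not in partitions:
        raise ValueError(f"partition {dropped} does not exist")
    return [p for p in partitions if p != dropped] + [added]


# January through April 2018, as created earlier.
initial = [(date(2018, m, 1), date(2018, m + 1, 1)) for m in range(1, 5)]
shifted = shift_partitions(initial, date(2018, 2, 1), 3)

for lower, upper in shifted:
    print(lower, "<= VALUES <", upper)
```

With a new boundary of 2018-02-01 and a window length of 3, the January partition is dropped and a May partition is added, leaving February through May.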

Note: You should periodically run COMPUTE STATS on your Kudu table to ensure Impala’s query performance is optimal.
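The three scripts are meant to run in a fixed order: move the data, alter the view, then shift the partitions. A scheduler could assemble the commands as in the following Python sketch. It only uses the -i, -f, and --var options shown above; the function names and the host placeholder impalad.example.com:21000 are assumptions made for illustration.

```python
import shlex


def impala_shell_command(host_port, sql_file, variables):
    """Build the argument list for one impala-shell invocation."""
    command = ["impala-shell", "-i", host_port, "-f", sql_file]
    command += [f"--var={name}={value}" for name, value in variables.items()]
    return command


def window_shift_commands(host_port, new_boundary_time, window_length=3,
                          kudu_table="my_table_kudu",
                          hdfs_table="my_table_parquet",
                          view_name="my_table_view"):
    """Return the three commands in the order they should be executed."""
    tables = {"kudu_table": kudu_table, "hdfs_table": hdfs_table}
    boundary = {"new_boundary_time": new_boundary_time}
    return [
        # 1. Copy the oldest period from Kudu into the Parquet table.
        impala_shell_command(host_port, "window_data_move.sql",
                             {**tables, **boundary}),
        # 2. Move the view boundary so the copied period is read from HDFS.
        impala_shell_command(host_port, "window_view_alter.sql",
                             {"view_name": view_name, **tables, **boundary}),
        # 3. Add the next Kudu partition and drop the offloaded one.
        impala_shell_command(host_port, "window_partition_shift.sql",
                             {"kudu_table": kudu_table, **boundary,
                              "window_length": window_length}),
    ]


# Hypothetical host; print the commands instead of running them.
for command in window_shift_commands("impalad.example.com:21000", "2018-02-01"):
    print(shlex.join(command))
```

Each list could be handed to subprocess.run(command, check=True) so that a failed step stops the sequence before the Kudu partition is dropped.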

Experimentation

Now that you have created the tables, view, and scripts to leverage the sliding window pattern, you can experiment with them by inserting data for different time ranges and running the scripts to move the window forward through time.

Insert some sample values into the Kudu table:

INSERT INTO my_table_kudu VALUES
('joey', '2018-01-01', 'hello'),
('ross', '2018-02-01', 'goodbye'),
('rachel', '2018-03-01', 'hi');

Show the data in each table/view:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

Move the January data into HDFS:

impala-shell -i <impalad:port> -f window_data_move.sql \
  --var=kudu_table=my_table_kudu \
  --var=hdfs_table=my_table_parquet \
  --var=new_boundary_time="2018-02-01"

Confirm the data is in both places, but not duplicated in the view:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

Alter the view to shift the time boundary forward to February:

impala-shell -i <impalad:port> -f window_view_alter.sql \
  --var=view_name=my_table_view \
  --var=kudu_table=my_table_kudu \
  --var=hdfs_table=my_table_parquet \
  --var=new_boundary_time="2018-02-01"

Confirm the data is still in both places, but not duplicated in the view:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

Shift the Kudu partitions forward:

impala-shell -i <impalad:port> -f window_partition_shift.sql \
  --var=kudu_table=my_table_kudu \
  --var=new_boundary_time="2018-02-01" \
  --var=window_length=3

Confirm the January data is now only in HDFS:

SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;

Confirm predicate pushdown with Impala’s EXPLAIN statement:

EXPLAIN SELECT * FROM my_table_view;
EXPLAIN SELECT * FROM my_table_view WHERE time < "2018-02-01";
EXPLAIN SELECT * FROM my_table_view WHERE time > "2018-02-01";

In the explain output you should see “kudu predicates” which include the time column filters in the “SCAN KUDU” section and “predicates” which include the time, day, month, and year columns in the “SCAN HDFS” section.

Grant Henke is a Software Engineer at Cloudera

The post Transparent Hierarchical Storage Management with Apache Kudu and Impala appeared first on Cloudera Engineering Blog.
