Brooklin - a stateless and distributed service for streaming data in near real-time and at scale - has been running in production at LinkedIn since 2016, powering thousands of data streams and over 2 trillion messages per day. Today, we are pleased to announce the open-sourcing of Brooklin: the source code is now available in our GitHub repo.

Why Brooklin?

At LinkedIn, our data infrastructure has been constantly evolving to satisfy the rising demands for scalable, low-latency data processing pipelines. Challenging as it is, moving massive amounts of data reliably at high rates was not the only problem we had to tackle. Supporting a rapidly increasing variety of data storage and messaging systems has proven to be an equally critical aspect of any viable solution. We built Brooklin to address our growing needs for a system that is capable of scaling both in terms of data volume and systems variance.

What is Brooklin?

Brooklin is a distributed system intended for streaming data across multiple different data stores and messaging systems with high reliability at scale. It exposes a set of abstractions that make it possible to extend its capabilities to consume data from and produce data to new systems by writing new Brooklin consumers and producers. At LinkedIn, we use Brooklin as the primary solution for streaming data across various stores (e.g., Espresso and Oracle) and messaging systems (e.g., Kafka, Azure Event Hubs, and AWS Kinesis).

Brooklin supports streaming data from a variety of sources to a variety of destinations (messaging systems and data stores)

Use cases

There are two major categories of use cases for Brooklin: streaming bridge and change data capture.

Streaming bridge

Data can be spread across different environments (public cloud and company data centers), geo-locations, or different deployment groups. Typically, each environment adds additional complexities due to differences in access mechanisms, serialization formats, compliance, or security requirements. Brooklin can be used as a bridge to stream data across such environments. For example, Brooklin can move data between different cloud services (e.g., AWS Kinesis and Microsoft Azure), between different clusters within a data center, or even across data centers.

A hypothetical example of a single Brooklin cluster being used as a streaming bridge to move data from AWS Kinesis into Kafka and data from Kafka into Azure Event Hubs. 

Because Brooklin is a dedicated service for streaming data across various environments, all of the complexities can be managed within a single service, allowing application developers to focus on processing the data and not on data movement. Additionally, this centralized, managed, and extensible framework enables organizations to enforce policies and facilitate data governance. For example, Brooklin can be configured to enforce company-wide policies, such as requiring that any data flowing in must be in JSON format, or any data flowing out must be encrypted.

Kafka mirroring

Prior to Brooklin, we were using Kafka MirrorMaker (KMM) to mirror Kafka data from one Kafka cluster to another, but we were experiencing scaling issues with it. Since Brooklin was designed as a generic bridge for streaming data, we were able to easily add support for moving enormous amounts of Kafka data. This allowed LinkedIn to move away from KMM and consolidate our Kafka mirroring solution into Brooklin. 

One of the largest use cases for Brooklin as a streaming bridge at LinkedIn is to mirror Kafka data between clusters and across data centers. Kafka is used heavily at LinkedIn to store all types of data, such as logging, tracking, metrics, and much more. We use Brooklin to aggregate this data across our data centers to make it easy to access in a centralized place. We also use Brooklin to move large amounts of Kafka data between LinkedIn and Azure.

A hypothetical example of Brooklin being used to aggregate Kafka data across two data centers, making it easy to access the entire data set from within any data center. A single Brooklin cluster in each data center can handle multiple source/destination pairs.

Brooklin’s solution for mirroring Kafka data has been tested at scale, as it has fully replaced Kafka MirrorMaker at LinkedIn, mirroring trillions of messages every day. This solution has been optimized for stability and operability, which were our major pain points with Kafka MirrorMaker. By building this Kafka mirroring solution on top of Brooklin, we were able to benefit from some of its key capabilities, which we’ll discuss in more detail below.


In the Kafka MirrorMaker deployment model, each cluster could only be configured to mirror data between two Kafka clusters. As a result, KMM users typically need to operate tens or even hundreds of separate KMM clusters, one for each pipeline; this has proven to be extremely difficult to manage. However, since Brooklin is designed to handle several independent data pipelines concurrently, we are able to use a single Brooklin cluster to keep multiple Kafka clusters in sync, thus reducing the operability complexities of maintaining hundreds of KMM clusters.

A hypothetical example of Kafka MirrorMaker (KMM) being used to aggregate Kafka data across two data centers. In contrast with the Brooklin mirroring topology, more KMM clusters are needed (one for each source/destination pair).

Dynamic provisioning and management

With Brooklin, creating new data pipelines (also known as datastreams) and modifying existing ones can be easily accomplished with just an HTTP call to a REST endpoint. For Kafka mirroring use cases, this endpoint makes it very easy to create new mirroring pipelines or modify existing pipelines’ mirroring whitelists without needing to change and deploy static configurations.
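As a rough sketch of what such a call might look like, the snippet below builds (but does not send) an HTTP request to create a datastream. The endpoint path, port, connector name, and payload fields here are illustrative assumptions; consult the Brooklin documentation for the actual REST schema.

```python
import json
from urllib import request

def create_datastream_request(host: str, payload: dict) -> request.Request:
    """Build an HTTP POST request to a hypothetical datastream-creation
    endpoint. Nothing here is sent over the network."""
    body = json.dumps(payload).encode("utf-8")
    return request.Request(
        url=f"http://{host}/datastream",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Illustrative payload; field names are assumptions, not Brooklin's schema.
req = create_datastream_request("localhost:32311", {
    "name": "mirror-metrics",
    "connectorName": "kafkaMirroringConnector",
    "source": {"connectionString": "kafka://src-cluster:9092/metrics.*"},
})
```

Against a running cluster, sending the request would then be a matter of `urllib.request.urlopen(req)`.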

Although the mirroring pipelines can all coexist within the same cluster, Brooklin exposes the ability to control and configure each individually. For instance, it is possible to edit a pipeline’s mirroring whitelist or add more resources to the pipeline without impacting any of the others. Additionally, Brooklin allows for on-demand pausing and resuming of individual pipelines, which is useful when temporarily operating on or modifying a pipeline. For the Kafka mirroring use case, Brooklin supports pausing or resuming the entire pipeline, a single topic within the whitelist, or even a single topic partition. 


Brooklin also exposes a diagnostics REST endpoint that enables on-demand querying of a datastream’s status. This API makes it easy to query the internal state of a pipeline, including any individual topic partition lag or errors. Since the diagnostics endpoint consolidates all findings from the entire Brooklin cluster, this is extremely useful for quickly diagnosing issues with a particular partition without needing to scan through log files.

Special features

Since it was intended as a replacement for Kafka MirrorMaker, Brooklin’s Kafka mirroring solution was optimized for stability and operability. As such, we have introduced some improvements that are unique to Kafka mirroring. 

Most importantly, we strove for better failure isolation, so that errors with mirroring a specific partition or topic would not affect the entire pipeline or cluster, as they did with KMM. Brooklin has the ability to detect errors at the partition level and automatically pause mirroring of such problematic partitions. These auto-paused partitions can be auto-resumed after a configurable amount of time, which eliminates the need for manual intervention and is especially useful for transient errors. Meanwhile, processing of other partitions and pipelines is unaffected.

For improved mirroring latency and throughput, Brooklin Kafka mirroring can also run in flushless-produce mode, where the Kafka consumption progress is tracked at the partition level. Checkpointing is done for each partition instead of at the pipeline level. This allows Brooklin to avoid making expensive Kafka producer flush calls, which are synchronous blocking calls that can often stall the entire pipeline for several minutes. 

By migrating all of LinkedIn’s Kafka MirrorMaker deployments over to Brooklin, we were able to reduce the number of mirroring clusters from hundreds to about a dozen. Leveraging Brooklin for Kafka mirroring purposes also allows us to iterate much faster, as we are continuously adding features and improvements.

Change data capture (CDC)

The second major category of use cases for Brooklin is change data capture. The objective in these cases is to stream database updates in the form of a low-latency change stream. For example, most of LinkedIn’s source-of-truth data (such as jobs, connections, and profile information) resides in various databases. Several applications are interested in knowing when a new job is posted, a new professional connection is made, or a member’s profile is updated. Instead of having each of these interested applications make expensive queries to the online database to detect these changes, Brooklin can stream these database updates in near real-time. One of the biggest advantages of using Brooklin to produce change data capture events is better resource isolation between the applications and the online stores. Applications can scale independently from the database, which avoids the risk of bringing down the database. Using Brooklin, we built change data capture solutions for Oracle, Espresso, and MySQL at LinkedIn; moreover, Brooklin’s extensible model facilitates writing new connectors to add CDC support for any database source.

Change-data capture can be used to capture updates as they are made to the online data source and propagate them to numerous applications for nearline processing. An example use case is a notifications service/application to listen to any profile updates, so that it can display the notification to every relevant user.

Bootstrap support

At times, applications may need a complete snapshot of the data store before consuming the incremental updates. This could happen when the application starts for the very first time or when it needs to re-process the entire dataset because of a change in the processing logic. Brooklin’s extensible connector model can support such use cases.

Transaction support

Many databases have transaction support, and for these sources, Brooklin connectors can ensure transaction boundaries are maintained. 

More information

For more information about Brooklin, including an overview of its architecture and capabilities, please check out our previous engineering blog post.

In Brooklin’s first release, we are pleased to introduce the Kafka mirroring feature, which you can test drive with simple instructions and scripts we provided. We are working on adding support for more sources and destinations to the project—stay tuned!

Have any questions? Please reach out to us on Gitter!

What’s next?

Brooklin has been running successfully for LinkedIn workloads since October 2016. It has replaced Databus as our change-capture solution for Espresso and Oracle sources and is our streaming bridge solution for moving data amongst Azure, AWS, and LinkedIn, including mirroring trillions of messages a day across our many Kafka clusters.

We are continuing to build connectors to support additional data sources (MySQL, Cosmos DB, Azure SQL) and destinations (Azure Blob storage, Kinesis, Cosmos DB, Couchbase). We also plan to add optimizations to Brooklin, such as the ability to auto-scale based on traffic needs, the ability to skip decompression and re-compression of messages in mirroring scenarios to improve throughput, and additional read and write optimizations. 


We want to thank Kartik Paramasivam, Swee Lim, and Igor Perisic for supporting the Brooklin engineering team throughout our journey. We also are grateful to the Kafka, Samza, Nuage, and Gobblin teams for being wonderful partners. Last but not least, a huge shout out to the members of the Brooklin engineering team, who put their hearts and souls into this product and shaped what it is today.


Pinot, a scalable distributed columnar OLAP data store developed at LinkedIn, delivers real-time analytics for site-facing use cases such as LinkedIn's Who viewed my profile, Talent insights, and more. Pinot uses Apache Helix for managing cluster resources and Apache Zookeeper to store metadata. Pinot has wide adoption at LinkedIn, ranging from internal dashboards to site-facing applications.

Pinot supports batch data ingestion (referred to as “offline” data) via Hadoop, as well as real-time data ingestion via streams such as Kafka. Pinot uses offline and real-time data to provide analytics on a continuous timeline from the earliest available rows (could be in offline data) up to the most recently-consumed row from the stream.

Serving queries on data while rows are being ingested from a real-time stream poses a unique set of challenges. Pinot has been chipping away at these, and getting better over time.

Pinot stores data in shards called "segments." During query execution, Pinot processes segments in parallel, and merges results across segments to construct the final response for a query. Offline data is pushed into Pinot as pre-built segments (offline segments) and stored in the Segment Store (see Architecture diagram). These segments are stored as ImmutableSegment objects (addition, deletion, or modification of rows is not possible on these segments). On the other hand, real-time data is consumed on a continuous basis from the underlying stream partitions into segments called MutableSegments (or, “consuming” segments). These segments allow for the addition of rows to them (rows can still not be deleted or updated, though). MutableSegments store rows in uncompressed (but still columnar) form in volatile memory (discarded on restart).

Every so often, the rows in a MutableSegment are compressed and persisted by “committing” the segment into the Segment Store as an ImmutableSegment. Pinot then moves on to consume the next set of rows from the stream partition into a new MutableSegment. The key question here is: “At what point (or, how often) should Pinot decide to commit a consuming segment?”

Committing segments too frequently leaves a table with many small segments. Since Pinot queries are processed at the segment level, having too many segments increases the overhead of processing each query (number of threads spawned, metadata processing, etc.), resulting in higher query latencies.

On the other hand, committing segments less frequently can result in servers running out of memory, since new rows get added into MutableSegments all the time, expanding the memory footprint of these segments. Furthermore, servers can be restarted at any time (at LinkedIn, we push new code every week), causing the MutableSegment to discard all rows and re-start consuming from the first row of the MutableSegment again. This by itself is not a problem (Pinot can ingest back-logged data at a very high rate), but it is possible that the underlying stream topic has retention configured so that the first row of the MutableSegment has been retained out. In this case, we lose data—not good!

It turns out the answer depends on several factors—like ingestion rate and number of columns in the schema, to name a few—that vary across different applications. Pinot provides some configuration settings (e.g., a setting for maximum number of rows in a MutableSegment) that address these variations, but there were still questions from administrators regarding how to set the correct values for those settings on a per application basis. Experimenting with different settings (or combinations thereof) for each application was not a scalable solution, given Pinot’s adoption rate at LinkedIn. In this blog, we will explain how we implemented auto-tuning of real-time consumption that eliminated the experimentation process completely and helped administrators scale to Pinot’s adoption rate.

In order to understand the problem and the solution better, it is useful to go over the Pinot real-time architecture in some more detail.

Pinot real-time ingestion

Pinot real-time servers create a PartitionConsumer object for each stream partition they are directed (by Helix) to consume. If the table is configured to have q replicas and there are p partitions of the stream, then there will be (p * q)  instances of PartitionConsumer objects across all the servers for the table. If there are S servers serving this table, then each server will have ⌈(p * q)/S ⌉ PartitionConsumer instances.
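The arithmetic above can be captured in a small helper (the function and variable names are ours, purely for illustration):

```python
import math

def consumer_counts(p: int, q: int, s: int):
    """Total PartitionConsumer instances for a table with p stream
    partitions and q replicas, plus the per-server count across s servers."""
    total = p * q
    per_server = math.ceil(total / s)
    return total, per_server

# e.g., 8 partitions, 3 replicas, 6 servers -> 24 instances, 4 per server
total, per_server = consumer_counts(8, 3, 6)
```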

The figure below is an illustration of how PartitionConsumer objects are distributed across Pinot real-time servers.

Real-time servers consuming from a stream of p partitions

Helix ensures that no two replicas of the same stream partition are ever consumed on the same real-time server. (Therefore, we must set S >= q; otherwise, table creation will not succeed.)

Pinot assumes that the underlying stream partition has messages that are ordered according to their arrival within the partition, and that each message is located at a specific “offset” (essentially a pointer to the message) in the partition. Each message of the stream partition is translated into a row in the MutableSegment. Each MutableSegment instance has rows from exactly one stream partition. The metadata for a MutableSegment (in Zookeeper) has the offset in the partition from which consumption should start for that segment. This starting offset value applies to all replicas of the MutableSegment. Pinot controller sets the value of the starting offset in the segment metadata at the time the segment is created (which is either when the table is first created, or, when the previous segment in that partition is committed).

The algorithm to commit a segment involves a few steps, during which queries continue to be served from the MutableSegment. After a segment is committed, the MutableSegment is atomically swapped with the (equivalent) ImmutableSegment. The memory taken up by a MutableSegment instance is released after the last query on that instance is drained. All through this process, the application is unaware that any segment commit is going on. The algorithm to commit a segment is as follows:

  1. Pause consumption (until step 5).

  2. Execute steps of the segment completion protocol to decide which replica commits the segment.

  3. Build an ImmutableSegment out of rows in the MutableSegment.

  4. Commit the segment to the controller (in this step, the controller creates the next segment in the partition).

  5. Await signal (from Helix) for the next segment.

  6. Resume consumption when signal is received, indexing rows into a new MutableSegment.

This algorithm is illustrated in the figure below. The actual steps of segment completion are more involved, but we skip the details in this blog.

The problem of provisioning

The characteristics of applications being provisioned can vary widely from one another. Here is a partial list of variations across applications:

  • The cost of holding a row in memory depends on the data schema (more columns means more memory).

  • Pinot uses dictionary encoding to optimize memory consumption (values in rows are stored as integer dictionary IDs that refer to the actual value in a dictionary). Therefore, a higher number of unique values of any column will consume more memory in the dictionary.

  • The rate at which events are ingested into a topic varies widely across applications, and even over time in any one application. For example, events could be coming in at a much higher rate on a Monday morning than on a Friday evening.

  • The number of stream partitions can vary across applications (see below for the impact).

  • We may provision a different number of machines for an application with a higher query load than for another with a lower query load.

In earlier versions of Pinot, we provided two configuration settings:

  • Maximum number of rows that can be held across all MutableSegments in a server (N).

  • Maximum time (T) for which a MutableSegment can exist. After this time, the segment is to be committed, no matter how many rows are in the segment at that time. The administrator may set the value of T depending on the retention of the underlying stream.

If a server ends up owning k (= ⌈(p * q)/S⌉) partitions of a table, the Pinot controller sets the segment metadata to consume at most x (= N/k) rows. The PartitionConsumer is designed to stop consumption and start the commit procedure either upon reaching time T, or after consuming x rows into the MutableSegment. However, the variations across applications require N to be different for each one.
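Concretely, the per-segment row threshold follows from the counts above (a sketch with illustrative names, not Pinot's actual code):

```python
import math

def row_threshold(n: int, p: int, q: int, s: int) -> int:
    """x = N / k, where k = ceil((p * q) / s) is the number of stream
    partitions a single server ends up hosting."""
    k = math.ceil((p * q) / s)
    return n // k

# e.g., N = 1,000,000 rows per server, 8 partitions, 3 replicas, 6 servers
# -> k = 4, so each MutableSegment is capped at 250,000 rows
```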

There is one other thing the administrators had to consider before choosing N: the resident memory size on each server (for both MutableSegments and ImmutableSegments):

  • Memory for a MutableSegment is (as far as possible) acquired at the time the MutableSegment is created. The amount of memory acquired is based on the threshold x set for that segment (therefore, setting a high value of x without using the allocated memory is wasteful).

  • The ImmutableSegment is resident in virtual memory until the retention time of the real-time table, at which point it is unloaded. A higher value of x would mean a smaller number of (larger) ImmutableSegment objects, and larger MutableSegment objects.

The total resident memory on a server will depend on the following:

  1. Number of stream partitions that the server hosts (k).

  2. Number of ImmutableSegments created during the retention period.

  3. Size of ImmutableSegments.

  4. Size of MutableSegments (dependent on x, and other things as outlined above).
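A back-of-the-envelope model of these four factors might look like the following. This is our simplification: it assumes one consuming segment per hosted partition and uniform segment sizes, which real tables only approximate.

```python
def resident_memory_bytes(k: int, immutable_count: int,
                          immutable_size: int, mutable_size: int) -> int:
    """Rough per-server resident memory: for each of the k hosted
    partitions, the retained immutable segments plus one mutable segment."""
    return k * (immutable_count * immutable_size + mutable_size)

# e.g., 4 partitions, 10 retained 360MB immutable segments each,
# plus one 500MB mutable segment per partition
estimate = resident_memory_bytes(4, 10, 360_000_000, 500_000_000)
```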

The value of k depends on the number of servers deployed. An administrator may decide to deploy as many servers as necessary to support the query throughput, given the latency requirements.

As you can see, the number of variables quickly gets out of hand, and we seem to need one to estimate the other. In order to arrive at a working configuration setting, the administrators had to run benchmark experiments before provisioning a use case:

  1. Set up a table with some number of servers and a value of N.

  2. Consume from earliest offset in the stream partitions so that we get to have the ImmutableSegments in place (this is an approximation, since ingestion rate varies across time for any given stream topic, causing us to hit the time limit rather than row limit).

  3. Run the retention manager to retain out the older segments.

  4. If there is too much paging or we run out of memory, then change the number of servers or N (depending on segment sizes) and go back to step 1.

  5. Run a query benchmark, firing queries at the rate the application expects. If performance is not as desired, increase the number of hosts and go back to step 1, readjusting N as needed.

Arriving at the right configuration settings for an application took a few (sometimes several) days, not to mention the time spent by Pinot administrators while they had more urgent things to focus on.

Automatic tuning

In order to help administrators provision a use case, we decided to provide:

  • A target segment size setting for the committed segment. Pinot would attempt to create ImmutableSegment objects of this size.

  • A command line tool that helped the administrators choose the target segment size.

With these two in place, all that administrators need to do is to run the command line tool with a sample segment (generated from data previously gathered via ETL on the same topic). The tool outputs a few choices to pick from, depending on the number of servers that are needed for query processing. The administrator can then select one of the choices and provision the table, confident that it will work as desired with reasonable performance.

Command line tool

Given a sample segment, the tool estimates the resident memory on a host for a range of candidate segment sizes, and recommends a segment size setting accordingly.

Here is a sample output from RealtimeProvisioningHelper for a table:

For different numbers of servers and different numbers of hours for which a MutableSegment consumes data, the output shows:

  • The total memory used in a server (for MutableSegments as well as ImmutableSegments).

  • Optimal Segment size setting.

  • The amount of memory that MutableSegments will use (Consuming Memory).

Each of these values varies with the number of hours consumed, so they are displayed for each of the hour values provided in the command-line arguments. The administrator specifies the host counts that they are considering (in this case 8, 10, 12, or 14 hosts), a sample segment from consumed data (or a sample segment from offline data), and the table configuration (for retention time, etc.). The utility prints out the matrix as above.

Based on the output, the administrator can choose to deploy 8, 10, 12, or 14 hosts, and choose the segment size limit appropriately as per the table. In the above example, if the administrator chooses to use 12 servers (say, based on query throughput requirements), then 10 hours seems to be utilizing memory optimally. The optimal segment size seems to be 360MB. So, the configuration would look like this (the other parameters of StreamConfigs are omitted for brevity):

    streamConfigs {
        "realtime.segment.flush.threshold.size": "0",
        "realtime.segment.flush.desired.size": "360M",
        "realtime.segment.flush.threshold.time": "10h"
    }


Based on the output of the tool, we know that if the PartitionConsumer commits a segment when the segment size is around 360MB, we should be utilizing resident memory optimally between MutableSegments and ImmutableSegments. Note that the 360MB size is that of an ImmutableSegment. As explained before, a MutableSegment is converted to an ImmutableSegment at the time of committing the segment, so it is a chicken-and-egg problem to determine the size of an ImmutableSegment before building one.

Recall that we stop consuming when we reach a row limit (x) or time limit (T). So, if we can somehow set the row limit for a segment in such a way that we can expect the resulting segment size to be near the target segment size, we should be good. But then, how do we estimate the number of rows that results in the desired segment size?

Estimating the row limit for a desired segment size

In order to come up with a row limit for a MutableSegment, we decided to take advantage of the fact that the controller is responsible for committing a segment as well as creating a new segment (which it does in one step, as shown in the picture above).

The idea is for the controller to decide the value of x for the next segment, so as to reach the desired segment size. At the time of segment completion, the controller estimates the number of rows that need to be consumed in the next segment based on the current segment size and the number of rows consumed in the current segment. 

ImmutableSegments have indices, dictionary, etc. in a compressed representation. So, the size of a segment may not vary linearly with the number of rows (e.g., the dictionary size is based on the number of unique values of a column and the average width of the column, no matter how many rows there are in the segment). Also, segment sizes can potentially vary a lot depending on the actual values in a single segment. 

Therefore, we take into account the past values of segment sizes while estimating the size of the next segment. Instead of maintaining the segment sizes over time, we maintain the ratio of segment size to number of rows, improving the ratio each time a segment completes, so that we can estimate the number of rows reasonably for the next segment.

Algorithm for setting the row limit

We assume that the ratio of segment size to number of rows is a constant (say, R) for each table. Since there is a fixed overhead for creating a segment even with one row, R is not really a constant, but it is a good approximation. Each time a segment completes, we compute the current ratio and fold it into the learned value of R, as below:

R_{n+1} = R_n * α + R_current * (1 - α),    where 0 < α < 1

Here, R_current is the size-to-row-count ratio of the current segment (i.e., the one that is in the process of completing). We choose α to be a number higher than 0.5 so that we weigh the learned value more than the new value.

The row threshold for the next segment is then computed as:

x_{n+1} = desiredSegmentSize / R_{n+1}

Also, it is possible that even though we set x for a segment to be some number x1, the PartitionConsumer could reach the time limit T after only x2 rows, where x2 < x1.

In this case, for the subsequent segment, we want to set the row limit to be more like x2, so that we always try to end the segments by reaching the row limit rather than the time limit (this goes back to not wasting memory allocated up front, as mentioned before).

Taking these factors into account, here is the final algorithm:
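The pieces described above can be combined into a minimal sketch like the following. The smoothing constant and the exact form of the time-limit adjustment are illustrative choices on our part, not Pinot's exact implementation.

```python
ALPHA = 0.9  # weigh the learned ratio more than the newest observation

def next_row_threshold(r_learned, size_bytes, rows, desired_bytes,
                       hit_time_limit):
    """Return (updated ratio, row threshold for the next segment).

    r_learned: running size-to-row-count ratio R, or None if no segment
    has completed yet (e.g., right after a controller restart)."""
    r_current = size_bytes / rows
    if r_learned is None:
        r_new = r_current  # seed R from the first completed segment
    else:
        r_new = r_learned * ALPHA + r_current * (1 - ALPHA)
    threshold = int(desired_bytes / r_new)
    if hit_time_limit:
        # The segment ended on the time limit after only `rows` rows;
        # aim nearer that count so the next segment ends on the row limit.
        threshold = min(threshold, rows)
    return r_new, threshold
```

For example, if the completing segment hit its 1,000,000-row limit at exactly the 360MB target, the threshold stays put; if it came in under the target size, the threshold grows for the next segment.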

Note that the value of R is stored in local memory, not persistent store. It may happen that the lead controller needs to be restarted (e.g., for deployment, failure, etc.). In this case, another controller takes over leadership, and as per the algorithm, starts with a null value of R. However, the algorithm takes the first value of R from the completed segment, thus effectively transferring over the value to the new controller, with all the history of older segments.

Lastly, we run this algorithm only on one partition of a topic. Multiple partitions of a stream tend to have similar characteristics at similar times. For example, if 100 new articles appeared between 8 and 9am, the events for the clicks on those articles will probably follow a similar distribution across all partitions of the click-stream during that period. So, changing the value of R (which is applicable across all partitions of the table) whenever the segment completes for any partition is not a good idea, since we will be biasing the value of R towards recent segments more than we want to.

In practice, we see that all stream partitions of a topic result in more or less the same segment sizes, and complete more or less at the same time.


The algorithm presented essentially computes the row limit for the next segment, given some history and characteristics of the current completing segment. Here is a graph that shows the size of the segment adjusting to reach the target segment size over the first 20 segments. The measurements are for a single partition of the stream topic of a table. The average event ingestion rate was 630 rows/sec, with the maximum being around 1,000 rows/sec.

The number of unique values (within a segment) in a column, the dictionary sizes, etc. can vary significantly between different segments, especially as we transition from a weekend to a weekday, or from a longer holiday period to a workday. Depending on the topic (Pinot serves over 50 topics in production), major world events, publications, new product launches, etc. can significantly change the characteristics of data, thus making it hard to predict the segment size by just using number of rows. Thus, the estimated number of rows for a segment could result in much larger (as is the case with the 500MB segment size target in the graph above) or much smaller segment size.

However, the wild variations tend to happen during the initial learning period; in practice, tables are provisioned first and queries are ramped up over time.

Here is a graph that shows the segment size variation over 10+ days with the target segment size of 500M.

Here is the code for this algorithm.
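The original post links the full implementation; a simplified, hypothetical sketch of the core idea — maintain a damped rows-per-byte estimate and derive the next segment's row limit from the target size — might look like the following. Class, method, and parameter names here are our own, and the production Pinot code differs in detail.

```java
// SegmentSizeEstimator -- a simplified sketch, not the production algorithm.
// It keeps an exponential moving average of rows-per-byte so that a single
// unusual segment cannot swing the row limit too far (the "biasing towards
// recent segments" concern discussed above).
public class SegmentSizeEstimator {
    private final long targetSegmentBytes;
    private final double alpha;       // weight given to the most recent segment
    private double rowsPerByte = -1;  // exponential moving average; -1 = uninitialized

    public SegmentSizeEstimator(long targetSegmentBytes, double alpha) {
        this.targetSegmentBytes = targetSegmentBytes;
        this.alpha = alpha;
    }

    /** Called when a segment completes; returns the row limit for the next segment. */
    public long onSegmentComplete(long completedRows, long completedBytes) {
        double observed = (double) completedRows / completedBytes;
        // Blend the new observation with history instead of replacing it outright.
        rowsPerByte = (rowsPerByte < 0) ? observed
                                        : alpha * observed + (1 - alpha) * rowsPerByte;
        return Math.max(1, Math.round(targetSegmentBytes * rowsPerByte));
    }
}
```

With a small `alpha`, the limit converges gradually toward the target size over several segments, matching the learning-period behavior shown in the graph.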


We now provision all single-tenant real-time tables based on the output of this algorithm.


With over 630 million members, the LinkedIn platform delivers thousands of features that individually serve and store large amounts of diverse data. Protecting, maintaining, and serving data has always been of paramount importance for enriching the member experience and ensuring service reliability. In this blog post, we’ll address a critical part of data management involving ad hoc operations through data migrations or data fixes. More specifically, we’ll detail how we developed a centralized, scalable, self-service platform for implementing and executing both data fixes and migrations.

Data Operations

Let’s review the two primary types of data operations. Data migrations generally involve a process of collecting, transforming, and transferring data from one field, table, or database to another. Data fixes, on the other hand, generally involve selecting and transforming data in place. To elaborate on these types, let’s go through an example:

  • Suppose member first names are not limited in length. A number of members have entered first names that are millions of characters long. A product decision is made to enforce a limit of 1,000 characters. A data fix might involve truncating the first names of all members with first names of length greater than 1,000 characters.

  • Suppose member profile data exists in a relational database that we would like to transfer to a NoSQL database. In the past, we migrated profile data from multiple Oracle databases to a NoSQL Espresso database in order to leverage existing technology developed internally by LinkedIn. This would be an example of a data migration.
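The first (data fix) example above reduces to a filter plus an in-place transform. A toy sketch, with a hypothetical schema and names of our own choosing:

```java
// TruncateNames -- a toy illustration of the data fix example above:
// select the records that violate the new limit, then transform them in place.
import java.util.List;
import java.util.stream.Collectors;

public class TruncateNames {
    static final int MAX_LEN = 1000;

    /** Transform step: names within the limit are left untouched. */
    static String fix(String firstName) {
        return firstName.length() <= MAX_LEN ? firstName
                                             : firstName.substring(0, MAX_LEN);
    }

    /** Filter step: only the records that actually need fixing. */
    static List<String> needsFix(List<String> names) {
        return names.stream()
                    .filter(n -> n.length() > MAX_LEN)
                    .collect(Collectors.toList());
    }
}
```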

From cleaning invalid or corrupted data, to migrating member data from legacy systems, these operation types are all use cases that we frequently encounter at LinkedIn.

As LinkedIn and its datasets continue to grow rapidly, executing these data operations becomes increasingly difficult at scale. Specifically, data fixes can carry a weight of urgency as immediate solutions to production issues. For both data operations, reducing the engineering effort required provides immense member value by preserving data quality and expediting feature development—especially when data migrations can require several months to complete and verify. Historically, across the company, data operations have been conducted through a decentralized variety of mechanisms including ad hoc scripts, use case-specific CLIs, deploying new migration services, and many more. 

However, in order to develop a generic platform, we had to keep in mind a few requirements and considerations:

Problem 1: Ease of use

This new system should maintain a level of simplicity such that any data owner can write a data fix or migration without significant effort. Achieving this requires abstracting concepts away from users writing jobs through simple interfaces, as well as flexible support for multiple languages, frameworks, and data engines (e.g., Java, Python, Pig, Hive, Spark). To reduce the engineering effort required for data operations, we need to abstract away aspects such as resource/topology management, request throttling/scaling, code deployment, and iterating data operations on records. The goal of these abstractions and interfaces is to improve both the developer experience and the speed of implementing data operations as much as possible.

Problem 2: Data correctness

With any data operation, we must ensure and maintain data correctness. This entails keeping data in a valid state for use by features and products, and preventing data from entering a state that breaks member features. Any data platform should guard against this wherever possible by allowing for strong validation. For a company at the scale of LinkedIn, designing systems with perfect validation that preempt all future data quality issues is close to impossible: with a multitude of data stores, schemas, and services, not every corner case or hole in validation can be fully prevented. Any client, system, or script can compromise data quality, and any data operation job could do the same, potentially exacerbating pre-existing issues. Therefore, any existing validation must be honored regardless of whether data changes occur organically via live traffic or inorganically through a data operation platform.

Validation comes in all shapes and sizes. Simple validation can range from type checks to formatting checks to range checks. Alternatively, complex rules can include checks across multiple tables within a database or cross-service calls to multiple sources of truth.

Often, these simple rules can be fully enforced at the database layer, but not all validation rules can or should be implemented this way. Enforcement is not always feasible at the database layer and may require an intermediate service, especially in the case of complex rules. To satisfy both simple and complex rules, it is imperative that a data platform be flexible enough to access any service or database that could contain necessary validation rules, ensuring any fixes or migrations maintain high data quality.

Problem 3: Scaling and throttling

When modifying large amounts of records, any data migration must be able to iterate on records quickly. For example, we recently needed to purge deprecated profile fields, such as associations and specialties, from millions of members as part of a decision to remove the elements of the profile that no longer seemed relevant to the product. Generally, data migrations may have to apply across millions or hundreds of millions of profile records, which requires an immense scaling of this data operation process. If we were to modify all 630 million members’ records using a data migration job that can only modify 10 records per second, this migration would take almost two years to complete! Our data platform must be able to modify large amounts of records quickly across large datasets.
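The arithmetic behind the "almost two years" figure is worth making explicit; a one-line helper (the name is ours) captures it:

```java
// MigrationEstimate -- back-of-the-envelope duration for a throttled migration.
public class MigrationEstimate {
    /** Days needed to touch every record at a fixed, throttled rate. */
    static double days(long records, double recordsPerSecond) {
        return records / recordsPerSecond / 86_400.0;  // 86,400 seconds per day
    }
}
```

At 10 records/second, 630 million records take roughly 729 days; at 10,000 records/second, under a day. This is why per-node throughput matters so much for data operations.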

On the other hand, complex systems will inevitably have bottlenecks and capacity limits (database, service, and event consumption limitations). High query frequency and volume can easily take down under-provisioned services or exposed bottlenecks, and require careful control of data operation rates. This system will require consistent throttling mechanisms to ensure services do not take on an unmanageable load.
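A throttling mechanism of this kind is often implemented as a token bucket. A minimal sketch — not the platform's actual implementation, and with a clock passed in explicitly for testability — looks like this:

```java
// TokenBucket -- a minimal throttle: operations may proceed only while tokens
// remain; tokens refill continuously at a configured rate, capping sustained
// request rates against downstream services.
public class TokenBucket {
    private final double ratePerSec;  // refill rate
    private final double capacity;    // maximum burst size
    private double tokens;
    private long lastNanos;

    public TokenBucket(double ratePerSec, double capacity, long nowNanos) {
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.tokens = capacity;       // start full
        this.lastNanos = nowNanos;
    }

    /** Returns true if one operation may proceed at time nowNanos. */
    public synchronized boolean tryAcquire(long nowNanos) {
        // Refill proportionally to elapsed time, capped at capacity.
        tokens = Math.min(capacity, tokens + (nowNanos - lastNanos) / 1e9 * ratePerSec);
        lastNanos = nowNanos;
        if (tokens >= 1) { tokens -= 1; return true; }
        return false;
    }
}
```

A caller that fails `tryAcquire` would sleep or requeue the task, keeping the aggregate write rate within what the downstream database or service can absorb.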

Solution: Data operation platform

In order to solve these problems, we implemented a flexible self-service platform for users to rapidly implement, test, and execute data operation jobs. At a high level, the platform filters down large datasets to records that need to be operated upon and then iterates through each of these records to conduct corresponding updates to data until the job is complete.

High-level architecture

The high-level components consist of the following:

  • User inputs: User-implemented dynamic and static job configurations, an offline script, and online data operation code.

  • Artifact repository: Source code repository for user implemented job build artifacts.

  • Azkaban workflow: A reusable generic job workflow consisting of an offline filter phase and online mapper phase. Azkaban is a batch workflow job scheduler created at LinkedIn to run Hadoop jobs.

  • Existing data services and infrastructure: Existing data infrastructure and data services that the workflow interacts with to read and write data for executing a data operation job.

Throughout this blog post, we’ll delve into how these components interact with each other and how they function together to address the technical requirements.

User inputs

Users of the platform may build data jobs for a wide variety of reasons and use cases, all of which must be supported. To handle this, the platform workflow is intentionally made generic, with the user inputs acting as plugins during the job workflow’s execution. This allows the platform to be agnostic to the details of each job’s implementation. The platform does not define the user logic but instead controls the structure, interfaces, and execution of data operation jobs.

This approach to abstraction addresses the first problem of ease of use by removing the need for:

  1. Complex user implementations through simple interfaces provided by the platform

  2. Maintaining and understanding the tedious details common among most jobs

  3. Managing computing and storage resources required for job execution

The platform abstracts away these implementation details such that clients only provide the user inputs in the form of a static distributable and dynamic configurations. 

The static distributable includes specifications defined once for each given job: an offline script, online data operation logic, and a static manifest configuration file. The offline script can be a Pig, Hive, or Spark job that filters input datasets down to the set of records to be operated upon. The online data operation logic is an implementation of an interface for conducting the necessary reads or writes for a single record produced by the offline script. These three subcomponents are compiled and built into a JAR, stored in the artifact repository, and downloaded during the Azkaban workflow job execution.

The dynamic configurations provide easily modifiable parameters that users can adjust for each execution of a given job. To complete a data fix or migration, it’s likely multiple executions with different flavors will be required. Some examples of these modifications include request rates, throttling limits, data center fabrics, or other environment variables.
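For illustration, a set of dynamic configurations might look like the following; the keys and values here are hypothetical, not the platform's actual schema:

```
# Hypothetical per-execution settings for one run of a data operation job.
job.requestRatePerSecond = 500
job.maxConcurrentTasks   = 8
job.targetFabric         = prod-lva1
job.dryRun               = false
```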

Filter phase

Once users provide the above components, they are ready to run their jobs. Once the platform downloads and processes the user inputs, the job begins with the filter phase. This phase narrows down the records from a given dataset to a closer representation of the records that need to be fixed or migrated. For example, even though 630 million members exist, we may only need to fix 1 million of those member records. Filtering allows jobs to iterate only on the records that actually matter and reduces unnecessary data operations. 

Filter phase workflow

With the downloaded distributable JAR from the artifact repository, the platform will first extract the offline script and static manifest. Then, the offline script will be executed to read defined datasets from HDFS and apply filtering logic based upon the values and records from those datasets. Finally, the filter phase will produce a limited set of filtered records as output to be operated upon in the mapper phase.

Mapper phase

Once the job has produced a set of records to operate on, the workflow can conduct the necessary writes. The platform will then use the user-defined online logic to conduct reads and writes to online services or data stores.

Mapper phase workflow

In order to handle large amounts of data operations, the platform must be able to dynamically scale computing resources. This is done through the use of Gobblin, a universal data ingestion framework for extracting, transforming, and loading large volumes of data. Gobblin libraries allow the platform to create tasks based upon an incoming data source, such as the filtered set of records in this case. With the online data operation logic, the workflow then constructs execution tasks to be queued, throttled, and carried out. This is how the platform addresses the third problem of scaling and throttling: using the dynamic configurations provided, the platform scales and throttles the execution of these tasks.

Since the online logic is plugged into the tasks for execution, flexibility remains for calling existing online data services and databases and satisfying the requirement for data correctness. Therefore, users or owners of data services and databases can enforce validation rules as they so choose. Users can then ensure that their data operation logic calls the desired services or systems that contain those strict validation rules.

Lastly, the existing LinkedIn data infrastructure will propagate these job writes from databases into HDFS through ETL pipelines. As more and more updates propagate to HDFS, the number of filtered records will continue to decrease until the remaining filtered set becomes zero. Now, the loop is complete! We’ve successfully cleaned up invalid data or prepared legacy systems for deprecation and decommissioning!

Conclusion and next steps

By abstracting away a majority of the implementation details required for running data operation jobs, engineers can put aside tedious aspects—such as throttling, scaling, deployments, and resource management—allowing for faster fixing and migrating of data. Some simple data jobs can be implemented within a few hours or even a single day.

Currently, the platform supports offline to online jobs (an offline filtering phase with an online mapper phase). This will be evolved to support additional job types:

  • Online to online: In some cases, offline data may be unavailable or unreliable, which may require filtering with online sources of truth directly.

  • Nearline to online: In other cases, it may be desirable to trigger migration logic on database changes or other nearline events instead of offline filtering.

Additionally, we also plan to integrate existing quota systems into the data operation platform so that we can ensure that the throttling of data operations can be controlled by the service receiving the data operation requests, rather than on the client-side (data operation platform) that is doing the triggering. This can help ensure that systems are robustly protected against excessive data operation traffic through enforcement.


This project would not have been possible without the backing and support from Sriram Panyam. Thanks to Arjun Bora, Charlie Summers, Abhishek Tiwari, Xiang Zhang, Ke Wu, Yun Sun, and Vince Liang for early feedback on the design. Thanks to Jingyu Zhu and Estella Pham for contributions to implementation and further design enhancements. Thanks to Jaren Anderson, Heyun Jeong, and Paul Southworth for feedback on this blog post.


WomenConnect is a series of events focused on bringing together women in tech to build meaningful relationships. Over the past four years, we’ve shared countless stories, learned a ton of lessons, and made lifelong connections at the 16 WomenConnect events in four different cities.

On June 12, our NYC office in the Empire State Building played host to a vibrant and energetic collection of attendees at our event. The theme, “Balance for Better: Allyship,” focused on carrying out change, and was a natural progression from our previous themes of transformation and building communities. We gathered various speakers and leaders to set the tone for the night by sharing their personal experiences, advice, and takeaways on how all of us can effect change.

As the networking reception kicked off the event, it was exciting to see attendees from around the local NYC engineering and product community. Having been a part of the planning process of all three WomenConnect NYC events, I was most in awe of the repeat attendees from our previous events because it showcased how we’ve come a long way in building a community. The space we’ve created for people to be vulnerable and supportive of one another is really special. 

I wanted to share some takeaways to try and capture what makes the event so unique. 

Treat people beautifully

Relationships matter. Throughout the event, we touched on the importance of allyship, defined as a “lifelong process of building relationships based on trust, consistency, and accountability with marginalized individuals and/or groups of people.” To build meaningful relationships, it’s important to be self-aware and bring your full self to work. Our Head of Global Diversity, Inclusion, and Belonging Rosanna Durruthy shared that for her this entails being a woman in tech, a mother, a person of color, and a member of the LGBTQ+ community. To be an ally and advocate for diversity means we understand and honor how these different identity categories interrelate with one another. 

“Treat people beautifully.” – Rosanna Durruthy

To be authentic is to be vulnerable

Authenticity is what allows us to connect with others. Part of showing and bringing your true self to work involves sharing your full self, including your vulnerabilities. Rosanna fully embraced and embodied this value of authenticity when asked about dealing with failure. She went on to share a personal story from early in her career when she didn’t get along with a more senior coworker—they had started off on the wrong foot, and this set the tone for the rest of the time she was on the team. Looking back, she realized her unwillingness to try to reset things was a misstep on her part. In not taking that first step to mend the relationship, she shared that she felt she had missed a lot of learning moments. Rosanna’s introspection and confidence to look back and pinpoint what could have been done better really set the tone in our table discussions to open up with one another.

In addition, among the dozens of LinkedIn executives who participated in the event, there were many male allies. They signaled strong support to our efforts and goals, and were there to learn, but also to offer valuable advice on growth and empowerment. One of them, Chris Pruett, VP of engineering, and a facilitator at one of the tables, encouraged the guests to be confident and not be afraid to ask for what they want.

The table discussions served as an additional way to meet people, including some of our engineering leadership.

Bringing it back to work

After the fireside chat, we dove into a Q&A session, during which attendees were able to line up behind the mics to ask Rosanna and marketing development global lead Ty Heath questions. This was a time for audience members to ask for advice and share their own stories. The questions were as tactical as how to negotiate for a raise or determine salary as a contractor, which led us all to realize our shared experiences and concerns in the workplace. Throughout the evening, we were reminded that we have a collective responsibility to bring back the takeaways learned from this evening to our day-to-day lives. How can we replicate this positive energy and feeling of community throughout our own workplaces?

Making sure the event’s impact lasted beyond one night was the central theme of the table discussions. After drawing inspiration from the confident and talented women on stage, it was time for us to regroup in a more intimate discussion setting and brainstorm how to bring real change home to our workplaces and communities, one action at a time. First, we were asked to come up with one change we would like to see to increase diversity in the workplace. Then, we jumped into more actionable thinking: how could we help implement that change? One point that stood out to me was the importance of managers leading by example, especially when it comes to work-life balance. 

We concluded with representatives from each table coming up on stage to share the main points and takeaways from their group discussions. The number of great ideas and actionable implementation steps was amazing to hear. By the end of the night, we left empowered with the awareness that we could effect change and that there was an entire community of allies behind us every step of the way. As Vice President of Engineering Erica Lockheimer shared, we can be change agents at any level, but we need to be intentional and have the awareness of what exactly needs to change. 


Thank you to anyone who has ever attended a WomenConnect event. Each event has built on the previous ones to improve, and that is in huge part thanks to everyone’s willingness to open up and connect with one another. Thanks also to our June event’s emcees, Stephanie Killian and Abby Garcia, for their amazing work. Follow our hashtag, #WomenConnect, on LinkedIn to see what we’re up to.

“When you truly have empathy and feel it, the passion grows stronger for change.”  – Erica Lockheimer


In this blog post, we’ll share how we migrated Espresso, LinkedIn’s distributed data store, to a new Netty4-based framework and achieved a large performance and capacity gain throughout the Espresso system as a result. In the larger scheme, this is particularly important since Espresso is a master data hub that serves many important applications across LinkedIn, and every improvement we implement has an impact on the user experience in one way or another. 

About Espresso 

First, we’ll provide a refresher on Espresso if you’re not familiar with it. Espresso is LinkedIn’s scalable and elastic data-as-a-service infrastructure and, as mentioned, serves as a master data hub for many important applications across LinkedIn.

Here’s a 30,000-foot view of the overall Espresso ecosystem:

It’s important to note that Espresso operation can be divided into two parts:

  • Control Path: The Control Path maintains and controls the state transitions of the different components.
  • Data Path: The Data Path is how data flows in the system. It has three modes of operation: online, nearline, and offline. The work we discuss in this post mainly concerns the online mode, which serves real-time client reads and writes and requires high availability and low latency.

Espresso is a horizontally-scaling, multi-colo data service, which has proved to be the most cost-effective and efficient approach for a large distributed system. However, even with commodity hardware, as the system grows, the cost to build and maintain it is significant. Therefore, improving the performance and capacity of each node is imperative to driving cost savings, considering the scale of the system.

About Netty framework

Netty is a Java application framework for networking services that is widely used at LinkedIn and in the industry. Netty provides high efficiency with a non-blocking, asynchronous, event-driven framework and high flexibility with a chained handler pipeline structure.

Here is an overview of protocol stacks in Netty framework:

Source: https://netty.io/

The bottom of the stack provides core functionalities like a zero-copy-capable byte buffer, an asynchronous event model, and universal APIs. The upper layers provide comprehensive services like compression, SSL support, and network protocols such as HTTP and WebSocket.

Netty migration on Espresso 

The original Espresso data pipeline framework was developed around 2011 and the technology has evolved with new features and capabilities since then. However, most frameworks eventually run their course and we decided it was time to migrate the old system to a new Netty4 framework for a myriad of reasons: 

  • To modernize the infrastructure, upgrade libraries, and enhance capabilities, security, and performance. 

  • To support new features such as HTTP2, streaming, TLS1.3, and more. 

  • To make fundamental changes to the Espresso online data path, including the new thread model and direct (off-heap) memory allocation with buffer pool. 

Major features 

When building the new framework, there were specific features we set out to implement: a new thread model for I/O threads, better memory management with direct buffer pooling, a streamlined asynchronous pipeline, and native epoll support for sockets. This section will expand on each of these. 

A new thread model for I/O threads

We implemented a new thread model to avoid inter-thread locking between I/O threads. Thread-local variables are widely used to prevent contention between threads and improve CPU cache hit-rate. In the two following images, you can compare and contrast our old data flow with our Netty4 flow.

Direct buffer pool for better memory management

Instead of putting memory allocation load on JVM heap for transient memory usage, the direct buffer pool is now used for better memory management. The buffer pool directly allocates and manages the memory from the operating system, thus reducing the JVM heap GC pressure and related memory copy operation.

This diagram shows how the direct buffer pool is used to offload request/response data buffers from traditional JVM heap to the direct buffer pools.
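Espresso uses Netty's pooled allocator for this. The core idea — reuse off-heap buffers across requests instead of allocating fresh heap memory each time — can be illustrated with a toy pool built on JDK direct buffers; the real Netty allocator is far more sophisticated (arenas, size classes, thread-local caches):

```java
// DirectBufferPool -- a toy illustration only. Buffers are allocated off-heap
// once and recycled, so transient request/response data never adds to JVM
// heap GC pressure.
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

public class DirectBufferPool {
    private final int bufferSize;
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private int allocated = 0;  // total buffers ever allocated from the OS

    public DirectBufferPool(int bufferSize) { this.bufferSize = bufferSize; }

    /** Reuse a free buffer if one exists; otherwise allocate off-heap. */
    public synchronized ByteBuffer acquire() {
        ByteBuffer b = free.poll();
        if (b == null) {
            b = ByteBuffer.allocateDirect(bufferSize);
            allocated++;
        }
        b.clear();
        return b;
    }

    /** Return a buffer to the pool; the caller must not use it afterwards. */
    public synchronized void release(ByteBuffer b) { free.push(b); }

    public synchronized int allocatedCount() { return allocated; }
}
```

The catch, discussed in the "Memory leak detection" section below, is that every `acquire` must be matched by a `release`, much like manual memory management in a native language.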

Streamline the asynchronous handling pipeline

We made the online data pipeline asynchronous; only the store layer still runs in synchronous mode, to fit the JDBC requirements. The HTTP and Espresso layers are fully asynchronous.

In the asynchronous pipeline, executions are non-blocking: the next execution in the queue does not need to wait for the previous one to finish before starting. This greatly improves the throughput and reduces the latency of the system.
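The non-blocking hand-off between layers can be sketched with JDK `CompletableFuture`; the real data path uses chained Netty channel handlers, and the layer names in the comments simply follow the description above:

```java
// AsyncPipeline -- a sketch of stage-to-stage hand-off without blocking a
// thread between stages. Only the final store call (synchronous JDBC in
// Espresso) would actually block in practice.
import java.util.concurrent.CompletableFuture;

public class AsyncPipeline {
    static CompletableFuture<String> handle(String request) {
        return CompletableFuture
                .supplyAsync(() -> request.trim())     // HTTP layer: decode request
                .thenApply(r -> "parsed:" + r)         // Espresso layer: parse/route
                .thenApply(r -> "stored:" + r);        // store layer: sync in practice
    }
}
```

Because each stage registers a continuation rather than waiting, the I/O threads stay free to accept new requests while earlier ones are still in flight.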

Provide native epoll support for socket

In a typical Espresso cluster, there are thousands of TCP connections from routers to each storage node. We chose to use Netty’s native epoll approach, instead of the Java NIO epoll approach, because we found epoll to be more efficient for managing a large number of connections. 

The advantages of the native epoll approach include: 1) it implements the edge-triggered model, which performs better than the level-triggered model of Java NIO epoll; and 2) the native epoll generates less garbage on the JVM heap, which reduces the GC pressure for the system.

Deployment and the result

Metrics for performance measurement

For a large system migration, it is important to compare the system performance before and after the migration. This generates evidence on what works and what does not, providing guidance for future work and references for other related projects.

In this project, the following metrics were used to measure the performance.

  • JVM GC: JVM OldGen GC and Young Gen GC are measured to show the heap usage.

  • Latency: P99 and max latencies of client requests are measured to show the health of the service.

  • Capacity: For capacity and throughput, we measured RCU/WCU (defined in the Capacity Improvement section below) and QPS.

In the following section, we’ll look at some of our initial results in these categories. 

Production cluster results

The production Espresso system is composed of multiple clusters based on the internal customer profiles. Each cluster is configured to best fit the specific needs of the customer. From the traffic access pattern’s point of view, we can categorize the clusters into read-heavy and write-heavy clusters. 

For both read-heavy and write-heavy clusters, we saw large latency improvements after the migration. 

For read-heavy clusters, this latency improvement is mainly due to the new thread model asynchronous pipeline improvement, and the native epoll support.

For write-heavy clusters, since writing is slower and more expensive compared to reading, memory consumption is generally much higher than in read-heavy clusters. This leads to the direct memory buffer pool taking more load in the write-heavy cluster. Thus, we see the higher JVM GC improvement in the write-heavy cluster. From a latency point of view, in addition to the improvements in the read-heavy cluster, the JVM GC reduction in write-heavy clusters further improves the overall latency. 

Here we select two sample clusters, one with write-heavy, one with read-heavy, to show the improvement after migration.

Sample cluster 1 (write-heavy)

In this sample, the migration happened on Sept. 28, and we saw large improvements in JVM GC and latency: about a 100x reduction in OldGen GC, a 10x reduction in YoungGen GC, and a 60% reduction in latency.

Espresso Storage Node - OldGen GC - Memory Usage

Espresso Storage Node - YoungGen GC - Collection Time

Espresso Storage Node - Total Latency

Sample cluster 2 (read-heavy)

In this sample, the migration happened on Oct. 18, as shown in the graph. We saw large improvements in latency, with a 30% reduction on P99 and max latency after migration.

Espresso Storage Node - 99th Percentile Latency

Espresso Storage Node - Total Latency

Capacity improvement

Espresso measures the cost of a request based on how many bytes are processed in the storage node. Therefore, we measure RCU/WCU, in addition to QPS (queries per second), for capacity.

  • RCU: Read Capacity Unit – Up to 8K bytes read is counted as 1 read capacity unit (RCU)

  • WCU: Write Capacity Unit – Up to 1K bytes written is counted as 1 write capacity unit (WCU)

The cost of 1 RCU is approximately 0.6 of 1 WCU, depending on the hardware.
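Under these definitions, cost accounting reduces to a ceiling division. A sketch (the production accounting has more nuance):

```java
// CapacityUnits -- RCU/WCU accounting per the definitions above:
// up to 8K bytes read = 1 RCU; up to 1K bytes written = 1 WCU.
public class CapacityUnits {
    static long rcus(long bytesRead) {
        return Math.max(1, (bytesRead + 8191) / 8192);   // ceil(bytes / 8K), min 1
    }

    static long wcus(long bytesWritten) {
        return Math.max(1, (bytesWritten + 1023) / 1024); // ceil(bytes / 1K), min 1
    }
}
```

This also explains the "sweet spot" observation later in the post: a 100-byte read and a 7KB read both cost exactly 1 RCU, so batching small reads is essentially free in capacity terms.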

Capacity test

To measure the capacity improvement, we ran a series of tests to compare the difference between the old system (Netty3) and the new system (Netty4).

Test methodology

The capacity of RCU/WCU is defined as the maximum throughput of RCU/WCU that can be achieved within a specific SLA on a storage node.

We used the following SLA metrics in tests:

  • Latency: P50 → 10ms, P99 → 30ms

  • ErrorCount/ErrorRate: 0

  • GC pressure: reasonable GC, no anomaly

Test results

Capacity with different data sizes

When reviewing the results of RCU/WCU capabilities with different data sizes, we saw:

  • About 100% RCU improvement across small to large data sizes for read operations

  • About 60-100% WCU improvement on different data sizes for write operations

Capacity with different QPS levels

We also measured against a fixed data size, looking at the performance differences under different traffic loads. For this scenario, we examined a fixed 4KB data size and looked at the performance of JVM GC and the latencies under different QPS traffic loads.

For GC, we saw large improvements across all QPS levels—from 500 to 20K QPS.

For P99 latency, we saw significant improvements after the QPS reached 8K and higher.

For max latency, we saw a similar improvement threshold, with improvements becoming significant once QPS reached 2K and higher.

This is no small feat, as capacity improvement can directly reduce the cost of serving of the system. 

Finding the “sweet spot”

The “sweet spot” is the data size that achieves the best RCU/WCU throughput. In other words, we are striving to find the most cost-effective way to run the system.  

As we can see from the following diagram, the point of tradeoff is between CPU-bound (small data size) and Memory-bound (large data size). For our current Espresso system, the “sweet spot” of data size is around 40 KB.

To optimize resource usage, it is in the best interest of both Espresso users and the Espresso team to have request data sizes close to the “sweet spot.” For example, a read request with a 100B data size costs about the same as a GET request with a 7KB data size. Therefore, aggregating multiple small reads into a larger read of around 40KB would save considerable resources.

Lessons learned 

Along this journey, we learned a lot from our successes, but equally from the challenges we encountered. We felt it was important to share those lessons. 

CPU affinity, or…not

CPU affinity typically plays an important part in performance for SMP (Symmetric Multi-Processor) Architecture, which is common in today’s commodity computer hardware. 

  • CPU caches are 5-100 times faster than Memory access. 

  • Improved CPU cache hit-rate would bring a big performance gain.

Espresso is a heavily loaded, multi-threaded JVM application, and we felt that improving the CPU cache-hit rate by applying CPU affinity would be a big win. We spent time in this area with the available tools and libraries, but found it difficult to achieve CPU affinity and cache line alignment within a Java system. For cache line alignment, there are some workarounds, like padding and the @Contended annotation, but they are hard to use and address only part of the problem.

Eventually, we abandoned this effort. 

Memory leak detection

A memory leak can be a disaster for a mission-critical system like Espresso. By off-loading memory from the JVM heap to the buffer pool, we improved the memory footprint and system performance. On the other hand, this also requires due diligence in managing the memory buffers with allocate/free and reference counting, just as a native-language programmer does.

There are two approaches we used to effectively detect memory leaks in the system:

  • Built-in Netty framework tool to detect memory leaks at the development stage.

    • Turn on JVM option '-Dio.netty.leakDetectionLevel=advanced'

  • For stress testing and production, we developed utilities to monitor the buffer pool stats. Here is a sample of the buffer pool stats:

                ALL Arenas Stats:

                     All Active: 27348 alloc: 8693759 dealloc: 8666411

                     Normal: 5201 alloc: 7976932 dealloc: 7971731

                     Small: 21903 alloc: 465891 dealloc: 443988

                     Tiny: 244 alloc: 250936 dealloc: 250692

                     Huge: 0 alloc: 0 dealloc: 0

The stats above show, for each buffer pool, the total number of allocations and deallocations and the number of active (in-use) buffers. By monitoring the number of active buffers over time, we can detect whether the system is leaking memory.
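A minimal sketch of such monitoring is below. The parsing format matches the sample stats above; the leak heuristic (flagging sustained growth in active buffers) is our own simplification of what the production utilities track.

```python
import re

# Parse buffer pool stats of the form shown above into
# {pool_name: (active, alloc, dealloc)}.
STAT_LINE = re.compile(
    r"([A-Za-z][A-Za-z ]*?):\s*(\d+)\s+alloc:\s*(\d+)\s+dealloc:\s*(\d+)")

def parse_pool_stats(text: str) -> dict:
    stats = {}
    for name, active, alloc, dealloc in STAT_LINE.findall(text):
        stats[name.strip()] = (int(active), int(alloc), int(dealloc))
    return stats

def leak_suspected(active_counts, tolerance: int = 0) -> bool:
    """Flag a leak when the active (in-use) buffer count grows beyond the
    tolerance across every successive snapshot."""
    return all(b > a + tolerance
               for a, b in zip(active_counts, active_counts[1:]))
```

As a sanity check on the sample above, each pool's active count equals its allocations minus its deallocations (e.g., Normal: 7976932 - 7971731 = 5201); a pool whose active count climbs steadily between snapshots is a leak suspect.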

Deployment challenges

For complex system changes like the Espresso Netty4 migration, we found new challenges in testing and deployment. On the testing side, we identified a need for better stress- and performance-testing tools, as well as a better testing/canary process in a production-grade setup. On the deployment side, we found that existing tools are geared toward stateless services, which makes configuration changes and validation difficult. 

Future work 

HTTP2 for Espresso

HTTP2 is supported on top of the Netty4 framework. Implementing HTTP2 for Espresso would bring the following benefits:

  • Efficient binary protocol to reduce data transport overhead.

  • Resolve the router connection pool scalability issue.

  • Provide a foundation for end-to-end streaming on Espresso.

To expand on that last point, enabling end-to-end data streaming for Espresso would allow for asynchronicity in multi-reads, decreasing latency. The current multi-read latency without streaming is at least 3-5 times greater than that of a single read, with the additional time and memory spent waiting on the slowest response. With streaming, responses with a large amount of content can also be divided into pieces and sent as a stream, which reduces GC pressure because we no longer need to hold large amounts of data in memory. This would allow response size limits to be removed in Espresso. 
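The latency argument can be sketched numerically. The numbers below are illustrative, not Espresso measurements: a blocking scatter-gather multi-read is gated by the slowest partition, whereas streaming lets results flow to the client as each partition responds.

```python
def blocking_multi_read_latency(partition_latencies_ms):
    """Without streaming, the client sees nothing until every storage
    node has responded, so latency is the max over partitions."""
    return max(partition_latencies_ms)

def streaming_first_result_latency(partition_latencies_ms):
    """With end-to-end streaming, the first results can be forwarded as
    soon as the fastest partition responds."""
    return min(partition_latencies_ms)

# Illustrative fan-out to 20 partitions: most respond in 5 ms, but the
# blocking multi-read waits on the single 22 ms straggler.
latencies = [5] * 19 + [22]
```

With one straggler at 22 ms, the blocking multi-read takes 22 ms end to end, while streaming delivers the first results after 5 ms, which matches the 3-5x gap described above.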


Migrating an existing, large distributed system to new technologies, while keeping the system up and running within committed SLA requirements, is non-trivial. By completing this project, we modernized the foundation of Espresso with significant performance and capacity improvements, paving the way for new development.


Co-authors: Ali Mohamed and Zheng Li

LinkedIn’s feed stands at the center of building global professional knowledge-sharing communities for our members. Members talk about their career stories, job openings, and ideas in a variety of formats, including links, video, text, images, documents, and long-form articles. Members participate in conversations in two distinct roles: as content creators who share posts, and as feed viewers who read those posts and respond to them through reactions, comments, or reshares. By helping members actively participate in those professional conversations, we are fulfilling LinkedIn’s mission to connect the world’s professionals to make them more productive and successful. 

This post focuses on one important aspect of machine learning in LinkedIn’s feed: candidate selection. Before the final, fine-grained feed ranking, a personalized candidate generation algorithm is applied to tens of thousands of feed updates to select a diverse and unique candidate pool. We describe the machine learning models applied in candidate generation and the infrastructure capabilities that support accurate and agile model iteration.

Overview of Feed at LinkedIn

At the heart of the feed sits a machine learning algorithm that works to identify the best conversations for our members. In a fraction of a second, the algorithm scores tens of thousands of posts and ranks the most relevant at the top of the feed. In order to operate at this scale and speed, LinkedIn’s feed has a two-pass architecture. The first pass rankers (FPR) create a preliminary candidate selection from their inventories based on predicted relevance to the feed viewer. Examples include updates from your network, job recommendations, and sponsored updates. A second pass ranker (SPR) then combines and scores the output from all first pass rankers. The SPR creates a single, personalized ranked list. FollowFeed is the dominant FPR that serves feed updates from your network. More than 80% of feed updates come from FollowFeed, and those updates contribute to more than 95% of members’ conversations. Through these conversations, active communities are formed and strengthened.

Two-pass ranking architecture for LinkedIn’s homepage feed

At LinkedIn’s scale, the main technical challenge is to find the right balance between infrastructure impact and multi-objective optimization using comprehensive machine learning algorithms. Those objectives include members’ likelihood to view updates, their likelihood to participate in conversations, and providing timely reactions to content creators. There are hundreds of machine learning features we use to compute these objectives. We want to optimize these objectives continuously and accurately while satisfying the low latency requirements of our infrastructure footprints.

Our teams tackled this problem through a joint project amongst the Feed Artificial Intelligence (AI), Feed Infrastructure, and Ranking Infrastructure teams. As we ramp this project globally, we would like to share more details about our technical solutions.

  1. We refreshed the machine learning software stack in FollowFeed, leveraging the latest productive machine learning technologies. Through a ranking engine upgrade and model deployment technology, we enabled frequent and agile updating of machine learning models. We also added accurate tracking of machine learning features in FollowFeed, which helps us guarantee data and model consistency across training and serving. Moreover, we developed tools to inspect machine learning algorithm complexity at serving time.

  2. With minimal additional complexity, we have rebuilt our machine learning model for candidate selection from scratch, with new objectives and different algorithms. As part of this, we introduced the prediction of professional conversation contribution into our model to capture the community-building aspect of each feed update. Instead of multiple logistic regressions with manual interactions, we used a single XGBoost tree ensemble to trim down complexity. Additionally, we considered timely feedback to content creators in our model and made sure all members have a chance to feel heard. All of this was done with minimal additional infrastructure capacity.

In summary, this project builds the engineering capabilities for us to iterate comprehensive machine learning models at the candidate generation stage, and we’ve leveraged these capabilities to deliver performant, multi-objective optimization models. At LinkedIn’s scale, these solutions will help bring economic opportunity to the global workforce through more relevant conversations with their professional communities.

Performant AI infrastructure with agility

FollowFeed, LinkedIn’s powerful feed indexing and serving system, has now been equipped with advanced machine learning capabilities. The initial design had accommodated the ranking needs for feed, but the field of machine learning has advanced tremendously in the past five years since the original FollowFeed design. During this project, we boosted the agility and productivity of machine learning in FollowFeed by adopting the latest machine learning inference and model deployment technologies. Such infrastructure enhancements enable the modeling capability described later in this blog post.

FollowFeed architecture

We have updated FollowFeed’s ranking engine to Quasar. Quasar, part of LinkedIn’s Pro-ML technology, transforms machine learning features and performs model inference at query time. As a high-performance, multi-threaded ranking engine, Quasar optimizes not only for infrastructure efficiency but also for machine learning productivity. These productivity improvements have enabled:

  1. Cross-system leverage: We can easily port the latest machine learning models and transformers from the second pass layer to FollowFeed.

  2. Training and serving consistency: The same codebase is used to represent the model at offline training time and at serving time. 

To reflect the rapid evolution of LinkedIn’s content ecosystem, machine learning models have to be constantly updated. We’ve built FollowFeed’s model deployment on top of LinkedIn’s Continuous Integration and Deployment (CICD) stack. Being a stateful system that indexes members’ past activities, FollowFeed presents a unique challenge in model deployment. We have to avoid calling external services to maintain high reliability and performance of index nodes where ranking is taking place. To optimize for such limits, we previously coupled ranking models with code, which leads to strong coupling of service deployment with model coefficient changes. To allow for easier model evolution, our solution is now a data pack-based deployment model, where we package the machine learning models in a separate code base, a.k.a. a multi-product. Those models are treated as a “data pack,” a deployable package consisting only of static files to be dropped into a specific location of production machines. Through such a design, model deployment can be easily managed by LinkedIn’s CICD system. Consequently, we’ve improved model deployment velocity from 3 days to 30 minutes.

In addition to ranking and model deployment, we designed and implemented advanced feature access and tracking in FollowFeed. As it scores thousands of documents per session, FollowFeed optimizes access to machine learning features needed by scoring models. Viewer features are passed down as part of the request without requiring an external call to be made. Features for feed updates are ingested, stored, and updated alongside these updates so that they are accessible locally on the index nodes for scoring. Given the well-known data inconsistency challenges between offline training and online scoring, we also added accurate tracking of machine learning features in FollowFeed. This guarantees data consistency between offline training data and online inference data. Aggregating these tracking events across the distributed index nodes presents a challenge. Even though results from the index nodes are aggregated in the broker layer, we do not want to gather the tracking data synchronously due to scalability and serving latency concerns. Our design overcomes these concerns by having individual index nodes and the broker node stream tracking events asynchronously to Kafka streams. A Samza job joins these Kafka streams and emits a comprehensive tracking event for the request. 

Equipped with advanced machine learning capabilities, it is much easier to develop performant machine learning models for FollowFeed with accurate features. Such agility will enable better candidate feed updates to be surfaced on LinkedIn’s homepage. Through actively updating the machine learning model, we will be able to give more power to existing and newly-minted content creators in LinkedIn’s ecosystem. We will also facilitate the professional community builders’ curation of their audience.

Optimizing for conversations at candidate generation

We’ve rebuilt the machine learning model at FollowFeed to optimize multiple objectives under given infrastructure constraints. The existing candidate selection is conducted through a logistic regression (LR) model that predicts the click probability. To facilitate professional conversations in LinkedIn’s feed, we have introduced “contribution” as an additional objective in the candidate selection model. Probability of contribution captures members’ intent to share, comment, or react to a particular feed update. The model also takes into account timely feedback to content creators, which is a clear signal for cultivating and retaining audience builders on LinkedIn.

To achieve these goals, our design evaluates candidate selection through recall instead of precision, combines multiple objective functions in a gradient boosting tree model, and trims down features and manual transformations.

Given that there is a comprehensive model at the second pass layer, with more sophisticated computation per update, the job of candidate selection should be focused on generating a rich set of candidates. Clarifying this goal helps us evaluate our model much more effectively. Instead of precision, we use recall as an evaluation metric because it measures the overlap between the top K candidates generated by FollowFeed and those of the second pass ranker. K is very large for LinkedIn’s feed. To overcome the selection bias introduced by FollowFeed, we randomize its output for a small percentage of traffic so that the second pass layer can be exposed to a representative range of candidates. This technique helps us approximate what the second pass layer's ranking would be if the candidates were randomized and not pre-ranked by the first pass layer. Through various efforts outlined below, we have effectively doubled the recall percentage throughout the project.
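The recall metric described above can be sketched as follows (the function and parameter names are ours): it measures how much of the second pass ranker's top K the first pass candidate set actually surfaced.

```python
def recall_at_k(fpr_candidates, spr_ranking, k: int) -> float:
    """Fraction of the second pass ranker's top-K updates that appear
    in the first pass ranker's candidate set."""
    candidates = set(fpr_candidates)
    top_k = list(spr_ranking)[:k]
    if not top_k:
        return 0.0
    return sum(1 for update in top_k if update in candidates) / len(top_k)

# Toy example: the FPR surfaced a, b, c; the SPR's top 4 are a, d, b, e.
score = recall_at_k({"a", "b", "c"}, ["a", "d", "b", "e"], k=4)
```

In the toy example, two of the SPR's top four updates were surfaced by the FPR, giving a recall of 0.5; doubling this metric means the candidate set covers far more of what the second pass layer would have ranked highly.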

Examples of recall calculation

We use a logistic regression model with differential weights to incorporate the additional contribution objective into a single-objective model. This reduces the parameter space and model complexity by half while effectively reproducing the effect of a multi-objective model. Through a suite of tools for CPU/network benchmarks, JVM model profiling, and feature importance analysis, we’ve simplified the model by replacing manual feature transformations with the gradient-boosted tree and trimming excessive features. We utilized the same differential weighting technique to train the single tree ensemble. As gradient-boosted trees are powerful models in their own right, we explored using a tree model alone, without logistic regression as the final layer, to predict the re-weighted objective. The list below shows the various techniques we tried, with different trade-offs in terms of infrastructure cost and online results. 

  • Baseline: The production model for FollowFeed for the past several years has been a logistic regression using manually-transformed features that predict the probability of clicks.

  • Tree + transformations + logistic regression: We added the contribution objective to our model. In addition to the existing transformed features, we also added XGBoost tree scorers as interaction features to the contribution-weighted-click logistic regression. This version performed much worse in terms of CPU utilization and 99th percentile latency, but online experiments showed strong lift in member engagement.

  • Tree + logistic regression: We removed the manually-transformed features from the above implementation to reduce model complexity. Its infrastructure costs are still worse than baseline, but better than the previous implementation, while keeping similar engagement improvements.

  • Tree only: We use the XGBoost tree scorer to calculate the contribution-weighted-click and remove the logistic regression component. This keeps the majority of the engagement improvements, has minimal additional overhead on CPU utilization, and actually reduces tail ranking latency compared to the baseline. We’ve decided to ramp the tree-only model to all members.
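The differential-weighting idea used throughout these variants can be sketched as below. The weight values are illustrative (the real weights are tuned): the click and contribution objectives collapse into one weighted binary label, so a single learner, whether a logistic regression or an XGBoost tree ensemble, stands in for a multi-objective model.

```python
CLICK_WEIGHT = 1.0
CONTRIBUTION_WEIGHT = 5.0   # illustrative: contributions count more than clicks

def label_and_weight(clicked: bool, contributed: bool):
    """Collapse the two objectives into one (label, sample_weight) pair
    for a single contribution-weighted-click model."""
    if contributed:
        return 1, CONTRIBUTION_WEIGHT
    if clicked:
        return 1, CLICK_WEIGHT
    return 0, 1.0

# The resulting labels and weights can be fed to one learner, e.g. via
# xgboost.DMatrix(features, label=labels, weight=weights).
```

Because contributed examples carry a larger weight, the single model's score is pulled toward updates likely to spark conversations, reproducing the effect of a separate contribution objective without a second model.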

We’ve also adjusted the candidate selection results based on the freshness of the updates. This came from observing a secondary creator-side effect while ramping an earlier version of the candidate selection algorithm: while viewers are less sensitive to freshness, content creators do care about receiving prompt responses. By boosting fresher updates, viewers can provide creator feedback earlier and keep conversations active.


The project’s models are already rolling out across LinkedIn. The feed updates are scored and ranked in Quasar. The machine learning models have been deployed through LinkedIn’s CICD system using FollowFeed’s data pack. Over the past few years, we’ve tried about 100 variations of the machine learning models and picked the best performing one through online A/B testing. We have seen a lot more members on LinkedIn's feed participating in professional conversations due to better candidate selection. We’ve also observed more reactions given to a broader range of creators so that they have additional ways to engage with content on the platform. We are looking forward to ramping this to all our members soon. In the meantime, we are continuing to innovate in our machine learning infrastructure and modeling technologies to bring the best conversations to members’ homepage feed.


The work described in this blog post was carried out by engineering talent across LinkedIn’s Feed AI, Feed Infra, and Ranking Infra teams, with strong support from Data Science, Relevance Explains, and SRE. In no particular order: Fei Chen, Greg Pendler, Hassan Khan, Ian Ackerman, Madhu Arun, Manas Somaiya, Parin Shah, Prasad Sriram, Prashant Saxena, Shihai He, Souvik Ghosh, Tim Jurka, and Ying Xuan.


At LinkedIn, our mission is to connect the world’s professionals to make them more productive and successful. Part of the way we achieve this goal is by providing a platform where communities connected by common interests or shared experiences can form. Several features—for instance, Groups—help foster the growth of these communities. However, an integral tool for connecting members to each of their various communities is the LinkedIn feed. The feed provides a place for our members to discover and join conversations happening among their connections or within their groups. 

As we’ve discussed previously on this blog, artificial intelligence (AI) plays an important role in sifting through the myriad updates and posts available to serve to a member when they view their feed. These efforts are managed by the Feed AI team within LinkedIn. There are many different parameters to optimize for, however, within the larger umbrella of feed relevance. To assist these efforts, we have a dedicated Communities AI team focused on using AI to serve members relevant content that sparks interest and conversation within the communities they’re part of.

In this post, we give an overview of the techniques that the Communities AI team uses to help form communities and conversations around common areas of interest. These techniques include: follow recommendations, topic-based feeds, hashtag suggestions, and typeaheads.

Fostering active communities at LinkedIn can be broken down into the following components:

  1. Discover: Help members find new entities to follow that will connect them with communities that share their interests.
  2. Engage: Engage members in the conversations taking place in their communities by serving content from their areas of interest.
  3. Contribute: Help members effectively engage with the right communities when they create or share content.

These three aspects are all part of the same ecosystem and our goal is to build an AI platform that closes the loop between Discover, Engage, and Contribute.


There are many ways we can determine a member’s interests. However, we focus here on the explicit signal the member provides by following different entities (e.g., following companies, other members, influencers, hashtags, or joining groups). The vast majority of the follow connections that happen on LinkedIn are driven by the Follow Recommendations product (see Figure 1) that is developed by the Communities AI team.

Figure 1: Example follow recommendations for a member. This example shows hashtag and company recommendations.

The goal of the Follow Recommendations product is to present the member with follow recommendations that the member finds both relevant (i.e., increase the probability the member will follow the recommended entity) and engaging (i.e., the recommended entity produces content that the member finds relevant). Estimating relevance and engagement is where the bulk of the machine learning work happens. To compute these estimates, we rely on information (features) from the viewer (member), the entity to be followed (e.g., influencer, company, hashtag, group), and the interaction between the viewer and the entity (e.g., the number of times the member viewed the feed of a specific hashtag).

There are over 630 million members on LinkedIn. This presents a scaling challenge and a relevance challenge. The Follows Relevance data flow processes hundreds of terabytes of data and is the second largest at LinkedIn after the People You May Know flow. To understand how we managed this explosion of data, we refer the reader to the article Managing "Exploding" Big Data.


As soon as a member follows an entity, the content generated from that entity starts flowing to the member’s main feed and is ranked in conjunction with other content (e.g., posts from 1st degree connections). 

The member can also go to specialized feeds for each entity that they follow, be it a hashtag, group, event, etc. We personalize these feeds by ranking more relevant content higher. For example, a post in the #AI feed that is posted by an influencer the member follows is more likely to be relevant than a post that is generated by another member.

The goal of feed ranking at LinkedIn is to help members discover the most relevant conversations that will aid them in becoming more productive and successful. Relevance is determined by our objective function which optimizes for three main components: The value to the member, the value to the member’s network (downstream effects), and value to the creator of the post. A diverse set of machine learning and experimentation techniques are used to estimate these three components and the combined effect of the three (e.g., see Spreading the Love in the LinkedIn Feed with Creator-Side Optimization). 
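The three-component objective can be sketched as a weighted combination. The weights below are hypothetical; in practice they are tuned through the experimentation techniques mentioned above.

```python
def feed_objective(member_value: float, network_value: float,
                   creator_value: float,
                   weights=(1.0, 0.3, 0.3)) -> float:
    """Score a candidate update by combining the estimated value to the
    viewing member, the downstream value to their network, and the value
    to the post's creator. Weight values are illustrative."""
    w_member, w_network, w_creator = weights
    return (w_member * member_value
            + w_network * network_value
            + w_creator * creator_value)
```

Under this sketch, an update that is only mildly interesting to the viewer can still rank well if it is likely to generate valuable downstream conversation or timely feedback for its creator.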


Our Hashtag Suggestions and Typeahead (HST) product recommends hashtags that allow the member to effectively target their posts to the right communities. In addition to reducing the friction the member faces when trying to add hashtags to their post, the HST product allows us to consolidate content around areas of interests and prevent content fragmentation.

The objective of HST is to both increase the probability that a member will select relevant hashtags from the recommended list to add to their post and increase the relevant feedback the member will get on their post. Here we use a variety of natural language processing (NLP), deep learning, word embedding, and supervised learning techniques to recommend relevant hashtags. The HST product is shown in Figure 2 below.

Figure 2: A demonstration of the HST product in the LinkedIn Share Box. HST suggests hashtags while the member types their post.

Interactions among Discover, Engage, and Contribute

A member’s journey does not necessarily have to happen in the order given above. In addition, a member’s behavior in one aspect can be a valuable signal in another aspect. For example, if a member tags a post with a specific hashtag that the member does not follow, then the Follow Recommendations product can use this signal to determine the relevance of this hashtag to the member and recommend the hashtag to be followed. 


The Communities AI team’s vision is to use artificial intelligence (AI) to serve relevant content to members based on what they are interested in and to help members engage with each other. We’ve explained the three aspects of our communities ecosystem (Discover, Engage, and Contribute). In addition, we’ve highlighted some of the technical challenges in each aspect. 

There are more details that we intentionally skipped because they are out of the scope of this article. Stay tuned because we’ll likely discuss these details in future articles! 


It takes a lot of talent and dedication to build the AI products that enable communities on LinkedIn. We thank all the members of the Communities AI team (Ankan Saha, David Golland, Brian Olson, Andrew Hatch, Suman Chakravartula, Mohammad Rafiee, Emilie De Longueau, Aubrey Gress, Ian Wood, Daniel Ricciardelli, and Hitesh Kumar) for working on the products discussed above. Our products would not be possible without a robust and scalable infrastructure. We thank LinkedIn’s Feed Infrastructure partners (Parin Shah, Hassan Khan, and Ali Mohamed) for making this infrastructure easily available. The Feed AI Foundations team at LinkedIn develops tools and frameworks to automate and streamline different aspects of the AI systems that power our products. We thank the Feed AI Foundations team (Zheng Li, Boyi Chen, Ian Ackerman, and Marco Varela Alvarado) for developing the tools and frameworks that make us more productive. We thank Ann Yan and Fangshi Li from the Ranking Infrastructure and Hadoop Development teams for helping us scale our ranking infrastructure. We thank Linda Leung, Fawn Qiu, and Emily Carrolo from our Product Management team for making sure we deliver great product experiences for our members.


If Apache Kafka is the lifeblood of all nearline processing at LinkedIn, then Apache Samza is the beating heart pumping that blood around. Samza at LinkedIn is provided as a managed stream processing service where applications bring their logic (leveraging the wide variety of Samza APIs), while the service handles the hosting, managing, and operations of the applications. Applications run on managed clusters, equipped with a cluster manager such as YARN, Kubernetes, or Mesos.

The number one priority for such a managed service is stability, while providing the usual desirables (new features, upgrades, improved guarantees, among others). The go-to way of ensuring this is rigorous and comprehensive testing. However, comprehensive testing at a cluster (or higher) scale—be it for failure testing, testing for rolling out upgrades, or feature development—is often cumbersome and time-consuming.

What if there were a way to perform cluster-scale testing from the comfort and confines of your developer machine? At LinkedIn, we use Linux Containers (LXC) to emulate a realistic YARN cluster, which we use for testing new features, automated failure testing for Samza, YARN upgrades, and YARN-Samza interactions.

Docker, LXC, and the art of OS-level virtualization

Similar to Docker, LXC is an OS-level virtualization solution that uses Linux namespaces and cgroups to create multiple isolated Linux execution environments that share the same kernel, or “chroot on steroids.” In fact, early versions of Docker used LXC. Compared to full- or para-virtualization approaches, OS-level virtualization incurs lower overhead and therefore offers higher density.

This means one can create multiple Docker or LXC instances on a machine, have them participate in a YARN-managed cluster, and run and test applications (like Samza) over this setup. YARN simply sees the Docker or LXC instances as “machines.” All multi-machine code paths get exercised and tested, while the low virtualization overhead allows for great density. We’ve been able to create YARN clusters of 10 to 100 “machines” on a single commodity physical machine.

That said, Docker has traditionally supported only a single process per instance, though more recently this has been framed as a best practice rather than a strict limit. Currently, LXC is our virtualization solution of choice, but we will explore Docker soon.

Setting things up

This entire setup has been automated and is available here, and works in conjunction with samza-hello-world. The figure below provides an overview of our setup that uses LXC to emulate a YARN cluster on a given host (called base-host).

We set up a private subnet for the LXC-based virtual private cluster. In this setup, the base host has a virtual network interface (created using Linux’s libvirt) with a private IP. Similarly, each LXC instance uses a virtual network interface with its own private IP, all connected to a virtual subnet with the base host acting as the gateway. In addition, all LXC instances are source-NATed, which allows them to talk to the internet; this is particularly useful for installing or upgrading packages within an LXC instance.

YARN and deployment

Each LXC instance runs a YARN NodeManager (NM), which is networked to a YARN ResourceManager (RM) running on the base host, along with Apache Kafka and a ZooKeeper instance. In addition, we use a shared directory on the base host that is mounted within each LXC instance. This shared directory serves as a virtual repository that distributes application binaries to the LXC instances using Samza’s package.path config.

The end result is a YARN cluster with LXC-instances acting as hosts, sized to over-subscribe and hence statistically multiplex the base host’s resources (e.g., a cluster of 50 LXC instances of 8 GB each on a base host with 64 GB of RAM).

Making the network realistic

Apart from the setup, we also need to emulate the network to ensure that our testing reflects the characteristics of a real network, such as bandwidth limits, network delay, and packet loss. Given the private-subnet-based networking for the LXC instances described above, we use Dummynet to set up bandwidth limits between LXC instances, and to add network delays and packet loss that represent typical inter-data-center conditions.

For example, we configure a bandwidth limit of 100 Mbps with a 10 ms delay and 1% randomized packet loss on “pipe 1,” which connects LXC-instance-0 to the gateway. Similarly, queuing delays can be added by specifying the queue size for “pipe 1.” This article from the Association for Computing Machinery describes Dummynet in greater detail.

Putting it to use

There are a number of use cases for which we’ve found our setup useful, because it avoids the overhead of using a real cluster (e.g., setup, coordination, etc.). Our use cases mostly fall into three types: validation, failure testing, and development.

Our most common use case is validating YARN semantics and performance before adopting an upgraded version. This is especially important when running Samza as a managed service to ensure that features like Samza’s host affinity have not regressed. This setup allows us to carry out controlled runs of Samza jobs and validate different host-affinity related metrics.

Another class of use cases is failure testing. Testing Samza’s behavior under machine failure is much easier with this setup, because a failure can be triggered by simply issuing a command to stop an LXC instance. Container failures are tested similarly, while network partition behavior is tested seamlessly by setting the bandwidth of an LXC instance to 0 (using Dummynet as described above).

Lastly, this setup also improves the developer experience when testing features that require multi-machine interactions, such as Samza’s standby containers, host-affinity, startpoints, and partition expansion. We’re working on extending this approach to test and validate other distributed systems with similar requirements.

With distributed systems being increasingly offered as “managed services,” conducting realistic, rigorous, yet easy testing continues to be a problem today. Hopefully, our approach finds a use in the distributed systems that you build and manage.

Want to work on similar problems in large-scale distributed systems? The Streams Infrastructure team at LinkedIn is hiring engineers at all levels! To learn more, check out all of our open roles on LinkedIn.


This work wouldn’t have been possible without the valuable help and contributions from Prateek Maheshwari, Ahmed Abdul Hamid, Jagadish Venkataram, and Sanil Jain.


Pinot is an open source, scalable, distributed OLAP data store that recently entered the Apache Incubator. Developed at LinkedIn, it powers a wide variety of production use cases, delivering real-time, low-latency analytics.

One of the biggest challenges in Pinot is achieving and maintaining tight SLAs on latency and throughput over large data sets. Existing indexing techniques, such as the sorted index and the inverted index, accelerate document search and thereby improve query latencies. However, their cost still scales linearly with the number of documents that must be processed to compute the result. Pre-aggregating results, on the other hand, reduces the number of documents to be processed and can enforce a constant upper bound on query latencies, but it either limits query support (by restricting the dimension combinations) or causes a storage space explosion (when many dimension combinations are materialized).

For aggregation and group-by queries, we introduced Star-Tree index to utilize the pre-aggregated documents in a smart way that achieves low query latencies, while using the storage space efficiently.

Existing solutions

Consider the following data set as an example to discuss the existing approaches (the data set has been de-duped and pre-sorted by country):

Doc ID Country Browser Locale Impressions
0 CA Chrome en 400
1 CA Firefox fr 200
2 MX Safari es 300
3 MX Safari en 100
4 USA Chrome en 600
5 USA Firefox es 200
6 USA Firefox en 400

Sorted index
In this approach, data is sorted on a primary key (Country in the example), which is likely to appear as a filter in most queries in the set.

Below is the sorted index for column Country:

Country Doc ID
CA 0-1
MX 2-3
USA 4-6

Consider the query: SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’

With a sorted index on the Country column, the time complexity of finding the documents for the given primary key value ‘USA’ is reduced from a linear scan Θ(n) to a constant-time lookup Θ(1). Additionally, sorting provides spatial locality for the documents, which significantly reduces disk fetches and thus improves latency.

While this is a good improvement over linear scanning, there are still a few issues with this approach:

  • Sorting can only be applied to the primary key, which means only queries filtering on that one column can benefit from the sorted index.

  • While search time is reduced from Θ(n) to Θ(1), aggregation cost is still a function of the total number of documents (in this example, 3) to be processed to answer the query.
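A minimal Python sketch makes both points concrete: the dict-based index below stands in for Pinot's on-disk sorted index, and the aggregation loop shows that the cost still scales with the number of matching documents.

```python
# Sketch of a sorted-index lookup over the example data set.
# The dict maps each Country value to its inclusive doc-ID range.

docs = [  # (Country, Browser, Locale, Impressions), pre-sorted by Country
    ("CA", "Chrome", "en", 400), ("CA", "Firefox", "fr", 200),
    ("MX", "Safari", "es", 300), ("MX", "Safari", "en", 100),
    ("USA", "Chrome", "en", 600), ("USA", "Firefox", "es", 200),
    ("USA", "Firefox", "en", 400),
]
sorted_index = {"CA": (0, 1), "MX": (2, 3), "USA": (4, 6)}

def sum_impressions(country):
    # Theta(1) lookup of the doc range, but the aggregation still scans
    # every document in the range: cost grows with the number of matches.
    start, end = sorted_index[country]
    return sum(docs[i][3] for i in range(start, end + 1))

print(sum_impressions("USA"))  # 600 + 200 + 400 = 1200
```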

Inverted index
In this approach, for each value of a given column, we maintain a list of document IDs where this value appears.

Below are the inverted indexes for columns Browser and Locale for our example data set:

Browser Doc ID Locale Doc ID
Chrome 0,4 en 0,3,4,6
Firefox 1,5,6 es 2,5
Safari 2,3 fr 1

Consider the query: SELECT SUM(Impressions) FROM Table WHERE Browser = ‘Firefox’

With inverted index on column Browser, similar to the sorted index, the document search becomes a simple value lookup for the key ‘Firefox’ to directly get the matching documents of [1, 5, 6].

Using an inverted index brings document search time down to constant time Θ(1) for arbitrary columns. However, it cannot leverage spatial locality, and, as with the sorted index, the aggregation cost is still a function of the query’s selectivity (in this example, 3 documents).
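The same lookup can be sketched with plain Python lists as posting lists (standing in for Pinot's actual compressed on-disk representation); intersecting posting lists handles conjunctive filters, but the final aggregation still touches every qualifying document.

```python
# Sketch of inverted-index lookup: each column value maps to a posting
# list of document IDs, so a filter on any column is a value lookup.

impressions = [400, 200, 300, 100, 600, 200, 400]  # per doc ID, as above
browser_index = {"Chrome": [0, 4], "Firefox": [1, 5, 6], "Safari": [2, 3]}
locale_index = {"en": [0, 3, 4, 6], "es": [2, 5], "fr": [1]}

# WHERE Browser = 'Firefox': posting list gives the matching docs directly.
print(sum(impressions[i] for i in browser_index["Firefox"]))  # 200+200+400 = 800

# Conjunctive filters intersect posting lists (Firefox AND en -> doc 6).
both = set(browser_index["Firefox"]) & set(locale_index["en"])
print(sum(impressions[i] for i in both))  # 400
```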

Pre-aggregation
In this technique, we pre-compute the answer for a given query set up front.

In the example below, we have pre-aggregated the total Impressions for each Country:

Country Impressions
CA 600
MX 400
USA 1200

Consider the query: SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’

With pre-aggregation, the query can be solved by just a value lookup, and we can directly get the final result of 1200 without extra aggregation cost.

However, with the pre-aggregation in this example, we can only solve queries with a predicate on Country. Answering queries with multiple predicates requires pre-aggregating the various combinations of dimensions, and the required storage space grows exponentially with the number of dimensions (consider the query: SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’ AND Browser = ‘Firefox’ AND Locale = ‘en’).
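One way to see the explosion: materializing every combination of (value or "any") across all dimensions needs one row per combination, i.e. the product of (cardinality + 1) over the dimensions. A quick back-of-the-envelope check:

```python
# Rows needed to pre-aggregate every dimension combination:
# prod(cardinality_i + 1), exponential in the number of dimensions.
from math import prod

def preagg_rows(cardinalities):
    return prod(c + 1 for c in cardinalities)

# The example's 3 dimensions, each with 3 distinct values:
print(preagg_rows([3, 3, 3]))  # 64 combinations

# Ten dimensions with 100 values each: clearly infeasible to store.
print(preagg_rows([100] * 10))
```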

Star-Tree solution

The graph below shows the performance gain and space cost for the different techniques. On the left side, we have indexing techniques that improve search time with limited increase in space, but do not guarantee a hard upper bound on query latencies because of the aggregation cost. On the right side, we have pre-aggregation techniques that offer hard upper bound on query latencies, but suffer from exponential explosion of storage space.

We propose the Star-Tree data structure inspired by the star-cubing paper (Xin, Han, Li, & Wah, 2003) that offers a configurable trade-off between space and latency and allows us to achieve a hard upper bound for query latencies for a given use case.

In the following sections, we will first define the Star-Tree data structure, followed by a Star-Tree example on the sample data set. Then, we will discuss how Star-Tree is utilized within Pinot to achieve low latencies with high throughput.

Tree structure
Star-Tree is a tree data structure that has the following properties:

  • Root Node (Orange): Single root node from which the rest of the tree can be traversed.

  • Leaf Node (Blue): A leaf node can contain, at most, T documents, where T is configurable.

  • Non-leaf Node (Green): Nodes with more than T documents are further split into children nodes.

  • Star-Node (Yellow): Non-leaf nodes can also have a special child node called the Star-Node. This node contains the aggregated documents after removing the dimension on which the data was split for this level. Star-Node can be either a leaf or non-leaf node.

  • Dimensions Split Order ([D1, D2]): Nodes at a given level in the tree are split into children nodes on all values of a particular dimension. The dimensions split order is an ordered list of dimensions that is used to determine the dimension to split for a given level in the tree.

  • Function Column Pairs: The pre-aggregations to perform when generating the tree.

  • Max Leaf Records: The threshold T to determine whether to further split each node. This threshold is used to tune the level of pre-aggregations performed. With a larger threshold, the index size will be smaller, while more documents will need to be processed to answer the query.

Node properties
The properties stored in each node are as follows:

  • Dimension: The dimension by which the node is split on.

  • Value: The value of the dimension that the node represents.

  • Start/End Document ID: The range of documents this node points to.

  • Aggregated Document ID: One single document which is the aggregated result of all documents pointed to by this node.

Index generation
Star-Tree index is generated in the following steps:

  1. The data is first projected per the dimensionsSplitOrder. Only the dimensions in the split order are retained, while the others are dropped. For each unique combination of the retained dimensions, metrics are aggregated per the configuration. The aggregated documents are written to a file and serve as the initial Star-Tree documents (kept separate from the original documents).

  2. Sort the Star-Tree documents based on the dimensionsSplitOrder: primarily on the first dimension in the list, then secondarily on each subsequent dimension in order. Each node in the tree points to a range in the sorted documents.

  3. The tree structure can be created recursively (starting at the root node) as follows:

    1. If a node has more than T records, it is split into multiple children nodes, one for each value of the dimension in the split order corresponding to current level in the tree.

    2. A Star-Node can be created (per configuration) for the current node, by dropping the dimension being split on and aggregating the metrics for rows containing dimensions with identical values. These aggregated documents are appended to the end of the Star-Tree documents.

    3. If there is only one value for the current dimension, a Star-Node is not created, because the documents under the Star-Node would be identical to those under the single child node.

  4. The above step is repeated recursively until there are no more nodes left to split.

  5. Multiple Star-Trees can be generated based on different configurations (dimensionsSplitOrder, aggregations, T). The query executor can pick the one whose configuration is capable of solving the query (discussed in the Query Execution section).
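As a rough illustration of the pre-aggregation step (not the actual tree-building algorithm), the sketch below materializes SUM(Impressions) for every combination of value-or-star over the three dimensions of the earlier data set. The real generation algorithm skips Star-Nodes for single-valued dimensions and respects the leaf threshold T, so it stores fewer documents than this exhaustive version.

```python
# Simplified sketch of Star-Tree pre-aggregation: compute SUM(Impressions)
# for every (value or '*') pattern over the split-order dimensions.
# The real index skips Star-Nodes for single-valued dimensions and only
# splits nodes above the leaf threshold, so it generates fewer documents.
from collections import defaultdict
from itertools import product

docs = [
    ("CA", "Chrome", "en", 400), ("CA", "Firefox", "fr", 200),
    ("MX", "Safari", "es", 300), ("MX", "Safari", "en", 100),
    ("USA", "Chrome", "en", 600), ("USA", "Firefox", "es", 200),
    ("USA", "Firefox", "en", 400),
]

star_docs = defaultdict(int)
for country, browser, locale, imps in docs:
    # Each raw document contributes to every pattern that matches it.
    for key in product((country, "*"), (browser, "*"), (locale, "*")):
        star_docs[key] += imps

print(star_docs[("*", "*", "*")])    # 2200: total over all documents
print(star_docs[("USA", "*", "*")])  # 1200: Country = 'USA'
print(star_docs[("*", "*", "en")])   # 1500: Locale = 'en'
```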

Aggregation is configured as a pair of the aggregation function and the column to which it is applied.

Supported functions
All aggregation functions with bounded-sized intermediate results are supported:

  • COUNT: Intermediate result Long is bounded

  • MIN: Intermediate result Double is bounded

  • MAX: Intermediate result Double is bounded

  • SUM: Intermediate result Double is bounded

  • AVG: Intermediate result Pair<Double, Long> is bounded

  • MINMAXRANGE: Intermediate result Pair<Double, Double> is bounded

  • DISTINCTCOUNTHLL: Intermediate result HyperLogLog is bounded

  • PERCENTILEEST: Intermediate result QDigest is bounded

  • PERCENTILETDIGEST: Intermediate result TDigest is bounded

Unsupported functions
Some aggregation functions have unbounded-size intermediate results and are not supported, to prevent storage space explosion. However, approximate results can be achieved with the supported functions above.

  • DISTINCTCOUNT: Intermediate result Set is unbounded

  • PERCENTILE: Intermediate result List is unbounded
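The bounded/unbounded distinction can be sketched in a few lines of Python: AVG's intermediate state is a fixed-size (sum, count) pair, mirroring the Pair<Double, Long> above, that merges exactly in O(1), while an exact distinct-count must carry the full set of values seen, which grows with the data. The numbers below are made up for illustration.

```python
# AVG: fixed-size (sum, count) intermediate that merges losslessly.
def avg_merge(a, b):
    return (a[0] + b[0], a[1] + b[1])

left = (600.0, 2)    # e.g. pre-aggregated from one branch
right = (1600.0, 5)  # e.g. pre-aggregated from another
s, c = avg_merge(left, right)
print(s / c)  # exact average over all 7 underlying rows

# Exact DISTINCTCOUNT: the intermediate is a set, which is unbounded --
# merging keeps every value ever seen.
distinct = {"en", "fr"} | {"es", "en"}
print(len(distinct))  # 3
```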

Query execution
For query execution, the idea is to first check the metadata to determine whether the query can be solved with the Star-Tree documents. If so, traverse the Star-Tree to identify the documents that satisfy all the predicates, apply any remaining predicates that were not covered during traversal, and then run the aggregation/group-by on the qualifying documents.

Metadata check
In order to solve an aggregation/group-by query with a Star-Tree, all the columns in filter and group-by clauses must be materialized (configured in dimensionsSplitOrder), and all the aggregations must be pre-aggregated (configured in functionColumnPairs). The query executor picks the first Star-Tree that meets these requirements, or falls back to normal aggregation if none qualifies.

Traverse the tree
The algorithm to traverse the tree can be described with the following diagram:

Apply remaining predicates
Some dimensions might not be split because of the leaf records threshold. In that case, the remaining predicates on the un-split dimensions are applied after traversing the tree (the same as in normal query execution, except that the pre-aggregated records are used). The leaf records threshold is the upper limit on the number of records to be processed for each branch of the tree.


Example

Doc ID Country Browser Locale Impressions
0 CA Chrome en 400
1 CA Firefox fr 200
2 MX Safari es 300
3 MX Safari en 100
4 USA Chrome en 600
5 USA Firefox es 200
6 USA Firefox en 400

Use this example data set and the following configurations:

  • Dimensions Split Order: [Country, Browser, Locale]

  • Function Column Pairs: [SUM(Impressions)]

  • Max Leaf Records: 1 (We put 1 here so that all of the dimension combinations are pre-aggregated for clarity)

Tree structure
The values in the parentheses are the aggregated sum of Impressions for all the documents under the node.

Star-Tree documents

Doc ID Country Browser Locale SUM_Impressions
0 CA Chrome en 400
1 CA Firefox fr 200
2 MX Safari en 100
3 MX Safari es 300
4 USA Chrome en 600
5 USA Firefox en 400
6 USA Firefox es 200
7 CA * en 400
8 CA * fr 200
9 CA * * 600
10 MX Safari * 400
11 USA Firefox * 600
12 USA * en 1000
13 USA * es 200
14 USA * * 1200
15 * Chrome en 1000
16 * Firefox en 400
17 * Firefox es 200
18 * Firefox fr 200
19 * Firefox * 800
20 * Safari en 100
21 * Safari es 300
22 * Safari * 400
23 * * en 1500
24 * * es 500
25 * * fr 200
26 * * * 2200

Query execution
SELECT SUM(Impressions) FROM Table

Because there is no predicate or group-by on any dimension, select the Star-Node for all dimensions (document 26). Instead of aggregating all seven documents without Star-Tree, we directly get the aggregation result 2200 by processing only one document.

SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’

Because there is only a predicate on Country, select the node with value ‘USA’ for Country, the Star-Node for Browser and Locale (document 14). Instead of filtering out and aggregating three documents without Star-Tree, by processing only one document, we get the aggregation result 1200.

SELECT SUM(Impressions) FROM Table WHERE Locale = ‘en’

Similar to the last query, select the Star-Node for Country and Browser, the node with value ‘en’ for Locale (document 23). Again, by processing only one document, we get the aggregation result 1500.

SELECT SUM(Impressions) FROM Table GROUP BY Browser

Because there is a group-by clause on Browser, select the Star-Node for Country and Locale, and all nodes except the Star-Node for Browser (documents 15, 19, and 22). For Country ‘*’ and Browser ‘Chrome’, since there is no Star-Node for Locale, select all the child nodes instead. To compute the group-by result, we need to process only one document per group.

Benchmark result

We benchmarked the performance gains of the Star-Tree index against the inverted index on 48 million records generated with the TPC-H tools. The results were as follows:

With such huge improvements in performance, the Star-Tree index costs only about 12% extra storage space.


“Make LinkedIn feel instantaneous” is a mantra our performance team lives by to deliver the best possible experience to our members. We are constantly on the lookout for ways to improve our site speed and user experience, be it through optimizing our infrastructure and edge, accelerating our web and mobile applications’ performance, or tuning our systems and making our services more efficient.

Experiments at the edge

A vital area of focus to achieve our aspiration to make LinkedIn feel instantaneous is to deliver content as effectively as possible to our members. This demands continuous brainstorming, coming up with optimizations, and experimenting to see their effects. We run a lot of such experiments at the “last mile,” covering LinkedIn’s infrastructure that directly connects to our members over the internet and tremendously influences how fast content is delivered to them.

The puzzle

In the recent past, some of the experiments we have run at our edge have yielded interesting results.

For example, when we looked at how three different experiments performed across four countries, we found that different “optimizations” operate differently across geographies. Case in point is a recent study we did with TCP congestion control algorithms.

We continued to see such behavior in other experiments as well. This trend is illustrated in the chart below. For example, experiment 1 shows a lot of improvement in India, compared to other regions, while experiment 2 was more divisive, showing big gains in India but degradations in all other regions.

These findings weren’t too surprising, but reinforced the idea that we needed to deliver specific optimizations to certain regions.

Next, we analyzed the data from one experiment over a controlled population, specific to a region. The results were more concerning. The chart below shows how much improvement we got each day from this experiment, and it varies a lot: from a 5% improvement one day to a 2% degradation the next!

It is critical to understand the implications of this pattern. In a controlled environment, it is very rare that results of such configurations at the edge fluctuate between being very good on one day and bad on another.

We conducted further analysis by breaking down the data from the experiment by Autonomous Systems, or ASNs (think your internet provider) in a given region. This gave us a better idea of what was happening behind the scenes. We eventually uncovered the fact that even in a given region, different networks come with highly divergent characteristics. In other words, an optimization that we think works may work for members on some networks, but not for others. By rolling out such optimizations, we were forcing a “selective penalty” on some members, while improving the content delivery for others.

Here is the chart for the above experiment, considering the top eight ASNs in the region we were investigating.

This variability is bad for performance: there is no guarantee that all our members are getting a better experience from such “optimizations.”

The missing piece

To prove our hypothesis that network characteristics play a significant role in determining the impact on our members, we used an experiment to understand how member engagement changes when members are given a faster site speed experience.

The experiment entailed analyzing and classifying members’ networks based on their historical performance (RUM) data, as “fast,” “average,” and “slow” network quality. We then served a “faster” (lighter) application experience to a randomly chosen segment of members. After a few days of experimentation, we analyzed the data and compared how members engaged with the application across the three classes of network quality.
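As an illustrative sketch only, such a classification might bucket each network by a summary statistic of its historical page-load times; the median-based rule and millisecond thresholds below are invented for the example and are not LinkedIn's actual cutoffs.

```python
# Hypothetical sketch: classify a network's quality from historical
# RUM page-load samples. Thresholds are invented for illustration.
from statistics import median

def classify_network(load_times_ms, fast_ms=1000, slow_ms=3000):
    """Label a network by the median of its observed page-load times."""
    m = median(load_times_ms)
    if m <= fast_ms:
        return "fast"
    if m >= slow_ms:
        return "slow"
    return "average"

print(classify_network([600, 800, 900]))     # fast
print(classify_network([1500, 2000, 2500]))  # average
print(classify_network([3500, 4000, 5000]))  # slow
```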

We found that member experience and engagement improve when we deliver a faster application, and that a faster application engages members on “slow” networks far more than members on faster networks.

To understand if our members liked the faster experience, we looked at how many user sessions were created in each segment and how many members visited the application. For example:

  • Site speed improved across the board, with members on fast networks seeing the smallest improvement and those on slow networks seeing the largest gains.

  • Subsequently, the number of unique members visiting the application and the number of sessions both improved proportionally.

One segment stood out as an anomaly: for the “average” class, the number of sessions did not grow proportionally with the site speed improvement. Further analysis suggested that this class of members did not prefer the “lighter” experience over the slower, fuller one. This is likely where the trade-off between site speed and features becomes critical.

This analysis proved two hypotheses:

  1. Members are more engaged on a faster application. As the experience gets faster, user engagement goes up. This is especially true for members already on slow networks.

  2. Network quality plays a significant role in understanding our members’ experience. By appraising the network quality of a user, we could customize their experiences by providing them what they prefer, suitable to the network they are on.

Network quality as a service: Customizing content delivery

The experiments and analyses above helped us better understand how members’ network quality, among other factors, influences their experience on the application. We can use this knowledge to customize content delivery to their needs and enrich our members’ experiences by providing network quality as a service within LinkedIn.

Defining and measuring metrics for network quality is fairly straightforward. The challenge arises when measurements are either stale or simply infeasible to obtain. To handle such scenarios, we have built a deep learning pipeline on RUM data to predict the network quality of every connection to LinkedIn.

Stay tuned for Part 2 of this series, which will focus on delivering customized content to our members.


The entire story leading to network quality as a service has been a multi-team effort spanning many quarters. This has involved many engineers and managers across the Performance Engineering, Edge SREs, Traffic Infra Dev, Data Science, Flagship Web, LinkedIn Lite, and AI/ML teams at LinkedIn.

I would like to specifically thank Ritesh Maheshwari and Anant Rao for supporting us through this journey. Special mention to Brandon Duncan for putting up with us through the process and being supportive of such exploratory efforts!
