Brooklin - a distributed service for streaming data in near real-time and at scale - has been running in production at LinkedIn since 2016, powering thousands of data streams and over 2 trillion messages per day. Today, we are pleased to announce the open-sourcing of Brooklin and that the source code is available in our GitHub repo!
At LinkedIn, our data infrastructure has been constantly evolving to satisfy the rising demands for scalable, low-latency data processing pipelines. Challenging as it is, moving massive amounts of data reliably at high rates was not the only problem we had to tackle. Supporting a rapidly increasing variety of data storage and messaging systems has proven to be an equally critical aspect of any viable solution. We built Brooklin to address our growing needs for a system that is capable of scaling both in terms of data volume and systems variance.
What is Brooklin?
Brooklin is a distributed system intended for streaming data across multiple different data stores and messaging systems with high reliability at scale. It exposes a set of abstractions that make it possible to extend its capabilities to support consuming and producing data to and from new systems by writing new Brooklin consumers and producers. At LinkedIn, we use Brooklin as the primary solution for streaming data across various stores (e.g., Espresso and Oracle) and messaging systems (e.g., Kafka, Azure Event Hubs, and AWS Kinesis).
Brooklin supports streaming data from a variety of sources to a variety of destinations (messaging systems and data stores)
There are two major categories of use cases for Brooklin: streaming bridge and change data capture.
Data can be spread across different environments (public cloud and company data centers), geo-locations, or different deployment groups. Typically, each environment adds additional complexities due to differences in access mechanisms, serialization formats, compliance, or security requirements. Brooklin can be used as a bridge to stream data across such environments. For example, Brooklin can move data between different cloud services (e.g., AWS Kinesis and Microsoft Azure), between different clusters within a data center, or even across data centers.
A hypothetical example of a single Brooklin cluster being used as a streaming bridge to move data from AWS Kinesis into Kafka and data from Kafka into Azure Event Hubs.
Because Brooklin is a dedicated service for streaming data across various environments, all of the complexities can be managed within a single service, allowing application developers to focus on processing the data and not on data movement. Additionally, this centralized, managed, and extensible framework enables organizations to enforce policies and facilitate data governance. For example, Brooklin can be configured to enforce company-wide policies, such as requiring that any data flowing in must be in JSON format, or any data flowing out must be encrypted.
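As an illustration of the kind of policy check described above, a bridge could validate that every inbound payload parses as JSON before forwarding it. This is only a sketch of the idea, not Brooklin's actual policy-enforcement API:

```python
import json

def enforce_json_policy(message_bytes):
    """Illustrative bridge-level policy check: accept only JSON payloads.

    A sketch of the kind of company-wide policy described above, not
    Brooklin's actual policy-enforcement hook.
    """
    try:
        json.loads(message_bytes)
        return True
    except (ValueError, UnicodeDecodeError):
        return False
```

A real deployment would apply such a check centrally in the bridge, so individual applications never see non-conforming data.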
Prior to Brooklin, we were using Kafka MirrorMaker (KMM) to mirror Kafka data from one Kafka cluster to another, but we were experiencing scaling issues with it. Since Brooklin was designed as a generic bridge for streaming data, we were able to easily add support for moving enormous amounts of Kafka data. This allowed LinkedIn to move away from KMM and consolidate our Kafka mirroring solution into Brooklin.
One of the largest use cases for Brooklin as a streaming bridge at LinkedIn is to mirror Kafka data between clusters and across data centers. Kafka is used heavily at LinkedIn to store all types of data, such as logging, tracking, metrics, and much more. We use Brooklin to aggregate this data across our data centers to make it easy to access in a centralized place. We also use Brooklin to move large amounts of Kafka data between LinkedIn and Azure.
A hypothetical example of Brooklin being used to aggregate Kafka data across two data centers, making it easy to access the entire data set from within any data center. A single Brooklin cluster in each data center can handle multiple source/destination pairs.
Brooklin’s solution for mirroring Kafka data has been tested at scale, as it has fully replaced Kafka MirrorMaker at LinkedIn, mirroring trillions of messages every day. This solution has been optimized for stability and operability, which were our major pain points with Kafka MirrorMaker. By building this Kafka mirroring solution on top of Brooklin, we were able to benefit from some of its key capabilities, which we’ll discuss in more detail below.
In the Kafka MirrorMaker deployment model, each cluster could only be configured to mirror data between two Kafka clusters. As a result, KMM users typically need to operate tens or even hundreds of separate KMM clusters, one for each pipeline; this has proven to be extremely difficult to manage. However, since Brooklin is designed to handle several independent data pipelines concurrently, we are able to use a single Brooklin cluster to keep multiple Kafka clusters in sync, thus reducing the operability complexities of maintaining hundreds of KMM clusters.
A hypothetical example of Kafka MirrorMaker (KMM) being used to aggregate Kafka data across two data centers. In contrast with the Brooklin mirroring topology, more KMM clusters are needed (one for each source/destination pair).
Dynamic provisioning and management
With Brooklin, creating new data pipelines (also known as datastreams) and modifying existing ones can be easily accomplished with just an HTTP call to a REST endpoint. For Kafka mirroring use cases, this endpoint makes it very easy to create new mirroring pipelines or modify existing pipelines’ mirroring whitelists without needing to change and deploy static configurations.
Although the mirroring pipelines can all coexist within the same cluster, Brooklin exposes the ability to control and configure each individually. For instance, it is possible to edit a pipeline’s mirroring whitelist or add more resources to the pipeline without impacting any of the others. Additionally, Brooklin allows for on-demand pausing and resuming of individual pipelines, which is useful when temporarily operating on or modifying a pipeline. For the Kafka mirroring use case, Brooklin supports pausing or resuming the entire pipeline, a single topic within the whitelist, or even a single topic partition.
Brooklin also exposes a diagnostics REST endpoint that enables on-demand querying of a datastream’s status. This API makes it easy to query the internal state of a pipeline, including any individual topic partition lag or errors. Since the diagnostics endpoint consolidates all findings from the entire Brooklin cluster, this is extremely useful for quickly diagnosing issues with a particular partition without needing to scan through log files.
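To make the management flow concrete, the sketch below builds a hypothetical datastream-creation request. The endpoint path, connector name, and field names here are illustrative assumptions, not Brooklin's actual REST schema:

```python
import json

BROOKLIN_HOST = "http://brooklin.example.com:32311"  # hypothetical host

def build_create_datastream_request(name, source, destination, whitelist):
    """Build a hypothetical datastream-creation request.

    Field names and the endpoint path are illustrative only; consult the
    Brooklin REST API documentation for the real schema.
    """
    body = {
        "name": name,
        "connectorName": "kafkaMirroringConnector",  # assumed connector name
        "source": {"connectionString": source},
        "destination": {"connectionString": destination},
        "metadata": {"topicWhitelist": whitelist},
    }
    return f"{BROOKLIN_HOST}/datastream", json.dumps(body)

url, payload = build_create_datastream_request(
    "tracking-mirror", "kafka://dc1/tracking", "kafka://dc2/tracking", "PageView.*")
```

Issuing this as an HTTP POST (and a similar GET against a diagnostics path) is all a pipeline owner would need to do; no static configuration change or redeployment is involved.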
Since it was intended as a replacement for Kafka MirrorMaker, Brooklin’s Kafka mirroring solution was optimized for stability and operability. As such, we have introduced some improvements that are unique to Kafka mirroring.
Most importantly, we strived for better failure isolation, so that errors with mirroring a specific partition or topic would not affect the entire pipeline or cluster, as they did with KMM. Brooklin has the ability to detect errors at a partition level and automatically pause mirroring of such problematic partitions. These auto-paused partitions can be auto-resumed after a configurable amount of time, which eliminates the need for manual intervention and is especially useful for transient errors. Meanwhile, processing of other partitions and pipelines is unaffected.
For improved mirroring latency and throughput, Brooklin Kafka mirroring can also run in flushless-produce mode, where the Kafka consumption progress is tracked at the partition level. Checkpointing is done for each partition instead of at the pipeline level. This allows Brooklin to avoid making expensive Kafka producer flush calls, which are synchronous blocking calls that can often stall the entire pipeline for several minutes.
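The core idea behind partition-level checkpointing can be sketched as below. The data structures and names are ours, not Brooklin's; the point is that each partition's progress is committed from asynchronous producer acknowledgments, with no pipeline-wide flush:

```python
class PartitionCheckpoints:
    """Track the last safely-produced offset per source partition, so that
    progress can be committed without a synchronous producer flush.

    A simplified sketch of the flushless-produce idea, not Brooklin's code.
    """

    def __init__(self):
        self.committed = {}   # partition -> last checkpointed offset
        self.acked = {}       # partition -> highest offset acked by the producer

    def on_produce_ack(self, partition, offset):
        # Called from the producer's asynchronous completion callback.
        self.acked[partition] = max(self.acked.get(partition, -1), offset)

    def checkpoint(self):
        # Commit each partition's acked offset independently. There is no
        # blocking flush() call that could stall every partition at once.
        self.committed.update(self.acked)
        return dict(self.committed)
```

Because only acknowledged offsets are checkpointed, a restart replays at most the unacknowledged tail of each partition rather than stalling the whole pipeline on a flush.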
By migrating all of LinkedIn’s Kafka MirrorMaker deployments over to Brooklin, we were able to reduce the number of mirroring clusters from hundreds to about a dozen. Leveraging Brooklin for Kafka mirroring purposes also allows us to iterate much faster, as we are continuously adding features and improvements.
Change data capture (CDC)
The second major category of use cases for Brooklin is change data capture. The objective in these cases is to stream database updates in the form of a low-latency change stream. For example, most of LinkedIn’s source-of-truth data (such as jobs, connections, and profile information) resides in various databases. Several applications are interested in knowing when a new job is posted, a new professional connection is made, or a member’s profile is updated. Instead of having each of these interested applications make expensive queries to the online database to detect these changes, Brooklin can stream these database updates in near real-time. One of the biggest advantages of using Brooklin to produce change data capture events is better resource isolation between the applications and the online stores. Applications can scale independently from the database, which avoids the risk of bringing down the database. Using Brooklin, we built change data capture solutions for Oracle, Espresso, and MySQL at LinkedIn; moreover, Brooklin’s extensible model facilitates writing new connectors to add CDC support for any database source.
Change data capture can be used to capture updates as they are made to the online data source and propagate them to numerous applications for nearline processing. An example use case is a notifications service that listens for profile updates so that it can display a notification to every relevant member.
At times, applications may need a complete snapshot of the data store before consuming the incremental updates. This could happen when the application starts for the very first time or when it needs to re-process the entire dataset because of a change in the processing logic. Brooklin’s extensible connector model can support such use cases.
Many databases have transaction support, and for these sources, Brooklin connectors can ensure transaction boundaries are maintained.
For more information about Brooklin, including an overview of its architecture and capabilities, please check out our previous engineering blog post.
In Brooklin’s first release, we are pleased to introduce the Kafka mirroring feature, which you can test drive with simple instructions and scripts we provided. We are working on adding support for more sources and destinations to the project—stay tuned!
Have any questions? Please reach out to us on Gitter!
Brooklin has been running successfully for LinkedIn workloads since October 2016. It has replaced Databus as our change-capture solution for Espresso and Oracle sources and is our streaming bridge solution for moving data amongst Azure, AWS, and LinkedIn, including mirroring trillions of messages a day across our many Kafka clusters.
We are continuing to build connectors to support additional data sources (MySQL, Cosmos DB, Azure SQL) and destinations (Azure Blob storage, Kinesis, Cosmos DB, Couchbase). We also plan to add optimizations to Brooklin, such as the ability to auto-scale based on traffic needs, the ability to skip decompression and re-compression of messages in mirroring scenarios to improve throughput, and additional read and write optimizations.
We want to thank Kartik Paramasivam, Swee Lim, and Igor Perisic for supporting the Brooklin engineering team throughout our journey. We also are grateful to the Kafka, Samza, Nuage, and Gobblin teams for being wonderful partners. Last but not least, a huge shout out to the members of the Brooklin engineering team, who put their hearts and souls into this product and shaped what it is today.
Pinot, a scalable distributed columnar OLAP data store developed at LinkedIn, delivers real-time analytics for site-facing use cases such as LinkedIn's Who viewed my profile, Talent insights, and more. Pinot uses Apache Helix for managing cluster resources and Apache Zookeeper to store metadata. Pinot has wide adoption at LinkedIn, ranging from internal dashboards to site-facing applications.
Pinot supports batch data ingestion (referred to as “offline” data) via Hadoop, as well as real-time data ingestion via streams such as Kafka. Pinot uses offline and real-time data to provide analytics on a continuous timeline from the earliest available rows (could be in offline data) up to the most recently-consumed row from the stream.
Serving queries on data while rows are being ingested from a real-time stream poses a unique set of challenges. Pinot has been chipping away at these, and getting better over time.
Pinot stores data in shards called "segments." During query execution, Pinot processes segments in parallel, and merges results across segments to construct the final response for a query. Offline data is pushed into Pinot as pre-built segments (offline segments) and stored in the Segment Store (see Architecture diagram). These segments are stored as ImmutableSegment objects (addition, deletion, or modification of rows is not possible on these segments). On the other hand, real-time data is consumed on a continuous basis from the underlying stream partitions into segments called MutableSegments (or, “consuming” segments). These segments allow for the addition of rows to them (rows can still not be deleted or updated, though). MutableSegments store rows in uncompressed (but still columnar) form in volatile memory (discarded on restart).
Every so often, the rows in a MutableSegment are compressed and persisted by “committing” the segment into the Segment Store as an ImmutableSegment. Pinot then moves on to consume the next set of rows from the stream partition into a new MutableSegment. The key question here is: “At what point (or, how often) should Pinot decide to commit a consuming segment?”
Committing segments too frequently ends up with many small segments for a table. Since Pinot queries are processed at segment level, having too many segments results in increased overhead for processing queries (number of threads spawned, metadata processing, etc.), resulting in higher query latencies.
On the other hand, committing segments less frequently can result in servers running out of memory, since new rows get added into MutableSegments all the time, expanding the memory footprint of these segments. Furthermore, servers can be restarted at any time (at LinkedIn, we push new code every week), causing the MutableSegment to discard all rows and re-start consuming from the first row of the MutableSegment again. This by itself is not a problem (Pinot can ingest back-logged data at a very high rate), but it is possible that the underlying stream topic has retention configured so that the first row of the MutableSegment has been retained out. In this case, we lose data—not good!
It turns out the answer depends on several factors—like ingestion rate and number of columns in the schema, to name a few—that vary across different applications. Pinot provides some configuration settings (e.g., a setting for maximum number of rows in a MutableSegment) that address these variations, but there were still questions from administrators regarding how to set the correct values for those settings on a per application basis. Experimenting with different settings (or combinations thereof) for each application was not a scalable solution, given Pinot’s adoption rate at LinkedIn. In this blog, we will explain how we implemented auto-tuning of real-time consumption that eliminated the experimentation process completely and helped administrators scale to Pinot’s adoption rate.
In order to understand the problem and the solution better, it is useful to go over the Pinot real-time architecture in some more detail.
Pinot real-time ingestion
Pinot real-time servers create a PartitionConsumer object for each stream partition they are directed (by Helix) to consume. If the table is configured to have q replicas and there are p partitions of the stream, then there will be (p * q) instances of PartitionConsumer objects across all the servers for the table. If there are S servers serving this table, then each server will have ⌈(p * q)/S ⌉ PartitionConsumer instances.
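The per-server consumer count is a direct transcription of the formula above:

```python
import math

def partition_consumers_per_server(p, q, s):
    """Number of PartitionConsumer instances on each of s servers, for a
    table with p stream partitions and q replicas: ceil((p * q) / s)."""
    return math.ceil(p * q / s)

# e.g., 16 partitions with 3 replicas spread over 6 servers
consumers = partition_consumers_per_server(16, 3, 6)
```

With 16 partitions, 3 replicas, and 6 servers, each server hosts ⌈48/6⌉ = 8 PartitionConsumer instances.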
The figure below is an illustration of how PartitionConsumer objects are distributed across Pinot real-time servers.
Real-time servers consuming from a stream of p partitions
Helix ensures that two replicas of the same stream partition are never consumed on the same real-time server. (Therefore, we must set S >= q; otherwise, table creation will not succeed.)
Pinot assumes that the underlying stream partition has messages that are ordered according to their arrival within the partition, and that each message is located at a specific “offset” (essentially a pointer to the message) in the partition. Each message of the stream partition is translated into a row in the MutableSegment. Each MutableSegment instance has rows from exactly one stream partition. The metadata for a MutableSegment (in Zookeeper) has the offset in the partition from which consumption should start for that segment. This starting offset value applies to all replicas of the MutableSegment. Pinot controller sets the value of the starting offset in the segment metadata at the time the segment is created (which is either when the table is first created, or, when the previous segment in that partition is committed).
The algorithm to commit a segment involves a few steps, during which queries continue to be served from the MutableSegment. After a segment is committed, the MutableSegment is atomically swapped with the (equivalent) ImmutableSegment. The memory taken up by a MutableSegment instance is released after the last query on that instance is drained. All through this process, the application is unaware that any segment commit is going on. The algorithm to commit a segment is as follows:
Build an ImmutableSegment out of rows in the MutableSegment.
Commit the segment to the controller (In this step, the controller creates the next segment in the partition).
Await signal (from Helix) for the next segment.
Resume consumption when signal is received, indexing rows into a new MutableSegment.
This algorithm is illustrated in the figure below. The actual steps of segment completion are more involved, but we skip the details in this blog.
The problem of provisioning
The characteristics of applications being provisioned can vary widely from one another. Here is a partial list of variations across applications:
The cost of holding a row in memory depends on the data schema (more columns means more memory).
Pinot uses dictionary encoding to optimize memory consumption (values in rows are stored as integer dictionary IDs that refer to the actual value in a dictionary). Therefore, a higher number of unique values of any column will consume more memory in the dictionary.
The rate at which events are ingested into a topic varies widely across applications, and even over time in any one application. For example, events could be coming in at a much higher rate on a Monday morning than on a Friday evening.
The number of stream partitions can vary across applications (see below for the impact).
We may provision a different number of machines for an application with a higher query load than for another with a lower query load.
In earlier versions of Pinot, we provided two configuration settings:
Maximum number of rows that can be held across all MutableSegments in a server (N).
Maximum time (T) for which a MutableSegment can exist. After this time, the segment is to be committed, no matter how many rows are in the segment at that time. The administrator may set the value of T depending on the retention of the underlying stream.
If a server ended up owning k ( = ⌈(p * q)/S ⌉) partitions of a table, the Pinot controller sets the segment metadata to consume at most x (= N/k) rows. The PartitionConsumer is designed to stop consumption and start the commit procedure either upon reaching time T, or after consuming x rows into the MutableSegment. However, the variations across applications will require N to be different for each one.
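The row budget and the two stopping conditions described above can be sketched as follows (a simplification of the actual controller and consumer logic):

```python
import math

def rows_per_segment(n_max_rows, p, q, s):
    """Split the server-wide row budget N across the k = ceil((p*q)/s)
    partitions that each server hosts, giving the per-segment limit x."""
    k = math.ceil(p * q / s)
    return n_max_rows // k

def should_commit(rows_consumed, row_limit, elapsed_hours, time_limit_hours):
    """A consuming segment commits when it hits either the row limit x
    or the time limit T, whichever comes first."""
    return rows_consumed >= row_limit or elapsed_hours >= time_limit_hours
```

For example, with N = 1,000,000 rows per server and k = 8 partitions, each MutableSegment stops at x = 125,000 rows or at T hours, whichever comes first.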
There is one other thing the administrators had to consider before choosing N: the resident memory size on each server (for both MutableSegments and ImmutableSegments):
Memory for a MutableSegment is (as far as possible) acquired at the time the MutableSegment is created. The amount of memory acquired is based on the threshold x set for that segment (therefore, to have a high value of x and not use the memory allocated is a waste).
The ImmutableSegment is resident in virtual memory until the retention time of the real-time table, at which point it is unloaded. A higher value of x would mean a smaller number of (larger) ImmutableSegment objects, and larger MutableSegment objects.
The total resident memory on a server will depend on the following:
Number of stream partitions that the server hosts (k).
Number of ImmutableSegments created during the retention period.
Size of ImmutableSegments.
Size of MutableSegments (dependent on x, and other things as outlined above).
The value of k depends on the number of servers deployed. An administrator may decide to deploy as many servers as necessary to support the query throughput, given the latency requirements.
As you can see, the number of variables quickly gets out of hand, and we seem to need one to estimate the other. In order to arrive at a working configuration setting, the administrators had to run benchmark experiments before provisioning a use case:
Set up a table with some number of servers and a value of N.
Consume from earliest offset in the stream partitions so that we get to have the ImmutableSegments in place (this is an approximation, since ingestion rate varies across time for any given stream topic, causing us to hit the time limit rather than row limit).
Run the retention manager to retain out the older segments.
If there is too much paging or we run out of memory, then change the number of servers or N (depending on segment sizes) and go back to step 1.
Run a query benchmark firing queries at the rate the application expects to do so. If performance is not as desired, increase the number of hosts and go back to step 1, readjusting N as needed.
Arriving at the right configuration settings for an application took a few (sometimes several) days, not to mention the time spent by Pinot administrators while they had more urgent things to focus on.
In order to help administrators provision a use case, we decided to provide:
A target segment size setting for the committed segment. Pinot would attempt to create ImmutableSegment objects of this size.
A command line tool that helped the administrators choose the target segment size.
With these two in place, all that administrators need to do is to run the command line tool with a sample segment (generated from data previously gathered via ETL on the same topic). The tool outputs a few choices to pick from, depending on the number of servers that are needed for query processing. The administrator can then select one of the choices and provision the table, confident that it will work as desired with reasonable performance.
Command line tool
Given a sample segment, the tool estimates the resident memory a host would need under different candidate configurations, along with the corresponding segment size setting for each.
Here is a sample output from RealtimeProvisioningHelper for a table:
The output shows, for different numbers of servers and different numbers of hours for which a MutableSegment consumes data:
The total memory used in a server (for MutableSegments as well as ImmutableSegments).
Optimal segment size setting.
The amount of memory that MutableSegments will use (Consuming Memory).
Each of these will vary according to the number of hours consumed, so the values are displayed for the different numbers as provided in the command line arguments. The administrator specifies the host counts that they are considering (in this case 8,10,12, or 14 hosts), a sample segment from consumed data (or sample segment from offline data), and the table configuration (for retention time, etc.). The utility prints out the matrix as above.
Based on the output, the administrator can choose to deploy 8, 10, 12, or 14 hosts, and choose the segment size limit appropriately as per the table. In the above example, if the administrator chooses to use 12 servers (say, based on query throughput requirements), then 10 hours seems to be utilizing memory optimally. The optimal segment size seems to be 360MB. So, the configuration would look like this (the other parameters of StreamConfigs are omitted for brevity):
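The relevant part of the stream configuration might look something like the snippet below. The property names here are assumptions based on Pinot's segment-flush threshold settings (in some versions, a row threshold of 0 is what signals size-based flushing); check the Pinot documentation for your version before relying on them:

```json
{
  "streamConfigs": {
    "realtime.segment.flush.threshold.time": "10h",
    "realtime.segment.flush.threshold.size": "0",
    "realtime.segment.flush.desired.size": "360M"
  }
}
```

With settings along these lines, each consuming segment targets roughly 360MB at commit time, with a 10-hour cap.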
Based on the output of the tool, we know that if the PartitionConsumer commits a segment when the segment size is around 360MB, we should be utilizing resident memory optimally between MutableSegments and ImmutableSegments. Note that the 360MB size is that of an ImmutableSegment. As explained before, a MutableSegment is converted to an ImmutableSegment at the time of committing the segment, so it is a chicken-and-egg problem to determine the size of an ImmutableSegment before building one.
Recall that we stop consuming when we reach a row limit (x) or time limit (T). So, if we can somehow set the row limit for a segment in such a way that we can expect the resulting segment size to be near the target segment size, we should be good. But then, how do we estimate the number of rows that results in the desired segment size?
Estimating the row limit for a desired segment size
In order to come up with a row limit for a MutableSegment, we decided to take advantage of the fact that the controller is responsible for committing a segment as well as creating a new segment (which it does in one step, as shown in the picture above).
The idea is for the controller to decide the value of x for the next segment, so as to reach the desired segment size. At the time of segment completion, the controller estimates the number of rows that need to be consumed in the next segment based on the current segment size and the number of rows consumed in the current segment.
ImmutableSegments have indices, dictionary, etc. in a compressed representation. So, the size of a segment may not vary linearly with the number of rows (e.g., the dictionary size is based on the number of unique values of a column and the average width of the column, no matter how many rows there are in the segment). Also, segment sizes can potentially vary a lot depending on the actual values in a single segment.
Therefore, we take into account the past values of segment sizes while estimating the size of the next segment. Instead of maintaining the segment sizes over time, we maintain the ratio of segment size to number of rows, improving the ratio each time a segment completes, so that we can estimate the number of rows reasonably for the next segment.
Algorithm for setting the row limit
We assume that the ratio of segment size to number of rows is a constant for each table (say, R). Since there is a fixed overhead for creating a segment even with one row, R is not really a constant, but it is a good approximation. Each time a segment completes, we compute the ratio for the completing segment and fold it into the learned value of R:

R_(n+1) = α * R_n + (1 - α) * R_current

Here, R_current is the segment-size-to-row-count ratio of the current segment (i.e., the one that is in the process of completing). We choose α to be a number higher than 0.5 so that we weigh the learned value more than the new value.
The number of rows threshold for the next segment is computed as:
x_(n+1) = desiredSegmentSize / R_(n+1)
Also, it is possible that even though we set x for a segment to be some number x1, the PartitionConsumer could reach the time limit T after only x2 rows, where x2 < x1.
In this case, for the subsequent segment, we want to set the row limit to be more like x2, so that we always try to end the segments by reaching the row limit rather than the time limit (this goes back to not wasting memory allocated up front, as mentioned before).
Taking these factors into account, here is the final algorithm:
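A sketch of what the final algorithm might look like, based on the description above (our simplification; the exact smoothing constant and time-limit handling in Pinot may differ):

```python
ALPHA = 0.8  # alpha > 0.5: weigh the learned history more than the newest sample

def next_row_threshold(learned_ratio, seg_size_bytes, seg_rows,
                       desired_seg_size_bytes, hit_time_limit):
    """Return (updated ratio R, row limit x for the next segment)."""
    current_ratio = seg_size_bytes / seg_rows  # bytes per row in this segment
    if learned_ratio is None:
        # First completion seen by this controller (fresh start or failover):
        # adopt the completing segment's ratio as the initial value of R.
        new_ratio = current_ratio
    else:
        new_ratio = ALPHA * learned_ratio + (1 - ALPHA) * current_ratio
    rows = int(desired_seg_size_bytes / new_ratio)
    if hit_time_limit:
        # The segment ended on the time limit after fewer rows than its limit;
        # aim the next row limit near the rows actually consumed, so memory is
        # not reserved up front for rows that never arrive. The 10% headroom
        # here is our assumption, not necessarily what Pinot uses.
        rows = min(rows, int(seg_rows * 1.1))
    return new_ratio, rows
```

For instance, if a completing segment is 100MB with 1M rows and the target is 360MB, the first pass learns R = 100 bytes/row and sets the next row limit to 3.6M rows; subsequent completions nudge R toward each segment's observed ratio.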
Note that the value of R is stored in local memory, not persistent store. It may happen that the lead controller needs to be restarted (e.g., for deployment, failure, etc.). In this case, another controller takes over leadership, and as per the algorithm, starts with a null value of R. However, the algorithm takes the first value of R from the completed segment, thus effectively transferring over the value to the new controller, with all the history of older segments.
Lastly, we run this algorithm only on one partition of a topic. Multiple partitions of a stream tend to have similar characteristics at similar times. For example, if 100 new articles appeared between 8 and 9am, the events for the clicks on those articles will probably follow a similar distribution across all partitions of the click-stream during that period. So, changing the value of R (which is applicable across all partitions of the table) whenever the segment completes for any partition is not a good idea, since we will be biasing the value of R towards recent segments more than we want to.
In practice, we see that all stream partitions of a topic result in more or less the same segment sizes, and complete more or less at the same time.
The algorithm presented essentially computes the row limit for the next segment, given some history and characteristics of the current completing segment. Here is a graph that shows the size of the segment adjusting to reach the target segment size over the first 20 segments. The measurements are for a single partition of the stream topic of a table. The average event ingestion rate was 630 rows/sec, with the maximum being around 1,000 rows/sec.
The number of unique values (within a segment) in a column, the dictionary sizes, etc. can vary significantly between different segments, especially as we transition from a weekend to a weekday, or from a longer holiday period to a workday. Depending on the topic (Pinot serves over 50 topics in production), major world events, publications, new product launches, etc. can significantly change the characteristics of data, thus making it hard to predict the segment size by just using number of rows. Thus, the estimated number of rows for a segment could result in much larger (as is the case with the 500MB segment size target in the graph above) or much smaller segment size.
These wild variations, however, typically happen during the initial learning period. In practice, tables are provisioned first and queries are ramped up over time, so segment sizes have usually stabilized by the time significant query load arrives.
Here is a graph that shows the segment size variation over 10+ days with a target segment size of 500MB.
With over 630 million members, the LinkedIn platform delivers thousands of features that individually serve and store large amounts of diverse data. Protecting, maintaining, and serving data has always been of paramount importance for enriching the member experience and ensuring service reliability. In this blog post, we’ll address a critical part of data management involving ad hoc operations through data migrations or data fixes. More specifically, we’ll detail how we developed a centralized, scalable, self-service platform for implementing and executing both data fixes and migrations.
Let’s review the two primary types of data operations. Data migrations generally involve a process of collecting, transforming, and transferring data from one field, table, or database to another. Data fixes, on the other hand, generally involve selecting and transforming data in place. To elaborate on these types, let’s go through a couple of examples:
Suppose member first names are not limited in length. A number of members have entered first names that are millions of characters long. A product decision is made to enforce a limit of 1,000 characters. A data fix might involve truncating the first names of all members with first names of length greater than 1,000 characters.
Suppose member profile data exists in a relational database that we would like to transfer to a NoSQL database. In the past, we migrated profile data from multiple Oracle databases to a NoSQL Espresso database in order to leverage existing technology developed internally by LinkedIn. This would be an example of a data migration.
From cleaning invalid or corrupted data, to migrating member data from legacy systems, these operation types are all use cases that we frequently encounter at LinkedIn.
As LinkedIn and its datasets continue to grow rapidly, executing these data operations becomes increasingly difficult at scale. Specifically, data fixes can carry a weight of urgency as immediate solutions to production issues. For both data operations, reducing the engineering effort required provides immense member value by preserving data quality and expediting feature development—especially when data migrations can require several months to complete and verify. Historically, across the company, data operations have been conducted through a decentralized variety of mechanisms, including ad hoc scripts, use-case-specific CLIs, purpose-built migration services, and more.
However, in order to develop a generic platform, we had to keep in mind a few requirements and considerations:
Problem 1: Ease of use
This new system should maintain a level of simplicity such that any data owner can write a data fix or migration without significant effort. Achieving this requires abstracting concepts away from users writing jobs through simple interfaces, as well as flexible support for multiple languages, frameworks, and data engines (e.g., Java, Python, Pig, Hive, and Spark). To reduce the engineering effort required for data operations, we need to abstract away aspects such as resource/topology management, request throttling/scaling, code deployment, iterating data operations over records, and more. The goal of these abstractions and interfaces is to improve both the developer experience and the speed of implementing data operations as much as possible.
Problem 2: Data correctness
With any data operation, we must ensure and maintain data correctness. This entails keeping data in a valid state for use by features and products, and preventing data from entering a state that breaks member features. Any data platform should guard against these failures by allowing for strong validation. For any software company at the scale of LinkedIn, designing systems with perfect validation that preempt all future data quality issues is close to impossible. With a multitude of data stores, schemas, and services, not all corner cases and holes in validation can be fully prevented or protected against. Any client, system, or script can compromise data quality, and any data operation job could do the same, potentially exacerbating pre-existing issues. Therefore, any existing validation must be honored regardless of whether data changes occur organically via live traffic or inorganically through a data operation platform.
Validation comes in all shapes and sizes. Simple validation includes type checks, formatting checks, and range checks, while complex rules can involve checks across multiple tables within a database or cross-service calls to multiple sources of truth.
Often, these simple rules can be fully enforced at the database layer, but not all validation rules can or should be implemented this way. Enforcement is not always feasible at the database layer and may require an intermediate service, especially in the case of complex rules. To satisfy both simple and complex rules, it is imperative that a data platform be flexible enough to access any service or database that contains the necessary validation rules, ensuring that fixes and migrations maintain high data quality.
Problem 3: Scaling and throttling
When modifying large amounts of records, any data migration must be able to iterate on records quickly. For example, we recently needed to purge deprecated profile fields, such as associations and specialties, from millions of members as part of a decision to remove the elements of the profile that no longer seemed relevant to the product. Generally, data migrations may have to apply across millions or hundreds of millions of profile records, which requires an immense scaling of this data operation process. If we were to modify all 630 million members’ records using a data migration job that can only modify 10 records per second, this migration would take almost two years to complete! Our data platform must be able to modify large amounts of records quickly across large datasets.
On the other hand, complex systems will inevitably have bottlenecks and capacity limits (database, service, and event consumption limitations). High query frequency and volume can easily take down under-provisioned services or expose bottlenecks, so data operation rates require careful control. The system needs consistent throttling mechanisms to ensure services do not take on an unmanageable load.
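The throttling requirement above is commonly met with a token-bucket rate limiter, which caps the sustained operation rate while allowing short bursts. The sketch below is a generic illustration of the technique, not the platform's actual mechanism:

```python
import time

class TokenBucket:
    """Allow at most `rate` operations per second, with bursts up to `capacity`."""

    def __init__(self, rate, capacity):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.last = time.monotonic()

    def try_acquire(self):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A worker issuing data operation requests would call `try_acquire` before each write and back off briefly when it returns `False`, keeping downstream services under their capacity limits.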
Solution: Data operation platform
In order to solve these problems, we implemented a flexible self-service platform for users to rapidly implement, test, and execute data operation jobs. At a high level, the platform filters down large datasets to records that need to be operated upon and then iterates through each of these records to conduct corresponding updates to data until the job is complete.
The high-level components consist of the following:
User inputs: User-implemented dynamic and static job configurations, an offline script, and online data operation code.
Artifact repository: Source code repository for user-implemented job build artifacts.
Azkaban workflow: A reusable generic job workflow consisting of an offline filter phase and online mapper phase. Azkaban is a batch workflow job scheduler created at LinkedIn to run Hadoop jobs.
Existing data services and infrastructure: Existing data infrastructure and data services that the workflow interacts with to read and write data for executing a data operation job.
Throughout this blog post, we’ll delve into how these components interact with each other and how they function together to address the technical requirements.
Users of the platform may build data jobs for a wide variety of reasons or use cases to be supported by the platform. To handle this, the platform workflow is intentionally made generic such that the user inputs are used as plugins during the job workflow’s execution. This allows the platform to be agnostic to the details of each job’s implementation. The platform does not define the user logic but instead controls the structure, interfaces, and execution of data operation jobs.
This approach to abstraction addresses the first problem of ease of use by removing the need for:
Complex user implementations through simple interfaces provided by the platform
Maintaining and understanding the tedious details common among most jobs
Managing computing and storage resources required for job execution
The platform abstracts away these implementation details such that clients only provide the user inputs in the form of a static distributable and dynamic configurations.
The static distributable includes specifications defined once for each given job: an offline script, online data operation logic, and a static manifest configuration file. The offline script can be a Pig, Hive, or Spark job that filters the input datasets down to the set of records to be operated upon. The online data operation logic is an implementation of an interface for conducting the necessary reads or writes for a single record produced by the offline script. These three subcomponents are compiled and built into a JAR, stored in the artifact repository, and downloaded during Azkaban workflow execution.
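To make the online data operation logic concrete, a hypothetical minimal shape of such an interface might look like the following (the class names, method signature, and client API are illustrative assumptions, not the platform's actual interface; shown in Python for brevity, though jobs could equally be written in Java):

```python
from abc import ABC, abstractmethod

class DataOperation(ABC):
    """User-supplied online logic, invoked once per filtered record."""

    @abstractmethod
    def process(self, record):
        """Read current state from the online store, apply the fix or
        migration for this record, and write the result back."""

class TruncateFirstName(DataOperation):
    """Example job: enforce a 1,000-character limit on first names."""

    def __init__(self, profile_client, limit=1000):
        self.client = profile_client  # hypothetical online service client
        self.limit = limit

    def process(self, record):
        profile = self.client.get(record["member_id"])
        if len(profile["first_name"]) > self.limit:
            profile["first_name"] = profile["first_name"][: self.limit]
            # Writing through the owning service lets its validation rules run.
            self.client.put(record["member_id"], profile)
```

The platform only sees the `DataOperation` contract; everything about which services to call and how to transform each record lives in the user's implementation.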
The dynamic configurations provide easily modifiable parameters that users can adjust for each execution of a given job. To complete a data fix or migration, multiple executions with different parameters will likely be required. Examples of these parameters include request rates, throttling limits, data center fabrics, and other environment variables.
After users provide the above components, they are ready to run their jobs. Once the platform downloads and processes the user inputs, the job begins with the filter phase. This phase narrows down the records from a given dataset to a closer representation of the records that need to be fixed or migrated. For example, even though 630 million members exist, we may only need to fix 1 million of those member records. Filtering allows jobs to iterate only on the records that actually matter and reduces unnecessary data operations.
Filter phase workflow
With the downloaded distributable JAR from the artifact repository, the platform will first extract the offline script and static manifest. Then, the offline script will be executed to read defined datasets from HDFS and apply filtering logic based upon the values and records from those datasets. Finally, the filter phase will produce a limited set of filtered records as output to be operated upon in the mapper phase.
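As a concrete illustration of the filter phase, the first-name example from earlier could be expressed as a filter over the profile dataset. This is plain Python for readability; in practice the offline script would be a Pig, Hive, or Spark job reading the dataset from HDFS, and the field names are assumed:

```python
def filter_records(profiles, limit=1000):
    """Yield only the records that actually need fixing: members whose
    first name exceeds the limit. The output keys feed the mapper phase."""
    for profile in profiles:
        if len(profile.get("first_name", "")) > limit:
            yield {"member_id": profile["member_id"]}
```

The output is deliberately minimal: just enough identifying information for the mapper phase to look up and operate on each record online.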
Once the job has produced a set of records to operate on, the workflow can conduct the necessary updates. The platform then uses the user-defined online logic to perform reads and writes against online services or data stores.
Mapper phase workflow
In order to handle large amounts of data operations, the platform must be able to dynamically scale computing resources. This is done through the use of Gobblin, a universal data ingestion framework for extracting, transforming, and loading large volumes of data. Gobblin libraries allow the platform to create tasks based upon an incoming data source, such as the filtered set of records in this case. With the online data operation logic, the workflow then constructs execution tasks to be queued, throttled, and carried out. This is how the platform addresses the third problem, scaling and throttling: using the dynamic configurations provided, the platform scales and throttles the execution of these tasks.
Since the online logic is plugged into the tasks for execution, flexibility remains for calling existing online data services and databases and satisfying the requirement for data correctness. Therefore, users or owners of data services and databases can enforce validation rules as they so choose. Users can then ensure that their data operation logic calls the desired services or systems that contain those strict validation rules.
Lastly, the existing LinkedIn data infrastructure will propagate these job writes from databases into HDFS through ETL pipelines. As more and more updates propagate to HDFS, the number of filtered records will continue to decrease until the remaining filtered set becomes zero. Now, the loop is complete! We’ve successfully cleaned up invalid data or prepared legacy systems for deprecation and decommissioning!
Conclusion and next steps
By abstracting away a majority of the implementation details required for running data operation jobs, engineers can put aside tedious aspects—such as throttling, scaling, deployments, and resource management—allowing for faster fixing and migrating of data. Some simple data jobs can be implemented within a few hours or even a single day.
Currently, the platform supports offline to online jobs (an offline filtering phase with an online mapper phase). This will be evolved to support additional job types:
Online to online: In some cases, offline data may be unavailable or unreliable, which may require filtering with online sources of truth directly.
Nearline to online: In other cases, it may be desirable to trigger migration logic on database changes or other nearline events instead of offline filtering.
Additionally, we plan to integrate existing quota systems into the data operation platform so that throttling of data operations can be controlled by the service receiving the requests, rather than by the client side (the data operation platform) that triggers them. This helps ensure that systems are robustly protected against excessive data operation traffic.
WomenConnect is a series of events focused on bringing together women in tech to build meaningful relationships. Over the past four years, we’ve shared countless stories, learned a ton of lessons, and made lifelong connections at the 16 WomenConnect events in four different cities.
On June 12, our NYC office in the Empire State Building played host to a vibrant and energetic collection of attendees at our event. The theme, “Balance for Better: Allyship,” focused on carrying out change, a natural progression from our previous themes of transformation and building communities. We invited speakers and leaders to set the tone for the night by sharing their personal experiences, advice, and takeaways on how all of us can effect change.
As the networking reception kicked off the event, it was exciting to see attendees from around the local NYC engineering and product community. Having been a part of the planning process of all three WomenConnect NYC events, I was most in awe of the repeat attendees from our previous events because it showcased how we’ve come a long way in building a community. The space we’ve created for people to be vulnerable and supportive of one another is really special.
I wanted to share some takeaways to try and capture what makes the event so unique.
Treat people beautifully
Relationships matter. Throughout the event, we touched on the importance of allyship, defined as a “lifelong process of building relationships based on trust, consistency, and accountability with marginalized individuals and/or groups of people.” To build meaningful relationships, it’s important to be self-aware and bring your full self to work. Our Head of Global Diversity, Inclusion, and Belonging Rosanna Durruthy shared that for her this entails being a woman in tech, a mother, a person of color, and a member of the LGBTQ+ community. To be an ally and advocate for diversity means we understand and honor how these different identity categories interrelate with one another.
“Treat people beautifully.” – Rosanna Durruthy
To be authentic is to be vulnerable
Authenticity is what allows us to connect with others. Part of showing and bringing your true self to work involves sharing your full self, including your vulnerabilities. Rosanna fully embraced and embodied this value of authenticity when asked about dealing with failure. She went on to share a personal story from early in her career when she didn’t get along with a more senior coworker—they had started off on the wrong foot, and this set the tone for the rest of the time she was on the team. Looking back, she realized her unwillingness to try to reset things was a misstep on her part. In not taking that first step to mend the relationship, she shared that she felt she had missed a lot of learning moments. Rosanna’s introspection and confidence to look back and pinpoint what could have been done better really set the tone in our table discussions to open up with one another.
In addition, among the dozens of LinkedIn executives who participated in the event, there were many male allies. They signaled strong support to our efforts and goals, and were there to learn, but also to offer valuable advice on growth and empowerment. One of them, Chris Pruett, VP of engineering, and a facilitator at one of the tables, encouraged the guests to be confident and not be afraid to ask for what they want.
The table discussions served as an additional way to meet people, including some of our engineering leadership.
Bringing it back to work
After the fireside chat, we dove into a Q&A session, during which attendees were able to line up behind the mics to ask Rosanna and marketing development global lead Ty Heath questions. This was a time for audience members to ask for advice and share their own stories. The questions were as tactical as how to negotiate for a raise or determine salary as a contractor, which led us all to realize our shared experiences and concerns in the workplace. Throughout the evening, we were reminded that we have a collective responsibility to bring back the takeaways learned from this evening to our day-to-day lives. How can we replicate this positive energy and feeling of community throughout our own workplaces?
This sense of making sure the event’s impact lasted beyond one night was the central theme of the table discussions portion of the night. As we drew inspiration from the confident and talented women on stage, it was then time for us to regroup in a more intimate group discussion setting and brainstorm how to bring the true change home to our workplaces and communities, one action at a time. First, we were asked to come up with one change we would like to see to increase diversity in the workplace. Then, we jumped into more actionable thinking—how could we help implement that change? One point that stood out to me was the importance of managers leading by example, especially when it comes to work-life balance.
We concluded with representatives from each table coming up on stage to share the main points and takeaways from their group discussions. The number of great ideas and actionable implementation steps was amazing to hear. By the end of the night, we left empowered with the awareness that we could effect change and that there was an entire community of allies behind us every step of the way. As Vice President of Engineering Erica Lockheimer shared, we can be change agents at any level, but we need to be intentional and have the awareness of what exactly needs to change.
Thank you to anyone who has ever attended a WomenConnect event. Each event has built on the previous ones to improve, and that is in huge part thanks to everyone’s willingness to open up and connect with one another. Thanks also to our June event’s emcees, Stephanie Killian and Abby Garcia, for their amazing work. Follow our hashtag, #WomenConnect, on LinkedIn to see what we’re up to.
“When you truly have empathy and feel it, the passion grows stronger for change.” – Erica Lockheimer
In this blog post, we’ll share how we migrated Espresso, LinkedIn’s distributed data store, to a new Netty4-based framework and achieved a large performance and capacity gain throughout the Espresso system as a result. In the larger scheme, this is particularly important since Espresso is a master data hub that serves many important applications across LinkedIn, and every improvement we implement has an impact on the user experience in one way or another.
First, we’ll provide a refresher on Espresso if you’re not familiar with it. Espresso is LinkedIn's scalable and elastic data-as-a-service infrastructure and, as mentioned, is a master data hub that serves many important applications across LinkedIn.
Here’s a 30,000-foot view of the overall Espresso ecosystem:
It’s important to note the Espresso operation can be divided into two parts:
Control Path: The Control Path maintains and controls the state transitions of the different components.
Data Path: The Data Path is how the data flows in the system. It includes three different operations, including online, nearline, and offline. The work we discuss in this post is mainly referencing the online operation, which services real-time client read/write and requires high availability and low latency.
Espresso is a horizontally scaling, multi-colo data service, an approach that has proved to be the most cost-effective and efficient for a large distributed system. However, even with commodity hardware, as the system grows larger and larger, the cost to build and maintain it is significant. Therefore, improving the performance and capacity of each node is imperative to driving cost savings, considering the scale of the system.
About Netty framework
Netty is a Java application framework for networking services that is widely used at LinkedIn and in the industry. Netty provides high efficiency with a non-blocking, asynchronous, event-driven framework and high flexibility with a chained handler pipeline structure.
Here is an overview of protocol stacks in Netty framework:
The bottom of the stack provides core functionalities like a zero-copy-capable byte buffer, an asynchronous event model, and universal APIs. The upper layers provide comprehensive services like compression, SSL support, and network protocols such as HTTP and WebSocket.
Netty migration on Espresso
The original Espresso data pipeline framework was developed around 2011 and the technology has evolved with new features and capabilities since then. However, most frameworks eventually run their course and we decided it was time to migrate the old system to a new Netty4 framework for a myriad of reasons:
To modernize the infrastructure, upgrade libraries, and enhance capabilities, security, and performance.
To support new features such as HTTP2, streaming, TLS1.3, and more.
To make fundamental changes to the Espresso online data path, including the new thread model and direct (off-heap) memory allocation with buffer pool.
When building the new framework, there were specific features we set out to implement: a new thread model for I/O threads, better memory management with direct buffer pooling, a streamlined asynchronous pipeline, and native epoll support for sockets. This section will expand on each of these.
A new thread model for I/O threads
We implemented a new thread model to avoid inter-thread locking between I/O threads. Thread-local variables are widely used to prevent contention between threads and improve CPU cache hit-rate. In the two following images, you can compare and contrast our old data flow with our Netty4 flow.
Direct buffer pool for better memory management
Instead of putting memory allocation load on JVM heap for transient memory usage, the direct buffer pool is now used for better memory management. The buffer pool directly allocates and manages the memory from the operating system, thus reducing the JVM heap GC pressure and related memory copy operation.
This diagram shows how the direct buffer pool is used to offload request/response data buffers from traditional JVM heap to the direct buffer pools.
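The reuse pattern behind pooling can be illustrated with a minimal free-list sketch. Espresso uses Netty's pooled direct (off-heap) buffers; this toy version is heap-based Python and only demonstrates why pooling relieves allocation and GC pressure:

```python
class BufferPool:
    """Reuse fixed-size buffers instead of allocating a fresh one per request."""

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.free = []  # released buffers available for reuse

    def acquire(self):
        # Reuse a released buffer when available; allocate only on a miss.
        return self.free.pop() if self.free else bytearray(self.buffer_size)

    def release(self, buf):
        # Caller must stop using `buf` after release, just as with
        # reference-counted Netty ByteBufs.
        self.free.append(buf)
```

In steady state, nearly every `acquire` is served from the free list, so the allocator (and in the JVM case, the garbage collector) sees almost no transient per-request allocations.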
Streamline the asynchronous handling pipeline
We made the online data pipeline asynchronous: only the Store layer still runs in synchronous mode, to fit JDBC requirements; the HTTP and Espresso layers are fully asynchronous.
In the asynchronous pipeline, executions are non-blocking: the next execution in the queue does not need to wait for the previous one to finish before starting. This greatly improves throughput and reduces the latency of the system.
Provide native epoll support for socket
In a typical Espresso cluster, there are thousands of TCP connections from routers to each storage node. We chose to use Netty’s native epoll approach, instead of the Java NIO epoll approach, because we found epoll to be more efficient for managing a large number of connections.
The advantages of the native epoll approach include: 1) it implements the edge-triggered model, which performs better than the level-triggered model of Java NIO epoll; and 2) the native epoll generates less garbage on the JVM heap, which reduces the GC pressure for the system.
Deployment and the result
Metrics for performance measurement
For a large system migration, it is important to compare the system performance before and after the migration. This generates evidence on what works and what does not, providing guidance for future work and references for other related projects.
In this project, the following metrics were used to measure the performance.
JVM GC: JVM OldGen GC and YoungGen GC are measured to show heap usage.
Latency: P99 and max latencies of client requests are measured to show the health of the service.
Capacity: For capacity and throughput, we measured RCU/WCU (see the definition in Capacity Improvement section) and QPS.
In the following section, we’ll look at some of our initial results in these categories.
Production cluster results
The production Espresso system is composed of multiple clusters based on the internal customer profiles. Each cluster is configured to best fit the specific needs of the customer. From the traffic access pattern’s point of view, we can categorize the clusters into read-heavy and write-heavy clusters.
For both read-heavy and write-heavy clusters, we saw large latency improvements after the migration.
For read-heavy clusters, this latency improvement is mainly due to the new thread model asynchronous pipeline improvement, and the native epoll support.
For write-heavy clusters, since writing is slower and more expensive compared to reading, memory consumption is generally much higher than in read-heavy clusters. This leads to the direct memory buffer pool taking more load in the write-heavy cluster. Thus, we see the higher JVM GC improvement in the write-heavy cluster. From a latency point of view, in addition to the improvements in the read-heavy cluster, the JVM GC reduction in write-heavy clusters further improves the overall latency.
Here we select two sample clusters, one write-heavy and one read-heavy, to show the improvement after migration.
Sample cluster 1 (write-heavy)
In this sample, the migration happened on Sept. 28, and we saw large improvements in JVM GC and latency: about a 100x reduction in OldGen GC, a 10x reduction in YoungGen GC, and a 60% reduction in latency.
Espresso Storage Node - OldGen GC - Memory Usage
Espresso Storage Node - YoungGen GC - Collection Time
Espresso Storage Node - Total Latency
Sample cluster 2 (read-heavy)
In this sample, the migration happened on Oct. 18, as shown in the graph. We saw large improvements in latency, with a 30% reduction on P99 and max latency after migration.
Espresso Storage Node - 99th Percentile Latency
Espresso Storage Node - Total Latency
Espresso measures the cost of a request based on how many bytes are processed in the storage node. Therefore, we measure RCU/WCU, in addition to QPS (queries per second), for capacity.
RCU: Read Capacity Unit – Up to 8K bytes read is counted as 1 read capacity unit (RCU)
WCU: Write Capacity Unit – Up to 1K bytes written is counted as 1 write capacity unit (WCU)
The cost of 1 RCU is approximately 0.6 of 1 WCU, depending on the hardware.
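From the definitions above, the unit cost of a request follows directly. One detail is assumed here: that any request, however small, is billed at least one full unit (the `max(1, …)` floor):

```python
import math

RCU_BYTES = 8 * 1024   # up to 8K bytes read    = 1 RCU
WCU_BYTES = 1024       # up to 1K bytes written = 1 WCU

def rcus(read_bytes):
    """Read capacity units consumed by a single read of `read_bytes`."""
    return max(1, math.ceil(read_bytes / RCU_BYTES))

def wcus(write_bytes):
    """Write capacity units consumed by a single write of `write_bytes`."""
    return max(1, math.ceil(write_bytes / WCU_BYTES))
```

Note that a 100-byte read and a 7KB read both cost 1 RCU, which is why aggregating many tiny reads into one larger request (discussed under the "sweet spot" below) saves so much capacity.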
To measure the capacity improvement, we ran a series of tests to compare the difference between the old system (Netty3) and the new system (Netty4).
The capacity of RCU/WCU is defined as the maximum throughput of RCU/WCU that can be achieved within a specific SLA on a storage node.
We used the following SLA metrics in tests:
Latency: P50 → 10ms, P99 → 30ms
GC pressure: reasonable GC, no anomaly
Capacity with different data sizes
When reviewing the results of RCU/WCU capabilities with different data sizes, we saw:
About 100% RCU improvement across small to large data sizes for read operations
About 60-100% WCU improvement on different data sizes for write operations
Capacity with different QPS levels
We also measured against a fixed data size, looking at the performance differences under different traffic loads. For this scenario, we examined a fixed 4KB data size and looked at the performance of JVM GC and the latencies under different QPS traffic loads.
For GC, we saw large improvements across all QPS levels—from 500 to 20K QPS.
For P99 latency, we saw significant improvements after the QPS reached 8K and higher.
For max latency, we saw a similar improvement threshold, with latency improvement returns significantly increasing when QPS reaches 2K and higher.
This is no small feat, as capacity improvement directly reduces the serving cost of the system.
Finding the “sweet spot”
The “sweet spot” is the data size that achieves the best RCU/WCU throughput. In other words, we are striving to find the most cost-effective way to run the system.
As we can see from the following diagram, the tradeoff is between being CPU-bound (small data sizes) and memory-bound (large data sizes). For our current Espresso system, the “sweet spot” of data size is around 40 KB.
To optimize resource usage, it is in the best interest of both Espresso users and the Espresso team to have the request data size be close to the “sweet spot.” For example, the read request with 100B data size costs about the same as the GET request with 7KB data size. Therefore, aggregating multiple reads of small data sizes into a larger data size of around 40KB would save considerable resources.
Along this journey, we learned a lot from our successes, but equally from the challenges we encountered. We felt it was important to share those lessons.
CPU affinity, or…not
CPU affinity typically plays an important part in performance for SMP (Symmetric Multi-Processor) Architecture, which is common in today’s commodity computer hardware.
CPU caches are 5-100 times faster than memory access, so an improved CPU cache hit-rate can bring a big performance gain.
Espresso is a heavily loaded, multi-threaded JVM application, and we felt that if we could improve the CPU cache hit-rate by applying CPU affinity, it would be a big win. We spent time in this area with the available tools and libraries, but found it difficult to achieve CPU affinity and cache-line alignment within a Java system. For cache-line alignment, there are workarounds like padding and the @Contended annotation, but they are hard to use and only address part of the problem.
Eventually, we abandoned this effort.
Memory leak detection
A memory leak can be a disaster for mission-critical systems like Espresso. By offloading memory from the JVM heap to the buffer pool, we improved the memory footprint and system performance. On the other hand, this also requires due diligence in managing the memory buffers with allocate/free and reference counting, just as a native-language programmer does.
There are two approaches we used to effectively detect memory leaks in the system:
Built-in Netty framework tool to detect memory leaks at the development stage.
Turn on JVM option '-Dio.netty.leakDetectionLevel=advanced'
For stress testing and production, we developed utilities to monitor the buffer pool stats. Here is a sample of the buffer pool stats:
ALL Arenas Stats:
All Active: 27348 alloc: 8693759 dealloc: 8666411
Normal: 5201 alloc: 7976932 dealloc: 7971731
Small: 21903 alloc: 465891 dealloc: 443988
Tiny: 244 alloc: 250936 dealloc: 250692
Huge: 0 alloc: 0 dealloc: 0
In the above memory allocation stats, it shows the number of total allocations and deallocations and the active (in use) buffers for each buffer pool. By monitoring the number of active buffers over time, we can detect if there is any memory leak issue in the system.
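The monitoring described above reduces to a simple invariant: per arena, active = alloc − dealloc, and a leak shows up as the active count growing without bound over successive samples. A sketch of the check (the stat names follow the sample output above; the growth heuristic is our own illustrative choice, not the actual utility):

```python
def active_buffers(stats):
    """Active (in-use) buffers; should equal alloc - dealloc for an arena."""
    return stats["alloc"] - stats["dealloc"]

def looks_leaky(active_samples, window=3):
    """Flag a pool whose active-buffer count grew in every recent sample.

    A healthy pool's active count oscillates; monotonic growth across the
    window suggests buffers are being allocated but never released.
    """
    recent = active_samples[-window:]
    return len(recent) == window and all(
        later > earlier for earlier, later in zip(recent, recent[1:])
    )
```

In practice this kind of check would run periodically against the pool stats and alert when a pool trends upward, complementing Netty's built-in leak detector used during development.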
For complex system changes like the Espresso Netty4 migration, we found new challenges in testing and deployment. On testing, we identified a need for better stress- and performance-testing tools, in addition to better testing/canary support in a production-grade setup. For deployment, we found that existing tools are geared toward stateless services, and that config changes and validation are difficult.
HTTP2 for Espresso
HTTP2 is supported over the Netty4 framework. Implementing HTTP2 for Espresso would bring the following benefits:
Efficient binary protocol to reduce data transport overhead.
Resolve the router connection pool scalability issue.
Provide a foundation for end-to-end streaming on Espresso.
To expand on that last point, enabling end-to-end data streaming for Espresso would allow multi-reads to be handled asynchronously, decreasing latency. The current multi-read latency without streaming is at least 3-5 times greater than that of a single read, with the additional time and memory spent waiting on the slowest response. Also, with streaming, responses with a large amount of content can be divided into pieces and sent as a stream, which reduces memory GC pressure because we no longer need to hold large amounts of data in memory. This would allow response size limits to be removed in Espresso.
Migrating an existing, large distributed system to new technologies, while keeping the system up and running within committed SLA requirements, is non-trivial. By completing this project, we modernized the foundation of Espresso with significant performance and capacity improvements, paving the way for future development.
LinkedIn’s feed stands at the center of building global professional knowledge-sharing communities for our members. Members talk about their career stories, job openings, and ideas in a variety of formats, including links, video, text, images, documents, and long-form articles. Members participate in conversations in two distinct roles: as content creators who share posts, and as feed viewers who read those posts and respond to them through reactions, comments, or reshares. By helping members actively participate in those professional conversations, we are fulfilling LinkedIn’s mission to connect the world’s professionals to make them more productive and successful.
This post focuses on one important aspect of machine learning at LinkedIn’s feed: candidate selection. Before the final, fine-grained feed ranking, a personalized candidate generation algorithm is applied to tens of thousands of feed updates to select a diverse and unique candidate pool. We describe the machine learning models applied in candidate generation and the infrastructure capabilities that support accurate and agile model iteration.
Overview of Feed at LinkedIn
At the heart of the feed sits a machine learning algorithm that works to identify the best conversations for our members. In a fraction of a second, the algorithm scores tens of thousands of posts and ranks the most relevant at the top of the feed. In order to operate at this scale and speed, LinkedIn’s feed has a two-pass architecture. The first pass rankers (FPR) create a preliminary candidate selection from their inventories based on predicted relevance to the feed viewer. Examples include updates from your network, job recommendations, and sponsored updates. A second pass ranker (SPR) then combines and scores the output from all first pass rankers. The SPR creates a single, personalized ranked list. FollowFeed is the dominant FPR that serves feed updates from your network. More than 80% of feed updates come from FollowFeed, and those updates contribute to more than 95% of members’ conversations. Through these conversations, active communities are formed and strengthened.
Two-pass ranking architecture for LinkedIn’s homepage feed
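The two-pass flow described above can be sketched as follows; the scoring functions and candidate data are invented for illustration, not the production models:

```python
import heapq

def first_pass(inventory, fpr_score, k):
    # Each first pass ranker keeps only its k most relevant candidates.
    return heapq.nlargest(k, inventory, key=fpr_score)

def second_pass(candidate_pools, spr_score):
    # The second pass ranker merges all FPR outputs and re-scores them
    # into a single personalized ranked list.
    merged = [c for pool in candidate_pools for c in pool]
    return sorted(merged, key=spr_score, reverse=True)

# Invented example: a network-updates FPR and a job-recommendations FPR.
network = [("post-a", 0.9), ("post-b", 0.2), ("post-c", 0.7)]
jobs = [("job-a", 0.8), ("job-b", 0.1)]
score = lambda candidate: candidate[1]

feed = second_pass([first_pass(network, score, 2),
                    first_pass(jobs, score, 1)], score)
```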
At LinkedIn’s scale, the main technical challenge is finding the right balance between infrastructure impact and multi-objective optimization with comprehensive machine learning algorithms. These objectives include members’ likelihood to view updates, their likelihood to participate in conversations, and providing timely reactions to content creators. We use hundreds of machine learning features to compute these objectives, and we want to optimize them continuously and accurately while satisfying low-latency requirements and keeping our infrastructure footprint in check.
Our teams tackled this problem through a joint project amongst the Feed Artificial Intelligence (AI), Feed Infrastructure, and Ranking Infrastructure teams. As we ramp this project globally, we would like to share more details about our technical solutions.
We refreshed the machine learning software stack in FollowFeed, leveraging the latest productive machine learning technologies. Through a ranking engine upgrade and model deployment technology, we enabled frequent and agile updating of machine learning models. We also added accurate tracking of machine learning features in FollowFeed, which helps us guarantee data and model consistency across training and serving. Moreover, we developed tools to inspect machine learning algorithm complexity at serving time.
With minimal additional complexity, we have rebuilt our machine learning model for candidate selection from scratch with new objectives and different algorithms. As part of this, we introduced the prediction of professional conversation contribution into our model to capture the community-building aspect of each feed update. Instead of multiple logistic regressions with manual interactions, we’ve used a single XGBoost tree ensemble to trim down the complexity. Additionally, we’ve considered timely feedback to content creators in our model and made sure all members have a chance to feel heard. All of this was done with minimal additional infrastructure capacity.
In summary, this project builds the engineering capabilities for us to iterate comprehensive machine learning models at the candidate generation stage, and we’ve leveraged these capabilities to deliver performant, multi-objective optimization models. At LinkedIn’s scale, these solutions will help bring economic opportunity to the global workforce through more relevant conversations with their professional communities.
Performant AI infrastructure with agility
FollowFeed, LinkedIn’s powerful feed indexing and serving system, has now been equipped with advanced machine learning capabilities. The initial design had accommodated the ranking needs for feed, but the field of machine learning has advanced tremendously in the past five years since the original FollowFeed design. During this project, we boosted the agility and productivity of machine learning in FollowFeed by adopting the latest machine learning inference and model deployment technologies. Such infrastructure enhancements enable the modeling capability described later in this blog post.
We have updated FollowFeed’s ranking engine to Quasar. Quasar, as part of LinkedIn’s Pro-ML technology, transforms machine learning features and runs model inference at query time. As a high-performance, multi-threaded ranking engine, Quasar optimizes not only for infrastructure efficiency but also for machine learning productivity. These productivity improvements have enabled:
Cross-system leverage: We can easily port the latest machine learning models and transformers from the second pass layer to FollowFeed.
Training and serving consistency: The same codebase is used to represent the model at offline training time and at serving time.
To reflect the rapid evolution of LinkedIn’s content ecosystem, machine learning models have to be updated constantly. We’ve built FollowFeed’s model deployment on top of LinkedIn’s Continuous Integration and Deployment (CICD) stack. Being a stateful system that indexes members’ past activities, FollowFeed presents a unique challenge in model deployment: to maintain high reliability and performance of the index nodes where ranking takes place, we have to avoid calling external services. To work within these limits, we previously coupled ranking models with code, which tied service deployment to model coefficient changes. To allow for easier model evolution, our solution is now a data pack-based deployment model, where we package the machine learning models in a separate code base, a.k.a. a multi-product. These models are treated as a “data pack”: a deployable package consisting only of static files to be dropped into a specific location on production machines. With this design, model deployment can be easily managed by LinkedIn’s CICD system. Consequently, we’ve improved model deployment velocity from 3 days to 30 minutes.
In addition to ranking and model deployment, we designed and implemented advanced feature access and tracking in FollowFeed. As it scores thousands of documents per session, FollowFeed optimizes access to machine learning features needed by scoring models. Viewer features are passed down as part of the request without requiring an external call to be made. Features for feed updates are ingested, stored, and updated alongside these updates so that they are accessible locally on the index nodes for scoring. Given the well-known data inconsistency challenges between offline training and online scoring, we also added accurate tracking of machine learning features in FollowFeed. This guarantees data consistency between offline training data and online inference data. Aggregating these tracking events across the distributed index nodes presents a challenge. Even though results from the index nodes are aggregated in the broker layer, we do not want to gather the tracking data synchronously due to scalability and serving latency concerns. Our design overcomes these concerns by having individual index nodes and the broker node stream tracking events asynchronously to Kafka streams. A Samza job joins these Kafka streams and emits a comprehensive tracking event for the request.
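The asynchronous aggregation step can be sketched as follows. This is a simplified stand-in for the real Kafka/Samza pipeline; the event shape and node names are invented:

```python
from collections import defaultdict

def join_tracking_events(events, expected_nodes):
    """Group partial tracking events by request ID; emit one comprehensive
    event once every expected node (broker plus index nodes) has reported."""
    pending = defaultdict(dict)
    complete = []
    for event in events:
        parts = pending[event["request_id"]]
        parts[event["node"]] = event["features"]
        if set(parts) == expected_nodes:
            complete.append({"request_id": event["request_id"],
                             "parts": dict(parts)})
    return complete
```

In production the partial events arrive on Kafka streams and the join runs continuously in a Samza job rather than over an in-memory list.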
Equipped with advanced machine learning capabilities, it is much easier to develop performant machine learning models for FollowFeed with accurate features. Such agility will enable better candidate feed updates to be surfaced on LinkedIn’s homepage. Through actively updating the machine learning model, we will be able to give more power to existing and newly-minted content creators in LinkedIn’s ecosystem. We will also facilitate the professional community builders’ curation of their audience.
Optimizing for conversations at candidate generation
We’ve rebuilt the machine learning model at FollowFeed to optimize multiple objectives under given infrastructure constraints. The existing candidate selection is conducted through a logistic regression (LR) model that predicts the click probability. To facilitate professional conversations in LinkedIn’s feed, we have introduced “contribution” as an additional objective in the candidate selection model. Probability of contribution captures members’ intent to share, comment, or react to a particular feed update. The model also takes into account timely feedback to content creators, which is a clear signal for cultivating and retaining audience builders on LinkedIn.
To achieve these goals, our design evaluates candidate selection through recall instead of precision, combines multiple objective functions in a gradient boosting tree model, and trims down features and manual transformations.
Given that there is a comprehensive model at the second pass layer, with more sophisticated computation per update, the job of candidate selection should be focused on generating a rich set of candidates. Clarifying this goal helps us evaluate our model much more effectively. Instead of precision, we use recall as an evaluation metric, because it measures the overlap between the top K candidates generated by FollowFeed and by the second pass ranker; K is very large for LinkedIn’s feed. To overcome the selection bias introduced by FollowFeed, we randomize its output for a small percentage of traffic so that the second pass layer can be exposed to a representative range of candidates. This technique helps us approximate what the second pass layer’s ranking would be if the candidates were not pre-ranked by the first pass layer. Through various efforts outlined below, we have effectively doubled the recall percentage over the course of the project.
Examples of recall calculation
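As a sketch, the recall metric described above can be computed like this (the update IDs in the usage are illustrative):

```python
def recall_at_k(candidates, spr_top_k):
    """Fraction of the second pass ranker's top-K updates that the
    candidate generator actually surfaced."""
    selected = set(candidates)
    hits = sum(1 for update in spr_top_k if update in selected)
    return hits / len(spr_top_k)

# FollowFeed surfaced a, b, c; the SPR's ideal top-K was a, c, d, e.
score = recall_at_k(["a", "b", "c"], ["a", "c", "d", "e"])
```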
We use a logistic regression model with differential weights to incorporate additional contribution objectives in a single objective model. It helps reduce the parameter space and model complexity by half while effectively reproducing the effect of a multi-objective model. Through a suite of tools to conduct CPU/network benchmarks, JVM model profiling, and feature importance analysis, we’ve simplified the model by replacing manual feature transformations with the gradient-boosted tree and trimming down excessive features. We utilized the same differential weighting technique to train the single tree ensemble. As gradient-boosted trees are powerful models themselves, we explored using a tree model alone, without logistic regression as the final layer, to predict the re-weighted objective. The list below shows various techniques we’ve tried, with different trade-offs in terms of infrastructure cost and online results.
Baseline: The production model for FollowFeed for the past several years has been a logistic regression using manually-transformed features that predict the probability of clicks.
Tree + transformations + logistic regression: We added the contribution objective to our model. In addition to the existing transformed features, we also added XGBoost tree scorers as interaction features to the contribution-weighted-click logistic regression. This version performed much worse in terms of CPU utilization and 99th percentile latency, but online experiments showed strong lift in member engagement.
Tree + logistic regression: We removed manually-transformed features from the above implementation to reduce model complexity. Its infrastructure costs are still worse than baseline, but better than the previous implementation. We can keep similar engagement improvements.
Tree only: We use the XGBoost tree scorer to calculate the contribution-weighted click probability and remove the logistic regression component. We keep the majority of the engagement improvements. This version has minimal additional CPU overhead and actually reduced tail ranking latency compared to the baseline. We’ve decided to ramp the tree-only model to all members.
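The differential-weighting idea shared by these variants can be sketched as follows; the weight values are invented for illustration, not LinkedIn’s actual coefficients:

```python
def example_weight(clicked, contributed,
                   click_weight=1.0, contribution_weight=5.0):
    """Weight a training example by the strongest action taken on it, so a
    single model trained on one weighted objective can approximate a
    multi-objective blend of clicks and contributions."""
    if contributed:          # shared, commented, or reacted
        return contribution_weight
    if clicked:
        return click_weight
    return 1.0               # negative examples keep unit weight
```

The same weighted objective can then be fed to either a logistic regression or a tree ensemble, which is how a single model halves the parameter space relative to training one model per objective.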
We’ve also adjusted the candidate selection results based on the freshness of the updates. This came from observing a secondary creator-side effect while ramping an earlier version of the candidate selection algorithm. While viewers are less sensitive to freshness, content creators do care about receiving prompt responses. By boosting fresher updates, viewers can give creators feedback earlier and stay in active conversations.
The project’s models are already rolling out across LinkedIn. The feed updates are scored and ranked in Quasar. The machine learning models have been deployed through LinkedIn’s CICD system using FollowFeed’s data pack. Over the past few years, we’ve tried about 100 variations of the machine learning models and picked the best performing one through online A/B testing. We have seen a lot more members on LinkedIn's feed participating in professional conversations due to better candidate selection. We’ve also observed more reactions given to a broader range of creators so that they have additional ways to engage with content on the platform. We are looking forward to ramping this to all our members soon. In the meantime, we are continuing to innovate in our machine learning infrastructure and modeling technologies to bring the best conversations to members’ homepage feed.
At LinkedIn, our mission is to connect the world’s professionals to make them more productive and successful. Part of the way we achieve this goal is by providing a platform where communities connected by common interests or shared experiences can form. Several features—for instance, Groups—help foster the growth of these communities. However, an integral tool for connecting members to each of their various communities is the LinkedIn feed. The feed provides a place for our members to discover and join conversations happening among their connections or within their groups.
As we’ve discussed previously on this blog, artificial intelligence (AI) plays an important role in sifting through the myriad updates and posts available to serve to a member when they view their feed. These efforts are managed by the Feed AI team within LinkedIn. However, there are many different parameters to optimize for within the larger umbrella of feed relevance. To assist these efforts, we have a dedicated Communities AI team focused on using AI to serve members relevant content that sparks interest and conversation within the communities they’re part of.
In this post, we give an overview of the techniques that the Communities AI team uses to help form communities and conversations around common areas of interest. These techniques include: follow recommendations, topic-based feeds, hashtag suggestions, and typeaheads.
Fostering active communities at LinkedIn can be broken down into the following components:
Discover: Help members find new entities to follow that will connect them with communities that share their interests.
Engage: Engage members in the conversations taking place in their communities by serving content from their areas of interest.
Contribute: Help members effectively engage with the right communities when they create or share content.
These three aspects are all part of the same ecosystem and our goal is to build an AI platform that closes the loop between Discover, Engage, and Contribute.
There are many ways we can determine a member’s interests. However, we focus here on the explicit signal the member provides by following different entities (e.g., following companies, other members, influencers, hashtags, or joining groups). The vast majority of the follow connections that happen on LinkedIn are driven by the Follow Recommendations product (see Figure 1) that is developed by the Communities AI team.
Figure 1: Example follow recommendations for a member. This example shows hashtag and company recommendations.
The goal of the Follow Recommendations product is to present the member with follow recommendations that the member finds both relevant (i.e., increase the probability the member will follow the recommended entity) and engaging (i.e., the recommended entity produces content that the member finds relevant). Estimating relevance and engagement is where the bulk of the machine learning work happens. To compute these estimates, we rely on information (features) from the viewer (member), the entity to be followed (e.g., influencer, company, hashtag, group), and the interaction between the viewer and the entity (e.g., the number of times the member viewed the feed of a specific hashtag).
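As a schematic sketch of this scoring (the blending function, weights, and numbers are invented for illustration, not the production model), each candidate entity can be scored by combining the two estimates:

```python
def follow_score(p_follow, p_engage, alpha=0.5):
    # Blend relevance (will the member follow this entity?) with
    # engagement (will the entity's content interest them afterwards?).
    return alpha * p_follow + (1 - alpha) * p_engage

# entity -> (predicted follow probability, predicted engagement)
recommendations = {"#ai": (0.6, 0.8), "Acme Corp": (0.7, 0.3)}
ranked = sorted(recommendations,
                key=lambda e: follow_score(*recommendations[e]),
                reverse=True)
```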
There are over 630 million members on LinkedIn. This presents a scaling challenge and a relevance challenge. The Follows Relevance data flow processes hundreds of terabytes of data and is the second largest at LinkedIn after the People You May Know flow. To understand how we managed this explosion of data, we refer the reader to the article Managing "Exploding" Big Data.
As soon as a member follows an entity, the content generated from that entity starts flowing to the member’s main feed and is ranked in conjunction with other content (e.g., posts from 1st degree connections).
The member can also go to specialized feeds for each entity that they follow, be it a hashtag, group, event, etc. We personalize these feeds by ranking more relevant content higher. For example, a post in the #AI feed that is posted by an influencer the member follows is more likely to be relevant than a post that is generated by another member.
The goal of feed ranking at LinkedIn is to help members discover the most relevant conversations that will aid them in becoming more productive and successful. Relevance is determined by our objective function which optimizes for three main components: The value to the member, the value to the member’s network (downstream effects), and value to the creator of the post. A diverse set of machine learning and experimentation techniques are used to estimate these three components and the combined effect of the three (e.g., see Spreading the Love in the LinkedIn Feed with Creator-Side Optimization).
Our Hashtag Suggestions and Typeahead (HST) product recommends hashtags that allow the member to effectively target their posts to the right communities. In addition to reducing the friction the member faces when trying to add hashtags to their post, the HST product allows us to consolidate content around areas of interests and prevent content fragmentation.
The objective of HST is to both increase the probability that a member will select relevant hashtags from the recommended list to add to their post and increase the relevant feedback the member will get on their post. Here we use a variety of natural language processing (NLP), deep learning, word embedding, and supervised learning techniques to recommend relevant hashtags. The HST product is shown in Figure 2 below.
Figure 2: A demonstration of the HST product in the LinkedIn Share Box. HST suggests hashtags while the member types their post.
Interactions among Discover, Engage, and Contribute
A member’s journey does not necessarily have to happen in the order given above. In addition, a member’s behavior in one aspect can be a valuable signal in another aspect. For example, if a member tags a post with a specific hashtag that the member does not follow, then the Follow Recommendations product can use this signal to determine the relevance of this hashtag to the member and recommend the hashtag to be followed.
The Communities AI’s vision is to use Artificial Intelligence (AI) to serve relevant content to members based on what they are interested in and help members engage with each other. We’ve explained the three aspects of our communities ecosystem (Discover, Engage, and Contribute). In addition, we’ve highlighted some of the technical challenges in each aspect.
There are more details that we intentionally skipped because they are out of the scope of this article. Stay tuned because we’ll likely discuss these details in future articles!
If Apache Kafka is the lifeblood of all nearline processing at LinkedIn, then Apache Samza is the beating heart pumping that blood around. Samza at LinkedIn is provided as a managed stream processing service where applications bring their logic (leveraging the wide variety of Samza APIs), while the service handles the hosting, managing, and operations of the applications. Applications run on managed clusters, equipped with a cluster manager such as YARN, Kubernetes, or Mesos.
The number one priority for such a managed service is stability, while providing the usual desirables (new features, upgrades, improved guarantees, among others). The go-to way of ensuring this is rigorous and comprehensive testing. However, comprehensive testing at a cluster (or higher) scale—be it for failure testing, testing for rolling out upgrades, or feature development—is often cumbersome and time-consuming.
What if there were a way to perform cluster-scale testing from the comfort and confines of your developer machine? At LinkedIn, we use Linux Containers (LXC) to emulate a realistic YARN cluster that we use for testing new features, automated failure testing for Samza, YARN upgrades, and YARN-Samza interactions.
Docker, LXC, and the art of OS-level virtualization
Similar to Docker, LXC is an OS-level virtualization solution that uses Linux namespaces and cgroups to create multiple isolated Linux virtual execution environments sharing the same kernel, or “chroot on steroids.” In fact, early versions of Docker used LXC. OS-level virtualization incurs lower virtualization overhead, which lets it offer higher density than full- or para-virtualization approaches.
This means one can create multiple Docker or LXC instances on a machine, have them participate in a YARN-managed cluster, and run and test applications (like Samza) over this setup. YARN simply sees the Docker or LXC instances as “machines.” All multi-machine code paths get exercised and tested, while the low virtualization overhead allows for great density. We’ve been able to create YARN clusters of 10 to 100 “machines” on a single commodity physical machine.
That said, Docker encourages running only a single process per Docker instance, although more recently this has been framed as a best practice rather than a strict limit. For now, LXC is our choice of virtualization solution, but we will explore Docker soon.
Setting things up
This entire setup has been automated and is available here, and works in conjunction with samza-hello-world. The figure below provides an overview of our setup that uses LXC to emulate a YARN cluster on a given host (called base-host).
Networking
We set up a private subnet for the LXC-based virtual private cluster. In this setup, the base host has a virtual network interface (using Linux’s libvirt) and a private IP (e.g., 192.168.9.1). Similarly, each LXC instance uses a virtual network interface with a similar private IP, all connected to a virtual subnet with the base host acting as the gateway. In addition, all LXC instances are source-NATed, which allows them to talk to the internet; this is particularly useful for installing or upgrading packages within an LXC instance.
YARN and deployment
Each LXC instance runs a YARN NodeManager (NM), which is networked to a YARN ResourceManager (RM) running on the base host, along with Apache Kafka and a ZooKeeper instance. In addition, we use a shared directory on the base host that is mounted within each LXC instance. This shared directory serves as a virtual repository to distribute application binaries to the LXC instances using Samza’s package.path config.
The end result is a YARN cluster with LXC-instances acting as hosts, sized to over-subscribe and hence statistically multiplex the base host’s resources (e.g., a cluster of 50 LXC instances of 8 GB each on a base host with 64 GB of RAM).
Making the network realistic
Apart from the setup, we also need to emulate the network to ensure that our testing reflects the characteristics of a real network, such as bandwidth limits, network delay, packet loss, etc. Given the private-subnet based networking for the LXC instances (above), we use Dummynet to set up bandwidth limits between LXC instances, and to add network delays and packet loss that represent typical inter-data-center links. For example, using Dummynet’s ipfw front end (the exact command depends on your setup):

ipfw pipe 1 config bw 100Mbit/s delay 10ms plr 0.01
This adds a bandwidth limit of 100 Mbps with a 10 ms delay and 1% randomized packet loss on “pipe 1,” which connects LXC-instance-0 to the gateway. Similarly, queuing delays can be added by specifying the queue size for “pipe 1.” This article from the Association for Computing Machinery describes Dummynet in greater detail.
Putting it to use
There are a number of use cases for which we’ve found our setup useful, because it avoids the overhead of using a real cluster (e.g., setup, coordination, etc.). Our use cases mostly fall into three types: validation, failure testing, and development.
Our most common use case is validating YARN semantics and performance before adopting an upgraded version. This is especially important when running Samza as a managed service to ensure that features like Samza’s host affinity have not regressed. This setup allows us to carry out controlled runs of Samza jobs and validate different host-affinity related metrics.
Another class of use cases is failure testing. Testing Samza’s behavior under machine failure is much easier because a failure can be triggered by simply issuing a command to stop an LXC instance. Container failures are tested similarly, while network partitions are tested seamlessly by setting the bandwidth of an LXC instance to 0 (using Dummynet as described above).
Lastly, this setup also improves the developer experience when testing features that require multi-machine interactions, such as Samza’s standby containers, host-affinity, startpoints, and partition expansion. We’re working on extending this approach to test and validate other distributed systems which bear similar requirements.
With distributed systems being increasingly offered as “managed services,” conducting realistic, rigorous, yet easy testing continues to be a problem today. Hopefully, our approach finds a use in the distributed systems that you build and manage.
Want to work on similar problems in large-scale distributed systems? The Streams Infrastructure team at LinkedIn is hiring engineers at all levels! To learn more, check out all of our open roles on LinkedIn.
Pinot is an open source, scalable distributed OLAP data store that entered the Apache Incubation recently. Developed at LinkedIn, it works across a wide variety of production use cases to deliver real-time, low latency analytics.
One of the biggest challenges in Pinot is achieving and maintaining tight SLA on latency and throughput on large data sets. Existing indexing techniques, such as sorted index and inverted index, help accelerate the document search to improve query latencies. However, their performance scales linearly with the number of documents to be processed in computing the results. On the other hand, pre-aggregating the results can reduce the number of documents to be processed, thus enforcing a constant upper bound on query latencies. This can lead to limited query support (by limiting the dimension combinations) or storage space explosion (when multiple dimension combinations are considered).
For aggregation and group-by queries, we introduced the Star-Tree index to utilize pre-aggregated documents in a smart way that achieves low query latencies while using storage space efficiently.
Consider the following data set as an example to discuss the existing approaches (the data set has been de-duped and pre-sorted by country):
Sorted index
In this approach, data is sorted on a primary key (Country in the example), which is likely to appear as a filter in most queries in the set.
Below is the sorted index for column Country:
Consider the query: SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’
With a sorted index on the column Country, the time complexity of searching the documents for the given primary-key value ‘USA’ is reduced from a linear scan Θ(n) to a constant-time value lookup Θ(1). Additionally, sorting provides spatial locality for documents, which significantly reduces disk fetches when fetching the documents and thus improves latency.
While this is a good improvement over linear scanning, there are still a few issues with this approach:
Sorting can only be applied to the primary key, which means only queries filtering on that one column can benefit from the sorted index.
While search time is reduced from Θ(n) to Θ(1), aggregation cost is still a function of the total number of documents (in this example, 3) to be processed to answer the query.
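For illustration, the sorted-index lookup can be sketched as follows, with an invented data set arranged to match the running example (three USA documents summing to 1200 impressions). Because the column is sorted, each value maps to one contiguous document range:

```python
# Sorted column: each value occupies one contiguous document range.
countries = ["CA", "CA", "MX", "MX", "USA", "USA", "USA"]
impressions = [400, 600, 100, 300, 600, 300, 300]

# Build value -> (start, end) document ranges in one pass.
ranges = {}
for doc_id, value in enumerate(countries):
    start, _ = ranges.get(value, (doc_id, doc_id))
    ranges[value] = (start, doc_id)

# SELECT SUM(Impressions) FROM Table WHERE Country = 'USA'
start, end = ranges["USA"]               # constant-time lookup
total = sum(impressions[start:end + 1])  # still scans all 3 matching docs
```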
Inverted index
In this approach, for each value of a given column, we maintain a list of document IDs where this value appears.
Below are the inverted indexes for columns Browser and Locale for our example data set:
Consider the query: SELECT SUM(Impressions) FROM Table WHERE Browser = ‘Firefox’
With inverted index on column Browser, similar to the sorted index, the document search becomes a simple value lookup for the key ‘Firefox’ to directly get the matching documents of [1, 5, 6].
Using an inverted index brings the document search time down to constant time Θ(1) for arbitrary columns. However, it cannot leverage spatial locality, and, similar to the sorted index, the aggregation cost is still a function of the query’s selectivity (in the example, 3).
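A minimal sketch of the inverted-index lookup, with invented data arranged so that ‘Firefox’ appears in documents [1, 5, 6] as in the example:

```python
from collections import defaultdict

browsers = ["Chrome", "Firefox", "Safari", "Chrome", "Chrome",
            "Firefox", "Firefox"]
impressions = [400, 600, 100, 300, 600, 300, 300]

# value -> list of document IDs where it appears
inverted = defaultdict(list)
for doc_id, value in enumerate(browsers):
    inverted[value].append(doc_id)

# SELECT SUM(Impressions) FROM Table WHERE Browser = 'Firefox'
matching = inverted["Firefox"]     # direct lookup, no scan
total = sum(impressions[doc_id] for doc_id in matching)
```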
Pre-aggregation
In this technique, we pre-compute the answer for a given query set up front.
In the example below, we have pre-aggregated the total Impressions for each Country:
Consider the query: SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’
With pre-aggregation, the query can be solved by just a value lookup, and we can directly get the final result of 1200 without extra aggregation cost.
However, with the pre-aggregation in the example, we can only solve queries with a predicate on Country. Answering queries with multiple predicates implies pre-aggregating over various combinations of dimensions, which makes storage space grow exponentially with the number of dimensions (consider the query: SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’ AND Browser = ‘Firefox’ AND Locale = ‘en’).
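A sketch of the trade-off, with invented per-country totals: the query becomes a single lookup with no per-document aggregation, but every additional predicate combination would need its own pre-aggregated table:

```python
# Pre-computed SUM(Impressions) per Country.
pre_aggregated = {"CA": 1000, "MX": 400, "USA": 1200}

# SELECT SUM(Impressions) FROM Table WHERE Country = 'USA'
total = pre_aggregated["USA"]   # single lookup, no per-document work

# Supporting a second predicate (e.g., Browser) would need another table
# keyed by (Country, Browser), and so on for every dimension combination,
# which is the exponential storage blow-up described above.
```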
The graph below shows the performance gain and space cost for the different techniques. On the left side, we have indexing techniques that improve search time with limited increase in space, but do not guarantee a hard upper bound on query latencies because of the aggregation cost. On the right side, we have pre-aggregation techniques that offer hard upper bound on query latencies, but suffer from exponential explosion of storage space.
We propose the Star-Tree data structure inspired by the star-cubing paper (Xin, Han, Li, & Wah, 2003) that offers a configurable trade-off between space and latency and allows us to achieve a hard upper bound for query latencies for a given use case.
In the following sections, we will first define the Star-Tree data structure, followed by a Star-Tree example on the sample data set. Then, we will discuss how Star-Tree is utilized within Pinot to achieve low latencies with high throughput.
Tree structure
Star-Tree is a tree data structure that has the following properties:
Root Node (Orange): Single root node from which the rest of the tree can be traversed.
Leaf Node (Blue): A leaf node can contain, at most, T documents, where T is configurable.
Non-leaf Node (Green): Nodes with more than T documents are further split into children nodes.
Star-Node (Yellow): Non-leaf nodes can also have a special child node called the Star-Node. This node contains the aggregated documents after removing the dimension on which the data was split for this level. Star-Node can be either a leaf or non-leaf node.
Dimensions Split Order ([D1, D2]): Nodes at a given level in the tree are split into children nodes on all values of a particular dimension. The dimensions split order is an ordered list of dimensions that is used to determine the dimension to split for a given level in the tree.
Function Column Pairs: The pre-aggregations to perform when generating the tree.
Max Leaf Records: The threshold T to determine whether to further split each node. This threshold is used to tune the level of pre-aggregations performed. With a larger threshold, the index size will be smaller, while more documents will need to be processed to answer the query.
Node properties
The properties stored in each node are as follows:
Dimension: The dimension that the node is split on.
Value: The value of the dimension that the node represents.
Start/End Document ID: The range of documents this node points to.
Aggregated Document ID: One single document which is the aggregated result of all documents pointed to by this node.
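The node properties above can be sketched as a small data type. This is an illustrative model only; the field names are ours, not Pinot's actual implementation:

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class StarTreeNode:
    """One Star-Tree node; field names are illustrative, not Pinot's."""
    dimension: Optional[str] = None          # dimension the node is split on
    value: Optional[str] = None              # value this node represents ("*" for a Star-Node)
    start_doc_id: int = 0                    # first document in the node's range
    end_doc_id: int = 0                      # one past the last document in the range
    aggregated_doc_id: Optional[int] = None  # single pre-aggregated document for the range
    children: Dict[str, "StarTreeNode"] = field(default_factory=dict)

    @property
    def is_leaf(self) -> bool:
        return not self.children

# A root split on Country, with one regular child and one Star-Node child
# (hypothetical document ranges):
root = StarTreeNode(dimension="Country", start_doc_id=0, end_doc_id=7)
root.children["USA"] = StarTreeNode(value="USA", start_doc_id=0, end_doc_id=3)
root.children["*"] = StarTreeNode(value="*", start_doc_id=7, end_doc_id=9)
```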
Index generation
The Star-Tree index is generated in the following steps:
The data is first projected per the dimensionsSplitOrder. Only the dimensions from the split order are retained, while the others are dropped. For each unique combination of the retained dimensions, metrics are aggregated per the configuration. The aggregated documents are written to a file and serve as the initial Star-Tree documents (separate from the original documents).
Sort the Star-Tree documents based on the dimensionsSplitOrder. They are sorted primarily on the first dimension in the list, then on each subsequent dimension in order. Each node in the tree points to a range in the sorted documents.
The tree structure can be created recursively (starting at the root node) as follows:
If a node has more than T records, it is split into multiple children nodes, one for each value of the dimension in the split order corresponding to the current level in the tree.
A Star-Node can be created (per configuration) for the current node, by dropping the dimension being split on and aggregating the metrics for rows containing dimensions with identical values. These aggregated documents are appended to the end of the Star-Tree documents.
If there is only one value for the current dimension, a Star-Node won't be created, because the documents under the Star-Node would be identical to those under the single child node.
The above step is repeated recursively until there are no more nodes left to split.
Multiple Star-Trees can be generated based on different configurations (dimensionsSplitOrder, aggregations, T). The query executor can pick the one whose configuration is capable of solving the query (discussed in the Query Execution section).
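The generation steps above can be sketched as a short recursive builder. This is a simplified, hypothetical sketch (single SUM metric, documents as dicts, no doc-ID ranges), not Pinot's actual implementation:

```python
from collections import defaultdict

STAR = "*"  # marker for Star-Node children

def aggregate(docs, dims, metric="Impressions"):
    """Merge documents sharing the same values on `dims`, summing the metric."""
    agg = defaultdict(int)
    for doc in docs:
        agg[tuple(doc[d] for d in dims)] += doc[metric]
    return [dict(zip(dims, key), **{metric: total}) for key, total in agg.items()]

def build(docs, split_order, max_leaf_records, level=0):
    """Recursively build a Star-Tree node over `docs`."""
    node = {"docs": docs, "children": {}}
    if len(docs) <= max_leaf_records or level >= len(split_order):
        return node  # leaf: at most T documents, or no dimensions left to split
    dim = split_order[level]
    groups = defaultdict(list)
    for doc in docs:
        groups[doc[dim]].append(doc)
    # One child per value of the current split dimension.
    for value, group in groups.items():
        node["children"][value] = build(group, split_order, max_leaf_records, level + 1)
    # Star-Node: drop the split dimension and aggregate; skipped when the
    # dimension has a single value (the Star-Node would be identical).
    if len(groups) > 1:
        star_docs = aggregate(docs, split_order[level + 1:])
        node["children"][STAR] = build(star_docs, split_order, max_leaf_records, level + 1)
    return node

# Hypothetical documents; Max Leaf Records = 1 so every combination is
# pre-aggregated.
docs = [
    {"Country": "USA", "Browser": "Firefox", "Impressions": 600},
    {"Country": "USA", "Browser": "Chrome", "Impressions": 600},
    {"Country": "CA", "Browser": "Firefox", "Impressions": 400},
]
tree = build(docs, ["Country", "Browser"], max_leaf_records=1)
```

After building, the Star-Node under "USA" holds a single pre-aggregated document summing all USA impressions, and the Star-Node path at every level holds the grand total.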
Aggregation
Aggregation is configured as a pair of the aggregation function and the column to apply it to.
Supported functions
All aggregation functions with bounded-size intermediate results are supported:
COUNT: Intermediate result Long is bounded
MIN: Intermediate result Double is bounded
MAX: Intermediate result Double is bounded
SUM: Intermediate result Double is bounded
AVG: Intermediate result Pair<Double, Long> is bounded
MINMAXRANGE: Intermediate result Pair<Double, Double> is bounded
DISTINCTCOUNTHLL: Intermediate result HyperLogLog is bounded
PERCENTILEEST: Intermediate result QDigest is bounded
PERCENTILETDIGEST: Intermediate result TDigest is bounded
Unsupported functions
Some aggregation functions have unbounded-size intermediate results and are not supported, to prevent storage space explosion. However, their results can be approximated with the supported functions above.
DISTINCTCOUNT: Intermediate result Set is unbounded
PERCENTILE: Intermediate result List is unbounded
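The bounded/unbounded distinction comes down to whether pre-aggregated states can be merged without growing. A minimal sketch using AVG, whose intermediate state is a constant-size (sum, count) pair:

```python
def avg_init(value):
    """Intermediate state for AVG: a constant-size (sum, count) pair."""
    return (float(value), 1)

def avg_merge(a, b):
    """Merging two states stays constant-size -- safe to pre-aggregate."""
    return (a[0] + b[0], a[1] + b[1])

def avg_finalize(state):
    total, count = state
    return total / count

# Pre-aggregated partial results can be merged further without growing:
merged = avg_merge(avg_init(10), avg_merge(avg_init(20), avg_init(30)))
result = avg_finalize(merged)

# An exact PERCENTILE, by contrast, would need to keep every raw value
# (an unbounded List) to merge correctly, which is why it is excluded.
```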
Query execution
For query execution, the idea is to first check the metadata to determine whether the query can be solved with the Star-Tree documents. If so, traverse the Star-Tree to identify the documents that satisfy all the predicates. After applying any remaining predicates that were missed while traversing the Star-Tree, apply the aggregation/group-by on the qualified documents.
Metadata check
In order to solve an aggregation/group-by query with Star-Tree, all the columns in the filters and group-by clauses must be materialized (configured in dimensionsSplitOrder), and all the aggregations must be pre-aggregated (configured in functionColumnPairs). The query executor will pick the first Star-Tree that meets these requirements, or fall back to normal aggregation if none qualifies.
Traverse the tree
The algorithm to traverse the tree can be described with the following diagram:
Apply remaining predicates
Some of the dimensions might not be split because of the leaf records threshold (the upper limit on the number of records to be processed for each branch in the tree). In such cases, the remaining predicates on the unsplit dimensions are applied after traversing the tree, the same as in normal query execution except for the use of the pre-aggregated records.
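The traversal rule can be sketched as follows. This is a simplified, hypothetical model (nodes as dicts, hand-built toy tree with made-up pre-aggregated values), not Pinot's actual code:

```python
STAR = "*"

def traverse(node, split_order, predicates, level=0):
    """Collect the leaf nodes that satisfy the predicates.

    At each level: if the query has a predicate on this level's dimension,
    follow only the matching child; otherwise follow the Star-Node, which
    skips the dimension entirely. (A group-by on the dimension would instead
    follow all non-star children, one per group.)
    """
    if not node["children"] or level >= len(split_order):
        return [node]
    dim = split_order[level]
    if dim in predicates:
        child = node["children"].get(predicates[dim])
        return traverse(child, split_order, predicates, level + 1) if child else []
    return traverse(node["children"][STAR], split_order, predicates, level + 1)

# Hand-built toy tree over [Country, Browser]; leaf values are hypothetical
# pre-aggregated Impressions sums.
leaf = lambda total: {"children": {}, "docs": [{"Impressions": total}]}
tree = {"children": {
    "USA": {"children": {STAR: leaf(1200)}},
    "CA":  {"children": {STAR: leaf(400)}},
    STAR:  {"children": {STAR: leaf(1600)}},
}}

def run(predicates):
    leaves = traverse(tree, ["Country", "Browser"], predicates)
    return sum(d["Impressions"] for node in leaves for d in node["docs"])

usa_total = run({"Country": "USA"})  # one pre-aggregated document processed
grand_total = run({})                # Star-Node followed at every level
```

In both queries only a single pre-aggregated document is processed, which is the hard upper bound on work that the Star-Tree provides.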
Use this example data set and the following configurations as an example:
Max Leaf Records: 1 (We put 1 here so that all of the dimension combinations are pre-aggregated for clarity)
Tree structure
The values in the parentheses are the aggregated sum of Impressions for all the documents under the node.
Query execution
SELECT SUM(Impressions) FROM Table
Because there is no predicate or group-by on any dimension, select the Star-Node for all dimensions (document 26). Instead of aggregating all seven documents without Star-Tree, we directly get the aggregation result 2200 by processing only one document.
SELECT SUM(Impressions) FROM Table WHERE Country = ‘USA’
Because there is only a predicate on Country, select the node with value ‘USA’ for Country, the Star-Node for Browser and Locale (document 14). Instead of filtering out and aggregating three documents without Star-Tree, by processing only one document, we get the aggregation result 1200.
SELECT SUM(Impressions) FROM Table WHERE Locale = ‘en’
Similar to the last query, select the Star-Node for Country and Browser, the node with value ‘en’ for Locale (document 23). Again, by processing only one document, we get the aggregation result 1500.
SELECT SUM(Impressions) FROM Table GROUP BY Browser
Because there is a group-by clause on Browser, select the Star-Node for Country and Locale, and all nodes except for the Star-Node for Browser (documents 15, 19, and 22). For Country ‘*’ and Browser ‘Chrome’, since there is no Star-Node for Locale, select all child nodes instead. To get the group-by result, we need to process only one document for each group.
We have benchmarked the performance gains of the Star-Tree index against the inverted index on 48 million records generated with the TPC-H tools. The results were as follows:
With such huge improvements in performance, the Star-Tree index costs only about 12% extra storage space.
“Make LinkedIn feel instantaneous” is a mantra our performance team lives by to deliver the best possible experience to our members. We are constantly on the lookout for ways to improve our site speed and user experience, be it through optimizing our infrastructure and edge, accelerating our web and mobile applications’ performance, or tuning our systems and making our services more efficient.
Experiments at the edge
A vital area of focus to achieve our aspiration to make LinkedIn feel instantaneous is to deliver content as effectively as possible to our members. This demands continuous brainstorming, coming up with optimizations, and experimenting to see their effects. We run a lot of such experiments at the “last mile,” covering LinkedIn’s infrastructure that directly connects to our members over the internet and tremendously influences how fast content is delivered to them.
In the recent past, some of the experiments we have run at our edge have yielded interesting results.
For example, when we looked at how three different experiments performed across four countries, we found that different “optimizations” operate differently across geographies. Case in point is a recent study we did with TCP congestion control algorithms.
We continued to see such behavior in other experiments as well. This trend is illustrated in the chart below. For example, experiment 1 shows a lot of improvement in India, compared to other regions, while experiment 2 was more divisive, showing big gains in India but degradations in all other regions.
These findings weren’t too surprising, but reinforced the idea that we needed to deliver specific optimizations to certain regions.
Next, we analyzed the data from one experiment over a controlled population, specific to a region. This time, the results were more concerning. The chart below shows how much improvement we got each day from this experiment, and it varies a lot: from a 5% improvement one day to a 2% degradation the next! (You can get an idea of how much is a lot here.)
It is critical to understand the implications of this pattern. In a controlled environment, it is very rare that results of such configurations at the edge fluctuate between being very good on one day and bad on another.
We conducted further analysis by breaking down the data from the experiment by Autonomous Systems, or ASNs (think your internet provider) in a given region. This gave us a better idea of what was happening behind the scenes. We eventually uncovered the fact that even in a given region, different networks come with highly divergent characteristics. In other words, an optimization that we think works may work for members on some networks, but not for others. By rolling out such optimizations, we were forcing a “selective penalty” on some members, while improving the content delivery for others.
Here is the chart for the above experiment, considering the top eight ASNs in the region we were investigating.
This variability is detrimental to performance, and there is no guarantee that all of our members are getting a better experience from such “optimizations.”
The missing piece
To prove our hypotheses about network characteristics playing a significant role in determining the impact on our members, we used an experiment as an opportunity to understand how member engagement is affected when members are provided with a faster site speed experience.
The experiment entailed analyzing and classifying members’ networks based on their historical performance (RUM) data, as “fast,” “average,” and “slow” network quality. We then served a “faster” (lighter) application experience to a randomly chosen segment of members. After a few days of experimentation, we analyzed the data and compared how members engaged with the application across the three classes of network quality.
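The classification step described above can be sketched as follows. This is an illustrative model only: the sample data, the use of median page-load time, and the fixed cutoffs are all hypothetical, whereas the real classification is derived from historical RUM data:

```python
from statistics import median

# Hypothetical RUM samples: page-load times in milliseconds per network (ASN).
rum_samples = {
    "ASN-A": [220, 180, 250, 210],
    "ASN-B": [900, 1100, 950],
    "ASN-C": [480, 520, 450, 500],
}

# Illustrative cutoffs, not LinkedIn's actual thresholds.
FAST_MS, SLOW_MS = 300, 800

def classify(samples):
    """Bucket a network into fast/average/slow by its median load time."""
    m = median(samples)
    if m <= FAST_MS:
        return "fast"
    if m >= SLOW_MS:
        return "slow"
    return "average"

quality = {asn: classify(samples) for asn, samples in rum_samples.items()}
```

With per-network labels like these, the experiment can compare engagement across the three classes rather than treating a whole region as uniform.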
We found that member experience and engagement improve when we deliver a faster application, and that a faster application engages members on “slow” networks considerably more than members on faster networks.
To understand if our members liked the faster experience, we looked at how many user sessions were created in each segment and how many members visited the application. For example:
In each network quality class, site speed improved across the board, with members on fast networks seeing the smallest improvement and those on slow networks seeing the largest gains.
Subsequently, the number of unique members visiting the application and the number of sessions both improved proportionally.
One segment stood out to us as an anomaly: for the average class, the number of sessions did not grow proportionally with the site speed improvement. Through further analysis, we concluded that this is most likely because this class of members did not “prefer” the “lighter” experience over a slower one. This is likely where the trade-off between site speed and features becomes critical.
This analysis proved two hypotheses:
Members are more engaged on a faster application. As the experience gets faster, user engagement goes up. This is especially true for members already on slow networks.
Network quality plays a significant role in understanding our members’ experience. By appraising the network quality of a user, we could customize their experiences by providing them what they prefer, suitable to the network they are on.
Network quality as a service: Customizing content delivery
The experiments and analyses above helped us better understand how members’ network quality, among other factors, influences their experience on the application. We can use this knowledge to customize content delivery to their needs and enrich our members’ experiences by providing network quality as a service within LinkedIn.
Defining and measuring metrics for network quality is fairly straightforward. The challenge arises in circumstances where measurements might be outdated or simply infeasible. To handle such scenarios, we have built a deep learning pipeline that uses RUM data to predict the network quality of every connection to LinkedIn.
Stay tuned for Part 2 of this series, which will focus on delivering customized content to our members.
The entire story leading to network quality as a service has been a multi-team effort spanning many quarters. This has involved many engineers and managers across the Performance Engineering, Edge SREs, Traffic Infra Dev, Data Science, Flagship Web, LinkedIn Lite, and AI/ML teams at LinkedIn.
I would like to specifically thank Ritesh Maheshwari and Anant Rao for supporting us through this journey. Special mention to Brandon Duncan for putting up with us through the process and being supportive of such exploratory efforts!