On August 17th, PayPal announced the Migration to DigiCert Root Certificates to address upcoming changes that major web browser and operating system providers will be making to remove trust for Symantec root certificates.
As of today, we’re happy to announce that PayPal has migrated critical endpoints to the new DigiCert Root Certificates. This significantly reduces the likelihood that Consumers and Merchants will be impacted.
To most software engineers, databases are boring, or rather — should be boring. You put the data in, you take the data out and it just works like a well-fed German. The real magic happens on the front end, or in the middle tier of APIs. Unfortunately for most developers, and fortunately for the ones who specialize in databases, it’s not always that simple.
In computer science, ACID is a fairly ancient concept and has all but disappeared from fashionable tech articles, while the CAP theorem is still considered ‘fresh’ and worthy of references as a theoretical basis for NoSQL products. But what do they mean, and are they important for us?
Let’s park that question and go on a scenic journey through time to understand the context.
History of ACID and transaction isolation levels
No, not 1965, not the Beatles’ acid. We start a bit later — 1983, the year Michael Jackson’s Thriller went crazy on the charts, Return of the Jedi hit the big screens, Ronald Reagan sent troops to Grenada, Apple released Lisa, IBM launched PC XT and Microsoft delivered the first version of Word. That same year, two German professors declared 4 major properties of a database transaction, whose availability would greatly ease the pain of software engineers building robust systems:
Atomicity — guaranteeing all or nothing writes (think COMMITs)
Consistency — every transaction takes the data from one valid state to another (think constraints that always hold)
Isolation — concurrency control (think about it for long enough and your head will explode)
Durability — NOT losing data (think the opposite of the 2010 MongoDB)
Even though A, C and D were allegedly built into the old hierarchical IMS database system back in 1973, the paper was a major influence on the then modern relational databases to support these properties, as well as on the development of ANSI SQL.
Fast forward to 1992, the year Nirvana’s Nevermind topped the Billboard charts, Kurt Cobain married Courtney Love, Radiohead released Creep, Bill Clinton was elected US president, the MP3 file format was invented and Reservoir Dogs ushered in a new era of on-screen violence. The big thing in the world of databases was SQL-92, a major expansion and revamp of the then relatively immature SQL-89 standard. For our story, the most important addition was the definition of transaction isolation levels — that’s the I in ACID. Some geeky people realized there are various complications that can arise due to concurrent reads and writes of data, which were classified as distinct phenomena: dirty reads, non-repeatable reads and phantoms.
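To make one of these phenomena concrete, here is a toy Python interleaving — no real DBMS involved, just two hand-interleaved “transactions” — showing a non-repeatable read:

```python
# Toy "database": one shared row, mutated by two hand-interleaved transactions.
db = {"balance": 100}

def non_repeatable_read():
    first_read = db["balance"]    # T1: SELECT balance  -> 100
    db["balance"] = 50            # T2: UPDATE balance; COMMIT
    second_read = db["balance"]   # T1: SELECT balance  -> 50 (!)
    return first_read, second_read

result = non_repeatable_read()
print(result)  # (100, 50): the same query, in the same transaction, saw two values
```

Under READ COMMITTED this interleaving is legal; REPEATABLE READ and above forbid it. Dirty reads and phantoms are the analogous exercises with uncommitted writes and changing row sets.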
I won’t delve into further details as there is plenty of online material which does precisely that. I highly recommend kicking off with Vlad Mihalcea’s blog and his hands-on introduction. To fully grok this stuff you really need to warm up your seat, open up two SQL terminal windows and walk through the examples. BTW, check out his Hibernate posts, especially if you’re stupid enough to have painted yourself into a corner while trying to code something ‘elegantly’ with Hibernate.
Onto 1995. Radiohead released The Bends, Clinton bombed the Serbs in Bosnia and the same crowd which now queues up for Apple products queued to buy Windows 95. As for poor ole Cobain — he didn’t make it to ’95. More importantly for us, a few Microsoft researchers decided Win95 wasn’t enough of a can of worms and published a research paper titled “A Critique of ANSI SQL Isolation Levels”. 4 more phenomena were identified, as well as 2 new isolation levels. Curiously, the response from the ANSI SQL team was muted: to this day the original 1992 classification stands. More annoyingly for hair splitters such as yours truly, one can argue it allowed some large vendors to get away with misclassifying their isolation level support (e.g. Oracle DBMS Serializable is in fact Snapshot Isolation).
We’re ending with 2013, I promise, but not without further digressions. All the popular new music and movies sucked and had been sucking for years, Hillary Clinton was at the height of her popularity (not long after she bombed Gaddafi in Libya) and Bill Gates finally admitted that Ctrl-Alt-Del had been a “mistake”. Anyhow, we zero in on an academic paper published by a few Greek professors innocuously titled “Survey on consistency conditions”. Beware of Greeks bearing research papers; in this case they rounded up a few tons of worms, stuffed them in a giant can and then opened it. Kyle Kingsbury from Jepsen.io neatly summarizes all the new isolation levels with the following sketch:
We’ll come back to this diagram shortly. Now, let’s talk about
The CAP Theorem
First published in 2000, then proven in 2002 (no more flashbacks — yay!), the Brewer Theorem, a.k.a. the CAP Theorem, claims the following: you can pick any 2 out of the following 3:
Consistency — as in I[solation] in ACID
Availability — every request receives a response; every meaning 100% every!
Partition tolerance — the system still functions even if messages are dropped or delayed between nodes. Note that the basis of the theorem is commodity hardware in a clustered configuration.
In other words, one can classify systems as C-P, CA- or -AP.
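As a rough illustration of the trade-off (a toy two-node replicated register; all names invented), here is what happens when a partition cuts the replicas off from each other:

```python
class Replica:
    def __init__(self):
        self.value = None

class ReplicatedRegister:
    """Two replicas; 'partitioned' simulates dropped messages between them."""
    def __init__(self, mode):
        self.mode = mode                     # "CP" or "AP"
        self.replicas = [Replica(), Replica()]
        self.partitioned = False

    def write(self, node, value):
        if not self.partitioned:
            for r in self.replicas:          # healthy network: replicate everywhere
                r.value = value
            return "ok"
        if self.mode == "CP":                # keep C: refuse rather than diverge
            return "unavailable"
        self.replicas[node].value = value    # keep A: accept the write locally
        return "ok"

    def read(self, node):
        return self.replicas[node].value

cp = ReplicatedRegister("CP"); cp.partitioned = True
print(cp.write(0, "x"))                      # "unavailable": gave up A to keep C

ap = ReplicatedRegister("AP"); ap.partitioned = True
ap.write(0, "x")
print(ap.read(0), ap.read(1))                # "x" vs None: replicas diverge, C lost
```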
Why is the theorem well known? There is this little company called Amazon which up until the late 90s had run their complete database on a single instance/single server Oracle DBMS. At the time, the Amazon e-Commerce database was the largest Oracle deployment in the world and unsurprisingly there were serious concurrency issues — crème de la crème of the then Oracle database experts scratched their heads witnessing weird phenomena which they never encountered before. In simple terms, the system was reaching the scalability constraints of the single-server architecture.
The Amazon engineers then realized that it was fine to ditch the ACID shackles and went into the –AP space. Who really cared about consistency for product clicks, searches and such? Eventually they even migrated their inventory management to eventual consistency, with full awareness of the possibility of unfulfilled orders due to stock outs — the dreaded balance-less-than-zero issue that comes with eventual consistency. In Amazon’s case, this scenario is indeed carefully monitored but it turns out to be a tiny fraction of orders, which gets handled through an exception process. All in all, it worked out swimmingly for Amazon, and it gave a lot of street cred to the CAP theorem, too.
So what’s my beef with the CAP Theorem then? It’s how useful it is in practice. Note that:
It only covers commodity hardware (no SANs, no Infiniband)… OK, this one is not a biggie unless you’re an Oracle RAC fanboi.
It was based on the Linearizable isolation level. There’s a grand total of one commercial DBMS (Azure Cosmos DB) I am aware of that supports that exact model, although, to be fair, the theorem does extend to the whole red area on the Jepsen sketch above, which includes more common isolation levels.
It does not consider latency at all. For any practical purposes latency is extremely important — a late response in the real world often equates to no response.
It talks about 100% availability, which simply does not exist in the real world.
Let’s look at a real world case if you don’t trust me blindly (you never should). There’s this other little company called Google. One of the big problem areas for Google is real-time bidding for online advertising. This is a serious issue because of the scale, as well as due to consistency requirements — it involves concurrent bids and money. Grave stuff, indeed. Google engineers naturally tried to tackle the issue the Amazon way — with lots of developers trying to solve individual consistency problems piecemeal. And it kind of sucked. So some super-duper (not just super) clever Google engineer realized that it’s better to solve the issue once and for all rather than having hundreds of merely super-clever engineers trying to solve them individually (and often poorly). So, ladies and gents, straight from Google’s mouth:
F1: A Distributed SQL Database That Scales
The AdWords product ecosystem requires a data store that supports ACID transactions. We store financial data and have hard requirements on data integrity and consistency. We also have a lot of experience with eventual consistency systems at Google. In all such systems, we find developers spend a significant fraction of their time building extremely complex and error-prone mechanisms to cope with eventual consistency and handle data that may be out of date. We think this is an unacceptable burden to place on developers and that consistency problems should be solved at the database level.
F1 later became Spanner — a globally distributed database that supports the strong consistency isolation level, the strongest possible. And the availability I hear you ask? 99.999%. Yes, that’s 5 9s in your face right there. Try to set up and run a platform with that sort of availability, I dare you. There are significant challenges in Spanner — high latency writes, no referential integrity, issues with online schema upgrades etc… Even Google can’t make silver bullets, but keep in mind you still get much more compared to a typical NoSQL data store.
One final note on the CAP Theorem: look back at the green area on the sketch and you’ll see RC — Read Committed. It turns out that RC is the default isolation level on the old Oracle DBMS, which tends to be good enough for a lot of business applications. Curiously, I can’t find any NewSQL product, such as Google Spanner, that utilizes this particular loophole — the fact that you can get a theoretical cAP (let’s call it a lowercase “consistency”, still much better than “eventual consistency”). Just a wink to the vendors…
This is just BS. I’m friggin’ bored. Do you actually have anything useful to say?
Indeed I do, just testing your patience. And you should try meditation, you’ll be less bored (and boring). Without further ado:
How to minimize getting burnt by your ACID-ish database
A checklist of all the questions you should ask yourself when starting a project that involves database selection. Sadly, you get no answers, just the questions (you seriously thought you would get free and easy answers?).
Do I need atomicity?
How will I handle data/referential integrity? Do I want to trade off integrity issues in production for performance?
Which isolation level do I actually need? If I need to compromise, how much work do I need to put in to measure the side effects?
Am I OK with losing data and if so, how much?
Availability / latency
What are my availability requirements? What are the trade-offs between availability, latency and consistency?
At which point high latency translates into the system being de facto unavailable?
Performance / scalability
What are my 1–3 year performance requirements? Maximum theoretical performance requirements?
How well do I expect the system to scale (latency, performance, availability) with extra hardware?
What are all the different performance workloads I can expect (e.g. full table scans, partition scans, small writes, bulk writes, mixed reads & writes, PK index reads, secondary index reads, small joins, large joins, text/fuzzy searches, spatial functions, windowing functions)?
Will my database schema evolve? (NO, there is NO such a thing as a “schema-less database”! Say that again and I’ll make you re-read this painful article)
Will I have many different pathways of accessing the data (e.g. joins)?
Do I expect to have a sophisticated query optimizer for complex queries or am I OK to write those by hand?
Data recovery & resilience
If something goes wrong do I need to recover point-in-time? If so, how quickly?
What am I prepared to endure when some of the hardware components fail?
If I decide to migrate to another DBMS later on, am I prepared to invest a lot of effort into re-writing the data access layer?
TCO & Risk
What am I prepared to pay upfront for design, development vs. ongoing?
What is my risk appetite?
Am I OK to fail and re-platform in the future if the milk turns sour?
Am I OK to get a lot of late night phone calls when something goes wrong?
Monitoring / Instrumentation
How complex are my queries? Do I expect to get detailed query performance analysis?
I’d probably like to get some built-in common-sense alerts. Or do I have to think about them on my own?
Do I have existing APM tools I’d like to integrate with?
Security
The DBMS has at least basic security, right?
Do I have PCI-DSS or stricter compliance requirements? Do I expect out-of-the-box transparent encryption, HSM support, etc?
What about Multi-tenancy support?
Databases are in fact hard, often depressing, and at best they only return the data you chuck into them. If you are any sort of a back-end developer though, you should really bite the bullet and understand the basics of ACID and isolation models. If on the other hand, you consider yourself a DB expert you ought to keep an eye on products offering “tunable consistency” (still quite immature though) as well as NewSQL products (Google Spanner, Splice Machine, VoltDB, CockroachDB, Apache Phoenix, NuoDB…). They will gradually take a big chunk of the market from the old dinosaurs — mark my words.
Oh, and I almost forgot — a quick quiz. If you are in a microservices shop (they’re all the rage now, haven’t you heard) and do basic orchestration across services, what sort of isolation level do you get? You get nothing! You don’t even get A, C, I or D in ACID! I.e. you’re leaking some data or have integrity issues, but that’s OK, because you don’t measure exactly how much you’re leaking, so you can sleep at night. Sometimes ignorance is indeed the best policy. Forget the whole body of knowledge that went into XA & two-phase commits, because they have their own challenges and don’t fit into the neat microservices RESTful paradigm. Even better, forget this whole article, keep calm and code away.
Behavioral Analytics through Big Data Applications can be used to gain insights, analyze product behavior and provide better customer experience.
Did you know that PayPal’s product performance tracking platform processes over 10 billion messages per day making it one of the busiest systems in PayPal?
Data is king at PayPal. Accurate data allows for more informed and better decisions. PayPal customer data is a treasure trove and is a major competitive differentiator. Our belief is that every employee should be empowered with timely insights to make data-driven decisions. We are at a vantage point to make this a reality. To meet these increasingly ambitious time-to-insights demands, our behavioral analytics platform will need to be agile and friction-less through the data collection, data transformation and insights generation phases.
To enable this, we revamped our big data platform, migrating up to 9 billion events per day from PayPal’s custom Spring-based framework to the Squbs framework (based on Akka Streams).
Companies are moving from systems designed for data “at rest” in data warehouses and embracing the value of data “in motion”. Fast Data processing (high-velocity data processing in near real time) is the new trend. Akka Streams is a leading stream processing framework for handling Fast Data.
Here are the Key Outcomes we observed after moving to a Simple, Fault-tolerant and Resilient architecture:
Availability — Improved by back-pressure driven architecture.
Reduction in processing time and reduced failure rate.
Enablement — Reduced deployment time from 6 hours to 1.5 hours.
Reduced Operational expense on support and maintenance.
Now the obvious questions that come to our minds are:
How did we do that? How was PayPal able to achieve massive concurrency with Reactive Streams? How did we start thinking in Streams?
The reactive data processing pipeline collects user behavior data for PayPal’s web and mobile products and provides us with enriched data to derive actionable insights. In this collector, each of the HTTP socket connections is its own stream as defined by Akka HTTP. We still want to siphon all the data into the main enrichment stream. Since connection streams come and go as connections are created and closed, the merge point has to allow for this changing set of streams feeding into the enrichment flow. This can be achieved by using a dynamic fan-in component called MergeHub. The enrichment stream just enriches the beacons and sends them to Kafka using the sink provided by the Reactive Kafka API.
Big Data Collector and Enricher Flows
But, there is a problem in this picture. Can you guess what that problem is?
The problem is the back-pressure. Kafka does re-balancing and re-partitioning from time to time. Now we have a completely back-pressured stream. So, what do we do when Kafka is unavailable? We can’t possibly back-pressure the internet traffic, right?
For this reason, we insert another component that breaks the back-pressure from the Kafka sink. We insert a buffer here. But since a buffer can cause you to go out-of-memory, we built a persistent buffer — a buffer that writes the elements to disk via memory-mapped files. We are using a technology called Chronicle Queue behind this buffer.
Optimized Big Data Collector and Enricher Flows
In the normal case, elements just flow through the persistent buffer and we do not really worry about it at all. Nothing is really kept in that buffer. Once Kafka becomes unavailable, the persistent buffer will keep the elements on local disk and only back-pressure when we reach a high watermark. We should be able to survive any re-balancing or re-partitioning of Kafka, given we have adequate storage. Once Kafka reports being available again, we push all those elements to Kafka and the stream continues without data loss.
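The production implementation is Scala with Akka Streams and Chronicle Queue; as a rough Python sketch of the behaviour (an in-memory deque standing in for the memory-mapped file, and the watermark number invented for the example):

```python
from collections import deque

class SpillingBuffer:
    """Toy stand-in for the persistent buffer: pass elements straight through
    while the sink is healthy, spill them when it is down, and back-pressure
    only past a high watermark."""
    HIGH_WATERMARK = 3   # invented for the example

    def __init__(self, sink):
        self.sink = sink            # callable that raises while Kafka is down
        self.spill = deque()

    def offer(self, element):
        self.drain()                # push previously spilled elements first
        try:
            self.sink(element)
        except ConnectionError:
            if len(self.spill) >= self.HIGH_WATERMARK:
                return "back-pressure"   # only now do we slow the source
            self.spill.append(element)
        return "accepted"

    def drain(self):
        while self.spill:
            try:
                self.sink(self.spill[0])
            except ConnectionError:
                return
            self.spill.popleft()

delivered, down = [], True
def kafka(element):
    if down:
        raise ConnectionError       # simulates Kafka re-balancing
    delivered.append(element)

buf = SpillingBuffer(kafka)
results = [buf.offer(i) for i in range(4)]
print(results)       # three spilled, the fourth hits the watermark
down = False
buf.offer(99)        # Kafka recovers; the spill drains before the new element
print(delivered)     # [0, 1, 2, 99] -- no data loss
```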
PerpetualStream — Enrichment Flow code
You can see the components lining up very much reflecting the diagram we had previously drawn up. The only part we did not mention is the map stage. This just converts the enriched beacons to a Kafka ProducerRecord.
FlowDefinition for Http Flow
Now the HTTP Flow, the HTTP part of the stream. This one gets materialized for every single connection. It looks up the enrichment stream through its materialized value — guess what — that MergeHub.source actually materialized to a sink we can look up. Then it creates a normal HTTP flow which deserializes the beacon, pushes it to the enrichment flow, and at the same time creates an HTTP response responding back to the beacon generator. The key piece of this flow is the alsoTo(enrichStream), which pushes the beacons to the enrichment stream. This is, in essence, an implicit fan-out.
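The actual code is Scala/Akka; as a loose asyncio analogue (a queue standing in for the MergeHub, a list for the Kafka sink, all names invented), the shape of the flow is:

```python
import asyncio

async def pipeline():
    merge_hub = asyncio.Queue()   # dynamic fan-in point (the MergeHub's sink)
    kafka = []                    # stand-in for the Reactive Kafka sink

    async def connection(beacons):
        # One per HTTP connection: push each beacon to the merge point AND
        # produce an HTTP response at the same time (the alsoTo fan-out).
        responses = []
        for beacon in beacons:
            await merge_hub.put(beacon)        # alsoTo(enrichStream)
            responses.append("202 Accepted")   # response to the beacon generator
        return responses

    async def enricher():
        # The single enrichment stream fed by every live connection.
        while True:
            beacon = await merge_hub.get()
            kafka.append({**beacon, "enriched": True})
            merge_hub.task_done()

    worker = asyncio.create_task(enricher())
    await asyncio.gather(connection([{"id": 1}]), connection([{"id": 2}]))
    await merge_hub.join()        # wait for the hub to drain
    worker.cancel()
    return kafka

enriched = asyncio.run(pipeline())
print(sorted(b["id"] for b in enriched))  # [1, 2] -- both connections merged
```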
When thinking about reliability, starting up and shutting down the app is usually overlooked. When the app is shutting down, we should not pull the carpet out from under the streams and let them crash. Instead, the stream should be gracefully shut down and all in-flight messages drained. Use PerpetualStream along with an Akka KillSwitch to gracefully shut down a stream and drain messages.
The riskier parts of a stream (e.g., external communication) should be gated with a Retry, Circuit Breaker, or Timeout.
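squbs ships real Retry, CircuitBreaker and Timeout stages for Akka Streams; the intent can be sketched in plain Python (thresholds, delays and names invented for illustration):

```python
import time

def gated(call, retries=3, base_delay=0.01, failure_threshold=2):
    """Wrap a risky call with retry plus a crude circuit breaker."""
    failures = 0
    def wrapper(*args):
        nonlocal failures
        if failures >= failure_threshold:
            raise RuntimeError("circuit open")          # fail fast, protect the sink
        for attempt in range(retries):
            try:
                result = call(*args)
                failures = 0                            # success closes the circuit
                return result
            except ConnectionError:
                time.sleep(base_delay * 2 ** attempt)   # exponential backoff
        failures += 1
        raise RuntimeError("retries exhausted")
    return wrapper

attempts = {"n": 0}
def flaky_send(x):
    attempts["n"] += 1
    if attempts["n"] < 2:
        raise ConnectionError    # first attempt fails, second succeeds
    return x * 2

safe_send = gated(flaky_send)
print(safe_send(21))  # 42, after one silent retry
```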
We faced a lot of technical and organizational challenges in converting a well-established team to this reactive mindset. But we believe that:
“Just because your Data is Big, your Systems need not be!”
PayPal Notebooks, powered by Jupyter: Enabling the next generation of data scientists at scale
PayPal is a data-driven company. Whether building fraud detection with advanced algorithms or leveraging clickstream analytics to customize user experiences, PayPal has relied heavily on data for all aspects of its business. Naturally, with the explosion of cost-efficient big data technologies, the appetite for data has only grown. As our data grows, the ability of our data scientists and researchers to glean actionable, near real-time insights from that data gets even more challenging.
Do PayPal’s data scientists and researchers have access to all its data? Most analysts proficient in running complex SQL queries on relational data warehouses built on data stores like Oracle and Teradata are not comfortable writing Map Reduce or Spark code to read the behavioral data stored in Hadoop. They are also unlikely to write code to read from near real-time data pipelines, like those established with Kafka.
This is where PayPal Notebooks, bolstered by the power of Gimel, come in. Built on the popular and versatile open-source project Jupyter, PayPal Notebooks empower PayPal’s data community with fast, simple and easy access to large volumes of data collected across a variety of data stores.
Jupyter notebooks provide anyone a way to run live code in over 40 languages and include visualizations and rich marked-up text, all in a web application. Since the output of the code execution is maintained within the same notebook, it makes for a collaborative environment for data scientists and data engineers to work together.
This post will walk you through how the Data Platform team has turbo-charged Jupyter at PayPal.
A brief history
Before we jump into PayPal Notebooks, it is worth looking at where we are and how we got here. PayPal’s data scientists had started using Zeppelin several years ago, but they were using it with single host deployments or running it on shared nodes to make it a multi-user deployment.
The data platform team took the plunge with Jupyter in Q2 2017 and set it up as a platform. PayPal Notebooks started as an internal beta in Q3 2017 with around 50 users, who were mostly using Spark and PySpark. In February 2018, PayPal Notebooks became generally available to internal users and had around 100 users.
Today, PayPal Notebooks engages well over 1,300 users, a majority of whom use SQL (both Hive and Teradata), Spark/PySpark and Python. Recently, we enabled the R kernel, which is drawing in many more analysts and data scientists.
From Jupyter to PayPal Notebooks at PayPal
Data Platform Driving Forces
As a platform team, we are driven by platform architectural principles like security, availability, scalability, reliability and observability. Every product we build is driven by these “-ilities” and is laser-focused on improving efficiency and productivity.
Within the data platform, we target 4 types of customers: data engineer, data analyst, data scientist and infrastructure operators. To improve productivity of these customers, we are constantly looking to find ways to reduce the time it takes them to go to market with their innovation.
With growth in the variety of data stores supported, in the volume of data available and in customer demand, the data platform team created a series of Jupyter extensions called PPExtensions. These extensions add key features to Jupyter that spur innovation at PayPal by abstracting complexities, simplifying data access, enabling self-service deployments and easing collaboration and knowledge reuse.
PPMagics — A Collection of Jupyter Magics
%hive and %teradata: These magics allow you to easily query Hive and Teradata tables without needing to remember how to connect to the various Hive clusters or Teradata systems. In addition, they also provide the ability to directly load a csv file to a Hive or Teradata table. As you will read later, these magics also enable directly publishing the output to Tableau.
%hive magic
%teradata magic
%csv: CSV is the data file format of choice among our data scientists. The %csv magic enables SQL commands on CSV files, so you can now run SQL on a file just like you would run SQL on a table. It supports all ANSI SQL functionality, so you can see the distribution and quality of your data in the CSV files without needing to load them into a table.
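This is not how the magic is actually implemented, but its effect is easy to approximate with sqlite3: load the file into an in-memory table, then query it (sample data invented):

```python
import csv, io, sqlite3

def sql_on_csv(csv_text, query):
    """Load a CSV into an in-memory SQLite table 't' and run SQL against it."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, data = rows[0], rows[1:]
    coerce = lambda v: int(v) if v.lstrip("-").isdigit() else v  # keep numbers numeric
    con = sqlite3.connect(":memory:")
    con.execute(f"CREATE TABLE t ({', '.join(header)})")
    con.executemany(f"INSERT INTO t VALUES ({','.join('?' * len(header))})",
                    [[coerce(v) for v in row] for row in data])
    return con.execute(query).fetchall()

sample = "country,amount\nUS,10\nUS,20\nDE,5\n"
print(sql_on_csv(sample,
    "SELECT country, SUM(amount) FROM t GROUP BY country ORDER BY country"))
# [('DE', 5), ('US', 30)]
```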
%run: One of the most common requests from our customers, once they get proficient with Jupyter, is the ability to run one notebook from another. This is usually so that they can modularize their work and maintain special-purpose notebooks separately. %run was created simply to allow running notebooks from other notebooks. It allows passing parameters to the notebook at run time, as well as an option to run multiple notebooks in parallel.
%run magic with parallel execution
%run_pipeline: The %run_pipeline magic is similar to %run but has one key differentiator — unlike %run, %run_pipeline allows sharing state between the notebooks, and thereby passing data through the pipeline. Notebooks in the pipeline are run sequentially.
%run_pipeline magic with parameters
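Conceptually (the real magic executes .ipynb files; here each “notebook” is just a code string, and all names are invented), pipeline state-sharing boils down to running each step against one shared namespace:

```python
def run_pipeline(notebooks, params=None):
    """Toy analogue of a %run_pipeline-style magic: each 'notebook' runs in
    turn against one shared namespace, so later steps see what earlier steps
    produced."""
    shared = dict(params or {})
    for cell_source in notebooks:
        exec(cell_source, shared)       # sequential execution, shared state
    return shared

state = run_pipeline(
    ["raw = [x * rate for x in (1, 2, 3)]",   # 'extract' notebook
     "total = sum(raw)"],                     # 'report' notebook
    params={"rate": 10},
)
print(state["total"])  # 60
```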
%sts: This magic allows you to easily connect to Apache Spark Thrift Server and run SQL commands. This magic, like %hive and %teradata, has options to publish directly to Tableau.
In the hope of democratizing data, PayPal needed to overcome several hurdles. One major hurdle arose from the fact that data at PayPal is stored in disparate data stores across multiple locations, making analysis across these federated stores a manual and painful process. Gimel fixed this by providing a single API to access any data in any data store in any location. To maximize adoption, we adopted SQL as our de-facto API language. For more info on Gimel, refer to our previous post.
At PayPal, by integrating Gimel with Jupyter, we provide in-notebook execution of any SQL against any data store, whether persistent (e.g. Elastic or HBase) or streaming (e.g. Kafka). This is possible through the %sts magic.
%presto: This magic is just like %hive or %teradata in that it enables our customers to access Presto directly without needing to remember configuration and connectivity parameters.
In addition to creating multiple magics, PPExtensions also support a list of other engaging features.
Native publishing to Tableau
An analyst’s workflow typically involves finding the data, mashing it up and creating an enriched dataset, and then publishing their insights to a dashboard to share it with management and other stakeholders. The %publish magic takes the result of a query, or even a plain dataframe and publishes it to Tableau directly.
Now, analysts can fetch data, process it and publish the outcome all in one flow and within a single product without needing to switch contexts.
Publish to Tableau
Notebook Version Control and Collaboration/Sharing through GitHub
As a naïve means to promote notebook sharing, Jupyter allows users to export notebooks in both the native notebook format and in various 3rd-party formats such as PDF or HTML. But by doing so, the customer loses the ability to implement version control and to later merge changes in a collaborative fashion.
To foster collaboration while ensuring integrity, we looked to GitHub. Today, our customers can select any notebook they’d like to share and push it to GitHub. The extension provides a link to the posted notebook which can then be shared among the team. Every time a notebook is pushed with the same name, a new version appears in GitHub.
Similarly, anyone can pull from GitHub by browsing the user directory and choosing a notebook of interest. This is a powerful way not just to maintain a clear version history of all notebooks, but also to increase knowledge sharing and code reuse.
Sharing notebooks natively through GitHub
Repeatable Analysis through Scheduled Notebooks
Another common ask from data scientists and analysts is to schedule notebooks on a periodic basis. Whether the aim is to produce daily/weekly/monthly summary reports or to run model training, our PayPal Notebooks scheduling extension enables a native integration with a popular open-source scheduler, Apache Airflow. In addition to the ability to schedule a notebook, there is functionality to run the notebook in the background, which uses a home-grown smart credential-management system that supports automatic, secure credential filling.
The scheduling extension also creates a new tab in the Jupyter UI where all the key schedule-related information is brought in from the Apache Airflow dashboard. Clicking through on the notebook job name will of course bring you to the Airflow dashboard for more granular information about the job.
Scheduling UI
Scheduling dashboard
PPExtensions Open Sourced
At Jupytercon 2018, we announced that we have open sourced PPExtensions. You can install it with pip and you can check out the code on our GitHub repo.
PayPal has witnessed impressive growth in the number of customers and transaction processing volumes over the last decade. As we continue to innovate our technology stack and mobile experience, we are tackling several challenges. Scaling our user database (db) is one such challenge, compounded in its complexity by our monolithic codebase. The user database employs a traditional relational database at its core. As one can imagine, this database serves several critical business needs across the entire PayPal stack, so scaling it while keeping the existing business running is no small feat. This database serves multiple billions of queries daily.
For us, the scaling requirements were evident from day one. Whatever solution we came up with had to meet the constraints of strong consistency, high availability and parity with existing business cases, along with cost and operational efficiency. We considered multiple strategies to tackle this problem, including redirection of reads to replicas and vertical scaling, before we started on the path to sharding. There are two key aspects to sharding: the first involves design and making the application shard-ready, and the second involves the actual data movement into new shards. This article focuses on the first aspect. In it, we describe the key design choices and the technical challenges associated with the process. We had earlier published an article with learnings around execution. We hope our insights will be helpful to other organizations in their scaling journey.
Sharding is commonly used to scale databases. In this technique, data is partitioned into multiple smaller databases, known as shards. Each shard holds a subset of the database and can be accessed by its shard key. Sharding offers a scalable solution while reducing overall hardware costs. Increase in transaction volume is easily handled by adding commodity servers. As such, the cost grows linearly with the increase in traffic volume.
Figure 1. Example of Sharding based on shard key ranges
The above figure represents a simple sharding scheme where rows are partitioned based on the shard key ranges.
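The routing in Figure 1 can be sketched in a few lines of Python (the key ranges are invented for the example):

```python
# Each shard owns a half-open range [low, high) of shard-key values.
RANGES = [(1, 1000, "shard_1"), (1000, 2000, "shard_2"), (2000, 3000, "shard_3")]

def route(buyer_id):
    """Send a query to the shard whose range contains the shard key."""
    for low, high, shard in RANGES:
        if low <= buyer_id < high:
            return shard
    raise KeyError(f"no shard owns key {buyer_id}")

print(route(42), route(1500), route(2999))  # shard_1 shard_2 shard_3
```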
Shard Key Selection
A shard key (or partition key) is a primary key used to route queries to the appropriate shards. Picking the right shard key is a complex decision. For instance, in the above diagram buyer id is used as the shard key. Here are several factors that weigh into this decision:
What are the dominant entities in the data? And, specifically, are there any obvious clustering patterns?
How much of a schema redesign is needed based on the newly picked shard key?
Does the new shard key support the existing query patterns and business use cases?
Does it provide flexibility to support future use cases?
How does it impact query performance?
As we examined our data, we observed that the data was clustered around two key entities. The majority of the queries also formed around the same entities. This helped us to establish two shard keys against which the User database is sharded. These two keys are independent of each other. The key insight here is to spend time studying the data and existing application logic up front, which in turn will help in making the right design decisions.
Data Distribution Scheme
A data distribution scheme determines how the data is partitioned into multiple shards. For instance, in the above figure, a simple range-based scheme is used to partition the data across databases. Many data distribution schemes are available, each with its own merits and drawbacks. Before choosing a scheme, it is important to establish the key requirements. Here are the requirements we considered:
The solution must be scalable to support future growth.
Support availability goals.
It must be backward compatible.
Facilitate easy removal and addition of shards
Avoidance of hot spots
Minimal data rebalancing
Support fast failover
Avoid scatter-gather approach for latency sensitive queries
Figure 2. The data Distribution Scheme (Courtesy Kenneth Kang)
We employ an algorithm-based technique, where the location of a shard is determined by applying a predetermined algorithm, a hashing function in our case. As shown in figure 2, instead of directly mapping a shard key to a physical host, shard keys are mapped to a unique bucket. Each bucket is then mapped to a logical host, which is in turn mapped to a physical host. Buckets provide an important abstraction layer: they facilitate the addition and deletion of shards and make the entire design more scalable. When a new database is added, it simply involves updating the mapping of buckets (scuttles) to logical shards (assuming the new database host has the necessary data copied).
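The bucket indirection described above can be sketched as follows. This is a minimal illustration; the bucket count, hash function, and mapping tables are hypothetical, not PayPal's actual configuration, and in a real system the mappings live in metadata stores.

```python
import hashlib

NUM_BUCKETS = 128  # hypothetical bucket (scuttle) count

# Hypothetical mapping tables; updated when shards are added or removed.
bucket_to_logical = {b: b % 4 for b in range(NUM_BUCKETS)}   # 4 logical shards
logical_to_physical = {0: "db-host-a", 1: "db-host-a",
                       2: "db-host-b", 3: "db-host-b"}       # 2 physical hosts

def bucket_for(shard_key: str) -> int:
    """Deterministically map a shard key to a bucket."""
    digest = hashlib.sha256(shard_key.encode()).hexdigest()
    return int(digest, 16) % NUM_BUCKETS

def route(shard_key: str) -> str:
    """Resolve shard key -> bucket -> logical shard -> physical host."""
    return logical_to_physical[bucket_to_logical[bucket_for(shard_key)]]
```

Because the hash of a shard key never changes, adding a physical host only requires remapping some buckets to new logical shards.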
Transformations to Enable Sharding
A prerequisite to sharding is ensuring the application accessing the database is shard-ready, meaning that all access is via the shard key. The process of making an application shard-ready involves a series of transformations to the underlying data model and interfaces. One has to ensure that the following specifications are met by both the data model and the application layer.
Each table in a sharded database must have a shard key column.
The id generation process must guarantee uniqueness across all the shards.
Tables might need denormalization or other forms of transformation to support the join and write constraints below.
Joins across tables must be achieved using the same shard key. Since records are partitioned based on a shard key, rows with different shard keys may reside in different databases.
No write should span across multiple shards.
The read interfaces must be shard key based.
There are unique design challenges to address for each of the above steps. We will cover them in the rest of the article.
Application Layer Design Considerations
“A transaction is a collection of read/write operations succeeding only if all contained operations succeed. Inherently a transaction is characterized by four properties (commonly referred to as ACID): Atomicity, Consistency, Isolation and Durability”³. The changes required at the application layer mostly stem from how transactions are designed in an application. At a very basic level, each query could be considered a separate transaction (auto-commit), or a transaction could involve multiple queries spanning several tables. In a sharded environment, the distribution of data across databases poses challenges to existing transactions. Before delving into how these challenges are addressed, let's first establish what a transaction boundary is, or how we determine what a transaction entails. To help answer this question, we first define a transactional entity: a grouping of tables that participate in a transaction. Transactional entities have the following properties:
Tables belonging to the same entity are sharded based on the same shard key.
For a given entity, all tables could be joined or these tables could be part of the same transaction with the same shard key.
Consider an online store where the entities involved are customer, order, inventory, phone, address, email, shipment, and so on. Assuming customer-id to be the shard key, a coarse-grained entity would be customer; fine-grained entities would be phone, address, etc. Each type of entity is a possible candidate for a transactional entity. Following are the pros and cons of choosing a fine-grained vs. coarse-grained entity as the transactional entity:
A fine-grained entity as the transactional entity provides more flexibility and allows us to create individual shards for each. On the flip side, it may lead to breaking transactions and queries, impacting consistency and performance. In a large-scale system, it also adds maintenance overhead.
Coarse-grained entities provide better performance and consistency as they allow joins across different tables and entities, and transactions spanning beyond a single entity need not be broken. For our use case, we chose this option.
Join Across Shards
A common example where application-layer joins are required is “IN” clause queries, for example, select x where customer-id in (1,2,3,…). As individual shard keys can reside on different databases, a join across shards is not possible, so the only alternative is to issue separate queries and perform the join in the application code. If it turns out that an application requires a lot of such joins to serve data, it is most likely symptomatic of either a sub-optimal shard key or a data model that requires tuning.
Handling Failures - In case of a failure to retrieve data from one or more shards, one can choose to either fail the entire query or return partial data. The former impacts availability but provides the most accurate data; the latter compromises accuracy in favor of availability.
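As a sketch, splitting an IN-clause query per shard and merging the results in the application might look like this. The `route` and `query_shard` callables are hypothetical stand-ins for the routing layer and a single-shard query, not an actual PayPal API.

```python
from collections import defaultdict

def load_by_ids(customer_ids, route, query_shard):
    """Application-layer join for an IN-clause query: group the shard keys
    by shard, run one query per shard, and merge the results."""
    by_shard = defaultdict(list)
    for cid in customer_ids:
        by_shard[route(cid)].append(cid)
    rows = []
    for shard, ids in by_shard.items():
        # e.g. SELECT ... WHERE customer_id IN (:ids) against one shard
        rows.extend(query_shard(shard, ids))
    return rows
```

A variant of the loop that tolerates a failing shard and returns partial data would implement the availability-leaning failure-handling choice described above.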
Write Across Shards
Since writing across shards is not possible, we had to break any transaction that spanned different shards. This is a relatively complex task, as we have to account for the impact on consistency, business rule violations, and possible deadlock scenarios. Here are some common examples where breaking a transaction is necessary.
Figure 3. Two records belonging to the same table are placed in different shards.
Writes involving different shard keys but the same tables, as shown in figure 3. In this scenario, a transaction contains both the shipping address for a buyer and the warehouse address for a seller. If these addresses are placed on different shards, this leads to a write-across-shards scenario.
Writes involving different entities sharded by different shard keys, as shown in figure 4. In this scenario, a transaction encompasses both the Inventory entity and the Order entity. These two entities have different shard keys and reside on different databases after sharding. This is a violation of the transactional entity properties and requires refactoring of the transaction.
Figure 4. Two different entities sharded by different shard keys
Handling Failures - When a transaction is broken into multiple transactions, we have to account for the possibility that one of them could fail. In such a scenario, we may have only partial data in the database, affecting consistency. The possible solutions are:
All writes must be idempotent so that subsequent retries are successful.
Asynchronously fix the tables with inconsistent data.
Leave the unreferenced rows dangling and run a reconciliation job periodically.
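The first option above, idempotent writes, can be approximated by keying each insert so that re-running a partially failed multi-shard write is harmless. A toy sketch, with dicts standing in for shard tables:

```python
def idempotent_insert(shard_table, key, record):
    """Insert only if the key is absent, so a retry after a partial
    failure never duplicates or overwrites the earlier write."""
    if key in shard_table:
        return False  # already applied by an earlier attempt
    shard_table[key] = record
    return True

def write_broken_transaction(shards, writes):
    """Apply the pieces of a broken cross-shard transaction; safe to
    re-run in full after any piece fails."""
    for shard, key, record in writes:
        idempotent_insert(shards[shard], key, record)
```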
Loads by secondary key (queries with no shard key) can't be supported directly in a sharded environment, yet a system can have such queries to support various business use cases. For instance, consider a load-by-email-address scenario where a customer agent tries to load customer information based on the customer's email address. If customer-id is the shard key, this load is a secondary-key-based load.
To address such queries, we use a mapping table referred to as a global secondary index (GSI). The GSI provides a mapping from secondary key to shard key. Every secondary-key-based load first obtains the shard key from the mapping table and then performs the actual load. Since the lookup is standard and repeated for each entity, investing in a GSI framework to manage these indexes is a good approach. In the rest of this section, we discuss the design principles and failure-handling mechanisms associated with the GSI framework.
Figure 5. GSI table showing a mapping of secondary key to shard key
Application should be completely agnostic of the secondary index tables.
Addition of new secondary indexes or updates to the existing secondary indexes should not require changes in the application.
Failure handling and consistency mechanism for GSI should be isolated to the GSI framework level.
The GSI framework should contain a metadata catalog/table defining important information related to each GSI; specifically, it should contain the primary table name, GSI table name, secondary key, and shard key.
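A secondary-key load through a GSI reduces to two lookups. A minimal sketch, with dicts standing in for the GSI and shard tables (the names and `route` callable are illustrative assumptions):

```python
def load_by_email(email, gsi, shards, route):
    """Resolve email -> shard key via the GSI, then load from the right shard."""
    customer_id = gsi.get(email)          # secondary key -> shard key
    if customer_id is None:
        return None
    return shards[route(customer_id)].get(customer_id)
```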
Handling Failures and Maintaining Consistency between Primary and GSI Table
It is critical that the data between the primary and GSI tables is consistent. Data inconsistency can manifest if a record is successfully inserted in only one of the two tables. In such scenarios, the order of the insert operations determines how failures are handled.
Option 1 - A record is inserted in the primary table first and the insertion in the GSI table fails
Assuming a failure to insert in the GSI table doesn't fail the overall operation, this approach favors availability over consistency.
Such failures can be fixed asynchronously in the GSI table. If the update to the index table is delayed beyond the cache expiration time, we can get data inconsistency issues (figure 6).
This approach is relatively simple to implement, and there is no hard dependency on the secondary table in the steady state.
This method could also be used if updates are required in the GSI table.
Figure 6. Lag (t3-t2) < Cache expiration time
Option 2 - A record is inserted in the GSI table first and the write to the primary table fails
If the write to the primary table fails, it leaves a dangling row in the GSI table. As long as writes to the mapping table are idempotent, this method offers consistency over availability.
This technique works for insert operations only; if the GSI tables need updates, it doesn't work.
There needs to be a reconciliation job that periodically cleans up the dangling rows.
In order to support transaction integrity, we chose to write to the mapping table before writing to the primary table.
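The GSI-first write order we chose (Option 2) can be sketched as follows. This is a simplified illustration with dicts in place of database tables; real writes go through the database layer, and a periodic job cleans dangling GSI rows.

```python
def insert_with_gsi_first(gsi, primary, email, customer_id, record):
    """Insert the GSI mapping before the primary row. The GSI insert is
    idempotent, so retries are safe; if the primary write fails, only a
    dangling GSI row is left for reconciliation to clean up."""
    gsi.setdefault(email, customer_id)   # idempotent mapping insert
    primary[customer_id] = record        # may fail, leaving the row dangling

def reconcile(gsi, primary):
    """Periodic job: drop GSI rows whose primary row never landed."""
    for email, cid in list(gsi.items()):
        if cid not in primary:
            del gsi[email]
```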
Managing Performance in the Presence of Multiple Shard Keys
A secondary key could be associated with multiple shard keys. For example, a home phone number could be associated with all family members, yielding many shard keys (customer ids) for the same secondary key. In such cases, it is important to understand the use case. If the use case is batch processing and the number of expected shard keys is huge, redirecting queries to a second-tier desharded database is a good option. However, if the query serves live use cases where latency impacts the end user, it is best to parallelize the queries.
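For the live, latency-sensitive case, fanning out one load per shard key can be sketched with a thread pool; `load_one` is a hypothetical single-key lookup, not an actual PayPal interface.

```python
from concurrent.futures import ThreadPoolExecutor

def load_parallel(shard_keys, load_one, max_workers=8):
    """Run one lookup per shard key concurrently; results come back
    in the same order as the input keys."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(load_one, shard_keys))
```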
Challenges Associated with Data Model Transformation
Maintenance of two versions of the tables
The biggest challenge in transforming a high-volume data model is that live migrations can be extremely complex. In addition, if there are strong dependencies with downstream customers (reporting teams, business analytics teams, etc.), we need to ensure both versions of the tables are maintained until those teams are able to consume the newer version. In our implementation, we used a dual-write strategy where writes are made to the two different versions of the table in the application layer, and failures are managed asynchronously.
Complex Transformation Challenges
Denormalization of tables that take heavy traffic is a complex transformation that requires careful design and execution. In the next section, we describe some of the challenges and the ways we have successfully addressed them in our implementation.
Implementation with mixed mode in consideration - Our feature rollouts are throttled to manage risk. This translates into some servers running the older version and others running the newer version of the software, referred to as a mixed-mode scenario. Supporting this scenario adds design complexity, especially when a feature delivers changes related to denormalization of tables or any other form of data model change.
Figure 7. Implications during mixed mode scenario and possible rollback
Failures due to different cache objects - Consider a scenario where a new feature is rolled out. The feature introduces reading and writing from new tables, while the existing tables are still taking traffic. Now imagine that an insert comes via the old code path, followed by a read that comes via the new code path. The read in this scenario would fail if the cache object for the old and the new model is different and the data has not been propagated to the replica database. We address such scenarios by introducing cache objects for both versions before rolling out the new feature. Assuming denormalization doesn't involve discontinuation of columns, deriving a cache object of one version from the other is simple. The steps involved in releasing a denormalization feature are:
Introduce cache object mapping from the old to the new version. Each write operation populates both kinds of objects in the cache.
The new version of the code also writes both versions of the cache objects. The new version continues to write to the old version of the tables until the feature stabilizes. Since the data model changes are made prior to partitioning the data into multiple shards, dual writes are done in the same transaction. Dual writes to the cache and the old tables are discontinued after a few weeks.
There must be a gap between steps 1 and 2 to ensure the new cache object is available for the new version. This depends upon the life span of the cache object.
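The dual population of cache objects in the steps above can be sketched like this; the converter functions and object shapes are hypothetical stand-ins for the real model mapping.

```python
def write_with_dual_cache(table, cache, key, row, to_old, to_new):
    """On every write during the rollout, populate both cache-object
    versions so old and new code paths each find a usable cache entry."""
    table[key] = row
    cache[("v_old", key)] = to_old(row)
    cache[("v_new", key)] = to_new(row)
```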
Monitoring and Validation
Changes of this magnitude require diligence both during data migration and during feature rollouts. Multiple dimensions, such as latency, correctness, and consistency, are monitored. Here are some strategies we have in place to ensure correctness:
Data consistency checks - For data migration projects, we perform multiple levels of validation, using ora-hash validation, custom scripts, and validation of each copy of the data. To expose code-related issues, these checks are repeated after the feature has been live for a day or a week.
Logging - Data model changes can introduce subtle bugs in the code. In our experience, monitoring deviations in query patterns is very useful. Consider a scenario where a load is performed before a new record is inserted in a table; such a query legitimately returns no data. Capturing this metric can help identify issues introduced by data model changes: any significant deviation in its frequency is indicative of a bug.
Special tools - The process of uncovering bugs should be automated as far as possible. For instance, we developed a tool that compares different metrics of an application between two time instances. This tool is generic and works for any application. Post-ramp, the tool is used both for the application where the changes are introduced and for important clients of that application. Similarly, for data consistency validation, we created a daemon that verifies that the data between two data stores is consistent.
This article is a detailed technical exercise exploring how we sharded a highly business-critical application database while keeping the existing business running. As an analogy, this is akin to changing the engines on a flying plane. We believe these lessons are applicable to any organization undergoing platform scaling and will allow them to anticipate some common problems they will encounter in their journey. We welcome any feedback.
A Tokenization Tidal Wave is Coming. Are You Ready?
When PayPal first began, shopping online was in its infancy. The value proposition of security and convenience, coupled with the ability to sell to people around the world, enabled PayPal to become one of the most trusted brands in online shopping.
With the rise of mobile, the commerce landscape is quickly evolving — getting better, faster and smarter. Mobile and connected devices are creating unique monetization models, engaging customer experiences and new data flows — enabling businesses and consumers to interact in new ways. This however, creates new challenges that need to be solved in order to continue to enable seamless and secure commerce.
The underlying reason that still connects consumers and businesses is payments — it’s what motivates businesses to create amazing experiences for consumers to enjoy. Without a seamless and secure payment platform consumers may not feel comfortable transacting on a site, no matter how amazing the experience is.
As with any new change, there is a need for existing infrastructure to make way for new technology. For decades, the card number (also known as PAN) has been used for commerce. Because of evolving payment experiences, potential security risks and the continued sophistication of fraudsters — we need to evolve as an industry and stay ahead to continue to deliver a convenient and safe commerce experience.
For example, if a consumer’s card data is compromised, it can have negative consequences for both the consumer and the business. Often, a consumer has to go through the hassle of ordering a new card, waiting for the card to arrive and then updating their card information across all the sites where they shop. Additionally, consumers who have recurring billing set up may miss payments, or incur additional fees due to missed payments.
For businesses, if a customer’s card is blocked for reasons like fraud, they have to decline the transaction, resulting in a bad experience for their customer and lost sales. Even if the business had implemented the best risk and fraud tools, a card compromise at another merchant could impact their business.
Tokenization, a concept introduced by the networks as part of the EMVCo framework, in partnership with the larger ecosystem, helps address many of these issues. And we see a great future there. PayPal provides a simple API interface that allows businesses to leverage tokenization as one of the many benefits of integrating into PayPal’s payment platform.
In the most simplistic terms, tokenization creates an alternate card number for your actual card number (or PAN, as referenced above), which is shared with merchants and thus limits access to a customer’s financial information. Coupled with other concepts like step-up authentication, contextual data of the transaction, and cryptography, tokenization makes the payment experience more secure. As we are seeing in the in-store world, replacement of the traditional magnetic swipe with EMV chip cards has reduced fraud, but it is also driving fraudsters to target consumers in other ways. Tokenization enables the ecosystem to be more secure, create richer experiences, and process payments more efficiently.
Tokenization has the potential to be a win-win for all players.
For consumers, it creates more engaging experiences, where consumers can see the actual credit card art instead of deciding which card to use based on the last four numbers or nicknames of cards. It also adds another layer of protection so consumers know that there are multiple layers in place to protect their information. Finally, if something happens to a consumer’s card, or if it expires, banks can seamlessly replace the token across all relevant businesses without the consumer needing to update their card.
For businesses, they may see higher approval rates as cards are refreshed as part of the life cycle events and banks are able to make better risk decisions due to contextual data.
For the payments industry and banks specifically, it enables improved presentment of their brand to create familiarity for the customer. It also helps manage losses and improves the approval rate of transactions due to better visibility into transactions.
As with any new technology, transition to and adoption of Tokenization is complex. At PayPal and Braintree, we can tokenize cards to help keep payment information accurate, show card art to consumers to create better experiences and improve the transaction approval rates. We are excited about the security that tokenization provides and believe it will ultimately be as ubiquitous as PAN. The PAN is still relevant as it helps with many critical payment operations like risk management, which has been built over the last several decades.
Of course, this standardization comes with partnership. We talk with our partners to understand systems and processes that may have been built over the years to optimize to their needs. Let us know how we can help you with yours. If you want to learn more or share ideas, we would love to listen: BraintreeExtend@getbraintree.com.
Major web browser providers have begun the process to reduce and then remove trust for Symantec root certificates. These changes will impact the Symantec root certificates currently used to sign PayPal’s TLS certificates for www.paypal.com and www.paypalobjects.com. We are working to update the certificates for these endpoints using trusted DigiCert root certificates. PayPal intends to implement these changes before browser providers remove trust for Symantec root certificates. To ensure there is no impact to your PayPal services, please make sure that you are using the most recent, non-beta web browser version available to access www.paypal.com or www.paypalobjects.com.
See the following links for more information about changes that are happening in the browsers:
Many women are tasked with leading large-scale, high-impact technical initiatives. They struggle with the organizational and technical complexity of these initiatives. Coordinating people, teams, and priorities while maintaining work-life balance is a challenge. Some shy away from the opportunity due to the above challenges.
The focus of this article is to empower these women by providing strategies that will enable them to lead with confidence.
There are many wins when one embraces the opportunity. One’s overall confidence multiplies as challenges are successfully handled. My goal is to instill confidence in participants so they can make success a habit and advance in their careers.
1. Are you stepping up to lead complex initiatives?
Most of us lean towards opportunities where we have the comfort of domain knowledge and the opportunity to work with people whom we are familiar with. We tend to shy away from large-scale initiatives that are technically challenging, involve multiple teams and require us to own the end to end responsibility.
Originally, I was one of those people who walked away from many such opportunities. At one point, I thought through the reasons why I was not stepping up. Some of those reasons were the reluctance to take responsibility for others, uncertainty about handling continuous production issues and the necessity of 24*7 availability as a working mother. Added to this was the intimidation of having to deal with brilliant people who “knew it all”. Eventually, I stepped up and courageously took on a few opportunities.
These above-mentioned challenges exist in different forms. I started looking at them through a different lens. What is the big picture? What are we trying to accomplish? How do we enable others while we try to win? How do we collaborate? How do we spread accountability across the team? The following sections describe some of the strategies for dealing with these challenges.
2. What are we trying to accomplish?
Many times we get bogged down by mundane day-to-day responsibilities. Having clarity on what we are trying to accomplish is important. Maintaining the vision of the end goal is essential to the success of the project. As the project evolves, the goals and vision may change and expand. We should embrace flexibility in adapting to these changes.
3. Who are our customers?
We should have a deep understanding of who our customers are, both internally and externally. We need to ask, “Will this initiative adversely impact their current experience or will it be seamless to them?”. It is important to be customer focused and proactively partner with them.
4. How do we keep all stakeholders informed?
Consistent communication is key to success. Each initiative usually has stakeholders from engineering, product, program, and leadership. These days most of the teams are globally spread out. How do you keep all of these stakeholders in the loop?
Here are some things to consider:
How often do the different stakeholders need progress updates?
How do we highlight initiative wins?
Should there be a constant flow of information?
How do we communicate decisions made in product and technical design?
Having an established communication rhythm is important for success.
5. Who are your champions for this initiative?
When taking responsibility for a highly complex and critical initiative, you should also be aware of the organization structure and all those invested in the overall success. Work with invested parties on becoming your sponsors and keep them updated on risks and challenges. Projects of this scale thrive with the support of leadership and stakeholders.
6. How do we win as a single team?
Multiple teams are involved in complex initiatives. How do we win as a team? Strong partnership and collaboration with all those involved are critical to the success of projects of this scale.
7. What are the potential bottlenecks?
There are numerous things that can impact the success of an initiative. Perform due diligence to identify potential risks, both technical and non-technical, and come up with a plan to mitigate them. Some of the technical challenges include capacity, scalability, and availability of the infrastructure. Some of the non-technical risks include key folks leaving, personal emergencies, legal approvals, etc.
8. How do we handle setbacks?
Setbacks are part of large-scale initiatives. In the event of a setback, leaders must step forward and reinforce the morale of the team. It is important to celebrate every win — big or small. It is also important to embrace failures as an opportunity to learn.
9. How do we execute?
The key is to execute well every day and embrace a disciplined approach. On the engineering side, this includes clarity in design, development, test cases, continuous integration, release plan, production monitoring, and insights. It is important to consider the security, availability, scalability, reliability, maintainability, extensibility, and performance aspects of the infrastructure. We should invest in quality, monitoring, and leverage product and technical insights to understand the system behavior.
10. Can you develop a passion for what you do?
It is important to enjoy what you do while you try to balance the many things in your life. When you bring passion to the table, it is contagious. Your passion will be a catalyst for igniting passion in others and help the team rally to success.
I want to encourage women to lead and drive strategic initiatives with confidence. The main takeaways are:
Do not shy away from opportunities
Establish clarity of vision
Focus on your customer
Effective communication is key
Invest in partnership and collaboration
Understand the potential risks to success and develop plans to mitigate those risks
Overcome temporary setbacks
Strategic and disciplined execution wins
Bring passion to the table
Srilatha has 17+ years of technical experience and 7+ years of leadership experience working with large e-commerce companies and small start-ups. She has successfully delivered several end-to-end initiatives playing different roles — lead for multiple domains and technical manager responsible for highly critical initiatives. These include high-volume systems where she built teams and mentored both engineers and professionals. One of the critical projects she worked on in PayPal has been published as a blog post here.
To know more about Srilatha’s work do check out her profile here or on LinkedIn!
We read a lot about scaling applications and databases in systems design. Our team went through a similar journey in scaling a high traffic database at PayPal.
The primary purpose of this article is to share our learnings in building a system that scales with minimum downtime and manageable infrastructure expense.
To handle the growth in traffic, we initially scaled vertically by adding more compute and memory resources. The volume of requests kept growing over the years, making it harder to scale the infrastructure vertically without significant downtime. The cost of infrastructure for vertical scaling to keep up with traffic growth was also very high.
Later, we introduced Read-Only (RO) replicas as another attempt at scaling. This solved some of the immediate concerns but introduced other complexities. Many of our use cases need to read data immediately after it is written to the Source of Record (SOR), aka “read your own writes”. As reads were served by the ROs and there was latency in replicating data from the SOR to the ROs, we introduced caching to solve the “read your own writes” use case. Caching made the code extremely complex and hard to maintain.
We took on a journey to horizontally scale the database, and this involved work at different layers of the technical stack. We will walk through some of the key constraints, the design, the execution, and our learnings.
Any design to solve the data scaling had to satisfy some basic constraints:
All existing use-cases should be supported.
There should be minimal changes to the application layer; the design should shield the application layer from the scaling logic.
There should be minimal rebalancing of data, an ability to fail over quickly, and a foundation for geo-distribution.
The design should not impact current SLAs, including latency and other performance metrics.
The changes should be limited to the server side, since there are many clients with various technology stacks and programming languages.
The constraints can be summarized into some key principles the design had to adhere to: Availability, Scalability, Performance, Data Integrity, Simplicity, Flexibility, Manageability and Maintainability.
To address traffic growth with linear growth in infrastructure, we have to scale horizontally. Horizontal scaling is about adding more machines to the pool of resources. In the database world, horizontal scaling is often based on partitioning of the data, i.e., each node contains only part of the data.
The approach we took is horizontal scaling by sharding the data across multiple physical databases. The sharding logic is implemented as a multi-level architecture: the shard key is hashed and mapped into K buckets; a bucket is mapped to one of M logical shards using a database table; a logical shard is mapped to one of N physical shards. The multi-level architecture with indirection enables future additions to the physical infrastructure with minimal disruption to ongoing transactions.
Choosing the shard key is one of the first steps in the design; it involves understanding the current usage patterns and the future usage of the system.
Global Secondary Index (GSI)
This handles use cases where clients do not have the shard key. The GSI does the reverse lookup: given some other id, we can look up the shard key.
Logical and Physical Sharding
The data is divided into K buckets, which gives us the capability to have K logical shards. The shard id is calculated as a hash: shard id = (hash of shard key) % K
In a non-sharded environment, sequence numbers were obtained from the SOR databases. In a sharded environment, if the sequence number for an ID is obtained from a per-database sequence, it can result in ID collisions. We used a global ID service to guarantee a unique identifier.
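One common way such a global ID service avoids collisions is to compose each ID from a timestamp, a node/generator id, and a local sequence. A toy sketch of that idea; the bit layout here is hypothetical, not PayPal's actual scheme:

```python
import itertools
import time

class GlobalIdGenerator:
    """Issue IDs unique across shards without a shared database sequence."""

    def __init__(self, node_id: int):
        self.node_id = node_id & 0x3FF      # 10 bits identify the generator
        self.seq = itertools.count()        # local per-generator sequence

    def next_id(self) -> int:
        millis = int(time.time() * 1000)    # timestamp occupies the high bits
        return (millis << 22) | (self.node_id << 12) | (next(self.seq) & 0xFFF)
```

Even if two shards generate IDs in the same millisecond, the distinct node ids keep the results unique.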
We should keep the application logic simple and independent of the sharding logic. We should always keep the end state in mind.
We need to account for mixed mode. Mixed mode means some of the machines run the current version (N) of the code and some run the newer version (N+1). This happens as new code is being rolled out to the site.
We should not expose internal ids to clients. In our case, the IDs were stored in multiple systems, both upstream and downstream, and changing to a 64-bit global ID made it very hard to identify all the places where these IDs were persisted.
We should not tightly couple domain data model with downstream enterprise data systems.
We should explore long-term approach rather than short-term workaround.
We should have a detailed and optimized wire-on strategy.
We should have a way to test infrastructure changes in production before wiring on features. It is important to understand the current topology and figure out a way to test access to the new pool, to catch any DNS typos or network connection issues ahead of time.
We should make sure that we test for load and performance for both wire-on and wire-off path.
The foundation for execution is understanding the ecosystem including customers, complexities, dependencies, use-cases and potential impact due to failures. Many teams worked together to enable this effort.
We have to make the data partition-ready. The application work includes de-normalizing tables, breaking up transactions and joins, preparing the SQL for sharding, creating the global secondary indexes and writing migration scripts for the data.
The logical sharding work includes auto-discovering the shard key, SQL rewriting (including shard key injection), routing based on the shard key and providing visibility into violations. The application sees only a single target; the shards are transparent to it.
The work on database includes adding the shard key on all tables, data migration, data integrity and ensuring all the databases and infrastructure are in place. A significant portion of the work is to automate data movement across shards.
The work also includes ensuring data flows between upstream and downstream systems. This includes coordinating with downstream partners to ensure all the schema changes are completed with no impact to downstream processing.
There were incremental releases throughout the two years and a lot of partnership with various teams including Release engineering and Site operations.
This was an intensive project with many complex moving parts and processes. This section covers the key execution lessons we learned from delivering this critical project successfully.
Projects of this scale cannot happen without the leadership vision, sponsorship and support.
It is important to have a small core team. Having architecture huddles is important for closure on design decisions. Decision making authority should be clear to avoid any ambiguity.
Partners and Collaboration
Strong collaboration with all dependent teams is critical for the success of a project of this scale.
Consistent communication is key to success. Having a communication rhythm is important, as is clarity on the end state, projecting the timeline and constantly course-correcting.
Build on Wins
Momentum and early first wins are important. For a large schema consisting of multiple entities, we sharded an entity to start with. It helped us build the confidence to take sharding forward for the rest of the entities.
Embrace failures and learn from them.
Have a motivated team that believes and is passionate about the initiative. It is important to celebrate every win — big or small.
Invest in Quality and Monitoring so you can release with confidence. Having a risk mitigation plan helps identify potential points of failure.
We need to make sure that the customer is not impacted. Single-server testing and a slow ramp ensured that customers were not impacted.
Scaling a large scale legacy system has been a very exciting and collaborative journey with a lot of learnings. This article deconstructs the design of the solution, the steps and the support required for successful execution of one such undertaking at PayPal.
NameNode Analytics: PayPal’s Big Data Guardian
(NameNode Analytics logo contributed by Christina Westbrook.)
It is no secret that the volume of data is exploding and the challenge of properly storing and maintaining that data has not gone away.
One primary solution for storing and managing a vast volume of data is HDFS, the Hadoop Distributed File System. This central component is responsible for maintaining the data of the Hadoop platform. Once at scale, however, it can be very difficult to get a breakdown of what has been stored in HDFS.
In most cases, users of large multi-tenant HDFS instances rely on email broadcasts, alerting systems, or command-line utilities to scan directories and understand their own datasets in terms of capacity. Without an efficient way to get a quick view and breakdown of usage by user and directory, administrators of such large HDFS instances also struggle to keep file counts or capacities within stable bounds when no one is actively maintaining a retention policy.
(This is a graph of 1 year's worth of file growth on one of our largest production Hadoop clusters, culminating in 100M+ new files in 2017. ~400M files is the current NameNode scalability limit.)
One major issue we have seen with large HDFS instances is the difficulty in performing large and complicated scans of the entire file system.
There are several architectures deployed to do this. The most common is to parse the FsImage, a binary file that represents all the metadata stored in the HDFS instance. Everything needed to perform this parsing, namely the OfflineImageViewer component, comes standard with any flavor of Apache HDFS. This is the architecture that had been deployed at PayPal ever since Hadoop was first brought in. We would then ship the processed FsImage to Kibana or Elasticsearch and from there generate nightly usage reports and email them out. While it worked, in most cases context was lost or users checked their reports far too late.
Processing the FsImage also has its own issue: it actually loses certain metadata attributes, because parsing the image into plaintext strips some metadata along the way. Examples of lost metadata include extended attributes, delegation tokens, and storage policies.
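As a concrete sketch of the parsing step in that pipeline, here is a minimal aggregation over the tab-separated output of the OfflineImageViewer's Delimited processor (`hdfs oiv -p Delimited`). The exact column names vary by Hadoop version, so treat the `UserName` header here as an assumption and verify against your own dump:

```python
import csv
from collections import Counter

def files_per_user(delimited_path: str) -> Counter:
    """Count files per owner from an `hdfs oiv -p Delimited` dump.

    Assumes a tab-separated file with a header row that includes a
    'UserName' column; check the header of your Hadoop version's output.
    """
    counts = Counter()
    with open(delimited_path, newline="") as f:
        reader = csv.DictReader(f, delimiter="\t")
        for row in reader:
            counts[row["UserName"]] += 1
    return counts
```

This is the kind of batch aggregation that the nightly-report architecture runs over the full image, which is why it inherits the multi-hour FsImage production and shipping latency.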
(Architecture graph of processing an FsImage from a large (300M+ files) HDFS cluster. 3 minutes for edit log tailing. 60 minutes to produce and ship image. 30 minutes to parse and ship image once more.)
As shown above, the act of fetching and parsing the FsImage can be a long process. I have personally clocked an end-to-end processing time of anywhere between 1.5 to 2 hours to generate a nightly report. Keep in mind it is unrealistic to publish a report every couple of hours using this architecture because of its reliance on the active HDFS cluster. On a large HDFS instance, an FsImage is typically produced every 3 to 4 hours. Even if an FsImage were produced every hour, we risk overwhelming the network when processing it because transferring the FsImage across hosts is a major network event that affects NameNode performance to a measurable extent.
As part of the Hadoop Platform team at PayPal, we looked externally for a long time but were not satisfied with the solutions we found on the market. Part of the difficulty was that all of them were deeply tied to the previous architecture pictured above, and thus suffered from the same long processing times. We still had a strong desire for a solution, because data growth spikes at PayPal were happening within a window as small as a single day.
Thus we looked instead at creating a new, faster architecture internally, and that's how NameNode Analytics (NNA) was born.
(Architecture graph of NNA. Stays up-to-date tailing EditLog from JournalNodes; which can handle many readers. Only needs to talk to cluster NameNode once for bootstrap. Only latency is the ~3 minutes of EditLog tailing.)
Traditionally, performing this processing required pulling the FsImage out of the active HDFS cluster. Through my years of work with the HDFS architecture, however, I discovered that by utilizing an isolated NameNode, just for the purpose of getting up-to-the-minute metadata updates, we were able to create a system that generates vastly superior reports for end users and targeted directories, all without relying on critical pieces of the active HDFS cluster.
The first trick was to keep our isolated NameNode up-to-date through JournalNode EditLog tailing; the same process that keeps the Standby NameNode of the active HDFS cluster up-to-date. The last trick was to provide a query engine on top that could facilitate the fast traversal of the entire in-memory metadata set.
Through our efforts we found we could now generate metadata-enriched reports every 30 minutes on our largest HDFS installations, and perform near-real-time queries every 3 minutes, or as fast as EditLog tailing would allow; something we could not achieve before without severely impacting HDFS and Hadoop job performance.
Perhaps the greatest result from utilizing NNA has been the ability to offload users’ large metadata scans (which typically utilize expensive calls and can affect job performance) away from the Active NameNode and instead onto NNA.
In short, NNA has sped up our reporting intervals, offloaded expensive metadata scans, opened up an interactive query API for HDFS, and overall provided a better HDFS experience for end-users and administrators. Some example screenshots / queries are below:
Screenshots of the main NNA Status Page:
(The top half of the main NNA status page, showcasing a breakdown of files and directories and offering a way, via the search bar in the top right, to break down those numbers by user.)
(The bottom half of the main NNA status page, showing the main breakdown of tiny, small, medium, large, and old files, and directories with quotas.)
Histogram breakdown of File Types on the cluster:
(Near real time query showing the breakdown of different type of files on the filesystem. Y-axis is number of files; X-axis is the file type.)
Histogram breakdown of File Replication Factors on the cluster:
(Near real time query showing the breakdown of different levels of file replication factors on the filesystem. Y-axis is number of files; X-axis is the replication factor buckets.)
Histogram breakdown of File Storage Policies on the cluster:
(Near real time query showing the breakdown of different storage policies of files on the filesystem. Y-axis is number of files; X-axis is the storage policies.)
Many, many more types of queries, histograms, summations, and searches are possible. Check the documentation at the bottom for more info.
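Those histograms are served over a simple REST API. The helper below builds a query URL for NNA's `/histogram` endpoint; the endpoint and parameter names (`set`, `type`, `sum`, `filters`) follow the NameNode Analytics documentation, but treat the exact spelling as an assumption and check the NNA docs for your version:

```python
from urllib.parse import urlencode

def nna_histogram_url(base: str, set_name: str, hist_type: str,
                      sum_field: str = "count", filters: str = "") -> str:
    """Build an NNA /histogram query URL.

    Example: a file-type breakdown over all files would use
    set_name="files" and hist_type="fileType".
    """
    params = {"set": set_name, "type": hist_type, "sum": sum_field}
    if filters:
        params["filters"] = filters
    return f"{base}/histogram?{urlencode(params)}"
```

A hypothetical call such as `nna_histogram_url("http://nna.example.com:8080", "files", "user")` would then be fetched with any HTTP client to produce a per-user count, like the charts above.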
Since this information can be obtained with relative ease, we even trend it internally in a time-series database and can now see exactly how users are behaving. If you are curious about all the different things and ways you can query HDFS using NameNode Analytics, I highly encourage you to watch my talk or read my slides, available at the DataWorks Summit 2018 session page: https://dataworkssummit.com/san-jose-2018/session/namenode-analytics-scouting-the-hdfs-metadata/.
There are many more things NNA can do than I can list here. We have several stories where the level of insight NNA provides has saved us at PayPal. I highly encourage everyone to read our “Story Time!” slides! I will briefly summarize two of the stories below.
One major and scary incident was when we reached the peak scalability limit of one of our NameNodes. Recall earlier that I said 400M+ files was the NameNode peak before Java Garbage Collection starts to thrash the NameNode.
With NNA by our side we were able to target directories for removal quickly — and we didn’t even have to communicate with the active HDFS cluster to find them. Here is what the event looked like, with file counts on the top, and Java Garbage Collection times below:
(File count (red) and block count (orange) of our largest production Hadoop cluster. I know it's hard to read, but at peak we had 400M files and 425M blocks.)
(Java GC during this event; NameNode operations were grinding to a halt. This incident was so severe that we risked a multi-hour HDFS outage if it was not addressed while the NameNodes were still alive and able to process delete operations.)
Another event was when we were hitting an HDFS performance bug and had to change storage policies on several thousand directories in order to remediate. We had no list of which directories had the bug, and running a recursive “list status and grep” operation across the entire file system would have taken days. With NNA, we got the list in about 2 minutes and were able to target exactly the directories we needed, restoring RPC performance in about an hour.
(The difference in client response times before and after we got away from the storage policy bug.)
Thank you for taking the time to read this far!
There is a lot to cover when it comes to talking about NNA. Please look out for more news / talks from me in the near future!
If you operate a large HDFS installation and are still parsing FsImages then NNA should be a tool in your arsenal.
Plamen has been in the Hadoop field for 7 years at the time of writing. He has been a Senior Hadoop Engineer at PayPal since April 19th, 2016 and has worked on HDFS and HDFS-related projects for nearly all of those 7 years. He is an Apache Hadoop contributor, most notably as part of the team that introduced truncate (https://issues.apache.org/jira/browse/HDFS-3107), part of the team at WANdisco that worked on making HDFS a unified geographically distributed filesystem (https://patents.google.com/patent/US20150278244), and part of the team working on distributing the NameNode across HBase RegionServers (https://github.com/GiraffaFS/giraffa). He loves to talk HDFS, beer, crypto, and Melee. GitHub: zero45, Apache: zero45