LinkedIn: Creating a Low Latency Change Data Capture System with Databus

This is a guest post by Siddharth Anand, a senior member of LinkedIn's Distributed Data Systems team.

Over the past 3 years, I've had the good fortune to work with many emerging NoSQL products in the context of supporting the needs of a high-traffic, customer-facing web site.

In 2010, I helped Netflix successfully transition its web-scale use-cases from Oracle to SimpleDB, AWS' hosted database service. On completion of that migration, we started a second migration, this time from SimpleDB to Cassandra. The first transition was key to our move from our own data center to AWS' cloud. The second was key to our expansion from one AWS Region to multiple geographically-distributed Regions -- today Netflix serves traffic out of two AWS Regions, one in Virginia, the other in Ireland (F1). Both of these transitions have been successful, but have involved integration pain points such as the creation of database replication technology.

In December 2011, I moved to LinkedIn's Distributed Data Systems (DDS) team. DDS develops data infrastructure, including but not limited to, NoSQL databases and data replication systems. LinkedIn, no stranger to building and open-sourcing innovative projects, is doubling down on NoSQL to accelerate its business -- DDS is developing a new NoSQL database called Espresso (R1), a topic for a future post.

Having observed two high-traffic web companies solve similar problems, I cannot help but notice a set of wheel-reinventions. Some of these problems are difficult and it is truly unfortunate for each company to solve its problems separately. At the same time, each company has had to solve these problems due to an absence of a reliable open-source alternative. This clearly has implications for an industry dominated by fast-moving start-ups that cannot build 50-person infrastructure development teams or dedicate months away from building features.

Change Data Capture Systems

Today, I'd like to focus on one such wheel-reinvention: Change Data Capture systems.

Relational databases have been around for a long time and have become a trusted storage medium for all of a company's data; in other words, the relational database is the source of truth for a company's business-critical data. Oftentimes, data is pulled off this primary data store, transformed, and then stored in a secondary data store, such as a data warehouse. This secondary store typically supports the data analytics that drive business insights and direction. In this scheme, the primary and secondary stores are known as the OLTP store and the OLAP store, respectively. (F2)

All of this has been around for decades, so what is new? Increasingly, data from the primary store is used to feed more than just business decisions. At LinkedIn, it also feeds real-time search indexes, real-time network graph indexes, cache coherency, Database Read Replicas, etc... These are examples of LinkedIn's near-real-time data needs.

If you have ever worked in the area of transferring data from the primary store to secondary stores, you are no doubt familiar with the options available to you.

For example, if you are working in the OLTP-to-OLAP space, you are using some sort of ETL (Extract, Transform, and Load) technology. This space has seen innovation around tooling (e.g. making it easy to define a transformation using GUI drag-and-drop tools) and cross-vendor integration (e.g. Oracle to Teradata, Aster Data, etc...). The industry typically uses ETL to run nightly jobs to give executives a view of the previous day's, week's, month's, or year's business performance.

What do you do if you need a stream of near-real-time updates from your primary data store, as shown below for LinkedIn's near-real-time needs?

[Figure: Databus use cases]

Outside of costly and proprietary vendor-specific products, there are few options.

Introducing Databus

Databus is an innovative solution in this space.

It offers the following features (a consumer-side sketch of these semantics follows the list):

  • Pub-sub semantics
  • In-commit-order delivery guarantees
  • Commits at the source are grouped by transaction
    • ACID semantics are preserved through the entire pipeline
  • Supports partitioning of streams
    • Ordering guarantees are then per partition
  • Like other messaging systems, offers very low latency consumption for recently-published messages
  • Unlike other messaging systems, offers arbitrarily-long look-back with no impact to the source
  • High Availability and Reliability
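
To make these guarantees concrete, below is a minimal consumer-side sketch in Java. The types and names (ChangeEvent, TransactionBatch, ChangeConsumer) are hypothetical illustrations of the semantics listed above; they are not the actual Databus client API.

    // Hypothetical sketch of the consumption semantics listed above.
    // These types are illustrative only -- they are not the Databus client API.
    import java.util.List;

    /** A single captured row change, in the compact serialized form the Relay stores. */
    final class ChangeEvent {
        final String table;   // source table the change came from
        final String key;     // primary key of the changed row
        final long scn;       // commit sequence number assigned by the source database
        final byte[] payload; // compact serialized row image (e.g. Avro)

        ChangeEvent(String table, String key, long scn, byte[] payload) {
            this.table = table;
            this.key = key;
            this.scn = scn;
            this.payload = payload;
        }
    }

    /** Changes that committed together at the source are delivered together. */
    final class TransactionBatch {
        final long commitScn;           // shared commit point for the whole transaction
        final List<ChangeEvent> events; // in the order they were written at the source

        TransactionBatch(long commitScn, List<ChangeEvent> events) {
            this.commitScn = commitScn;
            this.events = events;
        }
    }

    /** Pub-sub callback: invoked once per source transaction, in commit order per partition. */
    interface ChangeConsumer {
        void onTransaction(TransactionBatch batch);
    }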

How Does Databus Work?

Databus is composed of 3 important pieces:

  • Relays
  • Bootstrap
  • Client Library

The Databus architecture is shown in the images below. A Databus Relay pulls recently committed transactions from the source database (e.g. Oracle, MySQL, etc...) (Step 1). The Relay deserializes this data into a compact form (e.g. Avro) and stores the result in a circular in-memory buffer. Clients (subscribers) listening for events pull recent online changes as they appear in the Relay (Step 2). A Bootstrap component also listens for online changes as they appear in the Relay (Step 3).

[Figure: Databus operation]
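
As a rough illustration of the Relay's circular in-memory buffer, the sketch below shows a bounded ring of change events indexed by a monotonically increasing sequence. The names are hypothetical and it reuses the ChangeEvent type from the earlier sketch; this is not LinkedIn's implementation. Once the ring wraps, the oldest changes are no longer available from the Relay, which is exactly the situation the Bootstrap component exists to handle.

    // Minimal sketch of a relay-style circular buffer of change events.
    // Names are hypothetical; real relays index by the source's commit SCN,
    // here simplified to a dense per-relay sequence number.
    import java.util.ArrayList;
    import java.util.List;

    final class RelayBuffer {
        private final ChangeEvent[] ring; // fixed-size in-memory window
        private long nextSeq = 0;         // total number of events ever appended

        RelayBuffer(int capacity) {
            this.ring = new ChangeEvent[capacity];
        }

        /** Append a newly captured change; overwrites the oldest slot when full. */
        synchronized void append(ChangeEvent event) {
            ring[(int) (nextSeq % ring.length)] = event;
            nextSeq++;
        }

        /** Oldest sequence number still held in memory. */
        synchronized long oldestAvailable() {
            return Math.max(0, nextSeq - ring.length);
        }

        /**
         * Return all buffered events with sequence >= fromSeq, or null if the
         * caller has fallen behind the window and must consult the Bootstrap
         * component instead.
         */
        synchronized List<ChangeEvent> pullSince(long fromSeq) {
            if (fromSeq < oldestAvailable()) {
                return null;
            }
            List<ChangeEvent> out = new ArrayList<>();
            for (long seq = fromSeq; seq < nextSeq; seq++) {
                out.add(ring[(int) (seq % ring.length)]);
            }
            return out;
        }
    }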

If a Subscriber falls behind such that the data it requests is no longer in the Relay's in-memory buffer, it can request Consolidated Deltas covering everything since a time T in the past (Step 4). This returns an efficient representation of all changes that have occurred since time T.

[Figure: Databus operation]
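
One way to picture a Consolidated Delta: instead of replaying every individual change since time T, only the most recent image of each changed row is kept, so the amount of data is bounded by the number of rows touched rather than the number of changes. A small sketch of that collapse, with hypothetical names and the ChangeEvent type from above:

    // Sketch of delta consolidation: collapse a change log into the latest
    // image per row key. Illustrative only, not the Databus implementation.
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class DeltaConsolidator {
        /**
         * Events are assumed to arrive in commit order, so a later change to
         * the same key simply overwrites the earlier one.
         */
        static Map<String, ChangeEvent> consolidate(List<ChangeEvent> changesSinceT) {
            Map<String, ChangeEvent> latestByKey = new LinkedHashMap<>();
            for (ChangeEvent e : changesSinceT) {
                latestByKey.put(e.key, e);
            }
            return latestByKey;
        }
    }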

If a new Subscriber, one with no prior knowledge of the dataset, were to join the party, it would need to fully bootstrap. At first, the Subscriber's Databus Client library would request a Consistent Snapshot at some time T in the past (Step 5). The client library would then request Consolidated Deltas since that time T (Step 6). After the Subscriber applies the Consolidated Deltas, the client library would switch to listening for online changes from the Relay (Step 7). The client library helps the Subscriber get all changes since time T, where T can be any arbitrary point in time, shielding the Subscriber from the details of where the changes come from.

[Figure: Databus operation]
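
Putting the pieces together, the decision the client library makes on behalf of a subscriber might look roughly like the sketch below. The interfaces and method names are hypothetical (the real client library hides all of this behind its own API), and it reuses the ChangeEvent and ChangeConsumer types from the earlier sketches.

    // Sketch of the client library's catch-up decision; all names are hypothetical.
    import java.util.List;

    interface RelaySource {
        /** Buffered events with sequence >= fromSeq, or null if they have aged out. */
        List<ChangeEvent> pullSince(long fromSeq);
    }

    interface BootstrapSource {
        /** Stream a Consistent Snapshot taken at some time T; returns T's sequence number. */
        long streamSnapshot(ChangeConsumer consumer);

        /** Stream Consolidated Deltas accumulated since sinceSeq; returns the last sequence streamed. */
        long streamConsolidatedDeltas(long sinceSeq, ChangeConsumer consumer);
    }

    final class ClientCatchUp {
        /** Returns the sequence number from which online consumption from the Relay resumes. */
        static long catchUp(long lastProcessedSeq, RelaySource relay,
                            BootstrapSource bootstrap, ChangeConsumer consumer) {
            // Step 2: the happy path -- the Relay still holds everything we need,
            // so the caller can stream online changes starting at this sequence.
            if (lastProcessedSeq >= 0 && relay.pullSince(lastProcessedSeq + 1) != null) {
                return lastProcessedSeq + 1;
            }
            long sinceSeq = lastProcessedSeq;
            if (sinceSeq < 0) {
                // Step 5: a brand-new subscriber first takes a Consistent Snapshot at time T.
                sinceSeq = bootstrap.streamSnapshot(consumer);
            }
            // Step 4 / Step 6: apply Consolidated Deltas accumulated since that point.
            long caughtUpSeq = bootstrap.streamConsolidatedDeltas(sinceSeq, consumer);
            // Step 7: switch back to online changes from the Relay.
            return caughtUpSeq + 1;
        }
    }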

Databus' Bootstrap Component

One of the most innovative features of Databus is its Bootstrap component. Change Data Capture systems have existed for a long time (e.g. Oracle Streams). However, all of these systems put load on the primary data store when a consumer falls behind.

Bootstrapping a brand new consumer is another problem. It typically involves a very manual process -- i.e. restore the previous night's snapshot on a temporary Oracle instance, transform the data and transfer it to the consumer, then apply changes since the snapshot, etc...

Databus' Bootstrap component handles both of the above use-cases in a seamless, automated fashion.

How Does Databus' Bootstrap Component Work?

The Databus Bootstrap component is made up of two types of storage: Log Storage and Snapshot Storage. Log Storage serves Consolidated Deltas, whereas Snapshot Storage serves Consistent Snapshots.

[Figure: Databus Bootstrap component]
  1. As shown earlier, the Bootstrap component listens for online changes as they occur in the Relay. A LogWriter appends these changes to Log Storage.
  2. A Log Applier applies recent operations in Log Storage to Snapshot Storage.
  3. If a new subscriber connects to Databus, the subscriber will bootstrap from the Bootstrap Server running inside the Bootstrap component.
  4. The client will first get a Consistent Snapshot from Snapshot Storage.
  5. The client will then get outstanding Consolidated Deltas from Log Storage.
  6. Once the client has caught up to within the Relay's in-memory buffer window, the client will switch to reading from the Relay (a rough sketch of the two stores follows this list).
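
Below is a rough sketch of the division of labor between those two stores and the LogWriter/LogApplier roles. The class and method names are hypothetical (reusing the ChangeEvent type from the earlier sketch), everything is kept in memory for brevity, and the real component would persist both stores rather than hold them in memory.

    // Sketch of the Bootstrap component's two stores. The point is the split
    // between Log Storage (Consolidated Deltas) and Snapshot Storage
    // (Consistent Snapshots); this is not LinkedIn's implementation.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class BootstrapStores {
        // Log Storage: every change, in commit order.
        private final List<ChangeEvent> log = new ArrayList<>();
        // Snapshot Storage: the latest row image per key.
        private final Map<String, ChangeEvent> snapshot = new HashMap<>();
        private long snapshotHighWaterMark = -1; // last sequence folded into the snapshot

        /** Step 1 (LogWriter): append online changes as they arrive from the Relay. */
        synchronized void logWrite(ChangeEvent event) {
            log.add(event);
        }

        /** Step 2 (LogApplier): fold recent log entries into Snapshot Storage. */
        synchronized void applyLog() {
            for (ChangeEvent e : log) {
                if (e.scn > snapshotHighWaterMark) {
                    snapshot.put(e.key, e); // last write per key wins
                    snapshotHighWaterMark = e.scn;
                }
            }
        }

        /** Step 4: serve a Consistent Snapshot as of the high-water mark. */
        synchronized Map<String, ChangeEvent> consistentSnapshot() {
            return new HashMap<>(snapshot);
        }

        /** Step 5: serve Consolidated Deltas for changes newer than sinceScn,
            collapsed to the latest image per key (as in the earlier sketch). */
        synchronized Map<String, ChangeEvent> consolidatedDeltasSince(long sinceScn) {
            Map<String, ChangeEvent> delta = new HashMap<>();
            for (ChangeEvent e : log) {
                if (e.scn > sinceScn) {
                    delta.put(e.key, e);
                }
            }
            return delta;
        }
    }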

Future Plans for Databus

The engineers of the Databus and Espresso (our next NoSQL store, enough said) teams have been working tirelessly to support Databus replication within Espresso -- Databus will be Espresso's native internal replication technology. Additionally, once the team finds some free time, they will open-source Databus.

We are looking for engineers with a strong track record of solving tough problems to join us in DDS. Feel free to ping me on LinkedIn if you are interested.

What this means for NoSQL

As cool as Databus is, it won't work for all NoSQL stores. There is currently a big gap in the feature set provided by many NoSQL technologies, especially many Dynamo-style Key-Value stores. They do not provide a timeline-consistent change stream that Databus can pull.

Without this support, there are two unfulfilled use-cases:

  • supporting outbound feeds into existing Business Intelligence infrastructure (i.e. nightly, ETL-oriented loads)
  • supporting outbound near-real-time feeds into secondary indexes such as search, network graph, caches, etc...

Recently, for Cassandra, both Netflix and Ooyala solved this problem separately. Netflix published a tech blog about Aegisthus, a system to transform an eventually-consistent set of data files into a timeline-consistent stream. This stream is currently consumed by Business Intelligence -- it's not real-time, as it depends on the memtable flush interval. However, with a few tweaks, it can be near-real-time. We look forward to the open-sourcing of that technology.

More importantly, we look to NoSQL vendors to solve this problem for their products.

Acknowledgements

I'd like to thank the tireless efforts of the engineers who built this system:

Aditya Auradkar, Chavdar Botev, Shirshanka Das, Dave DeMaagd, Alex Feinberg, Phanindra Ganti, Lei Gao, Bhaskar Ghosh, Kishore Gopalakrishna, Mihir Gandhi, Brendan Harris, Swaroop Jagadish, Joel Koshy, Kevin Krawez, Jay Kreps, Shi Lu, Sunil Nagaraj, Neha Narkhede, Sasha Pachev, Igor Perisic, Lin Qiao, Tom Quiggle, Jun Rao, Bob Schulman, Abraham Sebastian, Oliver Seeliger, Adam Silberstein, Boris Shkolnik, Chinmay Soman, Subbu Subramaniam, Roshan Sumbaly, Kapil Surlaker, Sajid Topiwala, Cuong Tran, Balaji Varadarajan, Jemiah Westerman, Zach White, David Zhang, Jason Zhang, Agila Devi, Neil Pinto, Ramana Ramakrishnan, Sai Sundar, and Nishant Vyas.

I'd also like to give a special thanks to Bob Schulman, Kapil Surlaker, and Shirshanka Das for their help with this article.

Footnotes

  1. Netflix's UK and Ireland customers benefit from Netflix's presence in a local Region in the form of snappy latency. If you are not familiar with AWS Regions, Regions provide geographic proximity to end users. Regions themselves are composed of several data centers, known in AWS-speak as Availability Zones. As the name implies, Availability Zones provide Disaster Recovery within the hosting Region. Disaster Recovery across Regions is never a good idea for a latency-sensitive application like a web site.
  2. OLTP (Online Transaction Processing) vs. OLAP (Online Analytic Processing): the distinction reflects their uses -- OLTP for primary data serving, OLAP for analytic processing of a modified copy of the primary data.