Processing 100 Million Pixels a Day - Small Amounts of Contention Cause Big Problems at Scale

This is a guest post by Gordon Worley, a Software Engineer at Korrelate, where they correlate (see what they did there) online advertising to offline purchases.

Several weeks ago, we came into the office one morning to find every server alarm going off. Pixel log processing was behind by 8 hours and not making headway. Checking the logs, we discovered that a big client had come online during the night and was giving us 10 times more traffic than we were originally told to expect. I wouldn’t say we panicked, but the office was certainly more jittery than usual. Over the next several hours, though, thanks both to foresight and quick thinking, we were able to scale up to handle the added load and clear the backlog to return log processing to a steady state.

At Korrelate, we deploy tracking pixels, also known as beacons or web bugs, that our partners use to send us information about their users. These tiny web objects contain no visible content, but may include transparent 1x1 GIFs or JavaScript, and allow our partners to let us know when someone sees an ad or takes an ad-related action without needing to send us their ad server logs. For example, when one of our partners displays an ad, they fire off our impression tracking pixel by including it as an image in an iframe or by embedding a script tag with our pixel as the source. Their ads dynamically generate the pixel's URL so they can pass us information about the displayed ad in the query string. For users who choose to accept third-party cookies, we are also able to set and read a Korrelate user id cookie, so we can track their activity and use it later, after aggregation and anonymization, to provide our analytics products.
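To make the mechanics concrete, here is a minimal sketch of what a pixel endpoint like this could look like as a Java servlet. This is not Korrelate's actual pixel server (the post doesn't show it); the URL shape, the `ad_id` parameter, and the `korrelate_uid` cookie name are all assumptions for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;

import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class ImpressionPixelServlet extends HttpServlet {

    // A transparent 1x1 GIF, so the browser receives a valid (but invisible) image.
    private static final byte[] TRANSPARENT_GIF = {
        0x47, 0x49, 0x46, 0x38, 0x39, 0x61, 0x01, 0x00, 0x01, 0x00,
        (byte) 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x21, (byte) 0xF9, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00,
        0x2C, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00,
        0x02, 0x02, 0x44, 0x01, 0x00, 0x3B
    };

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        // Ad details arrive in the query string, e.g. /impression.gif?ad_id=123&partner=acme
        String adId = req.getParameter("ad_id");

        // For browsers that accept third-party cookies, read or set a user id cookie.
        String userId = findUserId(req);
        if (userId == null) {
            userId = UUID.randomUUID().toString();
            resp.addCookie(new Cookie("korrelate_uid", userId));
        }

        // In a real pixel server this line would land in the pixel logs that the
        // log processor later ingests.
        log("impression ad_id=" + adId + " user=" + userId);

        resp.setContentType("image/gif");
        OutputStream out = resp.getOutputStream();
        out.write(TRANSPARENT_GIF);
        out.flush();
    }

    private String findUserId(HttpServletRequest req) {
        if (req.getCookies() == null) return null;
        for (Cookie c : req.getCookies()) {
            if ("korrelate_uid".equals(c.getName())) return c.getValue();
        }
        return null;
    }
}
```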

In the early days, even before Korrelate was Korrelate, we knew that one day we’d need to ingest billions of tracking pixels a day. So when we started writing the pixel log processor, we made architectural choices that would allow it to scale.

At first the log processor was written as a Java servlet, but this proved difficult to manage on the server and none of us were very happy programming in Java. Looking for an alternative, we made the move to Kettle (commercially known as Pentaho Data Integration) since at the time we were using the Pentaho suite of analytics tools to generate reports on our raw data.

Kettle is an extract-transform-load tool that runs on the JVM and can take full advantage of multithreading to quickly process lots of data. It also features an easy-to-use GUI tool called Spoon for designing Kettle programs that require lots of configuration but relatively little programming. We enjoyed how quickly we were able to create and deploy log processing using Kettle. As time wore on, though, we became aware of Kettle's limitations.

Running lots of data through a Kettle job requires lots of memory (as I write this, the log processor requires 8 GB of memory to process files of 250,000 records). And on the development side, the downside to the ease of using Spoon is that the source files are only barely human-editable XML, so we were not able to take advantage of our normal workflow using Git to easily merge concurrent development branches, forcing us to act as if the log processor’s source were locked while someone worked on it. But despite these limitations, we continued to use Kettle because it was working, we had a lot of other stuff to do, and we knew we could scale it up when we needed to, even if it would be expensive.

A few months ago we had to start running two log processors concurrently to keep up with our load. This was a good experience because it helped expose problem areas in log processing.

When only one log processor ran, the only performance problems we had were related to individual parts of the process taking a long time to complete. For example, we discovered that updates to tables in our database took an impossibly long time, so we converted nearly all of our tables for storing pixel data to be append-only to allow for fast inserts. A few tables couldn't be made append-only, so for those we created loading tables that log processing could insert into quickly; we would then go back later and sync the loading tables with the main tables inside the database, which was much faster than performing upserts.
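The post doesn't name the database or show the schema, but the loading-table pattern itself is simple. The sketch below assumes a PostgreSQL-style SQL dialect accessed over JDBC, and the `pixel_users` / `pixel_users_loading` tables, columns, and connection details are hypothetical: the log processor only ever does fast inserts into the loading table, and a separate sync step folds that data into the main table in bulk.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class LoadingTableSync {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost/pixels", "pixel", "secret")) {
            conn.setAutoCommit(false);
            try (Statement st = conn.createStatement()) {
                // Update main-table rows that already exist...
                st.executeUpdate(
                    "UPDATE pixel_users u " +
                    "SET last_seen_at = l.last_seen_at " +
                    "FROM pixel_users_loading l " +
                    "WHERE u.user_id = l.user_id");

                // ...insert the rows that don't...
                st.executeUpdate(
                    "INSERT INTO pixel_users (user_id, last_seen_at) " +
                    "SELECT l.user_id, l.last_seen_at " +
                    "FROM pixel_users_loading l " +
                    "WHERE NOT EXISTS " +
                    "  (SELECT 1 FROM pixel_users u WHERE u.user_id = l.user_id)");

                // ...and clear the loading table for the next batch.
                st.executeUpdate("TRUNCATE pixel_users_loading");
            }
            conn.commit();
        }
    }
}
```

Because the bulk update and insert happen inside the database, the sync runs as a couple of set-based statements rather than one upsert per record, which is where the speedup comes from.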

Bringing up a second log processor exposed us to new problems. Although we were writing to the database quickly thanks to nonblocking writes on append-only tables, the few tables that needed to be synced with our loading tables caused enough contention that two log processors gained us almost nothing over running one. To address this, we split log processing into two parts: the part that wrote only to append-only tables and the part that needed to insert into heap tables. This let us run two instances of the append-only log processor alongside a single heap-table log processor and still get good throughput, because each log file contributes relatively little data that needs inserting or updating in the heap tables but a lot of data to the append-only tables.

So, on the morning of the pixel influx, we thought we were well positioned to scale up. Within a couple of hours we brought up additional servers running additional append-only log processor instances and began cranking through the logs (the heap table log processor continued to run quickly enough on its own to keep up). We quickly discovered, though, that there was contention still lurking in the log processor.

To keep track of how log processing is doing, we write some basic statistics about how long log processing takes and how many pixels it processes out to several audit tables. To gather this data, we used Kettle's built-in logging features to write information to tables, then combined it into a summarized form that was useful to us. As it turns out, Kettle is written in such a way that it requests exclusive table-level locks on the auditing tables in order to write into them. And since this happens dozens of times during each run of a log processor instance, we had hundreds of requests for the same tables all waiting on each other. Each request cleared fairly quickly, but the little delays added up, resulting in log processor instances taking 15 minutes to run when they should have finished in 2.

We turned off Kettle's built-in logging features, replaced them with some code of our own that didn't request table-level locks, and saw contention on the database between log processors disappear. We learned the hard way the wisdom we had often heard but apparently not yet internalized: small amounts of contention can become big problems at scale.
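The post doesn't show the replacement auditing code, but the idea is simply to record run statistics with ordinary row inserts, which take row-level rather than exclusive table-level locks, so concurrent log processor instances never queue up behind each other. Here is a hedged sketch of that approach; the `log_processor_runs` table, its columns, and the connection details are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

public class RunAudit {
    // Each log processor instance records its own run with a plain insert;
    // no instance ever requests an exclusive lock on the audit table.
    public static void recordRun(String instanceId, long recordsProcessed,
                                 long startedAtMillis, long finishedAtMillis) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:postgresql://localhost/pixels", "pixel", "secret");
             PreparedStatement ps = conn.prepareStatement(
                 "INSERT INTO log_processor_runs " +
                 "(instance_id, records_processed, started_at, finished_at) " +
                 "VALUES (?, ?, ?, ?)")) {
            ps.setString(1, instanceId);
            ps.setLong(2, recordsProcessed);
            ps.setTimestamp(3, new Timestamp(startedAtMillis));
            ps.setTimestamp(4, new Timestamp(finishedAtMillis));
            ps.executeUpdate();
        }
    }
}
```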

With contention completely removed from log processing, we were able to quickly bring up more than a dozen log processor instances, use them to chew through the backlog, and then throttle back to a steady state. In just 24 hours everything was back to normal, save for some additional hardware and log processors.

Although the log processor can now handle a high level of concurrency, we need to rebuild it to handle even more pixels without the high costs of the current design. Bringing up new instances of the current log processor means adding servers with lots of RAM (typically about 24 GB, enough to run two log processors per server plus additional RAM for other processes), which is expensive. And the current log processor still faces potential contention over limited connections to the database. To resolve this, we need to look for ways to reduce the number of processes that communicate data to the database.

We also want to move away from Kettle to a system that will make code management easier. To that end we have started to build a new log processor using Storm, which provides a flexible framework for creating custom real-time stream processing workflows, similar to the way Hadoop provides a flexible framework for batch processing. Although Storm alone provides a lot less built-in functionality than Kettle, it doesn't have to, because Storm workflows can be written in any language and use any existing libraries we want. And since most of our code is already in Ruby, we are able to leverage our existing code base to build our Storm workflows in Ruby. Based on what other folks have achieved with Storm, we hope to see our Storm log processor scale to billions of pixels a day.
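Korrelate's new workflows are being written in Ruby on top of Storm's multilang support, and none of that code appears in the post. Purely to illustrate the shape of a Storm topology (a spout feeding pixel log lines to parallel bolts), here is a hedged sketch using Storm's native Java API; the component names, parallelism numbers, and fields are made up.

```java
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class PixelTopologySketch {

    // Hypothetical spout: a real one would read pixel log lines from the
    // pixel servers or a queue; this one emits a canned line for illustration.
    public static class PixelLogSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            collector.emit(new Values("impression?ad_id=123&partner=acme"));
            Utils.sleep(1000);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("raw_line"));
        }
    }

    // Hypothetical bolt: real parsing would pull apart the query string and
    // feed downstream bolts that batch writes to the append-only tables.
    public static class ParsePixelBolt extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getStringByField("raw_line");
            collector.emit(new Values(line, System.currentTimeMillis()));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("pixel", "seen_at"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("pixel-logs", new PixelLogSpout(), 2);
        builder.setBolt("parse", new ParsePixelBolt(), 8).shuffleGrouping("pixel-logs");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("pixel-processing-sketch", new Config(), builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}
```

Scaling a topology like this is a matter of raising the parallelism hints and adding worker machines, rather than standing up whole new log processor instances with their own large memory footprints.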