Piccolo - Building Distributed Programs that are 11x Faster than Hadoop

Piccolo is a system for distributed computing: a new data-centric programming model for writing parallel in-memory applications in data centers. Unlike existing data-flow models, Piccolo allows computation running on different machines to share distributed, mutable state via a key-value table interface. Whereas traditional data-centric models (such as Hadoop) present the user with a single object at a time to operate on, Piccolo exposes a global table interface that is available to all parts of the computation simultaneously. This allows users to specify programs in an intuitive manner, very similar to writing programs for a single machine.

Using an in-memory key-value store is a very different approach from the canonical map-reduce, which is based on using distributed file systems. The results are impressive:

Experiments have shown that Piccolo is fast and provides excellent scaling for many applications. The performance of PageRank and k-means on Piccolo is 11× and 4× faster than that of Hadoop. Computing a PageRank iteration for a 1 billion-page web graph takes only 70 seconds on 100 EC2 instances. Our distributed web crawler can easily saturate a 100 Mbps internet uplink when running on 12 machines.

Piccolo was presented at OSDI '10. For the details, take a look at the paper, Piccolo: Building Fast, Distributed Programs with Partitioned Tables. There is also a slide deck and a video of the talk (very good).

This paper presents Piccolo, a data-centric programming model for writing parallel in-memory applications across many machines. In Piccolo, programmers organize the computation around a series of application kernel functions, where each kernel is launched as multiple instances concurrently executing on many compute nodes. Kernel instances share distributed, mutable state using a set of in-memory tables whose entries reside in the memory of different compute nodes. Kernel instances share state exclusively via the key-value table interface with get and put primitives. The underlying Piccolo runtime sends messages to read and modify table entries stored in the memory of remote nodes. By exposing shared global state, the programming model of Piccolo offers several attractive features. First, it allows for natural and efficient implementations for applications that require sharing of intermediate state such as k-means computation, n-body simulation, PageRank calculation etc. Second, Piccolo enables online applications that require immediate access to modified shared state. For example, a distributed crawler can learn of newly discovered pages quickly as a result of state updates done by ongoing web crawls.
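To make the kernel-and-table model concrete, here is a minimal single-process sketch in Python. It is not Piccolo's actual API: the names PartitionedTable, pagerank_kernel and pagerank_iteration, the modulo partitioning of integer keys, and the three-page graph are all assumptions made for illustration. Kernel instances run one after another here, whereas the real runtime launches them concurrently across machines.

```python
class PartitionedTable:
    """Toy stand-in for a shared table: a key-value map split into partitions."""

    def __init__(self, num_partitions):
        self.partitions = [{} for _ in range(num_partitions)]

    def _partition_of(self, key):
        # Assumption of this sketch: integer keys, assigned by modulo.
        return key % len(self.partitions)

    def get(self, key, default=0.0):
        return self.partitions[self._partition_of(key)].get(key, default)

    def put(self, key, value):
        self.partitions[self._partition_of(key)][key] = value


def pagerank_kernel(instance, graph, curr, nxt, damping=0.85):
    """One kernel instance: processes only the pages in its own partition of curr."""
    for page, rank in curr.partitions[instance].items():
        out_links = graph[page]
        for target in out_links:
            # The target entry may live in another partition (a "remote" node);
            # the kernel still reaches it only through get and put.
            nxt.put(target, nxt.get(target) + damping * rank / len(out_links))


def pagerank_iteration(graph, curr, num_partitions, damping=0.85):
    """Run one PageRank iteration and return the table of new ranks."""
    nxt = PartitionedTable(num_partitions)
    for page in graph:
        nxt.put(page, (1.0 - damping) / len(graph))
    # Instances run sequentially in this sketch, so the get-then-put above
    # cannot race; a real runtime would run them in parallel.
    for instance in range(num_partitions):
        pagerank_kernel(instance, graph, curr, nxt, damping)
    return nxt


graph = {0: [1, 2], 1: [2], 2: [0]}  # page -> outgoing links
ranks = PartitionedTable(num_partitions=2)
for page in graph:
    ranks.put(page, 1.0 / len(graph))
ranks = pagerank_iteration(graph, ranks, num_partitions=2)
print({page: round(ranks.get(page), 4) for page in graph})
```

Note that the read-modify-write in the kernel is only safe because the instances do not overlap in time. With truly concurrent instances, two kernels updating the same page could overwrite each other's contribution.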

Piccolo includes a number of optimizations to ensure that using this table interface is not just easy, but also fast (taken from the home page):

  • Locality - To ensure locality of execution, tables are explicitly partitioned across machines. User code that interacts with the tables can specify a locality preference: this ensures that the code is executed locally with the data it is accessing.
  • Load-balancing - Not all load is created equal - often some partition of a computation will take much longer than others. Waiting idly for this task to finish wastes valuable time and resources. To address this, Piccolo can migrate tasks away from busy machines to take advantage of otherwise idle workers, all while preserving the locality preferences and the correctness of the program.
  • Failure Handling - Machine failures are inevitable, and generally occur at the most critical time in your computation. Piccolo makes checkpointing and restoration easy and fast, allowing for quick recovery in case of failures.
  • Synchronization - Managing the correct synchronization and update across a distributed system can be complicated and slow. Piccolo addresses this by allowing users to defer synchronization logic to the system. Instead of explicitly locking tables in order to perform updates, users can attach accumulation functions to a table: these are used automatically by the framework to correctly combine concurrent updates to a table entry.
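The accumulation idea in the last bullet can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Piccolo's interface: AccumulatorTable, its update method, count_words_kernel and word_count are hypothetical names, threads stand in for kernel instances on separate machines, and a per-partition lock stands in for whatever the real runtime does internally. The point is that user code never locks anything; it only supplies a function that merges a new value into an existing entry.

```python
import threading


class AccumulatorTable:
    """Toy partitioned table that merges concurrent updates with a user function."""

    def __init__(self, num_partitions, accumulate):
        self._parts = [{} for _ in range(num_partitions)]
        # One lock per partition, owned by the table rather than by user code.
        self._locks = [threading.Lock() for _ in range(num_partitions)]
        self._accumulate = accumulate

    def _index(self, key):
        return hash(key) % len(self._parts)

    def update(self, key, value):
        """Combine value into the entry for key using the accumulation function."""
        i = self._index(key)
        with self._locks[i]:
            part = self._parts[i]
            part[key] = self._accumulate(part[key], value) if key in part else value

    def get(self, key):
        i = self._index(key)
        with self._locks[i]:
            return self._parts[i][key]


def count_words_kernel(table, document):
    """Kernel instance: emits one update per word, with no explicit locking."""
    for word in document.split():
        table.update(word, 1)


def word_count(documents, num_partitions=2):
    table = AccumulatorTable(num_partitions, accumulate=lambda old, new: old + new)
    workers = [
        threading.Thread(target=count_words_kernel, args=(table, doc))
        for doc in documents
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return table


counts = word_count(["a b a", "b c", "a c c", "b"])
print(counts.get("a"), counts.get("b"), counts.get("c"))  # prints "3 3 3"
```

Because addition is commutative and associative, the result does not depend on the order in which the updates arrive. Swapping the lambda for max or min gives a different accumulator without touching the kernel code.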