« Sponsored Post: Gazillion, Edmunds, OPOWER, ClearStone, deviantART, ScaleOut, aiCache, WAPT, Karmasphere, Kabam, Newrelic, Cloudkick, Membase, Joyent, CloudSigma, ManageEngine, Site24x7 | Main | Stuff The Internet Says On Scalability For April 8, 2011 »

Caching and Processing 2TB Mozilla Crash Reports in memory with Hazelcast

Mozilla processes TB's of Firefox crash reports daily using HBase, Hadoop, Python and Thrift protocol. The project is called Socorro, a system for collecting, processing, and displaying crash reports from clients. Today the Socorro application stores about 2.6 million crash reports per day. During peak traffic, it receives about 2.5K crashes per minute. 

In this article we are going to demonstrate a proof of concept showing how Mozilla could integrate Hazelcast into Socorro and achieve caching and processing 2TB of crash reports with 50 node Hazelcast cluster. The video for the demo is available here.


Currently, Socorro has pythonic collectors, processors, and middleware that communicate with HBase via the Thrift protocol. One of the biggest limitations of the current architecture is that it is very sensitive to latency or outages on the HBase side. If the collectors cannot store an item in HBase then they will store it on local disk and it will not be accessible to the processors or middleware layer until it has been picked up by a cronjob and successfully stored in HBase. The same goes for processors, if they cannot retrieve the raw data and store the processed data, then it will not be available.

In the current implementation a collector stores the crash report into HBase and puts the id of the report to a queue stored in another Hbase table. The processors polls the queue and start to process the report. Given the stated problem of a potential HBase outage, the Metrics Team at Mozilla contacted us to explore the possible usage of Hazelcast Distributed Map, in front of the HBase. The idea is to replace the Thrift layer with Hazelcast client and store the crash reports into Hazelcast. An HBase Map store will be implemented and Hazelcast will write behind the entries to HBase via that persister. The system will cache the hot data and will continue to operate even if HBase is down. Beside HBase, Socorro uses Elastic Search to index the reports to search later on. In this design Elastic Search will also be fed by Hazelcast map store implementation. This enables Socorro to switch the persistence layer very easily. For small scale Socorro deployments, HBase usage becomes complex and they would like to have simpler storage options available.

So we decided to make a POC for Mozilla team and run it on EC2 servers. The details of the POC requirement can be found at the wiki page.. The implementation is available under Apache 2.0 license.

Distributed Design

The number one rule of distributed programming is "do not distribute". The throughput of the system increases as you move the operation to the data, rather than data to operation. In Hazelcast the entries are almost evenly distributed among the cluster nodes. Each node owns a set of entries. Within the application you need to process the items. There are two option here; you can either design the system with the processing in mind, and bring the data to the execution or you can take the benefit of data being distributed and do the execution on the node owning the data. If the second choice can be applied, we expect the system to behave much better in terms of latency, throughput, network and CPU usage. In this particular case the crash reports are stored in a distributed map called "crashReports" with the report id as the key. We would like to process a particular crash report on the node that owns it. 


A Collector puts the crash report into "crashReports" map and report id with the insert date as the value into another map called "process" for the reports that need to be processed. These two put operations are done within a transaction. Since both entries in 'crashReports' and 'process' maps have same key (report id), they are owned by the same node. In Hazelcast, the key determines the owner node. 


A Processor on each node adds a localEntryListener on the map "process" and is notified when a new entry is added. Notice that the listener is not listening to the entire map, but only listens the local entries. There will be one and only one event per entry and that event will be delivered locally. Remember the "number one rule of distributed programming".

When the processor is notified of an insert, it gets the report, does the processing and puts updated item back into crashReports and removes the entry from "process" map. All these operations are local, except the put which does one more put on the backup node. There is one more thing missing here. What if the Processor crashes or was not able to process the report when it gets the notification? Good news, the information regarding a particular crash report that needs to be processed is still in the map "process". There will be another thread that loops through the local entries in "process" map and processes any lingering entries. Notice that under normal circumstances and if there is no backlog the map should be empty. 

Data size and Latency

The object that we store in Hazelcast consists of:

  •     a JSON document with a median size of 1 KB, minimum size of 256 bytes and maximum size of 20K.
  •     a binary data blob with a median size of 500 KB; min 200 KB; max 20 MB; 75th percentile 5 MB.

This makes the average of 4 MB data. And given the latency and network bandwidth among the EC2 servers the latency and throughput of our application would be much better in a more real environment.    

Demo Details

To run the application you only need to have the latest Hazelcast and Elastic Search jars. The entry point to the application is com.hazelcast.socorro.Node class. When you run the main method, It will generate one Collector and a Processor. So each node will be both a Collector and a Processor. The Collectors will simulate the generation of crash reports and will submit them into Data Grid. The Processors will receive the newly submitted crash reports, process them and store back to distributed Map. If you enable persistence, Hazelcast will persist the data to Elastic Search.

We run the application on a 50 node cluster of EC2 servers. In our simulation the average crash report size was 4MB. The aim was to store as many crash reports as we can, so we decided to go with largest possible memory which is 68.4 GB that is available at High-Memory Quadruple Extra Large Instance.
Hazelcast Nodes uses Multicast or TCP/IP to discover each other. On EC2 environment multicast is not enabled, but nodes should be able to see each other via TCP/IP. This requires passing the IP addresses to all of the nodes somehow. In the article "Running Hazelcast on a 100 node Amazon EC2 Cluster" we described a way of deploying an application on large scale on EC2. In this demo we used a slightly different approach. We developed a tool, called Cloud Tool that starts N number of nodes. The tool takes as an input the script where we describe how to run the application and the configuration file for Hazelcast. It builds the network configuration part with the IP addresses of nodes and copies (scp) both configuration and script to the nodes. The nodes, on start, waits for the configuration and script file. After they are copied, all nodes execute the script. It is the scripts responsibility to download and run the application. This way within minutes we can deploy any Hazelcast application on any number of nodes.

With this method way we deployed the Socorro POC app on 50 m2.4xlarge instances. By default each node generates 1 crash report per second. Which makes 3K crash reports per minute in the cluster. In the app, we are listening to Hazelcast topic called "command".  This enables us externally increase and decrease the load. l3000 means "generate total of 3000 crash reports per minute".  
We did the demo and recorded all the steps. We used the Hazelcast Management Center to visualize the cluster. Through this tool we can observe the throughput of the nodes, publish messages to the topic, and observe the behavior of the cluster.

Later, we implemented the persistence to Elastic Search. But It couldn't keep up with the data load that we want to persist. We started 5 Hazelcast Nodes and 5 ES Nodes. A backlog appears while persisting to ES. Our guess is, this is because of the EC2 IO. It seems that given the file size of average 4MB, there is no way of creating a 50 node cluster, making 3K reports per minute and storing them into ES. At least this seems not to be possible in an EC2 environment and with our current ES knowledge. We still started a relatively small cluster persisting to 5 node ES. Recorded until it was obvious than we can not persist everything from memory to ES. A 10 minute video is available here.

We think on Mozilla's environment it will be easy to store to HBase and with small deployments of Socorro ES will serve quite well.


Another goal of POC was to demonstrate how one can scale up by adding more and more nodes. So we did another recording where we start with 5 nodes. The cluster is able to generate and process 600 crash reports per minute and stored 200 GB of data in memory. Then iteratively we add 5 more nodes and increase the load. We ended at 20 node cluster that was able to process 2400 reports per minute and store in memory 800 GB of data. We would like to highlight one thing here. To add a node does not immediately scale you up. It has it's initial cost which is backup and migrations. When new nodes arrive Hazelcast will start to migrate some of the partitions(buckets) from old to new nodes to achieve equal balance across the cluster. Once the partition is moved all entries falling into it will be stored on the node owning that partition. More information on this topic can be found here.

In this demo the amount of data that we cache is huge, thus migrations take some time. Also note that during the migration, a request to the migrated data should wait until the migration is completed for the sake of consistency. That's why it is not a good idea to add nodes when your cluster have significantly large amount of data and is under high load to witch it can barely respond.

An alternative approach

Finally we would like to discuss another approach that we tried but didn't record. For the sake of the throughput we chose to process the entries locally. This approach performed better but has two limitations.

  1. All nodes of the cluster that own data should also be a processor. So you can not say that on my 50 node cluster; 20 are collectors and the rest are processor. All should be equal.
  2. For a node to process the data, it should own it first. So if you add a new node, initially there will be no partitions and it will not participate on processing. As the partitions migrate it will participate more and more. You have to wait until the cluster is balanced to fully benefit from new coming nodes.

The second approach was to store the id's of unprocessed entries in a distributed queue. And Processors take ids from the queue and process the associated record. The data is not local here, most of the time a processor will be processing a record that it doesn't own. The performance will be worse but you can separate processors from collectors and when you add a processor to the cluster it will immediately start to do its job. 

Reader Comments (9)

I want to clear the data persistence process, I didn't find out any information about the report persistence in HBase and recovery process when HBase is out (using MapStore). As you sad the previous implementation has a recovery engine when HBase is unavailable it saves data on a disk and performs retry later. In your case you just save data into Elastic Search without permanent persistence, should they handle the Data Base outage manually?

How can they handle a data overflow in distributed map, for example if too much reports will come and there aren't enough memory in a cluster to store it? Do they lose the data? For example, as I know, Terracota swaps it on a disk, do you plan to have this feature in Hazelcast? I think Hazelcast should have more predictable results in such force-major situations especially for systems like Mozilla's Socorro.

April 12, 2011 | Unregistered CommenterBorislav Andruschuk

Hazelcast doesn't care where to persist. If the underlying storage is available it will persist, otherwise the entries will be kept in-memory as dirty until the storage becomes available (so no need to recover anything). Of course it cannot keep reports in-memory forever. We assume that in a reasonable time, storage (HBase, ES, etc.) will be back again.

It was Mozilla Team's choice to implement Elastic Search persistence for the sake of simplicity during the PoC. And Elastic Search IS a permanent persistence.

Hazelcast will persist the data to the storage using MapStore and will not lose any data. Hazelcast is only responsible for calling MapStore with configured time intervals to persist dirty entries. We can implement a built-in disk storage but this is not the point here.

April 13, 2011 | Registered CommenterFuad Malikov

1. Store raw crash reports directly into HDFS (I think it is more reliable than Hazelcast).
2. Keep all unprocessed report references in distributed Queue backed by either ZooKeeper or by any mature JMS middleware: HornetQ, RabbitMQ etc

April 15, 2011 | Unregistered CommenterVladimir Rodionov

1. HDFS is surely more reliable than Hazelcast. Hazelcast is in-memory. As entries are written to Hazelcast, Hazelcast will store them (write-behind) into HBase. But what if HDFS/HBase is not available for some reason? So the plan is to have a secondary, scalable and hot storage (in-memory) while HDFS/HBase isn't available.

2. Queuing the references only will be sufficient as we will also need the crash reports so that while Hadoop/HBase isn't available we can keep receiving crash reports, store them into the clustered-memory and process them directly from memory. Plus we don't want to setup and maintain a separate queuing cluster. Hazelcast is embedded into the Collector+Processor applications directly.


April 15, 2011 | Unregistered CommenterTalip Ozturk

And what if Hazelcast is not available for some reason? What you have described here reminds me classical "we are looking for problem for our solution".

April 17, 2011 | Unregistered CommenterVladimir Rodionov

I like Hazelcast a lot. It seems like the major missing feature is auto-discovery in an EC2 environment. Elastic Search has solved this problem over a year ago, but Hazelcast still requires a manual listing of TCP/IP addresses. Now, they have shown this demo, and the 100-node demo a year ago, but still this very hacked way of spinning up nodes on EC2 with a script file.

If you are on EC2 chances are you would also like to use AWS services to spin up and tear down nodes on demand. This negates the scripted discovery approach. Is it that hard to copy the ES discovery mechanism?

April 20, 2011 | Unregistered CommenterJim Cook

I accept that we should implement EC2 auto discovery over S3, and my promise we will:). This will be very useful for users but we'll still show our demos using the tool above. Even during this demo I used that tool to deploy ES cluster:). In the case of ES, they have to deploy only one type application. But we use Hazelcast embedded in the different applications and actually deploy the application and make sure that Hazelcast nodes will see each other.

With the cloud tool, It is super super easy to start a cluster with any number of nodes just within 2 minutes. No need to do any preliminary work.

May 2, 2011 | Registered CommenterFuad Malikov

We have implemented the EC2 auto discovery. The details can be found here: http://code.google.com/p/hazelcast/wiki/EC2AutoDiscovery

June 23, 2011 | Registered CommenterFuad Malikov

Hi Team,

I am currently doing a POC for developing a distributed, fault tolerant, ETL ecosystem. I have selected Hazelcast for for my clustering (data+notification) purpose. Googling through Hazelcast resources took me here and it exactly matches how I was thinking to go about, using a map based solution.

I need to understand one point. Let me give a canonical idea of our architecture.
Say we have 2 nodes A,B running our server instance clustered through hazelcast. One of them is a listener accepting requests (but can change on a fail over), say A.

A gets a request and puts it to a distributed map. This map is write-through backed by a persistent store and a single memory backup is configured on nodes.

Each instance has a local map entry listener, which on entry added event, would (asynchronous/queuing) process that entry and then remove it from the distributed map.

This is working as expected.

Say 10 requests have been received and distributed with 5 on each nodes. 2 entries on each node has been processed and now both instance crashes.

So there are total 6 entries present in the backing datastore now.

Now we bring up both the instances. As per documentation - "As of 1.9.3 MapLoader has the new MapLoader.loadAllKeys API. It is used for pre-populating the in-memory map when the map is first touched/used"

We implement loadAllKeys() by simply loading all the key values present in the store.

So does that mean there is a possibility where, both the instances will now load the 6 entries and process them (thus resulting in duplicate processing)? Or is it handled in a synchronized way so that loading is done only once in a cluster?

March 13, 2013 | Unregistered CommenterSutanu

PostPost a New Comment

Enter your information below to add a new comment.
Author Email (optional):
Author URL (optional):
Some HTML allowed: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <code> <em> <i> <strike> <strong>