What's cool about starting a new project is you finally have a chance to do it right. You of course eventually mess everything up in your own way, but for that one moment the world has a perfect order, a rightness that feels satisfying and good. Arne Claassen, the CTO of notify.me, a brand new real time notification delivery service, is in this honeymoon period now.
Arne has been gracious enough to share with us his philosophy of how to build a notification service. I think you'll find it fascinating because Arne goes into a lot of useful detail about how his system works.
His main design philosophy is to minimize the bottlenecks that form around synchronous access, that is, when some resource is requested and the requestor ties up more resources while waiting for a response. If the requested resource can’t be delivered in a timely manner, more and more requests pile up until the server can’t accept any new ones. Nobody gets what they want and you have an outage. Breaking synchronous operations into asynchronous operations, by separating request and response into separate message passing actions, stops the resource overload. Instead of a system going down from too many parallel requests, it can work its way through a backlog of requests as fast as it can. And in most cases the request/response cycles are so fast that they appear like a linear sequence of events.
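To make the request/response split concrete, here is a minimal Python sketch. It is not notify.me's code: the `AsyncService` class, its `request` and `drain` methods, and the uppercase "work" are all made up for illustration. The point is only that the caller hands over a request plus a reply address and returns immediately, while the service works through its backlog at its own pace.

```python
from collections import deque


class AsyncService:
    """Hypothetical service: accepts requests now, answers them later."""

    def __init__(self, handler):
        self.handler = handler      # function that does the (possibly slow) work
        self.backlog = deque()      # requests waiting to be processed

    def request(self, payload, reply_to):
        # The caller returns right away; nothing is held open while waiting.
        self.backlog.append((payload, reply_to))

    def drain(self, max_items=None):
        """Work through the backlog, optionally stopping after max_items."""
        processed = 0
        while self.backlog and (max_items is None or processed < max_items):
            payload, reply_to = self.backlog.popleft()
            reply_to(self.handler(payload))   # the response is a separate message
            processed += 1
        return processed


replies = []
service = AsyncService(handler=lambda text: text.upper())
for item in ["a", "b", "c"]:
    service.request(item, replies.append)

first = service.drain(max_items=2)   # under load, only part of the backlog is handled
rest = service.drain()               # the remainder is handled when capacity frees up
```

Under heavy load the backlog grows, but no caller is blocked and no request is refused; the cost is latency rather than an outage.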
Notify.me is taking the innovative and risky strategy of using ejabberd, an XMPP based system, as their internal messaging and routing layer. Will Erlang and Mnesia (Erlang's database) be able to keep up with traffic and keep low latencies as traffic scales? It will be interesting to find out.
If you are interested in notify.me they've kindly offered 500 beta accounts for HS readers: http://notify.me/user/account/create/highscale
Who are you?
My name is Arne Claassen, the CTO of notify.me. I've been working on highly scalable web based applications and services for the past decade. These sites have employed various combinations of traditional scaling techniques such as server farms, caching, content pregeneration and highly available databases using replication and clustering. All of these techniques are ways to mitigate scarce resources (generally the database) being in contention by many users. Knowing the benefits and pitfalls of these techniques, it has become my focus to architect systems that circumvent scarce resource scenarios.
What is notify.me, why did you make it, and why is it a good thing?
notify.me is the brainchild of Jason Wieland, our CEO. It's a near real time notification service that alerts users to new content published on the web.
It was created to address the common user pain of staying up to date on time-critical events that occur on the web. For instance, a user searching for an apartment on Craigslist would want to be alerted once a new one matching their search criteria is posted. notify.me does the grunt work of repeatedly checking Craigslist for new listings and alerts the user once a new one is posted. Notifications can be delivered to instant messenger, desktop application, mobile device, email, and web application.
Our goal is to create and publish open APIs allowing people to build new and interesting applications for generating and delivering information.
How does your service compare to other services people might be familiar with? Like Twitter, Friend Feed, Gnip, or Yahoo Pipes?
There are quite a few companies that are in our competitive landscape, some of which are direct competitors, like yotify.com or alerts.com. The main difference is our approach. Yotify and alerts are focused on being notification portal sites for users to visit. notify.me is a utility, focused on offering all the functionality available on the website via XMPP and REST APIs, allowing users to interact with notifications from the application of their choosing. We also allow for escalation of messages to destinations. For example, if a user is not logged into their IM or has a status of away, notifications can be escalated and routed to their mobile device.
In the messaging arena, we are nearly the opposite of twitter. Twitter is an inward facing publishing model based on its own user generated content. Someone makes a tweet and it gets published to their followers. notify.me is creating an externally facing message delivery system. Users add any website that supports web feed standards, or redirect existing notification emails to us. If anything, we are a messaging pipeline that is complementary to twitter (more on that below).
Friendfeed does a great job at combining all your social networks together into one centralized area. Their primary focus is to build features and tools to interact with the mashed feed. This feed would be perfect to add as a source to notify.me, allowing a user to receive all social network updates over instant messenger. Being able to know in near real time when you have a new posting on your wall so that you can immediately respond is a feature the social addicts want.
Yahoo Pipes would be considered as a possible partner, similar to how they upsell Netvibes and Newsgator. Their focus is to provide an intuitive programming interface to be able to manipulate feeds and create a useful mashup.
For example the Hot Deals Search is a nifty pipe that searches over a collection of sites for the best deal. Users might not want to use Yahoo's own notification options due to the limited options of destinations.
In our beta group we've seen similar activity with users adding ebay links. Ebay has a notification pipeline that competes with notify.me; however, users still add ebay search links to our service. It turns out that they would like one central place to manage their various news feeds.
Gnip is a pure infrastructure play. We have similar technology but we are going after completely different markets.
An additional core feature of our product that we have not yet exposed is that our pipeline is bi-directional, i.e. any data source can also be a destination and vice versa. The primary benefit of this is the ability to respond to messages, such as acknowledging a support ticket you received. Bi-directional communication will require integration with the notify.me API, through which a source can communicate reply options.
We currently are developing a deep integration with the Twitter API to provide two-way capabilities for tweets via IM in the same channel that you already receive your other notifications.
Can you explain the different parts of notify.me and how they connect together?
In general terms our system consists of three subsystems, each of which has a number of implementations.
1. Ingestion consists of rss and email ingestors, which constantly check the user's email address (email@example.com) and the user's feeds for new data. New data is turned into notifications which are propagated to routing.
2. Routing is responsible for getting the user's notifications to the right delivery components. Routing is the point in the system that the user interacts with for management, such as changing sources and destinations, and viewing history. Notification history is a specialized delivery component, allowing all messages to be perused via the website, even after they have gone through the entire pipeline.
3. Delivery is currently comprised of history (which always gets the messages), Xmpp IM, SMS and email, with private RSS, AIM and MSN in development.
On a more technical level, the topology of this system is comprised of two separate message busses:
1. Store-and-forward queues (using simpleMQ)
2. XMPP (using ejabberd)
Store and forward queues are used by the ingestion side to distribute the work of ingestion and generally anywhere where data is handled before it becomes visible to the routing rules of the user. This allows for scaling flexibility as well as process isolation during a component failure.
The Xmpp bus is called the Avatar bus, so named because every data owning entity is represented by a daemon process that is the sole authority for that entity's data. We have four types of avatars: Monitor, Agent, Source and User.
1. Monitor avatars are simply the responsible parties for observing instance health and spinning up and shutting down additional computing nodes per demand.
2. Agent avatars are the delivery gateways that provide presence information of our users into the bus and deliver messages to our users.
3. Source avatars are ingestors, such as the RSS ingestor. A source avatar pulls new messages from a store and forward queue and notifies its subscribers of the new message.
4. User avatars persist all the configuration and messaging data for a particular user. Each one is responsible for receiving new notifications from ingestion avatars, deciding on the routing and pushing messages to the appropriate delivery agent as that agent declares the ability to execute that delivery.
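The hand-off between the two buses can be sketched as follows. This is an illustration only, assuming an in-memory `deque` in place of a real store-and-forward queue and plain callables in place of user avatars on the XMPP bus; the `SourceAvatar` class and its `subscribe` and `pump` methods are hypothetical names, not notify.me's API.

```python
from collections import deque


class SourceAvatar:
    """Hypothetical source avatar: drains a queue and fans out to subscribers."""

    def __init__(self, name, queue):
        self.name = name
        self.queue = queue          # stand-in for a store-and-forward queue
        self.subscribers = {}       # user id -> callable accepting a notification

    def subscribe(self, user_id, on_notification):
        self.subscribers[user_id] = on_notification

    def pump(self):
        """Pull every waiting item and notify each subscriber once per item."""
        sent = 0
        while self.queue:
            item = self.queue.popleft()
            for notify in self.subscribers.values():
                notify({"source": self.name, "body": item})
                sent += 1
        return sent


# An ingestor daemon (not shown) would append new items to this queue.
feed_queue = deque(["new listing: 2BR downtown", "new listing: studio"])

alice_inbox, bob_inbox = [], []
rss = SourceAvatar("craigslist-rss", feed_queue)
rss.subscribe("alice", alice_inbox.append)
rss.subscribe("bob", bob_inbox.append)

sent = rss.pump()   # two items fanned out to two subscribers
```

Because the ingestor only ever writes to the queue, a stalled source avatar delays fan-out without blocking ingestion, which is the process isolation described above.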
What particular challenges did you face and how did you overcome them? What options did you consider and why did you decide to do it another way?
From the outset, our primary goal was to avoid bottlenecks and hindrances to scaling horizontally. Initially we planned on building the entire system as a stateless flow of messages through queues, with each daemon along the line being responsible for the data flowing through it, merging, multiplexing and routing it to the next point until delivery was achieved. This meant no single part's failure would ever affect the whole, other than queues getting backed up.
However, early on we realized that once a message was designated for a user we needed the ability to track where it was and be able to re-route it dynamically depending on user configuration and presence. This led us to add a bit of inappropriate coupling between daemons via REST APIs. Some of this plumbing still exists, as we're still migrating processing over to the combination queue/async bus architecture.
As we realized that pure message passing without state was not going to satisfy our dynamic needs, the easy solution was to return to the tried-and-true state keeper, the central relational database. Knowing our scaling goals, this would have introduced a point of failure that sooner or later we would not be able to mitigate. We decided to look at our state in a different way: instead of creating processing units based on function (i.e. ingestion, parsing, transformation, routing, delivery) that queried state based on the data flowing through them, we thought of the units in terms of data ownership, i.e. sources and destinations (users). Once on that track, there was precious little shared state and we were able to change our storage pattern to have each owner of data be responsible for its own data, allowing horizontal scaling of the persistence layer, as well as much more efficient caching.
The remaining need for accessing data across owners is analytics. In many systems analytics is a primary reason for the existence of a central database, since too often facts and dimensions are kept intermingled in the production schema. For our purposes, this data is not a production concern and therefore should never affect live capacity. Usage and state changes are treated as immutable events, which are queued at the point of occurrence into our store-and-forward system. The nature of our store-and-forward queue allows us to automatically gather all these events from all hosts to a central archive which can then be processed into fact and dimension data by ETL processes. This allows for near real-time tracking of usage without affecting user facing systems.
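A rough sketch of that event flow, under stated assumptions: the `UsageEvent` fields, the host names, and the `record`, `gather` and `count_by_kind` helpers are invented for illustration, and in-memory deques stand in for the store-and-forward queues. The live path only appends an immutable event to a local queue; gathering and aggregation happen elsewhere.

```python
from collections import Counter, deque
from dataclasses import dataclass


@dataclass(frozen=True)          # frozen: events cannot be modified once created
class UsageEvent:
    host: str
    user_id: str
    kind: str                    # e.g. "subscribed", "delivered" (made-up kinds)
    timestamp: float


# One local queue per host (hypothetical host names).
local_queues = {"host-a": deque(), "host-b": deque()}


def record(event):
    """Called on the live path: a cheap local append, no central database."""
    local_queues[event.host].append(event)


def gather(archive):
    """Move all queued events from every host into the central archive."""
    moved = 0
    for queue in local_queues.values():
        while queue:
            archive.append(queue.popleft())
            moved += 1
    return moved


def count_by_kind(archive):
    """A toy ETL step: turn raw events into per-kind fact counts."""
    return Counter(event.kind for event in archive)


record(UsageEvent("host-a", "alice", "subscribed", 1.0))
record(UsageEvent("host-b", "bob", "delivered", 2.0))
record(UsageEvent("host-a", "alice", "delivered", 3.0))

archive = []
gather(archive)
facts = count_by_kind(archive)
```

Since events are never updated in place, the archive can be reprocessed with different dimensions later without touching production data.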
Could you explain your choice of XMPP a little more? Is it used mainly as a message bus between federated XMPP servers sitting on EC2 nodes? Is the XMPP queue used as the queue for each user's messages from all sources before they are pushed to users?
We have three different xmpp clusters which take advantage of federation for cross-chatter: user, agent and the avatar bus.
The user cluster is a regular xmpp IM server on which we create accounts for each of our users, providing them an IM account that they can use from any Xmpp capable client. This account also serves as the user our desktop app signs in as and that will be the authentication for our API for third party message ingestion and distribution.
The daemons connecting to the agent cluster serve as communication bridges between our internal Avatar bus and outside clients. Currently this is primarily for communicating with chat clients, as every user is assigned an agent that they communicate with us through, regardless of whether they use their default account or some third party account such as jabber.org, googletalk, etc. We also are testing a client API that uses Xmpp RPC via these agents for dedicated Apps. In the future we will also offer full XMPP and REST APIs for third party integration that will use the agents to communicate with the Avatar bus.
I mentioned earlier that Agents are avatars as well, however they are a little special in that they do not have a user on the Avatar bus but talk to other avatars through cross server federation. We currently are also building agents for the Oscar and MSN networks that will sit directly on the avatar bus since their native transport is not federated. We also plan to evaluate other networks for possible future support.
The avatar bus is our internal message bus that we use to route and process all commands and messages. We primarily use direct messaging and IQ based RPC stanzas between avatars, although we do take advantage of presence for monitoring.
So what is an avatar? It's a daemon (where a single physical daemon process can host many avatar daemons) that is the authority for some external entity's data. For example, every user registered with notify.me has an avatar that monitors agents for status changes, receives messages in care of that user and is responsible for routing those messages to the appropriate delivery channel. This means that every user avatar is the single authority for all data about that user and is responsible for persisting the data. If some other part of the system wants to know something about that user or modify its data, it uses Xmpp RPC to talk to the avatar, rather than some central database. Right now, avatars persist to disk and SimpleDB, while keeping a ttl-regulated cache in process. Since only the avatar can write its own data, it does not need to check the DB but can treat its memory and disk cache as authoritative, so SDB is used primarily for writes. Reads are needed only in the case of a node failure, to bring up the avatar on another node.
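The single-writer persistence pattern might look roughly like this. It is a simplified sketch, not the actual implementation: `AvatarState` is a hypothetical class, a plain dict stands in for the durable store (disk and SimpleDB are collapsed into one layer), and the clock is injected so the example is deterministic.

```python
import time


class AvatarState:
    """Hypothetical owner-only store: TTL cache in front of a durable store."""

    def __init__(self, durable, ttl_seconds=300.0, clock=time.monotonic):
        self.durable = durable        # stand-in for disk/SimpleDB (a plain dict)
        self.ttl = ttl_seconds
        self.clock = clock
        self.cache = {}               # key -> (value, expiry time)
        self.durable_reads = 0        # counts how often we fall back to the store

    def write(self, key, value):
        # Only the owning avatar writes, so the cache can never be stale.
        self.durable[key] = value
        self.cache[key] = (value, self.clock() + self.ttl)

    def read(self, key):
        entry = self.cache.get(key)
        if entry is not None and entry[1] > self.clock():
            return entry[0]           # served from memory, no store access
        # Cache miss or expired entry: reload from the durable store.
        self.durable_reads += 1
        value = self.durable[key]
        self.cache[key] = (value, self.clock() + self.ttl)
        return value


now = [0.0]                           # fake clock, advanced by hand
store = {}                            # shared durable store
avatar = AvatarState(store, ttl_seconds=10, clock=lambda: now[0])
avatar.write("default_destination", "im")
value = avatar.read("default_destination")        # cache hit

# Simulated node failure: a fresh avatar on another node reloads from the store.
replacement = AvatarState(store, ttl_seconds=10, clock=lambda: now[0])
recovered = replacement.read("default_destination")
```

The design choice worth noting is that correctness comes from ownership rather than locking: with exactly one writer per key space, there is nothing to invalidate.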
At the other end of the bus we have our ingestors. Ingestors are made up of a number of daemons, generally running on polling loops against external sources, queueing new data into our store-and-forward queues, where the appropriate ingestor avatar picks up new messages and distributes them to its subscribers. In the ingestor avatar scenario, it is the authority on subscription and routing data.
Here's a typical use case: A user subscribes to an RSS feed via the web interface. The web interface sends the request to the user's avatar, which persists the subscription for reference and then requests the subscription from the rss ingestor. As new rss items arrive, the rss ingestor multiplexes items to all user avatars that subscribe to that feed. The user avatars in turn determine the appropriate delivery mechanism and schedule delivery. In general that means that the user avatar is subscribed to the user's Xmpp presence via the user's agent. Until the user is in the proper state for accepting messages, the user avatar queues the rss items. Once the user is ready to receive the notification, the presence change is propagated from the agent into the internal bus and the user avatar then sends the rss items to the agent, which in turn delivers them to the user.
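The presence-gated delivery in this use case can be sketched in a few lines. Everything here is illustrative: the `Agent` and `UserAvatar` classes, their method names, and the `"available"` status string are assumptions, and direct method calls stand in for stanzas on the bus.

```python
class Agent:
    """Hypothetical delivery gateway that also reports presence changes."""

    def __init__(self):
        self.delivered = []       # items that reached the user
        self.watchers = []        # callbacks subscribed to presence

    def watch(self, callback):
        self.watchers.append(callback)

    def set_presence(self, status):
        # Propagate the presence change into the internal bus.
        for callback in self.watchers:
            callback(status)

    def deliver(self, item):
        self.delivered.append(item)


class UserAvatar:
    """Hypothetical user avatar: queues items until the user can receive them."""

    def __init__(self, agent):
        self.agent = agent
        self.available = False
        self.pending = []
        agent.watch(self.on_presence)

    def on_item(self, item):
        if self.available:
            self.agent.deliver(item)
        else:
            self.pending.append(item)

    def on_presence(self, status):
        self.available = (status == "available")
        if self.available:
            for item in self.pending:
                self.agent.deliver(item)
            self.pending.clear()


agent = Agent()
avatar = UserAvatar(agent)
avatar.on_item("item 1")              # user not available yet: queued
avatar.on_item("item 2")
queued_before = list(avatar.pending)
agent.set_presence("available")       # presence change flushes the queue
avatar.on_item("item 3")              # delivered immediately
```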
Right now, all avatars are always online (even if mostly idle), which is fine for our present user base size. Our plan is to mod the offline storage module of ejabberd so that we can blind fire stanzas and have queued messages signal a monitor to spin up the appropriate avatar for the destination XmppId. Once this system is in place we will be able to spin up avatars on demand and shut them down on idle.
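Since this is described as a plan rather than a finished feature, the following is only a guess at the shape of the idea, not a description of the ejabberd modification. `AvatarHost`, its methods and the idle threshold are all hypothetical, and time is passed in explicitly to keep the example deterministic.

```python
class AvatarHost:
    """Hypothetical monitor plus offline store for lazily started avatars."""

    def __init__(self, idle_after=60.0):
        self.idle_after = idle_after
        self.offline = {}         # avatar id -> stanzas waiting for the avatar
        self.running = {}         # avatar id -> time of last activity
        self.handled = []         # (avatar id, stanza) pairs, for illustration
        self.spin_ups = 0

    def send(self, avatar_id, stanza, now):
        # "Blind fire": the sender does not check whether the avatar is up.
        self.offline.setdefault(avatar_id, []).append(stanza)
        if avatar_id not in self.running:
            self.spin_ups += 1    # the queued stanza signals the monitor
        self.running[avatar_id] = now
        # The (now running) avatar drains everything queued for it.
        for queued in self.offline.pop(avatar_id):
            self.handled.append((avatar_id, queued))

    def reap_idle(self, now):
        """Shut down avatars with no activity for at least idle_after seconds."""
        idle = [avatar_id for avatar_id, last in self.running.items()
                if now - last >= self.idle_after]
        for avatar_id in idle:
            del self.running[avatar_id]
        return idle


host = AvatarHost(idle_after=60.0)
host.send("alice", "stanza-1", now=0.0)      # first stanza starts the avatar
host.send("alice", "stanza-2", now=10.0)     # already running, no new spin-up
stopped = host.reap_idle(now=100.0)          # idle for 90 seconds: shut down
host.send("alice", "stanza-3", now=120.0)    # spun up again on demand
```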
At what traffic load do you expect your current architecture to break and what's your plan?
Since our system is distributed and asynchronous by design, we should avoid systemwide failures under load. However, while avoiding all the usual bottlenecks, the reality is that our message bus, which makes all this possible, will likely become our limiting factor, either because it cannot handle the number of avatars (nodes on the bus) or because latency on the bus becomes unacceptable. We're only starting to use the avatar system as our backbone, so it's still a bit fragile and we're still doing load testing on ejabberd to determine at what point we run into limiting factors.
While we are already clustering ejabberd, the load of mnesia database replication and cross node chatter means that either number of connections or latency will eventually cause the cluster to fail or simply consume too much memory to be manageable.
Since our messaging is primarily point-to-point, we anticipate that we can split our user base into avatar silos, each hosted on a dedicated avatar subdomain cluster, reducing message and connection load. As long as our silos are appropriately designed to keep cross-subdomain chatter to a minimum, we should be able to have n silos to keep on top of load.
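One way to picture the silo split is a stable mapping from user to subdomain. The interview does not say how users would be assigned to silos, so the hash-based scheme below is purely an assumption, as are the function names and the `avatars.example.com` domain.

```python
import hashlib


def silo_index(user_id, silo_count):
    """Map a user to one of silo_count silos using a stable hash."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % silo_count


def avatar_address(user_id, silo_count, domain="avatars.example.com"):
    """Build a hypothetical bus address on the user's silo subdomain."""
    return f"{user_id}@silo{silo_index(user_id, silo_count)}.{domain}"


def is_cross_silo(sender, recipient, silo_count):
    """True when a message would have to cross subdomain clusters."""
    return silo_index(sender, silo_count) != silo_index(recipient, silo_count)


users = ["alice", "bob", "carol", "dave"]
addresses = {user: avatar_address(user, silo_count=4) for user in users}
```

A plain modulo mapping like this reshuffles most users whenever the silo count changes, so a real deployment would more likely use an explicit directory or consistent hashing. Traffic between a user's own avatar and agent stays inside one silo, while fan-out from a shared source to users in many silos is exactly the cross-subdomain chatter the design has to keep small.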
Our single greatest challenge in keeping this architecture from failing is eternal vigilance against introducing features that create messaging bottlenecks. A significant amount of our message traffic passing through a single processor or family of processors would introduce dependencies we cannot scale ourselves out of with subdomain division.