
Real Time Analytics for Big Data: An Alternative Approach

posted Jul 19, 2011, 4:21 PM by Kuwon Kang

Lately, we've been talking to various clients about realtime analytics, and with convenient timing Todd Hoff wrote up how Facebook's realtime analytics system was designed and implemented (see our previous review of that write-up).

Their design rested on two assumptions, one about the reliability of in-memory systems and one about database neutrality: for memory, that transactional memory was unreliable, and for the database, that HBase was the only targeted data store.

What if those assumptions are changed? We can see reliable transactional memory in the field, as a requirement for any in-memory data grid, and certainly there are more databases than HBase; given database and platform neutrality, and reliable transactional memory, how could you build a realtime analytics system?

Joseph Ottinger and I discussed this, and this is what we came up with.

A Summary of History

To understand what a new design might look like, it’s often useful to consider a previous design. This is a very short summary of Facebook’s realtime analytics system.

First, it’s based on a system of key/value pairs, where the key might be a URL and the value is a counter. Thus, there’s a requirement for atomic, transactional updates to a very simple piece of data. The difficulties come from scale, not from the focus of the system.
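As a minimal sketch of that requirement (not Facebook's implementation), the following keeps a counter per URL and makes each increment atomic with a lock. The class name `CounterStore`, the helper `record_hits`, and the example URL are all hypothetical, chosen only for illustration.

```python
import threading
from collections import defaultdict


class CounterStore:
    """Hypothetical in-memory key/value store whose values are counters."""

    def __init__(self):
        self._counts = defaultdict(int)
        self._lock = threading.Lock()

    def increment(self, key, delta=1):
        # The lock makes the read-modify-write step atomic across threads.
        with self._lock:
            self._counts[key] += delta
            return self._counts[key]

    def get(self, key):
        with self._lock:
            # dict.get does not create a missing key in a defaultdict.
            return self._counts.get(key, 0)


def record_hits(store, url, n):
    """Simulate n events for one URL arriving on a single thread."""
    for _ in range(n):
        store.increment(url)


# Four concurrent writers update the same key.
store = CounterStore()
workers = [
    threading.Thread(target=record_hits, args=(store, "http://example.com/a", 1000))
    for _ in range(4)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

A single lock is trivial on one machine; the difficulty the text points to is keeping the same guarantee when the keys are spread over many machines.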

The process flow is fairly simple:

  • A user creates an event by performing some action on the website. This generates an AJAX request, sent to a service.
  • Scribe is used to write the events into logs, stored on HDFS.
  • PTail is used to consolidate the HDFS logs.
  • Puma takes the consolidated logs from PTail and stores them into HBase in groupings that represent roughly 1.5 seconds’ worth of events.
  • HBase serves as the long-term repository for analytics data.
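The grouping step in that flow can be sketched as follows. This is only an illustration of batching events into fixed time windows, assuming events are simple `(timestamp, url)` pairs; the function name `batch_by_window` and the sample data are hypothetical and say nothing about how Puma itself is written.

```python
from collections import Counter, defaultdict

WINDOW_SECONDS = 1.5  # approximate batch length mentioned in the text


def batch_by_window(events, window=WINDOW_SECONDS):
    """Group (timestamp, url) events into per-window URL counts."""
    batches = defaultdict(Counter)
    for timestamp, url in events:
        # Events whose timestamps fall in the same window share an index.
        window_index = int(timestamp // window)
        batches[window_index][url] += 1
    return dict(batches)


# Timestamps are seconds since some arbitrary start, for illustration only.
sample_events = [(0.1, "/a"), (0.9, "/a"), (1.4, "/b"), (1.6, "/a"), (3.2, "/b")]
sample_batches = batch_by_window(sample_events)
```

Each window's counts would then be written to the long-term store as one unit, which is why the window length also bounds how much intermediate data has to be held in memory.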

There are some questions around how PTail and Puma serve as scaling agents, and some of the notes around their use are still limited in scale. For example, one of the concerns is that an in-memory hash table will fill up, which sounds like a fairly serious limitation to have to keep in mind.

A Potential for Improvement

There are lots of areas in which you can see potential improvements, if the assumptions are changed. As a contrast to Facebook's working system:

  • We can simplify the design. If memory can be seen as transactional - and it can - we can use events without transforming them as they proceed along our analytics workflow. This makes our design much simpler to implement and test, and performance improves as well.
  • We can strengthen the design. With a polling semantic, such systems are brittle, relying on systems that pull data in order to generate realtime analytics data. We should be able to reduce the fragility of the system, even while making it faster.
  • We can strengthen the implementation. With batching subsystems, there are limits that shouldn’t exist. For example, one concern in Facebook's implementation is the use of an in-memory hash table that stores intermediate data; the in-memory aspect isn’t a concern until you realize that the batch sizes are chosen partially to make sure that this hash table doesn’t overflow available space.
  • We can allow deployments to change databases based on their requirements. There's nothing wrong with HBase, but it's got specific characteristics that aren't appropriate for all enterprises. We can design a system which you’d be able to deploy on various and flexible platforms, and we can migrate the underlying long-term data store to a different database if needed.
  • We can consolidate the analytics system so that management is easier and unified. While there are system management standards like SNMP that allow management events to be presented in the same way no matter the source, having so many different pieces means that managing the system requires an encompassing understanding, which makes maintenance and scaling more difficult.

What we want to do, then, is create a general model for an application that can accomplish the same goals as Facebook’s realtime analytics system, while leveraging the capabilities that in-memory data grids offer where available, potentially offering improvement in the areas of scalability, manageability, latency, platform neutrality, and simplicity, all while increasing ease of data access.

That sounds like quite a tall order, but it’s doable.

The key is to remember that at heart, realtime analytics represent an events system. Facebook’s entire architecture is designed to funnel events through various channels, such that they can safely and sequentially manage event updates.

Therefore, they receive a massive set of events that “look like” marbles, which they line up in single file; they then sort the marbles by color, you might say, and for each color they create a bundle of sticks; the sticks are lit on fire, and when the heat goes up past a certain temperature, steam is generated, which turns a turbine.

It’s a real-life Rube Goldberg machine, which is admirable in that it works, but much of it is still unnecessary if the assumptions about memory ("unreliable") and database ("HBase is the only target that counts") are changed. Looking at the analogy from the previous paragraph, there’s no need to change a marble into anything. The marble is enough.

A Plan for Implementation

Our design for implementation is built around putting data and messaging together. A data grid is a perfect mechanism for this, as long as it provides some basic features: transactional operations, push and pull semantics, and data partitioning.

A data grid does provide those basic features, or else it's not really much of a data grid; it'd be more of a cache otherwise.

With a data grid, then, the events come in as individual messages. When the user chooses an operation on the web site, an asynchronous operation would write the event, just as Facebook does today. However, instead of filtering and batching the events into various forms, the events are dispatched to waiting processes that perform many transactional updates in parallel.

There’s a danger that those updates might be slower than the generated events, if each event is processed sequentially. That said, this isn’t as much a problem as one might think; if data partitioning is used, then event handlers can receive partitioned events, which localizes updates and speeds them up dramatically.
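A rough sketch of that partitioning idea, assuming events are identified by a URL key: every event for a given key is routed to the same partition, so its handler only ever touches local data. The names `partition_for` and `PartitionedCounters` are hypothetical, and the CRC32-modulo routing is an assumption standing in for whatever routing scheme a real data grid uses.

```python
import zlib
from collections import defaultdict


def partition_for(key, partition_count):
    """Map a key to a partition index with a hash that is stable across runs."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


class PartitionedCounters:
    """Hypothetical stand-in for a grid: one local counter table per partition."""

    def __init__(self, partition_count):
        self.partitions = [defaultdict(int) for _ in range(partition_count)]

    def handle_event(self, url):
        # Only the owning partition is touched, so handlers for different
        # partitions never contend with one another.
        index = partition_for(url, len(self.partitions))
        self.partitions[index][url] += 1
        return index

    def count(self, url):
        index = partition_for(url, len(self.partitions))
        return self.partitions[index].get(url, 0)


grid = PartitionedCounters(partition_count=4)
for url in ["/a", "/b", "/a", "/c", "/a"]:
    grid.handle_event(url)
```

In a real deployment each partition would live in its own process or machine, with the routing done by the grid rather than by application code.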

In fact, you can still use batching to process events as a group; since the events would be partitioned coming in, the batch process would still be updating local data very quickly, which would be faster than individual event processing, even while retaining simplicity.
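The batching variant can be sketched in the same spirit: the events in a batch are pre-aggregated per partition, and each partition then receives one update per key instead of one per event. The function names and the dictionary-based partitions below are illustrative assumptions only.

```python
import zlib
from collections import Counter


def partition_for(key, partition_count):
    """Map a key to a partition index with a hash that is stable across runs."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


def apply_batch(partitions, urls):
    """Pre-aggregate a batch per partition, then apply one update per key.

    Returns the number of updates actually applied.
    """
    pending = [Counter() for _ in partitions]
    for url in urls:
        pending[partition_for(url, len(partitions))][url] += 1

    updates = 0
    for store, deltas in zip(partitions, pending):
        for url, delta in deltas.items():
            # One local write covers every event for this key in the batch.
            store[url] = store.get(url, 0) + delta
            updates += 1
    return updates


batch_partitions = [{} for _ in range(3)]
applied = apply_batch(batch_partitions, ["/a"] * 5 + ["/b"] * 2)
```

Seven events collapse into two writes here; the batch is an optimization layered on top of partitioning, not a replacement for it.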

With this design, there is no overflow condition, because a system that’s designed to scale in and out, as most data grids are, will repartition to maintain even usage. If a data grid can’t provide this feature intrinsically, some management will of course be necessary, but finding data grids with this feature isn’t very difficult.

One other advantage of data grids is write-through support. With asynchronous write-through (often called write-behind), updates to the data grid are written asynchronously to a backend data store, which could be HBase (as used by Facebook), Cassandra, a relational database such as MySQL, or any other data medium you choose for long-term storage, should you need that.
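To make the asynchronous write to the backend concrete, here is a minimal sketch in which updates land in memory immediately and a background thread drains them to the long-term store. The class `WriteBehindGrid` is hypothetical, and a plain dictionary stands in for the real database; no actual HBase, Cassandra, or MySQL client is used.

```python
import queue
import threading


class WriteBehindGrid:
    """Hypothetical grid: in-memory writes, asynchronous persistence."""

    def __init__(self, backend):
        self.memory = {}
        self.backend = backend  # any object supporting backend[key] = value
        self._pending = queue.Queue()
        worker = threading.Thread(target=self._drain, daemon=True)
        worker.start()

    def put(self, key, value):
        # The caller returns as soon as memory is updated; the backend
        # write is only queued here.
        self.memory[key] = value
        self._pending.put((key, value))

    def _drain(self):
        while True:
            key, value = self._pending.get()
            try:
                self.backend[key] = value
            finally:
                self._pending.task_done()

    def flush(self):
        """Block until every queued write has reached the backend."""
        self._pending.join()


database = {}  # stand-in for the long-term data store
write_grid = WriteBehindGrid(database)
write_grid.put("/a", 3)
write_grid.put("/b", 7)
write_grid.flush()
```

Because the queue buffers writes, a slow database delays persistence but not the realtime path, which is the headroom the design is after.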

[Figure: RealTimeAnalytics_big_data]

The memory system and the database - the external data store - work together. The in-memory solution is ideal for the realtime aspects, the events that affect now. The external data storage solution is designed to handle long-term data, for which speed is not as much of an issue.

A Discussion of Strengths

The key concept here is that event handling is the lever that can move the realtime analytics mountain. By providing a simple, scalable publisher/subscriber model, you simplify design; by using a platform that supports data partitioning, transactional updates, and write-through capabilities, you gain scalability.

The data grid’s flexible query API means that events can literally react when data is available.

For a call center, for example, you want to immediately identify signals that show that the caller should be handled differently; imagine an ecommerce site that was able to determine immediately if a user was losing interest, and thus could respond appropriately, before the customer moves on.
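The "react when data is available" idea can be sketched as a store that notifies subscribers whenever a written value matches their condition. This is a simplified, single-process illustration; `NotifyingStore`, its methods, and the threshold of 100 are hypothetical and do not represent any particular data grid's query API.

```python
class NotifyingStore:
    """Hypothetical store that pushes matching writes to subscribers."""

    def __init__(self):
        self._data = {}
        self._subscriptions = []

    def subscribe(self, predicate, callback):
        # predicate(key, value) decides whether callback(key, value) fires.
        self._subscriptions.append((predicate, callback))

    def write(self, key, value):
        self._data[key] = value
        for predicate, callback in self._subscriptions:
            if predicate(key, value):
                callback(key, value)

    def read(self, key):
        return self._data.get(key)


alerts = []
notifier = NotifyingStore()
# React as soon as any counter reaches the (arbitrary) threshold of 100.
notifier.subscribe(
    lambda key, value: value >= 100,
    lambda key, value: alerts.append((key, value)),
)
notifier.write("/a", 50)
notifier.write("/a", 120)
```

The subscriber never polls; the write itself triggers the reaction, which is the push semantic the design relies on.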

With external processes and a long funnel for data, immediate-response capabilities are very difficult to implement, not just because of latency but because the data transformations tend to homogenize the data, instead of allowing rich expressions and flexible event types.

The data grid also has much richer support in terms of client applications. Instead of applications going through an API that focuses on a specific phase of the data’s life (for example, an API focused on HBase), you can focus on a generic API that can capture events at any point in their lifecycle, and from anywhere. An external monitoring process, then, can have the same immediate, partition-aware access to data that the integrated message-handling system does; adding features and analysis is just a matter of connecting a client to the data grid.

Here we have a quick demo that shows much of this in motion. We have a market analysis application, deployed into GigaSpaces XAP via our new Cloud deployment tool, Cloudify; it uses an event-driven system to display realtime data, with a write-through to Cassandra on the back-end. The design is very simple, and demonstrates the principles we've discussed here - and can scale up and down depending on demand.

Final words

As realtime analytics enters mainstream application design, it becomes important to draw up a blueprint for building this sort of system without reinventing the wheel.

Todd Hoff (HighScalability) and Alex Himel (Facebook) provided a fairly detailed description of their solution and, even more importantly, shared the rationale that led them to do things in certain ways.

The main differences in assumptions that lead to the different implementation strategies are in reliable memory for event processing and in the use of passive data storage.

Another difference is that we had to think of the solution as an easily cloneable one, and therefore a lot of attention was put on the simplicity of the runtime, packaging and management of the solution.

Yet another difference is that we couldn’t decide on a specific database, as there isn’t a "one size fits all" solution. For certain customers, SQL would still be the preferred choice, and the fact that we can buffer the write to the database gives them more headroom while still allowing them to scale on writes.

I hope that this leads to a constructive dialogue on the various tradeoffs, which will serve the entire industry.
