Monday, July 18, 2016

Kafka, Samza, and the Unix Philosophy of Distributed Data

This paper is closely related to the "Realtime Data Processing at Facebook" paper I reviewed in my previous post. As I mentioned there, Kafka does basically the same thing as Facebook's Scribe, and Samza is a stream processing system built on top of Kafka.

This paper is very easy to read. It is delightful in its simplicity. It summarizes the design of Apache Kafka and Apache Samza and compares their design principles to the design philosophy of Unix, in particular, Unix pipes.

Who says plumbing can't be sexy? (Seriously, don't Google this.) So without further ado, I present to you the Mike Rowe of distributed systems.

Motivation/Applications

I talked about the motivation and applications of stream processing in the Facebook post. The application domain is basically web services that adapt to your behavior and personalize on the fly: Facebook, Quora, LinkedIn, Twitter, YouTube, Amazon, etc. These web services take in your most recent actions (likes, clicks, tweets), analyze them on the fly, merge them with previous analytics on larger data, and adapt to your recent activity as part of a feedback loop.

In theory you can achieve this personalization goal with a batch workflow system, like MapReduce, which provides system scalability, organizational scalability (of the engineering/development team's efforts), operational robustness, multi-consumer support, loose coupling, data provenance, and friendliness to experimentation. However, batch processing adds large delays. Stream processing systems preserve all the good scalability features of batch workflow systems, and add a "timeliness" feature as well.

I am using shortened descriptions from the paper for the following sections.

Apache Kafka


Kafka provides a publish-subscribe messaging service. Producer (publisher) clients write messages to a named topic, and consumer (subscriber) clients read messages in a topic. A topic is divided into partitions, and messages within a partition are totally ordered. There is no ordering guarantee across different partitions. The purpose of partitioning is to provide horizontal scalability: different partitions can reside on different machines, and no coordination across partitions is required.
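To make the partitioning idea concrete, here is a minimal Python sketch (not Kafka's actual client or partitioner) of routing keyed messages to partitions: messages with the same key land in the same append-only log and are therefore totally ordered, while different keys may spread across partitions. The partition count and hash choice are arbitrary assumptions.

```python
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count for one topic

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition; all messages with the same key
    land in the same partition and are therefore totally ordered."""
    return zlib.crc32(key) % num_partitions

# A toy "topic": one append-only list (log) per partition.
topic = {p: [] for p in range(NUM_PARTITIONS)}

def produce(key: bytes, value: str) -> None:
    p = partition_for(key)
    topic[p].append((key, value))  # append-only write; offset = index in list

produce(b"user-42", "clicked")
produce(b"user-42", "liked")    # same key -> same partition -> ordered after "clicked"
produce(b"user-7", "tweeted")   # different key may land in a different partition
```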

Each partition is replicated across multiple Kafka broker nodes to tolerate node failures. One of a partition's replicas is chosen as leader, and the leader handles all reads and writes of messages in that partition. Writes are serialized by the leader and synchronously replicated to a configurable number of replicas. On leader failure, one of the in-sync replicas is chosen as the new leader.

The throughput of a single topic-partition is limited by the computing resources of a single broker node: the bottleneck is usually either its NIC bandwidth or the sequential write throughput of the broker's disks. When adding nodes to a Kafka cluster, some partitions can be reassigned to the new nodes, without changing the number of partitions in a topic. This rebalancing technique allows the cluster's computing resources to be increased or decreased without affecting partitioning semantics.

Apache Samza

A Samza job consists of a Kafka consumer, an event loop that calls application code to process incoming messages, and a Kafka producer that sends output messages back to Kafka. Unlike many other stream-processing frameworks, Samza does not implement its own network protocol for transporting messages from one operator to another.
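Here is a toy sketch of that consume/process/produce skeleton, with Python queues standing in for Kafka topics. This is not the Samza API, just the shape of the event loop.

```python
import queue
import threading

# Toy stand-ins for Kafka topics: thread-safe FIFO queues.
input_topic = queue.Queue()
output_topic = queue.Queue()

def process(message):
    """Application code: here, just upper-case the payload."""
    key, value = message
    return key, value.upper()

def run_job(stop: threading.Event):
    """Consume -> process -> produce loop, the skeleton of a stream job."""
    while not stop.is_set():
        try:
            msg = input_topic.get(timeout=0.1)   # Kafka consumer stand-in
        except queue.Empty:
            continue
        output_topic.put(process(msg))           # Kafka producer stand-in

stop = threading.Event()
worker = threading.Thread(target=run_job, args=(stop,), daemon=True)
worker.start()
input_topic.put(("user-1", "hello samza"))
print(output_topic.get())   # ('user-1', 'HELLO SAMZA')
stop.set()
```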


Figure 3 illustrates the use of partitions in the word-count example: by using the word as message key, the SplitWords task ensures that all occurrences of the same word are routed to the same partition of the words topic.
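A rough sketch of that two-stage word count, under made-up assumptions (two partitions for the intermediate words topic, CRC32 as the hash): the split stage keys its output by the word, so each count task sees every occurrence of the words that hash to its partition.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 2  # assumed partition count for the intermediate "words" topic

# The intermediate "words" topic: one append-only log per partition.
words_topic = {p: [] for p in range(NUM_PARTITIONS)}

def split_words(sentence: str) -> None:
    """SplitWords stage: emit (word, 1) keyed by the word itself, so every
    occurrence of a word is routed to the same partition."""
    for word in sentence.split():
        p = zlib.crc32(word.encode()) % NUM_PARTITIONS
        words_topic[p].append((word, 1))

# One CountWords task per partition keeps a running count for its words.
counts = [defaultdict(int) for _ in range(NUM_PARTITIONS)]

def count_words(partition: int) -> None:
    for word, n in words_topic[partition]:
        counts[partition][word] += n

split_words("the quick brown fox jumps over the lazy dog")
for p in range(NUM_PARTITIONS):
    count_words(p)
print([dict(c) for c in counts])   # "the" is counted twice, within a single partition
```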


Samza implements durable state through the KeyValueStore abstraction, exemplified in Figure 2. Samza uses the RocksDB embedded key-value store, which provides low-latency, high-throughput access to data on local disk. To make the embedded store durable in the face of disk and node failures, every write to the store is also sent to a dedicated topic-partition in Kafka (the changelog), as illustrated in Figure 4. When recovering after a failure, a task can rebuild its store contents by replaying its partition of the changelog from the beginning. Rebuilding a store from the log is only necessary if the RocksDB database is lost or corrupted. While publishing the changelog to Kafka for durability may seem wasteful, it can also be a useful feature for applications: other stream processing jobs can consume the changelog topic like any other stream, and use it to perform further computations.
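A minimal sketch of the changelog idea, with a dict standing in for RocksDB and a plain list standing in for the Kafka changelog topic-partition: every put is appended to the changelog before being applied locally, and recovery is just a replay from the beginning.

```python
class ChangeloggedStore:
    """Local key-value store (a dict standing in for RocksDB) whose every
    write is also appended to a changelog (a list standing in for a Kafka
    topic-partition)."""

    def __init__(self, changelog):
        self.changelog = changelog   # shared append-only log
        self.store = {}              # local state, lost on node failure

    def put(self, key, value):
        self.changelog.append((key, value))  # durable write first
        self.store[key] = value              # then the local write

    def get(self, key, default=None):
        return self.store.get(key, default)

    @classmethod
    def recover(cls, changelog):
        """Rebuild local state by replaying the changelog from the beginning."""
        fresh = cls(changelog)
        for key, value in changelog:
            fresh.store[key] = value
        return fresh

changelog = []                         # stands in for the Kafka changelog partition
store = ChangeloggedStore(changelog)
store.put("clicks:user-42", 3)
store.put("clicks:user-42", 4)
recovered = ChangeloggedStore.recover(changelog)   # after a node failure
print(recovered.get("clicks:user-42"))             # 4
```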


One characteristic form of stateful processing is a join of two or more input streams, most commonly an equi-join on a key (e.g. user ID). One type of join is a window join, in which messages from input streams A and B are matched if they have the same key, and occur within some time interval delta-t of one another. Alternatively, a stream may be joined against tabular data: for example, user clickstream events could be joined with user profile data, producing a stream of clickstream events with embedded information about the user. When joining with a table, the authors recommend making the table data available in the form of a log-compacted stream through Kafka. Processing tasks can consume this stream to build an in-process replica of a database table partition, using the same approach as the recovery of durable local state, and then query it with low latency. This seems wasteful to me, but the authors do not seem worried about straining Kafka, and are comfortable using it as a workhorse.
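Here is a small sketch of such a stream-table join, using a made-up user-profiles changelog and clickstream: the task first consumes the (log-compacted) changelog to build its in-process table replica, then enriches each click with a purely local lookup.

```python
# Log-compacted "user-profiles" changelog: later entries supersede earlier
# ones for the same key.
profile_changelog = [
    ("user-1", {"country": "TR"}),
    ("user-2", {"country": "US"}),
    ("user-1", {"country": "DE"}),   # update: compaction would keep only this one
]

# Build the in-process table replica by consuming the changelog.
profiles = {}
for user_id, profile in profile_changelog:
    profiles[user_id] = profile

# Clickstream events joined against the local replica, with no remote lookup.
clicks = [("user-1", "/home"), ("user-2", "/search"), ("user-3", "/home")]

def enrich(event):
    user_id, url = event
    return {"user": user_id, "url": url,
            "profile": profiles.get(user_id)}   # None if the profile is unknown

for event in clicks:
    print(enrich(event))
```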

Even though the intermediate state between two Samza stream processing operators is always materialized to disk, Samza is able to provide good performance: a simple stream processing job can process over 1 million messages per second on one machine, and saturate a gigabit Ethernet NIC.

Discussion

The paper includes a nice discussion section as well.
  • Since the only access methods supported by a log are an appending write and a sequential read from a given offset, Kafka avoids the complexity of implementing random-access indexes. By doing less work, Kafka is able to provide much better performance than systems with richer access methods. Kafka's focus on the log abstraction is reminiscent of the Unix philosophy: "Make each program do one thing well. To do a new job, build afresh rather than complicate old programs by adding new features." 
  • If Kafka is like a streaming version of HDFS, then Samza is like a streaming version of MapReduce. The pipeline is loosely coupled, since a job does not know the identity of the jobs upstream or downstream from it, only the topic names. This principle again evokes a Unix maxim: “Expect the output of every program to become the input to another, as yet unknown, program.”
  • There are some key differences between Kafka topics and Unix pipes: a topic can have any number of consumers that do not interfere with each other; it tolerates failure of producers, consumers, or brokers; and a topic is a named entity that can be used for tracing data provenance. Kafka topics deliberately do not provide backpressure: the on-disk log acts as an almost-unbounded buffer of messages.
  • The log-oriented model of Kafka and Samza is fundamentally built on the idea of composing heterogeneous systems through the uniform interface of a replicated, partitioned log. Individual systems for data storage and processing are encouraged to do one thing well, and to use logs as input and output. Even though Kafka's logs are not the same as Unix pipes, they encourage composability, and thus Unix-style thinking.

Related links

Further reading on this is Jay Kreps' excellent blog post on logs.

Apache BookKeeper and Hedwig are good alternatives to Kafka.

These days, there is also DistributedLog.

Saturday, July 9, 2016

Realtime Data Processing at Facebook

Recently there has been a lot of development in realtime data processing systems, including Twitter's Storm and Heron, Google's MillWheel, and LinkedIn's Samza. This paper presents Facebook's realtime data processing system architecture and its Puma, Swift, and Stylus stream processing systems. The paper is titled "Realtime Data Processing at Facebook" and it appeared at SIGMOD'16, June 26-July 1.

Motivation and applications

Facebook runs hundreds of realtime data pipelines in production. As a motivating example of the realtime data processing system, the paper describes Chorus. The Chorus data pipeline transforms a stream of individual Facebook posts into aggregated, anonymized, and annotated visual summaries. E.g., what are the top 5 topics being discussed for the election today? What are the demographic breakdowns (age, gender, country) of World Cup fans?

Another big application is the mobile analytics pipelines that provide realtime feedback for Facebook mobile application developers, who use this data to diagnose performance and correctness issues.

The system architecture


Scribe plays a central role in Facebook's realtime processing architecture. The main idea of the architecture is this: by accepting a latency of seconds rather than milliseconds, the architecture is able to employ a persistent message bus, i.e., Scribe, for data transport. Scribe provides a persistent, distributed messaging system for collecting, aggregating, and delivering high volumes of log data with a few seconds of latency and high throughput. Scribe is the transport mechanism for sending data to both batch and realtime systems at Facebook. Using Scribe to decouple data transport from processing allows the system to achieve fault tolerance, scalability, and ease of use, as well as to support multiple processing systems.

While Scribe incurs a few seconds of latency, it still meets Facebook's latency requirements and provides hundreds of gigabytes per second of throughput. In return, Scribe provides a persistent message bus that decouples and isolates the data production and data analysis components. Moreover, with persistent Scribe streams, the system can replay a stream from a recent time period, which makes debugging and iterative development much easier.

Jay Kreps' blog post on the log described these benefits nicely as well. It talked about how practical systems can be simplified with a log-centric design, and how these log streams can enable data integration by making all of an organization's data easily available in all its storage and processing systems. Kafka would have similar advantages to Scribe; Facebook uses Scribe because it was developed in house.

Below I copy snippets of descriptions from the paper for each of these subsystems.

Within Scribe, data is organized into distinct streams called categories. Usually, a streaming application consumes one Scribe category as input. A Scribe category has multiple buckets. A Scribe bucket is the basic processing unit for stream processing systems: applications are parallelized by sending different Scribe buckets to different processes. Scribe provides data durability by storing the data in HDFS. Scribe messages are stored, and streams can be replayed by the same or different receivers for up to a few days.
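A toy sketch of the category/bucket idea (the bucket count and key choice are my assumptions, not Scribe's): the bucket a message hashes to determines which processor instance handles it, so the degree of parallelism is just the number of buckets.

```python
import zlib

NUM_BUCKETS = 8   # assumed bucket count for a hypothetical Scribe category

def bucket_for(message_key: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Assign a message to a bucket; each bucket is the unit of parallelism,
    consumed by one stream-processor process."""
    return zlib.crc32(message_key.encode()) % num_buckets

# Route the messages of one category into per-bucket batches.
category = [("user-1", "click"), ("user-2", "like"), ("user-3", "share")]
buckets = {b: [] for b in range(NUM_BUCKETS)}
for key, event in category:
    buckets[bucket_for(key)].append((key, event))

# To scale out, increase NUM_BUCKETS and restart the producers and consumers
# of the category, as described in the lessons-learned section below.
print({b: msgs for b, msgs in buckets.items() if msgs})
```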

The realtime stream processing systems Puma, Stylus, and Swift read data from Scribe and also write to Scribe.  Laser, Scuba, and Hive are data stores that use Scribe for ingestion and serve different types of queries. Laser can also provide data to the products and streaming systems, as shown by the dashed (blue) arrows.

Puma is a stream processing system whose applications (apps) are written in a SQL-like language with UDFs (user-defined functions) written in Java. Puma apps are quick to write: it can take less than an hour to write, test, and deploy a new app. Unlike traditional relational databases, Puma is optimized for compiled queries, not for ad-hoc analysis. Puma provides filtering and processing of Scribe streams (with a few seconds delay). The output of these stateless Puma apps is another Scribe stream, which can then be the input to another Puma app, any other realtime stream processor, or a data store.

Swift is a basic stream processing engine that provides checkpointing functionality for Scribe. If the app crashes, you can restart from the latest checkpoint; all data is thus read at least once from Scribe. Swift is mostly useful for low-throughput, stateless processing.
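A minimal sketch of checkpoint-and-restart giving at-least-once semantics (the checkpoint interval and crash point are made up): after a crash, processing resumes from the last checkpoint, so the messages handled between the checkpoint and the crash are read and processed again.

```python
stream = [f"event-{i}" for i in range(10)]   # stands in for a Scribe bucket
checkpoint = {"offset": 0}                   # persisted durably in a real system
processed = []                               # downstream side effects

def handle(event):
    processed.append(event)                  # may run more than once after a crash

def run(from_offset, crash_at=None):
    """Process the bucket, checkpointing every 3 messages (interval is made up)."""
    for offset in range(from_offset, len(stream)):
        if offset == crash_at:
            raise RuntimeError("simulated crash")
        handle(stream[offset])
        if (offset + 1) % 3 == 0:
            checkpoint["offset"] = offset + 1

try:
    run(checkpoint["offset"], crash_at=7)    # crash after handling offsets 0..6
except RuntimeError:
    pass
run(checkpoint["offset"])                    # restart from offset 6: 6 is reprocessed
print(len(processed))                        # 11 handles for 10 events: at-least-once
```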

Stylus is a low-level stream processing framework written in C++. A Stylus processor can be stateless or stateful. Stylus's processing API is similar to that of other procedural stream processing systems.

Laser is a high query throughput, low (millisecond) latency, key-value storage service built on top of RocksDB. Laser can be used to make the result of a complex Hive query or a Scribe stream available to a Puma or Stylus app, usually for a lookup join, such as identifying the topic for a given hashtag.

Scuba is Facebook's fast slice-and-dice analysis data store, most commonly used for trouble-shooting of problems as they happen. Scuba provides ad hoc queries with most response times under 1 second.

Hive is Facebook's exabyte-scale data warehouse. Facebook generates multiple new petabytes of data per day, about half of which is raw event data ingested from Scribe. (The other half of the data is derived from the raw data, e.g., by daily query pipelines.) Most event tables in Hive are partitioned by day. Scribe does not provide infinite retention; instead, Facebook stores input and output streams in its Hive data warehouse for longer retention.

Design decisions


Figure 4 summarizes the five design decisions considered for the components of Facebook's realtime processing system. Figure 5 summarizes which alternatives were chosen by a variety of realtime systems, both at Facebook and in the related literature.

Lessons learned

The paper includes a great lessons learned section. It says: "It is not enough to provide a framework for users to write applications. Ease of use encompasses debugging, deployment, and monitoring, as well. The value of tools that make operation easier is underestimated. In our experience, every time we add a new tool, we are surprised that we managed without it."

The highlights from this section are as follows:

  • There is no single language that fits all use cases. Needing different languages (and the different levels of ease of use and performance they provide) is the main reason why Facebook has three different stream processing systems, Puma, Swift, and Stylus.
  • The ease or hassle of deploying and maintaining the application is equally important. Making Puma deployment self-service let them scale to the hundreds of data pipelines that use Puma. (See Facebook's holistic configuration management for the type of systems Facebook employs to manage and facilitate deployments.)
  • Once an app is deployed, we need to monitor it: Is it using the right amount of parallelism? With Scribe, changing the parallelism is often just changing the number of Scribe buckets and restarting the nodes that output and consume that Scribe category. To find out the right amount of parallelism needed, Facebook uses alerts to detect when an app is processing its Scribe input more slowly than the input is being generated (a sketch of such a lag check follows this list).
  • Streaming versus batch processing is not an either/or decision. Originally, all data warehouse processing at Facebook was batch processing. Using a mix of streaming and batch processing can speed up long pipelines by hours.


Related posts

Facebook's software architecture 

Holistic Configuration Management at Facebook

Facebook's Mystery Machine: End-to-end Performance Analysis of Large-scale Internet Services 

Measuring and Understanding Consistency at Facebook

Thursday, July 7, 2016

Efficient Replication of Large Data Objects


This paper appeared in DISC 2003, and describes an application of the ABD replicated atomic storage algorithm to the replication of large objects. When the objects being replicated are much larger than the metadata (such as tags or pointers), it pays to perform extra cheap operations on the metadata in order to avoid expensive operations on the data itself.

The basic idea of the algorithm is to separately store copies of the data objects in replica servers, and information about where the most up-to-date copies are located in directory servers. This Layered Data Replication (LDR) approach adopts the ABD algorithm for atomic fault-tolerant replication of the metadata, and prescribes how the replication of the data objects in the replica servers can accompany replication of the metadata in directory servers in a concurrent and consistent fashion: In order to read the data, a client first reads the directories to find the set of up-to-date replicas, then reads the data from one of the replicas. To write, a client first writes its data to a set of replicas, then informs the directories that these replicas are now up-to-date.

The LDR algorithm replicates a single data object supporting read and write operations, and guarantees that the operations appear to happen atomically. While there exist multiple physical copies of the data, users only see one logical copy, and user operations appear to execute atomically on the logical copy. As such, LDR provides linearizability, a strong type of consistency that guarantees a read operation returns the most recent version of the data. LDR provides single-copy consistency and is on the CP side of the CAP triangle; availability is sacrificed when a majority of replicas are unreachable.

Client Protocol

When client i does a read, it goes through four phases in order: rdr, rdw, rrr and rok. The phase names describe what happens during the phase: read-directories-read, read-directories-write, read-replicas-read, and read-ok. During rdr, i reads (utd, tag) from a quorum of directories to find the most up-to-date replicas. i sets its own tag and utd to be the (tag, utd) it read with the highest tag, i.e., timestamp. During rdw, i writes (utd, tag) to a write quorum of directories, so that later reads will read i’s tag or higher. During rrr, i reads the value of x from a replica in utd. Since each replica may store several values of x, i tells the replica it wants to read the value of x associated with tag. During rok, i returns the x-value it read in rrr.

When i writes a value v, it also goes through four phases in order: wdr, wrw, wdw, and wok, which stand for write-directories-read, write-replicas-write, write-directories-write, and write-ok, respectively. During wdr, i reads (utd, tag) from a quorum of directories, then sets its tag to be higher than the largest tag it read. During wrw, i writes (v, tag) to a set acc of replicas, where |acc| ≥ f + 1. Note that the set acc is arbitrary; it does not have to be a quorum. During wdw, i writes (acc, tag) to a quorum of directories, to indicate that acc is the set of most up-to-date replicas and tag is the highest tag for x. Then i sends each replica a message to tell them that its write is finished, so that the replicas can garbage-collect older values of x. Then i finishes in phase wok.
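To tie the read and write phases together, here is a single-process Python sketch of the LDR client protocol under heavy simplifying assumptions: directories and replicas are plain in-memory objects, quorums are simulated by touching a majority of directory nodes, tags are (counter, client-id) pairs, and concurrency, failures, and garbage collection are ignored.

```python
import random

F = 1                                     # tolerated replica failures (assumption)
DIRECTORIES = [{"tag": (0, 0), "utd": {0, 1}} for _ in range(3)]   # 2f+1 directories
REPLICAS = {r: {} for r in range(3)}      # replica id -> {tag: value}

def dir_quorum():
    """A majority of directory nodes (simulated here by sampling)."""
    return random.sample(DIRECTORIES, len(DIRECTORIES) // 2 + 1)

def read(client_id):
    # rdr: read (utd, tag) from a quorum of directories, keep the highest tag
    picked = max(dir_quorum(), key=lambda d: d["tag"])
    tag, utd = picked["tag"], set(picked["utd"])
    # rdw: write (utd, tag) back to a quorum so later reads see a tag >= this one
    for d in dir_quorum():
        if tag > d["tag"]:
            d["tag"], d["utd"] = tag, set(utd)
    # rrr: read the value associated with tag from one up-to-date replica
    value = REPLICAS[next(iter(utd))].get(tag)
    return value                          # rok

def write(client_id, value):
    # wdr: read tags from a quorum, pick a tag higher than the largest seen
    highest = max(d["tag"] for d in dir_quorum())
    tag = (highest[0] + 1, client_id)
    # wrw: write (value, tag) to any f+1 replicas (acc need not be a quorum)
    acc = set(random.sample(sorted(REPLICAS), F + 1))
    for r in acc:
        REPLICAS[r][tag] = value
    # wdw: tell a quorum of directories that acc now holds the latest version
    for d in dir_quorum():
        if tag > d["tag"]:
            d["tag"], d["utd"] = tag, set(acc)
    # (garbage-collection message to the replicas omitted)  -> wok

write(client_id=1, value="big-object-v1")
print(read(client_id=2))                  # -> big-object-v1
```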


If you have difficulty understanding the need for the 2-round directory reads/writes in this protocol, reviewing how the ABD protocol works will help.

Replica and Directory node protocol

The replicas respond to client requests to read and write values of data object x. Replicas also garbage-collect out of date values of x, and gossip among themselves the latest value of x. The latter is an optimization to help spread the latest value of x, so that clients can read from a nearby replica.

The directories' only job is to respond to client requests to read and write utd and tag.

Questions and discussion

Google File System (SOSP 2003) addressed efficient replication of large data objects for datacenter computing in practice. GFS also provides a metadata service layer and a data object replication layer. For the metadata directory service, GFS uses Chubby, a Paxos service, which ZooKeeper later cloned as open source. Today, if you wanted to build a consistent large-object replication store from scratch, your architecture would most likely use ZooKeeper as the metadata directory coordination service, as GFS prescribed. ZooKeeper provides atomic consistency already, so it eliminates the 2 rounds needed for directory reads and directory writes in LDR.

LDR does not use a separate metadata service; instead, it can scavenge raw dumb storage nodes for the directory service and achieve the same effect by using ABD replication to make the metadata directory atomic and fault-tolerant. In other words, LDR takes a fully-decentralized approach, and can support loosely-connected heterogeneous wimpy devices (maybe even smartphones?). I guess that means more freedom. On the other hand, LDR is bad for performance. It requires 2 rounds of directory writes for each write operation and 2 rounds of directory reads for each read operation. This is a major drawback for LDR. Considering that reads are generally 90% of the workload, supporting 1-round directory reads would have alleviated the performance problem somewhat. Probably in the normal case (in the absence of failures), the first directory read (the rdr phase) will show that the up-to-date replica copy is present in a quorum of directory nodes, and the second round of directory access (the rdw phase) can be skipped.

Using ZooKeeper for the metadata directory helps a lot, but a downside can be that ZooKeeper is a single centralized location, which means that for some clients access to ZooKeeper will always incur a high WAN communication penalty. Using ZooKeeper's observers reduces this cost for read operations. And as I will blog about soon, our work on WAN-Keeper reduces this cost for write operations as well. The LDR paper suggests that LDR is suitable for WANs, but LDR still incurs WAN latencies while accessing a quorum of directory nodes (twice!) across the WAN.

Another way to efficiently replicate large data objects is, of course, key-value stores. In key-value stores, you don't have a metadata directory, as "hashing" takes care of that. On the other hand, most key-value stores sacrifice strong consistency in favor of eventual consistency. Is it true that you can't get away with using just hashes, and need some sort of metadata service if you want to achieve consistency? The consistent key-value stores I can think of (and I can't think of too many) use either a Paxos commit on metadata or at least a chain replication approach, as in HyperDex and Replex. The chain replication approach uses a Paxos box only for the directory node replication configuration information; does that still count as a minimal and 1-level-indirect metadata service?

Friday, July 1, 2016

Replex: A Scalable, Highly Available Multi-Index Data Store

This paper received the best paper award at USENIX ATC'16 last week. It considers a timely and important problem. With NoSQL databases, we got scalability, availability, and performance, but we lost secondary keys. How do we put back the secondary indices without compromising scalability, availability, and performance?

The paper mentions that previous work on HyperDex did a good job of re-introducing secondary keys to NoSQL, but with overhead: HyperDex generates and partitions an additional copy of the datastore for each key. This introduces overhead for both storage and performance: supporting just one secondary key doubles storage requirements and write latencies.

Replex adds secondary keys to NoSQL databases without that overhead. The key insight of Replex is to combine the need to replicate for fault-tolerance and the need to replicate for index availability. After replication, Replex has both replicated and indexed a row, so there is no need for explicit indexing.

How does Replex work?

All replexes store the same data (every row in the table); the only difference across replexes is the way the data is partitioned and sorted, which is by the sorting key of the index associated with the replex. Each replex is associated with a sharding function, h, such that h(r) defines the partition number in the replex that stores row r.
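Here is a toy sketch of two replexes over the same table, one sharded by the primary key and one by a secondary "name" column (both the column and the partition count are my assumptions): inserting a row replicates it into every replex, which simultaneously indexes it, and a lookup by either key touches exactly one partition.

```python
import zlib

NUM_PARTITIONS = 4   # assumed partitions per replex

def h_by(field):
    """Sharding function for the replex indexed on `field`."""
    return lambda row: zlib.crc32(str(row[field]).encode()) % NUM_PARTITIONS

# Two replexes: same rows, different sharding (and sorting) key.
replexes = {
    "by_id":   {"h": h_by("id"),   "parts": {p: [] for p in range(NUM_PARTITIONS)}},
    "by_name": {"h": h_by("name"), "parts": {p: [] for p in range(NUM_PARTITIONS)}},
}

def insert(row):
    # Replicating the row into every replex doubles as indexing it.
    for rep in replexes.values():
        rep["parts"][rep["h"](row)].append(row)

def lookup(replex, field, value):
    rep = replexes[replex]
    part = rep["parts"][rep["h"]({field: value})]   # single partition, no broadcast
    return [r for r in part if r[field] == value]

insert({"id": 1, "name": "alice"})
insert({"id": 2, "name": "bob"})
print(lookup("by_name", "name", "alice"))
```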

So, that was easy. But there is an additional complication that needs to be dealt with. The difficulty arises because individual replexes can have requirements, such as uniqueness constraints, that cause the same operation to be valid in one replex and invalid in another. Figure 2 gives an example scenario: a linearizability requirement for a distributed log.

To deal with this problem, datastores with global secondary indexes need to employ a distributed transaction for update operations, because an operation must be atomically replicated as valid or invalid across all the indexes. But to use a distributed transaction for every update operation would cripple system throughput.

To remove the need for a distributed transaction in the replication protocol, they modify chain replication to include a consensus protocol. Figure 3 illustrates this solution. When the consensus phase (going to the right in Figure 3) reaches the last partition in the chain, the last partition aggregates each partition's decision into a final decision, which is simply the logical AND of all decisions. Then comes the replication phase, where the last partition initiates the propagation of this final decision back up the chain. As each partition receives this final decision, if the decision is to abort, then the partition discards that operation. If the decision is to commit, then that partition commits the operation to disk and continues propagating the decision.
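A minimal sketch of this modified chain replication, with partitions as in-process objects and message passing elided: the consensus phase collects each partition's validity decision going down the chain, the tail ANDs them into a final decision, and the replication phase propagates that commit/abort decision back up the chain.

```python
class Partition:
    """One partition in the chain; each belongs to a different replex and
    may apply its own validity checks (e.g., a uniqueness constraint)."""

    def __init__(self, name, is_valid):
        self.name = name
        self.is_valid = is_valid
        self.committed = []

    def decide(self, op) -> bool:
        return self.is_valid(op, self.committed)

def replicate(chain, op):
    # Consensus phase: forward the op down the chain, collecting local decisions.
    decisions = [p.decide(op) for p in chain]
    final = all(decisions)           # the tail ANDs all decisions
    # Replication phase: propagate the final decision back up the chain.
    for p in reversed(chain):
        if final:
            p.committed.append(op)   # would be committed to disk in the real system
        # on abort, the op is simply discarded at every partition
    return final

unique_id = lambda op, log: op["id"] not in {o["id"] for o in log}
always_ok = lambda op, log: True

chain = [Partition("by_id", unique_id), Partition("by_name", always_ok)]
print(replicate(chain, {"id": 1, "name": "alice"}))   # True  -> committed everywhere
print(replicate(chain, {"id": 1, "name": "bob"}))     # False -> aborted everywhere
```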

This has similarities to the CRAQ protocol for chain replication. Linked is an earlier post that contains a summary of chain replication and CRAQ protocol.

Fault-tolerance

There is additional complexity due to failure of the replicas. Failed partitions bring up two concerns: how to reconstruct the failed partition and how to respond to queries that would have been serviced by the failed partition.

If a partition fails, a simple recovery protocol would redirect queries originally destined for the failed partition to the other replex. But then the failure amplification is maximal: the read must now be broadcast to every partition in the other replex, and at each partition a read becomes a brute-force search that must iterate through the entire local storage of the partition.

On the other hand, to avoid failure amplification within a failure threshold f, one could introduce f replexes with the same sharding function h, as exact replicas. There is no failure amplification within the failure threshold, because sharding is identical across the exact replicas. But the cost is storage and network overhead in the steady state.

This is the tradeoff, and the paper dedicates "Section 3: Hybrid Replexes" to explore this tradeoff space.

Concluding remarks

The paper compares Replex to HyperDex and Cassandra and shows that Replex's steady-state performance is 76% better than HyperDex's and on par with Cassandra's for writes. For reads, Replex outperforms Cassandra by as much as 2-9x while maintaining performance equivalent to HyperDex. In addition, the paper shows that Replex can recover from one or two failures 2-3x faster than HyperDex.

Replex solves an important problem with less overhead than previous solutions. The hybrid replexes method (explained in Section 3) can also be useful for preventing failure amplification in other problems.