Monday, September 29, 2014

Paper Summary: High-availability distributed logging with BookKeeper

This paper is brought to you by the Yahoo Research group that developed ZooKeeper, and it appeared in LADIS'12.

BookKeeper targets the logging problem. More specifically, the distributed logging problem where high-availability is important and where many distributed clients are interested in reading the logs.

Most current applications log to the local disk, but this constitutes a single point of failure (SPOF) and undermines high availability. A hasty remedy is to write the log files to an NFS partition so that they are stored remotely. But now the NFS server becomes the SPOF. (We can of course replicate the NFS server, but the performance would suffer.) Another solution is to use NetApp filers that implement RAID. This costs money, and still does not completely eliminate the SPOF.

BookKeeper provides an efficient data store with no SPOF for serving a large number of concurrent single-writer, multiple-reader logs. It stripes log entries across servers, leading to higher throughput. BookKeeper is open source and is used in production systems.

The paper presents two case studies: Hedwig and HDFS Namenode. Hedwig is a scalable topic-based publish-subscribe system. To guarantee the delivery of messages despite partitions and server failures, Hedwig uses logging to persist published messages, which is implemented with BookKeeper. Hedwig is in production use and serves push notifications for Yahoo! properties (e.g., notifications for mobile devices).

The other use case concerns replicating the HDFS Namenode, the component of HDFS (Hadoop Distributed File System) that manages the file system metadata. On each update, the Namenode writes synchronously to a journal to guarantee that the update is durable. But unfortunately the Namenode is a SPOF. To enable efficient journaling and strong durability through replication, BookKeeper is used for implementing a journal manager for HDFS. The implementation is currently part of the HDFS codebase.

BookKeeper design and architecture

BookKeeper has 3 main components:

  • A bookie is a BookKeeper storage server, and each bookie stores ledger fragments. A ledger is written across f+1 bookies for fault-tolerance and striping.
  • A BookKeeper client is used for interacting with bookies.
  • A ledger abstracts a log file. It is a sequence of entries, each identified by a sequence number (id).

BookKeeper assumes that there is only a single client writing to a ledger (clients can employ ZooKeeper coordination for this), and in return it guarantees that, once a ledger is closed, all other clients that read from it read the same sequence of entries.

Here is the happy path for BookKeeper. An application using BookKeeper initially designates a ledger writer.  This ledger writer creates a ledger and appends data to the ledger; only the ledger writer is able to append entries to the ledger. Eventually, after appending an arbitrary number of entries to the ledger, the ledger writer closes it. Once the ledger is closed, its content is immutable. Clients can open closed ledgers for reading and any individual ledger can have multiple readers over time, and even concurrent readers.

The main calls in the API enable applications to:

  • Create a ledger;
  • Add entries to a ledger;
  • Open a ledger for reading;
  • Read entries from a ledger;
  • Close a ledger to prevent further writes;
  • Delete a ledger.

All these calls have both a synchronous and an asynchronous version.
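
The lifecycle behind these calls can be sketched with a toy in-memory model. This is only an illustration of the create/add/close/open/read/delete sequence and of the "closed means immutable" rule; `ToyBookKeeper`, `ToyLedger`, and `LedgerClosedError` are hypothetical names, not the BookKeeper client API, and there is no replication or asynchronous variant here.

```python
class LedgerClosedError(Exception):
    """Raised when a write is attempted on a closed ledger."""


class ToyLedger:
    """In-memory stand-in for a single-writer ledger (hypothetical, not the real API)."""

    def __init__(self):
        self.entries = []      # the entry id is the index in this list
        self.closed = False

    def add_entry(self, data: bytes) -> int:
        if self.closed:
            raise LedgerClosedError("ledger is closed; its content is immutable")
        self.entries.append(data)
        return len(self.entries) - 1   # id of the entry just written

    def close(self) -> int:
        self.closed = True
        return len(self.entries) - 1   # last entry id, or -1 for an empty ledger

    def read_entries(self, first: int, last: int) -> list:
        return self.entries[first:last + 1]


class ToyBookKeeper:
    """Hands out ledgers by id; stands in for the client plus the metadata store."""

    def __init__(self):
        self.ledgers = {}
        self.next_id = 0

    def create_ledger(self):
        ledger_id = self.next_id
        self.next_id += 1
        self.ledgers[ledger_id] = ToyLedger()
        return ledger_id, self.ledgers[ledger_id]

    def open_ledger(self, ledger_id: int) -> ToyLedger:
        return self.ledgers[ledger_id]

    def delete_ledger(self, ledger_id: int) -> None:
        del self.ledgers[ledger_id]
```

A writer would call `create_ledger`, then `add_entry` repeatedly, then `close`; readers call `open_ledger` and `read_entries` afterwards.
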
Creating and using a ledger.
When a client creates a ledger, it selects a set of bookies to form an ensemble for the ledger and stores the ensemble information as part of the ledger metadata on ZooKeeper. For each entry the ledger writer adds to the ledger, it replicates the entry across f+1 bookies. A request to add an entry e completes successfully if e has been successfully replicated across f+1 bookies. If a bookie crashes, then the client replaces that bookie. BookKeeper uses ZooKeeper to keep track of configuration changes for a ledger.
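
To make the striping concrete, here is a minimal sketch of how a writer might pick the bookies for each entry. The summary does not say how the f+1 bookies are chosen within the ensemble; the round-robin rule below is an assumption for illustration, and `write_set`, `add_entry`, and the `down` parameter are hypothetical names.

```python
def write_set(entry_id: int, ensemble: list, quorum: int) -> list:
    """Bookies that store entry_id: `quorum` consecutive ensemble members.

    Assumption: round-robin placement, so consecutive entries start at
    consecutive bookies and the load is striped over the ensemble.
    """
    n = len(ensemble)
    return [ensemble[(entry_id + j) % n] for j in range(quorum)]


def add_entry(entry_id, data, ensemble, quorum, stores, down=frozenset()):
    """Return True only if every bookie in the write set stored the entry."""
    acks = 0
    for bookie in write_set(entry_id, ensemble, quorum):
        if bookie in down:
            continue  # a real client would replace this bookie in the ensemble
        stores.setdefault(bookie, {})[entry_id] = data
        acks += 1
    return acks == quorum   # quorum plays the role of f+1
```

With a 3-bookie ensemble and a quorum of 2, entry 0 goes to the first two bookies and entry 2 wraps around to the third and the first, which is how a larger ensemble spreads write load.
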

Closing a ledger.
When closing a ledger, the ledger writer writes to ZooKeeper the last entry that has been written successfully, as part of the ledger metadata. If a ledger writer crashes prematurely, before it closes its open ledger, a ledger reader would need to do ledger recovery.

Ledger recovery.
When a ledger reader opens a ledger for reading, it first obtains the ledger metadata. If the ledger state in the metadata shows that the ledger has not been closed, the ledger reader triggers a recovery procedure. The first step of recovery for a given ledger consists of having the reader client ask each bookie in the ensemble for the last add confirmed (LAC) field in the last entry that the bookie has processed for the ledger. Since reads are based on entry id, the recovery process can start reading from the highest LAC it receives, and thus it is not necessary to read the entire ledger.
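
A rough sketch of that first recovery step follows. The summary only describes collecting LAC values and starting from the highest one; the forward probe past the LAC is my assumption about how the last entry could be found, and the real procedure does more (for example, making sure recovered entries are fully replicated). `recover_last_entry` and the dictionary layout are hypothetical.

```python
def recover_last_entry(bookies: list) -> int:
    """Find the id of the last entry of an unclosed ledger.

    Each bookie is modeled as {"lac": int, "entries": {entry_id: bytes}}.
    """
    # Step 1 (from the summary): take the highest LAC any bookie reports.
    last = max(b["lac"] for b in bookies)
    # Step 2 (assumption): probe forward from the LAC until no bookie
    # holds the next entry id. Nothing before the LAC needs to be read.
    while any(last + 1 in b["entries"] for b in bookies):
        last += 1
    return last
```
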

Reading from an open ledger.
BookKeeper also enables clients to read from open ledgers. When clients need to read from an open ledger, they invoke a call to open the ledger that does not try to recover it if it is not closed. To avoid reading partially replicated entries from the ledger, which may not be in the ledger once it is closed, the client asks bookies for their LAC values. Reading entry i ≤ LAC is safe, since the ledger writer has marked it as successfully replicated.

Dealing with multiple ledgers

To enable recovery, upon each request to append an entry to a ledger, a bookie appends this entry to the journal and flushes the write to the local disk device. A bookie only acknowledges to the client once it receives a confirmation that the flush operation has completed successfully. Note that the journal is shared across all active ledgers the bookie is currently storing. A bookie also writes entries to the ledger device to serve read requests. Thus, read traffic does not affect the performance of writes to the journal device.

The ledger device stores ledger entries along with an index for each ledger. A bookie has a single file, called the entry log, in which it interleaves entries of different ledgers by appending the entries of all ledgers. For each ledger, the bookie also keeps an in-memory index mapping the entry identifier to its position in the entry log.

This design targets workloads dominated by writes, while not neglecting the performance of reads. Requests to add an entry to a ledger return as soon as the entry is flushed to the journal of a bookie, and writes to the ledger device are asynchronous and mostly sequential to enable the writes to this device to keep up with the writes to the journal device. To serve a read request, it is necessary to obtain the position of the entry in the entry log. If the index page is cached, then the read requires one disk seek.
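
The write and read paths of a bookie can be sketched as below. This is a simplified model under stated assumptions: Python lists stand in for the journal and ledger devices, the entry-log write happens inline rather than asynchronously, and `ToyBookie` is a hypothetical name.

```python
class ToyBookie:
    def __init__(self):
        self.journal = []     # shared by all ledgers; stands in for the journal device
        self.entry_log = []   # single interleaved file on the ledger device
        self.index = {}       # ledger_id -> {entry_id: position in entry_log}

    def add_entry(self, ledger_id: int, entry_id: int, data: bytes) -> bool:
        # 1. Append to the journal and "flush"; only then may the bookie ack.
        self.journal.append((ledger_id, entry_id, data))
        # 2. Write to the entry log (asynchronous in the real system, inline here)
        #    and remember where the entry landed.
        self.index.setdefault(ledger_id, {})[entry_id] = len(self.entry_log)
        self.entry_log.append(data)
        return True  # acknowledgement to the client

    def read_entry(self, ledger_id: int, entry_id: int) -> bytes:
        # Reads never touch the journal: look up the position, then one "seek".
        position = self.index[ledger_id][entry_id]
        return self.entry_log[position]
```

Because entries of all ledgers are appended to the same entry log, writes stay sequential no matter how many ledgers are active; the per-ledger index is what makes reads cheap.
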

BookKeeper stores metadata on ZooKeeper.
The ledger metadata includes the ensemble composition of ledgers, the write quorum size, the ledger status, and the last entry successfully written to a closed ledger. For the metadata store, BookKeeper uses ZooKeeper. "A different, more scalable data store becomes necessary when the number of active ledgers is of the order of tens to hundreds of millions." To track the availability of bookies, BookKeeper also relies upon ZooKeeper because it provides ephemeral znodes and watches.


Experiments are conducted using a cluster of identical machines, each with 2 quad-core Intel Xeon 2.5 GHz processors, 16 GB of RAM, one 1 Gbit/s network interface, and four 1 TB SATA drives with a rotational speed of 7200 RPM. Each machine in the cluster mounts an enterprise-class filer via NFS (NetApp FAS3050). This hardware gives a raw performance of 1.2 milliseconds for the latency of add operations and 22.5k adds/sec for 1 kbyte entries when writing to a single bookie. The notation nE-qQ denotes a ledger configuration with ensemble size n and write quorum q.

Using a 3E-2Q configuration, one figure in the paper shows throughput and latency for a single client as the maximum number of outstanding operations is increased. Allowing more outstanding operations leads to higher throughput, in particular for 128-byte entries. No batching tricks are employed to improve throughput; the processing of each operation is triggered by the call itself.

In another experiment, 12 clients write simultaneously to a set of bookies, and the aggregate throughput is measured. Compared to the results for a single client writer, the aggregate throughput is substantially higher for shorter entries. For longer entries, throughput is limited by the speed with which bookies are able to write to disk, so adding more bookies to the pool (configurations with 6E) results in increased throughput.


BookKeeper resembles chain replication a little. The chain replication approach is to export consensus to Paxos, and only store data providing high throughput. BookKeeper also does that, but chain replication is not referred to in the paper at all. Of course, chain replication lacks striping, and does not by default provide disjoint read replicas (in addition to write replicas) to improve read throughput.

The Tango paper mentions BookKeeper and states that it has an implementation of BookKeeper in 300 lines. How would you implement BookKeeper in Tango?  What can you speculate about the performance of BookKeeper versus TangoBookKeeper? Could you implement Tango using BookKeeper? How about transactions?

After reading the paper, I was left with this question: What happens if the bookie writes the entry to its journal and acknowledges it, but dies before asynchronously writing this entry to its ledger? Does this cause any problems?

Final remarks
The paper does not talk about consistency of logging, because every consistency concern is exported to ZooKeeper. I guess we can chalk this up as success points for ZooKeeper. BookKeeper's bottleneck for WAN deployment is ZooKeeper. If ZooKeeper is consulted infrequently, things are OK. But if ZooKeeper is consulted frequently for LAC information in order to read from open ledgers, performance suffers.

Related links:
Flavio's blog post on BookKeeper
Flavio's presentation on BookKeeper

Sunday, September 28, 2014

Paper summary: Tango: Distributed Data Structures over a Shared Log

This paper is from Microsoft Research Silicon Valley (which unfortunately was closed recently), and it appeared in SOSP'13. SOSP'13 provides open access, so the pdf is available for free. The talk video is also on YouTube as part of the SOSP'13 talks playlist. I think this paper didn't get the attention it deserves. It is really a great piece of work.

To facilitate construction of highly available metadata services, Tango provides developers with the abstraction of a replicated in-memory data structure (such as a map or a tree) backed by a shared log.

While ZooKeeper provides developers a fixed data structure (the data tree) for building coordination primitives, Tango enables clients to build different data structures based on the same single shared log. Tango also provides transactions across data structures.

The state of a Tango object exists in two forms: 1) a history, which is an ordered sequence of updates stored durably in the shared log, and 2) any number of views, which are full or partial copies of the data structure (such as a tree or a map) constructed from the log and stored in RAM on clients (i.e., application servers).

A client modifies a Tango object by appending a new update to the history; it accesses the object by first synchronizing its local view with the history. Views are soft state and are instantiated, reconstructed, and updated on clients by playing the shared history forward.

In Tango, the shared log provides consistency, durability, and history. Tango also provides atomicity and isolation for transactions across different objects by multiplexing and storing them on a single shared log.

Corfu shared log abstraction

Tango builds on the Corfu shared log abstraction. Corfu employs flash disks so that reads from the history of the log do not interfere with the writes that are going on at the tail of the log.

The Corfu interface consists of 4 calls:

  1. Clients can append entries to the shared log, obtaining an offset in return.
  2. They can check the current tail of the log.
  3. They can read the entry at a particular offset.
  4. Clients can trim a particular offset in the log for garbage collection.

Corfu organizes a cluster of storage nodes into multiple, disjoint replica sets; for example, a 12-node cluster might consist of 4 replica sets of size 3. Each individual storage node exposes a 64-bit write-once address space, mirrored across the replica set. The cluster also contains a dedicated sequencer node, which is essentially a networked counter storing the current tail of the shared log.

To append, a client contacts the sequencer and obtains the next free offset in the global address space of the shared log. It then maps this offset to a local offset on one of the replica sets using a simple deterministic mapping (e.g., modulo function) over the membership of the cluster. The client then completes the append by directly issuing writes to the storage nodes in the replica set using a client-driven variant of Chain Replication.

The sequencer is merely an optimization to find the tail of the log and not required for correctness. The Chain Replication variant used to write to the storage nodes guarantees that a single client will "win" if multiple clients attempt to write to the same offset. When the sequencer goes down, any client can easily recover this state using the slow check operation on the shared log.
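
The append path can be sketched as follows, as a toy single-process model. The modulo mapping follows the example in the text; everything else (`ToyCorfu`, `ReplicaSet`, `Sequencer`, `WriteOnceError`, and the choice to detect a lost race at the head of the chain) is a hypothetical simplification, not Corfu's actual protocol code.

```python
class WriteOnceError(Exception):
    """Raised when an offset has already been written by another client."""


class Sequencer:
    """A counter that hands out the next free global offset."""

    def __init__(self):
        self.tail = 0

    def next_offset(self) -> int:
        offset = self.tail
        self.tail += 1
        return offset


class ReplicaSet:
    def __init__(self, replicas: int):
        self.nodes = [dict() for _ in range(replicas)]  # write-once address spaces

    def write(self, local_offset: int, data) -> None:
        # Chain order: whoever writes the head first wins the offset.
        if local_offset in self.nodes[0]:
            raise WriteOnceError(f"offset {local_offset} already written")
        for node in self.nodes:
            node[local_offset] = data

    def read(self, local_offset: int):
        return self.nodes[-1][local_offset]  # read from the end of the chain


class ToyCorfu:
    def __init__(self, num_sets: int = 4, replicas: int = 3):
        self.sequencer = Sequencer()
        self.sets = [ReplicaSet(replicas) for _ in range(num_sets)]

    def _locate(self, offset: int):
        # Deterministic mapping: global offset -> (replica set, local offset).
        return self.sets[offset % len(self.sets)], offset // len(self.sets)

    def write_at(self, offset: int, data) -> None:
        replica_set, local = self._locate(offset)
        replica_set.write(local, data)

    def append(self, data) -> int:
        offset = self.sequencer.next_offset()
        self.write_at(offset, data)
        return offset

    def read(self, offset: int):
        replica_set, local = self._locate(offset)
        return replica_set.read(local)
```

Note that correctness rests on the write-once check, not on the sequencer: a client that guesses or reuses an offset simply loses the race and gets `WriteOnceError`.
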

The Tango architecture

There are 3 components to a Tango object:

  • The view, which is an in-memory representation of the object in some form, such as a list or a map. E.g., for TangoRegister this state is a single integer.
  • The mandatory apply upcall, which each object implements and which changes the view when the Tango runtime calls it with new entries from the log. By customizing the apply implementation, one client can build a "tree view" while another builds a "set view" reading from the same log.
  • An external interface of object-specific mutator and accessor methods; e.g., the TangoRegister exposes read/write methods.

The object's mutators do not directly change the in-memory state of the object. Instead, each mutator combines its parameters into an opaque buffer --an update record-- and calls the update helper function of the Tango runtime, which appends it to the shared log.

Similarly, the accessors do not immediately read the object's state. Each accessor first calls the query helper before returning an arbitrary function over the state of the object. The query helper plays new update records in the shared log until its current tail and applies them to the object via the apply upcall before returning.
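
The mutator/accessor pattern is easiest to see on the register example. The sketch below is a toy Python rendering, assuming an in-process list as the shared log; the method names `update_helper`, `query_helper`, and `apply` mirror the terms used above, but the classes and signatures are hypothetical.

```python
class SharedLog:
    """In-process stand-in for the shared log."""

    def __init__(self):
        self.entries = []

    def append(self, record) -> int:
        self.entries.append(record)
        return len(self.entries) - 1

    def tail(self) -> int:
        return len(self.entries)

    def read(self, offset: int):
        return self.entries[offset]


class TangoRuntime:
    """One per client: plays the log forward and dispatches to hosted objects."""

    def __init__(self, log: SharedLog):
        self.log = log
        self.objects = {}      # oid -> hosted object
        self.next_offset = 0   # first log position not yet played

    def register(self, oid, obj) -> None:
        self.objects[oid] = obj

    def update_helper(self, oid, record) -> int:
        return self.log.append((oid, record))

    def query_helper(self, oid) -> None:
        tail = self.log.tail()
        while self.next_offset < tail:
            entry_oid, record = self.log.read(self.next_offset)
            if entry_oid in self.objects:
                self.objects[entry_oid].apply(record)
            self.next_offset += 1


class TangoRegister:
    def __init__(self, runtime: TangoRuntime, oid):
        self.runtime, self.oid, self.state = runtime, oid, 0
        runtime.register(oid, self)

    def apply(self, record) -> None:   # upcall from the runtime
        self.state = record

    def write(self, value) -> None:    # mutator: only appends to the log
        self.runtime.update_helper(self.oid, value)

    def read(self):                    # accessor: sync the view, then read it
        self.runtime.query_helper(self.oid)
        return self.state
```

Two clients that each create a `TangoRegister` with the same OID over the same `SharedLog` see each other's writes on their next `read`, even though neither ever talks to the other directly.
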

Storing multiple objects on a single shared log enables strongly consistent operations across them without requiring complex distributed protocols.  The Tango runtime on each client can multiplex the log across objects by storing and checking a unique object ID (OID) on each entry. Such a scheme has the drawback that every client has to play every entry in the shared log, but layered partitioning, as we shall discuss soon, solves this problem. It enables strongly consistent operations across objects without requiring each object to be hosted by each client, and without requiring each client to consume the entire shared log.


Tango implements optimistic concurrency control by appending speculative transaction commit records to the shared log.  Commit records ensure atomicity, since they determine a point in the persistent total ordering at which the changes that occur in a transaction can be made visible at all clients. To provide isolation, each commit record contains a read set: a list of objects read by the transaction along with their versions, where the version is simply the last offset in the shared log that modified the object. A transaction only succeeds if none of its reads are stale when the commit record is encountered (i.e., the objects have not changed since they were read).

To denote a transaction, calls to object accessors and mutators can be bracketed by BeginTX and EndTX calls. BeginTX creates a transaction context in thread-local storage. EndTX appends a commit record to the shared log, plays the log forward until the commit point, and then makes a commit/abort decision.

Each client that encounters the commit record decides --independently but deterministically-- whether it should commit or abort by comparing the versions in the readset with the current versions of the objects. If none of the read objects have changed since they were read, the transaction commits and the objects in the write set are updated with the apply upcall.
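
The deterministic commit/abort decision can be sketched as a pure function over the log. This is an illustrative model only: the entry layout (dictionaries with `type`, `read_set`, `write_set`) is hypothetical, and treating the commit record's offset as the new version of written objects is my assumption.

```python
def play_log(log: list):
    """Replay `log` and return (state, versions, outcomes).

    versions[oid] is the last log offset that modified the object;
    outcomes lists (offset, committed?) for every commit record.
    """
    state, versions, outcomes = {}, {}, []
    for offset, entry in enumerate(log):
        if entry["type"] == "update":
            state[entry["oid"]] = entry["value"]
            versions[entry["oid"]] = offset
        elif entry["type"] == "commit":
            # Commit only if every object read is still at the version read.
            ok = all(versions.get(oid, -1) == version
                     for oid, version in entry["read_set"].items())
            outcomes.append((offset, ok))
            if ok:
                for oid, value in entry["write_set"].items():
                    state[oid] = value
                    versions[oid] = offset
    return state, versions, outcomes
```

Since the function depends only on the log contents, every client that replays the same prefix reaches the same decision for each commit record, with no coordination beyond the log itself.
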

For read-only transactions, the EndTX call does not insert a commit record into the shared log; instead, it just plays the log forward until its current tail before making the commit/abort decision. Tango also supports fast read-only transactions from stale snapshots by having EndTX make the commit/abort decision locally, without interacting with the log.

Write-only transactions require an append on the shared log but can commit immediately without playing the log forward.

Layered partitions

Each client hosts a (possibly overlapping) partition of the global state of the system, but this partitioning scheme is layered over a single shared log.  To efficiently implement layered partitions without requiring each client to play the entire shared log, Tango maps each object to a stream over the shared log.

A stream augments the conventional shared log interface (append and random read) with a streaming readnext call.  Many streams can co-exist on a single shared log; calling readnext on a stream returns the next entry belonging to that stream in the shared log, skipping over entries belonging to other streams. With this interface, clients can selectively consume the shared log by playing the streams of interest to them (i.e., the streams of objects hosted by them).

Each client plays the streams belonging to the objects in its layered partition. But, streams are not necessarily disjoint; a multiappend call allows a physical entry in the log to belong to multiple streams. When transactions cross object boundaries, Tango changes the behavior of its EndTX call to multiappend the commit record to all the streams involved in the write set. Multiappend ensures the following. A transaction that affects multiple objects occupies a single position in the global ordering; in other words, there is only one commit record per transaction in the raw shared log. A client hosting an object sees every transaction that impacts the object, even if it hosts no other objects.
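
A toy model of streams over one log may help here. `StreamedLog` and `StreamReader` are hypothetical names, and the linear scan inside `readnext` is only for illustration; it is not how the real system locates a stream's entries.

```python
class StreamedLog:
    def __init__(self):
        self.entries = []   # each entry: (frozenset of stream ids, payload)

    def multiappend(self, stream_ids, payload) -> int:
        """Append ONE physical entry that belongs to all the given streams."""
        self.entries.append((frozenset(stream_ids), payload))
        return len(self.entries) - 1


class StreamReader:
    def __init__(self, log: StreamedLog, stream_id):
        self.log, self.stream_id, self.position = log, stream_id, 0

    def readnext(self):
        """Return (offset, payload) of the next entry in this stream, or None."""
        while self.position < len(self.log.entries):
            streams, payload = self.log.entries[self.position]
            offset = self.position
            self.position += 1
            if self.stream_id in streams:
                return offset, payload
        return None
```

A commit record multiappended to streams A and B is returned by readers of either stream at the same global offset, which is what gives a cross-object transaction a single position in the total order.
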

Tango transactions have the following limitation, though. Remote reads at the generating client are disallowed in a transaction: a client cannot execute transactions and generate commit records involving remote reads. Calling an accessor on an object that does not have a local view is problematic, since the data does not exist locally; possible solutions, such as invoking an RPC to a different client with a view of the object, are expensive and complicated. So, if a client wants to do a transaction with reads on an object, the client should subscribe to the stream of that object.

Streaming Corfu

Each entry in a stream carries backpointers to the K previous entries of the same stream. When the client-side library starts up, the application provides it with the list of stream IDs of interest to it. For each such stream, the library finds the last entry in the shared log belonging to that stream by asking the sequencer. The K backpointers in this entry allow it to construct a K-sized suffix of the linked list of offsets comprising the stream. It then issues a read to the offset pointed at by the Kth backpointer to obtain the previous K offsets in the linked list. In this manner, the library can construct the linked list by striding backward on the log, issuing N/K reads to build the list for a stream with N entries.
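
The backward striding can be sketched like this. The entry layout and the helper names `build_log` and `stream_offsets` are hypothetical; the point is only to show why the number of reads shrinks roughly by a factor of K.

```python
def build_log(stream_of_offset: list, k: int) -> list:
    """Give each entry up to k backpointers to earlier entries of its stream.

    stream_of_offset[i] is the stream id of the entry at offset i.
    Backpointers are stored newest first.
    """
    recent = {}   # stream id -> offsets of its last k entries, newest first
    log = []
    for offset, stream in enumerate(stream_of_offset):
        back = list(recent.get(stream, []))
        log.append({"stream": stream, "back": back})
        recent[stream] = ([offset] + back)[:k]
    return log


def stream_offsets(log: list, last_offset: int):
    """Walk backward from a stream's last entry.

    Returns (offsets of the stream in ascending order, number of reads issued).
    """
    offsets, reads = [last_offset], 0
    cursor = last_offset
    while True:
        back = log[cursor]["back"]   # one read of the shared log
        reads += 1
        if not back:
            break                    # reached the first entry of the stream
        offsets.extend(back)
        cursor = back[-1]            # jump to the oldest (Kth) backpointer
    return sorted(offsets), reads
```

In this sketch, a stream of 6 entries needs 6 reads with K=1 but only 4 with K=2 (the last read is the one that discovers the start of the stream).
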


The experimental testbed consists of 36 8-core machines in two racks, with gigabit NICs on each node and 20 Gbps between the top-of-rack switches.  In all the experiments, they run an 18-node Corfu deployment on these nodes in a 9-by-2 configuration (i.e., 9 sets of 2 replicas each), such that each entry is mirrored across racks. The other 18 nodes are used as clients. The Corfu sequencer runs on a powerful, 32-core machine in a separate rack. They use 4KB entries in the Corfu log, with a batch size of 4 at each client.

One figure in the paper shows single object serializability. Reads wait for the apply upcalls from the stream. If there are no writes, the reads cost little. As more writes occur, reads take more time to catch up. Reads may well take more time than writes in Tango, but this is not shown in the graphs.

Another figure shows performance for a primary/backup scenario where two nodes host views of the same object, with all writes directed to one node and all reads to the other. Overall throughput falls sharply as writes are introduced, and then stays constant at around 40K ops/sec as the workload mix changes; however, average read latency goes up as writes dominate, reflecting the extra work the read-only 'backup' node has to do to catch up with the primary.

A third figure shows elasticity of linearizable read throughput with multiple views.

A fourth figure shows transactions over layered partitions.

Tango vs. ZooKeeper.
Using Tango, the authors build ZooKeeper (TangoZK, 1K lines), BookKeeper (TangoBK, 300 lines), TreeSets and HashMaps (100 to 300 lines each). The performance of the resulting implementation is very similar to the TangoMap numbers in Figure 10; for example, with 18 clients running independent namespaces, they obtain around 200K txes/sec if transactions do not span namespaces, and nearly 20K txes/sec for transactions that atomically move a file from one namespace to another. The capability to move files across different instances does not exist in ZooKeeper, which supports a limited form of transaction within a single instance (i.e., a multi-op call that atomically executes a batch of operations).

They also implemented the single-writer ledger abstraction of BookKeeper in around 300 lines of Java code (again, not counting Exceptions and callback interfaces). To verify that their ZooKeeper and BookKeeper were full-fledged implementations, they ran the HDFS namenode over them (modifying it only to instantiate their classes instead of the originals) and successfully demonstrated recovery from a namenode reboot as well as fail-over to a backup namenode.


Tango fits within the State Machine Replication (SMR) paradigm, replicating state by imposing a total ordering over all updates. In the vocabulary of SMR, Tango clients can be seen as learners of the total ordering. The storage nodes comprising the shared log play the role of acceptors.

The finding in the Tango paper that a centralized server can be made to run at very high RPC rates matches recent observations by others. The Percolator system runs a centralized timestamp oracle with similar functionality at over 2M requests/sec with batching. Vasudevan et al. (SOCC'12) report achieving 1.6M submillisecond 4-byte reads/sec on a single server with batching. Masstree is a key-value server that provides 6M queries/sec with batching.

Tango's biggest contribution is that it provides multiple consistent object views from the same log. Objects with different in-memory data structures can share the same data on the log. For example, a namespace can be represented by different trees, one ordered on the filename and the other on a directory hierarchy, allowing applications to perform two types of queries efficiently (i.e., "list all files starting with the letter B" vs. "list all files in this directory"). Strongly consistent reads can be scaled simply by instantiating more views of the object on new clients. But is this free? Is this fast?

Tango's soft underbelly is that it uses a pull-based approach for constructing the view from the shared log. Wouldn't a push-based approach be more timely? When a read comes, the pull-based approach may have a lot of catching up to do to reach the current state before it returns an answer. I guess it may be possible to simulate a push-based approach with periodic pulls, even when no accessor function is invoked.

Tango provides a weird combination of centralized and decentralized. The log is centralized and this is exploited to provide serialization of distributed transactions. On the other hand, not having a master node and using the clients as learners is a very decentralized approach. Instead of one master taking decisions and updating the data structure, all of the clients are playing the log and taking decisions (in a deterministic way ensuring that they all make the same decisions), and updating their data structures. This resembles Lamport's extremely decentralized (to a fault!) implementation of the mutual exclusion which maintains replicated queues of all requests at all processes. (Of course, you can always code one client as master learner/decision-maker for other clients, and circumvent this!)

Tango vs. ZooKeeper.
Tango provides better, higher-level programming support than ZooKeeper. What the Tango paper calls Tango clients are servers that provide services for application-clients. (You may even say a Tango-client roughly corresponds to a "customized-view" ZooKeeper observer.) So, in terms of programmability and expressivity, Tango has the upper hand. I presume using ZooKeeper for large-scale applications may become intractable and may result in spaghetti code, since ZooKeeper provides very minimalistic, low-level primitives for coordination. Tango, on the other hand, lets developers build higher-level abstractions of their own coordination services at the Tango-clients, and this helps in managing large projects while keeping complexity on a leash.

Comparing the efficiency of Tango and ZooKeeper, it seems like ZooKeeper would be better. In Tango, there are a couple of indirections that are not present in ZooKeeper. First, there is an extra step of contacting the sequencer node to get a ticket/offset number. The Tango replication can correspond to ZooKeeper/Zab replication, so they equal out there. But Tango has another layer of indirection, where the clients need to read and learn from the log. In ZooKeeper, since the leader is also the decision maker, the app-client can learn from relatively compact state, whereas in Tango, learning happens by replaying a sequence of commands and constructing the state itself. Again, since a Tango-client is like a ZooKeeper observer, that is another level of indirection before going to the app-client in Tango. So in total, two extra levels are present in Tango (contacting the sequencer, and the Tango-client learning) that are not present in ZooKeeper. Tango provides better programmability and expressivity, but this comes at the cost of performance.

If your application is simple (and will remain simple), and can be implemented using ZooKeeper in a straightforward manner, it would be best to use ZooKeeper. Otherwise, by using Tango, you can have a better, more extensible and tractable code base, and potentially write some of your services as Tango-clients, which can even improve the performance.

Final remarks

Tango code is not open source. That is really unfortunate, as it could provide a good alternative to ZooKeeper for some applications that require coordination and transactions across distributed clients.

Since the sequencer is centralized, Tango is not suitable for WAN deployments.

Some questions still remain. The stream sharing assignments seem to be done statically using the layered stream abstraction API. Can we do this on demand and dynamically?

How is the layered stream abstraction implemented at CORFU level over the replica groups? Would it pay to dedicate one group for one popular stream? This would make bulk reading possible from that replica set. (Similar to the columnar storage idea.)

Friday, September 19, 2014

Revisiting the EWDs

Dijkstra was the original hipster. He was blogging before blogging was cool. "For over four decades, he mailed copies of his consecutively numbered technical notes, trip reports, insightful observations, and pungent commentaries, known collectively as EWDs, to several dozen recipients in academia and industry. Thanks to the ubiquity of the photocopier and the wide interest in Dijkstra’s writings, the informal circulation of many of the EWDs eventually reached into the thousands." And, thanks to the efforts of the University of Texas at Austin CS Department, all of these EWDs are now conveniently accessible to the public.

I remember when I first discovered the EWDs as a fresh graduate student. I was mesmerized. I read them with a lot of joy. It was as if a new world had opened up for me to discover. He had many insightful observations. I recommend that all CSE graduate students read the EWDs to grow their minds.

Now, I don't agree with Dijkstra on everything. He was too much of a perfectionist, and believed in getting things right in one shot. He had this to say on this:
There are very different programming styles. I tend to see them as Mozart versus Beethoven. When Mozart started to write, the composition was finished. He wrote the manuscript and it was 'aus einem Guss' (from one cast). In beautiful handwriting, too. Beethoven was a doubter and a struggler who started writing before he finished the composition and then glued corrections onto the page. In one place he did this nine times. When they peeled them, the last version proved identical to the first one.

In contrast to Dijkstra's position, I believe in rapid prototyping and that perfection comes from iteration.

Of course I still adore all the EWDs and respect Dijkstra all the same. I mean, look at these gems in the Wikiquotes page for Dijkstra:

  • It is not the task of the University to offer what society asks for, but to give what society needs.
  • The required techniques of effective reasoning are pretty formal, but as long as programming is done by people that don't master them, the software crisis will remain with us and will be considered an incurable disease. And you know what incurable diseases do: they invite the quacks and charlatans in, who in this case take the form of Software Engineering gurus.
  • Elegance is not a dispensable luxury but a quality that decides between success and failure.
  • The problems of the real world are primarily those you are left with when you refuse to apply their effective solutions.

Some of his writings can be construed as starting a flamewar (Are "Systems people" really necessary?  :-). But he always had an important point to make. In some of his EWDs, he role-played as the "Chairman of the Board" of the fictitious Mathematics Inc., "a company that commercialized mathematical theorems the same way that software companies commercialized computer programs". He did this to show how ridiculous it is to patent a theorem, algorithm, or code.

And then there is this: "The cruelty of teaching computer science."

This is a 30-page (beautifully) handwritten manifesto against the state of CS teaching then, which unfortunately got worse in the following years. The manifesto finishes with a bang!
Teaching to unsuspecting youngsters the effective use of formal methods is one of the joys of life because it is so extremely rewarding. Within a few months, they find their way in a new world with a justified degree of confidence that is radically novel for them; within a few months, their concept of intellectual culture has acquired a radically novel dimension. To my taste and style, that is what education is about. Universities should not be afraid of teaching radical novelties; on the contrary, it is their calling to welcome the opportunity to do so. Their willingness to do so is our main safeguard against dictatorships, be they of the proletariat, of the scientific establishment, or of the corporate elite.

And about Microsoft's closing of the MS Research at Silicon Valley:

Paper summary: Can a decentralized metadata service layer benefit parallel filesystems?

Parallel filesystems do a good job of providing parallel and scalable data transfer, but, due to consistency concerns, metadata accesses are still directed to a single metadata server (MDS), which becomes a bottleneck. This is a problem for scalability because studies show that over 75% of all filesystem calls require access to file metadata.

This paper proposes to adopt ZooKeeper as a decentralized MDS for parallel filesystems and test whether that improves performance. You can ask, what is decentralized about ZooKeeper, and you would be right about the update requests. But for read requests, ZooKeeper helps by allowing any ZooKeeper server to respond while guaranteeing consistency. (You would still need to do a sync operation if the request needs the read to be freshest and satisfy precedence order.)

If you recall, ZooKeeper uses a filesystem API to enable clients to build higher-level coordination primitives (group membership, locking, barrier sync). This paper is interesting because it takes ZooKeeper and uses it directly as a metadata server for a filesystem leveraging the filesystem API ZooKeeper exposes in a literal manner. FUSE is used to act as a glue between the ZooKeeper as MDS and the underlying physical storage filesystem.

Distributed Union FileSystem (DUFS) architecture

A DUFS client instance does not interact directly with other DUFS clients; any necessary interaction goes through the ZooKeeper service. The figure shows the basic steps required to perform an open() operation on a file using DUFS.

  • A. The open() call is intercepted by FUSE which gives the virtual path of the file to DUFS.
  • B. DUFS queries ZooKeeper to get the Znode based on the filename and to retrieve the file id (FID).
  • C. DUFS uses the deterministic mapping function to find the physical path associated to the FID.
  • D. Finally, DUFS opens the file based on its physical path. The result is returned to the application via FUSE.
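Steps B and C can be sketched as follows. This is a minimal illustration, not the DUFS implementation: a plain dictionary stands in for the ZooKeeper namespace, and the back-end mount points, the FID format, and the hash-based mapping are all assumptions made up for the example.

```python
import hashlib

# Hypothetical stand-in for the ZooKeeper namespace: virtual path -> file id (FID).
znodes = {"/docs/report.txt": "fid-0001"}

# Hypothetical back-end mount points of the underlying storage filesystems.
BACKENDS = ["/mnt/pfs0", "/mnt/pfs1", "/mnt/pfs2"]


def physical_path(fid: str) -> str:
    """Step C: deterministically map a FID to a path on one back-end."""
    digest = hashlib.sha1(fid.encode()).hexdigest()
    backend = BACKENDS[int(digest, 16) % len(BACKENDS)]
    return f"{backend}/{fid}"


def resolve(virtual_path: str) -> str:
    """Steps B and C: look up the FID in the metadata store, then map it."""
    fid = znodes[virtual_path]  # B: metadata lookup (ZooKeeper in DUFS)
    return physical_path(fid)   # C: no further metadata access is needed
```

Because the mapping depends only on the FID, every client computes the same physical path without talking to any other client.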

In contrast, directory operations take place only at the metadata level, so only ZooKeeper is involved and not the back-end storage. For example, the directory stat() operation is satisfied at ZooKeeper itself (the back-end storage is not contacted) since the entire directory hierarchy is maintained in ZooKeeper.


These tests were performed on a Linux cluster. Each node has a dual Intel Xeon E5335 CPU (8 cores in total) and 6GB memory. A SATA 250GB hard drive is used as the storage device on each node. The nodes are connected with 1GigE.

I am bugged by some of the limitations of the evaluation. In these tests the ZooKeeper servers are colocated with the DUFS clients (running on the same nodes). This naturally achieves wonders for read request latencies! But this is not a very reasonable setup. Moreover, the clients are not under the control of DUFS, so it is not a good idea to deploy your ZooKeeper servers on clients which are uncontrolled and can disconnect at any time. Finally, this rules out faraway clients. Of course ZooKeeper does not scale to a WAN environment, and all the tests are done in a controlled cluster environment.


This paper investigates an interesting idea, that of using ZooKeeper as the MDS of parallel filesystems to provide some scalability to the MDS. Thanks to the advantages of ZooKeeper, this allows improved read access because reads can be served consistently from any ZooKeeper server. But, due to the limitations of ZooKeeper, this fails to address the scalability of update requests (the throughput of update operations actually decreases as the number of ZooKeeper replicas increases) and also lacks the scalability needed for WAN deployments. Another limitation of this approach is that the metadata needs to fit into a single ZooKeeper server (and of course also into the ZooKeeper replicas), so there is a scalability problem with respect to the filesystem size as well.

We are working on a scalable WAN version of ZooKeeper, and we will use the parallel filesystems as our application to showcase a WAN filesystem leveraging our prototype coordination system.

Friday, September 12, 2014

Distributed system seminar talk: Data grouping framework for energy-efficiency in distributed storage systems

My research group and Tevfik's research group meet jointly for a weekly distributed systems seminar. This gives our students a chance to give talks about their current projects and get feedback for improvement in a friendly setting.

In this week's seminar, Luigi presented his research on building energy-efficient file systems. I was initially skeptical about energy-efficiency as a research topic. Academicians like to work on things that they can quantify and improve, so I was thinking that energy-efficiency in distributed storage was an opportunistic research problem, rather than a real-world problem. Turns out, I couldn't have been more wrong: IT companies spend $10 billion every year on energy consumption (this is 3% of the entire expenditure of the US!). $3.5 billion of that $10 billion is due to the storage systems.

Dynamic power management (DPM) is the primary mechanism for energy saving in storage systems. DPM basically means turning the disk off if you're not using it. An idling disk spends energy because it is still rotating, and this mechanical motion burns energy. But turning a disk off is not easy. It takes tens of seconds to stop and start a hard disk, and the energy usage spikes at these transition points. This turns the problem into an optimization problem. When is it beneficial to turn the disk off? How can you create gaps long enough to turn off the disk?
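One way to make the "when is it beneficial" question concrete is a break-even calculation. The sketch below uses a deliberately simplified model that is my assumption, not something from the talk: spinning down costs a fixed transition energy (stop plus start), the disk otherwise draws a constant idle or standby power, and the time spent in the transition itself is ignored. The function names and the wattage figures in the usage note are illustrative only.

```python
def break_even_seconds(idle_watts, standby_watts, transition_joules):
    """Gap length at which spinning down costs exactly as much as idling.

    Idling for T seconds costs idle_watts * T.
    Spinning down costs transition_joules + standby_watts * T.
    Setting the two equal and solving for T gives the break-even gap.
    """
    if idle_watts <= standby_watts:
        raise ValueError("standby must draw less power than idle")
    return transition_joules / (idle_watts - standby_watts)


def should_spin_down(expected_gap_seconds, idle_watts, standby_watts,
                     transition_joules):
    """Spin down only if the expected idle gap exceeds the break-even gap."""
    threshold = break_even_seconds(idle_watts, standby_watts, transition_joules)
    return expected_gap_seconds > threshold
```

For example, with made-up figures of 8 W idle, 1 W standby, and 140 J per stop/start cycle, the break-even gap is 140 / (8 - 1) = 20 seconds, so only gaps longer than that save energy under this model.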

The literature discusses the following DPM-enabling techniques for energy-saving in storage systems. Most of these techniques prescribe data access locality improvements.

1) Memory and disk caching: Caching is not only good for providing low latency but also in some cases good for saving energy. If we can answer a request from the cache instead of turning on the disk, we give the disk more time to sleep. But what should the cache size be? If it is too small, the data won't fit, and this won't provide much (if any) saving. If it is too large, the cache itself may consume more energy than it saves.

2) Diverting accesses: Data is stored redundantly, so this gives us the opportunity to spin down some redundant disks by diverting the accesses to the already active/hot ones. Unsurprisingly, there is a tradeoff of increased latency in doing so. By limiting concurrency/parallelism you increase the latency of replies. (Is energy-efficiency versus latency a fundamental tradeoff in distributed storage?) Maybe, by offering well-drafted SLAs to the clients, it is possible to give clients an incentive to accept slightly increased latency in exchange for energy efficiency.

3) Popular data clustering: This technique prescribes organizing the disk storage based on the previously observed access locality of data. So if a disk is hot, it is likely to stay hot, and if a disk gets cold, it is likely to stay cold and it can sleep.

I guess there also could be orthogonal techniques if you don't need to serve requests in real-time. For those cases you have the opportunity to batch-schedule accesses.

Luigi is working on a hybrid of these techniques to provide as much energy-efficiency as possible. I wouldn't have thought energy-efficiency for distributed storage could be this interesting. There might even be a couple of distributed algorithms problems here that I would enjoy.

Paper summary: ZooKeeper: Wait-free coordination for Internet-scale systems

ZooKeeper is an Apache project for providing coordination services to distributed systems. ZooKeeper aims to provide a simple kernel (a filesystem API!) for empowering the clients to build more complex coordination primitives. In this post I will provide a summary of the ZooKeeper paper, and talk about some future directions I can see this going.

"Client" denotes a user of the ZooKeeper service, "server" denotes a process providing the ZooKeeper service, and "znode" denotes an in-memory data node (similar to the filesystem inode) in the ZooKeeper. znodes are organized in a hierarchical namespace referred to as the data tree.
There are 2 types of znodes. "Regular": Clients manipulate regular znodes by creating and deleting them explicitly. "Ephemeral": Clients create ephemeral znodes, and they either delete them explicitly, or let the system delete them automatically when the client's session terminates. Additionally, when creating a new znode, a client can set a "Sequential" flag. Nodes created with the sequential flag set have the value of a monotonically increasing counter appended to their names. If n is the new znode and p is the parent znode, then the sequence value of n is never smaller than the value in the name of any other sequential znode ever created under p.
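To make the two flags concrete, here is a toy in-memory model of a single parent znode. It is only a sketch of the semantics described above, not how ZooKeeper implements them; the class name, the session ids, and the zero-padded counter format are assumptions chosen for the example.

```python
class ToyParent:
    """Toy model of one parent znode and its children (not real ZooKeeper)."""

    def __init__(self):
        self.next_seq = 0   # monotonically increasing counter for this parent
        self.children = {}  # child name -> owning session id (None if regular)

    def create(self, name, session=None, sequential=False):
        """Create a child; pass a session id to make the child ephemeral."""
        if sequential:
            # Zero-padding is an assumption so that names sort in counter order.
            name = f"{name}{self.next_seq:010d}"
            self.next_seq += 1
        if name in self.children:
            raise KeyError(f"{name} already exists")
        self.children[name] = session
        return name

    def end_session(self, session):
        """Delete every ephemeral child owned by the terminated session."""
        if session is None:
            return  # regular znodes have no owning session
        owned = [n for n, s in self.children.items() if s == session]
        for name in owned:
            del self.children[name]
```

Creating two sequential ephemeral children and then ending the first client's session leaves only the second child, which is exactly the behavior the lock recipes later in this post rely on.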

ZooKeeper also implements "watches" on znodes to allow clients to receive timely notifications of changes without requiring polling.

The API ZooKeeper provides to the clients

create(path, data, flags)
delete(path, version)  // operation is conditional on version (if provided)
exists(path, watch)
getData(path, watch)
setData(path, data, version) // operation is conditional on version (if provided)
getChildren(path, watch)

All methods in the API have both a synchronous and an asynchronous version. A client uses the synchronous API when it needs to execute a single ZooKeeper operation and it has no concurrent tasks to execute, so it makes the necessary ZooKeeper call and blocks. The asynchronous API enables a client to have both multiple outstanding ZooKeeper operations and other tasks executed in parallel. ZooKeeper guarantees that the corresponding callbacks for each operation are invoked in order.
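The version argument of setData and delete gives clients a conditional update. The toy below sketches that check for a single znode; it is an illustration of the idea only. The class and exception names are hypothetical, and using None to mean "no version provided" is my assumption for the sketch.

```python
class BadVersion(Exception):
    """Raised when the caller's expected version does not match the znode's."""


class ToyZnode:
    """Toy single znode with a version counter (not real ZooKeeper)."""

    def __init__(self, data):
        self.data = data
        self.version = 0

    def get_data(self):
        """Return the data together with the version it was read at."""
        return self.data, self.version

    def set_data(self, data, version=None):
        """Update the data; if a version is given, it must match the current one."""
        if version is not None and version != self.version:
            raise BadVersion(f"expected {version}, znode is at {self.version}")
        self.data = data
        self.version += 1
        return self.version
```

A client that read the znode at version 0 and writes back with version=0 succeeds only if nobody else updated the znode in between; a second writer still holding version 0 gets BadVersion and has to re-read.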

Using ZooKeeper to implement coordination primitives

Configuration Management: The configuration is stored in a znode, zc. Processes start up with the full pathname of zc. Starting processes obtain their configuration by reading zc with the watch flag set to true. If the configuration in zc is ever updated, the processes are notified and read the new configuration, again setting the watch flag to true.
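The reason processes set the watch flag again on every read is that ZooKeeper watches are one-time triggers. The following toy sketches that pattern in memory; the class names are hypothetical and the callbacks run synchronously here, which a real client would not do.

```python
class WatchedZnode:
    """Toy znode with one-shot watches (not real ZooKeeper)."""

    def __init__(self, data):
        self.data = data
        self._watchers = []

    def get_data(self, watcher=None):
        """Read the data and optionally register a one-shot watch."""
        if watcher is not None:
            self._watchers.append(watcher)
        return self.data

    def set_data(self, data):
        """Update the data and fire, then forget, all registered watches."""
        self.data = data
        pending, self._watchers = self._watchers, []
        for notify in pending:
            notify()


class Process:
    """A process that keeps its configuration in sync with znode zc."""

    def __init__(self, zc):
        self.zc = zc
        self.config = None
        self.reload()

    def reload(self):
        # Re-register the watch on every read, since it fires only once.
        self.config = self.zc.get_data(watcher=self.reload)
```

If reload() read the znode without passing the watcher again, the process would see the first update and silently miss every later one.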

Rendezvous: When the master starts it fills in zr with information about addresses and ports it is using. When workers start, they read zr with watch set to true. If zr has not been filled in yet, the worker waits to be notified when zr is updated.

Group Membership: A znode, zg, is created to represent the group. When a process member of the group starts, it creates an ephemeral child znode under zg. If the process fails or ends, the znode that represents it under zg is automatically removed. Processes may put process information in the data of the child znode, e.g., addresses and ports used by the process. Processes may obtain group information by simply listing the children of zg. If a process wants to monitor changes in group membership, the process can set the watch flag to true and refresh the group information (always setting the watch flag to true) when change notifications are received.

Simple locks: To acquire a lock, a client tries to create the designated znode with the EPHEMERAL flag. If the create succeeds, the client holds the lock. Otherwise, the client can read the znode with the watch flag set. A client releases the lock explicitly or it is removed by timeout if it dies. Other clients that are waiting for a lock try again to acquire a lock once they observe the znode being deleted.

Simple Locks without Herd Effect: All the clients requesting the lock are lined up and each client obtains the lock in order of request arrival.
To lock: 
1 n = create(l + “/lock-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for watch event
6 goto 2

To unlock:
1 delete(n)
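Steps 2 to 4 of the lock procedure boil down to a pure function over the list of children. The sketch below assumes child names end in "-" followed by the sequence number (as produced by a sequential create); the function names are hypothetical.

```python
def seq(name):
    """Extract the sequence number from a name like 'lock-0000000003'."""
    return int(name.rsplit("-", 1)[1])


def predecessor_to_watch(children, mine):
    """Return None if `mine` holds the lock, else the single znode to watch.

    Each waiting client watches only the znode ordered just before its own,
    so a release wakes up exactly one client instead of the whole herd.
    """
    ordered = sorted(children, key=seq)
    index = ordered.index(mine)
    if index == 0:
        return None  # lowest znode: this client holds the lock
    return ordered[index - 1]
```

When the watched predecessor disappears, the client fetches the children again and re-runs the check, which matches the "goto 2" in the procedure: the predecessor may have died while waiting rather than released the lock.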

Read/Write Locks: The lock procedure is changed slightly to include separate read lock and write lock procedures.
Write Lock
1 n = create(l + “/write-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 2

Read Lock
1 n = create(l + “/read-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if no write znodes lower than n in C, exit
4 p = write znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 3
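The read-lock rule differs from the write-lock rule only in which znodes count as blockers: a reader ignores other readers and waits only for write znodes ordered before it. A minimal sketch of steps 3 and 4, assuming names of the form "read-<seq>" and "write-<seq>" (the function names are hypothetical):

```python
def seq(name):
    """Extract the sequence number from a name like 'read-0000000003'."""
    return int(name.rsplit("-", 1)[1])


def read_lock_wait_target(children, mine):
    """Return None if the read lock is held, else the write znode to watch."""
    earlier_writers = [
        c for c in children
        if c.startswith("write-") and seq(c) < seq(mine)
    ]
    if not earlier_writers:
        return None  # no writer ahead of us: readers can share the lock
    # Watch the closest writer ordered before our own znode.
    return max(earlier_writers, key=seq)
```

Note that several readers queued behind the same writer all watch that writer's znode, so they wake up together when it is deleted. Here that is the desired behavior, since all of them can now proceed.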

You can build even more powerful coordination primitives using ZooKeeper, and a Python binding is also made available here.

ZooKeeper applications at Yahoo!: The Fetching Service (FS) uses ZooKeeper to recover from failures of masters, to guarantee availability despite failures, and to decouple the clients from the servers, allowing clients to direct their requests to healthy servers by just reading their status from ZooKeeper. FS uses ZooKeeper mainly to manage configuration metadata. FS is read-heavy, 10:1 to 100:1. As another example, Yahoo! Message Broker (YMB), a distributed publish-subscribe system, uses ZooKeeper to manage the distribution of topics (configuration metadata), deal with failures of machines in the system (failure detection and group membership), and control system operation.

Other practical uses of ZooKeeper have been explained nicely here.

ZooKeeper architecture/internals

The replicated database is an in-memory database containing the entire data tree. Each znode in the tree stores a maximum of 1MB of data by default. For recoverability, ZooKeeper efficiently logs updates to disk, and forces writes to be on the disk media before they are applied to the in-memory database.

Every ZooKeeper server services clients. A client connects to exactly one server to submit its requests. Read requests are serviced from the local replica of each server's database.

Requests that change the state of the service, write requests, are processed by an agreement protocol. As part of the agreement protocol write requests are forwarded to a single server, called the leader. The rest of the ZooKeeper servers, called followers, receive message proposals consisting of state changes from the leader and agree upon state changes. This is similar to how Paxos works.

ZooKeeper's atomic broadcast protocol (Zab) uses simple majority quorums by default to decide on a proposal, so Zab and thus ZooKeeper can only work if a majority of servers are correct (i.e., with 2f + 1 servers we can tolerate f failures). Zab guarantees that changes broadcast by a leader are delivered in the order they were sent and that all changes from previous leaders are delivered to an established leader before it broadcasts its own changes.
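The majority-quorum arithmetic is easy to check with a few lines. This is just the counting argument from the paragraph above written out; the function names are mine.

```python
def quorum_size(num_servers):
    """Smallest number of servers that forms a simple majority."""
    return num_servers // 2 + 1


def tolerated_failures(num_servers):
    """Largest number of failed servers that still leaves a majority alive."""
    return (num_servers - 1) // 2
```

With 3 servers the quorum is 2 and one failure is tolerated; with 5 servers the quorum is 3 and two failures are tolerated. Going from 3 to 4 servers raises the quorum to 3 without tolerating any additional failure, which is why ensembles are usually sized as 2f + 1.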

More specifically, Zab/ZooKeeper provides these two basic ordering guarantees:
Linearizable writes: all requests that update the state of ZooKeeper are serializable and respect precedence.
FIFO client order: all requests from a given client are executed in the order that they were sent by the client.

ZooKeeper vs Paxos

ZooKeeper provides FIFO client order property, but Paxos doesn't. Paxos may violate the FIFO client property as follows.

Proposer P1 executes Phase 1 for sequence numbers 27 and 28. It proposes values A and B for sequence numbers 27 and 28, respectively, in Phase 2 with ballot number 1. Both proposals are accepted only by acceptor A1. Proposer P2 executes Phase 1 against acceptors A2 and A3, and ends up proposing C in Phase 2 for sequence number 27 with ballot number 2. Finally, proposer P3 executes Phases 1 and 2, and is able to have a quorum of acceptors choosing C for sequence number 27, B for sequence number 28, and D for 29.

ZooKeeper argues that such a run is not acceptable because the state change represented by B causally depends upon A, and not C. Consequently, B can only be chosen for sequence number i+1 if A has been chosen for sequence number i, and C cannot be chosen before B, since the state change that B represents cannot commute with C and can only be applied after A.

Client server interaction

When a server completes a write operation, it also sends out and clears notifications relative to any watch that corresponds to that update. Servers process the writes the leader server sends in order and do not process other writes or reads concurrently in order to ensure strict succession of notifications. Note that servers handle notifications locally. Only the server that a client is connected to tracks and triggers notifications for that client.

One drawback of using fast reads (local reads at one server) is not guaranteeing precedence order for read operations. That is, a read operation may return a stale value, even though a more recent update to the same znode has been committed. Not all applications require precedence order, but for applications that do require it, the sync primitive is used. To guarantee that a given read operation returns the latest updated value, a client calls sync before the read operation. Sync flushes the pipes so to speak. The FIFO order guarantee of client operations together with the global guarantee of sync enables the result of the read operation to reflect any changes that happened before the sync was issued.

Read requests are handled locally at each server. Each read request is tagged with a zxid that corresponds to the last transaction seen by the server. ZooKeeper servers process requests from clients in FIFO order; responses include the zxid that the response is relative to. Even heartbeat messages during intervals of no activity include the last zxid seen by the server that the client is connected to. This zxid defines the partial order of the read requests with respect to the write requests. If the client connects to a new server, that new server ensures that its view of the ZooKeeper data is at least as recent as the view of the client by checking the last zxid of the client against its last zxid. If the client has a more recent view than the server, the server does not reestablish the session with the client until the server has caught up.

To detect client session failures, ZooKeeper uses time-outs. To prevent the session from timing out, the ZooKeeper client library sends a heartbeat after the session has been idle for s/3 ms and switches to a new server if it has not heard from a server for 2s/3 ms, where s is the session timeout in milliseconds.
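The two thresholds can be written as a small decision function. This is only a sketch of the rule as stated above, with hypothetical names; it does not reproduce the actual client library's logic.

```python
def next_action(idle_ms, silent_ms, session_timeout_ms):
    """Decide what the client should do under the s/3 and 2s/3 rule.

    idle_ms:   time since the client last sent anything on the session.
    silent_ms: time since the client last heard from its server.
    """
    if silent_ms >= 2 * session_timeout_ms / 3:
        return "switch_server"
    if idle_ms >= session_timeout_ms / 3:
        return "send_heartbeat"
    return "wait"
```

With a 30-second session timeout, the client sends a heartbeat after 10 seconds of idleness and gives up on its server after 20 seconds of silence, which leaves roughly the last third of the timeout to re-establish the session elsewhere.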


The evaluation is performed on a cluster of 50 servers. For the target workloads, 2:1 to 100:1 read to write ratio, it is shown that ZooKeeper can handle tens to hundreds of thousands of transactions per second. Each client has at least 100 requests outstanding. Each request consists of a read or write of 1K of data.

As you add ZooKeeper servers, the read throughput improves, but the write throughput degrades. This is because atomic broadcast needs to be done via Zab. Also, the servers need to ensure that transactions are logged to non-volatile store before sending acknowledgments back to the leader.


ZooKeeper provides a minimalist and flexible coordination system, and it has found a lot of use in production distributed systems. ZooKeeper scales well with an increase in read operations, but not with an increase in write operations. It also does not scale as more ZooKeeper replicas are added. To alleviate this, observer replicas are used, but they are limited in operation and do not benefit write operations. Finally, due to the very large latencies involved, ZooKeeper cannot handle deploying ZooKeeper servers across the WAN.

In most places ZooKeeper is punting the ball to the clients. Yes, this is due to the minimalistic design, but it burdens the clients with solving the transactional update problem themselves, and we know that this is error-prone. Maybe this is really the way to go. Or maybe this is the soft underbelly of ZooKeeper and a big opportunity to provide a new coordination tool.

ZooKeeper is a great start, but we are just at the beginning.

Exercise questions

How does ZooKeeper implement/provide ephemeral nodes?

How can you implement distributed counters without using the sequential flag?

How can you implement general purpose transactions on ZooKeeper?
Is ZooKeeper enough to implement general transactions? What is missing?

Why not use ZooKeeper for serializing and reliably storing all data? Why is it a bad idea to use ZooKeeper for maintaining application logs?

Related links

High-availability distributed logging with BookKeeper
Apache Curator project maintains most common ZooKeeper client algorithms
