## Sunday, September 28, 2014

### Paper summary: Tango: Distributed Data Structures over a Shared Log

This paper is from Microsoft Research Silicon Valley (which, unfortunately, was recently closed), and it appeared in SOSP'13. SOSP'13 provides open access, so here is the pdf for free. The talk video is also on YouTube as part of this SOSP'13 talks playlist. I think this paper didn't get the attention it deserves. It is really a great piece of work.

To facilitate construction of highly available metadata services, Tango provides developers with the abstraction of a replicated in-memory data structure (such as a map or a tree) backed by a shared log.

While ZooKeeper provides developers a fixed data structure (the data tree) for building coordination primitives, Tango enables clients to build different data structures based on the same single shared log. Tango also provides transactions across data structures.

The state of a Tango object exists in two forms: 1) a history, which is an ordered sequence of updates stored durably in the shared log, and 2) any number of views, which are full or partial copies of the data structure --such as a tree or a map-- constructed from the log and stored in RAM on clients (i.e., application servers).

A client modifies a Tango object by appending a new update to the history; it accesses the object by first synchronizing its local view with the history. Views are soft state and are instantiated, reconstructed, and updated on clients by playing the shared history forward.

In Tango, the shared log provides consistency, durability, and history. Tango also provides atomicity and isolation for transactions across different objects by multiplexing and storing them on a single shared log.

## Corfu shared log abstraction

Tango builds on the Corfu shared log abstraction, which uses flash storage so that reads from anywhere in the history of the log do not interfere with the writes going on at the tail of the log.

The CORFU interface consists of 4 calls:

1. Clients can append entries to the shared log, obtaining an offset in return.
2. They can check the current tail of the log.
3. They can read the entry at a particular offset.
4. Clients can trim a particular offset in the log for garbage collection.

Corfu organizes a cluster of storage nodes into multiple, disjoint replica sets; for example, a 12-node cluster might consist of 4 replica sets of size 3. Each individual storage node exposes a 64-bit write-once address space, mirrored across the replica set. The cluster also contains a dedicated sequencer node, which is essentially a networked counter storing the current tail of the shared log.

To append, a client contacts the sequencer and obtains the next free offset in the global address space of the shared log. It then maps this offset to a local offset on one of the replica sets using a simple deterministic mapping (e.g., modulo function) over the membership of the cluster. The client then completes the append by directly issuing writes to the storage nodes in the replica set using a client-driven variant of Chain Replication.
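
To make the four calls and the append path concrete, here is a minimal in-memory sketch. It is my own toy model, not the Corfu API: the class and method names (`SharedLog`, `StorageNode`, `_locate`) are hypothetical, the sequencer is reduced to a local counter, and the chain write is just a loop over the replicas in a fixed order.

```python
class WriteOnceError(Exception):
    """Raised when a local offset has already been written."""


class StorageNode:
    """One storage node: a write-once address space (a dict here)."""

    def __init__(self):
        self.cells = {}

    def write(self, local_offset, entry):
        if local_offset in self.cells:
            raise WriteOnceError(local_offset)
        self.cells[local_offset] = entry

    def read(self, local_offset):
        return self.cells[local_offset]


class SharedLog:
    """Toy shared log: 4 replica sets of 3 nodes by default (assumed sizes)."""

    def __init__(self, num_sets=4, set_size=3):
        self.replica_sets = [
            [StorageNode() for _ in range(set_size)] for _ in range(num_sets)
        ]
        self.tail = 0  # stands in for the sequencer's counter

    def _locate(self, offset):
        # Deterministic mapping: global offset -> (replica set, local offset).
        n = len(self.replica_sets)
        return self.replica_sets[offset % n], offset // n

    def append(self, entry):
        offset = self.tail  # "contact the sequencer" for the next free offset
        self.tail += 1
        chain, local = self._locate(offset)
        for node in chain:  # write the replicas in a fixed chain order
            node.write(local, entry)
        return offset

    def check(self):
        return self.tail

    def read(self, offset):
        chain, local = self._locate(offset)
        return chain[-1].read(local)  # read from the last replica in the chain

    def trim(self, offset):
        chain, local = self._locate(offset)
        for node in chain:
            node.cells[local] = None  # slot stays occupied, so it remains write-once
```

With the default 4 replica sets, global offset 4 lands on replica set 0 at local offset 1, which is all the modulo mapping amounts to.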

The sequencer is merely an optimization to find the tail of the log and not required for correctness. The Chain Replication variant used to write to the storage nodes guarantees that a single client will "win" if multiple clients attempt to write to the same offset. When the sequencer goes down, any client can easily recover this state using the slow check operation on the shared log.
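
Here is a rough sketch of why the sequencer is not needed for correctness. It assumes (my simplification, not a detail from the paper) that the first replica in the chain arbitrates races through its write-once check, and that the slow check can be modeled as asking each replica set for its highest written local offset and inverting the modulo mapping. The function names `chain_write` and `slow_check` are hypothetical, and each storage node is just a dict.

```python
def chain_write(chain, local_offset, entry):
    """Try to write one entry down a chain of write-once nodes (dicts).

    Returns True if this client won the offset, False if another client
    had already written it at the head of the chain.
    """
    head = chain[0]
    if local_offset in head:
        return False  # lost the race; the caller should retry at a new offset
    for node in chain:
        node[local_offset] = entry
    return True


def slow_check(replica_sets):
    """Recompute the tail of the log without a sequencer.

    Assumes global offset = local_offset * num_sets + set_index,
    i.e. the inverse of a modulo mapping over the replica sets.
    """
    num_sets = len(replica_sets)
    tail = 0
    for set_index, chain in enumerate(replica_sets):
        head = chain[0]
        if head:
            highest_local = max(head)
            tail = max(tail, highest_local * num_sets + set_index + 1)
    return tail
```

A new sequencer (or any client) could call something like `slow_check` once and then go back to handing out offsets from a counter.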

## The Tango architecture

There are 3 components to a Tango object:

1. A Tango object contains the view, which is an in-memory representation of the object in some form, such as a list or a map. E.g., for TangoRegister this state is a single integer.
2. Each object implements the mandatory apply upcall, which changes the view when the Tango runtime calls it with new entries from the log. By customizing the apply implementation, one client can build a "tree view" while another builds a "set view" reading from the same log.
3. Each object exposes an external interface of object-specific mutator and accessor methods; e.g., the TangoRegister exposes read/write methods.

The object's mutators do not directly change the in-memory state of the object. Instead, each mutator combines its parameters into an opaque buffer --an update record-- and calls the update helper function of the Tango runtime, which appends it to the shared log.

Similarly, the accessors do not immediately read the object's state. Each accessor first calls the query helper before returning an arbitrary function over the state of the object. The query helper plays new update records in the shared log until its current tail and applies them to the object via the apply upcall before returning.
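
The mutator/accessor split is easy to see in a toy version of the register. This is a sketch under simplifying assumptions, not the paper's code: a plain Python list plays the shared log, the runtime serves a single object, and the names `Runtime`, `update_helper`, and `query_helper` are my own stand-ins for the helpers described above.

```python
class Runtime:
    """Toy Tango runtime for one object; a Python list plays the shared log."""

    def __init__(self, log, obj):
        self.log = log
        self.obj = obj
        self.position = 0  # next log offset this client has not played yet

    def update_helper(self, record):
        self.log.append(record)  # mutators only append to the log

    def query_helper(self):
        # Play the log forward to its current tail via the apply upcall.
        while self.position < len(self.log):
            self.obj.apply(self.log[self.position])
            self.position += 1


class TangoRegister:
    def __init__(self, log):
        self.state = 0  # the view: a single integer
        self.runtime = Runtime(log, self)

    def apply(self, record):  # upcall invoked by the runtime
        self.state = record

    def write(self, value):  # mutator: does not touch self.state
        self.runtime.update_helper(value)

    def read(self):  # accessor: sync with the log first, then read
        self.runtime.query_helper()
        return self.state
```

Two registers created over the same list behave like two clients hosting views of one object: a `write` on one becomes visible to the other on its next `read`, and even the writer's own view only changes once it plays the log forward.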

Storing multiple objects on a single shared log enables strongly consistent operations across them without requiring complex distributed protocols.  The Tango runtime on each client can multiplex the log across objects by storing and checking a unique object ID (OID) on each entry. Such a scheme has the drawback that every client has to play every entry in the shared log, but layered partitioning, as we shall discuss soon, solves this problem. It enables strongly consistent operations across objects without requiring each object to be hosted by each client, and without requiring each client to consume the entire shared log.
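
A small sketch of OID multiplexing, including its drawback. Everything here is hypothetical scaffolding (the `Client`, `Counter`, and `NameSet` classes and the `entries_read` counter are mine); the point is only that every entry carries an OID and that a client without streams still has to read entries for objects it does not host.

```python
class Counter:
    def __init__(self):
        self.value = 0

    def apply(self, delta):
        self.value += delta


class NameSet:
    def __init__(self):
        self.names = set()

    def apply(self, name):
        self.names.add(name)


class Client:
    """Hosts some objects and multiplexes one shared log (a list) by OID."""

    def __init__(self, log, hosted):
        self.log = log
        self.hosted = hosted  # oid -> locally hosted object
        self.position = 0
        self.entries_read = 0  # how many log entries this client had to fetch

    def update(self, oid, record):
        self.log.append((oid, record))

    def sync(self):
        while self.position < len(self.log):
            oid, record = self.log[self.position]
            self.position += 1
            self.entries_read += 1
            obj = self.hosted.get(oid)
            if obj is not None:  # skip entries for objects hosted elsewhere
                obj.apply(record)
```

A client that hosts only the counter still reads the `NameSet` entries before discarding them, which is exactly the cost that layered partitioning removes.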

## Transactions

Tango implements optimistic concurrency control by appending speculative transaction commit records to the shared log.  Commit records ensure atomicity, since they determine a point in the persistent total ordering at which the changes that occur in a transaction can be made visible at all clients. To provide isolation, each commit record contains a read set: a list of objects read by the transaction along with their versions, where the version is simply the last offset in the shared log that modified the object. A transaction only succeeds if none of its reads are stale when the commit record is encountered (i.e., the objects have not changed since they were read).

To denote a transaction, calls to object accessors and mutators can be bracketed by BeginTX and EndTX calls. BeginTX creates a transaction context in thread-local storage. EndTX appends a commit record to the shared log, plays the log forward until the commit point, and then makes a commit/abort decision.

Each client that encounters the commit record decides --independently but deterministically-- whether it should commit or abort by comparing the versions in the readset with the current versions of the objects. If none of the read objects have changed since they were read, the transaction commits and the objects in the write set are updated with the apply upcall.
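
The deterministic commit/abort decision can be sketched in a few lines. This is a simplified model of my own, not Tango's record format: log entries are dicts, a commit record carries its write set inline, and the `play` function and the field names (`kind`, `reads`, `writes`) are assumptions made for illustration.

```python
def play(log):
    """Replay a log and return (state, versions, decisions).

    versions[oid] is the last log offset that modified the object.
    decisions[offset] is True if the commit record at that offset committed.
    """
    state, versions, decisions = {}, {}, {}
    for offset, entry in enumerate(log):
        if entry["kind"] == "update":
            state[entry["oid"]] = entry["value"]
            versions[entry["oid"]] = offset
        else:  # commit record: validate the read set against current versions
            fresh = all(
                versions.get(oid, -1) == version
                for oid, version in entry["reads"].items()
            )
            decisions[offset] = fresh
            if fresh:
                for oid, value in entry["writes"].items():
                    state[oid] = value
                    versions[oid] = offset
    return state, versions, decisions


log = [
    {"kind": "update", "oid": "A", "value": 1},
    {"kind": "commit", "reads": {"A": 0}, "writes": {"B": 10}},
    {"kind": "update", "oid": "A", "value": 2},
    {"kind": "commit", "reads": {"A": 0}, "writes": {"B": 20}},
]
state, versions, decisions = play(log)
```

The second commit record read A at version 0, but A was modified at offset 2 before the record was reached, so every client that replays this log aborts it. No client needs to talk to any other client to agree on that.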

For read-only transactions, the EndTX call does not insert a commit record into the shared log; instead, it just plays the log forward until its current tail before making the commit/abort decision. Tango also supports fast read-only transactions from stale snapshots by having EndTX make the commit/abort decision locally, without interacting with the log.

Write-only transactions require an append on the shared log but can commit immediately without playing the log forward.

## Layered partitions

Each client hosts a (possibly overlapping) partition of the global state of the system, but this partitioning scheme is layered over a single shared log.  To efficiently implement layered partitions without requiring each client to play the entire shared log, Tango maps each object to a stream over the shared log.

A stream augments the conventional shared log interface (append and random read) with a streaming readnext call.  Many streams can co-exist on a single shared log; calling readnext on a stream returns the next entry belonging to that stream in the shared log, skipping over entries belonging to other streams. With this interface, clients can selectively consume the shared log by playing the streams of interest to them (i.e., the streams of objects hosted by them).

Each client plays the streams belonging to the objects in its layered partition. But, streams are not necessarily disjoint; a multiappend call allows a physical entry in the log to belong to multiple streams. When transactions cross object boundaries, Tango changes the behavior of its EndTX call to multiappend the commit record to all the streams involved in the write set. Multiappend ensures the following. A transaction that affects multiple objects occupies a single position in the global ordering; in other words, there is only one commit record per transaction in the raw shared log. A client hosting an object sees every transaction that impacts the object, even if it hosts no other objects.
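
Here is a toy model of the stream interface. The names `StreamLog` and `StreamReader` are hypothetical, and `readnext` in this sketch simply scans the log linearly; as I understand it, the real implementation avoids that scan by following backpointers.

```python
class StreamLog:
    """A single shared log whose entries are tagged with stream IDs."""

    def __init__(self):
        self.entries = []  # each entry: (frozenset of stream ids, payload)

    def multiappend(self, stream_ids, payload):
        # One physical entry, one offset, possibly many streams.
        self.entries.append((frozenset(stream_ids), payload))
        return len(self.entries) - 1


class StreamReader:
    """A client's cursor over one stream of the shared log."""

    def __init__(self, log, stream_id):
        self.log = log
        self.stream_id = stream_id
        self.cursor = 0

    def readnext(self):
        # Return the next (offset, payload) in this stream, or None at the tail.
        while self.cursor < len(self.log.entries):
            offset = self.cursor
            streams, payload = self.log.entries[offset]
            self.cursor += 1
            if self.stream_id in streams:
                return offset, payload
        return None
```

If a commit record is multiappended to streams A and B, a reader of A and a reader of B both see it at the same offset, so a cross-object transaction still has a single position in the global order.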

Tango transactions have the following limitation, though. Remote reads at the generating client are disallowed in a transaction: a client cannot execute transactions and generate commit records involving remote reads. Calling an accessor on an object that does not have a local view is problematic, since the data does not exist locally; a possible solution, invoking an RPC to a different client that has a view of the object, is expensive and complicated. So, if a client wants to do a transaction with reads on an object, the client should subscribe to the stream of that object.

## Streaming Corfu

When the client-side library starts up, the application provides it with the list of stream IDs of interest to it. For each such stream, the library finds the last entry in the shared log belonging to that stream by asking the sequencer. The K backpointers in this entry allow it to construct a K-sized suffix of the linked list of offsets comprising the stream. It then issues a read to the offset pointed at by the Kth backpointer to obtain the previous K offsets in the linked list. In this manner, the library can construct the linked list by striding backward on the log, issuing N/K reads to build the list for a stream with N entries.
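
The backward striding is easier to follow with a small simulation. This is a sketch under my own assumptions: each log entry is modeled as a `(stream_id, backpointers)` pair with the most recent backpointer first, and `build_log` and `stream_offsets` are hypothetical helper names. The counter `reads` tracks how many entries had to be fetched.

```python
def build_log(stream_ids, k):
    """Build a toy log where each entry stores up to k backpointers
    (most recent first) to earlier entries of the same stream."""
    history = {}  # stream id -> offsets appended so far
    log = []
    for offset, sid in enumerate(stream_ids):
        previous = history.setdefault(sid, [])
        backpointers = previous[-k:][::-1]
        log.append((sid, backpointers))
        previous.append(offset)
    return log


def stream_offsets(log, last_offset, k):
    """Reconstruct a stream's offsets by striding backward from its last entry.

    Returns (offsets in log order, number of entries read).
    """
    offsets = [last_offset]
    reads = 0
    cursor = last_offset
    while True:
        _, backpointers = log[cursor]
        reads += 1
        offsets.extend(backpointers)
        if len(backpointers) < k:  # fewer than k pointers: reached the start
            break
        cursor = backpointers[-1]  # jump to the k-th (oldest) backpointer
    offsets.reverse()
    return offsets, reads


log = build_log(list("ABAABAAAA"), k=2)
offsets, reads = stream_offsets(log, last_offset=8, k=2)
```

In this example stream A has 7 entries and K is 2, and the walk touches 4 entries (offsets 8, 6, 3, and 0), in line with the roughly N/K reads mentioned above.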

## Evaluation

The experimental testbed consists of 36 8-core machines in two racks, with gigabit NICs on each node and 20 Gbps between the top-of-rack switches. In all the experiments, they run an 18-node Corfu deployment on these nodes in a 9-by-2 configuration (i.e., 9 sets of 2 replicas each), such that each entry is mirrored across racks. The other 18 nodes are used as clients. The Corfu sequencer runs on a powerful, 32-core machine in a separate rack. They use 4KB entries in the Corfu log, with a batch size of 4 at each client.

Figure shows single object serializability. Reads wait for the apply upcalls from the stream. With no writes, reads cost little. As more writes occur, reads take longer to catch up. Reads may well take longer than writes in Tango, but the graphs do not show this.

Figure shows performance for a primary/backup scenario where two nodes host views of the same object, with all writes directed to one node and all reads to the other. Overall throughput falls sharply as writes are introduced, and then stays constant at around 40K ops/sec as the workload mix changes; however, average read latency goes up as writes dominate, reflecting the extra work the read-only 'backup' node has to do to catch up with the primary.

Figure shows elasticity of linearizable read throughput with multiple views.

Figure shows transactions over layered partitions.

Tango vs. ZooKeeper.
Using Tango, the authors build ZooKeeper (TangoZK, 1K lines), BookKeeper (TangoBK, 300 lines), TreeSets and HashMaps (100 to 300 lines each). The performance of the resulting implementation is very similar to the TangoMap numbers in Figure 10; for example, with 18 clients running independent namespaces, they obtain around 200K txes/sec if transactions do not span namespaces, and nearly 20K txes/sec for transactions that atomically move a file from one namespace to another. The capability to move files across different instances does not exist in ZooKeeper, which supports a limited form of transaction within a single instance (i.e., a multi-op call that atomically executes a batch of operations).

They also implemented the single-writer ledger abstraction of BookKeeper in around 300 lines of Java code (not counting Exceptions and callback interfaces). To verify that their ZooKeeper and BookKeeper were full-fledged implementations, they ran the HDFS namenode over them (modifying it only to instantiate their classes instead of the originals) and successfully demonstrated recovery from a namenode reboot as well as fail-over to a backup namenode.

## Discussion

Tango fits within the State Machine Replication (SMR) paradigm, replicating state by imposing a total ordering over all updates. In the vocabulary of SMR, Tango clients can be seen as learners of the total ordering. The storage nodes comprising the shared log play the role of acceptors.

The finding in the Tango paper that a centralized server can be made to run at very high RPC rates matches recent observations by others. The Percolator system runs a centralized timestamp oracle with similar functionality at over 2M requests/sec with batching. Vasudevan et al. (SOCC'12) report achieving 1.6M submillisecond 4-byte reads/sec on a single server with batching. Masstree is a key-value server that provides 6M queries/sec with batching.

Tango's biggest contribution is that it provides multiple consistent object views from the same log. Objects with different in-memory data structures can share the same data on the log. For example, a namespace can be represented by different trees, one ordered on the filename and the other on a directory hierarchy, allowing applications to perform two types of queries efficiently (i.e., "list all files starting with the letter B" vs. "list all files in this directory"). Strongly consistent reads can be scaled simply by instantiating more views of the object on new clients. But is this free? Is this fast?
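
As a small illustration of the namespace example, here are two views that consume the same update records but organize them differently. The view classes (`NameView`, `DirView`) and the use of full paths as update records are my own assumptions, and a Python list again stands in for the shared log.

```python
import bisect
from collections import defaultdict


class NameView:
    """View ordered on the filename: good for prefix queries."""

    def __init__(self):
        self.names = []

    def apply(self, path):
        bisect.insort(self.names, path.rsplit("/", 1)[1])

    def starting_with(self, prefix):
        return [name for name in self.names if name.startswith(prefix)]


class DirView:
    """View organized by directory: good for listing a directory."""

    def __init__(self):
        self.children = defaultdict(list)

    def apply(self, path):
        directory, name = path.rsplit("/", 1)
        self.children[directory].append(name)

    def list_dir(self, directory):
        return sorted(self.children[directory])


# The same history feeds both views through their own apply upcalls.
history = ["/docs/Budget.xls", "/docs/Agenda.txt", "/pics/Beach.png"]
by_name, by_dir = NameView(), DirView()
for record in history:
    by_name.apply(record)
    by_dir.apply(record)
```

Both views stay consistent with each other because they replay one ordered history; the price is that each of them has to do that replay itself.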

Tango's soft underbelly is that it uses a pull-based approach to constructing the view from the shared log. Wouldn't a push-based approach be more timely? When a read comes, the pull-based approach may have a lot of catching up to do to reach the current state before it returns an answer. I guess it may be possible to approximate push with periodic pulls, even when no accessor function is invoked.

Tango provides a weird combination of centralized and decentralized design. The log is centralized, and this is exploited to provide serialization of distributed transactions. On the other hand, not having a master node and using the clients as learners is a very decentralized approach. Instead of one master taking decisions and updating the data structure, all of the clients are playing the log and taking decisions (in a deterministic way, ensuring that they all make the same decisions), and updating their data structures. This resembles Lamport's extremely decentralized (to a fault!) implementation of mutual exclusion, which maintains replicated queues of all requests at all processes. (Of course, you can always code one client as master learner/decision-maker for other clients, and circumvent this!)

Tango vs. ZooKeeper.
Tango provides better, higher-level programming support than ZooKeeper. What the Tango paper calls Tango clients are servers that provide services for application clients. (You may even say a Tango client roughly corresponds to a "customized-view" ZooKeeper observer.) So, in terms of programmability and expressivity, Tango has the upper hand. I presume using ZooKeeper for large-scale applications may become intractable and may result in spaghetti code, since ZooKeeper provides only minimalistic, low-level primitives for coordination. Tango, on the other hand, lets the developer build higher-level abstractions of their own coordination services at the Tango clients, and this helps in managing large projects while keeping complexity on a leash.

Comparing the efficiency of Tango and ZooKeeper, it seems like ZooKeeper would be better. In Tango, there are a couple of indirections that are not present in ZooKeeper. First, there is an extra step of contacting the sequencer node to get a ticket/offset number. Tango replication corresponds roughly to ZooKeeper/Zab replication, so they even out there. But Tango has another layer of indirection, where the clients need to read and learn from the log. In ZooKeeper, since the leader is also the decision maker, the app-client can learn from relatively compact state, whereas in Tango this happens by replaying a sequence of commands and constructing the state itself. Again, since the Tango client is like the ZooKeeper observer, that is another level of indirection before going to the app-client in Tango. So in total, two extra levels are present in Tango (contacting the sequencer, and the Tango-client learning) that are not present in ZooKeeper. Tango provides better programmability and expressivity, but this comes at a cost in performance.

If your application is simple (and will remain simple), and can be implemented using ZooKeeper in a straightforward manner, it would be best to use ZooKeeper. Otherwise, by using Tango, you can have a more extensible and tractable code base, and potentially write some of your services as Tango clients, which may even improve performance.

## Final remarks

Tango code is not open source. That is really unfortunate, as it could provide a good alternative to ZooKeeper for some applications that require coordination and transactions across distributed clients.

Since the sequencer is centralized, Tango is not suitable for WAN deployments.

Some questions still remain. The stream sharing assignments seem to be made statically using the layered stream abstraction API. Can we do this on demand and dynamically?

How is the layered stream abstraction implemented at CORFU level over the replica groups? Would it pay to dedicate one group for one popular stream? This would make bulk reading possible from that replica set. (Similar to the columnar storage idea.)

Noah Watkins said...

I'm a little unclear on the claim made in the paper that, "Write-only transactions require an append on the shared log but can commit immediately without playing the log forward."

If I create an object such as a TangoMap that throws an error when inserting a duplicate record, then do BeginTX; Obj.insert(x); EndTx; and 'x' has been inserted into the log via a different view of the object, then unless I play the log forward how can I commit this transaction locally?

Are there some other assumptions being made here?

Noah Watkins said...

I think in the above example insert I wasn't considering that to get the semantics I described I would have to implicitly do a read internally, and that the read-set also applies to reads that the object implementation makes.
