Sunday, November 23, 2014

Google Cloud Messaging (GCM): An evaluation

I had written about our work on building a crowdsourced superplayer for the "Who wants to be a millionaire (WWTBAM)" quiz show earlier. In that work we developed an Android app that enabled the app users in Turkey to participate in WWTBAM in real time as the show was airing on TV. When a question was read by the show host, my PhD students typed the question and the multiple-choice options, which were transmitted via Google Cloud Messaging (GCM) to the app users. App users played the game, and enjoyed competing with other app users, and we got a chance to collect precious data about MCQA dynamics in crowdsourcing. Our app was downloaded 300K+ times, and at the peak of its popularity 20K participants played the game simultaneously.

We used GCM to send the questions to the participants because we wanted to keep the app simple. GCM is the default push messaging solution for the Android platform and is maintained by Google as a free service with no quotas. GCM allows app developers to send push messages to Android devices from the server. As an alternative to GCM, we could have developed our app to maintain open TCP connections with the backend servers, but this would have complicated our backend design and we would have needed many machines to serve the load. Using GCM, we were able to send quiz questions as push messages easily, and could get away with using only one backend server to collect the answers from the app users. Another alternative to GCM was to deploy our own messaging service, say using MQTT, but since we were developing this app as an experiment/prototype, we wanted to keep things as simple as possible.

After we deployed our app, we noticed that there were no studies or analyses of GCM performance, and we wondered whether GCM was good enough to satisfy the real-time requirements of our application. Since we had a lot of users, we instrumented our app to timestamp the reception time of the GCM messages and reply back to the backend server with this information.

Our evaluation covered both the offline and online modes of GCM messaging. In the online mode, we send the message to the online participants who are playing the game while WWTBAM is broadcasting. That is, in the online mode, we know that the client devices are powered on, actively in use, and have a network connection. In the offline mode, we send the GCM message at a random time, so the server has no knowledge of whether the client devices are powered on, are in use, or have a network connection. Table 2 shows the number of devices involved in each of our experiments.

We found a giant tail in the GCM delivery latencies. Figure 4 above shows the cumulative distribution of message arrival times in the online and offline modes. Note that the x-axis is in logarithmic scale. Table 6 shows message arrival times broken into quartiles. The median and the average message arrival times in the table indicate that the latency in the offline experiment is significantly higher than in the online experiment.
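To make the quartile and cumulative-distribution summaries concrete, here is a minimal sketch of how such numbers can be computed from per-message latencies. The sample values and the function name `latency_summary` are hypothetical, not our measurement data or tooling, and the sketch assumes each latency has already been computed as reception time minus send time.

```python
import statistics

def latency_summary(latencies_sec):
    """Return quartiles and mean of the latencies, plus an empirical CDF function."""
    data = sorted(latencies_sec)
    q1, median, q3 = statistics.quantiles(data, n=4)

    def cdf(x):
        # Fraction of messages that arrived within x seconds.
        return sum(1 for v in data if v <= x) / len(data)

    summary = {"q1": q1, "median": median, "q3": q3,
               "mean": statistics.mean(data)}
    return summary, cdf

# Hypothetical latencies in seconds: most are sub-second, one takes an hour.
sample = [0.2, 0.3, 0.4, 0.5, 0.6, 0.8, 1.0, 3600.0]
summary, cdf = latency_summary(sample)
print(summary)
print("fraction delivered within 1 s:", cdf(1.0))
```

With a heavy tail like this, the mean is pulled far above the median, which is why it helps to report quartiles alongside the average.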

In Figure 5, comparing the GCM delivery times under WiFi versus cellular data connection in the offline mode shows that GCM delivery latencies are lower using the cellular data connection.

To investigate the GCM arrival times in the offline scenario further, we devised a double message experiment, where we send 2 GCM messages initiated at the same time on our server. This time, instead of measuring the message arrival time, we measure the difference between the arrival time of the first message and the arrival time of the second message. We were expecting to see a very low time difference between these twin messages, but Figure 9 shows that the giant tail is preserved, and that some of the devices receive the second message hours after receiving the first one. Since we know that the device is reachable by Google's GCM servers at the time the first of the twin messages arrives at the device, the latency for the second message must be due to it being scheduled for sending independently of the first message.
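A minimal sketch of the twin-message measurement follows. The device names and arrival times are made up for illustration, and `twin_gaps` and `fraction_above` are hypothetical helper names. One nice property of this measurement is that both timestamps are taken on the same device clock, so the gap does not depend on clock synchronization between the device and the server.

```python
def twin_gaps(arrivals):
    """arrivals maps device id -> (arrival of first twin, arrival of second twin),
    both in seconds on that device's own clock. Returns the gap per device."""
    return {dev: abs(t2 - t1) for dev, (t1, t2) in arrivals.items()}

def fraction_above(gaps, threshold_sec):
    """Fraction of devices whose twin gap exceeds threshold_sec."""
    if not gaps:
        return 0.0
    return sum(1 for g in gaps.values() if g > threshold_sec) / len(gaps)

# Hypothetical arrivals: device "a" gets both twins together,
# device "b" gets the second twin two hours after the first.
arrivals = {"a": (10.0, 10.5), "b": (10.0, 7210.0)}
gaps = twin_gaps(arrivals)
print(gaps)
print("fraction with gap over 1 hour:", fraction_above(gaps, 3600))
```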

These results show that the GCM message delivery is unpredictable, namely having a reliable connection to Google's GCM servers on the client device does not by itself guarantee a timely message arrival. It would be worth replicating this experiment where all participants are in the US, to see how that changes the results.

Our experiments raised more questions than they answered. Delving deeper into the documentation, we found some information that may explain some of the problems with GCM in the offline mode, but we need more examples to test these and understand the behavior better. It turns out that in both cellular and WiFi modes the device keeps a long-lived connection to GCM servers. In deep sleep mode (where the user does not turn on the screen for a long time), the CPU wakes up and sends a heartbeat to the servers every 28 minutes on cellular and every 15 minutes on WiFi. These long heartbeat intervals cause the connection to drop in many cases, because routers and providers kill inactive TCP connections without sending any notification to the client and/or server. Not all users are affected by this problem; it depends on the carrier and/or router they use.

Related links

Our GlobeCom14 paper includes more information.

Saturday, November 15, 2014

Paper Summary: Granola, Low overhead distributed transaction coordination

This paper is by Cowling and Liskov, and it appeared at Usenix ATC'12. The papers most closely related to this one are the Sinfonia and Calvin papers. So, it may be helpful to also read the summaries of those papers from the links above to familiarize yourself with them.

This paper looks at coordination of 1-round transactions, which are different from general transactions that involve multi-round interaction with the client. 1-round transactions execute at the participant nodes, with no communication with other nodes except for at most a single commit/abort vote.

Figure 1 shows the system architecture. The clients are the transaction managers for these 1-round transactions. They initiate transactions and evaluate the commit/conflict/abort decisions returned for the transactions. The repositories communicate with one another (for at most a single commit/abort vote) to coordinate transactions.

There are 2 types of transactions in Granola: coordinated ones, and uncoordinated ones. Actually, better/less-confusing names for these 2 types would be lock-coordinated transactions and timestamp-coordinated transactions, since the latter type of transactions still involve coordination among the repositories.

To reflect this duality in coordination mode, each repository runs in one of two modes. When there are no coordinated transactions running at a repository, it runs in timestamp mode. When a repository receives a request for a coordinated transaction, it switches to the locking mode.

Uncoordinated transactions

There are two kinds of uncoordinated transactions: single repository transactions and independent distributed transactions. Granola uses timestamp-based coordination to provide serializability to both kinds of uncoordinated transactions and avoids locking for them. Examples of independent distributed transactions include read-only transactions and local-read transactions, such as "give every employee a 2% raise".

(In our previous work, the slow-fast paper (2010), we had also made an analysis similar to the independent transactions idea. Our slow-fast analysis inspects the program actions, which are precondition guarded assignment statements, and determines for which actions the atomicity can be relaxed so that they can execute in an uncoordinated manner. Our finding was that if the precondition of a program action is "locally-stable" (i.e., this precondition predicate cannot get falsified by execution of other program actions), then it is safe to execute this program action in an uncoordinated/nonatomic manner.)

Repositories are assumed to have access to loosely synchronized clocks (say NTP synchronized clocks). The use of timestamps in Granola is for ordering/serializability of transactions, and logical clocks would also suffice. Granola needs time synchronization for performance/throughput, not for correctness. This separation is always a welcome one. (I can't resist referring to our hybrid logical clock, HLC, work here. I think HLC would improve the exposition of Granola, and would make it tolerant to misbehaving clients which may increase highTS and deny service to normal clients' requests.)

Figure 5 shows the straightforward single repository execution, and Figure 6 shows the more interesting independent transaction execution. Since these are 1-round transactions, the commit/abort decisions are local and one-shot at repositories. Voting is used to notify other repositories about the local decision (a conflict vote causes a repository to abort the transaction), and also for nominating a proposed timestamp for the transaction. The transaction is assigned the highest timestamp from among the votes.
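The voting rule can be sketched in a few lines. This is only an illustration of the rule as summarized above, not Granola's actual code: the `Vote` type, the status strings, and the `decide` function are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class Vote:
    repo: str          # repository that cast the vote
    status: str        # "commit", "abort", or "conflict"
    proposed_ts: int   # timestamp this repository nominates

def decide(votes):
    """Combine one vote per participant into (decision, final timestamp).

    Any abort or conflict vote aborts the transaction; otherwise the
    transaction commits with the highest proposed timestamp."""
    if any(v.status != "commit" for v in votes):
        return ("abort", None)
    return ("commit", max(v.proposed_ts for v in votes))

votes = [Vote("r1", "commit", 17), Vote("r2", "commit", 23)]
print(decide(votes))
```

Because every participant sees the same set of votes, each one computes the same final timestamp locally, which is what lets the decision stay one-shot.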

In the "pure" timestamp mode, all single repository and independent distributed transactions commit successfully, and serializability is guaranteed as they are executed in timestamp order. This provides a substantial reduction in overhead from locking and aborts, which improves throughput.

While repositories are in the timestamp mode, coordinated transactions may also arrive, which may cause conflict decisions to be returned for the single repository or independent distributed transactions executing on those repositories. Those repositories will switch to the locking mode to serve the coordinated transactions. Once all coordinated transactions have completed, those repositories can transition back to the timestamp mode.

Dually, in locking mode, repositories can still serve single repository and distributed independent transactions provided they do not conflict, i.e., they do not need to update a locked item. (In effect, the timestamp mode is just a shortcut to denote the lack of any coordinated transactions in the system.)

Coordinated transactions

Figure 7 shows coordinated transaction execution. Coordination is needed for a transaction that requires a remote read for its abort/commit decision. For example, the transaction to transfer $50 from Alice's account to Bob's account first needs to remote-read Alice's account to certify that it indeed has more than $50. (Recall that, in contrast, for an independent distributed transaction the read, or the guard of the transaction, was local: give every employee a 2% raise.) Different from independent distributed transaction execution, the coordinated transaction execution involves a prepare phase to acquire the required locks. Locks ensure serializability for coordinated transaction execution. So timestamp order may not be satisfied in the commit of transactions, and thus, external consistency may not be provided for coordinated transaction execution.


This paper is written with a focus on describing the system in reasonable detail to potential users of the system. This diverges a bit from the academic style, which would put the focus on the novelties and "intellectual merit" (a la NSF). Maybe this is because of the style/focus of the USENIX ATC conference. The evaluation section is also really nice and detailed. (Granola performs like Sinfonia for coordinated transactions, and improves throughput for uncoordinated transactions.)

It seems that Granola does not offer much advantage over Calvin. The paper compares in the related work with Calvin and states the following. "The Calvin transaction coordination protocol was developed in parallel with Granola, and provides similar functionality. Rather than using a distributed timestamp voting scheme to determine execution order, Calvin delays read/write transactions and runs a global agreement protocol to produce a deterministic locking order."

I think Granola may have an edge over Calvin for low latency, especially for transactions that involve only a couple of repositories. The best way to see why is to consider this analogy.
Granola:Calvin :: CSMA:TDMA.

Friday, November 14, 2014

Paper Summary: Calvin, Distributed transactions for database systems

Calvin is a transaction scheduling and replication management layer for distributed storage systems. By first writing transaction requests to a durable, replicated log, and then using a concurrency control mechanism that emulates a deterministic serial execution of the log's transaction requests, Calvin supports strongly consistent replication and fully ACID distributed transactions, and manages to incur lower inter-partition transaction coordination costs than traditional distributed database systems.

Calvin emphasizes modularity. The holy trinity in Calvin is: log, scheduler, executor. When a client submits a transaction request to Calvin, this is immediately appended to a durable log, before any actual execution begins. Calvin's scheduling mechanism then processes this request log, deciding when each transaction should be executed in a way that maintains an invariant slightly stronger than serializable isolation: Transaction execution may be parallelized but must be equivalent to a deterministic serial execution in log-specified order. As a result, the log of transaction requests itself serves as the ultimate "source of truth" about the state of a database, which makes recovery very easy.

I thought Calvin resembles the Tango approach a lot. (I had discussed Tango here recently.) It is almost as if Calvin is Tango's cousin in the databases domain. As such, Calvin has strengths and weaknesses similar to Tango's. On the strengths side, Calvin provides good throughput, but it will not get stars for low latency. Calvin provides scalable and strongly-consistent replication. (After you have one authoritative log, this is not hard to provide anyway.)

The centralized log is the source of all disadvantages in Calvin as well. The transactions always need to go through the centralized log, so there are no truly local transactions. Thus Calvin will perform worse for workloads that consist mostly of local/non-coordinating transactions. So, the TPC-C workload Calvin uses for evaluation is actually the best workload to show Calvin's performance relative to other systems.

The Log component

Calvin uses Paxos to achieve availability of the log by replicating it consistently. A group of front-end servers collect client requests into batches. Each batch is assigned a globally unique ID and written to an independent, asynchronously replicated block storage service such as Voldemort or Cassandra. Once a batch of transaction requests is durable on enough replicas, its GUID is appended to a Paxos “MetaLog”. Readers can then reassemble the global transaction request log by concatenating batches in the order that their GUIDs appear in the Paxos MetaLog.
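Here is a minimal single-process sketch of the batch-and-MetaLog structure described above. It is not Calvin's implementation: `BlockStore` is a hypothetical stand-in for the replicated block storage service, and a plain Python list stands in for the Paxos-replicated MetaLog, so no replication or agreement actually happens here.

```python
import uuid

class BlockStore:
    """Hypothetical stand-in for the replicated block storage service."""
    def __init__(self):
        self._blocks = {}

    def put(self, batch):
        guid = uuid.uuid4().hex          # globally unique ID for the batch
        self._blocks[guid] = list(batch)
        return guid

    def get(self, guid):
        return self._blocks[guid]

def append_batch(store, meta_log, batch):
    """Write the batch to storage first, then append its GUID to the MetaLog.
    In Calvin the append is agreed on via Paxos; here it is a list append."""
    guid = store.put(batch)
    meta_log.append(guid)
    return guid

def global_log(store, meta_log):
    """Reassemble the transaction request log by concatenating batches
    in the order their GUIDs appear in the MetaLog."""
    return [txn for guid in meta_log for txn in store.get(guid)]

store, meta_log = BlockStore(), []
append_batch(store, meta_log, ["t1", "t2"])
append_batch(store, meta_log, ["t3"])
print(global_log(store, meta_log))
```

The point of the indirection is that the agreement protocol only has to order small GUIDs, while the bulky batch contents travel through the storage service.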

Batching trades latency for throughput: you cannot have transaction latency lower than the batching duration (epoch). So an epoch is a guaranteed overhead on latency for every transaction.

The scheduler component

The Scheduler component (which is a centralized component) examines a transaction before it begins executing and decides when it is safe to execute the whole transaction, then hands the transaction request off to the storage backend for execution with no additional oversight. The storage backend therefore does not need to have any knowledge of the concurrency control mechanism or implementation. Once a transaction begins running, the storage backend can be sure that it can process it to completion without worrying about concurrent transactions accessing the same data. However, each storage backend must provide enough information prior to transactional execution in order for the scheduler to make a well-informed decision about when each transaction can safely (from a concurrency control perspective) execute.

For transaction execution, the scheduler still uses locks. Deterministic locking ensures concurrent execution equivalent to the serial transaction order in the log, and also makes deadlock (and the associated nondeterministic transaction aborts) impossible.
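A minimal sketch of deterministic locking follows, under simplifying assumptions that are mine rather than the paper's: only exclusive locks, each transaction's full key set known up front, and a hypothetical `DeterministicLockManager` class. Lock requests are enqueued strictly in log order, and a transaction may run only when it is at the head of the queue for every key it touches.

```python
from collections import defaultdict, deque

class DeterministicLockManager:
    def __init__(self):
        self.queues = defaultdict(deque)   # key -> transaction ids in log order

    def request(self, txn_id, keys):
        """Must be called in log order: enqueue txn_id on every key it accesses."""
        for key in set(keys):
            self.queues[key].append(txn_id)

    def runnable(self, txn_id, keys):
        """A transaction holds all its locks once it heads every one of its queues."""
        return all(self.queues[key][0] == txn_id for key in set(keys))

    def release(self, txn_id, keys):
        """Called after the transaction finishes; passes locks to the next in line."""
        for key in set(keys):
            assert self.queues[key][0] == txn_id
            self.queues[key].popleft()

# Log order: T1 touches x and y, T2 touches y and z, T3 touches z.
log = [("T1", ["x", "y"]), ("T2", ["y", "z"]), ("T3", ["z"])]
lm = DeterministicLockManager()
for txn_id, keys in log:
    lm.request(txn_id, keys)
print([t for t, k in log if lm.runnable(t, k)])
```

Deadlock cannot occur in this scheme because the earliest unfinished transaction in the log is always at the head of all of its queues, so some transaction can always make progress.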


I don't know why Calvin doesn't adopt Tango style log maintenance: Using chain replication to improve throughput of the centralized log. This might actually help Calvin.

Similarly, Calvin should also adopt the selective/custom stream replication per replica feature in Tango. That would improve the flexibility/generality of Calvin.

Saturday, November 1, 2014

Paper Summary: Coordination Avoidance in Database Systems

Serializing transactions is sufficient for correctness, but it is not necessary for all operations of all applications. The downside of serialization is that it kills scalability and is overkill in many cases.

This paper (which will appear in VLDB'15) has the following insight: Given knowledge of application transactions and correctness criteria (i.e., invariants), it is possible to avoid this over-coordination of serializability and execute some transactions without coordination while still preserving those correctness criteria (invariants).

In particular the authors propose the concept of "invariant confluence" to relax the use of serialization for some operations of a coordination-requiring application. By operating on application-level invariants over database states (e.g., integrity constraints), the invariant confluence analysis provides a necessary and sufficient condition for safe, coordination-free execution. When programmers specify application invariants, this analysis allows databases to coordinate only when concurrency may violate those application invariants.

So how do they get the application invariants? "Many production databases today already support invariants in the form of primary key, uniqueness, foreign key, and row-level check constraints. We analyze this and show that many are invariant-confluent, including forms of foreign key constraints, unique value generation, and check constraints, while others, like primary key constraints are, in general, not."

They claim that many common integrity constraints found in SQL and standardized benchmarks are invariant confluent, allowing order-of-magnitude performance gains over coordinated execution. To substantiate this claim, they apply invariant confluence analysis to a database prototype and show a 25-fold improvement over prior TPC-C New-Order performance on a 200 server cluster. They find that 10 out of 12 of TPC-C's invariants are invariant-confluent under the workload's transactions.

The invariant-confluence model

Invariant-confluence captures a simple (informal) rule: coordination can be avoided if and only if all local commit decisions are globally valid. (In other words, the commit decisions are composable.)

They model transactions to operate over independent logical snapshots of database state. Transaction writes are applied at one or more snapshots initially when the transaction commits and are then integrated into other snapshots asynchronously via a merge operator that incorporates those changes into the snapshot's state. "Merge" is simply the set union of versions, and is used to capture the process of reconciling divergent states.

In effect, this model states that each transaction can modify its replica state without modifying any other concurrently executing transactions' replica state. Replicas therefore provide transactions with partial snapshot views of global state. They define local validity/consistency as a safety property, but global replica consistency is not defined as a safety property. Instead it is defined as a liveness property under the name "convergence".

(Formal definition of invariant-confluent:)
A set of transactions T is I-confluent with respect to invariant I if, for all I-T-reachable states Di, Dj with a common ancestor state, Di union Dj is I-valid.

Applying the invariant-confluence concept

As the definition implies, I-confluence holds for specific combinations of invariants and transactions. Removing a user from the database is I-confluent with respect to the invariant that the user IDs are unique. However, two transactions that remove two different users from the database are not I-confluent with respect to the invariant that there exists at least one user in the database at all times. As another example, uniqueness is not I-confluent for inserts of unique values. However, reads and deletions are both I-confluent under uniqueness invariants: reading and removing items cannot introduce duplicates.
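The examples above can be checked mechanically on small states. The sketch below is a counterexample finder for one pair of divergent states, not a proof of I-confluence (which quantifies over all reachable states). The modeling choices are assumptions of this sketch: a state is a pair (inserted records, tombstones), merge is the set union of both components, and deletes are recorded as tombstones so that a union-based merge can carry them.

```python
def merge(s1, s2):
    """Set union of versions; a state is (inserted records, tombstones)."""
    return (s1[0] | s2[0], s1[1] | s2[1])

def visible(state):
    inserted, tombstones = state
    return inserted - tombstones

def insert(state, record):
    return (state[0] | {record}, state[1])

def delete(state, record):
    return (state[0], state[1] | {record})

def unique_ids(state):
    ids = [uid for uid, _name in visible(state)]
    return len(ids) == len(set(ids))

def at_least_one_user(state):
    return len(visible(state)) >= 1

base = ({(1, "ann"), (2, "bob")}, set())

# Two replicas each remove a different user.
di = delete(base, (1, "ann"))
dj = delete(base, (2, "bob"))
print(unique_ids(merge(di, dj)))         # deletions cannot create duplicates
print(at_least_one_user(merge(di, dj)))  # but together they empty the table

# Two replicas each insert a record with the same ID.
ei = insert(base, (3, "cat"))
ej = insert(base, (3, "dan"))
print(unique_ids(ei), unique_ids(ej), unique_ids(merge(ei, ej)))
```

In each failing case both local states satisfy the invariant on their own, and only the merged state violates it, which is exactly the situation that requires coordination.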

Table 3 summarizes the 12 invariants found in TPC-C benchmark as well as their I-confluence analysis results as determined by Table 2. They classify the invariants into 3 broad categories: materialized view maintenance, foreign key constraint maintenance, and unique ID assignment.

Figure 5 shows the concurrency/throughput improvement made possible by applying the invariant-confluence analysis to the TPC-C workload.

Related work

This paper is an extension of the "CALM" approach that uses monotonicity and convergence concepts to relax the coordination needs of applications.

The "Scalable commutativity rule" paper, which was one of the best papers in SOSP'13, is a closely related work. In order to relax serializability and boost concurrency, that work prescribes exploiting the commutativity of operations. Another related work that exploits commutativity to relax serializability is the "Making Geo-Replicated Systems Fast as Possible, Consistent when Necessary" paper. The invariant confluence analysis concept is more general (but probably harder to apply) than the commutativity rule approach, because while commutativity is sufficient for correctness it is not always necessary.

In our previous work, the slow-fast paper (2010), we had also used the concept of "invariant-relaxed serializability" in the distributed systems domain, particularly in application to wireless sensor/actor network concurrency control. (In what is maybe somewhat of a misnomer, we called an action slow if it can be executed in a concurrent/uncoordinated/nonatomic manner, and fast if it needs to be executed in an atomic/coordinated/no-conflicts manner.)

I suspect our slow-fast approach used a less aggressive optimization than invariant-confluence: Slow-fast did not require/inspect program invariants explicitly, it only required access to the program actions (i.e., transactions). The slow-fast approach inferred that the invariant holds when program actions (transactions) execute atomically. (Invariant-confluence may potentially use even a weaker invariant than what slow-fast used.)

Our slow-fast analysis inspects the program actions, which are precondition guarded assignment statements, and determines for which actions the atomicity can be relaxed so that they can execute in an uncoordinated manner. Our finding was that if the precondition of a program action is "locally-stable" (i.e., this precondition predicate cannot get falsified by execution of other program actions), then it is safe to execute this program action in an uncoordinated/nonatomic manner. (This check probably implies "mergeability" of the state.) Our analysis also prescribed ways to break a coordination-requiring action into two smaller actions to make it coordination-free.

The "coordination avoidance in databases" paper applies the "invariant-relaxed serializability" idea in a more restricted and more useful domain, database transactions, and demonstrates the idea in a very practical way.

Monday, October 27, 2014

Clock-SI: Snapshot Isolation for Partitioned Data Stores Using Loosely Synchronized Clocks

This paper appeared in SRDS 2013, and is concerned with the snapshot isolation problem for distributed databases/data stores.

What is snapshot isolation (SI)?

(I took these definitions almost verbatim from the paper.)
SI is a multiversion concurrency control scheme with 3 properties:
1) Each transaction reads from a consistent snapshot, taken at the start of the transaction and identified by a snapshot timestamp. A snapshot is consistent if it includes all writes of transactions committed before the snapshot timestamp, and if it does not include any writes of aborted transactions or transactions committed after the snapshot timestamp.
2) Update transactions commit in a total order. Every commit produces a new database snapshot, identified by the commit timestamp.
3) An update transaction aborts if it introduces a write-write conflict with a concurrent committed transaction. Transaction T1 is concurrent with committed update transaction T2, if T1 took its snapshot before T2 committed and T1 tries to commit after T2 committed.

When a transaction starts, its snapshot timestamp is set to the current value of the database version. All its reads are satisfied from the corresponding snapshot. To support snapshots, multiple versions of each data item are kept, each tagged with a version number equal to the commit timestamp of the transaction that creates the version. The transaction reads the version with the largest version number smaller than its snapshot timestamp. If the transaction is read-only, it always commits without further checks. If the transaction has updates, its writes are buffered in a workspace. When the update transaction requests to commit, a certification check verifies that the transaction writeset does not intersect with the writesets of concurrent committed transactions. If the certification succeeds, the database version is incremented, and the transaction commit timestamp is set to this value.
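The conventional scheme above can be sketched as a small in-memory model. This is a single-threaded illustration with hypothetical names (`SIDatabase` and its methods), not code from the paper. One detail is a choice of this sketch: because the snapshot timestamp here equals the version number of the latest commit, a read accepts versions with commit timestamp less than or equal to the snapshot timestamp, so that all writes committed before the snapshot are included.

```python
class SIDatabase:
    def __init__(self):
        self.version = 0      # database version, bumped on every update commit
        self.versions = {}    # key -> list of (commit_ts, value), ascending
        self.committed = []   # list of (commit_ts, set of written keys)

    def begin(self):
        return {"snapshot": self.version, "writes": {}}

    def write(self, txn, key, value):
        txn["writes"][key] = value        # buffered in the transaction workspace

    def read(self, txn, key):
        if key in txn["writes"]:
            return txn["writes"][key]
        for ts, value in reversed(self.versions.get(key, [])):
            if ts <= txn["snapshot"]:     # newest version inside the snapshot
                return value
        return None

    def commit(self, txn):
        if not txn["writes"]:
            return True                   # read-only transactions always commit
        written = set(txn["writes"])
        for ts, keys in self.committed:
            # Certification: abort on a write-write conflict with a transaction
            # that committed after this transaction took its snapshot.
            if ts > txn["snapshot"] and keys & written:
                return False
        self.version += 1
        for key, value in txn["writes"].items():
            self.versions.setdefault(key, []).append((self.version, value))
        self.committed.append((self.version, written))
        return True

db = SIDatabase()
t1, t2 = db.begin(), db.begin()
db.write(t1, "x", 1)
db.write(t2, "x", 2)
print(db.commit(t1), db.commit(t2))       # first committer wins
```

The single `version` counter is the centralized timestamp authority in this model.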

What is the innovation in the Clock-SI paper?

The conventional SI implementations use a centralized timestamp authority for consistent versioning. This is because local clocks on different nodes may differ a lot (NTP synchronization may have 10s of ms of inaccuracies), and are therefore not suitable for consistent versioning.

Clock-SI, instead, proposes a way to use loosely synchronized clocks to assign snapshot and commit timestamps to transactions. Compared to conventional SI, Clock-SI does not have a single point of failure and a potential performance bottleneck. It saves one round-trip message for a read-only transaction (to obtain the snapshot timestamp), and two round-trip messages for an update transaction (to obtain the snapshot timestamp and the commit timestamp). A transaction's snapshot timestamp is the value of the local clock at the partition where it starts. Similarly, the commit timestamp of a local update transaction is obtained by reading the local clock.

If you read Google's Spanner paper, you know that Google Spanner solves this problem by introducing TrueTime, which uses atomic clocks.

How does Clock-SI work?

Clock-SI essentially response-delays a read in a transaction
1) to account for clock synchronization differences (epsilon) as in Figure 1, and
2) to account for the pending commit of an update transaction.

In Figure 1, the read arrives at time t′ on P2's clock, before P2’s clock has reached the value t, and thus t′ < t. The snapshot with timestamp t at P2 is therefore not yet available. Another transaction on P2 could commit at time t′′, between t′ and t, and change the value of x. This new value should be included in T1's snapshot.

In the second case, T2's snapshot is unavailable due to the in-progress commit of transaction T1, which is assigned the value of the local clock, say t, as its commit timestamp. T1 updates item x and commits. The commit operation involves a write to stable storage and completes at time t′. Transaction T2 starts between t and t′, and gets assigned a snapshot timestamp t′′, t < t′′ < t′. If T2 issues a read for item x, we cannot return the value written by T1, because we do not yet know if the commit will succeed, but we also cannot return the earlier value, because, if T1's commit succeeds, this older value will not be part of a consistent snapshot at t′′.
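The two delay conditions can be sketched as a check performed by the partition before serving a read. This is an illustrative model, not the Clock-SI implementation: `Partition` and its methods are hypothetical names, the clock is an integer advanced by hand instead of a real clock, and a delayed read is reported back to the caller instead of blocking.

```python
class Partition:
    def __init__(self, clock):
        self.clock = clock    # local physical clock, advanced manually here
        self.versions = {}    # key -> list of (commit_ts, value), ascending
        self.pending = {}     # key -> commit_ts of a commit still in progress

    def delay_reason(self, key, snapshot_ts):
        # Case 1: the local clock has not yet reached the snapshot timestamp.
        if snapshot_ts > self.clock:
            return "clock behind snapshot"
        # Case 2: a commit that belongs in this snapshot has not finished yet.
        pending_ts = self.pending.get(key)
        if pending_ts is not None and pending_ts <= snapshot_ts:
            return "commit pending"
        return None

    def read(self, key, snapshot_ts):
        reason = self.delay_reason(key, snapshot_ts)
        if reason is not None:
            return ("delay", reason)
        seen = [v for ts, v in self.versions.get(key, []) if ts <= snapshot_ts]
        return ("ok", seen[-1] if seen else None)

    def start_commit(self, key):
        self.pending[key] = self.clock    # commit timestamp from the local clock
        return self.clock

    def finish_commit(self, key, value):
        ts = self.pending.pop(key)
        self.versions.setdefault(key, []).append((ts, value))

p = Partition(clock=100)
p.versions["x"] = [(50, "old")]
print(p.read("x", 105))   # snapshot timestamp is ahead of this partition's clock
```

Note that a read with a snapshot timestamp older than the pending commit is not delayed, since the pending write cannot belong to that snapshot.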


The paper does not include a performance comparison to Spanner. The NTP synchronized clocks in the evaluation experiments have an NTP offset/epsilon less than 0.1 msec, which is actually more precise than Spanner's atomic clock! I guess this is thanks to the Gigabit Ethernet they use in their LAN deployment.

Discussion: Use of Hybrid Logical Clocks (HLC) for the Clock-SI problem

HLC is a hybrid version of logical clocks and physical clocks, introduced by us recently, to combine the advantages of both clocks, while avoiding their disadvantages. Since HLC captures happened-before relationship and uses this extra information in ordering, it does not need to wait out uncertainty regions of physical clock synchronization. Dually, since HLC is related to physical clocks it allows querying with respect to physical time. We had shown HLC's advantages for the consistent snapshot problem in our work.

Here we find that using HLC instead of physical clocks indeed improves on the Clock-SI solution to snapshot isolation. HLC avoids the delay in Figure 1. HLC would not incur the delay because it also uses the happened-before information encoded in HLC timestamps.
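For reference, here is a minimal sketch of the HLC update rules as given in the HLC paper, to show why the wait in Figure 1 is unnecessary: on receiving a message, a node with a lagging physical clock simply adopts the larger timestamp instead of waiting for its clock to catch up. The class layout and the injected `physical_clock` callable are assumptions of this sketch, and integrating HLC into Clock-SI itself is not shown.

```python
class HLC:
    def __init__(self, physical_clock):
        self.pt = physical_clock   # callable returning the physical time
        self.l = 0                 # largest physical time heard of so far
        self.c = 0                 # logical counter to order events within one l

    def now(self):
        """Timestamp a local or send event."""
        l_old = self.l
        self.l = max(l_old, self.pt())
        self.c = self.c + 1 if self.l == l_old else 0
        return (self.l, self.c)

    def receive(self, l_m, c_m):
        """Timestamp the receipt of a message carrying timestamp (l_m, c_m)."""
        l_old = self.l
        self.l = max(l_old, l_m, self.pt())
        if self.l == l_old and self.l == l_m:
            self.c = max(self.c, c_m) + 1
        elif self.l == l_old:
            self.c += 1
        elif self.l == l_m:
            self.c = c_m + 1
        else:
            self.c = 0
        return (self.l, self.c)

# A node whose physical clock reads 5 receives a message stamped (10, 0).
node = HLC(physical_clock=lambda: 5)
print(node.receive(10, 0))
```

Timestamps compare as (l, c) pairs, so the receive event is ordered after the send event even though the receiver's physical clock is behind.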

Saturday, October 18, 2014

Facebook's software architecture

I had summarized/discussed a couple of papers (Haystack, Memcache caching) about Facebook's architecture before.

Facebook uses simple architectures that get things done. Papers from Facebook are refreshingly simple, and I like reading these papers.

Two more Facebook papers appeared recently, and I briefly summarize them below.

TAO: Facebook's distributed data store for the social graph (ATC'13)

A single Facebook page may aggregate and filter 100s of items from the social graph. Since Facebook presents each user with customized content (which needs to be filtered with privacy checks) an efficient, highly available, and scalable graph data store is needed to serve this dynamic read-heavy workload.

Before Tao, Facebook's web servers directly accessed MySQL to read or write the social graph, aggressively using memcache as a look-aside cache (as explained in this paper).

The Tao data store implements a graph abstraction directly. This allows Tao to avoid some of the fundamental shortcomings of a look-aside cache architecture. Tao implements an objects and associations model and continues to use MySQL for persistent storage, but mediates access to the database and uses its own graph-aware cache.

To handle multi-region scalability, Tao uses replication based on the per-record master idea. (This multi-region scalability idea was presented earlier in the Facebook memcache scaling paper.)

F4: Facebook's warm BLOB storage system (OSDI'14)

Facebook uses Haystack to store all media data, which we discussed earlier here.

Facebook's new architecture splits the media into two categories:
1) hot/recently-added media, which is still stored in Haystack, and
2) warm media (still not cold), which is now stored in F4 storage and not in Haystack.

This paper discusses the motivation for this split and how this works.

Facebook has big data! (This is one of those rare cases where you can say big data and mean it.) Facebook stores over 400 billion photos.

Facebook found that there is a strong correlation between the age of a BLOB (Binary Large OBject) and its temperature. Newly created BLOBs are requested at a far higher rate than older BLOBs; they are hot! For instance, the request rate for week-old BLOBs is an order of magnitude lower than for less-than-a-day old content for eight of nine examined types. Content less than one day old receives more than 100 times the request rate of one-year old content. The request rate drops by an order of magnitude in less than a week, and for most content types, the request rate drops by 100x in less than 60 days. Similarly, there is a strong correlation between age and the deletion rate: older BLOBs see an order of magnitude lower deletion rate than new BLOBs. This older content is called warm: it does not see frequent access like hot content, but it is not completely frozen either.

They also find that warm content is a large percentage of all objects. They split the last 9 months of Facebook data into 3 intervals: 9-6 months, 6-3 months, and 3-0 months. For objects created in the oldest interval, more than 80% are warm for all types. For objects created in the most recent interval, more than 89% are warm for all types. That is, the warm content is large, and it keeps growing.

In light of this analysis, Facebook goes with a split design for BLOB storage. They introduce F4 as a warm BLOB storage system because the request rate for its content is lower than that for content in Haystack and thus is not as hot. Warm is also in contrast with cold storage systems that reliably store data but may take days or hours to retrieve it, which is unacceptably long for user-facing requests. The lower request rate of warm BLOBs enables them to provision a lower maximum throughput for F4 than Haystack, and the low delete rate for warm BLOBs enables them to simplify F4 by not needing to physically reclaim space quickly after deletes.

F4 provides a simple, efficient, and fault-tolerant warm storage solution that reduces the effective replication factor from 3.6 to 2.8 and then to 2.1. F4 uses erasure coding with parity blocks and striping: instead of maintaining 2 other full replicas, it stores parity blocks, which reduces the storage overhead significantly.
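As a rough check on those numbers, the arithmetic can be worked through in a few lines. This sketch relies on the coding parameters as I recall them from the F4 paper, which the summary above does not spell out, so treat them as assumptions: Haystack's 3.6 comes from three replicas on RAID-6 (about 1.2x each), F4 uses Reed-Solomon(10, 4) within a datacenter, and the drop from 2.8 to 2.1 comes from replacing a second full copy with a cross-datacenter XOR. The function and variable names are mine.

```python
from fractions import Fraction


def rs_expansion(data_blocks, parity_blocks):
    """Storage expansion of a Reed-Solomon(data, parity) stripe."""
    return Fraction(data_blocks + parity_blocks, data_blocks)


# Assumed: 3 full replicas, each on RAID-6 with roughly 1.2x overhead.
haystack_factor = 3 * Fraction(12, 10)

# Assumed: Reed-Solomon(10, 4) inside one datacenter.
rs_factor = rs_expansion(10, 4)

# Two erasure-coded copies in two datacenters.
f4_two_copies = 2 * rs_factor

# Assumed: two volumes plus their XOR, i.e. 3 stored blocks per 2 data blocks.
f4_with_xor = rs_factor * Fraction(3, 2)

print(float(haystack_factor), float(f4_two_copies), float(f4_with_xor))
```

Exact fractions are used so that the three factors come out as 18/5, 14/5, and 21/10 without floating-point noise.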

The data and index files are the same as in Haystack; the journal file is new. The journal file is a write-ahead journal with tombstones appended for tracking BLOBs that have been deleted. F4 keeps dedicated spare backoff nodes to help with online BLOB reconstruction. This is similar to the use of dedicated gutter nodes for tolerating memcached node failures in the Facebook memcache paper.
F4 has been running in production at Facebook for over 19 months. F4 currently stores over 65PB of logical data and saves over 53PB of storage.


1) Why go with a design that has a big binary divide between hot and warm storage? Would it be possible to use a system that handles hot and warm as gradual degrees in the spectrum? I guess the reason for this design is its simplicity. Maybe it is possible to optimize things by treating BLOBs differentially, but this design is simple and gets things done.

2) What are the major differences in F4 from the Haystack architecture? F4 uses erasure coding for replication: instead of maintaining 2 other replicas, erasure coding reduces replication overhead significantly. F4 uses write-ahead logging and is aggressively optimized for a read-only workload. F4 has lower throughput needs. (How is this reflected in its architecture?)

Caching is an orthogonal issue handled at another layer using memcache nodes. I wonder if the caching policies treat content cached from Haystack versus F4 differently.

3) Why is energy-efficiency of F4 not described at all? Can we use grouping tricks to get cold machines/clusters in F4 and improve energy-efficiency further as we discussed here?

4) BLOBs have large variation in size. Can this be utilized in F4 to improve access efficiency? (Maybe treat/store very small BLOBs differently, store them together, don't use erasure coding for them. How about very large BLOBs?)

Facebook monitoring tools (The Facebook Mystery machine)
The Facebook Stack (by Malte Schwarzkopf) 

Wednesday, October 15, 2014

Paper Summary: A self-configurable geo-replicated cloud storage system

This paper is a followup work to the Pileus work, which I had covered here. Pileus aimed to help developers find a suitable consistency/latency combination for their application deployment. In Pileus, the configuration of primary and secondary nodes is assumed to be fixed (some storage nodes are designated as primary nodes, which hold the master data, while others are secondary nodes). The developer uses an SLA to state ranked preferences for latency and consistency of the reads that would make most sense for the application, and using this SLA, Pileus provides dynamic tuning of the performance of the application by deciding which read to forward to which replica/master.

This paper introduces a followup system, called Tuba, where the configuration is not fixed, and can be changed on the fly. Tuba extends Pileus to address the problem of finding an optimal configuration of primary and secondary replicas that maximizes the overall utility and minimizes the cost for the application.

Tuba extends Pileus with a configuration service (CS) delivering the following capabilities:
1. performing a reconfiguration periodically for different tablets, and
2. informing clients of the current configuration for different tablets.

Data is organized into tablets as access units, and each tablet is assigned to primary and secondary nodes. Of course different tablets may be assigned to different primary and secondary nodes. In order for the CS to configure a tablet's replicas to maximize the overall utility, the CS must be aware of the way the tablet is being accessed globally. Therefore, all clients in the system periodically send their observed latency and the hit and miss ratios of their SLAs to the CS.

Once a new configuration is decided, one or more of the following operations are performed as the system changes to the new configuration: (i) changing the primary replica, (ii) adding or removing secondary replicas, and (iii) changing the synchronization periods between primary and secondary replicas.

Configuration service (CS)

The CS is a centralized service. To improve utility, the CS selects a new configuration by first generating all configurations that satisfy a list of defined constraints. To permute configurations, it is free to use one of the following operations:
+ Adjust the Synchronization Period
+ Add/Remove Secondary Replica
+ Change Primary Replica
+ Add Primary Replica

For example, for a missed subSLA with strong consistency, two potential new configurations would be: (i) creating a new replica near the client and making it the solo primary replica, or (ii) adding a new primary replica near the client and making the system run in multi-primary mode. As another example, for a social network application that spans Brazil and India, the CS may decide to swap the primary and secondary replica roles to improve utility. During peak times in India, the secondary replica in South Asia becomes the primary replica. Likewise, during peak times in Brazil, the replica in Brazil becomes primary.

The constraints for configurations may include (i) Geo-replication factor, (ii) Location, (iii) Synchronization period, and (iv) Cost constraints. With the cost constraint, the application developers can indicate how much they are willing to pay (in terms of dollars) to switch to a new configuration. For instance, one possible configuration is to put secondary replicas in all available datacenters. While the gained utility for this configuration would be great, the cost of this configuration is likely unacceptably large.

For each configuration possibility that meets the constraints, the CS computes the expected gained utility and the cost of reconfiguration. The CS considers the following costs for a new configuration:
+ Storage: the cost of storing a tablet in a particular site.
+ Read/Write Operation: the cost of performing read/write operations.
+ Synchronization: the cost of synchronizing a secondary replica with a primary one.

Finally, the CS chooses the configuration that offers the highest utility-to-cost ratio, and executes the reconfiguration operations required to transition to the new one.
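The selection step can be sketched in a few lines. This is only an illustration of "filter by constraints, then pick the best utility-to-cost ratio"; the Candidate type, its field names, and the single cost budget are my assumptions, not Tuba's actual data model.

```python
from dataclasses import dataclass


@dataclass
class Candidate:
    """Hypothetical candidate configuration produced by the CS."""
    name: str
    utility_gain: float  # expected gained utility
    cost: float          # reconfiguration cost in dollars


def choose_configuration(candidates, max_cost):
    """Return the feasible candidate with the highest utility-to-cost ratio."""
    # Keep only candidates within the developer's cost constraint.
    feasible = [c for c in candidates if 0 < c.cost <= max_cost]
    if not feasible:
        return None
    return max(feasible, key=lambda c: c.utility_gain / c.cost)


candidates = [
    Candidate("adjust sync period", utility_gain=0.10, cost=2.0),
    Candidate("swap primary", utility_gain=0.30, cost=10.0),
    Candidate("replicas everywhere", utility_gain=0.90, cost=100.0),
]
best = choose_configuration(candidates, max_cost=50.0)
print(best.name)
```

Note how the "replicas everywhere" option has the largest utility gain but is filtered out by the cost constraint, which matches the example in the text.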

Client execution modes

Clients need to avoid two potential safety violations: (i) performing a read operation with strong consistency on a non-primary replica, or (ii) executing a write operation on a non-primary replica. Based on the freshness of a client's view, the client is either in fast or slow mode. A client is in the fast mode for a given tablet if it knows the locations of primary and secondary replicas, and it is guaranteed that the configuration will not change in the near future. Whenever a client suspects that a configuration may have changed, it enters slow mode until it refreshes its local cache.

Fast Mode. When a client is in fast mode, read and single-primary write operations involve a single round-trip to one selected replica. No additional overhead is imposed on these operations. Multi-primary write operations use a three-phase protocol in fast or slow mode.

Slow Mode. Slow mode has no effect on read operations with relaxed consistency. On the other hand, since read operations with strong consistency must always go to a primary replica, the client needs to validate that the responding replica to a strong-consistency read is still a primary replica. If not, the client retries the read operation. Write operations are more involved when a client is in slow mode. Any client in slow mode that wishes to execute a write operation on a tablet needs to take a non-exclusive lock on the tablet's configuration before issuing the write operation. On the other hand, the CS needs to take an exclusive lock on the configuration if it decides to change the set of primary replicas. This lock procedure is required to ensure the linearizability of write operations.


They implemented Tuba to extend Microsoft Azure Storage, with broad consistency choices (as in Bayou), consistency-based SLAs (as in Pileus), and a novel replication configuration service. Their evaluation compares Tuba with a statically configured system. An experiment with clients distributed in datacenters around the world shows that reconfiguration every two hours increases the fraction of reads guaranteeing strong consistency from 33% to 54%. This confirms that automatic reconfiguration can yield substantial benefits which are realizable in practice.

SEA is South East Asia and  WEU is West Europe.

Wednesday, October 8, 2014

Consistent snapshot analogies

Last week I taught distributed snapshots in my CSE 586: Distributed Systems class. Whenever I teach snapshots, I invariably find myself longing for analogies to provide some intuition about this concept. The global state captured by a distributed snapshot (say using the Chandy-Lamport marker algorithm) does not correspond to the "state of the system at initiation of the snapshot". Furthermore, it also may not correspond to a "state of the system from initiation to current state during this computation". This is because while the snapshot taking is progressing in the system, the underlying system computation is also proceeding and changing the state of the system progressively. (Distributed snapshot is not allowed to stop/freeze underlying system computation as that reduces availability.)

For those curious about the question, "what good is a snapshot then?": The snapshot captures a state reachable from the initiation state, and from the snapshot state the current state of the computation is also reachable. In other words, the snapshot is a possible state of the computation, even though it may not have occurred in this particular computation. So, for stable predicate detection and distributed system debugging the snapshot is still valuable.

Going back to my predicament, the analogy I resort to is that of 1000 ants trying to take/construct a picture of the elephant as the elephant is moving. (I had heard this example from Paul Sivilotti while I was a graduate student at Ohio State.) Here the ants correspond to the marker algorithm, and the elephant to the underlying computation that we want to take a snapshot of. Of course the picture the ants construct will be only vaguely elephant-like: it will be a picture of the elephant's outer surface as it progresses in the spacetime continuum. (Achievement Unlocked: Today I used spacetime continuum in serious writing.)

Last week I was using this analogy in class, when a better (at least more modern) analogy occurred to me. Panoramic photographs! When you use your smartphone to take a panorama picture, you are in fact taking a distributed snapshot of your surroundings. Your snapshot is not instantaneous, it needs time to complete: you need to rotate 180 to 360 degrees and that probably takes a good 5-10 seconds. If in the meantime something moves, that object will not be reflected in its original form/place/state in your panorama picture.

We may attempt to take the analogy further by categorizing the panorama pictures as consistent snapshots and inconsistent snapshots. In an inconsistent snapshot, although the send of a message is not recorded as part of the snapshot, the receive of the message is recorded as part of the snapshot. (You received a message from the future.) So we can say that your panorama picture is inconsistent if the object moves in the opposite direction of the panorama/snapshot. These are examples of inconsistent snapshots.
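Leaving the photos aside for a moment, the "message from the future" condition is easy to state in code. This is a minimal sketch under my own representation: a cut records how many local events of each process are in the snapshot, and each message is a tuple of sender, send event index, receiver, and receive event index (indices start at 1).

```python
def is_consistent(cut, messages):
    """Return True if no receive is recorded without its matching send.

    cut: dict mapping process name -> number of local events in the snapshot.
    messages: list of (sender, send_index, receiver, receive_index) tuples.
    """
    for sender, send_index, receiver, receive_index in messages:
        receive_recorded = receive_index <= cut[receiver]
        send_recorded = send_index <= cut[sender]
        if receive_recorded and not send_recorded:
            return False  # a message from the future
    return True


# p sends a message as its 2nd event; q receives it as its 1st event.
messages = [("p", 2, "q", 1)]

print(is_consistent({"p": 1, "q": 1}, messages))  # receive without send
print(is_consistent({"p": 2, "q": 0}, messages))  # message still in flight
print(is_consistent({"p": 2, "q": 1}, messages))  # both recorded
```

A recorded send with an unrecorded receive is fine: the message is simply in the channel, which is exactly the state the marker algorithm captures for channels.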

And these are examples of consistent snapshots. (Maybe the last two are debatable as they duplicate some state.)

Finally, this seemingly-consistent inconsistent snapshot (the bearded guy on the leftmost is teleported to reappear as the rightmost person) points to the dangers of ignoring backchannels when taking a snapshot.

Probably it is not worth trying to strain the analogy further, so I will stop. Here are some more funny iPhone panoramic pictures.

Monday, September 29, 2014

Paper Summary: High-availability distributed logging with BookKeeper

This paper is brought to you by the Yahoo Research group that developed ZooKeeper, and it appeared in LADIS'12.

BookKeeper targets the logging problem. More specifically, the distributed logging problem where high-availability is important and where many distributed clients are interested in reading the logs.

Most current applications log to the local disk, but this constitutes a single point of failure (SPOF) and defeats high availability. A hasty remedy is to write to an NFS partition to store log files remotely. But now the NFS server becomes the SPOF. (We can of course replicate the NFS server, but the performance would suffer.) Another solution is to use NetApp filers that implement RAID. This costs money, and still does not completely eliminate the SPOF.

BookKeeper provides a no-SPOF, efficient data store for serving a large number of concurrent single-writer, multiple-reader logs. It stripes log entries across servers, leading to higher throughput. BookKeeper is open source and is used in production systems.

BookKeeper presents two case studies: Hedwig and HDFS Namenode. Hedwig is a scalable topic-based publish-subscribe system. To guarantee the delivery of messages despite partitions and server failures, Hedwig uses logging to persist published messages, which is implemented with BookKeeper. Hedwig is in production use and serves push notifications for Yahoo! properties (e.g., notifications for mobile devices).

The other use case concerns replicating the HDFS Namenode, the component of HDFS (Hadoop Distributed File System) that manages the file system metadata. On each update, the Namenode writes synchronously to a journal to guarantee that the update is durable. But unfortunately the Namenode is a SPOF. To enable efficient journaling and strong durability through replication, BookKeeper is used for implementing a journal manager for HDFS. The implementation is currently part of the HDFS codebase.

BookKeeper design and architecture

BookKeeper has 3 main components:

  • A bookie is a BookKeeper storage server, and each bookie stores ledger fragments. A ledger is written across f+1 bookies for fault-tolerance and striping. 
  • BookKeeper client is used for interacting with bookies. 
  • Ledger abstracts a log file. It is a sequence of entries identified by a sequence number (id). 

BookKeeper assumes that there is only a single client writing to a ledger (clients can employ ZooKeeper coordination for this), and in return it guarantees that, once a ledger is closed, all other clients that read from it read the same sequence of entries.

Here is the happy path for BookKeeper. An application using BookKeeper initially designates a ledger writer.  This ledger writer creates a ledger and appends data to the ledger; only the ledger writer is able to append entries to the ledger. Eventually, after appending an arbitrary number of entries to the ledger, the ledger writer closes it. Once the ledger is closed, its content is immutable. Clients can open closed ledgers for reading and any individual ledger can have multiple readers over time, and even concurrent readers.
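To make the happy path concrete, here is a toy in-memory ledger. It is not the BookKeeper client API; the class, its method names, and the use of plain exceptions are my own stand-ins, and there is no replication at all. It only shows the single-writer and immutable-after-close rules.

```python
class ToyLedger:
    """In-memory stand-in for a ledger: one writer, immutable once closed."""

    def __init__(self, writer_id):
        self.writer_id = writer_id
        self.entries = []
        self.closed = False

    def add_entry(self, client_id, data):
        if client_id != self.writer_id:
            raise PermissionError("only the ledger writer may append")
        if self.closed:
            raise RuntimeError("ledger is closed and immutable")
        self.entries.append(data)
        return len(self.entries) - 1  # the entry id

    def close(self, client_id):
        if client_id != self.writer_id:
            raise PermissionError("only the ledger writer may close")
        self.closed = True

    def read_entries(self, first_id, last_id):
        # Simplification: this toy only serves reads from closed ledgers.
        if not self.closed:
            raise RuntimeError("toy ledger must be closed before reading")
        return list(self.entries[first_id:last_id + 1])


ledger = ToyLedger(writer_id="writer-1")
ledger.add_entry("writer-1", b"first")
ledger.add_entry("writer-1", b"second")
ledger.close("writer-1")
print(ledger.read_entries(0, 1))
```

Any number of readers can call read_entries after the close, and they all see the same sequence, which is the guarantee the single-writer assumption buys.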

The main calls in the API enable applications to:

  • Create a ledger;
  • Add entries to a ledger;
  • Open a ledger for reading;
  • Read entries from a ledger;
  • Close a ledger to prevent further writes;
  • Delete a ledger.

All these calls have both a synchronous and an asynchronous version.

Creating and using a ledger.
When a client creates a ledger, it selects a set of bookies to form an ensemble for the ledger and stores the ensemble information as part of the ledger metadata on ZooKeeper. For each entry the ledger writer adds to the ledger, it replicates the entry across f+1 bookies. A request to add an entry e completes successfully if e has been successfully replicated across f+1 bookies. If a bookie crashes, then the client replaces that bookie. BookKeeper uses ZooKeeper to keep track of configuration changes for a ledger.

Closing a ledger.
When closing a ledger, the ledger writer writes to ZooKeeper the last entry that has been written successfully, as part of the ledger metadata. If a ledger writer crashes prematurely, before it closes its open ledger, a ledger reader would need to do ledger recovery.

Ledger recovery.
When a ledger reader opens a ledger for reading, it first obtains the ledger metadata. If the ledger state shows that the ledger has not been closed, the reader triggers a recovery procedure. In the first step of recovery, the reader client asks each bookie in the ensemble for the last add confirmed (LAC) field in the last entry that the bookie has processed for the ledger. Since reads are based on entry id, the recovery process can start reading from the highest LAC it receives, and thus it is not necessary to read the entire ledger.

Reading from an open ledger.
BookKeeper also enables clients to read from open ledgers. When clients need to read from an open ledger, they invoke a call to open the ledger that does not try to recover it if it is not closed. To avoid reading partially replicated entries from the ledger, which may not be in the ledger once it is closed, the client asks bookies for their LAC values. Reading entry i ≤ LAC is safe, since the ledger writer has marked it as successfully replicated.
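The LAC rule can be sketched with a couple of toy bookies. This assumes, as the recovery description above suggests, that each stored entry carries the LAC value known to the writer at the time it was sent; the ToyBookie class and safe_read_limit function are hypothetical names, not BookKeeper's API.

```python
class ToyBookie:
    """Stores entries for one ledger; each entry carries a piggybacked LAC."""

    def __init__(self):
        self.entries = {}  # entry_id -> (data, lac_at_write_time)

    def store(self, entry_id, data, lac):
        self.entries[entry_id] = (data, lac)

    def last_add_confirmed(self):
        # LAC field of the last entry this bookie has processed.
        if not self.entries:
            return -1
        return self.entries[max(self.entries)][1]


def safe_read_limit(bookies):
    """Highest entry id that is known to be successfully replicated."""
    return max(bookie.last_add_confirmed() for bookie in bookies)


a, b = ToyBookie(), ToyBookie()
for bookie in (a, b):
    bookie.store(0, b"e0", lac=-1)  # nothing confirmed yet
    bookie.store(1, b"e1", lac=0)   # entry 0 confirmed by now
a.store(2, b"e2", lac=1)            # entry 2 reached only one bookie so far

limit = safe_read_limit([a, b])
print(limit)
```

Here entry 2 exists on one bookie but is above the limit, so a reader of the open ledger would not return it: it may not survive into the closed ledger.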

Dealing with multiple ledgers

To enable recovery, upon each request to append an entry to a ledger, a bookie appends this entry to the journal and flushes the write to the local disk device. A bookie only acknowledges to the client once it receives a confirmation that the flush operation has completed successfully. Note that the journal is shared across all active ledgers the bookie is currently storing. A bookie also writes entries to a separate ledger device, which serves read requests. Thus, read traffic does not affect the performance of writes to the journal device.

The ledger device stores ledger entries along with an index for each ledger. A bookie has a single file, called the entry log, and interleaves entries of different ledgers by appending entries of all ledgers to it. For each ledger, the bookie also keeps in memory an index mapping the entry identifier to its position in the entry log.
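A small sketch may help picture the interleaved entry log and its per-ledger index. The ToyEntryLog class below is my own illustration: the "file" is a bytearray and the index is a nested dictionary, whereas a real bookie works with files on the ledger device.

```python
class ToyEntryLog:
    """One append-only log shared by all ledgers, plus a per-ledger index."""

    def __init__(self):
        self.log = bytearray()  # stands in for the single entry log file
        self.index = {}         # ledger_id -> {entry_id: (offset, length)}

    def append(self, ledger_id, entry_id, payload):
        offset = len(self.log)
        self.log += payload  # sequential write, whatever the ledger
        self.index.setdefault(ledger_id, {})[entry_id] = (offset, len(payload))

    def read(self, ledger_id, entry_id):
        offset, length = self.index[ledger_id][entry_id]
        return bytes(self.log[offset:offset + length])


entry_log = ToyEntryLog()
entry_log.append("ledger-A", 0, b"a0")
entry_log.append("ledger-B", 0, b"b0-longer")
entry_log.append("ledger-A", 1, b"a1")

print(entry_log.read("ledger-A", 1))
print(entry_log.index["ledger-A"])
```

Writes stay sequential no matter how many ledgers are active, and a read costs one index lookup plus one positioned read in the log.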

This design targets workloads dominated by writes, while not neglecting the performance of reads. Requests to add an entry to a ledger return as soon as the entry is flushed to the journal of a bookie, and writes to the ledger device are asynchronous and mostly sequential to enable the writes to this device to keep up with the writes to the journal device. To serve a read request, it is necessary to obtain the position of the entry in the entry log. If the index page is cached, then the read requires one disk seek.

BookKeeper stores metadata on ZooKeeper.
The ledger metadata includes the ensemble composition of ledgers, the write quorum size, the ledger status, and the last entry successfully written to a closed ledger. BookKeeper uses ZooKeeper as this metadata store, although the paper notes: "A different, more scalable data store becomes necessary when the number of active ledgers is of the order of tens to hundreds of millions." To track the availability of bookies, BookKeeper also relies upon ZooKeeper because it provides ephemeral znodes and watches.


Experiments are conducted using a cluster of identical machines: 2 quad-core Intel Xeon 2.5GHz processors, 16GB of RAM, one 1 Gbit/s network interface, and four 1TB SATA drives with a rotational speed of 7200 RPM. Each machine in the cluster mounts an enterprise class filer via NFS (NetApp FAS3050). This hardware gives a raw performance of 1.2 milliseconds for the latency of add operations and 22.5k adds/sec for 1 kbyte entries when writing to a single bookie. The notation nE-qQ denotes a ledger configuration with ensemble size n and write quorum q.

Using a 3E-2Q configuration, the figure shows throughput and latency for a single client as the maximum number of outstanding operations is increased. Allowing more outstanding operations leads to higher throughput, in particular for 128-byte entries. No batching tricks are employed to improve throughput; the processing of an operation is triggered by the call.

Here 12 clients write simultaneously to a set of bookies, and the aggregate throughput is measured. Compared to the results for a single client writer, the aggregate throughput is substantially higher for shorter entries. For longer entries, throughput is limited by the speed with which bookies are able to write to disk, so adding more bookies to the pool (configurations with 6E) results in increased throughput.


BookKeeper resembles chain replication a little. The chain replication approach is to export consensus to Paxos, and only store data providing high throughput. BookKeeper also does that, but chain replication is not referred to in the paper at all. Of course, chain replication lacks striping, and does not by default provide disjoint read replicas (in addition to write replicas) to improve read throughput.

The Tango paper mentions BookKeeper and states that it has an implementation of BookKeeper in 300 lines. How would you implement BookKeeper in Tango?  What can you speculate about the performance of BookKeeper versus TangoBookKeeper? Could you implement Tango using BookKeeper? How about transactions?

After reading the paper, I was left with this question. What happens if the bookie writes the entry to its journal and acknowledges it, but dies before asynchronously writing this entry to its ledger? Does this cause any problems?

Final remarks
The paper does not talk about consistency of logging, because every consistency concern is exported to ZooKeeper. I guess we can chalk this up as success points for ZooKeeper. BookKeeper's bottleneck for WAN deployment is ZooKeeper. If ZooKeeper is consulted infrequently things are OK. But if ZooKeeper is consulted frequently for LAC information in order to read from open ledgers, performance suffers.

Related links:
Flavio's blog post on BookKeeper
Flavio's presentation on BookKeeper

Sunday, September 28, 2014

Paper summary: Tango: Distributed Data Structures over a Shared Log

This paper is from Microsoft Research Silicon Valley (which unfortunately recently got closed), and it appeared in SOSP'13. SOSP'13 provides open access, so here is the pdf for free. The talk video is also on YouTube as part of this SOSP'13 talks playlist. I think this paper didn't get the attention it deserves. It is really a great piece of work.

To facilitate construction of highly available metadata services, Tango provides developers with the abstraction of a replicated in-memory data structure (such as a map or a tree) backed by a shared log.

While ZooKeeper provides developers a fixed data structure (the data tree) for building coordination primitives, Tango enables clients to build different data structures based on the same single shared log. Tango also provides transactions across data structures.

The state of a Tango object exists in two forms: 1) a history, which is an ordered sequence of updates stored durably in the shared log, and 2) any number of views, which are full or partial copies of the data structure --such as a tree or a map-- constructed from the log and stored in RAM on clients (i.e., application servers).

A client modifies a Tango object by appending a new update to the history; it accesses the object by first synchronizing its local view with the history. Views are soft state and are instantiated, reconstructed, and updated on clients by playing the shared history forward.

In Tango, the shared log provides: consistency, durability, history. Tango also provides atomicity and isolation for transactions across different objects by multiplexing & storing them on a single shared log.

Corfu shared log abstraction

Tango builds on the Corfu shared log abstraction, which employs flash disks to alleviate concerns about reads from the history of the log interfering with the writes going on at the tail of the log.

The Corfu interface consists of 4 calls:

  1. Clients can append entries to the shared log, obtaining an offset in return.
  2. They can check the current tail of the log.
  3. They can read the entry at a particular offset.
  4. Clients can trim a particular offset in the log for garbage collection.

Corfu organizes a cluster of storage nodes into multiple, disjoint replica sets; for example, a 12-node cluster might consist of 4 replica sets of size 3. Each individual storage node exposes a 64-bit write-once address space, mirrored across the replica set. The cluster also contains a dedicated sequencer node, which is essentially a networked counter storing the current tail of the shared log.

To append, a client contacts the sequencer and obtains the next free offset in the global address space of the shared log. It then maps this offset to a local offset on one of the replica sets using a simple deterministic mapping (e.g., modulo function) over the membership of the cluster. The client then completes the append by directly issuing writes to the storage nodes in the replica set using a client-driven variant of Chain Replication.
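Here is a toy, single-process version of the append path: ask the sequencer for an offset, map it to a replica set with a modulo function, and write through the replicas in a fixed order. Everything here (class names, the dict-backed storage nodes, reading from the last replica) is my simplification; there is no networking or failure handling, and trim is left out.

```python
class ToySequencer:
    """A counter handing out the next free offset in the shared log."""

    def __init__(self):
        self.tail = 0

    def next_offset(self):
        offset = self.tail
        self.tail += 1
        return offset


class ToyReplicaSet:
    """Write-once address space mirrored across a chain of nodes."""

    def __init__(self, replicas):
        self.nodes = [dict() for _ in range(replicas)]

    def write_once(self, local_offset, entry):
        for node in self.nodes:  # head of the chain first
            if local_offset in node:
                raise ValueError("offset already written")
            node[local_offset] = entry

    def read(self, local_offset):
        return self.nodes[-1][local_offset]  # last node in the chain


class ToyCorfuLog:
    def __init__(self, num_sets=4, replicas=3):
        self.sequencer = ToySequencer()
        self.sets = [ToyReplicaSet(replicas) for _ in range(num_sets)]

    def locate(self, offset):
        # Deterministic mapping: global offset -> (replica set, local offset).
        return offset % len(self.sets), offset // len(self.sets)

    def append(self, entry):
        offset = self.sequencer.next_offset()
        set_index, local_offset = self.locate(offset)
        self.sets[set_index].write_once(local_offset, entry)
        return offset

    def check(self):
        return self.sequencer.tail

    def read(self, offset):
        set_index, local_offset = self.locate(offset)
        return self.sets[set_index].read(local_offset)


log = ToyCorfuLog()
offsets = [log.append(f"entry-{i}") for i in range(5)]
print(offsets, log.locate(4), log.read(4), log.check())
```

Because consecutive offsets land on different replica sets, appends from many clients spread across the cluster instead of queueing on one set of disks.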

The sequencer is merely an optimization to find the tail of the log and not required for correctness. The Chain Replication variant used to write to the storage nodes guarantees that a single client will "win" if multiple clients attempt to write to the same offset. When the sequencer goes down, any client can easily recover this state using the slow check operation on the shared log.

The Tango architecture

There are 3 components to a Tango object:

  1. A Tango object contains the view, which is an in-memory representation of the object in some form, such as a list or a map. E.g., for TangoRegister this state is a single integer.
  2. Each object implements the mandatory apply upcall which changes the view when the Tango runtime calls it with new entries from the log. By customizing the apply implementation, one client can build a "tree view" while another builds a "set view" reading from the same log.
  3. Each object exposes an external interface of object-specific mutator and accessor methods; e.g., the TangoRegister exposes read/write methods.

The object's mutators do not directly change the in-memory state of the object. Instead, each mutator combines its parameters into an opaque buffer --an update record-- and calls the update helper function of the Tango runtime, which appends it to the shared log.

Similarly, the accessors do not immediately read the object's state. Each accessor first calls the query helper before returning an arbitrary function over the state of the object. The query helper plays new update records in the shared log until its current tail and applies them to the object via the apply upcall before returning.
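The interplay of mutators, accessors, and the apply upcall is easiest to see in a toy register. This is a sketch in the spirit of the TangoRegister example, not the paper's code: the ToySharedLog, ToyRuntime, and ToyRegister classes and their method names are my own, and the log is a Python list shared by two runtimes in one process.

```python
class ToySharedLog:
    def __init__(self):
        self.entries = []

    def append(self, record):
        self.entries.append(record)
        return len(self.entries) - 1

    def check(self):
        return len(self.entries)  # current tail

    def read(self, offset):
        return self.entries[offset]


class ToyRuntime:
    """Per-client runtime: appends updates and plays the log forward."""

    def __init__(self, log):
        self.log = log
        self.objects = {}     # oid -> locally hosted object
        self.next_offset = 0  # how far this client has played the log

    def register(self, oid, obj):
        self.objects[oid] = obj

    def update_helper(self, oid, payload):
        return self.log.append((oid, payload))

    def query_helper(self, oid):
        tail = self.log.check()
        while self.next_offset < tail:
            entry_oid, payload = self.log.read(self.next_offset)
            if entry_oid in self.objects:
                self.objects[entry_oid].apply(payload, self.next_offset)
            self.next_offset += 1


class ToyRegister:
    def __init__(self, runtime, oid):
        self.runtime, self.oid, self.value = runtime, oid, 0
        runtime.register(oid, self)

    def apply(self, payload, offset):  # upcall: the only place the view changes
        self.value = payload

    def write(self, value):            # mutator: append only, no local change
        self.runtime.update_helper(self.oid, value)

    def read(self):                    # accessor: sync with the log, then read
        self.runtime.query_helper(self.oid)
        return self.value


shared_log = ToySharedLog()
reg_a = ToyRegister(ToyRuntime(shared_log), oid=1)
reg_b = ToyRegister(ToyRuntime(shared_log), oid=1)
reg_a.write(7)
view_before_sync = reg_a.value
print(view_before_sync, reg_b.read(), reg_a.read())
```

The write on client A does not touch A's own view until A plays the log, and client B sees the new value simply by syncing with the same log.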

Storing multiple objects on a single shared log enables strongly consistent operations across them without requiring complex distributed protocols.  The Tango runtime on each client can multiplex the log across objects by storing and checking a unique object ID (OID) on each entry. Such a scheme has the drawback that every client has to play every entry in the shared log, but layered partitioning, as we shall discuss soon, solves this problem. It enables strongly consistent operations across objects without requiring each object to be hosted by each client, and without requiring each client to consume the entire shared log.


Tango implements optimistic concurrency control by appending speculative transaction commit records to the shared log.  Commit records ensure atomicity, since they determine a point in the persistent total ordering at which the changes that occur in a transaction can be made visible at all clients. To provide isolation, each commit record contains a read set: a list of objects read by the transaction along with their versions, where the version is simply the last offset in the shared log that modified the object. A transaction only succeeds if none of its reads are stale when the commit record is encountered (i.e., the objects have not changed since they were read).

To denote a transaction, calls to object accessors and mutators can be bracketed by BeginTX and EndTX calls. BeginTX creates a transaction context in thread-local storage. EndTX appends a commit record to the shared log, plays the log forward until the commit point, and then makes a commit/abort decision.

Each client that encounters the commit record decides --independently but deterministically-- whether it should commit or abort by comparing the versions in the readset with the current versions of the objects. If none of the read objects have changed since they were read, the transaction commits and the objects in the write set are updated with the apply upcall.
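The commit/abort rule can be sketched as a function that every client runs while playing the log. The record layout below (dictionaries with "type", "read_set", and "write_set" keys) is my own invention for illustration; only the rule itself, comparing read-set versions against the last offset that modified each object, comes from the description above.

```python
def play(log):
    """Play the log and return (final state, commit decisions by offset)."""
    state = {}     # oid -> current value
    versions = {}  # oid -> last log offset that modified the object
    decisions = {}
    for offset, record in enumerate(log):
        if record["type"] == "update":
            state[record["oid"]] = record["value"]
            versions[record["oid"]] = offset
        elif record["type"] == "commit":
            # Commit only if nothing in the read set changed since it was read.
            fresh = all(versions.get(oid) == seen
                        for oid, seen in record["read_set"].items())
            decisions[offset] = fresh
            if fresh:
                for oid, value in record["write_set"].items():
                    state[oid] = value
                    versions[oid] = offset
    return state, decisions


log = [
    {"type": "update", "oid": "x", "value": 1},
    {"type": "update", "oid": "y", "value": 5},
    # Read x at version 0, then write y: nothing changed x, so this commits.
    {"type": "commit", "read_set": {"x": 0}, "write_set": {"y": 6}},
    # Read y at version 1, but offset 2 has since modified y: this aborts.
    {"type": "commit", "read_set": {"y": 1}, "write_set": {"x": 9}},
]

state, decisions = play(log)
print(state, decisions)
```

Since the decision depends only on the log prefix, any two clients playing the same log reach the same verdict without talking to each other.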

For read-only transactions, the EndTX call does not insert a commit record into the shared log; instead, it just plays the log forward until its current tail before making the commit/abort decision. Tango also supports fast read-only transactions from stale snapshots by having EndTX make the commit/abort decision locally, without interacting with the log.

Write-only transactions require an append on the shared log but can commit immediately without playing the log forward.

Layered partitions

Each client hosts a (possibly overlapping) partition of the global state of the system, but this partitioning scheme is layered over a single shared log.  To efficiently implement layered partitions without requiring each client to play the entire shared log, Tango maps each object to a stream over the shared log.

A stream augments the conventional shared log interface (append and random read) with a streaming readnext call.  Many streams can co-exist on a single shared log; calling readnext on a stream returns the next entry belonging to that stream in the shared log, skipping over entries belonging to other streams. With this interface, clients can selectively consume the shared log by playing the streams of interest to them (i.e., the streams of objects hosted by them).

Each client plays the streams belonging to the objects in its layered partition. But, streams are not necessarily disjoint; a multiappend call allows a physical entry in the log to belong to multiple streams. When transactions cross object boundaries, Tango changes the behavior of its EndTX call to multiappend the commit record to all the streams involved in the write set. Multiappend ensures the following. A transaction that affects multiple objects occupies a single position in the global ordering; in other words, there is only one commit record per transaction in the raw shared log. A client hosting an object sees every transaction that impacts the object, even if it hosts no other objects.
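A toy version of streams makes the readnext and multiappend semantics concrete. The classes below are hypothetical, and readnext here finds the next entry by scanning the log linearly, which is exactly the cost the real design avoids; it is only meant to show what each reader observes.

```python
class ToyStreamLog:
    """Shared log where each entry is tagged with the streams it belongs to."""

    def __init__(self):
        self.entries = []  # list of (frozenset of stream ids, payload)

    def multiappend(self, stream_ids, payload):
        self.entries.append((frozenset(stream_ids), payload))
        return len(self.entries) - 1


class StreamReader:
    def __init__(self, log, stream_id):
        self.log = log
        self.stream_id = stream_id
        self.position = 0

    def readnext(self):
        """Return (offset, payload) of the next entry in this stream, or None."""
        while self.position < len(self.log.entries):
            offset = self.position
            streams, payload = self.log.entries[offset]
            self.position += 1
            if self.stream_id in streams:
                return offset, payload
        return None


log = ToyStreamLog()
log.multiappend(["A"], "update to A")
log.multiappend(["B"], "update to B")
log.multiappend(["A", "B"], "commit touching A and B")

reader_a = StreamReader(log, "A")
reader_b = StreamReader(log, "B")
seen_by_a = [reader_a.readnext(), reader_a.readnext(), reader_a.readnext()]
seen_by_b = [reader_b.readnext(), reader_b.readnext()]
print(seen_by_a)
print(seen_by_b)
```

The commit record occupies a single offset in the log, yet it shows up in both streams, so a client hosting only object B still sees it.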

Tango transactions have the following limitation, though. Remote reads at the generating client are disallowed in a transaction: a client cannot execute transactions and generate commit records involving remote reads. Calling an accessor on an object that does not have a local view is problematic, since the data does not exist locally; a possible solution, invoking an RPC to a different client with a view of the object, is expensive and complicated. So, if a client wants to do a transaction with reads on an object, the client should subscribe to the stream of that object.

Streaming Corfu

When the client-side library starts up, the application provides it with the list of stream IDs of interest to it. For each such stream, the library finds the last entry in the shared log belonging to that stream by asking the sequencer. The K backpointers in this entry allow it to construct a K-sized suffix of the linked list of offsets comprising the stream. It then issues a read to the offset pointed at by the Kth backpointer to obtain the previous K offsets in the linked list. In this manner, the library can construct the linked list by striding backward on the log, issuing N/K reads to build the list for a stream with N entries.
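The backward striding can be sketched in a few lines of Python. This is an illustration under assumptions, not Corfu's implementation: build_log and stream_offsets are hypothetical helpers, the log is an in-memory list, each entry stores up to K backpointers (newest first) to earlier entries of its own stream, and the last offset of the stream is passed in directly instead of being obtained from the sequencer.

```python
def build_log(stream_of_entry, k):
    """Build a toy log: entry i is (stream id, up to k backpointers, newest first)."""
    log = []
    recent = {}   # stream id -> most recent offsets of that stream, newest first
    for pos, sid in enumerate(stream_of_entry):
        prev = recent.get(sid, [])
        log.append((sid, list(prev[:k])))
        recent[sid] = [pos] + prev[:k - 1]
    return log


def stream_offsets(log, last_offset, k):
    """Recover all offsets of a stream by striding backward k entries per read."""
    offsets = [last_offset]
    cursor = last_offset
    reads = 0
    while True:
        _, back = log[cursor]   # one read of the shared log
        reads += 1
        offsets.extend(back)
        if len(back) < k:       # fewer than k pointers: start of the stream reached
            break
        cursor = back[-1]       # jump to the k-th (oldest) backpointer
    return sorted(offsets), reads
```

For example, with two interleaved streams over 20 log positions and K = 4, the stream occupying the even positions has N = 10 entries, and the sketch recovers all ten offsets with 3 reads, which is in line with the N/K estimate.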


The experimental testbed consists of 36 8-core machines in two racks, with gigabit NICs on each node and 20 Gbps between the top-of-rack switches.  In all the experiments, they run an 18-node Corfu deployment on these nodes in a 9-by-2 configuration (i.e., 9 sets of 2 replicas each), such that each entry is mirrored across racks. The other 18 nodes are used as clients. The Corfu sequencer runs on a powerful, 32-core machine in a separate rack. They use 4KB entries in the Corfu log, with a batch size of 4 at each client.
Figure shows single object serializability. Reads wait for the apply upcalls from the stream. If there are no writes, reads cost little. As more writes occur, reads take more time to catch up. Reads may well take more time than writes in Tango, but this is not shown in the graphs.
Figure shows performance for a primary/backup scenario where two nodes host views of the same object, with all writes directed to one node and all reads to the other. Overall throughput falls sharply as writes are introduced, and then stays constant at around 40K ops/sec as the workload mix changes; however, average read latency goes up as writes dominate, reflecting the extra work the read-only 'backup' node has to do to catch up with the primary.
Figure shows elasticity of linearizable read throughput with multiple views.

Figure shows transactions over layered partitions.

Tango vs. ZooKeeper.
Using Tango, the authors build ZooKeeper (TangoZK, 1K lines), BookKeeper (TangoBK, 300 lines), TreeSets and HashMaps (100 to 300 lines each). The performance of the resulting implementation is very similar to the TangoMap numbers in Figure 10; for example, with 18 clients running independent namespaces, they obtain around 200K txes/sec if transactions do not span namespaces, and nearly 20K txes/sec for transactions that atomically move a file from one namespace to another. The capability to move files across different instances does not exist in ZooKeeper, which supports a limited form of transaction within a single instance (i.e., a multi-op call that atomically executes a batch of operations).

They also implemented the single-writer ledger abstraction of BookKeeper in around 300 lines of Java code (again, not counting Exceptions and callback interfaces). To verify that their ZooKeeper and BookKeeper were full-fledged implementations, they ran the HDFS namenode over them (modifying it only to instantiate their classes instead of the originals) and successfully demonstrated recovery from a namenode reboot as well as fail-over to a backup namenode.


Tango fits within the State Machine Replication (SMR) paradigm, replicating state by imposing a total ordering over all updates. In the vocabulary of SMR, Tango clients can be seen as learners of the total ordering. The storage nodes comprising the shared log play the role of acceptors.

The finding in the Tango paper that a centralized server can be made to run at very high RPC rates matches recent observations by others. The Percolator system runs a centralized timestamp oracle with similar functionality at over 2M requests/sec with batching. Vasudevan et al. (SOCC'12) report achieving 1.6M submillisecond 4-byte reads/sec on a single server with batching. Masstree is a key-value server that provides 6M queries/sec with batching.

Tango's biggest contribution is that it provides multiple consistent object views from the same log. Objects with different in-memory data structures can share the same data on the log. For example, a namespace can be represented by different trees, one ordered on the filename and the other on a directory hierarchy, allowing applications to perform two types of queries efficiently (i.e., "list all files starting with the letter B" vs. "list all files in this directory"). Strongly consistent reads can be scaled simply by instantiating more views of the object on new clients. But is this free? Is this fast?
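Here is a small Python sketch of the two-views idea, assuming a toy log of ("create", path) updates. NameView and DirView are hypothetical classes for this illustration, not Tango objects; each one replays the same log into a different in-memory structure.

```python
import bisect
from collections import defaultdict


class NameView:
    """View ordered by file name, for prefix queries."""

    def __init__(self):
        self.names = []
        self.pos = 0   # how far into the log this view has played

    def sync(self, log):
        for op, path in log[self.pos:]:
            if op == "create":
                bisect.insort(self.names, path.rsplit("/", 1)[-1])
        self.pos = len(log)

    def starting_with(self, prefix):
        i = bisect.bisect_left(self.names, prefix)
        out = []
        while i < len(self.names) and self.names[i].startswith(prefix):
            out.append(self.names[i])
            i += 1
        return out


class DirView:
    """View organized by directory, for listing queries."""

    def __init__(self):
        self.children = defaultdict(list)
        self.pos = 0

    def sync(self, log):
        for op, path in log[self.pos:]:
            if op == "create":
                directory, name = path.rsplit("/", 1)
                self.children[directory].append(name)
        self.pos = len(log)

    def list_dir(self, directory):
        return sorted(self.children.get(directory, []))
```

Both views are driven by the same updates, so they stay consistent with each other as long as each one calls sync before answering a query. That sync call is also where the catch-up cost of a read shows up.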

Tango's soft underbelly is that it uses a pull-based approach to constructing the view from the shared log. Wouldn't a push-based approach be more timely? When a read comes, the pull-based approach may have a lot of catching up to do to reach the current state before it returns an answer. I guess it may be possible to simulate a push-based approach with periodic pulls, even when no accessor function is invoked.

Tango provides a weird combination of centralized and decentralized. The log is centralized and this is exploited to provide serialization of distributed transactions. On the other hand, not having a master node and using the clients as learners is a very decentralized approach. Instead of one master taking decisions and updating the data structure, all of the clients are playing the log and taking decisions (in a deterministic way, ensuring that they all make the same decisions), and updating their data structures. This resembles Lamport's extremely decentralized (to a fault!) implementation of mutual exclusion, which maintains replicated queues of all requests at all processes. (Of course, you can always code one client as master learner/decision-maker for other clients, and circumvent this!)

Tango vs. ZooKeeper.
Tango provides better, higher-level programming support than ZooKeeper. What the Tango paper calls Tango clients are servers that provide services for application-clients. (You may even say a Tango-client roughly corresponds to a "customized-view" ZooKeeper observer.) So, in terms of programmability and expressivity, Tango has the upper hand. I presume using ZooKeeper for large-scale applications may become intractable and may result in spaghetti code, since ZooKeeper provides only minimalistic, low-level primitives for coordination. Tango, on the other hand, lets the developer build higher-level abstractions of their own coordination services at the Tango-clients, and this helps in managing large projects while keeping complexity on a leash.

Comparing the efficiency of Tango and ZooKeeper, it seems like ZooKeeper would be better. In Tango, there are a couple of indirections that are not present in ZooKeeper. First, there is an extra step of contacting the sequencer node to get a ticket/offset number. The Tango replication can correspond to ZooKeeper/Zab replication, so they equal out there. But Tango has another layer of indirection, where the clients need to read and learn from the log. In ZooKeeper, since the leader is also the decision maker, the app-client's learning can be from relatively compact state, whereas in Tango, this will be through replaying a sequence of commands and constructing the state itself. Again, since the Tango-client is like the ZooKeeper observer, that is another level of indirection before going to the app-client in Tango. So in total, two extra levels are present in Tango (contacting the sequencer, and the Tango-client learning) that are not present in ZooKeeper. Tango provides better programmability and expressivity, but this comes with a trade-off in performance.

If your application is simple (and will remain simple), and can be implemented using ZooKeeper in a straightforward manner, it would be best to use ZooKeeper. Otherwise, by using Tango, you can have a better, more extensible and tractable code base, and potentially write some of your services as Tango-clients, which can even improve the performance.

Final remarks

Tango code is not open source. That is really unfortunate, as it could provide a good alternative to ZooKeeper for some applications that require coordination and transactions across distributed clients.

Since the sequencer is centralized, Tango is not suitable for WAN deployments.

Some questions still remain. The stream sharing assignments seem to be done statically using the layered stream abstraction API. Can we do this on demand and dynamically?

How is the layered stream abstraction implemented at CORFU level over the replica groups? Would it pay to dedicate one group for one popular stream? This would make bulk reading possible from that replica set. (Similar to the columnar storage idea.)

Friday, September 19, 2014

Revisiting the EWDs

Dijkstra was the original hipster. He was blogging before blogging was cool. "For over four decades, he mailed copies of his consecutively numbered technical notes, trip reports, insightful observations, and pungent commentaries, known collectively as EWDs, to several dozen recipients in academia and industry. Thanks to the ubiquity of the photocopier and the wide interest in Dijkstra’s writings, the informal circulation of many of the EWDs eventually reached into the thousands." And, thanks to the efforts of the University of Texas at Austin CS Department, all of these EWDs are conveniently accessible to the public.

I remember when I first discovered the EWDs as a fresh graduate student. I was mesmerized. I read them with a lot of joy. It was as if a new world had opened up for me to discover. He had many insightful observations. I recommend that all CSE graduate students read the EWDs to grow their minds.

Now, I don't agree with Dijkstra on everything. He was too much of a perfectionist, and believed in getting things right in one shot. He had this to say on this:
There are very different programming styles. I tend to see them as Mozart versus Beethoven. When Mozart started to write, the composition was finished. He wrote the manuscript and it was 'aus einem Guss' (from one cast). In beautiful handwriting, too. Beethoven was a doubter and a struggler who started writing before he finished the composition and then glued corrections onto the page. In one place he did this nine times. When they peeled them, the last version proved identical to the first one.

In contrast to Dijkstra's position, I believe in rapid prototyping and that perfection comes from iteration.

Of course I still adore all the EWDs and respect Dijkstra all the same. I mean, look at these gems in the Wikiquotes page for Dijkstra:

  • It is not the task of the University to offer what society asks for, but to give what society needs.
  • The required techniques of effective reasoning are pretty formal, but as long as programming is done by people that don't master them, the software crisis will remain with us and will be considered an incurable disease. And you know what incurable diseases do: they invite the quacks and charlatans in, who in this case take the form of Software Engineering gurus.
  • Elegance is not a dispensable luxury but a quality that decides between success and failure.
  • The problems of the real world are primarily those you are left with when you refuse to apply their effective solutions.

Some of his writings can be construed as starting a flamewar (Are "Systems people" really necessary?  :-). But he always had an important point to make. In some of his EWDs, he role-played as the "Chairman of the Board" of the fictitious Mathematics Inc., "a company that commercialized mathematical theorems the same way that software companies commercialized computer programs". He did this to show how ridiculous it is to patent a theorem, algorithm, or code.

And then there is this: "The cruelty of teaching computer science."

This is a 30-page, beautifully handwritten manifesto against the state of CS teaching then, which unfortunately got worse in the following years. The manifesto finishes with a bang!
Teaching to unsuspecting youngsters the effective use of formal methods is one of the joys of life because it is so extremely rewarding. Within a few months, they find their way in a new world with a justified degree of confidence that is radically novel for them; within a few months, their concept of intellectual culture has acquired a radically novel dimension. To my taste and style, that is what education is about. Universities should not be afraid of teaching radical novelties; on the contrary, it is their calling to welcome the opportunity to do so. Their willingness to do so is our main safeguard against dictatorships, be they of the proletariat, of the scientific establishment, or of the corporate elite.

And about Microsoft's closing of the MS Research at Silicon Valley:
