Friday, September 19, 2014

Revisiting the EWDs

Dijkstra was the original hipster. He was blogging before blogging was cool. "For over four decades, he mailed copies of his consecutively numbered technical notes, trip reports, insightful observations, and pungent commentaries, known collectively as EWDs, to several dozen recipients in academia and industry. Thanks to the ubiquity of the photocopier and the wide interest in Dijkstra’s writings, the informal circulation of many of the EWDs eventually reached into the thousands." And, thanks to the efforts of the University of Texas at Austin CS Department, all of these EWDs have been accessible to the public conveniently.

I remember when I first discovered the EWDs as a fresh graduate student. I was mesmerized. I read them with a lot of joy. It was as if a new world had opened to me to discover. He had many insightful observations. I recommend all CSE graduate students to read the EWDs to grow their minds.

Now, I don't agree with Dijkstra on everything. He was too much of a perfectionist, and believed in getting things right in one shot. He had this to say on this:
There are very different programming styles. I tend to see them as Mozart versus Beethoven. When Mozart started to write, the composition was finished. He wrote the manuscript and it was 'aus einem Guss' (from one cast). In beautiful handwriting, too. Beethoven was a doubter and a struggler who started writing before he finished the composition and then glued corrections onto the page. In one place he did this nine times. When they peeled them, the last version proved identical to the first one.

In contrast to Dijkstra's position, I believe in rapid prototyping and that perfection comes from iteration.

Of course I still adore all the EWDs and respect Dijkstra all the same. I mean, look at these gems in the Wikiquotes page for Dijkstra:

  • It is not the task of the University to offer what society asks for, but to give what society needs.
  • The required techniques of effective reasoning are pretty formal, but as long as programming is done by people that don't master them, the software crisis will remain with us and will be considered an incurable disease. And you know what incurable diseases do: they invite the quacks and charlatans in, who in this case take the form of Software Engineering gurus.
  • Elegance is not a dispensable luxury but a quality that decides between success and failure.
  • The problems of the real world are primarily those you are left with when you refuse to apply their effective solutions.

Some of his writings can be construed as starting a flamewar (Are "Systems people" really necessary?  :-). But he always had an important point to make. In some of his EWDs, he role-played as the "Chairman of the Board" of the fictitious Mathematics Inc., "a company that commercialized mathematical theorems the same way that software companies commercialized computer programs". He did this to show how ridiculous it is to patent a theorem, algorithm, or code.

And then there is this: "The cruelty of teaching computer science."

This is a 30 page handwritten (beautifully) manifesto against the state of CS teaching then, which unfortunately got worse in the following years. The manifesto finishes with a bang!
Teaching to unsuspecting youngsters the effective use of formal methods is one of the joys of life because it is so extremely rewarding. Within a few months, they find their way in a new world with a justified degree of confidence that is radically novel for them; within a few months, their concept of intellectual culture has acquired a radically novel dimension. To my taste and style, that is what education is about. Universities should not be afraid of teaching radical novelties; on the contrary, it is their calling to welcome the opportunity to do so. Their willingness to do so is our main safeguard against dictatorships, be they of the proletariat, of the scientific establishment, or of the corporate elite.

And about Microsoft's closing of the MS Research at Silicon Valley:

Paper summary: Can a decentralized metadata service layer benefit parallel filesystems?

Parallel filesystems do a good job of providing parallel and scalable access to the data transfer, but, due to consistency concerns, the metadata accesses are still directed to one metadata server (MDS) which becomes a bottleneck. This is a problem for scalability because studies show that over 75% of all filesystem calls require access to file metadata.

This paper proposes to adopt ZooKeeper as a decentralized MDS for parallel filesystems and test whether that improves performance. You can ask, what is decentralized about ZooKeeper, and you would be right about the update requests. But for read requests, ZooKeeper helps by allowing any ZooKeeper server to respond while guaranteeing consistency. (You would still need to do a sync operation if the request needs the read to be freshest and satisfy precedence order.)

If you recall, ZooKeeper uses a filesystem API to enable clients to build higher-level coordination primitives (group membership, locking, barrier sync). This paper is interesting because it takes ZooKeeper and uses it directly as a metadata server for a filesystem leveraging the filesystem API ZooKeeper exposes in a literal manner. FUSE is used to act as a glue between the ZooKeeper as MDS and the underlying physical storage filesystem.

Distributed Union FileSystem (DUFS) architecture

A DUFS client instance does not interact directly with other DUFS clients; Any necessary interaction is made through ZooKeeper service. The figure shows the basic steps required to perform an open() operation on a file using DUFS.

  • A. The open() call is intercepted by FUSE which gives the virtual path of the file to DUFS.
  • B. DUFS queries ZooKeeper to get the Znode based on the filename and to retrieve the file id (FID).
  • C. DUFS uses the deterministic mapping function to find the physical path associated to the FID.
  • D. Finally, DUFS opens the file based on its physical path. The result is returned to the application via FUSE.

Alternatively, directory operations take place only at the metadata level, so only ZooKeeper is involved and not the back-end storage. For example, the directory stat() operation is satisfied at the Zookeeper itself (the back-end storage is not contacted) since we maintain the entire directory hierarchy in Zookeeper.


These tests were performed on a Linux cluster. Each node has a dual Intel Xeon E5335 CPU (8 cores in total) and 6GB memory. A SATA 250GB hard drive is used as the storage device on each node. The nodes are connected with 1GigE.

I am bugged by some of the limitations of the evaluation. In these tests the ZooKeeper servers are colocated (running on the same node) as the DUFS client. This naturally achieves wonders for read request latencies! But this is not a very reasonable set up. Moreover, the clients are not under the control of DUFS, so it is not a good idea to deploy your ZooKeeper servers on clients which are uncontrolled and can disconnect any time. Finally, this disallows clients from faraway. Of course ZooKeeper does not scale to WAN environment, and all the tests are done in a controlled cluster environment.


This paper investigates an interesting idea, that of using ZooKeeper as MDS of parallel filesystems to provide some scalability to the MDS. Thanks to the advantages of ZooKeeper, this allows improved read access because those can be served consistently from any ZooKeeper server. And, due to limitations of the ZooKeeper, this fails to address the scalability of update requests (throughput of update operations actually decrease as the number of ZooKeeper replicase increase) and also lacks the scalability needed for WAN deployments. Another limitation of this approach is that the metadata need to be able to fit into a single ZooKeeper server (and of course also the ZooKeeper replicas), so there is a scalability problem with respect to the filesystem size as well.

We are working on a scalable WAN version of ZooKeeper, and we will use the parallel filesystems as our application to showcase a WAN filesystem leveraging our prototype coordination system.

Friday, September 12, 2014

Distributed system seminar talk: Data grouping framework for energy-efficiency in distributed storage systems

My research group and Tevfik's research group meet jointly for a weekly distributed systems. This gives our students a chance to give talks about current project and get feedback for improvement in a friendly setting.

In this week's seminar, Luigi presented his research on building energy-efficient file systems. I was initially skeptical about energy-efficiency as a research topic. Academicians like to work on things that they can quantify and improve, so I was thinking that energy-efficiency in distributed storage was an opportunistic research problem, rather than a real-world problem. Turns out, I couldn't be any more wrong: IT companies spend $10 billions every year on energy consumption (This is 3% of entire expenditure of US!). $3.5 billion of that $10 billion is energy expenditure is due to the storage systems.

Dynamic power management (DPM) is the primary mechanism for energy saving at the storage systems. DPM basically means turn the disk off if you're not using it. An idling disk spends energy because it is still rotating, and this mechanic motion which burns energy. But turning a disc off is not easy. It takes 10s of seconds to stop and start hard disk, and the energy usage spikes at these transition points. This makes the problem into an optimization problem. When is it beneficial to turn the disk off? How can you create gaps long enough to turn off the disk?

The literature discusses the following DPM-enabling techniques for energy-saving in storage systems. Most of these techniques prescribe data access locality improvements.

1) Memory and disk caching: Caching is not only good for providing low-latency but also in some cases good for saving energy. If we can use cache to answer instead of turning on the disk, we can give the disk more time to sleep. But what should be the cache size? If it is too small, data won't fit, this won't provide much/any saving. If it is too large, the cache itself may consume more energy than it saves.

2) Diverting accesses: Data is stored redundantly, so this gives us the opportunity to spin down some redundant disks by diverting the accesses to the already active/hot ones. Unsurprisingly, there is a tradeoff of increased latency in doing so. By limiting concurrency/parallelism you increase latency of replies. (Is energy-efficiency versus latency a fundamental tradeoff in distributed storage?) Maybe, by offering well-drafted SLA agreements to the clients, it is possible to give incentive to the client for trading energy efficiency for slightly increased latency.

3) Popular data clustering: This technique prescribes organizing the disk storage based on the previously observed access locality of data. So if a disk is hot, it is likely to stay hot, and if a disk gets cold, it is likely to stay cold and it can sleep.

I guess there also could be orthogonal techniques if you don't need to serve requests in real-time. For those cases you have the opportunity to batch-schedule accesses.

Luigi is working on a hybrid of these techniques to provide as much energy-efficiency as possible. I wouldn't have thought energy-efficiency for distributed storage could be this interesting. There might even be a couple distributed algorithms problem here that I would enjoy.

Paper summary: ZooKeeper: Wait-free coordination for Internet-scale systems

Zookeeper is an Apache project for providing coordination services to distributed systems. ZooKeeper aims to provide a simple kernel (a filesystem API!) for empowering the clients to build more complex coordination primitives. In this post I will provide a summary of the ZooKeeper paper, and talk about some future directions I can see this going.

"Client" denotes a user of the ZooKeeper service, "server" denotes a process providing the ZooKeeper service, and "znode" denotes an in-memory data node (similar to the filesystem inode) in the ZooKeeper. znodes are organized in a hierarchical namespace referred to as the data tree.
There are 2 types of znodes. "Regular": Clients manipulate regular znodes by creating and deleting them explicitly. "Ephemeral": Clients create ephemeral znodes, and they either delete them explicitly, or let the system delete them automatically when the client's session termination. Additionally, when creating a new znode, a client can set a "Sequential" flag. Nodes created with the sequential flag set have the value of a monotonically increasing counter appended to its name. If n is the new znode and p is the parent znode, then the sequence value of n is never smaller than the value in the name of any other sequential znode ever created under p.

ZooKeeper also implements "watches" on znodes to allow clients to receive timely notifications of changes without requiring polling.

The API ZooKeeper provides to the clients

create(path, data, flags)
delete(path, version)  // operation is conditional on version (if provided)
exists(path, watch)
getData(path, watch)
setData(path, data, version) // operation is conditional on version (if provided)
getChildren(path, watch)

All methods in the API have both a synchronous and an asynchronous version. A client uses the synchronous API when it needs to execute a single ZooKeeper operation and it has no concurrent tasks to execute, so it makes the necessary ZooKeeper call and blocks. The asynchronous API enables a client to have both multiple outstanding ZooKeeper operations and other tasks executed in parallel. ZooKeeper guarantees that the corresponding callbacks for each operation are invoked in order.

Using ZooKeeper to implement coordination primitives

Configuration Management: The configuration is stored in a znode, zc. Processes start up with the full pathname of zc. Starting processes obtain their configuration by reading zc with the watch flag set to true. If the configuration in zc is ever updated, the processes are notified and read the new configuration, again setting the watch flag to true.

Rendezvous: When the master starts it fills in zr with information about addresses and ports it is using. When workers start, they read zr with watch set to true. If zr has not been filled in yet, the worker waits to be notified when zr is updated.

Group Membership: A znode, zg, is created to represent the group. When a process member of the group starts, it creates an ephemeral child znode under zg. If the process fails or ends, the znode that represents it under zg is automatically removed. Processes may put process information in the data of the child znode, e.g., addresses and ports used by the process. Processes may obtain group information by simply listing the children of zg. If a process wants to monitor changes in group membership, the process can set the watch flag to true and refresh the group information (always setting the watch flag to true) when change notifications are received.

Simple locks: To acquire a lock, a client tries to create the designated znode with the EPHEMERAL flag. If the create succeeds, the client holds the lock. Otherwise, the client can read the znode with the watch flag set. A client releases the lock explicitly or it is removed by timeout if it dies. Other clients that are waiting for a lock try again to acquire a lock once they observe the znode being deleted.

Simple Locks without Herd Effect: All the clients requesting the lock are lined up and each client obtains the lock in order of request arrival.
To lock: 
1 n = create(l + “/lock-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for watch event 6 goto 2

To unlock:
1 delete(n)

Read/Write Locks: The lock procedure is changed slightly to include separate read lock and write lock procedures.
Write Lock
1 n = create(l + “/write-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for event 6 goto 2

Read Lock
1 n = create(l + “/read-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if no write znodes lower than n in C, exit
4 p = write znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 3

You can build even more powerful coordination primitives using ZooKeeper, and a Python binding is also made available here.

Zookeeper applications at Yahoo!: ZooKeeper is used for the Fetching Service (FS) to achieve recovering from failures of masters, guaranteeing availability despite failures, and decoupling the clients from the servers, and allowing them to direct their request to healthy servers by just reading their status from ZooKeeper. FS uses ZooKeeper mainly to manage configuration metadata. FS is read-heavy, 10:1 to 100:1. As another example, Yahoo! Message Broker (YMB), a distributed publish-subscribe system, uses ZooKeeper to manage the distribution of topics (configuration metadata), deal with failures of machines in the system (failure detection and group membership), and control system operation.

Other practical uses of Zookeeper has been explained nicely here.

ZooKeeper architecture/internals

The replicated database is an in-memory database containing the entire data tree. Each znode in the tree stores a maximum of 1MB of data by default. For recoverability, ZooKeeper efficiently logs updates to disk, and forces writes to be on the disk media before they are applied to the in-memory database.

Every ZooKeeper server services clients. Clients connect to exactly one server to submit its requests. Read requests are serviced from the local replica of each server database.

Requests that change the state of the service, write requests, are processed by an agreement protocol. As part of the agreement protocol write requests are forwarded to a single server, called the leader. The rest of the ZooKeeper servers, called followers, receive message proposals consisting of state changes from the leader and agree upon state changes. This is similar to how Paxos works.

ZooKeeper's atomic broadcast protocol (Zab) uses by default simple majority quorums to decide on a proposal, so Zab and thus ZooKeeper can only work if a majority of servers are correct (i.e., with 2f + 1 server we can tolerate f failures). Zab guarantees that changes broadcast by a leader are delivered in the order they were sent and all changes from previous leaders are delivered to an established leader before it broadcasts its own changes.

More specifically, Zab/ZooKeeper provides both of these two basic ordering guarantees:
Linearizable writes: all requests that update the state of ZooKeeper are serializable and respect precedence.
FIFO client order: all requests from a given client are executed in the order that they were sent by the client.

ZooKeeper vs Paxos

ZooKeeper provides FIFO client order property, but Paxos doesn't. Paxos may violate the FIFO client property as follows.

Proposer P1 executes Phase 1 for sequence numbers 27 and 28. It proposes values A and B for sequence numbers 27 and 28, respectively, in Phase 2 with ballot number 1. Both proposals are accepted only by acceptor A1. Proposer P2 executes Phase 1 against acceptors A2 and A3, and end up proposing C in Phase 2 to sequence number 27 with ballot number 2. Finally, proposer P3, executes Phase 1 and 2, and is able to have a quorum of acceptors choosing C for sequence number 27, B for sequence number 28, and D for 29.

ZooKeeper argues that such a run is not acceptable because the state change represented by B causally depends upon A, and not C. Consequently, B can only be chosen for sequence number i+1 if A has been chosen for sequence number i, and C cannot be chosen before B, since the state change that B represents cannot commute with C and can only be applied after A.

Client server interaction

When a server completes a write operation, it also sends out and clears notifications relative to any watch that corresponds to that update. Servers process the writes the leader server sends in order and do not process other writes or reads concurrently in order to ensure strict succession of notifications. Note that servers handle notifications locally. Only the server that a client is connected to tracks and triggers notifications for that client.

One drawback of using fast reads (local reads at one server) is not guaranteeing precedence order for read operations. That is, a read operation may return a stale value, even though a more recent update to the same znode has been committed. Not all applications require precedence order, but for applications that do require it, the sync primitive is used. To guarantee that a given read operation returns the latest updated value, a client calls sync before the read operation. Sync flushes the pipes so to speak. The FIFO order guarantee of client operations together with the global guarantee of sync enables the result of the read operation to reflect any changes that happened before the sync was issued.

Read requests are handled locally at each server. Each read request is tagged with a zxid that corresponds to the last transaction seen by the server. ZooKeeper servers process requests from clients in FIFO order; responses include the zxid that the response is relative to. Even heartbeat messages during intervals of no activity include the last zxid seen by the server that the client is connected to. This zxid defines the partial order of the read requests with respect to the write requests. If the client connects to a new server, that new server ensures that its view of the ZooKeeper data is at least as recent as the view of the client by checking the last zxid of the client against its last zxid. If the client has a more recent view than the server, the server does not reestablish the session with the client until the server has caught up.

To detect client session failures, ZooKeeper uses time-outs. To prevent the session from timing out, the ZooKeeper client library sends a heartbeat after the session has been idle for s/3 ms and switch to a new server if it has not heard from a server for 2s/3 ms, where s is the session timeout in milliseconds.


The evaluation is performed on a cluster of 50 servers. For the target workloads, 2:1 to 100:1 read to write ratio, it is shown that ZooKeeper can handle tens to hundreds of thousands of transactions per second. Each client has at least 100 requests outstanding. Each request consists of a read or write of 1K of data.

As you add ZooKeeper servers, the read throughput improves, bu the write throughput degrades. This is because atomic broadcast needs to be done via Zab. Also the servers need to ensure that transactions are logged to non-volatile store before sending acknowledgments back to the leader.


ZooKeeper provides a minimalist and flexible coordination system and found a lot of use in production distributed systems. Zookeeper scales well with increase in read operations, but does not with increase in write operations. Zookeeper also does not scale with more Zookeeper replicas added. To alleviate this observer replicas are used, but they are limited in operation, and do not allow/benefit write operations. Finally, due to very large latencies involved ZooKeeper cannot handle across the WAN deployment of ZooKeeper servers.

In most places ZooKeeper is punting the ball to the clients. Yes, this is due to minimalistic design and such, but this burdens the clients to solve the transactional update themselves, and we know that this is error-prone. Maybe this is really the way to go. Or maybe this is the soft-belly of ZooKeeper and a big opportunity to provide a new coordination tool.

ZooKeeper is a great start, but we are just at the beginning.

Sunday, August 31, 2014

Sudoku and research

I got addicted to Sudoku... again. When I have a chance to rest, I enjoy the challenge of solving Sudoku puzzles (*cough* at the expert level) on my iPhone. I think this practice gives my mind sharpness and clarity (my wife would roll her eyes so hard if she heard me say this :-).

I was recently thinking of how the process of solving a Sudoku puzzle resembles doing research.

1) Sequencing is important. In Sudoku, you take things step by step. You fill out the obvious cells first. Having filled these, you now have more constraints/clues upon which you can fill in other blocks. You have to solve Sudoku step by step from most obvious to what is made obvious having finished that step.

This is also the case in research. You can't rush things; you should start with the simple steps. First you have to attack/complete what you can do currently, so that more things can become available for you to see. You have to climb the stair step by step to see more and do more.

2) Writing is important. You take notes on the Sudoku cells to give you clues, e.g., 4 can go into this cell or this cell. These clues eventually lead to constraints and to solutions. Without taking notes, you wouldn't be able to make any progress on hard Sudoku puzzles.

You are all computer scientists.
You know what FINITE AUTOMATA can do.
You know what TURING MACHINES can do.
For example, Finite Automata can add but not multiply.
Turing Machines can compute any computable function.
Turing machines are incredibly more powerful than Finite Automata.
Yet the only difference between a FA and a TM is that
the TM, unlike the FA, has paper and pencil.
Think about it.
It tells you something about the power of writing.
Without writing, you are reduced to a finite automaton.
With writing you have the extraordinary power of a Turing machine.
(From Manuel Blum's advice to graduate students)

Similarly, writing is very important for research. It leads the way for you. You start writing as you start the research work, and before you do the work/experiments. I think I said this many times before, so I will leave this at that. (How I write, How to write your research paper, My advice to graduate students)

3) Perspective is important. In Sudoku, when you are stuck, you change your perspective and look for alternatives, because there is always another easier way to look at the situation and get unstuck.
A change in perspective is worth 80 IQ points.
Alan Kay.

(Again from Manuel Blum's advice to graduate students)
CLAUDE SHANNON once told me that as a kid, he remembered being stuck on a jigsaw puzzle.
His brother, who was passing by, said to him:
"You know: I could tell you something."
That's all his brother said.
Yet that was enough hint to help Claude solve the puzzle.
The great thing about this hint... is that you can always give it to yourself !!!
I advise you, when you're stuck on a hard problem,
to imagine a little birdie or an older version of yourself whispering
"... I could tell you something..." 
I once asked UMESH VAZIRANI how he was able,
as an undergraduate at MIT,
to take 6 courses each and every semester.
He said that he knew he didn't have the time to work out his answers the hard way.
He had to find a shortcut.
You see, Umesh understood that problems often have short clever solutions.

In research, ... yup, you need to learn to change your perspective, and try different point of views.

4) Finally perseverance is important. In Sudoku, you learn patience and perseverance, and you try different things until you make more progress. In research, patience and perseverance are also essential.
Whatever you do, you got to like doing it....
You got to like it so much that you're willing to think about it, work on it, long after everyone else has moved on.
(Again from Manuel Blum's advice to graduate students)

Heeding my own warning on reverse scooping, I googled for "Sudoku and research" and found this nice post, which has made similar connections.
After doing a few [Sudoku puzzles], it struck me that these puzzles are a good analogy for the way science research is done. Thomas Kuhn in his classic book The Structure of Scientific Revolutions points out that normal scientific research within a paradigm is largely a puzzle solving exercise in which there is an assurance that a solution exists to the problem and that it is only the ingenuity of the scientist that stands between her and a solution. The sudoku problem is like that. We know that a solution of a particular form exists and it is this belief that makes people persevere until they arrive at a solution.

Tuesday, August 12, 2014

Using TLA+ for teaching distributed systems

I am teaching CSE 4/586 Distributed Systems class again this Fall (Fall 2014). This is the course I have most fun teaching. (I would like to think my students also feel that way :-) I teach the course with emphasis on reasoning about the correctness of distributed algorithms. Here are the topics I cover in sequence:

  1. Introduction, Syntax and semantics for distributed programs, predicate calculus
  2. Safety and progress properties
  3. Proof of program properties
  4. Time: logical clocks, State: distributed snapshots
  5. Mutual exclusion, Dining philosophers
  6. Consensus, Paxos
  7. Fault-tolerance, replication, rollback recovery, self-stabilization
  8. Programming support for distributed systems
  9. Data center computing and cloud computing 
  10. CAP theorem and NOSQL systems
  11. Distributed/WAN storage systems

I put emphasis on reasoning about distributed algorithms because concurrency is very tricky; it truly humbles human brain. More than 3 actions in a distributed program and your intuitions will fail, you won't be able to hand-wave and apply operational reasoning on the program. You may think you could, but you would be very wrong (I know from first-hand experience).

I use invariant-based reasoning of program properties for the first 4 weeks exclusively. But this becomes less applicable when we move into more involved protocols in weeks 5 and beyond. This is where I give up being rigorous and make tell the class: "We could push things down the most rigorous invariant-based reasoning and predicate calculus level but we don't. Instead we give arguments in English, with the appreciation of how these arguments correspond to the proof rules in previous chapters." Yes, this is not very satisfying, but I didn't have much choice.


So for these reasons, the AWS TLA+ article got my attention recently. The article talked about how AWS successfully used invariant-based reasoning and formal methods (in particular TLA) for building robust distributed systems. TLA is a tool for specifying distributed algorithms/protocols and model checking them. AWS used TLA in many key projects: S3, DynamoDB, EBS, and a distributed lock manager. Here is the technical report by AWS. It is a very good read.

TLA+ is Leslie Lamport's brainchild. Of course you know Lamport if you are working on distributed systems. Lamport got a Turing award in 2013; he is famous for logical clocks, Paxos, and several other influential results in distributed systems. As a side-project, he wrote a wrapper around Knuth's TeX, called LaTeX ("La" for Lamport?), which is still the typesetting tool for almost all math/CSE academic papers. Lamport has always been a firm proponent of invariant-based reasoning for distributed algorithms and it seems like he has been dedicating most of his effort on prostelyzing TLA in recent years.

There are other successful model checkers (Spin, SMV, Promela), but TLA is more focused on supporting distributed algorithms reasoning. In addition, the PlusCal language (in the TLA+ toolkit) provides a high-level pseudo language to write distributed algorithms easily.

How I went about learning TLA

This was a straightforward and easy process. This is the main page for TLA, where the other pages can be reached. To download the toolkit, I first went to this page which forwards to this download page.

Then I downloaded the Hyperbook and started following it. The chapters were all straightforward for me, because this is very similar to the material I teach in my 486/586 class for invariant-based reasoning of distributed algorithms. The hyperbook has a lot of examples and is the best place to start learning TLA.

For the PlusCal language reference I downloaded this.

After I got the hang of it,  I decided to get my hands dirty with my own toy programs. I wrote TLA+ specifications for some simple coffee bean problems.  Then using PlusCal, I wrote specifications for Dijkstra's stabilizing token ring algorithm. First without using process abstraction, then with the process abstraction when I finished Chapter 7 in Hyperbook. Finally I wrote specifications for Dijkstra's 3-state and 4-state token ring algorithms, which progressed very smoothly. Next, I will use it on Paxos (here is a TLA+ specification of epaxos) and my own work.


The guarded-command language I use for teaching 4/586 translates very easily to PlusCal, so TLA+ is a good fit for my course. I will start using it in my 4/586 class this coming semester. I think the students will enjoy having hands-on experience with reasoning about non-toy distributed protocols.

Sunday, July 27, 2014

Hybrid Logical Clocks

Here I will write about our recent work on Hybrid Logical Clocks, which provides a feasible alternative to Google's TrueTime.

A brief history of time (in distributed systems)

Logical Clocks (LC) was proposed in 1978 by Lamport for ordering events in an asynchronous distributed system. LC has several drawbacks for modern systems. Firstly, LC is divorced from physical time (PT), as a result we cannot query events in relation to real-time. Secondly, to capture happened-before relations, LC assumes that there are no backchannels and all communication occurs within the system.

Physical Time (PT) leverages on physical clocks at nodes that are synchronized using the Network Time Protocol (NTP). PT also has several drawbacks. Firstly, in a geographically distributed system obtaining precise clock synchronization is very hard; there will unavoidably be uncertainty intervals. Secondly,  PT has several kinks such as leap seconds and non-monotonic updates. And, when the uncertainty intervals are overlapping, PT cannot order events and you end up with inconsistent snapshots as the one shown below.

TrueTime (TT) was introduced by Spanner, Google's globally-distributed multiversion database, to timestamp transactions at global scale. TT leverages on tightly-synchronized physical clocks, but TT also has drawbacks. As in PT, when the uncertainty intervals are overlapping TT cannot order events, and it has to explicitly wait-out these ε intervals. To alleviate the problems of large ε, TT employs GPS/atomic clocks to achieve tight-synchronization (ε=6ms), however the cost of adding the required support infrastructure can be prohibitive and ε=6ms is still a non-negligible time.

Hybrid Logical Clocks

In our recent work (in collaboration with Sandeep Kulkarni at Michigan State University), we introduce Hybrid Logical Clocks (HLC). HLC captures the causality relationship like LC, and enables easy identification of consistent snapshots in distributed systems. Dually, HLC can be used in lieu of PT clocks since it maintains its logical clock to be always close to the PT clock.

Formally, the HLC is problem is to assign each event e a timestamp, l.e, such that
1) e hb f =>  l.e l.f,
2) Space requirement for l.e is O(1) integers,
3) l.e is close to pt.e, that is, l.e - pt.e is bounded.

Next I will show you a naive algorithm for HLC which is unbounded. Then I will present our HLC algorithm and show that it is bounded.

Naive algorithm 

The naive algorithm is very simple and very similar to Lamport's LC algorithm. However, it cannot keep l-pt bounded: l can move ahead of pt in an unbounded manner for certain cases.
In this example run, we can see that l-pt diverges unboundedly if we continue the messaging loop among processes 1, 2, and 3.

HLC algorithm

Here is the improved algorithm. This algorithm bounds l-pt and c for any case, including the example run presented above. Below is that run annotated with HLC values.
Notice that in the HLC algorithm, l-pt is trivially bounded by ε. And, more importantly c gets reset regularly. This is because, either l is incremented via receiving a larger l from another node and c gets reset, or l remains the same and pt catches up to l to increase it and c gets reset.

Under the most basic/minimal constraint that physical clock of a node is incremented by at least one between any two events on that node, we prove c < N*(ε +1). Recall that the naive algorithm cannot be bounded with this minimal assumption as the counterexample showed.

Under a more lenient environment, if we can assume that the time for message transmission is long enough for the physical clock of every node to be incremented by at least d, we prove  c < ε/d+1. Note that under that assumption the naive algorithm also becomes bound-able.

Now let's see how we can get a consistent cut using HLC. The consistency of the cut is implied by ¬(∃ p,q :: l.snap.p < l.snap.q ), which is equivalent to (∀ p,q :: l.snap.p = l.snap.q). That is, to get a consistent cut, all we need to do is to take events with the same l and c value at all nodes. In the figure, we show a consistent cut for l=10 and c=0.

Fault tolerance

Stabilization of HLC rests on the superposition property of HLC on NTP clocks. Once the NTP/physical clock stabilizes, HLC can be corrected based on the maximum permitted value of l-pt and the maximum value of c. If bounds are violated, we take the physical clock as the authority, and reset l and c values to pt and 0 respectively.

In order to contain the spread of corruptions due to bad HLC values, we have a rule to ignore out of bounds messages. In order to make HLC resilient to common NTP synchronization errors, we assign sufficiently large space to l-pt drift so that most NTP kinks can be masked smoothly.

Concluding remarks

We can have a compact representation of HLC timestamps using l and c. NTP uses 64-bit timestamps which consist of a 32-bit part for seconds and a 32-bit part for fractional second. We restrict l to track only the most significant 48 bits of pt. Rounding up pt values to 48 bits l values still gives us microsecond granularity tracking of pt. 16 bits remain for c and allows it room to grow up to 65536. (In our experiments c mostly stayed in single digits.)

HLC provides the following benefits: HLC is substitutable for PT (NTP clocks) in any application. HLC is resilient and monotonic and can tolerate NTP kinks. HLC can be used to return a consistent snapshot at any given T. HLC is useful as a timestamping mechanism in multiversion distributed databases, such as Spanner. In fact, HLC is being used in CockroachDBan opensource clone of Spanner, and is implemented in this module in particular.

Read our paper for the details.

Friday, July 4, 2014

Distributed is not necessarily more scalable than centralized

Centralized is not necessarily unscalable! 

Many people automatically associate centralized with unscalable, and distributed with scalable. And, this is getting ridiculous.

In the Spring semester, in my seminar class, a PhD student was pitching me a project for distributed storage: syncing from phone to work/home computers and other phones. The pitch started with the sentence "Dropbox is unscalable, because it is centralized". I was flabbergasted, and I asked a couple of times "Really? Do you actually claim that Dropbox is unscalable?". The student persisted and kept repeating that "Dropbox has a bottleneck because it is a centralized storage solution, and the distributed solution doesn't have that bottleneck". I couldn't believe my ears.

Dropbox already proved it is scalable: It serves files for more than 200 million users, who store 1 billion files every 24 hours. That it has a centralized architecture hosted in the cloud doesn't make it unscalable. As far as I can see there is no bottleneck caused by Dropbox having a more centralized architecture.

(For those who want to nitpick, I know Dropbox is not fully centralized; it uses AWS S3 for storage and Dropbox-company servers for metadata management. Also, it employs data parallelism in the backend for scalability, but, on the spectrum, it is closer to a centralized architecture than a fully decentralized one.)

Distributed is not necessarily scalable!

Some people when faced with a problem think, I know, I'll use distributed computing. Now they have N^2 problems. -- @jamesiry
Here is the second part. Distributing a system does not necessarily make it scalable. In fact, a fully decentralized architecture can sometimes be a disadvantage for scaling.

Consider Lamport's mutual exclusion (ME) algorithm presented in his seminal "Time, Clocks, and the Ordering of Events in a Distributed System". This ME algorithm is fully decentralized, and requires O(N) messages to be exchanged in response to one ME request. The Lamport ME algorithm employs broadcasts to keep all the nodes informed of all updates and get them on the same (more or less) state.

Now consider a centralized algorithm for ME: there is a centralized coordinator; the nodes send their request to the coordinator, and the coordinator assigns ME accordingly. (For the literalist: You can still have causal ordering in the centralized algorithm. Just use VC when nodes communicate and include VC in the request messages.) The centralized ME algorithm is more scalable: only 1 message is exchanged in response to one ME request. It has less drama and it is easier to maintain and build over.

Single point of failure?

A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable. -- Leslie Lamport
A common reflex argument about centralized solutions is that it constitutes a single point of failure (SPOF). But if a distributed solution is not designed carefully, it will have multiple points of failures (MPOF). Which one would you rather have?

Let's reconsider the Lamport ME and the centralized ME algorithms. The distributed algorithm does not offer any fault-tolerance advantages. Both algorithms are prone to getting stuck with one crash failure.

In fact, we can argue that it is easier to design fault-tolerance to a centralized solution: You can employ Paxos to replicate the centralized server. In contrast, it is often much harder to design and add fault-tolerance to a distributed system. Since a distributed system is complex, it is more prone to introduce corner cases that jeopardize fault-tolerance.


Distributed is not necessarily more scalable than centralized;
And centralized is not necessarily a scalability bottleneck.

As a distributed systems professor, I wouldn't imagine myself defending centralized solutions. But there it is.

To avoid potential misunderstandings, I am not saying fully distributed/decentralized solutions are bad and to be avoided. There are advantages to decentralization, like latency reduction. And some conditions necessitate decentralization, like geographic/political/corporate isolation. We know in the real world it is a mix of centralized, up to where that is manageable and has reasonable cost, and some distributed architecture. This also depends very much on the application/task.

PS: Maybe we should do an XtraNormal animation movie about this "centralized unscalable and distributed scalable" mania. Any takers?

PS2: I thank @tedherman for improvements to the 1st draft.

PS3: Optimistic replication is a great survey of more decentralized replication protocols, their advantages, and challenges.

Bonus Section: Paxos is a relatively centralized approach to distributed consensus

Consensus is usually not an all-hands-on process. That can be hard to scale. Consider our democratic system: It is pretty centralized; we only elect leaders to rule for us.

In the same sense, you can think of Paxos as the more centralized approach to distributed consensus and state machine replication. In Paxos, the participants do not interact with all other participants to decide the order of requests to be accepted, instead the leader dictates the order of requests and the participants just accept them. (A fully decentralized consensus algorithm would be like the synchronous rounds consensus algorithm where in every round each participant communicates with all other participants so that they can converge on the same state.)

Monday, June 30, 2014


Managing your resources (energy, time, and students) is a nontechnical topic, but nevertheless it is essential for your success in academia. There isn't much talk or guidance in these topics at the graduate school. You are expected to attain these skills on your own or maybe acquire them by osmosis from professors and colleagues.

Here I will keep it short, and just post my summary slides of 3 great books I read on management.

The first one is the seven habits book. This book is about managing yourself as an effective person. I first read this book around 18 and found it long, tedious, and boring. Reading it again at 38, I think the book has great advice.
Link to my summary slides on the seven habits book.

Getting Things Done (GTD) is the best book on time and project management with low stress. This summer, do yourself a favor: Read the book, and adopt the GTD system ASAP. You will thank me later.
Link to my summary slides on the GTD book.

I am not aware of much work on student/postdoc management, although this is very important for project management. The one minute manager provides a minimalist method for managing people. There are also some advice on the web (this scrum method is worthy of note), but if you can suggest well-researched time-tested approaches for student management, I am eagerly waiting. This is an area where I should learn more about.

Friday, June 27, 2014

Targeted crowdsourcing using app/interest categories of users

Part of my research is on crowdsourcing. Basically, crowdsourcing means performing micro-collaborations with many people to complete a task. You divide the task into microtasks and outsource it to people. They provide solutions to your microtasks, and you aggregate those to obtain the solutions to the microtasks, and then ultimately to your task.

Aggregating the responses from the crowd is a challenge of itself. If the questions are asked as open ended questions, the answers would come in a variety of types, and you would not be able to aggregate them automatically with a computer. (You may use human intelligence again to aggregate them, but how are you going to aggregate/validate these next level aggregators?)

To simplify the aggregation process, we use multiple-choice question answering (MCQA). When the answers are provided in choices, a, b, c, or d, they become unambiguous and easier to aggregate with a computer. The simplest solution for aggregation of MCQA is the majority voting: whichever option was chosen most is provided as the ultimate answer.

Recently, we started investigating MCQA-based crowdsourcing in more depth. What are the dynamics of MCQA? Is majority voting good enough for all questions? If not, how can we do better?

To investigate these questions, we designed a gamified experiment. We developed an Android app to let the crowd answer questions with their smartphones as they watch the Who Wants To Be A Millionaire (WWTBAM) quiz show on a Turkish TV channel. When the show is on air in Turkey, our smartphone app signals the participants to pickup their phones. When a question is read by the show host, my PhD students would type the question and answers, which would be transmitted via Google Cloud Messaging (GCM) to the app users. App users play the game, and enjoy competing with other app users, and we get a chance to collect precious data about MCQA dynamics in crowdsourcing.

Our WWTBAM app has been downloaded and installed more than 300,000 times and has enabled us to collect large-scale real data about MCQA dynamics. Over the period of 9 months, we have collected over 3 GB of MCQA data. In our dataset, there are about 2000 live quiz-show questions and more than 200,000 answers to those questions from the participants.

When we analyzed the data we collected, we found that majority voting is not enough for all questions. Although majority voting does well in the simple questions (the first 5 questions) and achieves more than 90% accuracy rate, as the questions get harder, the accuracy of majority voting plummets quickly to 40%. (There are 12 questions in WWTBAM. The question difficulty increases with each question. Questions 10, 11, 12 are seldom reached by the quiz contestants.)

We then focused on how to improve the accuracy of aggregation. How can we weigh the options to give more weight to correct answers and let them win even when they are in the minority?

As expected, we found that the previous correct answers by a participant indicate higher likelihood of being correct in this answer. By collaborating with colleagues in data mining, we came up with a page-rank like solution for history-based aggregation. This solution was able to raise the accuracy of answers to 90% for even the harder questions.

We also observed some unexpected findings from the data collected by our app. Our app collected the response time of the participants, and we saw that the response time has some correlation to correct responses. But the relation is funny. For the easier questions (the first 5), earlier responses are more likely to be correct. But for the harder questions, delayed responses are more likely to be correct. We are still trying to see how we can put this observation into good use.

Another surprising result came recently. One of my PhD students, Yavuz Selim Yilmaz, proposed a simple approach, which at the end provided as effective as the sophisticated history-based solution. This approach did not even use the history of participants, and that makes it more applicable. Yavuz's approach was to /use the interests of participants to weigh their answers/.

In order to obtain the interests of the participants, Yavuz had a very nice idea. He proposed to use the category of the apps installed in the participants phone. Surprised, I asked him how he plans to learn the other apps installed in the participant phones. Turns out this is one of the basic permissions Android gives to an installed app (like our WWTBAM app): it can query and learn about the other installed apps in the users phone. (That it is this easy is telling about Android privacy and security. We didn't collect/maintain any identifying information on users, but this permission can potentially be used for bad.)

Yavuz assigned interest categories to participants using Google Play Store's predefined 32 categories for apps (e.g. Books and Reference, Business, Comics, Communication, Education, Entertainment, Finance). If a participant has more than 5 apps installed in one of these categories, the participant was marked as having interest in that category. We used half the data as training set and found which interest categories produce the highest accuracy for a given question number. Then in the testing set, the algorithm is simply to use majority voting among the category which is deemed most successful for a given question number. Is this too simplistic an approach?

Lo and behold, this approach lifted the accuracy to around 90% across all level of questions. (This paper got the outstanding paper award in the Collaboration Technologies and Systems (CTS 2014) Conference)

Ultimately we want to adopt the MCQA-crowdsourcing lessons we learned from WWTBAM in order to build crowdsourcing apps in location-based recommendation services.

Another application area of MCQA-crowdsourcing would be performing market research. A lot of people in the industry, consumer goods, music, and politics are interested in market research. But market research is difficult to get right, because you are trying to predict if a product can get traction by asking about it to a small subset of people which may not be very relevant, representative. The context and interests of the people surveyed are important in weighing out the responses. (I hope this blog post will be used in the future to kill some stupid patents proposed on this topic ;-)