Monday, March 30, 2015

Facebook's Mystery Machine: End-to-end Performance Analysis of Large-scale Internet Services

This paper appeared in OSDI'14, and is authored by Michael Chow, University of Michigan; David Meisner, Facebook, Inc.; Jason Flinn, University of Michigan; Daniel Peek, Facebook, Inc.; Thomas F. Wenisch, University of Michigan.

The goal of this paper is very similar to that of Google Dapper (you can read my summary of Google Dapper here). Both works try to find performance bottlenecks in high-fanout, large-scale Internet services. Both use similar methods; however, this work (the mystery machine) tries to accomplish the task with less instrumentation than Google Dapper. The novelty of the mystery machine is that it infers the component call graph implicitly by mining the logs, whereas Google Dapper meticulously instrumented each call and obtained the entire call graph explicitly.

The motivation for this approach is that comprehensive instrumentation as in Google Dapper requires standardization, and I am quoting the rest from the paper:
[Facebook systems] grow organically over time in a culture that favors innovation over standardization (e.g., "move fast and break things" is a well-known Facebook slogan). There is broad diversity in programming languages, communication middleware, execution environments, and scheduling mechanisms. Adding instrumentation retroactively to such an infrastructure is a Herculean task. Further, the end-to-end pipeline includes client software such as Web browsers, and adding detailed instrumentation to all such software is not feasible.

While the paper says it doesn't want to interfere with the instrumentation, of course it has to interfere to establish a minimum standard in the resulting collection of individual software component logs, which they call UberTrace. (Can you find a more Facebooky name than UberTrace? The paper spells it ÜberTrace, but I spare you here.)
UberTrace requires that log messages contain at least:
1. A unique request identifier.
2. The executing computer (e.g., the client or a particular server)
3. A timestamp that uses the local clock of the executing computer
4. An event name (e.g., "start of DOM rendering").
5. A task name, where a task is defined to be a distributed thread of control.
In order not to incur a lot of overhead, UberTrace uses a low sampling rate of all requests to Facebook. But this necessitates another requirement on the logging:
UberTrace must ensure that the individual logging systems choose the same set of requests to monitor; otherwise the probability of all logging systems independently choosing to monitor the same request would be vanishingly small, making it infeasible to build a detailed picture of end-to-end latency. Therefore, UberTrace propagates the decision about whether or not to monitor a request from the initial logging component that makes such a decision through all logging systems along the path of the request, ensuring that the request is completely logged. The decision to log a request is made when the request is received at the Facebook Web server; the decision is included as part of the per-request metadata that is read by all subsequent components. UberTrace uses a global identifier to collect the individual log messages, extracts the data items enumerated above, and stores each message as a record in a relational database.
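Here is a minimal sketch of how I picture the propagated sampling decision. The names (`RequestContext`, `LogRecord`, `SAMPLE_RATE`, `new_request`, `log_event`) and the rate are my own assumptions for illustration; they are not UberTrace's actual API.

```python
import random
from dataclasses import dataclass
from typing import List

SAMPLE_RATE = 0.001  # hypothetical value; the paper only says the rate is low


@dataclass(frozen=True)
class RequestContext:
    """Per-request metadata that travels with the request."""
    request_id: str
    sampled: bool  # decided once, when the web server receives the request


@dataclass(frozen=True)
class LogRecord:
    """The five fields every log message must carry."""
    request_id: str
    host: str
    timestamp: float  # local clock of the executing computer
    event: str
    task: str


def new_request(request_id: str, rng: random.Random) -> RequestContext:
    # The only place where a sampling decision is made.
    return RequestContext(request_id, rng.random() < SAMPLE_RATE)


def log_event(ctx: RequestContext, host: str, timestamp: float,
              event: str, task: str, sink: List[LogRecord]) -> None:
    # Downstream components never roll their own dice; they obey ctx.sampled,
    # so a monitored request is logged by every component on its path.
    if ctx.sampled:
        sink.append(LogRecord(ctx.request_id, host, timestamp, event, task))
```

The point of the design is that `sampled` is read-only downstream: if each component sampled independently, the chance that all of them pick the same request would be the product of their rates.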

The mystery machine

To infer the call graph from the logs, the mystery machine starts with a call graph hypothesis and refines it gradually as each log trace provides some counterexample. Figure 1 and Figure 2 explain how the mystery machine generates the model via large scale mining of UberTrace.
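To make the hypothesize-and-refute idea concrete, here is a small sketch that handles only one kind of relationship, "A always happens before B". It is my own simplification, not the paper's algorithm: each trace is assumed to be a mapping from a hypothetical event name to an already skew-corrected timestamp.

```python
from itertools import permutations
from typing import Dict, Iterable, Set, Tuple

Trace = Dict[str, float]  # event name -> timestamp within one request


def infer_happens_before(traces: Iterable[Trace]) -> Set[Tuple[str, str]]:
    """Return the (a, b) pairs never contradicted by any trace."""
    traces = list(traces)
    names: Set[str] = set()
    for trace in traces:
        names.update(trace)
    # Hypothesis: every event happens before every other event.
    hypothesis = set(permutations(sorted(names), 2))
    for trace in traces:
        for a, b in list(hypothesis):
            # A single counterexample is enough to drop the relationship.
            if a in trace and b in trace and trace[a] >= trace[b]:
                hypothesis.discard((a, b))
    return hypothesis
```

With two traces in which B and C appear in opposite orders, only the relationships "A before B" and "A before C" survive. Every extra trace can only remove edges, which is why the model needs many traces before it stops changing.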

For the analysis in the paper, they use traces of over 1.3 million requests to the Facebook home page gathered over 30 days. Was that sample large enough to be statistically meaningful? Figure 3 says yes.

We know that for large scale Internet services, a single request may invoke 100s of (micro)services, and that many services can lead to 80K-100K relationships as shown in Figure 3. But it was still surprising to see that it took 400K traces for the call graph to start to converge to its final form. That must be one heck of a convoluted spaghetti of services.


The mystery machine analysis is performed by running parallel Hadoop jobs.

Figure 5 is why critical path identification is important. Check the ratios on the right side.

How can we use this analysis to improve Facebook's performance?

As Figure 9 showed, some users/requests have "slack" (another technical term this paper introduced). For the users/requests with slack, the server time constitutes only a very small fraction of the critical path, which the network- and client-side latencies dominate.

There is also a category of users/requests with no slack. For those, the server time dominates the critical path, as the network- and client-side latencies are very low.

This suggests a potential performance improvement by offering differentiated service based on the predicted amount of slack available per connection:
By using predicted slack as a scheduling deadline, we can improve average response time in a manner similar to the earliest deadline first real-time scheduling algorithm. Connections with considerable slack can be given a lower priority without affecting end-to-end latency. However, connections with little slack should see an improvement in end-to-end latency because they are given scheduling priority. Therefore, average latency should improve. We have also shown that prior slack values are a good predictor of future slack [Figure 11]. When new connections are received, historical values can be retrieved and used in scheduling decisions. Since calculating slack is much less complex than servicing the actual Facebook request, it should be feasible to recalculate the slack for each user approximately once per month.
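A toy sketch of what slack-as-deadline scheduling could look like on the server side. The function name, the `predicted_slack_ms` table, and the choice to treat unknown connections as having zero slack are all my assumptions; the paper does not describe an implementation.

```python
import heapq
from typing import Dict, List


def schedule_by_slack(pending: List[str],
                      predicted_slack_ms: Dict[str, float],
                      default_slack_ms: float = 0.0) -> List[str]:
    """Order pending connections so the least slack is served first."""
    # The arrival index breaks ties, keeping the order stable for equal slack.
    heap = [(predicted_slack_ms.get(conn, default_slack_ms), i, conn)
            for i, conn in enumerate(pending)]
    heapq.heapify(heap)
    order = []
    while heap:
        _, _, conn = heapq.heappop(heap)
        order.append(conn)
    return order
```

For example, with historical slack of 500 ms for connection "a", 10 ms for "b", and no history for "c", the service order is c, b, a. The connection with 500 ms of slack waits, but its end-to-end latency should not suffer because the network and client were going to dominate anyway.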

Some limitations of the mystery machine 

This approach assumes that the call graph is acyclic. With their request-id-based logging, they cannot handle the same (event, task) pair appearing multiple times in the same request trace.

This approach requires normalizing/synchronizing local clock timestamps across computers. It seems like they are doing offline post-hoc clock synchronization by leveraging the RPC calls. (Does that mean further instrumentation of the RPC calls?)
Since all log timestamps are in relation to local clocks, UberTrace translates them to estimated global clock values by compensating for clock skew. UberTrace looks for the common RPC pattern of communication in which the thread of control in an individual task passes from one computer (called the client to simplify this explanation) to another, executes on the second computer (called the server), and returns to the client. UberTrace calculates the server execution time by subtracting the latest and earliest server timestamps (according to the server's local clock) nested within the client RPC. It then calculates the client-observed execution time by subtracting the client timestamps that immediately succeed and precede the RPC. The difference between the client and server intervals is the estimated network round-trip time (RTT) between the client and server. By assuming that request and response delays are symmetric, UberTrace calculates clock skew such that, after clock-skew adjustment, the first server timestamp in the pattern is exactly 1/2 RTT after the previous client timestamp for the task.
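The arithmetic in that description is easy to get backwards, so here is a worked sketch of it. The names (`RpcObservation`, `estimate_skew`, `to_client_clock`) are mine, and I use the client's clock as the reference clock purely for illustration.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class RpcObservation:
    client_before: float  # client timestamp immediately preceding the RPC
    client_after: float   # client timestamp immediately succeeding the RPC
    server_first: float   # earliest server timestamp nested in the RPC
    server_last: float    # latest server timestamp nested in the RPC


def estimate_skew(obs: RpcObservation) -> Tuple[float, float]:
    """Return (rtt, skew), where skew = server clock minus client clock."""
    server_time = obs.server_last - obs.server_first
    client_time = obs.client_after - obs.client_before
    rtt = client_time - server_time
    # Symmetric delays: the first server event is 1/2 RTT after client_before.
    skew = obs.server_first - (obs.client_before + rtt / 2)
    return rtt, skew


def to_client_clock(server_timestamp: float, skew: float) -> float:
    return server_timestamp - skew
```

For instance, with client timestamps 100 and 160 and server timestamps 1020 and 1060, the server time is 40, the client-observed time is 60, the RTT is 20, and the skew is 910. After adjustment the server events land at 110 and 150, which is 10 after the request left and 10 before the response arrived.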
This work also did not consider mobile users; 1.19 billion of 1.39 billion users are mobile users.

Related links

Facebook's software architecture

Scaling Memcache at Facebook

Finding a needle in Haystack: Facebook's photo storage

Google Dapper, a Large-Scale Distributed Systems Tracing Infrastructure

Wednesday, March 25, 2015

Paper review: "Simple Testing Can Prevent Most Critical Failures: An Analysis of Production Failures in Distributed Data-Intensive Systems"

This paper appeared in OSDI'14 and is written by Ding Yuan, Yu Luo, Xin Zhuang, Guilherme Renna Rodrigues, Xu Zhao, Yongle Zhang, Pranay U. Jain, and Michael Stumm, University of Toronto.

While the title mentions "analysis of production failures", the paper is not concerned with root cause analysis of failures. Instead, it is preoccupied with problems in error handling that lead to failures.

I enjoyed this paper a lot. The paper has interesting counterintuitive results. It opens to discussion the error handling issue, and particularly the Java exception handling. Digesting and interpreting the findings in the paper will require time. To contribute to this process, here is my take on the findings ---after a quick summary of the paper.

The setup and terminology 

The paper studied 198 randomly sampled user-reported failures of 5 open-source data-intensive distributed systems: Cassandra, HBase, Hadoop Distributed File System (HDFS), Hadoop MapReduce, and Redis. Table 1 shows how the sampling is done. (As a superficial observation, it seems like HBase is very buggy, as 50% of its sampled failures are catastrophic, and Cassandra is well engineered.) Almost all of these systems are written in Java, so Java's exception-style error handling plays a significant role in the findings of the paper.

Definitions: A fault is the initial root cause, which could be a hardware malfunction, a software bug, or a misconfiguration. A fault can produce abnormal behaviors referred to as errors, such as Java exceptions. Some of the errors will have no user-visible side-effects or may be appropriately handled by software; other errors manifest into a failure, where the system malfunction is noticed by end users or operators.

The paper defines catastrophic failure as "failures where most or all users experience an outage or data loss". Unfortunately this is a very vague definition. Table 2 provides some example categories for catastrophic failures considered.


Table 3 shows that failures triggered by a single input event are relatively rare. This is probably because these systems are well-tested with unit tests, as they are heavily used in production. On the other hand, the predominant case is where 2 events conspire to trigger the failure.

From Table 4, what jumps out is that starting up services is particularly problematic. Incidentally, in airplanes most fatal accidents occur during the climbing stage.

Table 5 shows that almost all (98%) of the failures are guaranteed to manifest on no more than 3 nodes, and 84% will manifest on no more than 2 nodes. For large-scale distributed systems, 3 nodes being sufficient to manifest almost all failures seems surprisingly low. Of course, this paper looks at data-intensive distributed systems, which may not be representative of general distributed systems. In any case, these numbers don't surprise me as they agree with my experience using TLA+ in verifying distributed algorithms.

Deterministic failures make up 74% of all failures, which is good news. Deterministic failures are low-hanging fruit; they are easier to fix.

Catastrophic failures

Figure 5 shows a break-down of all catastrophic failures by their error handling. Based on this figure, the paper claims that "almost all (92%) of the catastrophic system failures are the result of incorrect handling of non-fatal errors explicitly signaled in software".

But it seems to me that this provocative statement is due to a broad/vague definition of "incorrect error handling". If you use a broad/vague definition of "incorrect landing", you can say that every catastrophic airplane failure is an incorrect landing problem. Java casts every error into an exception, so every fault surfaces as an exception. But does that mean that if we do a good job on exception handling, there will be almost no catastrophic failures? That is an incorrect assumption. Sometimes the correction required needs to be invasive (such as resetting nodes), and the correction itself also counts as a catastrophic failure.

And how can we do a good job on error handling? The paper does not provide help on that. Correct error-handling is very hard: You need a lot of context and a holistic understanding of the system, and that is probably why error-handling has been done sloppily in the systems studied.

The paper also claims: "in 58% of the catastrophic failures, the underlying faults could easily have been detected through simple testing of error handling code." I can agree with the 35%, as they consist of trivial mistakes in exception handling, such as an empty error handling block. But for including the other 23% labeling them as easily detectable, I think we should exercise caution and not rule out the hindsight bias.

To prevent these 23% of failures, the paper suggests 100% statement coverage testing on the error handling logic. To this end, the paper suggests reverse engineering test cases that trigger the error handlers. At the end of the day, this boils down to thinking hard about how each handler can be triggered. But how do we know when to quit and when we have done enough? Without a rule, this can get cumbersome.

Figure 10 shows an example of the 23% easily detectable failures. That example still looks tricky to me, even after we exercise the hindsight advantage.

Speculation about the causes for poor error-handling

Maybe one reason for poor error-handling in the studied systems is that features are sexy, but fault-tolerance is not. You see features, but you don't see fault-tolerance. Particularly, if you do fault-tolerance right, nobody notices it.

But, that is a simplistic answer. After all, these systems are commonly used in production, and they are well-tested with the FindBugs tool, unit tests, and fault injection. Then, why do they still suck so badly at their exception-handling code? I speculate that maybe there is a scarcity of "expert generalists", developers that understand the project as a whole and that can write error-handling/fault-tolerance code.

Another reason of course could be that developers may think a particular exception will never arise. (Some comments in the exception handlers in the 5 systems studied hint at that.) That the developers cannot anticipate how a particular exception can be raised doesn't mean it won't be raised. But, maybe we should include this case within the above case that says there is a scarcity of fault-tolerance experts/generalists in these projects.

"Correct" error handling

While the paper pushes for 100% error handling coverage, it doesn't offer techniques for "correct" error handling. Correct error handling requires root cause analysis, a holistic view of the system, and fault-tolerance expertise.

So there is no silver bullet. Exceptions in Java, and error handling may facilitate some things, but there is no shortcut to fault-tolerance. Fault-tolerance will always have a price.

The price of reliability is the pursuit of the utmost simplicity.
It is a price which the very rich find most hard to pay.
C.A.R. Hoare

Possible future directions

Since 2-input triggered corruptions are predominant, maybe we should consider pairwise testing in addition to unit testing to guard against them. Unit testing is good for enforcing a component's local invariants. Pairwise testing can enforce an interaction invariant between two components, and guard against 2-input triggered corruptions.
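A rough sketch of what I mean by pairwise testing: enumerate ordered pairs of input events and check an interaction invariant after each pair. Everything here is hypothetical, including the event names and the `toy_scenario` stand-in for a real system under test.

```python
from itertools import permutations
from typing import Callable, Iterable, List, Tuple

Pair = Tuple[str, str]


def failing_pairs(events: Iterable[str],
                  run_scenario: Callable[[Pair], bool]) -> List[Pair]:
    """Apply every ordered pair of events; return those breaking the invariant."""
    # Ordered pairs, because "restart then write" may differ from
    # "write then restart".
    return [pair for pair in permutations(list(events), 2)
            if not run_scenario(pair)]


def toy_scenario(pair: Pair) -> bool:
    # Hypothetical bug: a write arriving right after a restart is lost.
    return pair != ("restart", "write")
```

With n event types this is only n(n-1) scenarios, which stays cheap compared to exploring longer event sequences, and the paper's Table 3 suggests it would already cover the predominant case.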

I think the main reason it is hard to write "correct error handling" code is that the exception handler doesn't have enough information to know the proper way to correct the state. This is still futuristic, but if we had an eidetic system, then when exceptions hit, we could call the corrector, which can do a root-cause analysis by doing a backwards query on the eidetic system, and after determining the problem could do a forward query to figure out what are precisely the things that need to be fixed/corrected as a result of that fault.

Thursday, March 12, 2015

My crazy proposal for achieving lightweight distributed consensus

Distributed consensus is a hard problem. See some of my previous posts for discussion about impossibility results on distributed consensus.
Paxos taught
Perspectives on the CAP theorem
Attacking Generals problem

The reason distributed consensus is hard is that the parties involved don't have access to the same knowledge (the same point of view) of the system state. Halpern's work on knowledge and common knowledge in distributed systems provides a useful framework to explain this. Here is a summary of the common knowledge paper by Halpern, and here is a nice talk on that paper.

Of course, we have distributed consensus solutions, Paxos, ZooKeeper/ZAB, that ensure safety always and provide progress whenever the system conditions step outside the impossibility results territory (with respect to synchrony, channel reliability/reachability, and number of up nodes). But these solutions come with a performance cost, as they need serialization of all update requests from a single Paxos leader, and acknowledgement of these updates by a majority quorum of the Paxos replicas. In ZooKeeper non-leader replicas can also serve reads locally, but the updates still need to get serialized by the leader. Some Paxos variants such as Mencius, ePaxos try to relieve this problem, but the bottom-line is they are still subject to the same fundamental performance limitations due to serialization at a single leader.

I have this crazy idea for circumventing the impossibility results of distributed consensus as well as improving the performance of distributed consensus. The idea is to connect the nodes involved in consensus (let's call these nodes coordinators) with a single-collision-domain Ethernet bus in order to solve the asymmetric knowledge problem.

No, this setup does not assume reliable communication. There can be collisions in the Ethernet bus. But the collisions will be total collisions since this is a shared Ethernet bus. When there is a collision, none of the coordinators would deliver the message, including the sender. Since Ethernet on a shared bus is CSMA-CD, the transmitter also detects the collision and does not accept its own message into the consensus log if that is the case.

So, in effect, the shared Ethernet bus performs the serialization of the coordinators' proposals. As a result, a coordinator proposing an operation does not need to collect explicit acknowledgement from any other coordinator let alone from a majority quorum of coordinators. This makes this consensus algorithm very lightweight and fast.
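Here is a toy simulation of the idea, under strong assumptions that I am making only for illustration: time is slotted, every collision is total, and no coordinator crashes. The function and variable names are mine.

```python
from typing import Dict, List


def run_bus(slots: List[Dict[str, str]],
            coordinators: List[str]) -> Dict[str, List[str]]:
    """Each slot maps the coordinators transmitting in it to their proposals."""
    logs: Dict[str, List[str]] = {c: [] for c in coordinators}
    for attempts in slots:
        if len(attempts) == 1:
            # Exactly one transmitter: everyone, sender included, delivers.
            (proposal,) = attempts.values()
            for log in logs.values():
                log.append(proposal)
        # Zero transmitters: idle slot. Two or more: total collision, so
        # nobody delivers and the senders must retry in a later slot.
    return logs
```

If c1 sends "x=1" alone, then c1 and c2 collide, then c2 retries "x=3" alone, every coordinator ends up with the same log: x=1 followed by x=3. No acknowledgements were exchanged; the bus itself did the ordering.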

This system is masking fault-tolerant to coordinator crashes as long as at least one coordinator remains. (If we are to allow reintegrating coordinators that recover from a crash, things get complicated of course. Then we would need to assume reasonable churn to allow time for recovering coordinators to catch up before they can be integrated. This would also require fast one-broadcast consensus on the reconfigurations.)

That is it. That simple. Now comes the algorithmician's apology.

On the theory side, I know I am not suggesting anything novel. The impossibility results still stand; I just changed the system conditions and stepped outside the territory of the impossibility results (that is why I used the term "circumvent"). In fact, I had noticed this idea first in the context of wireless broadcast and wireless sensor networks, when I was a post-doc at Nancy Lynch's group at MIT. We had published papers exploring the concept for wireless broadcast with total and partial collisions.

On the practical side, I know this proposal has downsides. This is not readily applicable as it requires the Ethernet driver to expose collision information to the application layer. This requires setting up an auxiliary Ethernet LAN across the coordinators. And, yes, I know this doesn't scale outside a LAN. (The coordinators should be connected by the single domain Ethernet bus, but the clients to the coordinators may communicate to the coordinators over TCP/IP and need not be in the same LAN. The coordinators can be located across different racks to guard against rack-wide crashes.)

But every system design is an exercise in determining which tradeoffs you make. The important question is: Is this tradeoff worth exploring? Would the performance improvement and simplicity stemming from this setup make this a reasonable/feasible option for solving the distributed consensus problem at the datacenter level?

Tuesday, March 10, 2015

Eidetic systems

This paper appeared in OSDI'14. The authors are all from University of Michigan: David Devecsery, Michael Chow, Xianzheng Dou, Jason Flinn, and Peter M. Chen.

This paper presents a transformative systems work, in that it introduces a practical eidetic system implementation on a Linux computer/workstation. This paper is a tour de force: It undertakes a huge implementation effort to implement a very useful and novel eidetic memory/system service. The authors should be commended for their audaciousness.

An eidetic computer system can recall any past state that existed on that computer, including all versions of all files, the memory and register state of processes, interprocess communication, and network input. An eidetic computer system can explain the lineage of each byte of current and past state. (This is related to the concept of data provenance, which I mention briefly at the end of my review.)


One use case for an eidetic system is to track where/how erroneous information entered the system. The paper considers tracking down a faulty bibtex reference as a case study. This is done using a backwards query. After tracking down the faulty bibtex reference you can then perform a forward query on the eidetic system, in order to figure out which documents are contaminated with this faulty information and to fix them.

Another use case for an eidetic system is to do a postmortem of a hack attack and determine whether it leaked any important information. In the evaluation section, the paper uses as another case study the heartbleed attack, which occurred during the time the authors were testing/evaluating their eidetic system implementation.

With a good GUI for querying, the eidetic system concept can enhance the Mac OSX Time Machine significantly, with data lineage/provenance, backward querying, and forward querying/correction. This can augment time travel with analytics, and you can have a time machine on steroids. (Accomplishment unlocked: +100 points for serious use of time machine and time travel in writing.)

Design and implementation

The authors develop the eidetic system, Arnold, by modifying the Linux kernel to record all nondeterministic data that enters a process: the order, return values, and memory addresses modified by a system call, the timing and values of received signals, and the results of querying the system time. Arnold and the accompanying eidetic system tools (for replay, etc.) are available as open source.
The key technologies that enable Arnold to provide the properties of an eidetic system efficiently are deterministic record and replay, model-based compression, deduplicated file recording, operating system tracking of information flow between processes, and retrospective binary analysis of information flow within processes.
Arnold uses deterministic record and replay, and trades storage for recomputation whenever possible. That is, Arnold only saves nondeterministic choices or new input and can reproduce everything else by recomputation. The major space saving technique Arnold uses is model based compression: Arnold constructs a model for predictable operations and records only instances in which the returned data differs from the model. Another optimization is copy on RAW (read-after-write) recording: "To deduplicate the read file data, Arnold saves a version of a file only on the first read after the file is written. Subsequent reads log only a reference to the saved version, along with the read offset and return code." These techniques enable Arnold to fit 4 years of desktop/workstation eidetic system into 4TB of off-the-shelf hard disk (which costs $150).
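To check my understanding of copy on RAW recording, here is a tiny sketch of the bookkeeping. The class and method names are hypothetical, and a real implementation lives in the kernel and deals with partial writes, offsets into versions, and so on.

```python
from typing import Dict, List, Tuple


class RawRecorder:
    """Save file contents only on the first read after a write."""

    def __init__(self) -> None:
        self.versions: List[bytes] = []  # saved file versions
        # Each log entry: (path, version index, read offset, return code)
        self.log: List[Tuple[str, int, int, int]] = []
        self._saved: Dict[str, int] = {}  # path -> version valid since last write

    def on_write(self, path: str) -> None:
        # A write invalidates the saved version; the next read must save again.
        self._saved.pop(path, None)

    def on_read(self, path: str, contents: bytes,
                offset: int, return_code: int) -> None:
        if path not in self._saved:
            self.versions.append(contents)
            self._saved[path] = len(self.versions) - 1
        # Subsequent reads log only a reference plus offset and return code.
        self.log.append((path, self._saved[path], offset, return_code))
```

Two reads, a write, and another read of the same file produce three log entries but only two saved versions, which is where the deduplication savings come from for read-heavy files.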

Querying and Replaying

Arnold uses the replay groups abstraction to perform storing and replaying efficiently. Replay groups consist of frequently communicating processes which can be replayed independently of any other group. Arnold employs "Pin" binary instrumentation to analyze replayed executions and track the lineage of data within a replay group. Interprocess communication is tracked with the help of a dependency graph which keeps track of the communications between different replay groups. Bundling frequently communicating processes into a group ensures that a large number of conversations need not be recorded to the dependency graph. As such, the selection of replay groups (and replay group size) gives rise to a tradeoff between storage efficiency and query efficiency. It would be nice if the paper provided the replay groups it used in Arnold as a table. This information would be useful to understand the replay groups concept better.

Arnold records even user propagated lineage, such as a user reading a webpage and entering text into an editor as a result. (Of course this leads to introducing some false positives, as it needs to be done speculatively.) Tracking this actually required a lot of work: "Understanding GUI output turned out to be tricky, however, because most programs we looked at did not send text to the X server, but instead sent binary glyphs generated by translating the output characters into a particular font. Arnold identifies these glyphs as they are passed to standard X and graphical library functions. It traces the lineage backward from these glyphs using one of the above linkages (e.g., the index linkage)."

Finally, for the querying of Arnold, the paper has this to say. "A backward query proceeds in a tree-like search, fanning out from one or more target states. The search continues until it is stopped by the user or all state has been traced back to external system inputs. As the search fans out, Arnold replays multiple replay groups in parallel. In addition, if no lineage is specified, it may test multiple linkages for the same group in parallel, terminating less restrictive searches if a more restrictive search finds a linkage."

Unfortunately, user-friendly GUI-based tools for querying are not available yet. That would be asking too much from this paper, which already packed a lot of contributions into a single publication. The evaluation section gives some results about backward and forward querying performance in Arnold.

Related work on data provenance

Data provenance is a topic which has traditionally been studied as part of the database field. However, recent work on data provenance started considering the problem of capturing provenance for applications performing arbitrary computations (not restricted to a small set of valid transformations in database systems). The paper "A primer on provenance" provides a nice accessible survey of data provenance work.

Future work

This paper presents an eidetic system on a single computer. An obvious future direction is to enable building an eidetic distributed system. By leveraging Arnold, such a system also seems to be within reach now. Our work on hybrid logical clocks can also help here by relating and efficiently tracking causality across distributed nodes running Arnold. Since our hybrid logical clocks can work with loosely synchronized time (a la NTP), and are resilient to uncertainty (they enable efficient tracking of causality without blocking for synchronization uncertainties), they can be adopted for implementing a distributed eidetic system in practice.

A remaining kink for a distributed eidetic system could be the cost of querying. Querying and replay is already slow and hard for a single eidetic system, and it is likely to become more complicated for a distributed system since coordination of replay is needed across the machines involved in the replay.

Thursday, March 5, 2015

Extracting more concurrency from Distributed Transactions

This paper appeared in OSDI'14. The authors are: Shuai Mu, Yang Cui, Yang Zhang, Wyatt Lloyd, Jinyang Li. The paper, presentation slides and video are accessible here.

The paper proposes a concurrency control protocol for distributed transactions, and evaluates its performance comparing with two-phase locking (2PL) and optimistic concurrency control (OCC).

The protocol

The protocol introduced, ROCOCO (ReOrdering COnflicts for COncurrency), is targeted at extracting more concurrency under heavily contended workloads than 2PL and OCC can handle. In fact, ROCOCO's improvements over OCC and 2PL come after the peak throughput point of even ROCOCO. One of the questions after the OSDI presentation was about this. "That region where the system is thrashing is not a region you want to be. Why would you not employ admission control to refuse extra workload that pushes the system past peak performance?"
ROCOCO assumes that a distributed transaction consists of a set of stored procedures called pieces. Each piece accesses one or more data items stored on a single server using user-defined logic. Thus, each piece can be executed atomically with respect to other concurrent pieces by employing proper local concurrency control.

ROCOCO achieves safe interleavings without aborting or blocking transactions using two key techniques: 1) deferred and reordered execution using dependency tracking; and 2) offline safety checking based on the theory of transaction chopping.

ROCOCO's transaction reordering idea is adopted from the ePaxos protocol introduced a couple years ago. This is a neat idea. The first phase is sort of like a dry run for the transaction. Dependencies with other concurrently executing transactions are learned. In the second phase, the dependent transactions are synchronized. They are forced to wait for each other and executed that way.

This approach is basically pipelining the transactions. This is also similar to what Calvin does with its log-based approach. Pipelining helps for throughput of course, but it also introduces a drawback.

Unlike 2PL and OCC, which execute a depended-upon transaction to completion before allowing its dependent/conflicting transactions to execute, ROCOCO decides on an order and pipelines the execution of these conflicting transactions in that order. However, if the first of these pipelined transactions does not complete (due to a fault or some other reason) and needs to be rolled back, the remaining transactions in the pipeline that depended on it must be rolled back as well. This is a problem 2PL and OCC did not have.

This basic reordering protocol applies when some of the transaction pieces/fragments are deferrable. For transaction fragments that are immediate (whose outputs are inputs to other pieces in a transaction), the reordering protocol is inapplicable, and the paper uses an offline checker to avoid conflicts in such situations. The offline checker works in the following steps:
1. It constructs an S-C graph based on transaction chopping. Each edge in the graph is either a Sibling edge (an edge formed between pieces of the same transaction instance) or a Conflict edge (an edge formed between pieces which access the same database table, where at least one of the pieces issues a write).
2. Each vertex in the graph is tagged as either immediate (I) or deferrable (D). A conflict edge can be an I-I edge or a D-D edge.
3. The checker observes all the S-C cycles formed by the graph. SC-cycles represent potential non-serializable interleavings. However, if an SC-cycle contains at least one D-D edge, ROCOCO can reorder the execution of the D-D edge's pieces to break the cycle and ensure serializability. For an unreorderable SC-cycle with all I-I C-edges, the checker proposes to merge those pieces in the cycle belonging to the same transaction into a larger atomic piece. ROCOCO relies on traditional distributed concurrency control methods such as 2PL or OCC to execute merged pieces.
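The steps above can be sketched as a brute-force check on a small graph. This is my own illustration and not the paper's checker: it treats an SC-cycle as a simple cycle with at least one S-edge and at least one C-edge, it enumerates cycles exhaustively (fine for toy graphs only), and all names are hypothetical.

```python
from typing import Dict, FrozenSet, List, Set, Tuple

Edge = Tuple[str, str, str]  # (piece_u, piece_v, "S" or "C")


def unreorderable_sc_cycles(edges: List[Edge],
                            tag: Dict[str, str]) -> List[List[str]]:
    """Return SC-cycles that contain no D-D conflict edge."""
    adj: Dict[str, Dict[str, str]] = {}
    for u, v, kind in edges:
        adj.setdefault(u, {})[v] = kind
        adj.setdefault(v, {})[u] = kind
    order = {piece: i for i, piece in enumerate(sorted(adj))}
    seen: Set[FrozenSet[FrozenSet[str]]] = set()
    bad: List[List[str]] = []

    def record(cycle: List[str]) -> None:
        pairs = list(zip(cycle, cycle[1:] + cycle[:1]))
        key = frozenset(frozenset(p) for p in pairs)
        if key in seen:  # the same cycle walked in the other direction
            return
        seen.add(key)
        kinds = [adj[a][b] for a, b in pairs]
        has_dd = any(adj[a][b] == "C" and tag[a] == "D" and tag[b] == "D"
                     for a, b in pairs)
        if "S" in kinds and "C" in kinds and not has_dd:
            bad.append(cycle)

    def dfs(start: str, node: str, path: List[str]) -> None:
        for nxt in adj[node]:
            if nxt == start:
                if len(path) >= 3:
                    record(list(path))
            elif nxt not in path and order[nxt] > order[start]:
                path.append(nxt)
                dfs(start, nxt, path)
                path.pop()

    for start in sorted(adj):
        dfs(start, start, [start])
    return bad
```

Take two transactions with pieces p1, p2 and q1, q2, sibling edges p1-p2 and q1-q2, and conflict edges p1-q1 and p2-q2. If all four pieces are immediate, the function reports the cycle, and those pieces would have to be merged. If p2 and q2 are deferrable, the p2-q2 edge is D-D and nothing is reported.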


ROCOCO is implemented as an in-memory key-value store with 20K lines of C++ code and evaluated using a scaled TPC-C benchmark in comparison to OCC and 2PL. Given that Calvin is a closely related work (because it also orders transactions and pipelines their execution), it would be good to see a comparison to Calvin, but the paper does not include that.

Some miscellaneous thoughts

In the introduction, the following paragraph is provided as a motivation for ROCOCO.
Unfortunately, contention is not rare in large-scale OLTP applications. For example, consider a transaction where a customer purchases a few items from a shopping website. Concurrent purchases by different customers on the same item create conflicts. Moreover, as the system scales—i.e., the site becomes more popular and has more customers, but maintains a relatively stable set of items—concurrent purchases to the same item are more likely to happen, leading to a greater contention rate.
The OSDI presentation also includes the same example as motivation. I don't think this is the right/best way to argue that contention will happen, because this is a faux contention, and not an inherent contention. When the stock of an item for sale is very high (which is almost always the case), why do we care to carefully check the number of items remaining? The Conflict-Free Replicated Data Types (CRDT) approach helps here to avoid conflicts easily. (For some nice papers on this see: CRDT1, CRDT2, CRDT3) The coordination avoidance in distributed databases paper also argues for avoiding coordination when all local commit decisions are globally valid (in other words, when the commit decisions are composable).

Sunday, March 1, 2015

My trick for coordinating Dropbox collaborations

There are high-tech solutions to managing the collaborative paper writing process, such as using version control systems like CVS or git. The downside to these systems is that when you are collaborating on a paper with a low-tech author, it is cumbersome to get them on board with these tools. Some of my collaborators have been from a non-CS background. And even some CS-background collaborators refuse to deal with cryptic version control commands and error messages.

Dropbox provides a simple, easy-to-use solution for sharing files, which helps teams collaborate on projects, including collaborating on a paper. However, Dropbox does not have access control and cannot coordinate concurrent accesses to the same file by multiple writers. This causes overwritten/lost updates and frustrated collaborators.

I have a low-tech solution to this problem. I have used this solution successfully with multiple collaborators over the last 2-3 years. Chances are that you have also come up with the same idea. If not, feel free to adopt this solution for your collaborations. It helps.

Here is the solution. When I share a folder to collaborate with coauthors, I create a token.txt file in that folder. This file is there to coordinate who edits which file at any given time.

Below are the contents of a sample token.txt file. The first part explains the guidelines for coordination, the second part lists the files to collaborate on. We generally use one file to correspond to one section of the paper.

Check with token.txt file to see who is editing which file in order to avoid conflicts.
If you want to write:
+ pick a section/file,
+ reopen token.txt to avoid seeing old-state,
+ write your name next to that section/file at token.txt to get the lock/token,
+ quickly hit save on token.txt, and
+ start editing that section/file on your own pace.

When done writing:
+ delete your name for that section/file at token.txt. (I.e., Unlock that file.)

Don't keep unlocked files open in your editor, lest you inadvertently save them over a newer version your collaborator wrote. An editor that auto-saves may also cause that problem.
eval: ...........Ailidani
concl: This file is always with Murat! Treat this as read-only, don't edit.

The token.txt file acts as a message board to show who is working on which file. Of course there can be concurrent updates to token.txt, but this is less likely because you make a tiny edit and then quickly save this file. Moreover, many editors, including Emacs, notify you immediately if the token.txt file has been changed while you keep it open.
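
For the programmers among my collaborators, the protocol can be sketched as a tiny lock table in Python. This is only an in-memory illustration of the rules, not a tool I actually use: the helper names parse_tokens, acquire, and release are hypothetical, I assume that only the file-list part of token.txt is passed in, and I assume that an "ALL FILES" entry held by someone else blocks every other lock.

```python
def parse_tokens(text):
    """Parse the file-list part of token.txt into {section: holder}."""
    locks = {}
    for line in text.splitlines():
        if ":" in line:
            name, _, holder = line.partition(":")
            # drop the padding dots and spaces before the holder's name
            locks[name.strip()] = holder.strip(" .")
    return locks


def acquire(locks, section, who):
    """Write your name next to a section if nobody else holds it."""
    all_holder = locks.get("ALL FILES", "")
    if all_holder and all_holder != who:
        return False  # assumed rule: a global lock blocks everyone else
    if locks.get(section):
        return False  # somebody already has the token
    locks[section] = who
    return True


def release(locks, section, who):
    """Delete your name for that section (i.e., unlock it)."""
    if locks.get(section) == who:
        locks[section] = ""
        return True
    return False


locks = parse_tokens("intro:\neval: ...........Ailidani\nALL FILES:\n")
got_eval = acquire(locks, "eval", "Murat")    # eval is held by Ailidani
got_intro = acquire(locks, "intro", "Murat")  # intro is free
```

The real protocol has no atomic test-and-set, of course; it relies on the edit to token.txt being tiny and saved quickly.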

(This is in essence similar to the use of RTS-CTS messages in wireless communications. Instead of having a more costly collision for a long-size data transfer, it is an acceptable tradeoff to have occasional collided/lost RTS messages, because those are very short transfers and are low-cost.)

Yes, this is a lazy low-tech solution and requires some discipline from collaborators, but the effort needed is minimal, and I haven't gotten any bad feedback from any collaborator on this. During the time I used this method, we never had a problem with concurrent editing on the same file. (I used this with teams of size 2-to-6, and only for collaborating on papers.) One problem we occasionally had, however, was someone forgetting to remove a lock after they were done working on that file. In this case, we resort to email: "Are you still holding the lock on this file? When do you plan to release it?"

Notice the "ALL FILES" entry at the end. That entry is for when one author wants to lock all the files for a short duration easily. This happens infrequently, but comes in handy for restructuring the paper, making fast substitution edits across all files, or spell-checking the entire paper before submission.

You may also notice an entry at the bottom. That is an org-mode file where I keep my TODO items and notes about the paper. It is my lab-notebook for this writing project. I can't function in any project without my file.

PS: Geoffrey Challen suggests: "You could also replace Murat's token.txt file with a Google Drive document." 
