TLA+ project2 solution (2016)

In a previous post, I had given the description of project2. Here is the TLA+ project2 solution made available on github. Below I will briefly explain the solution.

If you are not familiar with the chain-replication protocol, read this brief summary. 

The setup and variables

The first part is about the setup. We declare the process IDs for storage nodes (denoted as Nodes), the clients, and the configurator process.

The nodes maintain "db" each, where the item is updated and queried. For simplicity, our distributed key-value store maintains only a single item, and initialized to ver=-1, val=-1, cli=-1.

The nodes also have an auxiliary variable "up", which is used for modeling the status of the node. A node can go "down" at any time provided that less than FAILNUM nodes are currently down.

Each node and client has a message box, initially all message boxes are empty.

Finally, there is a global variable chain. Initially it is an empty sequence. As the configurator populates the chain, the chain will list in order the storage nodes: head, interim nodes, and the tail storage node. The configurator maintains the chain by removing the IDs of crashed nodes from the chain, and adding healthy nodes to the tail of the chain.

(In practice this can be implemented in several ways. chain could be the API of the configurator process, and can be obtained by an RPC call to the configurator. Or chain could be cached at the client-side library, and if/when the configurator changes the chain, it can update the chain variable at the library.)

The define block

Here comes the macros. We write this to simplify our modeling. As I wrote earlier, once we decide on the macros, we are halfway done with our modeling. "I find that once you get the "define" block right, the rest of the PlusCal algorithm practically writes itself. But you may need to make a couple unsuccessful takes before you figure out the most suitable /define/ block that leads to a succinct and elegant model."

IsUp is a function that takes an ID of a storage node, and returns whether that storage is up or not. Its implementation is very simple. Since we model a node being up or not using the up variable for that node, the macro just returns the up variable for the corresponding node. Why did we have a macro for this? We will use this boolean function as a filter for SelectSeq operator when we discuss the configurator.

UpNodes return a set that consists of the storage nodes that are up. We could have also written it as {n \in Nodes: IsUp(n)}. InChain(s) predicate returns whether node ID "s" is included in the chain or not. I implemented this by using the Seq operator. If chain is one of the subsequences of Nodes \ {s}, then s is not in the chain. Otherwise s is in the chain.

ChainNodes returns the set of all nodes s where InChain(s) is satisfied. FreeUpNode returns an "up" node that is not in the chain. GetIndex(s) returns the index of the node s in the chain.GetNext(s) returns the successor of node s in the chain.

The client

The client sends write requests to the storage nodes. The client gets to its work only after a chain is formed by the configurator. To keep our model checking short/finite, we make the client to send STOP number of requests.

Before sending an update request, the client first reads the current version of the item from the key value store. This is done in label CLR inside a while loop. The read request is sent to the node at the tail of the chain, which is given by chain[Len(chain)]. The client sends the request by writing to the messagebox of the tail node. Why is this inside a while loop? Because we leave the fault-tolerance to the client. The client's read request may be dropped. This can happen either via a message loss, or when the tail node crushes after receiving the read request. If the client does not get a response back to its read request, it re-sends the read request to the current tail of the chain, until it receives a message back to its message box. Then, the client sets the hver to be the latest version of the item+1, and removes the message from its message box, by resetting the message box to empty.

In the CLW tag, the client writes the update to the item to the head of the chain, i.e., chain[1], as per the chain replication protocol. This is also done in a while loop, because an update can be lost (if the head or interim node in chain crashes), and it is the client's responsibility to retry the update until it receives back an acknowledgment for that particular update.

After the write is acknowledged, the client removes the acknowledgment message from its messagebox, and increments the local variable counter, which maintains the number of successful updates the client made.

The storage nodes

The storage node actions are in two parts. The second part (starting with label NDF) is the modeling of a crash or recovery of a node. If FAILNUM number of crashes are not reached, any node can crash by setting its up variable False, and any crashed node can recover back by setting its up variable to True.

The first part actions show how a node reacts to a request in its message box. The preconditions to serving a request are that the node has a request in its message box, the node is up and part of the chain. Then, we check the type of the message: val=-1 means this is a read message, otherwise it is an update message and the db is updated with the message. The message is then propagated to the messagebox of the next node in the chain. (This is done regardless of whether it is a read message or update message). If this is the tail, then the message is written to the messagebox of the client indicated on the message (as that client has sent this request to the system). Finally the node sets its messagebox to empty when it finishes processing this message.

The configurator

The configurator process is infallible. (In practice it would be implemented as a Paxos box.) Its job is to monitor and configure the chain. (If you are not familiar with the chain-replication protocol, read this brief summary.)

It first checks if the chain is of length=3 and if there is an available up node to append to chain it will add it to chain to get the chain of length=3. When a new node is appended as the tail, its db is also initialized by copying the db of the current tail of the chain.

After repopulating the chain, the configurator checks the health of the nodes in the chain, and removes the crashed nodes in the chain. For this I used the SelectSeq operation which filters the chain based on the output of IsUp predicate on each member of the chain.

Model checking

The system is to satisfy single-copy consistency. Single-copy consistency means the storage nodes appear to outside as if it is a single virtual infallible node, even though upto FAILNUM of physical storage nodes can fail. More specifically, the highest version number result returned by a read from the tail should match the item stored by the most recently acknowledged write operation on the system.

If there is only one client of the system (you can configure this by setting C=1 in model checking parameters), then there is a shortcut to checking single-copy consistency. Since the single client is using cntr to keep track of the writes completed and updates the item val with cntr+1, the version number, hver, and val of the item should always match. The Consistent invariant checks that this is true at all storage nodes. "Consistent" can be violated if the client reads an old ver, and cntr is incremented as usual, so that ver becomes lower than cntr = val. However, if we implemented our chained-storage to implement single-copy consistency correctly, Consistent should hold as invariant in the presence of a single Client.

Of course that is a very restrictive invariant (assumes a single client, and assumes val=cntr). Let's look for more general invariants. CCon denotes that an earlier node in the chain will have more recent information "hver" than a later node in the chain. CPro is an end-to-end invariant at the client. It says that for any client, the hver maintained at the client is nondecreasing. That is, a client never reads an older/smaller hver from the system after reading a recent/larger hver. CPro is actually written as a temporal formula, so when model-checking CPro should be added to the temporal formulas section in the model. It has a peculiar form: hver' means the next state value of hver. And the formula reads as: It is always the case that for any client, the next state value of hver is greater than or equal to hver value at the current state.

When I model check with C=1 single client I see that Consistent, CCon, and CPro holds for this model. (Of course it took me many iterations to get there. There were subtle bugs that led to referencing nonexistent nodes in chain, etc.). When I check with C=2, the model checker complains that Consistent is violated. Of course this is expected, so I uncheck Consistent from the invariants, so that I can check that CCon and CPro holds.

Surprise! CCon is violated with two clients. (The same violation also applies for CPro of course.) The model checker spits out a 35 step counter example. Here is the gist of the problem.

Initially both client1 and client2 reads version as -1, and they both start an update with version 0. Client1 writes with version 0, client2's write is dropped in the messagebox at the head as Client1 overwrites that. Client1 then reads version as 0, and starts another write version=1, and succeeds. Client2 hasn't acted yet. Finally, Client2 retries its write with version 0 and manages to write with version 0 to the chain. And the chain went from version 1 down to version 0. CCon is violated. (CPro would also be violated, because Client1 would read a version=0 after it has read version=1.)

What went wrong? Modeling the messagebox with length=1 played a role in the counterexample. Only one message can be in the message box at a time, and if another message arrives before this message is read/consumed by the node, that first message gets lost. If the implementation uses TCP, this overwriting problem would be taken care of. However, it is still possible for Client2's write to get lost and CCon and CPro be violated: Client2 might write to the head which subsequently fails, then Client1 writes to the new head and succeeds with version=0 and version=1, then Client2 retries and overwrites the chain db with version=0.

As one way to fix the problem, we can consider changing the blind rewriting at label CLW. We may consider adding the client another action (which could be added to the process with an either clause) to check if the write succeeds and if not to retry the write but starting from CLR so that the client reads the new version (if any). But this is still prone to a race condition, as we make assumptions about relative speeds of Client1 and Client2. After Client2 reads a version, before it starts the CLW portion of the write, Client1 may have completed its write incrementing the version and causing the same counterexample. It seems like the proper way to handle this problem is at the storage node level. The storage node should reject an update that downgrades the version number of the item, and the client should get a rejection for its update request. I haven't put that update in my model to show the counterexample with two client case.

Some other subtle points in the modeling

I chose to model crash detection of nodes using an auxiliary variable "up", and I made it a global variable so that the configurator can readily access this information. I didn't put emphasis on a realistic failure detection part, and instead focused on the correctness modeling of the chain-based key-value store. A more realistic, closer to implementation way of doing this would be to not to use an auxiliary variable at all. For example,  the configurator could rely on periodic heartbeat messages from the nodes to implement failure detection of the nodes.

I also model "db" as a global variable. I tried to avoid this at first, but then I needed a mechanism for copying the db of the tail, when I append a new node to be the new tail of the chain. This could have been done by message passing and inventing a special request message, but that would make the model longer. I went for a shorter model and exposed db of the storage node. The only place I use this is for copying db when appending a new tail, and this would be something to refine further as we move towards implementing our protocol.

One way to refine this could be as follows. The new tail can send a "read" message to the current tail, as if it is a client. This would propagate the db of the current tail to the candidate tail, and that is copied at the candidate tail. But then we need a signaling mechanism between the candidate tail and the configurator so that the candidate tail can be declared as the new tail. But this task  is race condition prone, so this should be modeled in PlusCal and checked before it makes its way to the implementation. For this proposed refinement, the race condition can occur as follows: the current tail acknowledges a write to the client, and then the new tail is added which did not get this update, the client may query the new tail for a read and get an old version of the item. Single copy serializability is violated.

So we unearthed a concurrency race condition for 2 clients, and another race for adding a candidate tail. What other types of concurrency problems could be there with a sloppy protocol? Many different types. It is fun to go through the modeling process with TLA+ as you can observe how things may go wrong in ways you haven't anticipated. The TaxDC paper gives a good study of concurrency bugs in production use, well-debugged open source systems. Here are the common misconceptions about distributed systems that leads to these bugs:
+ One hop is faster than two hops.
+ No hop is faster than one hop.
+ Interactions between multiple protocols seem to be safe.
+ What you thought as an atomic block of execution wasn't, because another process was scheduled to execute something concurrently and that other process changed the system state in a way you didn't anticipate.


Noah Watkins said…
I can see how the client retry combined with the message inbox with capacity one models lost messages etc... are there benefits to modeling this way as opposed to explicitly representing the set of messages sent to a node, and say, removing them from the set to model message dropping?

Popular posts from this blog

I have seen things

SOSP19 File Systems Unfit as Distributed Storage Backends: Lessons from 10 Years of Ceph Evolution

PigPaxos: Devouring the communication bottlenecks in distributed consensus

Frugal computing

Learning about distributed systems: where to start?

Fine-Grained Replicated State Machines for a Cluster Storage System

My Distributed Systems Seminar's reading list for Spring 2020

Cross-chain Deals and Adversarial Commerce

Book review. Tiny Habits (2020)

Zoom Distributed Systems Reading Group