## Thursday, February 14, 2019

### Paper review. Sharding the Shards: Managing Datastore Locality at Scale with Akkio

This paper by Facebook, which appeared in OSDI'18, describes the data locality management service, Akkio. Akkio has been in production use at Facebook since 2014. It manages over 100PB of data, and processes over 10 million data accesses per second.

## Why do we need to manage locality?

Replicating all data to all datacenters is difficult to justify economically (due to the extra storage and WAN networking costs) when acceptable durability and request serving latency could be achieved with 3 replicas. It looks like Facebook had been doing full replication (at least for ViewState and AccessState applications discussed in the evaluation) to all the 6 datacenters back-in-the-day, but as the operation and the number of datacenters grew, this became untenable.

So, let's find suitable home-bases for data, instead of fully replicating it to all datacenters. But the problem is access locality is not static. What was a good location/configuration for the data ceases to be suitable when the data access patterns change. A change in the requesting datacenter can arise because the user travels from one region to another. But, meh, that is old hat, and not very interesting. The paper mentions that the more likely reason for request movements is that service workloads are shifted from datacenters with high loads to others with lower loads in order to lower service request response latencies. A diurnal peaking pattern is evident in the graphs. During peaks, up to 50% of requests may be shifted to remote datacenters.

## Why are µ-shards the right abstraction for managing locality?

The paper advocates for µ-shards (micro-shards), very fine-grained datasets (from ~1KB to ~1MB), to serve as the unit of migration and the abstraction for managing locality. A µ-shard contains multiple key-value pairs or database table rows, and should be chosen such that it exhibits strong access locality. Examples could be Facebook viewing history to inform subsequent content, user profile information, Instagram messaging queues, etc.

Why not shards, but µ-shards? Shards are for datastores, µ-shards are for applications. Shard sizes are set by administrators to balance shard overhead, load balancing, and failure recovery, and they tend to be on the order of gigabytes. Since µ-shards are formed by the client application to refer to a working data set size, they capture access locality better. They are also more amenable to migration. µ-shard migration has an overhead that is many orders of magnitude lower than that of shard migration, and its utility is far higher. There is no need to migrate a 1GB partition when the access is to a 1MB portion.

## Enter Akkio

To address these issues, the paper introduces Akkio, a µ-shard based locality management service for distributed datastores. Akkio is layered between client applications and the distributed datastore systems that implement sharding. It decides in which datacenter to place data, and how and when to migrate it, to reduce access latencies and WAN communication. It helps direct each data access to where the target data is located, and it tracks each access to be able to make appropriate placement decisions.

In the rest of this review, we discuss how Akkio leverages the underlying datastore to build a locality management layer, the architecture and components of that management layer, and the evaluation results from deployment of Akkio at Facebook. As always we have a MAD questions section at the end to speculate and free roam.

## Underlying datastore & shard management

The paper discusses ZippyDB, as an underlying datastore that Akkio layers upon. (Akkio also runs over Cassandra, and 3 other internally developed databases at Facebook.)

ZippyDB manages shards as follows. Each shard may be configured to have multiple replicas, with one designated to be the primary and the others referred to as secondaries. A write to a shard is directed to its primary replica, which then replicates the write to the secondary replicas using Paxos to ensure that writes are processed in the same order at each replica. Reads that need to be strongly consistent are directed to the primary replica. If eventual consistency is acceptable then reads can be directed to a secondary.

A shard's replication configuration identifies the number of replicas of the shard and how the replicas are distributed over datacenters, clusters, and racks. A service may specify that it requires three replicas, with two replicas (representing a quorum) in one datacenter for improved write latencies and a third in a different datacenter for durability.

A replica set collection refers to the group of all replica sets that have the same replication configuration. Each such collection is assigned a unique id, called a location handle. Akkio leverages replica set collections as housing for µ-shards.

This figure depicts several shard replica sets and a number of µ-shards within the replica sets. You can see datacenters A, B, and C in the figure and imagine it goes all the way to datacenter Z if you like. A replica set collection with location handle 1 has a primary at datacenter A and has two secondaries it replicates to in datacenter B. Replica set collection 345 has a primary in A and secondaries in B and C.

x is a µ-shard originally located in the replica set collection 78 which has a primary in datacenter C, and maybe secondaries in D and E. When access patterns change so that datacenters A and B are better location for µ-shard x, Akkio relocates x from replica set 78 to replica set 1. That means from now on, the writes for x are forwarded to datacenter A, and are automatically replicated to the secondaries of replica set 1 at B and C.

A helpful analogy is to think of replica set collections as condo buildings, which offer different locations and floor plan configurations. You can think of location handle for a replica set collection as the address for the condo building, and µ-shard as the tenant in the condo building. When a µ-shard encounters a need to migrate, Akkio relocates it to a different condo building with suitable location and configuration.

## Akkio Design and Architecture

I once heard that "All problems in computer science can be solved by another level of indirection". Akkio is built upon the following level of indirection idea. Akkio maps µ-shards onto shard replica set collections whose shards are in turn mapped to datastore storage servers. This is worth repeating, because this is the key idea in Akkio. When running on top of ZippyDB, Akkio places µ-shards on, and migrates µ-shards between different such replica set collections. This allows Akkio to piggyback on ZippyDB functionality to provide replication, consistency, and intra-cluster load balancing.

Above is an architectural overview for Akkio. The client application here is ViewState (which we talk about in the evaluation). The client application is responsible for partitioning data into µ-shards such that each µ-shard exhibits access locality. The client application must establish its own µ-shard-id scheme that identifies its µ-shards, and specify the µ-shard the data belongs to in the call to the database client library every time it needs to access data in the underlying database.

Harkening back to our analogy above, the µ-shard-id corresponds to the name of the tenant. If you mention that to Akkio, it knows which condo building the tenant lives now and forwards your message. Akkio also acts as the relocation service for the tenants; as the needs of tenants change, Akkio relocates them to different condo buildings, and keeps track of this.

Akkio manages all of this functionality transparently to the application using three components: Akkio location service, access counter service, and data placement service.

## Akkio location service (ALS)

ALS maintains a location database. The location database is used on each data access to look up the location of the target µ-shard. The ZippyDB client library makes a call to the Akkio client library getLocation(µ-shard-id) function which returns a ZippyDB location handle (representing a replica set collection) obtained from the location database. This location handle (the condo building address in our analogy) enables ZippyDB’s client library to direct the access request to the appropriate storage server. The location database is updated when a µ-shard is migrated.

The location information is configured to have an eventually consistent replica at every datacenter to ensure low read latencies and high availability, with the primary replicas evenly distributed across all datacenters. The eventual consistency is justified because the database has a high read-write ratio (> 500). Moreover, distributed in-memory caches are employed at every datacenter to cache the location information to reduce the read load on the database. Note that stale information is not a big problem in ALS, because a µ-shard that is missing at the forwarded location handle will lead to the client application making another ALS query, which is more likely to return the new location information.
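The lookup-then-retry behavior described above can be sketched in a few lines of Python. This is only an illustrative toy under stated assumptions: `LocationService`, `AkkioClient`, and the dictionary-backed stores are hypothetical names invented here, not Akkio's or ZippyDB's actual API, and the real system uses a replicated database plus distributed caches rather than in-process dictionaries.

```python
class LocationService:
    """Hypothetical stand-in for the ALS location database."""

    def __init__(self):
        self._db = {}  # µ-shard id -> location handle

    def lookup(self, ushard_id):
        return self._db[ushard_id]

    def update(self, ushard_id, handle):
        # Called when a µ-shard is migrated.
        self._db[ushard_id] = handle


class AkkioClient:
    """Hypothetical client library with a local location cache."""

    def __init__(self, als, stores):
        self.als = als
        self.stores = stores  # location handle -> {µ-shard id: data}
        self.cache = {}       # possibly stale copy of location info

    def get_location(self, ushard_id, refresh=False):
        if refresh or ushard_id not in self.cache:
            self.cache[ushard_id] = self.als.lookup(ushard_id)
        return self.cache[ushard_id]

    def read(self, ushard_id):
        handle = self.get_location(ushard_id)
        if ushard_id not in self.stores[handle]:
            # Stale handle: the µ-shard has moved, so query ALS again.
            handle = self.get_location(ushard_id, refresh=True)
        return handle, self.stores[handle][ushard_id]


als = LocationService()
als.update("x", 78)
stores = {78: {"x": "viewstate-of-x"}, 1: {}}
client = AkkioClient(als, stores)
first = client.read("x")             # served from collection 78

stores[1]["x"] = stores[78].pop("x")  # migrate x from 78 to 1
als.update("x", 1)
second = client.read("x")            # cache is stale, retry finds collection 1
```

The point of the sketch is that a stale cache entry costs one extra lookup rather than a wrong answer, which is why eventual consistency is tolerable for the location data.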

## Access counter service (ACS)

Each time the client service accesses a µ-shard, the Akkio client library requests the ACS to record the access, the type of access, and the location from which the access was made, so that Akkio collects statistics for making  µ-shard placement and migration decisions. This request is issued asynchronously so that it is not in the critical path.

## Data placement service (DPS)

The DPS initiates and manages µ-shard migrations. The Akkio Client Library asynchronously notifies the DPS that a µ-shard placement may be suboptimal whenever a data access request needs to be directed to a remote datacenter. The DPS re-evaluates the placement of a µ-shard only when it receives such a notification, in a reactive fashion. The DPS maintains historical migration data: e.g., time of last migration to limit migration frequency (to prevent the ping-ponging of µ-shards).

To decide on the optimal µ-shard placement, DPS uses the following algorithm. First, by querying ACS, DPS computes a per-datacenter score by summing the number of times the µ-shard was accessed from that datacenter over the last X days (where X is configurable), weighting more recent accesses more strongly. The per-datacenter scores for the datacenters on which the replica set collection has replicas are then summed to generate a replica set collection score. If there is a clear winner, DPS picks that winner. Otherwise, among the suitable replica set collection candidates, DPS calculates another score using resource usage data, and goes with the highest.
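Here is a minimal Python sketch of that two-stage scoring. Several details are assumptions made for illustration and are not taken from the paper: the exponential decay with a `half_life_days` parameter as the recency weighting, the `margin` factor that defines a "clear winner", and a single `free_capacity` number standing in for the resource usage score.

```python
from collections import defaultdict


def datacenter_scores(accesses, now, window_days=10, half_life_days=2.0):
    """accesses: iterable of (datacenter, day) pairs for one µ-shard."""
    scores = defaultdict(float)
    for dc, day in accesses:
        age = now - day
        if 0 <= age <= window_days:
            # Assumed weighting: recent accesses count more (exponential decay).
            scores[dc] += 0.5 ** (age / half_life_days)
    return dict(scores)


def pick_collection(collections, dc_scores, free_capacity, margin=1.2):
    """collections: location handle -> datacenters holding its replicas."""
    totals = {
        handle: sum(dc_scores.get(dc, 0.0) for dc in dcs)
        for handle, dcs in collections.items()
    }
    ranked = sorted(totals, key=totals.get, reverse=True)
    best = ranked[0]
    # Clear winner: beats the runner-up by the assumed margin.
    if len(ranked) == 1 or totals[best] >= margin * totals[ranked[1]]:
        return best
    # Otherwise break the near-tie using the resource-based score.
    candidates = [h for h in ranked if totals[h] * margin >= totals[best]]
    return max(candidates, key=lambda h: free_capacity[h])


accesses = [("A", 10), ("A", 9), ("B", 10), ("C", 1)]
scores = datacenter_scores(accesses, now=10)
collections = {1: ("A", "B"), 345: ("A", "B", "C"), 78: ("C", "D", "E")}
choice = pick_collection(collections, scores, {1: 0.6, 345: 0.2, 78: 0.9})
```

In this toy input, collections 1 and 345 score almost the same on access locality, so the choice falls to the collection with more spare capacity.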

After the optimal location is identified, the migration is performed in a relatively straightforward manner. If the underlying datastore (such as ZippyDB) supports access control lists (ACL) the source µ-shard is restricted to be read-only during the migration. If the datastore does not support ACL (such as the Cassandra implementation used at Facebook), a slightly more involved migration mechanism is employed.

One thing I notice here is, while the replica set collection is a nice abstraction, it leads to some inefficiencies for certain migration patterns. What if we just wanted to swap the primary replica to the location of a secondary replica? (This is a plausible scenario, because the write region may have changed to the location of a secondary replica.) If we were working at a lower layer abstraction, this would be as simple as changing the leader in the Paxos replication group. But since we work on top of the replica set collection abstraction, this will require a full-fledged migration (following the procedure above) to a different replicaset collection where the location of the primary and the secondary replica is reversed.

## Evaluation

The paper gives an evaluation of Akkio using 4 applications used in Facebook, of which I will only cover the first two.

## ViewState

ViewState stores a history of content previously shown to a user. Each time a user is shown some content, an additional snapshot is appended to the ViewState data. ViewState data is read on the critical path when displaying content, so minimizing read latencies is important. The data needs to be replicated 3-ways for durability. Strong consistency is a requirement.

Wait... Strong consistency is a requirement?? I am surprised strong consistency is required to show the user its feed. I guess this is to improve the user experience by not re-displaying something the user has seen.

Originally, ViewState data was fully replicated across six datacenters. (I presume that was all the datacenters Facebook had back-in-the-day.) Using Akkio with the setup described above led to a 40% smaller storage footprint, a 50% reduction of cross-datacenter traffic, and about a 60% reduction in read and write latencies compared to the original non-Akkio setup. Each remote access notifies the DPS, resulting in approximately 20,000 migrations a second. Using Akkio, roughly 5% of the ViewState reads and writes go to a remote datacenter.

## AccessState

The second application is AccessState. AccessState stores information about user actions taken in response to content displayed to the user. The information includes the action taken, what content it was related to, a timestamp of when the action was taken, etc. The data needs to be replicated three ways but only needs to be eventually consistent.

Using Akkio with the setup described above led to a 40% decrease in storage footprint, a roughly 50% reduction of cross-datacenter traffic, negligible increase in read latency (0.4%) and a 60% reduction in write latency. Roughly 0.4% of the reads go remote, resulting in about 1000 migrations a second.

In the figure ViewState is at top, and AccessState at the bottom. The figure shows percentage of accesses to remote data, the number of evaluatePlacement() calls to DPS per second, and the number of ensuing µ-shard migrations per second. For ViewState the number of calls to DPS per second and the number of migrations per second are the same.

## MAD questions

1. What are some future applications for Akkio?
The paper mentions the following.
Akkio can be used to migrate µ-shards between cold storage media (e.g. HDDs) and hot storage media (e.g., SSDs) on changes in data temperatures, similar in spirit to CockroachDB’s archival partitioning support. For public cloud solutions, Akkio could migrate µ-shards when shifting application workloads from one cloud provider to another cloud provider that is operationally less expensive. When resharding is required, Akkio could migrate µ-shards, on first access, to newly instantiated shards, allowing a more gentle, incremental form of resharding in situations where many new nodes come online simultaneously.
I had written about the need for data aware distributed systems/protocols in 2015:
This trend may indicate that the distributed algorithms should need to adapt to the data it operates on to improve performance. So, we may see the adoption of machine-learning as input/feedback to the algorithms, and the algorithms becoming data-driven and data-aware. (For example, this could be a good way to attack the tail-latency problem discussed here.)
Similarly, driven by the demand from the large-scale cloud computing services, we may see power-management, energy-efficiency, electricity-cost-efficiency as requirements for distributed algorithms. Big players already partition data as hot, warm, cold, and employ tricks to reduce power. We may see algorithms becoming more aware of this.
I think the trend is still strong. And we will see more work on data aware distributed systems/protocols in the coming years.

2. Is it possible to support transactions for µ-shards?
Akkio does not currently support inter µ-shard transactions, unless implemented entirely client-side. The paper mentions that providing this support is left for future work.

In our work on wPaxos, we not only showed policies for migration of µ-shards, but also implemented transactions on µ-shards.

We have recently presented a more detailed study on access locality and migration policies, and we hope to expand on that work in the context of Azure Cosmos DB.

3. How does this compare with other datastores?
Many applications already group together data by prefixing keys with a common identifier to ensure that related data are assigned to the same shard. FoundationDB is an example, although they don't have much of a geolocation and location management story yet. Some databases support the concept of separate partition keys, like Cassandra. But you need to deal with locality management yourself as the application developer. Spanner supports directories, and a move-dir command, although Spanner may shard directories into multiple fragments. CockroachDB uses small partitions, that should be amenable to migration. Migration can be done by relocating the Raft replica set responsible for a partition to destination datacenters gradually by leveraging Raft reconfiguration. But I don't think they have a locality management layer yet.

4. What are the limitations to the locality management?
As the paper mentions, some data is not suitable for µ-sharding because it cannot be broken into self-contained small parts without references to other entities. Examples are Facebook social graph data and Google search graph data.

When there are multiple writers around the globe, the locality management technique and specifically Akkio's implementation leveraging datastores that use a primary and secondaries will fall short. Also when there are reads from many regions and request serving latency is to be minimized, efficient and streamlined full replication is a better choice.

Cosmos DB, Azure's cloud-native database service, offers frictionless global distribution across any number of the 50+ Azure regions you choose to deploy it on. It enables you to elastically scale throughput and storage worldwide on-demand quickly, and you pay only for what you provision. It guarantees single-digit-millisecond latencies at the 99th percentile, supports multiple read/write regions around the globe, multiple consistency models, and is backed by comprehensive service level agreements (SLAs).

5. Micro is the new black
µ-services, µ-shards. And even FaaS. There is an increasingly strong tendency toward going micro. I think the root of this is that going with finer granularity makes for a more flexible, adaptable, agile distributed system.

## Tuesday, January 29, 2019

### Paper review: Probabilistically Bounded Staleness for Practical Partial Quorums

There is a fundamental trade-off between operation latency and data consistency in distributed database replication. The PBS paper (VLDB'12) examines this trade-off for partial quorum replicated data stores.

## Quorum systems

We can categorize quorum systems into strict versus partial quorums. Strict quorum systems ensure strong consistency by ensuring that read & write replica sets overlap: $R + W > N$. Here N is the total number of replicas in the quorum, R is the number of replicas that need to reply to a read query, and W is the number of replicas that need to reply to a write query.

Employing partial quorums can lower latency by requiring fewer replicas to respond, but R and W need not overlap: $R+W \leq N$. Such partial quorums offer eventual consistency.

Here is a visual representation of an expanding quorum system. The coordinator forwards a write request to all N replicas, and waits for W acknowledgements before responding back to the client for completion of the write. The quorum system is called expanding because the third replica will also get the write request soon even though the coordinator waits for only W=2 acknowledgements to confirm the write as completed. Similarly the coordinator also forwards the read request to N nodes, and responds back to the client with the highest versioned read when responses from R replicas are received.

Many quorum-replicated data stores, such as Apache Cassandra, Basho Riak, and Project Voldemort offer a choice between strict quorums with strong consistency and partial quorums with eventual consistency. Cassandra often defaults to a partial/non-strict quorum system with N=3, R=W=1, for maximum performance. While Riak defaults to a strict quorum system with N=3 and R=W=2,  users suggest using  R=W=1, N=2 for low-value data. Finally, for applications requiring very low latency and high availability, LinkedIn deploys Voldemort with N=3 and R=W=1. (Unlike Dynamo style systems, Voldemort sends read requests to R of N replicas--not N of N--; this decreases load per replica and network traffic at the expense of read latency and potential availability.)

In Cosmos DB, inside a region, we offer quorums with N=4, W=3, and allow the user to choose R=1 for session-consistency, consistent-prefix, and eventual-consistency reads, and R=2 for strong-consistency and bounded-staleness consistency reads. Cosmos DB also provides these 5 well-defined consistency levels for global replication and across region reads.

## Quorum staleness

How consistent is eventual? For the average case, can we offer staleness bounds with respect to version history and time? The Probabilistically Bounded Staleness (PBS) paper investigates this problem and quantifies the probability of staleness for partial quorums across versions via the k-staleness metric and across time via the t-visibility metric.

Let's start with some basic math to quantify staleness. What is staleness probability? It is the probability that the read quorum does not contain the last written version. We can obtain this by dividing the number of quorums of size $R$ composed of nodes that were not written to in the write quorum by the number of all possible read quorums:

$p_s = \frac{{{N-W} \choose R}}{{N \choose R}}$

For N=3, R=W=1, this probability comes to $p_s$=2/3, that is, approximately 0.667. But this is staleness with respect to the latest written version and with an immediate read after the write. We can generalize this staleness formula in both dimensions, with respect to k-versions (in lieu of latest version), and with respect to t unit time delayed read (in lieu of immediate, t=0, read).
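As a quick sanity check of the formula, here is a small Python sketch that evaluates $p_s$ directly from the binomial coefficients. The function name `staleness_probability` is just a label chosen here.

```python
from math import comb


def staleness_probability(n, r, w):
    """Probability that a read quorum of size r misses a write quorum of size w.

    Counts the read quorums drawn entirely from the n - w replicas that were
    not written, divided by the number of all possible read quorums.
    """
    return comb(n - w, r) / comb(n, r)


p_default = staleness_probability(3, 1, 1)  # the N=3, R=W=1 case above
p_strict = staleness_probability(3, 2, 2)   # R + W > N, quorums must overlap
```

Note that `math.comb(n - w, r)` returns 0 whenever `r > n - w`, so any strict quorum configuration automatically yields a staleness probability of zero.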

#### k-staleness

A system obeys *k-staleness* consistency with probability $1-p_{sk}$ if at least one value in any read quorum has been committed within k versions of *the latest committed version when the read begins*.

Given the probability $p$ that a read quorum does not intersect a single write quorum, the probability that it misses all of the last $k$ independent write quorums is $p^k$. (Note that this calculation ignores the effects of expanding quorums, so it is an upper bound on the staleness probability, i.e., a lower bound on consistency.)

$p_{sk} = \left( \frac{{{N-W} \choose R}}{{N \choose R}} \right)^k$
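To get a feel for how quickly $p_{sk}$ shrinks with k, the following Python sketch evaluates the formula and searches for the smallest k that meets a target staleness probability. The helper names `k_staleness` and `min_versions` are made up for this illustration, and like the formula it ignores expanding quorums.

```python
from math import comb


def k_staleness(n, r, w, k):
    """p_sk: probability that a read misses all of the last k versions."""
    p = comb(n - w, r) / comb(n, r)
    return p ** k


def min_versions(n, r, w, target):
    """Smallest k such that p_sk <= target (requires 0 < target < 1)."""
    p = comb(n - w, r) / comb(n, r)
    if p >= 1:
        raise ValueError("read quorums never intersect write quorums")
    k = 1
    while p ** k > target:
        k += 1
    return k


p_three = k_staleness(3, 1, 1, 3)         # (2/3)^3 = 8/27
k_needed = min_versions(3, 1, 1, 0.01)    # versions tolerated for 99% bound
```

For N=3, R=W=1 the per-version miss probability of 2/3 decays geometrically, so tolerating a handful of versions of staleness already moves the bound substantially.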

#### t-visibility

A system obeys t-visibility with probability $1-p_{st}$ if any read quorum started at least t units of time after a write commits returns at least one value that is at least as recent as that write.

Let $P_w(W_r, t)$ denote the cumulative distribution function describing the number of replicas $W_r$ that have received version v exactly t time units after v commits. For expanding quorums, W replicas have the value with certainty, and we can model t-visibility by summing the conditional probabilities of each possible $W_r$:

$p_{st} = \frac{{{N-W} \choose R}}{{N \choose R}} + \sum\limits_{c \in (W,N]} (\frac{{{N-c} \choose R}}{{N \choose R}} * [P_w(c+1,t)-P_w(c,t)])$

#### <k,t>-staleness

A system obeys <k,t>-staleness consistency with probability $1-p_{skt}$ if at least one value in any read quorum will be within k versions of the latest committed version when the read begins, provided the read begins t units of time after the previous k versions commit.

$p_{skt} = \left( \frac{{{N-W} \choose R}}{{N \choose R}} + \sum\limits_{c \in (W,N]} (\frac{{{N-c} \choose R}}{{N \choose R}} * [P_w(c+1,t)-P_w(c,t)]) \right)^k$

Note that k-staleness equals  <k,0>-staleness consistency, and t-visibility equals <1,t>-staleness consistency.

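## Monte Carlo modeling of t-visibility

Since the t-visibility formula depends on $P_w$, the cumulative distribution function describing the expanding write quorums (i.e., anti-entropy), it is easier to analyze t-visibility using Monte Carlo simulations. So we first model the quorum system using the *WARS* latency distributions of its operations (*W*: write request from coordinator to replica, *A*: write acknowledgement from replica to coordinator, *R*: read request from coordinator to replica, *S*: read response from replica to coordinator), and then quantify t-visibility.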

The read coordinator will return stale data if the first R responses received reached their replicas before the replicas received the latest version (delayed by *W*). Note that for a strict quorum, where R+W>N, returning stale data is impossible, because R will intersect a replica that has seen the latest write. For the partial quorum systems, the probability of the staleness depends on the latency distributions on *W*, *A*, *R*, and also indirectly on *S*.

Let wt denote the commit time (i.e., when the coordinator has received W acks). A single replica's response is stale if r' + wt + t < w', for w' drawn from the *W* and r' drawn from the *R* latency distributions. Of course writes expand to additional replicas during *A* and *R*, and that helps.

We can see from this formulation that longer *W* tails and faster reads increase the chance of staleness due to reordering. Dually, for improved consistency, it helps to:

• reduce variance for *W* write-req from coord to replicas
• increase *A* write-reply from replicas to coord
• increase *R* read-request from coord to replicas
• reduce variance for *S* read-respond from replicas to coord

(The effect of *S* is indirect and is very small. If *S* has very high variance, then stale reads may get returned faster than fresh reads. So by reducing the variance of *S*, you reduce the chance of such a reordering, where a stale response overtakes a fresher one.)

#### Monte Carlo simulations

Calculating t-visibility for a given value of t is straightforward using Monte Carlo simulations.

1. Draw N samples from each of *W*, *A*, *R*, and *S* at time t.
2. Compute wt as the Wth smallest value of $\{W[i] + A[i], i \in [0, N)\}$.
3. Check whether the first R samples of *R*, ordered by $R[i] + S[i]$, all satisfy $wt+R[i]+t \leq W[i]$. If they do, the read returns stale data.
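The three steps above translate almost line for line into Python. The sketch below is a rough illustration rather than the paper's simulator: it assumes exponential latencies for all four *WARS* distributions, with one rate `lam_w` for *W* and a shared rate `lam_ars` for *A*, *R*, and *S*, and the function name and parameters are chosen here for convenience.

```python
import random


def simulate_t_visibility(n, r, w, t, lam_w, lam_ars=1.0, trials=20000, seed=0):
    """Estimate the probability that a read issued t after commit is consistent."""
    rng = random.Random(seed)
    consistent = 0
    for _ in range(trials):
        # Step 1: draw N samples from each latency distribution.
        W = [rng.expovariate(lam_w) for _ in range(n)]
        A = [rng.expovariate(lam_ars) for _ in range(n)]
        R = [rng.expovariate(lam_ars) for _ in range(n)]
        S = [rng.expovariate(lam_ars) for _ in range(n)]
        # Step 2: commit time is the w-th smallest write round trip.
        wt = sorted(W[i] + A[i] for i in range(n))[w - 1]
        # Step 3: take the first r read responses by round trip time and
        # check whether every one of them reached its replica before the write.
        first = sorted(range(n), key=lambda i: R[i] + S[i])[:r]
        stale = all(wt + R[i] + t <= W[i] for i in first)
        consistent += not stale
    return consistent / trials


p_immediate = simulate_t_visibility(3, 1, 1, t=0.0, lam_w=0.1, trials=2000)
p_delayed = simulate_t_visibility(3, 1, 1, t=50.0, lam_w=0.1, trials=2000)
p_strict = simulate_t_visibility(3, 2, 2, t=0.0, lam_w=0.1, trials=2000)
```

Because the same seed is used for each call, the immediate and delayed runs see identical latency samples, so the only difference between them is the wait t. With a strict quorum the check in step 3 can never succeed for all R responses, so the estimate is exactly 1.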

The paper uses exponential latency distribution for some Monte Carlo simulations, because exponential distributions are simple. An exponential distribution describes the time between events in a Poisson point process, i.e., a process in which events occur continuously and independently at a constant average rate. The cumulative distribution function (CDF) is given as $F(x;\lambda) = 1- e^{-\lambda*x}$,  for $x \geq 0$, which leads to the  Mean = $\frac{1}{\lambda}$, and Variance= $\frac{1}{\lambda^2}$.

The PBS webpage provides an interactive demonstration of Monte Carlo simulations using the *WARS* model with exponential distributions. The demo gives you a better understanding of the effects of *WARS* distribution and t on consistency.

## Write-latency distribution effects

In order to illustrate the effects of *W*, write-latency distribution, the paper fixes *A=R=S* with $\lambda$=1, and sweeps a range of *W* distributions by changing its $\lambda$.

As expected, we find that high write variance *W* increases staleness:

• When the variance of *W* is 0.0625ms ($\lambda$= 4, mean .25ms, one-fourth the mean of *A=R=S*), we observe a 94% chance of consistency immediately after the write and 99.9% chance after 1ms
• When the variance of *W* is 100ms ($\lambda$ = .1, mean 10ms, ten times the mean of *A=R=S*), we observe a 41% chance of consistency immediately after write and a 99.9% chance of consistency only after 65ms

As the variance and mean of *W* increase, so does the inconsistency. Looking at distributions with fixed means and variable variances (uniform, normal), the paper finds that the mean of *W* is less important than its variance if *W* is strictly greater than *A=R=S*.

## Using production traces

Instead of just providing simulations with exponential distributions, the paper also maps these simulations to production deployments, by first fitting production latency data (obtained from LinkedIn and Yammer deployments) to distributions. It looks like Pareto distributions fit the latency data better for most cases.

LNKD-SSD versus LNKD-DISK comparisons provide a good validation for the PBS finding that reducing *W* variance contributes most to consistency. The figure shows that SSDs improve consistency immensely due to their reduced write variance. Immediately after write commit, LNKD-SSD had a 97.4% probability of consistent reads, reaching over a 99.999% probability of consistent reads after 5ms.

Another thing to notice is that R=2 & W=1 (the blue square plot) blows the socks off of R=1 & W=2 (the green circle plot). Why? Aren't these supposed to be symmetrical? My explanation is this. By increasing W by 1, you incur only very little latency (assuming the variance on *W* is not large) and in return, you get to hit two replicas with a read, which exponentially decreases the probability of both replicas missing the latest version.

Why is this not more widely adopted in partial quorum systems? W=1 makes you vulnerable to data loss, but only if both the replica and the coordinator crash at the same time and the coordinator did not have a chance to forward the write to other replicas even though it had acknowledged the write (not very plausible). Even with W>1, reading from more replicas improves consistency quickly, so it is low-hanging fruit to reap when the performance requirements don't forbid it.

Figure 7 shows how varying N affects t-visibility while maintaining R=W=1. As expected, the probability of consistency immediately after write commit decreases as N increases. But you can see that SSDs totally rock! Even with increased N, they keep a very high consistency probability thanks to the very low variance on *W* write latency across replicas.

Finally, Table 4 compares the t-visibility required for a 99.9% probability of consistent reads to the 99.9th percentile read and write latencies. The table shows that lowering the values of R and W can greatly improve operation latency, and that t-visibility can be low even when we require a high probability of consistent reads. Again note how much of an improvement W=1 & R=2 provides over W=2 & R=1! That makes a big difference.

1. Is it possible to change the read API to capture sweetpoints in tradeoff between consistency and read/write latency??

There is a knee for large $\lambda$ (i.e.,  small mean & variance). Waiting till the knee gives you the most bang-for-the-buck for consistency, and waiting after the knee helps less.

What if we fix the time waited on a read, instead of R, the number of replicas to read from? This would prevent the coordinator from accepting an early stale read reply as sufficient. The coordinator can wait for as long as the SLA allows (or cut that short if another read reply is received), and this will avoid falling for an early stale read reply.

2. By working from first principles, can we find an unexplored part of the state space to improve consistency??

We saw that for improved consistency, it helps to

• reduce variance for *W* write-req from coord to replicas
• increase *A* write-reply from replicas to coord
• increase *R* read-request from coord to replicas

What are some alternative ways to satisfy these conditions?

If we take this to logical extremes, it is better to keep the replicas close to each other (in the same cluster in LAN, or in nearby regions in WAN), and away from the client. This setup reduces *W* variance, and increases *A* and *R* durations.

I wonder if we can find other unexplored points in the space.

3. Why don't we use PBS analysis in WAN to help cloud clients decide on which regions to deploy??

I had mentioned that Azure Cosmos DB lets clients configure the default consistency level on their database account (and later override the consistency on a specific read request). For all four relaxed consistency levels (bounded, session, consistent-prefix, and eventual), among other metrics, Cosmos DB also tracks and reports the probabilistic bounded staleness (PBS) metric, which I think is unique among available solutions.

I think this use of PBS can be extended to guide customers in deciding which regions to deploy to. In the Azure cloud, customers can deploy among 50+ regions, and the selection of regions has implications for latency and consistency tradeoffs if relaxed consistency levels are chosen. Moreover, Cosmos DB does not restrict the client to a single write region; it allows multiple write regions and resolves conflicts by way of an Arbiter and an anti-entropy mechanism. So PBS metrics can also help clients get the most out of this by choosing optimal deployment regions for their access patterns. I will be looking at this in the coming weeks.

## Wednesday, January 9, 2019

### Paper review. An Empirical Study on Crash Recovery Bugs in Large-Scale Distributed Systems

Crashes happen. In fact they occur so commonly that they are classified as anticipated faults. In a large cluster of several hundred machines, you will have one node crashing every couple of hours. Unfortunately, as this paper shows, we are still not very competent at handling crash failures.

This paper from 2018 presents a comprehensive empirical study of 103 crash recovery bugs from 4 popular open-source distributed systems: ZooKeeper, Hadoop MapReduce, Cassandra, and HBase. For all the studied bugs, the authors analyze the root causes, triggering conditions, bug impacts, and fixes.

## Summary of the findings

Crash recovery bugs are caused by five types of bug patterns:
• incorrect backup (17%)
• incorrect crash/reboot detection (18%)
• incorrect state identification (16%)
• incorrect state recovery (28%)
• concurrency (21%)

Almost all (97%) of crash recovery bugs involve no more than four nodes. This finding indicates that we can detect crash recovery bugs in a small set of nodes, rather than thousands.

A majority (87%) of crash recovery bugs require a combination of no more than three crashes and no more than one reboot. It suggests that we can systematically test almost all node crash scenarios with very limited crashes and reboots.

Crash recovery bugs are difficult to fix. 12% of the fixes are incomplete, and 6% of the fixes only reduce the possibility of bug occurrence. This indicates that new approaches to validate crash recovery bug fixes are necessary.

## A generic crash recovery model

The study uses this model for categorizing parts of crash-recovery of a system.

The study leverages the existing cloud bug study database, CBS. CBS contains 3,655 vital issues from six distributed systems (ZooKeeper, Hadoop MapReduce, Cassandra, HBase, HDFS and Flume), reported from January 2011 to January 2014. The dataset of the 103 crash recovery bugs this paper analyzes is available at http://www.tcse.cn/~wsdou/project/CREB.

## Root cause

Finding 1: In 17/103 crash recovery bugs, in-memory data are not backed up, or backups are not properly managed.

Finding 2: In 18/103 crash recovery bugs, crashes and reboots are not detected, or not detected in a timely manner. (E.g., when a node crashes, other relevant nodes may access the crashed node without perceiving that it has crashed, and they may then hang or throw errors. Or, if a crashed node reboots very quickly, the crash may be overlooked by a timeout-based crash detection component. The crashed node may then contain corrupted state, and the system ends up with nodes whose state is out of sync, violating invariants.)

Finding 3: In 17/103 crash recovery bugs, the states after crashes/reboots are incorrectly identified. (E.g., the recovery process mistakenly considers wrong states as correct: the node may think it is still the leader after recovery.)

Finding 4: The states after crashes/reboots are incorrectly recovered in 29/103 crash recovery bugs. Among them, 14 bugs are caused by no handling or untimely handling of certain leftovers.

Finding 5: The concurrency caused by crash recovery processes is responsible for 22/103 crash recovery bugs. (Concurrency between a recovery process and a normal execution amounts to 13 bugs, concurrency between two recovery processes amounts to 4 bugs, and concurrency within one recovery process amounts to 5 bugs.)

Finding 6: All seven recovery components in the crash recovery model can be incorrect and introduce bugs. About one third of the bugs are caused in the crash handling component. (Overall, the crash handling component is the most error-prone (34%). The next three components, backing up (19%), local recovery (17%) and crash detection (13%), also account for a large portion.)

Finding 7: In 15/103 crash recovery bugs, new relevant crashes occur on/during the crash recovery process, and thus trigger failures.

So it seems that for 86 out of 103 bugs (excluding the 17 bugs from Finding 1) state inconsistency across nodes is the culprit. I don't want to be the guy who always plugs "invariant-based design" but, come on... Invariant-based design is what we need here to prevent the problems that arise from operational reasoning. In operational reasoning, you start with a "happy path", and then you try to figure out "what could go wrong?" and how to prevent it. Of course, you always fall short in that enumeration of problem scenarios and overlook corner cases, race conditions, and cascading failures. In contrast, invariant-based reasoning focuses on "what needs to go right?" and how to ensure these properties as invariants of your system at all times. Invariant-based reasoning takes a principled state-based rather than operation/execution-based view of your system.

To attain invariant-based reasoning, we specify safety and liveness properties for our models. Safety properties specify "what the system is allowed to do" and ensure "nothing bad happens". For example, at all times, all committed data is present and correct. Liveness properties specify "what the system should eventually do" and ensure "something good eventually happens". For example, whenever the system receives a request, it must eventually respond to that request.

Here is another argument for using invariant-based design, and as an  example, you can check my post on the two-phase commit protocol.

## Triggering conditions

Finding 8: Almost all (97%) crash recovery bugs involve four nodes or fewer.

Finding 9: No more than three crashes can trigger almost all (99%) crash recovery bugs. No more than one reboot can trigger 87% of the bugs. In total, a combination of no more than three crashes and no more than one reboot can trigger 87% (90 out of 103) of the bugs.
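Findings 8 and 9 suggest that a bounded search over crash/reboot schedules on a handful of nodes already covers most of the bugs. As a rough sketch (my own illustration, not a tool from the paper), the hypothetical generator `fault_schedules` below enumerates every valid sequence of crash and reboot events within a crash budget and a reboot budget; a test harness could then inject each schedule into a small cluster.

```python
from typing import Iterator, List, Tuple

Event = Tuple[str, int]  # ("crash" | "reboot", node id)


def fault_schedules(num_nodes: int, max_crashes: int = 3,
                    max_reboots: int = 1) -> Iterator[List[Event]]:
    """Yield every valid sequence of crash/reboot events within the budgets.

    A node can crash only while it is up and reboot only while it is down.
    The empty schedule (no faults) is included.
    """
    def extend(prefix, down, crashes, reboots):
        yield list(prefix)
        for node in range(num_nodes):
            if node not in down and crashes < max_crashes:
                yield from extend(prefix + [("crash", node)], down | {node},
                                  crashes + 1, reboots)
            if node in down and reboots < max_reboots:
                yield from extend(prefix + [("reboot", node)], down - {node},
                                  crashes, reboots + 1)

    yield from extend([], frozenset(), 0, 0)


# Schedules for a single node with the default budgets
# (at most three crashes and one reboot).
single_node = list(fault_schedules(1))
```

This only enumerates the order of faults; as Finding 12 points out, the timing of each crash relative to the protocol steps matters too, so a real harness would also have to choose injection points.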

Finding 10: 63% of crash recovery bugs require at least one client request, but 92% of the bugs require no more than 3 user requests.

Finding 11: 38% of crash recovery bugs require complicated input conditions, e.g., special configurations or background services.

Finding 12: The timing of crashes/reboots is important for reproducing crash recovery bugs.

## Bug impact

Finding 13: Crash recovery bugs always have severe impacts on reliability and availability of distributed systems. 38% of the bugs can cause node downtimes, including cluster out of service and unavailable nodes.

Finding 14: Crash recovery bugs are difficult to fix. 12% of the fixes are incomplete, and 6% of the fixes only reduce the possibility of bug occurrence.

Finding 15: Crash recovery bug fixing is complicated. A large amount of developer effort was spent on these fixes.

1. Why is this not a solved problem?
Crash faults have been with us for a long time. They have become even more relevant with the advent of containers in cloud computing, which may be shut down or migrated for resource management purposes. If so, why are we still not very competent at handling crash-recovery?

It even seems like we have some tool support as well. There is plenty of write-ahead-logging support available to deal with the bugs in Finding 1 related to backing up in-memory data. (Well, that is assuming you have a good grasp on which data are important to back up via write-ahead-logging.) We can use ZooKeeper for keeping configuration information, so that the nodes involved don't have differing opinions about which node is down. While keeping the configuration in ZooKeeper helps alleviate some of the state-inconsistency problems, we still need invariant-based design (and model-checking the protocol) to make sure the protocol does not suffer from state-inconsistency problems.

This is a sincere question, and not a covert way to suggest that developers are being sloppy. I know better than that and I have a lot of respect for the developers. That means the problem is stickier and hairier at the implementation level, and our high-level abstractions are leaking. There has been relevant work "On the complexity of crafting crash-consistent applications" in OSDI'14. There is also recent followup work on "Protocol-Aware Recovery for Consensus-Based Distributed Storage" which I would like to read soon.

Finally, John Ousterhout worked on write-ahead-logging and recovery in in-memory systems as part of the RAMCloud project, and I should check recent work from that group.

2. How does this relate to crash-only software?
It is unfortunate that the crash-only software paper has not been cited and discussed by this paper, because I think crash-only software suggested a good, albeit radical, way to handle crash-recovery. As the findings in this paper show, a big part of the reason the bugs occur is that the crash-recovery paths are not exercised/tested enough during development and even normal use. The "crash only software" work had the insight to exercise crashes as part of normal use: "Since crashes are unavoidable, software must be at least as well prepared for a crash as it is for a clean shutdown. But then --in the spirit of Occam's Razor-- if software is crash-safe, why support additional, non-crash mechanisms for shutting down? A crash-only system makes it affordable to transform every detected failure into component-level crashes; this leads to a simple fault model, and components only need to know how to recover from one type of failure."

3. How does this compare with TaxDC paper?
The TaxDC paper studied distributed coordination (DC) bugs in the cloud and showed that more than 60% of DC bugs are triggered by a single untimely message delivery that commits order violation or atomicity violation, with regard to other messages or computation. (Figure 1 shows possible triggering patterns.) While this claim sounds like a very striking and surprising finding, it is actually straightforward. What is a DC bug? It is a manifestation of state inconsistency across processes. What makes the state inconsistency into a bug? A communication/message-exchange between the two inconsistent processes.

Compared to the TaxDC paper, this paper focuses on a small set of bugs, only the crash-recovery bugs. In comparison to the TaxDC paper, the paper states that crash recovery bugs are more likely to cause fatal failures than DC bugs. In contrast to 17% of DC bugs, a whopping 38% of the crash-recovery bugs caused additional node downtimes, including cluster out of service and unavailable nodes.

## Saturday, January 5, 2019

### Paper review. Serverless computing: One step forward, two steps back

Serverless computing offers the potential to program the cloud in an autoscaling and pay-only-per-invocation manner. This paper from UC Berkeley (to appear at CIDR 19) discusses limitations in the first-generation serverless computing, and argues that its autoscaling potential is at odds with data-centric and distributed computing. I think the paper is written to ignite debate on the topic, so here I am writing some of my takes on these arguments.

Overall, I think the paper could have been written in a more constructive tone. After you read the entire paper, you get a sense that the authors want to open constructive dialogue for improving the state of serverless rather than to crucify it. However, the overly-critical tone of the paper leads to some unfair claims,  not only about serverless, but also about cloud computing as well. This paragraph in the introduction is a good example.
"New computing platforms have typically fostered innovation in programming languages and environments. Yet a decade later, it is difficult to identify the new programming environments for the cloud. And whether cause or effect, the results are clearly visible in practice: the majority of cloud services are simply multi-tenant, easier-to-administer clones of legacy enterprise data services like object storage, databases, queueing systems, and web/app servers. Multitenancy and administrative simplicity are admirable and desirable goals, and some of the new services have interesting internals in their own right. But this is, at best, only a hint of the potential offered by millions of cores and exabytes of data."

I think this paragraph downplays a lot of technologies developed for the cloud (from both programming languages and environments perspective): MapReduce, Resilient Distributed Datasets, Hadoop environment, Spark environment, real time data processing and streaming systems, distributed machine learning systems, large-scale caching, scalable globe-spanning databases from NoSQL to NewSQL, integration frameworks, RESTful architectures, microservices frameworks, and Lambda and Kappa architectures. In addition to these, cloud computing also gave rise to supporting systems (VMs, containers), scheduling frameworks (Mesos, Kubernetes), SLAs, and observability, debugging, and devops tools.

## Some background on Serverless aka Function as a Service (FaaS)

I had reviewed a paper on serverless/FaaS earlier on this blog. It is worthwhile to check that summary for a couple minutes, for refreshing your background on FaaS.

FaaS offerings today support a variety of languages (e.g., Python, Java, JavaScript, Go) for shipping the code to a common runtime, and allow developers to register functions and declare events that trigger each function. The FaaS infrastructure monitors the triggering events, allocates a runtime for the function, executes it, and persists the results. FaaS requires data management in both persistent and temporary storage, and in the case of AWS, this includes S3 (large object storage), DynamoDB (key-value storage), SQS (queuing services), SNS (notification services), and more. The user is billed only for the computing resources used during function invocation.

## The shortcomings of current FaaS offerings

1) Limited Lifetimes. After 15 minutes, function invocations are shut down by the Lambda infrastructure. Lambda may keep the function's state cached in the hosting VM to support "warm start", but there is no way to ensure that subsequent invocations are run on the same VM.

2) I/O Bottlenecks. Lambdas connect to cloud services—notably, shared storage—across a network interface. This means moving data across nodes or racks. Recent studies show that a single Lambda function can achieve on average only 538Mbps network bandwidth.

3)  Communication Through Slow Storage. While Lambda functions can initiate outbound network connections, they themselves are not directly network-addressable in any way. A client of Lambda cannot address the particular function instance that handled the client's previous request: there is no "stickiness" for client connections. Hence maintaining state across client calls requires writing the state out to slow storage, and reading it back on every subsequent call.

4) No Specialized Hardware. FaaS offerings today only allow users to provision a timeslice of a CPU hyperthread and some amount of RAM; in the case of AWS Lambda, one determines the other. There is no API or mechanism to access specialized hardware.

All of these are factual and fair assessments of current FaaS offerings. I think "no specialized hardware" is not inherent, and can change as applications require it. It can be argued that the other three shortcomings are actually deliberate design decisions for FaaS offerings based on their primary goal: providing pay-only-per-invocation autoscaling computation. FaaS does not require reservation, and does not charge you when your code/app is not being used, yet allows the code/app to deploy quickly, within a second, in contrast to the 20 seconds required for a container and the minutes required for a VM.

## Forward but also backward

The paper acknowledges autoscaling as a big step forward: "By providing autoscaling, where the workload automatically drives the allocation and deallocation of resources, current FaaS offerings take a big step forward for cloud programming, offering a practically manageable, seemingly unlimited compute platform."

But it calls out the following two points as major steps backward:
• They ignore the importance of efficient data processing. Serverless functions are run on isolated VMs, separate from data. Moreover their capacity to cache state internally to service repeated requests is limited. Hence FaaS routinely "ships data to code" rather than "shipping code to data." This is a recurring architectural anti-pattern among system designers, which database aficionados seem to need to point out each generation.
• They stymie the development of distributed systems. Because there is no network addressability of serverless functions, two functions can work together serverlessly only by passing data through slow and expensive storage. This stymies basic distributed computing. That field is founded on protocols performing fine-grained communication between agents, including basics like leader election, membership, data consistency, and transaction commit.
...
In short, with all communication transiting through storage, there is no real way for thousands (much less millions) of cores in the cloud to work together efficiently using current FaaS platforms other than via largely uncoordinated (embarrassing) parallelism.

Again I agree with the desirability of efficient data processing, shipping code to data, and network addressable processes/functions. On the other hand, current FaaS offerings are making a trade-off between efficiency and easy-to-program pay-only-per-invocation autoscaling functionality. Going from on-prem to IaaS to PaaS to FaaS, you relinquish control yet in return you get higher productivity for the developer. Systems design is all about tradeoffs.

Is it possible to have both very high control and a very easy-to-program/high-productivity system? If you restrict yourself to constrained domains, it may be possible to have both features at their full extent together. Otherwise I doubt you can have both at their full extents for general unconstrained computation domains. However, there may be better sweet spots in the design space. Azure Durable Functions provide stateful FaaS and partially address the two backwardness concerns above about efficient data processing and addressable serverless functions.

## Case studies

The paper reports on three case studies that the authors implemented using AWS Lambda, and compares them to implementations on EC2.

1. Model training for star prediction from Amazon product reviews. This algorithm on Lambda is 21× slower and 7.3× more expensive than running on EC2.

2. Low-latency prediction serving via batching. This uses SQS (I am not sure if it could be done another way). The corresponding EC2 implementation has a per-batch latency of 2.8ms and is 127× faster than the optimized Lambda implementation. The EC2 instance, on the other hand, has a throughput of about 3,500 requests per second, so 1 million messages per second would require 290 EC2 instances, with a total cost of \$27.84 per hour. This is still a 57× cost savings compared to the Lambda implementation.
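The arithmetic behind these figures can be checked in a few lines. The input numbers are the ones quoted above; the per-instance price and the Lambda-side cost are not stated in this review, so the values computed below are only what the quoted totals imply.

```python
import math

target_rate = 1_000_000       # messages per second (quoted above)
per_instance_rate = 3_500     # requests per second per EC2 instance (quoted)
fleet_size = 290              # instance count quoted above
fleet_cost_per_hour = 27.84   # USD per hour for the fleet (quoted)
savings_factor = 57           # quoted cost advantage over Lambda

# Strict lower bound on the fleet size; the quoted 290 is this rounded up.
min_instances = math.ceil(target_rate / per_instance_rate)

# Implied by the quoted totals, not stated in the review.
implied_instance_price = fleet_cost_per_hour / fleet_size
implied_lambda_cost = fleet_cost_per_hour * savings_factor

print(min_instances)
print(round(implied_instance_price, 3))
print(round(implied_lambda_cost, 2))
```

So the quoted fleet works out to roughly ten cents per instance-hour, and the 57× factor implies a Lambda-side bill on the order of \$1,600 per hour for the same message rate.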

3. Distributed computing. This implements Garcia-Molina's bully leader election in Python. Using Lambda, all communication between the functions was done in blackboard fashion via DynamoDB. With each Lambda polling four times a second, they found that each round of leader election took 16.7 seconds. Communicating via cloud storage is not a reasonable replacement for directly-addressed networking--it is at least one order of magnitude too slow.

Again, I think this is not an apples-to-apples comparison. FaaS offerings provide pay-only-per-invocation autoscaling computation. FaaS does not require reservation, and does not charge you when your code/app is not being used, yet allows the code/app to deploy quickly, within a second, in contrast to the 20 seconds required for a container and the minutes required for a VM. When you compare over three hours of continuous use, FaaS will come out costing more, but over a longer period, say many days or weeks of sporadic use, FaaS will be the cheaper option because of its pay-only-per-invocation billing.
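A toy cost model makes the continuous-versus-sporadic point concrete. Both prices below are hypothetical round numbers chosen for illustration; they are not actual AWS prices, and the model ignores memory size, duration billing, and free tiers.

```python
def faas_cost(invocations: int, price_per_invocation: float) -> float:
    """Pay only for the invocations that actually happen."""
    return invocations * price_per_invocation


def reserved_cost(hours: float, price_per_hour: float) -> float:
    """Pay for every hour the instance is up, busy or idle."""
    return hours * price_per_hour


def cheaper_option(invocations, hours, price_per_invocation, price_per_hour):
    faas = faas_cost(invocations, price_per_invocation)
    reserved = reserved_cost(hours, price_per_hour)
    return "faas" if faas < reserved else "reserved"


PRICE_PER_INVOCATION = 0.000002  # hypothetical USD per invocation
PRICE_PER_HOUR = 0.10            # hypothetical USD per instance-hour

# Three hours of continuous load at 1,000 requests per second.
continuous = cheaper_option(3 * 3600 * 1000, 3,
                            PRICE_PER_INVOCATION, PRICE_PER_HOUR)

# Thirty days of sporadic use totalling 10,000 requests.
sporadic = cheaper_option(10_000, 30 * 24,
                          PRICE_PER_INVOCATION, PRICE_PER_HOUR)
```

Under these made-up prices the reserved instance wins the continuous case and FaaS wins the sporadic one; where the crossover falls depends entirely on the real prices and the shape of the workload.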

## Stepping forward to the future

The paper mentions the following two as particularly important directions to address for providing stronger FaaS offerings.

• Fluid Code and Data Placement. To achieve good performance, the infrastructure should be able and willing to physically colocate certain code and data. This is often best achieved by shipping code to data, rather than the current FaaS approach of pulling data to code. At the same time, elasticity requires that code and data be logically separated, to allow infrastructure to adapt placement: sometimes data needs to be replicated or repartitioned to match code needs. In essence, this is the traditional challenge of data independence, but at extreme and varying scale, with multi-tenanted usage and fine-grained adaptivity in time. High-level, data-centric DSLs--e.g., SQL+UDFs, MapReduce, TensorFlow-- can make this more tractable, by exposing at a high level how data flows through computation. The more declarative the language, the more logical separation (and optimization search space) is offered to the infrastructure.
• Long-Running, Addressable Virtual Agents. Affinities between code, data and/or hardware tend to recur over time. If the platform pays a cost to create an affinity (e.g. moving data), it should recoup that cost across multiple requests. This motivates the ability for programmers to establish software agents--call them functions, actors, services, etc.-- that persist over time in the cloud, with known identities. Such agents should be addressable with performance comparable to standard networks. However, elasticity requirements dictate that these agents be virtual and dynamically remapped across physical resources. Hence we need virtual alternatives to traditional operating system constructs like "threads" and "ports": nameable endpoints in the network. Various ideas from the literature have provided aspects of this idea: actors, tuplespaces, pub/sub and DHTs are all examples. Chosen solutions need to incur minimal overhead on raw network performance.

1. How much of the claims/arguments against FaaS also apply to PaaS?
PaaS sits between IaaS and FaaS in the spectrum. It is prone to the same arguments made against the limitations of FaaS here. For PaaS you can also argue that it ignores the importance of efficient data processing and it stymies the development of distributed systems. PaaS fares a bit better than FaaS on closer analysis though: it doesn't have limited lifetimes, it can maintain sessions for clients, and it can support specialized hardware better. On the other hand, FaaS can scale to more invocations much faster, in a couple of seconds compared to 20+ seconds for PaaS offerings that use containers.

2. Is ease-of-development more important than performance?
Around 2010 Hadoop MapReduce got a lot of traction/adoption, even though the platform was very inefficient/wasteful and not in "distributed systems spirit" (as this paper calls FaaS). This was pretty surprising for me to witness. It seemed like nobody cared how inefficient the platform was; people were OK waiting hours for the inefficient MapReduce jobs to finish. They did this because it beat the alternative of spending even more hours (and developer cost) on developing an efficient implementation that does the job.

It seems like ease-of-development takes precedence over performance/cost. In other words, worse is better!

FaaS is all about optimizing developer productivity and time-to-market. FaaS enables developers to prototype something in a week rather than trying to build the same functionality "the right way" in a couple of months. It is not worth investing many weeks into developing an optimized system before you can test if there is a good product fit. But after you prototype using FaaS, test the product, and decide on ways to improve and tune the system, if you still need more efficiency and lower cloud costs, you can get there by leveraging the experience from your prototype and implementing an optimized version of the system "the right way" over IaaS or PaaS.

FaaS provides ease of development, and quick on-demand scaling to handle flash floods. Also, despite the disadvantages in efficiency, being stateless is a big plus for fault-tolerance. So I think current FaaS offerings will not have much competition for at least another 5 years. We are still at the start of the FaaS technology curve. In the meantime, more efficient, more reactive, fluid versions will get ready, and they will hopefully slowly improve on the ease-of-programming aspect. The early prototypes of the fluid systems, even for constrained domains, still require a lot of programmer skill and knowledge of distributed systems internals, and we need to simplify programming for those systems.

3. How does this play with disaggregated storage/computing trend?
As a cloud-service provider your biggest challenge is to find cost-effective ways to host your clients in a multitenant environment while providing them isolation. It is very hard to rightsize the clients. If you overestimate, you lose money. If you underestimate, the clients get upset, take their business elsewhere, and you lose money.

Enter disaggregation. Disaggregation gives you flexibility for multitenancy and enables keeping the costs down. It simplifies the management of resources and rightsizing the clients and enables the cloud providers to provide performant yet cost-efficient multitenancy. Yes, there is an overhead to disaggregation, namely shipping data to code/compute, but that overhead is balanced by the flexibility of multitenancy you get in return.

Acknowledgment.
I thank Chris Anderson, Matias Quaranta, Ailidani Ailijiang for insightful discussion on Azure functions and Azure durable functions.

## Tuesday, December 25, 2018

### Two-phase commit and beyond

In this post, we model and explore the two-phase commit protocol using TLA+.

The two-phase commit protocol is practical and is used in many distributed systems today. Yet it is still brief enough that we can model it quickly, and learn a lot from modeling it. In fact, we see that through this example we get to illustrate a fundamental impossibility result in distributed systems directly.

## The two-phase commit problem

A transaction is performed over resource managers (RMs). All RMs must agree on whether the transaction is committed or aborted.

The transaction manager (TM) finalizes the transaction with a commit or abort decision. For the transaction to be committed, each participating RM must be prepared to commit it. Otherwise, the transaction must be aborted.

We perform this modeling in the shared memory model, rather than in message passing, to keep things simple. This also ensures that the model checking is fast. But we add nonatomicity to the "read from neighbor and update your state" actions in order to capture the interesting behaviors that would happen in a message passing implementation.

An RM can only read the TM's state and read/update its own state. It cannot read other RMs' states. A TM can read all RM nodes' states and read/update its own state.

## Definitions

Lines 9-10 set the initial rmState for each RM to "working" and that of the TM to "init".

The canCommit predicate is defined to be true iff all RMs are "prepared" (they are ready for a commit decision). If there exists an RM in the "aborted" state, canAbort becomes true.
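For readers who do not have the TLA+ model in front of them, the two predicates can be paraphrased in Python as below. This is only a sketch of the definitions described above, not the author's TLA+ text; the dictionary `rm_state` mapping RM ids to state strings is an assumed representation.

```python
from typing import Dict


def can_commit(rm_state: Dict[int, str]) -> bool:
    """True iff every RM is prepared, i.e., ready for a commit decision."""
    return all(s == "prepared" for s in rm_state.values())


def can_abort(rm_state: Dict[int, str]) -> bool:
    """True iff at least one RM has aborted."""
    return any(s == "aborted" for s in rm_state.values())


initial = {1: "working", 2: "working", 3: "working"}
all_prepared = {1: "prepared", 2: "prepared", 3: "prepared"}
one_aborted = {1: "prepared", 2: "aborted", 3: "working"}
```

Note that in the initial state neither predicate holds, so the TM has to wait for the RMs to move before it can decide anything.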

## TM model

The TM modeling is straightforward. TM checks if it can commit or can abort, and updates tmState accordingly.

TM can also fail making tmState "unavailable", but this gets exercised only if the constant TMMAYFAIL is set to true before the model checking starts. In TLA+, the labels determine the granularity of atomicity. By putting the labels F1 and F2, we denote that the corresponding statements are executed nonatomically (after some indeterminate time passes) with respect to the previous statements.

## RM model

The RM model is also simple. Since "working" and "prepared" states are nonterminal states, the RM nondeterministically chooses among the enabled actions until a terminal state is reached. A "working" RM may transition to an "aborted" or "prepared" state. A "prepared" RM waits to hear the commit or abort decision from the TM and acts accordingly. The figure below shows the state transitions possible for one RM. But note that we have multiple RMs, each of which goes through its state transitions at its own pace without knowledge of the state of the other RMs.
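The transition rules in this paragraph fit in a small function. The sketch below is a Python paraphrase of the description above (not the PlusCal process itself), with the hypothetical name `rm_next_states` returning the set of states an RM may move to given the TM's current state.

```python
from typing import Set


def rm_next_states(rm: str, tm: str) -> Set[str]:
    """States one RM may move to next, given its own state and the TM's state."""
    if rm == "working":
        # A working RM chooses nondeterministically.
        return {"prepared", "aborted"}
    if rm == "prepared":
        # A prepared RM just follows the TM's decision once there is one.
        if tm == "commit":
            return {"committed"}
        if tm == "abort":
            return {"aborted"}
    # Terminal states, or a prepared RM still waiting for a decision.
    return set()


waiting = rm_next_states("prepared", "init")
```

An empty result for a "prepared" RM means it is blocked on the TM, which is exactly the situation that becomes a problem once the TM is allowed to fail.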

## Model checking the two-phase commit

We are interested in checking that our two-phase commit is consistent: there are no two RMs such that one says "committed" and another "aborted".

The predicate Completed checks that the protocol does not hang forever: eventually each RM reaches a terminal "committed" or "aborted" state.

With that, we are ready to model check this protocol. Initially we set TMMAYFAIL=FALSE, RM=1..3 to run the protocol with three RMs and one TM that is reliable. The model checker takes 15 seconds and tells us that there are no errors. Both Consistency and Completed are satisfied by any possible execution of the protocol with different interleaving of enabled RM actions and TM actions.

Now, we set TMMAYFAIL=TRUE and restart the model checker. The model checker is quick to give us a counterexample trace where the RMs are stuck waiting to hear back from the TM, which has become unavailable.

We see that on State=4 the RM2 transitions to aborted, on State=7 the RM3 transitions to aborted, on State=8 the TM transitions to "abort", and crashes on State=9. On State=10, the system is stuck because the RM1 is in the prepared state waiting forever to learn a decision from the TM which has crashed.
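The same experiment can be approximated outside of TLA+ with a tiny explicit-state search. The Python sketch below is a simplified stand-in for the model described in this post, not a translation of it: it treats every step as atomic, lets the TM crash nondeterministically after it has decided, and reports terminal states where some RM never finishes instead of checking a temporal property. The function names are hypothetical.

```python
from collections import deque

TERMINAL = {"committed", "aborted"}


def successors(state, tm_may_fail):
    rms, tm = state
    out = []
    for i, s in enumerate(rms):
        if s == "working":
            targets = ["prepared", "aborted"]
        elif s == "prepared" and tm == "commit":
            targets = ["committed"]
        elif s == "prepared" and tm == "abort":
            targets = ["aborted"]
        else:
            targets = []
        for t in targets:
            out.append((rms[:i] + (t,) + rms[i + 1:], tm))
    if tm == "init":
        if all(s == "prepared" for s in rms):   # canCommit
            out.append((rms, "commit"))
        if any(s == "aborted" for s in rms):    # canAbort
            out.append((rms, "abort"))
    elif tm in ("commit", "abort") and tm_may_fail:
        out.append((rms, "unavailable"))        # TM crashes after deciding
    return out


def check(num_rms=3, tm_may_fail=False):
    """Breadth-first search; returns (inconsistent states, stuck states)."""
    init = (("working",) * num_rms, "init")
    seen, queue = {init}, deque([init])
    inconsistent, stuck = [], []
    while queue:
        state = queue.popleft()
        rms, _ = state
        if "committed" in rms and "aborted" in rms:
            inconsistent.append(state)
        nxt = successors(state, tm_may_fail)
        if not nxt and any(s not in TERMINAL for s in rms):
            stuck.append(state)  # nobody can move, yet some RM is not done
        for n in nxt:
            if n not in seen:
                seen.add(n)
                queue.append(n)
    return inconsistent, stuck


reliable = check(tm_may_fail=False)
faulty_inconsistent, faulty_stuck = check(tm_may_fail=True)
```

With a reliable TM both lists come back empty. With a TM that may fail, Consistency still holds, but the search finds stuck states such as one prepared RM and two aborted RMs with the TM unavailable, which matches the shape of the counterexample trace above.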

## BTM modeling

In order to avoid any downtime for many transactions the TM may be commandeering, we add a Backup TM  (BTM) to quickly take over when the TM becomes unavailable. The BTM uses the same logic as the TM to make decisions. And for simplicity we assume the BTM does not fail.

When we model check with the BTM process added, we get a new error message.

The BTM cannot rule for a commit, because our original canCommit condition asserted that all RMstates should be "prepared" and didn't account for the case where some RMs have already learned the "commit" decision from the original TM before the BTM takes over. So we revise our canCommit condition to account for this.
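One way to express such a revision, sketched in Python rather than TLA+, is to let canCommit also accept RMs that are already "committed". This is an assumption about the shape of the fix based on the description above; the exact wording of the second line of canCommit in the TLA+ model may differ.

```python
from typing import Sequence


def can_commit_original(rm_states: Sequence[str]) -> bool:
    """Original condition: every RM is prepared."""
    return all(s == "prepared" for s in rm_states)


def can_commit_revised(rm_states: Sequence[str]) -> bool:
    """Revised condition (assumed form): every RM is prepared, or has
    already learned the commit decision from the original TM."""
    return all(s in ("prepared", "committed") for s in rm_states)


# RM1 already committed on the original TM's decision before the TM crashed.
after_takeover = ["committed", "prepared", "prepared"]
```

With the original predicate the BTM is blocked in the `after_takeover` state; with the revised one it can rule for a commit and let the remaining RMs finish.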

Success! When we check the model, we find that both Consistency and Completed are satisfied, as the BTM can take over and finalize the transaction when the TM fails. Here is the 2PCwithBTM model in TLA+ (initially with the BTM and the second line of canCommit commented out). Here is the pdf corresponding to the model.

## What if RMs could also fail?

We assumed the RMs are reliable. Now we relax that to see how the protocol behaves in the presence of RM failure. We add an "unavailable" state to model a crash. In order to explore more behavior and model intermittent loss of availability, we allow a crashed RM to recover and continue the protocol by reading its state from its log. Here is the RM state transition diagram again, with the newly added "unavailable" state and transitions marked in red. And below that is the revised model for the RMs.

We also need to strengthen canAbort to take into account the unavailable state. The TM can rule for "abort" if any of the RMs is in "aborted" or "unavailable" state. If we omit this condition, an RM that has crashed (and never recovers) may prevent the transaction from making progress. Of course, precaution should be taken to ensure that the BTM cannot rule for an abort if there exists an RM that has learned of a "committed" decision from the original TM.
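A minimal Python sketch of such a guard, assuming the RM states are kept in a dictionary (the `can_abort` name and the data layout are illustrative, not taken from the TLA+ file):

```python
from typing import Mapping


def can_abort(rm_state: Mapping[str, str]) -> bool:
    """Abort is allowed if some RM aborted or crashed and no RM has committed."""
    some_failed = any(s in ("aborted", "unavailable") for s in rm_state.values())
    none_committed = not any(s == "committed" for s in rm_state.values())
    return some_failed and none_committed


crashed_rm = {"RM1": "prepared", "RM2": "unavailable", "RM3": "prepared"}
already_committed = {"RM1": "committed", "RM2": "unavailable", "RM3": "prepared"}

print(can_abort(crashed_rm))
print(can_abort(already_committed))
```

Note that the guard can only look at the states the RMs have actually recorded. An RM that has read a commit decision but not yet changed its state still looks "prepared" to this check.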

## Model checking

When we check this model, we discover an inconsistency problem! How did this happen? Let's follow the execution trace for the counterexample.

On State=6, all the RMs are in the prepared state, the TM has made a commit decision, and RM1 has seen that decision and moved to label RC, meaning that it is ready to change its state to "committed". (Keep RM1 in mind, this gun will fire in the last act.) Unfortunately, the TM crashes in State=7, and RM2 becomes unavailable in State=8. In State=9, the BTM takes over, reads the RMstates as <prepared, unavailable, prepared>, and rules for an abort in State=10. Remember RM1? It finally acts on the commit decision it saw from the original TM, and transitions to committed in State=11. In State=13, RM3 heeds the abort decision from the BTM and transitions to aborted, and we have Consistency violated, with RM1 deciding for committed and RM3 for aborted.
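The trace can be replayed by hand in a few lines of Python. This is only a sketch of the interleaving described above, not the model checker's output: `btm_decide` is a hypothetical stand-in for the BTM's rules, and `rm1_pending` stands for the decision RM1 has read but not yet applied (label RC in the model).

```python
def btm_decide(rm_state):
    """Hypothetical BTM rule: decide from the RM states visible right now."""
    states = list(rm_state.values())
    if all(s == "prepared" for s in states) or "committed" in states:
        return "commit"
    if any(s in ("aborted", "unavailable") for s in states):
        return "abort"
    return None  # nothing to decide yet


rm_state = {"RM1": "prepared", "RM2": "prepared", "RM3": "prepared"}

# State 6: the TM rules commit; RM1 reads the decision but has not applied it.
rm1_pending = "commit"
# State 7: the TM crashes. State 8: RM2 becomes unavailable.
rm_state["RM2"] = "unavailable"
# States 9-10: the BTM takes over and rules on what it can see.
btm_decision = btm_decide(rm_state)
# State 11: RM1 finally applies the decision it read from the original TM.
rm_state["RM1"] = "committed" if rm1_pending == "commit" else "aborted"
# State 13: RM3 follows the BTM's decision.
rm_state["RM3"] = "committed" if btm_decision == "commit" else "aborted"

final = set(rm_state.values())
consistent = not ("committed" in final and "aborted" in final)
print(btm_decision, rm_state, consistent)
```

The BTM's view of RM1 is "prepared" because the pending decision lives only in RM1's local control state, so the BTM rules for an abort and the run ends with one RM committed and another aborted.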

In this case, the BTM made a decision that violated consistency. On the other hand, if we had made the BTM wait until there are no "unavailable" RMs, it could be waiting forever on a crashed node, and this would then violate the progress condition.

The updated TLA+ model file is available here, and the corresponding pdf here.

## FLP impossibility

So, what happened?  We hit the Fischer, Lynch, Paterson (FLP) impossibility result: In an asynchronous system, it is impossible to solve consensus (satisfy both safety and progress conditions) in the presence of crash faults.

In our example, the BTM cannot correctly decide whether RM2 has crashed or not, and rules incorrectly for an abort. If there were only one decision maker, just the TM, the inaccurate failure detector would not be an issue. The RMs would follow whatever the TM decides, and both Consistency and Progress would be satisfied.

What surfaces and illustrates the problem is that we have two decision makers, the TM and the BTM, looking at the state of the RMs at different times and making different decisions. This asymmetry of information is the root of all evil in distributed systems.

This problem doesn't go away even with the three-phase commit extension. Here is the three-phase commit extension modeled in TLA+ (here is the pdf version), and below is the error trace that shows this time Progress is violated. (The Wikipedia page on three-phase commit says that on a timeout after receiving the precommit order, the RMs, i.e., RM2 and RM3, should go ahead and commit, but that would instead cause a consistency problem in this case.)

## Paxos, making the world a better place

Not all hope is lost. We have Paxos. Paxos treads carefully within the bounds of the FLP result. The innovation in Paxos is that it is always safe (even in the presence of inaccurate failure detectors, asynchronous execution, and faults), and it eventually makes progress when consensus gets in the realm of possibility.

You can emulate the TM with a Paxos cluster of 3 nodes and that will solve the inconsistent TM/BTM problem. Or as Gray and Lamport show in the transaction commit paper, if the RMs use the Paxos box to store their decisions concurrently with replying to the TM, this shaves off the extra one message delay from the obvious protocol.

## Monday, December 17, 2018

My New Year's resolution for 2018 was to "ask more/better/crazier questions."

To realize this resolution, I knew I needed to implement it as a system. So I committed to "ask at least a couple of MAD questions in each blog post." After close to 12 months and 70 posts, this is what I learned from the exercise.

## 1. Questioning is very useful

Adding the MAD questions section significantly extended/complemented my posts, and with little effort. Just by adding this section, I went for another 15-20 minutes, forcing myself out of my default mode. Sometimes I grudgingly did it, just because I had committed to it, but almost always I was surprised by how useful it was.

Many times the MAD questions section got longer than the main post body. And on average it was about 25% of the main body. Questioning also improved the main content, as I incorporated the findings/leads from some of the MAD questions into the main body of the post. I keep many half-baked posts in my blog.org file, and had it not been for the questioning that extended the content, I might not have published many of those posts.

## 2. Questioning is a frame of mind

I found that questioning delivered those good results with relatively little effort.

I think the reason for this is that our default mode is to not question things. We have been implicitly taught this in our families, schools, and workplaces. Asking too many questions, especially hard ones, is frowned upon in polite company. Moreover, the human brain is programmed to save energy and go easy on itself, so it prefers shallow tasks and tries to avoid the intense thinking sessions required for many creative tasks. By asking questions it is possible to prime the pump and keep the brain more engaged.

Coming up with the questions is not difficult, once you get out of your default mode and give yourself the license to indulge in the questioning frame of mind.

By calling these questions MAD questions, I gave myself the license/protection to go wild, uninhibited by traditional assumptions/constraints. This lowered the bar and let me approach the topic with a beginner's mind, as an outsider. This made it easier to ask more original questions.

By detaching and discarding the baggage of tradition, you can start working from the first principles. You should remember that questioning is the beginning, not the end. There is a separation of concerns: you first focus on what needs to be done, and not how it needs to be done. So, initially, you don't need to be afraid of the difficulty of the answers as it is not your concern for now.

So, don't feel intimidated to question the basic assumptions that people in your field take for granted. They may well be due for replacement anyway.

## 4. To get to the good questions, get over with the bad questions

Bad questions lead to good questions. So roll with them and exhaust them to get to the good questions. Don't quit prematurely, as good things will happen if you go a bit deeper. And do this in a stress-free manner being assured that something will eventually come if you persist a bit more.

You should approach this like brainstorming and freewriting.

## 5. A good question often involves a perspective change

A good question is like a good joke. It has an unexpected element and provides a perspective change. After I get through some questions and eventually arrive at a good question, I have a visceral reaction similar to that of hearing a good joke.

A good question gives you a perspective change that is productive. It opens a new area to explore. It creates new relations/connections.

After following the above suggestions for asking questions, the why and what if questions can help you get to the perspective-changing questions.

A change in perspective is worth 80 IQ points. --Alan Kay

1. Is questioning how we think? If so, can we think better by questioning better?
I feel like my thinking is done mostly via asking questions. But of course I also need to be thinking to ask those questions in the first place, don't I? So I don't know who is the real hero here. (This reminds me of the joke: "My belt holds my pants up, but the belt loops hold my belt up. I don't really know what's happening down there. Who is the real hero?" ---Mitch Hedberg)

To be fair, let's call questioning a catalyst for thinking. It seems like they bootstrap each other.

Questions also serve as an interrupt for sanity check, self-inspection, and going meta on the task.

Finally, questions also play an important role in the way we master/internalize things. I relate closely to Barry Diller's approach to learning by deconstructing and understanding from the fundamental elements.
DILLER:​ ​By purpose or by temperament, I'm only interested in those things where I haven’t figured it out, and I really do think that however it happened, that when I was presented endlessly with things I didn’t understand, the only thing I could do—because my brain is slower, and therefore is more literal—and therefore my process is, I have to get it down to its tiniest particle, or else... I can’t come in and understand an equation, if you can put it in equation terms, unless I de-equation it—I can’t pick it up. So I’m forced – by a lack of brain matter, I am forced to – no I’m not saying it – it’s true! To break it down as hardly low as I can get it, and only then—and that’s learning. That’s real – that is joyous work to me, is getting through those layers, down to something, and then once I’m down there, once I’m actually at the very, very base of it, I can actually start to do something good.

2. Can we treat questioning as a tool?
I think questioning can be viewed as a tool, and we will increasingly start to treat it as a valuable one in the age of big data and machine learning. The information is already out there; we just need to ask the right questions of the search engines, expert systems, and machine learning based digital assistants. The easy/usual questions will already be asked by the machine learning systems. To differentiate ourselves, we should learn how to ask the good questions. In Synthetic Serendipity, Vernor Vinge's short sci-fi story, the high school had a course called "Search and Analysis" that aimed to teach students to ask the right questions of the search engines and expert systems.

Even when the information is not there, if you ask the right question and find the right perspective from which to approach it, the solution part is not that hard. Questioning acts as a tool for framing/reframing. This is what the master of the gedankenexperiment has to say on questioning.
If I had an hour to solve a problem and my life depended on the solution, I would spend the first 55 minutes determining the proper question to ask... for once I know the proper question, I could solve the problem in less than five minutes. --Albert Einstein

So, going forward, shouldn't we have more tools/support for questioning as a tool? What are some tools (mindmaps, etc.) you know that are designed to help support questioning/framing?

3. Is it possible to teach how to ask great questions?
Answers are teachable, so we teach answers. But we don't teach how to question. In kindergarten and primary school, the kids are already pretty good at questioning, but come middle school most kids stop asking questions. I don't know, maybe students who ask questions come across as hard to manage and contrarian, and get discouraged by teachers and adults.

We (academicians) try to teach questioning during PhD by way of examples/apprenticeship. I am not aware of any organized/formal way that asking better questions is being taught. There is no course or good book on it, as far as I can see.

However, I still believe questioning is a skill and can be taught. What if we could create a course on this? It will likely be a hands-on learn-by-case-studies class, but at least we can provide more support and material than saying "watch me ask questions and start imitating".

## Friday, December 14, 2018

### Year in Review

As is the custom in a year-end post, here are some highlights among my 2018 posts.