Verifying Transactional Consistency of MongoDB

This paper presents pseudocodes for the transaction protocols for the three possible MongoDB deployments: WiredTiger, ReplicaSet, and ShardedCluster, and shows that these satisfy different variants of snapshot isolation: namely StrongSI, RealtimeSI, and SessionSI, respectively.

Background

MongoDB transactions have evolved in three stages (Figure 1):

  • In version 3.2, MongoDB used the WiredTiger storage engine as the default storage engine. Utilizing the Multi-Version Concurrency Control (MVCC) architecture of WiredTiger storage engine, MongoDB was able to support single-document transactions in the standalone deployment.
  • In version 4.0, MongoDB supported multi-document transactions in replica sets (which consists of a primary node and several secondary nodes),
  • In version 4.2, MongoDB further introduced distributed multi-document transactions in sharded clusters (which is a group of multiple replica sets among which data is sharded).

I love that the paper managed to present the transactional protocols on these three deployment types in a layered/superimposed manner. We start with the bottom layer, the WiredTiger transactions in Algorithm 1. Then the replicaset algorithm, Algorithm 2, is presented, which uses primitives from Algorithm 1. Finally, the ShardedCluster transactions algorithm is presented, using primitives from Algorithm 2. Ignore the underlined and highlighted lines in Algorithms 1 and 2; they are needed for the higher layer algorithms, which are discussed later on.

If you need a primer on transaction isolation layers, you can check this and this. The TLA+ model I presented for snapshot isolation is also useful to revisit to understand how snapshot isolated works in principle. 

WiredTiger (WT) transactions

Clients interact with WiredTiger via sessions. Each client is bound to a single session with a unique session identifier wt_sid. At most one transaction is active on a session at any time. Intuitively, each transaction is only aware of all the transactions that have already been committed before it starts. To this end, a transaction txn maintains txn.concur (the set of identifiers of currently active transactions that have obtained their identifiers) and txn.limit (the next transaction identifier, tid, when txn starts).

A client starts a transaction on a session wt_sid by calling wt_start, which creates and populates a transaction txn (lines 1:2–1:5). Particularly, it scans wt_global to collect the concurrently active transactions on other sessions into txn.concur. tid tracks the next monotonically increasing transaction identifier to be allocated. When a transaction txn starts, it initializes txn.tid to 0. The actual (non-zero) txn.tid is assigned when its first update operation is successfully executed. A transaction txn with txn.tid $\neq$ 0 may be aborted due to a conflict caused by a later update.

To read from a key, we iterate over the update list store[key] forward and returns the value written by the first visible transaction (line 1:10). To update a key, we first check whether the transaction, denoted txn, should be aborted due to conflicts (lines 1:14–1:17). To this end, we iterates over the update list store[key]. If there are updates on key made by transactions that are invisible to txn and are not aborted, txn will be rolled back. If txn passes the conflict checking, it is assigned a unique transaction identifier, i.e., tid, in case it has not yet been assigned one (line 1:19). Finally, the key-value pair ⟨key,val⟩ is added into the modification set txn.mods and is inserted at the front of the update list store[key].

To commit the transaction on session wt_sid, we simply reset wt_global[wt_sid] to $\bot tid$, indicating that there is currently no active transaction on this session (line 1:32). To roll back a transaction txn, we additionally reset txn.tid in store to −1 (line 1:38). Note that read-only transactions (which are characterized by txn.tid=0) can always commit successfully.

Replica Set Transactions

A replica set consists of a single primary node and several secondary nodes. All transactional operations, i.e., start, read, update, and commit, are first performed on the primary. Committed transactions are wholesale-replicated to the secondaries via a leader-based consensus protocol similar to Raft. In other words, before the completion of the transaction, the entire effect of transaction is sent to secondaries and are majority replicated with the assigned timestamp.

We don't go in to this in the protocol description here, but there is a clever speculative snapshot isolation algorithm used by the primary for transaction execution. I summarized that at the end of my review for "Tunable Consistency in MongoDB". Here is the relevant part: MongoDB uses an innovative strategy for implementing readConcern within transactions that greatly reduced aborts due to write conflicts in back-to-back transactions. When a user specifies readConcern level “majority” or “snapshot”, the returned data is guaranteed to be committed to a majority of replica set members. Outside of transactions, this is accomplished by reading at a timestamp at or earlier than the majority commit point in WiredTiger. However, this is problematic for transactions: It is useful for write operations to read the freshest version of a document, since the write will abort if there is a newer version of the document than the one it read. This motivated the implementation of “speculative” majority and snapshot isolation for transactions. Transactions read the latest data for both read and write operations, and at commit time, if the writeConcern is w:“majority”, they wait for all the data they read to become majority committed. This means that a transaction only satisfies its readConcern guarantees if the transaction commits with writeConcern w:“majority”. 

ReplicaSet uses hybrid logical clocks (HLC) as the read and commit timestamps of transactions. When a transaction starts, it is assigned a read timestamp on the primary such that all transactions with smaller commit timestamps have been committed in WiredTiger. That is, the read timestamp is the maximum point at which the oplog of the primary has no gaps (1:42).

When the primary receives the first operation of an transaction (lines 2:4 and 2:11), it calls open_wt_session to open a new session wt_sid to WiredTiger, start a new WiredTiger transaction on wt_sid, and more importantly set the transaction’s read timestamp. The primary delegates the read/update operations to WiredTiger (lines 2:7 and 2:14). If an update succeeds, the ⟨key,val⟩ pair is recorded in txn_mods[rs_sid] (line 2:16). To commit a transaction, the primary first atomically increments its cluster time ct via tick, takes it as the transaction’s commit timestamp (line 2:23), uses it to update max_commit_ts, and records it in wt_global (lines 2:24 and 1:46).

If this is a read-only transaction, the primary appends a noop entry to its oplog (line 2:27; Section 4.1.2). Otherwise, it appends an entry containing the updates of the transaction. Each oplog entry is associated with the commit timestamp of the transaction. Then, the primary asks WiredTiger to locally commit this transaction in wt_commit (line 2:30), which associates the updated key-value pairs in store with the commit timestamp (line 1:31). Note that wt_commit needs not to be atomically executed with tick and wt_set_commit_ts.

Finally, the primary waits for all updates of the transaction to be majority committed (line 2:31). Specifically, it waits for last_majority_committed \geq ct, where last_majority_committed is the timestamp of the last oplog entry that has been majority committed.

Sharded cluster transactions

A client issues distributed transactions via a session connected to a mongos. The mongos, as a transaction router, uses its cluster time as the read timestamp of the transaction and forwards the transactional operations to corresponding shards. The shard which receives the first read/update operation of a transaction is designated as the transaction coordinator.

If a transaction has not been aborted due to write conflicts in sc_update, the mongos can proceed to commit it. If this transaction is read-only, the mongos instructs each of the participants to directly commit locally via rs_commit; otherwise, the mongos instructs the transaction coordinator to perform a variant of two-phase commit (2PC) that always commits among all participants (line 4:9). So, the coordinator sends a prepare message to all participants. After receiving the prepare message, a participant computes a local prepare timestamp and returns it to the coordinator in a prepare_ack message. When the coordinator receives prepare_ack messages from all participants, it calculates the transaction’s commit timestamp by taking the maximum of all prepare timestamps (line 4:14), and sends a commit message to all participants. After receiving dec_ack messages from all participants, the coordinator replies to the mongos (line 4:18).

Consider a session sc_sid connected to a mongos. We use read_ts[sc_sid] to denote the read timestamp, assigned by the mongos, of the currently active transaction on the session. ShardedCluster uses HLCs which are loosely synchronized to assign read and commit timestamps to transactions. Due to clock skew or pending commit, a transaction may receive a read timestamp from a mongos, but the corresponding snapshot may not yet be fully available at transaction participants. This will lead to delaying of the read/update operations until the snapshot becomes available. These cases are referred to as Case-XXX in the below description.

If this is the first operation the primary receives, it calls sc_start to set the transaction’s read timestamp in WiredTiger (line 4:23). In sc_start, it also calls wait_for_read_concern to handle Case-Clock-Skew and Case-Holes (line 4:24). The primary then delegates the operation to ReplicaSet (lines 4:3 and 4:7). To handle Case-Pending-Commit-Read, rs_read has been modified to keep trying reading from WiredTiger until it returns a value updated by a committed transaction (line 2:8). To handle Case-Pending-Commit-Update, rs_update first performs an sc_read on the same key (line 2:12). Moreover, if the update fails due to write conflicts, the mongos will send an abort message to the primary nodes of all other participants, without entering 2PC.

In 2PC, the transaction coordinator behaves as we described above. On the dual side, after receiving a prepare message, the participant advances its cluster time and takes it as the prepare timestamp (lines 4:27, 4:28, 1:54, and 1:58). Note that the transaction’s tid in wt_global is reset to ⊥tid (line 1:59). Thus, according to the visibility rule, this transaction is visible to other transactions that starts later in WiredTiger. Next, the participant creates an oplog entry containing the updates executed locally or a noop oplog entry for the “speculative majority” strategy. Then, it waits until the oplog entry has been majority committed (line 4:34). When a participant receives a commit message, it ticks its cluster time. After setting the transaction’s commit timestamp (line 4:39), it asks WiredTiger to commit the transaction locally (line 4:40). Note that the status of the transaction is changed to committed (line 1:70). Thus, this transaction is now visible to other waiting transactions (line 2:8). Then, the participant generates an oplog entry containing the commit timestamp and waits for it to be majority committed.

Discussion

The paper provides a nice simplified/understandable overview of MongoDB transactions. It mentions some limitations of this simplified model. The paper assumed that each procedure executes atomically, but the implementation of MongoDB is highly concurrent with intricate locking mechanisms. The paper also did not consider failures and explore the fault-tolerance and recovery of distributed transactions.

As an interesting future research direction, I double down on the cross-layer opportunities I had mentioned in my previous post. The layered/superimposed presentation in this paper strengthens my hunch that we can have more cross layer optimization opportunities in MongoDB going forward.

So what did we learn about MongoDB transactions. They are general transactions, rather than limited one-shot transactions. They use snapshot isolation, reading from a consistent snapshot, and aborting only on a write-write conflict, similar to major RDBMS transactions.

They are "OCC", but thanks to the underlying WiredTiger holding the lock on first access, they are less prone to aborting then a pure OCC transaction. An in-progress transaction stops later writes (be it from other transactions or single writes) instead of getting aborted by them. In other words, the first writer claims the item (for some time).

That being said, this is not truly locking and holding. MDB transactions still favor progress, as they do not like waiting. They would just die instead of waiting, ain't nobody got time for that.

Comments

Mark Callaghan said…
How does the latency from this compare to what CockroachDB does -- where reads might have to wait out the max clock drift?
* "If this is a read-only transaction, the primary appends a noop entry to its oplog"
* "If this transaction is read-only, the mongos instructs each of the participants to directly commit locally via rs_commit"

https://www.cockroachlabs.com/blog/living-without-atomic-clocks/

Popular posts from this blog

Hints for Distributed Systems Design

Learning about distributed systems: where to start?

Making database systems usable

Looming Liability Machines (LLMs)

Advice to the young

Foundational distributed systems papers

Distributed Transactions at Scale in Amazon DynamoDB

Linearizability: A Correctness Condition for Concurrent Objects

Understanding the Performance Implications of Storage-Disaggregated Databases

Designing Data Intensive Applications (DDIA) Book