Fault-Tolerant Replication with Pull-Based Consensus in MongoDB

This paper, from NSDI 2021, presents the design and implementation of strongly consistent replication in MongoDB using a consensus protocol derived from Raft.

Raft provides fault-tolerant state machine replication (SMR) over asynchronous networks. Like most SMR protocols, Raft uses push-based replication. MongoDB, however, uses a pull-based replication scheme, so integrating MongoDB's SMR with Raft raised several challenges. The paper examines and solves these challenges, and explains the resulting MongoSMR protocol (my term, not the paper's).

The paper restricts itself to the strongest consistency level, linearizability, but it also discusses how supporting weaker consistency models shaped decisions in MongoDB's replication protocol. The paper also covers extensions and optimizations of the MongoSMR protocol, but I skip those for brevity. I also skip the evaluation section and focus on the core of the SMR protocol.



Unlike conventional primary-backup replication schemes where updates are usually pushed from the primary to the secondaries, in MongoDB a secondary pulls updates from other servers, and not necessarily from the primary.

The pull-based approach provides more control over how data is transmitted over the network. Depending on users' needs, data transmission can follow a star topology, a chained topology, or a hybrid of the two. This has big implications for performance and monetary cost. For example, when deployed in clouds like Amazon EC2, data transmission inside a datacenter is free and fast, but transmission across datacenters is expensive and bandwidth-limited. Using a chained topology rather than a star topology, a secondary can sync from another secondary in the same datacenter instead of using up another costly cross-datacenter link to the primary.

In earlier releases, MongoDB assumed a semi-synchronous network: either failover was controlled manually, or all messages were assumed to arrive within 30 seconds for failure detection. In 2015, the MongoDB replication scheme was remodeled based on the Raft protocol. This new protocol (MongoSMR, the topic of this paper) guarantees safety in an asynchronous network (i.e., messages can be arbitrarily delayed or lost) and supports fully autonomous failure recovery with a shorter failover time. Like its predecessor, MongoSMR remains pull-based.


An oplog is a sequence of log entries that feeds the SMR. Each log entry contains a database operation. Figure 1 shows an example oplog entry; notice that each entry is a JSON document. The oplog is stored in the oplog collection, which behaves in almost all regards like an ordinary collection of documents. The oplog collection appends new entries at one end and automatically deletes its oldest documents at the other end once they are no longer needed.

Each slot of the SMR (i.e., each oplog entry) is identified by a timestamp rather than a sequence number. Each oplog entry is assigned a timestamp and annotated with the term of the primary. The timestamp is a monotonically increasing logical clock that predates this work. A pair of term and timestamp, referred to as an OpTime, uniquely identifies an oplog entry in a replica set and gives a total order over all oplog entries across all replicas.
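To make the ordering concrete, here is a minimal Python sketch of an OpTime. The class name and fields are illustrative, not MongoDB's actual types, and comparing the term before the timestamp is an assumption of this sketch (it matches how Raft compares log positions by term first); the paper only says that the pair yields a total order.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class OpTime:
    """Illustrative (term, timestamp) pair identifying one oplog entry.

    order=True compares fields in declaration order, so OpTimes compare
    by term first and by timestamp second (an assumption of this sketch).
    """
    term: int
    timestamp: int


# Hypothetical entries from two terms, listed out of order.
entries = [OpTime(2, 105), OpTime(1, 110), OpTime(2, 101)]
ordered = sorted(entries)
print(ordered)
# [OpTime(term=1, timestamp=110), OpTime(term=2, timestamp=101), OpTime(term=2, timestamp=105)]
```

Because the dataclass is frozen, OpTimes are also hashable and can serve as dictionary keys, e.g. when a primary tracks per-replica log positions.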

Data replication

In Raft the primary initiates AppendEntries RPCs to secondaries to replicate new log entries. In MongoSMR, the primary waits for the secondaries to pull the new entries that are to be replicated.

The principle is to split the data synchronization that Raft performs via AppendEntries into two parts: replicas pulling new data from their peers, and replicas reporting their latest replication status so that a request can commit once it reaches a majority of replicas.

The primary processes two types of RPCs from secondaries: PullEntries and UpdatePosition. A secondary will use PullEntries to fetch new logs, and use UpdatePosition to report its status so that the primary can determine which oplog entries have been safely replicated to a majority of servers and commit them. Similar to Raft, once an entry is committed, all prior entries are committed indirectly.


A secondary continuously sends PullEntries to the selected sync source (which may not be the primary) to retrieve new log entries. The PullEntries RPC includes the latest oplog timestamp (prevLogTimestamp) of the syncing server as an argument.

When receiving PullEntries, a server will reply with its oplog entries after and including that timestamp if it has a longer or the same log, or the server could reply with an empty array if its log sequence is shorter. Before returning a response when the log is the same, PullEntries waits for new data for a given timeout (5 seconds by default) to avoid busy looping.
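The PullEntries reply rule can be sketched as follows. This is a simplified, hypothetical handler: the log is a Python list of (timestamp, op) tuples sorted by timestamp, and the default 5-second wait for new data is omitted. Returning the entry at prevLogTimestamp itself presumably lets the syncing node confirm that its last entry matches the source's log; treat that interpretation as an assumption.

```python
from bisect import bisect_left


def pull_entries(source_log, prev_log_timestamp):
    """Serve a PullEntries request (simplified sketch, no blocking wait).

    source_log: list of (timestamp, op) tuples sorted by timestamp.
    prev_log_timestamp: latest oplog timestamp on the syncing node.
    """
    if not source_log or source_log[-1][0] < prev_log_timestamp:
        # The sync source's log is shorter than the requester's: send nothing.
        return []
    timestamps = [ts for ts, _ in source_log]
    start = bisect_left(timestamps, prev_log_timestamp)
    # Include the entry at prev_log_timestamp itself ("after and including"),
    # assumed here to let the requester check that its last entry matches.
    return source_log[start:]


log = [(1, "a"), (2, "b"), (3, "c")]
print(pull_entries(log, 2))  # [(2, 'b'), (3, 'c')]
```

A real sync source would block for up to the timeout when the returned list contains only the requester's own last entry, rather than answering immediately.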


After retrieving new entries into its local oplog with PullEntries, the secondary sends UpdatePosition to its sync source to report on its latest log entry's OpTime.

When receiving the UpdatePosition, the server will forward the message to its sync source, and so forth, until the UpdatePosition reaches the primary.
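The hop-by-hop forwarding of UpdatePosition along a chain of sync sources might look like the following toy model. The Node class and its fields are hypothetical: a node with no sync source stands in for the primary, and OpTimes are plain (term, timestamp) tuples.

```python
class Node:
    """Toy replica that forwards UpdatePosition toward the primary (hypothetical)."""

    def __init__(self, name, sync_source=None):
        self.name = name
        self.sync_source = sync_source  # None means this node is the primary
        self.positions = {}  # replica name -> latest known OpTime (primary only)

    def update_position(self, reporter, optime, hops=()):
        hops = hops + (self.name,)
        if self.sync_source is not None:
            # Not the primary: pass the report on to our own sync source.
            return self.sync_source.update_position(reporter, optime, hops)
        # At the primary: keep the newer of the stored and reported OpTimes.
        known = self.positions.get(reporter)
        if known is None or optime > known:
            self.positions[reporter] = optime
        return hops


# Chained topology: C syncs from B, which syncs from the primary A.
a = Node("A")
b = Node("B", sync_source=a)
c = Node("C", sync_source=b)
print(c.sync_source.update_position("C", (1, 7)))  # ('B', 'A')
```

The returned tuple records the path the report took, which makes it easy to see that a report from the end of the chain traverses every intermediate secondary before reaching the primary.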

The primary maintains a non-persistent, in-memory map that records the latest known log entry's OpTime on every replica, including itself, as that replica's log position. When it receives an UpdatePosition carrying a newer OpTime, the primary replaces its local record with the received OpTime. The primary then counts the log positions of all replicas: if a majority of replicas have the same term and the same or greater timestamp, the primary updates its lastCommitted to that OpTime and notifies secondaries of the new lastCommitted by piggybacking it onto other messages, such as heartbeats and responses to PullEntries. lastCommitted is also referred to as the commit point.
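The primary's majority count can be sketched as a small function. It assumes, as in Raft, that "the same term" means the primary's current term, and that the positions map includes the primary's own entry; the function name and data layout are illustrative. A real primary would also never move lastCommitted backwards.

```python
def compute_commit_point(positions, primary_term, n_replicas):
    """Return the OpTime held (or exceeded) by a majority, or None.

    positions: replica -> (term, timestamp), including the primary itself.
    Only positions in the primary's current term are counted (assumption).
    """
    majority = n_replicas // 2 + 1
    same_term = sorted(
        (ts for term, ts in positions.values() if term == primary_term),
        reverse=True,
    )
    if len(same_term) < majority:
        return None
    # The majority-th largest timestamp is reached by at least `majority` replicas.
    return (primary_term, same_term[majority - 1])


positions = {"A": (2, 10), "B": (2, 8), "C": (2, 7), "D": (1, 5), "E": (2, 3)}
print(compute_commit_point(positions, primary_term=2, n_replicas=5))  # (2, 7)
```

Here A, B, and C all hold term-2 entries at timestamp 7 or later, so (2, 7) is on a majority; D's term-1 position does not count toward the term-2 majority.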

Oplog replication

Recall that each oplog entry is a document, and the oplog is a collection. MongoSMR leverages this to implement oplog replication as a streaming query. Instead of the syncing node issuing a continuous stream of RPCs, PullEntries is implemented as a query on the oplog collection with a "greater than or equal to" filter on the timestamp field. The query is easy to optimize because the oplog is naturally ordered by timestamp. Database cursors let the syncing node fetch oplog entries in batches and also let the RPC work in a streaming manner, so a sync source can send new data without waiting for a new request, reducing replication latency.
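The streaming-query idea can be modeled with a Python generator standing in for a database cursor. This is a sketch, not MongoDB's cursor machinery: the oplog is a list of dicts ordered by "ts", the filter mirrors a `{"ts": {"$gte": start_ts}}` query, and batch_size is a hypothetical parameter. Because the generator re-checks the oplog length on every step, entries appended after the cursor opens are still delivered, loosely imitating how a sync source can send new data without a new request; a real tailable cursor would block awaiting data instead of finishing.

```python
from bisect import bisect_left


def oplog_cursor(oplog, start_ts, batch_size=2):
    """Yield batches of oplog entries whose "ts" is >= start_ts.

    oplog: list of dicts kept in timestamp order (the oplog's natural order).
    """
    timestamps = [entry["ts"] for entry in oplog]
    # Timestamp order lets us jump straight to the first match.
    pos = bisect_left(timestamps, start_ts)
    # Re-read len(oplog) each time so newly appended entries are streamed too.
    while pos < len(oplog):
        yield oplog[pos:pos + batch_size]
        pos += batch_size


oplog = [{"ts": 1}, {"ts": 2}, {"ts": 3}]
cursor = oplog_cursor(oplog, start_ts=2, batch_size=1)
print(next(cursor))      # [{'ts': 2}]
oplog.append({"ts": 4})  # a new write lands while the cursor is open
print(list(cursor))      # [[{'ts': 3}], [{'ts': 4}]]
```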

Sync source selection

MongoSMR introduces a Heartbeat RPC to decouple the heartbeat responsibility from Raft's AppendEntries RPC. Heartbeats are sent among *all* replicas and are used for liveness monitoring, commit point propagation, and sync source selection.

A server chooses a node as its sync source only if that node has newer oplog entries than itself, which it determines by comparing their log positions (learned via Heartbeats). Because log positions are totally ordered and every sync-source edge points to a strictly newer log, the replicas can never form a cycle of sync sources.
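The acyclicity argument can be checked with a small sketch. The selection policy here is hypothetical: among peers strictly ahead, pick the one with the lowest ping, using a single made-up ping table for all nodes. The only rule taken from the paper is "strictly newer log position"; positions are (term, timestamp) tuples.

```python
def choose_sync_source(me, positions, ping_ms):
    """Pick a sync source for `me` among peers whose log is strictly newer.

    positions: replica -> (term, timestamp) OpTime learned via heartbeats.
    ping_ms: replica -> round-trip time; lowest-ping tie-breaking is an
    assumed policy for this sketch.
    """
    ahead = [p for p, pos in positions.items() if p != me and pos > positions[me]]
    if not ahead:
        return None
    return min(ahead, key=lambda p: ping_ms[p])


def reaches_cycle(choices, start):
    """Follow sync-source edges from `start`; report whether any node repeats."""
    seen, node = set(), start
    while node is not None:
        if node in seen:
            return True
        seen.add(node)
        node = choices[node]
    return False


positions = {"A": (2, 10), "B": (2, 7), "C": (2, 7), "D": (2, 3)}
ping_ms = {"A": 80, "B": 2, "C": 3, "D": 1}
choices = {n: choose_sync_source(n, positions, ping_ms) for n in positions}
print(choices)  # {'A': None, 'B': 'A', 'C': 'A', 'D': 'B'}
```

B and C hold equal positions, so neither may pick the other; this strictness is what rules out a two-node cycle between replicas that are exactly in step.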


A crucial difference between MongoSMR and Raft

In Raft, if a server has voted for a higher term in an election, it cannot accept new log entries sent from an old primary with a lower term. In contrast, in MongoSMR, even if the sync source is a stale primary with a lower term, the server still fetches new log entries generated by that stale primary. This is because the PullEntries RPC does not check the term of the sync source (it only checks OpTimes).

Before we explore the correctness implications of this, let's discuss why MongoDB does not check the term of the sync source and instead eagerly replicates entries. The reasons are faster failovers and preserving uncommitted oplog entries.

In addition to strong consistency considered in this paper, MongoDB supports fast but weak consistency levels that acknowledge writes before they are replicated to a majority. Thus, a failover could cause a large loss of uncommitted writes. Though the clients are not promised durability with weak consistency levels, MongoDB still prefers to preserve these uncommitted writes as much as possible.

For this purpose, it introduced an extra phase for a newly elected primary: the primary catchup phase. The new primary will not accept new writes immediately after winning an election. Instead, it will keep retrieving oplog entries from its sync source until it does not see any newer entries, or a timeout occurs. This timeout is configurable in case users prefer faster failovers to preserving uncommitted oplog entries. This primary catchup design is only possible because in MongoSMR, a server (including the new primary) is allowed to keep syncing oplog entries generated by the old primary after voting for a higher term as long as it hasn't written any entry with its new term. This important difference between MongoDB and Raft allows MongoDB to preserve uncommitted data as much as possible during failovers.
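The catchup loop can be sketched as below. The `pull` callback is a hypothetical interface that returns entries strictly newer than a given timestamp, `timeout_s` plays the role of the configurable catchup timeout, and the clock is injectable only to keep the sketch testable; none of these names come from MongoDB.

```python
import time


def catch_up(pull, last_ts, timeout_s, clock=time.monotonic):
    """Pull from the sync source until nothing newer arrives or time runs out.

    pull(ts) -> list of (timestamp, op) entries with timestamp > ts
    (hypothetical interface). Returns (entries applied, new last timestamp).
    """
    deadline = clock() + timeout_s
    applied = []
    while clock() < deadline:
        batch = pull(last_ts)
        if not batch:
            break  # caught up: the sync source has nothing newer
        applied.extend(batch)
        last_ts = batch[-1][0]
    # Only after this point would the new primary start accepting writes.
    return applied, last_ts


old_primary_log = [(1, "a"), (2, "b"), (3, "c")]


def pull_from_old_primary(ts):
    # At most two entries per batch, to show that catchup can take several rounds.
    return [e for e in old_primary_log if e[0] > ts][:2]


print(catch_up(pull_from_old_primary, last_ts=1, timeout_s=10.0))  # ([(2, 'b'), (3, 'c')], 3)
```

Setting `timeout_s` to zero models a user who prefers the fastest possible failover: the loop exits immediately and no uncommitted entries are recovered.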


Let's explore the correctness implications of this difference. Consider Fig 2.c. There it seems like value 2 (in blue) is anchored and decided, but it is not! It is merely replicated widely: some of those copies are held by servers that have already moved to a newer term, and they pulled the entry only so that more entries could be recovered under weaker consistency levels.

If we take Raft's rule that "a log entry is committed once the leader that created the entry has replicated it on a majority of the servers" without any qualifiers, indeed value 2 would be counted as committed, only later to be overturned. To prevent cases like this from happening, MongoSMR adds a new argument in the UpdatePosition RPC: the term of the syncing server. The recipient of UpdatePosition will update its local term if the received term is higher. If the recipient is the stale primary, seeing a higher term will make the primary step down before committing anything, thus avoiding any safety issue.

Therefore, in the above example, when server A receives UpdatePosition from server C or D, it sees term 3 and steps down immediately without updating its lastCommitted. Even though the entry with term 2 is in a majority of servers' logs, it is not committed.
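The term check in UpdatePosition can be illustrated with a toy model of that example. The class and field names are illustrative, and the concrete numbers are made up rather than taken from Figure 2.c: a five-member replica set has a stale primary A in term 2, and the blue entry sits at OpTime (2, 5) on A, B, and C.

```python
class Primary:
    """Toy primary that counts UpdatePosition reports (illustrative only)."""

    def __init__(self, term, n_replicas):
        self.term = term
        self.is_primary = True
        self.n_replicas = n_replicas
        self.positions = {}  # replica -> (term, timestamp), including our own
        self.last_committed = None

    def on_update_position(self, sender, sender_term, optime):
        if sender_term > self.term:
            # A higher term exists: adopt it and step down without committing.
            self.term = sender_term
            self.is_primary = False
            return
        if not self.is_primary:
            return
        known = self.positions.get(sender)
        if known is None or optime > known:
            self.positions[sender] = optime
        majority = self.n_replicas // 2 + 1
        same_term = sorted(
            (ts for term, ts in self.positions.values() if term == self.term),
            reverse=True,
        )
        if len(same_term) >= majority:
            candidate = (self.term, same_term[majority - 1])
            if self.last_committed is None or candidate > self.last_committed:
                self.last_committed = candidate


a = Primary(term=2, n_replicas=5)
a.positions["A"] = (2, 5)             # A's own copy of the blue entry
a.on_update_position("B", 2, (2, 5))  # 2 of 5 copies: not yet a majority
a.on_update_position("C", 3, (2, 5))  # C has already voted in term 3
print(a.is_primary, a.term, a.last_committed)  # False 3 None
```

If C were still in term 2, the same report would give three of five copies and advance last_committed to (2, 5); the extra term argument is what lets A tell the two situations apart.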

This revised UpdatePosition maintains a key invariant of Raft: the Leader Completeness Property, which states that "if a log entry is committed in a given term, then that entry will be present in the logs of the leaders for all higher-numbered terms."

To verify that the MongoSMR design and implementation are correct, the team did extensive verification and testing of the protocol, including model checking with TLA+, unit testing, integration testing, fuzz testing, and fault-injection testing. The TLA+ specification of the protocol is publicly available.


Chain replication

Reading about MongoSMR may lead people to think that the lines between Paxos/Raft SMR and chain replication are somewhat blurred. If MongoSMR uses a chained topology, how does it differ from chain replication?

Well, there are big differences. In chain replication, you fall back to the next node in the chain as the new primary. This is a restrictive (inflexible in terms of options) albeit efficient way to have log monotonicity/retainment. In MongoSMR, any node can become the new primary. The log monotonicity/retainment comes from Raft leader election rather than the topology.

A bigger difference, of course, lies in the philosophy of the two approaches. Chain replication requires a separate consensus box to maintain the topology of the chain. Hosting that consensus box outside the replica set raises logistics and fate-sharing issues: does what the consensus box agrees on faithfully reflect the actual state of the replica set in the field? (Well, there are versions of chain replication that put consensus in the chain, and yes, that blurs the lines a bit.) In Paxos/Raft SMR, consensus is part of the SMR, so it comes with batteries included for fault-tolerant state machine replication.


We talked about the advantages of pull-based replication over push-based replication, and mentioned that it allows more flexible topologies than just the star topology with the primary in the middle. It is in fact possible to be flexible with push-based replication too, and to solve the throughput/performance problems stemming from the star topology. In our 2020 work, PigPaxos, we showed how to do this using relay nodes. At that time we did not know of MongoDB's chaining approach, so we did not mention it.


Here is the NSDI'21 presentation of the paper.

Aleksey has a review of the paper accompanied by a presentation video.

