Thursday, March 10, 2011

Megastore: Providing scalable, highly available storage for interactive services

Google's Megastore is the structured data store supporting Google App Engine. Megastore handles more than 3 billion write and 20 billion read transactions daily and stores a petabyte of primary data across many global datacenters. Megastore tries to provide the convenience of a traditional RDBMS with the scalability of NoSQL: it is a scalable transactional indexed record manager (built on top of Bigtable), providing full ACID semantics within partitions but weaker consistency guarantees across partitions (aka entity groups in Figure 1). To achieve these strict consistency requirements, Megastore employs a Paxos-based algorithm for synchronous replication across geographically distributed datacenters.

I have some problems with Megastore, but I will save them for the end of the review and explain Megastore first.

Megastore uses Paxos, a proven, optimal, fault-tolerant consensus algorithm with no requirement for a distinguished master. (Paxos is hard to cover in a blog post, as I mentioned earlier, so I will not attempt to.) The reason Paxos became much more popular than other consensus protocols (such as 2-phase and 3-phase commit) is that Paxos satisfies the safety properties of consensus even under asynchrony and arbitrary message loss. This does not conflict with the coordinated attack and FLP impossibility results. Those impossibility results say that you cannot guarantee consensus (both safety and liveness at the same time); they do not say that you have to sacrifice safety under message loss or asynchrony. So Paxos preserves safety under all conditions and achieves liveness when conditions improve outside the impossibility realm (fewer message losses, some timing assumptions start to hold).

Basic Paxos is a 2-phase protocol. In the prepare phase the leader replica tries to get the other nonleader replicas to recognize it as the leader for that consensus instance. In the accept phase the leader tries to get the nonleader replicas to accept the value it proposes. So basic Paxos requires at least two round trips, which is very inefficient for WAN usage. Fortunately, there have been several Paxos variants that optimize the performance. One optimization is MultiPaxos, which permits single-roundtrip writes by basically piggybacking the prepare phase of the upcoming consensus instance onto the accept phase of the current consensus instance.
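
To make the two phases concrete, here is a minimal single-decree sketch in Python. It is only an illustration under simplifying assumptions: everything runs in one process, there is no message loss, and the names `Acceptor` and `propose` are mine, not from the Megastore paper.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Acceptor:
    promised: int = -1                    # highest proposal number promised so far
    accepted_n: int = -1                  # proposal number of the accepted value, if any
    accepted_value: Optional[str] = None

    def prepare(self, n):
        """Phase 1: promise to ignore proposals numbered below n."""
        if n > self.promised:
            self.promised = n
            return True, self.accepted_n, self.accepted_value
        return False, self.accepted_n, self.accepted_value

    def accept(self, n, value):
        """Phase 2: accept unless a higher-numbered prepare was promised."""
        if n >= self.promised:
            self.promised = n
            self.accepted_n = n
            self.accepted_value = value
            return True
        return False


def propose(acceptors: List[Acceptor], n: int, value: str) -> Optional[str]:
    """Run both phases; return the chosen value, or None without a majority."""
    majority = len(acceptors) // 2 + 1
    promises = [a.prepare(n) for a in acceptors]
    granted = [(an, av) for ok, an, av in promises if ok]
    if len(granted) < majority:
        return None
    # Safety rule: adopt the value of the highest-numbered proposal
    # that some acceptor has already accepted, if there is one.
    prior_n, prior_value = max(granted, key=lambda p: p[0])
    if prior_n >= 0:
        value = prior_value
    acks = sum(a.accept(n, value) for a in acceptors)
    return value if acks >= majority else None
```

A later proposer with a higher number cannot overwrite a value that was already chosen; it is forced to re-propose that value. That is the safety property discussed above.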

Another optimization reduces the cost of reads. In basic Paxos, a read operation also needs to go through the two-phase protocol involving all the replicas (or at least a majority of them) to be serialized and served. The read optimization enables serving reads locally, but only at the leader replica. When a nonleader replica gets a read request, it has to forward it to the leader to be served locally there. The read optimization is made possible by having the leader impose a lease on its leadership at the other replicas (during which the replicas cannot accept another leader's prepare phase). Thanks to the lease, the leader is guaranteed to remain the leader until the lease expires, is guaranteed to have the most up-to-date view of the system, and can serve the read locally. The nice thing about the MultiPaxos and local-read-at-the-leader optimizations is that they do not modify any guarantees of Paxos; safety is preserved under all conditions, and progress is satisfied when the conditions are sufficient for making progress.

Megastore's use of Paxos
Megastore uses Paxos (with the MultiPaxos extension) in a pretty standard way to replicate a write-ahead log over a group of symmetric peers. Megastore runs an independent instance of the Paxos algorithm for each log position. The leader for each log position is a distinguished replica chosen alongside the preceding log position's consensus value. (This is the MultiPaxos optimization I discussed above.) The leader arbitrates which value may use proposal number zero. The first writer to submit a value to the leader wins the right to ask all replicas to accept that value as proposal number zero. All other writers must fall back on two-phase Paxos. Since a writer must communicate with the leader before submitting the value to other replicas, the system minimizes writer-leader latency. The policy for selecting the next write's leader is designed around the observation that most applications submit writes from the same region repeatedly. This leads to a simple but effective heuristic: use the closest replica.

However, in addition to the straightforward MultiPaxos optimization above, Megastore also introduces a surprising new extension to allow local reads at any up-to-date replica. This came as a big surprise to me because the best anyone could do before was to allow local-reads-at-the-leader. What was it that we were missing? I didn't get how this was possible the first time I read the paper; I only got it in my second look at the paper.

Coordinator, the rabbit pulled out of the hat
Megastore uses a service called the Coordinator, with servers in each replica's datacenter. A coordinator server tracks a set of entity groups (i.e., partitions mentioned in the first paragraph) for which its replica has observed all Paxos writes. For entity groups in that set, the replica is deemed to have sufficient state to serve local reads. If the coordinator claims that it is up to date, then the corresponding replica can serve a read for that entity group locally, else the other replicas (and a couple of network roundtrips) need to be involved.
But how does the coordinator know whether it is up to date or not? The paper states that it is the responsibility of the write algorithm to keep coordinator state conservative. If a write fails on a replica's Bigtable, it cannot be considered committed until the group's key has been evicted from that replica's coordinator. What does this mean? It means that write operations are penalized to improve the performance of read operations. In Megastore's Paxos, before a write is considered committed and ready to apply, all full replicas must have accepted it or had their coordinator invalidated for that entity group. In contrast, in Paxos a write can be committed with only a majority of replicas accepting the write.
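
The difference between the two commit rules can be sketched in a few lines of Python. This is a hedged illustration only: the dictionary layout and the function names are hypothetical, and real Megastore tracks this state per entity group across Bigtable and coordinator servers.

```python
def majority_committed(replicas):
    """Plain Paxos rule: a majority of replicas accepted the write."""
    accepted = sum(1 for r in replicas.values() if r["accepted"])
    return accepted > len(replicas) // 2


def megastore_committed(replicas):
    """Megastore rule as described above: every full replica has either
    accepted the write or had its coordinator invalidated."""
    return all(
        r["accepted"] or r["coordinator_invalidated"]
        for r in replicas.values()
    )


# Hypothetical state: replica C is unreachable and not yet invalidated.
state = {
    "A": {"accepted": True, "coordinator_invalidated": False},
    "B": {"accepted": True, "coordinator_invalidated": False},
    "C": {"accepted": False, "coordinator_invalidated": False},
}
```

With this state, `majority_committed(state)` holds but `megastore_committed(state)` does not. The write has to wait until C accepts or C's coordinator is invalidated, which is exactly the write penalty that buys local reads.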

Performance problems
Using synchronous replication over a WAN of course takes its toll on performance. This has been noticed and discussed here.

Of course, there is also the performance degradation due to waiting for an acknowledgement (or a timeout) from all replicas for a write operation. This also leads to a write availability problem. The paper argues, in the passage quoted below, that this is not a big problem in practice, but it is evident that partitions and failures result in write unavailability until they are recovered from.

"In the write algorithm above, each full replica must either accept or have its coordinator invalidated, so it might appear that any single replica failure (Bigtable and coordinator) will cause unavailability. In practice this is not a common problem. The coordinator is a simple process with no external dependencies and no persistent storage, so it tends to be much more stable than a Bigtable server. Nevertheless, network and host failures can still make the coordinator unavailable.

This algorithm risks a brief (tens of seconds) write outage when a datacenter containing live coordinators suddenly becomes unavailable--all writers must wait for the coordinator's Chubby locks to expire before writes can complete (much like waiting for a master failover to trigger). Unlike after a master failover, reads and writes can proceed smoothly while the coordinator's state is reconstructed. This brief and rare outage risk is more than justified by the steady state of fast local reads it allows."

In the abstract, the paper claims that Megastore achieves both consistency and availability, and this was a red flag for me, as we all know that something has to give due to the CAP theorem. And above we have seen that write availability suffers in the presence of a partition.

Exercise question
Megastore has a limit of "a few writes per second per entity group" because higher write rates will cause even worse performance due to the conflicts and retries of the multiple leaders (aka dueling leaders). Is it possible to adopt the partitioning consensus sequence numbers technique in "Mencius: building efficient replicated state machines for Wide-Area-Networks (WANs)" to alleviate this problem?

Flexible, Wide-Area Storage for Distributed Systems with WheelFS

One of my students, Serafettin Tasci, wrote a good review of this paper, so I will save time by using his review below, instead of writing a review myself.

In this paper the authors propose a storage system for wide-area distributed systems called WheelFS. The main contribution of WheelFS is its ability to adapt to different types of applications with different consistency, replica placement, or failure handling requirements. This ability is obtained via semantic cues that can be easily expressed in path names. For example, to force the primary site of a folder john to be X, we can specify the cue “home/users/.Site=X/john”. This representation preserves POSIX semantics and requires only minor changes to application software to use the cues.
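
As a rough illustration of how cues can ride along in path names, the Python sketch below separates cue components from ordinary path components. The detection rule (a component starting with a dot followed by an uppercase letter) is my own assumption for this sketch, and it ignores the scoping rules that a real WheelFS client applies to cues.

```python
def split_cues(path):
    """Return (path without cue components, dict of cues found in the path)."""
    cues = {}
    plain = []
    for component in path.split("/"):
        # Assumed convention: cues look like ".Name" or ".Name=value".
        if len(component) > 1 and component[0] == "." and component[1].isupper():
            name, _, value = component[1:].partition("=")
            cues[name] = value if value else True
        else:
            plain.append(component)
    return "/".join(plain), cues


clean_path, cues = split_cues("home/users/.Site=X/john")
# clean_path is "home/users/john" and cues is {"Site": "X"}
```

An application that does not know about cues still sees an ordinary path, which is why only minor changes are needed to adopt them.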

In WheelFS, there are 4 groups of semantic cues. Placement cues are used to arrange the location of primaries and replicas of a file or folder. Durability cues specify the number and usage of replicas. Consistency cues maintain a tradeoff between consistency and availability via timeout limits and eventual consistency. And finally, large read cues are useful in reading large files faster via entire file prefetching and usage of neighbor client caches.

WheelFS consists of clients and servers. Clients run applications that use WheelFS and use FUSE to present the distributed file system to these applications. In addition, all clients have local caches. Servers keep file and directory objects in storage devices. They group objects into structures called slices.

A third component of WheelFS is the configuration service, which keeps slice tables that contain object-server assignments. Each entry in the slice table contains a replication policy and the replicas for a slice. The configuration service is replicated on a small set of servers and uses Paxos for master election. It also provides a locking interface through which servers coordinate their use of slices and the slice table.

When a new file is created, WheelFS determines a replication policy from the cues and then consults the configuration service to see if a slice in the table matches the policy. If no slice matches the policy, the request is forwarded to a random server matching that policy. In addition, WheelFS uses a write-local policy in which the primary of a newly created file is the local server by default. This policy enables faster writes.

For replication, WheelFS uses primary/backup replication. For each slice there is a primary server and some backup servers. However, this scheme causes two problems. First, since all operations pass through the primary and updates require the primary to wait for ACKs from all backups, this replication scheme may cause significant delays in wide-area systems. Second, if the .SyncLevel cue is used, the replicas may get some of the updates too late. So, if the primary dies, a backup that replaces the primary may miss some updates and needs to learn the missing updates from other backups.

By default, WheelFS uses close-to-open consistency. But in case of a primary failure, all operations are delayed until the new primary starts. To avoid this delay, WheelFS provides the .EventualConsistency cue, which can be used whenever consistency requirements are not strict. In addition, WheelFS uses a write-through cache that improves consistency by writing each cached update to disk, at the cost of increased latency.

When clients need to use the data in their cache, they need to get an object lease from the primary to preserve consistency. But this also brings additional latency cost since the primary needs to wait for all leases to complete to make an update on the object.

In the experiments, the authors present a number of applications that can be built on top of WheelFS, such as a distributed web cache, an email service, and a file distribution service. The distributed web cache application shows that WheelFS provides performance comparable to popular systems such as CoralCDN. In addition, in case of failures, if eventual consistency is used, it provides consistently high throughput. The file distribution experiment shows that, with the help of the locality provided via large read cues, WheelFS achieves faster file distribution than BitTorrent. Finally, a comparison of WheelFS to NFSv4 shows that it is more scalable thanks to the distributed caching mechanism.

Thursday, March 3, 2011

Ceph: A Scalable, High-Performance Distributed File System

Traditional client/server filesystems (NFS, AFS) have suffered from scalability problems due to their inherent centralization. In order to improve performance, modern filesystems have taken more decentralized approaches. These systems replaced dumb disks with intelligent object storage devices (OSDs), which include a CPU, NIC, and cache, and delegated low-level block allocation decisions to these OSDs. In these systems, clients typically interact with a metadata server (MDS) to perform metadata operations (open, rename), while communicating directly with OSDs to perform file I/O (reads and writes). This separation of roles improves overall scalability significantly. Yet, these systems still face scalability limitations due to little or no distribution of the metadata workload.

Ceph is a distributed file system designed to address this issue. Ceph decouples data and metadata operations completely by eliminating file allocation tables and replacing them with generating functions (called CRUSH). This allows Ceph to leverage the intelligence present in OSDs and delegate to OSDs not only data access but also update serialization, replication, failure detection, and recovery tasks as well. The second contribution of Ceph is to employ an adaptive distributed metadata cluster architecture to vastly improve the scalability of metadata access.
Ceph has three main components: 1) the client, each instance of which exposes a near-POSIX file system interface to a host or process; 2) a cluster of OSDs, which collectively stores all data and metadata; and 3) a metadata server cluster, which manages the namespace (file names and directories), consistency, and coherence. In the rest of this review, we describe these three components in more detail.

The client code runs entirely in user space and can be accessed either by linking to it directly or as a mounted file system via FUSE (a user-space file system interface).

File I/O
An MDS traverses the filesystem hierarchy to translate the file name into the file inode. A file consists of several objects, and Ceph generalizes a range of striping strategies to map file data onto a sequence of objects (see my xFS review for an explanation of striping). To avoid any need for file allocation metadata, object names simply combine the file inode number and the stripe number. Object replicas are then assigned to OSDs using CRUSH, a globally known mapping function (we will discuss this in the next section on OSD clusters).
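
A minimal Python sketch of this naming idea follows. It assumes the simplest possible striping (fixed-size objects laid out one after another) and an illustrative name format; neither the 4 MB object size nor the exact format string is taken from the paper.

```python
def object_name(inode, offset, object_size=4 * 1024 * 1024):
    """Name of the object holding byte `offset` of the file with this inode.

    Assumes simple fixed-size striping: stripe k covers bytes
    [k * object_size, (k + 1) * object_size).
    """
    stripe = offset // object_size
    # Illustrative format: hex inode number, a dot, then the stripe number.
    return f"{inode:x}.{stripe:08x}"


name = object_name(0x1234, 9 * 1024 * 1024)
# name is "1234.00000002": byte 9 MB falls in the third 4 MB object
```

Because the name is computed from the inode number and the offset alone, no per-file allocation table has to be stored or consulted.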

Client synchronization
POSIX semantics require that reads reflect any data previously written, and that writes are atomic. When a file is opened by multiple clients with either multiple writers or a mix of readers and writers, the MDS will revoke any previously issued read caching and write buffering capabilities, forcing client I/O for that file to be synchronous. That is, each application read or write operation will block until it is acknowledged by the OSD, effectively placing the burden of update serialization and synchronization on the OSD storing each object. Since synchronous I/O is a performance killer, Ceph provides a more relaxed option that sacrifices consistency guarantees. With the O_LAZY flag, performance-conscious applications that manage their own consistency (e.g., by writing to different parts of the same file, a common pattern in HPC workloads) are allowed to buffer writes or cache reads.

Ceph delegates the responsibility for data migration, replication, failure detection, and failure recovery to the cluster of OSDs that store the data, while at a high level, OSDs collectively provide a single logical object store to clients and metadata servers. To this end, Ceph introduces the Reliable Autonomic Distributed Object Store (RADOS) system, which achieves linear scaling to tens or hundreds of thousands of OSDs. Each Ceph OSD, in this system, manages its local object storage with EBOFS, an Extent and B-tree based Object File System. We describe the features of RADOS next.

Data distribution with CRUSH
In Ceph, file data is striped onto predictably named objects, while a special-purpose data distribution function called CRUSH assigns objects to storage devices. This allows any party to calculate (rather than look up) the name and location of objects comprising a file's contents, eliminating the need to maintain and distribute object lists. CRUSH works as follows. Ceph first maps objects into placement groups (PGs). Placement groups are then assigned to OSDs using CRUSH (Controlled Replication Under Scalable Hashing), a pseudo-random data distribution function that efficiently maps each PG to an ordered list of OSDs upon which to store object replicas. To locate any object, CRUSH requires only the placement group and an OSD cluster map: a compact, hierarchical description of the devices comprising the storage cluster. As such any party (client, OSD, or MDS) can independently calculate the location of any object without any exchange of distribution-related metadata.

CRUSH resembles consistent hashing. While consistent hashing would use a flat server list to hash onto, CRUSH utilizes a server node hierarchy (shelves, racks, rows) instead and enables the user to specify policies such as "Put replicas onto different shelves than the primary".
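
The Python sketch below illustrates the general idea of computing placement instead of looking it up. To be clear, this is not the CRUSH algorithm: it substitutes rendezvous (highest-random-weight) hashing over a two-level map of racks and OSDs, and the function names and the "one replica per rack" policy are assumptions made for the illustration.

```python
import hashlib


def _h(*parts):
    """Deterministic 64-bit hash of the given parts."""
    data = "/".join(str(p) for p in parts).encode()
    return int.from_bytes(hashlib.sha256(data).digest()[:8], "big")


def pg_for_object(name, num_pgs):
    """Map an object name to a placement group."""
    return _h(name) % num_pgs


def osds_for_pg(pg, cluster_map, replicas):
    """Map a placement group to an ordered list of OSDs.

    cluster_map: dict of rack name -> list of OSD ids.
    Policy (assumed): at most one replica per rack.
    """
    ranked = sorted(
        ((osd, rack) for rack, osds in cluster_map.items() for osd in osds),
        key=lambda pair: _h(pg, pair[0]),
        reverse=True,
    )
    chosen, used_racks = [], set()
    for osd, rack in ranked:
        if rack not in used_racks:
            chosen.append(osd)
            used_racks.add(rack)
        if len(chosen) == replicas:
            break
    return chosen


cluster = {"rack1": ["osd0", "osd1"], "rack2": ["osd2", "osd3"], "rack3": ["osd4", "osd5"]}
pg = pg_for_object("1234.00000002", num_pgs=128)
placement = osds_for_pg(pg, cluster, replicas=2)
```

Any client, OSD, or MDS holding the same cluster map computes the same `placement` list, with the first entry acting as the primary.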

Data is replicated in terms of PGs, each of which is mapped to an ordered list of n OSDs (for n-way replication). Clients send all writes to the first non-failed OSD in an object's PG (the primary), which assigns a new version number for the object and PG and forwards the write to any additional replica OSDs. After each replica has applied the update and responded to the primary, the primary applies the update locally and the write is acknowledged to the client. Reads are directed at the primary. This approach also resembles the replication strategy of GFS.

The MDS cluster is diskless and MDSs just serve as an index to the OSD cluster for facilitating read and write. All metadata, as well as data, are stored at the OSD cluster. When there is an update to an MDS, such as a new file creation, MDS stores this update to the metadata at the OSD cluster. File and directory metadata in Ceph is very small, consisting almost entirely of directory entries (file names) and inodes (80 bytes). Unlike conventional file systems, no file allocation metadata is necessary--object names are constructed using the inode number, and distributed to OSDs using CRUSH. This simplifies the metadata workload and allows MDS to efficiently manage a very large working set of files, independent of file sizes.

Typically there would be around 5 MDSs in a 400-node OSD deployment. This looks like overkill for just providing an indexing service to the OSD cluster, but it is actually required for achieving very high scalability. Effective metadata management is critical to overall system performance because file system metadata operations make up as much as half of typical file system workloads. Ceph also utilizes a novel adaptive metadata cluster architecture based on Dynamic Subtree Partitioning that adaptively and intelligently distributes responsibility for managing the file system directory hierarchy among the available MDSs in the MDS cluster, as illustrated in Figure 2. Every MDS response provides the client with updated information about the authority and any replication of the relevant inode and its ancestors, allowing clients to learn the metadata partition for the parts of the file system with which they interact.

Additional links
Ceph is licensed under the LGPL and is available at
Checking out the competition

Exercise questions
1) How do you compare Ceph with GFS? XFS? GPFS?
2) It seems like the fault-tolerance discussion in Ceph assumes that OSDs are not network-partitioned. What can go wrong if this assumption is not satisfied?

Wednesday, March 2, 2011

Sinfonia: A New Paradigm for Building Scalable Distributed Systems

Sinfonia is an in-memory scalable service/infrastructure that aims to simplify the task of building scalable distributed systems. Sinfonia provides a lightweight "minitransaction" primitive that enables applications to atomically access and conditionally modify data at its multiple memory nodes. As the data model, Sinfonia provides a raw linear address space which is accessed directly by client libraries.

In traditional transactions, a coordinator executes a transaction by asking participants to perform one or more participant-actions (such as retrieving or modifying data items), and at the end of the transaction, the coordinator decides and executes a two-phase commit. In the first phase, the coordinator asks all participants if they are ready to commit. If they all vote yes, in the second phase the coordinator tells them to commit; otherwise the coordinator tells them to abort.

Sinfonia introduces the concept of minitransactions, based on the observation that under certain restrictions on the transactions, it is possible to optimize the execution of a transaction such that the entire transaction is piggybacked onto just the two-phase commit protocol at the end. For example, if the transaction's participant-actions do not affect the coordinator's decision to abort or commit, then the coordinator can piggyback these actions onto the first phase of the two-phase commit. Taking this a step further, even if a participant-action affects the coordinator's decision to abort or commit, if the participant knows how the coordinator makes this decision, then we can also piggyback the action onto the commit protocol. For example, if the last action is a read and the participant knows that the coordinator will abort if the read returns zero (and will commit otherwise), then the coordinator can piggyback this action onto two-phase commit and the participant can read the item and adjust its vote to abort if the result is zero.

Sinfonia designed its minitransactions so that it is always possible to piggyback the entire transaction execution onto the commit protocol. A minitransaction (Figure 2) consists of a set of compare items, a set of read items, and a set of write items. Items are chosen before the minitransaction starts executing. Upon execution, a minitransaction does the following: (1) compare the locations in the compare items, if any, against the data in the compare items (equality comparison), (2) if all comparisons succeed, or if there are no compare items, return the locations in the read items and write to the locations in the write items, and (3) if some comparison fails, abort. Thus, the compare items control whether the minitransaction commits or aborts, while the read and write items determine what data the minitransaction returns and updates.
To ensure serializability, participants lock the locations accessed by a minitransaction during phase 1 of the commit protocol. Locks are only held until phase 2 of the protocol, a short time. To avoid deadlocks, a participant tries to acquire locks without blocking; if it fails, it releases all locks and votes "abort due to busy lock" upon which the coordinator aborts the minitransaction and retries later. Figure 4 shows the execution and committing of a minitransaction. As a further optimization, if a minitransaction has just one participant, it can be executed in one phase because its outcome depends only on that participant. This case is exactly how key-value stores operate.
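
The single-participant case is easy to sketch in Python. The `MemoryNode` class and its method name below are hypothetical; the sketch leaves out locking, logging, and the two-phase protocol needed when several memory nodes participate.

```python
class MemoryNode:
    """A toy memory node exposing a raw linear address space."""

    def __init__(self, size):
        self.mem = bytearray(size)

    def exec_minitransaction(self, compare_items, read_items, write_items):
        """Execute a one-participant minitransaction.

        compare_items: list of (address, expected bytes)
        read_items:    list of (address, length)
        write_items:   list of (address, bytes to write)
        Returns (committed, list of bytes read).
        """
        # (1) Compare: abort if any location differs from the expected data.
        for addr, expected in compare_items:
            if bytes(self.mem[addr:addr + len(expected)]) != expected:
                return False, []
        # (2) All comparisons succeeded: return the reads, apply the writes.
        results = [bytes(self.mem[addr:addr + n]) for addr, n in read_items]
        for addr, data in write_items:
            self.mem[addr:addr + len(data)] = data
        return True, results


node = MemoryNode(16)
# Compare-and-swap on byte 0: succeeds once, then fails because byte 0 changed.
first, _ = node.exec_minitransaction([(0, b"\x00")], [], [(0, b"\x01")])
second, _ = node.exec_minitransaction([(0, b"\x00")], [], [(0, b"\x01")])
```

Here `first` is True and `second` is False, showing how the compare items alone decide between commit and abort.
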
Fault-tolerance mechanisms
To provide fault tolerance, Sinfonia uses four mechanisms: disk images, logging, replication, and backup. A disk image keeps a copy of the data at a memory node. For efficiency, the disk image is written asynchronously and so may be slightly out-of-date. To compensate for that, a log keeps recent data updates, and the log is written synchronously to ensure data durability. When a memory node recovers from a crash, it uses a recovery algorithm to replay the log to catch up to its state before the crash. To provide high availability, Sinfonia uses a primary-backup approach to replicate memory nodes, so that if a memory node fails, a replica takes over without downtime.

Minitransaction recovery protocols
Recall that in standard two-phase commit, if the coordinator crashes, the system has to block until the coordinator recovers. However, that approach is not suitable for Sinfonia. Recall that participants run on Sinfonia memory nodes whereas coordinators run on application nodes; so coordinators are unstable and very failure-prone. Running a three-phase commit protocol is expensive, and Sinfonia takes a different approach to deal with this issue.

Sinfonia modifies things a little so that instead of blocking on coordinator crashes, Sinfonia blocks on participant crashes. This is reasonable for Sinfonia because participants are memory nodes that keep application data, so if they go down and the application needs to access data, the application has to block anyway. Furthermore, Sinfonia can optionally replicate participants (memory nodes), to reduce such blocking to a minimum. This modification to block on a participant crash, however, complicates the protocols for recovery as we discuss next.

If a coordinator crashes during a minitransaction, it may leave the minitransaction with an uncertain outcome: one in which not all participants have voted yet. To fix this problem, Sinfonia employs a recovery coordinator, which runs at a dedicated management node. The recovery scheme ensures the following: (a) it will not drive the system into an unrecoverable state if the recovery coordinator crashes or if there are memory node crashes during recovery; (b) it ensures correctness even if there is concurrent execution of recovery with the original coordinator (this might happen if recovery starts but the original coordinator is still running); and (c) it allows concurrent execution by multiple recovery coordinators (this might happen if recovery restarts but a previous recovery coordinator is still running).

Concluding remarks
Sinfonia seems to work as promised and simplifies the development of scalable distributed systems. The minitransaction primitive is expressive enough to build sophisticated coordination/cooperative algorithms. The authors demonstrate Sinfonia by using it to build two applications: a cluster file system called SinfoniaFS and a group communication service called SinfoniaGCS. Using Sinfonia, the authors built these complex services easily with 3900 and 3500 lines of code, in one and two man-months, respectively. This is not an easy feat.

I, personally, am a big fan of transactions. Transactions really do simplify distributed system development a lot. And transactions do not need to be heavyweight: Sinfonia shows that by reducing the power of transactions to minitransactions, lightweight transaction execution can be achieved. In my work on wireless sensor networks (WSNs), I had also proposed a similar transactional primitive, Transact, to simplify the development of coordination and cooperation protocols. In Transact, in order to provide a lightweight implementation of transaction processing, we exploited the inherent atomicity and snooping properties of singlehop wireless broadcast communication in WSNs.

Exercise questions
Recently a reader suggested that I post exercises with each summary, similar to what textbooks do. I decided to give this a try. So here it goes.

1) If we use Sinfonia to build a key-value store (only providing atomic write to single key-value records), what is the overhead of Sinfonia? How would it compare with other popular key-value stores?

2) Is Sinfonia suitable for WAN access, multi-datacenter distribution?

PetaShare: A reliable, efficient and transparent distributed storage management system

This paper by my colleague Tevfik Kosar (to appear soon) presents the design and implementation of a reliable and efficient distributed data storage system, PetaShare, which manages 450 Terabytes of disk storage and spans 7 campuses across the state of Louisiana.

There are two main components in a distributed data management architecture: a data server which coordinates physical access (i.e. writing/reading data sets to/from disks) to the storage resources, and a metadata server which provides the global name space and metadata of the files. Metadata management is a challenging problem in widely distributed large-scale storage systems, and is the focus of this paper.

PetaShare architecture
The back-end of PetaShare is based on iRODS. All system I/O calls made by an application are mapped to the relevant iRODS I/O calls. iRODS stores all the system information, as well as user-defined rules, in a centralized database called iCAT. iCAT contains the information of the distributed storage resources, directories, files, accounts, metadata for files, and system/user rules. iCAT is the metadata that we need to manage/distribute in PetaShare.

Multiple iRODS servers interact with the iCAT server to control the accesses to physical data in the resources. Of course, the centralized iCAT server is a single point of failure, and the entire system becomes unavailable when the iCAT server fails. As we discuss next, PetaShare employs asynchronous replication of iCAT to resolve this problem.

Asynchronous multi-master metadata replication
PetaShare first experimented with synchronous replication of the iCAT server. Not surprisingly, this led to high latency and performance degradation on data transfers, because each transfer could be committed only after iCAT servers complete replication. To eliminate this problem, PetaShare adopted an asynchronous replication system.

The biggest problem of asynchronous multi-master replication is that conflicts occur if two sites update their databases within the same replication cycle. For this reason, the proposed multi-master replication method should detect and resolve conflicts. PetaShare uses a conceptual conflict resolver that handles such conflicts. Common conflict types are: (i) uniqueness conflicts, which occur if two or more sites try to insert records with the same primary key; (ii) update conflicts, which occur if two or more sites try to update the same record within the same replication cycle; and (iii) delete conflicts, which occur if one site deletes a record from the database while another site tries to update this record.

To prevent uniqueness conflicts, ID intervals are pre-assigned to different sites. (This could as well be achieved by prefixing IDs with the site IDs.) Update conflicts are handled using the latest-write rule if not resolved within a day, but there is a one-day grace period during which negotiation (manual conflict handling) can be used. Delete conflicts are handled similarly to update conflicts.
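
The two automatic rules can be sketched in Python as follows. The interval size, the record layout, and the tie-break by site name are assumptions made for the sketch; the paper's one-day manual negotiation window is not modeled.

```python
def id_interval(site_index, interval_size=1_000_000):
    """Pre-assigned, non-overlapping ID range for a site (assumed size)."""
    start = site_index * interval_size
    return range(start, start + interval_size)


def resolve_update_conflict(a, b):
    """Latest write wins. Each update is a dict with "timestamp" and "site".

    Ties are broken by site name, which is an assumption of this sketch.
    """
    return max(a, b, key=lambda u: (u["timestamp"], u["site"]))


site0_ids = id_interval(0)
site1_ids = id_interval(1)

update_a = {"site": "siteA", "timestamp": 100, "value": "old"}
update_b = {"site": "siteB", "timestamp": 105, "value": "new"}
winner = resolve_update_conflict(update_a, update_b)
```

Since the ranges for different sites never overlap, two sites cannot generate the same primary key, and `winner` here is the update from siteB because it carries the later timestamp.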

The paper provides real-deployment experiment results on centralized, synchronous, and asynchronous replicated metadata servers. The no-replication column indicates using a centralized metadata server. Table 2 lets us evaluate the performance of the replication methods because the contribution of data transfer to the latency is minimized. For all data sets the asynchronous replication method outperforms the others, since both write and database operations are done locally. Similar to Table 1, the central iCAT model gives better results than synchronous replication.
