Tuesday, December 20, 2011

Pregel: a system for large-scale graph processing

For large-scale graph processing one way to go is, of course, to use Hadoop and code the graph algorithm as a series of chained MapReduce invocations. MapReduce, however, is a functional language, so using MapReduce requires passing the entire state of the graph from one stage to the next, which is inefficient (as I alluded to at the end of this summary).

Google Pregel provides a simple straightforward solution to the large-scale graph processing problems. The Pregel solution is to use round(superstep)-based synchronized computation at the vertices supported with message-passing between the rounds. Pregel keeps vertices and edges on the machines that perform computation, and uses network transfers only for messages. This way Pregel avoids the communication overhead and programming complexity incurred by MapReduce chained iterations.

In Pregel, in each iteration (superstep), a vertex can receive messages sent to it in the previous iteration, send messages to other vertices, modify its own state and its outgoing edges' states, and mutate the graph's topology. This synchronized superstep model is inspired by Valiant’s Bulk Synchronous Parallel model. More specifically, at a superstep S, each active vertex V (in parallel and in isolation) reads messages sent to itself in superstep S-1, send messages to other vertices that will be received at superstep S+1, and modify the state of V and its outgoing edges.

Messages are accumulated and sent in batch-mode along outgoing edges, but a message may be sent to any vertex whose identifier is known. For example, the graph could be a clique, with well-known vertex identifiers V1 through VN, in which case there may be no need to even keep explicit edges in the graph. (This way Pregel reduces to a distributed message-passing programming system with N nodes.) A vertex can inspect and modify the values of out-edges. Conflicts can arise due to concurrent vertex add/remove requests, and are relegated to be resolved by user-defined handlers. This introduces significant complexity and could become a source of programmer errors.

It seems like default Pregel partitioning is not locality-preserving. This was surprising to me as this could cause excessive communication across nodes lead to inefficiency/waste. From the paper: "The default partitioning function is just hash(ID) mod N, where N is the number of partitions, but users can replace it. The assignment of vertices to worker machines is the main place where distribution is not transparent in Pregel. Some applications work well with the default assignment, but some benefit from defining custom assignment functions to better exploit locality inherent in the graph. For example, a typical heuristic employed for the Web graph is to colocate vertices representing pages of the same site."

The user program begins executing on a cluster of machines over the partitioned graph data. One of the machines acts as the master and coordinates worker activity. "The master determines how many partitions the graph will have, and assigns one or more partitions to each worker machine. The number may be controlled by the user. ... Each worker is also given the complete set of assignments for all workers [so that the worker knows which other worker to enqueue messages for its outgoing edges]." Fault-tolerance is achieved by checkpointing and replaying on machine failure. Note that if you write a self-stabilizing graph algorithm, then you can disable fault-tolerance and finish early.

The key to the scalability of Pregel is batch messaging. The message passing model allows Pregel to amortize latency by delivering messages asynchronously in batches between supersteps. Pregel is said to scale to billions of vertices and edges, but I am not sure what this means.  For some graphs, I reckon superhubs would limit scalability significantly. It is not clear if Pregel has mechanisms/optimizations to handle superhubs in some graphs.

Another question that comes to my mind is that how much of the work that is currently done by Hadoop can be (should be) moved to Pregel. I guess for any job where the data can be easily/naturally modeled as a graph (pagerank, social graph analysis, network analysis), Pregel is applicable and may be preferable to Hadoop. Especially, the ability to modify vertices/edges on-the-fly makes Pregel very flexible to accommodate a rich class of applications.

A major downside for Pregel is that it offloads a lot of responsibility to the programmer. The programmer has to develop code for this decentralized vertex-mode with round-based messaging. This model leads to some race-conditions as discussed above and those conflicts are also left to the programmer to deal with.

I am working on a Maestro architecture that can alleviate these problems. (I plan to write about Maestro here soon.) Maestro accepts as input a centralized program and takes care of decentralization and synchronization/locking of shared variables in an efficient manner. Maestro also uses a master for coordinating workers (unsurprisingly). But the master has more responsibility in Maestro; it is involved in synchronizing access to shared variables. (Recall that there are no shared variables in Pregel, so master does not get involved in synchronizing and locking.) In return, Maestro relieves the programmer  from writing decentralized code and handlers for data race conditions among vertices.

Pregel already has an opensource cloud implementation (Golden Orb). My plan next is to modify Golden Orb to see whether we can quickly develop a cloud implementation for Maestro.

Monday, December 19, 2011


"scooping (v): publish a news story before (a rival reporter, newspaper, or radio or television station)." Scooping is what happens when another team beats you to publishing results on a problem. Getting scooped is very frustrating and is dreaded by many PhD students. I heard stories about poor Math PhD students who worked on a problem for years only to discover that they got scooped by a couple months.

OK, then what is reverse-scooping? It is a term I coined last year. (Of course, the irony is that after a Google search I discovered that I was almost reverse-scooping someone else ;-). In reverse-scooping, you solve a problem and publish it first. Then several months later, another team (generally from a better-known university) solves the same problem and publish it at a more visible venue. They get all the credit, their work gets cited a lot, and it is as if your work doesn't exist! Congratulations, you got reverse-scooped.

I got reverse-scooped more than a couple of times (obviously I am not going to name publications here). There is no clear definition of same work or similar work, so there is no clear definition of how many times you got reverse-scooped. But it happens, and you know it when it happens to you. Reverse-scooping is often something that could have been easily avoided by doing a simple Google search for the keyterms (or even the title) of the paper. Could it be a simple omission that the offending authors failed to do a Google search and miss your work? That is hard to believe (but I am often generous to give the benefit of doubt). Sometimes reverse-scooping is more insidious: the offending paper cites your paper while seriously downplaying your results. The result is the same, your work will not get the credit and will not get cited further by papers citing this new paper.

Getting reverse-scooped is at least as frustrating as getting scooped. The first few times it happened to me I was very angry. But now I came to find fault with myself when I get reverse-scooped. I take getting reverse-scooped to mean that I should have polished and published that work at the best venue I possibly could. Maybe I should have publicized the idea better and elaborated on the idea further to make my contributions crystal-clear. Reverse-scooping is ugly, and I am not trying to rationalize it. But I find this new way of thinking to be more constructive than getting angry and complaining.

Friday, December 16, 2011

Scalable SPARQL Querying of Large RDF Graphs

Due to its excellent price/performance ratio, Hadoop has become the kitchen sink of big data management and analysis. Hadoop defaults are, of course, not suitable for every kind of data analysis application. For example using Hadoop on relational data processing incurs a lot of waste/inefficiencies. Similarly for graph data processing Hadoop is very inefficient. This paper by Abadi et. al. shows that Hadoop's efficiency on graph data (for semantic web subgraph matching application) can be improved 1000 times by fixing the following defaults in Hadoop.

1. Hadoop, by default, hash partitions data across nodes. For graph processing this is very inefficient. The paper advocates using a locality-preserving partitioning (METIS partitioner) that maps nearby vertices to the same worker as much as possible.

2. Hadoop, by default, replicates each data 3 times. "However, ... the data that is on the border of any particular partition is far more important to replicate than the data that is internal to a partition and already has all of its neighbors stored locally. It is a good idea to replicate data on the edges of partitions so that vertexes are stored on the same physical machine as their neighbors." For this, the paper uses a custom triple replication module.

3. Hadoop, by default, uses HDFS and HBase for storing data. These are not optimal for web semantics graph data which is of RDF form (subject-predicate-object triplet). The paper uses RDF-Store for storing web semantics graph data.

Daniel's blog mentions that each fix contributes a 10 fold improvement in efficiency which yields a 1000 fold improvement in total. The experiments results are taken using the Lehigh University Benchmark (LUBM) for semantic web querying. For queries that take less than a second to compute on a single machine, the single-machine solution was faster than both the Hadoop-default and Hadoop-optimized. Of course for these fast queries a lookup to another worker requires network communication and incurs a relatively large overhead. Therefore, Hadoop-default is at a big disadvantage for fast queries. For slow queries (that take from 10 sec to 1000 sec on a single machine) there were still cases where Hadoop-optimized was 1000 times faster than hadoop-default.

It would have been nice if the paper included Hadoop-default pseudocode as well as Hadoop-optimized pseudocode. I want to see what (if anything) changed in the code. Here is another noteworthy implementation detail from the paper. The paper had to revert to some customizations in vertex partitioning. "To facilitate partitioning a RDF graph by vertex, we remove triples whose predicate is rdf:type (and other similar predicates with meaning "type"). These triples may generate undesirable connections, because if we included these "type" triples, every entity of the same type would be within two hops of each other in the RDF graph (connected to each other through the shared type object). These connections make the graph more complex and reduce the quality of graph partitioning significantly, since the more connected the graph is, the harder it is to partition it."

So, what are the fundamental contributions in this work? After all, it is now becoming a folk theorem that it is easy to make minor modifications/configurations to Hadoop to yield large performance improvements. Gun Sirer puts it nicely: "if you have a Hadoop job whose performance you're not happy with, and you're unable to speed it up by a factor of 10, there is something wrong with you." The first technique of locality preserving distribution of graph data over workers is a pretty obvious idea because it is the most sensible thing to do. The second technique of replicating border vertices is interesting and promising. However, this technique is inapplicable for graph applications that modify the graph data. The semantic web subgraph matching application did not modify the graph; it only read the graph. If instead of subgraph-matching, had we considered a graph-subcoloring application (or any such application that modified the graph), the replication would not be valid because it would be very hard to maintain consistency among the replicas of the boundary vertices.

For applications that modify the graph, even after fixing the inefficient defaults to sensible alternatives, there would still be inherent inefficiency/waste in Hadoop due to the functional nature of MapReduce programming.  For such graph-modifying applications, MapReduce is not a good fit as it neccessitates numerous iterations over the graph data and is wasteful. People don't care about this waste, because in batch execution mode this waste is not obvious/visible. Also, in return for this waste Hadoop enables hassle-free scale-out, which makes it acceptable. However, for real-time tight-synchronization-requiring applications this waste becomes clear by way of unacceptable latency and has to be dealt with. Obviously, there are other data processing tools for graphs, such as Google Pregel. The paper plans to compare with Pregel, and I also plan to write a summary of Pregel soon.

Thursday, September 29, 2011

Let's go back to the fountain pen

I love my Emacs (and org-mode), but there is something else about my fountain-pen. My fountain-pen is excellent for doodling and doing creative thinking. I guess the personality and non-uniformity of the handwriting adds a lot to the thinking process. Or maybe it is that handwriting requires more hand-eye coordination, or is more relaxing than typing. We are wired as analog and may process/control analog things better. I don't know what that special thing is. But for a task that requires deep thinking, I first hack at it with my pen and then move to Emacs to edit the text and digitally save/archive the text.

They should produce a tablet with a natural (high resolution, comfortable) pen input. That would save me a lot of time from having to type my writing to make it digitally available. I tried some tablets, and I was not happy with their pen support. I would be a loyal customer for an iPad like tablet with natural pen input. It could be a specialized device, it doesn't have to be a tablet-computer. I will just be happy to write on it naturally, and read papers, browse web.

Privacy through lies

A short sci-fi story by Vernor Vinge, synthetic serendipity, mentions "friends of privacy" which fabricates lies to re-achieve privacy that is violated by web-services *cough*Facebook*cough*. Here is an excerpt.
Doris Nguyen. Former homemaker. Mike eyed the youngish face. She looked almost his mom’s age, even though she was 40 years older. He searched on the name, shed collisions and obvious myths; the Friends of Privacy piled the lies so deep that sometimes it was hard to find the truth. But Doris Nguyen had no special connections in her past.
This privacy through lies idea has been explored somewhat. I have heard of a proposal where a node sends 4 different data to be processed (1 correct and 3 incorrect), so the server cannot learn what is the correct data, and the node can use only the reply to the correct data and discards the other 3 replies. I was wondering if this "privacy through lies" approach can be or is explored further.

Thursday, September 22, 2011

Trip notes from Mobicom 2011

I am attending MOBICOM'11 at Las Vegas, and mostly enjoying it. I took some notes from the talks. Here I will share my notes about a couple of talks which I found more interesting than the others.

  • SmartVNC: An effective remote computing solution for smartphones. The goal here is to improve on the usability of solutions (such as AndroidVNC) that allow accessing a remote PC from a smartphone. The problem with AndroidVNC is task-inflation: user needs to do a lot of zooming and panning and also the small keyboard is inconvenient for typing. The solution offered is to use macros to automate common user-specific tasks in AndroidVNC. The macros appear as a sidebar and the user can click a macro to perform several steps quickly. These macros are defined as hooks at the GUI level (instead of app-level or OS-level) so that they can be application-agnostic as well as robust and extensible
  • Detecting driver phone use leveraging car speakers. The goal is to block the delivery of a call or SMS for the phone in the driver seat so that the driver is not tempted to take the call. The proposed scheme is as follows. The phone sends a "beep" signal over Bluetooth to be played by the right speaker and records the round-trip time, then the phone sends a beep signal to be played by the left speaker and records the round-trip time. (Most cars at least have two channel speakers to make this scheme possible.) By using the offset round-trip times, the phone can localize itself to be in the driver seat or the passenger seat. (Yes, there are obvious/acknowledged problems with the scheme: it is not 100% robust; driver's phone sitting in the passenger seat defeats the scheme; the audible beep alerts the driver to an incoming call which may defeat the purpose.)
  • I am the antenna: accurate outdoor AP location using smartphones. The goal is to develop an app to escort a smartphone user that is connected to a WAP with a weak signal to walk towards the WAP so that he can get a stronger signal. The solution is to exploit the "body-blocking effect" to figure out the direction of the WAP. The human body blocks the RSS from the WAP significantly, so when the user's back is turned toward the WAP (that is when our body is in between the phone and the WAP) the RSS degrades significantly. In this scheme, the user fires up the app, makes a full rotation slowly, and the app tells him which direction to walk to  in order to get to the WAP. Rinse and repeat several times to locate the WAP (we could as well call this the whirling-dervish app). Unfortunately, this scheme does not work inside the buildings due to multipath effects. But it is still useful for a user outside to be able to locate the WAP in a building.

As evident from these selected talks, there is tangible excitement about smartphone research at Mobicom; but I guess the community has not completely figured out solid research directions to pursue in the smartphones area yet. In addition to the smartphone talks, there were talks on all kinds of wireless communication topics, including opportunistic routing (still kicking in 2011?), and physical and MAC layer work for wi-fi. With the help of software-defined radio platforms, the researchers can now fiddle with the wi-fi platform at granularity not possible before. As a result of this new-found flexibility, there were a couple of works which were essentially re-discovering/re-inventing some wireless sensor network MAC protocols done in 2003-2005 in the wi-fi domain.

At the end of the first day, there was an outrageous opinions session which was a lot of fun. It consisted of 8 talks with 5 minutes each. There were only two serious exploratory ideas, and the remaining 5-6 talks were in the spirit of stand-up comedy for CSE nerds. A majority of the talks were proposals (mostly tongue-in-cheek) about how to improve the review process and the conference. I like the concept a lot, and will try to include some outrageous posts of my own occassionally in this blog. By the way, here is my outrageous opinion about improving the conference experience. The keynote speaker should give $20 for each attendee as pre-compensation for their one hour. If the attendees think the talk was well-worth the $20, they will return the money back to the speaker else they keep the $20 bill as a compensation for the lost time.

During the coffee and lunch breaks I got to meet/catchup and chitchat with several colleagues. Those were probably the most fruitful time I had at the conference. We exchanged a lot of ideas, insights, stories, and gossip.  I think conferences are invaluable not because of the papers/talks, but because of these personal interactions/communications. You have to catch me in person to hear more of my Mobicom stories.

Finally, I should write a bit about Las Vegas. Vegas is home to fake Venetian canals, fake Roman statues, fake palaces, a fake Eiffel tower, fake volcanos, fake lakes, fake pirate ships, and fake hopes that fade at dawn in front of the slot machines (--while going to the registration desk at 7am the first day, I came across several people still gambling with red-eyes and frustrated faces at the casino). That said, I really liked the Bellagio fountain show; it was original and spectacular.  The conference hotel was nice, the food good. But... <bitching> the wireless in the conference was unusable the first day, and completely unavailable for me the second day. It is ironic to have these problems at a conference specializing on wireless communication. </bitching> To finish on a high note, the conference banquet was one of the best I have ever been to. It was held at the Secret Garden in the Mirage hotel. We got to see Siberian tigers, Albino lions, Albino tigers, leopards, and got to watch the dolphins' show.

Some dolphin facts. The trainers train the dolphins 6 times a day (total time spent a day is 2 hours). After 5 years of training the dolphin is able to do the tricks in typical SeaWorld show, but the trainings continue lifelong. Dolphins live around 40 years, but the trainer I talked to mentioned that he worked with some 50 year old dolphins before. The training is done mostly with positive reinforcements. If dolphins misbehave during training, the trainer stares at the dolphin's eyes and stays expressionless for 3-4 seconds. Dolphins are good at recognizing human body language (very much like dogs), and the trainers also learn to recognize dolphins' body language. Dolphins are smart, but the trainer I talked to thinks sea lions are even smarter: they can learn how to open a gate just by watching a human do it.

Sunday, September 18, 2011

Tenure advice to new faculty

A month ago, I participated at a panel as part of the new faculty orientation program of the SUNY Buffalo. There were 5 recently tenured faculty in the panel. The idea was that we would convey our tenure advice to the new faculty. I was the only one from the Engineering School, the others were mostly from Social sciences. We had 80 minutes for 5 speakers, so I expected to get 15 minutes. But, the panel was rather informal and the speaking times were not enforced. So, I got to speak as the 4th speaker for 5 minutes only. The other speakers had come with 3-4 printed pages to talk, I had keynote prepared keynote slides (which I ended up not showing with a projector). Since I feel I didn't get enough mileage out of these slides, I am sharing them here.

The panel also exposed me to some cultural differences among Social and Engineering disciplines. I didn't find their advice actionable, and probably they found my advice blunt. I won't discuss more. I am an engineer and I am proud.

Friday, September 9, 2011

Rapid prototyping FTW

Very interesting talk. It is worth your 6 minutes. Sorry, Dijkstra approach, you lose.

Thursday, June 23, 2011

I must be a sellout. My research is hot

Almost all of my students left for internships this summer. Three of my students, who are working on crowdsourcing and smartphone apps, have been grabbed by Bosch Research Pittsburgh (they got two of them) and IBM Research New York. Some of the projects these students have worked on with me include "using Twitter as a middleware for crowdsourced sensing", "detecting breakpoints in public opinion using Twitter", "crowdsourcing queue lengths at campus cafes with a smartphone campus app", "crowdsourcing location-based queries". Another student working on cloud computing have been employed by VMware (after lucking out with Google since he started interviewing late). His research is on developing user-level wide-area-network filesystems. Finally one brave soul decided he would not apply for internships and stay with me this summer to concentrate on his research (last summer, he had been on an internship with Bosch Research Paolo Alto).

There are alternative explanations for how popular my students got with the companies this summer. I like to attribute this to how hot our research is :-) Of course it could be that my students give very good interviews (I actually don't think this is the case, my students need a lot more improvement in that department). Or, we are in an IT bubble again.

Sunday, May 29, 2011

Online migration for geodistributed storage systems, Usenix ATC 2011

This paper investigates the problem of migrating data between data centers. Data needs to be moved from one center to another based on the access patterns, for example, the user may have moved from East to West coast. The problem is complicated by the large size of data that needs to be moved, the requirement to perform the migration online without blocking access to any part of the data from anywhere, and finally that the data can be accessed/modified concurrently in different locations.

To address this problem, the paper proposes an overlay abstraction. The goal of the abstraction is to implement migration as a service, so that the developer does not have to deal with the race conditions that may result while migrating data in ad hoc ways. The analogy of overlay is a sheet of transparencies. Remember the old days before powerpoint? The presenters used to print the slides on transparencies, and do animation by overlaying one transparency over another. The overlay idea is similar. "Where it is clear, the overlay reveals the contents underneath; where it is written, the overlay overrides those contents." Basically, the idea is to represent data as stacked layers in different places. This enables migration of data in smaller units, and the capability of having part of the data in one location and the other parts in other locations.

Overlay is implemented much like the (doubly) linked-list. Each overlay has two pointers, one pointing to the overlay below, and one pointing to the overlay above. Overlay insertion and deletion are similar to those one would expect from linked-list implementations. The overlay is designed such that every operation is linearized by the overlay structure even when the operations are submitted from any data center. Moreover, read and write operations can be executed concurrently with the overlay structure operations and with each other, at many clients without blocking.

To write data to an object the client first finds the highest overlay by following the above pointers starting from the based location. (Base location is learned from the directory service.) The data is written to this highest level overlay. To read an object, again the highest overlay is found as the first step. If the data to be read is not there, then below pointers are followed until the data is reached.

The contribution of the paper is the abstraction that nicely separates policy level from the concurrency-safe execution of the actual migration operations. The paper presents several optimizations and more use-cases for the overlay structure (such as exploiting in-built replication for migration, multiway caching, and split overlays).

Friday, May 6, 2011

Refuse to Crash with Re-FUSE

This paper appeared in Eurosys'10. This is a well written paper: the paper holds your hand, and takes you for a walk in the park. At each step of the path, you can easily predict what is coming next. I like this kind of easy-reading papers, compared to the cryptic or ambiguous papers which make you wander around or try to guess which paths to take through a jungle of junctions.

The goal of this work is to provide support for restartable user-level filesystems. But, before I can tell you more about that, we first need to discuss user-filesystems. User-filesystems provides a way to add custom features (such as encryption, deduplication, access to databases, access to Amazon S3, etc.) on top of existing kernel-level filesystems. FUSE is a popular software that facilitates building user-filesystems on top of kernel-level filesystems. FUSE is available for Linux, FreeBSD, NetBSD, and MacOSX, and more than 200 userfilesystems have already been implemented using FUSE. GlusterFS, HDFS, ZFS are some of the well-known user-level filesystems implemented on top of FUSE.

FUSE works by wrapping the virtual filesystem (VFS) layer in UNIX systems at both sides. FUSE has a kernel file-system module (KFM) below the VFS layer that acts as a pseudo filesystem and queues application requests that arrive through the VFS layer. FUSE also has a libfuse module that exports a simplified filesystem interface between the user-level filesystem and the KFM.

Re-FUSE modifies FUSE to enable support for transparent restartability of the user-filesystem. The fault-model considered is transient fail-stop failures of the user-filesystem. Re-FUSE is based on three basic principles: request-tagging, system-call logging, and non-interruptible system calls. After a crash of the user-filesystem, Re-FUSE does not attempt to roll it back to a consistent state, but rather continues forward from the inconsistent state towards a new consistent state. Re-FUSE does so by enabling partially-completed requests to continue executing from where they were stopped at the time of crash.

Re-FUSE is tested with 3 popular representative user-filesystems implemented on top of FUSE. For testing robustness fault-injection (both controlled and random) is used; Re-FUSE enables the user-filesystem to mask failure and carry-on uninterrupted after a crash. Re-FUSE at most around 2-3% overhead in the normal operation, and recovers the filesystem in 10-100ms after a crash.

Sabbatical help

My tenure at University at Bufffalo, SUNY has just become official after the President signed on it. Having completed my 6 years at UB, I am now eligible to spend a sabbatical year. I am planning to spend 6 months of my sabbatical in the industry/research-lab working on cloud computing challenges. If you have any suggestions/connections to make this happen, please send me an email.

Why are algorithms not scalable?

Recently, a colleague emailed me the following:
Since you have been reading so much about clouds, CAP, and presumably lots of consensus things, you can answer better the question of algorithm scalability. How scalable are the popular algorithms? Can they do a reasonable job of consensus with 100,000 processes? Is this even a reasonable question? What are the fundamental problems, the algorithms or the lower level communication issues?
These are actually the right kind of questions to ask probing for deeper CS concepts. Here are my preliminary answers to these questions.

From what I read, consensus with 100K processes is really out of question. Paxos consensus was deployed on 5 nodes for GFS and similar systems: Zookeper, Megastore, etc. As another example Sinfonia's participant nodes in a transaction is also around limited to 5-10.

So what is wrong with algorithms, why are they unscalable? I guess one obstacle against scalability is the "online" processing requirement, and most algorithms are inherently limited because of that. When you accept offline processing, as in mapreduce tasks, then you can afford more scalability.

I think a more fundamental problem for scalability is the synchronization requirements in these algorithms. Synchronization is a deal breaker for scalability. The Ladis'08 summary has a very nice discussion related to this (see pages 4-5).

Our traditional complexity measures are for time complexity and message complexity. Time complexity is not much relevant for scalability, but message complexity may give some indication of synchronization complexity. A node sending a message to all other nodes is very bad for scalability (which is the case for both Paxos and Sinfonia). Workflows on the other hand have very little message complexity. They store data to disk (rather than requiring synchronizing several processes with messages), and any process may later visit this data if they need it. This decouples data from computation and enables more processing to be added as needed for scalability. Workflows tend to be more data-centric whereas algorithms tend to be more computation-centric. Here is a brief discussion of the workflow idea.

How does one formally define "synchronization point" that is not specific to a particular system/algorithm? I don't know of a good answer, but I think it is closer to a snapshot. Bloom also identifies synchronization points as culprits, and defines synchronization points as operations that are classified as non-monotonous in the Bloom declarative language.

Tuesday, May 3, 2011

On designing and deploying Internet scale services

This 2008 paper presents hard-earned lessons from Jim Hamilton's experience over the last 20 years in high-scale data-centric software systems and internet-scale services. I liken this paper to "Elements of Style" for the domain of Internet scale services. Like the "Elements of Style" this paper is also not to be consumed at once, it is to be visited again and again every so often.

There are three main overarching principles: expect failures, keep things simple, automate everything. We will see reflections of these three principles in several subareas pertaining to Internet-scale services below.

Overall Application Design
Low-cost administration correlates highly with how closely the development, test, and operations teams work together. Some of the operations-friendly basics that have the biggest impact on overall service design are as follows.

/Design for failure/ Armando Fox had argued that the best way to test the failure path is never to shut the service down normally, just hard-fail it. This sounds counter-intuitive, but if the failure paths aren’t frequently used, they won't work when needed. The acid test for fault-tolerance is the following: is the operations team willing and able to bring down any server in the service at any time without draining the work load first? (Chaos Monkey anyone?)

/Use commodity hardware slice/ This is less expensive, scales better for performance and power-efficiency, and provides better failure granularity. For example, storage-light servers will be dual socket, 2- to 4-core systems in the $1,000 to $2,500 range with a boot disk.

Automatic Management and Provisioning
/Provide automatic provisioning and installation./
/Deliver configuration and code as a unit./
/Recover at the service level/ Handle failures and correct errors at the service level where the full execution context is available rather than in lower software levels. For example, build redundancy into the service rather than depending upon recovery at the lower software layer."

I would amend the above paragraph by saying "at the lowest possible service level where the execution context is available". Building fault-tolerance from bottom up is cheaper and more reusable. Doing it only at the service level is more expensive and not reusable. Building fault-tolerance at the service level is also conflicting with the principle they cite "Do not build the same functionality in multiple components".

Dependency Management
As a general rule, dependence on small components or services doesn't save enough to justify the complexity of managing them and should be avoided. Only depend on systems that are single, shared instance when multi-instancing to avoid dependency isn't an option. When dependency is inevitable as above, manage them as follows:
/Expect latency/ Don't let delays in one component or service cause delays in completely unrelated areas. Ensure all interactions have appropriate timeouts to avoid tying up resources for protracted periods.
/Isolate failures/ The architecture of the site must prevent cascading failures. Always "fail fast". When dependent services fail, mark them as down and stop using them to prevent threads from being tied up waiting on failed components.
/Implement inter-service monitoring and alerting/

Release Cycle and Testing
Take a new service release through standard unit, functional, and production test lab testing and then go into limited production as the final test phase. Rather than deploying as quickly as possible, it is better to put one system in production for a few days in a single data center, two data centers and eventually deploy globally. Big-bang deployments are very dangerous.
/Ship often and in small increments/
/Use production data to find problems/
/Support version roll-back/

Operations and Capacity Planning
Automate the procedure to move state off the damaged systems. Relying on operations to update SQL tables by hand or to move data using ad hoc techniques is courting disaster. Mistakes get made in the heat of battle. If testing in production is too risky, the script isn't ready or safe for use in an emergency.
/Make the development team responsible./ You built it, you manage it.
/Soft delete only./ Never delete anything. Just mark it deleted.
/Track resource allocation./
/Make one change at a time./
/Make everything configurable./ Even if there is no good reason why a value will need to change in production, make it changeable as long as it is easy to do.

Auditing, Monitoring, and Alerting
/Instrument everything/
/Data is the most valuable asset/
/Expose health information for monitoring/
/Track all fault tolerance mechanisms/ Fault tolerance mechanisms hide failures. Track every time a retry happens, or a piece of data is copied from one place to another, or a machine is rebooted or a service restarted. Know when fault tolerance is hiding little failures so they can be tracked down before they become big failures. Once they had a 2000-machine service fall slowly to only 400 available over the period of a few days without it being noticed initially.

Graceful Degradation and Admission Control
/Support a "big red switch."/ The concept of a big red switch is to keep the vital processing progressing while shedding or delaying some noncritical workload in an emergency.
/Control admission./ If the current load cannot be processed on the system, bringing more work load into the system just assures that a larger cross section of the user base is going to get a bad experience.

Tuesday, April 19, 2011

Centrifuge: Integrated Lease Management and Partitioning for Cloud Services

For performance reasons many large-scale sites (LinkedIn, Digg, Facebook, etc.) employ a pool of backend servers that operate purely on in-memory state. The frontend servers that forward the requests to the backend servers then should be very carefully implemented to ensure that they forward the requests to the correct backends. The frontend should ensure that the selected backend has the requested object in its memory and also possesses the lease to update the object. Unfortunately, things may change quickly at the backend; backend servers may fail, new ones may join, backend servers may start to cache different objects due to load-balancing requirements, and leases may exchange hands. All of these make the task of programming the frontend very challenging and frustrating.

This work (from NSDI'10) proposes Centrifuge, a datacenter lease manager that addresses this problem. The Centrifuge system has been deployed as part of Microsoft live mesh service, a large scale commercial cloud service that finds use for file sharing, notifications of activity on shared files, virtual desktop, etc.

There are 3 parts to the Centrifuge. The frontend servers use lookup library to learn about the leases and partitioning from the manager and accordingly forward the requests to the backends. The manager is centralized (and Paxos replicated for fault-tolerance) and decides on leases and partitioning for the objects. The backend servers use owner library to coordinate with the manager about the leases and partitioning.

The manager consists of one leader and two standbys. A Paxos group is also used as a persistent/consistent state storage and as the leader elector. The leader makes all the decisions for partitioning and leases. If the leader dies, one of the standbys become a leader by contacting the Paxos group, and then learn the state of leases and partitioning from the Paxos group to start serving as the new leader.

The leader partitions the key space to 64 ranges using consistent hashing and handles the leases. The leader performs load-balancing by rearranging and reassigning these lease ranges to the backends while accounting for lightly/heavily used ranges and failed backends. In contrast to the traditional model where backends would request for the leases, in Centrifuge manager assigns leases to the backends unilaterally and this simplifies a lot of things such as enabling the assignment of leases as ranges rather than per object basis.

The lookup library at the frontend maintains a complete copy of the lease table (200KB constant since leases are per range not per object basis). The lookup returns "hints", these are checked at the backend owner library again. If the lookup table was wrong, the backend informs the corresponding frontend, and this triggers the lookup library to get the new table from the manager. Otherwise, the lookup table is renewed by contacting the manager every 30 seconds. Since leases are granted by the manager to the backends for 60 second periods (and most likely are renewed by the backend), the 30 second period for lookup table renewal is reasonable.

The paper provides extensive performance evaluations both from the MS live mesh system and from controlled examples. The Centrifuge system would come as handy for many cloud deployments.

Tuesday, April 12, 2011

Smoke and Mirrors: Reflecting Files at a Geographically Remote Location Without Loss of Performance

This paper from FAST'09 introduces smoke and mirrors filesystem (SMFS) which mirrors files at a geographically remote datacenter with negligible impact on performance. It turns out remote mirroring is a major problem for banking systems which keep off-site mirrors (employing dedicated high-speed 10-40Gbits optical links) to survive disasters.

This paper is about disaster tolerance, not your everyday fault-tolerance. The fault model includes that the primary site may get destroyed, and some sporadic packet losses upto 1% may occur simultaneously as well, yet still no data should be lost. (Data is said to be lost if the client is acknowledged for the update but the corresponding update/data no longer exists in the system.) The primary site being destroyed may be a bit over-dramatization. An equivalent way to state the fault model would be that the paper just rules out a post~hoc correction (replay or manual correction). Here is how manual correction would work: if power outage occurs and the system drops some requests, and the mirror is inconsistent, then when we get the primary up again, we can restore the lost requests from the primary and make the mirror eventually-consistent. The paper rules that out, and insists that the system doesn't lose any data ever.

Existing mirroring techniques
Here are the existing mirroring techniques in use:

Synchronous mirroring only sends acknowledgments to the client after receiving a response from the mirror. Data cannot be lost unless both primary and mirror sites fail. This is the most dependable solution, but performance suffers because of wide-area oneway link latencies of upto 50ms.

Semi-synchronous mirroring sends acknowledgments to the client after data written is locally stored at the primary site and an update is sent to the mirror. This scheme does not lose data unless the primary site fails and sent packets drop on the way to the mirror.

Asynchronous mirroring sends acknowledgments to the client immediately after data is written locally. This is the solution that provides the best performance, but it is also the least dependable solution. Data loss can occur even if just the primary site fails.

Proposed network-sync mirroring
Clearly, semi-synchronous mirroring strikes a good balance between reliability and performance. The proposed approach in SMFS is actually a small improvement on the semi-synchronous mirroring. The basic idea is to ensure that once a packet has been sent, the likelihood that it will be lost is as low as possible. They do this by sending forward error correction (FEC) data along with the packet and informing the sending application when FEC has been sent along with the data. (An example of FEC is using Reed-Solomon error correction.) They call this technique "network-sync mirroring".

This idea is simple and straightforward, but this work provides a very good execution of the idea. SMFS employs previous work of the authors, Maelstrom (NSDI'08), to provide FEC for wide-area-network transmission. SMFS implements a filesystem that preserves the order of of operations in the structure of the filesystem itself, a log-structured filesystem. The paper also presents several real-world experiments to evaluate the performance of SMFS as well as its disaster tolerance. Here are two graphs from the evaluation section.

Consistency analysis in Bloom: a CALM and collected approach

This work from CIDR11 aims to provide theoretical foundations for correctness under eventual consistency, and identifies "order independence" (independence of program execution from temporal nondeterminism) as a sufficient condition for eventual consistency.

CALM (consistency and logical monotonicity) principle states that monotonic programs guarantee order independence, and hence, eventual consistency. A monotonic program is one where any true statement remains to be true as new axioms arrive. In contrast, in a non-monotonic program, new input can cause the earlier output to be revoked. A monotonic program guarantees eventual consistency, whereas non-monotonicity requires the use of coordination logic.

To simplify the task of identifying monotonic and nonmonotonic program segments, this work proposes using program analysis in a declarative language, Bloom. In Bloom, monotonicity of a program is examined via simple syntactic checks. Selection, join, and projection operators are monotonic, whereas aggregation and negation operators are nonmonotonic and are flagged as "points of order" in the program. Coordination logic needs to be introduced at these points of order to achieve consistency.

Bloom is implemented in Ruby as a domain specific language, called Bud. The paper presents two case studies, a key-value store and a shopping cart implementation, to demonstrate the above concepts. Bloom's alpha release was out a couple days ago. Congratulations to the Bloom team! I am sure they will receive useful feedback as the language gets used for building things.

Our related work on CALM principle
I tried to think of an analogue to the monotonicity property in the context of guarded-command languages. I think, monotonicity property corresponds to the guards being "stable" (closed under program actions) in a guarded-command program. If all guards of a program are stable, then the program is monotonic. For a guard that refers to the state at other processes, normally we would need synchronization and atomicity to evaluate the guard and execute the statement at the same step. But for actions with stable guards, we don't need that; we can evaluate the guard at one step and execute the statement at a later step with several other actions from other processes executing in between without need for synchronization.

We had in fact noticed this as a nice property a year ago, and published a paper on this with some generalizations of the stable property: Slow is Fast for Wireless Sensor Networks in the presence of Message Losses

This work on guarded-command languages can provide an imperative alternative to declarative languages for realizing the CALM principle. Declarative programs are hard to master for many developers (count me here) and may be difficult[different] to test and debug. Imperative programs have an advantage in this regard.

Concluding remarks
I think this is a promising direction to pursue. As Barbara Liskov mentioned in her ACM Turing Award Lecture (SOSP'09), "The Power of Abstraction", the next breakthrough for distributed computing will most likely be led by novel programming languages/abstractions.

I guess the next interesting question for this work is: What are the rules of thumbs for writing programs with less synchronization points?

Exercise questions
Are map, reduce primitives in Bloom monotonic? What does this imply for map-reduce chain programs?

Can you reconstruct any of the analysis for the provided programs?

Sunday, April 10, 2011

Life beyond Distributed Transactions: an Apostate's Opinion

Pat Helland is one of the veterans of the database community. He worked on the Tandem Computers with Jim Gray. His tribute to Jim Gray, which gives a lot of insights into Jim Gray as a researcher, is worth reading again and again.

This 2007 position paper from Pat Helland is about extreme scalability in cloud systems, and by its nature anti-transactional. Since Pat has been a strong advocate for transactions and global serializability for most of his career, the title is aptly named as an apostate's opinion.

This paper is very relevant to the NoSQL movement. Pat introduces "entity and activities" abstractions as building primitives for extreme scalability cloud systems. He also talks about at length about the need to craft a good workflow/business-logic on top of these primitives.

Entity and activities abstractions
Entities are collections of named (keyed) data which may be atomically updated within the entity but never atomically updated across entities. An entity lives on a single machine at a time and the application can only manipulate one entity atomically. A consequence of almost-infinite scaling is that this programmatic abstraction must be exposed to the developer of business logic. Each entity has a unique ID, and entities represent disjoint sets of data.

Since you can’t update the data across two entities in the same transaction, you need a mechanism to update the data in different transactions. The connection between the entities is via a message addressed to the other entity.

Activities comprise the collection of state within the entities used to manage messaging relationships with a single partner entity. Activities keep track of messages between entities. This can be used to keep entities eventually-consistent, even when we are limited to do the transaction on a single entity. (Messaging notifies the other entity about this activity, and the other entity may update its state.)

Key-value tuple concept widely employed in key-value stores in cloud computing systems is a good example of an entity. However, key-value tuples do not specify any explicit "activities". Note that, if we can manage to make messages between entities idempotent, then we don't need to keep activities for entities; hence entity+activities concept reduces to the key-value tuple concept.

In fact several developers invented on their own different ad~hoc ways of implementing activities on top entities. What Pat is advocating is to explicitly recognize activities and develop a standard primitive for implementing them to avoid inconsistency bugs.

An example of an activity is found in Google's Percolator paper which replaced MapReduce for creating Google's pagerank index. Percolator provides a distributed transaction middleware leveraging on BigTable. Each row is an entity as a transaction is atomic with respect to a row at any time. However, to build a distributed transaction, the system should remember the state of the transaction with respect to other involved rows, i.e., "activities". This Percolator metadata is again encoded as a separate field in that row in BigTable. Percolator logs the state, for example, primary and secondary locks in these fields. (See Figure 5 for full list.) I guess using coordination services such as Zookeper is also another way of implicitly implementing activities.

Workflow is for dealing with uncertainty at a distance
In a system which cannot count on atomic distributed transactions, the management of uncertainty must be implemented in the business logic. The uncertainty of the outcome is held in the business semantics rather than in the record lock. This is simply workflow. Think about the style of interactions common across businesses. Contracts between businesses include time commitments, cancellation clauses, reserved resources, and much more. The semantics of uncertainty is wrapped up in the behaviour of the business functionality. While more complicated to implement than simply using atomic distributed transactions, it is how the real world works. Again, this is simply an argument for workflow but it is fine-grained workflow with entities as the participants.

Concluding remarks
Systematic support for implementing the activities concept is still lacking today. It seems like this concept needs to address more explicitly and more methodically to improve the NoSQL systems.

Workflow is also the prescribed as the way to deal with the lack of atomic distributed transactions. Workflow requires the developer to think hard and decide on the business logic for dealing with the decentralized nature of the process: time commitments, cancellation clauses, reserved resources, etc. But, are there any support for developing/testing/verifying workflows?

Saturday, April 9, 2011

Tempest and Deutronomy reviews

I am 3 weeks behind in writing summaries for the papers we discuss in my seminar. In order to have some chance of catching up, I am skipping the summaries for Tempest and Deutronomy papers, and refer to student summaries for these two.

Tempest: Soft state replication in the service tier (DSN'08)

Deuteronomy: Transaction Support for Cloud Data (CIDR'11)

Thursday, March 10, 2011

Megastore: Providing scalable, highly available storage for interactive services

Google's Megastore is the structured data store supporting the Google Application Engine. Megastore handles more than 3 billion write and 20 billion read transactions daily and stores a petabyte of primary data across many global datacenters. Megastore tries to provide the convenience of using traditional RDBMS with the scalability of NOSQL: It is a scalable transactional indexed record manager (built on top of BigTable), providing full ACID semantics within partitions but lower consistency guarantees across partitions (aka, entity groups in Figure 1). To achieve these strict consistency requirements, Megastore employs a Paxos-based algorithm for synchronous replication across geographically distributed datacenters.

I have some problems with Megastore, but I save them to the end of the review to explain Megastore first.

Megastore uses Paxos, a proven, optimal, fault-tolerant consensus algorithm with no requirement for a distinguished master. (Paxos is hard to cover in a blog post, as I mentioned earlier, so I will not attempt to.) The reason Paxos got much more popular than other consensus protocols (such as 2-phase and 3-phase commit) is that Paxos satisfies safety properties of consensus even under asynchrony and arbitrary message loss. This does not conflict with the coordinated attack and FLP impossibility results. Those impossibility results said that you can't achieve consensus (both safety and liveness at the same time), they did not say you have to sacrifice safety under message-losses or asynchrony conditions. So Paxos preserves safety under all conditions and achieves liveness when conditions improve outside the impossibility realm (less message losses, some timing assumptions start to hold).

Basic Paxos is a 2-phase protocol. In the prepare phase the leader replica tries to get the other nonleader replicas recognize it as the leader for that consensus instance. In the accept phase the leader tries to get the nonleader replicas accept the vote it proposes. So basic Paxos requires at least two round trips, and that is very inefficient for WAN usage. Fortunately, there has been several Paxos variants to optimize the performance. One optimization is MultiPaxos, which permits single-roundtrip writes by basically piggybacking the prepare phase of the upcoming consensus instance onto the accept phase of the current consensus instance.

Another optimization is for optimizing the cost of reads. In basic Paxos, a read operation also needs to go through the two phase protocol involving all the replicas (or at least a majority of them) to be serialized and served. The read optimization enables serving reads locally but only at the leader replica. When a nonleader replica gets a read request, it has to forward it to the leader to be served locally there. The read optimization was made possible by having the leader impose a lease on being a leader at other replicas (during which the replicas cannot accept another leader's prepare phase). Thanks to the lease the leader is guaranteed to be the leader until the lease expires, and is guaranteed to have the most up-to-date view of the system and can serve the read locally. The nice thing about the MultiPaxos and local-read-at-the-leader optimizations are that they did not modify any guarantees of Paxos; safety is preserved under all conditions, and progress is satisfied when the conditions are sufficient for making progress.

Megastore's use of PaxosMegastore uses Paxos (with the MultiPaxos extension) in a pretty standard way to replicate a write-ahead log over a group of symmetric peers. Megastore runs an independent instance of the Paxos algorithm for each log position. The leader for each log position is a distinguished replica chosen alongside the preceding log position's consensus value. (This is the MultiPaxos optimization I discussed above.) The leader arbitrates which value may use proposal number zero. The first writer to submit a value to the leader wins the right to ask all replicas to accept that value as proposal number zero. All other writers must fall back on two-phase Paxos. Since a writer must communicate with the leader before submitting the value to other replicas, the system minimizes writer-leader latency. The policy for selecting the next write's leader is designed around the observation that most applications submit writes from the same region repeatedly. This leads to a simple but effective heuristic: use the closest replica.

However, in addition to the straightforward MultiPaxos optimization above, Megastore also introduces a surprising new extension to allow local reads at any up-to-date replica. This came as a big surprise to me because the best anyone could do before was to allow local-reads-at-the-leader. What was it that we were missing? I didn't get how this was possible the first time I read the paper; I only got it in my second look at the paper.

Coordinator, the rabbit pulled out of the hatMegastore uses a service called the Coordinator, with servers in each replica's datacenter. A coordinator server tracks a set of entity groups (i.e., partitions mentioned in the first paragraph) for which its replica has observed all Paxos writes. For entity groups in that set, the replica is deemed to have sufficient state to serve local reads. If the coordinator claims that it is up to date, then the corresponding replica can serve a read for that entity group locally, else the other replicas (and a couple network roundtrips) need to be involved.
But how does the coordinator know whether it is up to date or not? The paper states that it is the responsibility of the write algorithm to keep coordinator state conservative. If a write fails on a replica's Bigtable, it cannot be considered committed until the group's key has been evicted from that replica's coordinator. What does this mean? This means that write operations are penalized to improve the performance of read operations. In MegastorePaxos, before a write is considered committed and ready to apply, all full replicas must have accepted or had their coordinator invalidated for that entity group. In contrast, in Paxos a write could be committed with only a majority of replicas accepting the write.

Performance problemsUsing synchronous replication over WAN of course takes its toll on the performance. This has been noticed and discussed here.

Of course, there is also the performance degradation due to waiting for an acknowledgement (or time out) from all replicas for a write operation. This also leads to a write availability problem. The paper tries to defend that this is not a big problem in practice as follows, but it is evident that partitions/failures result in write unavailability until they are recovered from.

"In the write algorithm above, each full replica must either accept or have its coordinator invalidated, so it might appear that any single replica failure (Bigtable and coordinator) will cause unavailability. In practice this is not a common problem. The coordinator is a simple process with no external dependencies and no persistent storage, so it tends to be much more stable than a Bigtable server. Nevertheless, network and host failures can still make the coordinator unavailable.

This algorithm risks a brief (tens of seconds) write outage when a datacenter containing live coordinators suddenly becomes unavailable--all writers must wait for the coordinator's Chubby locks to expire before writes can complete (much like waiting for a master failover to trigger). Unlike after a master failover, reads and writes can proceed smoothly while the coordinator's state is reconstructed. This brief and rare outage risk is more than justified by the steady state of fast local reads it allows."

In the abstract, the paper had claimed Megastore achieves both consistency and availability, and this was a red flag for me, as we all know that something has to give due to CAP theorem. And above we have seen that write availability suffers in the presence of a partition.

Exercise question
Megastore has a limit of "a few writes per second per entity group" because higher write rates will cause even worse performance due to the conflicts and retries of the multiple leaders (aka dueling leaders). Is it possible to adopt the partitioning consensus sequence numbers technique in "Mencius: building efficient replicated state machines for Wide-Area-Networks (WANs)" to alleviate this problem?

Flexible, Wide-Area Storage for Distributed Systems with WheelFS

One of my students, Serafettin Tasci, wrote a good review of this paper, so I will save time by using his review below, instead of writing a review myself.

In this paper the authors propose a storage system for wide-area distributed systems called WheelFS. The main contribution of WheelFS is its ability of adaptation to different types of applications with different consistency, replica placement or failure handling requirements. This ability is obtained via semantic cues that can be easily expressed in path names. For example to force the primary site of a folder john to be X, we can specify the cue “home/users/.Site=X/john”. This representation enables preserving of POSIX semantics and minor change in application software to use the cues.

In WheelFS, there are 4 groups of semantic cues. Placement cues are used to arrange the location of primaries and replicas of a file or folder. Durability cues specify the number and usage of replicas. Consistency cues maintain a tradeoff between consistency and availability via timeout limits and eventual consistency. And finally, large read cues are useful in reading large files faster via entire file prefetching and usage of neighbor client caches.

WheelFS consists of clients and servers. Clients run applications that use WheelFS and uses FUSE to present the distributed file system to these applications. In addition, all clients have local caches. Servers keep file and directory objects in storage devices. They group objects into structures called slices.

A third component of WheelFS is the configuration service which keeps slice tables that contain object-server assignments. Each entry in the slice table contains a replication policy and replicas for a slice. Configuration service is replicated on a small set of servers and uses Paxos for master election. It also provides a locking interface to servers by which the usage of slices and slice table by servers is coordinated.

When a new file is created, a replication policy is used via the cues and then it contacts the configuration service to see if a slice in the table matches the policy. If no slice matches the policy, the request is forwarded to a random server matching that policy. In addition, WheelFS uses write-local policy in which the primary of a newly created file is the local server by default. This policy enables faster writes.

For replication, WheelFS uses primary/backup replication. For each slice there is a primary server and some backup servers. However, this scheme causes two problems: Firstly, since all operations pass through the primary and updates require the primary to wait ACKs from all backups; this replication scheme may cause significant delays in wide-area systems. Secondly, if the .SyncLevel cue is used the replicas may get some of the updates too late. So, if the primary dies, a backup which replaces the primary may miss some updates and needs to learn the missing updates from other backups.

By default, WheelFS uses close-to-open consistency. But in case of a primary failure, all operations will have to delay waiting for the new primary to start. To avoid this delay, WheelFS provides .EventualConsistency cue that can be used whenever consistency requirements are not strict. In addition, WheelFS uses a write-through cache that improves consistency by writing the copies of each updates in cache to disk with the cost of increased latency.

When clients need to use the data in their cache, they need to get an object lease from the primary to preserve consistency. But this also brings additional latency cost since the primary needs to wait for all leases to complete to make an update on the object.

In the experiments, they present a number of applications that can be built on top of WheelFS such as distributed web cache, email service and file distribution service. Distributed web cache application shows that it provides a comparable performance to popular systems such as CoralCDN. In addition, in case of failures, if eventual consistency is used, it provides consistently high throughput. In file distribution experiment, they revealed that with the help of locality provided via large read cues, it achieves faster file distribution than BitTorrent. Finally, comparison of WheelFS to NFSv4 shows that it is more scalable thanks to the distributed caching mechanism.

Thursday, March 3, 2011

Ceph: A Scalable, High-Performance Distributed File System

Traditional client/server filesystems (NFS, AFS) have suffered from scalability problems due to their inherent centralization. In order to improve performance, modern filesystems have taken more decentralized approaches. These systems replaced dumb disks with intelligent object storage devices (OSDs)--which include CPU, NIC and cache-- and delegated low-level block allocation decisions to these OSDs. In these systems, clients typically interact with a metadata server (MDS) to perform metadata operations (open, rename), while communicating directly with OSDs to perform file I/O (reads and writes). This separation of roles improve overall scalability significantly. Yet, these systems still face scalability limitations due to little or no distribution of the metadata workload.

Ceph is a distributed file system designed to address this issue. Ceph decouples data and metadata operations completely by eliminating file allocation tables and replacing them with generating functions (called CRUSH). This allows Ceph to leverage the intelligence present in OSDs and delegate to OSDs not only data access but also update serialization, replication, failure detection, and recovery tasks as well. The second contribution of Ceph is to employ an adaptive distributed metadata cluster architecture to vastly improve the scalability of metadata access.
Ceph has three main components: 1) the client, each instance of which exposes a near-POSIX file system interface to a host or process; 2) a cluster of OSDs, which collectively stores all data and metadata; and 3) a metadata server cluster, which manages the namespace (file names and directories), consistency, and coherence. In the rest of this review, we describe these three components in more detail.

The client code runs entirely in user space and can be accessed either by linking to it directly or as a mounted file system via FUSE (a user-space file system interface).

File I/O
An MDS traverses the filesystem hierarchy to translate the file name into the file inode. A file consists of several objects, and Ceph generalizes a range of striping strategies to map file data onto a sequence of objects (see my xFS review for an explanation of striping). To avoid any need for file allocation metadata, object names simply combine the file inode number and the stripe number. Object replicas are then assigned to OSDs using CRUSH, a globally known mapping function (we will discuss this in the next section on OSD clusters).

Client synchronization
POSIX semantics require that reads reflect any data previously written, and that writes are atomic. When a file is opened by multiple clients with either multiple writers or a mix of readers and writers, the MDS will revoke any previously issued read caching and write buffering capabilities, forcing client I/O for that file to be synchronous. That is, each application read or write operation will block until it is acknowledged by the OSD, effectively placing the burden of update serialization and synchronization with the OSD storing each object. Since synchronous I/O is a performance killer, Ceph provides a more relaxed option that sacrifices consistency guarantees. With O_LAZY flag, performance-conscious applications which manage their own consistency (e.g., by writing to different parts of the same file, a common pattern in HPC workloads) are then allowed to buffer writes or cache reads.

Ceph delegates the responsibility for data migration, replication, failure detection, and failure recovery to the cluster of OSDs that store the data, while at a high level, OSDs collectively provide a single logical object store to clients and metadata servers. To this end, Ceph introduces the Reliable Autonomic Distributed Object Store (RADOS) system, which achieves linear scaling to tens or hundreds of thousands of OSDs. Each Ceph OSD, in this system, manages its local object storage with EBOFS, an Extent and B-tree based Object File System. We describe the features of RADOS next.

Data distribution with CRUSH
In Ceph, file data is striped onto predictably named objects, while a special-purpose data distribution function called CRUSH assigns objects to storage devices. This allows any party to calculate (rather than look up) the name and location of objects comprising a file's contents, eliminating the need to maintain and distribute object lists. CRUSH works as follows. Ceph first maps objects into placement groups (PGs). Placement groups are then assigned to OSDs using CRUSH (Controlled Replication Under Scalable Hashing), a pseudo-random data distribution function that efficiently maps each PG to an ordered list of OSDs upon which to store object replicas. To locate any object, CRUSH requires only the placement group and an OSD cluster map: a compact, hierarchical description of the devices comprising the storage cluster. As such any party (client, OSD, or MDS) can independently calculate the location of any object without any exchange of distribution-related metadata.

CRUSH resembles consistent hashing. While consistent hashing would use a flat server list to hash onto, CRUSH utilizes a server node hierarchy (shelves, racks, rows) instead and enables the user to specify policies such as "Put replicas onto different shelves than the primary".

Data is replicated in terms of PGs, each of which is mapped to an ordered list of n OSDs (for n-way replication). Clients send all writes to the first non-failed OSD in an object's PG (the primary), which assigns a new version number for the object and PG and forwards the write to any additional replica OSDs. After each replica has applied the update and responded to the primary, the primary applies the update locally and the write is acknowledged to the client. Reads are directed at the primary. This approach also resembles the replication strategy of GFS.

The MDS cluster is diskless and MDSs just serve as an index to the OSD cluster for facilitating read and write. All metadata, as well as data, are stored at the OSD cluster. When there is an update to an MDS, such as a new file creation, MDS stores this update to the metadata at the OSD cluster. File and directory metadata in Ceph is very small, consisting almost entirely of directory entries (file names) and inodes (80 bytes). Unlike conventional file systems, no file allocation metadata is necessary--object names are constructed using the inode number, and distributed to OSDs using CRUSH. This simplifies the metadata workload and allows MDS to efficiently manage a very large working set of files, independent of file sizes.

Typically there would be around 5 MDSs in a 400 node OSD deployment. This looks like an overkill for just providing an indexing service to the OSD cluster, but actually is required for achieving very high-scalability. Effective metadata management is critical to overall system performance because file system metadata operations make up as much as half of typical file system workloads. Ceph also utilizes a novel adaptive metadata cluster architecture based on Dynamic Subtree Partitioning that adaptively and intelligently distributes responsibility for managing the file system directory hierarchy among the available MDSs in the MDS cluster, as illustrated in Figure 2. Every MDS response provides the client with updated information about the authority and any replication of the relevant inode and its ancestors, allowing clients to learn the metadata partition for the parts of the file system with which they interact.

Additional links
Ceph is licensed under the LGPL and is available at http://ceph.newdream.net/.
Checking out the competition

Exercise questions
1) How do you compare Ceph with GFS? XFS? GPFS?
2) It seems like the fault-tolerance discussion in Ceph assumes that OSDs are not network-partitioned. What can go wrong if this assumption is not satisfied?

Wednesday, March 2, 2011

Sinfonia: A New Paradigm for Building Scalable Distributed Systems

Sinfonia is an in-memory scalable service/infrastructure that aims to simplify the task of building scalable distributed systems. Sinfonia provides a lightweight "minitransaction" primitive that enables applications to atomically access and conditionally modify data at its multiple memory nodes. As the data model, Sinfonia provides a raw linear address space which is accessed directly by client libraries.

In traditional transactions, a coordinator executes a transaction by asking participants to perform one or more participant-actions (such as retrieving or modifying data items), and at the end of the transaction, the coordinator decides and executes a two-phase commit. In the first phase, the coordinator asks all participants if they are ready to commit. If they all vote yes, in the second phase the coordinator tells them to commit; otherwise the coordinator tells them to abort.

Sinfonia introduces the concept of minitransactions, by making the observation that under certain restrictions/conditions on the transactions, it is possible to optimize the execution of a transaction such that the entire transaction is piggybacked onto just the two-phase commit protocol at the end. For example if the transactions participant-actions do not affect the coordinator's decision to abort or commit then the coordinator can piggyback these actions onto the first phase of the two-phase commit. Taking this a step further, even if a participant-action affects the coordinator's decision to abort or commit, if the participant knows how the coordinator makes this decision, then we can also piggyback the action onto the commit protocol. For example, if the last action is a read and the participant knows that the coordinator will abort if the read returns zero (and will commit otherwise), then the coordinator can piggyback this action onto two-phase commit and the participant can read the item and adjust its vote to abort if the result is zero.

Sinfonia designed its minitransactions so that it is always possible to piggyback the entire transaction execution onto the commit protocol. A minitransaction (Figure 2) consists of a set of compare items, a set of read items, and a set of write items. Items are chosen before the minitransaction starts executing. Upon execution, a minitransaction does the following: (1) compare the locations in the compare items, if any, against the data in the compare items (equality comparison), (2) if all comparisons succeed, or if there are no compare items, return the locations in the read items and write to the locations in the write items, and (3) if some comparison fails, abort. Thus, the compare items control whether the minitransaction commits or aborts, while the read and write items determine what data the minitransaction returns and updates.
To ensure serializability, participants lock the locations accessed by a minitransaction during phase 1 of the commit protocol. Locks are only held until phase 2 of the protocol, a short time. To avoid deadlocks, a participant tries to acquire locks without blocking; if it fails, it releases all locks and votes "abort due to busy lock" upon which the coordinator aborts the minitransaction and retries later. Figure 4 shows the execution and committing of a minitransaction. As a further optimization, if a minitransaction has just one participant, it can be executed in one phase because its outcome depends only on that participant. This case is exactly how key-value stores operate.
Fault-tolerance mechanisms
To provide fault tolerance, Sinfonia uses four mechanisms: disk images, logging, replication, and backup. A disk image keeps a copy of the data at a memory node. For efficiency, the disk image is written asynchronously and so may be slightly out-of-date. To compensate for that, a log keeps recent data updates, and the log is written synchronously to ensure data durability. When a memory node recovers from a crash, it uses a recovery algorithm to replay the log to catch up to its state before the crash. To provide high availability, Sinfonia uses primary-backup approach to replicate memory nodes, so that if a memory node fails, a replica takes over without downtime.

Minitransaction recovery protocols
Recall that in standard two-phase commit, if the coordinator crashes, the system has to block until the coordinator recovers. However, that approach is not suitable for Sinfonia. Recall that participants run on Sinfonia memory nodes whereas coordinators run on application nodes; so coordinators are unstable and very failure-prone. Running a three-phase commit protocol is expensive, and Sinfonia takes a different approach to deal with this issue.

Sinfonia modifies things a little so that instead of blocking on coordinator crashes, Sinfonia blocks on participant crashes. This is reasonable for Sinfonia because participants are memory nodes that keep application data, so if they go down and the application needs to access data, the application has to block anyway. Furthermore, Sinfonia can optionally replicate participants (memory nodes), to reduce such blocking to a minimum. This modification to block on a participant crash, however, complicates the protocols for recovery as we discuss next.

If a coordinator crashes during a minitransaction, it may leave the minitransaction with an uncertain outcome: one in which not all participants have voted yet. To fix this problem, Sinfonia employs a recovery coordinator, which runs at a dedicated management node. The recovery scheme ensures the following: (a) it will not drive the system into an unrecoverable state if the recovery coordinator crashes or if there are memory node crashes during recovery; (b) it ensures correctness even if there is concurrent execution of recovery with the original coordinator (this might happen if recovery starts but the original coordinator is still running); and (c) it allows concurrent execution by multiple recovery coordinators (this might happen if recovery restarts but a previous recovery coordinator is still running).

Concluding remarks
Sinfonia seems to work as promised and simplify the development of scalable distributed systems. The minitransaction primitive is expressive enough to build sophisticated coordination/cooperative algorithms. The authors demonstrate Sinfonia by using it to build two applications: a cluster file system called SinfoniaFS and a group communication service called SinfoniaGCS. Using Sinfonia, the authors built these complex services easily with 3900 and 3500 lines of code, in one and two man-months, respectively. This is not an easy feat.

I, personally, am a big fan of transactions. Transactions really do simplify distributed system development a lot. And transactions does not need to be heavyweight, and Sinfonia shows that by reducing the power of transactions to minitransactions, lightweight transaction execution can be achieved. In my work on wireless sensor networks (WSNs), I had also proposed a similar transactional primitive, Transact, to simplify to development of coordination and cooperation protocols. In Transact, in order to provide a lightweight implementation of transaction processing, we had exploited the inherent atomicity and snooping properties of singlehop wireless broadcast communication in WSNs.

Exercise questions
Recently a reader suggested that I post exercises with each summary, similar to what textbooks do. I decided to give this a try. So here it goes.

1) If we use Sinfonia to build a key-value store (only providing atomic write to single key-value records), what is the overhead of Sinfonia? How would it compare with other popular key-value stores?

2) Is Sinfonia suitable for WAN access, multi-datacenter distribution?

PetaShare: A reliable, efficient and transparent distributed storage management system

This paper by my colleague Tevfik Kosar (to appear soon) presents the design and implementation of a reliable and efficient distributed data storage system, PetaShare, which manages 450 Terabytes of disk storage and spans 7 campuses across the state of Louisiana.

There are two main components in a distributed data management architecture: a data server which coordinates physical access (i.e. writing/reading data sets to/from disks) to the storage resources, and a metadata server which provides the global name space and metadata of the files. Metadata management is a challenging problem in widely distributed large-scale storage systems, and is the focus of this paper.

Petashare architectureThe back-end of PetaShare is based on iRODS. All system I/O calls made by an application are mapped to the relevant iRODS I/O calls. iRODS stores all the system information, as well as user-defined rules in centralized database, which is called iCAT. iCAT contains the information of the distributed storage resources, directories, files, accounts, metadata for files and system/user rules. iCAT is the metadata that we need to manage/distribute in PetaShare.

Multiple iRODS servers interact with the iCAT server to control the accesses to physical data in the resources. Of course, the centralized iCAT server is a single point of failure, and the entire system becomes unavailable when the iCAT server fails. As we discuss next, PetaShare employs asynchronous replication of iCAT to resolve this problem.

Asynchronous multi-master metadata replication
PetaShare first experimented with synchronous replication of the iCAT server. Not surprisingly, this led to high latency and performance degradation on data transfers, because each transfer could be committed only after iCAT servers complete replication. To eliminate this problem, PetaShare adopted an asynchronous replication system.

The biggest problem of asynchronous multi-master replication is that conflicts occur if two sites update their databases within the same replication cycle. For this reason, the proposed multi-master replication method should detect and resolve conflicts. Petashare uses a conceptual conflict resolver that handles such conflicts. Common conflict types are: (i) uniqueness conflicts: occur if two or more sites try to insert the records with the same primary key; (ii) update conflicts: occur if two or more sites try to update the same record within the same replication cycle; (iii) delete conflicts: occur if one site deletes a record from database while another site tries to update this record.

To prevent uniqueness conflicts, ID intervals are pre-assigned to different sites. (This could as well be achieved by prefacing IDs with the site ids.) Update conflicts are handled using the latest write rule if not resolved within a day, but there is a one-day grace period where negotiation (manual conflict handling) can be used. Delete conflicts are also handled similar to update conflicts.

The paper provides real-deployment experiment results on centralized, synchronous, and asynchronous replicated metadata servers. The no-replication column indicates using a centralized metadata server. Table 2 lets us to evaluate the performance of replication methods because the contribution of data transfer to the latency is minimized. For all data sets the asynchronous replication method outperforms the others, since both write and database operations are done locally. Similar to Table1, the central iCAT model gives better results than synchronous replication.