Saturday, September 25, 2010

The Design and Implementation of a Log-Structured File System

by Rosenblum, Mendel and Ousterhout, John K. SOSP 1991. PDF link for the paper

The motivations for the log-structured file system (LFS) are threefold.
1) Traditional file systems (say, the UNIX Fast File System, FFS) perform poorly for writes. FFS requires 6 writes to create a new one-block file.
2) Main memory caching capacity is increasing, which takes care of reads, so the workloads to disks are dominated by writes. And this is bad, see above.
3) There is a large gap between random and sequential i/o performance. Random is dominated by seeking time (which is very costly), whereas you avoid the seek time with sequential i/o.

The LFS idea is simple: focus on write performance and utilize the sequential bandwidth of disks. Here is how it works. LFS buffers all writes into a segment in main memory. (Even though the writes may be for different files, they are all appended to the same segment.) When the segment is full, LFS writes the whole segment to an unused place on disk in one sequential write.
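
To make the buffering idea concrete, here is a minimal sketch. The class name, the tiny segment size, and the list standing in for the disk are all my own assumptions for illustration, not details from the paper.

```python
class SegmentWriter:
    """Toy model of LFS write buffering (names and sizes are illustrative)."""

    def __init__(self, segment_blocks=4):
        self.segment_blocks = segment_blocks  # blocks per segment (tiny on purpose)
        self.buffer = []                      # in-memory segment being filled
        self.disk = []                        # the log: a list of flushed segments

    def write(self, file_id, block_no, data):
        # Blocks from different files are simply appended in arrival order.
        self.buffer.append((file_id, block_no, data))
        if len(self.buffer) == self.segment_blocks:
            self.flush()

    def flush(self):
        # One large sequential write replaces many small random writes.
        if self.buffer:
            self.disk.append(list(self.buffer))
            self.buffer.clear()


w = SegmentWriter(segment_blocks=4)
for i in range(5):
    w.write(file_id=i % 2, block_no=i, data=b"x")
print(len(w.disk), len(w.buffer))  # 1 full segment on disk, 1 block still buffered
```

The point of the sketch is only that the disk sees whole segments, never individual blocks.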

Since LFS always appends, the newest version of an inode can land anywhere later in the log, so inodes no longer live at fixed disk addresses. How do we find them, then? LFS uses an inode map data structure, which maps each inode number to the current disk address of that inode. But then how do we find the inode map? The inode map is also written to segments in a log-structured manner.

The answer is the checkpoint region (CR), which has a fixed place on disk. The CR points to the inode map blocks on disk, and the inode map itself is kept primarily in main memory. The disk copy of the CR is updated periodically, every minute (as a defense against power outages, and for use during bootup). Main memory always has the most up-to-date version. To read a file from disk, LFS consults the inode map (in memory) to learn the disk address of the file's inode.
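
The lookup chain can be sketched in a few lines. This is a toy model under my own assumptions: the disk is an append-only Python list, addresses are list indices, and the function names are made up for illustration.

```python
# Toy model: the "disk" is an append-only list; addresses are list indices.
disk = []

def append(record):
    disk.append(record)
    return len(disk) - 1               # address of the record just written

inode_map = {}                         # inode number -> address of newest inode (in memory)
checkpoint = {"imap_addr": None}       # stands in for the fixed-location CR

def write_file(inum, data):
    data_addr = append(("data", data))
    inode_addr = append(("inode", inum, [data_addr]))
    inode_map[inum] = inode_addr       # the newest version wins

def take_checkpoint():
    # Persist the inode map to the log and record where it went.
    checkpoint["imap_addr"] = append(("imap", dict(inode_map)))

def read_file(inum):
    _, _, block_addrs = disk[inode_map[inum]]
    return [disk[a][1] for a in block_addrs]

write_file(7, "v1")
write_file(7, "v2")   # the old inode and data block stay in the log as dead blocks
take_checkpoint()
print(read_file(7))   # ['v2']
```

Note that nothing is ever overwritten: the second write of file 7 just adds two more records and moves one pointer in the map.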

Since new versions are always appended, old versions are left behind on disk. These dead versions should be garbage collected. The problem is that, in a segment, live blocks can sit next to dead blocks. So LFS copies the live blocks, packed together, into a new segment, and then reclaims the entire old segment as free.

LFS detects live and dead blocks in a segment by scanning the blocks. For each block in the segment, LFS learns its inode number and disk address, and checks these against the inode map (in main memory). If they match, the block is live; otherwise the block is dead, because the inode map points to a newer version.
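
Here is a minimal sketch of that liveness check. I flatten the real lookup (inode map, then inode, then block pointer) into a single dictionary keyed by (inode number, block number); that layout and the function name are assumptions made to keep the example short.

```python
def live_blocks(segment, current_addr):
    """Return the blocks of a segment that are still live.

    segment: list of (inum, block_no, addr) entries describing the segment
    current_addr: dict mapping (inum, block_no) -> address the metadata points to now
    """
    live = []
    for inum, block_no, addr in segment:
        # A block is live only if the current metadata still points at this copy.
        if current_addr.get((inum, block_no)) == addr:
            live.append((inum, block_no, addr))
    return live

current_addr = {(1, 0): 100, (2, 0): 205}               # (2, 0) was rewritten at 205
old_segment = [(1, 0, 100), (2, 0, 101), (3, 0, 102)]   # file 3 was deleted
print(live_blocks(old_segment, current_addr))           # [(1, 0, 100)]
```

Only the one live block would have to be copied to a new segment before the old segment is reclaimed.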

LFS performs crash recovery as follows. The CR (on disk) has a pointer to the last segment it knows of, but more segments may have been written after the CR was last updated. Luckily, each segment stores a pointer to the next segment. So recovery starts from the last segment the CR knows of and checks whether it has a next-segment pointer. If so, recovery follows these pointers and updates the CR.
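
The pointer-following step can be sketched as below. The dictionary layout and the segment names are hypothetical; the sketch only shows the chain walk, not how the inode map is rebuilt from the replayed segments.

```python
def segments_to_replay(segments, checkpoint_last):
    """Follow next-segment pointers past the last checkpointed segment.

    segments: dict segment_id -> {"next": segment_id or None}
    checkpoint_last: id of the last segment the on-disk CR knows about
    """
    replay = []
    current = segments[checkpoint_last]["next"]
    while current is not None and current in segments:
        replay.append(current)
        current = segments[current]["next"]
    return replay

segments = {
    "s1": {"next": "s2"},
    "s2": {"next": "s3"},   # s2 is the last segment covered by the checkpoint
    "s3": {"next": "s4"},
    "s4": {"next": None},
}
print(segments_to_replay(segments, "s2"))  # ['s3', 's4']
```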

Since LFS reduces the 6 writes per block in FFS to almost 1 write (the paper gives this number as 1.7 empirically), LFS performs very well. In fact, for the experiment with many small 1K file writes, LFS shows a 10 times improvement over FFS. For larger files, LFS is still better, but the improvement is not that drastic, because for large files seek time is no longer the dominant factor. LFS performs worse than FFS when a file is re-read sequentially after it has been written randomly.

LFS has spawned a lot of file systems, including:
jffs: fs for flash, log structured
sun's zfs: copy on write
netapp's wafl: zero copy snapshot

In particular, LFS becomes important for SSDs, due to a technical property of writing segments in SSDs. Yes, SSDs can do random writes very fast, so it may seem that LFS is not needed. But the way an SSD updates something in a segment is to read the entire segment, update it, and write the entire updated segment back. That is inefficient, and LFS can reduce that inefficiency. Here is great coverage of the subject.

We discussed an interesting question after the presentation: "Is LFS a good filesystem for the mapreduce workload?" The answer is not straightforward. I would argue no, because mapreduce already deals with large sequential data blocks. But in the shuffle, the system may need to do a lot of random-access writes, which may make a case for LFS.

Pig Latin: A Not-So-Foreign Language for Data Processing

I will be honest, I haven't read the week-3 papers. But this does not stop me from writing a short summary of the Pig Latin paper based on the notes I took during its class discussion. So I apologize if there is a mistake in my review.

(The second week-3 paper was the Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks. Unfortunately we had little time to discuss this paper, and I also had to leave early. So no review for that paper. I didn't understand much, so I will just paraphrase from its abstract "A Dryad application combines computational vertices with communication channels to form a dataflow graph. Dryad runs the application by executing the vertices of this graph on a set of available computers, communicating as appropriate through files, TCP pipes, and shared-memory FIFOs.")

Pig Latin: A Not-So-Foreign Language for Data Processing
Christopher Olston, Benjamin Reed, Utkarsh Srivastava, Ravi Kumar, Andrew Tomkins (Yahoo Research)

Pig Latin is a front end to mapreduce. Pig Latin provides support for join and split chains, which mapreduce does not offer natively (I guess this means you have to think about how to program a join or split job in mapreduce yourself). The Pig Latin code is then translated to efficient mapreduce code to be executed in the background. Pig Latin is closer to a procedural language than SQL is. That makes it easier for programmers to use and gives them more control.

I am not convinced that mapreduce is unnatural for join and split operations. I think a reduce worker is doing a join already. And in the couple of examples the paper presents, mapreduce consistently uses fewer steps than the corresponding Pig Latin code.
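
To show what I mean by "a reduce worker is doing a join already", here is a small single-process sketch of a reduce-side join. The datasets, the tags, and the function names are made up for illustration; this is neither Hadoop nor Pig code.

```python
from collections import defaultdict

def map_users(user):              # user = (user_id, name)
    return (user[0], ("user", user[1]))

def map_visits(visit):            # visit = (user_id, url)
    return (visit[0], ("visit", visit[1]))

def reduce_join(key, values):
    # All records sharing a key meet at one reducer, which pairs them up.
    names = [v for tag, v in values if tag == "user"]
    urls = [v for tag, v in values if tag == "visit"]
    return [(key, name, url) for name in names for url in urls]

users = [(1, "ann"), (2, "bob")]
visits = [(1, "a.com"), (1, "b.com"), (2, "c.com")]

groups = defaultdict(list)        # stands in for the shuffle
for k, v in [map_users(u) for u in users] + [map_visits(v) for v in visits]:
    groups[k].append(v)

joined = [row for k in sorted(groups) for row in reduce_join(k, groups[k])]
print(joined)  # [(1, 'ann', 'a.com'), (1, 'ann', 'b.com'), (2, 'bob', 'c.com')]
```

The join works, but the tagging and the pairing logic are exactly the kind of boilerplate a JOIN statement in a higher-level language would hide.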

Here is my reservation about Pig Latin. Mapreduce did not hide parallelism: you still have to think and plan for parallel execution. In contrast, Pig Latin completely hides parallelism. So the programmer may get lazy, stop thinking about and planning for parallel execution, and write very inefficient code. And I don't think Pig Latin can optimize badly written code. So how bad can this get in the worst case? 10 times slower? Does someone have an example of how bad this can get? (Maybe putting a long task in FOREACH could make for very inefficient code.) Somebody in the class stated this after my question: "Only a good mapreduce programmer would make a good piglatin programmer." I also heard that if you download Pig Latin from the Hadoop website and use it with the default configuration, it is very slow. You have to tune it to get normal efficiency.

I think the most useful thing about Pig Latin is that it provides an iterative debugging environment for developing code. The user takes an initial stab at writing a program, submits it to the system for execution, inspects the output, and repeats the process.

MapReduce Online

I am running behind on blogging about the seminar. Here are some short notes on the papers we have been discussing in the Data Center Computing seminar.

The "MapReduce Online" paper by Tyson Condie, Neil Conway, Peter Alvaro, Joseph M. Hellerstein, Khaled Elmeleegy and Russell Sears was a reading assignment for the first week, but we didn't have the time to discuss it. The paper is written very simply and provides such a clean discussion of Hadoop's inner workings that I felt I should write about it for this reason alone.

The paper presents a modified Hadoop mapreduce framework that allows data to be pipelined between operators, and supports online aggregation, which allows users to see "early returns" from a job as it is being computed.

In Hadoop, a downstream worker is blocked until the previous worker processes the entire data and makes its output available in its entirety. In contrast, pipelining delivers data to downstream operators more promptly. Not only can the immediate downstream worker start before the previous worker completes its work on the entire data, but the workers further down the chain can get started as well. This increases opportunities for parallelism, improves utilization, and reduces response time. The initial performance results demonstrate that pipelining can reduce job completion times by up to 25% in some scenarios.
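
The difference between blocking and pipelined hand-off can be illustrated with Python generators. This is a single-threaded toy of my own, not the paper's implementation: it shows only the order in which records reach the downstream operator, not real parallelism.

```python
def mapper(records, log):
    for r in records:
        log.append(("map", r))
        yield r * r                 # hand each output downstream immediately

def reducer(stream, log):
    total = 0
    for v in stream:
        log.append(("reduce", v))
        total += v
    return total

def run_blocking(records):
    log = []
    outputs = list(mapper(records, log))   # wait for the whole map output first
    return reducer(outputs, log), log

def run_pipelined(records):
    log = []
    return reducer(mapper(records, log), log), log

print(run_blocking([1, 2, 3])[1])
print(run_pipelined([1, 2, 3])[1])
```

In the blocking run all the map entries appear in the log before any reduce entry; in the pipelined run they interleave, so the reducer starts working as soon as the first record is mapped.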

While this up to 25% reduction in completion time is something, online mapreduce also comes with its baggage. Adding pipelining to mapreduce introduces significant complexity. In particular, I found the fault-tolerance discussion (checkpointing) in the paper to be complicated and unclear. So I am not sure I buy the motivation for online mapreduce. The paper mentions as a motivation that getting early approximate results before the mapreduce completes can be useful. But somehow this feels weak. Since mapreduce is an inherently batch process, the benefits of early approximate results are not very clear to me.

The paper also mentions that online mapreduce enables answering continuous queries, and this motivation looks more promising to me. More results and "incremental results" are computed for a continuous query as more new data arrives. If we had to answer a continuous query via traditional Hadoop, we would have to start a new mapreduce job instance periodically, but then each new mapreduce job does not have access to the computational state of the last analysis run, so this state must be recomputed from scratch.
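
A toy comparison of the two approaches, using a running count as the "query". The class and function names are my own and the workload is made up; the sketch just counts how many records each approach has to touch.

```python
class ContinuousCount:
    """Keeps query state between batches (hypothetical helper, not from the paper)."""

    def __init__(self):
        self.counts = {}
        self.records_processed = 0

    def on_new_data(self, batch):
        for key in batch:
            self.counts[key] = self.counts.get(key, 0) + 1
            self.records_processed += 1
        return dict(self.counts)

def rerun_from_scratch(history):
    """Periodic-job approach: every run re-reads all data seen so far."""
    counts, processed = {}, 0
    for batch in history:
        for key in batch:
            counts[key] = counts.get(key, 0) + 1
            processed += 1
    return counts, processed

batches = [["x", "y"], ["x"], ["z", "x"]]
query = ContinuousCount()
history, rerun_work = [], 0
for batch in batches:
    history.append(batch)
    incremental = query.on_new_data(batch)
    full, work = rerun_from_scratch(history)
    rerun_work += work
    assert incremental == full      # both approaches give the same answer
print(query.records_processed, rerun_work)  # 5 10
```

Both give the same answers, but the periodic jobs touch 2 + 3 + 5 = 10 records while the stateful query touches each of the 5 records once.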

The paper's coauthor Hellerstein has worked on continuous querying in the database world. The real applications of continuous querying in the cloud are not clear to me yet. So it will be interesting to see what type of new applications this work will enable in the cloud domain.

Monday, September 13, 2010

Twitter, open the firehose!

Twitter has 30 million users in the US alone. The users tweet about everything that interests or bothers them. These tweets are public, and over 50 million tweets are added to this pool every day. It is no surprise that data mining researchers have been swarming to Twitter data to use it for mining public opinion and trends.

Data mining researchers have a problem, though. At this year's KDD conference (arguably the top conference on data mining), the researchers were unanimously vocal about the difficulties of acquiring the data they need from Twitter. The researchers need to learn the Twitter API and other networking tools to acquire the data. And Twitter imposes a strict and very low quota on the data it serves per IP. Of course Twitter does this to ensure that its system is not overwhelmed by these third-party requests and can continue to serve its users. To get access to more data, there is a lengthy process of whitelisting from Twitter. But, I guess, Twitter is unlikely to whitelist and serve firehose data to many users due to scalability problems.

Enter my cloud storage project idea for tweets. Wouldn't it be nice to store and serve tweets to data mining researchers worldwide? If Twitter can give us access to the firehose, we can replicate all tweets daily in a cloud storage system, and index and store the tweets for easy sharing. In particular, to make the researchers' job easier, we can store tweets about the terms users subscribe to in separate files. For example, a researcher mining data about US politics would be interested in tweets that have Obama in them, and should get access to these tweets quickly.
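
The per-term storage idea could start from something as simple as the sketch below. The function name and the case-insensitive substring matching rule are my own assumptions; a real system would need proper tokenization and indexing, and would write each bucket to its own file.

```python
def route_tweets(tweets, subscribed_terms):
    """Group tweets by the subscribed terms they mention (case-insensitive substring match)."""
    buckets = {term: [] for term in subscribed_terms}
    for tweet in tweets:
        text = tweet.lower()
        for term in subscribed_terms:
            if term.lower() in text:
                buckets[term].append(tweet)
    return buckets

tweets = ["Obama speaks tonight", "lunch was great", "obama on healthcare"]
buckets = route_tweets(tweets, ["Obama", "healthcare"])
print(buckets["Obama"])        # ['Obama speaks tonight', 'obama on healthcare']
print(buckets["healthcare"])   # ['obama on healthcare']
```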

The Twitter firehose would also help us scale up our existing opinion mining projects that use Twitter. One example is our Upinion project. Traditional polls offer snapshots of public opinion, and fail to identify breakpoints in public opinion. So in the Upinion project, we consider the problem of identifying breakpoints in public opinion by using and categorizing emotion words in the tweets. We develop methods to detect changes in public opinion, and find events that cause these changes. The details of this work are available in our recent paper: Identifying Breakpoints in Public Opinion.

Intermediate storage support for mapreduce for improved fault-tolerance

The second paper we discussed in the first week of the data center computing seminar is Steve Ko's work on improving fault-tolerance in mapreduce jobs. The notes below are just the edited version of the notes I took during Steve's presentation, so mistakes and omissions are most probably mine.
Steven Y. Ko, Imranul Hoque, Brian Cho, and Indranil Gupta
Proceedings of the ACM Symposium on Cloud Computing 2010 (SOCC), 2010
Yes, mapreduce is fault-tolerant in the sense that the master will reassign a failed map or reduce task to another worker to get the job done. But, this approach to fault-tolerance introduces latency to the jobs. Steve's experiments showed that 1 failure in 20 machines led to 33% increase in runtime in a single mapreduce task.

We have a worse problem on our hands when we consider mapreduce job chains. A worker failure in the middle of the chain necessitates a cascaded re-execution all the way from the top of the chain. Since intermediate data is not saved in mapreduce, the master will need to reschedule all the previous tasks up to that point so that the job can continue. Given that the Yahoo webmap is a chain of 100 mapreduce tasks and Google indexing is a chain of 24 mapreduce tasks, the latency due to failed tasks constitutes a big problem.

And failures do happen routinely in data centers. Google reported an average of 5 worker deaths per mapreduce job in 2006. (Worker thankfully refers to a process, not an employee, in this case :-) In 2009, Yahoo reported that 50 machines out of 20000 machines fail on average.

The above discussion shows that intermediate data in mapreduce is important, because when it is lost, it is very costly to regenerate. So we should treat the intermediate data as a first-class citizen, and store it as well.

But there are challenges to doing this right. Storing intermediate data requires CPU and disk access, and the challenge is to minimize the interference of replication with the overall performance of the mapreduce job. Replication, if not done carefully, increases completion time a couple of times over (see the paragraph below).

Identifying the bottleneck in replication:
We start with step 1, Hadoop (no replication), and incrementally add support for replication: step 2, read added; step 3, read-send added; and finally step 4, full replication. The first three serve as a control group to identify where the bottleneck lies along this spectrum up to full replication. The experiment results show that there is a big jump from read to read-send (where network transfer is added), and no observable jump between the other phases. So the bottleneck is in the network transfer of the data.

HDFS does synchronous replication. Making this replication asynchronous helps, but since the bottleneck is the network, the help is not significant. The real problem is that HDFS replicates across different racks, which is why the network bottleneck incurs a performance overhead. We should try to get rid of that.

ISS (Intermediate Storage System):
The proposed solution ISS is a combination of two techniques: local replication and rack-level replication.

The local replication technique makes the following observation about a single mapreduce task execution. The shuffle phase between the map and reduce phases provides a natural replication, by copying the output of the map worker to the reduce worker's machine. The only exception is when the map worker and the reduce worker are both scheduled on the same machine, so we need to replicate explicitly only in this case. For the other case, we just keep track of where the data is replicated before the reduce workers start.
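
The decision rule can be sketched as follows. The task and host names are hypothetical, and the function reflects my reading of the talk, not the ISS code: a map output partition needs an explicit copy only when its reducer sits on the same machine as the mapper.

```python
def plan_replication(map_tasks, reduce_tasks):
    """Split (map, reduce) pairs into those the shuffle already replicates and the rest.

    map_tasks / reduce_tasks: dict task name -> host it runs on
    Returns (tracked, explicit):
      tracked:  (map, reduce, host) where the shuffle left a second copy on `host`
      explicit: (map, reduce) pairs that stay on one machine and need a real replica
    """
    tracked, explicit = [], []
    for m, m_host in map_tasks.items():
        for r, r_host in reduce_tasks.items():
            if m_host == r_host:
                explicit.append((m, r))           # no second copy exists yet
            else:
                tracked.append((m, r, r_host))    # just remember where the copy is
    return tracked, explicit

maps = {"m0": "hostA", "m1": "hostB"}
reduces = {"r0": "hostA", "r1": "hostC"}
tracked, explicit = plan_replication(maps, reduces)
print(explicit)  # [('m0', 'r0')]
```

In this example only one of the four partitions needs explicit replication, which is why the overhead of the local technique is so small.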

If only local replication is used, the overhead of replication is insignificant. But notice that local-only replication is applicable only within one mapreduce task. In mapreduce chains, there is no natural replication between the reduce and the next map, so local replication is not applicable.

The rack-level replication technique comes into play for addressing this problem. The observation in this technique is that while shared core switches (top level switches) are over-utilized and contended, the rack switches are under-utilized. So we should replicate within the rack to eliminate the network latencies, which we had identified as the bottleneck for the performance.

Rack-level replication may have a drawback: if the entire rack fails, we lose our intermediate data and need to start from the beginning. However, Google's 2008 data shows that there are only around 20 rack failures per year (mostly planned downtime). So the mean time to failure for racks is very large compared to job execution times, and this risk is insignificant.

Results from ISS implementation:
The paper includes experiments on the ISS implementation. Under no failure, the performance (i.e., job completion time) of Hadoop augmented with ISS turns out to be comparable to base Hadoop. Under 1 machine failure, Hadoop with ISS incurs only an 18% increase in completion time over the no-fault case, a commendable feat. In contrast, base Hadoop incurs a 59% increase in completion time under 1 machine failure over the no-fault case.

Research ideas:
While rack level replication is suggested for reducing the overhead of networking in ISS for the sake of fault-tolerance, I am left wondering why the same rack level data transfer idea is not used for reducing the overhead of networking in the mapreduce normal operation for the sake of performance. Can we use a scheduling approach to schedule the next map task to be in the same rack as the completed reduce? Is this too constraining for mapreduce normal operation to be sustainable over long mapreduce chains?

How about a tradeoff? Some replication is also done across racks (which adds overhead for the sake of fault-tolerance), but that overhead is compensated for because the next map task is scheduled in the same rack as the replicated data and so avoids network overhead.

Another question: can we use parallel TCP streams to the original and replicated data to improve the overall bandwidth of mapreduce, which could provide further reductions in latency even in normal (fault-free) mapreduce operation?

Thursday, September 9, 2010

MapReduce: Simplified Data Processing on Large Clusters

Steve Ko joined our department this semester from UIUC by way of Princeton. He is offering a seminar course on data center computing. I am sitting in on his seminar to catch up on these topics. I thought I might as well write some posts about the papers we discuss in that seminar.

Several companies use mapreduce. Mapreduce helps companies process large data. Mapreduce is popular because it provides a simple abstraction for parallelizing, and hides the meticulous tasks involved in parallelizing in the datacenter under the hood.

Mapreduce has been inspired by the map reduce operations in Lisp.
(map square '(1 2 3 4))
output is (1 4 9 16)
(reduce + '(1 4 9 16))
output is 30
because of (+ 16 (+ 9 (+ 4 1)))

Google noticed that they keep doing tasks that follow the same pattern: they individually process web pages (map), and then gather-reprocess-fuse (reduce) the output of these individual webpages together.

Mapreduce should actually be called map-shuffle-reduce. Shuffle is the rarely mentioned phase that runs after the map completes. Shuffle takes the output of the map and groups it together so that reduce can use it. Shuffle works only on the machine's own data, so it is local to the individual machine and does not involve networking. Reduce, on the other hand, collects intermediate data from the machines over the network, and runs another group-by before running the reduce function.
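
The three phases can be sketched in a single process with the usual word count example. The function names are my own, and everything runs on one machine here, so the sketch shows the data flow only, not the distribution.

```python
from collections import defaultdict

def map_fn(doc):
    # Emit a (key, value) pair per word.
    return [(word, 1) for word in doc.split()]

def shuffle(pairs):
    # Group the values by key so each reduce call sees one key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_fn(key, values):
    return key, sum(values)

docs = ["a b a", "b c"]
pairs = [p for d in docs for p in map_fn(d)]
result = dict(reduce_fn(k, v) for k, v in shuffle(pairs).items())
print(result)  # {'a': 2, 'b': 2, 'c': 1}
```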

Mapreduce provides a nice abstraction to the developer, but now let's look at how mapreduce is implemented under the hood. There is a single master that knows about the map workers and reduce workers. The master keeps track of how the tasks are progressing. To this end, a "worker pull" scheme is used: a worker signals to the master that it is idle, and the master finds a task to assign to the worker. Note that there is nothing stopping the master from using the same machine/node as a reduce worker after using it first as a map worker. For fault-tolerance, the master monitors the workers; if a worker is lagging behind, the master assigns its task to another worker.
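
A minimal sketch of the worker-pull bookkeeping, under my own assumptions: the class and method names are made up, there are no heartbeats or timeouts, and a lagging task is simply re-queued rather than run as a backup copy alongside the original.

```python
from collections import deque

class Master:
    """Toy master that hands out tasks on request (illustrative only)."""

    def __init__(self, tasks):
        self.pending = deque(tasks)
        self.running = {}            # task -> worker currently running it
        self.done = set()

    def request_task(self, worker):
        """Called by an idle worker ("worker pull")."""
        if not self.pending:
            return None
        task = self.pending.popleft()
        self.running[task] = worker
        return task

    def report_done(self, task):
        self.running.pop(task, None)
        self.done.add(task)

    def reassign_stragglers(self, lagging_workers):
        # Put the tasks of lagging workers back in the queue for someone else.
        for task, worker in list(self.running.items()):
            if worker in lagging_workers:
                del self.running[task]
                self.pending.append(task)

master = Master(["map0", "map1"])
t0 = master.request_task("w1")
t1 = master.request_task("w2")
master.report_done(t0)
master.reassign_stragglers({"w2"})       # w2 is lagging on map1
print(master.request_task("w1"))         # map1
```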

Hadoop is the open source implementation of mapreduce. Hadoop has several subprojects. HDFS is the open source implementation of the Google File System. HBase is the counterpart of Bigtable. ZooKeeper is a Paxos-like coordination service, used for things like group membership. Other projects under Hadoop include Hive, Pig, etc. In contrast to Amazon, which provides Elastic MapReduce, the Hadoop implementation requires you to specify the number of nodes/computers you will use for your mapreduce application.

There are two directions of improvements suggested for mapreduce. The first direction is improvements in terms of simplifying/extending the programming model. The biggest problem with mapreduce is that for a typical application you don't just have one mapreduce, but a chain of dozens of mapreduces. (Some Yahoo jobs have a chain of 100 mapreduces.) So mapreduce can get complicated. Projects like pig, dryad, hive, sawzall, map-reduce-merge, try to address this problem by providing even higher level abstractions (e.g., SQL) over mapreduce.

The second direction is improvements to the performance of the runtime system supporting the mapreduce abstraction. Some example projects are on better scheduling (LATE scheduler), better fault handling (Intermediate Storage System), and pipelining of the mapreduces (online mapreduce).

While I don't have experience in data center computing, I can see a couple of low-hanging fruits from the mapreduce work. One is how to optimize the networking/data scheduling in the datacenter, provided that we have access to the source code of the mapreduce jobs that are scheduled to run next. One straightforward idea is to choose the machines on which the reduce workers (that use the output of map workers) and the map workers (that use the output of reduce workers from the previous round) run, so as to minimize the data that needs to be moved around.

Wednesday, September 8, 2010

Writing advice

Elements of Style gives very good writing advice.
  • use the active voice
  • put statements in positive form
  • omit needless words
  • use definite, specific, concrete language
  • keep related words together
  • work from a suitable design
  • express coordinate ideas in similar form
  • be concise
  • be precise
  • be clear
  • revise and rewrite

But, I think this has got to be the best writing advice. Originally due to Rory Marinich.

It's easy to make something incredible. All you do is, don't let what you're doing be shit.

If something is shitty, you make it better. There are usually a few hundred thousand things in anything that are shitty, but they're all easily fixable. You just have to dedicate yourself to not being shit.

Anything can be shit. An idea can be shit. Don't execute a shitty idea. A design or a lyric or a guitar part or a photo can be shit. If it is, do it better.

It baffles me that people think making really, really brilliant, stupendous, worldshocking pieces of work is a particular challenge. It's mainly a battle of endurance. The longer you try and deshit something, the less shit it is.

Experience helps, because with experience you stop being quite as shitty the first time around, but even a complete amateur can write a brilliant, masterful symphony, if he's got the dedication needed to start with complete shit and slowly remove all of the shit (which means, to be clear, that when he's done there'll likely be nothing left of his original shit symphony, not even the melody).

This is called "revision", and it is one hundred percent effective, and people still don't do it.
