Monday, July 18, 2016

Kafka, Samza, and the Unix Philosophy of Distributed Data

This paper is closely related to the "Realtime Data Processing at Facebook" paper I reviewed in my previous post. As I mentioned there, Kafka does basically the same thing as Facebook's Scribe, and Samza is a stream processing system on top of Kafka.

This paper is very easy to read. It is delightful in its simplicity. It summarizes the design of Apache Kafka and Apache Samza and compares their design principles to the design philosophy of Unix, in particular, Unix pipes.

Who says plumbing can't be sexy? (Seriously, don't Google this.) So without further ado, I present to you the Mike Rowe of distributed systems.

Motivation/Applications

I talked about the motivation and applications of stream processing in the Facebook post. The application domain is basically web services that adapt to your behaviour and personalize on the fly, including Facebook, Quora, LinkedIn, Twitter, YouTube, Amazon, etc. These web services take in your most recent actions (likes, clicks, tweets), analyze them on the fly, merge them with previous analytics on larger data, and adapt to your recent activity as part of a feedback loop.

In theory you can achieve this personalization goal with a batch workflow system, like MapReduce, which provides system scalability, organizational scalability (that of the engineering/development teams' efforts), operational robustness, multi-consumer support, loose coupling, data provenance, and friendliness to experimentation. However, batch processing adds large delays. Stream processing systems preserve all the good scalability features of batch workflow systems, and add a "timeliness" feature as well.

I am using shortened descriptions from the paper for the following sections.

Apache Kafka


Kafka provides a publish-subscribe messaging service. Producer (publisher) clients write messages to a named topic, and consumer (subscriber) clients read messages in a topic. A topic is divided into partitions, and messages within a partition are totally ordered. There is no ordering guarantee across different partitions. The purpose of partitioning is to provide horizontal scalability: different partitions can reside on different machines, and no coordination across partitions is required.
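To make this concrete, here is a minimal sketch of the producer/consumer interaction using the kafka-python client (the broker address, topic name, and message contents are my own illustrative choices):

    from kafka import KafkaProducer, KafkaConsumer

    # Producer: messages with the same key land in the same partition, so
    # per-key ordering is preserved; there is no ordering across partitions.
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    producer.send("page-views", key=b"user42", value=b'{"page": "/home"}')
    producer.flush()

    # Consumer: reads the topic's partitions sequentially from a given offset.
    consumer = KafkaConsumer("page-views",
                             bootstrap_servers="localhost:9092",
                             auto_offset_reset="earliest")
    for msg in consumer:
        print(msg.partition, msg.offset, msg.key, msg.value)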

Each partition is replicated across multiple Kafka broker nodes to tolerate node failures. One of a partition's replicas is chosen as leader, and the leader handles all reads and writes of messages in that partition. Writes are serialized by the leader and synchronously replicated to a configurable number of replicas. On leader failure, one of the in-sync replicas is chosen as the new leader.

The throughput of a single topic-partition is limited by the computing resources of a single broker node -- the bottleneck is usually either its NIC bandwidth or the sequential write throughput of the broker's disks. When adding nodes to a Kafka cluster, some partitions can be reassigned to the new nodes, without changing the number of partitions in a topic. This rebalancing technique allows the cluster's computing resources to be increased or decreased without affecting partitioning semantics.

Apache Samza

A Samza job consists of a Kafka consumer, an event loop that calls application code to process incoming messages, and a Kafka producer that sends output messages back to Kafka. Unlike many other stream-processing frameworks, Samza does not implement its own network protocol for transporting messages from one operator to another.


Figure 3 illustrates the use of partitions in the word-count example: by using the word as message key, the SplitWords task ensures that all occurrences of the same word are routed to the same partition of the words topic.
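The routing itself is just deterministic key-based partitioning. A tiny sketch of the idea (the hash function and partition count are illustrative; Kafka's actual partitioner differs):

    import zlib

    def partition_for(key: bytes, num_partitions: int) -> int:
        # Every message with the same key maps to the same partition, so one
        # consumer task sees all occurrences of that key.
        return zlib.crc32(key) % num_partitions

    partition_for(b"apple", 8)  # all "apple" counts go to the same partition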


Samza implements durable state through the KeyValueStore abstraction, exemplified in Figure 2. Samza uses the RocksDB embedded key-value store, which provides low-latency, high-throughput access to data on local disk. To make the embedded store durable in the face of disk and node failures, every write to the store (i.e., the changelog) is also sent to a dedicated topic-partition in Kafka, as illustrated in Figure 4. When recovering after a failure, a task can rebuild its store contents by replaying its partition of the changelog from the beginning. Rebuilding a store from the log is only necessary if the RocksDB database is lost or corrupted. While publishing the changelog to Kafka for durability seems wasteful, it can also be a useful feature for applications: other stream processing jobs can consume the changelog topic like any other stream, and use it to perform further computations.
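Here is a minimal sketch of the changelog idea, with an in-memory dict standing in for RocksDB and a plain list standing in for the Kafka changelog partition:

    class ChangelogBackedStore:
        # Every write goes to the changelog before the local store; replaying
        # the changelog from offset 0 rebuilds the store after a failure.
        def __init__(self, changelog):
            self.changelog = changelog   # stand-in for a Kafka topic-partition
            self.state = {}              # stand-in for RocksDB

        def put(self, key, value):
            self.changelog.append((key, value))
            self.state[key] = value

        def get(self, key):
            return self.state.get(key)

        def recover(self):
            self.state = {}
            for key, value in self.changelog:
                self.state[key] = value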


One characteristic form of stateful processing is a join of two or more input streams, most commonly an equi-join on a key (e.g. user ID). One type of join is a window join, in which messages from input streams A and B are matched if they have the same key and occur within some time interval delta-t of one another. Alternatively, a stream may be joined against tabular data: for example, user clickstream events could be joined with user profile data, producing a stream of clickstream events with embedded information about the user. When joining with a table, the authors recommend making the table data available in the form of a log-compacted stream through Kafka. Processing tasks can consume this stream to build an in-process replica of a database table partition, using the same approach as the recovery of durable local state, and then query it with low latency. It seems wasteful to me, but it looks like the authors are not worried about straining Kafka, and are comfortable with using Kafka as a workhorse.
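A sketch of the recommended table join (all names are illustrative): the task consumes the compacted profile stream to maintain a local replica of its table partition, and joins incoming click events against it with a local lookup:

    profiles = {}  # local replica of one partition of the user-profile table

    def on_profile_change(user_id, profile):
        # Consume the log-compacted profile stream to keep the replica current.
        profiles[user_id] = profile

    def on_click(event):
        # Low-latency local lookup instead of a remote database query.
        profile = profiles.get(event["user_id"])
        return dict(event, user=profile)  # emit the enriched event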

Even though the intermediate state between two Samza stream processing operators is always materialized to disk, Samza is able to provide good performance: a simple stream processing job can process over 1 million messages per second on one machine, and saturate a gigabit Ethernet NIC.

Discussion

The paper includes a nice discussion section as well.
  • Since the only access methods supported by a log are an appending write and a sequential read from a given offset, Kafka avoids the complexity of implementing random-access indexes. By doing less work, Kafka is able to provide much better performance than systems with richer access methods. Kafka's focus on the log abstraction is reminiscent of the Unix philosophy: "Make each program do one thing well. To do a new job, build afresh rather than complicate old programs by adding new features." 
  • If Kafka is like a streaming version of HDFS, then Samza is like a streaming version of MapReduce. The pipeline is loosely coupled, since a job does not know the identity of the jobs upstream or downstream from it, only the topic names. This principle again evokes a Unix maxim: “Expect the output of every program to become the input to another, as yet unknown, program.”
  • There are some key differences between Kafka topics and Unix pipes: A topic can have any number of consumers that do not interfere with each other, it tolerates failure of producers, consumers or brokers, and a topic is a named entity that can be used for tracing data provenance. Kafka topics deliberately do not provide backpressure: the on-disk log acts as an almost-unbounded buffer of messages.
  • The log-oriented model of Kafka and Samza is fundamentally built on the idea of composing heterogeneous systems through the uniform interface of a replicated, partitioned log. Individual systems for data storage and processing are encouraged to do one thing well, and to use logs as input and output. Even though Kafka's logs are not the same as Unix pipes, they encourage composability, and thus Unix-style thinking.

Related links

Further reading on this is Jay Kreps' excellent blog post on logs.

Apache BookKeeper and Hedwig are good alternatives to Kafka.

These days, there is also DistributedLog.

Saturday, July 9, 2016

Realtime Data Processing at Facebook

Recently there has been a lot of development in realtime data processing systems, including Twitter's Storm and Heron, Google's MillWheel, and LinkedIn's Samza. This paper presents Facebook's realtime data processing system architecture and its Puma, Swift, and Stylus stream processing systems. The paper is titled "Realtime Data Processing at Facebook" and it appeared at SIGMOD'16, June 26-July 1.

Motivation and applications

Facebook runs hundreds of realtime data pipelines in production. As a motivating application, the paper gives Chorus as an example. The Chorus data pipeline transforms a stream of individual Facebook posts into aggregated, anonymized, and annotated visual summaries. E.g., what are the top 5 topics being discussed for the election today? What are the demographic breakdowns (age, gender, country) of World Cup fans?

Another big application is the mobile analytics pipelines that provide realtime feedback for Facebook mobile application developers, who use this data to diagnose performance and correctness issues.

The system architecture


Scribe plays a central role in Facebook's realtime processing architecture. The main idea of the architecture is this: by accepting seconds of latency instead of milliseconds, the architecture is able to employ a persistent message bus, i.e., Scribe, for data transport. Scribe provides a persistent, distributed messaging system for collecting, aggregating, and delivering high volumes of log data with a few seconds of latency and high throughput. Scribe is the transport mechanism for sending data to both batch and realtime systems at Facebook. Using Scribe to decouple data transport from processing allows the system to achieve fault tolerance, scalability, and ease of use, as well as to support multiple processing systems as options.

While Scribe incurs a few seconds of latency, it still meets Facebook's latency requirements and provides hundreds of gigabytes per second of throughput. In return, Scribe provides a persistent message bus service that enables decoupling and isolation of the data production and data analysis system components. Moreover, with persistent Scribe streams, the system can replay a stream from a recent time period, which makes debugging and iterative development much easier.

Jay Kreps' blog post on the log described these benefits nicely as well. It talked about how practical systems can be simplified with a log-centric design, and how these log streams can enable data integration by making all of an organization's data easily available in all its storage and processing systems. Kafka would have similar advantages to Scribe. Facebook uses Scribe because it was developed in house.

Below I copy snippets of descriptions from the paper for each of these subsystems.

Within Scribe, data is organized into distinct streams called "categories". Usually, a streaming application consumes one Scribe category as input. A Scribe category has multiple buckets. A Scribe bucket is the basic processing unit for stream processing systems: applications are parallelized by sending different Scribe buckets to different processes. Scribe provides data durability by storing data in HDFS. Scribe messages are stored, and streams can be replayed by the same or different receivers for up to a few days.

The realtime stream processing systems Puma, Stylus, and Swift read data from Scribe and also write to Scribe. Laser, Scuba, and Hive are data stores that use Scribe for ingestion and serve different types of queries. Laser can also provide data to the products and streaming systems, as shown by the dashed (blue) arrows.

Puma is a stream processing system whose applications (apps) are written in a SQL-like language with UDFs (user-defined functions) written in Java. Puma apps are quick to write: it can take less than an hour to write, test, and deploy a new app. Unlike traditional relational databases, Puma is optimized for compiled queries, not for ad-hoc analysis. Puma provides filtering and processing of Scribe streams (with a few seconds delay). The output of these stateless Puma apps is another Scribe stream, which can then be the input to another Puma app, any other realtime stream processor, or a data store.

Swift is a basic stream processing engine that provides checkpointing functionality for Scribe. If the app crashes, it can restart from the latest checkpoint; all data is thus read at least once from Scribe. Swift is mostly useful for low-throughput, stateless processing.

Stylus is a low-level stream processing framework written in C++. A Stylus processor can be stateless or stateful. Stylus's processing API is similar to that of other procedural stream processing systems.

Laser is a high query throughput, low (millisecond) latency, key-value storage service built on top of RocksDB. Laser can be used to make the result of a complex Hive query or a Scribe stream available to a Puma or Stylus app, usually for a lookup join, such as identifying the topic for a given hashtag.

Scuba is Facebook's fast slice-and-dice analysis data store, most commonly used for trouble-shooting of problems as they happen. Scuba provides ad hoc queries with most response times under 1 second.

Hive is Facebook's exabyte-scale data warehouse. Facebook generates multiple new petabytes of data per day, about half of which is raw event data ingested from Scribe. (The other half of the data is derived from the raw data, e.g., by daily query pipelines.) Most event tables in Hive are partitioned by day. Scribe does not provide infinite retention; instead, Facebook stores input and output streams in its data warehouse Hive for longer retention.

Design decisions


Figure 4 summarizes the five design decisions considered for this Facebook realtime processing system components. Figure 5 summarizes which alternatives were chosen by a variety of realtime systems, both at Facebook and in the related literature.

Lessons learned

The paper includes a great lessons learned section. It says: "It is not enough to provide a framework for users to write applications. Ease of use encompasses debugging, deployment, and monitoring, as well. The value of tools that make operation easier is underestimated. In our experience, every time we add a new tool, we are surprised that we managed without it."

The highlights from this section are as follows:

  • There is no single language that fits all use cases. Needing different languages (and the different levels of ease of use and performance they provide) is the main reason why Facebook has three different stream processing systems, Puma, Swift, and Stylus.
  • The ease or hassle of deploying and maintaining the application is equally important. Making Puma deployment self-service let Facebook scale to the hundreds of data pipelines that use Puma. (See the post on Facebook's holistic configuration management for the type of systems Facebook employs to manage and facilitate deployments.)
  • Once an app is deployed, it needs to be monitored: Is it using the right amount of parallelism? With Scribe, changing the parallelism is often just changing the number of Scribe buckets and restarting the nodes that output and consume that Scribe category. To find out the right amount of parallelism needed, Facebook uses alerts to detect when an app is processing its Scribe input more slowly than the input is being generated.
  • Streaming versus batch processing is not an either/or decision. Originally, all data warehouse processing at Facebook was batch processing. Using a mix of streaming and batch processing can speed up long pipelines by hours.


Related posts

Facebook's software architecture 

Holistic Configuration Management at Facebook

Facebook's Mystery Machine: End-to-end Performance Analysis of Large-scale Internet Services 

Measuring and Understanding Consistency at Facebook

Thursday, July 7, 2016

Efficient Replication of Large Data Objects


This paper appeared in DISC 2003, and describes an application of the ABD replicated atomic storage algorithm to the replication of large objects. When the objects being replicated are much larger than the metadata (such as tags or pointers), it pays to perform cheaper operations on the metadata in order to avoid expensive operations on the data itself.

The basic idea of the algorithm is to separately store copies of the data objects in replica servers, and information about where the most up-to-date copies are located in directory servers. This Layered Data Replication (LDR) approach adapts the ABD algorithm for atomic fault-tolerant replication of the metadata, and prescribes how the replication of the data objects in the replica servers can accompany the replication of the metadata in the directory servers in a concurrent and consistent fashion: in order to read the data, a client first reads the directories to find the set of up-to-date replicas, then reads the data from one of the replicas. To write, a client first writes its data to a set of replicas, then informs the directories that these replicas are now up-to-date.

The LDR algorithm replicates a single data object supporting read and write operations, and guarantees that the operations appear to happen atomically. While there exist multiple physical copies of the data, users only see one logical copy, and user operations appear to execute atomically on the logical copy. As such, LDR provides linearizability, a strong type of consistency, which guarantees that a read operation returns the most recently written version of the data. LDR provides single-copy consistency and is on the CP side of the CAP triangle; availability is sacrificed when a majority of replicas are unreachable.

Client Protocol

When client i does a read, it goes through four phases in order: rdr, rdw, rrr and rok. The phase names describe what happens during the phase: read-directories-read, read-directories-write, read-replicas-read, and read-ok. During rdr, i reads (utd, tag) from a quorum of directories to find the most up-to-date replicas. i sets its own tag and utd to be the (tag, utd) it read with the highest tag, i.e., timestamp. During rdw, i writes (utd, tag) to a write quorum of directories, so that later reads will read i’s tag or higher. During rrr, i reads the value of x from a replica in utd. Since each replica may store several values of x, i tells the replica it wants to read the value of x associated with tag. During rok, i returns the x-value it read in rrr.

When i writes a value v, it also goes through four phases in order: wdr, wrw, wdw, and wok. These phase names stand for write-directories-read, write-replicas-write, write-directories-write, and write-ok, respectively. During wdr, i reads (utd, tag) from a quorum of directories, then sets its tag to be higher than the largest tag it read. During wrw, i writes (v, tag) to a set acc of replicas, where |acc| ≥ f + 1. Note that the set acc is arbitrary; it does not have to be a quorum. During wdw, i writes (acc, tag) to a quorum of directories, to indicate that acc is the set of most up-to-date replicas, and tag is the highest tag for x. Then i sends each replica a message to tell it that the write is finished, so that the replicas can garbage-collect older values of x. Then i finishes in phase wok.
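Here is a compressed sketch of the two operations, with in-memory stand-ins for the directory and replica nodes and no real RPCs or concurrency (all names are mine; see the paper for the precise protocol):

    class Directory:
        def __init__(self):
            self.utd, self.tag = set(), 0   # up-to-date replica set, highest tag

    class Replica:
        def __init__(self):
            self.values = {}                # tag -> value

    def quorum(nodes):
        return nodes[:len(nodes) // 2 + 1]  # any majority works

    def read(directories, replicas):
        # rdr: read (utd, tag) from a quorum, keep the pair with the highest tag
        best = max(quorum(directories), key=lambda d: d.tag)
        utd, tag = set(best.utd), best.tag
        # rdw: write (utd, tag) to a quorum so later reads see tag or higher
        for d in quorum(directories):
            if tag > d.tag:
                d.utd, d.tag = utd, tag
        # rrr + rok: read the tag's value from one up-to-date replica
        return replicas[next(iter(utd))].values[tag]

    def write(directories, replicas, v, f):
        # wdr: read tags from a quorum, then pick a strictly higher tag
        tag = max(d.tag for d in quorum(directories)) + 1
        # wrw: write (v, tag) to any f+1 replicas (acc need not be a quorum)
        acc = set(list(replicas)[:f + 1])
        for name in acc:
            replicas[name].values[tag] = v
        # wdw: tell a quorum of directories that acc is now the up-to-date set
        for d in quorum(directories):
            if tag > d.tag:
                d.utd, d.tag = acc, tag
        # wok (replicas may now garbage-collect values older than tag)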


If you have difficulty understanding the need for the 2-round directory reads/writes in this protocol, reviewing how the ABD protocol works will help.

Replica and Directory node protocol

The replicas respond to client requests to read and write values of data object x. Replicas also garbage-collect out of date values of x, and gossip among themselves the latest value of x. The latter is an optimization to help spread the latest value of x, so that clients can read from a nearby replica.

The directories' only job is to respond to client requests to read and write utd and tag.

Questions and discussion

Google File System (SOSP 2003) addressed efficient replication of large data objects for datacenter computing in practice. GFS also provides a metadata service layer and a data object replication layer. For the metadata service, GFS uses a single master, elected with the help of Chubby, a Paxos-based lock service of which ZooKeeper is the open-source counterpart. Today, if you want to build a consistent large-object replication store from scratch, your architecture would most likely use ZooKeeper as the metadata directory coordination service. ZooKeeper provides atomic consistency already, so it eliminates the two rounds needed for directory-reads and directory-writes in LDR.

LDR does not use a separate metadata service; instead it can scavenge raw dumb storage nodes for the directory service and achieve the same effect by using ABD replication to make the metadata directory atomic and fault-tolerant. In other words, LDR takes a fully-decentralized approach, and can support loosely-connected heterogeneous wimpy devices (maybe even smartphones?). I guess that means more freedom. On the other hand, LDR is bad for performance. It requires 2 rounds of directory-writes for each write operation and 2 rounds of directory-reads for each read operation. This is a major drawback for LDR. Considering that reads are generally 90% of the workload, supporting 1-round directory-reads would have alleviated the performance problem somewhat. Probably in normal cases (in the absence of failures), the first directory read (the rdr phase) will show that the up-to-date replica copy is present in a quorum of directory nodes, and the second round of directory access (the rdw phase) can be skipped.

Using ZooKeeper for the metadata directory helps a lot, but a downside is that ZooKeeper lives in a single centralized location, which means some clients will always incur a high WAN communication penalty to reach it. Using ZooKeeper's observers reduces this cost for read operations. And as I will blog about soon, our work on WAN-Keeper reduces this cost for write operations as well. The LDR paper suggests that LDR is suitable for WANs, but LDR still incurs WAN latencies while accessing a quorum of directory nodes (twice!) across the WAN.

Another way to efficiently replicate large data objects is of course key-value stores. In key-value stores, you don't have a metadata directory, as "hashing" takes care of that. On the other hand, most key-value stores sacrifice strong consistency in favor of eventual consistency. Is it true that you can't get away with using just hashes, and need some sort of metadata service if you want to achieve consistency? The consistent key-value stores I can think of (and I can't think of many) use either a Paxos commit on metadata or at least a chain replication approach, as in HyperDex and Replex. The chain replication approach uses a Paxos box only for the chain's replication configuration information; does that still count as a minimal, 1-level-indirect metadata service?

Friday, July 1, 2016

Replex: A Scalable, Highly Available Multi-Index Data Store

This paper received the best paper award at Usenix ATC'16 last week. It considers a timely and important problem. With NoSQL databases, we got scalability, availability, and performance, but we lost secondary keys. How do we put back the secondary indices without compromising scalability, availability, and performance?

The paper mentions that previous work, HyperDex, did a good job of re-introducing secondary keys to NoSQL, but with overhead: HyperDex generates and partitions an additional copy of the datastore for each key. This introduces both storage and performance overhead: supporting just one secondary key doubles the storage requirements and write latencies.

Replex adds secondary keys to NoSQL databases without that overhead. The key insight of Replex is to combine the need to replicate for fault-tolerance and the need to replicate for index availability. After replication, Replex has both replicated and indexed a row, so there is no need for explicit indexing.

How does Replex work?

All replexes store the same data (every row in the table); the only difference across replexes is how the data is partitioned and sorted, namely by the sorting key of the index associated with the replex. Each replex is associated with a sharding function, h, such that h(r) defines the partition number in the replex that stores row r.
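In code, a replex is just the same table sharded by a different column. A tiny sketch (the column names and partition count are illustrative):

    import zlib

    NUM_PARTITIONS = 4

    def sharding_function(column):
        # Each replex gets its own h, keyed on that replex's index column.
        def h(row):
            return zlib.crc32(str(row[column]).encode()) % NUM_PARTITIONS
        return h

    h_primary = sharding_function("user_id")  # replex by primary key
    h_email = sharding_function("email")      # replex by a secondary key

    row = {"user_id": 42, "email": "a@b.com"}
    # The same row is stored in every replex, in possibly different partitions:
    print(h_primary(row), h_email(row))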

So, that was easy. But there is an additional complication that needs to be dealt with. The difficulty arises because individual replexes can have requirements, such as uniqueness constraints, that cause the same operation to be valid in one replex and invalid in another. Figure 2 gives an example scenario: a linearizability requirement for a distributed log.

To deal with this problem, datastores with global secondary indexes need to employ a distributed transaction for update operations, because an operation must be atomically replicated as valid or invalid across all the indexes. But to use a distributed transaction for every update operation would cripple system throughput.

To remove the need for a distributed transaction in the replication protocol, they modify chain replication to include a consensus protocol. Figure 3 illustrates this solution. When the consensus phase (going to the right in Figure 3) reaches the last partition in the chain, the last partition aggregates each partition's decision into a final decision, which is simply the logical AND of all decisions. Then comes the replication phase, where the last partition initiates the propagation of this final decision back up the chain. As each partition receives this final decision, if the decision is to abort, then the partition discards that operation. If the decision is to commit, then that partition commits the operation to disk and continues propagating the decision.
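A sketch of the two phases over a chain of partitions (the per-partition validity check is a stub standing in for constraints like uniqueness):

    class Partition:
        def __init__(self):
            self.log = []

        def is_valid(self, op):
            return op not in self.log   # stand-in for a uniqueness check

    def chain_insert(partitions, op):
        # Consensus phase: walk down the chain collecting local decisions;
        # the last partition computes the final decision as a logical AND.
        final = all(p.is_valid(op) for p in partitions)

        # Replication phase: propagate the final decision back up the chain;
        # each partition commits or discards the operation accordingly.
        for p in reversed(partitions):
            if final:
                p.log.append(op)        # commit (to disk, in the real system)
        return final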

This has similarities to the CRAQ protocol for chain replication. Linked is an earlier post that contains a summary of chain replication and CRAQ protocol.

Fault-tolerance

There is additional complexity due to failure of the replicas. Failed partitions bring up two concerns: how to reconstruct the failed partition and how to respond to queries that would have been serviced by the failed partition.

If a partition fails, a simple recovery protocol would redirect queries originally destined for the failed partition to the other replex. But then failure amplification is maximal: the read must now be broadcast to every partition in the other replex, and at each partition, a read becomes a brute-force search that must iterate through the entire local storage of that partition.

On the other hand, to avoid failure amplification within a failure threshold f, one could introduce f replexes with the same sharding function h, i.e., exact replicas. There is no failure amplification within the failure threshold, because sharding is identical across exact replicas. But the cost is storage and network overhead in the steady state.

This is the tradeoff, and the paper dedicates "Section 3: Hybrid Replexes" to explore this tradeoff space.

Concluding remarks

The paper compares Replex to HyperDex and Cassandra and shows that Replex's steady-state performance is 76% better than HyperDex's and on par with Cassandra's for writes. For reads, Replex outperforms Cassandra by as much as 2-9x while maintaining performance equivalent to HyperDex. In addition, the paper shows that Replex can recover from one or two failures 2-3x faster than HyperDex.

Replex solves an important problem with less overhead than previous solutions. The hybrid replexes method (explained in Section 3) can also be useful in other problems for preventing failure amplification.

Thursday, June 30, 2016

Modular Composition of Coordination Services

This paper appeared in Usenix ATC'16 last week. The paper considers the problem of scaling ZooKeeper to WAN deployments. (Check this earlier post for a brief review of ZooKeeper.)

The paper suggests a client-side-only modification to ZooKeeper, and calls the resultant system ZooNet. ZooNet consists of a federation of ZooKeeper clusters, one at each datacenter/region. ZooNet assumes the workload is highly partitionable, so the data is partitioned among the ZooKeeper clusters, each accompanied by learners in the remote datacenters/regions. Each ZooKeeper cluster processes only updates for its own data partition, and if applications in different regions need to access unrelated items, they can do so independently and in parallel at their own sites.

However, the problem with such a deployment is that it does not preserve ZooKeeper's sequential execution semantics. Consider the example in Figure 1. (It is not clear to me why the so-called "loosely-coupled" applications in different ZooKeeper partitions need to be sequentially serialized. The paper does not give examples/reasons motivating this.)

Their solution is fairly simple. ZooNet achieves consistency by injecting sync requests. Their algorithm only requires remote-partition reads to be synced to achieve coordination. More precisely, they insert a sync every time a client's read request accesses a different coordination service than the previous request did. Subsequent reads from the same coordination service are naturally ordered after the first, and so no additional syncs are needed.
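A client-side sketch of this rule using the kazoo ZooKeeper client (the cluster map, site names, and paths are illustrative):

    from kazoo.client import KazooClient

    class ZooNetClient:
        # Inject a sync() whenever a read targets a different ZooKeeper
        # cluster than the previous request did.
        def __init__(self, clusters):   # e.g., {"us": KazooClient(...), ...}
            self.clusters = clusters
            self.last_site = None

        def read(self, site, path):
            zk = self.clusters[site]
            if site != self.last_site:
                zk.sync(path)           # order this read after remote updates
            self.last_site = site
            return zk.get(path)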


Figure 3 demonstrates the solution. I still have some problems with Figure 3. What if both syncs occur at the same time? I mean, what if both occur after the previous updates complete? Do they then prioritize one site over the other and use a deterministic order to resolve ties?

The paper also mentions that they fix a performance bug in ZooKeeper, but the bug is not relevant to the algorithm/problem. It is about more general ZooKeeper performance improvement by performing proper client isolation in the ZooKeeper commit processor.

ZooNet's evaluation is done only with a 2-site ZooKeeper deployment. The evaluation did not look at WAN latency and focused on pushing the limits on throughput. Reads are asynchronously pipelined to compensate for the latency introduced by the sync operation. They benchmark throughput when the system is saturated, which is not a typical ZooKeeper use-case scenario.

ZooNet does not support transactions and watches.

Update 7/1/16: Kfir, the lead author of the ZooNet work, provides clarifications for the questions/concerns in my review. See the top comment after the post.

Our work on WAN coordination

We have also been working on scaling ZooKeeper to WAN deployments. We take a different approach: we have extended ZooKeeper with a hierarchical composition of ZooKeeper clusters, and added lock-tokens to enable local and consistent writes, as well as local and consistent reads, from any region in a WAN. Our WANKeeper system supports transactions and watches. We have submitted a paper on this which is still under review, so I can only talk about our work next month. In the meanwhile, we are working on polishing our WANKeeper software for open-sourcing.

Tuesday, June 28, 2016

How to package your ideas using the Winston star

I came across this advice serendipitously by reading a random Hacker News comment. The advice shows up in the last 10 minutes of an Artificial Intelligence lecture by Patrick Winston. Winston, a prominent professor at MIT, tells his class that he will disclose to them important advice that may make or break their careers. It is about how to package and present ideas.


His advice is simple. Follow this 5-tip star to package your ideas better. All the tips start with "S":

  • Symbol: use a symbol to make your idea visual and memorable
  • Slogan: find a catchy slogan for your idea
  • Surprise: highlight the unconventional/counterintuitive/interesting part of your idea
  • Salient: focus on the essential part of your idea, remember: less is more, fewer ideas is better
  • Story: pack your idea with a story, human brains are hardwired for stories

Here, let me try to apply Winston's star method to itself, to make this more concrete.

  • Symbol: star is the symbol of Winston's method
  • Slogan: with these 5-star tips, you can 5x the impact of your good ideas
  • Surprise: you can achieve fame and impact by packaging your ideas better following these simple presentation tips
  • Salient: devising a good presentation is as important as having good ideas and doing good work
  • Story: these presentation tips are told by a top MIT prof to his undergraduate AI class as secrets to career success

After some googling I found another relevant video from Patrick Winston. This one is titled "How to Speak". (As always, watch at 1.5x speed.)

Related posts

How to present your work
Nobody wants to read your shit

Wednesday, June 22, 2016

Nobody wants to read your shit

Earlier I wrote about how much I liked "The War of Art" by Steven Pressfield. This newly released book by Pressfield picks up where "The War of Art" left off. While the former focused on the psychological aspects of the writing process, this book focuses on the structural/mechanical aspects of writing. The book was freely distributed as a pdf and ebook for a limited time for promotion purposes. (It looks like the promotion has ended.) I read it in one sitting and found it interesting. This book can benefit anyone who needs to communicate in writing. (Of course, if you are a novice writer, start with The Elements of Style.)

The book gives a very interesting account of what Steven learned from a 50+ year career spanning all forms of writing, including ad writing, Hollywood script-writing, novels, and nonfiction. The first chapter lays down the most important lesson Steven has learned: "Nobody wants to read your shit"; the rest of the book talks about what you can do about it. In a nutshell, you must streamline your message (staying on theme), and make its expression fun (organizing it around an interesting concept).

Steven lists these as the universal principles of story telling:
  1. Every story must have a concept. It must put a unique and original spin, twist or framing device upon the material.
  2. Every story must be about something. It must have a theme.
  3. Every story must have a beginning (that grabs the listener), a middle (that escalates in tension, suspense, stakes, and excitement), and an end (that brings it all home with a bang). Act One, Act Two, Act Three.
  4. Every story must have a hero.
  5. Every story must have a villain.
  6. Every story must start with an Inciting Incident, embedded within which is the story's climax.
  7. Every story must escalate through Act Two in terms of energy, stakes, complication and significance/meaning as it progresses.
  8. Every story must build to a climax centered around a clash between the hero and the villain that pays off everything that came before and that pays it off on-theme.
He says that these rules for writing apply to nonfiction as well. That includes your whitepapers, blog posts, and theses. You should definitely have a theme and an interesting concept. The hero and villain can be abstract. They can be useful for building some tension when motivating your problem.

The book is an easy and fun read. It feels like Steven is having a heart-to-heart conversation with you and coaching you about how you can become a better writer. While there were many gems, I was particularly intrigued by this passage:
I will do between 10 and 15 drafts of every book I write. Most writers do.
This is a positive, not a negative.
If I screw up in Draft #1, I'll attack it again in Draft #2.
"You can't fix everything in one draft."
Thinking in multiple drafts takes the pressure off. 

My writing related posts:


Tuesday, June 7, 2016

Progress by impossibility proofs

I recently listened to this TED talk by Google X director Astro Teller. The talk is about the Google X moonshot projects, and the process they use for managing them.

Google X moonshot projects aim to address big, important, and challenging problems. Teller tells (yay, pun!) that their process for managing a moonshot project is basically to try to find ways to kill the project. The team first tries to identify the bottleneck in a project by focusing on its most important/critical/risky part. Then they try to either solve that hardest part, or show that it is unsolvable (in any feasible manner) and kill the project. Teller claims that, at Google X, they actually incentivize people to kill projects by awarding bonuses, and celebrate when a project gets proved impossible and killed. And if a project still survives, that is a successful moonshot project that has potential for transformative change.

Although this approach looks counterintuitive at first, it is actually a good way to pursue transformative impact and fail fast without wasting too much time and resources. This can be a very useful method for academic research as well.
  1. Find a use-inspired grand-challenge problem. (This requires creativity, domain expertise, and hard thinking.)
  2. Try to prove an impossibility result.
  3. If you prove the impossibility result, that is still nice and publishable.
  4. If you can't prove an impossibility result, figure out why, and try to turn that into a solution to an almost "impossibly difficult problem". Bingo!
"Once you eliminate the impossible, whatever remains, no matter how improbable, must be the truth."
-- Sir Arthur Conan Doyle channeling Sherlock Holmes

Thursday, June 2, 2016

TensorFlow: A system for large-scale machine learning

This paper was uploaded to arxiv.org on May 27, 2016, so it is the most recent description of Google TensorFlow. (Five months ago, I commented on an earlier TensorFlow whitepaper, if you want to check that first.) Below I summarize the main points of this paper by using several sentences/paragraphs from the paper with some paraphrasing. I end the post with my wild speculations about TensorFlow. (This speculation thing is getting strangely addictive for me.)

TensorFlow is built leveraging Google's experience with their first generation distributed machine learning system, DistBelief. The core idea of this paper is that TensorFlow's dataflow representation subsumes existing work on parameter server systems (including DistBelief), and offers a uniform programming model that allows users to harness large-scale heterogeneous systems, both for production tasks and for experimenting with new approaches.

TensorFlow versus Parameter Server systems

DistBelief was based on the parameter server architecture, and satisfied most of Google's scalable machine learning requirements. However, the paper argues that this architecture lacked extensibility, because adding a new optimization algorithm, or experimenting with an unconventional model architecture, would require users to modify the parameter server implementation. Not all users are comfortable with making those changes, due to the complexity of the high-performance parameter server implementation. In contrast, TensorFlow provides a high-level uniform programming model that allows users to customize the code that runs in all parts of the system, and experiment with different optimization algorithms, consistency schemes, and parallelization strategies in userspace/unprivileged code.

TensorFlow is based on the dataflow architecture. Dataflow with mutable state enables TensorFlow to mimic the functionality of a parameter server, and even provide additional flexibility. Using TensorFlow, it becomes possible to execute arbitrary dataflow subgraphs on the machines that host the shared model parameters. We say more on this when we discuss the TensorFlow model and the structure of a typical training application below.

TensorFlow versus dataflow systems

The principal limitation of batch dataflow systems (including Spark) is that they require the input data to be immutable and all of the subcomputations to be deterministic, so that the system can re-execute subcomputations when machines in the cluster fail. This unfortunately makes updating a machine learning model a heavy operation. TensorFlow improves on this by supporting expressive control-flow and stateful constructs.

Naiad is designed for computing on sparse, discrete data, and does not support GPU acceleration. TensorFlow borrows aspects of timely dataflow iteration from Naiad in achieving dynamic control flow.

TensorFlow's programming model is close to Theano's dataflow representation, but Theano is for a single node and does not support distributed execution.

Tensorflow model

TensorFlow uses a unified dataflow graph to represent both the computation in an algorithm and the state on which the algorithm operates. Unlike traditional dataflow systems, in which graph vertices represent functional computation on immutable data, TensorFlow allows vertices to represent computations that own or update mutable state. By unifying the computation and state management in a single programming model, TensorFlow allows programmers to experiment with different parallelization schemes. For example, it is possible to offload computation onto the servers that hold the shared state to reduce the amount of network traffic.

In sum, TensorFlow innovates on these two aspects:

  • Individual vertices may have mutable state that can be shared between different executions of the graph.
  • The model supports multiple concurrent executions on overlapping subgraphs of the overall graph.


Figure 1 shows a typical training application, with multiple subgraphs that execute concurrently, and interact through shared variables and queues. Shared variables and queues are stateful operations that contain mutable state. (A Variable operation owns a mutable buffer that is used to store the shared parameters of a model as it is trained. A Variable has no inputs, and produces a reference handle.)

This figure provides a concrete explanation of how TensorFlow works. The core training subgraph depends on a set of model parameters and on input batches from a queue. Many concurrent steps of the training subgraph update the model based on different input batches, to implement data-parallel training. To fill the input queue, concurrent preprocessing steps transform individual input records (e.g., decoding images and applying random distortions), and a separate I/O subgraph reads records from a distributed file system. A checkpointing subgraph runs periodically for fault tolerance.

The API for executing a graph allows the client to specify the subgraph that should be executed. A subgraph is specified declaratively: the client selects zero or more edges to feed input tensors into the dataflow, and one or more edges to fetch output tensors from the dataflow; the run-time then prunes the graph to contain the necessary set of operations. Each invocation of the API is called a step, and TensorFlow supports multiple concurrent steps on the same graph, where stateful operations enable coordination between the steps. TensorFlow is optimized for executing large subgraphs repeatedly with low latency. Once the graph for a step has been pruned, placed, and partitioned, its subgraphs are cached in their respective devices.
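To make the step/subgraph terminology concrete, here is a minimal sketch in the 2016-era Python API (the toy model and data are mine):

    import tensorflow as tf

    # Mutable state: a Variable owns a buffer that persists across steps.
    w = tf.Variable(0.0, name="w")
    x = tf.placeholder(tf.float32, name="x")
    y = tf.placeholder(tf.float32, name="y")
    loss = tf.square(w * x - y)
    train = tf.train.GradientDescentOptimizer(0.1).minimize(loss)

    with tf.Session() as sess:
        sess.run(tf.initialize_all_variables())
        for x_val, y_val in [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]:
            # Each run() call is a step: feed input edges, fetch output edges;
            # the runtime prunes the graph to the needed subgraph.
            sess.run(train, feed_dict={x: x_val, y: y_val})
        print(sess.run(w))  # the Variable's state survived across steps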

Distributed execution

TensorFlow's dataflow architecture simplifies distributed execution, because it makes communication between subcomputations explicit. Each operation resides on a particular device, such as a CPU or GPU in a particular task. A device is responsible for executing a kernel for each operation assigned to it. The TensorFlow runtime places operations on devices, subject to implicit or explicit device constraints in the graph. The user may specify partial device preferences such as "any device in a particular task" or "a GPU in any task", and the runtime will respect these constraints.
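For instance, placement constraints are expressed like this in the Python API (the job/task names are illustrative):

    import tensorflow as tf

    # Pin the parameters to a parameter-server task; the runtime will insert
    # the Send/Recv pairs across the device boundary automatically.
    with tf.device("/job:ps/task:0"):
        w = tf.Variable(tf.zeros([10]))

    # Partial constraint: any device in worker task 0 (runtime picks which).
    with tf.device("/job:worker/task:0"):
        y = w * 2.0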

TensorFlow partitions the operations into per-device subgraphs. A per-device subgraph for device d contains all of the operations that were assigned to d, with additional Send and Recv operations that replace edges across device boundaries. Send transmits its single input to a specified device as soon as the tensor is available, using a rendezvous key to name the value. Recv has a single output, and blocks until the value for a specified rendezvous key is available locally, before producing that value. Send and Recv have specialized implementations for several device-type pairs. TensorFlow supports multiple protocols, including gRPC over TCP, and RDMA over Converged Ethernet.

TensorFlow is implemented as an extensible, cross-platform library. Figure 5 illustrates the system architecture: a thin C API separates user-level code in various languages from the core library written in C++.

Current development on TensorFlow

On May 18th, it was revealed that Google had built the Tensor Processing Unit (TPU) specifically for machine learning. The paper mentions that TPUs achieve an order of magnitude improvement in performance-per-watt compared to alternative state-of-the-art technology.

The paper mentions ongoing work on automatic optimization to determine default policies, for performance improvements that work well for most users. While power-users can get their way by taking advantage of TensorFlow's flexibility, this automatic optimization feature would make TensorFlow more user-friendly, and can help TensorFlow get adopted more widely (which looks like what Google is pushing for). The paper also mentions that, on the system level, the Google Brain team is actively developing algorithms for automatic placement, kernel fusion, memory management, and scheduling.

My wild speculations about TensorFlow 

Especially with the addition of mutable state and coordination via queues, TensorFlow is equipped to provide incremental, on-the-fly machine learning. Machine learning applications built with TensorFlow can be long-running applications that keep making progress as new input arrives, and can adapt to new conditions/trends on the fly. Instead of one-shot, huge-batch machine learning, such an incremental but continuous machine learning system has obvious advantages in today's fast-paced environment. This is definitely good for Google's core search and information indexing business. I also speculate this is important for Android phones and self-driving cars.

Previously I had speculated that, with its easily partitionable dataflow graph and its heterogeneous device support, TensorFlow can span over and bridge smartphone and cloud backend machine learning. I stand by that prediction.
TensorFlow can connect the cloud backend machine learning with the private, device-level machine learning going on in your smartphone. It doesn't make sense for an entire power-hungry TensorFlow program to run on your wimpy smartphone. Your smartphone will run only certain TensorFlow nodes and operations, while the rest of the TensorFlow graph runs on the Google cloud backend. Such a setup is also great for preserving the privacy of your phone while still enabling machine-learned insights on your Android.
Since TensorFlow supports inference as well as training, it can use hundreds of servers for fast training, and run trained models for inference on smartphones concurrently. The Android voice assistant (or Google Now) is a good application for this. In any case, it is a good time to be working on smartphone machine learning.

This is a wilder speculation, but a long-running, self-improving machine learning backend in the datacenter can also provide great support for self-driving cars. Every minute, new data and decisions from self-driving cars would flow from TensorFlow subgraphs running on the cars to the cloud backend TensorFlow program. Using this constant flux of data, the program can adapt to changing road conditions (snowy roads, poor visibility) and new scenarios on the fly, and all self-driving cars would benefit from the new improvements to the models.

Though the paper mentions that reinforcement-style learning is future work, for all we know Google might already have reinforcement learning implemented on TensorFlow. It also looks like the TensorFlow model is general enough to tackle some other distributed data processing applications, for example large-scale distributed monitoring at datacenters. I wonder if there are already TensorFlow implementations for such distributed systems services.

In 2011, Steve Yegge ranted about the lack of platform thinking in Google. It seems like Google has been doing well in that department lately. TensorFlow constitutes an extensible and flexible distributed machine learning platform that can be leveraged in several directions.

Sunday, May 15, 2016

Useful podcasts

I love listening to podcasts while commuting. I find that I learn a lot more by listening to my selected list of podcasts than by just listening to the radio. I am blissfully ignorant about elections, US politics, and celebrity gossip.

Here are the podcasts I have been listening to. I will appreciate it if you let me know of your recommendations. Who said podcasting is dead?

(Here is one tip you may find useful. I listen to my podcasts at 1.5x speed. It is actually a more comfortable and natural way to listen to stuff than 1x. 1x is too slow; your brain starts losing the stream of conversation due to the slow pace, and starts wandering. Give 1.5x a try.)

Dan Carlin's Hardcore History

Who knew history could be this interesting and captivating? Dan Carlin, a seasoned radio host and American political commentator, found his true calling in this podcast series about history. The episodes are long, around 4 hours each, but they are exciting and captivating. I found myself depressed and scared while listening to the World War 1 episodes. The Wrath of the Khans episodes about Genghis Khan were also very interesting.

The Tim Ferriss Show

I don't like Tim Ferriss's writing and his persona for the most part. But I am a big fan of his podcast. His guest selection and his interviewing skills are excellent. The topics are almost always interesting. And I learn a lot from his podcasts.
For example, I learned about Dan Carlin's and Mike Rowe's podcasts from Tim's episodes. Just check this list; it is quite impressive.

Some of my favorite episodes were:
Interview master: Cal Fussman and the Power of Listening
How Seth Godin Manages His Life -- Rules, Principles, and Obsessions
Luis von Ahn on Learning Languages, Building Companies, and Changing the World
The Scariest Navy SEAL Imaginable…And What He Taught Me
Scott Adams: The Man Behind Dilbert
Chris Sacca on Being Different and Making Billions

TEDTalks (audio)

I skip about half to two-thirds of the talks, and listen to the ones that sound interesting. It is convenient to be able to listen to TED talks on your commute.

The Way I Heard It with Mike Rowe

Mike Rowe spends about 5 minutes telling a captivating story. And the story always has an amazing twist ending or revelation. Very interesting podcast.

OPTIMIZE with Brian Johnson

This podcast has 5-minute and 15-minute episodes that review recent self-improvement books. It is nice to get the gist of these books summarized in a couple of minutes, to keep up to date with new books.

Curious Minds: Innovation, Inspiration, Improvement

This podcast has 20-minute interviews with authors of recent self-improvement books. The format of the interviews is a little dry. Topics are hit and miss.

This American Life

This American Life by Ira Glass. I don't think this requires any introduction.

Wednesday, April 13, 2016

Paper review. GraphLab: A new Framework for Parallel Machine Learning

This GraphLab paper is from 2010. It is written elegantly and it does a good job of explaining the GraphLab abstraction and foundations. A later VLDB 2012 paper presents how to extend this basic GraphLab abstraction to a distributed GraphLab implementation.

It seems like GraphLab has since taken off big time. There are workshops and conferences about GraphLab. And GraphLab is now developed by the company Dato (formerly GraphLab Inc.). Dato raised $6.75M from Madrona and New Enterprise Associates in its A round, and $18.5M in its B round from Vulcan Capital and Opus Capital, as well as Madrona and New Enterprise Associates.

Introduction

The motivation for GraphLab was to hit the sweetspot in development of machine learning solutions. "(Then in 2010) Existing high-level parallel abstractions like MapReduce are insufficiently expressive, while low-level tools like MPI and Pthreads leave machine learning (ML) experts repeatedly solving the same design challenges."

GraphLab aimed to express asynchronous iterative algorithms with sparse computational dependencies, while ensuring data consistency and achieving good parallel performance. This paper provides an efficient multicore parallel (but not distributed) shared-memory implementation of GraphLab. (The C++ reference implementation of GraphLab is at http://select.cs.cmu.edu/code)

Related work

MapReduce works well on embarrassingly data-parallel tasks, but is not efficient for graph processing and iterative algorithms. This line from the paper gave me a chuckle: "By coercing efficient sequential ML algorithms to satisfy the restrictions imposed by MapReduce, we often produce inefficient parallel algorithms that require many processors to be competitive with comparable sequential methods." (Frank McSherry's laptop also took on GraphLab.)

The DAG abstraction represents parallel computation as a directed acyclic graph, with data flowing along edges between vertices that correspond to computations/functions. Dryad, Storm, and Heron fit here. The DAG abstraction does not naturally express iterative algorithms.

Dataflow abstraction in Naiad is a related work. This being a 2010 paper there is no comparison to Naiad in the paper. This is what Naiad (2013) has to say about GraphLab. "GraphLab and PowerGraph offer a different asynchronous programming model for graph computations, based on a shared memory abstraction. These asynchronous systems are not designed to execute dataflow graphs so the notion of completeness of an epoch or iteration is less important, but the lack of completeness notifications makes it hard to compose asynchronous computations. Although GraphLab and PowerGraph provide a global synchronization mechanism that can be used to write a program that performs one computation after another, they do not achieve task- or pipeline-parallelism between stages of a computation. Naiad allows programs to introduce coordination only where it is required, to support hybrid asynchronous and synchronous computation."

Petuum is also closely related work. Petuum uses BSP, with SSP relaxation and nonuniform convergence. It looks like GraphLab allows more fine-grained scheduling, and supports data-graph computations and dependency modeling better. On the other hand, Petuum was designed clean-slate to support machine learning algorithms better, and introduced the model-parallelism concept. GraphLab, as you will see in the summary below, is principally a graph processing framework, with good applicability to graph-based machine learning tasks. The Petuum paper says "Petuum ML implementations can run faster than other platforms (e.g. Spark, GraphLab), because Petuum can exploit model dependencies, uneven convergence and error tolerance", and provides performance comparison experiments with GraphLab.

GraphLab abstraction

The minimalism yet generality of the GraphLab framework reminds me of LISP. "The data graph G = (V, E) encodes both the problem-specific sparse computational structure and directly modifiable program state. The user can associate arbitrary blocks of data (or parameters) with each vertex and directed edge in G." (Also, the set scheduler that enables users to compose custom update schedules is clever and LISPy.)


Figure 3 illustrates the overall GraphLab framework. A GraphLab program is composed of the following parts:
1. A data graph, which represents the data and computational dependencies.
2. Update functions, which describe local computation.
3. A sync mechanism for aggregating global state.
4. A data consistency model (i.e., Fully Consistent, Edge Consistent, or Vertex Consistent), which determines the extent to which computation can overlap.
5. Scheduling primitives, which express the order of computation and may depend dynamically on the data.

Data Model

The GraphLab data model consists of two parts: a directed data graph and a shared data table. The data graph G = (V, E) encodes both the problem specific sparse computational structure and directly modifiable program state. The user can associate arbitrary blocks of data (or parameters) with each vertex and directed edge in G. To support globally shared state, GraphLab provides a shared data table (SDT) which is an associative map, T: [Key] --> Value, between keys and arbitrary blocks of data. Here, Key can be a vertex, edge, result of a sync computation as explained below, or a global variable, say model parameter.

User defined computation

Computation in GraphLab can be performed either through an update function, which defines local computation, or through the sync mechanism, which defines global aggregation. The update function is analogous to Map in MapReduce, and the sync mechanism is analogous to the Reduce operation. A GraphLab program may consist of multiple update functions, and it is up to the scheduling model to determine which update functions are applied to which vertices and in which parallel order.
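To illustrate, here is a hedged Python sketch of an update function and a sync, using PageRank (one of GraphLab's canonical examples) on the toy DataGraph sketched above; the function signatures are my own, not GraphLab's actual API.

    # Update function: local computation on a vertex and its neighborhood.
    # `tasks` collects new (vertex, function) tasks for dynamic scheduling.
    def pagerank_update(v, graph, tasks):
        total = sum(graph.vertex_data[u]["rank"] * graph.edge_data[(u, v)]["weight"]
                    for u in graph.in_edges[v])
        new_rank = 0.15 + 0.85 * total
        # If the rank changed significantly, reschedule the out-neighbors
        # so the change propagates (this is what makes scheduling dynamic).
        if abs(new_rank - graph.vertex_data[v]["rank"]) > 1e-4:
            tasks.extend((w, pagerank_update) for w in graph.out_edges[v])
        graph.vertex_data[v]["rank"] = new_rank

    # Sync mechanism: global aggregation over all vertices, analogous to
    # Reduce; the result would be stored in the shared data table.
    def total_rank_sync(graph):
        return sum(data["rank"] for data in graph.vertex_data.values())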

Scheduling

The GraphLab update schedule describes the order in which update functions are applied to vertices and is represented by a parallel data-structure called the scheduler. The scheduler abstractly represents a dynamic list of tasks (vertex-function pairs) which are to be executed by the GraphLab engine. This is a minimalist yet flexible and general model. Since writing a scheduler is hard, GraphLab provides a default BSP-style scheduler and a round-robin scheduler.

Since many ML algorithms (e.g., Lasso, CoEM, Residual BP) require more control over the tasks that are created and the order in which they are executed, GraphLab also allows update functions to add and reorder tasks. For this, GraphLab provides 1) FIFO schedulers, which permit task creation but not task reordering, and 2) prioritized schedulers, which permit task reordering at the cost of increased overhead.
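Here is a toy sketch of the engine's task loop under a FIFO scheduler, under the same illustrative conventions as the sketches above (update functions have the pagerank_update signature from the previous sketch).

    from collections import deque

    # FIFO scheduler: a dynamic queue of (vertex, update_function) tasks.
    # Update functions may create new tasks but cannot reorder the queue.
    def run_fifo(graph, initial_tasks):
        tasks = deque(initial_tasks)
        while tasks:
            v, update_fn = tasks.popleft()
            new_tasks = []
            update_fn(v, graph, new_tasks)   # may request follow-up work
            tasks.extend(new_tasks)

    # A prioritized scheduler would keep (priority, vertex, function)
    # entries in a heap instead, permitting reordering at extra cost.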

Evaluation

The paper demonstrates the expressiveness of the GraphLab framework by designing and implementing parallel versions of belief propagation, Gibbs sampling, CoEM, Lasso, and compressed sensing.

Links

GraphLab Wikipedia page

Paper Review. Petuum: A new platform for distributed machine learning on big data

PThreads

Friday, April 8, 2016

Book review: The War Of Art by Steven Pressfield

I read this book recently and liked it a lot. The book is written by Steven Pressfield. He is also the writer of "The Legend of Bagger Vance" and "Gates of Fire" (arguably the best book about Spartans, used as recommended reading in Army academies). Pressfield definitely knows and respects his craft.

This book is a call for all people, and creative people and writers in particular, to wake up and realize their calling. The book says: "Most of us have two lives. The life we live, and the unlived life within us." Yeah... About that... I know "self-help" books, and books that use "new-agey" language, rub many people the wrong way. I am pragmatic about that. The way I see it, if I can learn some good paradigms, tips, and strategies to become more productive and effective, I can look past some of the turn-offs.

The book is organized in 3 parts. The first part talks about resistance, the enemy of creating anything meaningful and worthwhile. This part gives an extended and spot-on description and analysis of resistance. You have to know the enemy to have a chance to beat it.

The second part talks about turning professional to beat resistance. I liked this part best. This part has very positivist/pragmatist advice, in contrast to the romantic/mystical theme dominating Part 3, which is also somewhat evident in Part 1.

The third part talks about going beyond resistance and living life in a higher realm. This part is mostly mystical --done tastefully, I think. (What do you expect? Pressfield is the author of The Legend of Bagger Vance after all.) There are still several useful spot-on observations. One is about territorial versus hierarchical orientation.

Below I am going to paste quotes from the book to give you a taste of it. I liked this book, and recommend it to anyone who wants to understand resistance and improve her craft.

Part 1: Resistance, Defining The Enemy

Any act that rejects immediate gratification in favor of long-term growth, health or integrity … will elicit Resistance.

Resistance has no strength of its own. Every ounce of juice it possesses comes from us. We feed it with power by our fear of it. Master that fear, and we conquer Resistance.

Procrastination is the most common manifestation of Resistance because it's the easiest to rationalize. We don't tell ourselves, "I'm never going to write my symphony." Instead we say, "I am going to write my symphony; I'm just going to start tomorrow."

There's a secret that real writers know that wannabe writers don't, and the secret is this: It's not the writing part that's hard. What's hard is sitting down to write. What keeps us from sitting down is Resistance.

Resistance is invisible, internal, insidious, fueled by fear, and it opposes in only one direction.

Like a magnetized needle floating on a surface of oil, Resistance will unfailingly point to true North–meaning that calling or action it most wants to stop us from doing. We can use this. We can use it as a compass. We can navigate by Resistance, letting it guide us to that calling or action that we must follow before all others. Rule of thumb: The more important a call or action is to our soul’s evolution, the more Resistance we will feel toward pursuing it.

Part 2: Turning Pro, Combating Resistance

The conventional interpretation is that the amateur pursues his calling out of love, while the pro does it for money. Not the way I see it. In my view, the amateur does not love the game enough. If he did, he would not pursue it as a sideline, distinct from his "real" vocation. The professional loves it so much he dedicates his life to it. He commits full-time. That's what I mean when I say turning pro. Resistance hates it when we turn pro.

Grandiose fantasies are a symptom of Resistance. They're the sign of an amateur. The professional has learned that success, like happiness, comes as a by-product of work. The professional concentrates on the work and allows rewards to come or not come, whatever they like.

The professional is patient, prepared, acts in the face of fear, and is dedicated to mastering technique.

The professional dedicates himself to mastering technique not because he believes technique is a substitute for inspiration but because he wants to be in possession of the full arsenal of skills when inspiration does come. The professional is sly. He knows that by toiling beside the front door of technique, he leaves room for genius to enter by the back.

Part 3: Beyond resistance, the higher realm

We come into this world with a specific, personal destiny. We have a job to do, a calling to enact, a self to become. We are who we are from the cradle, and we're stuck with it. Our job in this lifetime is not to shape ourselves into some ideal we imagine we ought to be, but to find out who we already are and become it.

The most important thing about art is to work. Nothing else matters except sitting down every day and trying.

When we sit down day after day and keep grinding, something mysterious starts to happen. A process is set into motion by which, inevitably and infallibly, heaven comes to our aid. Unseen forces enlist in our cause; serendipity reinforces our purpose.

The territory provides sustenance. -- Runners know what a territory is. So do rock climbers and kayakers and yogis. Artists and entrepreneurs know what a territory is. The swimmer who towels off after finishing her laps feels a hell of a lot better than the tired, cranky person who dove into the pool 30 minutes earlier.

Territory can only be claimed by work. -- When Arnold Schwarzenegger hits the gym, he's on his own turf. But what made it his own are the hours and years of sweat he put in to claim it. The territory doesn't give, it gives back.

The artist must operate territorially. He must do his work for its own sake. To labor in the arts for any reason other than love is prostitution.

Of any activity you do, ask yourself: If I were the last person on earth, would I still do it? If you're all alone on the planet, a hierarchical orientation makes no sense. There's no one to impress. So, if you’d still pursue that activity, congratulations. You're doing it territorially.



The book ends with the following. I think this is a useful perspective/paradigm to cultivate:

If you were meant to cure cancer or write a symphony or crack cold fusion and you don't do it, you not only hurt yourself, even destroy yourself. You hurt your children. You hurt me. You hurt the planet.

Creative work is not a selfish act or a bid for attention on the part of the actor. It's a gift to the world and every being in it. Don't cheat us of your contribution. Give us what you’ve got.

Wednesday, April 6, 2016

Consensus in the Cloud: Paxos Systems Demystified

This is our most recent paper, still under submission. It is available as a technical report here. We felt we had to write this paper because we have seen misuses/abuses of Paxos-based coordination services. Glad this is off our chests.

Here is the short pitch for the paper. I hope you like it and find it helpful.

Coordination and consensus play an important role in datacenter and cloud computing. Examples are leader election, group membership, cluster management, service discovery, resource/access management, and consistent replication of the master nodes in services.

Paxos protocols and systems provide a fault-tolerant solution to the distributed consensus problem and have attracted significant attention, but they have also generated substantial confusion. Zab, Multi-Paxos, and Raft are examples of Paxos protocols; ZooKeeper, Chubby, and etcd are examples of Paxos systems. Paxos systems and Paxos protocols reside in different planes, but even that doesn't prevent these two concepts from being confused. Paxos protocols are useful as low-level components for server replication, whereas Paxos systems have often been shoehorned into that task. The proper use case for Paxos systems is highly-available/durable metadata management, under the conditions that all metadata fit in main memory and are not subject to frequent changes.

In order to elucidate the correct use of distributed coordination systems, we compare and contrast popular Paxos protocols and Paxos systems and present advantages and disadvantages for each. The Zab and Raft protocols differ from Paxos in that they divide execution into epochs: each epoch begins with a new election, goes into the broadcast phase, and ends with a leader failure. Similarly, Paxos systems have their own nuances. Chubby uses the Multi-Paxos algorithm to achieve linearizability, while Zab lies at the heart of ZooKeeper and provides not only linearizability but also FIFO order for client requests, enabling developers to build complex coordination primitives with ease. The etcd system uses Raft as its consensus protocol, adopts a stateless design, and implements certain features very differently from ZooKeeper and Chubby.

We also categorize the coordination use-patterns in the cloud into nine broad categories: server replication (SR), log replication (LR), synchronization service (SS), barrier orchestration (BO), service discovery (SD), group membership (GM), leader election (LE), metadata management (MM), and distributed queues (Q). Using these categories, we examine Google and Facebook infrastructures, as well as Apache top-level projects, to investigate how they use Paxos protocols and systems.
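To make one of these use-patterns concrete, here is a minimal leader election (LE) sketch on ZooKeeper using the kazoo Python client; the connection string, the election path, and the identifier are placeholders.

    from kazoo.client import KazooClient

    zk = KazooClient(hosts="127.0.0.1:2181")
    zk.start()

    def lead():
        # Runs only while this process holds leadership;
        # returning relinquishes the leadership.
        print("I am the leader now")

    # kazoo's Election recipe builds LE out of ZooKeeper primitives:
    # contenders create ephemeral sequential znodes, the contender with the
    # lowest sequence number leads, and each contender watches only its
    # predecessor to avoid a herd effect.
    election = zk.Election("/myapp/election", identifier="node-1")
    election.run(lead)   # blocks until elected, then calls lead()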


Finally, we analyze tradeoffs in the distributed coordination domain and identify promising future directions for achieving more scalable distributed coordination systems.

See the paper for more information.

Related links

Paper summary: ZooKeeper: Wait-free coordination for Internet-scale systems

Monday, April 4, 2016

Paper Review. Petuum: A new platform for distributed machine learning on big data

First there was big data. Industry saw that big data was good. Industry made big data storage systems, NoSQL datastores, to store and access the big data. Industry saw they were good. Industry made big data processing systems, MapReduce, Spark, etc., to analyze and extract information and insights (CRM, business logistics, etc.) from big data. Industry saw they were good and popular, so machine learning libraries were added to these big data processing systems to provide support for machine learning algorithms and techniques.

And here is where this paper makes a case for a redesign of machine learning systems. The big data processing systems produced by the industry are general analytics systems, not specifically designed for machine learning from the start. They are data analytics frameworks first, with some machine learning libraries added on to tackle machine learning tasks. This paper considers a clean-slate system design for a big data machine learning system: what would a big data framework designed and built specifically for machine learning look like?

This paper is from CMU and appeared in KDD 2015.

Machine Learning (ML) objectives and features

Naturally the paper starts by first identifying the objectives and features of ML systems. "ML defines an explicit objective function over data where the goal is to attain optimality of this function in the space defined by the model parameters and other intermediate variables."

Thus, the paper argues, ML algorithms have an iterative-convergent structure and share these principles:

  • error tolerance: iterative-convergent algorithms are robust against limited errors and still converge/heal toward the solution
  • dynamic structure dependency: iterative-convergent algorithms often have changing correlation strengths between model parameters during the course of execution
  • nonuniform convergence: model parameters converge at different speeds


The paper proposes Petuum, a new distributed ML framework, to leverage these principles.

Stale Synchronous Parallel (SSP) consistency

Petuum leverages error-tolerance by introducing SSP (Stale Synchronous Parallel) consistency. SSP reduces network synchronization costs among workers, while maintaining bounded staleness convergence guarantees.

While the BSP (Bulk Synchronous Parallel) approach, used in MapReduce, Giraph, etc., requires the workers to synchronize state and exchange messages after each round, SSP cuts some slack. The SSP consistency model guarantees that a worker reading from the parameter server at iteration c receives all updates from all workers computed at or before iteration c-s-1, where s is the staleness threshold. If there is a straggler more than s iterations behind, the reader will block until the straggler catches up and sends its updates. How do you determine the slack s? That requires ML expertise and experimentation.
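Here is a minimal Python sketch of this read rule, assuming a toy in-memory parameter server; the names are illustrative, not Petuum's actual API.

    class SSPServer:
        def __init__(self, num_workers, staleness):
            self.s = staleness                  # the slack s
            self.clock = [0] * num_workers      # last iteration committed per worker
            self.params = {}

        def commit(self, worker, updates):
            # Worker finished an iteration: apply its additive updates
            # and advance its clock.
            for key, delta in updates.items():
                self.params[key] = self.params.get(key, 0.0) + delta
            self.clock[worker] += 1

        def can_read(self, c):
            # A reader at iteration c proceeds only once every worker has
            # committed at least iteration c-s-1; otherwise it blocks
            # until the straggler catches up.
            return min(self.clock) >= c - self.s - 1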

Big data, meet Big model!


Another significant idea in the Petuum paper is the dichotomy between big data and big model. Yes, big data is big, on the order of terabytes or petabytes. And, the paper observes, we now also have a big model problem: ML programs for industrial-scale big data problems use big models with 100s of billions of parameters (Figure 1 gives a nice summary). And this big model needs special attention as well.

Briefly, the distinction is as follows. In data-parallelism, the data is partitioned and distributed over the workers, and each worker computes updates over its own data partition against the shared model; because the updates are additive and the data samples are treated as independent, the workers' partial updates can simply be aggregated. In model-parallelism, the model parameters themselves are partitioned over the workers, and each worker updates only the parameter subset assigned to it by a scheduling function.

Essentially, model-parallelism provides the ability to invoke dynamic schedules that reduce model parameter dependencies across workers, leading to faster convergence. So Petuum uses model-parallelism to leverage nonuniform convergence and dynamic structure dependency to improve the efficiency/performance of ML tasks.

Petuum design

Petuum consists of three main components: Scheduler, parameter server, and workers.

The scheduler is responsible for enabling model-parallelism: it sends subsets of parameters to workers via the parameter exchange channel. The parameter server stores and updates model parameters, which can be accessed via a distributed shared-memory API by both workers and the scheduler. Each worker is responsible for performing operations defined by the user on a partitioned dataset and on the parameter subset specified by the scheduler.

Here is the Petuum API:
schedule: specify the subset of model parameters to be updated in parallel.
push: specify how individual workers compute partial results on those parameters.
pull [optional]: specify how those partial results are aggregated to perform the full parameter update.

To illustrate the use of Petuum API, the paper presents the code for a data-parallel Distance Metric Learning (DML) algorithm and for a model parallel Lasso algorithm.
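Since I can't reproduce the paper's code here, below is a hedged Python toy with the same schedule/push/pull shape, for a model-parallel, coordinate-descent-like update; none of these names are Petuum's actual (C++) API, and the update rule is deliberately simplistic.

    params = {j: 0.0 for j in range(10)}        # shared model (parameter server)
    last_delta = {j: 1.0 for j in range(10)}    # recent change per coordinate

    def schedule(k=3):
        # Pick the k coordinates that moved most recently, exploiting
        # nonuniform convergence.
        return sorted(last_delta, key=last_delta.get, reverse=True)[:k]

    def push(data_partition, chosen):
        # Each worker computes partial results on the chosen parameters
        # from its own data partition (here: a toy gradient-like step).
        return {j: -0.1 * sum(row[j] for row in data_partition) for j in chosen}

    def pull(partial_results):
        # Aggregate the workers' partial results into the full update.
        for partial in partial_results:
            for j, delta in partial.items():
                params[j] += delta
                last_delta[j] = abs(delta)

    # One Petuum-style iteration over two toy worker partitions:
    partitions = [[{j: 0.5 for j in range(10)}], [{j: 0.2 for j in range(10)}]]
    chosen = schedule()
    pull([push(part, chosen) for part in partitions])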


It looks like writing an efficient/optimized schedule requires significant expertise. It will often require running the algorithm on a test dataset to observe the relations between model parameters, convergence speeds, etc.

Evaluation

The paper provides evaluation results on the performance of Petuum.

Petuum versus TensorFlow

How does Petuum compare with Google's TensorFlow? The TensorFlow framework can be used to express a wide variety of algorithms, including training and inference algorithms for deep neural network models. And yet, TensorFlow takes a completely different design approach than Petuum: TensorFlow uses dataflow graphs. Implementing an ML algorithm/recipe on TensorFlow has a more distributed nature: an algorithm/recipe consists of many operations, and TensorFlow maps one or more of these operations to each node/worker. In Petuum, the entire algorithm/recipe is mapped onto each node/worker, and efficiency is achieved via data-parallel and model-parallel partitioning.
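To make the contrast concrete, here is the canonical TensorFlow pattern of that era (the 1.x Python API): you first construct a dataflow graph of operations, then execute it in a session, leaving the runtime free to place different operations on different devices/workers.

    import tensorflow as tf

    # Graph construction: c is a node in the dataflow graph, not a value.
    a = tf.placeholder(tf.float32)
    b = tf.placeholder(tf.float32)
    c = a * b

    # Execution is a separate phase: the runtime schedules the graph's
    # operations, potentially across many devices or workers.
    with tf.Session() as sess:
        print(sess.run(c, feed_dict={a: 2.0, b: 3.0}))   # prints 6.0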

There are advantages and disadvantages to each approach, and it will be interesting to watch how this plays out in the coming years.

Related links

Short review of Google TensorFlow