Wednesday, May 16, 2018

Paper summary. Decoupling the Control Plane from Program Control Flow for Flexibility and Performance in Cloud Computing

This paper appeared in Eurosys 2018 and is authored by Hang Qu, Omid Mashayekhi, Chinmayee Shah, and Philip Levis from Stanford University.

I liked the paper a lot; it is well written and well presented. And I am getting lazy, so I use a lot of text from the paper in my summary below.

Problem motivation 

In data processing frameworks, improved parallelism is the holy grail because it can get more data processed in less time.

However, parallelism has a nemesis called the control plane. While "control plane" can have a wide array of meanings, in this paper it is defined as the systems and protocols for scheduling computations, load balancing, and recovering from failures.

A centralized control plane becomes a bottleneck after a point. The paper cites other papers and states that a typical cloud framework control plane that uses a fully centralized design can dispatch fewer than 10,000 tasks per second. Actually, that is not bad! However, with machine learning (ML) applications we are starting to push past that limit: we need to deploy large jobs on thousands of machines, and these jobs consist of a great many short tasks (10 milliseconds) as part of iterations over data mini-batches.

To improve control plane scalability, you can distribute the control plane across worker nodes (as in Nimbus and Drizzle). But that also runs into scaling problems due to the synchronization needed between workers and the controller. Existing control planes are tightly integrated with the control flow of the programs, and this requires workers to block on communication with the controller at certain points in the program, such as spawning new tasks or resolving data dependencies. When synchronization is involved, a distributed solution is not necessarily more scalable than a centralized one. But it is more complicated for sure: in one sense that is the computer analog of the mythical man-month problem.

Another approach to improve scalability is to remove the control plane entirely. Some frameworks, such as Tensorflow, Naiad, and MPI frameworks, are scheduled once as a big job and they manage their own execution after that. Well, of course the problem doesn't go away, but plays out at the framework level: the scalability is limited by the application logic written in these frameworks and the frameworks' support for concurrency control. Furthermore, these frameworks don't play nicely with the datacenter/cloud computing environment either. Rebalancing load or migrating tasks requires killing and restarting a computation by generating a new execution plan and installing it on every node.

This paper proposes a control plane design that breaks the existing tradeoff between scalability and flexibility. It allows jobs to run extremely short tasks (<1ms) on thousands of cores and reschedule computations in milliseconds. And that has applications in particular for ML.

Making the Control Plane Asynchronous

To prevent synchronous operations, the proposed control plane cleanly divides responsibilities between controller and workers: a controller decides where to execute tasks and workers decide when to execute them.

The control plane's traffic is completely decoupled from the control flow of the program, so running a program faster does not increase load at the controller. When a job is stably running on a fixed set of workers, the asynchronous control plane exchanges only occasional heartbeat messages to monitor worker status.

The architecture

Datasets are divided into many partitions, which determines the available degree of parallelism. Datasets are mutable and can be updated in place. This corresponds nicely to parameters in ML applications.

The controller uses an abstraction called a partition map to control where tasks execute. The partition map describes which worker each data object should reside on. _Because task recipes trigger tasks based on what data objects are locally present, controlling the placement of data objects allows the controller to implicitly decide where tasks execute._ Updates to the partition map are pushed to the workers asynchronously, and when a worker receives an update it asynchronously applies any necessary changes by transferring data.

On the worker side, an abstraction called task recipes describes triggers for when to run a task by specifying a pattern matched against the task's input data. Using recipes, every worker spawns and executes tasks by examining the state of its local data objects, obviating the need to interact with the controller.

Task recipes

A task recipe specifies (1) a function to run, (2) which datasets the function reads and/or writes, and (3) preconditions that must be met for the function to run.

There are three types of preconditions to trigger a recipe:

  1. Last input writer: For each partition it reads or writes, the recipe specifies which recipe should have last written it. This enforces local write-read dependencies, so that a recipe always sees the correct version of its inputs.
  2. Output readers: For each partition it writes, the recipe specifies which recipes should have read it since the last write. This ensures that a partition is not overwritten until tasks have finished reading the old data.
  3. Read messages: The recipe specifies how many messages a recipe should read before it is ready to run. Unlike the other two preconditions, which specify local dependencies between tasks that run on the same worker, messages specify remote dependencies between tasks that can run on different workers.
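
To make the three preconditions concrete, here is a minimal sketch of how a worker might test whether a recipe is ready to run. This is my own illustration, not Canary's code: the names `PartitionState`, `Recipe`, `is_ready`, and `record_run` are hypothetical, and I assume stages are identified by plain integers.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


@dataclass
class PartitionState:
    """Hypothetical access history a worker keeps for one local partition."""
    last_writer: Optional[int] = None                 # stage that last wrote it
    readers_since_write: Set[int] = field(default_factory=set)


@dataclass
class Recipe:
    """Hypothetical recipe: a stage number plus its three kinds of preconditions."""
    stage: int
    # Partition -> stage that must have written it last (for every partition touched).
    last_writer: Dict[str, Optional[int]]
    # Partition -> stages that must have read it since the last write (written ones only).
    output_readers: Dict[str, Set[int]] = field(default_factory=dict)
    messages_needed: int = 0


def is_ready(recipe: Recipe, local: Dict[str, PartitionState], messages_received: int) -> bool:
    """Return True when all preconditions hold on this worker's local state."""
    # 1. Last input writer: every touched partition is local and at the right version.
    for name, writer in recipe.last_writer.items():
        state = local.get(name)
        if state is None or state.last_writer != writer:
            return False
    # 2. Output readers: nobody still needs the old contents of what we overwrite.
    for name, readers in recipe.output_readers.items():
        state = local.get(name)
        if state is None or not readers <= state.readers_since_write:
            return False
    # 3. Read messages: enough remote messages have arrived.
    return messages_received >= recipe.messages_needed


def record_run(recipe: Recipe, local: Dict[str, PartitionState]) -> None:
    """Update the access history after the recipe's function has run."""
    for name in recipe.last_writer:
        if name in recipe.output_readers:
            local[name].last_writer = recipe.stage
            local[name].readers_since_write = set()
        else:
            local[name].readers_since_write.add(recipe.stage)
```

With a stage 1 that writes a partition, a stage 2 that reads it, and a stage 3 that overwrites it, stage 3 stays blocked until stage 2 has been recorded as a reader. No controller is consulted at any point, which is the property the paper is after.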

Since incorrect preconditions can lead to computational errors that are extremely hard to debug, they are generated automatically from a sequential user program. A single recipe describes potentially many iterations of the same data-parallel computation.

Writers and readers are specified by their stage number, a global counter that every worker maintains. The counter counts the stages in program order, and increments after the application determines which branch to take or whether to continue another loop. (Using the counters, I think it is possible to implement the stale synchronous parallel (SSP) method easily as well.) All workers follow an identical control flow, and so have a consistent mapping of stage numbers to recipes.

Exactly-once Execution and Asynchrony

Ensuring atomic migration requires a careful design of how preconditions are encoded as well as how data objects move between workers. No node in an asynchronous control plane has a global view of the execution state of a job, so workers manage atomic migration among themselves. To ensure that the task from a given stage executes exactly once and messages are delivered correctly, when workers transfer a data partition they include the access history metadata relevant to preconditions: the last writer and how many recipes have read it.
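
Here is a small sketch of why shipping the access history along with the data gives exactly-once execution. It is a toy model under my own assumptions (the `Partition`, `Worker`, and `migrate` names are hypothetical, and the "task" just increments every value), not the paper's protocol.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Partition:
    """Hypothetical partition: its data plus the access history that travels with it."""
    data: List[float]
    last_writer: Optional[int] = None   # stage that last wrote this partition
    reads_since_write: int = 0          # how many recipes have read it since then


class Worker:
    def __init__(self, name: str) -> None:
        self.name = name
        self.partitions: Dict[str, Partition] = {}

    def run_write_stage(self, pid: str, stage: int, expected_last_writer: Optional[int]) -> bool:
        """Run a toy write task (add 1 to each value) only if its precondition holds."""
        part = self.partitions.get(pid)
        if part is None or part.last_writer != expected_last_writer:
            return False                # not local, or the stage has already been applied
        part.data = [x + 1.0 for x in part.data]
        part.last_writer = stage
        part.reads_since_write = 0
        return True


def migrate(src: Worker, dst: Worker, pid: str) -> None:
    """Move a partition between workers, metadata included, so it lives in one place."""
    dst.partitions[pid] = src.partitions.pop(pid)
```

If stage 1 ran on the source worker before the move, the destination sees `last_writer == 1` and refuses to run stage 1 again, but it can go straight on to stage 2.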

Partition Map

A partition map is a table that specifies, for each partition, which worker stores that partition in memory. A partition map indirectly describes how a job should be distributed across workers, and is used as the mechanism for the controller to signal workers how to reschedule job execution.

The controller does five things:
(1) Starts a job by installing the job's driver program and an initial partition map on workers.
(2) Periodically exchanges heartbeat messages with workers and collects workers' execution statistics, e.g. CPU utilization and CPU cycles spent computing on each partition.
(3) Uses the collected statistics to compute partition map updates during job execution.
(4) Pushes partition map updates to all workers.
(5) Periodically checkpoints jobs for failure recovery.
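
Step (3) is where the scheduling policy lives. The summary above does not say which policy the controller uses, so the following is only a sketch of one plausible choice: a greedy step that moves a single partition from the busiest worker to the idlest one, based on the per-partition CPU cycles collected in step (2). The function name `rebalance_step` and the data layout are my assumptions.

```python
from typing import Dict, List


def rebalance_step(
    partition_map: Dict[int, str],
    cycles: Dict[int, int],
    workers: List[str],
) -> Dict[int, str]:
    """Return an updated partition map after at most one greedy move.

    partition_map: partition id -> worker currently holding it
    cycles:        partition id -> CPU cycles recently spent on that partition
    """
    # Total load per worker, from the statistics gathered via heartbeats.
    loads = {w: 0 for w in workers}
    for pid, worker in partition_map.items():
        loads[worker] += cycles.get(pid, 0)

    busiest = max(workers, key=lambda w: loads[w])
    idlest = min(workers, key=lambda w: loads[w])
    gap = loads[busiest] - loads[idlest]

    # Only moves that cost less than the gap shrink the imbalance.
    candidates = [
        pid for pid, worker in partition_map.items()
        if worker == busiest and 0 < cycles.get(pid, 0) < gap
    ]
    updated = dict(partition_map)
    if candidates:
        # Moving half of the gap would equalize the two workers.
        best = min(candidates, key=lambda pid: abs(cycles[pid] - gap / 2))
        updated[best] = idlest
    return updated
```

The controller would then push the returned map to all workers as in step (4), and the workers would move the affected partition among themselves.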

Maximizing data locality

To maximize data locality, the controller updates the partition map under the constraint that the input partitions to each possible task in a job are assigned to the same worker. The execution model of task recipes is intentionally designed to make this constraint explicit and achievable: if a stage reads or writes multiple datasets, a task in the stage only reads or writes the datasets' partitions that have the same index, so those partitions are constrained to be assigned to the same worker.
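
The same-index constraint is easy to state in code. Below is a minimal sketch, assuming a partition is identified by a `(dataset, index)` pair and that the initial placement is a simple round-robin over workers; both the representation and the function names are my own, not the paper's.

```python
from typing import Dict, List, Set, Tuple

PartitionId = Tuple[str, int]  # (dataset name, partition index)


def build_partition_map(
    datasets: List[str], num_partitions: int, workers: List[str]
) -> Dict[PartitionId, str]:
    """Place partitions so that equal indices of all datasets share a worker."""
    pmap: Dict[PartitionId, str] = {}
    for index in range(num_partitions):
        worker = workers[index % len(workers)]   # round-robin is just an assumption
        for dataset in datasets:
            pmap[(dataset, index)] = worker
    return pmap


def satisfies_locality(pmap: Dict[PartitionId, str]) -> bool:
    """Check that no partition index is split across more than one worker."""
    workers_by_index: Dict[int, Set[str]] = {}
    for (_dataset, index), worker in pmap.items():
        workers_by_index.setdefault(index, set()).add(worker)
    return all(len(ws) == 1 for ws in workers_by_index.values())
```

Any partition map update the controller computes would need to keep `satisfies_locality` true, which in practice means moving all partitions with a given index together.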


The group designed Canary, an asynchronous control plane, which can execute over 100,000 tasks/second on each core, and can scale linearly with the number of cores.

The driver constructs the task recipes. A driver program specifies a sequential program order, but the runtime may reorder tasks as long as the observed result is the same as the program order (just as how processors reorder instructions).

Canary periodically checkpoints all the partitions of a job. The controller monitors whether workers fail using periodic heartbeat messages. If any worker running a job is down, the controller cleans up the job's execution on all workers, and reruns the job from the last checkpoint.
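
As a rough illustration of this recovery loop, here is a toy sketch. The `Job` class, the `failed_workers` helper, and the timeout-based failure detection are all my assumptions; the paper only says that heartbeats are used to detect failures and that the job reruns from the last checkpoint.

```python
import copy
from typing import Dict, List


class Job:
    """Toy job whose whole state is its partitions; each step adds 1 to every value."""

    def __init__(self, partitions: Dict[str, List[float]]) -> None:
        self.partitions = partitions
        self.iteration = 0
        self.take_checkpoint()

    def step(self) -> None:
        for values in self.partitions.values():
            for i in range(len(values)):
                values[i] += 1.0           # partitions are mutated in place
        self.iteration += 1

    def take_checkpoint(self) -> None:
        # Snapshot all partitions together with the iteration they belong to.
        self.checkpoint = copy.deepcopy(self.partitions)
        self.checkpoint_iteration = self.iteration

    def recover(self) -> None:
        # Rewind every partition, not just the lost ones.
        self.partitions = copy.deepcopy(self.checkpoint)
        self.iteration = self.checkpoint_iteration


def failed_workers(last_heartbeat: Dict[str, float], now: float, timeout: float) -> List[str]:
    """Workers whose most recent heartbeat is older than the timeout."""
    return sorted(w for w, seen in last_heartbeat.items() if now - seen > timeout)
```

Note that `recover` throws away all progress since the checkpoint on every worker, which is exactly the cost that the comparison with lineage-based recovery is about.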

Checkpoint-based failure recovery rewinds the execution on every worker back to the last checkpoint when a failure happens, while lineage-based failure recovery as in Spark only needs to recompute lost partitions. But the cost of lineage-based failure recovery in CPU-intensive jobs outweighs the benefit, because it requires every partition to be copied before modifying it.

Evaluation results

Current synchronous control planes such as Spark execute 8,000 tasks per second; distributed ones such as Nimbus and Drizzle can execute 250,000 tasks/second. Canary, a framework with an asynchronous control plane, can execute over 100,000 tasks/second on each core, and this scales linearly with the number of cores. Experimental results on 1,152 cores show it schedules 120 million tasks per second. Jobs using an asynchronous control plane can run up to an order of magnitude faster than on prior systems. At the same time, the ability to split computations into huge numbers of tiny tasks without introducing substantial overhead allows an asynchronous control plane to efficiently balance load at runtime, achieving a 2-3× speedup over highly optimized MPI codes.

Evaluations are done with applications performing logistic regression, K-means clustering, and PageRank.

MAD Questions

1. Is it a good idea to make tasks/recipes dependent/linked to individual data objects? How do we know the data objects in advance? Why does the code need to refer to the objects? I think that model can work well if the data objects are parameters to be tuned in ML applications. We live in the age of the `big model'. I guess graph processing applications can also fit well to this programming model. I think this can also fit well with any(?) dataflow framework application. Is it possible to make all analytics applications fit to this model?

2. The Litz paper had similar ideas for doing finer-grain scheduling at the workers and obviating the need for synchronizing with the scheduler. Litz is a resource-elastic framework supporting high-performance execution of distributed ML optimizations. Litz remains general enough to accommodate most ML applications, but also provides an expressive programming model allowing the applications (1) to support stateful workers that can store the model parameters which are co-located with a partition of input data, and (2) to define custom task scheduling strategies satisfying fine-grain task dependency constraints and allowances. At runtime, Litz executes these strategies within the specified consistency requirements, while gracefully persisting and migrating application state.

3. Since it is desirable to have one tool for batch and serving, would it be possible to adopt Canary for serving? Could it be nimble enough?

4. Is it possible to apply techniques from the Blazes paper to improve how the driver constructs the task recipes?
