Monday, July 31, 2017

A Comparison of Distributed Machine Learning Platforms

This paper surveys the design approaches used in distributed machine learning (ML) platforms and proposes future research directions. This is joint work with my students Kuo Zhang and Salem Alqahtani. We wrote this paper in Fall 2016, and I will be going to ICCCN'17 (Vancouver) to present this paper.

ML, and in particular Deep Learning (DL), has achieved transformative success in speech recognition, image recognition, and natural language processing, and recommendation/search engines recently. These technologies have very promising applications in self-driving cars, digital health systems, CRM, advertising, internet of things, etc. Of course, the money leads/drives the technological progress at an accelerated rate, and we have seen many ML platforms built recently.

Due to the huge dataset and model sizes involved in training, the ML platforms are often distributed ML platforms and employ 10s and 100s of workers in parallel to train the models. It is estimated that an overwhelming majority of the tasks in datacenters will be machine learning tasks in the near future.

My background is in distributed systems, so we decided to study these ML platforms from a distributed systems perspective and analyze the communication and control bottlenecks for these platforms. We also looked at fault-tolerance and ease-of-programming in these platforms.

We categorize the distributed ML platforms under 3 basic design approaches:
1. basic dataflow, 2. parameter-server model, and 3. advanced dataflow.

We talk about each approach in brief, using Apache Spark as an example of the basic dataflow approach, PMLS (Petuum) as an example of the parameter-server model, and TensorFlow and MXNet as examples of the advanced dataflow model. We provide a couple evaluation results comparing their performance. See the paper for more evaluation results. Unfortunately, we were unable to evaluate at scale as a small team from academia.

At the end of this post, I present concluding remarks and recommendation for future work for distributed ML platforms. Skip to the end, if you already have some experience with these distributed ML platforms.


In Spark, a computation is modeled as a directed acyclic graph (DAG), where each vertex denotes a Resilient Distributed Dataset(RDD) and each edge denotes an operation on RDD. RDDs are collection of objects divided in logical partitions that are stored and processed as in-memory, with shuffle/overflow to disk.

On a DAG, an edge E from vertex A to vertex B implies that RDD B is a result of performing operation E on RDD A. There are two kinds of operations: transformations and actions. A transformation (e.g., map, filter, join) performs an operation on a RDD and produces a new RDD.

The Spark user models the computation as a DAG which transforms & runs actions on RDDs. The DAG is compiled into stages. Each stage is executed as a series of tasks that run in parallel (one task for each partition). Narrow dependencies are good for efficient execution, whereas wide dependencies introduce bottlenecks since they disrupt pipelining and require communication intensive shuffle operations.

Distributed execution in Spark is performed by partitioning this DAG stages on machines. The figure shows the master-worker architecture clearly. The driver
contains two scheduler components, the DAG scheduler and the task scheduler, and tasks and coordinates the workers.

Spark was designed for general data processing, and not specifically for machine learning. However, using the MLlib for Spark, it is possible to do ML on Spark. In the basic setup, Spark stores the model parameters in the driver node, and the workers communicate with the driver to update the parameters after each iteration. For large scale deployments, the model parameters may not fit into the driver and would be maintained as an RDD. This introduces a lot of overhead because a new RDD will need to be created in each iteration to hold the updated model parameters. Updating the model involves shuffling data across machines/disks, this limits the scalability of Spark. This is where the basic dataflow model (the DAG) in Spark falls short. Spark does not support iterations needed in ML well.


PMLS was designed specifically for ML with a clean slate. It introduced the parameter-server (PS) abstraction for serving the iteration-intensive ML training process.

The PS (shown in the green boxes in the figure) is maintained as distributed in-memory key-value store. It is replicated & sharded: Each node serves as primary for a shard of the model (parameter space), and secondary/replica for other shards. Thus the PS scales well with respect to the number of nodes.

The PS nodes store & update model parameters, and respond to the requests from workers. The workers request up-to-date model parameters from their local PS copy and carry out computation over the partition of dataset assigned to them.

PMLS also adopts the Stale Synchronous Parallelism (SSP) model, which relaxes the Bulk Synchronous Parellelism (BSP) model where workers synchronize at the end of each iteration. SSP cuts some slack to the workers for synchronization, ensures the fastest worker cannot be *s* iteration ahead of the slowest worker. The relaxed consistency model is still OK for ML training due to noise tolerance of the process. I had covered this in an April 2016 blog post.


Google had a parameter-server model based distributed ML platform, called DistBelief. (Here is my review of the DistBelief paper.) From what I can tell, the major complaint about DistBelief was that it required messing with low-level code for writing ML applications. Google wanted any of its employees to be able to write ML code without requiring them to be well-versed in distributed execution ---this is the same reason why Google wrote the MapReduce framework for big data processing.

So TensorFlow is designed to enable that goal. TensorFlow adopts the dataflow paradigm, but the advanced version where the computation graph does not need to be a DAG but can include cycles and support mutable state. I think Naiad design might have some influence on TensorFlow design.

TensorFlow denotes computation with a directed graph of nodes and edges. The nodes represent computations, with mutable state. And the edges represent multidimensional data arrays (tensors) communicated between nodes. TensorFlow requires the user to statically declare this symbolic computation graph, and uses rewrite & partitioning of the graph to machines for distributed execution. (MXNet, and particularly DyNet, uses dynamic declaration of the graph, which improves on ease & flexibility of programming.)

The distributed ML training in TensorFlow  uses parameter-server approach as the figure shows. When you use the PS abstraction in TensorFlow, you use a parameter-server and data parallelism. TensorFlow says you can do more complicated stuff, but that requires writing custom code and marching into uncharted territory.

Some evaluation results

For our evaluations we used Amazon EC2 m4.xlarge instances. Each contains 4 vCPU powered by Intel Xeon E5-2676 v3 processor and 16GiB RAM. EBS Bandwidth is 750Mbps. We used two common machine learning tasks for evaluation: 2-class logistic regression and image classification using multi-layered neural networks. I am only providing couple graphs here, check our paper for more experiments. Our experiments had several limitations: we used small number of machines, and couldn't test to scale. We also limited to CPU computing, and didn't test with GPUs.

This figure shows the speed of platforms for logistic regression. Spark performs good here behind PMLS and MXNet.

This figure shows the speed of platforms for DNNs. Spark sees greater performance loss going to two layers NN compared to single layer logistic regression. This is due to more iterative computation needed. We kept the parameters at the driver in Spark because they could fit, things would have been much worse if we kept the parameters in an RDD and updated after every iteration.

This figure shows the CPU utilization of the platforms. Spark application seems to have significantly high CPU utilization, which comes mainly as serialization overhead. This problem has been pointed out before by earlier work. 

Concluding remarks and future directions

ML/DL applications are embarrassingly parallel, and not very interesting from concurrent algorithms perspective. It is safe to say the parameter-server approach won for training in distributed ML platforms.

As far as bottlenecks is concerned, network still remains as a bottleneck for distributed ML applications. Instead of work on more advanced general purpose dataflow platforms, it is more useful to provide better data/model staging; treat data/model as first class citizen.

However, there can be some surprises and subtleties. In Spark, the CPU overhead was becoming the bottleneck before the network limitations. The programming language used in Spark, i.e., Scala/JVMs, affected its performance significantly. Therefore there is especially a need for better tools for monitoring and/or performance-prediction of distributed ML platforms. Some tools addressing the problem for Spark data processing applications have been proposed recently, such as Ernest and CherryPick.

There are many open questions for distributed systems support for ML runtime, such as resource scheduling and runtime performance improvement. With runtime monitoring/profiling of the application, the next generation distributed ML platforms should provide informed runtime elastic provisioning/scheduling of the computation, memory, network resources for the tasks running atop.

Finally there are open questions for programming & software engineering support. What are suitable [distributed] programming abstractions for ML applications? Also more research needed for verification and validation (testing DNNs with particularly problematic input) of distributed ML applications.

Thursday, July 27, 2017

Paper summary: Zorua: A holistic approach to resource virtualization in GPU

This paper recently appeared in MICRO'16 and addresses the problem of ease of managing GPU as a computational resource.

GPU computing today struggles with the following problems:
  • Programming ease: The programmer needs to statically allocate GPU resources (registers, scratchpad, threads) to threads and this is hard and non-optimal as tuning is hard.  
  • Portability: An optimized specification on one GPU may be suboptimal (losing upto 70% performance) on another GPU.
  • Performance: Since the programmer allocates resources statically and fixed manner, the performance suffer and dynamic underutilization occur when the program resource utilization vary through execution.

To address the above problems, Zorua (named after the shapeshifting illusionist Pokemon) virtualizes the GPU resources (threads, registers, and scratchpad). Zorua gives the illusion of more GPU resources than physically available, and dynamically manages these resources behind the scenes to co-host multiple applications on the GPU, alleviating the dynamic underutilization problem alluded above.

To create this illusion, Zorua employs a hardware-software codesign that consists of 3 components: (1) the compiler annotates the program to specify the resource needs of each phase of the application; (2) a runtime system, referred to as the coordinator, uses the compiler annotations to dynamically manage the virtualization of the different on-chip resources; and (3) the hardware employs mapping tables to locate a virtual resource in the physically available resources or in the swap space in main memory.

Of course, this illusion will fail when you try to cram more than feasible to the GPU. But the nice thing about Zorua is it fails gracefully: The coordinator component of Zorua schedules threads only when the expected gain in thread-level parallelism outweighs the cost of transferring oversubscribed resources from the swap space in memory.

One thing that I wondered was why Zorua needed the compiler annotation, and why the runtime alone was not sufficient. I think the compiler annotation helps buy us the graceful degradation property. GPU computing is not very nimble; it has a lot of staging overhead. The annotations give a heads up to the coordinator for planning the scheduling in an overhead-aware manner.

The paper does not mention any application of Zorua for machine learning applications. I think Zorua would be useful for making DNN serving/inference applications colocate on the same GPU and preventing the server-sprawl problem, as it alleviates the dynamic underutilization problem.

I wonder if Zorua can provide benefits for machine learning training. Since machine learning training is done in batch, it would utilize GPU pretty consistently, stalling briefly in between rounds/iterations. However, by running two iterations in off-step manner as in Stale-Synchronous Parallelism, it may be possible to get benefit from Zorua GPU virtualization.

A related work for using GPUs efficiently for Deep Learning is the Poseidon work. Poseidon optimizes the pipelining of GPU computing at a very fine granularity, at the sub DNN layer, to eliminate the stalling/idle-waiting of the GPU.

Wednesday, July 5, 2017

Paper Summary: Two decades of recommender systems at

This is a short article that appeared as a retrospective piece for the 2003 "Item-to-Item Collaborative filtering" paper as it was awarded a test-of-time award. This article is by Brent Smith and Greg Linden. 

I am not a machine-learning/data-mining guy, so initially I was worried I wouldn't understand or enjoy the article. But this was a very fun article to read, so I am writing a summary.

The item-based collaborative filtering is an elegant algorithm that changed the landscape of collaborative filtering which was user-based till then. User-based means "first search across other users to find people with similar interests (such as similar purchase patterns), then look at what items those similar users found that you haven't found yet". Item-based is based on the idea that "people who buy one item are unusually likely to buy the other." So, for every item i1, we want every item i2 that was purchased with unusually high frequency by people who bought i1.

The beauty of the approach is most of the computation is done offline. Once the related items table is built, we can generate recommendations quickly as a series of lookups. Moreover since the number of items sold is less than the users, this scales to better user numbers.

This was implemented for for recommending related products (mostly books at that time). Since 2003, item-based collaborative filtering has been adopted by YouTube and Netflix, among others.

Defining related items

This section was tricky and fun. Statistics is not a very intuitive area. At least for me. While reading this section I saw proposals to fix things, and thought they would work, and I was wrong. Twice.

To define related, we should define what it means for Y to be unusually-likely to be bought by X buyers. And for figuring this out, we should first figure out the reverse, what is the expected ratio that X buyers would buy Y if the two items were unrelated.

The straightforward way to estimate the number of customers, Nxy, who have bought both X and Y would be to assume X buyers had the same probability, P(Y) = |Y_buyers|/|all_buyers|, of buying Y as the general population and use |X_buyers| * P(Y) as the estimate, Exy, of the expected number of customers who bought both X and Y. In fact, the original 2003 algorithm had used this ratio.

But this ratio is misleading, because  for almost any two items X and Y, customers who bought X will be much more likely to buy Y than the general population. "Heavy buyers" are to blame for this situation. We have a biased sample. For any item X, customers who bought X (this set has many heavy buyers in it by definition) will be likely to have bought Y more than the general population.

Figure 1 shows how to account for this effect.

Now, knowing Exy, we can use it to evaluate whether Nxy, the observed number of customers who bought both X and Y, is higher or lower than randomly would be expected. For example, Nxy-Exy gives an estimate of the number of non-random cooccurrences, and [Nxy-Exy]/Exy gives the percent difference from the expected random co-occurrence.

In another surprise, neither of those work quite well. The first will be biased towards popular Ys, and the second makes it to easy for low-selling items to have high scores. The chi-square score, $[Nxy−Exy]/\sqrt{Exy}$ strikes the balance.


The article talks about tons of extensions possible. Using the feedback data about user clicks on recommendations, it is possible to further tune the recommender. One should also take into account time of purchases, causality of purchases, compatibility of purchases. One should also account for aging the history and aging the recommendation as the user ages.

Worth noting was the observation that some items have more weight. They found that a single book purchase can say a lot about a customer's interests than an arbitrary product, letting them recommend dozens of highly relevant items.

For the future, the article envisions intelligent interactive services where shopping is as easy as a conversation, and the recommender system knows you as well as your spouse or a close friend.

Two-phase commit and beyond

In this post, we model and explore the two-phase commit protocol using TLA+. The two-phase commit protocol is practical and is used in man...