Monday, September 25, 2017

Paper Summary. Proteus: agile ML elasticity through tiered reliability in dynamic resource markets

This paper proposes an elastic ML system, Proteus, that can add/remove transient workers on the fly to exploit the transient availability of cheap but revocable resources in order to reduce costs and latency of computation. The paper appeared in EuroSys'17 and is authored by Aaron Harlap, Alexey Tumanov, Andrew Chung, Gregory R. Ganger, and Phillip B. Gibbons.

Proteus has two components: AgileML and BidBrain. AgileML extends the parameter-server ML architecture to run on a dynamic mix of stable and transient machines, taking advantage of opportunistic availability of cheap but preemptible AWS Spot instances. BidBrain is the resource allocation component that decides when to acquire and drop transient resources by monitoring current market prices and bidding on new resources when their addition would increase work-per-dollar.

Before delving into AgileML and BidBrain, let's first review the AWS Spot model.

See Spot run

AWS provides always available compute instances, called "on-demand" instances: you get them when you like and keep them as much as you like provided that you pay their fixed hourly rate.

AWS also offers transient compute instances via the AWS Spot market. You specify a bid price, and if the current market price is under your bid, you get the instance. You only pay the market price, and not your bid price. In other words, your bid price is an upper bound on how much you are comfortable paying hourly. And if the AWS Spot market price for the instance goes above your upper bound, AWS pulls the instance from you with only a 2-minute advance warning. Even in this case, the silver lining is that the last incomplete hour of computing is not charged to you, so you get some free computing.
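The billing rules above can be captured in a small sketch. It assumes the hourly billing described in this post; the function name `spot_bill` and the per-hour `(start_price, peak_price)` trace are made up for illustration, and charging each completed hour at the price seen at the start of that hour is a simplifying assumption.

```python
def spot_bill(hours, bid):
    """Return (cost, completed_hours, evicted) for one Spot instance.

    hours: list of (start_price, peak_price) per hour. This is a
    hypothetical representation of the market price trace.
    """
    cost = 0.0
    completed = 0
    for start_price, peak_price in hours:
        if max(start_price, peak_price) > bid:
            # Market price went above the bid during this hour: the instance
            # is evicted and the incomplete hour is not charged.
            return cost, completed, True
        cost += start_price  # pay the market price, not the bid
        completed += 1
    return cost, completed, False
```

With a bid of 1.0 and a trace whose third hour spikes above the bid, only the first two hours are billed, and whatever work was done in the third hour before eviction is the "free computing".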

As seen in Figure 3, you can save a lot of money if your computing job can exploit AWS Spot instances. (It is peculiar how the peak prices are sometimes up to 10 times higher than the fixed on-demand price. This is speculated to prevent a high bid from securing long-running instances at AWS Spot at a rate lower than the on-demand rate.)

Jobs that are especially suitable for AWS Spot's transient/preemptible computing style are embarrassingly parallel data processing tasks, where pieces are not related and where there is no need to maintain long-lived state. For example, for "shallow computing", such as thumbnail generation, there is no harm done with an instance eviction, as there is no need for continuity across the computation. The question the paper investigates is how to make the AWS Spot model work for "deeper computing", such as ML jobs.

While the paper considers this question for the AWS Spot market, the motivation also applies to enterprise computing in the datacenter. As disclosed in the Google Borg paper, Google distinguishes and prioritizes its production services over  analytic/batch services. If the production services need more resources, they will be given resources to the extent of preempting them from analytic/batch jobs if need be. On the other hand, when there is an excess of resources, analytic/batch jobs can enjoy them opportunistically.

Stages of AgileML

AgileML has 3 modes/stages as shown in Figure 4. To provide a shorter and more cost-effective computation, AgileML dynamically changes modes based on the availability of cheap transient instances. As the transient to on-demand ratio increases from 1:1 to beyond 15:1, AgileML shifts up from mode 1 up to mode 3. As the ratio decreases, AgileML shifts down from mode 3 down to mode 1.

  • Stage 1: Parameter Servers Only on Reliable Machines. Stage 1 spreads the parameter-server across reliable machines only, using transient nodes only for stateless workers. This works for most ML applications including K-means, DNN, Logistic Regression, Sparse Coding, as the workers are stateless while the parameter-servers contain the current solution state. 
  • Stage 2: ActivePSs on Transient Machines and BackupPSs on Reliable Machines. For transient to reliable node ratio greater than 1:1, AgileML switches to stage 2. Stage 2 uses a primary-backup model for parameter servers, using transient nodes for an active server (ActivePS) and reliable nodes for the hot standby (BackupPS). This relieves the heavy network load at the few reliable resources by spreading it across the many transient resources. The model parameters are sharded across the set of ActivePS instances. Workers send all updates and reads to the ActivePSs, which push updates in bulk to the BackupPSs. The solution state affected by transient node failures or evictions is recovered from BackupPSs. (For backing up ActivePS to BackupPS, it may be possible to explore a threshold-based update mechanism as outlined in the Gaia paper.)
  • Stage 3: No Workers on Reliable Machines. Workers colocated with BackupPSs on reliable machines were found to cause straggler effects at transient-to-reliable ratios beyond 15:1. Stage 3 removes these workers, and acts like a sub-case of Stage 2.
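The stage thresholds above can be summarized in a few lines. This is only a sketch of the ratio rule as described in this summary: the function name `choose_stage` is hypothetical, and it ignores the network bandwidth that the elasticity controller also takes into account.

```python
def choose_stage(num_transient, num_reliable):
    """Pick an AgileML stage from the transient-to-reliable ratio."""
    if num_reliable <= 0:
        raise ValueError("at least one reliable machine is required")
    ratio = num_transient / num_reliable
    if ratio <= 1:
        return 1  # parameter servers only on reliable machines
    if ratio <= 15:
        return 2  # ActivePSs on transient, BackupPSs on reliable machines
    return 3      # additionally, no workers on reliable machines
```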

Handling elasticity

The elasticity controller component is responsible for changing modes based on the transient-to-reliable ratio and the network bandwidth. It tracks which workers are participating in the computation, assigns a subset of input data to each worker, and starts new ActivePSs.

For stage 2 and stage 3, half of the transient instances are recruited as ActivePSs, as that performed best in the evaluations. This one-half ratio is likely to be specific to using transient instances, as with reliable instances the more PSs the merrier it is.

During start-up, AgileML divides the parameter state into N partitions, where N is the maximum number of ActivePSs that can exist at any one point.  By using partitions in this way, AgileML avoids the need to re-shard the parameter state when adding or removing servers, instead re-assigning partitions as needed.

As the ActivePS instances increase and decrease, the elasticity controller re-assigns the parameter-server shards across the ActivePS instances appropriately. If all the ActivePSs are evicted, AgileML transfers to Stage 1. It seems like using a level of indirection was sufficient to get this working.
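A minimal sketch of that level of indirection, assuming a simple round-robin placement: the partition count stays fixed and only the partition-to-server map changes. The names `assign_partitions` and `moved_partitions` are hypothetical, and round-robin is my own choice; a real controller would likely try to move as few partitions as possible.

```python
def assign_partitions(num_partitions, active_servers):
    """Map each fixed partition id to one of the current ActivePSs."""
    if not active_servers:
        return None  # no ActivePSs left: the caller falls back to Stage 1
    servers = sorted(active_servers)
    return {p: servers[p % len(servers)] for p in range(num_partitions)}


def moved_partitions(old, new):
    """Partition ids whose owner changed between two assignments."""
    return sorted(p for p in old if old[p] != new[p])
```

For example, with 4 partitions on servers "a" and "b", evicting "b" only requires handing partitions 1 and 3 to "a"; the parameter state itself is never re-sharded.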


BidBrain keeps track of historical market prices for transient instances and makes allocation decisions to minimize cost-per-work. An allocation is defined as a set of instances of the same type acquired at the same time and price. Before the end of an allocation's billing hour, BidBrain compares the cost-per-work ratios to decide whether the allocation is renewed or terminated.
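To make the renew-or-terminate decision concrete, here is a deliberately simplified sketch. It is not BidBrain's actual model: the `(hourly_cost, work_per_hour)` tuples and both function names are hypothetical, and it ignores eviction risk and the overhead of changing the cluster size.

```python
def cost_per_work(allocations):
    """allocations: list of (hourly_cost, work_per_hour) tuples."""
    total_cost = sum(cost for cost, _ in allocations)
    total_work = sum(work for _, work in allocations)
    if total_work == 0:
        return float("inf")
    return total_cost / total_work


def should_renew(current, candidate):
    """Renew `candidate` only if keeping it does not raise cost-per-work."""
    without = [a for a in current if a is not candidate]
    return cost_per_work(current) <= cost_per_work(without)
```

A cheap spot allocation that does as much work as the on-demand machines lowers the overall cost-per-work and is renewed; an expensive, slow one raises it and is dropped.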


The experiments were performed with 3 ML applications.

  • Matrix Factorization (MF) is a technique (a.k.a. collaborative filtering) commonly used in recommendation systems, such as recommending movies to users on Netflix. The goal is to discover latent interactions between the two entities (e.g., users and movies). Given a partially filled matrix X (e.g., a matrix where entry (i, j) is user i’s rating of movie j), MF factorizes X into factor matrices L and R such that their product approximates X.
  • Multinomial Logistic Regression (MLR) is a popular model for multi-way classification, often used in the last layer of deep learning models for image classification or text classification. The MLR experiments use the ImageNet dataset with LLC features, containing 64k observations with a feature dimension of 21,504 and 1000 classes.
  • Latent Dirichlet Allocation (LDA) is an unsupervised method for discovering hidden semantic structures (topics) in an unstructured collection of documents, each consisting of a bag (multi-set) of words. The evaluated LDA solver implements collapsed Gibbs sampling.

The baseline runs all instances on Spot market machines and uses checkpointing to recover progress if evicted. The experiments show about 17% overhead for MF due to checkpointing. Figure 1 illustrates the cost and time benefits of Proteus over the MLR application. Compared to all on-demand, the baseline improves on cost significantly as expected but increases the runtime by 25%. Proteus improves on cost and also manages to achieve reduced runtime.

On average 32% of Proteus's computing is free computing. But aggressively chasing free computing by bidding very close to market price results in high overhead: 4x increase in runtime and higher costs due to frequent evictions.

Friday, September 22, 2017

Paper summary. Distributed Deep Neural Networks over the Cloud, the Edge, and End Devices

This paper is by Surat Teerapittayanon, Bradley McDanel, and H.T. Kung at Harvard University and appeared in ICDCS'17. 

The paper is about partitioning the DNN for inference between the edge and the cloud. There has been other work on edge-partitioning of DNNs, most recently the Neurosurgeon paper. The goal there was to figure out the most energy-efficient/fastest-response-time partitioning of a DNN model for inference between the edge and cloud device. 

This paper adds a very nice twist to the problem. It adds an exit/output layer at the edge, so that if there is high-confidence in classification output the DNN replies early with the result, without going all the way to the cloud and processing the entire DNN for getting a result. In other words, samples can be classified and exited locally at the edge when the system is confident and offloaded to the cloud when additional processing is required.

This early exit at the edge is achieved by jointly training a single DNN with an edge exit point inserted. During the training, the loss is calculated by taking into account both the edge exit point and the ultimate exit/output point at the cloud end. The joint training does not need to be done via device/cloud collaboration. It can be done centrally or at a datacenter.

DDNN architecture

The paper classifies the serving/inference hierarchy as local, edge, cloud, but I think local versus edge distinction is somewhat superfluous for now. So in my summary, I am only using two layers: edge versus cloud.

Another thing this paper does differently than existing edge/cloud partitioning work is its support for horizontal model partitioning across devices. Horizontal model partitioning for inference is useful when each device might be a sensor capturing only one sensing modality of the same phenomena.

DDNN uses binary neural networks to reduce the memory cost of NN layers so that they can run on resource-constrained end devices. A shortcoming in the paper is that the NN layers that end up staying in the cloud are also binary neural network layers. That is unnecessary, and it may harm precision.

DDNN training

At training time, the loss from each exit is combined during backpropagation so that the entire network can be jointly trained, and each exit point achieves good accuracy relative to its depth. The idea is to optimize the lower parts of the DNN to create sufficiently good feature representations to support both samples exited locally and those processed further in the cloud.

The training is done by forming a joint optimization problem to minimize a weighted sum of the loss functions of each exit. Equal weights for exits are used for the experimental results of this paper.
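Written out, using my own notation rather than necessarily the paper's: for a network with N exits, exit weights $w_n$, and per-exit loss $L_n$ computed on the output of exit n, the joint objective is

$$L(\theta) = \sum_{n=1}^{N} w_n \, L_n(\theta)$$

With the equal weights used in the experiments, every exit contributes equally to the gradient that flows into the shared lower layers.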

They use a normalized entropy threshold as the confidence criterion that determines whether to classify (exit) a sample at a particular exit point. A normalized entropy close to 0 means that the DDNN is confident about the prediction of the sample. At each exit point, the normalized entropy is computed and compared against a threshold T in order to determine if the sample should exit at that point.
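A small sketch of this exit test, assuming the exit produces a probability vector over the classes (for example, a softmax output). The function names are hypothetical, and whether the comparison against T is strict is an assumption on my part.

```python
import math


def normalized_entropy(probs):
    """Entropy of a probability vector, scaled to [0, 1] by log(num_classes)."""
    num_classes = len(probs)
    if num_classes < 2:
        raise ValueError("need at least two classes")
    entropy = -sum(p * math.log(p) for p in probs if p > 0)
    return entropy / math.log(num_classes)


def should_exit(probs, threshold):
    """Exit locally when the prediction is confident enough (low entropy)."""
    return normalized_entropy(probs) <= threshold
```

A near one-hot output such as [0.98, 0.01, 0.01] has normalized entropy close to 0 and exits at the edge, while a near-uniform output is close to 1 and gets offloaded to the cloud.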

The exits are determined/inserted manually before training. Future research should look into automating and optimizing the insertion of exit points.

DDNN evaluation

The evaluation is done using a dataset of 6 cameras capturing the same object (person, car, bus) from different perspectives. The task is to classify each sample into one of these three categories: person, car, and bus. This task may be overly simple. Moreover, the evaluation of the paper is weak. The multicamera dataset has only 680 training samples and 171 testing samples.

Tuesday, September 19, 2017

Paper summary. Gaia: Geo-Distributed Machine Learning Approaching LAN Speeds

This paper appeared in NSDI'17 and is authored by Kevin Hsieh, Aaron Harlap, Nandita Vijaykumar, Dimitris Konomis, Gregory R. Ganger,  Phillip B. Gibbons, and Onur Mutlu. 


This paper proposes a framework to distribute an ML system across multiple datacenters, and train models at the same datacenter where the data is generated. This is useful because it avoids the need to move big data over wide-area networks (WANs), which can be slow (WAN bandwidth is about 15x less than LAN bandwidth), costly (AWS does not charge for inside datacenter communication but charges for WAN communication), and also prone to privacy or ownership concerns.

Google's Federated Learning also considered similar motivations, and set out to reduce WAN communication. It worked as follows: 1) smartphones are sent the model by the master/datacenter parameter-server,  2) smartphones compute an updated model based on their local data over some number of iterations, 3) the updated models are sent from the smartphones to the datacenter parameter-server, 4) the datacenter parameter-server aggregates these models (by averaging) to construct the new global model, and 5) Repeat.
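The five steps can be sketched as a single round over plain Python lists. This is an illustration only: models are flat lists of floats, each device is represented by a hypothetical `train` callable, and the plain unweighted average is an assumption (a real system may weight devices differently).

```python
def average_models(models):
    """Element-wise average of equally sized parameter lists (step 4)."""
    count = len(models)
    return [sum(weights) / count for weights in zip(*models)]


def federated_round(global_model, local_trainers):
    """One round: send the model out, train locally, collect, and average."""
    local_models = [train(list(global_model)) for train in local_trainers]
    return average_models(local_models)
```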

The Gaia paper does not cite the Federated Learning paper, likely because the two were submitted around the same time. There are many parallels between Gaia's approach and that of Federated Learning. Both are based on the parameter-server model, and both prescribe updating the model parameters in a relaxed/stale/approximate synchronous parallel fashion: several iterations are run in-situ before updating the "master" parameter-server. The difference is that Federated Learning has a "master" parameter-server in the datacenter, whereas Gaia takes a peer-to-peer approach where each datacenter has a parameter-server, and updating the "master" datacenter means synchronizing the parameter-servers across the datacenters.

Approximate Synchronous Parallelism (ASP) idea

Gaia's Approximate Synchronous Parallelism (ASP) idea tries to eliminate insignificant communication between data centers while still guaranteeing the correctness of ML algorithms. ASP is motivated by the observation that the vast majority of updates to the global ML model parameters from each worker are insignificant, e.g., more than 95% of the updates may produce less than a 1% change to the parameter value. With ASP, these insignificant updates to the same parameter within a data center are aggregated (and thus not communicated to other data centers) until the aggregated updates are significant enough.

ASP builds heavily on the Stale Synchronous Parallelism (SSP) idea for parameter-server ML systems. While SSP bounds how stale (i.e., old) a parameter can be, ASP bounds how inaccurate a parameter can be, in comparison to the most up-to-date value.

ASP allows the ML programmer to specify the function and the threshold to determine the significance of updates for each ML algorithm. The significance threshold has 2 parts: a hard and a soft threshold. The purpose of the hard threshold is to guarantee ML algorithm convergence, while that of soft threshold is to use underutilized WAN bandwidth to speed up convergence. In other words, the soft threshold provides an opportunistic synchronization threshold.


The Gaia architecture is simple: It prescribes adding a layer of indirection to the parameter-server model to account for multiple datacenter deployments.

Figure 4 shows the overview of Gaia. In Gaia, each data center has some worker machines and parameter servers. Each worker machine works on a shard of the input data stored in its data center to achieve data parallelism. The parameter servers in each data center collectively maintain a version of the global model copy, and each parameter server handles a shard of this global model copy. A worker machine only READs and UPDATEs the global model copy in its data center. To reduce the communication overhead over WANs, ASP is used between parameter-servers across different data centers. Below are the 3 components of ASP.

The significance filter. ASP takes 2 inputs from user: (1) a significance function and (2) an initial significance threshold. A parameter server aggregates updates from the local worker machines and shares the aggregated updates with other datacenters when the aggregate becomes significant. To facilitate convergence to the optimal point, ASP automatically reduces the significance threshold over time: if the original threshold is v, then the threshold at iteration t of the ML algorithm is $\frac{v}{\sqrt{t}}$.
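A minimal sketch of the significance filter, assuming a relative-change significance function (|aggregated update| / |current value|). The class and method names are hypothetical, and Gaia lets the user supply the significance function, so the one below is just an example.

```python
import math


class SignificanceFilter:
    def __init__(self, initial_threshold):
        self.v = initial_threshold
        self.pending = {}  # parameter key -> aggregated, not yet shared update

    def threshold(self, t):
        """Threshold at iteration t (t >= 1): v / sqrt(t)."""
        return self.v / math.sqrt(t)

    def add_update(self, key, delta, current_value, t):
        """Aggregate delta; return the aggregate once it is significant."""
        aggregate = self.pending.get(key, 0.0) + delta
        if current_value != 0:
            significance = abs(aggregate) / abs(current_value)
        else:
            significance = float("inf")
        if significance >= self.threshold(t):
            self.pending.pop(key, None)
            return aggregate  # share with the other datacenters
        self.pending[key] = aggregate  # keep aggregating locally
        return None
```

With an initial threshold of 1%, a 0.5% change stays local, and a second update that brings the aggregate to 1.25% triggers a single message carrying the combined update.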

ASP selective barrier. When a parameter-server receives the significant updates at a rate that is higher than the WAN bandwidth can support, instead of sending updates (which will take a long time), it first sends a short control message to other datacenters. The receiver of this ASP selective barrier message blocks its local workers from reading the specified parameters until it receives the significant updates from the sender of the barrier.

Mirror clock. This provides a final safety net implementing SSP across datacenters. When each parameter server receives all the updates from its local worker machines at the end of a clock (e.g., an iteration), it reports its clock to the servers that are in charge of the same parameters in the other data centers. When a server detects its clock is ahead of the slowest server, it blocks until the slowest mirror server catches up.


The paper evaluates Gaia with 3 popular ML applications. Matrix Factorization (MF) is a technique commonly used in recommender systems. Topic Modeling (TM) is an unsupervised method for discovering hidden semantic structures (topics) in an unstructured collection of documents, each consisting of a bag (multi-set) of words. Image Classification (IC) is a task to classify images into categories, and uses deep learning and convolutional neural networks (CNNs). All applications use SGD-based optimization.

The experiments, running across 11 Amazon EC2 global regions and on a cluster that emulates EC2 WAN bandwidth, compare Gaia against the Baseline that uses BSP (Bulk Synchronous Parallelism) across all datacenters and inside a LAN.


Is this general enough? The introduction says this should apply for SGD based ML algorithms. But are there hidden/implicit assumptions?

What are some examples of advanced significance functions? ML users can define advanced significance functions to be used with Gaia, but this is not explored/explained much in the paper. This may be a hard thing to do even for advanced users.

Even though it is easier to improve bandwidth than latency, the paper focuses on the challenge imposed by the limited WAN bandwidth rather than the WAN latency.  While the end metric for evaluation is completion time of training, the paper does not investigate the effect of network latency. How would the evaluations look if the improvements are investigated in correlation to latency rather than throughput limitations? (I guess we can have a rough idea on this, if we knew how much the barrier control message was used.)

Saturday, September 16, 2017

Paper summary. OpenCL Caffe: Accelerating and enabling a cross platform machine learning framework

This 2016 paper presents an OpenCL branch/port of the deep learning framework Caffe. More specifically, this branch replaces the CUDA-based backend of Caffe with an open-standard OpenCL backend. The software was first located at, then graduated to

Once we develop a DNN model, we would ideally like to be able to deploy it for different applications across multiple platforms (servers, NVIDIA GPUs, AMD GPUs, ARM GPUs, or even smartphones and tablets) with minimal development effort. Unfortunately, most of the deep learning frameworks (including Caffe) are integrated with CUDA libraries for running on NVIDIA GPUs, and that limits portability across multiple platforms.

OpenCL helps with portability of heterogeneous computing across platforms since it is supported by a variety of commercial chip manufacturers: Altera, AMD, Apple, ARM Holdings, Creative Technology, IBM, Imagination Technologies, Intel, Nvidia, Qualcomm, Samsung, Vivante, Xilinx, ZiiLABS, and others. In order to enable compatibility between different platforms, an OpenCL program detects the specific devices and compiles for them at runtime.

OpenCL was originally developed by Apple, Inc and later submitted to Khronos Group. It is also supported by many operating systems including Android, FreeBSD, Linux, macOS, Windows.

OpenCL Backend Porting and Optimization

The Caffe framework was originally written in C++ and CUDA. The CUDA layer of Caffe handles optimization of hardware resource allocation and utilization, e.g. CPU-GPU task assignment, memory management, and data transfer. Since CUDA and OpenCL differ in hardware device abstraction, memory buffer management, synchronization, and data transfers, the OpenCL backend porting is not a straightforward process.

The paper breaks the OpenCL porting process into two phases. Phase 1 achieves a layerwise porting of 3 layers, namely C++ machine learning interfaces, OpenCL wrappers, and GPU kernels. Layerwise porting means the layers are ported one by one and unit tested by using the originals of the other layers to guarantee correctness and convergence of the DNN algorithm.

After all layers are ported to OpenCL in Phase 1, Phase 2 focuses on performance optimization. Profiling the OpenCL port from Phase 1 (via the AMD profiling tool, CodeXL, assisted with OpenCL events and printf) reveals some big bottlenecks. OpenCL's online compilation frequently calls clBuildProgram to create each GPU kernel: for 100 iterations of Cifar training, there were 63 clBuildProgram calls that took about 68% of the time. Another bottleneck was that the convolutional layers take up most of the computation time. BLAS's performance suffered from irregular tall and skinny matrix sizes from different layers.

To handle these, the paper proposes three key optimization techniques including kernel caching to avoid OpenCL online compilation overheads, a batched manner data layout scheme to boost data parallelism, and multiple command queues to boost task parallelism. The optimization techniques effectively map the DNN problem size into existing OpenCL math libraries, and improve hardware resources utilization and boost performance by 4.5x.
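The kernel caching idea can be illustrated independently of any OpenCL binding. In this sketch `build_fn` is a stand-in for the expensive compile step (what clBuildProgram does), and `KernelCache` is a hypothetical name; how the actual port stores or persists its cached kernels is not covered in this summary.

```python
class KernelCache:
    def __init__(self, build_fn):
        self._build = build_fn  # stand-in for an expensive compile call
        self._cache = {}        # (source, options) -> compiled kernel
        self.build_calls = 0

    def get(self, source, options=""):
        """Return the compiled kernel, building it only on first use."""
        key = (source, options)
        if key not in self._cache:
            self.build_calls += 1
            self._cache[key] = self._build(source, options)
        return self._cache[key]
```

Across 100 training iterations that each request the same two kernels, the build function runs twice instead of 200 times.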


The evaluation uses the AlexNet DNN model for ImageNet. The evaluation compares the performance of Caffe with CUDA (including both cuBLAS and cuDNN v2) vs OpenCL (including both original clBLAS and batched manner optimization) on NVIDIA TitanX and AMD R9 Fury, with Alexnet model with mini-batch size 100. As shown in Figure 3, OpenCL Caffe with optimizations over clBLAS matches performance of cuBLAS Caffe.

Compared to the highly optimized machine learning cuDNN library, OpenCL Caffe still has a performance gap of 2x as it lacks those optimizations. The authors argue given the current performance, the OpenCL caffe is still competitive in terms of performance per dollar, considering the market price difference of AMD R9 Fury (about 560 dollars) and the NVIDIA TitanX (about 1000 dollars).

Cross Platform Capability Analysis

A natural question is whether their OpenCL port of Caffe, which is tested on AMD GPUs, would automatically work with ARM MALI GPUs as well. That would be a good test of the portability of the OpenCL port of Caffe. This has not been answered in the paper.

However, the authors caution about minor problems in compatibility. "There are some differences in specific manufacturers' extension and keywords. For example, caffe uses a lot of template in GPU kernels to support different floating point precision. But it turns out the template keywords for different manufactures are different, which adds more difficulty for the same code to run on different platform without modifications."

OpenCL support of deep learning frameworks is still not great, but hopefully it is getting better everyday as this paper shows.

The slides presentation for the paper is also available here.

Tuesday, September 12, 2017

Retroscope: Retrospective Monitoring of Distributed Systems (part 2)

This post, part 2, focuses on monitoring distributed systems using Retroscope. This is a joint post with Aleksey Charapko. If you are unfamiliar with hybrid logical clocks and distributed snapshots, give part 1 a read first.

Monitoring and debugging distributed applications is a grueling task. When you need to debug a distributed application, you will often be required to carefully examine the logs from different components of the application and try to figure out how these components interact with each other. Our monitoring solution, Retroscope, can help you with aligning/sorting these logs and searching/focusing on the interesting parts. In particular, Retroscope captures a progression of globally consistent distributed states of a system and allows you to examine these states and search for global predicates.

Let’s say you are working on debugging ZooKeeper, a popular coordination service. Using Retroscope you can easily add instrumentation to the ZooKeeper nodes to log and stream events to the Retroscope Storage and Compute module. After that, you can use RQL, Retroscope's simple SQL-like query language, to process your queries on those logs to detect the predicates/conditions that hold on the globally consistent cuts sorted in Retroscope's compute module.

Let's say you want to examine the staleness of the data in the ZooKeeper cluster (since ZooKeeper allows local reads from a single node, a client can read a stale value that has already been updated). By adding instrumentation to track the versions of znodes (the objects that keep data) at each ZooKeeper node, you can monitor how a particular znode changes across the entire cluster in a progression of globally consistent cuts with a very short query: "SELECT vr1 FROM setd;", where setd is the name of the log keeping track of changes and vr1 is the version of znode r1.

The above example still requires you to sift through all the changes in znode r1 across a progression of cuts to see whether there are stale values of r1 in some of the cuts. Instead, you can modify the query to have it return only globally consistent cuts that contain stale versions of the znode: "SELECT vr1 FROM setd WHEN EXISTS v1, v2 IN vr1 : (v1 - v2 >= 2)". RQL treats different versions of the variable vr1 that exist on various nodes as a set, allowing you to write queries exploring such sets. With this expression, Retroscope computes the difference between znode r1’s versions across any two nodes for all possible combinations of nodes, and if the difference is 2 or more, Retroscope emits the cut. This query gives you precise knowledge about the global states in which the znode differs by at least 2 versions between nodes in the system.

Let's say you got curious about the cause of the staleness in some of the ZooKeeper nodes. By including more information to be returned in each cut, you can test various hypotheses about the causes of staleness. For instance, if you suspect that some ZooKeeper nodes are disproportionately loaded by clients, you can test it with a query that also returns the number of active client connections: "SELECT vr1, client_connections FROM setd WHEN EXISTS v1, v2 IN vr1 : (v1 - v2 >= 2)". And you can further refine your query to filter staleness cases to when the imbalance in client connections is high: "SELECT vr1, client_connections FROM setd WHEN EXISTS v1, v2 IN vr1 : (v1 - v2 >= 2) AND EXISTS c1, c2 IN client_connections: (c1 - c2 > 50)".
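To make the semantics of these WHEN clauses concrete, here is how the last predicate could be evaluated on one cut in plain Python. This only illustrates the set-based EXISTS semantics described above, not how Retroscope implements RQL; the cut representation (one list of per-node values per variable) and the function names are hypothetical.

```python
from itertools import permutations


def exists_pair(values, predicate):
    """True if some ordered pair of values from different nodes satisfies predicate."""
    return any(predicate(a, b) for a, b in permutations(values, 2))


def matches(cut):
    """cut: {'vr1': [per-node versions], 'client_connections': [per-node counts]}."""
    stale = exists_pair(cut["vr1"], lambda v1, v2: v1 - v2 >= 2)
    imbalanced = exists_pair(
        cut["client_connections"], lambda c1, c2: c1 - c2 > 50
    )
    return stale and imbalanced
```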


OK, let's look at how Retroscope works under the hood.

To keep the overhead as low as possible for the system being monitored, we augment the target system nodes with only a lightweight RetroLog server module to extract/forward logs to a separate Retroscope storage/compute cluster for monitoring. RetroLogs employ Apache Ignite’s streaming capabilities to deliver the logs to the Retroscope storage/compute cluster. RetroLogs also provide Hybrid Logical Clock (HLC) capabilities to the target system being monitored. HLC is an essential component of Retroscope, as it allows the system to identify consistent cuts. The logging part is similar to how regular logging tools work, except the values being logged are not simply messages, but key-value pairs with the key being the variable being monitored and the value being, well, the current value of the variable.

The architecture figure above shows the insides of the Retroscope storage/compute cluster implemented using Apache Ignite. As the RetroLog servers stream logs to the Ignite cluster for aggregation and storage, the Ignite streaming breaks messages into blocks based on their HLC time. For instance, all messages between timestamps 10 and 20 will be in the same block. The Ignite Data Grid then stores the blocks until they are needed for computing the past consistent cuts.
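The blocking step can be pictured as bucketing events by timestamp. In this sketch the HLC time is treated as a single integer, the block width of 10 is taken from the example above, and half-open blocks such as [10, 20) are an assumption; the function names are hypothetical and this is not Ignite code.

```python
from collections import defaultdict


def block_id(hlc_time, block_width=10):
    """Index of the fixed-width block that a timestamp falls into."""
    return hlc_time // block_width


def group_into_blocks(events, block_width=10):
    """events: iterable of (hlc_time, key, value) log entries."""
    blocks = defaultdict(list)
    for hlc_time, key, value in events:
        blocks[block_id(hlc_time, block_width)].append((hlc_time, key, value))
    return dict(blocks)
```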

The RQLServer acts as the query parser and task scheduler for our monitoring tool. It breaks down the duration of state-changes over which a query needs to be evaluated into non-overlapping time-shards and assigns these time-shards to worker nodes for evaluation. As workers process through their assignments, they emit the results back to the RQLServer which then presents the complete output to you.

Let's now zoom in on the Retroscope workers. These workers are designed to search for predicates through a sequence of global distributed states. The search always starts at some known state and progresses forward one state-change at a time. (Each time-shard has its starting state computed before being assigned to the workers. This process is done only once, as time-shards are reused between different queries. When workers perform the search, they always start at the time-shard's starting state and progress forward one state-change at a time.) As the worker advances forward and applies changes, it arrives at new consistent cuts and evaluates the query predicate to decide whether the cut must be emitted to the user or not. The figure illustrates running the "SELECT vr1 FROM setd WHEN EXISTS v1, v2 IN vr1 : (v1 - v2 >= 2)" query on a ZooKeeper cluster of 3 nodes. Each time a variable's value is changed, the change is recorded into a Retroscope log. As the system starts to search for the global states satisfying the query condition, it first looks at state c0. At this cut the difference between versions on Node 3 and Node 1 is 2, meaning that the worker emits the cut back. The system then moves forward and eventually reaches cut c5, at which the staleness is only 1 version, so c5 is skipped. The search ends when all consistent cuts are evaluated this way. In the example, you will see cuts c0, c1, c2, c3, c4 and c6 being emitted, while cut c5 has small staleness and does not satisfy the condition of the query.
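The forward search can be sketched as a simple replay loop. This is a simplified illustration under my own assumptions: a cut is a dict from node name to znode version, each log entry is a `(node, new_version)` pair already in HLC order, and the names `replay` and `stale_by_two` are hypothetical. The small trace in the usage note is made up and does not reproduce the figure.

```python
def replay(start_state, changes, predicate):
    """Apply changes one at a time and collect the cuts that satisfy predicate."""
    state = dict(start_state)
    emitted = []
    if predicate(state):
        emitted.append(dict(state))
    for node, version in changes:
        state[node] = version  # one state-change yields the next cut
        if predicate(state):
            emitted.append(dict(state))
    return emitted


def stale_by_two(state):
    """Equivalent to: EXISTS v1, v2 IN vr1 : (v1 - v2 >= 2)."""
    versions = state.values()
    return max(versions) - min(versions) >= 2
```

Starting from versions {n1: 1, n2: 1, n3: 3} and applying the changes n1=2, n2=3, n1=3, the first two cuts are emitted and the last two are skipped because the version gap drops below 2.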

Preliminary Evaluation

We performed a preliminary evaluation of Retroscope monitoring on ZooKeeper to study the znode staleness from the server perspective. To the best of our knowledge, no prior studies have been conducted to see how stale ZooKeeper values can be from the server stance.

We started by looking at a normally running ZooKeeper cluster on AWS and ran a test workload of 10,000 updates to either a single znode or 10 znodes. All the updates were done by a single synchronous client to make sure that the ZooKeeper cluster was not saturated under a heavy load. After the updates were over, we ran a simple retrospective RQL query to retrieve consistent states at which values have been changing in znode r1: "SELECT vr1 FROM setd;", where setd is the name of the log keeping track of changes and vr1 is the version of znode r1. This query produces lots of cuts that must be examined manually; however, a quick glance over them shows that a staleness of 1 version is common. This is normal behavior, as Retroscope was capturing a staleness of 1 version right after the value has been committed, while the commit messages may still be in flight to the follower nodes.

To check whether there exist any cases where the staleness is 2 versions or greater, we then evaluated the following query: "SELECT vr1 FROM setd WHEN EXISTS v1, v2 IN vr1 : (v1 - v2 >= 2)". With this query we learned that by targeting a single znode, we were able to observe a staleness of 2 or more versions on about 0.40% of the consistent cuts examined. Surprisingly, we also observed a spike of 9 versions in staleness at one cut; however, the stale node was then able to quickly catch up. (That spike was probably due to garbage collection.) We observed that spreading the load across 10 znodes reduced the staleness observed for a single znode.

We performed the same staleness experiment on the cluster with one straggler ZooKeeper node that was artificially made to introduce a slight delay of 2ms in the processing of incoming messages. With the straggler, ZooKeeper staleness hit extremes: the slow node was able to keep up with the rest of the cluster only for the first 7 seconds of the test run but then fell further and further behind.

We measured the query processing performance on a single AWS EC2 t2.micro worker node with 1 vCPU and 1 GB of RAM. With such limited resources, a Retroscope worker was still able to process up to 14,000 consistent cuts per second in our ZooKeeper experiments.

We also tested the performance of Retroscope using a synthetic workload. Our test cluster consisted of 2 quad-core machines with 16 GB of RAM connected over WiFi. Each machine was hosting an Ignite node and a synthetic workload generator that streamed events to both Ignite nodes. This cluster was able to ingest ~16,500 events per second. The query performance on the cluster was above 300,000 consistent cuts per second for a moderately complex query involving a set operation and a comparison.

Future Plan

We are working on enhancing the capabilities of Retroscope to provide a richer and more universal query language for a variety of monitoring and debugging situations. Our plans include adding more complex search capabilities with conditions that can span multiple consistent cuts. We are also expanding the logging and processing capabilities to allow keeping track of arrays and sets instead of just single-value variables.


Monday, September 11, 2017

Web Data Management in RDF Age

This was the keynote on ICDCS'17 Day 2, by Tamer Ozsu. Below are my notes from his talk. The slides for his presentation are available here.

Querying web data presents challenges due to its lack of a schema, its volatility, and its sheer scale. There have been several recent approaches to querying web data, including XML, JSON, and fusion tables. This talk is about another approach to maintaining and querying web data: RDF and SPARQL. This last one is the approach recommended by the W3C (World Wide Web Consortium) and is a building block for the semantic web and Linked Open Data (LOD). Here is a diagram denoting the LOD datasets and links between them as of 2014.

Resource Description Framework (RDF)

In RDF, everything is a uniquely named resource (URI). The ID for Jack Nicholson's resource is JN29704. Resources have defined attributes: y:JN29704 hasName = "Jack Nicholson", y:JN29704 BornOnDate = "1937-04-22". The relationships with other resources can be defined via predicates. (These are "predicates" in the grammar sense, not in the logic sense.) Predicates form triples of the form "Subject Predicate Object". The subjects are always URIs, and the objects are "literals" or URIs.

It turns out biologists are heavy users of RDF with the UniProt dataset.

RDF query model: SPARQL

SPARQL provides a flavor of SQL. It operates on RDF triples, where _variables_ can now be used in place of the subject, predicate, or object. Thus it is also possible to represent SPARQL queries as a graph. Once you represent the SPARQL query as a graph, answering a SPARQL query reduces to a subgraph matching problem between the RDF data-graph and the query-graph.
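To illustrate the "query as subgraph matching" idea, here is a small Python sketch that stores triples as tuples and matches a list of triple patterns against them, where terms starting with "?" are variables. This is a toy matcher under my own assumptions, not a SPARQL engine: apart from y:JN29704 and hasName, the resources and predicates (y:film_1, actedIn, hasTitle, and so on) are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Triple = Tuple[str, str, str]   # (subject, predicate, object)
Binding = Dict[str, str]        # variable name -> bound value

# Tiny data-graph; entries other than the Jack Nicholson name are made up.
DATA: List[Triple] = [
    ("y:JN29704", "hasName", "Jack Nicholson"),
    ("y:JN29704", "actedIn", "y:film_1"),
    ("y:film_1", "hasTitle", "Example Film"),
    ("y:P2", "hasName", "Other Person"),
    ("y:P2", "actedIn", "y:film_2"),
]


def match_triple(pattern: Triple, triple: Triple,
                 binding: Binding) -> Optional[Binding]:
    """Extend `binding` so that `pattern` matches `triple`, or return None."""
    extended = dict(binding)
    for term, value in zip(pattern, triple):
        if term.startswith("?"):              # variable position
            if extended.setdefault(term, value) != value:
                return None                   # conflicts with earlier binding
        elif term != value:                   # constant must match exactly
            return None
    return extended


def evaluate(patterns: List[Triple], data: List[Triple]) -> List[Binding]:
    """Join the patterns one by one; each pattern is an edge of the query-graph."""
    bindings: List[Binding] = [{}]
    for pattern in patterns:
        next_bindings: List[Binding] = []
        for binding in bindings:
            for triple in data:
                extended = match_triple(pattern, triple, binding)
                if extended is not None:
                    next_bindings.append(extended)
        bindings = next_bindings
    return bindings


# "Titles of films in which the person named Jack Nicholson acted."
query = [("?p", "hasName", "Jack Nicholson"),
         ("?p", "actedIn", "?f"),
         ("?f", "hasTitle", "?t")]
results = evaluate(query, DATA)
print(results)
```

Each additional pattern is effectively another join over the triple table, which is where the performance concern about too many joins comes from.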

As in querying with SQL, too many joins are bad for performance, and a lot of work is expended to optimize this.

Distributed SPARQL processing

If it is possible to gather all the RDF data needed in one place, then querying can be done easily using common cloud computing tools. You can partition and store the RDF data on HDFS, and then run SPARQL queries on it as MapReduce jobs (say, using Spark).

If it is not possible to gather all the RDF data, you need to do distributed querying, and for this, methods similar to those of distributed RDBMSs can be used.

To complicate this further, not all of the RDF data sites can process SPARQL queries. So several approaches have been formulated to deal with the problem this poses for distributed SPARQL processing, such as writing wrappers to execute SPARQL queries on these sites.

Finally, live querying of the web of RDF linked data is also possible by sending bots to traverse/browse this graph at runtime.

Thursday, September 7, 2017


The other day I took my little daughter to the library. There was a lady tutoring her son. The boy was slightly over-active and argumentative. He was wearing a wool Buffalo hat. It was warm in the library, so that seemed out of place. It was also strange that the boy, who was around 7-8 years old, was not in school at that time of the day. But I couldn't put 2 and 2 together.

I assumed that the lady was home-schooling the boy. Since I was interested in how home-schooling works (what works and what doesn't), I asked her about it. (Although I am a shy introvert, I don't shy away from asking people questions when I am curious about something. That is a little oddity I have.) She said she was not homeschooling, but her son had missed a lot of classes, so she was trying to help him catch up. At that moment, I noticed the thick book she was reading: "CANCER"!

My heart sank. I tried to seem upbeat though. I asked the boy his name, Nicholas, and wished him best of luck with his studies.


Life is that way. We get selfish and self-centered when left to our own devices. We think we have it bad, that we have big challenges. We become drama queens. We are stressed about work, we complain about other small stuff. And occasionally we are hit with an experience like that, where a real challenge presents itself for us or for others, that reminds us how grateful we should have been.

We should not give in to entropy and get too self-centered. Everybody has their own problems, challenges, and burdens. And sometimes those are huge challenges. The world would be a better place if we were more grateful for ourselves and for the moment and tried to sympathize with others and help them with their challenges, rather than being self-absorbed with our petty problems.

Here is a related video I came across recently on gratitude.

This one is for kids, but hey it is simple. 

Wednesday, September 6, 2017

Paper summary. Untangling Blockchain: A Data Processing View of Blockchain Systems

This is a recent paper submitted to arXiv (Aug 17, 2017). The paper presents a hype-free, very accessible, yet technical survey on blockchain. It takes a data-processing centric view of blockchain technology, treating concurrency issues and efficiency of consensus as first class citizens in blockchain design. Approaching from that perspective, the paper tries to forge a connection between blockchain technologies and distributed transaction processing systems. I think this perspective is promising for grounding the blockchain research better. When we connect a new area to an area with a large literature, we have opportunities to compare/contrast and perform efficient OODA loops.

Thanks to this paper, I am finally getting excited about blockchains. I had a huge prejudice against blockchains because of the Proof-of-Work approach used in Bitcoin. Nick Szabo tried to defend this huge inefficiency, arguing that it is necessary for attestation/decentralization/etc., but that is a false dichotomy. There are many efficient ways to design blockchains and still provide decentralization and attestation guarantees. And the paper provides an exhaustive survey of blockchain design approaches.

In my summary below, I use many sentences (sometimes paragraphs) directly lifted off from the paper. The paper is easily accessible and well written, and worth reading if you like to learn about blockchains.

Blockchain basics

Blockchain is a log of ordered blocks, each containing multiple transactions. In the database context, blockchain can be viewed as a solution to distributed transaction management: nodes keep replicas of the data and agree on an execution order of transactions. Distributed transaction processing systems often deal with concurrency control via 2-phase commit. However, in order to tolerate Byzantine behavior, the overhead of concurrency control becomes higher in blockchains.

Blockchains have many applications in business, such as security trading and settlement, asset and finance management, banking, and insurance. Currently these applications are handled by systems built on MySQL and Oracle and by trusted companies/parties with a whole bunch of lawyers/clerks/etc. Blockchain can disrupt this status quo because it reduces infrastructure and human costs. Blockchain's immutability, transparency, and inherent security help reduce human errors/malice and the need for manual intervention due to conflicting data. The paper says: "Goldman Sachs estimated 6 billion saving in current capital market, and J.P. Morgan forecast that blockchains will start to replace currently redundant infrastructure by 2020."

Oops, I forgot to mention Bitcoin. Bitcoin is also one of the applications of blockchains, namely a digital/crypto-currency application. Cryptocurrency is currently the most famous application of blockchains, but as the paper covers smartcontracts, you can see that those applications will dwarf anything we have seen so far.

Private vs Public blockchains

In public blockchains, any node can join/leave the system, thus the blockchain is fully decentralized, resembling a peer-to-peer system. In private blockchains, the blockchain enforces strict membership via access-control and authentication.

Public blockchain: Bitcoin uses proof-of-work (PoW) for consensus: only a miner that has successfully solved a computationally hard puzzle (finding the right nonce for the block header) can append to the blockchain. It is still possible that two blocks are appended at the same time, creating a fork in the blockchain. Bitcoin resolves this by only considering a block as confirmed after it is followed by a number of blocks (typically six blocks). PoW works well in the public setting because it guards against Sybil attacks. However, since it is non-deterministic and computationally expensive, it is unsuitable for applications such as banking and finance, which must handle large volumes of transactions in a deterministic manner. (Bitcoin supports 7 transactions per second!)
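The "find the right nonce" puzzle is easy to sketch in Python. The version below is a simplification under my own assumptions: it uses a single SHA-256 over an arbitrary byte string plus an 8-byte nonce, whereas Bitcoin double-hashes a specific block header format. The names mine and is_valid, and the 12-bit difficulty, are chosen just for illustration.

```python
import hashlib


def pow_hash(header: bytes, nonce: int) -> int:
    """Hash the header together with the nonce and read the digest as an integer."""
    digest = hashlib.sha256(header + nonce.to_bytes(8, "big")).digest()
    return int.from_bytes(digest, "big")


def is_valid(header: bytes, nonce: int, difficulty_bits: int) -> bool:
    """A nonce is valid if the hash has at least `difficulty_bits` leading zero bits."""
    target = 1 << (256 - difficulty_bits)
    return pow_hash(header, nonce) < target


def mine(header: bytes, difficulty_bits: int) -> int:
    """Brute-force search: try nonces 0, 1, 2, ... until one is valid."""
    nonce = 0
    while not is_valid(header, nonce, difficulty_bits):
        nonce += 1
    return nonce


header = b"example block header"      # stand-in for real header contents
nonce = mine(header, difficulty_bits=12)
print(nonce, is_valid(header, nonce, 12))
```

Finding the nonce takes about 2^difficulty_bits hash attempts on average, while checking it takes a single hash. That asymmetry is what makes the puzzle useful, and each extra difficulty bit doubles the expected mining work.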

Private blockchain: Since node identities are known in the private setting, most blockchains adopt one of the protocols from the vast literature on distributed consensus. Hyperledger v0.6.0 uses PBFT, a Byzantine-tolerant Paxos, while some other private chains even use plain Paxos. (Hyperledger v1.0.0-rc1 adopts a non-Byzantine consensus protocol based on Kafka.)

However, even with PBFT (or XFT) scalability is a problem. Those Paxos protocols will not be able to scale to tens of nodes, let alone hundreds. An open research question is how can you have big-Paxos? (The federated consensus approach mentioned below is a promising approach to solve this problem.)

What makes Blockchains tick?

The paper studies blockchain technology under four dimensions: distributed ledger, cryptography, consensus protocol and smartcontracts.

Distributed ledger: In blockchains, the ledger is replicated over all the nodes as an append-only data structure. A blockchain starts with some initial states, and the ledger records the entire history of update operations made to the states. (This reminds me of Tango and particularly the followup vCorfu paper. The ideas explored there can be used for efficiently maintaining multiple streams in the same log so that reading the whole chain is not needed for materializing the updates at the nodes.)

Cryptography: This is used for ensuring the integrity of the ledgers, for making sure there is no tampering of the blockchain data. The global states are protected by a hash (Merkle) tree whose root hash is stored in a block. The block history is also protected by linking the blocks through a chain of cryptographic hash pointers: the content of block number n+1 contains the hash of block number n. This way, any modification in block n immediately invalidates all the subsequent blocks.
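The hash-pointer chain can be sketched in a few lines of Python. This is only an illustration under my own assumptions: the block layout (index, prev_hash, transactions), the JSON encoding, and the helper names are made up, and the Merkle tree over the global state is left out.

```python
import copy
import hashlib
import json
from typing import Dict, List, Optional

GENESIS_PREV = "0" * 64   # placeholder "previous hash" for the first block


def block_hash(block: Dict) -> str:
    """Hash the full block content, including its pointer to the previous block."""
    encoded = json.dumps(block, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def append_block(chain: List[Dict], transactions: List[str]) -> None:
    """Block n+1 stores the hash of block n."""
    prev_hash = block_hash(chain[-1]) if chain else GENESIS_PREV
    chain.append({"index": len(chain),
                  "prev_hash": prev_hash,
                  "transactions": transactions})


def first_broken_link(chain: List[Dict]) -> Optional[int]:
    """Return the index of the first block whose hash pointer does not match, if any."""
    for i in range(1, len(chain)):
        if chain[i]["prev_hash"] != block_hash(chain[i - 1]):
            return i
    return None


chain: List[Dict] = []
append_block(chain, ["alice pays bob 5"])
append_block(chain, ["bob pays carol 2"])
append_block(chain, ["carol pays dave 1"])

tampered = copy.deepcopy(chain)
tampered[0]["transactions"][0] = "alice pays mallory 5"   # rewrite history

print(first_broken_link(chain), first_broken_link(tampered))
```

To hide the change, the attacker would have to rewrite the pointer in block 1, which changes block 1's hash and breaks block 2, and so on down the chain. Note that this check alone cannot detect tampering with the very last block, since nothing points to it yet.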

Smart Contracts: A smart contract refers to the computation executed when a transaction is performed. At the trivial end of the spectrum, Bitcoin nodes implement a simple replicated state machine model which moves coins from one address to another. At the other extreme, Ethereum smart contracts can specify arbitrary computations, and Ethereum comes with its own virtual machine for executing Ethereum bytecode. Similarly, Hyperledger employs Docker containers to execute its contracts, which can be written in any language. Smart contracts have many emerging business applications, but this makes software bugs very critical. In the DAO attack, the attacker stole $50M worth of assets by exploiting a bug.


Since I am partial to distributed consensus, I am reserving a separate section for the use of consensus in blockchains.

Proof of work: All PoW protocols require miners to find solutions to cryptographic puzzles based on cryptographic hashes. How fast a block is created depends on how hard the puzzle is. The puzzle cannot be arbitrarily easy because that leads to unnecessary forks in the blockchain. Forks not only waste resources but also have security implications, since they make it possible to double spend. Ethereum, another PoW-based blockchain, adopts the GHOST protocol, which helps bring down block generation time to tens of seconds without compromising much security. In GHOST, the blockchain is allowed to have branches as long as the branches do not contain conflicting transactions.

Proof of Stake: To alleviate huge cost of PoW, PoS maintains a single branch but changes the puzzle's difficulty to be inversely proportional to the miner's stake in the network. A stake is essentially a locked account with a certain balance representing the miner's commitment to keep the network healthy.

Paxos protocols: PoW suffers from non-finality, that is, a block appended to a blockchain is not confirmed until it is extended by many other blocks. The PBFT protocol, in contrast, is deterministic. Implemented in the earlier version of Hyperledger (v0.6), the protocol ensures that once a block is appended, it is final and cannot be replaced or modified. It incurs O(N^2) network messages for each round of agreement, where N is the number of nodes in the network. (This is because in contrast to classical Paxos, where every participant talks only with the leader, in PBFT every participant talks to every other participant as well.) The paper reports that PBFT scales poorly and collapses even before reaching the network limit. Zyzzyva optimizes for normal cases (when there are no failures) via speculative execution. XFT assumes a network less hostile than purely Byzantine, and demonstrates better performance by reducing the number of network messages.
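A back-of-the-envelope count shows where the O(N^2) comes from. The Python sketch below is a simplified model of my own, not taken from the paper: for the leader-based protocol it counts one accept message to each follower and one reply back, and for PBFT it counts the pre-prepare, prepare, and commit phases of the normal case, ignoring client messages, checkpoints, and view changes.

```python
def leader_based_messages(n: int) -> int:
    """Leader sends to n-1 followers, and each follower replies to the leader."""
    return 2 * (n - 1)


def pbft_messages(n: int) -> int:
    """Normal-case PBFT round, simplified:
    pre-prepare: primary -> n-1 backups
    prepare:     each of the n-1 backups -> the n-1 other replicas
    commit:      each of the n replicas  -> the n-1 other replicas
    """
    pre_prepare = n - 1
    prepare = (n - 1) * (n - 1)
    commit = n * (n - 1)
    return pre_prepare + prepare + commit   # equals 2 * n * (n - 1)


def max_byzantine_faults(n: int) -> int:
    """PBFT needs n >= 3f + 1 replicas to tolerate f Byzantine faults."""
    return (n - 1) // 3


for n in (4, 16, 64):
    print(n, max_byzantine_faults(n), leader_based_messages(n), pbft_messages(n))
```

In this model, going from 4 to 64 nodes multiplies the leader-based message count by 21 but the PBFT message count by 336, which is consistent with the scalability problem described above.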

Federated: To overcome the scalability woes with PBFT, Ripple adopts an approach that partitions the network into smaller groups called federates. Each federate runs a local consensus protocol among its members, and the local agreements are then propagated to the entire network via nodes lying in the intersections of the federates. Ripple’s safety conditions are that there is a large majority of honest nodes in every federate, and that the intersection of any two federates contains at least one honest node. Ripple assumes federates are pre-defined and their safety conditions can be enforced by a network administrator. In a decentralized environment where node identities are unknown, such assumptions do not hold. Byzcoin and Elastico propose novel, two-phase protocols that combine PoW and PBFT. In the first phase, PoW is used to form a consensus group. Byzcoin implements this by having a sliding window over the blockchain and selecting the miners of the blocks within the window. Elastico groups nodes by their identities, which change every epoch. In particular, a node's identity is its solution to a cryptographic puzzle. In the second phase, the selected nodes perform PBFT to determine the next block. The end result is faster block confirmation time at a scale much greater than traditional PBFT (over 1000 nodes).

Nonbyzantine: The latest release of Hyperledger (v1.0) outsources the consensus component to Kafka. More specifically, transactions are sent to a centralized Kafka service which orders them into a stream of events. Every node subscribes to the same Kafka stream and therefore is notified of new transactions in the same order as they are published.
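The idea that a single ordered stream is enough for crash-tolerant agreement can be sketched with an in-memory stand-in. Nothing below uses Kafka's API or Hyperledger's code: OrderedLog, Node, and the account names are hypothetical, and the point is only that nodes applying the same transactions in the same order end up in the same state.

```python
from typing import Dict, List, Tuple

Transaction = Tuple[str, str, int]   # (sender, receiver, amount)


class OrderedLog:
    """In-memory stand-in for a centralized, totally ordered stream."""

    def __init__(self) -> None:
        self._entries: List[Transaction] = []

    def publish(self, tx: Transaction) -> int:
        self._entries.append(tx)
        return len(self._entries) - 1        # offset assigned by the log

    def read_from(self, offset: int) -> List[Transaction]:
        return self._entries[offset:]


class Node:
    """Deterministic state machine that consumes the log in offset order."""

    def __init__(self, initial: Dict[str, int]) -> None:
        self.balances = dict(initial)
        self.offset = 0

    def catch_up(self, log: OrderedLog) -> None:
        for sender, receiver, amount in log.read_from(self.offset):
            if self.balances.get(sender, 0) >= amount:   # reject overdrafts
                self.balances[sender] -= amount
                self.balances[receiver] = self.balances.get(receiver, 0) + amount
            self.offset += 1


log = OrderedLog()
initial = {"alice": 10, "bob": 0}
fast, slow = Node(initial), Node(initial)

log.publish(("alice", "bob", 7))
fast.catch_up(log)                    # the fast node follows the stream closely
log.publish(("alice", "bob", 5))      # rejected by every node: alice has only 3
log.publish(("bob", "alice", 2))
fast.catch_up(log)
slow.catch_up(log)                    # the slow node reads everything at once

print(fast.balances, slow.balances)
```

The ordering service here is a single trusted component, which is exactly why this design tolerates crashes of the nodes but not Byzantine behavior.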

I love that blockchains bring new constraints and requirements to the consensus problem. In blockchains, the participants can now be Byzantine, motivated by financial gains. And it is not sufficient to limit the consensus participants to 3 or 5 nodes, which was enough for tolerating crashes and ensuring persistency of the data. In blockchains, for reasons of attestability and tolerating colluding groups of Byzantine participants, it is preferred to keep the participants in the 100s. Thus the problem becomes: How do you design a Byzantine-tolerant consensus algorithm that scales to 100s or 1000s of participants?

I love when applications push us to invent new distributed algorithms.

Blockbench and evaluation

The paper introduces a benchmarking framework, BLOCKBENCH, that is designed for understanding performance of private blockchains against data processing workloads. BLOCKBENCH is open source and contains YCSB and SmallBank data processing workloads as well as some popular Ethereum contracts.

The paper then presents a comprehensive evaluation of Ethereum, Parity and Hyperledger. The comparison against H-Store presented demonstrates that blockchains perform poorly at data processing tasks currently being handled by database systems. Although databases are designed without security and tolerance to Byzantine failures, the gap remains too big for blockchains to be disruptive to incumbent database systems.

The main findings are:

  • Hyperledger performs consistently better than Ethereum and Parity across the benchmarks. But it fails to scale up to more than 16 nodes.
  • Ethereum and Parity are more resilient to node failures, but they are vulnerable to security attacks that fork the blockchain.
  • The main bottlenecks in Hyperledger and Ethereum are the consensus protocols, but for Parity the bottleneck is caused by transaction signing.
  • Ethereum and Parity incur large overheads in terms of memory and disk usage. Their execution engine is also less efficient than that of Hyperledger.  
  • Hyperledger’s data model is low level, but its flexibility enables customized optimization for analytical queries.

Two-phase commit and beyond

In this post, we model and explore the two-phase commit protocol using TLA+. The two-phase commit protocol is practical and is used in man...