Wednesday, May 24, 2017

Paper summary: Making sense of Performance in Data Analytics Frameworks (NSDI 15)

What constitutes the bottlenecks for big data processing frameworks? If CPU is a bottleneck, it is easy to fix: add more machines to the computation. Of course for any analytics job, there is some amount of coordination needed across machines. Otherwise, you are just mapping and transforming, but not reducing and aggregating information. And this is where the network and the disk as bottleneck comes into play. The reason you don't get linear speedup by adding more machines to an analytics job is the network and disk bottlenecks. And a lot of research and effort is focused on trying to optimize and alleviate the network and disk bottlenecks.

OK this sounds easy, and it looks like we understand the bottlenecks in big data analytics. But this paper argues that there is a need to put more work into understanding the performance of big data analytics framework, and shows that at least for Spark on the benchmarks and workloads they tried (see Table 1), there are some counter intuitive results. For Spark, the network is not much of a bottleneck: Network optimizations can only reduce job completion time by a median of at most 2%. The disk is more of a bottleneck than the network: Optimizing/eliminating disk accesses can reduce job completion time by a median of at most 19%. But most interestingly, the paper shows that CPU is often the bottleneck for Spark, so engineers should be careful about trading off I/O time for CPU time using more sophisticated serialization and compression techniques.

This is a bit too much to digest at once, so let's start with the observations about disk versus network bottlenecks in Spark. Since shuffled data is always written to disk and read from the disk, the disk constitutes more of a bottleneck than the network in Spark. (Here is a great Spark refresher. Yes RDDs help, but pipelining is possible only within a stage. Across the stages, shuffling is needed, and the intermediate shuffled data is always written to and read from the disk.)

To elaborate more on this point, the paper says: "One reason network performance has little effect on job completion time is that the data transferred over the network is a subset of data transferred to disk, so jobs bottleneck on the disk before bottlenecking on the network [even using a 1Gbps network]." While prior work has found much larger improvements from optimizing network performance, the paper argues that prior work focused mostly on workloads where shuffle data is equal to the amount of input data, which is not representative of typical workloads (where shuffle data is around one third of input data). Moreover the paper argues, prior work used incomplete metrics, conflating the CPU and network utilization. (More on this later below, where we discuss the blocked time analysis introduced in this paper.)

OK, now for the CPU being the bottleneck, isn't that what we want? If the CPU becomes the bottleneck (and not the network and the disk), we can add more machines to it to improve processing time. (Of course there is a side effect that this will in turn create more need for network and disk usage to consolidate the extra machines. But adding more machines is still an easy route to take until adding machines start to harm.) But I guess there is good CPU utilization, and not-so-good CPU utilization, and the paper takes issue with the latter. If you have already a lot overhead/waste associated with your CPU processing, it will be easier to speed up your framework by adding more machines, but that doesn't necessarily make your framework an efficient framework as it is argued in "Scalability, but at what COST?".

So I guess, the main criticisms in this paper for Spark is that Spark is not utilizing the CPU efficiently and leaves a lot of performance on the table.  Given the simplicity of the computation in some workloads, the authors were surprised to find the computation to be CPU bound. The paper blames this CPU over-utilization on the following factors. One reason is that Spark frameworks often store compressed data (in increasingly sophisticated formats, e.g. Parquet), trading CPU time for I/O time. They found that if they instead ran queries on uncompressed data, most queries became I/O bound. A second reason that CPU time is large is an artifact of the decision to write Spark in Scala, which is based on Java: "after being read from disk, data must be deserialized from a byte buffer to a Java object". They find that for some queries considered, as much as half of the CPU time is spent deserializing and decompressing data. Scala is high-level language and has overheads; for one query that they re-wrote in C++ instead of Scala, they found that the CPU time reduced by a factor of more than 2x.

It seems like Spark is paying a lot of performance penalty for their selection of Scala as the programming language. It turns out the programming language selection was also a factor behind the stragglers: Using their blocked time analysis technique, the authors identify the two leading causes of Spark stragglers as Java's garbage collection and time to transfer data to and from disk. The paper also mentions that optimizing stragglers can only reduce job completion time by a median of at most 10%, and in 75% of queries, they can identify the cause of more than 60% of stragglers.

Blocked time analysis

A major contribution of the paper is to introduce "blocked time analysis" methodology to enable deeper analysis of end-to-end performance in data analytics frameworks.

It is too complicated to infer job bottlenecks by just looking at log of parallel tasks. Instead the paper argues, we should go with the resources perspective, and try to infer how much faster would the job complete if tasks were never blocked on the network. The blocked time analysis method instruments the application to measure performance, uses simulations to find improved completion time while taking new scheduling opportunities into account.


In sum, this paper raised more questions than it answered for me, but that is not necessarily a bad thing. I am OK with being confused, and I am capable of holding ambivalent thoughts in my brain. These are minimal (necessary but not sufficient) requirements for being a researcher. I would rather have unanswered questions than unquestioned answers. (Aha, of course, that was a Feynman quote. "I would rather have questions that can't be answered than answers that can't be questioned." --Richard Feynman)

This analysis was done for Spark. The paper makes the analysis tools and traces available online so that others can replicate the results. The paper does not claim that these results are broadly representative and apply to other big data analytics frameworks.

Frank McSherry and University of Cambridge Computing Lab take issue with generalizability of the results, and run some experiments on the timely dataflow framework. Here are their post1 and post2 on that.

The results do not generalize for machine learning frameworks, where network is still the significant bottleneck, and optimizing the network can give up to 75% gains in performance.

Tuesday, May 9, 2017

Paper review: Prioritizing attention in fast data

This paper appeared in CIDR17 and is authored by Peter Bailis, Edward Gan, Kexin Rong, and Sahaana Suri at Stanford InfoLab.

Human attention is scarce, data is abundant. The paper argues, this is how we fight back:

  • prioritize output: return fewer results
  • prioritize iteration: perform feedback driven development and give useful details and allow user to tune the analysis pipeline  
  • prioritize computation: aggressively filter and sample, tradeoff accuracy/completeness with performance where it has low impact, and use incremental data structures

The slogan for the system is: MacroBase is a search engine for fast data. MacroBase employs a customizable combination of high-performance streaming analytics operators for feature extraction, classification, and explanation.

MacroBase has a dataflow architecture (Storm, Spark Streaming, Heron). The paper argues it is better to focus on what dataflow operators to provide than to try to design from-scratch a new system (which won't be much faster/efficient than existing dataflow systems anyhow).

The architecture of MacroBase is simple:
ingestion&ETL -> feature transform -> classification -> data explanation

MacroBase focuses attention on dataflow operators to prioritize computation. This is done by applying classic systems techniques: predicate pushdown, incremental memorization, partial materialization, cardinality estimation, approximate query processing (top K sketch).

Users are engaged at three different interface levels with MacroBase.
1) Basic: web based point and click UI
2) Intermediate: custom pipeline configuring using Java
3) Advanced: custom dataflow operator design using Java/C++

Users highlight key performance metrics (e.g., power drain, latency) and metadata attributes (e.g., hostname, device ID), and MacroBase reports explanations of abnormal behavior. For example, MacroBase may report that queries running on host 5 are 10 times more likely to experience high latency than the rest of the cluster.

As a broader theme, the paper argues there is opportunity in marrying systems-oriented performance optimization and the machine learning literature. Another big message from the paper is the importance of building combined and optimized end-to-end systems.

MacroBase is currently doing mostly anomaly/outlier detection, and it is not doing any deeper machine learning training. There are plans to make the system distributed. Given that it is based on a dataflow system, there are many plausible ways to achieve distribution of MacroBase.

Saturday, May 6, 2017

Paper Review: Serverless computation with OpenLambda

This paper provides a great accessible review and evaluation of the AWS Lambda architecture. It is by Scott Hendrickson, Stephen Sturdevant, Tyler Harter, Venkateshwaran Venkataramani†, Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and it appeared at Hot Cloud 16

Virtual machines virtualized and shared the hardware so multiple VMs can colocate on the same machine. This allowed consolidation of machines, prevented the server sprawl problem, and reduced costs as well as improving manageability.

The containers virtualized and shared the operating system, and avoided the overheads of VMs. They provided fast startup times for application servers. By "fast" we mean about 25 seconds of preparation time.

In both VMs and containers, there is a "server" waiting for a client to serve to. Applications are defined as collection of servers and services.

"Serverless" takes the virtualization a step ahead. They virtualize and share the runtime, and now the unit of deployment is a function. Applications are now defined as a set of functions (i.e., lambda handlers) with access to a common data store.

Lambda handlers from different customers share common pools of runtimes managed by the cloud provider, so developers need not worry about server management. Handlers are typically written in interpreted languages such as JavaScript or Python. By sharing the runtime environment across functions, the code specific to a particular application will typically be small, and hence it is inexpensive to send the handler code to any worker in a cluster.

Performance evaluation on AWS Lambda 

Handlers can execute on any worker; in AWS, startup time for a handler on a new worker is approximately 1-2 seconds. Upon a load burst, a load balancer can start a Lambda handler on a new worker to service a queued RPC call without incurring excessive latencies. Figure 2 shows that 100 lambda workers are generated in a short time to serve 100 outstanding RPC requests.

Figure 5 shows more details about the lambda handler initialization. There is a delay for unpausing a lambda function (1ms), if you start from scratch that delay is actually several 100ms.

On the systems research side, one problem to investigate is building better execution engines. Under light load, Lambdas are significantly slower than containers as Figure 4 shows.

Lambdas are great for performance tuning. You can see which functions are accessed how many times because that is how billing is provided. This helps you to tune the performance of your applications.

On the topic of billing, in AWS, the cost of an invocation is proportional to the memory cap (not the actual memory consumed) multiplied by the actual execution time, as rounded up to the nearest 100ms. But many RPCs are shorter than 100ms, so they cost several times more than if charging were more fine-grained.

The authors are working on building an opensource lambda computing platform.

Discussion and future directions

So what kind of functions are suitable for "lambdatization"?

Currently RPC calls from web apps are being lambdatized. But as the paper observed, maybe those are too small, they last less than 100 ms, the unit of billing. A common use case scenario is when an app puts an image to S3, this triggers a call to a lambda handler that processes this image and creates a thumbnail. That is a better fit.

It is also best if the input to the lambda function is not very big, wasting/duplicating work by transferring large amounts of data. Lambda is for computation. The computation should compensate the cost of network use. Lambda excels in handling bursty traffic by autoscaling extremely quickly.

Picking up on this last observation, I think a very beneficial way to employ lambdatization is for addressing bottlenecks of the big data processing application/platform that surface at runtime. In a sense, I am advocating to use lambdatization to virtualize even the application at the unit of functions! For this we can ask the developer to provide tags to label some functions as lambda-offloadable. Then we can use a preprocessor and a shim layer to deploy these functions as lambda functions, so they can be auto-scaled based on the feedback/directives from the underlying distributed systems middleware.

Two-phase commit and beyond

In this post, we model and explore the two-phase commit protocol using TLA+. The two-phase commit protocol is practical and is used in man...