Paper summary. Blazes: Coordination analysis and placement for distributed programs
This paper is by Peter Alvaro, Neil Conway, Joseph M. Hellerstein, and David Maier. It appears in the October 2017 issue of ACM Transactions on Database Systems. A preliminary conference version appeared in ICDE 2014.
This paper builds a theory of dataflow/stream-processing programs, which covers work such as Spark, Storm, Heron, TensorFlow, Naiad, and Timely Dataflow.
The paper introduces compositional operators of labels, and shows how to infer the coordination points in dataflow programs. When reading below, pay attention to the labeling section, the labels "CR, CW, OR, OW", and the section on reduction rules on labels.
To figure out these coordination points, the Blazes framework relies on annotations of the dataflow programs to be supplied as an input. This is demonstrated in terms of a Twitter Storm application and a Bloom application.
The paper has many gems. It mentions in passing in the conclusion that when designing systems, we should pay attention to coordination locality, not just data-access locality. (Maybe the two are often correlated, and this leads people to confuse them.) It also offers practical lessons about how to organize your dataflow programs: move replication upstream and caching downstream. I wish the paper elaborated more on these points in the conclusion.
I am not done with the paper yet. It is a 31-page paper, packed with information. I gave it a first pass, and I think I will be reading it again and keep thinking about it.
In my summary below I borrow figures and a lot of text from the paper, with small edits.
Contributions
The key idea in Blazes is that even when components are order-sensitive, it is often possible to avoid the cost of global ordering without sacrificing consistency. In many cases, Blazes can ensure consistent outcomes via *sealing*, which indicates when partitions of a stream have stopped changing. (Is "sealing" like "watermark" in the Google streaming paper?)
Some contributions of the Blazes work are:
- The paper shows how to analyze the composition of consistency properties via a term-rewriting technique over dataflow paths.
- The paper demonstrates how the semantics of components can be modeled using a system of programmer-supplied annotations (graybox approach).
- The paper discusses two alternative coordination strategies, ordering and sealing, and shows how to automatically generate application-aware coordination code that uses the cheaper sealing technique in many cases.
Dataflow consistency
Look at these anomalies. Just look at them. This figure should have come with a trigger warning for distributed systems developers.
But it is possible to go back to first principles and cut the Gordian knot. If we could ensure that every component in a dataflow produces a deterministic set of outputs regardless of the order in which its inputs are delivered, then we could prevent all of these anomalies.
There are two approaches to doing that.
*Consistency via Coordination*: A sufficient strategy for preventing the anomalies (for arbitrary components) is to remove the nondeterminism in delivery order by coordinating the (otherwise asynchronous) processing of inputs with a coordination system such as ZooKeeper. This approach is general (independent of the semantics of the computation) but imposes throughput and latency costs.
*Consistency via Component Properties*: A dataflow component is *confluent* if it produces the same set of outputs for all orderings of its inputs. At any time, the output of a confluent component (and any redundant copies of that component) is a subset of the unique, final output.
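To make the definition of confluence concrete, here is a minimal Python sketch. It is not from the paper: the two toy components and the brute-force `is_confluent` checker are hypothetical names I made up for illustration, and the check only works for tiny inputs because it enumerates every ordering.

```python
from itertools import permutations


def union_component(inputs):
    """Confluent toy component: accumulates a set, so order cannot matter."""
    state = set()
    for item in inputs:
        state.add(item)
    return frozenset(state)


def last_writer_component(inputs):
    """Order-sensitive toy component: output depends on which input is last."""
    state = None
    for item in inputs:
        state = item
    return frozenset([state])


def is_confluent(component, inputs):
    """Brute force: does every ordering of the inputs give the same output set?"""
    outputs = {component(order) for order in permutations(inputs)}
    return len(outputs) == 1


messages = ["a", "b", "c"]
print(is_confluent(union_component, messages))        # True
print(is_confluent(last_writer_component, messages))  # False
```

The set-union component is the kind of building block that needs no coordination; the last-writer-wins component is the kind that forces the question the paper asks next.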
In an ideal world, we would build dataflows out of strictly confluent components. Unfortunately this is an infeasible assumption. When we add an order-sensitive component to an otherwise confluent dataflow, how can we avoid falling back on a global coordination strategy? This is the question the paper investigates.
Remark: Note that confluent is a stronger property than convergent. Convergent replicated components are guaranteed to eventually reach the same state, but this final state may not be uniquely determined by component inputs. As Figure 5 indicates, convergent components allow cross-instance nondeterminism, which can occur when reading "snapshots" of the convergent state while it is still changing. Consider what happens when the outputs of a convergent component (e.g., GETs posed to a key/value store) flow into a replicated stateful component (e.g., a replicated cache). If the caches record different stream contents, then the result is replica divergence.
Annotated dataflow graphs
The *CR* annotation indicates that a path through a component is confluent and stateless; that is, it produces deterministic output regardless of its input order, and the path does not modify the component's state.
*CW* denotes a path that is confluent and stateful. That is, calls to the API may modify the internal state of the component, but the final state is determined only by the input contents and not their order.
The annotations *OR_gate* and *OW_gate* denote non-confluent paths that are stateless or stateful, respectively.
The *Seal_key* annotation means that the stream is punctuated on the subset key of the stream's attributes.
The label *Async* corresponds to streams with deterministic contents whose order may differ on different executions or different stream instances. Streams labeled *NDRead* may exhibit cross-run nondeterminism, having different contents in different runs, or cross-instance nondeterminism within a single run if the system is replicated. Finally, streams labeled *Taint* may exhibit persistent nondeterministic contents, or replica divergence in replicated systems.
Reduction relation
Combination rules model what happens when multiple input streams contribute to a single output stream in a dataflow. Taint is the strongest label: if any of the paths contributing to an output stream can exhibit permanent nondeterministic contents, then so can that output stream. Async is the weakest, and will be dominated by any labels with which it combines.
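The dominance idea in the combination rules can be sketched as taking a maximum over a severity ranking. This is only my illustration, not the paper's rule set: the text above fixes Async as the weakest label and Taint as the strongest, while placing NDRead between them is my assumption, and the names `Label` and `combine` are hypothetical.

```python
from enum import IntEnum


class Label(IntEnum):
    # Assumed severity ranking for illustration: the summary only states
    # that Async is the weakest label and Taint is the strongest.
    ASYNC = 0
    NDREAD = 1
    TAINT = 2


def combine(labels):
    """Label of an output stream fed by several paths: the most severe wins."""
    return max(labels)


print(combine([Label.ASYNC, Label.ASYNC]).name)                # ASYNC
print(combine([Label.ASYNC, Label.NDREAD]).name)               # NDREAD
print(combine([Label.ASYNC, Label.NDREAD, Label.TAINT]).name)  # TAINT
```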
Coordination selection
Blazes will repair nonconvergent and nonconfluent dataflows by constraining how messages are delivered to individual components. When possible, Blazes will recognize the compatibility between sealed streams and component semantics, synthesizing a seal-based strategy that avoids global coordination. Otherwise, it will enforce a total order on message delivery, say using ZooKeeper.
*Sealing Strategies.* To determine that the complete partition contents are available, the consumer must a) participate in a protocol with each producer to ensure that the local per-producer partition is complete, and b) perform a unanimous voting protocol to ensure that it has received partition data from each producer. The voting protocol is a local form of coordination, limited to the "stake holders" contributing to individual partitions. When there is only one producer instance per partition, Blazes need not synthesize a voting protocol. Once the consumer has determined that the contents of a partition are (henceforth) immutable, it may process the partition without any further synchronization.
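The sealing strategy above can be sketched as a consumer that buffers each partition until every producer has sealed it. This is a toy of my own, not the code Blazes synthesizes: `SealedConsumer` and its methods are hypothetical, and it assumes that the same fixed set of producers contributes to every partition and that a producer never sends a record for a key after sealing it.

```python
from collections import defaultdict


class SealedConsumer:
    """Buffers each partition until every producer has sealed it."""

    def __init__(self, producers, process):
        self.producers = frozenset(producers)  # assumed stakeholders of every partition
        self.process = process                 # callback run once per complete partition
        self.buffers = defaultdict(list)       # partition key -> buffered records
        self.seals = defaultdict(set)          # partition key -> producers that sealed it

    def on_record(self, key, record):
        self.buffers[key].append(record)

    def on_seal(self, key, producer):
        """The producer promises to send no more records for this key."""
        self.seals[key].add(producer)
        if self.seals[key] == self.producers:  # unanimous vote: partition is immutable
            records = self.buffers.pop(key, [])
            del self.seals[key]
            # Sort so the callback sees the same input under any arrival order.
            self.process(key, sorted(records))


results = {}


def store(key, records):
    results[key] = records


consumer = SealedConsumer({"p1", "p2"}, store)
consumer.on_record("batch-1", "y")
consumer.on_seal("batch-1", "p1")   # only one vote so far: nothing is processed
consumer.on_record("batch-1", "x")
consumer.on_seal("batch-1", "p2")   # second vote completes the partition
print(results)                      # {'batch-1': ['x', 'y']}
```

Note that the only waiting happens per partition and only among the producers of that partition, which is the "local form of coordination" the paragraph describes. With a single producer, the vote degenerates to one seal message.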
Blazes framework
Blazes can be directly applied to existing programming platforms based on distributed stream or dataflow processing, including Twitter Storm and Spark Streaming. Programmers of stream processing engines interact with Blazes in a graybox manner: they provide simple semantic annotations to the blackbox components in their dataflows, and Blazes performs the analysis of all dataflow paths through the program.
Case study: Word count example
The Storm word-count dataflow can be reduced to Taint in the following way once we supply an Async label for its input interface.
If, on the other hand, the input stream is sealed on batch, then Blazes instead produces this reduction:
Because a batch is atomic (its contents may be completely determined once a punctuation arrives) and independent (emitting a processed batch never affects any other batches), the topology will produce deterministic contents in its (possibly nondeterministically ordered) outputs under all interleavings, which is a requirement for Storm’s replay-based fault tolerance.
Figure 20 shows that the sealed solution significantly outperforms the ordering-based solution as the number of workers increases.
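The atomicity and independence argument above can be checked by brute force on a toy stream. The sketch below is my own and does not model Storm or the paper's topology: the event encoding, `count_sealed_batches`, and `seal_respected` are hypothetical, and it assumes a batch's seal is delivered after every word of that batch.

```python
from collections import Counter
from itertools import permutations


def count_sealed_batches(stream):
    """Count words per batch, emitting a batch's counts only at its seal.

    Items are ("word", batch_id, word) or ("seal", batch_id).
    """
    open_batches = {}
    emitted = set()
    for item in stream:
        if item[0] == "word":
            _, batch, word = item
            open_batches.setdefault(batch, Counter())[word] += 1
        else:
            _, batch = item
            counts = open_batches.pop(batch, Counter())
            emitted.add((batch, frozenset(counts.items())))
    return frozenset(emitted)


def seal_respected(stream):
    """True if no word arrives after the seal of its own batch."""
    sealed = set()
    for item in stream:
        if item[0] == "seal":
            sealed.add(item[1])
        elif item[1] in sealed:
            return False
    return True


events = [("word", 1, "a"), ("word", 1, "b"), ("word", 1, "a"),
          ("word", 2, "a"), ("seal", 1), ("seal", 2)]

# Try every interleaving that respects the seals and collect distinct outputs.
outputs = {count_sealed_batches(s) for s in permutations(events) if seal_respected(s)}
print(len(outputs))  # 1
```

Every seal-respecting interleaving yields the same set of per-batch counts, even though batches may be emitted in different orders.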
Conclusion
Recently I summarized a computational model for TensorFlow. It was a preliminary, basic attempt at creating an operational semantics for TensorFlow programs. It would be interesting to compare that with the Blazes approach, as they are solving related problems.
The TLA+ approach seems to be finer-grained and can do refinement checks between different models, but it is preliminary so far. The Blazes approach can handle components. It looks coarser-grained, but it can be applied more easily.