Noria: dynamic, partially-stateful dataflow for high-performance web applications

This paper, which appeared in OSDI 2018, considers how to design a read-optimized relational database with built-in support for caching and fast lookup answers to complex queries. The resulting system, Noria, is implemented in Rust and is available as open source at https://github.com/mit-pdos/noria. There are already two good summaries of the paper (from Adrian Colyer and Micah Lerner), so my job today will be simpler.


Motivation and overview

Databases (mostly relational databases) overwhelmingly serve read-heavy workloads because of all the SQL queries they answer, but their designs fail to take advantage of this and optimize for reads. Some crutch solutions were added to address this, such as memcached and read-only replicas, but these have shortcomings. Many deployments employ an in-memory key-value cache (like Redis, memcached, or TAO) to speed up common-case read queries. Such a cache avoids re-evaluating the query when the underlying records are unchanged. However, the application must invalidate or replace cache entries as the records change. This process is error-prone and requires complex application-side logic for handling cache misses, filling the cache, and deciding on cache eviction policies. Developers must also carefully avoid performance collapse from "thundering herds", where many database queries are issued just after an invalidation.

There has been work on dataflow systems, but these focus on write performance and strong consistency at the expense of read latency. They also have limited support for on-demand compute and eviction. Stream processing systems also use dataflow, but usually have windowed state and static queries that process only new records.


Can we do better if we know more about the workload? What if we know the standing queries? Can we enrich the database to implement "automatic caching" to answer these queries as lookup reads from memory? Noria starts with a clean slate to address this question.

Materialized views (MVs) have been used in RDBMSs since the 80s. The idea in an MV is to run the query and remember the result. But how do we maintain the materialization in the face of updates/inserts to the data? In Noria, you not only create an MV, but you also maintain it incrementally. Noria is able to achieve this thanks to the dataflow operators it deploys.

In this design, the updated data in the base tables moves to the compute, the changes propagate through a graph of operators, and take effect at the derived views at the bottom. Each edge is a data dependency in the dataflow graph, so a join depends on its inputs. The messages are represented as delta updates.
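To make the delta idea concrete, here is a minimal Rust sketch of my own (not Noria's code). It assumes a hypothetical `votes` base table and a derived view holding the vote count per story; the names `Delta`, `Vote`, and `VoteCountView` are made up for illustration. The point is that the view absorbs each delta with a constant amount of work instead of re-running the aggregate query.

```rust
use std::collections::HashMap;

/// A delta flowing along a dataflow edge: a record was added or removed.
#[derive(Debug, Clone, PartialEq)]
pub enum Delta<T> {
    Insert(T),
    Remove(T),
}

/// A row in a hypothetical `votes` base table.
#[derive(Debug, Clone, PartialEq)]
pub struct Vote {
    pub story_id: u32,
    pub user_id: u32,
}

/// Derived view (story_id -> vote count), maintained incrementally.
#[derive(Default)]
pub struct VoteCountView {
    counts: HashMap<u32, i64>,
}

impl VoteCountView {
    /// Apply a single delta without re-running the full query.
    pub fn apply(&mut self, delta: &Delta<Vote>) {
        let (story_id, change) = match delta {
            Delta::Insert(v) => (v.story_id, 1),
            Delta::Remove(v) => (v.story_id, -1),
        };
        let count = self.counts.entry(story_id).or_insert(0);
        *count += change;
        let now_zero = *count == 0;
        if now_zero {
            // Keep the view compact: a story with no votes has no row.
            self.counts.remove(&story_id);
        }
    }

    /// A read is a plain in-memory lookup.
    pub fn lookup(&self, story_id: u32) -> i64 {
        self.counts.get(&story_id).copied().unwrap_or(0)
    }
}
```

Applying two `Delta::Insert`s and one `Delta::Remove` for the same story leaves `lookup` returning 1, and at no point is the `votes` table rescanned.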

Noria dynamically changes long-running, low-latency streaming computations by modifying the dataflow. Unlike existing streaming dataflow systems like Naiad or Spark Streaming, it has no need for a restart or recovery from a checkpoint.


Ok, that is the basic idea. Let's double click on this now.

Noria design

Noria has two types of data storage: base tables, which hold data that is typically stored persistently, and derived views, which hold data that an application might cache for better performance. Compared to traditional databases, Noria's base tables are smaller because it can derive data that an application would otherwise need to pre-compute and store in a base table. The views, on the other hand, are larger than a typical cache because Noria derives more data, including intermediate results. Noria stores base tables on disk and views in server memory, and is designed to minimize memory use by only materializing records that are actually read and evicting infrequently-accessed data. (See the partial-state discussion coming next.)

Noria supports stateless and stateful operators. Stateless operators, such as filters and projections, do not need any context to process updates, while stateful operators, such as count, min/max, and top-k, maintain state to avoid inefficient re-computation of aggregate values. Stateful operators also use indexes to improve performance. Incremental SQL is a pain, doing joins is a pain, but Noria implements all that functionality.
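The difference between the two kinds of operators can be sketched in a few lines of Rust. This is my own simplification under assumed names (`Update`, `Operator`, `Filter`, `Count`), not Noria's operator interface. The filter looks at each update in isolation, while the count must remember the previous value per key so that it can retract the old output and emit the new one downstream.

```rust
use std::collections::HashMap;

/// A signed record: `positive == false` retracts a previously emitted record.
#[derive(Debug, Clone, PartialEq)]
pub struct Update {
    pub key: u32,
    pub value: i64,
    pub positive: bool,
}

/// Hypothetical operator interface: consume input deltas, emit output deltas.
pub trait Operator {
    fn on_input(&mut self, input: Vec<Update>) -> Vec<Update>;
}

/// Stateless: each update is judged on its own, no context needed.
pub struct Filter {
    pub min_value: i64,
}

impl Operator for Filter {
    fn on_input(&mut self, input: Vec<Update>) -> Vec<Update> {
        input
            .into_iter()
            .filter(|u| u.value >= self.min_value)
            .collect()
    }
}

/// Stateful: keeps the current count per key to avoid recomputing it.
#[derive(Default)]
pub struct Count {
    counts: HashMap<u32, i64>,
}

impl Operator for Count {
    fn on_input(&mut self, input: Vec<Update>) -> Vec<Update> {
        let mut out = Vec::new();
        for u in input {
            let count = self.counts.entry(u.key).or_insert(0);
            let old = *count;
            *count += if u.positive { 1 } else { -1 };
            let new = *count;
            if old > 0 {
                // Retract the count we previously told downstream about.
                out.push(Update { key: u.key, value: old, positive: false });
            }
            if new > 0 {
                // Emit the updated count.
                out.push(Update { key: u.key, value: new, positive: true });
            }
        }
        out
    }
}
```

Chaining `Filter` into `Count` gives a tiny two-operator dataflow: rows below the threshold never reach the count, and every surviving row turns into a retraction of the old count plus an insertion of the new one.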


In general, Noria ensures that if writes are completed, all external views will eventually hold results that are the same as if the queries had been executed directly against the base table data. This requires that operators are deterministic functions over their own state and the inputs from their ancestors, and that Noria avoids races between updates and queries, reordering of updates on the same data-flow path, and races between related updates that arrive independently at multi-ancestor operators via different data-flow paths. Consider an OR that combines filters using a union operator, or a join between data-flow paths connected to the same base table: such operators’ final output (and state) must be commutative over the order in which updates arrive at their inputs.

Partially-stateful data-flow

Partial-state is a key feature in Noria. The views need to evict old entries and add new ones on demand. This partially-stateful data-flow model in Noria lets operators maintain only a subset of their state. The derived views deal with the notion of missing state by issuing upqueries upstream on the dataflow operators to populate the missing state. Here is how this happens.

Eviction notices flow forward along the update dataflow path; they indicate that some state entries will no longer be updated. Operators drop updates that would affect these evicted state entries without further processing or forwarding. When Noria needs to read from evicted state, Noria recomputes that state. This process sends recursive upqueries to the relevant ancestors in the graph (Figure 4). An ancestor that handles such an upquery computes the desired value (possibly after sending its own upqueries), then forwards a response that follows the data-flow path to the querying operator. When the upquery response eventually arrives, Noria uses it to populate the evicted entry. After the evicted entry has been filled, subsequent updates through the data-flow keep it up-to-date until it is evicted again.
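Here is a toy Rust sketch of that miss/upquery/fill cycle, again my own and heavily simplified: one fully materialized ancestor (a hypothetical `VotesTable` indexed by story) and one partial view below it, both living in a single struct I call `Dataflow`. Real Noria does this recursively across many operators and has to guard against races, none of which is modeled here.

```rust
use std::collections::HashMap;

/// Stand-in for an ancestor with full state: story_id -> ids of voters.
#[derive(Default)]
pub struct VotesTable {
    by_story: HashMap<u32, Vec<u32>>,
}

impl VotesTable {
    /// Answer an upquery with an index lookup on story_id.
    fn upquery(&self, story_id: u32) -> u64 {
        self.by_story.get(&story_id).map_or(0, |v| v.len() as u64)
    }
}

/// One base table feeding one partially materialized count view.
#[derive(Default)]
pub struct Dataflow {
    table: VotesTable,
    /// Partial state: an absent key means "missing", not "zero".
    view: HashMap<u32, u64>,
    upqueries: usize,
}

impl Dataflow {
    /// Write path: update the base table, then forward the delta.
    pub fn insert_vote(&mut self, story_id: u32, user_id: u32) {
        self.table.by_story.entry(story_id).or_default().push(user_id);
        // Updates for missing (evicted or never-read) keys are dropped.
        if let Some(count) = self.view.get_mut(&story_id) {
            *count += 1;
        }
    }

    /// Read path: a hit is a lookup; a miss triggers an upquery and a fill.
    pub fn read(&mut self, story_id: u32) -> u64 {
        if let Some(&count) = self.view.get(&story_id) {
            return count;
        }
        let count = self.table.upquery(story_id);
        self.upqueries += 1;
        self.view.insert(story_id, count);
        count
    }

    /// Evict one key; later updates for it are dropped until the next read.
    pub fn evict(&mut self, story_id: u32) {
        self.view.remove(&story_id);
    }

    /// Number of upqueries issued so far (for illustration only).
    pub fn upqueries(&self) -> usize {
        self.upqueries
    }
}
```

In this sketch, votes written before the first read of a story cost nothing in the view, the first read pays for one upquery, and subsequent writes keep the filled entry current until it is evicted again.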

Noria makes state partial whenever it can service upqueries using efficient index lookups. If Noria would have to scan the full state of an upstream operator to satisfy upqueries, it disables partial state for that operator. This may happen because every downstream record depends on all upstream ones—consider e.g., the top 20 stories by vote count.

Dynamic dataflow

Application queries evolve over time. With its dynamic data-flow, Noria is able to adapt and support a continuously changing set of SQL expressions. In contrast, existing data-flow systems run separate data-flows for each expression, and either initialize new operators with empty state and reflect only new writes, or require restarting from a checkpoint.

Given new or removed expressions, Noria transitions the data-flow to reflect the changes. Noria first plans the transition, reusing operators and state of existing expressions where possible. It then incrementally applies these changes to the data-flow, taking care to maintain its correctness invariants. Once both steps complete, the application can use new tables and queries.



Implementation and evaluation

Noria is implemented in ~45k lines of Rust and can operate both on a single server and across a cluster of servers. Applications interface with Noria either through native Rust bindings, using JSON over HTTP, or through a MySQL protocol adapter.

Noria persists base tables in RocksDB, a high-performance key-value store based on log-structured merge (LSM) trees. Batches of application updates are synchronously flushed into RocksDB’s log before Noria acknowledges them and admits them into the dataflow.

Noria processes updates in parallel on a cluster by hash-partitioning each operator on a key and assigning shards to different servers. Each machine runs a Noria instance, a process that contains a complete copy of the data-flow graph, but holds state only for its shards of each operator. When an operator with one hash partitioning links to an operator with a different partitioning, Noria inserts “shuffle” operators that perform inter-shard transfers over TCP connections. Upqueries across shuffle operators are expensive since they must contact all ancestor shards, but this is required for supporting partial-state via upqueries.
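Hash partitioning itself is simple to sketch. The Rust below is my own illustration using the standard library's `DefaultHasher`; I am not claiming this is the hash function or the shuffle logic Noria uses, and `shard_for` and `shuffle` are hypothetical names. It splits a batch of keyed updates into per-shard batches, which is roughly the job a shuffle operator does before sending each batch over the network.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a key to a shard index in `0..num_shards`.
pub fn shard_for<K: Hash>(key: &K, num_shards: usize) -> usize {
    assert!(num_shards > 0, "need at least one shard");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_shards as u64) as usize
}

/// Split a batch of keyed updates into one batch per destination shard.
pub fn shuffle<K: Hash, V>(updates: Vec<(K, V)>, num_shards: usize) -> Vec<Vec<(K, V)>> {
    let mut out: Vec<Vec<(K, V)>> = (0..num_shards).map(|_| Vec::new()).collect();
    for (key, value) in updates {
        let shard = shard_for(&key, num_shards);
        out[shard].push((key, value));
    }
    out
}
```

All updates for the same key land in the same shard, so a sharded stateful operator sees every delta it needs. The flip side shows up for upqueries: if the ancestor is partitioned on a different key, the rows for one downstream key may sit on any ancestor shard, which is why upqueries across a shuffle must contact all of them.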

Read handlers process clients’ RPCs to read from external views. They must access the view with low latency and high concurrency, even while a data-flow worker is applying updates to the view. To minimize synchronization, Noria uses double-buffered atomic swap hash tables for external views. The data-flow worker updates one table while read handlers read the other, and an atomic pointer swap exposes new writes. This trades space and timeliness for performance: with skewed key popularity distributions, it can improve read throughput by 10× over a single-buffered hash table with bucket-level locks. The implementation of this awesome data structure is provided here: “left-right: A lock-free, read-optimized, concurrency primitive.”
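The double-buffering idea can be shown without any concurrency machinery. The sketch below is mine and deliberately single-threaded: it keeps two hash tables, applies writes to the hidden one, and "publishes" by flipping an index and replaying the logged operations onto the other copy. The type `DoubleBuffered` and its methods are hypothetical; the real left-right crate does the flip with an atomic pointer and waits for in-flight readers to leave the old table before the writer touches it, which I omit here.

```rust
use std::collections::HashMap;

/// A logged write, kept so it can be replayed onto the second table.
enum Op {
    Insert(String, u64),
    Remove(String),
}

pub struct DoubleBuffered {
    tables: [HashMap<String, u64>; 2],
    /// Index of the table readers currently see.
    read_idx: usize,
    /// Ops already applied to the write table but not yet to the read table.
    pending: Vec<Op>,
}

impl DoubleBuffered {
    pub fn new() -> Self {
        Self {
            tables: [HashMap::new(), HashMap::new()],
            read_idx: 0,
            pending: Vec::new(),
        }
    }

    fn apply(table: &mut HashMap<String, u64>, op: &Op) {
        match op {
            Op::Insert(k, v) => {
                table.insert(k.clone(), *v);
            }
            Op::Remove(k) => {
                table.remove(k);
            }
        }
    }

    fn write(&mut self, op: Op) {
        let write_idx = 1 - self.read_idx;
        Self::apply(&mut self.tables[write_idx], &op);
        self.pending.push(op);
    }

    /// Writer side: touches only the hidden table.
    pub fn insert(&mut self, key: &str, value: u64) {
        self.write(Op::Insert(key.to_string(), value));
    }

    pub fn remove(&mut self, key: &str) {
        self.write(Op::Remove(key.to_string()));
    }

    /// Expose all writes so far, then bring the old read table up to date.
    pub fn publish(&mut self) {
        self.read_idx = 1 - self.read_idx; // stands in for the atomic swap
        let stale = 1 - self.read_idx;
        for op in self.pending.drain(..) {
            Self::apply(&mut self.tables[stale], &op);
        }
    }

    /// Reader side: only ever looks at the published table.
    pub fn get(&self, key: &str) -> Option<u64> {
        self.tables[self.read_idx].get(key).copied()
    }
}
```

The "space and timeliness" trade is visible even in this toy: every entry is stored twice, and a write stays invisible to `get` until the next `publish`.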



Discussion

In summary, Noria provides automated caching for SQL queries, and reduces the need for complex, ad hoc caching logic. It maintains materialized views (MVs) as caches, and provides a detached read-path that serves queries/reads at the speed of lookups from memory. Noria's MVs allow state to be missing from these materializations and use upqueries to populate missing state on demand.

I asked Jon Gjengset (the first author of Noria) whether monotonic clocks/timestamps could be used to provide consistent reads in Noria. He said that sharding introduces many challenges for implementing this. Due to sharding, the number of timestamps to track grows quickly, since there is one for every possible path from every table, even for simple queries.

Jon’s thesis: Partial State in Dataflow-Based Materialized Views

Video of Jon's thesis defense
