Blog

Updates from the Arroyo team

We built a new SQL Engine on Arrow and DataFusion

Arroyo 0.10 has an entirely new SQL engine built with Apache Arrow and DataFusion. It's much faster, smaller, and easier to run. Read on for why and how we're making this change.

Micah Wylde

Micah Wylde

CEO of Arroyo

Arroyo is coming up on its one-year anniversary as an open-source project, and we've got something big planned. Our next release, 0.10, will feature an entirely new SQL engine built around Apache Arrow and the DataFusion SQL toolkit.

New to Arroyo?

Arroyo is an open-source stream processing engine, enabling users to transform, filter, aggregate, and join their data streams in real-time with SQL queries. It's designed to be easy enough for any SQL user to build correct, reliable, and scalable streaming pipelines.

This evolution reflects many lessons learned over the past two years of building Arroyo and many years of working on streaming engines before that. This post will go into detail on Arroyo's current implementation and why that's changing, but in short:

  • Performance: Arrow is an in-memory columnar format, designed to leverage the vector processing capabilities of modern CPUs; combined with high-performance compute kernels, we can achieve state-of-the-art performance for streaming, competitive with the best batch engines
  • Architectural simplicity: Today Arroyo generates Rust code, which is then compiled into a pipeline binary that performs data processing. Ahead-of-time compilation provides good performance, but requires complex infrastructure to compile pipelines. Arroyo 0.10 ships as a single, compact binary that can be deployed in a variety of ways.
  • Community: Arrow is fast becoming the center of the next-gen data stack, and by adopting it Arroyo can seamlessly interact with other data systems and even other languages like Python. By fully embracing DataFusion, we're able to leverage (and contribute to) the incredible work of the emerging Rust data ecosystem.

Since data folks like numbers, here are some comparisons to Arroyo 0.9:

  • Throughput: 3x higher
  • Pipeline startup: 20x faster
  • Docker image size: 11x smaller

As of today, Arroyo 0.10 is available as a developer preview. You can get started by running the docker container:

docker run -it -p 8000:8000 ghcr.io/arroyosystems/arroyo-single:0.10-dev

We'd love to hear your feedback on Github or Discord!

So how did we get here, and why are we now making this change? Let's walk through some Arroyo history and along the way cover some of the design decisions that go into building a SQL engine.

Table of contents

Arroyo's origins

Arroyo was inspired by my experience leading Apache Flink teams at Lyft and Splunk. I'd seen the challenges of developing Flink streaming pipelines and how difficult they were operate. Around the data ecosystems, projects like Redpanda and ScyllaDB were successfully rethinking existing Java systems with simpler, higher-performance implementations in non-managed languages, and I thought there was an opportunity to do the same with Flink.

The initial prototype of Arroyo took Flink as a direct inspiration. Our new system would be written in a faster language (Rust) and would fix some of its shortcomings, particularly around state.

Other aspects we initially kept the same. For example, the core API of Flink is called the Datastream API. It's a Java API used to directly define the dataflow graph. This is a directed, acyclic graph on whose edges data messages flow, between operators which implement the various parts of query logic (like filter, joins, or windows).

In our initial prototype of Arroyo, this graph was similarly defined directly via a Rust API.

Stream::<()>::with_parallelism(1)
    .source(KafkaSource::new("localhost:9092", "events", HashMap::new()))
    .watermark(WatermarkType::Periodic {
        period: Duration::from_secs(1),
        max_lateness: Duration::ZERO,
    })
    .flat_map(|event| {
            event.split(" ").map(|w| (w.to_string(), 1)).collect()
    })
    .key_by(|(word, _)| word)
    .window(TumblingWindow::new(Duration::from_secs(1)))
    .sum_by(|(_, count)| count)
        .sink(KeyedConsoleSink::new());

But a compiled language like Rust presents some challenges here compared to the JVM. In Flink, pipelines are written in Java or Scala, compiled to JVM bytecode, and then dynamically loaded into the Flink runtime. But such an approach wouldn't fit well for Rust which expects statically compiled binaries1.

Instead we structured the Arroyo runtime as a library, which would be invoked by the actual pipeline code. Everything would then be compiled into a static binary which would execute the pipeline.

Adding SQL

At Lyft, SQL was the most requested feature from potential users of our streaming platform. While Flink had a SQL API, user tests showed that it was still too confusing and required too much Flink expertise for non-streaming experts to pick up.

So we knew from the beginning we needed to have an excellent implementation of SQL. We wanted data engineers and scientists who knew SQL to be able to build correct, reliable, and performant streaming pipelines without needing much expertise in building streaming systems.

When we began building the SQL interface we didn't want to start from scratch. So we turned to DataFusion, which is both a complete batch SQL engine and a composable library of SQL primitives. We decided to use just the frontend, which takes raw strings SQL provided by users through several stages:

  1. Parsing, where the SQL text is turned into an abstract-syntax tree (AST)
  2. Planning, which translates the AST into a logical graph of SQL operators with data dependencies between them
  3. Optimization, where various rewrite rules are applied to the graph to simplify it and make it more efficient to execute

Once we had the optimized graph, we translated it into our own dataflow representation (the physical graph) that we would execute in our runtime. This dataflow representation is the same as the one that could be manually constructed by using the Rust pipeline API described previously.

diagram showing the steps to transform a SQL query into a dataflow graph

Essentially, SQL support was layered on top of the existing graph API. This is actually quite similar to how Flink SQL works—SQL is parsed and planned by an external library (Apache Calcite), then compiled into a Flink Datastream program.

Once settling on this basic approach, there were two other design decisions we had to make, which would determine much of our future development: how we represented SQL data rows and how we implemented SQL expressions.

Representing data

SQL engines operate on rows of tabular data2. Tables are defined with schemas, which tell the engine the names and data types of the columns. Queries over those tables will project (transform) the data into other structures of data.

Let's take this example schema:

CREATE TABLE products (
    id INT,
    name TEXT,
    description TEXT,
    price FLOAT,
    quantity INT
);

We can project it into another form with a query like

SELECT price * 0.92 as euros FROM products

which will give us a new schema for the result with a single field euros FLOAT.

In the graph representation of this query, we have three nodes, or operators: a source (the table), a projection (the select statement), and a sink. Between each of those nodes is an edge, which has the data type of the records that flow along that edge.

An illustration of a dataflow with types flowing along the edges

To implement this in a real system, we need to have actual data types—ways of storing the data that operators produce and consume. There are two basic approaches here:

  • Dynamic typing: each row is represented by an array of fields, each of which is an enum of all of the supported data types. In Rust this could look like
enum Datum {
  Text(String),
  Int(i32),
  Long(i64),
  Float(f32),
  ...
)
 
struct Record {
	fields: Vec<Datum>
}
 
// The query about would produce a record like
Record {
  fields: vec![Datum::Float(50.4)]
}
  • Static typing: structs are generated for each unique data type, which are compiled into the pipeline binary. For example we'd generate this struct for the return type of that query:
struct GeneratedRecord2 {
  euros: f32
}

While dynamic typing is more popular in SQL engines (for example, used by Materialize, RisingWave, and traditional databases like Postgres), we opted for static representations. There were a few reasons for this, including better performance (compilers can apply much more powerful optimizations if they know the type of the data they're working with) and a better fit for the Rust API, which supported user-defined struct types.

Implementing expressions

SQL has an expression language, and any SQL engine needs to be able to evaluate those expressions during execution. For example, in the query above we have the expression price * 0.92, which takes a field from the row and multiplies it by a constant. Modern SQL engines support a huge variety of scalar functions, aggregates, window functions, and other ways of transforming values.

Engine designers once again have two options3, one dynamic and one static:

  • Interpretation: we traverse through the expression tree at runtime and evaluate each node against our data
  • Compilation: at planning time we compile the expression into machine code that is directly executed; this will generally be much faster than an interpreter

For traditional batch SQL engines there's a difficult tradeoff here. A user will send a batch query to the engine and wait until it has returned the result. Users thus care mostly about the end-to-end time a query takes to execute. Any time spent compiling the query is time that could have been spent executing it. For long-running queries (minutes to hours) the compile time gets amortized, but for a small query (seconds) it will tend to dominate the total runtime. Thus nearly all batch engines use some form of interpretation.

But for streaming engines the calculus is reversed. Streaming pipelines run indefinitely—for days or months—and users care primarily about cost. That means almost any upfront time is worth paying if it reduces the lifetime cost of executing the pipeline. In streaming engines it's therefore much more common to see ahead-of-time (AoT) compilation.

Arroyo already had a way to run Rust pipelines, so the straightforward answer for us was to compile SQL expressions into Rust code, and then compile it with the Rust compiler, which is very good at producing fast binaries thanks to the sophisticated LLVM optimizer.

Diagram showing the process of parsing an expression and producing Rust code from it

As a bonus, this approach also made it easy to add support for user-defined functions (UDFs). The Rust code generated from SQL expressions could natively call UDFs without any translation layer and could benefit from inlining and other compiler optimizations.

Building a managed cloud

Arroyo was originally conceived and built as a managed cloud service. I'd seen that running streaming pipelines reliably was a huge challenge for companies, who found they to needed to dedicate streaming ops folks. By offering a managed service, we could completely free users from having to worry about operations and enable many more organizations to adopt streaming.

In some ways, this early focus on providing a managed service has served us very well. Excellent support for multi-tenancy is very hard to retrofit into an existing engine, as I'd experienced building managed services with Flink.

But as a now open source project designed for self-hosting it held us back. When we initially open-sourced Arroyo last April we reworked a few systems to make it simpler to self-host, like writing a new state backend that operated directly on S3. But other complex pieces of infrastructure remained.

For a multi-tenant cloud, the cost of a piece of infrastructure—both in terms of hardware and operations—is amortized across all users of the service. But for a company trying to run a few pipelines the fixed costs tend to dominate.

This was particularly true of our reliance on Rust codegen and AoT compilation. To start pipelines users would need a Rust compiler. And for quickly iterating on pipelines it was important that the compiler have enough CPU to compile pipelines in a reasonable amount of time. The initial user experience was an even bigger issue: while Rust has good incremental compilation support, the cold build will be very slow (5+ minutes)—a long time to wait to run your first pipeline.

We worked around those issues by building out a compiler service which made good use of incremental compilation by reusing the same workspace for each pipeline build. To speed up the first pipeline, we built a special docker image that included a pre-warmed compiler cache. This worked quite well, ensuring that even the first pipeline could be started fairly quickly—typically under 10 seconds.

Improving the self-hosting experience

Arroyo has seen enormous growth as a project in the past year with our current approach. So why are making this change now?

Each of these design decisions—static typing, AoT code generation, using cloud-scale infrastructure—came with costs as well as benefits. While they made sense for our initial design target (building a managed cloud service) they were preventing adoption of Arroyo as a self-hosted engine.

The core issue was the compiler service: at scale, the resources required to run it would be amortized. But for a single company, this was a significant expense and additional architectural complexity. It also limited how small Arroyo deployments could be, since they relied on big pieces of common infrastructure. And finally, managing this infrastructure required complex systems like Kubernetes.

As developers working on the engine, code generation was also proving difficult. Rust has excellent libraries like syn and quote for parsing and producing Rust code, but code gen is still hard. Developers need to think on multiple levels at once, and understand how the generated code interacts with its environment.

You're also giving up one of the biggest benefits of Rust: the compiler is very good at catching mistakes. With code gen, compile-time errors are effectively made into run-time errors. We built some cool testing infra to help mitigate this, but runtime compile errors were a persistent problem.

Finally, we found that outside contributors really struggled to be productive with this style of development, and found understanding the system and debugging it very challenging.

The new Arroyo

Last fall, we began thinking about what a version of Arroyo without AoT compilation could look like. Many aspects of the system would need to change. Static typing would no longer be possible; we'd need to move to some form of interpretation for SQL expressions. UDFs would become more challenging to support.

This was going to be a huge project, reworking many fundamental aspects of the engine. But after prototyping some possible approaches, we made the decision to move forward.

Arrow dataflow

Our first decision was to adopt Apache Arrow4 as our in-memory data representation, replacing the static Struct types. Arrow is a columnar, in-memory format designed for analytical computations. The coolest thing about Arrow is that it's a cross-language standard; it supports sharing data directly between engines and even different languages without copying or serialization overhead. For example, Pandas programs written in Python could operate directly on data generated by Arroyo.

Streaming on columns

Columnar representations5 have been adopted by nearly every OLAP (analytics-oriented) engine over the past decade. There are a few reasons for this:

  • By storing all values in a column together, you can achieve better compression ratios and make better use of CPU cache
  • Only the columns actually referenced in a query need to be read, reducing disk and network IO
  • Columnar processing aligns well with the vector capabilities in modern CPUs, providing 10x or more speedups

However, row-oriented data remains the standard for streaming engines. There are some inherent tradeoffs between latency (how quickly an event can traverse through the pipeline) and throughput (how many events can be processed with a given amount of CPU). By batching data we can get higher throughput at the expense of latency. And columnar representations require that we batch a number of events together before we see performance improvements (in fact, with a small number of rows columnar processing will be much slower due to fixed overhead).

row-oriented vs column oriented database design

Streaming engines like Flink and Arroyo operate—logically—on events one at a time, with important guarantees around ordered processing. Initially they were implemented physically operating on events one by one. But the benefits of batching are hard to ignore and more recent versions of Flink do support some batching in SQL operators6.

But I think the argument for why batching makes sense in streaming is simple: at any given batch size, the higher the throughput the less time we must wait to fill that batch. For example, if we want at least 100 records in our batch to overcome fixed costs, the amount of time we need to wait to receive 100 records will depend on our throughput:

  • At 10 events/second, it takes 1 second
  • At 1,000 — 0.01 seconds (100ms)
  • At 1,000,000 — 0.0001 (0.1ms)

Or looking at it from a fixed latency perspective (say, waiting at most 10ms):

  • At 10 events/second, our batch size is 1
  • At 1,000 — 100
  • At 1,000,000 — 100,000

The takeaway: we only have to pay high overhead of small batch sizes when our data volume is very low. But if we're only handling 10 or 100 events per second, the overall cost of processing will be very small in any case. And at high data volumes (tens of thousands to millions of events per second) we can have our cake and eat it too—achieve high throughput with batching and columnar data while still maintaining low absolute latency.

Interoperating

The other key benefit of Arrow—over building our own columnar format—is that it's an open standard that is being increasingly adopted by modern data libraries and engines. Unlike other standard data formats (e.g., Parquet) Arrow is an in-memory representation. This means that it enables multiple engines, libraries, and even languages to operate on in-memory data without serialization or even copying overhead.

We're very excited about the possibilities that Arrow provides for interacting with other parts of the data ecosystem. In a future version of Arroyo, it should be possible to write performant UDFs and UDAFs in Python or Java that operate seamlessly within the streaming computation, without needing to send results off to another system.

More broadly we see Arrow as a sea change in how data systems are built, and are excited to bring that revolution to the streaming world.

Embracing DataFusion

I've mentioned DataFusion a few times already in this post, but it's worth at this point giving the full introduction. DataFusion emerged out of the Arrow project with an ambitious set of goals: provide both a state-of-the-art query engine for end users and a composable toolkit for developers building their own SQL tools. And like Arroyo, it's written in Rust, the best language for data infrastructure. Since starting in 2019 it's achieved both of these goals. As a query engine it regularly leads benchmarks. As a library, numerous projects have successfully built their SQL engines around it, including InfluxDB, ParadeDB, GreptimeDB and many others.

The DataFusion logo

Arroyo has long relied on DataFusion for just its SQL frontend (responsible for parsing and planning SQL). In other words, DataFusion operated only in the control plane of Arroyo, while the data plane (where data is processed) was our own engine, built from scratch. This meant implementing all of the SQL operators, expressions, and functions. And since our existing implementations rely on code generation and compilation, we were looking at redoing most of this work for the next version.

Instead, we took a second look at DataFusion and asked ourselves…could we be using DataFusion physical plans, operators, and expressions7? After some prototyping, the answer turned out: yes! Despite having been designed for batch systems, we found that it was flexible enough to reuse much of the code to implement streaming versions of our operators. And the physical expression code-responsible for evaluating SQL expression logic on Arrow data—was useable more-or-less as is.

There is a huge amount of momentum in the Rust data community right now, and much of it is centered around DataFusion. We're very excited to be part of that effort, contributing back work to make it an excellent streaming engine in addition to its batch capabilities.

Just the beginning

We started Arroyo with the goal of making streaming easy enough that every company can adopt it, and to empower every data user to build reliable, correct, and efficient real-time data pipelines.

With 0.10, we're taking the next step towards that vision by greatly expanding the environments in which Arroyo can be run. But we're just getting started.

Now that Arroyo compiles down to a single binary, we're working to remove the other external dependencies, including Postgres and Prometheus; future releases of Arroyo will have the option of running their control plane on an embedded sqlite database.

We're also working on new methods of distribution and configuration, including homebrew support and the ability to run pipelines directly from the binary without the persistent API and Web UI.

Beyond deployment and operations, we're also very excited to keep pushing the boundaries of streaming performance. Arroyo 0.10 is already one of the fastest streaming engines in the market, and there are many more optimizations we haven't implemented yet.

Get involved

The Arroyo 0.10 release has been the culmination of months of effort, completely rethinking how SQL streaming engines can work. We couldn't be more excited to share this with our community and the world, and see what users build on it.

The developer preview, 0.10-dev, is available starting today. Getting started with Docker is as easy as running

docker run -it -p 8000:8000 ghcr.io/arroyosystems/arroyo-single:0.10-dev

Note that the developer preview still has a few missing features compared to 0.9, including SQL window functions, struct joins, update tables, and async UDFs, which will be added back in the coming weeks.

There are also a couple of breaking changes to the SQL syntax:

  • Virtual field definitions must be followed by the STORED keyword, matching postgres syntax, for example watermark TIMESTAMP GENERATED ALWAYS AS (timestamp - INTERVAL '1 minute') STORED
  • Re-aggregating a window aggregate now requires that the window be explicitly included in each GROUP BY. Previously it was implicitly included.

We'd love to hear your feedback! Come join our Discord or reach us via email. We also love contributions from the community. See the dev guide to get started, and come chat with the team in Discord with any questions.

Footnotes

  1. It is in general possible to dynamically load compiled code (with shared libraries) but this is challenging in Rust which lacks a stable Application Binary Interface (ABI). This means that shared libraries may not be compatible with a host that was compiled with a different version of the compiler or with different compiler options.

  2. Logically, everything in SQL is a row, although in actual execution, modern analytical SQL engines often use columnar representations instead. More on this later!

  3. I'm simplifying here, and in fact this is more of a spectrum between fully dynamic and fully static, encompassing techniques like virtual machines and just-in-time (JIT) compilation. See this great overview for a survey of how expressions are implemented in popular databases and query engines.

  4. Yes, 'Arroyo' and 'Arrow' have the same first four letters. No, this was not intentional. Yes, this does make it very hard to consistently type the correct word.

  5. Columnar representations are pretty straightforward to understand. If you imagine a SQL table as a grid of rows and columns, a traditional row-oriented representation would store the data row-wise, top to bottom. Columnar representations are inverse: they store column-wise, going left to right.

  6. Some engines like Spark Streaming operate on micro-batches, which sound similar but are quite different. In a micro-batch engine, the input is broken up into groups of data, each of which is processed as an independent batch job. Whereas in pure streaming systems like Flink and Arroyo, batching is purely a performance optimization and doesn't affect the semantics of the system.

  7. In SQL-engine-land, systems are often broken up into “logical” and “physical” components. The logical side is responsible for the high-level behavior of a query. For example, a logical join will capture information about the inputs, the join key, method of joining, etc. Whereas the physical side is responsible for how the query is actually—physically—executed on real hardware. The physical plan knows where data lives, how it's laid out, and accessed. The physical join knows how to read the data, implements a particular join strategy, and collects its results.