
Announcing Arroyo 0.10.0

Arroyo 0.10 is now available! This is our biggest release ever, featuring an entirely new SQL engine that's >3x faster and ships as a single binary. Plus NATS and MQTT connectors, more SQL features, and more.

Micah Wylde

CEO of Arroyo

The Arroyo team is thrilled to announce general availability for Arroyo 0.10. This release is our biggest ever. Since 0.9, we've completely rebuilt the core SQL engine around Apache Arrow and the DataFusion SQL toolkit.

Why did we do this? Well, in short:

  • Throughput: 3x higher
  • Pipeline startup: 20x faster
  • Docker image size: 11x smaller
  • SQL functions: 4x more

This release also radically simplifies the Arroyo architecture. The entire system now compiles to a single binary that can be flexibly deployed on a single machine, simple container orchestration platforms like Fargate, and scalable distributed runtimes like Kubernetes.

For way more detail on how we got here and why we're making this change, see our migration announcement blog post.

While most of our focus has been on the migration, we still have a few new features and two new connectors contributed by our amazing community.

Want to try it out? Getting started is as easy as running

docker run -p 8000:8000 ghcr.io/arroyosystems/arroyo-single:0.10

We're very happy to welcome several new contributors to the Arroyo project:

  • @bakjos, who contributed the MQTT connector
  • @gbto, who contributed the NATS connector
  • @FourSpaces, who contributed bug fixes to the Helm chart

And thanks to everyone else who helped with this release!


New Connectors

NATS

NATS is a messaging and queueing system designed for simplicity and performance. Arroyo 0.10 adds sources and sinks for Core NATS and NATS JetStream, which layers on persistence and delivery guarantees.


For example, to consume from a NATS stream, we can use this DDL:

CREATE TABLE logs (
    id BIGINT NOT NULL,
    time TIMESTAMP NOT NULL,
    host TEXT NOT NULL,
    level TEXT NOT NULL,
    message TEXT NOT NULL
) with (
    type = 'source',
    connector = 'nats',
    servers = 'localhost:4222',
    subject = 'logs',
    'auth.type' = 'credentials',
    'auth.username' = '{{ NATS_USER }}',
    'auth.password' = '{{ NATS_PASSWORD }}',
    format = 'json'
);

See the connector docs for more details.

Thanks to community member Quentin Gaborit (@gbto) for this incredible contribution!

  • Add NATS source and sink connectors by @gbto in #578

MQTT

MQTT is a lightweight messaging protocol widely used with low-power devices and in the Internet of Things (IoT) space. Arroyo 0.10 now ships with an MQTT source and sink to consume and produce from MQTT brokers.


For example, you can create an MQTT source with the following SQL

CREATE TABLE devices (
    device_id BIGINT NOT NULL,
    time TIMESTAMP NOT NULL,
    lat FLOAT NOT NULL,
    lng FLOAT NOT NULL,
    metadata JSON
) with (
    connector = 'mqtt',
    url = 'tcp://localhost:1883',
    type = 'source',
    topic = 'events',
    format = 'json'
);
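
Once defined, the source can be queried like any other table; as an illustrative sketch (this query is not from the release notes), counting position updates per device over a sliding window:

SELECT device_id, count(*) AS updates
FROM devices
-- one-minute windows advancing every 5 seconds
GROUP BY device_id, hop(interval '5 seconds', interval '1 minute');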

See the connector docs for more details.

Thanks to community member Giovanny Gutiérrez (@bakjos) for this amazing contribution!

$ arroyo

With the new architecture in 0.10, the entire Arroyo system now builds as a single binary

$ arroyo
Usage: arroyo <COMMAND>
 
Commands:
  api         Starts an Arroyo API server
  controller  Starts an Arroyo Controller
  cluster     Starts a complete Arroyo cluster
  worker      Starts an Arroyo worker
  compiler    Starts an Arroyo compiler
  node        Starts an Arroyo node server
  migrate     Runs database migrations on the configured Postgres database
  help        Print this message or the help of the given subcommand(s)
 
Options:
  -h, --help     Print help
  -V, --version  Print version

Running a complete local cluster is as easy as

$ arroyo cluster
INFO arroyo_controller: Using process scheduler
INFO arroyo_server_common: Starting cluster admin server on 0.0.0.0:8001
INFO arroyo_controller: Starting arroyo-controller on 0.0.0.0:9190
INFO arroyo_api: Starting API server on 0.0.0.0:8000
INFO arroyo_compiler_service: Starting compiler service at 0.0.0.0:9000

Today we are providing pre-compiled binaries on the release page for Linux and macOS, or you can build your own by following the dev instructions.

Note that Arroyo currently requires a running Postgres database for configuration; in the next release we will be adding the option of using SQLite. If available, Prometheus is also used to power the pipeline metrics in the Web UI.

  • Add a new binary crate 'arroyo-bin' to package the entire system by @mwylde in #514

Performance

Arroyo 0.10 is significantly faster than 0.9:

Performance comparison benchmarks between 0.9.1 and 0.10.0

Benchmarks comparing Arroyo 0.10.0 to 0.9.1. In all cases, JSON-encoded Nexmark data was read from a Redpanda source. Query numbers refer to Nexmark benchmarks. Benchmarks were run on a c7i.4xlarge instance.

In fact, it's so fast that for many benchmarks we've run on EC2 instances, it can saturate the network before using all the available CPU. We'll be following up with rigorous benchmarks against other systems, but in early tests it significantly beats other streaming systems in throughput.

How has it gotten so much faster? This is deserving of an entire blog post, but the short version is that Arroyo now operates internally on columnar data using carefully tuned vector kernels, thanks to the hard work of the Apache Arrow and DataFusion communities.

Columnar data is now the standard for OLAP (analytics-oriented) query engines like ClickHouse, Pinot, and Presto. There are a few reasons for this:

  • By storing all values in a column together, you can achieve better compression ratios and make better use of CPU cache
  • Only the columns actually referenced in a query need to be read, reducing disk and network IO
  • Columnar processing aligns well with the vector capabilities in modern CPUs, providing 3x or more speedups

However, row-oriented data remains the standard for streaming engines. There are some inherent tradeoffs between latency (how quickly an event can traverse through the pipeline) and throughput (how many events can be processed with a given amount of CPU). By batching¹ data we can get higher throughput at the expense of latency. And columnar representations require that we batch a number of events together before we see performance improvements (in fact, with a small number of rows columnar processing will be much slower due to fixed overhead).

But there's a strong argument for moving to columnar representations for streaming: at any given batch size, the higher the throughput the less time we must wait to fill that batch. For example, if we want at least 100 records in our batch to overcome fixed costs, the amount of time we need to wait to receive 100 records will depend on our throughput:

  • At 10 events/second, it takes 10 seconds
  • At 1,000 — 0.1 seconds (100ms)
  • At 1,000,000 — 0.0001 seconds (0.1ms)

Or looking at it from a fixed latency perspective (say, waiting at most 10ms):

  • At 10 events/second, our batch size is 1
  • At 1,000 — 10
  • At 1,000,000 — 10,000

In Arroyo 0.10, we perform batching at the source and via aggregating operators like windows and joins. The source batching behavior can be configured via two environment variables:

  • BATCH_SIZE which controls the maximum batch size
  • BATCH_LINGER_MS which controls the maximum amount of time to wait (in milliseconds) before emitting a batch

By configuring these values, users can choose their own tradeoff between latency and throughput.
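
For example, to run a local cluster with larger batches and a 100ms linger (the values here are purely illustrative):

BATCH_SIZE=8192 BATCH_LINGER_MS=100 arroyo cluster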

SQL Improvements

Functions

We've added support for over 200 new scalar, aggregate, and window SQL functions. These unlock many powerful new capabilities that previously would have been inexpressible or would have required writing a UDF.

There are too many new functions to highlight them all, but some exciting additions include:

  • Nearly 200 scalar functions, covering math, conditions, strings, regex, JSON, times, and more
  • Statistical and approximate aggregate functions like approx_percentile_cont and approx_distinct, the latter of which uses HyperLogLog to produce very memory-efficient estimates of distinct counts (see the example below)
  • Many new SQL window functions including rank, percent_rank, lag, and lead
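
As a quick sketch of the new aggregates (reusing the logs table from the NATS example above; the query itself is illustrative):

SELECT
    count(*) AS total_logs,
    approx_distinct(host) AS unique_hosts,
    approx_percentile_cont(length(message), 0.95) AS p95_message_length
FROM logs
GROUP BY tumble(interval '1 minute');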

Check out the Scalar, Aggregate, and Window function docs for the complete list.

Arrays

Previously, Arroyo supported a limited set of operations on arrays, and only within certain contexts. Now we have complete array support, including a comprehensive set of array functions, indexing via square brackets (like a[1]), serialization of arrays as JSON and Avro, the ability to aggregate values into arrays (via array_agg), and the ability to unroll arrays into separate rows via unnest.

For example, we can write unique IPs over a tumbling window as an array to Kafka with a query like this:

CREATE TABLE sink (
  values TEXT[]
) with (
  connector = 'confluent',
  connection_profile = 'confluent-working',
  format = 'avro',
  'avro.confluent_schema_registry' = 'true',
  topic = 'list_output',
  type = 'sink'
);
 
INSERT INTO sink
SELECT array_agg(DISTINCT ip) FROM logs
GROUP BY tumble(interval '10 seconds');
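
Going the other direction, unnest expands an array back into one row per element. A sketch against the same hypothetical logs table:

SELECT unnest(ips) AS ip
FROM (
    SELECT array_agg(DISTINCT ip) AS ips
    FROM logs
    GROUP BY tumble(interval '10 seconds')
);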

Source Fusion

Arroyo 0.10 includes new optimizations that reduce the amount of data read from a source that's referenced multiple times in a query. Now, each source is read only once, rather than once per subquery.

This same optimization also applies to views, so compute can be reused across outputs.

A graph showing the Source Fusion optimization

In case you ever wanted to replicate data from Kafka into (nearly) every supported Arroyo sink
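
As a sketch of what this enables (the sink tables here are hypothetical and assumed to be defined elsewhere), both of the following queries read from the same logs source, but it will only be consumed once:

CREATE VIEW errors AS
SELECT host, message FROM logs WHERE level = 'ERROR';

INSERT INTO alerts_sink SELECT * FROM errors;
INSERT INTO archive_sink SELECT host, level, message FROM logs;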

Upgrading to 0.10

There are some backwards-incompatible changes in 0.10's SQL. Some pipelines written for 0.9 will need to be updated as described below.

Virtual fields

The syntax for virtual fields has changed to more closely match Postgres: we now require the STORED keyword. Previously:

event_time TIMESTAMP GENERATED ALWAYS AS (CAST(date as TIMESTAMP))

Now:

event_time TIMESTAMP GENERATED ALWAYS AS (CAST(date as TIMESTAMP)) STORED
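
In context, a full source definition might look like the following sketch (the Kafka connector options and the event_time_field setting are illustrative assumptions, not part of the syntax change itself):

CREATE TABLE events (
    date TEXT NOT NULL,
    event_time TIMESTAMP GENERATED ALWAYS AS (CAST(date as TIMESTAMP)) STORED
) with (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'events',
    type = 'source',
    format = 'json',
    event_time_field = 'event_time'
);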

UDFs

UDF definitions have changed slightly; the UDF function must now be annotated with the #[arroyo_udf_plugin::udf] attribute macro, for example:

use arroyo_udf_plugin::udf;
 
#[udf]
fn plus_one(x: i64) -> i64 {
    x + 1
}

This change means we now support defining other helper functions within the UDF definition.

Additionally, async UDFs no longer support defining a Context struct; instead use a static OnceLock or tokio::sync::OnceCell to share resources between async UDF invocations.
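
For instance, a minimal sketch of that pattern (the reqwest client and the endpoint are stand-ins, and assume reqwest is declared as a UDF dependency; the point is the OnceCell initialization):

use arroyo_udf_plugin::udf;
use tokio::sync::OnceCell;

// Shared across all invocations of this UDF; initialized on first use
static CLIENT: OnceCell<reqwest::Client> = OnceCell::const_new();

#[udf(allowed_in_flight = 100)]
async fn enrich(id: u64) -> String {
    let client = CLIENT
        .get_or_init(|| async { reqwest::Client::new() })
        .await;

    // Hypothetical lookup; replace with your own service call
    match client
        .get(format!("http://localhost:8080/devices/{}", id))
        .send()
        .await
    {
        Ok(resp) => resp.text().await.unwrap_or_default(),
        Err(_) => String::new(),
    }
}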

Async UDF options are now specified in the #[udf()] attribute macro rather than in the TOML comment, like:

#[udf(ordered, allowed_in_flight=100, timeout="10s")]
async fn query_db(id: u64) -> String {
    ...
}

See the async UDF docs for all of the details.

Update behavior

Previously, when using update tables, every change would be output as soon as it occurred. In 0.10, changes are buffered for a short interval to reduce the volume of output. This interval defaults to 1 second and can be configured via the UPDATE_AGGREGATE_FLUSH_MS environment variable.
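
For example, to flush buffered updates every 100ms instead (an illustrative value) when running a local cluster:

UPDATE_AGGREGATE_FLUSH_MS=100 arroyo cluster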

See the update table docs for more details.

Improvements

Fixes

  • Upgrade framer-motion to fix UDF popover by @jbeisen in #499
  • Pin openapi generator version by @mwylde in #559
  • Fix a bug in the k8s deployment with the worker.slots configuration by @FourSpaces in #595
  • Fix issue with multiple windows open to the pipeline editor by @mwylde in #593

Migration work

Full Changelog: https://github.com/ArroyoSystems/arroyo/compare/v0.9.1...v0.10.0

Footnotes

  1. Some engines like Spark Streaming operate on micro-batches, which sound similar but are quite different. In a micro-batch engine, the input is broken up into groups of data, each of which is processed as an independent batch job. In pure streaming systems like Flink and Arroyo, by contrast, batching is purely a performance optimization and doesn't affect the semantics of the system.