Blog

Updates from the Arroyo team

Announcing Arroyo 0.4.0

With the 0.4.0 release we've added Debezium support, a new REST API, and made the process of contributing connectors much easier

Micah Wylde

Micah Wylde

CEO of Arroyo

A fox in the mountains

It's been a month since we released Arroyo 0.3, which means it's time for another release! Arroyo 0.4 is now available, bringing a number of new features and improvements to the state-of-the-art Arroyo streaming engine.

Table of Contents

Overview

Arroyo 0.4 brings some big new features like update tables, Debezium support, and a major redesign of the connectors system that makes it much easier to build new connectors. Leveraging that, we've added websocket and fluvio connectors. We're also releasing the initial endpoints for our new REST API, which makes it easier to build automations around Arroyo.

Read on for more details, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release:

What's next

With 0.4 out, we're already looking ahead to Arroyo 0.5, to be released in early August. The headline feature of 0.5 will be the new Filesystem connector, which will support high throughput, transactional writes from Arroyo into data warehouses and data lakes backed by object stores like S3. We'll also be finishing the transition to the new REST API, adding Redis and Kinesis connectors, and adding a transactional Kafka sink. On the SQL side we'll be working on session windows and support for joining on external tables.

Anything else you'd like to see? Let us know on Discord!

Now on to the release notes.


Features

Update Tables

Arroyo 0.4 brings support for update tables. Exactly what that means is a bit complicated (and we'll dive into it below) but the short version is that you can now use Arroyo to efficiently read and write data from databases like Postgres and MySQL via Debezium, and many queries that were previously unsupported are now supported.

So what are update tables? Let's talk through the semantics of Arroyo tables today, which we'll call append tables going forward.

Take this query:

SELECT store_id, status from orders;

which produces this output stream:

Time                        store     status
7/10/23, 11:34:34 AM PDT    1142      "accepted"
7/10/23, 11:34:34 AM PDT    1737      "accepted"
7/10/23, 11:34:34 AM PDT    1149      "accepted"

This query will output one row for every record that comes in on the orders stream (let's say that's a kafka topic that receives every order). You can think of this as modeling a virtual table with three columns (time, store, and status). Each new order that comes in produces a new row in that table, or in other words is appended.

But what if we have a query that needs other operations beside appends? For example, consider this query:

SELECT store, count(*) AS count
FROM orders
GROUP BY customer;

which models a table with one row per customer. When a new order comes in, we may append a new row if it's a new customer, or we may need to update an existing row if we've already seen that customer. In other words, we need to support updates.

In Arroyo 0.3 that query is not supported, but in 0.4 it will produce an update stream that looks like this:

Time                     previous                              current                               op
7/10/23, 4:03:42 PM PDT  { "orders_store_id": 3, "count": 1 }  { "orders_store_id": 3, "count": 2 }  "u"
7/10/23, 4:03:40 PM PDT  null	                               { "orders_store_id": 1, "count": 1 }  "c"
7/10/23, 4:03:40 PM PDT  null                                  { "orders_store_id": 3, "count": 1 }  "c"

Each output records an update of some kind, either a [c]reate, [u]pdate, or [d]elete. This stream can be used directly, or it can be used to materialize the output into another database like Postgres or MySQL via Debezium, which natively supports this kind of update stream.

Update tables can also be used with Debezium to write to Arroyo from a SQL database CDC source. See the new Debezium tutorial for more details on how to set this up.

Beyond use with Debezium, update tables can also be very useful for efficiently implementing queries where it's important to know when some key enters or leaves a set. For example, for a fraud detection system you may have a set of rules that indicate possibly-fraudulent activity, like this query which looks for sites with suspiciously high click-through rates:

SELECT site as suspicious_site
FROM (
    SELECT site, clicks / impressions as click_through_rate
    FROM (SELECT site,
        SUM(CASE
            WHEN imp_type = 'click' THEN 1 ELSE 0 END) as clicks,
        SUM(CASE
            WHEN imp_type = 'impression' THEN 1 ELSE 0 END) as impressions
        FROM event_stream
    GROUP BY 1)
) WHERE click_through_rate > 0.02;

This query will produce a record with "op": "c" whenever a site first exceeds the threshold, and "op": "d" whenever a site falls below the threshold.

Connector redesign

Screenshot of Arroyo showing the available connectors

Connectors integrate Arroyo with external systems. They implement sources that read data from external systems and sinks that write data to external systems.

Arroyo 0.4 brings a major redesign of the connectors system, making it much easier to build new connectors. In previous releases of Arroyo, connectors were deeply integrated with the various Arroyo sub-systems (the console, api, database, sql planner, compiler, etc.) and adding or modifying a connector required changes to all of those systems.

In 0.4, connector implementations are cleanly separated out into the new arroyo-connectors crate. New connectors can be created by implementing a simple trait.

This redesign has allowed us to add a number of new connectors in 0.4 (detailed below), and will accelerate our connector development going forward.

We've also revamped the UI experience around creating sources and sinks, which are now jointly managed in the new Connections tab in the console. This provides a more straightforward experience for creating and managing connections.

Finally, DDL for creating sources and sinks has also been updated to be more consistent and easier to use. For example, a Kafka source can be created with the following SQL:

CREATE TABLE orders (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'broker-1.cluster:9092,broker-2.cluster:9092',
  topic = 'order_topic',
  type = 'source',
  'source.offset' = 'earliest'
);

New connectors

Arroyo 0.4 includes a number of new connectors leveraging the connector redesign. See the connector docs the full list of supported connectors.

Websocket sources

Arroyo 0.4 adds a new Websocket source, which allows Arroyo to read data from the many available websocket APIs.

For example, Coinbase provides a websocket API that streams the full orderbook for various cryptocurrencies. We can use the new Websocket source to stream that data into Arroyo, and perform real-time analytics on it.

As a simple example, this query computes the average price of Bitcoin in USD over the last minute:

CREATE TABLE coinbase (
    type TEXT,
    price TEXT
) WITH (
    connector = 'websocket',
    endpoint = 'wss://ws-feed.exchange.coinbase.com',
    subscription_message = '{
      "type": "subscribe",
      "product_ids": [
        "BTC-USD"
      ],
      "channels": ["ticker"]
    }',
    format = 'json'
);
 
SELECT avg(CAST(price as FLOAT)) from coinbase
WHERE type = 'ticker'
GROUP BY hop(interval '5' second, interval '1 minute');

Fluvio source/sink

Arroyo 0.4 adds a new Fluvio source and sink, which allows Arroyo to read and write data from Fluvio, a high-performance distributed streaming platform built on top of Rust and Kubernetes.

Fluvio has support for simple, stateless processing, but with Arroyo it can be extended to perform complex, stateful processing and analytics.

REST API

Today Arroyo is primarily used through the web console, which is great for individual users and small teams. But for more advanced use cases and larger orgs it's important to build automation and integrate Arroyo with internal infrastructure.

Arroyo has always provided a gRPC API that controls all aspects of the system. This is the API that powers the console. But gRPC can be difficult to work with, and it isn't widely supported by existing tools and libraries. We also haven't treated the gRPC API as a stable interface and have made regular breaking changes.

So with this release, we're starting the process of migrating the API to REST, and making it a first-class, stable interface for Arroyo. Arroyo 0.4 adds the first REST endpoints that support pipeline creation, management, and inspection. For example, a SQL pipeline can be created with the following curl command:

curl -XPOST http://localhost:8000/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders",
    "query": "SELECT * FROM orders;"
    "udfs": [],
    "parallelism": 1,
  }'

See the REST API docs for more details.

In future releases we'll continue to expand the REST API to cover all aspects of Arroyo, and will eventually deprecate the gRPC API.

SQL date/time functions

This release adds more date/time functions to the SQL engine, including:

  • date_part
  • date_trunc
  • extract
  • to_timestamp

See the SQL function docs for more details.

Fixes

  • Wait for checkpoint to finish before issuing final checkpoint by @mwylde in #192

Improvements

The full change-log is available here.