Announcing Arroyo 0.6.0

Arroyo 0.6 brings support for Google Cloud Storage, user-defined aggregate functions, SQL correctness tests, and more

Micah Wylde

CEO of Arroyo

Today I'm thrilled to announce the release of Arroyo 0.6.0, the latest version of the open-source stream processing engine that makes it easy to build complex, stateful streaming applications by writing SQL.

This release is focused on quality. After a breakneck pace of new features in the past few releases, we're taking a step back to focus on testing, correctness, and stability. I'm particularly excited about our new SQL test suite, which has uncovered a number of bugs and edge cases in our SQL support.

That said, it wouldn't be a new Arroyo release without some new features! We've added support for Google Cloud Storage (GCS) and improved our Helm chart so that Arroyo now runs well on Google Kubernetes Engine (GKE). We've also included two new connectors, a polling HTTP source and a webhook sink, that help Arroyo fit into companies that are earlier in their streaming journey. And we've shipped the first version of our UDAF support, which allows you to write your own SQL aggregate functions in Rust.

Read on for the highlights, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release!

What's next

Now that 0.6 is out the door, we're already hard at work on the next release, planned for mid-October. We're working on a number of exciting features and improvements. We're adding support for accumulator-based UDAFs, which will allow efficient incremental computation of aggregates. We're also working on improvements to our checkpoint storage to support compaction and more efficient deletes. And we'll be releasing a Postgres connector to allow writing directly to Postgres tables without having to use Debezium.

Anything you'd like to see? Let us know on Discord!

Now on to the details.


Features

GCS/GKE

Arroyo has long supported Amazon S3 as a storage backend for pipeline artifacts and checkpoints. With this release we're greatly expanding our storage options, with support for Google Cloud Storage (GCS) and alternative S3-compatible systems like MinIO and LocalStack.

We've also made the way storage is configured more flexible and consistent, via two environment variables: CHECKPOINT_URL and ARTIFACT_URL. These variables can take a variety of URL forms, like:

  • s3://my-bucket
  • s3::https://my-custom-s3:1234/my-bucket
  • https://s3.us-east-1.amazonaws.com/my-bucket
  • file:///my/local/filesystem
  • gs://my-gcs-bucket
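For example, to point checkpoints and pipeline artifacts at GCS, you might set the two variables like this (the bucket names here are hypothetical; any writable buckets work):

# bucket names are examples; substitute your own GCS buckets
CHECKPOINT_URL=gs://my-arroyo-checkpoints
ARTIFACT_URL=gs://my-arroyo-artifacts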

See the docs for how to configure Arroyo on GKE.

  • Add support for GCS and minio/localstack by @mwylde in #296

User-defined aggregate functions

SQL aggregate functions allow you to compute summary statistics over a group of rows, like SUM, AVG, and COUNT. Arroyo 0.6 adds initial support for user-defined aggregate functions (UDAFs), which let you write your own aggregate functions in Rust.

For example, Arroyo doesn't include an aggregate function for computing Exponential Moving Average (EMA), but now you can write your own:

fn ema(data: Vec<f64>) -> f64 {
    // Smoothing factor; values closer to 1.0 weight recent data more heavily
    const ALPHA: f64 = 0.1;

    // Seed with the first value (assumes at least one row), then fold in the rest
    let mut ema = data[0];
    for &value in &data[1..] {
        ema = ALPHA * value + (1.0 - ALPHA) * ema;
    }

    ema
}
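Once defined, ema can be called like any built-in aggregate. Here's a sketch of what that might look like over a tumbling window (the trades source and its price column are hypothetical):

-- 'trades' and 'price' are placeholders; substitute your own source
SELECT ema(price) as smoothed_price
FROM trades
GROUP BY tumble(interval '10 seconds');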

Currently only UDAFs over vectors are supported, but we plan to expand this to support UDAFs that rely on partial aggregates and two-phase aggregations.

  • implement UDAF support by @jacksonrnewhouse in #312

Connectors

We've added two new connectors that are well suited for companies trying to integrate Arroyo with HTTP-based systems, complementing our existing Server-Sent Events and WebSocket sources.

Polling HTTP source

The new polling HTTP source lets you turn any HTTP API into a streaming source. It periodically polls the endpoint and emits any new data into the stream.

For example, we're excited whenever we get a new PR on GitHub, and we want to know about it as soon as possible. We can use the polling HTTP source to periodically fetch our PRs and emit an event whenever the result changes:

CREATE TABLE prs (
    value TEXT
) WITH (
    connector = 'polling_http',
    endpoint = 'https://api.github.com/repos/ArroyoSystems/arroyo/pulls?per_page=1&state=all',
    poll_interval_ms = '5000',
    emit_behavior = 'changed',
    headers = 'User-Agent:arroyo/0.6',
    format = 'json',
    'json.unstructured' = 'true'
);
 
SELECT extract_json_string(value, '$[0].url') as url
FROM prs;

  • Polling http source by @mwylde in #311

Webhook sink

On the other end of your application, we now have a webhook sink that lets you send pipeline results to any HTTP endpoint. This allows you to consume outputs without needing to adopt streaming technologies like Kafka.

For example, we can build a simple Slack notification for our PRs using the webhook sink and the source we defined above:

CREATE TABLE slack (
    value TEXT
) WITH (
    connector = 'webhook',
    endpoint = 'https://hooks.slack.com/services/XXXXX/XXXXX/XXXXX',
    method = 'POST',
    headers = 'Content-Type:application/json',
    format = 'json',
    'json.unstructured' = 'true'
);
 
INSERT INTO slack
SELECT concat('A new PR was created ',
    extract_json_string(value, '$[0].url')) as text
FROM prs;

  • Webhook sink by @mwylde in #297

REST API

We've finished our migration from gRPC to REST for the public-facing Arroyo API. As of this release, all endpoints are now available via REST, and we've removed the gRPC endpoint from the API service. New endpoints in this release include the ability to create connections and fetch all job details including checkpoint information.
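As a quick sketch of what this looks like in practice, listing pipelines becomes a single HTTP call (the host, port, and path here are assumptions for a default local deployment; see the API docs for the authoritative reference):

# host, port, and path are assumptions for a local setup
curl http://localhost:8000/api/v1/pipelines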

  • Switch all connection endpoints to REST API by @jbeisen in #242
  • Add pagination to pipelines and connection tables by @jbeisen in #255
  • Switch checkpoint details to REST API by @jbeisen in #265
  • Delete the GRPC API from the API service by @jbeisen in #266

Raw string serialization

Arroyo supports a number of serialization formats for handling data coming into and out of pipelines, including JSON, Parquet, and raw strings. In this release we've added support for serializing arbitrary string data. This is particularly powerful when combined with UDFs, which make it easy to construct complex string-based formats that Arroyo doesn't natively support.

For example, it's now possible to write a UDF that produces complex, nested JSON that Arroyo's table schemas can't represent:

/*
[dependencies]
serde_json = "1.0"
*/

fn my_to_json(f: f64) -> String {
    // Build an arbitrarily nested JSON value that a flat table schema
    // couldn't express
    let v = serde_json::json!({
        "my_complex": {
            "nested_format": f
        }
    });

    serde_json::to_string(&v).unwrap()
}
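Paired with a sink table that uses the raw_string format, the UDF's output is written to the sink byte-for-byte. Here's a sketch of how that might look (the events source, its my_value column, and the webhook endpoint are all hypothetical):

-- 'events', 'my_value', and the endpoint URL are placeholders
CREATE TABLE raw_sink (
    value TEXT
) WITH (
    connector = 'webhook',
    endpoint = 'https://example.com/ingest',
    method = 'POST',
    headers = 'Content-Type:application/json',
    format = 'raw_string'
);

INSERT INTO raw_sink
SELECT my_to_json(my_value) as value
FROM events;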

  • Add support for serializing raw_string formats by @mwylde in #302

Improvements & Fixes

SQL

As I mentioned in the introduction, this is really the meat of this release. We've performed a major refactor of our SQL compiler that has fixed a number of bugs and inconsistencies. And we've added a new set of SQL correctness tests that perform end-to-end validation of our SQL implementation.

This work has uncovered a number of bugs and edge cases (fixed in this release) and gives us much more confidence in the quality of our SQL support going forward.

  • First pass at CodeGenerators for arroyo-sql by @jacksonrnewhouse in #258
  • Integration tests by @jacksonrnewhouse in #299
  • Use table scan name as relation when generating the projection by @jacksonrnewhouse in #288
  • Handle aliased aggregate expressions by @jacksonrnewhouse in #263
  • Refactor virtual expressions, cast on type mismatch virtual expressions by @jacksonrnewhouse in #264
  • Unkey updating aggregates so downstream nodes are happy by @jacksonrnewhouse in #317

Console & API

Beyond SQL, we've also made a number of quality-of-life improvements to the console, including an improved preview experience, better error handling, and pagination.

  • Paginate job log messages and generalize paginated content by @jbeisen in #267
  • Apply cargo fmt with new 1.72 rules by @jacksonrnewhouse in #270
  • Send connection profile config as json and validate it by @jbeisen in #269
  • Change catalog overflow to auto by @jbeisen in #268
  • Fix sink metrics stuck loading by @jbeisen in #289
  • Show that preview pipeline has completed by @jbeisen in #295
  • Use resizable panels on create pipeline page by @jbeisen in #301
  • Fix code editor layout issue by @jbeisen in #304
  • Do not retry or revalidate graph request by @jbeisen in #309
  • Close job event source when possible by @jbeisen in #310
  • Return all operator errors by @jbeisen in #298
  • Fix preview race condition by @jbeisen in #322

Connectors & Schemas

  • Don't generate schemas with unsigned types for Kafka connect by @mwylde in #294
  • Fix compilation for nested json schema fields by @mwylde in #278
  • Fix connection tables with connection profiles by @mwylde in #303
  • Enable configuration of aws_region for Kinesis sources and sinks by @jacksonrnewhouse in #313
  • Fix fluvio table by @mwylde in #316
  • Skip lines from file in single_file source by @rcjmurillo in #321

Engine

  • Do not panic when epoch is not found by @jbeisen in #281
  • Refactor engine starting logic by @jbeisen in #272
  • Fix some data fetching race conditions by @jbeisen in #285
  • Fix how we set subtask metadata has_state by @jbeisen in #305

Deployment

  • Revamping the helm Chart and implementing CI/CD by @edmondop in #271
  • Better support for service accounts in helm and other helm improvements by @mwylde in #314

Full Changelog: https://github.com/ArroyoSystems/arroyo/compare/v0.5.0...v0.6.0