You have streaming data. Yes, you. Logs, metrics, HTTP requests, analytics from your mobile apps… it's everywhere. But if you're like most people in the data industry, you don't treat it as a stream. It's written to a file and eventually uploaded to an object store like S3 or loaded into a data warehouse (like BigQuery or Presto) via a batch ETL process.
This works. It gives you reliable, efficient access to your data. But it means waiting an hour, a day, maybe longer before your events can be queried. I hate waiting.
Arroyo is a stream processing engine that lets you treat your streaming data as, well, streams. And with the release of Arroyo 0.5, we've added a high-performance transactional file sink that also solves this waiting problem for data warehouse ingestion.
In short, Arroyo can now write the results of your streaming SQL queries to object stores like S3 in Parquet and JSON formats.
Doing this well is surprisingly tricky, so I wanted to take this opportunity to walk you through how Arroyo manages to sink your data transactionally with low latency and in such a way that it can still be efficiently read by your query engine.
Why this is hard
Writing data to S3 seems pretty simple—after all, that's what it was designed for! But there are many challenges that arise when trying to achieve the following goals:
- Data should be written to the store with low latency
- In not too many files[1],
- Transactionally (every record is guaranteed to end up on S3 exactly once)
These goals are somewhat in conflict.
For example, Arroyo relies on being able to frequently checkpoint its state to provide exactly-once semantics (goal 3). But the traditional strategy for writing transactionally to an object store (like the one used by Flink) causes a new batch of files to be written for every checkpoint, violating goal 2.
Or you could wait many minutes between file writes to reduce their number, but then your data takes that much longer to become available to query.
Arroyo is able to checkpoint frequently[2] (every 10 seconds by default) due to its high-performance storage backend, and we wanted to be able to maintain that without affecting the shape of our output data.
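To put rough numbers on that conflict, here is a back-of-the-envelope sketch. It assumes every parallel sink subtask writes one file per checkpoint, and the subtask count of 8 is made up purely for illustration.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def files_per_day(checkpoint_interval_secs: int, subtasks: int) -> int:
    """Files written per day if each subtask emits one file per checkpoint."""
    return (SECONDS_PER_DAY // checkpoint_interval_secs) * subtasks


# Hypothetical job with 8 parallel sink subtasks.
every_checkpoint = files_per_day(10, subtasks=8)    # a file at every 10-second checkpoint
every_ten_minutes = files_per_day(600, subtasks=8)  # a file only every 10 minutes
print(every_checkpoint, every_ten_minutes)
```

Under those assumptions the 10-second interval produces 60 times as many files as the 10-minute one, which is the gap the rest of this post is about closing.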
S3 isn't (really) a file system
And while S3 can be pretty simple to use with the standard client or CLI, we need more precise control over how files are written, and that exposes us to the weirdness of the underlying object-store model.
While objects in S3 are commonly treated as files, they lack some basic operations you might be accustomed to. In particular, you can't continuously write a small number of bytes to the file. You instead need to either upload the entire file in one shot or use a multi-part upload API with tight limits on how and when data is written.
Parquet is a columnar data format that is a very popular choice for backing query engines and data warehouses. Parquet files are immutable: designed to be written once and not further modified. However, to write files across checkpoints and with low latency, we need to be able to partially write a Parquet file, then later continue it. We also need the ability to recover in-progress writes in the case of failure, which isn't supported by standard Parquet libraries.
How it works
With that background, let's walk through Arroyo's approach that provides durability and write flexibility that is unmatched by similar systems.
Checkpointing Multi-part Uploads
S3 uses an HTTP API to write and read data. Small files can be written with a single PUT request, but S3 also provides a Multi-Part Upload (MPU) API that allows developers to incrementally construct single S3 objects across many requests. This is done in three steps:
- Initialize the MPU: this call returns a unique upload ID that must accompany every subsequent operation.
- Add Part: this adds a chunk of data to the MPU, along with a part number. It returns an ETag (entity tag), which will be used to complete the upload. The same part number can be uploaded multiple times, but only one ETag per part number can be supplied when the upload is completed. Except for the final part, all parts must be between 5MB and 5GB.
- Finish Multi-part Upload: finally, you finish the MPU by sending the upload ID and ETags. On success, the file will be visible in S3 and ready to be queried.
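The three steps can be sketched as follows. This is a minimal illustration in Python rather than Arroyo's actual Rust code: it assumes `client` is a boto3 S3 client (or anything with the same three methods), and the function name `multipart_upload` is made up for this example.

```python
from typing import Iterable


def multipart_upload(client, bucket: str, key: str, chunks: Iterable[bytes]) -> None:
    """Upload `chunks` as a single object using the three MPU steps."""
    # Step 1: initialize the upload and remember its ID.
    upload_id = client.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]

    # Step 2: add parts. Part numbers start at 1; keep every returned ETag.
    parts = []
    for part_number, chunk in enumerate(chunks, start=1):
        resp = client.upload_part(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumber=part_number,
            Body=chunk,
        )
        parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})

    # Step 3: finish the upload; only now does the object become visible.
    client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={"Parts": parts},
    )
```

With real credentials you would pass `boto3.client("s3")` along with your own bucket and key. Note that the only things the final call needs are the upload ID and the list of part numbers and ETags, which is what makes the upload checkpointable.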
In order to support recovery, we need to checkpoint our in-progress state. And as discussed above, we'd like to be able to do this while an upload is in progress so that we can reduce the total number of files.
What do we need to store while a file is in progress? For finished parts, you only need to save the ETag value and part number, so the amount of data that needs to be saved is bounded by the data you've buffered for the next part you plan to upload (at most 5MB)[3].
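As a rough sketch of what such a checkpoint record could look like: the `InProgressUpload` class, its field names, and the JSON encoding below are all hypothetical, chosen for illustration, and are not Arroyo's actual state format.

```python
import base64
import json
from dataclasses import dataclass, field

MIN_PART_SIZE = 5 * 1024 * 1024  # S3's minimum size for every part except the last


@dataclass
class InProgressUpload:
    """Hypothetical checkpoint record for one unfinished multi-part upload."""

    upload_id: str
    finished_parts: list = field(default_factory=list)  # [part_number, etag] pairs
    buffered: bytes = b""  # data not yet large enough to upload as a part

    def write(self, data: bytes, upload_part) -> None:
        """Buffer data and upload a part once the buffer reaches the minimum size."""
        self.buffered += data
        if len(self.buffered) >= MIN_PART_SIZE:
            part_number = len(self.finished_parts) + 1
            etag = upload_part(self.upload_id, part_number, self.buffered)
            self.finished_parts.append([part_number, etag])
            self.buffered = b""

    def to_checkpoint(self) -> str:
        """Serialize everything needed to resume this upload after a failure."""
        return json.dumps({
            "upload_id": self.upload_id,
            "finished_parts": self.finished_parts,
            "buffered": base64.b64encode(self.buffered).decode("ascii"),
        })

    @classmethod
    def from_checkpoint(cls, raw: str) -> "InProgressUpload":
        state = json.loads(raw)
        return cls(
            state["upload_id"],
            state["finished_parts"],
            base64.b64decode(state["buffered"]),
        )
```

Here `upload_part` is a caller-supplied function that sends one part and returns its ETag. The uploaded bytes themselves never go into the checkpoint, only the small buffered tail plus one pair per finished part.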
The Problem with Parquet
In the case of writing Parquet files, we first write out a sequence of row groups, each storing compressed columns of data. Once all of the data has been written, the file is completed by writing a footer containing all of the metadata, such as schema, location of each row group, and version information.
The metadata needs to account for all of the data written, so we need to be able to store this in-progress metadata in the checkpoint in order to be able to recover on failure.
Unfortunately, the Rust Parquet library doesn't provide a way to save this metadata, as it expects users to write Parquet files in a single session.
The approach we took was to add a method that returns the bytes that would constitute the footer if we decided to close the file now. This allows us, in the event of a failure, to craft the final part of the multi-part upload.
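To make the idea concrete, here is a toy model of it. This is not the Rust Parquet library's API: `ToyColumnarWriter` and its methods are invented for this sketch, and the footer metadata is JSON instead of Parquet's Thrift encoding. It only borrows the overall layout of a Parquet file (magic bytes, row groups, metadata, metadata length, magic bytes).

```python
import json
import struct

MAGIC = b"PAR1"  # real Parquet files start and end with these four bytes


class ToyColumnarWriter:
    """Toy stand-in for a Parquet writer; the footer is JSON, not Thrift."""

    def __init__(self):
        self.offset = len(MAGIC)  # row groups start right after the leading magic
        self.row_groups = []      # metadata accumulated for the eventual footer

    def write_row_group(self, data: bytes, num_rows: int) -> bytes:
        """Record metadata for a row group and return the bytes to upload."""
        self.row_groups.append(
            {"offset": self.offset, "length": len(data), "num_rows": num_rows}
        )
        self.offset += len(data)
        # The very first chunk of the file carries the leading magic bytes.
        return MAGIC + data if len(self.row_groups) == 1 else data

    def footer_if_closed_now(self) -> bytes:
        """Bytes that would finish the file right now, without closing the writer."""
        metadata = json.dumps({"row_groups": self.row_groups}).encode()
        # Layout: metadata, 4-byte little-endian metadata length, trailing magic.
        return metadata + struct.pack("<I", len(metadata)) + MAGIC


writer = ToyColumnarWriter()
body = writer.write_row_group(b"aaaa", 2) + writer.write_row_group(b"bbbbbb", 3)
checkpointed_footer = writer.footer_if_closed_now()  # saved with the checkpoint
recovered_file = body + checkpointed_footer          # what recovery would finish with
```

The writer stays open after `footer_if_closed_now()`, so more row groups can follow. If the job fails first, the checkpointed footer is enough to close out the file with everything written so far.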
Finishing Files with Two-Phase Commit
We can't just keep writing new parts forever (there's a limit of 10,000 parts, among other reasons); eventually we'll have to finish the MPU. Unfortunately, the CompleteMultipartUpload API is a one-way door. Once the MPU is successfully completed you'll no longer be able to add new parts.
For this reason, completing files needs to be integrated into the checkpoint lifecycle. This is done by adding a two-phase commit (2PC) stage to Arroyo's checkpointing system.
After the standard Chandy–Lamport checkpoint barriers have passed through the dataflow, the controller[4] follows these steps:
- Stores the fact that checkpointing has completed in our config store
- Enters a “committing” mode
- Sends an RPC to the committing operators (like the FileSystem sink), instructing them to commit.
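Those three controller steps can be sketched roughly like this. Everything here is a simplified, hypothetical model rather than Arroyo's real code: the class names, the in-memory `config_store` dict, the `stage` method, and the direct method call standing in for the RPC are all assumptions made for illustration.

```python
from enum import Enum


class JobState(Enum):
    RUNNING = "running"
    COMMITTING = "committing"


class FileSinkOperator:
    """Stand-in for a committing operator such as the FileSystem sink."""

    def __init__(self):
        self.staged = {}     # checkpoint epoch -> files ready to be finished
        self.committed = []  # files whose multi-part upload has been completed

    def stage(self, epoch: int, files: list) -> None:
        """Phase one: remember which files this checkpoint wants to finish."""
        self.staged[epoch] = list(files)

    def commit(self, epoch: int) -> None:
        """Phase two: finish the staged files; repeating the call is harmless."""
        self.committed.extend(self.staged.pop(epoch, []))


class Controller:
    def __init__(self, operators):
        self.operators = operators
        self.config_store = {}  # hypothetical stand-in for a durable store
        self.state = JobState.RUNNING

    def on_barriers_complete(self, epoch: int) -> None:
        self.config_store["last_completed_checkpoint"] = epoch  # step 1
        self.state = JobState.COMMITTING                        # step 2
        for op in self.operators:                               # step 3
            op.commit(epoch)  # in a real system this would be an RPC


sink = FileSinkOperator()
sink.stage(1, ["a.parquet"])
controller = Controller([sink])
controller.on_barriers_complete(1)
```

The ordering is the point of the sketch: the checkpoint is recorded as complete before any operator is told to commit, so the one-way step of finishing an upload only happens for a checkpoint that is already durable.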
With the new FileSystem sink, we believe that Arroyo is now one of the best tools for efficiently and correctly streaming data into S3. If you're interested in trying it out, we'd love to hear from you at email@example.com or on Discord.
[1] Writing lots of files slows down query performance significantly.
[2] So why not just checkpoint less often? This is the strategy many Flink users adopt, but it means that on failure you have to potentially reprocess and replay much more data.
[3] Technically, we also need to store recently posted parts until the async add_multipart() call is finished. S3 is pretty fast, and it is unlikely that you'd have many simultaneous parts in-flight.
[4] The controller is the system that schedules jobs, manages checkpointing, and handles recovery.