At Arroyo we're building a new stream processing engine to replace legacy Java systems like Flink and KSQL. So I was excited to see a project that's doing the same thing for Kafka.
It's called WarpStream, and they're building a replacement for Kafka that's backed directly by S3. It has a Kafka-compatible API, so naturally I wanted to see how hard it would be to use as a source and sink in Arroyo.
Spoiler: it just works!
Here's the full query we'll end up with:
create table warpstream (
    timestamp BIGINT,
    user_id TEXT,
    page_id TEXT,
    action TEXT,
    event_time TIMESTAMP GENERATED ALWAYS AS (CAST(from_unixtime(timestamp * 1000000000) as TIMESTAMP)),
    watermark TIMESTAMP GENERATED ALWAYS AS (CAST(from_unixtime(timestamp * 1000000000) as TIMESTAMP) - INTERVAL '5' SECOND)
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'api-d131463b-49e7-42b5-9112-0777af9617fe.discovery.prod-z.us-east-1.warpstream.com:9092',
    topic = 'demo-stream',
    format = 'json',
    type = 'source',
    event_time_field = 'event_time',
    watermark_field = 'watermark'
);

select
    session('30 seconds') as window,
    user_id,
    sum(case when action = 'click' then 1 else 0 end) as clicks,
    sum(case when action = 'scroll' then 1 else 0 end) as scrolls,
    sum(case when action = 'hover' then 1 else 0 end) as hovers
from warpstream
group by window, user_id;
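To make the session window concrete, here's a rough sketch in plain Python of what the select is computing: for each user, events are grouped into sessions that close after 30 seconds of inactivity, and clicks, scrolls, and hovers are counted per session. This is only an illustration, not how Arroyo implements it. It runs over an in-memory list, ignores watermarks and late data, and assumes `timestamp` is in whole seconds (which the multiplication by 1000000000 in the table definition suggests). The `sessionize` helper and the sample events are made up for the example.

```python
import json
from collections import defaultdict

GAP_SECONDS = 30  # mirrors session('30 seconds') in the query

# Maps the action values used in the query to its output column names.
COUNTED = {"click": "clicks", "scroll": "scrolls", "hover": "hovers"}


def sessionize(events, gap=GAP_SECONDS):
    """Group each user's events into sessions and count actions per session.

    Assumes each event is a dict with 'timestamp' (seconds), 'user_id',
    and 'action', matching the JSON fields declared in the table.
    """
    by_user = defaultdict(list)
    for event in events:
        by_user[event["user_id"]].append(event)

    sessions = []
    for user_id, user_events in by_user.items():
        user_events.sort(key=lambda e: e["timestamp"])
        current = None
        for event in user_events:
            ts = event["timestamp"]
            # Start a new session if this is the user's first event or the
            # time since their previous event has reached the gap.
            if current is None or ts - current["end"] >= gap:
                current = {"user_id": user_id, "start": ts, "end": ts,
                           "clicks": 0, "scrolls": 0, "hovers": 0}
                sessions.append(current)
            current["end"] = ts
            column = COUNTED.get(event["action"])
            if column is not None:
                current[column] += 1
    return sessions


if __name__ == "__main__":
    # Hypothetical records shaped like the JSON the table expects.
    raw = [
        '{"timestamp": 0, "user_id": "u1", "page_id": "p1", "action": "click"}',
        '{"timestamp": 10, "user_id": "u1", "page_id": "p1", "action": "scroll"}',
        '{"timestamp": 25, "user_id": "u1", "page_id": "p2", "action": "click"}',
        '{"timestamp": 100, "user_id": "u1", "page_id": "p2", "action": "hover"}',
        '{"timestamp": 5, "user_id": "u2", "page_id": "p1", "action": "hover"}',
    ]
    for session in sessionize([json.loads(line) for line in raw]):
        print(session)
```

With the sample data, user `u1` ends up with two sessions (the 75-second pause before the last event closes the first one) and `u2` with one. The streaming version has to decide when a session is actually finished, which is what the watermark column in the table definition is for.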