Managed Data Pipelines

ETL and stream processing from within your WarpStream Agents

Sink and Sync

Powerful stream processing made simple

Run ETL and stream processing from within your WarpStream Agents and cloud account, with no additional infrastructure needed. You own the data pipeline end to end, and raw data never leaves your account.

Everything just works

Create and edit pipelines. Pause and resume them. Authentication and WarpStream-native features are handled automatically. Use version control and branching to roll back and forward as needed. All from a simple YAML configuration, managed via WarpStream’s Console, API, or Terraform provider.

Streamline everyday data engineering tasks

Effortlessly tackle time-consuming tasks like transformations, integrations, and multiplexing, while seamlessly handling aggregations and enrichments, all with native WebAssembly (WASM) support.
Avoid the housekeeping and dive back into the work that really matters.
Mapping
Apply a function to each element of a data stream to produce a new stream of transformed elements.
input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - mapping: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20
Multiplexing
Combine multiple data streams into a single, consolidated stream, or route messages to different outputs based on their content.
input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup

output:
  switch:
    cases:
      - check: doc.tags.contains("AWS")
        output:
          aws_sqs:
            url: https://sqs.us-west-2.amazonaws.com/TODO/TODO
            max_in_flight: 20

      - output:
          redis_pubsub:
            url: tcp://TODO:6379
            channel: baz
            max_in_flight: 20
Windowing
Group continuous streams into manageable sets, or windows (such as tumbling, sliding, and session windows), to run operations like aggregations over periods of time.
input:
  nats_jetstream:
    urls: [ nats://TODO:4222 ]
    queue: myqueue
    subject: traffic.light.events
    deliver: all

buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1h

pipeline:
  processors:
    - group_by_value:
        value: '${! json("traffic_light_id") }'
    - mapping: |
        root = if batch_index() == 0 {
          {
            "traffic_light_id": this.traffic_light_id,
            "created_at": @window_end_timestamp,
            "total_cars": json("registration_plate").from_all().unique().length(),
            "passengers": json("passengers").from_all().sum(),
          }
        } else { deleted() }

output:
  http_client:
    url: https://example.com/traffic_data
    verb: POST
    max_in_flight: 64
Enrichments
Enhance streams with additional information from external sources, making them more valuable for downstream processing and analysis.
input:
  mqtt:
    urls: [ tcp://TODO:1883 ]
    topics: [ foo ]

pipeline:
  processors:
    - branch:
        request_map: |
          root.id = this.doc.id
          root.content = this.doc.body
        processors:
          - aws_lambda:
              function: sentiment_analysis
        result_map: root.results.sentiment = this

output:
  aws_s3:
    bucket: TODO
    path: '${! meta("partition") }/${! timestamp_unix_nano() }.tar.gz'
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
Parquet
Write data ingested into a topic as a Parquet file to feed your data lake.
input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    aws_s3:
        batching:
            byte_size: 32000000
            count: 0
            period: 5s
            processors:
                - mutation: |
                    root.value = content().string()
                    root.key = @kafka_key
                    root.kafka_topic = @kafka_topic
                    root.kafka_partition = @kafka_partition
                    root.kafka_offset = @kafka_offset
                - parquet_encode:
                    default_compression: zstd
                    default_encoding: PLAIN
                    schema:
                        - name: kafka_topic
                          type: BYTE_ARRAY
                        - name: kafka_partition
                          type: INT64
                        - name: kafka_offset
                          type: INT64
                        - name: key
                          type: BYTE_ARRAY
                        - name: value
                          type: BYTE_ARRAY
        bucket: $YOUR_S3_BUCKET
        path: parquet_logs/${! timestamp_unix() }-${! uuid_v4() }.parquet
        region: $YOUR_S3_REGION

warpstream:
    cluster_concurrency_target: 6
Topic Mirroring
Mirror topics while preserving partition mappings to aid disaster recovery, centralize processing or analysis, or geo-distribute data so that consumers get low-latency access.
input:
  kafka_franz:
    consumer_group: bento_bridge_consumer
    seed_brokers: [ TODO ]
    topics: [ foo, bar ]

output:
  kafka_franz:
    seed_brokers: [ TODO ]
    topic: ${! meta("kafka_topic") }
    partition: ${! meta("kafka_partition") }
    partitioner: manual

Managed Data Pipelines are a feature of Bring Your Own Cloud
