Managed Data Pipelines

Powerful Stream Processing Made Simple

ETL and stream processing from within your WarpStream Agents and cloud account. No additional infrastructure needed. You own the data pipeline – end to end. Raw data never leaves your account.

Everything Just Works


Create and edit pipelines. Pause and resume them. Automatic handling of authentication and WarpStream-native features. Utilize version control and branching — roll back and forward as needed.

All from a simple YAML configuration and via WarpStream’s Console or our API / Terraform provider.
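For a sense of the shape, a minimal pipeline is just an input, an optional processing stage, and an output. The sketch below is illustrative only: the topic names are placeholders, and it assumes the kafka_franz_warpstream input and output components (the input is shown in the Parquet example further down) that wire directly into your WarpStream cluster.

input:
  kafka_franz_warpstream:
    topics:
      - events_raw

pipeline:
  processors:
    # Pass each document through and stamp it with an ingestion time (illustrative only).
    - mapping: |
        root = this
        root.ingested_at = now()

output:
  kafka_franz_warpstream:
    topic: events_enriched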

Powered by Bento

Streamline everyday data engineering tasks

Effortlessly tackle time-consuming tasks like transformations, integrations, and multiplexing, while seamlessly handling aggregations and enrichments, all with native WebAssembly (WASM) support.
Avoid the housekeeping and dive back into the work that really matters.
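As a rough sketch of what the WASM support looks like, Bento ships a wasm processor that runs a function exported from a compiled WebAssembly module against each message; the module path and function name below are placeholders.

pipeline:
  processors:
    - wasm:
        # Hypothetical module and exported function, for illustration only.
        module_path: ./redact_pii.wasm
        function: process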
Video (WarpStream BYOC Overview): hear WarpStream engineer Aratz Manterola Lasa walk you through Managed Data Pipelines.
Mapping
Transform each element of a data stream based on specific functions to produce a brand new stream of transformed elements.
input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - mapping: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20
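To make the mapping concrete: given a hypothetical Pub/Sub document like {"links": ["a", "b"], "user": {"age": "32"}}, this config would emit {"message": {...the original document...}, "meta": {"link_count": 2}, "user": {"age": 32}}, nesting the raw payload under message, counting the links, and coercing the age string into a number before writing to the Redis stream.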
Multiplexing
Combine multiple data streams into a single, consolidated stream.
input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup

output:
  switch:
    cases:
      - check: doc.tags.contains("AWS")
        output:
          aws_sqs:
            url: https://sqs.us-west-2.amazonaws.com/TODO/TODO
            max_in_flight: 20

      - output:
          redis_pubsub:
            url: tcp://TODO:6379
            channel: baz
            max_in_flight: 20
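Cases in a switch output are evaluated in order: documents whose doc.tags array contains "AWS" are routed to SQS, and because the second case has no check it acts as a catch-all, so everything else goes to the Redis channel.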
Windowing
Group continuous streams into manageable sets or windows (tumbling, sliding, or session) to perform operations like aggregations over periods of time.
input:
  nats_jetstream:
    urls: [ nats://TODO:4222 ]
    queue: myqueue
    subject: traffic.light.events
    deliver: all

buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1h

pipeline:
  processors:
    - group_by_value:
        value: '${! json("traffic_light_id") }'
    - mapping: |
        root = if batch_index() == 0 {
          {
            "traffic_light_id": this.traffic_light_id,
            "created_at": @window_end_timestamp,
            "total_cars": json("registration_plate").from_all().unique().length(),
            "passengers": json("passengers").from_all().sum(),
          }
        } else { deleted() }

output:
  http_client:
    url: https://example.com/traffic_data
    verb: POST
    max_in_flight: 64
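In this config the system_window buffer groups events into hourly tumbling windows based on their created_at timestamps, group_by_value then splits each window into one batch per traffic_light_id, and the mapping collapses each batch into a single summary document: only the first message (batch_index() == 0) survives, carrying the unique plate count and passenger sum computed across the whole batch via from_all(), while the remaining messages are dropped with deleted().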
Enrichments
Enhance streams with additional information from external sources, making them more valuable for downstream processing and analysis.
input:
  mqtt:
    urls: [ tcp://TODO:1883 ]
    topics: [ foo ]

pipeline:
  processors:
    - branch:
        request_map: |
          root.id = this.doc.id
          root.content = this.doc.body
        processors:
          - aws_lambda:
              function: sentiment_analysis
        result_map: root.results.sentiment = this

output:
  aws_s3:
    bucket: TODO
    path: '${! meta("partition") }/${! timestamp_unix_nano() }.tar.gz'
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
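Here the branch processor builds a request containing only the id and body fields via request_map, sends it to the sentiment_analysis Lambda, and then result_map merges the Lambda's response back into the original document under results.sentiment, so the enrichment is added without disturbing the rest of the message before it is archived and compressed into S3.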
Parquet
Write data ingested into a topic as a Parquet file to feed your data lake.
input:
    kafka_franz_warpstream:
        topics:
            - logs
output:
    aws_s3:
        batching:
            byte_size: 32000000
            count: 0
            period: 5s
            processors:
                - mutation: |
                    root.value = content().string()
                    root.key = @kafka_key
                    root.kafka_topic = @kafka_topic
                    root.kafka_partition = @kafka_partition
                    root.kafka_offset = @kafka_offset
                - parquet_encode:
                    default_compression: zstd
                    default_encoding: PLAIN
                    schema:
                        - name: kafka_topic
                          type: BYTE_ARRAY
                        - name: kafka_partition
                          type: INT64
                        - name: kafka_offset
                          type: INT64
                        - name: key
                          type: BYTE_ARRAY
                        - name: value
                          type: BYTE_ARRAY
        bucket: $YOUR_S3_BUCKET
        path: parquet_logs/${! timestamp_unix() }-${! uuid_v4() }.parquet
        region: $YOUR_S3_REGION

warpstream:
    cluster_concurrency_target: 6
Topic Mirroring
Mirror topics while preserving partition mappings to aid in disaster recovery, centralize processing or analysis, or geo-distribute data so consumers get low-latency access.
input:
  kafka_franz:
    consumer_group: bento_bridge_consumer
    seed_brokers: [ TODO ]
    topics: [ foo, bar ]

output:
  kafka_franz:
    seed_brokers: [ TODO ]
    topic: ${! meta("kafka_topic") }
    partition: ${! meta("kafka_partition") }
    partitioner: manual
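Because the destination topic and partition are taken from each record's kafka_topic and kafka_partition metadata, and partitioner: manual disables client-side partitioning, every record is written to the same topic name and partition number in the destination cluster that it occupied in the source.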
You don’t need to be a WarpStream user to benefit from Bento. We are committed to keeping it a 100% free, MIT-licensed, open source project and not gating features or changing licenses. Learn more about how Bento came to be.

FAQs

Don't see an answer to your question? Check our docs, or contact us directly.

Is there any difference between Managed Data Pipelines and Bento?

Bento is a 100% open source stream processing framework. Managed Data Pipelines allow WarpStream users to leverage Bento within WarpStream Agents without adding any additional infrastructure. Managed Data Pipelines are powered by Bento.

Because Bento is embedded in the WarpStream Agents, Managed Data Pipelines offer WarpStream-exclusive capabilities that standalone Bento does not. For example, you can use Pipeline Groups to isolate pipelines to particular groups of Agents, so larger pipelines run on a dedicated group of Agents and can't interfere with smaller ones.