https://bytewax.io logo
Join Slack
Powered by
  • s

    steep-hospital-96238

    06/18/2025, 6:29 PM
    How do I monitor throughput per worker? I am thinking doing a
    count_window()
    but it requires me to assign a key to a stream which I don't want to do. Is there a magic "key" that keeps events on the same worker that they live?
    0
  • w

    witty-hydrogen-53156

    05/23/2025, 11:56 AM
    Hey all! I'm trying to get in contact with Bytewax' sales team to understand Bytewax Platform pricing for my startup. I've previously reached out via the contact form; is anyone able to help me connect with the right person?
    ✅ 1
    0
    • 1
    • 2
  • g

    green-mouse-71671

    05/05/2025, 12:44 PM
    Hi, I am using EventClock + TumblingWindower to process realtime events such as stock exchange trades. I want to create e.g. 1min aggregates of the events. Lets say I want to create 1min aggregate for trades that come between 123000 and 123059.599. I have an issue (probably) with
    wait_for_system_duration
    . When it's 123033.200 and the trade comes at this time with event time 123033:000 it is marked as
    late
    (because of the 100ms
    wait_for_system_duration
    ) which is not correct for my case as I still want to include such trade in the aggregate. I can set the
    wait_for_system_duration
    to 1min which then won't mark the trades as late but when I get to the end of the time window (123100.000) the aggregates won't be published - its gonna wait 1min to publish them which is also not what I want - I want to publish the aggregates immediately after window closes. How to achieve this? Is it even possible with bytewax?
    0
  • f

    full-ice-47894

    04/29/2025, 7:04 PM
    I have this weird problem where one of my pipelines (https://git.ligo.org/ultra-swift/shigawire/-/blob/main/shigawire/dataflows/testing.py) is printing the following error repeatedly when I try to enable OpenTelemetry tracing:
    OpenTelemetry trace error occurred. cannot send message to batch processor as the channel is full
    What's strange is that if I change the window clock from an EventClock to a SystemClock in https://git.ligo.org/ultra-swift/shigawire/-/blob/main/shigawire/operators/__init__.py?ref_type=heads#L96-98, the error message goes away and the traces start showing up in Jaeger. Has anyone seen an issue like this before?
    0
    • 1
    • 1
  • m

    mammoth-kitchen-79210

    04/22/2025, 3:15 PM
    hi yall 👋 have a question regarding processing semantics and the atomicity of processing in a kafka pipeline. if I have a computation node reading from KafkaSource, writing to KafkaSink, and maintaining some state (e.g. for deduplication), is persisting the state / topic offset snapshot atomic with respect to relaying the message further to the output KafkaSink? my processing is generally idempotent, in that it has no non-idempotent "side effects," but I am not OK sending a duplicate of an already processed event to the KafkaSink in a scenario where the write to the Kafka sink succeeded, but the persist of the state did not. likewise I would not be OK with the opposite: the state / offset persist succeeding while the write to Kafka sink did not, in which case I suspect relaying the message to further topics for processing would erroneously be skipped.
    0
    • 1
    • 1
  • l

    loud-fall-50710

    04/19/2025, 2:57 PM
    Hey our team is just looking into Bytewax and it looks like a great fit for MQTT/Kafka IoT stream processing. However we have one important question: Can stream-processing behavior be altered at runtime? • e.g. send a command to the bytewax stream-processing service to start an anomaly detection process on a newly discovered data stream • then send a command to the service altering e.g. the window size or the anomaly threshold without having to redeploy the service • If possible, how would one go about this? Any relevant resources would be highly valued 🙂
    🙌 1
    0
  • w

    wonderful-hairdresser-34729

    04/15/2025, 2:07 PM
    So I used to do this in pyspark..use a cronjob to schedule it to wake up at 9am everyday. At that time it reads all messages in a Kafka topic between 9am yesterday till 9am today. It then does a count based on a key. What bytewax flow + operator would you recommend for this?
    0
    • 1
    • 1
  • t

    thankful-morning-24927

    04/15/2025, 11:24 AM
    I came across this ClickHouse connector for Bytewax: https://github.com/bytewax/bytewax-clickhouse — looks great! I was wondering, has anyone built something similar for PostgreSQL? Or maybe there are any existing operators or examples for working with Postgres in Bytewax?
    0
  • a

    able-airline-43286

    04/14/2025, 7:51 PM
    I have a basic question on the stateful ops in Bytewax. Does the user need to worry about synchronizing the stateful logics? Or does Bytewax already guarantees only one stateful logics are running for 1 key at a time? say, with the following stateful map op:
    Copy code
    mapped = op.stateful_map('map_op', input, my_stateful_mapper)
    do i need to add any lock inside the
    my_stateful_mapper
    ?
    0
    p
    • 2
    • 1
  • q

    quiet-barista-67180

    04/07/2025, 12:54 AM
    Is the bytewax modules still active? It looks like the url is a shutdown shopify app? https://bytewax.io/modules Is ManualInputConfig and ManualOutputConfig the right way to build simple connectors? I saw it in the Nats blog post
    0
    p
    • 2
    • 3
  • r

    ripe-umbrella-71285

    04/05/2025, 6:40 PM
    how can i use bytewax to access data which has come through an activemq interface?
    0
  • d

    damp-finland-12935

    04/02/2025, 6:48 AM
    Hi everybody 👋 is there a flag to enable compression per
    KafkaSinkMessage
    , or do you need to manually compress bytes and add the appropriate header?
    0
    • 1
    • 1
  • i

    important-lunch-39532

    03/25/2025, 11:51 PM
    Hi guys - are there any testing tools/examples of unit tests in Python to assert a dataflow is structured as expected once it's constructed? I have some custom operators/sources, which are put together into a dataflow based on config and would like to verify the dataflow that comes out the other end matches expectations - I have tried playing around with the mermaid diagram generator as a possible solution, but feels like there might be something a bit better out there. Is there a way to stamp some metadata on steps/operators that I can identify other than using the ID field?
    0
    c
    • 2
    • 2
  • r

    rapid-nail-81224

    03/25/2025, 3:17 AM
    Hello! I’m building a dataflow reading from Kafka and doing some processing. I’m thinking productionalization and have the following questions: 1. when deployed on k8s with multiple pods, what’s is the behavior of bytewax when one of the pod crashes? 2. how do I make the application exit gracefully (for example aligned with checkpoints when recovery is enabled)? Is it even worth the effort.
    0
    b
    s
    • 3
    • 2
  • a

    ancient-computer-86146

    03/06/2025, 9:00 PM
    Questions 🙏 I have a input source where the partitions change over time. I don't want to restart my bytewax process to discover them. Is there a way to periodically refresh list_parts()?
    0
    c
    • 2
    • 2
  • g

    great-shampoo-77301

    03/06/2025, 12:54 PM
    I am using collect operator on a keyed stream, and finally passing the collected stream to a DynamicSink that write the stream to csv files that it determines based on some key in the stream. What I observe is that when collect emits multiple batches of the same key, only the final batch is being dumped into csv. Is this some thing with DynamicSink or collect operator?
    0
    p
    • 2
    • 8
  • w

    wonderful-hairdresser-34729

    03/06/2025, 3:41 AM
    hello! i have bytewax running as a sts with 2 pods using a slightly modified helm chart that allows me to ditch the configmap entirely and use custom docker images. anyhoo, it works fine, but i'm wondering how i can figure out that the 2 pods are working together as a single process. from the logs of the
    process
    container in pod-0, there is no indication that bytewax api is doing anything. in the
    api-cache
    container, it's the same story. however, for
    process
    in pod-1, i see a bunch of failed attempts to lookup the dns name of pod-0 (which i guess is because it's waiting on pod-0 to initialize and start up, which also means the k8s
    Service
    finally gets created and the internal k8s DNS name for that service is finally resolvable) and then radio silence. the next set of logs are from my own bytewax flow.
    Copy code
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    worker 1:       error connecting to worker 0: failed to lookup address information: Name or service not known; retrying
    0
    p
    w
    • 3
    • 20
  • m

    mammoth-postman-28882

    03/04/2025, 2:01 AM
    hey! what would be the approp. description for connectors? is it
    Bytewax connector for X
    or
    X connector for Bytewax
    ? previously i noticed only the former, but now seeing both. AIs are leaning towards the latter. so would like to confirm. some existing examples from https://github.com/bytewax org •
    Bytewax connector for Rerun
    •
    Bytewax Sink and Source Connector for InfluxDB
    •
    Redis connector for Bytewax
    0
    p
    • 2
    • 2
  • m

    mammoth-postman-28882

    02/27/2025, 8:43 AM
    hey! i had to prioritize few other stuffs and got back to bytewax connector for S2 recently, and i just released the initial version of the package
    bytewax-s2
    . let me know if you have any feedback on https://github.com/s2-streamstore/bytewax-s2 & https://s2.dev/docs/integrations/bytewax#real-time-insights-from-bluesky-firehose-data. before we share in our socials, would like to check a few things w/ you: 1. is there anything that is required to be done at your end before we can share? 2. what is required at our end to make S2 get listed in https://bytewax.io/modules? 3. is it cool if we tag bytewax's company page/handle when we share?
    0
    s
    p
    m
    • 4
    • 5
  • d

    damp-finland-12935

    02/26/2025, 1:27 AM
    im thinking of replacing redpanda with tansu. has anyone tried this? were there any issues with the bytewax integrations?
    0
    p
    • 2
    • 1
  • w

    wonderful-hairdresser-34729

    02/24/2025, 4:42 AM
    Note about helm chart. In my
    values.yaml
    i can set
    configuration.configMap.create
    =
    false
    . but in the helm rendered statefulset, i see there is hardcoded assumption that the configmap is always mounted into the pod. suggest to change the behaviour of the helm chart so that setting
    configuration.configMap.create = false
    would remove that hardcoded assumption.
    0
    w
    • 2
    • 2
  • w

    wonderful-hairdresser-34729

    02/23/2025, 11:13 AM
    So I'm starting to explore using helm chart to run my bytewax flows because I want better parallelism in kubernetes. My bytewax flow has a dependency on some python modules, including one which I build on my own and I host in a private pypi repo (jfrog). This module is useable by all my bytewax operators. From the helm chart documentation I see that the bytewax flow python code which I write needs to be placed in a configmap and mounted in a volume. But configmaps have a 1mb limit....what's the recommended strategy if a) I need more than 1mb (yes, I'm aware of the tarball approach), b) I have dependencies in requirements.txt (and in addition to my custom built python module, I also have some public pypi packages). I want to avoid docker build-ing my own image off of the bytewax/bytewax public image if possible. Because I have like 16 bytewax flows right now, that'll just increase storage and redundancy in my private docker repo (jfrog). Still, that's my last resort if this becomes impossible or cumbersome to do.
    0
    p
    w
    • 3
    • 13
  • m

    magnificent-kite-85568

    02/21/2025, 12:08 PM
    Hey I am giving bytewax a go and trying to figure out one peculiar thing. I’ve written application that 1) reads from kafka 2) do some tranformations on messages read (like filtering, normalization etc) 3) keys on .url property from the messages:
    Copy code
    keyed = op.key_on(‘url-hash’, normalized,
                          lambda p: str(hash(p.urls[0]) % 3))
    `
    4) builds batches of N messages
    Copy code
    batch = op.collect(‘build-batch’, keyed, timeout=timedelta(milliseconds=100),
                           max_size=app_config.BATCH_SIZE)
    5) does some heavylifing on these batches (including gpu etc)
    Copy code
    classified = op.flat_map(‘classifier’, batch, batch_classifier.process)
    6) puts the result to KafkaSinkMessage 7) sends it to kafka. Now, the problem I am experiencing is that step 5) receives a lot of data and is able to process it - I can see from the logs that it successfully produces results. But then, when it comes to creating KafkaSinkMessages in step 6), the behavior is strange - nothing happens for few minutes, then it receives a burst of messages form like 100+ processed batches, all at one time, and huge amount of messages is being sent to kafka in step 7). Then the cycle repeats - step 5) is being executed for few minutes, the impression is that 6) and 7) are not happening when they should, and after another few minutes - again the burst. In the meantime memory consumption of the app goes through the roof and usually the app dies of OOM after few such cycles. The question is - why is it happening like that?
    0
    b
    • 2
    • 25
  • d

    damp-finland-12935

    02/19/2025, 2:16 AM
    im using the bytewax helm chart and wondering... how I might add an http endpoint to invalidate in-memory KV entries? I used to publish the KV data to kafka and store it perpetually in bytewax state, but realized that some data is better as a KV lookup (allowed me to turn a bunch of flows stateless and drop the persistent volume requirement)
    0
    b
    • 2
    • 8
  • w

    wonderful-hairdresser-34729

    02/17/2025, 11:12 PM
    I currently use the
    collect
    function to accumulate messages so that I can get a numbytes and nummessages counter calculated. Is there a better way to do this? Accumulating messages is cool but it feels like it's a waste of memory when all I need is a sum of bytes and sum of message count. Ideally I would like to be able to add to message count and bytes and then throw the message away. I also want the ability to emit these two metrics once every minute but only if there's messages consumed, which is why the
    collect
    function was chosen. I am using this against a Kafka cluster.
    0
    b
    • 2
    • 8
  • g

    great-shampoo-77301

    02/17/2025, 8:22 AM
    I have a keyed stream that I am trying to write to a csv file with a StatelessSinkPartition. When write_batch is called, I want it to get batched data that contains the same key for all items. How can I achieve this? Thread in Slack Conversation
    0
    p
    • 2
    • 2
  • w

    wonderful-hairdresser-34729

    02/13/2025, 9:09 AM
    hey is there a limit to the number of topics that a kafka consumer can listen on? i have a consumer listening to 120 topics using https://docs.bytewax.io/stable/api/bytewax/bytewax.connectors.kafka.operators.html#bytewax.connectors.kafka.operators.input and i don't get any messages. when i reduce the number of topics to 2, i start seeing messages flowing in.
    0
    b
    • 2
    • 10
  • w

    wonderful-hairdresser-34729

    02/12/2025, 9:14 AM
    is there a way to change the topics in a kafka stream at runtime? i see https://docs.bytewax.io/latest/api/bytewax/bytewax.connectors.kafka.operators.html#bytewax.connectors.kafka.operators.input and every input stream requires a
    step_id
    to make it unique. i could create a new stream with a different step_id but the old one is still running.
    0
    b
    • 2
    • 3
  • g

    great-shampoo-77301

    02/12/2025, 8:56 AM
    Hey guys, I am looking to build a dynamic Sink. I have a stream of dicts and I want to write those into files, for each entry, the file it must be written to must be dynamically determined based on some key, value in the dict itself. I am looking to build a CSVSink. I cannot use list_parts as the filenames are not known in advance.
    0
    b
    • 2
    • 4
  • c

    colossal-daybreak-85530

    02/06/2025, 5:33 PM
    Naive question - can one Bytewax dataflow feed another one? I.e. can you daisy chain pipelines?
    0
    d
    c
    • 3
    • 2