kind-vr-57445
07/25/2024, 4:56 PM
jolly-motherboard-38558
07/25/2024, 4:22 PM
from datetime import datetime, timedelta, timezone
from statistics import mean
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.operators.windowing import EventClock, TumblingWindower
from bytewax.testing import TestingSource
# Dataflow: collect each user's events into daily tumbling windows.
flow = Dataflow("windowing")
# All windows are aligned to this UTC midnight.
align_to = datetime(2024, 1, 1, tzinfo=timezone.utc)
# Synthetic events spanning two calendar days for users "a" and "b".
inp = [
    {"time": align_to, "user": "a", "val": 1},
    {"time": align_to + timedelta(days=0, hours=4), "user": "a", "val": 0},
    {"time": align_to + timedelta(days=0, hours=5), "user": "b", "val": 10},
    {"time": align_to + timedelta(days=0, hours=14), "user": "a", "val": 52},
    {"time": align_to + timedelta(days=0, hours=15), "user": "a", "val": 2},
    {"time": align_to + timedelta(days=1, hours=1), "user": "a", "val": 2},
    {"time": align_to + timedelta(days=1, hours=13), "user": "b", "val": 16},
    {"time": align_to + timedelta(days=1, hours=16), "user": "b", "val": 44},
]
stream = op.input("input", flow, TestingSource(inp))
# Key by user so windowing state is kept per user.
keyed_stream = op.key_on("key_on_user", stream, lambda e: e["user"])
# Event-time clock; a zero wait duration means windows close as soon as
# the watermark passes — no allowance for late/out-of-order events.
clock = EventClock(lambda e: e["time"], wait_for_system_duration=timedelta(seconds=0))
windower = TumblingWindower(length=timedelta(days=1), align_to=align_to)
# Downstream items are (user, (window_id, [events in that window])).
win_out = win.collect_window("collect_win_daily", keyed_stream, clock, windower)
def calculate_daily_stats(customer_api_group):
    """Reduce one day's collected events for a user into summary stats.

    Input is ``(user, (window_id, events))`` as emitted by
    ``collect_window``; returns a dict with date, user, avg, and median.
    """
    user, payload = customer_api_group
    window_id, events = payload
    values = sorted(event["val"] for event in events)
    # Daily tumbling windows aligned to `align_to`, so the window id
    # doubles as the day offset from the alignment date.
    day = (align_to + timedelta(days=window_id)).date()
    return {
        "date": day,
        "user": user,
        "avg": mean(values),
        # Upper element for even-length windows (not the interpolated median).
        "median": values[len(values) // 2],
    }
# (user, (window_id, events)) -> daily stats dict, printed to stdout.
daily_stats = op.map("daily_stats", win_out.down, calculate_daily_stats)
op.output("out", daily_stats, StdOutSink())
results in
{'date': datetime.date(2024, 1, 1), 'user': 'a', 'avg': 13.75, 'median': 2}
{'date': datetime.date(2024, 1, 1), 'user': 'b', 'avg': 10, 'median': 10}
{'date': datetime.date(2024, 1, 2), 'user': 'a', 'avg': 2, 'median': 2}
{'date': datetime.date(2024, 1, 2), 'user': 'b', 'avg': 30, 'median': 44}
I am currently trying to group data on a daily basis into aggregate statistics, see above for representative example.
For the current date, I don't want to wait until the end of the day to see aggregate statistics; I want partially complete results for the day to be output every hour. What is the most effective approach to achieve this?
mammoth-autumn-89402
07/22/2024, 11:00 AMError: cannot re-use a name that is still in use
upon deploying a dataflow.mammoth-autumn-89402
07/22/2024, 10:59 AMhappy-addition-75353
07/19/2024, 2:47 PMdef get_key(data : dict):
return f"{data['identifier']}/{data['kafka_value'].split('=')[-1]})"
# Event-time clock with no lateness allowance.
clock = EventClock(lambda e: e["time"], wait_for_system_duration=timedelta(seconds=0))
# One-minute tumbling windows aligned to `align_to`.
windower = TumblingWindower(length=timedelta(seconds=60), align_to=align_to)
# Count events per window, keyed by `get_key`.
window_out = win.count_window("count", stream, clock, windower,get_key)
My only idea for now is to rekey window_out.down and window_out.meta, then join them together on window id to get start and stop information from metadata ! Seems complicated to just get this vital information no?
Am I in the correct direction ?
Thanks !able-application-71151
07/19/2024, 1:33 PMt
. This threshold varies by key and can change during job execution. We currently use pyFlink timers to achieve this.
I tried implementing the same functionality using a Session Window in Bytewax, but it seems to lack support for varying gaps per key.
Is there any workaround for this?
Thank you!rhythmic-lawyer-43822
07/16/2024, 4:55 AMflaky-art-54410
07/11/2024, 6:06 PMechoing-sundown-84288
07/08/2024, 6:51 AMcap = cv2.VideoCapture(data)
while True:
if cap.isOpened():
now_fps = cap.get(1)
able-application-71151
07/05/2024, 12:19 PMup
field Stream[Union[KafkaSourceMessage[dict, dict], KafkaSinkMessage[dict, dict]]]
?
Shouldn't it be Stream[Union[KafkaSourceMessage[Any, Any], KafkaSinkMessage[Any, Any]]]
From my understanding, the type of the key and value depend on the Serializers you use.
For example, if you want to serialize a kafka message with a string key and a string value, you'd construct a KafkaSinkMessage[str, str]
, and use a StringSerializer.
Thanks!narrow-barista-20021
07/04/2024, 12:53 AMalign_to = datetime(2024, 7, 1, tzinfo=timezone.utc)
clock = EventClock(lambda e: pd.to_datetime(e["__time"], utc=True), wait_for_system_duration=timedelta(seconds=2))
windower = SlidingWindower(length=timedelta(seconds=5.5), offset=timedelta(seconds=5.0), align_to=align_to)
Trying to understand the window that the following group of events belong to. From debug logs:
group key= 010002332697-1719792024605
group events:
2024-07-01 00:00:24.605000+00:00 , 1719792024605 , 010002332697 , EVENT_A
2024-07-01 00:00:24.607000+00:00 , 1719792024605 , 010002332697 , EVENT_B
2024-07-01 00:00:24.608000+00:00 , 1719792024605 , 010002332697 , EVENT_C
2024-07-01 00:00:24.728000+00:00 , 1719792024605 , 010002332697 , EVENT_D
Window meta data:
meta= ('010002332697-1719792024605', (4, WindowMetadata(open_time=datetime.datetime(2024, 7, 1, 0, 0, 20, tzinfo=datetime.timezone.utc), close_time=datetime.datetime(2024, 7, 1, 0, 0, 25, 500000, tzinfo=datetime.timezone.utc), merged_ids=set())))
meta= ('010002332697-1719792024605', (5, WindowMetadata(open_time=datetime.datetime(2024, 7, 1, 0, 0, 25, tzinfo=datetime.timezone.utc), close_time=datetime.datetime(2024, 7, 1, 0, 0, 30, 500000, tzinfo=datetime.timezone.utc), merged_ids=set())))
The above group of events (group key above) seem to fall into 2 different windows per the window meta data. I am expecting it to fall only into the first window but not the second. Am I missing something?
Any help appreciated!incalculable-country-68182
07/03/2024, 10:39 PMthread '<unnamed>' panicked at src/operators.rs:811:33:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/run.py", line 355, in <module>
cli_main(**kwargs)
AssertionError: (src/operators.rs:823:65) error calling `StatefulBatchLogic.on_notify` in "dataflow_binance_trades.generic_window.stateful_batch" for key "1000BONKUSDT"
Caused by => AssertionError:
Traceback (most recent call last):
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/operators/windowing.py", line 1152, in on_notify
watermark = self.clock.on_notify()
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/operators/windowing.py", line 288, in on_notify
self.before_batch()
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/operators/windowing.py", line 254, in before_batch
assert system_now >= self._system_now
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
some-magazine-53695
07/03/2024, 9:33 PMec = EventClock(ts_getter = lambda x: datetime.fromtimestamp(x['event_time']).replace(tzinfo=pytz.utc), wait_for_system_duration=timedelta(minutes=5))
hourly_tw = TumblingWindower(timedelta(hours=1), datetime(2024, 1, 1, tzinfo=timezone.utc))
daily_tw = TumblingWindower(timedelta(days=1), datetime(2024, 1, 1, tzinfo=timezone.utc))
hourly = w.fold_window("hourly_window",
output,
ec,
hourly_tw,
list,
accumulator,
list.__add__,)
daily = w.fold_window("daily_window",
output,
ec,
daily_tw,
list,
accumulator,
list.__add__,)
but I get the error:
ValueError: step 'window.generic_window' already exists; do you have two steps with the same ID?
I do not think this is the case and thought that we could have multiple windows operating in one script but maybe this is not the case?clever-doctor-13542
06/30/2024, 11:37 AMquick-belgium-55275
06/29/2024, 3:45 AMuser_id % worker_count
to choose which source partition to send. So every users has a dedicated socket/partition.
The issue is that when I test with only 1 user, it does not process the data. Data is only processed if more users are created that fill use the other partitions.
A workaround I discovered is to use round robin in selecting source partition but it does not ensure ordering of data since data is fanned out to multiple socket clients.some-magazine-53695
06/26/2024, 11:29 PMstraight-article-83728
06/26/2024, 2:50 PMquick-belgium-55275
06/26/2024, 1:32 PMwrite_batch
function, it batches data per ~100. Our input is every ~250ms, so there is a bit of a delay before the next batch is sent. I’m wondering how I can shorten the delay. I’ve seen in the docs that Values in the dataflow. Non-deterministically batched.
The built-in StdOutSink
is working as expected, so not sure what I’m missing.quick-belgium-55275
06/26/2024, 10:00 AMbored-action-58960
06/23/2024, 12:30 PMstraight-article-83728
06/20/2024, 4:22 PMcuddly-pager-16403
06/20/2024, 2:21 PMfull-shampoo-88323
06/20/2024, 2:39 AM# questDB sink
from typing import Any, List
from typing_extensions import override
from bytewax import operators as op
from bytewax.outputs import DynamicSink, StatelessSinkPartition
from questdb.ingress import Sender, TimestampMicros, TimestampNanos
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main, TestingSource
from test_data import BinanceTest
class _QuestDBSinkPartition(StatelessSinkPartition[Any]):
    """Worker-local partition that inserts Binance continuous-kline
    events into a QuestDB table.

    A fresh Sender connection is opened per batch and flushed before
    the context manager closes it.
    """

    def __init__(self, qdb_config, table_name):
        self.qdb_config = qdb_config
        self.table_name = table_name

    def _insert(self, sender, event):
        # One QuestDB row per kline event; `event['k']` carries the kline payload.
        kline = event['k']
        sender.row(
            self.table_name,
            symbols={'sym': event['ps']},
            columns={
                'openTime': TimestampMicros(kline['t']),
                'closeTime': TimestampMicros(kline['T']),
                'interval': str(kline['i']),
                'open': float(kline['o']),
                'high': float(kline['h']),
                'low': float(kline['l']),
                'close': float(kline['c']),
                'vol': float(kline['v']),
                'numTrades': int(kline['n']),
                'volUSD': float(kline['q']),
                'volBuy': float(kline['V']),
                'volBuyUSD': float(kline['Q']),
            },
            at=TimestampNanos(kline['t']),
        )

    @override
    def write_batch(self, items: List[Any]) -> None:
        """Open a sender, write every item in the batch, then flush."""
        with Sender.from_conf(self.qdb_config) as sender:
            for event in items:
                self._insert(sender, event)
            sender.flush()
class QuestDBSink(DynamicSink[Any]):
    """Dynamic QuestDB sink: one independent writer per worker.

    Items consumed from the dataflow must be dictionaries carrying the
    fields the target QuestDB table needs. Workers are the unit of
    parallelism.

    Delivery is at-least-once: messages from the resume epoch are
    duplicated right after resume.

    If the table does not exist yet, QuestDB auto-creates it and
    infers column types — not what you want for crypto. Ex: prices
    generated as type DOUBLE, but it's a shitcoin priced $0.000001682.
    """

    def __init__(self, qdb_config, table_name):
        self.qdb_config = qdb_config
        self.table_name = table_name

    @override
    def build(
        self, _step_id: str, _worker_index: int, _worker_count: int
    ) -> _QuestDBSinkPartition:
        """Construct this worker's partition writer."""
        return _QuestDBSinkPartition(self.qdb_config, self.table_name)
if __name__ == "__main__":
    # Demo: stream 50 synthetic Binance continuous klines into QuestDB.
    qdb_config = "http::addr=localhost:9000;username=admin;password=quest;"
    table_name = 'wax_insertion'
    # can provide test gen function if you'd like
    test = BinanceTest()
    data = test.gen_continuous_klines(50)
    flow = Dataflow('quest-sink')
    # NOTE(review): `input` shadows the builtin of the same name.
    input = op.input('input', flow, TestingSource(data))
    _ = op.inspect('inspect', input)
    output = op.output('output', input, QuestDBSink(qdb_config, table_name))
    # Run single-worker in this process.
    run_main(flow)
cuddly-pager-16403
06/19/2024, 7:44 PMfamous-finland-88726
06/19/2024, 6:06 PMwaxctl
I am starting out with a trivial dataflow, but even that seems to have an issue deploying. I would appreciate any help you can provide with debugging the below. I suspect there may be something wrong with out env.
waxctl dataflow deploy --name dp-flow dataflow.py -n bytewax --debug --yes
2024/06/19 13:44:43 Analylics - information to send:
{"waxctl_version":"0.11.2","platform":"amd64","os":"linux","command":"dataflow","subcommand":"deploy","bytewax_team":""}
2024/06/19 13:44:43 Analylics - duration: 418.255073ms
2024/06/19 13:44:43 Validating parameters...
2024/06/19 13:44:43 Validating parameters...
2024/06/19 13:44:44 Looking for a statefulset...
2024/06/19 13:44:44 Evaluating manifest needed for namespace "bytewax"...
2024/06/19 13:44:44 Namespace "bytewax" processed.
2024/06/19 13:44:44 Defining if the argument passed is a local file path or a valid URI...
2024/06/19 13:44:44 Argument 'dataflow.py' passed is processed as a local file.
2024/06/19 13:44:44 File to store in the configmap: dataflow.py
2024/06/19 13:44:44 Creating configmap...
2024/06/19 13:44:44 Created configmap "dp-flow".
2024/06/19 13:44:44 Using Bytewax helm chart to create resources: Service Account, Service and Statefulset...
2024/06/19 13:44:44 Saving temporary file bytewax-chart.tgz in working directory...
2024/06/19 13:44:44 Processing environment variable in string BYTEWAX_CREATION_TIMESTAMP=2024.06.19-13:44:44
2024/06/19 13:44:44 key: BYTEWAX_CREATION_TIMESTAMP
2024/06/19 13:44:44 value: 2024.06.19-13:44:44
2024/06/19 13:44:44 Loading Bytewax helm chart in memory...
2024/06/19 13:44:44 Deleting temporary file bytewax-chart.tgz from working directory...
2024/06/19 13:44:44 Looking for an existing release...
2024/06/19 13:44:44 install client.Run...
2024/06/19 13:44:45 creating 5 resource(s)
Dataflow dp-flow deployed in bytewax namespace.
(venv) user@W104WSMDK3:~/CloudIQ/otel-sim$ kubectl get all -n bytewax
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/dp-flow ClusterIP None <none> 9999/TCP 13s
service/dp-flow-api ClusterIP 10.100.240.211 <none> 3030/TCP 13s
NAME READY AGE
statefulset.apps/dp-flow 0/1 13s
incalculable-country-68182
06/18/2024, 6:16 PMv0.19.0
but not 0.20.1
.
%4|1718734036.751|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
thread '<unnamed>' panicked at src/run.rs:116:17:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/run.py", line 354, in <module>
cli_main(**kwargs)
TypeError: (src/worker.rs:157:10) error building production dataflow
Caused by => TypeError: (src/worker.rs:477:34) error building DynamicSink
Caused by => TypeError: NullSink.build() missing 1 required positional argument: 'worker_count'
(venv) (base) [02:07 PM] TLK3@snowman : ~/PycharmProjects/stratbot2 $ python -m bytewax.run dataflow_ftfc
Traceback (most recent call last):
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/dataflow.py", line 509, in fn
bound = sig.bind(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/inspect.py", line 3212, in bind
return self._bind(args, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/inspect.py", line 3127, in _bind
raise TypeError(msg) from None
TypeError: missing a required argument: 'mapper'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/run.py", line 352, in <module>
kwargs["flow"] = _locate_dataflow(module_str, attrs_str)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/run.py", line 42, in _locate_dataflow
__import__(module_name)
File "/Users/TLK3/PycharmProjects/stratbot2/dataflow_ftfc.py", line 121, in <module>
op.stateful_map('create_setups', bar_stream, create_setups_from_bar_series)
File "/Users/TLK3/PycharmProjects/stratbot2/venv/lib/python3.11/site-packages/bytewax/dataflow.py", line 512, in fn
raise TypeError(msg) from ex
TypeError: operator 'stateful_map' called incorrectly; see cause above
Here’s the op.stateful_map
call:
flow = Dataflow("dataflow_ftfc")
bar_stream = (
op.input('kafka_source', flow, kafka_source)
.then(op.map, 'deserialize', deserialize)
.then(op.filter, 'filter_symbols', lambda data: data[0] in allowed_symbols)
.then(op.map, 'to_bar_series_by_tf', to_bar_series_by_tf)
.then(op.filter, 'filter_ftfc', filter_ftfc)
)
setup_stream = (
op.stateful_map('create_setups', bar_stream, create_setups_from_bar_series)
.then(op.flat_map, 'flat_map_setups', lambda data: [(data[0], setup) for _, setup in data[1].items() if setup])
.then(op.filter, 'filter_potential_outside', filter_potential_outside)
.then(op.filter, 'filter_continuation', lambda data: data[1].trigger_bar.sid != data[1].current_bar.sid)
)
s_joined = (
op.join('join', bar_stream, setup_stream)
.then(op.stateful_map, 'scan_setups', scan_setups)
)
op.output('stdout_sink', s_joined, NullSink())
cool-school-88165
06/17/2024, 9:30 PMcool-school-88165
06/17/2024, 8:45 PMfast-spoon-42006
06/16/2024, 1:46 PMnow() - 24h
)?
My use case is to have a real time unique list of value over kafka stream that is always up-to-date, and having the window that always up-to-date would be a great help. It looks like I can have multiple sliding windows of length of one day with something like 5 minute offset, but this seems like not real-time and there will be a lot of duplicated data compared to having one moving window.powerful-helmet-67874
06/16/2024, 12:39 AMfrom dataclasses import dataclass, field
from typing import List, Optional
import os
import logging
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from influxdb_client import InfluxDBClient
import pandas as pd
# Set up logging.
# Fixed: the chat export mangled `logging.INFO` into Slack link markup
# (`<http://logging.INFO|logging.INFO>`), which is a syntax error.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Read InfluxDB details from environment variables.
url = os.getenv("INFLUXDB_URL")
token = os.getenv("INFLUXDB_TOKEN")
org = os.getenv("INFLUXDB_ORG")
bucket = os.getenv("INFLUXDB_BUCKET")

# Shared client used by fetch_data(). NOTE(review): assumes all env vars
# are set; os.getenv returns None otherwise — confirm before deploying.
client = InfluxDBClient(url=url, token=token, org=org)
# Fetch data from InfluxDB and return as a pandas DataFrame
def fetch_data():
    """Fetch the newest 100 rows from InfluxDB as a pandas DataFrame.

    Fixed: the chat export mangled the ``logger.info`` calls into Slack
    link markup (``<http://logger.info|logger.info>``); restored them.
    Uses the module-level ``client`` and ``org``.
    """
    query = '''SELECT * FROM "table" ORDER BY _time DESC LIMIT 100'''
    logger.info("Executing query: %s", query)
    data = client.query_api().query_data_frame(query, org=org)
    logger.info("Query executed successfully")
    return data
# Custom source class for Bytewax
class InfluxDBSource:
    """Pull-based source serving (key, value) tuples from InfluxDB.

    A buffer of records is loaded up front and refilled whenever it
    runs dry. NOTE(review): this is a plain class, not a Bytewax
    source subclass — confirm ``op.input`` accepts it.
    """

    def __init__(self):
        self.records = self.fetch_records()

    def fetch_records(self):
        """Query InfluxDB and flatten the frame into (key, value) tuples."""
        rows = fetch_data().to_dict('records')
        return [(row["<key>"], row["<value>"]) for row in rows]

    def next(self):
        """Return the next buffered record, refilling the buffer first if empty."""
        if not self.records:
            self.records = self.fetch_records()
        return self.records.pop(0)
# Define the dataflow
flow = Dataflow("anomaly_detector")
# Stream items are ("metric", value) tuples from InfluxDB.
metrics = op.input("metrics", flow, InfluxDBSource())
# ("metric", value)
@dataclass
class DetectorState:
    """Rolling window of the last 10 values plus summary statistics.

    Fixed: the chat export mangled ``self.mu`` into Slack link markup
    (``<http://self.mu|self.mu>``); restored. Also ``is_anomalous`` now
    tests ``self.mu is not None`` instead of truthiness, so detection is
    no longer silently disabled when the mean is exactly 0.0.
    """

    # Most-recent-first window of observed values (max length 10).
    last_10: List[float] = field(default_factory=list)
    # Mean / population stddev of the window; None until the first push.
    mu: Optional[float] = None
    sigma: Optional[float] = None

    def push(self, value):
        """Record a new value, evict anything beyond 10, refresh stats."""
        self.last_10.insert(0, value)
        del self.last_10[10:]
        self._recalc_stats()

    def _recalc_stats(self):
        # Population mean/stddev over however many values are held.
        last_len = len(self.last_10)
        self.mu = sum(self.last_10) / last_len
        sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len
        self.sigma = sigma_sq**0.5

    def is_anomalous(self, value, threshold_z):
        """Return True when value's z-score exceeds threshold_z.

        Returns False before any stats exist or when sigma == 0
        (z-score undefined).
        """
        if self.mu is not None and self.sigma:
            return abs(value - self.mu) / self.sigma > threshold_z
        return False
# Mapper function to update state and check for anomalies
def mapper(state, value):
    """Stateful-map step: flag anomalies, then fold value into state.

    The anomaly check runs against the stats from *before* this value
    is included. Returns ``(state, (value, mu, sigma, is_anomalous))``.

    Fixed: the chat export mangled ``state.mu`` into Slack link markup
    (``<http://state.mu|state.mu>``); restored.
    """
    if state is None:
        state = DetectorState()
    is_anomalous = state.is_anomalous(value, threshold_z=2.0)
    state.push(value)
    emit = (value, state.mu, state.sigma, is_anomalous)
    return (state, emit)
# Per-metric anomaly detection; stateful_map keys state by metric name.
labeled_metrics = op.stateful_map("detector", metrics, mapper)
# ("metric", (value, mu, sigma, is_anomalous))
# Formatter function to create pretty output
def pretty_formatter(key_value):
    """Render one labeled metric as a human-readable line."""
    metric, payload = key_value
    value, mu, sigma, is_anomalous = payload
    parts = (
        f"{metric}: ",
        f"value = {value}, ",
        f"mu = {mu:.2f}, ",
        f"sigma = {sigma:.2f}, ",
        f"{is_anomalous}",
    )
    return "".join(parts)
# Format each labeled metric and print it to stdout.
lines = op.map("format", labeled_metrics, pretty_formatter)
op.output("output", lines, StdOutSink())
And then ideally write them back into InfluxDB but I’m having trouble just verifying that the original script runs successfully. Any help on either would be really appreciated thank you!