Rakesh Jain
09/12/2025, 10:15 PM
Kyle
09/15/2025, 6:22 AM
吕威
09/16/2025, 7:22 AM
import cv2
import numpy as np
from typing import List

from daft import DataType, Series, udf


@udf(
    return_dtype=DataType.list(
        DataType.struct(
            {
                "class": DataType.string(),
                "score": DataType.float64(),
                "cropped_img": DataType.image(),
                "bbox": DataType.list(DataType.int64()),
            }
        )
    ),
    num_gpus=1,
    batch_size=16,
)
class YOLOWorldOnnxObjDetect:
    def __init__(
        self,
        model_path: str,
        device: str = "cuda:0",
        confidence: float = 0.25,
    ):
        self.confidence = confidence
        # init model from model_path on the given device and assign it to self.yolo

    def __call__(self, images_2d_col: Series) -> List[List[dict]]:
        images: List[np.ndarray] = images_2d_col.to_pylist()
        results = self.yolo.predict(source=images, conf=self.confidence)
        objs = []
        for r in results:
            img_result = []
            orig_img = r.orig_img
            for box in r.boxes:
                # clamp the detection box to the image bounds
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(orig_img.shape[1], x2), min(orig_img.shape[0], y2)
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                cls = int(box.cls[0])
                img_result.append(
                    {
                        "class": self.yolo.names[cls],
                        "score": float(box.conf[0]),
                        "cropped_img": {
                            "cropimg": cv2.cvtColor(
                                orig_img[y1:y2, x1:x2], cv2.COLOR_BGR2RGB
                            ),
                        },
                        "bbox": [x1, y1, x2, y2],
                    }
                )
            objs.append(img_result)
        return objs
The cropped_img field must be returned as a dict: if I return the np.ndarray directly, it raises "Could not convert array(..., dtype=uint8) with type numpy.ndarray: was expecting tuple of (key, value) pair". Why?
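For context, a minimal sketch of how a class UDF like this gets applied to a dataframe, assuming init arguments are supplied via with_init_args and that the input has an "image" column (the path, column name, and model file below are hypothetical):

import daft
from daft import col

# Hypothetical input with an "image" column of decoded images.
df = daft.read_parquet("images.parquet")
# Bind constructor arguments, then call the UDF on an expression like a function.
detector = YOLOWorldOnnxObjDetect.with_init_args(model_path="yolo_world.onnx", confidence=0.25)
df = df.with_column("detections", detector(col("image")))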
ChanChan Mao
09/22/2025, 4:00 PM
ChanChan Mao
09/23/2025, 12:33 AM
daft.File datatype. Following that, @Colin Ho will dive into his work on Flotilla, our distributed engine, and showcase some exciting benchmark results 👀 We'll leave plenty of time at the end for questions and discussions.
Add to your calendar and we'll see you then! 👋
Garrett Weaver
09/23/2025, 10:28 PM
Amir Shukayev
09/24/2025, 11:38 PM
Nathan Cai
09/24/2025, 11:59 PM
Shouldn't it be
# Supply actual values for the s3
not
# Supply actual values for the se
in the docs?
https://docs.daft.ai/en/stable/connectors/aws/#rely-on-environment
from daft.io import IOConfig, S3Config
# Supply actual values for the se
io_config = IOConfig(s3=S3Config(key_id="key_id", session_token="session_token", secret_key="secret_key"))
# Globally set the default IOConfig for any subsequent I/O calls
daft.set_planning_config(default_io_config=io_config)
# Perform some I/O operation
df = daft.read_parquet("s3://my_bucket/my_path/**/*")
ChanChan Mao
09/25/2025, 11:01 PM
Garrett Weaver
09/26/2025, 4:58 AM
use_process=True: everything works fine locally (Mac), but seeing /usr/bin/bash: line 1: 58 Bus error (core dumped) when run on k8s (Argo Workflows). Any thoughts?
Garrett Weaver
09/26/2025, 5:11 PM
explode with the average result being 1 row --> 12 rows and the max being 1 row --> 366 rows (~5M rows --> ~66M rows). Seeing decently high memory usage during the explode even with a repartition prior to the explode. Is the only remedy more partitions and/or a reduced number of CPUs to lower parallelism?
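A sketch of the mitigation being weighed here, assuming the input is read from Parquet and the main knob is simply more, smaller partitions ahead of the explode (the path, partition count, and column name are illustrative):

import daft
from daft import col

df = daft.read_parquet("s3://bucket/events/")  # hypothetical ~5M-row input
# More, smaller partitions before the explode keeps each partition's exploded
# output (up to ~366x per input row) small enough to fit in memory.
df = df.repartition(512)
df = df.explode(col("daily_values"))  # hypothetical list column being exploded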
Nathan Cai
09/29/2025, 1:00 PM
Robert Howell
09/30/2025, 4:39 PM
Dan Reverri
09/30/2025, 9:50 PM
ChanChan Mao
10/03/2025, 9:35 PM
Colin Ho
10/15/2025, 4:14 PM
fr1ll
10/15/2025, 11:36 PM
embedding-atlas library. At the end, I need to run UMAP against the full set of embeddings. I currently resort to the clunky approach below.
Am I missing a better way to add a column from a numpy array of the same length as the dataframe?
import numpy as np
import pyarrow as pa
import daft

vecs = np.stack([r["embedding"] for r in df.select("embedding").to_pylist()], axis=0)
# little helper wrapping umap
xy, knn_indices, knn_distances = umap_with_neighbors(vecs).values()
df = df.with_column("_row_index", daft.sql_expr("row_number()") - 1)
umap_cols = daft.from_pydict({
    "_row_index": list(range(xy.shape[0])),
    "umap_x": daft.Series.from_arrow(pa.array(xy[:, 0])),
    "umap_y": daft.Series.from_arrow(pa.array(xy[:, 1])),
})
df = df.join(umap_cols, on="_row_index")
Nathan Cai
10/17/2025, 4:00 PM
tests/connect/test_io.py
@pytest.mark.skip(reason="https://github.com/Eventual-Inc/Daft/issues/3786")
def test_write_csv_with_delimiter(make_df, make_spark_df, spark_session, tmp_path):
    pass
But the thing is, I don't know what these arguments mean, and I don't see that function invoked anywhere else, so I don't know where those arguments are provided from.
The other thing is that its function signature is different from the ones above:
def test_csv_basic_roundtrip(make_spark_df, assert_spark_equals, spark_session, tmp_path):
I think I might need the assert_spark_equals function in my scenario, so I was wondering if someone is willing to guide me in the right direction.
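A quick sketch of how pytest supplies those arguments: each parameter name is matched to a fixture, typically defined in a conftest.py (tmp_path is a pytest builtin), and injected automatically when the test runs. The fixture body below is a hypothetical placeholder, not Daft's actual conftest:

import pytest

@pytest.fixture
def assert_spark_equals():
    # The real conftest.py would return a helper that compares a Daft df to a Spark df.
    def _assert(daft_df, spark_df):
        ...
    return _assert

def test_example(assert_spark_equals, tmp_path):
    # pytest injects assert_spark_equals from the fixture above and tmp_path from
    # its builtin fixture; the test is never called with these arguments explicitly.
    assert tmp_path.exists()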
Ben Cornelis
10/17/2025, 5:37 PM
df = daft.read_parquet(
    "abfs://my_container@my_service_account.dfs.core.windows.net/in_table_path/"
)
df = df.groupby("group_col").agg(col("count_col").count())
df.write_parquet("abfs://my_container@my_service_account.dfs.core.windows.net/out_table_path/")
(it's worth noting that I have daft pinned to 0.5.19 since I currently get an error using write_parquet to azure blob store on later versions. It is the same issue as here: https://github.com/Eventual-Inc/Daft/issues/5336)
When I submit a job running this script to my ray cluster, it eventually fails with this error:
ray.exceptions.OutOfMemoryError: Task was killed due to the node running low on memory.
Memory on the node (IP: 10.224.2.36, ID: 637c809d2d9dfed8dd5c109e7ff9b81e4c10a024322086029a8f6def) where the task (actor ID: 53ff539b6349c1bee7efc01802000000, name=flotilla-plan-runner:RemoteFlotillaRunner.__init__, pid=1631, memory used=24.82GB) was running was 30.46GB / 32.00GB (0.951874), which exceeds the memory usage threshold of 0.95.
Is it unexpected that the flotilla runner on the head node is consuming so much memory?
Ben Cornelis
10/17/2025, 9:34 PM
daft.exceptions.DaftCoreException: DaftError::External Cached error: Unable to open file abfs://path/to/my.parquet: Error { context: Full(Custom { kind: Io, error: reqwest::Error { kind: Decode, source: hyper::Error(Body, Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }) } }, "error converting `reqwest` request into a byte stream") }
With the job I mentioned above with 50k files, it's happening often enough that the job always fails. Has anyone seen this before?
Ben Cornelis
10/20/2025, 8:13 PM
AzureConfig.storage_account or the AZURE_STORAGE_ACCOUNT environment variable.` but I also see code attempting to parse it from the URI: https://github.com/Eventual-Inc/Daft/blob/main/src/daft-io/src/azure_blob.rs#L116. I'm not sure why that's not working. I'm using an abfs URI of the form abfs://container@account_name.dfs.core.windows.net/path/
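A sketch of the explicit-configuration route the error message points at, assuming daft.io.AzureConfig takes a storage_account and can be attached to the read via io_config (the account, container, and path below are placeholders):

import daft
from daft.io import IOConfig, AzureConfig

io_config = IOConfig(azure=AzureConfig(storage_account="account_name"))
df = daft.read_parquet(
    "abfs://container@account_name.dfs.core.windows.net/path/",
    io_config=io_config,
)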
Nathan Cai
10/21/2025, 9:03 PM
make test on their own devices? It burns a hole in my RAM when I run it on my laptop.
VOID 001
10/23/2025, 7:40 AM
Ben Cornelis
10/24/2025, 7:40 PM
(raylet) node_manager.cc:3193: 1 Workers (tasks / actors) killed due to memory pressure (OOM)
I've tried various k8s pod sizings; currently I have 80 worker pods with 8 CPU / 32 GB each. Does anyone know why so much memory would be used, or how to tune / think about queries like this?
Garrett Weaver
10/24/2025, 8:17 PM
Is parquet_inflation_factor still respected in flotilla? I have a parquet file that is ~2GB but expands to ~16GB in memory. When trying to run a select with a window function, it tries to load all the data into memory and struggles (OOMs). What is the best course of action here?
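A sketch of the knob being asked about, assuming parquet_inflation_factor is exposed through daft.set_execution_config; the factor and path are illustrative, not a recommendation:

import daft

# Tell the planner to budget roughly 8x the on-disk Parquet size when sizing scan tasks.
daft.set_execution_config(parquet_inflation_factor=8.0)

df = daft.read_parquet("s3://bucket/big_file.parquet")  # hypothetical ~2GB file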
Garrett Weaver
10/24/2025, 9:35 PM
native_parquet_writer=False)
takumi hayase
10/28/2025, 12:39 PM
Ben Cornelis
10/28/2025, 3:40 PM
VOID 001
10/29/2025, 1:33 PM
Nathan Cai
10/30/2025, 10:30 PM
write_csv option
https://github.com/Eventual-Inc/Daft/issues/3786
But I realized that, since I'm already using one field of PyArrow's csv WriteOptions (https://arrow.apache.org/docs/python/generated/pyarrow.csv.WriteOptions.html#pyarrow.csv.WriteOptions), I might as well implement support for all of its fields, as it shouldn't be too difficult.
This is the original PyArrow class:
@dataclass(kw_only=True)
class WriteOptions(lib._Weakrefable):
    include_header: bool = field(default=True, kw_only=False)
    batch_size: int = 1024
    delimiter: str = ","
    quoting_style: Literal["needed", "all_valid", "none"] = "needed"
    def validate(self) -> None: ...
I want to implement the corresponding class in Rust like this:
#[derive(Debug, Clone)]
pub struct CSVWriteOptions {
    pub include_header: bool,
    pub batch_size: usize,
    pub delimiter: char,
    pub quoting_style: CSVQuotingStyle,
}

#[derive(Debug, Clone)]
pub enum CSVQuotingStyle {
    Needed,
    AllValid,
    None,
}
However, there are a couple of obstacles. The first is that Rust doesn't support default field values, so I guess I'd add a constructor (or a Default impl) for the defaults?
But the main problem is the quoting_style field: it only accepts specific strings. Normally that's fine, but when the Rust struct gets passed to Daft's Python CSV writer class, which hands those options to PyArrow, PyArrow expects a string for quoting_style, not an enum. What is the cleanest approach here? As far as I'm aware, you can't constrain a string to specific values in Rust; you have to use an enum.
Should I just create two structs: the one above, plus a second one that is identical except that quoting_style is a String, with a function on CSVWriteOptions that converts into the second struct by turning the quoting_style into a string?
How does Daft deal with interoperability between Python string literals and Rust enums? I feel like this is something you have encountered before, and I'd like to know your standard way of doing it.
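A sketch of just the Python-facing half, under the assumption that the simplest split is to validate the literal in Python and pass a plain string across the boundary, letting the Rust side parse it into its enum; the names below are hypothetical, not Daft's actual internals:

from dataclasses import dataclass
from typing import Literal

QuotingStyle = Literal["needed", "all_valid", "none"]

@dataclass
class PyCSVWriteOptions:
    include_header: bool = True
    batch_size: int = 1024
    delimiter: str = ","
    quoting_style: QuotingStyle = "needed"

    def validate(self) -> None:
        # Reject bad values before they cross into Rust, so the Rust side can
        # assume the string maps cleanly onto one of its enum variants.
        if self.quoting_style not in ("needed", "all_valid", "none"):
            raise ValueError(f"invalid quoting_style: {self.quoting_style!r}")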