# general
k
Is there any benefit of writing rust functions instead of Python UDF? Wondering what's the performance penalty of Python UDFs? I have billions of rows in my dataframe and need to operate row wise transformations.
c
In general, pure Python code cannot be parallelized across threads due to the Python global interpreter lock (GIL), and it also does not benefit from the compiler/hardware optimizations that Rust code does. Therefore we recommend using Daft's native expressions https://docs.daft.ai/en/stable/api/expressions/ if possible. Of course, not every transformation can be supported by a native expression. You may have custom business logic, or need to use external Python libraries. A lot of Python libraries, like pytorch or numpy, already run native code under the hood and can be parallelized. But for cases where UDFs do run pure Python code, Daft can run the UDF in a separate process. https://docs.daft.ai/en/stable/api/udf/#daft.udf.UDF.use_process
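To make the native-vs-pure-Python difference concrete, here's a small sketch (using numpy as a stand-in for any native library; not Daft-specific) comparing a per-row Python loop with the same transformation done in compiled code:

```python
import numpy as np

data = list(range(1_000_000))

# Pure-Python row-wise loop: every element is processed by the
# interpreter, one at a time, under the GIL.
py_result = [x * 2 + 1 for x in data]

# Vectorized native version: the loop runs in compiled C code,
# which is the kind of work Daft's native expressions hand off
# to native code for you.
np_result = (np.asarray(data) * 2 + 1).tolist()

assert py_result == np_result
```

On billions of rows, that interpreter-per-element overhead is exactly the penalty being asked about.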
k
Is there any starter example for writing custom expression in rust?
m
I think your best bet for that in Daft will be to write your function in Rust and use pyO3 to create Python bindings for it. Then you can make a Python UDF to call into your pyO3 code. We don't have a specific tutorial going over how to do these steps. But I'd like to try and help you out here! What follows isn't working code 🙂 but I wanted to give you some pseudocode to help you get started: (1) I'll assume you have your function implemented and I'll reference it below as `crate::my_crate::my_rust_udf`. For simplicity, I'll assume its input is of a type called `I` and output is a type called `O`. (2) The pyO3 code looks roughly like:
use pyo3::prelude::*; // see https://pyo3.rs/v0.25.1/module

use crate::my_crate::my_rust_udf;

#[pyfunction]
fn py_rust_udf_scalar(x: I) -> O {
    my_rust_udf(x)
}

#[pymodule]
fn py_rust_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(py_rust_udf_scalar, m)?)
}
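For the maturin build in step (3), a minimal `pyproject.toml` sketch for the module above might look like this (the package name and version bounds here are assumptions, not a tested config):

```toml
# minimal maturin setup; package name is an assumption
[build-system]
requires = ["maturin>=1.0"]
build-backend = "maturin"

[project]
name = "py_rust_module"
version = "0.1.0"
requires-python = ">=3.9"
```

Running `maturin develop` in that project should then make `py_rust_module` importable in your environment.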
(3) Use `maturin` in your `pyproject.toml` and build this code: https://github.com/PyO3/maturin (4) Make your daft UDF:
import daft

from py_rust_module import py_rust_udf_scalar

@daft.udf(
  # you will have to figure out how to go from the python type to something daft understands
  # if it's simple, like float, then this is just daft.DataType.float64()
  # if it's complex, like an object, you'll want to define an appropriate struct or list type
  return_dtype=...
)
def my_udf(xs: daft.Series):
  return [py_rust_udf_scalar(x) for x in xs]

# use it!
df: daft.DataFrame = ...
df = df.with_column("new_col_name", my_udf(daft.col("input_col_name")))
c
This unfortunately will still suffer from the Python GIL, as the Rust code is called from Python while the GIL is still held. pyo3 does have a method to release the GIL while running CPU-intensive Rust-only functions -- Python::detach.
#[pyfunction]
fn py_rust_udf_scalar(x: I, py: Python) -> O {
    py.detach(move || my_rust_udf(x))
}
🙌 2
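As a rough stdlib-only illustration of why releasing the GIL matters (not Daft- or pyo3-specific): the threads below all finish correctly, but since each one runs pure-Python bytecode under the GIL, timing this shows essentially no speedup over running the loops one after another. Native code that detaches from the GIL can genuinely run in parallel instead.

```python
import threading

N = 100_000
results = [0] * 4

def work(i):
    # CPU-bound pure-Python loop: the GIL lets only one thread
    # execute Python bytecode at any moment, so these threads
    # take turns rather than truly running in parallel.
    total = 0
    for x in range(N):
        total += x
    results[i] = total

threads = [threading.Thread(target=work, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert all(r == sum(range(N)) for r in results)
```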
also if you are not interested in implementing your own parallelism, i'd suggest the newer `@daft.func` instead of the legacy `@daft.udf`
🙌 2
k
Thanks for the pointers. I'll take a look into it
a
why not use multiprocessing? like i think other engines (spark, multiprocessing map, pandarallel) handle this using multiprocessing, or i guess is there an easy way to do this? Sorry, I'm being silly, there is the use_process arg
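For reference, the stdlib multiprocessing pattern being described looks like this (a generic sketch, not how Daft implements `use_process` internally): it sidesteps the GIL by giving each worker its own interpreter process.

```python
from multiprocessing import Pool

def transform(x):
    # stand-in for a CPU-bound, pure-Python row-wise transformation
    return x * x + 1

if __name__ == "__main__":
    # each worker process has its own interpreter and its own GIL,
    # so CPU-bound Python work runs in parallel across processes
    with Pool(processes=4) as pool:
        result = pool.map(transform, range(8))
    assert result == [transform(x) for x in range(8)]
```

The trade-off is serialization: rows must be pickled to and from the workers, which is part of why native expressions or GIL-releasing Rust code usually win on billions of rows.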