Is there general guidance on using `daft.func` vs ...
# general
g
Is there general guidance on using `daft.func` vs `daft.udf`? I would guess that if the underlying Python code is not taking advantage of any vectorization, but is maybe just a list comprehension like `[my_func(x) for x in some_series]`, then just use `daft.func`?
j
Omg @YK and @Kevin Wang and I were just literally complaining about this in the office
šŸ’Æ 1
k
Hi @Garrett Weaver yes you are exactly right. If your UDF just iterates over the rows then `daft.func` should be your go-to. We will add more docs and guidance for this soon!
šŸ™Œ 2
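(For illustration, a minimal sketch of the row-wise style described above, using the `@daft.func` call pattern that appears later in this thread; the column and function names are made up:)
```python
import daft

# Row-wise: per the guidance above, daft.func is for UDFs that just iterate
# over the rows, so plain Python logic per value is fine here.
@daft.func
def normalize_code(code: str) -> str:
    return code.strip().upper()

df = daft.from_pydict({"country": ["us ", "ca", " mx"]})
df = df.with_column("country_norm", normalize_code(df["country"]))
df.show()
```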
g
follow-up question on testing UDFs: is there still no way to patch a method on a class UDF? Would I be better off moving that logic out of the class UDF if I need to patch it for unit testing (e.g. loading data from some external resource)?
k
I think there might be a way but it might be hacky. A better bet may be to pass in an implementation as a class or function into the class UDF.
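(A minimal sketch of that suggestion, passing the loader into the class UDF so tests can substitute a stub; the class name, the `__init__` argument, and the placeholder `__call__` body are illustrative:)
```python
import daft
import pandas as pd

@daft.udf(return_dtype=daft.DataType.string())
class EnricherUDF:
    def __init__(self, load_polygons) -> None:
        # The loader is injected rather than called as a module-level function,
        # so a unit test can pass a function returning a small in-memory DataFrame.
        self._polygons: pd.DataFrame = load_polygons()

    def __call__(self, codes: daft.Series) -> list[str]:
        # Placeholder row-wise work against the injected data.
        return ["matched" for _ in codes.to_pylist()]
```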
g
> pass in an implementation as a class or function into the class UDF
pass it in via `__init__`? Or is something like the snippet below fine, where `_load_country_polygons` needs to get data from S3 that I need to mock?
```python
# Imports as used in the snippet (io_utils, DEFAULT_COUNTRY_POLYGONS_DATA_PATH,
# and GEOMETRY_COLNAME come from elsewhere in the module):
import json
import os

import daft
import pandas as pd
from pyproj import CRS, Transformer
from shapely.geometry import shape
from shapely.strtree import STRtree


def _load_country_polygons() -> pd.DataFrame:
    country_polygons_path = os.getenv(
        "COUNTRY_POLYGONS_DATA_PATH", DEFAULT_COUNTRY_POLYGONS_DATA_PATH
    )
    storage_options = (
        io_utils.get_fsspec_s3_config(is_pandas=True)
        if country_polygons_path.startswith("s3")
        else None
    )
    country_polygons_df = pd.read_parquet(
        country_polygons_path, storage_options=storage_options
    )
    country_polygons_df[GEOMETRY_COLNAME] = [
        shape(json.loads(poly)) for poly in country_polygons_df[GEOMETRY_COLNAME]
    ]
    return country_polygons_df


@daft.udf(return_dtype=daft.DataType.string())
class CountryCodeEnricherUDF:
    # pylint: disable=too-few-public-methods
    """Country code enricher."""

    COUNTRY_CODE_COLNAME = "iso_a3"
    MAX_DISTANCE_METERS = 5000
    TRANSFORMER = Transformer.from_crs(
        CRS.from_epsg(4326), CRS.from_epsg(3857), always_xy=True
    )

    def __init__(self) -> None:
        country_polygons_df = _load_country_polygons()
        self._country_polygons_mapping = dict(
            enumerate(country_polygons_df[self.COUNTRY_CODE_COLNAME])
        )
        self._country_polygons_tree = STRtree(
            [
                self._project_geometry(poly)
                for poly in country_polygons_df[GEOMETRY_COLNAME]
            ]
        )
```
k
I'm not entirely sure how monkey patching interacts with pickling. Maybe try that first and see what happens, and if it doesn't work, pass it in explicitly
āœ… 1
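(A sketch of the "try monkeypatching first" route for a local unit test; the module path is inferred from the error output later in this thread and the stub contents are made up, and as noted above it is unclear whether the patch survives pickling when the UDF executes on remote workers:)
```python
from unittest import mock

import pandas as pd
from shapely.geometry import Point

# Tiny stand-in for the parquet-backed polygons table.
fake_polygons = pd.DataFrame(
    {"iso_a3": ["USA"], "geometry": [Point(0.0, 0.0).buffer(1.0)]}
)

# Patch the loader where it is looked up (module path is hypothetical).
with mock.patch(
    "marketintel_enrich_flow.lib.transforms._load_country_polygons",
    return_value=fake_polygons,
):
    ...  # run the pipeline that instantiates CountryCodeEnricherUDF
```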
g
last question, mypy is unhappy and I feel like I have seen this before...
`CountryCodeEnricherUDF.override_options(num_cpus=1)`
```
marketintel_enrich_flow/lib/transforms.py:28:28: E1101: Class 'CountryCodeEnricherUDF' has no 'override_options' member (no-member)
```
sorry for the barrage of questions; I'm migrating off the legacy UDF in a pipeline that was using the approach below to handle testing/patching
```python
CountryCodeEnricherUDF = UDF(
    resource_request=None,
    batch_size=None,
    name="country_code_enricher",
    inner=CountryCodeEnricher,
    return_dtype=daft.DataType.string(),
)
```
k
The decorator does do some funky things where `CountryCodeEnricherUDF` actually turns into an instance of `daft.UDF`. Mypy probably isn't catching that for some reason.
What version of mypy are you running?
g
1.13.0
k
I don't see any issues when using mypy 1.15.0, but I'm not sure if there is any difference in behavior between the two versions on this. You could try updating the version, or simply add a `# type: ignore` on that line haha
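(i.e. the suppression would look something like this on the line from the error above:)
```python
CountryCodeEnricherUDF.override_options(num_cpus=1)  # type: ignore
```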
g
type ignore for now, updating mypy in our monorepo is painful
šŸ‘ 1
confirmed that I need to pass it in via `__init__` if `concurrency` is set
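(A sketch of what passing it via `__init__` could look like, assuming a `with_init_args` hook for binding constructor arguments to a class UDF and an `__init__` that accepts a loader as in the earlier sketch; the dataframe and column names are illustrative:)
```python
import pandas as pd
from shapely.geometry import Point

def fake_loader() -> pd.DataFrame:
    # Stub for the S3-backed loader, used in tests.
    return pd.DataFrame({"iso_a3": ["USA"], "geometry": [Point(0.0, 0.0).buffer(1.0)]})

# with_init_args is assumed here as the hook that forwards keyword arguments
# to __init__ on each worker/actor.
enricher = CountryCodeEnricherUDF.with_init_args(load_polygons=fake_loader)
df = df.with_column("country_code", enricher(df["lat"], df["lon"]))
```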
one more question on `daft.func`: the examples I have seen so far load things like models into the global environment. If I didn't want to do this, how would I do something similar to the legacy stateful UDFs, i.e. pass it as an argument?
or factory pattern?
```python
def daft_udf_factory(x: str):
    @daft.func
    def example_udf(y: str) -> str:
        return f"{x}-{y}"

    return example_udf
```
k
Good question! We do not support this just yet but we are actively working on this
In terms of models, what libraries are you using?
g
xgboost, lightgbm, transformers/huggingface, pytorch
got it, anything fundamentally wrong with the factory pattern approach? I used to do this for PySpark UDFs šŸ˜‚
k
The issue with that approach is that your model is going to be loaded on your local machine, so if you're running the UDF on a Ray cluster, it will either:
• if it's in CPU memory, upload the model weights from the head node to all of the nodes, which can be pretty costly
• if it's in GPU memory, not work at all, since the model will try to reference the weights on the head node instead of its own node
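(For contrast, a minimal sketch of the alternative used later in this thread: loading the model inside a class UDF's `__init__` so each worker/actor loads its own copy; the model name, return type, and the way concurrency is attached are illustrative:)
```python
import daft
import torch
from transformers import AutoModel, AutoTokenizer

@daft.udf(return_dtype=daft.DataType.string(), concurrency=2)
class EmbedderUDF:
    def __init__(self) -> None:
        # Runs on each actor, so weights are downloaded/loaded locally rather
        # than being pickled on the driver and shipped to workers.
        self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        self.model = AutoModel.from_pretrained("bert-base-uncased").eval()
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.model = self.model.to(self.device)

    def __call__(self, texts: daft.Series) -> list[str]:
        # Placeholder: tokenize each row; a real UDF would run the model here.
        return [str(len(self.tokenizer.tokenize(t))) for t in texts.to_pylist()]
```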
g
got it, so for any stateful udfs I have right now, continue to use legacy?
k
yes
g
I cannot get a UDF to work with `concurrency` set to something non-null, seeing:
```
The actor is temporarily unavailable: RpcError: RPC Error message: failed to connect to all addresses; last error: UNAVAILABLE: ipv4:10.216.152.217:10019: recvmsg:Connection reset by peer; RPC Error details:
```
this is on latest daft version
also seeing:
```
(pid=174417, ip=10.216.136.26) PhysicalScan->Project->DistributedActorPoolProject->CatalogWrite 0: 13%|ā–ˆā–Ž | 8.00/63.0 [00:07<00:30, 1.82it/s] (UDFActor pid=4129, ip=10.216.152.217) [2025-08-27 21:45:14,500 E 4129 4129] actor_scheduling_queue.cc:135: Cancelling stale RPC with seqno 0 < 1
```
k
@Srinivas Lade would you know the answer to this?
g
I think I saw this behavior on older Daft versions as well and gave up on `concurrency` at some point. I will try to isolate whether it is caused by one of the UDFs in my transformation on my side.
s
@Colin Ho ^ I think we saw this before?
g
I tried to simplify: on daft v0.5.22 with the experimental distributed executor enabled, it seems to be complaining about this code, will have to see why
```python
model_store = custom_config["vectorizer_config"]["store"]
if model_store == "huggingface":
    pretrained_model = custom_config["vectorizer_config"]["model"]
else:
    raise ValueError(f"Model store {model_store} is not supported yet.")
tokenizer = AutoTokenizer.from_pretrained(pretrained_model)
model = AutoModel.from_pretrained(pretrained_model, output_hidden_states=False)
model.eval()
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model = model.to(device=device)  # this fails
```
```
NotImplementedError: Cannot copy out of meta tensor; no data! Please use torch.nn.Module.to_empty() instead of torch.nn.Module.to() when moving module from meta to a different device.
```
s
What version were you using before?
g
this code was on `0.5.13`. I tried with the experimental distributed executor disabled, but I was seeing increased memory usage that led to some OOMs; the job eventually succeeded though.
this is strange, if I set `force_download=True`, no errors: `AutoTokenizer.from_pretrained(pretrained_model, force_download=True)`
cc @Srinivas Lade ^^
ok, my guess at this point is that downloading the models from Hugging Face is not thread-safe. I moved my model initialization under a filelock (which includes loading the models from Hugging Face). The reason `force_download` helped was probably that I was getting rate-limited (saw 429s), which naturally spread out the downloads.
```python
def __init__(self) -> None:
    lock = filelock.FileLock("/tmp/my.lock")
    with lock:
        if not os.path.exists(self.LOCAL_TEXT_CLASSIFICATION_MODEL_PATH):
            self._download_model()

        self.model = self._initialize_model()
```
s
ah that makes sense, another idea would be to set `daft.udf(use_process=True)`, which is an internal hook to use processes for the UDF execution
šŸ‘€ 1
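(Applied to a class UDF like the one above, that internal hook would look roughly like this; the class name is illustrative and `use_process` is described in this thread as an internal knob, so treat it as such:)
```python
import daft

# use_process=True is the internal hook mentioned above for running the UDF
# in a separate process.
@daft.udf(return_dtype=daft.DataType.string(), use_process=True)
class TextClassifierUDF:
    ...
```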
g
to close out this thread for now: things seem to be working, except I still see "actor unavailable" or similar problems with `concurrency`
j
@Kevin Wang @Srinivas Lade our new UDF APIs might need to include a way to do filelocks for model downloading; this seems to be a pretty common use case
šŸ‘ 2
āœ… 1
g
I added an issue for `concurrency`, will try to repro with a simpler job and put it there: https://github.com/Eventual-Inc/Daft/issues/5080
ā¤ļø 2