Garrett Weaver
08/27/2025, 6:18 PM
daft.func vs daft.udf? I would guess that if the underlying Python code is not taking advantage of any vectorization, but is maybe just a list comprehension like [my_func(x) for x in some_series], then I should just use daft.func?

jay
08/27/2025, 6:28 PM

Kevin Wang
08/27/2025, 6:29 PM
daft.func should be your go-to. We will add more docs and guidance for this soon!

Garrett Weaver
08/27/2025, 6:31 PM

Kevin Wang
08/27/2025, 6:33 PM

Garrett Weaver
08/27/2025, 6:35 PM
__init__ or just something like the snippet below is fine, where _load_country_polygons needs to get data from S3 that I need to mock?
```python
import json
import os

import daft
import pandas as pd
from pyproj import CRS, Transformer
from shapely.geometry import shape
from shapely.strtree import STRtree

# io_utils, DEFAULT_COUNTRY_POLYGONS_DATA_PATH and GEOMETRY_COLNAME come from the project (not shown)


def _load_country_polygons() -> pd.DataFrame:
    country_polygons_path = os.getenv(
        "COUNTRY_POLYGONS_DATA_PATH", DEFAULT_COUNTRY_POLYGONS_DATA_PATH
    )
    storage_options = (
        io_utils.get_fsspec_s3_config(is_pandas=True)
        if country_polygons_path.startswith("s3")
        else None
    )
    country_polygons_df = pd.read_parquet(
        country_polygons_path, storage_options=storage_options
    )
    # Parse the stored GeoJSON strings into shapely geometries
    country_polygons_df[GEOMETRY_COLNAME] = [
        shape(json.loads(poly)) for poly in country_polygons_df[GEOMETRY_COLNAME]
    ]
    return country_polygons_df


@daft.udf(return_dtype=daft.DataType.string())
class CountryCodeEnricherUDF:
    # pylint: disable=too-few-public-methods
    """Country code enricher."""

    COUNTRY_CODE_COLNAME = "iso_a3"
    MAX_DISTANCE_METERS = 5000
    TRANSFORMER = Transformer.from_crs(
        CRS.from_epsg(4326), CRS.from_epsg(3857), always_xy=True
    )

    def __init__(self) -> None:
        # Loads the polygons once when the UDF instance is constructed, not on every batch
        country_polygons_df = _load_country_polygons()
        self._country_polygons_mapping = dict(
            enumerate(country_polygons_df[self.COUNTRY_CODE_COLNAME])
        )
        self._country_polygons_tree = STRtree(
            [
                self._project_geometry(poly)
                for poly in country_polygons_df[GEOMETRY_COLNAME]
            ]
        )

    # _project_geometry and __call__ are not shown in this snippet
```

Kevin Wang
08/27/2025, 6:38 PM

Garrett Weaver
08/27/2025, 6:39 PM
CountryCodeEnricherUDF.override_options(num_cpus=1)
marketintel_enrich_flow/lib/transforms.py:28:28: E1101: Class 'CountryCodeEnricherUDF' has no 'override_options' member (no-member)

Garrett Weaver
08/27/2025, 6:39 PM
```python
CountryCodeEnricherUDF = UDF(
    resource_request=None,
    batch_size=None,
    name="country_code_enricher",
    inner=CountryCodeEnricher,
    return_dtype=daft.DataType.string(),
)
```

Kevin Wang
08/27/2025, 6:41 PM
CountryCodeEnricherUDF actually turns into an instance of daft.UDF. Mypy probably isn't catching that for some reason.

Kevin Wang
08/27/2025, 6:41 PM

Garrett Weaver
08/27/2025, 6:42 PM
1.13.0

Kevin Wang
08/27/2025, 6:46 PM
# type: ignore on that line haha

Garrett Weaver
08/27/2025, 6:46 PM

Garrett Weaver
08/27/2025, 7:02 PM
__init__ if concurrency is set

Garrett Weaver
08/27/2025, 9:15 PM
daft.func
the examples I have seen so far load things like models into the global environment. If I didn't want to do this, how would I do it similarly to the legacy stateful UDFs? Pass it as an argument?

Garrett Weaver
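08/27/2025, 9:23 PM
```python
import daft


def daft_udf_factory(x: str):
    # x is captured by the closure, so each factory call yields a UDF bound to its own x
    @daft.func
    def example_udf(y: str) -> str:
        return f"{x}-{y}"

    return example_udf
```

Kevin Wang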
08/27/2025, 9:51 PM

Kevin Wang
08/27/2025, 9:51 PM

Garrett Weaver
08/27/2025, 9:57 PM

Garrett Weaver
08/27/2025, 9:58 PM

Kevin Wang
08/27/2025, 10:02 PM

Garrett Weaver
08/27/2025, 10:05 PM

Kevin Wang
08/27/2025, 10:13 PM

Garrett Weaver
08/28/2025, 4:48 AM
concurrency set to something non-null, seeing:
The actor is temporarily unavailable: RpcError: RPC Error message: failed to connect to all addresses; last error: UNAVAILABLE: ipv4:10.216.152.217:10019: recvmsg:Connection reset by peer; RPC Error details:
this is on the latest daft version

Garrett Weaver
08/28/2025, 4:50 AM
(pid=174417, ip=10.216.136.26) PhysicalScan->Project->DistributedActorPoolProject->CatalogWrite 0: 13%| | 8.00/63.0 [00:07<00:30, 1.82it/s]
(UDFActor pid=4129, ip=10.216.152.217) [2025-08-27 21:45:14,500 E 4129 4129] actor_scheduling_queue.cc:135: Cancelling stale RPC with seqno 0 < 1

Kevin Wang
08/28/2025, 5:31 PM

Garrett Weaver
08/28/2025, 7:10 PM

Srinivas Lade
08/28/2025, 9:21 PM

Garrett Weaver
08/28/2025, 10:46 PM
```python
import torch
from transformers import AutoModel, AutoTokenizer

# custom_config is the job's own configuration dict (not shown)
model_store = custom_config["vectorizer_config"]["store"]
if model_store == "huggingface":
    pretrained_model = custom_config["vectorizer_config"]["model"]
else:
    raise ValueError(f"Model store {model_store} is not supported yet.")

tokenizer = AutoTokenizer.from_pretrained(pretrained_model)
model = AutoModel.from_pretrained(pretrained_model, output_hidden_states=False)
model.eval()  # switch to inference mode
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model = model.to(device=device)  # this fails
```
NotImplementedError: Cannot copy out of meta tensor; no data! Please use torch.nn.Module.to_empty() instead of torch.nn.Module.to() when moving module from meta to a different device.

Srinivas Lade
08/28/2025, 10:47 PM

Garrett Weaver
08/28/2025, 10:48 PM
0.5.13. I tried with the experimental distributed executor disabled, but I was seeing increased memory usage that led to some OOMs, though the job eventually succeeded.

Garrett Weaver
08/28/2025, 11:30 PM
force_download=True, no errors:
AutoTokenizer.from_pretrained(pretrained_model, force_download=True)

Garrett Weaver
08/28/2025, 11:52 PM

Garrett Weaver
08/29/2025, 12:24 AM
```python
import os

import filelock


def __init__(self) -> None:
    # Only one process on the machine downloads; the others wait on the lock
    lock = filelock.FileLock("/tmp/my.lock")
    with lock:
        if not os.path.exists(self.LOCAL_TEXT_CLASSIFICATION_MODEL_PATH):
            self._download_model()
    self.model = self._initialize_model()
```

Srinivas Lade
08/29/2025, 12:31 AM
daft.udf(use_process=True), which is an internal hook to use processes for the UDF execution

Garrett Weaver
08/29/2025, 3:20 AM
concurrency

jay
08/29/2025, 3:28 AM

Garrett Weaver
08/29/2025, 3:32 AM
concurrency, will try to repro with a simpler job and put it there: https://github.com/Eventual-Inc/Daft/issues/5080
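On the 6:18 PM question, the choice comes down to whether the function body is vectorized. The plain-Python sketch below assumes, as the question does, that daft.func calls the function once per row while a batch daft.udf receives a whole column at a time. run_row_wise and run_batched are hypothetical stand-ins for the engine, and no Daft API is used. The sketch shows that a batch body which is only a list comprehension over the row function does exactly the same per-element work.

```python
from typing import Callable, List


def my_func(x: int) -> int:
    """Row-wise logic: sees one value at a time."""
    return x * 2 + 1


def run_row_wise(fn: Callable[[int], int], column: List[int]) -> List[int]:
    # Hypothetical stand-in for an engine calling a row-wise function once per row
    return [fn(x) for x in column]


def batch_func(batch: List[int]) -> List[int]:
    # A "batch" body that is just a list comprehension: the same per-row Python work
    return [my_func(x) for x in batch]


def run_batched(
    fn: Callable[[List[int]], List[int]], column: List[int], batch_size: int
) -> List[int]:
    # Hypothetical stand-in for an engine handing a batch function fixed-size slices
    out: List[int] = []
    for start in range(0, len(column), batch_size):
        out.extend(fn(column[start:start + batch_size]))
    return out


column = list(range(10))
assert run_row_wise(my_func, column) == run_batched(batch_func, column, batch_size=4)
```

A batch UDF only pays off when its body replaces the Python loop with a call that operates on the whole batch at once, such as a NumPy or PyArrow compute function.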
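The 9:15 PM question asks how to avoid module-level globals. The 9:23 PM factory answers it by closing over an argument instead. The plain-Python sketch below uses the same pattern, with one change: the closed-over value is a loader rather than the loaded object. This keeps the expensive load lazy and lets a test pass in a fake.

FakeModel, make_predictor and counting_loader are hypothetical. The @daft.func decorator is left off so the sketch runs without Daft. Whether the closure behaves the same once Daft ships it to workers is an assumption not settled in the thread.

```python
from typing import Callable, List, Optional


class FakeModel:
    """Hypothetical stand-in for an expensive object such as an ML model."""

    def predict(self, text: str) -> str:
        return text.upper()


def make_predictor(load_model: Callable[[], FakeModel]) -> Callable[[str], str]:
    """Return a row-wise function that closes over a lazily loaded model."""
    model: Optional[FakeModel] = None

    def predict(text: str) -> str:
        nonlocal model
        if model is None:
            # Loaded on the first call, not when the factory is called
            model = load_model()
        return model.predict(text)

    return predict


load_calls: List[int] = []


def counting_loader() -> FakeModel:
    # Records each load so we can check the model is built only once
    load_calls.append(1)
    return FakeModel()


predict = make_predictor(counting_loader)
print(len(load_calls))  # 0
print([predict(t) for t in ["a", "b", "c"]])  # ['A', 'B', 'C']
print(len(load_calls))  # 1
```

Passing a loader also keeps the factory call itself cheap, since nothing heavy is built until the first row is processed.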
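The 6:35 PM question is about mocking the S3 read. Because _load_country_polygons is a separate module-level function, a test can patch that one name and construct the enricher without touching S3. Below is a self-contained sketch with unittest.mock. load_polygons and Enricher are hypothetical simplifications of _load_country_polygons and CountryCodeEnricherUDF, with the Daft decorator and geometry handling removed.

```python
import sys
from unittest import mock


def load_polygons() -> dict:
    """Hypothetical stand-in for _load_country_polygons (the real one reads Parquet from S3)."""
    raise RuntimeError("this sketch has no S3 access")


class Enricher:
    """Hypothetical simplification of CountryCodeEnricherUDF, without the Daft decorator."""

    def __init__(self) -> None:
        # load_polygons is looked up as a module global at call time,
        # so patching the module attribute replaces it for this constructor
        self._polygons = load_polygons()

    def __call__(self, codes: list) -> list:
        return [self._polygons.get(code, "UNKNOWN") for code in codes]


def test_enricher_without_s3() -> None:
    fake_polygons = {"FR": "FRA", "DE": "DEU"}
    this_module = sys.modules[__name__]
    with mock.patch.object(this_module, "load_polygons", return_value=fake_polygons) as loader:
        enricher = Enricher()
    loader.assert_called_once_with()
    assert enricher(["FR", "XX"]) == ["FRA", "UNKNOWN"]


test_enricher_without_s3()
```

In the real project, the patch target would be the dotted path of whichever module defines _load_country_polygons, passed to mock.patch as a string.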
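The 6:39 PM lint error has a simple cause. Per the repr Garrett posted, the decorator rebinds CountryCodeEnricherUDF to a UDF object, so a checker that reasons from the class body will not see override_options. Note also that E1101 (no-member) is a pylint message. Pylint does not read # type: ignore, which is a mypy/PEP 484 comment; its own inline form is # pylint: disable=no-member.

Below is a plain-Python sketch of the situation, with hypothetical Wrapper and wrap standing in for daft.UDF and daft.udf.

```python
class Wrapper:
    """Hypothetical stand-in for the object a class decorator returns (like daft.UDF)."""

    def __init__(self, inner: type) -> None:
        self.inner = inner
        self.options: dict = {}

    def override_options(self, **options) -> "Wrapper":
        # Return a new wrapper with the extra options; the original is left unchanged
        new = Wrapper(self.inner)
        new.options = {**self.options, **options}
        return new


def wrap(cls: type) -> Wrapper:
    # Hypothetical decorator: the decorated name ends up bound to a Wrapper, not a class
    return Wrapper(cls)


@wrap
class Enricher:
    pass


# The attribute exists at runtime; the inline comment silences only pylint's E1101 on this line
configured = Enricher.override_options(num_cpus=1)  # pylint: disable=no-member
print(type(Enricher).__name__, configured.options)  # Wrapper {'num_cpus': 1}
```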
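The 12:24 AM snippet guards the download with a lock and re-checks for the files inside it. That means only the first process on a machine downloads, while the others wait and then reuse the result. Below is a stdlib-only sketch of the same idea, assuming a POSIX host. It differs from the snippet in two ways:

- It swaps the filelock package for fcntl.flock.
- It downloads into a temporary directory and then calls os.replace, so an interrupted download never leaves a half-written model directory at the target path.

ensure_downloaded and fake_download are hypothetical.

```python
import fcntl
import os
import shutil
import tempfile
from typing import Callable


def ensure_downloaded(target_dir: str, download: Callable[[str], None]) -> bool:
    """Populate target_dir via download() at most once; return True if this call did it.

    POSIX-only: fcntl.flock serializes every process that opens the same lock file.
    """
    target_dir = os.path.abspath(target_dir)
    parent = os.path.dirname(target_dir)
    os.makedirs(parent, exist_ok=True)
    with open(target_dir + ".lock", "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)  # blocks until no other process holds it
        try:
            # Re-check under the lock: another process may have finished while we waited
            if os.path.exists(target_dir):
                return False
            tmp_dir = tempfile.mkdtemp(dir=parent)
            try:
                download(tmp_dir)
            except BaseException:
                shutil.rmtree(tmp_dir, ignore_errors=True)
                raise
            # Same-filesystem rename: the target path never holds a partial directory
            os.replace(tmp_dir, target_dir)
            return True
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)


def fake_download(dest: str) -> None:
    # Hypothetical stand-in for fetching model weights
    with open(os.path.join(dest, "weights.bin"), "wb") as f:
        f.write(b"\x00" * 16)


model_dir = os.path.join(tempfile.mkdtemp(), "model")
first = ensure_downloaded(model_dir, fake_download)
second = ensure_downloaded(model_dir, fake_download)
print(first, second)  # True False
```

The lock file sits next to the target directory rather than at a fixed path like /tmp/my.lock, so unrelated models do not serialize on the same lock.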