# daft-dev
  • Tan Phan

    08/07/2025, 4:07 PM
    I'm using Daft on Ray to read data via a custom DataSource. I can see that the right number of tasks is generated, but the tasks are not distributed the way I expected. My setup is just a local Ray cluster (1 node). I expected each task to be assigned to its own process, but it looks like all of the tasks are scheduled onto one process. Each of my tasks reads a range of data from an HDF5 file, which blocks reading from multiple threads, so the current distribution is not very useful for my use case. I see that this function is responsible for starting the workers: https://github.com/Eventual-Inc/Daft/blob/main/daft/runners/flotilla.py#L146-L175. When I change it to start one worker per CPU per node, the tasks are distributed across all the processes (workers) and my code speeds up a lot. So my question is: is there any way for the user to configure the worker pool?
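
    For reference, a minimal sketch of the workaround described above, assuming a plain Ray actor pool sized by CPU count; RangeReader and its h5py usage are illustrative stand-ins, not Daft's flotilla API:
    import h5py  # assumption: h5py is how the HDF5 range reads happen
    import ray

    ray.init(ignore_reinit_error=True)

    # one worker per CPU rather than one per node: each actor is its own
    # process, so single-threaded HDF5 reads no longer queue behind each other
    num_cpus = int(ray.cluster_resources().get("CPU", 1))

    @ray.remote(num_cpus=1)
    class RangeReader:
        def read(self, path: str, start: int, stop: int) -> int:
            with h5py.File(path, "r") as f:
                return len(f["data"][start:stop])

    readers = [RangeReader.remote() for _ in range(num_cpus)]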
  • Cory Grinstead

    08/08/2025, 5:29 PM
    PR for improving the daft.func documentation and adding a new section to the quickstart! https://github.com/Eventual-Inc/Daft/pull/4942
    👀 1
  • Cory Grinstead

    08/08/2025, 6:45 PM
    also a tiny one-line PR fixing a bad link in our docs https://github.com/Eventual-Inc/Daft/pull/4941
  • Tan Phan

    08/08/2025, 7:12 PM
    @Robert Howell regarding custom DataSources: does a DataSource's schema need to be the same as its DataSourceTask's schema? If so, what do you advise when a task's schema is different because we absorb filter and projection pushdowns to limit the columns read? That is my use case at the moment. Everything about the dataframe like `collect()` works fine until I do `write_parquet`; the error message is `channel closed` (from Rust, I guess). When there is projection pushdown, it works fine.
  • Coury Ditch

    08/12/2025, 3:55 PM
    @Robert Howell Thanks for taking a look at this issue. I'm curious why broadcasting a dataframe during a cross join is not supported. If I'm cross joining a small df to a large df, I would think broadcasting would be the ideal strategy. Perhaps Daft is already automating such an optimization under the hood?
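
    For reference, the shape of the query being discussed; a minimal sketch assuming Daft's how="cross" join spelling:
    import daft

    small = daft.from_pydict({"rate": [0.1, 0.2]})
    large = daft.from_pydict({"amount": list(range(1_000_000))})

    # every row of `small` paired with every row of `large`; broadcasting the
    # small side would avoid shuffling the large one
    out = large.join(small, how="cross")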
  • amit singh

    08/12/2025, 6:25 PM
    Hi @Robert Howell, I updated the PR https://github.com/Eventual-Inc/Daft/pull/4951 for issue 4876 (support Iceberg partitioning) as per the comments. Please note that, per the earlier comment about supporting partition_field as an argument in the Daft Catalog, I had to update all subclasses where create_table is present, plus one test file (test_catalog - MockCatalog). Please take a look when you get a chance. Thanks!
    ✅ 2
    🎉 1
  • 蒋晓峰

    08/12/2025, 11:45 PM
    The design document for the Flight Shuffle integration with Celeborn is here: https://docs.google.com/document/d/1Yg8P4GDqrYlZePfaltFSaYAqvcPgZXOrAC4d4OI8Qi4/edit?usp=sharing. Integrating Flight Shuffle with Celeborn would make the distributed push-based shuffle more efficient and elastic. PTAL.
    👍 1
  • Cory Grinstead

    08/13/2025, 9:59 PM
    @Kevin Wang here's a very small PR that's somewhat related to the lit/dtype work you've been doing: https://github.com/Eventual-Inc/Daft/pull/4973
  • 蒋晓峰

    08/14/2025, 3:53 AM
    Apache Paimon is a lake format for building a realtime lakehouse architecture with Flink and Spark, covering both streaming and batch operations. Daft could support the Paimon lake format the same way it supports Iceberg: https://github.com/Eventual-Inc/Daft/issues/4976.
  • Kevin Wang

    08/14/2025, 9:54 PM
    @Cory Grinstead @Srinivas Lade @ChanChan Mao a substantial cleanup + docs of our Daft-to-Python type conversion code, PTAL! https://github.com/Eventual-Inc/Daft/pull/4972
    ✅ 1
  • Robert Howell

    08/14/2025, 11:26 PM
    @Desmond Cheong here's a PR to read video frames into a DataFrame using a custom source. It supports reading directly from YouTube, and you can specify a list of video URLs. https://github.com/Eventual-Inc/Daft/pull/4979
    df = daft.read_video_frames(
        path="<https://www.youtube.com/watch?v=jNQXAC9IVRw>",
        image_height=480,
        image_width=640,
        is_key_frame=True,
    )
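
    The PR description above also mentions passing a list of video URLs; presumably that looks something like this (the second URL is only an illustrative placeholder):
    df = daft.read_video_frames(
        path=[
            "https://www.youtube.com/watch?v=jNQXAC9IVRw",
            "https://www.youtube.com/watch?v=9bZkp7q19f0",  # placeholder URL
        ],
        image_height=480,
        image_width=640,
    )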
    🙌 3
    daft party 1
  • Navneeth Krishnan

    08/20/2025, 1:36 PM
    Hey guys, just wondering if I can work with Daft at the query-planning level, or in some other way make Daft "load aware", since I'm working with terabytes of data. My question is: is Daft already designed to be load aware? It is unrealistic to imagine that I'll always be querying TBs of data; some queries are smaller, some churn more. My point is that I want partitioning to happen effectively in the Ray cluster. Maybe some queries need partitioning spread across 2 nodes, some need 3, and some work best on the native runner. How can I control this?
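
    A minimal sketch of the per-query knobs Daft already exposes for this, assuming the standard daft.context and DataFrame.repartition APIs:
    import daft
    from daft import context

    # pick the runner per process: Ray for big queries, native for small ones
    context.set_runner_ray(address="auto")  # or context.set_runner_native()

    df = daft.read_parquet("s3://bucket/big/*.parquet")
    # explicitly control the partition count for downstream stages
    df = df.repartition(64)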
  • Cory Grinstead

    08/20/2025, 7:50 PM
    PR for additional `daft.File` functionality is ready for review! https://github.com/Eventual-Inc/Daft/pull/5002
  • Robert Howell

    08/20/2025, 11:09 PM
    PR which adds an OpenAI provider and support for embed_text. There are a couple of things in here:
    • Adds an OpenAI provider implementation along with provider-specific options.
    • Adds an OpenAI TextEmbedder with dynamic batching support.
    • Adds additional session methods for the providers, e.g. set_provider, get_provider.
    • Adds support for attaching custom providers with proper resolution.
    • Makes set_provider initialize with defaults, e.g. daft.set_provider("openai").
    Examples: embedding text with OpenAI
    import daft
    from daft.functions.ai import embed_text
    daft.set_provider("openai")  # <- can configure api_key here
    df = daft.from_pydict({"text": ["hello, world!"]})
    df = df.with_column("embedding", embed_text(df["text"]))
    df.show()
    ╭───────────────┬──────────────────────────╮
    │ text          ┆ embedding                │
    │ ---           ┆ ---                      │
    │ Utf8          ┆ Embedding[Float32; 1536] │
    ╞═══════════════╪══════════════════════════╡
    │ hello, world! ┆ <Embedding>              │
    ╰───────────────┴──────────────────────────╯
    
    (Showing first 1 of 1 rows)
    Scoped context with an attached session
    import daft

    sess = load_session()  # your session init logic

    with daft.use_context(sess):
        # everything within this block resolves via the session
        ...
    https://github.com/Eventual-Inc/Daft/pull/4997
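
    A short sketch of the session methods mentioned above; set_provider and get_provider come from the PR description, and the exact signatures are assumptions:
    import daft

    daft.set_provider("openai")     # initialize the provider with defaults
    provider = daft.get_provider()  # resolve the currently attached provider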
  • Kevin Wang

    08/21/2025, 12:49 AM
    Hugging Face write functionality ready for review! @Cory Grinstead @Desmond Cheong @Colin Ho https://github.com/Eventual-Inc/Daft/pull/5015
    🙌 1
  • Cory Grinstead

    08/22/2025, 4:19 PM
    Breaking PR to change how we handle tuples internally! cc @Kevin Wang @Robert Howell https://github.com/Eventual-Inc/Daft/pull/5030
  • Desmond Cheong

    08/26/2025, 8:42 PM
    🚀🚀 Huge shoutout to @Xin Xianyin for the herculean effort of moving us off scattered `requirements.txt` + `pyproject.toml` files and into a unified `pyproject.toml`! https://github.com/Eventual-Inc/Daft/pull/4849
    🙌 10
    🔥 16
    ✅ 7
  • Zhiping Wu

    08/27/2025, 3:58 AM
    Proposing a discussion about adding a Video data type to Daft; feel free to share your thoughts, thanks! https://github.com/Eventual-Inc/Daft/discussions/5054
    🙌 2
  • Everett Kleven

    08/28/2025, 5:01 PM
    Yo @Cory Grinstead, are there any concurrency controls for row-wise UDFs?
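
    For context, a sketch of the concurrency knob that exists for class-based (actor pool) UDFs; whether the same control applies to row-wise functions is exactly the question here, so treat the signature as an assumption:
    import daft
    from daft import DataType

    # `concurrency` bounds how many instances of the UDF run in parallel
    @daft.udf(return_dtype=DataType.string(), concurrency=4)
    class Slugify:
        def __call__(self, col: daft.Series):
            return [s.lower().replace(" ", "-") for s in col.to_pylist()]

    df = daft.from_pydict({"title": ["Hello World", "Daft UDFs"]})
    df = df.with_column("slug", Slugify(df["title"]))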
  • Everett Kleven

    08/31/2025, 9:15 PM
    Has anyone else been seeing this dashboard warning come up? Not sure if it's just my environment or something actually broke.
    /usr/local/lib/python3.12/dist-packages/daft/dashboard/__init__.py:91: UserWarning: Failed to broadcast metrics over http://127.0.0.1:3238/api/queries: HTTP Error 400: Bad Request
      warnings.warn(f"Failed to broadcast metrics over {url}: {e}")
  • can cai

    09/02/2025, 11:36 AM
    Doesn't read_lance() support limit pushdown?
    class LanceDBScanOperator(ScanOperator, SupportsPushdownFilters):
        ...
        def can_absorb_filter(self) -> bool:
            return isinstance(self, SupportsPushdownFilters)
    
        def can_absorb_limit(self) -> bool:
            return False
    
        def can_absorb_select(self) -> bool:
            return False
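
    Since can_absorb_limit() above returns False, a limit is applied after the scan rather than inside it; a minimal sketch of the query shape in question (dataset path is illustrative):
    import daft

    # the LIMIT runs over the scan's output partitions instead of being
    # pushed down into the LanceDB scan itself
    df = daft.read_lance("path/to/dataset.lance").limit(10)
    df.show()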
  • can cai

    09/03/2025, 5:27 PM
    # flotilla.py
    def stream_plan(
        self,
        plan: DistributedPhysicalPlan,
        partition_sets: dict[str, PartitionSet[ray.ObjectRef]],
    ) -> Iterator[RayMaterializedResult]:
        plan_id = plan.id()

        # timing instrumentation I added; cp is cloudpickle
        t = time.time()
        sz = len(cp.dumps(plan))
        print("plan pickle bytes:", sz, "secs:", time.time() - t)

        ray.get(self.runner.run_plan.remote(plan, partition_sets))
        while True:
            materialized_result = ray.get(self.runner.get_next_partition.remote(plan_id))
            if materialized_result is None:
                break
            yield materialized_result
    During testing, I found that the serialized size of the DistributedPhysicalPlan reached 13 GB, which is very unreasonable. Moreover, serialization took nearly 10 minutes. Is there any way to easily analyze what is too big in the DistributedPhysicalPlan object?
    plan pickle bytes: 13298686802 secs: 596.3822300434113
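
    One generic way to bisect a huge pickle, assuming the plan object exposes Python attributes via vars(); for a Rust/pyo3-backed object this may not apply, in which case serializing sub-plans piecewise is the fallback:
    import cloudpickle as cp

    def size_by_field(obj):
        # pickle each attribute separately to see which one dominates
        sizes = {}
        for name, value in vars(obj).items():
            try:
                sizes[name] = len(cp.dumps(value))
            except Exception as exc:
                sizes[name] = repr(exc)
        return sorted(sizes.items(), key=lambda kv: kv[1] if isinstance(kv[1], int) else -1, reverse=True)

    for name, size in size_by_field(plan):
        print(name, size)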
  • Robert Howell

    09/03/2025, 8:13 PM
    @Desmond Cheong I split out the daft.ai module refactor into its own PR. https://github.com/Eventual-Inc/Daft/pull/5125
    daft bro 1
  • Robert Howell

    09/03/2025, 8:21 PM
    @Desmond Cheong my morning distraction... this fixes YouTube video reading and adds an example of reading from multiple URLs. https://github.com/Eventual-Inc/Daft/pull/5126
    โค๏ธ 3
  • Srinivas Lade

    09/03/2025, 8:54 PM
    In case anyone here is interested, I opened a PR to add an AGENTS.md: https://github.com/Eventual-Inc/Daft/pull/5124. Should hopefully make working with agents a little nicer.
    โค๏ธ 2
  • Patrick Kane

    09/08/2025, 1:07 AM
    Am I misusing Daft by trying to process full rows and return different rows? I'm used to the Spark workflow where I can pass a row into a UDF and yield a different Row object. The actual use case is to convert an Arrow record batch to a list of protobufs (using the 'protarrow' package), return a different protobuf after computing, and finally reverse the process to get back to a new record batch. I have this working with the native runner using iter_partitions and iter_batches, but I don't believe I can take advantage of the Ray cluster this way. Appreciate any advice!
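
    One way to get batch-at-a-time row processing that still parallelizes on Ray: a batch UDF that rebuilds an Arrow RecordBatch from its input columns, transforms it row-wise, and returns a column. A sketch assuming ordinary daft.udf and pyarrow; the protarrow round-trip would slot in at the comment:
    import daft
    import pyarrow as pa
    from daft import DataType

    @daft.udf(return_dtype=DataType.int64())
    def process_rows(a: daft.Series, b: daft.Series):
        # rebuild an Arrow batch from the input columns...
        batch = pa.RecordBatch.from_arrays(
            [a.to_arrow(), b.to_arrow()], names=["a", "b"]
        )
        # ...convert to protobufs via protarrow, compute, and convert back...
        return [row["a"] + row["b"] for row in batch.to_pylist()]

    df = daft.from_pydict({"a": [1, 2], "b": [10, 20]})
    df = df.with_column("c", process_rows(df["a"], df["b"]))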
    👍 1
  • Kevin Wang

    09/08/2025, 7:00 PM
    @Cory Grinstead @R. C. Howell first batch of Daft functions migrated, PTAL! It's a big PR, but we've already discussed and agreed on the plans for the ones in this PR, so I'm hoping the code is straightforward to review. I figured it would be more efficient to batch all of them, but let me know if you'd prefer I split the PR up. https://github.com/Eventual-Inc/Daft/pull/5086
  • Everett Kleven

    09/09/2025, 5:11 PM
    Hey team, when are we thinking we will snap the next release? I'd like to take advantage of @Srinivas Lade's base64 encoder before I PR the structured outputs example. If it's later in the week I'll just PR now.
  • Navneeth Krishnan

    09/10/2025, 5:02 PM
    Are Daft async UDFs a work in progress, or are they already released? I couldn't find any examples in the documentation, so I'm curious how to implement them.
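
    If async UDFs follow the existing daft.func pattern, they might look like the sketch below; the async support itself is the open question, so treat this as an assumption rather than documented API:
    import asyncio

    import daft

    @daft.func
    async def url_len(url: str) -> int:
        await asyncio.sleep(0.01)  # stand-in for an async call, e.g. HTTP
        return len(url)

    df = daft.from_pydict({"url": ["https://www.getdaft.io"]})
    df = df.with_column("n", url_len(df["url"]))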
  • Everett Kleven

    09/10/2025, 9:04 PM
    Is it just me or has the functions docs section been moved? @Kevin Wang @Desmond Cheong