# general
a
Hi team, we're looking at dynamic filtering for the Presto-Pinot connector. We can see that Presto generates proper logical plans to push down the dynamic filters, specifying `dynamicFilterAssignments`, but it doesn't look like the plan is able to take this information into account or pass it into the connector. Is there a way to order the source nodes or pass this information down? If not, what would we need to build to support this?
m
@Xiang Fu ^^
x
What does the query look like? Are you able to find the Presto PlanNode structure for it?
a
@Xiang Fu here's an example
> set session enable_dynamic_filtering=true;
SET SESSION

> explain select tbl1.atbatting from baseballstats tbl2 join baseballstats tbl1 on tbl1.atbatting=tbl2.atbatting where tbl1.baseonballs > 439;

- Output[atbatting] => [atbatting:integer]
    - RemoteStreamingExchange[GATHER] => [atbatting:integer]
        - InnerJoin[("atbatting" = "atbatting_4")][$hashvalue, $hashvalue_31] => [atbatting:integer]
                Distribution: PARTITIONED
                dynamicFilterAssignments = {atbatting_4 -> 386}
            - RemoteStreamingExchange[REPARTITION][$hashvalue] => [atbatting:integer, $hashvalue:bigint]
                    Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                - ScanFilterProject[table = TableHandle {connectorId='pinot', connectorHandle='PinotTableHandle{connectorId=pinot, schemaName=default, tableName=baseballStats, forBroker=Optional[false], expectedColumnHandles=Optional[[PinotColumnHandle{columnName="AtBatting", dataType=integer, type=REGULAR}]], pinotQuery=Optional[GeneratedPinotQuery{query=SELECT "AtBatting" FROM baseballStats__TABLE_NAME_SUFFIX_TEMPLATE____TIME_BOUNDARY_FILTER_TEMPLATE__ LIMIT 2147483647, format=SQL, table=baseballStats, expectedColumnIndices=[0], groupByClauses=0, haveFilter=false, forBroker=false}]}', layout='Optional[PinotTableHandle{connectorId=pinot, schemaName=default, tableName=baseballStats, forBroker=Optional[false], expectedColumnHandles=Optional[[PinotColumnHandle{columnName="AtBatting", dataType=integer, type=REGULAR}]], pinotQuery=Optional[GeneratedPinotQuery{query=SELECT "AtBatting" FROM baseballStats__TABLE_NAME_SUFFIX_TEMPLATE____TIME_BOUNDARY_FILTER_TEMPLATE__ LIMIT 2147483647, format=SQL, table=baseballStats, expectedColumnIndices=[0], groupByClauses=0, haveFilter=false, forBroker=false}]}]'}, filterPredicate = BOOLEAN'true', dynamicFilter = {386 -> atbatting}, projectLocality = LOCAL] => [atbatting:integer, $hashvalue_30:bigint]
                        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                        $hashvalue_30 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(atbatting), BIGINT'0')) (1:36)
                        atbatting := PinotColumnHandle{columnName="AtBatting", dataType=integer, type=REGULAR} (1:36)
            - LocalExchange[HASH][$hashvalue_31] (atbatting_4) => [atbatting_4:integer, $hashvalue_31:bigint]
                    Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                - RemoteStreamingExchange[REPARTITION][$hashvalue_32] => [atbatting_4:integer, $hashvalue_32:bigint]
                        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                    - ScanProject[table = TableHandle {connectorId='pinot', connectorHandle='PinotTableHandle{connectorId=pinot, schemaName=default, tableName=baseballStats, forBroker=Optional[false], expectedColumnHandles=Optional[[PinotColumnHandle{columnName="AtBatting", dataType=integer, type=REGULAR}, PinotColumnHandle{columnName="baseOnBalls", dataType=integer, type=REGULAR}]], pinotQuery=Optional[GeneratedPinotQuery{query=SELECT "AtBatting", "baseOnBalls" FROM baseballStats__TABLE_NAME_SUFFIX_TEMPLATE__ WHERE ("baseOnBalls" > 439)__TIME_BOUNDARY_FILTER_TEMPLATE__ LIMIT 2147483647, format=SQL, table=baseballStats, expectedColumnIndices=[0, 1], groupByClauses=0, haveFilter=true, forBroker=false}]}', layout='Optional[PinotTableHandle{connectorId=pinot, schemaName=default, tableName=baseballStats, forBroker=Optional[false], expectedColumnHandles=Optional[[PinotColumnHandle{columnName="AtBatting", dataType=integer, type=REGULAR}, PinotColumnHandle{columnName="baseOnBalls", dataType=integer, type=REGULAR}]], pinotQuery=Optional[GeneratedPinotQuery{query=SELECT "AtBatting", "baseOnBalls" FROM baseballStats__TABLE_NAME_SUFFIX_TEMPLATE__ WHERE ("baseOnBalls" > 439)__TIME_BOUNDARY_FILTER_TEMPLATE__ LIMIT 2147483647, format=SQL, table=baseballStats, expectedColumnIndices=[0, 1], groupByClauses=0, haveFilter=true, forBroker=false}]}]'}, projectLocality = LOCAL] => [atbatting_4:integer, $hashvalue_33:bigint]
                            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                            $hashvalue_33 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(atbatting_4), BIGINT'0')) (1:60)
                            atbatting_4 := PinotColumnHandle{columnName="AtBatting", dataType=integer, type=REGULAR} (1:60)
There's this piece that gets applied to the query: `filterPredicate = BOOLEAN'true', dynamicFilter = {386 -> atbatting}`, which is defined in the InnerJoin node as `dynamicFilterAssignments = {atbatting_4 -> 386}`. Looking here, should the source for the dynamic assignment be a child of the table `ScanFilterProject` node that requires it?
So after looking a bit more: the `ScanProject` becomes a `ScanFilterProject` with a `filterPredicate` that filters nothing (`true`) and a `dynamicFilter` that is handled Presto-side, so it doesn't actually result in any fetch savings here.
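To make the "no fetch savings" point concrete, here is a minimal sketch in plain Java of the two behaviours being compared. Everything in it is hypothetical: the class and method names are made up for illustration and are not Presto or Pinot APIs, and the key values are invented sample data. It assumes the build side of the join has already produced a small set of `atbatting` keys.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical illustration only: none of these names exist in Presto or Pinot.
final class DynamicFilterSketch {

    // Engine-side dynamic filter: every probe row has already been fetched
    // from the source, and rows are dropped afterwards inside the engine.
    static List<Integer> filterInEngine(List<Integer> fetchedProbeRows, Set<Integer> buildKeys) {
        return fetchedProbeRows.stream()
                .filter(buildKeys::contains)
                .collect(Collectors.toList());
    }

    // Pushdown: the build-side keys are turned into an IN predicate on the
    // query sent to the source, so non-matching rows are never fetched.
    // An empty key set means the inner join cannot match, so no query is needed.
    static Optional<String> pushDownIntoQuery(String table, String column, Set<Integer> buildKeys) {
        if (buildKeys.isEmpty()) {
            return Optional.empty();
        }
        // Sort the keys so the generated query text is deterministic.
        String inList = new TreeSet<>(buildKeys).stream()
                .map(String::valueOf)
                .collect(Collectors.joining(", "));
        return Optional.of("SELECT \"" + column + "\" FROM " + table
                + " WHERE \"" + column + "\" IN (" + inList + ")");
    }

    public static void main(String[] args) {
        Set<Integer> buildKeys = Set.of(600, 650); // made-up build-side join keys
        System.out.println(filterInEngine(List.of(100, 600, 650, 700), buildKeys));
        System.out.println(pushDownIntoQuery("baseballStats", "AtBatting", buildKeys).orElse("(skip scan)"));
    }
}
```

In the plan above, the probe-side `pinotQuery` still has `haveFilter=false`, which matches the first method: all rows are fetched and filtered afterwards. Something like the second method would need the connector to receive the collected keys before it generates the Pinot query.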