# troubleshooting
t
Hi all, I am using the Pinot connector for Trino. For the exact same query, Pinot returns in 53 ms, but Trino takes over 46 seconds to return the same result. Would anyone know why there is such a discrepancy?
m
Can you run explain on that query in Presto? It should show exactly what query was pushed down to Pinot. Likely there is some construct/transform that the connector couldn’t push down
t
EXPLAIN SELECT * FROM pinot.default.uplinkpayloadevent
WHERE app_tok='c47e949cc0428bdac390'
ORDER BY message_timestamp DESC
LIMIT 30;
Fragment 0 [SINGLE]
    Output layout: [app_tok, gatewayaddress, message_str, key_hash, net_tok, message_timestamp, acctid, id, moduleaddress, time_string, key_range]
    Output partitioning: SINGLE []
    Output[columnNames = [app_tok, gatewayaddress, message_str, key_hash, net_tok, message_timestamp, acctid, id, moduleaddress, time_string, key_range]]
    │   Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    └─ TopN[count = 30, orderBy = [message_timestamp DESC NULLS LAST]]
       │   Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
       │   Estimates: 
       └─ LocalExchange[partitioning = SINGLE]
          │   Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
          │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
          └─ RemoteSource[sourceFragmentIds = [1]]
                 Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
                 Estimates: 

Fragment 1 [SOURCE]
    Output layout: [app_tok, gatewayaddress, message_str, key_hash, net_tok, message_timestamp, acctid, id, moduleaddress, time_string, key_range]
    Output partitioning: SINGLE []
    TopNPartial[count = 30, orderBy = [message_timestamp DESC NULLS LAST]]
    │   Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
    │   Estimates: 
    └─ TableScan[table = pinot:PinotTableHandle{schemaName=default, tableName=uplinkpayloadevent, constraint={PinotColumnHandle{columnName=app_tok, dataType=varchar, expression=app_tok, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}=[ SortedRangeSet[type=varchar, ranges=1, {[c47e949cc0428bdac390]}] ]}, limit=OptionalLong.empty, query=Optional.empty}]
           Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           app_tok := PinotColumnHandle{columnName=app_tok, dataType=varchar, expression=app_tok, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           gatewayaddress := PinotColumnHandle{columnName=gatewayaddress, dataType=varchar, expression=gatewayaddress, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           message_str := PinotColumnHandle{columnName=message_str, dataType=json, expression=message_str, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           key_hash := PinotColumnHandle{columnName=key_hash, dataType=varchar, expression=key_hash, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           net_tok := PinotColumnHandle{columnName=net_tok, dataType=varchar, expression=net_tok, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           message_timestamp := PinotColumnHandle{columnName=message_timestamp, dataType=bigint, expression=message_timestamp, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           acctid := PinotColumnHandle{columnName=acctid, dataType=varchar, expression=acctid, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           id := PinotColumnHandle{columnName=id, dataType=varchar, expression=id, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           moduleaddress := PinotColumnHandle{columnName=moduleaddress, dataType=varchar, expression=moduleaddress, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           time_string := PinotColumnHandle{columnName=time_string, dataType=varchar, expression=time_string, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           key_range := PinotColumnHandle{columnName=key_range, dataType=varchar, expression=key_range, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
> SELECT * FROM pinot.default.uplinkpayloadevent
  WHERE app_tok='c47e949cc0428bdac390'
  ORDER BY message_timestamp DESC
  LIMIT 30
[2022-10-27 10:39:41] 30 rows retrieved starting from 1 in 19 s 65 ms (execution: 1 s 173 ms, fetching: 17 s 892 ms)
Seems like it's a little faster now, but still not fast.
Screen Shot 2022-10-27 at 10.38.18 AM.png
EXPLAIN ANALYZE SELECT * FROM pinot.default.uplinkpayloadevent
WHERE app_tok='c47e949cc0428bdac390'
ORDER BY message_timestamp DESC
LIMIT 30;
Fragment 1 [SINGLE]
    CPU: 53.52ms, Scheduled: 101.32ms, Blocked 18.56s (Input: 6.36s, Output: 0.00ns), Input: 8506 rows (12.79MB); per task: avg.: 8506.00 std.dev.: 0.00, Output: 30 rows (43.48kB)
    Output layout: [app_tok, gatewayaddress, message_str, key_hash, net_tok, message_timestamp, acctid, id, moduleaddress, time_string, key_range]
    Output partitioning: SINGLE []
    TopN[count = 30, orderBy = [message_timestamp DESC NULLS LAST]]
    │   Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
    │   Estimates: 
    │   CPU: 10.00ms (0.01%), Scheduled: 10.00ms (0.01%), Blocked: 0.00ns (0.00%), Output: 30 rows (43.48kB)
    │   Input avg.: 8506.00 rows, Input std.dev.: 0.00%
    └─ LocalExchange[partitioning = SINGLE]
       │   Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
       │   CPU: 15.00ms (0.02%), Scheduled: 61.00ms (0.04%), Blocked: 12.21s (65.75%), Output: 8506 rows (12.79MB)
       │   Input avg.: 8506.00 rows, Input std.dev.: 0.00%
       └─ RemoteSource[sourceFragmentIds = [2]]
              Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
              Estimates: 
              CPU: 17.00ms (0.02%), Scheduled: 18.00ms (0.01%), Blocked: 6.36s (34.25%), Output: 8506 rows (12.79MB)
              Input avg.: 8506.00 rows, Input std.dev.: 0.00%

Fragment 2 [SOURCE]
    CPU: 1.51m, Scheduled: 2.38m, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 2660145 rows (3.99GB); per task: avg.: 532029.00 std.dev.: 36542.78, Output: 8506 rows (12.79MB)
    Output layout: [app_tok, gatewayaddress, message_str, key_hash, net_tok, message_timestamp, acctid, id, moduleaddress, time_string, key_range]
    Output partitioning: SINGLE []
    TopNPartial[count = 30, orderBy = [message_timestamp DESC NULLS LAST]]
    │   Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
    │   Estimates: 
    │   CPU: 1.82s (2.01%), Scheduled: 1.92s (1.34%), Blocked: 0.00ns (0.00%), Output: 8506 rows (12.79MB)
    │   Input avg.: 2287.31 rows, Input std.dev.: 221.29%
    └─ TableScan[table = pinot:PinotTableHandle{schemaName=default, tableName=uplinkpayloadevent, constraint={PinotColumnHandle{columnName=app_tok, dataType=varchar, expression=app_tok, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}=[ SortedRangeSet[type=varchar, ranges=1, {[c47e949cc0428bdac390]}] ]}, limit=OptionalLong.empty, query=Optional.empty}]
           Layout: [app_tok:varchar, gatewayaddress:varchar, message_str:json, key_hash:varchar, net_tok:varchar, message_timestamp:bigint, acctid:varchar, id:varchar, moduleaddress:varchar, time_string:varchar, key_range:varchar]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           CPU: 1.48m (97.95%), Scheduled: 2.35m (98.59%), Blocked: 0.00ns (0.00%), Output: 2660145 rows (3.99GB)
           Input avg.: 2287.31 rows, Input std.dev.: 221.29%
           app_tok := PinotColumnHandle{columnName=app_tok, dataType=varchar, expression=app_tok, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           gatewayaddress := PinotColumnHandle{columnName=gatewayaddress, dataType=varchar, expression=gatewayaddress, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           message_str := PinotColumnHandle{columnName=message_str, dataType=json, expression=message_str, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           key_hash := PinotColumnHandle{columnName=key_hash, dataType=varchar, expression=key_hash, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           net_tok := PinotColumnHandle{columnName=net_tok, dataType=varchar, expression=net_tok, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           message_timestamp := PinotColumnHandle{columnName=message_timestamp, dataType=bigint, expression=message_timestamp, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           acctid := PinotColumnHandle{columnName=acctid, dataType=varchar, expression=acctid, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           id := PinotColumnHandle{columnName=id, dataType=varchar, expression=id, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           moduleaddress := PinotColumnHandle{columnName=moduleaddress, dataType=varchar, expression=moduleaddress, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           time_string := PinotColumnHandle{columnName=time_string, dataType=varchar, expression=time_string, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           key_range := PinotColumnHandle{columnName=key_range, dataType=varchar, expression=key_range, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
m
Hmm, not sure why this is not showing the exact SQL sent to Pinot (I thought it did). What I am trying to find is the exact query that Pinot received.
@Xiang Fu any idea?
Can you check in the logs whether the query went to the Pinot broker or the Pinot server?
t
Hm, I am not seeing query-level logs in either the server or the broker. I am assuming it's only logging at an INFO/WARNING level.
x
I don’t think this query was fully pushed down. The table scan returned 2660145 rows (3.99GB) to Trino, the partial TopN cut that to 8506 rows, and the final sort and limit left 30 rows.
cc: @Elon any idea why this query is not being pushed down?
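To put the numbers from the EXPLAIN ANALYZE output side by side, here is a minimal Python sketch. The counts are copied from the plan above; the variable names are illustrative only, and the conversion of "GB" to bytes is an assumption about how the sizes are reported.

```python
# Row and byte counts copied from the EXPLAIN ANALYZE output in this thread.
scanned_rows = 2_660_145      # TableScan output in Fragment 2
scanned_gb = 3.99             # TableScan output size, as reported
partial_topn_rows = 8_506     # TopNPartial output
final_rows = 30               # final TopN output (the LIMIT)

# Share of the scanned rows that end up in the final result.
kept_fraction = final_rows / scanned_rows

# Rough average row size. Assumes 1 GB = 1024**3 bytes, which may not
# match how the plan output rounds its sizes.
avg_row_bytes = scanned_gb * 1024**3 / scanned_rows

# Rows that Trino read and then threw away while sorting and limiting.
discarded_rows = scanned_rows - final_rows

print(f"rows kept: {kept_fraction:.6%}")
print(f"average row size: {avg_row_bytes:.0f} bytes")
print(f"rows discarded in Trino: {discarded_rows:,}")
```

If these figures are read correctly, nearly all of the transferred data is dropped by the TopN on the Trino side, which fits with the scan accounting for about 98% of the CPU time in the plan.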
e
Hey, it looks like the TopN is not being pushed down. Can you try ordering in Pinot instead, i.e. use a "passthrough query":
SELECT * FROM pinot.default."select * from uplinkpayloadevent
WHERE app_tok='c47e949cc0428bdac390'
ORDER BY message_timestamp DESC
LIMIT 30";
Does that work for you?
t
Yes, the Pinot passthrough query is significantly more performant:
Query 20221028_120450_00000_sb9hy, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
1.28 [30 rows, 45.8KB] [23 rows/s, 35.9KB/s]

Query 20221028_120455_00001_sb9hy, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0.45 [30 rows, 45.8KB] [67 rows/s, 103KB/s]
For reference, the non-passthrough queries:
Query 20221028_120556_00002_sb9hy, FINISHED, 5 nodes
Splits: 1,317 total, 1,317 done (100.00%)
1:29 [4.55M rows, 6.63GB] [51.4K rows/s, 76.7MB/s]

Query 20221028_120749_00003_sb9hy, FINISHED, 5 nodes
Splits: 1,317 total, 1,317 done (100.00%)
1:12 [4.56M rows, 6.63GB] [63.6K rows/s, 94.9MB/s]
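For a rough sense of the gap between the two sets of runs, the elapsed times above can be compared with a small Python sketch. It assumes the CLI prints elapsed time as plain seconds (`1.28`) or as minutes:seconds (`1:29`); the helper `elapsed_seconds` is hypothetical and written only for this comparison.

```python
def elapsed_seconds(text: str) -> float:
    """Convert an elapsed value such as '1.28' or '1:29' to seconds.

    Assumes colon-separated parts are minutes:seconds (or h:m:s).
    """
    seconds = 0.0
    for part in text.split(":"):
        seconds = seconds * 60 + float(part)
    return seconds

# Elapsed times copied from the CLI summaries in this thread.
passthrough = [elapsed_seconds(t) for t in ("1.28", "0.45")]
regular = [elapsed_seconds(t) for t in ("1:29", "1:12")]

# Most conservative and most favorable pairings of the four runs.
conservative_ratio = min(regular) / max(passthrough)
favorable_ratio = max(regular) / min(passthrough)

print(f"passthrough is roughly {conservative_ratio:.0f}x "
      f"to {favorable_ratio:.0f}x faster")
```

With only two runs of each kind, this is an indication rather than a benchmark. The first passthrough run may also include warm-up cost.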