https://pinot.apache.org/

Dan Hill

07/01/2020, 10:41 PM
I'm hitting a slow query case where my combined offline/realtime table `metrics` is slow to query, but the individual `metrics_OFFLINE` and `metrics_REALTIME` tables are quick to query separately. Any ideas?
Copy code
select utc_date, sum(impressions) from metrics_OFFLINE where utc_date >= 1591142400000 and utc_date < 1593648000000 group by utc_date order by utc_date ASC limit 1831
This returns pretty fast (200 ms) over roughly 400 million rows. If I switch to `metrics_REALTIME`, it's also fast and returns zero rows.
Copy code
select utc_date, sum(impressions) from metrics_REALTIME where utc_date >= 1591142400000 and utc_date < 1593648000000 group by utc_date order by utc_date ASC limit 1831
However, if I query `metrics`, it's very slow.
Copy code
select utc_date, sum(impressions) from metrics where utc_date >= 1591142400000 and utc_date < 1593648000000 group by utc_date order by utc_date ASC limit 1831
The logs in `pinot-server` include the following timeout. (I simplified the query above, in case you notice differences.)
Copy code
2020/07/01 22:28:11.526 ERROR [CombineGroupByOrderByOperator] [pqr-0] Timed out while combining group-by results after 9996ms, brokerRequest = BrokerRequest(querySource:QuerySource(tableName:metrics_OFFLINE), filterQuery:FilterQuery(id:0, value:null, operator:AND, nestedFilterQueryIds:[1, 2, 3]), aggregationsInfo:[AggregationInfo(aggregationType:SUM, aggregationParams:{column=clicks}, isInSelectList:true, expressions:[clicks]), AggregationInfo(aggregationType:SUM, aggregationParams:{column=impressions}, isInSelectList:true, expressions:[impressions]), AggregationInfo(aggregationType:SUM, aggregationParams:{column=cost_usd_micros}, isInSelectList:true, expressions:[cost_usd_micros])], groupBy:GroupBy(topN:1831, expressions:[utc_date]), filterSubQueryMap:FilterQueryMap(filterQueryMap:{0=FilterQuery(id:0, value:null, operator:AND, nestedFilterQueryIds:[1, 2, 3]), 1=FilterQuery(id:1, column:utc_date, value:[[1591142400000		*)], operator:RANGE, nestedFilterQueryIds:[]), 2=FilterQuery(id:2, column:utc_date, value:[(*		1593648000000)], operator:RANGE, nestedFilterQueryIds:[]), 3=FilterQuery(id:3, column:timestamp, value:[(*		1593561599986]], operator:RANGE, nestedFilterQueryIds:[])}), queryOptions:{responseFormat=sql, groupByMode=sql, timeoutMs=9997}, pinotQuery:PinotQuery(dataSource:DataSource(tableName:metrics), selectList:[Expression(type:IDENTIFIER, identifier:Identifier(name:utc_date)), Expression(type:FUNCTION, functionCall:Function(operator:SUM, operands:[Expression(type:IDENTIFIER, identifier:Identifier(name:clicks))])), Expression(type:FUNCTION, functionCall:Function(operator:SUM, operands:[Expression(type:IDENTIFIER, identifier:Identifier(name:impressions))])), Expression(type:FUNCTION, functionCall:Function(operator:SUM, operands:[Expression(type:IDENTIFIER, identifier:Identifier(name:cost_usd_micros))]))], filterExpression:Expression(type:FUNCTION, functionCall:Function(operator:AND, operands:[Expression(type:FUNCTION, 
functionCall:Function(operator:GREATER_THAN_OR_EQUAL, operands:[Expression(type:IDENTIFIER, identifier:Identifier(name:utc_date)), Expression(type:LITERAL, literal:<Literal longValue:1591142400000>)])), Expression(type:FUNCTION, functionCall:Function(operator:LESS_THAN, operands:[Expression(type:IDENTIFIER, identifier:Identifier(name:utc_date)), Expression(type:LITERAL, literal:<Literal longValue:1593648000000>)]))])), groupByList:[Expression(type:IDENTIFIER, identifier:Identifier(name:utc_date))], orderByList:[Expression(type:FUNCTION, functionCall:Function(operator:ASC, operands:[Expression(type:IDENTIFIER, identifier:Identifier(name:utc_date))]))], limit:1831), orderBy:[SelectionSort(column:utc_date, isAsc:true)], limit:1831)
Here are the stats for the slow query against `metrics`:
Copy code
{
  "exceptions": [],
  "numServersQueried": 2,
  "numServersResponded": 0,
  "numSegmentsQueried": 0,
  "numSegmentsProcessed": 0,
  "numSegmentsMatched": 0,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 0,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "numGroupsLimitReached": false,
  "totalDocs": 0,
  "timeUsedMs": 10000,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0
}
Here are the fast ones.
`metrics_OFFLINE`:
Copy code
{
  "exceptions": [],
  "numServersQueried": 1,
  "numServersResponded": 1,
  "numSegmentsQueried": 418,
  "numSegmentsProcessed": 418,
  "numSegmentsMatched": 319,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 319,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 638,
  "numGroupsLimitReached": false,
  "totalDocs": 398507752,
  "timeUsedMs": 262,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0
}
`metrics_REALTIME`:
Copy code
{
  "resultTable": {
    "dataSchema": {
      "columnDataTypes": [
        "STRING",
        "DOUBLE"
      ],
      "columnNames": [
        "utc_date",
        "sum(impressions)"
      ]
    },
    "rows": []
  },
  "exceptions": [],
  "numServersQueried": 1,
  "numServersResponded": 1,
  "numSegmentsQueried": 1,
  "numSegmentsProcessed": 0,
  "numSegmentsMatched": 0,
  "numConsumingSegmentsQueried": 1,
  "numDocsScanned": 0,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "numGroupsLimitReached": false,
  "totalDocs": 0,
  "timeUsedMs": 16,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 9223372036854776000
}

Xiang Fu

07/01/2020, 11:11 PM
so realtime has no data?

Dan Hill

07/01/2020, 11:11 PM
Correct
I just created it

Xiang Fu

07/01/2020, 11:17 PM
can you try this:
Copy code
select utc_date, sum(impressions) from metrics_OFFLINE where utc_date >= 1591142400000 and utc_date < 1593648000000 and utc_date < 1593561599986 group by utc_date order by utc_date ASC limit 1831
This is the generated query sent to the offline server. For a hybrid table, Pinot gets the max time from the offline table and uses that minus 1 day as the time boundary.
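Xiang's boundary rule can be sketched roughly in Python. This is illustrative only (the real logic lives inside the Pinot broker); the `split_hybrid_query` helper and the offline max-time value are hypothetical, chosen so the boundary matches the `timestamp <= 1593561599986` predicate visible in the server log above.

```python
# Illustrative sketch of the hybrid time-boundary rule described above;
# not Pinot's actual broker code. Helper name and inputs are hypothetical.
ONE_DAY_MS = 24 * 60 * 60 * 1000

def split_hybrid_query(base_where: str, time_col: str, offline_max_time_ms: int):
    """Split one hybrid-table WHERE clause into the offline and realtime
    variants by appending the broker's time-boundary predicates."""
    boundary = offline_max_time_ms - ONE_DAY_MS
    offline_where = f"{base_where} AND {time_col} <= {boundary}"
    realtime_where = f"{base_where} AND {time_col} > {boundary}"
    return boundary, offline_where, realtime_where

boundary, offline_where, realtime_where = split_hybrid_query(
    "utc_date >= 1591142400000 AND utc_date < 1593648000000",
    "timestamp",
    1593647999986,  # hypothetical max "timestamp" in the offline table
)
print(boundary)  # 1593561599986, the value seen in the server-log predicate
```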

Dan Hill

07/01/2020, 11:18 PM
That's fast
Ohhh

Xiang Fu

07/01/2020, 11:18 PM
hmmm

Dan Hill

07/01/2020, 11:18 PM
I might have populated offline with test data for tomorrow too.
Could that impact this?

Xiang Fu

07/01/2020, 11:18 PM
hmm
if querying offline itself is fast, then it shouldn't be a problem

Dan Hill

07/01/2020, 11:19 PM
Ok. Yea, querying offline directly is fast.

Xiang Fu

07/01/2020, 11:19 PM
is the log from pinot-server or the broker?
this one shows the extra predicate
Copy code
3=FilterQuery(id:3, column:timestamp, value:[(*		1593561599986]], operator:RANGE, nestedFilterQueryIds:[])

Dan Hill

07/01/2020, 11:22 PM
pinot-server

Xiang Fu

07/01/2020, 11:22 PM
so it’s the offline query that’s timing out

Dan Hill

07/01/2020, 11:22 PM
Hmm, maybe it's a different query

Xiang Fu

07/01/2020, 11:22 PM
does this hybrid query consistently time out?
and is there anything in the broker logs?

Dan Hill

07/01/2020, 11:23 PM
It's consistently timing out.
Hmm, the broker logs do not have any recent updates.
The most recent entry is a little stale:
Copy code
2020/07/01 22:26:55.306 ERROR [QueryRouter] [jersey-server-managed-async-executor-19] Caught exception while sending request 1496 to server: pinot-server-0_O, marking query failed
java.net.UnknownHostException: pinot-server-0.pinot-server-headless.pinot-metrics-dev.svc.cluster.local: Name or service not known
	at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method) ~[?:1.8.0_252]
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) ~[?:1.8.0_252]
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName0(InetAddress.java:1277) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName(InetAddress.java:1193) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName(InetAddress.java:1127) ~[?:1.8.0_252]
	at java.net.InetAddress.getByName(InetAddress.java:1077) ~[?:1.8.0_252]

Xiang Fu

07/01/2020, 11:24 PM
ic
hmmm
from the log, it seems the timed-out query is:
select utc_date, sum(impressions), sum(cost_usd_micros) from metrics where utc_date >= 1591142400000 and utc_date < 1593648000000 and utc_date < 1593561599986 group by utc_date order by utc_date LIMIT 1831

Dan Hill

07/01/2020, 11:31 PM
Weird. I don't think my client sends three time filters.
My most recent timeout didn't generate any new logs.

Xiang Fu

07/01/2020, 11:58 PM
Your client should send two; the extra one is what the Pinot broker added before sending the query to the offline table.

Dan Hill

07/02/2020, 12:01 AM
Ah, okay.

Jackie

07/02/2020, 1:52 AM
Based on the query, it seems you are using a separate column `timestamp` as the time column, and filtering on that column is causing the slowness.
The query should be
select utc_date, sum(impressions), sum(cost_usd_micros) from metrics where utc_date >= 1591142400000 and utc_date < 1593648000000 and timestamp < 1593561599986 group by utc_date order by utc_date LIMIT 1831
(notice that the last filter is on `timestamp`)
What type of index do you use on `timestamp`?

Dan Hill

07/02/2020, 3:58 AM
Copy code
metrics_realtime_table_config.json: |-
    {
      "tableName": "metrics",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "timestamp",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "1",
        "segmentPushType": "APPEND",
        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
        "schemaName": "metrics",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tableIndexConfig": {
        "loadMode"  : "MMAP",
        "aggregateMetrics": true,
        "noDictionaryColumns": ["insertions", "impressions", "clicks", "cost_usd_micros"],
        "starTreeIndexConfigs": [
          {
            "dimensionsSplitOrder": [
              "utc_date",
              "platform_id",
              "customer_id",
              "account_id",
              "campaign_id",
              "promotion_id"
            ],
            "skipStarNodeCreationForDimensions": [
            ],
            "functionColumnPairs": [
              "SUM__insertions",
              "SUM__impressions",
              "SUM__clicks",
              "SUM__cost_usd_micros"
            ]
          },
          {
            "dimensionsSplitOrder": [
              "utc_date",
              "platform_id",
              "content_id"
            ],
            "skipStarNodeCreationForDimensions": [
            ],
            "functionColumnPairs": [
              "SUM__insertions",
              "SUM__impressions",
              "SUM__clicks",
              "SUM__cost_usd_micros"
            ]
          }
        ],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "simple",
          "stream.kafka.topic.name": "metrics-realtime",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181",
          "stream.kafka.zk.broker.url": "kafka-zookeeper:2181",
          "stream.kafka.broker.list": "kafka:9092",
          "realtime.segment.flush.threshold.time": "3600000",
          "realtime.segment.flush.threshold.size": "50000",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
      },
      "tenants": {},
      "metadata": {
        "customConfigs": {}
      }
    }

  metrics_schema.json: |-
    {
      "schemaName": "metrics",
      "dimensionFieldSpecs": [
        {
          "name": "utc_date",
          "dataType": "LONG"
        },
        {
          "name": "platform_id",
          "dataType": "LONG"
        },
        {
          "name": "customer_id",
          "dataType": "LONG"
        },
        {
          "name": "account_id",
          "dataType": "LONG"
        },
        {
          "name": "campaign_id",
          "dataType": "LONG"
        },
        {
          "name": "promotion_id",
          "dataType": "LONG"
        },
        {
          "name": "content_id",
          "dataType": "LONG"
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "insertions",
          "dataType": "LONG"
        },
        {
          "name": "impressions",
          "dataType": "LONG"
        },
        {
          "name": "clicks",
          "dataType": "LONG"
        },
        {
          "name": "cost_usd_micros",
          "dataType": "LONG"
        }
      ],
      "timeFieldSpec": {
        "incomingGranularitySpec": {
          "name": "timestamp",
          "dataType": "LONG",
          "timeFormat" : "EPOCH",
          "timeType": "MILLISECONDS"
        }
      }
    }
Here's the realtime config. The offline one looks similar.
Even though I have my own time dimension, should I still pass in a rough timestamp filter? I could do both.
Even if I add explicit timestamp filters, it still times out.
Copy code
select utc_date, sum(impressions) from metrics where utc_date >= 1591142400000 and utc_date < 1593648000000 and "timestamp" >= 1591142400000 and "timestamp" < 1593648000000 group by utc_date order by utc_date ASC limit 1831
@Jackie @Xiang Fu - jfyi that I updated this thread.

Jackie

07/02/2020, 9:06 PM
@Dan Hill The timestamp filter is always automatically added for hybrid use cases. One problem with this on the OFFLINE side is that the star-tree index cannot be used, because `timestamp` is not part of the `dimensionsSplitOrder`.
Can you try this query on the offline side only:
Copy code
select utc_date, sum(impressions) from metrics_OFFLINE where utc_date >= 1591142400000 and utc_date < 1593648000000 and timestamp < 1593648000000 group by utc_date order by utc_date ASC limit 1831
Also, is it possible to use `utc_date` as the time column?

Dan Hill

07/02/2020, 9:36 PM
Weird, yea, that query is slow.
Copy code
select utc_date, sum(impressions) from metrics_OFFLINE where utc_date >= 1591142400000 and utc_date < 1593648000000 and "timestamp" < 1593648000000 group by utc_date order by utc_date ASC limit 1831
Are there any issues with making `utc_date` the time column? I'll eventually support times in other timezones (as separate dimensions).

Jackie

07/02/2020, 9:48 PM
@Dan Hill As I mentioned, this query is slow because it cannot utilize the star-tree index.
If you never explicitly query on `timestamp`, then it is perfectly fine to use `utc_date` as the time column.
When you add other timezones, you can add them as dimensions.

Dan Hill

07/02/2020, 9:57 PM
My current query only filters on `utc_date` and is slow.
Copy code
select utc_date, sum(impressions) from metrics_OFFLINE where utc_date >= 1591142400000 and utc_date < 1593648000000 group by utc_date order by utc_date ASC limit 1831

Jackie

07/02/2020, 11:45 PM
In that case, you can use `utc_date` as the time column instead.
Actually, all time fields (including any you want to add in the future) should be added as `DateTimeField`s; @Neha Pawar can give more context on that.

Dan Hill

07/02/2020, 11:53 PM
Oh okay. Is there any benefit to keeping the raw timestamp column? What do we think the current issue with the query is? Is Pinot trying to use the "timestamp" column internally for something? For adding other time fields, is that just the dataType, or does that go into the timeFieldSpec? This table isn't serving yet, so it's very easy to change.
Copy code
{
  "schemaName": "metrics",
  "dimensionFieldSpecs": [
    {
      "name": "utc_date",
      "dataType": "LONG"
    },
    {
      "name": "platform_id",
      "dataType": "LONG"
    },
    {
      "name": "customer_id",
      "dataType": "LONG"
    },
    {
      "name": "account_id",
      "dataType": "LONG"
    },
    {
      "name": "campaign_id",
      "dataType": "LONG"
    },
    {
      "name": "promotion_id",
      "dataType": "LONG"
    },
    {
      "name": "content_id",
      "dataType": "LONG"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "insertions",
      "dataType": "LONG"
    },
    {
      "name": "impressions",
      "dataType": "LONG"
    },
    {
      "name": "clicks",
      "dataType": "LONG"
    },
    {
      "name": "cost_usd_micros",
      "dataType": "LONG"
    }
  ],
  "timeFieldSpec": {
    "incomingGranularitySpec": {
      "name": "timestamp",
      "dataType": "LONG",
      "timeFormat": "EPOCH",
      "timeType": "MILLISECONDS"
    }
  }
}

Jackie

07/03/2020, 2:38 AM
If you do not explicitly query it, then you can replace it with `utc_date`, which uses `DAYS` granularity.
For hybrid use cases, Pinot uses the time column to merge the results from the OFFLINE and REALTIME sides. That's the reason there is an extra filter on the `timestamp` column.
You can update your schema to:
Copy code
{
  "schemaName": "metrics",
  "dimensionFieldSpecs": [
    {
      "name": "platform_id",
      "dataType": "LONG"
    },
    {
      "name": "customer_id",
      "dataType": "LONG"
    },
    {
      "name": "account_id",
      "dataType": "LONG"
    },
    {
      "name": "campaign_id",
      "dataType": "LONG"
    },
    {
      "name": "promotion_id",
      "dataType": "LONG"
    },
    {
      "name": "content_id",
      "dataType": "LONG"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "insertions",
      "dataType": "LONG"
    },
    {
      "name": "impressions",
      "dataType": "LONG"
    },
    {
      "name": "clicks",
      "dataType": "LONG"
    },
    {
      "name": "cost_usd_micros",
      "dataType": "LONG"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "utc_date",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:DAYS"
    }
  ]
}
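As a sanity check on the `1:MILLISECONDS:EPOCH` / `1:DAYS` combination above, here is a small sketch of day-granularity bucketing. It assumes `utc_date` values are epoch millis truncated to UTC day boundaries, which is a guess based on this schema rather than anything stated in the thread; the `to_day_bucket` helper is hypothetical.

```python
# Hypothetical helper showing what "1:DAYS" granularity over epoch-millis
# values means: every timestamp in a UTC day maps to that day's start.
ONE_DAY_MS = 24 * 60 * 60 * 1000

def to_day_bucket(epoch_ms: int) -> int:
    """Truncate an epoch-millis timestamp to the start of its UTC day."""
    return (epoch_ms // ONE_DAY_MS) * ONE_DAY_MS

# 2020-06-30T23:59:59.986Z buckets to 2020-06-30T00:00:00Z:
print(to_day_bucket(1593561599986))  # 1593475200000
```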

Dan Hill

07/03/2020, 2:43 AM
Ah ok. Will it merge just the aggregates for the same dateTimeFieldSpecs?

Jackie

07/03/2020, 2:48 AM
Pinot will use the time column you specified in `tableConfig.segmentsConfig.timeColumnName` to create the extra filter and merge the results.
The reason for the slowness is that `timestamp` is not in the star-tree split order, so your query won't utilize the star-tree index.
If you change the time column from `timestamp` to `utc_date`, which is part of the split order, your query can utilize the star-tree.
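Jackie's explanation boils down to a simple eligibility check. The sketch below is for intuition only, not Pinot's actual star-tree planner (the real matching rules are richer); it just illustrates why the broker-added `timestamp` predicate disqualifies the index while the original client query is served by it.

```python
# Simplified sketch: assume a star-tree can serve a query only if every
# filter and group-by column appears in its dimensionsSplitOrder. Real
# Pinot eligibility rules are more nuanced; this is illustrative only.
def star_tree_usable(filter_cols, group_by_cols, split_order):
    return set(filter_cols) | set(group_by_cols) <= set(split_order)

split_order = ["utc_date", "platform_id", "customer_id",
               "account_id", "campaign_id", "promotion_id"]

# Client query filters/groups only on utc_date -> the index applies:
print(star_tree_usable(["utc_date"], ["utc_date"], split_order))  # True
# Broker adds a "timestamp" filter -> falls back to scanning segments:
print(star_tree_usable(["utc_date", "timestamp"], ["utc_date"], split_order))  # False
```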

Dan Hill

07/03/2020, 3:08 AM
Sweet, thanks!
Do I need to have `utc_date` in the star-tree index, or should I remove it?
Any updates to `segmentsConfig`?
Copy code
"segmentsConfig" : {
...
    "timeColumnName": "utc_date",
    "timeType": "MILLISECONDS",
...
  },