# general
d
Hello, I've got a question about realtime tables. If I'm correct, the Kafka consumer group ID is built in the code using the table name and replica ID; however, I'm not able to find a consumer group for the table in my Kafka cluster. Is there a way to list all the consumer groups that a realtime table is using? It looks like those IDs should be stored in ZK under ideal states, but I can't find them. Thanks
k
IIUC Pinot doesn't use consumer groups to consume from Kafka. This is done mainly to let multiple Pinot consumers consume from the same partition (for replication).
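For illustration, here is a minimal sketch of what consuming without a consumer group looks like with the plain Kafka client: the consumer is assigned its partitions explicitly instead of subscribing with a group.id, so nothing is registered broker-side and no group ever shows up in the cluster. This is not Pinot's actual consumer code, just the mechanism it relies on; the broker address, topic name and offset below are placeholders.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignedPartitionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Note: no group.id is needed when partitions are assigned manually.

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("myTopic", 0);  // placeholder topic
            consumer.assign(List.of(partition));  // manual assignment, no consumer group
            consumer.seek(partition, 13991875L);  // resume from an offset the caller tracks itself

            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("partition=%d offset=%d%n",
                        record.partition(), record.offset());
            }
            // Offsets are never committed back to Kafka, which is why no group
            // (and no broker-side lag) is visible for such a consumer.
        }
    }
}
```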
d
Thanks for forwarding that answer! I think that may relate to an issue I see in one of our tables where we set group.id
I'd still like to track offsets on each partition if this possible at all
m
Pinot does keep track of the offsets in individual segments (which are also mapped to partitions, although the partition value is embedded in the segment name), so it's maybe not exactly what you're after
k
Is there a metric that Pinot exposes for tracking lag in consuming?
d
I'd like to know which offset it's been consumed at any time. The information in the segment metadata only provides start and end offsets for the segment. Alternatively I'd like to be able to store the offset and partition with every ingested row
m
how about the consuming segment info REST endpoint?
e.g.
curl -X GET "http://localhost:9000/tables/parkrun/consumingSegmentsInfo" -H "accept: application/json"
{
  "_segmentToConsumingInfoMap": {
    "parkrun__0__142__20220219T2118Z": [
      {
        "serverName": "Server_172.21.0.6_8098",
        "consumerState": "CONSUMING",
        "lastConsumedTimestamp": 1645452573998,
        "partitionToOffsetMap": {
          "0": "13991875"
        }
      }
    ]
  }
}
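If you need those per-partition offsets programmatically (e.g. to track consumption progress over time), a small sketch that calls the same endpoint and pulls out partitionToOffsetMap could look like the following. The controller address and table name are taken from the example above, and Jackson is assumed for JSON parsing.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConsumingOffsets {
    public static void main(String[] args) throws Exception {
        // Controller address and table name from the example above; adjust for your cluster.
        String url = "http://localhost:9000/tables/parkrun/consumingSegmentsInfo";

        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create(url))
                        .header("accept", "application/json")
                        .GET()
                        .build(),
                HttpResponse.BodyHandlers.ofString());

        JsonNode root = new ObjectMapper().readTree(response.body());
        JsonNode segments = root.get("_segmentToConsumingInfoMap");

        // For each consuming segment, print the latest consumed offset per partition.
        segments.fields().forEachRemaining(entry -> {
            for (JsonNode serverInfo : entry.getValue()) {
                JsonNode offsets = serverInfo.get("partitionToOffsetMap");
                offsets.fields().forEachRemaining(p ->
                        System.out.printf("segment=%s partition=%s offset=%s%n",
                                entry.getKey(), p.getKey(), p.getValue().asText()));
            }
        });
    }
}
```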
d
Thanks Mark, that may do the trick. Just one more question, if I want to include consumer record metadata in my table (let's say headers, offset, etc) is there an easy way to extract this? Any examples you can point me at?
m
not that I know of, but lemme have a quick look at the code
If we can't find anything, you could always write it up as a GH issue --> https://github.com/apache/pinot/issues
d
I've seen issue 7004 opens the discussion to store those values as table metadata but the ticket has been opened for 8 months now
m
@User I guess https://github.com/apache/pinot/issues/7004 got stuck back in July. Anything we should do to revive it?
m
@User Can you comment on the issue describing your need and +1 it? It will greatly help.
d
Yup, done
Coming back to this issue, I'm looking at the data freshness metrics to see if I can get away with using the timestamp. The query response returns a property called minConsumingFreshnessTimeMs, and the code has a comment that reads:
the timestamp indicating the freshness of the data queried in consuming events
This can be ingestion timestamp if provided by the stream, or last index time
Does this mean that any record read from the consuming segments with timestamp later than this min freshness timestamp is pruned and not included in the query?
One of the technical docs mentions a freshnessLag tolerance property that can be set on the table, but the only reference I can find to that name is in the metrics
Also, is there any way I can filter my queries using this timestamp? It doesn't look like it's a query option
m
For minConsumingFreshnessTimeMs, there is no implicit filter applied beyond what the query itself specifies; it doesn't filter out any rows.
Not sure if I follow this, but you can simply put a where clause in your query to add any filter (including time): https://apache-pinot.slack.com/archives/CDRCA57FC/p1645637469349929?thread_ts=1645448815.668029&cid=CDRCA57FC
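As a rough sketch of the watermark idea being discussed: a client could filter on an ingestion-time column in the where clause and read minConsumingFreshnessTimeMs from the broker response to decide how far it is safe to advance the watermark. The column name ingestionTimeMs is hypothetical, the broker address and port are assumptions, and this only works if the stream actually provides such a timestamp.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IncrementalQuerySketch {
    public static void main(String[] args) throws Exception {
        long previousWatermark = 1645452573998L;  // watermark handed back to the client last time

        // Hypothetical ingestion-time column; the table needs such a column for this to work.
        String sql = "SELECT * FROM parkrun WHERE ingestionTimeMs > " + previousWatermark
                + " LIMIT 1000";

        ObjectMapper mapper = new ObjectMapper();
        String body = mapper.writeValueAsString(mapper.createObjectNode().put("sql", sql));

        // Assumed broker address; 8099 is a common default broker query port.
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create("http://localhost:8099/query/sql"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build(),
                HttpResponse.BodyHandlers.ofString());

        JsonNode result = mapper.readTree(response.body());

        // The broker response carries minConsumingFreshnessTimeMs; rows with ingestion
        // timestamps beyond it may not all be consumed yet, so the client would discard
        // those and use the freshness value as the next watermark.
        long nextWatermark = result.path("minConsumingFreshnessTimeMs").asLong();
        System.out.println("rows returned: " + result.path("resultTable").path("rows").size());
        System.out.println("next watermark = " + nextWatermark);
    }
}
```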
d
Sorry, let me rephrase my issue. I have a use case for incremental queries: clients will query the table, and they need some sort of reference/watermark for the next call so they can query only the delta. I must guarantee that when a query is issued, all the data for that query has already been consumed into Pinot; I also need to guarantee the next call won't read data that was fetched in previous calls. This is why I was interested in being able to access both partition and offset in the table
From what I've read so far, I guess that at the moment there isn't an easy way to achieve this