# general
n
Hello everyone, I'm struggling to create a Kafka stream ingestion with the HLC consumer and a custom consumer group id. Has anyone worked on a similar case?
I get this error, but I can't figure out why; the documentation is quite sparse on this subject.
```
Caught exception while processing resource contactEvents_REALTIME, skipping.
java.lang.StringIndexOutOfBoundsException: begin 0, end 8, length 7
	at java.lang.String.checkBoundsBeginEnd(String.java:3319) ~[?:?]
	at java.lang.String.substring(String.java:1874) ~[?:?]
	at org.apache.pinot.common.utils.HLCSegmentName.<init>(HLCSegmentName.java:99) ~[pinot-all-0.10.0-jar-with-dependencies.jar:0.10.0-30c4635bfeee88f88aa9c9f63b93bcd4a650607f]
```
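For context, the exception itself is plain `String.substring` bounds checking: the trace suggests some token derived from the segment name is assumed to be at least 8 characters long but is only 7 here. A minimal sketch of the semantics (not Pinot's actual `HLCSegmentName` parsing):

```java
// Minimal illustration of the bounds error from the stack trace.
// This is NOT Pinot's parsing code, just String.substring semantics.
public class SubstringBoundsDemo {
    public static void main(String[] args) {
        String longEnough = "apachepinot";              // length 11
        System.out.println(longEnough.substring(0, 8)); // prints "apachepi"

        String tooShort = "example";                    // length 7
        // Throws StringIndexOutOfBoundsException: begin 0, end 8, length 7
        System.out.println(tooShort.substring(0, 8));
    }
}
```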
k
Hi, can you share your table config and schema? Please remove any secrets from it if there are any.
n
Don't worry, I'm using Helm with env var secrets for the config.
```json
{
  "tableName": "contactEvents",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "created_at",
    "timeType": "DAYS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "3650",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "schemaName": "contactEvents",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant",
    "tagOverrideConfig": {}
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "highlevel",
      "stream.kafka.topic.name": "united-pg.public.contact_events",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.broker.list": "strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9093",
      "stream.kafka.hlc.bootstrap.server": "strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9093",
      "stream.kafka.hlc.zk.connect.string": "strimzi-zookeeper-client.strimzi.svc.cluster.local:2181",
      "stream.kafka.consumer.prop.auto.offset.reset": "earliest",
      "stream.kafka.hlc.group.id": "pinot",
      "security.protocol": "SSL",
      "ssl.truststore.type": "PKCS12",
      "ssl.keystore.type": "PKCS12",
      "ssl.truststore.location": "/ssl/ca.p12",
      "ssl.keystore.location": "/ssl/strimzi-user-pinot.p12",
      "ssl.truststore.password": "${SSL_TRUSTSTORE_PASSWORD}",
      "ssl.keystore.password": "${SSL_KEYSTORE_PASSWORD}"
    },
    "bloomFilterColumns": [
      "tenant_id",
      "contact_id",
      "content_id"
    ]
  },
  "metadata": {
    "customConfigs": {}
  }
}
```
I tried many things so maybe some config is redundant, but while looking at the Pinot source code I spotted this option: `stream.kafka.hlc.group.id`. I think it is causing an issue with the `HLCSegmentName` class.
k
Yes, that is the cause of the issue. Would it be possible for you to use the `lowlevel` consumer? I am looking for a fix in the meantime. I guess just making the groupId a longer string should fix it, but I need to verify.
n
In my case, I really need a consumer group, so I can't use the `lowlevel` consumer type right now 😕 I will try with a longer group id (`apachepinot` instead) and let you know.
k
The error seems to be in the `PinotTableIdealStateBuilder` class. Working on an alternative/fix.
n
Why do you particularly need a consumer group? HLC has been deprecated; here are some of the reasons why we did that: https://www.confluent.io/resources/kafka-summit-2020/apache-pinot-case-study-building-distributed-analytics-systems-using-apache-kafka/
n
I need it to keep control of the offsets and track the lag on my broker.
Internally, are you using a `null` group id or randomly generated ones? Because if you enable ACLs on the external broker, won't you have issues?
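For reference, a fixed group id is what makes broker-side lag visible: committed offsets live under the group, so you can diff them against the log-end offsets. A hedged sketch using the standard Kafka `AdminClient` (SSL props omitted, group name assumed to be `pinot`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9093");
        // SSL settings omitted for brevity; this sketch assumes the
        // same cluster as the table config above.

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets are stored per consumer group; with a null
            // group id there is nothing broker-side to read here.
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("pinot")
                    .partitionsToOffsetAndMetadata()
                    .get();

            // Ask the broker for the current log-end offset of each partition.
            Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
            committed.keySet().forEach(tp -> latestSpec.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                    admin.listOffsets(latestSpec).all().get();

            // Lag = log-end offset minus committed offset, per partition.
            committed.forEach((tp, meta) -> System.out.printf("%s lag=%d%n",
                    tp, latest.get(tp).offset() - meta.offset()));
        }
    }
}
```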
n
There's a separate ongoing effort to be able to track the lag between the Kafka consumer and broker offsets: https://github.com/apache/pinot/pull/8280
Why do you think there would be issues if ACLs are enabled on the external broker? Are you seeing any problems like those in LLC?
n
Because if you specify a consumer group id, the user needs authorization to access this group id and perform offset commits, reads, etc. But if you specify a `null` group id, you will get no authorization error. After some searching: Kafka has changed the behavior between group id `""` and group id `null`: https://issues.apache.org/jira/browse/KAFKA-6774
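For completeness, on an ACL-enabled broker a fixed group needs a Read ACL for the consumer's principal (Read on the group resource covers joining, fetching committed offsets, and committing). A hedged sketch via the Kafka `AdminClient`; the principal, group, and bootstrap values are illustrative:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantGroupAcl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Allow the "pinot" principal to read (join, fetch committed
            // offsets, commit) on the "pinot" consumer group.
            AclBinding groupRead = new AclBinding(
                    new ResourcePattern(ResourceType.GROUP, "pinot", PatternType.LITERAL),
                    new AccessControlEntry("User:pinot", "*",
                            AclOperation.READ, AclPermissionType.ALLOW));

            admin.createAcls(Collections.singleton(groupRead)).all().get();
        }
    }
}
```

The `kafka-acls.sh` CLI can grant the same binding if you prefer not to do it programmatically.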
n
Yes, I already did that and it's working.