# troubleshooting
Vibhor Jaiswal:
Hi All, we have been trying to do some Kafka integration for topics secured with SASL_PLAINTEXT. While doing this, we have been getting the exceptions below. Just to double check, I created a Java client and got that working and consuming messages. However, Pinot is not able to consume messages with pretty much the same settings. Can someone suggest what's wrong here?
```
2022/02/23 16:50:56.586 ERROR [PinotTableIdealStateBuilder] [grizzly-http-server-0] Could not get PartitionGroupMetadata for topic: gsp.dataacquisition.risk.public.v2.<Redacted> of table: <Redacted>_REALTIME
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2022/02/23 16:50:56.591 ERROR [PinotTableRestletResource] [grizzly-http-server-0] org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
  at org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder.getPartitionGroupMetadataList(PinotTableIdealStateBuilder.java:172) ~[pinot-all-0.10.0-SNAPSHOT-jar-with-dependencies.jar:0.10.0-SNAPSHOT-428e7d75f91b9d4b4a2288f131d02d643bb2df5d]
  at org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager.getNewPartitionGroupMetadataList(PinotLLCRealtimeSegmentManager.java:764)
```
Below is the table config for reference -
```json
{
    "tableName": "<Redacted>",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "<Redacted>",
      "timeColumnName": "PublishDateTimeUTC",
      "allowNullTimeValue": false,
      "replication": "1",
      "replicasPerPartition": "2",
      "completionConfig": {
        "completionMode": "DOWNLOAD"
      }
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "invertedIndexColumns": [],
      "noDictionaryColumns": ["some columns"],
      "rangeIndexColumns": [],
      "rangeIndexVersion": 1,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [],
      "bloomFilterColumns": [],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "gsp.dataacquisition.risk.public.v2.<Redacted>",
        "stream.kafka.broker.list": "comma separated list of servers",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "largest",
        "stream.kafka.schema.registry.url": "http://someaddress:8081",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.sasl.mechanism": "SCRAM-SHA-256",
        "stream.kafka.security.protocol": "SASL_PLAINTEXT",
        "stream.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"some user\" password=\"somepwd\"",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.size": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.autotune.initialRows": "3000000",
        "realtime.segment.flush.threshold.segment.size": "500M"
      },
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {},
    "quota": {},
    "routing": {"instanceSelectorType": "strictReplicaGroup"},
    "query": {},
    "ingestionConfig": {},
    "isDimTable": false,
    "upsertConfig": {
      "mode": "FULL",
      "comparisonColumn": "PublishDateTimeUTC"
    },
    "primaryKeyColumns": [
      "BusinessDate", "UID", "UIDType", "LegId"
    ]
}
```
Mayank:
I am guessing it is unable to connect to Kafka. cc: @Alexander Pucher @Neha Pawar
Vibhor Jaiswal:
@Mayank @Alexander Pucher @Neha Pawar This issue was basically about integrating with SSL-secured Kafka. We fixed it by removing the stream.kafka prefix from stream.kafka.sasl.mechanism, stream.kafka.security.protocol, and stream.kafka.sasl.jaas.config. Please feel free to publish this in the documentation, because we could not find documentation anywhere on how to integrate SSL-secured Kafka with Pinot. It would be a great value add for end users.
Another thing we did here was add a semicolon at the end of the jaas.config, like: "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"some user\" password=\"somepwd\";"
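For reference, our streamConfigs section ended up looking roughly like this (topic name, broker list, and credentials are placeholders; the rest of the table config stayed the same as above):
```json
{
  "streamType": "kafka",
  "stream.kafka.topic.name": "gsp.dataacquisition.risk.public.v2.<Redacted>",
  "stream.kafka.broker.list": "comma separated list of servers",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.prop.auto.offset.reset": "largest",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
  "sasl.mechanism": "SCRAM-SHA-256",
  "security.protocol": "SASL_PLAINTEXT",
  "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"some user\" password=\"somepwd\";"
}
```
The SASL properties are passed to the Kafka consumer without the stream.kafka prefix, and the jaas.config value ends with a semicolon.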
Mayank:
Could you paste a sample config here, @Vibhor Jaiswal? @Mark Needham we can then put it in our docs.
Alexander Pucher:
Thank you, @Vibhor Jaiswal. We’ll add this to our docs
Vibhor Jaiswal:
@Mayank Yeah, the table config is given above.
s
Hi @Vibhor Jaiswal, I have added the same config as you mentioned above, but the consumer Kafka config still shows sasl.mechanism = GSSAPI. Below is the consumer config from the logs:
```
Creating new kafka consumer and iterator for topic tSCalibrationRecord_pinot
ConsumerConfig values:
  allow.auto.create.topics = true
  auto.commit.interval.ms = 5000
  auto.offset.reset = latest
  bootstrap.servers = [XXXX]
  check.crcs = true
  client.dns.lookup = use_all_dns_ips
  client.id = consumer-tscalrecord14_REALTIME_1647432834089_0-10
  client.rack =
  connections.max.idle.ms = 540000
  default.api.timeout.ms = 60000
  enable.auto.commit = true
  exclude.internal.topics = true
  fetch.max.bytes = 52428800
  fetch.max.wait.ms = 500
  fetch.min.bytes = 1
  group.id = tscalrecord14_REALTIME_1647432834089_0
  group.instance.id = null
  heartbeat.interval.ms = 3000
  interceptor.classes = []
  internal.leave.group.on.close = true
  internal.throw.on.fetch.stable.offset.unsupported = false
  isolation.level = read_uncommitted
  key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  max.partition.fetch.bytes = 1048576
  max.poll.interval.ms = 300000
  max.poll.records = 500
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  receive.buffer.bytes = 65536
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 30000
  retry.backoff.ms = 100
  sasl.client.callback.handler.class = null
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.login.callback.handler.class = null
  sasl.login.class = null
  sasl.login.refresh.buffer.seconds = 300
  sasl.login.refresh.min.period.seconds = 60
  sasl.login.refresh.window.factor = 0.8
  sasl.login.refresh.window.jitter = 0.05
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  security.providers = null
  send.buffer.bytes = 131072
  session.timeout.ms = 10000
  socket.connection.setup.timeout.max.ms = 30000
  socket.connection.setup.timeout.ms = 10000
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
  ssl.endpoint.identification.algorithm = https
  ssl.engine.factory.class = null
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.certificate.chain = null
  ssl.keystore.key = null
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLSv1.3
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.certificates = null
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  value.deserializer = class org.apache.kafka.common.serialization.BytesDeserializer

The configuration 'auto.isolation.level' was supplied but isn't a known config.
The configuration 'auto.commit.enable' was supplied but isn't a known config.
Kafka version: 2.8.1
```
@Mayank
Working SASL_SSL and schema registry config:
```json
{
  "tableName": "calrecord",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "calrec",
    "timeColumnName": "fixTimestamp",
    "allowNullTimeValue": false,
    "replication": "1",
    "replicasPerPartition": "1",
    "completionConfig": {
      "completionMode": "DOWNLOAD"
    }
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant",
    "tagOverrideConfig": {}
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "rangeIndexColumns": [],
    "rangeIndexVersion": 1,
    "autoGeneratedInvertedIndex": false,
    "createInvertedIndexDuringSegmentGeneration": false,
    "sortedColumn": [],
    "bloomFilterColumns": [],
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "tscrpinot",
      "stream.kafka.broker.list": "xxxx:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "stream.kafka.schema.registry.url": "<https://xxx.cloud>",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.prop.schema.registry.rest.url": "<https://xxx.cloud>",
      "sasl.mechanism": "PLAIN",
      "stream.kafka.decoder.prop.basic.auth.credentials.source": "USER_INFO",
      "<http://stream.kafka.decoder.prop.schema.registry.basic.auth.user.info|stream.kafka.decoder.prop.schema.registry.basic.auth.user.info>": "schemaregistryusername:password",
      "security.protocol": "SASL_SSL",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkausername\" password=\"kafkapassword\";",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.size": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.autotune.initialRows": "3000000",
      "realtime.segment.flush.threshold.segment.size": "500M"
    },
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "enableDefaultStarTree": false,
    "enableDynamicStarTreeCreation": false,
    "aggregateMetrics": false,
    "nullHandlingEnabled": false
  },
  "metadata": {},
  "quota": {},
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "query": {},
  "ingestionConfig": {},
  "isDimTable": false
}
```