# general
s
👋 Hi everyone! I am new to Apache Pinot and am using it for real-time ingestion from a Kafka topic. We are on Confluent Kafka with Schema Registry and Avro schemas. I am able to connect to the Kafka topic: the table is created successfully and is in a healthy state, but queries return no records. How can I check whether there is an issue on the consuming side? The Swagger table debug API also shows no errors.
f
Hi 🙂 Have you checked the server logs for any issues?
s
Where do I check the server logs?
f
It depends on the way you launch the server.
s
We launched it through Helm.
Got the logs: [Consumer clientId=f83d920f-6e8d-465b-bd83-2552132f241e2, groupId=tscalrecord7_REALTIME_1647430665510_0] Bootstrap broker XXXXXX:9092 (id: -1 rack: null) disconnected
f
Did you get any error?
m
Check the table debug API in Swagger to see if it bubbles up any problems.
If there are events in the Kafka stream, messages are likely being skipped due to a decoding error, a schema mismatch, or a table config issue (e.g. a time unit mismatch).
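For illustration, a time unit mismatch means the schema declares one unit while the producer writes another. A hypothetical dateTimeFieldSpec for a millisecond-epoch time column (the column name here is just an example, not from this thread) would look like:
{
  "dateTimeFieldSpecs": [
    {
      "name": "eventTimestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
If the events actually carry epoch seconds, the values get interpreted in the wrong unit and the ingested rows won't land where queries expect them.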
s
@User No errors from the table debug API:
[
  {
    "tableName": "tscalrecord15_REALTIME",
    "numSegments": 2,
    "numServers": 2,
    "numBrokers": 1,
    "segmentDebugInfos": [],
    "serverDebugInfos": [
      {
        "serverName": "Server_pinot-server-0.pinot-server-headless.pinot.svc.cluster.local_8098",
        "numMessages": 0,
        "errors": 0
      },
      {
        "serverName": "Server_pinot-server-1.pinot-server-headless.pinot.svc.cluster.local_8098",
        "numMessages": 0,
        "errors": 0
      }
    ],
    "brokerDebugInfos": [],
    "tableSize": {
      "reportedSize": "0 bytes",
      "estimatedSize": "0 bytes"
    },
    "ingestionStatus": {
      "ingestionState": "HEALTHY",
      "errorMessage": ""
    }
  }
]
m
cc: @User
s
@User The error is: [Consumer clientId=f83d920f-6e8d-465b-bd83-2552132f241e2, groupId=tscalrecord7_REALTIME_1647430665510_0] Bootstrap broker XXXXXX:9092 (id: -1 rack: null) disconnected. The Kafka consumer config shown in the server logs is not picking up the SASL properties we provided in the table config. In particular, these properties are not what we provided:
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
For topic tSCalibrationRecord_pinot, the ConsumerConfig values are:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [XXXX]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-tscalrecord14_REALTIME_1647432834089_0-10
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = tscalrecord14_REALTIME_1647432834089_0
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.BytesDeserializer
The configuration 'auto.isolation.level' was supplied but isn't a known config.
The configuration 'auto.commit.enable' was supplied but isn't a known config.
Kafka version: 2.8.1
x
Just a spot check: I see you’re using the high-level consumer. How about trying the low-level consumer? Also check out the config example noted here: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/import-from-apache-kafka#some-more-kafka-ingestion-configs, which says ‘Use Kafka Partition(Low) Level Consumer with SSL’.
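For reference, a minimal sketch of what the streamConfigs value could look like for the low-level consumer with SSL, roughly along the lines of that doc page (topic name, broker address, truststore path and password are placeholders, not taken from this thread):
{
  "streamType": "kafka",
  "stream.kafka.topic.name": "your-topic",
  "stream.kafka.broker.list": "broker:9092",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "changeit"
}
The security-related keys are passed straight through to the Kafka consumer; a SASL_SSL setup follows the same pattern with its own keys.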
s
@User Thank you for checking it quickly. I have also tried the above link. We are using Confluent Kafka; we are not using a keystore or other SSL key paths, just basic auth (username and password), and we are providing SASL_SSL as the protocol. The low-level consumer is always giving a timeout.
x
cc @User as I see you’re helping on another SASL_SSL related issue and in case you could spot any pattern here.
n
security protocol and other related configs need to be given directly, without the stream.kafka prefix: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/import-from-apache-kafka#ingesting-streaming-data
💡 1
and as per Xiaobing’s comment, please use lowlevel consumer. highlevel is deprecated
the doc doesn’t use SASL (it uses SSL/truststore etc.), like you pointed out, but the same concept applies: any properties you want the consumer to read that are not already defined as Pinot consumer properties should be given directly as those properties
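Concretely, inside streamConfigs the distinction looks something like this (values are placeholders): the first key is a Pinot-defined consumer property and keeps its prefix, while the security-related keys are given as-is so they reach the Kafka consumer untouched:
{
  "stream.kafka.consumer.prop.auto.offset.reset": "largest",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN"
}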
also tagging @User, I think he uses Confluent Kafka with SASL_SSL? Elon, would you be able to look at the table config and check if something is off? And if you could share a sample streamConfig that works for you, we’ll put it in the docs 🙏
e
Thanks for the heads up!
We don't use SASL at the moment but are considering it. I can take a look and will share any findings.
👍 1
s
Hi @User, I was able to resolve this. I will share the table config in a while so you can take a look at it. Many thanks to the Pinot team for taking this up so quickly and for the support: @User, @User, @User, @User
n
that’s great! Yes please, we’ll put the streamConfigs from your config in the docs as a reference for SASL_SSL, thank you 🙂
m
Thanks @User
s
Hi @User, @User, please find below the working config for connecting with SASL_SSL:
{
  "tableName": "calrecord",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "calrec",
    "timeColumnName": "fixTimestamp",
    "allowNullTimeValue": false,
    "replication": "1",
    "replicasPerPartition": "1",
    "completionConfig": {
      "completionMode": "DOWNLOAD"
    }
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant",
    "tagOverrideConfig": {}
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "rangeIndexColumns": [],
    "rangeIndexVersion": 1,
    "autoGeneratedInvertedIndex": false,
    "createInvertedIndexDuringSegmentGeneration": false,
    "sortedColumn": [],
    "bloomFilterColumns": [],
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "tscrpinot",
      "stream.kafka.broker.list": "xxxx:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "stream.kafka.schema.registry.url": "<https://xxx.cloud>",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.prop.schema.registry.rest.url": "<https://xxx.cloud>",
      "sasl.mechanism": "PLAIN",
      "stream.kafka.decoder.prop.basic.auth.credentials.source": "USER_INFO",
      "<http://stream.kafka.decoder.prop.schema.registry.basic.auth.user.info|stream.kafka.decoder.prop.schema.registry.basic.auth.user.info>": "schema_registry_username:schema_registry_password",
      "security.protocol": "SASL_SSL",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkausername\" password=\"kafkapassword\";",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.size": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.autotune.initialRows": "3000000",
      "realtime.segment.flush.threshold.segment.size": "500M"
    },
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "enableDefaultStarTree": false,
    "enableDynamicStarTreeCreation": false,
    "aggregateMetrics": false,
    "nullHandlingEnabled": false
  },
  "metadata": {},
  "quota": {},
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "query": {},
  "ingestionConfig": {},
  "isDimTable": false
}
m
Thanks @User @User