# ingestion
Hello. I am trying to integrate Confluent Kafka with DataHub using the helm deploy documentation here: https://datahubproject.io/docs/deploy/confluent-cloud/. I've mounted a custom executor.yaml file with sasl.username and sasl.password, and I've confirmed that the file is mounted and the environment variables are there (I'll post the relevant config in replies). I've also confirmed that the credentials are correct and work with a basic local consumer. Unfortunately, when I try to run the action I get the following error. Could I get some help figuring out what could be wrong here? I believe everything is mounted correctly and the credentials are correct, so I'm really confused about why I'm getting a message that just says authentication failed.
```
datahub-actions actions -c /etc/datahub/actions/system/conf/executor.yaml
%3|1683586489.986|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://<my broker url>/bootstr]: sasl_ssl://<my broker url>/bootstrap: SASL authentication error: Authentication failed (after 5055ms in state AUTH_REQ, 5 identical error(s) suppressed)
```
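One way to get more detail than the bare "Authentication failed" is to turn on librdkafka's debug logging, which prints the SASL handshake as it happens. A minimal sketch, using the same placeholders as the report above (the topic name is borrowed from the test script further down):

```python
from confluent_kafka import Consumer

# Enable librdkafka's security/broker debug contexts to see why the
# SASL PLAIN handshake is rejected (wrong key, wrong cluster, etc.).
consumer = Consumer({
    "bootstrap.servers": "<my broker url>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<client id>",
    "sasl.password": "<client secret>",
    "group.id": "sasl-debug",
    "debug": "security,broker",  # verbose handshake logging to stderr
})
consumer.subscribe(["DataHubUpgradeHistory_v1"])
consumer.poll(10.0)  # one poll is enough to trigger the connection attempt
consumer.close()
```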
πŸ” 1
πŸ“– 1
l
Hey there πŸ‘‹ I'm The DataHub Community Support bot. I'm here to help make sure the community can best support you with your request. Let's double check a few things first:
- There's a lot of good information on our docs site: www.datahubproject.io/docs. Have you searched there for a solution? βœ…
- It's not uncommon that someone has run into your exact problem before in the community. Have you searched Slack for similar issues? βœ…
Did you find a solution to your issue? ❌ Sorry you weren't able to find a solution. I'm sending you some tips on info you can provide to help the community troubleshoot. Whenever you feel your issue is solved, please react βœ… to your original message to let us know!
```yaml
# values.yml
...
acryl-datahub-actions:
  enabled: true
  image:
    repository: acryldata/datahub-actions
    tag: "v0.0.11"
  extraEnvs:
    - name: DATAHUB_ACTIONS_SYSTEM_CONFIGS_PATH
      value: /etc/datahub/actions/system/conf
  extraVolumes:
    - name: actions-configs
      configMap:
        name: datahub-actions-configs
        items:
          - key: "executor.yaml"
            path: "executor.yaml"
  extraVolumeMounts:
    - name: actions-configs
      mountPath: /etc/datahub/actions/system/conf
...
global:
  ...
  kafka:
    bootstrap:
      server: <my broker url>
    schemaregistry:
      url: <my schema registry url>

  credentialsAndCertsSecrets:
    name: confluent-secrets
    secureEnv:
      sasl.jaas.config: sasl_jaas_config
      basic.auth.user.info: basic_auth_user_info
      sasl.username: sasl_username
      sasl.password: sasl_password

  springKafkaConfigurationOverrides:
    security.protocol: SASL_SSL
    sasl.mechanism: PLAIN
    client.dns.lookup: use_all_dns_ips
    basic.auth.credentials.source: USER_INFO
...
```
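Since the executor.yaml below reads its credentials from KAFKA_PROPERTIES_* environment variables, a quick sanity check is to confirm those variables are actually populated inside the actions container (e.g. via kubectl exec). This is a small sketch under the assumption that the chart exposes the secureEnv entries under the names executor.yaml expects:

```python
import os

# Print which of the variables referenced in executor.yaml are set,
# showing only a short prefix of each value so secrets stay redacted.
VARS = [
    "KAFKA_BOOTSTRAP_SERVER",
    "SCHEMA_REGISTRY_URL",
    "KAFKA_PROPERTIES_SECURITY_PROTOCOL",
    "KAFKA_PROPERTIES_SASL_MECHANISM",
    "KAFKA_PROPERTIES_SASL_USERNAME",
    "KAFKA_PROPERTIES_SASL_PASSWORD",
    "KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO",
]

for var in VARS:
    val = os.environ.get(var)
    print(f"{var} = {'<unset>' if val is None else val[:4] + '...'}")
```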
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: datahub-actions-configs
  namespace: datahub-prod
data:
  executor.yaml: |
    name: "ingestion_executor"
    source:
      type: "kafka"
      config:
        connection:
          bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
          schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
          consumer_config:
            security.protocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:-PLAINTEXT}
            sasl.mechanism: ${KAFKA_PROPERTIES_SASL_MECHANISM:-PLAIN}
            sasl.username: ${KAFKA_PROPERTIES_SASL_USERNAME}
            sasl.password: ${KAFKA_PROPERTIES_SASL_PASSWORD}
          schema_registry_config:
            basic.auth.user.info: ${KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO}
        topic_routes:
          mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1}
          pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1}
    filter:
      event_type: "MetadataChangeLogEvent_v1"
      event:
        entityType: "dataHubExecutionRequest"
        changeType: "UPSERT"
        aspectName:
          - "dataHubExecutionRequestInput"
          - "dataHubExecutionRequestSignal"
        aspect:
          value:
            executorId: "${EXECUTOR_ID:-default}"
    action:
      type: "executor"
      config:
        executor_id: "${EXECUTOR_ID:-default}"
    datahub:
      server: "${DATAHUB_GMS_PROTOCOL:-http}://${DATAHUB_GMS_HOST:-localhost}:${DATAHUB_GMS_PORT:-8080}"
      extra_headers:
        Authorization: "Basic ${DATAHUB_SYSTEM_CLIENT_ID:-__datahub_system}:${DATAHUB_SYSTEM_CLIENT_SECRET:-JohnSnowKnowsNothing}"
```
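The ${VAR:-default} references in executor.yaml follow shell-style expansion, so a reference without a default (like KAFKA_PROPERTIES_SASL_USERNAME) silently resolves to empty if the variable is unset. Here is a hypothetical helper to preview how a given reference would resolve against the current environment; the expansion rule is an assumption based on the syntax above:

```python
import os
import re

# Expand ${NAME} / ${NAME:-default} against the environment, mirroring
# the shell-style syntax used in executor.yaml above (assumed semantics).
def expand(value: str) -> str:
    pattern = re.compile(r"\$\{(\w+)(?::-([^}]*))?\}")
    return pattern.sub(lambda m: os.environ.get(m.group(1), m.group(2) or ""), value)

print(expand("${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}"))
print(expand("${KAFKA_PROPERTIES_SASL_USERNAME}"))  # empty string if unset
```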
Also, for what it's worth, this is the quick script I wrote up to test the credentials. It worked fine, so I don't think the error is as simple as a credentials typo:
```python
from confluent_kafka import Consumer

# Plain Confluent consumer using the same credentials as the actions pod.
props = {
    'bootstrap.servers': '<my bootstrap server>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<client id>',
    'sasl.password': '<client secret>',
    'session.timeout.ms': 45000,
    'group.id': 'python-group-1',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(props)
consumer.subscribe(["DataHubUpgradeHistory_v1"])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is not None and msg.error() is None:
            print(msg.value())
        else:
            print("no message")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
Hm, could you try entering the secret and ID as raw text in the recipe rather than saving them in DataHub? That sometimes fixes things. What version are you on?
Could we hop on a call to understand how you’re connecting things? CC @delightful-ram-75848, who can help find a time.