fierce-finland-15121
05/08/2023, 10:58 PM
datahub-actions actions -c /etc/datahub/actions/system/conf/executor.yaml
%3|1683586489.986|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://<my broker url>/bootstr]: sasl_ssl://<my broker url>/bootstrap: SASL authentication error: Authentication failed (after 5055ms in state AUTH_REQ, 5 identical error(s) suppressed)
lively-cat-88289
05/08/2023, 10:58 PM
fierce-finland-15121
05/08/2023, 11:02 PM
values.yml
...
# Helm values for the datahub-actions subchart: mounts a custom executor.yaml
# from a ConfigMap and points the actions container at it via env var.
# NOTE(review): indentation reconstructed from the flattened chat paste —
# nesting follows the standard acryl datahub helm chart values layout.
acryl-datahub-actions:
  enabled: true
  image:
    repository: acryldata/datahub-actions
    tag: "v0.0.11"  # quoted so the tag is never re-typed by YAML tooling
  extraEnvs:
    # Directory (not file) scanned by datahub-actions for system action configs.
    - name: DATAHUB_ACTIONS_SYSTEM_CONFIGS_PATH
      value: /etc/datahub/actions/system/conf
  extraVolumes:
    - name: actions-configs
      configMap:
        name: datahub-actions-configs
        items:
          - key: "executor.yaml"
            path: "executor.yaml"
  extraVolumeMounts:
    - name: actions-configs
      mountPath: /etc/datahub/actions/system/conf
...
# Global kafka connection settings shared by all DataHub components.
# NOTE(review): indentation reconstructed from the flattened chat paste;
# the key `basic.auth.user.info` was mangled by Slack auto-linking
# (`<http://basic.auth.user.info|basic.auth.user.info>`) and is restored here.
global:
  # ...
  kafka:
    bootstrap:
      server: "<my broker url>"
    schemaregistry:
      url: "<my schema registry url>"
  # Secret holding Confluent Cloud credentials; secureEnv maps config
  # property names to keys inside that secret.
  credentialsAndCertsSecrets:
    name: confluent-secrets
    secureEnv:
      sasl.jaas.config: sasl_jaas_config
      basic.auth.user.info: basic_auth_user_info
      sasl.username: sasl_username
      sasl.password: sasl_password
  springKafkaConfigurationOverrides:
    security.protocol: SASL_SSL
    sasl.mechanism: PLAIN
    client.dns.lookup: use_all_dns_ips
    basic.auth.credentials.source: USER_INFO
...
# ConfigMap carrying the custom executor.yaml consumed by datahub-actions.
# NOTE(review): indentation reconstructed from the flattened chat paste;
# Slack-mangled values (`<http://localhost:8081>` default and the
# `<http://basic.auth.user.info|...>` key) restored to their literal forms.
apiVersion: v1
kind: ConfigMap
metadata:
  name: datahub-actions-configs
  namespace: datahub-prod
data:
  # Literal block scalar: the file content is the ingestion-executor action
  # pipeline definition. ${VAR:-default} placeholders are expanded by the
  # actions framework at runtime, not by Kubernetes.
  executor.yaml: |
    name: "ingestion_executor"
    source:
      type: "kafka"
      config:
        connection:
          bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
          schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
          # Passed straight to the underlying kafka consumer — these env vars
          # must be present in the container for SASL auth to succeed.
          consumer_config:
            security.protocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:-PLAINTEXT}
            sasl.mechanism: ${KAFKA_PROPERTIES_SASL_MECHANISM:-PLAIN}
            sasl.username: ${KAFKA_PROPERTIES_SASL_USERNAME}
            sasl.password: ${KAFKA_PROPERTIES_SASL_PASSWORD}
          schema_registry_config:
            basic.auth.user.info: ${KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO}
        topic_routes:
          mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1}
          pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1}
    # Only react to execution-request MCL events addressed to this executor id.
    filter:
      event_type: "MetadataChangeLogEvent_v1"
      event:
        entityType: "dataHubExecutionRequest"
        changeType: "UPSERT"
        aspectName:
          - "dataHubExecutionRequestInput"
          - "dataHubExecutionRequestSignal"
        aspect:
          value:
            executorId: "${EXECUTOR_ID:-default}"
    action:
      type: "executor"
      config:
        executor_id: "${EXECUTOR_ID:-default}"
        datahub:
          server: "${DATAHUB_GMS_PROTOCOL:-http}://${DATAHUB_GMS_HOST:-localhost}:${DATAHUB_GMS_PORT:-8080}"
          extra_headers:
            Authorization: "Basic ${DATAHUB_SYSTEM_CLIENT_ID:-__datahub_system}:${DATAHUB_SYSTEM_CLIENT_SECRET:-JohnSnowKnowsNothing}"
fierce-finland-15121
05/08/2023, 11:12 PM
from confluent_kafka import Consumer
# Minimal connectivity probe: consume DataHubUpgradeHistory_v1 over SASL_SSL
# to verify the Confluent credentials work outside of datahub-actions.
# NOTE(review): indentation reconstructed from the flattened chat paste; the
# Slack-mangled key '<http://session.timeout.ms|session.timeout.ms>' is
# restored to the literal librdkafka property name 'session.timeout.ms'.
props = {
    'bootstrap.servers': '<my bootstrap server>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<client id>',
    'sasl.password': '<client secret>',
    'session.timeout.ms': 45000,
}
props["group.id"] = "python-group-1"
props["auto.offset.reset"] = "earliest"

consumer = Consumer(props)
consumer.subscribe(["DataHubUpgradeHistory_v1"])
try:
    while True:
        # 1-second poll; returns None on timeout.
        msg = consumer.poll(1.0)
        if msg is not None and msg.error() is None:
            print(msg.value())
        else:
            # NOTE(review): this branch also covers msg.error() != None, so a
            # delivery/auth error prints as "no message" — consider logging
            # msg.error() separately when debugging.
            print("no message")
except KeyboardInterrupt:
    pass
finally:
    # Commit offsets (per group config) and leave the group cleanly.
    consumer.close()
astonishing-answer-96712
05/16/2023, 9:12 PM
astonishing-answer-96712
05/16/2023, 11:59 PM