# ingestion
a
Hi, if I use datahub-kafka as my sink, and I'm using AWS MSK, do I have to add extra configs to the producer_config in order to use datahub-kafka?
g
a
Yep^^ I set it up following the guide, but for some reason I can sink to datahub-gms but not datahub-kafka… the error log said it failed to write.
e
Hey. Did you set up SSL in MSK?
Are you using schema registry from confluent?
a
Hi @early-lamp-41924, yeah I'm using the schema registry from Confluent and I did not add any SSL in MSK.
For your information, MAE worked well but MCE for some reason is not consuming anything…
Both MCE and MAE use the same MSK bootstrap server, but when I tried to read the topic MetadataChangeEvent_v4… it has nothing in it…
These are my MCE deployment env vars:
Copy code
env:
            - name: MCE_CONSUMER_ENABLED
              value: "true"
            - name: KAFKA_BOOTSTRAP_SERVER
              value: "{{ $kafka_uri }}"
            - name: KAFKA_SCHEMAREGISTRY_URL
              value: "http://{{ $service_name }}-cp-schema-registry:{{ $schema_registry_port }}"
            - name: GMS_HOST
              value: "{{ $service_name }}-gms"
            - name: GMS_PORT
              value: "8080"
e
Seems like the Python ingest is failing to push MCEs to the topic.
This is very weird.
If SSL is not set, simply filling out bootstrap and schema_registry_url should work. https://datahubproject.io/docs/metadata-ingestion#datahub-kafka-datahub-kafka
m
So this is non-ssl Kafka push?
a
Yeah, it's a non-SSL Kafka push… I don't think I need to set any SSL config, right? Because MAE was working perfectly… I guess something in MCE is not ingesting the push from the Python script…
This is my error log from datahub-ingestion:
Copy code
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"} and info {'error': KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"},
m
what does your kafka sink config look like?
a
Copy code
source:
      type: glue
      config:
        aws_region: "<<region_name>>"
        extract_transforms: False
        env: "PROD"

    sink:
      type: "datahub-kafka"
      config:
        connection:
          bootstrap: "<http://blahblah-2.amazonaws.com:9092,blahblah-3.amazonaws.com:9092,blahblah-1.amazonaws.com:9092|blahblah-2.amazonaws.com:9092,blahblah-3.amazonaws.com:9092,blahblah-1.amazonaws.com:9092>"
          schema_registry_url: "<http://datahub-cp-schema-registry:8081>"
I'll just use datahub-rest for now… I tried opening all inbound and outbound rules in the MSK and Kubernetes security groups and it still wouldn't work…
😞
c
Hello @adventurous-scooter-52064, did you solve the Kafka MSG_TIMED_OUT problem? I'm getting the same error now but I couldn't solve it :/
a
Nope… I think there's a connection problem between the schema registry (k8s) and AWS MSK… so I'm going to try AWS MSK with the AWS Glue schema registry soon. Right now we use our recipe with datahub-rest as our sink.
c
Ok, thanks for the reply.
a
Hi, we are receiving the same error. The use case is the same except for the schema registry: we deployed the registry on EKS. MSK has plaintext bootstraps and we are getting timeouts on metadata ingestion. Also, the pre-hook Kafka job works seamlessly when using the same plaintext bootstrap URLs.
m
@able-park-49455: we are aware of this issue and working with a few folks on a recommendation for this pattern. /cc @early-lamp-41924
e
@adventurous-scooter-52064 @curved-jordan-15657 @able-park-49455 do you have SSL setup for MSK? We were able to reproduce the issue on our cluster with SSL, but it worked well on our cluster without SSL
Trying to figure out the root cause
a
No, we don't have SSL with our AWS MSK haha, I don't even know how to set up SSL for MSK honestly haha
👍 1
c
@early-lamp-41924 no, we don't. We've tried everything but it doesn't work. We can even access it with Python; we can produce and consume with kafka-python. I think there is some extra config for MSK which DataHub requires, but I don't know what.
m
@curved-jordan-15657 when we checked, we were facing the same problem when trying to produce to the DataHub topic (using Python)
so can you produce to the DataHub topic using Python?
the topic name is: MetadataChangeEvent_v4
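(Editorial aside, not from the thread: a minimal connectivity test with the confluent-kafka client, which I believe is the same producer library the datahub-kafka sink uses under the hood. The broker address is a placeholder.)
# Minimal sketch: can this host produce a single message to the MCE topic?
# Requires `pip install confluent-kafka`; the broker address below is a placeholder.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "blahblah-1.amazonaws.com:9092"})

def report(err, msg):
    if err is not None:
        # A _MSG_TIMED_OUT error here points at broker connectivity, not at DataHub itself
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}]")

producer.produce("MetadataChangeEvent_v4", value=b"connectivity-test", callback=report)
producer.flush(10)  # wait up to 10s for the delivery report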
c
yeah we’ve configured kafka as you mentioned in docs
m
right.. but are you able to produce anything using python to that topic?
c
yes exactly
m
e.g.
kafkacat -b <broker_address> -t MetadataChangeEvent_v4 -P
is working?
👀 1
c
Let's try.
@mammoth-bear-12532 nothing happens with that…
m
@curved-jordan-15657: so clearly kafkacat is also failing to produce to that topic
c
Hello! Did any of you who were seeing this error figure it out or work around it anyhow? My team and I just ran up against it, as well. Our setup sounds similar. We are using MSK with no SSL and confluent schema registry deployed on eks.
e
Hey Martha! We realized the replication factor was the issue. Once it's set to 2 or more, it seems to work. More details in this thread: https://datahubspace.slack.com/archives/CUMUWQU66/p1629903665478500
m
@colossal-account-65055 Happy Friday! Let us know if this solved it for you... or if you'd like us to get on a call and debug live we could do that! @early-lamp-41924 and I have spent countless hours debugging this and now know how to fix it 🙂
c
Hey Shirshanka and Dexter! I haven't quite gotten to the bottom of it, maybe a call would be good. I pulled in your changes but I now see the following error -- this is when running the helm charts via terraform:
Copy code
╷
│ Error: failed pre-install: warning: Hook pre-install datahub/templates/kafka-setup-job.yml failed: Job in version "v1" cannot be handled as a Job: v1.Job.Spec: v1.JobSpec.Template: v1.PodTemplateSpec.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Env: []v1.EnvVar: v1.EnvVar.v1.EnvVar.Value: ReadString: expects " or n, but found 3, error found in #10 byte of ...|,"value":3},{"name":|..., bigger context ...|<http://mazonaws.com:9092%22|mazonaws.com:9092">},{"name":"PARTITIONS","value":3},{"name":"REPLICATION_FACTOR","value":3}],"image"|...
│ 
│   with module.datahub.helm_release.datahub_core,
│   on ../../datahub/datahub_helm.tf line 37, in resource "helm_release" "datahub_core":
│   37: resource "helm_release" "datahub_core" {
│ 
╵
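(Editorial note: the error above is Kubernetes rejecting the rendered kafka-setup-job manifest because env var values must be strings, while PARTITIONS and REPLICATION_FACTOR were being emitted as bare integers. Illustratively:)
# Rejected by the Kubernetes API: v1.EnvVar.Value must be a string
- name: REPLICATION_FACTOR
  value: 3
# Accepted (e.g. rendered with `| quote` in the chart template)
- name: REPLICATION_FACTOR
  value: "3"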
e
I see the issue. Fixing now
@colossal-account-65055 can you try again?
You need to delete the topics first, though
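(A hedged sketch of that cleanup using the standard Kafka CLI, so the kafka-setup job can recreate the topics with the new replication factor; the broker address is a placeholder and your topic list may differ:)
kafka-topics.sh --bootstrap-server blahblah-1.amazonaws.com:9092 --delete --topic MetadataChangeEvent_v4
kafka-topics.sh --bootstrap-server blahblah-1.amazonaws.com:9092 --delete --topic MetadataAuditEvent_v4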
c
Hey @early-lamp-41924, thank you! Yes, I tried again. After pulling your changes, the deployment now succeeds, but I still get the same timeout error when trying to ingest to Kafka. I think this is probably because of our particular deployment setup with Helm and Terraform. We run the helm charts from Terraform (using the tf helm_release resource). We disable Kafka in the prerequisites chart, because we are using MSK, so we don't want Kafka deployed within the EKS cluster. Separately, we also deploy MSK from Terraform using the aws_msk_cluster resource. The kafka-setup job is still enabled in Helm, but I think that when we set these values in Helm, they aren't making it all the way to MSK, because checking in the Amazon console, I can see that MSK still uses the default MSK configuration.
e
Can you check the exact configuration of each topic?
I usually use kafkacat
kafkacat -L -b <<bootstrap-server-address>>
Check whether the MetadataChangeEvent topic has a replication factor greater than 1
c
hum, perhaps something else is afoot. from your kafkacat command I'm getting the error
% ERROR: Failed to acquire metadata: Local: Broker transport failure
m
@colossal-account-65055 @early-lamp-41924 are both of you free to chat live about this at 1pm Pacific / 4pm Eastern?
c
Yes I am free!
cc @acceptable-football-40437 if you would like to join
👍 1
e
Update: Unfortunately, we couldn't find a way for the defaults to work on both the quickstart and AWS MSK. When using a multi-node AWS MSK cluster (3 brokers by default), please explicitly set global.kafka.partitions and global.kafka.replicationFactor to 3
👀 1
a
@early-lamp-41924, where's the documentation on AWS to add global.kafka.partitions and global.kafka.replicationFactor? According to https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html, is it default.replication.factor and num.partitions? I tried the global ones and it failed.
e
@adventurous-scooter-52064 They are commented out in values.yaml. We will add docs asap. They are global.kafka.partitions and global.kafka.replicationFactor
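(A hedged sketch of what that override could look like in a Helm values file for a 3-broker MSK cluster; the key names come from the message above, the surrounding YAML structure is assumed:)
global:
  kafka:
    partitions: 3
    replicationFactor: 3
Or equivalently on the command line: helm upgrade ... --set global.kafka.partitions=3 --set global.kafka.replicationFactor=3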
h
@early-lamp-41924 - Could this be the root cause ? Can you share the documentation to resolve this ?
Check out the final paragraph in this section!
h
@early-lamp-41924 - kcat -L -b <broker list> shows topic "MetadataChangeEvent_v4" with 3 partitions:
  partition 0, leader 2, replicas: 2,3,1, isrs: 2,3,1
  partition 1, leader 1, replicas: 1,2,3, isrs: 1,2,3
  partition 2, leader 3, replicas: 3,1,2, isrs: 3,1,2
e
This looks good to me
Are you still seeing the issue?
h
Yes unfortunately.
@early-lamp-41924 - Got it working for a cluster with plaintext. Now trying with TLS enabled. Any special configurations needed for TLS?
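(The TLS question isn't answered in this thread, but as a hedged starting point: the datahub-kafka sink's connection.producer_config is passed through to the underlying confluent-kafka/librdkafka producer, so something like the following may work against MSK's TLS listener. The port and settings are assumptions, not a verified recipe.)
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "blahblah-1.amazonaws.com:9094"   # MSK's TLS listener typically runs on 9094
      schema_registry_url: "https://<schema registry>"
      producer_config:
        security.protocol: SSL
        # add ssl.ca.location if librdkafka does not pick up your system CA bundle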
@early-lamp-41924 - Quick question - when I got it working for the cluster with no TLS enabled, I got these logs. Are these expected? It seems to be writing to rest rather than kafka -
Copy code
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-03-22T20:28:39.273809+00:00
[2022-03-22 20:28:41,586] {logging_mixin.py:104} INFO -  Pipeline config is {'source': {'type': 'glue', 'config': {'env': 'PROD', 'aws_region': 'us-east-1', 'extract_transforms': 'false', 'table_pattern': {'allow': ['^tablename*'], 'ignoreCase': 'false'}}}, 'transformers': [{'type': 'simple_remove_dataset_ownership', 'config': {}}, {'type': 'simple_add_dataset_ownership', 'config': {'owner_urns': ['urn:li:corpuser:testuser']}}, {'type': 'set_dataset_browse_path', 'config': {'path_templates': ['/Platform/PLATFORM/DATASET_PARTS']}}], 'sink': {'type': 'datahub-kafka', 'config': {'connection': {'bootstrap': '<bootstrapserver>', 'schema_registry_url': 'https://<schemaregistry url>'}}}}
[2022-03-22 20:30:11,741] {pipeline.py:76} INFO - sink wrote workunit <tablename>
[2022-03-22 20:30:11,831] {_lineage_core.py:295} INFO - DataHub lineage backend - emitting metadata:
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {"urn": "urn:li:dataFlow:(airflow,metadata_push_ingestion_dag,prod)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataFlowInfo": {"customProperties": {"fileloc": "'/home/ec2-user/airflow/dags/metadata_push_ingestion_dag.py'", "timezone": "'UTC'", "catchup": "False", "tags": "[ 'ingestion']", "start_date": "1647734400.0", "_access_control": "None", "_concurrency": "64", "_default_view": "'tree'", "is_paused_upon_creation": "None"}, "externalUrl": "https://<airflow url>:443/tree?dag_id=hdc_da_metadata_push_ingestion_dag", "name": "metadata_push_ingestion_dag", "description": "DAG to ingest the metadata from multiple datasource\n\n", "project": null}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "urn:li:corpuser:da", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "metadata_push_ingestion_dag.py"}}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}, {"com.linkedin.pegasus2avro.common.GlobalTags": {"tags": [{"tag": "urn:li:tag:metadata"}, {"tag": "urn:li:tag:ingestion"}]}}]}}, "proposedDelta": null, "systemMetadata": null}
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": {"urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,metadata_push_ingestion_dag,prod),ingest_metadata)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataJobInfo": {"customProperties": {"_outlets": "[]", "task_id": "'ingest_metadata'", "execution_timeout": "7200.0", "_downstream_task_ids": "[]", "email": "['testemail']", "label": "'ingest_metadata'", "_inlets": "[]", "_task_type": "'_PythonDecoratedOperator'", "_task_module": "'airflow.operators.python'", "sla": "None", "wait_for_downstream": "False", "trigger_rule": "'all_success'", "start_date": "datetime.datetime(2022, 3, 20, 0, 0, tzinfo=Timezone('UTC'))", "end_date": "None", "depends_on_past": "False"}, "externalUrl": "<https://airflowurl/taskinstance/list/?flt1_dag_id_equals=metadata_push_ingestion_dag&_flt_3_task_id=ingest_metadata>", "name": "ingest_metadata", "description": null, "type": {"string": "COMMAND"}, "flowUrn": null, "status": null}}, {"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {"inputDatasets": [], "outputDatasets": [], "inputDatajobs": []}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "urn:li:corpuser:test", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "metadata_push_ingestion_dag.py"}}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}, {"com.linkedin.pegasus2avro.common.GlobalTags": {"tags": [{"tag": "urn:li:tag:metadata"}, {"tag": "urn:li:tag:ingestion"}]}}]}}, "proposedDelta": null, "systemMetadata": null}
[2022-03-22 20:30:11,836] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: https://<gmshost>, Port: None, Schema: None, Login: None, Password: None, extra: None
[2022-03-22 20:30:11,840] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: https://<gms-host>, Port: None, Schema: None, Login: None, Password: None, extra: None
[2022-03-22 20:30:11,957] {taskinstance.py:1192} INFO - Marking task as SUCCESS. dag_id=metadata_push_ingestion_dag, task_id=ingest_metadata, execution_date=20220322T202839, start_date=20220322T202841, end_date=20220322T203011
[2022-03-22 20:30:11,976] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2022-03-22 20:30:11,986] {local_task_job.py:146} INFO - Task exited with return code 0
e
how are you configuring your airflow to use the datahub-kafka sink?
h
@early-lamp-41924 This is our recipe file -
Copy code
"source": {
    "type": "glue",
    "config": {
      "env": "PROD",
      "aws_region": "us-east-1",
      "extract_transforms": "false",
      "table_pattern": {
        "allow": [
          "^tablename*"
        ],
        "ignoreCase": "false"
      }
    }
  },
  "transformers": [
    {
      "type": "simple_remove_dataset_ownership",
      "config": {}
    },
    {
      "type": "simple_add_dataset_ownership",
      "config": {
        "owner_urns": [
          "urn:li:corpuser:user"
        ]
      }
    },
    {
      "type": "set_dataset_browse_path",
      "config": {
        "path_templates": [
          "/Platform/PLATFORM/DATASET_PARTS"
        ]
      }
    }
  ],
    "sink": {
        "type": "datahub-kafka",
        "config": {
            "connection": {
                "bootstrap": "<bootstrap server>",
                "schema_registry_url": "<schema registry>"
            }
        }
    }
}
And we have the Kafka connection added as well
e
@dazzling-judge-80093 whenever you are back online. could you take a look at this one?
d
Maybe I'm not right, but it seems like the lineage backend is enabled as well and you are also sending metadata from the DAG itself. For the lineage backend you should set
AIRFLOW__LINEAGE__DATAHUB_KWARGS={"datahub_conn_id":"datahub_kafka_default"}
to use Kafka. I assume this based on the logs, where the DAG successfully sent in the metadata:
Copy code
[2022-03-22 20:30:11,741] {pipeline.py:76} INFO - sink wrote workunit <tablename>
and after the successful airflow task run the lineage backend tried to emit metadata as well:
Copy code
[2022-03-22 20:30:11,831] {_lineage_core.py:295} INFO - DataHub lineage backend - emitting metadata:
And since only the lineage backend uses Airflow connections, I think the lineage backend is the one that is not set up to use Kafka:
Copy code
[2022-03-22 20:30:11,836] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: https://<gmshost>, Port: None, Schema: None, Login: None, Password: None, extra: None
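(A hedged sketch of that change, assuming the Airflow 2.x connections CLI and the conn id named above; the broker address is a placeholder and the exact conn-type / extra fields should be checked against the DataHub Airflow plugin docs:)
# Point the lineage backend at a Kafka connection instead of datahub_rest_default
export AIRFLOW__LINEAGE__DATAHUB_KWARGS='{"datahub_conn_id": "datahub_kafka_default"}'

# Create that connection; schema registry settings likely belong in --conn-extra
airflow connections add 'datahub_kafka_default' \
    --conn-type 'datahub_kafka' \
    --conn-host 'blahblah-1.amazonaws.com:9092' \
    --conn-extra '{}'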
h
@dazzling-judge-80093 - Got it. The lineage backend is where it was using the REST connection.
d
Exactly