adventurous-scooter-52064 (08/03/2021, 8:49 AM):
Do we need to set producer_config in order to use datahub-kafka?
green-football-43791 (08/03/2021, 2:50 PM)
adventurous-scooter-52064 (08/04/2021, 1:28 AM)
early-lamp-41924 (08/04/2021, 3:53 PM)
early-lamp-41924 (08/04/2021, 3:54 PM)
adventurous-scooter-52064 (08/05/2021, 12:04 AM)
adventurous-scooter-52064 (08/05/2021, 12:05 AM)
adventurous-scooter-52064 (08/05/2021, 12:06 AM)
adventurous-scooter-52064 (08/05/2021, 12:07 AM):
env:
  - name: MCE_CONSUMER_ENABLED
    value: "true"
  - name: KAFKA_BOOTSTRAP_SERVER
    value: "{{ $kafka_uri }}"
  - name: KAFKA_SCHEMAREGISTRY_URL
    value: "http://{{ $service_name }}-cp-schema-registry:{{ $schema_registry_port }}"
  - name: GMS_HOST
    value: "{{ $service_name }}-gms"
  - name: GMS_PORT
    value: "8080"
early-lamp-41924 (08/05/2021, 12:11 AM)
early-lamp-41924 (08/05/2021, 12:14 AM)
early-lamp-41924 (08/05/2021, 12:15 AM)
mammoth-bear-12532
mammoth-bear-12532
adventurous-scooter-52064 (08/05/2021, 12:36 AM)
adventurous-scooter-52064 (08/05/2021, 2:51 AM):
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"} and info {'error': KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"},
mammoth-bear-12532
adventurous-scooter-52064 (08/05/2021, 3:10 AM):
source:
  type: glue
  config:
    aws_region: "<<region_name>>"
    extract_transforms: False
    env: "PROD"
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "blahblah-2.amazonaws.com:9092,blahblah-3.amazonaws.com:9092,blahblah-1.amazonaws.com:9092"
      schema_registry_url: "http://datahub-cp-schema-registry:8081"
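Note, on the earlier producer_config question: the datahub-kafka sink takes extra producer settings under connection.producer_config, which are passed through to the underlying Kafka producer. A minimal sketch only; the broker address is a placeholder and the setting shown (a longer local message timeout, relevant to the _MSG_TIMED_OUT error above) is illustrative, not something stated in this thread.

sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "blahblah-1.amazonaws.com:9092"
      schema_registry_url: "http://datahub-cp-schema-registry:8081"
      producer_config:                      # passed through to the underlying Kafka producer
        message.timeout.ms: 60000           # illustrative: raise the local message timeout behind _MSG_TIMED_OUT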
adventurous-scooter-52064 (08/05/2021, 3:26 AM)
adventurous-scooter-52064 (08/05/2021, 3:26 AM)
curved-jordan-15657 (08/13/2021, 7:51 AM)
adventurous-scooter-52064 (08/13/2021, 11:35 AM)
curved-jordan-15657 (08/13/2021, 2:06 PM)
able-park-49455 (08/16/2021, 9:06 AM)
mammoth-bear-12532
early-lamp-41924 (08/18/2021, 3:20 AM)
early-lamp-41924 (08/18/2021, 3:20 AM)
adventurous-scooter-52064 (08/18/2021, 4:33 AM)
curved-jordan-15657 (08/18/2021, 7:23 AM)
mammoth-bear-12532
mammoth-bear-12532
mammoth-bear-12532
curved-jordan-15657 (08/18/2021, 7:34 AM)
mammoth-bear-12532
curved-jordan-15657 (08/18/2021, 7:36 AM)
mammoth-bear-12532:
kafkacat -b <broker_address> -t MetadataChangeEvent_v4 -P is working?
curved-jordan-15657 (08/18/2021, 7:36 AM)
curved-jordan-15657 (08/18/2021, 8:04 AM)
mammoth-bear-12532:
kafkacat is also failing to produce to that topic
colossal-account-65055 (08/27/2021, 5:36 PM)
early-lamp-41924 (08/27/2021, 5:50 PM)
mammoth-bear-12532
colossal-account-65055 (08/30/2021, 4:00 PM):
╷
│ Error: failed pre-install: warning: Hook pre-install datahub/templates/kafka-setup-job.yml failed: Job in version "v1" cannot be handled as a Job: v1.Job.Spec: v1.JobSpec.Template: v1.PodTemplateSpec.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Env: []v1.EnvVar: v1.EnvVar.v1.EnvVar.Value: ReadString: expects " or n, but found 3, error found in #10 byte of ...|,"value":3},{"name":|..., bigger context ...|mazonaws.com:9092"},{"name":"PARTITIONS","value":3},{"name":"REPLICATION_FACTOR","value":3}],"image"|...
│
│ with module.datahub.helm_release.datahub_core,
│ on ../../datahub/datahub_helm.tf line 37, in resource "helm_release" "datahub_core":
│ 37: resource "helm_release" "datahub_core" {
│
╵
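Note, for context on the error above: Kubernetes requires env var values to be strings, and the rendered kafka-setup Job contains "value":3 for PARTITIONS and REPLICATION_FACTOR, which the API rejects. However those numbers are supplied (chart values or a terraform helm_release set block), they have to reach the manifest quoted. A sketch of what the container env needs to end up as; the env var names are taken from the error message and the broker address is a placeholder.

env:
  - name: KAFKA_BOOTSTRAP_SERVER
    value: "b-1.example.amazonaws.com:9092"   # placeholder MSK bootstrap
  - name: PARTITIONS
    value: "3"                                # the string "3", not the number 3
  - name: REPLICATION_FACTOR
    value: "3"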
early-lamp-41924 (08/30/2021, 4:41 PM)
early-lamp-41924 (08/30/2021, 4:49 PM)
early-lamp-41924 (08/30/2021, 4:50 PM)
colossal-account-65055 (08/30/2021, 7:04 PM):
We deploy the DataHub chart from terraform (using the helm_release resource). We disable Kafka in the prerequisites chart, because we are using MSK and don't want Kafka deployed inside the EKS cluster. Separately, we also deploy MSK from terraform using the aws_msk_cluster resource. The kafka-setup job is still enabled in helm, but I don't think the values we set in helm are making it all the way to MSK: checking in the Amazon console, I can see that MSK still uses the default MSK configuration.
early-lamp-41924 (08/30/2021, 7:05 PM)
early-lamp-41924 (08/30/2021, 7:05 PM)
early-lamp-41924 (08/30/2021, 7:05 PM)
early-lamp-41924 (08/30/2021, 7:05 PM)
colossal-account-65055 (08/30/2021, 7:10 PM):
% ERROR: Failed to acquire metadata: Local: Broker transport failure
mammoth-bear-12532
colossal-account-65055 (08/30/2021, 7:21 PM)
colossal-account-65055 (08/30/2021, 7:21 PM)
early-lamp-41924 (08/31/2021, 10:32 PM)
adventurous-scooter-52064 (09/06/2021, 5:39 AM):
global.kafka.partitions and global.kafka.replicationFactor?
https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html
According to this doc, is it default.replication.factor and num.partitions? I tried the global ones and it failed.
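Note: these settings live at two different layers. The chart's global.kafka.partitions / global.kafka.replicationFactor only feed the PARTITIONS and REPLICATION_FACTOR env vars of the kafka-setup job (the topics it creates), while num.partitions and default.replication.factor from the linked MSK doc are broker-side defaults that would be set in an MSK configuration, not through helm. A values sketch follows, with the numbers quoted so they render as strings; the bootstrap key and address are assumptions to be checked against the chart version in use.

global:
  kafka:
    bootstrap:
      server: "b-1.example.amazonaws.com:9092"   # placeholder MSK bootstrap
    partitions: "3"
    replicationFactor: "3"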
adventurous-scooter-52064 (09/06/2021, 5:39 AM)
early-lamp-41924 (09/06/2021, 10:13 PM)
handsome-football-66174 (03/21/2022, 8:16 PM)
early-lamp-41924 (03/22/2022, 4:12 AM)
early-lamp-41924 (03/22/2022, 4:12 AM)
handsome-football-66174 (03/22/2022, 5:20 PM)
early-lamp-41924 (03/22/2022, 5:44 PM)
early-lamp-41924 (03/22/2022, 5:44 PM)
handsome-football-66174 (03/22/2022, 6:09 PM)
handsome-football-66174 (03/22/2022, 8:23 PM)
handsome-football-66174 (03/23/2022, 4:47 PM):
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-03-22T20:28:39.273809+00:00
[2022-03-22 20:28:41,586] {logging_mixin.py:104} INFO - Pipeline config is {'source': {'type': 'glue', 'config': {'env': 'PROD', 'aws_region': 'us-east-1', 'extract_transforms': 'false', 'table_pattern': {'allow': ['^tablename*'], 'ignoreCase': 'false'}}}, 'transformers': [{'type': 'simple_remove_dataset_ownership', 'config': {}}, {'type': 'simple_add_dataset_ownership', 'config': {'owner_urns': ['urn:li:corpuser:testuser']}}, {'type': 'set_dataset_browse_path', 'config': {'path_templates': ['/Platform/PLATFORM/DATASET_PARTS']}}], 'sink': {'type': 'datahub-kafka', 'config': {'connection': {'bootstrap': '<bootstrapserver>', 'schema_registry_url': 'https://<schemaregistry url>'}}}}
[2022-03-22 20:30:11,741] {pipeline.py:76} INFO - sink wrote workunit <tablename>
[2022-03-22 20:30:11,831] {_lineage_core.py:295} INFO - DataHub lineage backend - emitting metadata:
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {"urn": "urn:li:dataFlow:(airflow,metadata_push_ingestion_dag,prod)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataFlowInfo": {"customProperties": {"fileloc": "'/home/ec2-user/airflow/dags/metadata_push_ingestion_dag.py'", "timezone": "'UTC'", "catchup": "False", "tags": "[ 'ingestion']", "start_date": "1647734400.0", "_access_control": "None", "_concurrency": "64", "_default_view": "'tree'", "is_paused_upon_creation": "None"}, "externalUrl": "https://<airflow url>:443/tree?dag_id=hdc_da_metadata_push_ingestion_dag", "name": "metadata_push_ingestion_dag", "description": "DAG to ingest the metadata from multiple datasource\n\n", "project": null}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "urn:li:corpuser:da", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "metadata_push_ingestion_dag.py"}}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}, {"com.linkedin.pegasus2avro.common.GlobalTags": {"tags": [{"tag": "urn:li:tag:metadata"}, {"tag": "urn:li:tag:ingestion"}]}}]}}, "proposedDelta": null, "systemMetadata": null}
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": {"urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,metadata_push_ingestion_dag,prod),ingest_metadata)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataJobInfo": {"customProperties": {"_outlets": "[]", "task_id": "'ingest_metadata'", "execution_timeout": "7200.0", "_downstream_task_ids": "[]", "email": "['testemail']", "label": "'ingest_metadata'", "_inlets": "[]", "_task_type": "'_PythonDecoratedOperator'", "_task_module": "'airflow.operators.python'", "sla": "None", "wait_for_downstream": "False", "trigger_rule": "'all_success'", "start_date": "datetime.datetime(2022, 3, 20, 0, 0, tzinfo=Timezone('UTC'))", "end_date": "None", "depends_on_past": "False"}, "externalUrl": "<https://airflowurl/taskinstance/list/?flt1_dag_id_equals=metadata_push_ingestion_dag&_flt_3_task_id=ingest_metadata>", "name": "ingest_metadata", "description": null, "type": {"string": "COMMAND"}, "flowUrn": null, "status": null}}, {"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {"inputDatasets": [], "outputDatasets": [], "inputDatajobs": []}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "urn:li:corpuser:test", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "metadata_push_ingestion_dag.py"}}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}, {"com.linkedin.pegasus2avro.common.GlobalTags": {"tags": [{"tag": "urn:li:tag:metadata"}, {"tag": "urn:li:tag:ingestion"}]}}]}}, "proposedDelta": null, "systemMetadata": null}
[2022-03-22 20:30:11,836] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: https://<gmshost>, Port: None, Schema: None, Login: None, Password: None, extra: None
[2022-03-22 20:30:11,840] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: https://<gms-host>, Port: None, Schema: None, Login: None, Password: None, extra: None
[2022-03-22 20:30:11,957] {taskinstance.py:1192} INFO - Marking task as SUCCESS. dag_id=metadata_push_ingestion_dag, task_id=ingest_metadata, execution_date=20220322T202839, start_date=20220322T202841, end_date=20220322T203011
[2022-03-22 20:30:11,976] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2022-03-22 20:30:11,986] {local_task_job.py:146} INFO - Task exited with return code 0
early-lamp-41924 (03/23/2022, 6:05 PM)
handsome-football-66174 (03/23/2022, 7:10 PM):
{
"source": {
"type": "glue",
"config": {
"env": "PROD",
"aws_region": "us-east-1",
"extract_transforms": "false",
"table_pattern": {
"allow": [
"^tablename*"
],
"ignoreCase": "false"
}
}
},
"transformers": [
{
"type": "simple_remove_dataset_ownership",
"config": {}
},
{
"type": "simple_add_dataset_ownership",
"config": {
"owner_urns": [
"urn:li:corpuser:user"
]
}
},
{
"type": "set_dataset_browse_path",
"config": {
"path_templates": [
"/Platform/PLATFORM/DATASET_PARTS"
]
}
}
],
"sink": {
"type": "datahub-kafka",
"config": {
"connection": {
"bootstrap": "<bootstrap server>",
"schema_registry_url": "<schema registry>"
}
}
}
}
handsome-football-66174 (03/23/2022, 7:15 PM)
early-lamp-41924 (03/23/2022, 7:16 PM)
dazzling-judge-80093 (03/24/2022, 8:21 AM):
You need to set AIRFLOW__LINEAGE__DATAHUB_KWARGS={"datahub_conn_id":"datahub_kafka_default"} to use kafka.
I assume this based on the logs, where the DAG successfully sent the metadata:
[2022-03-22 20:30:11,741] {pipeline.py:76} INFO - sink wrote workunit <tablename>
and after the successful Airflow task run, the lineage backend tried to emit metadata as well:
[2022-03-22 20:30:11,831] {_lineage_core.py:295} INFO - DataHub lineage backend - emitting metadata:
And since only the lineage backend uses Airflow connections, I think the lineage backend is the one that is not set up to use kafka ->
[2022-03-22 20:30:11,836] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: https://<gmshost>, Port: None, Schema: None, Login: None, Password: None, extra: None
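Note, a sketch of how the two pieces fit together when the lineage backend should emit over Kafka: the backend reads the Airflow connection named by datahub_conn_id, so a connection called datahub_kafka_default pointing at the broker has to exist, and the kwargs can be supplied either in airflow.cfg's [lineage] section or as environment variables. The env-var form below is an assumption about how the deployment is configured; the backend class path is the one documented for the DataHub Airflow lineage backend, and the cluster value is illustrative.

env:
  - name: AIRFLOW__LINEAGE__BACKEND
    value: "datahub_provider.lineage.datahub.DatahubLineageBackend"
  - name: AIRFLOW__LINEAGE__DATAHUB_KWARGS
    value: '{"datahub_conn_id": "datahub_kafka_default", "cluster": "prod"}'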
handsome-football-66174 (03/24/2022, 12:36 PM)
dazzling-judge-80093 (03/24/2022, 12:36 PM)