steep-pizza-15641
04/23/2021, 2:03 PM

mammoth-bear-12532

steep-pizza-15641
04/23/2021, 2:21 PM

gray-shoe-75895
04/23/2021, 4:05 PM

steep-pizza-15641
04/23/2021, 4:06 PM

gray-shoe-75895
04/23/2021, 4:08 PM

steep-pizza-15641
04/23/2021, 4:09 PM

gray-shoe-75895
04/23/2021, 6:09 PM

❯ cat Dockerfile
FROM apache/airflow:2.0.2
RUN pip install --no-cache-dir --user apache-airflow==2.0.2 acryl-datahub[airflow]
❯ docker build -t airflow-2-with-datahub-image .
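
A quick way to sanity-check an image built this way is to confirm the plugin can reach GMS from inside the container. A minimal sketch, assuming acryl-datahub's REST emitter API and the datahub-gms docker-compose service address used later in this thread:

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    # Point the emitter at GMS; "datahub-gms:8080" is the docker-compose
    # service address (an assumption here; adjust to your deployment).
    emitter = DatahubRestEmitter("http://datahub-gms:8080")
    emitter.test_connection()  # raises if GMS is unreachable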
steep-pizza-15641
04/23/2021, 7:23 PM

steep-pizza-15641
04/23/2021, 9:58 PM

gray-shoe-75895
04/23/2021, 10:13 PM

steep-pizza-15641
04/23/2021, 10:21 PM

airflow-worker_1 | [2021-04-23 22:15:30,364: WARNING/ForkPoolWorker-15] Running <TaskInstance: postgres_operator_dag_airflow_lineage.populate_target 2021-04-23T22:15:23.834475+00:00 [queued]> on host addb60ba09c5
datahub-mae-consumer | 22:15:30.626 [mae-consumer-job-client-0-C-1] INFO c.l.m.k.MetadataAuditEventsProcessor - {com.linkedin.metadata.snapshot.DataJobSnapshot={urn=urn:li:dataJob:(urn:li:dataFlow:(airflow,postgres_operator_dag_airflow_lineage,prod),populate_target), aspects=[{com.linkedin.common.Ownership={owners=[{owner=airflow, type=DEVELOPER, source={type=SERVICE, url=postgres_myapp_airflow_lineage.py}}], lastModified={actor=urn:li:corpuser:airflow, time=1619216123834}}}]}}
datahub-mae-consumer | 22:15:30.627 [mae-consumer-job-client-0-C-1] ERROR c.l.m.k.MetadataAuditEventsProcessor - Error in getting documents from snapshot: com.linkedin.data.template.TemplateOutputCastException: Invalid URN syntax: Urn doesn't start with 'urn:'. Urn: airflow at index 0: airflow
datahub-mae-consumer | 22:15:30.628 [mae-consumer-job-client-0-C-1] ERROR c.l.m.k.MetadataAuditEventsProcessor - com.linkedin.data.template.TemplateOutputCastException: Invalid URN syntax: Urn doesn't start with 'urn:'. Urn: airflow at index 0: airflow [
    com.linkedin.common.urn.UrnCoercer.coerceOutput(UrnCoercer.java:25),
    com.linkedin.common.urn.UrnCoercer.coerceOutput(UrnCoercer.java:11),
    com.linkedin.data.template.DataTemplateUtil.coerceOutput(DataTemplateUtil.java:954),
    com.linkedin.data.template.RecordTemplate.obtainCustomType(RecordTemplate.java:365),
    com.linkedin.common.Owner.getOwner(Owner.java:86),
    com.linkedin.metadata.builders.graph.relationship.OwnedByBuilderFromOwnership.lambda$buildRelationships$0(OwnedByBuilderFromOwnership.java:29),
    java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174),
    java.util.Iterator.forEachRemaining(Iterator.java:116),
    java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801),
    java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482),
    java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472),
    java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708),
    java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234),
    java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499),
    com.linkedin.metadata.builders.graph.relationship.OwnedByBuilderFromOwnership.buildRelationships(OwnedByBuilderFromOwnership.java:31),
    com.linkedin.metadata.builders.graph.relationship.OwnedByBuilderFromOwnership.buildRelationships(OwnedByBuilderFromOwnership.java:15),
    com.linkedin.metadata.builders.graph.BaseGraphBuilder.build(BaseGraphBuilder.java:51),
    com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.updateNeo4j(MetadataAuditEventsProcessor.java:86),
    com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.consume(MetadataAuditEventsProcessor.java:70),
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method),
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62),
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43),
    java.lang.reflect.Method.invoke(Method.java:498),
    org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171),
    org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120),
    org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48),
    org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283),
    org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79),
    org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1327),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1307),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1267),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1248),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1162),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:971),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:775),
    org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:708),
    java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
    java.util.concurrent.FutureTask.run(FutureTask.java:266),
    java.lang.Thread.run(Thread.java:748)]
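
The exception is precise about the failure: the Ownership aspect was emitted with the bare string "airflow" as the owner, while GMS requires a full corpuser URN (the later, working run at 02:29 below emits "urn:li:corpuser:airflow"). A one-line sketch of the correct form, assuming acryl-datahub's builder helper:

    from datahub.emitter.mce_builder import make_user_urn

    # Bare usernames must be wrapped into corpuser URNs before emission.
    owner_urn = make_user_urn("airflow")
    assert owner_urn == "urn:li:corpuser:airflow"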
steep-pizza-15641
04/23/2021, 10:22 PM

populate_target_table = PostgresOperator(
    task_id="populate_target",
    postgres_conn_id="postgres_myapp",
    sql="""
        insert into target_table_b (id, english, spanish)
        select source_table_c.id, source_table_c.col1, source_table_d.col1
        from source_table_c
        join source_table_d
        on source_table_c.id = source_table_d.id
    """,
    inlets={
        "datasets": [
            Dataset("postgresql", "myapp.public.source_table_c"),
            Dataset("postgresql", "myapp.public.source_table_d"),
        ],
    },
    outlets={"datasets": [Dataset("postgresql", "myapp.public.target_table_a")]},
)
gray-shoe-75895
04/23/2021, 10:34 PM

steep-pizza-15641
04/23/2021, 11:15 PM

[2021-04-23 22:15:30,555] {lineage_backend.py:139} INFO - DataHub lineage backend - emitting metadata:
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {"urn": "urn:li:dataFlow:(airflow,postgres_operator_dag_airflow_lineage,prod)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataFlowInfo": {"name": "postgres_operator_dag_airflow_lineage", "description": "None\n\nNone", "project": null}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "airflow", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "postgres_myapp_airflow_lineage.py"}}], "lastModified": {"time": 1619216123834, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}]}}, "proposedDelta": null}
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": {"urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,postgres_operator_dag_airflow_lineage,prod),populate_target)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataJobInfo": {"name": "populate_target", "description": null, "type": {"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "COMMAND"}}}, {"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {"inputDatasets": [], "outputDatasets": []}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "airflow", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "postgres_myapp_airflow_lineage.py"}}], "lastModified": {"time": 1619216123834, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}]}}, "proposedDelta": null}
[2021-04-23 22:15:30,562] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: http://datahub-gms:8080, Port: None, Schema: , Login: , Password: None, extra: None
[2021-04-23 22:15:30,570] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: http://datahub-gms:8080, Port: None, Schema: , Login: , Password: None, extra: None
[2021-04-23 22:15:30,636] {taskinstance.py:1192} INFO - Marking task as SUCCESS. dag_id=postgres_operator_dag_airflow_lineage, task_id=populate_target, execution_date=20210423T221523, start_date=20210423T221530, end_date=20210423T221530
[2021-04-23 22:15:30,666] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-04-23 22:15:30,691] {local_task_job.py:146} INFO - Task exited with return code 0
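
Note that the DataJobInputOutput aspect in the snapshot above carries empty inputDatasets and outputDatasets even though inlets and outlets were declared on the task. For contrast, a sketch of a populated aspect, assuming acryl-datahub's generated model classes and URN builder (the platform id "postgresql" mirrors the DAG; the canonical platform name may differ):

    from datahub.emitter.mce_builder import make_dataset_urn
    from datahub.metadata.schema_classes import DataJobInputOutputClass

    # Dataset URNs take (platform, name, env); the names below mirror the
    # inlets/outlets declared on the PostgresOperator.
    io_aspect = DataJobInputOutputClass(
        inputDatasets=[
            make_dataset_urn("postgresql", "myapp.public.source_table_c", "PROD"),
            make_dataset_urn("postgresql", "myapp.public.source_table_d", "PROD"),
        ],
        outputDatasets=[
            make_dataset_urn("postgresql", "myapp.public.target_table_a", "PROD"),
        ],
    )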
gray-shoe-75895
04/23/2021, 11:22 PM

gray-shoe-75895
04/23/2021, 11:30 PM

gray-shoe-75895
04/23/2021, 11:30 PM

steep-pizza-15641
04/23/2021, 11:31 PM

gray-shoe-75895
04/24/2021, 12:12 AM

steep-pizza-15641
04/24/2021, 12:28 AM

'ERROR :: /snapshot/aspects/2 :: "com.linkedin.common.GlobalTags" is not a member type of union
gray-shoe-75895
04/24/2021, 12:29 AM

steep-pizza-15641
04/24/2021, 12:31 AM

datahub-gms:
  build:
    context: ../
    dockerfile: docker/datahub-gms/Dockerfile
  image: linkedin/datahub-gms:${DATAHUB_VERSION:-latest}
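
The "is not a member type of union" rejection above generally means the running GMS is older than the client that produced the event: the plugin emitted a GlobalTags aspect that this server's snapshot schema does not yet include, so running a GMS image that matches the ingestion library's version (the DATAHUB_VERSION variable in the compose file above) is the usual fix. A hedged way to check what the server is actually running, assuming this GMS version already exposes the /config endpoint:

    import requests

    # GMS reports build/version info on /config (assumed available here).
    print(requests.get("http://datahub-gms:8080/config").json())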
steep-pizza-15641
04/24/2021, 12:31 AM

gray-shoe-75895
04/24/2021, 1:21 AM

steep-pizza-15641
04/24/2021, 2:35 AM

[2021-04-24 02:29:49,640] {lineage_backend.py:182} INFO - DataHub lineage backend - emitting metadata:
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {"urn": "urn:li:dataFlow:(airflow,postgres_operator_dag_airflow_lineage,prod)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataFlowInfo": {"customProperties": {"catchup": "False", "fileloc": "'/opt/airflow/dags/postgres_myapp_airflow_lineage.py'", "_task_group": "{'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'create_source_c': (<DagAttributeTypes.OP: 'operator'>, 'create_source_c'), 'create_source_d': (<DagAttributeTypes.OP: 'operator'>, 'create_source_d'), 'create_target_b': (<DagAttributeTypes.OP: 'operator'>, 'create_target_b'), 'truncate_target': (<DagAttributeTypes.OP: 'operator'>, 'truncate_target'), 'populate_target': (<DagAttributeTypes.OP: 'operator'>, 'populate_target')}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}", "timezone": "'UTC'", "start_date": "1580601600.0", "default_args": "{<Encoding.VAR: '__var'>: {'owner': 'airflow'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}", "schedule_interval": "'@once'", "_dag_id": "'postgres_operator_dag_airflow_lineage'", "tasks": "[{'pool': 'default_pool', 'ui_color': '#ededed', 'ui_fgcolor': '#000', 'task_id': 'create_source_c', '_downstream_task_ids': ['truncate_target'], 'template_fields': ['sql'], '_outlets': [], 'label': 'create_source_c', 'owner': 'airflow', '_inlets': [], 'template_fields_renderers': {'sql': 'sql'}, '_task_type': 'PostgresOperator', '_task_module': 'airflow.providers.postgres.operators.postgres', '_is_dummy': False, 'sql': '\\ncreate table if not exists source_table_c(id integer, col1 varchar(255));\\n '}, {'pool': 'default_pool', 'ui_color': '#ededed', 'ui_fgcolor': '#000', 'task_id': 'create_source_d', '_downstream_task_ids': ['truncate_target'], 'template_fields': ['sql'], '_outlets': [], 'label': 'create_source_d', 'owner': 'airflow', '_inlets': [], 'template_fields_renderers': {'sql': 'sql'}, '_task_type': 'PostgresOperator', '_task_module': 'airflow.providers.postgres.operators.postgres', '_is_dummy': False, 'sql': '\\ncreate table if not exists source_table_d(id integer, col1 varchar(255));\\n '}, {'pool': 'default_pool', 'ui_color': '#ededed', 'ui_fgcolor': '#000', 'task_id': 'create_target_b', '_downstream_task_ids': ['truncate_target'], 'template_fields': ['sql'], '_outlets': [], 'label': 'create_target_b', 'owner': 'airflow', '_inlets': [], 'template_fields_renderers': {'sql': 'sql'}, '_task_type': 'PostgresOperator', '_task_module': 'airflow.providers.postgres.operators.postgres', '_is_dummy': False, 'sql': '\\ncreate table if not exists target_table_b(id integer, english varchar(255), spanish varchar(255));\\n '}, {'pool': 'default_pool', 'ui_color': '#ededed', 'ui_fgcolor': '#000', 'task_id': 'truncate_target', '_downstream_task_ids': ['populate_target'], 'template_fields': ['sql'], '_outlets': [], 'label': 'truncate_target', 'owner': 'airflow', '_inlets': [], 'template_fields_renderers': {'sql': 'sql'}, '_task_type': 'PostgresOperator', '_task_module': 'airflow.providers.postgres.operators.postgres', '_is_dummy': False, 'sql': '\\ntruncate table target_table_b;\\n '}, {'pool': 'default_pool', 'ui_color': '#ededed', 'ui_fgcolor': '#000', 'task_id': 'populate_target', '_downstream_task_ids': [], 'template_fields': ['sql'], '_outlets': [{<Encoding.VAR: '__var'>: {'datasets': [\"Dataset(platform='postgresql', 
name='myapp.public.target_table_a', env='PROD')\"]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}], 'label': 'populate_target', 'owner': 'airflow', '_inlets': [{<Encoding.VAR: '__var'>: {'datasets': [\"Dataset(platform='postgresql', name='myapp.public.source_table_c', env='PROD')\", \"Dataset(platform='postgresql', name='myapp.public.source_table_d', env='PROD')\"]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}], 'template_fields_renderers': {'sql': 'sql'}, '_task_type': 'PostgresOperator', '_task_module': 'airflow.providers.postgres.operators.postgres', '_is_dummy': False, 'sql': '\\ninsert into target_table_b (id, english, spanish)\\nselect source_table_c.id, source_table_c.col1, source_table_d.col1\\nfrom source_table_c\\njoin\\nsource_table_d\\non\\nsource_table_c.id = source_table_d.id\\n '}]", "end_date": "None", "tags": "None", "max_active_runs": "16", "params": "{}", "_access_control": "None", "orientation": "'LR'", "dagrun_timeout": "None", "doc_md": "None", "_concurrency": "16", "_description": "None", "_default_view": "'tree'", "is_paused_upon_creation": "None"}, "externalUrl": "<http://localhost:8080/tree?dag_id=postgres_operator_dag_airflow_lineage>", "name": "postgres_operator_dag_airflow_lineage", "description": "None\n\n", "project": null}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "urn:li:corpuser:airflow", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "postgres_myapp_airflow_lineage.py"}}], "lastModified": {"time": 1619231386689, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}, {"com.linkedin.pegasus2avro.common.GlobalTags": {"tags": []}}]}}, "proposedDelta": null}
{"auditHeader": null, "proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": {"urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,postgres_operator_dag_airflow_lineage,prod),populate_target)", "aspects": [{"com.linkedin.pegasus2avro.datajob.DataJobInfo": {"customProperties": {"pool": "'default_pool'", "ui_color": "'#ededed'", "ui_fgcolor": "'#000'", "task_id": "'populate_target'", "_downstream_task_ids": "[]", "template_fields": "['sql']", "_outlets": "[{<Encoding.VAR: '__var'>: {'datasets': [\"Dataset(platform='postgresql', name='myapp.public.target_table_a', env='PROD')\"]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}]", "label": "'populate_target'", "owner": "'airflow'", "_inlets": "[{<Encoding.VAR: '__var'>: {'datasets': [\"Dataset(platform='postgresql', name='myapp.public.source_table_c', env='PROD')\", \"Dataset(platform='postgresql', name='myapp.public.source_table_d', env='PROD')\"]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}]", "template_fields_renderers": "{'sql': 'sql'}", "_task_type": "'PostgresOperator'", "_task_module": "'airflow.providers.postgres.operators.postgres'", "_is_dummy": "False", "sql": "'\\ninsert into target_table_b (id, english, spanish)\\nselect source_table_c.id, source_table_c.col1, source_table_d.col1\\nfrom source_table_c\\njoin\\nsource_table_d\\non\\nsource_table_c.id = source_table_d.id\\n '", "executor_config": "{}", "end_date": "None", "retry_exponential_backoff": "False", "max_retry_delay": "None", "depends_on_past": "False", "doc_json": "None", "doc_yaml": "None", "run_as_user": "None", "queue": "'default'", "_log": "<Logger airflow.task.operators (INFO)>", "retries": "0", "doc": "None", "sla": "None", "pool_slots": "1", "email_on_failure": "True", "subdag": "None", "on_execute_callback": "None", "doc_md": "None", "on_failure_callback": "None", "execution_timeout": "None", "resources": "None", "email_on_retry": "True", "start_date": "DateTime(2020, 2, 2, 0, 0, 0, tzinfo=Timezone('UTC'))", "trigger_rule": "'all_success'", "on_retry_callback": "None", "weight_rule": "'downstream'", "task_concurrency": "None", "doc_rst": "None", "priority_weight": "1", "on_success_callback": "None", "email": "None", "retry_delay": "datetime.timedelta(0, 300)", "wait_for_downstream": "False", "params": "{}", "do_xcom_push": "True"}, "externalUrl": "<http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=postgres_operator_dag_airflow_lineage&_flt_3_task_id=populate_target>", "name": "populate_target", "description": null, "type": {"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "COMMAND"}}}, {"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {"inputDatasets": [], "outputDatasets": []}}, {"com.linkedin.pegasus2avro.common.Ownership": {"owners": [{"owner": "urn:li:corpuser:airflow", "type": "DEVELOPER", "source": {"type": "SERVICE", "url": "postgres_myapp_airflow_lineage.py"}}], "lastModified": {"time": 1619231386689, "actor": "urn:li:corpuser:airflow", "impersonator": null}}}, {"com.linkedin.pegasus2avro.common.GlobalTags": {"tags": []}}]}}, "proposedDelta": null}
[2021-04-24 02:29:49,645] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: http://datahub-gms:8080, Port: None, Schema: , Login: , Password: None, extra: None
[2021-04-24 02:29:49,651] {base.py:78} INFO - Using connection to: id: datahub_rest_default. Host: http://datahub-gms:8080, Port: None, Schema: , Login: , Password: None, extra: None
[2021-04-24 02:29:49,715] {taskinstance.py:1192} INFO - Marking task as SUCCESS. dag_id=postgres_operator_dag_airflow_lineage, task_id=populate_target, execution_date=20210424T022946, start_date=20210424T022949, end_date=20210424T022949
[2021-04-24 02:29:49,739] {taskinstance.py:1246} INFO - 0 downstream tasks scheduled from follow-on schedule check
steep-pizza-15641
04/24/2021, 2:35 AM

steep-pizza-15641
04/24/2021, 2:38 AM

steep-pizza-15641
04/24/2021, 2:41 AM

gray-shoe-75895
04/24/2021, 9:11 PM

steep-pizza-15641
04/25/2021, 9:29 PM

populate_target_table = PostgresOperator(
    task_id="populate_target",
    postgres_conn_id="postgres_myapp",
    sql="""
        insert into target_table_b (id, english, spanish)
        select source_table_c.id, source_table_c.col1, source_table_d.col1
        from source_table_c
        join source_table_d
        on source_table_c.id = source_table_d.id
    """,
    inlets={
        "datasets": [
            Dataset("postgresql", "myapp.public.source_table_c"),
            Dataset("postgresql", "myapp.public.source_table_d"),
        ],
    },
    outlets={"datasets": [Dataset("postgresql", "myapp.public.target_table_a")]},
)
[create_source_table_c, create_source_table_d, create_target_table_b] >> truncate_target_table >> populate_target_table
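
One mismatch worth flagging in the snippet above: the SQL populates target_table_b, but the declared outlet is target_table_a, so even once emission works, the lineage edge would point at a table the job never writes. A hedged correction (the Dataset import path has moved between acryl-datahub versions, so the one shown is an assumption):

    from datahub_provider.entities import Dataset  # import path varies by version

    # Align the declared outlet with the table the SQL actually writes to.
    outlets = {"datasets": [Dataset("postgresql", "myapp.public.target_table_b")]}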
gray-shoe-75895
04/26/2021, 8:03 AM

gray-shoe-75895
04/26/2021, 8:04 AM