wonderful-army-822
01/31/2022, 7:21 PMorange-night-91387
02/01/2022, 1:54 AMwonderful-army-822
02/01/2022, 7:25 PMfor key in task.get_serialized_fields():
if key not in job_property_bag:
job_property_bag[key] = repr(getattr(task, key))
# <http://operator.log.info|operator.log.info>(f"{flow_property_bag=}")
# <http://operator.log.info|operator.log.info>(f"{job_property_bag=}")
allowed_task_keys = [
"_downstream_task_ids",
"_inlets",
"_outlets",
"_task_type",
"_task_module",
"depends_on_past",
"email",
"label",
"execution_timeout",
"end_date",
"start_date",
"sla",
"sql",
"task_id",
"trigger_rule",
"wait_for_downstream",
]
job_property_bag = {
k: v for (k, v) in job_property_bag.items() if k in allowed_task_keys
}
.....
.....
flow_mce = models.MetadataChangeEventClass(
proposedSnapshot=models.DataFlowSnapshotClass(
urn=flow_urn,
aspects=[
models.DataFlowInfoClass(
name=dag.dag_id,
description=f"{dag.description}\n\n{dag.doc_md or ''}",
customProperties=flow_property_bag,
externalUrl=flow_url,
),
*ownership_aspect,
*tags_aspect,
],
)
)