Hi, another newbie question re airflow integration...
# ingestion
s
Hi, another newbie question re airflow integration. The example here shows how to ingest a mysql schema: https://github.com/linkedin/datahub/blob/master/metadata-ingestion/examples/airflow/mysql_sample_dag.py Let's say I wanted to tag the ingested database as "System of Record" and one column as "PII". Would it be possible to get examples of how to do that? I'd like to specify the tags in code rather than edit them manually.
g
Hey @steep-pizza-15641, you can tag entities either by emitting MCEs or via the UI
In that MCE, we are tagging the dataset as having a specific property
here is an example of us tagging a schema field as being `Legacy`:
and here is the MCE emitted to provide a definition for the Legacy tag:
Hope these examples help.
m
@steep-pizza-15641 in your case, the best way to do this cleanly is writing a Transformer (https://datahubproject.io/docs/metadata-ingestion/#transformations) which can take the input MCE generated by your database source and enrich it with Tags
We don't yet have a reference implementation of a `TagApplier` transformer ... but @gray-shoe-75895 was planning to write one.
in the end, the MCE should look like one of the examples @green-football-43791 pointed out.
s
@mammoth-bear-12532 @green-football-43791 Thanks both, will try the transformer approach
@gray-shoe-75895 @mammoth-bear-12532 @green-football-43791 Something like this?
from datahub.ingestion.api.transform import Transformer
from datahub.metadata import MetadataChangeEventClass, DatasetSnapshotClass, \
    SchemaMetadataClass, SchemaFieldClass, SchemaFieldDataTypeClass, TagAssociationClass, GlobalTagsClass
from logging import getLogger

LOGGER = getLogger(__file__)

# Maps dataset URN -> field path -> tags and description to apply.
TABLE_TAGS = {
    "urn:li:dataset:(urn:li:dataPlatform:postgresql,myapp.information_schema.sql_parts,PROD)": {
        "comments": {
            "tags": ["a_tag", "another_tag"],
            "description": "Description for comments",
        }
    }
}


class TagApplier(Transformer):
    def __init__(self, config):
        self.config = config

    def transform(self, record_envelopes):
        for envelope in record_envelopes:
            if isinstance(envelope.record, MetadataChangeEventClass):
                if isinstance(envelope.record.proposedSnapshot, DatasetSnapshotClass):
                    urn = envelope.record.proposedSnapshot.urn
                    if urn in TABLE_TAGS:
                        for aspect in envelope.record.proposedSnapshot.aspects:
                            if isinstance(aspect, SchemaMetadataClass):
                                for field in aspect.fields:
                                    if isinstance(field, SchemaFieldClass):
                                        if field.fieldPath in TABLE_TAGS[urn]:
                                            if 'description' in TABLE_TAGS[urn][field.fieldPath]:
                                                desc = TABLE_TAGS[urn][field.fieldPath]['description']
                                                LOGGER.info("Setting table %s field %s description: %s", urn, field.fieldPath, desc)
                                                field.description = desc
                                            if 'tags' in TABLE_TAGS[urn][field.fieldPath]:
                                                tags = TABLE_TAGS[urn][field.fieldPath]['tags']
                                                LOGGER.info("Setting table %s field %s tags: %s", urn, field.fieldPath, tags)
                                                field.globalTags = GlobalTagsClass(tags=[TagAssociationClass(tag=t) for t in tags])
                # Debug output, kept inside the MCE check so the attributes exist.
                print(envelope.record, type(envelope.record))
                print(envelope.record.proposedSnapshot, type(envelope.record.proposedSnapshot))
                print(envelope.record.proposedSnapshot.urn, type(envelope.record.proposedSnapshot.urn))
            # Always pass the envelope downstream, whether or not it was modified.
            yield envelope

    @classmethod
    def create(cls, config_dict, ctx):
        return TagApplier(config_dict)
m
@steep-pizza-15641: that looks about right! I think we can simplify some of the `isinstance` and nested checking by writing some helper class, which can help in building other kinds of appliers as well.
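One possible shape for such a helper, as a rough sketch rather than anything that exists in DataHub: a generator does the type checks once and yields `(urn, field)` pairs, and a small base class turns that into a transformer, so each applier only implements `apply()`. The names `iter_schema_fields` and `FieldApplier` are hypothetical. The dataclasses are stand-ins that mirror only the attributes used in the snippet above, so the sketch runs without DataHub installed; in a real transformer they would be replaced by `MetadataChangeEventClass`, `DatasetSnapshotClass`, `SchemaMetadataClass` and `SchemaFieldClass`, and tags would be set through `GlobalTagsClass` as in the original code.

```python
from dataclasses import dataclass, field
from typing import List, Optional


# Stand-in types (assumptions for illustration only). They copy the attribute
# names used in the thread, not the real DataHub class definitions.
@dataclass
class SchemaField:
    fieldPath: str
    description: Optional[str] = None
    tags: List[str] = field(default_factory=list)


@dataclass
class SchemaMetadata:
    fields: List[SchemaField]


@dataclass
class DatasetSnapshot:
    urn: str
    aspects: List[object]


@dataclass
class ChangeEvent:
    proposedSnapshot: object


@dataclass
class Envelope:
    record: object


def iter_schema_fields(envelope):
    """Yield (urn, schema_field) for every schema field in a dataset event."""
    record = envelope.record
    if not isinstance(record, ChangeEvent):
        return
    snapshot = record.proposedSnapshot
    if not isinstance(snapshot, DatasetSnapshot):
        return
    for aspect in snapshot.aspects:
        if isinstance(aspect, SchemaMetadata):
            for schema_field in aspect.fields:
                yield snapshot.urn, schema_field


class FieldApplier:
    """Hypothetical base class: subclasses only implement apply()."""

    def apply(self, urn, schema_field):
        raise NotImplementedError

    def transform(self, record_envelopes):
        for envelope in record_envelopes:
            for urn, schema_field in iter_schema_fields(envelope):
                self.apply(urn, schema_field)
            # Every envelope is passed downstream, modified or not.
            yield envelope


class TagApplier(FieldApplier):
    def __init__(self, table_tags):
        # Same layout as TABLE_TAGS: urn -> field path -> settings.
        self.table_tags = table_tags

    def apply(self, urn, schema_field):
        settings = self.table_tags.get(urn, {}).get(schema_field.fieldPath, {})
        if "description" in settings:
            schema_field.description = settings["description"]
        if "tags" in settings:
            schema_field.tags = list(settings["tags"])
```

With this split, a description-only or ownership applier would be another short `FieldApplier` subclass, and the nested `isinstance` checks live in one place.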