steep-pizza-15641
04/26/2021, 1:01 PM
green-football-43791
04/26/2021, 3:32 PM
green-football-43791
04/26/2021, 3:33 PM
green-football-43791
04/26/2021, 3:33 PM
green-football-43791
04/26/2021, 3:34 PM
green-football-43791
04/26/2021, 3:34 PM
green-football-43791
04/26/2021, 3:34 PM
green-football-43791
04/26/2021, 3:34 PM
green-football-43791
04/26/2021, 3:35 PM
mammoth-bear-12532
mammoth-bear-12532
TagApplier transformer ... but @gray-shoe-75895 was planning to write one.
mammoth-bear-12532
steep-pizza-15641
04/26/2021, 4:09 PM
steep-pizza-15641
04/27/2021, 9:51 PM
from datahub.ingestion.api.transform import Transformer
from datahub.metadata import MetadataChangeEventClass, DatasetSnapshotClass, \
    SchemaMetadataClass, SchemaFieldClass, SchemaFieldDataTypeClass, TagAssociationClass, GlobalTagsClass
from logging import getLogger

LOGGER = getLogger(__file__)

# Per-dataset overrides: dataset URN -> field path -> tags and/or description.
TABLE_TAGS = {
    "urn:li:dataset:(urn:li:dataPlatform:postgresql,myapp.information_schema.sql_parts,PROD)": {
        "comments": {
            "tags": ["a_tag", "another_tag"],
            "description": "Description for comments",
        }
    }
}


class TagApplier(Transformer):
    def __init__(self, config):
        self.config = config

    def transform(self, record_envelopes):
        for envelope in record_envelopes:
            # Only dataset snapshots carried in an MCE have schema fields to edit.
            if isinstance(envelope.record, MetadataChangeEventClass):
                if isinstance(envelope.record.proposedSnapshot, DatasetSnapshotClass):
                    urn = envelope.record.proposedSnapshot.urn
                    if urn in TABLE_TAGS:
                        for aspect in envelope.record.proposedSnapshot.aspects:
                            if isinstance(aspect, SchemaMetadataClass):
                                for field in aspect.fields:
                                    if isinstance(field, SchemaFieldClass):
                                        if field.fieldPath in TABLE_TAGS[urn]:
                                            if 'description' in TABLE_TAGS[urn][field.fieldPath]:
                                                desc = TABLE_TAGS[urn][field.fieldPath]['description']
                                                LOGGER.info("Setting table %s field %s description: %s", urn, field.fieldPath, desc)
                                                field.description = TABLE_TAGS[urn][field.fieldPath]['description']
                                            if 'tags' in TABLE_TAGS[urn][field.fieldPath]:
                                                tags = TABLE_TAGS[urn][field.fieldPath]['tags']
                                                LOGGER.info("Setting table %s field %s tags: %s", urn, field.fieldPath, tags)
                                                # Note: the tag value is normally a tag URN (e.g. urn:li:tag:a_tag).
                                                field.globalTags = GlobalTagsClass(tags=[TagAssociationClass(tag=t) for t in tags])
                # Debug output; kept inside the MCE check so proposedSnapshot always exists.
                print(envelope.record, type(envelope.record))
                print(envelope.record.proposedSnapshot, type(envelope.record.proposedSnapshot))
                print(envelope.record.proposedSnapshot.urn, type(envelope.record.proposedSnapshot.urn))
            # Always pass the envelope on, modified or not.
            yield envelope

    @classmethod
    def create(cls, config_dict, ctx):
        return TagApplier(config_dict)
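Side note on the TABLE_TAGS keys: the long URN string is easy to mistype, and a typo just means the lookup silently never matches. A rough sketch of building the key from its parts instead. `dataset_urn` is a made-up helper for illustration, not part of DataHub (I believe the datahub package has its own builder, something like `make_dataset_urn` in `datahub.emitter.mce_builder`, but check before relying on that).

```python
def dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    """Hypothetical helper: format a dataset URN in the shape used as TABLE_TAGS keys."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


# Same override table as above, keyed by the generated URN.
TABLE_TAGS = {
    dataset_urn("postgresql", "myapp.information_schema.sql_parts"): {
        "comments": {
            "tags": ["a_tag", "another_tag"],
            "description": "Description for comments",
        }
    }
}
```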
mammoth-bear-12532
isinstance and nested checking... by writing some helper class, which can help in building other kinds of appliers as well.
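One way the helper idea could look, as a sketch only. The dataclasses below are stand-ins for the DataHub classes so the snippet runs on its own, and `iter_schema_fields`, `FieldApplier` and `OverrideApplier` are hypothetical names, not anything in the library. The generator does all the isinstance checks once, and each applier only has to say what to do with a (urn, field) pair.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple


# Stand-ins for the DataHub classes (hypothetical, for illustration only).
@dataclass
class SchemaField:
    fieldPath: str
    description: Optional[str] = None
    tags: List[str] = field(default_factory=list)


@dataclass
class SchemaMetadata:
    fields: List[Any]


@dataclass
class DatasetSnapshot:
    urn: str
    aspects: List[Any]


@dataclass
class MetadataChangeEvent:
    proposedSnapshot: Any


@dataclass
class Envelope:
    record: Any


def iter_schema_fields(record: Any) -> Iterator[Tuple[str, SchemaField]]:
    """Yield (urn, field) pairs for a dataset MCE; yield nothing for other records."""
    if not isinstance(record, MetadataChangeEvent):
        return
    snapshot = record.proposedSnapshot
    if not isinstance(snapshot, DatasetSnapshot):
        return
    for aspect in snapshot.aspects:
        if isinstance(aspect, SchemaMetadata):
            for schema_field in aspect.fields:
                if isinstance(schema_field, SchemaField):
                    yield snapshot.urn, schema_field


class FieldApplier:
    """Hypothetical base class: subclasses only implement apply()."""

    def apply(self, urn: str, schema_field: SchemaField) -> None:
        raise NotImplementedError

    def transform(self, envelopes: Iterable[Envelope]) -> Iterator[Envelope]:
        for envelope in envelopes:
            for urn, schema_field in iter_schema_fields(envelope.record):
                self.apply(urn, schema_field)
            yield envelope  # always pass the envelope on


class OverrideApplier(FieldApplier):
    """Applies description/tag overrides shaped like TABLE_TAGS."""

    def __init__(self, overrides: Dict[str, Dict[str, Dict[str, Any]]]):
        self.overrides = overrides

    def apply(self, urn: str, schema_field: SchemaField) -> None:
        entry = self.overrides.get(urn, {}).get(schema_field.fieldPath, {})
        if "description" in entry:
            schema_field.description = entry["description"]
        if "tags" in entry:
            schema_field.tags = list(entry["tags"])
```

A different applier (say, one that sets owners) would then only need its own `apply`, with no repeated type checks.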