I am trying to add a descriptions to columns in da...
# ingestion
h
I am trying to add a descriptions to columns in datahub source from s3 using the code below:
Copy code
import logging
import time

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper

# read-modify-write requires access to the DataHubGraph (RestEmitter is not enough)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
    AuditStampClass,
    EditableSchemaFieldInfoClass,
    EditableSchemaMetadataClass,
    InstitutionalMemoryClass,
)

log = logging.getLogger(__name__)
logging.basicConfig(level=<http://logging.INFO|logging.INFO>)


def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
    """A helper function to extract simple . path notation from the v2 field path"""
    if not field_path.startswith("[version=2.0]"):
        # not a v2, we assume this is a simple path
        return field_path
        # this is a v2 field path
    tokens = [t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))]

    return ".".join(tokens)


dictionary = {
    "a": "b",
    "c": "d",
}


# Inputs -> owner, ownership_type, dataset
documentation_to_add = "The unique application (service) correlation id on service now"
dataset_name = "a/b/20230511.csv"

dataset_urn = make_dataset_urn(platform="s3", name=dataset_name, env="PROD")


def add_dict(graph, dataset_urn, column, documentation_to_add):
    need_write = False

    field_info_to_set = EditableSchemaFieldInfoClass(fieldPath=column, description=documentation_to_add)

    # Some helpful variables to fill out objects later
    now = int(time.time() * 1000)  # milliseconds since epoch
    current_timestamp = AuditStampClass(time=now, actor="urn:li:corpuser:ingestion")

    current_editable_schema_metadata = graph.get_aspect(
        entity_urn=dataset_urn, aspect_type=EditableSchemaMetadataClass,
    )

    # need_write = False

    if current_editable_schema_metadata:
        for fieldInfo in current_editable_schema_metadata.editableSchemaFieldInfo:
            if get_simple_field_path_from_v2_field_path(fieldInfo.fieldPath) == column:
                # we have some editable schema metadata for this field
                field_match = True
                if documentation_to_add != fieldInfo.description:
                    fieldInfo.description = documentation_to_add
                    need_write = True
    else:
        # create a brand new editable dataset properties aspect
        current_editable_schema_metadata = EditableSchemaMetadataClass(
            editableSchemaFieldInfo=[field_info_to_set], created=current_timestamp,
        )
        need_write = True

    if need_write:
        event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
            entityUrn=dataset_urn, aspect=current_editable_schema_metadata,
        )
        graph.emit(event)
        <http://log.info|log.info>(f"Documentation added to dataset {dataset_urn}")

    else:
        <http://log.info|log.info>("Documentation already exists and is identical, omitting write")


# First we get the current owners
gms_endpoint = "<http://localhost:8080>"
graph = DataHubGraph(config=DatahubClientConfig(server=gms_endpoint))


for column, documentation_to_add in dictionary.items():
    print(f"column: {column} and documentation_to_add: {documentation_to_add}")
    add_dict(graph, dataset_urn, column, documentation_to_add)
However, the code just tells me tells me the dictionary already exists and meanwhile, it doesn't exist in datahub.
Copy code
column: a and documentation_to_add: b
INFO:__main__:Documentation already exists and is identical, omitting write
column: c and documentation_to_add: d.
INFO:__main__:Documentation already exists and is identical, omitting write
Am I missing anything? I am following instructions from here: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/library/dataset_add_column_documentation.py
g
did you verify that the dataset_urn is correct?
h
yes I did. In fact, if there is no loop, the first dictionary item is inserted in datahub
My struggle is, where do i get documentation for things like:
Copy code
AuditStampClass,
    EditableSchemaFieldInfoClass,
    EditableSchemaMetadataClass,
    InstitutionalMemoryClass,
I seem to be working with a blackbox
g
We have autogenerated docs for all of those types here https://datahubproject.io/docs/python-sdk/models