# ingestion
a
Hi everyone! I’m using DataHub as a data catalog for a hackathon project at InvisionApp. I was able to adapt the data ingest scripts to pull from Redshift, and I now have the contents of my warehouse listed as datasets in DataHub. I’m now trying to use the manifest file from DBT (getdbt.com / data build tool) to assign lineage to at least a subset of my data. DataHub is working great, but I’m struggling a bit with the MCE definitions for adding upstream lineage. I’ve modified the metadata-ingestion componentry to append lineage based on my DBT data; right now it’s just hardcoded while I figure out how to make it all work. The upstream lineage object I’ve appended to ‘aspects’, as I see it on send to the DataHub REST endpoint, is:
Copy code
{'upstreams': [
    {'auditStamp': {'time': 0, 'actor': '', 'impersonator': None},
     'dataset': 'urn:li:dataset:(urn:li:dataPlatform:redshift,events.analytics_dev_garylucas.carr_quarterly,PROD)',
     'type': 'TRANSFORMED'}
]}
I don’t see an error from that, but when I go to load lineage I get the following error in the backend (plus a UI error on the frontend):
Copy code
datahub-frontend        | 21:36:25 [application-akka.actor.default-dispatcher-313] ERROR application - Fetch Dataset upstreams error
datahub-frontend        | com.linkedin.data.template.TemplateOutputCastException: Invalid URN syntax: Urn doesn't start with 'urn:'. Urn:  at index 0:
datahub-frontend        | 	at com.linkedin.common.urn.UrnCoercer.coerceOutput(UrnCoercer.java:25)
datahub-frontend        | 	at com.linkedin.common.urn.UrnCoercer.coerceOutput(UrnCoercer.java:11)
datahub-frontend        | 	at com.linkedin.data.template.DataTemplateUtil.coerceOutput(DataTemplateUtil.java:954)
datahub-frontend        | 	at com.linkedin.data.template.RecordTemplate.obtainCustomType(RecordTemplate.java:365)
datahub-frontend        | 	at com.linkedin.common.AuditStamp.getActor(AuditStamp.java:159)
datahub-frontend        | 	at com.linkedin.datahub.util.DatasetUtil.toLineageView(DatasetUtil.java:97)
datahub-frontend        | 	at com.linkedin.datahub.dao.table.LineageDao.lambda$getUpstreamLineage$1(LineageDao.java:39)
datahub-frontend        | 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
datahub-frontend        | 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
datahub-frontend        | 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
datahub-frontend        | 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
datahub-frontend        | 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
datahub-frontend        | 	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
datahub-frontend        | 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
datahub-frontend        | 	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
datahub-frontend        | 	at com.linkedin.datahub.dao.table.LineageDao.getUpstreamLineage(LineageDao.java:40)
datahub-frontend        | 	at controllers.api.v2.Dataset.getDatasetUpstreams(Dataset.java:250)
datahub-frontend        | 	at router.Routes$$anonfun$routes$1$$anonfun$applyOrElse$28$$anonfun$apply$28.apply(Routes.scala:910)
datahub-frontend        | 	at router.Routes$$anonfun$routes$1$$anonfun$applyOrElse$28$$anonfun$apply$28.apply(Routes.scala:910)
datahub-frontend        | 	at play.core.routing.HandlerInvokerFactory$$anon$3.resultCall(HandlerInvoker.scala:134)
datahub-frontend        | 	at play.core.routing.HandlerInvokerFactory$$anon$3.resultCall(HandlerInvoker.scala:133)
datahub-frontend        | 	at play.core.routing.HandlerInvokerFactory$JavaActionInvokerFactory$$anon$8$$anon$2$$anon$1.invocation(HandlerInvoker.scala:108)
I’m pretty sure I’ve misconfigured my upstream lineage object; however, it passes validation on the way in. Any suggestions on how to troubleshoot this further? Thanks in advance, I appreciate any insight.
🎉 1
m
Hey @acoustic-printer-83045, cool! Are you using the new ingestion system or did you build this from scratch?
👀 1
b
Awesome hackathon proj! I think I see the issue. Although this is not well documented anywhere, as per the PDL model, the "auditStamp" field must contain 2 fields: time (long) and actor (urn). To fix the issue, try creating a "system" actor URN:
Copy code
actor: 'urn:li:principal:system'
🙌 1
or maybe more appropriately
Copy code
actor: 'urn:li:corpUser:glucas'
Although “actor” is technically a string-typed field, the Rest.li framework considers it to be a special “typeref” (a type plus some validation constraints). In this case, the validation constraint is that it must be a unique identifier (a URN), which in turn must begin with the prefix “urn:li” 🙂
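With that change your aspect should end up looking roughly like this (just a sketch reusing the dict you posted above, with a valid actor swapped in):
Copy code
{'upstreams': [
    {'auditStamp': {'time': 0, 'actor': 'urn:li:corpUser:glucas', 'impersonator': None},
     'dataset': 'urn:li:dataset:(urn:li:dataPlatform:redshift,events.analytics_dev_garylucas.carr_quarterly,PROD)',
     'type': 'TRANSFORMED'}
]}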
a
@mammoth-bear-12532 I’m using the tooling in the
datahub/metadata-ingestion
folder.
✔️ 1
Thanks John!
the “auditStamp” field must contain 2 fields: time (long) and actor (urn). To fix the issue, try creating a “system” actor URN:
Ok, that makes sense. My reading of the Python-generated MCE classes made it look optional, but I see that the tooling generated an empty auditStamp for me, which should have been a clue.
and that worked, thanks!
b
W000t!
a
One question: when I upload the whole graph, do new aspects for the same URN overwrite existing data? Presumably yes?
b
correct - but we save previous copies using versions
so aspects are versioned
a
Great, presumably previous versions are available programmatically but not fetched in the UI.
Apologies for all the questions, it’s a big project and wrapping my brain around it is taking a while 🙂
l
Hey Gary - great to see the progress you’ve made. I had a quick question: “I’m now trying to use the manifest file from DBT (getdbt.com / data build tool) to assign lineage to at least a subset of my data.” Can you explain more about how you are generating lineage across tables?
a
Happy to. DBT is a SQL templating tool: you basically drop ‘models’ into DBT (a code repo) that are templated SQL, where each file represents one model or intermediary transform. DBT then acts as the executor and DAG handler - like a subset of Airflow, but only for SQL. Our warehouse consists of about 650 ‘models’, each of which is templated SQL. DBT uses the templated ref() references to resolve how to generate a model. So
Copy code
select
    *
from {{ ref('some_other_model') }}
When DBT executes that DAG it drops a manifest file (https://docs.getdbt.com/reference/artifacts/manifest-json), and in that manifest you’ll see all upstream dependencies for each model. I’m consuming that file: for each node in the executed SQL templates I pull out enough details to construct a URN, and I also construct URNs for the dependencies. I put those into a dictionary of urn -> [deps], and then in the
sql_common
component I use that map to enrich the load step with dependencies (rough sketch below, after the manifest excerpt).
Copy code
"model.invision.account__snapshot": {
            "raw_sql": "A bunch of sql",
            "database": "events",
            "schema": "analytics_dev_garylucas",
            "fqn": [
                "invision",
                "cft",
                "main",
                "transform",
                "account__snapshot"
            ],
            "unique_id": "model.invision.account__snapshot",
            "package_name": "invision",
            "root_path": "/dbt",
            "path": "cft/main/transform/account__snapshot.sql",
            "original_file_path": "models/cft/main/transform/account__snapshot.sql",
            "name": "account__snapshot",
            "resource_type": "model",
            "alias": "account__snapshot",
            "checksum": {
                "name": "sha256",
                "checksum": "1a622db018ec430b0af35132ebf85420ea4b8c0d154e50f35cf7f5679aeb1786"
            },
            "config": {
                "enabled": true,
                "materialized": "table",
                "persist_docs": {},
                "post-hook": [],
                "pre-hook": [],
                "vars": {},
                "quoting": {},
                "column_types": {},
                "alias": null,
                "schema": null,
                "database": null,
                "tags": [
                    "daily"
                ],
                "full_refresh": null,
                "dist": "auto"
            },
            "tags": [
                "daily"
            ],
            "refs": [
                [
                    "account_summary__by_day"
                ],
                [
                    "account_summary__by_day"
                ],
                [
                    "account__snapshot_temp"
                ]
            ],
            "sources": [],
            "depends_on": {
                "macros": [],
                "nodes": [
                    "model.invision.account_summary__by_day",
                    "model.invision.account_summary__by_day",
                    "model.invision.account__snapshot_temp"
                ]
            },
            "description": "",
            "columns": {
                "as_of": {
                    "name": "as_of",
                    "description": "",
                    "meta": {},
                    "data_type": null,
                    "quote": null,
                    "tags": []
                },
                "subdomain": {
                    "name": "subdomain",
                    "description": "",
                    "meta": {},
                    "data_type": null,
                    "quote": null,
                    "tags": []
                }
            },
            "meta": {},
            "docs": {
                "show": true
            },
            "patch_path": "models/cft/main/schema.yml",
            "build_path": null,
            "deferred": false,
            "unrendered_config": {
                "materialized": "table",
                "dist": "auto",
                "tags": "daily"
            }
        }
^ is an example of the kind of structure. In an ideal world we’d pull out the SQL comments and determine some other fields based on the tags / comments etc.
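Roughly, the parsing step looks something like this (a simplified sketch - the function and variable names here are illustrative, not my actual code):
Copy code
import json

# Build a map of dataset URN -> list of upstream dataset URNs from dbt's manifest.json.
# Platform/env are hardcoded for the hackathon version.
PLATFORM = "redshift"
ENV = "PROD"


def node_to_urn(node: dict) -> str:
    # dbt records database/schema/alias (or identifier, for sources) per node;
    # our Redshift ingest names datasets as database.schema.table, so mirror that.
    table = node.get("identifier") or node.get("alias") or node["name"]
    name = f"{node['database']}.{node['schema']}.{table}"
    return f"urn:li:dataset:(urn:li:dataPlatform:{PLATFORM},{name},{ENV})"


def build_lineage_map(manifest_path: str) -> dict:
    with open(manifest_path) as f:
        manifest = json.load(f)

    nodes = manifest["nodes"]
    sources = manifest.get("sources", {})
    lineage = {}
    for node in nodes.values():
        if node.get("resource_type") != "model":
            continue
        upstream_urns = []
        for dep_id in node["depends_on"]["nodes"]:
            dep = nodes.get(dep_id) or sources.get(dep_id)
            if dep:
                upstream_urns.append(node_to_urn(dep))
        # dbt can list the same ref twice (see the refs above), so de-dupe
        lineage[node_to_urn(node)] = sorted(set(upstream_urns))
    return lineage
Each entry in that map then becomes one upstreamLineage aspect, with one upstream per dependency URN.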
l
this is awesome. any chance you can contribute the DBT based lineage event emitter back into open source?
a
I don’t see why not. that’ll probably be a week or two out though.
l
@mammoth-bear-12532 can you help with getting this contribution in?
a
right now it’s Super janky.
here’s an example of what we see in the UI:
🙌 1
one final question, some of my datasets are erroring out in the lineage graph with this error:
Copy code
datahub-frontend        | 00:44:31 [application-akka.actor.default-dispatcher-541] ERROR application - Fetch Dataset downstreams error
datahub-frontend        | com.linkedin.restli.client.RestLiResponseException: com.linkedin.restli.client.RestLiResponseException: Response status 500, serviceErrorMessage: java.lang.RuntimeException: There is no relation or more than 1 relation between the datasets!
^ Figured it out: it’s an ephemeral model, i.e. not materialized. In DBT world it’s ephemeral; in Redshift world it’s a CTE. Hence the componentry presumably breaks because there’s nothing present to link it to.
doh
b
@acoustic-printer-83045 I'm not sure this should be breaking though ^^ You're saying that one dataset is pointing to another that has no metadata in DH??
Btw, this is awesome. We've been hoping for DBT integration for some time now!!
a
Well, to be specific: when I loaded the DBT metadata graph that includes ephemeral models, I was overlaying what is in Redshift (i.e. I ran the SQL ingest tool on our Redshift instance) and THEN re-ran it, enriched with upstreams from DBT. When I load datasets that have references to ephemeral models, i.e. things that wouldn’t exist in DataHub right now, I get that error. That may not be the actual cause, though - it may just be exposing another error - but the log message certainly looks like a missing dataset:
There is no relation or more than 1 relation between the datasets!
I did a very janky load system; at this point I’d probably just load the DBT metadata graph, which would list all instances.
That is, if I were to redo it. Which I probably will.
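If I do redo it, I’ll probably also filter out the ephemeral models up front. Something like this untested sketch, reusing the illustrative node_to_urn helper from the parsing sketch above:
Copy code
def drop_ephemeral_deps(lineage: dict, manifest: dict) -> dict:
    # Ephemeral dbt models never materialize in Redshift (they get inlined as CTEs),
    # so there's no dataset in DataHub to point lineage at - presumably why the
    # "no relation" error shows up. Just drop those deps for now.
    ephemeral_urns = {
        node_to_urn(node)
        for node in manifest["nodes"].values()
        if node.get("resource_type") == "model"
        and node.get("config", {}).get("materialized") == "ephemeral"
    }
    return {
        urn: [dep for dep in deps if dep not in ephemeral_urns]
        for urn, deps in lineage.items()
    }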