millions-pencil-75565 (03/24/2023, 5:42 AM)

cuddly-garden-9148 (06/19/2023, 8:20 AM)

better-agent-91402 (06/26/2023, 11:01 AM)

rich-crowd-33361 (07/05/2023, 7:30 PM)

powerful-monitor-13002 (07/10/2023, 1:18 AM)
FineGrainedLineage construct. Will there be any interest in integrating Spline with the DataHub Spark listener jar? Some cool features of Spline: it supports many more low-level Spark commands, along with out-of-the-box support for multiple data providers such as Kafka, Mongo, ES, Hive, JDBC, Cassandra, etc.

gifted-diamond-19544 (07/14/2023, 2:15 PM)

handsome-park-80602 (08/01/2023, 3:28 PM)

most-monkey-10812 (08/21/2023, 10:41 AM)
{
  "inputDatasetFields": [
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,catalog.src.srcTable,PROD),col1)"
  ],
  "outputDatasetFields": [
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,catalog.dest.destTable,PROD),col1)"
  ],
  "inputDatasetEdges": [
    {
      "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,catalog.src.srcTable,PROD)",
      "lastModified": {
        "actor": "urn:li:corpuser:UNKNOWN",
        "time": 1692613604982
      },
      "created": {
        "actor": "urn:li:corpuser:UNKNOWN",
        "time": 1692613604982
      }
    }
  ],
  "outputDatasetEdges": [
    {
      "destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:athena,catalog.dest.destTable,PROD)",
      "lastModified": {
        "actor": "urn:li:corpuser:UNKNOWN",
        "time": 1692613604984
      },
      "created": {
        "actor": "urn:li:corpuser:UNKNOWN",
        "time": 1692613604984
      }
    }
  ],
  "inputDatasets": [
    "urn:li:dataset:(urn:li:dataPlatform:athena,catalog.src.srcTable,PROD)"
  ],
  "outputDatasets": [
    "urn:li:dataset:(urn:li:dataPlatform:athena,catalog.dest.destTable,PROD)"
  ],
  "fineGrainedLineages": [
    {
      "downstreamType": "FIELD",
      "downstreams": [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,catalog.dest.destTable,PROD),col1)"
      ],
      "upstreamType": "FIELD_SET",
      "upstreams": [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,catalog.src.srcTable,PROD),col1)"
      ]
    }
  ]
}
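A note on the URNs in the payload above: a schemaField URN nests a full dataset URN inside it. A stdlib-only sketch of how these strings are assembled (the helper names here are mine for illustration, not the DataHub SDK's):

```python
def dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    # urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

def schema_field_urn(dataset: str, field_path: str) -> str:
    # urn:li:schemaField:(<dataset urn>,<field path>)
    return f"urn:li:schemaField:({dataset},{field_path})"

src = dataset_urn("athena", "catalog.src.srcTable")
print(schema_field_urn(src, "col1"))
# urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:athena,catalog.src.srcTable,PROD),col1)
```

In the real SDK, `datahub.emitter.mce_builder` provides `make_dataset_urn` and `make_schema_field_urn` for this, as shown later in the thread.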
proud-mouse-39373 (08/22/2023, 8:11 AM)

proud-mouse-39373 (08/22/2023, 8:12 AM)

purple-refrigerator-27989 (08/24/2023, 3:08 AM)

clever-match-44392 (08/29/2023, 12:39 PM)

icy-yacht-31703 (09/18/2023, 12:10 PM)

red-florist-94889 (09/22/2023, 2:09 PM)

tall-flag-84207 (09/28/2023, 9:20 AM)

bitter-baker-81702 (10/11/2023, 12:14 PM)

bulky-shoe-65107 (10/16/2023, 12:34 AM)

icy-yacht-31703 (10/30/2023, 9:48 AM)

stale-ram-69119 (11/07/2023, 11:02 AM)

brief-eye-25921 (11/08/2023, 12:35 PM)
{"query": "SELECT * FROM test.test_table", "downstream_tables": [], "upstream_tables": ["test.test_table"]}
{"query": "INSERT INTO test.test_son (runoob_id) SELECT runoob_id FROM test.test_table", "downstream_tables": ["test.test_son"], "upstream_tables": []}
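For context on the two lines above: the query file consumed by SqlQueriesSource is JSON Lines, one object per query, with keys `query`, `downstream_tables`, and `upstream_tables`. A small stdlib sketch that writes such a file (file name is illustrative; entries mirror the example above):

```python
import json

# Illustrative entries mirroring the example above.
entries = [
    {"query": "SELECT * FROM test.test_table",
     "downstream_tables": [], "upstream_tables": ["test.test_table"]},
    {"query": "INSERT INTO test.test_son (runoob_id) SELECT runoob_id FROM test.test_table",
     "downstream_tables": ["test.test_son"], "upstream_tables": []},
]

# JSON Lines: one JSON object per line, no enclosing array.
with open("queries.json", "w") as f:
    for entry in entries:
        f.write(json.dumps(entry) + "\n")
```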
late-lizard-17365 (11/10/2023, 10:24 AM)

big-table-62755 (11/24/2023, 11:22 AM)
SqlQueriesSource to create column lineage

from datahub.ingestion.source.sql_queries import SqlQueriesSource, SqlQueriesSourceConfig
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.api.common import PipelineContext

datahub_graph_client = DataHubGraph(config=DatahubClientConfig())
conf = SqlQueriesSourceConfig(platform='clickhouse', query_file='./queries/subscription_events.sql')
cxt = PipelineContext(run_id='test_column_lineage', graph=datahub_graph_client)
src = SqlQueriesSource(config=conf, ctx=cxt)

I am missing the piece of how to emit this to my DataHub cluster. Should I use DatahubRestEmitter somehow?

astonishing-byte-5433
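Editor's note on the question above: rather than driving SqlQueriesSource by hand, the usual route is an ingestion recipe run with the datahub CLI (`datahub ingest -c recipe.yaml`), which wires the source to a `datahub-rest` sink so emission to the cluster is handled for you. A sketch, with the server URL and file path assumed:

```yaml
# recipe.yaml -- sketch; adjust server, token, and paths to your deployment
source:
  type: sql-queries
  config:
    platform: clickhouse
    query_file: ./queries/subscription_events.sql
sink:
  type: datahub-rest
  config:
    server: http://localhost:8080
```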
11/29/2023, 9:06 AM

early-hydrogen-27542 (11/29/2023, 11:27 PM)

bland-orange-13353 (11/30/2023, 8:29 AM)

glamorous-spoon-27211 (12/26/2023, 2:58 AM)
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

def datasetUrn(tbl):
    return builder.make_dataset_urn("hive", tbl)

def fldUrn(tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)

fineGrainedLineages = [
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[
            fldUrn("account_balance", "account_key"),
        ],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("account_balance_delta", "account_balance_key")],
    ),
]

# Coarse-grained upstream, so the DownstreamOf relationship does not
# conflict with any existing Upstream edge.
upstream = Upstream(
    dataset=datasetUrn("account_balance"), type=DatasetLineageType.TRANSFORMED
)

fieldLineages = UpstreamLineage(
    upstreams=[upstream], fineGrainedLineages=fineGrainedLineages
)

lineageMcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn("account_balance_delta"),
    aspect=fieldLineages,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(lineageMcp)
fresh-book-19245 (02/06/2024, 5:13 PM)

gray-sundown-82407 (02/08/2024, 3:32 PM)

some-car-9623 (03/13/2024, 2:47 PM)

some-car-9623 (03/14/2024, 8:45 PM)