Hi! Where can I see examples of creating metadata ...
# ingestion
c
Hi! Where can I see examples of creating metadata directly using the API? Or better documentation? I see the ability to add data using plugins, but it is not enough for us, and it is too hard for me to create new plugins. Marquez, for example, supports a user-friendly API; it is easy to manipulate data using it. But I don't see any user-friendly way with DataHub.
i
Hello Ruslan, DataHub supports adding metadata directly using Python or Java APIs. Have you checked these pages? Happy to help if they are not enough 🙂
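For a quick feel of what those pages cover, here is a minimal sketch of the Python REST emitter flow (the server address, platform, and dataset names are placeholders, not tied to any real deployment):

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

# Connect to the DataHub GMS endpoint (adjust for your deployment)
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Describe a dataset via its datasetProperties aspect
dataset_properties = DatasetPropertiesClass(description="My test dataset")

# Propose the change and emit it over REST
emitter.emit(
    MetadataChangeProposalWrapper(
        entityType="dataset",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=builder.make_dataset_urn("my_platform", "my_db.my_schema.my_table"),
        aspectName="datasetProperties",
        aspect=dataset_properties,
    )
)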
👍 1
c
wow, seems like that's what we need 👍 and where can I get all possible attributes to push into DataHub? For example, I need to push a dataset, and after that push a datajob event with a link to this dataset that will show when this dataset was refreshed?
l
👍 1
so you can see entities, attributes of entities and relationships to other entities
i
Could you explain your second question Ruslan, not sure I understood.
c
@incalculable-ocean-74010 second? which one is it? 🙂 @loud-island-88694 answered it, I think. I saw this page but was not sure that it would work. There are enough attributes there, but I didn't find a simple example of how to use them. I found just this, but it is too hard; your example is much better. I will try it, thank you very much guys! 👍
i
This one:
I need to push dataset, after that to push datajob event with link to this dataset that will show when this dataset was refreshed?
If it is answered let me know 🙂
c
Not sure, but I think I need to try. I have got:
1. a table in the DWH = a dataset
2. a process in the ETL system that refreshes the data in this table

I want DataHub users to know when this table was refreshed. They search for the table in the UI, find it, and see that it is related to the process that refreshed it at an exact datetime. It seems like I have to create a datajob and, after each process run, push a success event there with
dataJobInputOutput
related to this dataset https://demo.datahubproject.io/dataset/urn:li:dataset:(urn:li:dataPlatform:datahub,DataJob,PROD)/Schema?is_lineage_mode=false
i
Yup! That’s it 🙂
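In sketch form (the urns, platform, and job names below are hypothetical placeholders), that means emitting a dataJobInputOutput aspect on the datajob that lists your table as an output dataset:

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# The dataset refreshed by the ETL process, and the job that refreshes it
dataset_urn = builder.make_dataset_urn("my_platform", "my_db.my_schema.my_table")
datajob_urn = builder.make_data_job_urn("prefect", "my_flow", "refresh_my_table", "PROD")

# dataJobInputOutput records which datasets the job reads and writes;
# this is what draws the dataset-datajob lineage edge in the UI
io_aspect = DataJobInputOutputClass(inputDatasets=[], outputDatasets=[dataset_urn])

emitter.emit(
    MetadataChangeProposalWrapper(
        entityType="datajob",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=datajob_urn,
        aspectName="dataJobInputOutput",
        aspect=io_aspect,
    )
)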
c
Marquez works perfectly in this case; in the screenshot, I found the dataset (testdataset) and can immediately see the related datajob and the refresh log of this dataset. But I didn't find any examples in DataHub about dataset-datajob integration. Maybe the Airflow integration works as we need, but we use Prefect and don't want to be tied to a scheduling service.
i
That is dataset lineage; you are right that we have certain sources that do that. We currently support Airflow, Snowflake, dbt, and BigQuery. You can see examples here: https://datahubproject.io/docs/lineage/sample_code
👍 1
🔥 1
c
Is it possible to post only 1 aspect per event?
And where can I find an example of publishing a datajob? Should DatasetPropertiesClass work? And builder.make_dataset_urn?
i
Is it possible to post only 1 aspect per event?
You can, I think; you would post an MCP (MetadataChangeProposal) for the entity you want with just the aspect you want.
And where can I find an example of publishing a datajob? Should DatasetPropertiesClass work? And builder.make_dataset_urn?
If you are talking about https://datahubproject.io/docs/metadata-ingestion/as-a-library#example-usage, this is just an example of how to publish a change to a dataset entity. If you want to publish an update to a datajob entity, then you have to update the:
MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
    aspectName="datasetProperties",
    aspect=dataset_properties,
)
accordingly. You can check the documentation on the datajob entity here.
c
You can, I think; you would post an MCP (MetadataChangeProposal) for the entity you want with just the aspect you want.
I mean I want to post 20 aspects in 1 query. An MCP supports 1 aspect, as I see. Your answer about the datajob is not clear either: what did you want to say when you copied this part from the documentation? It is about a dataset. But there are no examples about datajobs.
i
My apologies, you are right. If you want to post 20 aspects you need to generate 20 MCPs; I hadn't understood your question well.
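For example, a sketch of emitting two aspects for the same dataset as two separate proposals (the urn, platform, and aspect values are placeholders):

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass, StatusClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
dataset_urn = builder.make_dataset_urn("my_platform", "my_db.my_schema.my_table")

# Two aspects for one entity -> two MCPs, emitted one by one
aspects = [
    ("datasetProperties", DatasetPropertiesClass(description="My test dataset")),
    ("status", StatusClass(removed=False)),
]
for aspect_name, aspect in aspects:
    emitter.emit(
        MetadataChangeProposalWrapper(
            entityType="dataset",
            changeType=ChangeTypeClass.UPSERT,
            entityUrn=dataset_urn,
            aspectName=aspect_name,
            aspect=aspect,
        )
    )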
Your answer about the datajob is not clear either: what did you want to say when you copied this part from the documentation? It is about a dataset. But there are no examples about datajobs.
I meant that you create a MetadataChangeProposalWrapper and pass the same parameters but adapted for datajob. Here is an example:
# Construct a MetadataChangeProposalWrapper object to update the DataJobInfo aspect of a datajob with urn <datajob urn>.

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInfoClass, JobStatusClass

# Create an emitter to DataHub over REST
emitter = DatahubRestEmitter(gms_server="http://localhost:8080", extra_headers={})

# Test the connection
emitter.test_connection()

# Create the aspect
datajob_info = DataJobInfoClass(name=<some name>, type="Spark", status=JobStatusClass.IN_PROGRESS)

# Create the Metadata Event to emit
metadata_event = MetadataChangeProposalWrapper(
    entityType="datajob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=<datajob urn>,
    aspectName="DataJobInfo",
    aspect=datajob_info,
)

emitter.emit(metadata_event)
c
Seems like I need to create the flow first, and after that the job.
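Looking at the urn builders, that ordering makes sense: a datajob urn embeds its parent dataflow urn, e.g. (hypothetical names):

import datahub.emitter.mce_builder as builder

# The flow urn identifies orchestrator + flow + cluster...
flow_urn = builder.make_data_flow_urn("prefect", "my_dataflow", "my_cluster")

# ...and the job urn nests that same flow identity plus a job id
job_urn = builder.make_data_job_urn("prefect", "my_dataflow", "my_datajob", "my_cluster")
print(job_urn)
# urn:li:dataJob:(urn:li:dataFlow:(prefect,my_dataflow,my_cluster),my_datajob)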
c
Pedro, thank you very much for the help, you have really done a lot for me. Could you please tell me what's the problem while creating the job?
# /////////////////////////////////////////////////////////
# Construct a dataflow properties object !!!!!!SUCCESS!!!!!!
# /////////////////////////////////////////////////////////
dataFlowInfo = DataFlowInfoClass(
    description="This flow stores test flow",
    name="my_dataflow",
    externalUrl="<https://cloud.prefect.io/>")

# Construct a MetadataChangeProposalWrapper object.
metadata_event = MetadataChangeProposalWrapper(
    entityType="dataflow",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_data_flow_urn("my_orchestrator", "my_dataflow", 'my_cluster'),
    aspectName="dataFlowInfo",
    aspect=dataFlowInfo,
)
emitter.emit(metadata_event)

# /////////////////////////////////////////////////////////
# Construct a dataset properties object !!!!!!SUCCESS!!!!!!
# /////////////////////////////////////////////////////////
dataset_properties = DatasetPropertiesClass(
    description="This table stored the test dataset",
    customProperties={
         "governance": "ENABLED"
    })

# Construct a MetadataChangeProposalWrapper object.
metadata_event = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_dataset_urn("my_platform", "my_database.my_schema.my_dataset"),
    aspectName="datasetProperties",
    aspect=dataset_properties,
)
emitter.emit(metadata_event)

# /////////////////////////////////////////////////////////
# Construct a datajob properties object  !!!!!!FAILED!!!!!! java.lang.RuntimeException: Unknown aspect DataJobInfo for entity datajob
# /////////////////////////////////////////////////////////
datajob_info = DataJobInfoClass(
    description="This job executing test flow",
    name="my_datajob",
    externalUrl="<https://cloud.prefect.io/jobid>",
    type="PrefectJob"
)

# Construct a MetadataChangeProposalWrapper object.
metadata_event = MetadataChangeProposalWrapper(
    entityType="datajob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=builder.make_data_job_urn("my_orchestrator", builder.make_data_flow_urn("my_orchestrator", "my_dataflow", 'my_cluster'), builder.make_dataset_urn("bigquery", "my_database.my_schema.my_dataset"), "my_cluster"),
    aspectName="DataJobInfo",
    aspect=datajob_info
)
emitter.emit(metadata_event)
i
At first glance, you are missing the emit statements to publish the events into DataHub. If you have error logs we can check, I may be able to help you better.
c
datahub.configuration.common.OperationalError: ('Unable to emit metadata to DataHub GMS', {'exceptionClass': 'com.linkedin.restli.server.RestLiServiceException', 'stackTrace': 'com.linkedin.restli.server.RestLiServiceException [HTTP Status:500]: java.lang.RuntimeException: Unknown aspect DataJobInfo for entity datajob\n\tat com.linkedin.metadata.restli.RestliUtil.toTask(RestliUtil.java:42)\n\tat com.linkedin.metadata.restli.RestliUtil.toTask(RestliUtil.java:50)\n\tat com.linkedin.metadata.resources.entity.AspectResource.ingestProposal(AspectResource.java:133)\n\tat sun.reflect.GeneratedMethodAccessor288.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat com.linkedin.restli.internal.server.RestLiMethodInvoker.doInvoke(RestLiMethodInvoker.java:172)\n\tat com.linkedin.restli.internal.server.RestLiMethodInvoker.invoke(RestLiMethodInvoker.java:326)\n\tat com.linkedin.restli.internal.server.filter.FilterChainDispatcherImpl.onRequestSuccess(FilterChainDispatcherImpl.java:47)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:86)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.lambda$onRequest$0(RestLiFilterChainIterator.java:73)\n\tat java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)\n\tat java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)\n\tat java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChainIterator.onRequest(RestLiFilterChainIterator.java:72)\n\tat com.linkedin.restli.internal.server.filter.RestLiFilterChain.onRequest(RestLiFilterChain.java:55)\n\tat com.linkedin.restli.server.BaseRestLiServer.handleResourceRequest(BaseRestLiServer.java:218)\n\tat com.linkedin.restli.server.RestRestLiServer.handleResourceRequestWithRestLiResponse(RestRestLiServer.java:242)\n\tat com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:211)\n\tat com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:181)\n\tat com.linkedin.restli.server.RestRestLiServer.doHandleRequest(RestRestLiServer.java:164)\n\tat com.linkedin.restli.server.RestRestLiServer.handleRequest(RestRestLiServer.java:120)\n\tat com.linkedin.restli.server.RestLiServer.handleRequest(RestLiServer.java:132)\n\tat com.linkedin.restli.server.DelegatingTransportDispatcher.handleRestRequest(DelegatingTransportDispatcher.java:70)\n\tat com.linkedin.r2.filter.transport.DispatcherRequestFilter.onRestRequest(DispatcherRequestFilter.java:70)\n\tat com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:72)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)\n\tat com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)\n\tat com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)\n\tat com.linkedin.r2.filter.transport.ServerQueryTunnelFilter.onRestRequest(ServerQueryTunnelFilter.java:58)\n\tat com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:72)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)\n\tat 
com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)\n\tat com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)\n\tat com.linkedin.r2.filter.TimedNextFilter.onRequest(TimedNextFilter.java:55)\n\tat com.linkedin.r2.filter.message.rest.RestFilter.onRestRequest(RestFilter.java:50)\n\tat com.linkedin.r2.filter.TimedRestFilter.onRestRequest(TimedRestFilter.java:72)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:146)\n\tat com.linkedin.r2.filter.FilterChainIterator$FilterChainRestIterator.doOnRequest(FilterChainIterator.java:132)\n\tat com.linkedin.r2.filter.FilterChainIterator.onRequest(FilterChainIterator.java:62)\n\tat com.linkedin.r2.filter.FilterChainImpl.onRestRequest(FilterChainImpl.java:96)\n\tat com.linkedin.r2.filter.transport.FilterChainDispatcher.handleRestRequest(FilterChainDispatcher.java:75)\n\tat com.linkedin.r2.util.finalizer.RequestFinalizerDispatcher.handleRestRequest(RequestFinalizerDispatcher.java:61)\n\tat com.linkedin.r2.transport.http.server.HttpDispatcher.handleRequest(HttpDispatcher.java:101)\n\tat com.linkedin.r2.transport.http.server.AbstractR2Servlet.service(AbstractR2Servlet.java:105)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\tat com.linkedin.restli.server.spring.ParallelRestliHttpRequestHandler.handleRequest(ParallelRestliHttpRequestHandler.java:63)\n\tat org.springframework.web.context.support.HttpRequestHandlerServlet.service(HttpRequestHandlerServlet.java:73)\n\tat javax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\tat org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:852)\n\tat org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1604)\n\tat com.datahub.authentication.filter.AuthenticationFilter.doFilter(AuthenticationFilter.java:77)\n\tat org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1591)\n\tat org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)\n\tat org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:536)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)\n\tat org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)\n\tat org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)\n\tat org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)\n\tat org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)\n\tat org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)\n\tat 
org.eclipse.jetty.server.Server.handle(Server.java:494)\n\tat org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)\n\tat org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)\n\tat org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)\n\tat org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)\n\tat org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)\n\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)\n\tat org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.RuntimeException: Unknown aspect DataJobInfo for entity datajob\n\tat com.linkedin.metadata.entity.EntityService.ingestProposal(EntityService.java:370)\n\tat com.linkedin.metadata.resources.entity.AspectResource.lambda$ingestProposal$3(AspectResource.java:136)\n\tat com.linkedin.metadata.restli.RestliUtil.toTask(RestliUtil.java:30)\n\t... 84 more\n', 'message': 'java.lang.RuntimeException: Unknown aspect DataJobInfo for entity datajob', 'status': 500})
Emit statements were missing, right, sorry, fixed. The problem is not there:
java.lang.RuntimeException: Unknown aspect DataJobInfo for entity datajob
dataJobInfo, with a lowercase first char 🤦 Moving forward now, thank you very much Pedro!
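For the record, a corrected sketch of the failing block (my_datajob is a placeholder job id; make_data_job_urn expects the plain flow id and a job id, and the aspect name starts lowercase):

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInfoClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

datajob_info = DataJobInfoClass(
    description="This job executes the test flow",
    name="my_datajob",
    externalUrl="https://cloud.prefect.io/jobid",
    type="PrefectJob",
)

emitter.emit(
    MetadataChangeProposalWrapper(
        entityType="datajob",
        changeType=ChangeTypeClass.UPSERT,
        entityUrn=builder.make_data_job_urn("my_orchestrator", "my_dataflow", "my_datajob", "my_cluster"),
        aspectName="dataJobInfo",  # lowercase first char, unlike the DataJobInfoClass class name
        aspect=datajob_info,
    )
)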
👍 1