# getting-started
b
Hello everyone, I’ve been working on DataHub for a few days with the intention of evaluating it as a possible solution for metadata sharing and data lineage at my company. I’ve been following the docs to try to onboard a new entity in a local Docker environment. The entity is quite simple (copied from `corpGroup`, but with a different urn and just a single property, the `name`), as I’m trying to understand all the steps involved in the process. Nonetheless, when I try to add an entity instance via the REST API (curl), I get the following error:
```
datahub-mae-consumer    | 17:55:48.772 [mae-consumer-job-client-0-C-1] INFO  c.l.m.k.MetadataAuditEventsProcessor - {com.linkedin.metadata.snapshot.AdevintaOrganisationSnapshot={urn=urn:li:adevintaOrganisation:finance, aspects=[{com.linkedin.identity.AdevintaOrganisationInfo={name=finance}}]}}
datahub-mae-consumer    | 17:55:48.774 [mae-consumer-job-client-0-C-1] ERROR c.l.m.k.MetadataAuditEventsProcessor - java.util.NoSuchElementException: No value present [java.util.Optional.get(Optional.java:135), com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.updateNeo4j(MetadataAuditEventsProcessor.java:83), com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.consume(MetadataAuditEventsProcessor.java:68), sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method), sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62), sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43), java.lang.reflect.Method.invoke(Method.java:498), org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171), org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120), org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48), org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283), org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79), org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1327), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1307), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1267), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1248), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1162), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:971), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:775), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:708), java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511), java.util.concurrent.FutureTask.run(FutureTask.java:266), java.lang.Thread.run(Thread.java:748)]
```

Could you help me debug this, or point me in the right direction? Thank you!
b
Did you modify any PDL files?
o
Did you add a GraphBuilder for your new entity? This is throwing an error in the Neo4j section, indicating there is no GraphBuilder in RegisteredGraphBuilders that matches your entity type. If you don't care about updating Neo4j with your current changes, it should still update MySQL & Elastic. This error shouldn't block you, since that whole method is wrapped in a try/catch(Exception).
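For reference, a graph builder for the new entity can be modeled on the existing ones in the metadata-builders module. A rough sketch, assuming the generated urn/entity classes from your PDLs and the BaseGraphBuilder signature from that module (verify the exact names against your checkout):

```
package com.linkedin.metadata.builders.graph;

import com.linkedin.common.urn.AdevintaOrganisationUrn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.entity.AdevintaOrganisationEntity;
import com.linkedin.metadata.snapshot.AdevintaOrganisationSnapshot;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

// A minimal sketch, modeled on the existing graph builders. No relationship
// builders are registered yet, since the entity only carries a name property.
public class AdevintaOrganisationGraphBuilder extends BaseGraphBuilder<AdevintaOrganisationSnapshot> {

  public AdevintaOrganisationGraphBuilder() {
    super(AdevintaOrganisationSnapshot.class, Collections.emptySet());
  }

  @Nonnull
  @Override
  protected List<? extends RecordTemplate> buildEntities(@Nonnull AdevintaOrganisationSnapshot snapshot) {
    final AdevintaOrganisationUrn urn = snapshot.getUrn();
    // Create the graph node for the organisation itself.
    return Collections.singletonList(new AdevintaOrganisationEntity().setUrn(urn));
  }
}
```

You'd then register an instance in RegisteredGraphBuilders so the MAE consumer can look it up by snapshot type, and rebuild the mae-consumer image.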
m
Yeah, as an FYI, if you're just looking at the Docker output, it unifies the logs of all the services. As Ryan said, this is the service that updates the graph, and it may not be critical if you don't care about graph updates yet.
b
Hi again, thanks for your replies. To answer your questions, the metadata PDLs look like this:
```
namespace com.linkedin.identity

import com.linkedin.common.AdevintaOrganisationUrn

/**
 * an organisation in Adevinta
 */
@Aspect.EntityUrns = [ "com.linkedin.common.AdevintaOrganisationUrn" ]
record AdevintaOrganisationInfo {

  /**
   * name of this organisation
   */
  name: string
}
```

```
namespace com.linkedin.metadata.aspect

import com.linkedin.identity.AdevintaOrganisationInfo

/**
 * A union of all supported metadata aspects for an AdevintaOrganisation
 */
typeref AdevintaOrganisationAspect = union[AdevintaOrganisationInfo]
```

```
namespace com.linkedin.metadata.entity

import com.linkedin.common.AdevintaOrganisationUrn

/**
 * Data model for an AdevintaOrganisation entity
 */
record AdevintaOrganisationEntity includes BaseEntity {

  /**
   * Urn for the Organisation
   */
  urn: AdevintaOrganisationUrn

  /**
   * name of the organisation, e.g. finance
   */
  name: optional string
}
```

```
namespace com.linkedin.metadata.search

import com.linkedin.common.AdevintaOrganisationUrn

/**
 * Data model for Adevinta Organisation search
 */
record AdevintaOrganisationDocument includes BaseDocument {

  /**
   * Urn for the Adevinta Organisation.
   */
  urn: AdevintaOrganisationUrn

  /**
   * Name of the Adevinta organisation
   */
  name: optional string
}
```

```
namespace com.linkedin.metadata.snapshot

import com.linkedin.common.AdevintaOrganisationUrn
import com.linkedin.metadata.aspect.AdevintaOrganisationAspect

/**
 * A metadata snapshot for a specific AdevintaOrganisation entity.
 */
record AdevintaOrganisationSnapshot {

  /**
   * URN for the entity the metadata snapshot is associated with.
   */
  urn: AdevintaOrganisationUrn

  /**
   * The list of metadata aspects associated with the AdevintaOrganisation. Depending on the use case, this can either be all, or a selection, of supported aspects.
   */
  aspects: array[AdevintaOrganisationAspect]
}
```

```
namespace com.linkedin.metadata.snapshot

/**
 * A union of all supported metadata snapshot types.
 */
typeref Snapshot = union[
  AdevintaGroupSnapshot,
  AdevintaOrganisationSnapshot,
  CorpGroupSnapshot,
  CorpUserSnapshot,
  DatasetSnapshot,
  DataProcessSnapshot,
  MLModelSnapshot,
  MLFeatureSnapshot
]
```
Indeed, I had not added the graph builder (I have added it now). But I still see this behaviour:
```
datahub-mae-consumer    | 09:20:36.784 [mae-consumer-job-client-0-C-1] INFO  c.l.m.k.MetadataAuditEventsProcessor - {com.linkedin.metadata.snapshot.AdevintaOrganisationSnapshot={urn=urn:li:adevintaOrganisation:neworg, aspects=[{com.linkedin.identity.AdevintaOrganisationInfo={name=neworg}}]}}
datahub-mae-consumer    | 09:20:36.785 [mae-consumer-job-client-0-C-1] ERROR c.l.m.k.MetadataAuditEventsProcessor - java.util.NoSuchElementException: No value present [java.util.Optional.get(Optional.java:135), com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.updateNeo4j(MetadataAuditEventsProcessor.java:83), com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.consume(MetadataAuditEventsProcessor.java:68), sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method), sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62), sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43), java.lang.reflect.Method.invoke(Method.java:498), org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171), org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120), org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48), org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283), org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79), org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1327), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1307), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1267), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1248), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1162), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:971), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:775), org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:708), java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511), java.util.concurrent.FutureTask.run(FutureTask.java:266), java.lang.Thread.run(Thread.java:748)]
```

and upon calling:
```
curl http://localhost:8080/adevintaOrganisations
```

I also get:

```
datahub-gms             | 09:21:21.309 [qtp626202354-14] ERROR c.l.metadata.dao.search.ESSearchDAO - SearchRequest{searchType=QUERY_THEN_FETCH, indices=[adevintaorganisationdocument], indicesOptions=IndicesOptions[id=38, ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true], types=[], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=128, source={
datahub-gms             |   "from" : 0,
datahub-gms             |   "size" : 10,
datahub-gms             |   "query" : {
datahub-gms             |     "bool" : {
datahub-gms             |       "disable_coord" : false,
datahub-gms             |       "adjust_pure_negative" : true,
datahub-gms             |       "boost" : 1.0
datahub-gms             |     }
datahub-gms             |   },
datahub-gms             |   "sort" : [
datahub-gms             |     {
datahub-gms             |       "urn" : {
datahub-gms             |         "order" : "asc"
datahub-gms             |       }
datahub-gms             |     }
datahub-gms             |   ]
datahub-gms             | }}
```

after adding:
```
@RestMethod.GetAll
  @Nonnull
  public Task<List<AdevintaOrganisation>> getAll(@PagingContextParam @Nonnull PagingContext pagingContext,
      @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames,
      @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter,
      @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion) {
    return super.getAll(pagingContext, aspectNames, filter, sortCriterion);
  }
```

to my `gms.impl.src.main.java.com.linkedin.metadata.resources.identity.AdevintaOrganisations.java`.
OK, I finally solved the problem! I had not correctly rebuilt the `datahub-mae-consumer` container after adding the index and graph builders.
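For reference, the index builder can be sketched along the lines of the existing ones in metadata-builders; the base-class constructor and method signatures here are assumptions to double-check against your checkout, and the accessors on the aspect union are the ones Pegasus generates from the PDLs above:

```
package com.linkedin.metadata.builders.search;

import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.AdevintaOrganisationAspect;
import com.linkedin.metadata.search.AdevintaOrganisationDocument;
import com.linkedin.metadata.snapshot.AdevintaOrganisationSnapshot;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

// A rough sketch, modeled on the existing index builders in metadata-builders.
public class AdevintaOrganisationIndexBuilder extends BaseIndexBuilder<AdevintaOrganisationDocument> {

  public AdevintaOrganisationIndexBuilder() {
    super(Collections.singletonList(AdevintaOrganisationSnapshot.class),
        AdevintaOrganisationDocument.class);
  }

  @Nullable
  @Override
  public List<AdevintaOrganisationDocument> getDocumentsToUpdate(@Nonnull RecordTemplate snapshot) {
    if (!(snapshot instanceof AdevintaOrganisationSnapshot)) {
      return Collections.emptyList();
    }
    final AdevintaOrganisationSnapshot organisationSnapshot = (AdevintaOrganisationSnapshot) snapshot;
    final List<AdevintaOrganisationDocument> documents = new ArrayList<>();
    for (AdevintaOrganisationAspect aspect : organisationSnapshot.getAspects()) {
      // Map the name from the info aspect onto the search document.
      if (aspect.isAdevintaOrganisationInfo()) {
        documents.add(new AdevintaOrganisationDocument()
            .setUrn(organisationSnapshot.getUrn())
            .setName(aspect.getAdevintaOrganisationInfo().getName()));
      }
    }
    return documents;
  }

  @Nonnull
  @Override
  public Class<AdevintaOrganisationDocument> getDocumentType() {
    return AdevintaOrganisationDocument.class;
  }
}
```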
b
Glad you got it sorted. Yes, you'll need to rebuild all the images every time you update the models.
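With the quickstart docker-compose setup, that usually means something along the lines of `docker-compose build` followed by `docker-compose up -d` (the exact compose files and service names depend on your checkout), so the containers pick up the regenerated models and builders.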
m
Also, small FYI, but `@Aspect.EntityUrns` does nothing right now and can be deleted 🙂
b
Thank you John, I’ll take it into account 😉