…another newbie question… I’ve followed the “Compe...
# getting-started
w
…another newbie question… I’ve followed the “Compensation” tutorial from one of the last town hall recordings - many thanks for that, really useful! It works and I do see the changes as expected. I do see, however, also an exception in mae-consumer-job (which I’ve restarted after a clean build to pick up the changes). So I’m wondering what I was missing… I don’t expect that the new compensation aspects will automagically appear in Elastic or in Neo4J, I’m just wondering about that exception… Thanks for your support and your patience!
Copy code
19:23:26.513 [mae-consumer-job-client-StreamThread-1] INFO  c.l.m.k.MetadataAuditEventsProcessor - {com.linkedin.metadata.snapshot.CorpUserSnapshot={urn=urn:li:corpuser:datahub, aspects=[{com.linkedin.identity.CompensationPackage={weeklyPay=1234, targetBonus=10}}]}}
19:23:26.544 [mae-consumer-job-client-StreamThread-1] ERROR c.l.m.k.MetadataAuditEventsProcessor - java.lang.RuntimeException:
com.linkedin.metadata.dao.utils.RecordUtils.invokeProtectedMethod(RecordUtils.java:257)
 com.linkedin.metadata.dao.utils.RecordUtils.getRecordTemplateField(RecordUtils.java:176)
 com.linkedin.metadata.builders.graph.BaseGraphBuilder.build(BaseGraphBuilder.java:41)
 com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.updateNeo4j(MetadataAuditEventsProcessor.java:78)
 com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.processSingleMAE(MetadataAuditEventsProcessor.java:62)
 com.linkedin.metadata.kafka.config.KafkaStreamsConfig.lambda$kStream$0(KafkaStreamsConfig.java:64)
 org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
 org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
 org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
 org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
b
Interesting. Seems like it was able to perform reflection (getting the
urn
field specifically). Do you have more stacktrace?
w
Digging a bit deeper, I see this exception:
Copy code
com.linkedin.data.template.TemplateOutputCastException: Output urn:li:corpuser:datahub has type java.lang.String, but does not have a registered coercer and cannot be coerced to type com.linkedin.common.urn.Urn
stackTrace = {StackTraceElement[22]@12428} 0 = {StackTraceElement@12430} “sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)” 1 = {StackTraceElement@12431} “sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)” 2 = {StackTraceElement@12432} “sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)” 3 = {StackTraceElement@12433} “java.lang.reflect.Method.invoke(Method.java:498)” 4 = {StackTraceElement@12434} “com.linkedin.metadata.dao.utils.RecordUtils.invokeProtectedMethod(RecordUtils.java:255)” 5 = {StackTraceElement@12435} “com.linkedin.metadata.dao.utils.RecordUtils.getRecordTemplateField(RecordUtils.java:176)” 6 = {StackTraceElement@12436} “com.linkedin.metadata.builders.graph.BaseGraphBuilder.build(BaseGraphBuilder.java:41)” 7 = {StackTraceElement@12437} “com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.updateNeo4j(MetadataAuditEventsProcessor.java:78)” 8 = {StackTraceElement@12438} “com.linkedin.metadata.kafka.MetadataAuditEventsProcessor.processSingleMAE(MetadataAuditEventsProcessor.java:62)” 9 = {StackTraceElement@12439} “com.linkedin.metadata.kafka.config.KafkaStreamsConfig.lambda$kStream$0(KafkaStreamsConfig.java:64)” 10 = {StackTraceElement@12440} “org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)” 11 = {StackTraceElement@12441} “org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)” 12 = {StackTraceElement@12442} “org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)” 13 = {StackTraceElement@12443} “org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)” 14 = {StackTraceElement@12444} “org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)” 15 = {StackTraceElement@12445} “org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)” 16 = {StackTraceElement@12446} “org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)” 17 = {StackTraceElement@12447} “org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)” 18 = {StackTraceElement@12448} “org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)” 19 = {StackTraceElement@12449} “org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)” 20 = {StackTraceElement@12450} “org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)” 21 = {StackTraceElement@12451} “org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)”
I see the same exception when starting the off-the-shelf docker images on a clean database and loading the provided test sample using mce_cli.py. So this isn’t connected to any extensions that I’ve tried.
b
Got it. Do you mind to create an issue on GitHub so we can track and fix this issue? Thanks.
o
So I think I've seen this as well if I'm understanding the issue, and I debugged through when it happened to me. Essentially what happened was that the static initializers that add all of the coercers into the unmodifiable map used by Rest.Li didn't run. Only way I've found to get rid of it was to separately run the code to set up the unmodifiable coercer map at initialization outside of relying on the static initializers to run. Add breakpoints in Custom.registerCoercer and see if that's what's happening to you as well.
b
w
Thanks!