# getting-started
a
I didn't see the `status`, but `removed:false` just showed up.
b
If I read this correctly, `Ownerships` was ingested correctly but not `DataProcessInfo`, and `Status` was magically set?
a
That's correct. Any chance you know where I went wrong? Apparently the Kafka schema doesn't pick up `DataProcessInfo`.
b
I assume you also rebuilt `mce-consumer` after updating the PDL models?
a
Yes, I rebuilt two: one is gms, the other is `mce-consumer`. Here is my fork branch in case you need it: https://github.com/liangjun-jiang/datahub/tree/data-process-entity. Thanks!
b
Okay, looks like some weirdness with Avro serialization in Python. When there's a schema mismatch, instead of failing loudly, the `avro-python3` lib does something weird (i.e. it randomly picks a type from the union). Simply dropping the extra `dataProcessInfo` key in your MCE should fix the issue:
```python
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DataProcessSnapshot", {"urn": "urn:li:dataprocess:(sqoop2,4DEMO2,PROD)", "aspects": [{"owners": [{"owner": "urn:li:corpuser:datahub", "type": "DATAOWNER"}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:datahub"}}, { "outputs": [ "urn:li:dataset:(urn:li:dataPlatform:cassandra,barEarth,DEV)", "urn:li:dataset:(urn:li:dataPlatform:cassandra,barMars,DEV)" ], "inputs": [ "urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)", "urn:li:dataset:(urn:li:dataPlatform:hbase,barOcean,PROD)" ] } ]}), "proposedDelta": None}
```
a
Thanks. I think I have tried this, and tried again; still don't see it. Want to make sure `mce-cli.py` is running with Python 3, right?
b
Yes, it should work with Python 3. There are probably other validation errors; try wrapping `DataProcessInfo` in a tuple with the type name as the first element and the value as the second, like this:
```python
("com.linkedin.pegasus2avro.dataprocess.DataProcessInfo", {"inputs": [...], "outputs": [...]})
```
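For context: `mce_cli.py` reads each MCE line with `ast.literal_eval` (as the traceback later in this thread shows), so the tuple notation is just a Python literal whose first element names the union branch. A minimal sketch of how it parses — the URN value here is made up for illustration:

```python
import ast

# The union branch is written as a (type-name, value) tuple.
# ast.literal_eval accepts tuples, dicts, lists, strings, numbers,
# and None, but no arbitrary expressions.
sample = ('("com.linkedin.pegasus2avro.dataprocess.DataProcessInfo", '
          '{"inputs": ["urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)"], '
          '"outputs": []})')

branch = ast.literal_eval(sample)
assert isinstance(branch, tuple)   # the tuple selects the union branch
assert branch[0].endswith("DataProcessInfo")
assert isinstance(branch[1], dict)  # the aspect payload itself
print(branch[0])
```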
a
Still no luck.
b
Hmm. What's the error you're getting?
a
Not getting any error; the `dataProcessInfo` aspect just doesn't show up. Since you think the Python `avro` package is causing this problem, I could use other tools, maybe a Java client, or an approach like this to generate a valid message: https://stackoverflow.com/questions/51664191/pushing-avro-file-to-kafka
My point is that I don't have to use this Python lib.
b
Could you share the latest MCE with me again? Thanks.
a
```python
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DataProcessSnapshot", {"urn": "urn:li:dataprocess:(21sqoop121,4DEMO3,PROD)", "aspects": [{"owners": [{"owner": "urn:li:corpuser:datahub", "type": "DATAOWNER"}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:datahub"}}, { "com.linkedin.pegasus2avro.dataprocess.DataProcessInfo": { "outputs": [ "urn:li:dataset:(urn:li:dataPlatform:cassandra,barEarth,DEV)", "urn:li:dataset:(urn:li:dataPlatform:cassandra,barMars,DEV)" ], "inputs": [ "urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)", "urn:li:dataset:(urn:li:dataPlatform:hbase,barOcean,PROD)" ] } }]}), "proposedDelta": None}
```
b
You're not using a tuple, which is specified with `(...)` instead of `{...}`. This is how you should do it:
```python
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DataProcessSnapshot", {"urn": "urn:li:dataprocess:(21sqoop121,4DEMO3,PROD)", "aspects": [{"owners": [{"owner": "urn:li:corpuser:datahub", "type": "DATAOWNER"}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:datahub"}}, ( "com.linkedin.pegasus2avro.dataprocess.DataProcessInfo": { "outputs": [ "urn:li:dataset:(urn:li:dataPlatform:cassandra,barEarth,DEV)", "urn:li:dataset:(urn:li:dataPlatform:cassandra,barMars,DEV)" ], "inputs": [ "urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)", "urn:li:dataset:(urn:li:dataPlatform:hbase,barOcean,PROD)" ] } )]}), "proposedDelta": None}
```
a
Let me give it a try. Thanks!
a
```
Traceback (most recent call last):
  File "mce_cli.py", line 108, in <module>
    main(parser.parse_args())
  File "mce_cli.py", line 88, in main
    produce(conf, args.data_file, args.schema_record)
  File "mce_cli.py", line 31, in produce
    content = ast.literal_eval(sample.strip())
  File "/Users/liajiang/opt/anaconda3/lib/python3.7/ast.py", line 46, in literal_eval
    node_or_string = parse(node_or_string, mode='eval')
  File "/Users/liajiang/opt/anaconda3/lib/python3.7/ast.py", line 35, in parse
    return compile(source, filename, mode, PyCF_ONLY_AST)
  File "<unknown>", line 1
    {"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DataProcessSnapshot", {"urn": "urn:li:dataprocess:(21sqoop121,4DEMO3,PROD)", "aspects": [{"owners": [{"owner": "urn:li:corpuser:datahub", "type": "DATAOWNER"}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:datahub"}}, ( "com.linkedin.pegasus2avro.dataprocess.DataProcessInfo": { "outputs": [ "urn:li:dataset:(urn:li:dataPlatform:cassandra,barEarth,DEV)", "urn:li:dataset:(urn:li:dataPlatform:cassandra,barMars,DEV)" ], "inputs": [ "urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)", "urn:li:dataset:(urn:li:dataPlatform:hbase,barOcean,PROD)" ] } )]}), "proposedDelta": None}
                                                                                                                                                                                                                                                                                                                                                                                         ^
SyntaxError: invalid syntax
```
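For context on the traceback above: `ast.literal_eval` compiles its input as a Python expression, and a colon is only valid inside `{...}`, so `("name": {...})` is a syntax error, while the comma form `("name", {...})` is an ordinary 2-tuple. A quick check (the payloads here are trimmed placeholders):

```python
import ast

bad = '("com.linkedin.pegasus2avro.dataprocess.DataProcessInfo": {"inputs": []})'
good = '("com.linkedin.pegasus2avro.dataprocess.DataProcessInfo", {"inputs": []})'

try:
    ast.literal_eval(bad)  # colon inside parentheses -> SyntaxError
except SyntaxError:
    print("colon form rejected")

name, value = ast.literal_eval(good)  # the comma makes it a 2-tuple
print("comma form parsed:", name)
```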
b
Oops, sorry, my bad. You should also change `"com.linkedin.pegasus2avro.dataprocess.DataProcessInfo":` to `"com.linkedin.pegasus2avro.dataprocess.DataProcessInfo",` (a comma instead of a colon, so the tuple is valid):
```python
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DataProcessSnapshot", {"urn": "urn:li:dataprocess:(21sqoop121,4DEMO3,PROD)", "aspects": [{"owners": [{"owner": "urn:li:corpuser:datahub", "type": "DATAOWNER"}], "lastModified": {"time": 0, "actor": "urn:li:corpuser:datahub"}}, ( "com.linkedin.pegasus2avro.dataprocess.DataProcessInfo", { "outputs": [ "urn:li:dataset:(urn:li:dataPlatform:cassandra,barEarth,DEV)", "urn:li:dataset:(urn:li:dataPlatform:cassandra,barMars,DEV)" ], "inputs": [ "urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)", "urn:li:dataset:(urn:li:dataPlatform:hbase,barOcean,PROD)" ] } )]}), "proposedDelta": None}
```
a
it works!
b
Yup. Updating the document now
a
Apparently `com.linkedin.pegasus2avro.dataprocess.DataProcessInfo` is also needed; I couldn't just use `dataProcessInfo`, in case you had any doubt about it.
b
Yeah, that's expected. It needs to be the fully qualified type name unless the namespace matches the default namespace of the schema.
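The matching rule described above can be sketched in plain Python. This is only an illustration of the idea (an unqualified name is resolved against a default namespace before being matched to a union branch); `resolve_branch` and the type set below are made up for the example, not DataHub or Avro library code:

```python
# Illustrative only: resolving a union branch name against a default
# namespace, mirroring the rule described in the message above.

def resolve_branch(branch_name, union_types, default_namespace):
    """Return the matching fully-qualified type name, or None."""
    # An unqualified name is implicitly prefixed with the default namespace.
    if "." not in branch_name:
        branch_name = f"{default_namespace}.{branch_name}"
    return branch_name if branch_name in union_types else None

union_types = {
    "com.linkedin.pegasus2avro.common.Ownership",
    "com.linkedin.pegasus2avro.dataprocess.DataProcessInfo",
}

# Fully qualified: matches regardless of the default namespace.
assert resolve_branch(
    "com.linkedin.pegasus2avro.dataprocess.DataProcessInfo",
    union_types, "com.linkedin.pegasus2avro.common") is not None

# A bare "DataProcessInfo" resolves against the default namespace
# ("...common" here), so it does NOT match the dataprocess type.
assert resolve_branch(
    "DataProcessInfo",
    union_types, "com.linkedin.pegasus2avro.common") is None
```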