# getting-started
n
Dear Team, I am working on exposing an API endpoint to populate DatasetSnapshot metadata. I am having some issues when deserializing the fields -> type -> type within the SchemaMetadata aspect (https://github.com/linkedin/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/schema/SchemaFieldDataType.pdl). I guess I should set the type of the data as the value in my Jackson deserialization in order to set the corresponding type, but I am having challenges with that. If LinkedIn or anyone in the community has handled such a case with Jackson deserialization, kindly help out. Details of the input/error are in the thread.
Input JSON with only the SchemaMetadata aspect:
Copy code
{
  "urn": {
    "dataPlatformUrn": {
      "platformName": "streamPlatform"
    },
    "datasetName": "clickstream",
    "fabricType": "TEST"
  },
  "aspects": [
    {
      "schemaMetadata": {
        "schemaName": "hbaseEvent",
        "platform": "urn:li:dataPlatform:hbase",
        "version": 0,
        "created": {
          "time": 0,
          "actor": "urn:li:corpuser:jdoe"
        },
        "lastModified": {
          "time": 0,
          "actor": "urn:li:corpuser:jdoe"
        },
        "hash": "",
        "platformSchema": {
          "kafkaSchema": {
            "documentSchema": "{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.avro2pegasus.events\",\"doc\":\"Header\"}}]}"
          }
        },
        "fields": [
          {
            "fieldPath": "hbase",
            "description": "Bar",
            "nativeDataType": "string",
            "type": {
              "type": {
                "stringType": ""
              }
            }
          }
        ],
        "primaryKeys": [
          "key1",
          "key2",
          "key3"
        ],
        "foreignKeysSpecs": {
          "key1": {
            "foreignKey": {
              "urnForeignKey": {
                "currentFieldPath": "/datalake/dataset"
              }
            }
          },
          "key2": {
            "foreignKey": {
              "urnForeignKey": {
                "currentFieldPath": "/datalake/datasets2"
              }
            }
          }
        }
      }
    }
  ]
}
Error showing that fields -> type is not set:
Copy code
Exception  - com.linkedin.data.avro.DataTranslationException: Error processing /proposedSnapshot/com.linkedin.metadata.snapshot.DatasetSnapshot/aspects/0/com.linkedin.schema.SchemaMetadata/fields/0/type
ERROR :: /proposedSnapshot/com.linkedin.metadata.snapshot.DatasetSnapshot/aspects/0/com.linkedin.schema.SchemaMetadata/fields/0/type :: required field is absent
Also, the way I have structured my JSON layer is that all the types implement the interface SchemaFieldDataType, and based on the name it maps to the corresponding data type:
Copy code
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

/**
 * Base interface for the JSON field data types; the wrapper-object key
 * (e.g. "stringType") selects the concrete subtype during deserialization.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "@type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = JsonBooleanType.class, name = "booleanType"),
    @JsonSubTypes.Type(value = JsonFixedType.class, name = "fixedType"),
    @JsonSubTypes.Type(value = JsonStringType.class, name = "stringType"),
    @JsonSubTypes.Type(value = JsonBytesType.class, name = "bytesType"),
    @JsonSubTypes.Type(value = JsonNumberType.class, name = "numberType"),
    @JsonSubTypes.Type(value = JsonEnumType.class, name = "enumType"),
    @JsonSubTypes.Type(value = JsonNullType.class, name = "nullType"),
    @JsonSubTypes.Type(value = JsonMapType.class, name = "mapType"),
    @JsonSubTypes.Type(value = JsonArrayType.class, name = "arrayType"),
    @JsonSubTypes.Type(value = JsonUnionType.class, name = "unionType"),
    @JsonSubTypes.Type(value = JsonRecordType.class, name = "recordType")
})
public interface JsonSchemaFieldDataTypeType {
}
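As a point of reference, a minimal sketch of exercising that wrapper-object setup; the JsonStringType subtype and demo class here are illustrative rather than the actual project code. Note that with As.WRAPPER_OBJECT the member value is the subtype's own body, so an empty object {} (rather than an empty string) maps cleanly to a no-field bean.
Copy code
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative concrete subtype matching the "stringType" wrapper key above.
class JsonStringType implements JsonSchemaFieldDataTypeType {
}

class WrapperObjectDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // With As.WRAPPER_OBJECT the outer key selects the subtype; the value is the
    // subtype's body, so an empty object {} deserializes into a no-field bean.
    JsonSchemaFieldDataTypeType type =
        mapper.readValue("{\"stringType\": {}}", JsonSchemaFieldDataTypeType.class);
    System.out.println(type.getClass().getSimpleName()); // JsonStringType
  }
}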
n
Thanks for your response @bumpy-keyboard-50565. Is the above codec for the Rest.li implementation? I am using a REST implementation.
b
It's for general Pegasus-JSON serialization/deserialization. It's not limited to rest.li
a
I think it doesn't do deserialization, only serialization.
👍 1
n
Decided to simplify the JSON to just populate the type as a string. Just curious why LinkedIn chose this schema approach.
b
@acceptable-architect-70237 here's an example of how we use the codec to convert from JSON to a DataTemplate (Pegasus): https://github.com/linkedin/datahub/blob/2bdb52b104e00f0aa1c2f1556eee48eec174ef19/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/RecordUtils.java#L86. You can call the wrapper util method directly to simplify your deserialization code.
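For readers following the link, here is a minimal sketch of what calling that wrapper util could look like; the method name toRecordTemplate and its signature are assumptions taken from the linked RecordUtils.java and may differ at other commits.
Copy code
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.schema.SchemaMetadata;

class PegasusJsonToDataTemplate {
  static SchemaMetadata parse(String pegasusJson) throws Exception {
    // pegasusJson is the SchemaMetadata payload in Pegasus JSON encoding. Pegasus
    // keys union members (e.g. the field type) by their fully-qualified record
    // name, such as {"com.linkedin.schema.StringType": {}}, rather than the
    // camelCase names used in the hand-rolled Jackson layer earlier in the thread.
    //
    // Assumption: the wrapper util is a toRecordTemplate(Class, String)-style
    // method; see the linked RecordUtils.java for the exact name and signature.
    return RecordUtils.toRecordTemplate(SchemaMetadata.class, pegasusJson);
  }
}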
@nutritious-bird-77396  I'm not sure I understand what you're trying to achieve exactly. DataTemplate is already an in-memory representation so there's really no reason why you need to create another class (POJO or otherwise) to hold the same info in memory.
a
@bumpy-keyboard-50565 The wrapper util actually works for the purpose. Thanks.
b
Cool. Like I said, I'm still not sure why you want to convert DataTemplate to yet another POJO, or vice versa, though.
n
Sorry for not providing enough background. I am working on providing an API to ingest MCE messages from non-Kafka producers (say, from a Jenkins pipeline). So I am getting a JSON message from the API users, converting it to an MCE, and pushing it to Kafka for the MCE consumers to consume. The API just sits at the forefront of DataHub without intruding on any of DataHub's internal workings. Once the JSON message is converted to a DataTemplate, I will convert it to a GenericRecord to push to Kafka.
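A rough sketch of that final DataTemplate => GenericRecord step, using Pegasus's DataTranslator (the same translation layer the DataTranslationException above comes from); the exact overload and the use of SchemaMetadata as the example payload are assumptions.
Copy code
import com.linkedin.data.avro.DataTranslator;
import com.linkedin.schema.SchemaMetadata;
import org.apache.avro.generic.GenericRecord;

class DataTemplateToAvroSketch {
  // Converts a populated DataTemplate into an Avro GenericRecord so it can be
  // handed to a Kafka producer. Assumption: the two-argument
  // DataTranslator.dataMapToGenericRecord derives the Avro schema from the
  // Pegasus schema of the record.
  static GenericRecord toGenericRecord(SchemaMetadata aspect) throws Exception {
    return DataTranslator.dataMapToGenericRecord(aspect.data(), aspect.schema());
  }
}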
b
Understood. In that case why not start with Avro directly?
Avro in JSON I meant.
Saves you from double conversion.
n
After you pointed us towards the RecordTemplate conversion, I am moving in that direction:
Copy code
{
  "urn": "urn:li:dataset:(urn:li:dataPlatform:streamPlatform,clickstream,TEST)",
  "aspects": [
    {
      "com.linkedin.common.Ownership": {
        "owners": [
          {
            "owner": "urn:li:corpuser:johndoe",
            "type": "DEVELOPER",
            "source": {
              "type": "MANUAL",
              "url": ""
            }
          }
        ],
        "lastModified": {
          "actor": "urn:li:corpuser:johndoe",
          "impersonator": "urn:li:corpuser:alice",
          "time": 1597269355
        }
      }
    }
  ]
}
b
What I meant is that instead of
Pegasus-compatible JSON (API) => Data Template => Generic Record (Kafka) => Data Template (MCE consumer),
why not just
Avro JSON (API) => Generic Record (Kafka) => Data Template (MCE consumer)?
n
Is there a utility to do this job for MCE?
Avro JSON (API) => Generic Record (Kafka)?
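For that step, plain Avro can go from Avro-encoded JSON to a GenericRecord without a DataHub-specific utility; a minimal sketch, assuming the MCE Avro schema is available as an .avsc file on disk (adjust to however your build exposes the schema).
Copy code
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

class AvroJsonToGenericRecord {
  static GenericRecord parse(String avroJson) throws Exception {
    // Assumption: the MCE schema is loaded from an .avsc file; it could equally
    // come from a generated Avro class in your project.
    Schema mceSchema = new Schema.Parser().parse(new File("MetadataChangeEvent.avsc"));
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(mceSchema);
    // The JSON decoder parses Avro's JSON encoding against the same schema.
    Decoder decoder = DecoderFactory.get().jsonDecoder(mceSchema, avroJson);
    return reader.read(null, decoder);
  }
}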
n
Let me give this a try. Thanks!