# troubleshooting
d
Hi all, I'm having a tough time getting a handle on Data Types & Serialization in a specific use case. I've written a rule engine that uses the broadcast state pattern to distribute the rules. The rules are quite complicated Java composite classes (hierarchical and with many fields). I had a concrete problem with a schema change that I thought was covered by https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/, but as I have now discovered, it fails because serialization falls back to Kryo for generic types. So now I've started trying to avoid Kryo and have set up a test with
`PojoTestUtils.assertSerializedAsPojoWithoutKryo`
as detailed here https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/, and started fixing the problems it reports. Most were initially List/Map problems, which I have fixed with @TypeInfo and TypeInfoFactory implementations. Now I have run into a problem with an interface type similar to the following:
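```java
import static com.fasterxml.jackson.annotation.JsonTypeInfo.As.PROPERTY;
import static com.fasterxml.jackson.annotation.JsonTypeInfo.Id.NAME;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.List;

// Jackson stores the concrete subtype's logical name in the "conditionType" property.
@JsonTypeInfo(use = NAME, include = PROPERTY, property = "conditionType")
@JsonSubTypes({
        @JsonSubTypes.Type(value = NumericComparisonCondition.class, name = "NumericComparison"),
        @JsonSubTypes.Type(value = StringComparisonCondition.class, name = "StringComparison"),
        // ...
})
public interface Condition {
    boolean checkMatch(Event eventContent);
    List<String> validate();
}
```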
Is there any quick way around this? I presumed the fix would involve writing my own TypeInformation and TypeSerializer, but I have found no information on this on the internet. I have found references to the need to do so, but no description of how, and no samples. I think most of the methods to implement on TypeInformation are obvious enough, apart from
```java
@Override
public int getArity() {
    return 0;
}

@Override
public int getTotalFields() {
    return 0;
}
```
I have an idea of what they mean, but no idea how to implement them for the class above. Any tips? I would use JSON/Jackson as the basis of the serde. I also had the idea of just creating a TypeSerializer for the whole Rule using Jackson, but I ran into the same question there.
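For a type like Condition that Flink should treat as one opaque value, returning 1 from both getArity() and getTotalFields() matches, as far as I know, what Flink's own GenericTypeInfo (the Kryo fallback) reports. Below is a minimal sketch under that assumption, written against the Flink 1.17 API linked above. OpaqueTypeInfo is a hypothetical name, and all actual (de)serialization is delegated to a TypeSerializer passed into the constructor; as far as I know, later Flink releases add a createSerializer overload taking a SerializerConfig, so check the Javadoc for your version.

```java
import java.util.Objects;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
 * Hypothetical TypeInformation for a type Flink should treat as a single opaque
 * (atomic) value, serialized entirely by the supplied TypeSerializer.
 */
public class OpaqueTypeInfo<T> extends TypeInformation<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> typeClass;
    private final TypeSerializer<T> serializer;

    public OpaqueTypeInfo(Class<T> typeClass, TypeSerializer<T> serializer) {
        this.typeClass = Objects.requireNonNull(typeClass);
        this.serializer = Objects.requireNonNull(serializer);
    }

    @Override
    public boolean isBasicType() {
        return false; // not one of Flink's built-in basic types (String, Integer, ...)
    }

    @Override
    public boolean isTupleType() {
        return false;
    }

    // The whole value occupies one field in Flink's flat schema.
    @Override
    public int getArity() {
        return 1;
    }

    // No nested fields are exposed, so the value itself is the only field
    // (the TypeInformation Javadoc asks for at least 1).
    @Override
    public int getTotalFields() {
        return 1;
    }

    @Override
    public Class<T> getTypeClass() {
        return typeClass;
    }

    @Override
    public boolean isKeyType() {
        return false; // not declared usable as a key, e.g. for keyBy on this type
    }

    @Override
    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
        // duplicate() gives each user its own copy if the serializer is stateful
        return serializer.duplicate();
    }

    @Override
    public String toString() {
        return "OpaqueTypeInfo<" + typeClass.getName() + ">";
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof OpaqueTypeInfo)) {
            return false;
        }
        OpaqueTypeInfo<?> other = (OpaqueTypeInfo<?>) obj;
        return other.canEqual(this)
                && typeClass == other.typeClass
                && serializer.equals(other.serializer);
    }

    @Override
    public int hashCode() {
        return Objects.hash(typeClass, serializer);
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof OpaqueTypeInfo;
    }
}
```

In a TypeInfoFactory<Condition> (the same mechanism used for the List/Map fields), createTypeInfo could then return `new OpaqueTypeInfo<>(Condition.class, new ConditionSerializer())`, where ConditionSerializer is a hypothetical Jackson-based TypeSerializer<Condition> you would still have to write. Because the value is opaque, evolving its format then depends on that serializer and its TypeSerializerSnapshot rather than on Flink's POJO schema evolution.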
p
I was trying to use Kotlin data classes and found this repository with some TypeInformation implementations; maybe it helps: https://github.com/cyberdelia/flink-kotlin
I am curious what needs to be done for a generic `Map<String, Object>` property field inside a POJO.
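As far as I know, Flink cannot derive type information for `Object`, so the values of a `Map<String, Object>` field end up with a generic (Kryo) serializer. One hedged option is to restrict the values to a closed set of types and write each one with an explicit type tag inside your own TypeSerializer. The sketch below uses only `java.io.DataOutput`/`DataInput`; Flink's DataOutputView and DataInputView extend those interfaces, so the same methods could be called from a TypeSerializer's serialize/deserialize. TaggedMapCodec and the chosen set of value types are assumptions for illustration.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical codec: each value is a one-byte type tag followed by its payload. */
public final class TaggedMapCodec {

    private static final byte NULL = 0;
    private static final byte STRING = 1;
    private static final byte LONG = 2;
    private static final byte DOUBLE = 3;
    private static final byte BOOLEAN = 4;
    private static final byte MAP = 5;

    private TaggedMapCodec() {}

    public static void writeMap(Map<String, Object> map, DataOutput out) throws IOException {
        out.writeInt(map.size());
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            writeString(entry.getKey(), out);
            writeValue(entry.getValue(), out);
        }
    }

    public static Map<String, Object> readMap(DataInput in) throws IOException {
        int size = in.readInt();
        Map<String, Object> map = new LinkedHashMap<>(); // keeps the written order
        for (int i = 0; i < size; i++) {
            String key = readString(in);
            map.put(key, readValue(in));
        }
        return map;
    }

    @SuppressWarnings("unchecked")
    private static void writeValue(Object value, DataOutput out) throws IOException {
        if (value == null) {
            out.writeByte(NULL);
        } else if (value instanceof String) {
            out.writeByte(STRING);
            writeString((String) value, out);
        } else if (value instanceof Long) {
            out.writeByte(LONG);
            out.writeLong((Long) value);
        } else if (value instanceof Double) {
            out.writeByte(DOUBLE);
            out.writeDouble((Double) value);
        } else if (value instanceof Boolean) {
            out.writeByte(BOOLEAN);
            out.writeBoolean((Boolean) value);
        } else if (value instanceof Map) {
            out.writeByte(MAP);
            writeMap((Map<String, Object>) value, out); // assumes nested maps have String keys
        } else {
            // Fail loudly instead of silently falling back to a generic serializer.
            throw new IOException("Unsupported value type: " + value.getClass().getName());
        }
    }

    private static Object readValue(DataInput in) throws IOException {
        byte tag = in.readByte();
        switch (tag) {
            case NULL: return null;
            case STRING: return readString(in);
            case LONG: return in.readLong();
            case DOUBLE: return in.readDouble();
            case BOOLEAN: return in.readBoolean();
            case MAP: return readMap(in);
            default: throw new IOException("Unknown value tag: " + tag);
        }
    }

    // Length-prefixed UTF-8 avoids writeUTF's 64 KiB limit on encoded strings.
    private static void writeString(String s, DataOutput out) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private static String readString(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The trade-off is that unsupported value types (for example Integer or List) are rejected at serialization time; extending the set means adding a tag, and keeping old tags stable so previously written state stays readable.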
d
Actually, I am experimenting with that as well: the rule engine above works on events whose data is a `Map<String, Object>`. I'll let you know how I get on. What I am currently trying is returning 1 from both getTotalFields and getArity for my types. Some tidbits point in this direction: https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html says "Not all fields from a type are mapped to a separate fields in the flat schema and often, entire types are mapped to one field." Google Bard said the following: "This information is used by Flink to optimize the processing of the data. For example, Flink can use this information to determine how much memory to allocate for the data and how to efficiently serialize and deserialize the data." Of course, that is to be taken with a grain of salt. Somewhere else I remember reading that the type information is only fetched once, and therefore getTotalFields and getArity should always return the same number. A quick test did not seem to throw up any problems. It would be good to get input from someone on the team, though. The documentation is pretty thin...