Swaraj
08/24/2024, 7:35 AMPojoSerializer
for serializing POJOs. However, I have a question regarding how Flink handles serialization when one of the fields in the POJO is a generic type. Specifically, does the presence of a generic field cause the entire POJO to fall back to Kryo serialization, or will only that specific generic field fall back to Kryo serialization while the rest of the POJO continues to use PojoSerializer
?
Any insights would be greatly appreciated!D. Draco O'Brien
08/24/2024, 9:40 AMD. Draco O'Brien
08/24/2024, 9:41 AMD. Draco O'Brien
08/24/2024, 9:48 AMimport org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeHint; TypeInformation.of(new TypeHint<MyPojo<String>>(){ }));
D. Draco O'Brien
08/24/2024, 9:49 AMD. Draco O'Brien
08/24/2024, 9:51 AMKen Krugler
08/24/2024, 11:21 PMpublic static class TestClassGeneric<T> {
private String name;
private T data;
public TestClassGeneric() {}
public TestClassGeneric(String name, T data) {
this.name = name;
this.data = data;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
D. Draco O'Brien
08/25/2024, 9:24 AMD. Draco O'Brien
08/25/2024, 9:25 AMSwaraj
08/25/2024, 11:27 AMSwaraj
08/25/2024, 11:32 AM1)assertSerializedAsPojo - passed.
2)assertSerializedAsPojoWithoutKryo - Failed with Exception.java.lang.AssertionError: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojoWithoutKryo(PojoTestUtils.java:79)
Does this mean , some fields will be serialized with POJO serializer and some field will be kryo serialized???.
(FYI we use flink 1.17)D. Draco O'Brien
08/25/2024, 12:25 PMD. Draco O'Brien
08/25/2024, 12:26 PMD. Draco O'Brien
08/25/2024, 12:29 PMD. Draco O'Brien
08/25/2024, 12:31 PMfinal TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
D. Draco O'Brien
08/25/2024, 12:32 PMKen Krugler
08/25/2024, 5:09 PMKen Krugler
08/25/2024, 5:26 PM@TypeInfo
annotation on the List
field in the record to avoid falling back to Kryo. See https://stackoverflow.com/questions/78046867/listt-member-of-pojo-flink-type for a good description of this.
But note that it doesn’t work if the list has a generic parameter. You could probably create a custom Kryo or Flink serializer for better performance, but that’s non-trivial. So I’d suggest using a concrete class.Swaraj
08/26/2024, 5:43 AMKen Krugler
08/26/2024, 4:11 PM