# troubleshooting
s
Hi everyone, We are currently using a POJO for storing data in Flink state. I understand that Flink utilizes the `PojoSerializer` for serializing POJOs. However, I have a question regarding how Flink handles serialization when one of the fields in the POJO is a generic type. Specifically, does the presence of a generic field cause the entire POJO to fall back to Kryo serialization, or will only that specific generic field fall back to Kryo serialization while the rest of the POJO continues to use `PojoSerializer`? Any insights would be greatly appreciated!
d
Since Flink 1.12, TypeInformationRawSerializer was added, which improved the use of TypeInfo and type hints. You want to use these to reduce the likelihood of a fallback to Kryo. A generic field could potentially cause Flink to default to Kryo for the entire object, but providing explicit type information can help keep efficient `PojoSerializer` serialization for the rest of the POJO.
Flink 1.14+ improved GenericTypeInfo and other serializers to make native serialization more flexible.
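```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
// The anonymous subclass ({}) lets Flink capture the full generic type MyPojo<String>
TypeInformation<MyPojo<String>> typeInfo = TypeInformation.of(new TypeHint<MyPojo<String>>() {});
// e.g. pass it to a state descriptor: new ValueStateDescriptor<>("myState", typeInfo)
```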
With this kind of guidance you can reduce the likelihood of Flink falling back to Kryo.
If that does not work, you can also consider implementing a custom serializer, either for specific fields or for the entire POJO, that handles the generic part correctly. So in summary: use TypeInformation and TypeHint first, and if that does not give you what you need, you are looking at a custom serializer.
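A note on why the `{}` after `new TypeHint<...>()` matters. It creates an anonymous subclass, and an anonymous subclass records its type argument in its generic superclass signature. That signature survives erasure. The sketch below shows this "super type token" idea with plain JDK reflection. `TypeToken` and `MyPojo` are hypothetical stand-ins, not Flink's actual `TypeHint` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class SuperTypeTokenDemo {

    // Hypothetical generic POJO, standing in for MyPojo<T> from the snippet above
    public static class MyPojo<T> {
        public String name;
        public T data;
    }

    // Hypothetical, simplified stand-in for Flink's TypeHint (not the real class):
    // it is abstract, so callers must create an anonymous subclass with {}
    public abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            // The anonymous subclass stores its type argument in its generic superclass
            // signature, which survives erasure and can be read back via reflection
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // A class literal only carries the raw class; a <String> argument cannot be expressed
        Class<?> raw = MyPojo.class;
        System.out.println("class literal: " + raw.getName());

        // The anonymous subclass keeps the full MyPojo<String> type
        Type captured = new TypeToken<MyPojo<String>>() {}.getType();
        System.out.println("captured:      " + captured.getTypeName());
    }
}
```

The captured `Type` is a `ParameterizedType` whose raw type is `MyPojo` and whose argument is `String`. That is the information a class literal alone cannot provide.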
k
Hi @Swaraj - If you’re on a recent version of Flink (1.16 or later) then you can use PojoTestUtils to verify that your POJO will be serialized without using Kryo (via the assertSerializedAsPojoWithoutKryo method). I’m pretty sure that if your class has a field that’s a generic, you’ll wind up falling back to Kryo. I tried with this, and it failed the test:
```java
public static class TestClassGeneric<T> {
    private String name;
    private T data;

    public TestClassGeneric() {}

    public TestClassGeneric(String name, T data) {
        this.name = name;
        this.data = data;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}
```
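Here is a plausible reason why the generic field trips the check, shown with plain JDK reflection (no Flink involved). When `TestClassGeneric` is analyzed as a class on its own, `name` has a concrete type. `data` has only the type variable `T`. As far as I understand, that leaves Flink's type extraction nothing concrete to pick a field serializer from, so it treats that field as a generic type. `GenericFieldInspection` and `declaredType` are hypothetical names for this sketch.

```java
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class GenericFieldInspection {

    // Trimmed copy of TestClassGeneric above: same two fields, accessors omitted
    public static class TestClassGeneric<T> {
        private String name;
        private T data;
    }

    // Declared (generic) type of a field, as reflection reports it for the class on its own
    static Type declaredType(String fieldName) {
        try {
            return TestClassGeneric.class.getDeclaredField(fieldName).getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        // 'name' has a concrete type, so a type-specific serializer can be chosen for it
        System.out.println("name -> " + declaredType("name").getTypeName()); // name -> java.lang.String

        // 'data' is only the type variable T: nothing says what T will be at runtime
        Type data = declaredType("data");
        System.out.println("data -> " + data.getTypeName()
                + " (type variable: " + (data instanceof TypeVariable) + ")"); // data -> T (type variable: true)
    }
}
```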
d
That’s a good find. Does PojoTestUtils react to a TypeHint or TypeInformation on a generic?
Is there any way to pass it the TypeHint or TypeInformation?
s
Thanks @Ken Krugler @D. Draco O'Brien.
I tested it with my POJO (ran two asserts from PojoTestUtils):
```
1) assertSerializedAsPojo - passed.
2) assertSerializedAsPojoWithoutKryo - failed with exception: java.lang.AssertionError: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
	at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojoWithoutKryo(PojoTestUtils.java:79)
```
Does this mean that some fields will be serialized with the POJO serializer and some fields will be serialized with Kryo? (FYI, we use Flink 1.17.)
d
In the absence of a TypeHint or TypeInformation, probably yes.
A good improvement to that library would be to allow a TypeHint or TypeInfo to be passed in, so that it responds accordingly.
Here is a detailed description of how TypeInfo and TypeHinting are applied: https://www.alibabacloud.com/blog/596626
Once again
```java
final TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
```
You use TypeInformation for this special case. I believe the library's test class will only tell you whether the class can be serialized without a type hint or explicit type information.
k
@Swaraj - the test failure means that the entire record will be serialized using Kryo.
@D. Draco O'Brien - You can use the `@TypeInfo` annotation on the `List` field in the record to avoid falling back to Kryo. See https://stackoverflow.com/questions/78046867/listt-member-of-pojo-flink-type for a good description of this. But note that it doesn’t work if the list has a generic parameter. You could probably create a custom Kryo or Flink serializer for better performance, but that’s non-trivial. So I’d suggest using a concrete class.
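Ken distinguishes a list with a concrete element type from one with a generic parameter. Plain JDK reflection shows that difference. This sketch uses hypothetical class names and no Flink dependency. It only shows how much type information each field declaration carries. It does not exercise `@TypeInfo` itself.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class ListElementTypeCheck {

    // Hypothetical record with a generic list field (element type unknown)
    public static class GenericHolder<T> {
        public List<T> items;
    }

    // Hypothetical record with a concrete list field (element type known)
    public static class ConcreteHolder {
        public List<String> items;
    }

    // Returns the declared element type of the public field 'items'
    static Type elementType(Class<?> holder) {
        try {
            Type fieldType = holder.getField("items").getGenericType();
            return ((ParameterizedType) fieldType).getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(holder + " has no public 'items' field", e);
        }
    }

    public static void main(String[] args) {
        // Concrete element type: an element serializer could be chosen up front
        System.out.println(elementType(ConcreteHolder.class).getTypeName()); // java.lang.String
        // Generic element type: only the type variable T is visible
        System.out.println(elementType(GenericHolder.class).getTypeName());  // T
    }
}
```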
s
@Ken Krugler, the assertSerializedAsPojo test passed. As per the docs, at least some fields of the POJO should be serialized without Kryo.
k
Hi @Swaraj - yes, sorry…I was using an older copy of PojoTestUtils (prior to 1.16 this wasn’t public) which I had modified for my own testing. You’ll wind up with a PojoSerializer that contains a mix of Kryo and Flink serializers (e.g. StringSerializer, IntSerializer, PojoSerializer).
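The structure Ken describes can be pictured with a toy model in plain Java. The record is still written field by field, and only the field the analyzer couldn't type goes through a generic fallback. All class names here are hypothetical. This is not Flink's actual `PojoSerializer`/`KryoSerializer` code, and Java serialization stands in for Kryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class MixedFieldSerializerModel {

    // Hypothetical per-field serializer contract (toy stand-in for Flink's TypeSerializer)
    interface FieldSerializer<T> {
        void write(T value, DataOutputStream out) throws IOException;
        T read(DataInputStream in) throws IOException;
    }

    // Type-specific serializer, analogous in spirit to Flink's StringSerializer
    static final class StringFieldSerializer implements FieldSerializer<String> {
        public void write(String value, DataOutputStream out) throws IOException { out.writeUTF(value); }
        public String read(DataInputStream in) throws IOException { return in.readUTF(); }
    }

    // Catch-all serializer for a field the analyzer could not type;
    // Java serialization is used here only as a stand-in for Kryo
    static final class FallbackFieldSerializer<T> implements FieldSerializer<T> {
        public void write(T value, DataOutputStream out) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
                oos.writeObject(value);
            }
            byte[] bytes = buffer.toByteArray();
            out.writeInt(bytes.length); // length prefix so the next field can be located
            out.write(bytes);
        }

        @SuppressWarnings("unchecked")
        public T read(DataInputStream in) throws IOException {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) ois.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }
    }

    // Hypothetical POJO: one concrete field, one field that needs the fallback
    public static class Event {
        public String name;
        public List<String> tags = new ArrayList<>();
    }

    // The record-level serializer walks the fields in a fixed order, each with its own serializer
    static final FieldSerializer<String> NAME = new StringFieldSerializer();
    static final FieldSerializer<List<String>> TAGS = new FallbackFieldSerializer<>();

    static byte[] serialize(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            NAME.write(event.name, out);
            TAGS.write(event.tags, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Event deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Event event = new Event();
            event.name = NAME.read(in);
            event.tags = TAGS.read(in);
            return event;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Event original = new Event();
        original.name = "click";
        original.tags.add("mobile");
        Event copy = deserialize(serialize(original));
        System.out.println(copy.name + " " + copy.tags); // click [mobile]
    }
}
```

In this toy, only the `tags` bytes pay the cost of the generic path, while `name` keeps its compact dedicated encoding. That is the "mix" in the answer above.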