David Vittori
08/24/2024, 3:50 PM
public BaseEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
        long approxArrivalTimestamp, String stream, String shardId) throws IOException {
    JsonNode jsonNode = objectMapper.readTree(recordValue);
    // Determine the event type from the JSON and deserialize accordingly
    if (!jsonNode.has("key")) {
        return null;
    }
    String key = jsonNode.get("key").asText();
    try {
        switch (partitionKey) {
            case EventConstants.KEY_EVENT_TRIP_ENDED:
                return objectMapper.treeToValue(jsonNode, TripEndedEvent.class);
            case EventConstants.KEY_EVENT_TRIP_REPORTED:
                return objectMapper.treeToValue(jsonNode, TripReportedEvent.class);
            // Add more cases for other event types
            default:
                return null;
        }
    } catch (Exception e) {
        Log.error("Failed to deserialize event", e);
        return null;
    }
}
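(As a side note on the switch itself: if the set of event types keeps growing, one option is a registry mapping partition keys to factories. This is a minimal self-contained sketch, not the code from the thread — the constant values are made up and the factories are stubs; in the real deserializer each factory would call objectMapper.treeToValue(...).)

```java
import java.util.Map;
import java.util.function.Function;

public class EventRegistrySketch {
    interface BaseEvent {}
    static class TripEndedEvent implements BaseEvent {}
    static class TripReportedEvent implements BaseEvent {}

    // Hypothetical partition-key constants, standing in for EventConstants.
    static final String KEY_EVENT_TRIP_ENDED = "trip_ended";
    static final String KEY_EVENT_TRIP_REPORTED = "trip_reported";

    // Registry: partition key -> factory producing the concrete event.
    // Real factories would deserialize the JSON payload via Jackson.
    static final Map<String, Function<String, BaseEvent>> REGISTRY = Map.of(
            KEY_EVENT_TRIP_ENDED, json -> new TripEndedEvent(),
            KEY_EVENT_TRIP_REPORTED, json -> new TripReportedEvent());

    static BaseEvent deserialize(String partitionKey, String json) {
        Function<String, BaseEvent> factory = REGISTRY.get(partitionKey);
        // Unknown keys yield null, matching the original switch's default branch
        return factory == null ? null : factory.apply(json);
    }
}
```

Adding a new event type then becomes one registry entry instead of a new case label.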
However, I'm running into a ClassCastException that surfaces as the following runtime error
java.lang.RuntimeException: Could not extract key from com.codistica.services.etl.events.TripReportedEvent@6169c6c5
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
when I try to filter the stream by event type. It seems that Flink encounters an issue when it tries to extract the key, possibly due to how I've set up the deserialization or the filtering process.
return eventsStream
        .filter(event -> event instanceof TripEndedEvent)
        .map(event -> (TripEndedEvent) event)
        .returns(TypeInformation.of(TripEndedEvent.class));
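(For anyone reading along: the "Could not extract key" message at KeyGroupStreamPartitioner.selectChannel is usually a wrapper — Flink catches whatever the keyBy KeySelector throws, here most likely the ClassCastException, and rethrows it as a RuntimeException. A plain-Java sketch of that behavior, with a hypothetical tripId field and no Flink dependency:)

```java
public class KeySelectorFailureSketch {
    interface BaseEvent {}
    static class TripEndedEvent implements BaseEvent { String tripId = "t-1"; }
    static class TripReportedEvent implements BaseEvent {}

    // A key selector written for one subtype -- analogous to
    // keyBy(e -> ((TripEndedEvent) e).tripId) on a DataStream<BaseEvent>.
    static String keyOf(BaseEvent e) {
        return ((TripEndedEvent) e).tripId; // ClassCastException for other subtypes
    }

    // The partitioner wraps whatever the selector throws, which is why the
    // stack trace blames key extraction rather than the original cast.
    static String selectChannel(BaseEvent record) {
        try {
            return keyOf(record);
        } catch (Exception ex) {
            throw new RuntimeException("Could not extract key from " + record, ex);
        }
    }
}
```

So the trace points at the partitioner, but the fix belongs in the key selector, or in making sure only the expected subtype reaches that keyBy.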
My question is: Is this the best approach for handling multiple event types in Flink, or is there a better pattern or configuration I should follow?
Any insights or suggestions would be greatly appreciated!

D. Draco O'Brien
08/25/2024, 9:47 AM
public interface BaseEvent {}
public class TripEndedEvent implements BaseEvent {...}
public class TripReportedEvent implements BaseEvent {...}
public class EventWrapper {
    private final BaseEvent event;

    public EventWrapper(BaseEvent event) {
        this.event = event;
    }

    public BaseEvent getEvent() {
        return event;
    }

    // Optionally, you can add methods to check the type of event
    public boolean isTripEndedEvent() {
        return event instanceof TripEndedEvent;
    }
    // And so on...
}
Then, modify your deserializer to output EventWrapper instances:
public EventWrapper deserialize(...) {
    ...
    switch (partitionKey) {
        case EventConstants.KEY_EVENT_TRIP_ENDED:
            return new EventWrapper(objectMapper.treeToValue(jsonNode, TripEndedEvent.class));
        case EventConstants.KEY_EVENT_TRIP_REPORTED:
            return new EventWrapper(objectMapper.treeToValue(jsonNode, TripReportedEvent.class));
        // Other cases
        default:
            return null;
    }
}
In your Flink pipeline, instead of filtering and casting directly, you can use map operations along with the wrapper’s methods to handle different event types:
DataStream<EventWrapper> eventsWrapped = ... // your deserialized stream as EventWrappers
DataStream<TripEndedEvent> tripEndedEvents =
        eventsWrapped
                .filter(wrapper -> wrapper.isTripEndedEvent())
                .map(wrapper -> (TripEndedEvent) wrapper.getEvent()) // explicit cast: getEvent() returns BaseEvent
                .returns(TypeInformation.of(TripEndedEvent.class));
// similarly, handle other event types
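(Outside of Flink, the same filter-then-map split can be sketched with plain Java streams; the names mirror the classes above but this is a standalone simulation, not Flink code. Note the explicit cast in the map step — getEvent() returns BaseEvent, so without it the result cannot be narrowed to the subtype:)

```java
import java.util.List;

public class SplitSketch {
    interface BaseEvent {}
    static class TripEndedEvent implements BaseEvent {}
    static class TripReportedEvent implements BaseEvent {}
    record EventWrapper(BaseEvent event) {}

    // Mirrors filter(wrapper.isTripEndedEvent()).map(wrapper.getEvent()):
    // the instanceof check guarantees the cast in the map step is safe.
    static List<TripEndedEvent> tripEnded(List<EventWrapper> wrapped) {
        return wrapped.stream()
                .filter(w -> w.event() instanceof TripEndedEvent)
                .map(w -> (TripEndedEvent) w.event())
                .toList();
    }
}
```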
This way you get type safety and avoid the ClassCastException, plus it scales well with additional event types. Just remember to configure your serializers/deserializers to handle the EventWrapper type.

David Vittori
08/28/2024, 4:47 PM