Jotham Wong
08/15/2024, 5:21 AM
D. Draco O'Brien
08/15/2024, 6:55 AM
D. Draco O'Brien
08/15/2024, 6:56 AM
D. Draco O'Brien
08/15/2024, 7:11 AM
Jotham Wong
08/15/2024, 8:27 AM
Jotham Wong
08/15/2024, 8:27 AM
Jotham Wong
08/15/2024, 8:28 AM
Jotham Wong
08/15/2024, 8:28 AM
Jotham Wong
08/15/2024, 8:29 AM
D. Draco O'Brien
08/15/2024, 12:06 PM
D. Draco O'Brien
08/15/2024, 12:14 PM
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
public class AnomalyDetectingSink extends RichSinkFunction<YourDataEventType> implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectingSink.class);
private Set<String> seenIds = new HashSet<>(); // Assuming each event has a unique ID
@Override
public void invoke(YourDataEventType value, Context context) throws Exception {
String eventId = value.getUniqueId(); // Replace with actual method to retrieve unique identifier
// detect duplicate
if (seenIds.contains(eventId)) {
LOG.warn("Detected duplicate event: {}", eventId);
} else {
seenIds.add(eventId);
// log or process the event as usual
<http://LOG.info|LOG.info>("Received: {}", value);
// write to storage if necessary
}
// gap detection logic, if applicable based on sequence/time
}
// implementing CheckpointedFunction
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// Get the list state
ListState<String> seenIdsState = getRuntimeContext().getCheckpointedState("seenIds",
new ListStateDescriptor<>("seenIds", String.class));
// clear and add all elements to the state
seenIdsState.clear();
for (String id : seenIds) {
seenIdsState.add(id);
}
}
// restoring state
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Retrieve the list state descriptor
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("seenIds", String.class);
// get the union state (handles both restore from savepoint and checkpoint)
ListState<String> seenIdsState = context.getOperatorStateStore().getUnionListState(descriptor);
if (context.isRestored()) {
// Restore the state if the job is being restored from a savepoint or checkpoint
for (String id : seenIdsState.get()) {
seenIds.add(id);
}
}
}
}
D. Draco O'Brien
08/15/2024, 12:14 PM
Jotham Wong
08/16/2024, 2:14 AM
D. Draco O'Brien
08/16/2024, 2:46 PM
D. Draco O'Brien
08/16/2024, 2:49 PM
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
D. Draco O'Brien
08/16/2024, 2:54 PM
// implementing CheckpointedFunction
// Revised snapshotState: copies the in-memory seenIds set into a ListState named "seenIds".
// NOTE(review): getRuntimeContext().getListState(...) returns *keyed* state, which per the
// Flink RuntimeContext docs is only accessible when the function runs on a KeyedStream.
// For this non-keyed sink it should not be used; instead obtain operator ListState once in
// initializeState() via context.getOperatorStateStore() (getListState or getUnionListState,
// matching the restore side), keep it in a transient field, and write to that field here.
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// Retrieve the list state descriptor
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("seenIds", String.class);
// Get the list state and update it with current seenIds
ListState<String> seenIdsState = getRuntimeContext().getListState(descriptor);
// Drop the previous contents, then append every currently seen ID.
seenIdsState.clear();
for (String id : seenIds) {
seenIdsState.add(id);
}
}