# troubleshooting
j
Hi, if I wanted to write a program to intentionally fail some nodes to show missing data and data duplication, what is the best way to go about it?
d
How are you deploying your Flink cluster? Which nodes are you planning to fail?
If you are talking about docker containers you could use a tool like https://github.com/alexei-led/pumba
j
Thanks, I got the container failure down
What I wanted to know is which sink I could use to easily show data duplication and missing data
With the filesink, the data does not get committed if I skip a checkpoint
For context: I am working on a research project to allow different nodes in the job graph to have different checkpointing guarantees
I am also deploying my Apache Flink cluster via Java processes
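To make the duplication case concrete, here is a minimal plain-Java sketch (no Flink dependency) of a source that rewinds to its last checkpointed offset after one failure while the sink writes every record immediately. It is a simplified model under stated assumptions, not how Flink is implemented; `ReplaySimulation`, `run`, and all parameters are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplaySimulation {

    /**
     * Emits records 0..total-1 to an in-memory "sink" that writes immediately.
     * The source offset is snapshotted every checkpointInterval records, and a
     * single failure is injected right after record failAfter is written.
     * On failure the source rewinds to the last snapshotted offset.
     */
    public static List<Integer> run(int total, int checkpointInterval, int failAfter) {
        List<Integer> sinkOutput = new ArrayList<>();
        int checkpointedOffset = 0; // offset stored by the last completed checkpoint
        boolean failed = false;     // the failure is injected only once
        int offset = 0;

        while (offset < total) {
            sinkOutput.add(offset); // non-transactional write: visible at once
            offset++;

            if (offset % checkpointInterval == 0) {
                checkpointedOffset = offset; // checkpoint completes here
            }
            if (!failed && offset - 1 == failAfter) {
                failed = true;
                offset = checkpointedOffset; // restart from the last checkpoint
            }
        }
        return sinkOutput;
    }

    public static void main(String[] args) {
        // Checkpoint every 4 records, fail once after record 5 was written.
        System.out.println(run(10, 4, 5));
        // prints [0, 1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9]
    }
}
```

Records 4 and 5 appear twice because they were written after the last checkpoint and then replayed. A sink that only commits on checkpoint completion hides this effect, which matches the file sink behaviour described above.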
d
I think you can do the detection using a ProcessFunction placed at the end of your processing pipeline. This should definitely work for gap detection. A sink would also work.
Otherwise, implement a RichSinkFunction, something like this:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;

public class AnomalyDetectingSink extends RichSinkFunction<YourDataEventType> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectingSink.class);
    private Set<String> seenIds = new HashSet<>(); // Assuming each event has a unique ID

    @Override
    public void invoke(YourDataEventType value, Context context) throws Exception {
        String eventId = value.getUniqueId(); // Replace with actual method to retrieve unique identifier

        // detect duplicate
        if (seenIds.contains(eventId)) {
            LOG.warn("Detected duplicate event: {}", eventId);
        } else {
            seenIds.add(eventId);
            // log or process the event as usual
            LOG.info("Received: {}", value);
            // write to storage if necessary
        }
        
        // gap detection logic, if applicable based on sequence/time
    }
    
    // implementing CheckpointedFunction
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Get the list state
        ListState<String> seenIdsState = getRuntimeContext().getCheckpointedState("seenIds", 
            new ListStateDescriptor<>("seenIds", String.class));
        
        // clear and add all elements to the state
        seenIdsState.clear();
        for (String id : seenIds) {
            seenIdsState.add(id);
        }
    }
    
    // restoring state
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Retrieve the list state descriptor
        ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("seenIds", String.class);
        
        // get the union list state: on restore, every parallel sink instance receives the full list of IDs
        ListState<String> seenIdsState = context.getOperatorStateStore().getUnionListState(descriptor);
        
        if (context.isRestored()) {
            // Restore the state if the job is being restored from a savepoint or checkpoint
            for (String id : seenIdsState.get()) {
                seenIds.add(id);
            }
        }
    }
}
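The "gap detection logic" placeholder in `invoke` could be filled in along these lines. This is a minimal plain-Java sketch, assuming every event carries a monotonically assigned sequence number starting at 0; that assumption is not stated in the thread. `SequenceAnomalyDetector`, `observe`, `duplicates`, and `missing` are hypothetical names, and the class has no Flink dependency so the logic can be checked on its own before being called from a sink or ProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SequenceAnomalyDetector {
    private final Set<Long> seen = new HashSet<>();         // every sequence number observed so far
    private final List<Long> duplicates = new ArrayList<>(); // repeated arrivals, in arrival order
    private long highestSeen = -1L;                          // -1 means nothing observed yet

    /** Records one sequence number; returns true if it had already been seen. */
    public boolean observe(long seq) {
        if (!seen.add(seq)) {
            duplicates.add(seq);
            return true;
        }
        highestSeen = Math.max(highestSeen, seq);
        return false;
    }

    /** Sequence numbers that arrived more than once. */
    public List<Long> duplicates() {
        return new ArrayList<>(duplicates);
    }

    /** Sequence numbers in [0, highestSeen] that have not arrived (yet). */
    public List<Long> missing() {
        List<Long> gaps = new ArrayList<>();
        for (long s = 0; s <= highestSeen; s++) {
            if (!seen.contains(s)) {
                gaps.add(s);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        SequenceAnomalyDetector detector = new SequenceAnomalyDetector();
        long[] arrivals = {0, 1, 2, 2, 5, 3}; // 2 is repeated, 4 never arrives
        for (long seq : arrivals) {
            detector.observe(seq);
        }
        System.out.println("duplicates=" + detector.duplicates()); // prints duplicates=[2]
        System.out.println("missing=" + detector.missing());       // prints missing=[4]
    }
}
```

A reported gap only means the number has not arrived so far. With out-of-order delivery it may still show up later, so a real job would probably evaluate `missing()` at the end of the run or after a timeout.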
j
There is no such thing as getCheckpointedState it seems
d
getListState() should work
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
And it can be used like this
// Operator state handle. Assign it once in initializeState(), e.g.
//   seenIdsState = context.getOperatorStateStore().getListState(descriptor);
// (or getUnionListState(descriptor), as in the class above).
// Note: getRuntimeContext().getListState(...) is keyed state and only works on a keyed stream.
private transient ListState<String> seenIdsState;

// implementing CheckpointedFunction
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Replace the checkpointed contents with the current seenIds
    seenIdsState.clear();
    for (String id : seenIds) {
        seenIdsState.add(id);
    }
}