# troubleshooting
g
Hi all, how do I use `CheckpointListener` to listen for completed checkpoints? I'm unable to register it with my stream environment. Can someone please help? Below is my listener code:
public class CheckpointCoordinator implements CheckpointListener {
  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    log.info("Checkpoint {} completed successfully.", checkpointId);
    // add custom logic here
  }

  @Override
  public void notifyCheckpointAborted(long checkpointId) throws Exception {
    CheckpointListener.super.notifyCheckpointAborted(checkpointId);
  }
}
How do I register this?
s
AFAIK you can't just add it to a random class; it has to be implemented by one of the Flink operators (user functions) in the job, e.g. a `RichSinkFunction`.
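To illustrate the point above, here is a minimal, Flink-free sketch of why there is no separate "register" step. It assumes the runtime simply checks whether each function it runs implements the listener interface and calls it if so. The `CheckpointListener` interface below is a local stand-in that mirrors the shape of Flink's interface so the sketch compiles on its own; `UserFunction`, `LoggingSink`, and `completeCheckpoint` are hypothetical names invented for this example, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

class CheckpointListenerSketch {

    // Local stand-in for Flink's CheckpointListener (assumption: same method shape),
    // defined here only so the sketch compiles without Flink on the classpath.
    interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;

        default void notifyCheckpointAborted(long checkpointId) throws Exception {}
    }

    // Hypothetical stand-in for a user function that is part of the job (e.g. a sink).
    interface UserFunction {
        void invoke(String value) throws Exception;
    }

    // The function itself implements the listener; nothing is registered on the environment.
    static class LoggingSink implements UserFunction, CheckpointListener {
        final List<Long> completed = new ArrayList<>();

        @Override
        public void invoke(String value) {
            // A real sink would write the record somewhere.
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            completed.add(checkpointId); // custom logic goes here
        }
    }

    // Toy "runtime": only functions it actually runs are checked and notified.
    static void completeCheckpoint(List<UserFunction> functions, long checkpointId) throws Exception {
        for (UserFunction f : functions) {
            if (f instanceof CheckpointListener) {
                ((CheckpointListener) f).notifyCheckpointComplete(checkpointId);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        LoggingSink sink = new LoggingSink();
        List<UserFunction> functions = new ArrayList<>();
        functions.add(sink);

        completeCheckpoint(functions, 1L);
        completeCheckpoint(functions, 2L);

        System.out.println("Completed checkpoints seen by sink: " + sink.completed);
    }
}
```

A standalone class that implements the interface but is never part of the job never ends up in the list the runtime walks, so its callback is never invoked. In a real job the same idea would mean adding `implements CheckpointListener` to a function that is already wired into the pipeline.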
g
I tried this - I get this error when trying to submit the application:
ERROR!!! Coordinator Provider for node Source: ....  is not serializable.
I added it to a `KeyedProcessFunction`:
public class MultiModeProcessFilter extends KeyedProcessFunction<String, VLFRecord, FilteredRecord> implements CheckpointListener {
...

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    log.info("**checkpoint completed ** {}", checkpointId);
    // custom logic handler
  }
...
}
s
What’s the full exception that you see after that?
g
Sorry about that - it was something internal. Nothing to do with adding the listener.