Matan Perelmuter
09/12/2024, 6:13 AMRichAsyncFunction
with retry strategy.
1. I tried using .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
but couldn't make it work. I tried directly throwing exception and using ResultFuture.completeExceptionally
both just failed the application
2. How can I send failed events to some DLQ? I couldn't find a way to output failed events like an OutputTag. there is timeout but it doesn't seem to use it for retries exhaustionD. Draco O'Brien
09/12/2024, 5:23 PMD. Draco O'Brien
09/12/2024, 5:24 PMimport org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncRetryWithDLQExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.fromElements("data1", "data2", "data3"); // Your data source
DataStream<String> processed = input
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) {
return new Tuple2<>(value, 0); // Second value is for retry count
}
})
.flatMap(new RetryAsyncFunction(MAX_RETRIES));
processed.print(); // Or add your sink here
env.execute("Async Retry with DLQ Example");
}
private static final int MAX_RETRIES = 3;
public static class RetryAsyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, String> {
private transient ValueState<Integer> retryCountState;
private transient ExecutorService executor;
public RetryAsyncFunction(int maxRetries) {
this.executor = Executors.newFixedThreadPool(4);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
retryCountState = getRuntimeContext().getState(
new ValueStateDescriptor<>("retry-count", TypeInformation.of(Integer.class)));
}
@Override
public void close() throws Exception {
super.close();
executor.shutdown();
}
@Override
public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<String> resultFuture) throws Exception {
final int currentRetryCount = input.f1 + 1;
retryCountState.update(currentRetryCount);
CompletableFuture.supplyAsync(() -> {
try {
// Simulate an async operation that may fail
if (input.f0.equals("data2") && currentRetryCount <= MAX_RETRIES) {
throw new RuntimeException("Simulated failure for demo");
}
return "Processed " + input.f0;
} catch (Exception e) {
ExceptionUtils.rethrow(e);
return null; // This line won't be reached due to rethrow
}
}, executor).whenComplete((result, throwable) -> {
if (throwable != null) {
// All retries failed, simulate DLQ logic
System.err.println("Sending to DLQ: " + input.f0 + ", Reason: " + throwable.getMessage());
resultFuture.completeExceptionally(throwable);
} else {
resultFuture.complete(Collections.singleton(result));
}
});
}
}
}
D. Draco O'Brien
09/12/2024, 5:25 PMMatan Perelmuter
09/12/2024, 5:26 PMD. Draco O'Brien
09/12/2024, 5:27 PMD. Draco O'Brien
09/12/2024, 5:29 PMMatan Perelmuter
09/12/2024, 8:55 PM