# troubleshooting
m
Hi, I'm looking for an example of using `RichAsyncFunction` with a retry strategy.
1. I tried using `.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)` but couldn't make it work. I tried both throwing an exception directly and calling `ResultFuture.completeExceptionally`; both just failed the application.
2. How can I send failed events to some DLQ? I couldn't find a way to output failed events, e.g. via an `OutputTag`. There is a timeout, but it doesn't seem to be used when retries are exhausted.
d
The RichAsyncFunction in Apache Flink does not support a retry mechanism with backoff or sending failed events to a dead-letter queue (DLQ) out of the box. However, you can implement these features manually within your function by utilizing the ResultFuture for asynchronous processing and managing exceptions accordingly.
Here is a snippet showing how this might be done:
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncRetryWithDLQExample {

    private static final int MAX_RETRIES = 3;

    // Side output for records that still fail after all attempts (or that time out).
    private static final OutputTag<String> DLQ = new OutputTag<String>("dlq") {};

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("data1", "data2", "data3"); // Your data source

        // Async functions are wired in with AsyncDataStream, not flatMap.
        // Output: f0 = true on success (f1 = result), false on failure (f1 = original input).
        DataStream<Tuple2<Boolean, String>> attempted = AsyncDataStream.unorderedWait(
                input, new RetryAsyncFunction(MAX_RETRIES), 30, TimeUnit.SECONDS, 100);

        SingleOutputStreamOperator<String> processed = attempted
                .process(new ProcessFunction<Tuple2<Boolean, String>, String>() {
                    @Override
                    public void processElement(Tuple2<Boolean, String> value, Context ctx, Collector<String> out) {
                        if (value.f0) {
                            out.collect(value.f1);
                        } else {
                            ctx.output(DLQ, value.f1); // route failed records to the DLQ side output
                        }
                    }
                });

        processed.print(); // Or add your sink here
        processed.getSideOutput(DLQ).printToErr(); // Or add your DLQ sink here (e.g. a Kafka topic)

        env.execute("Async Retry with DLQ Example");
    }

    public static class RetryAsyncFunction extends RichAsyncFunction<String, Tuple2<Boolean, String>> {

        private final int maxRetries;
        private transient ExecutorService executor;

        public RetryAsyncFunction(int maxRetries) {
            this.maxRetries = maxRetries;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Non-serializable resources belong in open(): a transient field set in the
            // constructor would be null after the function is shipped to the task managers.
            executor = Executors.newFixedThreadPool(4);
        }

        @Override
        public void close() throws Exception {
            super.close();
            executor.shutdown();
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<Tuple2<Boolean, String>> resultFuture) {
            attempt(input, 1, resultFuture);
        }

        // Retries immediately (no backoff) until maxRetries attempts have been made.
        // Keyed state (ValueState) is not available in a RichAsyncFunction,
        // so the attempt number is passed along instead.
        private void attempt(String input, int attemptNo, ResultFuture<Tuple2<Boolean, String>> resultFuture) {
            CompletableFuture.supplyAsync(() -> callService(input), executor)
                    .whenComplete((result, throwable) -> {
                        if (throwable == null) {
                            resultFuture.complete(Collections.singleton(Tuple2.of(true, result)));
                        } else if (attemptNo < maxRetries) {
                            attempt(input, attemptNo + 1, resultFuture);
                        } else {
                            // Retries exhausted: complete normally with a failure marker instead of
                            // completeExceptionally(), which would fail the job.
                            resultFuture.complete(Collections.singleton(Tuple2.of(false, input)));
                        }
                    });
        }

        // The default timeout() completes exceptionally and fails the job; send the record to the DLQ instead.
        @Override
        public void timeout(String input, ResultFuture<Tuple2<Boolean, String>> resultFuture) {
            resultFuture.complete(Collections.singleton(Tuple2.of(false, input)));
        }

        // Simulated async operation: "data2" always fails.
        private static String callService(String value) {
            if (value.equals("data2")) {
                throw new RuntimeException("Simulated failure for demo");
            }
            return "Processed " + value;
        }
    }
}
```
`RetryAsyncFunction` retries each record up to `MAX_RETRIES` times. Records that still fail, or that hit the operator timeout, are emitted with a failure flag instead of failing the job, and the `ProcessFunction` routes them to a DLQ side output through an `OutputTag`.
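The reply mentions retrying with backoff, but the snippet adds no delay between attempts. As a framework-free sketch (plain JDK, no Flink types), the retry-then-dead-letter idea with exponential backoff could look like this. `BackoffRetry`, `Outcome` and `retry` are hypothetical names; waits are scheduled on a `ScheduledExecutorService`, so no thread blocks between attempts, and exhaustion produces a failure-tagged outcome rather than an exceptional completion. Inside a `RichAsyncFunction` you would, under these assumptions, call `resultFuture.complete(...)` when the returned future finishes and route `ok() == false` outcomes to a side output.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: retries an async call with exponential backoff, then dead-letters it. */
final class BackoffRetry {

    /** Either a successful value or the input that exhausted its retries (a DLQ candidate). */
    static final class Outcome<I, O> {
        final I input;
        final O value;          // null when dead-lettered
        final Throwable error;  // last failure when dead-lettered, otherwise null
        final int attempts;

        Outcome(I input, O value, Throwable error, int attempts) {
            this.input = input; this.value = value; this.error = error; this.attempts = attempts;
        }

        boolean ok() { return error == null; }
    }

    /**
     * Calls call.get() up to maxAttempts times; after failed attempt k it waits
     * baseDelayMs * 2^(k-1) ms. The returned future never completes exceptionally.
     */
    static <I, O> CompletableFuture<Outcome<I, O>> retry(I input, Supplier<CompletableFuture<O>> call,
            int maxAttempts, long baseDelayMs, ScheduledExecutorService scheduler) {
        CompletableFuture<Outcome<I, O>> done = new CompletableFuture<>();
        attempt(input, call, 1, maxAttempts, baseDelayMs, scheduler, done);
        return done;
    }

    private static <I, O> void attempt(I input, Supplier<CompletableFuture<O>> call, int attemptNo,
            int maxAttempts, long baseDelayMs, ScheduledExecutorService scheduler,
            CompletableFuture<Outcome<I, O>> done) {
        CompletableFuture<O> future;
        try {
            future = call.get();
        } catch (RuntimeException e) { // treat a synchronous throw like an async failure
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                done.complete(new Outcome<>(input, value, null, attemptNo));
            } else if (attemptNo < maxAttempts) {
                long delay = baseDelayMs << (attemptNo - 1); // exponential backoff
                scheduler.schedule(
                        () -> attempt(input, call, attemptNo + 1, maxAttempts, baseDelayMs, scheduler, done),
                        delay, TimeUnit.MILLISECONDS);
            } else {
                done.complete(new Outcome<>(input, null, error, attemptNo)); // dead-letter
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Simulated call that always fails, like "data2" in the snippet above.
        Outcome<String, String> outcome = BackoffRetry.<String, String>retry("data2", () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            f.completeExceptionally(new RuntimeException("Simulated failure"));
            return f;
        }, 3, 100, scheduler).join();
        System.out.println(outcome.ok() ? "OK: " + outcome.value
                : "DLQ: " + outcome.input + " after " + outcome.attempts + " attempts");
        scheduler.shutdown();
    }
}
```

With `maxAttempts = 3` and `baseDelayMs = 100`, the waits are 100 ms and then 200 ms before the record is dead-lettered. Keeping the failure as data rather than an exception is what lets the job keep running.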
m
But that's not true; the documentation does show a built-in retry mechanism.
d
Did you state which version of Flink you are using?
There is a built-in retry mechanism, but it works at the level of failed tasks and checkpoints; it isn't integrated with `RichAsyncFunction`.
m
1.19. If it's indeed only at the failed-task level, it won't help me; I'll just use the Failsafe library.
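Whether retries come from Failsafe or a hand-rolled loop, you still have to decide which failures are worth retrying; `RetryPredicates.HAS_EXCEPTION_PREDICATE` from the question retries on any exception. One detail is easy to miss: when a `supplyAsync` supplier throws, callbacks such as `whenComplete` or `handle` receive the error wrapped in a `CompletionException`, so a type check has to unwrap it first. The sketch below is plain JDK; `RetryClassifier` and its "retry only on I/O errors" policy are illustrative assumptions, not part of any library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

final class RetryClassifier {

    /** Strips the CompletionException/ExecutionException wrappers added by futures. */
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Hypothetical policy: retry transient I/O errors; anything else goes straight to the DLQ. */
    static final Predicate<Throwable> RETRYABLE = t -> {
        Throwable root = unwrap(t);
        return root instanceof IOException || root instanceof UncheckedIOException;
    };

    public static void main(String[] args) {
        CompletableFuture<String> failed = CompletableFuture.supplyAsync(() -> {
            throw new UncheckedIOException(new IOException("connection reset"));
        });
        // handle() sees the wrapper, not the UncheckedIOException itself.
        Throwable seen = failed.handle((value, error) -> error).join();
        System.out.println(seen.getClass().getSimpleName()); // prints CompletionException
        System.out.println(RETRYABLE.test(seen));            // prints true
        System.out.println(RETRYABLE.test(
                new CompletionException(new IllegalArgumentException("bad payload")))); // prints false
    }
}
```

Splitting retryable from non-retryable errors this way means a malformed record is dead-lettered on the first failure instead of consuming every retry attempt.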