Vishva Mahadevan
07/25/2024, 4:08 AM
D. Draco O'Brien
07/25/2024, 6:32 AM
D. Draco O'Brien
07/25/2024, 6:32 AM
Vishva Mahadevan
07/25/2024, 8:11 AM
D. Draco O'Brien
07/25/2024, 9:54 AM
D. Draco O'Brien
07/25/2024, 9:59 AM
D. Draco O'Brien
07/25/2024, 10:00 AM
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
/**
 * Map function that parses each incoming string as a decimal integer and emits twice its value.
 *
 * <p>Input that cannot be parsed is reported on standard output and then rethrown as a
 * {@link RuntimeException} carrying the original {@link NumberFormatException} as its cause;
 * no default value is substituted here.
 */
public class MyRichMapFunction extends RichMapFunction<String, Integer> {

    // No open() override: the previous one only delegated to super.open(parameters), so the
    // inherited lifecycle hook behaves identically.
    // NOTE(review): newer Flink releases reportedly deprecate open(Configuration) - confirm
    // against the targeted version before reintroducing an override.

    /**
     * Parses {@code value} and doubles it.
     *
     * @param value textual integer to convert; {@code null} is treated like any other
     *     unparseable input
     * @return twice the parsed integer
     * @throws RuntimeException if {@code value} is not a valid integer (cause preserved)
     * @throws ArithmeticException if doubling the parsed value overflows an {@code int}
     */
    @Override
    public Integer map(String value) throws Exception {
        final int parsed;
        try {
            parsed = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            // Report the problem, then fail the record; the cause is kept for diagnosis.
            System.out.println("Error parsing value: " + e.getMessage());
            throw new RuntimeException("Failed to parse integer from input: " + value, e);
        }
        // multiplyExact fails loudly instead of silently wrapping to a wrong (negative) result.
        return Math.multiplyExact(parsed, 2);
    }
}
D. Draco O'Brien
07/25/2024, 10:01 AM
import org.apache.flink.api.common.functions.RecoverableFunction;
import org.apache.flink.util.Preconditions;
/**
 * Recovery strategy that accepts every failure and replaces the failed record with a fixed
 * fallback value.
 *
 * <p>NOTE(review): {@code RecoverableFunction} could not be located in Flink's published API -
 * confirm that this interface exists in the targeted Flink version.
 */
public class MyRecoveryFunction implements RecoverableFunction<String, Integer> {

    /** Value handed back for any input whose processing failed. */
    private static final int DEFAULT_ON_FAILURE = -1;

    /**
     * Reports whether a failure may be recovered from.
     *
     * @param throwable the failure under consideration (not inspected)
     * @return always {@code true}; this example treats every exception as recoverable
     */
    @Override
    public boolean isRecoverable(Throwable throwable) {
        return true;
    }

    /**
     * Announces the recovery on standard output and supplies the fallback value.
     *
     * @param input the record whose processing failed
     * @param failure the error that was raised (not inspected)
     * @return the fallback value {@code -1}
     */
    @Override
    public Integer recover(String input, Throwable failure) throws Exception {
        final String notice = "Recovering from error for input: ".concat(String.valueOf(input));
        System.out.println(notice);
        return DEFAULT_ON_FAILURE;
    }
}
D. Draco O'Brien
07/25/2024, 10:02 AM
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExceptionHandlingExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.fromElements("1", "two", "3", "four");
// apply the RichMapFunction and assign a unique ID
DataStream<Integer> mapped = source
.map(new MyRichMapFunction())
.uid("myRichMapper") // Assigning a unique ID
.name("My Rich Mapper");
// define and apply the recovery strategy
DataStream<Integer> recovered = mapped.recover(new MyRecoveryFunction());
recovered.print().setParallelism(1); // Output the results
env.execute("Exception Handling Example");
}
}
D. Draco O'Brien
07/25/2024, 10:03 AM
Vishva Mahadevan
07/25/2024, 1:47 PM