Aly Ayman
08/19/2024, 8:45 AM
package Sink;
import java.util.Objects;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import schema.Schema;
public class ThroughputMeteredSinkFunction<Schema> extends RichSinkFunction<Schema> {
private final RichSinkFunction<Schema> wrappedSinkFunction;
private transient Meter throughputMeter;
private transient Counter counter;
public ThroughputMeteredSinkFunction(RichSinkFunction<Schema> wrappedSinkFunction) {
this.wrappedSinkFunction = wrappedSinkFunction;
}
@Override
public void open(Configuration parameters) throws Exception {
RuntimeContext context = getRuntimeContext();
counter = context.getMetricGroup().counter("Counter");
throughputMeter = context.getMetricGroup().meter("throughputMeter", new MeterView(counter));
wrappedSinkFunction.open(parameters); // Note: Only valid for RichSinkFunction
}
@Override
public void invoke(Schema value, RichSinkFunction.Context context) throws Exception {
counter.inc();
throughputMeter.markEvent(); // Custom behavior: record throughput
wrappedSinkFunction.invoke(value,context);
// Delegate to the original sink function
}
@Override
public void close() throws Exception {
wrappedSinkFunction.close(); // Note: Only valid for RichSinkFunction
}
}
Error 2024-08-19 10:29:38 java.lang.NullPointerException at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436) at Sink.ThroughputMeteredSinkFunction.invoke(ThroughputMeteredSinkFunction.java:34) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) at org.orangeFlinkDS.Main$1.processElement(Main.java:78) at org.orangeFlinkDS.Main$1.processElement(Main.java:60) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)