I have a problem in flink I use a wrapper that ad...
# troubleshooting
a
I have a problem in flink I use a wrapper that add some functionalities in the streamingFileSink to catch the throughput as below and I added before process function just also for throughput mesures So flink app is Source -> Process Function --> Sink But the problem is that sink return sinkFunction and when I cast sinkFunction to RichSinkFunction to use it in wrapper it get null pointer exception .. I don't know why or how force streamingFileSink to return richSinkFunction instead of sinkFunction ... the error is below
Copy code
package Sink;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import schema.Schema;

public class ThroughputMeteredSinkFunction<Schema> extends RichSinkFunction<Schema> {

    private final RichSinkFunction<Schema> wrappedSinkFunction;
    private transient Meter throughputMeter;
    private transient Counter counter;

    public ThroughputMeteredSinkFunction(RichSinkFunction<Schema> wrappedSinkFunction) {
        this.wrappedSinkFunction = wrappedSinkFunction;
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        RuntimeContext context = getRuntimeContext();
        counter = context.getMetricGroup().counter("Counter");
        throughputMeter = context.getMetricGroup().meter("throughputMeter", new MeterView(counter));
        wrappedSinkFunction.open(parameters); // Note: Only valid for RichSinkFunction
    }

    @Override
    public void invoke(Schema value, RichSinkFunction.Context context) throws Exception {
        counter.inc();
        throughputMeter.markEvent(); // Custom behavior: record throughput
        wrappedSinkFunction.invoke(value,context);
        // Delegate to the original sink function
    }

    @Override
    public void close() throws Exception {
        wrappedSinkFunction.close(); // Note: Only valid for RichSinkFunction
    }

}
Error
2024-08-19 10:29:38 java.lang.NullPointerException at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436) at Sink.ThroughputMeteredSinkFunction.invoke(ThroughputMeteredSinkFunction.java:34) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) at org.orangeFlinkDS.Main$1.processElement(Main.java:78) at org.orangeFlinkDS.Main$1.processElement(Main.java:60) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)