I want to mesaure the throughput and record in so ...
# troubleshooting
a
I want to mesaure the throughput and record in so I added this code , recordCounter measures correctly but the throughput is always zero
Copy code
DataStream<Schema> processedStream = stream.process(new ProcessFunction<Schema, Schema>() {
    private transient Counter recordCounter;
    private transient Meter meter;
    private transient MetricGroup metricGroup;

    @Override
    public void open(Configuration parameters) {
        this.metricGroup= getRuntimeContext().getMetricGroup();
        recordCounter = metricGroup.counter("recordCounter");
        meter = metricGroup.meter("Throughput",new MeterView(1));
    }

    @Override
    public void processElement(Schema value, Context ctx, Collector<Schema> out) {
        // Increment the counter and emit the record
        recordCounter.inc();
        meter.markEvent();
        out.collect(value);
        System.out.println(value);
    }
}).setParallelism(sourceParallelism);
d
When you create a MeterView instance with new MeterView(1), the 1 represents the rate at which the meter should be decremented when no events occur.
A common practice is just to use the meter without specifying the decay rate and let Flink manage this internally.
It’s possible the decay rate is causing this. There might be other issues in the configuration as well but I would try leaving out the decay rate.
a
We can’t leave it empty
We should type counter or time
Any ideas ?
l
did you solve it? I just follow example and it work: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#meter