Aly Ayman
08/18/2024, 9:10 AMDataStream<Schema> processedStream = stream.process(new ProcessFunction<Schema, Schema>() {
private transient Counter recordCounter;
private transient Meter meter;
private transient MetricGroup metricGroup;
@Override
public void open(Configuration parameters) {
this.metricGroup= getRuntimeContext().getMetricGroup();
recordCounter = metricGroup.counter("recordCounter");
meter = metricGroup.meter("Throughput",new MeterView(1));
}
@Override
public void processElement(Schema value, Context ctx, Collector<Schema> out) {
// Increment the counter and emit the record
recordCounter.inc();
meter.markEvent();
out.collect(value);
System.out.println(value);
}
}).setParallelism(sourceParallelism);
D. Draco O'Brien
08/18/2024, 10:08 AMD. Draco O'Brien
08/18/2024, 10:09 AMD. Draco O'Brien
08/18/2024, 10:10 AMAly Ayman
08/18/2024, 12:12 PMAly Ayman
08/18/2024, 12:13 PMAly Ayman
08/19/2024, 1:12 AML P V
09/24/2024, 5:38 PM