Ryan Culbertson
09/19/2022, 9:14 PM

Xiaosheng Wu
09/19/2022, 9:55 PM

Pouria Modaresi
09/19/2022, 11:28 PM

pok liu
09/20/2022, 7:22 AM
I am running SocketWindowWordCount.jar. When I try to stop the job with a savepoint, the program throws an exception. Here is the log:
pokliu@pokliu-ubuntu:/usr/local/flink-1.15.1$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname 10.10.20.98 --port 9999
Job has been submitted with JobID 90d81a8e8893720c87c1dc12ad4607df
^C
pokliu@pokliu-ubuntu:/usr/local/flink-1.15.1$ ./bin/flink stop --savepointPath /tmp/flink-test/ 90d81a8e8893720c87c1dc12ad4607df
Suspending job "90d81a8e8893720c87c1dc12ad4607df" with a CANONICAL savepoint.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not stop with a savepoint job "90d81a8e8893720c87c1dc12ad4607df".
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:588)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:573)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1093)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:586)
... 6 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
at akka.dispatch.OnComplete.internal(Future.scala:299)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:545)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2070)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: Socket Stream -> Flat Map (1/1)#0 Failure reason: Task has failed.
at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
Caused by: java.util.concurrent.CompletionException: java.net.SocketException: Socket closed
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 3 more
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.read1(BufferedReader.java:210)
at java.io.BufferedReader.read(BufferedReader.java:286)
at java.io.Reader.read(Reader.java:140)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
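The bottom of the trace shows the actual failure: the source task's blocking socket read. SocketTextStreamFunction is parked in Reader.read(), and when the socket is closed while a stop-with-savepoint is in flight, the read throws, the task fails, and the savepoint is declined. A simplified sketch of that read pattern (not the actual Flink source, just the shape of the failure; host and port match the job above):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

public class BlockingSocketRead {
    public static void main(String[] args) throws Exception {
        // Requires a listener on the other side, e.g. `nc -lk 9999`.
        try (Socket socket = new Socket("10.10.20.98", 9999);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            char[] buf = new char[8192];
            int n;
            // read() blocks here; if the connection is torn down while we are
            // blocked, it raises java.net.SocketException: Socket closed,
            // which is exactly the innermost "Caused by" in the log above.
            while ((n = reader.read(buf)) != -1) {
                System.out.print(new String(buf, 0, n));
            }
        }
    }
}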
Tong Yue
09/20/2022, 9:02 AM

Przemek
09/20/2022, 10:56 AM

Pedro Mázala
09/20/2022, 11:52 AM
default/submittedJobGraph$HASH. This file contains the job graph and its parameters.
When I want to reload the parameters I’m having to clear files under this directory. Is there a way of storing the submitted job graph and yet reading the parameters on each restart?

Satya
09/20/2022, 11:47 PM

Rommel
09/20/2022, 11:48 PM
Could not resolve ResourceManager address <akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*>.
Echo Lee
09/21/2022, 2:12 AM
// TemporalRowTimeJoinOperator
private long emitResultAndCleanUpState(long currentWatermark) throws Exception {
    final long previousWatermark = Long.MIN_VALUE;
    // Right-side versions up to the current watermark, sorted by row time.
    List<RowData> rightRowsSorted =
            getRightRowSortedBetween(rightRowtimeComparator, previousWatermark, currentWatermark);

    // Drain all buffered left rows that are ready to be joined.
    Iterator<TemporalValue<List<RowData>>> leftIterator =
            tLeftState.readRange(previousWatermark, currentWatermark).iterator();
    List<RowData> orderedLeftRecords = new ArrayList<>();
    while (leftIterator.hasNext()) {
        TemporalValue<List<RowData>> entry = leftIterator.next();
        orderedLeftRecords.addAll(entry.getValue());
        leftIterator.remove();
    }

    // Join each left row with the latest right version at or before its time;
    // emit a null-padded row for left outer joins when no match qualifies.
    orderedLeftRecords.forEach(leftRow -> {
        long leftTime = getLeftTime(leftRow);
        Optional<RowData> rightRow = latestRightRowToJoin(rightRowsSorted, leftTime);
        if (rightRow.isPresent()
                && RowDataUtil.isAccumulateMsg(rightRow.get())
                && joinCondition.apply(leftRow, rightRow.get())) {
            collectJoinedRow(leftRow, rightRow.get());
        } else if (isLeftOuterJoin) {
            collectJoinedRow(leftRow, rightNullRow);
        }
    });
    orderedLeftRecords.clear();

    // Timestamp of the earliest left row still buffered beyond the watermark.
    TemporalValue<Iterable<RowData>> lastUnprocessed = tLeftState.getAtOrAfter(currentWatermark + 1);
    long lastUnprocessedTime =
            lastUnprocessed == null ? Long.MAX_VALUE : lastUnprocessed.getTimestamp();

    cleanupExpiredVersionInState(currentWatermark, rightRowsSorted);
    return lastUnprocessedTime;
}

private List<RowData> getRightRowSortedBetween(RowtimeComparator rightRowtimeComparator,
        long beginTimestampInclusive, long endTimestampInclusive) throws IOException {
    List<RowData> rightRows = new ArrayList<>();
    for (TemporalValue<RowData> rightRow :
            tRightState.readRange(beginTimestampInclusive, endTimestampInclusive)) {
        rightRows.add(rightRow.getValue());
    }
    rightRows.sort(rightRowtimeComparator);
    return rightRows;
}
Balazs Varga
09/21/2022, 7:48 AM
CREATE TABLE test_unpartitioned (
`name` VARCHAR(2147483647),
`eventTimestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND
) WITH (...)
INSERT INTO test_sink SELECT
`name`,
TUMBLE_START(eventTimestamp, INTERVAL '10' SECOND) AS wStart,
COUNT(*) FROM test_unpartitioned
GROUP BY
TUMBLE(eventTimestamp, INTERVAL '10' SECOND),
`name`
I’ve tried running the job with different combinations of parallelism and input topic partitions.
When
• parallelism = 1, input topic has 1 partition: the job works
• parallelism > 1, input topic has 1 partition: no results are produced
• parallelism = 2, input topic has 2 partitions: the job works
• when using proctime instead of event time, the p>1 with 1 partition combination also works
Please help me understand why the second case doesn’t produce results.
My theory is that each partition from the input kafka topic gets assigned to one of the subtasks of the source operator. Thus when p>1 but there is just 1 partition, only 1 subtask will read messages, only advancing the watermark there. Since the other instances do not get records, their watermarks don’t advance, so at the next operator (the window), it also cannot advance the watermark, because it depends on all of its predecessors. So the window never fires.
• Is this reasoning correct?
• Is this correct behavior from Flink? It seems a bit unusual that increasing the parallelism can break a job.
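That reading matches how per-partition watermarks propagate: an operator's watermark is the minimum across its inputs, so one record-less subtask stalls everything downstream. The usual mitigation is to declare sources idle after a timeout; a minimal sketch, assuming a Table API job on Flink 1.14+ (the 30 s value is an arbitrary choice):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleTimeoutExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Source subtasks that see no records for 30 s are marked idle and
        // excluded from watermark calculation, so a 1-partition topic read
        // with parallelism > 1 can still fire event-time windows.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "30 s");
    }
}

The DataStream equivalent is WatermarkStrategy.forBoundedOutOfOrderness(...).withIdleness(Duration.ofSeconds(30)).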
Thanks in advance.

Olivier
09/21/2022, 8:01 AM

Akila Wajirasena
09/21/2022, 10:32 AM
I am getting java.lang.IncompatibleClassChangeError: Class scala.collection.mutable.HashMap does not implement the requested interface scala.collection.mutable.SortedMap
2022-09-21 13:02:56,933 WARN org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> Sink: Unnamed (1/1)#6 (ff016f5bbf176977d893cecef4c158bb) switched from RUNNING to FAILED with failure cause: java.lang.IncompatibleClassChangeError: Class scala.collection.mutable.HashMap does not implement the requested interface scala.collection.mutable.SortedMap
at FlinkEventProcessor$Processor.$anonfun$processElement$2(FlinkEventProcessor.scala:1081)
at FlinkEventProcessor$Processor.processElement(FlinkEventProcessor.scala:1079)
at FlinkEventProcessor$Processor.processElement(FlinkEventProcessor.scala:941)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Sample code:
import scala.collection.mutable

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

case class State(key: String, var lastSequenceNo: Long) {
  val outOfOrder: mutable.SortedMap[Long, KafkaMessage] =
    mutable.SortedMap.empty[Long, KafkaMessage]
}

class Processor() extends KeyedProcessFunction[String, KafkaMessage, Row] {

  private var valueState: ValueState[State] = _

  override def open(parameters: Configuration): Unit = {
    valueState = getRuntimeContext.getState(
      new ValueStateDescriptor("state", classOf[State]))
  }

  override def processElement(in: KafkaMessage,
                              ctx: KeyedProcessFunction[String, KafkaMessage, Row]#Context,
                              out: Collector[Row]): Unit = {
    val state = valueState.value()
    logger.debug("Processing out of order messages...")
    // Drain the buffered out-of-order messages in ascending key order.
    while (state.outOfOrder.nonEmpty) {
      logger.debug("Processing out of order message")
      val min = state.outOfOrder.keys.head
      state.outOfOrder.remove(min)
    }
  }
}
M Harsha
09/21/2022, 10:41 AM

Breno Jacubovski
09/21/2022, 12:28 PM

Jirawech Siwawut
09/21/2022, 1:57 PM
SELECT
    window_start
    ,window_end
    ,count(1) as cnt
FROM
(
    SELECT
        id
        ,window_end
        ,window_start
        ,ROW_NUMBER() OVER (PARTITION BY window_start, window_end, order_id ORDER BY event_time DESC) AS rownum
    FROM TABLE(HOP(TABLE mytable, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '60' MINUTES))
) A
WHERE rownum = 1
GROUP BY window_start, window_end
The result is quite strange: Flink seems to aggregate the same window again.
+----+---------------------+-------------------------+-------------------------+----------------------+
| op | window_start | window_end | current_timestamp | cnt |
+----+---------------------+-------------------------+-------------------------+----------------------+
| +I | 2022-09-21 12:44:00 | 2022-09-21 13:44:00 | 2022-09-21 13:44:13.010 | 265 |
| +I | 2022-09-21 12:45:00 | 2022-09-21 13:45:00 | 2022-09-21 13:45:15.207 | 360 |
| +I | 2022-09-21 12:46:00 | 2022-09-21 13:46:00 | 2022-09-21 13:46:12.659 | 1166 |
| +I | 2022-09-21 12:47:00 | 2022-09-21 13:47:00 | 2022-09-21 13:47:13.624 | 1594 |
| +I | 2022-09-21 12:44:00 | 2022-09-21 13:44:00 | 2022-09-21 13:48:15.907 | 120 |
| +I | 2022-09-21 12:45:00 | 2022-09-21 13:45:00 | 2022-09-21 13:49:07.305 | 348 |
Marcos Vinícius
09/21/2022, 3:36 PM
JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(0)
        .withMaxRetries(0)
        .build(),
JdbcExactlyOnceOptions.builder()
        .withTransactionPerConnection(true)
        .withRecoveredAndRollback(true)
        .build()
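For context, these two option objects are the middle arguments of JdbcSink.exactlyOnceSink; a hedged wiring sketch under that assumption (table, columns, and connection details are made up):

import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.postgresql.xa.PGXADataSource;

public class ExactlyOnceJdbcJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // XA transactions are committed on checkpoints

        env.fromElements("a", "b", "c")
           .addSink(JdbcSink.exactlyOnceSink(
               "INSERT INTO my_table (payload) VALUES (?)", // hypothetical table
               (ps, value) -> ps.setString(1, value),
               JdbcExecutionOptions.builder()
                       .withBatchSize(1000)
                       .withBatchIntervalMs(0)
                       .withMaxRetries(0) // retries must stay 0 with XA
                       .build(),
               JdbcExactlyOnceOptions.builder()
                       .withTransactionPerConnection(true)
                       .withRecoveredAndRollback(true)
                       .build(),
               () -> {
                   // Postgres-side prerequisite: max_prepared_transactions > 0.
                   PGXADataSource ds = new PGXADataSource();
                   ds.setUrl("jdbc:postgresql://localhost:5432/mydb");
                   ds.setUser("user");
                   ds.setPassword("secret");
                   return ds;
               }));

        env.execute("jdbc-exactly-once");
    }
}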
When I stop my job, there is a hanging statement (the locked one) and a hanging prepared transaction.
If I rollback that transaction the hanging statement finishes.
Is anyone using the JDBC exactly once sink with postgres and observed similar issues?
Flink 1.14
Postgres 14
Java 8
Latest JDBC driver
Windows
Running all locally

Lukasz Krawiec
09/22/2022, 12:51 AM
The ConfluentRegistryAvroDeserializationSchema.forGeneric method requires an org.apache.avro.Schema to be passed in, which in my case is not possible, as the actual record's schema will only be known at runtime, when deserializing the object and retrieving the schema from the Schema Registry.
Did anyone by any chance run into this use case and can suggest a workaround? Ideally I'd like this to work without having Flink do Kryo serialization.
Thanks for your time

Sergio Sainz
09/22/2022, 2:03 AM

Krish Narukulla
09/22/2022, 5:08 AM

Sebastian Stiernborg
09/22/2022, 7:08 AM
1.2-SNAPSHOT of the flink-kubernetes-operator)?

Dmitry Smirnov
09/22/2022, 7:45 AM
job.env
  .readFile(
    inputFormat = new TextInputFormat(new Path()),
    filePath = job.s3Path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = job.s3ReadInterval.toMillis
  )...
There's also another datastream:
val regularStream = job.env
  .addSource(kafkaSource)...
Not sure if both datastreams can be used together in one app. Any ideas how this can be done? Thanks
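Both sources can in fact live in the same job and be combined once their element types line up. A minimal Java sketch, assuming both streams carry plain strings (path and interval are placeholders):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FilePlusKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Continuously monitored file input, as in the snippet above.
        DataStream<String> fileStream = env.readFile(
                new TextInputFormat(new Path("s3://my-bucket/prefix")), // placeholder path
                "s3://my-bucket/prefix",
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                60_000L);

        // Stand-in for the Kafka-backed stream.
        DataStream<String> kafkaStream = env.fromElements("kafka-record");

        // Same element type -> union; use connect() for two different types.
        fileStream.union(kafkaStream).print();

        env.execute("file-plus-kafka");
    }
}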
Tiansu Yu
09/22/2022, 2:06 PM
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure.
I wonder why the getProducedType method in the UserDeserializationSchema is not getting recognized by my custom source connector?
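For a custom SourceFunction, Flink asks the source itself (not a wrapped schema) for its produced type, so the schema's getProducedType is never consulted unless the source forwards it. A hedged sketch of that forwarding via ResultTypeQueryable (class and field names are made up; calling .returns(...) on the DataStream is the other option):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class MyCustomSource<T> extends RichSourceFunction<T>
        implements ResultTypeQueryable<T> {

    private final DeserializationSchema<T> schema;
    private volatile boolean running = true;

    public MyCustomSource(DeserializationSchema<T> schema) {
        this.schema = schema;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        // Delegate to the schema so the element type survives erasure.
        return schema.getProducedType();
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            // fetch bytes, then: ctx.collect(schema.deserialize(bytes));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}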
Felix Angell
09/22/2022, 4:34 PM

Leo Xiong
09/22/2022, 7:31 PM

Adrian Chang
09/22/2022, 8:22 PM
Is numba compatible with PyFlink?
I am having this error:
File "/usr/local/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1278, in register_global
if getattr(mod, val.__name__) is not val:
AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'
Hannah Hagen
09/22/2022, 11:02 PM
I run my scripts directly with python my-flink-script.py. However, I find that simple scripts take a painfully long time to run. For example, the following code snippet (taken from the docs) takes ~1 minute 4 seconds to run. 😢 (code snippet in thread 🧵 )
Couple questions:
• Is the long runtime due to emulating a different architecture? or due to running it in a docker container? or something else? if anyone else can share runtime on their machine that'd help me set my expectations..
• Would it be faster to submit it to a local Flink cluster (instead of running the python file directly)? I assumed not, but just want to double check.
• How can I set up a development environment where I can quickly write and run code during development? Waiting a minute after writing a line of code is a bit painful 😅
thanks for any pointers!

Sumit Nekar
09/23/2022, 8:09 AM"failed to create containerd task: OCI runtime create failed: container_linux.go:380: starting container process caused: exec: "/docker-entrypoint.sh": stat /docker-entrypoint.sh: no such file or directory: unknown"
Docker file for my image:
FROM private-repo/flink-base:1.13
RUN mkdir /opt/flink/usrlib
ADD my-application/target/my-application.jar /opt/flink/usrlib/my-application.jar
But the flink base image (flink-base:1.13) defines its entrypoint like this:
ENTRYPOINT ["/entry-point.sh"]
So I tried overriding it using command in my FlinkDeployment file:
containers:
  - name: flink-main-container
    command: ["/entry-point.sh"]
I still see the same error. Am I missing something?

Adesh Dsilva
09/23/2022, 11:09 AM
Is there a way to pass tags dynamically when calling .inc() on a counter?
Currently we have to create metric counters in open(), and if we have 10-15 groups with different combinations then we end up creating 50-60 metric counter objects in open().
This is not very ideal, even if we create some dynamic method to create all the counters in open() in some map.
Clients like Datadog provide a convenient method to pass tags when you call inc(). Is it possible to add this in Flink as well?
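The closest built-in equivalent is key-value metric groups, which tag-capable reporters (including the Datadog reporter) expose as tags; counters can then be created lazily per tag combination rather than all up front in open(). A hedged sketch — the helper class, group key, and counter name are made up:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Lazily creates one counter per tag combination instead of
// pre-registering all of them in open(). Key-value groups are
// exported as tags by reporters such as the Datadog reporter.
public final class TaggedCounters {
    private final MetricGroup root;
    private final Map<String, Counter> cache = new HashMap<>();

    public TaggedCounters(MetricGroup root) {
        this.root = root;
    }

    public void inc(String tagKey, String tagValue) {
        cache.computeIfAbsent(tagKey + "=" + tagValue,
                k -> root.addGroup(tagKey, tagValue).counter("events"))
             .inc();
    }
}

Inside open(), construct it with getRuntimeContext().getMetricGroup() and call inc(tagKey, tagValue) from processElement().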
Slackbot
09/23/2022, 1:37 PM