Sandesh N Mendan
04/18/2024, 6:41 AM
Antti Ahonen
04/18/2024, 10:24 AM
Shiv Desai
04/19/2024, 4:51 AM
java.lang.NoSuchMethodError: 'com.google.common.base.Ticker io.confluent.kafka.serializers.KafkaAvroSerializer.ticker(io.confluent.kafka.schemaregistry.client.SchemaRegistryClient)'
Shan P S
04/20/2024, 12:19 AM
If your application is not healthy, its Apache Flink job continually fails and restarts. However, the app is not restarting frequently, just once a day (not at the same time). What are the triggers for this? Is there any metric or error that can lead up to this? https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-rt-restarts.html#troubleshooting-rt-restarts-causes
Shan P S
04/20/2024, 3:57 PM
Shan P S
04/20/2024, 3:59 PM
Mikhil Mudiraj
04/22/2024, 10:16 PM
Shan P S
04/23/2024, 3:11 AM
Janine Weber
04/23/2024, 8:02 AM
Suri Veernapu
04/23/2024, 2:35 PM
I added window_time after joining the streams, but it still throws the same error: org.apache.flink.table.api.ValidationException: The window function requires the timecol is a time attribute type, but is TIMESTAMP(3)
However, when I checked the datatype it says TIMESTAMP(3) *ROWTIME*.
Please let me know if I'm missing anything. Thanks in advance.
Trystan
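Regarding the ValidationException above: a plausible cause (assuming a regular equi-join sits upstream of the window function) is that a regular join materializes time attributes, so the column reaches the window function as plain TIMESTAMP(3) even though the source declared it as rowtime. An interval join keeps the time attribute alive; a sketch with hypothetical table and column names:

```sql
-- Interval join: bounding both sides in time lets the planner keep
-- l.window_time as a rowtime attribute, so a window TVF downstream
-- (TUMBLE/HOP/CUMULATE) can still consume it.
SELECT l.id, l.window_time, r.payload
FROM left_stream AS l
JOIN right_stream AS r
  ON l.id = r.id
 AND l.window_time BETWEEN r.window_time - INTERVAL '5' MINUTE
                       AND r.window_time + INTERVAL '5' MINUTE
```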
04/23/2024, 2:54 PM
pipeline.auto-generate-uids - the config seems like a great safeguard to apply, but SQL jobs do not and cannot have uids, yet this config seems to apply to them as well. So this config and SQL jobs seem incompatible.
Why is that? Is it extraordinarily difficult to check whether a job is SQL and to ignore that setting in the base Flink code for SQL jobs?
David Bryson
04/23/2024, 5:19 PM
StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node ChangelogNormalize
I really struggle to understand what Flink is trying to tell me. Where can I read about how to proceed? Am I fundamentally missing something about streaming queries?
I want to enrich incoming data with a join to an existing table, and I arrange the table data with a simple query for the latest record. When I do this, the StreamPhysicalOverAggregate error appears. I am extremely confused about what I would expect to be a simple query in Flink: enrich incoming data with the most recent record from a second data source. I'm puzzled as to where to go next.
Max Schmidt
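On the StreamPhysicalOverAggregate question above: ChangelogNormalize in the plan means the source (e.g. a CDC or upsert connector with a primary key) already emits an updating "latest row per key" stream, and a ROW_NUMBER()-style "latest record" query on top of an updating stream is exactly the combination the planner rejects. A common alternative, assuming the dimension table declares a PRIMARY KEY and a WATERMARK (hypothetical names below), is an event-time temporal join:

```sql
-- Enrich each incoming fact with the dimension row that was current
-- as of the fact's event time; no OVER aggregate needed, because the
-- upsert source already tracks the latest version per key.
SELECT f.id, f.event_time, d.payload
FROM facts AS f
LEFT JOIN dim_table FOR SYSTEM_TIME AS OF f.event_time AS d
  ON f.id = d.id
```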
04/23/2024, 7:01 PM
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-playground-session-job
spec:
  flinkConfiguration:
    kubernetes.operator.user.artifacts.http.header: "Authorization Bearer: <value from secret or env variable>"
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
Here I want the value of kubernetes.operator.user.artifacts.http.header to be read from a Kubernetes Secret. Is this somehow possible?
David Cunningham
04/23/2024, 8:54 PM
Srivatsav Gorti
04/24/2024, 5:54 AM
Srivatsav Gorti
04/24/2024, 5:56 AM
Zabeer
04/24/2024, 8:22 AM
Zabeer
04/24/2024, 9:49 AM
Chetan Patel
04/24/2024, 9:51 AM
Antti Ahonen
04/24/2024, 11:21 AM
static <T> AvroFactory<T> create(Class<T> type, @Nullable Schema currentSchema, @Nullable Schema previousSchema) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (SpecificRecord.class.isAssignableFrom(type)) {
        return fromSpecific(type, cl, Optional.ofNullable(previousSchema));
    } else {
        return GenericRecord.class.isAssignableFrom(type) ? fromGeneric(cl, currentSchema) : fromReflective(type, cl, Optional.ofNullable(previousSchema));
    }
}
Because my own POJO implements the interface org.apache.avro.specific.SpecificRecord that is loaded from that blobStorage file, while the check SpecificRecord.class.isAssignableFrom(type) uses the original SpecificRecord from the supplied job jar, it doesn't know how to treat my Avro record as a specific record; instead it tries to reflectively get the schema, which goes wrong.
Any idea how to resolve this issue?
Adas Kavaliauskas
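One direction worth trying for the classloading clash above (an assumption about this setup, not a verified fix): force the Avro classes to resolve parent-first, so SpecificRecord is the same Class object for both the job jar and the code loaded from blob storage. In flink-conf.yaml:

```yaml
# Load org.apache.avro.* from Flink's parent classloader instead of the
# user-code classloader (assumes the Avro / flink-avro jars are in lib/),
# so SpecificRecord.class.isAssignableFrom(type) compares a single class.
classloader.parent-first-patterns.additional: org.apache.avro.
```

Alternatively, removing Avro from the fat jar so only one copy exists on the classpath achieves the same effect.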
04/24/2024, 1:20 PM
Adas Kavaliauskas
04/24/2024, 1:34 PM
t_env.execute_sql(
    f"""
    CREATE TABLE configuration (
        domain_name VARCHAR,
        enabled BOOLEAN,
        country VARCHAR,
        created_at TIMESTAMP
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://',
        'table-name' = 'configuration',
        'username' = 'user',
        'password' = 'pass',
        'lookup.cache' = 'PARTIAL',
        'lookup.partial-cache.max-rows' = '5000',
        'lookup.partial-cache.expire-after-write' = '5m',
        'lookup.max-retries' = '3'
    );
    """
)
This is my table statement; I am using it to join with data in a Kafka topic. At the initial start of the script the table had 49 records; after 15 minutes there are 53 records. I thought this configuration retrieves and caches data from my table every 5 minutes? However, the new data is not getting polled from the table. What might be wrong with my config?
Adas Kavaliauskas
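On the lookup-cache question above: the PARTIAL lookup cache does not poll the table on a schedule. It caches rows per lookup key, and a key is only re-queried when it is looked up again after its entry expired (expire-after-write), so updates become visible lazily and only for keys that keep arriving in the stream. A minimal Python sketch of that semantic (a plain dict stands in for the database; this is an illustration, not Flink code):

```python
import time

class ExpireAfterWriteCache:
    """Per-key cache: an entry is re-fetched only when it is looked up
    after its TTL has elapsed -- there is no background refresh or polling."""

    def __init__(self, loader, ttl_seconds, clock=time.monotonic):
        self.loader = loader        # key -> value; stands in for the JDBC query
        self.ttl = ttl_seconds
        self.clock = clock
        self.entries = {}           # key -> (value, written_at)
        self.db_hits = 0            # how many times the "database" was queried

    def get(self, key):
        now = self.clock()
        hit = self.entries.get(key)
        if hit is not None and now - hit[1] < self.ttl:
            return hit[0]           # fresh entry: served from cache
        value = self.loader(key)    # missing or expired: query the database
        self.db_hits += 1
        self.entries[key] = (value, now)
        return value
```

In Flink terms: a new row in the configuration table is only fetched when its key first appears in the Kafka stream, and an updated row only becomes visible after its cached entry passes the 5m expire-after-write window and the key is looked up again.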
04/24/2024, 1:37 PM
Nikos Stais
04/24/2024, 6:02 PM
Ralph Wadner Blaise
04/24/2024, 7:07 PM
Message: Operation cannot be fulfilled on configmaps "flink-v-119-cluster-config-map": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409
Saketh
04/25/2024, 6:43 AM
D. Draco O'Brien
04/25/2024, 7:15 AM
Pedro Mázala
04/25/2024, 1:16 PM
zehong yin
04/25/2024, 7:27 PM
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
kafka
kinesis
print
python-input-format
upsert-kafka
I didn’t find any error when Flink loaded that jar lib. How can I identify what’s going on, and what should I do to fix it?
[INFO] Dependency-reduced POM written at: /Users/hiro/Project/Flink/flink-connector-elasticsearch/flink-sql-connector-elasticsearch7/target/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Flink : Connectors : Elasticsearch : Parent 4.0-SNAPSHOT:
[INFO]
[INFO] Flink : Connectors : Elasticsearch : Parent ........ SUCCESS [ 9.504 s]
[INFO] Flink : Connectors : Elasticsearch base ............ SUCCESS [ 23.606 s]
[INFO] Flink : Connectors : Elasticsearch 6 ............... SUCCESS [ 12.928 s]
[INFO] Flink : Connectors : Elasticsearch 7 ............... SUCCESS [ 10.807 s]
[INFO] Flink : Connectors : Elasticsearch 8 ............... SUCCESS [ 3.988 s]
[INFO] Flink : Connectors : Elasticsearch : E2E Tests ..... SUCCESS [ 0.411 s]
[INFO] Flink : Connectors : Elasticsearch : E2E tests : Common SUCCESS [ 1.889 s]
[INFO] Flink : Connectors : Elasticsearch 6 : E2E tests ... SUCCESS [ 3.984 s]
[INFO] Flink : Connectors : Elasticsearch 7 : E2E Tests ... SUCCESS [ 4.337 s]
[INFO] Flink : Connectors : SQL : Elasticsearch 6 ......... SUCCESS [ 7.593 s]
[INFO] Flink : Connectors : SQL : Elasticsearch 7 ......... SUCCESS [ 7.100 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:26 min
[INFO] Finished at: 2024-04-25T12:15:24-07:00
[INFO] ------------------------------------------------------------------------
Maxim Senin
04/26/2024, 8:38 PM
UnsupportedOperationException?
The job starts, the operator collects some stats, and then the job dies, apparently on a rescaling op:
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Event | Info | JOBSTATUSCHANGED | Job status changed from CREATED to RUNNING
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Status | Info | STABLE | The resource deployment is considered to be stable and won't be rolled back
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully reconciled, nothing to do...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Stabilizing until 2024-04-26 19:22:43
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Creating config map autoscaler-f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (1.000gb (1073741840 bytes)) is greater than its max value 1024.000mb (1073741824 bytes), max value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Stabilizing until 2024-04-26 19:22:43
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully reconciled, nothing to do...
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job f5179d479dc1921693ffeb3797345458 failed with error: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper$6.getOldSubtasks(SubtaskStateMapper.java:180)
at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.lambda$getNewToOldSubtasksMapping$0(SubtaskStateMapper.java:202)
at java.base/java.util.stream.IntPipeline$1$1.accept(Unknown Source)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
at org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:139)
at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198)
at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:410)
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:440)
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:206)
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:146)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1822)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1742)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210)
at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:1239)
at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$27(AdaptiveScheduler.java:1229)
at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57)
... 5 more
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Event | Info | JOBSTATUSCHANGED | Job status changed from RUNNING to FAILED
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (1.000gb (1073741840 bytes)) is greater than its max value 1024.000mb (1073741824 bytes), max value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Status | Error | FAILED | {"type":"org.apache.flink.util.SerializedThrowable","message":"java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: ...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully reconciled, nothing to do...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Cleaning up FlinkDeployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Event | Info | CLEANUP | Cleaning up FlinkDeployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction jvm overhead memory (1.000gb (1073741840 bytes)) is greater than its max value 1024.000mb (1073741824 bytes), max value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Cleaning up autoscaling meta data
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting cluster with Foreground propagation
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Scaling JobManager Deployment to zero with 300 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Scaling JobManager Deployment to zero
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting JobManager Deployment with 298 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Deleting JobManager Deployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting Kubernetes HA metadata