Tamir Sagi
04/24/2025, 7:24 AM
"traffic.sidecar.istio.io/excludeOutboundPorts"
Everything was working fine and very stable, yet once Istio was updated from 1.22.x to 1.24.x, the communication between the TM and JM terminated with the following warning:
[Warning] [{pekkoAddress=pekko.tcp://flink@10.233.24.57:6123, pekkoSource=pekko.tcp://flink@10.233.24.57:6123/system/endpointManager/reliableEndpointWriter-pekko.tcp%3A%2F%2Fflink%4010.233.90.141%3A6123-0, pekkoTimestamp=10:56:31.006UTC, pekkoUid=6633221187301835071, sourceActorSystem=flink, sourceThread=flink-pekko.remote.default-remote-dispatcher-6}] [o.a.p.r.ReliableDeliverySupervisor]: Association with remote system [pekko.tcp://flink@10.233.90.141:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink@10.233.90.141:6123]] Caused by: [The remote system explicitly disassociated (reason unknown).]
Once we restarted the Flink deployment, the following warnings appeared:
[Warning] [{}] [o.a.f.r.r.a.ActiveResourceManager]: Discard registration from TaskExecutor dev-p2a2-taskmanager-1-1 at (pekko.tcp://flink@10.233.83.38:6122/user/rpc/taskmanager_0) because the framework did not recognize it
then
[Warning] [{}] [o.a.f.r.d.StandaloneDispatcher]: Ignoring JobGraph submission 'dev-p2a2' (5443fa441f7b39f55a48528d5665f17a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
We deleted the state folder on S3, but it did not work. The cluster ConfigMap which holds the last checkpoint (managed by the k8s-operator) was not there either.
The only solution was to re-create the cluster (obviously we cannot do that in production).
I read there were many breaking changes between Istio 1.22.x and 1.24.x, yet after recreating the cluster it worked fine.
Any suggestions on why the cluster reached a globally-terminal state and could not recover?
[1] https://doc.akka.io/libraries/akka-management/current/bootstrap/istio.html#istio
[2] https://pekko.apache.org/docs/pekko/1.0/project/migration-guides.html#migration-to-apache-pekko
Bentzi Mor
04/24/2025, 3:07 PM
Exception in thread "main" java.lang.NoSuchFieldError: Class org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator does not have member field 'org.apache.flink.streaming.api.operators.ChainingStrategy chainingStrategy'
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.<init>(SchemaOperator.java:114)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperatorFactory.<init>(SchemaOperatorFactory.java:52)
at org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator.addRegularSchemaOperator(SchemaOperatorTranslator.java:94)
at org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator.translateRegular(SchemaOperatorTranslator.java:62)
at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.translate(FlinkPipelineComposer.java:186)
at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:99)
at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:74)
rmoff
04/24/2025, 5:20 PM
CURRENT_WATERMARK in SQL isn't helping me.
I found the http://localhost:8081/jobs/[…]/vertices/[…]/watermarks REST API but wondered if there was anything else.
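For what it's worth, a sketch of the SQL-side check, assuming a table t with a declared watermark on rowtime_col (both names are illustrative). One gotcha is that CURRENT_WATERMARK can return NULL until the first watermark has actually been emitted, which can make it look unhelpful:

-- Sketch: show the current watermark next to each row.
-- Assumes t declares e.g. WATERMARK FOR rowtime_col AS rowtime_col - INTERVAL '5' SECOND.
-- CURRENT_WATERMARK(rowtime_col) may be NULL before the first watermark is emitted.
SELECT
  rowtime_col,
  CURRENT_WATERMARK(rowtime_col) AS wm
FROM t;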
Mike Ashley
04/24/2025, 8:34 PM
מייקי בר יעקב
04/24/2025, 8:48 PM
Jeff McFarlane
04/24/2025, 8:51 PM
I have an upsert-kafka table whose message value is split into two nested JSON sections. Similar to this example:
{
"header": {
"event_time": 1745499600000,
"source": "web-shop"
},
"main": {
"order_id": "o456",
"user_id": "u123",
"amount": 100.5
}
}
Is it possible to define a primary key for the upsert-kafka connector when your key is in nested json? The only way I have been able to get it to work is by "pre-flattening" the json so the key fields are top-level. Is that an actual requirement?
In this example we might do something like:
CREATE TABLE orders (
header ROW<event_time BIGINT, source STRING>,
main ROW<user_id STRING, order_id STRING, amount DECIMAL(18, 4)>,
PRIMARY KEY (main.order_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'orders',
'key.format' = 'json',
'value.format' = 'json'
);
Flink will complain:
SQL parse failed. Encountered "." at line X. Was expecting one of: ")", ",".
If you alias the nested field such as:
main.order_id AS order_id
PRIMARY KEY (order_id) NOT ENFORCED
Flink will throw the error:
A PRIMARY KEY constraint must be declared on physical columns.
Has anyone found a way to reference a nested field in the PK without flattening? (Or can we use the values in the message's key?) Thanks!
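One option that seems worth trying, assuming the producer writes the key fields into the Kafka message key as JSON (column and topic names are taken from the example above, the bootstrap servers are a placeholder): declare the key field as a top-level physical column that is read from the message key, and keep the nested ROWs for the value.

-- Sketch: key columns come from the Kafka key ('key.format'), the nested ROWs from the value.
-- Assumes the record key is JSON such as {"order_id": "o456"} (an assumption, not confirmed).
CREATE TABLE orders (
  order_id STRING,   -- physical column, populated from the message key
  header   ROW<event_time BIGINT, source STRING>,
  main     ROW<user_id STRING, order_id STRING, amount DECIMAL(18, 4)>,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker:9092',   -- placeholder address
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'              -- keep order_id out of the value schema
);

If the key field only exists inside the nested value, pre-flattening (for example via a view or an intermediate query that lifts main.order_id to the top level before writing to the upsert-kafka sink) still seems to be required, since the PRIMARY KEY has to reference physical top-level columns.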
Avinash Tripathy
04/25/2025, 10:29 AM
2025-04-25 10:13:06,041 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: MULTISINK_APP_KAFKA_SOURCE -> MULTISINK_APP_TRANSFORMER -> MLTISINK_APP_KAFKA_SINK: Writer -> MLTISINK_APP_KAFKA_SINK: Committer (2/2)#0 (57700502b5f830802eee110ba116985e_cbc357ccb763df2852fee8c4fc7d55f2_1_0) switched from RUNNING to FAILED with failure cause:
java.lang.NoSuchMethodError: 'org.apache.avro.io.BinaryEncoder com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroSerializationSchema.getEncoder()'
at com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroSerializationSchema.serialize(GlueSchemaRegistryAvroSerializationSchema.java:106) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
at com.foo.bar.app.models.KafkaProducerRecordSchema.serialize(KafkaProducerRecordSchema.java:103) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
at com.foo.bar.app.models.KafkaProducerRecordSchema.serialize(KafkaProducerRecordSchema.java:27) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:205) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52) ~[flink-dist-1.20.1.jar:1.20.1]
at com.foo.bar.app.tranformer.FlinkTransformerFunction.flatMap(FlinkTransformerFunction.java:102) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
at com.foo.bar.app.tranformer.FlinkTransformerFunction.flatMap(FlinkTransformerFunction.java:32) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310) ~[flink-dist-1.20.1.jar:1.20.1]
Avinash Tripathy
04/25/2025, 10:31 AM
<flink.version>1.20.1</flink.version>
<mockito.version>3.0.0</mockito.version>
<log4j.version>2.19.0</log4j.version>
<awssdk.version>2.31.25</awssdk.version>
<spring-test.version>7.0.0-M4</spring-test.version>
<amazon-glue.version>1.1.23</amazon-glue.version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.1-1.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- <https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api> -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>iam</artifactId>
<version>${awssdk.version}</version> <!-- Make sure to use the latest version -->
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>secretsmanager</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>core</artifactId>
<version>${awssdk.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.5.0</version> <!-- Or any newer version -->
</dependency>
<!-- Flink redis connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.1.5</version>
<exclusions>
<exclusion>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.21.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-prometheus</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<!-- <https://mvnrepository.com/artifact/org.junit.vintage/junit-vintage-engine> -->
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.10.2</version>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.15.0-rc2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring-test.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.uhn.hapi</groupId>
<artifactId>hapi-base</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-flink-serde</artifactId>
<version>${amazon-glue.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
</exclusion>
<exclusion>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-runtime-jvm</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-runtime-jvm</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.28.2</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20231013</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-build-tools</artifactId>
<version>1.1.16</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>7.2.2-ccs</version>
<scope>compile</scope>
</dependency>
</dependencies>
Pham Khoi
04/25/2025, 11:57 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
...
spec:
  image: ...
  flinkVersion: v1_15
  flinkConfiguration:
    security.kerberos.krb5-conf.path: /etc/krb5.conf
    security.kerberos.login.keytab: /opt/keytab
    security.kerberos.login.principal: ...
    security.kerberos.login.contexts: KafkaClient
  ...
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: kerberos-keytab
              mountPath: /opt/keytab
              subPath: keytab
  ...
I don't know how to apply KafkaClient to the contexts. If you want more detail, see this link: https://stackoverflow.com/questions/73581288/specifying-keytab-for-flinkdeployment-of-flink-kubernetes-operator. Please help me, I would really appreciate it! Oh, I forgot another point: when I use hadoop-common with a version higher than 3.2.0 I get the error "Client cannot authenticate via (token, kerberos)", and if I change the version to 2.10.2 I get the error I described above: "GSS initiate failed, Failed to find any Kerberos tgt".
Tiansu Yu
04/25/2025, 12:00 PM
Francis Altomare
04/25/2025, 2:01 PM
To use enableAsyncState() with a KeyedProcessFunction, I would need to configure my job with something along the lines of:
upstream
.keyBy { keyFunction }
.enableAsyncState()
.transform(
"KeyedProcess",
TypeInformation.of(KeyProcessOutput::class.java),
AsyncKeyedProcessOperator(KeyedProcessFunction())
)
.sinkTo(downstream)
This approach makes sense to me when dealing with `KeyedProcessFunction`s. However, I have a pipeline that uses a KeyedCoProcessFunction with a state store that could really benefit from enableAsyncState.
Does anyone know of any analog to the AsyncKeyedProcessOperator that would work with a KeyedCoProcessFunction? Is this something that would be possible, or should I be looking for a different approach here? Thanks a lot!
Frantisek Hartman
04/25/2025, 2:12 PM
Guruguha Marur Sreenivasa
04/25/2025, 6:21 PM
2025-04-25 18:12:20,226 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesSessionClusterEntrypoint down with application status FAILED. Diagnostics java.lang.SecurityException: java.io.IOException: Configuration Error:
Line 4: expected [option key]
at java.base/sun.security.provider.ConfigFile$Spi.<init>(Unknown Source)
at java.base/sun.security.provider.ConfigFile.<init>(Unknown Source)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.base/java.lang.Class.newInstance(Unknown Source)
at java.base/javax.security.auth.login.Configuration$2.run(Unknown Source)
at java.base/javax.security.auth.login.Configuration$2.run(Unknown Source)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.login.Configuration.getConfiguration(Unknown Source)
at org.apache.flink.runtime.security.modules.JaasModule.install(JaasModule.java:98)
at org.apache.flink.runtime.security.SecurityUtils.installModules(SecurityUtils.java:76)
at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:57)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.installSecurityContext(ClusterEntrypoint.java:279)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:231)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:739)
at org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint.main(KubernetesSessionClusterEntrypoint.java:61)
Caused by: java.io.IOException: Configuration Error:
Line 4: expected [option key]
at java.base/sun.security.provider.ConfigFile$Spi.ioException(Unknown Source)
at java.base/sun.security.provider.ConfigFile$Spi.match(Unknown Source)
at java.base/sun.security.provider.ConfigFile$Spi.parseLoginEntry(Unknown Source)
at java.base/sun.security.provider.ConfigFile$Spi.readConfig(Unknown Source)
at java.base/sun.security.provider.ConfigFile$Spi.init(Unknown Source)
at java.base/sun.security.provider.ConfigFile$Spi.init(Unknown Source)
... 18 more
Surya Narayana
04/25/2025, 7:43 PM
maja
04/25/2025, 9:23 PM
Antti Kaikkonen
04/28/2025, 7:20 AM
My SourceReader must output records with strictly increasing timestamps, but I can't figure out how to implement the addSplitsBack method in the SplitEnumerator when all source readers have already received splits with greater timestamps. The only way I can think of is to simply throw an error in addSplitsBack to trigger a checkpoint recovery, but is this a valid way to do it?
Arif Khan
04/28/2025, 6:12 PM
I am using the Flink Kubernetes operator 1.11.0, and when I am trying to set up a session cluster in AWS EKS, I am getting this error:
one or more objects failed to apply, reason: Internal error occurred: failed calling webhook "mutationwebhook.flink.apache.org": failed to call webhook: Post "https://flink-operator-webhook-service.flink-cluster.svc:443/mutate?timeout=10s": Address is not allowed
I also tried this setting at session cluster level, but no luck
kubernetes.hostnetwork.enabled: "true"
I see a lot of folks have a session cluster setup in AWS EKS; if you ran into this issue, please spare a minute to help me with this. I appreciate your help.
Nihar Rao
04/28/2025, 8:33 PM
kubectl describe pod <JM pod> shows the pod restarted due to OOM as well.
3. After the JM was OOMkilled, it was restarted and 24 new taskmanager pods were started, which is confirmed on the Flink UI in the available task slots section.
4. There was no impact on the job (it restarted successfully), but there are 48 taskmanagers running, of which 24 are standby. The expected behaviour after a JM OOM with HA enabled is that no new task managers are started.
5. I simulated the OOM on the JM pod for one of our other jobs, but no similar behaviour was observed.
I have added a screenshot of the Flink UI showing 24 extra TMs (48 task slots) and the kubectl output in the thread.
Can you please help us with how to debug this, as the Kubernetes operator doesn't show any relevant information on why this happened? Thanks!
Pedro Mázala
04/29/2025, 10:38 AM
Raj Kunwar Singh
04/29/2025, 3:04 PM
I'm on Flink v1.19.2. I am trying to read an S3 path using the FileSystem connector.
If I try it the Table API connector way, the read happens the first time, but subsequently, if new "partitions" are added, Flink is not aware of them even when I provide 'source.monitor-interval' to make it unbounded.
String createTableSql = """
CREATE TABLE tbl (
col1 STRING,
part_col STRING
) PARTITIONED BY (part_col)
WITH (
'connector' = 'filesystem',
'path' = 's3a:/bucket_name/path',
'format' = 'parquet',
'source.monitor-interval' = '10s'
)
""";
However, if I use the DataStream connector, which is low level and not something that I want to use, it works when new partition folders are added.
I don't see any error logs even when I enable DEBUG logs. Can someone help with this? I suspect that the partition discovery isn't working.
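Not a confirmed fix, but two things may be worth double-checking here: continuous file discovery only applies when the table is read as an unbounded source in streaming mode, and the path scheme normally uses two slashes (s3a://bucket/...). A minimal SQL sketch under those assumptions (bucket and path names are placeholders):

-- Sketch: unbounded filesystem read; requires the streaming runtime mode.
SET 'execution.runtime-mode' = 'streaming';

CREATE TABLE tbl (
  col1 STRING,
  part_col STRING
) PARTITIONED BY (part_col) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://bucket_name/path',   -- note the double slash after the scheme
  'format' = 'parquet',
  'source.monitor-interval' = '10s'    -- re-scan the path for new files at this interval
);

SELECT * FROM tbl;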
George Leonard
04/30/2025, 7:10 PM
{
"ts" : 1729312551000,
"metadata" : {
"siteId" : 1009,
"deviceId" : 1042,
"sensorId" : 10180,
"unit" : "Psi",
"ts_human" : "2024-10-02T00:00:00.869Z",
"location": {
"latitude": -26.195246,
"longitude": 28.034088
},
"deviceType" : "Oil Pump",
},
"measurement" : 1013.3997
}
This is the Flink CREATE TABLE I'm creating. The idea is to consume from a Kafka topic; I'm not using serialisation or a schema registry.
CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north (
ts BIGINT,
metadata ROW<
siteId INTEGER,
deviceId INTEGER,
sensorId INTEGER,
unit STRING,
ts_human STRING,
location ROW<
latitude DOUBLE,
longitude DOUBLE
>,
deviceType STRING
>,
measurement DOUBLE
TS_WM AS TO_TIMESTAMP(FROM_UNIXTIME(ts))),
WATERMARK FOR TS_WM AS TS_WM
) WITH (
'connector' = 'kafka',
'topic' = 'factory_iot_north',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'devlab0',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'value.fields-include' = 'ALL'
);
see err.txt attached
George Leonard
04/30/2025, 7:22 PM
CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north (
ts BIGINT,
metadata ROW<
siteId INTEGER,
deviceId INTEGER,
sensorId INTEGER,
unit STRING,
ts_human STRING,
location ROW<
latitude DOUBLE,
longitude DOUBLE
>,
deviceType STRING
>,
measurement DOUBLE,
event_time AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'factory_iot_north',
'properties.bootstrap.servers' = 'broker:9092',
'properties.group.id' = 'devlab0',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
George Leonard
04/30/2025, 7:23 PM
CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north_unnested (
ts BIGINT,
siteId INTEGER,
deviceId INTEGER,
sensorId INTEGER,
unit STRING,
ts_human STRING,
latitude DOUBLE,
longitude DOUBLE,
deviceType STRING,
measurement DOUBLE,
event_time AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) ;
George Leonard
04/30/2025, 7:25 PM
INSERT INTO hive_catalog.iot.factory_iot_north_unnested
SELECT
ts,
metadata.siteId AS siteId,
metadata.deviceId AS deviceId,
metadata.sensorId AS sensorId,
metadata.unit AS unit,
metadata.ts_human AS ts_human,
metadata.location.latitude AS latitude,
metadata.location.longitude AS longitude,
metadata.deviceType AS deviceType,
measurement
FROM hive_catalog.iot.factory_iot_north;
Fabricio Lemos
04/30/2025, 8:59 PM
Has anyone found flink-connector-jdbc 4.0.0-2.0 in a Maven repository? It's not available in Maven Central, which makes the instructions in the documentation hard to follow. The links to the JdbcSource javadoc and JdbcSourceBuilder javadoc are also broken there.
Antti Kaikkonen
05/02/2025, 12:13 PM
Vojtěch Hauser
05/03/2025, 12:14 AM
George Leonard
05/03/2025, 9:19 AM
CREATE OR REPLACE TABLE hive_catalog.iot.factory_iot_north (
ts BIGINT,
metadata ROW<
siteId INTEGER,
deviceId INTEGER,
sensorId INTEGER,
unit STRING,
ts_human STRING,
location ROW<
latitude DOUBLE,
longitude DOUBLE
>,
deviceType STRING
>,
measurement DOUBLE,
event_time AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'factory_iot_north',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'devlab0',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
Target Table
CREATE OR REPLACE TABLE fluss_catalog.fluss.factory_iot_north (
ts BIGINT,
metadata ROW<
siteId INTEGER,
deviceId INTEGER,
sensorId INTEGER,
unit STRING,
ts_human STRING,
location ROW<
latitude DOUBLE,
longitude DOUBLE
>,
deviceType STRING
>,
measurement DOUBLE,
partition_month STRING -- must be provided by upstream or insert logic
) PARTITIONED BY (partition_month) WITH (
'bucket.num' = '3'
,'table.datalake.enabled' = 'true'
,'table.auto-partition.time-unit' = 'MONTH'
,'table.auto-partition.num-retention'= '60'
);
Insert statement
Tried both of the below... the first one is accepted, but the Flink jobmanager logs contain the below error.
INSERT INTO fluss_catalog.fluss.factory_iot_north
SELECT
ts AS ts,
metadata AS metadata,
measurement AS measurement,
DATE_FORMAT(event_time, 'yyyyMM') AS partition_month
FROM hive_catalog.iot.factory_iot_north;
err
2025-05-03 08:21:34,883 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: factory_iot_north[1] -> Calc[2] -> factory_iot_north[3]: Writer (2/2) (18c94214edcecb9d7707c5e3843d960a_cbc357ccb763df2852fee8c4fc7d55f2_1_8) switched from INITIALIZING to FAILED on 172.18.0.12:44331-ea5bc2 @ pipeline-taskmanager-1.pipeline (dataPort=44975).
java.lang.IllegalArgumentException: Currently, Data type 'ROW<`siteId` INT, `deviceId` INT, `sensorId` INT, `unit` STRING, `ts_human` STRING, `location` ROW<`latitude` DOUBLE, `longitude` DOUBLE>, `deviceType` STRING>' is not supported in indexedRow
at com.alibaba.fluss.row.indexed.IndexedRow.isFixedLength(IndexedRow.java:265) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.row.indexed.IndexedRow.calculateVariableColumnLengthListSize(IndexedRow.java:148) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.row.indexed.IndexedRowWriter.<init>(IndexedRowWriter.java:61) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.row.encode.IndexedRowEncoder.<init>(IndexedRowEncoder.java:45) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.row.encode.IndexedRowEncoder.<init>(IndexedRowEncoder.java:38) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.client.table.writer.AppendWriterImpl.<init>(AppendWriterImpl.java:65) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.client.table.writer.TableAppend.createWriter(TableAppend.java:45) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.connector.flink.sink.writer.AppendSinkWriter.initialize(AppendSinkWriter.java:47) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at com.alibaba.fluss.connector.flink.sink.FlinkSink.createWriter(FlinkSink.java:59) ~[fluss-connector-flink-0.6.0.jar:0.6.0]
at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:163) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.1.jar:1.20.1]
at java.lang.Thread.run(Unknown Source) ~[?:?]
and
INSERT INTO fluss_catalog.fluss.factory_iot_north
SELECT
ts
,metadata.siteId
,metadata.deviceId
,metadata.sensorId
,metadata.unit
,metadata.ts_human
,metadata.location.latitude
,metadata.location.longitude
,metadata.deviceType
,measurement
,DATE_FORMAT(event_time, 'yyyyMM')
FROM hive_catalog.iot.factory_iot_north;
Err (PS: the above SELECT works on its own; it's somehow not happy with the source/target table mapping):
Caused by: org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'fluss_catalog.fluss.factory_iot_north' do not match.
Cause: Different number of columns.
Query schema: [ts: BIGINT, siteId: INT, deviceId: INT, sensorId: INT, unit: STRING, ts_human: STRING, latitude: DOUBLE, longitude: DOUBLE, deviceType: STRING, measurement: DOUBLE, EXPR$10: STRING]
Sink schema: [ts: BIGINT, metadata: ROW<`siteId` INT, `deviceId` INT, `sensorId` INT, `unit` STRING, `ts_human` STRING, `location` ROW<`latitude` DOUBLE, `longitude` DOUBLE>, `deviceType` STRING>, measurement: DOUBLE, partition_month: STRING]
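The two failures line up with the two table shapes: the nested ROW isn't accepted by Fluss's IndexedRow encoding, and the flattened SELECT no longer matches the nested sink schema. One way to reconcile them, a sketch under the assumption that flattening the Fluss target is acceptable (column names are taken from the unnested SELECT, WITH options are copied from the original target table):

-- Sketch: flatten the Fluss target so it matches the unnested SELECT
-- (avoids the unsupported ROW type and the column-count mismatch).
CREATE OR REPLACE TABLE fluss_catalog.fluss.factory_iot_north (
  ts              BIGINT,
  siteId          INT,
  deviceId        INT,
  sensorId        INT,
  unit            STRING,
  ts_human        STRING,
  latitude        DOUBLE,
  longitude       DOUBLE,
  deviceType      STRING,
  measurement     DOUBLE,
  partition_month STRING
) PARTITIONED BY (partition_month) WITH (
  'bucket.num' = '3',
  'table.datalake.enabled' = 'true',
  'table.auto-partition.time-unit' = 'MONTH',
  'table.auto-partition.num-retention' = '60'
);

INSERT INTO fluss_catalog.fluss.factory_iot_north
SELECT
  ts,
  metadata.siteId,
  metadata.deviceId,
  metadata.sensorId,
  metadata.unit,
  metadata.ts_human,
  metadata.location.latitude,
  metadata.location.longitude,
  metadata.deviceType,
  measurement,
  DATE_FORMAT(event_time, 'yyyyMM')
FROM hive_catalog.iot.factory_iot_north;

Whether Fluss 0.6.0 supports nested ROW columns at all is a separate question for the Fluss side; the IndexedRow error message suggests it only handles flat column types.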
Martin Egri
05/05/2025, 10:10 AM
I've added the flink-connector-jdbc:3.3.0-1.20 jar into the deployment runtime under /opt/flink/lib/ext/…, but I get this error when trying to run a job using the connector:
[…]
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.CatalogFactory' found in the classpath.
Ambiguous factory classes are:
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory
org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:639) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:802) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:499) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at org.apache.flink.table.catalog.CatalogManager.initCatalog(CatalogManager.java:376) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at org.apache.flink.table.catalog.CatalogManager.createCatalog(CatalogManager.java:330) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at org.apache.flink.table.operations.ddl.CreateCatalogOperation.execute(CreateCatalogOperation.java:88) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1102) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
at com.pagero.owl.flink.datagen.main$package$.main(main.scala:23) ~[?:?]
at com.pagero.owl.flink.datagen.main.main(main.scala:13) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
... 7 more
2025-05-05 10:06:25,220 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application.
Is it not possible to install the connector directly on the server? It's working with other dependencies…
Philipp
05/05/2025, 1:22 PM