Aly Ayman
08/19/2024, 10:28 AMAndrew Sims
08/20/2024, 1:52 AMNafer Sanabria
08/20/2024, 10:17 AM$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=STREAMING -c com.example.DataStreamJob target/flink-poc-1.0.jar
I also tried explicitly setting the runtime mode in the environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
but it did not work: the job still finishes in a couple of seconds. The test I want to perform is to copy a new file into the directory and check that the file gets processed.
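For reference, a minimal sketch of a file source that keeps running and picks up newly copied files (assuming the flink-connector-files FileSource; the path, format, and interval are placeholders):
FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/watched/dir"))
        // keep scanning the directory so newly copied files are picked up
        .monitorContinuously(Duration.ofSeconds(10))
        .build();
DataStream<String> lines = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");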
What am I missing? Any hints?
Andre Luiz
08/20/2024, 6:06 PMЄвген Шепелюк
08/21/2024, 8:36 AMAvinash Upadhyaya
08/21/2024, 10:52 AM[a-zA-Z0-9~\-_.]+
as my regular expression and my SQL query has the same. It throws the following error
Invalid regular expression '[A-Za-z0-9~\-_.]+'
at org.apache.flink.table.functions.SqlLikeUtils.invalidRegularExpression(SqlLikeUtils.java:178)
at org.apache.flink.table.functions.SqlLikeUtils.sqlSimilarRewriteCharEnumeration(SqlLikeUtils.java:221)
at org.apache.flink.table.functions.SqlLikeUtils.sqlToRegexSimilar(SqlLikeUtils.java:285)
at org.apache.flink.table.functions.SqlLikeUtils.sqlToRegexSimilar(SqlLikeUtils.java:240)
at org.apache.flink.table.functions.SqlLikeUtils.similar(SqlLikeUtils.java:80)
However, [A-Za-z0-9~\-.]+ works. The problem is with _ and it happens in https://github.com/apache/flink/blob/56c81995d3b34ed9066b6771755407b93438f5ab/flin[…]rc/main/java/org/apache/flink/table/functions/SqlLikeUtils.java: if the regex contains one of the special characters defined in SQL_SIMILAR_SPECIALS, it throws an invalidRegularExpression exception.
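One possible workaround (the table and column names here are illustrative) is to avoid SIMILAR TO and use the built-in REGEXP function, which takes a Java regular expression where _ is not a special character:
// REGEXP matches substrings, so the pattern is anchored to emulate a full match.
Table filtered = tableEnv.sqlQuery(
        "SELECT * FROM events WHERE REGEXP(file_name, '^[A-Za-z0-9~\\-_.]+$')");
Yaroslav Bezruchenko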
08/21/2024, 12:05 PMorg.rocksdb.RocksDBException: WriteBatch has wrong count
at org.rocksdb.RocksDB.put(Native Method)
at org.rocksdb.RocksDB.put(RocksDB.java:955)
at org.apache.flink.contrib.streaming.state.RocksDBMapState.put(RocksDBMapState.java:139)
at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
.........
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
We are using:
Flink 1.19.0
Flink Kubernetes Operator 1.9.0
Java 17.0.12+7, temurin
As for dependencies for RocksDB:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>1.19.0</version>
</dependency>
This is not reproducible locally for me. Any ideas what could cause this?
Arthur Catrisse
08/21/2024, 12:33 PMflink-kubernetes-operator
We are occasionally encountering org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted
errors on our podmanagers. This completely blocks the podmanager from restarting correctly and launching new tasks.
This seems to happen sometimes when a podmanager pod is rotated by the kubernetes cluster. (via karpenter).
Sometimes we do see the SIGTERM
in the pod logs when they are rotated out, but sometimes not. (Could some logs not be going through?)
We have tried setting up high availability (although we're not sure it would solve the issue), but the config does not seem to recognise the kubernetes.jobmanager.replicas value (we tried setting it to 2, but it is still interpreted as 1). We have defined high-availability.type: kubernetes and high-availability.storageDir (and we can't explicitly set kubernetes.cluster-id; it seems to be handled by the deployment, and when we do set it the app does not start).
Do you have any ideas what could be causing these instabilities? Thanks.
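For what it's worth, with the flink-kubernetes-operator the standby JobManager count is normally declared on the JobManager spec of the FlinkDeployment rather than through kubernetes.jobmanager.replicas, and it typically requires HA to be enabled. A hypothetical sketch (names, bucket, and resources are placeholders):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-app                     # the operator derives kubernetes.cluster-id from this
spec:
  image: flink:1.19
  flinkVersion: v1_19
  serviceAccount: flink
  flinkConfiguration:
    high-availability.type: kubernetes
    high-availability.storageDir: s3://my-bucket/flink-ha    # placeholder
  jobManager:
    replicas: 2                    # standby JobManager, relies on the HA settings above
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1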
Our flink confs ⬇️Guruguha Marur Sreenivasa
08/21/2024, 3:30 PMJonas Brami
08/21/2024, 5:12 PMSrivatsav Gorti
08/22/2024, 3:30 PMCaused by: java.io.FileNotFoundException: /tmp/flink-web-b7c8bf9d-504f-4c17-bf1e-b10ce5f8b242/flink-web-upload/1e56ab19-b842-41f0-9ca4-00cf4936ae81_release-1.0-jar-with-dependencies.jar (No such file or directory)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job
The jar is barely 1 MB; all dependencies are baked into the classpath of the cluster.
D. Draco O'Brien
08/22/2024, 5:20 PM/tmp/flink-web-b7c8bf9d-504f-4c17-bf1e-b10ce5f8b242/flink-web-upload/
, verify if the directory exists and if there are any recent changes in your system's cleanup policies that might be deleting files from /tmp. Some systems automatically clean up /tmp directories, which could be causing the issue.
2. Job Submission Process: Investigate how jobs are being submitted to the Flink cluster. Ensure that the JAR file is correctly uploaded and referenced during job submission. If you’re using a web interface or REST API, check the logs for any hints about failed uploads or misconfigurations.
3. Disk Space: Confirm that there is enough disk space on the nodes where Flink is running. A full disk can prevent file uploads and cause similar errors.
Long-term Resolutions & Best Practices
1. Customize Flink's Upload and Blob Directories: Instead of using the default /tmp directory, consider configuring a more persistent storage location for uploaded job jars and the blob server. You can do this by setting the web.upload.dir and blob.storage.directory properties in your Flink configuration file (typically flink-conf.yaml); a short sketch follows after this list. Choose a directory that is less likely to be cleaned up automatically and has ample space.
2. Job Management: Implement checks before job submission to ensure that the required JAR files are present and accessible. This can be done as a pre-flight check in your job submission scripts or application logic.
3. Monitoring & Logging: Enhance monitoring around the job submission process and the blob server activity. Use Flink’s built-in metrics and consider integrating with your monitoring system to get alerts when the disk space is running low or when job submissions fail due to missing files.
4. Resource Management: Review your resource allocation, particularly for the JobManager and TaskManagers. Insufficient resources can lead to failures in managing or executing jobs.
5. Version Compatibility: Ensure that the version of Flink used for compiling the JAR matches the version running in the cluster. Mismatched versions can sometimes cause unexpected behavior.
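A minimal flink-conf.yaml sketch of the directory change described in item 1 above (the paths are placeholders, not recommendations):
# Keep uploaded job jars and blob storage out of /tmp so system cleanup
# policies cannot delete them between upload and execution.
web.upload.dir: /var/lib/flink/web-uploads
blob.storage.directory: /var/lib/flink/blobs
D. Draco O'Brien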
08/22/2024, 5:22 PMOdin Wang
08/23/2024, 1:08 AMTudor Plugaru
08/23/2024, 9:16 AMamarjeet pasrija
08/23/2024, 1:57 PMSameer alwosaby
08/23/2024, 7:41 PMSwaraj
08/24/2024, 7:35 AMPojoSerializer
for serializing POJOs. However, I have a question regarding how Flink handles serialization when one of the fields in the POJO is a generic type. Specifically, does the presence of a generic field cause the entire POJO to fall back to Kryo serialization, or will only that specific generic field fall back to Kryo serialization while the rest of the POJO continues to use PojoSerializer?
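To make the question concrete, a hypothetical POJO of the kind being described (all class and field names are made up):
// A generic holder type used as a field inside an otherwise ordinary POJO.
public class Wrapper<T> {
    public T value;
    public Wrapper() {}
}

public class MyEvent {
    public long id;                  // plain field, handled by PojoSerializer's field serializers
    public Wrapper<String> payload;  // generic field: does this force Kryo for the whole POJO,
                                     // or only for this one field?
    public MyEvent() {}
}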
Any insights would be greatly appreciated!David Vittori
08/24/2024, 3:50 PMpublic BaseEvent deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)
        throws IOException {
    JsonNode jsonNode = objectMapper.readTree(recordValue);
    // Determine the event type from the JSON and deserialize accordingly
    if (!jsonNode.has("key")) {
        return null;
    }
    String key = jsonNode.get("key").asText();
    try {
        switch (partitionKey) {
            case EventConstants.KEY_EVENT_TRIP_ENDED:
                return objectMapper.treeToValue(jsonNode, TripEndedEvent.class);
            case EventConstants.KEY_EVENT_TRIP_REPORTED:
                return objectMapper.treeToValue(jsonNode, TripReportedEvent.class);
            // Add more cases for other event types
            default:
                return null;
        }
    } catch (Exception e) {
        Log.error("Failed to deserialize event", e);
        return null;
    }
}
However, I'm running into a ClassCastException
java.lang.RuntimeException: Could not extract key from com.codistica.services.etl.events.TripReportedEvent@6169c6c5
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:61)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
when I try to filter the stream by event type. It seems that Flink encounters an issue when it tries to extract the key, possibly due to how I've set up the deserialization or the filtering process.
return eventsStream
        .filter(event -> event instanceof TripEndedEvent)
        .map(event -> (TripEndedEvent) event)
        .returns(TypeInformation.of(TripEndedEvent.class));
My question is: Is this the best approach for handling multiple event types in Flink, or is there a better pattern or configuration I should follow?
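For what it's worth, the "Could not extract key" error usually means a later keyBy key selector threw for that particular event (for example on a null key field), rather than the filter itself failing. One common alternative for splitting a stream by type is side outputs; a rough sketch reusing the event classes above (everything else is illustrative):
// Route each event type to its own stream with side outputs instead of filter + cast.
final OutputTag<TripReportedEvent> reportedTag =
        new OutputTag<TripReportedEvent>("trip-reported") {};

SingleOutputStreamOperator<TripEndedEvent> tripEnded = eventsStream
        .process(new ProcessFunction<BaseEvent, TripEndedEvent>() {
            @Override
            public void processElement(BaseEvent event, Context ctx, Collector<TripEndedEvent> out) {
                if (event instanceof TripEndedEvent) {
                    out.collect((TripEndedEvent) event);
                } else if (event instanceof TripReportedEvent) {
                    ctx.output(reportedTag, (TripReportedEvent) event);
                }
                // any other event types are simply dropped here
            }
        });

DataStream<TripReportedEvent> tripReported = tripEnded.getSideOutput(reportedTag);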
Any insights or suggestions would be greatly appreciated!Aly Ayman
08/25/2024, 10:05 AMjava.lang.NumberFormatException: For input string: ""201275004137""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:578)
at java.lang.Long.parseLong(Long.java:631)
at org.orangeFlinkDS.Main$2.filter(Main.java:111)
at org.orangeFlinkDS.Main$2.filter(Main.java:108)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
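The input string in this trace contains literal quote characters ("201275004137" including the quotes), which is exactly what Long.parseLong rejects. A small sketch of the kind of cleanup the filter would need (the accessor name is hypothetical):
// Strip the surrounding quote characters before parsing the number.
String raw = event.getSubscriberId();   // hypothetical accessor, value looks like "\"201275004137\""
long parsed = Long.parseLong(raw.replace("\"", "").trim());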
Aly Ayman
08/26/2024, 5:49 AMSandrine Bédard
08/26/2024, 4:34 PMtrackable_job
). It then triggers another workflow, called Sync (also in Cadence), which sends data to downstream services
3. The Sync workflow receives a store ID as input (among other things) and waits 5 minutes for an aggregation window. After the 5 minutes, it reads all items from trackable_job
with the given store ID, and calls external APIs to sync data
The issues with the current architecture are:
1. The 5-min aggregation window results in a lot of items being read at once from trackable_job (up to 170K), making the DB query to fetch items very long and inefficient
2. Multiple Cadence workflows read from trackable_job, making things worse from a DB perspective
3. We track success/failed syncs in the trackable_job table, but we don't do anything with it (e.g., publishing events to clients), so this table is used as a queue only
In my design, I'm considering 2 options:
• Option 1: Kafka + Cadence
◦ Replace trackable_job with a proper Kafka queue (partitioned by store ID)
◦ Modify the Sync workflow to pull from that queue. I've read Cadence isn't super easy to set up with Kafka. For example, if queues are partitioned, we need to define which Cadence worker reads from which queue. Is that true?
◦ We must have grouping logic to group items by store ID, and then make the API call downstream (requirement from external dependencies)
• Option 2: Kafka + Flink
◦ Replace trackable_job with a proper Kafka queue (partitioned by store ID)
◦ Move business logic of Sync workflow into Flink. Set up Flink tasks to maximize parallelism with Kafka
◦ Add a time/count window through Flink to control the aggregation window
◦ Group events in Flink by store ID (a rough sketch follows below)
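A rough sketch of what Option 2 could look like (the topic, classes, and helper names here are all illustrative):
// Consume the job-item topic, key by store ID, aggregate per store for a fixed
// window, then make one downstream sync call per store per window.
KafkaSource<JobItem> source = KafkaSource.<JobItem>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("trackable-job-items")
        .setGroupId("sync")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new JobItemDeserializationSchema())   // hypothetical schema
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "trackable-job-items")
   .keyBy(JobItem::getStoreId)
   // 5-minute aggregation window per store, mirroring the current Cadence behaviour
   .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
   .process(new ProcessWindowFunction<JobItem, SyncResult, String, TimeWindow>() {
       @Override
       public void process(String storeId, Context ctx,
                           Iterable<JobItem> items, Collector<SyncResult> out) {
           out.collect(SyncClient.syncStore(storeId, items));   // hypothetical sync client
       }
   });
// ...the resulting SyncResult stream can then be sent to whatever sink tracks sync outcomes.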
Do you think my problem is a good use-case for Flink, and what do you think of option 2? Thanks a lot!Rion Williams
08/26/2024, 6:11 PMJobManager deployment is missing and HA data is not available to make stateful upgrades.
It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
The jobs themselves target Flink 1.18, with 1.8 for the operator itself. The only job that ran successfully was a batch-based job; all non-batch jobs failed to start and required manual intervention.
Any ideas or possibly a known issue in JIRA related to this?Ritesh Singh
08/27/2024, 4:14 AMHammad Hassan
08/27/2024, 9:03 AMAly Ayman
08/27/2024, 10:41 AMAbrar Sher
09/06/2024, 8:23 PMMichael LeGore
09/09/2024, 4:09 PMAly Ayman
09/09/2024, 9:58 AMCaused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:161)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:212)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:224)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:228)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:745)
... 26 more
Caused by: java.io.IOException: Too many open files
at java.base/sun.nio.ch.EPoll.create(Native Method)
at java.base/sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:79)
at java.base/sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.base/java.nio.channels.Selector.open(Selector.java:295)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:159)
... 30 more
Slackbot
09/08/2024, 12:42 PM