Rajat Ahuja
07/02/2023, 11:45 PM

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-deployment-only-example
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "10"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
Job:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: session-job-only-example
spec:
  deploymentName: session-deployment-only-example
  job:
    jarURI: file:///Users/rxahuja/Downloads/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless
Status:
kubectl get flinksessionjobs | grep session-job-only-example
basic-session-job-only-example UPGRADING
session-job-only-example UPGRADING
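A hedged debugging sketch for a session job stuck in UPGRADING: the operator records the reason in the resource's status/events and in its own logs (the flink-kubernetes-operator deployment name below is the Helm-chart default, an assumption):

kubectl describe flinksessionjob session-job-only-example
kubectl logs deploy/flink-kubernetes-operator | grep session-job-only-example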
kiran kumar
07/03/2023, 8:23 AM

+ kubectl rollout status -f flink-deployment.yaml
error: unable to decode "flink-deployment.yaml": no kind "FlinkDeployment" is registered for version "flink.apache.org/v1beta1" in scheme "k8s.io/kubernetes/pkg/kubectl/scheme/scheme.go:28"
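A hedged note: kubectl rollout status only understands built-in workload kinds (Deployment, StatefulSet, DaemonSet), so it cannot decode the FlinkDeployment custom resource, and this error is expected. One alternative is to poll the custom resource's own status field (the resource name below is a placeholder):

kubectl get flinkdeployment <name> -o jsonpath='{.status.jobManagerDeploymentStatus}'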
Dheeraj Panangat
07/03/2023, 10:08 AM

Rashmin Patel
07/03/2023, 10:46 AM

com.esotericsoftware.kryo.KryoException: Unable to find class: O_STATE
Serialization trace:
customerRecord (com.navi.ndp.pipelines.customerfunnel.models.CustomerMasterState)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
NO_STATE is the default value of one enum field in the case class. Has anyone ever faced such an issue?
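A hedged diagnostic sketch: a Kryo "Unable to find class" on restore often means the type silently fell back to Kryo serialization and the saved bytes no longer line up with the class (Kryo-serialized state cannot evolve when a case class gains a field). Disabling generic types makes Flink fail fast on any type that would use Kryo, which helps pin down the offending field:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableGenericTypes(); // fail fast instead of falling back to Kryo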
Oscar Perez
07/03/2023, 10:46 AM

Chris Tabakakis
07/03/2023, 1:04 PM

Shengbo
07/03/2023, 3:03 PM

Eugenio Gastelum
07/03/2023, 3:58 PM

Yaroslav Bezruchenko
07/03/2023, 6:21 PM

state.backend.incremental: "true"

But when I check my checkpoint dir, I see the following: savepoints weigh 38 GB and checkpoints 49 GB.
I had a couple of restarts, with restores from savepoints, so I see 4 folders with checkpoints. Can you please suggest what to do with the extra data? Can I detect what I can delete safely? Is there a way for auto clean-up of the checkpoint dir by the Flink Operator?
Thanks in advance
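A hedged sketch of one lever here: Flink itself prunes retained checkpoints beyond a configured count, so capping it in the deployment's flinkConfiguration keeps the live checkpoint directory bounded (the value 3 is illustrative); folders left behind by old job instances still need manual cleanup:

flinkConfiguration:
  state.backend.incremental: "true"
  state.checkpoints.num-retained: "3"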
Alex Bryant
07/04/2023, 6:23 AM

We want to use RabbitMQ's stream data structure, as opposed to queue. We are setting up a basic job and have managed to get the connector working with the queue type, but not stream.
version: '3.3'

services:
  jobmanager:
    image: flink:1.17.0
    expose:
      - '8081'
    ports:
      - '8081:8081'
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - JOB_MANAGER_RPC_PORT=6123
      - MQ_IP=${MQ_IP}
      - MQ_PORT=${MQ_PORT}
      - MQ_Q=${MQ_Q}
      - MQ_UN=${MQ_UN}
      - MQ_PW=${MQ_PW}

  taskmanager:
    image: flink:1.17.0
    depends_on:
      - jobmanager
    expose:
      - '6121'
    ports:
      - '6121:6121'
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - JOB_MANAGER_RPC_PORT=6123
      - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4

networks:
  dev-network:
    driver: bridge
Connector version:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq</artifactId>
    <version>3.0.1-1.17</version>
</dependency>
Connecting to RabbitMQ:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("Connecting....................");

String MQ_IP = System.getenv("MQ_IP");
int MQ_PORT = Integer.parseInt(System.getenv("MQ_PORT"));
String MQ_UN = System.getenv("MQ_UN");
String MQ_PW = System.getenv("MQ_PW");
String MQ_Q = System.getenv("MQ_Q");

// RabbitMQ connection configuration
RMQConnectionConfig connectionConfig =
        new RMQConnectionConfig.Builder()
                .setHost(MQ_IP)
                .setPort(MQ_PORT)
                .setUserName(MQ_UN)
                .setPassword(MQ_PW)
                .setVirtualHost("/")
                .build();

// Create RabbitMQ source
RMQSource<String> source = new RMQSource<>(
        connectionConfig, MQ_Q, true, new SimpleStringSchema());

DataStream<String> stream = env
        .addSource(source)
        .name("RabbitMQ Source");
stream.print();
Does anyone know if this is supported? If so, how can this be accomplished?
We have tried using ("x-queue-type", "stream") in ClientProperties but were unsuccessful.
We'd appreciate some support, even if the advice is to take a different approach. Thanks!
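A hedged sketch, not verified against the connector internals: RMQSource declares its queue in a protected setupQueue() hook, so subclassing it should allow declaring the queue as a stream and setting the consumer prefetch that RabbitMQ streams require (the class name and prefetch value are illustrative):

public class RMQStreamSource extends RMQSource<String> {
    public RMQStreamSource(RMQConnectionConfig config, String queueName) {
        super(config, queueName, true, new SimpleStringSchema());
    }

    @Override
    protected void setupQueue() throws IOException {
        Map<String, Object> args = new HashMap<>();
        args.put("x-queue-type", "stream");
        // Streams must be declared durable and non-exclusive
        channel.queueDeclare(queueName, true, false, false, args);
        // RabbitMQ streams reject consumers that have no prefetch (QoS) set
        channel.basicQos(100);
    }
}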
Jirawech Siwawut
07/04/2023, 6:30 AM

Sergey Postument
07/04/2023, 1:33 PM"com.ververica" % "flink-sql-connector-postgres-cdc" % cdcConnectorVer % "provided",
"org.postgresql" % "postgresql" % "42.4.0" % "provided",
and we noticed if psql source table are empty (no data from the source) , flink cannot make an checkpoint what leads to job restart with the exception
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:205)
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:168)
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1939)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1919)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:97)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
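A hedged mitigation sketch while the root cause is investigated: raising the tolerable checkpoint failure count keeps an idle source from restarting the whole job (the values below are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
// allow several failed checkpoints before the job is restarted
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);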
here is the checkpoint configuration

amarjeet pasrija
07/04/2023, 2:32 PM

Chris Tabakakis
07/04/2023, 3:15 PM

Chris Tabakakis
07/04/2023, 3:17 PM

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:543)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in the classpath, or some classes are missing from the classpath.
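A hedged sketch of the usual fix: Flink can serve hdfs:// URIs only when Hadoop is on its classpath, so exporting the Hadoop classpath before starting the cluster (assuming a local Hadoop installation with the hadoop CLI available) typically resolves this:

export HADOOP_CLASSPATH=$(hadoop classpath)
./bin/start-cluster.sh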
Here is the full pastebin in case more information is needed.

James Watkins
07/04/2023, 6:32 PM

tableEnv.executeSql("INSERT INTO flink_catalogue.flink_demo.flink_iceberg_sink VALUES (1, 1, 'dummy_merchant1', 1, '2022-04-01 00:00:00'), (2, 2, 'dummy_merchant2', 2, '2022-04-02 00:00:00');")

but when I try to use the data stream from my Kafka source, it's not writing any data, and there don't seem to be any useful error messages in the stack trace. Here is the INSERT INTO statement (note: flink_transaction is a Kafka-backed Flink source table):
tableEnv.executeSql("""
    INSERT INTO flink_catalogue.flink_demo.flink_iceberg_sink
    SELECT
        transaction_id
        , transaction_amount
        , merchant_name
        , user_id
        , created_at
    FROM flink_transaction
""")
I also checked whether the data stream is producing the data I need in the right format by removing the INSERT INTO statement, and it did print the results that I was expecting.
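A hedged guess at a common culprit: the Iceberg sink only commits data files when a checkpoint completes, so a streaming INSERT from Kafka can appear to write nothing if checkpointing is disabled (the interval below is illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // Iceberg commits on each completed checkpoint
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);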
Can anyone help me understand what I'm doing wrong here, please? (I'll add the full code in-thread)

Eugenio Gastelum
07/04/2023, 7:16 PM

Alex Bryant
07/05/2023, 12:33 AM

Slackbot
07/05/2023, 6:49 AM

Vitor Leal
07/05/2023, 8:02 AM

Is it possible to DESCRIBE a table that's already part of a job (not in the execution environment)?

Or Keren
07/05/2023, 9:21 AM

(The job was running with the last-state configuration, but we needed it in a fresh state, so we upgraded it with the stateless configuration.) It takes a really long time to initialize the KafkaSink operator; it can take more than an hour.
From looking at the logs, it seems that it tries to send InitProducerId for each previous checkpoint id it had, incrementally from 0. Does anyone know why this happens when trying to start with a clean state? How can we solve this?
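A hedged sketch of one way out: the exactly-once KafkaSink probes the transactional ids of earlier checkpoints on start-up, so giving the fresh deployment a different transactional id prefix lets it skip the old lineage (the servers, topic, and prefix below are placeholders):

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-job-fresh-v2") // new prefix for the clean start
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();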
Amenreet Singh Sodhi
07/05/2023, 11:27 AM

Bruno Filippone
07/05/2023, 12:01 PM

Mikhail Spirin
07/05/2023, 12:29 PM

{
  "str1": 970089079,
  "str2": 1688400374,
  "str3": {
    "str4": {
      "column1": {
        "grouped_column2": 1,
        "grouped_column2": 1,
        ....
      }
    }
  }
}
my approaches are in the thread below…

Leong Wai Leong
07/05/2023, 1:19 PM

sagar shinde
07/05/2023, 3:57 PM

André Midea Jasiskis
07/05/2023, 8:13 PM

I have 3 stateful operators with uid explicitly set. I have operator chaining enabled, and those 3 stateful operators got merged into one operator (looking at the Flink UI). For savepoint/checkpoint matters, are those operators still 3 individual "states", or are they merged into one and given a new uidhash? If so, is this new uidhash deterministic based on my other 3 explicitly set uids?
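A hedged illustration of the setup being described (operator and uid names are hypothetical): chaining fuses the operators into a single task for execution, but each operator's state remains addressed by its own uid in savepoints, rather than by a merged hash:

stream.keyBy(r -> r.getKey())
      .map(new StatefulMapA()).uid("stateful-a")
      .map(new StatefulMapB()).uid("stateful-b")
      .map(new StatefulMapC()).uid("stateful-c");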
Pedro H S Teixeira
07/05/2023, 10:33 PM

dev Jiang
07/06/2023, 9:35 AM

Krutik Gadhiya
07/06/2023, 10:14 AM

KafkaSource<Map<String, Object>> logSource = KafkaSource.<Map<String, Object>>builder()
        .setBootstrapServers(BOOTSTRAP_SERVER)
        .setTopics(SOURCE_TOPIC)
        .setGroupId("group2")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new LogDeserializer())
        .build();
I also tried to list the groups using rpk group list, but that didn't show any groups.
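A hedged note on why the group may be missing: KafkaSource does its own partition assignment and only commits offsets to the consumer group when a checkpoint completes, so the group typically won't show up until checkpointing is enabled and a checkpoint has finished. A minimal sketch (interval is illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30_000); // offsets are committed to "group2" on each checkpoint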