Marco Villalobos
03/23/2024, 7:32 PM
Roger Wang
03/25/2024, 8:45 AM
CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders');
Is there a way to avoid hardcoding the password in the code?
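One common way to keep the credential out of the DDL (a sketch, not from this thread): read the password from an environment variable or a secret store at submission time and interpolate it into the statement. The variable name MYSQL_PASSWORD and the fallback value below are illustrative assumptions.

```python
import os

# Sketch: the DDL is built at submission time; the secret comes from the
# environment (populated by e.g. a Kubernetes Secret or a vault agent),
# never from source code. The fallback value is for demonstration only;
# a real job should fail fast if the variable is missing.
password = os.environ.get("MYSQL_PASSWORD", "demo-only-fallback")

ddl = """
CREATE TABLE orders (
    order_id INT,
    -- other columns as in the DDL above
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '{password}',
    'database-name' = 'mydb',
    'table-name' = 'orders')
""".format(password=password)

# table_env.execute_sql(ddl)  # submit as usual; no literal secret in the repo
```

The same pattern works from Java by building the DDL string before calling `executeSql`.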
Truly appreciate it!
Gintaras Matulas
03/25/2024, 9:11 AM
Ari Huttunen
03/25/2024, 12:29 PM
We have set taskmanager.memory.process.size, but we observe the Java process using more memory than that, which subsequently grinds the instance to failure as the physical memory starts running out.
How can we figure out what is eating that memory? It's a streaming job that uses RocksDB, written with PyFlink and SQL, with no custom operators. Flink 1.18.1
Maxim Senin
03/25/2024, 9:18 PM
2024-03-25 21:09:42,733 o.a.f.k.o.l.AuditUtils [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] >>> Status | Info | DEPLOYED | The resource is deployed/submitted to Kubernetes, but it's not yet considered to be stable and might be rolled back in the future
2024-03-25 21:09:42,734 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Resource fully reconciled, nothing to do...
2024-03-25 21:09:45,676 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Observing JobManager deployment. Previous status: DEPLOYED_NOT_READY
2024-03-25 21:09:45,676 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] JobManager deployment is ready
2024-03-25 21:09:57,687 o.a.f.s.n.i.n.c.AbstractChannel [WARN ] Force-closing a channel whose registration task was not accepted by an event loop: [id: 0xb5ef35a4]
java.util.concurrent.RejectedExecutionException: event executor terminated
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817)
...
2024-03-25 21:09:57,689 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR] Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:475)
at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:394)
and then
2024-03-25 21:09:57,689 o.a.f.k.o.o.JobStatusObserver [WARN ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Exception while listing jobs
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.listJobs(AbstractFlinkService.java:255)
at org.apache.flink.kubernetes.operator.observer.JobStatusObserver.observe(JobStatusObserver.java:66)
and a few moments later:
2024-03-25 21:09:57,742 o.a.f.k.o.l.AuditUtils [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] >>> Status | Info | DEPLOYED | The resource is deployed/submitted to Kubernetes, but it's not yet considered to be stable and might be rolled back in the future
2024-03-25 21:09:57,743 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Resource fully reconciled, nothing to do...
2024-03-25 21:09:57,746 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Observing JobManager deployment. Previous status: DEPLOYING
2024-03-25 21:09:57,755 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] JobManager is being deployed
2024-03-25 21:09:57,756 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Resource fully reconciled, nothing to do...
2024-03-25 21:10:02,759 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Observing JobManager deployment. Previous status: DEPLOYING
2024-03-25 21:10:02,766 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] JobManager is being deployed
2024-03-25 21:10:02,768 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Resource fully reconciled, nothing to do...
2024-03-25 21:10:05,513 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink/basic-example] Resource fully reconciled, nothing to do...
2024-03-25 21:10:07,693 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Observing JobManager deployment. Previous status: DEPLOYING
2024-03-25 21:10:07,705 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] JobManager is being deployed
2024-03-25 21:10:07,706 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Resource fully reconciled, nothing to do...
2024-03-25 21:10:11,818 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Observing JobManager deployment. Previous status: DEPLOYING
2024-03-25 21:10:11,825 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] JobManager is being deployed
2024-03-25 21:10:11,826 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Resource fully reconciled, nothing to do...
2024-03-25 21:10:16,829 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Observing JobManager deployment. Previous status: DEPLOYING
2024-03-25 21:10:16,839 o.a.f.k.o.c.FlinkDeploymentController [ERROR][flink/f-e48b135e-84be-51c2-b07d-e973c31e9876] Flink Deployment failed
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: back-off 10s restarting failed container=flink-main-container pod=f-e48b135e-84be-51c2-b07d-e973c31e9876-5f7c9b898b-2spwf_flink(2bf784cc-4b0f-4130-825f-5e0306110363)
at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.checkContainerBackoff(AbstractFlinkDeploymentObserver.java:181)
What does "Force-closing a channel whose registration task was not accepted by an event loop" mean? Is there some kind of timeout, such that if the job doesn't deploy within that time it is rejected and automatically restarted?
Flink 1.18, operator 1.7
Maxim Senin
03/25/2024, 9:27 PM
2024-03-25 21:18:02,404 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=537.600mb (563714445 bytes), taskOffHeapSize=0 bytes, networkMemSize=158.720mb (166429984 bytes), managedMemSize=634.880mb (665719939 bytes), numSlots=1}, current pending count: 1.
2024-03-25 21:18:02,413 INFO org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2024-03-25 21:18:02,601 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job aac9fd300b7e27fdcc8a3070f29ba700 was recovered successfully.
2024-03-25 21:18:02,604 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2024-03-25 21:18:02,605 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2024-03-25 21:18:02,607 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2024-03-25 21:18:03,707 INFO org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2024-03-25 21:18:03,710 INFO org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2024-03-25 21:18:03,710 INFO org.apache.pekko.actor.CoordinatedShutdown [] - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
2024-03-25 21:18:03,716 INFO org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2024-03-25 21:18:03,716 INFO org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka1e0f1d1a-8925-48e5-8c46-98b2eb007aab.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2024-03-25 21:18:03,818 INFO org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2024-03-25 21:18:03,818 INFO org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
So it looks to me like it just suddenly starts to shut down in the middle of the deployment (Shutting down with status UNKNOWN). Any ideas why?
Ravi Nishant
03/26/2024, 1:05 AM
FlinkKafkaConsumer to KafkaSource. But the documentation seems to suggest the above upgrade steps for any generic Flink version.
Євген Шепелюк
03/26/2024, 8:08 AM
Which protocols does FlinkDeployment.spec.job.jarURI support? From the examples I have only noticed local:// and https://. Does it support any other protocol, especially OCI registries?
Kunal Rohitas
03/26/2024, 8:58 AM
flink-kubernetes-operator-1.8.0 and flink-1.18.1, with flinkConfiguration:
state.backend: filesystem
state.checkpoints.num-retained: "3"
taskmanager.numberOfTaskSlots: "8"
state.savepoints.dir: file:///data/new-savepoints
state.checkpoints.dir: file:///data/new-checkpoints
state.checkpoints.tolerableCheckpointFailureNumber: "10"
high-availability.type: kubernetes
high-availability.storageDir: file:///data/ha
kubernetes.operator.job.restart.failed: "true"
execution.checkpointing.tolerable-failed-checkpoints: "1000"
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 1m
job.autoscaler.metrics.window: 3m
job.autoscaler.target.utilization: "0.6"
job.autoscaler.target.utilization.boundary: "0.2"
job.autoscaler.restart.time: 2m
job.autoscaler.catch-up.duration: 5m
job.autoscaler.memory.tuning.enabled: "true"
pipeline.max-parallelism: "120"
I'm getting this error during autoscaling: it is able to suggest what the updated parallelism should be, but it is not able to apply it.
2024-03-26 08:41:21,617 o.a.f.a.JobAutoScalerImpl [ERROR][default/clientsession-job] Error applying overrides.
java.lang.NullPointerException
at org.apache.flink.kubernetes.operator.autoscaler.KubernetesScalingRealizer.realizeParallelismOverrides(KubernetesScalingRealizer.java:52)
at org.apache.flink.kubernetes.operator.autoscaler.KubernetesScalingRealizer.realizeParallelismOverrides(KubernetesScalingRealizer.java:40)
at org.apache.flink.autoscaler.JobAutoScalerImpl.applyParallelismOverrides(JobAutoScalerImpl.java:161)
at org.apache.flink.autoscaler.JobAutoScalerImpl.scale(JobAutoScalerImpl.java:111)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.applyAutoscaler(AbstractFlinkResourceReconciler.java:192)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:139)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:116)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:53)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:152)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:110)
at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:109)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:140)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:121)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
2024-03-26 08:41:21,617 o.a.f.a.JobAutoScalerImpl [ERROR][default/clientsession-job] Error while scaling job
java.lang.NullPointerException
at org.apache.flink.kubernetes.operator.autoscaler.KubernetesScalingRealizer.realizeParallelismOverrides(KubernetesScalingRealizer.java:52)
at org.apache.flink.kubernetes.operator.autoscaler.KubernetesScalingRealizer.realizeParallelismOverrides(KubernetesScalingRealizer.java:40)
at org.apache.flink.autoscaler.JobAutoScalerImpl.applyParallelismOverrides(JobAutoScalerImpl.java:161)
at org.apache.flink.autoscaler.JobAutoScalerImpl.scale(JobAutoScalerImpl.java:111)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.applyAutoscaler(AbstractFlinkResourceReconciler.java:192)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:139)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:116)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:53)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:152)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:110)
at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:109)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:140)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:121)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Am I missing something?
Nitish
03/26/2024, 11:07 AM
Caio Camatta
03/26/2024, 3:46 PM
Ross Ellison
03/26/2024, 7:44 PM
from pyflink.common import Configuration
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.connectors import kafka as kfk
from pyflink.common.serialization import SimpleStringSchema
import json
from pyflink.datastream import StreamExecutionEnvironment, RuntimeContext, MapFunction
from pyflink.common.typeinfo import Types
class EmbeddingAndClustering(MapFunction):
    def __init__(self):
        self.kmeans = None
        self.vectorizer = None

    def open(self, runtime_context: RuntimeContext):
        import pickle
        with open('path/model-tests/kmeans_model_ross.pkl', "rb") as f:
            self.kmeans, self.vectorizer = pickle.load(f)

    def map(self, value):
        value = json.loads(value)
        text = [value['description']]
        vectorized_text = self.vectorizer.transform(text)
        value['incident_cluster_number'] = str(self.kmeans.predict(vectorized_text)[0])
        return json.dumps(value)
config = Configuration()
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(1)
env.add_jars("file:///path/flink-sql-connector-kafka-1.17.1.jar")
properties = {'bootstrap.servers': 'server01:9091', 'group.id': 'group-test'}
source = FlinkKafkaConsumer('ross-test', SimpleStringSchema(), properties).set_start_from_earliest()
topic_stream = env.add_source(source)
speed_stream = topic_stream.map(EmbeddingAndClustering(), output_type = Types.STRING())
speed_stream.sink_to(kafka sink omitted)
env.execute("ross-cluster-test")
A few notes:
1. This .pkl model file is only 6 KB, for testing purposes, trained with sklearn's KMeans. The actual model is much larger and slower, but even with the small model we see a similar slowdown.
2. We have confirmed that normal data transfers benefit from increased parallelism and that the Kafka topics are partitioned properly.
3. I have also tried setting the source and sink to a parallelism of 1 and the map function to higher parallelisms, and I still see the same slowdown and backpressure building up in the source.
4. The 'description' field is usually between 200 and 2000 words.
5. The taskmanager has sufficient memory, threads, and every other observable metric looks fine. I've attached a few job config parameters. I can also provide metrics about the jobmanager/taskmanager/anything else.
Any help would be greatly appreciated.
VasuBabu Bathina
03/26/2024, 8:53 PM
Daniel Suh
03/26/2024, 9:17 PM
dk
03/26/2024, 10:44 PM
Tyron
03/27/2024, 8:40 AM
Євген Шепелюк
03/27/2024, 9:42 AM
Alex
03/27/2024, 1:11 PM
scala.Option
or other collection type like List or Map in Flink 1.13.
I have a POJO in Scala:
class Building(
    var buildTime: Option[Instant],
    var managers: List[String]
)
When I disable Kryo for generic type serialisation, I get this error for the POJO above: "Generic types have been disabled in the ExecutionConfig and type scala.Option is treated as a generic type".
tamis
03/27/2024, 1:20 PM
fido wang
03/27/2024, 1:46 PM
Antti Ahonen
03/27/2024, 2:16 PM
java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug
It happens when executing and collecting an SQL query on a StreamingTableEnvironment inside a local containerized version of Flink (jobmanager). It does not happen in the local execution environment.
Basically this is the problem that someone has already seen: https://stackoverflow.com/questions/76520363/job-client-must-be-a-coordinationrequestgateway-this-is-a-bug
Tal
03/27/2024, 2:31 PM
I use a processfunction to run a quite heavy algorithm on each incoming event. I want to add a timeout for that processing and handle the timeout case (collect a timeout result). The only way I have figured out to do that is with processing-time timers, but that means registering a timer for each event, which is quite expensive. Is there a better way to do that?
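One option for the question above (a sketch, not from this thread): coalesce the per-event timers by rounding each target timestamp up to a fixed resolution. Flink keeps at most one timer per key and timestamp, so all events whose deadlines fall into the same bucket share a single timer; you trade a little timeout precision (up to the bucket size) for far fewer timers. The helper name and the 50 ms resolution below are illustrative assumptions.

```python
BUCKET_MS = 50  # timer resolution: coarser means fewer distinct timers


def coalesced_fire_time(now_ms: int, timeout_ms: int) -> int:
    """Round the desired fire time (now + timeout) up to the next bucket
    boundary, so events arriving close together register the identical
    timer timestamp and Flink deduplicates them."""
    target = now_ms + timeout_ms
    return -(-target // BUCKET_MS) * BUCKET_MS  # ceiling division


# Inside a KeyedProcessFunction you would then register something like:
# ctx.timer_service().register_processing_time_timer(
#     coalesced_fire_time(ctx.timer_service().current_processing_time(), 300))
```

With timeouts in the hundreds of milliseconds, a 50 ms bucket keeps the error bounded while collapsing bursts of events into a handful of timers per key.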
Also, the timeouts should be on the scale of hundreds of milliseconds.
Євген Шепелюк
03/27/2024, 2:38 PM
Is there a JDBC source connector for the DataStream API? I only see a source connector for the Table API and a sink for the DataStream API. If not, what is the best approach to create a bounded source for the DataStream API using a JDBC query?
Rion Williams
03/27/2024, 3:35 PM
// Descriptor
val recentlySeen = ListStateDescriptor(
    "recently-seen-keys",
    String::class.java
)

// Declaration
recentlySeen = runtimeContext.getListState(States.recentlySeen
    .apply {
        enableTimeToLive(
            StateTtlConfig
                .newBuilder(Time.minutes(duplicateBufferMinutes))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build()
        )
    }
)

// Usage
val identifier = ...
val recentIdentifiers = recentlySeen.get()
if (recentIdentifiers.contains(identifier)) { ... } else { recentlySeen.add(identifier) }
with this:
// Descriptor
val recentlySeen = MapStateDescriptor(
    "recently-seen-keys",
    String::class.java,
    Boolean::class.java
)

// Declaration
recentlySeen = runtimeContext.getMapState(States.recentlySeen
    .apply {
        enableTimeToLive(
            StateTtlConfig
                .newBuilder(Time.minutes(duplicateBufferMinutes))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build()
        )
    }
)

// Usage
val identifier = ...
val wasRecentlySeen = recentlySeen.get(identifier) ?: false
if (wasRecentlySeen) { ... } else { recentlySeen.put(identifier, true) }
I’m trying to determine the best way to handle this, or whether it should “just work” since I’m entirely removing some state in favor of new/different state. In this case it is acceptable if the existing contents of the ListState are lost (they should be empty anyway, since they are seldom populated and only have a TTL of 15 minutes).
Matan Perelmuter
03/27/2024, 4:46 PM
DataStream<Result> a = processA(...)
DataStream<Result> b = processB(...)
sinkToDB(a.union(b))
One of the operators inside processA is busy at 100%, and I see that it causes processB to have 100% backpressure, although this happens before the union and they have no relation to each other. Does that make sense? Could the union be causing b to wait for a, perhaps to keep the watermarks aligned?
Sharath Gururaj
03/27/2024, 7:29 PM
table.exec.state.ttl=0ms
Is there any other reason why these messages would appear?
2024-03-27 19:25:23,475 INFO org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.
Rommel
03/28/2024, 12:23 AM
syntax = "proto3";
package com.mystuff.main.proto;
option java_multiple_files = true;
option java_outer_classname = "PowerhouseDSLProtoTable";
option java_package = "com.mystuff.proto.table";
message DataType {
  oneof details {
    PrimitiveDataType primitive_data_type = 1;
    ComplexDataType complex_data_type = 2;
  }
}
and after generating the Java code using protoc, I ran into messages like this in Flink:
class com.mystuff.proto.table.DataType does not contain a getter for field detailsCase_
16:04:34,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class com.mystuff.proto.table.DataType cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
I used protoc to generate the Java code, so I assume I used protoc wrong? Can anyone help shed some light on this one? Thank you
Antti Ahonen
03/28/2024, 8:44 AM
Caused by: java.lang.IllegalStateException: userVisibleTail should not be larger than offset. This is a bug.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer.sanityCheck(AbstractCollectResultBuffer.java:144)
at org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer.dealWithResponse(AbstractCollectResultBuffer.java:94)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:150)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:126)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:100)
at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
at com.mycompany.flink.FlinkApp.main(FlinkApp.java:89)
Here is the code that I run inside the jobmanager as a jar:
public static void main(String[] args) {
    var environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(5)));
    environment.setStateBackend(new HashMapStateBackend());
    environment.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
    environment.setParallelism(1);
    // make sure 500 ms of progress happen between checkpoints
    environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // checkpoints have to complete within one minute, or are discarded
    environment.getCheckpointConfig().setCheckpointTimeout(60000);
    // only two consecutive checkpoint failures are tolerated
    environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
    // allow only one checkpoint to be in progress at the same time
    environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // enable externalized checkpoints which are retained after job cancellation
    environment
        .getCheckpointConfig()
        .setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // enables the unaligned checkpoints
    environment.getCheckpointConfig().enableUnalignedCheckpoints();
    // sets the checkpoint storage where checkpoint snapshots will be written
    Configuration config = new Configuration();
    config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
    config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:////tmp///flink-checkpoints-directory");
    config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
    environment.configure(config);

    var flinkTableEnvironment = StreamTableEnvironment.create(environment);
    flinkTableEnvironment.executeSql("CREATE DATABASE IF NOT EXISTS `test`;").print();
    flinkTableEnvironment.executeSql("USE `test`;").print();
    flinkTableEnvironment
        .executeSql(
            "CREATE TABLE `mytable` (" + "\n" +
            "key STRING," + "\n" +
            "`offset` BIGINT METADATA VIRTUAL," + "\n" +
            "`ts` TIMESTAMP(3) METADATA FROM 'timestamp'," + "\n" +
            "WATERMARK FOR ts AS ts" + "\n" +
            ") WITH (" + "\n" +
            "'connector' = 'kafka'," + "\n" +
            "'topic' = 'mytopic'," + "\n" +
            "'properties.bootstrap.servers' = 'redpanda:29092'," + "\n" +
            "'properties.group.id' = 'testGroup1'," + "\n" +
            "'properties.auto.offset.reset' = 'earliest'," + "\n" +
            "'value.format' = 'avro-confluent'," + "\n" +
            "'value.avro-confluent.url' = 'http://redpanda:8081'," + "\n" +
            "'value.fields-include' = 'EXCEPT_KEY'," + "\n" +
            "'key.fields' = 'key'," + "\n" +
            "'key.format' = 'raw'," + "\n" +
            "'scan.startup.mode' = 'group-offsets'" + "\n" +
            ");"
        )
        .print();
    var query = flinkTableEnvironment.sqlQuery("SELECT * FROM `mytable`");
    try (var collected = flinkTableEnvironment.toChangelogStream(query).executeAndCollect("myjob")) {
        collected.forEachRemaining(row -> log.info("kind {}", row.getKind()));
    }
}
Any idea what is wrong with the config? I tried both stopping the job with a savepoint and resuming from there, and cancelling the job and resuming from a checkpoint, but both result in the same exception.
Євген Шепелюк
03/28/2024, 10:37 AM
HybridSource with a custom JDBC source (using DataGeneratorSource) and a KafkaSource: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/hybridsource/
The custom JDBC source works standalone and in the HybridSource, but the KafkaSource doesn't work when added to the HybridSource:
java.lang.NullPointerException: Cannot invoke "org.apache.flink.streaming.runtime.streamrecord.RecordAttributes.isBacklog()" because "lastAttributes" is null
at org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder.getDefaultBacklog(RecordAttributesBuilder.java:72) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder.build(RecordAttributesBuilder.java:60) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processRecordAttributes2(AbstractStreamOperator.java:685) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecordAttributes(StreamTwoInputProcessorFactory.java:298) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.io.RecordAttributesCombiner.inputRecordAttributes(RecordAttributesCombiner.java:71) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:173) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist-1.19.0.jar:1.19.0]
at java.lang.Thread.run(Unknown Source) [?:?]