# troubleshooting
  • d

    Dheemanth Gowda

    05/30/2023, 9:31 AM
    Hey, we are using JdbcSink to write to a MySQL DB. We want to configure a retry policy for when the connection goes down. I see there was an issue logged regarding this: https://issues.apache.org/jira/browse/FLINK-16708. Is there any workaround for this right now? Currently we are using a custom JdbcConnectionProvider, but we wanted to know if there is a better way to achieve this.
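A minimal sketch of the kind of retry a custom connection provider could apply when (re)establishing a connection, assuming exponential backoff is acceptable. The class `ConnectionRetry` and its `withRetry` method are hypothetical names for illustration, not part of Flink or the JDBC connector; the action passed in stands for whatever call opens the connection.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not a Flink API): retries an action with exponential backoff.
final class ConnectionRetry {
    private ConnectionRetry() {}

    /**
     * Runs the action up to maxAttempts times. After each failure except the last,
     * sleeps for the current delay and then doubles it.
     */
    static <T> T withRetry(Callable<T> action, int maxAttempts, long initialDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while connecting", e);
            } catch (Exception e) {
                lastFailure = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during backoff", e);
                }
                delayMs *= 2;
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", lastFailure);
    }
}
```

Inside a custom provider, the connection-opening call (for example a `DriverManager.getConnection(...)` invocation) would be wrapped as `ConnectionRetry.withRetry(() -> openConnection(), 5, 500L)`, where `openConnection` is again a placeholder name.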
  • s

    Sonika Singla

    05/30/2023, 10:00 AM
    Hi, I am facing an issue with a Flink app that reads Kafka data and pushes it to OpenSearch. We have implemented HPA for scalability, with the threshold set at 80% utilization. When the threshold is breached, it triggers replica creation, which restarts the job and task managers. The frequent restarts are causing malfunctions: only 6.5L out of 10L records are pushed to OpenSearch. We need help resolving this so that all data is ingested. Any guidance is appreciated. CC: @Dheeraj Panangat
  • h

    Hussain Abbas

    05/30/2023, 1:34 PM
    Hello guys, we are planning to use RocksDB on EKS with the Flink operator. We have multiple jobs running in application mode. I want to ask how we can run on EKS with the autoscaler on. The only issue right now with EBS is that we cannot use it with Deployments. What options do we have other than using the local disk of the nodes?
  • k

    Kaiqi Dong

    05/30/2023, 2:13 PM
    Hey community 👋 We are using KDA to consume a topic from MSK. KDA and MSK are deployed in different AWS accounts: KDA is in account A, while MSK is in account B. There is a VPC and VPC peering set up between these 2 accounts, and the corresponding security group is configured on both sides, i.e., MSK allows ingress connections from the security group that KDA is associated with. KDA is set up to access resources in the VPC with the security group and private subnets. We could connect to MSK in account B using the private subnet from an instance in account A. But when launching KDA, we get the error:
    Copy code
    Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1685455751878, tries=1, nextAllowedTryMs=1685455751979) timed out at 1685455751879 after 1 attempt(s)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
    From this post, it seems this is due to a misconfiguration between the client application and the MSK cluster. But it seems we have no connectivity issue, and we allow all traffic for the security group. Has anyone encountered a similar issue and can provide any hints on what could be going wrong? Thanks very much! 🙇
  • o

    Oscar Perez

    05/30/2023, 2:39 PM
    Hello! We have been testing out the KeyedBroadcastProcessFunction. We have a setup in which we evaluate transactions based on some rules. The transactions are processed in the processFunction while in the processBroadcastFunction we manage and store the rules. So far, so good. The problem we are seeing is that, when redeploying, we load from a savepoint but the broadcastState is empty. In other words, there are no rules anymore. The taskState is loaded properly, but the broadcastState seems lost.
    override fun open(parameters: Configuration?) {
        transactionActivityWindowState = runtimeContext.getMapState(transactionsWindowDescriptor)
        ruleBroadcastStateDescriptor = ruleConfigBroadcastDescriptor
    }
    Is there a way to load the broadcastState the same way we read this MapState? Or how should we handle this situation?
  • a

    Amir Hossein Sharifzadeh

    05/30/2023, 4:37 PM
    Hi, I can run my standalone Flink project from the command line without any issue, but when I try to run it as a Flink job, I get this error (and yes, my Java version is 11.0.18):
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
    Caused by: java.lang.IllegalStateException: Unable to instantiate java compiler
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:163)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:141)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
    at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
    at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
    at org.apache.calcite.rel.rules.ReduceExpressionsRule$FilterReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:153)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:565)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:428)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:130)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:208)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:195)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:329)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:253)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:226)
    at org.varimat.com.EMPADStreamCommand.processWorkflow(EMPADStreamCommand.java:220)
    at org.varimat.com.EMPADStreamCommand.processFromStream(EMPADStreamCommand.java:203)
    at org.varimat.com.EMPADStreamCommand.main(EMPADStreamCommand.java:240)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 9 more
    Caused by: java.lang.ClassCastException: class org.codehaus.janino.CompilerFactory cannot be cast to class org.codehaus.commons.compiler.ICompilerFactory (org.codehaus.janino.CompilerFactory is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @338494fa; org.codehaus.commons.compiler.ICompilerFactory is in unnamed module of loader 'app')
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
    at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:161)
    ... 62 more
  • a

    André Luiz Diniz da Silva

    05/30/2023, 5:26 PM
    Hello everyone, how are you? I have some questions about the savepoint process and the Table API. How are uids assigned when sources are declared using the Table API? Is there a way of forcing the uid? The context is that I have multiple Kafka sources declared inside my application and I want to reset some of them to earliest. I already changed the reset configuration to earliest, but it looks like even if I change the reset configuration and the table name, it still uses the savepoint information somehow.
  • t

    Tudor Plugaru

    05/30/2023, 7:36 PM
    Any idea how to deal with
    Copy code
    2023-05-30 19:34:53,077 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 4228
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
    Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
    I've tried setting the configuration mentioned here, but the issue still persists. Thanks
  • r

    Rion Williams

    05/30/2023, 9:22 PM
    Hey fellow Flinkers, I currently have a Flink job that handles reading from a source (Kafka) and writing output to three separate sinks: Kafka, Elasticsearch, and Postgres. I’m seeing some issues with performance where the job will seemingly stall after a period of time. I’ve surmised that this bottleneck/backpressure seems to be related to Elasticsearch, but I can’t quite prove that. I suspect that because the job is having trouble writing to ES, it’s failing to commit the offsets for the original messages and thus not progressing to continue reading from Kafka. Is there any way to further rule out any of the other sinks in this scenario? I'm just trying to isolate the problem down to a specific sink so that I can uncork the bottleneck. Happy to share other details about the job if that would be helpful.
  • a

    Ari Huttunen

    05/31/2023, 6:54 AM
    Copy code
    ERROR: Cannot install apache-flink-ml==2.2.0 and apache-flink==1.17.0 because these package versions have conflicting dependencies.
    
    The conflict is caused by:
        The user requested apache-flink==1.17.0
        apache-flink-ml 2.2.0 depends on apache-flink==1.15.1
    
    To fix this you could try to:
    1. loosen the range of package versions you've specified
    2. remove package versions to allow pip attempt to solve the dependency conflict
    
    ERROR: ResolutionImpossible: for help visit https://pip.pypa.io/en/latest/topics/dependency-resolution/#dealing-with-dependency-conflicts
    I'd like to use these together, but I cannot. (I could use newer ones.) (I'd like to get the KBinsDiscretizer.)
  • h

    Hangyu Wang

    05/31/2023, 10:08 AM
    Hi all. My goal is to read a CSV file from S3 and do some aggregation in Flink.
    Copy code
    CsvReaderFormat<MetricEvent> csvFormat = CsvReaderFormat.forPojo(Metric.class);
    FileSource<MetricEvent> source = FileSource.forRecordStreamFormat(csvFormat, new Path("s3://test-dev/test.csv")).build();
    DataStream<Metric> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Metric");
    According to the log, the S3 access key and secret key have been loaded successfully, but I still get a 403 Forbidden error. What am I missing?
    Copy code
    org.apache.flink.util.FlinkRuntimeException: Could not enumerate file splits
    	at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:143) ~[flink-connector-files-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:181) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:615) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1044) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:961) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:424) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:622) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:621) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:190) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
    	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
    Caused by: java.nio.file.AccessDeniedException: s3://test-dev/test.csv: getFileStatus on s3://test-dev/test.csv: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: WZ0MBF6DFKQ2GT21; S3 Extended Request ID: WbDaX6osQkhvdkcwPQe0FuyXj2f3ESqwerQupUrypbgc/eDpX5fVonNTarRfUm/RPZ/lxnHRpts=; Proxy: null), S3 Extended Request ID: WbDaX6osQkhvdkcwPQe0FuyXj2f3ESqwerQupUrypbgc/eDpX5fVonNTarRfUm/RPZ/lxnHRpts=:403 Forbidden
    	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255) ~[?:?]
    	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
  • m

    Mishel Liberman

    05/31/2023, 10:49 AM
    How do I use HDFS with the Flink Docker image (using Flink session deployment mode on k8s)? The documentation doesn’t help much. I tried to include the Hadoop jars in the /opt/flink/lib folder and it didn’t help either. I also tried putting them under /opt/flink/plugins/hdfs with no success. Any ideas?
  • y

    Yaroslav Bezruchenko

    05/31/2023, 4:01 PM
    Hey, can you guys please help me understand how state recovery works in Flink? I have a Flink 1.16.2 deployment running with 2 JobManagers and 1 TaskManager. If the main JobManager dies, the second will continue the work. But what if the TaskManager dies? Will it restore processing from the latest checkpoint or from the latest savepoint?
  • a

    Amir Hossein Sharifzadeh

    05/31/2023, 4:55 PM
    Hi everybody, I am trying to create a Docker image. Before asking my question, I will explain my application. My main (command) jar file:
    1) Has dependencies on other jar files (they are all in the same directory).
    2) Needs to read some arguments from a config file.
    3) Generates an output file. Is dealing with EmbeddedRocksDBStateBackend.
    4) Is NOT doing anything with SQL Client.
    I am following the Docker documentation here and will need to create a Dockerfile and a docker-compose.yml.
    Question 1: In docker-compose.yml, in the command section, what arguments should I pass? (I run the standalone application from the terminal: java -Xms10g -Xmx28g -jar my-stream-processing-1.2.jar --config stream.config)
    Question 2: Where should I upload the other jar dependencies?
  • z

    Zgeorge2

    05/31/2023, 7:49 PM
    I have a Flink cluster (1 master, 3 workers) running on my laptop. Job submission works. I tried the same cluster on a remote VM (internal dev cloud). The cluster works on that VM too. Next, I tried to deploy this cluster on separate VMs:
    • 1 Master VM running the Job Manager
    • 3 Worker VMs running the Task Managers
    The cluster comes up fine. I can see the Java processes running on each VM. However, job submission fails with CompletionException, ConnectionException: Connection Refused.
    Checking with netstat and lsof, the job manager ports on the Master VM are listening ONLY on tcp6, NOT tcp (4). So even a simple ncat ping with the IPv4 address from one of the workers to the Master does not get through. Neither does an ncat ping from the Master to itself on that port. If I run ncat on those same ports (no Flink), then port pings from the workers get through. So clearly Flink is not reachable, for some reason listening with the IPv6 protocol only.
    1. Is there a configuration option to fix this, so that the job managers and task managers listen on both IPv6 and IPv4 for their ports?
    2. Is there documentation on how the taskmanager host, bind-host and port configurations should be done in a multi-VM cluster?
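One possibility worth ruling out, offered as an assumption and not a diagnosis: on a dual-stack Linux host, a JVM listener bound to the wildcard address is typically listed under tcp6 yet can still accept IPv4 connections, so the tcp6 listing alone may not explain the refusal (a listener bound only to a loopback or other specific address would). The sketch below is a plain-Java stand-in for the ncat probe; `BindProbe` and its methods are hypothetical names and involve no Flink APIs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical diagnostic helper (not part of Flink).
final class BindProbe {
    private BindProbe() {}

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /**
     * Opens a listener on the wildcard address (ephemeral port) and checks
     * whether it is reachable through the IPv4 loopback address.
     */
    static boolean wildcardBindReachableOverIpv4() {
        try (ServerSocket server = new ServerSocket(0)) {
            return canConnect("127.0.0.1", server.getLocalPort(), 1000);
        } catch (IOException e) {
            return false;
        }
    }
}
```

Running `BindProbe.canConnect(masterIp, port, 1000)` from a worker, with the Master's IPv4 address and the job manager port substituted in, gives the same yes/no answer as the ncat check and can be compared against the address the process actually bound to.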
  • r

    Ravi Nishant

    05/31/2023, 11:17 PM
    Hey everyone, I am trying to figure out the upgrade process for the Flink operator. Please note that our CI/CD does not allow the use of Helm, so the operator upgrade process using Helm in the official doc might not be relevant to us. The manual upgrade process described here requires job downtime and we would like to avoid that if possible. Since the Flink operator documentation guarantees backward compatibility, will the steps below suffice to ensure a smooth upgrade?
    • (kubectl) Replace the existing CRD with the desired version's CRD.
    • Remove the existing operator deployment.
    • Deploy the desired version of the Flink operator.
    • For each job in the cluster:
      ◦ Suspend the job with a savepoint.
      ◦ Delete the existing FlinkDeployment resource.
      ◦ Create a new FlinkDeployment resource and ensure the job starts by resuming from the savepoint.
  • z

    Zgeorge2

    06/01/2023, 12:21 AM
    https://apache-flink.slack.com/archives/C03G7LJTS2G/p1685562598369509
    1. Does Flink listen on IPv6 only by default?
    2. Is there a starter, but detailed, tutorial to get a multi-VM cluster up and running? I am particularly interested in the flink-conf.yaml contents on each machine.
    3. At the least, what should be set for the job manager and task manager host and bind-host addresses? I am using ver 1.17.
  • b

    Bharathkrishna G M

    06/01/2023, 1:43 AM
    Hi, I am trying to use unaligned checkpoints with Flink 1.16.1. I set:
    Copy code
    execution.checkpointing.unaligned: true
    and also in the code I tried:
    Copy code
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(120000, CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.getCheckpointConfig.setCheckpointTimeout(240000)
    env.getCheckpointConfig.enableUnalignedCheckpoints()
    But in the UI, it still shows it uses aligned checkpoints as shown in image. What am I missing here?
  • a

    akira

    06/01/2023, 6:30 AM
    Using a Flink SQL sink to AWS S3, we lose some files (about 2%), but there is no error log. Has anyone encountered this situation? This is the log:
    Copy code
    2023-05-28 16:40:07,949 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 14 checkpointing for checkpoint with id=1715 (max part counter=1856).
    2023-05-28 16:40:13,887 INFO org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat [] - creating new record writer...org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat@bae30b0
    2023-05-28 16:40:13,887 INFO org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper [] - initialize serde with table properties.
    2023-05-28 16:40:13,887 INFO org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper [] - creating real writer to write at s3a://xxx/dt=20230528/hh=16/.part-6af1dfde-765c-4bbb-b593-3c01c5d66192-14-1856.inprogress.4f30f53f-10c0-40e5-a707-7f572850526d
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.codec.CodecConfig [] - Compression: GZIP
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Parquet block size to 134217728
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Parquet page size to 1048576
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Parquet dictionary page size to 1048576
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Dictionary is on
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Validation is off
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Writer version is: PARQUET_1_0
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Maximum row group padding size is 8388608 bytes
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Page size checking is: estimated
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Min row count for page size check is: 100
    2023-05-28 16:40:13,887 INFO org.apache.parquet.hadoop.ParquetOutputFormat [] - Max row count for page size check is: 10000
    2023-05-28 16:40:13,898 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.gz]
    2023-05-28 16:40:13,899 INFO org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper [] - real writer: org.apache.parquet.hadoop.ParquetRecordWriter@2d1ee4ae
    2023-05-28 16:40:17,894 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 14 received completion notification for checkpoint with id=1715.
  • s

    Suparn Lele

    06/01/2023, 7:36 AM
    Hello everyone, I am trying to run unit tests using Apache Flink and the ScalaTest module. I declare a StreamExecutionEnvironment and use that reference in all of my UTs. I have written 16 UTs. The first 8 pass while the next 8 fail. Tests that were failing earlier pass if I move them to positions 1-8, and those that were passing start to fail if I move them to positions 9-16. So, in short, there is no logic issue here. I am getting the following error:
    Copy code
    java.lang.RuntimeException: Failed to fetch next result
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
      at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.to(CloseableIterator.scala:35)
      ...
      Cause: java.io.IOException: Failed to fetch job execution result
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
      ...
      Cause: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      ...
      Cause: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
      at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
      at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
      at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
      at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
      at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      ...
      Cause: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
      at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
      at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
      at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
      at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
      at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      ...
      Cause: com.esotericsoftware.kryo.KryoException: Unable to find class: sun.reflect.GeneratedConstructorAccessor88
    Serialization trace:
    delegate (sun.reflect.DelegatingConstructorAccessorImpl)
    constructorAccessor (java.lang.reflect.Constructor)
    _constructor (com.fasterxml.jackson.databind.introspect.AnnotatedConstructor)
    _fromLongCreator (com.fasterxml.jackson.databind.deser.std.StdValueInstantiator)
    _valueInstantiator (com.fasterxml.jackson.databind.deser.BeanDeserializer)
    _rootDeserializers (com.fasterxml.jackson.databind.ObjectMapper)
    objectMapper (com.jayway.jsonpath.spi.mapper.JacksonMappingProvider)
    mappingProvider (com.jayway.jsonpath.Configuration)
    configuration (com.jayway.jsonpath.internal.JsonContext)
      at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
      at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
      at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
      at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
      at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
      at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
      at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
      at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
      ...
      Cause: java.lang.ClassNotFoundException: sun.reflect.GeneratedConstructorAccessor88
      at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:348)
      at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    I am using an object to initialize the StreamExecutionEnvironment. The code is as follows:
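    // Assumption: the serializer comes from the de.javakaffee kryo-serializers library.
    import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    def getStreamExecutionEnvironment: StreamExecutionEnvironment = {
      val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // Collections$UnmodifiableCollection is not public, so it is looked up by name.
      val unmodifiableCollection = Class.forName("java.util.Collections$UnmodifiableCollection")
      // Register the serializer as Kryo's default for that type.
      streamExecutionEnvironment.getConfig.addDefaultKryoSerializer(unmodifiableCollection, classOf[UnmodifiableCollectionsSerializer])

      streamExecutionEnvironment
    }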
    I suspect this is a memory issue. Could someone please help?
  • v

    Viktor Hrtanek

    06/01/2023, 10:00 AM
    Hi all, I am building a streaming app with PyFlink that needs to look up a value in a shared state store (the value is produced by a different Flink job). As I understand it, Flink's native state stores are tied to an operator (e.g. flat_map…). The options I am considering are either using my own RocksDB instance as the shared state, or building a table on top of a changelog topic in each Flink job that requires the lookup. Is there a better, more “Flink native” way of implementing such a lookup against a shared state store? My stack is PyFlink + Confluent Kafka. Any help will be much appreciated.
  • m

    Mourad HARMIM

    06/01/2023, 10:00 AM
    Hello guys! I'm considering using the queryable state feature in my current project, but according to the roadmap it seems to be reaching end-of-life. So I'd like to ask: is there an issue with the feature, and is there any workaround to access state values externally? Thank you in advance 🙏
  • e

    Emanuele Pirro

    06/01/2023, 12:38 PM
    Hi everyone 🙋 My Flink operators carry around a large and complex object which is heavy to serialise. I'm letting Flink figure out the TypeInformation and related serialisers, but that makes compilation take several minutes, probably because these objects are created by a macro. Is there any way to solve this issue, apart from creating a custom TypeInformation & serialisers? Thank you 🙂
  • o

    Otto Remse

    06/01/2023, 2:19 PM
    Hi! We're testing the flink kubernetes operator (version 1.5.0, flink version 1.17.1). We enabled the autoscaler feature but the operator cannot find any PENDING_RECORDS metrics. We're using flink-sql-connector-kafka version 1.17.1 for our sources. I can see a pendingRecords metric on the source when looking at the dashboard. Is this something I need to manually expose so the operator can pick it up?
  • a

    Arthur Mantelato Rosa

    06/01/2023, 2:23 PM
    Hi folks, I have a question regarding the number of task managers available and the number of topic partitions on Kafka sources. Is it a problem to have more task managers than partitions on my several sources? I'm using Flink 1.17.0 and these sources don't have withIdleness enabled on their event-time-based watermark strategies.
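A minimal sketch (plain Python, not Flink code) of why this setup can matter, assuming the source parallelism is higher than the number of partitions. The Kafka connector documentation notes that the source does not go idle automatically in that case, and a downstream operator's watermark is the minimum over its non-idle inputs. The function name and the use of None to mark an idle input are illustrative choices, not Flink API.

```python
from typing import List, Optional

# Stand-in for Long.MIN_VALUE, the watermark of an input that has emitted nothing yet.
LONG_MIN = -(2 ** 63)


def combined_watermark(input_watermarks: List[Optional[int]]) -> Optional[int]:
    """Return the watermark a downstream operator would see.

    Each entry is the latest watermark of one upstream subtask.
    None marks a subtask that has been declared idle (hypothetical encoding).
    """
    active = [w for w in input_watermarks if w is not None]
    if not active:
        # Every input is idle: there is nothing to advance to.
        return None
    # Event time only advances as fast as the slowest active input.
    return min(active)


# Two subtasks read partitions; the third has no partition and never emits a watermark.
without_idleness = combined_watermark([1000, 2000, LONG_MIN])
# Same job, but the partition-less subtask has been marked idle.
with_idleness = combined_watermark([1000, 2000, None])
```

In this model the job without idleness stays at LONG_MIN, so event-time windows and timers never fire, while marking the empty subtask idle lets the watermark advance to 1000.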
  • b

    Bhupendra Yadav

    06/01/2023, 4:19 PM
    Hi everyone. We are using the Flink Kubernetes operator. A few months ago Gyula recommended FlinkSessionJob over the REST API for submitting Flink jobs, so we recently tried the FlinkSessionJob custom resource and noticed that it does not consistently update the job state in the CR status. A few issues:
    1. Say we submit a job, it reaches the running state and then fails. The CR status is marked as reconciling (we have set kubernetes.operator.job.restart.failed: "true") and sometimes stays in this state indefinitely. The logs show the operator resubmitting the Flink job and repeatedly checking its status; if the Flink JM then restarts, the operator logs at WARN level that jobID <job-id> was not found and gets stuck there. Ideally it should move the CR job status to a terminal state.
    2. Sometimes a job status remains in the UPGRADING state with the error below even when the JobManager is stable (say, after a restart). Ideally it should reconcile and try to submit the job.
    {
      "type": "org.apache.flink.kubernetes.operator.exception.ReconciliationException",
      "message": "java.util.concurrent.TimeoutException",
      "throwableList": [
        {
          "type": "java.util.concurrent.TimeoutException"
        }
      ]
    }
    For job retries after a failure, it would be helpful to have a restart count in the FlinkSessionJob CR status to know what exactly is going on. Currently, if the CR's status.jobStatus.state is null, it is hard to determine the state of the job. An external observer watching these session jobs cannot determine the state just from whether status.error is null, because we don't know whether the error is transient. If anyone is already using FlinkSessionJob, how are you determining the Flink job state from it?
  • b

    Bharathkrishna G M

    06/01/2023, 7:24 PM
    Hi, I have a use case where I get updates through a GCP Pub/Sub stream. I get file names from Pub/Sub and I want to read those files through Flink. I'm able to get the file names through PubSubSource via the flink-pubsub-connector, but I'm stuck on how to read these files dynamically. env.readTextFile only accepts pre-defined paths, so how can I do this in a streaming fashion?
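One common pattern for this, sketched here in plain Python under stated assumptions, is to treat each incoming file name as a record and expand it into lines inside a flat-map style function. The helper name read_lines is hypothetical, and it assumes the path can be opened with the local open() call; for objects in a bucket, a storage client or Flink's own file system abstraction would take its place.

```python
from typing import Iterator


def read_lines(path: str) -> Iterator[str]:
    """Expand one file name into the lines of that file.

    Assumption: `path` is readable with the built-in open(); this is a
    stand-in for whatever client actually fetches the file.
    """
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            # Emit one record per line, without the trailing newline.
            yield line.rstrip("\n")


def expand_all(paths: Iterator[str]) -> Iterator[str]:
    """Apply read_lines to a stream of file names, as a flat map would."""
    for path in paths:
        yield from read_lines(path)
```

In PyFlink a generator like read_lines could be passed to flat_map on the stream of file names; in Java or Scala the equivalent would be a FlatMapFunction. Note that a plain flat map does not track the read position within a file, so a failure mid-file would re-read that file from the start after recovery.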
  • p

    Parmveer Randhawa

    06/02/2023, 2:01 AM
    Hey,
  • p

    Parmveer Randhawa

    06/02/2023, 2:01 AM
    Is it possible to submit a PyFlink program to a remote cluster?
  • e

    Emily Li

    06/02/2023, 4:31 AM
    Hey, we have a Flink app that we want to scale beyond a parallelism of 128. Because the default max parallelism is 128, going above that requires updating maxParallelism. I then realised that changing maxParallelism is a backward-incompatible change, which means we won't be able to restore from previous state, unless we use the State Processor API to read the state, update the max parallelism, write a new state, and restore from that. So we started looking at the State Processor API, but I found very little information online on how it works, and I'm hoping to get some help from anyone experienced with it.
    Our Flink app has a FlinkKafkaConsumer source and 3 StreamingFileSinks. These are Flink built-in operators and both kinds have state. From my understanding, to modify the max parallelism I'll need to:
    1. Read the existing state of both operators.
    2. Create a StateBootstrapTransformation for each operator (1 source, 3 sinks).
    3. Update the max parallelism and write a new savepoint with the transformations.
    4. Restore from the new savepoint.
    I'm not sure the above steps are correct, so please correct me if things don't work this way. While trying to figure out how to read the existing state of the Kafka consumer source and the StreamingFileSink, a few questions came up:
    1. For the Kafka consumer source, we found in the Flink source code that it initialises the state as UnionListState, and the type seems to be Tuple2<KafkaTopicPartition, Long>, but I did not see a ProcessFunction that the Kafka consumer implements to define how elements are processed (how the state is updated). If we want to write a transformation for this state, should I read the state as UnionListState and write our own StateBootstrapFunction to write it? Or is there a better way to do it?
    2. For the S3 sink, which uses the StreamingFileSink, there seem to be two states: bucketState (byte[]) and maxPartCounterState (Long). Because we have 3 different sinks and use them with keyBy, should we treat these as keyed state instead of operator state? If so, would that work given that I don't see any KeyedProcessFunction defined in StreamingFileSink either? Or is there a better way to modify the max parallelism of existing checkpoints/savepoints without doing all this?
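For background on why max parallelism is baked into keyed state, here is a rough Python sketch of the key-group arithmetic as I understand it from Flink's KeyGroupRangeAssignment. Treat it as an illustration, not the actual implementation: the CRC32 hash is a deterministic stand-in (Flink applies a murmur hash to the key's hashCode), and all function names are made up for this example.

```python
import zlib


def default_max_parallelism(parallelism: int) -> int:
    """Default derived from the operator parallelism: 1.5x, rounded up to a
    power of two, clamped to the range [128, 32768]."""
    candidate = parallelism + parallelism // 2
    rounded = 1 << (candidate - 1).bit_length() if candidate > 1 else 1
    return min(max(rounded, 128), 32768)


def key_group(key: str, max_parallelism: int) -> int:
    """Key group of a key. CRC32 is an assumed stand-in for Flink's hash."""
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index(key_group_id: int, parallelism: int, max_parallelism: int) -> int:
    """Subtask that owns a key group: contiguous ranges of key groups per subtask."""
    return key_group_id * parallelism // max_parallelism


def moved_keys(keys, old_max: int, new_max: int):
    """Keys whose key group changes when max parallelism changes."""
    return [k for k in keys if key_group(k, old_max) != key_group(k, new_max)]
```

Keyed state in a savepoint is stored per key group, so rescaling only reassigns whole key groups to subtasks. Changing max parallelism changes the key group of many keys (moved_keys above), which is why the existing keyed state cannot simply be reused. Operator state, such as the union list state of the Kafka consumer, is redistributed differently and is not organised by key group.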