# troubleshooting
  • Pouria Modaresi
    12/13/2022, 10:01 PM
    Hi everyone, I'm using a single Kinesis data stream as my data source and have created 2-3 different notebooks (in Apache Zeppelin), each with a different output stream, but after 1-2 minutes I receive this error message. Why? Please help me: RuntimeException: Retries exceeded for getRecords operation - all 3 retry attempts failed.
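    A likely contributing factor: each Kinesis shard allows only 5 getRecords calls per second in aggregate, so several notebooks polling the same stream can starve each other into retry exhaustion. One mitigation (a sketch, not an answer from this thread; the stream name and region are placeholders) is to raise the consumer's retry and backoff settings; a dedicated stream or an enhanced fan-out consumer per application is the more structural fix.
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    Properties props = new Properties();
    props.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");         // placeholder region
    props.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "10");  // default is 3, matching the error above
    props.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "500");
    props.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000");
    props.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");

    FlinkKinesisConsumer<String> source =
            new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), props);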
  • Jirawech Siwawut
    12/14/2022, 6:43 AM
    Hi. Has anyone seen this issue before? Basically, I tried to increase the parallelism of a Flink job that sinks data to Hive. I also enabled compaction. Here is the error; it seems related to CompactOperator.
    java.util.NoSuchElementException
    	at java.util.ArrayList$Itr.next(ArrayList.java:864)
    	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    	at java.lang.Thread.run(Thread.java:750)
  • upendra reddy
    12/14/2022, 8:23 AM
    Hello everyone, we have a Flink application running on AWS KDA with Kinesis as the source and sink. We have a tumbling-window operator before the sink that aggregates the data for 6 hours and pushes it to the Kinesis sink. Because the aggregation window is large, we have a significant amount of state data. When the tumbling window timer fires, we see a sudden surge of data being written to the output KDS, which causes our database to be throttled as our output KDS consumers read this surge within a few minutes. We are looking for ways to rate-limit or reduce the rate at which writes occur to the sink (Kinesis). We tested the Async Sink with configurations like MaxBatchSize and MaxInFlightRequests, but we did not see a significant improvement. We would appreciate any further suggestions.
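    One way to smooth the surge (a sketch, not from this thread; the class name and the rate are made up) is a blocking rate limiter between the window operator and the sink, so backpressure spreads the flush over time. Note the limit applies per subtask, and prolonged blocking can delay checkpoints.
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    import com.google.common.util.concurrent.RateLimiter; // plain Guava dependency

    public class RateLimitedMap<T> extends RichFlatMapFunction<T, T> {
        private final double recordsPerSecondPerSubtask;
        private transient RateLimiter limiter;

        public RateLimitedMap(double recordsPerSecondPerSubtask) {
            this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
        }

        @Override
        public void open(Configuration parameters) {
            limiter = RateLimiter.create(recordsPerSecondPerSubtask);
        }

        @Override
        public void flatMap(T value, Collector<T> out) {
            limiter.acquire(); // blocks until a permit is free, applying backpressure upstream
            out.collect(value);
        }
    }
    Usage would be something like windowResult.flatMap(new RateLimitedMap<>(100.0)).sinkTo(kinesisSink), where 100.0 records/sec per subtask is a placeholder; the effective total rate is that figure times the operator's parallelism.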
  • Johannes Henrysson-Tholse
    12/14/2022, 8:59 AM
    Hey there, this is probably a bit out of scope for this topic, and if I'm breaking any rules, just let me know and I'll remove the post. I'm working at Prisjakt/Pricespy, a price comparison site located in Sweden, the UK, NZ, etc. For a while now I've been working on the search engine, Elasticsearch. The plan is to use Flink to ingest the data needed, such as products, offers, shops, etc. We are primarily using SQL in Flink, with some UDFs, and while the first very simple versions ran smoothly, the more complex versions currently don't work, despite having quite a lot of resources on Google Cloud. We have started a plan to figure out more specifically where the issue lies, which is why I don't have any concrete questions just now. We tried a lot of different things for a while, but we need a bit more structure now to really pin down the issues we see. However, the question of having someone review our complete environment has come up a few times. We have even been looking here and there for consultants, but it has turned out harder than expected to find any with good Flink knowledge and the will/time to help us out. Can someone point us in the right direction for where to find Flink expertise to help us review our setup?
  • Amenreet Singh Sodhi
    12/14/2022, 11:52 AM
    Hi all, what are the ways to submit jobs automatically when a Flink cluster runs in session mode on a k8s cluster, instead of submitting the job manually through the web UI? I couldn't find any good resources on this. (By automatically I mean using scripts, a customized Docker image, init containers, or some sort of controller that submits the jar for us, etc.)
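    For the scripted route, the session cluster's REST API supports uploading and running jars (POST /jars/upload, then POST /jars/:jarid/run). Below is a minimal sketch with the JDK HTTP client; the service URL, jar path, parallelism, and the crude JSON handling are all placeholders or assumptions. The Flink CLI (flink run against the JobManager address) and the flink-kubernetes-operator's FlinkSessionJob resource are alternatives that wrap the same functionality.
    import java.io.IOException;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class SessionJobSubmitter {
        // Hypothetical endpoint; point this at the session cluster's REST service.
        private static final String REST = "http://flink-jobmanager:8081";

        public static void main(String[] args) throws IOException, InterruptedException {
            Path jar = Path.of("/opt/jobs/my-job.jar"); // hypothetical path
            String boundary = "flinkBoundary" + System.nanoTime();

            // Hand-rolled multipart body; the upload endpoint expects a "jarfile" part.
            byte[] head = ("--" + boundary + "\r\n"
                    + "Content-Disposition: form-data; name=\"jarfile\"; filename=\""
                    + jar.getFileName() + "\"\r\n"
                    + "Content-Type: application/x-java-archive\r\n\r\n").getBytes();
            byte[] tail = ("\r\n--" + boundary + "--\r\n").getBytes();
            byte[] body = concat(head, Files.readAllBytes(jar), tail);

            HttpClient client = HttpClient.newHttpClient();
            HttpRequest upload = HttpRequest.newBuilder(URI.create(REST + "/jars/upload"))
                    .header("Content-Type", "multipart/form-data; boundary=" + boundary)
                    .POST(HttpRequest.BodyPublishers.ofByteArray(body))
                    .build();
            String json = client.send(upload, HttpResponse.BodyHandlers.ofString()).body();

            // Crude extraction of the jar id from {"filename":".../<id>.jar",...};
            // use a real JSON parser in practice.
            String filename = json.split("\"filename\":\"")[1].split("\"")[0];
            String jarId = filename.substring(filename.lastIndexOf('/') + 1);

            HttpRequest run = HttpRequest.newBuilder(URI.create(REST + "/jars/" + jarId + "/run"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString("{\"parallelism\": 2}"))
                    .build();
            System.out.println(client.send(run, HttpResponse.BodyHandlers.ofString()).body());
        }

        private static byte[] concat(byte[]... parts) {
            int n = 0;
            for (byte[] p : parts) n += p.length;
            byte[] out = new byte[n];
            int pos = 0;
            for (byte[] p : parts) {
                System.arraycopy(p, 0, out, pos, p.length);
                pos += p.length;
            }
            return out;
        }
    }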
  • Pedro Mázala
    12/14/2022, 2:40 PM
    Hey!! I’m trying to deploy a FlinkSQL application but I’m getting this error:
    Caused by: java.lang.ClassCastException: class org.codehaus.janino.CompilerFactory cannot be cast to class org.codehaus.commons.compiler.ICompilerFactory (org.codehaus.janino.CompilerFactory is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5a37d3ed; org.codehaus.commons.compiler.ICompilerFactory is in unnamed module of loader 'app')
    	at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129) ~[flink-table-runtime-1.16.0.jar:1.16.0]
    	at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79) ~[flink-table-runtime-1.16.0.jar:1.16.0]
    	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426) ~[?:?]
    	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:374) ~[?:?]
    	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
    I've tried pinning org.codehaus.janino:janino and org.codehaus.janino:commons-compiler to version 3.0.11, which is the one I find in Flink, and no other dependency requires those packages. My FlinkSQL dependencies (version 1.16.0):
    implementation "org.apache.flink:flink-table:${flinkVersion}"
        implementation "org.apache.flink:flink-table-common:${flinkVersion}"
        implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
        implementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
        implementation "org.apache.flink:flink-table-runtime:${flinkVersion}"
        implementation "org.apache.flink:flink-json:${flinkVersion}"
    My build process generates one image with a Flink instance (which is stateless and works) and the FlinkSQL job (which fails). Can you give me advice on this?
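    This particular ClassCastException is usually a classloading clash rather than a version mismatch: Janino ends up both inside the fat jar and on the cluster's classpath, and the two copies meet across Flink's ChildFirstClassLoader. A common fix (a sketch, assuming the job runs on a standard Flink distribution that already ships the table stack in lib/) is to keep the table/planner dependencies out of the shaded jar instead of pinning Janino. Setting classloader.resolve-order: parent-first in flink-conf.yaml is another workaround people report.
    // Sketch: compileOnly keeps the planner/runtime (and their Janino) out of
    // the fat jar, so they are loaded only from the Flink distribution.
    compileOnly "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-runtime:${flinkVersion}"
    implementation "org.apache.flink:flink-json:${flinkVersion}"   // formats stay bundled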
  • Felix Angell
    12/14/2022, 4:43 PM
    Hey, is it viable to backport the UniformShardAssigner for use in PyFlink 1.13?
  • umesh dangat
    12/14/2022, 4:50 PM
    Hello, I am trying to use plugins (the filesystem connector flink-s3-fs-presto). I performed the steps mentioned in the link, but I believe the Hadoop conf classes are not found when I try to run my job (from within IntelliJ as well as locally). This is the stack trace I get when I try to run the job in IntelliJ:
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
    	at org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:92)
    	at org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:52)
    	at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeEncoder(ParquetFileFormatFactory.java:80)
    	at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeEncoder(ParquetFileFormatFactory.java:74)
    	at org.apache.flink.connector.file.table.FileSystemTableSink.createWriter(FileSystemTableSink.java:381)
    	at org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(FileSystemTableSink.java:202)
    	at org.apache.flink.connector.file.table.FileSystemTableSink.consume(FileSystemTableSink.java:160)
    	at org.apache.flink.connector.file.table.FileSystemTableSink.access$000(FileSystemTableSink.java:95)
    	at org.apache.flink.connector.file.table.FileSystemTableSink$1.consumeDataStream(FileSystemTableSink.java:143)
    	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:447)
    	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:193)
    	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:167)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
    	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:82)
    	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    	at scala.collection.Iterator.foreach(Iterator.scala:937)
    	at scala.collection.Iterator.foreach$(Iterator.scala:937)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:81)
    	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:188)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
    	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:108)
    	at com.yelp.sqlclient.apps.SqlClientAppImpl.execute(SqlClientAppImpl.java:166)
    	at com.yelp.sqlclient.cli.SqlClientAppCmd.call(SqlClientAppCmd.java:55)
    	at com.yelp.sqlclient.cli.SqlClientAppCmd.call(SqlClientAppCmd.java:21)
    	at picocli.CommandLine.executeUserObject(CommandLine.java:1953)
    	at picocli.CommandLine.access$1300(CommandLine.java:145)
    	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2358)
    	at picocli.CommandLine$RunLast.handle(CommandLine.java:2352)
    	at picocli.CommandLine$RunLast.handle(CommandLine.java:2314)
    	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
    	at picocli.CommandLine$RunLast.execute(CommandLine.java:2316)
    	at picocli.CommandLine.execute(CommandLine.java:2078)
    	at com.yelp.sqlclient.cli.SqlClientCmd.main(SqlClientCmd.java:171)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
    	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    Is there anything obvious I am missing here to help load the plugins? I am using Flink 1.15.2.
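    One thing worth checking (an assumption, since the full setup isn't shown): the s3 filesystem plugins bundle their own shaded Hadoop, which is deliberately invisible to user code, but the Parquet format calls org.apache.hadoop.conf.Configuration directly from your job, so real Hadoop classes must be on the application classpath; the plugin cannot provide them. For an IDE run that usually means adding something like the following, with the version being a placeholder to align with your environment:
    // Sketch: the Parquet format needs Hadoop on the user classpath; the s3
    // plugin's shaded Hadoop does not satisfy this.
    implementation "org.apache.hadoop:hadoop-client:2.10.2"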
  • Kyle Ahn
    12/14/2022, 6:00 PM
    Has flink-kubernetes-operator-1.1.0 been removed from the Apache downloads index?
  • Tsering
    12/14/2022, 6:11 PM
    Hi friends, I am trying to run a Flink app on AWS Kinesis and got stuck with the following error:
    java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file LOG does not exist on the TaskExecutor.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2068)
    	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flink.util.FlinkException: The file LOG does not exist on the TaskExecutor.
    Can anyone enlighten me on this? 🙏
  • Sami Badawi
    12/14/2022, 6:20 PM
    In Scala Flink I use async I/O for HTTP REST calls. Is PyFlink also well suited for HTTP REST calls? I know that PyFlink currently doesn't support async I/O, but does anybody have experience with making REST calls from PyFlink? The volume I am expecting is not very high, but the REST calls can wait for many seconds, so I am wondering if aiohttp calls from Python could be a problem.
  • Emmanuel Leroy
    12/14/2022, 9:41 PM
    Hi, has anyone implemented a FileSource or HybridSource to read bulk Avro format? I see examples for batch mode, but not for streaming.
  • Nathanael England
    12/14/2022, 11:43 PM
    I'm currently waiting on a Jira account, but in line with https://issues.apache.org/jira/browse/FLINK-29796, I'm looking for an alternative mechanism to pull the apache-flink Python package into our monorepo while I wait for the protobuf version bump to make it into an official release. I was exploring a VCS requirement in our requirements.txt, but I can't get beyond the step looking for apache-flink-libraries, since it bases that version off what's in the version of the repo pulled down. Any other tricks to try, or am I looking at forking the repository until an upstream release is generated?
  • Raghunadh Nittala
    12/15/2022, 2:54 AM
    Hey everyone, can someone provide any leads on using an S3 proxy for writing Parquet files to Azure Blob Storage?
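    If the proxy speaks the S3 API, the flink-s3-fs-hadoop/presto plugins can usually be pointed at it through flink-conf.yaml. A sketch with placeholder values; the endpoint, credentials, and whether your proxy needs path-style access are all assumptions to verify against your setup:
    # flink-conf.yaml sketch; proxy URL and credentials are placeholders
    s3.endpoint: http://my-s3proxy:8080
    s3.path.style.access: true
    s3.access-key: <access-key>
    s3.secret-key: <secret-key>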
  • Bumblebee
    12/15/2022, 5:00 AM
    Hi all, while using the Ververica Flink CDC connector together with the Flink Kafka connector on Flink 1.16, I'm facing java.lang.NoSuchFieldError: Default.
  • Suparn Lele
    12/15/2022, 10:25 AM
    Hi, I am trying to achieve connection pooling with Flink. Let me explain my use case. I am fetching data from Kafka; lots of different metrics are sent to the Kafka topic, so I filter the input based on the metrics' identifiers. I have 30 different metrics, so after fetching data from Kafka there are 30 separate pipelines. In each pipeline I apply some transformations and a sink operation. I am using a RichSinkFunction, where in the open method I call a method that returns a connection pool (HikariPool with max size 20). With parallelism 4, 4 slots, 8 cores, and 32 GB memory, I can see that only 80 connections are created. But when I set parallelism 8, 4 slots, 8 cores, and 16 GB memory for each TaskManager, the connections exceed 250. My understanding was that since the task managers are on different hosts, there should have been 20 connections in case 1 and 40 connections in case 2. Can someone please explain: if I fetch a connection pool of size 20 in the open method of a RichSinkFunction, and I have 4 slots and parallelism 8, how many times will the connection pool be created?
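    A key detail: open() runs once per parallel sink subtask, not once per TaskManager, and each of the ~30 pipelines ends in its own sink, so the number of pools scales with the total number of deployed sink subtasks; raising parallelism multiplies connections. If the goal is one pool per TaskManager, the usual pattern is a JVM-wide lazily initialized singleton. A sketch (JDBC URL and pool size are placeholders):
    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;

    // One pool per TaskManager JVM instead of one per sink subtask.
    public final class SharedPool {
        private static volatile HikariDataSource pool;

        private SharedPool() {}

        public static HikariDataSource get() {
            if (pool == null) {
                synchronized (SharedPool.class) {
                    if (pool == null) {
                        HikariConfig cfg = new HikariConfig();
                        cfg.setJdbcUrl("jdbc:postgresql://db:5432/metrics"); // placeholder
                        cfg.setMaximumPoolSize(20);
                        pool = new HikariDataSource(cfg);
                    }
                }
            }
            return pool;
        }
    }
    Each RichSinkFunction.open() would then call SharedPool.get() instead of building its own pool, capping connections at the configured maximum per TaskManager process.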
  • Dmitry Koudryavtsev
    12/15/2022, 2:22 PM
    Hi, sometimes after an application restart we're getting the following exception:
    Source: My Source (1/2)#5 (...) switched from INITIALIZING to FAILED with failure cause: java.lang.IllegalArgumentException: MyRMQSource retrieved invalid state.
      at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
      at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.initializeState(MessageAcknowledgingSourceBase.java:171)
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
      at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
      at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
      at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
      at java.base/java.lang.Thread.run(Unknown Source)
      Suppressed: java.lang.NullPointerException
        at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:101)
        at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:278)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
        at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1000)
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:919)
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:930)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930)
        ... 3 more
    MyRMQSource is defined as a generic Scala class:
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
    ...
    @SerialVersionUID(1)
    final class MyRMQSource[A](...) extends RMQSource[A](...) { ... }
    I found what is probably a similar issue, https://issues.apache.org/jira/browse/FLINK-28274, but I don't know how it can help. Anyway:
    Flink 1.15.3
    parallelism.default: 24
    pipeline.max-parallelism: 48
    Please help.
  • Emmanuel Leroy
    12/15/2022, 2:41 PM
    Hi, when using a Path to define the files to fetch for a FileSource, is there any guarantee about the order in which files are listed from the Path? If not, how do I go about enumerating the files myself and sorting them?
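    As far as I know the split enumerator makes no ordering promise, and splits are handed to parallel readers anyway. A sketch of listing and sorting up front, then handing FileSource an explicit file list (the directory is a placeholder and the text format stands in for whatever format you use; true end-to-end ordering still requires source parallelism 1):
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Comparator;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.FileStatus;
    import org.apache.flink.core.fs.Path;

    // Enumerate the directory ourselves, sort by file name, and pass the
    // explicit list to the FileSource builder.
    static FileSource<String> sortedSource(String uri) throws IOException {
        Path dir = new Path(uri);
        FileStatus[] statuses = dir.getFileSystem().listStatus(dir);
        Path[] sorted = Arrays.stream(statuses)
                .filter(s -> !s.isDir())
                .map(FileStatus::getPath)
                .sorted(Comparator.comparing(Path::getName))
                .toArray(Path[]::new);
        return FileSource.forRecordStreamFormat(new TextLineInputFormat(), sorted).build();
    }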
  • Emmanuel Leroy
    12/15/2022, 3:55 PM
    Other question: how do I set up the flink-s3-fs-hadoop plugin to be used locally when debugging? It's supposed to be in the plugins folder, but when debugging I don't have one. Adding it to the classpath doesn't seem to help. Any ideas?
  • Suparn Lele
    12/15/2022, 4:02 PM
    Hello, I had one question. Say I am running a batch job with the following flow: initialize the StreamExecutionEnvironment and StreamTableEnvironment, then tableNames.foreach(x => { 1. load data from JDBC using the Table API; 2. process the data using SQL queries; 3. store the data in the DB }), and finally streamExecutionEnv.execute(). I am running this in batch mode. Will the processing inside the loop happen sequentially, or simultaneously?
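    One way to make the answer explicit rather than implicit (a sketch; table names and SQL are placeholders): in the Table API, each execute()/executeInsert() call is its own job, so a per-table execute in the loop runs strictly sequentially, while statements collected into a StatementSet go into one job whose independent pipelines the batch scheduler may run concurrently, resources permitting.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    // All tables in one batch job: independent pipelines may run concurrently.
    // Calling tEnv.executeSql("INSERT INTO ...") inside the loop instead would
    // submit one job per table, sequentially.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    StatementSet set = tEnv.createStatementSet();
    for (String table : new String[] {"t1", "t2", "t3"}) {
        set.addInsertSql("INSERT INTO sink_" + table + " SELECT * FROM source_" + table);
    }
    set.execute(); // single job containing all pipelines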
  • Yaroslav Bezruchenko
    12/15/2022, 4:46 PM
    Hey, I'm trying to deploy my Flink app using the Flink Operator. The problem is that I need to pass Kubernetes secrets into flinkConfiguration, but when I take them from env it's not working, and I can't map them in flinkConfiguration, as it accepts only strings. I've tried the following:
    ParameterTool tool = ParameterTool
            .fromArgs(args)
            .mergeWith(ParameterTool.fromMap(System.getenv()));
    final Configuration configuration = tool.getConfiguration();
    configuration.setString("s3.endpoint", configuration.get(ConfigOptions.key("S3_ENDPOINT")
            .stringType()
            .noDefaultValue()));

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    And it's still not working. Any insights on how I can provide secrets to the Flink config?
  • Emmanuel Leroy
    12/15/2022, 5:03 PM
    How do I access the config from the flink-conf.yaml in the main method? env.getConfiguration() does not give me those values.
  • Krish Narukulla
    12/15/2022, 5:08 PM
    Is there a way to declare a Flink table for Kafka that can consume all fields without declaring them in the table definition?
  • Yaroslav Bezruchenko
    12/15/2022, 6:26 PM
    Is there a way to disable the SSL cert check for s3.endpoint specifically?
  • Steven Zhang
    12/15/2022, 9:25 PM
    If I updated the Flink operator's configmap with a new config, such as changing taskmanager.numberOfTaskSlots from 10 to 20, what's the best way to trigger redeployments of the Flink clusters managed by the operator? It seems like the only way is to make some other change to the FlinkDeployment definition (such as changing the image); when the operator redeploys the Flink cluster because it detects that change, the new config gets picked up. Is there a way to trigger a cluster redeploy at will?
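    The FlinkDeployment spec has a restartNonce field intended for exactly this: changing its value triggers a full redeploy without any other spec change. A sketch (the deployment name is a placeholder, and the rest of the spec is elided):
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: my-deployment   # placeholder
    spec:
      restartNonce: 2       # bump this number to force a redeploy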
  • Nathanael England
    12/15/2022, 11:16 PM
    When setting up a RabbitMQ source, what's the typical practice for creating a queue and configuring the bindings to exchanges for the singular queue name passed into RMQSource? Similarly, for RMQSink, I see it takes a queue name, but RabbitMQ publishes to an exchange. Is this just a matter of generalizing terminology, or are there important aspects at play here?
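    On the source side, RMQSource only declares the queue; bindings are left to you, and the usual hook is overriding its protected setupQueue() method. On the sink side, RMQSink publishes to the default exchange with the queue name as the routing key, which is why it takes a queue name. A sketch (the exchange name and routing key are placeholders):
    import java.io.IOException;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

    // Declare the queue and bind it to an exchange before consuming.
    public class BoundRMQSource<T> extends RMQSource<T> {
        public BoundRMQSource(RMQConnectionConfig config, String queueName,
                              DeserializationSchema<T> schema) {
            super(config, queueName, schema);
        }

        @Override
        protected void setupQueue() throws IOException {
            channel.queueDeclare(queueName, true, false, false, null); // durable queue
            channel.exchangeDeclare("my-exchange", "topic", true);     // placeholder exchange
            channel.queueBind(queueName, "my-exchange", "my.routing.#");
        }
    }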
  • Colin Williams
    12/16/2022, 2:04 AM
    What's prescribed when you receive the exception
    Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [mytopic-0]
    Do we just set some specific Kafka options, like reading from the earliest offset? Why do I get this exception when reading a populated topic from the start?
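    This exception typically means the starting offsets were set to the consumer group's committed offsets, the group has no committed offset yet, and no reset strategy was given. Giving the initializer an explicit fallback (or using OffsetsInitializer.earliest() outright) avoids it. A sketch; broker, topic, and group are placeholders:
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    // Fall back to the earliest offset when the group has nothing committed.
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("mytopic")
            .setGroupId("my-group")
            .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();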
  • Emmanuel Leroy
    12/16/2022, 3:41 AM
    Hi, I worked out a HybridSource that reads a bunch of files from object storage before switching to Kafka streaming. The job is 100% busy at the beginning, as expected, and the record count rises really fast; it then slows down when it switches to the streaming source. However, the task stays 75% busy for some reason. With streaming alone, the task is 0% busy, so I'm confused as to why it stays busy like this. Any ideas on how to debug this?