# troubleshooting
  • a

    Amir Hossein Sharifzadeh

    05/05/2023, 5:26 PM
    Is there any example of a Dockerfile that includes a Java installation? My server does not have Java, so it needs to be installed as part of the Docker image.
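    A minimal sketch of such a Dockerfile (assuming a Debian-based image; the Temurin tag, app name, and paths are illustrative). Note that the official flink:* images already bundle a JRE, so this is only needed for a custom base:
    Copy code
    # Option 1: start from an image that already ships Java
    FROM eclipse-temurin:11-jre

    # Option 2: install Java on top of an existing base image instead
    # FROM ubuntu:22.04
    # RUN apt-get update \
    #     && apt-get install -y --no-install-recommends openjdk-11-jre-headless \
    #     && rm -rf /var/lib/apt/lists/*

    # my-app.jar is a placeholder for your actual artifact
    COPY target/my-app.jar /opt/my-app.jar
    ENTRYPOINT ["java", "-jar", "/opt/my-app.jar"]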
  • d

    Danila Maksimenko

    05/05/2023, 6:41 PM
    Hello, I've been trying to start a platform consisting of Flink, MinIO (for S3), SQL Gateway, PostgreSQL, a standalone metastore, Hive, Hadoop, and Iceberg. I've run into two issues. First, the SQL Gateway reports that HiveConf properties (hive.s3.aws-secret-key, hive.s3.endpoint, hive.s3.aws-access-key) do not exist, even though they are defined and the config files are in the right directory. Second, the metastore crashes after throwing ([main] ERROR org.apache.hadoop.hive.metastore.utils.MetaStoreUtils - Got exception: org.apache.hadoop.fs.UnsupportedFileSystemException No FileSystem for scheme "s3"). As this is my first DevOps job after finishing courses, I'd really appreciate any help here, or even just a pointer to places I could ask. If you need any more information to diagnose, I can provide it. Thank you
    n
    • 2
    • 4
  • d

    Danila Maksimenko

    05/05/2023, 6:54 PM
    Here are my metastore config and hive-site.xml. First, the metastore config:
    Copy code
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:postgresql://postgres:5432/metastore</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>org.postgresql.Driver</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>hive</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>hive</value>
        </property>
        <property>
            <name>hive.metastore.schema.verification</name>
            <value>true</value>
        </property>
        <property>
            <name>metastore.thrift.port</name>
            <value>9083</value>
        </property>
        <property>
            <name>aws.region</name>
            <value>us-west-2</value>
        </property>
        <property>
            <name>metastore.task.threads.always</name>
    <!--        <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask,org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask</value>-->
            <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
        </property>
        <property>
            <name>metastore.expression.proxy</name>
            <value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
        </property>
        <property>
            <name>hive.metastore.warehouse.dir</name>
            <value>s3://tenant1/tenant1/data/</value>
        </property>
        <property>
            <name>fs.s3a.access.key</name>
            <value>sql-demo</value>
        </property>
        <property>
            <name>fs.s3a.secret.key</name>
            <value>demo-sql</value>
        </property>
        <property>
            <name>fs.s3a.connection.ssl.enabled</name>
            <value>true</value>
        </property>
        <property>
            <name>fs.s3a.path.style.access</name>
            <value>true</value>
        </property>
        <property>
            <name>fs.s3.access.key</name>
            <value>sql-demo</value>
        </property>
        <property>
            <name>fs.s3.secret.key</name>
            <value>demo-sql</value>
        </property>
        <property>
            <name>fs.s3.connection.ssl.enabled</name>
            <value>true</value>
        </property>
        <property>
            <name>fs.s3.path.style.access</name>
            <value>true</value>
        </property>
        <property>
            <name>s3.access.key</name>
            <value>sql-demo</value>
        </property>
        <property>
            <name>s3.secret.key</name>
            <value>demo-sql</value>
        </property>
        <property>
            <name>s3.connection.ssl.enabled</name>
            <value>true</value>
        </property>
        <property>
            <name>s3.path.style.access</name>
            <value>true</value>
        </property>
        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://localhost:9083</value>
        </property>
        <property>
            <name>hive.metastore.client.factory.class</name>
            <value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value>
        </property>
        <property>
            <name>fs.s3a.impl</name>
            <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
        </property>
    </configuration>
  • d

    Danila Maksimenko

    05/05/2023, 6:55 PM
    And here's the hive-site.xml:
    Copy code
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <configuration>
        <property>
            <name>hive.s3.endpoint</name>
            <value>http://minio:9000</value>
        </property>
    
        <property>
            <name>hive.s3.aws-access-key</name>
            <value>hive</value>
        </property>
    
        <property>
            <name>hive.s3.aws-secret-key</name>
            <value>hive</value>
        </property>
    
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:postgresql://postgres:5432/metastore</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>org.postgresql.Driver</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>hive</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>hive</value>
        </property>
        <property>
            <name>hive.metastore.schema.verification</name>
            <value>true</value>
        </property>
        <property>
            <name>metastore.thrift.port</name>
            <value>9083</value>
        </property>
        <property>
            <name>metastore.task.threads.always</name>
    <!--        <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask,org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask</value>-->
            <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
        </property>
        <property>
            <name>aws.region</name>
            <value>us-west-2</value>
        </property>
        <property>
            <name>metastore.expression.proxy</name>
            <value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
        </property>
        <property>
            <name>hive.metastore.warehouse.dir</name>
            <value>s3://tenant1/tenant1/data/</value>
        </property>
        <property>
            <name>fs.s3a.connection.ssl.enabled</name>
            <value>true</value>
        </property>
        <property>
            <name>fs.s3a.path.style.access</name>
            <value>true</value>
        </property>
        <property>
            <name>fs.s3.access.key</name>
            <value>sql-demo</value>
        </property>
        <property>
            <name>fs.s3.secret.key</name>
            <value>demo-sql</value>
        </property>
        <property>
            <name>fs.s3.connection.ssl.enabled</name>
            <value>true</value>
        </property>
        <property>
            <name>fs.s3.path.style.access</name>
            <value>true</value>
        </property>
        <property>
            <name>s3.access.key</name>
            <value>sql-demo</value>
        </property>
        <property>
            <name>s3.secret.key</name>
            <value>demo-sql</value>
        </property>
        <property>
            <name>s3.connection.ssl.enabled</name>
            <value>true</value>
        </property>
        <property>
            <name>s3.path.style.access</name>
            <value>true</value>
        </property>
        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://localhost:9083</value>
        </property>
        <property>
            <name>hive.metastore.client.factory.class</name>
            <value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value>
        </property>
        <property>
            <name>fs.s3a.impl</name>
            <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
        </property>
        <property>
            <name>hive.s3.endpoint</name>
            <value>http://minio:9000</value>
        </property>
    </configuration>
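    The No FileSystem for scheme "s3" error usually means Hadoop has no FileSystem implementation registered for the bare s3 scheme (only s3a is mapped in the files above). A minimal sketch of a fix, assuming the hadoop-aws and AWS SDK jars are on the metastore classpath, and reusing the MinIO endpoint from hive-site.xml:
    Copy code
    <!-- sketch: route the bare "s3" scheme to the S3A implementation -->
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <!-- and point S3A at MinIO explicitly -->
    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio:9000</value>
    </property>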
  • v

    Vladimir Tiukhtin

    05/06/2023, 9:26 PM
    Why does kubernetes.container.image.ref or kubernetes.pod-template-file.taskmanager have no effect? The job manager still prefers to spin up task managers with some default image, ignoring the values from the config.
  • a

    Alex Brekken

    05/06/2023, 10:02 PM
    Hi all, I'm using the Flink operator and have a job that I have defined using the FlinkSessionJob CR. I'm trying to set a specific restart strategy on my FlinkSessionJob, but based on the logs I'm seeing when the job is submitted, it's not honoring my configuration and is instead falling back to the default restart strategy (fixed-delay with Integer.MAX_VALUE restart attempts). Here is a section from my FlinkSessionJob yml:
    Copy code
    flinkConfiguration:
        restart-strategy.type: "failure-rate"
        restart-strategy.failure-rate.max-failures-per-interval: "10"
        restart-strategy.failure-rate.failure-rate-interval: "1 min"
        restart-strategy.failure-rate.delay: "5 s"
    And here is the log output when the job is submitted:
    Copy code
    INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for Flink Streaming Job (000000005a6cddc20000000000000001).
    Any ideas? Do I have it configured incorrectly?
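    One thing worth checking (an assumption, not something confirmed in this thread): the JobMaster of a session job takes its restart strategy from the session cluster's configuration, so these keys may need to live on the FlinkDeployment rather than on the FlinkSessionJob. A sketch, with an illustrative deployment name:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: session-cluster   # illustrative
    spec:
      flinkConfiguration:
        restart-strategy.type: "failure-rate"
        restart-strategy.failure-rate.max-failures-per-interval: "10"
        restart-strategy.failure-rate.failure-rate-interval: "1 min"
        restart-strategy.failure-rate.delay: "5 s"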
  • b

    Brandon

    05/07/2023, 12:35 AM
    Hey all, I'm wondering if anyone has successfully deserialized protobuf from Kafka using PyFlink. Would love to see an example if anyone is willing to share.
    s
    • 2
    • 1
  • t

    Tommaso Garuglieri

    05/07/2023, 2:22 PM
    Hi! Is it possible to create an in-memory table using Flink SQL for a batch job? It looks like even temporary tables require a backing catalog/connector. (A sketch follows below.)
    d
    • 2
    • 2
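    A sketch of the usual workaround: temporary tables do still need a connector, but the built-in datagen and blackhole connectors avoid any external system, and a bounded datagen table works in batch mode:
    Copy code
    -- in-memory-ish source for a batch job: bounded datagen
    CREATE TEMPORARY TABLE nums (
        id INT,
        name STRING
    ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'   -- bounded, so usable in batch mode
    );

    SELECT name, COUNT(*) AS cnt FROM nums GROUP BY name;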
  • t

    Tudor Plugaru

    05/08/2023, 7:53 AM
    Hey team 👋 When using the Elasticsearch sink, is there a way to dynamically change the bulkFlushMaxActions parameter? Say that due to ES struggling to index a bulk of 1000 actions and responding slowly, I'm thinking of having Flink adjust the bulk size based on the latency from ES: if it takes 5s to index 1k actions, the next bulk should be 500 actions, and so on. Is this doable in Flink?
    p
    • 2
    • 1
  • k

    Krzysztof Chmielewski

    05/08/2023, 10:02 AM
    Hi, is there a way to pass JVM properties (-D) to a Flink job when submitting it from the Flink UI? It seems that only program arguments are supported. (A cluster-side workaround is sketched below.)
    g
    • 2
    • 2
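    Not from the UI submit form itself, as far as I can tell, but JVM properties can be set cluster-side in flink-conf.yaml (a sketch; the property name is illustrative):
    Copy code
    # flink-conf.yaml
    env.java.opts.jobmanager: -Dmy.custom.prop=value
    env.java.opts.taskmanager: -Dmy.custom.prop=value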
  • s

    Saleh

    05/08/2023, 12:35 PM
    Hello, I'm trying to run the fraud detection example in https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/try-flink/datastream, but I'm getting a NoClassDefFoundError. Here's what I did: 1. Run the Maven archetype:generate command. 2. cd frauddetection. 3. Run mvn clean package. 4. Run java -jar target/frauddetection-0.1.jar. What I get:
    Copy code
    Error: Unable to initialize main class spendreport.FraudDetectionJob
    Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
    Note that I did not touch any of the project files. I created a Slack account to post this after scouring the internet for two hours without finding a solution. Any help would be appreciated. (A possible fix is sketched below.)
    d
    • 2
    • 3
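    A likely cause (an assumption based on the quickstart pom, where the core Flink dependencies are marked provided): a bare java -jar has no Flink runtime classes on the classpath. Submitting the jar to a running Flink distribution supplies them:
    Copy code
    # from an unpacked Flink distribution
    ./bin/start-cluster.sh
    ./bin/flink run target/frauddetection-0.1.jar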
  • p

    Pritam Agarwala

    05/08/2023, 12:57 PM
    Hello, I need the Kafka lag to prepare a graph, and that depends on the Kafka committed offset. Flink updates the offsets only after checkpointing, to keep them consistent. Can I enable OffsetCommitMode.KAFKA_PERIODIC with checkpointing enabled? Default behaviour as per the docs: if checkpointing is enabled but consumer.setCommitOffsetsOnCheckpoints(...) is set to false, then offsets will not be committed at all, even if enable.auto.commit is set to true. So, when consumer.setCommitOffsetsOnCheckpoints(...) is set to false, shouldn't it fall back on enable.auto.commit to do the offset commit?
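    For reference, with the newer KafkaSource (a different API from the FlinkKafkaConsumer described above) the two mechanisms can be combined explicitly. A sketch; broker, topic, and group id are placeholders:
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")                 // placeholder
            .setTopics("input-topic")                           // placeholder
            .setGroupId("lag-monitoring")                       // placeholder
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // keep committing on checkpoints (the default)...
            .setProperty("commit.offsets.on.checkpoint", "true")
            // ...and additionally let the Kafka client auto-commit periodically,
            // so the lag graph also moves between checkpoints
            .setProperty("enable.auto.commit", "true")
            .setProperty("auto.commit.interval.ms", "5000")
            .build();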
  • a

    Alex T

    05/08/2023, 1:37 PM
    Hello, I'm running pyflink jobs in session mode using the flink-kubernetes-operator 1.4.0 and flink 1.17. When I submit multiple jobs at the same time, some jobs do not start and other jobs will start multiple times. However, the total number of jobs is always correct. Logs from the kubernetes operator always show each of the distinct jobs in the running state, but the Flink UI shows the jobs that are actually running. E.g., if we have job1, job2, and job3, we may see job1 running twice, job2 running once, and job3 not running at all. When deployed one at a time, all jobs start as expected. Are there any configurations or other recommendations that could resolve this issue?
    d
    • 2
    • 2
  • k

    Kevin L

    05/08/2023, 4:57 PM
    Hello! I am trying to understand how aligned windows are implemented in the Table API/SQL. I was looking through the Flink GitHub repo and noticed that there is something called a SlicingWindowOperator, which appears to formulate a sliding window as multiple non-overlapping slices and aggregate them together upon emission (source code reference). However, I am having trouble figuring out where this operator is invoked when a hopping/tumbling window is used in the Table API. From the comments in the source code, it appears to be related, but I wanted to reach out to see if anyone here knows. Additionally, any further documentation on the window implementation that people could link would be helpful as well. Thanks!
  • i

    Iris Grace Endozo

    05/09/2023, 11:14 AM
    Hey folks, trying to troubleshoot why counter metrics are appearing as gauges on my end. Is it expected that the StatsDReporter reports gauges for counters as well? Looking at this one: https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207, the statsd specification states that counters need to be reported as <metric name>:<value>|c[|@<sample rate>], but it seems to be defaulting to "%s:%s|g" in the above. Wondering if anyone else has hit this issue?
  • t

    Trevor Burke

    05/09/2023, 10:07 PM
    Thinking about extending ParquetProtoWriters to allow adding additional columns. We'd like to add Flink processingTime and eventTime columns, but our Protobuf models do not already have these fields, and we do not want to add them to all of our existing models.
  • s

    Sucheth Shivakumar

    05/09/2023, 11:42 PM
    Hello, can someone help with how I can achieve this using the DataStream API? I have 2 topics: topic1 will have surface visited events with a visitId field; topic2 will have surface visits linked events, which have sourceVisitId and destinationVisitId and basically link 2 surface visited events, say SV1 and SV2. The requirement is to stream events from both topics and copy the attributes of SV1 to SV2 using the surface visits linked event coming from topic2. We can think of a window of 30 mins.
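    A sketch of one way to do this (assumptions: Visit and Link are illustrative event classes, and enriching the SV2 event itself would need a second stage keyed by destinationVisitId): connect the two streams keyed by the linking id, buffer SV1's attributes in keyed state, and emit on link arrival. A 30-minute timer or state TTL (omitted for brevity) would bound how long state is kept:
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // visited: DataStream<Visit>, links: DataStream<Link> (placeholder types)
    visited.keyBy(v -> v.visitId)
           .connect(links.keyBy(l -> l.sourceVisitId))
           .process(new KeyedCoProcessFunction<String, Visit, Link, Visit>() {
               private transient ValueState<Visit> sourceVisit;

               @Override
               public void open(Configuration parameters) {
                   sourceVisit = getRuntimeContext().getState(
                           new ValueStateDescriptor<>("source-visit", Visit.class));
               }

               @Override
               public void processElement1(Visit v, Context ctx, Collector<Visit> out)
                       throws Exception {
                   sourceVisit.update(v);   // remember SV1's attributes for this visit id
               }

               @Override
               public void processElement2(Link link, Context ctx, Collector<Visit> out)
                       throws Exception {
                   Visit sv1 = sourceVisit.value();
                   if (sv1 != null) {
                       // copyAttributesTo is a placeholder for the actual enrichment
                       out.collect(sv1.copyAttributesTo(link.destinationVisitId));
                   }
               }
           });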
  • c

    Christophe Bornet

    05/09/2023, 11:46 PM
    Hi,
  • c

    Christophe Bornet

    05/09/2023, 11:47 PM
    I have issues launching a session job in a session cluster with the k8s operator. I get the following stack trace:
    Copy code
    2023-05-09 23:42:36,211 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=000000000f85d61c0000000000000001.
    org.apache.flink.table.api.TableException: Failed to execute sql
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:867)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
        at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
        at com.example.flink.SqlRunner.call(SqlRunner.java:123)
        at com.example.flink.SqlRunner.call(SqlRunner.java:38)
        at picocli.CommandLine.executeUserObject(CommandLine.java:2041)
        at picocli.CommandLine.access$1500(CommandLine.java:148)
        at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2461)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2453)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2415)
        at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2273)
        at picocli.CommandLine$RunLast.execute(CommandLine.java:2417)
        at picocli.CommandLine.execute(CommandLine.java:2170)
        at com.example.flink.SqlRunner.main(SqlRunner.java:94)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
        at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
        at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:107)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'insert-into_pulsar.test-12/flink.temp-925ce66e-9c15-4b64-bbb1-1099060712fc'.
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:206)
        at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:850)
        ... 26 more
    Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
        at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
        at org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
        at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:177)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
        at akka.dispatch.OnComplete.internal(Future.scala:300)
        at akka.dispatch.OnComplete.internal(Future.scala:297)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
        at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
    Caused by: java.io.FileNotFoundException: /tmp/flink-web-878fc539-68bb-46d9-b0e4-4720c05b75f3/flink-web-upload/c86d309c-2108-45e3-840e-f8434bcef27c_flink-sql-runner-0.0.1-SNAPSHOT.jar (No such file or directory)
        at java.base/java.io.FileInputStream.open0(Native Method)
        at java.base/java.io.FileInputStream.open(Unknown Source)
        at java.base/java.io.FileInputStream.<init>(Unknown Source)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
        at org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
        at org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
        at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
        at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
        ... 48 more
    2023-05-09 23:42:36,249 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
    Stream closed EOF for default/session-cluster-d795455b-j86nh (flink-main-container)
    I see that the session cluster started a task manager pod when I submitted the job. But it seems the session cluster should send the application jar to it and doesn't find it? I also see that the session cluster starts to execute part of the SQL queries before the error occurs. Does it only use the TM when it gets to an INSERT/SELECT query?
  • u

    刘路

    05/10/2023, 6:26 AM
    I am using Flink SQL to read data from MySQL and write to PostgreSQL. Here is an example:
  • u

    刘路

    05/10/2023, 6:26 AM
    INSERT INTO sink1 SELECT a, COUNT(1) FROM source GROUP BY a;
  • u

    刘路

    05/10/2023, 6:27 AM
    But my PostgreSQL version is 9.4, which does not support upsert, and the job throws an exception about that. What can I do to make it work?
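    PostgreSQL only gained ON CONFLICT (which the JDBC sink's upsert mode relies on) in 9.5, so on 9.4 the options are essentially to upgrade, or to make the query append-only so no upsert is needed. A sketch of the latter, assuming the source has a time attribute ts and per-window counts are acceptable:
    Copy code
    -- append-only variant: a windowed aggregation emits one final row per window,
    -- so the JDBC sink can use plain INSERTs
    INSERT INTO sink1
    SELECT a, COUNT(1)
    FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
    GROUP BY a, window_start, window_end;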
  • s

    Sumit Singh

    05/10/2023, 7:51 AM
    Creating a JDBC MySQL catalog via the SQL Client, using Flink 1.17.0, and getting the below error. Any idea what might be wrong?
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.UnsupportedOperationException: Catalog for 'org.apache.flink.connector.jdbc.dialect.MySQLDialect@5b4e61ef' is not supported yet.
    s
    m
    • 3
    • 4
  • a

    Abolfazl Ghahremani

    05/10/2023, 8:58 AM
    Can the Flink community say hi to me?
  • t

    Tsering

    05/10/2023, 10:42 AM
    Good morning, afternoon & evening! I have a Flink Java app on KDA, the data source is a KDS, and it has 2 sinks, both Firehose (one main DS and one side output). I have one operation on that data, a ProcessFunction, and after that process function I just sink the data. The problem is that I am facing an uneven distribution of data across the parallelism. Can someone please enlighten me on this? (One option is sketched below.)
    j
    • 2
    • 19
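    If the imbalance comes from a hot key in a keyBy, only re-designing the key helps; but if it is just an uneven partition-to-subtask mapping, an explicit redistribution before the sink can even it out. A sketch (stream, tag, and sink names are placeholders):
    Copy code
    // processed is the output of the ProcessFunction; names are placeholders
    SingleOutputStreamOperator<Event> processed =
            events.process(new MyProcessFunction());

    // force a round-robin redistribution ahead of the Firehose sinks
    processed.rebalance().sinkTo(mainFirehoseSink);
    processed.getSideOutput(sideTag).rebalance().sinkTo(sideFirehoseSink);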
  • d

    Dheeraj Panangat

    05/10/2023, 11:48 AM
    Hi Team, will we be adding support for Elasticsearch or OpenSearch as a source for Flink?
    m
    • 2
    • 1
  • a

    Abolfazl Ghahremani

    05/10/2023, 2:42 PM
    Can the Flink community say hi to me?
    👋🏻 1
    👋 1
    m
    • 2
    • 3
  • t

    Tsering

    05/10/2023, 5:03 PM
    Hello, I am trying to sink to Kinesis Data Firehose from a KDA Flink app, but I was getting 100% busy for only ~330 records at the sink part 😦. Is there any explanation for this?
    a
    a
    • 3
    • 5
  • c

    Control the Guh

    05/10/2023, 6:10 PM
    Hello, is Apache Flink planning to add support for LLMs? We can run LLaMA using something akin to the dalai package with enough memory; will an integration be possible on streaming data in the future? I'm thinking of transcribing speech to text and feeding it to LLaMA.
    d
    • 2
    • 1
  • c

    Cody Lightfoot

    05/11/2023, 12:10 AM
    Hello everyone, I'm new to Flink, and Python is my language of choice when it comes to development. I have been trying to get into Flink for the past 2 weeks and it has been a little rough. I came across a video on YouTube, "Robust Stream Processing with Apache Flink" from 2016, and it seems like a great introduction to stateful processing. I do not know Java/Scala that well, and I have been struggling to replicate what is done in the video in Python. Right now I am using Flink v1.15.2 with Python 3.8. I came on here for help and guidance from a community of people who have more than likely been doing this longer than me 🤣 I kind of understand what's going on in the video, but the data generator and the keyed process are confusing to me. If I can get past creating those myself, then I think I can replicate the whole video in Python and learn a lot along the way. Is there anybody who can help? 😩
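    A minimal PyFlink sketch of the two pieces in question, with from_collection standing in for the video's data generator and a KeyedProcessFunction holding per-key state (all names are illustrative; treat this as a starting point, not a finished job):
    Copy code
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
    from pyflink.datastream.state import ValueStateDescriptor


    class CountPerKey(KeyedProcessFunction):
        """Counts events per key using keyed ValueState."""

        def open(self, runtime_context: RuntimeContext):
            self.count = runtime_context.get_state(
                ValueStateDescriptor("count", Types.LONG()))

        def process_element(self, value, ctx):
            current = (self.count.value() or 0) + 1
            self.count.update(current)
            yield value[0], current


    env = StreamExecutionEnvironment.get_execution_environment()

    # stand-in "data generator": a bounded collection of (key, reading) tuples
    ds = env.from_collection(
        [("sensor-1", 10), ("sensor-2", 20), ("sensor-1", 30)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    (ds.key_by(lambda e: e[0])
       .process(CountPerKey(),
                output_type=Types.TUPLE([Types.STRING(), Types.LONG()]))
       .print())

    env.execute("keyed_process_sketch")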