https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • s

    Sylvia Lin

    10/04/2022, 9:37 PM
    Hey forks, does Flink k8s operator supports nodeAffinity? cos I didn't see related fields defined here in the crd https://github.com/apache/flink-kubernetes-operator/blob/release-1.1.0/flink-kuber[…]che/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
    🍴 1
    d
    • 2
    • 2
  • d

    Damon Rolfs

    10/05/2022, 3:32 AM
    Hi. I have a Flink job that consumes messages from Kafka via the Kafka connector. I use the JobManager’s REST API to monitor the records-lag-max (on source operators) and other metrics to make autoscaling decisions (not using Reactive Flink - instead via stop+savepoint and restart). After a moderate period of time - during which some up and down rescaling may happen - the records-lag-max metric is no longer published by the JobManager’s REST api. If I cancel+savepoint and restart the job at the same parallelism, the metric reading returns but after awhile goes away again until another restart, etc. Aside from certain Kafka source metrics disappearing, I haven’t seen unusual activity that I can point to suggesting an issue (CPU, memory utilization, classloader). Any guidance on what may be causing the Kafka source metrics to disappear or some things to look at is appreciated. Thanks in advance!
    c
    • 2
    • 1
  • s

    Sevvy Yusuf

    10/05/2022, 12:42 PM
    Hi guys, is it possible to chain sources sequentially for a batch job. We want to read a dynamo table to get s3 pointers in the first source, then grab the files at the s3 location in the second source (this is so that we can set different parallelisms for each). Is this something that can be achieved in flink?
    c
    • 2
    • 2
  • j

    Jaume

    10/05/2022, 1:20 PM
    👋🏼 Hi everyone! I have a problem regarding Flink 1.13.2 SQL Joins (Table API) for a unbounded streaming pipeline I want to make. 😵‍💫 I'm using 3 Kafka Sources (Using
    upsert-kafka
    connector)
    for joining and sink the result into another Kafka topic. For some cases it creates a proper result, but for others it results in an unexpected Tombstone message (They are unexpected because their relationships do exist). 🤔 Am I missing something? Does anyone why is this behavior happening? Can anyone bring some light here? 🧵 I'll put more info in thread ⤵️
    m
    • 2
    • 14
  • l

    Lydian Lee

    10/05/2022, 10:39 PM
    Hi, wondering what’s the right way to use
    WebIdentityTokenCredentialsProvider
    for s3 presto? I’ve tested provide
    Copy code
    presto.s3.credentials-provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    but it failed with :
    Copy code
    Error creating an instance of com.amazonaws.auth.WebIdentityTokenCredentialsProvider for URI s3p
    Thanks!
    c
    m
    v
    • 4
    • 10
  • a

    Aqib Mehmood

    10/06/2022, 6:02 AM
    Hi, We are creating a table called orders using flink table api. Whenever a new order comes in regarding an item, we want to compare the price with its last order price and do some logic computation on top of it. The issue is that We are unable to retrieve the last price of the item. We are using this code
    Copy code
    DataStream<Row> processor = resultStream.map(new MapFunction<Row, Row>() {
                @Override
                public Row map(Row row) throws NullPointerException{
                    try{
                        String newPrice = row.getField("price").toString();
                        String prodcutId = row.getField("productId").toString();
                        int i=Integer.parseInt(prodcutId);
                        Table pastPrice = tEnv.executeSql("SELECT price FROM (\n" +
                        "    SELECT data.price from orders where data.productId = " + i + " order by data.updatedAt desc limit 1);");
                    }
                    catch (NullPointerException err){
                        System.out.println("Empty row");
                    }
                    return row;
                }
            })
    We are getting this error
    Copy code
    Caused by: org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractStreamTableEnvironmentImpl is not serializable. The object probably contains or references non serializable fields.
    	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2194) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:201) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:575) ~[flink-dist-1.15.0.jar:1.15.0]
    	at pricingtriggers.DataStreamJob.main(DataStreamJob.java:129) ~[?:?]
    	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
    	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
    What am I doing wrong here? TIA
    d
    h
    • 3
    • 4
  • s

    Sumit Nekar

    10/06/2022, 6:45 AM
    Hello Everyone, I am using flink kafka connector and eventhub as sink. I am getting the following errors when FlinkKafakProducer is publishing records to eventhub
    Copy code
    Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
    		at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
    		at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965) ~[?:?]
    		at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    		at java.lang.Thread.run(Unknown Source) ~[?:?]
    		Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
    			at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
    			at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965) ~[?:?]
    			at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    			at java.lang.Thread.run(Unknown Source) ~[?:?]
    		Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
    			at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
    			at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095) ~[?:?]
    			at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925) ~[?:?]
    			... 10 more
    		Caused by: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy.
    	Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
    		at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
    		at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095) ~[?:?]
    		at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925) ~[?:?]
    		... 10 more
    	Caused by: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy.
    Caused by: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy.
    Its not logging which policy is violated. Appreciate your help to debug this issue.
    d
    • 2
    • 2
  • c

    clen.moras

    10/06/2022, 10:17 AM
    hello everyone , after aks upgrade, flink operator is not able to communicate with the cluster. upgrded to v1.22.15. However once upgraded we are getting the following error:
    Copy code
    2022-10-06 10:10:10,849 i.j.o.p.e.s.c.ControllerResourceEventSource [INFO ] Stopping informer 'flinksessionjobcontroller' Controller -> io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer@297ca8b5
    2022-10-06 10:10:10,849 i.j.o.p.e.s.c.ControllerResourceEventSource [INFO ] Stopping informer 'flinkdeploymentcontroller' Controller -> io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer@3c0036b
    Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: Operation: [list]  for kind: [FlinkDeployment]  with name: [null]  in namespace: [o11y-aiops]  failed.
    	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:130)
    	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:140)
    	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:415)
    	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:83)
    	at io.fabric8.kubernetes.client.informers.cache.Reflector.listSyncAndWatch(Reflector.java:81)
    	at io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer.run(DefaultSharedIndexInformer.java:146)
    	at io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.createAndRunInformerFor(ControllerResourceEventSource.java:101)
    	at io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.lambda$start$0(ControllerResourceEventSource.java:84)
    	at java.base/java.lang.Iterable.forEach(Unknown Source)
    	at io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.start(ControllerResourceEventSource.java:83)
    	at io.javaoperatorsdk.operator.processing.event.EventSourceManager.start(EventSourceManager.java:72)
    	at io.javaoperatorsdk.operator.processing.Controller.start(Controller.java:196)
    	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
    	at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    	at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
    	at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
    	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(Unknown Source)
    	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    	at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
    	at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
    	at io.javaoperatorsdk.operator.Operator$ControllerManager.start(Operator.java:170)
    	at io.javaoperatorsdk.operator.Operator.start(Operator.java:83)
    	at org.apache.flink.kubernetes.operator.FlinkOperator.run(FlinkOperator.java:169)
    	at org.apache.flink.kubernetes.operator.FlinkOperator.main(FlinkOperator.java:179)
    Caused by: java.net.SocketTimeoutException: connect timed out
    appreciate any help with debugging.
    j
    • 2
    • 11
  • e

    Emily Morgan

    10/06/2022, 1:48 PM
    Hey there! I'm trying to run an experiment that measures the time it takes for a job to start from a savepoint for varying backend state sizes. Is there any way to get the total size of all state in the job? I'm looking at the state processor API docs. I have a kafka source and a custom flat map operator so I'm using
    initializeState
    and
    snapshotState
    overrides for the source and then manually managing the state inside the custom flat map operator. Updating the state is working correctly because I can see that the size of the metadata file changes after I create a savepoint, but is there an example of how to know the size of the state before the metadata file is written?
    d
    c
    d
    • 4
    • 11
  • b

    Beny Chernyak

    10/06/2022, 1:54 PM
    Hello All, I'm upgrading from flink 1.11 to 1.15. After setting up all new dependencies I faced the following error while trying to run:
    org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:187)
    From configuration side nothing changed: I use HADOOP_CONF_DIR env variable pointing to directory with the same core-site.xml I've used earlier:
    Copy code
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
     <property>
      <name>fs.s3a.impl</name>
      <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
     </property>
     <property>
      <name>fs.s3.impl</name>
      <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
     </property>
     <property>
      <name>fs.s3n.impl</name>
      <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
     </property>
     <property>
      <name>fs.s3a.access.key</name>
      <value>xxxxx</value>
     </property>
     <property>
      <name>fs.s3a.secret.key</name>
      <value>xxxxx</value>
     </property>
     <property>
      <name>fs.s3a.aws.credentials.provider</name>
      <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
     </property>
     <property>
      <name>fs.s3a.buffer.dir</name>
      <value>${hadoop.tmp.dir}/s3a</value>
     </property>
    </configuration>
    I succeed only when I explicitly put AWS_ACCESS_KEY_ID and AWS_SECRET_KEY as env variables, and I'd like to use the values coming from core-site.xml. What I've tried so far: • Changing org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider to com.amazonaws.auth.profile.ProfileCredentialsProvider in core-site.xml • Adding FLINK_HOME and FLINK_CONF env variables • Adding flink-conf.yaml with fs.s3a.access.key: xxxxxx fs.s3a.secret.key: xxxxxx Nothing helps so far. What am I doing wrong and/or what am I missing here?
    ✅ 1
    m
    c
    +2
    • 5
    • 38
  • m

    Maher Turifi

    10/06/2022, 3:19 PM
    Hi, I'm trying to implement Processing Time Temporal Join on two streams (Flink 1.13):
    Copy code
    DROP TABLE IF EXISTS versioned_rates;
    CREATE TABLE versioned_rates (
        rate_currency STRING NOT NULL,
        rate DECIMAL(38, 10),
        rate_proctime as PROCTIME(),
        currency_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
        WATERMARK FOR currency_time AS currency_time ,
        PRIMARY KEY(rate_currency) NOT ENFORCED            -- primary key (currency)
    ) WITH (
        'connector' = 'upsert-kafka',
        ...
        'key.format' = 'raw',
        'value.format' = 'json'
    );
    
    CREATE TABLE orders (
        order_id STRING,
        order_currency STRING NOT NULL,
        amount INT,
        order_proctime as PROCTIME() 
    ) WITH (
        'connector' = 'kafka',
        ...
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
    );
    my join statement is :
    Copy code
    SELECT 
      o.order_id,
      o.order_proctime,
      o.amount * r.rate AS amount,
      r.rate rate,
      r.rate_currency
    FROM orders AS o JOIN versioned_rates_v FOR SYSTEM_TIME AS OF o.order_proctime r
    on o.order_currency = r.rate_currency;
    However, I;m receiving the following error:
    Copy code
    TableException: Processing-time temporal join is not supported yet.
    ...
    According to StreamExecTemporalJoin.java :
    Copy code
    "the semantic of this implementation is problematic, because the join
                    // processing
                    // for left stream doesn't wait for the complete snapshot of temporal table, this
                    // may
                    // mislead users in production environment. See FLINK-19830 for more details.
    so I tried implementing this using the temporal table function:
    Copy code
    SCALA>val versioned_rates= stenv.from("versioned_rates_v")
    SCALA>val rates = versioned_rates.createTemporalTableFunction($"rate_proctime", $"rate_currency")
    Copy code
    SELECT  
      o.order_id,
      o.amount * r.rate AS amount,
      r.rate rate,
      r.rate_currency
    FROM orders o,  LATERAL TABLE (Rates(o.order_proctime)) r
    WHERE o.order_currency = r.rate_currency;
    However the issue now is different:
    Copy code
    UnsupportedOperationException
    I also tried this in scala :
    Copy code
    val  orders = stenv.from("orders")
    val  result = orders
        	.joinLateral(call("Rates", $("order_proctime")), $("order_currency").isEqual($("rate_currency")))
        	
    stenv.dropTemporaryView("join_results")
    stenv.createTemporaryView("join_results", result)
    
    SQL> select * from join_results
    is there anything I'm doing wrong, could anyone guid me on this, thanks
  • p

    Prasaanth Neelakandan

    10/06/2022, 10:00 PM
    hi folks, we have this metric named flink_lastcheckpointDuration in our monitoring charts, this shows that checkpoints take about 7 seconds to complete for us: this seems like a JM metric, however the flink UI shows us the max checkpoint duration was 2 mins 3 seconds. We cant find the 2 min spike in the other metric. Is that metric an aggregate on the JM across all TMs and operators ? OR We suspect the time taken for the 2 mins checkpoint is in the Async operation i.e file system write to S3. Does the JM metric only measure sync duration and that would explain the absence of the 2 min spike in the chart?
  • l

    Lydian Lee

    10/07/2022, 7:14 AM
    Hi, I am having trouble with savepoint and checkpoint to s3a. It seems like it always write 0-length data to s3. I am currently using flink 1.14.5. Google search mentioned about setting parallism to 1 and use rocksdb backend, but nothing really fix the issue for me. Wondering if anyone can help me with this issue. Thanks so much!!!
    Copy code
    2022-10-05 17:05:32,027 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)/{ParDo(Out
     2022-10-05 17:19:34,272 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering cancel-with-savepoint for job 1a68f74acf0ccf403693e2f228fa62a6.
     2022-10-05 17:19:34,285 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1664990374275 for job 1a68f74acf0ccf403693e2f228fa62a6.
     2022-10-05 17:19:34,287 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Making directory: <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
     2022-10-05 17:19:34,287 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - op_mkdirs += 1  ->  3
     2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - op_get_file_status += 1  ->  6
     2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Getting path status for <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>  (flink/expansi
     2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
     2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_metadata_requests += 1  ->  6
     2022-10-05 17:19:34,381 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_list_requests += 1  ->  6
     2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Not Found: <s3a://affirm-stage-chrono/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
     2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - op_get_file_status += 1  ->  7
     2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Getting path status for <s3a://test-bucket/flink/expansion-service-example/savepoints>  (flink/expansion-service-example/savepoints)
     2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints>
     2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_metadata_requests += 1  ->  7
     2022-10-05 17:19:34,431 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_list_requests += 1  ->  7
     2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Found path as directory (with /)
     2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Prefix count = 0; object count=1
     2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Summary: flink/expansion-service-example/savepoints/ 0
     2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - PUT 0 bytes to flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/
     2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - PUT start 0 bytes
     2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_put_requests += 1  ->  3
     2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - PUT completed success=true; 0 bytes
     2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_put_requests_completed += 1  ->  3
     2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Finished write to flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/, len 0
     2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - To delete unnecessary fake directory flink/expansion-service-example/savepoints/ for <s3a://affirm-stage-chrono/flink/expansion-service-exa>
     2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - To delete unnecessary fake directory flink/expansion-service-example/ for <s3a://affirm-stage-chrono/flink/expansion-service-example>
     2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - To delete unnecessary fake directory flink/ for <s3a://affirm-stage-chrono/flink>
     2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_delete_requests += 1  ->  3
     2022-10-05 17:19:34,617 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress              [] - PUT flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/: 0 bytes
     2022-10-05 17:24:34,285 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 1 of job 1a68f74acf0ccf403693e2f228fa62a6 expired before completing.
     2022-10-05 17:24:34,287 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 1a68f74acf0ccf403693e2f228fa62a6. (0 consecutive failed attempts so far)
     org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
         at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000) [flink-dist_2.12-1.14.5.jar:1.14.5]
         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
         at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_292]
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_292]
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
     2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - op_delete += 1  ->  1
     2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - op_get_file_status += 1  ->  8
     2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Getting path status for <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>  (flink/expansi
     2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
     2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_metadata_requests += 1  ->  8
     2022-10-05 17:24:34,360 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_list_requests += 1  ->  8
     2022-10-05 17:24:34,394 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Found path as directory (with /)
     2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Prefix count = 0; object count=1
    2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Summary: flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/ 0
     2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Delete path <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89> - recursive true
     2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - delete: Path is a directory: <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
     2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Deleting fake empty directory flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/
     2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_delete_requests += 1  ->  4
     2022-10-05 17:24:34,427 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints>
     2022-10-05 17:24:34,427 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_metadata_requests += 1  ->  9
     2022-10-05 17:24:34,444 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_list_requests += 1  ->  9
     2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Not Found: <s3a://test-bucket/flink/expansion-service-example/savepoints>
     2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Creating new fake directory at <s3a://test-bucket/flink/expansion-service-example/savepoints>
     2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - PUT 0 bytes to flink/expansion-service-example/savepoints/
     2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - PUT start 0 bytes
     2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_put_requests += 1  ->  4
     2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - PUT completed success=true; 0 bytes
     2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_put_requests_completed += 1  ->  4
     2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - Finished write to flink/expansion-service-example/savepoints/, len 0
     2022-10-05 17:24:34,548 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - To delete unnecessary fake directory flink/expansion-service-example/ for <s3a://affirm-stage-chrono/flink/expansion-service-example>
     2022-10-05 17:24:34,548 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem                       [] - To delete unnecessary fake directory flink/ for <s3a://affirm-stage-chrono/flink>
     2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics                [] - object_delete_requests += 1  ->  5
     2022-10-05 17:24:34,597 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress              [] - PUT flink/expansion-service-example/savepoints/: 0 bytes
    c
    m
    • 3
    • 3
  • e

    Emile Alberts

    10/07/2022, 10:46 AM
    Hi. I’m looking at the
    DataDogHttpReporter
    for metrics: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/metric_reporters/#datadog. How can I read the API key from environment variables or a Kubernetes secret?
    c
    • 2
    • 28
  • t

    Tiansu Yu

    10/07/2022, 1:48 PM
    Hi, is there a way to read parquet files in S3 in StreamExecutionEnvironment under Flink 1.13.x?
    ✅ 1
    c
    • 2
    • 14
  • a

    Abhinav sharma

    10/07/2022, 2:16 PM
    Is it possible for me to send the flink aggregated data to an API endpoint so that I can consume it in my NodeJS code?
    c
    • 2
    • 3
  • h

    Hygor Knust

    10/07/2022, 4:54 PM
    Hi people, Is there a way to fetch environment variables in sql-client? We have several source tables using the Kafka connector with Schema Registry, resulting in lots of configuration. Ideally we would like to have something like:
    Copy code
    CREATE TABLE source_table (
    	…columns
    )
    WITH (
    	'connector' = 'kafka',
      	'topic' = 'source_topic',
    	'properties.bootstrap.servers' = ${kafka_broker},
      	'properties.sasl.jaas.config' = ${kafka_sasl_config},
      	'key.avro-confluent.basic-auth.user-info' = ${sr_auth},
      	'value.avro-confluent.basic-auth.user-info' = ${sr_auth}
    )
    This way we will also not have sensitive information hard coded.
    r
    m
    • 3
    • 3
  • c

    Carlos Castro

    10/07/2022, 6:57 PM
    Hello. Does the logic to create a Kubernetes Deployment resource based on the FlinkDeployment CRD are in the flink-operator or in the flink project?
    m
    • 2
    • 11
  • m

    Mingliang Liu

    10/07/2022, 11:40 PM
    Hi, in the “State Processor API” doc, it mentions we can “adjust the maximum parallelism of operators”. I’m wondering if there is anyone doing that? I have one job with max parallelism 128 and would like to make it 1024. The job graph is pretty complex and I don’t want to read all state values into DataSet and create a new Savepoint to write all of them. Ideally there is a solution that loads existing savepoint, sets max parallelism, and writes a new savepoint.
    c
    • 2
    • 1
  • s

    Sumit Nekar

    10/08/2022, 4:15 AM
    Hello Everyone, We are evaluating the options of session mode and application mode with flink kubernetes operator. I understand the basic differences between both. Application mode seems to be having more job restart time as whole cluster restart is done where as session mode will just restart just job and hence lesser restart time.
    All our jobs are long time running jobs
    .I am thinking of having a session cluster but deploying only one job to get the benefits of both modes. How is your experience with a production setup?
    g
    • 2
    • 4
  • r

    Ravisangar Vijayan

    10/08/2022, 4:38 AM
    Hello Team, I have a situation like, data is not getting distributed equally among all producer topic partitions .. We are using default DefaultPartitioner with Default UidHash key .. Looks like logic uses stickyPartitionCache if we arent passing the key which is mostly picking up the partition 0 mostly. So partition 0 is getting loaded with huge data and other partitions has some amount of date. Please advice how to avoid this issue
    s
    • 2
    • 4
  • r

    Ravisangar Vijayan

    10/08/2022, 9:32 PM
    Hello team, One clarification , Would flink throw rebalance exceptions / slow process when Application goes beyond interval.ms in completing the task?
  • l

    Lee Senlen

    10/10/2022, 1:40 AM
    Does lateral-Join(lateral table function) have plans to support asynchronous requests?
  • b

    Bhupendra Yadav

    10/10/2022, 10:00 AM
    Hi everyone. Hope you all are doing well. tldr: Memory leak. Metaspace/heap memory, threads count etc increases as more jobs are run in Flink Session Cluster. We are using Flink's Async data stream to process some data & using GCP flink operator to run flinkcluster in session mode on Kubernetes. We are noticing that as we submit more and more jobs, the heap & metaspace memory usage keeps increasing. Same for thread count. And after certain point, when memory usage of pod goes above limit, it's killed by Kubernetes. Has anyone faced this earlier? What are possible fixes?
    r
    • 2
    • 4
  • t

    Tiansu Yu

    10/10/2022, 10:49 AM
    I have some further questions with regard to this doc page: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins 1. The link to
    the same configuration keys as the Presto file system
    in this page is broken. Also in general, how do i pass the presto JDBC connection parameters (url, port number, user, credentials, and finally the fully qualified table name string) to the presto connector? Or one has to save the parameters in a configuration file (and load it somehow) and replace the s3 address with the s3p prefix? 2. I do not understand what it means
    It is the only S3 file system with support for the FileSystem.
    ?
    ✅ 1
    c
    • 2
    • 6
  • a

    Angelo Kastroulis

    10/10/2022, 2:19 PM
    Hi Everyone! I’m trying to get the bucket-key or number of buckets to work with flink sql, but struggling a bit. Here’s the gist of what I have:
    Copy code
    CREATE TABLE MyTable (
      id INT,
      stopTime TIMESTAMP(0),
      startTime TIMESTAMP(0),
      fk INT) WITH (
        <set a number of BUCKETs AND/OR BUCKETKEY by fk HERE>,
        <FILESYSTEM SOURCE CONNECTOR HERE>
      )
    I’m getting an error the the filesystem connector doesn’t understand the options for “bucket” or “bucket-key” which leads me to believe that you can’t use them with a connector, but there is no documentation on any of the expected usage other than mentioning buckets in passing in the docs. How would buckets normally be used?
    m
    j
    • 3
    • 20
  • t

    Tiansu Yu

    10/10/2022, 3:59 PM
    Encountered this problem while trying to read a parquet file:
    List field [optional binary attribute_key_id (STRING)] in List [attribute_values] has to be required.
    Somehow flink-parquet tries to ask the field embeded in a list to be type required, through the entire group in the parquet schema is optional. Details in 🧵
    m
    • 2
    • 6
  • r

    Ravisangar Vijayan

    10/10/2022, 4:01 PM
    Hello team ,. anybody come across this issue
    org.apache.flink.runtime.execution.CancelTaskException: Buffer pool has already been destroyed.
    ?
  • t

    Tawfik Yasser

    10/10/2022, 6:11 PM
    Hello team I'm trying to deep dive in watermarks component in flink, such as generating watermarks I just need some insights to help me understand the internal implementation for that component, for example any ideas about how to read the code and go through it. Thank you 🙏
    d
    • 2
    • 2
  • k

    Kwangin Jung

    10/11/2022, 1:20 AM
    Hello team I have some question about lifecycle of window. Currently I'm making some trigger function for window
    Copy code
    public class MyTriggerFunction extends Trigger<String, TimeWindow> {
    
    	public MyTriggerFunction() {
            this.aggregatingStateDescriptor = new AggregatingStateDescriptor<>(
                    "my-trigger",
                    new MyAggregatorFunction(),
                    new TupleSerializer<>(
                        (Class<Tuple2<Integer, Integer>>) (Class) Tuple2.class,
                        new TypeSerializer[] {new IntSerializer(), new IntSerializer()}
                );
            );
        }
    
        public String onElement(...) {
        	if (...) {
        		return TriggerResult.FIRE_AND_PURGE;
        	}
        }
    }
    
    public class MyAggregatorFunction implements AggregateFunction<String, Tuple2<Integer, Integer>, Boolean> {
    	// ...
    }
    Until I've tested, when
    TriggerResult.FIRE_AND_PURGE
    is being called, it emits the result of current window, but data in
    this.aggregatingStateDescriptor
    still remains. Does somebody knows when this data will be cleared? I've see following in document https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#fire-and-purge
    Copy code
    Purging will simply remove the contents of the window and will leave any potential meta-information about the window and any trigger state intact.
    but I don't think this will be remained forever, because millions of window can be created while processing, and remaining all these data looks bad for memory.
1...222324...98Latest