# troubleshooting
  • Felix Angell (04/26/2023, 10:54 AM)
    Hey there, is there any rule of thumb for how many seconds I should set my max_drift + watermark_interval to? The docs specify 20 seconds and 1 second respectively. We have events that can be as late as 10-30 minutes; would this mean we are throttling sources for that long? What is the impact of having a super large drift setting?
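    For reference, those two knobs correspond to the watermark alignment parameters on WatermarkStrategy, and the 20 s / 1 s values are just the documentation's example defaults. As far as I understand it, the drift bounds how far one source's watermark may run ahead of the slowest source in the same alignment group before it gets paused, so it is about skew between sources; events that arrive 10-30 minutes late are handled by the out-of-orderness bound (or allowed lateness), not by the drift itself. A rough sketch, where the event type, source and env are placeholders:
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // Sketch only: MyEvent, kafkaSource and env are assumed to exist elsewhere.
    WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(30)) // covers 10-30 min late events
            .withWatermarkAlignment(
                "alignment-group",       // sources sharing this group name are aligned together
                Duration.ofSeconds(20),  // max allowed watermark drift between sources before pausing
                Duration.ofSeconds(1));  // how often alignment is re-evaluated

    // DataStream<MyEvent> events = env.fromSource(kafkaSource, strategy, "events");
    A very large drift should mostly mean that the faster sources get throttled later, at the cost of buffering more out-of-order data downstream before the watermark advances.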
  • Gintaras Matulas (04/26/2023, 11:01 AM)
    Hey. What is the best way to limit the data rate? We need to buffer data at the start of the stream for some time. To limit the state growth rate we added a Thread.sleep, but this causes issues with checkpoint times.
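    Not a recommendation, just a sketch of the usual alternative to a raw Thread.sleep: a Guava RateLimiter created per subtask in open(). The same caveat applies, though; anything that blocks the operator thread (including acquire()) still delays checkpoint barriers, so it smooths the rate but does not remove the checkpoint-time concern. It assumes Guava is on the classpath and a hypothetical per-subtask budget:
    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Throttles each subtask to a fixed number of records per second.
    public class ThrottlingMap<T> extends RichMapFunction<T, T> {
        private final double recordsPerSecond;      // per-subtask budget, e.g. 100.0
        private transient RateLimiter rateLimiter;  // not serializable, so create it in open()

        public ThrottlingMap(double recordsPerSecond) {
            this.recordsPerSecond = recordsPerSecond;
        }

        @Override
        public void open(Configuration parameters) {
            rateLimiter = RateLimiter.create(recordsPerSecond);
        }

        @Override
        public T map(T value) {
            rateLimiter.acquire(); // blocks the task thread; checkpoint barriers queue up behind it
            return value;
        }
    }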
  • Юрий Смирнов (04/26/2023, 11:30 AM)
    Hi All! I have a problem with PyFlink (1.17.0) -> NumPy (1.21.6) on an M1 processor while running any of the PyFlink examples:
    Original error was: dlopen(/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so, 0x0002): tried: '/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64')), '/System/Volumes/Preboot/Cryptexes/OS/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (no such file), '/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64'))
    I've tried using conda, reinstalling numpy and pyflink, setting "-arch arm64" in pip install, all Python versions 3.7-3.10, and Rosetta. Nothing helps. If anyone knows how to solve this issue, please help.
  • Alexis Leclerc (04/26/2023, 2:02 PM)
    Hello! 👋 Is there a known issue with the JDBC connector causing it to be late with the 1.17.0 release? The official docs for the connector refer to a version (3.0.0-1.17) that does not seem to exist (see https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc). The same applies to version 1.17.0 of the connector, which is usually the format we use.
  • David Wisecup (04/26/2023, 3:11 PM)
    Can anyone recommend exemplar Flink projects that show testing, project layout, and other useful, practical patterns? Java and SQL would probably be the languages.
  • Aeden Jameson (04/26/2023, 6:30 PM)
    While debugging in IntelliJ I'd like to override some of the LocalEnvironment settings, e.g. parallelism, from a config file or via parameters and not in code. Despite setting FLINK_CONF_DIR and providing parallelism.default: 1 in flink-conf.yaml, the default parallelism (the number of cores) gets used. How can I achieve the above?
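    One possible workaround (a sketch, not the only way): in the IDE, getExecutionEnvironment() spins up a local mini cluster that, as far as I can tell, does not read FLINK_CONF_DIR, but it does accept a Configuration, which can itself be filled from program arguments so the value still stays out of the job logic:
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.CoreOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class DebugMain {
        public static void main(String[] args) throws Exception {
            // e.g. IntelliJ run configuration program arguments: --parallelism 1
            ParameterTool params = ParameterTool.fromArgs(args);

            Configuration conf = new Configuration();
            conf.set(CoreOptions.DEFAULT_PARALLELISM, params.getInt("parallelism", 1));

            // Passing the Configuration here applies it to the local (IDE) environment as well.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

            env.fromElements(1, 2, 3).print();
            env.execute("debug-job");
        }
    }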
  • Tan Kim (04/27/2023, 1:05 AM)
    Hello! I'm testing the Autoscaler in Flink Kubernetes Operator v1.4.0 these days and there is an issue. When a job is scaled down, an error occurs (detailed error in thread). If anyone has resolved this, please share, thank you! Flink: 1.17.0, FKO: 1.4.0, StateBackend: RocksDB (GIC & UC enabled)
  • Amenreet Singh Sodhi (04/27/2023, 10:03 AM)
    Hi Team, I am deploying my job in application mode on Flink 1.16.0, but have been constantly receiving this error for a long time:
    2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Node 1 disconnected.
    2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Cancelled in-flight FETCH request with correlation id 8759 due to node 1 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms)
    2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1:
    org.apache.kafka.common.errors.DisconnectException: null
    2023-04-10 13:48:40,317 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-122] Node 19 disconnected.
    2023-04-10 13:51:20,059 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Node 16 disconnected.
    2023-04-10 13:51:20,060 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Cancelled in-flight FETCH request with correlation id 10312 due to node 16 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms)
    2023-04-10 13:51:20,060 INFO  org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 16:
    org.apache.kafka.common.errors.DisconnectException: null
    2023-04-10 13:51:48,469 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 4
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
    Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
    2023-04-10 13:51:48,829 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 4
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
    Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
    Also, I found this ticket with the above issue: https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel It seems the issue is still not resolved. If anyone knows how to solve this, please help. Thanks!
  • Erlend F (04/27/2023, 11:05 AM)
    Hi all! I'm entirely new to Flink coming from ksqlDB/Kstreams and have some questions on how I should attack a problem. I'd really appreciate any input here, putting full text in thread.
  • Rashmin Patel (04/27/2023, 12:00 PM)
    Hi all, is there a way to configure the number of cores when running a Flink job in BATCH execution mode? In STREAMING mode, one slot takes up one core and not more than that. Is it the same in batch mode as well?
  • chunilal kukreja (04/27/2023, 1:06 PM)
    Hi Team, the below mentioned issue is marked as resolved in version 1.16.0: https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel But we are still facing it with version 1.16.0. Can someone please tell me what fix was made for this ticket? I can't find any solution mentioned in the ticket.
  • Mehul Batra (04/27/2023, 2:51 PM)
    Hi All, I am trying to sink data to OpenSearch/Elasticsearch via the Flink connector in upsert mode, but it is failing due to a version conflict during the update of the document. Is there any way I could tweak it to proceed despite the conflict?
    detect_noop[true]} of bulk request failed with status 409.
    	at org.apache.flink.connector.opensearch.sink.OpensearchWriter.extractFailures(OpensearchWriter.java:346)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    	at 
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.flink.util.FlinkRuntimeException: Single action update
    [OpenSearch exception [type=version_conflict_engine_exception, reason=[69404946617821]: version conflict, document already exists (current version [1])]]
  • Elizaveta Batanina (04/27/2023, 3:19 PM)
    Hi! I have a problem running SQL queries using the Python Table API (1.16.1). When I select the watermark field, I get an error:
    t_env.execute_sql("""
    CREATE VIEW response_time AS (
    SELECT KEY_COUNTRY_CODE, GLOBAL_ENTITY_ID, VENDOR_CODE, TM,
    TIMESTAMPDIFF(SECOND,`ORDER_PARSED`.SENT_TO_VENDOR_AT, `ORDER_PARSED`.ACCEPTED_BY_VENDOR_AT) as RESPONSE_TIME
    from delivery_events_liza)
    """)
    An error occurred while calling o498.executeSql.
    : java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
    validated type:
    RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" KEY_COUNTRY_CODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" GLOBAL_ENTITY_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" VENDOR_CODE, TIMESTAMP(3) *ROWTIME* TM, INTEGER RESPONSE_TIME) NOT NULL
    converted type:
    RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" KEY_COUNTRY_CODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" GLOBAL_ENTITY_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" VENDOR_CODE, TIMESTAMP(3) TM, INTEGER RESPONSE_TIME) NOT NULL
    rel:
    LogicalProject(KEY_COUNTRY_CODE=[$2], GLOBAL_ENTITY_ID=[$3], VENDOR_CODE=[JSON_VALUE($1, _UTF-16LE'$.VENDOR_CODE')], TM=[CAST(parse_bq_datetime_udf($0)):TIMESTAMP(3)], RESPONSE_TIME=[CAST(/INT(Reinterpret(-(parse_order($1).ACCEPTED_BY_VENDOR_AT, parse_order($1).SENT_TO_VENDOR_AT)), 1000)):INTEGER])
      LogicalTableScan(table=[[default_catalog, default_database, delivery_events_liza]])
    How can I fix this problem? If I just remove the TM field, which is the watermark, the query finishes without an error. Thanks for the help!
  • Krzysztof Chmielewski (04/27/2023, 4:06 PM)
    Hi, I'm running a very simple Flink job on Flink 1.17 and I see this in the logs:
    Kryo serializer scala extensions are not available
    I'm not using Scala in my sandbox job, pure Java. What should I do to turn this off? The same thing was reported by another user on April 21st but unfortunately got no response: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1682093159771169
  • Raghunadh Nittala (04/27/2023, 4:12 PM)
    Hello All, I’m trying to sink data to S3 in parquet format, creating the table using the below DDL:
    CREATE TABLE sink_table_s3 (
      event_id STRING NOT NULL,
      event_type STRING NOT NULL,
      event_name STRING NOT NULL,
      `date` STRING,
      results_count BIGINT
    ) PARTITIONED BY (event_id, event_type, `date`) WITH (
      'connector' = 'filesystem',
      'path' = '<path>',
      'format' = 'parquet'
    );
    I’m using the below SQL query to sink data to S3 in parquet format:
    INSERT INTO sink_table_s3
    SELECT event_id, event_type, event_name, DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '1' HOUR), 'yyyy-MM-dd') AS record_date, COUNT(*) results_count
    FROM source_table
    GROUP BY event_id, event_type, event_name, TUMBLE(proc_time, INTERVAL '1' HOUR);
    I am partitioning the table on the event_id, event_type and date columns. I observed the parquet files are getting saved per event_id and event_type, but the date is wrong: the data being processed today is being saved to the 2023-04-26 folder. As I am using proc_time to derive the date, I expect data to be saved to the 2023-04-27 folder. Can someone help me identify the issue?
  • Mali (04/27/2023, 5:09 PM)
    Hello everyone, I have a simple question about Flink and Iceberg; I am not sure whether it is strictly about Flink or Iceberg (probably Iceberg). I have a table in Iceberg format, but the table doesn't have any partitions. I need to add a partition on my date column. Can I do that, or should I drop and recreate the table? I read the documentation but didn't understand it clearly. I was able to do this with Spark-Iceberg; I need something like "ADD PARTITION FIELD".
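    If the Flink SQL dialect doesn't expose ADD PARTITION FIELD, one option is to evolve the partition spec through Iceberg's Java API without dropping the table; a sketch assuming a Hadoop catalog, a hypothetical warehouse path, and a date column named event_date (existing data files keep the old unpartitioned layout, only new writes use the new spec):
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    public final class AddPartitionField {
        public static void main(String[] args) {
            // Hypothetical warehouse location and table name; adjust to your catalog setup.
            HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "s3://my-warehouse/iceberg");
            Table table = catalog.loadTable(TableIdentifier.of("db", "my_table"));

            // Partition spec evolution: add a day() transform on the date column.
            table.updateSpec()
                 .addField(Expressions.day("event_date"))
                 .commit();
        }
    }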
  • Abolfazl Ghahremani (04/28/2023, 5:10 AM)
    Is there anyone here who works with global windowing in Flink?
  • Suriya Krishna Mariappan (04/28/2023, 5:52 AM)
    Hey everyone, We use flink runner (v1.13) with beam pipelines (with kafka topic as input source) and use Flink kubernetes operator for deployment . We seem to have encountered a lost message due to a message processing failure causing a restart in the pipeline. We have enabled flink checkpoints with S3 storage, we have also set the beam setting to commit kafka offset on finalize . We enabled unaligned checkpointing(few weeks ago) after we saw long delays in checkpointing. Yesterday we saw an exception in the pipeline, which caused it to restart. And flink restored the state from the last checkpoint but to our surprise , the failed message was not retried again but the offset had moved to the next position. Our expectation was that the offset would not move until the message has been processed. This is the first time we are seeing this issue, where we had lost a message because the offsets moved. When can this type of data loss happen ? Is it related to Unaligned checkpointing? Any help would be greatly appreciated. cc @Arun
  • Dheeraj Panangat (04/28/2023, 11:04 AM)
    Hi All, I'm seeing an error with the Table API when selecting from a table. As per the system functions docs (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/systemfunctions/), currentDate() should be equivalent to CURRENT_DATE, but I see it is transformed to TIMESTAMP_LTZ(3). Running into the following error because of that:
    org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'default_catalog.default_database.table_name' do not match.
    Cause: Incompatible types for sink column 'summary_date' at position 2.
    
    Query schema: [*********, summary_date: TIMESTAMP_LTZ(3) NOT NULL, **********]
    Sink schema:  [*********, summary_date: DATE, *******************************]
    Can someone please help look at this? Thanks!
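    If the goal is just to make the query schema line up with the DATE sink column, one possible workaround (a sketch with hypothetical column names, not a confirmed fix for this exact case) is an explicit cast in the select:
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.currentDate;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Table;

    // Sketch: sourceTable, some_key and some_value are placeholders for your actual columns.
    Table result = sourceTable.select(
            $("some_key"),
            currentDate().cast(DataTypes.DATE()).as("summary_date"),  // force DATE instead of TIMESTAMP_LTZ(3)
            $("some_value"));
    result.executeInsert("table_name");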
  • Tan Kim (04/28/2023, 2:59 PM)
    Hello, I'm back again. I'd like to ask about scaling metrics this time. To compute the scaling target parallelism, TargetProcessingCapacity needs to be calculated, but it doesn't seem to be exposed as a metric via the Prometheus reporter from the Kubernetes operator. Is there anything I'm missing here?
  • Ashwin Gupta (04/28/2023, 4:28 PM)
    Hi, I am trying to upgrade Scala from 2.11 to 2.12 and am currently on Flink 1.13.5. I am facing an NPE when Kryo is trying to serialize a PriorityQueue. Stack trace in thread.
  • Mayur Sharma (04/28/2023, 5:31 PM)
    New to Flink and looking for some guidance on aggregating stream data, preferably using SQL, over a 60 day window:
    1. Input: a stream of events where each event has user_id, price, event_timestamp
    2. Output: every 1 minute, aggregate min(price) and max(price) by user_id for the past 60 days
    I was hoping to use sliding/hop windows of size 60 days that slide every 1 minute, but it seems the first aggregation won't be available until the first 60 days are completed. Is OVER the only windowing option for this case, or am I missing something about sliding windows?
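    An OVER window is indeed the usual answer here, since a 60-day HOP window only emits after the first full window. A sketch using a hypothetical datagen table (names and types are assumptions); note it emits an updated min/max per incoming row rather than strictly once per minute:
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class RollingMinMax {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical source; replace with your Kafka/other connector.
            tEnv.executeSql(
                "CREATE TABLE events (" +
                "  user_id STRING," +
                "  price DOUBLE," +
                "  event_timestamp TIMESTAMP(3)," +
                "  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'datagen')");

            // min/max of price per user over the trailing 60 days, updated as rows arrive.
            tEnv.executeSql(
                "SELECT user_id, event_timestamp," +
                "       MIN(price) OVER w AS min_price_60d," +
                "       MAX(price) OVER w AS max_price_60d " +
                "FROM events " +
                "WINDOW w AS (" +
                "  PARTITION BY user_id" +
                "  ORDER BY event_timestamp" +
                "  RANGE BETWEEN INTERVAL '60' DAY PRECEDING AND CURRENT ROW)")
                .print();
        }
    }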
  • Jirawech Siwawut (04/29/2023, 3:16 AM)
    Hi all. I would like to know how each Flink operator decides which element is processed by which subtask. I came across this code and don't quite understand it (link). My main question is how Flink broadcasts a compaction unit to each subtask. What happens if there is an odd number of compaction units and an even number of subtasks? If there is a document, I would appreciate it.
    // broadcast
            harness0.processElement(new CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0);
            harness0.processElement(new CompactionUnit(1, "p0", Collections.singletonList(f3)), 0);
            harness0.processElement(new CompactionUnit(2, "p0", Arrays.asList(f2, f5)), 0);
            harness0.processElement(new CompactionUnit(3, "p0", Collections.singletonList(f6)), 0);
    
            harness0.processElement(new EndCompaction(1), 0);
    
            // check compacted file generated
            assertThat(fs.exists(new Path(folder, "compacted-f0"))).isTrue();
            assertThat(fs.exists(new Path(folder, "compacted-f2"))).isTrue();
    
            // f3 and f6 are in the charge of another task
            assertThat(fs.exists(new Path(folder, "compacted-f3"))).isFalse();
            assertThat(fs.exists(new Path(folder, "compacted-f6"))).isFalse();
  • CG (04/29/2023, 1:11 PM)
    Hi. I have a Flink job which ran out of heap space and whose task managers were killed. I set the job state to suspended and did a Helm upgrade; I can see the Apache Flink operator suspending the job in the log. I then increased the task manager memory by changing the Helm values and ran a Helm upgrade with the job state set to running. I see that the operator is upgrading the spec, but when the task manager was deployed in Kubernetes, its memory was not set to the new size. Any idea what's preventing the new value from being applied?
  • Leandro Garcia (04/30/2023, 2:15 AM)
    Hi team, I'm completely new to Flink and would really appreciate some help with how to solve the following use case. In stock trading, market makers are institutions hired by the stock exchange to provide liquidity for some stocks, so they must act on a daily basis, keeping some stock tickers in the order book while respecting activity parameters (minimum amount, maximum spread between the buy and sell price, and percentage of activity in the trading session, which usually runs from 09:00 to 18:00). I need to create a query to monitor, in near real time or over a small time window, those institutions that are not following one of these three parameters. For the percentage-of-activity parameter I also need to report how many times/at which hours an institution broke the rule. Any help with mockups of queries would be really welcome and would help me a lot. Thanks and regards.
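    Purely as a mockup (the schema, table names and the 0.50 spread limit are all assumptions, not from your setup), the max-spread rule could look like a one-minute tumbling aggregation that flags maker/ticker pairs whose best spread stayed above the limit for the whole minute:
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class SpreadRuleMockup {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical quotes stream: one row per order-book update by a market maker.
            tEnv.executeSql(
                "CREATE TABLE quotes (" +
                "  maker_id STRING," +
                "  ticker STRING," +
                "  bid_price DOUBLE," +
                "  ask_price DOUBLE," +
                "  quote_time TIMESTAMP(3)," +
                "  WATERMARK FOR quote_time AS quote_time - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'datagen')");

            // Per minute, flag maker/ticker pairs whose best spread exceeded the assumed 0.50 limit.
            tEnv.executeSql(
                "SELECT maker_id, ticker," +
                "       TUMBLE_END(quote_time, INTERVAL '1' MINUTE) AS minute_end," +
                "       MIN(ask_price - bid_price) AS best_spread " +
                "FROM quotes " +
                "GROUP BY maker_id, ticker, TUMBLE(quote_time, INTERVAL '1' MINUTE) " +
                "HAVING MIN(ask_price - bid_price) > 0.50")
                .print();
        }
    }
    The other two rules (minimum amount, percentage of activity per session) would follow the same shape, with the violating minutes counted per maker and hour in a second aggregation.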
  • Srivatsav Gorti (04/30/2023, 1:12 PM)
    Hey fellow Flink devs, I have a use case with the Flink Table API where there is an incoming stream of (flattened) JSONObjects and we are populating all values of each JSONObject into a single Row of a Flink table. The problem is the column names, i.e. field names. Here's a sample that works and produces the rows as output, but I am not getting column names in the output because the schema is dynamic (I only get f0 as the column name). I want the column names too (the keys of the JSONObject), as I want to use them to join with another table. Please help me out, I've been stuck on this for a while! Appreciate the support. Flink version: 1.14.3
    public class LookupFlinkApp implements Serializable {
      private static final String CONFIG_FILE = "/configuration.properties";
      private static ParameterTool applicationProperties;
      private static final StreamExecutionEnvironment STREAM_ENVIRONMENT =
          StreamExecutionEnvironment.getExecutionEnvironment();

      private static final Logger logger = LoggerFactory.getLogger(LookupFlinkApp.class);
      static EnvironmentSettings settings = EnvironmentSettings
              .newInstance()
              .inStreamingMode()
              .build();
      static StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(STREAM_ENVIRONMENT, settings);

      public static void main(String[] args) throws Exception {
        ParameterTool parameter;
        try (InputStream in = LookupFlinkApp.class.getResourceAsStream(CONFIG_FILE)) {
          parameter = ParameterTool.fromPropertiesFile(in);
        }
        try (InputStream in =
                 LookupFlinkApp.class.getResourceAsStream(AppConstants.APPLICATION_PROPERTIES_YAML_FILE)) {
          applicationProperties = ParameterTool.fromPropertiesFile(in);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        Properties properties = parameter.getProperties();
        Properties appProperties = applicationProperties.getProperties();

        KafkaSource<String> kafkaSource =
            KafkaSource.<String>builder()
                .setBootstrapServers(applicationProperties.get("input.bootstrap.servers"))
                .setTopics(applicationProperties.get("kafka.input.topic"))
                .setGroupId(applicationProperties.get("input.group.id"))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        final SingleOutputStreamOperator<String> kafkaData =
            STREAM_ENVIRONMENT
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")
                .setParallelism(1);

        SingleOutputStreamOperator<FlowEntity> dataFromSource =
            kafkaData.process(new StringToFlowEntity()).setParallelism(1);

        // Stream of JSONObjects
        DataStream<JSONObject> content = dataFromSource.map(new CustomMapFunction(properties));

        // Convert the content datastream into rows
        DataStream<Row> rows = content.flatMap(new BuildTableRowsFunc());

        Table postingsTable = streamTableEnvironment.fromDataStream(rows);
        postingsTable.printSchema();

        Table result = postingsTable.select($("*"));
        streamTableEnvironment.toDataStream(result, Row.class).map(new PrintFlinkTable(result));

        STREAM_ENVIRONMENT.execute();
      }

      public static class BuildTableRowsFunc implements FlatMapFunction<JSONObject, Row> {
        @Override
        public void flatMap(JSONObject jsonObject, Collector<Row> collector) throws Exception {
          Set<String> keySet = jsonObject.keySet();
          Row row = new Row(keySet.size());
          int i = 0;
          for (String key : keySet) {
            row.setField(i++, jsonObject.get(key));
          }
          collector.collect(row);
        }
      }

      public static class CustomMapFunction extends RichMapFunction<JSONObject, JSONObject> {
        private final Properties properties;

        public CustomMapFunction(Properties properties) {
          this.properties = properties;
        }

        @Override
        public JSONObject map(JSONObject jsonObject) throws Exception {
          jsonObject.put("tagId", "a1b2c3d4");
          return jsonObject;
        }
      }
    }
    Sample output:
    | f0 |
    | -- |
    | +I[142, data.posting, dateTime, 2015-11-03T09:34:45, 201, 2023-04-30T12:48:15Z, 2016-11-30T20:59:59 application/json, Edm.Int64,_agency, a1b2c3d4e5]
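    One thing that may explain the f0 column: fromDataStream derives the table schema from the stream's TypeInformation, and a flatMap that emits plain Row objects only exposes a single generic field. A sketch of a possible workaround, assuming the field names and types can be known up front (e.g. from configuration or a sampled message), is to declare them via Types.ROW_NAMED on the flatMap so the names carry through to the Table and can be used in joins:
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    // Assumed, illustrative schema; in practice these would come from config or the first message.
    String[] fieldNames = {"dateTime", "status", "tagId"};
    TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT, Types.STRING};

    DataStream<Row> rows = content
        .flatMap(new BuildTableRowsFunc())
        // The flatMap must set the fields in this exact order (or build rows with Row.withNames()
        // and set them by key) so that positions and declared names line up.
        .returns(Types.ROW_NAMED(fieldNames, fieldTypes));

    Table postingsTable = streamTableEnvironment.fromDataStream(rows);
    postingsTable.printSchema();  // should now show dateTime, status, tagId instead of f0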
  • Abolfazl Ghahremani (05/01/2023, 11:02 AM)
    tokhmiha
  • Олег Спица (05/01/2023, 8:55 PM)
    Hello everyone! I need some help. I'm trying to upgrade a Flink job from Flink 1.14.3 to 1.16.0 and see unexpected behaviour from FileSink. I have a unit test which checks that the job stored an Avro event into a file. To trigger flushing of data I call the MiniClusterClient's triggerSavepoint() method, but I see that after the savepoint is created, the file in the filesystem is still hidden and in-progress, like build/junit15822280378836783130/archive-output/2_shards/0/2023/04/26/17/.part-0-0.inprogress.8a05ee6b-c969-4e15-88ba-9a3b78f46035, while I expect the file to be permanent (closed, not hidden). I made some attempts to figure out what is going on and how to fix it, and found one interesting thing: if I increase the timeout in the test so that the job triggers a checkpoint (not a manually triggered savepoint, but the checkpoint scheduled every 1 min), then after that checkpoint everything is as expected: the file is renamed and stored.
    23:41:38.838 [Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 1 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1682973698824 for job e63ff74fc4bc34a54c3f1f05855d149c.
    23:41:38.948 [jobmanager-io-thread-3] INFO  o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 1 for job e63ff74fc4bc34a54c3f1f05855d149c (15345 bytes, checkpointDuration=121 ms, finalizationTime=3 ms).
    23:42:38.955 [Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1682973758954 for job e63ff74fc4bc34a54c3f1f05855d149c.
    23:42:38.991 [jobmanager-io-thread-5] INFO  o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 2 for job e63ff74fc4bc34a54c3f1f05855d149c (15388 bytes, checkpointDuration=33 ms, finalizationTime=4 ms).
    23:42:39.528 [Test worker] INFO  c.s.d.t.h.FlinkHarness$Companion - Cancelling the job e63ff74fc4bc34a54c3f1f05855d149c...
    Does anyone know why this could be? What changed, and how should I use it to fix my tests? I hope that in production it will work with checkpoints enabled, but what about savepoints? Thanks a lot for any help and suggestions.
  • Chirag Dewan (05/02/2023, 5:21 AM)
    Hi, I understand the FileSystem DataStream FileSource remembers all the processed files in state, forever. This causes the state to grow unbounded, making FileSource impractical to use in a stateful application. Are there any solutions around this?