# troubleshooting
  • f

    Felix Angell

    12/07/2022, 2:25 PM
    Has anyone run into issues with a PyFlink KDA app not being able to resolve modules when upgrading from 1.13.2 to 1.15.2? 😬
    s
    d
    • 3
    • 13
  • m

    Matt Weiss

    12/07/2022, 2:33 PM
    From what I've been reading in a couple of different places, we want the parallelism of our Kafka consumer in Flink set equal to the number of partitions we have. So if we had 5 partitions in Kafka and did
    kafkaSource.setParallelism(5)
    , Flink would make 5 consumer instances, each consuming from a partition. There's got to be some tradeoff or limit to that, I'd think. What's better: 5 task containers each with a parallelism of 1, or 1 container with a parallelism of 5?
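For reference, a minimal sketch of the setup being discussed, assuming the Flink 1.15 KafkaSource API; the topic, bootstrap servers, and group id are placeholders. Whether those 5 source subtasks end up in one TaskManager or five is governed by the slot configuration (taskmanager.numberOfTaskSlots) rather than by the source itself.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")     // placeholder
                .setTopics("my-topic")                      // assume this topic has 5 partitions
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(5)                          // one source subtask per partition
                .print();

        env.execute("kafka-parallelism-sketch");
    }
}
```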
  • m

    Morey Straus

    12/07/2022, 4:23 PM
    Hi folks - I've searched and haven't been able to find any documentation on the support lifecycle. When does 1.14 go EOL?
    m
    • 2
    • 1
  • d

    ding bei

    12/07/2022, 4:48 PM
    Hey guys, I am using the FileSystem connector to write files into S3. Given that the checkpoint interval was set to 10s and traffic is low, the files were quite small, so I am trying to use the compaction feature. However, when I tried to upgrade my app, it reported a checkpoint error, and then I found something contradictory in the documentation. I quote (pic 1):
    Copy code
    Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to "Finished" 
    AND
    As part-81fc4980-a6af-41c8-9937-9939408a734b-0 is now pending completion, after the next successful checkpoint, it is finalized
    which seems to contradict (pic 2):
    Copy code
    Important Note 3: Flink and the FileSink never overwrites committed data. Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file which was committed by subsequent successful checkpoints, the FileSink will refuse to resume and will throw an exception as it cannot locate the in-progress file.
    Since the first quote says only pending files get committed and then become finished files, what does the second quote mean when it says an in-progress file was committed?
    👀 1
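For context, a minimal sketch of what enabling compaction on a FileSink looks like, assuming the Flink 1.15+ API; the S3 path, encoder, and trigger interval are placeholders, not the poster's actual job.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

public class CompactingFileSinkSketch {
    public static FileSink<String> build() {
        return FileSink.forRowFormat(
                        new Path("s3://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                // Merge small pending files before they are committed; here compaction
                // is triggered every 5 checkpoints (~50s at a 10s checkpoint interval).
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .enableCompactionOnCheckpoint(5)
                                .build(),
                        new ConcatFileCompactor())
                .build();
    }
}
```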
  • a

    Adisun Wheelock

    12/07/2022, 7:02 PM
    Hi Flink experts: I have a Beam pipeline using the Flink runner that had an OOM failure in one of the task managers, and on restart it did not read from its checkpoint, which resulted in pipeline data loss. I'm using Beam's [SnowflakeIO](https://beam.apache.org/documentation/io/built-in/snowflake/) and I'm using a
    checkpointingMode
    of
    EXACTLY_ONCE
    (the default). Does a restart caused by OOM carry different meaning in terms of failure recovery than other reasons for restart? There are some internal discussions at my company going back and forth where some believe OOM will result in different restart semantics and I wanted to come here for the source of truth.
    👍 1
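For reference, a sketch of the Flink-side checkpoint and restart knobs being discussed (method names as of Flink 1.15; values are placeholders). Beam's Flink runner exposes equivalents such as checkpointingInterval through its pipeline options, so this is only meant to pin down which settings are involved, not to answer the OOM question.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRecoverySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s with exactly-once semantics (placeholder interval).
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Keep checkpoints around even after cancellation so a fresh submission can restore.
        env.getCheckpointConfig()
           .setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // The restart strategy applies to task failures regardless of their cause.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
    }
}
```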
  • d

    Dan Hill

    12/07/2022, 7:10 PM
    Hi. Does anyone have an open-source, approximate-median solution working in Flink?
    👍 1
    • 1
    • 1
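Not a library pointer, but for illustration: a naive reservoir-sampling AggregateFunction sketch that approximates a median per key/window by sorting a bounded sample. A proper quantile sketch (t-digest, KLL) would be the more principled choice; everything below is hand-rolled, and the merge in particular is only roughly correct.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import org.apache.flink.api.common.functions.AggregateFunction;

public class ApproxMedian implements AggregateFunction<Double, ApproxMedian.Reservoir, Double> {

    /** Bounded random sample of everything seen so far. */
    public static class Reservoir implements Serializable {
        final ArrayList<Double> sample = new ArrayList<>();
        long seen = 0;
        final Random rnd = new Random();
    }

    private static final int MAX_SAMPLE = 1024;

    @Override
    public Reservoir createAccumulator() {
        return new Reservoir();
    }

    @Override
    public Reservoir add(Double value, Reservoir acc) {
        acc.seen++;
        if (acc.sample.size() < MAX_SAMPLE) {
            acc.sample.add(value);
        } else {
            // Classic reservoir sampling: replace a random slot with decreasing probability.
            long idx = (long) (acc.rnd.nextDouble() * acc.seen);
            if (idx < MAX_SAMPLE) {
                acc.sample.set((int) idx, value);
            }
        }
        return acc;
    }

    @Override
    public Double getResult(Reservoir acc) {
        if (acc.sample.isEmpty()) {
            return Double.NaN;
        }
        ArrayList<Double> copy = new ArrayList<>(acc.sample);
        Collections.sort(copy);
        return copy.get(copy.size() / 2); // sample median ≈ stream median
    }

    @Override
    public Reservoir merge(Reservoir a, Reservoir b) {
        b.sample.forEach(v -> add(v, a)); // crude, slightly biased merge; fine for a sketch
        return a;
    }
}
```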
  • v

    Victor

    12/07/2022, 8:32 PM
    👋 Hello, team! Hoping to get some help with a Flink issue I'm seeing. I'm getting the following error when upgrading the build with upgradeMode set to savepoint. Any ideas what I could be doing wrong? Happy to provide additional info to help troubleshoot.
  • v

    Victor

    12/07/2022, 8:32 PM
    Untitled.txt
    • 1
    • 1
  • d

    Deryl Rodrigues

    12/07/2022, 10:23 PM
    Hi. Has anyone tried the following temporal join: a Kafka stream table joined with multiple Hive dimension tables?
    Copy code
    Kafka Stream Table A 
    LEFT JOIN B ON joining fields
    LEFT JOIN C ON joining fields
  • d

    Deryl Rodrigues

    12/07/2022, 10:24 PM
    Encountered the following error
    Copy code
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
  • d

    Deryl Rodrigues

    12/07/2022, 10:24 PM
    I have defined primary keys in both Table B and C.
    g
    • 2
    • 1
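For context, a generic illustration of the shape the planner expects for an event-time temporal join: the versioned (right-hand) side needs both a primary key and a watermark, which is what the error is complaining about. The table names, columns, and connector options below are placeholders, not the actual Kafka/Hive setup from the question.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
            "CREATE TABLE orders (" +
            "  order_id STRING, currency STRING, order_time TIMESTAMP(3)," +
            "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
            ") WITH ('connector' = 'kafka', 'topic' = 'orders'," +
            "        'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json')");

        tEnv.executeSql(
            "CREATE TABLE currency_rates (" +
            "  currency STRING, rate DECIMAL(10, 4), update_time TIMESTAMP(3)," +
            "  WATERMARK FOR update_time AS update_time," +
            "  PRIMARY KEY (currency) NOT ENFORCED" +          // the primary key the error asks for
            ") WITH ('connector' = 'upsert-kafka', 'topic' = 'rates'," +
            "        'properties.bootstrap.servers' = 'localhost:9092'," +
            "        'key.format' = 'json', 'value.format' = 'json')");

        // The versioned side is referenced with FOR SYSTEM_TIME AS OF the left side's time attribute.
        tEnv.executeSql(
            "SELECT o.order_id, o.currency, r.rate " +
            "FROM orders AS o " +
            "LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r " +
            "ON o.currency = r.currency");
    }
}
```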
  • e

    Emmanuel Leroy

    12/07/2022, 11:11 PM
    How can I debug code where I want to restore a savepoint to test stuff? I use VS Code. Is there some way to set it up to do this? Thanks
    s
    c
    • 3
    • 2
  • t

    Travis Carter

    12/08/2022, 12:48 AM
    I'm building a StateFun platform and am very close to completion. I've hit a bit of a wall with our Protobufs. It seems that when running tests or running locally everything works just fine, but when I run the container I'm getting the following:
    Copy code
    java.lang.NoClassDefFoundError: Could not initialize class com.thepublichealthco.protos.core.protos.ForecastProtos
    	at com.thepublichealthco.protos.core.protos.Forecast.getDescriptor(Forecast.java:123)
    	at com.thepublichealthco.statefun.io.elasticsearch.ForecastSinkFn.extractForecast(ForecastSinkFn.java:56)
    	at com.thepublichealthco.statefun.io.elasticsearch.ForecastSinkFn.process(ForecastSinkFn.java:36)
    	at com.thepublichealthco.statefun.io.elasticsearch.ForecastSinkFn.process(ForecastSinkFn.java:23)
    	at com.thepublichealthco.statefun.io.elasticsearch.ElasticsearchSinkFunctionDelegate.process(ElasticsearchSinkFunctionDelegate.java:27)
    	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:330)
    	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:86)
    	at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:55)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
    	at org.apache.flink.statefun.flink.core.functions.SideOutputSink.accept(SideOutputSink.java:47)
    	at org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:99)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleEgressMessages(RequestReplyFunction.java:248)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleInvocationResultResponse(RequestReplyFunction.java:207)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:178)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.sendToFunction(RequestReplyFunction.java:356)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.sendToFunction(RequestReplyFunction.java:330)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.sendToFunction(RequestReplyFunction.java:321)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onRequest(RequestReplyFunction.java:136)
    	at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:118)
    	at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
    	at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74)
    	at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60)
    	at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
    	at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:149)
    	at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    	at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)
    	at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Unknown Source)
    The type is defined as
    Copy code
    public static final Type<Forecast> FORECAST_TYPE =
          TypeUtils.createProtobufType(TYPES_NAMESPACE, Forecast.getDescriptor(), Forecast.parser());
    ---
    public static <T extends GeneratedMessageV3> Type<T> createProtobufType(String typeNamespace,
                                                                              Descriptors.Descriptor descriptor,
                                                                              Parser<T> parser) {
        return SimpleType.simpleImmutableTypeFrom(
            TypeName.typeNameOf(typeNamespace, descriptor.getFullName()), T::toByteArray,
            parser::parseFrom);
      }
    It seems the issue comes from StateFun using protoc
    3.7.1
    while I'm using
    3.21.7
    in my proto library. Any insight or direction would be greatly appreciated
    e
    • 2
    • 2
  • e

    Emmanuel Leroy

    12/08/2022, 3:57 AM
    Question about keyed windowed streams. If I key by an ID that can span a pretty large range within the window time range, and then use a ReduceFunction to aggregate events with the same ID, I end up with a lot of windows, and although the reduce reduces by ID, that's a lot of events all collected at once at each window close. I end up being throttled on the Kafka sink side for sending too many records at once. To me it doesn't really matter that the IDs be processed at once (i.e. in sync); what matters is that they each be processed over the window timeframe, so they could be staggered. So the question is: is there some trick to stagger the windows, or the output of the reduce function? Using timers, maybe? Keep in mind that I don't know how many IDs (and therefore windows) will be present at any one time.
    c
    • 2
    • 1
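One possible trick, sketched under the assumption that events are Tuple2<id, value>: a custom tumbling event-time assigner whose window boundaries are phase-shifted by a hash of the ID, so windows for different IDs close at different times and the reduce output is spread out instead of landing on the sink all at once. This is a sketch, not an established pattern.

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Tumbling event-time windows whose start is phase-shifted per ID, so they close at different times. */
public class StaggeredTumblingWindows extends WindowAssigner<Object, TimeWindow> {

    private final long sizeMillis;

    public StaggeredTumblingWindows(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Assumes events are Tuple2<id, value>; hash the id into [0, size) as a per-key offset.
        String id = ((Tuple2<String, Long>) element).f0;
        long offset = Math.floorMod(id.hashCode(), sizeMillis);
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sizeMillis);
        return Collections.singletonList(new TimeWindow(start, start + sizeMillis));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```

Usage would be something like `events.keyBy(t -> t.f0).window(new StaggeredTumblingWindows(300_000L)).reduce(...)`; each key still sees consistent window boundaries, they just don't all end at the same wall-clock instant.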
  • t

    Tsering

    12/08/2022, 4:11 AM
    I am facing this weird error; can anyone help me with it?
    Cannot access org.apache.flink.runtime.state.StateBackend
    e
    • 2
    • 2
  • m

    Matthias

    12/08/2022, 8:37 AM
    I was looking into the Flink Kubernetes Operator documentation about
    LAST_STATE
    recovery. It states that it recovers from HA metadata (see the corresponding section in the docs). I would interpret this as relying on Flink's HA metadata. But based on what I found in the code (in AbstractJobReconciler:206), it looks like the operator is relying on the savepoints periodically triggered by the operator itself. Am I right, or am I missing something here? 🤔
    m
    g
    • 3
    • 14
  • r

    RICHARD JOY

    12/08/2022, 10:05 AM
    Hi everyone! I have my operator running in the 'abc' namespace, and I deployed a FlinkDeployment to another namespace, 'xyz', in the same Kubernetes cluster. The job manager is up along with the task manager and is able to connect to the REST endpoint. I can see from the operator logs that the operator in the 'abc' namespace is able to get the job status from 'xyz'; however, the operator throws [ERROR] "Exception while listing jobs" java.util.concurrent.TimeoutException. When I deploy the operator and the FlinkDeployment job in the same namespace, there is no error. Help appreciated! Thanks.
    Copy code
    ....[ERROR] [xyz/basic-checkpoint-ha-example] Exception while listing jobs 
    java.util.concurrent.TimeoutException.
    at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.listJobs(AbstractFlinkService.java:228)
    at org.apache.flink.kubernetes.operator.observer.JobStatusObserver.observe(JobStatusObserver.java:68)
    j
    • 2
    • 3
  • y

    Yang LI

    12/08/2022, 1:25 PM
    Hi everyone! We are trying to configure flink-s3-fs-presto with WebIdentityTokenCredentialsProvider to fetch AWS credentials in Flink 1.13.6, but we see this trace in the log:
    Copy code
    Caused by: java.lang.RuntimeException: Error creating an instance of com.amazonaws.auth.WebIdentityTokenCredentialsProvider for URI <s3p://cs-flink-savepoints-eu-west-1/dev/cs-flink/newflink/_entropy_>
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.getCustomAWSCredentialsProvider(PrestoS3FileSystem.java:724)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.getAwsCredentialsProvider(PrestoS3FileSystem.java:708)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:632)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:216)
            at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123)
            ... 28 more
    Caused by: java.lang.NoSuchMethodException: com.amazonaws.auth.WebIdentityTokenCredentialsProvider.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
            at java.base/java.lang.Class.getConstructor0(Unknown Source)
            at java.base/java.lang.Class.getConstructor(Unknown Source)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.getCustomAWSCredentialsProvider(PrestoS3FileSystem.java:720)
            ... 32 more
    WebIdentityTokenCredentialsProvider was introduced in the AWS Java SDK in version 1.11.603, so I think flink-s3-fs-presto 1.13.6 is using the AWS SDK version of its Presto dependency rather than the AWS SDK version 1.11.788 specified in flink-s3-fs-base here: https://github.com/apache/flink/blob/release-1.13.6/flink-filesystems/flink-s3-fs-base/pom.xml#L35. And it's weird, because we have excluded the Presto dependency's AWS Java SDK in our Flink project 🤨 Has anybody dealt with this error message before? 🙏
    • 1
    • 1
  • v

    Victor

    12/08/2022, 4:18 PM
    Hi everyone! I'm getting the following exception with a FlinkDeployment when updating the Flink build image with upgradeMode=savepoint:
    Copy code
    [cp104/my-event-processor-flinkdep] Error during event processing ExecutionScope{ resource id: ResourceID{name='my-event-processor-flinkdep', namespace='cp104'}, version: 1574921286} failed.
    org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.NotFoundException: Operation not found under key: org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@c351173
        at org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest(AbstractAsynchronousOperationHandlers.java:182)
        at
    Any ideas what could be the root cause?
  • a

    Adrian Chang

    12/08/2022, 4:31 PM
    Hello, I am using the Group Window Aggregation
    GROUP BY TUMBLE
    with a user-defined aggregate function. Can I expect that the values sent to the function are in the order they were received? I know that SQL does not guarantee order when applying an aggregation function. Does Flink SQL behave the same way, or does it respect the order of the events? Thanks
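To pin down the setup being described, a self-contained sketch of a GROUP BY TUMBLE query with a user-defined aggregate function; the datagen table, column names, and the toy ConcatAgg function are all made up for illustration. The concatenated output makes it easy to observe in which order accumulate() is called.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

public class TumbleUdafSketch {

    /** Accumulator as a plain POJO so the planner can treat it as a structured type. */
    public static class Acc {
        public String joined = "";
    }

    /** Toy UDAF that concatenates values in the order accumulate() is called. */
    public static class ConcatAgg extends AggregateFunction<String, Acc> {
        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        public void accumulate(Acc acc, String value) {
            acc.joined = acc.joined.isEmpty() ? value : acc.joined + "," + value;
        }

        @Override
        public String getValue(Acc acc) {
            return acc.joined;
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.createTemporarySystemFunction("ConcatAgg", ConcatAgg.class);

        // Throwaway datagen source so the sketch runs stand-alone.
        tEnv.executeSql(
            "CREATE TABLE events (" +
            "  k STRING, v STRING, ts AS LOCALTIMESTAMP," +
            "  WATERMARK FOR ts AS ts" +
            ") WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

        tEnv.executeSql(
            "SELECT k, TUMBLE_START(ts, INTERVAL '1' MINUTE) AS w_start, ConcatAgg(v) AS vs " +
            "FROM events " +
            "GROUP BY k, TUMBLE(ts, INTERVAL '1' MINUTE)").print();
    }
}
```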
  • k

    Krish Narukulla

    12/08/2022, 5:29 PM
    How can I consume both Hive 2 and Hive 3 catalogs in Flink 1.16? Deps:
    Copy code
    "org.apache.hive:hive-serde:3.1.3",
            "org.apache.hive:hive-metastore:3.1.3",
            "org.apache.hive:hive-common:3.1.3",
    Copy code
    Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not execute CREATE CATALOG: (catalogName: [myhive], properties: [{hive-conf-dir=./src/main/java/com/xxx/common/flink/examples/sql/resources/, default-database=xxx, type=hive}])
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1435)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1172)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
            at com.roku.common.flink.examples.sql.Hive.main(Hive.java:53)
    Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Configured default database roku doesn't exist in catalog myhive.
            at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:309)
            at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:211)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1431)
            ... 3 more
    m
    • 2
    • 3
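For reference, a sketch of the CREATE CATALOG shape for registering two Hive catalogs side by side, each with its own hive-conf-dir and a pinned hive-version; the paths and database names are placeholders. Note that the stack trace above fails in HiveCatalog.open because the configured default-database doesn't exist in that metastore, which is separate from the version question, and whether Hive 2 and Hive 3 client jars can coexist on one classpath is its own packaging concern.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TwoHiveCatalogsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // One catalog per metastore, each pointing at its own hive-site.xml directory.
        tEnv.executeSql(
            "CREATE CATALOG hive2 WITH (" +
            "  'type' = 'hive'," +
            "  'hive-conf-dir' = '/etc/hive2-conf'," +
            "  'hive-version' = '2.3.9'," +
            "  'default-database' = 'default'" +      // must actually exist in that metastore
            ")");

        tEnv.executeSql(
            "CREATE CATALOG hive3 WITH (" +
            "  'type' = 'hive'," +
            "  'hive-conf-dir' = '/etc/hive3-conf'," +
            "  'hive-version' = '3.1.3'," +
            "  'default-database' = 'default'" +
            ")");
    }
}
```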
  • j

    Jeremy DeGroot

    12/08/2022, 6:34 PM
    Does anyone have any experience making the processing of side output (late records from a window operator) more efficient? We're using Flink to write windowed aggregates to a DB, and when late records come in they seem to be processed one by one, even though it's still a windowed stream. I would expect them to be grouped together so we'd be able to process and aggregate them the same way we do the timely records, but in practice that doesn't seem to be the case. Has anyone else experienced (and hopefully solved) this problem?
    c
    • 2
    • 2
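For context, a sketch of the wiring being described, with Tuple2<String, Long> standing in for the real event type: late records come out of sideOutputLateData() one element at a time, so any batching has to be added explicitly on the side-output stream, e.g. by windowing it again before the DB write.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSketch {
    public static DataStream<Tuple2<String, Long>> wire(DataStream<Tuple2<String, Long>> events) {
        final OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("late-events") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> aggregated = events
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .sideOutputLateData(lateTag)               // records too late for the window land here
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

        // Late elements arrive one by one; they are not re-grouped automatically.
        // Re-windowing the side output (short processing-time windows here) batches them
        // before the DB write instead of handling them record by record.
        return aggregated.getSideOutput(lateTag)
                .keyBy(e -> e.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
    }
}
```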
  • r

    Rommel

    12/08/2022, 6:43 PM
    https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/formats/parquet/ I am following the example, but ParquetColumnarRowInputFormat is not available outside of the package, so it can't be built. Is there anything that I missed?
    c
    • 2
    • 5
  • t

    Travis Carter

    12/08/2022, 7:10 PM
    https://apache-flink.slack.com/archives/C03G7LJTS2G/p1670460506906629 It would seem that the issue I'm facing is that our Protobuf library is external to our StateFun Elasticsearch-IO module. We compile a Java library and use it as a dependency, built with the latest Protobuf version
    3.21.x
    however, StateFun has been compiled with Protobuf version
    3.7.1
    which includes the Lite library that has been removed in
    >=3.8.x
    and is now a plugin. In traditional Flink I was required to register these protos with a KryoSerializer. Is there a similar requirement for this case? Or is there something I'm missing in getting our protos in line with what StateFun expects?
    • 1
    • 1
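For reference, the DataStream-side registration being referred to looks roughly like this (a sketch that assumes the com.twitter:chill-protobuf dependency on the classpath; Forecast is the generated class from the earlier stack trace). Whether StateFun needs an equivalent step is the open question here, so this is shown only for comparison.

```java
import com.thepublichealthco.protos.core.protos.Forecast;
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoProtobufRegistration {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Route all Forecast instances through the chill-protobuf serializer instead of plain Kryo.
        env.getConfig().registerTypeWithKryoSerializer(Forecast.class, ProtobufSerializer.class);
    }
}
```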
  • h

    Hygor Knust

    12/08/2022, 7:28 PM
    Is there a way to set the Avro schema name using Flink SQL? We have an
    upsert-kafka
    sink producing an Avro schema to Schema Registry. It auto-generates an Avro schema like this:
    Copy code
    {
     "fields": [...],
     "name": "record",
     "type": "record"
    }
    The problem is that the
    "name"
    attribute is always
    "record"
    . This breaks compatibility with the gradle-avro-plugin as it tries to create a class using the
    AvroSchema.name
    , which in this case would be
    class Record
    , a reserved keyword in Java.
    c
    h
    • 3
    • 15
  • s

    Suparn Lele

    12/08/2022, 8:15 PM
    Hello, I am using Apache Flink in batch mode currently. In a nutshell, my pipeline looks like this:
    1. Take data from a database in the form of a table through the Flink JDBC catalog.
    2. Perform queries on the loaded data.
    3. Convert the table into a DataStream and store the data into table X.
    4. Run plain JDBC to get the max(timestamp) from table X.
    But here is the issue: step 4 gets executed while Flink is still executing step 3. Because of this I am not getting the actual max(timestamp), because the data is not yet stored. How do I get around this? How can I make sure that Flink runs these steps sequentially? Please help.
    m
    • 2
    • 4
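One way to make step 4 wait for step 3, sketched with placeholder table names and JDBC settings: executeSql() only submits the INSERT job, so blocking on the returned TableResult before issuing the plain JDBC query forces the sequential order.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class SequentialBatchSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Step 3: write into table X (table and column names are placeholders).
        TableResult result = tEnv.executeSql("INSERT INTO x SELECT id, ts FROM source_table");

        // executeSql() only submits the job; await() blocks until it actually finishes.
        result.await();

        // Step 4: only now read max(timestamp) back over plain JDBC (placeholder connection).
        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pw");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT MAX(ts) FROM x")) {
            if (rs.next()) {
                System.out.println("max ts = " + rs.getTimestamp(1));
            }
        }
    }
}
```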
  • j

    Jason Politis

    12/09/2022, 12:30 AM
    Hello team. I have a custom jar I would like to use, but I'm not seeing the effects I'm expecting, so I'm thinking I've implemented it wrong 😕 I'm using Flink 1.16.0 in Docker, and I'm using the SQL client. There is a change we need for a client in the Debezium Avro serialization: mainly, it must not be nullable. As you may know, the main jar is the flink-avro-confluent-* jar, but then there is also the flink-sql-avro-confluent-* jar. I've mounted the non-SQL one into the /opt/flink/lib folder of the container and used the SQL version with the -j option when starting the SQL client. Is this the correct way to implement this? Thank you.
    c
    • 2
    • 6
  • t

    Thomas Zhang

    12/09/2022, 12:48 AM
    Hello all, I'm running Flink locally with Python in PyCharm. I'm wondering how to get plugins to work to access S3 files. I put the flink-s3-fs-hadoop jar in my venv's pyflink folder, i.e.
    /.../flink-playground/venv/lib/python3.8/site-packages/pyflink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop-1.15.2.jar
    and my python file I'm running is in
    /.../flink-playground/main.py
    . However I'm still getting
    Could not find a file system implementation for scheme 's3'.
    Is there a way to get this to work with python locally?
    t
    • 2
    • 4
  • k

    Kenny Lu

    12/09/2022, 1:11 AM
    Hello all, we are running 1.16 with HA in Kubernetes. All is well, including the jobs, except the task manager pods keep shutting down and spinning up new ones out of the blue. When it happens we see this exception from the Web Dashboard; does anyone know why the TaskExecutor gets shut down? We even bumped up the heap memory, so from a resource perspective there's plenty.
    Copy code
    2022-12-08 16:44:05
    org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:456)
    	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.lambda$terminate$0(AkkaRpcActor.java:578)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:577)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:196)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    s
    e
    s
    • 4
    • 3
  • d

    ding bei

    12/09/2022, 5:02 AM
    Hey guys, I am using the FileSystem connector to write files into S3. Given that the checkpoint interval was set to 10s and traffic is low, the files were quite small, so I am trying to use the compaction feature. However, when I tried to upgrade my app, it reported a checkpoint error, and then I found something contradictory in the documentation. I quote (pic 1):
    Copy code
    Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to "Finished" 
    AND
    As part-81fc4980-a6af-41c8-9937-9939408a734b-0 is now pending completion, after the next successful checkpoint, it is finalized
    which seems to contradict (pic 2):
    Copy code
    Important Note 3: Flink and the FileSink never overwrites committed data. Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file which was committed by subsequent successful checkpoints, the FileSink will refuse to resume and will throw an exception as it cannot locate the in-progress file.
    Since the first quote says only pending files get committed and then become finished files, what does the second quote mean when it says an in-progress file was committed?