https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • r

    Ravi Teja

    01/13/2023, 9:39 AM
    Hi, As of now, we know that kinesis does not support Upsert Stream for Sink. We are using TableAPI's SQL to generate a realtime stream, the query we are authoring generates changelog as both insert (+I) and updates (-U, +U). We observe the below error when we try to sync the upsert stream to the kinesis sink.
    Copy code
    `doesn't support consuming update changes which is produced by node GroupAggregate`.
    It will be great if the community can guide us on best practices for resolving the above issue. Thanks.
    j
    n
    • 3
    • 3
  • a

    Abhinav sharma

    01/13/2023, 12:50 PM
    Hi, I am aggregating some results in Flink. I have created a spring boot application where I am creating an API to send the Flink results as the API response. Is it possible to do that?
    s
    • 2
    • 6
  • n

    Nathanael England

    01/13/2023, 3:09 PM
    I could use some help conceptualizing the right way to frame a problem in Flink. I can’t use the complex event processing system due to the nature of other requirements. Anyway, say I have a data stream of events where each event has an event time and a Boolean field (call it A for simplicity). I’d like to send an alert out if we ever see a continuous stretch of events where A is true for at least five seconds. Put another way, alert if we did not see an event where A was false within five seconds of an event where A was true. My gut tells me that some combination of a sliding window and custom trigger is what this calls for, but I can’t quite nail it down.
    👍 1
    s
    d
    • 3
    • 19
  • n

    Nathanael England

    01/13/2023, 7:00 PM
    I might phrase this poorly, but I'm interested in understanding a detail of event time. If you partition a datastream by keys with
    keyby
    , do the watermarks of those partitioned streams evolve separately from one another?
    z
    d
    • 3
    • 17
  • v

    Vinay Agarwal

    01/13/2023, 7:02 PM
    Hello team, re-asking ways to improve AVRO deserialization performance https://apache-flink.slack.com/archives/C03G7LJTS2G/p1673457155729579
    r
    • 2
    • 1
  • m

    Michael Parrott

    01/13/2023, 9:25 PM
    I’m trying to understand how Flink works - I created a
    KeyedCoProcessFunction
    and based on these docs, it recommends setting the logger to
    static
    . if the parallelism of the operator for the coproessfunction is > 1, the operator might exist on multiple nodes, so how would a static variable work in this case?
    w
    • 2
    • 1
  • b

    Bhaarat

    01/14/2023, 8:19 PM
    Hi - New to Flink and FlinkSQL I'm looking to just play with some queries and came across https://github.com/ververica/flink-sql-cookbook . If I'm not using the Ververica Platform is there a packaged docker-compose that would let me play around with the cookbooks here?
    m
    j
    +3
    • 6
    • 11
  • s

    Sumit Nekar

    01/15/2023, 6:00 AM
    Hello Folks, I am trying to experiment with task slots of task managers and assigning optimal resources to them. I have tried two approaches and able achieve optimal configuration with which application processes without any back pressure for the workload we have.I would need your suggestion on which approach is better . Flink job consumes from 4 kafka topics and writes 4 kafka topics. It has keyed state (rocks db as state backend) which it refers to perform event deduplication. I am deploying this flink job in application mode using flink kubernetes operator 1. Task manager having multiple task slots. In this approach, each TM is having more than 1 task slots (setting it to 10 in our case) and assigning higher memory and higher cpu (cpu: 2). This way each task slots shares managed memory but one slot might consume all the heap memory sometimes. So there is not heap memory isolation. With this setup, its observed that we need to set higher parallelism (100-160) for the operators that take heavy load. 2. Each task manager having only one slot. In this approach, each TM is having only one slot and assigning 1 CPU and optimal memory for each slot. This way each task is guaranteed to have certain heap and managed memory. No heap memory sharing among task slots. With this setup, we observed that application processed fine with lesser parallelism compared to first approach. So it is about more parallel tasks vs more resources to each task with heap isolation. Any thoughts on these approaches?
    d
    • 2
    • 2
  • l

    Lei Su

    01/16/2023, 2:32 AM
    Hi, everyone. I have a checkpoint issue with window. I have a Flink job that consumes Kafka topic data, then use a TumblingProcessingTimeWindows with 10 minutes size and the checkpoint(aligned) interval is 4 minutes. When some data is in the window but window is not triggered, meanwhile a checkpoint is triggered and succeed. Before window is triggered, an OOM is occurred in TM, after TM restarted, all data in window is missing(checkpoint succeed, so the offset is push forward). I think this is because checkpoint barriers just ignore the window and flow forward, so data in window is missing, right?
  • l

    Lei Su

    01/16/2023, 3:38 AM
    and I do not use a agg after window, I only use a process method after window and use a Java ArrayList to add each data in window.
  • a

    Amenreet Singh Sodhi

    01/16/2023, 5:15 AM
    Hi Team, I am trying to use external metric reporter for flink, but I am getting the following error:
    Copy code
    sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    Has anyone faced the same issue, How to resolve it, do I need to add some new certificates to cacerts? Thanks
    z
    • 2
    • 7
  • r

    Rishab Sachdeva

    01/16/2023, 5:59 AM
    Hey team. We are using Confluent cloud. In this setup, our CRM system produces raw data on a Kafka topic. Then a Flink job (running in AWS EMR) transforms this raw data into meaningful data and stores on another topic. Problem statement: This Flink jobs gives errors beyond a certain number of events ( for reference - we have provisioned for 100k events but the Flink job fizzles beyond 5-10k events, stop and resumes after 10-20 mins). On the confluent side, there are no apparent errors as it's a managed service and no throttling has been configured. Summarized Error description: Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings". Current keepalive time (before throttling): 20000ms
    s
    • 2
    • 2
  • a

    Abdelhakim Bendjabeur

    01/16/2023, 10:53 AM
    Hello everyone, I am working on a customer facing analytics application, and I am planning on using flinkSQL to join multiple kafka topics. The result will be in a ClickHouse table. The topics are CDC events produced by debezium, and I would like to join rows from topic A with rows from topic B when the creation date of rows B are always inferior than the update date of rows A. What is the impact of the state size if I just did the following?
    Copy code
    SELECT
    ...
    FROM A
    JOIN B ON B.aid = A.id
    AND A.created_datetime <= B.updated_datetime
    I can maybe use temporal joins on a 1-year period but it has a risk of data loss in case some entities have updates that span over more than 1 year. Anyone with experience on this or just ideas on how to properly design such a pipeline?
    s
    • 2
    • 1
  • n

    Nathanael England

    01/16/2023, 4:46 PM
    Are there any best practice recommendations for maintaining a sorted list in keyed state?
    d
    • 2
    • 9
  • m

    Matt Weiss

    01/16/2023, 7:03 PM
    We are consuming from two Kafka topics to produce a single stream of data. A 24 hours tumbling event time window is applied to this stream along with an aggregate function. At the end of the 24 hours, the result of the aggregate is upserted into a mysql db. The problem its that one of the topics has significantly less messages than the other (on the order of tens of thousands vs millions) and in the case of replaying all the data (we retain 7 days worth in our topic), the smaller topic causes the watermark to advance and the daily windows to fire, resulting in the data in the other topic to be late and thrown out.. Is there a way to consume from multiple topics at a rate than maintains event time order? Also, in general are any best practices we should be aware of or are there other things to consider when aggregating together multiple dissimilar data sources?
    z
    k
    +2
    • 5
    • 13
  • n

    Nathanael England

    01/17/2023, 12:07 AM
    If I have a non-keyed data stream piped into a process function with
    .process
    , that will be run in parallel, right? I don't need to maintain any state in this and the only discussion I'm seeing on parallelism restrictions are when you want to do windowing.
    d
    • 2
    • 4
  • s

    Sergii Mikhtoniuk

    01/17/2023, 1:23 AM
    What's the correct way to convert a
    Table
    or a
    DataStream<Row>
    into
    DataStream<RowData>
    ? I'm trying this:
    Copy code
    DataStream<Row> resultStream = tEnv.toChangelogStream(resultTable);
    RowRowConverter converter = RowRowConverter.create(resultTable.getResolvedSchema().toSinkRowDataType());
    
    DataStream<RowData> resultStreamRowData = resultStream.map(
      converter::toInternal,
      InternalTypeInfo.of(resultTable.getResolvedSchema().toSinkRowDataType().getLogicalType())
    );
    but I think I'm assigning
    TypeInformation
    incorrectly as it crashes with type casting errors. Context: I want to periodically save my table's changelog stream into
    parquet
    format. The only way I found so far is to go from
    Row
    to
    RowData
    only then to
    Avro::GenericRecord
    and then to use
    AvroParquetWriters
    yuck :(
  • k

    kingsathurthi

    01/17/2023, 5:54 AM
    We are using flink operator, when we deploy flinkdeployment, Job Manager, taskmanager and service(UI port:8081) for job manager is automatically created. My question is how to modify the default port and how to add additional ports in the job manager service?
    m
    z
    • 3
    • 12
  • p

    P S

    01/17/2023, 8:04 AM
    Hi I've tried connecting the minio with flink sql and I am getting this error, can anyone help me with this.
    m
    s
    • 3
    • 5
  • m

    Mehul Batra

    01/17/2023, 1:46 PM
    Is there any good examples or article to understand Json_value or Json_query flink sql functions?
    m
    • 2
    • 1
  • a

    Abdelhakim Bendjabeur

    01/17/2023, 1:53 PM
    Hello, I am trying to create a table from Kafka relying on apicurio registry using the confluent compatibility offered by apicurio.
    Copy code
    ) WITH (
        'connector' = 'kafka',
        'property-version' = 'universal',
        'properties.bootstrap.servers' = 'host.docker.internal:29092',
        'scan.startup.mode' = 'earliest-offset',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.url' = '<http://host.docker.internal:8765/apis/ccompat/v6>', -- apicurio
        'topic' = 'my-topic',
        'properties.group.id' = 'mmy-consumer-group'
    );
    Here is the error
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId-0' was found.; error code: 40403
    The endpoint runs fine
    Copy code
    curl <http://localhost:8765/apis/ccompat/v6/schemas/ids/1>                                                                                                                                                                   
    
    {"schema":"{\"type\":\"record\",\
    ...
    }
    But it seems that Flink isn't able to read the bytes containing the schema ID. It keeps reading
    0
    instead of
    1, 2, ...
    Has anyone ever faced this issue, and is there a config to add to tell Flink to look at the first 8 bytes for the artefact ID?
    m
    r
    • 3
    • 5
  • r

    Reme Ajayi

    01/17/2023, 3:25 PM
    Hi All, Can someone help out with reading avro data from Confluent Cloud Kafka topics in Flink? I have specified the Avro-confluent format using Flink's Table API in the src DDLs
    Copy code
    src_ddl = """
    CREATE TABLE source_kafka (
        viewtime INT,
        registertime BIGINT,
        pageid STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'pageviews',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = '<http://localhost:8081/>',
        'properties.group.id' = 'test-001',
        'properties.bootstrap.servers' = 'localhost:9092'
    );"""
    But I keep running into this error.
    Copy code
    java.io.IOException: Failed to deserialize consumer record due to
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = pageviews, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1673563915562, serialized key size = 2, serialized value size = 21, headers = RecordHeaders(headers = [RecordHeader(key = task.generation, value = [48]), RecordHeader(key = task.id, value = [48]), RecordHeader(key = current.iteration, value = [53])], isReadOnly = false), key = [B@4ee8961e, value = [B@1a9f96a).
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
        ... 14 more
    Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
        ... 15 more
    Caused by: java.lang.ArrayIndexOutOfBoundsException
    Any insight on how to fix would be really helpful?
    s
    • 2
    • 15
  • a

    Adrian Chang

    01/17/2023, 5:21 PM
    Hello guys I am using the connector
    postgres-cdc
    but
    confirmed_flush_lsn
    is not increasing even if I update data on the table and the update is received on Flink. This is causing full storage on Postgres. This is my table on Postgres
    Copy code
    CREATE TABLE IF NOT EXISTS aerial_core_development.thing_meta
    (
        id integer NOT NULL DEFAULT nextval('aerial_core_development.thing_meta_id_seq'::regclass),
        thing_serial character varying(64) COLLATE pg_catalog."default" NOT NULL,
        floor integer,
        location character varying(32) COLLATE pg_catalog."default",
        location_custom text COLLATE pg_catalog."default",
        name character varying(64) COLLATE pg_catalog."default",
        device_type character varying(64) COLLATE pg_catalog."default",
        timezone character varying(64) COLLATE pg_catalog."default",
        CONSTRAINT thing_meta_pkey PRIMARY KEY (id),
        CONSTRAINT thing_meta_thing_serial_fkey FOREIGN KEY (thing_serial)
            REFERENCES aerial_core_development.thing (serial) MATCH SIMPLE
            ON UPDATE NO ACTION
            ON DELETE CASCADE
    )
    WITH (
        OIDS = FALSE
    )
    TABLESPACE pg_default;
    
    ALTER TABLE IF EXISTS aerial_core_development.thing_meta
        OWNER to aerial;
    
    REVOKE ALL ON TABLE aerial_core_development.thing_meta FROM flink;
    
    GRANT ALL ON TABLE aerial_core_development.thing_meta TO aerial;
    
    GRANT DELETE, INSERT, UPDATE, SELECT ON TABLE aerial_core_development.thing_meta TO aerial_core;
    
    GRANT SELECT ON TABLE aerial_core_development.thing_meta TO flink;
    This is how I use the connector
    Copy code
    CREATE TABLE thing_meta (
            thing_serial VARCHAR NOT NULL,
            timezone VARCHAR
        ) WITH (
            'connector' = 'postgres-cdc',
            'hostname' = 'localhost',
            'port' = '54321',
            'username' = 'flink_bridge',
            'password' = <password>,
            'database-name' = 'aerial_core_development',
            'schema-name' = 'aerial_core_development',
            'table-name' = 'thing_meta',
            'decoding.plugin.name' = 'pgoutput',
            'slot.name' = 'thing_meta_adrian'
        )
    Postgres engine version is 10.21
    Copy code
    SELECT 
    	*, 
    	pg_current_wal_lsn(),
    	pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) AS replicationSlotLag
    FROM pg_replication_slots;
    Do you have any idea why
    confirmed_flush_lsn
    is not advancing ? Thanks
    data-1673976006096.csv
    s
    • 2
    • 7
  • n

    Nathanael England

    01/17/2023, 5:59 PM
    I'm looking for some job design insight. I have two process functions in my system. The first operates event data directly from devices (keyed by a device ID). That process function generates side outputs that are keyed by a rule identifier for further processing and feed into the other process function. I would like for the second process function to maintain an understanding of the latest event time seen for each device, but the current layout doesn't achieve that. Some ideas I had • Make the first process function generate a side output of something simple like
    {device: <id>, time: <event time>}
    , broadcast that side output stream and connect it to the stream that feeds the second process function. • Generate multiple events out of the first process function like
    {rule: <id>, device: <id>, time: <event time>}
    where all that changes is rule ID so that this side output can be connected to the other side output that is keyed by the same namespace. • Something else? The former seems easier but perhaps not the proper intent of broadcast state. The latter seems like a lot of extra traffic/noise in the system.
    • 1
    • 1
  • c

    Colin Williams

    01/17/2023, 6:21 PM
    I'm getting the following exception
    Copy code
    org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'temporaryCatalog.default.enrichmentOut' do not match.
    Cause: Different number of columns.
    
    Query schema: [id: STRING NOT NULL, enrichment: STRING NOT NULL, ts: TIMESTAMP(3) *ROWTIME*]
    Sink schema:  [id: STRING, status: STRING, enrichment: STRING, ts: TIMESTAMP(3)]
    Based on https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+time+attribute+in+SQL+DDL I think I need to set a watermark strategy and perhaps rowtime field for my table job. Can I set this on the Table schema below? Is there a flink doc that shows this or would someone please kindly provide the syntax for doing this with the Table API?
    Copy code
    Schema.newBuilder()
                    .column("id", DataTypes.STRING().notNull())
                    .column("status", DataTypes.STRING().notNull())
                    .column("enrichment", DataTypes.STRING().notNull())
                    .column("ts",DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
                    .primaryKey("id")
                    .watermark("ts","ts")
                    .build();
    m
    • 2
    • 57
  • j

    Jason Politis

    01/17/2023, 7:57 PM
    Hello everyone. Have either of you come across regressing checkpoint ids?
    m
    • 2
    • 3
  • k

    Krish Narukulla

    01/18/2023, 3:52 AM
    how can i get OpenApi 3.0 spec json for CRD https://github.com/apache/flink-kubernetes-operator?
    • 1
    • 1
  • z

    Zeyu Qiu

    01/18/2023, 4:17 AM
    Hello team, I’m using PyFlink and found the
    DataStream.map()
    in python seems doesn’t work. I write a Python demo like:
    Copy code
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.enable_checkpointing(1000)
    
    source = KafkaSource.builder().build()
    
    stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "team_config_source")
    
    sink = FileSink \
        .for_row_format('/opt/result/', Encoder.simple_string_encoder("UTF-8")) \
        .with_output_file_config(OutputFileConfig.builder()
                                 .with_part_prefix("team_config")
                                 .with_part_suffix(".json")
                                 .build()) \
        .with_rolling_policy(RollingPolicy.default_rolling_policy(part_size=1024 ** 3, rollover_interval=15 * 60 * 1000,
                                                                  inactivity_interval=5 * 60 * 1000)) \
        .build()
    
    def mapping(data):
        return data
    
    stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
    env.execute()
    But Flink gives me this exception:
    Copy code
    2023-01-18 11:34:34 Traceback (most recent call last):
    2023-01-18 11:34:34   File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    2023-01-18 11:34:34     return _run_code(code, main_globals, None,
    2023-01-18 11:34:34   File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
    2023-01-18 11:34:34     exec(code, run_globals)
    2023-01-18 11:34:34   File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 223, in <module>
    2023-01-18 11:34:34     process2()
    2023-01-18 11:34:34   File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 218, in process2
    2023-01-18 11:34:34     stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
    2023-01-18 11:34:34   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 312, in map
    2023-01-18 11:34:34   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 654, in process
    2023-01-18 11:34:34   File "<frozen importlib._bootstrap>", line 991, in _find_and_load
    2023-01-18 11:34:34   File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
    2023-01-18 11:34:34   File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
    2023-01-18 11:34:34   File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
    2023-01-18 11:34:34   File "<frozen zipimport>", line 259, in load_module
    2023-01-18 11:34:34   File "/opt/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
    2023-01-18 11:34:34 AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
    According to the exception, I look into the
    flink_fn_execution_pb2.py
    , and I found the code in this file’s beginning was:
    Copy code
    DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile('xxx')
    _INPUT = DESCRIPTOR.message_types_by_name['Input']
    But seems
    DescriptorPool.AddSerializedFile()
    doesn’t have a return statement, so
    DESCRIPTOR
    is always
    None
    . Am I using the
    DataStream.map()
    in a wrong way or there is something wrong? My environment is: • Flink 1.16.0 • Python Package: apache-flink==1.16.0
    d
    • 2
    • 1
  • l

    Lei Su

    01/18/2023, 4:31 AM
    Hello team. Will the checkpoint ignore the data in window? If TM restarts after checkpoint but window is not triggered, data in window will loss?
    d
    m
    • 3
    • 4
  • s

    Sumit Nekar

    01/18/2023, 5:11 AM
    Hello, I was experimenting with the upgrade modes supported by flink operator. Need some inputs. As per docs, 1. last-state: Quick upgrades in any application state (even for failing jobs), does not require a healthy job as it always uses the latest checkpoint information. Manual recovery may be necessary if HA metadata is lost. 2. savepoint: Use savepoint for upgrade, providing maximal safety and possibility to serve as backup/fork point. The savepoint will be created during the upgrade process. Note that the Flink job needs to be running to allow the savepoint to get created. If the job is in an unhealthy state, the last checkpoint will be used (unless
    kubernetes.operator.job.upgrade.last-state-fallback.enabled
    is set to
    false
    ). If the last checkpoint is not available, the job upgrade will fail. So is it always better to use savepoint as it falls back last state mode in case when job is unhealthy.? In savepoint upgrade , always savepoint is taken before upgrade/restart. Can it increase job estarting time? Which one is preferred for a statefull stream processing flink job?
    s
    g
    • 3
    • 6
1...474849...98Latest