# troubleshooting
  • n

    Nikola Stanisavljevic

    03/15/2023, 5:16 PM
    Hello, is there a way to change the subscription offset in Flink source state (pulsar-flink connector)? The State Processor API does not allow processing the state of the Pulsar source from a savepoint. I am using Flink 1.16.0. Or maybe I just don't know how to access the Pulsar source state with the State Processor API.
  • y

    Yufei Chen

    03/15/2023, 6:34 PM
    [About Autoscaler Usage] @Gyula Fóra Continuing from my previous post https://apache-flink.slack.com/archives/C03G7LJTS2G/p1678490813655019, where I am trying to use the autoscaler with Flink Kubernetes Operator 1.4.0: I submitted a simple job to ingest Kafka data, and although the number of pending records is very high, the autoscaler does not seem to kick in; the job keeps running in 1 slot while 7 free slots are available. Following your suggestion, I checked the operator pod log. Between =========================== Starting metrics report =========================== and =========================== Finished metrics report =========================== there are no job-related metrics reported (such as utilization, input rate, or target rate), and the log always shows “Resource fully reconciled, nothing to do” for the autoscaler FlinkDeployment. My question is: does this mean the autoscaler is not working at all? Is the configuration wrong? Any guidance would be really appreciated.
    Copy code
    2023-03-15 18:20:01,775 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/myauto] Starting reconciliation
    2023-03-15 18:20:01,776 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][default/myauto] Getting service for myauto
    2023-03-15 18:20:01,788 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/myauto] Resource fully reconciled, nothing to do...
    2023-03-15 18:20:01,788 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/myauto] End of reconciliation
  • p

    piby 180

    03/15/2023, 6:38 PM
    Hi, could anyone help me debug this? I have added all the jars I could find (getting desperate here). The jar paths are correct because flink-parquet worked.
    Copy code
    env = StreamExecutionEnvironment.get_execution_environment()
    jars = "file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-sql-parquet-1.16.1.jar; \
                 file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-parquet-1.16.1.jar; \
                 file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-hadoop-fs.jar; \
                file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-hadoop-compatibility_2.12-1.16.1.jar; \
                file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/hadoop-common-3.3.4.jar; \
                    file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/hadoop-client-3.3.4.jar"
    env.add_jars(jars)
    env.add_classpaths(jars)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)
    
    # FileSource
    
    file_source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), "/mnt/c/Users/piby/Downloads/flink/examples/python/input") \
                                .monitor_continuously(Duration.of_millis(5)) \
                                .build()
    ds = env.from_source(file_source, watermark_strategy=WatermarkStrategy.no_watermarks(), source_name="File Source")
    ds = ds.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
               .key_by(lambda i: i[0],  key_type=Types.STRING())\
               .reduce(lambda i, j: (i[0], i[1] + j[1]))
    
    row_type = DataTypes.ROW([
        DataTypes.FIELD(name='word', data_type=DataTypes.STRING(), description="word name"),
        DataTypes.FIELD(name='count', data_type=DataTypes.INT(), description="total count")
    ])
    
    
    file_sink = FileSink.for_bulk_format(base_path=output_path, \
                    writer_factory=ParquetBulkWriters.for_row_type(
                                                        row_type,
                                                        hadoop_config=Configuration(),
                                                        utc_timestamp=True,
                                                    ))  \
                    .with_bucket_assigner(BucketAssigner.date_time_bucket_assigner(format_str="yyyy/MM/dd/HH", timezone_id="UTC")) \
                    .with_output_file_config( 
                        OutputFileConfig.builder() 
                        .with_part_prefix("prefix") 
                        .with_part_suffix(".parquet") 
                        .build()) \
                    .with_rolling_policy(RollingPolicy.default_rolling_policy()) \
                    .build()
    
    ds.sink_to(sink=file_sink)
    env.execute()
    Here is the error stack
    Copy code
    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    Cell In[1], line 57
         46 ds = ds.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
         47            .key_by(lambda i: i[0],  key_type=Types.STRING())\
         48            .reduce(lambda i, j: (i[0], i[1] + j[1]))
         50 row_type = DataTypes.ROW([
         51     DataTypes.FIELD(name='word', data_type=DataTypes.STRING(), description="word name"),
         52     DataTypes.FIELD(name='count', data_type=DataTypes.INT(), description="total count")
         53 ])
         56 file_sink = FileSink.for_bulk_format(base_path=output_path, \
    ---> 57                 writer_factory=ParquetBulkWriters.for_row_type(
         58                                                     row_type,
         59                                                     utc_timestamp=True,
         60                                                 ))  \
         61                 .with_bucket_assigner(BucketAssigner.date_time_bucket_assigner(format_str="yyyy/MM/dd/HH", timezone_id="UTC")) \
         62                 .with_output_file_config(
         63                     OutputFileConfig.builder()
         64                     .with_part_prefix("prefix")
         65                     .with_part_suffix(".parquet")
         66                     .build()) \
         67                 .with_rolling_policy(RollingPolicy.default_rolling_policy()) \
         68                 .build()
         70 ds.sink_to(sink=file_sink)
         71 env.execute()
    
    File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/formats/parquet.py:212, in ParquetBulkWriters.for_row_type(row_type, hadoop_config, utc_timestamp)
        208 jvm = get_gateway().jvm
        209 JParquetRowDataBuilder = jvm.org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
        210 return RowDataBulkWriterFactory(JParquetRowDataBuilder.createWriterFactory(
        211     _to_java_data_type(row_type).getLogicalType(),
    --> 212     create_hadoop_configuration(hadoop_config),
        213     utc_timestamp
        214 ), row_type)
    
    File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/utils.py:39, in create_hadoop_configuration(config)     
         37 def create_hadoop_configuration(config: Configuration):
         38     jvm = get_gateway().jvm
    ---> 39     hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
         40     for k, v in config.to_dict().items():
         41         hadoop_config.set(k, v)
    
    File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/util/exceptions.py:185, in install_py4j_hooks.<locals>.wrapped_call(self, *args, **kwargs)
        184 def wrapped_call(self, *args, **kwargs):
    --> 185     raise TypeError(
        186         "Could not found the Java class '%s'. The Java dependencies could be specified via "
        187         "command line argument '--jarfile' or the config option 'pipeline.jars'" % self._fqn)
    
    TypeError: Could not found the Java class 'org.apache.hadoop.conf.Configuration'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
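    One hedged guess at the cause (an untested sketch, not a confirmed fix): add_jars() in PyFlink accepts each jar path as a separate argument, and the backslash line continuations above leave newlines and indentation inside the single joined string, so the Hadoop jars may never end up on pipeline.jars. Something like the following, reusing the paths from the snippet:
    from pyflink.datastream import StreamExecutionEnvironment

    # Directory taken from the question; adjust as needed.
    jar_dir = "file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars"

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(
        f"{jar_dir}/flink-sql-parquet-1.16.1.jar",
        f"{jar_dir}/hadoop-common-3.3.4.jar",    # provides org.apache.hadoop.conf.Configuration
        f"{jar_dir}/hadoop-client-3.3.4.jar",
    )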
  • k

    Krish Narukulla

    03/15/2023, 6:56 PM
    Running into this error frequently; any help resolving it would be appreciated.
    Copy code
    2023-03-15 18:45:43,383 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task Source: ics_raw[1] -> (Calc[2] -> Calc[3] -> sink_kafka[4]: Writer -> sink_kafka[4]: Committer, Calc[5]) (28/32)#3754 (7a8b9af226cd4e016196e035ecd2d11a_e3dfc0d7e9ecd8a43f85f0b68ebf3b80_27_3754).
    2023-03-15 18:45:43,383 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Task Source: ics_raw[1] -> (Calc[2] -> Calc[3] -> sink_kafka[4]: Writer -> sink_kafka[4]: Committer, Calc[5]) (28/32)#3754 is already in state FAILED
    2023-03-15 18:45:43,383 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: ics_raw[1] -> (Calc[2] -> Calc[3] -> sink_kafka[4]: Writer -> sink_kafka[4]: Committer, Calc[5]) (32/32)#3754 (7a8b9af226cd4e016196e035ecd2d11a_e3dfc0d7e9ecd8a43f85f0b68ebf3b80_31_3754) switched from DEPLOYING to FAILED with failure cause: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
    old:[p-72449e261468158ece284f84d3d13d5995e23b89-295012ce09be8c6d30610c99e0563bf1]
    new:[p-72449e261468158ece284f84d3d13d5995e23b89-14ef7154e3312d2ce68d9fb6776e5b7f]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
  • c

    Chloe He

    03/15/2023, 7:06 PM
    Hi! We're looking to write to Redis using the HSET data structure in Flink SQL, and we have been using a Redis connector from the community, but it looks like it doesn't support writing multiple values at once. Is anyone aware of a Redis connector that supports Flink SQL and HSET?
  • p

    piby 180

    03/15/2023, 10:44 PM
    Hi again, I am unable to deserialize an Avro record. There is a similar question here with no answer 😕 https://stackoverflow.com/questions/74825256/failed-to-deserialize-avro-record-getting-arrayindexoutofboundsexception
    Copy code
    deserialization_schema = AvroRowDeserializationSchema(
            avro_schema_string="""
                {
                    "type": "record",
                    "name": "TestObject",
                    "namespace": "ca.dataedu",
                    "fields": [{
                        "name": "count",
                        "type": ["null", "int"],
                        "default": null
                    }, {
                        "name": "word",
                        "type": ["null", "string"],
                        "default": null
                    }]
                }
                """
        )
    
    
    kafka_source = KafkaSource.builder() \
        .set_bootstrap_servers('0.0.0.0:9092') \
        .set_topics("test_topic") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(deserialization_schema) \
        .build()
    
    ds = st_env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    Error traceback
    Copy code
    Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -62 out of bounds for length 2
            at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
            at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
            at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
            at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
            at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
            at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
            at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
            at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
            at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:164)
            ... 18 more
    Python code on the producer side
    Copy code
    from random_word import RandomWords
    import json
    from kafka import KafkaProducer
    
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda m: json.dumps(m).encode('utf-8'))
    
    r = RandomWords()
    Copy code
    for i in range(10):
        producer.send('test_topic', {'word': r.get_random_word(), 'count' : i})
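    Possibly relevant (a hedged, untested sketch, not a confirmed diagnosis): AvroRowDeserializationSchema expects raw binary Avro, while the producer above serializes JSON, which could produce exactly this kind of ArrayIndexOutOfBoundsException. A producer that Avro-encodes each record with the same schema might look like this (using the avro and kafka-python packages):
    import io

    import avro.schema
    from avro.io import BinaryEncoder, DatumWriter
    from kafka import KafkaProducer

    SCHEMA_STR = """
    {
        "type": "record",
        "name": "TestObject",
        "namespace": "ca.dataedu",
        "fields": [
            {"name": "count", "type": ["null", "int"], "default": null},
            {"name": "word", "type": ["null", "string"], "default": null}
        ]
    }
    """

    schema = avro.schema.parse(SCHEMA_STR)  # avro.schema.Parse in older avro-python3 releases
    writer = DatumWriter(schema)

    def avro_serialize(record):
        # Plain binary Avro with no schema-registry framing, matching AvroRowDeserializationSchema.
        buf = io.BytesIO()
        writer.write(record, BinaryEncoder(buf))
        return buf.getvalue()

    producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=avro_serialize)
    producer.send('test_topic', {'word': 'hello', 'count': 1})
    producer.flush()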
  • p

    piby 180

    03/16/2023, 8:56 AM
    Hey all, I would like to partition my data into compressed Parquet files. Is it possible to set a compression codec (like snappy or gzip) for Parquet files in PyFlink? I couldn't find anything in the documentation.
    Copy code
    file_sink = FileSink.for_bulk_format(base_path=output_path, \
                    writer_factory=ParquetBulkWriters.for_row_type(
                                                        row_type,
                                                        hadoop_config=Configuration(),
                                                        utc_timestamp=True,
                                                    ))  \
                    .with_bucket_assigner(BucketAssigner.date_time_bucket_assigner(format_str="yyyy/MM/dd/HH", timezone_id="UTC")) \
                    .with_output_file_config( 
                        OutputFileConfig.builder() 
                        .with_part_prefix("prefix") 
                        .with_part_suffix(".parquet") 
                        .build()) \
                    .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
                    .build()
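    One hedged possibility (untested): the hadoop_config passed above is forwarded to the underlying Parquet writer, so setting the standard parquet.compression Hadoop key there may select the codec. Sketch:
    from pyflink.common import Configuration
    from pyflink.table import DataTypes
    from pyflink.datastream.formats.parquet import ParquetBulkWriters

    # Row type reused from the earlier word-count example.
    row_type = DataTypes.ROW([
        DataTypes.FIELD('word', DataTypes.STRING()),
        DataTypes.FIELD('count', DataTypes.INT()),
    ])

    # Assumption: the Parquet writer honors the standard parquet.compression key.
    hadoop_config = Configuration()
    hadoop_config.set_string('parquet.compression', 'SNAPPY')  # or 'GZIP'

    writer_factory = ParquetBulkWriters.for_row_type(
        row_type,
        hadoop_config=hadoop_config,
        utc_timestamp=True,
    )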
  • e

    Evaldas Buinauskas

    03/16/2023, 1:57 PM
    Hi, I have a use case where I need to preload some data from multiple Kafka streams into a broadcast state and use it to enrich the main stream. Is it possible to delay the main stream's startup until the data has been preloaded?
  • a

    Abhinav sharma

    03/16/2023, 4:18 PM
    In the Flink code, I am reading data from a Kafka topic and sinking the results to Postgres. This works fine. Now, if I want to check whether an entry already exists in the database for the same message that came in from Kafka, and then decide whether to update that row or insert a new one, how can I achieve this?
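    One hedged option, sketched in PyFlink with placeholder table, column and connection values: Flink's JDBC SQL connector writes in upsert mode when the sink table declares a primary key, so each incoming record either inserts a new row or updates the existing one.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Placeholder schema and connection settings for illustration only.
    t_env.execute_sql("""
        CREATE TABLE pg_sink (
            id STRING,
            payload STRING,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://localhost:5432/mydb',
            'table-name' = 'my_table',
            'username' = 'user',
            'password' = 'secret'
        )
    """)

    # With the primary key declared, INSERT INTO pg_sink ... behaves as an upsert:
    # rows with an existing id are updated, new ids are inserted.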
  • t

    Thijs van de Poll

    03/16/2023, 4:45 PM
    Hi all! I have the following problem: • I am trying to create a CDC pipeline from Postgres to Iceberg. • I am performing a groupBy operation and attempting to collect a list of nested objects, which results in MULTISET<MAP<STRING,STRING>>. I do not necessarily need a multiset; ideally it would be ARRAY<MAP<STRING,STRING>>, but I cannot find how to do that (except with UDFs, I guess). • When I write to Iceberg, I get the following error: Invalid map: MULTISET<MAP<STRING, STRING>> is not a map. Any ideas?
  • a

    Adam Augusta

    03/16/2023, 6:17 PM
    Would I be correct in presuming that Flink lacks the semantics to attempt to minimize hash exchanges by trying to take advantage of the partition assignments in a Kafka source? If it’s possible, I’d look into changing the upsert-kafka connector to support it, but it doesn’t seem likely.
  • a

    Adam Augusta

    03/16/2023, 6:17 PM
    *Flink planner
  • t

    Trystan

    03/16/2023, 6:21 PM
    are checkpoints guaranteed to be resumable across version upgrades? and if so, when the job takes its next checkpoint after a version upgrade, will it wind up being a continuation of the incremental checkpointing or will it behave as if it was restored from a savepoint and it’ll “look like” a full checkpoint?
  • a

    Adam Augusta

    03/16/2023, 8:18 PM
    Running into a couple things that look like Planner flaws. Just sanity checking my analysis.
    Copy code
    tEnv.createTemporaryView("contractswithevents", tEnv.sqlQuery("select contract_id, isin, leg1.events as leg1events, leg2.events as leg2events from contracts"));
    Table leg1Events = tEnv.sqlQuery("select contract_id, isin, 'leg1' as leg, e.event_id, e.quantity from contractswithevents CROSS JOIN UNNEST(leg1events) as e(event_id, quantity)");
    Table leg2Events = …
    Table events = leg1Events.unionAll(leg2Events);
    TablePipeline pipeline = events.groupBy($("contract_id"), $("quantity").sum()).insertInto("results");
    The optimized execution plan has 2 `Exchange(distribution=[hash[contract_id]])`s, one after the scan, and one between the union and the aggregate. What gives? Is the planner getting baffled by the unnest?
  • a

    Adam Augusta

    03/16/2023, 8:19 PM
    (You might wonder why I created the trivial view above. Unnesting on a property of another nested property gives a nonsensical error.)
  • a

    Adam Augusta

    03/16/2023, 8:38 PM
    I’m not surprised that adding 1 contract row with three events results in 3 separate output rows on the same key. And I can grasp why a minor upsert on that contract results in 6 outputs on that key. But I do wonder if there’s a simple way to suppress the incomplete output rows.
  • e

    Emmanuel Leroy

    03/16/2023, 10:03 PM
    Trying to use Beam with Flink on Kubernetes via the flink-operator. I can run Beam jobs on a session cluster, but I notice that it only spins up a single TM. Isn't the operator supposed to look at the number of tasks and auto-scale the number of TMs to match the need? Or is this happening because Beam does not submit the job through the operator, but directly to the JobManager?
  • j

    Jeff Levesque

    03/17/2023, 1:01 AM
    Is there a way in PyFlink to specify a DEFAULT value for a column? If my original table is as follows:
    Copy code
    return """CREATE TABLE {0} (
            ticker VARCHAR(6),
            {2} TIMESTAMP(3),
            window_end TIMESTAMP(3),
            first_price DOUBLE,
            last_price DOUBLE,
            min_price DOUBLE,
            max_price DOUBLE,
            notify_email VARCHAR(320),
            WATERMARK FOR {2} AS {2} - INTERVAL '{3}' SECOND
        ) WITH (
            'connector' = '{1}'
        )""".format(
            table_name,
            connector,
            field_watermark,
            watermark_interval
        )
    How can I perform something like the following:
    Copy code
    return """CREATE TABLE {0} (
            ticker VARCHAR(6),
            {2} TIMESTAMP(3),
            window_end TIMESTAMP(3),
            first_price DOUBLE,
            last_price DOUBLE,
            min_price DOUBLE,
            max_price DOUBLE,
            notify_email VARCHAR(320) METADATA DEFAULT 'my@email.com',
            WATERMARK FOR {2} AS {2} - INTERVAL '{3}' SECOND
        ) WITH (
            'connector' = '{1}'
        )""".format(
            table_name,
            connector,
            field_watermark,
            watermark_interval
        )
    When I try something like that I get the following error:
    Copy code
    py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
    : org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "DEFAULT" at line 9, column 44.
    Was expecting one of:
        "FROM" ...
        "COMMENT" ...
        "VIRTUAL" ...
        ")" ...
        "," ...
        
    	at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "DEFAULT" at line 9, column 44.
    Was expecting one of:
        "FROM" ...
        "COMMENT" ...
        "VIRTUAL" ...
        ")" ...
        "," ...
        
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:452)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:215)
    	at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
    	at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
    	at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
    	at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
    	... 13 more
    Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 9, column 44.
    Was expecting one of:
        "FROM" ...
        "COMMENT" ...
        "VIRTUAL" ...
        ")" ...
        "," ...
        
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39897)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39708)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5221)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6239)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20999)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3421)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3924)
    	at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:263)
    	at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
    	... 15 more
    
    
    Process finished with exit code 1
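    Flink SQL's CREATE TABLE has no DEFAULT clause, which is what the parser is rejecting above. One hedged workaround, assuming the value is a constant and never supplied by the source, is a computed column (a computed column is always derived, so it only approximates DEFAULT):
    # Sketch: same DDL as above, with notify_email as a computed column.
    def build_ddl(table_name, connector, field_watermark, watermark_interval):
        return """CREATE TABLE {0} (
                ticker VARCHAR(6),
                {2} TIMESTAMP(3),
                window_end TIMESTAMP(3),
                first_price DOUBLE,
                last_price DOUBLE,
                min_price DOUBLE,
                max_price DOUBLE,
                notify_email AS CAST('my@email.com' AS VARCHAR(320)),
                WATERMARK FOR {2} AS {2} - INTERVAL '{3}' SECOND
            ) WITH (
                'connector' = '{1}'
            )""".format(table_name, connector, field_watermark, watermark_interval)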
  • j

    Jeff Levesque

    03/17/2023, 1:06 AM
    I'm using pyflink version 1.13
  • c

    Chen-Che Huang

    03/17/2023, 1:50 AM
    Hi, for performance and cost reasons, we use incremental checkpointing and set the state.checkpoints.num-retained parameter so that only 10 checkpoints are retained in our Flink applications. Although Flink keeps only 10 checkpoints, the number of files under the shared folder keeps increasing. To reduce the storage cost, the CLAIM mode mentioned in this doc looks useful. We configured execution.savepoint-restore-mode to CLAIM, but Flink still doesn't delete files under the shared folder. I'm curious whether CLAIM mode only deletes savepoints and checkpoints after the Flink app has been restarted from a new savepoint? If so, does anyone know of a way to limit the growth of files under the shared folder for checkpoints? Any comment is appreciated 🙏
  • t

    Ta-Chun Shen

    03/17/2023, 4:04 AM
    Hi, I have a problem with checkpointing and rescaling. I'm using Flink 1.16.0 with incremental checkpointing enabled and Flink operator 1.3.0. A few days ago I increased the job parallelism from 36 to 48 (max parallelism 4096) using the last-state upgrade mode. After the job was restored, I noticed that the full checkpoint size drastically increased from 600GB to 900GB. To investigate, I extracted the embedded RocksDB instance from one of the operator subtask states and used the ldb tool to dump the database. I found that there are a large number of deleted rows. The following is the command I use:
    Copy code
    $ ldb --db=./db --column_family=name idump --hex | grep 'type:1' | wc -l
    134529889
    $ ldb --db=./db --column_family=name idump --hex | grep 'type:0' | wc -l
    33779429
    We can see that most of the rows are deleted. Next, I tried manually running a compaction on the database, and its size decreased from 16GB to 6.6GB. Why would rescaling create so many deleted rows in this case? Is there any configuration to alleviate the size increase, or is this just the expected behavior of rescaling?
  • s

    Soumya Ghosh

    03/17/2023, 7:04 AM
    Hi, is there a way to implement the following pattern in a Flink SQL MATCH_RECOGNIZE clause? Pattern: event A not followed by event B within x minutes.
  • a

    Abhinav sharma

    03/17/2023, 7:21 AM
    In the Flink code, I am reading data from a Kafka topic and sinking the results to Postgres. This works fine. Now, if I want to check whether an entry already exists in the database for the same message that came in from Kafka, and then decide whether to update that row or insert a new one, how can I achieve this?
  • a

    Abhinav sharma

    03/17/2023, 12:21 PM
    How can I use an upsert statement in Flink when using the DataStream API to read data from Kafka? For example: when I read a message from Kafka, I either insert it into the database or, if an entry for the same record already exists, I update that record. How can I achieve this?
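    A hedged sketch with the DataStream API (table, columns and connection URL are placeholders): PyFlink ships a JdbcSink, and the Postgres upsert can be expressed in the statement itself with INSERT ... ON CONFLICT ... DO UPDATE, so the sink simply executes it for every record.
    from pyflink.common.typeinfo import Types
    from pyflink.datastream.connectors.jdbc import (JdbcConnectionOptions,
                                                    JdbcExecutionOptions, JdbcSink)

    upsert_sql = (
        "INSERT INTO my_table (id, payload) VALUES (?, ?) "
        "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload"
    )

    # ds is assumed to be the Kafka-sourced stream of Row(id, payload) records.
    ds.add_sink(JdbcSink.sink(
        upsert_sql,
        Types.ROW([Types.STRING(), Types.STRING()]),  # must match the record layout
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url('jdbc:postgresql://localhost:5432/mydb')
            .with_driver_name('org.postgresql.Driver')
            .build(),
        JdbcExecutionOptions.builder()
            .with_batch_size(100)
            .with_batch_interval_ms(1000)
            .build()
    ))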
  • e

    Evaldas Buinauskas

    03/17/2023, 12:32 PM
    What is the recommended strategy to scale Flink jobs with async operators? I need to do multiple asynchronous Redis calls for data enrichment and my operator throughput is only at ~3k RPS. Do I add more nodes? Do I increase CPU? I've deployed a job with parallelism of 35 to a 10-node cluster and it quickly saturated the CPU. Thank you!
  • f

    Francisco Morillo

    03/17/2023, 1:17 PM
    Hi everyone! Is there any way to create a RowType from an Iceberg Schema and define it as not null? I am creating it using FlinkSchemaUtil, but because the RowType is nullable it fails to deserialize the Avro record. (I am using SpecificRecord.)
  • l

    Liad Shachoach

    03/17/2023, 4:26 PM
    does anyone have any idea how to fix this?
  • e

    Emmanuel Leroy

    03/17/2023, 8:37 PM
    Using the Flink operator, I often get this kind of error when submitting a job:
    Copy code
    Caused by: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /tmp/flink-web-efe42546-a97a-4393-87c2-f61f782595dc/flink-web-upload/5d8e11e0-4736-49b7-ac82-0e2bc9bd3b5b_beam.jar does not exist
    Somehow the job ends up working after some time, once the jar is found. With Beam, however, the Beam job fails because of this, and I see it restarting many, many times before it finally works. Is there any setting to make the deployment wait for the jar to be present before attempting to launch the job, or is it a bug?
  • d

    Deepyaman Datta

    03/17/2023, 9:37 PM
    Updating with a simpler example: when I try to perform a query with a subquery and an in-memory table, I get an unexpected correlate variable error; e.g.:
    Copy code
    Flink SQL> SELECT * FROM (VALUES (TIMESTAMP '2023-03-17 22:00:50', '2475090000000000', 0), (TIMESTAMP '2023-03-17 22:00:14', '630423000000', 0)) t0(inference_time, cc_num, is_fraud)
    > WHERE inference_time > (
    >   SELECT max(event_time)
    >   FROM iceberg_tx_average_transaction_amount_tx_1m t1
    >   WHERE t0.cc_num = t1.cc_num
    > );
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.TableException: unexpected correlate variable $cor14 in the plan
    --- Hi! I'm trying to execute the following query:
    Copy code
    WITH t0 AS (
      SELECT t3.`event_time`, t3.`cc_num`,
             t3.`amt` AS `average_transaction_amount_tx_1m`
      FROM iceberg_tx_average_transaction_amount_tx_1m t3
    ),
    t1 AS (
      SELECT t0.`event_time`, t0.`average_transaction_amount_tx_1m`,
             t0.`cc_num` AS `_right_by`
      FROM t0
    )
    SELECT `inference_time`, `cc_num`, max(t2.`is_fraud`) AS `is_fraud`,
           max(t1.`average_transaction_amount_tx_1m`) AS `average_transaction_amount_tx_1m`
    FROM (VALUES (TIMESTAMP '2023-03-17 20:48:49', '6540980000000000', 0), (TIMESTAMP '2023-03-17 20:48:51', '571465000000', 0)) t2(inference_time, cc_num, is_fraud)
      LEFT OUTER JOIN t1
        ON t2.`cc_num` = t1.`_right_by`
    WHERE t1.`event_time` = (
      SELECT max(t1.`event_time`) AS `Max(event_time)`
      FROM t1
      WHERE (t1.`_right_by` = t2.`cc_num`) AND
            (t1.`event_time` <= t2.`inference_time`)
    )
    GROUP BY inference_time, cc_num
    and I'm getting an unexpected correlated variable $cor0 in the plan error. Any pointers as to what might be causing this? I assume the SQL is valid, but Flink is having trouble building the query plan for some reason, and I can't find information on it. I have tried significantly reducing the size of my table created from VALUES (from ~200 rows to 2), just in case it was an issue of complexity. If I change (VALUES (TIMESTAMP '2023-03-17 20:48:49', '6540980000000000', 0), (TIMESTAMP '2023-03-17 20:48:51', '571465000000', 0)) t2(inference_time, cc_num, is_fraud) to spine t2 (an existing table), it works as expected... I'll also share the stack trace in the thread:
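    One hedged rewrite that sometimes avoids the correlated scalar subquery (untested, shown for the simpler first query; t_env stands for whatever TableEnvironment the session uses): pre-compute the latest event_time per cc_num with Flink's ROW_NUMBER() deduplication pattern and join against it.
    result = t_env.sql_query("""
        SELECT t0.inference_time, t0.cc_num, t0.is_fraud
        FROM (VALUES
                (TIMESTAMP '2023-03-17 22:00:50', '2475090000000000', 0),
                (TIMESTAMP '2023-03-17 22:00:14', '630423000000', 0)
             ) t0(inference_time, cc_num, is_fraud)
        JOIN (
            SELECT cc_num, event_time
            FROM (
                SELECT cc_num, event_time,
                       ROW_NUMBER() OVER (PARTITION BY cc_num ORDER BY event_time DESC) AS rn
                FROM iceberg_tx_average_transaction_amount_tx_1m
            )
            WHERE rn = 1
        ) latest
          ON t0.cc_num = latest.cc_num
        WHERE t0.inference_time > latest.event_time
    """)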