# troubleshooting
  • g

    Gerald Schmidt

    02/16/2023, 6:17 PM
    Hi everyone, I have a slightly peculiar question. We can deploy all components of the Flink operator except the webhooks (validating + mutating). Is there a way of making good use of the operator without the webhooks? How central are they to the operator's design?
  • r

    Reme Ajayi

    02/16/2023, 6:42 PM
Hi everyone, how do I retrieve timestamps from records received from Confluent Kafka? I want to assign these timestamps using
.withTimestampAssigner()
on my Confluent Kafka sources. Bonus question: how do I know if Flink has already extracted timestamps from the records? The documentation says that if timestamps are set in the headers of my records, Flink automatically retrieves them; does anyone know how I can check this?
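A sketch of one way to wire this up, assuming the KafkaSource API; MyEvent, source (a KafkaSource<MyEvent>) and env are placeholders. The second argument handed to the timestamp assigner is the timestamp Flink already extracted from the Kafka record, so logging or inspecting it is one way to check whether extraction happened:
Copy code
// a sketch; MyEvent, source and env are placeholders
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // recordTimestamp is what Flink already extracted from the
        // Kafka record (negative if nothing was assigned)
        .withTimestampAssigner((event, recordTimestamp) -> recordTimestamp);

env.fromSource(source, strategy, "confluent-kafka");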
  • h

    Herat Acharya

    02/16/2023, 11:09 PM
Hello folks .. I am trying to deploy a Flink cluster using kubernetes-session.sh. I have created an image with a jar inside and want to create a Flink cluster on Kubernetes, but I am getting the output below (I am using Flink 1.16.1). Please help, I am royally blocked on this; the pod keeps restarting.
    Copy code
    Enabling required built-in plugins
    Linking flink-s3-fs-hadoop-1.16.1.jar to plugin directory
    Successfully enabled flink-s3-fs-hadoop-1.16.1.jar
    sed: couldn't open temporary file /opt/flink/conf/sedKxVpXP: Read-only file system
    sed: couldn't open temporary file /opt/flink/conf/sed2jL9cQ: Read-only file system
    /docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Permission denied
    /docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
    Starting kubernetes-session as a console application on host media-streaming-e1np-5c99487d59-5vwn2.
    SLF4J: No SLF4J providers were found.
    SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#noProviders for further details.
    SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions prior to 1.8.
    SLF4J: Ignoring binding found at [jar:file:/opt/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Ignoring binding found at [jar:file:/opt/flink/lib/log4j-slf4j-impl-2.19.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#ignoredBindings for an explanation.
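Those sed/permission errors usually mean /opt/flink/conf is read-only while docker-entrypoint.sh wants to rewrite flink-conf.yaml at startup. A Dockerfile sketch of one workaround when the directory is baked into the image (the jar name is a placeholder; if conf is a read-only volume mount, e.g. from a ConfigMap, the mount itself has to change instead):
Copy code
FROM flink:1.16.1
COPY my-job.jar /opt/flink/usrlib/my-job.jar
# docker-entrypoint.sh edits flink-conf.yaml in place, so conf must stay writable
RUN chown -R flink:flink /opt/flink/conf && chmod -R u+w /opt/flink/conf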
  • t

    Tony Wang

    02/16/2023, 11:45 PM
    Watermarks in batch mode: if I turn on minibatch processing like this:
    Copy code
    Flink SQL> SET 'table.exec.mini-batch.enabled' = 'true';
    Flink SQL> SET 'table.exec.mini-batch.allow-latency' = '5s';
    Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
Do watermarks set like this:
Copy code
CREATE TABLE users_table (ts TIMESTAMP(3), user_id INT, category INT, price FLOAT, featureA INT, WATERMARK FOR ts AS ts) WITH ('connector'='filesystem', 'path'='/home/ziheng/streaming-stuff/datagen/users.csv', 'format'='csv');
even work? When I printed out that table it showed a watermark column, but that watermark is based on the processing time on my computer instead of the
ts
column defined.
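One way to see the watermark a row actually carries, assuming Flink 1.15+ where CURRENT_WATERMARK is available. Note the docs say that in batch execution mode only a single MAX_WATERMARK is emitted at the end of input, so per-row watermarks are really a streaming-mode concept:
Copy code
Flink SQL> SELECT ts, CURRENT_WATERMARK(ts) AS wm FROM users_table;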
  • a

    Anjiang Wei

    02/17/2023, 1:34 AM
Could anyone help me with this issue? I assume it would not be a hard question for Flink experts:
    Copy code
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
            at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
We have tried hard to resolve the dependency issue but still failed. We have written a README describing the problem:
https://github.com/Anjiang-Wei/flink/blob/tony/dataset/README.md
Any feedback would be appreciated, many thanks!
  • a

    Anjiang Wei

    02/17/2023, 1:37 AM
A simpler question would be: how do we use Parquet with the SQL client? More specifically, how do we resolve the dependency issue?
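A sketch of the usual recipe, assuming Flink 1.16.1: put the bundled SQL parquet jar into lib/ and make Hadoop classes visible, then restart the SQL client. Paths are placeholders:
Copy code
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-parquet/1.16.1/flink-sql-parquet-1.16.1.jar \
     -P "$FLINK_HOME/lib/"
export HADOOP_CLASSPATH=$(hadoop classpath)   # the parquet format needs Hadoop classes
"$FLINK_HOME/bin/sql-client.sh"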
  • t

    Tony Yeung

    02/17/2023, 2:09 AM
Hi guys, I'm encountering checkpoints with a long start delay and am looking for help investigating and tuning. Thanks. This is a Flink application that captures MySQL CDC events and writes them to an Iceberg table. The checkpoint start delay can be very high sometimes, and it occurs in the source subtask (CDC source). Wondering what might be the cause and whether it can be tuned. The job is still in the initial snapshot stage and only 1 large table is included. No backpressure issue is spotted.
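Not a diagnosis, but for reference a few flink-conf.yaml knobs that commonly influence checkpoint start delay; values are placeholders:
Copy code
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 30s
# lets barriers overtake buffered data instead of queueing behind it
execution.checkpointing.unaligned: true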
  • m

    Madan

    02/17/2023, 3:23 AM
Hi guys, after a Flink job is submitted in EMR, if one of the nodes is lost we see the error below even though other nodes are available to pick up the work, and the retry strategy is not working either. Can someone help me here?
    Copy code
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteFencedMessage(00000000000000000000000000000000, RemoteRpcInvocation(null.updateTaskExecutionState(TaskExecutionState)))] from sender [Actor[akka.tcp://flink@ip-10-121-176-58.vpc.internal:42393/temp/jobmanager_2$Rs]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#-1101702724]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
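If the expectation is that retries should ride over a lost node, it may be worth double-checking that a restart strategy is actually configured; a flink-conf.yaml sketch, values are placeholders:
Copy code
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10s
restart-strategy.exponential-delay.max-backoff: 2min
# TaskManager liveness timeout in milliseconds (default 50s)
heartbeat.timeout: 50000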
  • t

    Tony Wang

    02/17/2023, 5:02 AM
    what happens to watermarks after a groupby aggregation in FlinkSQL?
  • u

    肖文浩

    02/17/2023, 9:08 AM
I would like to ask if anybody uses Flink to write data to HDFS version 2.7 or above. When restoring from a checkpoint, the file is truncated, and if the file is written in a compressed format it ends up damaged after the truncate. Is there any good way around this?
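One common way to sidestep truncation for compressed output, assuming the FileSink API is an option: roll part files on every checkpoint so recovery never has to truncate a partially written file. A sketch:
Copy code
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

FileSink<String> sink = FileSink
        .forRowFormat(new Path("hdfs:///data/out"), new SimpleStringEncoder<String>("UTF-8"))
        // part files are closed at each checkpoint, so recovery
        // never has to truncate a partially written (compressed) file
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();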
  • d

    Dheeraj Panangat

    02/17/2023, 11:01 AM
Hi Team, how can I generate a stream within another stream? For example: I have a stream of data with Kafka as the source, and for each event from Kafka I want to fetch data from a database. Can I get the data from the database as a stream, i.e. transform the Kafka stream into a database stream based on the data I get from Kafka? Appreciate any help with this. Thanks
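The usual pattern for per-event database lookups is Flink's Async I/O rather than opening a stream per record. A minimal sketch, where DbClient, queryAsync, KafkaEvent and EnrichedEvent are hypothetical placeholders:
Copy code
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class DbEnrichment extends RichAsyncFunction<KafkaEvent, EnrichedEvent> {
    private transient DbClient client;

    @Override
    public void open(Configuration parameters) {
        client = DbClient.connect("jdbc:...");   // hypothetical async client
    }

    @Override
    public void asyncInvoke(KafkaEvent event, ResultFuture<EnrichedEvent> resultFuture) {
        // complete the future when the database answers; no blocking in this thread
        client.queryAsync(event.getKey())
              .thenAccept(row -> resultFuture.complete(
                      Collections.singleton(new EnrichedEvent(event, row))));
    }
}

// wiring: cap in-flight requests and set a timeout
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
        kafkaStream, new DbEnrichment(), 5, TimeUnit.SECONDS, 100);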
  • s

    Sai Sharath Dandi

    02/17/2023, 8:11 PM
Hi folks, we use HDFS as checkpoint storage for our Flink jobs. Is there any way to expose HDFS usage metrics from Flink?
  • j

    Jeremy Ber

    02/17/2023, 10:02 PM
I know this is probably not a good idea, but if I have a set of (small) images of type
.jpg
in Amazon S3, and I want to use the FileSource to read those in, is there any out-of-the-box way to do this? Currently I am reading them in using the
TextLineInputFormat
and trying to convert the text back into an image.
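There's no out-of-the-box binary-file format, but a small custom SimpleStreamFormat that emits each file as one byte[] is one way; a sketch, assuming small files, with all names made up:
Copy code
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;

public class WholeFileFormat extends SimpleStreamFormat<byte[]> {

    @Override
    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream) {
        return new Reader<byte[]>() {
            private boolean done;

            @Override
            public byte[] read() throws IOException {
                if (done) {
                    return null;   // one record per file, then end of input
                }
                done = true;
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = stream.read(buf)) != -1) {
                    bos.write(buf, 0, n);
                }
                return bos.toByteArray();
            }

            @Override
            public void close() throws IOException {
                stream.close();
            }
        };
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

// usage sketch:
// FileSource.forRecordStreamFormat(new WholeFileFormat(), new Path("s3://bucket/images/"))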
  • r

    Rohan Kumar

    02/18/2023, 11:49 AM
Hi, in the Flink Kubernetes operator, how do I limit the number of TaskManagers for batch jobs?
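As far as I know there is no direct "max TaskManagers" setting; with the default scheduler the TaskManager count follows from job parallelism divided by slots per TaskManager, so capping those caps the TM count. A FlinkDeployment fragment sketch, numbers are placeholders:
Copy code
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
  job:
    parallelism: 8   # ceil(8 / 4) = at most 2 TaskManagers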
  • k

    Krish Narukulla

    02/18/2023, 8:32 PM
Running into an error: I built a slim application jar (without Flink deps), yet the application ran successfully with the uber jar.
    Copy code
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not instantiate the executor. Make sure a planner module is on the classpath
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.1.jar:1.16.1]
            ... 13 more
    Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
            at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
            at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:101) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
            at org.apache.flink.table.api.brid
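Since Flink 1.15 the planner is loaded at runtime from flink-table-planner-loader in the distribution's lib/, so a thin job jar normally compiles against the Table API bridge at provided scope and must not bundle its own planner. A pom.xml sketch, version as an example:
Copy code
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge</artifactId>
  <version>1.16.1</version>
  <scope>provided</scope>
</dependency>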
  • j

    Jagan Nalla

    02/19/2023, 11:17 PM
Hi all, I've got simple code. All I'm trying to do is group the data. However it is not grouping the data when I print.
Copy code
dt = datetime.fromtimestamp(epoch_time)
# print(dt)
table = t_env.from_elements(
    [(1, 10, 'ml', dt), (2, 20, 'ml', dt), (3, 25, 'policy', dt)],
    DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                   DataTypes.FIELD("b", DataTypes.INT()),
                   DataTypes.FIELD("c", DataTypes.STRING()),
                   DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
# table.print_schema()
table.execute().print()
# orders = t_env.from_path("table")
# table.window(Tumble.over(lit(2).seconds).on(col('rowtime')).alias("w"))\
result = table \
    .group_by(col("c")) \
    .select(col("c"), col("b").count.alias('d'))
result.execute().print()
+----+-------------+-------------+--------------------------------+-------------------------+
| op |           a |           b |                              c |                 rowtime |
+----+-------------+-------------+--------------------------------+-------------------------+
| +I |           1 |          10 |                             ml | 2023-02-19 18:14:16.000 |
| +I |           2 |          20 |                             ml | 2023-02-19 18:14:16.000 |
| +I |           3 |          25 |                         policy | 2023-02-19 18:14:16.000 |
+----+-------------+-------------+--------------------------------+-------------------------+
3 rows in set
+----+--------------------------------+----------------------+
| op |                              c |                    d |
+----+--------------------------------+----------------------+
| +I |                         policy |                    1 |
| +I |                             ml |                    1 |
| -U |                             ml |                    1 |
| +U |                             ml |                    2 |
+----+--------------------------------+----------------------+
4 rows in set
  • t

    Tyler Wood

    02/20/2023, 12:27 AM
    using
    %flink.ssql
in a Zeppelin notebook on AWS. I have a field
    created_at
that is a double and is getting read into Zeppelin as e.g.
    1.293848576E9
    When I try to apply
    FROM_UNIXTIME
    to cast it to a timestamp I get the following error:
    Copy code
    SQL validation failed. From line 1, column 8 to line 1, column 49: Cannot apply 'FROM_UNIXTIME' to arguments of type 'FROM_UNIXTIME()'. Supported form(s): 'FROM_UNIXTIME()'
    'FROM_UNIXTIME(, )'
    the query is
    Copy code
    %flink.ssql(type=update)
    
    SELECT FROM_UNIXTIME(created_at) FROM table;
What does that error message mean? It looks like I'm following exactly the function signature…
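The empty parentheses in that message are most likely the page renderer swallowing the bracketed type tokens from the original error. Since created_at is a DOUBLE and the validator is rejecting the argument type, one thing to try is casting to an integral number of seconds first; a sketch:
Copy code
%flink.ssql(type=update)

SELECT FROM_UNIXTIME(CAST(created_at AS BIGINT)) AS created_ts FROM table;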
  • r

    Raghunadh Nittala

    02/20/2023, 4:30 AM
Hello everyone, we're on a stable Flink 1.16.1 deployment using the Flink Kubernetes operator. We are getting
file is too short
exceptions in the environment.
Copy code
Error while opening RocksDB instance.
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:243) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:222) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:189) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:169) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:325) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:503) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:98) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) [flink-dist-1.16.1.jar:1.16.1]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.rocksdb.RocksDBException: file is too short (4044 bytes) to be an sstable: /opt/flink/rocksdb/<jobId>
    Can someone please provide inputs on this?
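Not a confirmed fix, but since the path in the RocksDB message points at local working state, one thing to try if local recovery is enabled is turning it off so restores always pull complete files from checkpoint storage; a flink-conf.yaml sketch:
Copy code
state.backend.local-recovery: false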
  • a

    Abdelhakim Bendjabeur

    02/20/2023, 11:32 AM
Hello, quick question about joins in Flink SQL: I have a query with a simple left join. I noticed that the preview of changes is different when I add the keyword
outer
! To my understanding, the
outer
keyword is superfluous and should not change the result of the join. Am I missing something?
    Copy code
    with t1 AS (
    select 
      ...
    from `table1` t1
    ),
    t2 AS (
    select
      ...
    from `table1` t2
    )
    select 
      *
    FROM t1
    left [OUTER] join t2
    ON t1.accountId = t2.accountId 
    AND t1.id = t2.id
The changelog contains only the
create
operation when using
outer
, whereas it contains
create
-
delete
-
create
in a simple left join (with null on the second part of the data).
  • a

    Ari Huttunen

    02/20/2023, 12:47 PM
FYI, my biggest problem (our biggest problem) with Flink is that we're constantly lacking some jars, and we're not sure which ones and which versions. For Java code this should be simpler, but we're using PyFlink. Then we try to re-read the docs and use Google. Most frustrating.
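One way to make the jar situation explicit in PyFlink instead of relying on whatever happens to be in lib/; the jar path and version are placeholders:
Copy code
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# pin exactly the connector jars the job needs; versions must match the Flink runtime
env.add_jars("file:///opt/jars/flink-sql-connector-kafka-1.16.1.jar")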
  • b

    Benjamin Wootton

    02/20/2023, 1:22 PM
    Hi guys. I had a question about Hive metastore and Hudi. In Flink SQL I create a catalog pointing to Hive metastore. I then create a database and a Hudi table. The Hudi table is created and Hive Metastore learns about the database and the table. The problem is that Hive does not know about the columns, only the table object. Is this by design or am I doing something wrong?
  • n

    Nathanael England

    02/20/2023, 8:20 PM
    I'm seeing side outputs go to my main datastream sink. I did
    datastream.add_sink(<one test sink>)
    and then
    datastream.get_side_output(<output tag>).add_sink(<another test sink>)
    . However, my side outputs are showing up in the first sink. I followed the example in https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/ which seemed quite simple, so I'm not sure what I could do wrong.
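For comparison, a minimal side-output sketch, assuming PyFlink 1.16+; names are placeholders. The side record is yielded as a (tag, value) pair, and only get_side_output(tag) should see it:
Copy code
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, ProcessFunction, OutputTag

side = OutputTag("side", Types.STRING())

class Splitter(ProcessFunction):
    def process_element(self, value, ctx):
        yield value                 # main output
        yield side, value.upper()   # side output: yield the tag first, then the value

env = StreamExecutionEnvironment.get_execution_environment()
main = env.from_collection(["a", "b"], Types.STRING()).process(Splitter(), Types.STRING())
main.print()                          # first sink: main records only
main.get_side_output(side).print()    # second sink: side-output records only
env.execute("side-output-demo")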
  • j

    Jeremy Ber

    02/20/2023, 9:34 PM
Any tricks for generating a random key distribution? Can you generate one when your data is all pretty much the same?
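A sketch, assuming a DataStream<Event> named events; the bucket count is a placeholder. Random keys spread identical records across subtasks, but they are unsafe whenever later updates for the same logical record must land on the same keyed state:
Copy code
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// salt records with a random key so identical payloads still spread out
KeyedStream<Event, Integer> spread =
        events.keyBy(e -> ThreadLocalRandom.current().nextInt(128));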
  • n

    Nathanael England

    02/21/2023, 1:37 AM
    Are there any limitations to state descriptors? For example, is embedding a map in a map acceptable?
    Copy code
state.MapStateDescriptor('example', Types.STRING(),
                         Types.MAP(Types.INT(), Types.STRING()))
  • n

    Nathanael England

    02/21/2023, 2:01 AM
    Related to the above: Why would I be able to access
    MapState.items()
    from my
    process_element
    function but get the following error when calling it from
    open
    Copy code
    TypeError: None has type NoneType, but expected one of: bytes, unicode
    Does that just mean the state hasn't actually been created yet?
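A sketch of the usual split, assuming PyFlink: the state handle can be created in open(), but reads and writes need a current key, which only exists while processing an element:
Copy code
from pyflink.common.typeinfo import Types
from pyflink.datastream import KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor

class MyFn(KeyedProcessFunction):
    def open(self, runtime_context):
        desc = MapStateDescriptor('example', Types.STRING(), Types.STRING())
        self.state = runtime_context.get_map_state(desc)  # handle only; no key is set yet

    def process_element(self, value, ctx):
        self.state.put(value, value)   # valid: a current key exists here
        yield value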
  • t

    Ting Yin

    02/21/2023, 2:36 AM
hi team, does anyone know how to resolve this error?
Copy code
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
Error: Could not find or load main class org.openjdk.jmh.runner.ForkedMain
Caused by: java.lang.ClassNotFoundException: org.openjdk.jmh.runner.ForkedMain
<forked VM failed with exit code 1>
  • t

    Ting Yin

    02/21/2023, 2:41 AM
The error message "Error: Could not find or load main class org.openjdk.jmh.runner.ForkedMain" suggests that the Java Virtual Machine (JVM) is not able to find the class org.openjdk.jmh.runner.ForkedMain. I built the jar file from flink-benchmarks with the JMH dependencies; I'm not sure what should be included in the settings.
  • n

    Nathanael England

    02/21/2023, 4:55 AM
    Had some interesting behavior with map state:
    is_empty()
    returned
    False
    , but
    list(keys())
    and
    list(values())
were both empty lists.