# troubleshooting

    Amir Hossein Sharifzadeh

    05/15/2023, 5:59 PM
    My application does not recognize the EmbeddedRocksDBStateBackend class. I believe it must be in flink-statebackend-rocksdb-1.18-SNAPSHOT (or not), so what’s the appropriate dependency for EmbeddedRocksDBStateBackend in pom.xml? It sounds like

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>1.17.0</version>
            <scope>test</scope>
        </dependency>

    is not the answer…
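    [Editor's note, hedged: the `<scope>test</scope>` in the snippet above would by itself explain the symptom, since a test-scoped dependency is not on the main compile classpath. A sketch of the dependency without the test scope; use `provided` instead if the RocksDB state backend jar is already on the cluster's classpath:]

    ```xml
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>1.17.0</version>
        <!-- no test scope, so EmbeddedRocksDBStateBackend is visible to main code -->
    </dependency>
    ```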

    Abhinav Ittekot

    05/15/2023, 6:40 PM
    hi 👋 we are deploying Flink in CLAIM mode w/ incremental checkpoints (in S3) and state.checkpoints.num-retained: 50, and we notice that it takes several minutes for Flink to verify the retained checkpoints before the JobManager starts to schedule a job. The logs of the JM suggest it's validating checkpoints sequentially. Does anyone have ideas on what we can try to speed up this step? We've tried allocating more CPU, IO, and network capacity to the JobManager instance, but that doesn't seem to help.

    Bharathkrishna G M

    05/15/2023, 7:50 PM
    👋 hello, I'm trying to run a Flink app which reads from a GCS bucket (continuous streaming). The code roughly looks like:

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE)
        env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

        val inputFormat = new TextInputFormat(new Path("gs://bucket/test_dir/"))
        inputFormat.setFilesFilter(FilePathFilter.createDefaultFilter())

        // Set up the data stream by reading from the input directory in streaming mode.
        val dataStream = env.readFile(inputFormat, "gs://bucket/test_dir/", FileProcessingMode.PROCESS_CONTINUOUSLY, 30000)
    I have added the gcs-connector-latest-hadoop2.jar under /plugins/gs-fs-hadoop/, and I have also configured a checkpoint directory in the Flink config. I see checkpoint files getting created when I run the app. The problem I'm facing is that when I delete and restart the app, it reads all the files from the bucket again, i.e. it is not able to incrementally stream from the GCS bucket. It feels like it's not able to use the checkpoint info, or the checkpoint itself doesn't capture which files were already streamed? Appreciate any inputs on this use case, thanks!
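    [Editor's note, hedged: readFile with PROCESS_CONTINUOUSLY does track already-processed files in checkpointed state, but that state is only consulted when the job is restored from a checkpoint or savepoint; deleting and resubmitting the job starts with empty state. A sketch of resubmitting from a retained (externalized) checkpoint, with a hypothetical path:]

    ```
    ./bin/flink run -s gs://bucket/checkpoints/<job-id>/chk-42 my-app.jar
    ```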

    Shivam Bansal

    05/15/2023, 8:45 PM
    Hi Team, I am new to Apache Flink and trying to set up Flink locally. I have pre-installed python==3.8, java==11.0.11, and apache-flink==1.17.0. I am trying to run a simple word-count job but am getting the error below. Can anyone please help me with this error?

    Herat Acharya

    05/15/2023, 9:43 PM
    Hello... I am using Kryo 5.0.5 in Flink and I am getting this exception:
    Caused by: java.lang.NullPointerException
    	at java.base/java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1444)
    	at java.base/java.util.ArrayList$SubList.size(ArrayList.java:1185)
    	at java.base/java.util.AbstractList.add(AbstractList.java:111)
    	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:241)
    	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:44)
    	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:796)
    	at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:124)
    	... 35 more
    It seems it's unable to handle an empty or null list? Any thoughts? I'm also using Flink 1.15.0.

    André Casimiro

    05/16/2023, 1:51 AM
    Hi, why does Flink in BATCH mode only process one task at a time? In the image, at least reading from the sources could certainly be executed in parallel...

    David Wisecup

    05/16/2023, 3:09 AM
    I'm running into a lot of different runtime errors centered around:

        Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module

    This error indicates an issue caused by the restrictions on reflection introduced in Java 9 with the Java Platform Module System (JPMS). For Flink 1.17.0, what's the recommended, most compatible Java version?

    David Wisecup

    05/16/2023, 3:10 AM
    I'm using Java 17 btw
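    [Editor's note, hedged: Java 11 and 17 are both supported for Flink 1.17, but on Java 17 the JVM needs --add-opens flags so Flink's serializers can reflect into JDK classes. The flink-conf.yaml shipped with the 1.17 distribution already contains these; a minimal sketch of the relevant entry, with the flag list abbreviated:]

    ```yaml
    # sketch: open the JDK modules Flink's serializers reflect into (Java 17)
    env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED
    ```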

    HJK nomad

    05/16/2023, 6:07 AM
    hi, I'm running a simple Flink Python job, just a simple case.

    HJK nomad

    05/16/2023, 6:08 AM
    the code looks like this:

        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_runtime_mode(RuntimeExecutionMode.BATCH)
        env.set_parallelism(1)

        my_array = [1, 2, 3, 4, 5]

        stream = env.from_collection(collection=my_array)
        stream.print()

        env.execute()

    but the job finished with errors in the TM log:

        2023-05-16 05:48:32,411 INFO  org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - 1 Beam Fn Logging clients still connected during shutdown.
        2023-05-16 05:48:32,421 WARN  org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint.
        2023-05-16 05:48:32,426 WARN  org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint.
        2023-05-16 05:48:32,487 WARN  org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Error cleaning up servers urn: "beam:env:process:v1" payload: "\032H/usr/local/lib/python3.8/dist-packages/pyflink/bin/pyflink-udf-runner.sh\"\225\002\n\004PATH\022\214\002/root/miniconda3/condabin:/usr/li

    HJK nomad

    05/16/2023, 6:09 AM
    the TM metaspace OOMs... how can I clean this up, or what setting should I change?
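    [Editor's note: if it is the JVM metaspace that overflows, which is common with PyFlink/Beam jobs that load many classes, the usual knob is the TaskManager metaspace limit; a sketch for flink-conf.yaml:]

    ```yaml
    # sketch: raise the TaskManager's metaspace ceiling (default is 256m)
    taskmanager.memory.jvm-metaspace.size: 512m
    ```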

    Ricco Førgaard

    05/16/2023, 8:06 AM
    I'm running a simple Flink 1.15 job in Kubernetes using the Flink Operator and I see this in the logs:

        Config option 'state.checkpoint-storage' is ignored because the checkpoint storage passed via StreamExecutionEnvironment takes precedence

    However, I don't understand where this gets passed via StreamExecutionEnvironment. All I have in my code is

        val env = StreamExecutionEnvironment.getExecutionEnvironment

    and I don't make any changes to it besides setting max parallelism. Any ideas?

    Monish Bhanushali

    05/16/2023, 10:13 AM
    Hi folks, I'm trying to run a Flink cluster in a multi-node environment with 1 master and 1 worker node, and trying to run a job on the master, but I get an error from both of these commands:

        ./bin/flink run ./examples/streaming/WordCount.jar --input /home/test_wc/test_wo_c.txt --output /home/test_wc/output1.txt
        ./bin/flink run ./examples/streaming/WordCount.jar

    The error:

        Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph
        Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/127.0.0.1:32087
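    [Editor's note, hedged: "Could not connect to BlobServer at address localhost" suggests the JobManager is advertising localhost to remote clients. A sketch of the flink-conf.yaml entries to check on the master, with a hypothetical hostname:]

    ```yaml
    # sketch: advertise a reachable address instead of localhost
    jobmanager.rpc.address: master-host
    jobmanager.bind-host: 0.0.0.0
    ```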

    El Houssine Talab

    05/16/2023, 11:19 AM
    Can someone help in this case please? https://stackoverflow.com/questions/76257572/records-not-showing-in-flink-when-watermarking-is-activated

    Jean-Baptiste PIN

    05/16/2023, 11:30 AM
    Hi, I've a job listening to Kafka and sinking to MongoDB. I have a weird behavior that I can't explain: when I launch the process everything is fine, but then it stops before the end. If I send a new event to Kafka, I get the missing events from the previous batch but miss some of the new batch... and so on. It's like some data is waiting somewhere... can someone help me figure this out?

    Jean-Baptiste PIN

    05/16/2023, 12:44 PM
    Also, in this pic, the left is the source's Records In and the right is the source's Records Out... using Kafka I get twice the records out.

    Jean-Baptiste PIN

    05/16/2023, 12:46 PM
    Also, in the next step (filter) I receive 1222 records in...

    Prithvi Dammalapati

    05/16/2023, 2:25 PM
    Hi, is there a setting in flink-conf to make the Presto S3 plugin create a $folder$ metadata folder in S3 for savepoints? The fs-s3-hadoop plugin does this by default.

    Sumit Singh

    05/16/2023, 5:29 PM
    Hi, is there any doc on how to set up Flink on multiple nodes?
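    [Editor's note: the official docs cover this under standalone cluster deployment. The shape of the config is roughly this; a hedged sketch with hypothetical hostnames:]

    ```yaml
    # sketch of a standalone cluster setup, hypothetical hostnames
    # conf/flink-conf.yaml (identical on every node):
    jobmanager.rpc.address: master-host
    # conf/workers lists one TaskManager host per line, e.g.:
    #   worker-host-1
    #   worker-host-2
    # then start the cluster from the master with ./bin/start-cluster.sh
    ```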

    Adesh Dsilva

    05/16/2023, 5:46 PM
    Hi, what is the best way to create a table source from multiple input paths on Flink 1.16?

        String inputPaths = "s3://bucket/path/20231011/,s3://bucket/path/20231012/,s3://bucket/path/20231013/";

        tableEnv.createTemporaryTable("MyTable", TableDescriptor.forConnector("filesystem")
                .schema(schema)
                .option("path", inputPaths)
                .format(FormatDescriptor.forFormat("avro").build())
                .build());

    Each path above has multiple Avro files, and I cannot use s3://bucket/path because I only want to read a particular range of folders.
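    [Editor's note, hedged: as far as I know the filesystem connector's `path` option takes a single path, so a common workaround is one temporary table per folder combined with UNION ALL; a sketch with hypothetical table names, each backed by one of the folders above:]

    ```sql
    -- sketch: one filesystem table per folder, unioned into one view
    CREATE TEMPORARY VIEW MyTable AS
    SELECT * FROM MyTable20231011
    UNION ALL
    SELECT * FROM MyTable20231012
    UNION ALL
    SELECT * FROM MyTable20231013;
    ```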

    Prashant Doolabh

    05/16/2023, 5:53 PM
    Hi, has anyone had any success using PyFlink 1.16.0 or 1.17.0 with a kerberized Hive cluster? Using the below config in my flink-conf.yaml works only if I perform a kinit with the *.keytab file prior to starting the PyFlink job.

        security.kerberos.login.use-ticket-cache: true

    Without the kinit, if I use the following configs, I experience an error when the job attempts to connect to the Hive Metastore:

        security.kerberos.login.use-ticket-cache: true
        security.kerberos.login.keytab: /app/home/user/pyflink.keytab
        security.kerberos.login.principal: pyflink@DOMAIN.COM
        security.kerberos.krb5-conf.path: /etc/krb5.conf

    Exception:

        Caused by: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed

    Using the keytab is the ideal solution considering it is a long-running streaming-mode job and the ticket would expire and need to be renewed. Any suggestions would be highly appreciated 🙏 #PyFlink #FlinkSQL

    Sumit Singh

    05/16/2023, 7:01 PM
    Hi, running Flink on EC2, what config do I change in flink-conf.yaml to see the Web UI on http://public-ip:8081?
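    [Editor's note: a hedged sketch of the usual flink-conf.yaml entries for exposing the UI beyond localhost (the EC2 security group must also allow inbound traffic on the port):]

    ```yaml
    # sketch: listen on all interfaces so the UI is reachable via the public IP
    rest.bind-address: 0.0.0.0
    rest.port: 8081
    ```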

    Trystan

    05/16/2023, 7:33 PM
    Is there an example out there of a batch source with splits that may or may not finish in a given fetch call? The source javadoc says:

        In either case, this method should be reentrant, meaning that the next fetch call
        should just resume from where the last fetch call was waken up or interrupted.

    but how? What if a split is partially finished (and emitted) but the TaskManager dies? I don't see a way to keep the partially progressed state of a split alive after emitting some records.

    Trystan

    05/16/2023, 7:34 PM
    There's a Cassandra source example, but it sidesteps this issue with some trickery to say "nah, this can't happen, it'll all fit in memory, I promise".

    Trystan

    05/16/2023, 7:38 PM
    I could modify the split, but evidently you're not supposed to do that: you're supposed to modify the SplitState. So I'm not sure what this could actually mean, given that the SplitReader gets Splits, not SplitStates…
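    [Editor's note, hedged: FileSource handles this by making the split itself carry a position. The reader mutates a mutable split-state object, and at checkpoint time that state is turned back into an immutable split recording how far emission got, so a restarted reader resumes mid-split. A toy sketch of the idea, with all names hypothetical and no Flink APIs:]

    ```python
    # Toy sketch of a resumable split: the reader tracks progress in a mutable
    # "state" object and, at checkpoint time, turns it back into an immutable
    # split that records how far emission got. All names are hypothetical.
    from dataclasses import dataclass

    @dataclass(frozen=True)
    class RangeSplit:
        split_id: str
        start: int   # next record to emit (inclusive)
        end: int     # one past the last record

    class RangeSplitState:
        def __init__(self, split: RangeSplit):
            self.split = split
            self.position = split.start

        def emit_next(self) -> int:
            # pretend this emits one record downstream
            rec = self.position
            self.position += 1
            return rec

        def to_checkpointed_split(self) -> RangeSplit:
            # what snapshotState() would persist: a split resuming at `position`
            return RangeSplit(self.split.split_id, self.position, self.split.end)

    state = RangeSplitState(RangeSplit("s-0", 0, 10))
    for _ in range(4):
        state.emit_next()          # emit records 0..3, then pretend the TM dies
    resumed = state.to_checkpointed_split()
    print(resumed.start, resumed.end)  # prints: 4 10
    ```

    On restore, a fresh reader built from `resumed` starts emitting at record 4 instead of re-emitting 0..3.
    
    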

    harish reddy

    05/16/2023, 9:43 PM
    Hi guys, did anyone get around to writing files to ADLS using PyFlink? I am getting a "could not create committable serializer" error; any advice would be appreciated.

    Trevor Burke

    05/16/2023, 11:04 PM
    Looking for guidance on my code to extend BucketAssigner so we can allow users to partition data on Kafka event time or Flink processing time. Seeing:

        The implementation of the BulkFormatBuilder is not serializable. The object probably contains or references non serializable fields.

    James Timotiwu

    05/17/2023, 12:13 AM
    Hello, I read here that bulk-encoded FileSinks extend a checkpoint-based rolling policy. Does this mean that they cannot be used in BATCH execution mode? On Flink 1.14.

    Dongwoo Kim

    05/17/2023, 1:28 AM
    Hi all, I have a question about incremental snapshotting. I have been using incremental snapshotting with RocksDB as the state backend. However, I’ve noticed that even though I’ve configured incremental snapshotting, the size of the last checkpoint periodically matches the size of a full snapshot, just like the image below. I thought incremental snapshots would only include changes since the last snapshot, so I expected their sizes to be smaller. Is this expected behavior? Has anyone experienced something like this or can help me understand why this is happening? Any thoughts or suggestions are welcome. Thanks in advance. Lastly, my Flink configuration is below.

        flink version: 1.16.1
        state.backend: rocksdb
        state.backend.incremental: true
        state.checkpoint-storage: filesystem
        state.checkpoints.num-retained: 10
        execution.checkpointing.mode: EXACTLY_ONCE
        execution.checkpointing.interval: 30s
        s3.endpoint: {s3_endpoint}
        state.checkpoints.dir: s3://flink-checkpoint/checkpoints/****
        state.savepoints.dir: s3://flink-checkpoint/savepoints/****

    Dheeraj Panangat

    05/17/2023, 5:29 AM
    Hi Team, for the Flink Operator I see the stable version as 1.5.0, but I couldn't find the Helm repo for download. Is this yet to be released?