Amir Hossein Sharifzadeh
05/15/2023, 5:59 PM
EmbeddedRocksDBStateBackend class. I believe it must be in flink-statebackend-rocksdb-1.18-SNAPSHOT (or not), so what's the appropriate dependency for EmbeddedRocksDBStateBackend in pom.xml? It sounds like the
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.17.0</version>
<scope>test</scope>
</dependency>
is not the answer…
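A minimal sketch of the usual setup, assuming flink-statebackend-rocksdb is declared for the same version as the rest of the Flink dependencies (e.g. 1.17.0) and without the test scope, so EmbeddedRocksDBStateBackend is available at compile time:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// true = incremental checkpoints; the class lives in the flink-statebackend-rocksdb artifact
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))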
Abhinav Ittekot
05/15/2023, 6:40 PM
We use CLAIM mode w/ incremental checkpoints (in S3) and state.checkpoints.num-retained: 50, and notice that it takes several minutes for Flink to verify the retained checkpoints before the job manager starts to schedule a job. The JM logs suggest it is validating checkpoints sequentially. Does anyone have ideas on what we can try to speed up this step? We've tried allocating more CPU, IO and network capacity to the job manager instance, but that doesn't seem to help.

Bharathkrishna G M
05/15/2023, 7:50 PM
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val inputFormat = new TextInputFormat(new Path("gs://bucket/test_dir/"))
inputFormat.setFilesFilter(FilePathFilter.createDefaultFilter())
// Set up the data stream by reading from the input directory in streaming mode.
val dataStream = env.readFile(inputFormat, "gs://bucket/test_dir/", FileProcessingMode.PROCESS_CONTINUOUSLY, 30000)
I have added the gcs-connector-latest-hadoop2.jar
and /plugins/gs-fs-hadoop/
I have also configured the checkpoint directory in the Flink config. I see some checkpoint files getting created as well when I run the app.
Now the problem I'm facing is that when I delete and restart the app, it is reading all the files from the bucket again, i.e. it is not able to stream incrementally from the GCS bucket. Feels like it's not able to use the checkpoint info, or the checkpoint itself doesn't capture which files were streamed? Appreciate any inputs on this use case, thanks!
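For context, the set of already-processed files lives in checkpointed source state, so a job that is deleted and started fresh (rather than restored from a retained checkpoint or savepoint, e.g. flink run -s <checkpoint-path>) re-reads the whole bucket. A hedged sketch using the newer FileSource API, which does the same continuous monitoring (bucket path illustrative):

import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = FileSource
  .forRecordStreamFormat(new TextLineInputFormat(), new Path("gs://bucket/test_dir/"))
  .monitorContinuously(Duration.ofSeconds(30)) // keep scanning for new files
  .build()
// the processed-file bookkeeping only survives if the job is restored from a checkpoint/savepoint
val dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "gcs-files")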
Shivam Bansal
05/15/2023, 8:45 PM

Herat Acharya
05/15/2023, 9:43 PM
Caused by: java.lang.NullPointerException
at java.base/java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1444)
at java.base/java.util.ArrayList$SubList.size(ArrayList.java:1185)
at java.base/java.util.AbstractList.add(AbstractList.java:111)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:241)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:44)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:796)
at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:124)
... 35 more
It seems it's unable to handle an empty or null list? Any thoughts? Also using Flink 1.15.0.
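One hedged reading of the trace: java.util.ArrayList$SubList (the view returned by List.subList()) is something Kryo frequently fails to round-trip, and the NPE in checkForComodification suggests Kryo re-created a SubList without its backing list, rather than a problem with empty lists per se. A workaround sketch, where materialize is a hypothetical helper and not part of any Flink API, is to copy such views into a plain ArrayList before the value is put into state or emitted downstream:

import java.util.{ArrayList => JArrayList, List => JList}

// turn nulls and list views (subList, unmodifiable wrappers, ...) into a concrete ArrayList
def materialize[T](view: JList[T]): JList[T] =
  if (view == null) new JArrayList[T]()
  else new JArrayList[T](view)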
André Casimiro
05/16/2023, 1:51 AM

David Wisecup
05/16/2023, 3:09 AM
The message "Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module"
This error message indicates an issue caused by the reflection restrictions introduced in Java 9 with the Java Platform Module System (JPMS).
For Flink 1.17.0, what's the recommended, most compatible Java version?
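For what it's worth, Flink 1.17 is built and tested against Java 8 and 11, with Java 11 the usual recommendation; the "does not open java.lang to unnamed module" error is typical of running on JDK 16+ where strong encapsulation is enforced. A hedged workaround sketch for flink-conf.yaml (the exact option name depends on the version: env.java.opts or env.java.opts.all):

env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED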
David Wisecup
05/16/2023, 3:10 AM

HJK nomad
05/16/2023, 6:07 AM

HJK nomad
05/16/2023, 6:08 AM
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
my_array = [1, 2, 3, 4, 5]
stream = env.from_collection(collection=my_array)
stream.print()
env.execute()
but the job finished, and the TM log shows these errors:
2023-05-16 05:48:32,411 INFO  org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - 1 Beam Fn Logging clients still connected during shutdown.
2023-05-16 05:48:32,421 WARN  org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint.
2023-05-16 05:48:32,426 WARN  org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint.
2023-05-16 05:48:32,487 WARN  org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Error cleaning up servers urn: "beam:env:process:v1"
payload: "\032H/usr/local/lib/python3.8/dist-packages/pyflink/bin/pyflink-udf-runner.sh\"\225\002\n\004PATH\022\214\002/root/miniconda3/condabin:/usr/li

HJK nomad
05/16/2023, 6:09 AM

Ricco Førgaard
05/16/2023, 8:06 AM
Config option 'state.checkpoint-storage' is ignored because the checkpoint storage passed via StreamExecutionEnvironment takes precedence
However, I don't understand where this gets passed via StreamExecutionEnvironment
. All I have in my code is
val env = StreamExecutionEnvironment.getExecutionEnvironment
and I don't make any changes to it besides setting max parallelism. Any ideas?
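A hedged sketch of what typically triggers that warning: something in the job (possibly a shared bootstrap utility or library) sets checkpoint storage programmatically, which then wins over state.checkpoint-storage from flink-conf.yaml. Searching the code base for a call like this may turn it up (the S3 path is illustrative):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// any programmatic checkpoint-storage setting takes precedence over the flink-conf.yaml option
env.getCheckpointConfig.setCheckpointStorage("s3://my-bucket/checkpoints")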
Monish Bhanushali
05/16/2023, 10:13 AM

El Houssine Talab
05/16/2023, 11:19 AM

Jean-Baptiste PIN
05/16/2023, 11:30 AM

Jean-Baptiste PIN
05/16/2023, 12:44 PM

Jean-Baptiste PIN
05/16/2023, 12:46 PM

Prithvi Dammalapati
05/16/2023, 2:25 PM

Sumit Singh
05/16/2023, 5:29 PM

Adesh Dsilva
05/16/2023, 5:46 PM
String paths = "s3://bucket/path/20231011/,s3://bucket/path/20231012/,s3://bucket/path/20231013/";
tableEnv.createTemporaryTable("MyTable", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", inputPaths)
.format(FormatDescriptor.forFormat("avro").build())
.build());
Each path above has multiple avro files and I cannot use s3://bucket/path because I only want to read a particular range of folders.
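As far as I know the filesystem connector's path option points at a single directory, so one workaround sketch (names illustrative, with tableEnv and schema as in the snippet above) is to register one temporary table per folder and expose their union under a single name:

import org.apache.flink.table.api._

val days = Seq("20231011", "20231012", "20231013")
days.foreach { d =>
  tableEnv.createTemporaryTable(
    s"MyTable_$d",
    TableDescriptor.forConnector("filesystem")
      .schema(schema)
      .option("path", s"s3://bucket/path/$d/")
      .format(FormatDescriptor.forFormat("avro").build())
      .build())
}
// union the per-folder tables and register the result as "MyTable"
val unioned = days.map(d => tableEnv.from(s"MyTable_$d")).reduce((a, b) => a.unionAll(b))
tableEnv.createTemporaryView("MyTable", unioned)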
Prashant Doolabh
05/16/2023, 5:53 PM
PyFlink 1.16.0 or 1.17.0 with a kerberized Hive cluster?
Using the below config in my flink-conf.yaml works only if I perform a kinit with the *.keytab file prior to starting the PyFlink job.
security.kerberos.login.use-ticket-cache: true
Without the kinit, if I use the following configs, I experience an error when the job attempts to connect to the Hive MetaStore:
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /app/home/user/pyflink.keytab
security.kerberos.login.principal: pyflink@DOMAIN.COM
security.kerberos.krb5-conf.path: /etc/krb5.conf
Exception:
Caused by: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed
Using the keytab is the ideal solution considering it is a long-running streaming job and the ticket would expire and need to be renewed. Any suggestions will be highly appreciated. 🙏 #PyFlink #FlinkSQL
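A hedged guess rather than a verified fix: when the keytab should be picked up directly (no external kinit), it may behave better with the ticket cache disabled, so that Flink's Kerberos login relies only on the keytab/principal pair:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /app/home/user/pyflink.keytab
security.kerberos.login.principal: pyflink@DOMAIN.COM
security.kerberos.krb5-conf.path: /etc/krb5.conf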
Sumit Singh
05/16/2023, 7:01 PM

Trystan
05/16/2023, 7:33 PM
fetch call?
the source says:
In either case, this method should be reentrant, meaning that the next fetch call should just resume from where the last fetch call was waken up or interrupted.
but how? What if a split is partially finished (and emitted) but the taskmanager dies? I don't see a way to keep the partially progressed state of a split alive after emitting some records.
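For reference, a hedged sketch of how sources built on SourceReaderBase usually handle this: per-split progress is kept in a mutable split state that the RecordEmitter advances as each record is emitted; that split state is what gets checkpointed, so after a failure the next fetch starts from the recorded position instead of replaying the whole split. MyRecord and MySplitState below are illustrative, not Flink classes:

import org.apache.flink.api.connector.source.SourceOutput
import org.apache.flink.connector.base.source.reader.RecordEmitter

case class MyRecord(offset: Long, payload: String)
class MySplitState(var nextOffset: Long)

class MyRecordEmitter extends RecordEmitter[MyRecord, String, MySplitState] {
  override def emitRecord(rec: MyRecord, out: SourceOutput[String], state: MySplitState): Unit = {
    out.collect(rec.payload)
    // advancing the split state here is what lets a restored reader resume mid-split
    state.nextOffset = rec.offset + 1
  }
}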
Trystan
05/16/2023, 7:34 PM

Trystan
05/16/2023, 7:38 PM

harish reddy
05/16/2023, 9:43 PM

Trevor Burke
05/16/2023, 11:04 PM
We have a custom BucketAssigner so we can allow users to partition data on Kafka event time or Flink processing time. Seeing "The implementation of the BulkFormatBuilder is not serializable. The object probably contains or references non serializable fields."
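One common cause of that error is a BucketAssigner written as an anonymous/inner class that captures a non-serializable enclosing object. A hedged sketch of a self-contained assigner that switches between Kafka event time and processing time (class name and date pattern illustrative):

import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

class EventOrProcessingTimeBucketAssigner[IN](useEventTime: Boolean)
    extends BucketAssigner[IN, String] {

  // DateTimeFormatter is not Serializable, so keep it transient and rebuild lazily
  @transient private lazy val fmt =
    DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC)

  override def getBucketId(element: IN, ctx: BucketAssigner.Context): String = {
    val ts =
      if (useEventTime && ctx.timestamp() != null) ctx.timestamp().longValue()
      else ctx.currentProcessingTime()
    fmt.format(Instant.ofEpochMilli(ts))
  }

  override def getSerializer(): SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}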
James Timotiwu
05/17/2023, 12:13 AM

Dongwoo Kim
05/17/2023, 1:28 AM

Dheeraj Panangat
05/17/2023, 5:29 AM