Duc Anh Khu
08/15/2022, 10:10 PM
flink:1.13.2-scala_2.12-java11
cluster. Thank you.
env.add_source(kafka_consumer).print()
This works fine until I add a .map
such as:
env.add_source(kafka_consumer).map(lambda a: a).print()
The error I'm getting is:
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
at java.base/java.lang.ProcessBuilder.start(Unknown Source)
at java.base/java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:193)
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:154)
at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:177)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:353)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:261)
at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)
at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:126)
at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:555)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:264)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:121)
at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.open(OneInputPythonFunctionOperator.java:108)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: error=2, No such file or directory
at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
at java.base/java.lang.ProcessImpl.<init>(Unknown Source)
at java.base/java.lang.ProcessImpl.start(Unknown Source)
... 23 more
The command that I'm running is:
flink run --python apps/app_name.py --jobmanager localhost:28081
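For what it's worth: "Cannot run program "python"" from ProcessPythonEnvironmentManager usually means the TaskManager process cannot find a python executable on its PATH. If the workers do have an interpreter installed under some other path, the flink CLI can point PyFlink at it with -pyexec (the interpreter path below is only an assumption); if the stock flink image has no Python or apache-flink installed at all, that has to be added to the image first.
flink run --python apps/app_name.py --jobmanager localhost:28081 -pyexec /usr/local/bin/python3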
Donatien Schmitz
08/16/2022, 8:12 AM
Bumblebee
08/16/2022, 12:53 PM
at io.javaoperatorsdk.operator.Operator.start(Operator.java:100)
at org.apache.flink.kubernetes.operator.FlinkOperator.run(FlinkOperator.java:182)
at org.apache.flink.kubernetes.operator.FlinkOperator.main(FlinkOperator.java:187)
2022-08-16 12:50:22,257 i.j.o.Operator [INFO ] Operator SDK 3.0.3 is shutting down...
2022-08-16 12:50:22,258 i.j.o.p.e.s.i.InformerManager [INFO ] Stopping informer io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager@507b79f7 -> io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper@64a9d48c
2022-08-16 12:50:22,258 i.j.o.p.e.s.i.InformerManager [INFO ] Stopping informer io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager@1616022c -> io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper@77a171b6
Exception in thread "main" io.javaoperatorsdk.operator.MissingCRDException: 'flinkdeployments.flink.apache.org' v1 CRD was not found on the cluster, controller 'flinkdeploymentcontroller' cannot be registered
at io.javaoperatorsdk.operator.processing.Controller.throwMissingCRDException(Controller.java:337)
at io.javaoperatorsdk.operator.processing.Controller.start(Controller.java:309)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
at io.javaoperatorsdk.operator.Operator$ControllerManager.start(Operator.java:219)
at io.javaoperatorsdk.operator.Operator.start(Operator.java:100)
at org.apache.flink.kubernetes.operator.FlinkOperator.run(FlinkOperator.java:182)
at org.apache.flink.kubernetes.operator.FlinkOperator.main(FlinkOperator.java:187)
Hunter Medney
08/16/2022, 1:24 PM
Pedro Cunha
08/16/2022, 4:32 PM
s3-fs-presto
copied to the plugins folder in order to read paths that have s3 on them.
2. After adding the plugin, I was expecting Flink to read the savepoint with no problem, but to my surprise, instead of using the data that's in the savepoint I downloaded, it still tries to connect to AWS… I don't want to do this; I want to be able to read the savepoint locally.
Is there any way to work around point nr. 2 other than configuring the access to AWS?
karthik
08/16/2022, 4:38 PM
iLem0n
08/16/2022, 9:41 PM
RichCoProcessFunction. Is there anything like that?
Krish Narukulla
08/16/2022, 10:40 PM
avro message format dynamically, without specifying a schema class? Something like below.
CREATE TEMPORARY TABLE XXX (
`total_time_playing` BIGINT,
`device_id` STRING,
`channel_id` STRING,
`occurrence` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'S000011',
'properties.bootstrap.servers' = '<brokers>',
'properties.group.id' = 'flinkapp-test',
'scan.startup.mode' = 'latest-offset',
'format' = 'avro'
)
Z Mario
08/17/2022, 12:57 AM
Hilmi Al Fatih
08/17/2022, 7:09 AM
// TODO: Handle removed partitions.
When I check the master branch, it seems that the TODO is still there, so I am wondering whether there is any plan to resolve this soon. Or is it already implemented?
Tiansu Yu
08/17/2022, 12:04 PM
mvn compile exec:java -D...
I noticed that a naive standalone cluster started this way does not pick up src/main/resources/log4j.properties, which basically swallows all my `logger.info()` calls and whatnot. Is there any way I can configure logging for a cluster started this way?
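One possible workaround, assuming the project uses log4j 2 (Flink's default logging backend in recent releases), is to point the logger at the file explicitly via a system property; exec:java runs inside the Maven JVM, so -D properties on the mvn command line are visible to the job. The main class below is a placeholder:
mvn compile exec:java -Dexec.mainClass=com.example.StreamingJob -Dlog4j.configurationFile=src/main/resources/log4j2.properties
(With log4j 1.x the equivalent system property is -Dlog4j.configuration=file:src/main/resources/log4j.properties.)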
Alexis Josephides (Contractor)
08/17/2022, 1:52 PM
FINISHED
thus stopping checkpoints from working across the application.
The error I see in logs is:
Failed to trigger checkpoint for job ba29c0e38410cd31b3a9870b7d384807 since some tasks of job ba29c0e38410cd31b3a9870b7d384807 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running."
I (maybe rather naively) thought that I could do a rebalance()
after the kafka source is added but the issue persists as it breaks the chaining.
There is a single StreamExecutionEnvironment and so code is something like:
var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(topic1.getKafkaSource(),etc...).rebalance()
env.fromSource(topic2.getKafkaSource(),etc...).rebalance()
env.addSink(s3Sink1)
env.addSink(s3Sink2)
I'm wondering if I can set a parallelism on each of these steps for each operator, or will it be across the whole datastream?
The load on the topic with 40 partitions will not be sustained if the parallelism is set to 4 across the whole stream.
Do hope this makes sense and thanks in advance for any insight.
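For reference, a minimal sketch of per-operator parallelism (the toy source, numbers, and names below are made up): setParallelism() called right after an operator applies only to that operator, while env.setParallelism() sets the job-wide default, so an operator backed by 40 Kafka partitions can run at 40 while the rest of the pipeline stays at 4.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerOperatorParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default for every operator that does not override it

        // toy source standing in for env.fromSource(topic1.getKafkaSource(), ...)
        DataStream<String> topic1 = env.fromElements("a", "b", "c");

        topic1
            .rebalance()
            .map((String s) -> s.toUpperCase()).returns(Types.STRING)
            .setParallelism(40)   // applies only to this map operator
            .print()
            .setParallelism(4);   // applies only to the print sink

        env.execute("per-operator parallelism sketch");
    }
}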
David Wisecup
08/17/2022, 2:24 PM
GROUP BY CUBE (...)
Haven't figured out how to do that so far, so I had to write the SQL as a string and then execute it via Table rows = tableEnv.sqlQuery(sqlStr);
Thanks for any help.
Aeden Jameson
08/17/2022, 3:51 PM
Krish Narukulla
08/17/2022, 6:10 PM
CREATE TEMPORARY TABLE XXX (
`total_time_playing` BIGINT,
`device_id` STRING,
`channel_id` STRING,
`occurrence` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'S000011',
'properties.bootstrap.servers' = '<brokers>',
'properties.group.id' = 'flinkapp-test',
'scan.startup.mode' = 'latest-offset',
'format' = 'avro'
)
David Christle
08/18/2022, 1:07 AM
Sandeep Kathula
08/18/2022, 1:08 AM
KafkaSource.<ConsumerRecord>builder()
.setProperties(consumerProperties)
.setStartingOffsets(offsetStrategy(readConfig))
.setTopics(topic)
.setDeserializer(
new KafkaRecordDeserializationSchema<ConsumerRecord>() {
@Override
public TypeInformation<ConsumerRecord> getProducedType() {
return TypeInformation.of(ConsumerRecord.class);
}
@Override
public void open(DeserializationSchema.InitializationContext context)
throws Exception {}
@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> consumerRecord,
Collector<ConsumerRecord> collector)
throws IOException {
collector.collect(consumerRecord);
}
})
.build();
}
but because of the Headers within the consumer record, it's using Kryo as the serializer and I am seeing very low throughput. If I drop the headers during deserialization, I see almost 9X the throughput. Can someone help me understand how to deserialize the headers efficiently without using Kryo?
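Not a definitive answer, but a minimal sketch of one approach (class and field names below are made up): instead of emitting Kafka's ConsumerRecord, which Flink cannot analyse and therefore serializes with Kryo, emit a flat POJO that copies the key, value, and headers into plain arrays so Flink's POJO serializer handles it natively.
import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class RawRecordDeserializer implements KafkaRecordDeserializationSchema<RawRecordDeserializer.RawRecord> {

    // Plain POJO (public no-arg constructor, public fields) so Flink's POJO
    // serializer, not Kryo, is used for the produced type.
    public static class RawRecord {
        public byte[] key;
        public byte[] value;
        public String[] headerKeys;
        public byte[][] headerValues;
        public RawRecord() {}
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RawRecord> out) throws IOException {
        RawRecord r = new RawRecord();
        r.key = record.key();
        r.value = record.value();
        // copy headers into plain arrays instead of keeping Kafka's Headers object
        Header[] headers = record.headers().toArray();
        r.headerKeys = new String[headers.length];
        r.headerValues = new byte[headers.length][];
        for (int i = 0; i < headers.length; i++) {
            r.headerKeys[i] = headers[i].key();
            r.headerValues[i] = headers[i].value();
        }
        out.collect(r);
    }

    @Override
    public TypeInformation<RawRecord> getProducedType() {
        return TypeInformation.of(RawRecord.class);
    }
}
It would then be passed via setDeserializer(new RawRecordDeserializer()) in place of the anonymous class above, with downstream operators working on RawRecord instead of ConsumerRecord.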
Duc Anh Khu
08/18/2022, 8:52 AM
CoFlatMapFunction
with value states, and I am trying to write a unit test for it. However, the open and flat_map methods are never called; only __init__ is being called:
class MyCoFlatMap(CoFlatMapFunction):
def __init__(self):
print("!! __init__ called")
self.user_id_state = None
self.events_state = None
def open(self, runtime_context: RuntimeContext):
print("!! open called")
user_id_stt_desc = ValueStateDescriptor(
"user_id",
Types.PICKLED_BYTE_ARRAY()
)
events_stt_desc = ListStateDescriptor(
"events",
Types.PICKLED_BYTE_ARRAY()
)
self.user_id_state = runtime_context.get_state(user_id_stt_desc)
self.events_state = runtime_context.get_list_state(events_stt_desc)
def flat_map1(self, value):
print("!! flat_map1 called")
yield value
def flat_map2(self, value):
print("!! flat_map2 called")
yield value
def test_co_flat_map_with_states(self):
self.env.add_python_file("../../../apps")
self.env.set_state_backend(MemoryStateBackend())
ds1 = self.env.from_collection(
[signed_in_event],
type_info=self.type_info
)
ds2 = self.env.from_collection([], type_info=self.type_info)
ds1.connect(ds2).key_by(
key_selector,
key_selector,
key_type=Types.STRING()
).flat_map(MyCoFlatMap(), output_type=self.type_info).add_sink(self.test_sink)
self.env.execute()
results = self.test_sink.get_results(False)
expected = [j_signed_in_event]
self.assert_equals_sorted(expected, results)
I'm using the test util from Dian Fu's repo. Any suggestions would be much appreciated. Cheers
Jaya Ananthram
08/18/2022, 11:59 AM
Jirawech Siwawut
08/18/2022, 2:11 PM
Lee Wallen
08/18/2022, 5:02 PM
java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
I found a single entry on Stack Overflow suggesting that the dependencies used when running locally don't match what is running in the person's cluster. I verified that the image I made has the 1.15.0 versions of the Flink libraries, just like the Flink app's dependencies.
Also, I get this exception whether I set the starting offsets or not, so it isn't related to explicitly setting an offset.
One thing to note: I used the Strimzi Kafka operator to spin up a 3.2.0 Kafka cluster with the broker protocol version set to 3.2. I wouldn't expect an issue due to the Kafka version, but figured I would mention it in case there is a Kafka connector version issue.
Here are my current settings:
final KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers("my-cluster-kafka-bootstrap:9092")
.setStartingOffsets(OffsetsInitializer.earliest())
.setTopics("incoming-events")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperties(config.consumer())
.build();
Does anyone recognize this issue, and/or does anyone have any suggestions on where I should look for the source of the issue?
Almark Cao
08/19/2022, 7:03 AM
flink.version
when saving metadata into the Hive metastore (TABLE_PARAMS table). I think it's very useful for compatibility checks and management purposes. In my case, I can collect Hive metadata with the creating-engine info for a metadata management system (e.g. DataHub). I see Spark and Trino have constant properties (presto_version and spark.sql.create.version) to identify the creating engine, but Flink does not.
Sumit Nekar
08/19/2022, 7:52 AM
Mustafa Akur
08/19/2022, 8:35 AM
Nithin kharvi
08/19/2022, 11:22 AM
Roman Bohdan
08/19/2022, 12:15 PM
state.backend.rocksdb.localdir
If I set this property, will it save the state from RocksDB to that directory, and will the jobmanager pick it up after starting/restarting?
Aeden Jameson
08/19/2022, 7:16 PM
chunilal kukreja
08/20/2022, 1:07 PM
// env.enableCheckpointing(1000);
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// env.getCheckpointConfig().setMinPauseBetweenCheckpoints(700);
//
// env.setStateBackend(new EmbeddedRocksDBStateBackend());
// env.getCheckpointConfig().setCheckpointStorage("file:///Users/chunikukreja/checkpoint_storage");
checkpointing with rocksdb get enabled..
Any pointer will help me understand if I am missing something.
Shen Zhu
08/21/2022, 5:25 AM
CREATE TABLE SourceKafkaTable (
userId STRING,
sessionId STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test_topic',
'properties.group.id' = 'test_group_id'
)
And later apply the following queries:
val table1 = tableEnv.sqlQuery("SELECT COUNT(*) FROM SourceKafkaTable GROUP BY userId")
val table2 = tableEnv.sqlQuery("SELECT COUNT(*) FROM SourceKafkaTable GROUP BY sessionId")
Then do both table1 and table2 get all the data from SourceKafkaTable? Or does each table only get a part of it?
Thanks for your help!
Leo Xiong
08/22/2022, 8:23 AM