Erik Wickstrom
07/14/2022, 11:05 PM
Collecting pemja==0.1.4
Downloading pemja-0.1.4.tar.gz (32 kB)
ERROR: Command errored out with exit status 255:
command: /usr/local/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-6banf8rj/pemja/setup.py'"'"'; __file__='"'"'/tmp/pip-install-6banf8rj/pemja/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-0n4kuxu1
cwd: /tmp/pip-install-6banf8rj/pemja/
Complete output (1 lines):
Include folder should be at '/usr/local/openjdk-11/include' but doesn't exist. Please check you've installed the JDK properly.
----------------------------------------
ERROR: Command errored out with exit status 255: python setup.py egg_info Check the logs for full command output.
Hilmi Al Fatih
07/15/2022, 2:27 AM
• flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_consumed_rate
(flink metrics)
• flink_taskmanager_job_task_operator_KafkaProducer_record_send_rate
(flink metrics)
• irate(kafka_server_brokertopicmetrics_messagesin_topic_total{topic=~"source-topic"}[5m])
(kafka broker metrics)
• irate(kafka_server_brokertopicmetrics_messagesin_topic_total{topic=~"sink-topic"}[5m])
(kafka broker metrics)
With AT_LEAST_ONCE, the metrics seem to work well, as follows.
• 1 pic (flink metrics)
• 2 pic (kafka broker metrics - left: total, right: rate)
With EXACTLY_ONCE, the producer metrics are halved.
• 3 pic (flink metrics)
• 4 pic (kafka broker metrics - left: total, right: rate)
For exactly-once, I was afraid there might be some possibility of data loss, but no data loss seems to happen (pic 5, left: source topic, right: sink topic). So I am wondering where I went wrong.
Fred Wu
07/15/2022, 3:09 AM
tega
07/15/2022, 7:57 AM
Roman Bohdan
07/15/2022, 9:56 AM
Prasanth Kothuri
07/15/2022, 10:03 AM
import org.apache.logging.log4j.scala.Logging
logger.debug("aggregations - start")
However, when I run this on a Kubernetes Flink deployment, the task manager pod logs don't show these messages.
Any ideas?
Roman Bohdan
07/15/2022, 10:24 AM
Liang
07/15/2022, 5:17 PM
Checkpoints have a few differences from savepoints. They
• use a state backend specific (low-level) data format, may be incremental.
• do not support Flink specific features like rescaling.
However, in the 1.15 and later docs (Checkpoints vs. Savepoints), the table shows rescaling as supported by both checkpoints and savepoints. Can someone please clarify which statement is correct? Or does that mean that from 1.15 onward checkpoints support rescaling? Thanks!
Mehul Batra
07/16/2022, 6:31 PM
Echo Lee
07/18/2022, 12:27 AM
// new source api
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("benchmark")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });
// old source api
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>("benchmark",
        new SimpleStringSchema(), properties);
flinkKafkaConsumer.setStartFromEarliest();
env.addSource(flinkKafkaConsumer)
        .map(new MapFunction<String, Object>() {
            private int count = 0;
            private long lastTime = System.currentTimeMillis();

            @Override
            public Object map(String value) throws Exception {
                count++;
                if (count % 100000 == 0) {
                    long currentTime = System.currentTimeMillis();
                    System.out.println(currentTime - lastTime);
                    lastTime = currentTime;
                }
                return null;
            }
        });
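A minimal driver sketch for running either snippet above (assuming a locally obtained execution environment; the parallelism and job name are arbitrary choices, not part of the original benchmark):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // a single reader keeps the two measurements comparable
// ... build one of the two pipelines above on env ...
env.execute("kafka-source-benchmark");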
haim ari
07/18/2022, 9:23 AM
spec:
  image: flink:1.14.3
  flinkVersion: v1_14
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
However, no ingress is created; there are also no events in the namespace and no errors in the operator logs.
I tried removing both the operator and the Flink app and recreating them, but the ingress is still not created.
Can someone advise on this?
Felix Angell
07/18/2022, 9:31 AM
FlinkKafkaConsumer. This is odd to me since, afaik, we cannot use SASL_SCRAM authentication to connect to our Kafka cluster with such an old version of the Kafka client API.
Raw link: https://nightlies.apache.org/flink/flink-docs-release-1.13/api/python/_modules/pyflink/datastream/connectors.html
salvalcantara
07/18/2022, 11:12 AM
If you want to develop a connector that needs to bridge with DataStream APIs (i.e. if you want to adapt a DataStream connector to the Table API), you need to add this dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.16-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
Then, the docs don't mention this library anymore or provide any further examples. Can someone point me to actual examples that use flink-table-api-java-bridge for bridging an existing DataStream API based connector for SQL purposes?
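Not the connector-development answer, but a minimal sketch of the basic bridging that flink-table-api-java-bridge enables, i.e. moving between DataStream and Table API in the same job (the stand-in stream, view name, and query are made up for illustration; implementing a full SQL connector via DynamicTableSource is a separate topic):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Stand-in for a stream produced by any existing DataStream connector.
DataStream<String> lines = env.fromElements("a", "bb", "");

// DataStream -> Table: register the stream so SQL can query it (an atomic String maps to column f0)...
tableEnv.createTemporaryView("lines", lines);
Table nonEmpty = tableEnv.sqlQuery("SELECT * FROM lines WHERE CHAR_LENGTH(f0) > 0");

// ...and Table -> DataStream, to hand results back to DataStream operators or sinks.
DataStream<Row> rows = tableEnv.toDataStream(nonEmpty);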
salvalcantara
07/18/2022, 11:15 AM
Source<A> (Sink<A>), what would be the simplest way of obtaining a Source<B> (Sink<B>) based on a mapping/conversion function f:A->B (f:B->A)?
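For what it's worth, a sketch of the DataStream-level workaround often used instead of wrapping the Source/Sink itself: apply the conversion as a map right after the source (or right before the sink). Here sourceA, sinkA, f and g are placeholders, not existing classes:

// Sketch: convert at the DataStream level rather than adapting the connector.
// sourceA produces A; sinkA consumes A; f : A -> B and g : B -> A are plain functions.
DataStream<B> bStream = env
        .fromSource(sourceA, WatermarkStrategy.noWatermarks(), "source-a")
        .map(a -> f.apply(a));   // read as A, immediately convert to B

bStream
        .map(b -> g.apply(b))    // convert back to A just before writing
        .sinkTo(sinkA);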
Bastien DINE
07/18/2022, 1:48 PM
TASK switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka test-1@-1 with FlinkKafkaInternalProducer{transactionalId='test-=false} because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.
To avoid data loss, the application will restart.
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
Our broker version is pretty old (1.1.1), yet KAFKA-9310 lists 2.4.0 as the affected version and 2.4.1 as the fix version.
But this was working well with the old producer API.
Is there a workaround for this bug? Sadly, I cannot upgrade my Kafka broker right now.
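One possible direction, sketched under the assumption that end-to-end transactions can be relaxed while the broker stays on 1.1.1: configure the new KafkaSink with AT_LEAST_ONCE, which does not use the transactional producer path this error comes from (at the cost of possible duplicates after a failure). The broker address and topic below are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch: non-transactional sink as a fallback while the broker cannot be upgraded.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)                        // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("test")                            // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // no Kafka transactions involved
        .build();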
jiangchao qian
07/19/2022, 4:12 AM
Haim Ari
07/19/2022, 7:00 AM
Márk Bartos
07/19/2022, 8:30 AM
Prasanth Kothuri
07/19/2022, 10:30 AM
Zhiqiang Du
07/19/2022, 11:50 AM
CustomProcessAllWindowFunction extracts the first and last element of the window into a Tuple2.
CountEvictor retains the last element of the current window after the window function has run.
Expected Result: (1,4) (4,8) (8,10)
Actual Result: (1,4) (5,8) (9,10)
env.addSource(new MockSource())
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
    .evictor(CountEvictor.of(1, true))
    .process(new CustomProcessAllWindowFunction())
    .addSink(new PrintSinkFunction<Tuple2<Long, Long>>());
env.execute();
So did I misunderstand the function of the Evictor? How can I get the expected output?
And if I override the default trigger of the tumbling window, I get the expected output.
env.addSource(new MockSource())
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
    .trigger(CountTrigger.of(5))
    .evictor(CountEvictor.of(1, true))
    .process(new CustomProcessAllWindowFunction())
    .addSink(new PrintSinkFunction<Tuple2<Long, Long>>());
env.execute();
// output:
Emit: 1
Emit: 2
Emit: 3
Emit: 4
Emit: 5
(1,5)
Emit: 6
Emit: 7
Emit: 8
Emit: 9
Emit: 10
(5,10)
Slackbot
07/19/2022, 11:55 AM
Felix Angell
07/19/2022, 2:44 PM
Python module with the program entry point. This option must be used in conjunction with --pyFiles.
Is this related to loading a Python module directly as a program vs. just specifying the files of a PyFlink program (using Python modules) via --pyFiles?
Beny Chernyak
07/19/2022, 3:04 PM
https://github.com/sjwiesman/flink/blob/83a6400e2587b067d08a64bc7e10edd4b57e71b4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/consistent/EventuallyConsistentBucketingSink.java
to something new, so I use the FileSink:
return FileSink.forRowFormat(
        new Path(outputBasePath), new Encoder<T>() {
            @Override
            public void encode(T record, OutputStream stream) throws IOException {
                GzipParameters params = new GzipParameters();
                params.setCompressionLevel(Deflater.BEST_COMPRESSION);
                GzipCompressorOutputStream out = new GzipCompressorOutputStream(stream, params);
                OBJECT_MAPPER.writeValue(out, record);
                out.finish();
            }
        })
        .withBucketAssigner(new BasePathBucketAssigner<>())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();
where outputBasePath is some s3:// URI.
I got this error:
Exception in thread "main" java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:215)
at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475)
at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466)
at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
at org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo.toString(CommittableMessageTypeInfo.java:120)
Could someone please explain what I am doing wrong?
Jérôme Viveret
07/19/2022, 4:51 PM
Xinbin Huang
07/19/2022, 6:06 PM
Bhupendra Yadav
07/20/2022, 8:01 AM
George Chen
07/21/2022, 12:00 AM
public class KinesisEMFJsonDeserializationSchema extends AbstractDeserializationSchema<...> {

    private final ObjectMapper objectMapper;
    ....

    @Override
    public RootNode deserialize(final byte[] input) {
        try {
            return objectMapper.readValue(input, RootNode.class);
        } catch (final IOException e) {
            ...
        }
    }
}
Flink complains that this DeserializationSchema is not serializable, I guess because the ObjectMapper is not serializable?
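A common pattern for this kind of "not serializable" failure, sketched against the class above (RootNode is the poster's own type and the exact root cause isn't confirmed): keep the mapper out of the serialized object by marking it transient and creating it in open(), which runs on the task managers after the schema has been deserialized:

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class KinesisEMFJsonDeserializationSchema extends AbstractDeserializationSchema<RootNode> {

    // transient: not part of the serialized closure shipped to the cluster
    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        objectMapper = new ObjectMapper(); // created on the task manager instead
    }

    @Override
    public RootNode deserialize(final byte[] input) throws IOException {
        return objectMapper.readValue(input, RootNode.class);
    }
}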
07/21/2022, 12:16 AMCaused by: java.lang.NoClassDefFoundError: org/apache/avro/generic/GenericData$Array
at org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(AvroKryoSerializerUtils.java:68)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:591)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(KryoSerializer.java:138)
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:91)
Is there anything I might be missing in the flinkShadowJar dependencies?
jiangchao qian
07/21/2022, 2:02 AM
Jérôme Viveret
07/21/2022, 3:04 PM