Ranjeet Ranjan
06/07/2022, 1:02 PM
Jeesmon Jacob
06/07/2022, 1:09 PM
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - endpoints
  - persistentvolumeclaims
  - events
  - configmaps
  - secrets
  - nodes
  verbs:
  - '*'
Where do we use the nodes resource in the operator? Is that permission really needed? Or can we reduce the verbs for nodes to just get/list/watch?
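For reference, a narrowed rule along those lines might look like the sketch below (untested, and only if the operator needs node access at all):

- apiGroups:
  - ""
  resources:
  - nodes
  verbs:
  - get
  - list
  - watch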
Gyula Fóra
06/07/2022, 1:16 PM
Veeramani Moorthy
06/07/2022, 1:22 PM
Jeesmon Jacob
06/07/2022, 7:32 PM
Xinbin Huang
06/07/2022, 9:10 PM
RecordEmitter. Do you have some tips on optimizing latency?
• Would moving the deserialization to SplitReader.fetch be better?
• The p99 time from when the source receives an event to emitting it is roughly the polling timeout (similar to kafka/pulsar) set in SplitReader.fetch, +1s. Are there any tips on further reducing the fixed portion (i.e. the +1s)?
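A rough sketch of the split described in the first bullet, with placeholder DecodedEvent/MySplitState types (none of this is from the connector in question): if SplitReader.fetch already returns decoded events, the RecordEmitter running on the task thread only has to forward them.

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

// Placeholder record and split-state types for the sketch.
class DecodedEvent {
    long offset;
    long eventTimestamp;
    Object payload;
}

class MySplitState {
    long nextOffset;
}

public class PassThroughRecordEmitter implements RecordEmitter<DecodedEvent, DecodedEvent, MySplitState> {
    @Override
    public void emitRecord(DecodedEvent element, SourceOutput<DecodedEvent> output, MySplitState splitState) {
        // Deserialization already happened on the fetcher thread inside SplitReader.fetch,
        // so emitting is just a collect plus a split-state update.
        output.collect(element, element.eventTimestamp);
        splitState.nextOffset = element.offset + 1;
    }
}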
Zain Haider Nemati
06/08/2022, 12:20 PM
Jeesmon Jacob
06/08/2022, 1:27 PM
- apiGroups:
  - flink-operator
  resources:
  - "*"
  verbs:
  - "*"
https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/rbac.yaml#L24-L29
But it doesn't look like we have CRDs with that group, so I wanted to check whether it is a carry-over from an old version.
Eric
06/09/2022, 12:29 AM
Eric
06/09/2022, 12:31 AM
Ildar Almakaev
06/09/2022, 8:40 AM
John Gorman
06/09/2022, 9:01 AM
Klaas Schmidt
06/09/2022, 11:04 AM
Emily Morgan
06/09/2022, 11:22 AM
Veeramani Moorthy
06/09/2022, 1:00 PM
Hunter Medney
06/09/2022, 4:08 PM
streamA and streamB
• we have a Flink job that maintains a running aggregate of streamA called aggA, which reduces streamA to a set of key/value pairs.
• we need to enrich streamB with the latest aggregate values in aggA
• additional jobs will need to perform similar enrichments on streams using aggA key/values
My initial thought would be to store the constantly-updating keyed data from aggA in Redis so that all the enrichment jobs can simply retrieve the latest aggregated value for aggA as they process each record in streams like streamB. My high-level understanding of Redis is that it propagates the latest key values in-memory to all Redis clients, so each Flink operation in each slot would always be working with the latest aggA values from Redis.
Does this make sense? I haven't come across this kind of implementation yet associated with Flink, so I have a feeling I'm missing something 🙂
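A minimal sketch of the per-record lookup described above, assuming a Jedis client and hypothetical StreamBRecord/Enriched types (the key prefix is also made up); each enrichment job would run something like this against the keys the aggA job writes:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

// Placeholder input/output types for the sketch.
class StreamBRecord {
    String key;
    String payload;
}

class Enriched {
    StreamBRecord record;
    String aggValue;

    Enriched(StreamBRecord record, String aggValue) {
        this.record = record;
        this.aggValue = aggValue;
    }
}

public class AggAEnricher extends RichMapFunction<StreamBRecord, Enriched> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        // One client per parallel subtask; host/port are placeholders.
        jedis = new Jedis("redis-host", 6379);
    }

    @Override
    public Enriched map(StreamBRecord record) {
        // Every streamB record reads the latest aggA value for its key at processing time.
        String latestAgg = jedis.get("aggA:" + record.key);
        return new Enriched(record, latestAgg);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}

Broadcast state (or keyed state, if the aggregation and the enrichment lived in one job) is the usual in-Flink alternative; the Redis route mainly buys sharing across separate jobs at the cost of an external lookup per record.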
Sahil Aulakh
06/09/2022, 9:46 PM
Sucheth Shivakumar
06/10/2022, 5:02 AM
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
Sucheth Shivakumar
06/10/2022, 5:02 AM
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, DeserializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
import scala.collection.JavaConversions.mapAsJavaMap

class KafkaMessageDeserializationSchema(topic: String, schemaUrl: String) extends AbstractDeserializationSchema[GenericRecord] {

  // The Confluent deserializer is not serializable, so it is created lazily on the task side.
  @transient private var inner: KafkaAvroDeserializer = _

  override def deserialize(message: Array[Byte]): GenericRecord = {
    checkInitialized()
    inner.deserialize(topic, message).asInstanceOf[GenericRecord]
  }

  override def open(context: DeserializationSchema.InitializationContext): Unit = super.open(context)

  override def isEndOfStream(nextElement: GenericRecord): Boolean = false

  // GenericRecord is not a POJO, so this produced type falls back to a Kryo-backed generic type.
  override def getProducedType: TypeInformation[GenericRecord] = TypeExtractor.getForClass(classOf[GenericRecord])

  private def checkInitialized(): Unit = {
    if (inner == null) {
      val props = Map(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaUrl,
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
      )
      val client: SchemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
      inner = new KafkaAvroDeserializer(client, mapAsJavaMap(props))
    }
  }
}
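The trace above is what GenericRecord looks like when it falls back to Kryo: TypeExtractor.getForClass(classOf[GenericRecord]) yields a generic type, and Kryo then chokes on the Avro Schema objects inside the record. A minimal sketch of the usual workaround, assuming the reader schema is available (which the later messages suggest may not be the case here):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class AvroTypeInfoSketch {
    // Returning this from getProducedType makes Flink serialize GenericRecord with Avro
    // instead of Kryo, which avoids the UnsupportedOperationException above.
    public static TypeInformation<GenericRecord> producedType(Schema readerSchema) {
        return new GenericRecordAvroTypeInfo(readerSchema);
    }
}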
Sucheth Shivakumar
06/10/2022, 5:02 AM
Uday Sharma
06/10/2022, 2:18 PM
pip install apache-flink. Getting below errors related to pemja, JDK, VS.
pyexceptions.c
C:\Software\jdk-11.0.15\include\jni.h(45): fatal error C1083: Cannot open include file: 'jni_md.h': No such file or directory
error: command 'C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\BuildTools\\VC\\Tools\\MSVC\\14.16.27023\\bin\\HostX86\\x64\\cl.exe' failed with exit status 2
[end of output]
note: This error originates from a subprocess, and is likely not a problem with pip.
error: legacy-install-failure
× Encountered error while trying to install package.
╰─> pemja
note: This is an issue with the package mentioned above, not pip.
hint: See above for output from the failure.
Ildar Almakaev
06/10/2022, 8:57 PM
DataStream<User> userStream and DataStream<Log> logsStream. Both streams are repartitioned by the userId column using keyBy().
Finally, I just need to enrich log events with user data.
AFAIK, I could do that using connect and process:
DataStream<EnrichedLog> enrichedStream = logsStream
    .connect(userStream)
    .process(new ProcessingTimeJoin());
The ProcessingTimeJoin is the CoProcessFunction<Log, User, EnrichedLog> with a state ValueState<User> userState.
Please take a look at the code snippet:
import java.util.Objects;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessingTimeJoin extends CoProcessFunction<Log, User, EnrichedLog> {

    private static final Logger log = LoggerFactory.getLogger(ProcessingTimeJoin.class);

    // Stores the latest reference (user) data per key.
    private ValueState<User> userState;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<User> cDescriptor =
                new ValueStateDescriptor<>("data", TypeInformation.of(User.class));
        userState = getRuntimeContext().getState(cDescriptor);
    }

    @Override
    public void processElement1(Log logEvent, Context context, Collector<EnrichedLog> out) throws Exception {
        User user = userState.value();
        if (Objects.nonNull(user)) {
            EnrichedLog enrichedLog = Mapper.mapToEnrichedLog(logEvent, user);
            out.collect(enrichedLog);
        } else {
            log.warn("Can not enrich {}", logEvent);
        }
    }

    @Override
    public void processElement2(User user, Context context, Collector<EnrichedLog> collector) throws Exception {
        userState.update(user);
    }
}
Question :)
AFAICS, there might be cases in the processElement1 function where the state store does not yet have the user data.
Is there any way to fill the state before enriching (joining) the streams? I mean, consume all available user data from the users Kafka topic, prepare the state with it, and only after that start consuming log events and enriching them?
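Not from the thread, but one common workaround for that gap is sketched below: keep the join as is, and additionally buffer log events that arrive before their user record in ListState, flushing the buffer when the user shows up (Log, User, EnrichedLog and Mapper are the types from the snippet above):

import java.util.Objects;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingJoin extends CoProcessFunction<Log, User, EnrichedLog> {

    private ValueState<User> userState;
    private ListState<Log> pendingLogs;

    @Override
    public void open(Configuration config) {
        userState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("user", TypeInformation.of(User.class)));
        pendingLogs = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-logs", TypeInformation.of(Log.class)));
    }

    @Override
    public void processElement1(Log logEvent, Context ctx, Collector<EnrichedLog> out) throws Exception {
        User user = userState.value();
        if (Objects.nonNull(user)) {
            out.collect(Mapper.mapToEnrichedLog(logEvent, user));
        } else {
            // No user for this key yet: park the log event instead of dropping it.
            pendingLogs.add(logEvent);
        }
    }

    @Override
    public void processElement2(User user, Context ctx, Collector<EnrichedLog> out) throws Exception {
        userState.update(user);
        // Replay anything that was waiting for this user.
        for (Log buffered : pendingLogs.get()) {
            out.collect(Mapper.mapToEnrichedLog(buffered, user));
        }
        pendingLogs.clear();
    }
}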
Sucheth Shivakumar
06/10/2022, 9:34 PM
I see ConfluentRegistryAvroSerializationSchema.forGeneric() expects a schema to be provided. I do not want to store the schema; I just want to use the same schema that was read from the source Kafka topic, which is already present as part of the GenericRecord.
Veeramani Moorthy
06/11/2022, 5:35 AM
Sucheth Shivakumar
06/11/2022, 6:09 AM
Sucheth Shivakumar
06/11/2022, 4:58 PM
How to implement Flink Kafka streaming with an Avro-serialized topic that uses TopicRecordNameStrategy (the same topic can contain multiple event types)?
I have to implement Flink Kafka streaming where a Kafka topic can have multiple event types with TopicRecordNameStrategy. I want to set up generic record streaming; since a single topic can have multiple schemas, how do I implement this case in Flink streaming?
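A sketch of one way to read such a topic, with a made-up class name and the KafkaSource-style deserializer interface: the Confluent wire format embeds the schema id in every record, so a single KafkaAvroDeserializer can decode all event types regardless of the subject naming strategy; the awkward part is the produced TypeInformation, as noted in the comment.

import java.io.IOException;
import java.util.Collections;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class MultiTypeAvroDeserializer implements KafkaRecordDeserializationSchema<GenericRecord> {

    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer inner;

    public MultiTypeAvroDeserializer(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<GenericRecord> out) throws IOException {
        if (inner == null) {
            // The writer schema is resolved from the id embedded in each message, so
            // TopicRecordNameStrategy vs TopicNameStrategy makes no difference when reading.
            inner = new KafkaAvroDeserializer();
            inner.configure(Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false);
        }
        out.collect((GenericRecord) inner.deserialize(record.topic(), record.value()));
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Caveat: with multiple schemas per topic there is no single reader schema to hand to
        // GenericRecordAvroTypeInfo, so this generic type information falls back to Kryo.
        return TypeInformation.of(GenericRecord.class);
    }
}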
Bharath
06/12/2022, 1:01 AM
Jaya Ananthram
06/12/2022, 4:01 PM
Jeff Levesque
06/13/2022, 1:20 AM
sliding_window.py (i.e. previous example) to the next level. I successfully tested the following modification (which also required some adjustments to create_print_table()):
input_table = table_env.from_path(input_table_name)
sliding_window_table = (
    input_table.window(
        Slide.over(sliding_window_over)
        .every(sliding_window_every)
        .on(sliding_window_on)
        .alias(sliding_window_alias)
    )
    .group_by('ticker, {}'.format(sliding_window_alias))
    .select('ticker, MIN(price) as min_price, MAX(price) as max_price, {0}.start as utc_start, {0}.end as utc_end'.format(
        sliding_window_alias
    ))
)
Since the above worked, I tried to slightly modify as follows:
input_table = table_env.from_path(input_table_name)
sliding_window_table = (
    input_table.window(
        Slide.over(sliding_window_over)
        .every(sliding_window_every)
        .on(sliding_window_on)
        .alias(sliding_window_alias)
    )
    .group_by('ticker, {}'.format(sliding_window_alias))
    .select('FROM_UNIXTIME(28*60 * (UNIX_TIMESTAMP({0}.end) / (28*60))), ticker, MIN(price) as min_price, MAX(price) as max_price, {0}.start as utc_start, {0}.end as utc_end'.format(
        sliding_window_alias
    ))
)
However, when I run the above, I get the following error:
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o75.select.
: org.apache.flink.table.api.ValidationException: Undefined function: FROM_UNIXTIME
at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:53)
at java.base/java.util.Optional.orElseThrow(Optional.java:408)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:35)
at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
at org.apache.flink.table.api.internal.TableImpl.lambda$preprocessExpressions$0(TableImpl.java:605)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.flink.table.api.internal.TableImpl.preprocessExpressions(TableImpl.java:606)
at org.apache.flink.table.api.internal.TableImpl.access$300(TableImpl.java:66)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:775)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:770)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
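FROM_UNIXTIME is a SQL built-in that the Table API expression strings do not resolve, hence the ValidationException. One possible workaround is to register the table and run the windowed aggregation through SQL, where FROM_UNIXTIME and UNIX_TIMESTAMP are available. A sketch only: the event-time column name and the HOP intervals below are placeholders for sliding_window_on / sliding_window_every / sliding_window_over, and the 28*60 bucketing arithmetic is carried over from the select above.

# Register the source table under a temporary name and express the sliding window in SQL.
table_env.create_temporary_view('stock_source', input_table)

sliding_window_table = table_env.sql_query("""
    SELECT
      FROM_UNIXTIME(28 * 60 * (UNIX_TIMESTAMP(CAST(HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS STRING)) / (28 * 60))) AS bucket_start,
      ticker,
      MIN(price) AS min_price,
      MAX(price) AS max_price,
      HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS utc_start,
      HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS utc_end
    FROM stock_source
    GROUP BY ticker, HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
""")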
Uday Sharma
06/13/2022, 8:11 AM