raghav tandon
04/23/2023, 7:18 AM
DataStream[Event] -> Window -> ProcessWindowFunction -> DataStream[Event2] -> Kafka Sink
out.collect()

val userIds = events
      .keyBy(e => e.client)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .trigger(
        PurgingTrigger.of(ProcessingTimeTrigger.create())
      ) 
      .process(userIdFetcher)
      .setParallelism(1)
      .uid(userIdFetcher.getClass().getSimpleName())

final class UserIdFetcher(config: UserIdFetcherConfig, failedOutputTag: OutputTag[ReadErrorEvent])
  extends ProcessWindowFunction[WriteSuccessEvent, Diff, String, TimeWindow]
    with Serializable {

  // The process(...) method header was cut off in the original snippet;
  // restored here so the body compiles.
  override def process(
      key: String,
      context: Context,
      elements: Iterable[WriteSuccessEvent],
      out: Collector[Diff]
  ): Unit = {
    elements.groupBy[String](update => update.command.id.get).foreach {
      case (id, updates) =>
       
        retryableProcess(
          id,
          context,
          MongoBulkQuery(
            key,
            "User",
            updates.map(update => update.command.body.identifiers).toList,
            List(UserFields.Id)
          ),
          out
        )
    }
  }
  def retryableProcess(
                        id: String,
                        context: Context,
                        query: MongoBulkQuery,
                        out: Collector[Diff],
                        attemptNumber: Int = 0
                      ): Unit = {
    
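    // NOTE: onComplete runs on a Future execution-context thread, not the
    // operator's task thread; Flink's Collector is not thread-safe, so the
    // out.collect() below is emitted off the task thread.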
    mongoClient.client(query.client, query.collection).findUserIDs(query).onComplete {
      case Success(values) =>
       
        values
          .map(user => user.getObjectId(MongoFields.Id).toHexString())
          .grouped(config.maxBatchSize)
          .foreach(userIdBatch => {
            val diff = Diff(query.client, id, userIdBatch.toList)
            out.collect(diff)
          })
      case Failure(e) =>
        // The snippet ends here in the original message; presumably the
        // retry (attemptNumber) and failedOutputTag handling follow.
        ()
    }
  }
}
final case class Diff(
  @JsonProperty("dbName") client: String,
  @JsonProperty("id") collection: String,
  @JsonProperty("add") add: List[String]
)
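On the pattern above: collecting from a Future callback bypasses the operator thread, and the framework-supported route for per-element async lookups is Flink's Async I/O operator, which hands results back via ResultFuture on the task thread. A minimal Java sketch; MongoLookup is a hypothetical stand-in for the Mongo client in the thread, not an API shown there:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncUserIdLookup extends RichAsyncFunction<String, List<String>> {

    /** Hypothetical blocking client; replace with the real Mongo lookup. */
    public interface MongoLookup extends java.io.Serializable {
        List<String> findUserIds(String id);
    }

    private final MongoLookup mongo;

    public AsyncUserIdLookup(MongoLookup mongo) {
        this.mongo = mongo;
    }

    @Override
    public void asyncInvoke(String id, ResultFuture<List<String>> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> mongo.findUserIds(id))   // lookup on a pool thread
            .whenComplete((userIds, err) -> {
                if (err != null) {
                    resultFuture.completeExceptionally(err);
                } else {
                    // Flink re-synchronizes this with the task thread safely.
                    resultFuture.complete(Collections.singleton(userIds));
                }
            });
    }
}

// Usage sketch: at most 100 lookups in flight, 30s timeout per element.
// DataStream<List<String>> userIds = AsyncDataStream.unorderedWait(
//         ids, new AsyncUserIdLookup(client), 30, TimeUnit.SECONDS, 100);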
Mali
04/23/2023, 9:09 AM
�������`�[{"testId"... |
Amir Hossein Sharifzadeh
04/24/2023, 2:00 PM

Zachary Schmerber
04/24/2023, 5:44 PM
// Log source data from the above Kafka topic using JsonDeserializationSchema
DataStream<ObjectNode> filteredStream = env.fromSource(logSource, WatermarkStrategy.noWatermarks(), "Kafka winlog Source");
DataStream<ObjectNode> winObjectStream = filteredStream.filter(node -> node.get("service_id").asText().equals("cdd31c3f-3b24-4526-848a-xxxxxxxxxxx"));
// Placeholder for logically mapping the new object stream
// DataStream that has successfully been transformed into objects
winObjectStream.sinkTo(winEventSink);
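For context, a sketch of a KafkaSource that yields ObjectNode records like the stream above. This assumes the Kafka connector's JSONKeyValueDeserializationSchema, since the exact schema class used above isn't shown; bootstrap servers, topic, and group id are illustrative:

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

KafkaSource<ObjectNode> logSource = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("kafka:9092")   // assumed address
        .setTopics("winlog")                 // assumed topic
        .setGroupId("winlog-reader")         // assumed group id
        // Produces ObjectNodes shaped {"key":…,"value":…,"metadata":…}; with
        // this schema the filter above would read node.get("value").get("service_id").
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(false)))
        .build();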
Xavier
04/24/2023, 6:31 PM

Jeesmon Jacob
04/24/2023, 6:49 PM
slotmanager.redundant-taskmanager-num: "1"

Nathanael England
04/24/2023, 7:50 PM
-pyFiles

Nathanael England
04/24/2023, 8:27 PM
.so
ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/cpp_message.py
ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/_message.cpython-38-x86_64-linux-gnu.so
ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/__init__.py
ImportError: cannot import name '_message' from 'google.protobuf.pyext' (/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/8541a21e-7834-425d-9a53-63350409a2a5/.deps/google/protobuf/pyext/__init__.py)

Matthew Kerian
04/25/2023, 12:10 AM

Amenreet Singh Sodhi
04/25/2023, 4:48 AM
2023-04-23 18:13:29,093 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] - Caught exception
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
	at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
	at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358) ~[?:?]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) [event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [event_executor-1.0-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [event_executor-1.0-SNAPSHOT.jar:?]
2023-04-23 18:13:06,279 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] - Error while responding to the http request.
java.text.ParseException: Unparseable date: "${jndi:<ldap://127.0.0.1#log4shell-generic-dXyfRkM647dI4bcPETOo.w.nessus.org/nessus>}"
	at java.text.DateFormat.parse(DateFormat.java:395) ~[?:?]
	at org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler.respondToRequest(StaticFileServerHandler.java:210) ~[event_executor-1.0-SNAPSHOT.jar:?]
	at ....

Gaurav Gupta
04/25/2023, 8:16 AM

Krzysztof Chmielewski
04/25/2023, 12:34 PM
Caused by: java.lang.LinkageError: loader constraint violation: when resolving method 'void org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(com.google.common.util.concurrent.ListeningExecutorService, int, boolean)' the class loader org.apache.flink.util.ChildFirstClassLoader @2486925f of the current class, org/apache/hadoop/fs/s3a/S3AFileSystem, and the class loader 'app' for the method's defining class, org/apache/hadoop/util/SemaphoredDelegatingExecutor, have different Class objects for the type com/google/common/util/concurrent/ListeningExecutorService used in the signature (org.apache.hadoop.fs.s3a.S3AFileSystem is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2486925f, parent loader 'app'; org.apache.hadoop.util.SemaphoredDelegatingExecutor is in unnamed module of loader 'app')
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
	at io.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.java:299)

Viktor Hrtanek
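The LinkageError above says S3AFileSystem was loaded by the child-first user-code loader while SemaphoredDelegatingExecutor (and its Guava ListeningExecutorService) came from the parent 'app' loader, so the two sides see different Class objects for the same Guava type. One hedged workaround, assuming the Hadoop S3A classes must stay on the user classpath (the cleaner route is usually Flink's flink-s3-fs-hadoop plugin instead of bundling S3A in the job jar), is to resolve the conflicting packages parent-first:

# flink-conf.yaml sketch: load Hadoop and Guava parent-first so both
# class loaders agree on ListeningExecutorService.
classloader.parent-first-patterns.additional: org.apache.hadoop.;com.google.common.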
04/25/2023, 12:36 PM

Aurora Nockert
04/25/2023, 12:41 PM
WITH ('properties.sasl.jaas.config' = '…')
-D

Aurora Nockert
04/25/2023, 12:42 PM

Sergey Postument
04/25/2023, 1:35 PM
KafkaSink
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6520160 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
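The producer-side cap can be raised on the KafkaSink builder; the broker/topic limits (message.max.bytes, max.message.bytes) may need the same bump. A sketch with illustrative topic and sizes:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("kafka:9092")                 // assumed address
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("events")                        // assumed topic
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // Raise the producer request cap above the ~6.5 MB record seen above.
        .setProperty("max.request.size", "10485760")       // 10 MiB
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();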
Mali
04/25/2023, 2:17 PM
[{"abc":"123", "cde":"456"},{"efg":"789", "abc":"123"}]

Johannes Cox
04/25/2023, 2:51 PM
CREATE TABLE
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
	at org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows.assignWindows(SlidingEventTimeWindows.java:80) ~[flink-streaming-java-1.16.1.jar:1.16.1]
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:284) ~[flink-streaming-java-1.16.1.jar:1.16.1]
...
SlidingEventTimeWindows
intervalJoin
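The exception above means records reached SlidingEventTimeWindows (the same applies to an intervalJoin) without event-time timestamps. If the input comes from a CREATE TABLE source, a WATERMARK clause can be declared there; on the DataStream side it looks like this sketch, where Event and getTimestampMillis() are illustrative names:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Pull the event-time field out of each record (hypothetical getter).
                .withTimestampAssigner((event, previous) -> event.getTimestampMillis()));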
Christophe Bornet
04/25/2023, 4:57 PM

Trystan
04/25/2023, 4:58 PM

Nathanael England
04/25/2023, 5:31 PM
NodePort
NodePort

Amir Hossein Sharifzadeh
04/25/2023, 6:30 PM
Table raw_table = tableEnv.sqlQuery(data_query);
DataStream<Row> join_stream = tableEnv.toDataStream(raw_table);
SingleOutputStreamOperator<List<double[][][]>> outputStreamOperator = join_stream.process(new EMPADProcessor(commands, taskENT.getTaskId()));
public void processElement(Row row, ProcessFunction<Row, List<double[][][]>>.Context context, Collector<List<double[][][]>> collector) throws Exception {
processElement
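For reference, the fragments above fit together as below; EMPADProcessor's body isn't shown in the thread, so this is a hypothetical skeleton of the ProcessFunction passed to join_stream.process(...):

import java.util.List;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class EMPADProcessor extends ProcessFunction<Row, List<double[][][]>> {

    private final Object commands;   // types not shown in the thread
    private final String taskId;

    public EMPADProcessor(Object commands, String taskId) {
        this.commands = commands;
        this.taskId = taskId;
    }

    @Override
    public void processElement(Row row, Context ctx, Collector<List<double[][][]>> out) {
        // Hypothetical body: turn the joined Row into frame data and emit it.
        // out.collect(decodeFrames(row));
    }
}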
Rion Williams
04/25/2023, 8:06 PM
kafka_consumergroup_lag
kafka_consumergroup_lag{topic="topic-for-flink"} > $someThreshold

kingsathurthi
04/26/2023, 5:42 AM
Warning [IST0117] (Deployment f0/flink-kubernetes-operator) No service associated with this deployment. Service mesh deployments must be associated with a service.
Info [IST0118] (Service f0/pod1-ingester) Port name ingester (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
Info [IST0118] (Service f0/pod1-rest) Port name rest (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
Info [IST0118] (Service f0/pod2-ingester) Port name ingester (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
Info [IST0118] (Service f0/pod2-rest) Port name rest (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
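IST0118 refers to Istio's port-naming convention, <protocol>[-<suffix>], from which Istio infers the protocol; renaming the ports clears the notice. A sketch for one of the services above, with selector and other fields elided:

apiVersion: v1
kind: Service
metadata:
  name: pod1-rest
  namespace: f0
spec:
  ports:
    # "rest" -> "http-rest": the prefix tells Istio this is plain HTTP.
    - name: http-rest
      port: 8081
      targetPort: 8081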
Kevin T
04/26/2023, 5:59 AM
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/
mvn clean package -DskipTests
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:testCompile (scala-test-compile) on project flink-scala_2.12: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 240 (Exit value: 240) -> [Help 1]

chankyeong won
04/26/2023, 7:29 AM
MySink

Marcus Baule
04/26/2023, 9:00 AM
deserialization_schema = AvroRowDeserializationSchema(
        avro_schema_string=schema_str_source
    )
org.apache.flink.avro.shaded.org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
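"Malformed data. Length is negative" from the Avro decoder usually means the bytes aren't plain Avro, e.g. records written with Confluent Schema Registry framing (a magic byte plus a 4-byte schema id before the payload). If that's the case here, the registry-aware deserializer is the fit; a Java sketch, with the registry URL assumed:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

Schema schema = new Schema.Parser().parse(schemaString);   // same schema string as above
// Strips the 5-byte registry header, then decodes against the reader schema.
DeserializationSchema<GenericRecord> deser =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
                schema, "http://schema-registry:8081");    // assumed URL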