Rohan Kumar
04/03/2023, 6:36 AM
Warning Unhealthy 51s (x139 over 35m) kubelet Liveness probe errored: rpc error: code = Unknown desc = deadline exceeded ("DeadlineExceeded"): context deadline exceeded
Other pods with the same liveness probe restart when they fail. How can I make it work?
Gaurav Gupta
04/03/2023, 8:40 AM
uid(String) are unique.
Wanted to check if it's allowed to create multiple StatefulFunctionEgressStreams in a single Flink main function and chain them using DataStreams?
Slackbot
04/03/2023, 11:23 AM
Aditya
04/03/2023, 12:28 PM
Abolfazl Ghahremani
04/03/2023, 1:35 PM
Felix Angell
04/03/2023, 2:39 PM
env.add_source(
    kafka_consumer,
    ...
)
.uid("some id")
.name("some id")
.set_parallelism(20)
.set_max_parallelism(20)
.assign_timestamps_and_watermarks(watermark)
.uid("some id")
.name("some id")
.set_parallelism(20)
.set_max_parallelism(20)
Abolfazl Ghahremani
04/03/2023, 3:03 PM
Abolfazl Ghahremani
04/03/2023, 3:43 PM
Demid P
04/03/2023, 3:52 PM
Matthew Kerian
04/03/2023, 5:42 PM
Jeesmon Jacob
04/03/2023, 6:44 PM
Experimental since v1.0 and still has the Experimental tag in the latest version, v1.4. But wondering if anyone is using it in production and ran into any issues, or whether it is a pretty stable feature 🙂 Thanks!
Brian Lehman
04/03/2023, 8:14 PM
Ali Zia
04/03/2023, 9:30 PM
Sumit Nekar
04/04/2023, 2:59 AM
Guruguha Marur Sreenivasa
04/04/2023, 4:20 AM
kingsathurthi
04/04/2023, 4:41 AM
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 2m22s default-scheduler Successfully assigned flink/kmgjobmanager-64d7777b9c-5b8bw to aks-nodepool1-12868363-vmss00001c
Warning FailedMount 38s (x7 over 2m12s) kubelet MountVolume.MountDevice failed for volume "pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8" : rpc error: code = Internal desc = volume(mc_cns-ba-mni-dev-westeurope-rg_e2e-common-qa-01_westeurope#f5bc870483c1d49f186a69c#pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8###flink) mount //f5bc870483c1d49f186a69c.file.core.windows.net/pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8 on /var/lib/kubelet/plugins/kubernetes.io/csi/file.csi.azure.com/4ecfe2ad9d43dfe5ae52e1eb2bc8bbc22724aeaa08fff4469ecc4306c919fafa/globalmount failed with mount failed: exit status 32
Mounting command: mount
Mounting arguments: -t cifs -o mfsymlinks,actimeo=30,nosharesock,file_mode=0777,dir_mode=0777,<masked> //f5bc870483c1d49f186a69c.file.core.windows.net/pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8 /var/lib/kubelet/plugins/kubernetes.io/csi/file.csi.azure.com/4ecfe2ad9d43dfe5ae52e1eb2bc8bbc22724aeaa08fff4469ecc4306c919fafa/globalmount
Output: mount error(2): No such file or directory
Refer to the mount.cifs(8) manual page (e.g. man mount.cifs) and kernel log messages (dmesg)
Warning FailedMount 19s kubelet Unable to attach or mount volumes: unmounted volumes=[flink-data], unattached volumes=[flink-config-volume pod-template-volume external-libs kube-api-access-d7n2d logs flink-data]: timed out waiting for the condition
What could be the reason here?
Adesh Dsilva
04/04/2023, 10:03 AM
rebalance after the Kafka consumer it started throwing null pointer exceptions.
I think that's because the default values that come as null are serialized and deserialized again (not sure), and this time Avro doesn't like it (since this time the key exists but the value is null).
The real problem, however, is that I get this null pointer exception only on my local system, whereas it works fine on EMR with real Kafka (MSK).
Any idea why I am seeing this inconsistency?
Miguel Ángel Fernández Fernández
04/04/2023, 1:49 PM
Ivan Burmistrov
04/04/2023, 3:14 PM
SELECT * FROM myKafkaStream WHERE col1 = 'a'
UNION ALL
SELECT * FROM myKafkaStream WHERE col1 = 'b'
Is this a valid usage? I was under the impression that it's not valid because it would effectively move the Kafka pointer twice. However, I tried it recently and it seems to work, so I'm a bit confused.
Dheeraj Panangat
04/04/2023, 5:34 PM
Caused by: java.lang.ClassCastException: class org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector cannot be cast to class org.apache.flink.table.data.columnar.vector.LongColumnVector (org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector and org.apache.flink.table.data.columnar.vector.LongColumnVector are in unnamed module of loader 'app')
at org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getLong(VectorizedColumnBatch.java:88)
at org.apache.flink.table.data.columnar.ColumnarRowData.getLong(ColumnarRowData.java:107)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:249)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:207)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
Varun Sayal
04/04/2023, 5:52 PM
Varun Sayal
04/04/2023, 5:53 PM
Radu Stoian
04/04/2023, 6:30 PM
Tony Piazza
04/04/2023, 10:39 PM
java.io.IOException: Failed to deserialize consumer record
Full stack trace attached. Please let me know if you have any advice on what we might be doing wrong.
Suparn Lele
04/05/2023, 8:12 AM
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
So are these committedOffsets taken from the Kafka broker or from ZK directly?
Rashmin Patel
04/05/2023, 8:59 AM
ContinuousProcessingTimeTrigger goes into an infinite loop (register+fire).
Upon further debugging, it seems that this PR ( 👉 https://github.com/apache/flink/pull/17106/files) is causing the problem.
Full Description:
We are hitting this issue in the time windowing operator.
stream
.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
.allowedLateness(Time.days(1))
Let's say today is day T; then my windowState will also contain the pane from T-1 to the `T`th day (as per allowedLateness = 1 day).
As per ContinuousProcessingTimeTrigger.registerNextFireTimestamp(. . .) in 1.16.0,
long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
Now for the window pane from T-1 to the `T`th day, window.maxTimestamp() will be T and time + interval will be some T + x (where T + x < T + 1).
So the min of both expressions will always be T, which becomes timer.getTimestamp().
Now in InternalTimerServiceImpl.onProcessingTime(long time)
```
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
// triggerTarget.onProcessingTime(timer);
}
```
timer.getTimestamp() = T
time = T + x
So this will never come out of the loop !!!
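The arithmetic above can be sketched as a small standalone simulation. This is only an illustration under stated assumptions, not Flink's implementation: `TimerLoopSketch`, `drain`, and the `maxIterations` cap are hypothetical names introduced here, the timer queue is reduced to a `PriorityQueue<Long>`, and re-registration uses the `Math.min` expression quoted in the message with `time` taken as the current clock.

```java
import java.util.PriorityQueue;

class TimerLoopSketch {
    // Same expression as quoted from registerNextFireTimestamp in the message.
    public static long nextFireTimestamp(long time, long interval, long windowMaxTimestamp) {
        return Math.min(time + interval, windowMaxTimestamp);
    }

    // Drains timers that are due at `now`. Firing a timer re-registers the next one,
    // as a continuous trigger would. Returns the number of firings, capped at
    // maxIterations so that this sketch always terminates.
    public static int drain(long firstTimer, long now, long interval,
                            long windowMaxTimestamp, int maxIterations) {
        PriorityQueue<Long> queue = new PriorityQueue<>();
        queue.add(firstTimer);
        int fired = 0;
        while (!queue.isEmpty() && queue.peek() <= now && fired < maxIterations) {
            queue.poll();
            fired++;
            queue.add(nextFireTimestamp(now, interval, windowMaxTimestamp));
        }
        return fired;
    }

    public static void main(String[] args) {
        long t = 1_000L, x = 5L, interval = 10L;
        // Late pane: window.maxTimestamp() == T is already behind the clock T + x,
        // so every re-registered timer is immediately due again and the cap is hit.
        System.out.println(drain(t, t + x, interval, t, 1_000));
        // Pane whose maxTimestamp lies in the future: the next timer is T + x + interval,
        // which is not yet due, so the loop exits after one firing.
        System.out.println(drain(t, t + x, interval, t + 86_400L, 1_000));
    }
}
```

Without the cap, the first call would spin forever, which matches the behaviour described above for panes kept alive only by allowedLateness.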
Can someone help me with how to proceed further?
Richard Noble
04/05/2023, 11:22 AM
org.apache.flink.table.api.TableException: This calc has no useful projection and no filter. It should be removed by CalcRemoveRule.
Vincent Chee
07/20/2023, 7:56 AM
package github.jhchee.functions;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.concurrent.Executors;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
//import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
public class DelayAsyncFunction extends RichAsyncFunction<Integer, String> {
    private transient Executor executor;

    @Override
    public void open(Configuration parameters) {
        log.info("Initializing executor");
        executor = Executors.directExecutor();
        // executor = Executors.newFixedThreadPool(10); // this works
    }

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> performAsyncOperation(input), executor)
                .thenAccept(result -> resultFuture.complete(Collections.singletonList(result)));
    }

    @Override
    public void timeout(Integer input, ResultFuture<String> resultFuture) {
        String message = String.format("Failed to delay for %d seconds", input);
        resultFuture.complete(Collections.singletonList(message));
    }

    private String performAsyncOperation(Integer input) {
        try {
            TimeUnit.SECONDS.sleep(input);
        } catch (InterruptedException ignored) {
        }
        // Perform the asynchronous operation and return the result
        return String.format("Delay for %d seconds", input);
    }

    @Override
    public void close() {
        ((ExecutorService) executor).shutdown();
    }
}
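One possible reading of the difference between the two executors in the snippet above, sketched in plain Java without Flink. Assumptions: a direct executor runs each task on the calling thread, which is modelled here by `Runnable::run`, and `DirectExecutorSketch` and `runOn` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DirectExecutorSketch {
    // Returns the name of the thread that actually ran the supplier.
    public static String runOn(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();

        // Stand-in for a direct executor: the task runs immediately on the calling thread.
        Executor direct = Runnable::run;
        System.out.println("direct ran on caller thread: " + runOn(direct).equals(caller));

        // A thread pool hands the task to a worker thread instead.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("pool ran on caller thread: " + runOn(pool).equals(caller));
        } finally {
            pool.shutdown();
        }

        // The stand-in is only an Executor, so a cast to ExecutorService would fail.
        System.out.println("direct is an ExecutorService: " + (direct instanceof ExecutorService));
    }
}
```

If the same holds for the executor used in `open()`, the sleep in `performAsyncOperation` would run inside `asyncInvoke` itself rather than in the background, and the cast in `close()` would only succeed for an executor that is actually an `ExecutorService`.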
amarjeet pasrija
07/20/2023, 1:39 PM