Jirawech Siwawut
11/19/2022, 3:11 PM
upsert-kafka from Kafka topic written by Spark?
I use Flink SQL to read with upsert-kafka and get no result when trying to print the table stream.
Note: If I sink to upsert-kafka with Flink and read it back, it seems to work fine. I just wonder how it actually works under the hood, and whether it would be possible to sink data with Spark and read it back with Flink upsert-kafka.
dino bin
11/21/2022, 6:03 AM
ding bei
11/21/2022, 7:14 AM
wuyi yang
11/21/2022, 8:37 AM
Maher Turifi
11/21/2022, 11:10 AM
Felix Angell
11/21/2022, 3:28 PM
Jeremy DeGroot
11/21/2022, 4:01 PM
Jeremy Ber
11/21/2022, 4:02 PM
Padraic McAtee
11/21/2022, 4:09 PM
Joris Basiglio
11/21/2022, 4:54 PM
Krish Narukulla
11/21/2022, 7:39 PM
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2638) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3341) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3373) ~[?:?]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[?:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[?:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[?:?]
hive-site.xml is
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://xxx:9083</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>fs.s3n.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
</property>
<property>
<name>fs.AbstractFileSystem.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
<description>
The AbstractFileSystem for gs: (GCS) uris. Only necessary for use with Hadoop 2.
</description>
</property>
</configuration>
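A ClassNotFoundException like the one above generally means the JVM could not find the class named in fs.gs.impl on its classpath, which for com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem usually points to the GCS connector jar not being visible to the process. A minimal sketch for checking this from plain Java follows. The class ClasspathCheck and the method isOnClasspath are hypothetical helpers, not part of Flink or Hadoop, and the result only reflects the classloader the check runs in, which may differ from the one Flink uses for its filesystems.

```java
// Hypothetical helper (not part of Flink or Hadoop): reports whether a class
// can be loaded by the classloader that loaded this helper.
final class ClasspathCheck {

    private ClasspathCheck() {}

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class is missing or one of its dependencies failed to link.
            return false;
        }
    }

    public static void main(String[] args) {
        String gcsFs = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem";
        System.out.println(gcsFs + " loadable: " + isOnClasspath(gcsFs));
    }
}
```

Running this with the same classpath as the failing process gives a quick yes/no answer before digging further into the configuration.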
Thiruvenkadesh Someswaran
11/21/2022, 10:24 PM
Warning FailedMount 89s (x21 over 28m) kubelet MountVolume.SetUp failed for volume "flink-volume" : hostPath type check failed: /tmp/flink is not a directory
Chris Ro
11/22/2022, 1:15 AM
AsyncDataStream.orderedWait with a capacity (let’s say 50). My asyncInvoke method makes an HTTP call to look up some data for enrichment, and it does this by spawning a background thread that, when it completes, calls resultFuture.complete. If my async operator is processing all 50 records simultaneously, does idleTimeMsPerSecond show as 100% or 0%? This blog post seems to suggest that custom threads (noted that they’re discouraged) don’t get counted correctly in the idle and busy time metrics.
박담
11/22/2022, 2:45 AM
Emmanuel Leroy
11/22/2022, 3:49 AM
kubernetes.operator.periodic.savepoint.interval: 30m
kubernetes.operator.savepoint.history.max.age: 24h
kubernetes.operator.savepoint.history.max.count: "25"
Yet my savepoints don’t get deleted and I have hundreds of them. Any idea why, and how to troubleshoot?
Buddhike de Silva
11/22/2022, 6:26 AM
RichFlatMapFunction class seem to get serialised. You can prevent this behaviour by adding private transient to those fields.
What is the purpose of this serialisation? When is it appropriate to use private transient and when is it not?
Thank you so much in advance for shedding some light here 🙏🏾
Suparn Lele
11/22/2022, 6:30 AM
Jeremy DeGroot
11/22/2022, 10:51 AM
chunilal kukreja
11/22/2022, 11:32 AM
AsyncRetryStrategy asyncRetryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
        .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
        .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
        .build();
Rather, it should be:
AsyncRetryStrategy asyncRetryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
        .build();
If one follows the document snippet, “retryIfResult” and “retryIfException” are not recognised.
Can someone please confirm whether this observation is correct?
Aishwarya Raimule
11/22/2022, 8:34 PM
박담
11/23/2022, 4:49 AM
Gaurav Miglani
11/23/2022, 7:32 AM
00000000000000000000000000000000 as all our jobs use last-state upgrade mode, but in Flink 1.16 we are getting other job IDs like ffffffffd85f74d40000000000000000. On every run it is the same for a given job. Can anyone let me know the logic behind it, or is it a random UUID? 🤔
박담
11/23/2022, 9:12 AM
Example data:
time=1669082219148, trans_time=2022-1-11-22T10:56:59, flink_time=20221122105659148
time=1669082219148, trans_time=2022-1-11-22T10:56:59, flink_time=20221122105659148
time=1669082220148, trans_time=2022-111-22T10:57:00, flink_time=20221122105700148
The watermark was created as follows, using flink_time (ymdhms.ms) or time (unix time) from the data above.
WatermarkStrategy<SystemData> st = WatermarkStrategy
    .<SystemData>forBoundedOutOfOrderness(Duration.ofSeconds(60))
    .withTimestampAssigner((event, timestamp) -> event.getFlink_time());
I am trying to handle event time using TumblingEventTimeWindows.
DataStream<SystemResultDto> aggStream = mainStream
    .keyBy((dto) -> {
        SystemData cdto = ((SystemData) dto);
        return cdto.getTxn_id();
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new CustomProcess());
However, when it is processed based on the actual event time, it does not work.
When I use processing time instead, it is processed.
DataStream<SystemResultDto> aggStream = mainStream
    .keyBy((dto) -> {
        SystemData cdto = ((SystemData) dto);
        return cdto.getTxn_id();
    })
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .process(new CustomProcess());
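Flink treats the value returned by a timestamp assigner as milliseconds since the Unix epoch, so one thing worth checking is whether getFlink_time() returns the ymdhms-style number (for example 20221122105659148) unchanged. If it does, window boundaries may not line up with real time; this is only one possible cause. The sketch below, in plain Java, converts such a value to epoch milliseconds. The class FlinkTimeConverter, its method name, and the Asia/Seoul zone are assumptions for illustration: the zone is inferred from the sample rows, where flink_time reads 9 hours ahead of the UTC interpretation of time.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns a yyyyMMddHHmmssSSS-style long into epoch milliseconds.
final class FlinkTimeConverter {

    // Assumption: the wall-clock values were produced in UTC+9 (Asia/Seoul).
    private static final ZoneId ZONE = ZoneId.of("Asia/Seoul");
    private static final DateTimeFormatter SECONDS =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private FlinkTimeConverter() {}

    static long toEpochMillis(long flinkTime) {
        String digits = Long.toString(flinkTime);
        if (digits.length() != 17) {
            throw new IllegalArgumentException("expected 17 digits, got: " + digits);
        }
        // Parse the seconds part and the millisecond part separately.
        LocalDateTime seconds = LocalDateTime.parse(digits.substring(0, 14), SECONDS);
        long millis = Long.parseLong(digits.substring(14));
        return seconds.atZone(ZONE).toInstant().toEpochMilli() + millis;
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis(20221122105659148L));
    }
}
```

In the job above, the assigner could then call FlinkTimeConverter.toEpochMillis(event.getFlink_time()), or simply return the time field if that is already in Unix milliseconds.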
Vishal bharatbhai Vanpariya
11/23/2022, 11:42 AM
{
  "ts": 1669099696691,
  "uuid": "fa8dfe26114c49acb9745ee8ac9bbfdb",
  "count": 95
}
The type of this stream is DataStream<String>, but I want to convert it to a KeyedStream<T, K> where the key is count and the data is the same JSON string, without applying any aggregation. Can somebody guide me on this?
Thanks
Thiruvenkadesh Someswaran
11/23/2022, 6:05 PM
kubectl port-forward svc/basic-checkpoint-ha-example-rest 8081:8081 -n flink
E1123 12:56:37.974939 96110 portforward.go:406] an error occurred forwarding 8081 -> 8081: error forwarding port 8081 to pod e33822576ad4f60ddbd13d462e29f6641c6cd07fd606136cd2613e020432b5ee, uid : exit status
I have the HA and regular examples running
raghav tandon
11/24/2022, 6:51 AM
fixed-strategy, after that it is cleaning up state in ZK as well…
How can I avoid that? Because of this I am losing the state of the job and the checkpoints as well.
Vishal bharatbhai Vanpariya
11/24/2022, 7:03 AM
licho
11/24/2022, 7:18 AM
TO_TIMESTAMP(JSON_VALUE( data , '$.dt' ))
dt is generated by LocalDateTime.now().toString(); can't it be used as a lateral key? Flink version 1.16
licho
11/24/2022, 9:46 AM
Sumit Khaitan
11/24/2022, 10:09 AM