Maher Turifi
10/25/2022, 11:35 AM
https://apache-flink.slack.com/files/U045154GZEW/F048CKFN5R7/screenshot_from_2022-10-25_14-56-06.png
Thiruvenkadesh Someswaran
10/25/2022, 7:32 PM
o.a.f.k.o.r.ReconciliationUtils [ERROR][default/basic-example] Validation failed: Forbidden Flink config key: kubernetes.cluster-id
Erwin Cabral
10/25/2022, 11:10 PM
Jirawech Siwawut
10/26/2022, 5:18 AM
Aqib Mehmood
10/26/2022, 7:24 AM
WITH CTE AS (
SELECT a.sku, a.name, a.updatedAt, b.price FROM (
SELECT sku, name, max(updatedAt) AS updatedAt from (
SELECT sku, name, updatedAt FROM wms.PurchaseOrderProduct
WHERE CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d %H:%m:%s')) not in (
SELECT CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d %H:%m:%s')) FROM (
SELECT sku, max(updatedAt) as updatedAt from wms.PurchaseOrderProduct
GROUP BY sku
) AS x
)
) AS z
GROUP BY sku, name
) AS a
LEFT JOIN wms.PurchaseOrderProduct b
ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
)
SELECT a.sku, a.name, a.updatedAt AS latestupdatedAt, a.price AS latestPrice, b.updatedAt AS lastUpdatedAt, b.price AS lastPrice
FROM (
SELECT a.sku, a.name, a.updatedAt, b.price from (
SELECT sku, name, max(updatedAt) as updatedAt from wms.PurchaseOrderProduct
GROUP BY sku, name
) AS a
LEFT JOIN wms.PurchaseOrderProduct b
ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
) AS a
LEFT JOIN CTE AS b
ON a.sku=b.sku AND a.name=b.name;
The issue is that I'm getting NULLs for the columns lastUpdatedAt and lastPrice. But when I run the same query on our prod database, I get the desired results.
I suspect that Flink is not processing the entire query before giving the results.
I do get the desired results for a couple of rows at the beginning of the table, where lastUpdatedAt and lastPrice are not NULL, but after that the two columns return only NULLs.
I would like to know why Flink is not executing the above query properly?
TIA
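For reference, the "latest row per key" half of a query like this is usually written with Flink SQL's ROW_NUMBER()-based deduplication rather than self-joins on MAX(updatedAt); below is a minimal Java sketch, reusing the table and column names from the query above, with the TableEnvironment setup and table registration assumed:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LatestRowPerSku {
  public static void main(String[] args) {
    // Streaming Table environment; wms.PurchaseOrderProduct is assumed to be
    // registered already (e.g. through a catalog or a CREATE TABLE statement).
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Deduplication pattern: ROW_NUMBER() per (sku, name), ordered by updatedAt
    // descending, filtered to the first row, i.e. the latest row per key.
    tEnv.executeSql(
        "SELECT sku, name, updatedAt, price FROM ( "
            + "  SELECT *, ROW_NUMBER() OVER ( "
            + "    PARTITION BY sku, name ORDER BY updatedAt DESC) AS rownum "
            + "  FROM wms.PurchaseOrderProduct "
            + ") WHERE rownum = 1")
        .print();
  }
}

Whether the two-level "latest vs. previous value" join then behaves the same as on the prod database depends on how updates reach Flink, so this is only a starting point, not an answer from the thread.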
Robin Cassan
10/26/2022, 8:51 AM
Lorin Liu
10/26/2022, 9:13 AM
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka etl-spp-out-0@-1 with FlinkKafkaInternalProducer{transactionalId='etl-spp-out-flink-spp-0-5', inTransaction=true, closed=false}
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka etl-spp-out-1@-1 with FlinkKafkaInternalProducer{transactionalId='etl-spp-out-flink-spp-0-5', inTransaction=true, closed=false}
because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.
To avoid data loss, the application will restart.
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question.
This could happen if, for instance, the producer's records were deleted because their retention time had elapsed.
Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
Any idea about it? Thanks!
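For context, both exceptions above come from the transactional (exactly-once) Kafka sink. A minimal Java sketch of how such a sink is typically configured, with the broker address, topic and transactional-id prefix being placeholders rather than values from this thread, and the producer transaction timeout kept above the checkpoint interval:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceKafkaSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // One checkpoint at a time; exactly-once Kafka commits happen on checkpoint completion.
    env.enableCheckpointing(60_000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                 // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("etl-spp-out")                        // placeholder
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("etl-spp-out-flink-spp")  // placeholder
        // Should outlive the checkpoint interval, or the broker aborts the transaction.
        .setProperty("transaction.timeout.ms", "900000")
        .build();

    env.fromElements("a", "b", "c").sinkTo(sink);
    env.execute("exactly-once-kafka-sink-sketch");
  }
}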
Tiansu Yu
10/26/2022, 9:33 AM
Christos Hadjinikolis
10/26/2022, 9:42 AM
Burns Smith
10/26/2022, 5:42 PM
Iris Grace Endozo
10/27/2022, 12:55 AM
Krish Narukulla
10/27/2022, 3:18 AM
message PersonRecord {
  string name = 1;
  int32 age = 2;
  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }
  message PhoneNumber {
    string number = 1;
    PhoneType type = 2;
  }
  repeated PhoneNumber phones = 4;
}

// Our address book file is just one of these.
message AddressBookRecord {
  repeated PersonRecord people = 1;
}
M Harsha
10/27/2022, 6:47 AM
Felix Angell
10/27/2022, 9:58 AM
Mihaly Hajdu
10/27/2022, 10:04 AM
KeyedProcessFunction
with the number of keys being somewhere around 500,000.
The logic is heavily time-based and runs the production workload in a streaming fashion on unbounded data sources. Before deployments, we want to run the same pipeline against a fixed, golden-ish, bounded dataset for testing purposes, and the issues start to arise at this point. We figured this is primarily because watermarks are handled at a per-operator-instance level, while our use case demands per-key behaviour.
We came up with a way to handle late/out-of-order data by becoming independent of the watermark, but we are still left with the timers firing incorrectly, as they are driven by the watermarks.
Having many keys per operator instance feels like a common use case, and I wonder how others approach this.
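For reference, one way to get per-key behaviour of the kind described above is to keep a per-key "high-water mark" in keyed state and compare element timestamps against it, instead of consulting the operator-level watermark; a minimal Java sketch with a hypothetical Event type (not the actual pipeline from the thread):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical event type with an embedded event-time timestamp.
class Event {
  public String key;
  public long timestamp;
}

public class PerKeyLatenessFunction extends KeyedProcessFunction<String, Event, Event> {

  private transient ValueState<Long> maxTimestampForKey;

  @Override
  public void open(Configuration parameters) {
    maxTimestampForKey = getRuntimeContext().getState(
        new ValueStateDescriptor<>("max-ts-per-key", Long.class));
  }

  @Override
  public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
    Long maxSeen = maxTimestampForKey.value();
    if (maxSeen == null || event.timestamp >= maxSeen) {
      // In order for this key: advance the per-key high-water mark and emit.
      maxTimestampForKey.update(event.timestamp);
      out.collect(event);
    }
    // else: late for this key, even if the operator watermark has not passed it yet;
    // buffer, side-output or drop it here as the use case demands.
  }
}

Timers registered through ctx.timerService() remain watermark-driven, so a sketch like this only covers the lateness side of the problem, not the timer side.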
Konstantin
10/27/2022, 10:53 AM
default
database is created and manually creating a database also seems to fail silently.
Sachin Saikrishna Manikandan
10/27/2022, 3:42 PM
vignesh kumar kathiresan
10/27/2022, 8:57 PM
Sylvia Lin
10/27/2022, 9:28 PM
Matt Fysh
10/27/2022, 10:34 PM
table_env.execute_sql("INSERT...
table_env.execute_sql("INSERT...
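If the intent of the two calls above is to run both INSERTs as a single job, the usual pattern is a statement set; a minimal Java Table API sketch with hypothetical table names (PyFlink exposes the equivalent create_statement_set()):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoInsertsOneJob {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Both INSERTs are added to one statement set and submitted as a single job;
    // source_table, sink_a and sink_b are hypothetical, already-registered tables.
    StatementSet statements = tEnv.createStatementSet();
    statements.addInsertSql("INSERT INTO sink_a SELECT * FROM source_table");
    statements.addInsertSql("INSERT INTO sink_b SELECT * FROM source_table");
    statements.execute();
  }
}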
Vaibhav Gubbi Narasimhan
10/28/2022, 4:00 AM
Sumit Nekar
10/28/2022, 4:08 AM
Matt Fysh
10/28/2022, 4:25 AM
--pyFiles
conflicts with the one wanted by the pyflink executor, is there a clean-ish solution to resolve?
M Harsha
10/28/2022, 6:58 AM
Adesh Dsilva
10/28/2022, 8:47 AM
Protobuf format
Flink now supports the Protocol Buffers (Protobuf) format. This allows you to use this format directly in your Table API or SQL applications.
Couldn't find more information on this. Does this mean I no longer have to register my protobuf class using a third-party protobuf serializer with Kryo? Will it now work seamlessly like Avro-generated classes in DataStream code?
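For reference, the release note quoted above is about the Table API / SQL format, which is enabled per table by pointing the 'protobuf' format at the generated message class; a minimal Java sketch with a hypothetical Kafka topic and message class name:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ProtobufFormatSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Hypothetical Kafka-backed table using the Protobuf format; the message
    // class is the Java class generated by protoc from the .proto definition.
    tEnv.executeSql(
        "CREATE TABLE person_records ( "
            + "  name STRING, "
            + "  age INT "
            + ") WITH ( "
            + "  'connector' = 'kafka', "
            + "  'topic' = 'persons', "
            + "  'properties.bootstrap.servers' = 'broker:9092', "
            + "  'scan.startup.mode' = 'earliest-offset', "
            + "  'format' = 'protobuf', "
            + "  'protobuf.message-class-name' = 'com.example.PersonRecord' "
            + ")");
  }
}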
Matt Fysh
10/28/2022, 9:39 AM
RecordType:peek_no_expand
and the other has RecordType
chunilal kukreja
10/28/2022, 11:38 AM
Failed to execute goal on project flink-connector-hive_2.12: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.12:jar:1.15.2: Failed to collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
maven central does not have this jar.
https://mvnrepository.com/artifact/org.pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde
Is there any workaround available other than referring to the Spring repo to get this dependency?
chunilal kukreja
10/28/2022, 12:32 PM
Could not resolve dependencies for project org.apache.flink:flink-dist_2.12:jar:1.15.2: The following artifacts could not be resolved: org.apache.flink:flink-dist-scala_2.12:jar:1.15.2, org.apache.flink:flink-examples-streaming-state-machine_2.12:jar:1.15.2: Could not find artifact org.apache.flink:flink-dist-scala_2.12:jar:1.15.2 in central (https://repo.maven.apache.org/maven2)
I tried different repos (including my corporate Artifactory as well) but nothing worked. I didn't face this with the earlier version 1.15.1.
Joris Basiglio
10/28/2022, 1:44 PM
def main(args: Array[String]): Unit = {
  val savepointLocation = "s3://my-bucket/savepoint/savepoint-f60fde-f0680e680d7e"
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
  streamEnv.setParallelism(1)

  val savepoint = SavepointReader.read(
    streamEnv,
    savepointLocation
  )

  savepoint
    .readKeyedState(
      "my-operator",
      new ReaderFunction(),
      scala.createTypeInformation[MyOperatorKey],
      BasicTypeInfo.STRING_TYPE_INFO
    )
    .filter(new TestFilterFunction())
    .addSink(new DiscardingSink[String]())

  streamEnv.execute()
}
M Harsha
10/28/2022, 1:50 PM
List<Object>
And so I cannot use the JDBC sink directly (this function expects the input to be a single object, not a list).
Is there any workaround for this apart from writing a custom sink function?
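One common workaround, sketched below in Java with a hypothetical OrderRow element type and placeholder connection settings (not something confirmed in this thread), is to flatten the emitted List with a flatMap so that JdbcSink.sink receives single records:

import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class FlattenBeforeJdbcSink {

  // Hypothetical record type carried inside the emitted List.
  public static class OrderRow {
    public String sku;
    public double price;
  }

  public static void attachSink(DataStream<List<OrderRow>> lists) {
    // Flatten each List produced by the process function into individual records...
    DataStream<OrderRow> rows = lists.flatMap(
        new FlatMapFunction<List<OrderRow>, OrderRow>() {
          @Override
          public void flatMap(List<OrderRow> value, Collector<OrderRow> out) {
            for (OrderRow row : value) {
              out.collect(row);
            }
          }
        });

    // ...so the JDBC sink sees one row per element.
    rows.addSink(JdbcSink.<OrderRow>sink(
        "INSERT INTO order_row (sku, price) VALUES (?, ?)",
        (statement, row) -> {
          statement.setString(1, row.sku);
          statement.setDouble(2, row.price);
        },
        JdbcExecutionOptions.builder().withBatchSize(100).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:postgresql://db:5432/wms")   // placeholder
            .withDriverName("org.postgresql.Driver")
            .build()));
  }
}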