Xiaosheng Wu
10/18/2022, 4:48 PM
M Harsha
10/18/2022, 5:23 PM
Sachin Saikrishna Manikandan
10/18/2022, 5:27 PM
Stephan Weinwurm
10/18/2022, 11:42 PM
Is pool_size configured per TaskManager, or does the JobManager split the pool_size evenly across all available TaskManagers? We want to limit the number of concurrent connections to our StateFun endpoint to e.g. 1000 connections. Do we set pool_size: 1000 or pool_size: 1000 / num_task_managers?
Matt Fysh
10/19/2022, 2:18 AM
Raghunadh Nittala
10/19/2022, 2:49 AM
Canope Nerda
10/19/2022, 3:44 AM
WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
I enabled early/late fire with the following configuration, and I expected to see the freshest data in the sink, which is not the case. Any suggestion is highly appreciated.
table.exec.emit.allow-lateness: "24 h"
table.exec.emit.early-fire.enabled: "true"
table.exec.emit.early-fire.delay: "0 s"
table.exec.emit.late-fire.enabled: "true"
table.exec.emit.late-fire.delay: "0 s"
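For reference, a minimal sketch of setting the same experimental table.exec.emit.* options programmatically (an assumption: Java Table API on Flink 1.15; EmitConfigExample is a hypothetical class name):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EmitConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Same experimental emit keys as the YAML config above.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.emit.allow-lateness", "24 h");
        conf.setString("table.exec.emit.early-fire.enabled", "true");
        conf.setString("table.exec.emit.early-fire.delay", "0 s");
        conf.setString("table.exec.emit.late-fire.enabled", "true");
        conf.setString("table.exec.emit.late-fire.delay", "0 s");
    }
}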
Canope Nerda
10/19/2022, 3:48 AM
Matt Fysh
10/19/2022, 5:16 AM
I can't get env.fromElements("a", "b", "c").print to show up in Zeppelin.
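A minimal sketch of the usual cause (an assumption, not confirmed in the thread: on a StreamExecutionEnvironment, print() only declares a sink, and nothing runs until the job is executed):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // print() only attaches a sink to the plan; output appears
        // once the job actually runs.
        env.fromElements("a", "b", "c").print();
        env.execute("print-example");
    }
}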
Matteo De Martino
10/19/2022, 9:17 AM
System.err: (none)
    at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:159) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:107) ~[flink-dist-1.15.2.jar:1.15.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NoSuchMethodError: 'scala.collection.immutable.ArraySeq scala.runtime.ScalaRunTime$.wrapRefArray(java.lang.Object[])'
    at generic.WindowFunctions$.<clinit>(WindowFunctions.scala:58) ~[?:?]
    at generic.WindowFunctions.main(WindowFunctions.scala) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) ~[flink-dist-1.15.2.jar:1.15.2]
The Flink version is 1.15.2 (via this Docker image). I am packaging a Scala 2.13 job as an uber jar (I have tried several ways). I also tested the same code locally (in-memory Flink) and it worked fine.
I am sure I am missing something obvious, but can anyone help me and point me in the right direction? 🙏
Thanks 🙇
Yaroslav Bezruchenko
10/19/2022, 9:56 AM
Canope Nerda
10/19/2022, 10:29 AM
M Harsha
10/19/2022, 12:54 PM
SET 'execution.runtime-mode' = 'streaming';
SET 'state.checkpoints.dir' = 'file:///tmp/flink-checkpoints/';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.time-characteristic'='event-time';
SET 'execution.checkpointing.interval' = '5min';
SET 'execution.checkpointing.min-pause' = '1min';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
SET 'execution.checkpointing.prefer-checkpoint-for-recovery' = 'true';
SET 'parallelism.default' = '-1';
SET 'jobmanager.scheduler' = 'adaptive';
SET 'cluster.declarative-resource-management.enabled' = 'true';
The job graph is as follows:
TableSourceScan > Calc > LocalWindowAggregate > GlobalWindowAggregate > Calc > ConstraintEnforcer > Sink
When I run the job, I see that the entire pipeline is running in one slot (checked via SlotStatus in the logs), and as the load increases I do not see it getting divided across the other slots.
Say, when I set the default parallelism to 1 and use the default scheduler, this is the job breakdown:
Task1(1 Slot): TableSourceScan > Calc > LocalWindowAggregate
Task2(1 Slot): GlobalWindowAggregate > Calc > ConstraintEnforcer > Sink
Am I missing something that is leading to the job not using the resources properly (adaptive scheduler)?
clen.moras
10/19/2022, 1:01 PM
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session
spec:
  deploymentName: basic-session-deployment
  job:
    jarURI: local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session
spec:
  deploymentName: basic-session-deployment
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.2/flink-examples-streaming_2.12-1.15.2-TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless
whereas the MANIFEST.MF file is the same in both jars:
root@ca5227b1db40:/opt/flink/examples/streaming/META-INF# cat MANIFEST.MF
Manifest-Version: 1.0
Implementation-Title: Flink : Examples : Streaming
Implementation-Version: 1.15.2
Archiver-Version: Plexus Archiver
Built-By: cranmerd
Specification-Vendor: The Apache Software Foundation
Specification-Title: Flink : Examples : Streaming
Implementation-Vendor-Id: org.apache.flink
program-class: org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
Implementation-Vendor: The Apache Software Foundation
Created-By: Apache Maven 3.2.5
Build-Jdk: 1.8.0_342
Specification-Version: 1.15.2
What am I doing wrong here?
Ilya Sterin
10/19/2022, 2:39 PM
Matteo De Martino
10/19/2022, 2:56 PM
I have a field x that is DATE and another field y that is TIMESTAMP.
What I need is to build a final TIMESTAMP (format yyyy-MM-dd HH:mm:ss) from x and only the time part of y.
I tried something like:
TO_TIMESTAMP(DATE_FORMAT(x, 'yyyy-MM-dd') || ' ' || DATE_FORMAT(y, 'HH:mm:ss'), 'yyyy-MM-dd HH:mm:ss') AS CreationTime
But DATE_FORMAT cannot be used on x (which is DATE), and I can't find a way to convert that into a timestamp or a string...
What am I missing? 😅
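One possible workaround, as an unverified sketch (it assumes CAST(x AS STRING) renders a DATE as yyyy-MM-dd, which TO_TIMESTAMP's default format can parse back; the table t and the class name are hypothetical):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CombineDateAndTime {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical table with x DATE and y TIMESTAMP.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW t AS SELECT"
                        + " DATE '2022-10-19' AS x,"
                        + " TIMESTAMP '1970-01-01 12:34:56' AS y");
        // CAST(x AS STRING) yields 'yyyy-MM-dd', which is concatenated with
        // the time part of y and parsed back by TO_TIMESTAMP (whose default
        // format is 'yyyy-MM-dd HH:mm:ss').
        Table result = tEnv.sqlQuery(
                "SELECT TO_TIMESTAMP(CAST(x AS STRING) || ' ' || DATE_FORMAT(y, 'HH:mm:ss'))"
                        + " AS CreationTime FROM t");
        result.execute().print();
    }
}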
Bhupendra Yadav
10/19/2022, 2:58 PM
ding bei
10/19/2022, 3:08 PM
Tommy May
10/19/2022, 4:53 PM
Jason Politis
10/19/2022, 5:03 PM
IF(
COUNTRY IS NULL
OR CHAR_LENGTH(RTRIM(COUNTRY)) = 0,
'ETL_UNSPEC_STR',
RTRIM(COUNTRY)
)
M Harsha
10/19/2022, 5:40 PM
Mingliang Liu
10/19/2022, 6:32 PM
"The GenericInMemoryCatalog is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session."
I'm wondering why that was the decision. Is it possible to make it work across multiple sessions/clients? In-memory is fine (it's understood the data will be gone after JVM restarts).
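For context, a minimal sketch of how that catalog is used (the GenericInMemoryCatalog is an ordinary heap object registered on one TableEnvironment, which is why another session or client has no way to reach it; the names here are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class InMemoryCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The catalog's maps live inside this session's TableEnvironment,
        // so everything created in it dies with the session.
        tEnv.registerCatalog("mem", new GenericInMemoryCatalog("mem"));
        tEnv.useCatalog("mem");
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS db1");
    }
}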
ding bei
10/20/2022, 2:33 AM
Matt Fysh
10/20/2022, 6:15 AM
sambhav gupta
10/20/2022, 7:01 AM
Ildar Almakaev
10/20/2022, 9:35 AM
I have ValueState and MapState states:
private ValueState<User> userState;
private MapState<String, List<Action>> delayedRecordsState;
Could you help me figure out how to expose the number of records of each state as a JMX metric?
Then I could monitor them in CloudWatch/Grafana, etc.
Thanks for any help!
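One possible approach, as a sketch (hypothetical names, simplified String types instead of User/Action): keyed state is scoped per key, so rather than a gauge over the state itself, keep an operator-level Counter in sync with every put/remove; whatever metrics reporter is configured (e.g. the JMX reporter) then exposes it.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DelayedRecordsFunction extends KeyedProcessFunction<String, String, String> {

    private transient MapState<String, String> delayedRecordsState;
    private transient Counter delayedRecords;

    @Override
    public void open(Configuration parameters) {
        delayedRecordsState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("delayedRecords", Types.STRING, Types.STRING));
        // Keyed state is per key, so a gauge cannot cheaply see all keys.
        // An operator-level counter mirrored on every mutation works instead.
        delayedRecords = getRuntimeContext().getMetricGroup().counter("delayedRecordsCount");
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        if (!delayedRecordsState.contains(value)) {
            delayedRecordsState.put(value, value);
            delayedRecords.inc(); // mirror every put/remove on the counter
        }
        out.collect(value);
    }
}

Caveat: the counter is not part of checkpointed state, so it restarts at zero after recovery.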
chunilal kukreja
10/20/2022, 10:55 AM
KafkaSource<EventDataMapping> kafkaSource = KafkaSource.<EventDataMapping>builder()
.setTopicPattern(streamNamePattern)
.setClientIdPrefix(configProps.getJobProps().getClientId())
.setStartingOffsets(OffsetsInitializer.timestamp(delta))
.setDeserializer(kafkaRecordDeserializationSchema)
//.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperties(kafkaConsumerProperties)
.build();
It throws the following "Invalid negative offset" error while trying to read from a stream:
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
I observed this happens when it tries to return records from a partition that hasn't received any messages. So I have 3 partitions, and when events are present in all three partitions it works fine and returns all the events after that particular timestamp. But if even one of the partitions is empty, it throws the error instead of returning the events. How do I handle this issue? Ideally it should get an empty response from a partition containing no events instead of an error, so that the code could look for events in the other partitions.
(FYI, I am using Oracle Streaming Service.)
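If the timestamp lookup is what returns the negative offset for the empty partitions, one possible workaround is a wrapping OffsetsInitializer. This is an unverified sketch (not tested against Oracle Streaming Service; SafeTimestampOffsets is hypothetical): it delegates to OffsetsInitializer.timestamp(...) and replaces any negative offset with that partition's end offset, so consumption starts at "latest" there.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class SafeTimestampOffsets implements OffsetsInitializer {

    private final OffsetsInitializer delegate;

    public SafeTimestampOffsets(long timestampMs) {
        this.delegate = OffsetsInitializer.timestamp(timestampMs);
    }

    @Override
    public Map<TopicPartition, Long> getPartitionOffsets(
            Collection<TopicPartition> partitions,
            PartitionOffsetsRetriever retriever) {
        Map<TopicPartition, Long> offsets =
                new HashMap<>(delegate.getPartitionOffsets(partitions, retriever));
        // Empty partitions may come back with a negative offset; fall back
        // to the partition's end offset instead of failing.
        Map<TopicPartition, Long> endOffsets = retriever.endOffsets(partitions);
        offsets.replaceAll((tp, offset) -> offset < 0 ? endOffsets.get(tp) : offset);
        return offsets;
    }

    @Override
    public OffsetResetStrategy getAutoOffsetResetStrategy() {
        return delegate.getAutoOffsetResetStrategy();
    }
}

It would then be plugged into the builder above via .setStartingOffsets(new SafeTimestampOffsets(delta)).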
Jhuanderson Macias
10/20/2022, 1:05 PM
Tim Bauer
10/20/2022, 1:45 PM
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
Is there an equivalent approach in Flink?
All I am finding on the Table API side is that watermarks need to be defined at the moment of table creation. It doesn't look like one can later provide a custom watermark for a custom aggregation, or am I wrong? On the DataStream API side there is assignTimestampsAndWatermarks.
Background: I'm dealing with a stream in which only certain events have the event timestamp that I want to use as the watermark. I'm filtering out those events using the Table API, and after I have done so I would like to declare this now guaranteed-to-be-available timestamp field as my watermark.
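A minimal sketch of one way to combine the two APIs (an assumption, based on Flink 1.15: assign watermarks on the already-filtered DataStream, then promote them into the Table API via SOURCE_WATERMARK() metadata; the stream contents are hypothetical):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LateWatermarkExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hypothetical (word, epochMillis) stream; imagine a filter has
        // already removed the events without a usable timestamp.
        DataStream<Tuple2<String, Long>> words = env
                .fromElements(Tuple2.of("a", 0L), Tuple2.of("b", 60_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(
                                        Duration.ofMinutes(10))
                                .withTimestampAssigner((e, ts) -> e.f1));

        // Promote the stream's timestamps/watermarks into a rowtime column.
        Table table = tEnv.fromDataStream(
                words,
                Schema.newBuilder()
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        .watermark("rowtime", "SOURCE_WATERMARK()")
                        .build());
        table.printSchema();
    }
}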
Varun Sayal
10/20/2022, 2:35 PM