Chris Ro
12/02/2022, 11:25 PM
Emmanuel Leroy
12/03/2022, 2:00 AM
Sandeep Kongathi
12/03/2022, 5:24 PM
CREATE TABLE orders (
  order_uid BIGINT,
  product_id BIGINT,
  price DECIMAL(32, 2),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
);
Sink
CREATE TABLE orders_kafka (
  order_uid BIGINT,
  product_id BIGINT,
  price DECIMAL(32, 2),
  order_time TIMESTAMP(3),
  PRIMARY KEY (`order_uid`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'redpanda:29092',
  'topic' = 'orders',
  'sink.parallelism' = '2',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'EXCEPT_KEY'
);
Finally, when I insert the data with
INSERT INTO orders_kafka
SELECT *
FROM orders;
I am getting the error below.
raghav tandon
12/03/2022, 8:33 PM
pipeline.max-parallelism
but this increases the parallelism of other operators as well…
And I am not able to set parallelism at the sink operator level:
Operator org.apache.flink.streaming.api.datastream.KeyedStream@76828577 cannot set the maximum parallelism
The pipeline writes to 3 different sinks using `SideOutput`… Please suggest if there is a way out…
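If the sinks are wired up through the DataStream API, operator parallelism can usually be set per sink on the returned DataStreamSink rather than via pipeline.max-parallelism (max-parallelism is the key-group/rescaling upper bound, not a per-operator parallelism knob). A minimal hedged sketch — the KafkaSink and all names here are illustrative, not taken from the actual job:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._

object PerSinkParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Any Sink implementation works the same way; Kafka is just an example.
    val kafkaSink = KafkaSink.builder[String]()
      .setBootstrapServers("broker:9092")
      .setRecordSerializer(
        KafkaRecordSerializationSchema.builder[String]()
          .setTopic("out")
          .setValueSerializationSchema(new SimpleStringSchema())
          .build())
      .build()

    val stream = env.fromElements("a", "b", "c")

    // setParallelism on the DataStreamSink affects only this sink operator;
    // the rest of the pipeline keeps its own parallelism.
    stream.sinkTo(kafkaSink).setParallelism(2)

    env.execute("per-sink parallelism sketch")
  }
}
Each side-output stream obtained via getSideOutput(...) can be handled the same way, with its own sinkTo(...).setParallelism(...).
Jirawech Siwawut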
12/04/2022, 1:41 AM
select
agg1.id
,agg2.id
,agg1.window_start
,agg2.window_start
,agg1.window_end
,agg2.window_end
from agg1
left join agg2
ON agg1.window_start = agg2.window_start
AND agg1.window_end = agg2.window_end
AND agg1.id=agg2.id
The output is weird: there is always a null value from agg2. It only works for some windows at the beginning and starts to produce nulls afterwards. I already tried to separate agg1 and agg2, and found that they both produce output for the same window and key. Does anyone here experience the same behavior?
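A hedged thought, assuming agg1 and agg2 are themselves windowed aggregations: a plain streaming LEFT JOIN between two aggregated streams can first emit a (left, NULL) row and only later retract it when the matching right row arrives, which looks exactly like "always null from agg2". Flink SQL's window join, where both inputs come from window TVFs and the join condition includes window_start and window_end, is built for this pattern. Illustrative only — the table and column names below (events1, events2, event_time) are made up:
SELECT L.id, L.window_start, L.window_end, L.cnt AS cnt1, R.cnt AS cnt2
FROM (
  SELECT id, window_start, window_end, COUNT(*) AS cnt
  FROM TABLE(TUMBLE(TABLE events1, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
  GROUP BY id, window_start, window_end
) L
LEFT JOIN (
  SELECT id, window_start, window_end, COUNT(*) AS cnt
  FROM TABLE(TUMBLE(TABLE events2, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
  GROUP BY id, window_start, window_end
) R
ON L.id = R.id
AND L.window_start = R.window_start
AND L.window_end = R.window_end;
sharad mishra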
12/04/2022, 2:29 AM
val serializer = AvroSerializationSchema.forSpecific(classOf[DCNPOJORecord])
val kafkaRecordSerializationSchema = KafkaRecordSerializationSchema.builder()
  .setTopic(targetTopic)
  .setValueSerializationSchema(serializer)
  .build()
val sink: KafkaSink[DCNPOJORecord] = KafkaSink.builder()
  .setBootstrapServers(brokerURL)
  .setProperty("transaction.max.timeout.ms", transactionMaxTimeoutMs)
  .setProperty("transaction.timeout.ms", transactionTimeoutMs)
  .setTransactionalIdPrefix(transactionIdPrefix)
  .setRecordSerializer(kafkaRecordSerializationSchema)
  .setDeliveryGuarantee(deliveryGuarantee)
  .build()
sink
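For context, a minimal hedged sketch of attaching a sink built like this; `env` and `stream` are placeholders, not from the original code. Two things worth double-checking with EXACTLY_ONCE delivery: checkpointing has to be enabled so transactions are committed on checkpoint completion, and the producer's transaction.timeout.ms must not exceed the transaction.max.timeout.ms configured on the Kafka brokers (that one is a broker-side setting).
// Hedged sketch; `env` is the StreamExecutionEnvironment and `stream` a DataStream[DCNPOJORecord].
env.enableCheckpointing(60000L) // transactional commits happen on completed checkpoints
stream.sinkTo(sink)
env.execute("dcn-kafka-sink")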
Marco Villalobos
12/04/2022, 4:07 AM
Sumit Nekar
12/04/2022, 7:49 AM
taskmanager.memory.process.size: "8000m"
taskmanager.memory.task.off-heap.size: "500m"
taskmanager.memory.jvm-metaspace.size: "250m"
When the job starts processing, the metrics show that flink_taskmanager_Status_Flink_Memory_Managed_Used is always ZERO, whereas flink_taskmanager_Status_Flink_Memory_Managed_Total is set to 5G.
Is this configuration fine, or should managed memory be configured explicitly via taskmanager.memory.managed.size?
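One hedged note: managed memory is only consumed by the RocksDB state backend, batch operators, and the Python runtime, so with the heap (HashMap) state backend a Managed_Used of zero is expected. If that assumption matches this job, the pool can be shrunk explicitly instead of leaving it idle, along these lines (illustrative value):
# only safe if nothing in the job relies on managed memory (RocksDB, batch operators, PyFlink)
taskmanager.memory.managed.size: "0m"
Amenreet Singh Sodhi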
12/05/2022, 6:02 AM
flink-1.16.0/tools/releasing
using Maven version 3.2.5 and Java 11, but I get the following error while doing so:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:jar (attach-javadocs) on project flink-core: MavenReportException: Error while creating archive:
[ERROR] Exit code: 1 - /Users/amensodhi/flink-1.16.0/flink-core/src/main/java/org/apache/flink/management/jmx/JMXServer.java:123: error: package sun.rmi.registry is not visible
[ERROR] private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
[ERROR] ^
[ERROR] (package sun.rmi.registry is declared in module java.rmi, which does not export it to the unnamed module)
[ERROR]
[ERROR] Command line was: /Library/Java/JavaVirtualMachines/jdk-11.0.16.1.jdk/Contents/Home/bin/javadoc -Xdoclint:none --add-exports=java.base/sun.net.util=ALL-UNNAMED @options @packages
[ERROR]
[ERROR] Refer to the generated Javadoc files in '/Users/amensodhi/flink-1.16.0/flink-core/target/apidocs' dir.
[ERROR] -> [Help 1]
But when I build it simply using mvn clean install I don't face any such issues. How do I fix this issue?
Thanks
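A hedged workaround, if the javadoc jars are not strictly needed for this build: skip the javadoc attachment, e.g.
mvn clean install -DskipTests -Dmaven.javadoc.skip=true
The error itself comes from the JDK 11 module system refusing to expose sun.rmi.registry to the javadoc tool, so building the javadocs with a JDK 8 toolchain is another option that may avoid it.
Abdelhakim Bendjabeur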
12/05/2022, 10:35 AM
Nick Pocock
12/05/2022, 10:42 AM
Giannis Polyzos
12/05/2022, 12:49 PM
SELECT
  TUMBLE_START(eventTime_ltz, INTERVAL '30' SECONDS) AS startT,
  TUMBLE_END(eventTime_ltz, INTERVAL '30' SECONDS) AS endT,
  userSession,
  COLLECT(eventType) AS userSessionEventTypesCount,
  LISTAGG(eventType) AS events
FROM click_events
GROUP BY TUMBLE(eventTime_ltz, INTERVAL '30' SECONDS), userSession
Felix Angell
12/05/2022, 6:31 PM
René
12/05/2022, 8:25 PM
The JAR contains multiple connectors. Please choose which one you want to register.
But I can't choose any connector. Does anyone know how to deal with that or how to install the Oracle driver?
Michael Parrott
12/05/2022, 10:09 PM
getOutput and numKeyedStateEntries as ways of verifying that my function does as I expect. Is there a way to inspect the state inside my KeyedCoProcessFunction when unit testing it with a test harness?
Matyas Orhidi
12/06/2022, 2:17 AM
high-availability.kubernetes.leader-election.lease-duration
high-availability.kubernetes.leader-election.renew-deadline
Krish Narukulla
12/06/2022, 5:39 AM
public abstract class LookupFunction extends TableFunction<RowData> {
    public LookupFunction() {
    }
    public abstract Collection<RowData> lookup(RowData var1) throws IOException;
    public final void eval(Object... keys) {
        GenericRowData keyRow = GenericRowData.of(keys);
        try {
            Collection<RowData> lookup = this.lookup(keyRow);
            if (lookup != null) {
                lookup.forEach(this::collect);
            }
        } catch (IOException var4) {
            throw new RuntimeException(String.format("Failed to lookup values with given key row '%s'", keyRow), var4);
        }
    }
}
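For reference, a hedged sketch of implementing this contract in Scala — the class name, key layout, and returned values are made up for illustration. Only lookup needs to be overridden, since eval wraps the keys into a GenericRowData and collects the returned rows:
import java.util.{Collections, Collection => JCollection}
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.flink.table.functions.LookupFunction

// Hypothetical example: look up a single row by a BIGINT key.
class InMemoryLookupFunction(dictionary: Map[Long, String]) extends LookupFunction {
  override def lookup(keyRow: RowData): JCollection[RowData] = {
    val key = keyRow.getLong(0) // key fields arrive in the declared lookup-key order
    dictionary.get(key) match {
      case Some(value) =>
        Collections.singletonList(
          GenericRowData.of(java.lang.Long.valueOf(key), StringData.fromString(value)))
      case None =>
        Collections.emptyList[RowData]()
    }
  }
}
In a connector, such a function would typically be handed to the planner through the source's LookupRuntimeProvider; that part is omitted here.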
Tudor Plugaru
12/06/2022, 8:43 AM
records_lag_max metrics, but the values aren't really reflecting the real numbers of the lag; for example, I have around 400mil records in the topic, but the metric shows a value of around 4mil. Just trying to understand how best to monitor the backlog the job has based only on the metrics Flink exports. Thanks in advance.
Gaurav Miglani
12/06/2022, 12:20 PM
Tamas Kiss
12/06/2022, 1:29 PM
historyserver.archive.clean-expired-jobs = true
to do the cleanup. Is this a safe solution, or is something else recommended instead?
Nick Pocock
12/06/2022, 2:01 PM
module.yaml
that we use with the StateFun image
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
For example, for Kafka you use
kind: io.statefun.kafka.v1/egress
spec:
  id: com.example/my-egress
  address: kafka-broker:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeout: 15min
Has anyone used a custom egress with the StateFun stuff? Thanks
Emmanuel Leroy
12/06/2022, 3:31 PM
Thiruvenkadesh Someswaran
12/06/2022, 4:59 PM
Marco Villalobos
12/06/2022, 5:36 PM
Rommel
12/06/2022, 6:24 PM
Sami Badawi
12/06/2022, 9:43 PM
export FLINK_CONDA_HOME=$(dirname $(dirname $CONDA_EXE))
export GRPC_PYTHON_BUILD_SYSTEM_OPENSSL=1
export GRPC_PYTHON_BUILD_SYSTEM_ZLIB=1
~/miniconda3/bin/conda create -n pyflink_38 python=3.8
conda activate pyflink_38
pip install -r ./dev/dev-requirements.txt
conda install -c conda-forge grpcio
pip install -r ./dev/dev-requirements.txt
pip install --upgrade pip setuptools wheel
pip install -r ./dev/dev-requirements.txt
pip install apache-flink
But I was still not able to run the basic_operations.py example from GitHub:
python /Users/sami/Documents/GitHub/flink/flink-python/pyflink/examples/datastream/basic_operations.py
File "/Users/sami/miniconda3/envs/pyflink_39/lib/python3.9/site-packages/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
_INPUT = DESCRIPTOR.message_types_by_name['Input']
AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
I read that the problem has been fixed and that this will go out with Flink 1.17, but I was wondering if anybody has found a workaround so I could experiment with PyFlink now?
Marco Villalobos
12/06/2022, 10:20 PM
2022-12-06 16:32:42,550 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - aggregate tags (3/4) (f72fc395431b2f2f8cd17b75f27a040e) switched from RUNNING to FAILED on container_1664952361217_0019_01_000003 @ ip-10-45-1-55.us-west-2.compute.internal (dataPort=42231).
org.apache.flink.util.SerializedThrowable: Could not perform checkpoint 240017 for operator aggregate tags (3/4)#0.
...
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_352]
Caused by: org.apache.flink.util.SerializedThrowable: Could not complete snapshot 240017 for operator aggregate tags (3/4)#0. Failure reason: Checkpoint was declined.
....
Caused by: org.apache.flink.util.SerializedThrowable: While open a file for appending: /mnt/yarn/usercache/root/appcache/application_1664952361217_0019/flink-io-a8866c5d-1d84-43f9-9a3e-a6046505da2a/job_ee369b360e11b92b4ddf86ae4d1bc692_op_WindowOperator_6c048ec3ba20a654976042820b488880__3_4__uuid_1110507e-f1b2-4ac0-9443-9db3ac2ebdf0/chk-240017.tmp/MANIFEST-1145131: Too many open files
....
2022-12-06 16:32:42,585 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 6c048ec3ba20a654976042820b488880_2.
2022-12-06 16:32:42,586 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 44 tasks should be restarted to recover the failed task 6c048ec3ba20a654976042820b488880_2.
Then thirty minutes later job manager ran out of memory:
2022-12-06 17:01:30,040 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'flink-akka.remote.default-remote-dispatcher-5' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
I believe that incremental checkpoints require too many files and too much memory somehow. Additionally, for a long time, our akka frame size has required enlargement due to an error that it was too small; it is currently akka.framesize: 85298139b.
Any advice or strategies for tuning this?
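A hedged starting point for the "Too many open files" part, assuming RocksDB with incremental checkpoints (value is illustrative, not tested against this job): raise the OS file-descriptor limit (ulimit -n) for the YARN containers, and optionally cap how many file handles RocksDB itself keeps open:
# limit open file handles per stateful operator (Flink's default is -1, i.e. unlimited)
state.backend.rocksdb.files.open: 20000
Bhupendra Yadav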
12/07/2022, 8:00 AM
-- SELECT c1, c2 FROM table t LIMIT 5
DB LOG: execute <unnamed>: SELECT c1, c2 FROM table LIMIT 5
-- SELECT c1, c2 FROM table t WHERE c1=123 LIMIT 5
DB LOG: execute <unnamed>: SELECT c1,c2 FROM table
Any references that can help me understand the query execution better?
Tiansu Yu
12/07/2022, 9:26 AM
currentInputWatermark stays at Long.MIN_VALUE.
This is the setup I use for watermarks of all sources:
WatermarkStrategy.<T>forBoundedOutOfOrderness(windowConf.getWatermarkBoundedness())
.withIdleness(windowConf.getWatermarkIdleness())
.withTimestampAssigner((SerializableTimestampAssigner<T>) (value, recordTimestamp) -> timestampExtractor.apply(value));
Somehow, only the currentOutputWatermark advances, but the input watermark stays at Long.MIN_VALUE.
Suparn Lele
12/07/2022, 1:24 PM
val table = streamTableEnvironment.fromValues(
  DataTypes.ROW(
    DataTypes.FIELD("timestamp", DataTypes.STRING()),
    DataTypes.FIELD("organization_id", DataTypes.STRING()),
    DataTypes.FIELD("cluster_id", DataTypes.STRING()),
    DataTypes.FIELD("total", DataTypes.INT())
  ),
  row("2022-01-01 00:02:00", "A", "B", Int.box(2)),
  row("2022-01-01 00:03:00", "A", "C", Int.box(1))
)
After this I am converting it to a temporary view
streamTableEnvironment.createTemporaryView("table1", table)
After this I am running the following query
val table2 = streamTableEnvironment.sqlQuery("select cast(`timestamp` as TIMESTAMP(3)) as trunc_time, organization_id, cluster_id, total FROM table1")
Till this point everything works fine, but when I run the following query
streamTableEnvironment.createTemporaryView("temp", table2)
val aggregatedTable = streamTableEnvironment.sqlQuery("select TUMBLE_END(trunc_time, INTERVAL '10' MINUTE) `timestamp`, cluster_id, sum(total) as total from temp group by TUMBLE(trunc_time, INTERVAL '10' MINUTE), organization_id, cluster_id")
It throws the following exception:
Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
My settings are as follows
val streamExecutionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment)
val unmodifiableCollection: Class[_] = Class.forName("java.util.Collections$UnmodifiableCollection")
streamExecutionEnvironment.getConfig.addDefaultKryoSerializer(unmodifiableCollection, classOf[UnmodifiableCollectionsSerializer])
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH)
Please help. So my general question is: how can I apply windowing operations on a table which I have loaded from a DB? The table has timestamp columns, and I am running the job in batch mode.
P.S - Flink - 1.14, Scala - 2.11
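A hedged idea, not verified against this exact setup: the legacy GROUP BY TUMBLE(...) syntax insists on a proper time attribute, but the window table-valued functions accept a plain TIMESTAMP(3) column in batch mode (assuming the 1.14 build in use supports window TVFs in batch). Rewriting the aggregation over the TUMBLE TVF might therefore work without defining a time attribute:
// `temp` is the view registered above; column names are unchanged.
val aggregatedTable = streamTableEnvironment.sqlQuery(
  """
    |SELECT window_end AS `timestamp`, cluster_id, SUM(total) AS total
    |FROM TABLE(
    |  TUMBLE(TABLE temp, DESCRIPTOR(trunc_time), INTERVAL '10' MINUTES))
    |GROUP BY window_start, window_end, organization_id, cluster_id
    |""".stripMargin)
If that route is not available, the other common approach is to define the table via a DDL (or Schema) that declares trunc_time as a rowtime attribute with a WATERMARK, so it becomes a time attribute.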