M Harsha
10/28/2022, 4:52 PMCaused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
As per the Debugging Classloading docs, it suggests to set the param to ``classloader.resolve-order=parent-first``
I am setting this via Java code as follows:
Configuration configuration = new Configuration();
configuration.set(CLASSLOADER_RESOLVE_ORDER,"parent-first");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
System.out.println(env.getConfiguration().toString());
I see the following line in the output, that prints the config
classloader.resolve-order=parent-first
But I still see the error
Am I missing anything?
Also I am able to run the application via IntelliJ
TIA!Steven Zhang
10/28/2022, 7:47 PMflinkConfiguration:
taskmanager.numberOfTaskSlots: "10"
state.savepoints.dir: "file:///flink-data/savepoints"
state.checkpoints.dir: "file:///flink-data/checkpoints"
high-availability: "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory"
high-availability.storageDir: "file:///flink-data/ha"
I started up two SessionJobs using that deployment with upgradeMode: savepoint
. I sshed into the k8s node with the volume and saw the ha directory created with job graphs in it
blob submittedJobGraph220fe25801af submittedJobGraph477b47fdef02
I also manually triggered a savepoint with savepointTriggerNonce
and see the savepoint directory created
When I went to upgrade the FlinkDep image by modifying the CRD, the jobs entered RECONCILING
and never exited, and the job graphs had disappeared from the ha directory on the node (but the savepoints are still present). Am I missing something in my setup to get the jobs to continue running after the upgrade?Dan Dubois
10/28/2022, 11:57 PMtaskmanager.memory.process.size: 32gb
to my Task Manager FLIINK_PROPERTIES
in my compose.yml
file then the Task Manager crashes with the following logs:
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000003ab000000, 18605932544, 0) failed; error='Not enough space' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 18605932544 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/flink/hs_err_pid1.log
The thing I don't understand is that my host machine has 64Gb of memory and the Docker containers are allocated max 64Gb so why wouldn't the JVM be able to allocate enough memory?Matt Fysh
10/29/2022, 10:38 AMIkvir Singh
10/29/2022, 5:07 PMMatt Fysh
10/30/2022, 4:27 AMtable_env.sql_query('SELECT * FROM my_kinesis_source')
Jirawech Siwawut
10/30/2022, 4:39 AMselect category_id, window_start, window_end, count(1)
from
( select *
FROM TABLE(HOP(TABLE transacton, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '20' MINUTES)) AS t
) t
LEFT JOIN dimention_table FOR SYSTEM_TIME AS OF t.event_time
ON dimention_table.id = t.restaurant_id
AND dimention_table.id IS NOT NULL
AND t.restaurant_id IS NOT NULL
GROUP BY category_id, window_start, window_end
And here is the execution plan
+- GroupAggregate(groupBy=[category_id, window_start, window_end], select=[category_id, window_start, window_end, COUNT(DISTINCT $f3) AS $f3, COUNT(DISTINCT $f4) AS $f4])
+- Exchange ...
+- Calc ...
+- TemporalJoin...
:- Exchange(distribution=[hash[restaurant_id]])
: +- Calc ...
: +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
: +- ...
+- Exchange(distribution=[hash[id]])
+- Calc ...
How do i get global window aggregation. I tried something like this but it did not work with temporal join. You may follow below sql
SELECT category_id, window_start, window_end, count(1)
FROM
( SELECT *
FROM TABLE(HOP(TABLE transacton, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '20' MINUTES)) AS t
LEFT JOIN dimention_table FOR SYSTEM_TIME AS OF t.event_time
ON dimention_table.id = t.restaurant_id
AND dimention_table.id IS NOT NULL
AND t.restaurant_id IS NOT NULL
) t
GROUP BY category_id, window_start, window_end
Tawfik Yasser
10/30/2022, 4:50 AMNo ExecutorFactory found to execute the application
.
Tried different solutions as well but didn't work at all.
Could anyone advice me, please?
TIAHaim Ari
10/30/2022, 11:25 AMHaim Ari
10/30/2022, 11:30 AMk-operator.yaml:82:25: executing "flink-kubernetes-operator/templates/flink-operator.yaml" at <$v.name>: can't evaluate field name in type interface {}...
Matt Fysh
10/31/2022, 1:21 AMKrish Narukulla
10/31/2022, 1:46 AMflink-table-code-spiltter
is not building.Jim
10/31/2022, 6:57 AMAmenreet Singh Sodhi
10/31/2022, 7:36 AMFailed to execute goal com.github.eirslett:frontend-maven-plugin:1.11.0:
linstall-node-and-npm (install node and pm) on project flink-runtime-web: Could not download pm: Could not download <https://registry.npmjs.org/mpm/-/mpm-8.1.2Itgz>: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
Haim Ari
10/31/2022, 8:03 AM5m12s Normal SuccessfulCreate replicaset/flink-kubernetes-operator-78fc4b8b86 Created pod: flink-kubernetes-operator-78fc4b8b86-dm7rd
62s Warning FailedMount pod/flink-kubernetes-operator-78fc4b8b86-dm7rd MountVolume.SetUp failed for volume "keystore" : secret "webhook-server-cert" not found
5m11s Normal Scheduled pod/flink-kubernetes-operator-78fc4b8b86-dm7rd Successfully assigned flink/flink-kubernetes-operator-78fc4b8b86-dm7rd to inst-pqqeh-singapore-ops-node-rtb-prd-intel
3m9s Warning FailedMount pod/flink-kubernetes-operator-78fc4b8b86-dm7rd Unable to attach or mount volumes: unmounted volumes=[keystore], unattached volumes=[flink-operator-config-volume flink-operator-token-d7k2c keystore]: timed out waiting for the condition
52s Warning FailedMount pod/flink-kubernetes-operator-78fc4b8b86-dm7rd Unable to attach or mount volumes: unmounted volumes=[keystore], unattached volumes=[keystore flink-operator-config-volume flink-operator-token-d7k2c]: timed out waiting for the condition
12s Normal error helmrelease/flink-kubernetes-operator Helm install failed: timed out waiting for the condition...
12s Normal error helmrelease/flink-kubernetes-operator reconciliation failed: Helm install failed: timed out waiting for the condition
12s Normal error helmrelease/flink-kubernetes-operator reconciliation failed: install retries exhausted
Tawfik Yasser
10/31/2022, 8:32 AMTawfik Yasser
10/31/2022, 8:32 AMTawfik Yasser
10/31/2022, 8:33 AMGaurav Miglani
10/31/2022, 9:35 AM2022-10-31 09:29:17,499 o.a.f.s.n.i.n.c.DefaultChannelPipeline [WARN ] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: Received fatal alert: bad_certificate
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: javax.net.ssl.SSLHandshakeException: Received fatal alert: bad_certificate
at java.base/sun.security.ssl.Alert.createSSLException(Unknown Source)
at java.base/sun.security.ssl.Alert.createSSLException(Unknown Source)
at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
at java.base/sun.security.ssl.Alert$AlertConsumer.consume(Unknown Source)
at java.base/sun.security.ssl.TransportContext.dispatch(Unknown Source)
at java.base/sun.security.ssl.SSLTransport.decode(Unknown Source)
at java.base/sun.security.ssl.SSLEngineImpl.decode(Unknown Source)
at java.base/sun.security.ssl.SSLEngineImpl.readRecord(Unknown Source)
at java.base/sun.security.ssl.SSLEngineImpl.unwrap(Unknown Source)
at java.base/sun.security.ssl.SSLEngineImpl.unwrap(Unknown Source)
at java.base/javax.net.ssl.SSLEngine.unwrap(Unknown Source)
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$SslEngineType$3.unwrap(SslHandler.java:296)
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1342)
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
... 17 more
Tawfik Yasser
10/31/2022, 11:17 AMArnon
10/31/2022, 12:56 PMPiotr Pawlaczek
10/31/2022, 1:42 PMapiVersion: <http://flink.apache.org/v1beta1|flink.apache.org/v1beta1>
kind: FlinkDeployment
metadata:
name: my-session-deployment
spec:
image: <<my-custom-image-with-s3-plugin>>
flinkVersion: v1_15
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
s3.endpoint: <http://endpoint.com|endpoint.com>
s3.path.style.access: "true"
s3.access-key: <<key>>
s3.secret-key: "****"
then when I define my FlinkSessionJob:
apiVersion: <http://flink.apache.org/v1beta1|flink.apache.org/v1beta1>
kind: FlinkSessionJob
metadata:
name: job
spec:
deploymentName: my-session-deployment
job:
jarURI: <s3://test/flink-examples-streaming_2.12-1.15.2-TopSpeedWindowing.jar>
parallelism: 4
upgradeMode: stateless
I get the following error from FlinkSessionJob:
org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on dummy: com.amazonaws.SdkClientException: Unable to execute HTTP request: test.endpoint.com: Unable to execute HTTP request: test.endpoint.comIt seems my
s3.path.style.access: "true"
property was ignored and the bucket name (test) was attached to endpoint’s name instead of: <http://endpoint.com/test/|endpoint.com/test/>
.
Apart from that I’m able to read from s3 connecting with application using exactly the same configuration like in yaml so the s3 config is correct.
Could you please tell me what I’m doing wrong?Tiansu Yu
10/31/2022, 2:37 PMGaurav Miglani
10/31/2022, 3:44 PMJin Yi
10/31/2022, 5:40 PMEric Xiao
10/31/2022, 9:15 PMop
column:
tableEnv
.sqlQuery("""
SELECT
*
FROM line_item
WHERE
true
AND op <> 'I+'
""")
> Column 'op' not found in any table
Is there a special way for us to access the type of operation of the CDC record in sql directly? I tried added the op
column as a metadata column as well and it complained it could not find it.Ankit Kothari
11/01/2022, 12:09 AMStreamingFileSink
to write our stream data to s3
private transient int customMetric = 0;
final DataStream<String> source = ....
final StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
.withBucketAssigner(new NamespaceSiteIdBucketAssigner<>())
.withRollingPolicy(OnCheckpointRollingPolicy.build()).withOutputFileConfig(getOutputFileConfig())
.build();
sink.getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@Override
public Integer getValue() {
return customMetric;
}
});
source.map(line -> {
Map<String, Object> resultMap = ...
customMetric = resultMap.get("some-key");
return Utils.jsonParser.writeValueAsString(resultMap);
}).name(workflowId).uid(workflowId);
On running this code snippet, I get the following error
"The runtime context has not been initialized.","message":"The runtime context has not been initialized.","name":"java.lang.IllegalStateException","extendedStackTrace":"java.lang.IllegalStateException: The runtime context has not been initialized.\n\tat org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
Is there some initialization to be done so that the runtime context object is created?
Can someone please point to what am I missing here and how do I get this working
FYI
Following this guide for metric: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/metrics/#metric-types
and this for streamingFileSink: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/Matt Fysh
11/01/2022, 12:56 AMSuna Yin
11/01/2022, 2:31 AMTawfik Yasser
11/01/2022, 3:47 AM