Nitin Agrawal
12/09/2022, 10:29 AM

Momir Beljic
12/09/2022, 11:48 AM
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
input_table_name = "input_process_table"
output_table_name = "output_process_table"
create_source = create_source_table(input_table_name, "process_events", "eu-central-1", "LATEST")
t_env.execute_sql(create_source)
table = t_env.from_path(input_table_name)
print('Table')
table.execute().print()  # TableResult.print() prints directly; wrapping it in print() only prints None
pdf = table.limit(1000).to_pandas()
Thanks!

Patrick Lucas
12/09/2022, 2:49 PM
org.apache.http.impl.auth.HttpAuthenticator), yet having been rewritten to reference a relocated class (org.apache.flink.fs.shaded.hadoop3.org.apache.commons.logging.Log). When the Elasticsearch client code wants to instantiate an HttpAuthenticator, it hits this rewritten version, and it fails because it's trying to pass an un-relocated org.apache.commons.logging.Log. My guess is that HttpAuthenticator (and presumably anything else being bundled from org.apache.httpcomponents?) should be relocated within flink-gs-fs-hadoop.
I'm tempted to call this a bug, but I'd be surprised if no one else had run into this before, as using GCS and ES at the same time doesn't seem that rare. Could someone give me a sanity check that there isn't something obvious I'm doing wrong?
Minimal reproducing case: https://github.com/patricklucas/flink-gs-es-problem
Stacktrace in thread.

André Casimiro
12/09/2022, 7:06 PM
AverageAccumulator, but I got wondering if there's a standard way to just plug it into some other generic aggregator. Do I now have to write the same logic for other operations like max/min/sum calculation? Any suggestions?
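For context, a minimal sketch (Java DataStream API) of one way to avoid duplicating that logic: a generic aggregator parameterized by a combine function. GenericAggregate and SerializableBinaryOperator are hypothetical names, not Flink classes.

import java.io.Serializable;
import java.util.function.BinaryOperator;
import org.apache.flink.api.common.functions.AggregateFunction;

// Flink functions must be serializable, so the combiner must be too.
interface SerializableBinaryOperator<T> extends BinaryOperator<T>, Serializable {}

// Generic aggregator: the combine logic (sum, max, min, ...) is passed in
// instead of being hard-coded per operation.
public class GenericAggregate<T> implements AggregateFunction<T, T, T> {

    private final SerializableBinaryOperator<T> combiner;
    private final T identity;

    public GenericAggregate(SerializableBinaryOperator<T> combiner, T identity) {
        this.combiner = combiner;
        this.identity = identity;
    }

    @Override
    public T createAccumulator() { return identity; }

    @Override
    public T add(T value, T accumulator) { return combiner.apply(accumulator, value); }

    @Override
    public T getResult(T accumulator) { return accumulator; }

    @Override
    public T merge(T a, T b) { return combiner.apply(a, b); }
}

Usage would look like new GenericAggregate<>(Long::sum, 0L) or new GenericAggregate<>(Math::max, Long.MIN_VALUE); an average still needs its own accumulator (as AverageAccumulator has), since it is not a simple fold over a single value.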
Emmanuel Leroy
12/09/2022, 9:42 PM

Jin S
12/09/2022, 11:08 PM
HADOOP_TOKEN_FILE_LOCATION
(given that the superuser has put DTs in that location)? Thanks :)
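For reference, a minimal sketch (plain Hadoop API, not Flink-specific, and only an assumption about the setup) of how to inspect the serialized Credentials file that HADOOP_TOKEN_FILE_LOCATION conventionally points at; TokenFileInspector is a hypothetical helper:

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;

public class TokenFileInspector {
    public static void main(String[] args) throws Exception {
        // HADOOP_TOKEN_FILE_LOCATION points at a file written via
        // Credentials.writeTokenStorageFile, e.g. by a superuser-run process.
        String location = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        Credentials creds =
                Credentials.readTokenStorageFile(new File(location), new Configuration());
        for (Token<?> token : creds.getAllTokens()) {
            System.out.println(token.getKind() + " -> " + token.getService());
        }
    }
}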
Sumit Nekar
12/10/2022, 5:42 AM

Lee xu
12/10/2022, 11:09 AM

Thiruvenkadesh Someswaran
12/10/2022, 6:24 PM
UnsupportedFileSystemSchemeException
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-only-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      tolerations:
        - key: "kafka"
          operator: "Equal"
          value: "true"
          effect: "NoSchedule"
      containers:
        # Do not change the main container name
        - name: flink-main-container
          envFrom:
            - configMapRef:
                name: application-properties-flink
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.15.3.jar
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
My Flink Session job:
kubectl get flinksessionjobs.flink.apache.org -n flink alerting -o yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  creationTimestamp: "2022-12-10T17:26:29Z"
  finalizers:
    - flinksessionjobs.flink.apache.org/finalizer
  generation: 2
  name: alerting
  namespace: flink
  resourceVersion: "2383389"
  uid: 6a2b29f5-4002-4977-8204-52c1acaa3a66
spec:
  deploymentName: basic-session-deployment-only-example
  job:
    args: []
    jarURI: s3://MYS3-JARbucket/myjar.jar
    parallelism: 4
    state: running
    upgradeMode: stateless
status:
  error: 'org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
    find a file system implementation for scheme ''s3''. The scheme is directly supported
    by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto.
    Please ensure that each plugin resides within its own subfolder within the plugins
    directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
    for more information. If you want to use a Hadoop file system for that scheme,
    please add the scheme to the configuration fs.allowed-fallback-filesystems. For
    a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.'
kingsathurthi
12/11/2022, 7:46 AM
- The 'host' parameter of 'jobmanager.sh' has been deprecated. Please use -D key: 'jobmanager
2022-12-11 07:37:39,054 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting StandaloneSessionClusterEntrypoint.
2022-12-11 07:37:39,089 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install default filesystem.
2022-12-11 07:37:39,093 INFO org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2022-12-11 07:37:39,139 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install security context.
2022-12-11 07:37:39,150 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2022-12-11 07:37:39,155 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-3448220580025284093.conf.
2022-12-11 07:37:39,163 INFO org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2022-12-11 07:37:39,165 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Initializing cluster services.
2022-12-11 07:37:39,173 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Using working directory: WorkingDirectory(/tmp/jm_fcdb578a7424085b45f144475881118a).
2022-12-11 07:37:39,552 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address 192.168.27.109:6123, bind address 0.0.0.0:6123.
2022-12-11 07:37:40,564 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
2022-12-11 07:37:40,612 INFO akka.remote.RemoteActorRefProvider [] - Akka Cluster not in use - enabling unsafe features anyway because 'akka.remote.use-unsafe-remote-features-outside-cluster' has been enabled.
2022-12-11 07:37:40,613 INFO akka.remote.Remoting [] - Starting remoting
2022-12-11 07:37:40,822 INFO akka.remote.Remoting [] - Remoting started; listening on addresses [akka.tcp://flink@192.168.27.109:6123]
2022-12-11 07:37:40,978 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink@192.168.27.109:6123
2022-12-11 07:37:41,438 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServi
kingsathurthi
12/11/2022, 5:25 PM
kingsathurthi@N-5CG2124DKR:~/flink/doc-standalone$ k logs flink-jobmanager-7d4447c69d-7tfc6
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedeqcoZa: Read-only file system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Permission denied
/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting Job Manager
Starting standalonesession as a console application on host flink-jobmanager-7d4447c69d-7tfc6.
2022-12-11 17:27:15,459 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
2022-12-11 17:27:15,461 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
2022-12-11 17:27:15,461 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
Liubov
12/12/2022, 10:24 AM

Jirawech Siwawut
12/12/2022, 10:37 AM

Conor McGovern
12/12/2022, 12:12 PM
Caused by: com.amazonaws.ResetException: The request to the service failed with a retryable reason, but resetting the request input stream has failed. See exception.getExtraInfo or debug-level logging for the original failure that caused this retry.; If the request involves an input stream, the maximum stream buffer size can be configured via request.getRequestClientOptions().setReadLimit(int)
Caused by:
Caused by: java.io.IOException: Resetting to invalid mark
(Full stacktrace in thread.) Anybody know of a workaround/fix for this issue?

Amir Halatzi
12/12/2022, 1:18 PM

Tudor Plugaru
12/12/2022, 1:48 PM
metrics.reporters: dghttp
metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
metrics.reporter.dghttp.apikey: redacted
metrics.reporter.dghttp.dataCenter: US
metrics.reporter.dghttp.useLogicalIdentifier: true
metrics.reporter.dghttp.tags: cluster_name:redacted, kube_deployment:redacted
metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
But from what I see, I can't rewrite the scopes of the metrics. They arrive in Datadog with the taskmanager and jobmanager prefixes and not with the flink prefix, as required by the Datadog docs in order to not be counted as custom metrics. Looking at the code, I tried using tm-job instead of tm.job, but still the issue remains. Can anyone advise, please? 🙏

Joris Basiglio
12/12/2022, 3:14 PM

Felix Angell
12/12/2022, 4:09 PM
org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType
We're using Flink 1.13.2 on AWS KDA. It looks as though this is a known problem and would degrade performance. Is it worth investigating this further, or would the effect be negligible?
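As a sketch of how to gauge whether that warning matters: the ExecutionConfig has a switch that makes any Kryo/GenericType fallback fail fast in a test run, so it surfaces exactly which types (and which operators) rely on it. This is the standard DataStream API, nothing KDA-specific:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenericTypeCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Throws an exception wherever a type would fall back to Kryo,
        // surfacing every GenericType on the job's hot path.
        env.getConfig().disableGenericTypes();
        // ... build and execute the pipeline as usual ...
    }
}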
Sitara Wishal
12/12/2022, 6:07 PM

Raf
12/12/2022, 7:36 PM
SELECT * FROM table
— is it supposed to show the query results as they come?
The query completes successfully because it's async (table.dml-sync=false), but for some reason, the results only show up if I stop the sql-gateway process.

Emmanuel Leroy
12/12/2022, 11:32 PM

Suparn Lele
12/13/2022, 4:06 AM
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
intervals.foreach(interval => run whole pipeline for this interval)
streamExecutionEnv.execute()
If I am creating the execution environment just once and calling execute just once, then Flink won't run this sequentially. I think it would run everything simultaneously, because there would be a single execute at the end once all loops are done. But I want it to run sequentially only. How can I achieve this?
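A minimal sketch (Java; buildPipelineFor and the interval list are hypothetical stand-ins) of forcing sequential runs: call execute() once per interval inside the loop. Each execute() submits only the operators defined since the previous call and blocks until that job finishes, so the intervals run one after another.

import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SequentialIntervals {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        for (String interval : List.of("2022-12-01", "2022-12-02")) {
            buildPipelineFor(env, interval);
            // Blocks until this interval's job completes before the next iteration.
            env.execute("pipeline-" + interval);
        }
    }

    // Hypothetical stand-in for the real per-interval pipeline.
    private static void buildPipelineFor(StreamExecutionEnvironment env, String interval) {
        env.fromElements(interval).print();
    }
}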
Kenny Lu
12/13/2022, 4:41 AM
# disable UI submit jobs
web.submit.enable: "false"
Prathit Malik
12/13/2022, 10:25 AM

Slackbot
12/13/2022, 11:14 AM

kingsathurthi
12/13/2022, 12:04 PM
2022-12-13 11:54:25,468 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2022-12-13 11:54:25,471 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.0.jar:1.16.0]
... 4 more
2022-12-13 11:54:25,484 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
Abdelhakim Bendjabeur
12/13/2022, 1:44 PM
CREATE TABLE all_data (
  `id` INT,
  `accountId` INT,
  ...,
  PRIMARY KEY (id, accountId) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'property-version' = 'universal',
  'properties.bootstrap.servers' = 'host.docker.internal:29092',
  'value.format' = 'json',
  'key.format' = 'json',
  'topic' = 'some-topic'
);

INSERT INTO all_data
WITH table_a_dedup AS (
  SELECT *
  FROM (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY `timestamp` DESC) rn
    FROM table_a
  ) s
  WHERE rn = 1),
table_b_dedup AS (
  SELECT *
  FROM (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn
    FROM table_b
  ) s
  WHERE rn = 1)
SELECT
  table_b_dedup.id,
  table_b_dedup.accountId,
  table_b_dedup.`timestamp` AS tableBTimestamp,
  table_a_dedup.`timestamp` AS tableATimestamp,
  table_a_dedup.createdDatetime AS tableACreatedDatetime,
  table_a_dedup.deletedDatetime AS tableADeletedDatetime
FROM table_b_dedup
LEFT JOIN table_a_dedup
  ON table_b_dedup.ticketId = table_a_dedup.id;
Whenever an update is applied on table_a, an extra record appears in the topic 'some-topic' with all of its columns set to null. I can't find an explanation for it; did anyone experience this before? Any help is appreciated.

Matt Czyz
12/13/2022, 4:06 PM
StreamTableEnvironment, and I'm having problems running it with Yarn on top of EMR - it works fine if run from the IDE or without any code dependent on StreamTableEnvironment.
The application was created using the Gradle quickstart script without any additional packaged deps:
bash -c "$(curl https://flink.apache.org/q/gradle-quickstart.sh)" -- 1.15.1 _2.12
To narrow down the issue, the code simply implements the example as-is from:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
When executing with Yarn on EMR I am getting the following error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
The application executes fine if I replace flink-table-planner-loader-1.15.1.jar with flink-table-planner_2.12-1.15.1.jar under the lib/ directory, but if I understand correctly, flink-table-planner-loader already contains the planner, so it should work fine without replacing it with the legacy dependency?

Felix Angell
12/13/2022, 4:33 PM

Adrian Chang
12/13/2022, 9:28 PM
GROUP BY TUMBLE with a user-defined aggregate function. Can I expect the values sent to the function to be in the order they were received?
I know that SQL does not guarantee order when applying an aggregation function. Does Flink SQL behave the same way, or does it respect the order of the events?
Thanks
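For context, a minimal sketch of the defensive pattern when order matters inside a UDAF (Table API AggregateFunction; TimeOrderedConcat is a hypothetical example, not a built-in): buffer (event-time, value) pairs in the accumulator and sort on emission, so the result does not depend on arrival order within the window.

import java.util.ArrayList;
import java.util.Comparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.AggregateFunction;

public class TimeOrderedConcat
        extends AggregateFunction<String, ArrayList<Tuple2<Long, String>>> {

    @Override
    public ArrayList<Tuple2<Long, String>> createAccumulator() {
        return new ArrayList<>();
    }

    // Called by the planner for each input row; arrival order is not guaranteed.
    public void accumulate(ArrayList<Tuple2<Long, String>> acc, Long ts, String value) {
        acc.add(Tuple2.of(ts, value));
    }

    @Override
    public String getValue(ArrayList<Tuple2<Long, String>> acc) {
        // Impose event-time order explicitly instead of relying on arrival order.
        acc.sort(Comparator.comparing(t -> t.f0));
        StringBuilder sb = new StringBuilder();
        for (Tuple2<Long, String> t : acc) {
            sb.append(t.f1);
        }
        return sb.toString();
    }
}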