Varun Sayal
10/14/2022, 2:40 PM
Varun Sayal
10/14/2022, 2:41 PM
Krish Narukulla
10/14/2022, 5:49 PM
Does the FlinkDeployment k8s operator support reading from s3/gcs? Is there a plan to support it?
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: xxx
  namespace: %{K8S_NAMESPACE}
spec:
  image: %{IMAGE_REGISTRY}/dea/xxx:%{IMAGE_TAG}
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: s3://bucket/krish/airstream.jar
    args: ["--config", "s3://bucket/pipeline-test.yaml"]
    parallelism: 2
    upgradeMode: stateless
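Whether the operator itself can fetch a jarURI from s3/gcs depends on the operator version, but anything read through Flink's FileSystem at runtime (for example the s3:// config file passed in args) needs the matching filesystem plugin inside the image. A minimal sketch of enabling the bundled plugin via the pod template; the plugin jar name/version and the podTemplate placement are assumptions for a stock 1.15 image, not something confirmed in this thread:
spec:
  podTemplate:
    spec:
      containers:
        # the operator expects this exact container name
        - name: flink-main-container
          env:
            # the official Flink image copies the named bundled plugin(s) into plugins/ at startup
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.15.2.jar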
Angelo Kastroulis
10/15/2022, 4:36 AM
I have the flink-s3-fs-hadoop plugin (v1.15.2) in the plugins folder and also flink-shaded-hadoop-2-uber-2.4.1-10.0.jar in lib, but I can't get it working.
The error I get is:
Caused by: java.io.IOException: No FileSystem for scheme: s3a
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2385)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
at org.apache.flink.orc.shim.OrcShimV230.createReader(OrcShimV230.java:43)
at org.apache.flink.orc.shim.OrcShimV200.createRecordReader(OrcShimV200.java:98)
at org.apache.flink.orc.AbstractOrcFileInputFormat.createReader(AbstractOrcFileInputFormat.java:106)
at org.apache.flink.orc.AbstractOrcFileInputFormat.createReader(AbstractOrcFileInputFormat.java:52)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
For reference, the Parquet version:
Caused by: java.io.IOException: No FileSystem for scheme: s3a
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2385)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:469)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:119)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:78)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
It looks like ORC (Parquet does the same thing) is not using the plugin to load the file system, but rather something else, so it fails.
If I remove the Hadoop uber jar from the equation (which the docs never mention I need; in fact, it's never included in any of the examples that show the s3-fs configuration, whether for YARN, Kubernetes, Docker, or plugins), I get this error:
java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.flink.orc.OrcFileFormatFactory$OrcBulkDecodingFormat.createRuntimeDecoder(OrcFileFormatFactory.java:146)
at org.apache.flink.orc.OrcFileFormatFactory$OrcBulkDecodingFormat.createRuntimeDecoder(OrcFileFormatFactory.java:118)
at org.apache.flink.connector.file.table.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:157)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:461)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:158)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:122)
at
I'm just at a loss.
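A hedged guess at what is going on: the ORC/Parquet readers build a Hadoop FileSystem directly from the Hadoop configuration, bypassing Flink's plugin-loaded s3 filesystem, so the s3a scheme has to be registered on the Hadoop side. A minimal sketch of forwarding that through flink-conf.yaml; the flink.hadoop.* prefix and the credential keys are assumptions about this setup, and hadoop-aws plus a matching aws-java-sdk-bundle would have to be on the classpath rather than the old 2.4.1 shaded uber jar:
# flink-conf.yaml (excerpt) - keys prefixed with flink.hadoop. are forwarded
# into the Hadoop configuration the reader uses to open the path
flink.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
flink.hadoop.fs.s3a.access.key: <ACCESS_KEY>
flink.hadoop.fs.s3a.secret.key: <SECRET_KEY>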
Krish Narukulla
10/15/2022, 5:00 AM
CREATE TEMPORARY TABLE table (
  `x` BIGINT,
  `y` STRING,
  `z` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'gs://xxx-yyy-dev-1/dev/krish',
  'format' = 'parquet'
)
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'default' that implement 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.
Ambiguous factory classes are:
org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.loader.DelegateExecutorFactory
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553) ~[flink-table-api-java-uber-1.15.2.jar:1.15.2]
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:105) ~[flink-table-api-java-uber-1.15.2.jar:1.15.2]
Avinash
10/15/2022, 6:30 AM
I tried flink-sql-connector-hive-2.3.6 as mentioned here, with no success. I'm facing some issues and would appreciate it if someone can give me some ideas.
Alex Riedler
10/15/2022, 10:33 PM
Vamshi Gandrapu
10/16/2022, 2:57 AM
StatefulFunctionEgressStreams egresses =
    StatefulFunctionDataStreamBuilder.builder(Identifiers.LOGS_STREAMS)
        .withDataStreamAsIngress(logsIngress)
        .withFunctionProvider(Identifiers.LOGS_INGRESS, fn -> new LogsStatefulFn())
        .withRequestReplyRemoteFunction(
            RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                    Identifiers.LOGS_REMOTE_FN, URI.create("http://logs-svc:8000/statefun"))
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500))
        .withEgressId(Identifiers.LOGS_ENGRESS)
        .withConfiguration(statefunConfig)
        .build(env);
The Python fn is set up with the async RequestReply protocol over grpc/protobuf.
We are able to successfully invoke the remote fn and send a packed org.apache.flink.statefun.sdk.reqreply.generated.TypedValue with the message; however, the pipeline fails to parse the response sent by the remote grpc Python fn with the error below. I expect the response to be inferred as a TypedValue. Not sure what is causing this issue.
Caused by: java.lang.NoSuchMethodError: 'com.google.protobuf.Parser org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.parser()'
at org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClient.parseResponse(DefaultHttpRequestReplyClient.java:76)
Thien Nguyen
10/17/2022, 7:39 AM
Alagappan Palanisamy
10/17/2022, 8:36 AM
Slackbot
10/17/2022, 8:36 AM
Alagappan Palanisamy
10/17/2022, 8:37 AM
Tiansu Yu
10/17/2022, 8:43 AM
com.amazonaws.handlers.* in flink-s3-fs-hadoop-1.13.2.jar shadows my own com.amazonaws.handlers. Details in 🧵
Ameenur Rahman
10/17/2022, 10:59 AM
package org.apache.flink.streaming.examples.wordcount.util;
//import com.bidsopt.gpb.adresponse.AdResponse;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
//import com.bidsopt.gpb.*;
import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat;
/**
* Implements a word count which takes the input file and counts the number of occurrences of each
* word in the file and writes the result back to disk.
*
* <p>This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to common
* Java types for better usage in a Flink job and how to use Hadoop Output Formats.
*/
@SuppressWarnings("serial")
public class FileWordCountHadoop {

    public static void main(String[] args) throws Exception {
        final String inputPath = "hdfs://localhost:9000/notes";
        final String outputPath = "hdfs://localhost:9000/notes_out";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Job job = Job.getInstance();
        HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
                new HadoopInputFormat<LongWritable, Text>(
                        new TextInputFormat(),
                        LongWritable.class,
                        Text.class,
                        job);
        TextInputFormat.addInputPath(job, new Path(inputPath));

        // Create a Flink job with it
        DataStream<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
        text.print();

        env.execute("Word Count");
    }
}
As you can see from the above code, I used env.createInput(hadoopInputFormat), but continuous file processing does not happen for the Hadoop input format. Please help with a solution.
M Harsha
10/17/2022, 11:38 AM
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
$ kubectl create -f jobmanager-session-deployment-ha.yaml
$ kubectl create -f taskmanager-session-deployment.yaml
The TaskManager pods keep restarting with the following log:
INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_.
I tried a curl from the TaskManager pod:
flink@flink-taskmanager-76cb7df9d6-jl5dk:~$ curl flink-jobmanager:6123
curl: (52) Empty reply from server
Upon checking the logs at the JobManager, I found this:
2022-10-17 11:34:50,695 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-jobmanager:6123/]] arriving at [akka.tcp://flink@flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@172.17.0.3:6123]
2022-10-17 11:35:03,582 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-jobmanager:6123/]] arriving at [akka.tcp://flink@flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@172.17.0.3:6123]
Is any config missing?
This issue seems to be the same as FLINK-24031.
Any workaround for this issue?
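Not an authoritative fix, but one thing worth checking against the standalone Kubernetes HA example: with jobmanager-session-deployment-ha.yaml the JobManager binds to its pod IP, and the TaskManagers are expected to find it through the Kubernetes HA service rather than the flink-jobmanager hostname, which only works if the ConfigMap carries the HA settings. A sketch of the keys involved (cluster id and storage dir are placeholders):
flink-conf.yaml: |+
  jobmanager.rpc.address: flink-jobmanager
  jobmanager.rpc.port: 6123
  kubernetes.cluster-id: <cluster-id>
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.storageDir: s3://<bucket>/flink/ha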
M Harsha
10/17/2022, 4:39 PM
Kevin Lam
10/17/2022, 5:16 PM
Abel Lamjiri
10/17/2022, 6:08 PM
The application contains no execute() calls.
When we switch to using RocksDB as the state backend, with the following Flink configs:
flinkConfiguration:
  state.backend: rocksdb
  state.backend.rocksdb.localdir: file:///data/rocksdb-hvf
  state.backend.incremental: "true"
  state.backend.rocksdb.timer-service.factory: heap
we get the above exception.
The following is part of volume mount and PVC:
spec:
  serviceAccount: <%= var['serviceaccount'] %>
  containers:
    # Do not change the main container name
    - name: flink-main-container
      env:
        - name: ENABLE_BUILT_IN_PLUGINS
          value: flink-s3-fs-presto-1.15.2.jar
      volumeMounts:
        - name: data
          mountPath: /data
  volumes:
    - name: data
      persistentVolumeClaim:
        claimName: filesystem-pvc
Stack Trace:
2022-10-14T20:25:03.606223513Z Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: The application contains no execute() calls.
2022-10-14T20:25:03.606226983Z ... 14 more
2022-10-14T20:25:03.607635319Z 2022-10-14 20:25:03,607 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
2022-10-14T20:25:03.607650643Z java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: The application contains no execute() calls.
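Not a confirmed root cause, but one detail worth trying: state.backend.rocksdb.localdir is documented as a plain local directory on the TaskManager rather than a URI, and checkpoints additionally need a durable location. A sketch of a variant of the configuration above, assuming the PVC stays mounted at /data and using a placeholder bucket:
flinkConfiguration:
  state.backend: rocksdb
  state.backend.incremental: "true"
  # plain local path instead of a file:// URI
  state.backend.rocksdb.localdir: /data/rocksdb-hvf
  state.backend.rocksdb.timer-service.factory: heap
  # durable checkpoint location, e.g. via the presto S3 plugin already enabled on the pod
  state.checkpoints.dir: s3p://<bucket>/checkpoints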
Abel Lamjiri
10/17/2022, 6:22 PM
s3p://pocstreamingstack-some-bluh-bluh-5kxj/checkpoint
Or do I need to specify the region and AWS endpoint as well? With the above, I'm getting a Forbidden (Service: Amazon S3; Status Code: 403) error.
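A 403 from S3 usually points at credentials or bucket permissions rather than the URI format. If the pod's IAM role is not enough, the presto plugin can take credentials and an endpoint from the Flink configuration; a sketch with placeholder values:
flinkConfiguration:
  # picked up by the S3 filesystem plugins; omit if the pod's IAM role already grants access
  s3.access-key: <ACCESS_KEY>
  s3.secret-key: <SECRET_KEY>
  # only needed for a non-default endpoint/region
  s3.endpoint: s3.us-west-2.amazonaws.com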
Steven Zhang
10/17/2022, 6:34 PM
Status:
Error: java.io.FileNotFoundException: /opt/flink/usrlib/sql-runner.jar (No such file or directory)
I kubectl exec into the deployment's pod and see the jar file at the specified path, so I'm not sure what I'm missing.
Yaroslav Bezruchenko
10/17/2022, 7:55 PM
Jin S
10/17/2022, 8:40 PM
Xi Cheng
10/18/2022, 1:30 AM
Grouping window trigger frequency: for the SESSION(time_attr, interval) grouping window function (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation), it seems that the evaluation is only triggered upon window expiration. Is that the case? It also seems that custom triggers are not supported (https://issues.apache.org/jira/browse/FLINK-16868).
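One hedged pointer: for group window aggregations the planner has experimental early-fire options that emit partial results before the window closes; whether they behave well for SESSION windows is not confirmed in this thread, so treat the following keys as something to test rather than a supported trigger mechanism:
# experimental table options (can also be set on the TableConfig in code)
table.exec.emit.early-fire.enabled: "true"
table.exec.emit.early-fire.delay: 60 s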
Sachin Saikrishna Manikandan
10/18/2022, 6:41 AM
Maher Turifi
10/18/2022, 12:12 PM
SELECT
  o_amount, r_rate
FROM
  Orders, LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
Assuming Rates is a temporal table function, which can only be created in Scala or Java since .createTemporalTableFunction() is still not supported in the Python API, it will be registered as a function in the Table API.
I've created Rates (the temporal table function) in Scala and registered it, and now I'm able to use it in SQL (as in the above statement).
I ran all of the statements in a Zeppelin notebook, where I can mix different APIs (SQL to create the dynamic tables, Scala to create the temporal table function, and Python for the UDFs).
However, my question is: can you give me any guidance on how to create this temporal table function (Scala code) and use it in my PyFlink application in Kinesis? Can I call the Scala environment inside a PyFlink application?
I would appreciate any guidance, links, or instructions. Many thanks.
Sachin Saikrishna Manikandan
10/18/2022, 12:55 PM
M Harsha
10/18/2022, 1:12 PM
Denis Cosmin
10/18/2022, 1:22 PM
--kafka_group_id test to the job
job:
  jarURI: local:///opt/flink/usrlib/my-flink-job.jar
  state: running
  parallelism: 2
  upgradeMode: last-state
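If the question is how to get that flag through to the job, one option with the FlinkDeployment spec shown here is to pass it as program arguments, mirroring the args field used earlier in this channel; the flag name is taken from the message above and the job's main() still has to parse it:
job:
  jarURI: local:///opt/flink/usrlib/my-flink-job.jar
  # program arguments handed to the job's main()
  args: ["--kafka_group_id", "test"]
  state: running
  parallelism: 2
  upgradeMode: last-state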
Kosta Sovaridis
10/18/2022, 2:18 PM
// ...
configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'org.apache.logging.log4j'
}

dependencies {
    implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
    implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
    implementation "org.apache.flink:flink-clients:${flinkVersion}"
    // allows using Flink's web UI when running in the IDE:
    implementation "org.apache.flink:flink-runtime-web:${flinkVersion}"
    implementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
    implementation "org.apache.flink:flink-table-runtime:${flinkVersion}"
    implementation group: 'org.postgresql', name: 'postgresql', version: '42.5.0'

    compileOnly "org.projectlombok:lombok:${lombokVersion}"
    annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

    // --------------------------------------------------------------
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --------------------------------------------------------------
    flinkShadowJar "org.apache.flink:flink-connector-kafka:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-connector-jdbc:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-avro:${flinkVersion}"

    runtimeOnly "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
    runtimeOnly "org.apache.logging.log4j:log4j-api:${log4jVersion}"
    runtimeOnly "org.apache.logging.log4j:log4j-core:${log4jVersion}"

    testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
    testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
    testImplementation 'org.hamcrest:hamcrest-all:1.3'
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
    testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
    testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
}
// ...
Dockerfile:
FROM gradle:7.4.2-jdk11 AS builder
COPY ./build.gradle /opt/build.gradle
COPY ./settings.gradle /opt/settings.gradle
COPY ./src /opt/src
RUN cd /opt; gradle installShadowDist;
FROM apache/flink:1.15.0-java11
RUN mkdir /opt/flink/usrlib
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.0/flink-sql-connector-kafka-1.15.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.0/flink-connector-jdbc-1.15.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.15.0/flink-avro-1.15.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.15.0/flink-avro-confluent-registry-1.15.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/avro/avro/1.10.0/avro-1.10.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://jdbc.postgresql.org/download/postgresql-42.5.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.11.0/jackson-databind-2.11.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-core/2.11.0/jackson-core-2.11.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.11.0/jackson-annotations-2.11.0.jar /opt/flink/lib/
ADD --chown=flink:flink https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/6.2.2/kafka-schema-registry-client-6.2.2.jar /opt/flink/lib/
ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar /opt/flink/lib/
ADD --chown=flink:flink https://packages.confluent.io/maven/org/apache/kafka/kafka-clients/6.2.2-ccs/kafka-clients-6.2.2-ccs.jar /opt/flink/lib/
COPY --from=builder /opt/build/libs/flink*.jar /opt/flink/usrlib/spend-report.jar
Error:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
ClassLoader info: URL ClassLoader:
file: 'usrlib/spend-report.jar' (missing)
Class not resolvable through given classloader.
I am really not sure what I am doing wrong. I am also having trouble understanding why I need to declare the dependency in Gradle as well as download it in the container.
Tim Bauer
10/18/2022, 3:43 PM
rowtime - INTERVAL '10' SECOND. My understanding is that each window should be triggered 10 seconds after it ends. When I start my application connected to a local Kafka with a dummy event producer triggered every 10s, nothing happens: not after 1:10 and not later either. 😖
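One common reason event-time windows never fire on Kafka is that the watermark cannot advance because some partitions receive no data (the watermark is the minimum across partitions). If that matches this setup, a hedged sketch of the idle-source option worth trying; the 30 s value is arbitrary:
# sources/splits that stay silent for this long stop holding back the watermark
table.exec.source.idle-timeout: 30 s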