Richard Noble
04/05/2023, 11:22 AM
piby 180
04/05/2023, 11:49 AM
org.apache.flink.client.deployment.application.ApplicationExecutionException: The application contains no execute() calls.
Could someone provide me with a hint?
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beamapp
spec:
  image: beamapp:latest
  imagePullPolicy: Always
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note: this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/main.py"]
    parallelism: 1
    upgradeMode: stateless
main.py
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

input_file = '/opt/flink/usrlib/kinglear.txt'
output_path = '/opt/flink/usrlib/output.txt'

pipeline_options = PipelineOptions(
    runner='FlinkRunner',
    project='my-project-id',
    job_name='unique-job-name',
)

def main():
    with beam.Pipeline(options=pipeline_options) as p:
        output = (p
                  | 'Read lines' >> beam.io.ReadFromText(input_file)
                  | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  | 'Combine per element' >> beam.combiners.Count.PerElement()
                  | 'Convert to string' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
                  | 'Write output to file' >> beam.io.WriteToText(output_path))

main()
Dockerfile
FROM flink:1.15
# Install Python 3: Debian 11 ships Python 3.9, but PyFlink currently only supports
# Python 3.6, 3.7 and 3.8 officially, so we build Python 3.7 from source.
RUN apt-get update -y && \
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
tar -xvf Python-3.7.9.tgz && \
cd Python-3.7.9 && \
./configure --without-tests --enable-shared && \
make -j6 && \
make install && \
ldconfig /usr/local/lib && \
cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
ln -s /usr/local/bin/python3 /usr/local/bin/python && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Install dependencies
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
# add python script
USER flink
RUN mkdir /opt/flink/usrlib
RUN mkdir /opt/flink/usrlib/src
COPY . /opt/flink/usrlib/
requirements.txt
apache-flink==1.15
apache_beam==2.27.0
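Background for the error above, with a purely illustrative sketch: application mode raises "The application contains no execute() calls" when the entrypoint script runs to completion without ever submitting a Flink job, which with a Beam main.py can happen if the FlinkRunner submits to its own embedded cluster rather than through the surrounding Flink application. The job below is hypothetical and is not a fix for the Beam setup; it only shows the kind of PyFlink program that does reach an execute() call.
# Illustrative only: a PyFlink script that actually triggers a Flink job submission.
from pyflink.datastream import StreamExecutionEnvironment

def run():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.from_collection(["to be", "or not to be"]) \
       .flat_map(lambda line: line.split(" ")) \
       .print()
    # Without reaching a call like this, application mode reports "no execute() calls".
    env.execute("execute-call-sketch")

if __name__ == "__main__":
    run()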
Raghunadh Nittala
04/05/2023, 2:02 PM
CREATE TABLE data_to_sink (
  record_id STRING NOT NULL,
  request_id STRING NOT NULL,
  source_name STRING NOT NULL,
  event_type STRING NOT NULL,
  event_name STRING NOT NULL,
  `date` STRING,
  results_count BIGINT
) PARTITIONED BY (record_id, source_name, `date`) WITH (
  'connector' = 'filesystem',
  'path' = '<S3 path>',
  'format' = 'parquet'
);
INSERT INTO data_to_sink
SELECT record_id, request_id, source_name, event_type, event_name,
DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date, COUNT(*) results_count
FROM data_from_source
GROUP BY record_id, request_id, source_name, event_type, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE);
I can see the parquet files being created, but when I checked the schema using the parquet-cli tool, it doesn't show the record_id, source_name, and date fields. I checked the docs but didn't find any setting for this. Is this expected?
Mehul Batra
04/05/2023, 4:06 PM
Dominik Prester
04/05/2023, 6:47 PM
I have a datetime value (e.g. 2023-04-05T11:40:00Z) stored as a STRING column and I tried casting it to TIMESTAMP type with:
• TO_TIMESTAMP(<col_name>, '%Y-%m-%dT%H:%M:%SZ')
• TO_TIMESTAMP(<col_name>, 'yyyy-MM-ddTHH:mm:ssZ')
but both result in NULL values. What am I missing?
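A hedged guess worth trying, wrapped in PyFlink only so the snippet is self-contained: in TO_TIMESTAMP format patterns the literal T and Z characters need to be quoted with single quotes, and a single quote inside a SQL string literal is written as two single quotes. The table in the example is inlined with VALUES; the column name is hypothetical.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# The literal T and Z are quoted with single quotes in the pattern; inside a SQL
# string literal each of those quotes is doubled.
t_env.execute_sql("""
    SELECT TO_TIMESTAMP(ts_str, 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS parsed_ts
    FROM (VALUES ('2023-04-05T11:40:00Z')) AS t(ts_str)
""").print()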
Ivan Webber
04/05/2023, 9:06 PM
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
The full stack trace (see my reply) makes me wonder if it is trying to allocate a local temporary file and failing. I am using the flink-operator and running on an AKS cluster.
craig fan
04/05/2023, 9:11 PM
Herat Acharya
04/06/2023, 12:22 AM
kingsathurthi
04/06/2023, 6:14 AM
Ari Huttunen
04/06/2023, 7:47 AM
I could set the part-file suffix in the DataStream API with:
OutputFileConfig config = OutputFileConfig
    .builder()
    .withPartPrefix("prefix")
    .withPartSuffix(".ext")
    .build();
but I have written the file sink using SQL syntax. How can I make this generate files with suffix .parquet?
CREATE TABLE save_polystar_aggregates_core (
  ...
)
PARTITIONED BY (EVENT_DAY, EVENT_HOUR)
WITH (
  'connector' = 'filesystem',
  'path' = 's3://.../core',
  'format' = 'parquet',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file',
  'sink.partition-commit.trigger' = 'partition-time'
);
Also, can I change the directory names from
path
└── EVENT_DAY=2019-08-25
syntax to
path
└── 2019-08-25
?
Slackbot
04/06/2023, 10:34 AM
Tsering
04/06/2023, 1:01 PM
key group from 0 to 6 does not contain 120.........
Is there anyone who knows what exactly is happening?
Thank you in advance 🙏.
Krzysztof Chmielewski
04/06/2023, 2:56 PM
Felix Terkhorn
04/06/2023, 3:05 PM
We'd like to use Flink's Timer capabilities in the following way: (if our project is successful) we'll have potentially billions of individual states that may need to use processElement / onTimer to schedule events into the future. Potentially the far future, like a year into the future. Is Flink considered reliable for this type of long-term, delayed processing? Running a proof of concept with delays of a few seconds was super easy and successful, and on my local machine I pushed a ton of events through.
But testing these sorts of very long-term delays isn't feasible, so we're really interested to hear feedback from the community about when (in time), and if, such Timer usage would begin to break down. How does Flink perform when scheduling things over multiple days & months?
Are there additional best practices and considerations that we should adhere to if we're using `Timer`s that are dated for execution far into the future?
Are there any concerns around prioritization of the `Timer`s firing at their due date, in the case where individual jobs experience high levels of traffic & contention?
Thanks very much for taking a look at my question!
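A minimal PyFlink sketch of the pattern under discussion, with hypothetical class and field names. The relevant background is that timers live in keyed state (on the heap or in RocksDB, depending on the timer-service factory), are deduplicated per key and timestamp, and are checkpointed and restored like any other state, so the length of the delay itself is not the limiting factor.
from pyflink.datastream import KeyedProcessFunction

class DelayedAction(KeyedProcessFunction):
    """Hypothetical example: schedule work roughly one year out per key."""

    ONE_YEAR_MS = 365 * 24 * 60 * 60 * 1000

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # The timer is stored in keyed state, so it survives checkpoints and restores;
        # registering the same key/timestamp twice is deduplicated by Flink.
        now = ctx.timer_service().current_processing_time()
        ctx.timer_service().register_processing_time_timer(now + DelayedAction.ONE_YEAR_MS)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # Fires once processing time passes the registered timestamp.
        yield (ctx.get_current_key(), timestamp)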
Elizaveta Batanina
04/06/2023, 3:34 PM
I'm getting this error from column_by_expression:
org.apache.flink.table.api.TableException: Expression 'parse_bq_datetime(window_start)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
Has anyone faced this problem? I will add more details in the thread.
Thanks!
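A hedged sketch in case it helps, assuming parse_bq_datetime is a Python UDF: computed columns in a Schema have to be expressible as plain SQL strings, so the UDF generally needs to be registered under a name and referenced inside a string expression rather than built via the Python expression DSL. All names and types below are hypothetical.
from datetime import datetime

from pyflink.table import DataTypes, EnvironmentSettings, Schema, TableEnvironment
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical UDF standing in for parse_bq_datetime.
@udf(result_type=DataTypes.TIMESTAMP(3))
def parse_bq_datetime(s: str):
    return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S")

# Register it under a name so the computed-column expression stays a plain SQL string.
t_env.create_temporary_function("parse_bq_datetime", parse_bq_datetime)

schema = (Schema.new_builder()
          .column("window_start", DataTypes.STRING())
          .column_by_expression("event_ts", "parse_bq_datetime(window_start)")
          .build())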
Nancy Yang
04/06/2023, 3:47 PM
Caused by: java.io.IOException: Failed to create directory for shared state: file:/flink-data/checkpoints/0fd46de24a2483b788f329c179239967/shared
If I access the pod k3d-k3s-default-server-0, I can see that "/tmp/flink" was created. Has anybody met the same issue when deploying to a k3d/k3s cluster running in Docker? Thanks!
Adam Augusta
04/06/2023, 7:20 PM
Jalil Alchy
04/06/2023, 8:49 PM
Sink: Committer (1/1)#0 (0c5abffd2f065e1edef3036b20ec42a5) switched from RUNNING to FAILED with failure cause: java.nio.file.AccessDeniedException: <path>/part-8ad796e1-3c85-4b54-9eed-eaaf06621185-0.snappy.parquet: initiate MultiPartUpload on <path>/part-8ad796e1-3c85-4b54-9eed-eaaf06621185-0.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: KC4SFHVXBGJC6XFW; S3 Extended Request ID: VQXppEZpXzKyDoc7u32AHkMAr608cMLxLOh4QIWwZVWIg0JBYgkMTxAT4M6+xmyVrzmMmnyMcUucHlBZmFBzSs/GrYJ+LmmlADwr4wjTjDs=; Proxy: null), S3 Extended Request ID: VQXppEZpXzKyDoc7u32AHkMAr608cMLxLOh4QIWwZVWIg0JBYgkMTxAT4M6+xmyVrzmMmnyMcUucHlBZmFBzSs/GrYJ+LmmlADwr4wjTjDs=:AccessDenied
My sink is configured like so:
return DeltaSink.forRowData(
        new Path("s3a://<bucket/<path>"),
        new Configuration() {
            {
                set(
                    "spark.hadoop.fs.s3a.aws.credentials.provider",
                    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
            }
        },
        FULL_SCHEMA_ROW_TYPE)
    .withMergeSchema(true)
    .build();
Writes to my local filesystem work, but writes to S3 are failing. The credentials on the machine are admin for the account. Any thoughts?
Dmitry Koudryavtsev
04/07/2023, 9:32 AM
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196)
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
... 4 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
at [Source: (org.apache.flink.core.fs.local.LocalDataInputStream); line: 1, column: 0]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190)
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
... 6 more
We removed the empty ..._DIRTY.json file from the /flink/recovery/job-result-store/<our cluster ID>/ dir (local filesystem) and Flink successfully starts.
Flink: 1.15.3
Is it possible to validate the dirty job result file (..._DIRTY.json) length/content before opening it, and/or to overwrite it automatically if the file is broken?
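No built-in validation option comes to mind, but as a stopgap a startup check outside Flink can flag empty or corrupt *_DIRTY.json entries before the JobManager reads them. A small stdlib-only sketch, assuming the same local-filesystem layout as above:
import json
import pathlib

# Base dir of the JobResultStore (the cluster-ID subfolder is found by rglob).
JRS_DIR = pathlib.Path("/flink/recovery/job-result-store")

for entry in JRS_DIR.rglob("*_DIRTY.json"):
    try:
        json.loads(entry.read_text())
    except (json.JSONDecodeError, OSError):
        # Empty or unreadable entries are exactly what blocked the restart above;
        # report them (or move them aside) before starting the JobManager.
        print(f"Corrupt or empty JobResultStore entry: {entry}")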
Max Dubinin
04/07/2023, 10:59 AM
nick christidis
04/07/2023, 2:20 PM
void kafkaConsumeLogic() {
    kafkaSource.run();
    boolean kafkaSourcePaused = false;
    boolean okState;
    while (true) {
        okState = areWeInDesiredState();
        if (!okState && !kafkaSourcePaused) {
            kafkaSource.pause();
            kafkaSourcePaused = true;
        }
        if (okState && kafkaSourcePaused) {
            kafkaSource.resume();
            kafkaSourcePaused = false;
        }
        paceWait();
    }
}
So, we have achieved the above by leveraging HybridSource (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/), using the first registered source as a predicate-check bounded source which:
• checks the areWeInDesiredState() predicate/condition, and if all is ok...
• ...then the SourceReader it provides returns InputStatus.END_OF_INPUT from its pollNext method, so the next registered source, the KafkaSource, takes over.
So the above pseudocode has been implemented like the following:
class PredicateSourceReader<T> implements SourceReader<T, MySourceSplit> {
    ....
    @Override
    public InputStatus pollNext(ReaderOutput<T> output) {
        if (areWeInDesiredState()) {
            return InputStatus.END_OF_INPUT;
        }
        return InputStatus.MORE_AVAILABLE;
    }
    .....
}
and the wiring:
PredicateSourceReader<SomeType> sourceActivator = new PredicateSourceReader<>(....);
Source<SomeType> initialSource = new Source<>(sourceActivator);
KafkaSource<SomeType> kafkaSource = buildKafkaSource(....);
HybridSource<SomeType> hybridSource = HybridSource
    .builder(initialSource)
    .addSource(context -> kafkaSource, Boundedness.CONTINUOUS_UNBOUNDED)
    .build();
----------------------------------------------------
Problem
Unfortunately the above is not the most elegant or most correct usage of HybridSource in my eyes, but more importantly it does not work in the following case, which is where I need some opinions, or to hear whether I'm missing something:
So, based on the above description: once we satisfy the condition/predicate, we switch to the Kafka source. But there is another business need we have to satisfy, where for example when we detect another bad condition/predicate we fail on purpose with an exception, and due to the restart strategy (exponential) and failover strategy (region) this HybridSource gets restarted. Because of the re-initialization it should go back to the first initialSource and the KafkaSource should not run, but that is not what happens: we see Kafka consumption still taking place.
I have some ideas in mind, but first I want to understand why, after the region restart, the KafkaSource is still open and not closed.
I also tried to extend KafkaSourceReader and wire the logic into its pollNext method, but KafkaSource is very closed to extension and only accessible through KafkaSourceBuilder, which from a framework user's point of view does not give a lot of options.
Adesh Dsilva
04/07/2023, 5:27 PM
tableEnv.createTemporaryTable("myTable", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "path-to-file.orc")
    .format("orc")
    .build());
Exception I get:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1024
at org.apache.orc.impl.TreeReaderFactory$TreeReader.nextVector(TreeReaderFactory.java:255)
at org.apache.orc.impl.TreeReaderFactory$DoubleTreeReader.nextVector(TreeReaderFactory.java:762)
at org.apache.orc.impl.ConvertTreeReaderFactory$DecimalFromDoubleTreeReader.nextVector(ConvertTreeReaderFactory.java:1297)
Adam Augusta
04/07/2023, 7:56 PM
Duc Anh Khu
04/07/2023, 10:19 PM
I'm having trouble with a KeyedProcessFunction in unit tests. When unit testing the function on its own (from_collection source and test sink), everything works fine. However, when it gets mixed into a bigger program, only process_element is being called and on_timer is not being called at all. The sources and sink in these 2 cases are exactly the same (from_collection and test sink). What are the possible causes of on_timer not being called?
Update: In a bigger program with 2 KeyedProcessFunctions, only the 1st function's on_timer
is called. The 2nd function's is not.
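Two things worth ruling out, offered as guesses rather than a diagnosis: event-time timers only fire once the watermark passes the registered timestamp, so the bigger pipeline needs timestamps and watermarks assigned upstream of the keyed streams; and processing-time timers in a short bounded test can simply never come due before the job finishes. A sketch of the watermark side, with FirstFn/SecondFn as hypothetical KeyedProcessFunctions and events shaped as (key, epoch_millis):
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment

class EpochMillisAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value[1]  # assumes the second field is an epoch-millis event time

env = StreamExecutionEnvironment.get_execution_environment()
events = [("a", 1000), ("a", 2000), ("b", 3000)]  # hypothetical test input

watermarks = (WatermarkStrategy
              .for_bounded_out_of_orderness(Duration.of_seconds(5))
              .with_timestamp_assigner(EpochMillisAssigner()))

keyed = (env.from_collection(events)
            .assign_timestamps_and_watermarks(watermarks)
            .key_by(lambda e: e[0]))
# keyed.process(FirstFn()).key_by(lambda e: e[0]).process(SecondFn()) would follow here;
# without the watermark assignment above, event-time timers registered in those
# functions only fire at the final watermark emitted when the bounded input ends.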
Alex Brekken
04/08/2023, 1:56 PM
I'm deploying a job with the Flink Kubernetes operator and can't figure out what to point the jarURI at.
So first question, any ideas where I should be looking for my jar file? And second, is this the right way to handle deployments? I'll be deploying several jobs eventually, and assumed I would want to separate the FlinkDeployment from the actual jobs. (But maybe that's not the best way?)
04/08/2023, 7:19 PM
Tsering
04/09/2023, 12:55 PM
java.lang.IllegalArgumentException: key group from 0 to 6 does not contain 120
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
Please let me know if you have any idea what's going on here. I have been stuck for a few days but have literally found no clue.
Guruguha Marur Sreenivasa
04/09/2023, 7:10 PM
2023-04-09 19:08:29,483 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler [] - Failed to transfer file from TaskExecutor 100.80.157.200:6122-0224ee.
I'm using Flink 1.14.3. Any idea why this happens?
piby 180
04/09/2023, 10:58 PM
jar_list = """
file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
"""
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", jar_list)
t_env.get_config().set("pipeline.classpaths", jar_list)
t_env.get_config().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
t_env.get_config().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
I have also read somewhere that the flink-s3-fs-hadoop jar needs to be placed in its own folder under the plugins dir, not in the lib dir, so I also tried setting up a plugins dir but it didn't work:
os.environ["FLINK_PLUGINS_DIR"] = "/home/ubuntu/environment/flink/plugins"
Here is the error I get
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
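A hedged sketch of the layout the error message is asking for, reusing the paths from above: the S3 filesystems are loaded through Flink's plugin mechanism, so flink-s3-fs-hadoop goes into its own subfolder under the plugins directory rather than into pipeline.jars, and FLINK_PLUGINS_DIR should point there before the TableEnvironment (and the JVM behind it) is created. The folder name and the trimmed jar list below are assumptions, not a verified fix.
# Expected plugin layout (each plugin in its own subfolder):
#   /home/ubuntu/environment/flink/plugins/
#   └── s3-fs-hadoop/
#       └── flink-s3-fs-hadoop-1.17.0.jar
import os

# Must be set before the JVM behind the TableEnvironment is started.
os.environ["FLINK_PLUGINS_DIR"] = "/home/ubuntu/environment/flink/plugins"

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Keep only job-level connector/format jars here; the s3a filesystem comes from the plugin dir.
t_env.get_config().set("pipeline.jars", ";".join([
    "file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar",
    "file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar",
]))
t_env.get_config().set("fs.s3a.aws.credentials.provider",
                       "com.amazonaws.auth.profile.ProfileCredentialsProvider")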
Junqi Xie
04/10/2023, 2:48 AM