Ans Fida
02/07/2023, 7:48 PM

Eugene Kozlov
02/07/2023, 8:22 PM

Jason Politis
02/07/2023, 8:29 PM

Matyas Orhidi
02/07/2023, 9:43 PM

Jorge Iván Tordecilla Ruíz - Ceiba Software
02/07/2023, 9:52 PM

Amir Hossein Sharifzadeh
02/07/2023, 11:08 PM
with table_result.collect() as results:
    for result in results:
        print(result)
but this code does not work. How can I iterate over table_result and extract all columns? Thank you very much for your assistance. Best, Amir
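A minimal sketch of the usual pattern, assuming the result comes from a TableEnvironment named t_env and a hypothetical query; if table_result is actually a Table rather than a TableResult, call .execute() on it first:

# a minimal sketch; t_env and the query are assumptions
table_result = t_env.execute_sql("SELECT * FROM my_table")
with table_result.collect() as results:
    for row in results:
        print(row)       # prints the whole Row
        first = row[0]   # individual columns are addressable by position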
Amir Hossein Sharifzadeh
02/08/2023, 2:15 AM

Kyle Ahn
02/08/2023, 2:38 AM
Message: Forbidden! Configured service account doesn't have access. Service account may have been revoked. deployments.extensions "eventlogger-pipeline-v0" is forbidden: User "system:serviceaccount:eventlogger-pipeline-staging:flink" cannot get resource "deployments" in API group "extensions" in the namespace "eventlogger-pipeline-staging"
[Context]
Two FlinkDeployments, eventlogger-pipeline-v0 and eventlogger-pipeline-v0-1, are deployed, but one fails with this exception while the other goes through.
More logs in the thread ->
Tony Yeung
02/08/2023, 2:39 AM

kingsathurthi
02/08/2023, 5:37 AM

Sudhan Madhavan
02/08/2023, 6:25 AM

Yang LI
02/08/2023, 10:31 AM
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
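The fragment above is the Java API; for context, a sketch of the equivalent end-to-end wiring in the Python API (Flink 1.16+), with the broker address and topic name as placeholder assumptions:

# a sketch of the surrounding wiring; broker and topic are assumptions
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")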
Yang LI
02/08/2023, 10:53 AM

Yang LI
02/08/2023, 12:35 PM

Shamit Jain
02/08/2023, 4:58 PM
2023-01-06T21:10:11.885Z {"applicationARN":"arn:aws:kinesisanalytics:us-east-1:542230711021:application/laap-kda-ue1-con-sr-sr-per-dlvry-metrc-imprsn-prod","applicationVersionId":"15","locationInformation":"org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.handleException(PartitionRequestQueue.java:287)","logger":"org.apache.flink.runtime.io.network.netty.PartitionRequestQueue","message":"Encountered error while consuming partitions","messageSchemaVersion":"1","messageType":"ERROR","threadName":"Flink Netty Server (6121) Thread 0","throwableInformation":"org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer\n"}
Drake Esdon
02/08/2023, 7:58 PM

Andrei Leibovski
02/08/2023, 9:05 PM
high-availability.storageDir? I can't find it in the docs anywhere.

Reme Ajayi
02/08/2023, 9:53 PM
I added my S3 settings to flink-conf.yaml. However, I am still unable to write files to S3. Error below:
Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See <https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/> for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see <https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/>.
What am I missing?

Andrew Otto
02/08/2023, 10:17 PM
I'm trying to emit to a side output from a PyFlink process function with
yield output_tag, value
But when I do that, I'm just getting a Tuple(output_tag, value) in the main datastream, not the side output one. What am I doing wrong?
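A sketch of the side-output pattern in the Python DataStream API, assuming an upstream string stream named ds; the tag name and routing condition are made up. As far as I can tell, side outputs only landed in the Python API in Flink 1.16; on older versions the yielded tuple simply flows on in the main stream, which matches the behaviour described:

# a sketch; the tag name, type info, and routing condition are assumptions
from pyflink.common import Types
from pyflink.datastream import OutputTag, ProcessFunction

errors_tag = OutputTag("errors", Types.STRING())  # tags need explicit type info

class Route(ProcessFunction):
    def process_element(self, value, ctx):
        if value.startswith("error"):
            yield errors_tag, value   # routed to the side output
        else:
            yield value               # routed to the main stream

main = ds.process(Route(), output_type=Types.STRING())
errors = main.get_side_output(errors_tag)  # the side output must be pulled off explicitly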
Amir Hossein Sharifzadeh
02/09/2023, 2:00 AM

Krish Narukulla
02/09/2023, 2:58 AM
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist-1.16.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Suparn Lele
02/09/2023, 4:35 AM

Wei Qin Pan, Max
02/09/2023, 6:19 AM
-- When I execute this SQL, I get the correct result:
SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM source;
-- When I execute this SQL, I get the wrong result (returns blank):
SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source;
Maybe this is a bug?
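A small harness to compare the two IF spellings against COALESCE, the standard form of this fallback, assuming a TableEnvironment named t_env with the source table registered:

# a sketch: run the two IF variants plus COALESCE and compare; t_env is an assumption
queries = [
    "SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM source",
    "SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source",
    "SELECT COALESCE(`marketing_flow_id`, '') FROM source",
]
for query in queries:
    with t_env.execute_sql(query).collect() as rows:
        print(query, [row[0] for row in rows])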
xiaohe lan
02/09/2023, 6:45 AM
I have a stream of Tuple2<String, Integer>; I want to group by the first field and sum the integers in the second field. This is my `ProcessFunction`:
public static class MyKeyedProcessFunction
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Integer.class));
    }

    @Override
    public void processElement(
            Tuple2<String, Integer> value,
            Context ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer sum = state.value();
        if (sum == null) {
            sum = 0;
        }
        sum += value.f1;
        state.update(sum);
        // Note: this registers a separate timer for every element.
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        // After the first timer fires, state.clear() below empties the state,
        // so each remaining timer emits (key, null).
        out.collect(Tuple2.of(ctx.getCurrentKey(), state.value()));
        state.clear();
    }
}
Now the onTimer is called for every element. I specified the input as:
aaa,50
aaa,40
aaa,10
I see the output like:
(aaa,100)
(aaa, null)
(aaa, null)
How can I get the output as (aaa,100)?
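The nulls come from the extra timers: every element registers its own timer, and once the first one fires, state.clear() empties the state that the later ones read. One possible fix, sketched here in the Python API (the logic maps one-to-one onto the Java above): keep the pending timer timestamp in state so each key arms only one timer, and clear both states when it fires.

# a sketch of one fix: a single timer per key, so on_timer never sees cleared state
from pyflink.common import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class SumThenEmitOnce(KeyedProcessFunction):
    def open(self, ctx: RuntimeContext):
        self.sum = ctx.get_state(ValueStateDescriptor("sum", Types.INT()))
        self.timer = ctx.get_state(ValueStateDescriptor("timer", Types.LONG()))

    def process_element(self, value, ctx):
        # assumes (str, int) tuples keyed upstream by the string field
        self.sum.update((self.sum.value() or 0) + value[1])
        if self.timer.value() is None:  # only the first element arms the timer
            ts = ctx.timer_service().current_processing_time() + 5000
            ctx.timer_service().register_processing_time_timer(ts)
            self.timer.update(ts)

    def on_timer(self, timestamp, ctx):
        yield (ctx.get_current_key(), self.sum.value())  # emits (aaa, 100) once
        self.sum.clear()
        self.timer.clear()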
Amenreet Singh Sodhi
02/09/2023, 6:56 AM

Eugenio Marotti
02/09/2023, 7:19 AM
I have a sequence of events like this:
E1("A",1,...) -> E2("B",1,...) -> E3("C",1,...)
When I receive event "A" I want to start a timer (keyed by the source id) and update a sink with the timer value periodically. When I receive event "C" I want to stop the timer and update the sink with the final timer value. Is there a way to accomplish that in Apache Flink?
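One way to approach this is a KeyedProcessFunction keyed by the source id, with a self-re-arming processing-time timer. A minimal sketch, where the event shape (kind, source_id) and the one-second update interval are assumptions:

# a minimal sketch; event shape and interval are assumptions
from pyflink.common import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class StartStopTimer(KeyedProcessFunction):
    def open(self, ctx: RuntimeContext):
        self.started_at = ctx.get_state(ValueStateDescriptor("started-at", Types.LONG()))

    def process_element(self, event, ctx):
        now = ctx.timer_service().current_processing_time()
        if event[0] == "A":
            self.started_at.update(now)
            ctx.timer_service().register_processing_time_timer(now + 1000)
        elif event[0] == "C" and self.started_at.value() is not None:
            yield (ctx.get_current_key(), now - self.started_at.value(), "final")
            self.started_at.clear()  # on_timer stops re-arming once state is gone

    def on_timer(self, timestamp, ctx):
        start = self.started_at.value()
        if start is not None:  # still running: emit a periodic update and re-arm
            yield (ctx.get_current_key(), timestamp - start, "update")
            ctx.timer_service().register_processing_time_timer(timestamp + 1000)

The emitted rows can then be written by a downstream sink that keeps the latest value per key.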
Marouane Souadi
02/09/2023, 10:15 AM

Nitin Agrawal
02/09/2023, 3:05 PM
I need to generate a sequence_number: it should be increasing in nature, starting from 1, and keep increasing over the lifetime; a skip in the sequence is okay. In the database world we generally have this sequence called AUTO_INCREMENT. Is there a way in Flink to achieve the same?
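There is no built-in AUTO_INCREMENT, but keyed state can emulate one. A sketch that routes every record to a single key so the counter is globally ordered; this serializes the stream through one subtask, and with higher parallelism a common alternative is to combine a subtask index with a local counter:

# a sketch; routing everything to one key makes the counter global but single-threaded
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class AssignSequenceNumber(KeyedProcessFunction):
    def open(self, ctx: RuntimeContext):
        self.seq = ctx.get_state(ValueStateDescriptor("seq", Types.LONG()))

    def process_element(self, value, ctx):
        nxt = (self.seq.value() or 0) + 1  # starts at 1; checkpointed, so it survives restarts
        self.seq.update(nxt)
        yield (nxt, value)

env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection(["a", "b", "c"]) \
    .key_by(lambda e: 0) \
    .process(AssignSequenceNumber()) \
    .print()
env.execute("sequence-number-sketch")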
Siddhesh Kalgaonkar
02/09/2023, 6:18 PM

Erwin Cabral
02/09/2023, 7:17 PM
I set state.checkpoints.num-retained: "5", hoping that the size won't grow beyond a certain point, but I am still seeing an increasing trend when I view metrics from Grafana. Is there a way to clean up the checkpoint files which are no longer relevant for checkpoint recovery?
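For reference, a sketch of setting this programmatically (Flink 1.15+ accepts a Configuration here); the retention option in the last setting is the knob that lets Flink delete externalized checkpoints when a job is cancelled. Checkpoints left behind by old or cancelled jobs are not garbage-collected automatically, so those generally need cleanup outside Flink:

# a sketch; the keys are standard Flink options, the values are assumptions
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
config.set_string("state.checkpoints.num-retained", "5")
config.set_string("execution.checkpointing.externalized-checkpoint-retention",
                  "DELETE_ON_CANCELLATION")
env = StreamExecutionEnvironment.get_execution_environment(config)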