Jonathan Rozmarin
07/06/2023, 11:35 AM
Rashmin Patel
07/06/2023, 12:20 PM
Leong Wai Leong
07/06/2023, 1:52 PM
Nishanth S
07/06/2023, 3:02 PM
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
After this, the job restarts to recover from the exception. Is this expected? I am able to reproduce this issue on every update to the job. I'd appreciate any pointers. I have attached the taskmanager and jobmanager error logs; please let me know if any other logs are required.
Adam Richardson
07/06/2023, 6:14 PM
Daiyan Chowdhury
07/06/2023, 7:31 PM
Sai Sharath Dandi
07/07/2023, 12:12 AM
Amenreet Singh Sodhi
07/07/2023, 6:52 AM
2023-07-06 16:46:11,428 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[event_executor-1.1.20.jar:?]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[event_executor-1.1.20.jar:?]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[event_executor-1.1.20.jar:?]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[event_executor-1.1.20.jar:?]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[event_executor-1.1.20.jar:?]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[event_executor-1.1.20.jar:?]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[event_executor-1.1.20.jar:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
Once we reinstall or run a helm upgrade, this exception goes away. How can this be resolved? Is any additional configuration required?
Иван Борисов
07/07/2023, 7:15 AM
Yang Li
07/07/2023, 10:07 AM
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey;
Has anyone experienced something like this before, or has any insight into the root cause of this? 🙏
Leong Wai Leong
07/07/2023, 10:38 AM
Vivien Budavölgyi
07/07/2023, 1:29 PM
sophia wu
07/07/2023, 11:17 PM
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
However, I encountered the following error when I run it in Kinesis Data Analytics:
java.lang.UnsupportedOperationException at org.apache.flink.runtime.io.network.partition.ResultPartition.getAllDataProcessedFuture(ResultPartition.java:233)
My whole code logic:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<OutputType> result = text
        .map(/* map logic here */)
        .keyBy(/* keyby logic here */)
        .reduce(/* reduce logic here */);
result.writeAsText("filePath");
env.execute();
Can anyone tell me why this error happens, and which part throws the `UnsupportedOperationException`? Thanks in advance! ❤️
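For reference, BATCH execution can also be requested through configuration when the environment is created, instead of calling setRuntimeMode afterwards. A minimal sketch using the standard Flink API; the file path and map logic are placeholders, not taken from the code above:
```
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeSketch {
    public static void main(String[] args) throws Exception {
        // Request BATCH execution up front via configuration.
        Configuration conf = new Configuration();
        conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.readTextFile("file:///path/to/file") // bounded source, as in the snippet above
           .map(String::toUpperCase)             // placeholder transformation
           .print();
        env.execute("batch-mode-sketch");
    }
}
```
Whether Kinesis Data Analytics honors BATCH execution for a DataStream job is a separate question; the sketch only shows the standard API.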
Marco Villalobos
07/08/2023, 2:30 AM
0.my_sink_name_my_group_id__Committer.numRecordsIn = 343
0.my_sink_name_my_group_id__Writer.numRecordsIn = 0
The Writer metric is always zero; consequently, that topic never receives any messages. In my functional Docker Compose deployment, where the Kafka messages are received, the Writer metric is always a positive, non-zero value.
Does anybody understand what the Committer and Writer do, and what I should consider to resolve this issue?
This job uses three Kafka topics. It uses the AT_LEAST_ONCE delivery guarantee for Kafka and EXACTLY_ONCE checkpointing.
1. Kafka source
2. Kafka sink for Telegraf
3. Kafka sink to StateFun (but writing to that topic fails as described above).
This particular Kafka cluster is running with only two bootstrap servers (in case that makes a difference).
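For reference, a minimal sketch of a KafkaSink built with the AT_LEAST_ONCE delivery guarantee, roughly matching the setup described above; the broker addresses and topic name are placeholders, not the job's actual values:
```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class StatefunSinkSketch {
    public static KafkaSink<String> buildSink() {
        // The Writer operator is what actually produces records to the Kafka topic;
        // the Committer commits Kafka transactions when EXACTLY_ONCE is used.
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092,broker-2:9092") // placeholder brokers
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("statefun-ingress") // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```
With this topology, a Writer numRecordsIn of zero would suggest that no records reach the sink operator at all, independent of what the Committer reports.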
Or Keren
07/09/2023, 2:14 PM
Rajat Ahuja
07/10/2023, 3:30 AM
select 1;
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/rxahuja/flink-kubernetes-operator/examples/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: 413 Request Entity Too Large. Try to raise [rest.client.max-content-length]
[ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: 413 Request Entity Too Large. Try to raise [rest.client.max-content-length]
Since the error suggests setting the property rest.client.max-content-length, I updated both my server and my clients with rest.client.max-content-length and rest.server.max-content-length, but I am still getting this issue. I am not able to figure out how to get past it. Thanks
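For reference, the two options involved, shown programmatically as a minimal sketch; the 200 MiB value is an arbitrary placeholder, and in this setup the keys would normally be set in flink-conf.yaml on both the client and the cluster side:
```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

public class RestContentLengthSketch {
    public static Configuration build() {
        Configuration conf = new Configuration();
        // rest.client.max-content-length: limit enforced by the REST client (e.g. the SQL client).
        conf.set(RestOptions.CLIENT_MAX_CONTENT_LENGTH, 200 * 1024 * 1024);
        // rest.server.max-content-length: limit enforced by the REST server (JobManager side).
        conf.set(RestOptions.SERVER_MAX_CONTENT_LENGTH, 200 * 1024 * 1024);
        return conf;
    }
}
```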
Eugenio Gastelum
07/10/2023, 6:08 AM
1. flink_simple_functional.py
This one does not use the UDF; it just does a very simple transformation, like multiplying an integer column by 2, and sinks it to the console. This works fine.
2. flink_simple.py
This other one is a copy of the code mentioned above, but besides multiplying by 2, it also derives one more column using a UDF. That UDF just converts a string column to lowercase.
However, the first script works (the one that does not use the UDF), and the second does not, even though it is a very simple UDF.
Can someone spot what the mistake might be? Or are UDFs no longer supported for AWS Kinesis? I am basically reproducing the AWS repo for flink-kinesis.
Here I attach both of my Flink files, as well as a third file, which is the one I use to populate the Kinesis input_stream that both Flink files read from as a source, in case it helps to take a look at what that stream looks like. It only has one shard, and I confirm it has data.
Cheguri Vinay Goud
07/10/2023, 11:43 AM
Mikhail Spirin
07/10/2023, 12:42 PM
Jagan Nalla
07/10/2023, 2:21 PM
@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def p_record(first_seen, last_seen, src, rec_count):
    record_map = {
        "first_seen": first_seen,
        "last_seen": last_seen,
        "srcid": src,
        "rec_count": rec_count
    }
    return record_map

query = """select pdns_record(first_seen,last_seen,srcid,rec_count) from src_table"""
ftable = st_env.sql_query(query)
ftable.execute().print()
Error:
bytes_value = value.encode("utf-8")
AttributeError: 'int' object has no attribute 'encode'
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
... 3 more
saqlain pasha
07/10/2023, 4:41 PM
Eugenio Gastelum
07/10/2023, 8:05 PM
Ramya Edpuganti
07/10/2023, 9:28 PM
Pappu Yadav
07/11/2023, 10:10 AM
ApplicationConfig config = context.getApplicationConfig();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set run time env to batch
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// env.setParallelism(10);
env.getConfig().setGlobalJobParameters(context.getParameterTool());
LocalDate date = startDate;
while (endDate.compareTo(date) >= 0) {
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    for (String eventName : EVENT_TYPES_LIST) {
        // Map and Kafka Sink
        processData(tableEnv, date, context, eventName, env);
    }
    date = date.plusDays(1);
}
env.execute(config.getServiceIdentifier() + "{" + startDate + " to " + endDate + "}");
Attaching a screenshot of the execution graph for reference, which shows it reading all the data from S3 at once.
Pappu Yadav
07/11/2023, 10:20 AM
amarjeet pasrija
07/11/2023, 11:07 AM
s_env = StreamExecutionEnvironment.get_execution_environment()
conf = Configuration()
conf.set_string('checkpointing.mode', "EXACTLY_ONCE")
conf.set_string('state.backend', "filesystem")
conf.set_string('state.checkpoints.dir', checkpoint_location)
environment_settings = EnvironmentSettings.new_instance()\
.in_streaming_mode() \
.with_configuration(conf) \
.build()
t_env = TableEnvironment.create(environment_settings)
I even tried:
t_env.get_config().set('checkpointing.mode', "EXACTLY_ONCE")
t_env.get_config().set('state.backend', "filesystem")
t_env.get_config().set('state.checkpoints.dir', checkpoint_location)
nitin
07/11/2023, 11:40 AM
nitin
07/11/2023, 11:41 AM
André Midea Jasiskis
07/11/2023, 1:00 PM
// Caused by: org.apache.flink.table.api.ValidationException: Invalid primary key
// 'PK_data.abc_event__id'. Column 'data.abc_event__id' does not exist.
The code snippet for the table declaration is the following:
private static final DataType abcTransferOutRequestRow =
        DataTypes.ROW(
                DataTypes.FIELD("abc_event__id", DataTypes.STRING()),
                DataTypes.FIELD("abc_event__cancelled_at", DataTypes.TIMESTAMP_LTZ()),
                DataTypes.FIELD("abc_event__created_at", DataTypes.TIMESTAMP_LTZ()),
                DataTypes.FIELD("abc_event__e_2e_id", DataTypes.STRING()),
                DataTypes.FIELD("abc_event__failed_at", DataTypes.TIMESTAMP_LTZ()),
                DataTypes.FIELD("abc_event__initiation_type", DataTypes.STRING()),
                DataTypes.FIELD("abc_event__message", DataTypes.STRING()),
                DataTypes.FIELD("abc_event__transfer_in_id_to_refund", DataTypes.STRING()));

public static final Schema schema =
        Schema.newBuilder()
                .column("meta", EntitySnapshotPayload.metadataType)
                .column("data", abcTransferOutRequestRow)
                .primaryKey("data.abc_event__id")
                .watermark("db__tx_instant", "db__tx_instant")
                .build();
I tried to work around this by using columnByExpression, but I get an error about the column not being physical.
Any ideas for workarounds, or is this just a limitation of the Table API?
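For what it's worth, a possible workaround sketch, under the assumption that the connector can expose the id as its own top-level physical field: primaryKey() only accepts top-level columns, and primary key columns must be NOT NULL. The row type below is a simplified stand-in for abcTransferOutRequestRow, not the real declaration:
```
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.DataType;

public class FlattenedSchemaSketch {

    // Simplified stand-in for the nested payload, with the id removed because it is
    // promoted to a top-level column below.
    private static final DataType dataRow =
            DataTypes.ROW(
                    DataTypes.FIELD("abc_event__created_at", DataTypes.TIMESTAMP_LTZ()),
                    DataTypes.FIELD("abc_event__message", DataTypes.STRING()));

    // Primary keys may only reference top-level physical columns and must be NOT NULL.
    public static final Schema schema =
            Schema.newBuilder()
                    .column("abc_event__id", DataTypes.STRING().notNull())
                    .column("data", dataRow)
                    .primaryKey("abc_event__id")
                    .build();
}
```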
Daiyan Chowdhury
07/11/2023, 8:19 PM