Martin Bomio
09/08/2022, 4:30 PM
Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found
When I change the table to use processing time, the join works without issues.
In our case, the lookup is not dependent on time
chunilal kukreja
09/09/2022, 1:08 PM
DataStream<EventDataMapping> dataMappingDataStream = inputDataStream
    .process(new DeriveKeyProcessFunction())
    .keyBy(new KeySelector<EventDataMapping, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey(EventDataMapping eventDataMapping) throws Exception {
            return Tuple2.of(eventDataMapping.getTenancyId(), eventDataMapping.getEventExecutionType());
        }
    })
    .countWindow(100)
    .process(new EventProcessWindow()) // I want to call async I/O in the windowed process function. How can I do that?
    .returns(EventDataMapping.class);
Is this achievable?
Thanks in advance!
Jirawech Siwawut
09/09/2022, 3:17 PM
tableEnv.createTemporaryView("tempTable", tempTable)
tableEnv.executeSql("INSERT INTO Kafkatable SELECT * FROM tempTable")
tableEnv.executeSql("INSERT INTO hiveTable SELECT * FROM tempTable")
Satya
09/09/2022, 4:57 PM
StreamFormat or SimpleStreamFormat by extending it like below:
object AvroStreamFormat extends SimpleStreamFormat[GenericRecord] {
  override def createReader(config: Configuration, stream: FSDataInputStream): StreamFormat.Reader[GenericRecord] = ???
  override def getProducedType: TypeInformation[GenericRecord] = TypeInformation.of(classOf[GenericRecord])
}
I am struggling to implement createReader for Avro files. Any pointers would be great to move forward.
Chris F
09/09/2022, 10:02 PM
java.lang.IllegalStateException: unable to find a router for ingress IngressIdentifier(showcase, names, interface com.google.protobuf.Message)
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.loadRoutersAttachedToIngress(IngressRouterOperator.java:96)
at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.open(IngressRouterOperator.java:78)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Gaurav Miglani
09/09/2022, 11:44 PM
Krish Narukulla
09/10/2022, 3:58 AM
org.apache.flink.table.factories.DeserializationFormatFactory? I am running into the validation error below:
try {
final DynamicTableSourceFactory factory =
preferredFactory != null
? preferredFactory
: discoverTableFactory(DynamicTableSourceFactory.class, context);
return factory.createDynamicTableSource(context);
} catch (Throwable t) {
throw new ValidationException(
String.format(
"Unable to create a source for reading table '%s'.\n\n"
+ "Table options are:\n\n"
+ "%s",
Jaromir Hamala
09/10/2022, 2:28 PM
flink-hadoop-compatibility_2.12-1.15.2.jar hadoop-common-3.3.4.jar hadoop-shaded-guava-1.1.1.jar stax2-api-4.2.1.jar
flink-sql-parquet-1.15.2.jar hadoop-client-3.3.4.jar hadoop-mapreduce-client-core-3.3.4.jar woodstox-core-5.3.0.jar
It works for my Hello World, but that means little. Is there any guide describing which JARs are needed for the connector to work reliably?
This page lists just a single dependency (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/parquet/), but it is obviously missing the whole Hadoop thingie that flink-sql-parquet.jar depends on.
Henrik Feldt
09/11/2022, 4:02 PM
Sumit Nekar
09/12/2022, 7:03 AM
Jirawech Siwawut
09/12/2022, 3:33 PM
Mathi
09/13/2022, 11:41 AM
Henrik Feldt
09/13/2022, 6:48 PM
Henrik Feldt
09/13/2022, 6:54 PM
Henrik Feldt
09/13/2022, 8:39 PM
sap1ens
09/13/2022, 9:02 PM
jarURI parameter, which is passed to the Flink job as pipeline.jars. However, if you place your jar in the /opt/flink/lib location, it’ll be loaded twice: once by the system classloader and another time by the user classloader… which leads to all kinds of exceptions. Is there any way to prevent the operator from passing pipeline.jars?
Sylvia Lin
09/14/2022, 1:24 AM
Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:183)
at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:163)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 33 more
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.
Below is our Kafka sink configuration. Any suggestions?
kafkaSinkProperties.get.setProperty(
  "client.id",
  kafkaSinkProperties.get.getProperty("client.id") + JOB_NAME
)
kafkaSinkProperties.get.setProperty(
  "request.timeout.ms",
  "60000"
)
kafkaSinkProperties.get.setProperty(
  "linger.ms",
  "10"
)
kafkaSinkProperties.get.setProperty(
  "batch.size",
  "32768"
)
val kafkaSink = KafkaSink.builder()
  .setBootstrapServers(kafkaSinkProperties.get.getProperty("bootstrap.servers"))
  .setKafkaProducerConfig(kafkaSinkProperties.get)
  .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  .setTransactionalIdPrefix(TXN_PREFIX)
  .setRecordSerializer(new CustomTopicSchema)
  .build
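For reference, a minimal plain-Java sketch of the same producer properties, using only java.util.Properties so it runs without Kafka or Flink on the classpath. The class name, method name, and the sample client id and job name are hypothetical. The transaction.timeout.ms entry is an assumption that does not appear in the message above: Kafka requires the producer's transaction.timeout.ms to stay at or below the broker's transaction.max.timeout.ms (15 minutes by default), so it is a setting often reviewed with EXACTLY_ONCE sinks. Nothing here confirms that it is related to the OutOfOrderSequenceException shown in the trace.

```java
import java.util.Properties;

public class KafkaSinkPropsSketch {

    // Builds producer properties with the values quoted in the message.
    // baseClientId and jobName are hypothetical inputs for illustration.
    static Properties build(String baseClientId, String jobName) {
        Properties props = new Properties();
        props.setProperty("client.id", baseClientId + jobName);
        props.setProperty("request.timeout.ms", "60000");
        props.setProperty("linger.ms", "10");
        props.setProperty("batch.size", "32768");
        // Assumption, not from the thread: an illustrative 10-minute value,
        // chosen only because it is below the broker default of 15 minutes
        // for transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-client-", "my-job");
        // Print the properties in a stable, sorted order.
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The resulting Properties object plays the role of the value passed to setKafkaProducerConfig in the snippet above.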
Zhihao Chen
09/14/2022, 5:24 AM
${cluster}-00000000000000000000000000000000-config-map is being appended to without bound.
Inside the ConfigMap I can see that some metadata from every checkpoint is being added to it, e.g.
...
checkpointID-0000000000000000069: rO0ABXNyAGNvcmcuYXBhY2hlLmZsaW5rLmt1YmVybmV0ZXMuaGlnaGF2YWlsYWJpbGl0eS5LdWJlcm5ldGVzU3RhdGVIYW5kbGVTdG9yZSRTdGF0ZUhhbmRsZVdpdGhEZWxldGVNYXJrZXKfgTvzmWVDfwIAAloAEW1hcmtlZEZvckRlbGV0aW9uTAAFaW5uZXJ0ADdMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1JldHJpZXZhYmxlU3RhdGVIYW5kbGU7eHAAc3IAO29yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5SZXRyaWV2YWJsZVN0cmVhbVN0YXRlSGFuZGxlAAEeGPFVlysCAAFMABh3cmFwcGVkU3RyZWFtU3RhdGVIYW5kbGV0ADJMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1N0cmVhbVN0YXRlSGFuZGxlO3hwc3IAOW9yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5maWxlc3lzdGVtLkZpbGVTdGF0ZUhhbmRsZQTcddhivRuzAgACSgAJc3RhdGVTaXplTAAIZmlsZVBhdGh0AB9Mb3JnL2FwYWNoZS9mbGluay9jb3JlL2ZzL1BhdGg7eHAAAAAAAADLhXNyAB1vcmcuYXBhY2hlLmZsaW5rLmNvcmUuZnMuUGF0aAAAAAAAAAABAgABTAADdXJpdAAOTGphdmEvbmV0L1VSSTt4cHNyAAxqYXZhLm5ldC5VUkmsAXguQ55JqwMAAUwABnN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdABxczM6Ly9ldXJla2EtZmxpbmstZGF0YS1pbnRlcm5hbC1kZXYvemhpaGFvLXRlc3Q2LWluc3RhbmNlaWQvaGEvc3RhbmRhbG9uZS9kZWZhdWx0L2NvbXBsZXRlZENoZWNrcG9pbnQ1ZjFhMDllMzA4YmV4
checkpointID-0000000000000000070: rO0ABXNyAGNvcmcuYXBhY2hlLmZsaW5rLmt1YmVybmV0ZXMuaGlnaGF2YWlsYWJpbGl0eS5LdWJlcm5ldGVzU3RhdGVIYW5kbGVTdG9yZSRTdGF0ZUhhbmRsZVdpdGhEZWxldGVNYXJrZXKfgTvzmWVDfwIAAloAEW1hcmtlZEZvckRlbGV0aW9uTAAFaW5uZXJ0ADdMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1JldHJpZXZhYmxlU3RhdGVIYW5kbGU7eHAAc3IAO29yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5SZXRyaWV2YWJsZVN0cmVhbVN0YXRlSGFuZGxlAAEeGPFVlysCAAFMABh3cmFwcGVkU3RyZWFtU3RhdGVIYW5kbGV0ADJMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1N0cmVhbVN0YXRlSGFuZGxlO3hwc3IAOW9yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5maWxlc3lzdGVtLkZpbGVTdGF0ZUhhbmRsZQTcddhivRuzAgACSgAJc3RhdGVTaXplTAAIZmlsZVBhdGh0AB9Mb3JnL2FwYWNoZS9mbGluay9jb3JlL2ZzL1BhdGg7eHAAAAAAAADLhXNyAB1vcmcuYXBhY2hlLmZsaW5rLmNvcmUuZnMuUGF0aAAAAAAAAAABAgABTAADdXJpdAAOTGphdmEvbmV0L1VSSTt4cHNyAAxqYXZhLm5ldC5VUkmsAXguQ55JqwMAAUwABnN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdABxczM6Ly9ldXJla2EtZmxpbmstZGF0YS1pbnRlcm5hbC1kZXYvemhpaGFvLXRlc3Q2LWluc3RhbmNlaWQvaGEvc3RhbmRhbG9uZS9kZWZhdWx0L2NvbXBsZXRlZENoZWNrcG9pbnQ0ZDQ2MTAwYTk2MGN4
checkpointID-0000000000000000071: ...
I feel this is unexpected, as a ConfigMap has a size limit of 1 MB, but I'm not sure what I missed. Could this be due to some configuration I’ve missed? Thanks!
Kwangin Jung
09/14/2022, 6:35 AM
Bilna P
09/14/2022, 7:17 AM
Martin Egri
09/14/2022, 7:47 AM
FileSource (so the DataStream API). The [documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/filesystem/#file-source) mentions Avro several times and tells me I should use a format, but from what I can see there is nothing prepackaged I can use here?
Ivan M
09/14/2022, 8:03 AM
2022-09-11 05:41:12 Caused by: java.io.IOException: Could not flush and close the file system output stream to gs://[GS path]/flink-jobs/namespaces/default/jobs/a25e17dc-bcaa-4ca4-895c-560c6b411ee1/checkpoints/a25e17dcbcaa4ca4895c560c6b411ee1/chk-7115/_metadata in order to obtain the stream state handle
2022-09-11 05:41:12 at java.lang.Thread.run(Unknown Source) [?:?]
2022-09-11 05:41:12 at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
2022-09-11 05:41:12 at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
2022-09-11 05:41:12 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1133) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1354) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 7115. Failure reason: Failure to finalize checkpoint.
2022-09-11 05:41:12 2022-09-11 03:41:12,660 WARN org.apache.flink.runtime.jobmaster.JobMaster [] - Error while processing AcknowledgeCheckpoint message
2022-09-11 05:41:12 ... 7 more
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:39) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:130) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.MetadataOutputStreamWrapper.closeForCommit(MetadataOutputStreamWrapper.java:63) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.RecoverableStreamWrapper.closeForCommitAction(RecoverableStreamWrapper.java:46) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeForCommit(GSRecoverableFsDataOutputStream.java:203) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.close(GSRecoverableFsDataOutputStream.java:196) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.BaseWriteChannel.close(BaseWriteChannel.java:151) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel.flushBuffer(BlobWriteChannel.java:183) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.RetryHelper.run(RetryHelper.java:76) ~[?:?]
2022-09-11 05:41:12 at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105) ~[?:?]
2022-09-11 05:41:12 at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel$1.run(BlobWriteChannel.java:252) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel.access$1400(BlobWriteChannel.java:34) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel.transmitChunk(BlobWriteChannel.java:67) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.spi.v1.HttpStorageRpc.writeWithResponse(HttpStorageRpc.java:874) ~[?:?]
2022-09-11 05:41:12 at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012) ~[?:?]
2022-09-11 05:41:12 at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84) ~[?:?]
2022-09-11 05:41:12 at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:152) ~[?:?]
2022-09-11 05:41:12 at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36) ~[?:?]
2022-09-11 05:41:12 at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(Unknown Source) ~[?:?]
2022-09-11 05:41:12 at java.net.HttpURLConnection.getResponseCode(Unknown Source) ~[?:?]
2022-09-11 05:41:12 at sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source) ~[?:?]
2022-09-11 05:41:12 at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source) ~[?:?]
2022-09-11 05:41:12 at sun.net.www.http.HttpClient.parseHTTP(Unknown Source) ~[?:?]
2022-09-11 05:41:12 at sun.net.www.http.HttpClient.parseHTTPHeader(Unknown Source) ~[?:?]
2022-09-11 05:41:12 Caused by: java.net.SocketException: Unexpected end of file from server
2022-09-11 05:41:12 ... 7 more
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:39) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:130) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.MetadataOutputStreamWrapper.closeForCommit(MetadataOutputStreamWrapper.java:63) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.RecoverableStreamWrapper.closeForCommitAction(RecoverableStreamWrapper.java:46) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeForCommit(GSRecoverableFsDataOutputStream.java:203) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.close(GSRecoverableFsDataOutputStream.java:196) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99) ~[?:?]
2022-09-11 05:41:12 at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.BaseWriteChannel.close(BaseWriteChannel.java:151) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel.flushBuffer(BlobWriteChannel.java:183) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.RetryHelper.run(RetryHelper.java:76) ~[?:?]
2022-09-11 05:41:12 at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105) ~[?:?]
2022-09-11 05:41:12 at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel$1.run(BlobWriteChannel.java:252) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel.access$1400(BlobWriteChannel.java:34) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.BlobWriteChannel.transmitChunk(BlobWriteChannel.java:67) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.spi.v1.HttpStorageRpc.writeWithResponse(HttpStorageRpc.java:901) ~[?:?]
2022-09-11 05:41:12 at com.google.cloud.storage.spi.v1.HttpStorageRpc.translate(HttpStorageRpc.java:233) ~[?:?]
2022-09-11 05:41:12 Caused by: com.google.cloud.storage.StorageException: Unexpected end of file from server
2022-09-11 05:41:12 ... 7 more
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:39) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:149) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 Caused by: java.io.IOException: Could not flush and close the file system output stream to gs://[GS path]/flink-jobs/namespaces/default/jobs/a25e17dc-bcaa-4ca4-895c-560c6b411ee1/checkpoints/a25e17dcbcaa4ca4895c560c6b411ee1/chk-7115/_metadata in order to obtain the stream state handle
2022-09-11 05:41:12 at java.lang.Thread.run(Unknown Source) [?:?]
2022-09-11 05:41:12 at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
2022-09-11 05:41:12 at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
2022-09-11 05:41:12 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1133) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1346) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
2022-09-11 05:41:12 org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
Raghunadh Nittala
09/14/2022, 8:29 AM
Rashmin Patel
09/14/2022, 9:26 AM
Satya
09/14/2022, 10:19 AM
FileSource API to read a stream of files on S3 with the builder config monitorContinuously(Duration.ofSeconds(10)). While starting the job I am getting a connection timeout error.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_3.
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
This eventually leads to a connection timeout error.
My guess is that the issue may be caused by FileSource.forRecordStreamFormat, because the application loads files in batch mode to rebuild the historical data, and doing so may cause Akka to time out.
The questions are:
1. Is this issue caused by FileSource.forRecordStreamFormat?
2. How can I increase akka.ask.timeout here?
Slackbot
09/14/2022, 12:09 PM
Adesh Dsilva
09/14/2022, 1:28 PM
Types.LIST(Types.LONG); but I don’t find Types.SET?
Pedro Mázala
09/14/2022, 1:44 PM
curl the expected JMX port I get a curl: (52) Empty reply from server, but if I use jconsole I’m able to check MBeans. Has anyone else experienced this?
flink-conf.yaml: |+
  (...)
  metrics.reporters: my_jmx_reporter
  metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
  metrics.reporter.my_jmx_reporter.port: 9020
  env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099 -Djava.rmi.server.hostname=127.0.0.1
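One possible explanation, offered as an assumption rather than something confirmed in this thread: the standard JMX remote connector speaks RMI, not HTTP, so curl has no HTTP response to read from the port, while jconsole connects over RMI. A minimal Java sketch of how a JMX client addresses such a port, using only the JDK's javax.management.remote classes; the class and method names are hypothetical, and the host and port are taken from the env.java.opts line above.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxClientSketch {

    // Builds the conventional RMI-based JMX service URL for a host and port.
    static JMXServiceURL rmiUrl(String host, int port) {
        try {
            return new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid JMX host or port", e);
        }
    }

    // Connects over RMI and returns the number of registered MBeans.
    // Requires a reachable JVM with remote JMX enabled on that port.
    static int countMBeans(JMXServiceURL url) throws IOException {
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            return connection.getMBeanCount();
        }
    }

    public static void main(String[] args) {
        JMXServiceURL url = rmiUrl("127.0.0.1", 1099);
        System.out.println("protocol: " + url.getProtocol());
        System.out.println("path: " + url.getURLPath());
        // countMBeans(url) is not called here because it needs a live JMX endpoint.
    }
}
```

Calling countMBeans against the running JobManager or TaskManager would be the programmatic equivalent of what jconsole does when it lists MBeans.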
Nick Pocock
09/14/2022, 4:24 PM
flink-compose-kinesis-master-1 | org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Certificate for <localstack> doesn't match any of the subject alternative names: [*.amplifyapp.localhost.localstack.cloud, *.cloudfront.localhost.localstack.cloud, *.execute-api.localhost.localstack.cloud, *.localhost.localstack.cloud, *.opensearch.localhost.localstack.cloud, *.s3.localhost.localstack.cloud, *.scm.localhost.localstack.cloud, localhost.localstack.cloud]
If I try to use HTTP I get
Caused by: java.lang.IllegalArgumentException: Invalid service endpoint url: http://localstack:4566; Only custom service endpoints using HTTPS are supported
Is there any way to allow HTTP for testing purposes whilst I develop locally? Has anyone had any issues with this before?
Going down a bit of a rabbit hole!
If it helps my module config for the ingress looks like:
kind: io.statefun.kinesis.v1/ingress
spec:
  id: com/sentiment
  awsRegion:
    type: custom-endpoint
    id: us-east-1
    endpoint: https://localstack:4566
  awsCredentials:
    type: basic
    accessKeyId: key
    secretAccessKey: secret
  startupPosition:
    type: latest
  streams:
    - stream: customer-details
      valueType: SentimentEvent
      targets:
        - sentiment
  clientConfigProperties:
    - SocketTimeout: 9999
    - MaxConnections: 15
Thanks!
James Timotiwu
09/14/2022, 4:49 PM
.execute, even if it is already present in the code? I want a way to run getExecutionPlan without actually running the job, but would like to avoid rebuilding the jars even when the code already has execute