Aswin Dev S
12/16/2022, 6:03 AM

Kyle Ahn
12/16/2022, 6:38 AM
FlinkDeployment.yaml with running state. The Flink jobmanager was rolled back, but I am unsure how to proceed with the deployment without losing the state.
2022-12-16 06:03:22,665 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 00000000000000000000000000000000 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 20ba6b65f97481d5570070de90e4e791
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
... 4 more
Caused by: java.lang.IllegalStateException: There is no operator for the state 20ba6b65f97481d5570070de90e4e791
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:728)
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1666)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1594)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:177)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
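The "There is no operator for the state ..." failure means the checkpoint being restored contains state for an operator that no longer exists in the submitted job graph (an operator was removed, or its uid changed). If losing that operator's state is acceptable, the restore can be told to skip it. A minimal sketch against the Flink Kubernetes Operator's FlinkDeployment spec, assuming the rest of the resource stays as deployed:

# Sketch only: drop checkpoint state that maps to no operator in the
# new job graph instead of failing the restore. The skipped state is
# lost for good, so use with care.
spec:
  job:
    allowNonRestoredState: true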
Gyula Fóra
12/16/2022, 9:51 AM
execution.shutdown-on-application-finish: false
execution.submit-failed-job-on-application-error: true
But even with these set, if the application fails on startup (due to state incompatibility) the jobmanager shuts down. This seems incorrect and completely breaks some assumptions in the kubernetes operator.
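For reference, a sketch of the same two flags set through a FlinkDeployment resource (the operator passes flinkConfiguration entries into flink-conf.yaml; values must be strings in the CRD):

# Sketch only: keep the JobManager running after the application
# finishes or fails to submit, so the operator can observe the result.
spec:
  flinkConfiguration:
    execution.shutdown-on-application-finish: "false"
    execution.submit-failed-job-on-application-error: "true"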
Lee xu
12/16/2022, 9:58 AM

Otto Remse
12/16/2022, 10:09 AM
insert into my_sink (myPrimaryKey, columnA) (select primaryKey, columnA from sourceA)
insert into my_sink (myPrimaryKey, columnB) (select primaryKey, columnB from sourceB)
Currently it seems the elasticsearch sink just overwrites the document with null values for columnA, since the second query does not contain that column.
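One hedged workaround, assuming primaryKey is the upsert key of both source tables: emit a single complete row per key, for example via a full outer join, so neither INSERT nulls out the column the other one wrote:

-- Sketch only: one row carries both columns, so the sink never
-- writes a partial document that blanks the other attribute.
INSERT INTO my_sink
SELECT
  COALESCE(a.primaryKey, b.primaryKey) AS myPrimaryKey,
  a.columnA,
  b.columnB
FROM sourceA AS a
FULL OUTER JOIN sourceB AS b ON a.primaryKey = b.primaryKey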
Robin Cassan
12/16/2022, 10:56 AM
...
ezmlm-reject: fatal: Sorry, I don't accept message with empty Subject (#5.7.0)
Do you know if the process has changed somehow?
Mohit Jain
12/16/2022, 12:32 PM
Avro format using PyFlink:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroSchema
from pyflink.table import StreamTableEnvironment

avro_schema = """{"type":"record","name":"NewName","fields":[{"name":"id","type":"long"},{"name":"UserId","type":["null","long"],"default":null},{"name":"fffId","type":["null","int"],"default":null},{"name":"gtbId","type":["null","string"],"default":null},{"name":"TimeofTrans","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"CheckType","type":["null","string"],"default":null}],"connect.name":"NewName"}"""

# initial setup
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-connector-kafka-1.16.0.jar")
env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-avro-1.16.0.jar")

# getting the table env for SQL
deserialization_schema = AvroRowDeserializationSchema(
    avro_schema_string=avro_schema
)
kafka_consumer = FlinkKafkaConsumer(
    topics='NewTopicName',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': '172.10.10.10:9092,172.20.10.10:9092,172.30.10.10:9092', 'group.id': 'test_group_1'}
)
kafka_consumer.set_start_from_latest()
source = env.add_source(kafka_consumer).print()
env.execute()
It is giving an ArrayIndexOutOfBoundsException:
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 5 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:85)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 25 out of bounds for length 2
at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
at org.apache.flink.avro.shaded.org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.flink.avro.shaded.org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:164)
... 9 more
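An ArrayIndexOutOfBoundsException inside Avro's ResolvingDecoder usually means the bytes on the topic do not match the schema being used to decode them. A common cause is that the producer wrote the Confluent Schema Registry wire format (one magic byte plus a four-byte schema id in front of the Avro payload), which AvroRowDeserializationSchema does not strip. If that is the case here, one option is the Table API's avro-confluent format. A sketch, where the registry URL and the flink-sql-avro-confluent-registry jar are assumptions:

# Sketch only: read Confluent-framed Avro through the Table API. This
# needs flink-sql-avro-confluent-registry-1.16.0.jar on the classpath
# instead of flink-sql-avro-1.16.0.jar; the registry URL is a placeholder.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.execute_sql("""
    CREATE TABLE new_topic (
        id BIGINT,
        UserId BIGINT,
        fffId INT,
        gtbId STRING,
        TimeofTrans TIMESTAMP(3),
        CheckType STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'NewTopicName',
        'properties.bootstrap.servers' = '172.10.10.10:9092',
        'properties.group.id' = 'test_group_1',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://schema-registry:8081'
    )
""")
t_env.execute_sql("SELECT * FROM new_topic").print()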
Bobby Richard
12/16/2022, 3:32 PM

Emmanuel Leroy
12/16/2022, 4:03 PM

Emmanuel Leroy
12/16/2022, 4:45 PM

Yui H
12/16/2022, 9:23 PM
object CollectSink {
  val collectedResults = mutable.ListBuffer[(Int, String)]()
}

class CollectSink extends SinkFunction[(Int, String)] {
  override def invoke(value: (Int, String), context: SinkFunction.Context): Unit = {
    CollectSink.collectedResults += value
  }
}

class MyTestSuite extends BaseSuite with BeforeAndAfterAll with BeforeAndAfterEach {
  val flinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
      .setNumberSlotsPerTaskManager(2)
      .setNumberTaskManagers(1)
      .build
  )

  override def beforeAll() = {
    flinkCluster.before()
  }

  override def beforeEach() = {
    CollectSink.collectedResults.clear()
  }

  override def afterAll() = {
    flinkCluster.after()
  }

  it should "test example" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val sourceDataStream = env.fromCollection(
      Seq(
        (1, "a"),
        (1, "b"),
        (2, "c"),
        (1, "d"),
        (2, "e")
      )
    )
    val collectSink = new CollectSink()
    sourceDataStream
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.seconds(1)))
      .reduce(new ReduceFunction[(Int, String)] {
        override def reduce(value1: (Int, String), value2: (Int, String)): (Int, String) =
          (value1._1, value1._2 + value2._2)
      })
      .addSink(collectSink)
    env.execute()
    CollectSink.collectedResults should have size 1
  }
}
Grant Wade
12/16/2022, 11:16 PM
{keyAttrib1, valueAttrib1}
{keyAttrib1, valueAttrib2}
And when logging the result of the table, sometimes I see
+I [keyAttrib1, valueAttrib1]
-U [keyAttrib1, valueAttrib1]
+U [keyAttrib1, valueAttrib2]
which is expected, but sometimes I see:
+I [keyAttrib1, valueAttrib2]
-U [keyAttrib1, valueAttrib2]
+U [keyAttrib1, valueAttrib1]
The records start as Avro objects, so the mapping early in the flow is just converting them from a key/value Avro pair to a Row object for use in the Table API. Any ideas? I can create a sample project if it’ll help.

Kyle Ahn
12/17/2022, 1:01 AM
AT_TIMESTAMP initPos for a Kinesis data stream? It seems like the Kinesis data stream is throwing ReadThrottled errors. Is there a strategy around reading from the shards at a slower pace?
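On the throttling side, the Kinesis consumer exposes per-shard polling knobs alongside the AT_TIMESTAMP start position, so the GetRecords rate can be lowered. A sketch with the PyFlink FlinkKinesisConsumer; the stream name, region, timestamp, and values are assumptions:

# Sketch only: start from a timestamp and poll each shard more gently
# to stay under the Kinesis per-shard read limits (values assumed).
from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer

consumer_config = {
    'aws.region': 'us-east-1',
    'flink.stream.initpos': 'AT_TIMESTAMP',
    'flink.stream.initpos.timestamp': '2022-12-16T00:00:00.000-00:00',
    # fewer records per GetRecords call, and a longer pause between calls
    'flink.shard.getrecords.maxrecordcount': '5000',
    'flink.shard.getrecords.intervalmillis': '1000',
}

env = StreamExecutionEnvironment.get_execution_environment()
stream = env.add_source(
    FlinkKinesisConsumer('my_stream', SimpleStringSchema(), consumer_config))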
Fariz Hajiyev
12/17/2022, 1:56 AM
kind: io.statefun.kinesis.v1/ingress
spec:
  id: <my_ingress_id>
  awsRegion:
    type: specific
    id: us-east-1
  startupPosition:
    type: latest
  streams:
    - stream: <my_stream_name>
      valueType: <my_value_type>
      targets:
        - <my_ingress_target>
This works fine, but only for streams located in the same AWS account as the statefun service (it's on EKS now). Does this YAML have a way of specifying that the ingress is supposed to read a cross-account stream using a given cross-account role ARN?
And in case this is not possible, are there any suggestions or ideas on how to resolve this problem?
Oh, and one more question: at the moment I am using startupPosition: type: latest, but I'm wondering if statefun is able to generate snapshots periodically and save the current ingress position in them, similar to Flink streaming, so that in case of a redeployment or a restart due to an error it may continue from where it left off last time, instead of starting from "latest"? Is that enabled by default?
Thank you!

Ari Huttunen
12/17/2022, 3:51 PM

Kerem Ulutaş
12/18/2022, 6:20 PM

Mohit Jain
12/18/2022, 9:45 PM
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer, \
    KafkaSource, KafkaSink, KafkaRecordSerializationSchema, KafkaOffsetsInitializer
from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroSchema
from pyflink.table import StreamTableEnvironment
from pyflink.common import WatermarkStrategy, SimpleStringSchema

avro_schema = """{"type":"record","name":"tootname","fields":[{"name":"id","type":"long"},{"name":"Uid","type":"long"},{"name":"wid","type":"int"},{"name":"tid","type":"string"},{"name":"gt","type":"string"}],"connect.name":"tootnames"}"""

# initial setup
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-connector-kafka-1.16.0.jar")
env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-avro-1.16.0.jar")

# getting the table env for SQL
# t_env = StreamTableEnvironment.create(env)

deserialization_schema = AvroRowDeserializationSchema(
    avro_schema_string=avro_schema
)
kafka_consumer = FlinkKafkaConsumer(
    topics='topic_name',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': '<IP>:<port>', 'group.id': 'test_group_3'}
)
kafka_consumer.set_start_from_latest()
source = env.add_source(kafka_consumer)

producer = FlinkKafkaProducer(
    topic='flinkSink',
    serialization_schema=SimpleStringSchema(),
    producer_config={'bootstrap.servers': '127.0.0.1:9092', 'group.id': 'test_group_3'}
)
source.add_sink(producer)
print("initiating process")
env.execute()
Result
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
.......
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
....................
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
..............
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 5 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:85)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
Caused by: java.lang.UnsupportedOperationException: Cannot read strings longer than 2147483639 bytes
at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:305)
at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
.................
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:164)
... 9 more
Not sure why it failed with
Cannot read strings longer than 2147483639 bytes
I checked Kafka and also tried to read a message from Kafka via different raw Python libraries. No message was longer than a few KBs, but here it says that it is trying to read a string larger than 2 GB?
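The "Cannot read strings longer than ..." error is a typical symptom of decoding bytes that are not plain Avro: a string length read from the wrong offset comes out as a garbage, near-2GB number. Records written in the Confluent Schema Registry wire format (magic byte 0x00 plus a four-byte schema id) will do exactly this to AvroRowDeserializationSchema. A quick diagnostic sketch, assuming kafka-python is installed and the broker/topic from the snippet above:

# Sketch only: peek at the first raw record and check for the Confluent
# wire-format header that plain Avro decoders cannot parse.
from kafka import KafkaConsumer

consumer = KafkaConsumer('topic_name',
                         bootstrap_servers='127.0.0.1:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)
for msg in consumer:
    framed = len(msg.value) > 5 and msg.value[0] == 0
    print('confluent-framed' if framed else 'plain avro', msg.value[:10].hex())
    break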
jaiprasad
12/19/2022, 6:15 AM
java.lang.Exception: Could not perform checkpoint 31 for operator Source: KafkaOnPrem_Source -> Transformer -> KafkaCloud_Sink: Writer -> KafkaCloud_Sink: Committer (1/1)#26.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 31 for operator Source: KafkaOnPrem_Source -> Transformer -> KafkaCloud_Sink: Writer -> KafkaCloud_Sink: Committer (1/1)#26. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
... 13 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId
Current Configuration:
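A "Timeout expired ... while awaiting InitProducerId" on an exactly-once Kafka sink usually means the transactional producer cannot finish initializing against the transaction coordinator, commonly a connectivity or ACL problem toward the target cluster, or transaction settings the broker refuses (for example a transaction timeout above the broker's transaction.max.timeout.ms). A sketch of the relevant knobs, shown with the PyFlink KafkaSink for illustration; all names and values are assumptions:

# Sketch only: exactly-once Kafka sink with an explicit transactional id
# prefix and a transaction timeout the broker will accept (it must not
# exceed the broker's transaction.max.timeout.ms, 15 minutes by default).
from pyflink.common import SimpleStringSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

sink = KafkaSink.builder() \
    .set_bootstrap_servers('cloud-broker:9092') \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic('out_topic')
        .set_value_serialization_schema(SimpleStringSchema())
        .build()) \
    .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
    .set_transactional_id_prefix('kafka-cloud-sink') \
    .set_property('transaction.timeout.ms', '600000') \
    .build()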
René
12/19/2022, 9:39 AM
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'deploymentServiceFacade' defined in URL [jar:file:/vvp/app/lib/appmanager-controller-2.8.2-plain.jar!/com/dataartisans/appmanager/controller/core/DeploymentServiceFacade.class]: Unsatisfied dependency expressed through constructor parameter 7; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'artifactStorageChecker' defined in class path resource [com/dataartisans/appmanager/apiserver/config/AppManagerConfig.class]: Unsatisfied dependency expressed through method 'artifactStorageChecker' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.ververica.platform.artifactstorage.ArtifactStorage' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
Hygor Knust
12/19/2022, 12:49 PM
flink-conf.yaml
"yarn.application.node-label": "CORE",
"yarn.taskmanager.node-label": "TASK",
"jobmanager.memory.process.size": "12g",
"taskmanager.memory.process.size": "17g"
I'm using the instance m5.xlarge (4c16g) for CORE nodes and is4gen.xlarge (4c24g) for TASK nodes.
But when I try to start the yarn-session, I get an exception:
The cluster does not have the requested resources for the TaskManagers available!
It seems like it is trying to allocate the TaskManagers on CORE nodes, which have only 12g of memory, and as I set the TaskManager memory to 17g, it is failing.
My question is: why isn't it using the TASK nodes? And how can I debug it?
Would anyone be able to help me with that?

Emmanuel Leroy
12/19/2022, 1:38 PM
2022-12-19 13:16:01,253 o.a.f.k.o.o.d.SessionObserver [ERROR][flink/flink-session] REST service in session cluster is bad now
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.listJobs(AbstractFlinkService.java:231)
at org.apache.flink.kubernetes.operator.observer.deployment.SessionObserver.observeFlinkCluster(SessionObserver.java:48)
at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:89)
at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:55)
at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:56)
at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:32)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:114)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:55)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Sachin Saikrishna Manikandan
12/19/2022, 2:01 PM
Could not send message [RemoteFencedMessage(874d9a7a2209e291ddae0755843e4a27, RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState)))] from sender [Actor[akka.tcp://flink@10.244.76.132:6122/temp/jobmanager_2$bd]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#1422837652]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
Jaume
12/19/2022, 2:48 PM
promotion data source, which has start_date and end_date fields.
And we want to get the currently active promotions, meaning CURRENT_TIME is between start_date and end_date. Whenever we run the Flink job it will start reading the current values, but they won't be re-analyzed as time passes and a row starts matching the condition, only when a change appears in our data source.
❓ Is this something possible with Flink SQL?

Sami Badawi
12/19/2022, 3:25 PM

Emmanuel Leroy
12/19/2022, 6:36 PM

Nathanael England
12/19/2022, 6:56 PM

Mark Cho
12/19/2022, 10:41 PM
Important: For the Batch API, <operator_id> is always equal to <task_id>.
For the streaming API, when will operator_id != task_id? From what I can tell, all my task_ids are the same as operator_ids, so I want to understand the distinction between the two IDs.

Sai Sharath Dandi
12/20/2022, 2:40 AM
An exception with message [java.lang.RuntimeException: Illegal character in: EXPR$0] was thrown while processing request.
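EXPR$0 is the name the SQL planner generates for a SELECT expression that has no alias, and some sink formats (Avro, for one) reject the $ character in field names. If that is what is happening here, giving every computed column an explicit alias should avoid it; a sketch with hypothetical table and column names:

-- Sketch only: alias computed expressions so no generated EXPR$0
-- name reaches the sink schema (all identifiers here are hypothetical).
INSERT INTO my_sink
SELECT CONCAT(first_name, last_name) AS full_name
FROM my_source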
Amenreet Singh Sodhi
12/20/2022, 6:09 AM
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/
and on this site it only shows the following ways to submit jobs:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#accessing-flink-in-kubernetes
So I am looking for a way to automatically deploy the job without needing access to the web UI (in Flink session mode), one which doesn't involve shifting to the Flink Kubernetes operator.
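One way to submit jobs to a standalone session cluster without the web UI is the JobManager's REST API, which the UI itself uses (POST /jars/upload, then POST /jars/<jar-id>/run). A sketch, where the service host, jar name, and entry class are placeholders:

# Sketch only: upload a job jar and run it against a session cluster's
# REST endpoint; no web UI involved.
import requests

base = 'http://flink-jobmanager:8081'
with open('my-job.jar', 'rb') as f:
    upload = requests.post(f'{base}/jars/upload', files={'jarfile': f})
jar_id = upload.json()['filename'].split('/')[-1]
run = requests.post(f'{base}/jars/{jar_id}/run',
                    json={'entryClass': 'com.example.MyJob'})
print(run.json())  # contains the new job id on success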
raghav tandon
12/20/2022, 7:45 AM