# troubleshooting
  • Aswin Dev S (12/16/2022, 6:03 AM)
    Hey all, has anyone tried deploying Flink on AWS ECS with high availability (ZooKeeper)?
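    For context, ZooKeeper HA on ECS comes down to the same flink-conf.yaml entries as on any other platform; a minimal sketch, with placeholder quorum hosts and storage bucket:

    ```yaml
    # minimal ZooKeeper HA configuration (hosts and storage dir are placeholders)
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
    high-availability.storageDir: s3://my-bucket/flink/ha/   # durable shared storage
    high-availability.cluster-id: /my-flink-cluster          # distinct id per cluster
    ```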
  • Kyle Ahn (12/16/2022, 6:38 AM)
    Is anyone familiar with the error below? I deployed a new `FlinkDeployment.yaml` with `running` state; the Flink jobmanager was rolled back, but I am unsure how to proceed with the deployment without losing the state.
    2022-12-16 06:03:22,665 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 00000000000000000000000000000000 reached terminal state FAILED.
    org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 20ba6b65f97481d5570070de90e4e791
    	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    	... 4 more
    Caused by: java.lang.IllegalStateException: There is no operator for the state 20ba6b65f97481d5570070de90e4e791
    	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:728)
    	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1666)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1594)
    	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:177)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139)
    	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135)
    	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
    	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
    	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322)
    	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
    	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
    	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    	... 4 more
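    The IllegalStateException above means the restored checkpoint carries state for an operator id that no longer exists in the new job graph (typical after removing an operator or changing a uid). If dropping that state is acceptable, the operator's allowNonRestoredState flag lets the job start anyway; a hedged sketch of the relevant FlinkDeployment fields (jarURI is a placeholder):

    ```yaml
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    spec:
      job:
        jarURI: local:///opt/flink/usrlib/my-job.jar  # placeholder
        state: running
        upgradeMode: savepoint
        # skip state with no matching operator instead of failing the restore
        allowNonRestoredState: true
    ```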
  • Gyula Fóra (12/16/2022, 9:51 AM)
    Hey guys! Could somebody knowledgeable about the job result store help debug this strange issue? I have the following 2 configs set in Flink 1.15/1.16:
    execution.shutdown-on-application-finish: false
    execution.submit-failed-job-on-application-error: true
    But even so, if the application fails on startup (due to state incompatibility), the jobmanager shuts down. This seems incorrect and completely breaks some assumptions in the Kubernetes operator.
  • Lee xu (12/16/2022, 9:58 AM)
    Does Flink Table Store support the S3 protocol?
  • Otto Remse (12/16/2022, 10:09 AM)
    Hi! I'm currently trying to use the Flink Table API with the Elasticsearch sink and partial updates. I want a common document that receives updates from two different queries:
    insert into my_sink (myPrimaryKey, columnA) (select primaryKey, columnA from sourceA)
    insert into my_sink (myPrimaryKey, columnB) (select primaryKey, columnB from sourceB)
    Currently it seems the Elasticsearch sink just overwrites the document, setting columnA to null, since the second query does not contain that column.
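    One possible workaround, sketched under the assumption that both sources share the key and the sink declares it as PRIMARY KEY: produce the document from a single query so neither column gets nulled out.

    ```sql
    -- join the two sources on the key and emit one upsert per document
    INSERT INTO my_sink
    SELECT
      COALESCE(a.primaryKey, b.primaryKey) AS myPrimaryKey,
      a.columnA,
      b.columnB
    FROM sourceA AS a
    FULL OUTER JOIN sourceB AS b
      ON a.primaryKey = b.primaryKey;
    ```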
  • Robin Cassan (12/16/2022, 10:56 AM)
    Hey all! I'm trying to subscribe to the dev mailing list, which is supposed to be done by sending an empty mail (without subject or body) to the address, but when I do I receive this answer from MAILER-DAEMON@apache.org:
    ...
    ezmlm-reject: fatal: Sorry, I don't accept message with empty Subject (#5.7.0)
    Do you know if the process has changed somehow?
  • Mohit Jain (12/16/2022, 12:32 PM)
    Can someone help here? I am trying to read from Kafka with `Avro format` using PyFlink:
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
    from pyflink.datastream.formats.avro import AvroRowDeserializationSchema,AvroSchema
    from pyflink.table import StreamTableEnvironment
    
    avro_schema = """{"type":"record","name":"NewName","fields":[{"name":"id","type":"long"},{"name":"UserId","type":["null","long"],"default":null},{"name":"fffId","type":["null","int"],"default":null},{"name":"gtbId","type":["null","string"],"default":null},{"name":"TimeofTrans","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"CheckType","type":["null","string"],"default":null}],"connect.name":"NewName"}"""
    
    # initial setup
    env = StreamExecutionEnvironment.get_execution_environment()
    
    env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-connector-kafka-1.16.0.jar")
    env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-avro-1.16.0.jar")
    
    # getting the table env for SQL
    deserialization_schema = AvroRowDeserializationSchema(
        avro_schema_string=avro_schema
    )
    
    kafka_consumer = FlinkKafkaConsumer(
        topics='NewTopicName',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '172.10.10.10:9092,172.20.10.10:9092,172.30.10.10:9092', 'group.id': 'test_group_1'}
    )
    kafka_consumer.set_start_from_latest()
    source = env.add_source(kafka_consumer).print()
    
    env.execute()
    It throws an ArrayIndexOutOfBoundsException:
    : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
    	at akka.dispatch.OnComplete.internal(Future.scala:300)
    	at akka.dispatch.OnComplete.internal(Future.scala:297)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
    	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
    	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
    	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
    	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
    	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
    	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
    	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
    	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
    	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
    	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
    	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
    	at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
    	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at akka.actor.Actor.aroundReceive(Actor.scala:537)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    	... 5 more
    Caused by: java.io.IOException: Failed to deserialize Avro record.
    	at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
    	at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:85)
    	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 25 out of bounds for length 2
    	at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    	at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    	at org.apache.flink.avro.shaded.org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    	at org.apache.flink.avro.shaded.org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    	at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:164)
    	... 9 more
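    A failure inside ResolvingDecoder.readIndex with an impossible union index (25 for a 2-branch union) often means the bytes are not plain Avro. In particular, records produced with Confluent's Avro serializer are prefixed with a magic byte plus a 4-byte schema id, which AvroRowDeserializationSchema does not strip. If that is the case here, one option is the Table API's avro-confluent format, which handles that framing; a sketch assuming the flink-sql-avro-confluent-registry jar is on the classpath (registry URL is a placeholder):

    ```python
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # 'avro-confluent' strips the schema-registry framing before decoding
    t_env.execute_sql("""
        CREATE TABLE new_name (
            id BIGINT,
            UserId BIGINT,
            gtbId STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'NewTopicName',
            'properties.bootstrap.servers' = '172.10.10.10:9092',
            'properties.group.id' = 'test_group_1',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'avro-confluent',
            'avro-confluent.url' = 'http://schema-registry:8081'
        )
    """)
    t_env.execute_sql("SELECT * FROM new_name").print()
    ```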
  • Bobby Richard (12/16/2022, 3:32 PM)
    After enabling exactly-once processing with Kafka transactions, I seem to have a memory leak: heap utilization grows constantly until the job eventually crashes. When profiling, I see many org.apache.kafka.clients.producer.internals.RecordAccumulator instances growing indefinitely. What could cause this?
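    If this is the legacy FlinkKafkaProducer, note that in EXACTLY_ONCE mode it keeps a pool of transactional producers across checkpoints, which is one place accumulating RecordAccumulator instances can come from. For comparison, a sketch of how exactly-once is wired with the newer unified KafkaSink (names are placeholders; the DeliveryGuarantee import path varies slightly between releases):

    ```python
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors.base import DeliveryGuarantee
    from pyflink.datastream.connectors.kafka import (
        KafkaRecordSerializationSchema,
        KafkaSink,
    )

    # exactly-once sink: one open transaction per checkpoint interval
    sink = KafkaSink.builder() \
        .set_bootstrap_servers('broker:9092') \
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
                .set_topic('out-topic')
                .set_value_serialization_schema(SimpleStringSchema())
                .build()) \
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
        .set_transactional_id_prefix('my-app') \
        .build()
    ```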
  • Emmanuel Leroy (12/16/2022, 4:03 PM)
    I have a HybridSource loading files from object storage before switching to the equivalent Kafka stream. The source runs at 100% busy at first (which makes sense), then switches to Kafka but stays at 75-85% busy, whereas the Kafka-only version runs at 0% busy. I am wondering if I am doing something wrong here; it's as if the FileSource were still busy for some reason.
  • Emmanuel Leroy (12/16/2022, 4:45 PM)
    I am still seeing this kind of error all the time in the operator. Savepoints end up working fine, but the ones that fail like this are not cleaned up after the configured retention period, and I end up with many, many savepoints in object storage.
    flink-savepoint-error.txt
  • Yui H (12/16/2022, 9:23 PM)
    I'm trying to write a simple test, but I'm not sure why it's not working. Here is my code; not sure what I am missing 😢
    object CollectSink {
      val collectedResults = mutable.ListBuffer[(Int, String)]()
    }
    
    class CollectSink extends SinkFunction[(Int, String)] {
      override def invoke(value: (Int, String), context: SinkFunction.Context): Unit = {
        CollectSink.collectedResults += value
      }
    }
    
    class MyTestSuite extends BaseSuite with BeforeAndAfterAll with BeforeAndAfterEach {
      val flinkCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
          .setNumberSlotsPerTaskManager(2)
          .setNumberTaskManagers(1)
          .build
      )
    
      override def beforeAll() = {
        flinkCluster.before()
      }
    
      override def beforeEach() = {
        CollectSink.collectedResults.clear()
      }
    
      override def afterAll() = {
        flinkCluster.after()
      }
    
      it should "test example" in {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)
    
        val sourceDataStream = env.fromCollection(
          Seq(
            (1, "a"),
            (1, "b"),
            (2, "c"),
            (1, "d"),
            (2, "e")
          )
        )
        val collectSink = new CollectSink()
        sourceDataStream
          .keyBy(_._1)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
          .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.seconds(1)))
          .reduce(new ReduceFunction[(Int, String)] {
            override def reduce(value1: (Int, String), value2: (Int, String)): (Int, String) = (value1._1, value1._2 + value2._2)
          })
          .addSink(collectSink)
    
        env.execute()
    
        CollectSink.collectedResults should have size 1
      }
    }
  • Grant Wade (12/16/2022, 11:16 PM)
    Hey all, I'm pretty new to Flink and have a simple Table API app working (mostly). The goal is to connect it to Kafka, but for this simple app I'm creating a DataStream using StreamExecutionEnvironment#fromElements, doing a simple stateless transformation, and piping it into a table using StreamTableEnvironment#fromChangelogStream with ChangelogMode.upsert(). All of this works as I expect, except that sometimes the records are processed in the order I defined them and sometimes out of order. I'm determining the record order using StreamTableEnvironment#toChangelogStream(…).process(/* log row */). I have 2 records being input:
    {keyAttrib1, valueAttrib1}
    {keyAttrib1, valueAttrib2}
    And when logging the result of the table, sometimes I see
    +I [keyAttrib1, valueAttrib1]
    -U [keyAttrib1, valueAttrib1]
    +U [keyAttrib1, valueAttrib2]
    which is expected, but sometimes I see:
    +I [keyAttrib1, valueAttrib2]
    -U [keyAttrib1, valueAttrib2]
    +U [keyAttrib1, valueAttrib1]
    The records start as Avro objects, so the mapping early in the flow just converts them from a key/value Avro pair to a Row object for use in the Table API. Any ideas? I can create a sample project if it helps.
  • Kyle Ahn (12/17/2022, 1:01 AM)
    Has anyone faced an issue using the `AT_TIMESTAMP` initPos for a Kinesis data stream? It seems like the Kinesis data stream is throwing `ReadThrottled` errors. Is there a strategy for reading from the shards at a slower pace?
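    One knob for pacing, sketched with illustrative values via the consumer's GetRecords properties (these are the string keys behind ConsumerConfigConstants):

    ```python
    # slow down per-shard polling to stay under the read throughput limits
    consumer_config = {
        'aws.region': 'us-east-1',
        'flink.stream.initpos': 'AT_TIMESTAMP',
        'flink.stream.initpos.timestamp': '2022-12-16T00:00:00.000+00:00',
        'flink.shard.getrecords.maxrecords': '5000',      # records per GetRecords call
        'flink.shard.getrecords.intervalmillis': '1000',  # pause between calls
    }
    ```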
  • Fariz Hajiyev (12/17/2022, 1:56 AM)
    Hello all! I've got a question on Flink StateFun: is it possible to use a cross-account stream as a Kinesis ingress? My YAML looks like this at the moment:
    kind: io.statefun.kinesis.v1/ingress
    spec:
      id: <my_ingress_id>
      awsRegion:
        type: specific
        id: us-east-1
      startupPosition:
        type: latest
      streams:
        - stream: <my_stream_name>
          valueType: <my_value_type>
          targets:
            - <my_ingress_target>
    This works fine, but only for streams located in the same AWS account as the StateFun service (it's on EKS now). Does this YAML have a way of specifying that the ingress should read a cross-account stream using a given cross-account role ARN? And if this is not possible, are there any suggestions or ideas on how to solve the problem? One more question: at the moment I am using `startupPosition: type: latest`, but I'm wondering whether StateFun can generate snapshots periodically and save the current ingress position in them, similar to Flink streaming, so that after a redeployment or a restart due to an error it continues from where it left off instead of starting from "latest". Is that enabled by default? Thank you!
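    On the cross-account part: I have not verified this end to end, but the Kinesis ingress spec accepts clientConfigProperties that are forwarded to the underlying consumer, and the Flink Kinesis consumer understands assume-role credentials, so a speculative sketch might look like this (role ARN is a placeholder):

    ```yaml
    kind: io.statefun.kinesis.v1/ingress
    spec:
      id: <my_ingress_id>
      awsRegion:
        type: specific
        id: us-east-1
      startupPosition:
        type: latest
      clientConfigProperties:
        - aws.credentials.provider: ASSUME_ROLE
        - aws.credentials.provider.role.arn: arn:aws:iam::123456789012:role/cross-account-read
        - aws.credentials.provider.role.sessionName: statefun-kinesis-ingress
      streams:
        - stream: <my_stream_name>
          valueType: <my_value_type>
          targets:
            - <my_ingress_target>
    ```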
  • Ari Huttunen (12/17/2022, 3:51 PM)
    I'm trying to use PyFlink to read data from (Confluent) Kafka. The data is CSV, row by row, and it's compressed. Though Confluent Kafka has a schema registry, this topic does not have a schema; instead, I'm defining the columns in code. I'm having trouble figuring out how to put these things together. Any pointers? Also, what's the general status of PyFlink compared to Java?
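    On the column-definition part: the usual way to declare columns in code without a registry schema is a Table API DDL with the csv format; a sketch with placeholder topic, brokers, and columns (Kafka-level compression is transparent to consumers, so it normally needs no extra configuration):

    ```python
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.execute_sql("""
        CREATE TABLE readings (
            sensor_id STRING,
            reading DOUBLE,
            ts STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'broker:9092',
            'properties.group.id' = 'csv-demo',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'csv',
            'csv.ignore-parse-errors' = 'true'
        )
    """)
    ```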
  • Kerem Ulutaş (12/18/2022, 6:20 PM)
    Hi all, I'm trying to process some data using the DataSet API (Flink 1.14.3). We recently hit a "no space left on device" error, and I remembered a MapReduce setting that enables compression for intermediate results. I searched the Flink configuration documentation but couldn't find anything similar. Can someone point me in the right direction? I am not using MapReduce via Flink; I'm just reading some Parquet files whose schema I have in Avro format, and all transformation afterwards is done with the DataSet API.
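    Two settings that may be relevant, sketched below; whether the first one covers DataSet-style intermediate results in 1.14 is an assumption on my part:

    ```yaml
    # compress data exchanged through blocking (batch) result partitions
    taskmanager.network.blocking-shuffle.compression.enabled: true
    # and/or point spill directories at a volume with more headroom
    io.tmp.dirs: /mnt/large-disk/flink-tmp
    ```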
  • Mohit Jain (12/18/2022, 9:45 PM)
    Is the PyFlink Avro client stable? I am trying to read Avro records from Kafka with PyFlink, following the steps in the docs, and I'm facing the issue below. Version: 1.16.0. Code:
    from pyflink.common import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer
    from pyflink.datastream.formats.avro import AvroRowDeserializationSchema
    
    
    avro_schema = """{"type":"record","name":"tootname","fields":[{"name":"id","type":"long"},{"name":"Uid","type":"long"},{"name":"wid","type":"int"},{"name":"tid","type":"string"},{"name":"gt","type":"string"}],"connect.name":"tootnames"}"""
    
    # initial setup
    env = StreamExecutionEnvironment.get_execution_environment()
    
    env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-connector-kafka-1.16.0.jar")
    env.add_jars("file:///Desktop/Projects/streaming/flink_py_test/flink-sql-avro-1.16.0.jar")
    
    # getting the table env for SQL
    # t_env = StreamTableEnvironment.create(env)
    deserialization_schema = AvroRowDeserializationSchema(
        avro_schema_string=avro_schema
    )
    
    kafka_consumer = FlinkKafkaConsumer(
        topics='topic_name',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '<IP>:<port>', 'group.id': 'test_group_3'}
    )
    
    kafka_consumer.set_start_from_latest()
    source = env.add_source(kafka_consumer)
    
    
    producer = FlinkKafkaProducer(
        topic='flinkSink',
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': '127.0.0.1:9092', 'group.id': 'test_group_3'}
    )
    
    
    
    source.add_sink(producer)
    
    
    print("initiating process")
    env.execute()
    Result
    py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
    : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    .......
    	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
    	at akka.dispatch.OnComplete.internal(Future.scala:300)
    ....................
    	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
    ..............
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at akka.actor.Actor.aroundReceive(Actor.scala:537)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    	... 5 more
    Caused by: java.io.IOException: Failed to deserialize Avro record.
    	at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
    	at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:85)
    	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
    Caused by: java.lang.UnsupportedOperationException: Cannot read strings longer than 2147483639 bytes
    	at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:305)
    	at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
    .................
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    	at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    	at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:164)
    	... 9 more
    Not sure why it failed with `Cannot read strings longer than 2147483639 bytes`. I checked Kafka and also tried to read a message from the topic via different raw Python libraries; the message was no longer than a few KB, but here it says it is trying to read a string larger than 2 GB?
  • jaiprasad (12/19/2022, 6:15 AM)
    Issue: checkpoint failures with EOS (exactly-once semantics) enabled.
    Objective: stream data from a Kafka instance in one AKS cluster to a Kafka instance in another AKS cluster; the two AKS clusters are VNet peered and in the same Azure region.
    Flink version: 1.15; Kafka client version: 2.8; Kafka broker version: 3.2 (Strimzi Kafka).
    DAG: Source (Kafka) -> Sink (Kafka).
    Out of 47 checkpoints, 37 failed with the exception below:
    java.lang.Exception: Could not perform checkpoint 31 for operator Source: KafkaOnPrem_Source -> Transformer-> KafkaCloud_Sink: Writer -> KafkaCloud_Sink: Committer (1/1)#26.
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 31 for operator Source: KafkaOnPrem_Source -> Transformer -> KafkaCloud_Sink: Writer -> KafkaCloud_Sink: Committer (1/1)#26. Failure reason: Checkpoint was declined.
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
    	... 13 more
    Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId
    Current Configuration:
  • René (12/19/2022, 9:39 AM)
    Hi all, when starting the appmanager of the Ververica Platform on OpenShift I got this error:
    Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'deploymentServiceFacade' defined in URL [jar:file:/vvp/app/lib/appmanager-controller-2.8.2-plain.jar!/com/dataartisans/appmanager/controller/core/DeploymentServiceFacade.class]: Unsatisfied dependency expressed through constructor parameter 7; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'artifactStorageChecker' defined in class path resource [com/dataartisans/appmanager/apiserver/config/AppManagerConfig.class]: Unsatisfied dependency expressed through method 'artifactStorageChecker' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.ververica.platform.artifactstorage.ArtifactStorage' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
  • Hygor Knust (12/19/2022, 12:49 PM)
    Hi, I'm trying to use node labels in YARN on EMR to run the JobManager only on CORE nodes and the TaskManagers only on TASK nodes. I have assigned the right node labels to the nodes, and I have this in flink-conf.yaml:
    "yarn.application.node-label": "CORE",
    "yarn.taskmanager.node-label": "TASK",
    
    "jobmanager.memory.process.size": "12g",
    "taskmanager.memory.process.size": "17g"
    I'm using m5.xlarge instances (4c/16g) for the CORE nodes and is4gen.xlarge (4c/24g) for the TASK nodes. But when I try to start the yarn-session, I get an exception:
    The cluster does not have the requested resources for the TaskManagers available!
    It seems like it is trying to allocate the TaskManagers on the CORE nodes, which have only 12g of memory available, and since I set the TaskManager memory to 17g, it fails. My question is: why isn't it using the TASK nodes? And how can I debug this? Would anyone be able to help me with that?
  • Emmanuel Leroy (12/19/2022, 1:38 PM)
    How do I recover from the following?
    2022-12-19 13:16:01,253 o.a.f.k.o.o.d.SessionObserver  [ERROR][flink/flink-session] REST service in session cluster is bad now
    java.util.concurrent.TimeoutException
    	at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    	at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.listJobs(AbstractFlinkService.java:231)
    	at org.apache.flink.kubernetes.operator.observer.deployment.SessionObserver.observeFlinkCluster(SessionObserver.java:48)
    	at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:89)
    	at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:55)
    	at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:56)
    	at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:32)
    	at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:114)
    	at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:55)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
    	at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    	at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
    	at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.base/java.lang.Thread.run(Unknown Source)
  • Sachin Saikrishna Manikandan (12/19/2022, 2:01 PM)
    Hello team, we are frequently getting this exception when deploying Flink via the K8s operator:
    Could not send message [RemoteFencedMessage(874d9a7a2209e291ddae0755843e4a27, RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState)))] from sender [Actor[akka.tcp://flink@10.244.76.132:6122/temp/jobmanager_2$bd]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#1422837652]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
  • Jaume (12/19/2022, 2:48 PM)
    Hi fellows, I've a question regarding how Flink SQL treats events. Imagine we have a `promotion` data source that has `start_date` and `end_date` fields, and we want to get the currently active promotions, meaning `CURRENT_TIME` is between `start_date` and `end_date`. Whenever we run the Flink job it will start reading current values, but rows won't be re-analyzed as time passes and a row starts matching the condition, only when a change appears in our data source. ❓ Is this possible with Flink SQL?
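    For concreteness, the query in question is essentially the sketch below; as described, a plain filter like this is only re-evaluated when a row in promotion changes, not as the clock advances.

    ```sql
    SELECT *
    FROM promotion
    WHERE CURRENT_TIMESTAMP BETWEEN start_date AND end_date;
    ```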
  • Sami Badawi (12/19/2022, 3:25 PM)
    Trying to get PyFlink 1.16 working with a CDC Postgres table source, first standalone and later in cluster mode. I am a little confused by the documentation. Does anybody know of an example project?
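    A minimal sketch of what a PyFlink + postgres-cdc setup can look like, assuming the flink-sql-connector-postgres-cdc jar is available (host, credentials, and table names are placeholders):

    ```python
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # the CDC connector jar has to be on the classpath, e.g. via pipeline.jars
    t_env.get_config().set(
        "pipeline.jars", "file:///path/to/flink-sql-connector-postgres-cdc.jar")

    t_env.execute_sql("""
        CREATE TABLE shipments (
            shipment_id INT,
            order_id INT,
            is_arrived BOOLEAN,
            PRIMARY KEY (shipment_id) NOT ENFORCED
        ) WITH (
            'connector' = 'postgres-cdc',
            'hostname' = 'localhost',
            'port' = '5432',
            'username' = 'postgres',
            'password' = 'postgres',
            'database-name' = 'postgres',
            'schema-name' = 'public',
            'table-name' = 'shipments',
            'slot.name' = 'flink'
        )
    """)
    t_env.execute_sql("SELECT * FROM shipments").print()
    ```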
  • Emmanuel Leroy (12/19/2022, 6:36 PM)
    Hi, I'm working with HybridSources to parse changelogs from various tables and then switch to streaming. I am merging the sources and trying to build a state snapshot at a fixed interval. When streaming only, events arrive more or less in order, and more or less within a window, so I can build the state at a constant interval using event time and a window. When "replaying" the data with the hybrid source, however, it is a lot less trivial: I wrote a custom FileSource to read the files roughly in order, but the various sources have different sizes, so files are read at different speeds and are completely out of sync when merged. What would be a good strategy to replay these logs and ensure that events from different sources belonging to the same window get processed at the same time; some sort of gating of the sources based on event time?
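    One candidate mechanism is watermark alignment, added for FLIP-27 sources in Flink 1.15: sources sharing an alignment group pause reading once they run too far ahead of the group's minimum watermark. A sketch; I have not checked that this binding is exposed in PyFlink, so treat the method name as an assumption:

    ```python
    from pyflink.common import Duration, WatermarkStrategy

    # sources in group 'changelog' stall when more than 30s of event time
    # ahead of the slowest member, checked every second
    strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(10))
        # assumed PyFlink binding of Java's withWatermarkAlignment (1.15+)
        .with_watermark_alignment('changelog', Duration.of_seconds(30),
                                  Duration.of_seconds(1))
    )
    ```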
  • Nathanael England (12/19/2022, 6:56 PM)
    Is it possible to write a custom RabbitMQ deserializer in PyFlink? Looking through the code, it seems like PyFlink doesn't really hold business logic for the connectors; rather, it uses the Java code through the gateway.
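    That reading is correct: the connector's deserializer runs on the Java side. A common workaround, sketched with placeholder connection settings, is to take the payload through a built-in schema and do the real decoding in a Python map:

    ```python
    import json

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.rabbitmq import RMQConnectionConfig, RMQSource

    env = StreamExecutionEnvironment.get_execution_environment()
    # the RabbitMQ connector jar must be added first, e.g. env.add_jars(...)
    conn = RMQConnectionConfig.Builder() \
        .set_host('localhost') \
        .set_port(5672) \
        .set_virtual_host('/') \
        .set_user_name('guest') \
        .set_password('guest') \
        .build()

    raw = env.add_source(RMQSource(conn, 'my-queue', False, SimpleStringSchema()))
    decoded = raw.map(json.loads)  # the custom "deserializer" lives in Python
    decoded.print()
    env.execute()
    ```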
  • Mark Cho (12/19/2022, 10:41 PM)
    Hey folks, in the docs for Flink metrics (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#list-of-all-variables) It mentions:
    Important: For the Batch API, <operator_id> is always equal to <task_id>.
    For the streaming API, when will `operator_id != task_id`? From what I can tell, all my `task_id`s are the same as their `operator_id`s, so I want to understand the distinction between the two IDs.
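    For reference: a task is a chain of operators, so the two ids diverge as soon as chaining fuses several operators into one task; each chained operator keeps its own operator_id while sharing the chain's task_id. A quick way to observe the difference is to break the chains apart (sketch):

    ```python
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # with default chaining, source -> map -> sink can fuse into one task:
    # one task_id, several operator_ids. Disabling chaining gives every
    # operator its own task, so the ids line up again.
    env.disable_operator_chaining()
    ```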
  • Sai Sharath Dandi (12/20/2022, 2:40 AM)
    Hi folks, is there a recommended way to change Flink-generated column names? The generated names are not compatible with Avro naming, and I run into this exception when converting a Schema to an AvroSchema:
    An exception with message [java.lang.RuntimeException: Illegal character in: EXPR$0] was thrown while processing request.
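    EXPR$0 is the name Flink generates for an unaliased expression, so the usual fix is to give every expression an explicit, Avro-safe alias (sketch):

    ```sql
    -- without the aliases the derived columns are named EXPR$0, EXPR$1, ...
    SELECT COUNT(*) AS order_cnt, SUM(price) AS total_price
    FROM orders;
    ```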
  • Amenreet Singh Sodhi (12/20/2022, 6:09 AM)
    Hi all, I am creating the JobManager and TaskManagers using the YAML files described at
    https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/
    and that page only shows the following ways to submit jobs:
    https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#accessing-flink-in-kubernetes
    So I am looking for a way to deploy the job automatically, without needing access to the web UI (in Flink session mode), and which doesn't involve moving to the Flink Kubernetes operator.
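    For a standalone session cluster, the web UI is not the only entry point: the same REST port serves the CLI and REST API, so jobs can be submitted from anything that reaches the JobManager service. A sketch; the service name and jar path are placeholders:

    ```bash
    # point the CLI at the session cluster's REST endpoint and submit
    ./bin/flink run -m flink-jobmanager:8081 /opt/flink/usrlib/my-job.jar

    # or, from outside the cluster, port-forward first
    kubectl port-forward service/flink-jobmanager 8081:8081
    ./bin/flink run -m localhost:8081 /opt/flink/usrlib/my-job.jar
    ```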
  • raghav tandon (12/20/2022, 7:45 AM)
    Hi all, I am trying to use Kafka transactions to achieve EXACTLY_ONCE, and most of the time it runs fine; we have followed all the steps to make sure it works. But randomly we get an exception and the entire job gets stuck because the producer is not able to ack the checkpoint. I'm putting the exception in the thread. Can someone explain what is going on and why this happens?
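    One configuration that commonly bites exactly-once Kafka setups, for reference: the producer-side transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms (15 minutes by default); the Flink Kafka docs call this pairing out explicitly. A hedged sketch of the producer property:

    ```python
    # properties handed to the Kafka sink; the timeout must stay at or below
    # the broker's transaction.max.timeout.ms cap
    producer_config = {
        'bootstrap.servers': 'broker:9092',
        'transaction.timeout.ms': '900000',  # 15 min, matching the broker default cap
    }
    ```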