# troubleshooting
  • Reme Ajayi (01/11/2023, 1:53 AM)

    Hi folks, does anyone know how to read from Confluent Cloud Kafka topics with PyFlink? I can't get it to work with the Table API, and PyFlink's DataStream API does not support the `ConfluentAvroDeserializationSchema` that these reads require (it is supported in Java).
  • Sumit Nekar (01/11/2023, 10:36 AM)

    Hello team, I am upgrading the Flink Kubernetes operator from 1.2 to 1.3 and am getting the following error:

    Exception occurred while acquiring lock 'LeaseLock: flink-operator - flink-operator-lease (flink-kubernetes-operator-5cb64599dc-zk7rb)'
    io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.70.197.1/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. leases.coordination.k8s.io "flink-operator-lease" is forbidden: User "system:serviceaccount:flink-operator:flink-operator" cannot get resource "leases" in API group "coordination.k8s.io" in the namespace "flink-operator".

    Do we need to provide any more permissions for the service account as part of this upgrade?
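    For context: operator 1.3 uses Kubernetes leases for leader election, and the error above is a plain RBAC denial on the lease resource. A minimal sketch of the missing permissions, assuming the namespace and service account names shown in the error (the operator's Helm chart normally manages this for you):

    # Hypothetical RBAC sketch: grant the flink-operator service account
    # access to the Lease objects used for operator leader election.
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: flink-operator-lease-role
      namespace: flink-operator
    rules:
      - apiGroups: ["coordination.k8s.io"]
        resources: ["leases"]
        verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: flink-operator-lease-rolebinding
      namespace: flink-operator
    subjects:
      - kind: ServiceAccount
        name: flink-operator
        namespace: flink-operator
    roleRef:
      kind: Role
      name: flink-operator-lease-role
      apiGroup: rbac.authorization.k8s.io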
  • clen.moras (01/11/2023, 11:28 AM)

    Anyone else facing this issue?
  • Tri Tam Hoang (01/11/2023, 3:41 PM)

    Hi guys, AWS recently added support for flink-1.15.2 on KDA. Is there any way to retrieve runtime info about the underlying machine (JDK version, etc.)? Thanks.
  • Michael Parrott (01/11/2023, 3:45 PM)

    👋 Hey team, I'm trying to create a KeyedCoProcessFunction with generic type parameters. I want to store some ValueState with these generic types as well, and I also want to restrict which types can be passed into the generic function. However, to store the generic type in state I need to use a context bound (as described here), which prevents me from using a type bound on my function. How can I do what I'm describing?
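    One possible answer: Scala lets a type parameter carry both an upper type bound and a context bound, so the type restriction and the implicit TypeInformation can coexist. A minimal sketch, assuming Scala and a hypothetical MyEvent base trait:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
    import org.apache.flink.util.Collector

    // Hypothetical base trait used to restrict the allowed element types.
    trait MyEvent { def key: String }

    // T gets both an upper bound (T <: MyEvent) and a context bound
    // (T : TypeInformation), so a state descriptor can be created for it.
    class MyFunction[T <: MyEvent : TypeInformation]
        extends KeyedCoProcessFunction[String, T, T, T] {

      @transient private var state: ValueState[T] = _

      override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
        // implicitly[TypeInformation[T]] is supplied by the context bound.
        state = getRuntimeContext.getState(
          new ValueStateDescriptor[T]("last-seen", implicitly[TypeInformation[T]]))
      }

      override def processElement1(value: T, ctx: KeyedCoProcessFunction[String, T, T, T]#Context,
                                   out: Collector[T]): Unit = {
        state.update(value)
        out.collect(value)
      }

      override def processElement2(value: T, ctx: KeyedCoProcessFunction[String, T, T, T]#Context,
                                   out: Collector[T]): Unit = out.collect(value)
    }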
  • Yang LI (01/11/2023, 4:04 PM)

    Hello guys, I have tried building Flink from source, following this document: https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/ and I see this message. Do you know how to fix it? 🙏
  • Lily Liu (01/11/2023, 5:10 PM)

    Hi team, I am getting this error:

    org.apache.flink.table.api.TableException: unexpected correlate variable $cor0 in the plan

    The SQL example I am following is https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join There seems to be a JIRA ticket for this error already: https://issues.apache.org/jira/browse/FLINK-23159 However, I can't be sure it is the same issue. Is there a workaround?
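    For reference, the lookup-join shape from that docs page looks like the sketch below (hypothetical table names; the right-hand table must come from a lookup-capable connector such as JDBC, joined FOR SYSTEM_TIME AS OF the left table's processing-time attribute):

    -- Minimal lookup-join sketch: Orders has a processing-time attribute
    -- proc_time, and Customers is a JDBC-backed lookup table.
    SELECT o.order_id, o.total, c.country, c.zip
    FROM Orders AS o
    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
      ON o.customer_id = c.id;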
  • Vinay Agarwal (01/11/2023, 5:12 PM)

    👋 Hello, team! I'm looking to improve Avro deserialization performance when creating a `DataStream<MyObject>` from Avro stored in S3. The code used is:

    Path path = new Path(s3Path);
    AvroInputFormat<MyObject> avroInputFormat = new AvroInputFormat<>(path, MyObject.class);
    return env.createInput(avroInputFormat);

    The `s3Path` is a folder containing Avro data with a total size of approximately 1400 GB and ~150 million records. `MyObject` is a deeply nested object. The performance difference between the above and the following is about 1000x (the following is 1000x faster):

    Path path = new Path(s3Path);
    TextInputFormat textInputFormat = new TextInputFormat(path);
    return env.createInput(textInputFormat);

    In my Flink job, when using Avro deserialization, I repeatedly get the following message:

    Switching to Random IO seek policy

    I'd appreciate any suggestions.
  • xiaohe lan (01/11/2023, 5:45 PM)

    Hi team, I just started playing with the Flink playground project, following this guide: https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/flink-operations-playground/ Everything looks to be running fine except `operations-playground-client-1`:

    docker-compose ps
    NAME                                           COMMAND                  SERVICE                STATUS              PORTS
    operations-playground-clickevent-generator-1   "/docker-entrypoint.…"   clickevent-generator   running             8081/tcp
    operations-playground-client-1                 "/docker-entrypoint.…"   client                 exited (1)
    operations-playground-jobmanager-1             "/docker-entrypoint.…"   jobmanager             running             0.0.0.0:8081->8081/tcp, :::8081->8081/tcp
    operations-playground-kafka-1                  "start-kafka.sh"         kafka                  running             0.0.0.0:9092->9092/tcp, 0.0.0.0:9094->9094/tcp, :::9092->9092/tcp, :::9094->9094/tcp
    operations-playground-taskmanager-1            "/docker-entrypoint.…"   taskmanager            running             8081/tcp
    operations-playground-zookeeper-1              "/bin/sh -c '/usr/sb…"   zookeeper              running             3888/tcp
    When I check the exceptions I see the following:
    org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    	... 4 more
    Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:337)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:245)
    	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:511)
    	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:317)
    	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
    	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
    	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
    	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
    	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
    	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
    	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    	... 4 more
    Caused by: java.io.IOException: Failed to create directory for shared state: file:/tmp/flink-checkpoints-directory/84da55bc3e4f0429bf874b80c498b94c/shared
    	at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.initializeBaseLocationsForCheckpoint(FsCheckpointStorageAccess.java:117)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:333)
    	... 18 more
    I do have the directory created already on the host:

    ls /tmp/flink*
    /tmp/flink-checkpoints-directory:

    /tmp/flink-savepoints-directory:

    Any idea what is going on here?
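    Note the failing path in the trace is inside the jobmanager container, not on the host, so the host directory alone is not enough: it has to be mounted into (and be writable by) the containers. An illustrative excerpt of the kind of volume mounts the playground's docker-compose.yaml relies on (a sketch, not copied from the repo):

    # Illustrative sketch: checkpoint/savepoint paths shared between host
    # and the jobmanager/taskmanager containers.
    services:
      jobmanager:
        volumes:
          - /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
          - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
      taskmanager:
        volumes:
          - /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
          - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory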
  • Colin Williams (01/11/2023, 5:57 PM)

    I'm interested in doing a "lookup join" or "enrichment join" against a "changelog stream" read by "upsert-kafka", and I am wondering if this is possible with the Table API. I found https://github.com/fhueske/flink-sql-demo#enrichment-join-against-temporal-table but when I read https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/ it says temporal table functions do not work on changelog inputs. It appears, then, that I specifically can't perform the "enrichment join". Is there a technique I am missing? Or is there a reason, which I am missing, why this join should not be performed?
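    Worth noting: the restriction applies to temporal table functions; the SQL temporal join syntax can target a versioned (changelog) table that declares a primary key and a watermark. A sketch with hypothetical table names:

    -- Hypothetical event-time temporal join against an upsert-kafka
    -- changelog table; currency_rates needs a PRIMARY KEY and a watermark.
    SELECT o.order_id, o.price * r.conversion_rate AS converted_price
    FROM orders AS o
    JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
      ON o.currency = r.currency;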
  • Niels Berglund (01/11/2023, 6:04 PM)
    Hi there! Niels here - I just introduced myself over at #introductions. I am very new at
  • Niels Berglund (01/11/2023, 6:07 PM)

    Hi all! Niels here, I just introduced myself over in #introductions. I am trying to learn Flink by running it locally in Docker. The problem I am running into is creating a SQL Gateway image from Flink 1.16. Does anyone have any pointers? Thanks! Niels
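    A minimal sketch of such an image, assuming the official flink:1.16 base image and the sql-gateway.sh script it ships with (the gateway's REST endpoint defaults to port 8083):

    # Hypothetical Dockerfile: run the SQL Gateway from the official image.
    FROM flink:1.16.0-scala_2.12
    EXPOSE 8083
    # Start the gateway in the foreground so it becomes the container process.
    CMD ["bin/sql-gateway.sh", "start-foreground", "-Dsql-gateway.endpoint.rest.address=0.0.0.0"]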
  • Yatrik Mehta (01/12/2023, 12:05 AM)

    Hi all, I am using Flink with multiple jobs deployed via k8s, running each job as a different pod. Curious how we can add the jobId to the logs so they can be differentiated when viewed in an aggregated log?
  • Lee xu (01/12/2023, 2:13 AM)

    Flink 1.16.0 no longer supports Hive 1.x, but ours is still in use. How do I maintain compatibility?
  • toe (01/12/2023, 3:45 AM)

    I am debugging Flink 1.16.0 with ZooKeeper 3.4.14, but I get this error. Can anybody give me a little help? Thanks.

    org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /flink/application_1648557683773_1801
  • Tri Tam Hoang (01/12/2023, 5:03 AM)

    Hi all, my team and I are migrating our AWS KDA app to the recently supported flink-1.15.2, and we are facing some random runtime exceptions that can't be reproduced in my local environment. They happen in our Scala code at calls to List.map or Seq.map. The exception is:

    java.util.NoSuchElementException: head of empty list
        at scala.collection.immutable.Nil$.head(List.scala:469)
        at scala.collection.immutable.Nil$.head(List.scala:466)
        at scala.collection.immutable.List.map(List.scala:293)

    Similar exceptions happen when we use a case class as a data record between streaming tasks: the record fails to be serialized, as serialization invokes code similar to List.map(). We suspect the Scala implementation at List.scala:297 (tail ne Nil) was not executed correctly, but we can't investigate further. Have you ever faced such errors in the latest KDA? Please let me know. Thanks in advance. Note: we are using Scala 2.12.15.
  • ding king (01/12/2023, 7:51 AM)

    Hi all, I am trying the Flink table store and have hit some trouble for which I want a solution. I am trying to parse a log file and write it to the table store. Here is the main code:

    // ... read kafka source and decode as a POJO class stream
    SingleOutputStreamOperator<PushLogRow> rows = input.process(new InputProcessFunction());
    // get table
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    And the failure:

    153524,761 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer (2/8)#0 (50437457f6a1c5bb432e75244c783e5e_8d96fc510e75de3baf03ef7367db7d42_1_0) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 1 for operator Writer (2/8)#0.
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
    	... (same stack as in the follow-up message below)
    Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException
    	at org.apache.flink.table.store.connector.sink.StoreWriteOperator.prepareCommit(StoreWriteOperator.java:192)
    	at org.apache.flink.table.store.connector.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:59)
  • ding king (01/12/2023, 8:19 AM)

    Hi all, I am trying the Flink table store, which I want to use as a production solution, but I have run into the same trouble. Here is my main code:

    // ... decode input as a POJO DataStream
    SingleOutputStreamOperator<PushLogRow> rows = input.process(new InputProcessFunction());

    // ... convert to table
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table inputTable = tableEnv.fromDataStream(rows);
    inputTable.printSchema();
    // write to table store
    FlinkCatalog catalog = FlinkCatalogFactory.createCatalog("flink-table", tableCfg);
    tableEnv.registerCatalog("flink-table", catalog);
    tableEnv.useCatalog("flink-table");
    tableEnv.useDatabase("default");
    inputTable.executeInsert("t_push_log");
    The first problem was that the DataStream column type did not match the table's for the Map field. I found a way to resolve it: add an annotation to my POJO class:
    @TypeInfo(FlinkTypeInfoFactory.class)
    public class PushLogRow {

        private Map<String, String> ex;

        @JsonAnySetter
        public void setEx(String key, Object value) {
            if (null == ex) {
                ex = new HashMap<>();
            }
            ex.put(key, String.valueOf(value));
        }
    }

    public class FlinkTypeInfoFactory<T> extends TypeInfoFactory<T> {

        @Override
        public TypeInformation<T> createTypeInfo(Type t, Map<String, TypeInformation<?>> map) {
            Map<String, TypeInformation<?>> typeInfo = new HashMap<>();
            Class<T> clazz = (Class<T>) t;
            FieldUtils.getAllFieldsList(clazz).forEach(field -> {
                String key = field.getName();
                Class<?> type = field.getType();
                ...
                } else if (type == Map.class) {
                    typeInfo.put(key, Types.MAP(Types.STRING, Types.STRING));
                }
            });
            return Types.POJO(clazz, typeInfo);
        }
    }
    The DataStream table schema then printed as I wanted, Map<String, String> instead of RAW(java.util.Map, ...), but the job could not perform a checkpoint, failing with the error below:
    13:57:39,072 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Writer (3/8)#0 (a18ee02afc5d80bf846f1cfbc8863239_8d96fc510e75de3baf03ef7367db7d42_2_0) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 1 for operator Writer (3/8)#0.
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
    	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
    	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.io.IOException: java.lang.IndexOutOfBoundsException
    	at org.apache.flink.table.store.connector.sink.StoreWriteOperator.prepareCommit(StoreWriteOperator.java:192)
    	at org.apache.flink.table.store.connector.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:59)
    	at org.apache.flink.table.store.connector.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:47)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:334)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
    	... 22 more
    Caused by: java.lang.IndexOutOfBoundsException
    	at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:375)
    	at org.apache.flink.table.data.binary.BinaryStringData.compareTo(BinaryStringData.java:178)
    	at MemTableComparator$1775.compare(Unknown Source)
    	at org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable.compareRecords(BinaryIndexedSortable.java:184)
    	at org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable.compare(BinaryIndexedSortable.java:176)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sortInternal(QuickSort.java:136)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sortInternal(QuickSort.java:321)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sortInternal(QuickSort.java:315)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sortInternal(QuickSort.java:315)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sortInternal(QuickSort.java:321)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sortInternal(QuickSort.java:321)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sortInternal(QuickSort.java:315)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sort(QuickSort.java:66)
    	at org.apache.flink.runtime.operators.sort.QuickSort.sort(QuickSort.java:81)
    	at org.apache.flink.table.store.file.mergetree.SortBufferMemTable.mergeIterator(SortBufferMemTable.java:110)
    	at org.apache.flink.table.store.file.mergetree.MergeTreeWriter.flushMemory(MergeTreeWriter.java:160)
    	at org.apache.flink.table.store.file.mergetree.MergeTreeWriter.prepareCommit(MergeTreeWriter.java:184)
    	at org.apache.flink.table.store.table.sink.AbstractTableWrite.prepareCommit(AbstractTableWrite.java:96)
    	at org.apache.flink.table.store.connector.sink.StoreWriteOperator.prepareCommit(StoreWriteOperator.java:188)
    When I drop the Map field it works well. My environment: flink-1.16.0-bin-scala_2.12 with flink-table-store-dist-0.2.1.jar. Here is what I would like help with: 1. Is my approach correct and effective, or is there a better way? 2. Is this problem a bug, and is there another solution or not? Thanks for any reply; I will be extremely grateful.
  • Roman Bohdan (01/12/2023, 8:49 AM)

    Hello, can you please answer my question: is it possible to delete keyed state by key?
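    Keyed state is scoped to the key currently in context, so it can be deleted for one key by calling clear() while processing an element (or a timer) for that key. A minimal sketch, assuming a hypothetical ValueState inside a KeyedProcessFunction:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Minimal sketch: state can only be cleared for the currently active key,
    // i.e. while an element (or timer) for that key is being processed.
    public class ClearingFunction extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            if ("DELETE".equals(value)) {
                counter.clear(); // removes the state entry for the current key only
            } else {
                Long current = counter.value();
                counter.update(current == null ? 1L : current + 1L);
            }
            out.collect(value);
        }
    }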
  • Suriya Krishna Mariappan (01/12/2023, 10:25 AM)

    Hey guys, we were using the Flink Kubernetes operator v1.3.0, and now we cannot find the download link in the Apache downloads directory. It has been updated to 1.3.1. Just wondering why the 1.3.0 version was removed from the directory. Do we keep incrementing the version every time it is updated? What is the recommended way to do this? Thanks in advance for any help provided.
  • Karthi Thyagarajan (01/12/2023, 1:47 PM)

    Hello, I was wondering if there's a way to do a `rebalance` (or equivalent) immediately downstream of a source operator that is unbalanced (meaning it's generating data that is severely unevenly distributed across task slots) in a Table API app. Thanks!
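    One possible approach, sketched under the assumption of a StreamTableEnvironment and a hypothetical source table name: round-trip through the DataStream API, since the Table API itself exposes no explicit rebalance.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    // Minimal sketch: break source skew by hopping into the DataStream API,
    // forcing a round-robin redistribution, and returning to the Table API.
    public class RebalanceSketch {
        public static Table rebalance(StreamTableEnvironment tableEnv, Table skewed) {
            DataStream<Row> stream = tableEnv.toDataStream(skewed);
            return tableEnv.fromDataStream(stream.rebalance());
        }

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            Table source = tableEnv.from("my_source_table"); // hypothetical table
            Table balanced = rebalance(tableEnv, source);
            // continue building the pipeline on `balanced`...
        }
    }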
  • mgu (01/12/2023, 2:00 PM)

    KeyedBroadcastProcessFunction or CoProcessFunction: what is the difference, and when should each be used when joining streams?
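    Roughly: a KeyedBroadcastProcessFunction joins a keyed stream with a low-volume stream broadcast to every parallel instance (rules, config), while a CoProcessFunction connects two streams element by element. A sketch of the two wiring patterns, with placeholder String streams:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class ConnectPatterns {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> events = env.fromElements("a", "b");
            DataStream<String> control = env.fromElements("rule-1");

            // Pattern 1: keyed main stream + broadcast control stream. Every
            // parallel instance sees every control element via broadcast state.
            MapStateDescriptor<String, String> rulesDesc =
                    new MapStateDescriptor<>("rules", String.class, String.class);
            BroadcastStream<String> broadcast = control.broadcast(rulesDesc);
            events.keyBy(s -> s)
                  .connect(broadcast)
                  .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
                      @Override
                      public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
                          out.collect(value);
                      }
                      @Override
                      public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                          ctx.getBroadcastState(rulesDesc).put(rule, rule);
                      }
                  });

            // Pattern 2: two streams connected pairwise (keyed so state is per key).
            events.keyBy(s -> s)
                  .connect(control.keyBy(s -> s))
                  .process(new CoProcessFunction<String, String, String>() {
                      @Override
                      public void processElement1(String v, Context ctx, Collector<String> out) { out.collect(v); }
                      @Override
                      public void processElement2(String v, Context ctx, Collector<String> out) { out.collect(v); }
                  });

            env.execute("connect-patterns");
        }
    }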
  • Amol Khare (01/12/2023, 4:24 PM)

    Hi team, I'm missing out on some fundamental aspect. I am just adding a `filter` on a Kinesis stream and then a `sink` to a JDBC database:

    kinesisStream.filter(event -> EventType.AGENT_START == event.getType())
            .addSink(insertAssetSink);

    No data gets saved to the DB. If I remove the `filter` operation, data gets saved. Has it got something to do with whether I'm processing all data from Kinesis or not?
  • Amir Halatzi (01/12/2023, 6:05 PM)

    Hey all 🙂 I'm trying to query a Kafka source and emit an aggregated value every 10 seconds using SQL, and I keep getting an `Initial Segment may not be null` error. I'm using Flink 1.14.5. Here's a simplified version of the query:

    SELECT
      window_start, window_end,
      COUNT(`timestamp`) AS `total`
    FROM TABLE(TUMBLE(TABLE tbl, DESCRIPTOR(`timestamp`), INTERVAL '5' SECOND))
    GROUP BY
      window_start, window_end

    The only reference I found was an open issue on the mailing list, and I'm not sure it's related. Can you please help?
  • Sai Sharath Dandi (01/12/2023, 7:51 PM)

    Continuation thread to https://apache-flink.slack.com/archives/C03G7LJTS2G/p1671671687114309. I'm unable to use ROW() to construct a nested type when the input param is a nested/array type. Example:

    SELECT CAST(ROW(person.name, person.age) AS ROW< new_name string, new_age int >) AS new_person
    FROM (VALUES
        (CAST(ROW('Bob', 10) AS ROW< name string, age int >)),
        (CAST(ROW('Alice', 20) AS ROW< name string, age int >))) AS PersonTable(person);

    >
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, column 23.
    Was expecting one of:
        ")" ...
        "," ...

    I added a comment to this similar ticket: https://issues.apache.org/jira/browse/FLINK-9161. Please let me know if anyone knows a way to make this work.
  • Krish Narukulla (01/12/2023, 10:29 PM)

    We are using the flink-protobuf connector, version 1.16, with the Table API. We need to create a record from the combination of a column from a table and a string literal:

    SELECT row(user_id, 'some-name') FROM `user`
    Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "\'some-name\'" at line 1, column 24.
    Was expecting one of:
        <BRACKET_QUOTED_IDENTIFIER> ...
        <QUOTED_IDENTIFIER> ...
        <BACK_QUOTED_IDENTIFIER> ...
        <HYPHENATED_IDENTIFIER> ...
        <IDENTIFIER> ...
        <UNICODE_QUOTED_IDENTIFIER> ...
        
            at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:483)
            at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:246)
            at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
            at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
            at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
            ... 4 more
  • Nathanael England (01/13/2023, 1:28 AM)

    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#broadcast-state says broadcast state is not supported in the Python API, but https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L602 makes me believe that it is. Is this a case of the docs being outdated, or is there something more I'm not understanding?
  • Daniel Craig (01/13/2023, 4:11 AM)

    I'm using Flink 1.6 with Kinesis Data Analytics and I'm seeing the following error message quite often; is anyone aware of how to resolve this?
    {
      "locationInformation": "org.slf4j.helpers.MarkerIgnoringBase.warn(MarkerIgnoringBase.java:131)",
      "logger": "akka.remote.transport.netty.NettyTransport",
      "message": "Remote connection failed with javax.net.ssl.SSLException: Cannot kickstart, the connection is broken or closed",
      "threadName": "flink-akka.actor.default-dispatcher-567",
      "applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxxxxx:application/xxxxx",
      "applicationVersionId": "55",
      "messageSchemaVersion": "1",
      "messageType": "WARN"
    }
  • raghav tandon (01/13/2023, 5:38 AM)

    I am facing a Kafka source imbalance issue. Can someone describe the strategy, or point to the code, by which Flink decides how to map a topic's partitions to subtasks? I am consuming from 2 topics, and both have different in-rates. Below is what Flink is doing:

    Topic 1 - 10 partitions
    Topic 2 - 5 partitions

    Question: why is subtask 4 not consuming anything from Topic 1?
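    For reference, the Kafka connector's default assignment derives a per-topic start index from the topic name's hash and then assigns partitions round-robin from there, so two topics rarely line up on the same subtasks. A sketch of the formula as I read it in KafkaSourceEnumerator.getSplitOwner (and the legacy KafkaTopicPartitionAssigner); treat it as an assumption and verify against your connector version:

    // Sketch of the default partition-to-subtask mapping: a hash-derived
    // start index per topic, then round-robin assignment of its partitions.
    public class KafkaAssignmentSketch {
        static int subtaskFor(String topic, int partitionId, int parallelism) {
            int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
            return (startIndex + partitionId) % parallelism;
        }

        public static void main(String[] args) {
            // Print the assignment for a hypothetical 10-partition topic at parallelism 5.
            for (int p = 0; p < 10; p++) {
                System.out.println("topic1-" + p + " -> subtask " + subtaskFor("topic1", p, 5));
            }
        }
    }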
  • chankyeong won (01/13/2023, 9:19 AM)

    I'm having trouble running my Flink test job locally in IntelliJ IDEA. When using RocksDB as the state backend, this error occurs:
    java.lang.NoSuchFieldError: NONE
    	at org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.<clinit>(RocksDBConfigurableOptions.java:138)
    	at org.apache.flink.contrib.streaming.state.PredefinedOptions$1.<init>(PredefinedOptions.java:84)
    	at org.apache.flink.contrib.streaming.state.PredefinedOptions.<clinit>(PredefinedOptions.java:79)
    	at org.apache.flink.contrib.streaming.state.RocksDBOptions.<clinit>(RocksDBOptions.java:80)
    	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.<init>(EmbeddedRocksDBStateBackend.java:223)
        ...
    This is because my project's flink-statebackend-rocksdb:1.16.0 resolves against other packages (Hudi and Spark also have RocksDB dependencies). I notice the Hudi & Spark RocksDB dependency is `org.rocksdb:rocksdbjni:6.20.3`, but flink-statebackend-rocksdb:1.16.0 has `com.ververica:frocksdbjni:6.20.3-ververica-1.0` underneath, and there is no enum field "NONE" in `org.rocksdb:rocksdbjni:6.20.3`! How can I fix it?
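    Since frocksdbjni is a fork that reuses the org.rocksdb package names, whichever jar the classloader sees first wins. One common fix is to exclude the clashing rocksdbjni so Flink's frocksdbjni is the only copy on the classpath. A sketch, assuming Maven; the Hudi artifact name here is hypothetical, so adapt it to whatever actually pulls in rocksdbjni (check with mvn dependency:tree):

    <!-- Hypothetical sketch: keep Flink's frocksdbjni and drop the clashing
         rocksdbjni pulled in transitively (repeat per offending dependency). -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle</artifactId> <!-- hypothetical artifact -->
        <version>${hudi.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.rocksdb</groupId>
                <artifactId>rocksdbjni</artifactId>
            </exclusion>
        </exclusions>
    </dependency>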