Yaroslav Bezruchenko
08/21/2024, 12:05 PM
org.rocksdb.RocksDBException: WriteBatch has wrong count
    at org.rocksdb.RocksDB.put(Native Method)
    at org.rocksdb.RocksDB.put(RocksDB.java:955)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState.put(RocksDBMapState.java:139)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    .........
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
We are using:
Flink 1.19.0
Flink Kubernetes Operator 1.9.0
Java 17.0.12+7, temurin
As for dependencies for RocksDB:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>1.19.0</version>
</dependency>
I can't reproduce this locally. Any ideas what could cause it?
Yaroslav Bezruchenko
08/21/2024, 12:07 PM
import java.util.Collection;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class NoBlockCacheRocksDbOptionsFactory implements ConfigurableRocksDBOptionsFactory {
    @Override
    public RocksDBOptionsFactory configure(ReadableConfig readableConfig) {
        // No configurable settings; the factory is used as-is.
        return this;
    }

    @Override
    public DBOptions createDBOptions(DBOptions dbOptions, Collection<AutoCloseable> collection) {
        // DB-level options are left unchanged.
        return dbOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions columnFamilyOptions,
                                                   Collection<AutoCloseable> collection) {
        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
        blockBasedTableConfig.setNoBlockCache(true);
        // Needed in order to disable the block cache.
        blockBasedTableConfig.setCacheIndexAndFilterBlocks(false);
        blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(false);
        blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(false);
        columnFamilyOptions.setTableFormatConfig(blockBasedTableConfig);
        return columnFamilyOptions;
    }
}
Yaroslav Bezruchenko
08/21/2024, 12:09 PM
state.backend.incremental: "true"
state.backend.rocksdb.checkpoint.transfer.thread.num: "1"
state.backend.rocksdb.memory.managed: "false"
state.backend.rocksdb.options-factory: NoBlockCacheRocksDbOptionsFactory
Yaroslav Bezruchenko
08/21/2024, 12:19 PM