# troubleshooting
m
I'm using Apache Flink 1.15 with RocksDB state backend. Is the Compaction filter strategy enabled by default?
d
No, in Flink 1.15 it’s not on by default; you need to apply a RocksDBOptionsFactory to the RocksDBStateBackend
```java
import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class CustomRocksDBOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Database-wide RocksDB options can be adjusted here if needed
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Each Flink state lives in its own column family; a custom
        // compaction filter factory would be set on currentOptions here
        return currentOptions;
    }
}
```
It’s applied like this:
```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

// Inside your Flink job setup
RocksDBStateBackend rocksDBBackend = new RocksDBStateBackend(
    "file:///path/to/checkpoint_directory", // your checkpoint directory URI
    true                                    // enable incremental checkpoints
);

// Apply the custom RocksDB configuration
rocksDBBackend.setRocksDBOptions(new CustomRocksDBOptionsFactory());
```
m
I'm using AWS's Managed Apache Flink, so I think I can't configure RocksDB in code
Instead, can I ask the AWS support team to configure the property `state.backend.rocksdb.ttl.compaction.filter.enabled`?
d
hmm. that might be a question for AWS team. They might be applying it already or have another way to apply it
I would think they have some way to activate this. But with Flink 1.15 I believe it requires a custom implementation
m
Why is it more difficult to activate in Flink 1.15 than in Flink 1.8? 🤯
d
It’s not necessarily more difficult; it has to do with the requirement for a custom compaction filter. There may be other ways in 1.15 to achieve your objectives.
You can use StateTtlConfig to set some things
Starting with Flink 1.11 you can also use a more aggressive periodic cleanup strategy vs. the default lazy cleanup
Also if you use incremental checkpoints (assuming this is an option for you), expired state is not included in new checkpoint snapshots
That will reduce the size of the checkpoint data and thus the cleanup effort
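The StateTtlConfig route can be sketched roughly like this (the state name, the one-hour TTL, and the compaction-filter cleanup interval are illustrative, not taken from your job):
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Illustrative TTL of one hour; expired entries are never returned and are
// additionally dropped during RocksDB compaction (filter checks the time
// every 1000 processed entries)
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)
    .build();

ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("my-state", String.class); // hypothetical state name
descriptor.enableTimeToLive(ttlConfig);
```
As far as I can tell, in 1.15 the `cleanupInRocksdbCompactFilter` strategy uses the TTL compaction filter built into the Flink-bundled FRocksDB, so no custom filter code is needed for plain TTL expiry.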
m
mmm no. I use incremental checkpoints but the checkpoint size is always growing
d
well, are you also using periodic cleanup at the same time?
Make sure TTL is correctly set for all relevant state descriptors. And you are enabling background periodic cleanup
```
StateTtlConfig#enableCleanupInBackground(true)
```
Beyond that make sure that all TTL state is configured correctly for all relevant state descriptors.
You should use Flink metrics to monitor `state.backend.rocksdb.total-delayed-state-size` and also `num-delayed-keys`
It can also be indirectly helpful to optimize RocksDB config like `write-buffer-size` and `max-write-buffer-number`
Look into any long-lived records that are exceeding their TTL and not getting expired
This could be an issue with timestamps (the type of timestamp used) or with how the watermark strategy is implemented
There are many possibilities for what causes the buildup. You need to monitor and systematically eliminate the possibilities. Let us know what you find. I am quite sure that you do not depend on the compaction filter to achieve your goals, although it's a viable option if you can configure it on AWS.
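For the monitoring side, Flink can expose RocksDB's native metrics through its metric system; they are off by default and are switched on per metric in `flink-conf.yaml`. A minimal sketch using option keys I believe exist in Flink 1.15 (pick whichever metrics you care about):
```yaml
# Enable a few RocksDB native metrics (reported per column family, i.e. per state)
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.total-sst-files-size: true
state.backend.rocksdb.metrics.num-running-compactions: true
```
Watching `total-sst-files-size` per state over time should tell you which state is actually growing.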
m
I'm not currently using a periodic cleanup strategy
thanks for the help!
The method enableCleanupInBackground doesn't exist; disableCleanupInBackground exists instead
d
oh ok. I wonder if that means it's on by default. Not sure.
m
The answer is here
d
Yes, so since 1.10 it appears it's on by default. I would say then you need to look closer at what's happening with the state. Log at debug level and set the TTL on some state. Take snapshots before and after to see if it's being cleaned up
I think we should conclude that the TTL config on some state descriptors is probably incorrect, or that there is an issue with timestamps/watermarks. Enable debug logs to take a closer look
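Enabling debug logs for the relevant classes could look like this in a log4j2 `log4j.properties` (the logger names are my assumption about which Flink packages emit the state/TTL logs):
```properties
# Raise verbosity for Flink's TTL state support and the RocksDB state backend
logger.ttl.name = org.apache.flink.runtime.state.ttl
logger.ttl.level = DEBUG
logger.rocksdb.name = org.apache.flink.contrib.streaming.state
logger.rocksdb.level = DEBUG
```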
m
What log should I see when a cleanup is done?
d
you should see TTL initialization logs like “Starting cleanup of expired state”
“cleaned up x bytes of state for key Y in descriptor Z”. Total cleaned up state xMB etc
With debug you might see more granular as well
It’s also important to watch the progression of watermarks to make sure thats working correctly.
This is a good use of time, as watermarks are a fundamental aspect of Flink stream processing. You want to know how to verify that your watermarks are implemented correctly
“Observed Watermark X for Stream Y” etc
For ending you might see “Finish State cleanup task” or similar.
m
I'm using ProcessingTime, not watermarks
d
hmm …
That could likely be the problem right there
It's difficult to get ProcessingTime to work correctly and deal with delays etc.