# troubleshooting
m
Hi all, I have a flink job with source as kafka and sink as cassandra. when i run this flink job on multiple ec2 instances, they all consume the same messages even though they share the same group.id. i am using flink 1.16 and java 11. am i doing something wrong in the code below? what should we do to process a kafka message only once when running the flink job on multiple machines with the same group.id?
Copy code
// imports assumed by this snippet (Flink 1.16):
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// conf, config, ARGOS_LOGGER and MsgPackUnpacker are defined elsewhere in the job
StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);

// checkpoint every 60s with exactly-once semantics, at least 5s apart
flinkEnv.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
flinkEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
flinkEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
//flinkEnv.getConfig().setLatencyTrackingInterval(1000L);
//flinkEnv.getConfig().disableGenericTypes();

// parallelism is configurable via the flink.parallelism property
ARGOS_LOGGER.warn("Default Parallelism =" + StreamExecutionEnvironment.getDefaultLocalParallelism());
if (config.getProperty("flink.parallelism") != null && config.getProperty("flink.parallelism").length() > 0) {
    int parallelism = Integer.parseInt(config.getProperty("flink.parallelism"));
    flinkEnv.setParallelism(parallelism);
}
ARGOS_LOGGER.warn("Final Parallelism =" + flinkEnv.getParallelism());

// add fanflow as a source to Flink
Set<String> topics = new HashSet<>(Arrays.asList(config.getProperty("fanflow.topic").split(",")));
topics.remove("");
ARGOS_LOGGER.error("Topics: " + topics + " GroupId: " + config.getProperty("group.id"));
KafkaSource<String> fanflowSource = KafkaSource.<String>builder()
        .setBootstrapServers(config.getProperty("fanflow.exhibitor"))
        .setTopics(topics.toArray(new String[0]))
        .setGroupId(config.getProperty("group.id"))
        .setValueOnlyDeserializer(new MsgPackUnpacker())
        .setProperty("fetch.message.max.bytes", "5242880")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .build();
d
It seems like you’ve set up your Flink job correctly for processing Kafka messages with exactly-once semantics, and you’re also ensuring that all instances of your job share the same group.id, which is the fundamental step in Kafka to ensure that a message is only processed by one consumer within a consumer group. So far so good.
There could be a few reasons why you might still see duplication:
Consumer offset synchronization: when running your Flink job across multiple EC2 instances, make sure all instances are able to commit their offsets back to Kafka. On newer Kafka versions this happens via Kafka’s internal offset storage (the __consumer_offsets topic); older setups went through ZooKeeper. If there’s any issue with committing these offsets, it can lead to rebalancing problems where each instance thinks it needs to consume all messages.
Checkpointing and restarts: your setup includes checkpointing and a restart strategy. If the job is frequently restarting due to failures before it can commit its offsets, it may re-consume messages. Make sure your checkpoints are completing successfully and that there are no issues causing frequent restarts.
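To illustrate the checkpointing point, here is a minimal sketch (assuming the Flink 1.16 KafkaSource; the topic and group names are hypothetical): the source only commits offsets back to Kafka when a checkpoint completes, and the connector property commit.offsets.on.checkpoint (default true) controls this, so your effective commit frequency is your checkpoint interval.
Copy code
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Sketch only: with checkpointing enabled, the KafkaSource commits its offsets
// to Kafka when a checkpoint completes. Without checkpointing (or with the
// property below set to false), no offsets are ever committed, so external
// lag tooling sees nothing and committedOffsets() has nothing to resume from.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("<bootstrap.servers>")           // placeholder
        .setTopics("example-topic")                           // hypothetical topic
        .setGroupId("example-group")                          // hypothetical group id
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("commit.offsets.on.checkpoint", "true")  // connector default
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .build();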
m
wouldn't flink tasks be idle if parallelism is > the number of kafka partitions?
i have checked the parallelism. we are running this job on 3 ec2 instances with parallelism set to 34, and the number of partitions is 100.
d
Tasks would only sit idle if parallelism exceeded the number of partitions, which isn't your case: with 100 partitions and a parallelism of 34, every task is reading from at least one partition.
An exact 1:1 match between tasks and partitions only matters for squeezing out optimal resource utilization and avoiding idle tasks.
For a production system, what you have is reasonable.
The key is to set parallelism in a way that maximizes resource utilization without overwhelming the system, and your setup aligns with that. Having more partitions than tasks is a common and practical approach: it leaves room for growth and maintains efficiency under most circumstances.
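As a back-of-envelope check of the distribution (just the arithmetic, not Flink's exact assignment algorithm):
Copy code
// Rough spread of 100 partitions over 34 parallel subtasks: Flink assigns
// whole partitions to subtasks, so some subtasks read one more partition
// than others and none sit idle.
public class PartitionSpread {
    public static void main(String[] args) {
        int partitions = 100;
        int parallelism = 34;
        int base = partitions / parallelism;   // 2 partitions each, minimum
        int extra = partitions % parallelism;  // 32 subtasks get a third one
        System.out.printf("%d subtasks x %d partitions, %d subtasks x %d partitions%n",
                extra, base + 1, parallelism - extra, base);
        // prints: 32 subtasks x 3 partitions, 2 subtasks x 2 partitions
    }
}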
m
@D. Draco O'Brien are you a bot?
d
Have you checked that group.id is not modified anywhere else in your code?
m
yes it's not getting modified anywhere.
d
ok ..
Is Zookeeper part of your configuration?
How frequently is your Flink job configured to commit offsets?
m
no, zookeeper is not part of my configuration
How frequently is your Flink job configured to commit offsets?
How to find this?
d
Run the following to check exactly which consumer groups exist on your cluster:
Copy code
./kafka-consumer-groups.sh --bootstrap-server <bootstrap.servers> --list
also share the following info about the group (removing whatever you cannot share)
Copy code
./kafka-consumer-groups.sh --bootstrap-server <bootstrap.servers> --describe --group <your.group.id>
m
i don't have this kafka-consumer-groups.sh file
d
How are you deploying Flink? K8s?
m
on aws ec2 instance using docker images of the java program containing flink
d
ok, that shell script is part of the kafka installation, not Flink
Copy code
docker run -it --rm confluentinc/cp-kafka /bin/bash -c "kafka-consumer-groups --bootstrap-server <kafka-bootstrap-server> --describe --group <your.group.id>"
This is one way to access it if you're using Docker (the Confluent cp-kafka image ships the tool as kafka-consumer-groups, without the .sh suffix).
You can also install the Kafka CLI tools locally.
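Or, since your job is already a Java program, here is a rough programmatic equivalent using Kafka's AdminClient (a sketch only; DescribeGroup is an illustrative name and both placeholders need filling in):
Copy code
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: prints roughly the same state/member/offset info the CLI shows.
public class DescribeGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap.servers>"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            String groupId = "<your.group.id>"; // placeholder
            ConsumerGroupDescription desc = admin
                    .describeConsumerGroups(List.of(groupId))
                    .describedGroups().get(groupId).get();
            System.out.println("state=" + desc.state() + " members=" + desc.members().size());
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata().get();
            committed.forEach((tp, om) ->
                    System.out.println(tp + " committed-offset=" + om.offset()));
        }
    }
}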
I think running this is the next step for you, as it might show enough of what is at issue.
Once you are able to run this command, here is what you should be looking for:
1. Coordinator: ensure there is a valid coordinator assigned to the group. Absence or errors here could indicate issues with group management.
2. State: should be "Stable" for a healthy consumer group. "PreparingRebalance", "CompletingRebalance", or "Dead" states indicate problems.
3. CURRENT-OFFSET and LOG-END-OFFSET: these values show how far behind a consumer is in processing messages. Large differences might suggest processing delays or issues.
4. Members: check the member list for inconsistencies. Each member represents a consumer in your group. Ensure the number of members aligns with your expectations based on Flink parallelism and that there are no duplicate or unexpected members.
5. Assigned partitions: verify that partitions are evenly distributed among members (Flink tasks) without duplication.
Work through these 5 checks and you can diagnose and resolve issues related to consumer groups, checkpointing, and offset management in your Flink-Kafka setup.
Let us know what you see when you run this and I or someone else will probably be able to determine the exact cause. It definitely seems to be in the consumer group or offset management area; you just need more visibility into that.
m
i am seeing state Empty and members 0 for my consumer groups. the lag is not too much
Screenshot 2024-08-09 at 2.00.55 AM.png
d
there seems to be one with a really high number of messages behind …
m
yes, that topic has a really high volume of messages
d
Good that you now have basic visibility into your cluster. That’s essential for troubleshooting your issue.