mohan tummala
08/08/2024, 6:26 PMStreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);
flinkEnv.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
flinkEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
flinkEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
//flinkEnv.getConfig().setLatencyTrackingInterval(1000L);
//flinkEnv.getConfig().disableGenericTypes();
ARGOS_LOGGER.warn("Default Parallelism =" + StreamExecutionEnvironment.getDefaultLocalParallelism());
if (config.getProperty("flink.parallelism") != null && config.getProperty("flink.parallelism").length() > 0) {
int parallelism = Integer.parseInt(config.getProperty("flink.parallelism"));
flinkEnv.setParallelism(parallelism);
}
ARGOS_LOGGER.warn("Final Parallelism =" + flinkEnv.getParallelism());
// add fanflow as a source to Flink
Set<String> topics = new HashSet<>(Arrays.asList(config.getProperty("fanflow.topic").split(",")));
topics.remove("");
ARGOS_LOGGER.error("Topics: " + topics + " GroupId: " + config.getProperty("group.id"));
KafkaSource<String> fanflowSource = KafkaSource.<String>builder()
.setBootstrapServers(config.getProperty("fanflow.exhibitor"))
.setTopics(topics.toArray(new String[0])).setGroupId(config.getProperty("group.id"))
.setValueOnlyDeserializer(new MsgPackUnpacker())
.setProperty("fetch.message.max.bytes", "5242880")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).build();
D. Draco O'Brien
08/08/2024, 6:43 PMD. Draco O'Brien
08/08/2024, 6:44 PMD. Draco O'Brien
08/08/2024, 6:47 PMmohan tummala
08/08/2024, 6:48 PMmohan tummala
08/08/2024, 6:54 PMD. Draco O'Brien
08/08/2024, 6:58 PMD. Draco O'Brien
08/08/2024, 6:59 PMD. Draco O'Brien
08/08/2024, 6:59 PMD. Draco O'Brien
08/08/2024, 7:01 PMmohan tummala
08/08/2024, 7:03 PMD. Draco O'Brien
08/08/2024, 7:14 PMmohan tummala
08/08/2024, 7:14 PMD. Draco O'Brien
08/08/2024, 7:18 PMD. Draco O'Brien
08/08/2024, 7:21 PMD. Draco O'Brien
08/08/2024, 7:24 PMmohan tummala
08/08/2024, 7:32 PMmohan tummala
08/08/2024, 7:32 PMHow frequently is your Flink job configured to commit offsets?How to find this?
D. Draco O'Brien
08/08/2024, 7:35 PM./kafka-consumer-groups.sh --bootstrap-server <bootstrap.servers> --list
D. Draco O'Brien
08/08/2024, 7:36 PM./kafka-consumer-groups.sh --bootstrap-server <bootstrap.servers> --describe --group <your.group.id>
mohan tummala
08/08/2024, 7:39 PMD. Draco O'Brien
08/08/2024, 7:39 PMmohan tummala
08/08/2024, 7:40 PMD. Draco O'Brien
08/08/2024, 7:43 PMD. Draco O'Brien
08/08/2024, 7:43 PMkubectl run -it --rm --image confluentinc/cp-kafka kafka-client -- /bin/bash -c "kafka-consumer-groups.sh --bootstrap-server <kafka-bootstrap-server> --describe --group <your.group.id>"
D. Draco O'Brien
08/08/2024, 7:43 PMD. Draco O'Brien
08/08/2024, 7:44 PMD. Draco O'Brien
08/08/2024, 7:44 PMD. Draco O'Brien
08/08/2024, 7:45 PMD. Draco O'Brien
08/08/2024, 7:52 PMD. Draco O'Brien
08/08/2024, 7:54 PMD. Draco O'Brien
08/08/2024, 7:54 PMD. Draco O'Brien
08/08/2024, 7:55 PMmohan tummala
08/08/2024, 8:30 PMmohan tummala
08/08/2024, 8:31 PMD. Draco O'Brien
08/08/2024, 8:49 PMmohan tummala
08/08/2024, 8:50 PMD. Draco O'Brien
08/08/2024, 10:55 PM