lutz
01/22/2021, 10:17 PM

tim
01/28/2021, 12:16 PM

Júlio Santos
08/06/2021, 2:50 PM

Adam S
08/08/2022, 8:58 PM

João Gabriel Zó
09/30/2022, 12:40 PM
Connection Refused
and running my tests.
I'm currently using http://${schemaRegistryContainer.host}:8081 as the URL.
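A guess, since the container setup isn't shown: Testcontainers publishes each exposed container port on a random free host port, so connecting to a hard-coded host port 8081 commonly fails with Connection Refused. The fragment below assumes `schemaRegistryContainer` is an already-started Testcontainers container created with `.withExposedPorts(8081)`; the fix is to ask the container for the mapped port instead of hard-coding it:

```kotlin
// Assumption: schemaRegistryContainer is a started Testcontainers GenericContainer
// configured with .withExposedPorts(8081). Testcontainers maps container port 8081
// to a random host port, so a fixed :8081 on the host is usually refused.
val schemaRegistryUrl =
    "http://${schemaRegistryContainer.host}:${schemaRegistryContainer.getMappedPort(8081)}"
```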
Has anybody run into something like this, or any ideas on how to fix it?
Michael Petri
10/21/2022, 12:41 PM
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 7.1.0-ccs
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c86722379ab997cc
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1666355905650
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.admin.client for adminclient-1 unregistered
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$ListSize
at org.apache.kafka.streams.StreamsConfig.<clinit>(StreamsConfig.java:821)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:738)
at MainKt.main(Main.kt:30)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$ListSize
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
... 3 more
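Not certain without seeing the build, but a NoClassDefFoundError for ConfigDef$ListSize almost always means the org.apache.kafka:kafka-clients jar on the classpath is older than the one this kafka-streams version was compiled against (ListSize is a validator class that older client releases don't have). A sketch of pinning both artifacts to one version in a Gradle Kotlin DSL build; the version number is illustrative:

```kotlin
// build.gradle.kts (sketch): keep kafka-clients and kafka-streams on the exact
// same version. Mixing versions is what produces NoClassDefFoundError at runtime
// for classes such as ConfigDef$ListSize that only newer clients ship.
val kafkaVersion = "3.3.1" // illustrative; pick one version and use it for both

dependencies {
    implementation("org.apache.kafka:kafka-clients:$kafkaVersion")
    implementation("org.apache.kafka:kafka-streams:$kafkaVersion")
}
```

If a transitive dependency drags in an older kafka-clients, `./gradlew dependencyInsight --dependency kafka-clients` shows which version actually wins.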
Any idea how to debug this?
Michael Petri
10/25/2022, 7:08 AM

Michael Petri
02/13/2023, 12:26 PM
ArrayList<String>
Here is my aggregate call:
.aggregate(
    { ArrayList<String>() },
    { _, event, history ->
        history.add(event)
        history
    },
    Materialized.with(
        Serdes.Long(),
        Serdes.ListSerde(
            ArrayList::class.java,
            Serdes.StringSerde()
        )
    )
)
The error is in the attached screenshot.
Maybe someone has an idea 🙏
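Without the screenshot this is only a guess, but a frequent stumbling point with Serdes.ListSerde is that the list serde has to match the aggregate's value type end to end (state store, changelog, and any repartition topic from the grouping step). A minimal sketch of the same aggregation with the serde pulled out and the grouping serdes made explicit; the stream name `events` and its `KStream<Long, String>` type are assumptions:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Materialized

// Sketch, assuming events: KStream<Long, String>. Serdes.ListSerde pairs the concrete
// list class with the inner serde; passing the same serde via Materialized covers the
// state store and its changelog, while Grouped covers any repartition topic.
val historySerde = Serdes.ListSerde(ArrayList::class.java, Serdes.String())

events
    .groupByKey(Grouped.with(Serdes.Long(), Serdes.String()))
    .aggregate(
        { ArrayList<String>() },                                // initializer
        { _, event, history -> history.apply { add(event) } },  // adder
        Materialized.with(Serdes.Long(), historySerde)
    )
```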
Thanks!

gavvvr
06/26/2024, 1:52 PM
The app uses the io.github.nomisrev:kotlin-kafka bindings by @simon.vergauwen instead of using the official org.apache.kafka:kafka-clients directly.
So, instead of a KafkaConsumer from the official JVM client, the app uses io.github.nomisRev.kafka.receiver.KafkaReceiver for handling Kafka messages.
I'd like to verify that the connection between the app's consumer and the Kafka topic is still alive. Is there any way to retrieve such connection-health information without rewriting everything to org.apache.kafka:kafka-clients? 🤔
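One hedged option that avoids any rewrite: the KafkaConsumer that kotlin-kafka wraps internally still registers its usual JMX metrics under the kafka.consumer domain (assuming the default JMX reporter hasn't been disabled), so liveness metrics can be read from the platform MBean server inside the same JVM. A sketch; the client-id is whatever your receiver settings configure:

```kotlin
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Reads a consumer metric (e.g. "last-poll-seconds-ago") from the JMX MBeans the
// underlying KafkaConsumer registers. Returns null when no matching MBean exists,
// e.g. when no consumer is running in this JVM.
fun consumerMetric(attribute: String, clientId: String = "*"): Double? {
    val server = ManagementFactory.getPlatformMBeanServer()
    val pattern = ObjectName("kafka.consumer:type=consumer-metrics,client-id=$clientId")
    return server.queryNames(pattern, null)
        .firstOrNull()
        ?.let { server.getAttribute(it, attribute) as? Double }
}

fun main() {
    // With no consumer running in this JVM there is nothing to read:
    println(consumerMetric("last-poll-seconds-ago")) // -> null
}
```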
As far as I can see, the Cohort project uses the last-poll-seconds-ago metric from KafkaConsumer. Is there any API to access that information when the message-handling code is implemented with io.github.nomisrev:kotlin-kafka?

Asaf Peleg
03/03/2025, 3:55 PM
• io.github.nomisrev:kotlin-kafka
• Avro (via Glue registry)
• AWS Glue deserializer com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer
Simplified event loop:
import io.github.nomisRev.kafka.receiver.KafkaReceiver

suspend fun <T> startKafkaConsumer(consumer: KafkaConsumer<T>) {
    val settings = receiverSettings(consumer.group())
    logger.info { "Starting Kafka consumer ${consumer.group()} for topic ${consumer.topic().topicName}" }
    KafkaReceiver(settings)
        .receive(consumer.topic().topicName)
        .map { record ->
            val recordValue = consumer.parseRecord(record.value())
            logger.info { "[Kafka Consumer] found record" } // <--- I see this log 9x for each record/offset
            if (consumer.shouldProcess(recordValue)) {
                try {
                    retry(2) {
                        consumer.consume(recordValue)
                    }
                } catch (e: Exception) {
                    logger.atError { "[Kafka Consumer] error" }
                    throw e
                }
            }
            record
        }
        .retry {
            logger.atError { "[Kafka Consumer] error" }
            delay(1000)
            true
        }
        .collect {
            it.offset.acknowledge()
            logger.atInfo { "[Kafka Consumer] ack record" } // <--- I see this log 9x for each record/offset
        }
}
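A possible explanation for the 9x logs (an inference from the code, not from kotlin-kafka internals): Flow.retry re-collects the upstream flow from the beginning after every failure, and on re-subscription the receiver re-delivers records whose offsets were never acknowledged, so every earlier record passes through the map block again on each retry. The dependency-free sketch below models that "cold source is re-run on retry" behaviour with a plain lambda standing in for the flow:

```kotlin
// Models Flow.retry semantics: each retry runs the cold source again from its
// first emit, so elements seen before the failure are observed repeatedly.
fun collectWithRetry(retries: Int, source: (emit: (Int) -> Unit) -> Unit): List<Int> {
    val seen = mutableListOf<Int>()
    var remaining = retries
    while (true) {
        try {
            source { seen += it } // cold: every attempt restarts from the first emit
            return seen
        } catch (e: Exception) {
            if (remaining-- <= 0) throw e
        }
    }
}

fun main() {
    var attempts = 0
    val seen = collectWithRetry(retries = 2) { emit ->
        emit(1)
        emit(2)
        if (attempts++ < 2) error("boom") // the first two passes fail after emitting 1 and 2
        emit(3)
    }
    println(seen) // [1, 2, 1, 2, 1, 2, 3]: 1 and 2 re-observed on every retry
}
```

kotlinx.coroutines documents the same semantics for Flow.retry: it retries collection of the given flow, which for a cold flow means restarting it from the start. That would also explain the matching 9x "ack record" logs, since re-delivered records are acknowledged again in collect.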