# kafka
  • waltermcq (12/10/2020, 3:38 PM)

    https://www.youtube.com/watch?v=Y-sqGKsnSHI

  • lutz (01/22/2021, 10:17 PM)
    Does anyone have recommendations for unit testing a consumer? I'm not sure if I'm just googling the wrong things or it's generally advised to use integration tests instead. Can't seem to figure out how to correctly mock a KStream
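For questions like this, the usual answer is that you rarely need to mock a `KStream` at all: the `kafka-streams-test-utils` artifact ships `TopologyTestDriver`, which runs your topology synchronously against in-memory topics, no broker required. A minimal sketch (the uppercase topology here is a placeholder standing in for whatever topology you want to test):

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import java.util.Properties

fun main() {
    // Build the topology under test (here: a trivial uppercase mapper).
    val builder = StreamsBuilder()
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues { v -> v.uppercase() }
        .to("output", Produced.with(Serdes.String(), Serdes.String()))

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // never contacted
    }

    // TopologyTestDriver pushes records through the topology synchronously,
    // with no broker and nothing to mock.
    TopologyTestDriver(builder.build(), props).use { driver ->
        val input = driver.createInputTopic(
            "input", Serdes.String().serializer(), Serdes.String().serializer())
        val output = driver.createOutputTopic(
            "output", Serdes.String().deserializer(), Serdes.String().deserializer())

        input.pipeInput("key", "hello")
        println(output.readValue()) // prints HELLO
    }
}
```

For a plain consumer (not Streams), `MockConsumer` from `kafka-clients` plays a similar role; integration tests with Testcontainers are the usual complement, not a replacement.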
  • tim (01/28/2021, 12:16 PM)
    Hey folks! I'm new to kafka ... wondering if there are any kotlin-recommended libraries out there for getting started or if the current approach is to use the java libraries? 🙏
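The common answer in this thread's era still holds: there is no separate official Kotlin client, and the Java `org.apache.kafka:kafka-clients` library works fine from Kotlin. A minimal producer sketch (broker address and topic name are placeholders):

```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun main() {
    // Plain Java client driven from Kotlin; `apply` keeps the config readable.
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    }
    KafkaProducer<String, String>(props).use { producer ->
        producer.send(ProducerRecord("my-topic", "key", "hello from Kotlin"))
        producer.flush()
    }
}
```

Kotlin wrappers such as `io.github.nomisrev:kotlin-kafka` (coroutine/Flow based) and Kotka Streams (mentioned below) layer on top of the same Java client.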
  • Júlio Santos (08/06/2021, 2:50 PM)
    Guys, I'm having trouble figuring out a way to start Kafka consumers in a Ktor Application. Would you have any tips, please?
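One common pattern (a sketch, not the only way): Ktor's `Application` is a `CoroutineScope`, so a poll loop launched from the module function is cancelled when the server shuts down. Broker address, group id, and topic name below are placeholder values:

```kotlin
import io.ktor.server.application.Application
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.Properties

fun Application.module() {
    // Launched in the Application scope: cancelled automatically on shutdown.
    launch(Dispatchers.IO) {
        val props = Properties().apply {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
            put(ConsumerConfig.GROUP_ID_CONFIG, "my-group")
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer")
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer")
        }
        KafkaConsumer<String, String>(props).use { consumer ->
            consumer.subscribe(listOf("my-topic"))
            while (isActive) {                         // stops when the app stops
                consumer.poll(Duration.ofMillis(500)).forEach { record ->
                    // handle record.value() here
                }
            }
        }
    }
}
```

Note that `KafkaConsumer.poll` blocks, hence `Dispatchers.IO`; a coroutine-native alternative is the `kotlin-kafka` `KafkaReceiver` discussed further down.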
  • Adam S (08/08/2022, 8:58 PM)
    I've released Kotka Streams on Maven Central - it's a real helper if you're using Kafka Streams in a Kotlin application https://kotlinlang.slack.com/archives/C0BJ0GTE2/p1659979064029999
  • João Gabriel Zó (09/30/2022, 12:40 PM)
    I'm trying to set up Schema Registry with Testcontainers but I'm getting `Connection Refused` when running my tests. I'm currently using `http://${schemaRegistryContainer.host}:8081` as the URL. Has anybody run into something like this, or have any ideas on how to fix it?
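A likely cause (an assumption; the thread replies are not in this export): `8081` is the port *inside* the container, and Testcontainers maps it to a random free port on the host, so a hard-coded `:8081` gets connection refused. Build the URL from `container.getMappedPort(8081)` instead, and make sure the container was created with `.withExposedPorts(8081)`:

```kotlin
// Pure helper: assemble the registry URL from host and mapped port.
fun schemaRegistryUrl(host: String, mappedPort: Int) = "http://$host:$mappedPort"

fun main() {
    // In a real test this would be:
    //   schemaRegistryUrl(schemaRegistryContainer.host,
    //                     schemaRegistryContainer.getMappedPort(8081))
    println(schemaRegistryUrl("localhost", 32768))
}
```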
  • Michael Petri (10/21/2022, 12:41 PM)
    Hey ppl ✌️ I'm currently playing around with the KStream/KTable examples from Confluent, and every time I create the KafkaStreams instance with my topology an exception gets thrown:
    [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 7.1.0-ccs
    [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c86722379ab997cc
    [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1666355905650
    [kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.admin.client for adminclient-1 unregistered
    [kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
    [kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
    [kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$ListSize
    	at org.apache.kafka.streams.StreamsConfig.<clinit>(StreamsConfig.java:821)
    	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:738)
    	at MainKt.main(Main.kt:30)
    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$ListSize
    	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
    	... 3 more
    Any idea how to debug this?
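For what it's worth, a `NoClassDefFoundError` on an `org.apache.kafka` class such as `ConfigDef$ListSize` usually points at mixed artifact versions on the classpath: a newer `kafka-streams` resolving against an older `kafka-clients` that predates the class. One way to rule that out is to pin both to the same version (the version number is illustrative; 3.1.0 matches the `7.1.0-ccs` Confluent build visible in the log):

```kotlin
// build.gradle.kts: keep kafka-streams and kafka-clients on the same version.
dependencies {
    implementation("org.apache.kafka:kafka-streams:3.1.0")
    implementation("org.apache.kafka:kafka-clients:3.1.0")
}
```

`./gradlew dependencies` then shows which versions actually got resolved after conflict resolution.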
  • Michael Petri (10/25/2022, 7:08 AM)
    Hello folks, I have a Debezium/Kafka setup where I have to write two topics to the destination database in one transaction, so I started an application that joins both topics. Everything works as expected so far. Now I wonder what the best practice is for writing to the DB:
    1. Write to the DB from the same application?
    2. Write to a new topic and use another app to write to the DB?
    3. Replace the custom join implementation with ksqlDB and just write to the DB in the custom application?
    Any experience? Thanks!
  • Michael Petri (02/13/2023, 12:26 PM)
    Anyone here who has experience with the Kafka list serde? I can't instantiate a new instance for `ArrayList<String>`. Here's my aggregate call:
    .aggregate(
        { ArrayList<String>() },
        { _, event, history ->
            history.add(event)
            history
        },
        Materialized.with(
            Serdes.Long(),
            Serdes.ListSerde(
                ArrayList::class.java,
                Serdes.StringSerde()
            )
        )
    )
    The error is in the screenshot. Maybe someone has an idea 🙏 Thanks!
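The screenshot didn't survive this export, but a common stumbling block with `Serdes.ListSerde` from Kotlin is type inference: `ArrayList::class.java` is typed `Class<ArrayList<*>>`, which doesn't satisfy the factory's `L extends List<Inner>` bound, so the call fails to infer its generics. One workaround, assuming that is the error shown (it relies on an unchecked cast, so treat it as a sketch):

```kotlin
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes

// ArrayList::class.java is Class<ArrayList<*>> in Kotlin; the explicit casts
// pin the generics down so ListSerde's bounds are satisfied.
@Suppress("UNCHECKED_CAST")
fun stringListSerde(): Serde<MutableList<String>> =
    Serdes.ListSerde(
        ArrayList::class.java as Class<MutableList<String>>,
        Serdes.String()
    ) as Serde<MutableList<String>>
```

With this, the aggregate initializer can return `mutableListOf<String>()` and `Materialized.with(Serdes.Long(), stringListSerde())` type-checks against it.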
  • gavvvr (06/26/2024, 1:52 PM)
    Hello, community! 👋🏻 The codebase I'm working on right now uses the `io.github.nomisrev:kotlin-kafka` bindings by @simon.vergauwen instead of the official `org.apache.kafka:kafka-clients` directly. So, instead of a `KafkaConsumer` from the official JVM client, the app uses `io.github.nomisRev.kafka.receiver.KafkaReceiver` for handling Kafka messages. I'd like to verify that the connection between the app's consumer and the Kafka topic is still alive. Is there any way to retrieve such connection-health information without rewriting everything to `org.apache.kafka:kafka-clients`? 🤔 As far as I can see, the Cohort project uses the `last-poll-seconds-ago` metric from `KafkaConsumer`. Is there any API to access that information if the message-handling code is implemented with `io.github.nomisrev:kotlin-kafka`?
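For context, this is roughly what the Cohort-style check does against a plain consumer: it reads the `last-poll-seconds-ago` metric that the Java client exposes via `metrics()`. Whether `kotlin-kafka` surfaces the wrapped consumer (or its metrics) is something to check in its `KafkaReceiver` API; without that, a sketch like this needs a handle to the underlying consumer:

```kotlin
import org.apache.kafka.clients.consumer.Consumer

// Look up the client-side "last-poll-seconds-ago" metric; a large value means
// the poll loop has stalled. Returns null if the metric is not present.
fun lastPollSecondsAgo(consumer: Consumer<*, *>): Double? =
    consumer.metrics().entries
        .firstOrNull { it.key.name() == "last-poll-seconds-ago" }
        ?.value?.metricValue() as? Double
```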
  • Asaf Peleg (03/03/2025, 3:55 PM)
    I had something pretty unusual happen last week that I haven't observed before. A single topic/consumer got backed up due to a parsing error (deserializing an Avro record into a data class POJO). After the change was reverted, things started flowing again, but a lot of the records were processed multiple times (for some reason it seems to be exactly 9?) before being committed. What is confusing is that the records look ACK'd each time (at least I think so) because of a log line that prints right afterwards. I'm hoping someone could provide some clues/ideas to help me better understand the behavior?
    Context:
    • MSK
    • Ktor
    • Kafka lib: `io.github.nomisrev:kotlin-kafka`
    • Avro (via Glue registry)
    • AWS Glue deserializer: `com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer`
    Simplified event loop:
    import io.github.nomisRev.kafka.receiver.KafkaReceiver
    
    suspend fun <T> startKafkaConsumer(consumer: KafkaConsumer<T>) {
      val settings = receiverSettings(consumer.group())
      logger.info { "Starting Kafka consumer ${consumer.group()} for topic ${consumer.topic().topicName}" }
      KafkaReceiver(settings)
        .receive(consumer.topic().topicName)
        .map { record ->
          val recordValue = consumer.parseRecord(record.value())
          logger.info { "[Kafka Consumer] found record" } // <--- I see this log 9x for each record/offset
    
          if (consumer.shouldProcess(recordValue)) {
            try {
              retry(2) {
                consumer.consume(recordValue)
              }
            } catch (e: Exception) {
              logger.atError { "[Kafka Consumer] error" }
              throw e
            }
          }
          record
        }
        .retry {
          logger.atError { "[Kafka Consumer] error" }
          delay(1000)
          true
        }
        .collect {
          it.offset.acknowledge()
          logger.atInfo { "[Kafka Consumer] ack record" } // <--- I see this log 9x for each record/offset
        }
    }
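One plausible contributor, independent of the Kafka specifics: `Flow.retry` resubscribes the whole upstream flow, so every record that was received and mapped before the failure gets re-emitted and re-processed on each retry attempt, until its offset is actually committed (an `acknowledge()` marks the offset, but the commit happens later, so the "ack" log can print for records whose offsets never get committed before the next failure). A minimal pure-flow demonstration of the resubscription behavior, with no Kafka involved:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Flow.retry restarts the upstream flow from scratch, so side effects in
// map {} run again for items already seen before the failure.
fun demo(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    var failedOnce = false
    flowOf(1, 2, 3)
        .map { n ->
            seen += n                      // side effect, like consumer.consume()
            if (n == 3 && !failedOnce) {   // fail once, on the third element
                failedOnce = true
                error("transient failure")
            }
            n
        }
        .retry(1)                          // one retry resubscribes the whole flow
        .collect()
    seen                                   // [1, 2, 3, 1, 2, 3]: 1 and 2 ran twice
}

fun main() = println(demo())
```

Whether this multiplies out to exactly 9 would depend on the receiver's batch boundaries and retry policy, which is a guess from outside the library.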