Hi everyone! I'm facing some problems converting an ...
# troubleshooting
l
Hi everyone! I'm facing some problems converting an object to JSON:
Copy code
package spendreport

import com.google.gson.Gson
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.streaming.api.datastream.DataStream

/**
 * Flink streaming job that consumes JSON strings from the Kafka topic `input` and parses each
 * one into an [Article].
 *
 * A Kafka sink for the `output` topic is also built, but the code that serializes the articles
 * back to JSON and attaches the sink is currently commented out.
 */
object FraudDetectionJob {
    // One shared instance instead of a new Gson per record. It lives on the singleton object, so
    // the map lambdas reach it statically rather than capturing it (Gson is not Serializable).
    private val gson = Gson()

    /** Returns true when the `RUNNING_IN_CONTAINER` environment variable is set to "true". */
    private fun inContainer(): Boolean = System.getenv("RUNNING_IN_CONTAINER")?.toBoolean() ?: false

    /** Record shape parsed from each incoming JSON message. */
    data class Article(
        val id: Int, val stage1: Int, val stage2: Int, val stage3: Int, val stage4: Int, val stage5: Int
    )

    /**
     * Builds the Kafka source, the JSON-to-[Article] mapping and the Kafka sink, then runs the job.
     *
     * @param args command-line arguments (unused).
     * @throws Exception if the Flink job fails to build or execute.
     */
    @Throws(Exception::class)
    @JvmStatic
    fun main(args: Array<String>) {
        val brokers: String = if (inContainer()) "kafka:9092" else "localhost:9094"

        // Configurations
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.disableOperatorChaining()
        val source = KafkaSource.builder<String>()
            .setBootstrapServers(brokers)
            .setTopics("input")
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(SimpleStringSchema())
            .build()

        // Flink cannot extract the output type of a map written as a Kotlin lambda, and the
        // failure only surfaces once a downstream operator is attached. Declaring the type with
        // `returns(...)` fixes that and makes the former `as DataStream<Article>` cast unnecessary.
        val kafkaStream: DataStream<Article> = env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
            .map { gson.fromJson(it, Article::class.java) }
            .returns(Article::class.java)

        val sink = KafkaSink.builder<String>()
            .setBootstrapServers(brokers)
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder<String>()
                    .setTopic("output")
                    .setValueSerializationSchema(SimpleStringSchema())
                    .build()
            )
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build()

        // Disabled: serialize each Article back to JSON and write it to the "output" topic.
        // Like the map above, this one needs an explicit `returns(...)` before `sinkTo(...)`.
//        val stage = kafkaStream.map {
//            gson.toJson(it)
//        }.returns(String::class.java)
//
//        stage.sinkTo(sink)

        env.execute("Flink Kafka Consumer and Producer Job with Backpressure")
    }
}
When I uncommented the lines shown below (the `stage` mapping and `stage.sinkTo(sink)`), the code stopped working. Does anybody have any tips on how to solve this?
Copy code
// Converts each element of kafkaStream to a JSON string with Gson and attaches the Kafka sink.
// NOTE(review): Flink probably cannot infer the output type of this Kotlin lambda; an explicit
// `.returns(String::class.java)` after the map is likely required — confirm against the stack trace.
val stage = kafkaStream.map {
            Gson().toJson(it)
        }

        stage.sinkTo(sink)