Lucas Martins Soares
09/18/2023, 4:36 PMpackage spendreport
import com.google.gson.Gson
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.streaming.api.datastream.DataStream
object FraudDetectionJob {
private fun inContainer(): Boolean {
return System.getenv("RUNNING_IN_CONTAINER")?.toBoolean() ?: false
}
data class Article(
val id: Int, val stage1: Int, val stage2: Int, val stage3: Int, val stage4: Int, val stage5: Int
)
@Throws(Exception::class)
@JvmStatic
fun main(args: Array<String>) {
val brokers: String = if (inContainer()) "kafka:9092" else "localhost:9094"
// Configurations
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.disableOperatorChaining()
val source =
KafkaSource.builder<String>().setBootstrapServers(brokers).setTopics("input").setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(SimpleStringSchema())
.build()
val kafkaStream = env.fromSource(
source, WatermarkStrategy.noWatermarks(), "Kafka Source"
).map {
Gson().fromJson(it, Article::class.java)
} as DataStream<Article>
val sink = KafkaSink.builder<String>().setBootstrapServers(brokers).setRecordSerializer(
KafkaRecordSerializationSchema.builder<String>().setTopic("output")
.setValueSerializationSchema(SimpleStringSchema()).build()
).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build()
// val stage = kafkaStream.map {
// Gson().toJson(it)
// }
//
// stage.sinkTo(sink)
env.execute("Flink Kafka Consumer and Producer Job with Backpressure")
}
}
When I uncommented the lines the code stopped working. Does anybody have any tips on how to solve this?
val stage = kafkaStream.map {
Gson().toJson(it)
}
stage.sinkTo(sink)