Rion Williams
05/30/2023, 9:22 PM
Rion Williams
05/30/2023, 9:27 PM
// Route users to Postgres
incidents
    .getSideOutput(Tags.users)
    .map { user -> PostgresUser(user) }
    .returns(TypeInformation.of(PostgresUser::class.java))
    .name("map-users-to-postgres-users")
    .uid("map-users-to-postgres-users")
    .addSink(PostgresSink.forUser(parameters))
    .uid("write-users-to-postgres")
    .name("write-users-to-postgres")

// Send enriched incidents to Elastic
incidents
    .process(SanitizeElasticFunction())
    .uid("sanitize-incidents-for-elasticsearch")
    .name("sanitize-incidents-for-elasticsearch")
    .addSink(DynamicElasticsearchSink(IncidentElasticsearchRouter(parameters)))
    .name("send-incident-to-elasticsearch")
    .uid("send-incident-to-elasticsearch")

// Send enriched incidents to Kafka
incidents
    .map { x -> x.apply { remove("route") } }
    .returns(TypeInformation.of(JsonObject::class.java))
    .name("remove-route-for-kafka")
    .uid("remove-route-for-kafka")
    .sinkTo(buildKafkaSink(parameters))
    .name("send-enriched-incidents-to-kafka")
    .uid("send-enriched-incidents-to-kafka")
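For completeness, buildKafkaSink is just a thin wrapper around the newer KafkaSink builder. Roughly something like this sketch (it assumes parameters is a ParameterTool, the property names are placeholders, and JsonObject is whatever type the incidents stream already carries):
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaSink

// Rough sketch only: serializes each enriched incident as its JSON string
fun buildKafkaSink(parameters: ParameterTool): KafkaSink<JsonObject> =
    KafkaSink.builder<JsonObject>()
        .setBootstrapServers(parameters.get("kafka.bootstrap.servers"))
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder<JsonObject>()
                .setTopic(parameters.get("kafka.topic"))
                .setValueSerializationSchema(
                    SerializationSchema<JsonObject> { incident -> incident.toString().toByteArray(Charsets.UTF_8) }
                )
                .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()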
Rion Williams
05/30/2023, 9:29 PM
DynamicElasticsearchSink accepts a router that keeps track of several Elasticsearch sinks behind the scenes, since messages can be dynamically routed to different indices (based on the tenant) via something like this:
class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>>(
    /**
     * A router that maps an element to its corresponding Elasticsearch sink instance.
     * @param sinkRouter An [ElasticsearchSinkRouter] that takes an element of type [ElementT], a string-based route
     * defined as [RouteT] (which is used for caching sinks), and finally the sink itself as [SinkT]
     */
    private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
) : RichSinkFunction<ElementT>(), CheckpointedFunction {

    // Store a reference to all of the current routes (and their cached sinks)
    private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()

    /* Omitted for brevity */

    override fun invoke(value: ElementT, context: SinkFunction.Context) {
        val route = sinkRouter.getRoute(value)
        var sink = sinkRoutes[route]
        if (sink == null) {
            // Build a new sink for this route and cache it for later use
            sink = sinkRouter.createSink(route, value)
            sink.runtimeContext = runtimeContext
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(value, context)
    }
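The router contract itself is basically just two methods, inferred here from the getRoute/createSink calls above (a sketch, so the exact signatures in the real code may differ slightly):
import java.io.Serializable
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase

// Sketch of the router interface implied by the usage above; Serializable so it can
// ship with the sink function when the job graph is serialized
interface ElasticsearchSinkRouter<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>> : Serializable {
    /** Resolve the route (e.g. a tenant-specific index) for an incoming element */
    fun getRoute(value: ElementT): RouteT

    /** Build the underlying Elasticsearch sink the first time a given route is seen */
    fun createSink(cacheKey: RouteT, value: ElementT): SinkT
}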
Rion Williams
05/30/2023, 9:41 PM
Rion Williams
05/30/2023, 10:00 PM
// Get the current configuration (for this tenant)
var someConfigurationInfo = configuration.value()
val queuedMessages = messagesAwaitingConfiguration.get()

// If we don't have configuration, get it
if (someConfigurationInfo == null) {
    // Get the configuration from an endpoint
    someConfigurationInfo = lookupConfigurationFromEndpoint(...)

    // If it's bad, queue the message and just wait until it's good
    if (someConfigurationInfo == null) {
        messagesAwaitingConfiguration.add(currentMessage)
    } else {
        // Configuration was good, store it (so we don't have to request it every time for this tenant)
        configuration.update(someConfigurationInfo)
        // Evict any previously queued messages from state
        evictQueuedMessages(someConfigurationInfo, context.currentKey, out)
        // Output the current message
        out.collect(currentMessage)
    }
} else {
    out.collect(currentMessage)
}
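For context, that snippet runs inside a KeyedProcessFunction that's presumably keyed by tenant, with state along these lines (a rough sketch; ConfigurationInfo and the descriptor names are placeholders, not the actual job code):
import org.apache.flink.api.common.state.ListState
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration

// Fields inside the keyed process function that runs the snippet above

// Per-tenant configuration, cached after the first successful lookup
private lateinit var configuration: ValueState<ConfigurationInfo>
// Messages parked until configuration for this tenant becomes available
private lateinit var messagesAwaitingConfiguration: ListState<JsonObject>

override fun open(parameters: Configuration) {
    configuration = runtimeContext.getState(
        ValueStateDescriptor("tenant-configuration", ConfigurationInfo::class.java)
    )
    messagesAwaitingConfiguration = runtimeContext.getListState(
        ListStateDescriptor("messages-awaiting-configuration", JsonObject::class.java)
    )
}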
I’m assuming that adding a message to state is enough such that the job will continue processing messages (and committing the offset for this one)?
Rion Williams
05/31/2023, 4:44 PM