# troubleshooting
Hey fellow Flinkers, I currently have a Flink job that handles reading from a source (Kafka) and writing output to three separate sinks: Kafka, Elasticsearch, and Postgres. I'm seeing some performance issues where the job will seemingly stall after a period of time. I've surmised that this bottleneck/backpressure is related to Elasticsearch, but I can't quite prove it. I suspect that because the job is having trouble writing to ES, it's failing to commit the offsets for the original messages and thus not progressing to continue reading from Kafka. Is there any way to further rule out any of the other sinks in this scenario? I'm just trying to isolate the problem down to a specific sink so that I can uncork the bottleneck. Happy to share other details about the job if that would be helpful.
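One idea I've been toying with (not sure if it's the right approach) is to temporarily swap a suspect sink for Flink's built-in `DiscardingSink` and see whether the stall/backpressure disappears. A rough sketch, using the same stream and operator names as the pipeline I've pasted below:

```kotlin
import org.apache.flink.streaming.api.functions.sink.DiscardingSink

// Temporarily stand in for the Elasticsearch sink; if the backpressure clears,
// that branch is the culprit. The same swap works for the Postgres or Kafka branches.
incidents
    .process(SanitizeElasticFunction())
    .name("sanitize-incidents-for-elasticsearch")
    .uid("sanitize-incidents-for-elasticsearch")
    .addSink(DiscardingSink())
    .name("discard-instead-of-elasticsearch")
    .uid("discard-instead-of-elasticsearch")
```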
At present the meat of the pipeline looks something like this:
```kotlin
// Route users to Postgres
incidents
    .getSideOutput(Tags.users)
    .map{ user -> PostgresUser(user) }
    .returns(TypeInformation.of(PostgresUser::class.java))
    .name("map-users-to-postgres-users")
    .uid("map-users-to-postgres-users")
    .addSink(PostgresSink.forUser(parameters))
    .uid("write-users-to-postgres")
    .name("write-users-to-postgres")

// Send enriched incidents to Elastic
incidents
    .process(SanitizeElasticFunction())
    .uid("sanitize-incidents-for-elasticsearch")
    .name("sanitize-incidents-for-elasticsearch")
    .addSink(DynamicElasticsearchSink(IncidentElasticsearchRouter(parameters)))
    .name("send-incident-to-elasticsearch")
    .uid("send-incident-to-elasticsearch")


// Send enriched incidents to Kafka
incidents
    .map { x -> x.apply { remove("route") } }
    .returns(TypeInformation.of(JsonObject::class.java))
    .name("remove-route-for-kafka")
    .uid("remove-route-for-kafka")
    .sinkTo(buildKafkaSink(parameters))
    .name("send-enriched-incidents-to-kafka")
    .uid("send-enriched-incidents-to-kafka")
```

In the above case the `DynamicElasticsearchSink` accepts a router that keeps track of several Elasticsearch sinks behind the scenes, since messages can be dynamically routed to different indices (based on the tenant) via something like this:
```kotlin
class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>>(
    /**
     * Defines a router that maps an element to its corresponding ElasticsearchSink instance
     * @param sinkRouter An [ElasticsearchSinkRouter] that takes an element of type [ElementT], a string-based route
     * defined as [RouteT] which is used for caching sinks, and finally the sink itself as [SinkT]
     */
    private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
) : RichSinkFunction<ElementT>(), CheckpointedFunction {

    // Store a reference to all the current routes
    private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()

    /* Omitted for brevity */

    override fun invoke(value: ElementT, context: SinkFunction.Context) {
        val route = sinkRouter.getRoute(value)
        var sink = sinkRoutes[route]
        if (sink == null) {
            // Build a new sink for this key and cache it for later use
            sink = sinkRouter.createSink(route, value)
            sink.runtimeContext = runtimeContext
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(value, context)
    }
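
    // Not part of the original snippet; a sketch of what I'd expect here. Since the sinks
    // are created lazily and cached per route, close() forwards to every cached sink so
    // their bulk processors / connections get flushed and released when the job shuts down.
    override fun close() {
        sinkRoutes.values.forEach { it.close() }
        sinkRoutes.clear()
    }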
}
```

Not sure if anything is glaringly wrong here or if this makes sense, but I'm scratching my head a bit since I can't isolate the bottleneck. Currently attempting to throw DEBUG-level logging onto the job to see if that provides some insight into the internal BulkProcessor calls behind the scenes.
After enabling debugging, I can see it issuing the requests to Elasticsearch, but it seems to stop after some period of time. The stall looks like it aligns with when I receive a message that triggers an API request: if the response comes back invalid (or not ready), the job stores that message in state and waits until the next time it sees a message from that tenant to evict the previously queued messages (assuming the response is good by then). Something like this:
```kotlin
// Get the current configuration (for this tenant)
var someConfigurationInfo = configuration.value()
val queuedMessages = messagesAwaitingConfiguration.get()

// If we don't have configuration, get it
if (someConfigurationInfo == null) {
    // Get the configuration from an endpoint
    someConfigurationInfo = lookupConfigurationFromEndpoint(...)

    // If it's bad, queue the message and just wait until it's good
    if (someConfigurationInfo == null) {
        messagesAwaitingConfiguration.add(message)
    }
    else {
        // Configuration was good, store it (so we don't have to request it every time for this tenant)
        configuration.update(someConfigurationInfo)

        // Evict any previous messages from state
        evictQueuedMessages(someConfigurationInfo, context.currentKey, out)

        // Output the current message
        out.collect(message)
    }
}
else {
    out.collect(message)
}
```

I'm assuming that adding a message to state is enough such that the job will continue processing messages (and committing the offset for this one)?
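For context, the keyed function that snippet lives in is shaped roughly like this. This is a from-memory sketch with placeholder names (`TenantConfiguration`, the class name, and the lookup stub are stand-ins), and I'm assuming Gson's `JsonObject` as elsewhere in the pipeline:

```kotlin
import com.google.gson.JsonObject
import org.apache.flink.api.common.state.ListState
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Placeholder for whatever the per-tenant configuration actually contains
data class TenantConfiguration(val indexName: String)

// Keyed by tenant id (a String here); input/output are the enriched incidents
class EnrichWithConfigurationFunction :
    KeyedProcessFunction<String, JsonObject, JsonObject>() {

    // Per-tenant configuration, cached in keyed state once the lookup succeeds
    private lateinit var configuration: ValueState<TenantConfiguration>

    // Messages that arrived before a valid configuration existed for this tenant
    private lateinit var messagesAwaitingConfiguration: ListState<JsonObject>

    override fun open(parameters: Configuration) {
        configuration = runtimeContext.getState(
            ValueStateDescriptor("tenant-configuration", TenantConfiguration::class.java)
        )
        messagesAwaitingConfiguration = runtimeContext.getListState(
            ListStateDescriptor("messages-awaiting-configuration", JsonObject::class.java)
        )
    }

    override fun processElement(message: JsonObject, ctx: Context, out: Collector<JsonObject>) {
        // Same flow as the snippet above: emit if configured, otherwise buffer in state
        if (configuration.value() != null) {
            out.collect(message)
            return
        }

        val fetched = lookupConfigurationFromEndpoint(ctx.currentKey)
        if (fetched == null) {
            // No (valid) configuration yet: park the message in keyed state
            messagesAwaitingConfiguration.add(message)
        } else {
            configuration.update(fetched)
            // Evict anything queued for this tenant, then emit the current message
            messagesAwaitingConfiguration.get()?.forEach { out.collect(it) }
            messagesAwaitingConfiguration.clear()
            out.collect(message)
        }
    }

    // Stand-in for the real HTTP lookup against the configuration endpoint
    private fun lookupConfigurationFromEndpoint(tenant: String): TenantConfiguration? = null
}
```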
Following up on this: the troublesome sink was in fact Postgres, which had run out of disk space, and eventually the job couldn't connect to it at all (per the TaskManager logs). So, phrasing my question a bit differently: when a checkpoint or a sink is failing, is it possible to tell which sink was lagging and causing the checkpoint to fail? Looking at the job graph, all three of the sinks seem to be merged/grouped under a single operator, so it's not clear which of them is to blame when something goes wrong.
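One thing I'm planning to try so that each sink shows up as its own box in the job graph (instead of one fused operator) is to break the operator chain at the sinks. A rough sketch, where `env` is my StreamExecutionEnvironment:

```kotlin
// Heavier-handed option: disable chaining for the whole job, so every operator is
// shown (and measured) separately. Adds some serialization overhead between operators.
env.disableOperatorChaining()

// Targeted option: break the chain only at a sink, e.g. the Postgres branch, so that
// sink's backpressure and checkpoint acknowledgements are visible on their own.
incidents
    .getSideOutput(Tags.users)
    .map { user -> PostgresUser(user) }
    .returns(TypeInformation.of(PostgresUser::class.java))
    .name("map-users-to-postgres-users")
    .uid("map-users-to-postgres-users")
    .addSink(PostgresSink.forUser(parameters))
    .uid("write-users-to-postgres")
    .name("write-users-to-postgres")
    .disableChaining()
```

I think that would at least let me see, per sink, which subtasks are backpressured and which ones are slow to acknowledge checkpoints.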