# coroutines
  • h

    Hong Phuc

    09/22/2025, 2:15 AM
    Hi guys, can someone comment on how accurate this comment is https://www.reddit.com/r/Kotlin/comments/1kko2s7/comment/mrytehj/?utm_source=share&u[…]m=web3x&utm_name=web3xcss&utm_term=1&utm_content=share_button? The commenter explained why they wouldn't use Ktor/coroutines. One of the issues they mentioned is that the debugger follows the carrier thread rather than the coroutine, which I have experienced before. Another is that parameters get marked as "optimised out", and another is that the code becomes "colored" by coroutines. I can't confirm the other issues that were mentioned, so I'm looking for everyone's opinion. Thanks in advance.
  • r

    Rob Elliot

    09/23/2025, 11:17 AM
    This is probably a stupid question, but is there any way to wrap up this:
    sealed interface FooResult
    data object FooCancelled : FooResult
    data class FooSuccess(val value: String) : FooResult // `value` is a placeholder; a data class needs at least one property
    
    suspend fun foo(): FooResult = TODO()
    
    val deferred = async { foo() }
    deferred.cancel()
    val result = deferred.await()
    so that the last call will never throw `CancellationException`, instead returning `FooCancelled`? Without a try / catch (or equivalent) around `deferred.await()`?
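One way this could be wrapped, as a sketch rather than an established API: `join()` does not throw when the awaited job itself was cancelled, so a helper can wait with `join()` and then inspect `isCancelled`. The helper name `awaitOrCancelled` and the `value` property are made up for illustration. Note that `isCancelled` is also true when the coroutine failed with an exception, so in this sketch failures would be reported as `FooCancelled` too.

```kotlin
import kotlinx.coroutines.*

sealed interface FooResult
data object FooCancelled : FooResult
data class FooSuccess(val value: String) : FooResult // `value` is a placeholder property

// Hypothetical helper: waits for completion without rethrowing the deferred's own cancellation.
suspend fun Deferred<FooResult>.awaitOrCancelled(): FooResult {
    join() // resumes normally even if this deferred was cancelled
    return if (isCancelled) FooCancelled else await()
}

fun main() = runBlocking {
    val deferred = async<FooResult> { delay(1_000); FooSuccess("done") }
    deferred.cancel()
    println(deferred.awaitOrCancelled()) // prints "FooCancelled"
}
```

If the calling coroutine is cancelled while waiting, `join()` still throws, which is usually what you want.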
  • t

    Tian Tian098

    09/24/2025, 3:52 AM
    Is it possible to have a MutableStateFlow that continuously produces values? I would like to zip it with another flow as I don't want the resulting flow to be limited by how often my MutableStateFlow gets mutated.
    val foo = MutableStateFlow(0)
    val bar = flow { ... }
    foo.zip(bar) { ... } // I don't want the resulting flow to be limited by how often foo is updated.
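A sketch of one option, assuming the goal is to pair every value of `bar` with whatever `foo` currently holds: skip `zip` and read `foo.value` inside a `map` over `bar`. `combine` is the other common choice, but it also re-emits whenever `foo` changes. The function name `withLatestFoo` is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Pairs each element of the receiver with the value `foo` holds when that element arrives.
fun <A, B> Flow<B>.withLatestFoo(foo: StateFlow<A>): Flow<Pair<A, B>> =
    map { b -> foo.value to b }

fun main() = runBlocking {
    val foo = MutableStateFlow(0)
    val bar = flowOf("a", "b", "c")
    // foo is never updated here, yet every element of bar still comes through.
    println(bar.withLatestFoo(foo).toList()) // prints [(0, a), (0, b), (0, c)]
}
```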
    solved 1
  • j

    Jonas

    09/24/2025, 12:48 PM
    Hey everyone, when I'm converting a callback-based API to a `Flow` using `callbackFlow`, do I have to call `close()` after `trySend(...)` when the callback only emits once?
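A minimal sketch of the situation, assuming a made-up one-shot API (`OneShotApi` and `ResultCallback` are hypothetical). Because `awaitClose` suspends until the channel is closed or the collector cancels, a terminal operator such as `toList()` would not return here without the `close()` call. For a single value, `suspendCancellableCoroutine` is often a simpler fit than a flow.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical one-shot callback API, only here to make the example runnable.
fun interface ResultCallback { fun onResult(value: String) }
class OneShotApi {
    fun request(callback: ResultCallback) = callback.onResult("done")
}

fun OneShotApi.resultAsFlow(): Flow<String> = callbackFlow {
    request { value ->
        trySend(value)
        close() // without this, awaitClose below keeps the flow open after the single value
    }
    awaitClose { /* unregister the callback here if the API supports it */ }
}

fun main() = runBlocking {
    println(OneShotApi().resultAsFlow().toList()) // prints [done]
}
```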
  • j

    Julius Babies

    09/27/2025, 7:20 PM
    Don't know if this is the right channel, but does something like an async task queue exist where I provide a coroutine scope in which callbacks can be scheduled to be executed sequentially or simultaneously?
  • a

    Artyom Gornostayev

    09/29/2025, 10:03 AM
    Hi guys. Does anybody have experience with creating observability for active coroutines at runtime (PROD)? I need to know how many active coroutines are in the thread pool. I see there is a `kotlinx-coroutines-debug` library, which meets my requirements. However, I'm not sure about the performance impact, or whether it is suitable for native builds (according to the docs, not really)... I would be very grateful for advice. :)
  • c

    CLOVIS

    10/01/2025, 12:51 PM
    Is it possible to disable the automatic dump of coroutines at the end of tests when debug probes are enabled? I'd like to control the exact time at which it happens.
  • r

    reactormonk

    10/09/2025, 12:36 PM
    How would I encode interruptible code vs. non-interruptible code? Aka ask some Job nicely to cancel, but the job itself can say no, or await until an interruptible section has been reached.
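A sketch of how this is commonly expressed, assuming `NonCancellable` covers the "job can say no for a while" part: cancellation is cooperative, so wrapping a section in `withContext(NonCancellable)` makes it run to completion, and the pending cancellation takes effect at the next cancellable suspension point outside it. The function name `cancelDuringCriticalSection` is hypothetical.

```kotlin
import kotlinx.coroutines.*

suspend fun cancelDuringCriticalSection(): List<String> = coroutineScope {
    val log = mutableListOf<String>()
    val job = launch {
        withContext(NonCancellable) {
            // Non-interruptible section: cancellation requests are ignored in here.
            delay(50)
            log += "critical section finished"
        }
        // Interruptible section: the pending cancellation takes effect at this suspension point.
        delay(1_000)
        log += "never reached"
    }
    delay(10)           // let the job enter the critical section
    job.cancelAndJoin() // asks nicely, then waits until the job has actually stopped
    log
}

fun main() = runBlocking {
    println(cancelDuringCriticalSection()) // prints [critical section finished]
}
```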
  • r

    reactormonk

    10/15/2025, 4:16 PM
    I'm trying to figure out why this `first()` doesn't get cancelled by the timeout
  • m

    Mario Andhika

    10/17/2025, 7:04 AM
    Given the following snippet:
    import kotlinx.coroutines.*

    object MyObject {
        var x = 1
        init {
            GlobalScope.launch {
                x++
            }
        }
    }
    
    suspend fun main() = coroutineScope {
        println(MyObject.x.toString()) // output: 1
        delay(100L)
        println(MyObject.x.toString()) // output: 2
    }
    How to make `MyObject.x.toString()` print 2 without needing a delay statement? How should I change the init block?
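One possible direction, sketched under the assumption that the caller is allowed to wait for the initialisation: keep the `Job` returned by `launch` and `join()` it instead of guessing a delay. The property name `initJob` is made up for this example.

```kotlin
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
object MyObject {
    var x = 1
    // Expose the Job so callers can wait for the asynchronous initialisation.
    val initJob: Job = GlobalScope.launch { x++ }
}

suspend fun main() {
    MyObject.initJob.join() // suspends until the launched block has run
    println(MyObject.x)     // prints 2
}
```

If the work does not need to be asynchronous at all, doing `x++` directly in the init block avoids the question entirely.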
  • p

    Peter

    10/20/2025, 1:48 AM
    Do all scopes get cancelled on their own when the application is terminated? I have a scope that has basically the same lifetime as the application. I'm wondering if I should bother closing it. I guess it will get cancelled on JVM shutdown or something?
  • c

    CLOVIS

    10/20/2025, 6:13 PM
    `DebugProbes.install()` is very slow! Here, it takes between ~1.5s and ~3s depending on the run, and it's called once for each Gradle module
  • u

    ursus

    10/22/2025, 12:21 AM
    What is the most idiomatic way of having a `MutableSharedFlow` (that acts as a trigger to refresh stuff) that has a default value (so syncs get triggered right away on start)? I find manually throwing in `trigger.emit(Unit)` after everyone has subscribed a bit awkward. Unless there is an API I'm not aware of, probably the best would be to have my own subclass, right? Like this maybe?
    class TriggerFlow : Flow<Unit> {
        private val _trigger = MutableSharedFlow<Unit>(
            replay = 1
        )
        
        init {
            _trigger.tryEmit(Unit)
        }
        
        suspend fun trigger() {
            _trigger.emit(Unit)
        }
        
        override suspend fun collect(collector: FlowCollector<Unit>) {
            _trigger.collect(collector)
        }
    }
    How does this look? Is the `tryEmit` in init cool?
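An alternative sketch that avoids the subclass, assuming it is acceptable for every new collector to receive its own initial trigger: `onStart { emit(Unit) }` on the shared flow. The class and member names (`Refresher`, `refreshes`, `refresh`) are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class Refresher {
    private val trigger = MutableSharedFlow<Unit>(extraBufferCapacity = 1)

    // Every collector first gets one synthetic Unit, then the real triggers.
    val refreshes: Flow<Unit> = trigger.onStart { emit(Unit) }

    fun refresh() { trigger.tryEmit(Unit) }
}

fun main() = runBlocking {
    val refresher = Refresher()
    // Completes right away: the initial Unit comes from onStart, not from a manual emit.
    refresher.refreshes.first()
    println("initial trigger received")
}
```

The difference from `replay = 1` is that late collectors do not see an old manual trigger replayed; they only get the synthetic one.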
  • l

    Lukas K-G

    10/22/2025, 1:35 PM
    Hi, my colleagues and I had a discussion about a callback flow that was not cancellation-cooperative:
    internal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
        trySendBlocking(adItemStatus)
        val statusListener = AdItemStatusListener { _, status -> trySendBlocking(status) }
        addStatusListener(statusListener)
        awaitClose { removeStatusListener(statusListener) }
    }
    We figured that this is likely due to the `trySendBlocking`, which does a `runBlocking`. We then pivoted to the following:
    internal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
        send(adItemStatus)
        val statusListener = AdItemStatusListener { _, status -> launch { send(status) }}
        addStatusListener(statusListener)
        awaitClose { removeStatusListener(statusListener) }
    }
    We then figured that this might not guarantee the order of events if run on a coroutine dispatcher with multiple threads. What would be the recommended way to implement a cancellation-cooperative callback flow with guaranteed event order? Thanks in advance!
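A sketch of one commonly suggested shape, not a verified fix for the code above: call the non-blocking `trySend` straight from the listener, which keeps callback order because no extra coroutine is launched, and choose a buffer so that `trySend` does not drop what matters. `buffer(Channel.UNLIMITED)` keeps every event, while `conflate()` would keep only the latest status. `StatusSource` and `StatusListener` are hypothetical stand-ins for `ScheduledAdItem` and `AdItemStatusListener`.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the real listener API.
fun interface StatusListener { fun onStatus(status: String) }
class StatusSource(var status: String) {
    private val listeners = mutableListOf<StatusListener>()
    fun addStatusListener(l: StatusListener) { listeners += l }
    fun removeStatusListener(l: StatusListener) { listeners -= l }
    fun update(newStatus: String) {
        status = newStatus
        listeners.forEach { it.onStatus(newStatus) }
    }
}

fun StatusSource.statusAsFlow(): Flow<String> = callbackFlow {
    trySend(status) // current value first
    // trySend neither blocks nor suspends, so events are enqueued in callback order.
    val listener = StatusListener { newStatus -> trySend(newStatus) }
    addStatusListener(listener)
    awaitClose { removeStatusListener(listener) }
}.buffer(Channel.UNLIMITED) // with an unlimited buffer, trySend only fails once the channel is closed

fun main() = runBlocking {
    val source = StatusSource("idle")
    println(source.statusAsFlow().first()) // prints idle
}
```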
  • a

    Alex Styl

    10/24/2025, 3:17 AM
    Are there any solutions for resumable coroutines that persist over application restarts? I saw this from nextjs yesterday and I would love to have something like this* on my backend. *this = having coroutines run and not having to worry about server restarts (i.e. have `delay()` for some days and then continue the coroutine)
  • w

    Winson Chiu

    10/27/2025, 8:12 PM
    I'm aware of the `runTest` workaround for `withTimeout` using `withContext(Dispatchers.Default.limitedParallelism(1))`, but is there a workaround for `Flow.timeout`? I can `flowOn`, but I'd rather not force my entire Flow to a specific dispatcher if possible.
  • b

    Bernhard

    10/29/2025, 1:57 PM
    I've got a big JSON file that I want to read. For each entry in an array that is read lazily, I want to make a request to an external system. However, I don't want to send all requests at once but use a maximum of x concurrent requests. I also don't want to evaluate more objects than I can currently send. Is there an easy way to do that?
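A sketch of one way to get both properties, assuming the lazily read array can be exposed as a `Sequence`: acquire a `Semaphore` permit before pulling the next element, so at most `limit` requests are in flight and no element is evaluated before a slot is free. The helper name `forEachWithLimit` is hypothetical, and `delay(20)` stands in for the real request.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: pulls the next element only once a request slot is free.
suspend fun <T> Sequence<T>.forEachWithLimit(limit: Int, action: suspend (T) -> Unit) {
    val iterator = iterator()
    val slots = Semaphore(limit)
    coroutineScope {
        while (true) {
            slots.acquire() // suspends while `limit` requests are in flight
            if (!iterator.hasNext()) break
            val item = iterator.next() // evaluated lazily, only after a slot was acquired
            launch {
                try { action(item) } finally { slots.release() }
            }
        }
    } // coroutineScope returns after all launched requests have finished
}

fun main() = runBlocking {
    val inFlight = AtomicInteger()
    val maxInFlight = AtomicInteger()
    (1..10).asSequence().forEachWithLimit(limit = 3) {
        val now = inFlight.incrementAndGet()
        maxInFlight.updateAndGet { max -> maxOf(max, now) }
        delay(20) // stands in for the external request
        inFlight.decrementAndGet()
    }
    println(maxInFlight.get()) // prints 3
}
```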
    ✅ 1
  • r

    rocketraman

    10/31/2025, 1:54 PM
    Is `MDCContext` (https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-slf4j/src/MDCContext.kt) implemented correctly? It wraps `MDC`, which is a mutable thread local. The docs for `ThreadContextElement` seem to indicate that `CopyableThreadContextElement` should be used for this case.
  • r

    Rob Elliot

    11/04/2025, 8:33 PM
    Has anyone written Kotlin Concurrency in Practice yet? I still remember how amazing it was to have my eyes opened by Java Concurrency in Practice. Suspect it's why I still reflexively reach for Java concurrency tools, for all their complexity I feel like I know them well...
  • d

    David Bieregger

    11/08/2025, 5:47 PM
    Hey guys, does somebody know why `capacity` and `onBufferOverflow` are not exposed in `channelFlow`? Consider the following use case:
    suspend fun receiveOutgoingTransactionsAsFlow(capacity: Int): Flow<EventTransaction> = channelFlow(capacity = capacity /* this parameter doesn't exist */) {
            launch {
                sharedFlow.collect { item ->
                    trySend(item).getOrElse {
                        close(Error("Private buffer of size $capacity overflowed."))
                    }
                }
            }
        }
    This logic ensures that subscribers of a SharedFlow can process independently, but also accounts for the fact that the collector might be too slow. The `capacity` parameter would be really useful in this case, or am I overcomplicating things?
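As far as I know, the usual way to size the channel behind `channelFlow` is to apply `buffer(capacity, onBufferOverflow)` directly after it; the `buffer` documentation describes adjacent `channelFlow` and `buffer` applications as fused into one channel. A sketch under that assumption, with the hypothetical name `withPrivateBuffer` and `onFailure` used in place of `getOrElse`:

```kotlin
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Each collector gets its own channel of `capacity` elements between the shared flow and itself.
fun <T> SharedFlow<T>.withPrivateBuffer(capacity: Int): Flow<T> = channelFlow {
    collect { item ->
        trySend(item).onFailure {
            // Fails the flow for this collector when its private buffer is full.
            close(IllegalStateException("Private buffer of size $capacity overflowed."))
        }
    }
}.buffer(capacity) // fused with channelFlow: this sets the channel's capacity

fun main() = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 3)
    repeat(3) { shared.tryEmit(it) }
    println(shared.withPrivateBuffer(capacity = 8).take(3).toList()) // prints [0, 1, 2]
}
```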
  • m

    marcinmoskala

    11/11/2025, 10:49 AM
    Hey guys. I have a problem with Dispatchers.Main.immediate. I know the theory, that it guarantees not to redispatch when using withContext from immediate to immediate, but in my experiments, I observe the same behavior when I just use Dispatchers.Main. Do you know any specific code that could show a difference between Dispatchers.Main and Dispatchers.Main.immediate?

    https://www.youtube.com/watch?v=QIYgn7KlUfo

  • g

    groostav

    11/13/2025, 6:27 AM
    Hey so, if I'm writing a desktop CLI application, and I want a `Dispatchers.Main` but don't want to put `javafx` on my classloader, what are my options? Do I have any, or do I need to roll my own `MainCoroutineDispatcher` implementation and insert it in the SPI/service-loader?
  • s

    S.

    11/22/2025, 10:21 PM
    is this issue still a thing? https://github.com/Kotlin/kotlinx.coroutines/issues/3874 I might be running into the same problem, adding a k/js script to a website running k/js. but the proposed workaround doesn't work for me.
  • y

    Youssef Shoaib [MOD]

    11/27/2025, 10:38 PM
    Cancellation can suppress other exceptions, which results in them staying unreported. Is this really intentional behaviour?
    launch {
      Foo().use {
        cancel()
        ensureActive()
      }
    }.join()
    class Foo : AutoCloseable {
      override fun close() {
        error("Foo")
      }
    }
    IMO, `use` should prioritize the `close` exception over a `CancellationException`. This is because cancellation is considered "normal", while an `error` is truly exceptional, and thus should be reported. The docs say:
    > Thrown by cancellable suspending functions if the coroutine is cancelled while it is suspended. It indicates normal cancellation of a coroutine.
    "_normal_" is the keyword there. Normal cancellation shouldn't mask a real exception. To be clear, this is all from the suppression behaviour of `use`. My proposal is that `use` should never `addSuppressed` to a `CancellationException`. If you use `try-finally` instead of `use`, the code reports an error as you'd expect. Error suppression is great, and it prioritising the body of `use` is great too, but it shouldn't prioritise `CancellationException` since that's an expected exception (dare I say, it's a control flow exception, which Scala also deprioritises similarly)
    👍 2
  • o

    Oğuzhan Soykan

    11/28/2025, 10:41 AM
    A question: are there any caveats to bridging to Mono with `mono{something().await()}` from a coroutine and reactive perspective? The underlying logic of `mono` uses `GlobalScope`, and that worried me a little bit.
    class ComponentHealthIndicator(
      private val conn: Component
    ) : AbstractReactiveHealthIndicator("Component health check failed") {
      override fun doHealthCheck(builder: Health.Builder) : Mono<Health> = mono {
        val ping = conn.ping().await()
        if (isHealthy(ping)) {
          builder.up().build()
        } else {
          builder.down().build()
        }
      }
    }
  • j

    jean

    12/02/2025, 12:41 PM
    Is there a reason for not having the possibility to set a dispatcher when calling `launchIn`? It can be set with `scope.launch(dispatcher) { ... }` but not with
    someFlow
        .onEach { ... }
        .launchIn(scope) // <- I can't set a dispatcher here
    I created this extension but I wonder if there’s a reason for not using it
    fun <T> Flow<T>.launchIn(scope: CoroutineScope, coroutineDispatcher: CoroutineDispatcher): Job = scope.launch(coroutineDispatcher) {
        collect()
    }
    
    // Usage
    someFlow
        .onEach { ... }
        .launchIn(scope, dispatcher)
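For comparison, a sketch of doing the same without a custom extension, assuming the existing `CoroutineScope.plus` operator is acceptable: `launchIn(scope + dispatcher)` collects in a scope that keeps the original job but uses the given dispatcher. `flowOn(dispatcher)` is the other option when only the upstream operators need to move. The function name `collectingThreadName` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects a one-element flow in `scope + Dispatchers.Default` and reports the collecting thread.
suspend fun collectingThreadName(scope: CoroutineScope): String {
    var threadName = ""
    flowOf(1)
        .onEach { threadName = Thread.currentThread().name }
        .launchIn(scope + Dispatchers.Default) // same job hierarchy, different dispatcher
        .join()
    return threadName
}

fun main() = runBlocking {
    println(collectingThreadName(this).startsWith("DefaultDispatcher")) // prints true
}
```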
  • a

    Alexander Schmidt

    12/02/2025, 3:03 PM
    Hello there, for my work project I would like to create a concurrency module that can be used from Kotlin and Java. Therefore my initial idea was to provide some spring-beans that provide access to a Default and an Io thread pool (just like Dispatchers.IO and Dispatchers.Default) managed by Spring. Every module should then be able to just use the pools via dependency injection for all kinds of tasks. Also I am deriving the coroutine dispatchers directly from the thread pools:
    @Configuration
    class ConcurrencyConfig {
        @Bean
        fun executors(properties: ConcurrencyProperties): CustomExecutors {
            val taskDecorator = MDCTaskDecorator()
            return CustomExecutors(
                Default = createExecutor(properties.default, taskDecorator),
                Io = createExecutor(properties.io, taskDecorator, useVirtual = true)
            )
        }
        
        @Bean
        fun dispatchers(executors: CustomExecutors): CustomDispatchers {
            return CustomDispatchers(
                Default = executors.Default.asCoroutineDispatcher(),
                Io = executors.Io.asCoroutineDispatcher()
            )
        }
    }
    Sharing pools between Java and Kotlin makes configuring max-threads and so on much simpler, although Java-Threads might block coroutines. That is the reason why the Io-Executor uses virtual threads. This works quite well so far, but now I face problems when adding some required features our applications need.
    1. All threads and coroutines must have an MDC correlationId
    2. Executors and dispatchers must shutdown gracefully within the spring context, allowing running tasks to continue (until a timeout is reached) and also create sub-tasks (which the task needs to successfully complete, e.g. the last step of a task is to create 3 subtasks (io) and then aggregate results (default)). New tasks should be rejected while shutdown is in progress.
    🧵 1
  • r

    reactormonk

    12/03/2025, 6:31 PM
    Oh great, I think I managed to trigger KT-79276 again on 2.2.21 🙃
  • a

    Anton Krutko

    12/04/2025, 6:22 AM
    Hello there, in our project we are observing an issue with consuming the data from MutableSharedFlow. Under certain conditions we are cancelling the scope in which we produce and collect data. Then we launch data emission and consuming on the new scope, but on the same global instance of MutableSharedFlow. The issue we face is that after one or several restarts emitted items are no longer delivered to the collector. Below is the minimal example to reproduce the issue. However, reproduction of this issue is greatly affected by the work load performed (or time spent) in the collector. We managed to reproduce this issue on desktop JVM and on Android. Do you see any obvious mistakes we made or spots we missed? Note: correctness of `counter` values themselves is not important in that example. Coroutines versions `1.10.2` and `1.8.0`.
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.flow.launchIn
    import kotlinx.coroutines.flow.onEach
    
    var counter = 0L
    val mutableSharedFlow = MutableSharedFlow<Long>()
    
    var cs: CoroutineScope? = null
    fun start() {
        cs?.cancel()
        cs = CoroutineScope(Dispatchers.IO + SupervisorJob())
    
        mutableSharedFlow.onEach {
            delay(50)
            println(it)
        }.launchIn(cs!!)
    
        cs!!.launch {
            while (currentCoroutineContext().isActive) {
                mutableSharedFlow.emit(counter)
                counter++
            }
        }
    }
    
    suspend fun main() {
        while (true) {
            start()
            println("restart, counter = $counter")
    
            delay(3000)
        }
    }
    The output we see:
    ....
    47
    48
    49
    50
    51
    52
    53
    54
    restart, counter = 56
    restart, counter = 56
    restart, counter = 56
    restart, counter = 56
    restart, counter = 56
    ... // proper behavior (emissions happening) is never restored after this point
    Any help will be greatly appreciated.
  • r

    Rey (Kingg22)

    12/10/2025, 4:13 PM
    Hi! I have a question about coroutines and thread confinement. Is it possible to keep a coroutine running strictly on the same thread? I know it might sound obvious, but I just want to confirm my understanding. Context: Some time ago I was experimenting with porting *Hibernate Reactive (`CompletionStage`)* to Kotlin coroutines. Hibernate Reactive requires that all operations run on the same thread where the session and the database connection (Vert.x SQL event loop) were created. My idea was to create some sort of internal thread/dispatcher pool to store the dispatcher used to open the connection and session and then use the same dispatcher when awaiting (using extension functions that convert `CompletableFuture` to suspending operations). However, it didn’t work, coroutines kept jumping between threads. I also tried using the Vert.x coroutine dispatcher, which seems to propagate the event-loop context correctly, but that still doesn’t guarantee staying on the same physical thread, which HR seems to require. Additionally, HR needs that sessions (and nested transactions) are shared across coroutine calls, so I would need to reuse the dispatcher of an already opened session. I’m thinking this might require a custom `CoroutineContext` element for transaction/session propagation, similar to how Exposed handles it. Am I on the right track? So is there any reliable way to enforce strict single-thread execution for coroutines, similar to Vert.x event loop confinement, but without thread hopping? Or is this simply not how coroutines are intended to operate? Thanks!
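A small sketch of the general mechanism, not of a Hibernate Reactive integration: a dispatcher backed by a single-thread executor resumes every coroutine dispatched to it on that one thread, including after suspension points. This only holds while nothing in the call chain switches context (for example `withContext(Dispatchers.IO)`). The function name `staysOnOneThread` is hypothetical.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

suspend fun staysOnOneThread(): Boolean =
    // One dedicated thread; `use` shuts the executor down afterwards.
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { sessionDispatcher ->
        withContext(sessionDispatcher) {
            val first = Thread.currentThread()
            delay(10) // suspension point: resumption is dispatched back to the same thread
            first === Thread.currentThread()
        }
    }

fun main() = runBlocking {
    println(staysOnOneThread()) // prints true
}
```

Reusing "the dispatcher of an already opened session" would then amount to keeping that dispatcher in the coroutine context for the session's lifetime.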