# coroutines
  • y

    Youssef Shoaib [MOD]

    04/07/2025, 6:19 AM
    I know you're "not supposed to" resume the continuation given within `suspendCoroutineUninterceptedOrReturn`, but seemingly it works fine! Is that a guaranteed behaviour, or could a future compiler version break that somehow? I understand that the warning against doing that may be due to stack overflows and such.
  • h

    Hugo Costa

    04/07/2025, 2:05 PM
    Hello, quick question on `withContext`. I find myself using MDC a lot like
    val validationId = message.validationUniqueIdentifier
    MDC.put("validationRequestId", validationId)
    
    return withContext(MDCContext()) { ... }
    and I'm interested in 2 things:
    1. Should I do `withContext(MDCContext())` or `withContext(coroutineContext + MDCContext())`, with `coroutineContext` being provided by the parent scope where I am running this?
    2. Can I abstract this into something like below to make it faster to create?
    suspend fun <T> withMDCContext(
        parentContext: CoroutineContext,
        block: suspend CoroutineScope.() -> T,
    ): T = withContext(parentContext + MDCContext()) { block() }
    
    // depending on your answer, or ...
    
    suspend fun <T> withMDCContext(block: suspend CoroutineScope.() -> T): T = withContext(MDCContext()) { block() }
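    A minimal sketch of why the explicit `coroutineContext +` is redundant: `withContext` merges the context it is given with the caller's current context. To stay free of the SLF4J dependency, the sketch uses `CoroutineName` as a stand-in for `MDCContext`; the function name `nameInsideWithContext` is hypothetical.

    ```kotlin
    import kotlinx.coroutines.CoroutineName
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.withContext

    // Only a dispatcher is passed to withContext; the CoroutineName set by the
    // caller is still visible inside the block because the contexts are merged.
    suspend fun nameInsideWithContext(): String? =
        withContext(Dispatchers.Default) {
            coroutineContext[CoroutineName]?.name
        }

    fun main() = runBlocking(CoroutineName("parent")) {
        println(nameInsideWithContext()) // prints "parent"
    }
    ```

    Under that reading, the second `withMDCContext` overload (without the `parentContext` parameter) should be sufficient.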
  • d

    Dmitry Khalanskiy [JB]

    04/08/2025, 8:31 AM
    📣 kotlinx.coroutines 1.10.2 is here! `kotlinx-coroutines-debug` was published in 1.10 in a way that confused some tooling. 1.10.2 fixes that and introduces several other bugfixes. The changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.10.2
    ❤️ 4
    K 4
    kodee happy 1
    ⏸️ 1
  • m

    Márton Matusek

    04/08/2025, 9:31 AM
    Hello Everyone, There is an ongoing issue about a time and size based `chunked` flow extension and we created an implementation for ourselves. Details: We are in need of a `fun <T> Flow<T>.chunked(size: Int, timeout: Duration): Flow<List<T>>` function to chunk elements in flows based on a chunk size and a timeout. The timeout means that incomplete chunks are emitted downstream anyway after a timeout. Our implementation so far looks like this:
    fun <T> Flow<T>.chunked(chunkSize: Int, timeout: Duration): Flow<List<T>> =
        channelFlow {
            var result: ArrayList<T>? = null
            var timer: Job? = null
            val mutex = Mutex()
    
            this@chunked.collect { event ->
                val batch = result ?: ArrayList<T>(chunkSize).also { result = it }
                batch.add(event)
                if (timer == null) {
                    timer = launch {
                        delay(timeout)
                        mutex.withLock {
                            if (batch.isNotEmpty()) {
                                send(batch)
                                result = null
                            }
                        }
                    }
                }
    
                mutex.withLock {
                    if (batch.size >= chunkSize) {
                        send(batch)
                        result = null
                        timer?.cancelAndJoin()
                        timer = null
                    }
                }
            }
        }
    Can you please have a look and review potential problems in it? We would like to use it with SharedFlows as well. Thank you in advance!
  • y

    Youssef Shoaib [MOD]

    04/10/2025, 5:13 AM
    A little trampolining implementation I wrote to provide functionality similar to `DeepRecursiveFunction` while allowing foreign suspension. Any critiques?
    internal fun <T> Continuation<T>.resumeWithIntercepted(result: Result<T>) {
      context.trampoline.next { resumeWith(result) }
    }
    
    @OptIn(InternalCoroutinesApi::class)
    internal fun CoroutineContext.withTrampoline(): CoroutineContext {
      val interceptor = this[ContinuationInterceptor].let {
        if (it is Trampoline) it.interceptor else it
      }
      return this + if (interceptor is Delay) TrampolineWithDelay(interceptor, interceptor) else Trampoline(interceptor)
    }
    
    @InternalCoroutinesApi
    private class TrampolineWithDelay(interceptor: ContinuationInterceptor?, delay: Delay) :
      Trampoline(interceptor), Delay by delay
    
    private open class Trampoline(val interceptor: ContinuationInterceptor?) :
      AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
      private var toRun: (() -> Unit)? = null
      fun next(block: () -> Unit) {
        check(toRun == null) { "Already running a block: $toRun" }
        toRun = block
      }
    
      override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        TrampolineContinuation(continuation).let {
          interceptor?.interceptContinuation(it) ?: it
        }
    
      override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        interceptor?.releaseInterceptedContinuation(continuation)
      }
    
      private inner class TrampolineContinuation<T>(val cont: Continuation<T>) : Continuation<T> {
        override val context: CoroutineContext = cont.context
    
        override fun resumeWith(result: Result<T>) {
          cont.resumeWith(result)
          while (true) {
            (toRun ?: return).also { toRun = null }.invoke()
          }
        }
      }
    }
    
    private val CoroutineContext.trampoline: Trampoline
      get() =
        this[ContinuationInterceptor] as? Trampoline ?: error("No trampoline in context: $this")
    Could likely be improved by defunctionalizing `toRun` lambdas if they're known to only need e.g. 2 fields.
  • d

    David Kubecka

    04/10/2025, 2:43 PM
    I have a class with 2 lazy properties and I need to compute them in parallel. My idea was to use coroutines for this. Here is some pseudo-code; it doesn't compile but hopefully conveys the idea:
    class MyClass(val scope: CoroutineScope) {
      val p1 = lazy { scope.async { someExpensiveComputation() } }
      val p2 = lazy { scope.async { anotherExpensiveComputation() } }
    
      val p3 = lazy { useBothProperties(p1.await(), p2.await()) }
      val p4 = lazy { useSingleProperty(p1.await()) }
    }
    
    fun main() {
      runBlocking {
        val myClass = MyClass(this)
        myClass.p3 + myClass.p4
      }
    }
    How can I achieve this with coroutines? Or is there perhaps another way how to achieve my main goal (first sentence)?
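    One possible shape, as a sketch only: expose each property as a lazily created `Deferred`, so a computation starts on first access and dependents await it. The two computation functions are hypothetical stand-ins returning `Int`, and `useBothProperties`/`useSingleProperty` are replaced by simple arithmetic.

    ```kotlin
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Deferred
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.async
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking

    // Hypothetical stand-ins for the expensive computations in the question.
    suspend fun someExpensiveComputation(): Int { delay(100); return 1 }
    suspend fun anotherExpensiveComputation(): Int { delay(100); return 2 }

    class MyClass(private val scope: CoroutineScope) {
        // Each Deferred is created, and starts running, on first access only.
        val p1: Deferred<Int> by lazy { scope.async(Dispatchers.Default) { someExpensiveComputation() } }
        val p2: Deferred<Int> by lazy { scope.async(Dispatchers.Default) { anotherExpensiveComputation() } }

        val p3: Deferred<Int> by lazy {
            // Touch both properties before awaiting so they run concurrently.
            val a = p1
            val b = p2
            scope.async { a.await() + b.await() }
        }
        val p4: Deferred<Int> by lazy {
            val a = p1
            scope.async { a.await() * 2 }
        }
    }

    fun main() = runBlocking {
        val myClass = MyClass(this)
        println(myClass.p3.await() + myClass.p4.await()) // prints 5
    }
    ```

    Callers have to `await()` the properties from a suspending context, which is the main cost of this design compared to plain `lazy` values.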
  • y

    Youssef Shoaib [MOD]

    04/10/2025, 8:23 PM
    Is it ever planned to have delayed continuation spilling? I.e. that continuations only spill when suspended. I can imagine that may not work too well in multi threaded situations.
  • a

    alexhelder

    04/10/2025, 10:01 PM
    Is there a better way to do this? The goal is for the assignment to `items` to not block the thread:
    • `daggerLazyPagerFactory` is a `dagger.Lazy`; `get()` blocks the thread in my case
    • I have the `flow { }` builder just with a single `emitAll`, which seems wrong, but moves the blocking `get()` to `Dispatchers.Default`
    val items: Flow<PagingData<Item>> = flow {
            emitAll(daggerLazyPagerFactory.get().flow())
        }.flowOn(Dispatchers.Default)
            .cachedIn(viewModelScope)
    If I try this, `get()` blocks the thread, as `flowOn()` does not impact that:
    val items = daggerLazyPagerFactory.get()
       .flow()
       .flowOn(Dispatchers.Default)
       .cachedIn(...)
  • a

    Alexandru Caraus

    04/11/2025, 5:13 AM
    Maybe get().flow() runs on main under the hood
    🧵 1
  • z

    zak.taccardi

    04/14/2025, 4:44 PM
    What would be the ideal way to convert a `CoroutineScope` or `CoroutineContext` to an `ExecutorService`? Context is that I'm writing a test (so the `CoroutineScope` comes from the `runTest { }` function), and I need to inject an `okhttp3.Dispatcher` (which requires an `ExecutorService` to construct, per the docs) into an `OkHttpClient.Builder`
  • r

    rakeeb

    04/16/2025, 9:35 PM
    Wrote a class which delays running an action.
    class SuspendedInvocationTimer(
        val onTimerExpired: suspend () -> Unit,
    ) {
    
        suspend fun start(duration: Duration) = coroutineScope {
            delay(duration)
            if (isActive) {
                onTimerExpired()
            }
        }
    }
    Even when the parent coroutine scope is cancelled the `onTimerExpired` lambda is still invoked. I've tried a few variations like:
    suspend fun start(duration: Duration) = coroutineScope {
        val job = launch { delay(duration) }
        job.join()
        val isCancelling = !job.isActive && !job.isCompleted && job.isCancelled
        val isCancelled = !job.isActive && job.isCompleted && job.isCancelled
        if (!isCancelling && !isCancelled) {
            onTimerExpired()
        }
    }
    or checking to see the parent coroutine scope as well, none of them seem to work. The `onTimerExpired` lambda always gets invoked even if the timer got cancelled. What is the fault in my approach here?
  • m

    marcinmoskala

    04/17/2025, 9:49 AM
    This code ends with exception and never prints "Done":
    fun main() {
        runTest {
            supervisorScope {
                launch { throw Exception("Error in coroutine") }
            }
        }
        println("Done")
    }
    That is a result of runTest setting CoroutineExceptionHandler. I do not understand why it is doing that, I only had problems with this behavior. I made an issue: https://youtrack.jetbrains.com/issue/KT-76854/supervisorScope-misbehaves-in-runTest
  • m

    Michael

    04/17/2025, 8:58 PM
    Hey there! I have a question about how coroutines ensure the happens-before relationship. There’s an article by Roman Elizarov where he explains that suspending functions perform some synchronization, which makes accessing mutable state safe as long as it happens from the same coroutine. There’s also the `limitedParallelism(1)` pattern, which, according to the documentation, guarantees sequential task execution and establishes a happens-before relationship between them. So it makes shared state thread-safe even when accessed from different coroutines. So, I was wondering:
    • Is this something that the `LimitedDispatcher` explicitly enforces, or is it just a side effect of the lack of parallelism? In other words, if I don’t use `limitedParallelism`, but it just so happens that two different coroutines on the same dispatcher are executed sequentially, does that mean one coroutine is guaranteed to see modifications made by the other?
    • I’ve been struggling to reproduce any memory visibility issues. It seems that once a coroutine suspends, any other coroutine can immediately observe changes made to shared state (without additional synchronization). Am I just lucky, or are there some subtleties in how this works that you could give insight into?
    Thanks in advance 😊
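    For reference, a minimal sketch of the `limitedParallelism(1)` pattern mentioned above: many coroutines mutate one unsynchronized counter, all confined to a single-parallelism view of `Dispatchers.Default`. The function name and the counts are made up for illustration, and the `@OptIn` is only there for library versions where the API is still experimental.

    ```kotlin
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.ExperimentalCoroutinesApi
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    @OptIn(ExperimentalCoroutinesApi::class)
    suspend fun countWithConfinedState(coroutines: Int, incrementsEach: Int): Int {
        // A view of Dispatchers.Default that runs at most one task at a time.
        val confined = Dispatchers.Default.limitedParallelism(1)
        var counter = 0 // plain, unsynchronized shared state
        coroutineScope {
            repeat(coroutines) {
                launch(confined) {
                    repeat(incrementsEach) { counter++ }
                }
            }
        }
        // coroutineScope returns only after all children have completed.
        return counter
    }

    fun main() = runBlocking {
        println(countWithConfinedState(100, 1_000)) // prints 100000
    }
    ```

    This only demonstrates the documented guarantee; it says nothing about the undocumented case of two coroutines that merely happen to run sequentially.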
  • v

    Vampire

    04/18/2025, 3:16 PM
    Say I want to start an asynchronous background task that should not be waited for in the current suspending function, what is the appropriate way? I was under the impression `GlobalScope.launch { /* do the background task */ }` is the appropriate way. I'm just a bit uncertain as I have to opt-in to `DelicateCoroutinesApi` and the AI-Reviewer suggests to use a `CoroutineScope` instead. If more details are interesting: when a request for X is done, X is taken from cache or calculated and put to cache. After X was requested, it is quite likely that Y and Z are requested next, so I want to trigger asynchronously in the background that Y and Z are calculated and put to the cache. But this should not delay the returning of X.
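    A rough sketch of the "use a `CoroutineScope`" suggestion, assuming the application owns one long-lived scope and cancels it on shutdown. `WarmingCache`, `likelyNext`, and the `compute` lambda are hypothetical names for illustration, not taken from any library.

    ```kotlin
    import kotlinx.coroutines.*
    import java.util.concurrent.ConcurrentHashMap

    class WarmingCache(
        private val backgroundScope: CoroutineScope,
        private val compute: suspend (String) -> String,
    ) {
        private val cache = ConcurrentHashMap<String, String>()

        suspend fun get(key: String, likelyNext: List<String> = emptyList()): String {
            val value = cache[key] ?: compute(key).also { cache[key] = it }
            // Fire-and-forget: launched in the long-lived scope, not awaited here.
            likelyNext.forEach { next ->
                backgroundScope.launch {
                    if (!cache.containsKey(next)) cache[next] = compute(next)
                }
            }
            return value
        }

        fun isCached(key: String): Boolean = cache.containsKey(key)
    }

    fun main() = runBlocking {
        // SupervisorJob: one failed warm-up does not cancel the others.
        val appScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
        val cache = WarmingCache(appScope) { key -> delay(50); key.uppercase() }
        println(cache.get("x", likelyNext = listOf("y", "z"))) // prints X
        // Demo only: wait for the warm-up jobs so the result can be observed.
        appScope.coroutineContext.job.children.forEach { it.join() }
        println(cache.isCached("y")) // prints true
        appScope.cancel()
    }
    ```

    Compared to `GlobalScope`, the difference is that the owner can cancel all background work in one place.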
  • d

    Don Mitchell

    04/18/2025, 6:59 PM
    In Kotlin Coroutine Confidence @Sam off the cuff says that IntelliJ shows potentially blocking coroutine calls. I don't see that nor how to enable it. Is it only in Android Studio or am I missing it? I even put `Thread.sleep(999)` in a `suspend` method and IntelliJ didn't slap my hand.
  • r

    reactormonk

    04/22/2025, 10:36 AM
    What's the difference between
    mutableStateFlow1.value = mutableStateFlow0.value
    and
    mutableStateFlow1.emit(mutableStateFlow0.value)
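    A small sketch comparing the two forms. As far as I know, `emit` on a `MutableStateFlow` never actually suspends and has the same effect as assigning `value`; the practical difference is that `emit` is a `suspend` function and so needs a coroutine. The function name `copyBothWays` is hypothetical.

    ```kotlin
    import kotlinx.coroutines.flow.MutableStateFlow
    import kotlinx.coroutines.runBlocking

    // Copies the source value once via the setter and once via emit.
    suspend fun copyBothWays(source: MutableStateFlow<Int>): Pair<Int, Int> {
        val viaValue = MutableStateFlow(0)
        val viaEmit = MutableStateFlow(0)
        viaValue.value = source.value // callable from anywhere
        viaEmit.emit(source.value)    // requires a suspending context
        return viaValue.value to viaEmit.value
    }

    fun main() = runBlocking {
        println(copyBothWays(MutableStateFlow(42))) // prints (42, 42)
    }
    ```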
    same 1
  • d

    Don Mitchell

    04/23/2025, 12:47 PM
    `runBlocking` is so easy and evil to use. How do I avoid it in top level methods which are below `main` or in `koin` module definitions or class initializers/constructors? Example from `koin` init (as you'll see I'm questioning the original author's overuse and non-trivial computations)
    single {
                    val redis: RedisConnection = get(CORE_REDIS_CONNECTION)
                    val redisHealth = HealthDetail(
                        url = redis.uris.first().toString(),
                        healthy = runCatching {
                            // ugh, this is ugly. too much work in koin and unclear what the coroutine context is.
                            // this may deadlock coroutines as this won't release the context until it completes.
                            runBlocking(Dispatchers.IO) {
                                redis.withSuspendConnection { connection ->
                                    connection.ping()
                                }
                            }
                            true
                        }.getOrDefault(false),
                        responseTimeMs = 0
                    )
    
                    Health(database = get(MYSQL), stage = Stage.fromEnvironment(), redisHealth = redisHealth)
                }
  • d

    Don Mitchell

    04/23/2025, 9:33 PM
    On the JVM, what's the preferred way to "find and select a coroutine scope" from a top-level function that isn't `main`? e.g., when implementing `com.github.ajalt.clikt.core.CoreCliktCommand#run`? You can't
    CoroutineScope().launch {
    ...
    }
    as there's nothing to stop the process from exiting and the container from shutting down before it does anything (there's no containing scope). You can't call `join` on that bc there's no suspend context. I've been wantonly using `runBlocking` but I know that's not the preferred pattern. What I don't grok is what is the preferred pattern? (NOTE, this is a long-running headless batch service; so, there's no UI so `MainScope` makes no sense, `coroutineScope` is not allowed bc it's not a suspend context, `GlobalScope` has the same problem as `CoroutineScope()`, neither of these exist: `lifecycleScope`, `viewModelScope`)
  • u

    ursus

    04/24/2025, 9:33 AM
    coroutineScope {
        launch {
            outgoingMessages
                .collect {
                    webSocket.send(...)
                }
        }
    
        for (frame in webSocket.incoming) {
            ...
        }
    }
    I have a websocket where there is an incoming messages loop and I need to send messages as well. `outgoingMessages` is currently a `MutableSharedFlow<OutgoingMessage>` to which I pipe in messages, and then they get collected here and sent. Trouble is that `MutableSharedFlow` never completes, so this `coroutineScope` will never return, which is a problem when the server closes the connection. What would be the most clean and idiomatic way of solving this? Or more generally, in a `coroutineScope`, how can one child cancel the other?
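    One way to sketch the general case: keep the `Job` returned by `launch` and cancel it once the receive loop ends. The websocket is replaced here by a plain `ReceiveChannel<String>` and a `send` lambda, both stand-ins so the example is self-contained; `pump` is a hypothetical name.

    ```kotlin
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    suspend fun pump(
        incoming: ReceiveChannel<String>,
        outgoing: MutableSharedFlow<String>,
        send: suspend (String) -> Unit,
    ): List<String> = coroutineScope {
        // The collector never completes on its own, so keep a handle to it.
        val sender = launch { outgoing.collect { send(it) } }
        val received = mutableListOf<String>()
        try {
            for (frame in incoming) received.add(frame) // ends when the channel is closed
        } finally {
            sender.cancel() // lets coroutineScope return
        }
        received
    }

    fun main() = runBlocking {
        val incoming = Channel<String>(Channel.UNLIMITED)
        incoming.send("a")
        incoming.send("b")
        incoming.close() // simulates the server closing the connection
        println(pump(incoming, MutableSharedFlow()) { }) // prints [a, b]
    }
    ```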
  • s

    Stephen Edwards

    04/24/2025, 8:56 PM
    Broken assumption: `yield()` on `Main.immediate` will behave like Unconfined and empty the current event loop or continue undispatched if it's empty. Discovered result: `yield()` for `Main.immediate` on Android always ends up getting a main thread message posted. Brief investigation: HandlerContext has no `dispatchYield()` implementation, which means that it will default to just `dispatch` (here). Which means that `dispatcherWasUnconfined` will not get set and we will never `yieldUndispatched()` for `yield()` (here).
    public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val context = uCont.context
        context.ensureActive()
        val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
        if (cont.dispatcher.safeIsDispatchNeeded(context)) {
            // this is a regular dispatcher -- do simple dispatchYield
            cont.dispatchYield(context, Unit)
        } else {
            // This is either an "immediate" dispatcher or the Unconfined dispatcher
            // This code detects the Unconfined dispatcher even if it was wrapped into another dispatcher
            val yieldContext = YieldContext()
            cont.dispatchYield(context + yieldContext, Unit)
            // Special case for the unconfined dispatcher that can yield only in existing unconfined loop
            if (yieldContext.dispatcherWasUnconfined) {
                // Means that the Unconfined dispatcher got the call, but did not do anything.
                // See also code of "Unconfined.dispatch" function.
                return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
            }
            // Otherwise, it was some other dispatcher that successfully dispatched the coroutine
        }
        COROUTINE_SUSPENDED
    }
    Initial question: Why is that? Is it a bug? Should not `yield()` have the same behavior for `Main.immediate` and any other dispatcher using an event-loop?
  • d

    Don Mitchell

    04/25/2025, 2:27 PM
    Frustration w kotlin compiler, I needed an entry point for connecting and executing redis operations from Java (the one for kotlin was trivial to write). I made one which returned a `CompletableFuture`. I was shocked that the compiler didn't flag illegal casts and that in runtime, the caller logged but swallowed the cast exception. IDK if it was bc it was on a future or something up the stack from the caller that swallowed it. Here's the erroneous code (`pool` is `suspend`). The fix is to use `RedisClusterCommands` not its subclass `RedisCommands`
    fun <T> withBlockingConnection(body: (RedisCommands<String, String>) -> T): CompletableFuture<T> =
            CoroutineScope(Dispatchers.IO).future {
                val connection = pool.borrowObject()
                try {
                    val commands = when (connection) {
                        is StatefulRedisConnection -> connection.sync()
                        is StatefulRedisClusterConnection -> connection.sync()
                        else -> throw IllegalStateException("Redis connection is not a known type $connection")
                    } as RedisCommands<String, String>
    
                    body(commands)
                } finally {
                    pool.returnObject(connection)
                }
            }
  • j

    João Cana Verde

    04/25/2025, 4:56 PM
    I'm trying to debug an exception that gets thrown deep inside some coroutine calls. I'm using Spring and I'd like `kotlinx.coroutines.debug` to be enabled both in production and in tests, so I've added a `@Configuration` and I've confirmed that this runs:
    DebugProbes.sanitizeStackTraces = false
    DebugProbes.enableCreationStackTraces = enableCreationStackTraces // disabled for prod, enabled for tests
    
    System.setProperty(
      kotlinx.coroutines.DEBUG_PROPERTY_NAME,
      kotlinx.coroutines.DEBUG_PROPERTY_VALUE_ON
    )
    
    DebugProbes.install()
    But I still get a super limited stack trace when I run my test. Is this the right way to set it up? I've also added a `@CoroutinesTimeout` to my test class but that didn't help either. What am I missing?
  • u

    ursus

    04/29/2025, 2:51 PM
    // ktor client
    webSocketSession.send(frame)
    Does this suspend until it's actually sent, or only until enqueued (and will be sent later)? If it's the latter, is there a way to tell it's actually been delivered? I need to update my entity's state from `SENDING` to `SENT`
  • d

    Don Mitchell

    04/30/2025, 8:22 PM
    (jvm-based), while trying to ensure that content specific exceptions don't stop sibling/parent coroutines, I want system exceptions to cancel the stack. It's doing great at the former but blithely continuing on the latter. Here's the skeleton, any 🤦 gotchas you note? The `>>` designates where I expect it to void the `supervisorScope`. (Are there paradigmatic examples of how to void the `supervisorScope`?)
    supervisorScope {
      accounts.forEach { account ->
        this.ensureActive()
        launch(errorHandler("..")) {
            doit(account)
         }
      }
    }
    
        protected fun errorHandler(msg: String): CoroutineExceptionHandler =
            CoroutineExceptionHandler { context, exception ->
                logger.error(exception) { msg }
                if (exception.fatal() || exception.outOfDiskSpace()) {
    >>                context.cancel(CancellationException(null, cause = exception))
                }
            }
  • a

    Anurag Soni

    05/01/2025, 4:31 PM
    Is there a particular reason one should prefer `suspend fun main()` over `fun main() { runBlocking ...` as the entry point of an application that uses coroutines? I've always defaulted to `runBlocking` as that was what I remember initially from the coroutine docs. I believe one difference might be that runBlocking picks a dispatcher from kotlinx-coroutines and suspend main starts off on the Main thread?
  • y

    Youssef Shoaib [MOD]

    05/01/2025, 9:32 PM
    Is there a reason why JS's `CoroutineImpl` doesn't use `Result` internally? Is `Result` slower on JS or something? Same thing with wasmJs
  • d

    Don Mitchell

    05/02/2025, 5:25 PM
    `CoroutineScope.cancel` doesn't seem to work at least in the kotlin playground or I'm missing something. I expected the following to cancel on `i==6` but it doesn't. It throws the error after the loop finishes.
    import kotlinx.coroutines.*
    
    @OptIn(DelicateCoroutinesApi::class)
    suspend fun main() = coroutineScope {
        repeat(10) { i ->
            keepRunningUnlessFatal {
                if (i == 3) {
                    println("throwing ISE")
                    throw IllegalStateException("Illegal 3")
                }
                if (i == 6) {
                    println("throwing OOM")
                    throw OutOfMemoryError("OOM")
                }
                println("i ${i}")
            }
        }
     
    }
    
    suspend fun <T> CoroutineScope.keepRunningUnlessFatal(
        message: String = "Continuing execution after exception",
        block: suspend () -> T
    ): T? = try {
        block()
    } catch (e: VirtualMachineError) {
        this.cancel("FATAL_ERROR_MSG", e)
        null
        // HERE - uncomment to see difference
        //throw e
    } catch (e: InterruptedException) {
        this.cancel("FATAL_ERROR_MSG", e)
        null
    } catch (e: LinkageError) {
        this.cancel("FATAL_ERROR_MSG", e)
        null
    } catch (e: CancellationException) {
        this.cancel(e)
        null
    } catch (e: Exception) {
        println(e)
        println(message)
        null
    }
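    A minimal sketch of the cooperative side of cancellation: `cancel()` only marks the scope's job as cancelled, and code without a cancellable suspension point or an explicit check keeps running. Adding `ensureActive()` at the top of each iteration makes the loop stop. `runUntilCancelled` and `cancelAt` are hypothetical names.

    ```kotlin
    import kotlinx.coroutines.CancellationException
    import kotlinx.coroutines.cancel
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.ensureActive
    import kotlinx.coroutines.runBlocking

    suspend fun runUntilCancelled(cancelAt: Int): List<Int> {
        val seen = mutableListOf<Int>()
        try {
            coroutineScope {
                repeat(10) { i ->
                    ensureActive() // throws CancellationException once the scope is cancelled
                    seen.add(i)
                    if (i == cancelAt) cancel("fatal at $i") // marks the scope cancelled, does not throw
                }
            }
        } catch (e: CancellationException) {
            // Only the inner scope was cancelled; the caller is still active.
        }
        return seen
    }

    fun main() = runBlocking {
        println(runUntilCancelled(6)) // prints [0, 1, 2, 3, 4, 5, 6]
    }
    ```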
  • u

    ursus

    05/02/2025, 11:42 PM
    var webSocketSession: DefaultClientWebSocketSession? = null
    try {
          webSocketSession = ktorClient.webSocketSession(url)
          ....
    } finally {
          webSocketSession?.close() // <----
    }
    Coroutines general question: ktor calls `close` in a finally block so as to do a graceful exit. However, `close` is a suspend function. How can it work reliably? Aren't all suspend functions no-ops after cancellation unless wrapped with `NonCancellable`? Am I missing something?
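    For context, a sketch of the `NonCancellable` pattern the question refers to. In an already cancelled coroutine, cancellable suspending calls such as `delay` throw `CancellationException`; wrapping the cleanup in `withContext(NonCancellable)` lets it suspend and finish. The `delay(10)` stands in for a suspending `close`, and `cancelWhileRunning` is a hypothetical name. Whether ktor's `close` needs this wrapper is not something this sketch shows.

    ```kotlin
    import kotlinx.coroutines.NonCancellable
    import kotlinx.coroutines.cancelAndJoin
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.withContext
    import kotlinx.coroutines.yield

    fun cancelWhileRunning(): List<String> = runBlocking {
        val log = mutableListOf<String>()
        val job = launch {
            try {
                delay(Long.MAX_VALUE) // simulates a long-lived session
            } finally {
                withContext(NonCancellable) {
                    delay(10) // stand-in for a suspending close()
                    log.add("closed")
                }
            }
        }
        yield() // let the child start and suspend before cancelling it
        job.cancelAndJoin()
        log
    }

    fun main() {
        println(cancelWhileRunning()) // prints [closed]
    }
    ```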
  • j

    Jan

    05/08/2025, 4:58 PM
    Is there some way to get the full stacktrace in the exception (main() and mainMethod() are missing in the stacktrace, I assume because of the suspending delay(...) use)? Snippet: https://pl.kotl.in/rVbVEv0nt
  • y

    Yassine Abou

    05/12/2025, 6:20 PM
    I'm having an issue with my Compose Multiplatform project after updating library versions. While the app runs smoothly on Android, iOS, and desktop, the WebAssembly version displays a blank page with the following errors in the browser console:
    NotImplementedError: Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic
    kotlinx.coroutines.error_$external_fun