Youssef Shoaib [MOD]
04/07/2025, 6:19 AM
suspendCoroutineUninterceptedOrReturn, but seemingly it works fine! Is that a guaranteed behaviour, or could a future compiler version break that somehow? I understand that the warning against doing that may be due to stack overflows and such.
Hugo Costa
04/07/2025, 2:05 PM
withContext. I find myself using MDC a lot like
val validationId = message.validationUniqueIdentifier
MDC.put("validationRequestId", validationId)
return withContext(MDCContext()) { ... }
and I'm interested in 2 things
1. Should I do withContext(MDCContext()) or withContext(coroutineContext + MDCContext()), with coroutineContext being provided by the parent scope where I am running this?
2. Can I abstract this into something like below to make it faster to create?
suspend fun <T> withMDCContext(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> T,
): T = withContext(parentContext + MDCContext()) { block() }
// depending on your answer, or ...
suspend fun <T> withMDCContext(block: suspend CoroutineScope.() -> T): T = withContext(MDCContext()) { block() }
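On the first question: withContext is documented to run its block in the current coroutine context merged with the argument, so the explicit coroutineContext + should be redundant. A minimal sketch that checks this, with CoroutineName standing in for MDCContext (the function name and the "parent" value are illustrative assumptions, not part of the original code):

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the CoroutineName visible inside a nested withContext that only
// passes a dispatcher. CoroutineName is a stand-in for MDCContext here.
suspend fun nameSeenInsideNestedContext(): String? =
    withContext(CoroutineName("parent")) {
        withContext(Dispatchers.Default) {
            // The outer element is still present: withContext merges contexts.
            coroutineContext[CoroutineName]?.name
        }
    }

fun main() = runBlocking {
    println(nameSeenInsideNestedContext()) // prints "parent"
}
```

If that holds for MDCContext as well, the single-argument wrapper (the second withMDCContext above) would be sufficient.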
Dmitry Khalanskiy [JB]
04/08/2025, 8:31 AM
kotlinx-coroutines-debug was published in 1.10 in a way that confused some tooling. 1.10.2 fixes that and introduces several other bugfixes.
The changelog: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.10.2
Márton Matusek
04/08/2025, 9:31 AM
chunked flow extension and we created an implementation for ourselves.
Details:
We are in need of a fun <T> Flow<T>.chunked(size: Int, timeout: Duration): Flow<List<T>> function to chunk elements in flows based on a chunk size and a timeout. The timeout means that incomplete chunks are emitted downstream anyway after a timeout.
Our implementation so far looks like this:
fun <T> Flow<T>.chunked(chunkSize: Int, timeout: Duration): Flow<List<T>> =
    channelFlow {
        var result: ArrayList<T>? = null
        var timer: Job? = null
        val mutex = Mutex()
        this@chunked.collect { event ->
            val batch = result ?: ArrayList<T>(chunkSize).also { result = it }
            batch.add(event)
            if (timer == null) {
                timer = launch {
                    delay(timeout)
                    mutex.withLock {
                        if (batch.isNotEmpty()) {
                            send(batch)
                            result = null
                        }
                    }
                }
            }
            mutex.withLock {
                if (batch.size >= chunkSize) {
                    send(batch)
                    result = null
                    timer?.cancelAndJoin()
                    timer = null
                }
            }
        }
    }
Can you please have a look and review potential problems in it? We would like to use it with SharedFlows as well.
Thank you in advance!
Youssef Shoaib [MOD]
04/10/2025, 5:13 AM
DeepRecursiveFunction while allowing foreign suspension. Any critiques?
internal fun <T> Continuation<T>.resumeWithIntercepted(result: Result<T>) {
    context.trampoline.next { resumeWith(result) }
}
@OptIn(InternalCoroutinesApi::class)
internal fun CoroutineContext.withTrampoline(): CoroutineContext {
    val interceptor = this[ContinuationInterceptor].let {
        if (it is Trampoline) it.interceptor else it
    }
    return this + if (interceptor is Delay) TrampolineWithDelay(interceptor, interceptor) else Trampoline(interceptor)
}
@InternalCoroutinesApi
private class TrampolineWithDelay(interceptor: ContinuationInterceptor?, delay: Delay) :
    Trampoline(interceptor), Delay by delay
private open class Trampoline(val interceptor: ContinuationInterceptor?) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    private var toRun: (() -> Unit)? = null
    fun next(block: () -> Unit) {
        check(toRun == null) { "Already running a block: $toRun" }
        toRun = block
    }
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        TrampolineContinuation(continuation).let {
            interceptor?.interceptContinuation(it) ?: it
        }
    override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        interceptor?.releaseInterceptedContinuation(continuation)
    }
    private inner class TrampolineContinuation<T>(val cont: Continuation<T>) : Continuation<T> {
        override val context: CoroutineContext = cont.context
        override fun resumeWith(result: Result<T>) {
            cont.resumeWith(result)
            while (true) {
                (toRun ?: return).also { toRun = null }.invoke()
            }
        }
    }
}
private val CoroutineContext.trampoline: Trampoline
    get() =
        this[ContinuationInterceptor] as? Trampoline ?: error("No trampoline in context: $this")
Could likely be improved by defunctionalizing toRun lambdas if they're known to only need e.g. 2 fields.
David Kubecka
04/10/2025, 2:43 PM
class MyClass(val scope: CoroutineScope) {
    val p1 = lazy { scope.async { someExpensiveComputation() } }
    val p2 = lazy { scope.async { anotherExpensiveComputation() } }
    val p3 = lazy { useBothProperties(p1.await(), p2.await()) }
    val p4 = lazy { useSingleProperty(p1.await()) }
}
fun main() {
    runBlocking {
        val myClass = MyClass(this)
        myClass.p3 + myClass.p4
    }
}
How can I achieve this with coroutines?
Or is there perhaps another way to achieve my main goal (first sentence)?
Youssef Shoaib [MOD]
04/10/2025, 8:23 PM
alexhelder
04/10/2025, 10:01 PM
items to not block the thread:
• daggerLazyPagerFactory is dagger.Lazy, get() blocks the thread in my case
• i have the flow { } builder just with a single emitAll, which seems wrong, but moves the blocking get() to Dispatchers.Default
val items: Flow<PagingData<Item>> = flow {
    emitAll(daggerLazyPagerFactory.get().flow())
}.flowOn(Dispatchers.Default)
    .cachedIn(viewModelScope)
If I try this, get() blocks the thread, as flowOn{} does not impact that:
val items = daggerLazyPagerFactory.get()
    .flow()
    .flowOn(Dispatchers.Default)
    .cachedIn(...)
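The difference between the two variants can be reproduced without Dagger or Paging. In the sketch below, blockingFactory is a hypothetical stand-in for daggerLazyPagerFactory.get().flow() that records the thread it is called on; flowOn only changes where the upstream flow body is collected, so a call made while building the chain still runs on the caller's thread:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for the blocking factory call: remembers which thread
// the call itself ran on.
val factoryThreads = CopyOnWriteArrayList<String>()
fun blockingFactory(): Flow<Int> {
    factoryThreads += Thread.currentThread().name
    return flowOf(1, 2, 3)
}

// Eager: blockingFactory() runs on the caller's thread while the chain is built.
fun eagerItems(): Flow<Int> = blockingFactory().flowOn(Dispatchers.Default)

// Deferred: the call sits inside the flow body, which flowOn moves upstream.
fun deferredItems(): Flow<Int> =
    flow { emitAll(blockingFactory()) }.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    eagerItems().toList()
    deferredItems().toList()
    // First entry is the caller's thread, second a Dispatchers.Default worker.
    println(factoryThreads)
}
```

Under these assumptions the flow { emitAll(...) } wrapper is doing real work rather than being redundant: it defers the blocking call until collection.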
Alexandru Caraus
04/11/2025, 5:13 AM
zak.taccardi
04/14/2025, 4:44 PM
CoroutineScope or CoroutineContext to an ExecutorService?
context is that I'm writing a test (so the CoroutineScope comes from the runTest { } function), and I need to inject an okhttp3.Dispatcher (which requires an ExecutorService to construct, docs) into an OkHttpClient.Builder
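As far as I know, kotlinx.coroutines only offers CoroutineDispatcher.asExecutor(), which returns a plain Executor rather than an ExecutorService. One possible sketch is a thin adapter on top of it; DispatcherExecutorService and asExecutorService are hypothetical names, and the lifecycle methods are deliberately minimal:

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asExecutor
import java.util.concurrent.AbstractExecutorService
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

// Hypothetical adapter: every submitted task is dispatched on the given
// dispatcher. Shutdown only flips a flag; it does not track running tasks.
class DispatcherExecutorService(dispatcher: CoroutineDispatcher) : AbstractExecutorService() {
    private val executor = dispatcher.asExecutor()
    @Volatile private var shutdown = false

    override fun execute(command: Runnable) {
        if (shutdown) throw RejectedExecutionException("Executor is shut down")
        executor.execute(command)
    }
    override fun shutdown() { shutdown = true }
    override fun shutdownNow(): MutableList<Runnable> { shutdown = true; return mutableListOf() }
    override fun isShutdown(): Boolean = shutdown
    override fun isTerminated(): Boolean = shutdown
    override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean = shutdown
}

fun CoroutineDispatcher.asExecutorService(): ExecutorService = DispatcherExecutorService(this)

fun main() {
    val service = Dispatchers.Default.asExecutorService()
    println(service.submit(Callable { 21 * 2 }).get()) // prints 42
}
```

Inside runTest the dispatcher would presumably be taken from coroutineContext[ContinuationInterceptor]; note that a test dispatcher only runs queued tasks when its scheduler advances, so blocking on a result from the test thread may never complete.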
rakeeb
04/16/2025, 9:35 PM
class SuspendedInvocationTimer(
    val onTimerExpired: suspend () -> Unit,
) {
    suspend fun start(duration: Duration) = coroutineScope {
        delay(duration)
        if (isActive) {
            onTimerExpired()
        }
    }
}
Even when the parent coroutine scope is cancelled the onTimerExpired lambda is still invoked. I've tried a few variations like:
suspend fun start(duration: Duration) = coroutineScope {
    val job = launch { delay(duration) }
    job.join()
    val isCancelling = !job.isActive && !job.isCompleted && job.isCancelled
    val isCancelled = !job.isActive && job.isCompleted && job.isCancelled
    if (!isCancelling && !isCancelled) {
        onTimerExpired()
    }
}
or checking to see the parent coroutine scope as well, none of them seem to work. The onTimerExpired lambda always gets invoked even if the timer got cancelled.
What is the fault in my approach here?
marcinmoskala
04/17/2025, 9:49 AM
fun main() {
    runTest {
        supervisorScope {
            launch { throw Exception("Error in coroutine") }
        }
    }
    println("Done")
}
That is a result of runTest setting CoroutineExceptionHandler. I do not understand why it is doing that, I only had problems with this behavior. I made an issue:
https://youtrack.jetbrains.com/issue/KT-76854/supervisorScope-misbehaves-in-runTest
Michael
04/17/2025, 8:58 PM
limitedParallelism(1) pattern, which, according to the documentation, guarantees sequential task execution and establishes a happens-before relationship between them. So it makes shared state thread-safe even when accessed from different coroutines.
So, I was wondering:
• Is this something that the LimitedDispatcher explicitly enforces, or is it just a side effect of the lack of parallelism? In other words, if I don’t use limitedParallelism, but it just so happens that two different coroutines on the same dispatcher are executed sequentially, does that mean one coroutine is guaranteed to see modifications made by the other?
• I’ve been struggling to reproduce any memory visibility issues. It seems that once a coroutine suspends, any other coroutine can immediately observe changes made to shared state (without additional synchronization). Am I just lucky, or are there some subtleties in how this works that you could give insight into?
Thanks in advance 😊
Vampire
04/18/2025, 3:16 PM
GlobalScope.launch { /* do the background task */ } is the appropriate way.
I'm just a bit uncertain as I have to opt in to DelicateCoroutinesApi and the AI-Reviewer suggests using a CoroutineScope instead.
If more details are interesting,
when a request for X is done, X is taken from cache or calculated and put to cache.
After X was requested, it is quite likely, that Y and Z are requested next,
so I want to trigger asynchronously in the background that Y and Z are calculated and put to the cache.
But this should not delay the returning of X.
Don Mitchell
04/18/2025, 6:59 PM
Thread.sleep(999) in a suspend method and IntelliJ didn't slap my hand.
reactormonk
04/22/2025, 10:36 AM
mutableStateFlow1.value = mutableStateFlow0.value
and
mutableStateFlow1.emit(mutableStateFlow0.value)
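Assuming the two forms are being compared: as far as I know, emit on a MutableStateFlow never actually suspends and ends up assigning value, so the observable result is the same. The practical difference is that emit is a suspend function and needs a coroutine, while the property write can be done anywhere. A small sketch (the flow names and the value 7 are illustrative):

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.runBlocking

// Applies both forms and returns the resulting values of the two targets.
fun assignBothWays(): Pair<Int, Int> = runBlocking {
    val source = MutableStateFlow(7)
    val viaValue = MutableStateFlow(0)
    val viaEmit = MutableStateFlow(0)

    viaValue.value = source.value // plain property write, callable anywhere
    viaEmit.emit(source.value)    // suspend function, needs a coroutine

    viaValue.value to viaEmit.value
}

fun main() {
    println(assignBothWays()) // prints (7, 7)
}
```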
Don Mitchell
04/23/2025, 12:47 PM
main or in koin module definitions or class initializers/constructors? Example from koin init (as you'll see I'm questioning the original author's overuse and non-trivial computations)
single {
    val redis: RedisConnection = get(CORE_REDIS_CONNECTION)
    val redisHealth = HealthDetail(
        url = redis.uris.first().toString(),
        healthy = runCatching {
            // ugh, this is ugly. too much work in koin and unclear what the coroutine context is.
            // this may deadlock coroutines as this won't release the context until it completes.
            runBlocking(Dispatchers.IO) {
                redis.withSuspendConnection { connection ->
                    connection.ping()
                }
            }
            true
        }.getOrDefault(false),
        responseTimeMs = 0
    )
    Health(database = get(MYSQL), stage = Stage.fromEnvironment(), redisHealth = redisHealth)
}
Don Mitchell
04/23/2025, 9:33 PM
main? e.g., when implementing com.github.ajalt.clikt.core.CoreCliktCommand#run?
You can't
CoroutineScope().launch {
    ...
}
as there's nothing to stop the process from exiting and the container from shutting down before it does anything (there's no containing scope). You can't call join on that bc there's no suspend context.
I've been wantonly using runBlocking but I know that's not the preferred pattern. What I don't grok is what is the preferred pattern? (NOTE, this is a long-running headless batch service; so, there's no UI so MainScope makes no sense, coroutineScope is not allowed bc it's not a suspend context, GlobalScope has the same problem as CoroutineScope(), neither of these exist: lifecycleScope, viewModelScope)
ursus
04/24/2025, 9:33 AM
coroutineScope {
    launch {
        outgoingMessages
            .collect {
                webSocket.send(...)
            }
    }
    for (frame in webSocket.incoming) {
        ...
    }
}
I have a websocket where there is an incoming messages loop and I need to send messages as well, outgoingMessages is currently a MutableSharedFlow<OutgoingMessage> to which I pipe in messages and then they get collected here and sent
trouble is, that MutableSharedFlow is never completing, so this coroutineScope will never return, which is a problem when the server closes the connection
what would be the most clean and idiomatic way of solving this?
or more generally, in a coroutineScope how can one child cancel the other?
Stephen Edwards
04/24/2025, 8:56 PM
yield() on Main.immediate will behave like Unconfined and empty the current event loop or continue undispatched if it's empty.
Discovered result: yield() for Main.immediate on Android always ends up getting a main thread message posted.
Brief investigation: HandlerContext has no dispatchYield() implementation, which means that it will default to just dispatch (here). Which means that dispatcherWasUnconfined will not get set and we will never yieldUndispatched() for yield() (here).
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    context.ensureActive()
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (cont.dispatcher.safeIsDispatchNeeded(context)) {
        // this is a regular dispatcher -- do simple dispatchYield
        cont.dispatchYield(context, Unit)
    } else {
        // This is either an "immediate" dispatcher or the Unconfined dispatcher
        // This code detects the Unconfined dispatcher even if it was wrapped into another dispatcher
        val yieldContext = YieldContext()
        cont.dispatchYield(context + yieldContext, Unit)
        // Special case for the unconfined dispatcher that can yield only in existing unconfined loop
        if (yieldContext.dispatcherWasUnconfined) {
            // Means that the Unconfined dispatcher got the call, but did not do anything.
            // See also code of "Unconfined.dispatch" function.
            return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
        }
        // Otherwise, it was some other dispatcher that successfully dispatched the coroutine
    }
    COROUTINE_SUSPENDED
}
Initial question: Why is that? Is it a bug? Should not yield() have the same behavior for Main.immediate and any other dispatcher using an event-loop?
Don Mitchell
04/25/2025, 2:27 PM
CompletableFuture. I was shocked that the compiler didn't flag illegal casts and that at runtime, the caller logged but swallowed the cast exception. IDK if it was bc it was on a future or something up the stack from the caller that swallowed it. Here's the erroneous code (pool is suspend). The fix is to use RedisClusterCommands not its subclass RedisCommands
fun <T> withBlockingConnection(body: (RedisCommands<String, String>) -> T): CompletableFuture<T> =
    CoroutineScope(Dispatchers.IO).future {
        val connection = pool.borrowObject()
        try {
            val commands = when (connection) {
                is StatefulRedisConnection -> connection.sync()
                is StatefulRedisClusterConnection -> connection.sync()
                else -> throw IllegalStateException("Redis connection is not a known type $connection")
            } as RedisCommands<String, String>
            body(commands)
        } finally {
            pool.returnObject(connection)
        }
    }
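Both observations can be reproduced in isolation. A downcast from a supertype compiles because it is only checked at runtime, and an exception thrown inside future { } completes the CompletableFuture exceptionally instead of propagating, so it only surfaces when someone inspects the future. A minimal sketch, where castInsideFuture is a hypothetical stand-in for the Redis code and kotlinx.coroutines.future is assumed to be on the classpath:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException

// `input` is statically Any, so the compiler accepts the downcast;
// it is verified only when the coroutine runs.
fun castInsideFuture(input: Any): CompletableFuture<String> =
    CoroutineScope(Dispatchers.IO).future {
        input as String
    }

fun main() {
    val future = castInsideFuture(42)
    // Nothing is thrown until the result is requested.
    val failure = runCatching { future.get() }.exceptionOrNull()
    println(failure is ExecutionException)        // prints true
    println(failure?.cause is ClassCastException) // prints true
}
```

If the caller only logs the future's failure and never rethrows it, the cast error would look swallowed, which may be what happened here.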
João Cana Verde
04/25/2025, 4:56 PM
kotlinx.coroutines.debug to be enabled both in production and in tests, so I've added a @Configuration and I've confirmed that this runs:
DebugProbes.sanitizeStackTraces = false
DebugProbes.enableCreationStackTraces = enableCreationStackTraces // disabled for prod, enabled for tests
System.setProperty(
    kotlinx.coroutines.DEBUG_PROPERTY_NAME,
    kotlinx.coroutines.DEBUG_PROPERTY_VALUE_ON
)
DebugProbes.install()
But I still get a super limited stack trace when I run my test. Is this the right way to set it up? I've also added a @CoroutinesTimeout to my test class but that didn't help either.
What am I missing?
ursus
04/29/2025, 2:51 PM
// ktor client
webSocketSession.send(frame)
this suspends until it's actually sent? or only until enqueued (and will be sent later)?
if it's the latter - is there a way to tell it's actually been delivered?
--
I need to update my entity's state from SENDING to SENT
Don Mitchell
04/30/2025, 8:22 PM
>> designates where I expect it to void the supervisorScope. (Are there paradigmatic examples of how to void the supervisorScope?)
supervisorScope {
    accounts.forEach { account ->
        this.ensureActive()
        launch(errorHandler("..")) {
            doit(account)
        }
    }
}
protected fun errorHandler(msg: String): CoroutineExceptionHandler =
    CoroutineExceptionHandler { context, exception ->
        logger.error(exception) { msg }
        if (exception.fatal() || exception.outOfDiskSpace()) {
>>          context.cancel(CancellationException(null, cause = exception))
        }
    }
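One possible explanation, as far as I understand it: the context passed to a CoroutineExceptionHandler belongs to the child that already failed, so cancelling it does not reach the supervisorScope or the sibling coroutines. A sketch that captures the scope itself and cancels that instead; FatalError, processAll and the account names are hypothetical stand-ins for exception.fatal(), the original loop and its data:

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.cancel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical marker for failures that should stop all remaining work.
class FatalError(message: String) : RuntimeException(message)

// Processes accounts concurrently and returns those that finished before a
// fatal failure cancelled the whole supervisorScope.
suspend fun processAll(accounts: List<String>): List<String> {
    val finished = CopyOnWriteArrayList<String>()
    try {
        supervisorScope {
            val scope = this // capture the scope, not the failed child's context
            val handler = CoroutineExceptionHandler { _, exception ->
                if (exception is FatalError) {
                    scope.cancel(CancellationException("Fatal failure", exception))
                }
            }
            accounts.forEach { account ->
                launch(handler) {
                    if (account == "bad") throw FatalError("out of disk space")
                    delay(1_000) // simulated work, cancelled together with the scope
                    finished += account
                }
            }
        }
    } catch (e: CancellationException) {
        // Rethrow if the caller itself was cancelled; otherwise the scope
        // was cancelled by the handler above.
        currentCoroutineContext().ensureActive()
        println("scope cancelled: ${e.message}")
    }
    return finished
}

fun main() = runBlocking {
    println(processAll(listOf("a", "bad", "b"))) // prints []
}
```

Non-fatal exceptions still reach the handler without cancelling anything, which keeps the supervisor behaviour for ordinary failures.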
Anurag Soni
05/01/2025, 4:31 PM
suspend fun main() over fun main() { runBlocking ... as the entry point of an application that uses coroutines? I've always defaulted to runBlocking as that was what I remember initially from the coroutine docs. I believe one difference might be that runBlocking picks a dispatcher from kotlinx-coroutines and suspend main starts off on the Main thread?
Youssef Shoaib [MOD]
05/01/2025, 9:32 PM
CoroutineImpl doesn't use Result internally? Is Result slower on JS or something? Same thing with wasmJs
Don Mitchell
05/02/2025, 5:25 PM
CoroutineScope.cancel doesn't seem to work at least in the kotlin playground or I'm missing something. I expected the following to cancel on i==6 but it doesn't. It throws the error after the loop finishes.
import kotlinx.coroutines.*
@OptIn(DelicateCoroutinesApi::class)
suspend fun main() = coroutineScope {
    repeat(10) { i ->
        keepRunningUnlessFatal {
            if (i == 3) {
                println("throwing ISE")
                throw IllegalStateException("Illegal 3")
            }
            if (i == 6) {
                println("throwing OOM")
                throw OutOfMemoryError("OOM")
            }
            println("i ${i}")
        }
    }
}
suspend fun <T> CoroutineScope.keepRunningUnlessFatal(
    message: String = "Continuing execution after exception",
    block: suspend () -> T
): T? = try {
    block()
} catch (e: VirtualMachineError) {
    this.cancel("FATAL_ERROR_MSG", e)
    null
    // HERE - uncomment to see difference
    //throw e
} catch (e: InterruptedException) {
    this.cancel("FATAL_ERROR_MSG", e)
    null
} catch (e: LinkageError) {
    this.cancel("FATAL_ERROR_MSG", e)
    null
} catch (e: CancellationException) {
    this.cancel(e)
    null
} catch (e: Exception) {
    println(e)
    println(message)
    null
}
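The observed behaviour is consistent with cancellation being cooperative: cancel() only marks the scope as cancelled, and code that never reaches a cancellable suspension point or an explicit check keeps running. A reduced sketch of the same loop with an ensureActive() check added (stepsBeforeCancel is a hypothetical name, and the exceptions are replaced by a direct cancel call):

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.runBlocking

// Runs ten steps, cancels the scope at step 6, and returns the steps that
// actually executed.
suspend fun stepsBeforeCancel(): List<Int> {
    val executed = mutableListOf<Int>()
    try {
        coroutineScope {
            repeat(10) { i ->
                if (i == 6) cancel("fatal at $i") // only marks the scope as cancelled
                ensureActive()                    // cooperative check: throws once cancelled
                executed += i
            }
        }
    } catch (e: CancellationException) {
        println("stopped: ${e.message}")
    }
    return executed
}

fun main() = runBlocking {
    println(stepsBeforeCancel()) // prints [0, 1, 2, 3, 4, 5]
}
```

Without the ensureActive() line the loop would run all ten steps and the CancellationException would only appear when coroutineScope returns, which matches the description above.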
ursus
05/02/2025, 11:42 PM
var webSocketSession: DefaultClientWebSocketSession? = null
try {
    webSocketSession = ktorClient.webSocketSession(url)
    ....
} finally {
    webSocketSession?.close() <----
}
Coroutines general question:
ktor calls close in finally block as to do graceful exit.
However close is a suspend function.
How can it work reliably? Aren't all suspend functions no-op after cancellation unless wrapped with NonCancellable?
Am I missing something?
Jan
05/08/2025, 4:58 PM
Yassine Abou
05/12/2025, 6:20 PM
NotImplementedError: Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic
kotlinx.coroutines.error_$external_fun