Hong Phuc
09/22/2025, 2:15 AMRob Elliot
09/23/2025, 11:17 AMsealed interface FooResult
data object FooCancelled : FooResult
data class FooSuccess : FooResult
suspend fun foo(): FooResult = TODO()
val deferred = async { foo() }
deferred.cancel()
val result = deferred.await()
so that the last call will never throw CancellationException, instead returning FooCancelled? Without a try / catch (or equivalent) around deferred.await()?Tian Tian098
09/24/2025, 3:52 AMval foo = MutableStateFlow(0)
val bar = flow { ... }
foo.zip(bar) { ... } // I don't want the resulting flow to be limited by how often foo is updated.Jonas
09/24/2025, 12:48 PMFlow using callbackFlow , do I have to call close() after trySend(...) when the call back only emits once?Julius Babies
09/27/2025, 7:20 PMArtyom Gornostayev
09/29/2025, 10:03 AMkotlin-coroutines-debug library, which solves my requirements. However, I'm not sure about performance impact + if it is suitable to native builds (according to the docs - not really)...
I would be very grateful for advice. :)CLOVIS
10/01/2025, 12:51 PMreactormonk
10/09/2025, 12:36 PMreactormonk
10/15/2025, 4:16 PMfirst() doesn't get cancelled by the timeoutMario Andhika
10/17/2025, 7:04 AMobject MyObject {
var x = 1
init {
GlobalScope.launch {
x++
}
}
}
suspend fun main() = coroutineScope {
println(MyClass.x.toString()) // output: 1
delay(100L)
println(MyClass.x.toString()) // output: 2
}
How to make MyClass.x.toString() print 2 without needing to have a delay statement? How should I change the init block?Peter
10/20/2025, 1:48 AMCLOVIS
10/20/2025, 6:13 PMDebugProbes.install() is very slow! Here, it takes between ~1.5s and ~3s depending on the run, and it's called once for each Gradle moduleursus
10/22/2025, 12:21 AMMutableSharedFlow (that acts as a trigger to refresh stuff), that has a default value (so syncs get triggered right away on start)?
I find manually throwing in `trigger.emit(Unit)`after everyone is subscriber a bit awkwards
Unless there is an API I'm not aware of, probably the best would be to have own subclass right? Like this maybe?
class TriggerFlow : Flow<Unit> {
private val _trigger = MutableSharedFlow<Unit>(
replay = 1
)
init {
_trigger.tryEmit(Unit)
}
suspend fun trigger() {
_trigger.emit(Unit)
}
override suspend fun collect(collector: FlowCollector<Unit>) {
_trigger.collect(collector)
}
}
How does this look? Is the tryEmit in init cool?Lukas K-G
10/22/2025, 1:35 PMinternal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
trySendBlocking(adItemStatus)
val statusListener = AdItemStatusListener { _, status -> trySendBlocking(status) }
addStatusListener(statusListener)
awaitClose { removeStatusListener(statusListener) }
}
We figured that this is likely due to the trySendBlocking which does a runBlocking.
We then pivoted to the following:
internal fun ScheduledAdItem.adItemStatusAsFlow() = callbackFlow {
send(adItemStatus)
val statusListener = AdItemStatusListener { _, status -> launch { send(status) }}
addStatusListener(statusListener)
awaitClose { removeStatusListener(statusListener) }
}
Which we then figured that it might not guarantee the order of events if run on a coroutine dispatcher with multiple threads.
What would be the recommended way to implement a cancellation cooperative callback flow with guaranteed event order?
Thanks in advance!Alex Styl
10/24/2025, 3:17 AMdelay() for some days and then continue the coroutine)Winson Chiu
10/27/2025, 8:12 PMrunTest workaround for withTimeout using withContext(Dispatchers.Default.limitedParallelism(1)), but is there a workaround for Flow.timeout? I can flowOn, but I'd rather not force my entire Flow to a specific dispatcher if possible.Bernhard
10/29/2025, 1:57 PMrocketraman
10/31/2025, 1:54 PMMDCContext (https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-slf4j/src/MDCContext.kt) implemented correctly? It wraps MDC which is a mutable thread local. The docs for ThreadContextElement seem to indicate that CopyableThreadContextElement should be used for this case.Rob Elliot
11/04/2025, 8:33 PMDavid Bieregger
11/08/2025, 5:47 PMcapacity and onBufferOverflow is not exposed in channelFlow ? Following usecase:
suspend fun receiveOutgoingTransactionsAsFlow(capacity: Int): Flow<EventTransaction> = channelFlow(capacity = capacity /* this parameter doesn't exist */) {
launch {
sharedFlow.collect { item ->
trySend(item).getOrElse {
close(Error("Private buffer of size $capacity overflowed."))
}
}
}
}
This logic ensures that subscribers of a SharedFlow can process independently, but also accounts for the fact that the collector might be too slow. The capacity parameter would be really useful in this case or am I overcomplicating things?marcinmoskala
11/11/2025, 10:49 AMgroostav
11/13/2025, 6:27 AMDispatchers.Main but dont want to put javafx on my classloader, what are my options? Do I have any, or do i need to roll my own MainCoroutineDispatcher implementation and insert it in the SPI/service-loader?S.
11/22/2025, 10:21 PMYoussef Shoaib [MOD]
11/27/2025, 10:38 PMlaunch {
Foo().use {
cancel()
ensureActive()
}
}.join()
class Foo : AutoCloseable {
override fun close() {
error("Foo")
}
}
IMO, use should prioritize the close exception over a CancellationException. This is because cancellation is considered "normal", while an error is truly exceptional, and thus should be reported.
The docs say:
> Thrown by cancellable suspending functions if the coroutine is cancelled while it is suspended. It indicates normal cancellation of a coroutine.
"_normal_" is the keyword there. Normal cancellation shouldn't mask a real exception.
To be clear, this is all from the suppression behaviour of use. My proposal is that use should never addSuppressed to a CancellationException. If you use try-finally instead of use, the code reports an error as you'd expect.
Error suppression is great, and it prioritising the body of use is great too, but it shouldn't prioritise CancellationException since that's an expected exception (dare I say, it's a control flow exception, which Scala also deprioritises similarly)Oğuzhan Soykan
11/28/2025, 10:41 AMmono{something().await()} from coroutine and reactive perspective? Underlying logic of mono uses GlobalScope and it worried me a little bit.
class ComponentHealthIndicator(
private val conn: Component
) : AbstractReactiveHealthIndicator("Component health check failed") {
override fun doHealthCheck(builder: Health.Builder) : Mono<Health> = mono {
val ping = conn.ping().await()
if (isHealthy(ping)) {
builder.up().build()
} else {
builder.down().build()
}
}
}jean
12/02/2025, 12:41 PMlaunchIn ? It can be set with scope.launch(dispatcher) { ... } but not with
someFlow
.onEach { ... }
.launchIn(scope) // <- I can't set a dispatcher here
I created this extension but I wonder if there’s a reason for not using it
fun <T> Flow<T>.launchIn(scope: CoroutineScope, coroutineDispatcher: CoroutineDispatcher): Job = scope.launch(coroutineDispatcher) {
collect()
}
// Usage
someFlow
.onEach { ... }
.launchIn(scope, dispatcher)Alexander Schmidt
12/02/2025, 3:03 PM@Configuration
class ConcurrencyConfig {
@Bean
fun executors(properties: ConcurrencyProperties): CustomExecutors {
val taskDecorator = MDCTaskDecorator()
return CustomExecutors(
Default = createExecutor(properties.default, taskDecorator),
Io = createExecutor(properties.io, taskDecorator, useVirtual = true)
)
}
@Bean
fun dispatchers(executors: CustomExecutors): CustomDispatchers {
return CustomDispatchers(
Default = executors.Default.asCoroutineDispatcher(),
Io = executors.Io.asCoroutineDispatcher()
)
}
}
Sharing pools between Java and Kotlin makes configuring max-threads and so on much simpler, although Java-Threads might block coroutines. That is the reason why the Io-Executor uses virtual threads. This works quite well - so far, but now I face problems when adding some required features our applications need.
1. All threads and coroutines must have an MDC correlationId
2. Executors and dispatchers must shutdown gracefully within the spring context, allowing running tasks to continue (until a timeout is reached) and also create sub-tasks (which the task needs to successfully complete, e.g. the last step of a task is to create 3 subtasks (io) and then aggregate results (default)). New tasks should be rejected while shutdown is in progress.reactormonk
12/03/2025, 6:31 PMAnton Krutko
12/04/2025, 6:22 AMcounter values themselves is not important in that example. Coroutines version 1.10.2 and 1.8.0.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
var counter = 0L
val mutableSharedFlow = MutableSharedFlow<Long>()
var cs: CoroutineScope? = null
fun start() {
cs?.cancel()
cs = CoroutineScope(Dispatchers.IO + SupervisorJob())
mutableSharedFlow.onEach {
delay(50)
println(it)
}.launchIn(cs!!)
cs!!.launch {
while (currentCoroutineContext().isActive) {
mutableSharedFlow.emit(counter)
counter++
}
}
}
suspend fun main() {
while (true) {
start()
println("restart, counter = $counter")
delay(3000)
}
}
The output we see:
....
47
48
49
50
51
52
53
54
restart, counter = 56
restart, counter = 56
restart, counter = 56
restart, counter = 56
restart, counter = 56
... // proper behavior (emissions happening) is never restored after this point
Any help will be greatly appreciated.Rey (Kingg22)
12/10/2025, 4:13 PMCompletionStage)* to Kotlin coroutines. Hibernate Reactive requires that all operations run on the same thread where the session and the database connection (Vert.x SQL event loop) were created.
My idea was to create some sort of internal thread/dispatcher pool to store the dispatcher used to open the connection and session and then use the same dispatcher when awaiting (using extension functions that convert CompletableFuture to suspending operations). However, it didn’t work, coroutines kept jumping between threads.
I also tried using the Vert.x coroutine dispatcher, which seems to propagate the event-loop context correctly, but that still doesn’t guarantee staying on the same physical thread, which HR seems to require.
Additionally, HR needs that sessions (and nested transactions) are shared across coroutine calls, so I would need to reuse the dispatcher of an already opened session. I’m thinking this might require a custom CoroutineContext element for transaction/session propagation, similar to how Exposed handles it. Am I on the right track?
So is there any reliable way to enforce strict single-thread execution for coroutines, similar to Vert.x event loop confinement, but without thread hopping? Or is this simply not how coroutines are intended to operate?
Thanks!