Don Mitchell
05/02/2025, 5:25 PM
`CoroutineScope.cancel` doesn't seem to work, at least in the Kotlin playground, or I'm missing something. I expected the following to cancel on i==6, but it doesn't. It throws the error after the loop finishes.
import kotlinx.coroutines.*
@OptIn(DelicateCoroutinesApi::class)
suspend fun main() = coroutineScope {
    repeat(10) { i ->
        keepRunningUnlessFatal {
            if (i == 3) {
                println("throwing ISE")
                throw IllegalStateException("Illegal 3")
            }
            if (i == 6) {
                println("throwing OOM")
                throw OutOfMemoryError("OOM")
            }
            println("i ${i}")
        }
    }
}
suspend fun <T> CoroutineScope.keepRunningUnlessFatal(
    message: String = "Continuing execution after exception",
    block: suspend () -> T
): T? = try {
    block()
} catch (e: VirtualMachineError) {
    this.cancel("FATAL_ERROR_MSG", e)
    null
    // HERE - uncomment to see difference
    //throw e
} catch (e: InterruptedException) {
    this.cancel("FATAL_ERROR_MSG", e)
    null
} catch (e: LinkageError) {
    this.cancel("FATAL_ERROR_MSG", e)
    null
} catch (e: CancellationException) {
    this.cancel(e)
    null
} catch (e: Exception) {
    println(e)
    println(message)
    null
}
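A possible reading of the behaviour above: `cancel()` only marks the scope as cancelled, and cancellation is cooperative, so a loop with no suspension point or explicit check keeps running until it ends. The sketch below is a simplified stand-in for the snippet, not the original code; `processUntilFatal` is a hypothetical name, and `runBlocking` replaces `suspend fun main` so that the function can return a value. It adds an `ensureActive()` check at the top of each iteration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: processes indices until a simulated fatal error at i == 6.
fun processUntilFatal(): List<Int> = runBlocking {
    val processed = mutableListOf<Int>()
    try {
        coroutineScope {
            repeat(10) { i ->
                // Cooperative check: throws CancellationException once the scope is cancelled.
                ensureActive()
                if (i == 6) {
                    // cancel() only marks the scope as cancelled; it does not interrupt this loop.
                    cancel("simulated fatal error at $i")
                    return@repeat
                }
                processed += i
            }
        }
    } catch (e: CancellationException) {
        println("scope cancelled: ${e.message}")
    }
    processed
}

fun main() {
    println(processUntilFatal()) // indices 0 through 5 only
}
```

Without the `ensureActive()` line, iterations 7 to 9 would still run, and the cancellation would only surface when `coroutineScope` returns.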
ursus
05/02/2025, 11:42 PM
var webSocketSession: DefaultClientWebSocketSession? = null
try {
    webSocketSession = ktorClient.webSocketSession(url)
    ....
} finally {
    webSocketSession?.close() <----
}
Coroutines general question:
ktor calls `close` in the finally block so as to do a graceful exit. However, `close` is a suspend function.
How can it work reliably? Aren't all suspend functions no-ops after cancellation unless wrapped with `NonCancellable`?
Am I missing something?
Jan
05/08/2025, 4:58 PM
Yassine Abou
05/12/2025, 6:20 PM
NotImplementedError: Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic
kotlinx.coroutines.error_$external_fun
Colton Idle
05/13/2025, 1:13 PM
谢朋刚
05/15/2025, 6:12 AM
adjorno
05/15/2025, 8:19 AM
internal class AnalyticsQueueWithFileCache(
    private val delegate: AnalyticsQueue,
) : AnalyticsQueue {
    override val events: List<MockEvent> = delegate.events
    private val scope = CoroutineScope(SupervisorJob())
    private var fileDumpJob: Job? = null
    override fun emitEvent(event: MockEvent) {
        println("New event - ${event.event}")
        delegate.emitEvent(event)
        val newAnalytics = Analytics(events.subList(0, events.size).sortedBy { it.timestamp })
        val prevJob = fileDumpJob
        prevJob?.cancel()
        println("cancel - ${event.event}")
        fileDumpJob = null
        fileDumpJob = scope.launch {
            println("before cancelAndJoin - ${event.event}")
            prevJob?.cancelAndJoin()
            println("after cancelAndJoin - ${event.event}")
            dumpAnalyticsToFile(event, newAnalytics)
            fileDumpJob = null
        }
    }
    override fun clear() {
        delegate.clear()
    }
    private suspend fun dumpAnalyticsToFile(event: MockEvent, analytics: Analytics) = withContext(Dispatchers.IO) {
        ensureActive()
        val json = json.encodeToString<Analytics>(analytics)
        ensureActive()
        saveDataToPublicFile(json, "analytics.json")
        println("File dumped - ${event.event}")
    }
}
Unfortunately, I am randomly missing 1 or 2 of the last emitted events in the file compared to the in-memory implementation.
It feels like I am messing something up with the coroutines. Could anyone help spot the issue?
Stanislav Kral
05/15/2025, 9:04 AM
`viewModelScope`) so that I can wipe all in-flight work whenever my filter changes, rather than cancelling each `Job?` by hand.
Here’s what I’ve got today:
class MyViewModel : ViewModel() {
    private var loadNextPageJob: Job? = null
    private var refreshJob: Job? = null
    fun onLoadNextPage() {
        loadNextPageJob = viewModelScope.launch {
            // …fetch next page, show retry SnackBar, recurse on retry…
        }
    }
    fun onRefresh() {
        refreshJob = viewModelScope.launch {
            // …fetch fresh data, toggle spinner, show error SnackBar…
        }
    }
    fun setFilter(filter: Filter) {
        // I want to cancel *all* running work here…
        loadNextPageJob?.cancel()
        refreshJob?.cancel()
        // …then apply the new filter
    }
}
I thought of creating a custom scope, but I am not sure how to properly "inherit" from viewModelScope
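One way to "inherit" from a parent scope, sketched under assumptions: create a `SupervisorJob` whose parent is the parent scope's `Job`, build a scope from the parent's context plus that job, and call `cancelChildren()` when the filter changes. `FilterScopedWork`, `launchWork`, and `cancelAll` are hypothetical names, and a plain `CoroutineScope` stands in for `viewModelScope` so the example runs outside Android.

```kotlin
import kotlinx.coroutines.*

// Hypothetical holder for work that should be wiped whenever the filter changes.
class FilterScopedWork(parentScope: CoroutineScope) {
    // SupervisorJob(parent) makes this job a child of the parent's job,
    // so cancelling the parent scope also cancels everything launched here.
    private val filterJob = SupervisorJob(parentScope.coroutineContext[Job])
    private val filterScope = CoroutineScope(parentScope.coroutineContext + filterJob)

    fun launchWork(block: suspend CoroutineScope.() -> Unit): Job =
        filterScope.launch(block = block)

    // Cancels every in-flight child but keeps the scope usable for new work.
    fun cancelAll() = filterJob.cancelChildren()
}

fun demo(): List<Boolean> = runBlocking {
    val parent = CoroutineScope(Dispatchers.Default + Job()) // stands in for viewModelScope
    val work = FilterScopedWork(parent)
    val first = work.launchWork { delay(60_000) }
    work.cancelAll()          // e.g. called from setFilter()
    first.join()
    val second = work.launchWork { } // new work is still accepted after cancelAll()
    second.join()
    parent.cancel()           // e.g. the ViewModel being cleared
    listOf(first.isCancelled, second.isCancelled, parent.isActive)
}

fun main() {
    println(demo()) // [true, false, false]
}
```

Calling `cancel()` on the child job instead of `cancelChildren()` would make the scope reject all later launches, which is why the sketch cancels only the children.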
Vivek Modi
05/15/2025, 8:36 PM
StragaSevera
05/19/2025, 6:54 AM
+------------+-----------+-------------+--------------------------------------------------------------------+
| Total size | Instances | Largest | Class name |
+------------+-----------+-------------+--------------------------------------------------------------------+
| 2.02GiB | 19346752 | 112.00bytes | kotlinx.coroutines.ChildHandleNode |
| 1.54GiB | 18829802 | 88.00bytes | kotlinx.coroutines.NodeList |
| 1.19GiB | 9414895 | 136.00bytes | kotlinx.coroutines.JobSupport$ChildCompletion |
| 1.18GiB | 39507640 | 32.00bytes | kotlin.coroutines.CombinedContext |
| 1.09GiB | 14593270 | 80.00bytes | kotlinx.coroutines.StandaloneCoroutine |
| 938.18MiB | 10465478 | 94.00bytes | kotlinx.coroutines.internal.DispatchedContinuation |
| 742.92MiB | 5125080 | 152.00bytes | ___my_suspend_function__$append$2$1 |
| 529.53MiB | 4957649 | 112.00bytes | kotlinx.coroutines.ChildContinuation |
| 528.99MiB | 4333524 | 128.00bytes | ___my_suspend_function_2__$runLoop$2$3$1$1 |
| 514.25MiB | 4786174 | 267.04KiB | java.lang.Object[] |
| 460.03MiB | 5131677 | 94.00bytes | kotlinx.coroutines.CancellableContinuationImpl |
| 432.55MiB | 9449215 | 48.00bytes | kotlinx.coroutines.JobSupport$Finishing |
| 427.99MiB | 5099730 | 88.00bytes | kotlinx.coroutines.internal.ScopeCoroutine |
| 321.36MiB | 4680202 | 72.00bytes | kotlinx.coroutines.internal.ListClosed |
| 194.54MiB | 5099727 | 40.00bytes | kotlinx.coroutines.sync.MutexImpl$CancellableContinuationWithOwner |
The code is structured so that a Java library (Pcap4j) writes a lot of data into a normal Java queue, and then coroutine-aware code reads these pieces of data, processes them, and passes them along via channels (with both CPU-bound and IO-bound steps; each step has its own dispatcher with an independent executor).
Just from a bird's-eye view, are there any obviously better approaches?
viluon
05/19/2025, 8:07 AM
`flowOn` obviously doesn't help: not only is it happening outside of a collection (so it doesn't see flow items), it affects the upstream, not downstream.
I'm aware that one can use `channelFlow`s to switch context arbitrarily. However, this (to the best of my knowledge) doesn't affect downstream flows: there's a channel in-between that separates them.
TL;DR: how do I make the flow element part of the downstream coroutine context (to propagate it through the chain of emissions) while preserving flow requirements? Is it even possible?
efemoney
05/19/2025, 1:39 PM
`send` does not suspend on all subscribers 🤔
kite
05/20/2025, 1:06 AM
1. Why do we still need to explicitly call `complete()` on a child `Job`, even when it's structurally connected to a parent via `Job(parent)`?
2. How do the internal implementations differ between using `launch` (which creates an `AbstractCoroutine`) and creating a `Job()` directly, and how do those differences affect structured concurrency and job completion behavior?
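A small sketch of the observable difference behind the first question, not a description of the library internals: a job created by `launch` completes when its body returns, while a job created with `Job(parent)` has no body and stays active until `complete()` or `cancel()` is called, which also keeps the parent from finishing. `compareJobs` is a hypothetical name.

```kotlin
import kotlinx.coroutines.*

// Returns the isCompleted flags observed at four points in time.
fun compareJobs(): List<Boolean> = runBlocking {
    val parent = Job()
    val manual = Job(parent) // CompletableJob with no body: active until completed by hand
    val launched = CoroutineScope(parent).launch { /* body returns immediately */ }

    launched.join()
    val launchedDone = launched.isCompleted        // true: the body finished
    val manualDone = manual.isCompleted            // false: nothing ever completes it

    parent.complete()                              // parent now waits for its children
    val parentDoneWhileChildActive = parent.isCompleted // false: `manual` is still active

    manual.complete()
    parent.join()
    val parentDoneAfterChild = parent.isCompleted  // true

    listOf(launchedDone, manualDone, parentDoneWhileChildActive, parentDoneAfterChild)
}

fun main() {
    println(compareJobs()) // [true, false, false, true]
}
```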
martmists
05/22/2025, 3:15 PM
May 22 17:13:36 preon bash[49023]: Exception in thread "DefaultDispatcher-worker-25" java.lang.NoClassDefFoundError: kotlinx/coroutines/CoroutineExceptionHandlerKt
May 22 17:13:36 preon bash[49023]: at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:126)
May 22 17:13:36 preon bash[49023]: at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
May 22 17:13:36 preon bash[49023]: at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
May 22 17:13:36 preon bash[49023]: at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
May 22 17:13:36 preon bash[49023]: at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
May 22 17:13:36 preon bash[49023]: at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
May 22 17:13:36 preon bash[49023]: Caused by: java.lang.ClassNotFoundException: kotlinx.coroutines.CoroutineExceptionHandlerKt
May 22 17:13:36 preon bash[49023]: ... 6 more
How do I fix this? I have kotlinx-coroutines-core in my shadowJar.
Hong Phuc
05/25/2025, 5:23 AM
Bhaskar Singh
05/25/2025, 11:20 AM
`Dispatchers.IO`, I understand that it's optimized for I/O operations. Internally, does this utilize mechanisms like Java NIO or even OS-level calls such as `epoll_create1()`? Is it accurate to think of it as posting a task to the OS and letting the OS notify the application when the I/O completes?
3. CPU-bound tasks and `Dispatchers.Default`:
For CPU-bound tasks, we typically use `Dispatchers.Default`, which is backed by a shared pool of threads equal to the number of CPU cores (e.g. 8 threads for 8 cores).
◦ Does this mean that only up to 8 CPU-bound coroutine tasks can be executed concurrently? I know more can be submitted, but they would be scheduled based on thread availability.
◦ Are these tasks tied to specific CPU cores? For instance, if Task A starts on Core 1, will it continue on Core 1 for the sake of cache locality (i.e. core affinity), or is that not guaranteed?
◦ Is time-slicing used when there are more tasks than available cores?
Would really appreciate any clarification or pointers. Thanks in advance!
Irsath Kareem
05/27/2025, 4:07 PM
someGameRelatedFlow
    .onCompletion { cause ->
        println("onCompletion Called")
        if (cause is CancellationException) {
            try {
                gameDao.invalidateJoinedRooms() // Suspending Call
            } catch (e: CancellationException) {
                CoroutineScope(Dispatchers.IO).async { // New CoroutineScope, because currentContext is already cancelled
                    launch {
                        delay(5000)
                        this@async.cancel()
                    }
                    gameDao.invalidateJoinedRooms() // Suspending Call
                    cancel()
                }.await()
                coroutineContext.ensureActive()
            }
        }
    }
    .stateIn(
        scope, SharingStarted.WhileSubscribed(5000), null
    )
My assumption is as follows:
When scope is cancelled, the new CoroutineScope(IO) in onCompletion does one of the following:
1. It runs for at most 5000 ms, or
2. If gameDao.invalidateJoinedRooms() finishes, onCompletion also finishes.
Is this pattern recommended?
Tian Tian098
05/28/2025, 5:59 PM
`Flow.buffer` function, which got a decent result
files.asFlow()
    .map { async { download(it) } }
    .buffer(CONCURRENCY)
    .collect { it.await() }
But this runs all the operations in order. In particular, if operation 1 takes really long, and the rest of the operations in the buffer are done, then I won't start any new operations until operation 1 is done.
Is there a better way?
dany giguere
06/01/2025, 4:07 PM
@get:UniqueEmail
val email: String,
but ConstraintValidator is not suspendable, so I have to run it blocking, like:
@Component
class UniqueEmailValidator @Autowired constructor(
    private val userService: UserService
) : ConstraintValidator<UniqueEmail, String> {
    override fun isValid(value: String?, context: ConstraintValidatorContext): Boolean {
        if (value == null) return true
        // Run suspend function in a blocking way for validation
        return runBlocking { userService.findByEmail(value) == null }
    }
}
Will that runBlocking somehow break (slow down) all the coroutines afterward? I mean all the coroutine code that comes after that. The repo is here: https://github.com/danygiguere/spring-boot-3-reactive-with-kotlin-coroutines
CLOVIS
06/03/2025, 9:48 AM
suspend fun doSomething() = coroutineScope {
    val data1 = async { fetchData1() }
    val data2 = async { fetchData2() }
    val data3 = async { fetchData3() }
    doSomething(data1.await(), data2.await(), data3.await())
}
Is there a way to declare that this function should use high parallelism when it's latency-critical, and use parallelism only if the system is idle? We don't want to simply reduce the parallelism of non-critical operations, because if the system is doing few things, having high parallelism for these operations is still beneficial.
Maybe some kind of priority? Maybe with another dispatcher?
Hong Phuc
06/05/2025, 2:36 AM
`withContext` to do a blocking operation like connecting and writing a db statement, if the threads of the dispatcher, say `Dispatchers.IO`, are already used up and no thread is available immediately, does the coroutine just wait until a thread is available and essentially block the thread it’s running on? Thanks in advance
etsang
06/06/2025, 7:17 PM
Tanvi Goyal
06/10/2025, 4:37 PM
Yassine Abou
06/10/2025, 9:45 PM
{
  "message":"Uncaught SyntaxError: Unexpected token '<'",
  "type":"error"
} isTrusted,true
kotlinx.coroutines.error_$external_fun @ composeApp.uninstantiated.mjs:189
Is this primarily a SQLDelight configuration issue or a coroutines uncaught exception?
Relevant Code Snippets:
Database Configuration:
private val workerScriptUrl: String =
    js("""new URL("@cashapp/sqldelight-sqljs-worker/sqljs.worker.js", import.meta.url)""")
actual val platformModule = module {
    single {
        val driver = WebWorkerDriver(Worker(workerScriptUrl)).apply {
            enableForeignKeys()
        }
        LlmsDatabaseWrapper(driver, LlmsDatabase(driver))
    }
}
Webpack Configuration (sqljs.js):
config.resolve = {
    fallback: {
        fs: false,
        path: false,
        crypto: false,
    }
};
const CopyWebpackPlugin = require('copy-webpack-plugin');
config.plugins.push(
    new CopyWebpackPlugin({
        patterns: [
            '../../node_modules/sql.js/dist/sql-wasm.wasm'
        ]
    })
);
For more context, check out this repository link: https://github.com/yassineAbou/LLMS
Don Mitchell
06/12/2025, 8:32 PM
`Flow invariant is violated` trying to flow lines from the AWS Kotlin S3 SDK getObject. I've tried nesting the `flow` at all different levels and get stream closed if it's nested (although getting rid of the `use` and any other auto-close may fix those, as I suspect any outside the flow go through their `finally` blocks when we're returning the flow)
fun <T : Any> S3Client.flowObject(
    bucketName: String,
    key: String,
    deserializer: (String) -> T
): Flow<T> {
    val request = GetObjectRequest {
        this.bucket = bucketName
        this.key = key
    }
    return flow {
        this@flowObject.getObject(request) { response ->
            checkNotNull(response.body, { "Failed to get object from S3: $bucketName/$key" }).let { inputStream ->
                inputStream.toInputStream().bufferedReader(Charsets.UTF_8).useLines { lines ->
                    lines.forEach { emit(deserializer(it)) }
                }
            }
        }
    }.flowOn(Dispatchers.IO)
}
Any obvious fixes or better explanations?
Don Mitchell
06/18/2025, 6:02 PM
`withContext` doc string says
"Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns the result."
which sounds like it auto-implements a callback pattern, but I doubt it. I ask because Copilot implies that I can wrap Java file IO methods in `withContext` to get it to do the filesystem request and then suspend while waiting for the OS. Example non-suspending call which I'd like to suspend:
inputStream.bufferedReader().use { reader ->
    reader.readLine()?.split(",")?.map { it.trim() } ?: emptyList()
}
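For illustration, a minimal sketch of what wrapping that call in `withContext` does: the blocking read still occupies a thread for its whole duration, but it is a thread of the `Dispatchers.IO` pool, and the calling coroutine is suspended until the block returns. No callback or OS-level notification is introduced. `readHeader` is a hypothetical name, and an in-memory stream stands in for a real file.

```kotlin
import kotlinx.coroutines.*
import java.io.ByteArrayInputStream
import java.io.InputStream

// Hypothetical helper: moves the blocking read onto the IO dispatcher.
// The caller suspends (its own thread is free for other coroutines),
// while an IO-pool thread blocks inside readLine().
suspend fun readHeader(inputStream: InputStream): List<String> =
    withContext(Dispatchers.IO) {
        inputStream.bufferedReader().use { reader ->
            reader.readLine()?.split(",")?.map { it.trim() } ?: emptyList()
        }
    }

fun main() = runBlocking {
    val csv = ByteArrayInputStream("id, name ,email\n1,a,b".toByteArray())
    println(readHeader(csv)) // [id, name, email]
}
```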
marcinmoskala
06/25/2025, 7:30 AM
baxter
06/27/2025, 9:18 PM
class kotlinx.coroutines.test.TestCoroutineScheduler$timeSource$1 cannot be cast to class kotlin.time.TimeSource$WithComparableMarks
This is happening while running unit tests in a SpringBoot project. Using Kotlin 2.2.0, and Coroutines 1.10.2. Anyone have any ideas?
ursus
06/28/2025, 3:29 PM
audio processing producer consumer system. Typically, in the blocking world, that is solved with 2 threads and a `RingBuffer`.
Say now I want to do that in ktor server, where producer is a web socket.
The ktor ws api is suspending.
Does it make sense to implement this with `Channel`s? Don’t channels require immutable data?
How would you go about it?
Is it a good idea to flatten a byte array and send each byte into a Channel?
Don Mitchell
07/01/2025, 6:53 PM
abstract class TraversingPrimitivePreprocessor : Preprocessor {
    abstract fun handle(node: PrimitiveNode, context: DecoderContext): ConfigResult<Node>
needs to use the Kotlin AWS SDK to get the value. All of the Kotlin AWS SDK methods are appropriately suspending. Of course Claude spams `runBlocking`, which I know how to use and to avoid if at all possible.