galvas
02/05/2024, 10:05 AM
SharedFlow, but I was wondering whether Flows are the correct API to represent this, or whether this is a problem other people have already thought about. There is no interface that indicates that a flow is hot, right? Would it make more sense for my implementation to be a Channel, for example?
galvas
02/05/2024, 10:08 AM
Andrew K
02/23/2024, 12:34 AM
KV
03/04/2024, 1:10 PM
KotlinLeaner
03/05/2024, 12:01 PM
SharedFlow? I created this class for navigating between screens based on a response. This code is just a basic example.
class Navigator : NavigationHandler {
    private val _destination: MutableSharedFlow<ScreenName> = MutableSharedFlow(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    override val destination: SharedFlow<ScreenName> = _destination.asSharedFlow()

    override fun navigate(navigationDestination: ScreenName) {
        _destination.tryEmit(navigationDestination)
    }
}
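The question itself is cut off above, so the following is only a sketch of what this configuration does, not an answer to it. It assumes a hypothetical ScreenName enum and a simplified DemoNavigator without the NavigationHandler interface. With replay = 1 and DROP_OLDEST, tryEmit does not need to suspend, and a collector that subscribes late sees only the most recent destination.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the ScreenName type, which the snippet does not define.
enum class ScreenName { HOME, DETAILS }

class DemoNavigator {
    private val _destination = MutableSharedFlow<ScreenName>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
    val destination: SharedFlow<ScreenName> = _destination.asSharedFlow()

    // With DROP_OLDEST, tryEmit never has to suspend, so it reports success.
    fun navigate(target: ScreenName): Boolean = _destination.tryEmit(target)
}

fun latestAfterTwoNavigations(): ScreenName = runBlocking {
    val navigator = DemoNavigator()
    navigator.navigate(ScreenName.HOME)
    navigator.navigate(ScreenName.DETAILS)
    // A collector that subscribes late receives only the replayed value.
    navigator.destination.first()
}

fun main() {
    println(latestAfterTwoNavigations()) // DETAILS
}
```

One consequence of replay = 1 worth keeping in mind: every new collector receives the last destination again, which may or may not be wanted for one-off navigation events.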
Djuro
03/06/2024, 12:13 PM
MutableStateFlow<T>.update/getAndUpdate/updateAndGet, is this real atomicity? That is, can only one coroutine at a time read and update the state, so that any other coroutine calling .update, for example, cannot get or update the value until the one accessing the resource finishes?
Which statement is the correct one?
1. While one coroutine is using .update on a given state, the next one that tries to do so will wait.
2. The state can be updated while .update is executing, meaning a race condition.
This is the implementation from StateFlow:
public inline fun <T> MutableStateFlow<T>.update(function: (T) -> T) {
    while (true) {
        val prevValue = value
        val nextValue = function(prevValue)
        if (compareAndSet(prevValue, nextValue)) {
            return
        }
    }
}
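As a rough illustration of what this retry loop means in practice, the sketch below (not from the thread; the function name concurrentIncrements and the lambdaRuns counter are made up for the example) runs many concurrent update calls. No increment is lost, because a failed compareAndSet simply re-reads the value and tries again, but the lambda may run more often than the number of successful updates.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Returns the final counter value and how many times the update lambda ran.
fun concurrentIncrements(workers: Int, perWorker: Int): Pair<Int, Int> {
    val counter = MutableStateFlow(0)
    val lambdaRuns = AtomicInteger(0)
    // runBlocking returns only after all launched children have finished.
    runBlocking {
        repeat(workers) {
            launch(Dispatchers.Default) {
                repeat(perWorker) {
                    counter.update { current ->
                        lambdaRuns.incrementAndGet() // counts retries as well
                        current + 1
                    }
                }
            }
        }
    }
    return counter.value to lambdaRuns.get()
}

fun main() {
    val (total, runs) = concurrentIncrements(workers = 8, perWorker = 1_000)
    println("total=$total, lambda runs=$runs") // total is 8000; runs is at least 8000
}
```

Because the lambda can be re-evaluated, it should be free of side effects; the counter inside it here exists only to make the retries visible.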
It is obvious from the implementation of update that while function(prevValue) is being executed, value can change in another coroutine. The StateFlow docs say as much: function may be evaluated multiple times, if value is being concurrently updated.
The question is, why would this be the desired behaviour? Why not lock the resource (the state) from the very beginning until the update is finished? This could also cause an infinite loop if the race condition keeps occurring. update is then not a classical critical section, but compareAndSet is instead. What am I missing?
Geert
04/07/2024, 8:11 AM
fun <T, R> Flow<T>.foldToStateFlow(initial: R, op: (acc: R, next: T) -> R): StateFlow<R>
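The text around this signature did not survive, so it is unclear what was being asked. Purely as a sketch, one way such an extension could be written is runningFold followed by stateIn. The scope parameter and the choice of SharingStarted.Eagerly are assumptions and are not part of the posted signature.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

fun <T, R> Flow<T>.foldToStateFlow(
    scope: CoroutineScope, // assumption: the posted signature has no scope
    initial: R,
    op: (acc: R, next: T) -> R,
): StateFlow<R> =
    // runningFold emits the initial value and then every intermediate accumulator.
    runningFold(initial) { acc, next -> op(acc, next) }
        .stateIn(scope, SharingStarted.Eagerly, initial)

fun sumOfOneToThree(): Int = runBlocking {
    val state = flowOf(1, 2, 3).foldToStateFlow(this, 0) { acc, next -> acc + next }
    // Wait until the accumulated value reaches the final sum.
    state.first { it == 6 }
}

fun main() {
    println(sumOfOneToThree()) // 6
}
```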
Justin Breitfeller
04/10/2024, 1:31 PM
return channelFlow {
    // Start collecting immediately, before the launch call returns.
    launch(start = CoroutineStart.UNDISPATCHED) {
        messageSourceFlow.collect { send(it) }
    }
    try {
        turnOnMessages()
        awaitCancellation()
    } finally {
        turnOffMessages()
    }
}
The key element is the launch(start = CoroutineStart.UNDISPATCHED) { because I want to make sure I'm ready to receive messages before I call the turnOnMessages method. Does this make sense? Is there an easier way to achieve this goal?
nino
04/11/2024, 6:59 AM
init method when starting my server
flow {
    val stopTimesFirst = getStopTimesFirst()
    println("preparing first data finished")
    emit(stopTimesFirst)
    val stopTimesSecond = getStopTimesSecond()
    println("preparing second data finished")
    emit(stopTimesSecond)
}.flatMapConcat { stopTimes ->
    tickerFlow(FIVE_MINUTES_IN_MS)
        .onEach { now ->
            println("Refreshing data: $now")
            results = refreshData(
                now,
                now.plusMinutes(90),
                stopTimes.toList(),
            )
            println("Refreshing data: done")
        }
}.flowOn(Dispatchers.IO)
    .launchIn(GlobalScope)
    .start()
whereas tickerFlow is defined like
fun tickerFlow(period: Long, initialDelay: Long = 0) = flow {
    delay(initialDelay)
    while (true) {
        emit(ZonedDateTime.now(zoneId))
        delay(period)
    }
}
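Since tickerFlow never completes, flatMapConcat in the snippet above never moves on to a second upstream value. One operator that restarts the inner flow for every upstream emission is flatMapLatest, which cancels the previous inner flow when a new value arrives. The sketch below assumes a hypothetical ticker helper with short delays standing in for tickerFlow; depending on the kotlinx.coroutines version, flatMapLatest may need the opt-in shown.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for tickerFlow: an endless flow tagged with its source.
fun ticker(tag: String, periodMs: Long): Flow<String> = flow {
    var tick = 0
    while (true) {
        emit("$tag-${tick++}")
        delay(periodMs)
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun firstTickPerSource(): List<String> = runBlocking {
    flow {
        emit("first")
        delay(50)
        emit("second") // cancels the ticker started for "first"
    }
        .flatMapLatest { source -> ticker(source, periodMs = 1_000) }
        .filter { it.endsWith("-0") } // keep only the first tick of each ticker
        .take(2)
        .toList()
}

fun main() {
    println(firstTickPerSource()) // [first-0, second-0]
}
```

With flatMapConcat in place of flatMapLatest, the same pipeline would stay on the "first" ticker and never produce a tick for "second".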
I need an operator to replace flatMapConcat, one that considers every emit from flow and runs the tickerFlow for it. Currently, only the first emit is considered. When using map, the first emit doesn't trigger the ticker flow... Any suggestions?
Djuro
04/11/2024, 8:16 AM
StateFlow.
For example, I would usually have a state and something like the following:
sealed class MyState {
    data class Success(
        val intList: List<Int>
        ....
    ) : MyState()
    data class Success2(
        ....
    ) : MyState()
}
Now when updating my state using update
myState.update { currentState ->
    if (currentState is MyState.Success) {
        currentState.copy(intList = newIntList())
    } else currentState
}
fun newIntList(): List<Int> {...}
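One way to fold the if/else from the update call above into a reusable helper is a small inline extension with a reified type parameter. This is only a sketch: updateIf is a hypothetical name, not a kotlinx.coroutines function, and DemoState is a cut-down stand-in for MyState.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

// Cut-down stand-in for the MyState hierarchy from the message.
sealed class DemoState {
    data class Success(val intList: List<Int>) : DemoState()
    object Loading : DemoState()
}

// Hypothetical helper: applies transform only when the current value is an S,
// otherwise leaves the state unchanged.
inline fun <T, reified S : T> MutableStateFlow<T>.updateIf(transform: (S) -> T) {
    update { current -> if (current is S) transform(current) else current }
}

fun appendTwo(state: MutableStateFlow<DemoState>) {
    state.updateIf<DemoState, DemoState.Success> { it.copy(intList = it.intList + 2) }
}

fun main() {
    val state = MutableStateFlow<DemoState>(DemoState.Success(listOf(1)))
    appendTwo(state)
    println(state.value) // Success(intList=[1, 2])

    state.value = DemoState.Loading
    appendTwo(state) // no-op: the current state is not Success
    println(state.value === DemoState.Loading) // true
}
```

The branching does not disappear, it just moves into one place, and the type check still runs inside update, so it is re-evaluated on every retry.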
Is there some approach you take to avoid this if/else branching? I have always hated it but haven't yet figured out how to get rid of it.
Do you have any suggestions?
For example, I can make a function myState.updateIf<MyState, MyState.Success>{ mySuccessState -> ... } but I am unsure whether there is some better approach.
pers
04/14/2024, 3:01 PM
fun loadUsersByRegion(regions: List<String>): Flow<List<User>>
I want to call fetchInfo on each item without nesting map....
fun fetchInfo(user: User): Info
Is there a better way?
.flatMapConcat { users ->
    flow {
        val usersInfo = users.map { user -> fetchInfo(user) }
        emit(usersInfo)
    }
}
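The flatMapConcat plus flow wrapper above emits exactly one list per upstream list, so a plain map on the flow should behave the same. The sketch below uses made-up User and Info classes and a hypothetical withInfo extension; a List-level map is still there, but it is written once inside the helper instead of being nested at each call site.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the User and Info types from the question.
data class User(val name: String)
data class Info(val summary: String)

fun fetchInfo(user: User): Info = Info("info for ${user.name}")

// Maps each emitted list of users to a list of infos, one output per input list.
fun Flow<List<User>>.withInfo(): Flow<List<Info>> =
    map { users -> users.map(::fetchInfo) }

fun collectInfos(): List<List<Info>> = runBlocking {
    flowOf(listOf(User("a"), User("b"))).withInfo().toList()
}

fun main() {
    println(collectInfos()) // [[Info(summary=info for a), Info(summary=info for b)]]
}
```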
Jérémy CROS
04/15/2024, 3:16 PM
private var hasFirstBeenEmitted : Boolean = false
at class level
Is there any way to stay within the confines of flows?
I can provide more details on the implementation if this is too vague and it would help 🙂
Geert
04/17/2024, 3:14 PM
Djuro
04/18/2024, 6:39 AM
merge function of Kotlin flows. In the docs here it says that it doesn't preserve the order of elements.
Do you maybe know of some function that merges multiple flows and preserves the order of elements?
Andrey
04/21/2024, 7:16 AM
Stanislav Kral
05/02/2024, 7:04 PM
CLOVIS
05/06/2024, 3:44 PM
flow {
    emit(a)
    while (true) { delay(1_000_000) }
}
but that would consume scheduler resources, even if only a slight amount.
Is there a way to tell the runtime "hey, this flow will never emit again, no need to schedule anything at all"?
ephemient
05/06/2024, 3:47 PM
awaitCancellation()
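A minimal sketch of that suggestion, with the hypothetical helper name emitOnceThenHang: the flow emits a single value and then suspends in awaitCancellation() until its collector is cancelled, without a delay loop.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Emits one value, then suspends until the collector is cancelled.
fun <T> emitOnceThenHang(value: T): Flow<T> = flow {
    emit(value)
    awaitCancellation()
}

fun firstValue(): String = runBlocking { emitOnceThenHang("a").first() }

// True only if the flow completes on its own within the timeout.
fun completesWithin(timeoutMs: Long): Boolean = runBlocking {
    withTimeoutOrNull(timeoutMs) { emitOnceThenHang("a").toList() } != null
}

fun main() {
    println(firstValue()) // a
    println(completesWithin(100)) // false
}
```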
Stanislav Kral
05/12/2024, 8:50 PM
private val manualPinKeySynchronizationEvents = MutableSharedFlow<Any>(
    replay = 1,
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
I use replay = 1 so that if there are any events before I start collecting this flow, I don't miss them.
Later on I use this flow in the following way:
merge(failedSynchronizationsSource(), manualSynchronizationSource())
    .distinctUntilChanged()
    .mapLatest { synchronizationRequired ->
        ...
rdhruva
05/30/2024, 6:30 PM
launchIn says this:
Note that the resulting value of launchIn is not used and the provided scope takes care of cancellation.
Why is that the case? launchIn returns a Job, and if I call .cancel() on that job, it does seem to cancel the flow. Am I missing something?
Colton Idle
06/26/2024, 2:41 PM
Tobias
07/01/2024, 8:26 AM
val monitorDongleEuiJob = settings.dongleEui.takeWhile { dongleEui ->
    dongleEui.isNotEmpty()
}.onEach { dongleEui ->
    // do something, but nothing happens
}.catch {
    print(it.message)
}.launchIn(coroutineScope)
When I change the code to this, it always works, but I don't understand why
val monitorDongleEuiJob = settings.dongleEui.onEach { dongleEui ->
    if (dongleEui.isEmpty()) {
        return@onEach
    }
    // do something
}.catch {
    print(it.message)
}.launchIn(coroutineScope)
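The two versions differ in what happens after an empty value. takeWhile ends the flow at the first element that fails the predicate, whereas the onEach version only skips that element and keeps collecting, which is what filter does. The sketch below uses made-up sample values in which the first value is empty.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical sample values: the first one is empty, as an unset setting might be.
val samples = listOf("", "AA:BB", "", "CC:DD")

// Stops at the first empty value, so nothing is collected here.
fun withTakeWhile(): List<String> = runBlocking {
    samples.asFlow().takeWhile { it.isNotEmpty() }.toList()
}

// Skips empty values and keeps collecting the rest.
fun withFilter(): List<String> = runBlocking {
    samples.asFlow().filter { it.isNotEmpty() }.toList()
}

fun main() {
    println(withTakeWhile()) // []
    println(withFilter())    // [AA:BB, CC:DD]
}
```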
Shouldn't it basically be the same?
Or am I misunderstanding takeWhile? Does it maybe stop collecting when the condition has been met?
Kashismails
07/01/2024, 1:31 PM
sealed interface SocketResponse<out T> {
    data class Data<T>(val item: T) : SocketResponse<T>
    data class Error(val exception: Throwable) : SocketResponse<Nothing>
    object ConnectionLost : SocketResponse<Nothing>
    object Loading : SocketResponse<Nothing>
}
I have this class and a function that returns a flow, such as
fun SocketFun() = flow<SocketResponse<SomeClass>> {
    emit(SocketResponse.Data(SomeClass(variableA, variableB)))
    emit(SocketResponse.ConnectionLost)
}
Now the flow can emit two different states of one parent. I want to make an extension to track the state of the flow, such as this one:
fun <T> Flow<T>.asResult(shouldEmit: Boolean = true): Flow<Result<T>> {
    return this
        .map<T, Result<T>> {
            Result.Success(it)
        }
        .onStart { emit(Result.Loading) }
        .catch {
            if (it is Exception) {
                errorLogger("resultException", it.message ?: "Exception in result Flow.")
                if (shouldEmit) {
                    emit(Result.Error(it.getRealException()))
                }
            } else {
                emit(Result.Error(CustomMessage.ExceptionMessage("System not responding.")))
            }
        }
}
However, I want to skip map because I don't want to map anything: it's either data or connection lost. onStart and catch will be in the extension function. It won't work without map, but I have to do this in multiple places in my codebase and don't want to repeat onStart, buffer and catch everywhere, as the logic for all of these will be the same. What is the best way to handle this?
Malik Hasan
07/09/2024, 6:32 PM
Peter Kievits
07/18/2024, 2:07 PM
SharedFlow in our backend application. We're receiving updates and are publishing these on the SharedFlow. This works great because it is a very easy-to-understand workflow. However, we would like to implement metrics around this. Specifically, the SharedFlow has a predefined buffer, and we'd like to be aware of when it is filling up. We can detect when the buffer is full, because tryEmit returns false. Is there any way for us to get an early warning here, in case of a slow consumer? It looks like we can check the number of consumers on the flow, but there is no way to check the buffer status. Has anyone got any ideas on how to monitor this?
Vaibhav Jaiswal
07/28/2024, 9:29 AM
abstract class Parent {
    abstract val flow1: StateFlow<Int>
    val flow2 = flow1.mapLatest { it.toString() }
}

class Child : Parent() {
    override val flow1 = MutableStateFlow(1).asStateFlow()
}

fun main() {
    val child = Child()
    runBlocking {
        child.flow2.collect {
        }
    }
}
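In Kotlin, superclass property initializers run before those of the subclass, so in the snippet above flow2 is initialized in Parent while Child's flow1 has not yet been assigned. One possible workaround, sketched here with the made-up names LazyParent and LazyChild, is to defer flow2 with by lazy (a custom getter would also work). The sketch uses map instead of mapLatest only to avoid an opt-in annotation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

abstract class LazyParent {
    abstract val flow1: StateFlow<Int>

    // Deferred until first access, by which time the subclass has set flow1.
    val flow2: Flow<String> by lazy { flow1.map { it.toString() } }
}

class LazyChild : LazyParent() {
    override val flow1: StateFlow<Int> = MutableStateFlow(1).asStateFlow()
}

fun firstOfFlow2(): String = runBlocking { LazyChild().flow2.first() }

fun main() {
    println(firstOfFlow2()) // 1
}
```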
This seems to crash my app, saying
> java.lang.NullPointerException: Attempt to invoke interface method 'java.lang.Object kotlinx.coroutines.flow.Flow.collect(kotlinx.coroutines.flow.FlowCollector, kotlin.coroutines.Continuation)' on a null object reference
It also fails to compile in Kotlin Playground.
Does anyone know what's wrong?
App crash stack trace in 🧵
If I make the 2nd flow abstract as well, it works.
prudhvi reddy
08/04/2024, 6:54 PM
catch operator?
This test case fails with a crash.
Whereas if I don't use the catch operator but use try-catch instead, the test behaves as expected.
Alina Dolgikh [JB]
Djuro
09/11/2024, 1:29 PM
replay cache to Flow, something like Flow.buffer(capacity = 1, replay = 1, onBufferOverflowStrategy = DROP_OLDEST), for example?
Alina Dolgikh [JB]