# flow
  • g

    galvas

    02/05/2024, 10:05 AM
    Hey folks, I have a sort of conceptual question. I'm looking to implement a queue with backpressure that can be modified asynchronously. Right now we have implemented it as a hot Flow: the collector dictates how fast the items in the queue get processed, but while collecting one item it can still, for example, clear the rest of the queue and then suspend waiting for the next item, which would be queued at some point later. We did this in a somewhat silly implementation using
    SharedFlow
    , but I was wondering whether Flows are the correct API to represent this, and whether this is a problem other people have thought about before. There is no interface that indicates that a flow is hot, right? Would it make more sense for my implementation to be a
    Channel
    , for example?
  • g

    galvas

    02/05/2024, 10:08 AM
    Basically I want a single consumer dictating the backpressure, and potentially multiple producers adding events to it. I was looking at the actor builder, but I'm not sure if that's the best fit or why it's obsolete.
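One way to model the setup described above is a bounded `Channel` instead of a `SharedFlow`. This is a minimal sketch, not a definitive design: the capacity of 8 and the names `drainPending` and `demo` are made up for illustration. Producers call `send` (which suspends when the buffer is full), the single consumer calls `receive` at its own pace, and it can discard whatever is still queued with `tryReceive`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: discards everything currently buffered and returns how many items were dropped.
fun <T> Channel<T>.drainPending(): Int {
    var dropped = 0
    while (tryReceive().isSuccess) dropped++
    return dropped
}

fun demo(): Pair<List<Int>, Int> = runBlocking {
    val queue = Channel<Int>(capacity = 8)
    // Two producers enqueue three items each before the consumer starts.
    val producers = List(2) { p -> launch { repeat(3) { queue.send(p * 10 + it) } } }
    producers.joinAll()

    val processed = mutableListOf<Int>()
    processed += queue.receive()          // the consumer takes one item...
    val dropped = queue.drainPending()    // ...and clears the rest of the queue
    launch { queue.send(99) }             // a producer enqueues something later
    processed += queue.receive()          // the consumer suspends until it arrives
    queue.close()
    processed to dropped
}
```

With this shape the consumer alone decides the pace, and any number of coroutines can act as producers on the same channel.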
  • a

    Andrew K

    02/23/2024, 12:34 AM
    What is the best way to dynamically add/remove flows to another flow? I have this class so far...
  • k

    KV

    03/04/2024, 1:10 PM
    Hi, what is the best way to skip a result from a flow? For example, I have 3 states: Loading, Done, and Success. Loading and Done are always emitted, but when Success comes, can we hide the Done state from the UI?
    1st scenario: I need to show Loading -------- 5 seconds ------ Done.
    2nd scenario: I need to show Loading -------- 5 seconds (the Done state should not be visible in the UI) ------ Success. That is, after Loading the Done state is skipped because we have Success.
    I would like to add a timer for Done: if Success doesn't come within 5 or 10 seconds, then show the Done state.
  • k

    KotlinLeaner

    03/05/2024, 12:01 PM
    Hello, I have events, and I want to be able to share the same event again. Which flow is best for my use case,
    SharedFlow
    ? I created this class for navigating between screens based on a response. This code is just a basic example.
    class Navigator : NavigationHandler {
    
        private val _destination: MutableSharedFlow<ScreenName> = MutableSharedFlow(
            replay = 1,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        )
    
        override val destination: SharedFlow<ScreenName> = _destination.asSharedFlow()
    
        override fun navigate(navigationDestination: ScreenName) {
            _destination.tryEmit(navigationDestination)
        }
    }
  • d

    Djuro

    03/06/2024, 12:13 PM
    Hello everyone, I have a question I've been discussing with a couple of colleagues. Regarding
    MutableStateFlow<T>.update/getAndUpdate/updateAndGet
    , is this real atomicity? That is, can only one coroutine at a time read and update the state, so that any other coroutine using
    .update
    , for example, won't be able to get or update the value until the one accessing the resource finishes? Which statement is the correct one? 1. While one coroutine is using
    .update
    on a given state, the next one that tries to do so will wait. 2. It is possible for the state to be updated while
    .update
    is being executed, meaning a race condition. This is the implementation of update for StateFlow:
    public inline fun <T> MutableStateFlow<T>.update(function: (T) -> T) {
        while (true) {
            val prevValue = value
            val nextValue = function(prevValue)
            if (compareAndSet(prevValue, nextValue)) {
                return
            }
        }
    }
    It is obvious from its implementation that while
    function(prevValue)
    is being executed, value can change in another coroutine, and this is also stated in the docs of StateFlow:
    function may be evaluated multiple times, if value is being concurrently updated.
    The question is, why would this be the desired behaviour? Why not lock the resource (the state) from the very beginning until the update is finished? This could also cause an infinite loop if the race keeps occurring. update is then not a classical critical section, but
    compareAndSet
    is instead. What am I missing?
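As the quoted implementation shows, `update` is an optimistic compare-and-set retry loop rather than a lock: no coroutine waits, and a coroutine that loses the race simply re-runs its function against the fresh value, so no update is lost. A small sketch of that behaviour, where the counter and the number of coroutines are made up for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

fun concurrentIncrements(coroutines: Int = 1_000): Int = runBlocking {
    val counter = MutableStateFlow(0)
    // Many coroutines race on a multi-threaded dispatcher.
    List(coroutines) {
        launch(Dispatchers.Default) {
            // The lambda may run more than once if another update wins the race,
            // so it should be a pure function of the value it is given.
            counter.update { it + 1 }
        }
    }.joinAll()
    counter.value
}
```

Every retry happens only because some other update succeeded, so the loop as a whole always makes progress even though an individual caller may retry.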
  • g

    Geert

    04/07/2024, 8:11 AM
    Hi, I have a question on StateFlow: is it possible to fold a flow into a StateFlow? I.e., is there a function like
    fun <T, R> Flow<T>.foldToStateFlow(initial: R, op: (acc: R, next: T) -> R): StateFlow<R>
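A function with that shape can be sketched by combining two existing operators, `runningFold` and `stateIn`. This is only one possible approach: the extra `scope` parameter, the `SharingStarted.Eagerly` policy, and the `demoFold` function are assumptions added for the example, since a `StateFlow` needs a coroutine scope in which the upstream is collected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper matching the signature asked about, plus the scope that stateIn needs.
fun <T, R> Flow<T>.foldToStateFlow(
    scope: CoroutineScope,
    initial: R,
    op: suspend (acc: R, next: T) -> R,
): StateFlow<R> = runningFold(initial, op).stateIn(scope, SharingStarted.Eagerly, initial)

fun demoFold(): Int = runBlocking {
    // Running sums of 1, 2, 3 starting from 0; the state ends at 6.
    val sums = flowOf(1, 2, 3).foldToStateFlow(this, 0) { acc, next -> acc + next }
    sums.first { it == 6 }
}
```

Because a `StateFlow` conflates, observers are only guaranteed to see the latest accumulated value, not every intermediate one.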
  • j

    Justin Breitfeller

    04/10/2024, 1:31 PM
    Hi all. I'm trying to make a flow that emits the results of another flow but also needs to trigger some side-effect code once we're ready to start emitting that flow's results. This is what I've done:
    return channelFlow {
        // Start collecting right away, before the messages are turned on.
        launch(start = CoroutineStart.UNDISPATCHED) {
            messageSourceFlow.collect { send(it) }
        }
        try {
            turnOnMessages()
            awaitCancellation()
        } finally {
            turnOffMessages()
        }
    }
    The key element is the
    launch(start = CoroutineStart.UNDISPATCHED) {
    because I want to make sure I'm ready to receive messages before I call the
    turnOnMessages
    method. Does this make sense? Is there an easier way to achieve this goal?
  • n

    nino

    04/11/2024, 6:59 AM
    I run the following code in the
    init
    method when starting my server
    flow {
        val stopTimesFirst = getStopTimesFirst()
        println("preparing first data finished")
        emit(stopTimesFirst)
        
        val stopTimesSecond = getStopTimesSecond()
        println("preparing second data finished")
        emit(stopTimesSecond)
    }.flatMapConcat { stopTimes ->
        tickerFlow(FIVE_MINUTES_IN_MS)
            .onEach { now ->
                println("Refreshing data: $now")
                results = refreshData(
                    now,
                    now.plusMinutes(90),
                    stopTimes.toList(),
                )
                println("Refreshing data: done")
            }
    }.flowOn(Dispatchers.IO)
        .launchIn(GlobalScope)
        .start()
    where
    tickerFlow
    is defined like
    fun tickerFlow(period: Long, initialDelay: Long = 0) = flow {
        delay(initialDelay)
        while (true) {
            emit(ZonedDateTime.now(zoneId))
            delay(period)
        }
    }
    I need an operator instead of the
    flatMapConcat
    operator that considers every
    emit
    from
    flow
    and runs the tickerFlow. Currently, only the first
    emit
    is considered. When using
    map
    the first
    emit
    doesn't trigger the ticker flow... Any suggestions?
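Since the ticker flow never completes, `flatMapConcat` never gets to the second upstream value. One candidate operator is `flatMapLatest`, which cancels the previous inner flow when a new upstream value arrives. The sketch below uses a simplified stand-in ticker (`ticks`) and short made-up delays instead of the real data loading; it is an illustration of the operator, not a drop-in replacement for the code above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Simplified stand-in for tickerFlow: emits a tick forever.
fun ticks(periodMs: Long): Flow<Unit> = flow {
    while (true) {
        emit(Unit)
        delay(periodMs)
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun latestSource(): List<String> = runBlocking {
    flow {
        emit("first")
        delay(100)      // stands in for the slow second data preparation
        emit("second")
    }.flatMapLatest { source ->
        // A new upstream value cancels the previous ticker and starts a fresh one.
        ticks(periodMs = 40).map { source }
    }.take(6).toList()
}
```

With `flatMapLatest` the upstream keeps running while the first ticker is active, so the second data set is prepared in the background and takes over as soon as it is emitted.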
  • d

    Djuro

    04/11/2024, 8:16 AM
    Hi everyone, hope you are all doing well. I am interested in how you manage states when using
    StateFlow
    . For example, I would usually have a state and something like the following
    sealed class MyState{
       data class Success(
            val intList: List<Int>
            ....
       ) : MyState()
       data class Success2(
            ....
       ) : MyState()
    }
    Now when updating my state using
    update
    myState.update{ currentState ->
       if(currentState is MyState.Success){
          currentState.copy(intList = newIntList())
       } else currentState
    } 
    
    fun newIntList(): List<Int> {...}
    Is there some approach you take to avoid this
    if/else
    branching? I have always disliked it but haven't yet figured out how to get rid of it. Do you have any suggestions? For example, I could write a function
    myState.updateIf<MyState, MyState.Success>{ mySuccessState -> ... }
    but I am unsure if there is some other better approach
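The `updateIf` idea mentioned above can be sketched with a reified type parameter. This is a hypothetical helper, not an existing library function, and the `Loading` state is invented here just to have a second subtype to test against.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

sealed class MyState {
    data class Success(val intList: List<Int>) : MyState()
    object Loading : MyState()   // hypothetical second state for the example
}

// Applies transform only when the current value is of subtype S; otherwise leaves it unchanged.
inline fun <T, reified S : T> MutableStateFlow<T>.updateIf(transform: (S) -> T) {
    update { current -> if (current is S) transform(current) else current }
}

fun demoUpdateIf(): Pair<MyState, MyState> {
    val success = MutableStateFlow<MyState>(MyState.Success(listOf(1)))
    success.updateIf<MyState, MyState.Success> { it.copy(intList = it.intList + 2) }

    val loading = MutableStateFlow<MyState>(MyState.Loading)
    loading.updateIf<MyState, MyState.Success> { it.copy(intList = emptyList()) }  // no-op

    return success.value to loading.value
}
```

The branching still exists, but it lives in one place and call sites only describe the transformation for the state they care about.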
  • p

    pers

    04/14/2024, 3:01 PM
    Let's say I have this flow
    fun loadUsersByRegion(regions: List<String>): Flow<List<User>>
    I want to call fetchInfo on each item without nesting map....
    fun fetchInfo(user: User): Info
    Is there anything better than this?
    .flatMapConcat { users ->
        flow {
            val usersInfo = users.map { user -> fetchInfo(user) }
            emit(usersInfo)
        }
    }
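Because each emitted list produces exactly one list, the `flatMapConcat` plus `flow` builder can be replaced by a plain `map`, and the inner `map` can be hidden in a small helper. The name `mapEach` is hypothetical, and `it.length` below merely stands in for `fetchInfo(user)`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: transforms every element of each emitted list.
fun <T, R> Flow<List<T>>.mapEach(transform: suspend (T) -> R): Flow<List<R>> =
    map { list -> list.map { transform(it) } }

fun demoMapEach(): List<List<Int>> = runBlocking {
    flowOf(listOf("a", "bb"), listOf("ccc"))
        .mapEach { it.length }   // stands in for fetchInfo(user)
        .toList()
}
```

At the call site this would read as `loadUsersByRegion(regions).mapEach { fetchInfo(it) }`.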
  • j

    Jérémy CROS

    04/15/2024, 3:16 PM
    Hi everyone, How would one model the fact that an item emitted by a flow might be the first this flow has emitted? Basically, when the app starts, we begin collecting a flow for display usage. But we want to detect and react differently for the first item. I've been struggling really hard all afternoon to avoid a
    private var hasFirstBeenEmitted : Boolean = false
    at class level. Is there any way to stay within the confines of flows? I can provide more details on the implementation if this is too vague and it would help 🙂
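One option that keeps the state inside the flow is the `withIndex` operator, which pairs every value with its position in the current collection. A minimal sketch with made-up values:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun demoFirstItem(): List<String> = runBlocking {
    val seen = mutableListOf<String>()
    flowOf("a", "b", "c")
        .withIndex()
        .collect { (index, value) ->
            // index == 0 marks the first item of this particular collection.
            seen += if (index == 0) "first:$value" else "next:$value"
        }
    seen
}
```

The index is counted per collector, so every new collection of the flow sees its own "first" item.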
  • g

    Geert

    04/17/2024, 3:14 PM
    Hi, maybe this is trivial, but I’m trying to create a new Flow from a StateFlow that is based on the current and the previous state. E.g., a StateFlow<Int> with state 2, then 5, and then 1 would cause the flow to emit 3 and then -4.
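The example above can be sketched as a small custom operator that remembers the previous value inside the `flow` builder. The name `deltas` is hypothetical, and a plain `flowOf` stands in for the StateFlow.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits current - previous for every value after the first.
fun Flow<Int>.deltas(): Flow<Int> = flow {
    var previous: Int? = null
    collect { current ->
        previous?.let { emit(current - it) }
        previous = current
    }
}

fun demoDeltas(): List<Int> = runBlocking { flowOf(2, 5, 1).deltas().toList() }
```

When the source is a real `StateFlow`, a slow collector may skip intermediate states because of conflation, in which case the differences are computed between the states it actually observed.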
  • d

    Djuro

    04/18/2024, 6:39 AM
    Hi guys, hope you are all doing well. I have a question about the
    merge
    function of Kotlin flows. In the docs here it says that it doesn't preserve the order of elements. Do you maybe know of some function that merges multiple flows and preserves the order of elements?
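If "preserving the order" means all elements of the first flow, then all of the second, and so on, sequential concatenation is an alternative to `merge`. The sketch below assumes finite flows, and the name `concat` is hypothetical; it collects the flows one after another with `emitAll`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the flows one after another, so order is preserved.
fun <T> concat(vararg flows: Flow<T>): Flow<T> = flow {
    for (f in flows) emitAll(f)
}

fun demoConcat(): List<Int> = runBlocking {
    concat(flowOf(1, 2), flowOf(3), flowOf(4, 5)).toList()
}
```

This does not collect the flows concurrently, so it is not suitable when an earlier flow never completes.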
  • a

    Andrey

    04/21/2024, 7:16 AM
    Hi guys, I think my question is very simple, but for some reason I can't find the answer. Suppose I have an infinite SharedFlow that emits a new value every 100 ms. How can I get all the accumulated values once per minute? I understand that I could simply use .take() or .toList(), but in the first case I need to specify the exact number of values I want to get, and in the second I would get a constantly changing collection.
  • s

    Stanislav Kral

    05/02/2024, 7:04 PM
    Hello guys, I want to implement a simple request handler (for general requests, not necessarily HTTP) where incoming requests are processed serially and in FIFO order. I need to provide an interface that notifies the caller when: 1) request processing has started (e.g., the request the caller submitted has been taken out of the queue and is about to be processed), and 2) the result of the request processing is available. Error observability is also required. The request type is specified by the type parameter. In my particular case, request processing means bytes are written to a socket. More in the thread 🧵
  • c

    CLOVIS

    05/06/2024, 3:44 PM
    I want an infinite flow, that emits once and then never again, but doesn't terminate. (context: the consumer of this flow expects flow termination to happen in a specific situation, and in this case, the flow can never reach that situation, but can also never emit again). I could do
    flow {
        emit(a)
    
        while (true) { delay(1_000_000) }
    }
    but that would consume scheduler resources, even if only a slight amount. Is there a way to tell the runtime "hey, this flow will never emit again, no need to schedule anything at all"?
    ✅ 1
  • e

    ephemient

    05/06/2024, 3:47 PM
    awaitCancellation()
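Put together with the question above, the suggestion can be sketched as follows. The function names and the 200 ms timeout are made up for the demonstration; `awaitCancellation()` suspends until the collecting coroutine is cancelled, without any periodic wake-ups.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun <T> emitOnceThenHang(value: T): Flow<T> = flow {
    emit(value)
    awaitCancellation()   // suspends until the collector is cancelled; never completes normally
}

fun demoHang(): Pair<Int, List<Int>?> = runBlocking {
    val f = emitOnceThenHang(42)
    val first = f.first()
    val all = withTimeoutOrNull(200) { f.toList() }   // null: the flow never terminates
    first to all
}
```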
  • s

    Stanislav Kral

    05/12/2024, 8:50 PM
    Hello, I want to receive and process some events (triggered by the user or over HTTP), and I want it to act like something similar to a FIFO queue with a size of 1 (keeping only the latest event). Can I use this, or should I use a Channel?
    private val manualPinKeySynchronizationEvents = MutableSharedFlow<Any>(
        replay = 1,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    I use
    replay = 1
    so that if there are any events before I start collecting this flow I don't miss them. Later on I use this flow in the following way:
    merge(failedSynchronizationsSource(), manualSynchronizationSource())
                .distinctUntilChanged()
                .mapLatest { synchronizationRequired ->
                     ...
  • r

    rdhruva

    05/30/2024, 6:30 PM
    I'm curious why the documentation for
    launchIn
    says this:
    Note that the resulting value of launchIn is not used and the provided scope takes care of cancellation.
    Why is that the case?
    launchIn
    returns a
    Job
    , and if I call
    .cancel()
    on that job, it does seem to cancel the flow. Am I missing something?
  • c

    Colton Idle

    06/26/2024, 2:41 PM
    I have a hot flow that is being collected in multiple places, but I'd like only one of the collectors to actually get each emission. It doesn't matter if it's the first collector or the last (I suppose "last collector wins" would work best). Is there a way to do that? I'm a flow noob, so sorry if I'm missing something simple.
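One way to get "each emission goes to exactly one collector" is a `Channel` exposed through `receiveAsFlow()`, which fans elements out among its collectors instead of broadcasting them. This is a sketch under that assumption; it does not control which collector wins, so it would not implement "last collector wins" specifically.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun demoFanOut(): List<List<Int>> = runBlocking {
    val events = Channel<Int>(Channel.BUFFERED)
    val shared = events.receiveAsFlow()   // each element goes to exactly one collector

    val received = List(2) { mutableListOf<Int>() }
    val collectors = received.map { sink -> launch { shared.collect { sink += it } } }

    repeat(10) { events.send(it) }
    events.close()          // completes both collectors
    collectors.joinAll()
    received
}
```

Across both collectors every value appears exactly once, although how the values are split between them is not specified.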
  • t

    Tobias

    07/01/2024, 8:26 AM
    When I collect a flow, I run into the problem that although the value gets changed, nothing happens:
    val monitorDongleEuiJob = settings.dongleEui.takeWhile { dongleEui ->
            dongleEui.isNotEmpty()
        }.onEach { dongleEui ->
            // do something, but nothing happens
        }.catch {
            print(it.message)
        }.launchIn(coroutineScope)
    When I change the code to this, it always works, but I don’t understand why:
    val monitorDongleEuiJob = settings.dongleEui.onEach { dongleEui ->
        if (dongleEui.isEmpty()) {
            return@onEach
        }
        // do something
    }.catch {
        print(it.message)
    }.launchIn(coroutineScope)
    Shouldn’t it basically be the same? Or am I misunderstanding
    takeWhile
    ? Does it maybe stop collecting when the condition has been met?
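The two snippets are not equivalent: `takeWhile` completes the flow the first time its predicate returns false, whereas an early `return@onEach` (or `filter`) only skips that one value and keeps collecting. A plausible explanation for the behaviour above, assuming the first value of `dongleEui` is empty, is that the flow ends immediately. The values below are made up to show the difference.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun demoTakeWhileVsFilter(): Pair<List<String>, List<String>> = runBlocking {
    val values = flowOf("", "abc", "def")   // made-up values; the first one is empty
    val taken = values.takeWhile { it.isNotEmpty() }.toList()   // stops at the first empty value
    val filtered = values.filter { it.isNotEmpty() }.toList()   // skips empty values and continues
    taken to filtered
}
```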
  • k

    Kashismails

    07/01/2024, 1:31 PM
    sealed interface SocketResponse<out T> {
        data class Data<T>(val item: T) : SocketResponse<T>
        data class Error(val exception: Throwable) : SocketResponse<Nothing>
        object ConnectionLost : SocketResponse<Nothing>
        object Loading : SocketResponse<Nothing>
    }
    I have this class and a function that returns a flow such as
    fun SocketFun() = flow<SocketResponse<SomeClass>> {
        emit(SocketResponse.Data(SomeClass(variableA, variableB)))
        emit(SocketResponse.ConnectionLost)
    }
    Now the flow can emit two different states of one parent. I want to write an extension to track the state of the flow, such as this one:
    fun <T> Flow<T>.asResult(shouldEmit: Boolean = true): Flow<Result<T>> {
        return this
            .map<T, Result<T>> {
                Result.Success(it)
            }
            .onStart { emit(Result.Loading) }
            .catch {
                if (it is Exception) {
                    errorLogger("resultException", it.message ?: "Exception in result Flow.")
                    if (shouldEmit) {
                        emit(Result.Error(it.getRealException()))
                    }
                } else {
                    emit(Result.Error(CustomMessage.ExceptionMessage("System not responding.")))
                }
            }
    }
    However, I want to skip map because I don't want to map anything: it's either data or connection lost. onStart and catch will be in the extension function. It won't work without map, but I have to do this in multiple places in my codebase and don't want to repeat onStart, buffer, and catch everywhere, as the logic for all of these will be the same. What is the best way to handle this?
  • m

    Malik Hasan

    07/09/2024, 6:32 PM
    Hi, I am trying to implement offline sync functionality. I have my NetworkConnectionFlow, which emits true/false based on connectivity, and I have UnsyncedItemsFlow, which emits items from Room that need to be synced to the backend server whenever connectivity comes online. Is it possible to collect from the UnsyncedItemsFlow only when the network flow returns true? I don't want to drop any unsynced items on the floor either, so I want the emissions to remain unprocessed until the network comes online. Thanks in advance.
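One way to collect a flow only while a condition holds is to switch on the condition flow with `flatMapLatest`. The sketch below assumes that the unsynced-items flow re-emits the current set of pending rows whenever it is collected (as a database query flow typically does), so nothing is lost while offline; the helper name `onlyWhile` and the two `MutableStateFlow` stand-ins are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects the receiver only while `enabled` is true.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.onlyWhile(enabled: Flow<Boolean>): Flow<T> =
    enabled.distinctUntilChanged().flatMapLatest { on -> if (on) this else emptyFlow() }

fun demoGate(): Pair<List<Int>?, List<Int>> = runBlocking {
    val online = MutableStateFlow(false)              // stands in for NetworkConnectionFlow
    val unsynced = MutableStateFlow(listOf(1, 2))     // stands in for UnsyncedItemsFlow
    val gated = unsynced.onlyWhile(online)

    val whileOffline = withTimeoutOrNull(100) { gated.first() }   // null: nothing is collected offline
    online.value = true
    val whenOnline = gated.first()
    whileOffline to whenOnline
}
```

When connectivity drops, the inner collection is cancelled, and it restarts from the current pending items once the network flow emits true again.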
  • p

    Peter Kievits

    07/18/2024, 2:07 PM
    Hi Everyone! Just looking into using a
    SharedFlow
    in our backend application. We're receiving updates and are publishing these on the
    SharedFlow
    . This works great because it is a very easy workflow to understand. However, we would like to implement metrics around this. Specifically, the SharedFlow has a predefined buffer, and we'd like to be aware of when it is filling up. We can detect when the buffer is full, because
    tryEmit
    returns
    false
    . Is there any way for us to get an early warning here, in case of a slow consumer? It looks like we can check the number of consumers on the flow, but there is no way to check the buffer status. Has anyone got any ideas how to monitor this?
  • v

    Vaibhav Jaiswal

    07/28/2024, 9:29 AM
    Hi I have a similar structure in my project, link to playground https://pl.kotl.in/WM-fc97xm
    abstract class Parent {
        abstract val flow1: StateFlow<Int>
        
        val flow2 = flow1.mapLatest { it.toString() }
        
    }
    
    class Child : Parent() {
        override val flow1 = MutableStateFlow(1).asStateFlow()
    }
    
    fun main(){
        val child = Child()
        
        runBlocking {
            child.flow2.collect {
                
            }
        }
    }
    This seems to crash my app, saying: > java.lang.NullPointerException: Attempt to invoke interface method 'java.lang.Object kotlinx.coroutines.flow.Flow.collect(kotlinx.coroutines.flow.FlowCollector, kotlin.coroutines.Continuation)' on a null object reference. It also fails on compilation in Kotlin Playground. Does anyone know what's wrong? The app crash stack trace is in the 🧵. If I make the second flow abstract as well, it works.
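A likely cause is initialization order: the `Parent` initializers run before those of `Child`, so `flow1` is still null when `flow2` is computed. One possible fix, sketched below, is to defer `flow2` until first access with `by lazy` (a property getter would also work). The sketch uses `map` instead of `mapLatest` only to avoid an opt-in annotation.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

abstract class Parent {
    abstract val flow1: StateFlow<Int>

    // Deferred until first access, by which time the subclass has initialized flow1.
    val flow2: Flow<String> by lazy { flow1.map { it.toString() } }
}

class Child : Parent() {
    override val flow1 = MutableStateFlow(1).asStateFlow()
}

fun demoInitOrder(): String = runBlocking { Child().flow2.first() }
```

This would also be consistent with the observation that making the second flow abstract works, since it is then initialized in the subclass after `flow1`.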
  • p

    prudhvi reddy

    08/04/2024, 6:54 PM
    👋 Hey folks. Has anyone tried testing the
    catch
    operator? This test case fails with a crash, whereas if I don't use the
    catch
    operator but use
    try-catch
    instead, the test behaves as expected.
  • a

    Alina Dolgikh [JB]

    08/09/2024, 3:27 PM
    Hi everyone! I’m considering merging this channel with #C1CFAFJSK to avoid overlap in discussion topics. By “merge,” I mean archiving this channel and inviting you to join #C1CFAFJSK if you’re not already there. Please let me know if there are any reasons to keep both channels. Thank you!
    🤷 1
    💯 8
  • d

    Djuro

    09/11/2024, 1:29 PM
    Hello everyone, hope you are doing fine. Is there a way to add a
    replay
    cache to a
    Flow
    , something like
    Flow.buffer(capacity = 1, replay = 1, onBufferOverflowStrategy = DROP_OLDEST)
    , for example?
  • a

    Alina Dolgikh [JB]

    09/16/2024, 10:03 AM
    archived the channel
    😅 2
    😨 2
    ⏸️ 1