# rx
  • g

    Gabe Kauffman

    11/18/2020, 11:44 PM
    You can't do
    publishSubject.observeOn(Schedulers.computation()).onNext(AudioProcessingHelper.calculateAudioVolume(audioSample))
  • g

    Gabe Kauffman

    11/18/2020, 11:45 PM
    You also can't do
    val publishSubject = PublishSubject.create<Double>().toSerialized().observeOn(Schedulers.computation())
  • g

    Gabe Kauffman

    11/18/2020, 11:45 PM
    Maybe I shouldn't even be using PublishSubject here, not sure
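    A possible way around both attempts, as a sketch only: observeOn returns a plain Observable, which has no onNext, so the Subject reference has to be kept for the producer side and observeOn applied on the consumer side. The RxJava 3 imports and the calculateVolume helper (a stand-in for AudioProcessingHelper.calculateAudioVolume) are assumptions.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import io.reactivex.rxjava3.subjects.Subject
import kotlin.math.abs

// Hypothetical stand-in for AudioProcessingHelper.calculateAudioVolume.
fun calculateVolume(sample: ShortArray): Double =
    sample.maxOfOrNull { abs(it.toInt()) }?.toDouble() ?: 0.0

// Producer side: keep the Subject type so onNext stays available.
val samples: Subject<ShortArray> = PublishSubject.create<ShortArray>().toSerialized()

// Consumer side: observeOn yields an Observable, so it is applied here,
// and the calculation runs on the computation scheduler.
val volumes: Observable<Double> = samples
    .observeOn(Schedulers.computation())
    .map { calculateVolume(it) }
```

    The audio callback would then call samples.onNext(audioSample) and the UI would subscribe to volumes.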
  • s

    Sami Eljabali

    12/02/2020, 4:53 AM
    I'm trying to achieve the following in an Rx chain:
    1) Get all locations & their cached temperatures from db.
    2) For every missing cached temp, fetch temp from api.
    3) Aggregate temps & return.
    I did #1, leaving me with a mixed bag of areas with cached temperatures and areas needing api fetches.
    • How do I split this stream into 2?
    • How do I then aggregate the results into 1 list after the stream needing api fetches is done?
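    One way to avoid splitting the stream at all, sketched under assumptions: map every location to an Observable that is either already resolved (cached) or backed by an API call, then collect with toList. The Location model, the fetchTemp parameter, and the RxJava 3 imports are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

// Hypothetical model: temperature is null when nothing is cached.
data class Location(val name: String, val temperature: Double?)

fun loadTemperatures(
    fromDb: Single<List<Location>>,
    fetchTemp: (Location) -> Single<Double> // hypothetical API call
): Single<List<Location>> =
    fromDb
        .flatMapObservable { Observable.fromIterable(it) }
        // concatMapEager runs the fetches concurrently but keeps the db order.
        .concatMapEager { loc ->
            if (loc.temperature != null) Observable.just(loc)
            else fetchTemp(loc).map { loc.copy(temperature = it) }.toObservable()
        }
        .toList()
```

    The resulting Single emits one list once every missing temperature has been fetched, and fails if any fetch fails.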
  • i

    iex

    12/02/2020, 2:41 PM
    I have 2 separate observables that emit respectively success and error for a specific request. I'm wrapping this in a new observable, that triggers the request and returns a Single with success/error state. Thinking about how to implement this. Something like
    val successObservable: Observable<SomeType>
    val errorObservable: Observable<AnotherType> // AnotherType can be mapped into a Throwable
    
    request.flatMap { 
        // First event of successObservable OR errorObservable. successObservable -> just(event), errorObservable -> error(error)
    }
    Not sure what operators to use here, zip with take(1) and startWith(none) crossed my mind but it doesn't feel right...
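    A possible sketch for "first event of either stream": merge the success stream with the error stream mapped into a failing Observable, then take the first signal. The firstOutcome name, the toThrowable parameter, and the RxJava 3 imports are assumptions.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

fun <S : Any, E : Any> firstOutcome(
    success: Observable<S>,
    error: Observable<E>,
    toThrowable: (E) -> Throwable
): Single<S> =
    Observable.merge(
        success,
        // Any item on the error stream terminates the merged stream with onError.
        error.flatMap { Observable.error<S>(toThrowable(it)) }
    ).firstOrError()
```

    If both sources are hot, the returned Single should be subscribed before the request is triggered, otherwise the event can be missed.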
  • o

    oday

    12/03/2020, 9:47 PM
    how do I poll with Rx? I want to keep querying an endpoint until its content includes some value, and I want to query it 2 seconds apart, for example, until that result is found
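    A minimal polling sketch, assuming RxJava 3: tick on an interval, run one request per tick, and stop at the first response that satisfies a predicate. The pollUntil name and its parameters are made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun <T : Any> pollUntil(
    request: Single<T>,
    periodMs: Long,
    scheduler: Scheduler = Schedulers.computation(),
    isDone: (T) -> Boolean
): Single<T> =
    Observable.interval(0, periodMs, TimeUnit.MILLISECONDS, scheduler)
        .concatMapSingle { request } // one request per tick, never overlapping
        .filter { isDone(it) }
        .firstOrError()              // disposes the interval once a match arrives
```

    Note that interval is fixed-rate: if a request takes longer than the period, the next one starts as soon as the previous one finishes.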
  • o

    oday

    12/08/2020, 10:31 AM
    purchaseFlowStore
                .getTransaction(transactionId)
                .takeUntil { it.state == "done" || it.state == "error" || it.state == "authorized" }
                .doOnError {
                    logger.debug("TransactionInfo error! $it")
                }
                .repeatWhen { Observable.timer(5, TimeUnit.SECONDS) }
                .viewModelSubscription({
                    logger.debug("TransactionInfo is ${it.state}")
    
                    when (it.state) {
                        "done", "authorized" -> {
                            getOrder()
                        }
                        "error" -> {
                            ShowRequestFailedDialog()
                        }
                    }
                }, {
                    logger.debug("TransactionInfo error! $it")
                })
  • o

    oday

    12/08/2020, 10:31 AM
    this only runs twice
  • o

    oday

    12/08/2020, 10:31 AM
    the first time, and the repeatWhen block
  • o

    oday

    12/08/2020, 10:31 AM
    even though state is definitely still not one of those 3 options in takeUntil
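    A likely explanation, offered as a sketch: the repeatWhen handler receives an Observable of completion signals, and every item emitted by the Observable it returns triggers one resubscription. Observable.timer ignores that input, emits once, and then completes, which gives exactly one repeat. Delaying the completion signals themselves keeps the loop going; takeUntil then has to sit after repeatWhen so a final state ends the whole loop. The repeatEvery helper and the RxJava 3 imports are assumptions.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.functions.Predicate
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun <T : Any> repeatEvery(
    source: Observable<T>,
    delaySeconds: Long,
    scheduler: Scheduler = Schedulers.computation(),
    isFinal: (T) -> Boolean
): Observable<T> =
    source
        // Delay each completion signal instead of replacing it with a one-shot timer.
        .repeatWhen { completions ->
            completions.delay(delaySeconds, TimeUnit.SECONDS, scheduler)
        }
        // Explicit Predicate avoids any overload ambiguity with takeUntil(ObservableSource).
        .takeUntil(Predicate { isFinal(it) })
```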
  • o

    oday

    12/08/2020, 12:29 PM
    there
    Observable.interval(2000, TimeUnit.MILLISECONDS)
                    .repeatUntil {
                        System.currentTimeMillis() - startTime > 10000
                        true
                    }
                    .subscribe {
                        purchaseFlowStore
                            .getTransaction(transactionId)
                            .viewModelSubscription({
                                when (it.state) {
                                    "done", "authorized" -> {
                                        getOrder()
                                    }
                                    "error" -> {
                                        showErrorDialog()
                                    }
                                }
                            }, {
                                showErrorDialog()
                            })
                    }
  • m

    Maciek

    12/11/2020, 4:54 PM
    I've got a class that does some work and returns a Completable. But the work must be sequential and synchronized. So if the work takes 10 seconds to complete and some other entity calls doWork while the work is still being done for the first entity, I want that call to be queued and to wait until the first one has finished. At first, I was trying to figure out some clever internal queue based on a subject, but failed. Then I thought that a simple semaphore would do the job just fine (but it will also block the thread). Do you see any problems with such an implementation, or can it be achieved with similar simplicity with streams without thread blocking?
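    class Foo {

        // java.util.concurrent.Semaphore: Java parameters cannot be passed by name from Kotlin
        private val sem = Semaphore(1)

        fun doWork(): Completable =
            Completable.fromCallable { sem.acquire() } // blocks the subscribing thread
                // andThen { ... } would create a CompletableSource that never completes
                .andThen(Completable.defer { doWorkInternal() })
                .doOnError { sem.release() }
                .doOnComplete { sem.release() }
    }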
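    A non-blocking alternative, as a rough sketch rather than a drop-in replacement: push each request into a serialized subject and let concatMapCompletable run them one at a time. The SerialWorker class, its work parameter (standing in for doWorkInternal), and the RxJava 3 imports are assumptions.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.subjects.CompletableSubject
import io.reactivex.rxjava3.subjects.PublishSubject

class SerialWorker(private val work: () -> Completable) {
    // Each queued request carries the subject used to report its result.
    private val queue = PublishSubject.create<CompletableSubject>().toSerialized()

    init {
        // concatMapCompletable subscribes to the next job only after the
        // previous one has terminated, so jobs never overlap.
        queue.concatMapCompletable { done ->
            Completable.defer { work() }
                .doOnComplete { done.onComplete() }
                .doOnError { done.onError(it) }
                .onErrorComplete() // keep the queue alive after a failed job
        }.subscribe()
    }

    fun doWork(): Completable = Completable.defer {
        val done = CompletableSubject.create()
        queue.onNext(done)
        done
    }
}
```

    In this sketch, disposing the Completable returned by doWork does not cancel or dequeue the job; that would need extra handling.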
  • g

    Grant Park

    12/19/2020, 5:38 AM
    Does anyone know why 1 out of 3 listeners gets emissions in backwards order given this sample demo of BehaviorSubject? All three listeners should receive A and B but the third listener gets B and A and this unit test ultimately fails.
    @Test
        fun `rxjava test`() {
            val eventHistory1 = ArrayList<String>()
            val eventHistory2 = ArrayList<String>()
            val eventHistory3 = ArrayList<String>()
    
            val behaviorSubject = BehaviorSubject.create<String>()
    
            behaviorSubject.subscribe {
                eventHistory1.add(it)
            }
    
            behaviorSubject.subscribe {
                eventHistory2.add(it)
                if (it == "A") behaviorSubject.onNext("B")
            }
    
            behaviorSubject.subscribe {
                eventHistory3.add(it)
            }
    
            behaviorSubject.onNext("A")
    
            println(eventHistory1)
            println(eventHistory2)
            println(eventHistory3)
    
            assert(eventHistory1 == eventHistory2)
            assert(eventHistory2 == eventHistory3)
        }
    👀 1
  • g

    Grant Park

    12/19/2020, 5:39 AM
    I’m guessing doing an .onNext() inside .subscribe creates a race condition?
  • g

    Grant Park

    12/19/2020, 5:40 AM
    if you were to run this code, the output is:
    [A, B]
    [A, B]
    [B, A]
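    This looks like reentrancy rather than a race, since everything runs on one thread: the nested onNext("B") is delivered to all three listeners while "A" is still being handed out, so the third listener sees B before A. A sketch of one possible fix, assuming RxJava 3, is to serialize the subject so that an onNext issued during another emission is queued until that emission has finished.

```kotlin
import io.reactivex.rxjava3.subjects.BehaviorSubject

fun reentrantHistories(): List<List<String>> {
    val histories = List(3) { mutableListOf<String>() }
    // toSerialized() queues an onNext that arrives while another is in progress.
    val subject = BehaviorSubject.create<String>().toSerialized()

    subject.subscribe { histories[0].add(it) }
    subject.subscribe {
        histories[1].add(it)
        if (it == "A") subject.onNext("B") // nested emission, now deferred
    }
    subject.subscribe { histories[2].add(it) }

    subject.onNext("A")
    return histories
}
```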
  • s

    Sam

    01/20/2021, 7:09 AM
    Good morning everyone, I have to find a way to handle this logic: Step 1: get data from the cache and show it on the UI. Step 2: call the APIs and update the cache. Step 3: update the UI with the latest data. What is the simplest way to do this? Thanks.
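    A common shape for this, sketched with assumed names and RxJava 3 imports: emit the cached value first (if any), then the network value, saving it to the cache as a side effect. The UI simply renders every emission.

```kotlin
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

fun <T : Any> cacheThenNetwork(
    cached: Maybe<T>,          // empty when nothing is cached yet
    fetch: Single<T>,          // hypothetical API call
    saveToCache: (T) -> Unit   // hypothetical cache writer
): Observable<T> =
    cached.toObservable()
        // The fetch is subscribed only after the cache lookup has finished.
        .concatWith(fetch.doOnSuccess { saveToCache(it) })
```

    With this ordering a network failure arrives as onError after the cached value has already been shown.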
  • j

    Jintin

    01/27/2021, 3:52 PM
    hi guys, I have a question on whether Single.just is a hot observable or not. I feel it's not, but it seems many people think it is. Is there any rule we can follow to determine it?
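    A small sketch that may explain the confusion, assuming RxJava 3: Single.just emits nothing until it is subscribed and replays its value to each subscriber, but its argument is evaluated eagerly when the Single is created. Single.fromCallable defers the computation to each subscription.

```kotlin
import io.reactivex.rxjava3.core.Single

fun justVersusFromCallable(): List<String> {
    val log = mutableListOf<String>()
    fun compute(): Int { log.add("computed"); return 42 }

    val eager = Single.just(compute())              // compute() runs right here
    log.add("just created")
    val deferred = Single.fromCallable { compute() } // compute() runs per subscription
    log.add("fromCallable created")

    eager.subscribe()    // no new computation
    deferred.subscribe() // computes
    deferred.subscribe() // computes again
    return log
}
```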
  • j

    Jazz

    02/18/2021, 3:38 PM
    hi guys. I am a newbie to RxKotlin. Can I remove items from the queue?
    subject.subscribe{ 
      when(type) {
          A -> {}
          B -> {}
          C -> {}
      }
    }
    
    subject.onNext(A)
    // queue  [A]
    subject.onNext(B)
    // queue  [A, B]
    subject.onNext(C)
    // queue [C]
    I want to modify the subject stream (modify the queue) like the Android message queue (add, remove…)
  • s

    samuel

    02/18/2021, 5:49 PM
    is it possible to have an observable that is collected by only one subscriber and the single subscriber should be the one that subscribed last?
  • u

    ursus

    02/21/2021, 7:08 PM
    hmm, switchMap?
  • g

    Grant Park

    03/02/2021, 10:52 PM
    I have an observable that emits Timers. I also have an observable that emits Ints. How can I combine them such that an Int will be emitted followed by the timer? i.e. given an array of ints [1,2,3] and timers [2s, 4s, 3s]: 1 (then wait 2 s), 2 (then wait 4 s), 3 (then wait 3 s)
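    One possible sketch, assuming the timers can be represented as delays in milliseconds and RxJava 3 is in use: zip each Int with its delay, then use concatMap so that every value is followed by a timer that must finish before the next value is emitted.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun paced(
    ints: Observable<Int>,
    delaysMs: Observable<Long>, // assumption: timers expressed as millisecond delays
    scheduler: Scheduler = Schedulers.computation()
): Observable<Int> =
    Observable.zip(ints, delaysMs,
        BiFunction<Int, Long, Pair<Int, Long>> { value, delay -> value to delay })
        .concatMap { (value, delay) ->
            // Emit the value, then hold the sequence open for the delay.
            Observable.just(value)
                .concatWith(Completable.timer(delay, TimeUnit.MILLISECONDS, scheduler))
        }
```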
  • a

    Andy Gibel

    03/26/2021, 6:34 PM
    Given two streams A and B, I want to merge them such that the merge emits items from A only until B emits. At that point it emits B only. How can I do this? Thanks!
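    A sketch of one approach, assuming RxJava 3: share B with publish(selector) so it is subscribed only once, cut A off with takeUntil as soon as B emits, and merge the two. The switchOverTo name is made up.

```kotlin
import io.reactivex.rxjava3.core.Observable

fun <T : Any> switchOverTo(a: Observable<T>, b: Observable<T>): Observable<T> =
    b.publish { sharedB ->
        // takeUntil subscribes to sharedB before the merge does, so A is
        // stopped before the first B item is forwarded downstream.
        Observable.merge(a.takeUntil(sharedB), sharedB)
    }
```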
  • a

    Andy Gibel

    03/29/2021, 8:06 PM
    Is there a way to buffer stream outputs for some amount of time and return a list of emissions? Let's say I'm wrapping a callback from an API that calls the callback each time a device is found on the network. I'd like the client code to listen for events for N seconds and then return a Single<List<Device>> of elements detected in that time.
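    A minimal sketch, assuming the callback is already wrapped in an Observable and RxJava 3 is in use: the timed take overload ends the stream after N seconds, and toList turns whatever arrived into a Single. The collectFor name is hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun <T : Any> collectFor(
    source: Observable<T>,
    seconds: Long,
    scheduler: Scheduler = Schedulers.computation()
): Single<List<T>> =
    source
        .take(seconds, TimeUnit.SECONDS, scheduler) // completes when the window closes
        .toList()
```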
  • m

    Matias Reparaz

    08/20/2021, 5:47 PM
    Hi, I have a fun getResources(): Single<Resources> and with that I need to do 2 things: 1️⃣ a flatMap with another call fun doSomething(r: Resources): Single<OtherThing> and 2️⃣ send a metric if some condition occurs fun sendMetric(r: Resources): Completable. I’m looking for something better than this:
    getResources()
        .doOnSuccess { 
          if (someCondition(it)) sendMetric(it).subscribe()
        }.flatMap { doSomething(it) }
    Note that if sendMetric fails it’s not critical and I don’t want to interrupt the doSomething flow. Is there a more elegant way to do this?
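    One option, sketched with generic stand-ins for the functions above and RxJava 3 imports (both assumptions): keep the metric inside the chain so it is disposed together with it, and swallow its failure with onErrorComplete. The trade-off is that doSomething is subscribed only after the metric call has finished.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Single

fun <R : Any, O : Any> withOptionalMetric(
    resources: Single<R>,
    shouldReport: (R) -> Boolean,
    sendMetric: (R) -> Completable,
    doSomething: (R) -> Single<O>
): Single<O> =
    resources.flatMap { r ->
        val metric =
            if (shouldReport(r)) sendMetric(r).onErrorComplete() // metric failure is ignored
            else Completable.complete()
        metric.andThen(doSomething(r))
    }
```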
  • f

    filipradon

    09/03/2021, 2:05 PM
    Hi guys, maybe someone can shed some extra light on this for me. I have this test case:
    @Test fun `completeable trampoline test`() {
            Completable.defer { Completable.complete() }
                .doOnComplete { println("I'm completing!") }
                .andThen(
                    Completable.complete()
                        .timeout(1, TimeUnit.SECONDS, Schedulers.trampoline())
                        .doOnComplete { println("I'm not completing ! :( ") }
                )
                .subscribeOn(Schedulers.trampoline())
                .test()
                .assertComplete()
    
            // timeout exception is thrown
        }
    It's not completing. It's throwing a TimeoutException. I'm wondering how this chain is actually executed. I know Trampoline puts the thread to sleep when used in timeout, for example, but my question is why the Completables do not complete (or actually only the second Completable) before that happens. (PS. I'm aware that a TestScheduler should be used, it's just for better understanding purposes)
  • s

    Susheel

    09/29/2021, 4:59 PM
    I noticed that zipIterable() is missing from RxJava3. What is the alternative provided?
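    As far as I know, RxJava 3 still has a zip overload that takes an Iterable of sources plus a zipper function over Object[]. A small sketch under that assumption, with a made-up sumLatest helper:

```kotlin
import io.reactivex.rxjava3.core.Observable

fun sumLatest(sources: List<Observable<Int>>): Observable<Int> =
    // The zipper receives one value per source as an untyped array.
    Observable.zip(sources) { values -> values.map { it as Int }.sum() }
```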
  • k

    Keith Mayoral

    05/06/2022, 12:05 AM
    Hey all, I’m trying to think of a way to have a RxJava2 operator that is something like Single.amb(sourcesList) which behaves just like amb when any input Single source succeeds but only errors out after all input sources have failed (as opposed to amb() which errors out immediately if the first source to complete errored out)
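    A possible sketch: mergeDelayError subscribes to all sources and holds back errors until every source has terminated, and firstOrError takes the first success and disposes the rest. The imports below are RxJava 3 (an assumption); the same two operators should also be available in recent RxJava 2 releases. The firstSuccess name is made up.

```kotlin
import io.reactivex.rxjava3.core.Single

fun <T : Any> firstSuccess(sources: List<Single<T>>): Single<T> =
    Single.mergeDelayError(sources) // Flowable<T>; errors are delayed to the end
        .firstOrError()             // first success wins, the others are disposed
```

    With an empty list this sketch fails with NoSuchElementException, which may or may not be the desired behavior.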
  • d

    Derek Peirce

    04/20/2023, 6:56 PM
    Has there been any effort to add extensions incorporating Kotlin's new Duration API, for methods like delay and timeout?
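    Such extensions are short to write by hand. The sketch below is hypothetical (these functions are not taken from RxJava or RxKotlin), assumes RxJava 3 and Kotlin 1.6 or later, and truncates the Duration to whole milliseconds.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical extension: forwards to the member delay(long, TimeUnit).
fun <T : Any> Observable<T>.delay(duration: Duration): Observable<T> =
    delay(duration.inWholeMilliseconds, TimeUnit.MILLISECONDS)

// Hypothetical extension: forwards to the member timeout(long, TimeUnit).
fun <T : Any> Observable<T>.timeout(duration: Duration): Observable<T> =
    timeout(duration.inWholeMilliseconds, TimeUnit.MILLISECONDS)

// Usage example.
fun delayedOne(): Int = Observable.just(1).delay(10.milliseconds).blockingFirst()
```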
  • b

    boyw165

    04/30/2023, 1:56 AM
    Curious if there’s a curated list of articles for mapping RxJava examples to Kotlin Coroutines? My company is debating a migration from RxJava to Coroutines, and from my current knowledge level of Kotlin Coroutines, I’m not fully convinced. I feel it’s apples vs. oranges. What do you think?
  • l

    lam bui

    10/29/2024, 7:38 AM
    I want to simulate an API that returns a List<Object>, or fails, after a 2-second delay using RxKotlin. Additionally, there are n APIs to subscribe to (like downloading 10 files in a list, each with a download button). I want to subscribe to each file download and track the success or failure status of the entire list download.