Gabe Kauffman
11/18/2020, 11:44 PM
publishSubject.observeOn(Schedulers.computation()).onNext(AudioProcessingHelper.calculateAudioVolume(audioSample))
Gabe Kauffman
11/18/2020, 11:45 PM
val publishSubject = PublishSubject.create<Double>().toSerialized().observeOn(Schedulers.computation())
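Worth noting about the two snippets above: observeOn returns a plain Observable, which has no onNext, so the value it returns cannot be used to push samples. A minimal sketch of one way to keep the two roles apart, assuming RxJava 3 and assuming the goal is to emit from any thread and consume on the computation scheduler. `VolumeBus`, `volumes` and `publish` are hypothetical names, not from the original messages.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import io.reactivex.rxjava3.subjects.Subject

// Hypothetical holder: the Subject is the write side, the Observable the read side.
class VolumeBus(scheduler: Scheduler = Schedulers.computation()) {
    // toSerialized() makes onNext safe to call from several threads.
    private val subject: Subject<Double> = PublishSubject.create<Double>().toSerialized()

    // observeOn returns an Observable, so it is exposed only for subscribing.
    val volumes: Observable<Double> = subject.observeOn(scheduler)

    // Values are pushed through the Subject reference, never through `volumes`.
    fun publish(volume: Double) = subject.onNext(volume)
}
```

A caller would subscribe to `bus.volumes` and call `bus.publish(AudioProcessingHelper.calculateAudioVolume(audioSample))` from the audio thread.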
Gabe Kauffman
11/18/2020, 11:45 PM
Sami Eljabali
12/02/2020, 4:53 AM
iex
12/02/2020, 2:41 PM
Single with success/error state. Thinking about how to implement this. Something like:
val successObservable: Observable<SomeType>
val errorObservable: Observable<AnotherType> // AnotherType can be mapped into a Throwable
request.flatMap {
    // First event of successObservable OR errorObservable. successObservable -> just(event), errorObservable -> error(error)
}
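One possible shape for the body of that flatMap, as a sketch only and assuming RxJava 3: map every event of the error stream to an error signal, merge it with the success stream, and take whichever signals first. `firstOutcome` and `toThrowable` are hypothetical names introduced for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

// Hypothetical helper: first success event wins, first failure event becomes onError.
fun <S : Any, E : Any> firstOutcome(
    success: Observable<S>,
    failure: Observable<E>,
    toThrowable: (E) -> Throwable
): Single<S> {
    // Turn every failure event into an error signal of the success type.
    val failureAsError: Observable<S> =
        failure.flatMap { e -> Observable.error<S>(toThrowable(e)) }
    // merge subscribes to both streams; firstOrError takes whichever signals first.
    return Observable.merge(success, failureAsError).firstOrError()
}
```

With this in place the snippet above could read `request.flatMap { firstOutcome(successObservable, errorObservable) { SomeException(it) } }`, where `SomeException` stands for whatever mapping AnotherType needs. If both streams complete without emitting, firstOrError signals NoSuchElementException.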
Not sure what operators to use here, zip with take(1) and startsWith(none) crossed my mind but it doesn't feel right...
oday
12/03/2020, 9:47 PM
oday
12/08/2020, 10:31 AM
purchaseFlowStore
    .getTransaction(transactionId)
    .takeUntil { it.state == "done" || it.state == "error" || it.state == "authorized" }
    .doOnError {
        logger.debug("TransactionInfo error! $it")
    }
    .repeatWhen { Observable.timer(5, TimeUnit.SECONDS) }
    .viewModelSubscription({
        logger.debug("TransactionInfo is ${it.state}")
        when (it.state) {
            "done", "authorized" -> {
                getOrder()
            }
            "error" -> {
                ShowRequestFailedDialog()
            }
        }
    }, {
        logger.debug("TransactionInfo error! $it")
    })
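One thing to check in the chain above, as far as I understand repeatWhen: the handler ignores the completion stream it receives and returns a one-shot timer, so there is at most one resubscription after 5 seconds, and the timer's own completion then ends the whole sequence. takeUntil placed before repeatWhen also ends only a single attempt rather than the loop. A sketch of a polling loop that avoids both, assuming RxJava 3; `Transaction`, `terminalStates` and `pollUntilTerminal` are hypothetical stand-ins.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the transaction model in the snippet above.
data class Transaction(val state: String)

private val terminalStates = setOf("done", "error", "authorized")

fun pollUntilTerminal(
    fetch: Observable<Transaction>,
    periodSeconds: Long,
    scheduler: Scheduler
): Observable<Transaction> =
    fetch
        // Each completion of fetch is delayed, then triggers a resubscription.
        .repeatWhen { completions -> completions.delay(periodSeconds, TimeUnit.SECONDS, scheduler) }
        // Placed after repeatWhen so a terminal state ends the whole loop.
        .takeUntil { tx -> tx.state in terminalStates }
```

Passing the scheduler in keeps the loop testable with a TestScheduler; in production code `Schedulers.computation()` would be the usual choice.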
oday
12/08/2020, 10:31 AM
oday
12/08/2020, 10:31 AM
oday
12/08/2020, 10:31 AM
state is definitely still not one of those 3 options in takeUntil
oday
12/08/2020, 12:29 PM
Observable.interval(2000, TimeUnit.MILLISECONDS)
    .repeatUntil {
        System.currentTimeMillis() - startTime > 10000
        true
    }
    .subscribe {
        purchaseFlowStore
            .getTransaction(transactionId)
            .viewModelSubscription({
                when (it.state) {
                    "done", "authorized" -> {
                        getOrder()
                    }
                    "error" -> {
                        showErrorDialog()
                    }
                }
            }, {
                showErrorDialog()
            })
    }
Maciek
12/11/2020, 4:54 PM
doWork during the work being done for the first entity I want it to be queued and wait for it to be finished. At first, I was trying to figure out some clever internal queue based on a subject, but failed. Then I thought that a simple semaphore would do the job just fine (but it will also block the thread). Do you see any problems with such an implementation, or can it be achieved with similar simplicity with streams without thread blocking?
class Foo {
    private val sem = Semaphore(permit = 1)
    fun doWork(): Completable =
        Completable.fromCallable { sem.acquire() }
            .andThen { doWorkInternal() }
            .doOnError { sem.release() }
            .doOnComplete { sem.release() }
}
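One detail in the snippet above: `.andThen { doWorkInternal() }` passes a lambda, which Kotlin converts into a CompletableSource whose subscribe body never signals its observer, so the chain would not complete; `andThen(Completable.defer { doWorkInternal() })` is the form that actually subscribes. For the non-blocking variant, here is a sketch of a subject-backed queue, assuming RxJava 3. `SerialQueue`, `enqueue` and `shutdown` are hypothetical names, and this is only one of several ways to serialize work.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.subjects.CompletableSubject
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: runs submitted Completables one at a time, in arrival order.
class SerialQueue {
    private val requests = PublishSubject.create<Completable>().toSerialized()

    // concatMapCompletable subscribes to the next job only after the previous one ends.
    private val worker: Disposable = requests
        .concatMapCompletable { job -> job.onErrorComplete() } // a failed job must not stop the queue
        .subscribe()

    fun enqueue(work: Completable): Completable = Completable.defer {
        val done = CompletableSubject.create()
        // Mirror the job's outcome to the caller, then hand the job to the queue.
        requests.onNext(
            work.doOnComplete { done.onComplete() }
                .doOnError { e -> done.onError(e) }
        )
        done
    }

    fun shutdown() = worker.dispose()
}
```

In the class from the question, doWork() could then be `queue.enqueue(Completable.defer { doWorkInternal() })`. A trade-off of this sketch: disposing the returned Completable does not cancel a job that is already queued.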
Grant Park
12/19/2020, 5:38 AM
A and B but the third listener gets B and A and this unit test ultimately fails.
@Test
fun `rxjava test`() {
    val eventHistory1 = ArrayList<String>()
    val eventHistory2 = ArrayList<String>()
    val eventHistory3 = ArrayList<String>()
    val behaviorSubject = BehaviorSubject.create<String>()
    behaviorSubject.subscribe {
        eventHistory1.add(it)
    }
    behaviorSubject.subscribe {
        eventHistory2.add(it)
        if (it == "A") behaviorSubject.onNext("B")
    }
    behaviorSubject.subscribe {
        eventHistory3.add(it)
    }
    behaviorSubject.onNext("A")
    println(eventHistory1)
    println(eventHistory2)
    println(eventHistory3)
    assert(eventHistory1 == eventHistory2)
    assert(eventHistory2 == eventHistory3)
}
Grant Park
12/19/2020, 5:39 AM
.onNext() inside .subscribe creates a race condition?
Grant Park
12/19/2020, 5:40 AM
[A, B]
[A, B]
[B, A]
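The output above can be explained without any threads being involved: a subject delivers synchronously, so the onNext("B") issued inside the second subscriber runs a nested delivery loop that reaches the third subscriber before the outer loop has handed it "A". It is re-entrancy rather than a race. A sketch of the same scenario on a serialized subject, which as far as I know queues an emission made while another one is in progress; this assumes RxJava 3, and `serializedHistories` is a hypothetical name.

```kotlin
import io.reactivex.rxjava3.subjects.BehaviorSubject

// Same three subscribers as the test above, but on a serialized subject.
fun serializedHistories(): List<List<String>> {
    val history1 = ArrayList<String>()
    val history2 = ArrayList<String>()
    val history3 = ArrayList<String>()
    val subject = BehaviorSubject.create<String>().toSerialized()

    subject.subscribe { history1.add(it) }
    subject.subscribe {
        history2.add(it)
        // Re-entrant call: queued until "A" has reached every subscriber.
        if (it == "A") subject.onNext("B")
    }
    subject.subscribe { history3.add(it) }

    subject.onNext("A")
    return listOf(history1, history2, history3)
}
```

Calling `serializedHistories()` should give `[A, B]` for all three subscribers, so both assertions of the original test would hold.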
Sam
01/20/2021, 7:09 AM
Jintin
01/27/2021, 3:52 PM
Single.just is a hot observable or not.
I feel it's not, but it seems many people think it is. Is there any rule we can follow to determine it?
Jazz
02/18/2021, 3:38 PM
subject.subscribe {
    when (type) {
        A -> {}
        B -> {}
        C -> {}
    }
}
subject.onNext(A)
// queue [A]
subject.onNext(B)
// queue [A, B]
subject.onNext(C)
// queue [C]
I want to modify the subject stream (modify the queue) like the Android message queue (add, remove…)
samuel
02/18/2021, 5:49 PM
ursus
02/21/2021, 7:08 PM
Grant Park
03/02/2021, 10:52 PM
Andy Gibel
03/26/2021, 6:34 PM
Andy Gibel
03/29/2021, 8:06 PM
Matias Reparaz
08/20/2021, 5:47 PM
fun getResources(): Single<Resources> and with that I need to do 2 things, 1️⃣ a flatMap with another call fun doSomething(r: Resources): Single<OtherThing> and 2️⃣ send a metric if some condition occurs fun sendMetric(r: Resources): Completable.
I’m looking for something better than this:
getResources()
    .doOnSuccess {
        if (someCondition(it)) sendMetric(it).subscribe()
    }.flatMap { doSomething(it) }
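A sketch of one alternative to the nested subscribe() above, assuming RxJava 3 and assuming that a failed metric should simply be ignored: keep the metric inside the chain and swallow its error with onErrorComplete(). `loadThing` and the two data classes are hypothetical stand-ins for the real types.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Single

// Hypothetical stand-ins for the types named in the question.
data class Resources(val id: Int)
data class OtherThing(val label: String)

fun loadThing(
    getResources: () -> Single<Resources>,
    someCondition: (Resources) -> Boolean,
    sendMetric: (Resources) -> Completable,
    doSomething: (Resources) -> Single<OtherThing>
): Single<OtherThing> =
    getResources().flatMap { r ->
        val metric =
            if (someCondition(r)) sendMetric(r).onErrorComplete() // swallow metric failures
            else Completable.complete()
        // The metric stays inside the chain, so disposing the result also disposes it.
        metric.andThen(doSomething(r))
    }
```

The trade-off here is that doSomething starts only after the metric call has finished. If the metric must not delay the main flow, the fire-and-forget form from the question may still be the simpler choice.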
Note that if sendMetric fails it’s not critical and I don’t want to interrupt the doSomething flow. Is there a more elegant way to do this?
filipradon
09/03/2021, 2:05 PM
@Test fun `completeable trampoline test`() {
    Completable.defer { Completable.complete() }
        .doOnComplete { println("I'm completing!") }
        .andThen(
            Completable.complete()
                .timeout(1, TimeUnit.SECONDS, Schedulers.trampoline())
                .doOnComplete { println("I'm not completing ! :( ") }
        )
        .subscribeOn(Schedulers.trampoline())
        .test()
        .assertComplete()
    // timeout exception is thrown
}
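A possible explanation, based on my reading of the RxJava 3 sources and so to be treated as an assumption: Completable.timeout schedules its timeout task before it subscribes to its upstream, and the trampoline scheduler runs a delayed task by sleeping on the current thread and then executing it inline. The timeout would therefore fire before the inner Completable.complete() is ever subscribed. A sketch of the same chain with the scheduler passed in, so it can be run on a TestScheduler where the timer is only queued; `buildChain` is a hypothetical name.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Scheduler
import java.util.concurrent.TimeUnit

// Same chain as the test above, with the timeout scheduler as a parameter.
fun buildChain(timeoutScheduler: Scheduler): Completable =
    Completable.defer { Completable.complete() }
        .andThen(
            Completable.complete()
                // With a TestScheduler the timeout task is queued and never runs
                // unless virtual time is advanced, so the upstream completes first.
                .timeout(1, TimeUnit.SECONDS, timeoutScheduler)
        )
```

For example, `buildChain(TestScheduler()).test().assertComplete()` should pass, since no virtual time elapses before the inner Completable completes.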
It's not completing. It's throwing a TimeoutException. I'm wondering how this chain is actually executed. I know Trampoline puts the thread to sleep when used in timeout, for example, but my question is why the Completables do not complete (or actually only the second Completable) before that happens.
(PS. I'm aware that TestSchedulers should be used, it's just for better understanding purposes)
Susheel
09/29/2021, 4:59 PM
Keith Mayoral
05/06/2022, 12:05 AM
Single.amb(sourcesList) which behaves just like amb when any input Single source succeeds but only errors out after all input sources have failed (as opposed to amb() which errors out immediately if the first source to complete errored out)
Derek Peirce
04/20/2023, 6:56 PM
Duration API, for methods like delay and timeout?
boyw165
04/30/2023, 1:56 AM
lam bui
10/29/2024, 7:38 AM
List<Object>, or fail after a 2-second delay using RxKotlin. Additionally, there are n APIs to subscribe to (like downloading 10 files in a list, each with a download button). You want to subscribe to each file download and track the success or failure status of the entire list download.