Flowables in RxJava and When to Use Them

Posted By : Akash Tomer | 30-Apr-2018

To understand Flowables, we first have to understand Observables. Observables are sources that we observe for events. They are used when we have relatively few items over time and there is no risk of flooding the consumers. If there is a possibility that the consumer can be flooded, we use Flowable instead. One example is reading a large amount of data from a sensor, which typically pushes out data at a high rate. In earlier versions of RxJava, this flooding could be prevented by applying backpressure. In RxJava 2, however, the development team separated these two kinds of producers into two entities: Observable and Flowable.

So when should we use it?
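To make the difference concrete, here is a minimal sketch (assuming RxJava 2 is on the classpath) of how a Flowable source emits only as many items as the subscriber asks for. The function name `firstFour` and the cut-off at four items are illustrative choices, not part of the original post.

```kotlin
import io.reactivex.Flowable
import io.reactivex.subscribers.DisposableSubscriber

// Hypothetical helper: collects items from a Flowable, but only asks for
// one item at a time and stops asking after the fourth.
fun firstFour(): List<Int> {
    val received = mutableListOf<Int>()
    Flowable.range(1, 10).subscribe(object : DisposableSubscriber<Int>() {
        // Replace the default "request everything" with a demand of one item.
        override fun onStart() = request(1)

        override fun onNext(item: Int) {
            received.add(item)
            // Ask for the next item only while we still want more.
            if (item < 4) request(1)
        }

        override fun onError(t: Throwable) = println(t.message)
        override fun onComplete() = println("done")
    })
    return received
}

fun main() {
    println(firstFour()) // [1, 2, 3, 4]
}
```

Although the source could emit ten items, it stops after four because no further demand was signalled. An Observable has no such request channel, so it would have pushed all ten.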

Let's understand the use of Flowable with another example. Suppose you have a source that emits data items at a rate of 1 million items/second. The next step is to make a network request for each item. Suppose the device can handle 100 network requests/second. Do you see the problem? The second step is the bottleneck: the device can handle at most 100 requests/second, so the huge amount of data from step 1 will cause an OOM (Out Of Memory) error.
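The size of the problem can be estimated with a little arithmetic. The sketch below uses the two rates from the example; the names `backlogAfter`, `PRODUCED_PER_SECOND` and `CONSUMED_PER_SECOND` are made up for illustration, and it assumes nothing is dropped along the way.

```kotlin
// Rates taken from the example: 1 million items produced and
// 100 network requests handled per second.
const val PRODUCED_PER_SECOND = 1_000_000L
const val CONSUMED_PER_SECOND = 100L

// Number of items still waiting in memory after the given number of
// seconds, assuming every unprocessed item is kept in a queue.
fun backlogAfter(seconds: Long): Long =
    (PRODUCED_PER_SECOND - CONSUMED_PER_SECOND) * seconds

fun main() {
    for (s in listOf(1L, 10L, 60L)) {
        println("after $s s: ${backlogAfter(s)} items pending")
    }
    // after 1 s: 999900 items pending
    // after 10 s: 9999000 items pending
    // after 60 s: 59994000 items pending
}
```

After only one minute almost 60 million items would be queued, which is why an unbounded queue between the two steps eventually exhausts memory.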

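import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject

val observableData = PublishSubject.create<Int>()
// Consume on a computation thread while the loop below produces on the calling thread
observableData.observeOn(Schedulers.computation())
        .subscribeBy(
            onNext = { println("number: $it") },
            onError = { t -> print(t.message) }
        )
// Push items as fast as possible; the observeOn queue keeps growing while the consumer lags
for (i in 0..1000000) {
    observableData.onNext(i)
}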

In such situations we need backpressure, which in simple terms is just a way of handling the items that cannot be processed in time. In the code below, we handle this case using Flowable:

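import io.reactivex.BackpressureStrategy
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject

val observableData = PublishSubject.create<Int>()
observableData
        // Convert to a Flowable; MISSING applies no backpressure handling of its own
        .toFlowable(BackpressureStrategy.MISSING)
        // observeOn on a Flowable uses a bounded queue between producer and consumer
        .observeOn(Schedulers.computation())
        .subscribeBy(
            onNext = { println("number: $it") },
            onError = { t -> print(t.message) }
        )
// Push items faster than the consumer can drain the queue
for (i in 0..1000000) {
    observableData.onNext(i)
}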

If you run the above code, you'll see the output:

Queue is full

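This is because BackpressureStrategy.MISSING applies no backpressure handling at all, so the stream falls back to the default behaviour of observeOn, which buffers up to 128 items in its queue. The producer fills that queue faster than the consumer can drain it, hence the output Queue is full.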
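One way to avoid the error is to pick a strategy that actually handles the overflow. The following is a minimal sketch, assuming RxJava 2, that uses BackpressureStrategy.DROP with a subscriber that asks for only three items. The helper name `deliveredWithDrop` and the demand of three are illustrative assumptions, and whether dropping items is acceptable depends on the use case.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.subjects.PublishSubject
import io.reactivex.subscribers.DisposableSubscriber

// Hypothetical helper: pushes 1..10 into a subject and returns what a
// subscriber with a demand of only three items actually receives.
fun deliveredWithDrop(): List<Int> {
    val source = PublishSubject.create<Int>()
    val received = mutableListOf<Int>()

    source
        // DROP discards any item that arrives while there is no outstanding demand
        .toFlowable(BackpressureStrategy.DROP)
        .subscribe(object : DisposableSubscriber<Int>() {
            // Signal a demand of three items instead of the unbounded default.
            override fun onStart() = request(3)
            override fun onNext(item: Int) { received.add(item) }
            override fun onError(t: Throwable) = println(t.message)
            override fun onComplete() = println("done")
        })

    // The producer ignores demand and pushes all ten items.
    for (i in 1..10) source.onNext(i)
    return received
}

fun main() {
    println(deliveredWithDrop()) // [1, 2, 3]
}
```

Items 4 to 10 are silently discarded instead of raising an error. BackpressureStrategy.BUFFER would keep them in memory until requested, and BackpressureStrategy.LATEST would keep only the most recent one.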

 

Hope this will help you.

About Author

Akash is an Android Developer at Oodles Technology.
