- 浏览: 2210631 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (1240)
- mac/IOS (287)
- flutter (1)
- J2EE (115)
- android基础知识 (582)
- android中级知识 (55)
- android组件(Widget)开发 (18)
- android 错误 (21)
- javascript (18)
- linux (70)
- 树莓派 (18)
- gwt/gxt (1)
- 工具(IDE)/包(jar) (18)
- web前端 (17)
- java 算法 (8)
- 其它 (5)
- chrome (7)
- 数据库 (8)
- 经济/金融 (0)
- english (2)
- HTML5 (7)
- 网络安全 (14)
- 设计欣赏/设计窗 (8)
- 汇编/C (8)
- 工具类 (4)
- 游戏 (5)
- 开发频道 (5)
- Android OpenGL (1)
- 科学 (4)
- 运维 (0)
- 好东西 (6)
- 美食 (1)
最新评论
-
liangzai_cool:
请教一下,文中,shell、C、Python三种方式控制led ...
树莓派 - MAX7219 -
jiazimo:
...
Kafka源码分析-序列5 -Producer -RecordAccumulator队列分析 -
hp321:
Windows该命令是不是需要安装什么软件才可以?我试过不行( ...
ImageIO读jpg的时候出现javax.imageio.IIOException: Unsupported Image Type -
hp321:
Chenzh_758 写道其实直接用一下代码就可以解决了:JP ...
ImageIO读jpg的时候出现javax.imageio.IIOException: Unsupported Image Type -
huanghonhpeng:
大哥你真强什么都会,研究研究。。。。小弟在这里学到了很多知识。 ...
android 浏览器
背压(backpressure)
数据流发射,处理,响应可能在各自的线程中独立进行,上游在发射数据的时候,不知道下游是否处理完,也不会等下游处理完之后再发射。
这样,如果上游发射的很快而下游处理的很慢,会怎样呢?
将会产生很多下游没来得及处理的数据,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是Rxjava中的背压问题。
例如,运行以下代码:
创建一个可观察对象Obervable在Schedulers.newThread()()的线程中不断发送数据,而观察者Observer在Schedulers.newThread()的另一个线程中每隔5秒接收一条数据,运行后,查看内存使用如下:
由于上下游分别在各自的线程中独立处理数据(如果上下游在同一线程中,下游对数据的处理会堵塞上游数据的发送,上游发送一条数据后会等下游处理完之后再发送下一条),而上游发送数据速度远大于下游接收数据的速度,造成上下游流速不均,导致数据累计,最后引起内存溢出。
Flowable
Flowable是为了解决背压(backpressure)问题,而在Observable的基础上优化后的产物,与Observable不是同一组观察者模式下的成员,Flowable是Publisher与Subscriber这一组观察者模式中Publisher的典型实现,Observable是ObservableSource/Observer这一组观察者模式中ObservableSource的典型实现;
所以在使用Flowable的时候,可观察对象不再是Observable,而是Flowable;观察者不再是Observer,而是Subscriber。Flowable与Subscriber之间依然通过subscribe()进行关联。
有些朋友可能会想,既然Flowable是在Observable的基础上优化后的产物,Observable能解决的问题Flowable都能进行解决,何不抛弃Observable而只用Flowable呢。其实,这是万万不可的,他们各有自己的优势和不足。
由于基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable低得多。
因为只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。
所以,如果能够确定上下游在同一个线程中工作,或者上下游工作在不同的线程中,而下游处理数据的速度高于上游发射数据的速度,则不会产生背压问题,就没有必要使用Flowable,以免影响性能。
通过Flowable发射处理数据流的基础代码如下:
执行结果如下:
System.out: 发射----> 1
System.out: 发射----> 2
System.out: 发射----> 3
System.out: 发射----> 完成
System.out: 接收----> 1
System.out: 接收----> 2
System.out: 接收----> 3
System.out: 接收----> 完成
我们发现运行结果与Observerable没有区别,但是的代码中,除了为上下游指定各自的运行线程外,还有三点不同
一、create方法中多了一个BackpressureStrategy类型的参数。
二、onSubscribe回调的参数不是Disposable而是Subscription,多了行代码:
三、Flowable发射数据时,使用的发射器是FlowableEmitter而不是ObservableEmitter
BackpressureStrategy背压策略
在Flowable的基础创建方法create中多了一个BackpressureStrategy类型的参数,
BackpressureStrategy是个枚举,源码如下:
其作用是什么呢?
Flowable的异步缓存池不同于Observable,Observable的异步缓存池没有大小限制,可以无限制向里添加数据,直至OOM,而Flowable的异步缓存池有个固定容量,其大小为128。
BackpressureStrategy的作用便是用来设置Flowable通过异步缓存池存储数据的策略。
ERROR
在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。
运行如下代码:
Flowable发射129条数据,Subscriber在睡10秒之后再开始接收,运行后会发现控制台打印如下异常:
W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
W/System.err: at net.fbi.rxjava2.RxJava2Demo$6.subscribe(RxJava2Demo.java:103)
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
W/System.err: at io.reactivex.Flowable.subscribe(Flowable.java:12218)
W/System.err: at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
W/System.err: at java.util.concurrent.FutureTask.run(FutureTask.java:237)
W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
W/System.err: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
W/System.err: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
W/System.err: at java.lang.Thread.run(Thread.java:818)
如果将Flowable发射数据的条数改为128,则不会出现此异常。
DROP
在此策略下,如果Flowable的异步缓存池满了,会丢掉将要放入缓存池中的数据。
运行如下代码:
在上面代码中通过创建Flowable发射500条数据,每隔100毫秒发射一次,并记录开始发射和结束发射的时间,下游每隔300毫秒接收一次数据,运行后,控制台打印日志如下:
通过日志
我们可以发现Subscriber在接收完第128条数据后,再次接收的时候已经到了288,而这之间的60条数据正是因为缓存池满了而被丢弃掉了。
那么问题来了,当Flowable在发射第129条数据的时候,Subscriber已经接收了42条数据了,第129条数据为什么没有放入缓存池中呢?日志如下:
那是因为缓存池中数据的清理,并不是Subscriber接收一条,便清理一条,而是每累积到95条清理一次。也就是Subscriber接收到第96条数据时,缓存池才开始清理数据,之后Flowable发射的数据才得以放入。
查看日志可以发现,Subscriber接收到第96条数据后,Flowable发射第288条数据。而第128到288之间的数据,正好处于缓存池存满的状态,而被丢弃,所以Subscriber在接收完第128条数据之后,接收到的是第288条数据,而不是第129条。
LATEST
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。
将上述代码中的DROP策略改为LATEST:
运行后日志对比如下:
DROP:
LATEST:
latest策略下Subscriber在接收完成之前,接收的数据是Flowable发射的最后一条数据,而Drop策略下不是。
BUFFER
此策略下,Flowable的异步缓存池同Observable的一样,没有固定大小,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM。
运行如下代码:
查看内存使用:
会发现和使用Observalbe时一样,都会导致内存剧增,最后导致OOM,不同的是使用Flowable内存增长的速度要慢得多,那是因为基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable低得多。
MISSING
此策略表示,通过Create方法创建的Flowable没有指定背压策略,不会对通过OnNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。
onBackpressureXXX背压操作符
Flowable除了通过create创建的时候指定背压策略,也可以在通过其它创建操作符just,fromArray等创建后通过背压操作符指定背压策略。
onBackpressureBuffer()对应BackpressureStrategy.BUFFER
onBackpressureDrop()对应BackpressureStrategy.DROP
onBackpressureLatest()对应BackpressureStrategy.LATEST
例如代码
等同于,代码:
Subscription
Subscription与Disposable均是观察者与可观察对象建立订阅状态后回调回来的参数,如同通过Disposable的dispose()方法可以取消Observer与Oberverable的订阅关系一样,通过Subscription的cancel()方法也可以取消Subscriber与Flowable的订阅关系。
不同的是接口Subscription中多了一个方法request(long n),如上面代码中的:
此方法的作用是什么呢,去掉这个方法会有什么影响呢?
运行如下代码:
运行结果如下:
System.out: 发射----> 1
System.out: 发射----> 2
System.out: 发射----> 3
System.out: 发射----> 完成
我们发现Flowable照常发送数据,而Subsriber不再接收数据。
这是因为Flowable在设计的时候,采用了一种新的思路——响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。
如果不显示调用request则默认下游的需求量为零,所以运行上面的代码后,上游Flowable发射的数据不会交给下游Subscriber处理。
运行如下代码:
运行结果如下:
我们发现通过s.request(2);设置Subscriber的数据请求量为2条,超出其请求范围之外的数据则没有接收。
多次调用request会产生怎样的结果呢?
运行如下代码:
通过Flowable发射10条数据,在onSubscribe(Subscription s) 方法中调用两次request,运行结果如下:
我们发现Subscriber总共接收了7条数据,是两次需求累加后的数量。
通过日志我们发现,上游并没有根据下游的实际需求,发送数据,而是能发送多少,就发送多少,不管下游是否需要。
而且超出下游需求之外的数据,仍然放到了异步缓存池中。这点我们可以通过以下代码来验证:
通过Flowable发射130条数据,通过s.request(1)设置下游的数据请求量为1条,设置缓存策略为BackpressureStrategy.ERROR,如果异步缓存池超限,会导致MissingBackpressureException异常。
运行之后,日志如下:
久违的异常出现了,所以超出下游需求之外的数据,仍然放到了异步缓存池中,并导致缓存池溢出。
那么上游如何才能按照下游的请求数量发送数据呢,
虽然通过request可以设置下游的请求数量,但是上游并没有获取到这个数量,如何获取呢?
这便需要用到Flowable与Observable的第三点区别,Flowable特有的发射器FlowableEmitter
FlowableEmitter
flowable的发射器FlowableEmitter与observable的发射器ObservableEmitter均继承自Emitter(Emitter在教程二中已经说过了)
比较两者源码可以发现;
与
接口FlowableEmitter中多了一个方法
我们可以通过这个方法来获取当前未完成的请求数量,
运行下面的代码,这次我们要先丧失一下原则,虽然我们之前说过同步状态下不使用Flowable,但是这次我们需要先看一下同步状态下情况。
打印日志如下:
通过日志我们发现, 通过e.requested()获取到的是一个动态的值,会随着下游已经接收的数据的数量而递减。
在上面的代码中,我们没有指定上下游的线程,上下游运行在同一线程中。
这与我们之前提到的,同步状态下不使用Flowable相违背。那是因为异步情况下e.requested()的值太复杂,必须通过同步情况过渡一下才能说得明白。
我们在上面代码的基础上,给上下游指定独立的线程,代码如下
运行后日志如下:
虽然我们指定了下游的数据请求量为3,但是我们在上游获取未完成请求数量的时候,并不是3,而是128。难道上游有个最小未完成请求数量?只要下游设置的数据请求量小于128,上游获取到的都是128?
带着这个疑问,我们试一下当下游的数据请求量为500,大于128时的情况。
运行日志如下;
结果还是128.
其实不论下游通过s.request();设置多少请求量,我们在上游获取到的初始未完成请求数量都是128。
这是为啥呢?
还记得之前我们说过,Flowable有一个异步缓存池,上游发射的数据,先放到异步缓存池中,再由异步缓存池交给下游。所以上游在发射数据时,首先需要考虑的不是下游的数据请求量,而是缓存池中能不能放得下,否则在缓存池满的情况下依然会导致数据遗失或者背压异常。如果缓存池可以放得下,那就发送,至于是否超出了下游的数据需求量,可以在缓存池向下游传递数据时,再作判断,如果未超出,则将缓存池中的数据传递给下游,如果超出了,则不传递。
如果下游对数据的需求量超过缓存池的大小,而上游能获取到的最大需求量是128,上游对超出128的需求量是怎么获取到的呢?
带着这个疑问,我们运行一下,下面的代码,上游发送150个数据,下游也需要150个数据。
截取部分日志如下:
我们发现通过e.requested()获取到的上游当前未完成请求数量并不是一直递减的,在递减到33时,又回升到了128.而回升的时机正好是在下游接收了96条数据之后。我们之前说过,异步缓存池中的数据并不是向下游发射一条便清理一条,而是每等累积到95条时,清理一次。通过e.requested()获取到的值,正是在异步缓存池清理数据时,回升的。也就是,异步缓存池每次清理后,有剩余的空间时,都会导致上游未完成请求数量的回升,这样既不会引发背压异常,也不会导致数据遗失。
上游在发送数据的时候并不需要考虑下游需不需要,而只需要考虑异步缓存池中是否放得下,放得下便发,放不下便暂停。所以,通过e.requested()获取到的值,并不是下游真正的数据请求数量,而是异步缓存池中可放入数据的数量。数据放入缓存池中后,再由缓存池按照下游的数据请求量向下传递,待到传递完的数据累积到95条之后,将其清除,腾出空间存放新的数据。如果下游处理数据缓慢,则缓存池向下游传递数据的速度也相应变慢,进而没有传递完的数据可清除,也就没有足够的空间存放新的数据,上游通过e.requested()获取的值也就变成了0,如果此时,再发送数据的话,则会根据BackpressureStrategy背压策略的不同,抛出MissingBackpressureException异常,或者丢掉这条数据。
所以上游只需要在e.requested()等于0时,暂停发射数据,便可解决背压问题。
最终方案
下面我们回到最初的问题
运行下面代码:
由于下游处理数据的速度(Thread.sleep(50))赶不上上游发射数据的速度,则会导致背压问题。
运行后查看内存使用如下:
内存暴增,很快就会OOM
下面,对其通过Flowable做些改进,让其既不会产生背压问题,也不会引起异常或者数据丢失。
代码如下:
下游处理数据的速度Thread.sleep(50)赶不上上游发射数据的速度,
不同的是,我们在下游onNext(Integer integer) 方法中,每接收一条数据增加一条请求量,
在上游添加代码
让上游按需发送数据。
运行后查看内存:
内存一直相当的平静,而且上游严格按照下游的需求量发送数据,不会产生MissingBackpressureException异常,或者丢失数据。
数据流发射,处理,响应可能在各自的线程中独立进行,上游在发射数据的时候,不知道下游是否处理完,也不会等下游处理完之后再发射。
这样,如果上游发射的很快而下游处理的很慢,会怎样呢?
将会产生很多下游没来得及处理的数据,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是Rxjava中的背压问题。
例如,运行以下代码:
创建一个可观察对象Obervable在Schedulers.newThread()()的线程中不断发送数据,而观察者Observer在Schedulers.newThread()的另一个线程中每隔5秒接收一条数据,运行后,查看内存使用如下:
由于上下游分别在各自的线程中独立处理数据(如果上下游在同一线程中,下游对数据的处理会堵塞上游数据的发送,上游发送一条数据后会等下游处理完之后再发送下一条),而上游发送数据速度远大于下游接收数据的速度,造成上下游流速不均,导致数据累计,最后引起内存溢出。
Flowable
Flowable是为了解决背压(backpressure)问题,而在Observable的基础上优化后的产物,与Observable不是同一组观察者模式下的成员,Flowable是Publisher与Subscriber这一组观察者模式中Publisher的典型实现,Observable是ObservableSource/Observer这一组观察者模式中ObservableSource的典型实现;
所以在使用Flowable的时候,可观察对象不再是Observable,而是Flowable;观察者不再是Observer,而是Subscriber。Flowable与Subscriber之间依然通过subscribe()进行关联。
有些朋友可能会想,既然Flowable是在Observable的基础上优化后的产物,Observable能解决的问题Flowable都能进行解决,何不抛弃Observable而只用Flowable呢。其实,这是万万不可的,他们各有自己的优势和不足。
由于基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable低得多。
因为只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。
所以,如果能够确定上下游在同一个线程中工作,或者上下游工作在不同的线程中,而下游处理数据的速度高于上游发射数据的速度,则不会产生背压问题,就没有必要使用Flowable,以免影响性能。
通过Flowable发射处理数据流的基础代码如下:
执行结果如下:
引用
System.out: 发射----> 1
System.out: 发射----> 2
System.out: 发射----> 3
System.out: 发射----> 完成
System.out: 接收----> 1
System.out: 接收----> 2
System.out: 接收----> 3
System.out: 接收----> 完成
我们发现运行结果与Observerable没有区别,但是的代码中,除了为上下游指定各自的运行线程外,还有三点不同
一、create方法中多了一个BackpressureStrategy类型的参数。
二、onSubscribe回调的参数不是Disposable而是Subscription,多了行代码:
s.request(Long.MAX_VALUE);
三、Flowable发射数据时,使用的发射器是FlowableEmitter而不是ObservableEmitter
BackpressureStrategy背压策略
在Flowable的基础创建方法create中多了一个BackpressureStrategy类型的参数,
BackpressureStrategy是个枚举,源码如下:
public enum BackpressureStrategy { ERROR,BUFFER,DROP,LATEST,MISSING }
其作用是什么呢?
Flowable的异步缓存池不同于Observable,Observable的异步缓存池没有大小限制,可以无限制向里添加数据,直至OOM,而Flowable的异步缓存池有个固定容量,其大小为128。
BackpressureStrategy的作用便是用来设置Flowable通过异步缓存池存储数据的策略。
ERROR
在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。
运行如下代码:
Flowable发射129条数据,Subscriber在睡10秒之后再开始接收,运行后会发现控制台打印如下异常:
引用
W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
W/System.err: at net.fbi.rxjava2.RxJava2Demo$6.subscribe(RxJava2Demo.java:103)
W/System.err: at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
W/System.err: at io.reactivex.Flowable.subscribe(Flowable.java:12218)
W/System.err: at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
W/System.err: at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
W/System.err: at java.util.concurrent.FutureTask.run(FutureTask.java:237)
W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
W/System.err: at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
W/System.err: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
W/System.err: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
W/System.err: at java.lang.Thread.run(Thread.java:818)
如果将Flowable发射数据的条数改为128,则不会出现此异常。
DROP
在此策略下,如果Flowable的异步缓存池满了,会丢掉将要放入缓存池中的数据。
运行如下代码:
在上面代码中通过创建Flowable发射500条数据,每隔100毫秒发射一次,并记录开始发射和结束发射的时间,下游每隔300毫秒接收一次数据,运行后,控制台打印日志如下:
通过日志
我们可以发现Subscriber在接收完第128条数据后,再次接收的时候已经到了288,而这之间的60条数据正是因为缓存池满了而被丢弃掉了。
那么问题来了,当Flowable在发射第129条数据的时候,Subscriber已经接收了42条数据了,第129条数据为什么没有放入缓存池中呢?日志如下:
那是因为缓存池中数据的清理,并不是Subscriber接收一条,便清理一条,而是每累积到95条清理一次。也就是Subscriber接收到第96条数据时,缓存池才开始清理数据,之后Flowable发射的数据才得以放入。
查看日志可以发现,Subscriber接收到第96条数据后,Flowable发射第288条数据。而第128到288之间的数据,正好处于缓存池存满的状态,而被丢弃,所以Subscriber在接收完第128条数据之后,接收到的是第288条数据,而不是第129条。
LATEST
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。
将上述代码中的DROP策略改为LATEST:
运行后日志对比如下:
DROP:
LATEST:
latest策略下Subscriber在接收完成之前,接收的数据是Flowable发射的最后一条数据,而Drop策略下不是。
BUFFER
此策略下,Flowable的异步缓存池同Observable的一样,没有固定大小,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM。
运行如下代码:
查看内存使用:
会发现和使用Observalbe时一样,都会导致内存剧增,最后导致OOM,不同的是使用Flowable内存增长的速度要慢得多,那是因为基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable低得多。
MISSING
此策略表示,通过Create方法创建的Flowable没有指定背压策略,不会对通过OnNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。
onBackpressureXXX背压操作符
Flowable除了通过create创建的时候指定背压策略,也可以在通过其它创建操作符just,fromArray等创建后通过背压操作符指定背压策略。
onBackpressureBuffer()对应BackpressureStrategy.BUFFER
onBackpressureDrop()对应BackpressureStrategy.DROP
onBackpressureLatest()对应BackpressureStrategy.LATEST
例如代码
等同于,代码:
Subscription
Subscription与Disposable均是观察者与可观察对象建立订阅状态后回调回来的参数,如同通过Disposable的dispose()方法可以取消Observer与Oberverable的订阅关系一样,通过Subscription的cancel()方法也可以取消Subscriber与Flowable的订阅关系。
不同的是接口Subscription中多了一个方法request(long n),如上面代码中的:
s.request(Long.MAX_VALUE);
此方法的作用是什么呢,去掉这个方法会有什么影响呢?
运行如下代码:
运行结果如下:
引用
System.out: 发射----> 1
System.out: 发射----> 2
System.out: 发射----> 3
System.out: 发射----> 完成
我们发现Flowable照常发送数据,而Subsriber不再接收数据。
这是因为Flowable在设计的时候,采用了一种新的思路——响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。
如果不显示调用request则默认下游的需求量为零,所以运行上面的代码后,上游Flowable发射的数据不会交给下游Subscriber处理。
运行如下代码:
运行结果如下:
System.out: 发射----> 1 System.out: 发射----> 2 System.out: 发射----> 3 System.out: 发射----> 完成 System.out: 接收----> 1 System.out: 接收----> 2
我们发现通过s.request(2);设置Subscriber的数据请求量为2条,超出其请求范围之外的数据则没有接收。
多次调用request会产生怎样的结果呢?
运行如下代码:
通过Flowable发射10条数据,在onSubscribe(Subscription s) 方法中调用两次request,运行结果如下:
我们发现Subscriber总共接收了7条数据,是两次需求累加后的数量。
通过日志我们发现,上游并没有根据下游的实际需求,发送数据,而是能发送多少,就发送多少,不管下游是否需要。
而且超出下游需求之外的数据,仍然放到了异步缓存池中。这点我们可以通过以下代码来验证:
通过Flowable发射130条数据,通过s.request(1)设置下游的数据请求量为1条,设置缓存策略为BackpressureStrategy.ERROR,如果异步缓存池超限,会导致MissingBackpressureException异常。
运行之后,日志如下:
久违的异常出现了,所以超出下游需求之外的数据,仍然放到了异步缓存池中,并导致缓存池溢出。
那么上游如何才能按照下游的请求数量发送数据呢,
虽然通过request可以设置下游的请求数量,但是上游并没有获取到这个数量,如何获取呢?
这便需要用到Flowable与Observable的第三点区别,Flowable特有的发射器FlowableEmitter
FlowableEmitter
flowable的发射器FlowableEmitter与observable的发射器ObservableEmitter均继承自Emitter(Emitter在教程二中已经说过了)
比较两者源码可以发现;
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(Disposable d); void setCancellable(Cancellable c); boolean isDisposed(); ObservableEmitter<T> serialize(); }
与
public interface FlowableEmitter<T> extends Emitter<T> { void setDisposable(Disposable s); void setCancellable(Cancellable c); long requested(); boolean isCancelled(); FlowableEmitter<T> serialize(); }
接口FlowableEmitter中多了一个方法
long requested();
我们可以通过这个方法来获取当前未完成的请求数量,
运行下面的代码,这次我们要先丧失一下原则,虽然我们之前说过同步状态下不使用Flowable,但是这次我们需要先看一下同步状态下情况。
打印日志如下:
通过日志我们发现, 通过e.requested()获取到的是一个动态的值,会随着下游已经接收的数据的数量而递减。
在上面的代码中,我们没有指定上下游的线程,上下游运行在同一线程中。
这与我们之前提到的,同步状态下不使用Flowable相违背。那是因为异步情况下e.requested()的值太复杂,必须通过同步情况过渡一下才能说得明白。
我们在上面代码的基础上,给上下游指定独立的线程,代码如下
运行后日志如下:
虽然我们指定了下游的数据请求量为3,但是我们在上游获取未完成请求数量的时候,并不是3,而是128。难道上游有个最小未完成请求数量?只要下游设置的数据请求量小于128,上游获取到的都是128?
带着这个疑问,我们试一下当下游的数据请求量为500,大于128时的情况。
运行日志如下;
结果还是128.
其实不论下游通过s.request();设置多少请求量,我们在上游获取到的初始未完成请求数量都是128。
这是为啥呢?
还记得之前我们说过,Flowable有一个异步缓存池,上游发射的数据,先放到异步缓存池中,再由异步缓存池交给下游。所以上游在发射数据时,首先需要考虑的不是下游的数据请求量,而是缓存池中能不能放得下,否则在缓存池满的情况下依然会导致数据遗失或者背压异常。如果缓存池可以放得下,那就发送,至于是否超出了下游的数据需求量,可以在缓存池向下游传递数据时,再作判断,如果未超出,则将缓存池中的数据传递给下游,如果超出了,则不传递。
如果下游对数据的需求量超过缓存池的大小,而上游能获取到的最大需求量是128,上游对超出128的需求量是怎么获取到的呢?
带着这个疑问,我们运行一下,下面的代码,上游发送150个数据,下游也需要150个数据。
截取部分日志如下:
我们发现通过e.requested()获取到的上游当前未完成请求数量并不是一直递减的,在递减到33时,又回升到了128.而回升的时机正好是在下游接收了96条数据之后。我们之前说过,异步缓存池中的数据并不是向下游发射一条便清理一条,而是每等累积到95条时,清理一次。通过e.requested()获取到的值,正是在异步缓存池清理数据时,回升的。也就是,异步缓存池每次清理后,有剩余的空间时,都会导致上游未完成请求数量的回升,这样既不会引发背压异常,也不会导致数据遗失。
上游在发送数据的时候并不需要考虑下游需不需要,而只需要考虑异步缓存池中是否放得下,放得下便发,放不下便暂停。所以,通过e.requested()获取到的值,并不是下游真正的数据请求数量,而是异步缓存池中可放入数据的数量。数据放入缓存池中后,再由缓存池按照下游的数据请求量向下传递,待到传递完的数据累积到95条之后,将其清除,腾出空间存放新的数据。如果下游处理数据缓慢,则缓存池向下游传递数据的速度也相应变慢,进而没有传递完的数据可清除,也就没有足够的空间存放新的数据,上游通过e.requested()获取的值也就变成了0,如果此时,再发送数据的话,则会根据BackpressureStrategy背压策略的不同,抛出MissingBackpressureException异常,或者丢掉这条数据。
所以上游只需要在e.requested()等于0时,暂停发射数据,便可解决背压问题。
最终方案
下面我们回到最初的问题
运行下面代码:
由于下游处理数据的速度(Thread.sleep(50))赶不上上游发射数据的速度,则会导致背压问题。
运行后查看内存使用如下:
内存暴增,很快就会OOM
下面,对其通过Flowable做些改进,让其既不会产生背压问题,也不会引起异常或者数据丢失。
代码如下:
下游处理数据的速度Thread.sleep(50)赶不上上游发射数据的速度,
不同的是,我们在下游onNext(Integer integer) 方法中,每接收一条数据增加一条请求量,
mSubscription.request(1)
在上游添加代码
if(e.requested()==0)continue;
让上游按需发送数据。
运行后查看内存:
内存一直相当的平静,而且上游严格按照下游的需求量发送数据,不会产生MissingBackpressureException异常,或者丢失数据。
发表评论
-
带你深入理解 FLUTTER 中的字体“冷”知识
2020-08-10 23:40 649本篇将带你深入理解 Flutter 开发过程中关于字体和文 ... -
Flutter -自定义日历组件
2020-03-01 17:56 1122颜色文件和屏幕适配的文件 可以自己给定 import ... -
Dart高级(一)——泛型与Json To Bean
2020-02-23 19:13 1016从 Flutter 发布到现在, 越来越多人开始尝试使用 Da ... -
flutter loading、Progress进度条
2020-02-21 17:03 1193Flutter Progress 1 条形无固定值进度条 ... -
Flutter使用Https加载图片
2020-02-21 01:39 1037Flutter使用Https加载图片 使用http加载图片出 ... -
flutter shared_preferences 异步变同步
2020-02-21 00:55 856前言 引用 在开发原生iOS或Native应用时,一般有判断上 ... -
Flutter TextField边框颜色
2020-02-19 21:31 948监听要销毁 myController.dispose(); T ... -
flutter Future的正确用法
2020-02-18 21:55 812在flutter中经常会用到异步任务,dart中异步任务异步处 ... -
记一次Flutter简单粗暴处理HTTPS证书检验方法
2020-02-18 14:13 992最近在做Flutter项目到了遇到一个无解的事情,当使用Ima ... -
flutter 获取屏幕宽度高度 通知栏高度等屏幕信息
2019-07-27 08:39 1356##MediaQuery MediaQuery.of(con ... -
关于flutter RefreshIndicator扩展listview下拉刷新的问题
2019-07-10 19:40 1149当条目过少时listview某些嵌套情况下可能不会滚动(条目 ... -
flutter listview 改变状态的时候一直无限添加
2019-07-10 16:01 807setstate的时候会一直无限的调用listview.bui ... -
Flutter Android端启动白屏问题的解决
2019-07-09 00:51 1538问题描述 Flutter 应用在 Android 端上启动时 ... -
Flutter中SnackBar使用
2019-07-08 23:43 792底部弹出,然后在指定时间后消失。 注意: build(Bui ... -
Flutter 之点击空白区域收起键盘
2019-07-08 18:43 1799点击空白处取消TextField焦点这个需求是非常简单的,在学 ... -
Flutter 弹窗 Dialog ,AlertDialog,IOS风格
2019-07-08 18:04 1393import 'package:flutter/mate ... -
flutter ---TextField 之 输入类型、长度限制
2019-07-08 14:30 2353TextField想要实现输入类型、长度限制需要先引入impo ... -
【flutter 溢出BUG】键盘上显示bottom overflowed by 104 PIXELS
2019-07-08 11:13 1588一开始直接使用Scaffold布局,body:new Colu ... -
解决Flutter项目卡在Initializing gradle...界面的问题
2019-07-07 12:53 891Flutter最近很火,我抽出了一点时间对Flutter进行了 ... -
关于android O 上 NotificationChannel 的一些注意事项
2019-07-04 11:47 947最近在适配android O,遇到个问题,应用中原本有设置界面 ...
相关推荐
Rxbus基于RxJava2.0,Flowable替换Observeable解决背压,使用简便
使用所需的任何序列化库(内核支持Flowable<ByteBuffer> ) 支持普通的HTTP / HTTPS服务(WebSockets没有防火墙问题) 使用Servlet 3.1+异步处理(默认情况下) 入门 Maven依赖 < groupId>...
本文将详细讲解如何利用RxJava2的Flowable结合SQLite在Android上进行数据库操作。 首先,RxJava2的Flowable是用于处理背压的Observable类型,适用于数据产生速度快于消费速度的场景。它提供了BackpressureStrategy...
Flowable支持背压策略配置,如Buffer、Drop、Error等。 七、错误处理 RxJava提供了异常处理机制,通过onErrorResumeNext()或onErrorReturn()等操作符,可以在出现错误时继续执行其他操作,或者返回默认值。 八、...
本文主要介绍了关于RxJava 2.x新特性的相关资料,下面话不多说,来看看详细的介绍吧。 背压的分离 Flowable/Subscriber Flowable.range(0,10) .subscribe(new Subscriber() { Subscription sub; //当订阅后,会...
Flowable支持背压,适用于大数据流,而Observable不支持,适用于小数据量或者不需要背压的情况。 6. **生命周期管理**:RxJava2引入了`Disposable`接口,用于取消订阅和资源清理,取代了RxJava1中的`Subscription`...
- **背压支持**: `Flowable` 类实现了背压策略,避免了数据生产速度超过消费速度导致的问题。 学习 RxJava 有助于提升开发效率,特别是当配合网络库如 Retrofit 和 OkHttp 使用时,可以简化异步请求和数据处理。...
- io.reactivex.Flowable:支持0到N个元素的事件源,遵循Reactive-Streams规范,并支持背压(backpressure)。 - io.reactivex.Observable:同样支持0到N个元素的事件源,但不支持背压。 - io.reactivex.Single:仅...
在Android开发中,网络请求是应用与服务器交互的重要部分,而OkHttp3、Retrofit2和RxJava则是目前广泛使用的三大利器,它们结合使用可以构建高效、简洁的网络请求架构。下面将详细介绍这三个库以及如何将它们整合...
10. **Flowable**:对于大数据流或背压需求,RxJava 2.x引入了Flowable,它是处理背压的Observables,适用于大量数据传输场景。 在RxJavaEssentials-master这个压缩包中,你将找到各种示例代码,涵盖了上述知识点,...
Flowable 通过支持背压机制来避免此类问题的发生。 #### 特殊事件 除了普通的 **onNext()** 事件外,RxJava 定义了三个特殊的事件:**onComplete()**、**onError()** 和 **onSubscribe()**。 - **onComplete()**:...
- **Flowable**:在 RxJava2 中引入了 Flowable 类型,用于处理背压,适用于大数据量流。 - **生命周期管理**:配合 Android 的 Lifecycle 库,可以自动管理 Observable 的订阅生命周期,防止内存泄漏。 - **单例...
在 Flowable 类中,RxJava 提供了对背压的支持,这对于处理大量数据或者无限数据流尤其重要。Flowable 是 RxJava 2.x 版本引入的,用于处理可能需要背压的情况。 描述中提到的 "RxJava 操作符图片" 可能是指一系列...
RxJava2引入了Flowable,用于处理大量数据或背压问题,使得在Android平台上的内存管理和性能优化更加得心应手。通过组合、过滤、延迟执行等操作符,开发者可以创建出复杂的异步逻辑,简化并发编程。 **Glide**: ...
8. **背压策略**:在高频率数据流中,需要考虑背压问题,`Flowable`类提供了解决方案,它支持背压策略,如`onBackpressureBuffer()`或`onBackpressureLatest()`。 在"RxjavaDemo-master"这个项目中,开发者可能展示...
其次,**新增了 Flowable 类型以支持背压(backpressure)**。在 RxJava 1 中,`Observable` 类型不支持背压,可能导致 `MissingBackpressureException`。在 RxJava 2 中,为了解决这个问题,引入了 `Flowable` 类,...
RxJavaRetrofitAdapter是一个针对RXJava 2到RXJava 3转换的适配器库,主要目的是帮助开发者在他们的应用程序中平滑地过渡到RXJava 3,而无需对现有基于RXJava 2的Retrofit接口进行大规模的重构。这个开源项目解决了...
- RxJava2.x 引入了背压机制,用于处理下游消费速度跟不上上游生产速度的问题,可以通过 `Flowable` 和 `BackpressureStrategy` 来实现。 6. **生命周期管理** - 在Android环境中,需要关注Activity或Fragment的...