英文链接:What’s different in 2.0
RxJava2相比RxJava1,它的改动还是很大的:
Observable and Flowable
在前一个版本里backpressure被集成到了Observable中,官方也提供了很多方法让我们来处理backpressure问题。但是有一些特殊的场景根本无法用其来解决,最常见的例如UI事件。而不处理backpressure有可能导致MissingBackpressureException的出现。
关于backpressure的概念可以看一下
http://blog.csdn.net/jdsjlzx/article/details/52717636
为了解决这个问题,在RxJava2里,引入了Flowable这个类:Observable不包含backpressure处理,而Flowable包含。
例如:
Flowable<Long> flowable =
Flowable.create((FlowableOnSubscribe<Long>) e -> {
Observable.interval(10, TimeUnit.MILLISECONDS)
.take(Integer.MAX_VALUE)
.subscribe(e::onNext)
}, FlowableEmitter.BackpressureMode.DROP)
Observable<Long> observable =
Observable.create((ObservableOnSubscribe<Long>) e -> {
Observable.interval(10, TimeUnit.MILLISECONDS)
.take(Integer.MAX_VALUE)
.subscribe(e::onNext)
})
两个对象都以10毫秒一次派发数据,假设订阅他们的方法都是:
i -> {
Thread.sleep(100);
Log.e("lzx", "out : " + i);
}
以100毫秒一次消费数据,消费数据的效率是生产的1/10。那么
对于observable
他会按照0,1,2,3,4…的顺序依次消费,并输出log,而没有消费的数据将会都存在内存中。如果在RxJava1中,内存数据超过128个时将会抛出MissingBackpressureException错误;而在RxJava2中并不会报错,数据会一直放到内存中,直到发生OutOfMemoryError。
对于flowable, 在创建时我们设定了FlowableEmitter.BackpressureMode.DROP,一开始他会输出0,1,2,3….127但之后会忽然跳跃到966,967,968 …。中间的部分数据由于缓存不了,被抛弃掉了。
Single
和Observable,Flowable一样会发送数据,不同的是订阅后只能接受到一次:
Single<Long> single = Single.just(1l);
single.subscribe(new SingleObserver<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Long value) {
}
@Override
public void onError(Throwable e) {
}
});
普通Observable可以使用toSingle转换:Observable.just(1).toSingle()。
Completable
与Single类似,只能接受到完成(onComplete)和错误(onError)
同样也可以由普通的Observable转换而来:Observable.just(1).toCompletable()
可订阅的对象在RxJava1中只有Observable一种,之前我们经常会直接把数据源称作Observable。而在RxJava2中扩充成了4种,因此在之后还是把他们统称为数据源为宜。
Base reactive interfaces
和Flowable的接口Publisher类似,Observable、Single、Completable也有类似的基类。
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
interface CompletableSource {
void subscribe(CompletableObserver observer);
}
因此许多操作符接受的参数从以前的具体对象,变成了现在的接口:
Flowable<R> flatMap(
Function<? super T, ? extends Publisher<? extends R>> mapper
);
Observable<R> flatMap(
Function<? super T, ? extends ObservableSource<? extends R>> mapper
);
------
Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
由于接收的都是接口,在使用其他遵循Reactive-Streams设计的第三方库的时候,就不需要把他自定义的Flowable转换成标准Flowable了。
Subjects and Processors
io.reactivex.subjects.AsyncSubject,
io.reactivex.subjects.BehaviorSubject,
io.reactivex.subjects.PublishSubject,
io.reactivex.subjects.ReplaySubject,
io.reactivex.subjects.UnicastSubject
在RxJava2中依然存在,但现在他们不支持backpressure。新出现的
io.reactivex.processors.AsyncProcessor,
io.reactivex.processors.BehaviorProcessor,
io.reactivex.processors.PublishProcessor,
io.reactivex.processors.ReplayProcessor
io.reactivex.processors.UnicastProcessor
支持backpressure
Other classes
rx.observables.ConnectableObservable变成了io.reactivex.observables.ConnectableObservable< T>和io.reactivex.flowables.ConnectableFlowable< T>
类似的还有rx.observables.GroupedObservable。
Functional interfaces
需要注意的一点是,现在RxJava2的接口方法里加上了throws Exception:
public interface Consumer<T> {
void accept(T t) throws Exception;
}
意味着在这些方法里调用一些会发生异常的方法不需要try-catch了。
Actions
另外大部分接口方法都按照Java8的接口方法名进行了相应的修改,比如上面那个Consumer< T>接口原来叫Action1< T>,而Action2< T>改名成了BiConsumer。
Action3-Action9被删掉了,大概因为没人用。
Functions
同上,基本就是名字的修改和不常用类的删除。
Subscriber
RxJava1里Subscriber只是一个空接口,在新版里Subscriber被赋予了更多的作用,有几个实现类可以供我们使用,例如
ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
};
request()方法可以控制当前subscriber需要接收几个事件。而且,还可以调用subscriber.dispose()来断开对信号的监听。
同时,onCompleted方法改成了onComplete。意味着完成时调用这个方法,而不是完成后d
由于Subscription被改掉了(下面会讲到)。如果需要类似以前CompositeSubscription的用法,可以使用:
CompositeDisposable composite2 = new CompositeDisposable();
注意这里需要使用subscribeWith而不是subscribe,因为subscribe方法现在返回void。
Subscription
在RxJava1里,Subscription起到的是订阅桥梁的作用。在RxJava2中,由于Subscription本身和Reactive-Streams里的另外一个同名概念冲突。因此把原本的Subscription改名成了Disposable。
除了上一节里subscribe(Subscriber )方法返回void,其他名为subscribe的方法都返回Disposable
相应的,
- CompositeSubscription 改名成了 CompositeDisposable
- SerialSubscription 和 MultipleAssignmentSubscription被合并到了SerialDisposable里. set() 方法会处理掉就的值,而replace()方法不会。
- RefCountSubscription被移除了

Backpressure
在第一节Observable and Flowable里已经说到了这个问题,在2中,Observable将不会处理backpressure,也就不会发生MissingBackpressureException问题,但内存仍然会缓存多余的数据。
而在使用Flowable时,如果配置Backpressure有问题,那么MissingBackpressureException依然存在。
Schedulers
RxJava2里仍然包含了computation, io, newThread 和 trampoline这些默认线程调度。而immediate被移除了,因为他经常被人错误使用。同时Schedulers.test也被移除了。
Entering the reactive world
将普通方法转换成RxJava的数据源,在RxJava1中,提供了Observable.create()方法,但是这个方法过于强大,但使用时需要注意的东西太多经常会发生错误。
因此在RxJava2中,把原来的fromAsync重命名成了create,fromAsync是一个和create类似但更为简单和安全的方法。这样大部分旧代码都能够继续使用。
Leaving the reactive world
之前如果想把数据源转换成普通的数据对象,需要先转换成BlockingObservable。而在Rxjava2中,可以调用blockingXXX方法直接把数据源转换成对象:
List<Integer> list = Flowable.range(1, 100).toList().blockingFirst()
有一点需要特别注意,在RxJava2里,不建议在Subscriber里抛出错误,这意味着下面的代码可能有一天就不能继续运行了:
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
public void onNext(Integer t) {
if (t == 1) {
throw new IllegalArgumentException();
}
}
public void onError(Throwable e) {
if (e instanceof IllegalArgumentException) {
throw new UnsupportedOperationException();
}
}
public void onComplete() {
throw new NoSuchElementException();
}
};
Flowable.just(1).subscribe(subscriber);
由于上面类似的代码实际中出现得很多,因此在2中提供了safeSubscribe方法,使用它就可以继续在subscriber里抛出错误。
当然,你可以绕过subscribe(subscriber)这个方法,使用类似:
Flowable.just(1).subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
这样的方法,之前的代码仍然可以继续throw错误。
Operator differences
https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0#1x-observable-to-2x-flowable
集成和重载方法,例如新增加,存在着相当大的变化。
总结
可以明显的看到,RxJava2最大的改动就是对于backpressure的处理,为此将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。
原文链接:http://www.jianshu.com/p/850af4f09b61
<script type="text/javascript">
$(function () {
$('pre.prettyprint code').each(function () {
var lines = $(this).text().split('\n').length;
var $numbering = $('<ul/>').addClass('pre-numbering').hide();
$(this).addClass('has-numbering').parent().append($numbering);
for (i = 1; i <= lines; i++) {
$numbering.append($('<li/>').text(i));
};
$numbering.fadeIn(1700);
});
});
</script>
相关推荐
具体实现可能涉及Retrofit2调用服务器获取推荐信息,然后通过RxJava2进行数据处理和分发,最后在View中展示给用户。 总的来说,"Retrofit2+Rxjava2+MVP demo"是一个展示如何在Android应用中有效地使用现代网络请求...
1. **RxJava 2与RxJava 3的主要区别** - **API变更**:RXJava 3对部分API进行了调整,如移除了`rx`包,将所有类型移动到`io.reactivex`和`io.reactivex.rxjava3`包下,这可能导致原有代码无法正常工作。 - **性能...
在Android开发中,Retrofit2和RxJava2的结合使用已经成为构建高效、可测试和易于管理的网络请求框架的首选方案。本项目"RxJava2+Retrofit2实现简单易用的网络请求框架"(RxEasyHttp)旨在提供一个简洁、强大的网络层...
android rxjava2 android rxjava2 android rxjava2
RxJava2 示例 —— 这可能是从 RxJava1 跳到 RxJava2(学习 RxJava2 )最好的示例 DemoRxJava2示例RxJava2 示例——可能是从 RxJava1 跳到 RxJava2(学习 RxJava2)最好的示例演示RxJava 1.x 到 RxJava 2.x 的无缝...
**RxJava2Interop** 是一个专门设计用于在 **RxJava 1.x** 和 **RxJava 2.x** 之间进行互操作的库。这个库的出现主要是因为 RxJava 从第一版升级到第二版时引入了大量的不兼容变化,使得两个版本无法直接混用。...
这个压缩包包含了关于RxJava的全面资源,包括RxJava 1和RxJava 2两个主要版本的文档,中文官方文档,实战实例以及一系列深入解析的博客文章,对于学习和掌握RxJava来说是非常宝贵的资料。 RxJava的核心概念是...
1. **RxJava2基本概念**:包括Observable(可观察者)、Observer(观察者)、subscribe(订阅)以及各种操作符,如map、filter、concatMap、flatMap、zip等。 2. **在Android中的应用**:如何在Activity或Fragment...
本项目标题提到的"安卓retrofit2 + rxjava2 + okhttp3多线程下载"正是一个利用现代Android开发库来优化下载流程的方法。以下将详细介绍这三个库以及它们如何协同工作实现多线程下载。 1. **Retrofit2**: Retrofit...
RxJava2是一个强大的异步处理库,用于在Android和Java应用程序中进行反应式编程。它引入了一种新的编程范式,使得数据流管理和事件处理更加高效、简洁。在这个"RxJava2学习demo"中,我们可以深入理解RxJava2的核心...
根据提供的文件信息,本次知识点梳理将围绕“RxJava2”展开。主要涵盖RxJava2的基础概念、特性及其在Android开发中的应用案例。 ### 一、RxJava2简介 #### 1.1 定义 RxJava2是RxJava的一个重大更新版本,它是一种...
本篇文章将详细介绍如何利用Retrofit2和RxJava2来实现一个MVP架构的应用。 首先,让我们了解Retrofit2。Retrofit是由Square公司开发的一个类型安全的HTTP客户端,它允许开发者通过简单的接口定义与HTTP服务的交互。...
在Android开发中,Retrofit2、RxJava2和OkHttp3是三个非常重要的库,它们共同构建了一个高效、灵活的网络请求框架。本篇文章将深入探讨这三个组件的基础知识,以及如何进行封装和在实际项目中使用。 首先,Retrofit...
在Android开发中,MVP(Model-View-Presenter)架构模式、Retrofit2网络库、RxJava2响应式编程库以及GreenDao数据库是常见的技术组合,用于构建高效、可维护的应用程序。以下将详细讲解这些知识点及其在项目中的应用...
本教程将探讨如何结合MVP(Model-View-Presenter)架构、Retrofit2、OkHttp3和RxJava2来打造一个实用且简约的网络请求解决方案。 **MVP架构** MVP全称为Model-View-Presenter,是一种设计模式,它将业务逻辑、用户...
1. **Operator转换**:RxJava 2和3的某些操作符(Operator)在命名和使用上有所变化,如`map()`、`filter()`、`concatMap()`等。RxJavaBridge通过适配器使这些操作符在两个版本间兼容。 2. **Scheduler兼容**:...
`RxJava2`、`Retrofit2`和`OkHttp`是三个非常流行的库,它们结合在一起可以创建出强大的网络请求解决方案。以下是对这三个库及其整合使用的详细介绍。 **RxJava2** 是一个响应式编程库,它引入了观察者模式的概念,...
1. **RxJava2**: RxJava是一个在Java VM上运行的反应式编程库,它将观察者模式扩展到了多种数据流。RxJava2是RxJava的第二个主要版本,引入了许多改进和修复,包括更好的错误处理、类型安全性和性能提升。它允许...
1. 响应式编程:RXJava2基于响应式扩展(Reactive Extensions)的概念,允许开发者以声明式的方式处理数据流和事件。这种编程方式使得代码更加简洁,可读性更强,同时减少了回调地狱的问题。 2. Observable(可观测...
RxJava2.x 是一个在Android开发中广泛使用的异步编程库,它引入了反应式编程的概念,使得处理事件和数据流变得更加简洁、高效。在这个"rxjava2demo.zip"压缩包中,包含了一个详细的Android RxJava2.x 示例项目,帮助...