编写不易,转载请注明(http://shihlei.iteye.com/blog/2426891)!
一 概述
最近使用Hystrix,看文档Hystrix底层基于Rxjava实现,很好奇,就一起研究了下,做个总结
二 响应式编程(Reactive Programming)
定义:一种基于异步数据流概念的编程模式。
核心:事件,可以被触发,传播,订阅
特点:易于并发,易于编写按条件执行代码,避免大量回调
最常见的使用场景:发送网络请求,获得结果,提交事件,更新
注:主要是基于观察者模式,关于观察者模式可以参考我之前的文章:《观察者模式及Guava EventBus》
三 RxJava
1)概述
Rx: ReactiveX, 官方定义是基于“观察者模式”,实现基于事件流异步处理的lib,是用于实现响应式编程的框架。
特点:
(a)链式调用,异步处理复杂问题,
(b)使用推的方式,有数据主动推给消费者
主要类:
1)被观察者:Observable
(1)注册观察者 subscribe()
(2)事件回调
(3)增加如下方法
onComplated():用于生产者没有更多数据可用时发出通知
onError():生产者发生错误时能够发出通知
(4)Observables 能够组合而不是嵌套,避免发生大量嵌套回调
2)观察者:Observer
(1)事件消费 consumer()
(2)提供如下回调可以可以模块处理各种情况
2)简单demo
(1)依赖
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency>
(2)代码
就是个简单的“ 观察者模式 ” ,看看怎么做:
package x.rx.rxjava.demo; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; /** * RxJava Demo * <p> * * @author shilei */ public class RxJavaDemo { public static void main(String[] args) { RxJavaDemo rxJavaDemo = new RxJavaDemo(); rxJavaDemo.runObservable(); } /** * 1)创建一个可观察对象,不断提供事件数据 * 2)创建一个观察者,提供事件处理方法 */ public void runObservable() { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { //推送事件流 emitter.onNext("Event : 1"); emitter.onNext("Event : 2"); emitter.onComplete(); } }).subscribe(new Consumer<String>() { //接受事件处理 @Override public void accept(String s) throws Exception { System.out.println("handle Event : " + s); } }, new Consumer<Throwable>() { // 事件产生过程产生error时 @Override public void accept(Throwable throwable) throws Exception { System.out.println("Observable : onError " + throwable.getMessage()); } }, new Action() { // 收到complete通知时 @Override public void run() throws Exception { System.out.println("Observable : onComplete"); } }); } }
3)源码分析:看看“观察者模式”怎么实现的
(1)创建一个被观察者:Observable.create()
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
其中主要的被观察者:ObservableCreate<T>(source) 对象,包装了产生事件的回调方法
(2)注册观察者:subscribe()方法,这里重要的是创建一个观察者:LambdaObserver,主要为支持流式开发,会将方法拆成函数接口填充。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //省略:。。。。。。 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; }
继续调用重载方法
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { //省略:。。。。。。 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { //省略:。。。。。。 } }
找到最终的核心:回到subscribeActual(observer); 方法subscribeActual(observer) 是一个抽象方法,具体取决于我们创建Observable,Demo中,就是(1)中create的 new ObservableCreate<T>(source);看看他的subscribeActual 实现:
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
主要是创建一个CreateEmitter<T> 用于调用推送数据的回调。并关联了刚才的“观察者”,上面的 LambdaObserver
这里最重要的一步,启动推动数据:source.subscribe(parent); 这步就是我们demo中提供的回调方法,还记得吗,会放下
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { //推送事件流 emitter.onNext("Event : 1"); emitter.onNext("Event : 2"); emitter.onComplete(); } }
传进来的就是刚才源码中的内部类CreateEmitter<T>,所以调用 onNext,onComplete,onError 等都是调用 CreateEmitter<T> 的,其底层完成 LambdaObserver 的调用,及执行我们通过subscribe()提供的各种绑定方法
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } //省略 。。。。。。 }
至此流程调用链得以执行,绕了这么大圈好坏,我还要想想。
4)Observable(被观察者)常用操作
(1)创建:
Observable.from() :遍历集合或数组,逐个推送其中的数据。
Observable.just(): 推送一个普通的Java函数的返回值:非常有用,我们可以将远程调用的结果,推送到处理类中。
Observable .just():可以接受多个参数进行推送
Observable.timer()、Observable.interval(): 定时发送,可以用于代替定时任务
(2)预处理:
map() 在“观察者”消费前,将处理逻辑作用于每个推送的数据
(3)过滤:
filter() 从推送的数据中过滤出想要的进行处理
distinct() 排重,用于推送是出错引起的重复推送的情况(推送的数据一般实现Comparable接口)
distinctUnitChange() 观察者可能一直有数据产生,这里要求直到有数据变化,观察者才能接收到
(4)组合:同时处理多个来源的事件
marge() 将多个Observable 的数据合并到一起处理
concat() 顺序执行,特别适合使用队列的情况
四 RxJava 应用场景举例
1)场景一:异步网络请求
(1)依赖
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.0.6.RELEASE</version> </dependency>
(2)代码
package x.rx.rxjava.demo; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.web.client.RestTemplate; /** * RxJava 异步网络请求 * * @author shilei */ public class WeatherRxJavaDemo { private static final String WATHER_OPEN_API_URL = "http://www.weather.com.cn/data/cityinfo/101010100.html"; public static void main(String[] args) throws Exception { new WeatherRxJavaDemo().run(); //等待异步处理结束 TimeUnit.SECONDS.sleep(1L); } private void run() { Observable .create(emitter -> { try { emitter.onNext(getWatherResult()); emitter.onComplete(); }catch (Throwable e){ emitter.onError(e); } }) .subscribeOn(Schedulers.newThread())//异步 .subscribe(weatherResult -> { //正常获得结果 System.out.println("weather : " + weatherResult); }, throwable -> { //异常记录日志 System.out.println("weather error : " + throwable.getMessage()); }); } private String getWatherResult() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8)); return restTemplate.getForObject(WATHER_OPEN_API_URL, String.class); } }
2)场景二:Observable 处理链
使用concat 连接Observable ,任何一个满足要求推送数据,后面的就不会执行。
这里实现cache的逐级读取模拟。
package x.rx.rxjava.demo; import java.util.Objects; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.schedulers.Schedulers; /** * rxjava demo: 实现数据逐级缓存异步读取 * <p> * * @author shilei */ public class ObservableConcatDemo { public static void main(String[] args) throws Exception { new ObservableConcatDemo().run(); } private void run() throws Exception { //任务1 Observable<String> memeryCacheJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = "memery data"; // 模拟从内存查询数据,这里模拟查不到 if (Objects.nonNull(data)) { // 推送从memery查到了,推动结果,同时整个流程 emitter.onNext(data); } else { //查差不到,结束 System.out.println("get date from memery : null"); emitter.onComplete(); } } }); Observable<String> redisCacheJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = null; //模拟从redis 查询数据,这里模拟查不到 if (Objects.nonNull(data)) { //从redis 查到了,推送结果,同时整个流程 emitter.onNext(data); } else { System.out.println("get date from redis : null"); emitter.onComplete(); } } }); Observable<String> mysqlJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = "db data"; //模拟从db 查询数据,查询成功 if (Objects.nonNull(data)) { //db 查到了,推送结果,同时整个流程 emitter.onNext(data); } else { System.out.println("get date from db : null"); emitter.onComplete(); } } }); // 使用concat 任何一个满足要求推送数据,后面的就不会执行。 Observable.concat(memeryCacheJob, redisCacheJob, mysqlJob) .subscribeOn(Schedulers.newThread()) //异步 .subscribe(data -> { System.out.println("data result : " + data); }); //避免主线程退出 TimeUnit.SECONDS.sleep(1L); } }
相关推荐
读书笔记:Android中的响应式编程RxJavaRxAndroid
读书笔记:1.信鸽推送接入 2响应式编程Rxjava和retrofit结合使用
【Android响应式编程RxJava2完全解析】 RxJava2是一个在Android开发中广泛使用的响应式编程库,它的核心概念在于异步处理,通过Observables(被观察者)和Observers(观察者)之间的交互来实现数据流的管理和事件的...
在开发手机 App、 Web App 时, 要想保证对用户 请求的实时响应,给用户带来流畅的体验,响应式编程是一个不错的选择, RxJava 则是这种编程模式的 Java 实现。本书主要介绍如何使用 RxJava 进行响应式编程 。全书...
响应式编程RxJava 与Java中的传统Pull方法相比,React式编程具有Push方法。 早些时候,应用程序会担心从数据库/服务中提取数据,但是通过响应式编程,可观察对象使我们能够将数据作为通知推送。 响应式编程是事件...
在Java领域,响应式编程的一个流行实现是RxJava(Reactive Extensions for Java)。RxJava使用可观察序列(Observables)作为数据流模型,观察者(Observers)订阅这些序列来处理数据项,每个数据项依次被推送(异步...
响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。 响应式编程的一个关键概念是事件。事件可以被等待,可以触发...
**RxJava 响应式编程** RxJava是一个用于处理异步数据流的库,它引入了响应式编程的概念,使得事件处理和数据流管理变得更加简洁和高效。在Android开发中,RxJava尤其受到欢迎,因为它能够帮助开发者更好地处理复杂...
的网络库基本被 Retrofit + OkHttp 一统天下了,而配合上响应式编程 RxJava 可谓如鱼得水。想 必大家肯定被近期的 Kotlin 炸开了锅,笔者也在闲暇之时去了解了一番(作为一个与时俱进的有 理想的青年怎么可能不...
**RxJava响应式编程原理** 响应式编程是一种编程范式,它主要关注数据流和变化传播,特别适用于处理异步事件和数据。RxJava是一个基于Java的响应式编程库,它利用观察者模式来处理这些事件和数据流。观察者模式中,...
Java响应式编程是一种高效、灵活且可扩展的编程模型,特别是在构建微服务时,它能够充分利用现代硬件资源,提供更好的性能和用户体验。Red Hat公司的"Building Reactive Microservices in Java"是一本深入探讨这一...
在移动开发领域,尤其是在Android平台上,MVP(Model-View-Presenter)架构模式、Retrofit网络库和RxJava响应式编程库是开发者常用的技术组合。这个名为"mvp-retrofit-rxjava-recyclerview-master.zip"的压缩包,...
通过研究rxjava,从中得到启发,自己重写了一个精简的响应式编程框架,这套框架与rxjava用法类似,采用链式调用规则,代码更精简,结构轻巧,扩展性强。此框架目前处在雏形阶段,希望能够抛砖引玉,大家有什么宝贵...
读书笔记:rxJava响应式编程在安卓中的使用
精通高级RxJava2响应式编程思想移动端开发秒速教程.txt
Java编程方法论响应式RxJava与代码设计实战
读书笔记:http retrofit rxjava cache 网络缓存,响应式编程
本篇文章将深入探讨一个基于Android开发的阅读应用——ReadDaily,它巧妙地集成了MVP(Model-View-Presenter)设计模式、Retrofit网络库、RxJava响应式编程、Jsoup网页解析工具以及Glide图片加载库,实现了高效、...
本书配套视频及其他响应式系列配套视频如下:Java编程方法论-响应式篇-RxJava分享视频已完结bilibili: : 油管: : list PL95Ey4rht798MMCusPzIW7VYD1xaKJVjcJava编程方法论-响应式篇-Reactor分享视频已完结B站: : ...