`
阅读更多

 

编写不易,转载请注明(http://shihlei.iteye.com/blog/2428152)!

 

一 概述

阅读Hystrix的源码过程,看到很多RxJava的使用,本文对其中一些重要的方法进行总结,为之后Hystrix的讲解做铺垫。

 

如果对应响应式编程,RxJava不是很了解,可以阅读《响应式编程 RxJava》做个简单的入门。

 

版本:

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.14</version>
        </dependency>

 

二 Observable 操作符

1) defer操作符:

(1)概述

Observable用于封装数据,defer提供懒绑定,直到有订阅时才创建Observable,即冷 Observable;

 

注:

(a)冷热问题

热Observable:创建后马上执行,生成可观察事件。但观察者注册完后,可能从中间消费事件,造成一定量事件无法观察到。

冷Observable:一直等待,直到有观察者订阅他才开始发送数据,因此观察者可以确保收到整个数据序列。

 

(b)使用时通过回调传入一个已有Observable,defer()进行包装

 

(2)demo

package x.rx.rxjava.demo.func;

import io.reactivex.Observable;

/**
 * DeferDemo
 * <p>
 * Observable用于封装数据,defer提供懒绑定,直到有订阅时才创建Observable,即冷 Observable;
 * <p>
 * 注:
 * (a)冷热问题
 * 热Observable:创建后马上执行,生成可观察事件。但观察者注册完后,可能从中间消费事件,造成一定量事件无法观察到。
 * 冷Observable:一直等待,直到有观察者订阅他才开始发送数据,因此观察者可以确保收到整个数据序列。
 * <p>
 * (b)使用时通过回调传入一个已有Observable,defer()进行包装
 *
 * @author shilei
 */
public class DeferDemo {

    public static void main(String[] args) {
        new DeferDemo().run();
    }

    private void run() {
        System.out.println("do hot observable :");
        doHotObservable();

        System.out.println("do cold observable : ");
        doColdObservable();
    }

    /**
     * 热Observable:创建后马上执行,生成可观察事件。但观察者注册完后,可能从中间消费事件,造成一定量事件无法观察到。
     * <p>
     * 注:这里通过just模拟
     */
    private void doHotObservable() {
        Content content = new Content("before");
        Observable<String> observable = Observable.just(content.data);
        content.data = "after";
        //这里输出"befere" ,因为 通过just创建时已经绑定 "before" 的引用,用于保持数据"原生性"很有用
        observable.subscribe(result -> System.out.println(result));
    }

    /**
     * 冷Observable:一直等待,直到有观察者订阅他才开始发送数据,因此观察者可以确保收到整个数据序列。
     * <p>
     * 注:defer 包装一个Observable实现
     */
    private void doColdObservable() {
        Content content = new Content("before");
        Observable<String> observable = Observable.defer(() -> Observable.just(content.data));
        content.data = "after";
        //这里输出"after" ,延迟绑定,直到subscribe()调用时,才调用获取 content.data;用于保证数据"最新性"很有用
        observable.subscribe(result -> System.out.println(result));
    }

    static class Content {
        private String data;

        Content(String data) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "Content{" +
                    "data='" + data + '\'' +
                    '}';
        }
    }
}

 

(3)结果

 

do hot observable :
before
do cold observable : 
after
 

 

分析:

hot observable:生成observable时已经绑定了”before“,所以即使修改content.data=after 对观察者是不可见的。

cold observable(defer):延迟绑定,修改content.data=after 观察者看到的就是”after“,对于订阅到最新数据非常有用。

 

2)lift操作符:

(1)概述

方法负责将一个Observable转换成另一个Observable,转换方式通过Operator提供。

注:

lift()方法是在一个已有Observable上调用,可以用于Observable封装数据变换。

 

(2)demo

 

package x.rx.rxjava.demo.func;


import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;

/**
 * LiftDemo:
 * <p>
 * 概述
 * 方法负责将一个Observable转换成另一个Observable,转换方式通过Operator提供。
 * 注:
 * lift()方法是在一个已有Observable上调用,可以用于Observable封装数据变换。
 *
 * @author shilei
 */
public class LiftDemo {

    public static void main(String[] args) {
        new LiftDemo().run();
    }

    private void run() {
        Observable<String> stringObservable = Observable.just("1");

        Observable<Integer> integerObservable = stringObservable.lift(new ObservableOperator<Integer, String>() {
            @Override
            public Observer<? super String> apply(Observer<? super Integer> child) throws Exception {

                return new Observer<String>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        child.onSubscribe(d);
                    }

                    @Override
                    public void onNext(String s) {
                        child.onNext(Integer.valueOf(s));
                    }

                    @Override
                    public void onError(Throwable e) {
                        child.onError(e);
                    }

                    @Override
                    public void onComplete() {
                        child.onComplete();
                    }
                };
            }
        });

        integerObservable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("value : " + integer);
            }
        });
    }
}

 

(3)结果

 

value : 1
 

 

三 线程调度:Scheduler

1)概述

异步执行任务代码:observable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())

 

RxJava实现异步的主要结构,核心三大:

(1)Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();

 

(2)Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)

 

(3)Schedulers:静态工厂,返回Scheduler 实例,2.x主要有如下几种:

      • TRAMPOLINE:当前线程单线程执行,任务先进先出,一般用于“测试”目的
      • SINGLE: 共享单线程异步执行,任务先出执行
      • NEW_THREAD:每一个任务一个新的后台线程执行的调度器实例
      • COMPUTATION:用于执行计算型任务,使用固定大小的线程池,线程数=cpu 核数
      • IO:用于执行IO密集型任务,使用大小动态变化的线程池 

2)demo

 

package x.rx.rxjava.demo.scheduler;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

/**
 * SchedulerDemo
 * <p>
 * 异步执行任务代码:observable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())
 * <p>
 * RxJava实现异步的主要结构,核心三大:
 * (1)Scheduler:调度器,用于调度一个Worker执行任务
 * 核心方法:Worker createWorker();
 * <p>
 * (2)Scheduler.Worker:执行线程任务
 * 核心方法:schedule(@NonNull Runnable run)
 * <p>
 * (3)Schedulers:静态工厂,返回Scheduler 实例,2.x主要有如下几种:
 * <p>
 * TRAMPOLINE:当前线程执行,任务进度队列,先进先出执行
 * SINGLE: 拥有一个共享线程执行,任务进度队列,先进先出执行
 * NEW_THREAD:每一个任务一个新的后台线程执行的调度器实例
 * COMPUTATION:用于执行计算型任务,使用线程池执行,线程数=cpu 核数
 * IO:用于执行IO密集型任务,使用线程池执行。
 *
 * @author shilei
 */
public class SchedulerDemo {

    public static void main(String[] args) throws Exception {
        System.out.println("当前线程id : " + Thread.currentThread().getId());

        new SchedulerDemo().run();

        // 等待线程执行结束
        TimeUnit.SECONDS.sleep(10);
    }

    private void run() {
        Map<String, Scheduler> schedulers = new LinkedHashMap<>();
        schedulers.put("trampoline1", Schedulers.trampoline());
        schedulers.put("trampoline2", Schedulers.trampoline());

        schedulers.put("single1", Schedulers.single());
        schedulers.put("single2", Schedulers.single());

        schedulers.put("newThread1", Schedulers.newThread());
        schedulers.put("newThread2", Schedulers.newThread());

        schedulers.put("computation1", Schedulers.computation());
        schedulers.put("computation2", Schedulers.computation());

        for (Map.Entry<String, Scheduler> entry : schedulers.entrySet()) {
            String type = entry.getKey();
            Scheduler scheduler = entry.getValue();
            doWork(type, scheduler);
        }
    }

    private void doWork(String info, Scheduler scheduler) {
        Scheduler.Worker worker = scheduler.createWorker();

        Runnable job = () -> {
            System.out.print("类型:" + info);
            System.out.println("  ,     线程id:" + Thread.currentThread().getId() + " do job! ");
        };

        worker.schedule(job);
    }
}

 

 

3)结果

 

当前线程id : 1
类型:trampoline1  ,     线程id:1 do job! 
类型:trampoline2  ,     线程id:1 do job! 
类型:single1  ,     线程id:12 do job! 
类型:single2  ,     线程id:12 do job! 
类型:newThread1  ,     线程id:13 do job! 
类型:newThread2  ,     线程id:14 do job! 
类型:computation1  ,     线程id:15 do job! 
类型:computation2  ,     线程id:16 do job!
  

 

四 解除订阅,中断任务:Disposable

1)概述

用于Observer控制执行过程中解除订阅,典型场景:

(1)业务逻辑:最多处理n个事件,剩下的忽略

(2)超时,解除事件等待:如hystrix的超时熔断

 

2)Demo

(1)Disposable

用于主动解除订阅,核心方法 dispose();

注:

如果任务异步执行且阻塞时,调用dispose() 会中断阻塞。

如果任务正在执行非阻塞状态,则不会中断,会一直执行完,退出。

package x.rx.rxjava.demo.observer;

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;


/**
 * 异步执解绑定阻塞执行查看
 * <p>
 * Disposable
 * 用于主动解除订阅,核心方法 dispose();
 * 注:
 * 如果任务异步执行且阻塞时,调用dispose() 会中断阻塞。
 * 如果任务正在执行非阻塞状态,则不会中断,会一直执行完,退出。
 *
 * @author shilei0907
 */
public class DisposableDemo {


    public static void main(String[] args) throws Exception {
        new DisposableDemo().run();

        // 等待线程执行结束
        TimeUnit.SECONDS.sleep(10);
    }

    private void run() throws Exception {
        Observable<Boolean> observable = getObservable();


        Disposable disposable = observable.observeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribe(result -> {
                    System.out.println("result : " + result);
                }, e -> {
                    e.printStackTrace();
                });

        //等待启动
        TimeUnit.SECONDS.sleep(1L);

        // 取消任务
        disposable.dispose();

        System.out.println("all Finish !");
    }

    private Observable<Boolean> getObservable() {
        Observable<Boolean> observable = Observable.create(emitter -> {
            System.out.println(Thread.currentThread().getId() + " start ! ");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Throwable t) {
                t.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + " finish  ! ");
        });

        //延迟执行
        return Observable.defer(() -> observable);
    }
}

     结果:

 

12 start ! 
all Finish !
12 finish  ! 
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at x.rx.rxjava.demo.observer.DisposableDemo.lambda$getObservable$2(DisposableDemo.java:57)
	at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
	at io.reactivex.Observable.subscribe(Observable.java:12051)
	at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:39)
	at io.reactivex.Observable.subscribe(Observable.java:12051)
	at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)
	at io.reactivex.Observable.subscribe(Observable.java:12051)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:579)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
   

 

(2) CompositeDisposable

用于添加一组Disposable,快速解除订阅,清理资源时很有用。

 

核心方法:

add(): 添加到组中

clear(): 快速解除所有订阅,中断所有添的“被观察者”正在执行的任务。

 

package x.rx.rxjava.demo.observer;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

/**
 * CompositeDisposableDemo
 * <p>
 * 用于添加一组Disposable,快速解除订阅,清理资源时很有用。
 * <p>
 * 核心方法:
 * add(): 添加到组中
 * clear(): 快速解除所有订阅,中断所有添的“被观察者”正在执行的任务。
 *
 * @author shilei0907
 * @version 1.0, 2018/8/6
 */
public class CompositeDisposableDemo {

    public static void main(String[] args) {
        new CompositeDisposableDemo().run();
    }

    private void run() {
        CompositeDisposable compositeDisposable = new CompositeDisposable();


        Observable.just("message").subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                //添加待管理的订阅
                compositeDisposable.add(d);
            }

            @Override
            public void onNext(String s) {
                System.out.println("receive : " + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

        // 清理绑定关系
        compositeDisposable.clear();

    }
}

 

分享到:
评论

相关推荐

    Android代码-RxJava2Examples

    RxJava2 Examples——它可能是从RxJava1跳到RxJava2(学习RxJava2)最好的例子Demo &gt; RxJava 1.x 到 RxJava 2.x 的无缝对接 无需学习 RxJava 1.x, 直接学习 RxJava 2.x 完备齐全的操作符示例 支持与 Retrofit 交互...

    rxjava2demo.zip

    在这个"rxjava2demo.zip"压缩包中,包含了一个详细的Android RxJava2.x 示例项目,帮助开发者理解和应用这个强大的工具。 反应式编程是一种编程范式,它将数据流和变换处理视为计算,而不是作为独立的事件进行处理...

    RxJava2Examples-master:测试

    RxJava2ExamplesRxJava2 Examples——它可能是从RxJava1跳到RxJava2(学习RxJava2)最好的例子Demo号外:听说「nanchen」搞了一个 Android 开发者的免费福利,不行你看:RxJava 1.x 到 RxJava 2.x 的无缝对接无需...

    retrofit2与rxjava结合的demo

    本篇文章将详细讲解如何将Retrofit2与RxJava结合使用,通过一个具体的Demo来阐述这一过程。 首先,Retrofit是由Square公司开发的一款Android和Java的网络请求库,它通过简洁的API设计,使得网络请求变得更加简单。...

    Rxjava2 demo

    这个"RxJava2 Demo"旨在展示如何在实际应用中有效地利用RxJava2的功能。RxJava的核心概念是响应式编程,这是一种处理数据流和变换的方式,特别适合于Android平台的异步编程。 在描述中提到的"rxjava2.0版本"是...

    Rxjava+Retrofit的简单使用Demo

    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // For RxJava 2.x .build(); ``` 4. **调用网络请求并订阅** 现在,我们可以直接通过接口调用网络请求,并使用RxJava的订阅方式处理结果: ```java ...

    rxjavahellodemo.zip

    RxJava 1.x 是 RxJava 的早期版本,虽然现在已经有了 RxJava 2.x 和 3.x,但 1.x 版本仍然是许多现有项目的基础。 首先,让我们回顾一下 RxJava 的核心概念。在 RxJava 中,一切都可以看作是“Observable”(可观察...

    Retrofit结合RxJava编程的简单Demo

    implementation 'io.reactivex.rxjava2:rxjava:2.x.x' // RxJava2 implementation 'io.reactivex.rxjava2:rxandroid:2.x.x' // 用于Android平台的特定支持 } ``` 接下来,定义一个使用Retrofit的网络接口。例如,...

    sqlite数据库使用demo

    在`build.gradle`文件中,确保你已经包含了`androidx.room:room-runtime`和`androidx.room:room-rxjava2`(或`room-coroutines`,根据你的选择)依赖,它们提供了对SQLite数据库的抽象层,使得操作更加简便: ...

    RxJavaEssentials全章Demo

    10. **Flowable**:对于大数据流或背压需求,RxJava 2.x引入了Flowable,它是处理背压的Observables,适用于大量数据传输场景。 在RxJavaEssentials-master这个压缩包中,你将找到各种示例代码,涵盖了上述知识点,...

    Android 网络请求的那些事Demo

    2. 异步处理:由于Android主线程不能执行耗时操作,网络请求通常在子线程或AsyncTask中进行,防止ANR(Application Not Responding)问题。 二、HTTP GET和POST方法 1. GET请求:GET是最简单的HTTP请求,用于获取...

    retrofitDemo

    **Retrofit简介** ...Retrofit的核心理念是将网络请求转换为Java接口调用,大大降低了网络编程的复杂度。...在实际项目中,Retrofit与其他库(如RxJava、LiveData等)结合使用,能进一步提升网络请求的灵活性和可维护性。

    网易新闻客户端学习demo

    8. **异步处理**:Android应用的网络操作不能在主线程中执行,需使用AsyncTask、Handler、Thread或Retrofit+RxJava进行异步处理,避免阻塞UI。 9. **权限管理**:Android 6.0及以上版本需要在运行时动态申请网络...

    reactive-微服务的demo-Java后台

    而Spring Boot 2.x开始,Spring官方全面支持了反应式编程,与RSocket、MongoDB等反应式数据库配合,可以构建完整的反应式数据流。 在压缩包文件名"reactive-ms-example-master"中,我们可能找到一个包含完整示例...

    databinding的demo代码

    - ViewModel类:作为数据绑定的目标对象,通常继承自`androidx.lifecycle.ViewModel`,用于存储和管理UI相关的数据。 - 数据绑定表达式:可以直接在布局中调用ViewModel的方法,如`@{viewModel.someMethod()}`。 ...

    Android MVVM demo

    5. **生命周期管理**:ViewModel应该实现`androidx.lifecycle.ViewModel`接口,确保在配置变更时存活,避免数据丢失。 6. **通信机制**:使用Messenger或其他方式(如LiveData、RxJava)实现ViewModel之间的通信或...

    Retrofit精简版

    通过`Retrofit.Builder().addCallAdapterFactory(RxJava2CallAdapterFactory.create())`,可以在Retrofit中使用RxJava进行异步数据处理,极大地提高了代码的优雅性和可读性。 #### 七、Retrofit实战案例 在给定的...

    updateDemo

    每当一个线程完成其部分下载,应用程序需要更新进度条的值,这通常通过Handler或RxJava等异步编程模型来实现,确保UI更新与下载操作在主线程中同步进行,避免阻塞用户界面。 为了实现上述功能,开发者可能使用了...

    android Tab页切换的框架demo

    2. **ViewPager**: ViewPager是一种滑动页面的容器,常用于实现水平滑动的页面切换效果。在Tab页切换中,ViewPager与TabLayout结合使用,允许用户通过滑动或点击Tab来切换Fragment。 3. **TabLayout**: TabLayout是...

    Android-观众头像进入直播间的效果demo

    2. **动画框架**:Android提供了多种动画框架,如Tween Animation(补间动画)、Property Animation(属性动画)。在头像进入直播间的效果中,可能使用了属性动画,通过ObjectAnimator或ValueAnimator来改变头像的...

Global site tag (gtag.js) - Google Analytics