`
后来我们都老了
  • 浏览: 34667 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Rxjava学习

    博客分类:
  • java
阅读更多

1 基本概念

1.1 Rx概念

一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库,

其实 RxJava 的本质就是一个可以实现异步操作的库

1.2 Rx优势

同样是做异步,为什么人们用它,而不用现成的 Async / Future / XXX / ... 一个词:简洁! 异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 随着程序逻辑变得越来越复杂,它依然能够保持简洁,对日后代码维护省去不少力气。k

1.3 Rx结构

响应式编程的主要组成部分是observable, operator和susbscriber,网上大多数文章都是介绍说有两部分,我这里把operator操作符也加进去了,这样对结构的整体性会有更全面的认识)。 一般响应式编程的信息流如下所示:

Observable > Operator 1 > Operator 2 > Operator 3 > Subscriber

也就是说,observable是事件的生产者,subscriber是事件最终的消费者。 因为subscriber通常在主线程中执行,因此设计上要求其代码尽可能简单,只对事件进行响应,而修改事件的工作全部由operator执行。 如果我们不需要修改事件,就不需要在observable和subscriber中插入operator。这时的Rx结构如下:

Obsevable > Subscriber

这里看起来跟设计模式中的观察者模式很像,他们最重要的区别之一在于在没有subscriber(观察者模式中的observer)之前,observable(被观察者)不会产生事件。

1.4 最简单的模式

 

Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
            @Override
            public void call(Subscriber<? super Student> subscriber) {
                for (int i = 0; i < students.size(); i++) {
                    subscriber.onNext(students.get(i));
                }
                subscriber.onCompleted();
            }
        });
        //观察者1
        Subscriber<Student> subscriber = new Subscriber<Student>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(Student student) {
                System.out.println(student.getAge());
            }
        };
        observable.subscribe(subscriber);

 

 

如果我们不关心subscriber是否结束(onComplete())或者发生错误(onError()),subscriber的代码可以简化为

 

 

Observable.just(student1,student2,student3,student4).subscribe(new Action1<Student>() {
    @Override
    public void call(Student student) {
        log(student.getAge());
    }
});

 

 

Observable.from(students).subscribe(new Action1<Stußßdent>() {
    @Override
    public void call(Student student) {
        log(student.getAge());
    }
});

 

 

很明显跟subscriber比起来,action相当于只有onNext()方法,因此,这种方式叫做不完整定义。

1.5 加入operator

1.5.1 map操作符

很多时候,我们需要针对处理过的事件做出响应,而不仅仅是Observable产生的原始事件。 意淫一下,加入我还要再输入每个student的自我介绍怎么办?传统方法for循环? 可以,但rxjava可以这么做

 

Observable.from(students)
                .map(new Func1<Student, Student>() {
                    @Override
                    public Student call(Student student) {
                        log(student.getIntroduce());
                        return student;
                    }
                })
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        log(student.getAge());
                    }
                });

 

 

1.5.2 flatMap操作符

坑爹,繁琐的需求又来了,每个student可以选择不同的课程,1对多... 怎么操作这些课程呢?传统方式嵌套for循环? No,rxjava可以这么做

 

Observable.from(students)
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    log("name:" + student.getName());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    log("course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    log("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    log("onError");
                }

                @Override
                public void onNext(Course course) {
                    log(" teacher:" + course.getTeacherName());
                }
            });

 

 

1.5.3 filter操作符

如果我想在所有的学生中找出19岁以上怎么办?filter!

 

Observable.from(studentList)
            //获取19岁以上的
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 19;
                }
            })
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    System.out.println("student:" + student.getName() + " age:" + student.getAge());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    System.out.print(" course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override
                public void onNext(Course course) {
                    System.out.println(" teacher:" + course.getTeacherName());
                }
            });

 

 

1.5.4 take操作符

如果我不想要这么多学生我只想要一个怎么办呢?take!

 

Observable.from(studentList)
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 19;
                }
            })
            .take(1)
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    System.out.println("student:" + student.getName() + " age:" + student.getAge());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    System.out.print(" course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override
                public void onNext(Course course) {
                    System.out.println(" teacher:" + course.getTeacherName());
                }
            });

 

rxjava提供大量的操作,这里只介绍几个最基本的,有兴趣的同学可以会下自行学习.

1.6 取消订阅

实际上执行Observable.subscribe()时,它会返回一个Subscrition,它代表了Observable和Subscriber之间的关系。你可以通过Subscrition解除Observable和Subscriber之间的订阅关系,并立即停止执行整个订阅链。

subscription.unsubscribe();

2 调度器Schedulers

2.1 基本概念

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。

2.2 调度类别

Schedulers类别有很多种,下面介绍几个常用的,Android专用的在此不做介绍,有兴趣的同学请线下交流。

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): 行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

2.3 调度器的使用

subscribeOn():指定 subscribe() 所发生的线程,或者叫做事件产生的线程

Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        for (int i = 0; i < students.size(); i++) {
            LogUtil.log(Thread.currentThread().getName());
            subscriber.onNext(students.get(i));
        }
        subscriber.onCompleted();
    }
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Student student) {
        LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
    }
};
LogUtil.log(Thread.currentThread().getName());
observable
        .subscribeOn(Schedulers.newThread())
        .subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
输出:
main
RxNewThreadScheduler-1
RxNewThreadScheduler-1:18
RxNewThreadScheduler-1
RxNewThreadScheduler-1:19
RxNewThreadScheduler-1
RxNewThreadScheduler-1:20


 
 从上图可以看出,线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;
observeOn():指定 Subscriber 所运行在的线程。或者叫做事件消费的线程
 
Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        for (int i = 0; i < students.size(); i++) {
            LogUtil.log(Thread.currentThread().getName());
            subscriber.onNext(students.get(i));
        }
        subscriber.onCompleted();
    }
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onCompleted() {  
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Student student) {
        LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
    }
};
LogUtil.log(Thread.currentThread().getName());
observable
        .observeOn(Schedulers.newThread())
        .subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
 
输出:
main
main
main
main
RxNewThreadScheduler-1:18
RxNewThreadScheduler-1:19
RxNewThreadScheduler-1:20

从上图可以看出,线程切换发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程
3 应用场景
3.1 缓存降级
3.2 异步接口
可以实现异步接口调用的方法有很多,比如@Async/Future/CountdownLatch... 
  • 大小: 65.4 KB
  • 大小: 101.6 KB
0
0
分享到:
评论

相关推荐

    rxjava学习代码

    这个“rxjava学习代码”压缩包提供了一种深入理解RxJava 2.x版本的途径,通过实际的代码示例和详细的注释,为初学者提供了宝贵的资源。 首先,让我们了解一下什么是响应式编程。响应式编程是一种编程范式,它关注...

    读书笔记:没有背压的Rxjava学习曲线极低的响应式编程.zip

    读书笔记:没有背压的Rxjava学习曲线极低的响应式编程

    rxjava学习资料

    总之,RxJava学习资料为那些希望在Android开发中使用响应式编程模型的Java程序员提供了丰富的知识资源。通过学习RxJava,开发者能够设计出更加灵活、高效、响应快的应用程序,有效地克服Android平台的限制,实现事件...

    RxJava学习Demo

    **RxJava学习Demo详解** RxJava是一个用于处理异步数据流和进行事件发布的库,它在Android开发中尤其受欢迎。这个“RxJava学习Demo”旨在帮助开发者深入理解RxJava的核心概念和实际应用。 1. **核心概念** - **...

    MVP+Retrofit+RxJava学习

    通过深入学习和实践"MVP+Retrofit+RxJava",开发者不仅可以提升Android应用的开发效率,还能增强对现代软件设计原则的理解,这对于职业发展非常有益。在实际项目中,这种技术栈经常被采用,因为它们能有效地处理...

    RxJava的学习demo

    **RxJava学习Demo详解** RxJava,全称为Reactive Extensions for Java,是一种基于观察者模式的响应式编程库,广泛应用于Android开发中。它通过提供一系列灵活的操作符,使得异步编程、事件处理以及数据流管理变得...

    RxBasicsKata:RxJava学习者的实际挑战

    RxJava学习者的实际挑战 使用JUnit测试作为接受标准来学习RxJava会遇到一系列简单的代码挑战。 侧重于一些基本概念,但尚未涵盖任何Android主题。 当前实施 依存关系: RxJava 2.1.7 JUnit 4.12 涵盖的React类型:...

    RXJava学习系列

    资源中包含RX控件源码(RxAndroid-2.x.zip,RxFile-master.zip,RxJava-2.x.zip)以及RX涉及的JAR包调用。最后还提供了一个DEMO,通过RXJAVA实现一个天气预报功能。

    Rxjava2学习demo

    在这个"RxJava2学习demo"中,我们可以深入理解RxJava2的核心概念和API,并通过实践提升我们的技能。 1. **基本概念**: - **Observable(可观察者)**:是数据的生产者,可以发送数据或通知给订阅者。 - **...

    Reactive Programming with RxJava

    通过阅读这本书,读者可以了解到如何利用RxJava库来创建异步的、基于事件的应用程序,同时书中的内容可以作为增量学习工具,帮助开发者逐步理解和掌握RxJava,这说明了RxJava学习曲线虽然陡峭,但通过阅读本书可以...

    RxJava和RxAndroid学习例子大全

    "RxJava和RxAndroid学习例子大全"包含了各种示例,可以帮助开发者深入理解RxJava和RxAndroid的使用。`appcompat_v7_2`可能包含支持库,用于兼容不同版本的Android设备;`Android_RxJava`则可能是一系列示例代码,...

    RxJava入门学习资料-RxJava Essentials(英/中文版合集)

    《RxJava入门学习资料-RxJava Essentials》是针对 RxJava 进行系统学习的重要资源,包含英文版和中文版,适合编程初学者以及对响应式编程感兴趣的开发者。RxJava 是一个在 Java VM 上使用 Reactive Extensions 的库...

    RxJava 2.0学习文档

    ### RxJava 2.0 学习文档 #### 基本概念 **RxJava** 是一种基于 Java 的响应式编程库,它提供了一种优雅的方式来处理异步数据流。RxJava 的核心思想是将数据流视为一系列事件,并允许开发者以声明式的方式来处理...

    RxJava2 示例 - 这可能是从 RxJava1 跳到 RxJava2(学习 RxJava2 )最好的示例 Demo.zip

    RxJava2 示例 —— 这可能是从 RxJava1 跳到 RxJava2(学习 RxJava2 )最好的示例 DemoRxJava2示例RxJava2 示例——可能是从 RxJava1 跳到 RxJava2(学习 RxJava2)最好的示例演示RxJava 1.x 到 RxJava 2.x 的无缝...

    最简单易懂的RxJava2学习教程(一)

    本文主要针对"RxJava2学习"进行详细讲解,帮助开发者理解其核心概念和用法。 首先,我们需要了解RxJava的核心理念——观察者模式。在RxJava中,数据的生产者被称为Observable(可观察者),而数据的消费者被称为...

    Android,Retrofit2,Dagger2,RxJava2学习

    学习这三个库是提升Android开发能力的重要步骤,Retrofit2让网络请求变得简单,Dagger2帮助管理依赖关系,而RxJava2则提供了一种强大的处理异步操作的工具。通过阅读文档和实践项目,开发者可以深入理解它们的工作...

    retrofit学习、retrofit rxjava封装、retrofit mvp rxjava

    Retrofit是Android开发中一款流行的...学习并熟练掌握Retrofit和RxJava的结合使用,对于提升Android应用的开发效率和质量至关重要。在实际项目中,可以参考提供的`retrofitdemo`示例代码,进一步理解这些概念和技术。

    RxJavaSearchMVP:您可以在此项目中使用RxJava学习MVP

    总的来说,RxJavaSearchMVP项目是一个绝佳的学习资源,它将理论与实践相结合,帮助开发者提升在Java和Android应用开发中的技能,特别是对响应式编程和MVP架构的理解和应用。通过深入研究和模仿这个项目,开发者能够...

    超详细的Rxjava文档(包括Rxjava1,Rxjava2,中文官方文档,实战实例,博客详文等)

    这个压缩包包含了关于RxJava的全面资源,包括RxJava 1和RxJava 2两个主要版本的文档,中文官方文档,实战实例以及一系列深入解析的博客文章,对于学习和掌握RxJava来说是非常宝贵的资料。 RxJava的核心概念是...

    老罗Rxjava安卓视频教程

    通过上述内容的学习,我们了解了RxJava的基本概念、使用方法以及高级特性。RxJava作为一种强大的异步编程工具,在Android开发中有着广泛的应用场景。掌握这些知识点不仅有助于提高代码的健壮性和可维护性,还能帮助...

Global site tag (gtag.js) - Google Analytics