`

RxJava源码解析(三)背压+源码+同步异步+原理

阅读更多

系列文章第三篇
承接上文:RXjava解析(二)我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?
顺手留下GitHub链接,需要获取相关面试等内容的可以自己去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)

背压问题

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略

简而言之,背压是流速控制的一种策略。

需要强调两点:

  • 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
  • 背压(Backpressure)并不是一个像flatMap一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。

响应式拉取(reactive pull)

首先我们回忆之前那篇《关于Rxjava最友好的文章》,里面其实提到,在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据。

结构示意图如下:
观察者可以根据自身实际情况按需拉取数据,而不是被动接收(也就相当于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。

源码

  public class FlowableOnBackpressureBufferStategy{
  ...
        @Override
        public void onNext(T t) {
           if (done) {
             return;
           }
           boolean callOnOverflow = false;
           boolean callError = false;
           Deque<T> dq = deque;
           synchronized (dq) {
              if (dq.size() == bufferSize) {
                     switch (strategy) {
                     case DROP_LATEST:
                         dq.pollLast();
                         dq.offer(t);
                         callOnOverflow = true;
                         break;
                     case DROP_OLDEST:
                         dq.poll();
                         dq.offer(t);
                         callOnOverflow = true;
                         break;
                     default:
                          // signal error
                          callError = true;
                          break;
                     }
        } else {
                     dq.offer(t);
          }
  }
      if (callOnOverflow) {
            if (onOverflow != null) {
                try {
                   onOverflow.run();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.cancel();
                    onError(ex);
                }
           }
          } else if (callError) {
            s.cancel();
            onError(new MissingBackpressureException());
          } else {
            drain();
          }
    }
  ...
  }

在这段源码中,根据不同的背压策略进行了不同的处理措施,当然这只是列举了一段关于buffer背压策略的例子。

根源

产生背压问题的根源就是上游发送速度与下游的处理速度不均导致的,所以如果想要解决这个问题就需要通过匹配两个速率达到解决这个背压根源的措施。
通常有两个策略可供使用:

  1. 从数量上解决,对数据进行采样
  2. 从速度上解决,降低发送事件的速率
  3. 利用flowable和subscriber

使用Flowable

  Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
              Log.d(TAG, "emit 1");
              emitter.onNext(1);
              Log.d(TAG, "emit 2");
              emitter.onNext(2);
              Log.d(TAG, "emit 3");
              emitter.onNext(3);
              Log.d(TAG, "emit complete");
              emitter.onComplete();
          }
       }, BackpressureStrategy.ERROR); //增加了一个参数
       Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
               Log.d(TAG, "onSubscribe");
               s.request(Long.MAX_VALUE); //注意这句代码
        }

        @Override
        public void onNext(Integer integer) {
              Log.d(TAG, "onNext: " + integer);
        }
        @Override
        public void onError(Throwable t) {
              Log.w(TAG, "onError: ", t);
        }
        @Override
         public void onComplete() {
               Log.d(TAG, "onComplete");
         }
   };
   upstream.subscribe(downstream);

我们注意到这次和 Observable 有些不同. 首先是创建 Flowable 的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用 BackpressureStrategy.ERROR 这种方式,这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException . 其余的策略后面再来讲解.

另外的一个区别是在下游的 onSubscribe 方法中传给我们的不再是 Disposable 了, 而是 Subscription , 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用 Disposable.dispose() 方法可以切断水管, 同样的调用 Subscription.cancel() 也可以切断水管, 不同的地方在于 Subscription增加了一个 void request(longn) 方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

  s.request(Long.MAX_VALUE);

这是因为 Flowable 在设计的时候采用了一种新的思路也就是 响应式拉取 的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的 控制数量 和 控制速度 不太一样, 这种方式用通俗易懂的话来说就好比是 叶问打鬼子 , 我们把 上游看成 小日本 , 把 下游 当作 叶问 , 当调用 Subscription.request(1) 时, 叶问 就说 我要打一个! 然后 小日本 就拿出 一个鬼子 给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用 request(10) , 叶问就又说 我要打十个! 然后小日本又派出 十个鬼子 给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...

所以我们把request当做是一种能力, 当成 下游处理事件 的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !

同步情况

  Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; ; i++) { //无限循环发事件
                    emitter.onNext(i);
            } 
       }
  }).subscribe(new Consumer<Integer>() {
       @Override
       public void accept(Integer integer) throws Exception {
            Thread.sleep(2000);
            Log.d(TAG, "" + integer);
       }
  });

当上下游工作在 同一个线程 中时, 这时候是一个 同步 的订阅关系, 也就是说 上游 每发送一个事件 必须 等到 下游 接收处理完了以后才能接着发送下一个事件.

同步与异步的区别就在于有没有缓存发送事件的缓冲区。

异步情况

通过subscribeOnobserveOn来确定对应的线程,达到异步的效果,异步时会有一个对应的缓存区来换从从上游发送的事件。

  public enum BackpressureStrategy {
    /**
      * OnNext events are written without any buffering or dropping.
      * Downstream has to deal with any overflow.
      * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
      */
    MISSING,
    /**
      * Signals a MissingBackpressureException in case the downstream can't keep up.
      */
    ERROR,
    /**
      * Buffers <em>all</em> onNext values until the downstream consumes it.
      */
    BUFFER,
    /**
      * Drops the most recent onNext value if the downstream can't keep up.
      */
    DROP,
    /**
       * Keeps only the latest onNext value, overwriting any previous value if the
      * downstream can't keep up.
      */
      LATEST
  }

背压策略:

  1. error, 缓冲区大概在128
  2. buffer, 缓冲区在1000左右
  3. drop, 把存不下的事件丢弃
  4. latest, 只保留最新的
  5. missing, 缺省设置,不做任何操作

上游从哪里得知下游的处理能力呢?我们来看看上游最重要的部分,肯定就是 FlowableEmitter了啊,我们就是通过它来发送事件的啊,来看看它的源码吧(别紧张,它的代码灰常简单):

  public interface FlowableEmitter<T> extends Emitter<T> {
      void setDisposable(Disposable s);
      void setCancellable(Cancellable c);
      /**
        * The current outstanding request amount.
        * <p>This method is thread-safe.
        * @return the current outstanding request amount
        */
      long requested();
      boolean isCancelled();
      FlowableEmitter<T> serialize();
  }

FlowableEmitter是个接口,继承Emitter,Emitter里面就是我们的onNext(),onComplete()onError()三个方法。我们看到FlowableEmitter中有这么一个方法:

  long requested();

 


这张图的意思就是当上下游在同一个线程中的时候,在 下游 调用request(n)就会直接改变 上游 中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,继续发送事件便会抛异常了


可以看到,当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们调用request(1000)时,实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。

 

Rxjava实例开发应用

  1. 网络请求处理(轮询,嵌套,出错重连)
  2. 功能防抖
  3. 从多级缓存获取数据
  4. 合并数据源
  5. 联合判断
  6. 与 Retrofit,RxBinding,EventBus结合使用

Rxjava原理

  1. Scheduler线程切换工作原理
  2. 数据的发送与接收(观察者模式)
  3. lift的工作原理
  4. map的工作原理
  5. flatMap的工作原理
  6. merge的工作原理
  7. concat的工作原理

顺手留下GitHub链接,需要获取相关面试等内容的可以自己去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)

PDF和源码获取
分享到:
评论

相关推荐

    MVP+RxJava+Retrofit+Okhttp+ButterKnifer的快速开发框架

    "MVP+RxJava+Retrofit+OkHttp+ButterKnife"是一个流行的组合,它提供了强大的功能和灵活性,使得开发者能够快速构建复杂的Android应用程序。下面我们将逐一解析这些组件及其在开发中的作用。 **Model-View-...

    MVP+rxjava2+retrofit2+okhttp3 应用框架

    RxJava2引入了一些新特性,如错误处理的增强、背压策略的优化以及对Java 8的支持,使得在Android开发中进行复杂的并发和事件处理更为简单。 【Retrofit2】 Retrofit2是Square公司开发的一款网络请求库,它允许...

    Android-基于Retrofi2tRxJava2OkHttp3RxLifecycle3的网络请求框架

    **RxJava2** 是一个用于处理异步操作和事件流的库,它引入了Reactive Programming的概念。在Android中,它可以用来管理复杂的回调链,避免回调地狱。RxJava2提供了多种操作符,如map、filter、concatMap等,使你能够...

    老罗RxJava视频+源码百度云下载

    - **深入理解**:查看源码可以帮助学习者更深入地理解RxJava的工作原理。 - **调试与优化**:通过对源码的研究,开发者可以更好地调试自己的代码,并对其进行优化。 - **二次开发**:源码为二次开发提供了可能,使得...

    Android Glide和Rxjava源码分析视频

    #### 二、RxJava 原理概述 1. **观察者模式**:定义了一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知并自动更新。 2. **事件流**:将一系列事件视为连续的数据流,可以对其...

    rxjava2 +retrofit +myrecyclerview

    `RxJava2`、`Retrofit` 和 `RecyclerView` 是三个非常流行的库,它们分别处理异步编程、网络请求和列表视图的管理。本项目结合这三者,旨在提供一个封装良好的网络框架,并集成下拉刷新和自动加载更多的功能。 1. *...

    Rxjava2:Rxjava2源码解析-源码解析

    2,Rxjava线程调度原理 3,observeOn与subscribeOn控制线程的执行顺序解析 目录 1,地图原始码解析 2,FlatMap源码解析 3,concatMap源码解析 4,concatMap为何在变换中有序输出 目录 1,zip原始码解析 目录 1,背压...

    Rxjava1.1.9 jar和sources源码

    而 `sources` 源码文件则提供了完整的源代码,对于开发者来说,能够查看源码有助于理解库的工作原理,以及如何自定义操作符或者进行调试。 RxJava 的核心特性包括: 1. **冷序列和热序列**:冷序列只在有订阅者时...

    RxJava2.0(初学者教程+实用操作符总结及原理简析)

    此外,RxJava2.0还引入了背压策略,以应对高频率事件产生的问题,确保下游Observer不会因事件过多而崩溃。 总之,RxJava2.0为开发者提供了一种强大且灵活的方式来处理异步事件和数据流。通过观察者模式和丰富的操作...

    WanAndroid:MVP RxJava2 + Retrofit2 + Dragger2 + autodispose + Glide应用框架

    同时,RxJava2的错误处理和背压机制能有效防止内存泄漏和性能问题。 3. Retrofit2: Retrofit是由Square公司开发的一个Android网络请求库,它通过注解将HTTP请求转换为Java接口。在WanAndroid中,Retrofit2用于...

    MVP+DataBinding+Rxjava+retrofit2 封装最流行的技术 极度减少代码重复

    ### MVP+DataBinding+RxJava+Retrofit2:封装最流行的技术 极度减少代码重复 在当前快速发展的移动开发领域中,为了提高开发效率、降低维护成本并增强应用程序的可扩展性,开发者们不断探索并尝试各种新技术与模式...

    retrofit2 + rxjava 2 + MVP

    在Android开发中,Retrofit2、RxJava2和MVP(Model-View-Presenter)是三个非常重要的组件,它们一起可以构建出高效、可维护的网络请求和业务逻辑处理框架。接下来,我们将深入探讨这三个技术及其结合使用的方式。 ...

    Apache Nifi 概念介绍、源码解析、开发指南(中文)

    Apache NiFi 概念介绍、源码解析、开发指南(中文) Apache NiFi 是一个开源的数据集成工具,由 Apache 软件基金会维护。NiFi 的核心设计理念是 Flow-Based Programming,它可以将数据从各种 sources 中提取,转换...

    source-rxjava3:rxjava3源码阅读

    源码阅读是理解其内在机制的关键,本文将针对RxJava3的源码进行深入探讨,揭示其核心概念与实现原理。 首先,我们关注“系统开源”这一标签。开源意味着开发者可以访问到完整的代码库,自由地学习、修改和分发。...

    Rxjava+Retrofit的简单使用Demo

    在Android开发中,RxJava和Retrofit是两个非常重要的库,它们分别用于处理异步操作和网络请求。RxJava是一个响应式编程库,它引入了观察者模式的概念,使我们能够更优雅地处理事件和数据流。而Retrofit则是一个简洁...

    AndroidProject,RXJava+改装.zip

    - **异步操作**:RXJava的subscribeOn()和observeOn()操作符可以轻松地处理线程切换,实现UI线程与后台线程的分离,避免主线程阻塞。 - **数据绑定**:结合Data Binding库,可以实现UI组件与数据模型的自动同步,...

    MVP+Rxjava+Retrofit

    在Android应用开发中,"MVP+RxJava+Retrofit"是一种常见的高效、灵活的网络框架组合,用于处理复杂的业务逻辑和数据管理。这个框架模式的精髓在于将职责分离,提高代码可读性和可测试性。 **MVP(Model-View-...

    【0积分下载】RxJava 3.x 最新版

    RxJava:响应式编程的现代方式 在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式...5. **背压管理**:在数据生产速度超过消费速度时,RxJava可以帮助管理背压,防止系统过载。

    rxjava2-http:通过无阻塞背压通过HTTP传输RxJava2 Flowable

    使用Servlet 3.1+异步处理(默认情况下) 入门 Maven依赖 &lt; groupId&gt;com.github.davidmoten&lt;/ groupId&gt; &lt; artifactId&gt;rxjava2-http&lt;/ artifactId&gt; &lt; version&gt;VERSION_HERE 创建servlet 下面

    Retrofit2.0的学习以及Rxjava与Retrofit2的结合使用

    而RxJava则是一个强大的响应式编程库,它通过观察者模式来处理异步数据流,提供了丰富的操作符来组合和处理数据流。当Retrofit2与RxJava结合使用时,可以构建出更高效、更灵活的网络请求解决方案。 首先,我们来...

Global site tag (gtag.js) - Google Analytics