`
dawuafang
  • 浏览: 1192153 次
文章分类
社区版块
存档分类
最新评论

Observable.interval()不起作用的解决办法

 
阅读更多

在Eclipse下测试Rxjava中的操作符interval()时出现了很奇怪的问题,怎么试都不能执行。

代码如下:

Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                System.out.println("onCompleted ");
            }

            @Override
            public void onError(Throwable arg0) {
                // TODO Auto-generated method stub

            }

            @Override
            public void onNext(Long arg0) {
                // TODO Auto-generated method stub
                System.out.println("onNext " + arg0);
            }
        });

这段代码在Eclipse下面运行没有达到我们预期的每隔一秒输出一个数字的结果。

怎么解决呢?

去官网找了下原因,也找到了解决方案:
https://github.com/ReactiveX/RxJava/issues/3932

Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                System.out.println("onCompleted ");
            }

            @Override
            public void onError(Throwable arg0) {
                // TODO Auto-generated method stub

            }

            @Override
            public void onNext(Long arg0) {
                // TODO Auto-generated method stub
                System.out.println("onNext " + arg0);
            }
        });

try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

这样就能输出结果了如下所示:

onNext 0
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 6

......

让程序沉睡一段时间似乎也不是很完美的解决方案,继续找就找到了如下的解决办法(https://github.com/ReactiveX/RxJava/issues/4132)。

具体原因如下:

When you use the default scheduler (Schedulers.computation()) the observable emits on another thread. If your program exits just after the subscribe then the observable is not given a chance to run. Put in a long sleep just after the subscribe() call and you will see it working.

最终代码如下:

Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
        .subscribe(new Subscriber<Long>() {

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                System.out.println("onCompleted ");
            }

            @Override
            public void onError(Throwable arg0) {
                // TODO Auto-generated method stub

            }

            @Override
            public void onNext(Long arg0) {
                // TODO Auto-generated method stub
                System.out.println("onNext " + arg0);
            }
        });

需要注意的是,上面代码中的 Schedulers.trampoline()替换为Schedulers.immediate()也是可以运行的,但是这样做是不安全的。

The immediate() scheduler is not safe for recursive scheduling on the current thread, use trampoline() instead.

Schedulers.computation() 与 Schedulers.trampoline()区别

首先我们看看源码:

/**
     * Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
     *
     * @return a {@link Scheduler} that executes work immediately
     */
    public static Scheduler immediate() {
        return rx.internal.schedulers.ImmediateScheduler.INSTANCE;
    }

    /**
     * Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
     * current work completes.
     *
     * @return a {@link Scheduler} that queues work on the current thread
     */
    public static Scheduler trampoline() {
        return rx.internal.schedulers.TrampolineScheduler.INSTANCE;
    }

从上面源码可以看出,immediate()方法返回的是ImmediateScheduler类的实例,而trampoline()方法返回的是TrampolineScheduler类的实例。

ImmediateScheduler类的部分源码:


/**
 * Executes work immediately on the current thread.
 */
public final class ImmediateScheduler extends Scheduler {
    public static final ImmediateScheduler INSTANCE = new ImmediateScheduler();

    private ImmediateScheduler() {
        // the class is singleton
    }

从ImmediateScheduler类的官网介绍中可以看出ImmediateScheduler执行在当前main线程。

TrampolineScheduler类的部分源码:

/**
 * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
 * after the current unit of work is completed.
 */
public final class TrampolineScheduler extends Scheduler {
    public static final TrampolineScheduler INSTANCE = new TrampolineScheduler();

    @Override
    public Worker createWorker() {
        return new InnerCurrentThreadScheduler();
    }

从TrampolineScheduler类的官网介绍中可以看出TrampolineScheduler不会立即执行,当其他排队任务介绍时才执行,当然TrampolineScheduler运行在当前main线程。

了解调度器

调度器的种类

下表展示了RxJava中可用的调度器种类:

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行

默认调度器

在RxJava中,某些Observable操作符的变体允许你设置用于操作执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操作符的默认调度器:

操作符 调度器
buffer(timespan) computation
buffer(timespan, count) computation
buffer(timespan, timeshift) computation
debounce(timeout, unit) computation
delay(delay, unit) computation
delaySubscription(delay, unit) computation
interval computation
repeat trampoline
replay(time, unit) computation
replay(buffersize, time, unit) computation
replay(selector, time, unit) computation
replay(selector, buffersize, time, unit) computation
retry trampoline
sample(period, unit) computation
skip(time, unit) computation
skipLast(time, unit) computation
take(time, unit) computation
takeLast(time, unit) computation
takeLast(count, time, unit) computation
takeLastBuffer(time, unit) computation
takeLastBuffer(count, time, unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
timeout(firstTimeoutSelector, timeoutSelector) immediate
timeout(timeoutSelector, other) immediate
timeout(timeout, timeUnit) computation
timeout(firstTimeoutSelector, timeoutSelector, other) immediate
timeout(timeout, timeUnit, other) computation
timer computation
timestamp immediate
window(timespan) computation
window(timespan, count) computation
window(timespan, timeshift) computation

了解更多Scheduler使用请参考:
https://github.com/mcxiaoke/RxDocs/blob/master/Scheduler.md

<script type="text/javascript"> $(function () { $('pre.prettyprint code').each(function () { var lines = $(this).text().split('\n').length; var $numbering = $('<ul/>').addClass('pre-numbering').hide(); $(this).addClass('has-numbering').parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($('<li/>').text(i)); }; $numbering.fadeIn(1700); }); }); </script>
分享到:
评论

相关推荐

    Android RxJava创建操作符Interval

    Interval操作符:用于创建Observable,跟TimerTask类似,用于周期性发送信息,是一个可以指定线程的TimerTask 首先添加类库 // RxAndroid compile 'io.reactivex:rxandroid:1.2.1' // RxJava compile 'io.reactivex...

    RxJava2实现倒计时

    .concatWith(Observable.interval(1, TimeUnit.SECONDS).take(5)) // 倒计时5秒 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws ...

    轻松实现Rxjava定时器功能

    Observable.interval(0, 1, TimeUnit.SECONDS) // 开始立即并每隔1秒发射一次 .take(count_time + 1) // 设置总共发射的次数 .map(new Function, Long&gt;() { @Override public Long apply(Long aLong) throws ...

    基于Rxjava实现轮询定时器

    在interval()方法中,我们使用了Observable.interval()方法来实现每隔若干毫秒后执行next操作。我们首先创建了一个Observable对象,使用interval()方法指定了执行next操作的时间间隔,然后使用observeOn()方法指定了...

    iOS-RxSwift-Observable的使用-Demo

    例如,`just`用于创建一个发出单个值的`Observable`,`from`用于从数组、集合等创建`Observable`,`interval`则可以创建每隔一定时间发出值的`Observable`。 2. 发射事件:`Observable`可以发射三种基本事件:`....

    Android RxJava创建操作符Timer的方法

    Timer 操作符的_interval_用法 除了延迟执行某段逻辑外,Timer 操作符还可以用来实现间隔执行某段逻辑。例如,要间隔 1 秒执行某段逻辑,可以使用以下代码: ```java private Subscription subscribe; private ...

    老罗Rxjava安卓视频教程

    Observable.interval(2, TimeUnit.SECONDS), (a, b) -&gt; a + " " + b) .take(5) .subscribe(System.out::println); ``` #### 四、实战应用案例 **4.1 实现网络请求** RxJava常用于实现网络请求,它可以简化...

    Reactive Programming on Android with RxJava

    - **作用**:Observable负责生成数据或事件,并将它们发送给Observer。 - **创建**:可以通过多种方式创建Observable,例如使用静态工厂方法`Observable.create()`。 ##### 2. **Observer(观察者)** - **定义**...

    angular 用Observable实现异步调用的方法

    通过调用 Observable 的 subscribe 方法,观察者会与 Observable 的执行作用域绑定,并开始接收数据。subscribe 方法返回一个 Subscription 对象,该对象有一个 unsubscribe() 方法,当不再需要数据时,可以调用此...

    RxJavaEssentials

    4. **Schedulers.trampoline()**:立即执行任务,不排队。 通过使用 `subscribeOn()` 和 `observeOn()` 方法,我们可以轻松地控制任务何时何地执行。 #### RESTful API 与 RxJava 在实际应用中,RxJava 经常与 ...

    探秘vue-rx 2.0(推荐)

    这些Observable可以是简单的单值、数组、定时器(interval)或高阶Observable(由一个Observable的Observable组成,即流中每个元素本身也是一个流)。在组件销毁时,Vue-Rx会自动处理这些订阅的取消,保证了内存的...

    lit-observable:从lit-html模板和RxJS observables创建自定义元素

    可观察到的 ... 原料药 litObservable... interval ( 1000 ) ; const render = ( { count , isEven } ) =&gt; html ` &lt; p&gt; Count: ${ count } ( ${ isEven ? 'even' : 'odd' } ) ` ; const CountUpElement = litObs

    Android-使用Rx在Android上制作视图动画的简单方法

    例如,我们可以使用`Observable.interval()`或`Observable.timer()`来定时触发动画事件。下面是一个简单的使用RxJava实现补间动画的例子: ```java import io.reactivex.Observable; import io.reactivex....

    android基于rxjava的图片轮播实现,十几行代码搞定

    2. 创建一个Observable,使用`Observable.interval()`方法来定时发布事件。例如,每隔2秒钟发布一次事件,表示切换到下一张图片。 3. 使用`.map()`操作符将时间间隔转换为实际的图片加载请求。你可以使用Picasso或...

    Android实现轮询的三种方式

    本文实例为大家分享了Android实现轮询的方式,供大家参考,具体内容如下 1.通过rxjava实现(代码中使用了Lambda表达式) ... mDisposable = Observable.interval(DELAY, PERIOD, TimeUnit.MILLISECOND

    rxdart使用要义

    - `interval`: 生成一个周期性发出值的序列。 - `merge`: 合并多个可观察序列。 ### 3. 转换操作符 转换操作符允许我们将数据从一种形式转换为另一种形式。例如: - `map`: 将每个元素应用一个函数并返回新的可...

    Rx使用示例(RxJava&RxAndroid)

    - **定时任务**:使用Observable.interval()或Observable.timer()实现定时任务。 - **事件处理**:处理点击事件、滑动事件等UI事件,以响应式的方式响应用户交互。 - **线程调度**:使用Schedulers.io()和...

    Android开发丶使用RxJava来完成自动轮播图

    这可以通过`Observable.interval()`或`Observable.timer()`方法来实现。例如: ```java Observable&lt;Long&gt; timerObservable = Observable.timer(3, TimeUnit.SECONDS); ``` 这里的代码表示每隔3秒发出一个信号。 ...

    关于RxJava的一些特殊用法小结

    在某些场景下,我们需要在特定时间后执行某个操作,`Observable.timer()`可以很好地解决这个问题: ```java Observable.timer(2, TimeUnit.SECONDS) .subscribe(call -&gt; { // 延迟2秒后的操作 }); ``` 这里我们...

Global site tag (gtag.js) - Google Analytics