在Eclipse下测试Rxjava中的操作符interval()时出现了很奇怪的问题,怎么试都不能执行。
代码如下:
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onError(Throwable arg0) {
}
@Override
public void onNext(Long arg0) {
System.out.println("onNext " + arg0);
}
});
这段代码在Eclipse下面运行没有达到我们预期的每隔一秒输出一个数字的结果。
怎么解决呢?
去官网找了下原因,也找到了解决方案:
https://github.com/ReactiveX/RxJava/issues/3932
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onError(Throwable arg0) {
}
@Override
public void onNext(Long arg0) {
System.out.println("onNext " + arg0);
}
});
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
这样就能输出结果了如下所示:
onNext 0
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 6
......
让程序沉睡一段时间似乎也不是很完美的解决方案,继续找就找到了如下的解决办法(https://github.com/ReactiveX/RxJava/issues/4132)。
具体原因如下:
When you use the default scheduler (Schedulers.computation()) the observable emits on another thread. If your program exits just after the subscribe then the observable is not given a chance to run. Put in a long sleep just after the subscribe() call and you will see it working.
最终代码如下:
Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onError(Throwable arg0) {
}
@Override
public void onNext(Long arg0) {
System.out.println("onNext " + arg0);
}
});
需要注意的是,上面代码中的 Schedulers.trampoline()替换为Schedulers.immediate()也是可以运行的,但是这样做是不安全的。
The immediate() scheduler is not safe for recursive scheduling on the current thread, use trampoline() instead.
Schedulers.computation() 与 Schedulers.trampoline()区别
首先我们看看源码:
/**
* Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
*
* @return a {@link Scheduler} that executes work immediately
*/
public static Scheduler immediate() {
return rx.internal.schedulers.ImmediateScheduler.INSTANCE;
}
/**
* Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
* current work completes.
*
* @return a {@link Scheduler} that queues work on the current thread
*/
public static Scheduler trampoline() {
return rx.internal.schedulers.TrampolineScheduler.INSTANCE;
}
从上面源码可以看出,immediate()方法返回的是ImmediateScheduler类的实例,而trampoline()方法返回的是TrampolineScheduler类的实例。
ImmediateScheduler类的部分源码:
/**
* Executes work immediately on the current thread.
*/
public final class ImmediateScheduler extends Scheduler {
public static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
private ImmediateScheduler() {
}
从ImmediateScheduler类的官网介绍中可以看出ImmediateScheduler执行在当前main线程。
TrampolineScheduler类的部分源码:
/**
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
* after the current unit of work is completed.
*/
public final class TrampolineScheduler extends Scheduler {
public static final TrampolineScheduler INSTANCE = new TrampolineScheduler();
@Override
public Worker createWorker() {
return new InnerCurrentThreadScheduler();
}
从TrampolineScheduler类的官网介绍中可以看出TrampolineScheduler不会立即执行,当其他排队任务介绍时才执行,当然TrampolineScheduler运行在当前main线程。
了解调度器
调度器的种类
下表展示了RxJava中可用的调度器种类:
调度器类型
效果
Schedulers.computation( ) |
用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) |
使用指定的Executor作为调度器 |
Schedulers.immediate( ) |
在当前线程立即开始执行任务 |
Schedulers.io( ) |
用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) |
为每个任务创建一个新线程 |
Schedulers.trampoline( ) |
当其它排队的任务完成后,在当前线程排队开始执行 |
默认调度器
在RxJava中,某些Observable操作符的变体允许你设置用于操作执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操作符的默认调度器:
操作符
调度器
buffer(timespan) |
computation |
buffer(timespan, count) |
computation |
buffer(timespan, timeshift) |
computation |
debounce(timeout, unit) |
computation |
delay(delay, unit) |
computation |
delaySubscription(delay, unit) |
computation |
interval |
computation |
repeat |
trampoline |
replay(time, unit) |
computation |
replay(buffersize, time, unit) |
computation |
replay(selector, time, unit) |
computation |
replay(selector, buffersize, time, unit) |
computation |
retry |
trampoline |
sample(period, unit) |
computation |
skip(time, unit) |
computation |
skipLast(time, unit) |
computation |
take(time, unit) |
computation |
takeLast(time, unit) |
computation |
takeLast(count, time, unit) |
computation |
takeLastBuffer(time, unit) |
computation |
takeLastBuffer(count, time, unit) |
computation |
throttleFirst |
computation |
throttleLast |
computation |
throttleWithTimeout |
computation |
timeInterval |
immediate |
timeout(timeoutSelector) |
immediate |
timeout(firstTimeoutSelector, timeoutSelector) |
immediate |
timeout(timeoutSelector, other) |
immediate |
timeout(timeout, timeUnit) |
computation |
timeout(firstTimeoutSelector, timeoutSelector, other) |
immediate |
timeout(timeout, timeUnit, other) |
computation |
timer |
computation |
timestamp |
immediate |
window(timespan) |
computation |
window(timespan, count) |
computation |
window(timespan, timeshift) |
computation |
了解更多Scheduler使用请参考:
https://github.com/mcxiaoke/RxDocs/blob/master/Scheduler.md
<script type="text/javascript">
$(function () {
$('pre.prettyprint code').each(function () {
var lines = $(this).text().split('\n').length;
var $numbering = $('<ul/>').addClass('pre-numbering').hide();
$(this).addClass('has-numbering').parent().append($numbering);
for (i = 1; i <= lines; i++) {
$numbering.append($('<li/>').text(i));
};
$numbering.fadeIn(1700);
});
});
</script>
分享到:
相关推荐
Interval操作符:用于创建Observable,跟TimerTask类似,用于周期性发送信息,是一个可以指定线程的TimerTask 首先添加类库 // RxAndroid compile 'io.reactivex:rxandroid:1.2.1' // RxJava compile 'io.reactivex...
.concatWith(Observable.interval(1, TimeUnit.SECONDS).take(5)) // 倒计时5秒 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Long aLong) throws ...
Observable.interval(0, 1, TimeUnit.SECONDS) // 开始立即并每隔1秒发射一次 .take(count_time + 1) // 设置总共发射的次数 .map(new Function, Long>() { @Override public Long apply(Long aLong) throws ...
在interval()方法中,我们使用了Observable.interval()方法来实现每隔若干毫秒后执行next操作。我们首先创建了一个Observable对象,使用interval()方法指定了执行next操作的时间间隔,然后使用observeOn()方法指定了...
例如,`just`用于创建一个发出单个值的`Observable`,`from`用于从数组、集合等创建`Observable`,`interval`则可以创建每隔一定时间发出值的`Observable`。 2. 发射事件:`Observable`可以发射三种基本事件:`....
Timer 操作符的_interval_用法 除了延迟执行某段逻辑外,Timer 操作符还可以用来实现间隔执行某段逻辑。例如,要间隔 1 秒执行某段逻辑,可以使用以下代码: ```java private Subscription subscribe; private ...
Observable.interval(2, TimeUnit.SECONDS), (a, b) -> a + " " + b) .take(5) .subscribe(System.out::println); ``` #### 四、实战应用案例 **4.1 实现网络请求** RxJava常用于实现网络请求,它可以简化...
- **作用**:Observable负责生成数据或事件,并将它们发送给Observer。 - **创建**:可以通过多种方式创建Observable,例如使用静态工厂方法`Observable.create()`。 ##### 2. **Observer(观察者)** - **定义**...
通过调用 Observable 的 subscribe 方法,观察者会与 Observable 的执行作用域绑定,并开始接收数据。subscribe 方法返回一个 Subscription 对象,该对象有一个 unsubscribe() 方法,当不再需要数据时,可以调用此...
4. **Schedulers.trampoline()**:立即执行任务,不排队。 通过使用 `subscribeOn()` 和 `observeOn()` 方法,我们可以轻松地控制任务何时何地执行。 #### RESTful API 与 RxJava 在实际应用中,RxJava 经常与 ...
这些Observable可以是简单的单值、数组、定时器(interval)或高阶Observable(由一个Observable的Observable组成,即流中每个元素本身也是一个流)。在组件销毁时,Vue-Rx会自动处理这些订阅的取消,保证了内存的...
可观察到的 ... 原料药 litObservable... interval ( 1000 ) ; const render = ( { count , isEven } ) => html ` < p> Count: ${ count } ( ${ isEven ? 'even' : 'odd' } ) ` ; const CountUpElement = litObs
例如,我们可以使用`Observable.interval()`或`Observable.timer()`来定时触发动画事件。下面是一个简单的使用RxJava实现补间动画的例子: ```java import io.reactivex.Observable; import io.reactivex....
2. 创建一个Observable,使用`Observable.interval()`方法来定时发布事件。例如,每隔2秒钟发布一次事件,表示切换到下一张图片。 3. 使用`.map()`操作符将时间间隔转换为实际的图片加载请求。你可以使用Picasso或...
本文实例为大家分享了Android实现轮询的方式,供大家参考,具体内容如下 1.通过rxjava实现(代码中使用了Lambda表达式) ... mDisposable = Observable.interval(DELAY, PERIOD, TimeUnit.MILLISECOND
- `interval`: 生成一个周期性发出值的序列。 - `merge`: 合并多个可观察序列。 ### 3. 转换操作符 转换操作符允许我们将数据从一种形式转换为另一种形式。例如: - `map`: 将每个元素应用一个函数并返回新的可...
- **定时任务**:使用Observable.interval()或Observable.timer()实现定时任务。 - **事件处理**:处理点击事件、滑动事件等UI事件,以响应式的方式响应用户交互。 - **线程调度**:使用Schedulers.io()和...
这可以通过`Observable.interval()`或`Observable.timer()`方法来实现。例如: ```java Observable<Long> timerObservable = Observable.timer(3, TimeUnit.SECONDS); ``` 这里的代码表示每隔3秒发出一个信号。 ...
在某些场景下,我们需要在特定时间后执行某个操作,`Observable.timer()`可以很好地解决这个问题: ```java Observable.timer(2, TimeUnit.SECONDS) .subscribe(call -> { // 延迟2秒后的操作 }); ``` 这里我们...