最近看到RxJava技术,好奇就网上找了些关于RxJava的博文,但绝大部分文章都讲了其使用,可我对最基本的东西——概念还是一脸懵逼。
这些文章看后,大概我知道两个重点,一是类似于观察者模式,二是任务执行与通知可以设置不同的线程。具体这个框架源码怎么做的就不知道了。看源码太累了,很多有名的产品其实最核心的原理很简单,demo也很好实现。比如dubbo,就是客户端把调用方法与参数发过去,服务端根据invocation找到对应的service并执行,再把结果从远程发过来;又比如druid,就是把sql包中的一切都适配一下,中间插入我要的filterChain,比如统计的filter。所以我就想先按自己的理解,写几行代码,做一个小Demo实现两个重点,请大家看看是不是这样的原理。
核心业务:我作为观察者,注册到你这里,你就开始处理(我是参数)并把结果通知我,我接着处理。当然你处理和我处理都可以封成runnable扔到不同的线程池中。
操作符:我作为观察者,操作符会造成你被一个新产生的中间人代理了。这时我注册到中间人,中间人开始处理(我是参数)并产生一个中间观察者注册到你这里,你处理后,结果给中间观察者处理,中间观察者处理后,再给我(参数)处理。
话不多说,花一小会时间写的代码(不包含RxJava的操作符):
package com.rxjava.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
/**
* 学习rxjava的原理
*
* 【思考】: 如果作为框架,call函数功能也应该从外部提供,用内部类?用abstract方法?
* 如果call可给其它线程执行用哪个方案?用内部类?
*
* @author ACER
* @date 2019年11月24日
*/
public class RxjavaTest {
// 配置用其它线程执行时,订阅者的操作在这里执行
private static ExecutorService ioExecutor;
private static final AtomicLong threadIndex = new AtomicLong(0);// 线程计数
private static int cupNum = Runtime.getRuntime().availableProcessors();// cpu数
static {
System.out.println("cpu:" + cupNum);
ioExecutor = new ThreadPoolExecutor(cupNum, cupNum, 1000 * 60,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "io_thread_"
+ threadIndex.incrementAndGet());
return thread;
}
});
}
public static void main(String[] args) {
RxjavaTest test = new RxjavaTest();
test.regListen(new Listenner() {
@Override
public void OnNext(int plusValue) {
System.out.println("copy that!!!->" + plusValue);
}
});
System.out.println("---------------【修改为不同的线程执行】------------------");
test.useSameThread = false;
test.regListen(new Listenner() {
@Override
public void OnNext(int plusValue) {
System.out.println("copy that in anather thread!!!->"
+ plusValue);
}
});
}
public boolean useSameThread = true;// 是否用同一线程
public Listenner _listenner;// 注册进来的订阅者
// 注册操作。同时就调用执行相关服务,执行过程中会通知订阅者。
void regListen(Listenner li) {
_listenner = li;
// 有人注册了,开始工作
if (_listenner != null) {
call(_listenner);
}
}
// 真正的服务过程(其实也可以给其它线程去运行)
public void call(Listenner listenner) {
int a = 4;
int b = 6;
if (useSameThread) {
listenner.OnNext(a + b);
b = 45;
listenner.OnNext(a + b);
} else {
// 后面扔到线程池执行
final int result = a + b + 100;
ioExecutor.submit(new listenRun(listenner, result));
final int result2 = a + b + 1000;
ioExecutor.submit(new listenRun(listenner, result2));
}
}
//配置其它线程时:订阅者执行用其它线程,产生的Runnable对象。
public class listenRun implements Runnable {
public Listenner lisInner;
private int resultInner;
public listenRun(Listenner a, int b) {
lisInner = a;
resultInner = b;
}
@Override
public void run() {
System.out
.println("ThreadName:" + Thread.currentThread().getName());
lisInner.OnNext(resultInner);
}
}
// 订阅者应实现的接口
public static interface Listenner {
void OnNext(int plusValue);
}
}
这是执行结果:
cpu:4
copy that!!!->10
copy that!!!->49
---------------【修改为不同的线程执行】------------------
ThreadName:io_thread_1
ThreadName:io_thread_2
copy that in anather thread!!!->110
copy that in anather thread!!!->1010
当然上面的代码如果把call过程也配置一下,放入另外的线程池就更好了。最后再改造成一个框架,从外部传入call的操作就更完善了。
欢迎大家指点一下,谢谢!
注:刚看到这个文章,写的不错,推荐。https://blog.csdn.net/TellH/article/details/71534704
分享到:
相关推荐
在深入理解RxJava的原理之前,我们需要先了解两个核心概念:`Observable`(可观察者)和`Observer`(观察者)。 1. `Observable`(可观察者) `Observable`是RxJava中的关键组件,它代表了一个可以发出数据的源。它...
#### 二、RxJava的核心概念 - **Observables(可观察者)**:表示一个或多个值的序列,或者是错误通知。 - **Observers(观察者)**:是接收来自Observable的数据的对象。 - **Operators(操作符)**:用于处理数据...
RxJava的核心概念是Observable(可观察者)和Observer(观察者)。Observable能够发出数据流,而Observer则订阅这些数据流并处理它们。这种设计模式使得开发者能够处理异步事件和复杂的并发情况,同时保持代码的简洁...
RxJava的核心概念包括Observable(可观测序列)、Observer(观察者)和Operators(操作符)。Observable能够发布数据流,而Observer则订阅这些数据流并处理发布的数据。Operators是一系列可以对数据流进行转换、过滤...
RxJava的消息发送和线程切换实现原理 RxJava是一款功能强大且广泛应用的响应式扩展库,它提供了一个简洁的方式来处理异步编程和事件驱动编程。RxJava的核心概念是Observable和Observer,前者是被观察者,后者是观察...
RxJava的核心特性在于它提供了一套丰富的操作符,这些操作符允许开发人员以声明式的方式组合异步任务,从而避免了回调地狱的问题。例如,`concatMap`、`flatMap`等操作符可以将多个异步任务串联起来,形成一个连续的...
其核心概念包括`Observable`(可观察者)、`Observer`(观察者)、各种操作符(如map、filter、concatMap等)以及线程控制策略。 **Retrofit** Retrofit是Square公司开发的一个针对网络请求的Java库,它允许开发者通过...
总的来说,《RxJava Essentials》是一本全面的教程,可以帮助读者掌握 RxJava 的基本原理和实用技巧,从而更好地利用响应式编程提升开发效率和代码质量。通过深入学习这本书,你将能够熟练地运用 RxJava 解决实际...
- **HyObservable 类的设计思路**: 上文提到的自定义`HyObservable`类提供了一个简化的模型来模拟RxJava的工作原理。通过这种方式,我们可以更好地理解RxJava如何通过创建`Observable`、`Observer`和`subscribe`方法...
CallAdapter是Retrofit的核心组件之一,它负责将Retrofit的Call对象转换为观察者可以订阅的类型,例如Observable或Flowable,这样我们就可以利用RxJava的特性进行链式操作和错误处理。 适配RxJava3的CallAdapter...
通过这些实践,读者不仅可以加深对RxJava原理的理解,还能学会如何有效地利用RxJava解决实际问题。 #### 并发控制 RxJava支持多线程执行,这对于处理高并发场景非常有用。通过使用不同的调度器(如`Schedulers.io()...
首先,RxJava的核心概念是 Observable(可观察者) 和 Observer(观察者)。Observable 是数据的生产者,Observer 是数据的消费者。在Android应用中,这通常用于处理用户交互、网络请求或后台任务等异步事件。 描述...
本文将深入探讨RxJava 2到3的主要变化、适配器的工作原理以及如何在项目中应用这个适配器。 首先,让我们看看RXJava 2和RXJava 3之间的关键差异。RXJava 3主要引入了对Java 8及更高版本的支持,同时也进行了一些API...
RxJava的核心组件包括Observable、Observer、Subscription和Operator。Observable是数据的生产者,它可以发布一系列的事件或数据;Observer是数据的消费者,它订阅Observable并接收发布的数据;Subscription是订阅...
这里我们将详细探讨无损压缩的原理、RXJava的优势以及如何将两者结合。 无损压缩是一种数据压缩方法,它在压缩和解压缩过程中不会丢失任何原始信息。相比于有损压缩,无损压缩适合那些需要保持原始数据完整性的场景...
**RxJava的原理与应用** 1. **Observables(被观察者)**:RxJava的核心是Observable,它能够生成、转换或组合数据流。在Retrofit中,通过`Call.execute().toObservable()`可以将Retrofit的网络请求转化为...
通过研究这个项目,我们可以深入理解RxJava在Groovy环境下的工作原理,学习如何优雅地处理异步流程,以及如何利用Groovy的特性来优化代码。这对于提升我们的并发编程能力和理解反应式编程模型都大有裨益。对于想要在...
对于初学者来说,这样的实践和注解是非常有价值的,它们可以帮助你从理论到实践,深入理解RxJava的工作原理。 总的来说,这个"rxjava学习代码"压缩包是你提升 RxJava 技能的理想起点。通过阅读和运行这些代码,你...
RxJava的核心概念是Observable(可观察者)和Observer(观察者)。Observable负责发出数据,Observer负责订阅和接收这些数据。通过一系列的操作符(如map、filter、concatMap等),我们可以对数据流进行转换、过滤和...