`
herman_liu76
  • 浏览: 99661 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

RxJava的核心原理是不是这样的?

阅读更多
        最近看到RxJava技术,好奇就网上找了些关于RxJava的博文,但绝大部分文章都讲了其使用,可我对最基本的东西——概念还是一脸懵逼。

        这些文章看后,大概我知道两个重点,一是类似于观察者模式,二是任务执行与通知可以设置不同的线程。具体这个框架源码怎么做的就不知道了。看源码太累了,很多有名的产品其实最核心的原理很简单,demo也很好实现。比如dubbo,就是客户端把调用方法与参数发过去,服务端根据invocation找到对应的service并执行,再把结果从远程发过来;又比如druid,就是把sql包中的一切都适配一下,中间插入我要的filterChain,比如统计的filter。所以我就想先按自己的理解,写几行代码,做一个小Demo实现两个重点,请大家看看是不是这样的原理。

        核心业务:我作为观察者,注册到你这里,你就开始处理(我是参数)并把结果通知我,我接着处理。当然你处理和我处理都可以封成runnable扔到不同的线程池中。

        操作符:我作为观察者,操作符会造成你被一个新产生的中间人代理了。这时我注册到中间人,中间人开始处理(我是参数)并产生一个中间观察者注册到你这里,你处理后,结果给中间观察者处理,中间观察者处理后,再给我(参数)处理。

        话不多说,花一小会时间写的代码(不包含RxJava的操作符):

package com.rxjava.test;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
 
/**
 * 学习rxjava的原理
 * 
 * 【思考】: 如果作为框架,call函数功能也应该从外部提供,用内部类?用abstract方法?
 * 如果call可给其它线程执行用哪个方案?用内部类?
 * 
 * @author ACER
 * @date 2019年11月24日
 */
public class RxjavaTest {
	// 配置用其它线程执行时,订阅者的操作在这里执行
	private static ExecutorService ioExecutor;
	private static final AtomicLong threadIndex = new AtomicLong(0);// 线程计数
	private static int cupNum = Runtime.getRuntime().availableProcessors();// cpu数
 
	static {
		System.out.println("cpu:" + cupNum);
		ioExecutor = new ThreadPoolExecutor(cupNum, cupNum, 1000 * 60,
				TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						Thread thread = new Thread(r, "io_thread_"
								+ threadIndex.incrementAndGet());
						return thread;
					}
				});
	}
 
	public static void main(String[] args) {
		RxjavaTest test = new RxjavaTest();
		test.regListen(new Listenner() {
			@Override
			public void OnNext(int plusValue) {
				System.out.println("copy that!!!->" + plusValue);
			}
		});
		System.out.println("---------------【修改为不同的线程执行】------------------");
		test.useSameThread = false;
		test.regListen(new Listenner() {
			@Override
			public void OnNext(int plusValue) {
				System.out.println("copy that in anather thread!!!->"
						+ plusValue);
			}
		});
	}
 
	public boolean useSameThread = true;// 是否用同一线程
	public Listenner _listenner;// 注册进来的订阅者
 
	// 注册操作。同时就调用执行相关服务,执行过程中会通知订阅者。
	void regListen(Listenner li) {
		_listenner = li;
		// 有人注册了,开始工作
		if (_listenner != null) {
			call(_listenner);
		}
	}
 
	// 真正的服务过程(其实也可以给其它线程去运行)
	public void call(Listenner listenner) {
		int a = 4;
		int b = 6;
		if (useSameThread) {
			listenner.OnNext(a + b);
			b = 45;
			listenner.OnNext(a + b);
		} else {
			// 后面扔到线程池执行
			final int result = a + b + 100;
			ioExecutor.submit(new listenRun(listenner, result));
			final int result2 = a + b + 1000;
			ioExecutor.submit(new listenRun(listenner, result2));
		}
	}
 
	//配置其它线程时:订阅者执行用其它线程,产生的Runnable对象。
	public class listenRun implements Runnable {
		public Listenner lisInner;
		private int resultInner;
 
		public listenRun(Listenner a, int b) {
			lisInner = a;
			resultInner = b;
		}
 
		@Override
		public void run() {
			System.out
					.println("ThreadName:" + Thread.currentThread().getName());
			lisInner.OnNext(resultInner);
		}
	}
 
	// 订阅者应实现的接口
	public static interface Listenner {
		void OnNext(int plusValue);
	}
}

       这是执行结果:

cpu:4
copy that!!!->10
copy that!!!->49
---------------【修改为不同的线程执行】------------------
ThreadName:io_thread_1
ThreadName:io_thread_2
copy that in anather thread!!!->110
copy that in anather thread!!!->1010


        当然上面的代码如果把call过程也配置一下,放入另外的线程池就更好了。最后再改造成一个框架,从外部传入call的操作就更完善了。

        欢迎大家指点一下,谢谢!

注:刚看到这个文章,写的不错,推荐。https://blog.csdn.net/TellH/article/details/71534704
0
1
分享到:
评论

相关推荐

    L18- RxJava 的原理完全解析-讲义.pdf

    在深入理解RxJava的原理之前,我们需要先了解两个核心概念:`Observable`(可观察者)和`Observer`(观察者)。 1. `Observable`(可观察者) `Observable`是RxJava中的关键组件,它代表了一个可以发出数据的源。它...

    Rxjava Android 开发视频

    #### 二、RxJava的核心概念 - **Observables(可观察者)**:表示一个或多个值的序列,或者是错误通知。 - **Observers(观察者)**:是接收来自Observable的数据的对象。 - **Operators(操作符)**:用于处理数据...

    超详细的Rxjava文档(包括Rxjava1,Rxjava2,中文官方文档,实战实例,博客详文等)

    RxJava的核心概念是Observable(可观察者)和Observer(观察者)。Observable能够发出数据流,而Observer则订阅这些数据流并处理它们。这种设计模式使得开发者能够处理异步事件和复杂的并发情况,同时保持代码的简洁...

    Rxjava的Mvp架构附带Rxjava中文文档

    RxJava的核心概念包括Observable(可观测序列)、Observer(观察者)和Operators(操作符)。Observable能够发布数据流,而Observer则订阅这些数据流并处理发布的数据。Operators是一系列可以对数据流进行转换、过滤...

    RxJava的消息发送和线程切换实现原理

    RxJava的消息发送和线程切换实现原理 RxJava是一款功能强大且广泛应用的响应式扩展库,它提供了一个简洁的方式来处理异步编程和事件驱动编程。RxJava的核心概念是Observable和Observer,前者是被观察者,后者是观察...

    RxJava响应式编程原理

    RxJava的核心特性在于它提供了一套丰富的操作符,这些操作符允许开发人员以声明式的方式组合异步任务,从而避免了回调地狱的问题。例如,`concatMap`、`flatMap`等操作符可以将多个异步任务串联起来,形成一个连续的...

    RxJava+Retrofit简单Demo

    其核心概念包括`Observable`(可观察者)、`Observer`(观察者)、各种操作符(如map、filter、concatMap等)以及线程控制策略。 **Retrofit** Retrofit是Square公司开发的一个针对网络请求的Java库,它允许开发者通过...

    RxJava入门学习资料-RxJava Essentials(英/中文版合集)

    总的来说,《RxJava Essentials》是一本全面的教程,可以帮助读者掌握 RxJava 的基本原理和实用技巧,从而更好地利用响应式编程提升开发效率和代码质量。通过深入学习这本书,你将能够熟练地运用 RxJava 解决实际...

    RxJava 完全解析.pdf

    - **HyObservable 类的设计思路**: 上文提到的自定义`HyObservable`类提供了一个简化的模型来模拟RxJava的工作原理。通过这种方式,我们可以更好地理解RxJava如何通过创建`Observable`、`Observer`和`subscribe`方法...

    适配Retrofit RxJava3版本的CallAdapter .zip

    CallAdapter是Retrofit的核心组件之一,它负责将Retrofit的Call对象转换为观察者可以订阅的类型,例如Observable或Flowable,这样我们就可以利用RxJava的特性进行链式操作和错误处理。 适配RxJava3的CallAdapter...

    Learning RxJava

    通过这些实践,读者不仅可以加深对RxJava原理的理解,还能学会如何有效地利用RxJava解决实际问题。 #### 并发控制 RxJava支持多线程执行,这对于处理高并发场景非常有用。通过使用不同的调度器(如`Schedulers.io()...

    android手写最简单rxjava

    首先,RxJava的核心概念是 Observable(可观察者) 和 Observer(观察者)。Observable 是数据的生产者,Observer 是数据的消费者。在Android应用中,这通常用于处理用户交互、网络请求或后台任务等异步事件。 描述...

    RxJavaRetrofitAdapter,用于改装2的RXJava 3适配器.zip

    本文将深入探讨RxJava 2到3的主要变化、适配器的工作原理以及如何在项目中应用这个适配器。 首先,让我们看看RXJava 2和RXJava 3之间的关键差异。RXJava 3主要引入了对Java 8及更高版本的支持,同时也进行了一些API...

    RxJava1.1.1.jar

    RxJava的核心组件包括Observable、Observer、Subscription和Operator。Observable是数据的生产者,它可以发布一系列的事件或数据;Observer是数据的消费者,它订阅Observable并接收发布的数据;Subscription是订阅...

    无损压缩可以结合RXjava使用

    这里我们将详细探讨无损压缩的原理、RXJava的优势以及如何将两者结合。 无损压缩是一种数据压缩方法,它在压缩和解压缩过程中不会丢失任何原始信息。相比于有损压缩,无损压缩适合那些需要保持原始数据完整性的场景...

    Retrofit_RxJava封住的网络框架

    **RxJava的原理与应用** 1. **Observables(被观察者)**:RxJava的核心是Observable,它能够生成、转换或组合数据流。在Retrofit中,通过`Call.execute().toObservable()`可以将Retrofit的网络请求转化为...

    rxjava-groovy-0.7.0.zip

    通过研究这个项目,我们可以深入理解RxJava在Groovy环境下的工作原理,学习如何优雅地处理异步流程,以及如何利用Groovy的特性来优化代码。这对于提升我们的并发编程能力和理解反应式编程模型都大有裨益。对于想要在...

    rxjava学习代码

    对于初学者来说,这样的实践和注解是非常有价值的,它们可以帮助你从理论到实践,深入理解RxJava的工作原理。 总的来说,这个"rxjava学习代码"压缩包是你提升 RxJava 技能的理想起点。通过阅读和运行这些代码,你...

    Rxjava+Retrofit的简单使用Demo

    RxJava的核心概念是Observable(可观察者)和Observer(观察者)。Observable负责发出数据,Observer负责订阅和接收这些数据。通过一系列的操作符(如map、filter、concatMap等),我们可以对数据流进行转换、过滤和...

Global site tag (gtag.js) - Google Analytics