What
反应式编程是一种编程思想、编程方式,是为了简化并发编程而出现的。与传统的处理方式相比,它能够基于数据流中的事件进行反应处理。例如:a+b=c的场景,在传统编程方式下如果a、b发生变化,那么我们需要重新计算a+b来得到c的新值。而反应式编程中,我们不需要重新计算,a、b的变化事件会触发c的值自动更新。这种方式类似于我们在消息中间件中常见的发布/订阅模式。由流发布事件,而我们的代码逻辑作为订阅方基于事件进行处理,并且是异步处理的。
反应式编程中,最基本的处理单元是事件流(事件流是不可变的,对流进行操作只会返回新的流)中的事件。流中的事件包括正常事件(对象代表的数据、数据流结束标识)和异常事件(异常对象,例如Exception)。同时,只有当订阅者第一次发布者,发布者发布的事件流才会被消费,后续的订阅者只能从订阅点开始消费,但是我们可以通过背压、流控等方式控制消费。
常用的反应式编程实现类库包括:Reactor、RxJava 2,、Akka Streams、Vert.x以及Ratpack。本文基于Reactor(由于Reactor有Spring背书,同时反应式编程已经集成于Java 9)。
反应式编程与Java8提供的Streams有众多相似之处(尤其是API上),且提供了相互转化的API。但是反应式编程更加强调异步非阻塞,通过onComplete等注册监听的方式避免阻塞,同时支持delay、interval等特性。而Streams本质上是对集合的并行处理,并不是非阻塞的。
Why
反应式编程的核心是基于事件流、无阻塞、异步的,使用反应式编程不需要编写底层的并发、并行代码。并且由于其声明式编写代码的方式,使得异步代码易读且易维护。
How
基本概念
- Flux,是Reactor中的一种发布者,包含0到N个元素的异步序列。通过其提供的操作可以生成、转换、编排序列。如果不触发异常事件,Flux是无限的。
- Mono,是Reactor中的一种发布者,包含0或者1个的异步序列。可以用于类似于Runnable的场景。
- 背压(backpressure),由订阅者声明的、限定本消费者可处理的流中的元素个数。
操作
所有的流都是不可变的,所以对流的操作都会返回一个新的流。
创建
- just,根据参数创建数据流
- never,创建一个不会发出任何数据的无限运行的数据流
- empty,创建一个不包含任何数据的数据流,不会无限运行。
- error,创建一个订阅后立刻返回异常的数据流
- concact,从多个Mono创建Flux
- generate,同步、逐一的创建复杂流。重载方法支持生成状态。在方法内部的lambda中通过调用next和complete、error来指定当前循环返回的流中的元素(并不是return)。
- create,支持同步、异步、批量的生成流中的元素。
- zip,将多个流合并为一个流,流中的元素一一对应
- delay,Mono方法,用于指定流中的第一个元素产生的延迟时间
- interval,Flux方法,用于指定流中各个元素产生时间的间隔(包括第一个元素产生时间的延迟),从0开始的Long对象组成的流
- justOrEmpty,Mono方法,用于指定当初始化时的值为null时返回空的流
- defaultIfEmpty,Mono方法,用于指定当流中元素为空时产生的默认值
- range,生成一个范围的Integer队列
转化
- map,将流中的数据按照逻辑逐个映射为一个新的数据,当流是通过zip创建时,有一个元组入参,元组内元素代表zip前的各个流中的元素。
- flatMap,将流中的数据按照逻辑逐个映射一个新的流,新的流之间是异步的。
- take,从流中获取N个元素,有多个扩展方法。
- zipMap,将当前流和另一个流合并为一个流,两个流中的元素一一对应。
- mergeWith,将当前流和另一个流合并为一个流,两个流中的元素按照生成顺序合并,无对应关系。
- join,将当前流和另一个流合并为一个流,流中的元素不是一一对应的关系,而是根据产生时间进行合并。
- concactWith,将当前流和另一个流按声明顺序(不是元素的生成时间)链接在一起,保证第一个流消费完后再消费第二流
- zipWith,将当前流和另一个流合并为一个新的流,这个流可以通过lambda表达式设定合并逻辑,并且流中元素一一对应
- first,对于Mono返回多个流中,第一个产生元素的Mono。对于Flux,返回多个Flux流中第一个产生元素的Flux。
- block,Mono和Flux中类似的方法,用于阻塞当前线程直到流中生成元素
- toIterable,Flux方法,将Flux生成的元素返回一个迭代器
- defer,Flux方法,用于从一个Lambda表达式获取结果来生成Flux,这个Lambda一般是线程阻塞的
- buffer相关方法,用于将流中的元素按照时间、逻辑规则分组为多个元素集合,并且这些元素集合组成一个元素类型为集合的新流。
- window,与buffer类似,但是window返回的流中元素类型还是流,而不是buffer的集合。
- filter,顾名思义,返回负责规则的元素组成的新流
- reduce,用于将流中的各个元素与初始值(可以设置)逐一累积,最终得到一个Mono。
其他
- doOnXXX,当流发生XXX时间时的回调方法,可以有多个,类似于监听。XXX包括Subscribe、Next、Complete、Error等。
- onErrorResume,设置流发生异常时返回的发布者,此方法的lambda是异常对象
- onErrorReturn,设置流发生异常时返回的元素,无法捕获异常
- then,返回Mono,跳过整个流的消费
- ignoreElements,忽略整个流中的元素
- subscribeOn,配合Scheduler使用,订阅时的线程模型。
- publisherOn,配合Scheduler使用,发布时的线程模型。
- retry,订阅者重试次数
测试
使用reactor-rest中的StepVerifier,来声明一组对事件流的期望,并最终由verify或verifyError/verifyComplete来测试。如果流中的数据触发时不符合期望则抛出AssertionError。
StepVerifier.create(stringFlux). expectNext("foo").expectNext("bar").expectComplete(); //断言 StepVerifier.create(userFlux).assertNext(user ->assertThat(user.getName()). isEqualTo("jpinkman")).verifyComplete(); //个数判断 StepVerifier.create(longFlux).expectNextCount(10).verifyComplete(); //虚拟实践判断 StepVerifier.withVirtualTime(() -> longFlux).expectSubscription(). expectNoEvent(Duration.ofHours(3)).thenAwait(Duration.ofHours(1)). expectNextCount(2).verifyComplete();
-
expectNext,判断下一个值
-
assertNext,使用Lambda表达式和断言来判断值
-
expectNextCount,流中数的个数
-
expectSubscription,开始订阅,配合withVirtualTime使用
-
expectNoEvent,期望指定时间内流中无数据产生
-
thenAwait,等待指定时间,这段时间订阅正常发生(和expectNoEvent的区别)
-
expectComplete,期望从流中获取完成信号
-
varifyComplete,从流中获取完成信号,并进行测试
-
thenRequet,用于背压测试,向流中请求处理的元素个数。使用thenRequest后必须要消费(expectNext或其他方式),才能继续请求
-
thenCancel,用于背压测试,退出、不再消费流中的元素
调试
启用调试:
Hooks.onOperatorDebug();
日志输出,输出流中处理日志到控制台或其他日志框架:
log()
检查点:
Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);
原理
Flux/Publisher架构:
不同的转化、构建操作会返回FluxOperator抽象类的不同实现类,而各个实现类都持有一或多个(具体的Flux不同)其前一步操作返回的Flux接口实现类的实例,保证最终subscribe操作时,能够统一调用。
Subscribe架构:Flux的subscribe方法会根据参数创建不同类型的Subscriber接口实现类的实例。同时,根据Subscriber和数据构建Subscription接口实现类的实例,并由Subscription接口实现类调用Subscription的request方法(request方法可以设置背压值),进而调用Subscriber的onnext、onComplete、onError方法。
相关推荐
课程下载——C#+WPF上位机开发课程(模块化与反应式编程)
标题“Reactive programming反应式编程介绍”暗示我们将探讨这一编程范式的基础概念,包括它的核心原则、优势以及如何在实际项目中应用。 描述中提到的链接指向了一个博客文章,尽管具体内容无法在这里提供,但通常...
反应式编程框架Flower的设计与实践 反应式编程框架Flower是一种消息驱动的反应式编程框架,旨在解决高并发程序的崩溃问题。该框架的设计与实践是基于Akka的Actor异步基础,实现了分布式异步微服务解决方案。 反应...
【反应式编程】使用常见的反应式操作
为什么需要函数式反应编程
反应式编程框架设计:如何使程序调用不阻塞等待,立即响应? 反应式编程是一种异步编程方案,旨在解决高并发情况下程序崩溃的问题。传统的 Web 应用程序运行期的线程特性是,每个用户请求分配一个线程进行处理,...
title: 10.0 第10章 理解反应式编程- 第3部分 反应式Spring- 第10章 理解反应式编程第10章 理解反应式编程本章内容:反应式编程概览Re
本文重点介绍的是一篇名为《下一代的反应式编程框架研究与实现》的毕业论文,该论文关注的是如何构建一个高效、灵活且适应性强的反应式编程框架来应对日益增长的用户需求和5G时代带来的挑战。反应式系统以其消息驱动...
10. **函数式反应式编程(FRP)**:FRP是一种编程范式,结合了函数式编程和反应式编程的概念。在JavaScript中,像RxJS这样的库提供了处理事件流和异步操作的函数式方式。 通过深入理解并实践这些函数式编程原则和...
"reactive-grpc" 是一个专为集成反应式编程(Reactive Programming)与 gRPC Java 设计的库。gRPC 是一个高性能、开源和通用的 RPC(远程过程调用)框架,基于 HTTP/2 协议设计,而反应式编程则是一种编程范式,它...
至于反应式编程,Swift社区倾向于采用单向数据流和组件化的思想,从而让开发者能从中学习并应用在自己的项目中。 在总结部分,傅若愚通过历史的视角探讨了函数式编程的发展,提到了图灵和邱奇两位计算机科学领域的...
响应式编程是一种编程范式,它依赖于异步数据流和变化的传播,以此来处理数据。与传统的命令式编程(如a=b+c)不同,响应式编程的特点是当数据源发生变化时,依赖于该数据源的所有对象都会自动更新。这类似于电子...
8. **反应式编程**:虽然不是Java 8的标准部分,但Reactor和Vavr等库为Java提供了反应式编程的支持,这是一种基于异步数据流和变换的编程范式,适合处理大量并发事件。 9. **函数式编程与传统编程模式的对比**:...
### 反应式编程简介 反应式编程是一种编程范式,它强调数据流和变化传播。在这种模式下,程序被视为由事件和数据流组成的,而不是一系列步骤。当数据发生变化时,程序会自动响应这些变化,无需手动更新。 ### `...
课程下载——C#+WPF上位机开发课程(模块化与反应式编程)课程
【作品名称】:基于 event loop 、事件流、反应式编程的高并发的异步非阻塞框架 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 ...
Reactive4JavaFlow是一个利用Java 9中的Flow API构建的反应式编程库,它与第四代ReactiveX设计思想相结合,旨在提供一个高效、灵活且易于使用的反应式编程框架。这个开源项目的目标是帮助开发者在Java环境中实现非...