1.原文地址:https://zhuanlan.zhihu.com/p/61819803
Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream) 本身内容很多,而且它还有很多外部的依赖,想要熟悉 SCS,必须要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来文章将从以下几点跟大家进行介绍:
- 什么是 Spring Messaging;
- 什么是 Spring Integration;
- 什么是 SCS及其功能;
Spring Messaging
Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。
- 比如消息
Messaging
对应的模型就包括一个消息体 Payload 和消息头 Header:
package org.springframework.messaging;
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
- 消息通道
MessageChannel
用于接收消息,调用send
方法可以将消息发送至该消息通道中 :
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}
消息通道里的消息如何被消费呢?
- 由消息通道的子接口可订阅的消息通道
SubscribableChannel
实现,被MessageHandler
消息处理器所订阅:
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
- 由
MessageHandler
真正地消费/处理消息:
@FunctionalInterface
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessagingException;
}
Spring Messaging 内部在消息模型的基础上衍生出了其它的一些功能,如:
- 消息接收参数及返回值处理:消息接收参数处理器
HandlerMethodArgumentResolver
配合@Header
,@Payload
等注解使用;消息接收后的返回值处理器HandlerMethodReturnValueHandler
配合@SendTo
注解使用; - 消息体内容转换器
MessageConverter
; - 统一抽象的消息发送模板
AbstractMessageSendingTemplate
; - 消息通道拦截器
ChannelInterceptor
;
Spring Integration
Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。它提出了不少新的概念,包括消息的路由 MessageRoute
、消息的分发 MessageDispatcher
、消息的过滤 Filter
、消息的转换Transformer
、消息的聚合 Aggregator
、消息的分割 Splitter
等等。同时还提供了MessageChannel
和MessageHandler
的实现,分别包括DirectChannel
、ExecutorChannel
、PublishSubscribeChannel
和MessageFilter
、ServiceActivatingHandler
、MethodInvokingSplitter
等内容。
首先为大家介绍几种消息的处理方式:
- 消息的分割:
- 消息的聚合:
- 消息的过滤:
- 消息的分发:
接下来,我们以一个最简单的例子来尝试一下 Spring Integration:
SubscribableChannel messageChannel = new DirectChannel(); // 1
messageChannel.subscribe(msg -> { // 2
System.out.println("receive: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3
- 构造一个可订阅的消息通道
messageChannel
; - 使用
MessageHandler
去消费这个消息通道里的消息; - 发送一条消息到这个消息通道,消息最终被消息通道里的
MessageHandler
所消费,最后控制台打印出:receive: msg from alibaba
;
DirectChannel
内部有个 UnicastingDispatcher
类型的消息分发器,会分发到对应的消息通道MessageChannel
中,从名字也可以看出来,UnicastingDispatcher
是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy
负载均衡策略,默认只有轮询的实现,可以进行扩展。
我们对上段代码做一点修改,使用多个 MessageHandler
去处理消息:
SubscribableChannel messageChannel = new DirectChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
由于 DirectChannel
内部的消息分发器是 UnicastingDispatcher
单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个 MessageHandler
。控制台打印出:
receive1: msg from alibaba
receive2: msg from alibaba
既然存在单播的消息分发器 UnicastingDispatcher
,必然也会存在广播的消息分发器,那就是BroadcastingDispatcher
,它被 PublishSubscribeChannel
这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler
:
SubscribableChannel messageChannel = new PublishSubscribeChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
发送两个消息,都被所有的 MessageHandler
所消费。控制台打印:
receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba
Spring Cloud Stream
SCS与各模块之间的关系是:
- SCS 在 Spring Integration 的基础上进行了封装,提出了
Binder
,Binding
,@EnableBinding
,@StreamListener
等概念; - SCS 与 Spring Boot Actuator 整合,提供了
/bindings
,/channels
endpoint; - SCS 与 Spring Boot Externalized Configuration 整合,提供了
BindingProperties
,BinderProperties
等外部化配置类; - SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。
- SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。
Binder
是提供与外部消息中间件集成的组件,为构造 Binding
提供了 2 个方法,分别是bindConsumer
和 bindProducer
,它们分别用于构造生产者和消费者。目前官方的实现有Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder。
从图中可以看出,Binding
是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。我们来看一个最简单的使用 RocketMQ Binder 的例子,然后分析一下它的底层处理原理:
- 启动类及消息的发送:
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // 1
public class SendAndReceiveApplication {
public static void main(String[] args) {
SpringApplication.run(SendAndReceiveApplication.class, args);
}
@Bean // 2
public CustomRunner customRunner() {
return new CustomRunner();
}
public static class CustomRunner implements CommandLineRunner {
@Autowired
private Source source;
@Override
public void run(String... args) throws Exception {
int count = 5;
for (int index = 1; index <= count; index++) {
source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3
}
}
}
}
- 消息的接收:
@Service
public class StreamListenerReceiveService {
@StreamListener(Sink.INPUT) // 4
public void receiveByStreamListener1(String receiveMsg) {
System.out.println("receiveByStreamListener: " + receiveMsg);
}
}
这段代码很简单,没有涉及到 RocketMQ 相关的代码,消息的发送和接收都是基于 SCS 体系完成的。如果想切换成 RabbitMQ 或 kafka,只需修改配置文件即可,代码无需修改。
我们分析这段代码的原理:
-
@EnableBinding
对应的两个接口属性Source
和Sink
是 SCS 内部提供的。SCS 内部会基于Source
和Sink
构造BindableProxyFactory
,且对应的 output 和 input 方法返回的 MessageChannel 是DirectChannel
。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
配置文件里 bindings 的 name 为 output 和 input,对应 Source
和 Sink
接口的方法上的注解里的 value:
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
- 构造
CommandLineRunner
,程序启动的时候会执行CustomRunner
的run
方法。 - 调用
Source
接口里的 output 方法获取DirectChannel
,并发送消息到这个消息通道中。这里跟之前 Spring Integration 章节里的代码一致。
- Source 里的 output 发送消息到
DirectChannel
消息通道之后会被AbstractMessageChannelBinder#SendingHandler
这个MessageHandler
处理,然后它会委托给AbstractMessageChannelBinder#createProducerMessageHandler
创建的 MessageHandler 处理(该方法由不同的消息中间件实现); - 不同的消息中间件对应的
AbstractMessageChannelBinder#createProducerMessageHandler
方法返回的 MessageHandler 内部会把 Spring Message 转换成对应中间件的 Message 模型并发送到对应中间件的 broker;
- 使用
@StreamListener
进行消息的订阅。请注意,注解里的Sink.input
对应的值是 "input",会根据配置文件里 binding 对应的 name 为 input 的值进行配置:
- 不同的消息中间件对应的
AbstractMessageChannelBinder#createConsumerEndpoint
方法会使用 Consumer 订阅消息,订阅到消息后内部会把中间件对应的 Message 模型转换成 Spring Message; - 消息转换之后会把 Spring Message 发送至 name 为 input 的消息通道中;
-
@StreamListener
对应的StreamListenerMessageHandler
订阅了 name 为 input 的消息通道,进行了消息的消费;
这个过程文字描述有点啰嗦,用一张图总结一下(黄色部分涉及到各消息中间件的 Binder 实现以及 MQ 基本的订阅发布功能):
SCS 章节的最后,我们来看一段 SCS 关于消息的处理方式的一段代码:
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message msg) {
System.out.println("receive by headers['index']=='1': " + msg);
}
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
System.out.println("receive Person: " + person);
}
@StreamListener(value = Sink.INPUT)
public void receiveAllMsg(String msg) {
System.out.println("receive allMsg by StreamListener. content: " + msg);
}
@StreamListener(value = Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message msg) {
System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}
有没有发现这段代码跟 Spring MVC Controller 中接收请求的代码很像? 实际上他们的架构都是类似的,Spring MVC 对于 Controller 中参数和返回值的处理类分别是org.springframework.web.method.support.HandlerMethodArgumentResolver
、org.springframework.web.method.support.HandlerMethodReturnValueHandler
。
Spring Messaging 中对于参数和返回值的处理类之前也提到过,分别是org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver
、org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler
。
它们的类名一模一样,甚至内部的方法名也一样。
总结
上图是 SCS 体系相关类说明的总结,关于 SCS 以及 RocketMQ Binder 更多相关的示例,可以参考 RocketMQ Binder Demos,包含了消息的聚合、分割、过滤;消息异常处理;消息标签、sql过滤;同步、异步消费等等。
相关推荐
Spring Cloud Stream 体系及原理.pdf Spring Cloud Stream 体系及原理.pdf Spring Cloud Stream 体系及原理.pdf Spring Cloud Stream 体系及原理.pdf Spring Cloud Stream 体系及原理.pdf Spring Cloud ...
让我们深入探讨Spring Cloud Stream的体系结构、核心概念以及工作原理。 1. **体系架构** Spring Cloud Stream 基于发布/订阅模型,它定义了三个核心组件:Binder、Source、Sink。Binder 是连接消息中间件和应用的...
介绍Spring Cloud Stream与RabbitMQ集成的代码示例。Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。
** RocketMQ + Spring Cloud Stream 环境搭建详解 ** RocketMQ 和 Spring Cloud Stream 的结合使用,旨在构建一个高效、可扩展的消息驱动微服务架构。RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它提供了高...
**SpringCloudStream与RabbitMQ整合详解** SpringCloudStream是一个框架,它允许应用程序以声明式方式定义输入和输出绑定,从而简化与消息中间件的集成。在这个场景中,我们将讨论如何将SpringCloudStream与...
本文将深入探讨SpringCloud Stream的核心概念、工作原理以及如何在实际项目中进行应用。 一、SpringCloud Stream概述 SpringCloud Stream是一个用于构建消息驱动微服务的框架,它基于Spring Boot,提供了轻量级的...
标题 "spring-cloud-stream结合kafka dome" 描述的是一个关于如何使用Spring Cloud Stream与Apache Kafka进行集成并创建示例应用的教程。在这个场景中,我们首先启动两个消费者(标记为kafka和kafka1),然后启动名...
Spring Cloud Stream则关注消息驱动的应用程序,它提供了一种声明式的方式来消费和生产消息,适用于事件驱动的微服务架构。 在实际开发中,Spring Cloud Data Flow是用于数据流和任务管理的工具,它可以管理和部署...
在描述中没有提供具体信息,因此我们将基于标签 "springcloudstream" 进行详细的知识点讲解。 **Spring Cloud Stream** Spring Cloud Stream 是一个轻量级的框架,它允许开发者轻松地创建消息处理应用程序。它提供...
Spring Cloud Stream 和 Apache Kafka 的集成是构建分布式系统中消息驱动架构的一个重要实践。Spring Cloud Stream 是 Spring 生态系统中的一个模块,它为构建输入/输出绑定到消息中间件的微服务应用提供了抽象层,...
springcloud生产者与消费者项目实战案例 Spring Cloud 中断路器 Circuit Breaker的应用 配置 Spring Cloud Config Server Spring Cloud Config使用Oracle数据库作为后端配置存储 Spring Cloud Config + Spring Cloud...
Spring Cloud系列教程 Spring Boot Spring Cloud Stream 和 Kafka案例教程 SpringCloud系列教程、SpringBoot、 Stream、Kafka、案例教程
一篇很好的springCloud学习的思维导读,详细的介绍了,springCloud的搭建步骤以及各组件的说明讲解 涵盖 Eureka服务注册与发现 Zookeeper服务注册与发现 Consul服务注册与发现 Ribbon负载均衡服务调用 OpenFeign...
1. **Spring Cloud Stream介绍** Spring Cloud Stream提供了一个编程模型,允许开发者定义输入和输出绑定,将应用程序与消息代理解耦。它支持多种消息中间件,包括RabbitMQ、Kafka和Amazon Kinesis等。通过声明式的...
Spring Cloud Stream 是一个用于构建可复用的、松耦合微服务的框架,它提供了一种声明式方式来处理消息的输入和输出。RabbitMQ 是一个流行的消息代理和队列服务器,常被用于分布式系统中的异步任务处理和解耦组件。...
【尚硅谷周阳老师SpringCloud笔记】是一份深入学习SpringCloud技术体系的教程资源,由知名教育机构尚硅谷的周阳老师编撰。SpringCloud作为微服务架构的重要框架,广泛应用于现代企业的分布式系统开发中。这份笔记...
spring cloud整合MQTT简单示例,分为三个项目eureka-server、service-hi、service-ribbon,MQTT环境需要自己提前搭好,我这搭的环境是apache-apollo-1.7.1
这些视频课程结合源码分析和课件学习,可以帮助开发者深入理解SpringBoot和SpringCloud的核心原理,并掌握实际应用中的最佳实践。对于想要在微服务领域深入发展的IT专业人士来说,这是一个非常有价值的资源。通过...
- **介绍 Spring Cloud Stream**:Spring Cloud Stream 是一个建立在轻量级消息传递之上的构建块,它提供了一种简单的模型来构建消息驱动的微服务应用程序。 - **主要概念**:Spring Cloud Stream 包括了一些核心...