`
m635674608
  • 浏览: 5052935 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

使用 Spring Cloud Stream 构建消息驱动微服务

 
阅读更多

微服务的目的: 松耦合

事件驱动的优势:高度解耦

Spring Cloud Stream 的几个概念

Spring Cloud Stream is a framework for building message-driven microservice applications.

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。


Spring Cloud Stream Application

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式

Binder

Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。目前 Spring Cloud Stream 实现了 Kafka 和 Rabbit MQ 的binder。

通过 binder ,可以很方便的连接中间件,可以动态的改变消息的
destinations(对应于 Kafka 的topic,Rabbit MQ 的 exchanges),这些都可以通过外部配置项来做到。

甚至可以任意的改变中间件的类型而不需要修改一行代码。

Publish-Subscribe

消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式。Spring Cloud Stream 的数据交互也是基于这个思想。生产者把消息通过某个 topic 广播出去(Spring Cloud Stream 中的 destinations)。其他的微服务,通过订阅特定 topic 来获取广播出来的消息来触发业务的进行。

这种模式,极大的降低了生产者与消费者之间的耦合。即使有新的应用的引入,也不需要破坏当前系统的整体结构。

Consumer Groups

“Group”,如果使用过 Kafka 的童鞋并不会陌生。Spring Cloud Stream 的这个分组概念的意思基本和 Kafka 一致。

微服务中动态的缩放同一个应用的数量以此来达到更高的处理能力是非常必须的。对于这种情况,同一个事件防止被重复消费,只要把这些应用放置于同一个 “group” 中,就能够保证消息只会被其中一个应用消费一次。

Durability

消息事件的持久化是必不可少的。Spring Cloud Stream 可以动态的选择一个消息队列是持久化,还是 present。

Bindings

bindings 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binding 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。

基于 RabbitMQ 使用

以下内容源码: spring cloud demo

消息接收

Spring Cloud Stream 基本用法,需要定义一个接口,如下是内置的一个接口。

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。

这就接口声明了一个 binding 命名为 “input” 。

其他内容通过配置指定:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: mqTestDefault

destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 mqTestDefault

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("一般监听收到:" + message.getPayload());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 @StreamListener(Processor.INPUT) ,方法参数为 Message 。

启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “mqTestDefault”,routing key为 “#”。

所有发送 exchange 为“mqTestDefault” 的MQ消息都会被投递到这个临时队列,并且触发上述的方法。

以上代码就完成了最基本的消费者部分。

消息发送

消息的发送同消息的接受,都需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

这就接口声明了一个 binding 命名为 “output” ,不同于上述的 “input”,这个binding 声明了一个消息输出流,也就是消息的生产者。

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: mqTestDefault
          contentType: text/plain

contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs

destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 mqTestDefault 的所有消息队列中。

代码中调用:

@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {

    @Autowired
    @Qualifier("output")
    MessageChannel output;

    @Override
    public void run(String... strings) throws Exception {
        // 字符串类型发送MQ
        System.out.println("字符串信息发送");
        output.send(MessageBuilder.withPayload("大家好").build());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

通过注入MessageChannel的方式,发送消息。

通过注入Source 接口的方式,发送消息。 具体可以查看样例

以上代码就完成了最基本的生产者部分。

自定义消息发送接收

自定义接口

Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。

interface OrderProcessor {

    String INPUT_ORDER = "inputOrder";
    String OUTPUT_ORDER = "outputOrder";

    @Input(INPUT_ORDER)
    SubscribableChannel inputOrder();

    @Output(OUTPUT_ORDER)
    MessageChannel outputOrder();
}

一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。

使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER

spring:
  cloud:
    stream:
      defaultBinder: defaultRabbit
      bindings:
        inputOrder:
          destination: mqTestOrder
        outputOrder:
          destination: mqTestOrder

如上配置,指定了 destination 为 mqTestOrder 的输入输出流。

分组与持久化

上述自定义的接口配置中,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭,对应的连接关闭的时候,该队列也会消失。而在实际使用中,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。

只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = XXX 。对应的队列就是持久化,并且名称为:mqTestOrder.XXX

rabbitMQ routing key 绑定

用惯了 rabbitMQ 的童鞋,在使用的时候,发现 Spring Cloud Stream 的消息投递,默认是根据 destination + group 进行区分,所有的消息都投递到 routing key 为 “#‘’ 的消息队列里。

如果我们需要进一步根据 routing key 来进行区分消息投递的目的地,或者消息接受,需要进一步配,Spring Cloud Stream 也提供了相关配置:

spring:
  cloud:
    stream:
      bindings:
        inputProductAdd:
          destination: mqTestProduct
          group: addProductHandler      # 拥有 group 默认会持久化队列
        outputProductAdd:
          destination: mqTestProduct
      rabbit:
        bindings:
          inputProductAdd:
            consumer:
              bindingRoutingKey: addProduct.*       # 用来绑定消费者的 routing key
          outputProductAdd:
            producer:
              routing-key-expression: '''addProduct.*'''  # 需要用这个来指定 RoutingKey

spring.cloud.stream.rabbit.bindings.[channelName].consumer.bindingRoutingKey
指定了生成的消息队列的routing key

spring.cloud.stream.rabbit.bindings.[channelName].producer.routing-key-expression 指定了生产者消息投递的routing key

DLX 队列

DLX 作用

DLX:Dead-Letter-Exchange(死信队列)。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:

消息被拒绝(basic.reject/ basic.nack)并且requeue=false
消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间)
队列达到最大长度

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理。

Spring Cloud Stream 中使用

spring.cloud.stream.rabbit.bindings.[channelName].consumer.autoBindDlq=true

spring.cloud.stream.rabbit.bindings.[channelName].consumer.republishToDlq=true

配置说明,可以参考 spring cloud stream rabbitmq consumer properties

结论

Spring Cloud Stream 最大的方便之处,莫过于抽象了事件驱动的一些概念,对于消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,切换topic。使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

相关文档

spring cloud stream 文档

spring cloud stream 项目

spring cloud stream 样例



作者:不小下
链接:http://www.jianshu.com/p/fb7d11c7f798
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
 
 
http://www.jianshu.com/p/fb7d11c7f798
分享到:
评论

相关推荐

    使用SpringCloudStream构建消息驱动微服务

    官方定义SpringCloudStream是一个构建消息驱动微服务的框架。SpringCloudStreamApplication应用程序通过inputs或者outputs来与SpringCloudStream中binder交互,通过我们配置来binding,而SpringCloudStream的binder...

    spring cloud stream kafka 消息驱动集成

    Spring Cloud Stream 是 Spring 生态系统中的一个模块,它为构建输入/输出绑定到消息中间件的微服务应用提供了抽象层,而 Kafka 是一个高吞吐量、低延迟的分布式消息队列系统,常用于实时数据流处理。 在“spring ...

    解锁SpringCloud主流组件 解决微服务诸多难题

    在IT行业中,SpringCloud作为一款强大的微服务框架,被广泛应用于构建复杂分布式系统。本教程“解锁SpringCloud主流组件 解决微服务诸多难题”旨在帮助开发者深入理解和掌握SpringCloud的核心组件,解决微服务架构中...

    《深入理解Spring Cloud与微服务构建》word版本

    《深入理解Spring Cloud与微服务构建》是一本专注于讲解如何使用Spring Cloud构建高效、可靠的微服务系统的书籍。Spring Cloud作为目前最流行的微服务框架之一,它提供了大量的工具和服务,帮助开发者快速搭建分布式...

    springcloud-stream-demo-master.zip

    SpringCloud Stream是一个用于构建消息驱动微服务的框架,它基于Spring Boot,提供了轻量级的消息抽象层,支持RabbitMQ、Kafka和Amazon Kinesis等多种消息中间件。通过定义输入和输出绑定,SpringCloud Stream允许...

    Spring Cloud Stream 体系及原理.zip

    Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它为开发者提供了在Spring Boot应用中构建消息处理管道的能力。这个框架充分利用了Java的强大力量,使得开发人员能够轻松地处理分布式系统中的输入和输出...

    基础框架代码_springcloud_repeatoj7_微服务基础框架代码_springcloud项目_

    本项目"基础框架代码_springcloud_repeatoj7_微服务基础框架代码_springcloud项目_",提供了一个通用的SpringCloud微服务框架搭建方案,旨在帮助开发者快速理解和构建自己的微服务应用。 首先,让我们深入了解...

    Spring Cloud实战 _springcloud实战_springcloud_

    Spring Cloud Stream则关注消息驱动的应用程序,它提供了一种声明式的方式来消费和生产消息,适用于事件驱动的微服务架构。 在实际开发中,Spring Cloud Data Flow是用于数据流和任务管理的工具,它可以管理和部署...

    RocketMQ+Spring Cloud Stream环境搭建

    RocketMQ 和 Spring Cloud Stream 的结合使用,旨在构建一个高效、可扩展的消息驱动微服务架构。RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它提供了高吞吐量、低延迟、高可用性和可扩展性的消息传递服务。而 ...

    springcloud 微服务(全套视频)

    根据提供的文件信息,我们可以推断出这是一套关于Spring Cloud微服务的全套视频教程。...希望这套“springcloud 微服务(全套视频)”教程能够帮助大家更好地掌握Spring Cloud的使用方法,进一步提升个人技术水平。

    基于Spring Cloud的微服务架构实践指南 .zip

    消息驱动的微服务集成Spring Cloud Stream实现消息驱动的微服务架构。 API网关使用Zuul实现API网关,进行请求路由和过滤。 分布式跟踪集成Zipkin实现分布式服务跟踪。 教程列表 《Spring Cloud构建微服务架构...

    03Spring Cloud项目实战微服务整合spring boot视频教程课件

    了解如何使用Spring Cloud Stream构建消息驱动的微服务。 11. **Spring Cloud Sleuth**:Sleuth提供了一种分布式追踪解决方案,集成Zipkin或ELK Stack,帮助我们跟踪微服务间的调用链路,进行性能分析和问题定位。 ...

    深入理解Spring Cloud与微服务构建 方志朋 高清pdf

    Spring Cloud Stream为消息驱动微服务应用程序提供了构建块集合。 #### 微服务的设计原则与最佳实践 - **领域驱动设计(DDD)**:强调从业务角度出发,将复杂的业务逻辑分解为一系列的领域模型和服务。 - **API ...

    SpringCloud微服务架构笔记-共四部分四个PDF文件

    - 微服务间的通信:除了Ribbon,还可以使用OpenFeign进行声明式服务调用,或者使用Spring Cloud Stream实现消息驱动的解耦。 - 分布式事务管理:SpringCloud Data Flow提供数据流处理,而Seata(前身是TCC)可以解决...

    ddd-springcloud-stream.zip

    Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,而DDD是一种软件开发方法,通过将复杂的业务逻辑分解为一系列明确的领域和子领域来管理。 在描述中没有提供具体信息,因此我们将基于标签 "springcloud...

    Spring Cloud Stream 体系及原理介绍

    Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。 Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream)...

    idea创建的SpringCloud微服务项目

    9. **Spring Cloud Stream消息驱动的编程模型**: - Spring Cloud Stream提供了一种声明式方式来处理输入和输出绑定,允许微服务间通过消息中间件进行通信。 10. **Docker与Kubernetes容器化部署**: - 考虑到...

    spring cloud微服务技术栈

    Spring Cloud Stream提供了构建消息驱动微服务的框架,支持各种消息中间件如RabbitMQ、Kafka等,方便构建松耦合、高可用的系统。 9. **Spring Cloud Sleuth服务跟踪** Sleuth提供了分布式追踪解决方案,集成...

    SpringCloud第3季2024.7z

    7. **Spring Cloud Stream**:针对消息驱动的微服务设计,提供生产者、消费者模型,支持RabbitMQ、Kafka等多种消息中间件。 8. **Spring Cloud Data Flow**:用于构建、部署和管理数据处理任务,特别适用于流处理...

    微服务Spring Cloud 程序前后端代码代码示例

    8. **Spring Cloud Stream**:用于处理消息驱动的微服务间通信,如RabbitMQ或Kafka。 通过这些示例代码,开发者可以理解如何在实际项目中集成和使用这些组件。例如,Eureka的注册和发现流程,如何在Zuul中编写过滤...

Global site tag (gtag.js) - Google Analytics