`
raymond.chen
  • 浏览: 1437378 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

用Spring Cloud Stream构建消息驱动的微服务

 
阅读更多

Spring Cloud Stream 是一个用于构建“基于事件驱动的、与共享消息系统相连接的高度可扩展微服务”的框架,并提供了许多抽象和原语,以简化Spring生态系统消息驱动应用程序的开发。

 

核心概念

    Spring Cloud Stream的应用程序模型

          应用程序通过inputs或者outputs来与Binder交互,其通过配置来绑定,Binder负责与中间件交互。

     Binder抽象

          提供与外部消息中间件集成的组件。

          目前只提供了RabbitMQ和Kafka的Binder实现。

          通过使用它所提供的扩展API来实现其他中间件的Binder。

     持久的发布-订阅模型支持

          消息通信方式遵循发布-订阅模式。

     消费者组支持

          当一个应用程序不同实例放置在一个具有竞争关系的消费组中,组里面的实例中只有一个能够消费消息。

          消费者类型:

                 Message-driven (消息驱动型,有时简称为异步)

                 Polled (轮询型,有时简称为同步)

     分区支持

           分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理。

     可拔插的Binder API

 

Spring Cloud Stream 提供了三个绑定消息通道的默认实现

       Sink:通过指定消费消息的目标来标识消息使用者的约定。

       Source:与Sink相反,用于标识消息生产者的约定。

       Processor:集成了Sink和Source的作用,标识消息生产者和使用者。

 

       也可以自定义消息通道:

public interface OrderOutputChannel {
	String OUTPUT = "output";
	
	@Output(OrderOutputChannel.OUTPUT)
	MessageChannel output();
}

public interface OrderInputChannel {
	String INPUT = "input";
	
	@Input(OrderInputChannel.INPUT)
	SubscribableChannel input();
}

 

创建消息生产者工程

     pom.xml的关键配置:

<parent>
  	<groupId>org.springframework.boot</groupId>
  	<artifactId>spring-boot-starter-parent</artifactId>
  	<version>2.0.7.RELEASE</version>
  	<relativePath/>
  </parent>

  <dependencies> 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
  	<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  </dependencies>
  
  <dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Finchley.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
  </dependencyManagement>

 

    application.yml文件的配置信息:

server:
  port: 5502
  
spring:
  application:
    name: service-stream-sender
  rabbitmq:
    host: 192.168.134.134
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output: #通道名
          destination: order  #目的地
          content-type: application/json  #消息格式
          group: default  #消费组名

 

    启动类:

@SpringBootApplication
@RestController
@EnableBinding(OrderOutputChannel.class) //启用与消息通道的绑定
public class Main {
	/**
	 * 此处使用自定义的消息通道
	 */
	@Autowired
	private OrderOutputChannel outputChannel;
	
	public static void main(String[] args) {
		SpringApplication.run(Main.class, args);
	}
	
	@GetMapping("/index")
	public String index(){
		//将消息通过channel发送到目的地
		User user = new User("cjm", "123");
		Message<User> message = MessageBuilder.withPayload(user).build();
		outputChannel.output().send(message); //Bean对象会转成json字符串存储到目的地
		
		return "service stream sender";
	}
}

 

 创建消息消费者工程

     pom.xml关键配置:

           参考消息生产者工程。

 

     application.yml文件的配置信息:

server:
  port: 5501
  
spring:
  application:
    name: service-stream-receiver
  rabbitmq:
    host: 192.168.134.134
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #通道名
          destination: order  #目的地
          content-type: application/json  #消息格式
          group: default  #消费组名,添加group后队列就是持久化的了

 

    启动类:

@SpringBootApplication
@RestController
@EnableBinding({OrderInputChannel.class}) //启用与消息通道的绑定
public class Main {
	private String message = "";
	
	public static void main(String[] args) {
		SpringApplication.run(Main.class, args);
	}
	
	@GetMapping("/index")
	public String index(){
		return "service stream receiver: " + message;
	}
	
	/**
	 * 监听指定通道,通过该通道接收指定目的地的消息
	 */
	@StreamListener(OrderInputChannel.INPUT)
    public void receive(String payload) {
		message = payload;
        System.out.println("Received1: " + payload);
    }
	
	/**
	 * 将json格式的消息转成User对象
	 */
	@StreamListener(OrderInputChannel.INPUT)
    public void receive2(User user) {
		System.out.println(user.getClass().getName());
        System.out.println("usernaem=" + user.getUsername() + ", password=" + user.getPassword());
    }
}

 



 

  • 大小: 18.5 KB
分享到:
评论

相关推荐

    使用SpringCloudStream构建消息驱动微服务

    官方定义SpringCloudStream是一个构建消息驱动微服务的框架。SpringCloudStreamApplication应用程序通过inputs或者outputs来与SpringCloudStream中binder交互,通过我们配置来binding,而SpringCloudStream的binder...

    spring cloud stream kafka 消息驱动集成

    Spring Cloud Stream 是 Spring 生态系统中的一个模块,它为构建输入/输出绑定到消息中间件的微服务应用提供了抽象层,而 Kafka 是一个高吞吐量、低延迟的分布式消息队列系统,常用于实时数据流处理。 在“spring ...

    解锁SpringCloud主流组件 解决微服务诸多难题

    在IT行业中,SpringCloud作为一款强大的微服务框架,被广泛应用于构建复杂分布式系统。本教程“解锁SpringCloud主流组件 解决微服务诸多难题”旨在帮助开发者深入理解和掌握SpringCloud的核心组件,解决微服务架构中...

    《深入理解Spring Cloud与微服务构建》word版本

    《深入理解Spring Cloud与微服务构建》是一本专注于讲解如何使用Spring Cloud构建高效、可靠的微服务系统的书籍。Spring Cloud作为目前最流行的微服务框架之一,它提供了大量的工具和服务,帮助开发者快速搭建分布式...

    springcloud-stream-demo-master.zip

    SpringCloud Stream是一个用于构建消息驱动微服务的框架,它基于Spring Boot,提供了轻量级的消息抽象层,支持RabbitMQ、Kafka和Amazon Kinesis等多种消息中间件。通过定义输入和输出绑定,SpringCloud Stream允许...

    Spring Cloud Stream 体系及原理.zip

    Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它为开发者提供了在Spring Boot应用中构建消息处理管道的能力。这个框架充分利用了Java的强大力量,使得开发人员能够轻松地处理分布式系统中的输入和输出...

    Spring Cloud实战 _springcloud实战_springcloud_

    Spring Cloud Stream则关注消息驱动的应用程序,它提供了一种声明式的方式来消费和生产消息,适用于事件驱动的微服务架构。 在实际开发中,Spring Cloud Data Flow是用于数据流和任务管理的工具,它可以管理和部署...

    基础框架代码_springcloud_repeatoj7_微服务基础框架代码_springcloud项目_

    本项目"基础框架代码_springcloud_repeatoj7_微服务基础框架代码_springcloud项目_",提供了一个通用的SpringCloud微服务框架搭建方案,旨在帮助开发者快速理解和构建自己的微服务应用。 首先,让我们深入了解...

    RocketMQ+Spring Cloud Stream环境搭建

    RocketMQ 和 Spring Cloud Stream 的结合使用,旨在构建一个高效、可扩展的消息驱动微服务架构。RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它提供了高吞吐量、低延迟、高可用性和可扩展性的消息传递服务。而 ...

    springcloud 微服务(全套视频)

    根据提供的文件信息,我们可以推断出这是一套关于Spring Cloud微服务的全套视频教程。...希望这套“springcloud 微服务(全套视频)”教程能够帮助大家更好地掌握Spring Cloud的使用方法,进一步提升个人技术水平。

    基于Spring Cloud的微服务架构实践指南 .zip

    消息驱动的微服务集成Spring Cloud Stream实现消息驱动的微服务架构。 API网关使用Zuul实现API网关,进行请求路由和过滤。 分布式跟踪集成Zipkin实现分布式服务跟踪。 教程列表 《Spring Cloud构建微服务架构...

    03Spring Cloud项目实战微服务整合spring boot视频教程课件

    了解如何使用Spring Cloud Stream构建消息驱动的微服务。 11. **Spring Cloud Sleuth**:Sleuth提供了一种分布式追踪解决方案,集成Zipkin或ELK Stack,帮助我们跟踪微服务间的调用链路,进行性能分析和问题定位。 ...

    深入理解Spring Cloud与微服务构建 方志朋 高清pdf

    Spring Cloud Stream为消息驱动微服务应用程序提供了构建块集合。 #### 微服务的设计原则与最佳实践 - **领域驱动设计(DDD)**:强调从业务角度出发,将复杂的业务逻辑分解为一系列的领域模型和服务。 - **API ...

    SpringCloud微服务架构笔记-共四部分四个PDF文件

    - 微服务间的通信:除了Ribbon,还可以使用OpenFeign进行声明式服务调用,或者使用Spring Cloud Stream实现消息驱动的解耦。 - 分布式事务管理:SpringCloud Data Flow提供数据流处理,而Seata(前身是TCC)可以解决...

    ddd-springcloud-stream.zip

    Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,而DDD是一种软件开发方法,通过将复杂的业务逻辑分解为一系列明确的领域和子领域来管理。 在描述中没有提供具体信息,因此我们将基于标签 "springcloud...

    Spring Cloud Stream 体系及原理介绍

    Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。 Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream)...

    idea创建的SpringCloud微服务项目

    9. **Spring Cloud Stream消息驱动的编程模型**: - Spring Cloud Stream提供了一种声明式方式来处理输入和输出绑定,允许微服务间通过消息中间件进行通信。 10. **Docker与Kubernetes容器化部署**: - 考虑到...

    spring cloud微服务技术栈

    Spring Cloud Stream提供了构建消息驱动微服务的框架,支持各种消息中间件如RabbitMQ、Kafka等,方便构建松耦合、高可用的系统。 9. **Spring Cloud Sleuth服务跟踪** Sleuth提供了分布式追踪解决方案,集成...

    微服务Spring Cloud 程序前后端代码代码示例

    8. **Spring Cloud Stream**:用于处理消息驱动的微服务间通信,如RabbitMQ或Kafka。 通过这些示例代码,开发者可以理解如何在实际项目中集成和使用这些组件。例如,Eureka的注册和发现流程,如何在Zuul中编写过滤...

    SpringCloud第3季2024.7z

    7. **Spring Cloud Stream**:针对消息驱动的微服务设计,提供生产者、消费者模型,支持RabbitMQ、Kafka等多种消息中间件。 8. **Spring Cloud Data Flow**:用于构建、部署和管理数据处理任务,特别适用于流处理...

Global site tag (gtag.js) - Google Analytics