在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制。spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施:
1. 简介:
Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。
2. 使用工具:
rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了
3. 创建commonservice-mq-producer消息的发送者项目,在pom里面配置stream-rabbit的依赖
<!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
4. 在yml文件里面配置rabbit mq
server:
port: 5666
spring:
application:
name: commonservice-mq-producer
profiles:
active: dev
cloud:
config:
discovery:
enabled: true
service-id: commonservice-config-server
# rabbitmq和kafka都有相关配置的默认值,如果修改,可以再次进行配置
stream:
bindings:
mqScoreOutput:
destination: honghu_exchange
contentType: application/json
rabbitmq:
host: localhost
port: 5672
username: honghu
password: honghu
eureka:
client:
service-url:
defaultZone: http://honghu:123456@localhost:8761/eureka
instance:
prefer-ip-address: true
5. 定义接口ProducerService
package com.honghu.cloud.producer;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
public interface ProducerService {
String SCORE_OUPUT = "mqScoreOutput";
@Output(ProducerService.SCORE_OUPUT)
SubscribableChannel sendMessage();
}
6. 定义绑定
package com.honghu.cloud.producer;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(ProducerService.class)
public class SendServerConfig {
}
7. 定义发送消息业务ProducerController
package com.honghu.cloud.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import com.honghu.cloud.common.code.ResponseCode; import com.honghu.cloud.common.code.ResponseVO; import com.honghu.cloud.entity.User; import com.honghu.cloud.producer.ProducerService; import net.sf.json.JSONObject; @RestController @RequestMapping(value = "producer") public class ProducerController { @Autowired private ProducerService producerService; /** * 通过get方式发送对象 * @param name 路径参数 * @return 成功|失败 */ @RequestMapping(value = "/sendObj", method = RequestMethod.GET) public ResponseVO sendObj() { User user = new User(1, "hello User"); Message<User> msg = MessageBuilder.withPayload(user).build(); boolean result = producerService.sendMessage().send(msg); if(result){ return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); } return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); } /** * 通过get方式发送字符串消息 * @param name 路径参数 * @return 成功|失败 */ @RequestMapping(value = "/send/{name}", method = RequestMethod.GET) public ResponseVO send(@PathVariable(value = "name", required = true) String name) { Message msg = MessageBuilder.withPayload(name.getBytes()).build(); boolean result = producerService.sendMessage().send(msg); if(result){ return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); } return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); } /** * 通过post方式发送json对象 * @param name 路径参数 * @return 成功|失败 */ @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST) public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) { Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build(); boolean result = producerService.sendMessage().send(msg); if(result){ return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false); } return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false); } }
8. 创建commonservice-mq-consumer1消息的消费者项目,在pom里面配置stream-rabbit的依赖
<!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
9. 在yml文件中配置:
server:
port: 5111
spring:
application:
name: commonservice-mq-consumer1
profiles:
active: dev
cloud:
config:
discovery:
enabled: true
service-id: commonservice-config-server
stream:
bindings:
mqScoreInput:
group: honghu_queue
destination: honghu_exchange
contentType: application/json
rabbitmq:
host: localhost
port: 5672
username: honghu
password: honghu
eureka:
client:
service-url:
defaultZone: http://honghu:123456@localhost:8761/eureka
instance:
prefer-ip-address: true
9. 定义接口ConsumerService
package com.honghu.cloud.consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface ConsumerService {
String SCORE_INPUT = "mqScoreInput";
@Input(ConsumerService.SCORE_INPUT)
SubscribableChannel sendMessage();
}
10. 定义启动类和消息消费
package com.honghu.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import com.honghu.cloud.consumer.ConsumerService;
import com.honghu.cloud.entity.User;
@EnableEurekaClient
@SpringBootApplication
@EnableBinding(ConsumerService.class) //可以绑定多个接口
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(ConsumerService.SCORE_INPUT)
public void onMessage(Object obj) {
System.out.println("消费者1,接收到的消息:" + obj);
}
}
11. 分别启动commonservice-mq-producer、commonservice-mq-consumer1
12. 通过postman来验证消息的发送和接收
可以看到接收到了消息,下一章我们介绍mq的集群方案。
到此,整个消息中心方案集成完毕(需要源码可以加qq:2147775633)!!
欢迎大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精髓记录下来,帮助更多有兴趣研发spring cloud框架的朋友,大家来一起探讨spring cloud架构的搭建过程及如何运用于企业项目。
相关推荐
Spring Cloud Stream 是一个基于 Spring Boot 的微服务框架,用于构建分布式消息驱动的微服务系统。RabbitMQ 是一个流行的开源消息队列服务器,提供了高效、可靠的消息传递服务。通过 Spring Cloud Stream RabbitMQ...
在微服务架构中,消息驱动的通信机制是不可或缺的一部分,SpringCloud Stream就是Spring Cloud生态中的一个关键组件,它提供了一种声明式的方式来进行消息处理,使得开发人员能够轻松地构建可扩展且高度解耦的服务。...
在Spring Boot应用中,我们需要引入`spring-cloud-stream-rabbit`依赖,它提供了与RabbitMQ集成的必要组件。同时,RabbitMQ的配置类(如`RabbitMQConfig.java`)中,可以自定义交换机、队列和绑定关系,以满足特定...
10. **Spring Cloud Stream**:消息驱动的微服务间通信,提供对消息中间件如RabbitMQ、Kafka的支持。 这些示例项目将帮助你逐步理解并实践SpringCloud的各个组件。每个子项目可能包含了服务的创建、配置、运行和...
spring-cloud-stream-samples, spring 云流示例 spring Cloud示例应用程序这个库包含使用 spring 云流编写的应用程序的集合。 所有的应用程序都是自包含的。 它们可以针对 Kafka 或者RabbitMQ中间件技术运行。 你...
7. **Spring Cloud Stream** - 流处理:Stream提供了一种模型,用于构建可扩展的、消息驱动的微服务,支持多种消息中间件。通过这个实例,我们可以学习如何构建流处理应用。 8. **Spring Cloud Sleuth** - 分布式...
springcloud-stream-rocketmq多topic示例代码
RocketMQ 和 Spring Cloud Stream 的结合使用,旨在构建一个高效、可扩展的消息驱动微服务架构。RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它提供了高吞吐量、低延迟、高可用性和可扩展性的消息传递服务。而 ...
在当今的微服务架构中,Spring Boot 和 Spring Cloud 是两个至关重要的技术。Spring Boot 提供了一种快速构建独立的、生产级别的基于Spring的应用程序的方式,而Spring Cloud则为开发分布式系统(如配置管理、服务...
介绍Spring Cloud Stream与RabbitMQ集成的代码示例。Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。
spring-cloud-stream-app-descriptor-Celsius.SR3.stream-apps-kafka-10-docker
- 微服务间的通信:除了Ribbon,还可以使用OpenFeign进行声明式服务调用,或者使用Spring Cloud Stream实现消息驱动的解耦。 - 分布式事务管理:SpringCloud Data Flow提供数据流处理,而Seata(前身是TCC)可以解决...
springcloud生产者与消费者项目实战案例 Spring Cloud 中断路器 Circuit Breaker的应用 配置 Spring Cloud Config Server Spring Cloud Config使用Oracle数据库作为后端配置存储 Spring Cloud Config + Spring Cloud...
9. **Spring Cloud Stream**:消息驱动的微服务 - 提供了一种声明式的方法来定义消息输入和输出绑定,让微服务间可以通过消息中间件进行解耦通信。 10. **Spring Cloud Gateway**:新一代API网关 - Spring Cloud ...
在本项目"cloud-stream-rabbitmq-test"中,我们将探讨如何将Spring Cloud Stream与RabbitMQ整合,创建自定义的消息通道,以实现双向通信——既能发送消息,也能接收消息。 1. **Spring Cloud Stream基本概念**: -...
**SpringCloudStream与RabbitMQ整合详解** SpringCloudStream是一个框架,它允许应用程序以声明式方式定义输入和输出绑定,从而简化与消息中间件的集成。在这个场景中,我们将讨论如何将SpringCloudStream与...
Spring Cloud Stream则关注消息驱动的应用程序,它提供了一种声明式的方式来消费和生产消息,适用于事件驱动的微服务架构。 在实际开发中,Spring Cloud Data Flow是用于数据流和任务管理的工具,它可以管理和部署...
在描述中没有提供具体信息,因此我们将基于标签 "springcloudstream" 进行详细的知识点讲解。 **Spring Cloud Stream** Spring Cloud Stream 是一个轻量级的框架,它允许开发者轻松地创建消息处理应用程序。它提供...
在微服务架构中,Spring Cloud Stream是一个关键组件,它为企业级开发提供了一种高效、灵活的消息处理机制。本文将深入探讨Spring Cloud Stream的功能、核心概念以及如何在实际项目中应用。 1. Spring Cloud Stream...