`
springcloud关注者
  • 浏览: 315501 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
12d8ea3d-4199-3941-8a17-acd5024729b8
Spring_Cloud构...
浏览量:253993
文章分类
社区版块
存档分类
最新评论

(十七) 整合spring cloud云架构 -消息驱动 Spring Cloud Stream

阅读更多

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制。spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施:

 

1. 简介:

Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。

 

2. 使用工具:

rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了

 

3. 创建commonservice-mq-producer消息的发送者项目,在pom里面配置stream-rabbit的依赖

 

<!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

 4. 在yml文件里面配置rabbit mq

 

server:
  port: 5666
spring:
  application:
    name: commonservice-mq-producer
  profiles: 
    active: dev
  cloud:
    config:
      discovery: 
        enabled: true
        service-id: commonservice-config-server
  # rabbitmq和kafka都有相关配置的默认值,如果修改,可以再次进行配置
    stream:
      bindings:
        mqScoreOutput: 
          destination: honghu_exchange
          contentType: application/json
          
  rabbitmq:
     host: localhost
     port: 5672
     username: honghu
     password: honghu
eureka: 
  client:
    service-url:
      defaultZone: http://honghu:123456@localhost:8761/eureka
  instance:
    prefer-ip-address: true

 5. 定义接口ProducerService

 

package com.honghu.cloud.producer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface ProducerService {
	
	String SCORE_OUPUT = "mqScoreOutput";
	
	@Output(ProducerService.SCORE_OUPUT)
	SubscribableChannel sendMessage();
}

 6. 定义绑定

 

package com.honghu.cloud.producer;

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(ProducerService.class)
public class SendServerConfig {

}

 7. 定义发送消息业务ProducerController

 

package com.honghu.cloud.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.honghu.cloud.common.code.ResponseCode;
import com.honghu.cloud.common.code.ResponseVO;
import com.honghu.cloud.entity.User;
import com.honghu.cloud.producer.ProducerService;

import net.sf.json.JSONObject;

@RestController
@RequestMapping(value = "producer")
public class ProducerController {
	
	@Autowired
	private ProducerService producerService;
	
	
	/**
	 * 通过get方式发送对象
	 * @param name 路径参数
	 * @return 成功|失败
	 */
	@RequestMapping(value = "/sendObj", method = RequestMethod.GET)
	public ResponseVO sendObj() {
		User user = new User(1, "hello User");
		Message<User> msg = MessageBuilder.withPayload(user).build();
		boolean result = producerService.sendMessage().send(msg);
		if(result){
			return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
		}
		return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
	}
	
	
	/**
	 * 通过get方式发送字符串消息
	 * @param name 路径参数
	 * @return 成功|失败
	 */
	@RequestMapping(value = "/send/{name}", method = RequestMethod.GET)
	public ResponseVO send(@PathVariable(value = "name", required = true) String name) {
		Message msg = MessageBuilder.withPayload(name.getBytes()).build();
		boolean result = producerService.sendMessage().send(msg);
		if(result){
			return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
		}
		return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
	}
	
	/**
	 * 通过post方式发送json对象
	 * @param name 路径参数
	 * @return 成功|失败
	 */
	@RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST)
	public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) {
		Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build();
		boolean result = producerService.sendMessage().send(msg);
		if(result){
			return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
		}
		return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
	}
}

 

8. 创建commonservice-mq-consumer1消息的消费者项目,在pom里面配置stream-rabbit的依赖

<!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

 

 9. 在yml文件中配置:

server:
  port: 5111
spring:
  application:
    name: commonservice-mq-consumer1
  profiles: 
    active: dev
  cloud:
    config:
      discovery: 
        enabled: true
        service-id: commonservice-config-server
        
    stream:
      bindings:
        mqScoreInput:
          group: honghu_queue
          destination: honghu_exchange
          contentType: application/json
          
  rabbitmq:
     host: localhost
     port: 5672
     username: honghu
     password: honghu
eureka: 
  client:
    service-url:
      defaultZone: http://honghu:123456@localhost:8761/eureka
  instance:
    prefer-ip-address: true

 

9. 定义接口ConsumerService

package com.honghu.cloud.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ConsumerService {
	
	String SCORE_INPUT = "mqScoreInput";

	@Input(ConsumerService.SCORE_INPUT)
	SubscribableChannel sendMessage();

}

 

10. 定义启动类和消息消费

package com.honghu.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

import com.honghu.cloud.consumer.ConsumerService;
import com.honghu.cloud.entity.User;

@EnableEurekaClient
@SpringBootApplication
@EnableBinding(ConsumerService.class) //可以绑定多个接口
public class ConsumerApplication {
	
	public static void main(String[] args) {
		SpringApplication.run(ConsumerApplication.class, args);
	}
	
	@StreamListener(ConsumerService.SCORE_INPUT)
	public void onMessage(Object obj) {
		System.out.println("消费者1,接收到的消息:" + obj);
	}

}

 

11. 分别启动commonservice-mq-producer、commonservice-mq-consumer1

12. 通过postman来验证消息的发送和接收



 

 

 

 

 

 

可以看到接收到了消息,下一章我们介绍mq的集群方案。

 

到此,整个消息中心方案集成完毕(需要源码可以加qq:2147775633)!!

 

欢迎大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精髓记录下来,帮助更多有兴趣研发spring cloud框架的朋友,大家来一起探讨spring cloud架构的搭建过程及如何运用于企业项目。

 

  • 大小: 25.5 KB
  • 大小: 86 KB
  • 大小: 80.7 KB
  • 大小: 86.6 KB
  • 大小: 85.8 KB
  • 大小: 61 KB
7
0
分享到:
评论
9 楼 springcloud关注者 2018-08-28  
dreamday 写道
博主 有源代码可以发一下吗?

http://minglisoft.cn/honghu/technology.html

8 楼 springcloud关注者 2018-08-28  
waterml 写道
不错,确实有很大帮助

希望帮助到你
7 楼 springcloud关注者 2018-08-28  
mangogo 写道
很厉害啊,写的很经典~谢谢,期待跟大神交流

谢谢,有问题给我留言
6 楼 dreamday 2018-08-28  
博主 有源代码可以发一下吗?
5 楼 waterml 2018-08-28  
不错,确实有很大帮助
4 楼 mangogo 2018-08-28  
很厉害啊,写的很经典~谢谢,期待跟大神交流
3 楼 springcloud关注者 2018-08-28  
6696 写道
支持一下,有图有真相,完美

欢迎给点建议或者改进的地方,一起学习
2 楼 6696 2018-08-28  
支持一下,有图有真相,完美
1 楼 springcloud关注者 2018-08-28  
今天集成了Spring Cloud Stream,希望可以帮助到大家

相关推荐

    spring-cloud-starter-stream-rabbit MQ使用规范

    Spring Cloud Stream 是一个基于 Spring Boot 的微服务框架,用于构建分布式消息驱动的微服务系统。RabbitMQ 是一个流行的开源消息队列服务器,提供了高效、可靠的消息传递服务。通过 Spring Cloud Stream RabbitMQ...

    springcloud-stream-rocketmq多topic示例代码

    springcloud-stream-rocketmq多topic示例代码

    springcloud-stream-demo-master.zip

    在微服务架构中,消息驱动的通信机制是不可或缺的一部分,SpringCloud Stream就是Spring Cloud生态中的一个关键组件,它提供了一种声明式的方式来进行消息处理,使得开发人员能够轻松地构建可扩展且高度解耦的服务。...

    spring-cloud-steam-rabbitmq-demo.zip

    在Spring Boot应用中,我们需要引入`spring-cloud-stream-rabbit`依赖,它提供了与RabbitMQ集成的必要组件。同时,RabbitMQ的配置类(如`RabbitMQConfig.java`)中,可以自定义交换机、队列和绑定关系,以满足特定...

    springcloud-learning-master.zip springcloud学习合集

    10. **Spring Cloud Stream**:消息驱动的微服务间通信,提供对消息中间件如RabbitMQ、Kafka的支持。 这些示例项目将帮助你逐步理解并实践SpringCloud的各个组件。每个子项目可能包含了服务的创建、配置、运行和...

    spring-cloud-stream-samples, spring 云流示例.zip

    spring-cloud-stream-samples, spring 云流示例 spring Cloud示例应用程序这个库包含使用 spring 云流编写的应用程序的集合。 所有的应用程序都是自包含的。 它们可以针对 Kafka 或者RabbitMQ中间件技术运行。 你...

    spring-cloud-examples-master

    7. **Spring Cloud Stream** - 流处理:Stream提供了一种模型,用于构建可扩展的、消息驱动的微服务,支持多种消息中间件。通过这个实例,我们可以学习如何构建流处理应用。 8. **Spring Cloud Sleuth** - 分布式...

    RocketMQ+Spring Cloud Stream环境搭建

    RocketMQ 和 Spring Cloud Stream 的结合使用,旨在构建一个高效、可扩展的消息驱动微服务架构。RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它提供了高吞吐量、低延迟、高可用性和可扩展性的消息传递服务。而 ...

    springcloud-learning-master.zip

    在当今的微服务架构中,Spring Boot 和 Spring Cloud 是两个至关重要的技术。Spring Boot 提供了一种快速构建独立的、生产级别的基于Spring的应用程序的方式,而Spring Cloud则为开发分布式系统(如配置管理、服务...

    介绍Spring Cloud Stream与RabbitMQ集成

    介绍Spring Cloud Stream与RabbitMQ集成的代码示例。Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。

    spring-cloud-stream-app-descriptor-Celsius.SR3

    spring-cloud-stream-app-descriptor-Celsius.SR3.stream-apps-kafka-10-docker

    SpringCloud微服务架构笔记-共四部分四个PDF文件

    - 微服务间的通信:除了Ribbon,还可以使用OpenFeign进行声明式服务调用,或者使用Spring Cloud Stream实现消息驱动的解耦。 - 分布式事务管理:SpringCloud Data Flow提供数据流处理,而Seata(前身是TCC)可以解决...

    SpringCloud项目实战各组件源代码案例

    springcloud生产者与消费者项目实战案例 Spring Cloud 中断路器 Circuit Breaker的应用 配置 Spring Cloud Config Server Spring Cloud Config使用Oracle数据库作为后端配置存储 Spring Cloud Config + Spring Cloud...

    spring cloud 微服务架构集成-spring-cloud-framework.zip

    9. **Spring Cloud Stream**:消息驱动的微服务 - 提供了一种声明式的方法来定义消息输入和输出绑定,让微服务间可以通过消息中间件进行解耦通信。 10. **Spring Cloud Gateway**:新一代API网关 - Spring Cloud ...

    (spring cloud stream 整合 rabbitmq , 自定义消息通道,既能发消息,)cloud-stream-rabbitmq-test.rar

    在本项目"cloud-stream-rabbitmq-test"中,我们将探讨如何将Spring Cloud Stream与RabbitMQ整合,创建自定义的消息通道,以实现双向通信——既能发送消息,也能接收消息。 1. **Spring Cloud Stream基本概念**: -...

    SpringCloudStream整合RabbitMq

    **SpringCloudStream与RabbitMQ整合详解** SpringCloudStream是一个框架,它允许应用程序以声明式方式定义输入和输出绑定,从而简化与消息中间件的集成。在这个场景中,我们将讨论如何将SpringCloudStream与...

    Spring Cloud实战 _springcloud实战_springcloud_

    Spring Cloud Stream则关注消息驱动的应用程序,它提供了一种声明式的方式来消费和生产消息,适用于事件驱动的微服务架构。 在实际开发中,Spring Cloud Data Flow是用于数据流和任务管理的工具,它可以管理和部署...

    ddd-springcloud-stream.zip

    在描述中没有提供具体信息,因此我们将基于标签 "springcloudstream" 进行详细的知识点讲解。 **Spring Cloud Stream** Spring Cloud Stream 是一个轻量级的框架,它允许开发者轻松地创建消息处理应用程序。它提供...

    SpringCloud微服务架构笔记(四

    在微服务架构中,Spring Cloud Stream是一个关键组件,它为企业级开发提供了一种高效、灵活的消息处理机制。本文将深入探讨Spring Cloud Stream的功能、核心概念以及如何在实际项目中应用。 1. Spring Cloud Stream...

Global site tag (gtag.js) - Google Analytics