`

java版spring cloud微服务架构b2b2c-Rabbitmq实现延迟

阅读更多

1.预备知识

 

1.1 消息传递

 

首先我们知道消费者是从队列中获取消息的,那么消息是如何到达队列的?

当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing key)将会确定消息投递到那个队列(queue)。

 

需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六

 

带着这几个关键字:交换器、路由键和队列。

 

1.2 交换器类型

 

如之前所说,交换器根据规则决定消息的路由方向。因此,rabbitmq的消息投递分类便是从交换器开始的,不同的交换器实现不同的路由算法便实现了不同的消息投递方式。

 

direct交换器

 

direct -> routingKey -> queue,相当一种点对点的消息投递,如果路由键匹配,就直接投递到相应的队列

 

fanout交换器

 

fanout交换器相当于实现了一(交换器)对多(队列)的广播投递方式

 

topic交换器

 

提供一种模式匹配的投递方式,我们可以根据主题来决定消息投递到哪个队列。

 

1.3 消息延迟

 

本文想要实现一个可延迟发送的消息机制。消息如何延迟?

 

ttl (time to live) 消息存活时间

 

ttl是指一个消息的存活时间。

 

Per-Queue Message TTL in Queues

 

引用官方的一句话:

 

TTL can be set for a given queue by setting the x-message-ttl argument to queue.declare, or by setting the message-ttl policy. A message that has been in the queue for longer than the configured TTL is said to be dead.

我们可以通过x-message-ttl设置一个队列中消息的过期时间,消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

 

Per-Message TTL in Publishers

 

引用官方的一句话:

 

A TTL can be specified on a per-message basis, by setting the expiration field in the basic AMQP class when sending a basic.publish.

 

The value of the expiration field describes the TTL period in milliseconds. The same constraints as for x-message-ttl apply. Since the expiration field must be a string, the broker will (only) accept the string representation of the number.

 

我们可以通过设置每一条消息的属性expiration,指定单条消息有效期。消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

 

重新路由-死信交换机(Dead Letter Exchanges)

引用官方一句话:

 

Dead Letter Exchanges

 

Messages from a queue can be ‘dead-lettered’; that is, republished to

another exchange when any of the following events occur:

 

The message is rejected (basic.reject or basic.nack) with

requeue=false, The TTL for the message expires; or The queue length

limit is exceeded. Dead letter exchanges (DLXs) are normal exchanges.

They can be any of the usual types and are declared as usual.

To set the dead letter exchange for a queue, set the x-dead-letter-exchange argument to the name of the exchange.

 

我们可以通过设置死信交换器(x-dead-letter-exchange)来重新发送消息到另外一个队列,而这个队列将是最终的消费队列。

 

具体实现

rabbitmq配置

 

属性文件-rabbitmq.properties

 

交换、路由等配置按照以上策略,其中,添加了prefetch参数来根据服务器能力控制消费数量。

 

连接用户名

 

mq.user =sms_user

 

密码

 

mq.password =123456

 

主机

 

mq.host =192.168.99.100

 

端口

 

mq.port =5672

 

默认virtual-host

 

mq.vhost =/

 

the default cache size for channels is 25

 

mq.channelCacheSize =50

 

发送消息路由

 

sms.route.key =sms_route_key

 

延迟消息队列

 

sms.delay.queue =sms_delay_queue

 

延迟消息交换器

 

sms.delay.exchange =sms_delay_exchange

 

消息的消费队列

 

sms.queue =sms_queue

 

消息交换器

 

sms.exchange =sms_exchange

 

每秒消费消息数量

 

sms.prefetch =30

 

配置rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:property-placeholder location="rabbitmq.properties"/>
    <!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="connectionFactory"
                       username="${mq.user}" password="${mq.password}"
                       host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" />

    <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--定义queue -->
    <rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" />
    <!-- 创建延迟,有消息有效期的队列 -->
    <rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <!-- 队列默认消息过期时间 -->
                <value type="java.lang.Long">3600000</value>
            </entry>
            <!-- 消息过期根据重新路由 -->
            <entry key="x-dead-letter-exchange" value="${sms.exchange}"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 定义direct exchange,sms_queue -->
    <rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- 延迟消息配置,durable=true 持久化生效 -->
    <rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息接收者 -->
    <bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/>
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}">
        <rabbit:listener queues="${sms.queue}" ref="messageReceiver"/>
    </rabbit:listener-container>
</beans>

 消息发布者

package git.yampery.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* @decription MsgProducer
* <p>生产者</p>
* @author Yampery
* @date 2018/2/11 11:44
*/
@Component
public class MsgProducer {

   @Resource
   private AmqpTemplate amqpTemplate;
   @Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE;
   @Value("${sms.exchange}") private String SMS_EXCHANGE;
   @Value("${sms.route.key}") private String SMS_ROUTE_KEY;

   /**
    * 延迟消息放入延迟队列中
    * @param msg
    * @param expiration
    */
   public void publish(String msg, String expiration) {
       amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> {
           // 设置消息属性-过期时间
           message.getMessageProperties().setExpiration(expiration);
           return message;
       });
   }

   /**
    * 非延迟消息放入待消费队列
    * @param msg
    */
   public void publish(String msg) {
       amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg);
   }
}

 消费者

package git.yampery.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
* @decription MsgConsumer
* <p>消费者</p>
* @author Yampery
* @date 2018/2/11 11:43
*/
public class MsgConsumer implements MessageListener {
   @Override
   public void onMessage(Message message) {
       String msg;
       try {
           // 线程每秒消费一次
           Thread.sleep(1000);
           msg = new String(message.getBody(), "utf-8");
           System.out.println(msg);

       } catch (Exception e) {
           // 这里并没有对服务异常等失败的消息做处理,直接丢弃了
           // 防止因业务异常导致消息失败造成unack阻塞再队列里
           // 可以选择路由到另外一个专门处理消费失败的队列
           return;
       }
   }
}

 测试

package git.yampery.mq;

import com.alibaba.fastjson.JSONObject;
import git.yampery.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
 * @decription TestMq
 * <p>测试</p>
 * @author Yampery
 * @date 2018/2/11 15:03
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMq {

    @Resource
    private MsgProducer producer;

    @Test
    public void testMq() {
        JSONObject jObj = new JSONObject();
        jObj.put("msg", "这是一条短信");
        producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000));
    }
}

 了解springcloud架构可以加求求:三五三六二四七二五九

分享到:
评论

相关推荐

    spring cloud 微服务架构集成-spring-cloud-framework.zip

    7. **Spring Cloud Sleuth**:分布式追踪 - 在微服务架构中,定位问题可能涉及多个服务,Sleuth可以集成Zipkin或ELK(Elasticsearch、Logstash、Kibana)堆栈,帮助开发者追踪和分析请求在各个服务间的流转过程。...

    springcloud微服务技术栈-个人笔记文档(基础篇)

    【SpringCloud微服务技术栈详解】 SpringCloud 是一套完整的微服务解决方案,它为开发者提供了构建分布式...结合其他技术如 Docker、Redis 和 RabbitMQ,可以进一步优化微服务架构,实现更高效、更稳定的系统运行。

    spring-cloud-steam-rabbitmq-demo.zip

    本示例“spring-cloud-steam-rabbitmq-demo”将探讨如何使用Spring Cloud Stream与RabbitMQ进行集成,以构建一个高效、可靠的分布式系统。 Spring Cloud Stream 是一个框架,用于构建可复用的、松耦合的服务,这些...

    springcloud 微服务(全套视频)

    Spring Cloud为开发者提供了一整套微服务解决方案,通过上述知识点的介绍,我们不仅了解了Spring Cloud的核心技术和主要组件,还深入探讨了如何利用这些技术来构建稳定可靠的微服务架构。希望这套“springcloud ...

    基于Spring Cloud微服务架构的openapi设计与实现源码

    该项目为基于Spring Cloud微服务架构的openapi设计与实现源码,共计503个文件。其中,包含95个JavaScript文件、89个Java源文件、85个PNG图像文件、84个GIF图像文件、73个CSS文件、17个XML配置文件、14个HTML文件、8...

    springcloud 微服务 。pdf

    SpringCloud基于SpringBoot,简化了微服务架构的搭建和管理,使得开发者可以快速地在分布式环境中实现服务发现、负载均衡、断路器、配置中心、API网关等功能。 一、SpringCloud的核心组件: 1. Eureka:服务注册与...

    (完整版)基于SpringCloud微服务系统设计方案.docx

    综上所述,基于Spring Cloud的微服务架构设计涉及多个层面和技术点,不仅需要深入理解微服务的本质和挑战,还需要根据实际业务场景灵活调整设计方案。通过不断优化和迭代,可以构建出稳定、高效、可扩展的微服务系统...

    疯狂Spring Cloud微服务架构实战视频教程

    - **实战案例**:本教程通过一个完整的电商项目来演示Spring Cloud微服务架构的构建过程,涵盖了用户管理、商品管理、订单管理等多个模块。 - **常见问题及解决方案**:例如如何解决服务雪崩效应、如何进行灰度发布...

    SpringCloud微服务架构笔记(四

    【Spring Cloud微服务架构笔记(四)】 在微服务架构中,Spring Cloud Stream是一个关键组件,它为企业级开发提供了一种高效、灵活的消息处理机制。本文将深入探讨Spring Cloud Stream的功能、核心概念以及如何在...

    详解Spring Cloud微服务架构下的WebSocket解决方案

    Spring Cloud 微服务架构下的 WebSocket 解决方案 在现代浏览器中,WebSocket 的应用已经变得非常普遍,特别是在某些业务场景下,要求服务器端推送消息至客户端。在微服务架构集群模式下,选择合适的解决方案尤为...

    springcloud 微服务,Sring cloud Greenwich-xmfcn-spring-cloud.zip

    在 "xmfcn-spring-cloud-master" 文件夹中,我们可以预期找到一个典型的 Spring Cloud 微服务架构的项目结构,包括不同模块(如 Eureka 服务器、配置中心、API 网关、业务服务等),每个模块都有相应的配置文件(如 ...

    spring-cloud 微服务系统架构 ppt

    9. **消息总线**:Spring Cloud Bus通过轻量级的消息代理(如RabbitMQ或Kafka),实现服务间的事件广播和配置刷新,增强系统的实时性。 10. **持续集成/持续部署(CI/CD)**:在微服务架构中,Jenkins、GitLab CI/CD...

    藏经阁-spring cloud微服务架构设计与开发实践-222.pdf

    【Spring Cloud微服务架构设计与开发实践】 Spring Cloud是一个基于Spring Boot实现的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、...

    售票系统,采用spring cloud微服务架构,mysql+redis+rabbitmq.zip

    售票系统,采用spring cloud微服务架构,mysql+redis+rabbitmq

    转载 仿百度能力开放平台的spring cloud 微服务框架 ,-open-capacity-platform.zip

    【标题】中的“仿百度能力开放平台的spring cloud 微服务框架”指的是一个基于Spring Cloud构建的微服务架构,该架构的设计灵感来源于百度的能力开放平台。Spring Cloud是Java平台上广泛使用的微服务开发框架,它...

    源码-spring cloud 微服务 入门、实战与进阶.zip

    通过深入研究并实践"spring-cloud-master"中的代码,开发者可以了解到如何在实际项目中运用这些组件,理解它们的工作原理,并能熟练地构建和维护自己的微服务架构。这个压缩包对于想要深入了解和掌握Spring Cloud的...

    SpringCloud.pdf

    6. **分布式消息传递**:Spring Cloud Stream 支持集成消息中间件(如 RabbitMQ、Kafka),实现服务间的异步通信和解耦。 云原生应用程序是Spring Cloud 支持的一种开发范式,鼓励采用持续交付和以价值为导向的开发...

    springclound 微服务-springcloud-learn.zip

    10. **Spring Cloud Sleuth**: 用于实现分布式跟踪,集成Zipkin或ELK(Elasticsearch、Logstash、Kibana)等工具,帮助分析微服务架构中的请求流转和性能瓶颈。 11. **Spring Cloud Gateway**: 作为新一代的API网关...

    SpringCloud微服务.zip

    这个压缩包中的代码示例可能涵盖了以上提到的一些或所有功能,通过学习和运行这些示例,开发者可以深入了解SpringCloud如何在实际项目中发挥作用,提升微服务架构的设计和实施能力。在探索过程中,还需要了解Spring ...

Global site tag (gtag.js) - Google Analytics