`
tangkuo
  • 浏览: 102056 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

RabbitMQ spring 使用总结

阅读更多


rabbitMQ相关概念不在本文介绍范围,rabbitMQ官网和其他博客都有大量介绍。

本文重点内容是spring和rabbit环境搭建以及使用中注意事项总结。
1.1   rabbitMQ服务器搭建

下载安装官网最新版本服务器
1.2   rabbitMQ开启服务管理

rabbitMQ start 启动
1.3   spring pom配置

<spring-rabbit.version>1.3.9.RELEASE</spring-rabbit.version>
<!-- 消息队列 rabbitmq -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>${rabbitmq-client.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${spring-rabbit.version}</version>
</dependency>

1.4   spring config配置

在D:\workspace\sps\src\main\resources\spring-rabbitmq.xml

配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
      http://www.springframework.org/schema/mvc
     http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <mvc:annotation-driven />


    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.master.ip}" port="${rabbitmq.master.port}" username="${rabbitmq.master.username}" password="${rabbitmq.master.password}" />

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     exchange="order_topic_exchange" message-converter="gsonConverter" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="orderQueue" durable="true"  />
    <rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">600000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:queue name="orderPayDelayQueryQueue" durable="true"/>

    <rabbit:topic-exchange name="pay_delay_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderPayDelayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <rabbit:topic-exchange name="order_topic_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderQueue" pattern="sps.#"/>
            <rabbit:binding queue="orderPayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"  concurrency="10">
        <rabbit:listener queues="orderQueue" ref="orderQueueListener"/>

    </rabbit:listener-container>
  
    <bean id="orderQueueListener" class="com.supuy.sps.services.queue.OrderQueueListener" />

    <bean id="gsonConverter" class="com.supuy.core.mq.Gson2JsonMessageConverter"/>


</beans>

1.5  延迟消息队列

有时候,因为各种原因,我们想实现延迟消费的目的,但是rabbitMQ并没有提供这个功能,这时候,可以通过x-message-ttl和x-dead-letter-exchange实现。

    <rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">600000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

1.6   生产者

@Override
public void orderBuilder(int type,String orderCode) {
    String key = "tps."+orderCode;
    orderCode = type+"."+orderCode;
    amqpMaster.convertAndSend(key, orderCode);
    logger.info("订单加入消息队列,订单编码:{}", key);
}

1.7   消费者

package com.supuy.tps.service.queue;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.supuy.tps.common.mq.Gson2JsonMessageConverter;
import com.supuy.tps.dto.bean.WmsOrderParam;
import com.supuy.tps.service.IOrderShopService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;

/**
* Created by bill on 2016/5/31.
*/
public class OrderSendQueueListener implements ChannelAwareMessageListener {
    private static Logger logger = LoggerFactory.getLogger(OrderSendQueueListener.class);
    @Autowired
    private Gson2JsonMessageConverter messageConverter;
    @Autowired
    private IOrderShopService orderShopService;
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        channel.basicQos(100);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        String data=(String)messageConverter.fromMessage(message);
        if (data!=null){
            WmsOrderParam wmsOrderParam= JSON.parseObject(data,WmsOrderParam.class);
            if (wmsOrderParam != null){
                wmsOrderParam.setOrderCode(wmsOrderParam.getOrderCode().substring(1));
                orderShopService.pushOrderLogInfo(wmsOrderParam);
            }
        }
    }
}

附加类Gson2JsonMessageConverter实现如下,

package com.supuy.tps.common.mq;

import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {
     
    private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);
     
    private static ClassMapper classMapper =  new DefaultClassMapper();
 
    private static Gson gson = new Gson();
 
    public Gson2JsonMessageConverter() { 
        super(); 
    } 


    @Override 
    protected Message createMessage(Object object,
            MessageProperties messageProperties) {
        byte[] bytes = null; 
        try { 
            String jsonString = gson.toJson(object); 
            bytes = jsonString.getBytes(getDefaultCharset()); 
        } 
        catch (IOException e) { 
            throw new MessageConversionException(
                    "Failed to convert Message content", e); 
        } 
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset()); 
        if (bytes != null) { 
            messageProperties.setContentLength(bytes.length); 
        } 
        classMapper.fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    } 
 
    @Override 
    public Object fromMessage(Message message)
            throws MessageConversionException {
        Object content = null; 
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) { 
            String contentType = properties.getContentType(); 
            if (contentType != null && contentType.contains("json")) { 
                String encoding = properties.getContentEncoding(); 
                if (encoding == null) { 
                    encoding = getDefaultCharset(); 
                } 
                try { 
                        Class<?> targetClass = getClassMapper().toClass(
                                message.getMessageProperties());
                        content = convertBytesToObject(message.getBody(), 
                                encoding, targetClass); 
                } 
                catch (IOException e) { 
                    throw new MessageConversionException(
                            "Failed to convert Message content", e); 
                } 
            } 
            else { 
                log.warn("Could not convert incoming message with content-type [" 
                        + contentType + "]"); 
            } 
        } 
        if (content == null) { 
            content = message.getBody(); 
        } 
        return content; 
    } 
 
    private Object convertBytesToObject(byte[] body, String encoding, 
            Class<?> clazz) throws UnsupportedEncodingException { 
        String contentAsString = new String(body, encoding); 
        return gson.fromJson(contentAsString, clazz); 
    }

    @Override
    public ClassMapper getClassMapper() {
        return new DefaultClassMapper();

    }


1.8   Q&A

1 ttl设置之后,下次修改时间,会报错,这时候,需要先删除该队列,重启项目。

2 接受消息之后,出现错误,该消息就会被持续占有,无法消费。所以,要活用消息的ack,nack,reject。



分享到:
评论

相关推荐

    java rabbitmq spring springAMQP 代码包 project

    总结来说,这个项目是一个很好的学习资源,它演示了如何在Java环境中利用Spring和Spring AMQP与RabbitMQ协同工作,实现高效的消息传递。对于想要深入理解分布式系统、微服务架构和异步通信的开发者来说,这是一个...

    RabbitMQ与SpringMVC集成

    在提供的`spring.amqp.stocks.rar`文件中,可能包含了一个示例应用,展示了如何在SpringMVC项目中配置和使用RabbitMQ进行股票数据的发布和订阅。用户可以通过解压文件,查看源代码和文档(如`安装说明.docx`),了解...

    springcloud部署rabbitMQ

    总结一下,本篇文章主要讲解了如何在SpringCloud项目中部署和使用RabbitMQ。步骤包括:安装RabbitMQ服务,引入Spring Amqp依赖,配置RabbitMQ连接信息,定义交换机、队列和绑定,以及创建发送和接收消息的组件。这样...

    spring rabbitmq rpc 测试代码

    总结一下,Spring RabbitMQ RPC的核心在于利用RabbitMQ作为中间人,通过定义交换机、队列和绑定,实现在客户端和服务端之间进行异步的远程调用。这种方式可以很好地扩展系统,同时保持组件间的解耦。通过配置和编程...

    SpringCloudStream整合RabbitMq

    要在Spring Boot项目中使用SpringCloudStream和RabbitMQ,首先需要在`pom.xml`文件中添加`spring-cloud-starter-stream-rabbit`依赖。确保版本与Spring Boot版本兼容,例如: ```xml &lt;groupId&gt;org.springframework...

    RabbitMq整合使用

    **RabbitMQ整合使用详解** RabbitMQ是一个开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中,用于处理异步任务、解耦组件以及实现流量控制。在Spring Boot框架中...

    RabbitMq与Spring整合实例

    总结来说,RabbitMQ与Spring的整合使得消息队列的使用变得更加便捷,通过Spring的抽象和自动化,开发者可以更专注于业务逻辑,而不是底层的消息传递细节。这为构建高可用、可扩展的分布式系统提供了有力的支持。

    docker安装rabbitmq并整合springboot

    本文将指导您如何使用 Docker 安装 RabbitMQ,并整合 Spring Boot 实现消息队列功能。 Docker 安装 RabbitMQ 使用 Docker 安装 RabbitMQ 需要以下步骤: 1. 拉取 RabbitMQ 镜像:使用 Docker 的 pull 命令拉取 ...

    Spring Boot RabbitMQ 延迟消息实现完整版

    本文将详细介绍如何使用Spring Boot结合RabbitMQ的`rabbitmq_delayed_message_exchange`插件实现延迟消息。 #### 技术背景 1. **Spring Boot**: 是基于Spring框架的一个开源框架,用于简化Spring应用的初始搭建...

    22.Spring Cloud整合RabbitMQ或Kafka消息驱动

    总结来说,Spring Cloud整合RabbitMQ或Kafka能够实现微服务间的异步通信,提高系统性能和稳定性。同时,Eureka提供服务注册与发现功能,使得微服务能够互相找到并通信。这三者的结合,构建了一个高效、解耦和可扩展...

    spring问题总结.zip

    这里,我们针对“spring问题总结.zip”文件中的主题进行详细的Spring框架知识探讨。 1. **依赖注入(Dependency Injection, DI)**:Spring的核心特性之一,通过DI,对象之间的依赖关系被反转,使得应用程序的组件...

    rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用

    总结来说,通过RabbitMQ和Spring-AMQP实现RPC模式,我们可以利用消息队列的优势,比如解耦、异步处理和高可用性。同时,Spring-AMQP提供了丰富的工具,简化了与RabbitMQ的交互,使得开发人员能够更加专注于业务逻辑...

    Spring整合RabbitMQ

    **Spring整合RabbitMQ*...总结,Spring整合RabbitMQ提供了强大而灵活的消息传递机制,帮助开发者构建健壮的分布式系统。理解RabbitMQ的工作模式,并熟练使用Spring的相关配置和API,能有效提升系统的可靠性和可扩展性。

    Spring rabbitmq集成

    总结,Spring 和 RabbitMQ 的集成使得开发者能够轻松地在 Java 应用中实现消息队列功能,从而提升系统的可扩展性和可靠性。通过配置交换器、队列、监听器,我们可以灵活地控制消息的发送和接收,实现解耦和异步处理...

    spring集成rabbitMq(基于direct、topic和fanout模式)

    在本文中,我们将深入探讨如何将Spring框架与RabbitMQ集成,主要关注三种交换器类型:direct、topic和fanout。这些模式是RabbitMQ消息路由的基础,它们为不同的消息分发需求提供了灵活性。 首先,让我们理解...

    spring 使用RabbitMQ进行消息传递的示例代码

    "Spring 使用 RabbitMQ 进行消息传递的示例代码" Spring 使用 RabbitMQ 进行消息传递的示例代码是指使用 Spring Framework 来整合 RabbitMQ 消息队列系统,以实现高效、可靠的消息传递机制。在这篇文章中,我们将...

    rabbitmq-demo(Java客户端-基础代码 + Java客户端-Spring AMQP)

    Spring AMQP是Spring框架对RabbitMQ的集成,它提供了一套更高级的API和抽象,简化了RabbitMQ的使用。使用Spring AMQP,我们可以利用Spring的IoC和声明式特性来管理RabbitMQ的配置和操作。 首先,我们需要在Spring...

    基于springboot+maven的rabbitmq项目demo

    总结来说,这个项目提供了一个基础的SpringBoot+Maven环境下的RabbitMQ实践,包括了发布/订阅模式的使用。通过注释和简单的实现,开发者可以快速了解如何在SpringBoot应用中集成和操作RabbitMQ。如果你对某个部分有...

    基于SpringBoot整合RabbitMQ发送邮件通知

    总结来说,SpringBoot 结合 RabbitMQ 可以帮助我们构建高效、可靠的通知系统。通过异步消息传递,我们可以避免阻塞主线程,提高系统的响应速度。同时,由于邮件发送与业务逻辑分离,我们可以更灵活地调整和优化邮件...

    惠农电子超市(SpringCloud+Redis+Nginx+MySQL+Elasticsearch+RabbitMQ)

    总结起来,"惠农电子超市"项目通过巧妙地整合SpringCloud、Redis、Nginx、MySQL、Elasticsearch和RabbitMQ,构建了一个高效、稳定、可扩展的电商系统。这些技术的组合应用,不仅实现了用户友好的购物体验,也为企业...

Global site tag (gtag.js) - Google Analytics