`
hbxflihua
  • 浏览: 683348 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

rabbitmq批量处理

阅读更多

我们通过spring-amqp操作rabbitmq是极其简单的,消息的生产者和消费者只需要如下配置:

客户端(生产者):connectionFactory、queue、exchange、messageConverter、RabbitTemplate。

服务端(消费者):connectionFactory、queue、exchange、messageConverter、listenerContainer。

 

如果消息堆积严重,我们可以通过两种方式来处理消息,一种是在服务端开启监听多线程服务(concurrency="10"),另一种是让消息批量出队列。

 

开启多线程的配置示例如下:

	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false"   
	      concurrency="10"   message-converter="jsonMessageConverter" > 
    	    <rabbit:listener ref="tradeListener" method="listen"  queues="queue_trade_repay" />
	</rabbit:listener-container>

 

批量出队列的示例如下:

客户端(消息生产者

import java.math.BigDecimal;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.rd.account.domain.AccountLog;
import com.rd.ifaes.mq.producer.RabbitProducer;
import com.rd.ifaes.web.BaseTest;

/**
 * 消息生产者
 * @author lihua
 * @since 2018-04-08
 *
 */
public class Producer  extends BaseTest{
	
//	@Autowired
//	private RabbitTemplate rabbitTemplate;
	
	//这里对rabbitTemplate做了简单的封装,您可以直接使用rabbitTemplate
	@Autowired
	private RabbitProducer rabbitProducer;
	
	private static final String queueName = "ACCOUNT_LOG_BATCH"; //MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH;

	@Test
	public void main() {
		for (int i = 0; i < 512; i++) {
			AccountLog log = new AccountLog("001", "001", "asdf", BigDecimal.valueOf(i), "remark"+i);
			rabbitProducer.send(queueName, log);
//			rabbitTemplate.convertAndSend(queueName, "hello" + i);
		}
		
	}
	
}

 

服务端(消息消费者)

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 消息消费者
 * @author lihua
 * @since 2018-04-08
 *
 */
public class Consumer extends BaseTest{
	
	private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private AccountLogService accountLogService;
	private static final String queueName = "ACCOUNT_LOG_BATCH"; //MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH;
	private static final int BATCH_SIZE = 100;
	
	@Test
	public void consumer() {
    	while (true) {
    		rabbitTemplate.execute(new ChannelCallback<String>() {
    			@Override
    			public String doInRabbit(Channel channel) throws Exception {
    				SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    		        try {
    		            final AMQP.Queue.DeclareOk ok = channel.queueDeclare(queueName, true, false, false, null);
    		            int messageCount = ok.getMessageCount();
    		            LOGGER.info("run consumer {}, msg count {}", sdf.format(new Date()), messageCount);
    		            if (messageCount == 0) {
    		                return null;
    		            }
    		            List<AccountLog> list = new ArrayList<>();
    		            channel.basicQos(BATCH_SIZE);
    		            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    		            LOGGER.info("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
    		            final String inConsumerTag = "test consumer" + sdf.format(new Date());
    		            channel.basicConsume(queueName, false, inConsumerTag, queueingConsumer);
    		            long messageId = -1;
    		            int dealedCount = 0;
    		            int i = BATCH_SIZE;
    		            while (i-- > 0) {
    		                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(BATCH_SIZE);
    		                if (delivery == null) {
    		                    break;
    		                }
    		                String msg = new String(delivery.getBody());
    		                AccountLog log = JSONObject.parseObject(msg, AccountLog.class);
    		                list.add(log);
    		                messageId = delivery.getEnvelope().getDeliveryTag();
    		                LOGGER.info("get message {} delivery id {}", msg, messageId);
    		                dealedCount++;
    		                if (dealedCount % 5 == 0) {
    		                    channel.basicAck(messageId, true);
    		                    LOGGER.info("batch ack message id =>{}", messageId);
    		                    messageId = -1;
    		                }
    		            }
    		            if (messageId > 0) {
    		                channel.basicAck(messageId, true);
    		                LOGGER.info("last to ack message id =>{}", messageId);
    		            }
    		            
    		            // 日志入库
    		            accountLogService.saveBatch(list);
    		            
    		        } finally {
    		            LOGGER.info("consumer done {}", sdf.format(new Date()));
    		        }
    		        channel.abort();
    				return null;
    			}
    		});
			
    		try {
    			Thread.sleep(5000);
    		} catch (InterruptedException e) {
    			
    		}
		}
	}

}

 

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring-context.xml"})
public abstract class BaseTest {

}

 

补一个服务端真实案例:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;


@Component
@Lazy(value=false)
public class AccountLogBatchListener {
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private AccountLogService accountLogService;
	
	private static final Logger LOGGER = LoggerFactory.getLogger(AccountLogBatchListener.class);
	private static final String QUEUE_NAME = MqConstant.ROUTING_KEY_ACCOUNT_LOG_BATCH;
	private static final ExecutorService executor = Executors.newFixedThreadPool(1);
	private static final int BATCH_SIZE = 100;
	
	@PostConstruct
	public void init(){		
		executor.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				execute();
				return null;
			}			
		});
	}
	
	private void execute(){		
		while (true) {
			rabbitTemplate.execute(new ChannelCallback<String>() {
				@Override
				public String doInRabbit(Channel channel) throws Exception {
					SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
					try {		        	
						final AMQP.Queue.DeclareOk ok = channel.queueDeclare(QUEUE_NAME, true, false, false, null);
						int messageCount = ok.getMessageCount();
						LOGGER.debug("accountLogBatchListener {}, msg count {}", sdf.format(new Date()), messageCount);
						if (messageCount == 0) {
							return null;
						}
						List<AccountLog> list = new ArrayList<>();
						channel.basicQos(BATCH_SIZE);
						QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
						LOGGER.debug("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
						final String inConsumerTag = "accountLogBatchListener {}" + sdf.format(new Date());
						channel.basicConsume(QUEUE_NAME, false, inConsumerTag, queueingConsumer);
						long messageId = -1;
						int dealedCount = 0;
						int i = BATCH_SIZE;
						while (i-- > 0) {
							QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(BATCH_SIZE);
							if (delivery == null) {
								break;
							}
							String msg = new String(delivery.getBody());
							AccountLog log = JSONObject.parseObject(msg, AccountLog.class);
							list.add(log);
							messageId = delivery.getEnvelope().getDeliveryTag();
							LOGGER.info(" userId {}, delivery id {}", log.getUserId(), messageId);
							dealedCount++;
							if (dealedCount % 5 == 0) {
								channel.basicAck(messageId, true);
								LOGGER.debug("batch ack message id =>{}", messageId);
								messageId = -1;
							}
						}
						if (messageId > 0) {
							channel.basicAck(messageId, true);
							LOGGER.debug("last to ack message id =>{}", messageId);
						}
						
						// 日志入库
						accountLogService.saveBatch(list);
						
						
					} finally {
						LOGGER.info("accountLogBatchListener done {}", sdf.format(new Date()));
					}
					channel.abort();
					return null;
				}
			});
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
			}
		}
	}

}

 

分享到:
评论

相关推荐

    使用rabbitmq解决超卖问题

    但需要注意,批量处理时要确保单次扣除的库存不超过实际库存。 - 使用RabbitMQ的QoS(Quality of Service)设置,限制消费者一次性取走的消息数量,防止因大量并发请求导致库存瞬间清零。 5. **系统架构**: - ...

    用PHP收发RabbitMQ消息

    RabbitMQ 是一个消息队列系统,使用 AMQP(Advanced Message Queuing Protocol)协议来实现异步消息传输。下面我们将详细介绍如何使用 PHP 语言实现消息队列的发送和接收。 一、安装 AMQP 扩展 要使用 PHP 语言与 ...

    rabbitmq高可用 demo,一看就懂!

    RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中的异步处理、解耦和负载均衡。本教程将深入探讨如何实现RabbitMQ的高可用性(High Availability, HA),确保即使在高并发或者服务器故障时,服务仍能正常...

    C# RabbitMQ Helper 帮助类

    6. **异常处理**:内置异常处理机制,能够捕获并适当地处理RabbitMQ操作过程中可能出现的错误。 **三、使用方法** 1. **引入DLL**:将C# RabbitMQ Helper编译后的DLL文件引入到你的项目中,添加对它的引用。 2. *...

    RabbitMQ实战指南-rabbitmq-action.zip

    2. **批量处理**: 处理大量数据时,可以将任务放入队列,由后台服务按需处理。 3. **日志收集**: 日志生产者发送日志消息,日志消费者收集并存储。 4. **事件驱动架构**: 通过发布/订阅模式,实现事件驱动的系统...

    RabbitMQ

    4. 当消费者准备好时,它们从队列中获取消息,RabbitMQ支持多种消费模式,如同步、批量和惰性消费。 **RabbitMQ性能测试与调优** 1. **性能测试**: 可以使用工具如Apache JMeter或Pika进行RabbitMQ的性能测试,...

    rabbitmq-service3.7.14.rar

    在实际应用中,RabbitMQ常用于异步任务处理、系统间解耦、批量数据处理等场景。例如,当Web服务器接收到一个耗时的任务时,它可以将任务作为一个消息发布到RabbitMQ,然后继续处理其他请求,而后台的工作进程则从...

    rabbitmq-jar包

    RabbitMQ是一个开源的消息代理和队列服务器,它基于高级消息队列协议(AMQP)标准,被广泛用于处理分布式系统中的异步任务和消息传递。在这个“rabbitmq-jar包”中,我们有两个版本的AMQP客户端库,分别是`amqp-...

    rabbitmq-c rabbitmq amqp c++接口库

    3. **C++接口库**:`rabbitmq-c++`通常是对`rabbitmq-c`进行封装的库,提供了C++对象模型,使得API更符合C++的编程习惯,例如通过面向对象的方式处理连接、通道和队列。封装可能包括异常处理、智能指针、RAII...

    rabbitmq-access, rabbitmq手动确认模式java封装.zip

    4. **批量确认**:如果消息处理速度很快,可以考虑开启批量确认。这意味着在接收到一定数量的消息后,一次性发送确认信号,而不是每条消息都确认。这可以通过设置`basicQos`的参数`prefetchSize`为0和`prefetchCount...

    rabbitmq server 3.7.4版本window 64 安装包exe

    - **批量数据处理**:通过消息队列缓存大量待处理的数据,后台服务按需处理,避免阻塞前端服务。 6. **最佳实践** - **监控和日志**:配置日志输出,使用Prometheus和Grafana等工具监控RabbitMQ的运行状态。 - *...

    rabbitmq客户端

    5. 工作模式:RabbitMQ支持不同的消费模式,如非持久订阅(non-durable)、自动确认(auto-ack)和批量消费(batch consuming)。选择合适的模式可以优化性能和可靠性。 6. 高级特性:RabbitMQ客户端还支持消息的...

    rabbitMq及其依赖.rar

    3. **批量处理**:消息积压后,可以一次性处理多个,提高效率。 4. **容错机制**:消息如果未被正确处理,可以重新放入队列,保证数据完整性。 5. **负载均衡**:通过消息队列,可以实现多消费者并行处理,提升系统...

    SpringBoot+redis+RabbitMq整合实例

    根据实际性能和需求,可以调整Redis的缓存策略,或者RabbitMQ的消息处理方式,比如采用批量消费、死信队列等优化策略。 "sky-shopping"可能是这个实例中的一个具体应用场景,可能是一个电子商务平台。在这个平台上...

    canal-rabbitmq-1.1.5封装包

    2. 性能优化:合理配置Canal和RabbitMQ的参数,如批量发送事件、设置合理的缓冲区大小,以提高性能。 3. 错误处理:建立完善的错误处理机制,如重试、死信队列等,确保消息的可靠传递。 综上所述,Canal与RabbitMQ...

    RabbitMQ+ erlang32位系统.rar

    2. **批量处理**:通过消息队列,可以将大量任务分发到后台进行批量处理,避免阻塞前端应用。 3. **事件驱动**:RabbitMQ可以用来实现事件驱动架构,如日志记录、监控告警等。 4. **负载均衡**:消息队列可以帮助...

    RabbitMQ发送和接收

    - 工作模式:RabbitMQ支持不同的消费者工作模式,如同步、异步、批量处理等,应根据需求选择合适的工作模式。 总的来说,C#中的RabbitMQ应用涉及到创建连接、声明交换机和队列、发送与接收消息等关键步骤。通过学习...

    Release_rabbitmq_C#源码_

    7. **性能优化**:在大量文件传输时,可能需要考虑批量发送消息、使用多线程处理文件或使用异步I/O来提高效率。 8. **消息确认机制**:RabbitMQ提供了消息确认机制,允许消费者确认已处理的消息,确保消息不被重复...

    《深入RabbitMQ》.pdf.zip

    6. **消费者工作模式**:包括非持久订阅(noack)和确认模式,以及批量消费和并发消费的实现。 7. **高可用性**:通过集群和镜像队列实现RabbitMQ的高可用性,确保在故障发生时服务不会中断。 8. **安全性**:用户...

    RabbitMQ 3.8.5 文件及安装文档.rar

    消费者可以设置为“同步”或“异步”,即直接处理消息或者批量处理。 8. **AMQP协议**:AMQP是标准的消息中间件协议,提供了通用的接口,使得不同语言和平台可以轻松地与RabbitMQ交互。 在安装RabbitMQ 3.8.5时,...

Global site tag (gtag.js) - Google Analytics