项目中用到rocket mq的方式有多种,
第一种,严格按照时间消费的模式,这种模式需要用串行方式,生产者生产的时候,这时候生产者需要往特定的队列里有序push:
SendResult result = producer.send(msg, new MessageQueueSelector(){
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = arg.hashCode();
int index = id % mqs.size();
return mqs.get(index);
}
}, dataAsyncEvent.getDataType());
消费者也要按照顺序严格有序消费,用这个有序的监听者:
//同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
return ConsumeOrderlyStatus.CONSUME_SUCCESS;
}
public interface MessageListenerOrderly extends MessageListener {
/**
* 方法抛出异常等同于返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT<br>
* P.S: 建议应用不要抛出异常
*
* @param msgs
* msgs.size() >= 1<br>
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,默认消息数为1
* @param context
* @return
*/
public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeOrderlyContext context);
}
第二种:不追求时间顺序,只要把生产出来的事件全部消费完就可以。这种可以用并行的方式处理,效率高很多:
生产者:
SendResult result = producer.send(msg);
消费者:(用此接口处理)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
public interface MessageListenerConcurrently extends MessageListener {
/**
* 方法抛出异常等同于返回 ConsumeConcurrentlyStatus.RECONSUME_LATER<br>
* P.S: 建议应用不要抛出异常
*
* @param msgs
* msgs.size() >= 1<br>
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,默认消息数为1
* @param context
* @return
*/
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context);
}
http://blog.csdn.net/fangletian1981/article/details/20694381
相关推荐
Rocket MQ 是一款由阿里云基于 Apache RocketMQ 构建的分布式消息中间件,设计目标是实现低延迟、高并发、高可用和高可靠性。它适用于分布式应用系统的异步解耦和削峰填谷场景,支持海量消息堆积,拥有高吞吐能力,...
Rocket MQ.xmind
云运维工程师从入门到精通,6个要点掌握Rocket MQ 原理,5步教程快速入门Rocket MQ ,100+常见问题排查精解
这个视频是龙果 的rocket mq视频,讲的非常不错。分为上下两个系列。直接用txt 打开后。里边是百度云资源
主要是针对消息中间件及其应用的介绍, rocketmq是阿里巴巴开源的一款分布式的消息中间件,是实现分布式系统中解耦、异步消息、流量销锋、日志处理等。
### RocketIO高速串行传输原理与实现 #### 引言 随着集成电路技术的快速发展,雷达信号处理的速度得到了显著提升。特别是在高速A/D转换技术和FPGA技术的推动下,雷达信号处理系统的性能有了质的飞跃。这些进步使得...
在这个“my ali rocket mq学习demo”中,我们有两个关键的Java源文件:Consumer.java和Producer.java,它们分别代表了消息队列中的生产者和消费者角色。 首先,我们来了解一下RocketMQ的基本概念: 1. **生产者...
1111111111111111
【TongLINKQ与MQ对比分析】 在IT行业中,消息中间件是企业级应用的关键组件,用于在不同系统之间高效地传递数据。本报告对比分析了两款知名的消息中间件产品——IBM的WebSphere MQ(简称MQ)和北京东方通科技公司的...
总结以上,标题和描述中提到的知识点主要围绕FPGA技术和高速串行通道设计展开,具体包括高速串行互连技术的必要性、Virtex4 FX系列FPGA中的RocketIO模块、MGT模块及参考时钟的设置、复位机制的设计要求以及8B/10B...
RocketMQ是一款由阿里巴巴开源的分布式消息中间件,它在大规模分布式系统中扮演着重要的角色,主要负责处理系统间的异步通信、数据交换和任务调度。RocketMQ的设计目标是高可用、高可靠、低延迟,因此在大数据处理和...
一个基于Rocket MQ的不同数据库之间数据实时同步的平台 watcher为监视数据更新的windows service 配置文件: 1.\Config\producerconfig.json -- rocket mq的生产者配置,用来将从数据库查询出来的数据推送到mq以便...
包含了IBM的MQ初始化,发送,接收的工具类,方便极了,可直接放入到项目中。
串行化通常指将并行数据转换为串行数据以便于传输,而在接收端需要将串行数据转换回并行数据,这个过程称为并行化。在高速通信领域,串行化能有效减少信号线的数量,降低功耗,提高传输效率。 8. **高速红外通信...
RocketIO是Xilinx公司推出的高速串行收发器技术,广泛应用于Xilinx的各种FPGA产品中。它支持多种高速通信标准,如PCI Express、SATA、SerDes等,并且具有高度可配置性和灵活性。RocketIO的出现极大地提升了FPGA在...
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
RocketMQ,由阿里巴巴开源,是一款高效、稳定的分布式消息中间件,广泛应用于大数据处理和实时交易系统。它提供了高可靠、高可用的消息传输能力,支持发布/订阅模式以及点对点模式,是大型分布式系统的理想选择。...
RocketIO是Xilinx公司开发的一种高性能的串行接口技术,主要应用于其FPGA(Field-Programmable Gate Array)芯片中。Xilinx GTX是RocketIO系列中的一个关键组件,提供了高速的数据传输能力,广泛用于通信、数据处理...