`

alibaba rocket mq的串行及并行处理

 
阅读更多

项目中用到rocket mq的方式有多种,

第一种,严格按照时间消费的模式,这种模式需要用串行方式,生产者生产的时候,这时候生产者需要往特定的队列里有序push:

                     SendResult result = producer.send(msg, new MessageQueueSelector(){
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = arg.hashCode();
int index = id % mqs.size();
return mqs.get(index);
}

}, dataAsyncEvent.getDataType());

 

                   消费者也要按照顺序严格有序消费,用这个有序的监听者:

                   //同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费

 

 consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeOrderlyContext context) {

                    return ConsumeOrderlyStatus.CONSUME_SUCCESS;

             }

                   public interface MessageListenerOrderly extends MessageListener {
    /**
     * 方法抛出异常等同于返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT<br>
     * P.S: 建议应用不要抛出异常
     * 
     * @param msgs
     *            msgs.size() >= 1<br>
     *            DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,默认消息数为1
     * @param context
     * @return
     */
    public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
            final ConsumeOrderlyContext context);

 

第二种:不追求时间顺序,只要把生产出来的事件全部消费完就可以。这种可以用并行的方式处理,效率高很多:

               生产者:

               SendResult result = producer.send(msg);

                消费者:(用此接口处理)

               consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

             }

                public interface MessageListenerConcurrently extends MessageListener {
    /**
     * 方法抛出异常等同于返回 ConsumeConcurrentlyStatus.RECONSUME_LATER<br>
     * P.S: 建议应用不要抛出异常
     * 
     * @param msgs
     *            msgs.size() >= 1<br>
     *            DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,默认消息数为1
     * @param context
     * @return
     */
    public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
            final ConsumeConcurrentlyContext context);
}

分享到:
评论

相关推荐

    Rocket MQ 使用排查指南1

    Rocket MQ 是一款由阿里云基于 Apache RocketMQ 构建的分布式消息中间件,设计目标是实现低延迟、高并发、高可用和高可靠性。它适用于分布式应用系统的异步解耦和削峰填谷场景,支持海量消息堆积,拥有高吞吐能力,...

    Rocket MQ.xmind

    Rocket MQ.xmind

    Rocket MQ 使用排查指南.zip

    云运维工程师从入门到精通,6个要点掌握Rocket MQ 原理,5步教程快速入门Rocket MQ ,100+常见问题排查精解

    rocket mq 视频(龙果)

    这个视频是龙果 的rocket mq视频,讲的非常不错。分为上下两个系列。直接用txt 打开后。里边是百度云资源

    Rocket MQ 消息中间件

    主要是针对消息中间件及其应用的介绍, rocketmq是阿里巴巴开源的一款分布式的消息中间件,是实现分布式系统中解耦、异步消息、流量销锋、日志处理等。

    RocketIO高速串行传输原理与实现.pdf

    ### RocketIO高速串行传输原理与实现 #### 引言 随着集成电路技术的快速发展,雷达信号处理的速度得到了显著提升。特别是在高速A/D转换技术和FPGA技术的推动下,雷达信号处理系统的性能有了质的飞跃。这些进步使得...

    my ali rocket mq学习demo

    在这个“my ali rocket mq学习demo”中,我们有两个关键的Java源文件:Consumer.java和Producer.java,它们分别代表了消息队列中的生产者和消费者角色。 首先,我们来了解一下RocketMQ的基本概念: 1. **生产者...

    rocket mq window 111111111111111111111111111

    1111111111111111

    RocketIO的高速串行通道设计与验证

    总结以上,标题和描述中提到的知识点主要围绕FPGA技术和高速串行通道设计展开,具体包括高速串行互连技术的必要性、Virtex4 FX系列FPGA中的RocketIO模块、MGT模块及参考时钟的设置、复位机制的设计要求以及8B/10B...

    rocket_mq.rar

    RocketMQ是一款由阿里巴巴开源的分布式消息中间件,它在大规模分布式系统中扮演着重要的角色,主要负责处理系统间的异步通信、数据交换和任务调度。RocketMQ的设计目标是高可用、高可靠、低延迟,因此在大数据处理和...

    DataSync:一个基于Rocket MQ的不同数据库之间数据实时同步的平台

    一个基于Rocket MQ的不同数据库之间数据实时同步的平台 watcher为监视数据更新的windows service 配置文件: 1.\Config\producerconfig.json -- rocket mq的生产者配置,用来将从数据库查询出来的数据推送到mq以便...

    MQ工具类java

    包含了IBM的MQ初始化,发送,接收的工具类,方便极了,可直接放入到项目中。

    基于RocketIO的高速光纤红外图像串行传输的实现.pdf

    串行化通常指将并行数据转换为串行数据以便于传输,而在接收端需要将串行数据转换回并行数据,这个过程称为并行化。在高速通信领域,串行化能有效减少信号线的数量,降低功耗,提高传输效率。 8. **高速红外通信...

    RocketIO的使用方法

    RocketIO是Xilinx公司推出的高速串行收发器技术,广泛应用于Xilinx的各种FPGA产品中。它支持多种高速通信标准,如PCI Express、SATA、SerDes等,并且具有高度可配置性和灵活性。RocketIO的出现极大地提升了FPGA在...

    阿里云rocketmq消息队列对接demo

    该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。

    RocketmqBenchmark:Rocket MQ Benchmark Terraform UCloud

    RocketMQ,由阿里巴巴开源,是一款高效、稳定的分布式消息中间件,广泛应用于大数据处理和实时交易系统。它提供了高可靠、高可用的消息传输能力,支持发布/订阅模式以及点对点模式,是大型分布式系统的理想选择。...

    Xilinx Rocketio资料整理

    RocketIO是Xilinx公司开发的一种高性能的串行接口技术,主要应用于其FPGA(Field-Programmable Gate Array)芯片中。Xilinx GTX是RocketIO系列中的一个关键组件,提供了高速的数据传输能力,广泛用于通信、数据处理...

    消息中间件kafka与activemq、rabbitmq、zeromq、rocketmq的比较

    RocketMQ是由阿里巴巴集团研发并开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。 - **特性**: - **高可用性**: 支持分布式集群部署,确保系统的高可用性。 - **严格的顺序保证**: ...

Global site tag (gtag.js) - Google Analytics