`
234390216
  • 浏览: 10229817 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462460
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775244
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398177
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:394947
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:679879
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:530770
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1183579
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:467457
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151277
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68022
社区版块
存档分类
最新评论

RocketMQ(04)——发送顺序消息

阅读更多

发送顺序消息

如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是需要传递MessageQueueSelector的send(),该方法还可以传递一个额外的参数,其对应MessageQueueSelector的select()的最后一个参数。下面代码中我们一共发送了10条消息,从1开始算顺序为奇数的都放到第一个队列中,顺序为偶数的都放第二个队列中。所以最终第一个队列放了顺序号为1/3/5/7/9的消息,第二个队列中放了顺序号为2/4/6/8/10的消息。

@Test
public void testOrderSend() throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(this.nameServer);
  producer.start();
  for (int i=0; i<10; i++) {
    Message message = new Message("topic1", "tag3", (System.currentTimeMillis() + "---" + System.nanoTime() + "hello ordered message " + i).getBytes());
    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
      @Override
      public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int index = (int) arg;
        //奇数放一个队列,偶数放一个队列
        return mqs.get(index % mqs.size() % 2);
      }
    }, i);
    Assert.assertTrue(sendResult.getSendStatus() == SendStatus.SEND_OK);
  }
  producer.shutdown();
}

在消费的时候如果需要确保队列中的消息是按照顺序消费的,注册消息监听器时不能再选择并发消费的MessageListenerConcurrently,而需要选择按顺序消费的MessageListenerOrderly。按顺序消费时每个线程会锁定当前队列,只有一条消息消费完了才会释放锁,这样确保同一队列同时只能有一个线程消费一条消息。而并发消费时是不会一直锁队列的。有序消费时同一个队列里面的消息会按照顺序进行消费,但是它们可能被不同的线程消费。如消息的顺序是1/2/3/4/5/6,则按照顺序消费可以保证消息的消费顺序一定是1/2/3/4/5/6,但是消费它们的线程有可能是线程6/5/4/3/2/1。如果要保证有序的消费是在同一个线程完成的,则消费者线程只能有一个,可以通过setConsumeThreadMax()定义消费线程的最大数,可以通过setConsumeThreadMin设置消费者线程的最小数。下面的代码中就定义了按照顺序进行消息的消费。

@Test
public void testOrderConsume() throws Exception {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
  consumer.setNamesrvAddr(this.nameServer);
  consumer.subscribe("topic1", "tag3");
  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//默认值,订阅以前的消息将被忽略
  consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
      System.out.println(Thread.currentThread().getName() + "消费消息:" + new String(msgs.get(0).getBody()));
      return ConsumeOrderlyStatus.SUCCESS;
    }
  });
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
  consumer.shutdown();
}

顺序消息消费时返回的ConsumeOrderlyStatus只能是SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUCCESS表示消费成功,可以继续消费下一条,而SUSPEND_CURRENT_QUEUE_A_MOMENT表示消费失败,需要等待一下再继续消费。它不能像并发消费那样跳过消费失败的消息,因为那样就破坏了消息消费的顺序性。

(注:本文是基于RocketMQ4.5.0所写)

分享到:
评论

相关推荐

    RocketMQ消息队列demo

    - **消息顺序**:对于需要保持消息顺序的场景,RocketMQ提供了有序消息的支持。 4. ** Demo分析 ** 在"rocketmq-demo"中,可能包含以下内容: - 一个简单的命令行界面,用户输入IP和端口后,连接到RocketMQ...

    rocketmq使用.zip

    总结来说,“rocketmq使用.zip”这个压缩包包含了关于RocketMQ的基本使用和核心特性——事务消息的资料,对于理解并运用RocketMQ在分布式系统中的事务处理非常有帮助。通过学习和实践,你可以构建出更健壮、高效的...

    rocketmq安装包,rocketmq-all-5.1.3-bin-release

    13. **消息顺序性**:RocketMQ通过设置消息队列的分配策略,可以保证在同一队列内的消息顺序消费,适用于需要消息顺序性的场景。 通过"rocketmq-all-5.1.3-bin-release"这个压缩包,你可以得到完整的RocketMQ运行...

    rocketmq-4.7.0.zip

    8. **顺序消息**:RocketMQ支持全局顺序和局部顺序消息,保证消息按照特定顺序消费,这对于某些业务场景(如日志记录、库存更新)至关重要。 9. **高可用与容灾**:RocketMQ通过主备切换、镜像复制等方式保证服务的...

    黑马rocketmq md文档

    3. **顺序消息**:RocketMQ支持全局和分区的顺序消息,保证消息的有序性。 4. **延迟消息**:用户可以设置消息的延迟时间,使得消息在特定时间后才被消费。 5. **事务消息**:RocketMQ提供分布式事务支持,确保...

    LG机构RocketMQ视频源码资料课件

    6. **消息顺序性**:RocketMQ支持部分场景下的消息顺序性,例如在同一队列内保证消息的顺序发送和消费。 7. **消息回溯与幂等性**:RocketMQ提供了消息回溯功能,允许Consumer回到历史消息中重新消费。同时,开发者...

    RocketMQ用户指南v3.2.4

    RocketMQ用户指南v3.2.4是针对阿里巴巴开源的消息中间件项目——RocketMQ的一份详细开发指南。此文档旨在帮助开发者理解并有效地利用RocketMQ的功能特性,为分布式系统提供可靠的消息传递服务。 1. **产品发展历史*...

    rocketmq-server-4.7.1.rar

    事务消息保证了消息的最终一致性,延迟消息可以按设定的时间延迟发送,顺序消息则保证了消息的顺序消费,而消息过滤则允许在服务器端或客户端进行消息的筛选,提高了效率。 总之,RocketMQ-server-4.7.1是一个强大...

    【面试资料】-(机构内训资料)java面试题_消息中间件--RocketMq(14题).zip

    6. ** 消息的顺序性 **:RocketMQ支持顺序消息,通过特定的队列分配策略和发送方式,保证消息的顺序消费。 7. ** 消息重复与幂等性 **:理解如何处理消息重复的问题,以及如何设计消费者的幂等性,确保即使消息重复...

    分布式开放消息系统

    - **全局顺序消息**:对于某些特殊需求,RocketMQ还支持跨队列的全局顺序消息,即无论消息发送到哪个队列,都需要保证全局顺序。这种模式更加复杂,但也更为强大。 为了实现顺序消息,RocketMQ采用了以下机制: - ...

    rocketmq-externals-master.zip

    5. **消息顺序**:RocketMQ支持部分场景下的消息顺序保证,比如在同一分片内的消息顺序。 6. **消息回溯与重试**:RocketMQ提供消息回溯功能,可以设置回溯时间,使得消费者可以在指定的时间范围内重新消费消息。...

    RocketMQ源码分析讲解

    - 向文件顺序写操作(appendMessage):RocketMQ通过MappedFile类实现对大文件的操作,appendMessage方法用于顺序写入消息。 - 消息刷盘操作(commit):commit方法负责将内存中的消息数据刷到磁盘。 - 随机读操作...

    rocketmqDemo

    - **顺序消息**:RocketMQ提供了顺序消息保障,确保消息按照特定的顺序被消费。 - **事务消息**:RocketMQ支持分布式事务,确保消息的原子性,即使在分布式环境中也能保证事务的一致性。 - **高可用**:通过主从...

    消息队列简介-培训材料

    它基于先进先出(FIFO)的数据结构——队列,允许应用程序将数据(即消息)放入队列,而其他应用程序可以从队列中取出并消费这些消息。这种设计模式降低了不同组件之间的耦合度,提高了系统的可扩展性和稳定性。 **...

    service-transaction.rar

    例如,使用消息队列(如rocketmq)进行异步通信,服务A执行操作后发送消息到队列,服务B接收并处理消息,实现事务的最终一致性。 3. **Saga事务**:Saga是一种长事务的解决方案,它将一个长事务分解为一系列短事务...

Global site tag (gtag.js) - Google Analytics