`
234390216
  • 浏览: 10239369 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462924
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1776048
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398800
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:395166
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:680183
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:531183
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1185119
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:468903
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151507
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68372
社区版块
存档分类
最新评论

RocketMQ(05)——消息的群集消费和广播消费

阅读更多

消息的群集消费和广播消费

RocketMQ的消费者进行消息消费时有两种消费方式,群集消费和广播消费。默认是群集消费。

群集消费

Consumer都有一个Group,当两个Consumer实例属于同一个Group时,它们会共享消息队列中的消息消费位移,即同一条消息只会由一个消费者实例消费。实际上一个队列只会分配给一个消费者实例,那么属于该队列中的消息就只能被一个消费者实例消费了。当一个消费者实例挂了后,会重新为消费者实例分配队列,这样原本分配给挂了的那个实例的队列中又会分配给其它消费者进行消费。RocketMQ限制了同一个JVM中不允许有相同Group名称的Consumer实例存在,所以同一Group的多个Consumer往往是部署在不同机器上的,通常是同一程序部署了多份。假设现在有名为group1的Consumer在Machine1上部署了一份,在Machine2上部署了一份,它们订阅的消息队列中有消息1-10共10条消息,那么可能1-5条消息由Machine1上的消费者消费,6-10条消息由Machine2上的消费者消费。如果在Machine1上还部署了一个名为group2的消费者,则该消费者可以从消息队列中消费1-10条消息,因为它跟group1是不同的Group,不同Group的消费者对同一队列的消费是完全独立的。比如下面的代码对应的就是群集消费。

@Test
public void testConsumer() throws Exception {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
  consumer.setNamesrvAddr(nameServer);
  consumer.subscribe("topic1", "tag1");
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      System.out.println(Thread.currentThread().getName() + "收到了消息" + msgs);
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
}

广播消费

消费者的消费模式是广播消费时,每一个消费者实例对消息队列中消息的消费是独立的,而不管它们的Group是什么。假设现在有名为group1的Consumer在Machine1上部署了一份,在Machine2上部署了一份,它们订阅的消息队列中有消息1-10共10条消息,那么可能1-10条消息由Machine1上的消费者消费,它们也会由Machine2上的消费者消费。可以通过消费者的setMessageModel(MessageModel.BROADCASTING)指定消息的消费模式为广播消费,它的默认值是CLUSTERING。

@Test
public void testBroadcastConsume() throws Exception {
  String topic = "topic1";
  String tag = "tag1";
  String consumerGroup = "consumer-group1";
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
  consumer.setNamesrvAddr(this.nameServer);
  //广播方式,同一消息可以被所有的消费者消费。
  consumer.setMessageModel(MessageModel.BROADCASTING);
  try {
    consumer.subscribe(topic, tag);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      System.out.println("消费了消息——" + msgs.get(0).getMsgId());
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
  } catch (MQClientException e) {
    e.printStackTrace();
  }
  TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

(注:本文是基于RocketMQ4.5.0所写)

分享到:
评论

相关推荐

    彻底明白Zigbee术语——群集(Cluster)、端点(EndPoint)等.pdf

    端点255用于广播消息,而端点241到254则保留供特殊用途。 五、配置文件(Profile) 配置文件是Zigbee生态系统中的一个重要组成部分,它定义了一组设备描述,这些设备能够协同工作以实现特定的应用场景或服务。配置...

    彻底明白Zigbee术语——群集(Cluster)、端点(EndPoint)等.docx

    端点255用于广播消息,而241到254是预留端点,有特殊用途。 6. **群集 (Cluster)**:群集是一组相关的属性集合,定义了一个特定的功能或服务,例如照明控制群集或温度测量群集。每个群集都有一个唯一的ID,可以包含...

    实训6:配置故障转移群集服务.docx

    - 创建群集涉及到选择服务器节点、命名群集以及配置群集的管理访问点(包括群集名称和IP地址)。确保群集名称在域中唯一,并分配未使用的IP地址。在确认无误后,向导会创建群集,并显示创建过程和结果报告,以便...

    国家开放大学 网络操作系统管理 形考任务4  配置故障转移群集服务实训

    2. **添加群集节点**:将server1和server2添加为群集节点。 3. **配置群集名称与IP地址**:设置群集名称(例如cluster12)及对应的IP地址(例如10.0.0.12)。 4. **完成创建**:根据提示完成创建过程。 5. **查看...

    故障切换群集和 Microsoft 群集服务的设置

    本文档《故障切换群集和Microsoft群集服务的设置》主要介绍了如何在VMware环境和Microsoft群集服务之间搭建故障切换群集。文档涉及的内容包括ESXi 6.0和VMware vCenter Server 6.0,这些是构建虚拟化环境的关键组件...

    rocketmq-spring-boot-starter:rocketmq-spring-boot-starter

    rocketmq-spring-boot-starter 阿里云RocketMQSpring图书版支持功能: 发送普通消息的三种模式:同步,异步和单向 订阅消息群集,广播 发送和接收顺序消息 交易讯息 延迟讯息 接收和接收定时消息定时消息和延迟消息...

    apache-rocketmq-on-aws:此解决方案可轻松在aws帐户上设置Apache Rocketmq ec2集群

    Apache RocketMQ是一个统一的消息传递引擎以及轻量级的数据处理平台。 AWS上的RocketMQ解决方案使客户能够在AWS云中快速部署RocketMQ集群。 基本群集设置(例如EC2实例类型)也可以在部署期间进行配置。 快速入门...

    Win2003服务器群集创建和配置指南

    在IT领域,Windows Server 2003是微软推出的一款企业级操作系统,广泛应用于服务器环境,特别是对于需要高可用性和负载均衡的企业应用来说,它的群集功能尤其关键。本指南将详细阐述如何创建和配置Win2003服务器群集...

    Windows Server 2003服务器群集创建和配置指南

    **Windows Server 2003服务器群集**是微软提供的一种高可用性解决方案,它通过在多台物理服务器之间共享资源和服务,增强了系统的稳定性和容错能力。服务器群集主要由运行**Microsoft Cluster Service (MSCS)**的...

    rocketmq-exporter:Apache RocketMQ Prometheus导出器

    适用于Prometheus的Apache RocketMQ导出器。 目录 制片人 消费群体 消费者 格拉法纳仪表板 快速开始 兼容性 支持Apache RocketMQ版本4.3.2(及更高版本)。 配置 可以使用不同的属性来配置该映像,有关配置示例,...

    rocketmq-spring:Apache RocketMQ Spring集成

    使用并发模式的消息(广播/群集) 消费有序消息 使用标签或sql92表达式过滤消息 支持消息跟踪 支持身份验证和授权 支持请求-回复消息交换模式 以推/拉模式使用消息 先决条件 JDK 1.8及更高版本 3.0及更高...

    Windows Server 2003 服务器群集创建和配置指南

    **服务器群集**是一种高级配置方法,通过它可以让多台物理服务器协同工作,形成一个统一的服务实体,从而提供高可用性、故障恢复能力和伸缩性。这种架构在**Windows Server 2003**平台上的实现依赖于**Microsoft ...

    安装 win2003群集,群集配置资料(含本人亲自手写档)

    在IT领域,Windows Server 2003群集是一种高可用性解决方案,它允许多台服务器协同工作,形成一个逻辑上的单一系统,以提供服务的连续性和故障转移能力。本资料将详细介绍如何安装和配置Windows Server 2003群集,...

    2003服务器群集创建和配置指南

    **Windows Server 2003 服务器群集创建和配置指南** Windows Server 2003 提供了一种高可用性的解决方案,即服务器群集,它允许多台服务器协同工作,形成一个逻辑上的单一系统,提高了应用程序和服务的稳定性和容错...

    Windows Server 2003服务器群集创建和配置指南.doc

    **Windows Server 2003 服务器群集创建和配置指南** Windows Server 2003 提供了服务器群集功能,这是一种高可用性解决方案,能够通过将多个服务器节点组合成一个逻辑单元来提高关键应用的可靠性和性能。服务器群集...

    rocketmq-operator:Apache RocketMQ运算符

    RocketMQ运算符目录创建RocketMQ集群验证数据存储验证主机路径存储验证NFS存储水平刻度名称服务器群集规模经纪人集群规模乱序消息场景中的高级中间人主题转移清洁环境发展先决条件建造操作员代理和名称服务器映像 ...

    windows server 2012故障转移群集搭建指导手册

    本手册将指导用户如何搭建 Windows Server 2012 故障转移群集环境,以实现高可用性和负载均衡。该手册将分为三个部分:环境准备、搭建过程和群集配置。 环境准备 在开始搭建故障转移群集之前,需要准备好以下环境...

Global site tag (gtag.js) - Google Analytics