`
234390216
  • 浏览: 10229757 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462459
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775243
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398172
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:394946
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:679874
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:530768
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1183565
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:467451
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151273
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68020
社区版块
存档分类
最新评论

RocketMQ(03)——通过Tag对消息分类

阅读更多

通过Tag对消息分类

RocketMQ建议一个业务系统只使用一个Topic,不同类型的消息通过tag来区分。tag可以在构造Message的时候指定,下面代码就指定了发送的消息的tag都为tag0。

@Test
public void sendWithTag() throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  for (int i = 0; i < 10; i++) {
    Message message = new Message("topic1", "tag0", ("hello with tag---" + i).getBytes());
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
      System.out.println("消息发送成功:" + sendResult);
    } else {
      System.out.println("消息发送失败:" + sendResult);
    }
  }
  producer.shutdown();
}

也可以通过Message的setTags()进行指定。

@Test
public void sendWithTag() throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  for (int i = 0; i < 10; i++) {
    Message message = new Message("topic1", ("hello with tag---" + i).getBytes());
    message.setTags("tag0");
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
      System.out.println("消息发送成功:" + sendResult);
    } else {
      System.out.println("消息发送失败:" + sendResult);
    }
  }
  producer.shutdown();
}

虽然参数名叫tags,但是一条消息只能指定一个tag。消费者进行消费的时候也可以指定需要消费的消息对应的tag,比如下面就指定了需要消费的消息对应的Topic是topic1,Tag是tag0。

@Test
public void testConsumeByTag() throws Exception {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
  consumer.setNamesrvAddr(nameServer);
  consumer.subscribe("topic1", "tag0");
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
      AtomicInteger counter = new AtomicInteger();
      msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
}

同一消费者也可以同时订阅同一个Topic的多个Tag,多个Tag之间通过||进行分隔。比如下面代码就同时订阅了tag0、tag1和tag2。

@Test
public void testConsumeByTag() throws Exception {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
  consumer.setNamesrvAddr(nameServer);
  consumer.subscribe("topic1", "tag0||tag1||tag2");
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
      AtomicInteger counter = new AtomicInteger();
      msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
}

(注:本文基于RocketMQ4.5.0所写)

0
1
分享到:
评论

相关推荐

    RocketMQ讲义-03.pdf

    * **过滤消息**:通过TAG、SQL表达式或类过滤模式进行消息筛选。 * **事务消息**:确保消息发送与业务操作的原子性,支持分布式事务。 **4. 角色介绍** * **NameServer**:负责服务发现和路由管理,保持轻量级且无...

    RocketMQ分布式消息队列

    分布式消息队列RocketMQ RocketMQ是Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件,由阿里捐赠。...消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,选择性地消费消息。

    RocketMQ自定义selector实现消息通道定向发送和拉取

    本文将深入探讨如何通过自定义Selector实现在RocketMQ中进行消息通道定向的发送和拉取。 在RocketMQ中,Selector是用于过滤和选择消息的关键组件,它允许消费者根据特定的条件从队列中选择要消费的消息。默认的...

    rocketmq总结.ppt

    Tag是消息的一种分类,可以用来区分不同类型的业务消息。消费者可以通过订阅特定Tag来接收感兴趣的消息。同时,还可以使用属性过滤,通过设置消息的用户属性并使用SQL表达式进行筛选,提供了更灵活的消息过滤机制。 ...

    黑马rocketmq md文档

    6. **消息模型**:RocketMQ支持两种消息模型——点对点(Point-to-Point,简称P2P)和发布/订阅(Publish/Subscribe,简称Pub/Sub)。P2P模型中,每个消息仅被一个消费者消费,适合一对一通信;Pub/Sub模型下,消息...

    RocketMQ入门实战及源码解析.7z

    RocketMQ是中国阿里巴巴...通过学习RocketMQ,开发者不仅能掌握消息队列的核心概念,还能提升对分布式系统的理解和实践能力。提供的"RocketMQ源码解析.pdf"和"RocketMQ入门指南.pdf"将是深入理解RocketMQ的宝贵资源。

    IOS应用源码——3D TAG 云.zip

    在iOS应用开发中,3D效果通常通过OpenGL ES或SceneKit等框架来实现,这要求开发者对3D图形编程有深入的理解。 【描述】中提到的"IOS应用源码——3D TAG 云.zip",意味着这是一个完整的项目代码包,包含了实现3D标签...

    RocketMQ高级原理:深入剖析消息系统的核心机制(超详细整理讲解,值得收藏)

    RocketMQ支持标签(Tag)过滤,Consumer可以通过设置Tag来筛选感兴趣的消息,提高消费效率。 八、消费策略 1. 广播消费:每个Consumer实例都能收到所有消息,适用于需要全局同步的场景。 2. 精准消费:每个...

    rocketmq源码.zip

    通过对RocketMQ源码的深入学习,我们可以了解到其实现消息的存储、传输、消费等过程的具体细节,这对于优化性能、解决实际问题以及理解分布式系统的设计理念具有重要意义。在源码中,你可以看到如MessageStore、...

    JSP实例开发源码——Noka tag 软件标签.zip

    【JSP实例开发源码——Noka tag 软件标签.zip】是一个关于JSP(Java Server Pages)的实例项目,其中包含了一种名为“Noka tag”的自定义标签库。这个实例主要用于演示如何在JSP应用中创建和使用自定义标签,以提高...

    springboot整合rocketmq源码

    - **顺序消息**:在特定场景下,我们需要保证消息的处理顺序,RocketMQ提供了一种基于消息Tag的顺序消息机制,需要合理设计生产者和消费者的处理逻辑。 4. **源码解析**: - **启动类**:在SpringBoot的主类中,...

    顺序队列———tag实现

    顺序队列用tag标记来判断front==rear相等时队列时空还是满。

    RocketMQ3.2.6消息中间件

    RocketMQ支持基于标签(Tag)的消息过滤,允许Consumer根据需求筛选出感兴趣的消息。 11. **高可用与扩展性** RocketMQ通过主备切换和集群部署保证服务高可用,同时支持水平扩展,增加Broker节点可以线性提升系统...

    RocketMQ 3.2.6

    9. **消息过滤**:RocketMQ 提供了基于 Tag 的消息过滤功能,允许消费者根据消息的 Tag 过滤和消费感兴趣的消息。 10. **监控与管理**:RocketMQ 提供了管理控制台,可以实时查看消息的发送、消费状态,监控系统的...

    RocketMQ源码

    深入研究RocketMQ的源码,不仅可以提升对消息中间件的理解,也能为实际项目中的问题定位和性能优化提供宝贵经验。在学习过程中,可以结合官方文档、社区讨论和开源项目中的实践案例,逐步掌握RocketMQ的精髓。

    尚硅谷完整的关于rocketmq的学习视频整理笔记

    RocketMQ 是一款高性能、分布式的消息中间件,常用于构建大规模分布式系统中的消息传递。尚硅谷提供的 RocketMQ 学习视频笔记旨在帮助初学者系统地掌握 RocketMQ 的核心概念和使用方法。 1. **MQ 简述** 消息队列...

    RocketMQ是一个分布式消息和流平台,它被设计为简单和可靠的消息中间件

    RocketMQ是一个分布式消息和流平台,它被设计为简单和可靠的消息中间件。 以下是RocketMQ的一些关键特性和概述: 1.分布式:RocketMQ集群能够水平扩展,支持高并发和高容错。 2.消息队列:RocketMQ能够保证消息的顺序和...

    rocketmq-spring-rocketmq-spring-all-2.0.3_rocketmq_

    4. **易于测试**:Spring的测试框架可以方便地对消息发送和接收进行单元测试,提高代码质量。 **集成步骤** 1. **添加依赖**:在`pom.xml`文件中引入RocketMQ Spring的依赖。 2. **配置RocketMQ**:在`application....

    RocketMQ-3.2.6源码

    - **消息顺序**:RocketMQ支持基于Tag的顺序消息,通过特定的队列策略保证消息的顺序消费。 5. **消费模式** - **集群消费**:多个Consumer实例共同消费一个队列,消息被随机分发到其中一个Consumer。 - **广播...

    RocketMQ代码demo.zip

    消息可以是自定义的对象,通过MessageBuilder进行构建,设置主题(Topic)、标签(Tag)和消息体(Body)。发送消息有两种模式:同步和异步,同步模式会等待服务器返回结果,异步模式则不会阻塞当前线程。 接下来,...

Global site tag (gtag.js) - Google Analytics