通过Tag对消息分类
RocketMQ建议一个业务系统只使用一个Topic,不同类型的消息通过tag来区分。tag可以在构造Message的时候指定,下面代码就指定了发送的消息的tag都为tag0。
@Test
public void sendWithTag() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", "tag0", ("hello with tag---" + i).getBytes());
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
producer.shutdown();
}
也可以通过Message的setTags()
进行指定。
@Test
public void sendWithTag() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", ("hello with tag---" + i).getBytes());
message.setTags("tag0");
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
producer.shutdown();
}
虽然参数名叫tags,但是一条消息只能指定一个tag。消费者进行消费的时候也可以指定需要消费的消息对应的tag,比如下面就指定了需要消费的消息对应的Topic是topic1,Tag是tag0。
@Test
public void testConsumeByTag() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic1", "tag0");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
AtomicInteger counter = new AtomicInteger();
msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
}
同一消费者也可以同时订阅同一个Topic的多个Tag,多个Tag之间通过||
进行分隔。比如下面代码就同时订阅了tag0、tag1和tag2。
@Test
public void testConsumeByTag() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic1", "tag0||tag1||tag2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
AtomicInteger counter = new AtomicInteger();
msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
}
(注:本文基于RocketMQ4.5.0所写)
相关推荐
* **过滤消息**:通过TAG、SQL表达式或类过滤模式进行消息筛选。 * **事务消息**:确保消息发送与业务操作的原子性,支持分布式事务。 **4. 角色介绍** * **NameServer**:负责服务发现和路由管理,保持轻量级且无...
分布式消息队列RocketMQ RocketMQ是Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件,由阿里捐赠。...消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,选择性地消费消息。
本文将深入探讨如何通过自定义Selector实现在RocketMQ中进行消息通道定向的发送和拉取。 在RocketMQ中,Selector是用于过滤和选择消息的关键组件,它允许消费者根据特定的条件从队列中选择要消费的消息。默认的...
Tag是消息的一种分类,可以用来区分不同类型的业务消息。消费者可以通过订阅特定Tag来接收感兴趣的消息。同时,还可以使用属性过滤,通过设置消息的用户属性并使用SQL表达式进行筛选,提供了更灵活的消息过滤机制。 ...
6. **消息模型**:RocketMQ支持两种消息模型——点对点(Point-to-Point,简称P2P)和发布/订阅(Publish/Subscribe,简称Pub/Sub)。P2P模型中,每个消息仅被一个消费者消费,适合一对一通信;Pub/Sub模型下,消息...
RocketMQ是中国阿里巴巴...通过学习RocketMQ,开发者不仅能掌握消息队列的核心概念,还能提升对分布式系统的理解和实践能力。提供的"RocketMQ源码解析.pdf"和"RocketMQ入门指南.pdf"将是深入理解RocketMQ的宝贵资源。
在iOS应用开发中,3D效果通常通过OpenGL ES或SceneKit等框架来实现,这要求开发者对3D图形编程有深入的理解。 【描述】中提到的"IOS应用源码——3D TAG 云.zip",意味着这是一个完整的项目代码包,包含了实现3D标签...
RocketMQ支持标签(Tag)过滤,Consumer可以通过设置Tag来筛选感兴趣的消息,提高消费效率。 八、消费策略 1. 广播消费:每个Consumer实例都能收到所有消息,适用于需要全局同步的场景。 2. 精准消费:每个...
通过对RocketMQ源码的深入学习,我们可以了解到其实现消息的存储、传输、消费等过程的具体细节,这对于优化性能、解决实际问题以及理解分布式系统的设计理念具有重要意义。在源码中,你可以看到如MessageStore、...
【JSP实例开发源码——Noka tag 软件标签.zip】是一个关于JSP(Java Server Pages)的实例项目,其中包含了一种名为“Noka tag”的自定义标签库。这个实例主要用于演示如何在JSP应用中创建和使用自定义标签,以提高...
- **顺序消息**:在特定场景下,我们需要保证消息的处理顺序,RocketMQ提供了一种基于消息Tag的顺序消息机制,需要合理设计生产者和消费者的处理逻辑。 4. **源码解析**: - **启动类**:在SpringBoot的主类中,...
顺序队列用tag标记来判断front==rear相等时队列时空还是满。
RocketMQ支持基于标签(Tag)的消息过滤,允许Consumer根据需求筛选出感兴趣的消息。 11. **高可用与扩展性** RocketMQ通过主备切换和集群部署保证服务高可用,同时支持水平扩展,增加Broker节点可以线性提升系统...
9. **消息过滤**:RocketMQ 提供了基于 Tag 的消息过滤功能,允许消费者根据消息的 Tag 过滤和消费感兴趣的消息。 10. **监控与管理**:RocketMQ 提供了管理控制台,可以实时查看消息的发送、消费状态,监控系统的...
深入研究RocketMQ的源码,不仅可以提升对消息中间件的理解,也能为实际项目中的问题定位和性能优化提供宝贵经验。在学习过程中,可以结合官方文档、社区讨论和开源项目中的实践案例,逐步掌握RocketMQ的精髓。
RocketMQ 是一款高性能、分布式的消息中间件,常用于构建大规模分布式系统中的消息传递。尚硅谷提供的 RocketMQ 学习视频笔记旨在帮助初学者系统地掌握 RocketMQ 的核心概念和使用方法。 1. **MQ 简述** 消息队列...
RocketMQ是一个分布式消息和流平台,它被设计为简单和可靠的消息中间件。 以下是RocketMQ的一些关键特性和概述: 1.分布式:RocketMQ集群能够水平扩展,支持高并发和高容错。 2.消息队列:RocketMQ能够保证消息的顺序和...
4. **易于测试**:Spring的测试框架可以方便地对消息发送和接收进行单元测试,提高代码质量。 **集成步骤** 1. **添加依赖**:在`pom.xml`文件中引入RocketMQ Spring的依赖。 2. **配置RocketMQ**:在`application....
- **消息顺序**:RocketMQ支持基于Tag的顺序消息,通过特定的队列策略保证消息的顺序消费。 5. **消费模式** - **集群消费**:多个Consumer实例共同消费一个队列,消息被随机分发到其中一个Consumer。 - **广播...
7. **消息过滤**:RocketMQ支持基于标签(Tag)的消息过滤,消费者可以根据需要过滤接收的消息。 8. **消息重试与死信队列**:RocketMQ有内置的重试机制,当消费者处理消息失败时,消息会被自动重试。若多次尝试后...