public
class
ConsumeMessageClusteringDemo {
protected
void
init() {
DefaultMQPushConsumer consumer =
new
DefaultMQPushConsumer(
"ConsumerGroupName"
);
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
try
{
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe(
"TopicTest1"
,
"TagA || TagC || TagD"
);
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe(
"TopicTest2"
,
"*"
);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new
MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() +
" Receive New Messages: "
+ msgs);
MessageExt msg = msgs.get(
0
);
if
(msg.getTopic().equals(
"TopicTest1"
)) {
if
(msg.getTags() !=
null
&& msg.getTags().equals(
"TagA"
)) {
String msgbody =
new
String(msg.getBody());
System.out.println(msgbody);
}
else
if
(msg.getTags() !=
null
&& msg.getTags().equals(
"TagC"
)) {
String msgbody =
new
String(msg.getBody());
System.out.println(msgbody);
}
else
if
(msg.getTags() !=
null
&& msg.getTags().equals(
"TagD"
)) {
String msgbody =
new
String(msg.getBody());
System.out.println(msgbody);
}
}
else
if
(msg.getTopic().equals(
"TopicTest2"
)) {
}
return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
}
catch
(MQClientException e) {
e.printStackTrace();
}
}
}
相关推荐
**RocketMQ 开发手册 3.2.4 知识点详解** Apache RocketMQ 是一个高性能、分布式的消息中间件,最初由阿里巴巴开源,并且现在已经成为Apache顶级项目。它主要用于实现应用解耦、消息队列、异步处理以及构建事件驱动...
RocketMQ作为开源项目,社区活跃,用户可根据自己的需求进行定制开发,并且能够从社区获得支持。它的源码托管在GitHub上,为社区贡献者提供了共同参与和改进产品的平台。 #### 官网与社区链接 - 官网地址:*** * ...
RocketMQ开发手册3.2.4版本详细介绍了该产品的架构、特性、使用方法和最佳实践。本文将结合手册内容,对RocketMQ的核心知识点进行详尽阐述。 首先,RocketMQ采用了发布/订阅(Publish/Subscribe)模式,这是消息...
RocketMQ是一个高性能、高可用的消息中间件,它在分布式系统中扮演着至关重要的角色,提供可靠的消息传递和消息队列服务。以下从文档中提取的知识点涵盖了RocketMQ的核心特性和解决的消息中间件问题。 1. RocketMQ...
本文档旨在描述 RocketMQ 的多个关键特性的实现原理,幵对消息中间件遇到的各种问题迕行总结,阐述 RocketMQ 如何解决返些问题。文中主要引用了 JMS 规范不 CORBA Notification 规范,规范为我们设计系统挃明了 方吐...
### RocketMQ开发指南知识点概述 #### 一、RocketMQ简介及发展历程 - **RocketMQ**是由阿里巴巴自主研发的消息中间件,经过多年的迭代和发展,已经成为业界广泛使用的高性能消息队列系统之一。 - **版本**: 本指南...
《RocketMQ开发手册3.2.4》是2015年发布的一份详细指南,主要面向希望深入了解和使用RocketMQ的开发者。RocketMQ是由阿里巴巴开源的分布式消息中间件,它在大规模分布式系统中扮演着重要的角色,提供高可用、高可靠...
在这个"rocketmq4.5.0安装部署(安装包,开发手册,配置文件).zip"压缩包中,包含了进行RocketMQ 4.5.0版本安装和部署所需的所有关键资源。 首先,我们来看"RocketMQ 开发手册3.2.4.pdf",这是一份详细的开发者指南...
杨开元(详细书签).pdf》、《RocketMQ架构原理剖析.pptx》和《RocketMQ开发手册3.2.4.pdf》,它们涵盖了从实战技巧到深度理论的全面知识。 首先,《RocketMQ实战与原理解析.杨开元(详细书签).pdf》由杨开元撰写,这...
3.2.4版本的开发手册提供了全面的指南,帮助开发者深入理解和使用RocketMQ。以下是对RocketMQ核心知识点的详细阐述: 一、RocketMQ概述 RocketMQ源于阿里巴巴内部的消息系统,后逐渐发展成为一款开源产品。它具有高...
- **定义**:RocketMQ是一款高性能、可扩展的分布式消息中间件,它由阿里巴巴集团开发并开源,广泛应用于微服务架构中进行消息传递。 - **特点**: - **严格的顺序保证**:支持按需保障消息顺序,确保消息处理的...
### RocketMQ 使用手册 #### 一、RocketMQ 简介及特性 RocketMQ 是一款高性能、可扩展的分布式消息中间件,它被设计用于解决大规模互联网应用中的消息传输问题。该中间件具备以下特点: 1. **严格的消息顺序保证*...
本文档集合包含了RocketMQ的核心参数配置、使用方法以及详细的手册,适用于开发人员、运维人员以及对消息队列感兴趣的学者进行深入学习。 首先,我们来看看《rocketmq参数配置.pdf》。这份文档详细介绍了RocketMQ的...
总的来说,"rocketmq-externals-release-rocketmq-console-1.0.0"压缩包提供了RocketMQ集群管理的重要工具,是运维和开发人员管理RocketMQ系统不可或缺的一部分。通过理解和使用这个控制台,可以更有效地监控和优化...
《RocketMQ_userguide》是RocketMQ的用户手册,它详细介绍了如何安装、配置和使用RocketMQ。在这个文档中,你可以了解到RocketMQ的基本架构,包括NameServer、Producer、Consumer以及Broker等核心组件。NameServer是...
总的来说,这个压缩包是一个全面的学习资源,适合那些想要理解和掌握RocketMQ的开发人员,无论是初学者还是有经验的开发者,都能从中受益。通过阅读资料、实践代码示例和查阅文档,学习者可以系统地学习RocketMQ,并...
4. **docs** 目录:文档资料,包括用户手册、开发者指南和API参考,对于理解RocketMQ的工作原理和开发应用非常有帮助。 5. **src** 目录:源代码,虽然通常在部署时我们不需要修改源码,但对于深入理解 RocketMQ ...
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域。...无论是开发分布式应用、实施数据同步,还是进行复杂的消息处理,RocketMQ都是一个值得信赖的选择。
《阿里巴巴Java开发手册》是阿里巴巴集团为提升Java开发效率与代码质量而编撰的一份重要指导文档,尤其在“码出高效,码出质量”的愿景下,它为开发者提供了全面的编程规范和最佳实践。这份手册针对不同级别的开发者...