RocketMQ不止可以直接推送消息,在消费端注册监听器进行监听,还可以由消费端决定自己去拉取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
* PullConsumer,订阅消息
*/
publicclassPullConsumer{
//Java缓存
privatestaticfinalMap<MessageQueue,Long>offseTable=newHashMap<MessageQueue,Long>();
publicstaticvoidmain(String[]args)throwsMQClientException{
DefaultMQPullConsumer consumer=newDefaultMQPullConsumer("PullConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//拉取订阅主题的队列,默认队列大小是4
Set<MessageQueue>mqs=consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
for(MessageQueue mq:mqs){
System.out.println("Consume from the queue: "+mq);
SINGLE_MQ:while(true){
try{
PullResult pullResult=
consumer.pullBlockIfNotFound(mq,null,getMessageQueueOffset(mq),32);
List<MessageExt>list=pullResult.getMsgFoundList();
if(list!=null&&list.size()<100){
for(MessageExt msg:list){
System.out.println(SerializableInterface.deserialize(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
caseFOUND:
// TODO
break;
caseNO_MATCHED_MSG:
break;
caseNO_NEW_MSG:
breakSINGLE_MQ;
caseOFFSET_ILLEGAL:
break;
default:
break;
}
}
catch(Exceptione){
e.printStackTrace();
}
}
}
consumer.shutdown();
}
privatestaticvoidputMessageQueueOffset(MessageQueue mq,longoffset){
offseTable.put(mq,offset);
}
privatestaticlonggetMessageQueueOffset(MessageQueue mq){
Longoffset=offseTable.get(mq);
if(offset!=null){
System.out.println(offset);
returnoffset;
}
return0;
}
|
刚开始的没有细看PullResult对象,以为拉取到的结果没有MessageExt对象还跑到群里面问别人,犯2了
特别要注意 静态变量offsetTable的作用,拉取的是按照从offset(理解为下标)位置开始拉取,拉取N条,offsetTable记录下次拉取的offset位置
http://www.changeself.net/archives/rocketmq%E5%85%A5%E9%97%A8%EF%BC%883%EF%BC%89%E6%8B%89%E5%8F%96%E6%B6%88%E6%81%AF.html
相关推荐
### RocketMQ 入门知识点详解 #### 一、RocketMQ 概述 RocketMQ 是一款高性能、可伸缩的消息中间件,它具有低延迟、高可靠的特点,适用于各种消息传递场景。RocketMQ 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型...
RocketMQ是中国阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着重要的角色,用于处理高并发、低延迟的...提供的"RocketMQ源码解析.pdf"和"RocketMQ入门指南.pdf"将是深入理解RocketMQ的宝贵资源。
4. **消费者(Consumer)**: 消费者从RocketMQ服务器接收消息并处理,有两种消费模式:Push模式(消息推送给消费者)和Pull模式(消费者主动拉取消息)。 5. **NameServer**: 是 RocketMQ 的服务注册与发现组件,...
- **基础概念**:包括生产者、消费者、消息服务器、命名服务器、消息、主题、标签、心跳、监听器、拉取/推送消费模式等。 - **安装步骤**:涵盖Linux和Windows平台,包括JDK安装、RocketMQ压缩包解压、启动命名...
3. **RocketMQ的生产与消费**:Producer通过NameServer获取到Consumer的订阅信息,然后将消息发送到对应的Broker。Consumer根据订阅的主题和Tag从Broker拉取或推送到消息。 4. **消息模型**:RocketMQ支持发布/订阅...
【消息中间件RocketMQ入门】 消息中间件(MQ)是一种在分布式系统中用于通信的技术,它通过消息队列作为消息的容器,使得系统组件之间的交互变得更加高效和解耦。RocketMQ是由阿里巴巴开源的高性能消息中间件,已被...
3. **Consumer**:消息消费者,主要由后台系统用于异步消费消息。 4. **Push Consumer**:由服务端主动推送给消费者端的消息消费方式。 5. **Pull Consumer**:消费者端定时从服务端拉取消息的消费方式。 6. **...
- **Consumer**:消费者,负责从RocketMQ系统中拉取消息或接收消息。 3. **RocketMQ API 使用** - **创建Producer**:开发者需要创建一个 Producer 实例,设置相关的配置,如 Group ID,然后启动它。 - **发送...
这个"rocketmq-demo.zip"压缩包提供了一个入门级的示例,帮助开发者理解RocketMQ的基本工作原理和使用方法。以下是对RocketMQ及其相关代码示例的详细解释。 首先,RocketMQ的核心功能是作为一个消息队列,它在生产...
提供丰富的消息拉取模式3.高效的订阅者水平扩展能力4.实时的消息订阅机制5.亿级消息堆积能力(1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步(2)Broker部署相对复杂,Broker氛围Master...
3. **工作流程**:生产者将消息发送到 Broker,消费者通过NameServer获取消息路由信息,然后从Broker拉取或消费消息。整个过程支持集群部署,确保服务的高可用性。 在《Rocketmq使用.docx》文档中,可能包含以下...
总的来说,这个项目旨在提供一个快速入门RocketMQ的平台,通过实际操作加深对RocketMQ的理解。在Linux环境下编译和安装RocketMQ是一项基础任务,而理解其核心概念和操作流程对于开发者来说至关重要。通过阅读提供的...
拉取方式消费(配置方式复杂,位点可能发生偏移,弃用) 简单入门实例 1.添加maven依赖: < groupId> com . maihaoche < / groupId > < artifactId> spring - boot - starter - rocketmq < / artifactId >...
4. **消费消息**:消费者订阅Topic后,可以从Broker拉取消息进行处理。 此外,Apache RocketMQ社区还提供了各种语言的客户端库,方便开发者快速集成到自己的应用中。同时,社区也鼓励用户加入钉钉群进行交流和支持...
RocketMQ源自Metaq,提供严格的顺序消息、多种消息拉取模式和事务消息等功能,适合大规模分布式系统。RabbitMQ则基于AMQP,支持多种协议,提供丰富的路由和负载均衡功能,适合企业级整合。 **RabbitMQ**的六种工作...
对于卡夫卡的对比,与其他消息系统相比,如RabbitMQ、Redis、ZeroMQ、ActiveMQ、MetaQ/RocketMQ等,卡夫卡具有高性能跨语言的分布式发布/订阅消息系统的特性,支持数据持久化,具有完全分布式的特性,并能够同时支持...
【Kafka 入门到精通】的讲解涵盖了多个关键知识点,包括消息系统的优点、Message Queue 的对比、Kafka 的架构及特性。以下是这些内容的详细阐述: 1. **为何使用消息系统** - **数据持久化**:消息系统能够确保...