- 浏览: 174935 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
19941:
大神,提供的demo读取配置文件有问题啊,一直读取不到。。。
整合了DFS+DHT+MyFastClitn的java客户端实现 -
lobin:
为什么我启动的时候,能启动, 但报如下错误呢?[2016-12 ...
FastDHT(分布式hash系统)安装和与FastDFS整合实现自定义文件ID -
yuqiyi:
这篇文章为了凸显fst得效率也是醉了.其一:kryo不做任何优 ...
高性能序列化框架FST -
a6186694:
<div class="quote_title ...
淘宝消息中间件RocketMQ的安装和简单使用 -
bo_hai:
能详细讲讲吗?
FastDHT(分布式hash系统)安装和与FastDFS整合实现自定义文件ID
RocketMQ是什么?
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
能够保证严格的消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
亿级消息堆积能力
RocketMQ是alibaba开源的java消息中间件。在github上开源,项目同时也在oschina上。地址:https://git.oschina.net/vintagewang/RocketMQ/
下载RocketMQ3.0.8 最新的是3.1.0 试了好多次,编译怎么都不行啊。
果断使用了3.0.8
https://github.com/alibaba/RocketMQ/archive/v3.0.8.zip 本人机器有点问题git居然安装不上(纳闷啊)直接下的zip包。
解压开来,找到项目中pom.xml并且把其中的
<parent>
<groupId>com.taobao</groupId>
<artifactId>parent</artifactId>
<version>1.0.2</version>
</parent>
给注释掉,不然弄死不能编译的,我可是折腾了好久好久啊,这是由于阿里开源不彻底导致的。
下面开始编译吧。cmd控制台进到解压文件夹的pom.xml目录 执行
mvn -Dmaven.test.skip=true clean package install assembly:assembly -U 或者直接运行根目录下的install.cmd 推荐控制台运行,能直观的看到错误信息。install.cmd出错了控制台就没了。注释了parent之后应该就不报错了。等待它去网上下载完的jar包,并且编译完吧。
编译完成之后,根目录下回多出来一个叫target的目录,将里面的alibaba-rocketmq-3.0.8文件夹拷贝到你的安装盘符下吧,
命令行进入到${alibaba-rocketmq-3.0.8}/bin 输入start mqnamesrv.exe 会弹出一个信息窗口,记录的是日志也可以写到文件中 后面接着写 >${你的日志存放目录} ,在日志文件中看到The name Server boot success 说明启动成功了,输入jps -v 检测
2636 -Djava.ext.dirs=F:\taobao\alibaba-rocketmq-3.0.8\alibaba-rocketmq\bin/../lib-Drocketmq.home.dir=F:\taobao\alibaba-rocketmq-3.0.8\alibaba-rocketmq\bin/..-XX:MaxNewSize=512M -XX:MaxPermSize=128M -XX:NewSize=256M -XX:PermSize=128M -Xms512m -Xmx1g exit abort
启动borker start mqbroker.exe -n 127.0.0.1:9876 同样的弹出一个窗口 看到success表示成功了,文件日志同上。
Java操作列子,来自开源项目中。
建立一个普通的maven项目
Pom中加入如下配置
<dependencies> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.0.8</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.0.8</version> <type>pom</type> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> </dependencies>
消息生产者
package rocketmq_test.test; import java.util.concurrent.TimeUnit; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException{ /** * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> * 注意:ProducerGroupName需要由应用来保证唯一<br> * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, * 因为服务器会回查这个Group下的任意一个Producer */ final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("Producer"); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); /** * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br> * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br> * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 */ for (int i = 0; i < 10; i++){ try { { Message msg = new Message("TopicTest1",// topic "TagA",// tag "OrderID001",// key ("Hello MetaQA").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2",// topic "TagB",// tag "OrderID0034",// key ("Hello MetaQB").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3",// topic "TagC",// tag "OrderID061",// key ("Hello MetaQC").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } }catch(Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */ //producer.shutdown(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { producer.shutdown(); } })); System.exit(0); } }
消费者处理消息
package rocketmq_test.test; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { /** * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br> * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br> */ public static void main(String[] args) throws InterruptedException, MQClientException{ /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br> * 注意:ConsumerGroupName需要由应用来保证唯一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ConsumerGroupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("Consumber"); /** * 订阅指定topic下tags分别等于TagA或TagC或TagD */ consumer.subscribe("TopicTest1","TagA || TagC || TagD"); /** * 订阅指定topic下所有消息<br> * 注意:一个consumer对象可以订阅多个topic */ consumer.subscribe("TopicTest2","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt>msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() +" Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if(msg.getTopic().equals("TopicTest1")) { //执行TopicTest1的消费逻辑 if(msg.getTags() != null && msg.getTags().equals("TagA")) { //执行TagA的消费 System.out.println(new String(msg.getBody())); }else if (msg.getTags() != null &&msg.getTags().equals("TagC")) { //执行TagC的消费 System.out.println(new String(msg.getBody())); }else if (msg.getTags() != null &&msg.getTags().equals("TagD")) { //执行TagD的消费 System.out.println(new String(msg.getBody())); } }else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); System.out.println("ConsumerStarted."); } }
启动消费者
Hello MetaQA
ConsumeMessageThread-ConsumerGroupName-8 Receive New Messages: 1
Hello MetaQB
ConsumeMessageThread-ConsumerGroupName-9 Receive New Messages: 1
Hello MetaQA
ConsumeMessageThread-ConsumerGroupName-10 Receive New Messages: 1
Hello MetaQB
ConsumeMessageThread-ConsumerGroupName-11 Receive New Messages: 1
Hello MetaQA
ConsumeMessageThread-ConsumerGroupName-12 Receive New Messages: 1
Hello MetaQB
ConsumeMessageThread-ConsumerGroupName-13 Receive New Messages: 1
Hello MetaQA
ConsumeMessageThread-ConsumerGroupName-14 Receive New Messages: 1
Hello MetaQB
ConsumeMessageThread-ConsumerGroupName-15 Receive New Messages: 1
Hello MetaQA
ConsumeMessageThread-ConsumerGroupName-16 Receive New Messages: 1
Hello MetaQB
ConsumeMessageThread-ConsumerGroupName-17 Receive New Messages: 1
Hello MetaQA
ConsumeMessageThread-ConsumerGroupName-18 Receive New Messages: 1
Hello MetaQB
ConsumeMessageThread-ConsumerGroupName-19 Receive New Messages: 1
Hello MetaQA
ConsumeMessageThread-ConsumerGroupName-20 Receive New Messages: 1
Hello MetaQB
运行生产者,
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002715, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F00000000000027AB, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002840, messageQueue=MessageQueue [topic=TopicTest1, brokerName=E97M3HS2ANMGCSZ, queueId=3], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F00000000000028D5, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=3], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F000000000000296B, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=3], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002A00, messageQueue=MessageQueue [topic=TopicTest1, brokerName=E97M3HS2ANMGCSZ, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002A95, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002B2B, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002BC0, messageQueue=MessageQueue [topic=TopicTest1, brokerName=E97M3HS2ANMGCSZ, queueId=1], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002C55, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=1], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002CEB, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=1], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002D80, messageQueue=MessageQueue [topic=TopicTest1, brokerName=E97M3HS2ANMGCSZ, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002E15, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002EAB, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002F40, messageQueue=MessageQueue [topic=TopicTest1, brokerName=E97M3HS2ANMGCSZ, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000002FD5, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F000000000000306B, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000003100, messageQueue=MessageQueue [topic=TopicTest1, brokerName=E97M3HS2ANMGCSZ, queueId=0], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000003195, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=0], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F000000000000322B, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=0], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F00000000000032C0, messageQueue=MessageQueue [topic=TopicTest1, brokerName=E97M3HS2ANMGCSZ, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F0000000000003355, messageQueue=MessageQueue [topic=TopicTest2, brokerName=E97M3HS2ANMGCSZ, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8490100002A9F00000000000033EB, messageQueue=MessageQueue [topic=TopicTest3, brokerName=E97M3HS2ANMGCSZ, queueId=1], queueOffset=8]
评论
3 楼
a6186694
2016-11-05
247687009 写道
aiyoaiyo0330 写道
请教个问题 我按照你的代码 写了一遍 但是报错 com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1 我把mqnamesrv.exe 开启了 我是在windows下运行的
你可以去看看淘宝开源
不能直接启动mqbroker.exe ;
得这样子 开启另一个windows终端cmd,进入解压的bin目录,也可一步输入mqbroker -n 127.0.0.1:9876启动broker,保持mqbroker.exe运行,不要关闭这个终端;然后把这个9876端口号写在 consumer.setNamesrvAddr("127.0.0.1:9876");
2 楼
yjq8116
2015-01-02
247687009 写道
aiyoaiyo0330 写道
请教个问题 我按照你的代码 写了一遍 但是报错 com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1 我把mqnamesrv.exe 开启了 我是在windows下运行的
你可以去看看淘宝开源
这个问题我也遇到了,这是因为启动broker需要 mqbroker.exe -n localhost:9876,而不是双击 mqbroker.exe
1 楼
247687009
2014-06-13
aiyoaiyo0330 写道
请教个问题 我按照你的代码 写了一遍 但是报错 com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1 我把mqnamesrv.exe 开启了 我是在windows下运行的
你可以去看看淘宝开源
相关推荐
阿里分布式消息中间件RocketMQ深入解析 RocketMQ是阿里巴巴自研的第三代分布式消息中间件,2012年开源,2016年捐献给Apache软件基金会,成为孵化项目。RocketMQ具有高性能、低延迟、可靠重试、分布式事务等特性,...
标题中提到的"消息中间件rocketmq原理解析"揭示了本文档的核心内容,即对消息中间件RocketMQ的原理进行解析和探讨。RocketMQ是阿里巴巴开源的一款分布式消息中间件,主要用于企业级的分布式系统中,用以实现系统之间...
消息中间件rocketmq源码解析,rocketmq的相关使用方法及源码分析
《RocketMQ 开发指南》是一本详尽介绍Apache RocketMQ这一高效消息中间件的书籍,旨在帮助开发者理解和掌握其核心概念、使用方法以及最佳实践。RocketMQ,作为阿里巴巴开源的分布式消息中间件,广泛应用于大数据处理...
集群订阅和广播订阅的消费场景。内含多种命令行参数(例如消费位点的调整,消息体大小调整,并发数调整,JVM 参数调优,延迟等级调整等)实现压测的精细化控制。毫不夸张的说,RocketMQ 覆盖的功能基本都能测到。 3 ...
消息中间件RocketMq学习
这个“消息中间件 RocketMQ 发布和订阅 Demo”是一个适合初学者的入门示例,通过 Java 编写,利用 Maven 进行项目管理,旨在帮助开发者快速理解如何使用 RocketMQ 实现发布和订阅操作。 首先,我们需要了解 ...
使用阿里中间件RocketMQ、Tair、jstorm对双十一实时交易进行实时计算 使用阿里中间件RocketMQ、Tair、jstorm对双十一实时交易进行实时计算 使用阿里中间件RocketMQ、Tair、jstorm对双十一实时交易进行实时计算 使用...
在那里,你可以找到最权威的文档、问题解答和原理介绍,从而更好地理解和掌握RocketMQ的使用和原理。 通过以上内容的学习,我们可以对RocketMQ的工作原理和核心机制有更深入的理解,这对于在实际开发中解决遇到的...
消息中间件RocketMQ测试程序
RocketMQ是阿里巴巴开源的一款高性能的分布式消息中间件,对于学习和使用RocketMQ,建议从其官方GitHub仓库获取最权威的文档、问题解答和原理介绍。RocketMQ的应用场景广泛,尤其适合处理大规模分布式系统中的消息...
消息中间件RocketMQ实战视频教程教程+代码+文档
RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着关键角色,用于解耦应用、异步处理以及提高系统的响应速度和吞吐量。本压缩包中的资源可能是一个逐步学习 RocketMQ 的教程,从基础...
RocketMQ,作为一款开源的消息中间件,源自阿里巴巴,并在2016年捐赠给了Apache软件基金会,成为顶级项目。RocketMQ的设计目标是提供低延迟、高可扩展性、高可靠性的分布式消息传递服务,适用于大数据领域的实时数据...
1. 消息可靠性: 2. 消息低延迟: 3. 每个消息⾄少投递⼀次: 4. 每个消息只消费⼀次: 5. Broker的Buffer满了怎么办? 6. 回溯消息: 7. 消息堆积: 8. 分布式事务: 9. 定时消息: 10. 消息重试: 11. RocketMq是...
RocketMQ是一款由阿里巴巴开源的消息中间件,它在分布式系统中起着至关重要的作用,用于解耦应用程序、实现异步处理和提高系统的可扩展性。RocketMQ的设计理念是高可用、高吞吐量和低延迟,这使得它在大规模并发场景...
JEECG 智能开发平台消息中间件使用手册主要介绍了 JEECG 智能开发平台中的消息中间件模块的使用方法和详细配置。该中间件模块主要提供了消息提醒功能,包括短信、邮件、微信、系统消息等,旨在为系统中具有消息提醒...
总的来说,RocketMQ作为一种成熟的消息中间件,通过其灵活的设计和强大的功能,为企业级应用提供了可靠的消息传递和系统解耦能力,是学习和实施分布式系统通信的理想选择。了解并熟练掌握RocketMQ,有助于开发者构建...
RocketMQ作为一款强大的消息中间件,提供了丰富的功能和高可扩展性,适用于各种复杂的分布式系统场景。通过理解其核心概念、使用方法和高级特性,开发者能够更好地利用RocketMQ来构建稳定、高效的系统。
消息中间件是现代分布式系统中不可或缺的组件,它在...通过以上内容的学习和实践,你将能够熟练掌握消息中间件的基本概念、使用技巧和最佳实践,从而在实际工作中灵活运用,解决复杂问题,成为一名消息中间件实战高手。