近期学习了阿里的分布式消息中间件RocketMQ,对它的进行了基本的使用,写一篇博客记录一下:
1. 资料获取
RocketMQ相关资料基本都在RocketMQ在github上的主页:
https://github.com/alibaba/RocketMQ
相关软件、客户端包括源码的下载可以到:
https://github.com/alibaba/RocketMQ/releases
目前最新:v3.2.6【alibaba-rocketmq-3.2.6.tar.gz】
用户开发手册需要按照要求回复后,手册会发送到邮箱:
https://github.com/alibaba/RocketMQ/issues/1
【该手册对RocketMQ进行了一些介绍,但是并不是特别详细】
提供一个快速入门,可以到博客查看:
http://blog.csdn.net/a19881029/article/details/34446629
2. 部署RocketMQ
RocketMQ需要部署Name Server服务器和broker服务器,而broker服务器由有多种部署方式【master-slave】,启动都需要JDK以及JAVA_HOME环境变量,由于实验室机器有限,有两台机器:
172.13.206.165 部署Name Server和一个master broker
172.13.206.38 部署一个slave broker
(1) 首先部署Name Server【个人感觉类似JNDI,主要管理broker的注册信息】
A. 拷贝一份alibaba-rocketmq-3.2.6.tar.gz到机器172.13.206.16上并解压
B. cd 到bin目录,可以打开README.md开一下,里面简单介绍了Name Server和broker的启动命令
C. Name Server 的启动命令如下:
nohup sh mqnamesrv &
D. 启动前可以先看一下mqnamesrv的脚本,发现它实际是执行runserver.sh脚本让它去执行com.alibaba.rocketmq.namesrv.NamesrvStartup的main函数来启动Name Server,在runserver.sh脚本中可以看到JVM的启动参数配置:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
由于我的机器内存只有2g,所以需要修改一下JVM的启动参数【用户根据机器情况配置自己的启动参数】,我的修改:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=256m"
E. Name Server 的默认监听端口号为9876,所以我的Name Server地址为:172.13.206.165:9876 【Name Server是无状态的,可以很便利的进行水平扩展】
F. Name Server的关闭命令
sh mqshutdown namesrv
(2) 部署broker【消息中转角色,负责存储消息,转发消息】
broker集群有多种配置的策略,根据用户手册,大致有四种部署策略:
(1)单个Master
这种配置简单,但是风险比较大,一旦broker宕机会导致整个服务不可用。【实际中不会用这种方式】
(2)多Master
集群中无slave,有多个是master,单个broker宕机不会对应用有影响,性能最高;但是单台机器宕机期间,这台机器上未消费的消息在机器恢复之前不可订阅,消息实时性受影响。
(3)多Master多Slave+异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级;性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预;Master宕机或磁盘损坏时会有少量消息丢失。
(4)多Master多Slave+同步双写
每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功;服务可用性与数据可用性非常高;性能比异步HA略低。
在RocketMQ_HOME/conf目录下提供了四种配置的broker启动配置的示例文件,学习时可以按照这些配置
启动broker:
A. 生成配置文件
如果想对broker的启动进行更详细的掌控,可以使用以下命令生成配置文件模板:
sh mqbroker -m > broker.p
生成的broker.p文件如下,可以对配置进行修改:
namesrvAddr= brokerIP1=172.13.206.165 brokerName=issme-System-Product-Name brokerClusterName=DefaultCluster brokerId=0 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true rejectTransactionMessage=false fetchNamesrvAddrByAddressServer=false storePathRootDir=/home/issme/store storePathCommitLog=/home/issme/store/commitlog flushIntervalCommitLog=1000 flushCommitLogTimed=false deleteWhen=04 fileReservedTime=72 maxTransferBytesOnMessageInMemory=262144 maxTransferCountOnMessageInMemory=32 maxTransferBytesOnMessageInDisk=65536 maxTransferCountOnMessageInDisk=8 accessMessageInMemoryMaxRatio=40 messageIndexEnable=true messageIndexSafe=false haMasterAddress= brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH cleanFileForciblyEnable=true
B. 启动broker【启动一个Master和一个Slave,HA采用异步复制】
这里为了简化,采用conf中给出的配置文件对broker进行启动,broker的启动命令:
nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a.properties & nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a-s.properties &
这里启动了一个master和一个slave,它们是通过brokerName来进行配对,master的brokerId必须为0,而slave的brokerId不为0【一个master可以配置多个slave】; -n 指定Name Server地址,-c 指定配置文件的地址,这样broker启动完毕,默认端口号为10911:
master地址:172.13.206.165:10911
slave地址: 172.13.206.38:10911
C. 类似的我们可以看一下mqbroker的启动脚本,发现它实际是执行runbroker.sh脚本让它去执行com.alibaba.rocketmq.broker.BrokerStartup的main函数来启动broker,在runbroker.sh脚本中同样可以修改JVM的启动参数
D. broker关闭命令
sh mqshutdown broker
3. 客户端程序编写
这里编写了三个简单的客户端代码【具体根据业务调整】
引用RocketMQ客户端jar文件:
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
(1) producer
public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("172.13.206.165:9876"); try { producer.start(); for(int i=0; i<20; i++) { Message msg = new Message("TestTopicA", "Push", "test1", "Test Msg 1".getBytes(Charset.forName("utf-8"))); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("TestTopicA", "Pull", "test2", "Test Msg 2".getBytes(Charset.forName("utf-8"))); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
这里需要注意的是producer.shutdown(),它应用退出时,要调用来清理资源,关闭网络连接,从MetaQ服务器上注销自己,一般建议写在tomcat或jboss的退出钩子。
(2)PushConsumer
Push方式消费消息【根据RocketMQ开发手册,Push方式是以Pull长轮询的方式实现的】
public class PushConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("172.13.206.165:9876"); try { consumer.subscribe("TestTopicA", "*"); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { Message msg = msgs.get(0); System.out.println(msg.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
(3)PullConsumer
以Pull方式消费消息:
public class PullConsumer { public static void main(String[] args) { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer( "PullConsumer"); consumer.setNamesrvAddr("172.13.206.165:9876"); try { consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TestTopicA"); for(MessageQueue mq : mqs) { System.out.println("Consumer From the queue:" + mq); long offset = 0; PullResult result = consumer.pullBlockIfNotFound(mq, null, offset, 32); List<MessageExt> msgs = result.getMsgFoundList(); if(msgs!=null && msgs.size() != 0) { for(MessageExt msg : msgs) { System.out.println(new String(msg.getBody(), Charset.forName("utf-8"))); } } offset = result.getNextBeginOffset(); System.out.println(result.getPullStatus()); } } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
相关推荐
RocketMQ是一款高性能、分布式的消息中间件,常用于大型分布式系统中的消息传递。Prometheus是一款流行的开源监控和警报工具,可以收集并存储时间序列数据,为包括RocketMQ在内的各种服务提供详细的监控。在这个场景...
prometheus监控rocketmq用到的rocketmq-exporter jar包,官方github.com/apache/rocketmq-exporter,mvn打包
Apache RocketMQ 是一款分布式消息中间件,它提供了高吞吐量、低延迟的消息传输服务。其核心概念包括: - **消息(Message):** 最小的数据单元,可以携带应用层数据。 - **主题(Topic):** 消息的分类,用来区分不同...
"QuickStart"是一个针对Telerik产品的入门教程或快速启动指南,旨在帮助用户迅速熟悉和掌握Telerik的相关工具和功能。Telerik是一家知名的软件开发公司,提供一系列用于构建高性能、美观的桌面和移动应用的开发工具...
----------------------------rocketmq 消息队列 ---------------------------- brokerIP1=192.168.1.144 ... tools.cmd org.apache.rocketmq.example.quickstart.Consumer 启动生产者(先后输入):set NAMESRV_
8. **中间件**:中间件是处理请求的过滤器,可以用于权限检查、会话管理和执行其他任务。它们可以全局应用,也可以针对特定路由或控制器。 9. **艺术酱 (Artisan)**:Artisan 是 Laravel 自带的命令行工具,它提供...
代码中国网站提供了丰富的开源项目、技术文章和论坛交流,对于理解和学习QuickStart的源码非常有帮助。 4. 非常专业的QuickStart软件源码: 这无疑是压缩包的核心内容,包含了软件的全部源代码。通过分析源码,我们...
Autoware是一款基于ROS(Robot Operating System)的开源软件,由名古屋大学开发,主要用于自动驾驶的研究与开发。它在GitHub上开放源代码,支持Linux和eSOL eMCOS等操作系统,并且能够运行在具有CPU、GPU的设备上,...
越来越发现,用鼠标将浪费我大量的时间,所以,最近我一直拼命的学习windows的快捷按键。现在很多操作都可以使用快捷按键完成。但是我发现我没法用很快的速度执行我选择的软件执行。...还有一个就是使用这个软件的设置...
`maven-archetype-quickstart-1.1.jar` 是一个Maven的快速启动 archetype 包,专门用于帮助开发者快速创建一个新的Maven项目结构。在这个压缩包中,你将找到一个预设的Java项目模板,以便于你能够迅速地开始编码。 ...
1.maven-archetype-quickstart-1.1.jar 用于搭建maven模块项目 2.打开cmd窗口,执行mvn install:install-file -DgroupId=org.apache.maven.archetypes -DartifactId=maven-archetype-quickstart -Dversion=1.1 -...
5. **LICENSE**和**NOTICE**文件:通常,开源项目会包含这些文件,用来声明版权和使用许可信息。 使用这个archetype,开发者可以避免手动创建目录结构和编写基础配置文件的繁琐工作。只需按照提示输入项目相关的...
【Android Quickstart范例详解】 Android Quickstart是针对初学者设计的一系列教程和示例代码,旨在帮助开发者快速上手Android应用开发。这个压缩包"quickstart-android-master"包含了所有必要的资源和代码,是你...
解决Unable to create project from archetype [org.apache.maven.archetypes:maven-archetype-quickstart:1.1] 1. 下载maven-archetype-quickstart-1.1.jar 文件地址: 2.cmd窗口执行mvn install:install-file -...
Robot Framework 是一个开源自动化测试框架,设计用于支持各种不同类型的测试,包括功能测试、验收测试、回归测试以及更多其他类型。它以关键字驱动的方式工作,使得测试用例易于编写和理解,尤其适合非编程背景的...
在解压"quickstart.rar"后,文件名列表中只有一个条目"quickstart",这可能意味着压缩包包含了一个名为"quickstart"的目录,该目录下就是按照Maven Quickstart Archetype生成的项目结构,包括pom.xml、src/main/java...
"quickstart-android,适用于android的firebase快速启动示例.zip"是一个开源项目,旨在帮助Android开发者快速了解并集成Firebase的各种功能到他们的应用中。 在压缩包中的"quickstart-android-master"文件夹中,你将...
.NET快速入门教程QuickStart 中文版。希望对.net感兴趣的朋友有所帮助。
Drupal 7是一套流行的开源内容管理系统(CMS),由Dries Buytaert创建。它允许用户创建和管理网站内容,并具有高度的可扩展性和灵活性。《Drupal 7 视觉快速指导》是一本由Tom Geller所著的入门教程,旨在通过...
标题《quickstart_uagateway.pdf》 描述《quickstart_uagateway》 标签《quickstart uagateway》 知识点: 1. UaGateway介绍: UaGateway是一个运行在Windows操作系统上的32位NT服务。它支持32位或64位(WOW64)的...