`

阿里开源消息中间件RocketMQ QuickStart

阅读更多

近期学习了阿里的分布式消息中间件RocketMQ,对它的进行了基本的使用,写一篇博客记录一下:

 

1. 资料获取

RocketMQ相关资料基本都在RocketMQ在github上的主页:

https://github.com/alibaba/RocketMQ

 

相关软件、客户端包括源码的下载可以到:

https://github.com/alibaba/RocketMQ/releases

目前最新:v3.2.6【alibaba-rocketmq-3.2.6.tar.gz】

 

用户开发手册需要按照要求回复后,手册会发送到邮箱:

https://github.com/alibaba/RocketMQ/issues/1

【该手册对RocketMQ进行了一些介绍,但是并不是特别详细】

 

提供一个快速入门,可以到博客查看:

http://blog.csdn.net/a19881029/article/details/34446629

 

2. 部署RocketMQ

RocketMQ需要部署Name Server服务器和broker服务器,而broker服务器由有多种部署方式【master-slave】,启动都需要JDK以及JAVA_HOME环境变量,由于实验室机器有限,有两台机器:

172.13.206.165 部署Name Server和一个master broker

172.13.206.38   部署一个slave broker

 

 (1) 首先部署Name Server【个人感觉类似JNDI,主要管理broker的注册信息】

 

A. 拷贝一份alibaba-rocketmq-3.2.6.tar.gz到机器172.13.206.16上并解压

B. cd 到bin目录,可以打开README.md开一下,里面简单介绍了Name Server和broker的启动命令

C. Name Server 的启动命令如下:

 

 

nohup sh mqnamesrv &
 

 

D. 启动前可以先看一下mqnamesrv的脚本,发现它实际是执行runserver.sh脚本让它去执行com.alibaba.rocketmq.namesrv.NamesrvStartup的main函数来启动Name Server,在runserver.sh脚本中可以看到JVM的启动参数配置:

 

 

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
 

 

由于我的机器内存只有2g,所以需要修改一下JVM的启动参数【用户根据机器情况配置自己的启动参数】,我的修改:

 

 

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=256m"
 

 

E. Name Server 的默认监听端口号为9876,所以我的Name Server地址为:172.13.206.165:9876 【Name Server是无状态的,可以很便利的进行水平扩展】

 

F. Name Server的关闭命令

 

 

sh mqshutdown namesrv
 

 

(2) 部署broker【消息中转角色,负责存储消息,转发消息】

broker集群有多种配置的策略,根据用户手册,大致有四种部署策略:

(1)单个Master

这种配置简单,但是风险比较大,一旦broker宕机会导致整个服务不可用。【实际中不会用这种方式】

(2)多Master

集群中无slave,有多个是master,单个broker宕机不会对应用有影响,性能最高;但是单台机器宕机期间,这台机器上未消费的消息在机器恢复之前不可订阅,消息实时性受影响。

(3)多Master多Slave+异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级;性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预;Master宕机或磁盘损坏时会有少量消息丢失。

(4)多Master多Slave+同步双写

每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功;服务可用性与数据可用性非常高;性能比异步HA略低。

在RocketMQ_HOME/conf目录下提供了四种配置的broker启动配置的示例文件,学习时可以按照这些配置

 

启动broker:

A. 生成配置文件

如果想对broker的启动进行更详细的掌控,可以使用以下命令生成配置文件模板:

 

 

sh mqbroker -m > broker.p
 

 

生成的broker.p文件如下,可以对配置进行修改:

 

namesrvAddr=
brokerIP1=172.13.206.165
brokerName=issme-System-Product-Name
brokerClusterName=DefaultCluster
brokerId=0
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/home/issme/store
storePathCommitLog=/home/issme/store/commitlog
flushIntervalCommitLog=1000
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=72
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
 

 

B. 启动broker【启动一个Master和一个Slave,HA采用异步复制】

这里为了简化,采用conf中给出的配置文件对broker进行启动,broker的启动命令:

 

 

nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a.properties &

nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a-s.properties & 
 

 

这里启动了一个master和一个slave,它们是通过brokerName来进行配对,master的brokerId必须为0,而slave的brokerId不为0【一个master可以配置多个slave】; -n 指定Name Server地址,-c 指定配置文件的地址,这样broker启动完毕,默认端口号为10911:

master地址:172.13.206.165:10911

slave地址: 172.13.206.38:10911

 

C. 类似的我们可以看一下mqbroker的启动脚本,发现它实际是执行runbroker.sh脚本让它去执行com.alibaba.rocketmq.broker.BrokerStartup的main函数来启动broker,在runbroker.sh脚本中同样可以修改JVM的启动参数

 

D. broker关闭命令

 

sh mqshutdown broker

 

3. 客户端程序编写

这里编写了三个简单的客户端代码【具体根据业务调整】

引用RocketMQ客户端jar文件:

 

<dependency>
		<groupId>com.alibaba.rocketmq</groupId>
		<artifactId>rocketmq-client</artifactId>
		<version>3.2.6</version>
</dependency>

 

(1) producer

public class Producer {

	public static void main(String[] args) {
		DefaultMQProducer producer = new DefaultMQProducer("Producer");
		producer.setNamesrvAddr("172.13.206.165:9876");

		try {
			
			producer.start();
			
			
			for(int i=0; i<20; i++) {
				
				Message msg = new Message("TestTopicA", "Push", "test1",
						"Test Msg 1".getBytes(Charset.forName("utf-8")));
	
				SendResult result = producer.send(msg);
				System.out.println("id:" + result.getMsgId() +  
	                    " result:" + result.getSendStatus());
				
				msg = new Message("TestTopicA", "Pull", "test2",
						"Test Msg 2".getBytes(Charset.forName("utf-8")));
				
				
				
				result = producer.send(msg);
				
				System.out.println("id:" + result.getMsgId() +  
	                    " result:" + result.getSendStatus());
			}
		} catch (MQClientException e) {
			e.printStackTrace();
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			producer.shutdown();
		}
	}
}

 

这里需要注意的是producer.shutdown(),它应用退出时,要调用来清理资源,关闭网络连接,从MetaQ服务器上注销自己,一般建议写在tomcat或jboss的退出钩子。

 

(2)PushConsumer

Push方式消费消息【根据RocketMQ开发手册,Push方式是以Pull长轮询的方式实现的】

 

public class PushConsumer {
	public static void main(String[] args) {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
		consumer.setNamesrvAddr("172.13.206.165:9876");
		
		try {
			consumer.subscribe("TestTopicA", "*");
			consumer.setMessageModel(MessageModel.BROADCASTING);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {
					Message msg = msgs.get(0);
					System.out.println(msg.toString());
					
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
			
		} catch (MQClientException e) {
			e.printStackTrace();
		}
	}
}

 

(3)PullConsumer

以Pull方式消费消息:

 

public class PullConsumer {

	public static void main(String[] args) {
		DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(
				"PullConsumer");
		consumer.setNamesrvAddr("172.13.206.165:9876");

		try {
			consumer.start();
			
			Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TestTopicA");
			for(MessageQueue mq : mqs) {
				System.out.println("Consumer From the queue:" + mq);
				long offset = 0;
				PullResult result = consumer.pullBlockIfNotFound(mq, null, offset, 32);
				List<MessageExt> msgs = result.getMsgFoundList();
				if(msgs!=null && msgs.size() != 0) {
					for(MessageExt msg : msgs) {
						System.out.println(new String(msg.getBody(), Charset.forName("utf-8")));
					}
				}
				offset = result.getNextBeginOffset();
				System.out.println(result.getPullStatus());
			}
			

		} catch (MQClientException e) {
			e.printStackTrace();
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

 

分享到:
评论

相关推荐

    rocketmq监控需要的安装包

    RocketMQ是一款高性能、分布式的消息中间件,常用于大型分布式系统中的消息传递。Prometheus是一款流行的开源监控和警报工具,可以收集并存储时间序列数据,为包括RocketMQ在内的各种服务提供详细的监控。在这个场景...

    rocketmq-exporter-0.0.2-SNAPSHOT.jar

    prometheus监控rocketmq用到的rocketmq-exporter jar包,官方github.com/apache/rocketmq-exporter,mvn打包

    Apache RocketMQ v4.7.1 开发者指南.pdf

    Apache RocketMQ 是一款分布式消息中间件,它提供了高吞吐量、低延迟的消息传输服务。其核心概念包括: - **消息(Message):** 最小的数据单元,可以携带应用层数据。 - **主题(Topic):** 消息的分类,用来区分不同...

    QuickStart

    "QuickStart"是一个针对Telerik产品的入门教程或快速启动指南,旨在帮助用户迅速熟悉和掌握Telerik的相关工具和功能。Telerik是一家知名的软件开发公司,提供一系列用于构建高性能、美观的桌面和移动应用的开发工具...

    集成seata 分布式事务配置、sentienl 限流、熔断、降级、gateway 网关、jwt 、rocketmq

    ----------------------------rocketmq 消息队列 ---------------------------- brokerIP1=192.168.1.144 ... tools.cmd org.apache.rocketmq.example.quickstart.Consumer 启动生产者(先后输入):set NAMESRV_

    Laravel开发-laravel-quickstart

    8. **中间件**:中间件是处理请求的过滤器,可以用于权限检查、会话管理和执行其他任务。它们可以全局应用,也可以针对特定路由或控制器。 9. **艺术酱 (Artisan)**:Artisan 是 Laravel 自带的命令行工具,它提供...

    非常专业的QuickStart软件源码

    代码中国网站提供了丰富的开源项目、技术文章和论坛交流,对于理解和学习QuickStart的源码非常有帮助。 4. 非常专业的QuickStart软件源码: 这无疑是压缩包的核心内容,包含了软件的全部源代码。通过分析源码,我们...

    Autoware_QuickStart_v1.1.pdf

    Autoware是一款基于ROS(Robot Operating System)的开源软件,由名古屋大学开发,主要用于自动驾驶的研究与开发。它在GitHub上开放源代码,支持Linux和eSOL eMCOS等操作系统,并且能够运行在具有CPU、GPU的设备上,...

    quickStart

    越来越发现,用鼠标将浪费我大量的时间,所以,最近我一直拼命的学习windows的快捷按键。现在很多操作都可以使用快捷按键完成。但是我发现我没法用很快的速度执行我选择的软件执行。...还有一个就是使用这个软件的设置...

    maven-archetype-quickstart-1.1.jar包下载

    `maven-archetype-quickstart-1.1.jar` 是一个Maven的快速启动 archetype 包,专门用于帮助开发者快速创建一个新的Maven项目结构。在这个压缩包中,你将找到一个预设的Java项目模板,以便于你能够迅速地开始编码。 ...

    maven-archetype-quickstart-1.1.zip

    5. **LICENSE**和**NOTICE**文件:通常,开源项目会包含这些文件,用来声明版权和使用许可信息。 使用这个archetype,开发者可以避免手动创建目录结构和编写基础配置文件的繁琐工作。只需按照提示输入项目相关的...

    android quickstart范例

    【Android Quickstart范例详解】 Android Quickstart是针对初学者设计的一系列教程和示例代码,旨在帮助开发者快速上手Android应用开发。这个压缩包"quickstart-android-master"包含了所有必要的资源和代码,是你...

    maven-archetype-quickstart-1.1.jar下载

    解决Unable to create project from archetype [org.apache.maven.archetypes:maven-archetype-quickstart:1.1] 1. 下载maven-archetype-quickstart-1.1.jar 文件地址: 2.cmd窗口执行mvn install:install-file -...

    maven-archetype-quickstart

    1.maven-archetype-quickstart-1.1.jar 用于搭建maven模块项目 2.打开cmd窗口,执行mvn install:install-file -DgroupId=org.apache.maven.archetypes -DartifactId=maven-archetype-quickstart -Dversion=1.1 -...

    robotframework-quickstart

    Robot Framework 是一个开源自动化测试框架,设计用于支持各种不同类型的测试,包括功能测试、验收测试、回归测试以及更多其他类型。它以关键字驱动的方式工作,使得测试用例易于编写和理解,尤其适合非编程背景的...

    quickstart.rar

    在解压"quickstart.rar"后,文件名列表中只有一个条目"quickstart",这可能意味着压缩包包含了一个名为"quickstart"的目录,该目录下就是按照Maven Quickstart Archetype生成的项目结构,包括pom.xml、src/main/java...

    quickstart-android,适用于android的firebase快速启动示例.zip

    "quickstart-android,适用于android的firebase快速启动示例.zip"是一个开源项目,旨在帮助Android开发者快速了解并集成Firebase的各种功能到他们的应用中。 在压缩包中的"quickstart-android-master"文件夹中,你将...

    .NET快速入门教程QuickStart 中文版

    .NET快速入门教程QuickStart 中文版。希望对.net感兴趣的朋友有所帮助。

    Drupal 7 Visual QuickStart Guide

    Drupal 7是一套流行的开源内容管理系统(CMS),由Dries Buytaert创建。它允许用户创建和管理网站内容,并具有高度的可扩展性和灵活性。《Drupal 7 视觉快速指导》是一本由Tom Geller所著的入门教程,旨在通过...

    quickstart_uagateway.pdf

    标题《quickstart_uagateway.pdf》 描述《quickstart_uagateway》 标签《quickstart uagateway》 知识点: 1. UaGateway介绍: UaGateway是一个运行在Windows操作系统上的32位NT服务。它支持32位或64位(WOW64)的...

Global site tag (gtag.js) - Google Analytics