服务器设置
1. 安装64位JDK;
2. 设置Linux文件系统为Ext4
3.开启9876,10911防火墙端口
源码编码
1. 安装Maven 2. 下载RocketMQ源码,下载地址:http://github.com/alibaba/RocketMQ.git/trunk,进入到源码解压目录下运行install.bat或DOS命令行切换到解压目录运行: mvn -Dmaven.test.skip=true clean package install assembly:assembly -U,编译成功后,在target目录下会有alibaba-rocketmq-3.1.1.tar.gz,该压缩包就是安装包。
3. 安装
将alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:tar -zxvf
alibaba-rocketmq-3.1.1.tar.gz设置执行权限chmod +x ./alibaba-rocketmq/bin/*
4. 运行
配置采用双Master,双Slave,异步复制的配置方式,共需要4台服务器做硬件支持。 a. 修改配置
(1)创建目录
mkdir /home/rocket/alibaba-rocketmq/logs #创建日志目录
mkdir -p /home/rocket/alibaba-rocketmq/data/store/commitlog #创建数据存储目录
更改日志目录
cd /home/rocket/alibaba-rocketmq/conf
(2)修改A主配置
vi ./conf/2m-2s-async/broker-a.properties:
1. # brokerClusterName=DefaultCluster
2. brokerName=broker-a
3. brokerId=0
4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
5. defaultTopicQueueNums=4
6. autoCreateTopicEnable=true
7. autoCreateSubscriptionGroup=true
8. listenPort=10911
9. deleteWhen=04
10. fileReservedTime=120
11. mapedFileSizeCommitLog=1073741824
12. mapedFileSizeConsumeQueue=50000000
13. destroyMapedFileIntervalForcibly=120000
14. redeleteHangedFileInterval=120000
15. diskMaxUsedSpaceRatio=88
16. storePathRootDir=/usr/framework/rocketmq/datas
17. storePathCommitLog=/usr/framework/rocketmq/logs
18. maxMessageSize=65536
19. flushCommitLogLeastPages=4
20. flushConsumeQueueLeastPages=2
21. flushCommitLogThoroughInterval=10000
22. flushConsumeQueueThoroughInterval=60000
23. checkTransactionMessageEnable=false
24. sendMessageThreadPoolNums=128
25. pullMessageThreadPoolNums=128
26. brokerRole=SYNC_MASTER
27. flushDiskType=ASYNC_FLUSH
(3)修改A从配置
vi ./conf/2m-2s-async/broker-a-s.properties:
1. # brokerClusterName=DefaultCluster
2. brokerName=broker-a
3. brokerId=1
4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
5. defaultTopicQueueNums=4
6. autoCreateTopicEnable=true
7. autoCreateSubscriptionGroup=true
8. listenPort=10911
9. deleteWhen=04
10. fileReservedTime=120
11. mapedFileSizeCommitLog=1073741824
12. mapedFileSizeConsumeQueue=50000000
13. destroyMapedFileIntervalForcibly=120000
14. redeleteHangedFileInterval=120000
15. diskMaxUsedSpaceRatio=88
16. storePathRootDir=/usr/framework/rocketmq/datas
17. storePathCommitLog=/usr/framework/rocketmq/logs
18. maxMessageSize=65536
19. flushCommitLogLeastPages=4
20. flushConsumeQueueLeastPages=2
21. flushCommitLogThoroughInterval=10000
22. flushConsumeQueueThoroughInterval=60000
23. checkTransactionMessageEnable=false
24. sendMessageThreadPoolNums=128
25. pullMessageThreadPoolNums=128
26. brokerRole=SLAVE
27. flushDiskType=ASYNC_FLUSH
(4)修改B主配置
vi ./conf/2m-2s-async/broker-b.properties:
28. # brokerClusterName=DefaultCluster
29. brokerName=broker-b
30. brokerId=0
31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
32. defaultTopicQueueNums=4
33. autoCreateTopicEnable=true
34. autoCreateSubscriptionGroup=true
35. listenPort=10911
36. deleteWhen=04
37. fileReservedTime=120
38. mapedFileSizeCommitLog=1073741824
39. mapedFileSizeConsumeQueue=50000000
40. destroyMapedFileIntervalForcibly=120000
41. redeleteHangedFileInterval=120000
42. diskMaxUsedSpaceRatio=88
43. storePathRootDir=/usr/framework/rocketmq/datas
44. storePathCommitLog=/usr/framework/rocketmq/logs
45. maxMessageSize=65536
46. flushCommitLogLeastPages=4
47. flushConsumeQueueLeastPages=2
48. flushCommitLogThoroughInterval=10000
49. flushConsumeQueueThoroughInterval=60000
50. checkTransactionMessageEnable=false
51. sendMessageThreadPoolNums=128
52. pullMessageThreadPoolNums=128
53. brokerRole=SYNC_MASTER
54. flushDiskType=ASYNC_FLUSH (5)修改B从配置
vi ./conf/2m-2s-async/broker-b-s.properties:
28. # brokerClusterName=DefaultCluster
29. brokerName=broker-b
30. brokerId=1
31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1
92.168.1.122:9876
32. defaultTopicQueueNums=4
33. autoCreateTopicEnable=true
34. autoCreateSubscriptionGroup=true
35. listenPort=10911
36. deleteWhen=04
37. fileReservedTime=120
38. mapedFileSizeCommitLog=1073741824
39. mapedFileSizeConsumeQueue=50000000
40. destroyMapedFileIntervalForcibly=120000
41. redeleteHangedFileInterval=120000
42. diskMaxUsedSpaceRatio=88
43. storePathRootDir=/usr/framework/rocketmq/datas
44. storePathCommitLog=/usr/framework/rocketmq/logs
45. maxMessageSize=65536
46. flushCommitLogLeastPages=4
47. flushConsumeQueueLeastPages=2
demo:
Producer类
package com.lvxc.study.tech.rmq;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
//nameserver服务,多个以;分开
producer.setNamesrvAddr("192.168.133.128:9876");
try{
producer.start();
Message msg = new Message("PushTopic","push","1","Just for test.".getBytes());
SendResult result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
msg = new Message("PushTopic","push","2","Just for test.".getBytes());
result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
msg = new Message("PullTopic","pull","1","Just for test.".getBytes());
result = producer.send(msg);
System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;
}catch(Exception e){
e.printStackTrace();
}finally {
producer.shutdown();
}
}
}
Consumer类
package com.lvxc.study.tech.rmq;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("192.168.133.128:9876");
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
相关推荐
这个demo可能是通过一个简单的客户端应用,允许用户输入服务器的IP地址和端口号,然后就能与RocketMQ服务器进行交互,实现消息的发布和订阅功能。 1. ** RocketMQ的基本概念 ** - **主题(Topic)**:主题是消息...
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
在"rocketMQ-demo"这个压缩包中,应该包含了一个完整的示例项目,包括了上述的配置文件、Producer和Consumer的实现,以及可能的测试代码。你可以通过导入这个项目到IDE,运行并查看日志,以了解如何实际操作RocketMQ...
这将允许你的应用与 RocketMQ 服务端进行通信。在 `pom.xml` 文件中,查找或添加如下依赖: ```xml <groupId>com.aliyun.openservices</groupId> <artifactId>rocketmq-client 最新版本号 ``` 2. **创建...
标题中的"springboot-rocketmq-demo.zip"表明这是一个关于Spring Boot整合RocketMQ的示例项目。RocketMQ是阿里巴巴开源的一款分布式消息中间件,而Spring Boot是基于Spring框架的高度集成了许多开发工具和配置的轻量...
初学者可以来学习一下,rocketMQ的简单的小demo 简单易懂
在本示例中,你将看到基于Java API编写的Producer和Consumer类,它们是与RocketMQ交互的关键组件。 1. **Producer**:生产者负责创建和发送消息到RocketMQ服务器。在RocketMQ中,生产者可以是单个实例或一组实例,...
很全的rocket包及安装详细说明附加demo示例。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。...
spring-boot操作rocketmq的demo,亲测可用,代码整理的好
一、简介 二、环境部署 1、编译打包 2、修改配置 3、服务启动 4、控制台安装 三、工程搭建 1、工程结构 2、依赖管理 3、配置文件 4、配置类 四、基础用法 1、消息生产 2、消息消费 五、参考源码
这个压缩包文件包含了关于RocketMQ的相关资料文档和一个demo,将帮助我们深入理解其工作原理和使用方法。 首先,我们要理解RocketMQ的基本概念。它是基于发布/订阅模式的消息队列,支持点对点和发布/订阅两种消息...
RocketMQ学习demo
为了帮助大家更好的学习和使用RocketMQ,因此提供相关学习文档,一起学习,资源包括 RocketMQ初步认知、RocketMQ单机环境搭建、RocketMQ集群部署实践、基于myeclipse的RocketMQ--Demo实践、基于RocketMQ--Demo项目的...
SpringBoot集成RocketMq,可打包成jar包引入到SpringBoot项目中,方便快捷的使用RocketMq的发送消费消息的功能
- **主题(Topic)与队列(Queue)**: RocketMQ 使用主题和队列的概念来组织消息。一个主题可以包含多个队列,每个队列存储一部分消息。这种设计允许消费者并行消费,提高处理速度。 - **发布/订阅模型**: 支持点...
本示例“rocketmq-demo”将着重讲解如何利用 RocketMQ 实现最终一致性分布式事务,这对于理解和掌握分布式系统的事务处理机制至关重要。 在分布式系统中,事务处理是个复杂的话题,而最终一致性是解决分布式事务的...
在本文中,我们将深入探讨如何将SpringBoot 1.5.10.RELEASE与RocketMQ 4.3.1集成,构建一个支持多个消费者监听的消息服务示例。RocketMQ是阿里巴巴开源的一款分布式消息中间件,它具有高吞吐量、低延迟、高可用性和...
在本项目"rocketmq-demo"中,我们将探讨 RocketMQ 的基本概念、工作原理以及如何通过示例代码进行实践。 1. **RocketMQ 基本概念** - **主题(Topic)**:主题是消息的逻辑分类,类似于广播频道,多个消费者可以...
RocketMQ是阿里巴巴开源的...这个Java SpringBoot的RocketMQ demo是一个基础的起点,实际应用中可能需要处理更复杂的场景,比如消息幂等性、事务消息、消息过滤等。你可以根据业务需求进一步学习和扩展RocketMQ的功能。
java 使用 rocketmq的一个生产者和消费者的实现,其中要先启动rocket的nameserver 和borker