`
shijian4810
  • 浏览: 20756 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

RocketMQ安装与demo

阅读更多

服务器设置

1. 安装64JDK

2. 设置Linux文件系统为Ext4

3.开启987610911防火墙端口

源码编码

1. 安装Maven 2. 下载RocketMQ源码,下载地址:http://github.com/alibaba/RocketMQ.git/trunk,进入到源码解压目录下运行install.batDOS命令行切换到解压目录运行: mvn -Dmaven.test.skip=true clean package install assembly:assembly -U,编译成功后,在target目录下会有alibaba-rocketmq-3.1.1.tar.gz,该压缩包就是安装包。

3. 安装

alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:tar -zxvf

alibaba-rocketmq-3.1.1.tar.gz设置执行权限chmod +x ./alibaba-rocketmq/bin/*

4. 运行

配置采用双Master,双Slave,异步复制的配置方式,共需要4台服务器做硬件支持。 a. 修改配置

1)创建目录

mkdir /home/rocket/alibaba-rocketmq/logs #创建日志目录

mkdir -p /home/rocket/alibaba-rocketmq/data/store/commitlog #创建数据存储目录

更改日志目录

cd /home/rocket/alibaba-rocketmq/conf

2)修改A主配置

vi ./conf/2m-2s-async/broker-a.properties:

1. # brokerClusterName=DefaultCluster

2. brokerName=broker-a

3. brokerId=0

4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1

92.168.1.122:9876

5. defaultTopicQueueNums=4

6. autoCreateTopicEnable=true

7. autoCreateSubscriptionGroup=true

8. listenPort=10911

9. deleteWhen=04

10. fileReservedTime=120

11. mapedFileSizeCommitLog=1073741824

12. mapedFileSizeConsumeQueue=50000000

13. destroyMapedFileIntervalForcibly=120000

14. redeleteHangedFileInterval=120000

15. diskMaxUsedSpaceRatio=88

16. storePathRootDir=/usr/framework/rocketmq/datas

17. storePathCommitLog=/usr/framework/rocketmq/logs

18. maxMessageSize=65536

19. flushCommitLogLeastPages=4

20. flushConsumeQueueLeastPages=2

21. flushCommitLogThoroughInterval=10000

22. flushConsumeQueueThoroughInterval=60000

23. checkTransactionMessageEnable=false

24. sendMessageThreadPoolNums=128

25. pullMessageThreadPoolNums=128

26. brokerRole=SYNC_MASTER

27. flushDiskType=ASYNC_FLUSH

3)修改A从配置

vi ./conf/2m-2s-async/broker-a-s.properties:

1. # brokerClusterName=DefaultCluster

2. brokerName=broker-a

3. brokerId=1

4. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1

92.168.1.122:9876

5. defaultTopicQueueNums=4

6. autoCreateTopicEnable=true

7. autoCreateSubscriptionGroup=true

8. listenPort=10911

9. deleteWhen=04

10. fileReservedTime=120

11. mapedFileSizeCommitLog=1073741824

12. mapedFileSizeConsumeQueue=50000000

13. destroyMapedFileIntervalForcibly=120000

14. redeleteHangedFileInterval=120000

15. diskMaxUsedSpaceRatio=88

16. storePathRootDir=/usr/framework/rocketmq/datas

17. storePathCommitLog=/usr/framework/rocketmq/logs

18. maxMessageSize=65536

19. flushCommitLogLeastPages=4

20. flushConsumeQueueLeastPages=2

21. flushCommitLogThoroughInterval=10000

22. flushConsumeQueueThoroughInterval=60000

23. checkTransactionMessageEnable=false

24. sendMessageThreadPoolNums=128

25. pullMessageThreadPoolNums=128

26. brokerRole=SLAVE

27. flushDiskType=ASYNC_FLUSH

4)修改B主配置

vi ./conf/2m-2s-async/broker-b.properties:

28. # brokerClusterName=DefaultCluster

29. brokerName=broker-b

30. brokerId=0

31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1

92.168.1.122:9876

32. defaultTopicQueueNums=4

33. autoCreateTopicEnable=true

34. autoCreateSubscriptionGroup=true

35. listenPort=10911

36. deleteWhen=04

37. fileReservedTime=120

38. mapedFileSizeCommitLog=1073741824

39. mapedFileSizeConsumeQueue=50000000

40. destroyMapedFileIntervalForcibly=120000

41. redeleteHangedFileInterval=120000

42. diskMaxUsedSpaceRatio=88

43. storePathRootDir=/usr/framework/rocketmq/datas

44. storePathCommitLog=/usr/framework/rocketmq/logs

45. maxMessageSize=65536

46. flushCommitLogLeastPages=4

47. flushConsumeQueueLeastPages=2

48. flushCommitLogThoroughInterval=10000

49. flushConsumeQueueThoroughInterval=60000

50. checkTransactionMessageEnable=false

51. sendMessageThreadPoolNums=128

52. pullMessageThreadPoolNums=128

53. brokerRole=SYNC_MASTER

54. flushDiskType=ASYNC_FLUSH 5)修改B从配置

vi ./conf/2m-2s-async/broker-b-s.properties:

28. # brokerClusterName=DefaultCluster

29. brokerName=broker-b

30. brokerId=1

31. namesrvAddr=192.168.1.119:9876;192.168.1.120:9876;192.168.1.121:9876;1

92.168.1.122:9876

32. defaultTopicQueueNums=4

33. autoCreateTopicEnable=true

34. autoCreateSubscriptionGroup=true

35. listenPort=10911

36. deleteWhen=04

37. fileReservedTime=120

38. mapedFileSizeCommitLog=1073741824

39. mapedFileSizeConsumeQueue=50000000

40. destroyMapedFileIntervalForcibly=120000

41. redeleteHangedFileInterval=120000

42. diskMaxUsedSpaceRatio=88

43. storePathRootDir=/usr/framework/rocketmq/datas

44. storePathCommitLog=/usr/framework/rocketmq/logs

45. maxMessageSize=65536

46. flushCommitLogLeastPages=4

47. flushConsumeQueueLeastPages=2

demo:

 Producer

 

package com.lvxc.study.tech.rmq;  

  

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  

import com.alibaba.rocketmq.client.producer.SendResult;  

import com.alibaba.rocketmq.common.message.Message;  

  

public class Producer {  

  

    public static void main(String[] args) {  

        DefaultMQProducer producer = new DefaultMQProducer("Producer");  

        //nameserver服务,多个以;分开   

        producer.setNamesrvAddr("192.168.133.128:9876");  

        try{  

            producer.start();  

              

            Message msg = new Message("PushTopic","push","1","Just for test.".getBytes());  

            SendResult result = producer.send(msg);  

            System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;  

              

            msg = new Message("PushTopic","push","2","Just for test.".getBytes());  

            result = producer.send(msg);  

            System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;  

              

            msg = new Message("PullTopic","pull","1","Just for test.".getBytes());  

            result = producer.send(msg);  

            System.out.println("id:"+result.getMsgId()+" result:" +result.getSendStatus());;  

        }catch(Exception e){  

            e.printStackTrace();  

        }finally {  

            producer.shutdown();  

        }  

    }  

  

}  
  Consumer

 

package com.lvxc.study.tech.rmq;  

  

import java.util.List;  

  

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  

import com.alibaba.rocketmq.common.message.Message;  

import com.alibaba.rocketmq.common.message.MessageExt;  

  

public class Consumer {  

  

    public static void main(String[] args) {  

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");    

        consumer.setNamesrvAddr("192.168.133.128:9876");     

        try {    

            //订阅PushTopicTagpush的消息    

            consumer.subscribe("PushTopic", "push");    

            //程序第一次启动从消息队列头取数据    

            consumer.setConsumeFromWhere(    

                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);    

            consumer.registerMessageListener(    

                new MessageListenerConcurrently() {    

                    public ConsumeConcurrentlyStatus consumeMessage(    

                            List<MessageExt> list,    

                            ConsumeConcurrentlyContext Context) {    

                        Message msg = list.get(0);    

                        System.out.println(msg.toString());    

                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    

                    }    

                }    

            );    

            consumer.start();    

        } catch (Exception e) {    

            e.printStackTrace();    

        }    

    }    

  

}  

 

分享到:
评论

相关推荐

    RocketMQ消息队列demo

    这个demo可能是通过一个简单的客户端应用,允许用户输入服务器的IP地址和端口号,然后就能与RocketMQ服务器进行交互,实现消息的发布和订阅功能。 1. ** RocketMQ的基本概念 ** - **主题(Topic)**:主题是消息...

    阿里云rocketmq消息队列对接demo

    该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。

    RocketMQ代码demo.zip

    在"rocketMQ-demo"这个压缩包中,应该包含了一个完整的示例项目,包括了上述的配置文件、Producer和Consumer的实现,以及可能的测试代码。你可以通过导入这个项目到IDE,运行并查看日志,以了解如何实际操作RocketMQ...

    消息中间件 RocketMQ 发布和订阅 Demo

    这将允许你的应用与 RocketMQ 服务端进行通信。在 `pom.xml` 文件中,查找或添加如下依赖: ```xml &lt;groupId&gt;com.aliyun.openservices&lt;/groupId&gt; &lt;artifactId&gt;rocketmq-client 最新版本号 ``` 2. **创建...

    springboot-rocketmq-demo.zip

    标题中的"springboot-rocketmq-demo.zip"表明这是一个关于Spring Boot整合RocketMQ的示例项目。RocketMQ是阿里巴巴开源的一款分布式消息中间件,而Spring Boot是基于Spring框架的高度集成了许多开发工具和配置的轻量...

    RocketMQ 的小demo

    初学者可以来学习一下,rocketMQ的简单的小demo 简单易懂

    rocketmq-demo.zip

    在本示例中,你将看到基于Java API编写的Producer和Consumer类,它们是与RocketMQ交互的关键组件。 1. **Producer**:生产者负责创建和发送消息到RocketMQ服务器。在RocketMQ中,生产者可以是单个实例或一组实例,...

    rocketmq 3.5.8.zip及相关demo和源码及安装使用ppt

    很全的rocket包及安装详细说明附加demo示例。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。...

    spring-boot操作rocketmq的demo

    spring-boot操作rocketmq的demo,亲测可用,代码整理的好

    springboot集成RocketMQ消息队列demo源码案例演示

    一、简介 二、环境部署 1、编译打包 2、修改配置 3、服务启动 4、控制台安装 三、工程搭建 1、工程结构 2、依赖管理 3、配置文件 4、配置类 四、基础用法 1、消息生产 2、消息消费 五、参考源码

    RocketMQ相关资料文档以及demo

    这个压缩包文件包含了关于RocketMQ的相关资料文档和一个demo,将帮助我们深入理解其工作原理和使用方法。 首先,我们要理解RocketMQ的基本概念。它是基于发布/订阅模式的消息队列,支持点对点和发布/订阅两种消息...

    RocketMQ学习demo

    RocketMQ学习demo

    rocketmq系列资料.zip

    为了帮助大家更好的学习和使用RocketMQ,因此提供相关学习文档,一起学习,资源包括 RocketMQ初步认知、RocketMQ单机环境搭建、RocketMQ集群部署实践、基于myeclipse的RocketMQ--Demo实践、基于RocketMQ--Demo项目的...

    SpringBoot集成RocketMq,打包成jar包引入到SpringBoot项目中,使用RocketMq发送消费消息的功能

    SpringBoot集成RocketMq,可打包成jar包引入到SpringBoot项目中,方便快捷的使用RocketMq的发送消费消息的功能

    RocketMQ 5.2.0

    - **主题(Topic)与队列(Queue)**: RocketMQ 使用主题和队列的概念来组织消息。一个主题可以包含多个队列,每个队列存储一部分消息。这种设计允许消费者并行消费,提高处理速度。 - **发布/订阅模型**: 支持点...

    rocketmq-demo:最终一致性分布式事务-rocketmq使用

    本示例“rocketmq-demo”将着重讲解如何利用 RocketMQ 实现最终一致性分布式事务,这对于理解和掌握分布式系统的事务处理机制至关重要。 在分布式系统中,事务处理是个复杂的话题,而最终一致性是解决分布式事务的...

    springboot1.5.10.RELEASE集成rocketmq4.3.1消息服务demo,多个消费者多监听

    在本文中,我们将深入探讨如何将SpringBoot 1.5.10.RELEASE与RocketMQ 4.3.1集成,构建一个支持多个消费者监听的消息服务示例。RocketMQ是阿里巴巴开源的一款分布式消息中间件,它具有高吞吐量、低延迟、高可用性和...

    rocketmq-demo

    在本项目"rocketmq-demo"中,我们将探讨 RocketMQ 的基本概念、工作原理以及如何通过示例代码进行实践。 1. **RocketMQ 基本概念** - **主题(Topic)**:主题是消息的逻辑分类,类似于广播频道,多个消费者可以...

    RocketMq_java_demo.rar

    RocketMQ是阿里巴巴开源的...这个Java SpringBoot的RocketMQ demo是一个基础的起点,实际应用中可能需要处理更复杂的场景,比如消息幂等性、事务消息、消息过滤等。你可以根据业务需求进一步学习和扩展RocketMQ的功能。

    RocketMQ Java Demo

    java 使用 rocketmq的一个生产者和消费者的实现,其中要先启动rocket的nameserver 和borker

Global site tag (gtag.js) - Google Analytics