`

RocketMQ 3.2.6安装以及测试

阅读更多
下载 alibaba-rocketmq-3.2.6.tar.gz
下载页面:https://github.com/alibaba/RocketMQ/releases
下载地址:
https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz



解压
tar -xvf alibaba-rocketmq-3.2.6.tar.gz

需要jdk1.6以上
设置环境变量
vi .bash_profile

export JAVA_HOME=/opt/jdk6
export JRE_HOME=/opt/jdk6/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:CLASSPATH
export ROCKETMQ_HOME=/opt/mq/alibaba-rocketmq
export NAMESRV_ADDR=127.0.0.1:9876

启动
cd alibaba-rocketmq/bin
./play.sh

启动完成后
jps
会有NamesrvStartup和BrokerStartup两个java进程


如果没有问题,停止服务
cd alibaba-rocketmq/bin
sh mqshutdown broker
sh mqshutdown namesrv





二.设置防火墙:
默认启动用会使用3个端口 9876,10911,10912
分别代表名称服务端口,broker端口,broker ha端口。
ha端口haListenPort表示Master监听Slave请求的端口,默认为服务端口+1

添加到/etc/sysconfig/iptables的filter表INPUT链中

-A INPUT -m state --state NEW -m tcp -p tcp --dport 9876 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 10911 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 10912 -j ACCEPT

添加完成后重启防火墙
service iptables restart

四.获取broker默认配置
cd alibaba-rocketmq/bin
sh mqbroker -m

启动时如何加载配置文件呢
1.生成默认的配置模板
sh mqbroker -m > broker.p
2.修改配置
注意以下两个参数
    storePathRootDir=/opt/mq/rocketmq/store
    storePathCommitLog=/opt/mq/rocketmq/store/commitlog
默认commitlog是存放在用户主目录的下的store/commitlog目录下

namesrvAddr=127.0.0.1:9876
名称服务器地址,可以在命令行通过-n传入,多个名称服务器用;隔开

brokerIP1=192.168.6.57
根据实际情况填写,默认启动时自动识别,一般用于多网卡识别错误,手工配置。

brokerName=peteccBrkMaster
broker名称,默认主机名,我们可以改下peteccBrkMaster

listenPort=10911
默认监听端口

brokerId=0
0表示master,>0表示slave

autoCreateTopicEnable=true
是否自动创建topic,线上环境建议关闭

deleteWhen=04
删除文件的时间点,默认凌晨4点

fileReservdTime=48
文件保留时间,默认48小时

3.加载配置
nohup sh mqbroker -c broker.p &

五.生成namesrv默认配置

sh mqnamesrv -p >namesrv.p
可以通过 sh mqnamesrv -h  查看命令参数。




启动命令
nohup sh mqnamesrv -c namesrv.p &

六操作系统配置
os.sh大概意思是根据rocketmq的特点,修改系统参数,修改磁盘调度算法。
接下来我们操作一下,这个如要root权限

vim /etc/sysctl.conf增加

vm.overcommit_memory=1
vm.min_free_kbytes=5000000
vm.drop_caches=1
vm.zone_reclaim_mode=0
vm.max_map_count=655360
vm.dirty_background_ratio=50
vm.dirty_ratio=50
vm.page-cluster=3
vm.dirty_writeback_centisecs=360000
vm.swappiness=10

修改完成后sysctl -p

修改最大打开文件描述数

vim /etc/security/limits.conf添加
*  soft nofile 655350
*  hard nofile 655350

退出当前用户重新login就会生效,使用ulimit -n验证下。

修改io调试算法为deadline

查看当前系统支持的IO调度算法
[rocketmq@vtfsdb3 bin]$ dmesg | grep -i scheduler

查看当前系统默认io调度算法
[rocketmq@vtfsdb3 bin]$ cat /sys/block/sda/queue/scheduler
noop anticipatory deadline [cfq]

注意中间的sda换成实际运行存储rocketmq的磁盘,这个自己通过fdisk,df自己找出来吧,

临时更改I/O调度方法:
echo 'deadline' > /sys/block/sda/queue/scheduler

将这句加入开机启动/etc/rc.local中吧

修改完成,确认下
cat /sys/block/sda/queue/scheduler

jvm参数调整

cd alibaba-rocketmq/bin

有两个文件runserver.sh和runbroker.sh两个shell脚本,分别是name server和broker server的配置启动脚本
#vim修改runbroker.sh、runserver.sh两个文件
-------------------------------------
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
###Xms启动时内存,Xmx最大内存,Xmn最小内存

测试代码:
生产者:
/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;


/**
* Producer,发送消息
*
*/
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
       //DefaultCluster  please_rename_unique_group_name
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    // producer.setNamesrvAddr("");//
    producer.setNamesrvAddr("192.168.6.57:9876");
    /// producer.setInstanceName("");
    producer.setInstanceName("Product");
    producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("dell-PC",// topic   TopicTest
                    "TagA",// tag
                    ("Hello RocketMQ 测试信息------" + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(i+"--"+sendResult);
                Thread.sleep(30);

            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}


消费者:
/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.rocketmq.example.quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;


/**
* Consumer,订阅消息
*/
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer
        ("please_rename_unique_group_name_4");//("please_rename_unique_group_name_4");
       
        consumer.setNamesrvAddr("192.168.6.57:9876");
        consumer.setInstanceName("Consumber");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("dell-PC", "*"); //TopicTest
      //  consumer.subscribe("BenchmarkTest", "*");
       // consumer.subscribe("benchmark_consumer_39", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                for(MessageExt item:msgs){
                System.out.println(new String(item.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}



生产者运行结果:



消费者运行结果:




七. mqadmin 命令使用:
cd /opt/mq/alibaba-rocketmq/bin

1. 查看所有消费组group:
   sh mqadmin consumerProgress -n 192.168.6.57:9876
2. 查看指定消费组下的所有topic数据堆积情况:
    sh mqadmin consumerProgress -n 192.168.6.57:9876 -g benchmark_consumer_33
3. 查看所有topic :
     sh mqadmin topicList -n 192.168.6.57:9876
4. 查看topic信息列表详情统计
   sh mqadmin topicstatus -n 192.168.6.57:9876 -t myTopicTest1
5.  新增topic
   sh mqadmin updateTopic –n 192.168.6.57 –c groupname –t topicname
6. 删除topic
  sh mqadmin deleteTopic –n 192.168.6.57:9876 –c groupname –t topicname





手动来配置mq参数:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/alibaba-rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/alibaba-rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/alibaba-rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/alibaba-rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/alibaba-rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/alibaba-rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536

#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128







  • 大小: 15.1 KB
  • 大小: 14.3 KB
  • 大小: 36.3 KB
  • 大小: 31.1 KB
  • 大小: 22.3 KB
分享到:
评论

相关推荐

    rocketmq3.2.6

    rocketmq3.2.6控制台;rocketmq3.2.6控制台;rocketmq3.2.6控制台;rocketmq3.2.6控制台;rocketmq3.2.6控制台;

    rocketMq3.2.6最新版

    RocketMQ 3.2.6是该软件的一个特定版本,它可能包含了在此之前的版本中积累的改进、优化以及新功能。 1. **RocketMQ核心概念**: - **消息队列**:RocketMQ的核心是消息队列,它存储并转发消息,确保消息的可靠...

    RocketMQ 3.2.6 web监控程序

    【RocketMQ 3.2.6 Web监控程序详解】 RocketMQ是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着重要角色,提供高可靠、高性能的消息传输服务。RocketMQ 3.2.6版本引入了Web监控程序,为用户...

    alibaba-rocketmq3.2.6

    本文将深入探讨RocketMQ的核心概念、功能特性以及3.2.6版本中的关键改进。 1. **核心概念**: - **消息队列(Message Queue)**:RocketMQ的核心是消息队列,它作为消息的存储和传输媒介,实现了生产者与消费者的...

    RocketMQ 3.2.6

    3. **NameServer**:NameServer是 RocketMQ 的服务注册与发现组件,它负责存储 Topic 和 Broker 之间的映射关系,以及 Broker 的元数据信息,但不参与消息的存储和传输,从而降低了系统的复杂性。 4. **Broker**:...

    RocketMQ-3.2.6

    RocketMQ-3.2.6是该软件的一个特定版本,提供了稳定性和性能的优化。 1. **RocketMQ基本概念**: - **Producer**: 生产者,负责发送消息到RocketMQ服务器。 - **Consumer**: 消费者,从RocketMQ服务器接收并处理...

    RocketMQ3.2.6

    RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 1、支持严格的消息顺序; 2、支持Topic与Queue两种模式; 3、亿级消息堆积能力; 4、比较友好的分布式特性; 5、同时支持Push与Pull方式消费消息;

    RocketMQ3.2.6消息中间件

    在3.2.6版本中,RocketMQ进一步优化了性能和稳定性,为企业级应用提供了强大的消息传递能力。 1. **消息模型** RocketMQ采用发布/订阅(Publish/Subscribe)模型,支持点对点和发布订阅两种消息模式。点对点模式下...

    RocketMQ-3.2.6源码

    通过阅读和理解RocketMQ 3.2.6的源码,开发者可以深入学习到分布式消息中间件的设计原理,以及如何在实际项目中有效利用这些特性,提高系统的可靠性和性能。此外,源码分析还能帮助开发者自定义插件,实现特定功能,...

    RocketMQ-3.2.6_part0

    在"RocketMQ-3.2.6_part0"的压缩包中,我们可以期待找到关于这个特定版本的源码、配置文件、API文档、示例代码以及可能的升级和迁移指南等内容。这些资源对于理解RocketMQ的内部工作原理、开发基于RocketMQ的应用...

    rocketmq-3.2.6 JAR包

    在这个“rocketmq-3.2.6 JAR包”中,包含了三个核心组件的JAR文件:`rocketmq-client-3.2.6.jar`, `rocketmq-common-3.2.6.jar`, 和 `rocketmq-remoting-3.2.6.jar`。 1. `rocketmq-client-3.2.6.jar`: 这个...

    RocketMQ-3.2.6源码工程

    RocketMQ的源码工程,直接导入到eclipse工程中即可使用;为了在eclipse中运行RocketMQ工程,在NamesrvStartup类的128行、BrokerStartup类的160号加入了一句环境路径的配置语句

    alibaba-rocketmq-3.2.6.tar.gz

    RocketMQ与rabbitMQ相比,其运行效率更为高效,作为alibaba双十一的信息交换组件,其效率可见一斑,同时因为2.x以来公司内部其他系统,建议使用3.x及其以上版本。Github下载已失效,apache-rocketMQ已经到4.0,自行...

    alibaba-rocketmq-3.2.6 包

    在解压“alibaba-rocketmq-3.2.6”包后,你将获得 RocketMQ 的源码、配置文件、依赖库以及相关文档。通过这些资源,你可以深入了解其内部实现,进行定制化开发,或者在本地环境中搭建和测试RocketMQ服务器。安装过程...

    rocketmq-3.2.6.tar.gz

    3.2.6版本是RocketMQ的一个稳定版本,提供了可靠的消息传输、高并发处理能力以及丰富的消息模式。以下是对RocketMQ 3.2.6版本中的主要知识点的详细说明: 1. **消息模型**:RocketMQ支持发布/订阅和点对点两种消息...

    RocketMQ-3.2.6-with-dependencies

    在RocketMQ-3.2.6压缩包中,用户可以找到RocketMQ的源码、编译脚本、配置文件以及相关的依赖库。解压后,用户可以按照官方文档的指引,配置环境变量,编译源码,启动NameServer、Broker等服务,并通过示例代码了解...

    SpringBoot+RocketMQ

    2. **配置RocketMQ**:在SpringBoot的配置文件(如application.properties或application.yml)中,我们需要设置RocketMQ服务器的地址、端口以及相关的认证信息,例如: ``` rocketmq.producer.group=YOUR_PRODUCER...

    rocketmq-console.war 3.2.6 管控台war包

    标题中的"rocketmq-console.war 3.2.6 管控台war包"指的是RocketMQ Console的3.2.6版本的Web应用包,通常以WAR(Web ARchive)格式发布。WAR文件是一种Java Web应用程序的标准打包格式,可以直接部署在支持Java ...

Global site tag (gtag.js) - Google Analytics