`

RocketMQ最佳实践(一)4.0版本/概念介绍/安装调试/客户端demo

阅读更多

为什么选择RocketMQ

我们来看看官方回答:

“我们研究发现,对于ActiveMQ而言,随着越来越多的使用queues和topics,其IO成为了瓶颈。某些情况下,消费者缓慢(消费能力不足)还会拖慢生产者(造成消息阻塞)。虽然我们做了最大努力进行优化:节流、断路器或者回退,但是并不能进行优雅的扩展。因此我们开始专注于使用时下非常流行的kafka,但是仍然不能满足我们的要求,如低延迟和高可靠性,详情见这里。在这样的背景下,我们决定开发一个新的消息中间件来处理一系列广泛的使用场景,包括从传统的发布/订阅场景到高容量的实时交易系统中不允许消息丢失的场景。”

各位看官也可以撮这里去看看RocketMQ与ActiveMQ以及Kafka的比较

 

核心概念

  • 生产者(Producer):消息发送方,将业务系统中产生的消息发送到brokers(brokers可以理解为消息代理,生产者和消费者之间是通过brokers进行消息的通信),rocketmq提供了以下消息发送方式:同步、异步、单向
  • 生产者组(Producer Group):相同角色的生产者被归为同一组,比如通常情况下一个服务会部署多个实例,这多个实例就是一个组,生产者分组的作用只体现在消息回查的时候,即如果一个生产者组中的一个生产者实例发送一个事务消息到broker后挂掉了,那么broker会回查此实例所在组的其他实例,从而进行消息的提交或回滚操作。
  • 消费者(Consumer):消息消费方,从brokers拉取消息。站在用户的角度,有以下两种消费者。
  • 主动消费者(PullConsumer):从brokers拉取消息并消费。
  • 被动消费者(PushConsumer):内部也是通过pull方式获取消息,只是进行了扩展和封装,并给用户预留了一个回调接口去实现,当消息到底的时候会执行用户自定义的回调接口。
  • 消费者组(Consumer Group):和生产者组类似。其作用体现在实现消费者的负载均衡和容错,有了消费者组变得异常容易。需要注意的是:同一个消费者组的每个消费者实例订阅的主题必须相同。
  • 主题(Topic):主题就是消息传递的类型。一个生产者实例可以发送消息到多个主题,多个生产者实例也可以发送消息到同一个主题。同样的,对于消费者端来说,一个消费者组可以订阅多个主题的消息,一个主题的消息也可以被多个消费者组订阅。
  • 消息(Message):消息就像是你传递信息的信封。每个消息必须指定一个主题,就好比每个信封上都必须写明收件人。
  • 消息队列(Message Queues):在主题内部,逻辑划分了多个子主题,每个子主题被称为消息队列。这个概念在实现最大并发数、故障切换等功能上有巨大的作用。
  • 标签(Tag):标签,可以被认为是子主题。通常用于区分同一个主题下的不同作用或者说不同业务的消息。同时也是避免主题定义过多引起性能问题,通常情况下一个生产者组只向一个主题发送消息,其中不同业务的消息通过标签或者说子主题来区分。
  • 消息代理(Broker):消息代理是RockerMQ中很重要的角色。它接收生产者发送的消息,进行消息存储,为消费者拉取消息服务。它还存储消息消耗相关的元数据,包括消费群体,消费进度偏移和主题/队列信息。
  • 命名服务(Name Server):命名服务作为路由信息提供程序。生产者/消费者进行主题查找、消息代理查找、读取/写入消息都需要通过命名服务获取路由信息。
  • 消息顺序(Message Order):当我们使用DefaultMQPushConsumer时,我们可以选择使用“orderly”还是“concurrently”。
    • orderly:消费消息的有序化意味着消息被生产者按照每个消息队列发送的顺序消费。如果您正在处理全局顺序为强制的场景,请确保您使用的主题只有一个消息队列。注意:如果指定了消费顺序,则消息消费的最大并发性是消费组订阅的消息队列数。
    • concurrently:当同时消费时,消息消费的最大并发仅限于为每个消费客户端指定的线程池。注意:此模式不再保证消息顺序。

安装与调试

官方要求的环境:

  • 64bit OS, Linux/Unix/Mac is recommended;
  • 64bit JDK 1.7+;
  • Maven 3.2.x
  • Git

我的环境:(我喜欢使用较新的版本得意

  • CentOS Linux release 7.3.1611;
  • 64bit JDK 1.8.0_91;
  • apache-maven-3.5.0;
  • Git 1.8.3.1

安装jdk

麻烦各位看官自行搜索,资料多的吓人。。。微笑

安装maven

先去官网下载maven
然后上传到安装目录,解压:
[plain] view plain copy
 
  1. sudo tar zxvf apache-maven-3.5.0-bin.tar.gz  
解压完成设置环境变量:
[plain] view plain copy
 
  1. sudo vi /etc/profile  
然后使环境变量生效:
[plain] view plain copy
 
  1. source /etc/profile  
最后验证是否安装成功:
[plain] view plain copy
 
  1. mvn -v  

安装Git(so easy大笑

先检查看看是否已经安装过了:
[plain] view plain copy
 
  1. git --version  
如果没有就开始安装:
[plain] view plain copy
 
  1. sudo yum install git  
安装完毕再看看:
[plain] view plain copy
 
  1. git --version  

下面进行RocketMq安装

编译:
[plain] view plain copy
 
  1. > git clone https://github.com/apache/incubator-rocketmq.git  
[plain] view plain copy
 
  1. > cd incubator-rocketmq  
[plain] view plain copy
 
  1. > mvn clean package install -Prelease-all assembly:assembly -U  
[plain] view plain copy
 
  1. > cd target/apache-rocketmq-all  
在执行mvn编译的时候,你可能会遇到如下的问题:
这是由于没有权限创建目录造成的。所以,要么你切换到root用户,要么使用sudo:
[plain] view plain copy
 
  1. sudo mvn clean package install -Prelease-all assembly:assembly -U  
提示:sudo: mvn: command not found。好吧,也是醉了。我们还需要在你当前用户的Home目录下的一个隐藏文件(.bashrc)中添加点东西:
[plain] view plain copy
 
  1. > cd ~  
[plain] view plain copy
 
  1. > sudo vi .bashrc  
添加完成后,执行:source .bashrc  使修改生效。然后再重新执行看看:
[plain] view plain copy
 
  1. sudo mvn clean package install -Prelease-all assembly:assembly -U  
时间稍微有点长,我的环境用了16分钟,请看官耐心等待,完成后如下图:

启动RocketMQ

修改默认配置

由于RocketMQ默认配置要求很高,比如内存至少就要4个G,开发调试环境根本吃不消,所以我们开始启动前需要先修改这些参数。否则的话,我们很有会遇到内存分配或者不够的问题。
修改target/apache-rocketmq-all/bin/runserver.sh
[plain] view plain copy
 
  1. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m"  
修改target/apache-rocketmq-all/bin/runbroker.sh
[plain] view plain copy
 
  1. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m  
修改target/apache-rocketmq-all/bin/tools.sh
[plain] view plain copy
 
  1. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"  

启动NameServer

进入target/apache-rocketmq-all目录下
[plain] view plain copy
 
  1. > nohup sh bin/mqnamesrv &  
[plain] view plain copy
 
  1. > tail -f ~/logs/rocketmqlogs/namesrv.log  
[plain] view plain copy
 
  1. The Name Server boot success...  

启动Broker

[plain] view plain copy
 
  1. > nohup sh bin/mqbroker -n localhost:9876 &  
[plain] view plain copy
 
  1. > tail -f ~/logs/rocketmqlogs/broker.log   
[plain] view plain copy
 
  1. The broker[%s, 172.17.0.1:10911] boot success...  

开放端口

[plain] view plain copy
 
  1. sudo vi /etc/sysconfig/iptables  
然后重启生效:
[plain] view plain copy
 
  1. sudo systemctl restart iptables  

添加ROCKETMQ_HOME环境变量

[plain] view plain copy
 
  1. sudo vi /etc/profile  


[plain] view plain copy
 
  1. source /etc/profile  


java客户端

pom.xml

[html] view plain copy
 
  1. <rocketmq.version>4.0.0-incubating</rocketmq.version>  
  2.   
  3. <dependency>  
  4.     <groupId>org.apache.rocketmq</groupId>  
  5.     <artifactId>rocketmq-client</artifactId>  
  6.     <version>${rocketmq.version}</version>  
  7. </dependency>  
  8. <dependency>  
  9.     <groupId>org.apache.rocketmq</groupId>  
  10.     <artifactId>rocketmq-common</artifactId>  
  11.     <version>${rocketmq.version}</version>  
  12. </dependency>  

生产者

[java] view plain copy
 
  1. import org.apache.rocketmq.client.exception.MQClientException;  
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;  
  3. import org.apache.rocketmq.client.producer.SendResult;  
  4. import org.apache.rocketmq.common.message.Message;  
  5.   
  6. import java.util.concurrent.TimeUnit;  
  7.   
  8. public class Producer {  
  9.     public static void main(String[] args) throws MQClientException,  
  10.             InterruptedException {  
  11.         /** 
  12.          * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  13.          * 注意:ProducerGroupName需要由应用来保证唯一<br> 
  14.          * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, 
  15.          * 因为服务器会回查这个Group下的任意一个Producer 
  16.          */  
  17.         DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  
  18.         producer.setNamesrvAddr("192.168.56.101:9876");  
  19.         producer.setInstanceName("Producer");  
  20.         producer.setVipChannelEnabled(false);  
  21.   
  22.         /** 
  23.          * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  24.          * 注意:切记不可以在每次发送消息时,都调用start方法 
  25.          */  
  26.         producer.start();  
  27.   
  28.         /** 
  29.          * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 
  30.          * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br> 
  31.          * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br> 
  32.          * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 
  33.          */  
  34.         for (int i = 0; i < 1; i++) {  
  35.             try {  
  36.                 {  
  37.                     Message msg = new Message("TopicTest1",// topic  
  38.                             "TagA",// tag  
  39.                             "OrderID001",// key  
  40.                             ("Hello MetaQ").getBytes());// body  
  41.                     SendResult sendResult = producer.send(msg);  
  42.                     System.out.println(sendResult);  
  43.                 }  
  44.   
  45.                 {  
  46.                     Message msg = new Message("TopicTest2",// topic  
  47.                             "TagB",// tag  
  48.                             "OrderID0034",// key  
  49.                             ("Hello MetaQ").getBytes());// body  
  50.                     SendResult sendResult = producer.send(msg);  
  51.                     System.out.println(sendResult);  
  52.                 }  
  53.   
  54.                 {  
  55.                     Message msg = new Message("TopicTest3",// topic  
  56.                             "TagC",// tag  
  57.                             "OrderID061",// key  
  58.                             ("Hello MetaQ").getBytes());// body  
  59.                     SendResult sendResult = producer.send(msg);  
  60.                     System.out.println(sendResult);  
  61.                 }  
  62.             } catch (Exception e) {  
  63.                 e.printStackTrace();  
  64.             }  
  65.             TimeUnit.MILLISECONDS.sleep(1000);  
  66.         }  
  67.   
  68.         /** 
  69.          * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 
  70.          * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 
  71.          */  
  72.         producer.shutdown();  
  73.     }  
  74. }  

消费者

 

[java] view plain copy
 
  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  5. import org.apache.rocketmq.client.exception.MQClientException;  
  6. import org.apache.rocketmq.common.message.MessageExt;  
  7.   
  8. import java.util.List;  
  9.   
  10. public class PushConsumer {  
  11.   
  12.     /** 
  13.      * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br> 
  14.      * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br> 
  15.      */  
  16.     public static void main(String[] args) throws InterruptedException,  
  17.             MQClientException {  
  18.         /** 
  19.          * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  20.          * 注意:ConsumerGroupName需要由应用来保证唯一 
  21.          */  
  22.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(  
  23.                 "ConsumerGroupName");  
  24.         consumer.setNamesrvAddr("192.168.56.101:9876");  
  25.         consumer.setInstanceName("Consumber");  
  26.   
  27.         /** 
  28.          * 订阅指定topic下tags分别等于TagA或TagC或TagD 
  29.          */  
  30.         consumer.subscribe("TopicTest1""TagA || TagC || TagD");  
  31.         /** 
  32.          * 订阅指定topic下所有消息<br> 
  33.          * 注意:一个consumer对象可以订阅多个topic 
  34.          */  
  35.         consumer.subscribe("TopicTest2""*");  
  36.   
  37.         consumer.registerMessageListener(new MessageListenerConcurrently() {  
  38.   
  39.             /** 
  40.              * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 
  41.              */  
  42.             @Override  
  43.             public ConsumeConcurrentlyStatus consumeMessage(  
  44.                     List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  45.                 System.out.println(Thread.currentThread().getName()  
  46.                         + " Receive New Messages: " + msgs.size());  
  47.   
  48.                 MessageExt msg = msgs.get(0);  
  49.                 if (msg.getTopic().equals("TopicTest1")) {  
  50.                     // 执行TopicTest1的消费逻辑  
  51.                     if (msg.getTags() != null && msg.getTags().equals("TagA")) {  
  52.                         // 执行TagA的消费  
  53.                         System.out.println(new String(msg.getBody()));  
  54.                     } else if (msg.getTags() != null  
  55.                             && msg.getTags().equals("TagC")) {  
  56.                         // 执行TagC的消费  
  57.                     } else if (msg.getTags() != null  
  58.                             && msg.getTags().equals("TagD")) {  
  59.                         // 执行TagD的消费  
  60.                     }  
  61.                 } else if (msg.getTopic().equals("TopicTest2")) {  
  62.                     System.out.println(new String(msg.getBody()));  
  63.                 }  
  64.   
  65.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  66.             }  
  67.         });  
  68.   
  69.         /** 
  70.          * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  71.          */  
  72.         consumer.start();  
  73.   
  74.         System.out.println("Consumer Started.");  
  75.     }  
  76. }  

 

 

http://blog.csdn.net/jayjjb/article/details/69948357

分享到:
评论

相关推荐

    RocketMQ 5.2.0

    在5.2.0版本中,它提供了一系列优化和增强的功能,使其在高并发、低延迟、高可用性和可扩展性方面表现更加出色。本篇文章将详细探讨RocketMQ 5.2.0的核心知识点及其应用场景。 1. **分布式消息传递** - **主题...

    springboot-rocketmq-demo.zip

    标题中的"springboot-rocketmq-demo.zip"表明这是一个关于Spring Boot整合RocketMQ的示例项目。RocketMQ是阿里巴巴开源的一款分布式消息中间件,而Spring Boot是基于Spring框架的高度集成了许多开发工具和配置的轻量...

    基于Android的TCP服务端/客户端调试Demo

    Android TCP测试demo,包含apk和源码,支持十六进制发送接收,以太网和wifi的IP获取以及网线插拔监听,Android Things 1.0版本与Android 8.0均测试通过。WIFI和网线均可通信。可参看博文UI功能...

    RocketMQ代码demo.zip

    在"rocketMQ-demo"这个压缩包中,应该包含了一个完整的示例项目,包括了上述的配置文件、Producer和Consumer的实现,以及可能的测试代码。你可以通过导入这个项目到IDE,运行并查看日志,以了解如何实际操作RocketMQ...

    spring4.0框架demo

    本文将围绕一个基于Maven构建的Spring 4.0框架Demo,深入探讨其核心概念、配置与实践。 一、Spring 4.0的关键特性 1. Java 8支持:Spring 4.0开始全面支持Java 8,包括Lambda表达式和日期时间API,这使得代码更加...

    glide4.0 (DEMO)

    Glide 4.0 版本引入了许多改进和新特性,旨在提供更好的用户体验和开发者友好性。在这个“glide4.0 (DEMO)”中,我们将深入探讨 Glide 4.0 的关键知识点。 1. **模块化**:Glide 4.0 强调模块化,允许开发者根据...

    RocketMQ消息队列demo

    这个demo可能是通过一个简单的客户端应用,允许用户输入服务器的IP地址和端口号,然后就能与RocketMQ服务器进行交互,实现消息的发布和订阅功能。 1. ** RocketMQ的基本概念 ** - **主题(Topic)**:主题是消息...

    MACRM4.0客户端配置手册

    ### Microsoft Dynamics CRM4.0 for Microsoft Office Outlook 安装配置详解 #### 一、系统要求与环境准备 Microsoft Dynamics CRM4.0 for Microsoft Office Outlook 的安装配置,首先需确保目标计算机满足以下...

    安卓蓝牙4.0开发DEMO

    这个"安卓蓝牙4.0开发DEMO"是谷歌官方提供的一个示范项目,旨在帮助开发者理解和实践如何在Android应用中集成蓝牙4.0功能。以下是一些关键的知识点: 1. **蓝牙适配器**:在Android中,`BluetoothAdapter`是负责...

    demo4.0源代码

    "demo4.0源代码"是一个软件开发项目的重要组成部分,通常包含了实现特定功能或服务的所有编程源文件。在这个场景中,"wo de demo4.0源代码 gong can kao o!" 意味着你拥有或者正在研究一个名为"demo4.0"的项目的源码...

    GridLayout使用Demo(兼容4.0以下版本)

    总之,"GridLayout使用Demo(兼容4.0以下版本)"是一个帮助开发者在低版本Android系统上实现GridLayout功能的示例项目。它可能包含导入第三方库、创建GridLayout对象、设置子视图的布局参数等步骤,为开发者提供了一种...

    消息中间件 RocketMQ 发布和订阅 Demo

    这个“消息中间件 RocketMQ 发布和订阅 Demo”是一个适合初学者的入门示例,通过 Java 编写,利用 Maven 进行项目管理,旨在帮助开发者快速理解如何使用 RocketMQ 实现发布和订阅操作。 首先,我们需要了解 ...

    61850服务端及客户端模拟软件(demo)

    总的来说,这款61850服务端及客户端模拟软件Demo为学习者提供了一个实践61850标准的平台,不仅有助于理解标准背后的理论,还可以提高实际操作技能。通过对服务端和客户端的模拟操作,用户可以深入理解变电站自动化...

    Ext4.0MVC演示例子 Ext4.0MVC

    Ext4.0 Ext4.0MVC Demo Ext4.0 Ext4.0MVC Demo Ext4.0 Ext4.0MVC Demo Ext4.0 Ext4.0MVC Demo

    google蓝牙4.0(ble 4.0)官方demo

    **蓝牙4.0(BLE 4.0)技术详解** 蓝牙4.0,也被称为低功耗蓝牙(Bluetooth Low Energy,BLE),是蓝牙技术联盟在2010年推出的一种新标准,旨在满足物联网(IoT)设备对于短距离、低功耗通信的需求。这个标准在谷歌...

    rocketmq-demo.zip

    这个"rocketmq-demo.zip"压缩包提供了一个入门级的示例,帮助开发者理解RocketMQ的基本工作原理和使用方法。以下是对RocketMQ及其相关代码示例的详细解释。 首先,RocketMQ的核心功能是作为一个消息队列,它在生产...

    android蓝牙4.0demo

    这个"android蓝牙4.0demo"项目是针对Android系统的一个演示程序,证明了它已成功通过测试,可以用于开发和理解如何在Android上实现蓝牙4.0功能。 **蓝牙4.0的主要特性:** 1. **低功耗**:蓝牙4.0设计的目标是极低...

    客户端DEMO

    【客户端DEMO】是软件开发中的一个重要组成部分,它通常是一个预览版本或示例应用程序,旨在展示特定功能或技术如何在实际环境中运行。这个DEMO主要用于开发者之间进行沟通,或者供用户试用,以理解软件的基本操作和...

    ios 蓝牙 BLE4.0_Demo(包含 客户端,服务端)

    本文将详细讲解“iOS 蓝牙 BLE4.0_Demo”项目,它是一个用于演示如何在iOS设备上实现蓝牙低功耗(Bluetooth Low Energy,简称BLE)4.0技术的示例应用。该Demo包括客户端和服务端两部分,旨在展示RSSI(Received ...

    Android蓝牙4.0开发demo

    **Android蓝牙4.0开发Demo详解** 在移动设备开发领域,Android系统提供了强大的API支持,使得开发者能够利用蓝牙技术进行各种交互。特别是蓝牙4.0(也称为Bluetooth Low Energy,BLE),它在低功耗通信方面表现优异...

Global site tag (gtag.js) - Google Analytics