`
gaojingsong
  • 浏览: 1210221 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【JAVA代码之RocketMQ生产和消费数据】

阅读更多

一、启动RocketMQ

[root@master ~]# cat /etc/hosts

# Do not remove the following line, or various programs

# that require network functionality will fail.

127.0.0.1               localhost.localdomain localhost

::1             localhost6.localdomain6 localhost6

 

192.168.1.106  node1

192.168.1.103  master

192.168.1.110  node2

[root@master ~]# cd /opt/alibaba-rocketmq/bin/

[root@master bin]# cat play.sh 

#!/bin/sh

#

# Name Server

#

nohup sh mqnamesrv > ns.log 2>&1 &

#

# Service Addr

#

ADDR=`hostname -i`:9876

#

# Broker

#

nohup sh mqbroker -n ${ADDR} > bk.log 2>&1 &

 

echo "Start Name Server and Broker Successfully, ${ADDR}"

[root@master bin]# sh play.sh 

Start Name Server and Broker Successfully, 192.168.1.103:9876

[root@master bin]# sh mqadmin topicList -n 192.168.1.103:9876

BenchmarkTest

DefaultCluster

SELF_TEST_TOPIC

%RETRY%please_rename_unique_group_name_4

TBW102

gaojingsong

master

OFFSET_MOVED_EVENT

[root@master bin]# cd ../

备注:此时topic不存在,但是生产数据的时候会自动创建

 

二、生产和消费数据

生产:

package cn.cn.mq.demo;

 

import java.util.concurrent.TimeUnit;

 

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

 

public class Producer {

   public static void main(String[] args) throws MQClientException,

         InterruptedException{

      /**

       * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>

       * 注意:ProducerGroupName需要由应用来保证唯一<br>

       * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,

       * 因为服务器会回查这个Group下的任意一个Producer

       */

      final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

      producer.setNamesrvAddr("192.168.1.103:9876");

      producer.setInstanceName("Producer");

 

      /**

       * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>

       * 注意:切记不可以在每次发送消息时,都调用start方法

       */

      producer.start();

 

      /**

       * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。

       * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>

       * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>

       * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。

       */

      for (int i = 0; i < 3; i++){

         try {

            {

                Message msg = new Message("TopicTest1",// topic

                      "TagA",// tag

                      "OrderID001",// key

                      ("我的名字是程序员:"+i).getBytes());// body

                SendResult sendResult = producer.send(msg);

                System.out.println(sendResult);

            }

            

            {

                Message msg = new Message("TopicTest1",// topic

                      "TagC",// tag

                      "OrderID001",// key

                      ("我来测试RocketMQ:"+i).getBytes());// body

                SendResult sendResult = producer.send(msg);

                System.out.println(sendResult);

            }

         }catch(Exception e) {

            e.printStackTrace();

         }

         TimeUnit.MILLISECONDS.sleep(4000);

      }

 

      /**

       * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己

       * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法

       */

//    producer.shutdown();

      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

         public void run() {

            producer.shutdown();

         }

      }));

      System.exit(0);

   }

}

 

消费:

package cn.cn.mq.demo;

 

import java.util.List;

 

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageExt;

 

public class PushConsumer {

/**

* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>

* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>

*/

public static void main(String[] args) throws InterruptedException,

MQClientException {

/**

* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>

* 注意:ConsumerGroupName需要由应用来保证唯一

*/

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(

"ConsumerGroupName");

consumer.setNamesrvAddr("192.168.1.103:9876");

consumer.setInstanceName("Consumber");

 

/**

* 订阅指定topic下tags分别等于TagA或TagC或TagD

*/

consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

/**

* 订阅指定topic下所有消息<br>

* 注意:一个consumer对象可以订阅多个topic

*/

//consumer.subscribe("TopicTest2", "*");

 

consumer.registerMessageListener(new MessageListenerConcurrently() {

 

public ConsumeConcurrentlyStatus consumeMessage(

List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

 

System.out.println(Thread.currentThread().getName()

+ " Receive New Messages: " + msgs.size());

 

MessageExt msg = msgs.get(0);

if (msg.getTopic().equals("TopicTest1")) {

// 执行TopicTest1的消费逻辑

if (msg.getTags() != null // 执行TagA的消费

&& msg.getTags().equals("TagA")) {

System.out.println("TagA:"+new String(msg.getBody()));

} else if (msg.getTags() != null// 执行TagC的消费

&& msg.getTags().equals("TagC")) {

System.out.println("TagC:"+new String(msg.getBody()));

} else if (msg.getTags() != null// 执行TagD的消费

&& msg.getTags().equals("TagD")) {

System.out.println("TagD:"+new String(msg.getBody()));

}

}  

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

 

}

});

 

/**

* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>

*/

consumer.start();

 

System.out.println("ConsumerStarted.");

}

}

 

三、验证消费结果

[root@master bin]# sh mqadmin topicList -n 192.168.1.103:9876

BenchmarkTest

TopicTest1

DefaultCluster

SELF_TEST_TOPIC

%RETRY%please_rename_unique_group_name_4

%RETRY%ConsumerGroupName

TBW102

gaojingsong

master

OFFSET_MOVED_EVENT

[root@master bin]# sh mqadmin  topicStatus -n  192.168.1.103:9876 -t TopicTest1

#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated

master                            0     0                     4                       2016-10-20 14:38:19,236

master                            1     0                     4                       2016-10-20 14:38:19,243

master                            2     0                     2                       2016-10-20 14:38:15,171

master                            3     0                     2                       2016-10-20 14:38:15,180

[root@master bin]# shutdown -h now

 


 

 

消费数据

 



 

 

四、错误解决方案



 

  解决方案:版本不一致问题,更改同一版本问题解决

  • 大小: 43.7 KB
  • 大小: 157.2 KB
  • 大小: 122.3 KB
  • 大小: 109 KB
  • pom.rar (676 Bytes)
  • 下载次数: 7
2
1
分享到:
评论

相关推荐

    rocketMQ测试程序java版本

    通过阅读和运行这些代码,开发者可以了解RocketMQ在Java环境中的具体使用方式,为实际项目开发提供参考。 为了确保程序能够正常运行,还需要配置RocketMQ服务器,启动NameServer和Broker节点,这些步骤通常涉及修改...

    java -RocketMQ实战视频教程(上下全集)

    - **发送与接收消息**:通过具体的代码示例,演示如何使用Java API发送普通消息、顺序消息以及事务消息,并展示消息的消费逻辑。 - **性能优化**:讲解如何通过合理的配置提高RocketMQ系统的吞吐量和降低延迟,比如...

    Java_Apache RocketMQ 50的云原生实现.zip

    通过上述知识点,开发者可以了解到Java与Apache RocketMQ在云原生环境中的最佳实践,以及如何利用Java API进行消息生产和消费。同时,了解如何将RocketMQ部署到云环境中,并实现自动化的管理和监控,对于提升系统的...

    rocketmq实例代码

    RocketMQ实例代码解析 ...通过阅读和理解上述代码,你可以创建一个简单的RocketMQ生产者和消费者实例,进一步了解其工作原理和使用方法。在实际项目中,可以根据具体需求调整配置,实现更复杂的业务逻辑。

    Java_Apache RocketMQ是一个云原生消息传递和流媒体平台,使得构建事件驱动应用程序变得简单.zip

    使用Java开发RocketMQ应用程序,首先需要添加RocketMQ的依赖库到项目中,然后创建Producer和Consumer对象,编写发送和接收消息的代码。 1. 创建Producer: ```java DefaultMQProducer producer = new ...

    Rocketmq-AvroDemo:按照avro规范向rocketmq中生产和消费数据

    Rocketmq-AvroDemo 是一个基于Java的项目,其主要目的是展示如何按照Avro规范来生产和消费数据到Apache RocketMQ中。Avro是Apache Hadoop项目的一部分,它提供了一种高效、跨语言的数据序列化系统。这个Demo将帮助...

    rocketMQ所需jar包.zip

    RocketMQ使用它来增强代码的可读性和可维护性,特别是在处理字符串和执行通用编程任务时。 3. **rocketmq-client-4.6.1.jar**:这是RocketMQ的核心客户端库,包含了生产者、消费者、队列管理、事务处理等关键功能的...

    分布式消息系统RocketMQ的Java客户端使用代码示例.zip

    Java客户端是与RocketMQ服务器进行交互的主要接口,提供了丰富的API来实现消息的发送和接收。 在Java客户端使用RocketMQ的过程中,主要涉及以下几个核心概念和操作: 1. **Producer**: 生产者是发送消息的实体,你...

    RocketMQ集群、分布式WebSocket实现以及地图找房功能Java代码实现.zip

    在集群部署中,RocketMQ通过NameServer进行路由发现,Producer负责生产消息,Consumer则负责消费消息。为了保证高可用性,通常会配置多个NameServer和Broker节点,以实现故障切换和负载均衡。 在分布式WebSocket...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    5. **示例代码**:可能包含实际的Java代码示例,展示如何创建生产者和消费者线程,以及如何使用阻塞队列进行数据交换。 6. **性能优化**:在实际应用中,可能会讨论如何选择合适的队列大小、是否需要预创建线程、...

    rocketmq-demo.zip

    在Java代码中,生产者和消费者都会涉及到创建和使用主题。 4. **Message Queue**:消息队列是RocketMQ中的物理存储单位,每个主题可以有多个消息队列,消息在队列中顺序存储。消费者通常按照轮询或者负载均衡的方式...

    rocketmq监控需要的安装包

    1. **环境准备**:确保你的系统已安装Java运行环境(JRE)和Go语言环境,因为RocketMQ Exporter可能是用Go编写的。检查系统环境变量是否正确设置。 2. **构建或下载二进制**:如果你有Go环境,可以克隆"rocketmq-...

    rocketmq-3.2.6.tar.gz

    6. **消息回溯**:RocketMQ提供消息回溯功能,允许消费者重新消费历史消息,这对于调试和数据挖掘非常有用。 7. **丰富的API和客户端支持**:RocketMQ提供了Java、Python、Go等多种语言的SDK,方便不同技术栈的...

    rocketMq可视化界面

    这个可视化界面能够帮助运维人员和开发者直观地查看RocketMQ的运行状态,包括消费队列、生产者、消费者等关键指标。通过这个界面,你可以监控消息发送和接收的情况,查看消息堆积、延迟等信息,从而及时发现并解决...

    消费者实例(内含两个Java文件).zip

    总的来说,这个压缩包提供了一个基础的RocketMQ消费者实现示例,帮助开发者了解如何在Java应用中集成和使用RocketMQ进行消息消费。通过学习和调试这两个Java文件,开发者可以掌握RocketMQ的基本用法,进而将其应用到...

    rocketmq-client.zip

    在Python代码中,我们需要初始化JVM,加载RocketMQ的Java类,然后创建并使用Producer和Consumer对象。 在实际应用中,可能会遇到的挑战包括性能优化(如批量发送消息、合理设置Consumer的拉取消息间隔)、消息丢失...

    rocketmq-console.rar

    1. **集群监控**:RocketMQ Console可以展示各个Broker的运行状态,包括在线的Broker数量、每个Broker的存储容量、消息发送和消费的速率等关键指标。 2. **主题与队列管理**:用户可以通过界面创建、删除和修改...

    RocketMQ 3.2.6

    11. **Java API**:RocketMQ 3.2.6 版本提供了 Java 开发接口,开发者可以方便地在 Java 应用中集成 RocketMQ,实现消息的发送和消费。 学习RocketMQ 3.2.6,你需要理解以上核心概念,并通过实践操作加深理解。同时...

    rocketmq-spring-master_java_rocketmqchecker_

    5. **NameServer**:RocketMQ的路由中心,生产者和消费者都需要通过NameServer获取主题和队列的路由信息。 6. **RocketMQChecker**:这个标签可能指的是RocketMQ的健康检查工具,用于检测RocketMQ集群的运行状态,...

Global site tag (gtag.js) - Google Analytics