`
IXHONG
  • 浏览: 450093 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rocketmq demo

    博客分类:
  • MQ
阅读更多

首先下载rocketmq,启动需要指定rocketmq home目录

cd github

git clone -b develop https://github.com/apache/incubator-rocketmq.git

 

whatsmars-mq
  |-src
    |-main
      |-java
        |-com.itlong.whatsmars.mq.rocketmq.quickstart
	  BrokerStartup.java
	  Consumer.java
	  NamesrvStartup.java
	  Producer.java
      |-resource
        conf.properties
  pom.xml

 依赖:

 

 

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-namesrv -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>4.0.0-incubating</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-broker -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>4.0.0-incubating</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.0.0-incubating</version>
        </dependency>

    </dependencies>

 conf.properties

 

 

rocketmqHome=D:\\github\\incubator-rocketmq\\distribution
namesrvAddr=127.0.0.1:9876
mapedFileSizeCommitLog=52428800
mapedFileSizeConsumeQueue=30000

 类似于zookeeper的服务:

 

 

public class NamesrvStartup {

    public static void main(String[] args) {
        String classpath = BrokerStartup.class.getResource("/").getPath();
        args = new String[] {"-c", classpath + "conf.properties"};
        org.apache.rocketmq.namesrv.NamesrvStartup.main(args);
    }
}

 Broker:

 

 

public class BrokerStartup {

    public static void main(String[] args) {
        String classpath = BrokerStartup.class.getResource("/").getPath();
        args = new String[] {"-c", classpath + "conf.properties"};
        org.apache.rocketmq.broker.BrokerStartup.main(args);
        System.out.println("Broker started.");
    }
}

 Consumer:

 

 

package com.itlong.whatsmars.mq.rocketmq.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

 Producer:

 

 

package com.itlong.whatsmars.mq.rocketmq.quickstart;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        producer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

 

 

conf.properties指定rocketmqHome,namesrvAddr等,依次启动NamesrvStartup,BrokerStartup,Consumer,Producer.

消息管理系统 https://github.com/javahongxi/whatsmars/tree/master/rocketmq-console

rocketmq原理与实践 http://wely.iteye.com/blog/2392089

0
0
分享到:
评论

相关推荐

    RocketMQ代码demo.zip

    在"rocketMQ-demo"这个压缩包中,应该包含了一个完整的示例项目,包括了上述的配置文件、Producer和Consumer的实现,以及可能的测试代码。你可以通过导入这个项目到IDE,运行并查看日志,以了解如何实际操作RocketMQ...

    springboot-rocketmq-demo.zip

    标题中的"springboot-rocketmq-demo.zip"表明这是一个关于Spring Boot整合RocketMQ的示例项目。RocketMQ是阿里巴巴开源的一款分布式消息中间件,而Spring Boot是基于Spring框架的高度集成了许多开发工具和配置的轻量...

    RocketMQ消息队列demo

    在“RocketMQ消息队列demo”中,我们可以理解这是一个演示如何使用RocketMQ进行消息发布和订阅的实例。这个demo可能是通过一个简单的客户端应用,允许用户输入服务器的IP地址和端口号,然后就能与RocketMQ服务器进行...

    阿里云rocketmq消息队列对接demo

    该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。

    rocketMQ demo案例

    rocketMQ demo案例 详细文档:...

    rocketmq-demo.zip

    这个"rocketmq-demo.zip"压缩包提供了一个入门级的示例,帮助开发者理解RocketMQ的基本工作原理和使用方法。以下是对RocketMQ及其相关代码示例的详细解释。 首先,RocketMQ的核心功能是作为一个消息队列,它在生产...

    阿里RocketMQ资料

    阿里RocketMQ是一款由阿里巴巴开源的分布式消息中间件,它在设计上强调了高可用性、高吞吐量和低延迟,被广泛应用于大型互联网公司的业务系统中,为各种微服务架构提供稳定的消息传递和事件驱动支持。本资料集合涵盖...

    RocketMq_java_demo.rar

    RocketMQ是阿里巴巴开源的...这个Java SpringBoot的RocketMQ demo是一个基础的起点,实际应用中可能需要处理更复杂的场景,比如消息幂等性、事务消息、消息过滤等。你可以根据业务需求进一步学习和扩展RocketMQ的功能。

    spring-boot操作rocketmq的demo

    spring-boot操作rocketmq的demo,亲测可用,代码整理的好

    消息中间件 RocketMQ 发布和订阅 Demo

    这个“消息中间件 RocketMQ 发布和订阅 Demo”是一个适合初学者的入门示例,通过 Java 编写,利用 Maven 进行项目管理,旨在帮助开发者快速理解如何使用 RocketMQ 实现发布和订阅操作。 首先,我们需要了解 ...

    rocketmq-console.war,rocketmqdemo例子

    总的来说,RocketMQ Console和RocketMQ Demo都是理解并有效使用RocketMQ的关键工具。通过监控控制台,我们可以直观地了解RocketMQ集群的运行状态,而通过示例代码,我们可以深入学习RocketMQ的编程模型和特性,为...

    RocketMQ学习demo

    RocketMQ学习demo

    rocketmq 3.5.8.zip及相关demo和源码及安装使用ppt

    很全的rocket包及安装详细说明附加demo示例。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。...

    RocketMQ 的小demo

    初学者可以来学习一下,rocketMQ的简单的小demo 简单易懂

    RocketMQ相关资料文档以及demo

    这个压缩包文件包含了关于RocketMQ的相关资料文档和一个demo,将帮助我们深入理解其工作原理和使用方法。 首先,我们要理解RocketMQ的基本概念。它是基于发布/订阅模式的消息队列,支持点对点和发布/订阅两种消息...

    rocketmq-demo

    在本项目"rocketmq-demo"中,我们将探讨 RocketMQ 的基本概念、工作原理以及如何通过示例代码进行实践。 1. **RocketMQ 基本概念** - **主题(Topic)**:主题是消息的逻辑分类,类似于广播频道,多个消费者可以...

    RocketMQ Java Demo

    java 使用 rocketmq的一个生产者和消费者的实现,其中要先启动rocket的nameserver 和borker

    RocketMQ 5.2.0

    Apache RocketMQ 是一款开源的分布式消息中间件,主要设计用于处理大规模实时数据传输。在5.2.0版本中,它提供了一系列优化和增强的功能,使其在高并发、低延迟、高可用性和可扩展性方面表现更加出色。本篇文章将...

    springboot1.5.10.RELEASE集成rocketmq4.3.1消息服务demo,多个消费者多监听

    在本文中,我们将深入探讨如何将SpringBoot 1.5.10.RELEASE与RocketMQ 4.3.1集成,构建一个支持多个消费者监听的消息服务示例。RocketMQ是阿里巴巴开源的一款分布式消息中间件,它具有高吞吐量、低延迟、高可用性和...

Global site tag (gtag.js) - Google Analytics