首先下载rocketmq,启动需要指定rocketmq home目录
cd github
git clone -b develop https://github.com/apache/incubator-rocketmq.git
whatsmars-mq |-src |-main |-java |-com.itlong.whatsmars.mq.rocketmq.quickstart BrokerStartup.java Consumer.java NamesrvStartup.java Producer.java |-resource conf.properties pom.xml
依赖:
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-namesrv --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-namesrv</artifactId> <version>4.0.0-incubating</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-broker --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-broker</artifactId> <version>4.0.0-incubating</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.0.0-incubating</version> </dependency> </dependencies>
conf.properties
rocketmqHome=D:\\github\\incubator-rocketmq\\distribution namesrvAddr=127.0.0.1:9876 mapedFileSizeCommitLog=52428800 mapedFileSizeConsumeQueue=30000
类似于zookeeper的服务:
public class NamesrvStartup { public static void main(String[] args) { String classpath = BrokerStartup.class.getResource("/").getPath(); args = new String[] {"-c", classpath + "conf.properties"}; org.apache.rocketmq.namesrv.NamesrvStartup.main(args); } }
Broker:
public class BrokerStartup { public static void main(String[] args) { String classpath = BrokerStartup.class.getResource("/").getPath(); args = new String[] {"-c", classpath + "conf.properties"}; org.apache.rocketmq.broker.BrokerStartup.main(args); System.out.println("Broker started."); } }
Consumer:
package com.itlong.whatsmars.mq.rocketmq.quickstart; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}. */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { /* * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ consumer.setNamesrvAddr("127.0.0.1:9876"); /* * Specify where to start in case the specified consumer group is a brand new one. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /* * Subscribe one more more topics to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start(); System.out.printf("Consumer Started.%n"); } }
Producer:
package com.itlong.whatsmars.mq.rocketmq.quickstart; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}. */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /* * Instantiate with a producer group name. */ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ producer.setNamesrvAddr("127.0.0.1:9876"); /* * Launch the instance. */ producer.start(); for (int i = 0; i < 1000; i++) { try { /* * Create a message instance, specifying topic, tag and message body. */ Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); /* * Call send message to deliver message to one of brokers. */ SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } /* * Shut down once the producer instance is not longer in use. */ producer.shutdown(); } }
conf.properties指定rocketmqHome,namesrvAddr等,依次启动NamesrvStartup,BrokerStartup,Consumer,Producer.
消息管理系统 https://github.com/javahongxi/whatsmars/tree/master/rocketmq-console
rocketmq原理与实践 http://wely.iteye.com/blog/2392089
相关推荐
在"rocketMQ-demo"这个压缩包中,应该包含了一个完整的示例项目,包括了上述的配置文件、Producer和Consumer的实现,以及可能的测试代码。你可以通过导入这个项目到IDE,运行并查看日志,以了解如何实际操作RocketMQ...
标题中的"springboot-rocketmq-demo.zip"表明这是一个关于Spring Boot整合RocketMQ的示例项目。RocketMQ是阿里巴巴开源的一款分布式消息中间件,而Spring Boot是基于Spring框架的高度集成了许多开发工具和配置的轻量...
在“RocketMQ消息队列demo”中,我们可以理解这是一个演示如何使用RocketMQ进行消息发布和订阅的实例。这个demo可能是通过一个简单的客户端应用,允许用户输入服务器的IP地址和端口号,然后就能与RocketMQ服务器进行...
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
rocketMQ demo案例 详细文档:...
这个"rocketmq-demo.zip"压缩包提供了一个入门级的示例,帮助开发者理解RocketMQ的基本工作原理和使用方法。以下是对RocketMQ及其相关代码示例的详细解释。 首先,RocketMQ的核心功能是作为一个消息队列,它在生产...
阿里RocketMQ是一款由阿里巴巴开源的分布式消息中间件,它在设计上强调了高可用性、高吞吐量和低延迟,被广泛应用于大型互联网公司的业务系统中,为各种微服务架构提供稳定的消息传递和事件驱动支持。本资料集合涵盖...
RocketMQ是阿里巴巴开源的...这个Java SpringBoot的RocketMQ demo是一个基础的起点,实际应用中可能需要处理更复杂的场景,比如消息幂等性、事务消息、消息过滤等。你可以根据业务需求进一步学习和扩展RocketMQ的功能。
spring-boot操作rocketmq的demo,亲测可用,代码整理的好
这个“消息中间件 RocketMQ 发布和订阅 Demo”是一个适合初学者的入门示例,通过 Java 编写,利用 Maven 进行项目管理,旨在帮助开发者快速理解如何使用 RocketMQ 实现发布和订阅操作。 首先,我们需要了解 ...
总的来说,RocketMQ Console和RocketMQ Demo都是理解并有效使用RocketMQ的关键工具。通过监控控制台,我们可以直观地了解RocketMQ集群的运行状态,而通过示例代码,我们可以深入学习RocketMQ的编程模型和特性,为...
RocketMQ学习demo
很全的rocket包及安装详细说明附加demo示例。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。...
初学者可以来学习一下,rocketMQ的简单的小demo 简单易懂
这个压缩包文件包含了关于RocketMQ的相关资料文档和一个demo,将帮助我们深入理解其工作原理和使用方法。 首先,我们要理解RocketMQ的基本概念。它是基于发布/订阅模式的消息队列,支持点对点和发布/订阅两种消息...
在本项目"rocketmq-demo"中,我们将探讨 RocketMQ 的基本概念、工作原理以及如何通过示例代码进行实践。 1. **RocketMQ 基本概念** - **主题(Topic)**:主题是消息的逻辑分类,类似于广播频道,多个消费者可以...
Apache RocketMQ 是一款开源的分布式消息中间件,主要设计用于处理大规模实时数据传输。在5.2.0版本中,它提供了一系列优化和增强的功能,使其在高并发、低延迟、高可用性和可扩展性方面表现更加出色。本篇文章将...
java 使用 rocketmq的一个生产者和消费者的实现,其中要先启动rocket的nameserver 和borker
在本文中,我们将深入探讨如何将SpringBoot 1.5.10.RELEASE与RocketMQ 4.3.1集成,构建一个支持多个消费者监听的消息服务示例。RocketMQ是阿里巴巴开源的一款分布式消息中间件,它具有高吞吐量、低延迟、高可用性和...