阿里云开发地址:https://www.aliyun.com
1.阿里云账号:springstudent2016
2.GitHub 账号:gaoweigang/298gaoweigang_20180123
注册GitHub使用的邮箱:1245508721@qq.com
3.博客:http://www.aiuxian.com/article/p-1933708.html
http://blog.csdn.net/xiaojie19871116/article/details/46982907
http://blog.csdn.net/loongshawn/article/details/51086876
4.rocketmq命令:http://jameswxx.iteye.com/blog/2091971
5.linux命令大全:http://man.linuxde.net/sh
6.分布式消息队列RocketMQ部署与监控:https://my.oschina.net/boltwu/blog/472905
7.rocketmq 消息队列的顺序性问题:https://my.oschina.net/u/1589819/blog/787823
一:RocketMQ消息队列环境搭建
http://blog.csdn.net/loongshawn/article/details/51086876
注意:每次在启动Broker之前需要指定nameserver地址(或者将nameserver地址配置到环境变量之中),其中10.125.1.186为所在服务器IP,eg:export NAMESRV_ADDR=10.125.1.186:9876
二:测试RocketMQ消息队列
①创建Maven项目目录结构如下:
②pom文件依赖配置
③创建生产者
package com.alibaba.rocketmq.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; public class Producer { //使用你的账号构建一个客户端实例来访问DefaultMQProducer private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); private static int initialState = 0; private Producer(){ } public static DefaultMQProducer getDefaultMQProducer(){ if(producer == null){ producer = new DefaultMQProducer("ProducerGroupName"); } if(initialState == 0){ producer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址 try{ producer.start(); } catch(MQClientException e){ e.printStackTrace(); } initialState = 1; } return producer; } }
④创建消费者
package com.alibaba.rocketmq.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; public class Consumer { private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); private static int initialState = 0; private Consumer(){ } public static DefaultMQPushConsumer getDefaultMQPushConsumer(){ if(consumer == null){ consumer = new DefaultMQPushConsumer("ConsumerGroupName"); } if(initialState == 0){ consumer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); initialState = 1; } return consumer; } }
⑤生产者生产消息
package com.alibaba.rocketmq.service; import org.apache.log4j.Logger; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.producer.Producer; import com.alibaba.rocketmq.remoting.exception.RemotingException; public class ProducerTtest { private static final Logger LOGGER = Logger.getLogger(ProducerTtest.class); public static void main(String[] args) { sendMsg(); } //生产者发送消息 public static void sendMsg(){ //获取消息生产者 DefaultMQProducer producer = Producer.getDefaultMQProducer(); for(int i = 0; i < 2000 ;i++){ Message msg = new Message("TopicTest1", //topic "TagA", //tag "OrderIDOO"+i, //key ("Hello MetaQ"+i).getBytes()); //body SendResult sendResult; try { sendResult = producer.send(msg); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } producer.shutdown(); } }
⑥消费者消费消息
package com.alibaba.rocketmq.service; import java.util.List; import org.apache.log4j.Logger; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.consumer.Consumer; public class ConsumerTest { private static final Logger LOGGER = Logger.getLogger(ConsumerTest.class); public static void main(String[] args) { receiveMsg(); } // 消费者接受消息 public static void receiveMsg() { // 获取消息消费者 DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer(); // 订阅主题 try { consumer.subscribe("TopicTest1", "*"); consumer.setConsumerGroup("gaoweigang");//设置消费组 consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默认msgs里只有一条消息,可以通过设置consumerMessageBatchMaxSize参数来批量接受消息 */ public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { LOGGER.info(Thread.currentThread().getName()+" , Receive new Messages: "+msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 执行TopicTest1的消费逻辑 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 执行TagA的消费 LOGGER.info(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagB")) { // 执行TagB的消费 } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 执行TagC的消费 } } else if (msg.getTopic().equals("TopicTest2")) { // 执行TopicTest2的消费逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Consumer对象在使用之前必须要调用start consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
⑦执行ProducerTest,然后使用如下命令查看指定主题中的数据
RocketMQ命令:
用法:
相关推荐
通过 Rocket MQ,秒杀请求被放入消息队列,缓解了下游通知系统的压力,保证了系统稳定性和消息的正确传递。 - 用户请求 → 秒杀业务处理系统 → Rocket MQ → 下游通知系统 → 用户通知 2. **异步解耦**: 用户注册...
这个视频是龙果 的rocket mq视频,讲的非常不错。分为上下两个系列。直接用txt 打开后。里边是百度云资源
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
在这个“my ali rocket mq学习demo”中,我们有两个关键的Java源文件:Consumer.java和Producer.java,它们分别代表了消息队列中的生产者和消费者角色。 首先,我们来了解一下RocketMQ的基本概念: 1. **生产者...
### RocketMQ 用户指南 v3.0.4 关键知识点...RocketMQ作为一款高性能的消息队列服务,在实际应用中具有广泛的应用场景和技术价值。通过深入理解这些知识点,可以帮助开发者更好地利用RocketMQ构建稳定可靠的应用系统。
TongLINK/Q V7.0新增了多核支持、网络灵活性、队列分组、逻辑队列、动态调整与扩展、B/S远程配置管理、发布订阅、组消息、消息事务、消息浏览以及强大的管理工具等功能,强调高效管理和低系统开销。 2. **技术分析*...
1. 电商系统:使用高可用、可持久化的消息中间件,如 Active MQ、Rabbit MQ、Rocket Mq,实现消息队列的高可用性和持久化。 2. 消息确认模式:使用消息队列的确认模式,确保消息的完整性和可靠性。 3. 扩展流程:...
1. 查看集群状态:包括NameServer、Broker的运行状态,以及消息队列、主题等信息。 2. 消息跟踪:追踪消息在整个系统中的流转过程,帮助定位问题。 3. 消费者管理:查看和调整消费者的消费进度,监控消费状态。 4. ...
例如,合理设置消息队列的数量、批处理大小、消费线程数等参数,可以优化吞吐量和响应时间。同时,利用RocketMQ提供的监控工具,可以实时查看系统运行状态,及时发现并解决问题。 总的来说,《RocketMQ开发指南》...
- **根据消息Key查询消息**:使用`findMessageByKey`命令根据消息Key查询消息。 - **根据Offset查询消息**:使用`findMessageByOffset`命令根据Offset查询消息。 #### 七、网络连接管理 - **查询Producer的网络连接...
在RocketMQ中,Selector是用于过滤和选择消息的关键组件,它允许消费者根据特定的条件从队列中选择要消费的消息。默认的Selector类型包括Tag(标签)、Key(关键字)和SQL92(基于SQL92语法的查询)。然而,为了满足...
《RocketmqBenchmark:基于Terraform的Rocket MQ性能测试在UCloud的应用》 RocketMQ,由阿里巴巴开源,是一款高效、稳定的分布式消息中间件,广泛应用于大数据处理和实时交易系统。它提供了高可靠、高可用的消息...
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。具有以下特点: 能够...
RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,前身是Metaq,当 Metaq 3.0发布时,产品名称改为 RocketMQ。 具有以下特点: 1、能够保证严格的消息顺序 2、提供丰富的消息拉取模式 3、高效的订阅者水平...
2. **主题与队列管理**:用户可以通过界面创建、删除和修改RocketMQ的主题(Topic)以及队列(Queue),同时可以查看每个主题的分区和副本分布情况。 3. **消息跟踪**:提供消息轨迹查询功能,帮助开发者追踪消息的...
对RocketMQ队列消息进行了一个整体分析,内容还算可以吧
标签中的“rocket mq”和“rocketmq”是对同一种技术的两种表述方式,它们指的是阿里巴巴开源的分布式消息中间件RocketMQ。这个标签帮助人们快速定位到与RocketMQ相关的资料、讨论和使用方法,是搜索和学习RocketMQ...