`
weigang.gao
  • 浏览: 486304 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

rocket MQ消息队列

 
阅读更多

阿里云开发地址:https://www.aliyun.com

1.阿里云账号:springstudent2016  

2.GitHub 账号:gaoweigang/298gaoweigang_20180123

  注册GitHub使用的邮箱:1245508721@qq.com

3.博客:http://www.aiuxian.com/article/p-1933708.html

          http://blog.csdn.net/xiaojie19871116/article/details/46982907

          http://blog.csdn.net/loongshawn/article/details/51086876

4.rocketmq命令:http://jameswxx.iteye.com/blog/2091971

5.linux命令大全:http://man.linuxde.net/sh

6.分布式消息队列RocketMQ部署与监控:https://my.oschina.net/boltwu/blog/472905

7.rocketmq 消息队列的顺序性问题:https://my.oschina.net/u/1589819/blog/787823

一:RocketMQ消息队列环境搭建

http://blog.csdn.net/loongshawn/article/details/51086876

注意:每次在启动Broker之前需要指定nameserver地址(或者将nameserver地址配置到环境变量之中),其中10.125.1.186为所在服务器IP,eg:export NAMESRV_ADDR=10.125.1.186:9876

 

二:测试RocketMQ消息队列

①创建Maven项目目录结构如下:

②pom文件依赖配置


 

③创建生产者

package com.alibaba.rocketmq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

public class Producer {
	
	
	//使用你的账号构建一个客户端实例来访问DefaultMQProducer
	private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
	private static int initialState = 0;
	
	private Producer(){
		
	}
	
	public static DefaultMQProducer getDefaultMQProducer(){
		if(producer == null){
			producer = new DefaultMQProducer("ProducerGroupName");
		}
		if(initialState == 0){
			producer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
			try{
				producer.start();
			} catch(MQClientException e){
				e.printStackTrace();
			}
			initialState = 1;
		}
		
		return producer;
	}

}

 

④创建消费者

package com.alibaba.rocketmq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {
	
	private static DefaultMQPushConsumer  consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    private static int initialState = 0;
    
    private Consumer(){
    	
    }
    
    public static DefaultMQPushConsumer  getDefaultMQPushConsumer(){
    	if(consumer == null){
    		consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    	}
    	
    	if(initialState == 0){
    		consumer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		initialState = 1;
    	}
    	return consumer;
    }
}

⑤生产者生产消息

package com.alibaba.rocketmq.service;

import org.apache.log4j.Logger;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.producer.Producer;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class ProducerTtest {
	
	private static final Logger LOGGER = Logger.getLogger(ProducerTtest.class);
	
	public static void main(String[] args) {
		sendMsg();
	}
	
	//生产者发送消息
	public static void sendMsg(){
		//获取消息生产者
		DefaultMQProducer producer = Producer.getDefaultMQProducer();
		
		for(int i = 0; i < 2000 ;i++){
			Message msg = new Message("TopicTest1",   //topic
					                  "TagA",         //tag
					                  "OrderIDOO"+i,  //key
					                  ("Hello MetaQ"+i).getBytes()); //body
			
			SendResult sendResult;
			try {
				sendResult = producer.send(msg);
			} catch (MQClientException e) {
				e.printStackTrace();
			} catch (RemotingException e) {
				e.printStackTrace();
			} catch (MQBrokerException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		producer.shutdown();
	}

}

 

⑥消费者消费消息

package com.alibaba.rocketmq.service;

import java.util.List;

import org.apache.log4j.Logger;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.consumer.Consumer;

public class ConsumerTest {

	private static final Logger LOGGER = Logger.getLogger(ConsumerTest.class);
	
	public static void main(String[] args) {
		receiveMsg();
	}

	// 消费者接受消息
	public static void receiveMsg() {
		// 获取消息消费者
		DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

		// 订阅主题
		try {
			consumer.subscribe("TopicTest1", "*");
			consumer.setConsumerGroup("gaoweigang");//设置消费组
			consumer.registerMessageListener(new MessageListenerConcurrently() {

				/**
				 * 默认msgs里只有一条消息,可以通过设置consumerMessageBatchMaxSize参数来批量接受消息
				 */
				public ConsumeConcurrentlyStatus consumeMessage(
						List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {

					LOGGER.info(Thread.currentThread().getName()+" , Receive new Messages: "+msgs.size());
					MessageExt msg = msgs.get(0);

					if (msg.getTopic().equals("TopicTest1")) {
						// 执行TopicTest1的消费逻辑
						if (msg.getTags() != null
								&& msg.getTags().equals("TagA")) {
							// 执行TagA的消费
							LOGGER.info(new String(msg.getBody()));
						} else if (msg.getTags() != null
								&& msg.getTags().equals("TagB")) {
							// 执行TagB的消费
						} else if (msg.getTags() != null
								&& msg.getTags().equals("TagC")) {
							// 执行TagC的消费
						}
					} else if (msg.getTopic().equals("TopicTest2")) {
						// 执行TopicTest2的消费逻辑
					}

					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			// Consumer对象在使用之前必须要调用start
			consumer.start();

		} catch (MQClientException e) {

			e.printStackTrace();
		}
	}

}

⑦执行ProducerTest,然后使用如下命令查看指定主题中的数据



RocketMQ命令:


用法:


 

 

  • 大小: 1.5 KB
  • 大小: 20.4 KB
  • 大小: 7.9 KB
  • 大小: 26.5 KB
  • 大小: 15.5 KB
分享到:
评论

相关推荐

    Rocket MQ 使用排查指南1

    通过 Rocket MQ,秒杀请求被放入消息队列,缓解了下游通知系统的压力,保证了系统稳定性和消息的正确传递。 - 用户请求 → 秒杀业务处理系统 → Rocket MQ → 下游通知系统 → 用户通知 2. **异步解耦**: 用户注册...

    rocket mq 视频(龙果)

    这个视频是龙果 的rocket mq视频,讲的非常不错。分为上下两个系列。直接用txt 打开后。里边是百度云资源

    阿里云rocketmq消息队列对接demo

    该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。

    my ali rocket mq学习demo

    在这个“my ali rocket mq学习demo”中,我们有两个关键的Java源文件:Consumer.java和Producer.java,它们分别代表了消息队列中的生产者和消费者角色。 首先,我们来了解一下RocketMQ的基本概念: 1. **生产者...

    Rocket MQ 用户指南 v3.0.4

    ### RocketMQ 用户指南 v3.0.4 关键知识点...RocketMQ作为一款高性能的消息队列服务,在实际应用中具有广泛的应用场景和技术价值。通过深入理解这些知识点,可以帮助开发者更好地利用RocketMQ构建稳定可靠的应用系统。

    TongLINKQ与MQ对比分析报告.doc

    TongLINK/Q V7.0新增了多核支持、网络灵活性、队列分组、逻辑队列、动态调整与扩展、B/S远程配置管理、发布订阅、组消息、消息事务、消息浏览以及强大的管理工具等功能,强调高效管理和低系统开销。 2. **技术分析*...

    浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    1. 电商系统:使用高可用、可持久化的消息中间件,如 Active MQ、Rabbit MQ、Rocket Mq,实现消息队列的高可用性和持久化。 2. 消息确认模式:使用消息队列的确认模式,确保消息的完整性和可靠性。 3. 扩展流程:...

    rocket_mq.rar

    1. 查看集群状态:包括NameServer、Broker的运行状态,以及消息队列、主题等信息。 2. 消息跟踪:追踪消息在整个系统中的流转过程,帮助定位问题。 3. 消费者管理:查看和调整消费者的消费进度,监控消费状态。 4. ...

    消息中间件 RocketMQ 开发指南

    例如,合理设置消息队列的数量、批处理大小、消费线程数等参数,可以优化吞吐量和响应时间。同时,利用RocketMQ提供的监控工具,可以实时查看系统运行状态,及时发现并解决问题。 总的来说,《RocketMQ开发指南》...

    rocket 运维

    - **根据消息Key查询消息**:使用`findMessageByKey`命令根据消息Key查询消息。 - **根据Offset查询消息**:使用`findMessageByOffset`命令根据Offset查询消息。 #### 七、网络连接管理 - **查询Producer的网络连接...

    RocketMQ自定义selector实现消息通道定向发送和拉取

    在RocketMQ中,Selector是用于过滤和选择消息的关键组件,它允许消费者根据特定的条件从队列中选择要消费的消息。默认的Selector类型包括Tag(标签)、Key(关键字)和SQL92(基于SQL92语法的查询)。然而,为了满足...

    RocketmqBenchmark:Rocket MQ Benchmark Terraform UCloud

    《RocketmqBenchmark:基于Terraform的Rocket MQ性能测试在UCloud的应用》 RocketMQ,由阿里巴巴开源,是一款高效、稳定的分布式消息中间件,广泛应用于大数据处理和实时交易系统。它提供了高可靠、高可用的消息...

    rocketmq-all-4.4.0-source-release.zip

    RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。具有以下特点: 能够...

    RocketMQ安装包

    RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,前身是Metaq,当 Metaq 3.0发布时,产品名称改为 RocketMQ。 具有以下特点: 1、能够保证严格的消息顺序 2、提供丰富的消息拉取模式 3、高效的订阅者水平...

    rocketmq-console.rar

    2. **主题与队列管理**:用户可以通过界面创建、删除和修改RocketMQ的主题(Topic)以及队列(Queue),同时可以查看每个主题的分区和副本分布情况。 3. **消息跟踪**:提供消息轨迹查询功能,帮助开发者追踪消息的...

    RocketMQ原理分析

    对RocketMQ队列消息进行了一个整体分析,内容还算可以吧

    RocketMQ-原理简介

    标签中的“rocket mq”和“rocketmq”是对同一种技术的两种表述方式,它们指的是阿里巴巴开源的分布式消息中间件RocketMQ。这个标签帮助人们快速定位到与RocketMQ相关的资料、讨论和使用方法,是搜索和学习RocketMQ...

Global site tag (gtag.js) - Google Analytics