`
hbxflihua
  • 浏览: 676947 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

RocketMQ自定义selector实现消息通道定向发送和拉取

阅读更多

RocketMQ的安装部署请参考官网Quick Start

RocketMQ的简单应用请参考官网github样例

本篇介绍如何通过自定义selector实现按messageQueue定向发送和接收消息

 

我们先看看MessageQueueSelector接口

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

 RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上,RocketMQ默认提供了三种实现,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三个入参,分别为消息队列集合、消息和扩展参数。本示例通过使用扩展参数来实现消息通道的定向发送和接收。

 

1、pom.xml引入rocketmq jar包

		<!-- 引入rocketmq -->
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>3.2.6</version>
		</dependency>
		
		<!-- 提供常用的lang包工具类 -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.3.2</version>
		</dependency>

 

 

2、MessageQueueSelector接口实现

package com.lh.rocketmq.selector;

import java.util.List;

import org.apache.commons.lang3.math.NumberUtils;

import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

/**
 * 通过调用 producer.send(msg, new SelectMessageQueueByExtOrg() , queueId)指定发送通道
 *
 * @author lh
 * @since 2017-4-22
 * @version 1.0.0
 *
 */
public class SelectMessageQueueByExtOrg implements MessageQueueSelector {

	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		return mqs.get(NumberUtils.toInt(arg.toString()));
	}

}

 

3、producer通过自定义的MessageQueueSelector 发送消息

package com.lh.rocketmq.producer;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.lh.rocketmq.common.MqConst;
import com.lh.rocketmq.selector.SelectMessageQueueByExtOrg;
/**
 * producer通过自定义的MessageQueueSelector 发送消息
 * @author lh
 * @since 2017-4-23
 *
 */
public class ProducerByExtOrgSelector {

	public static void main(String[] args) {

		DefaultMQProducer producer = new DefaultMQProducer("Producer");
		producer.setNamesrvAddr(MqConst.NAME_SRV_ADDR);
		try {
			producer.start();
			int queueId = 0;
			for (int i = 0; i < 16000; i++) {
				queueId = i % 4;
				Message msg = new Message(MqConst.TOPIC_NAME, MqConst.TAG_PUSH + queueId, "key" + i, ("hello rocketmq " + i).getBytes());
				SendResult result = producer.send(msg, new SelectMessageQueueByExtOrg(), queueId);
				System.out.println("offset=" + result.getQueueOffset() + ", msgId=" + result.getMsgId() + ", sendStatus=" + result.getSendStatus());
			}

		} catch (MQClientException e) {
			e.printStackTrace();
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			producer.shutdown();
		}
	}

}

     mq常量类

package com.lh.rocketmq.common;
/**
 * mq常量类
 * @author lh
 *
 */
public class MqConst {
	
	/**
	 * 服务地址
	 */
	public static final String NAME_SRV_ADDR = "192.168.191.130:9876";
	
	/**
	 * 主题名称
	 */
	public static final String TOPIC_NAME = "rocketmq-simple-demo";
	/**
	 * broker名称
	 */
	public static final String BROKER_NAME = "localhost.localdomain";
	
	/**
	 * tag
	 */
	public static final String TAG_PUSH = "push";
	
	/**
	 * 消息定向queueId 
	 * 对应Message.getUserProperty(MqConst.MESSAGE_KEY_QUEUE_ID)
	 */
	public static final String MESSAGE_KEY_QUEUE_ID="queueId";
	
	
	/**
	 * 测试队列和tag相同的标识
	 */
	public static final int TARGET_QUEUEID_TAG= 0;
	
	
	

}

 

4、通过指定的messageQueue拉取消息

package com.lh.rocketmq.consumer;

import java.nio.charset.Charset;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.lh.rocketmq.common.MqConst;
/**
 * 通过指定的messageQueue拉取消息
 * @author lh
 * @since 2017-04-23
 *
 */
public class PullConsumerByQueueId {

	public static void main(String[] args) {
		long startTime = System.currentTimeMillis();
		DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumer");
		consumer.setNamesrvAddr(MqConst.NAME_SRV_ADDR);
		long offset = 0;
		long maxOffset = offset;

		try {
			consumer.start();
			MessageQueue mq = new MessageQueue(MqConst.TOPIC_NAME, MqConst.BROKER_NAME, MqConst.TARGET_QUEUEID_TAG);			
			do{
				PullResult result = consumer.pullBlockIfNotFound(mq, null, offset, 32);
				List<MessageExt> msgs = result.getMsgFoundList();
				if (msgs != null && msgs.size() != 0) {
					for (MessageExt msg : msgs) {
						System.out.println(new String(msg.getBody(), Charset.forName("utf-8")));
					}
				}
				offset = result.getNextBeginOffset();
				maxOffset = result.getMaxOffset();
				System.out.println("offset="+offset+", status="+result.getPullStatus());
			}while(offset < maxOffset);

		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			consumer.shutdown();
			long endTime = System.currentTimeMillis();			
			System.out.println("PullConsumerByQueueId\t take times="+ (endTime - startTime));
		}

	}

}

 附近为工程代码,有需要的同学请自行下载

分享到:
评论

相关推荐

    Android自定义矩形及selector、shape的使用

    在Android开发中,自定义矩形以及selector和shape的使用是构建用户界面的重要部分。它们允许开发者创造出丰富多样的视图样式,以满足各种设计需求。以下是对这些概念的详细解释: 1. 自定义矩形: 自定义矩形通常...

    Android代码-快速自定义 View 的 Selector

    This is a change Background Or TextColor Selector support library, with which you can directly specify the Background to be displayed in different states or TextColor Layout xml, such as clicking the ...

    rocketmq-starter

    在类上使用 `@RocketMQMessageListener` 注解来配置消费者的相关参数,如 topic、consumerGroup 和 selector 表达式,以决定消费者订阅哪些消息。 在生产端,你可以使用 `RocketMQTemplate` 类来发送消息。该模板类...

    自定义Radio(shape/selector)

    在本主题中,我们将深入探讨如何自定义`RadioButton`的外观,利用`shape`和`selector`来实现不同活跃状态的效果。 首先,`shape`是Android中的一个XML文件,用于定义自定义图形形状,如矩形、圆形、椭圆等,并可以...

    Android编程自定义菜单实现方法详解

    每个菜单按钮是一个`ImageButton`,通过设置不同的背景资源(`@drawable/menu_XXX_selector`)来实现按钮的选中和非选中状态。`android:layout_marginLeft`属性用来设置按钮之间的间距,`android:layout_width`和`...

    iconFont 实现selector的Demo

    在iconfont中实现selector,可以使图标在不同状态下呈现出不同的效果,增强用户界面的交互性和视觉反馈。 实现方法通常包括以下步骤: 1. **创建selector文件**:在res/drawable目录下创建一个XML文件,如`icon_...

    Android自定义照相机实现.docx编程资料

    ### Android自定义照相机实现详解 #### 一、项目背景与目标 ...通过以上步骤和技术细节的讲解,开发者可以实现一个完整的Android自定义照相机功能。这不仅有助于提升应用的功能性,还能显著增强用户体验。

    ListView Button ImageView 里应用selector选择器切换图片并保持住

    现在,我们将详细探讨如何在ListView、Button和ImageView中应用selector来实现这一功能。 首先,让我们了解什么是`selector`。Selector是Android中的一个资源类型,它可以定义不同状态下的显示样式,如按下、聚焦、...

    代码实现drawable的selector效果

    在本示例中,我们将探讨如何通过代码实现Drawable的Selector效果,以及如何处理圆形和圆角图片。 首先,Selector通常在XML中定义,但有时为了程序的动态性或避免为每个控件重复编写XML,我们可以用Java代码来创建。...

    自定义CheckBox样式

    如何自定义CheckBox的样式 1:首先在布局文件中添加CheckBox的控件配置,如: android:id="@+id/button1" style="@style/CheckBoxStyles"//这里就是用户可以自定...以上三步之后,实现CheckBox的样式自定义

    java selector 测试并发

    6. **关闭资源**:最后,确保在不再使用时关闭`Selector`和所有通道。 在多线程并发场景下,为了保证线程安全,可能需要使用`synchronized`关键字或者`Lock`来同步对共享资源的访问,防止数据竞争问题。 在给定的...

    jquery实现自定义下拉条

    2. CSS样式:为了实现自定义的背景色和拉条,我们需要为按钮和下拉内容设置相应的CSS样式。例如,可以设置背景颜色、边框、字体等属性。 ```css .custom-dropdown { position: relative; } #dropdown-toggle { ...

    flutter file-selector

    在插件使用方面,file_selector插件可以方便地实现文件选择功能,支持多选和所有类型的文件,并且在选择文件后可以获取到文件的路径和其他信息。 在使用file_selector插件时,需要注意以下几点: 在Android系统中...

    Android设置button背景selector和字体selector

    为了实现美观且交互丰富的按钮,我们可以使用Selector来定义按钮在不同状态下的背景和字体颜色。Selector是Android中的一种状态列表资源,它可以为控件在不同状态(如按下、默认、聚焦等)下显示不同的效果。 标题...

    ios-推送消息自定义脚标.zip

    在标题“ios-推送消息自定义脚标.zip”中提到的,是关于如何自定义TabBarItem的脚标,以便在接收到推送消息时,能够直观地显示未读消息的数量或者状态。TabBarItem是苹果UIKit框架中的一个关键组件,用于展示应用的...

    用selector设置button可用和不可用的样式

    本教程将详细介绍如何使用Selector来设置Button在可用和不可用状态下的样式,以提高应用的视觉效果和交互性。 Selector在Android中是一种基于状态的选择器,它可以为不同状态下的View定义不同的样式。它允许开发者...

    自定义tableview实现手势单元_Objective-C_下载.zip

    本教程将详细讲解如何使用Objective-C自定义UITableView,实现手势识别功能,以增强用户体验。 首先,我们要了解UITableView的基本结构。UITableView由多个单元格(UITableViewCell)组成,每个单元格可以显示不同...

    openwrt-firmware-selector:另一个OpenWrt固件选择器。 具有自定义图像生成器支持

    这是官方版本的Fork / Mirror,但是没有OpenWrt的特定更改,例如反馈链接和持续集成脚本。快速运行下载源并更改工作目录启动网络服务器(例如python3 -m http.server ) 在网络浏览器中转到...

    android自定义按钮效果(两种方法)

    在Android开发中,自定义按钮效果是提升...通过以上两种方法,你可以轻松地在Android应用中实现自定义按钮效果,提升应用的交互性和视觉吸引力。每种方法都有其适用场景,选择最合适的一种,让应用设计更加完善和专业。

Global site tag (gtag.js) - Google Analytics