tags 标签
顾名思义,tags就是一个消息的标签。
生成方式
Message msg = new Message("PullTopic", "pull", i+"", 1+"".getBytes());
Message msg = new Message();
msg.setTags("pull");上面代码中的 pull 就是标签。
如何使用了。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PullTopic", "pull");
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext Context) {
//这里你可以得到 pull标签的 消息
Message msg = list.get(0);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();哇!!! 是不是很棒。不同的MessageListenerConcurrently 实例消费不同的消息。能解决很多问题。
呵呵,呵呵
真相如何
tags,对Broker 是没有一点意义的。也不做任何的处理。
Consumer只是依据 tags尽心消息过滤而已
下面是 tags 处理的核心方法
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
public void subscribe(String topic, String fullClassName, String filterClassSource)
throws MQClientException {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
subscriptionData.setFilterClassSource(filterClassSource);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}消息过滤的方法
public class PullAPIWrapper {
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
}
for (MessageExt msg : msgListFilterAgain) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
}
//processPullResult 会被 pull操作成功之后调用
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private void pullAsyncImpl(//
final MessageQueue mq,//
final String subExpression,//
final long offset,//
final int maxNums,//
final PullCallback pullCallback,//
final boolean block,//
final long timeout) throws MQClientException, RemotingException, InterruptedException {
.......
this.pullAPIWrapper.pullKernelImpl(//
mq, // 1
subscriptionData.getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
timeoutMillis, // 9
CommunicationMode.ASYNC, // 10
new PullCallback() {
@Override
public void onException(Throwable e) {
pullCallback.onException(e);
}
@Override
public void onSuccess(PullResult pullResult) {
pullCallback.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper
.processPullResult(mq, pullResult, subscriptionData));
}
});
}
public void pullMessage(final PullRequest pullRequest) {
..........
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult =DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
........
}
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums,
boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
this.makeSureStateOK();
if (null == mq) {
throw new MQClientException("mq is null", null);
}
if (offset < 0) {
throw new MQClientException("offset < 0", null);
}
if (maxNums <= 0) {
throw new MQClientException("maxNums <= 0", null);
}
this.subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis =
block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
mq, // 1
subscriptionData.getSubString(), // 2
0L, // 3
offset, // 4
maxNums, // 5
sysFlag, // 6
0, // 7
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
timeoutMillis, // 9
CommunicationMode.SYNC, // 10
null// 11
);
return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
}所以 tags 作用只是 在 Consumer 做消息过滤。
比如 所有订单到 push到一个 主题里面,有N个服务这个主题进行消费,每个服务只对应一个平台的商家。类似的场景可以使用tags
DefaultMQPullConsumer 没有subscribe方法,所以消息过滤需要自己做。很好。
分享到:
相关推荐
**Python RocketMQ** 在分布式系统中,消息队列(Message Queue)扮演着至关重要的角色,它能够有效地解耦系统组件,提高系统的可扩展性和容错性。RocketMQ是阿里巴巴开源的一款分布式消息中间件,最初由Java开发,...
* 标签(Tags):第二级消息类型,相当于书的目录,可以基于标签做消息过滤 发送与订阅群组: * 生产组(Producer Group):用于消息的发送 * 消费组(Consumer Group):用于消息的订阅处理 RocketMQ 集群方式有...
- **标签(Tags)**:消息的第二级分类,进一步细化消息类型。如“订单交易-创建”、“订单交易-付款”等。 - **1.2.2 发送与订阅群组** - **生产组(Producer Group)**:负责消息的发送。每个生产者属于一个生产组。...
2. **切换分支**:进入RocketMQ目录,根据需要切换到4.3.0版本的分支,例如`git checkout tags/4.3.0`。 3. **配置Maven**:在RocketMQ项目根目录下,打开`pom.xml`文件,确保所有依赖项和插件版本与当前环境匹配。...
- 学习在SpringBoot中配置RocketMQ的相关属性,如namesrv地址、topic和tags等。 3. **11.dubbo服务提供方.mp4**: - Dubbo是阿里巴巴开源的高性能RPC框架,与RocketMQ结合可以实现服务间的异步通信。 - 了解如何...
在`select`方法中,我们将实现自己的逻辑,根据消息的属性如MessageID、Topic、Tags、Keys等进行筛选。 ```java public class CustomSelector implements MessageSelector { @Override public String select...
此外,还将介绍RocketMQ的一些核心概念,如Keys与Tags的作用,并给出具体的Spring Boot整合RocketMQ的步骤。 #### 二、顺序消息 在顺序消息处理场景中,我们需要确保消息能够按照一定的顺序进行生产和消费。这种...
HOME/lib目录中(具体包会在后面描述)SinkSink配置说明配置项必填默认值说明namesrvAddr必填nullName Server地址,遵循RocketMQ配置方式producerGroup可选DEFAULT_PRODUCERProducer分组topic必填nullTopic名称tags可...
7. **消息过滤**:RocketMQ提供了基于标签(tags)的消息过滤机制,允许消费者根据特定条件筛选消息。 8. **Offset管理**:消费者维护自己的消费位置(offset),以便在重启后从上次停止的地方继续消费。 9. **...
rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译...tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式
rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译...tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式
RocketMq是一种流行的开源消息队列系统,事务消息是其核心功能之一。事务消息发送代码流程详解是指在RocketMq中实现事务消息发送的整个过程。下面我们将详细介绍RocketMq事务消息发送代码流程详解。 一、RocketMq...
java8 源码 Apache RocketMQ ...tags/rocketmq-all-4.1.0-incubating # 修改rocketmq/distribution/conf/broker.conf 添加 listenPort = 10911配置【重要】 # 如果需要可以事先修改jvm参数配置,因为默