`
jahu
  • 浏览: 61059 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

rocketmq之tags

 
阅读更多
tags 标签
顾名思义,tags就是一个消息的标签。
生成方式
Message  msg = new Message("PullTopic", "pull", i+"", 1+"".getBytes());
Message  msg = new Message();
msg.setTags("pull");上面代码中的 pull 就是标签。
如何使用了。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PullTopic", "pull");
consumer.registerMessageListener(
    new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext Context) {
                        //这里你可以得到 pull标签的 消息
                        Message msg = list.get(0);       
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
            );
consumer.start();哇!!! 是不是很棒。不同的MessageListenerConcurrently 实例消费不同的消息。能解决很多问题。
呵呵,呵呵
真相如何
tags,对Broker 是没有一点意义的。也不做任何的处理。
Consumer只是依据 tags尽心消息过滤而已
下面是 tags 处理的核心方法
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            SubscriptionData subscriptionData =
                    FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                        topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
   
    public void subscribe(String topic, String fullClassName, String filterClassSource)
        throws MQClientException {
        try {
            SubscriptionData subscriptionData =
                    FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                        topic, "*");
            subscriptionData.setSubString(fullClassName);
            subscriptionData.setClassFilterMode(true);
            subscriptionData.setFilterClassSource(filterClassSource);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
       
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
}消息过滤的方法
public class PullAPIWrapper {
   
    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
            final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() &amp;&amp; !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }
}

//processPullResult 会被 pull操作成功之后调用
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    private void pullAsyncImpl(//
                               final MessageQueue mq,//
                               final String subExpression,//
                               final long offset,//
                               final int maxNums,//
                               final PullCallback pullCallback,//
                               final boolean block,//
                               final long timeout) throws MQClientException, RemotingException, InterruptedException {
    .......
    this.pullAPIWrapper.pullKernelImpl(//
                    mq, // 1
                    subscriptionData.getSubString(), // 2
                    0L, // 3
                    offset, // 4
                    maxNums, // 5
                    sysFlag, // 6
                    0, // 7
                    this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
                    timeoutMillis, // 9
                    CommunicationMode.ASYNC, // 10
                    new PullCallback() {

                        @Override
                        public void onException(Throwable e) {
                            pullCallback.onException(e);
                        }


                        @Override
                        public void onSuccess(PullResult pullResult) {
                            pullCallback.onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper
                                    .processPullResult(mq, pullResult, subscriptionData));
                        }
                    });
                              
    }
   
    public void pullMessage(final PullRequest pullRequest) {
        ..........
         PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult =DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                                pullRequest.getMessageQueue(), pullResult, subscriptionData);
        ........
    }

    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums,
                                boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException {
    this.makeSureStateOK();

    if (null == mq) {
        throw new MQClientException("mq is null", null);

    }

    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }

    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    this.subscriptionAutomatically(mq.getTopic());

    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

    SubscriptionData subscriptionData;
    try {
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//
                mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }

    long timeoutMillis =
            block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
            mq, // 1
            subscriptionData.getSubString(), // 2
            0L, // 3
            offset, // 4
            maxNums, // 5
            sysFlag, // 6
            0, // 7
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
            timeoutMillis, // 9
            CommunicationMode.SYNC, // 10
            null// 11
    );

    return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
}所以 tags 作用只是 在 Consumer 做消息过滤。
比如 所有订单到 push到一个 主题里面,有N个服务这个主题进行消费,每个服务只对应一个平台的商家。类似的场景可以使用tags
DefaultMQPullConsumer 没有subscribe方法,所以消息过滤需要自己做。很好。
0
1
分享到:
评论

相关推荐

    Python RocketMQ

    **Python RocketMQ** 在分布式系统中,消息队列(Message Queue)扮演着至关重要的角色,它能够有效地解耦系统组件,提高系统的可扩展性和容错性。RocketMQ是阿里巴巴开源的一款分布式消息中间件,最初由Java开发,...

    RocketMQ使用手册

    * 标签(Tags):第二级消息类型,相当于书的目录,可以基于标签做消息过滤 发送与订阅群组: * 生产组(Producer Group):用于消息的发送 * 消费组(Consumer Group):用于消息的订阅处理 RocketMQ 集群方式有...

    RocketMQ 手册.pdf

    - **标签(Tags)**:消息的第二级分类,进一步细化消息类型。如“订单交易-创建”、“订单交易-付款”等。 - **1.2.2 发送与订阅群组** - **生产组(Producer Group)**:负责消息的发送。每个生产者属于一个生产组。...

    Linux环境下maven编译好的RocketMQ最新版本4.3.0

    2. **切换分支**:进入RocketMQ目录,根据需要切换到4.3.0版本的分支,例如`git checkout tags/4.3.0`。 3. **配置Maven**:在RocketMQ项目根目录下,打开`pom.xml`文件,确保所有依赖项和插件版本与当前环境匹配。...

    全面解剖RocketMQ和项目实战-day2-part1.7z

    - 学习在SpringBoot中配置RocketMQ的相关属性,如namesrv地址、topic和tags等。 3. **11.dubbo服务提供方.mp4**: - Dubbo是阿里巴巴开源的高性能RPC框架,与RocketMQ结合可以实现服务间的异步通信。 - 了解如何...

    RocketMQ自定义selector实现消息通道定向发送和拉取

    在`select`方法中,我们将实现自己的逻辑,根据消息的属性如MessageID、Topic、Tags、Keys等进行筛选。 ```java public class CustomSelector implements MessageSelector { @Override public String select...

    Tedu五阶段SpringBoot整合RocketMQ

    此外,还将介绍RocketMQ的一些核心概念,如Keys与Tags的作用,并给出具体的Spring Boot整合RocketMQ的步骤。 #### 二、顺序消息 在顺序消息处理场景中,我们需要确保消息能够按照一定的顺序进行生产和消费。这种...

    rocketmq-flume:用于RocketMQ与Flume-ng之间的消息接收和投递

    HOME/lib目录中(具体包会在后面描述)SinkSink配置说明配置项必填默认值说明namesrvAddr必填nullName Server地址,遵循RocketMQ配置方式producerGroup可选DEFAULT_PRODUCERProducer分组topic必填nullTopic名称tags可...

    藏经阁-Stream Processing with Apache RocketMQ.pdf

    7. **消息过滤**:RocketMQ提供了基于标签(tags)的消息过滤机制,允许消费者根据特定条件筛选消息。 8. **Offset管理**:消费者维护自己的消费位置(offset),以便在重启后从上次停止的地方继续消费。 9. **...

    rocketmq-flume-master:flume收集日志发送到rocketmq

    rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译...tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式

    rocketmq-flume

    rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译...tags 可选 空字符串 Tag名称,遵循RocketMQ配置方式

    RocketMq事务消息发送代码流程详解

    RocketMq是一种流行的开源消息队列系统,事务消息是其核心功能之一。事务消息发送代码流程详解是指在RocketMq中实现事务消息发送的整个过程。下面我们将详细介绍RocketMq事务消息发送代码流程详解。 一、RocketMq...

    java8源码-rocketmq-docker:RocketMQDocker

    java8 源码 Apache RocketMQ ...tags/rocketmq-all-4.1.0-incubating # 修改rocketmq/distribution/conf/broker.conf 添加 listenPort = 10911配置【重要】 # 如果需要可以事先修改jvm参数配置,因为默

Global site tag (gtag.js) - Google Analytics