`
greemranqq
  • 浏览: 975489 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

activem 消息选择器Selector

    博客分类:
  • JMS
阅读更多

一、序言

       消息大多数情况都是发送到broker 的,在知道Destination 的情况下,都可以消费,因此有些情况下需要我们将消息分组、隔离,或则指定A消息,只能有A消费者消费等等情况,这里做个大概的介绍和实例。

 

二、实例场景

       我们通过有时候我们需要一个queue/topic 通道,然后发送消息,但是我们要对不同消费者接受的消息进行限制,或者说过滤,就可以使用这种情况。

       我这里建立一个queue,分别发送 300条A,B消息,分别有消费者A,B接收

 

三、代码实例

       基本连接代码这里就就不贴了,建议前的

       发送者代码:

        

   Destination send_destination = session.createQueue("order_queue");
       MessageProducer producer = session.createProducer(send_destination);
 for(int i =0;i<300;i++){
            // 创建一个文本消息
            TextMessage message =  session.createTextMessage("A-张三-"+i);
            // 这里我们分别设置对应的消息信息,当成是一组消息
            message.setStringProperty("JMSXGroupID","A");
            producer.send(message);

            TextMessage message1 =  session.createTextMessage("B-李四-"+i);
            message1.setStringProperty("JMSXGroupID","B");
            producer.send(message1);
        }
 

 

   消费者代码:

 

   Destination destination = session.createQueue("order_queue");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='A'");
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("A:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
   消费者B

 

   

// 指定接收消息的地方
        Destination destination = session.createQueue("order_queue");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='B'");
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("B:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
 

 

    然后开启A,B消费者监听,启动发送者,那么就能看到消息分别消费了

    同时Selector 支持一些表达式的过滤,比如可以写成:JMSXGroupID = 'A' or JMSXGroupID = 'B'

 更多可参考:http://activemq.apache.org/features.html

 

四、其他情况分析

       1.如果A挂掉,那么B收消息会收到影响,因为在同一个队列,A 积压的消息量越多,那么B收到最后消息的时间越长。道理很简单,A挂了,B前面把大部分消息收到了,后面只有少量B的消息以及积压的A消息,分配几率一定的情况下,B发送的时间就拖久了。

 

       2.如果消费者不指定"JMSXGroupID='A'" 这个,那么会默认收取未收到的所有消息,他会默认当成一个分组了。

       3.这种模式和应答模式结合的时候,有朋友测试会卡主,但是我一直没重现~。~ 希望多测测

       

五、小结

        1.AMQ 提供的这个东西还是有一定用处,虽然可以减少了我们的通道的数量,同样的会照成通道压力过大,小范围的消息是可以的。

         2.有问题请指出,有其他的场景欢迎分享,或者需要满足其他场景的可以一起讨论。

分享到:
评论

相关推荐

    activem监听工具用于监听消息队列

    标题“activem监听工具用于监听消息队列”指出我们要讨论的是ActiveMQ提供的一个功能——监听工具。这个工具对于开发者和运维人员来说非常关键,因为它可以实时查看消息队列的状态,监控消息的发送、接收和处理情况...

    消息中间件 视频 百度云 未加密 4种消息件

    消息中间件 ActiveMQ RABBITMQ rocketMq kafka ActiveM]q消息中间件 ActiveMQ RABBITMQ rocketMq kafka ActiveM]q

    RabbitMq实例以及安装包

    另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq 。 还有就是,Kafka 的性能(吞吐...

    ActiveMQ从入门到精通(二)

    本文来自于jianshu,介绍了消息的顺序消费、JMS Selectors、消息的同步/异步接受方式、Message、P2P/PubSub、持久化订阅、持久化消息到MySQL以及与Spring整合等知识。...在上一篇文章中,我们已经明确知道了ActiveM

    springcloud面试准备,面经

    异步通信可以通过消息队列,如 RabbitMq、ActiveM、Kafka 等消息队列实现。 五、什么是服务熔断?什么是服务降级? 熔断机制是应对雪崩效应的一种微服务链路保护机制。当某个微服务不可用或者响应时间太长时,会...

Global site tag (gtag.js) - Google Analytics