`

activemq-queue-selector

阅读更多
序上一篇:http://wangxinchun.iteye.com/blog/2145958
对于consumer而言,receive消息的时候,可以根据关键字筛选自己感兴趣的消息,当然可以定义不同的Destination 队列来承载这些消息,但是一般情况下,同一个队列可以保存一个主题的消息,减少队列的个数,为此新增了consumer的receive时的过滤功能。
case如下:

消息消费者:
/**
  通过改动intended参数,我们分别获取到了50条消息
*/
public class Consumer {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final long TIMEOUT = 20000;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s");
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            //通过改动intended参数,我们分别获取到了50条消息
            MessageConsumer consumer = session.createConsumer(destination, "intended = 'you'");
            int i = 0;
            while (true) {
                Message message = consumer.receive(TIMEOUT);
                if (message != null) {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Got " + i++ + ". message: " + text);
                    }
                } else {
                    break;
                }
            }

            consumer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}



消息生产者:
/**
 * 此producer 对队列test-queue 发送消息,设置intended 属性,值分别为me you
 */
public class Producer {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final int NUM_MESSAGES_TO_SEND = 100;
    private static final long DELAY = 100;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;

        try {

            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
                TextMessage message = session.createTextMessage("Message #" + i);
                System.out.println("Sending message #" + i);
                if (i % 2 == 0) {
                    System.out.println("Sending to me");
                    message.setStringProperty("intended", "me");
                } else {
                    System.out.println("Sending to you");
                    message.setStringProperty("intended", "you");
                }
                producer.send(message);
                Thread.sleep(DELAY);
            }

            producer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }

}


验证:通过Producer 发送了100 条消息。
通过Consumer 设置intended 为me 收到50 条消息,通过Consumer设置intended  为you 收到50条消息,符合预期~
0
0
分享到:
评论

相关推荐

    activemq-cpp发送接收消息,消息过滤器

    例如,我们可以在创建`MessageConsumer`时设置一个消息选择器表达式,如`selector="color='red'"`,这样只会接收那些具有`color`属性且值为`red`的消息。消息选择器基于JMS(Java Message Service)标准,因此对属性...

    ActiveMQ-Topic订阅发布模式Demo

    7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只处理符合特定条件的消息。 8. **示例代码**:文章可能提供了Java代码示例,展示如何创建连接、创建Topic、发布和订阅消息的完整流程。 9. **...

    PHP过滤(selector)接收ActiveMQ的指定队列或者主题消息

    $queue-&gt;setArguments(['x-match' =&gt; 'all', 'filter.string' =&gt; $selector]); // 接收消息 while (true) { $message = $queue-&gt;get(); if ($message !== false) { echo "Received message: " . $message-&gt;body ...

    activeMQ实例

    4. **消息筛选**:消费者可以使用Selector只接收满足特定条件的消息。 5. **消息组**:确保同组内的消息只被一个消费者消费,避免重复处理。 五、安全性与性能优化 1. **安全配置**:可以通过配置用户和角色,限制...

    springboot集成activeMQ

    而ActiveMQ则是Apache出品的一款开源的消息中间件,它实现了多种消息协议,如OpenWire、AMQP、STOMP等,是Java消息服务(JMS)的一个优秀实现。本文将详细介绍如何在Spring Boot项目中集成ActiveMQ,以及利用其特性...

    activemqactivemq

    然后,你可以创建一个Session,创建一个Destination(可能是Queue或Topic),并创建一个与之关联的Consumer,将监听器注册到Consumer上。 **发送消息**: 发送消息涉及到创建一个JMS生产者。首先,你需要创建一个...

    jms-selector:JMS Selector解析器,这是一个两遍解析器,它将选择器编译为最简单的形式,从而可以更快地执行

    在Java环境中,JMS的实现,如ActiveMQ、Apache Qpid等,都提供了内置的Selector解析功能。开发者可以通过`javax.jms.Message.getJMSSelector()`方法获取已设置的选择器,通过`MessageConsumer.setMessageListener...

    activimq demo,过滤器demo

    在ActiveMQ中,生产者(Producer)发送消息到一个或多个主题(Topic)或队列(Queue)。消费者(Consumer)则从这些主题或队列中接收消息。消息可以通过多种协议传输,如OpenWire、STOMP、AMQP和MQTT,这使得...

    ActiveMQ操作指南

    1. **消息队列(Message Queue)**:ActiveMQ的核心是消息队列,它存储并转发消息,确保消息的可靠传输。消息生产者将消息放入队列,消费者从队列中取出并处理消息。 2. **JMS(Java Message Service)**:JMS是一...

    apache MQ 点对点,发布/订阅者,开发流程

    点对点模式是一种基于队列(Queue)的消息传递模型,每个消息只会被一个消费者接收并处理。以下是在ActiveMQ中实现点对点模式的基本步骤: 1. **创建队列**:在ActiveMQ服务器上创建一个队列,可以使用XML配置文件...

    与ActiveMQ的Spring集成,用于基于POJO的服务

    在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它允许应用程序之间异步通信,提高系统的可扩展性和可靠性。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,广泛应用于Java环境中。...

    C#实现同Active MQ通讯的方法

    ActiveMQ还支持消息过滤机制,允许生产者为消息设置属性(Properties),消费者通过设置选择器(Selector)来决定是否接收特定的消息。这一特性使得消息分发可以更加精确和灵活。 文章中提到的示例代码是使用C#编写...

    JMS学习笔记精心总结

    1. **消息队列(Message Queue)与主题(Topic)**:队列遵循点对点模型,每个消息只能被一个消费者接收;主题遵循发布/订阅模型,一个消息可以被多个订阅者接收。 2. **消息**:消息是数据的载体,包含头(Header...

    JMS1.1规范(中文)

    6. **目的地(Destination)**:目的地是消息发送和接收的目标,可以是队列(Queue)或主题(Topic)。队列对应于点对点模型,主题对应于发布/订阅模型。 7. **消息工厂(ConnectionFactory)**:用于创建连接...

    chat:多用户聊天系统

    Java NIO的多路复用器(Selector)可以监控多个通道,当任何通道准备好进行读写操作时,它会通知应用程序,从而避免了线程阻塞。 3. **聊天协议**:聊天系统可能使用TCP或UDP协议来传输消息。TCP提供可靠的数据传输...

Global site tag (gtag.js) - Google Analytics