序上一篇:
http://wangxinchun.iteye.com/blog/2145958
对于consumer而言,receive消息的时候,可以根据关键字筛选自己感兴趣的消息,当然可以定义不同的Destination 队列来承载这些消息,但是一般情况下,同一个队列可以保存一个主题的消息,减少队列的个数,为此新增了consumer的receive时的过滤功能。
case如下:
消息消费者:
/**
通过改动intended参数,我们分别获取到了50条消息
*/
public class Consumer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final Boolean NON_TRANSACTED = false;
private static final long TIMEOUT = 20000;
public static void main(String[] args) {
String url = BROKER_URL;
if (args.length > 0) {
url = args[0].trim();
}
System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test-queue");
//通过改动intended参数,我们分别获取到了50条消息
MessageConsumer consumer = session.createConsumer(destination, "intended = 'you'");
int i = 0;
while (true) {
Message message = consumer.receive(TIMEOUT);
if (message != null) {
if (message instanceof TextMessage) {
String text = ((TextMessage) message).getText();
System.out.println("Got " + i++ + ". message: " + text);
}
} else {
break;
}
}
consumer.close();
session.close();
} catch (Exception e) {
System.out.println("Caught exception!");
}
finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
System.out.println("Could not close an open connection...");
}
}
}
}
}
消息生产者:
/**
* 此producer 对队列test-queue 发送消息,设置intended 属性,值分别为me you
*/
public class Producer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final Boolean NON_TRANSACTED = false;
private static final int NUM_MESSAGES_TO_SEND = 100;
private static final long DELAY = 100;
public static void main(String[] args) {
String url = BROKER_URL;
if (args.length > 0) {
url = args[0].trim();
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test-queue");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
TextMessage message = session.createTextMessage("Message #" + i);
System.out.println("Sending message #" + i);
if (i % 2 == 0) {
System.out.println("Sending to me");
message.setStringProperty("intended", "me");
} else {
System.out.println("Sending to you");
message.setStringProperty("intended", "you");
}
producer.send(message);
Thread.sleep(DELAY);
}
producer.close();
session.close();
} catch (Exception e) {
System.out.println("Caught exception!");
}
finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
System.out.println("Could not close an open connection...");
}
}
}
}
}
验证:通过Producer 发送了100 条消息。
通过Consumer 设置intended 为me 收到50 条消息,通过Consumer设置intended 为you 收到50条消息,符合预期~
分享到:
相关推荐
例如,我们可以在创建`MessageConsumer`时设置一个消息选择器表达式,如`selector="color='red'"`,这样只会接收那些具有`color`属性且值为`red`的消息。消息选择器基于JMS(Java Message Service)标准,因此对属性...
7. **消息过滤**:在订阅时,可以使用Selector来过滤接收到的消息,只处理符合特定条件的消息。 8. **示例代码**:文章可能提供了Java代码示例,展示如何创建连接、创建Topic、发布和订阅消息的完整流程。 9. **...
$queue->setArguments(['x-match' => 'all', 'filter.string' => $selector]); // 接收消息 while (true) { $message = $queue->get(); if ($message !== false) { echo "Received message: " . $message->body ...
4. **消息筛选**:消费者可以使用Selector只接收满足特定条件的消息。 5. **消息组**:确保同组内的消息只被一个消费者消费,避免重复处理。 五、安全性与性能优化 1. **安全配置**:可以通过配置用户和角色,限制...
而ActiveMQ则是Apache出品的一款开源的消息中间件,它实现了多种消息协议,如OpenWire、AMQP、STOMP等,是Java消息服务(JMS)的一个优秀实现。本文将详细介绍如何在Spring Boot项目中集成ActiveMQ,以及利用其特性...
然后,你可以创建一个Session,创建一个Destination(可能是Queue或Topic),并创建一个与之关联的Consumer,将监听器注册到Consumer上。 **发送消息**: 发送消息涉及到创建一个JMS生产者。首先,你需要创建一个...
在Java环境中,JMS的实现,如ActiveMQ、Apache Qpid等,都提供了内置的Selector解析功能。开发者可以通过`javax.jms.Message.getJMSSelector()`方法获取已设置的选择器,通过`MessageConsumer.setMessageListener...
在ActiveMQ中,生产者(Producer)发送消息到一个或多个主题(Topic)或队列(Queue)。消费者(Consumer)则从这些主题或队列中接收消息。消息可以通过多种协议传输,如OpenWire、STOMP、AMQP和MQTT,这使得...
1. **消息队列(Message Queue)**:ActiveMQ的核心是消息队列,它存储并转发消息,确保消息的可靠传输。消息生产者将消息放入队列,消费者从队列中取出并处理消息。 2. **JMS(Java Message Service)**:JMS是一...
点对点模式是一种基于队列(Queue)的消息传递模型,每个消息只会被一个消费者接收并处理。以下是在ActiveMQ中实现点对点模式的基本步骤: 1. **创建队列**:在ActiveMQ服务器上创建一个队列,可以使用XML配置文件...
在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它允许应用程序之间异步通信,提高系统的可扩展性和可靠性。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,广泛应用于Java环境中。...
ActiveMQ还支持消息过滤机制,允许生产者为消息设置属性(Properties),消费者通过设置选择器(Selector)来决定是否接收特定的消息。这一特性使得消息分发可以更加精确和灵活。 文章中提到的示例代码是使用C#编写...
1. **消息队列(Message Queue)与主题(Topic)**:队列遵循点对点模型,每个消息只能被一个消费者接收;主题遵循发布/订阅模型,一个消息可以被多个订阅者接收。 2. **消息**:消息是数据的载体,包含头(Header...
6. **目的地(Destination)**:目的地是消息发送和接收的目标,可以是队列(Queue)或主题(Topic)。队列对应于点对点模型,主题对应于发布/订阅模型。 7. **消息工厂(ConnectionFactory)**:用于创建连接...
Java NIO的多路复用器(Selector)可以监控多个通道,当任何通道准备好进行读写操作时,它会通知应用程序,从而避免了线程阻塞。 3. **聊天协议**:聊天系统可能使用TCP或UDP协议来传输消息。TCP提供可靠的数据传输...