一、消息选择器
消息选择器:过滤消息属性与设置条件相等的消息进行消费。语义与sql一致。
private final String selector_1 = "sex='w'";
this.consumer = session.createConsumer(destination, selector_1);
二、消息异步接收
消息异步接收 :当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息
示例:
public class Producer {
// 建立connectionFactory工厂对象
private ActiveMQConnectionFactory connectionFactory;
// 连接对象
private Connection connection;
// session对象
private Session session;
// 生产者
private MessageProducer producer;
public Producer() {
this.connectionFactory = new ActiveMQConnectionFactory();
try {
this.connection = connectionFactory.createConnection("fu", "fu");
this.connection.start();
//参一:开启事务,参二,手工签收
this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
this.producer = session.createProducer(null);
} catch (Exception e) {
e.printStackTrace();
}
}
public void send() throws Exception {
Destination destination = this.session.createQueue("first");
MapMessage map = this.session.createMapMessage();
//设置消息
map.setString("name", "zs");
map.setString("age", "40");
//设置消息属性
map.setStringProperty("sex", "m");
MapMessage map1 = this.session.createMapMessage();
map1.setString("name", "ls");
map1.setString("age", "20");
map1.setStringProperty("sex", "w");
MapMessage map2 = this.session.createMapMessage();
map2.setString("name", "ww");
map2.setString("age", "35");
map2.setStringProperty("sex", "w");
//参一目标,参二数据,参三非持久化,参四做优先及,参五失效时间
this.producer.send(destination, map, DeliveryMode.PERSISTENT, 2, 1000*10);
this.producer.send(destination, map1, DeliveryMode.PERSISTENT, 2, 1000*10);
this.producer.send(destination, map2, DeliveryMode.PERSISTENT, 9, 1000*10);
//提交事务
this.session.commit();
//关闭连接
this.connection.close();
}
public static void main(String[] args) throws Exception {
Producer p = new Producer();
p.send();
}
}
public class Comsumer {
// private final String selector_0 = "age>30";
// 消息过滤的不是消息本身,而是过滤消息附带的某些属性
private final String selector_1 = "sex='w'";
// 建立connectionFactory工厂对象
private ActiveMQConnectionFactory connectionFactory;
// 连接对象
private Connection connection;
// session对象
private Session session;
// 生产者
private MessageConsumer consumer;
// 目标地址
private Destination destination;
public Comsumer() {
this.connectionFactory = new ActiveMQConnectionFactory();
try {
this.connection = connectionFactory.createConnection("fu", "fu");
this.connection.start();
this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
this.destination = this.session.createQueue("first");
// 消息过滤的不是消息本身,而是过滤消息附带的某些属性
this.consumer = session.createConsumer(destination, selector_1);
} catch (Exception e) {
e.printStackTrace();
}
}
public void recever() throws Exception {
// 消息异步接收:当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息
this.consumer.setMessageListener(new Listener());
}
class Listener implements MessageListener {
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
} else if (message instanceof MapMessage) {
MapMessage m = (MapMessage) message;
System.out.println(m.toString());
System.out.println(m.getString("name"));
System.out.println(m.getString("age"));
// 手工签收消息
m.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Comsumer c = new Comsumer();
c.recever();
}
}
分享到:
相关推荐
同时,JMS规范也提供了许多高级特性,如消息选择器、消息组、消息优先级等,这些都可以根据业务需求进行灵活运用。 通过以上讲解,我们可以看出,ActiveMQ的发送消息并接收返回信息涉及到JMS规范的多个层面,包括...
6. **消息选择器**:可以通过设置消息选择器来过滤接收的消息,只处理满足特定条件的消息。 7. **消息持久化**:ActiveMQ支持消息的持久化存储,即使在服务器重启后,未被消费的消息也不会丢失。 8. **监控与管理*...
例如,可以使用`<jms:listener-container>`元素配置一个ActiveMQ的连接工厂,然后定义消息监听器来处理接收到的消息。 ```xml <bean class="org.apache.activemq.ActiveMQConnectionFactory"> ``...
- **消息选择器**:允许消费者基于特定条件筛选消息。 - **主题(Topics)和队列(Queues)的结合**:使用虚拟主题(Virtual Topics)实现。 - **事务支持**:确保消息的一致性。 - **网络连接**:多个ActiveMQ...
**ActiveMQ与Spring集成实例——使用消息转换器** 在企业级应用开发中,消息队列(Message Queue,MQ)作为一种解耦和异步处理的重要工具,被广泛应用。Apache ActiveMQ 是一个开源的消息中间件,它支持多种消息...
本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...
JMS(Java Message Service)是Java平台中用于与MQ交互的标准API,它提供了一种可靠且可扩展的方式来发送和接收消息。在本篇讨论中,我们将关注如何利用Spring框架和ActiveMQ来实现JMS消息队列。 首先,我们需要...
2. **会话与消息**:在连接建立后,可以创建会话来处理消息。会话是线程安全的,允许并发执行多个操作。消息分为多种类型,包括文本、对象、流和文件等,它们可以通过生产者发送到队列或主题,也可以由消费者接收。 ...
在本文中,我们将深入探讨如何使用Python与ActiveMQ进行交互,特别关注如何发送和接收消息。ActiveMQ是一款开源的消息中间件,由Java开发,它支持多种协议,包括AMQP、MQTT、OpenWire和Stomp。在Python环境中,通常...
- **消息选择器**:如何通过消息选择器筛选特定的消息进行消费。 - **网络拓扑和故障转移**:理解网络故障时的集群和复制策略,以及如何配置。 - **安全性设置**:如何设置用户权限,以及使用SSL/TLS进行安全通信...
MessageProducer对象允许指定消息的传送模式、优先级和有效期,而MessageConsumer可以使用消息选择器,选择符合特定标准的消息。此外,消费者支持同步和异步两种接收消息的方式,异步接收通常通过注册Message...
5. **消息筛选和分组**:通过消息选择器,可以实现对消息的筛选和分组,提高处理效率。 6. **消息优先级和时间戳**:消息可以设置优先级,确保关键消息优先处理;同时,消息带有时间戳,便于跟踪和管理。 **...
4. 选择器:介绍如何使用消息选择器,让消费者只接收满足特定条件的消息。 5. 安全与认证:展示如何配置ActiveMQ的用户权限和身份验证。 6. 路由与过滤:探讨消息路由策略,如网络连接、虚拟主题以及使用DLQ(Dead...
3. **消息选择器**:允许消费者根据特定条件选择接收消息,提高消息处理效率。 4. **Spring Boot集成**:在Spring Boot项目中,可以通过自动配置简化整合过程。 总结,Spring与ActiveMQ的整合能帮助我们构建高可用...
- **消息选择器**:允许消费者基于消息的属性过滤接收到的消息。 - **预定义目的地**:通过配置文件预先定义队列和主题,简化代码。 - **事务支持**:使用JMS事务确保消息的一致性。 - **消息优先级**:指定消息...
对于消费者,你可以设置一个消息选择器,以便只接收满足特定条件的消息。 ActiveMQ提供了多种消息类型,包括TextMessage(文本消息)、ObjectMessage(序列化对象消息)、BytesMessage(二进制消息)等。每种消息...
2. 消息监听器:使用`MessageListener`接口,可以实现异步接收消息,提高系统性能。 3. 事务支持:对于需要确保消息完整性的场景,可以开启JMS事务,保证消息的原子性。 4. 持久化与性能:根据需求调整ActiveMQ的...
6. **消息 selectors**:解释如何使用消息选择器来过滤接收到的消息,只处理满足条件的消息。 7. **事务与应答**:展示如何在JMS中使用事务保证消息的一致性,以及消费者确认(acknowledgement)机制。 8. **持久...
Apache ActiveMQ 是一个流行的开源消息代理,它实现了 Java Message Service (JMS) 规范,允许应用程序之间进行异步通信。在本教程中,我们将详细介绍如何安装 ActiveMQ,以及如何使用 JMS 进行开发,包括点对点...
10. **消息过滤与选择器**:通过使用消息选择器,消费者可以选择性地接收特定类型的消息,增强了消息处理的灵活性。 总之,Apache ActiveMQ 5.11版本提供了强大的消息中间件功能,支持各种消息模型、持久化策略和...