`

ActiveMQ消息选择器与异步接收

 
阅读更多
一、消息选择器
   消息选择器:过滤消息属性与设置条件相等的消息进行消费。语义与sql一致。
  
   private final String selector_1 = "sex='w'";
   this.consumer = session.createConsumer(destination, selector_1);
   


二、消息异步接收
   消息异步接收 :当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息

示例:

public class Producer {
	// 建立connectionFactory工厂对象
	private ActiveMQConnectionFactory connectionFactory;
	// 连接对象
	private Connection connection;
	// session对象
	private Session session;
	// 生产者
	private MessageProducer producer;

	public Producer() {
		this.connectionFactory = new ActiveMQConnectionFactory();
		try {
			this.connection = connectionFactory.createConnection("fu", "fu");
			this.connection.start();
			//参一:开启事务,参二,手工签收
			this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
			this.producer = session.createProducer(null);

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void send() throws Exception {
		Destination destination = this.session.createQueue("first");

		MapMessage map = this.session.createMapMessage();
		//设置消息
		map.setString("name", "zs");
		map.setString("age", "40");
		//设置消息属性
		map.setStringProperty("sex", "m");

		MapMessage map1 = this.session.createMapMessage();
		map1.setString("name", "ls");
		map1.setString("age", "20");
		map1.setStringProperty("sex", "w");
		
		MapMessage map2 = this.session.createMapMessage();
		map2.setString("name", "ww");
		map2.setString("age", "35");
		map2.setStringProperty("sex", "w");
		//参一目标,参二数据,参三非持久化,参四做优先及,参五失效时间
		this.producer.send(destination, map, DeliveryMode.PERSISTENT, 2, 1000*10);
		this.producer.send(destination, map1, DeliveryMode.PERSISTENT, 2, 1000*10);
		this.producer.send(destination, map2, DeliveryMode.PERSISTENT, 9, 1000*10);

		//提交事务
        this.session.commit();
        //关闭连接
        this.connection.close();
	}

	public static void main(String[] args) throws Exception {
		Producer p = new Producer();
		p.send();
	}

}




public class Comsumer {

	// private final String selector_0 = "age>30";
	// 消息过滤的不是消息本身,而是过滤消息附带的某些属性
	private final String selector_1 = "sex='w'";

	// 建立connectionFactory工厂对象
	private ActiveMQConnectionFactory connectionFactory;
	// 连接对象
	private Connection connection;
	// session对象
	private Session session;
	// 生产者
	private MessageConsumer consumer;
	// 目标地址
	private Destination destination;

	public Comsumer() {
		this.connectionFactory = new ActiveMQConnectionFactory();
		try {
			this.connection = connectionFactory.createConnection("fu", "fu");
			this.connection.start();
			this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
			this.destination = this.session.createQueue("first");
			// 消息过滤的不是消息本身,而是过滤消息附带的某些属性
			this.consumer = session.createConsumer(destination, selector_1);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void recever() throws Exception {
		// 消息异步接收:当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息
		this.consumer.setMessageListener(new Listener());

	}

	class Listener implements MessageListener {

		public void onMessage(Message message) {
			try {
				if (message instanceof TextMessage) {

				} else if (message instanceof MapMessage) {
					MapMessage m = (MapMessage) message;
					System.out.println(m.toString());
					System.out.println(m.getString("name"));
					System.out.println(m.getString("age"));
					// 手工签收消息
					m.acknowledge();
				}

			} catch (JMSException e) {
				e.printStackTrace();
			}
		}

	}

	public static void main(String[] args) throws Exception {
		Comsumer c = new Comsumer();
		c.recever();
	}
}


   
分享到:
评论

相关推荐

    activeMQ发送消息返回消息

    同时,JMS规范也提供了许多高级特性,如消息选择器、消息组、消息优先级等,这些都可以根据业务需求进行灵活运用。 通过以上讲解,我们可以看出,ActiveMQ的发送消息并接收返回信息涉及到JMS规范的多个层面,包括...

    spring + activemq 消息sample

    6. **消息选择器**:可以通过设置消息选择器来过滤接收的消息,只处理满足特定条件的消息。 7. **消息持久化**:ActiveMQ支持消息的持久化存储,即使在服务器重启后,未被消费的消息也不会丢失。 8. **监控与管理*...

    结合Spring2.0和ActiveMQ进行异步消息调用

    例如,可以使用`<jms:listener-container>`元素配置一个ActiveMQ的连接工厂,然后定义消息监听器来处理接收到的消息。 ```xml <bean class="org.apache.activemq.ActiveMQConnectionFactory"> ``...

    activeMq消息队列demo

    - **消息选择器**:允许消费者基于特定条件筛选消息。 - **主题(Topics)和队列(Queues)的结合**:使用虚拟主题(Virtual Topics)实现。 - **事务支持**:确保消息的一致性。 - **网络连接**:多个ActiveMQ...

    ActiveMQ与spring集成实例之使用消息转换器

    **ActiveMQ与Spring集成实例——使用消息转换器** 在企业级应用开发中,消息队列(Message Queue,MQ)作为一种解耦和异步处理的重要工具,被广泛应用。Apache ActiveMQ 是一个开源的消息中间件,它支持多种消息...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    JMS之Spring +activeMQ实现消息队列

    JMS(Java Message Service)是Java平台中用于与MQ交互的标准API,它提供了一种可靠且可扩展的方式来发送和接收消息。在本篇讨论中,我们将关注如何利用Spring框架和ActiveMQ来实现JMS消息队列。 首先,我们需要...

    activemq-cpp-library-3.6.0-src.tar.gz_C# ActiveMQ_activemq_activ

    2. **会话与消息**:在连接建立后,可以创建会话来处理消息。会话是线程安全的,允许并发执行多个操作。消息分为多种类型,包括文本、对象、流和文件等,它们可以通过生产者发送到队列或主题,也可以由消费者接收。 ...

    python 发送和接收ActiveMQ消息的实例

    在本文中,我们将深入探讨如何使用Python与ActiveMQ进行交互,特别关注如何发送和接收消息。ActiveMQ是一款开源的消息中间件,由Java开发,它支持多种协议,包括AMQP、MQTT、OpenWire和Stomp。在Python环境中,通常...

    ActiveMQ消息中间件面试专题.zip

    - **消息选择器**:如何通过消息选择器筛选特定的消息进行消费。 - **网络拓扑和故障转移**:理解网络故障时的集群和复制策略,以及如何配置。 - **安全性设置**:如何设置用户权限,以及使用SSL/TLS进行安全通信...

    jms+activeMQ研究文档

    MessageProducer对象允许指定消息的传送模式、优先级和有效期,而MessageConsumer可以使用消息选择器,选择符合特定标准的消息。此外,消费者支持同步和异步两种接收消息的方式,异步接收通常通过注册Message...

    ActiveMQ的安装与使用

    5. **消息筛选和分组**:通过消息选择器,可以实现对消息的筛选和分组,提高处理效率。 6. **消息优先级和时间戳**:消息可以设置优先级,确保关键消息优先处理;同时,消息带有时间戳,便于跟踪和管理。 **...

    activemq-example

    4. 选择器:介绍如何使用消息选择器,让消费者只接收满足特定条件的消息。 5. 安全与认证:展示如何配置ActiveMQ的用户权限和身份验证。 6. 路由与过滤:探讨消息路由策略,如网络连接、虚拟主题以及使用DLQ(Dead...

    spring 与ACTIVEMQ整合

    3. **消息选择器**:允许消费者根据特定条件选择接收消息,提高消息处理效率。 4. **Spring Boot集成**:在Spring Boot项目中,可以通过自动配置简化整合过程。 总结,Spring与ActiveMQ的整合能帮助我们构建高可用...

    ActiveMQ+Core+API

    - **消息选择器**:允许消费者基于消息的属性过滤接收到的消息。 - **预定义目的地**:通过配置文件预先定义队列和主题,简化代码。 - **事务支持**:使用JMS事务确保消息的一致性。 - **消息优先级**:指定消息...

    C#,activemq,mq

    对于消费者,你可以设置一个消息选择器,以便只接收满足特定条件的消息。 ActiveMQ提供了多种消息类型,包括TextMessage(文本消息)、ObjectMessage(序列化对象消息)、BytesMessage(二进制消息)等。每种消息...

    activemq5.5.1 Spring模板

    2. 消息监听器:使用`MessageListener`接口,可以实现异步接收消息,提高系统性能。 3. 事务支持:对于需要确保消息完整性的场景,可以开启JMS事务,保证消息的原子性。 4. 持久化与性能:根据需求调整ActiveMQ的...

    activemq_basic.rar

    6. **消息 selectors**:解释如何使用消息选择器来过滤接收到的消息,只处理满足条件的消息。 7. **事务与应答**:展示如何在JMS中使用事务保证消息的一致性,以及消费者确认(acknowledgement)机制。 8. **持久...

    activemq安装包与demo

    Apache ActiveMQ 是一个流行的开源消息代理,它实现了 Java Message Service (JMS) 规范,允许应用程序之间进行异步通信。在本教程中,我们将详细介绍如何安装 ActiveMQ,以及如何使用 JMS 进行开发,包括点对点...

    apache-activemq-5.11

    10. **消息过滤与选择器**:通过使用消息选择器,消费者可以选择性地接收特定类型的消息,增强了消息处理的灵活性。 总之,Apache ActiveMQ 5.11版本提供了强大的消息中间件功能,支持各种消息模型、持久化策略和...

Global site tag (gtag.js) - Google Analytics