`

ActiveMQ(三)——ActiviteMQ实现方式之二:Queue

 
阅读更多

发送方代码:

package com.mycom.activemq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiviteMQ方式2:Queue方式
 * 消息发送方(生产者)
 * 
 * @author guweiqiang
 */
public class QueueSender {

	/**
	 * 发送消息
	 */
	public static void sendMessage(String brokerUrl, Map<String, Object> map) {
		// QueueConnectionFactory : 连接工厂
		QueueConnectionFactory queueConnectionFactory;

		// QueueConnection : JMS客户端到JMS Provider的连接
		QueueConnection queueConnection = null;

		// QueueSession : 发送/接收消息的会话
		QueueSession queueSession = null;

		// Queue : 消息队列
		Queue queue;

		// QueueSender : 消息发送者
		javax.jms.QueueSender queueSender;

		try {
			// 创建一个连接工厂QueueConnectionFactory实例
			queueConnectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_USER,
					ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);

			// 创建一个QueueConnection
			queueConnection = queueConnectionFactory.createQueueConnection();

			// 启动连接
			queueConnection.start();

			// 创建发送/接收消息的会话 QueueSession
			queueSession = queueConnection.createQueueSession(Boolean.TRUE,
					QueueSession.AUTO_ACKNOWLEDGE);

			// 创建消息队列 Queue
			queue = queueSession.createQueue("SecondQueue");

			// 创建消息发送者 QueueSender
			queueSender = queueSession.createSender(queue);
			queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

			// 准备工作已完成,可以开始发送消息了
			// 发送消息
			MapMessage mapMsg = queueSession.createMapMessage();
			mapMsg.setInt("ID", (Integer) map.get("id"));
			mapMsg.setString("NAME", (String) map.get("name"));
			mapMsg.setInt("AGE", (Integer) map.get("age"));
			System.out.println(mapMsg);
			queueSender.send(mapMsg);

			// 提交会话
			queueSession.commit();

		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			// 释放资源
			if (queueSession != null) {
				try {
					queueSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (queueConnection != null) {
				try {
					queueConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * 测试
	 */
	public static void main(String[] args) {
		String brokerUrl = "tcp://127.0.0.1:61616";
		Map<String, Object> map = new HashMap<String, Object>();
		map.put("id", 1002);
		map.put("name", "guweiqiang2");
		map.put("age", 22);
		QueueSender.sendMessage(brokerUrl, map);
	}

}

 

 

接收方代码:

package com.mycom.activemq;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiviteMQ方式2:Queue方式
 * 消息接收方(消费者)
 * 
 * @author guweiqiang
 */
public class QueueReceiver {

	/**
	 * 接收消息
	 */
	public static void receiveMessage(String brokerUrl) {
		// QueueConnectionFactory : 连接工厂
		QueueConnectionFactory queueConnectionFactory;

		// QueueConnection : JMS客户端到JMS Provider的连接
		QueueConnection queueConnection = null;

		// QueueSession : 发送/接收消息的会话
		QueueSession queueSession = null;

		// Queue : 消息队列
		Queue queue;

		// QueueReceiver : 消息接收者
		javax.jms.QueueReceiver queueReceiver;

		try {
			// 创建一个连接工厂QueueConnectionFactory实例
			queueConnectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_USER,
					ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);

			// 创建一个QueueConnection
			queueConnection = queueConnectionFactory.createQueueConnection();

			// 启动连接
			queueConnection.start();

			// 创建发送/接收消息的会话 QueueSession
			queueSession = queueConnection.createQueueSession(Boolean.TRUE,
					QueueSession.AUTO_ACKNOWLEDGE);

			// 创建消息队列 Queue
			queue = queueSession.createQueue("SecondQueue");

			// 创建消息接收者
			queueReceiver = queueSession.createReceiver(queue);

			// 准备工作已经完成,可以开始接收消息了(通过实现MessageListener的onMessage方法来接收消息)
			queueReceiver.setMessageListener(new MessageListener() {

				@Override
				public void onMessage(Message message) {
					if (message != null) {
						MapMessage mapMsg = (MapMessage) message;
						try {
							System.out.println("ID:" + mapMsg.getInt("ID")
									+ "\t NAME:" + mapMsg.getString("NAME")
									+ "\t AGE:" + mapMsg.getInt("AGE"));
						} catch (JMSException e) {
							e.printStackTrace();
						}
					}
				}
			});

			Thread.sleep(1000 * 100); // 进程睡眠一段时间再关闭

			// 提交会话
			queueSession.commit();

		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			// 释放资源
			if (queueSession != null) {
				try {
					queueSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (queueConnection != null) {
				try {
					queueConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * 测试
	 */
	public static void main(String[] args) {
		String brokerUrl = "tcp://127.0.0.1:61616";
		QueueReceiver.receiveMessage(brokerUrl);
	}

}

 

 

启动ActiveMQ,再在本地执行上述发送方和接收方代码,运行结果如下:

发送方console:

ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {NAME=guweiqiang2, AGE=22, ID=1002} }

 

 

接收方console:

ID:1002	 NAME:guweiqiang2	 AGE:22

 

 

 

分享到:
评论

相关推荐

    ActiveMQ实战——实现一个简易版的聊天室

    在本篇《ActiveMQ实战——实现一个简易版的聊天室》中,我们将深入探讨如何利用Apache ActiveMQ构建一个简单的在线聊天应用。ActiveMQ是Apache软件基金会的一个开源项目,它是一款功能强大的消息中间件,用于在...

    ActiveMQ 集群——JDBC Master Slave + Broker Cluster

    Broker Cluster 模式是 ActiveMQ 集群中的另一种实现方式。在这个模式下,我们可以将多个 Broker 服务器组合在一起,以提高系统的可扩展性和可靠性。 要配置 Broker Cluster 模式,我们需要配置每个 Broker 服务器...

    activeMQ三种收发消息方式

    在这个小例子中,我们将探讨ActiveMQ的三种主要的消息收发方式:点对点、发布/订阅和事务处理模式。 1. **点对点(Point-to-Point)模式**: 在点对点模式下,消息从一个生产者发送到一个队列,然后由一个或多个...

    activeMQ JMS 3种创建方式

    2. 查找MBean:在MBean浏览器中找到ActiveMQ相关的MBeans,如`org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=myQueue`。 3. 操作MBean:通过MBean的操作接口,可以...

    Apache ActiveMQ Queue Topic 详解

    - 或者通过命令行方式,在 JVM 中启动 ActiveMQ:`cd example` 后运行 `ant embedBroker`。 4. **管理界面**:通过浏览器访问 `http://localhost:8161/admin` 进入 ActiveMQ 的管理后台系统,可以查看和管理 ...

    ActiveMQ 入门实战(3)--SpringBoot 整合 ActiveMQ(csdn)————程序.pdf

    - **ActiveMQ**:Apache ActiveMQ 是一个开源的消息中间件,它实现了 Java Message Service (JMS) 规范,提供可靠的消息传递和队列管理。 - **JMS**:Java Message Service 是一个标准接口,用于在分布式环境中交换...

    spring 整合activemq实现自定义动态消息队列

    本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...

    springboot集成activemq实现消息接收demo

    activemq: broker-url: tcp://localhost:61616 user: admin password: admin ``` 配置完成后,我们创建一个`MessageReceiver`类来接收消息。这个类通常会实现`MessageListener`接口,这样可以监听消息队列中的...

    深入掌握JMS——ActiveMQ 十一章

    ### 深入掌握JMS——ActiveMQ 十一章 #### JMS基本概念与重要知识点解析 **JMS(Java Message Service)简介** JMS(Java消息服务)是一种广泛应用于企业级应用中的消息中间件协议,它为应用程序提供了一种高效、...

    Spring 实现远程访问详解——jms和activemq

    本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...

    java springboot整合activemq工程

    #默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,...

    ActiveMQ实现

    在本项目中,我们涵盖了ActiveMQ的基本安装和使用,以及如何通过代码与之交互。 首先,让我们了解什么是ActiveMQ。ActiveMQ是一个高性能、可靠的、完全支持JMS 1.1规范的消息中间件。它提供了多种协议的支持,包括...

    activemq 配置说明与activemq入门讲解

    3. **存储策略**:ActiveMQ支持两种存储方式——内存存储和文件存储(默认是KahaDB)。内存存储速度快但不持久,文件存储持久但相对较慢。可以通过配置`&lt;persistenceAdapter&gt;`元素来切换。 4. **网络连接**:...

    ActiveMQ+zookeeper实现高可用和负载均衡(代码和测试)

    ### ActiveMQ+zookeeper实现高可用和负载均衡 #### 一、背景与目标 在现代分布式系统中,消息中间件如ActiveMQ扮演着重要的角色,它能够帮助应用之间进行可靠的消息传递。为了保证系统的稳定性和可靠性,通常需要...

    自己实现的ActiveMQ连接池和新版本ActiveMQ自带的连接池,封装好的工具类,可直接使用

    本资源提供的内容是关于ActiveMQ的连接池实现,分为两部分:一是作者自己实现的ActiveMQ连接池,二是新版本ActiveMQ自带的连接池。连接池是一种资源管理技术,通过复用已建立的数据库连接或网络连接,减少创建和销毁...

    ActiveMQ使用mqtt协议的实现发布消息的三种方式.txt

    java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)

    实验三 消息中间件应用开发:ActiveMQ实现单线程多队列

    【标题】:“实验三 消息中间件应用开发:ActiveMQ实现单线程多队列” 在IT领域,消息中间件是一种重要的软件架构组件,它主要用于应用程序之间的异步通信,提高系统的可扩展性和解耦性。本实验主要关注的是如何...

    ActiveMQ的队列queue模式(事务、应答、转发模式、阻塞消息)

    本文将深入探讨ActiveMQ中的队列(Queue)模式,包括事务、应答、转发以及MessageConsumer的receive阻塞消息处理方式。 ### 1. ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    这通常包括`spring-boot-starter-activemq`和`org.apache.activemq:activemq-client`,以及可能的MQTT客户端库,如`org.eclipse.paho:org.eclipse.paho.client.mqttv3`。 2. 配置ActiveMQ:在`application....

Global site tag (gtag.js) - Google Analytics