`
Riddick
  • 浏览: 640134 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

ActiveMQ5.2发送和接受BlobMessage

    博客分类:
  • JMS
阅读更多

JMS消息生产者:

import java.io.File;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

public class BlobMessageSendTest {
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
	private String subject = "Blob Queue";
	private Destination destination = null;
	private ActiveMQConnection connection = null;
	private ActiveMQSession session = null;
	private MessageProducer producer = null;

	// 初始化
	private void initialize() throws JMSException, Exception {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		connection = (ActiveMQConnection) connectionFactory.createConnection();
		/*
		 * !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true
		 * (default) the uploader is lost in translation ;)
		 * !!!!!!!!!!!!!!!!!!!!!!!!!
		 */
		connection.setCopyMessageOnSend(false);
		session = (ActiveMQSession) connection.createSession(false,
				Session.AUTO_ACKNOWLEDGE);
		destination = session.createQueue(subject);
		// destination = session.createTopic(subject);
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	}

	// 发送消息
	public void produceMessage(File file) throws JMSException, Exception {
		initialize();
		BlobMessage msg = session.createBlobMessage(file);
		connection.start();
		System.out.println("Producer:->Sending message: " + file.getName());
		producer.send(msg);
		System.out.println("Producer:->Message sent complete!");
	}

	// 关闭连接
	public void close() throws JMSException {
		System.out.println("Producer:->Closing connection");
		if (producer != null)
			producer.close();
		if (session != null)
			session.close();
		if (connection != null)
			connection.close();
	}
}

 

JMS消息消费者:

package iprai.ace.activemq;

import java.io.File;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

public class BlobMessageSendTest {

	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
	private String subject = "Blob Queue";
	private Destination destination = null;
	private ActiveMQConnection connection = null;
	private ActiveMQSession session = null;
	private MessageProducer producer = null;

	// 初始化
	private void initialize() throws JMSException, Exception {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		connection = (ActiveMQConnection) connectionFactory.createConnection();
		/*
		 * !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true
		 * (default) the uploader is lost in translation ;)
		 * !!!!!!!!!!!!!!!!!!!!!!!!!
		 */
		connection.setCopyMessageOnSend(false);
		session = (ActiveMQSession) connection.createSession(false,
				Session.AUTO_ACKNOWLEDGE);
		destination = session.createQueue(subject);
		// destination = session.createTopic(subject);
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	}

	// 发送消息
	public void produceMessage(File file) throws JMSException, Exception {
		initialize();
		BlobMessage msg = session.createBlobMessage(file);
		connection.start();
		System.out.println("Producer:->Sending message: " + file.getName());
		producer.send(msg);
		System.out.println("Producer:->Message sent complete!");
	}

	// 关闭连接
	public void close() throws JMSException {
		System.out.println("Producer:->Closing connection");
		if (producer != null)
			producer.close();
		if (session != null)
			session.close();
		if (connection != null)
			connection.close();
	}
}

 

测试代码:

import java.io.File;

public class BlobMessageTest {

	/**
	 * topic方式,必须先启动消费者,然后是生产者,否则接收不到消息。 queue方式,最好先启动生产者,然后启动消费者,否则也容易收不到消息。
	 * 
	 * @param args
	 */
	public static void main(String[] args) throws Exception {

		BlobMessageSendTest producer = new BlobMessageSendTest();
		BlobMessageReceiveTest consumer = new BlobMessageReceiveTest();

		String fileName = "D:/装Win7后装XP.txt";
		// String fileName = "d:/JAVA+开发视频会议系统详细设计.doc";
		File file = new File(fileName);
		producer.produceMessage(file);

		producer.close();
		// 延时500毫秒之后停止接受消息
		Thread.sleep(2000);
		// 开始监听
		consumer.consumeMessage();
		// 延时500毫秒之后发送消息
		Thread.sleep(2000);
		consumer.close();
	}
}

 

分享到:
评论

相关推荐

    activeMQ5.2的jar包及使用实例

    activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。

    ActiveMQ 5.2指导手册

    其中,消息是应用程序之间传递的数据单元,客户端可以通过发送消息和接收消息的方式进行通信。 为什么使用ActiveMQ?ActiveMQ的主要优点在于它的高性能、可伸缩性、高可用性以及它的标准化。它支持多种消息协议,如...

    ActiveMQ接受和发送工具.rar

    这个"ActiveMQ接受和发送工具.rar"压缩包包含了用于与ActiveMQ交互的实用工具,方便用户进行消息的接收和发送操作。 在使用ActiveMQ时,了解以下几个关键知识点是至关重要的: 1. **Java Message Service (JMS)**...

    activemq消息的发送与接受封装的工具类

    activemq消息的发送与接受封装的工具类,只要你导入jar包

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    activeMQ收发工具.rar

    ActiveMQ收发工具的核心功能是通过Java应用程序发送和接收ActiveMQ消息。这个jar包简化了对ActiveMQ服务器的交互过程,使得开发者无需编写复杂的代码就能进行消息传递的测试和调试。通过在命令行中执行`java -jar ...

    activemq消息发送和监听

    项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看消息队列的状态。 在实际应用中,你可能还需要考虑消息的可靠性、顺序性、幂等性以及错误处理等复杂问题。例如,使用事务性...

    activemq 发送,接受,监控样例程序

    activemq 发送,接受,监控样例程序

    activeMQ发送消息返回消息

    在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。 在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是...

    ActiveMQ消息发送接收封装实现及定时测试.

    实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了

    Springboot整合ActiveMQ,实现消息的发送接收功能源码

    在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便实现实时的消息发送和接收功能。首先,让我们简要了解一下SpringBoot和ActiveMQ。 **SpringBoot简介** SpringBoot是Spring框架的一个...

    ActiveMQ延迟发送

    综上所述,ActiveMQ的延迟发送功能是通过设置消息头的`JMSTimestamp`字段来实现的,而搭建集群则涉及多个Broker节点的配置,包括数据持久化、网络连接、负载均衡和容错机制等方面。在实际项目中,结合`...

    ActiveMQ发送和接收protobuf协议消息的实例(精心整理,亲测可用)

    在本文中,我们将深入探讨如何使用ActiveMQ发送和接收基于protobuf(Protocol Buffers)协议的消息,同时也会介绍如何进行ActiveMQ的简化封装和配置自动重连机制。 首先,protobuf是Google开发的一种数据序列化协议...

    activeMQ消息发送过程与原理浅析

    【描述】:本文旨在解析activeMQ消息中间件在发送消息过程中的工作原理,包括同步发送和异步发送两种方式。activeMQ的消息发送涉及到客户端与broker之间的交互,对于消息的可靠性和性能有着直接影响。 【标签】:...

    接受ActiveMQ信息,通过openfire公告发送给指定用户

    总结一下,实现“接受ActiveMQ信息,通过Openfire公告发送给指定用户”的过程主要包括以下步骤: 1. 配置ActiveMQ服务器并创建JMS消费者来接收消息。 2. 解析接收到的XML消息,提取出公告的相关信息。 3. 使用Java...

    ActiveMq+SpringMVC实现邮件异步发送

    ActiveMQ作为一个开源的消息中间件,被广泛用于实现消息队列和发布/订阅模式,它允许应用将非实时任务如邮件发送等操作放到后台处理,从而提升系统的响应速度。在本项目中,ActiveMQ与SpringMVC框架结合,实现了邮件...

    springboot集成activemq实现消息接收demo

    此外,ActiveMQ支持多种协议和特性,如topic、持久化、事务消息等,可以根据项目需求进一步探索和利用。 这个简单的Demo展示了如何在Spring Boot中集成ActiveMQ进行消息接收。通过这种方式,你可以构建出一个可靠的...

    SpringBoot整合ActiveMQ(消息中间件)实现邮件发送功能

    总结,这个项目实例展示了如何利用SpringBoot的便捷性和ActiveMQ的高效通信能力,实现一个异步的邮件发送服务。通过阅读和理解这个项目,开发者可以学习到SpringBoot的整合技巧,以及消息中间件在实际业务场景中的...

Global site tag (gtag.js) - Google Analytics