`
longgangbai
  • 浏览: 7332286 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ BlobMessage发送消息的测试

阅读更多

       最近项目中将发送文件,但是测试采用字节流ByteMessage和BlobMessage ,StreamMessage的学习应用,并测试。

下面是BlobMessage测试的情况。但是在测试的时候发送word和pdf文档传送之后,不能打开。不知道为什么要,具体怎么处理之后可以。但是发送文本类文件没有任何问题。

消息的发送者:

package easyway.app.activemq.demo.fileserver;
import java.io.File;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
/**
 * 消息的发送者
 * 备注在测试Word文档和pdf文档时不能打开,但是发送文本类文件可以,没有任何问题。
 * @author longgangbai
 *
 */
public class ActiveMQFileServerSender {


		private String user = ActiveMQConnection.DEFAULT_USER;
		private String password = ActiveMQConnection.DEFAULT_PASSWORD;
		private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
		private String subject = "Blob Queue";
		private Destination destination = null;
		private ActiveMQConnection connection = null;
		private ActiveMQSession session = null;
		private MessageProducer producer = null;

		// 初始化
		private void initialize() throws JMSException, Exception {
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					user, password, url);
			connection = (ActiveMQConnection) connectionFactory.createConnection();
			connection.setCopyMessageOnSend(false);
			session = (ActiveMQSession) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue(subject);
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		}

		// 发送消息
		public void produceMessage(File file) throws JMSException, Exception {
			initialize();
			BlobMessage msg = session.createBlobMessage(file);
			connection.start();
			System.out.println("Producer:->Sending message: " + file.getName());
			producer.send(msg);
			System.out.println("Producer:->Message sent complete!");
		}

		// 关闭连接
		public void close() throws JMSException {
			System.out.println("Producer:->Closing connection");
			if (producer != null)
				producer.close();
			if (session != null)
				session.close();
			if (connection != null)
				connection.close();
		}
	}

 

 

消息的接受者:

package easyway.app.activemq.demo.fileserver;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.InputStream;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.command.ActiveMQBlobMessage;
/**
 * 消息的接受者
 * @author longgangbai
 *
 */
public class ActiveMQFileServerReceiver {

		private String user = ActiveMQConnection.DEFAULT_USER;
		private String password = ActiveMQConnection.DEFAULT_PASSWORD;
		private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
		private String subject = "Blob Queue";
		private Destination destination = null;
		private String filepath="D://photo_album_app_guide.bak.pdf";
		private ActiveMQConnection connection = null;
		private ActiveMQSession session = null;
		private MessageConsumer consumer = null;

		// 初始化
		private void initialize() throws JMSException, Exception {
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					user, password, url);
			connection = (ActiveMQConnection) connectionFactory.createConnection();
			connection.setCopyMessageOnSend(false);
			session = (ActiveMQSession) connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue(subject);
			consumer = session.createConsumer(destination);
		}

		//接受 消息
		public void receive(String filename) throws JMSException, Exception {
			initialize();
			this.filepath=filename;
			connection.start();
			Message msg =(BlobMessage)consumer.receive();
		    if(msg instanceof ActiveMQBlobMessage){}
		        InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
		        StringBuilder b = new StringBuilder();
		        int i = input.read();
		        while (i != -1) {
		            b.append((char) i);
		            i = input.read();
		        }
		        input.close();
		        File uploaded = new File(filepath); 
		        BufferedWriter bw=new BufferedWriter(new FileWriter(uploaded));
		        bw.write(b.toString());
		        System.out.println("consumer receiver sucessful .....");
		        bw.flush();
		        bw.close();
		}

		// 关闭连接
		public void close() throws JMSException {
			System.out.println("Producer:->Closing connection");
			if (consumer != null)
				consumer.close();
			if (session != null)
				session.close();
			if (connection != null)
				connection.close();
		}
	}

 

 

测试代码:

package easyway.app.activemq.demo.fileserver;

import java.io.File;
/**
 * BlobMessage的使用
 * @author longgangbai
 *
 */
public class ActiveMQFileServerTest {


		/**
		 * 
		 * @param args
		 */
		public static void main(String[] args) throws Exception {

			ActiveMQFileServerSender producer = new ActiveMQFileServerSender();
			ActiveMQFileServerReceiver consumer = new ActiveMQFileServerReceiver();

			String fileName = "D://BlockingClient.java";
			String receiverFilename="D://BlockingClient_bak.java";
			File file = new File(fileName);
			producer.produceMessage(file);
            consumer.receive(receiverFilename);
			// 延时5000毫秒之后停止接受消息
			Thread.sleep(5000);
			// 开始监听
			producer.close();
			// 延时500毫秒之后发送消息
			Thread.sleep(2000);
			consumer.close();
		}
	}

 

分享到:
评论
2 楼 longgangbai 2011-10-23  
xuyangcn 写道
接收二进制文件时不应该用StringBuilder或者StringBuffer之类的,需要用byte数组直接写入FileOutputStream。看看字节和字符的区别。

我明白的意思,其实是我习惯了使用BufferedWriter和BufferedReader了,也是为了我测试方便才这样写的,O(∩_∩)O哈哈~
1 楼 xuyangcn 2011-10-18  
接收二进制文件时不应该用StringBuilder或者StringBuffer之类的,需要用byte数组直接写入FileOutputStream。看看字节和字符的区别。

相关推荐

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    activeMQ发送消息返回消息

    在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。 在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是...

    Springboot整合ActiveMQ,实现消息的发送接收功能源码

    在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便实现实时的消息发送和接收功能。首先,让我们简要了解一下SpringBoot和ActiveMQ。 **SpringBoot简介** SpringBoot是Spring框架的一个...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看消息队列的状态。 在实际应用中,你可能还需要考虑消息的可靠性、顺序性、幂等性以及错误处理等复杂问题。例如,使用事务性...

    Jmeter测试ActiveMQ性能报告

    主要使用JMeter进行压力测试,通过模拟大量并发用户发送和接收消息,评估ActiveMQ的处理能力和响应时间。 2.6 测试时间和地点 测试在特定日期和地点进行,确保了测试条件的一致性。 3. 测试人员 测试团队由具备...

    activeMQ收发工具.rar

    3. **消息类型**:理解JMS提供的不同消息类型,如文本消息、对象消息、流消息和二进制消息,以及如何通过ActiveMQ收发工具发送和接收这些消息。 4. **队列与主题**:熟悉ActiveMQ中的队列(Queue)和主题(Topic)...

    ActiveMQ连接和使用测试工程

    5. **测试用例分析**:在`service-check`这个文件中,很可能包含了测试连接、发送和接收消息的代码。这些测试用例将帮助我们验证ActiveMQ的正确配置以及客户端代码的功能性。例如,它们可能创建一个生产者发送消息到...

    ActiveMQ接受和发送工具.rar

    在压缩包中的"ActiveMQ接受和发送工具"很可能包含了一个图形界面或者命令行工具,使得用户可以更直观地发送测试消息到ActiveMQ服务器,查看消息队列的状态,以及接收消息。使用这些工具,开发者可以快速验证ActiveMQ...

    springboot集成activemq实现消息接收demo

    为了测试我们的实现,我们需要发送一条消息到`messageQueue`。这可以通过创建一个`MessageSender`类完成,如下所示: ```java import org.springframework.beans.factory.annotation.Autowired; import org.spring...

    activemq消息发送和监听

    项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。...通过这些步骤,可以测试 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。

    ActiveMQ消息发送接收封装实现及定时测试.

    实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了

    mqttjs(activemq测试工具)

    总之,`mqttjs`作为ActiveMQ的测试工具,可以帮助开发者轻松创建MQTT客户端,进行各种消息交互测试。结合ActiveMQ的丰富功能和可配置性,我们可以对消息中间件进行详尽的验证和调优,确保其在实际应用中的稳定性和...

    SpringBoot整合ActiveMQ(消息中间件)实现邮件发送功能

    在本项目中,"SpringBoot整合ActiveMQ(消息中间件)实现邮件发送功能"是一个典型的企业级应用示例,它展示了如何将SpringBoot框架与Apache ActiveMQ集成,以实现基于消息队列的邮件发送服务。下面我们将详细探讨这个...

    activeMQ-5.5 入门测试

    - 测试脚本:可能用于自动化测试消息传递的正确性和性能。 通过研究这些源码,你可以深入了解ActiveMQ的工作原理,以及如何在实际项目中应用。同时,你可以通过修改源码,实现更复杂的场景,例如消息优先级、事务...

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    ActiveMQ与Zookeeper集群测试代码

    标题中的“ActiveMQ与Zookeeper集群测试代码”指的是一个实验或示例项目,旨在演示如何结合这两个组件来构建高可用的消息传递环境。Zookeeper在这里的角色可能是用来管理ActiveMQ集群的状态,实现节点间的选举和故障...

    测试activeMQ的java程序

    生产者负责创建消息并将其发送到ActiveMQ服务器,而消费者则负责监听并处理来自服务器的消息。这通常涉及以下步骤: 1. 添加ActiveMQ依赖:在项目中添加ActiveMQ的库依赖,以便能够使用其提供的API。 2. 创建...

    ActiveMQ消息中间件的测试案例.zip

    ActiveMQ消息中间件的测试案例

Global site tag (gtag.js) - Google Analytics