这里使用的 MQ 中间件是开源的 ActiveMQ,我们没有采用 BytesMessage 来按字节传送文件,而是 ActiveMQ 为我们提供了 org.apache.activemq.BlobMessage,可以用它来传送大对象。org.apache.activemq.ActiveMQSession 中有以下几个创建 BlobMessage 对象的方法:
createBlobMessage(URL url)
createBlobMessage(URL url, boolean deletedByBroker)
createBlobMessage(File file)
createBlobMessage(InputStream in)
接收到 BlobMessage 消息后,可以调用其 getInputStream() 方法获得数据,然后写成磁盘文件,文件名、文件大小等可通过 Message 的 getXxxProperty("Property.Name") 取的。
注 意,传输入文件的时候,发送方创建 ConnectionFactory 时的 brokerURL 需要指定 jms.blobTransferPolicy.uploadUrl 或者jms.blobTransferPolicy.defaultUploadUrl 属性为 ActiveMQ 中 fileserver 应用的 URI,即指定传输 BlogMessage 的 BlobTransferPolicy 策略,参看 Configuring the BLOB Transfer Policy。
1. 启动 ActiveMQ
2. 编写发送文件的程序 FileSender.java
package org.laurel.jms;
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.swing.JFileChooser;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
/**
* 通过 ActiveMQ 发送文件的程序
*
* @author wb-liufei
*
*/
public class FileSender {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
// 选择文件
JFileChooser fileChooser = new JFileChooser();
fileChooser.setDialogTitle("请选择要传送的文件");
if (fileChooser.showOpenDialog(null) != JFileChooser.APPROVE_OPTION) {
return;
}
File file = fileChooser.getSelectedFile();
// 获取 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/");
// 创建 Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建 Session
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 Destination
Destination destination = session.createQueue("File.Transport");
// 创建 Producer
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 设置为非持久性
// 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件
// 构造 BlobMessage,用来传输文件
//如果设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 消息持久性的话,
//发送方传文件的时候,接收方可以不在线,文件会暂存在 ActiveMQ 服务器上,等到接收程序上线后仍然可以收到发过来的文件。
BlobMessage blobMessage = session.createBlobMessage(file);
blobMessage.setStringProperty("FILE.NAME", file.getName());
blobMessage.setLongProperty("FILE.SIZE", file.length());
System.out.println("开始发送文件:" + file.getName() + ",文件大小:" + file.length() + " 字节");
// 7. 发送文件
producer.send(blobMessage);
System.out.println("完成文件发送:" + file.getName());
producer.close();
session.close();
connection.close(); // 不关闭 Connection, 程序则不退出
}
}
3. 编写接收文件的程序 FileReceiver.java
package org.laurel.jms;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.swing.JFileChooser;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;
public class FileReciever {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
// 获取 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建 Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// 创建 Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建 Destinatione
Destination destination = session.createQueue("File.Transport");
// 创建 Consumer
MessageConsumer consumer = session.createConsumer(destination);
// 注册消息监听器,当消息到达时被触发并处理消息
consumer.setMessageListener(new MessageListener() {
// 监听器中处理消息
public void onMessage(Message message) {
if (message instanceof BlobMessage) {
BlobMessage blobMessage = (BlobMessage) message;
try {
String fileName = blobMessage.getStringProperty("FILE.NAME");
System.out.println("文件接收请求处理:" + fileName + ",文件大小:" + blobMessage.getLongProperty("FILE.SIZE")
+ " 字节");
JFileChooser fileChooser = new JFileChooser();
fileChooser.setDialogTitle("请指定文件保存位置");
fileChooser.setSelectedFile(new File(fileName));
if (fileChooser.showSaveDialog(null) == JFileChooser.APPROVE_OPTION) {
File file = fileChooser.getSelectedFile();
OutputStream os = new FileOutputStream(file);
System.out.println("开始接收文件:" + fileName);
InputStream inputStream = blobMessage.getInputStream();
// 写文件,你也可以使用其他方式
byte[] buff = new byte[256];
int len = 0;
while ((len = inputStream.read(buff)) > 0) {
os.write(buff, 0, len);
}
os.close();
System.out.println("完成文件接收:" + fileName);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
}
4. 运行程序
先执行 FileReceiver 程序来监听消息
再执行发送程序 FileSender,将会提示你选择一个要传送的文件(上图左边),确定后就会把文件发送到 ActiveMQ 服务器上
接收端 FileReceiver 监听到有文件传过来的消息后,会自动弹出保存文件的对话框,要你选择保存位置(上图右边),文件名能保持一致。确定后就开始接收文件,存到指定的位置。
如果设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 消息持久性的话,发送方传文件的时候,接收方可以不在线,文件会暂存在 ActiveMQ 服务器上,等到接收程序上线后仍然可以收到发过来的文件。
分享到:
相关推荐
本教程将围绕"使用JMS操作ActiveMQ"这一主题,详细阐述如何通过JMS与ActiveMQ进行交互,包括创建生产者、消费者以及消息的发送和接收。 首先,我们需要理解JMS的基本概念。在JMS中,消息生产者(Producer)负责创建...
ActivemQ是Apache软件基金会的一个项目,它实现了JMS规范,提供了一个高效、可靠的中间件服务,用于处理消息队列。本文将深入探讨如何在Spring 3.0中整合JMS与ActivemQ,以及它们在实际应用中的关键知识点。 首先,...
本实例将带领初学者了解如何使用JMS和ActiveMQ构建一个简单的消息传递系统。首先,我们需要安装以下软件包: 1. **tokyocabinet-1.4.45.tar.gz**: 这是一个高效的键值存储库,通常用于数据缓存和日志记录。在...
ActiveMQ的主要特点包括支持多种编程语言和协议的客户端、完全支持JMS 1.1和J2EE 1.4规范、对Spring的支持、支持多种传输协议以及持久化和事务处理能力。 在实际的项目中,消息队列经常被用于将一些耗时的操作,如...
Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的消息传递和队列管理。...
**Apache ActiveMQ与JMS简介** Apache ActiveMQ是基于Java消息服务(Java Message Service,简称...使用这些文件,开发者可以快速导入到IDE中,编译并运行示例代码,了解和学习如何在实际项目中使用ActiveMQ工具类。
在IT行业中,SpringMVC和JMS(Java Message Service)是两种非常重要的技术,而ActiveMQ则是实现JMS规范的开源消息代理。本项目结合这三者,提供了一个整合的Demo,旨在帮助开发者理解如何在SpringMVC应用中集成JMS...
文档《服务管理.doc》和《消息管理.ppt》可能涵盖了服务管理和消息管理的细节,包括服务的注册、发现、监控和管理,以及如何使用JMS和ActiveMQ进行消息传递。这些文档可能详细阐述了服务总线管理系统的设计和流程,...
在IT行业中,Spring框架...Spring简化了JMS的集成和管理,ActiveMQ作为强大的消息中间件,保证了消息的稳定传输。通过理解和掌握这一技术栈,开发者可以构建出高可用、松耦合的应用系统,提高系统的整体性能和稳定性。
Spring简化了开发流程,JMS提供了消息传输的标准,ActiveMQ保证了消息传递的稳定,而Tomcat则提供了运行环境。通过这个组合,开发者可以构建出高并发、高可用的系统,满足大规模应用的需求。在实际工作中,学习并...
在"Apache Camel JMS ActiveMQ"的使用样例中,我们有两个主要的场景: 1. **从本地读取信息推送到MQ中**:这一部分涉及到了Apache Camel的数据交换模型,即从本地源(可能是文件、数据库或任何其他数据源)读取信息...
综上所述,这个例子提供了一个全面的实践,涵盖了Spring MVC、JMS和ActiveMQ的集成,帮助开发者理解如何在实际项目中使用消息中间件实现异步处理和解耦。通过学习这个示例,你可以了解到如何在Spring MVC环境中配置...
本压缩包中的"JMS ActiveMQ演示代码"提供了一个具体的实例,展示了如何在实际项目中使用ActiveMQ来实现消息传递。这个代码可以直接放入工程中运行,帮助开发者理解和实践JMS与ActiveMQ的结合使用。 **点对点(Point...
ActiveMQ是Apache软件基金会开发的一款开源Java消息服务(JMS)提供商,它实现了JMS规范,为分布式系统提供了可靠的消息传输。本篇文章主要介绍JMS的基础知识以及如何在Linux环境下安装ActiveMQ。 首先,让我们详细...
将ActiveMQ与Spring整合,可以方便地在Spring应用中使用JMS,实现消息驱动的架构。 本文将深入探讨ActiveMQ与Spring整合的关键知识点: 1. **Spring对JMS的支持**: Spring提供了`org.springframework.jms`包,该...
将JMS、ActiveMQ和Spring整合,首先需要在Spring配置文件中定义ConnectionFactory,这是与消息服务器建立连接的工厂类。接着,创建目的地(Destination)配置,包括队列(Queue)和主题(Topic)。然后,定义Message...
这个示例展示了如何在分布式系统中使用JMS进行异步通信,确保即使在组件之间出现故障时,数据也能安全地传输。在实际开发中,可以根据需求调整队列配置,如设置消息持久化、使用事务会话等,以实现更复杂的业务逻辑...
在本文中,我们将深入探讨如何在SpringBoot中使用JMS(Java Message Service)与ActiveMQ进行通信,包括两种主要的消息传输模式:点对点(生产者/消费者模式)和发布/订阅模式。 首先,你需要在本地安装并运行...