`
longgangbai
  • 浏览: 7330377 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ 中ActiveMQBlobMessage的接收和发送

阅读更多

           在ActiveMQ中对比较大的消息采用一种ActiveMQBlobMessage方式发送的时候,因为可采用策略的不同而已使用HTTP协议字节流,文件系统的读文件,FTP协议的方式实现发送和接收文件。

上传和下载的数据流执行过程:

           在ActiveMQ中采用策略模式定义如下接口:

上传策略接口BlobUploadStrategy:

public interface BlobUploadStrategy {

    URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException;

    URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException;
}

 

下载策略接口BlobDownloadStrategy 如下:

public interface BlobDownloadStrategy {
    
    InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException;
    
    void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException;

}

 

通过文件方式实现上传和下载的策略为FileSystemBlobStrategy :

其中FileSystemBlobStrategy implements实现了 BlobUploadStrategy, BlobDownloadStrategy通过将源代码如下:

    protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException {
    	if (message.getURL() != null) {
    		try {
				return new File(message.getURL().toURI());
			} catch (URISyntaxException e) {
                                IOException ioe = new IOException("Unable to open file for message " + message);
                                ioe.initCause(e);
			}
    	}
        //replace all : with _ to make windows more happy
        String fileName = message.getJMSMessageID().replaceAll(":", "_"); 
        return new File(rootFile, fileName);        
        
    }

 

实现FTP上传和下载的策略的是FTPBlobUploadStrategy,FTPBlobDownloadStrategy实现。实质是采用commons-net包中的FTPClient实现相关的功能。

接口实现如下:

public class FTPBlobDownloadStrategy extends FTPStrategy implements BlobDownloadStrategy 

 

public class FTPBlobUploadStrategy extends FTPStrategy implements BlobUploadStrategy 

 

上传和下载均继承自FTPStrategy的实现如下:

package org.apache.activemq.blob;

import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL;

import javax.jms.JMSException;

import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.commons.net.ftp.FTPClient;

public class FTPStrategy {

    protected BlobTransferPolicy transferPolicy;
    protected URL url;
    protected String ftpUser = "";
    protected String ftpPass = "";

    public FTPStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException {
        this.transferPolicy = transferPolicy;
        this.url = new URL(this.transferPolicy.getUploadUrl());
    }
    
    protected void setUserInformation(String userInfo) {
        if(userInfo != null) {
            String[] userPass = userInfo.split(":");
            if(userPass.length > 0) this.ftpUser = userPass[0];
            if(userPass.length > 1) this.ftpPass = userPass[1];
        } else {
            this.ftpUser = "anonymous";
            this.ftpPass = "anonymous";
        }
    }
    
    protected FTPClient createFTP() throws IOException, JMSException {
        String connectUrl = url.getHost();
        setUserInformation(url.getUserInfo());
        int port = url.getPort() < 1 ? 21 : url.getPort();
        
        FTPClient ftp = new FTPClient();
        try {
            ftp.connect(connectUrl, port);
        } catch(ConnectException e) {
            throw new JMSException("Problem connecting the FTP-server");
        }
        if(!ftp.login(ftpUser, ftpPass)) {
            ftp.quit();
            ftp.disconnect();
            throw new JMSException("Cant Authentificate to FTP-Server");
        }
        return ftp;
    }
    
}

 

基于HTTP协议的默认

public class DefaultBlobUploadStrategy extends DefaultStrategy implements BlobUploadStrategy 

 

public class DefaultBlobDownloadStrategy extends DefaultStrategy implements BlobDownloadStrategy 

 

 

部分源代码如下:

public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException {
        return uploadStream(message, new FileInputStream(file));
    }

    public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException {
        URL url = createMessageURL(message);

        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
        connection.setRequestMethod("PUT");
        connection.setDoOutput(true);

        // use chunked mode or otherwise URLConnection loads everything into
        // memory
        // (chunked mode not supported before JRE 1.5)
        connection.setChunkedStreamingMode(transferPolicy.getBufferSize());

        OutputStream os = connection.getOutputStream();

        byte[] buf = new byte[transferPolicy.getBufferSize()];
        for (int c = fis.read(buf); c != -1; c = fis.read(buf)) {
            os.write(buf, 0, c);
            os.flush();
        }
        os.close();
        fis.close();

        if (!isSuccessfulCode(connection.getResponseCode())) {
            throw new IOException("PUT was not successful: " + connection.getResponseCode() + " "
                                  + connection.getResponseMessage());
        }

        return url;
    }

 

分享到:
评论

相关推荐

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看消息队列的状态。 在实际应用中,你可能还需要考虑消息的可靠性、顺序性、幂等性以及错误处理等复杂问题。例如,使用事务性...

    springboot集成activemq实现消息接收demo

    本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们需要在Spring Boot项目中引入ActiveMQ的相关依赖。在`pom.xml`文件中添加以下Maven依赖: ```xml &lt;groupId&gt;org.spring...

    activemq 通过ajax发送接收消息简单例子

    在这个简单的例子中,我们将探讨如何利用Ajax与ActiveMQ交互,发送和接收消息。首先,你需要在服务器端设置一个ActiveMQ实例,可以通过下载并安装ActiveMQ来完成。安装完成后,启动ActiveMQ服务,这通常会开启一个...

    ActiveMQ发送和接收protobuf协议消息的实例(精心整理,亲测可用)

    在本文中,我们将深入探讨如何使用ActiveMQ发送和接收基于protobuf(Protocol Buffers)协议的消息,同时也会介绍如何进行ActiveMQ的简化封装和配置自动重连机制。 首先,protobuf是Google开发的一种数据序列化协议...

    activeMQ收发工具.rar

    ActiveMQ收发工具的核心功能是通过Java应用程序发送和接收ActiveMQ消息。这个jar包简化了对ActiveMQ服务器的交互过程,使得开发者无需编写复杂的代码就能进行消息传递的测试和调试。通过在命令行中执行`java -jar ...

    ActiveMQ接受和发送工具.rar

    在压缩包中的"ActiveMQ接受和发送工具"很可能包含了一个图形界面或者命令行工具,使得用户可以更直观地发送测试消息到ActiveMQ服务器,查看消息队列的状态,以及接收消息。使用这些工具,开发者可以快速验证ActiveMQ...

    ActiveMQ消息发送接收封装实现及定时测试.

    实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了

    Springboot整合ActiveMQ,实现消息的发送接收功能源码

    在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便实现实时的消息发送和接收功能。首先,让我们简要了解一下SpringBoot和ActiveMQ。 **SpringBoot简介** SpringBoot是Spring框架的一个...

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    ActiveMq+SpringMVC实现邮件异步发送

    ActiveMQ作为一个开源的消息中间件,被广泛用于实现消息队列和发布/订阅模式,它允许应用将非实时任务如邮件发送等操作放到后台处理,从而提升系统的响应速度。在本项目中,ActiveMQ与SpringMVC框架结合,实现了邮件...

    activemq 接收文件流 C#

    3. 创建会话:在连接上创建`ISession`对象,会话是执行消息发送和接收操作的上下文。 4. 创建消费者:创建一个`IMessageConsumer`,它是接收消息的对象,通常会指定一个目的地(如队列或主题)。 5. 接收消息:调用...

    activemq-cpp发送接收消息,消息过滤器

    在本文中,我们将深入探讨如何使用`activemq-cpp`库在C++环境中发送和接收消息,并利用消息过滤器来实现特定的消息处理。`activemq-cpp`是Apache ActiveMQ的一个C++客户端,它提供了与ActiveMQ服务器进行交互的能力...

    WebSocket协议接收ActiveMQ

    4. 数据交换:客户端和服务器之间可以通过发送和接收WebSocket帧来交换消息。ActiveMQ会将内部的消息格式转换为WebSocket帧,并反之亦然。 5. 关闭连接:当不再需要连接时,客户端或服务器可以发送关闭帧来终止连接...

    activeMQ发送消息返回消息

    在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。 在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是...

    activemqactivemq

    在“MQClient”文件中,我们可以期待看到具体的客户端代码示例,这些示例可能包括连接到ActiveMQ服务器、创建Session、发送和接收消息等操作。 ActiveMQ还支持多种协议,如OpenWire、AMQP、STOMP、MQTT和WS,这使得...

    AMQ接收与发送

    4. 运行客户端代码,启动发送端和接收端。 ### 7. 性能优化与安全 为了确保高效稳定地运行,ActiveMQ提供了多种性能优化选项,如调整内存使用、设置缓存大小、启用批量发送等。同时,对于安全性,ActiveMQ支持SSL/...

    Spring+ActiveMQ消息队列+前台接收消息

    6. **事务管理**:Spring和ActiveMQ可以结合使用JTA事务,确保消息发送和业务操作的原子性。当开启JTA事务时,如果业务操作失败,消息也会被回滚,不会被发送到队列。 **前台接收消息** 在Web应用中,前台接收消息...

    使用WebSocket协议接收ActiveMQ消息

    ActiveMQ,作为一款流行的开源消息中间件,也支持WebSocket协议,使得客户端可以通过WebSocket接口来接收和发送消息。 ActiveMQ是Apache软件基金会开发的消息队列产品,它遵循开放标准,如JMS(Java Message ...

    spring使用activeMQ实现消息发送

    通过使用`JmsTemplate`类,我们可以方便地发送和接收消息。 1. **配置ActiveMQ**:在开始之前,我们需要在本地或者远程部署一个ActiveMQ服务器。配置文件通常为`activemq.xml`,在这里可以设置broker(消息代理)的...

Global site tag (gtag.js) - Google Analytics