`

ActiveMQ之三 -- 使用ActiveMQ来传送文件

 
阅读更多

这个方法还有待研究,目前还有如下几个疑点:
1. ActiveMQ 报出这样的信息:

INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this
destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info

 

2. 这种以异步方式传送资料,能保证客户端能以正确的顺序接收到文件段麽?

 

使用ActiveMQ传送文件,发送端必须将文件拆成一段一段,每段封装在独立的Message中,逐次发送到客户端。例如下面的例子,Producer通过发送命令,告诉文件传送的开始,发送中,结束。客户端接收到这些命令之后,就知道如何接收资料了。

客户端收到内容后,根据命令将内容合并到一个文件中。  

 

package org.apache.activemq.exchange.file;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    /**
     * @param args
     */
    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("EXCHANGE.FILE");

        MessageConsumer consumer = session.createConsumer(destination);

        boolean appended = false;
        try {
            while (true) {
                Message message = consumer.receive(5000);
                if (message == null) {
                    continue;
                }

                if (message instanceof StreamMessage) {
                    StreamMessage streamMessage = (StreamMessage) message;
                    String command = streamMessage.getStringProperty("COMMAND");
                    
                    if ("start".equals(command)) {
                        appended = false;
                        continue;
                    }

                    if ("sending".equals(command)) {
                        byte[] content = new byte[4096];
                        String file_name = message.getStringProperty("FILE_NAME");
                        BufferedOutputStream bos = null;
                        bos = new BufferedOutputStream(new FileOutputStream("c:/" + file_name, appended));
                        if (!appended) {
                            appended = true;
                        }
                        while (streamMessage.readBytes(content) > 0) {
                            bos.write(content);
                        }
                        bos.close();
                        continue;
                    }

                    if ("end".equals(command)) {
                        appended = false;
                        continue;
                    }
                }
            }
        } catch (JMSException e) {
            throw e;
        } finally {
            if (connection != null) {
                connection.close();
            }
        }

    }

}

 

发送端将文件分包,逐次发送到客户端  

package org.apache.activemq.exchange.file;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publisher {

    public static String FILE_NAME = "01.mp3";
    
    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("EXCHANGE.FILE");        
        MessageProducer producer = session.createProducer(destination);
        long time = System.currentTimeMillis();
        
        //通知客户端开始接受文件
        StreamMessage message = session.createStreamMessage();
        message.setStringProperty("COMMAND", "start");
        producer.send(message);
        
        //开始发送文件
        byte[] content = new byte[4096];
        InputStream ins = Publisher.class.getResourceAsStream(FILE_NAME);
        BufferedInputStream bins = new BufferedInputStream(ins);
        while (bins.read(content) > 0) {
            //
            message = session.createStreamMessage();
            message.setStringProperty("FILE_NAME", FILE_NAME);
            message.setStringProperty("COMMAND", "sending");
            message.clearBody();
            message.writeBytes(content);
            producer.send(message);
        }
        bins.close();
        ins.close();
        
        //通知客户端发送完毕
        message = session.createStreamMessage();
        message.setStringProperty("COMMAND", "end");
        producer.send(message);
        
        connection.close();
        
        System.out.println("Total Time costed : " + (System.currentTimeMillis() - time) + " mili seconds");
    }
}
 

 

分享到:
评论
2 楼 luotianwen456123 2015-01-12  
文件大小有限制吗
1 楼 hotbain 2013-01-13  
谢谢分享,值得收藏。又学了好多啊! 

相关推荐

    apache-activemq-5.9.0-bin

    6. **网络传输优化**:ActiveMQ使用高效的TCP/IP连接,并支持NIO(非阻塞I/O)和TCP套接字压缩,以提高网络传输效率。 7. **管理工具**:压缩包内的管理工具,如“bin”目录下的控制台应用程序,允许用户监控和管理...

    activemq-cpp-library-3.9.5-src.zip

    在使用ActiveMQ-CPP库之前,需要将其源代码编译成库文件。这通常涉及到配置CMakeLists.txt文件,设置依赖项,并执行`cmake`和`make`命令。完成编译后,将库链接到你的C++项目中,即可开始使用提供的API。 6. **...

    apache-activemq-5.8.0-bin.zip

    - ActiveMQ支持用户认证和授权,可以通过XML配置文件来设置用户和角色,以保护消息的传输安全。 7. **性能优化** - 可配置缓存策略,如使用内存池和消息存储策略,以优化消息处理速度和资源利用。 - 可通过调整...

    apache-activemq-5.13.2-bin.tar.gz

    解压`apache-activemq-5.13.2-bin.tar.gz`文件,通常使用`tar -zxvf 文件名`命令。解压后,你会得到一个名为`apache-activemq-5.13.2`的目录,里面包含了所有必要的可执行文件和配置文件。这个目录结构包括了`bin`、...

    activemq-rar-5.10.0.rar

    4. **activemq-openwire-legacy-5.10.0.jar**:OpenWire是ActiveMQ内部使用的序列化协议,用于高效地在网络上传输消息。"Legacy"可能意味着这个版本支持旧的通信协议。 5. **activemq-kahadb-store-5.10.0.jar**:...

    activemq-all-5.2.0-jar包

    使用`activemq-all-5.2.0.jar`,开发者可以通过JMS API来创建连接、生产者、消费者和消息。此外,ActiveMQ还提供了XML配置文件,用于设置服务器、网络连接、安全策略等。 **部署和运行** 在Java环境中,将`activemq...

    activemq-web-4.1.2.jar.zip

    在本篇文章中,我们将深入探讨ActiveMQ的Web组件——ActiveMQ Web 4.1.2,并重点关注其jar文件以及与之相关的依赖。 首先,`activemq-web-4.1.2.jar`是ActiveMQ Web应用程序的核心库文件,它包含了运行基于Web的...

    apache-activemq-5.15.0-bin

    - 配置:进入解压后的目录,找到`conf`文件夹,编辑`activemq.xml`配置文件,根据需求进行配置,如设置数据存储路径、网络监听端口等。 - 启动:在命令行中,切换到`bin`目录下,运行`start.bat`启动ActiveMQ...

    apache-activemq-5.3.1-bin.tar.gz

    要使用ActiveMQ,你需要一个JMS客户端库,如Apache Qpid或Artemis JMS,它们提供API来发送和接收消息。连接到ActiveMQ服务器通常涉及以下步骤:创建ConnectionFactory,创建一个Session,然后创建Producer和Consumer...

    apache-activemq-5.3.0-bin.zip

    - **消息中间件**:ActiveMQ作为消息中间件,负责在分布式系统中传输消息,解耦应用之间的通信。 - **JMS(Java Message Service)**:JMS是Java平台上的标准接口,定义了生产、发送、接收和消费消息的API。 - **...

    apache-activemq-5.15.9-bin.zip

    Apache ActiveMQ是世界上最流行的开源消息代理和队列服务器,它基于Java消息服务(JMS)规范,用于在分布式系统中高效地传输数据。这个压缩包"apache-activemq-5.15.9-bin.zip"包含了Apache ActiveMQ 5.15.9版本的可...

    activemq-all-5.15.2.jar 和 jms-1.1.jar

    这两个文件是Apache ActiveMQ项目的一部分,ActiveMQ是业界广泛使用的开放源代码消息代理,它实现了Java消息服务(JMS)规范。 1. **Apache ActiveMQ**: ActiveMQ是Apache软件基金会的一个项目,旨在提供一个高性能...

    apache-activemq-5.15.3-bin

    - **配置文件**:主要的配置文件是`conf/activemq.xml`,用户可以在此修改消息传输、持久化、安全等设置。 - **创建队列和主题**:通过管理控制台或者编程方式创建JMS队列和主题。 - **生产者与消费者**:编写应用...

    apache-activemq-5.9.1-bin.tar.gz

    这个“apache-activemq-5.9.1-bin.tar.gz”压缩包包含了在Linux环境下安装和运行ActiveMQ所需的所有文件。 首先,让我们了解一下ActiveMQ的核心概念和功能。ActiveMQ是一个中间件,它作为应用程序之间的通信桥梁,...

    apache-activemq-5.12.0-bin

    在“apache-activemq-5.12.0-bin”这个压缩包中,包含了运行Apache ActiveMQ所需的所有文件,适用于Windows操作系统。 Apache ActiveMQ作为消息队列的实现,主要功能包括: 1. **消息传输**:ActiveMQ允许应用程序...

    apache-activemq-5.15.10-bin.tar.gz

    2. **编写代码** - 使用JMS API创建ConnectionFactory,连接到ActiveMQ服务器,创建MessageProducer和MessageConsumer来发送和接收消息。 3. **测试与优化** - 测试消息传递的性能和可靠性,根据需求调整ActiveMQ的...

    apache-activemq-5.15.1-bin.tar.gz

    7. **安全特性**:ActiveMQ可以通过用户认证和授权策略来保护消息传递的安全,支持SSL/TLS加密传输,增强网络通信的安全性。 8. **消息过滤和路由**:通过使用主题和筛选器,ActiveMQ可以实现消息的智能路由,只有...

    apache-activemq-5.15.2-bin.zip

    10. **性能优化**:ActiveMQ提供了许多性能调优选项,如使用内存映射文件、批量发送消息和预分配文件,以提高消息处理速度和系统性能。 综上所述,“apache-activemq-5.15.2-bin.zip”压缩包包含了一整套在Windows...

    activemq-rar-5.6.0.rar

    7. snappy-java-1.0.3.jar:Snappy Java库,提供快速的数据压缩和解压缩,可能用于优化ActiveMQ的消息传输效率。 8. jackson-mapper-asl-1.9.2.jar:Jackson库的一部分,用于JSON序列化和反序列化,方便数据交换。 9...

    apache-activemq-5.15.7-bin.tar.gz

    总的来说,Apache ActiveMQ是一款功能强大的消息中间件,适合构建大规模、高并发、分布式系统,通过解压并配置"apache-activemq-5.15.7-bin.tar.gz"文件,开发者可以轻松地在自己的环境中部署和使用它。

Global site tag (gtag.js) - Google Analytics