- 浏览: 7330377 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (1546)
- 企业中间件 (236)
- 企业应用面临的问题 (236)
- 小布Oracle学习笔记汇总 (36)
- Spring 开发应用 (54)
- IBatis开发应用 (16)
- Oracle基础学习 (23)
- struts2.0 (41)
- JVM&ClassLoader&GC (16)
- JQuery的开发应用 (17)
- WebService的开发应用 (21)
- Java&Socket (44)
- 开源组件的应用 (254)
- 常用Javascript的开发应用 (28)
- J2EE开发技术指南 (163)
- EJB3开发应用 (11)
- GIS&Mobile&MAP (36)
- SWT-GEF-RCP (52)
- 算法&数据结构 (6)
- Apache开源组件研究 (62)
- Hibernate 学习应用 (57)
- java并发编程 (59)
- MySQL&Mongodb&MS/SQL (15)
- Oracle数据库实验室 (55)
- 搜索引擎的开发应用 (34)
- 软件工程师笔试经典 (14)
- 其他杂项 (10)
- AndroidPn& MQTT&C2DM&推技术 (29)
- ActiveMQ学习和研究 (38)
- Google技术应用开发和API分析 (11)
- flex的学习总结 (59)
- 项目中一点总结 (20)
- java疑惑 java面向对象编程 (28)
- Android 开发学习 (133)
- linux和UNIX的总结 (37)
- Titanium学习总结 (20)
- JQueryMobile学习总结 (34)
- Phonegap学习总结 (32)
- HTML5学习总结 (41)
- JeeCMS研究和理解分析 (9)
最新评论
-
lgh1992314:
[u][i][b][flash=200,200][url][i ...
看看mybatis 源代码 -
尼古拉斯.fwp:
图片根本就不出来好吧。。。。。。
Android文件图片上传的详细讲解(一)HTTP multipart/form-data 上传报文格式实现手机端上传 -
ln94223:
第一个应该用排它网关吧 怎么是并行网关, 并行网关是所有exe ...
工作流Activiti的学习总结(八)Activiti自动执行的应用 -
ZY199266:
获取不到任何消息信息,请问这是什么原因呢?
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息 -
xiaoyao霄:
DestinationSourceMonitor 报错 应该导 ...
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息
在ActiveMQ中对比较大的消息采用一种ActiveMQBlobMessage方式发送的时候,因为可采用策略的不同而已使用HTTP协议字节流,文件系统的读文件,FTP协议的方式实现发送和接收文件。
上传和下载的数据流执行过程:
在ActiveMQ中采用策略模式定义如下接口:
上传策略接口BlobUploadStrategy:
public interface BlobUploadStrategy { URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException; URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException; }
下载策略接口BlobDownloadStrategy 如下:
public interface BlobDownloadStrategy { InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException; void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException; }
通过文件方式实现上传和下载的策略为FileSystemBlobStrategy :
其中FileSystemBlobStrategy implements实现了 BlobUploadStrategy, BlobDownloadStrategy通过将源代码如下:
protected File getFile(ActiveMQBlobMessage message) throws JMSException, IOException { if (message.getURL() != null) { try { return new File(message.getURL().toURI()); } catch (URISyntaxException e) { IOException ioe = new IOException("Unable to open file for message " + message); ioe.initCause(e); } } //replace all : with _ to make windows more happy String fileName = message.getJMSMessageID().replaceAll(":", "_"); return new File(rootFile, fileName); }
实现FTP上传和下载的策略的是FTPBlobUploadStrategy,FTPBlobDownloadStrategy实现。实质是采用commons-net包中的FTPClient实现相关的功能。
接口实现如下:
public class FTPBlobDownloadStrategy extends FTPStrategy implements BlobDownloadStrategy
public class FTPBlobUploadStrategy extends FTPStrategy implements BlobUploadStrategy
上传和下载均继承自FTPStrategy的实现如下:
package org.apache.activemq.blob; import java.io.IOException; import java.net.ConnectException; import java.net.MalformedURLException; import java.net.URL; import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.commons.net.ftp.FTPClient; public class FTPStrategy { protected BlobTransferPolicy transferPolicy; protected URL url; protected String ftpUser = ""; protected String ftpPass = ""; public FTPStrategy(BlobTransferPolicy transferPolicy) throws MalformedURLException { this.transferPolicy = transferPolicy; this.url = new URL(this.transferPolicy.getUploadUrl()); } protected void setUserInformation(String userInfo) { if(userInfo != null) { String[] userPass = userInfo.split(":"); if(userPass.length > 0) this.ftpUser = userPass[0]; if(userPass.length > 1) this.ftpPass = userPass[1]; } else { this.ftpUser = "anonymous"; this.ftpPass = "anonymous"; } } protected FTPClient createFTP() throws IOException, JMSException { String connectUrl = url.getHost(); setUserInformation(url.getUserInfo()); int port = url.getPort() < 1 ? 21 : url.getPort(); FTPClient ftp = new FTPClient(); try { ftp.connect(connectUrl, port); } catch(ConnectException e) { throw new JMSException("Problem connecting the FTP-server"); } if(!ftp.login(ftpUser, ftpPass)) { ftp.quit(); ftp.disconnect(); throw new JMSException("Cant Authentificate to FTP-Server"); } return ftp; } }
基于HTTP协议的默认
public class DefaultBlobUploadStrategy extends DefaultStrategy implements BlobUploadStrategy
public class DefaultBlobDownloadStrategy extends DefaultStrategy implements BlobDownloadStrategy
部分源代码如下:
public URL uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException { return uploadStream(message, new FileInputStream(file)); } public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws JMSException, IOException { URL url = createMessageURL(message); HttpURLConnection connection = (HttpURLConnection)url.openConnection(); connection.setRequestMethod("PUT"); connection.setDoOutput(true); // use chunked mode or otherwise URLConnection loads everything into // memory // (chunked mode not supported before JRE 1.5) connection.setChunkedStreamingMode(transferPolicy.getBufferSize()); OutputStream os = connection.getOutputStream(); byte[] buf = new byte[transferPolicy.getBufferSize()]; for (int c = fis.read(buf); c != -1; c = fis.read(buf)) { os.write(buf, 0, c); os.flush(); } os.close(); fis.close(); if (!isSuccessfulCode(connection.getResponseCode())) { throw new IOException("PUT was not successful: " + connection.getResponseCode() + " " + connection.getResponseMessage()); } return url; }
发表评论
-
ActiveMQ的拦截器插件
2011-07-22 09:29 6601ActiveMQ拦截器使用和原 ... -
ActiveMQ的各种表SQL的管理
2011-07-20 20:58 3500在ActiveMQ为了方便的切换数据库,更为了深入 ... -
ActiveMQ中advisory的使用和原理
2011-07-20 18:46 2900在ActiveMQ中的监控和管理也可以通过Advisory实现 ... -
ActiveMQ的异步转发(DispatchAsync)功能
2011-07-20 11:29 47011. 消息者异步转发功能 针对正常情况下,在一个 ... -
ActiveMQ 的独占消费(Exclusive Consumer)
2011-07-20 11:26 4093我们经常希望维持队列中的消息,按一定次序转发给消息者。然而当有 ... -
ActiveMQ5.5在Tomcat6.0中部署
2011-07-19 22:27 2920在ActiveMQ中监控管理Web组件为ActiveMQCon ... -
Window 下ActiveMQ端口冲突,负载均衡,主备配置
2011-07-17 16:03 5525在Java 学习中Window操作系 ... -
ActiveMQ中消息权限策略
2011-07-17 00:31 2667在ActiveMQ发送消息的时候,可以通过MessageAut ... -
ActiveMQ和Jetty整合使用
2011-07-07 22:49 5585在ActiveMQ中的activemq.b ... -
ActiveMQ 和Commons-Daemon整合
2011-07-07 20:13 2940在一般的java项目中,如果在linu ... -
关于ActiveMQ中怎么实现一对多发送消息讨论
2011-07-07 19:50 6067无 ... -
ActiveMQ 和JAXWS整合
2011-07-04 22:02 2211在多个系统中可能考虑到远程访问等的,采用WebServ ... -
ActiveMQ-Camel的使用
2011-07-02 10:27 10223在一个电子系统中可能接受来自不同供应商的 ... -
ActiveMQ模板和Velocity整合使用
2011-07-01 19:50 2306ActiveMQ模板使用 在ActiveMQ中AMQ ... -
ActiveMQ中消息游标
2011-06-30 18:16 2656在 ActiveMQ 5.0的之前版本中,b ... -
ActiveMQ和Tomcat的整合应用
2011-06-30 17:00 11209在ActiveMQ的资源让容器Tomcat管理时 ... -
ActiveMQ关于文件传输需要注意哪些方面?
2011-06-18 22:11 6176最近一直在关注一些文件传输中间件的实现,想用Acti ... -
关于ActiveMQ中Session和Connection资源的管理
2011-06-15 23:43 25099配置完了持久化之后,我们就可以使用代码来发送 ... -
ActiveMQ中关于文件锁的机制的学习
2011-06-14 23:31 3334在ActiveMQ中提供了文件数据库机 ... -
ActiveMQ的JMX监控使用
2011-06-10 17:26 13152package easyway.app.activemq.d ...
相关推荐
7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看消息队列的状态。 在实际应用中,你可能还需要考虑消息的可靠性、顺序性、幂等性以及错误处理等复杂问题。例如,使用事务性...
本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们需要在Spring Boot项目中引入ActiveMQ的相关依赖。在`pom.xml`文件中添加以下Maven依赖: ```xml <groupId>org.spring...
在这个简单的例子中,我们将探讨如何利用Ajax与ActiveMQ交互,发送和接收消息。首先,你需要在服务器端设置一个ActiveMQ实例,可以通过下载并安装ActiveMQ来完成。安装完成后,启动ActiveMQ服务,这通常会开启一个...
在本文中,我们将深入探讨如何使用ActiveMQ发送和接收基于protobuf(Protocol Buffers)协议的消息,同时也会介绍如何进行ActiveMQ的简化封装和配置自动重连机制。 首先,protobuf是Google开发的一种数据序列化协议...
ActiveMQ收发工具的核心功能是通过Java应用程序发送和接收ActiveMQ消息。这个jar包简化了对ActiveMQ服务器的交互过程,使得开发者无需编写复杂的代码就能进行消息传递的测试和调试。通过在命令行中执行`java -jar ...
在压缩包中的"ActiveMQ接受和发送工具"很可能包含了一个图形界面或者命令行工具,使得用户可以更直观地发送测试消息到ActiveMQ服务器,查看消息队列的状态,以及接收消息。使用这些工具,开发者可以快速验证ActiveMQ...
实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了
在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便实现实时的消息发送和接收功能。首先,让我们简要了解一下SpringBoot和ActiveMQ。 **SpringBoot简介** SpringBoot是Spring框架的一个...
这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...
ActiveMQ作为一个开源的消息中间件,被广泛用于实现消息队列和发布/订阅模式,它允许应用将非实时任务如邮件发送等操作放到后台处理,从而提升系统的响应速度。在本项目中,ActiveMQ与SpringMVC框架结合,实现了邮件...
3. 创建会话:在连接上创建`ISession`对象,会话是执行消息发送和接收操作的上下文。 4. 创建消费者:创建一个`IMessageConsumer`,它是接收消息的对象,通常会指定一个目的地(如队列或主题)。 5. 接收消息:调用...
在本文中,我们将深入探讨如何使用`activemq-cpp`库在C++环境中发送和接收消息,并利用消息过滤器来实现特定的消息处理。`activemq-cpp`是Apache ActiveMQ的一个C++客户端,它提供了与ActiveMQ服务器进行交互的能力...
4. 数据交换:客户端和服务器之间可以通过发送和接收WebSocket帧来交换消息。ActiveMQ会将内部的消息格式转换为WebSocket帧,并反之亦然。 5. 关闭连接:当不再需要连接时,客户端或服务器可以发送关闭帧来终止连接...
在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。 在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是...
在“MQClient”文件中,我们可以期待看到具体的客户端代码示例,这些示例可能包括连接到ActiveMQ服务器、创建Session、发送和接收消息等操作。 ActiveMQ还支持多种协议,如OpenWire、AMQP、STOMP、MQTT和WS,这使得...
4. 运行客户端代码,启动发送端和接收端。 ### 7. 性能优化与安全 为了确保高效稳定地运行,ActiveMQ提供了多种性能优化选项,如调整内存使用、设置缓存大小、启用批量发送等。同时,对于安全性,ActiveMQ支持SSL/...
6. **事务管理**:Spring和ActiveMQ可以结合使用JTA事务,确保消息发送和业务操作的原子性。当开启JTA事务时,如果业务操作失败,消息也会被回滚,不会被发送到队列。 **前台接收消息** 在Web应用中,前台接收消息...
ActiveMQ,作为一款流行的开源消息中间件,也支持WebSocket协议,使得客户端可以通过WebSocket接口来接收和发送消息。 ActiveMQ是Apache软件基金会开发的消息队列产品,它遵循开放标准,如JMS(Java Message ...
通过使用`JmsTemplate`类,我们可以方便地发送和接收消息。 1. **配置ActiveMQ**:在开始之前,我们需要在本地或者远程部署一个ActiveMQ服务器。配置文件通常为`activemq.xml`,在这里可以设置broker(消息代理)的...