JMS消息生产者:
import java.io.File;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
public class BlobMessageSendTest {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
private String subject = "Blob Queue";
private Destination destination = null;
private ActiveMQConnection connection = null;
private ActiveMQSession session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = (ActiveMQConnection) connectionFactory.createConnection();
/*
* !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true
* (default) the uploader is lost in translation ;)
* !!!!!!!!!!!!!!!!!!!!!!!!!
*/
connection.setCopyMessageOnSend(false);
session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
// destination = session.createTopic(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(File file) throws JMSException, Exception {
initialize();
BlobMessage msg = session.createBlobMessage(file);
connection.start();
System.out.println("Producer:->Sending message: " + file.getName());
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
JMS消息消费者:
package iprai.ace.activemq;
import java.io.File;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
public class BlobMessageSendTest {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
private String subject = "Blob Queue";
private Destination destination = null;
private ActiveMQConnection connection = null;
private ActiveMQSession session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = (ActiveMQConnection) connectionFactory.createConnection();
/*
* !!!!!!!!!!!!!!!!!!!!!!!!! very important. If it is set to true
* (default) the uploader is lost in translation ;)
* !!!!!!!!!!!!!!!!!!!!!!!!!
*/
connection.setCopyMessageOnSend(false);
session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
// destination = session.createTopic(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(File file) throws JMSException, Exception {
initialize();
BlobMessage msg = session.createBlobMessage(file);
connection.start();
System.out.println("Producer:->Sending message: " + file.getName());
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
测试代码:
import java.io.File;
public class BlobMessageTest {
/**
* topic方式,必须先启动消费者,然后是生产者,否则接收不到消息。 queue方式,最好先启动生产者,然后启动消费者,否则也容易收不到消息。
*
* @param args
*/
public static void main(String[] args) throws Exception {
BlobMessageSendTest producer = new BlobMessageSendTest();
BlobMessageReceiveTest consumer = new BlobMessageReceiveTest();
String fileName = "D:/装Win7后装XP.txt";
// String fileName = "d:/JAVA+开发视频会议系统详细设计.doc";
File file = new File(fileName);
producer.produceMessage(file);
producer.close();
// 延时500毫秒之后停止接受消息
Thread.sleep(2000);
// 开始监听
consumer.consumeMessage();
// 延时500毫秒之后发送消息
Thread.sleep(2000);
consumer.close();
}
}
分享到:
相关推荐
activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。
其中,消息是应用程序之间传递的数据单元,客户端可以通过发送消息和接收消息的方式进行通信。 为什么使用ActiveMQ?ActiveMQ的主要优点在于它的高性能、可伸缩性、高可用性以及它的标准化。它支持多种消息协议,如...
这个"ActiveMQ接受和发送工具.rar"压缩包包含了用于与ActiveMQ交互的实用工具,方便用户进行消息的接收和发送操作。 在使用ActiveMQ时,了解以下几个关键知识点是至关重要的: 1. **Java Message Service (JMS)**...
activemq消息的发送与接受封装的工具类,只要你导入jar包
这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...
ActiveMQ收发工具的核心功能是通过Java应用程序发送和接收ActiveMQ消息。这个jar包简化了对ActiveMQ服务器的交互过程,使得开发者无需编写复杂的代码就能进行消息传递的测试和调试。通过在命令行中执行`java -jar ...
项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。
7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看消息队列的状态。 在实际应用中,你可能还需要考虑消息的可靠性、顺序性、幂等性以及错误处理等复杂问题。例如,使用事务性...
activemq 发送,接受,监控样例程序
在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。 在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是...
实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了
在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便实现实时的消息发送和接收功能。首先,让我们简要了解一下SpringBoot和ActiveMQ。 **SpringBoot简介** SpringBoot是Spring框架的一个...
综上所述,ActiveMQ的延迟发送功能是通过设置消息头的`JMSTimestamp`字段来实现的,而搭建集群则涉及多个Broker节点的配置,包括数据持久化、网络连接、负载均衡和容错机制等方面。在实际项目中,结合`...
在本文中,我们将深入探讨如何使用ActiveMQ发送和接收基于protobuf(Protocol Buffers)协议的消息,同时也会介绍如何进行ActiveMQ的简化封装和配置自动重连机制。 首先,protobuf是Google开发的一种数据序列化协议...
【描述】:本文旨在解析activeMQ消息中间件在发送消息过程中的工作原理,包括同步发送和异步发送两种方式。activeMQ的消息发送涉及到客户端与broker之间的交互,对于消息的可靠性和性能有着直接影响。 【标签】:...
总结一下,实现“接受ActiveMQ信息,通过Openfire公告发送给指定用户”的过程主要包括以下步骤: 1. 配置ActiveMQ服务器并创建JMS消费者来接收消息。 2. 解析接收到的XML消息,提取出公告的相关信息。 3. 使用Java...
ActiveMQ作为一个开源的消息中间件,被广泛用于实现消息队列和发布/订阅模式,它允许应用将非实时任务如邮件发送等操作放到后台处理,从而提升系统的响应速度。在本项目中,ActiveMQ与SpringMVC框架结合,实现了邮件...
此外,ActiveMQ支持多种协议和特性,如topic、持久化、事务消息等,可以根据项目需求进一步探索和利用。 这个简单的Demo展示了如何在Spring Boot中集成ActiveMQ进行消息接收。通过这种方式,你可以构建出一个可靠的...
总结,这个项目实例展示了如何利用SpringBoot的便捷性和ActiveMQ的高效通信能力,实现一个异步的邮件发送服务。通过阅读和理解这个项目,开发者可以学习到SpringBoot的整合技巧,以及消息中间件在实际业务场景中的...