JMS消息生产者:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQProducer {
public final static int MAX_SEND_TIMES = 100;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination dest = null;
private Connection conn = null;
private Session session = null;
private MessageProducer producer = null;
private void initialize() {
ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
user, password, url);
try {
conn = connFac.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = session.createQueue(subject);
producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
conn.start();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void produceMessage(String message) {
initialize();
try {
TextMessage tm = session.createTextMessage(message);
long startTime = System.currentTimeMillis();
System.out.println("-----------------Producer-->Sending Message---------------");
for(int i=0; i<MAX_SEND_TIMES; i++) {
producer.send(tm);
if((i+1)%1000 == 0) {
System.out.println("This is the" + i + " message!");
}
}
System.out.println("-------------------Producer--->Message Sent Complete!----------------");
long endTime = System.currentTimeMillis();
long executeTime = endTime - startTime;
System.out.println("ActiveMQ send" + MAX_SEND_TIMES + " messages used " + executeTime + "ms");
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void close() throws JMSException {
System.out.println("--------------------Producer--->Closing Connection----------------");
if(producer != null) {
producer.close();
}
if(session != null) {
session.close();
}
if(conn != null) {
conn.close();
}
}
}
JMS消息消费者:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQConsumer implements MessageListener {
public static int RECEIVED_MSG_NUM = 0;
long startReceiveTime = 0;
long endReceiveTime = 0;
long receiveDuringTime = 0;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL:DEFAULT";
private Destination dest = null;
private Connection conn = null;
private Session session = null;
private MessageConsumer consumer = null;
private void initialize() {
ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
user, password, url);
try {
conn = connFac.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = session.createQueue(subject);
consumer = session.createConsumer(dest);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void consumeMessage() {
initialize();
try {
conn.start();
System.out.println("-------------------Consumer--->Starting Listening----------------");
consumer.setMessageListener(this);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void close() throws JMSException {
System.out.println("------------Consumer--->Closing Connection--------------");
if(consumer != null) {
consumer.close();
}
if(session != null) {
session.close();
}
if(conn != null) {
conn.close();
}
}
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
if(message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
String msg = tm.getText();
if(RECEIVED_MSG_NUM == 0) {
startReceiveTime = System.currentTimeMillis();
}
RECEIVED_MSG_NUM++;
if((RECEIVED_MSG_NUM+1) % 1000 == 0) {
System.out.println("-----------------Consumer--->Received:" + RECEIVED_MSG_NUM + "--------------");
}
//Receive the last message
if(RECEIVED_MSG_NUM == ActiveMQProducer.MAX_SEND_TIMES - 1) {
endReceiveTime = System.currentTimeMillis();
receiveDuringTime = endReceiveTime - startReceiveTime;
System.out.println("---------------ActiveMQ Receive " + ActiveMQProducer.MAX_SEND_TIMES +
" messages used:" + receiveDuringTime + " ms");
} else {
System.out.println(System.currentTimeMillis() + "---Consumer--->Received:" + message);
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
测试代码:
import javax.jms.JMSException;
public class ActiveMQTest {
public static void main(String[] args) throws JMSException, InterruptedException {
ActiveMQConsumer consumer = new ActiveMQConsumer();
ActiveMQProducer producer = new ActiveMQProducer();
char[] tempChars = new char[1024];
for(int i=0; i<1024; i++) {
tempChars[i] = 'a';
}
String tempMsg = String.valueOf(tempChars);
//开始监听
consumer.consumeMessage();
producer.produceMessage(tempMsg);
producer.close();
//延时5000ms后关闭连接
Thread.sleep(5000);
consumer.close();
}
}
(说明:以上三个类放在同一个包下)
分享到:
相关推荐
activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。
其中,消息是应用程序之间传递的数据单元,客户端可以通过发送消息和接收消息的方式进行通信。 为什么使用ActiveMQ?ActiveMQ的主要优点在于它的高性能、可伸缩性、高可用性以及它的标准化。它支持多种消息协议,如...
这个"ActiveMQ接受和发送工具.rar"压缩包包含了用于与ActiveMQ交互的实用工具,方便用户进行消息的接收和发送操作。 在使用ActiveMQ时,了解以下几个关键知识点是至关重要的: 1. **Java Message Service (JMS)**...
这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...
activemq消息的发送与接受封装的工具类,只要你导入jar包
在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。 在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是...
ActiveMQ收发工具的核心功能是通过Java应用程序发送和接收ActiveMQ消息。这个jar包简化了对ActiveMQ服务器的交互过程,使得开发者无需编写复杂的代码就能进行消息传递的测试和调试。通过在命令行中执行`java -jar ...
项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。
7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看消息队列的状态。 在实际应用中,你可能还需要考虑消息的可靠性、顺序性、幂等性以及错误处理等复杂问题。例如,使用事务性...
activemq 发送,接受,监控样例程序
综上所述,ActiveMQ的延迟发送功能是通过设置消息头的`JMSTimestamp`字段来实现的,而搭建集群则涉及多个Broker节点的配置,包括数据持久化、网络连接、负载均衡和容错机制等方面。在实际项目中,结合`...
实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了
在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便实现实时的消息发送和接收功能。首先,让我们简要了解一下SpringBoot和ActiveMQ。 **SpringBoot简介** SpringBoot是Spring框架的一个...
【描述】:本文旨在解析activeMQ消息中间件在发送消息过程中的工作原理,包括同步发送和异步发送两种方式。activeMQ的消息发送涉及到客户端与broker之间的交互,对于消息的可靠性和性能有着直接影响。 【标签】:...
在本文中,我们将深入探讨如何使用ActiveMQ发送和接收基于protobuf(Protocol Buffers)协议的消息,同时也会介绍如何进行ActiveMQ的简化封装和配置自动重连机制。 首先,protobuf是Google开发的一种数据序列化协议...
总结一下,实现“接受ActiveMQ信息,通过Openfire公告发送给指定用户”的过程主要包括以下步骤: 1. 配置ActiveMQ服务器并创建JMS消费者来接收消息。 2. 解析接收到的XML消息,提取出公告的相关信息。 3. 使用Java...
通过使用`JmsTemplate`类,我们可以方便地发送和接收消息。 1. **配置ActiveMQ**:在开始之前,我们需要在本地或者远程部署一个ActiveMQ服务器。配置文件通常为`activemq.xml`,在这里可以设置broker(消息代理)的...
ActiveMQ作为一个开源的消息中间件,被广泛用于实现消息队列和发布/订阅模式,它允许应用将非实时任务如邮件发送等操作放到后台处理,从而提升系统的响应速度。在本项目中,ActiveMQ与SpringMVC框架结合,实现了邮件...
此外,ActiveMQ支持多种协议和特性,如topic、持久化、事务消息等,可以根据项目需求进一步探索和利用。 这个简单的Demo展示了如何在Spring Boot中集成ActiveMQ进行消息接收。通过这种方式,你可以构建出一个可靠的...