`
Riddick
  • 浏览: 640087 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

ActiveMQ5.2发送和接受TextMessage

    博客分类:
  • JMS
阅读更多

 JMS消息生产者:

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProducer {
	
	public final static int MAX_SEND_TIMES = 100;
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	private String subject = "TOOL.DEFAULT";
	private Destination dest = null;
	private Connection conn = null;
	private Session session = null;
	
	private MessageProducer producer = null;
	
	private void initialize() {
		ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
				user, password, url); 
		try {
			conn = connFac.createConnection();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			dest = session.createQueue(subject);
			producer = session.createProducer(dest);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			conn.start();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void produceMessage(String message) {
		initialize();
		try {
			TextMessage tm = session.createTextMessage(message);
			long startTime = System.currentTimeMillis();
			System.out.println("-----------------Producer-->Sending Message---------------");
			for(int i=0; i<MAX_SEND_TIMES; i++) {
				producer.send(tm);
				if((i+1)%1000 == 0) {
					System.out.println("This is the" + i + " message!");
				}
			}
			System.out.println("-------------------Producer--->Message Sent Complete!----------------");
			long endTime = System.currentTimeMillis();
			long executeTime = endTime - startTime;
			System.out.println("ActiveMQ send" + MAX_SEND_TIMES + " messages used " + executeTime + "ms");
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void close() throws JMSException {
		System.out.println("--------------------Producer--->Closing Connection----------------");
		if(producer != null) {
			producer.close();
		}
		if(session != null) {
			session.close();
		}
		if(conn != null) {
			conn.close();
		}
	}
}

 JMS消息消费者:

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer implements MessageListener {
	
	public static int RECEIVED_MSG_NUM = 0;
	long startReceiveTime = 0;
	long endReceiveTime = 0;
	long receiveDuringTime = 0;
	
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	private String subject = "TOOL:DEFAULT";
	private Destination dest = null;
	private Connection conn = null;
	private Session session = null;
	private MessageConsumer consumer = null;
	
	private void initialize() {
		ActiveMQConnectionFactory connFac = new ActiveMQConnectionFactory(
				user, password, url);
		try {
			conn = connFac.createConnection();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			dest = session.createQueue(subject);
			consumer = session.createConsumer(dest);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void consumeMessage() {
		initialize();
		try {
			conn.start();
			System.out.println("-------------------Consumer--->Starting Listening----------------");
			consumer.setMessageListener(this);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	public void close() throws JMSException {
		System.out.println("------------Consumer--->Closing Connection--------------");
		if(consumer != null) {
			consumer.close();
		}
		if(session != null) {
			session.close();
		}
		if(conn != null) {
			conn.close();
		}
	}

	@Override
	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		try {
			if(message instanceof TextMessage) {
				TextMessage tm = (TextMessage) message;
				String msg = tm.getText();
				if(RECEIVED_MSG_NUM == 0) {
					startReceiveTime = System.currentTimeMillis();
				}
				RECEIVED_MSG_NUM++;
				
				if((RECEIVED_MSG_NUM+1) % 1000 == 0) {
					System.out.println("-----------------Consumer--->Received:" + RECEIVED_MSG_NUM + "--------------");
				}
				
				//Receive the last message
				 if(RECEIVED_MSG_NUM == ActiveMQProducer.MAX_SEND_TIMES - 1) {
					 endReceiveTime = System.currentTimeMillis();
					 receiveDuringTime = endReceiveTime - startReceiveTime;
					 System.out.println("---------------ActiveMQ Receive " + ActiveMQProducer.MAX_SEND_TIMES + 
							 " messages used:" + receiveDuringTime + " ms");
				 } else {
					 System.out.println(System.currentTimeMillis() + "---Consumer--->Received:" + message);
				 }
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}	
	}
}

 

测试代码:

import javax.jms.JMSException;

public class ActiveMQTest {
	
	public static void main(String[] args) throws JMSException, InterruptedException {
		ActiveMQConsumer consumer = new ActiveMQConsumer();
		ActiveMQProducer producer = new ActiveMQProducer();
		char[] tempChars = new char[1024];
		for(int i=0; i<1024; i++) {
			tempChars[i] = 'a';
		}
		
		String tempMsg = String.valueOf(tempChars);
		//开始监听
		consumer.consumeMessage();
		producer.produceMessage(tempMsg);
		producer.close();
		
		//延时5000ms后关闭连接
		Thread.sleep(5000);
		consumer.close();
	}
}

 (说明:以上三个类放在同一个包下)

分享到:
评论

相关推荐

    activeMQ5.2的jar包及使用实例

    activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。

    ActiveMQ 5.2指导手册

    其中,消息是应用程序之间传递的数据单元,客户端可以通过发送消息和接收消息的方式进行通信。 为什么使用ActiveMQ?ActiveMQ的主要优点在于它的高性能、可伸缩性、高可用性以及它的标准化。它支持多种消息协议,如...

    ActiveMQ接受和发送工具.rar

    这个"ActiveMQ接受和发送工具.rar"压缩包包含了用于与ActiveMQ交互的实用工具,方便用户进行消息的接收和发送操作。 在使用ActiveMQ时,了解以下几个关键知识点是至关重要的: 1. **Java Message Service (JMS)**...

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    activemq消息的发送与接受封装的工具类

    activemq消息的发送与接受封装的工具类,只要你导入jar包

    activeMQ发送消息返回消息

    在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。 在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是...

    activeMQ收发工具.rar

    ActiveMQ收发工具的核心功能是通过Java应用程序发送和接收ActiveMQ消息。这个jar包简化了对ActiveMQ服务器的交互过程,使得开发者无需编写复杂的代码就能进行消息传递的测试和调试。通过在命令行中执行`java -jar ...

    activemq消息发送和监听

    项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看消息队列的状态。 在实际应用中,你可能还需要考虑消息的可靠性、顺序性、幂等性以及错误处理等复杂问题。例如,使用事务性...

    activemq 发送,接受,监控样例程序

    activemq 发送,接受,监控样例程序

    ActiveMQ延迟发送

    综上所述,ActiveMQ的延迟发送功能是通过设置消息头的`JMSTimestamp`字段来实现的,而搭建集群则涉及多个Broker节点的配置,包括数据持久化、网络连接、负载均衡和容错机制等方面。在实际项目中,结合`...

    ActiveMQ消息发送接收封装实现及定时测试.

    实现了ActiveMQ的初步封装,比较适合新手入门学习,简单明了

    Springboot整合ActiveMQ,实现消息的发送接收功能源码

    在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便实现实时的消息发送和接收功能。首先,让我们简要了解一下SpringBoot和ActiveMQ。 **SpringBoot简介** SpringBoot是Spring框架的一个...

    activeMQ消息发送过程与原理浅析

    【描述】:本文旨在解析activeMQ消息中间件在发送消息过程中的工作原理,包括同步发送和异步发送两种方式。activeMQ的消息发送涉及到客户端与broker之间的交互,对于消息的可靠性和性能有着直接影响。 【标签】:...

    ActiveMQ发送和接收protobuf协议消息的实例(精心整理,亲测可用)

    在本文中,我们将深入探讨如何使用ActiveMQ发送和接收基于protobuf(Protocol Buffers)协议的消息,同时也会介绍如何进行ActiveMQ的简化封装和配置自动重连机制。 首先,protobuf是Google开发的一种数据序列化协议...

    接受ActiveMQ信息,通过openfire公告发送给指定用户

    总结一下,实现“接受ActiveMQ信息,通过Openfire公告发送给指定用户”的过程主要包括以下步骤: 1. 配置ActiveMQ服务器并创建JMS消费者来接收消息。 2. 解析接收到的XML消息,提取出公告的相关信息。 3. 使用Java...

    spring使用activeMQ实现消息发送

    通过使用`JmsTemplate`类,我们可以方便地发送和接收消息。 1. **配置ActiveMQ**:在开始之前,我们需要在本地或者远程部署一个ActiveMQ服务器。配置文件通常为`activemq.xml`,在这里可以设置broker(消息代理)的...

    ActiveMq+SpringMVC实现邮件异步发送

    ActiveMQ作为一个开源的消息中间件,被广泛用于实现消息队列和发布/订阅模式,它允许应用将非实时任务如邮件发送等操作放到后台处理,从而提升系统的响应速度。在本项目中,ActiveMQ与SpringMVC框架结合,实现了邮件...

    springboot集成activemq实现消息接收demo

    此外,ActiveMQ支持多种协议和特性,如topic、持久化、事务消息等,可以根据项目需求进一步探索和利用。 这个简单的Demo展示了如何在Spring Boot中集成ActiveMQ进行消息接收。通过这种方式,你可以构建出一个可靠的...

Global site tag (gtag.js) - Google Analytics