`
junier
  • 浏览: 75638 次
  • 性别: Icon_minigender_1
  • 来自: 郑州
社区版块
存档分类
最新评论

jms和ActiveMQ的一个小例子

阅读更多
package com.example;


import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TopicPublisher;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

public class My implements MessageListener{

//	队列的连接工厂和连接
	private QueueConnectionFactory queueConnectionFactory;
	private QueueConnection queueConnection;
//	主题的连接工厂和连接
	private TopicConnectionFactory topicConnectinFactory;
	private TopicConnection topicConnection;
//	主题持久订阅的设置
	private String clientId;
	private String durableName;
//	是否为主题的持久订阅
	private boolean isDurable = false;
//	主题会话
	private TopicSession topicSession;
//	主题发布者
	private TopicPublisher  topicPublisher;
//	主题订阅者
	private TopicSubscriber topicSubscriber;
//	发布的主题
	private Topic topicPublish;
//	发布的主题名称
	private String topicPublishName;
//	订阅的主题名称
	private String topicSubscribName;
//	订阅的主题
	private Topic topicSubscrib;
//	是否采用主题模式
	private boolean isTopic = false;
//	队列会话
	private QueueSession queueSession;
//	队列发送者
	private QueueSender queueSender;
//	队列接收者
	private QueueReceiver queueReceiver;
//	发送的队列
	private Queue queueSend;
//  接收的队列
	private Queue queueReceive;
//	发送的队列名称
	private String queueSendName;
//	接收的队列名称
	private String queueReceiveName;
//	是否采用事务
	private boolean isTransacted = false;
//	应答的参数
	private int acknowledgementMode;
//	连接工厂的参数
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
//	是否在发送消息用持久模式
	private boolean isPersistent = false;
//	消息的过期时间
	private long timeToLive;
	
	
	public QueueConnectionFactory getQueueConnectionFactory() {
		if(this.queueConnectionFactory==null)
			this.queueConnectionFactory = new ActiveMQConnectionFactory(this.getUser(),this.getPassword(),this.getUrl());
		return queueConnectionFactory;
	}

	public QueueConnection getQueueConnection() {
		if(this.queueConnection==null)
		{

			try {
				this.queueConnection = this.getQueueConnectionFactory().createQueueConnection();
				this.queueConnection.start();
				
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return queueConnection;
	}

	public TopicConnectionFactory getTopicConnectinFactory() {
		if(this.topicConnectinFactory==null)
		{
			this.topicConnectinFactory = new ActiveMQConnectionFactory(this.getUser(),this.getPassword(),this.getUrl());
		}
		return topicConnectinFactory;
	}

	public TopicConnection getTopicConnection() {
		if(this.topicConnection==null)
		{
			try {
				this.topicConnection = this.getTopicConnectinFactory().createTopicConnection();
				
				if(this.isDurable && this.getClientId()!=null && this.getClientId().length()>0 && !"null".equals(this.getClientId()))
				{
					this.topicConnection.setClientID(this.getClientId());
				}
				this.topicConnection.start();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return topicConnection;
	}

	public String getClientId() {
		return clientId;
	}
	public void setClientId(String clientId) {
		this.clientId = clientId;
	}
	public String getDurableName() {
		return durableName;
	}
	public void setDurableName(String durableName) {
		this.durableName = durableName;
	}
	public boolean isDurable() {
		return isDurable;
	}
	public void setDurable(boolean isDurable) {
		this.isDurable = isDurable;
	}
	public TopicSession getTopicSession() {
		if(this.topicSession==null)
		{
			try {
			this.topicSession = this.getTopicConnection().createTopicSession(this.isTransacted(), this.getAcknowledgementMode());
			} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			}
		}
		return topicSession;
	}

	public TopicPublisher getTopicPublisher() {
		if(this.topicPublisher == null)
		{
			try {
				this.topicPublisher = this.getTopicSession().createPublisher(this.getTopicPublish());
//				是否采用发送方的持久及消息过期的设置
				if(this.isPersistent)
				{
					this.topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
					this.topicPublisher.setTimeToLive(this.getTimeToLive());
				}
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return topicPublisher;
	}

	public TopicSubscriber getTopicSubscriber() {
		if(this.topicSubscriber==null)
		{
			try {
				if(this.isDurable && this.isTopic )
				{
					this.topicSubscriber = this.getTopicSession().createDurableSubscriber(this.getTopicSubscrib(), this.getDurableName());
				}
				else
				{
					this.topicSubscriber = this.getTopicSession().createSubscriber(this.getTopicSubscrib());
					
				}
//				采用监听的方式接收消息,也可以采用主动接收的方式
				this.topicSubscriber.setMessageListener(this);
				this.getTopicConnection().start();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return topicSubscriber;
	}

	public Topic getTopicPublish() {
		this.topicPublish = new ActiveMQTopic(this.getTopicPublishName());
		return topicPublish;
	}

	public String getTopicPublishName() {
		return topicPublishName;
	}
	public void setTopicPublishName(String topicPublishName) {
		this.topicPublishName = topicPublishName;
	}
	public String getTopicSubscribName()
	{
		return topicSubscribName;
	}
	public void setTopicSubscribName(String topicSubscribName)
	{
		this.topicSubscribName = topicSubscribName;
	}
	public Topic getTopicSubscrib() {
		this.topicSubscrib = new ActiveMQTopic(this.getTopicSubscribName());
		return topicSubscrib;
	}

	public boolean isTopic() {
		return isTopic;
	}
	public void setTopic(boolean isTopic) {
		this.isTopic = isTopic;
	}
	public QueueSession getQueueSession() {
		if(this.queueSession==null)
		{
			try {
				this.queueSession = this.getQueueConnection().createQueueSession(this.isDurable(), this.getAcknowledgementMode());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return queueSession;
	}

	public QueueSender getQueueSender() {
		if(this.queueSender==null)
		{
			try {
				this.queueSender = this.getQueueSession().createSender(this.getQueueSend()) ;
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return queueSender;
	}

	public QueueReceiver getQueueReceiver()throws JMSException {
		if(queueReceiver==null)
		{
			this.queueReceiver = this.getQueueSession().createReceiver(this.getQueueReceive());
//			采用监听的方式接收消息,也可以采用主动接收的方式
			this.queueReceiver.setMessageListener(this);
//			this.queueConnection.start();
		}
		return queueReceiver;
	}

	public Queue getQueueSend() {
		this.queueSend = new ActiveMQQueue(this.getQueueSendName());
		return queueSend;
	}

	public 	Queue getQueueReceive()
	{
		this.queueReceive = new ActiveMQQueue(this.getQueueReceiveName());
		return queueReceive;
	}

	public String getQueueSendName() {
		return queueSendName;
	}
	public void setQueueSendName(String queueSendName) {
		this.queueSendName = queueSendName;
	}
	public String getQueueReceiveName() {
		return queueReceiveName;
	}
	public void setQueueReceiveName(String queueReceiveName) {
		this.queueReceiveName = queueReceiveName;
	}
	public boolean isTransacted() {
		return isTransacted;
	}
	public void setTransacted(boolean isTransacted) {
		this.isTransacted = isTransacted;
	}
	public int getAcknowledgementMode() {
		return acknowledgementMode;
	}
	public void setAcknowledgementMode(int acknowledgementMode) {
		this.acknowledgementMode = acknowledgementMode;
	}
	public String getUser() {
		return user;
	}
	public void setUser(String user) {
		this.user = user;
	}
	public String getPassword() {
		return password;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public boolean isPersistent() {
		return isPersistent;
	}
	public void setPersistent(boolean isPersistent) {
		this.isPersistent = isPersistent;
	}
	public long getTimeToLive() {
		return timeToLive;
	}
	public void setTimeToLive(long timeToLive) {
		this.timeToLive = timeToLive;
	}
//	发布主题消息--字符串型的消息,也可以发布其他的消息类型
	public void publishTextMessage(String textMessage) throws JMSException
	{
		TextMessage message = this.getTopicSession().createTextMessage();
		message.clearBody();
		message.setText(textMessage);
		this.getTopicPublisher().publish(message);
		
		if(this.isTransacted)
		{
			this.getTopicSession().commit();
		}
		
	}
//	发送队列消息--字符串型的消息,也可以发送其他的消息类型
	public void sendTextMessage(String textMessage) throws JMSException
	{
		TextMessage message = this.getQueueSession().createTextMessage();
		message.clearBody();
		message.setText(textMessage);
		this.getQueueSender().send(message);
		if(this.isTransacted)
		{
			this.getQueueSession().commit();
		}
	}
//	监听接口的方法,用于接收消息
	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		if(message instanceof  TextMessage)
		{
			TextMessage myMessage = (TextMessage)message;
			try {
				
				System.out.println(myMessage.getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		if(this.isTransacted)
		{
			try {
				if(this.isTopic)
				{
					this.getTopicSession().commit();
				}
				else
				{
					this.getQueueSession().commit();
				}
				
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
//	关闭相应的资源
	public void closeResource() throws JMSException
	{ 
		if(this.isTopic)
		{
			if(this.topicPublisher!=null)
			{
				this.topicPublisher.close();
			}
			if(this.topicSubscriber!=null)
			{
				this.topicSubscriber.close();
			}
			if(this.topicSession!=null)
			{
				this.topicSession.close();
			}
			if(this.topicConnection!=null)
			{
				this.topicConnection.close();
			}
			System.out.println("TopicResource is closed..");
		}
		else
		{
			if(this.queueSender!=null)
			{
				this.queueSender.close();
			}
			if(this.queueReceiver!=null)
			{
				this.queueReceiver.close();
			}
			if(this.queueSession!=null)
			{
				this.queueSession.close();
			}
			if(this.queueConnection!=null)
			{
				this.queueConnection.close();
			}
			System.out.println("QueueResource is closed..");
		}
		
	}
//  测试
	public static void main(String[] args) throws JMSException
	{

//		队列  接收端的测试
//		My receivequeue = new My();
//		receivequeue.setDurable(false);
//		receivequeue.setTopic(false);
//		receivequeue.setTransacted(false);
//		receivequeue.setPersistent(false);
//		receivequeue.setAcknowledgementMode(QueueSession.AUTO_ACKNOWLEDGE);
//		receivequeue.setQueueReceiveName("kkk");
//		receivequeue.getQueueReceiver();
//		synchronized(receivequeue)
//		{
//			try {
//				receivequeue.wait(900);
//			} catch (InterruptedException e) {
//				// TODO Auto-generated catch block
//				e.printStackTrace();
//			}
//		}
//		receivequeue.closeResource();
		

		
//		主题  接收端得测试
		My subscribtopic = new My();
		subscribtopic.setDurable(true);
		subscribtopic.setClientId("opop");
		subscribtopic.setDurableName("ww");
		subscribtopic.setPersistent(false);
		subscribtopic.setTopic(true);
		subscribtopic.setTransacted(false);
		subscribtopic.setAcknowledgementMode(TopicSession.AUTO_ACKNOWLEDGE);
		subscribtopic.setTopicSubscribName("eee");
		subscribtopic.getTopicSubscriber();
	    synchronized(subscribtopic)
	    {
	    	try {
				subscribtopic.wait(900);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
	    }
	    subscribtopic.closeResource();
		
		
	}
	
	
}//end class


以上是队列和主题的编码,下面是启动ActiveMQ的类
package com.by.ptop;

import org.apache.activemq.broker.BrokerService;

public class CreateServer {

	public void startServer()
	{
		BrokerService broker = new BrokerService();
//		配置jms服务器
		try {
			broker.addConnector("tcp://localhost:61616");
			broker.start();
			System.out.println("jms服务器已经启动.");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	public static void main(String[] args)
	{
		CreateServer cs = new CreateServer();
		cs.startServer();
	}
}


下面是测试用的:
package com.example;

import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TopicSession;

import com.by.ptop.CreateServer;

public class MyTest {

	
	public static void main(String[] args) throws JMSException {
//		启动jms服务器
		CreateServer cs = new CreateServer();
		cs.startServer();

		for(int i = 1;i<=9;i++)
		{
//			队列  发送端得编程
//			My sendqueue = new My();
//			sendqueue.setDurable(false);
//			sendqueue.setPersistent(false);
//			sendqueue.setTransacted(false);
//			sendqueue.setTopic(false);
//			sendqueue.setAcknowledgementMode(QueueSession.AUTO_ACKNOWLEDGE);
//			sendqueue.setQueueSendName("kkk");
//			sendqueue.sendTextMessage("zz"+i);
//			sendqueue.closeResource();
			
//			主题 接收端得编程
			My publishtopic = new My();
			publishtopic.setDurable(true);
			publishtopic.setClientId("opop");
			publishtopic.setDurableName("ww");
			publishtopic.setPersistent(false);
			publishtopic.setTransacted(false);
			publishtopic.setTopic(true);
			publishtopic.setAcknowledgementMode(TopicSession.AUTO_ACKNOWLEDGE);
			publishtopic.setTopicPublishName("eee");
			publishtopic.publishTextMessage("zz"+i);
			publishtopic.closeResource();
		}
		
		

	}

}


以上是本人初学jms时写的,如有不足之处或者您有更好的代码,还望各位不吝赐教。小弟在这里先谢谢大家了。
分享到:
评论

相关推荐

    jms+activeMq+spring学习简单例子

    ActiveMQ是Apache软件基金会的一个开源项目,它实现了JMS规范,提供了一个高效、可靠的消息中间件。Spring框架是Java开发中的一个核心工具集,特别是在企业级应用开发中,它提供了简化事务管理、依赖注入和AOP(面向...

    jms之activeMQ 队列和广播模式例子(主要给初学者提供入门知识)

    这篇博客"jms之activeMQ 队列和广播模式例子"主要面向初学者,旨在提供ActiveMQ入门级的知识,通过实例解释队列(Queue)和主题(Topic)这两种基本的消息模式。 首先,我们要理解JMS中的队列和主题的区别。队列...

    JMS之ActiveMQ工具类+使用例子.zip

    从给出的压缩包文件名称列表来看,这是一个包含构建脚本(mvnw和mvnw.cmd)、项目配置文件(pom.xml)、忽略文件(.gitignore)、IDE配置文件(activemq.iml、code.iml)以及帮助文档(HELP.md)的简单Java项目。...

    jms+sping+activeMq的例子serd和recevice

    总结来说,这个案例将教授初学者如何利用Spring框架集成ActiveMQ,创建一个简单的JMS应用,实现消息的发送和接收。通过学习这个案例,开发者可以更好地理解和掌握异步通信的原理以及Spring对JMS的支持。

    activeMQ JMS WEB 例子

    在Web环境中集成ActiveMQ,我们需要一个支持JMS的Web应用程序服务器,如Tomcat或Jetty。首先,要在Web应用中使用ActiveMQ,你需要将ActiveMQ的JAR文件添加到应用的类路径中。这些JAR文件通常可以在ActiveMQ的lib目录...

    JMS.rar_activemq_jms_jms activemq

    在这个例子中,我们连接到运行在本地的ActiveMQ服务器(默认端口61616),创建一个非事务性的会话,然后创建一个消息队列`myQueue`作为目的地。接着,我们创建一个`MessageProducer`来发送`TextMessage`,最后关闭...

    SpringMVC+JMS(ActiveMQ)整合的Demo

    这个Demo的价值在于它为开发者提供了一个实际操作的例子,帮助他们了解如何在实际项目中运用SpringMVC、JMS和ActiveMQ进行通信。开发者可以通过运行和调试这个Demo,学习如何在自己的应用中实现类似的功能,提升系统...

    JMS-ActiveMQ入门实例

    **JMS(Java Message Service)** 是一个Java平台上的标准接口,它定义了一种统一的API,使得应用程序可以与...通过实践这些例子,我们可以更好地掌握JMS和ActiveMQ的使用,为构建可扩展和高可用的分布式系统奠定基础。

    jms-test.zip_jms activemq_jms test

    描述中提到,“jms测试程序,将tomcat和activeMq整合在一起做的一个发送接受的发布订阅的例子”,这表明项目是基于Tomcat服务器,并且通过ActiveMQ实现了一个发布/订阅模式的消息传递。Tomcat是一个流行的Java应用...

    JMS+activeMQ消息中间件

    综上所述,这个例子提供了一个全面的实践,涵盖了Spring MVC、JMS和ActiveMQ的集成,帮助开发者理解如何在实际项目中使用消息中间件实现异步处理和解耦。通过学习这个示例,你可以了解到如何在Spring MVC环境中配置...

    SpringBoot使用JMS的小例子(ActiveMQ实现)

    SpringBoot简化了配置和应用开发,而ActiveMQ是Apache出品的一个开源、高效的JMS提供者,常用于实现消息队列和消息中间件。 首先,让我们了解JMS的基本概念。JMS是一个为分布式环境设计的API,它允许应用程序创建、...

    一个activeMQ的简单例子

    这个简单的ActiveMQ例子可能是为了演示如何设置和使用基本的生产者和消费者,以及如何通过消息队列实现异步通信。在实际应用中,我们还可以利用ActiveMQ的高级特性,如持久化、优先级、消息筛选等,以满足更复杂的...

    apache__activemq_jms 的例子(带jar包)

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息...ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    activemq的小例子

    总之,这个“ActiveMQ的小例子”旨在引导初学者进入消息中间件的世界,提供一个简单的起点来探索和实践ActiveMQ的使用,从而提升他们对分布式系统中消息传递的理解和应用能力。通过博客链接和提供的源码,你可以深入...

    Apache ActiveMQ 入门最简单例子

    在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单的Master环境。 首先,我们要了解消息队列(Message Queue)的基本概念。消息队列是一种异步通信机制,它允许应用程序之间...

    springMVC+activeMQ例子

    2. **配置ActiveMQ**:创建一个activemq.xml配置文件,设置ActiveMQ服务器的连接信息,如brokerURL、username和password。 3. **配置Spring**:在Spring的配置文件(如applicationContext.xml)中,配置JMS监听容器...

    activemq基于web的例子

    activemq基于web的例子

    ActiveMQ学习 完整例子

    - **消息中间件**:ActiveMQ作为一个消息中间件,负责在分布式系统中传递消息,解耦生产者和消费者。 - **JMS(Java Message Service)**:JMS是Java平台上的标准,定义了消息生产和消费的接口,ActiveMQ是实现JMS...

    ActiveMQ的简单例子

    本教程将通过一个简单的例子介绍ActiveMQ的两个核心模式:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。我们将使用IntelliJ IDEA作为集成开发环境来实现这些示例。 首先,我们需要...

Global site tag (gtag.js) - Google Analytics