`
jjjssh
  • 浏览: 78011 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论
阅读更多

最近学了下 JMS Apache mq,写了个简单的例子。

同一管道,多个生产者,多个消费,下面是代码,写得不好,求喷,求指点

生产者:

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MyProducer implements Runnable {
	private static final int SEND_NUMBER = 5;
	Connection conn = null;
	Session session;
	Destination destination;
	MessageProducer msgProducer;
	
	String name;
	
	MyProducer(ConnectionFactory connFactory,Destination destination,String name){
		//从构造工厂获得链接
		try {
			this.conn = connFactory.createConnection();
		} catch (JMSException e) {
			e.printStackTrace();
		}
			
		this.destination = destination;
		this.name = name;
	}
	public void run() {
		try {
			//启动链接
			conn.start();
			session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			//消息生产者
			msgProducer = session.createProducer(destination);
			//设置不持久化
			msgProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			//构造并发送消息
			sendMsg(session,msgProducer);
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}finally{
			//消息发送完,记得把连接断开
			if(null !=this.conn){
				try{
					this.conn.close();
				}catch(JMSException ej){
					ej.printStackTrace();
				}
			}
		}
	}

	public void sendMsg(Session session,MessageProducer msgProducer)throws JMSException{
		for(int i=0;i<SEND_NUMBER;i++){
			TextMessage txtMsg = session.createTextMessage("hello,I'm "+name+",comming at "+i);
			System.out.println(txtMsg.getText());
			msgProducer.send(txtMsg);
		}
	}
}

 消费者:

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MyConsumer implements ExceptionListener, Runnable {
	Connection conn = null;
	Session session;
	Destination destination;
	MessageConsumer consumer;
	String name;
	
	MyConsumer(ConnectionFactory connFactory,Destination destination,String name){
		//从构造工厂获得链接
		try {
			this.conn = connFactory.createConnection();
		} catch (JMSException e) {
			e.printStackTrace();
		}
			
		this.destination = destination;
		this.name = name;
	}
	
	public void onException(JMSException e) {
		e.printStackTrace();
		System.out.println("JMS Exception occured.  Shutting down client.");
	}

	public void run() {
		try {
			conn.start();
			session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			consumer = session.createConsumer(destination);
			
			while(true){
				TextMessage message = (TextMessage)consumer.receive(1000);
				if(null !=message){
					System.out.println(name+"收到消息:"+message.getText());
					try {
						Thread.sleep(5000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}else{
					break;
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}finally{
			try{
				if(null !=conn){
					conn.close();
				}
			}catch(Exception ee){
				ee.printStackTrace();
			}
		}
	}

}

 工厂:

/*
 * MQ工厂
 * 维护连接
 * 生产 producer(一个线程)
 * 生产consumer(一个线程)
 * */
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MQFactory {

	private static ConnectionFactory connFactory;
	private static Connection conn = null;
	private static Session session;
	private static Destination destination;
	
	static{
		//构造ConnetionFactory实例对象
		connFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,
				"tcp://localhost:61616");
		
		try {
			conn = connFactory.createConnection();
			session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("test-queue");
		} catch (JMSException e) {
			e.printStackTrace();
		}finally{
			if(null !=conn){
				try{
					conn.close();
				}catch(JMSException ej){
					ej.printStackTrace();
				}
			}
		}
	}
	
	public static MyProducer getProducer(String name){
		MyProducer producer = new MyProducer(connFactory,destination,name);
		return producer;
	}
	
	public static MyConsumer getConsumer(String name){
		MyConsumer consumer = new MyConsumer(connFactory,destination,name);
		return consumer;
	}
}

 main主程序:

import mq.MQFactory;

public class MQHelloWorld {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws InterruptedException{
		//thread(new Producer(),false);
		thread(MQFactory.getProducer("jasion"),false);
		Thread.sleep(2000);
		thread(MQFactory.getProducer("tom"),false);
		//thread(new Consumer(),false);
		thread(MQFactory.getConsumer("laopodaren"),false);
		thread(MQFactory.getConsumer("guke"),false);
	}
	
	public static void thread(Runnable runnable,boolean daemon){
		Thread brokerThread = new Thread(runnable);
		brokerThread.setDaemon(daemon);//设置成守护线程--不用关注他本身线程的退出问题,守护线程会在所有用户线程退出时,自动退出
		brokerThread.start();
	}

}

 启动mq(这个要自己下载)

运行main程序

测试结果:

hello,I'm jasion,comming at 0
hello,I'm jasion,comming at 1
hello,I'm jasion,comming at 2
hello,I'm jasion,comming at 3
hello,I'm jasion,comming at 4
hello,I'm tom,comming at 0
hello,I'm tom,comming at 1
hello,I'm tom,comming at 2
hello,I'm tom,comming at 3
hello,I'm tom,comming at 4
guke收到消息:hello,I'm jasion,comming at 0
laopodaren收到消息:hello,I'm jasion,comming at 1
laopodaren收到消息:hello,I'm jasion,comming at 3
guke收到消息:hello,I'm jasion,comming at 2
laopodaren收到消息:hello,I'm tom,comming at 0
guke收到消息:hello,I'm jasion,comming at 4
laopodaren收到消息:hello,I'm tom,comming at 2
guke收到消息:hello,I'm tom,comming at 1
laopodaren收到消息:hello,I'm tom,comming at 4
guke收到消息:hello,I'm tom,comming at 3

 

附件是要用到的jar包

  • jms.jar (31.6 KB)
  • 下载次数: 1
1
0
分享到:
评论

相关推荐

    JMS学习笔记精心总结

    **JMS学习笔记精心总结** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准接口。它允许应用程序创建、发送、接收和读取消息,使得应用程序能够在不直接连接的情况下...

    JMS学习笔记(一)——JMS简介安装ActiveMQ

    **JMS学习笔记(一)——JMS简介与ActiveMQ安装** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的标准接口。它为应用程序提供了一种标准的方式,用来创建、发送、接收和读取...

    JMS详细实例学习教程

    JMS 详细实例学习教程 JMS(Java Message Service)是一种基于Java平台的消息服务规范,用于在分布式系统中异步通信。JMS提供了一种灵活、可靠、可扩展的消息传递机制,使得应用程序之间可以相互通信和交换数据。 ...

    JMS 学习资源

    ### JMS 学习资源详解 #### 一、JMS 基本概念与通信模型 JMS(Java Message Service)是一种广泛应用于企业级应用程序之间的消息传递标准,它定义了一系列接口,用于创建、发送、接收消息。JMS 的设计目标是简化...

    JMS学习资料,适合初始人员学习使用

    在JMS学习的过程中,了解和掌握以下几个关键概念至关重要: 1. **消息(Message)**:是数据的载体,它封装了要传输的信息,可以是文本、二进制数据或对象。JMS定义了几种不同类型的Message,如TextMessage、...

    JMS学习资料(word文档)

    Java消息服务(Java Message Service,简称JMS)是一种在分布式环境中进行可靠消息传递的标准API,由Sun Microsystems与消息导向中间件(MOM)供应商共同制定。JMS的主要目标是为了解决应用程序之间的异步通信问题,...

    JMS学习教程概述

    **JMS学习教程概述** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准API。它允许应用程序创建、发送、接收和读取消息,为分布式系统提供了可靠的、跨平台的通信...

    由浅入深学习、掌握JMS

    此外,它还会讨论JMS的不同消息模型,如点对点(Queue)和发布/订阅(Topic)模式,以及它们在实际应用场景中的选择和优劣。 接着,**《深入掌握JMS》** 进一步探讨了JMS的高级特性,如消息选择器、消息组、消息...

    JMS学习笔记

    **JMS学习笔记** Java消息服务(Java Message Service,简称JMS)是Java平台中用于在分布式环境中交换异步消息的标准API。它提供了一种可靠的消息传递机制,使得应用程序可以在不互相依赖的情况下进行通信,从而...

    JMS demo 及 资料

    2. **消息队列(Message Queue)与主题(Topic)**: JMS提供了两种消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe)。在P2P模型中,消息被发送到一个消息队列,每个消息只被一个消费者接收...

    jms.rar_jar j_java jms_jms_jms jar_jms.j

    描述中的"java消息系统 JMS 学习代码 例子 jar"表明这个压缩包是为了学习JMS而准备的,里面包含了示例代码,便于开发者理解和实践JMS的工作原理。`jar`文件通常用于打包和分发Java类库,这里可能是为了方便导入和...

    JMS 简单使用指南

    - **OpenJMS**:这是一个开源的JMS实现,遵循JMS 1.0.2规范,适合用于学习和研究JMS。 - **iLinkMQ**:由中国人开发的纯Java实现,完全支持JMS接口规范1.0.2,提供事务和可靠消息传输等功能,适用于企业级应用。 ...

    ActiveMQ学习笔记(二) JMS与Spring

    在本篇ActiveMQ学习笔记中,我们将探讨JMS(Java Message Service)与Spring框架的集成。JMS是一种标准API,用于在分布式环境中进行异步消息传递,而Spring框架则为开发人员提供了强大的依赖注入和管理服务的能力。...

    JMS--J2EE培训材料

    通过本篇文章的学习,我们不仅了解了JMS的基本结构和开发实例,还深入了解了其高级接口、编程模型以及消息读取方式等内容。掌握JMS的核心概念和技术可以帮助开发者更好地构建复杂的企业级应用,提高系统的灵活性和可...

    JBOSS建立JMS应用实例

    3. JMS实体:主要包括Message(消息)、MessageProducer(消息生产者)、MessageConsumer(消息消费者)、Queue(队列)和Topic(主题)。 二、JBOSST中的JMS配置 1. 安装与启动:首先,需要下载并安装JBOSST,然后...

    JMS规范培训教程

    **JMS规范培训教程** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准API。...学习JMS规范及其实现,如SUN MQ,对于从事企业级Java开发的人员至关重要。

    JMS 开发简明教程

    Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准API。它提供了一种可靠的消息传递机制,使得应用...学习JMS不仅可以提升你的专业技能,也有助于解决复杂的企业级问题。

    jms学习笔记jms学习笔记

    本文主要探讨的是消息中间件(Message-Oriented Middleware,简称MOM),特别是在Java消息服务(Java Message Service,JMS)的学习笔记。 JMS是Java平台上的一个标准API,用于在分布式环境中进行异步消息传递。它...

Global site tag (gtag.js) - Google Analytics