`
longgangbai
  • 浏览: 7331463 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

JMS 异步消息的开发(一)

阅读更多

    JMS的简单应用:

点对点模型(Point-to-Point)

          点对点消息传送模型允许JMS客户端通过队列(queue)这个虚拟通道来同步和异步发送、接收消息。在点对点模型中,消息生产者称为发送者(Sender),而消息消费者则称为接收者(receiver)。传统上,点对点模型是一个基于拉取(Pull)或基于轮询(polling)的消息传送模型,这种模型从队列中请求消息,而不是自动地将消息推送到客户端。点对点消息传送模型的一个突出特点就是:发送到队列的消息被一个而且仅仅一个接收者所接收,即使可能有多个接收者在一个队列中侦听同一消息时,也是如此。

          点对点消息传送模型既支持异步“即发即弃(fire and forget)”消息传送方式,又支持同步请求/应答消息传送方式。点对点消息传送模型比发布/订阅模型具有更强的耦合性,发送者通常会知道消息将被如何使用,而且也会知道谁将接收该消息。举例来说,发送者可能会向一个队列发送一个证券交易订单并等待响应,响应中应包含一个交易确认码。这样一来,消息发送者就会知道消息接收者将要处理交易订单。另一个例子就是一个生成长时间运行报告的异步请求。发送者发出报告请求,而当该报告准备就绪时,就会给发送者发送一条通知消息。在这种情况下,发送者就会知道消息接收者将要处理该消息并创建报告。

          点对点模型支持负载均衡,它允许多个接收者侦听同一队列,并以此来分配负载。如图1-4所示,JMS提供者负责管理队列,确保每条消息被组内下一个可用的接收者消费一次,而且仅仅一次。JMS规范没有规定在多个接收者中间分发消息的规则,尽管某些JMS厂商已经选择实现此规则来提升负载均衡能力。点对点模型还具有其他优点,比如说,队列浏览器允许客户端在消费其消息之前查看队列内容——在发布/订阅模型中,并没有这种浏览器的概念。

在JBoss下面JMS的简单开发点对点的JMS应用:

 

部署关于JMS的配置过程如下:

备注:

              JMS配置文件必须在JBoss的在%JBoss_HOME%\server\default\deploy中,同时文件的命名必须以xxx-service.xml中。

 

jboss-jms-service.xml配置信息如下:

 

<?xml version="1.0" encoding="UTF-8"?>
<!-- $Id: mail-service.xml 62350 2007-04-15 16:50:12Z dimitris@jboss.org $ -->
<server>

  <!-- ==================================================================== -->
  <!-- Mail Connection Factory                                              -->
  <!-- ==================================================================== -->

  <mbean code="org.jboss.mq.server.jmx.Queue"
         name="jboss.mq.destination:service=Queue,name=longgangbai">
    <attribute name="JNDIName">queue/longgangbai</attribute>
    <depends optional-attribute-name="DestinationManager" >jboss.mq:service=DestinationManager</depends>
  </mbean>

</server>

 

在客户端:

备注下面为:JMS1.1 TopicConnectionFactory 和QueueConnectionFactory合并为ConnectionFactory

package com.easyway.jboss.jms.ptp.service;
import java.util.Properties;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
 * 队列消息的发送者
 * 采用JMS点对点,一对一的(PTP消息传递模型)
 * @author longgangbai
 *
 */
public class QueueSender {
	
	/**
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		QueueConnection conn=null;
		QueueSession session=null;
		try {
			//得到一个JNDI初始化上下文
			Properties props=new Properties();
			//设置
			props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
			
			props.setProperty(Context.PROVIDER_URL, "localhost:1099");
			
			props.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
			
			InitialContext ctx=new InitialContext(props);
			//根据上下文查找一个连接工厂TopicConnectionFactory/QueueConnectionFactory  (有了两种连接工厂,
			//根绝topic、queue来是哟偶那个相应的类型),该连接工厂是有JMS提供的,不需要我们自己创建,每一个
			//厂商为他绑定一个全局的JNDI,我们功过JNDI便可获取它。
			QueueConnectionFactory factory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");
			//从连接工厂 中得到一个连接(Connect 的类型有两种,TopicConnection、QueueConnection)
			conn=factory.createQueueConnection();
			//通过连接来建立一个会话(Session)
			//建立一个不需要事物的并且能自动确认消息已接收的会话
			session=conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
			//查找目的地(目的地的类型有两种:Topic、Queue)
			Destination destination =(Queue)ctx.lookup("queue/longgangbai");
			//根绝会话以及目的地建立消息生产者MessageProducter(QueueSender 和TopicPublisher都扩展自MessageProducer接口)
			MessageProducer producer=session.createProducer(destination);
			
			TextMessage msg=session.createTextMessage("hi ,longgangbai ,this is jboss jms MDB message");
			//发送文本消息
			producer.send(msg);
			//发送对象消息
			producer.send(session.createObjectMessage(new SMS("wangnabaobao","this is my girl friend")));
			
			
			MapMessage mapmsg=session.createMapMessage();
			mapmsg.setObject("name", "baobao");
			producer.send(mapmsg);
			
			BytesMessage bmsg=session.createBytesMessage();
			bmsg.writeBytes("I am a good boy !".getBytes());
			
			producer.send(bmsg);
			
			
			StreamMessage smsg=session.createStreamMessage();
			smsg.writeString("阿里巴巴,http://www.alibaba.com");
			producer.send(smsg);
			
			
			
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			
			try {
				session.close();
				conn.close();
			} catch (JMSException e2) {
				e2.printStackTrace();
			}
		}
	}

}

 

运行JBoss在:

在jboss.mq.destination中的选择相关的消息服务信息:

 

 

选择listMesages:

 

 

查询

 

 

 接受消息的代码如下:

package com.easyway.jboss.jms.ptp.service;

import java.io.ByteArrayOutputStream;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

/**
 * Queue消息的接受方的MDB
 * @author longgangbai
 * 
 * 
 * 在JBoss 的%JAVA_HOME%\server\default\deploy目录下添加相关配置
 * @author longgangbai
 */
@MessageDriven(activationConfig={
		@ActivationConfigProperty(propertyName = "destinationType",  //消息的目标的类型: 取值可以为javax.jms.Queue或者javax.jms.Topic
				propertyValue = "javax.jms.Queue"),
	    @ActivationConfigProperty(propertyName = "destination",  //消息的目标的地址,取值是目标地址的JDNI名称:
						propertyValue = "queue/longgangbai"),
	    @ActivationConfigProperty(propertyName = "acknowledgeMode", //消息的确认模式,却只可以为:Auto-acknowledge或者Dups_ok_acknowledge
	    		//消息确认:是指JMS客户端通知JMS provider确认消息已经到达的一种机制,在EJB中,收到消息后发送确认是EJB容器的职责,
	    		//确认消息就是JMS Provider ,EJB容器已经收到并处理了消息,如没有确认,JMS provider 就不知道容器是否收到了,消息,进而重发消息
									propertyValue = "Auto-acknowledgeMode")
})
public class DisplayMessage implements MessageListener{

	@Override
	public void onMessage(Message msg) {
		try {
			if(msg instanceof TextMessage){
				TextMessage tmsg=(TextMessage)msg;
				String content=tmsg.getText();
				System.out.println(content);
			}else if(msg instanceof ObjectMessage){
				ObjectMessage tmsg=(ObjectMessage)msg;
				SMS sms=(SMS)tmsg.getObject();
				String content=sms.getUsername()+":"+sms.getMessage();
				System.out.println(content);
			}else if(msg instanceof MapMessage){
				MapMessage mmsg=(MapMessage)msg;
				String content=mmsg.getString("name");
				System.out.println(content);
			}else if(msg instanceof BytesMessage){
				BytesMessage bytesmsg=(BytesMessage)msg;
				ByteArrayOutputStream byteStream=new ByteArrayOutputStream();
				byte[] buffer=new byte[256];
				int length=0;
				while((length=bytesmsg.readBytes(buffer))!=-1){
					byteStream.write(buffer,0,length);
				}
				String context=new String(byteStream.toByteArray());
				System.out.println(context);
			}else if(msg instanceof ObjectMessage){
				ObjectMessage tmsg=(ObjectMessage)msg;
				SMS sms=(SMS)tmsg.getObject();
				String content=sms.getUsername()+":"+sms.getMessage();
				System.out.println(content);
			}else if(msg instanceof StreamMessage){
				StreamMessage tmsg=(StreamMessage)msg;
				String content=tmsg.readString();
				System.out.println(content);
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

package com.easyway.jboss.jms.ptp.service;
import java.util.Properties;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
 * 队列消息的接受者
 * 采用JMS点对点的模式
 * @author longgangbai
 *
 */
public class QueueReceive {
		/**
		 * 
		 * @param args
		 */
		public static void main(String[] args) {
			QueueConnection conn=null;
			QueueSession session=null;
			try {
				//得到一个JNDI初始化上下文
				Properties props=new Properties();
				//设置
				props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
				
				props.setProperty(Context.PROVIDER_URL, "localhost:1099");
				
				props.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
				
				InitialContext ctx=new InitialContext(props);
				//根据上下文查找一个连接工厂TopicConnectionFactory/QueueConnectionFactory  (有了两种连接工厂,
				//根绝topic、queue来是哟偶那个相应的类型),该连接工厂是有JMS提供的,不需要我们自己创建,每一个
				//厂商为他绑定一个全局的JNDI,我们功过JNDI便可获取它。
				QueueConnectionFactory factory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");
				//从连接工厂 中得到一个连接(Connect 的类型有两种,TopicConnection、QueueConnection)
				conn=factory.createQueueConnection();
				//通过连接来建立一个会话(Session)
				//建立一个不需要事物的并且能自动确认消息已接收的会话
				session=conn.createQueueSession(false,TopicSession.AUTO_ACKNOWLEDGE);
				//查找目的地(目的地的类型有两种:Topic、Queue)
				Destination destination =(Queue)ctx.lookup("queue/longgangbai");
				//根绝会话以及目的地建立消息生产者MessageProducter(QueueSender 和TopicPublisher都扩展自MessageProducer接口)
				MessageConsumer consumer=session.createConsumer(destination);
				
				DisplayMessage ml = new DisplayMessage();   
				consumer.setMessageListener(ml);  
				System.out.println("开始处理休息..");
				conn.start();
				System.out.println("消息处理完毕...");
				
			} catch (Exception e) {
				e.printStackTrace();
			}finally{
				
				try {
					session.close();
					conn.close();
				} catch (JMSException e2) {
					e2.printStackTrace();
				}
			}
		}

	}

 

分享到:
评论

相关推荐

    tomcat spring jms 异步消息传递入门实例

    本教程将带你逐步了解如何利用Tomcat、Spring和JMS(Java Message Service)构建一个简单的异步消息传递入门实例。 首先,让我们来理解一下核心组件: 1. **Tomcat**:这是一个流行的开源Java Servlet容器,用于...

    JMS异步通信

    Java消息服务(JMS)是Java平台上用于异步通信的标准API。它允许应用程序通过消息传递在分布式系统中进行通信,从而实现解耦和提高...通过截图和教程,WSAD提供了一个全面的平台,帮助开发者理解和实现JMS异步通信。

    spring jms tomcat 异步消息传递入门实例

    在IT行业中,Spring框架是Java应用开发的基石,它提供了丰富的功能来简化应用程序的构建,而JMS(Java Message Service)则是处理异步通信的一种标准API。Tomcat作为轻量级的应用服务器,常用于部署Spring应用。在这...

    Spring JMS使异步消息变得简单.doc

    【Spring JMS】是Spring框架中的一个模块,用于简化Java消息服务(JMS)的使用,使得异步消息处理变得更加简单和灵活。Spring JMS通过提供一个模板类,抽象了JMS API的底层细节,让开发者能够更加专注于消息的处理...

    使用Spring JMS轻松实现异步消息传递.docx

    在 IT 领域,特别是在 Web 应用开发中,异步消息传递是一种提高系统效率和响应性的重要技术。传统的同步处理方式可能会导致用户等待时间较长,尤其是在处理复杂的业务逻辑或者需要执行耗时操作时。Spring 框架提供了...

    基于EASERVER的异步消息,已测试通过

    总的来说,"基于EASERVER的异步消息,已测试通过"这个主题涵盖了企业级应用开发中的一个重要概念,即异步通信,以及如何在EASERVER环境中利用PB11.5和C#实现这一机制。这样的实现有助于提高系统性能,处理高并发请求...

    J2EE的异步消息机制

    总的来说,J2EE的异步消息机制通过JMS提供了强大的工具,允许企业应用在高并发、高负载的环境中实现高效、可靠的通信,增强了系统的灵活性和稳定性。开发者利用这些机制可以构建出能够应对复杂业务场景的企业级解决...

    JMS两个开发包

    Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的标准API。它允许应用程序创建、发送、接收和读取消息。在Java应用环境中,JMS扮演着至关重要的角色,尤其在分布式系统和企业...

    JMS 开发简明教程

    Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准API。它提供了一种可靠的消息传递机制,使得应用程序可以在分布式环境中发送、接收和管理消息。JMS允许应用程序在不...

    JMS消息发送及订阅

    Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的标准接口。它提供了一种可靠的消息传递机制,使得应用程序可以在分布式环境中交换信息。在这个主题中,我们将深入探讨JMS消息...

    浅析WebLogic服务器上异步消息的接收

    消息驱动Bean(Message-Driven Bean,简称MDB)是EJB 2.0规范中引入的一种无状态会话Bean,专门用于处理基于消息的异步请求。 在WebLogic服务器上接收异步消息,主要依赖于MDB以及与之相关的技术和机制。以下将从几...

    JMS消息模型 JMS学习.doc

    Java 消息服务(JMS,Java Message Service)是一种用于在分布式系统中进行异步数据交换的API,它为应用程序提供了标准的接口来发送和接收消息。JMS规范由JavaSoft(现为Oracle公司的一部分)制定,目的是促进不同...

    结合Spring2.0和ActiveMQ进行异步消息调用

    在IT行业中,异步消息处理是一种常见的优化系统性能和可扩展性的技术。Spring框架和ActiveMQ的结合使用,为开发者提供了强大的异步消息传递能力。本文将深入探讨如何结合Spring 2.0与ActiveMQ来实现异步消息调用,并...

    C#JMS开发

    虽然JMS是为Java设计的,但通过适配器或开源库,C#开发者也能利用其功能,实现分布式系统中的异步通信和消息传递。 **一、JMS简介** JMS是一种API,用于在不同的应用程序之间发送和接收消息。它提供了标准化的接口...

    Spring+weblogic接收JMS消息

    在IT行业中,Spring框架是Java应用开发中的一个关键组件,尤其在企业级应用中,它提供了丰富的功能,如依赖注入、AOP(面向切面编程)以及与各种服务的集成,包括消息传递系统。WebLogic Server是Oracle公司的一款...

    weblogic与jms+spring

    JMS 是一个为Java平台设计的消息中间件接口,它允许应用程序通过消息传递进行通信,而Spring框架则提供了一种便捷的方式来管理应用程序的配置和服务,包括对JMS的集成。 **WebLogic的安装与配置:** 1. 下载...

    消息中间件和JMS消息服务.pdf

    ### 消息中间件与JMS消息服务详解 #### 一、引言 随着分布式系统的规模和复杂度不断增加,传统的远程过程调用(RPC)中间件技术...对于开发人员来说,理解和掌握JMS接口和消息模型是构建高效、可靠的企业级应用的基础。

    Spring+weblogic9.2发送JMS消息

    在IT行业中,Spring框架是Java应用开发中的一个关键组件,它提供了一整套服务和工具,使得开发者可以更高效地构建可维护、可扩展的应用。WebLogic Server是Oracle公司的一款企业级应用服务器,广泛用于部署和管理...

    jms消息通讯

    在IT行业中,JMS(Java Message Service)是Java平台上的一个标准接口,它定义了用于在分布式环境中交换异步消息的API。JMS允许应用程序创建、发送、接收和读取消息,有效地支持企业级应用之间的通信。在J2EE(Java ...

Global site tag (gtag.js) - Google Analytics