1、介绍
本章介绍在Jboss中使用Java Messageing Service (JMS). 不是JMS指南,而是JBoss和JMS如何一起使用,如果你想看JMS的介绍,请参考 JMS Specification 或 JMS turorial.
最近随着JBoss版本不断更新,对JMS支持越来越成熟,也造成JBoss各个版本之间的不同。在这里我主要介绍JBoss3.0.X版本。
本章给出的例子比较简单,并指出了如何使用JMS的原理。所有的例子可以通过ant build file 来建立。
为了能建立和运行此例子。我们使用两种方式来进行:一是使用Ant命令,二是使用JAR和JAVA基本命令。必须有下面的环境变量:
JAVA_HOME 安装JDK1.4的目录。
JBOSS_DIST 安装JBoss的目录。
你必须安装JDK, 如果使用Ant必须安装 Ant。可以参考我前面文档的介绍。
2、概述
1) 什么是JMS
JMS是Java API, 允许应用程序来建立、接收和读取消息。程序依靠这些API, 在运行时需要一个JMS实现接口,来提供管理和控制,这被称为JMS provider, 现在有几种不同的JMS Provider; 在JBoss中的叫做JbossMQ。
2) JMS 和J2EE
JMS是在EJB和J2EE框架开发之前进行开发的,所以在JMS说明书中没有涉及到EJB或J2EE。
EJB 和J2EE第一代版本中也没有涉及到JMS,一直到EJB1.1,在生成一个可用Beand的容器provider中JMS也不是必须的API。在J2EE1.2中JMS接口是必需的情况,但并不是非得要包含一个JMS Provider;在EJB2.0和J2EE1.3中又进行改变,应用服务器包含了一个JMS Provider,自从J2EE1。3需要EJB2.0,增加了以下两个JMS特性:
w 一种新Bean类型定义, 也就是消息驱动Beam (MDB), 这种bean做为JMS消息监听者,可以异步地处理JMS消息。
w JMS处理作为资源,来自一个Bean 的JMD 发布(发送)必须能和其他bean的全局事务环境共享。这个需要把JMS认为是一个容器管理资源,象JDBC的连接。
3) JMS 和JBoss
JBoss从2.0版本以后都支持JMS。 在2.1中增加了MDB,从2.4版本开始JMS作为一个事务资源。
JBoss中JMS的体系结构如下:JMS Provider, 叫做JbossMQ 。 是JBoss实现JMS 1.0.2规范的一部分,包括可选部分,象ASF(Application Service Facvility)。 JbossMQ处理和普遍JMS一样:建立 queues (队列)或topic(标题),持久性等。 MDB (Message Driven Beans), 资源适配器。
3、JMS Provider
JBoss有它自己的JMS Provider 叫做JbossMQ。 适合与JMS 1.0.2 的JMS Provider,并且能用在所有平常的JMS程序中。为了清楚在JBoss中JMS是如何工作的,首先要清楚在JMS中涉及到的概念和术语,最好的办法是阅读JMS规范,下面给出了简单的JMS介绍。
1) JMS的简单介绍
当你发送一个消息,你不能直接发送到对此消息感兴趣的接受者。而是你发送到一个目的地。对此消息感兴趣的接受者必须连接到目的地,得到此消息或在目的地设置订阅。
在JMS中有两种域:topics 和queues 。一个消息发送到一个topics ,可以有多个客户端。用topic发布允许一对多,或多对多通讯通道。消息的产生者被叫做publisher, 消息接受者叫做subscriber。
queue 是另外一种方式,仅仅允许一个消息传送给一个客户。一个发送者将消息放在消息队列中,接受者从队列中抽取并得到消息,消息就会在队列中消失。第一个接受者抽取并得到消息后,其他人就不能在得到它。
为了能发送和接收消息,必须得到一个JMS连接。该连接是使用JMS Provider得到连接的,在得到连接之后,建立一个会话(Session)。然后再建立publisher/sender 来发送消息或subscriber/receiver来接收消息。
运行时,如果使用topic 那么publisher 或subscriber 通过一个topic来关联,如果使用queue ,则sender 或receiver通过queue来关联起来。
通常,在JMS框架中运转的方法如下:
(1) 得到一个JNDI初始化上下文(Context);
(2) 根据上下文来查找一个连接工厂TopicConnectFactory/ QueueConnectionFactory (有两种连接工厂,根据是topic/queue来使用相应的类型);
(3) 从连接工厂得到一个连接(Connect 有两种[TopicConnection/ QueueConnection]);
(4) 通过连接来建立一个会话(Session);
(5) 查找目的地(Topic/ Queue);
(6) 根据会话以及目的地来建立消息制造者(TopicPublisher/QueueSender)和消费者(TopicSubscriber/ QueueReceiver).
为了得到一个连接和得到一个目的地(用来关联publisher/sender 或subscriber/receiver),必须用provider-specific参数。
通过JNDI来查找相应的连接工厂或目的地,JNDI适合于任何JMS Provider。但是查找用的名字是provider使用的。因此,在你使用的JMS Provider(其中包括JBossMQ),必须学会如何进行指定。JMS Provider中的任何设置,象连接特性,用到目的地等,在用到的Provider都有明确描述。
2) 配置
当使用一个JMS Provider时,有三个Provider-specific因素:
1、得到一个JNDI初始化上下文
2、用到的连接工厂的名字。
3、对目的地的管理和命名协定。
JBoss同它的JNDI一起执行。为了简单的JMS client, 配置和查找初始化上下文,同实现其他J2EE客户端一样。
JMS-specific 来约束JBoss 的JMS provider (JBossMQ)。JbossMQ是通过xml 文件jbossmq-service.xml进行配置的,该文件放在在server\default\deploy下。
在xml文件中最基本的三件事情:
1、增加一个新的目的地
2、配置用户
3、获得可用连接工厂的名字。
(1) 增加新的目的地
1、在目的地的xml文件在jboss 3.x中是jbossmq-destinations-service.xml(server/../deploy)。在文件中已经存在几个缺省的目的地,所以你比较容易明白怎样增加到文件中。在例子中你需要一个topic目的地 spool,所以增加下面的语句到jbossmq-destinations-service.xml中。这种方式是长久存在的,不随着JBoss服务器关闭而消失。
2、另外一种方法是可以通过JMX HTML管理界面。通过http://localhost:8080/jmx-console 来访问。在jboss.mq 下查找service=DestinationManager 的连接。然后在createTopic()或createQueue()来建立,这种方法建立的目的地是临时性的,随着服务器开始存在,当当JBoss 服务器重新启动时,动态建立的目的地将会不存在。在JbossMQ中所有目的地都有一个目的地类型的前缀。对于topic前缀是topic ,对于queues前缀是queue。例如查找一个testTopic目的地,需要查找名字为“topic/testTopic”。
在此种方法中有createTopic()或createQueue()分别有两种方法:一是有两个参数,二是有一个参数的。两个参数分别是:建立的目的地名称和JNDI名称。一个参数的只是目的地名称,对于JNDI名称默认是:[目的地类型(topic/queue) ]/目的地名称。
在这里我们使用的是第一种方法。直接修改jbossmq-destinations-service.xml文件。
(2) 管理用户
在JMS中可能关联一个连接和一个用户,不幸的是没有明确的方法来限制访问JbossMQ或访问特殊的目的地到一个给定的用户。为了给大部分角色,在JbossMQ中不需要建立用户,除非想有一个持久topic订阅者。在这个例子中,用户是必须的。
用户可以直接在文件jbossmq-state.xml(server/../conf)中添加。同样也可以使用JMX HTML管理界面来增加(jboss.mq->service=StateManager->addUser())。
(3) 连接工厂
JBossMQ 包括为topic和queue几个不同的连接工厂,每个连接工厂有自己特性。当通过JNDI来查找一个连接工厂时,需要知道此连接工厂的名称。所有可用连接工厂和它们的属性,名称都会在文件jbossmq-service.xml中。
有三种类型连接工厂,依赖的通讯协议如下:
OIL
快速双向scoket通讯协议。它是缺省的。
UIL
超过一个socket协议,可以使用在通过防火墙访问,或者当客户端不能正确的查找到服务器的IP地址。
RMI
最早的协议,是稳定的,但是比较慢。
JVM
在JBoss 2.4之后增加的一个新的连接工厂类型。不需要用socket,当客户端和JbossMQ使用同样虚拟机时,可以使用。
在JBoss2.4.1以后版本中,对于topic- 和 queue-目的地,连接工厂使用同样的名字。下表列出了在JBoss中JMS连接工厂:
目的地类型 JNDI名字 连接工厂类型
Topic/Queue java:/ConnectionFactory JVM
Topic/Queue java:/XAConnectionFactory JVM支持XA事务
Topic/Queue RMIConnectionFactory RMI
Topic/Queue RMIXAConnectionFactory RMI支持XA事务
Topic/Queue ConnectionFactory OIL
Topic/Queue XAConnectionFactory OIL支持XA事务
Topic/Queue UILConnectionFactory UIL
Topic/Queue UILXAConnectionFactory UIL支持XA事务
3) JBoss中高级JMS配置
在上边段落主要描述了和JbossMQ一起实行的基本配置工作。在本段来描述JMS其他配置。
(1) JMS持久性管理
JMS持久性管理(PM)负责存储消息,并且将消息标记为持久,如果服务器发生故障时,能保证消息不会丢失,并允许恢复持久性消息。持久性JMS消息可以使用不同的方法来完成。每个方法有自己优点和缺陷:
PM名字 优点 缺点
File 比较稳定 速度慢
Rollinglogged 速度比较快 此方法比较新,有些地方需要完善
JDBC 对于稳定性和可量测性比较好 必须有JDBC
Logged 速度快 Log files grow without bound, memory management problems- during recovery
在JBoss中缺省的持久性消息管理是File持久性管理。可以改变它,但必须对于一个JMS
Server有且仅有一个持久性管理配置。所以你在JBoss管理界面的jboss.mq – >
service=PersistenceManager 只是看到一个。
持久性管理的配置文件是jbossmq-service.xml。在server\..\deploy下。
为了让大家了解持久性管理的各种方法,我下面来逐个介绍如何配置。
File持久性管理
File持久性管理的概念就是为每一个持久性消息建立一个文件。消息持久性方法不是全部都能使用,但它是比较稳定的。
File持久性管理在JBoss发布时,作为一个缺省的持久性管理。如果你打开jbossmq-service.xml文件,你会看到下面的XML:
当设置Mbean配置时,File持久性管理允许你指定下面的属性:
DataDircetory 是存放持久性消息的路径,会把生成的数据文件放在此目录下。
设置Rollinglogged持久性管理
Rollinglogged持久性管理是比较新的一种持久性消息管理方法,因为它使用日志文件来持续多个消息,所以当建立一个文件时,不需要许多的I/O。
定义Rollinglogged持久性管理:
<mbean code="org.jboss.mq.pm.rollinglogged.PersistenceManager"
name="jboss.mq:service=PersistenceManager">
<attribute >db/jbossmq/file</attribute>
<depends optional-attribute-name="MessageCache">jboss.mq:service=MessageCache</depends>
</mbean>
Rollinglogged持久性管理中DataDirctory 存放持久性消息的路径,会把生成的数据文件放在此目录下。
1)设置JDBC持久性管理
JDBC持久性管理使用数据库表来存储消息。需要一个JBoss配置的数据源来访问数据库。具体内容参考jbossmq-service.xml文件中的内容。
2)设置Logged持久性管理
Logged持久性管理是比较早的一个,在JBoss2.4.1以后版本中不在建议使用。现在有其他更好的办法。
4、举例说明
当我们清楚了以后内容后,现在我们来用JBoss实现一个例子来加深对JBoss和JMS的了解。
在上面叙述中,我们知道明确使用JMS provider有三个基本的事情要做:配置JNDI初始化上下文,连接工厂的名字和使用目的地的名字。
当编写产品的最好的事情是不受provider-specific 影响,使代码能在不同的JMS provider之间容易移植。在此这个例子没有聚焦在开发产品上,而是解释如何使用JbossMQ来工作。
1) 初始化上下文
配置JNDI的一个方法是通过属性文件jndi.properties。在这个文件中使用正确的值,并且把它所在的路径包含到classpath中,它比较容获得正确初始化上下文。
jndi.properties文件的内容如下:
把该文件放置的路径成为你的classpath的一部分。如果你使用这种方法,在初始化上下文时,代码比较简单: Context context = new IntialContext();1
在某些情景下,可能需要手工配置JNDI;例如当运行的类文件中环境已经配置了一个初始化上下文,但不是你想用的上下文时,需要手工来配置一个上下文。设置在哈希表中的几个属性值,并且使用此哈希表来实例化一个上下文。定义语法:
2) 查找连接工厂
自有了上下文后,需要查找一个连接工厂。为了查找它,使用一个可用的名字。查找连接工厂的代码如下:
对于一个topic目的地
Queue 目的地:
3) 建立连接和会话
在我们有了连接工厂后,建立一个连接,在此连接中建立一个会话。
对于topic代码如下:
对于queue代码如下:
一个会话建立时,配置是否调用事务
在事务Session中,当事务被提交后,自动接收,如果事务回滚,所有的被消费的消息将会被重新发送。
在非事务Session中,如果没有调用事务处理,消息传递的方式有三种:
Session.AUTO_ACKNOWLEDGE :当客户机调用的receive方法成功返回,或当MessageListenser 成功处理了消息,session将会自动接收消息的收条。
Session.CLIENT_ACKNOWLEDGE :客户机通过调用消息的acknowledge方法来接收消息。接收发生在session层。接收到一个被消费的消息时,将自动接收该session已经消费的所有消息。例如:如果消息的消费者消费了10条消息,然后接收15个被传递的消息,则前面的10个消息的收据都会在这15个消息中被接收。
Session.DUPS_ACKNOWLEDGE :指示session缓慢接收消息。
4) 查找目的地
现在我们来介绍建立publishes/sends 或subscribles/receives消息。
下面的代码列出来查找一个目的地:
对于topic 查找一个testTopic目的地
对于queue 查找一个testQueue目的地
注意:JbossM的前缀topic/ (queue/)通常被放在topic (queue)名字前面。
在JMS中,当客户机扮演每种角色,象对于topic来将的publisher ,subscriber 或对于queue来将的sender, receiver, 都有自己不同类继承和不同函数。
5) 建立一个消息制造者Message Producer (topic publisher/ queue sender)
消息制造者是一个由session创建的对象,主要工作是发送消息到目的地。
对于一个topic,需要通过TopicSession来创建一个TopicPublisher。代码如下:
对于一个queue,需要通过QueueSession来创建一个QueueSender。代码如下:
6) 消息发送
建立一个TestMessage并且publish 它, 代码:
建立一个TestMessage并且send它, 代码:
7) 下面是一个完成的topic publisher 代码,文件名HelloPublisher.java :
我们知道,使用JMS不仅能发送(send)/发布(publish)消息,也能获得(send)/发布(publish)的消息。在时间方式有良种方法来做:
同步(Synchronously):需要手工的去得到消息,为了得到一个消息客户机调用方法得到消息,直到消息到达或在规定的时间内没有到达而超时。我们在例子中没有说明这部分,大家可以实验一下。
异步(Asynchronously):你需要定义一个消息监听器(MessageListener),实现该接口。当消息达到时,JMS provider通过调用该对象的 onMessage方法来传递消息。
从原则来将,topic和queue都是异步的,但是在这两种目的地中有不同的类和方法。首先,必须定义一个MessageListener接口。
8) 建立一个MessageListener
在建立了你需要的subscriber/receiver,并且登记了监听器后。就可以调用连接的start方法得到JMS provider 发送到的消息了。如果在登记监听器之前调用start方法,很可能会丢失消息。
9) 建立消息消费者
对于topic来讲:
10) 完整的代码,放在文件HelloSubscriber.java中,如下:
11) 编辑、运行程序
直接使用命令(java)
开启命令操作符。设置classpath :
首先运行订阅消息端:java HelloSubscriber
再开启另外一个命令窗口设置classpath :
运行发布消息端:java HelloPublisher
5、补充
在最后我们解释JBoss-specific特性:如何用代码来管理目的地。JBoss各个版本可能不同,但是差别不大。我使用的是jboss3.0.6。
实现这个目的有两种不同的方法,依赖于是否代码是在和JBoss同样的虚拟机还是独立独立的。它们都包括调用一个通过service=DestinationManager 登记的JMX Bean。这个Mbean 有四个方法来管理目的地:createTopic(),createQueue(),destroyTopic(),destroyQueue()。
在代码中实现管理目的地在影射怎样调用MBean有不同的地方。如果程序虚拟机和Mbean服务器一样,可以直接调用。
建立一个topic 目的地的代码如下:
如果程序和Mbean服务器的虚拟机不同,需要通过一个JMX adapter。一个JMX adapter是一个HTML GUI。用程序通过URL来调用Mbean。代码如下:
编辑命令:
运行命令
当运行完后查看http://localhost:8080/jmx-console下面的jboss.mq.destination中有name=myCreated,service=Topic
表明你建立成功。当JBoss关闭重新启动时。该目的地不会在存在。
本章介绍在Jboss中使用Java Messageing Service (JMS). 不是JMS指南,而是JBoss和JMS如何一起使用,如果你想看JMS的介绍,请参考 JMS Specification 或 JMS turorial.
最近随着JBoss版本不断更新,对JMS支持越来越成熟,也造成JBoss各个版本之间的不同。在这里我主要介绍JBoss3.0.X版本。
本章给出的例子比较简单,并指出了如何使用JMS的原理。所有的例子可以通过ant build file 来建立。
为了能建立和运行此例子。我们使用两种方式来进行:一是使用Ant命令,二是使用JAR和JAVA基本命令。必须有下面的环境变量:
JAVA_HOME 安装JDK1.4的目录。
JBOSS_DIST 安装JBoss的目录。
你必须安装JDK, 如果使用Ant必须安装 Ant。可以参考我前面文档的介绍。
2、概述
1) 什么是JMS
JMS是Java API, 允许应用程序来建立、接收和读取消息。程序依靠这些API, 在运行时需要一个JMS实现接口,来提供管理和控制,这被称为JMS provider, 现在有几种不同的JMS Provider; 在JBoss中的叫做JbossMQ。
2) JMS 和J2EE
JMS是在EJB和J2EE框架开发之前进行开发的,所以在JMS说明书中没有涉及到EJB或J2EE。
EJB 和J2EE第一代版本中也没有涉及到JMS,一直到EJB1.1,在生成一个可用Beand的容器provider中JMS也不是必须的API。在J2EE1.2中JMS接口是必需的情况,但并不是非得要包含一个JMS Provider;在EJB2.0和J2EE1.3中又进行改变,应用服务器包含了一个JMS Provider,自从J2EE1。3需要EJB2.0,增加了以下两个JMS特性:
w 一种新Bean类型定义, 也就是消息驱动Beam (MDB), 这种bean做为JMS消息监听者,可以异步地处理JMS消息。
w JMS处理作为资源,来自一个Bean 的JMD 发布(发送)必须能和其他bean的全局事务环境共享。这个需要把JMS认为是一个容器管理资源,象JDBC的连接。
3) JMS 和JBoss
JBoss从2.0版本以后都支持JMS。 在2.1中增加了MDB,从2.4版本开始JMS作为一个事务资源。
JBoss中JMS的体系结构如下:JMS Provider, 叫做JbossMQ 。 是JBoss实现JMS 1.0.2规范的一部分,包括可选部分,象ASF(Application Service Facvility)。 JbossMQ处理和普遍JMS一样:建立 queues (队列)或topic(标题),持久性等。 MDB (Message Driven Beans), 资源适配器。
3、JMS Provider
JBoss有它自己的JMS Provider 叫做JbossMQ。 适合与JMS 1.0.2 的JMS Provider,并且能用在所有平常的JMS程序中。为了清楚在JBoss中JMS是如何工作的,首先要清楚在JMS中涉及到的概念和术语,最好的办法是阅读JMS规范,下面给出了简单的JMS介绍。
1) JMS的简单介绍
当你发送一个消息,你不能直接发送到对此消息感兴趣的接受者。而是你发送到一个目的地。对此消息感兴趣的接受者必须连接到目的地,得到此消息或在目的地设置订阅。
在JMS中有两种域:topics 和queues 。一个消息发送到一个topics ,可以有多个客户端。用topic发布允许一对多,或多对多通讯通道。消息的产生者被叫做publisher, 消息接受者叫做subscriber。
queue 是另外一种方式,仅仅允许一个消息传送给一个客户。一个发送者将消息放在消息队列中,接受者从队列中抽取并得到消息,消息就会在队列中消失。第一个接受者抽取并得到消息后,其他人就不能在得到它。
为了能发送和接收消息,必须得到一个JMS连接。该连接是使用JMS Provider得到连接的,在得到连接之后,建立一个会话(Session)。然后再建立publisher/sender 来发送消息或subscriber/receiver来接收消息。
运行时,如果使用topic 那么publisher 或subscriber 通过一个topic来关联,如果使用queue ,则sender 或receiver通过queue来关联起来。
通常,在JMS框架中运转的方法如下:
(1) 得到一个JNDI初始化上下文(Context);
(2) 根据上下文来查找一个连接工厂TopicConnectFactory/ QueueConnectionFactory (有两种连接工厂,根据是topic/queue来使用相应的类型);
(3) 从连接工厂得到一个连接(Connect 有两种[TopicConnection/ QueueConnection]);
(4) 通过连接来建立一个会话(Session);
(5) 查找目的地(Topic/ Queue);
(6) 根据会话以及目的地来建立消息制造者(TopicPublisher/QueueSender)和消费者(TopicSubscriber/ QueueReceiver).
为了得到一个连接和得到一个目的地(用来关联publisher/sender 或subscriber/receiver),必须用provider-specific参数。
通过JNDI来查找相应的连接工厂或目的地,JNDI适合于任何JMS Provider。但是查找用的名字是provider使用的。因此,在你使用的JMS Provider(其中包括JBossMQ),必须学会如何进行指定。JMS Provider中的任何设置,象连接特性,用到目的地等,在用到的Provider都有明确描述。
2) 配置
当使用一个JMS Provider时,有三个Provider-specific因素:
1、得到一个JNDI初始化上下文
2、用到的连接工厂的名字。
3、对目的地的管理和命名协定。
JBoss同它的JNDI一起执行。为了简单的JMS client, 配置和查找初始化上下文,同实现其他J2EE客户端一样。
JMS-specific 来约束JBoss 的JMS provider (JBossMQ)。JbossMQ是通过xml 文件jbossmq-service.xml进行配置的,该文件放在在server\default\deploy下。
在xml文件中最基本的三件事情:
1、增加一个新的目的地
2、配置用户
3、获得可用连接工厂的名字。
(1) 增加新的目的地
1、在目的地的xml文件在jboss 3.x中是jbossmq-destinations-service.xml(server/../deploy)。在文件中已经存在几个缺省的目的地,所以你比较容易明白怎样增加到文件中。在例子中你需要一个topic目的地 spool,所以增加下面的语句到jbossmq-destinations-service.xml中。这种方式是长久存在的,不随着JBoss服务器关闭而消失。
<mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.mq.destination:service=Topic,name=spool"> <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> </mbean>
2、另外一种方法是可以通过JMX HTML管理界面。通过http://localhost:8080/jmx-console 来访问。在jboss.mq 下查找service=DestinationManager 的连接。然后在createTopic()或createQueue()来建立,这种方法建立的目的地是临时性的,随着服务器开始存在,当当JBoss 服务器重新启动时,动态建立的目的地将会不存在。在JbossMQ中所有目的地都有一个目的地类型的前缀。对于topic前缀是topic ,对于queues前缀是queue。例如查找一个testTopic目的地,需要查找名字为“topic/testTopic”。
在此种方法中有createTopic()或createQueue()分别有两种方法:一是有两个参数,二是有一个参数的。两个参数分别是:建立的目的地名称和JNDI名称。一个参数的只是目的地名称,对于JNDI名称默认是:[目的地类型(topic/queue) ]/目的地名称。
在这里我们使用的是第一种方法。直接修改jbossmq-destinations-service.xml文件。
(2) 管理用户
在JMS中可能关联一个连接和一个用户,不幸的是没有明确的方法来限制访问JbossMQ或访问特殊的目的地到一个给定的用户。为了给大部分角色,在JbossMQ中不需要建立用户,除非想有一个持久topic订阅者。在这个例子中,用户是必须的。
用户可以直接在文件jbossmq-state.xml(server/../conf)中添加。同样也可以使用JMX HTML管理界面来增加(jboss.mq->service=StateManager->addUser())。
<User> <Name>jacky</Name> <Password>jacky</Password> <Id>DurableSubscriberExample</Id> </User>
(3) 连接工厂
JBossMQ 包括为topic和queue几个不同的连接工厂,每个连接工厂有自己特性。当通过JNDI来查找一个连接工厂时,需要知道此连接工厂的名称。所有可用连接工厂和它们的属性,名称都会在文件jbossmq-service.xml中。
有三种类型连接工厂,依赖的通讯协议如下:
OIL
快速双向scoket通讯协议。它是缺省的。
UIL
超过一个socket协议,可以使用在通过防火墙访问,或者当客户端不能正确的查找到服务器的IP地址。
RMI
最早的协议,是稳定的,但是比较慢。
JVM
在JBoss 2.4之后增加的一个新的连接工厂类型。不需要用socket,当客户端和JbossMQ使用同样虚拟机时,可以使用。
在JBoss2.4.1以后版本中,对于topic- 和 queue-目的地,连接工厂使用同样的名字。下表列出了在JBoss中JMS连接工厂:
引用
目的地类型 JNDI名字 连接工厂类型
Topic/Queue java:/ConnectionFactory JVM
Topic/Queue java:/XAConnectionFactory JVM支持XA事务
Topic/Queue RMIConnectionFactory RMI
Topic/Queue RMIXAConnectionFactory RMI支持XA事务
Topic/Queue ConnectionFactory OIL
Topic/Queue XAConnectionFactory OIL支持XA事务
Topic/Queue UILConnectionFactory UIL
Topic/Queue UILXAConnectionFactory UIL支持XA事务
3) JBoss中高级JMS配置
在上边段落主要描述了和JbossMQ一起实行的基本配置工作。在本段来描述JMS其他配置。
(1) JMS持久性管理
JMS持久性管理(PM)负责存储消息,并且将消息标记为持久,如果服务器发生故障时,能保证消息不会丢失,并允许恢复持久性消息。持久性JMS消息可以使用不同的方法来完成。每个方法有自己优点和缺陷:
引用
PM名字 优点 缺点
File 比较稳定 速度慢
Rollinglogged 速度比较快 此方法比较新,有些地方需要完善
JDBC 对于稳定性和可量测性比较好 必须有JDBC
Logged 速度快 Log files grow without bound, memory management problems- during recovery
在JBoss中缺省的持久性消息管理是File持久性管理。可以改变它,但必须对于一个JMS
Server有且仅有一个持久性管理配置。所以你在JBoss管理界面的jboss.mq – >
service=PersistenceManager 只是看到一个。
持久性管理的配置文件是jbossmq-service.xml。在server\..\deploy下。
为了让大家了解持久性管理的各种方法,我下面来逐个介绍如何配置。
File持久性管理
File持久性管理的概念就是为每一个持久性消息建立一个文件。消息持久性方法不是全部都能使用,但它是比较稳定的。
File持久性管理在JBoss发布时,作为一个缺省的持久性管理。如果你打开jbossmq-service.xml文件,你会看到下面的XML:
<mbean code="org.jboss.mq.pm.file.PersistenceManager" name="jboss.mq:service=PersistenceManager"> <attribute >db/jbossmq/file</attribute> <depends optional-attribute-name="MessageCache">jboss.mq:service=MessageCache</depends> </mbean>
当设置Mbean配置时,File持久性管理允许你指定下面的属性:
DataDircetory 是存放持久性消息的路径,会把生成的数据文件放在此目录下。
设置Rollinglogged持久性管理
Rollinglogged持久性管理是比较新的一种持久性消息管理方法,因为它使用日志文件来持续多个消息,所以当建立一个文件时,不需要许多的I/O。
定义Rollinglogged持久性管理:
引用
<mbean code="org.jboss.mq.pm.rollinglogged.PersistenceManager"
name="jboss.mq:service=PersistenceManager">
<attribute >db/jbossmq/file</attribute>
<depends optional-attribute-name="MessageCache">jboss.mq:service=MessageCache</depends>
</mbean>
Rollinglogged持久性管理中DataDirctory 存放持久性消息的路径,会把生成的数据文件放在此目录下。
1)设置JDBC持久性管理
JDBC持久性管理使用数据库表来存储消息。需要一个JBoss配置的数据源来访问数据库。具体内容参考jbossmq-service.xml文件中的内容。
2)设置Logged持久性管理
Logged持久性管理是比较早的一个,在JBoss2.4.1以后版本中不在建议使用。现在有其他更好的办法。
4、举例说明
当我们清楚了以后内容后,现在我们来用JBoss实现一个例子来加深对JBoss和JMS的了解。
在上面叙述中,我们知道明确使用JMS provider有三个基本的事情要做:配置JNDI初始化上下文,连接工厂的名字和使用目的地的名字。
当编写产品的最好的事情是不受provider-specific 影响,使代码能在不同的JMS provider之间容易移植。在此这个例子没有聚焦在开发产品上,而是解释如何使用JbossMQ来工作。
1) 初始化上下文
配置JNDI的一个方法是通过属性文件jndi.properties。在这个文件中使用正确的值,并且把它所在的路径包含到classpath中,它比较容获得正确初始化上下文。
jndi.properties文件的内容如下:
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory java.naming.provider.url=localhost:1099 java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
把该文件放置的路径成为你的classpath的一部分。如果你使用这种方法,在初始化上下文时,代码比较简单: Context context = new IntialContext();1
在某些情景下,可能需要手工配置JNDI;例如当运行的类文件中环境已经配置了一个初始化上下文,但不是你想用的上下文时,需要手工来配置一个上下文。设置在哈希表中的几个属性值,并且使用此哈希表来实例化一个上下文。定义语法:
Hashtable props = new Hashtable(); props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); props.put(Context.PROVIDER_URL, "localhost:1099"); props.put("java.naming.rmi.security.manager", "yes"); props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
2) 查找连接工厂
自有了上下文后,需要查找一个连接工厂。为了查找它,使用一个可用的名字。查找连接工厂的代码如下:
对于一个topic目的地
TopicConnectionFactory topicFactory = (TopicConnectionFactory) context.lookup (“ConnectionFactory”)
Queue 目的地:
QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup (“ConnectionFactory”)
3) 建立连接和会话
在我们有了连接工厂后,建立一个连接,在此连接中建立一个会话。
对于topic代码如下:
//建立一个连接 topicConnection = topicFactory.createTopicConnection(); //建立一个会话 topicSession = topicConnection.createTopicSession(false, //不需要事务 Session.AUTO_ACKNOLEDGE //自动接收消息的收条。 );
对于queue代码如下:
//建立一个连接 queueConnection = queueFactory.createQueueConnection(); //建立一个会话 queueSession = queueConnection .createQueueSession(false, //不需要事务 Session.AUTO_ACKNOLEDGE //自动接收消息的收条。 );
一个会话建立时,配置是否调用事务
在事务Session中,当事务被提交后,自动接收,如果事务回滚,所有的被消费的消息将会被重新发送。
在非事务Session中,如果没有调用事务处理,消息传递的方式有三种:
Session.AUTO_ACKNOWLEDGE :当客户机调用的receive方法成功返回,或当MessageListenser 成功处理了消息,session将会自动接收消息的收条。
Session.CLIENT_ACKNOWLEDGE :客户机通过调用消息的acknowledge方法来接收消息。接收发生在session层。接收到一个被消费的消息时,将自动接收该session已经消费的所有消息。例如:如果消息的消费者消费了10条消息,然后接收15个被传递的消息,则前面的10个消息的收据都会在这15个消息中被接收。
Session.DUPS_ACKNOWLEDGE :指示session缓慢接收消息。
4) 查找目的地
现在我们来介绍建立publishes/sends 或subscribles/receives消息。
下面的代码列出来查找一个目的地:
对于topic 查找一个testTopic目的地
Topic topic = (Topic) context.lookup(“topic/testTopic”);
对于queue 查找一个testQueue目的地
Queue queue= (Queue) context.lookup(“queue/testQueue”);
注意:JbossM的前缀topic/ (queue/)通常被放在topic (queue)名字前面。
在JMS中,当客户机扮演每种角色,象对于topic来将的publisher ,subscriber 或对于queue来将的sender, receiver, 都有自己不同类继承和不同函数。
5) 建立一个消息制造者Message Producer (topic publisher/ queue sender)
消息制造者是一个由session创建的对象,主要工作是发送消息到目的地。
对于一个topic,需要通过TopicSession来创建一个TopicPublisher。代码如下:
TopicPublisher topicPublisher = TopicSession.createPublisher(topic);
对于一个queue,需要通过QueueSession来创建一个QueueSender。代码如下:
QueuePublisher queuePublisher = queueSession.createSender(queue);
6) 消息发送
建立一个TestMessage并且publish 它, 代码:
TextMessage message = topicSession.createTestMessage(); message.setText(msg); topicPublishe.publish(topic, message);
建立一个TestMessage并且send它, 代码:
TextMessage message = queueSession.createTestMessage(); message.setText(msg); queueSender.send(queue, message);
7) 下面是一个完成的topic publisher 代码,文件名HelloPublisher.java :
import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.jms.TopicConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicPublisher; import javax.jms.Topic; import javax.jms.TextMessage; import javax.jms.Session; import javax.jms.JMSException; import java.util.Hashtable; public class HelloPublisher { TopicConnection topicConnection; TopicSession topicSession; TopicPublisher topicPublisher; Topic topic; public HelloPublisher(String factoryJNDI, String topicJNDI) throws JMSException, NamingException { Hashtable props=new Hashtable(); props.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory"); props.put(Context.PROVIDER_URL, "localhost:1099"); props.put("java.naming.rmi.security.manager", "yes"); props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming"); Context context = new InitialContext(props); TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(factoryJNDI); topicConnection = topicFactory.createTopicConnection(); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic)context.lookup(topicJNDI); topicPublisher = topicSession.createPublisher(topic); } public void publish(String msg) throws JMSException { TextMessage message = topicSession.createTextMessage(); message.setText(msg); topicPublisher.publish(topic, message); } public void close() throws JMSException { topicSession.close(); topicConnection.close(); } public static void main(String[] args) { try { HelloPublisher publisher = new HelloPublisher( "ConnectionFactory", "topic/testTopic"); for (int i = 1; i < 11; i++) { String msg = "Hello World no. " + i; System.out.println("Publishing message: " + msg); publisher.publish(msg); } publisher.close(); } catch(Exception ex) { System.err.println( "An exception occurred while testing HelloPublisher25: " + ex); ex.printStackTrace(); } } }
我们知道,使用JMS不仅能发送(send)/发布(publish)消息,也能获得(send)/发布(publish)的消息。在时间方式有良种方法来做:
同步(Synchronously):需要手工的去得到消息,为了得到一个消息客户机调用方法得到消息,直到消息到达或在规定的时间内没有到达而超时。我们在例子中没有说明这部分,大家可以实验一下。
异步(Asynchronously):你需要定义一个消息监听器(MessageListener),实现该接口。当消息达到时,JMS provider通过调用该对象的 onMessage方法来传递消息。
从原则来将,topic和queue都是异步的,但是在这两种目的地中有不同的类和方法。首先,必须定义一个MessageListener接口。
8) 建立一个MessageListener
在建立了你需要的subscriber/receiver,并且登记了监听器后。就可以调用连接的start方法得到JMS provider 发送到的消息了。如果在登记监听器之前调用start方法,很可能会丢失消息。
public void onMessage(Message m) { try { String msg = ((TextMessage)m).getText(); System.out.println("HelloSubscriber got message: " + msg); } catch(JMSException ex) { System.err.println("Could not get text message: " + ex); ex.printStackTrace(); } }
9) 建立消息消费者
对于topic来讲:
//建立一个订阅者 topicSubscriber = topicSession.createSubscriber(topic); //设置消息监听器, topicSubscriber.setMessageListener(this) //连接开始 topicConnection.start(); 对于queue来将: //建立一个订阅者 queueReceiver = queueSession.createReceiver(queue); //设置消息监听器, queueReceiver .setMessageListener(this) //连接开始 queueConnection.start();
10) 完整的代码,放在文件HelloSubscriber.java中,如下:
import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.jms.TopicConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.Topic; import javax.jms.Message; import javax.jms.TextMessage; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.JMSException; public class HelloSubscriber implements MessageListener { TopicConnection topicConnection; TopicSession topicSession; TopicSubscriber topicSubscriber; Topic topic; public HelloSubscriber(String factoryJNDI, String topicJNDI) throws JMSException, NamingException { Context context = new InitialContext(); TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(factoryJNDI); topicConnection = topicFactory.createTopicConnection(); topicSession = topicConnection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE); topic = (Topic)context.lookup(topicJNDI); topicSubscriber = topicSession.createSubscriber(topic); topicSubscriber.setMessageListener(this); System.out.println( "HelloSubscriber subscribed to topic: " + topicJNDI); topicConnection.start(); } public void onMessage(Message m) { try { String msg = ((TextMessage)m).getText(); System.out.println("HelloSubscriber got message: " + msg); } catch(JMSException ex) { System.err.println("Could not get text message: " + ex); ex.printStackTrace(); } } public void close() throws JMSException { topicSession.close(); topicConnection.close(); } public static void main(String[] args) { try { HelloSubscriber subscriber = new HelloSubscriber( "TopicConnectionFactory", "topic/testTopic"); } catch(Exception ex) { System.err.println( "An exception occurred while testing HelloSubscriber: " + ex); ex.printStackTrace(); } } }
11) 编辑、运行程序
直接使用命令(java)
开启命令操作符。设置classpath :
引用
set classpath=C:\jboss-3.0.6_tomcat-4.1.18\client\jbossall-client.jar;C:\jboss-3.0.6_tomcat-4.1.18\client\jboss-j2ee.jar;C:\jboss-3.0.6_tomcat-4.1.18\client\jnp-client.jar;C:\jboss-3.0.6_tomcat-4.1.18\client\log4j.jar;.
首先运行订阅消息端:java HelloSubscriber
再开启另外一个命令窗口设置classpath :
引用
set classpath=C:\jboss-3.0.6_tomcat-4.1.18\client\jbossall-client.jar;C:\jboss-3.0.6_tomcat-4.1.18\client\jboss-j2ee.jar;C:\jboss-3.0.6_tomcat-4.1.18\client\jnp-client.jar;C:\jboss-3.0.6_tomcat-4.1.18\client\log4j.jar;.
运行发布消息端:java HelloPublisher
5、补充
在最后我们解释JBoss-specific特性:如何用代码来管理目的地。JBoss各个版本可能不同,但是差别不大。我使用的是jboss3.0.6。
实现这个目的有两种不同的方法,依赖于是否代码是在和JBoss同样的虚拟机还是独立独立的。它们都包括调用一个通过service=DestinationManager 登记的JMX Bean。这个Mbean 有四个方法来管理目的地:createTopic(),createQueue(),destroyTopic(),destroyQueue()。
在代码中实现管理目的地在影射怎样调用MBean有不同的地方。如果程序虚拟机和Mbean服务器一样,可以直接调用。
建立一个topic 目的地的代码如下:
MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).iterator().next(); server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"), method, new Object[] { “myTopic” }, new String[] { "java.lang.String" });
如果程序和Mbean服务器的虚拟机不同,需要通过一个JMX adapter。一个JMX adapter是一个HTML GUI。用程序通过URL来调用Mbean。代码如下:
import java.io.InputStream; import java.net.URL; import java.net.HttpURLConnection; import javax.management.MBeanServerFactory; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.jms.Topic; import javax.jms.Queue; public class DestinationHelper { static final String HOST = "localhost"; static final int PORT = 8080; static final String BASE_URL_ARG = "/jmx-console/HtmlAdaptor?"; public static void createDestination(Class type, String name) throws Exception { String method = null; if (type == Topic.class) { method = "createTopic"; } else if (type == Queue.class) { method = "createQueue";} invoke(method, name); } public static void destroyDestination(Class type, String name) throws Exception { String method = null; if (type == Topic.class) { method = "destroyTopic"; } else if (type == Queue.class) { method = "destroyQueue";} invoke(method, name); } protected static void invoke(String method, String destName) throws Exception { try { MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).iterator().next(); invokeViaMBean(method, destName); }catch(Exception ex) { invokeViaUrl(method, destName);} } protected static void invokeViaUrl(String method, String destName) throws Exception { String action = "action=invokeOp&methodIndex=6&name=jboss.mq%3Aservice%3DDestinationManager&arg0=" + destName; String arg = BASE_URL_ARG + action; URL url = new URL("http", HOST, PORT, arg); HttpURLConnection con = (HttpURLConnection)url.openConnection(); con.connect(); InputStream is = con.getInputStream(); java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream(); byte[] buff = new byte[1024]; for(;;) { int size = is.read( buff ); if (size == -1 ) { break; } os.write(buff, 0, size); } os.flush(); if (con.getResponseCode() != HttpURLConnection.HTTP_OK ) { throw new Exception ("Could not invoke url: " + con.getResponseMessage() ); } else { System.out.println("Invoked URL: " + method + " for destination " + destName + "got resonse: " + os.toString()); } } protected static void invokeViaMBean(String method, String destName) throws Exception { MBeanServer server = (MBeanServer)MBeanServerFactory.findMBeanServer(null).iterator().next(); server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"), method, new Object[] { destName }, new String[] { "java.lang.String" }); } public static void main(String[] args) { try { if (args.length >0){ destroyDestination(Topic.class,"myCreated"); }else { createDestination(Topic.class,"myCreated"); } }catch(Exception ex) { System.out.println("Error in administering destination: " + ex); ex.printStackTrace(); } } }
编辑命令:
引用
javac -classpath C:\jboss-3.0.6_tomcat-4.1.18\client\jbossall-client.jar;C:\jboss-3.0.6_tomcat-4.1.18\lib\jboss-jmx.jar;. DestinationHelper.java
运行命令
引用
java -classpath C:\jboss-3.0.6_tomcat-4.1.18\client\jbossall-client.jar;C:\jboss-3.0.6_tomcat-4.1.18\lib\jboss-jmx.jar;. DestinationHelper
当运行完后查看http://localhost:8080/jmx-console下面的jboss.mq.destination中有name=myCreated,service=Topic
表明你建立成功。当JBoss关闭重新启动时。该目的地不会在存在。
相关推荐
【JBoss JMS包详解】 JBoss JMS(Java Message Service)是Red Hat公司开发的JMS实现,它是JBoss Application Server的一部分,提供了一个标准、可靠且面向消息的中间件服务。在分布式环境中,JMS作为异步通信的...
总结来说,JBOSS中的JMS应用实例涉及了JMS的基本概念、JBOSST的配置、代码编写以及具体的应用场景。通过实践这些步骤,开发者可以掌握如何在JBOSST环境中利用JMS进行高效的数据通信。同时,提供的文档和项目文件为...
本文档描述了apache web服务器安装以及常用的编译模式;描述了apache+jboss3.2.6做负载均衡(load balance)的部署细节以及一些常见错误说明;描述了部署jboss3.2.3/3.2.6时一些心得、常用配置项
**JBoss JMS应用构建详解** JBoss是一个开源的应用服务器,它支持Java消息服务(Java Message Service,简称JMS),这是一种标准的API,用于在分布式环境中进行异步通信。JMS允许应用程序创建、发送、接收和读取...
【基于Jboss的JMS编程】是关于Java消息服务(JMS)在Jboss应用服务器上的实现和配置的教程,适合初学者理解JMS的基本概念和应用。JMS是一种标准API,用于在分布式环境中发送、接收和管理消息,提供可靠的数据传输。 ...
**JBoss 7 配置 JMS 知识点详解** JBoss Application Server(简称 JBoss AS)是一款开源的企业级 Java 应用服务器,由 Red Hat 公司维护。JBoss 7 是其一个重要的版本,它引入了许多新特性和性能优化。在 JBoss 7 ...
JBoss自带了一个符合JMS 1.1规范的JMS提供商,叫做JBoss Messaging或JBossMQ。当在JBoss环境中使用JMS API时,实际上是透明地使用了JBoss Messaging引擎。JBoss Messaging完全实现了JMS规范,因此,最好的JBoss ...
### jBoss 3.0快速入门指南核心知识点详解 ...这部分介绍了如何在jBoss中使用JMS API。 以上内容涵盖了jBoss 3.0快速入门所需的核心知识点,希望能够帮助初学者快速上手并掌握该平台的基本使用方法。
【JBoss 服务器下的 JMS 实例】 Java 消息服务 (JMS) 是 Java 平台中用于处理异步消息传递的标准API,它在面向服务架构 (SOA) 中扮演着关键角色,特别是在需要与外部系统进行异步通信的企业环境中。JBoss 服务器...
在JBOSS 环境中配置JMS,在程序中可以通过JNDI 获取连接,如消息启动Bean 就可以通过JNDI获取:@MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue ...
【标题】:“(转)JBOSS7.1下开发JMS(HornetQ)的示例DEMO” 在本文中,我们将深入探讨如何在JBoss Application Server 7.1 (JBoss AS 7.1)环境中开发Java消息服务(JMS)应用程序,特别是使用HornetQ作为消息...
4. JMS(Java Message Service):支持异步消息传递。 5. JTA(Java Transaction API):处理分布式事务管理。 总的来说,JBoss 作为一个强大的开源应用服务器,因其免费、高效、易用和丰富的功能,深受开发者喜爱...
它提供了企业级的功能,如EJB(Enterprise JavaBeans)、JMS(Java Message Service)、JTS/JTA(Java Transaction Service / Java Transaction API)、Servlet和JSP(JavaServer Pages)、JNDI(Java Naming and ...
5. **JMS**:JBoss 4.0.5.GA集成了JMS服务,允许应用程序通过消息队列进行异步通信,增强了系统的可扩展性和容错性。 6. **Web容器**:内建Tomcat或Jetty作为HTTP服务器,支持Servlet 2.4和JSP 2.0,提供Web应用的...
1. **JBoss简介**:JBoss是Red Hat公司的一个开源Java EE应用服务器,基于J2EE(Java Platform, Enterprise Edition)标准,提供了一系列的企业级服务,如EJB、JMS、JTA等,用于构建、部署和管理分布式企业级应用。...
JBoss,作为一款开源的应用服务器,是Java EE(现在称为Jakarta EE)应用程序的重要运行环境。它由Red Hat公司维护,提供了对Web服务、EJB(Enterprise JavaBeans)、JMS(Java Message Service)等标准的全面支持。...
- **JMS (Java Message Service)**:介绍JMS的原理及用途,教授如何在JBoss AS 7中实现消息传递服务。 ##### 4. 应用程序开发 - **Web应用程序**:介绍如何基于JBoss AS 7构建Web应用程序,包括使用Servlets、JSP...