`

ActiveMQ持久化

 
阅读更多
一.MQ基本操作

 

MQ中有几个很重要的组件:队列管理器(QueueManager)、队列(Queue)和通道(Channel)。其基本的操作方法如下:

 

创建队列管理器
crtmqm -q QMgrName
-q是指创建缺省的队列管理器

 

删除队列管理器
dltmqm QmgrName

 

启动队列管理器
strmqm QmgrName
如果是启动默认的队列管理器,可以不带其名字

 

停止队列管理器
endmqm QmgrName 受控停止

 

endmqm -i QmgrName 立即停止

 

endmqm -p QmgrName 强制停止

 

显示队列管理器
dspmq -m QmgrName

 

运行MQSeries命令
runmqsc QmgrName
如果是默认队列管理器,可以不带其名字

 

往队列中放消息
amqsput QName QmgrName
如果队列是默认队列管理器中的队列,可以不带其队列管理器的名字

 

从队列中取出消息
amqsget QName QmgrName
如果队列是默认队列管理器中的队列,可以不带其队列管理器的名字

 

启动通道
runmqchl -c ChlName -m QmgrName

 

启动侦听
runmqlsr -t TYPE -p PORT -m QMgrName

 

停止侦听
endmqlsr -m QmgrName

 

MQSeries命令

 

定义死信队列
DEFINE QLOCAL(QNAME) DEFPSIST(YES) REPLACE

 

设定队列管理器的死信队列
ALTER QMGR DEADQ(QNAME)

 

定义本地队列
DEFINE QL(QNAME) REPLACE

 

定义别名队列
DEFINE QALIAS(QALIASNAME) TARGQ(QNAME)

 

远程队列定义
DEFINE QREMOTE(QRNAME) +
RNAME(AAA) RQMNAME(QMGRNAME) +
XMITQ(QTNAME)

 

定义模型队列
DEFINE QMODEL(QNAME) DEFTYPE(TEMPDYN)

 

定义本地传输队列
DEFINE QLOCAL(QTNAME) USAGE(XMITQ) DEFPSIST(YES) +
INITQ(SYSTEM.CHANNEL.INITQ)+
PROCESS(PROCESSNAME) REPLACE

 

创建进程定义
DEFINE PROCESS(PRONAME) +
DESCR(‘STRING')+
APPLTYPE(WINDOWSNT)+
APPLICID(' runmqchl -c SDR_TEST -m QM_ TEST')
其中APPLTYPE的值可以是:CICS、UNIX、WINDOWS、WINDOWSNT等

 

创建发送方通道
DEFINE CHANNEL(SDRNAME) CHLTYPE(SDR)+
CONNAME(‘100.100.100.215(1418)') XMITQ(QTNAME) REPLACE
其中CHLTYPE可以是:SDR、SVR、RCVR、RQSTR、CLNTCONN、SVRCONN、CLUSSDR和CLUSRCVR。

 

创建接收方通道
DEFINE CHANNEL(SDR_ TEST) CHLTYPE(RCVR) REPLACE

 

创建服务器连接通道
DEFINE CHANNEL(SVRCONNNAME) CHLTYPE(SVRCONN) REPLACE

 

显示队列的所有属性
DISPLAY QUEUE(QNAME) [ALL]

 

显示队列的所选属性
DISPLAY QUEUE(QNAME) DESCR GET PUT
DISPLAY QUEUE(QNAME)MAXDEPTH CURDEPTH

 

显示队列管理器的所有属性
DISPLAY QMGR [ALL]

 

显示进程定义
DISPLAY PROCESS(PRONAME)

 

更改属性
ALTER QMGR DESCR(‘NEW DESCRIPTION')
ALTER QLOCAL(QNAME) PUT(DISABLED)
ALTER QALIAS(QNAME) TARGQ(TARGQNAME)

 

删除队列
DELETE QLOCAL(QNAME)
DELETE QREMOTE(QRNAME)

 

清除队列中的所有消息
CLEAR QLOCAL(QNAME)

 


二.配置一个能够通信的远程连接
以上讲述了MQ的基本命令操作,但只知道这些是没有实际意义的。MQ的最终目的是实现远程通信,所以下面就以一个具体的例子来说明如何实现远程连接。这个例子的目的是建立可以实现消息传递的一对MQ服务器,它们分别基于NT和UNIX平台。
首先在NT端建一队列管理器
crtmqm -q QM_NT
启动队列管理器
strmqm QM_NT
运行MQ控制台命令
runmqsc QM_NT
创建死信队列
DEFINE QL(NT.DEADQ) DEFPSIST(YES) REPLACE
更改队列管理器属性,设置其死信队列
ALTER QMGR DEADQ(NT.DEADQ)
创建进程定义
DEFINE PROCESS(P_NT)+
APPLTYPE(WINDOWSNT)+
APPLICID(' runmqchl -c SDR_NT -m QM_NT')
创建本地传输队列
DEFINE QL(QT_NT) USAGE(XMITQ) DEFPSIST(YES) +
INITQ(SYSTEM.CHANNEL.INITQ)+
PROCESS(P_NT) REPLACE
创建远程队列定义,对应于UNIX机器上的本地队列Q_UNIX,传输队列为QT_NT
DEFINE QREMOTE(QR_NT)+
RNAME(Q_UNIX) RQMNAME(QM_UNIX)+
XMITQ(QT_NT)
创建发送方通道,其传输队列为QT_NT,远程主机地址为10.10.10.2,侦听端口为1414
DEFINE CHANNEL(SDR_NT) CHLTYPE(SDR)+
CONNAME(‘10.10.10.2(1414)') XMITQ(QT_NT) REPLACE
创建服务器连接通道
DEFINE CHANNEL(S_NT) CHLTYPE(SVRCONN) REPLACE

 

在UNIX端创建队列管理器
crtmqm -q QM_UNIX
启动队列管理器
strmqm QM_UNIX
添加侦听程序
修改/etc/services文件,加入一行:
MQSeries 1414/tcp #MQSeries channel listener
修改/etc/inetd.conf文件,加入一行(启动侦听程序)
MQSeries stream tcp nowait mqm /usr/lpp/mqm/bin/amqcrsta amqcrsta -m
QM_UNIX
运行以下命令,以使修改起作用
refresh -s inetd

 

运行MQ控制台命令
runmqsc QM_UNIX
创建死信队列
DEFINE QL(UNIX.DEADQ) DEFPSIST(YES) REPLACE
更改队列管理器属性,设置其死信队列
ALTER QMGR DEADQ(UNIX.DEADQ)
创建接收方通道,其名字必须与远程发送方相同
DEFINE CHANNEL(SDR_NT) CHLTYPE(RCVR) REPLACE
创建本地队列
DEFINE QL(Q_UNIX) DEFPSIST(YES) REPLACE
创建服务器连接通道
DEFINE CHANNEL(S_UNIX) CHLTYPE(SVRCONN) REPLACE

 

经过以上操作之后,远程连接的配置工作完成。接下来需要验证配置是否正确。
在NT端启动发送方通道
runmqchl -c SDR_NT -m QM_NT 或 start chl(SDR_NT)
从NT端发送消息到UNIX端
amqsput QR_NT QM_NT
在UNIX端接收消息
/usr/mqm/samp/bin/amqsget Q_UNIX QM_UNIX

 

若能收到消息,说明配置成功。

 

另,在NT下一般情况下在建立队列管理器时会自动建立侦听器,启动队列管理器时则会自动启动侦听程序。当然也可以手动配置侦听程序。
修改/winnt/system32/drivers/etc/services文件,在文件中加入一行:
MQSeries 1414/tcp #MQSeries channel listener
启动侦听程序
runmqlsr -t tcp -p 1414 -m QM_NT

 

以上说明了怎样建立简单的单向传输网络。消息从NT端传送到UNIX端。建立从UNIX端到NT端的远程连接和以上相仿,要建立双向的传输网络也是同样的道理。

 

三.配置JNDI
用JMS实现消息的发送和接收时,经常会用到JNDI。因为JNDI这种方式比较灵活,对于编程也比较简单。
在安装了MQSeries Client for
Java之后,在/java/bin目录下找到JMSAdmin.config文件。该文件主要用来说明Context的存储方式及存储地址,对应于文件中的两个参数INITIAL_CONTEXT_FACTORY和PROVIDER_URL。典型的JMSAdmin.config文件内容如下:

 

#INITIAL_CONTEXT_FACTORY=com.sun.jndi.ldap.LdapCtxFactory
INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
#INITIAL_CONTEXT_FACTORY=com.ibm.ejs.ns.jndi.CNInitialContextFactory
#
#PROVIDER_URL=ldap://polaris/o=ibm,c=us
PROVIDER_URL=file:/d:/temp
#PROVIDER_URL=iiop://localhost/
#
SECURITY_AUTHENTICATION=none

 

INITIAL_CONTEXT_FACTORY表示JMSAdmin
Tool使用的服务提供商。当前有三种受支持的值。com.sun.jndi.ldap.LdapCtxFactory用于LDAP,如果使用它就必须安装一个LDAP服务器。com.sun.jndi.fscontext.RefFSContextFactory用于文件系统上下文,它只需要使用者提供存放上下文的文件路径。com.ibm.ejs.ns.jndi.CNInitialContextFactory是专门为websphere提供的,它需要和websphere的CosNaming资源库一起使用。
PROVIDER_URL表示会话初始上下文的URL,由JMSAdmin
tool实现的所有JNDI操作的根。它和INITIAL_CONTEXT_FACTORY一一对应。

 

ldap://hostname/contextname 用于LDAP
file:[drive:]/pathname 用于文件系统上下文
iiop://hostname[:port]/[?TargetContext=ctx] 用于访问websphere
CosNaming名称空间

 

最后还有一个参数SECURITY_AUTHENTICATION,用于说明JNDI是否把安全性凭证传递给了您使用的服务供应商。只有当使用了LDAP服务供应商时,才使用此参数。此参数有三个值,none(匿名认证)、simple(简单认证)和CRAM-MD5认证机制。如果没有提供有效值,缺省值为none。

 

确认配置文件之后,可以在/java/bin目录下启动JMSAdmin控制台。也可以在任何目录下用下面的命令来启动控制台:
JMSAdmin -cfg MQ_JAVA_INSTALL_PATH/java/bin/JMSAdmin.config
其中MQ_JAVA_INSTALL_PATH为MQSeries Client for Java安装的根目录。
若启动失败,则好好检查一下您的环境变量是否设置正确。根据我个人的经验,除了把com.ibm.mq.jar和com.ibm.mqjms.jar加入到环境变量外,还要把fscontext.jar和providerutil.jar加入到环境变量。
进入JMSAdmin控制台后,您可以自由定义sub context。对于子上下文的操作,主要有一下命令:
display ctx
define ctx(ctxname)
change ctx(ctxname)
change ctx(=up)
change ctx(=init)
delete ctx(ctxname)

 

当然,在这里的主要任务并非是用来定义sub context,而是用来定义以下几个对象:
MQQueueConnectionFactory
MQTopicConnectionFactory
MQQueue
MQTopic
(还有其它的一些对象,如MQXAQueueConnectionFactory等,不常用到,在此不作说明。)
可以使用很多动词来操纵目录名称空间中的受管理对象。ALTER、DEFINE、DISPLAY、DELETE、COPY和MOVE,它们的用法都算比较简单,这里只列举一二以作说明。
例一:定义一QueueConnectionFactory,连接主机10.10.10.18,端口1414
DEFINE QCF(EXAMPLEQCF)+
DESC(Example Queue Connection Factory)+
TRAN(CLIENT)+
HOST(10.10.10.18)+
QMGR(QM_EXAMPLE)+
CHAN(S_EXAMPLE)+
PORT(1414)+
CCSID(1381)

 

例二:定义一Queue,其对应于MQ中的Q_EXAMPLE
DEFINE Q(EXAMPLEQL)+
DESC(Local queue)+
QMGR(QM_EXAMPLE)+
QUEUE(Q_EXAMPLE)+
CCSID(1381)

 

四.用JMS实现MQ编程
上面我们说明了怎样用JMSAdmin
Tool定义MQ对象的上下文。我们的最终目的是要用JMS来实现MQ编程,以实现在程序中对MQ队列进行收、发消息。所以,下面我们将重点讨论一下MQ的JMS实现。
如果您对JMS编程很熟悉,那么您也就会用JMS来实现MQ编程,因为用JMS来编写MQ程序与编写一般的JMS程序没有太大的差别。举个例子,当我们想发送一条消息到MQ的队列中,再从该队列中取回消息时,我们编程时主要有四个步骤。首先我们要初始化在程序中要用到的对象,然后才可以发送消息到队列中去,再就是收取消息了,最后要清除那些永久对象。这些都和普通的JMS程序相当。程序的源代码如下:

 

import java.util.Hashtable;
import javax.jms.*;
import javax.naming.*;
import javax.naming.directory.*;
public class sample {
protected QueueConnectionFactory factory=null;
protected QueueConnection connection;
protected QueueSession queueSession;
protected TextMessage outMessage;
protected QueueSender queueSender;
protected QueueReceiver queueReceiver;
public static final String qcfLookup="EXAMPLEQCF";
public static final String qLookup="EXAMPLEQL";
public static final String icf = 
"com.sun.jndi.fscontext.RefFSContextFactory";
public String url ="file:/d:/temp";
public void sampleInit() throws Exception {
Hashtable environment 
= new Hashtable();
environment.put(Context.INITIAL_CONTEXT_FACTORY, icf);
environment.put(Context.PROVIDER_URL, url);
environment.put(Context.REFERRAL, 
"throw");
Context ctx
=new InitialDirContext(environment);
factory 
= (QueueConnectionFactory)ctx.lookup(qcfLookup);
Queue q1
=null;
q1
=(Queue)ctx.lookup(qLookup);
connection 
= factory.createQueueConnection();
queueSession 
= connection.createQueueSession(false
Session.AUTO_ACKNOWLEDGE);
queueSender 
= queueSession.createSender(q1);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
outMessage 
= queueSession.createTextMessage();
queueReceiver 
= queueSession.createReceiver(q1);
connection.start();
}

public void sendMessageOut(String message) throws JMSException {
outMessage.setText(message);
queueSender.send(outMessage);
}

public String receiveMessage() throws Exception{
return ((TextMessage)queueReceiver.receive()).getText();
}

public void sampleClose() throws JMSException {
queueSession.close();
connection.close();
}

public static void main(String[] args){
String rec;
sample sp 
= new sample();
try {
sp.sampleInit();
sp.sendMessageOut(
"Hello World!");
java.lang.Thread.sleep(
4000);
rec
=sp.receiveMessage();
System.out.println(
"Receive text is : "+rec);
sp.sampleClose();
}
catch(Exception e) {
e.printStackTrace();
}

}

}

 

五.远程管理
MQ在WINDOWS平台下具有图形化管理界面,但在UNIX平台下却只能通过命令行来进行操作。这样就给使用者带来很大的不便。我们都希望能通过图形界面来进行管理配置。为了实现我们的想法,我们就必须建立远程管理。
实现远程管理有以下几个步骤:
1.被管理队列管理器上的命令队列SYSTEM.ADMIN.COMMAND.QUEUE存在并可用。对于MQ 2版本应执行
amqscoma.tst 脚本来创建。
2.使用strmqcsv命令来启动被管理队列管理器上的命令服务器。
3.确定被管理队列管理器上的服务器连接通道SYSTEM.ADMIN.SVRCONN是否存在,如果不存在则创建它。
4.一般Unix、Linux平台中MQ默认的字符集为819,而Windows平台为1381,所以你必须改变其字符集,使两边的字符集相同。一般改被管理的字符集。
5.如果被管理队列管理器上的操作用户与管理队列管理器上的操作用户不同,那么你首先要确认管理队列管理器上的操作用户在被管理队列管理器上存在并且有管理MQ的权限,再者,你需要修改服务器连接通道SYSTEM.ADMIN.SVRCONN的MCAUSER属性为管理队列管理器上的操作用户。
6.启动被管理队列管理器上的侦听器。

 

做完这些工作之后,直接在管理队列管理器的MQ管理工具中显示被管理队列管理器即可。然后你就可以象操作本地队列管理器一样,在被管理队列管理器上定义你需要的MQ对象。

六.通道维护
在配置远程连接的时候,我们曾经创建过进程定义。那我们为什么要去创建进程定义呢?这就涉及MQ通道维护的概念。
通道长时间没有消息触发就会自动断开连接,不再保持运行状态。时间的长短可以由自己设定,默认值为6000秒。消息请求再次来临的时候,就必须再次启动通道。有些通道,如服务器连接通道、接收方通道等是自动触发启动的。当消息请求发送到通道后,通道立即启动,进入运行状态。但也有一些通道不会自动启动,最典型的就是发送方通道。当有消息请求需要使用通道进行消息传递的时候,发送方通道也不会自动启动并把消息发送到远程队列,而是把消息留在了与其相关联的传输队列中。
但是,在实际应用中我们又不可能每过一段时间去启动一次通道,或当有消息来再去启动通道。那应该怎么办?首先我们创建一个进程定义,这个进程定义的目的就是用来启动发送方通道。然后我们在传输队列的进程名称属性栏指定刚才定义的进程定义名称,再把触发器控制开关打开。这样,当有消息进入传输队列后,传输队列的触发器会启动触发执行指定的进程,从而启动发送方通道,把消息传输到远程队列中去。

分享到:
评论

相关推荐

    activemq持久化jdbc所需jar包.zip

    标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...

    linux环境下ActiveMQ持久化、集群环境搭建详解

    Linux 环境下 ActiveMQ 持久化、集群环境搭建详解 在 Linux 环境下搭建 ActiveMQ 持久化和集群环境是一种复杂的任务,需要对 Linux 操作系统、Java 环境、ActiveMQ 等方面有深入的了解。以下是搭建 ActiveMQ 持久化...

    ActiveMQ订阅模式持久化实现

    **ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...

    ActiveMQ持久化机制代码实例

    在本文中,我们将深入探讨ActiveMQ的持久化机制,并通过代码实例来展示其工作原理。 ActiveMQ的持久化机制是为了确保在系统崩溃或重启后,未被消费的消息仍然能够被恢复并继续处理。这主要涉及到两个关键概念:非...

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    activeMQ mysql 持久化

    标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...

    activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml.pdf

    标题中的“activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml”指的是Apache ActiveMQ的一个特定版本(5.15.15)配置文件,该配置文件用于实现消息队列的数据持久化,通过JDBC连接MySQL 8.0以上的版本。ActiveMQ...

    activemq消息持久化所需Jar包

    Apache ActiveMQ是业界广泛使用的开源消息中间件,它支持多种协议,如AMQP、STOMP、MQTT等,且提供了消息持久化功能,确保在系统故障后仍能恢复消息,保持数据完整性。本主题主要围绕“activemq消息持久化所需Jar包...

    ActiveMQ配置Mysql8为持久化方式所需Jar包.rar

    本主题主要探讨如何将ActiveMQ配置为使用MySQL 8作为其持久化存储方式,以及在这个过程中所需的Jar包。 1. **ActiveMQ与持久化**: - ActiveMQ允许用户选择不同的持久化机制,包括文件系统(KahaDB)和关系数据库...

    activeMQ使用JDBC所需要的jar包

    在ActiveMQ中,为了实现消息持久化,我们通常会利用JDBC(Java Database Connectivity)来存储消息数据。这确保了即使在服务器重启或故障后,消息依然能够被恢复,从而保持系统的高可用性和可靠性。本主题将详细讲解...

    ActiveMQ开发规范及方案

    持久化是指ActiveMQ对消息的持久化,即ActiveMQ将消息保存到存储设备中,以便在系统重启后可以恢复消息。ActiveMQ提供了多种持久化方式,例如jdbc、kahadb等。 介绍 持久化是 ActiveMQ 的一个重要特性,用于确保...

    ActiveMQ中Topic持久化Demo

    本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在ActiveMQ中实现Topic的持久化。 ActiveMQ 是一个功能强大的消息代理,支持多种协议,包括 OpenWire、STOMP、AMQP 和 MQTT。它提供了...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    activemq新手大全

    3. **activemq持久化机制**:activemq通过将消息写入磁盘来实现持久化,即使服务器重启,未消费的消息也能被重新加载。 总结起来,activemq作为强大的消息中间件,提供了一整套解决方案,帮助开发者构建可靠、高效...

    activemq-oracle.rar

    描述中提到的“ActiveMQ持久化所需jar包”意味着这些jar文件是用于确保ActiveMQ的消息持久化的关键组件。持久化是确保即使在系统崩溃或断电后,消息仍然能够被正确处理的重要特性。以下是每个jar文件的功能: 1. **...

    ActiveMQ的持久化(数据库)[归类].pdf

    4. **持久化级别**:ActiveMQ 允许用户选择不同的消息持久化级别,例如,可以选择仅持久化消息头,或者同时持久化消息头和正文。这可以根据性能和数据完整性需求进行调整。 5. **事务管理**:在 ActiveMQ 中,可以...

    activemq-store-jdbm-2.0.jar.zip

    ActiveMQ Store JDBM 2.0 是一个专门为 Apache ActiveMQ 开发的持久化存储解决方案,它基于 JDBM 库,用于在Java应用程序中实现高效的数据存储。在本文中,我们将深入探讨 ActiveMQ Store JDBM 2.0 的核心概念、功能...

    activemq-store-jdbm-1.2.jar.zip

    ActiveMQ Store JDBM 1.2 是一个专门为 Apache ActiveMQ 集成设计的持久化存储解决方案,它利用 JDBM(Java Database Minimal)库来提供高效、可靠的存储服务。在这个版本中,我们主要关注的是 activemq-store-jdbm-...

    activemq-store-journal-2.1.jar.zip

    这里的“Store”指的是ActiveMQ的数据存储,而“Journal”则指日志记录,它是ActiveMQ持久化消息和事务的重要组件。这个特定版本(2.1)的jar文件是为了解决特定版本的ActiveMQ系统中关于存储和日志的依赖需求。 ...

Global site tag (gtag.js) - Google Analytics