`

使用JMS实现MQ消息处理

阅读更多
MQ针对Java语言提供了两种接口来实现消息的发送和接收,这两种接口分别是:

Ø         Java

Ø         JMS

在北京地税财税库行横向联网项目中采用了MQ作为消息中间件,消息的发送和接收采用的是Java接口,在本文中将说明如何采用JMS实现MQ消息发送和接收功能。

2       JMS实现 MQ消息处理技术说明

如果使用JMS来访问MQ,则应当使用一个目录服务器来实现MQ连接工厂和队列资源的注册,以供客户端查找,客户端的在获取JMS资源的应选择LDAP。





3       环境准备
本次测试采用如下软件进行测试

Ø         MQ 6.0 for windows

Ø         IBM Tivoli Directory Server 6.0 (LDAP)

Ø         WebLogic 10.1

4       创建MQ对象
进入到MQ队列管理器创建中创建队列管理器和队列

1)  创建队列管理器QM_YFZX

2)  在队列管理器中创建队列QUEUE_TEST

5       LDAP配置
在TDS中创建一个LDAP分支dc=liutg用于MQ JMS注册使用,其访问的URL为:ldap://192.100.8.244:389/dc=liutg。

使用MQ提供的JMSAdmin工具实现连接工厂和队列的注册,步骤如下

1)      进入到%MQ%\Java\bin目录

2)      编辑JMSAdmin.config文件,按照下面内容进行修改

INITIAL_CONTEXT_FACTORY=com.sun.jndi.ldap.LdapCtxFactory

PROVIDER_URL=ldap://192.100.8.244:389/dc=liutg

SECURITY_AUTHENTICATION=simple

PROVIDER_USERDN=cn=root

PROVIDER_PASSWORD=admin123

3)      运行JMSAdmin命令





注册队列连接工厂

Def qcf(ConnectionFactory-mq) qmgr(QM_YFZX)





注册队列

Def q(Queue-mq)  qmgr(QM_YFZX) queue(QUEUE_TEST)









查看注册上下文:

Dis ctx





查看队列连接工厂

Dis qcf(cn= ConnectionFactory-mq)





查看队列

Dis q(cn=Queue-mq)






6       使用JMS发送消息
6.1    处理
1)获取上下文

    Hashtable<String,String> env = new Hashtable<String,String>();

    env.put(Context.INITIAL_CONTEXT_FACTORY, “com.sun.jndi.ldap.LdapCtxFactory”);

    env.put(Context.PROVIDER_URL, “ldap://192.100.8.244:389/dc=liutg”);

Context ctx = new InitialContext(env);

QueueConnectionFactory qconFactory =

(QueueConnectionFactory) ctx.lookup(“cn=ConnectionFactory-mq”);

       QueueConnection qcon = qconFactory.createQueueConnection();

    QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

    Queue queue = (Queue) ctx.lookup(queueName);

    QueueSender qsender = qsession.createSender(queue);


    TextMessage msg = qsession.createTextMessage();

qcon.start();

msg.setText(“hello …”);

       qsender.send(msg);

      

       qsender.close();

    qsession.close();

    qcon.close();













6.2    示例
public class LdapQueueSend

{

  // Defines the JNDI context factory. LDAP

  public final static String JNDI_FACTORY="com.sun.jndi.ldap.LdapCtxFactory";






  // Defines the JMS context factory.

  public final static String JMS_FACTORY="cn=ConnectionFactory-mq";






  // Defines the queue.

  public final static String QUEUE="cn=Queue-mq";






  private QueueConnectionFactory qconFactory;

  private QueueConnection qcon;

  private QueueSession qsession;

  private QueueSender qsender;

  private Queue queue;

  private TextMessage msg;





  /**

   * Creates all the necessary objects for sending

   * messages to a JMS queue.

   *

   * @param ctx JNDI initial context

   * @param queueName name of queue

   * @exception NamingException if operation cannot be performed

   * @exception JMSException if JMS fails to initialize due to internal error

   */

  public void init(Context ctx, String queueName)

    throws NamingException, JMSException

  {

    qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);

    qcon = qconFactory.createQueueConnection();

    qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

    queue = (Queue) ctx.lookup(queueName);

    qsender = qsession.createSender(queue);

    msg = qsession.createTextMessage();

    qcon.start();

  }





  /**

   * Sends a message to a JMS queue.

   *

   * @param message  message to be sent

   * @exception JMSException if JMS fails to send message due to internal error

   */

  public void send(String message) throws JMSException {

    msg.setText(message);

    msg.setJMSCorrelationID("Req");

    qsender.send(msg);

  }





  /**

   * Closes JMS objects.

   * @exception JMSException if JMS fails to close objects due to internal error

   */

  public void close() throws JMSException {

    qsender.close();

    qsession.close();

    qcon.close();

  }

/** main() method.

  *

  * @param args WebLogic Server URL

  * @exception Exception if operation fails

  */

  public static void main(String[] args) throws Exception {

       //weblogic 10  FILE

       InitialContext ic = getInitialContext("ldap://192.100.8.244:389/dc=liutg");


    LdapQueueSend qs = new LdapQueueSend();

    qs.init(ic, QUEUE);

    readAndSend(qs);

    qs.close();

  }





  private static void readAndSend(LdapQueueSend qs)

    throws IOException, JMSException

  {

    BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));

    String line=null;

    boolean quitNow = false;

    do {

      System.out.print("Enter message (\"quit\" to quit): \n");

      line = msgStream.readLine();

      if (line != null && line.trim().length() != 0) {

        qs.send(line);

        System.out.println("JMS Message Sent: "+line+"\n");

        quitNow = line.equalsIgnoreCase("quit");

      }

    } while (! quitNow);





  }





  private static InitialContext getInitialContext(String url)

    throws NamingException

  {

    Hashtable<String,String> env = new Hashtable<String,String>();

    env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);

    env.put(Context.PROVIDER_URL, url);

    return new InitialContext(env);

  }





}





7       使用JMS接收消息




package com.syax.study.jms;





import java.util.Hashtable;

import javax.jms.*;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

import javax.transaction.UserTransaction;





/**

* This example shows how to establish a connection to

* and receive messages from a JMS queue in a client-demarcated

* transaction.  The classes in this package operate on the same

* JMS queue. Run the classes together to witness messages being

* sent and received, and to browse the queue for messages.

*

* @author Copyright (c) 1999-2006 by BEA Systems, Inc. All Rights Reserved.

*/

public class LdapQueueReceiveInTx

{

// Defines the JNDI context factory.

         public final static String JNDI_FACTORY="com.sun.jndi.ldap.LdapCtxFactory";






  // Defines the JMS connection factory for the queue.

  public final static String JMS_FACTORY="cn=ConnectionFactory-mq";






  // Defines the queue.

  public final static String QUEUE="cn=Queue-mq";





  private QueueConnectionFactory qconFactory;

  private QueueConnection qcon;

  private QueueSession qsession;

  private QueueReceiver qreceiver;

  private Queue queue;

//  private UserTransaction utx;





/**

  * Receives message interface.

  */

  public void receiveMessages() throws Exception {

    Message msg = null;

    String msgText = "";





    try {

      // Set transaction timeout to 30 minutes.

//      utx.setTransactionTimeout(1800);

//      utx.begin();

      System.out.println("TRANSACTION BEGUN");

      do {

        msg = qreceiver.receive();

      

        if (msg != null) {

          if (msg instanceof TextMessage) {

            msgText = ((TextMessage)msg).getText();

          } else {

            msgText = msg.toString();

          }

          System.out.println("Message Received: "+ msgText );

          if (msgText.equalsIgnoreCase("rollback")) {

//              utx.rollback();

              System.out.println("TRANSACTION RollBacked");

            }

          if (msgText.equalsIgnoreCase("quit")) {

//            utx.commit();

            System.out.println("TRANSACTION COMMITTED");

          }

        }

      } while(msg != null && ! msgText.equalsIgnoreCase("quit"));

    } catch (JMSException jmse) {

      System.out.println("Error receiving JMS message: "+jmse);

      System.err.println("An exception occurred: "+jmse.getMessage());

      throw jmse;

    }

  }





  /**

   * Creates all the necessary objects for receiving

   * messages from a JMS queue.

   *

   * @param   ctx     JNDI initial context

   * @param queueName     name of queue

   * @exception NamingException operation cannot be performed

   * @exception JMSException if JMS fails to initialize due to internal error

   */

  public void init(Context ctx, String queueName)

    throws NamingException, JMSException

  {

    qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);

    qcon = qconFactory.createQueueConnection();

    qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

    //utx = (UserTransaction) ctx.lookup("javax.transaction.UserTransaction");

    queue = (Queue) ctx.lookup(queueName);

   

//    qreceiver = qsession.createReceiver(queue);

    qreceiver = qsession.createReceiver(queue,"JMSCorrelationID='Req'");

    qcon.start();

  }





  /**

   * Closes JMS objects.

   * @exception JMSException if JMS fails to close objects due to internal error

   */

  public void close() throws JMSException {

    qreceiver.close();

    qsession.close();

    qcon.close();

  }





/**

  * main() method.

  *

  * @param args  WebLogic Server URL

  * @exception  Exception if execution fails

  */

  public static void main(String[] args) throws Exception {





    InitialContext ic = getInitialContext("ldap://192.100.8.244:389/dc=liutg");


    LdapQueueReceiveInTx qr = new LdapQueueReceiveInTx();

    qr.init(ic, QUEUE);





    System.out.println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");





    qr.receiveMessages();

    qr.close();

  }





  private static InitialContext getInitialContext(String url)

    throws NamingException

  {

    Hashtable<String,String> env = new Hashtable<String,String>();

    env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);

    env.put(Context.PROVIDER_URL, url);

    return new InitialContext(env);

  }





}

















8       与WebLogic集成
WebLogic与MQ集成有两种方法,一种是通过JMS Bridge实现,这种方法是将消息从MQ队列中放入到WebLogic Server中的队列,由于MQ优于WebLogic server自带的队列,这种方法MQ不能有效发挥其作用并且额外增加了WebLogic Server的处理量,因此不做考虑;第二种方法为创建一个外部的JMS Server,具体方法如下,

1)  设置WebLogic ClassPath。将MQ的java类包拷贝到WebLogic Domain的Lib目录下,启动WebLogic,进入WebLogic控制台

2)  创建一个JMS Module

3)  在JMS Module中创建一个Foreign Server

JNDI Initial Context Factory:  com.sun.jndi.ldap.LdapCtxFactory

JNDI Connection URL    ldap://192.100.8.244:389/dc=liutg

4)  在新创建的Foreign Server中

创建一个ForeignDestination,

Local JNDI Name:Queue-mq


Remote JNDI Name:cn=Queue-mq


创建一个ConnectionFactory

Local JNDI Name:ConnectionFactory-mq


Remote JNDI Name:cn= ConnectionFactory-mq


       

       这样,WebLogic就可以使用本地定义的ConnectionFactory和Queue来访问MQ。发送消息采用JMS发送,接收消息可以采用MDB实现。




分享到:
评论

相关推荐

    编码实现MQ连接池实现JMS消息发送连接管理

    本篇文章将深入探讨如何利用Java JMS实现MQ连接池,以优化资源管理和提高系统性能。 首先,我们需要理解JMS的基本概念。JMS提供两种主要的消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe...

    jms远程IBM MQ 收发消息

    本教程将深入探讨如何使用JMS客户端模式来实现与IBM MQ的远程通信,包括同步和异步的消息收发处理。 首先,理解JMS的基本概念至关重要。JMS提供两种主要的消息模型:点对点(Point-to-Point,P2P)和发布/订阅...

    JMS调用IBM MQ监听模式

    标题“JMS调用IBM MQ监听模式”涉及的是Java消息服务(Java Message Service,简称JMS)与IBM ...通过以上知识点的学习和实践,开发者能够有效地利用JMS和IBM MQ进行消息传递,实现高可用、高性能的分布式系统通信。

    WebsphereMQ.rar_JMS MQ_MQ_MQ JMS_java mq jms_websphereMQ downloa

    在本场景中,"用jms向WebSphere mq里发送消息"是指使用JMS接口来与WebSphere MQ进行交互,实现消息的发布和订阅。 首先,我们需要理解JMS的核心概念。JMS提供两种消息模型:点对点(Point-to-Point)和发布/订阅...

    基于soap over jms 的websphere mq与axis2的实现

    总结起来,这个主题涵盖了如何在企业级环境中使用Web服务和消息中间件进行通信,通过SOAP over JMS实现可靠的数据传输,以及如何在Websphere MQ和Axis2之间建立桥梁,以满足高效、稳定的服务交互需求。学习和理解...

    mq、jms消息处理jar包

    在“mq、jms消息处理jar包”中,包含的jar文件通常是为了让Java应用能够与IBM MQ交互,使用JMS API。这些jar包可能包括以下部分: 1. `com.ibm.mq.allclient.jar`:这是IBM MQ客户端库的核心,包含了与MQ服务器通信...

    MQ、JMS以及ActiveMQ关系的理解

    总的来说,MQ、JMS以及ActiveMQ是现代分布式系统和企业级应用中不可或缺的组件,它们提供了灵活、可靠和高效的消息传递机制,使开发者能够在不同应用程序和服务之间实现有效的解耦合、异步处理和消息传递。...

    JMS获取与发送MQ信息 MQ命令

    4. **死信队列**:死信队列处理无法投递的消息,可以通过`DEFINE QLOCAL (QNAME) DEFPSIST (YES) REPLACE`定义本地死信队列,并使用`ALTER QMGR DEADQ (QNAME)`设定队列管理器的死信队列。 5. **远程队列和模型队列...

    IBMMQ消息队列源码

    - IBM MQ使用队列模型,其中生产者将消息放入队列,消费者从队列中取出消息进行处理。 2. **JMS(Java Message Service)**: - JMS是Java平台上的API,用于与消息中间件交互。在IBM MQ中,JMS接口提供了发送和...

    Spring Boot + RWD + JMS/MQ

    JMS(Java Message Service)和MQ(Message Queue)是消息中间件的两种形式,用于在分布式系统中异步处理通信。它们允许应用程序之间通过消息传递进行解耦,提高系统的可靠性和可扩展性。Spring Boot提供了对JMS的...

    spring下实现MQ

    本文将深入探讨如何使用Spring结合MQ6.0来实现JMS异步通信,并比较Spring与传统JMS实现的差异。 首先,Spring框架的优势在于其依赖注入(IOC)机制,它使得对象的创建和管理变得更加灵活。通过XML配置文件,我们...

    Spring-JMS把企业消息处理变容易.doc

    它通过提供一套抽象和模板类,使得开发者能够更加便捷地使用Java消息服务(JMS),并与各种JMS提供者,如IBM的WebSphere MQ进行集成。本文将深入探讨Spring JMS的核心功能,并通过一个点对点(P2P)的消息系统示例来...

    MQ发送接收消息完整版本下载

    在这个"MQClient"压缩包中,包含了实现MQ发送和接收消息功能的源代码。开发者可以通过这个工具学习如何在Java中使用JMS API创建客户端,连接到MQ服务器,发送消息,并接收来自队列或主题的消息。具体而言,以下几个...

    基于WebSphere MQ发送消息的简单JMS实例

    本文将深入探讨一个基于WebSphere MQ的简单Java Message Service (JMS) 实例,帮助你理解如何在实际项目中实现消息的发送。 首先,我们需要了解JMS。JMS是Java平台上的一个标准接口,它定义了用于在分布式环境中...

    [重要]基于Websphere MQ持久化消息实现异步转同步—方案二

    标题中的“基于Websphere MQ持久化消息实现异步转同步—方案二”是指在分布式系统中,通过使用Websphere MQ(WebSphere Message Broker,一种消息中间件)来处理异步通信,并通过消息的持久化特性,确保消息在异常...

    activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循Java消息服务...了解这些知识点后,你可以构建一个健壮的消息驱动系统,利用ActiveMQ和Spring JMS实现组件间的异步通信,提高系统的稳定性和可扩展性。

    Spring + JMS + MQ Connection

    Spring与JMS结合,可以方便地实现消息驱动的架构,而MQ作为JMS的实现,进一步加强了这种能力。本文将详细讲解如何在Spring中配置和使用JMS与MQ连接。 首先,要配置Spring与JMS的连接,我们需要在Spring的配置文件中...

Global site tag (gtag.js) - Google Analytics