`
tanglong8848
  • 浏览: 71152 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

IBM MQ 学习(一)

 
阅读更多
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
  
public class CLIENT_MQ{  
     //定义队列管理器和队列的名称  
     private static final String qmName = "MQ_SERVICE";//MQ的队列管理器名称 ;   
     //private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ远程队列的名称  
     private static MQQueueManager qMgr;//队列管理器
     public  static void init(){  
         //设置环境:  
         //MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用,  
         //因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值.  
         MQEnvironment.hostname="10.172.12.156";          //MQ服务器的IP地址        
         MQEnvironment.channel="SERVICE_JAVA";           //通道类型:服务器连接
         MQEnvironment.CCSID=1381;//437                    //服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID)  
         MQEnvironment.port=1456;                       //MQ端口  
         try {  
             //定义并初始化队列管理器对象并连接   
             //MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。  
            qMgr = new MQQueueManager(qmName);  
        } catch (MQException e) {  
            // TODO Auto-generated catch block  
            System.out.println("初使化MQ出错");  
            e.printStackTrace();  
        }   
     }  
     /** 
      * 往MQ发送消息 
      * @param message 
      * @return 
      */  
     public static Map<String,Object> sendMessage(Object message,String qName){    
    	 Map<String,Object> map=new HashMap<String,Object>();
         try{     
             //设置将要连接的队列属性  
             // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface   
             //(except for completion code constants and error code constants).  
             //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.  
             //MQOO_OUTPUT:Open the queue to put messages.  
             /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/  
             //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;  
             /*以下选项可适合远程队列与本地队列*/  
        	 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
             //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
             int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;  
             //连接队列   
             //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.   
             //The inquire and set capabilities are inherited from MQManagedObject.   
             /*关闭了就重新打开*/  
            if(qMgr==null || !qMgr.isConnected()){  
                qMgr = new MQQueueManager(qmName);  
            }  
             MQQueue queue = qMgr.accessQueue(qName, openOptions);            
             //定义一个简单的消息  
             MQMessage putMessage = new MQMessage();
             map.put("messageId",putMessage);
             //String uuid=java.util.UUID.randomUUID().toString();
             //将数据放入消息缓冲区  
             putMessage.writeObject(message);    
             //设置写入消息的属性(默认属性)  
             MQPutMessageOptions pmo = new MQPutMessageOptions(); 
            
             //将消息写入队列   
             queue.put(putMessage,pmo); 
             map.put("message",message.toString());
             queue.close();  
         }catch (MQException ex) {   
             System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);   
             ex.printStackTrace();  
         }catch (IOException ex) {   
             System.out.println("An error occurred whilst writing to the message buffer: " + ex);   
         }catch(Exception ex){  
             ex.printStackTrace();  
         }finally{  
             try {  
                qMgr.disconnect();  
            } catch (MQException e) {  
                e.printStackTrace();  
            }  
          }  
         return map;  
     }  
     
     
      

     
     /** 
      * 处理完消息回放到MQ队列 
      * @param message 
      * @return 
      */  
     public static Map<String,Object> sendReplyMessage(Object message,String qName,MQMessage mqMessage){    
    	 Map<String,Object> map=new HashMap<String,Object>();
         try{     
             //设置将要连接的队列属性  
             // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface   
             //(except for completion code constants and error code constants).  
             //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.  
             //MQOO_OUTPUT:Open the queue to put messages.  
             /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/  
             //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;  
             /*以下选项可适合远程队列与本地队列*/  
        	 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
             //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
             int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;  
             //连接队列   
             //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.   
             //The inquire and set capabilities are inherited from MQManagedObject.   
             /*关闭了就重新打开*/  
            if(qMgr==null || !qMgr.isConnected()){  
                qMgr = new MQQueueManager(qmName);  
            }  
             MQQueue queue = qMgr.accessQueue(qName, openOptions);            
             //定义一个简单的消息  
             MQMessage putMessage = new MQMessage();
             putMessage.messageId=mqMessage.messageId;
             map.put("messageId",putMessage);
             //String uuid=java.util.UUID.randomUUID().toString();
             //将数据放入消息缓冲区  
             putMessage.writeObject(message);    
             //设置写入消息的属性(默认属性)  
             MQPutMessageOptions pmo = new MQPutMessageOptions(); 
            
             //将消息写入队列   
             queue.put(putMessage,pmo); 
             map.put("message",message.toString());
             queue.close();  
         }catch (MQException ex) {   
             System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);   
             ex.printStackTrace();  
         }catch (IOException ex) {   
             System.out.println("An error occurred whilst writing to the message buffer: " + ex);   
         }catch(Exception ex){  
             ex.printStackTrace();  
         }finally{  
             try {  
                qMgr.disconnect();  
            } catch (MQException e) {  
                e.printStackTrace();  
            }  
          }  
         return map;  
     }  
     
     
     
     
     /** 
      * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息 
      * 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。 
      * @return 
      */  
     public static String getMessage(String qName,MQMessage mqMessage){  
         String message="";  
         try{              
             //设置将要连接的队列属性  
             // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface   
             //(except for completion code constants and error code constants).  
             //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.  
             //MQOO_OUTPUT:Open the queue to put messages.  
        	 //int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 发送时使用
             //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
        	 
             int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;  
             MQMessage retrieve = new MQMessage();  
             //设置取出消息的属性(默认属性)  
             //Set the put message options.(设置放置消息选项)   
             MQGetMessageOptions gmo = new MQGetMessageOptions();   
             
             gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息)   
             gmo.options = gmo.options + MQC.MQGMO_WAIT;  // Wait if no messages on the Queue(如果在队列上没有消息则等待)   
             gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败)   
             gmo.waitInterval = 3000 ;  // Sets the time limit for the wait.(设置等待的毫秒时间限制)   
             /*关闭了就重新打开*/  
            if(qMgr==null || !qMgr.isConnected()){  
                qMgr = new MQQueueManager(qmName);  
            }  
             MQQueue queue = qMgr.accessQueue(qName, openOptions);  
             
             MQMessage retrievedMessage = new MQMessage();
             //从队列中取出对应messageId的消息
             retrieve.messageId = mqMessage.messageId; 
             // 从队列中取出消息
             queue.get(retrieve, gmo);  
             
            
             Object obj = retrieve.readObject();
             message=obj.toString();//解决中文乱码问题
             /*
     
             //int size = rcvMessage.getMessageLength();
             //byte[] p = new byte[size];
             //rcvMessage.readFully(p);
             
             int len=retrieve.getDataLength();
             byte[] str = new byte[len];
              retrieve.readFully(str,0,len);
              message = new String(str);//readUTF();    
             */
           
             queue.close();  
         }catch (MQException ex) {
        	 int reason=ex.reasonCode;
        	 if(reason==2033)//no messages
             {
                message="nomessage";
             }else{
                System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode);   
             }
         }catch (IOException ex) {   
             System.out.println("An error occurred whilst writing to the message buffer: " + ex);   
         }catch(Exception ex){  
             ex.printStackTrace();  
         }finally{  
            try {  
                qMgr.disconnect();  
            } catch (MQException e) {  
                e.printStackTrace();  
            }  
         }  
         return message;  
     }  


     
     public static void main(String args[]) {  
    	 init();
    	 Map<String,Object> map = new HashMap<String,Object>();
    	 map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE");
    	 MQMessage mqMessage = (MQMessage)map.get("messageId");
    	 outSys("传输消息:",mqMessage.messageId.toString());  
    	 
    	 outSys("接收传输队列:",getMessage("SERVICE_TRANSFER_QUEUE",mqMessage));
    	 Map<String,Object> reply_map = new HashMap<String,Object>();
    	 reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE",mqMessage);
		 outSys("放入正常队列:",reply_map.get("message").toString());
		 
    	 outSys("接收正常队列:",getMessage("SERVICE_RECEIVE_QUEUE",mqMessage));
    	 
    	 
     }
     
     
     
     public static void outSys(String display,String val){
    	 System.out.println(display+val);
     }
     
} 

分享到:
评论

相关推荐

    IBM MQ学习开发详细文档

    IBM MQ 是一款强大的消息中间件,常用于政府和金融领域,提供可靠的数据传输和报文收发服务。本文档详细介绍了IBM MQ的安装、学习、实践以及遇到的问题和解决方案,同时还涉及了MQ通道的SSL加密传输和Java端的SSL...

    IBM MQ学习总结文档

    - **AMQ8101 错误**:这是一个常见的错误代码,表示“发生 IBMMQ 错误 (80F)”。此错误通常发生在安装过程中,可能是由于缺少必要的系统组件或不兼容的环境导致的。 - **配置完成后需要重启**:在完成IBM MQ的安装和...

    JAVA IBM MQ 接收、发送

    在IT行业中,IBM MQ(原名WebSphere MQ)是一种广泛使用的中间件,它提供了一种可靠的消息传递服务,使得应用程序可以相互通信,即使在不同的网络环境或操作系统之间也是如此。本篇文章将深入探讨如何使用Java API与...

    JAVA连接IBM MQ代码

    在IT行业中,Java是一种广泛应用的编程语言,而IBM MQ(原名WebSphere MQ)是IBM提供的消息中间件,用于在不同系统之间可靠地传递数据。本文将深入探讨如何使用Java来连接IBM MQ,以实现跨平台的信息交换。 首先,...

    IBM MQ资料集合

    【IBM MQ资料集合】是一个全面涵盖IBM MQ技术的资源包,包含了从基础到高级的各种学习材料,适合对IBM MQ感兴趣或者需要深入理解该技术的读者。这个集合中的文档包括了"精通WebSphere MQ"、"MQ JAVA编程"、"MQ6.0"等...

    IBM MQ JAVA DEMO

    通过这两个示例,开发者可以学习到如何在IBM MQ环境中使用Java API进行基本的交互,这对于理解和构建基于IBM MQ的消息传递系统至关重要。同时,这也是理解IBM MQ工作原理,以及如何在实际项目中实现异步通信和错误...

    JMS调用IBM MQ监听模式

    IBM MQ是一种可靠的企业级消息中间件,它允许不同系统和应用程序之间通过消息传递进行通信。JMS是Java平台上的标准接口,用于与消息中间件交互,如IBM MQ。通过JMS,我们可以创建生产者(发送消息)和消费者(接收...

    IBM MQ 入门及提高

    IBM MQ(原名 WebSphere MQ)是一款由IBM公司开发的企业级消息中间件软件,旨在帮助企业和组织构建安全、可靠的信息交换系统。IBM MQ的主要功能是确保消息能够在不同的应用程序之间安全、高效地传输,无论这些应用...

    IBM MQ Explorer

    **IBM MQ Explorer** 是IBM提供的一款强大的消息中间件管理工具,专用于监控和管理IBM MQ(原名WebSphere MQ)系统。...在实际工作中,应结合IBM MQ的文档和最佳实践,不断学习和探索,以充分发挥其功能。

    IBM MQ学习笔记试验脚本

    ### IBM MQ学习笔记知识点解析 #### 一、IBM MQ简介与基本操作 IBM MQ(Message Queuing)是一种消息中间件,用于实现不同应用程序之间安全可靠的消息传递。它提供了跨平台的解决方案,支持多种操作系统和开发语言...

    IBM MQ编程模式中文版

    通过这份"IBM MQ编程模式中文版"指南,开发者将学习如何创建和配置Queue Manager,建立与MQ的连接,发送和接收消息,处理消息属性,以及正确地处理异常。此外,还会了解到一些最佳实践和常见编程模式,以便于在实际...

    IBM MQ使用方法

    IBM MQ,全称为IBM Message Queue,是IBM提供的一款企业级的消息中间件产品,它允许应用程序在不同的网络环境和操作系统之间安全、...在实际操作中,结合官方文档和实践,不断探索和学习,将有助于你成为IBM MQ的专家。

    c# 操作ibm mq 手册

    IBM MQ(前称WebSphere MQ)是IBM提供的一款消息中间件,用于实现跨平台、网络环境下的分布式系统间可靠的消息传输。C#作为一种广泛使用的面向对象编程语言,与IBM MQ结合,为开发人员提供了强大的工具集来构建复杂...

    C#IBM MQ.rar

    总的来说,"C# IBM MQ.rar"可能包含示例代码、配置文件或教程,帮助开发者学习如何使用C#有效地与IBM MQ进行集成,从而构建可靠的分布式系统。掌握这些知识点对于在企业级环境中构建高效、稳定的应用程序至关重要。...

    ibm mq tools.rar

    IBM MQ,全称为IBM WebSphere MQ,是一种企业级的消息中间件,它允许应用程序在不同的网络协议、操作系统和硬件之间安全、可靠地交换消息。在这个"ibm mq tools.rar"压缩包中,我们可以期待找到一系列IBM MQ的开发...

    IBM MQ培训资料

    此IBM MQ培训资料专为7版本设计,旨在帮助学习者快速掌握这一强大工具的核心功能和技术。 首先,从文件列表中我们可以看到,资料涵盖了从基础到高级的不同主题: 1. "1.1MQ Technical Training Classes(Primary)-...

    IBM MQ Demo

    这个“IBM MQ Demo”是一个简单的示例,用于演示如何将消息放入IBM MQ队列中,这对于理解和学习如何与IBM MQ交互非常有帮助。 首先,我们来看一下提供的文件列表: 1. **Run.bat**:这是一个批处理文件,通常用于...

    Linux ibm MQ基本安装程序rpm

    7. MQSeriesSamples-8.0.0-4.x86_64.rpm:包含了一系列示例代码和配置,帮助用户学习和理解IBM MQ的用法。 8. MQSeriesMan-8.0.0-4.x86_64.rpm:提供IBM MQ的man手册页,是命令行操作的重要参考资料。 接下来,...

Global site tag (gtag.js) - Google Analytics