import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
public class CLIENT_MQ{
//定义队列管理器和队列的名称
private static final String qmName = "MQ_SERVICE";//MQ的队列管理器名称 ;
//private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ远程队列的名称
private static MQQueueManager qMgr;//队列管理器
public static void init(){
//设置环境:
//MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用,
//因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值.
MQEnvironment.hostname="10.172.12.156"; //MQ服务器的IP地址
MQEnvironment.channel="SERVICE_JAVA"; //通道类型:服务器连接
MQEnvironment.CCSID=1381;//437 //服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID)
MQEnvironment.port=1456; //MQ端口
try {
//定义并初始化队列管理器对象并连接
//MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。
qMgr = new MQQueueManager(qmName);
} catch (MQException e) {
// TODO Auto-generated catch block
System.out.println("初使化MQ出错");
e.printStackTrace();
}
}
/**
* 往MQ发送消息
* @param message
* @return
*/
public static Map<String,Object> sendMessage(Object message,String qName){
Map<String,Object> map=new HashMap<String,Object>();
try{
//设置将要连接的队列属性
// Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
//(except for completion code constants and error code constants).
//MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
//MQOO_OUTPUT:Open the queue to put messages.
/*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/
//int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
/*以下选项可适合远程队列与本地队列*/
//int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
//int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
//连接队列
//MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.
//The inquire and set capabilities are inherited from MQManagedObject.
/*关闭了就重新打开*/
if(qMgr==null || !qMgr.isConnected()){
qMgr = new MQQueueManager(qmName);
}
MQQueue queue = qMgr.accessQueue(qName, openOptions);
//定义一个简单的消息
MQMessage putMessage = new MQMessage();
map.put("messageId",putMessage);
//String uuid=java.util.UUID.randomUUID().toString();
//将数据放入消息缓冲区
putMessage.writeObject(message);
//设置写入消息的属性(默认属性)
MQPutMessageOptions pmo = new MQPutMessageOptions();
//将消息写入队列
queue.put(putMessage,pmo);
map.put("message",message.toString());
queue.close();
}catch (MQException ex) {
System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode);
ex.printStackTrace();
}catch (IOException ex) {
System.out.println("An error occurred whilst writing to the message buffer: " + ex);
}catch(Exception ex){
ex.printStackTrace();
}finally{
try {
qMgr.disconnect();
} catch (MQException e) {
e.printStackTrace();
}
}
return map;
}
/**
* 处理完消息回放到MQ队列
* @param message
* @return
*/
public static Map<String,Object> sendReplyMessage(Object message,String qName,MQMessage mqMessage){
Map<String,Object> map=new HashMap<String,Object>();
try{
//设置将要连接的队列属性
// Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
//(except for completion code constants and error code constants).
//MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
//MQOO_OUTPUT:Open the queue to put messages.
/*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/
//int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
/*以下选项可适合远程队列与本地队列*/
//int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
//int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
//连接队列
//MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.
//The inquire and set capabilities are inherited from MQManagedObject.
/*关闭了就重新打开*/
if(qMgr==null || !qMgr.isConnected()){
qMgr = new MQQueueManager(qmName);
}
MQQueue queue = qMgr.accessQueue(qName, openOptions);
//定义一个简单的消息
MQMessage putMessage = new MQMessage();
putMessage.messageId=mqMessage.messageId;
map.put("messageId",putMessage);
//String uuid=java.util.UUID.randomUUID().toString();
//将数据放入消息缓冲区
putMessage.writeObject(message);
//设置写入消息的属性(默认属性)
MQPutMessageOptions pmo = new MQPutMessageOptions();
//将消息写入队列
queue.put(putMessage,pmo);
map.put("message",message.toString());
queue.close();
}catch (MQException ex) {
System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode);
ex.printStackTrace();
}catch (IOException ex) {
System.out.println("An error occurred whilst writing to the message buffer: " + ex);
}catch(Exception ex){
ex.printStackTrace();
}finally{
try {
qMgr.disconnect();
} catch (MQException e) {
e.printStackTrace();
}
}
return map;
}
/**
* 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息
* 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。
* @return
*/
public static String getMessage(String qName,MQMessage mqMessage){
String message="";
try{
//设置将要连接的队列属性
// Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
//(except for completion code constants and error code constants).
//MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
//MQOO_OUTPUT:Open the queue to put messages.
//int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 发送时使用
//int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
MQMessage retrieve = new MQMessage();
//设置取出消息的属性(默认属性)
//Set the put message options.(设置放置消息选项)
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息)
gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages on the Queue(如果在队列上没有消息则等待)
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败)
gmo.waitInterval = 3000 ; // Sets the time limit for the wait.(设置等待的毫秒时间限制)
/*关闭了就重新打开*/
if(qMgr==null || !qMgr.isConnected()){
qMgr = new MQQueueManager(qmName);
}
MQQueue queue = qMgr.accessQueue(qName, openOptions);
MQMessage retrievedMessage = new MQMessage();
//从队列中取出对应messageId的消息
retrieve.messageId = mqMessage.messageId;
// 从队列中取出消息
queue.get(retrieve, gmo);
Object obj = retrieve.readObject();
message=obj.toString();//解决中文乱码问题
/*
//int size = rcvMessage.getMessageLength();
//byte[] p = new byte[size];
//rcvMessage.readFully(p);
int len=retrieve.getDataLength();
byte[] str = new byte[len];
retrieve.readFully(str,0,len);
message = new String(str);//readUTF();
*/
queue.close();
}catch (MQException ex) {
int reason=ex.reasonCode;
if(reason==2033)//no messages
{
message="nomessage";
}else{
System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode);
}
}catch (IOException ex) {
System.out.println("An error occurred whilst writing to the message buffer: " + ex);
}catch(Exception ex){
ex.printStackTrace();
}finally{
try {
qMgr.disconnect();
} catch (MQException e) {
e.printStackTrace();
}
}
return message;
}
public static void main(String args[]) {
init();
Map<String,Object> map = new HashMap<String,Object>();
map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE");
MQMessage mqMessage = (MQMessage)map.get("messageId");
outSys("传输消息:",mqMessage.messageId.toString());
outSys("接收传输队列:",getMessage("SERVICE_TRANSFER_QUEUE",mqMessage));
Map<String,Object> reply_map = new HashMap<String,Object>();
reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE",mqMessage);
outSys("放入正常队列:",reply_map.get("message").toString());
outSys("接收正常队列:",getMessage("SERVICE_RECEIVE_QUEUE",mqMessage));
}
public static void outSys(String display,String val){
System.out.println(display+val);
}
}
分享到:
相关推荐
IBM MQ 是一款强大的消息中间件,常用于政府和金融领域,提供可靠的数据传输和报文收发服务。本文档详细介绍了IBM MQ的安装、学习、实践以及遇到的问题和解决方案,同时还涉及了MQ通道的SSL加密传输和Java端的SSL...
- **AMQ8101 错误**:这是一个常见的错误代码,表示“发生 IBMMQ 错误 (80F)”。此错误通常发生在安装过程中,可能是由于缺少必要的系统组件或不兼容的环境导致的。 - **配置完成后需要重启**:在完成IBM MQ的安装和...
在IT行业中,IBM MQ(原名WebSphere MQ)是一种广泛使用的中间件,它提供了一种可靠的消息传递服务,使得应用程序可以相互通信,即使在不同的网络环境或操作系统之间也是如此。本篇文章将深入探讨如何使用Java API与...
在IT行业中,Java是一种广泛应用的编程语言,而IBM MQ(原名WebSphere MQ)是IBM提供的消息中间件,用于在不同系统之间可靠地传递数据。本文将深入探讨如何使用Java来连接IBM MQ,以实现跨平台的信息交换。 首先,...
【IBM MQ资料集合】是一个全面涵盖IBM MQ技术的资源包,包含了从基础到高级的各种学习材料,适合对IBM MQ感兴趣或者需要深入理解该技术的读者。这个集合中的文档包括了"精通WebSphere MQ"、"MQ JAVA编程"、"MQ6.0"等...
通过这两个示例,开发者可以学习到如何在IBM MQ环境中使用Java API进行基本的交互,这对于理解和构建基于IBM MQ的消息传递系统至关重要。同时,这也是理解IBM MQ工作原理,以及如何在实际项目中实现异步通信和错误...
IBM MQ是一种可靠的企业级消息中间件,它允许不同系统和应用程序之间通过消息传递进行通信。JMS是Java平台上的标准接口,用于与消息中间件交互,如IBM MQ。通过JMS,我们可以创建生产者(发送消息)和消费者(接收...
IBM MQ(原名 WebSphere MQ)是一款由IBM公司开发的企业级消息中间件软件,旨在帮助企业和组织构建安全、可靠的信息交换系统。IBM MQ的主要功能是确保消息能够在不同的应用程序之间安全、高效地传输,无论这些应用...
**IBM MQ Explorer** 是IBM提供的一款强大的消息中间件管理工具,专用于监控和管理IBM MQ(原名WebSphere MQ)系统。...在实际工作中,应结合IBM MQ的文档和最佳实践,不断学习和探索,以充分发挥其功能。
### IBM MQ学习笔记知识点解析 #### 一、IBM MQ简介与基本操作 IBM MQ(Message Queuing)是一种消息中间件,用于实现不同应用程序之间安全可靠的消息传递。它提供了跨平台的解决方案,支持多种操作系统和开发语言...
通过这份"IBM MQ编程模式中文版"指南,开发者将学习如何创建和配置Queue Manager,建立与MQ的连接,发送和接收消息,处理消息属性,以及正确地处理异常。此外,还会了解到一些最佳实践和常见编程模式,以便于在实际...
IBM MQ,全称为IBM Message Queue,是IBM提供的一款企业级的消息中间件产品,它允许应用程序在不同的网络环境和操作系统之间安全、...在实际操作中,结合官方文档和实践,不断探索和学习,将有助于你成为IBM MQ的专家。
IBM MQ(前称WebSphere MQ)是IBM提供的一款消息中间件,用于实现跨平台、网络环境下的分布式系统间可靠的消息传输。C#作为一种广泛使用的面向对象编程语言,与IBM MQ结合,为开发人员提供了强大的工具集来构建复杂...
总的来说,"C# IBM MQ.rar"可能包含示例代码、配置文件或教程,帮助开发者学习如何使用C#有效地与IBM MQ进行集成,从而构建可靠的分布式系统。掌握这些知识点对于在企业级环境中构建高效、稳定的应用程序至关重要。...
IBM MQ,全称为IBM WebSphere MQ,是一种企业级的消息中间件,它允许应用程序在不同的网络协议、操作系统和硬件之间安全、可靠地交换消息。在这个"ibm mq tools.rar"压缩包中,我们可以期待找到一系列IBM MQ的开发...
此IBM MQ培训资料专为7版本设计,旨在帮助学习者快速掌握这一强大工具的核心功能和技术。 首先,从文件列表中我们可以看到,资料涵盖了从基础到高级的不同主题: 1. "1.1MQ Technical Training Classes(Primary)-...
这个“IBM MQ Demo”是一个简单的示例,用于演示如何将消息放入IBM MQ队列中,这对于理解和学习如何与IBM MQ交互非常有帮助。 首先,我们来看一下提供的文件列表: 1. **Run.bat**:这是一个批处理文件,通常用于...
7. MQSeriesSamples-8.0.0-4.x86_64.rpm:包含了一系列示例代码和配置,帮助用户学习和理解IBM MQ的用法。 8. MQSeriesMan-8.0.0-4.x86_64.rpm:提供IBM MQ的man手册页,是命令行操作的重要参考资料。 接下来,...