- 浏览: 39652 次
- 性别:
- 来自: 上海
文章分类
最新评论
Websphere MQ 不多介绍,IBM的消息中间件。
新手初学MQ,高手勿笑。献丑了。
MQ 文件的传输
对于MQ传输文件,我采取的思路是:
A.先定义一个序列化类(赞命名为fileBean),类的属性有fileName和fileContent 两个。
B.用输入流配合 BASE64Encoder 将文件格式化为 基于BASE64Encoder 的String编码 作为文件的内容。
C.将文件名和文件内容set到fileBean的fileContent属性中。
D.调用MQ将这个Object写到远程队列中去。
E.接收方接受到消息时用readObject()方法读出,强转成fileBean
F.从fileBean中取出文件名和文件内容,将文件内容用BASE64Decoder解码
G.用文件输出流将文件写到指定的位置,到此大功告成。
-------------------------废话少说,看代码-------------------------------
文件序列化类
文件发送和接受类
至此,文件发送接收完成。
新手初学MQ,高手勿笑。献丑了。
MQ 文件的传输
对于MQ传输文件,我采取的思路是:
A.先定义一个序列化类(赞命名为fileBean),类的属性有fileName和fileContent 两个。
B.用输入流配合 BASE64Encoder 将文件格式化为 基于BASE64Encoder 的String编码 作为文件的内容。
C.将文件名和文件内容set到fileBean的fileContent属性中。
D.调用MQ将这个Object写到远程队列中去。
E.接收方接受到消息时用readObject()方法读出,强转成fileBean
F.从fileBean中取出文件名和文件内容,将文件内容用BASE64Decoder解码
G.用文件输出流将文件写到指定的位置,到此大功告成。
-------------------------废话少说,看代码-------------------------------
文件序列化类
package com.test.mq; import java.io.Serializable; /** * * <p> * Title: FileBean.java * </p> * <p> * Description: * </p> * <p> * Copyright: Copyright (c) 2009 * </p> * <p> * Company: shunde * </p> * * @author: listening * @create date Nov 8, 2009 */ public class FileBean implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String fileName = "";// 文件名 private String fileContent = "";// 文件内容(BASE64Encoder编码之后的) public String getFileName() { return fileName; } public void setFileName(String fileName) { this.fileName = fileName; } public String getFileContent() { return fileContent; } public void setFileContent(String fileContent) { this.fileContent = fileContent; } }
文件发送和接受类
package com.test.mq; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.InputStream; import java.text.SimpleDateFormat; import java.util.Date; import sun.misc.BASE64Decoder; import sun.misc.BASE64Encoder; import com.ibm.mq.MQC; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; /** * * <p> * Title: MQSendAndReceiveUtil.java * </p> * <p> * Description: * </p> * <p> * Copyright: Copyright (c) 2009 * </p> * <p> * Company: shunde * </p> * * @author: listening * @create date Nov 8, 2009 */ public class MQSendAndReceiveUtil { private MQQueueManager qManager; private MQQueue queue; private static String qmManager = "QM_00000000";// 队列管理器名称 private static String remoteQName = "RQ_88888888";// 远程队列名称 private static String localQName = "LQ_00000000";// 本地队列 private static String hostname = "192.168.1.66";// 本机名称 private static String channel = "DC.SVRCONN";// 服务器链接通道 private static int ccsid = 1381; private static int port = 1414; @SuppressWarnings("unchecked") private MQSendAndReceiveUtil() { MQEnvironment.hostname = hostname; MQEnvironment.channel = channel; MQEnvironment.CCSID = ccsid; MQEnvironment.port = port; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); try { qManager = new MQQueueManager(qmManager); } catch (MQException e) { e.printStackTrace(); } } /** * * Description:如果队列管理器为空,建立 * * @param: * @return: void * @exception Exception. * @author listening created at Nov 8, 2009 */ private void createConnection() { if (qManager == null) { new MQSendAndReceiveUtil(); } } /** * * Description:发送文件 * * @param:String fileName -文件名 * @return: void * @exception Exception. * @author listening created at Nov 8, 2009 */ public void sendFileMessage(String fileName) { this.createConnection(); try { int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;// 建立打开方式 queue = qManager.accessQueue(remoteQName, openOptions, null, null, null);// 连接队列(发送时此队列为发送方的远程队列) MQPutMessageOptions pmo = new MQPutMessageOptions();// 创建消息放入方式实例 MQMessage message = new MQMessage();// 创建MQ消息实例 FileBean file = new FileBean();// 创建FileBean对象实例并赋值 file.setFileName(fileName); InputStream in = new FileInputStream("D:\\" + fileName); // 输入流读取要发送的文件 BASE64Encoder encoder = new BASE64Encoder();// 创建BASE64Encoder编码实例 byte[] data = new byte[in.available()]; in.read(data); String content = encoder.encode(data);// 编码文件 得到String file.setFileContent(content); message.writeObject(file);// 将FileBean实例 file放入消息中发送 queue.put(message, pmo); qManager.commit(); this.logInfo("文件发送成功"); } catch (Exception e) { e.printStackTrace(); } finally { this.closeAction(); } } public void receiveFileMessage() { try { int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;// 建立队列打开方式 queue = qManager.accessQueue(localQName, openOptions, null, null, null);// 连接队列(接收时队列名为接收方的本地队列) MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;// 同步接收 gmo.options = gmo.options + MQC.MQGMO_WAIT;// 没消息等待 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// 停顿则失败 gmo.waitInterval = 100;// 等待间隔 MQMessage inMsg = new MQMessage();// 创建消息实例 queue.get(inMsg, gmo);// 从队列中拿出消息 FileBean fileBean = new FileBean(); fileBean = (FileBean) inMsg.readObject(); // 读取消息强转为FileBean类型 String content = fileBean.getFileContent();// 取文件内容 BASE64Decoder decoder = new BASE64Decoder();// 建立解码类实例 byte[] contentArray = decoder.decodeBuffer(content);// 解码生成byte数组 String path = "E:\\" + fileBean.getFileName(); FileOutputStream out = new FileOutputStream(new File(path));// 调动输出流把文件写到指定的位置 out.write(contentArray, 0, contentArray.length); // System.out.print(fileBean.getFileName()); qManager.commit();// 提交事务 this.logInfo("文件接收成功,请注意查收");// 打印日志 } catch (Exception e) { e.printStackTrace(); } } /** * * Description: 释放资源 * * @param: * @return: * @exception Exception. * @author listening created at Nov 8, 2009 */ public void closeAction() { try { if (queue != null) { queue.close(); queue = null; } else if (qManager != null) { qManager.close(); qManager = null; } } catch (Exception e) { e.printStackTrace(); } } /** * * Description:打印成功日志信息 * * @param:String message-日志内容 * @return: void * @exception Exception. * @author listening created at Nov 8, 2009 */ public void logInfo(String message) { SimpleDateFormat format = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss"); System.out.println(format.format(new Date()) + "-------" + message + "+-------------"); } /** * * Description:main函数测试 * * @param: * @return: void * @exception Exception. * @author listening created at Nov 8, 2009 */ public static void main(String[] args) { new MQSendAndReceiveUtil().sendFileMessage("test.xml"); //new MQSendAndReceiveUtil().receiveFileMessage(); } }
至此,文件发送接收完成。
评论
3 楼
mrcai314
2014-02-21
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
*
* <p>
* Title: MQSendAndReceiveUtil.java
* </p>
* <p>
* Description:
* </p>
* <p>
* Copyright: Copyright (c) 2009
* </p>
* <p>
* Company: shunde
* </p>
*
* @author: listening
* @create date Nov 8, 2009
*/
public class MQSendAndReceiveUtil {
private MQQueueManager qManager;
private MQQueue queue;
private static String qmManager = "QM_AAA";// 队列管理器名称
private static String remoteQName = "LQ_AAA";// 远程队列名称RQ_BBB
private static String localQName = "LQ_AAA";// 本地队列
private static String hostname = "169.254.12.138";// 本机名称
private static String channel = "DC.SVRCONN";// 服务器链接通道
private static int ccsid = 1381;
private static int port = 1414;
private static Hashtable properties = new Hashtable();
@SuppressWarnings("unchecked")
private MQSendAndReceiveUtil() {
/*MQEnvironment.hostname = hostname;
MQEnvironment.channel = channel;
MQEnvironment.CCSID = ccsid;
MQEnvironment.port = port;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,MQC.TRANSPORT_MQSERIES);*/
properties.put("hostname", "169.254.12.138");
properties.put("port", new Integer(1414));
properties.put("channel", "DC.SVRCONN");
properties.put("CCSID", new Integer(1381));
properties.put("transport", "MQSeries");
properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
try {
//qManager = new MQQueueManager(qmManager);
qManager = new MQQueueManager(qmManager,properties);
} catch (MQException e) {
e.printStackTrace();
}
}
/**
*
* Description:如果队列管理器为空,建立
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
private void createConnection() {
if (qManager == null) {
new MQSendAndReceiveUtil();
}
}
/**
*
* Description:发送文件
*
* @param:String fileName -文件名
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void sendFileMessage(String fileName) {
this.createConnection();
try {
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;// 建立打开方式
//int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
queue = qManager.accessQueue(remoteQName, openOptions, null, null,null);// 连接队列(发送时此队列为发送方的远程队列)
MQPutMessageOptions pmo = new MQPutMessageOptions();// 创建消息放入方式实例
MQMessage message = new MQMessage();// 创建MQ消息实例
FileBean file = new FileBean();// 创建FileBean对象实例并赋值
file.setFileName(fileName);
InputStream in = new FileInputStream("C:\\" + fileName); // 输入流读取要发送的文件
BASE64Encoder encoder = new BASE64Encoder();// 创建BASE64Encoder编码实例
byte[] data = new byte[in.available()];
in.read(data);
String content = encoder.encode(data);// 编码文件 得到String
file.setFileContent(content);
message.writeObject(file);// 将FileBean实例 file放入消息中发送
queue.put(message, pmo);
qManager.commit();
this.logInfo("文件发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
this.closeAction();
}
}
public void receiveFileMessage() {
try {
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;// 建立队列打开方式
queue = qManager.accessQueue(localQName, openOptions, null, null,
null);// 连接队列(接收时队列名为接收方的本地队列)
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;// 同步接收
gmo.options = gmo.options + MQC.MQGMO_WAIT;// 没消息等待
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// 停顿则失败
gmo.waitInterval = 100;// 等待间隔
MQMessage inMsg = new MQMessage();// 创建消息实例
queue.get(inMsg, gmo);// 从队列中拿出消息
FileBean fileBean = new FileBean();
fileBean = (FileBean)inMsg.readObject(); // 读取消息强转为FileBean类型
String content = fileBean.getFileContent();// 取文件内容
System.out.println(content);
BASE64Decoder decoder = new BASE64Decoder();// 建立解码类实例
byte[] contentArray = decoder.decodeBuffer(content);// 解码生成byte数组
String path = "C:\\" + fileBean.getFileName();
OutputStream out = new FileOutputStream(new File(path));// 调动输出流把文件写到指定的位置
out.write(contentArray, 0, contentArray.length);
System.out.print(fileBean.getFileName());
qManager.commit();// 提交事务
this.logInfo("文件接收成功,请注意查收");// 打印日志
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description: 释放资源
*
* @param:
* @return:
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void closeAction() {
try {
if (queue != null) {
queue.close();
queue = null;
} else if (qManager != null) {
qManager.close();
qManager = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description:打印成功日志信息
*
* @param:String message-日志内容
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void logInfo(String message) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
System.out.println(format.format(new Date()) + "-------" + message
+ "+-------------");
}
/**
*
* Description:main函数测试
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public static void main(String[] args) {
//MQSendAndReceiveUtil mqsru = new MQSendAndReceiveUtil();
new MQSendAndReceiveUtil().sendFileMessage("spring-db.xml");
new MQSendAndReceiveUtil().receiveFileMessage();
}
}
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
*
* <p>
* Title: MQSendAndReceiveUtil.java
* </p>
* <p>
* Description:
* </p>
* <p>
* Copyright: Copyright (c) 2009
* </p>
* <p>
* Company: shunde
* </p>
*
* @author: listening
* @create date Nov 8, 2009
*/
public class MQSendAndReceiveUtil {
private MQQueueManager qManager;
private MQQueue queue;
private static String qmManager = "QM_AAA";// 队列管理器名称
private static String remoteQName = "LQ_AAA";// 远程队列名称RQ_BBB
private static String localQName = "LQ_AAA";// 本地队列
private static String hostname = "169.254.12.138";// 本机名称
private static String channel = "DC.SVRCONN";// 服务器链接通道
private static int ccsid = 1381;
private static int port = 1414;
private static Hashtable properties = new Hashtable();
@SuppressWarnings("unchecked")
private MQSendAndReceiveUtil() {
/*MQEnvironment.hostname = hostname;
MQEnvironment.channel = channel;
MQEnvironment.CCSID = ccsid;
MQEnvironment.port = port;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,MQC.TRANSPORT_MQSERIES);*/
properties.put("hostname", "169.254.12.138");
properties.put("port", new Integer(1414));
properties.put("channel", "DC.SVRCONN");
properties.put("CCSID", new Integer(1381));
properties.put("transport", "MQSeries");
properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
try {
//qManager = new MQQueueManager(qmManager);
qManager = new MQQueueManager(qmManager,properties);
} catch (MQException e) {
e.printStackTrace();
}
}
/**
*
* Description:如果队列管理器为空,建立
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
private void createConnection() {
if (qManager == null) {
new MQSendAndReceiveUtil();
}
}
/**
*
* Description:发送文件
*
* @param:String fileName -文件名
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void sendFileMessage(String fileName) {
this.createConnection();
try {
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;// 建立打开方式
//int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
queue = qManager.accessQueue(remoteQName, openOptions, null, null,null);// 连接队列(发送时此队列为发送方的远程队列)
MQPutMessageOptions pmo = new MQPutMessageOptions();// 创建消息放入方式实例
MQMessage message = new MQMessage();// 创建MQ消息实例
FileBean file = new FileBean();// 创建FileBean对象实例并赋值
file.setFileName(fileName);
InputStream in = new FileInputStream("C:\\" + fileName); // 输入流读取要发送的文件
BASE64Encoder encoder = new BASE64Encoder();// 创建BASE64Encoder编码实例
byte[] data = new byte[in.available()];
in.read(data);
String content = encoder.encode(data);// 编码文件 得到String
file.setFileContent(content);
message.writeObject(file);// 将FileBean实例 file放入消息中发送
queue.put(message, pmo);
qManager.commit();
this.logInfo("文件发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
this.closeAction();
}
}
public void receiveFileMessage() {
try {
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;// 建立队列打开方式
queue = qManager.accessQueue(localQName, openOptions, null, null,
null);// 连接队列(接收时队列名为接收方的本地队列)
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;// 同步接收
gmo.options = gmo.options + MQC.MQGMO_WAIT;// 没消息等待
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// 停顿则失败
gmo.waitInterval = 100;// 等待间隔
MQMessage inMsg = new MQMessage();// 创建消息实例
queue.get(inMsg, gmo);// 从队列中拿出消息
FileBean fileBean = new FileBean();
fileBean = (FileBean)inMsg.readObject(); // 读取消息强转为FileBean类型
String content = fileBean.getFileContent();// 取文件内容
System.out.println(content);
BASE64Decoder decoder = new BASE64Decoder();// 建立解码类实例
byte[] contentArray = decoder.decodeBuffer(content);// 解码生成byte数组
String path = "C:\\" + fileBean.getFileName();
OutputStream out = new FileOutputStream(new File(path));// 调动输出流把文件写到指定的位置
out.write(contentArray, 0, contentArray.length);
System.out.print(fileBean.getFileName());
qManager.commit();// 提交事务
this.logInfo("文件接收成功,请注意查收");// 打印日志
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description: 释放资源
*
* @param:
* @return:
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void closeAction() {
try {
if (queue != null) {
queue.close();
queue = null;
} else if (qManager != null) {
qManager.close();
qManager = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description:打印成功日志信息
*
* @param:String message-日志内容
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void logInfo(String message) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
System.out.println(format.format(new Date()) + "-------" + message
+ "+-------------");
}
/**
*
* Description:main函数测试
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public static void main(String[] args) {
//MQSendAndReceiveUtil mqsru = new MQSendAndReceiveUtil();
new MQSendAndReceiveUtil().sendFileMessage("spring-db.xml");
new MQSendAndReceiveUtil().receiveFileMessage();
}
}
2 楼
mrcai314
2014-02-21
我也出上import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
*
* <p>
* Title: MQSendAndReceiveUtil.java
* </p>
* <p>
* Description:
* </p>
* <p>
* Copyright: Copyright (c) 2009
* </p>
* <p>
* Company: shunde
* </p>
*
* @author: listening
* @create date Nov 8, 2009
*/
public class MQSendAndReceiveUtil {
private MQQueueManager qManager;
private MQQueue queue;
private static String qmManager = "QM_AAA";// 队列管理器名称
private static String remoteQName = "LQ_AAA";// 远程队列名称RQ_BBB
private static String localQName = "LQ_AAA";// 本地队列
private static String hostname = "169.254.12.138";// 本机名称
private static String channel = "DC.SVRCONN";// 服务器链接通道
private static int ccsid = 1381;
private static int port = 1414;
private static Hashtable properties = new Hashtable();
@SuppressWarnings("unchecked")
private MQSendAndReceiveUtil() {
/*MQEnvironment.hostname = hostname;
MQEnvironment.channel = channel;
MQEnvironment.CCSID = ccsid;
MQEnvironment.port = port;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,MQC.TRANSPORT_MQSERIES);*/
properties.put("hostname", "169.254.12.138");
properties.put("port", new Integer(1414));
properties.put("channel", "DC.SVRCONN");
properties.put("CCSID", new Integer(1381));
properties.put("transport", "MQSeries");
properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
try {
//qManager = new MQQueueManager(qmManager);
qManager = new MQQueueManager(qmManager,properties);
} catch (MQException e) {
e.printStackTrace();
}
}
/**
*
* Description:如果队列管理器为空,建立
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
private void createConnection() {
if (qManager == null) {
new MQSendAndReceiveUtil();
}
}
/**
*
* Description:发送文件
*
* @param:String fileName -文件名
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void sendFileMessage(String fileName) {
this.createConnection();
try {
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;// 建立打开方式
//int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
queue = qManager.accessQueue(remoteQName, openOptions, null, null,null);// 连接队列(发送时此队列为发送方的远程队列)
MQPutMessageOptions pmo = new MQPutMessageOptions();// 创建消息放入方式实例
MQMessage message = new MQMessage();// 创建MQ消息实例
FileBean file = new FileBean();// 创建FileBean对象实例并赋值
file.setFileName(fileName);
InputStream in = new FileInputStream("C:\\" + fileName); // 输入流读取要发送的文件
BASE64Encoder encoder = new BASE64Encoder();// 创建BASE64Encoder编码实例
byte[] data = new byte[in.available()];
in.read(data);
String content = encoder.encode(data);// 编码文件 得到String
file.setFileContent(content);
message.writeObject(file);// 将FileBean实例 file放入消息中发送
queue.put(message, pmo);
qManager.commit();
this.logInfo("文件发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
this.closeAction();
}
}
public void receiveFileMessage() {
try {
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;// 建立队列打开方式
queue = qManager.accessQueue(localQName, openOptions, null, null,
null);// 连接队列(接收时队列名为接收方的本地队列)
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;// 同步接收
gmo.options = gmo.options + MQC.MQGMO_WAIT;// 没消息等待
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// 停顿则失败
gmo.waitInterval = 100;// 等待间隔
MQMessage inMsg = new MQMessage();// 创建消息实例
queue.get(inMsg, gmo);// 从队列中拿出消息
FileBean fileBean = new FileBean();
fileBean = (FileBean)inMsg.readObject(); // 读取消息强转为FileBean类型
String content = fileBean.getFileContent();// 取文件内容
System.out.println(content);
BASE64Decoder decoder = new BASE64Decoder();// 建立解码类实例
byte[] contentArray = decoder.decodeBuffer(content);// 解码生成byte数组
String path = "C:\\" + fileBean.getFileName();
OutputStream out = new FileOutputStream(new File(path));// 调动输出流把文件写到指定的位置
out.write(contentArray, 0, contentArray.length);
System.out.print(fileBean.getFileName());
qManager.commit();// 提交事务
this.logInfo("文件接收成功,请注意查收");// 打印日志
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description: 释放资源
*
* @param:
* @return:
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void closeAction() {
try {
if (queue != null) {
queue.close();
queue = null;
} else if (qManager != null) {
qManager.close();
qManager = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description:打印成功日志信息
*
* @param:String message-日志内容
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void logInfo(String message) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
System.out.println(format.format(new Date()) + "-------" + message
+ "+-------------");
}
/**
*
* Description:main函数测试
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public static void main(String[] args) {
//MQSendAndReceiveUtil mqsru = new MQSendAndReceiveUtil();
new MQSendAndReceiveUtil().sendFileMessage("spring-db.xml");
new MQSendAndReceiveUtil().receiveFileMessage();
}
}
述异常,经修改验证通过后代码如下:
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
*
* <p>
* Title: MQSendAndReceiveUtil.java
* </p>
* <p>
* Description:
* </p>
* <p>
* Copyright: Copyright (c) 2009
* </p>
* <p>
* Company: shunde
* </p>
*
* @author: listening
* @create date Nov 8, 2009
*/
public class MQSendAndReceiveUtil {
private MQQueueManager qManager;
private MQQueue queue;
private static String qmManager = "QM_AAA";// 队列管理器名称
private static String remoteQName = "LQ_AAA";// 远程队列名称RQ_BBB
private static String localQName = "LQ_AAA";// 本地队列
private static String hostname = "169.254.12.138";// 本机名称
private static String channel = "DC.SVRCONN";// 服务器链接通道
private static int ccsid = 1381;
private static int port = 1414;
private static Hashtable properties = new Hashtable();
@SuppressWarnings("unchecked")
private MQSendAndReceiveUtil() {
/*MQEnvironment.hostname = hostname;
MQEnvironment.channel = channel;
MQEnvironment.CCSID = ccsid;
MQEnvironment.port = port;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,MQC.TRANSPORT_MQSERIES);*/
properties.put("hostname", "169.254.12.138");
properties.put("port", new Integer(1414));
properties.put("channel", "DC.SVRCONN");
properties.put("CCSID", new Integer(1381));
properties.put("transport", "MQSeries");
properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
try {
//qManager = new MQQueueManager(qmManager);
qManager = new MQQueueManager(qmManager,properties);
} catch (MQException e) {
e.printStackTrace();
}
}
/**
*
* Description:如果队列管理器为空,建立
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
private void createConnection() {
if (qManager == null) {
new MQSendAndReceiveUtil();
}
}
/**
*
* Description:发送文件
*
* @param:String fileName -文件名
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void sendFileMessage(String fileName) {
this.createConnection();
try {
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;// 建立打开方式
//int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
queue = qManager.accessQueue(remoteQName, openOptions, null, null,null);// 连接队列(发送时此队列为发送方的远程队列)
MQPutMessageOptions pmo = new MQPutMessageOptions();// 创建消息放入方式实例
MQMessage message = new MQMessage();// 创建MQ消息实例
FileBean file = new FileBean();// 创建FileBean对象实例并赋值
file.setFileName(fileName);
InputStream in = new FileInputStream("C:\\" + fileName); // 输入流读取要发送的文件
BASE64Encoder encoder = new BASE64Encoder();// 创建BASE64Encoder编码实例
byte[] data = new byte[in.available()];
in.read(data);
String content = encoder.encode(data);// 编码文件 得到String
file.setFileContent(content);
message.writeObject(file);// 将FileBean实例 file放入消息中发送
queue.put(message, pmo);
qManager.commit();
this.logInfo("文件发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
this.closeAction();
}
}
public void receiveFileMessage() {
try {
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;// 建立队列打开方式
queue = qManager.accessQueue(localQName, openOptions, null, null,
null);// 连接队列(接收时队列名为接收方的本地队列)
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;// 同步接收
gmo.options = gmo.options + MQC.MQGMO_WAIT;// 没消息等待
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// 停顿则失败
gmo.waitInterval = 100;// 等待间隔
MQMessage inMsg = new MQMessage();// 创建消息实例
queue.get(inMsg, gmo);// 从队列中拿出消息
FileBean fileBean = new FileBean();
fileBean = (FileBean)inMsg.readObject(); // 读取消息强转为FileBean类型
String content = fileBean.getFileContent();// 取文件内容
System.out.println(content);
BASE64Decoder decoder = new BASE64Decoder();// 建立解码类实例
byte[] contentArray = decoder.decodeBuffer(content);// 解码生成byte数组
String path = "C:\\" + fileBean.getFileName();
OutputStream out = new FileOutputStream(new File(path));// 调动输出流把文件写到指定的位置
out.write(contentArray, 0, contentArray.length);
System.out.print(fileBean.getFileName());
qManager.commit();// 提交事务
this.logInfo("文件接收成功,请注意查收");// 打印日志
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description: 释放资源
*
* @param:
* @return:
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void closeAction() {
try {
if (queue != null) {
queue.close();
queue = null;
} else if (qManager != null) {
qManager.close();
qManager = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* Description:打印成功日志信息
*
* @param:String message-日志内容
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public void logInfo(String message) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
System.out.println(format.format(new Date()) + "-------" + message
+ "+-------------");
}
/**
*
* Description:main函数测试
*
* @param:
* @return: void
* @exception Exception.
* @author listening created at Nov 8, 2009
*/
public static void main(String[] args) {
//MQSendAndReceiveUtil mqsru = new MQSendAndReceiveUtil();
new MQSendAndReceiveUtil().sendFileMessage("spring-db.xml");
new MQSendAndReceiveUtil().receiveFileMessage();
}
}
述异常,经修改验证通过后代码如下:
1 楼
spsace
2011-04-15
发送XML成功,但读取的时候报错:
public static void main(String[] args) {
//new MQSendAndReceiveUtil().sendFileMessage("MyEmployee.xml");
new MQSendAndReceiveUtil().receiveFileMessage();
}
异常如下:
java.io.StreamCorruptedException: invalid stream header: 58514820
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:782)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:279)
at com.ibm.mq.MQObjectInputStream.<init>(MQObjectInputStream.java:145)
at com.ibm.mq.MQMessage.readObject(MQMessage.java:2018)
at ace.mq.utils.MQSendAndReceiveUtil.receiveFileMessage(MQSendAndReceiveUtil.java:159)
at ace.mq.utils.MQSendAndReceiveUtil.main(MQSendAndReceiveUtil.java:233)
请楼主帮忙解答。
public static void main(String[] args) {
//new MQSendAndReceiveUtil().sendFileMessage("MyEmployee.xml");
new MQSendAndReceiveUtil().receiveFileMessage();
}
异常如下:
java.io.StreamCorruptedException: invalid stream header: 58514820
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:782)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:279)
at com.ibm.mq.MQObjectInputStream.<init>(MQObjectInputStream.java:145)
at com.ibm.mq.MQMessage.readObject(MQMessage.java:2018)
at ace.mq.utils.MQSendAndReceiveUtil.receiveFileMessage(MQSendAndReceiveUtil.java:159)
at ace.mq.utils.MQSendAndReceiveUtil.main(MQSendAndReceiveUtil.java:233)
请楼主帮忙解答。
相关推荐
在实际应用中,IBM MQ文件传输是指通过消息队列来传递文件数据。这通常涉及到将文件内容转换为消息,然后使用`mqput`或者类似的API方法放入队列,接收端则从队列中取出消息并恢复成原始文件。这种机制使得文件传输...
IBM MQ,全称为IBM WebSphere MQ,是一种企业级的消息中间件,它允许应用程序之间通过网络进行可靠的消息交换,尤其在处理大文件传输时表现出色。本文将深入探讨IBM MQ在大文件传输中的应用、工作原理以及相关的编程...
调用MQ向指定的队列中传输xml文件,同时发送成功后到指定的队列中去读取回复的xml格式的消息
本文将深入解析MQ系统的核心概念、最佳实践以及如何优化MQ的使用,确保信息传输的高效与安全。 ### MQ系统的核心概念 MQ系统作为分布式计算环境中的一种消息中间件,其主要功能是实现应用程序之间的异步通信。它...
在利用Websphere MQ实现大文件传输的应用场景下,主要涉及以下几个核心概念和技术要点: 1. **Websphere MQ数据交换网络**:指由多个Websphere MQ队列管理器构成的数据交换环境,这些队列管理器相互配合,确保数据...
本项目聚焦于如何利用MQ进行数据传输,并结合XML文件处理来实现数据库的入库操作。我们将深入探讨MQ的基本概念、XML文件的解析与操作,以及Hibernate在数据库操作中的应用。 首先,MQ是一种中间件,它允许应用程序...
在IBM WebSphere MQ(简称MQ)中,日志是至关重要的组成部分,主要用于确保消息的可靠传输和系统的稳定运行。MQ日志的主要功能是记录队列管理器控制下的所有关键数据变化,包括对象的创建与删除、消息的持久化更新、...
### MQ通道定义表文件说明 #### 一、概述 IBM MQ 是一款强大的消息中间件,广泛应用于企业级应用集成场景中。MQ 提供了多种方式来实现客户端与服务器端之间的连接,其中最为常见的是通过环境变量和通道定义表文件...
MQ是一种消息中间件,用于在不同的应用程序之间传递数据,确保数据的可靠传输,即使在发送方和接收方之间的网络连接不稳定时也是如此。在这个压缩包中,包含了驱动程序的库文件和示例代码,帮助开发者理解和使用MQ...
3. **安全性**:IBM MQ支持多种安全机制,如SSL/TLS加密、用户认证和授权(通过MQ队列管理器的安全性设置、通道认证记录等),确保数据传输的安全。 4. **通道和协议**:MQ通道用于在不同MQ节点间传输消息,支持TCP...
标题中的“如何利用MQ实现大文件传输和交换具有可靠、安全、断点传输等特点”涉及到的是基于消息队列(Message Queue, MQ)技术的大文件处理,这种技术在分布式系统中常用于实现数据交换,尤其是在处理大量数据或者...
IBM MQ(Message Queue)是IBM提供的一种企业级的消息中间件,用于在分布式系统中可靠地传输数据。在Java开发环境中,如果需要与IBM MQ进行交互,通常需要引入特定的IBM MQ Java API,这些API通常被打包成JAR(Java ...
3. **通道**:通道是MQ中数据传输的路径,包括发送通道(SDR)和接收通道(RCVR)。创建通道时,需要指定通道类型和连接地址,如`DEFINE CHANNEL (SDRNAME) CHLTYPE (SDR) +CONNAME ('IP_ADDRESS(PORT)') XMITQ ...
描述中的“IBM webSphere Mq应用jar包集合”是指IBM MQ提供的Java开发包,这些JAR文件包含了开发和运行与MQ交互的Java应用程序所需的类和资源。这些库使得开发者能够在Java环境中编写代码,以便发送、接收和管理MQ...
IBM MQ是一种消息中间件,它提供了一种可靠、高效的方式来传输应用程序之间的数据,确保即使在网络不稳定或系统故障的情况下也能完成消息传递。队列管理器是IBM MQ的核心组件,负责处理消息的存储和转发。队列则用于...
WebSphere MQ,通常简称为MQ,是一个强大的消息传递平台,用于在不同应用程序之间可靠地传输数据,确保了数据的高可用性和安全性。以下是关于这个安装包的一些关键知识点: 1. **MQ客户端**:MQ客户端是用于连接到...
MQ(Message Queuing)是一种消息中间件技术,用于在分布式系统中传递数据,确保消息的可靠传输和解耦。MQ日常维护手册是IT管理员进行MQ系统管理的重要参考文档,涵盖了一系列管理和监控MQ所需的知识点。 一、MQ...
"MQ security channel"是IBM MQ中的一个重要概念,主要涉及的是数据传输过程中的安全机制,确保信息在传输过程中不被窃取或篡改。 在IBM MQ v7.1及后续版本中,安全通道认证得到了显著增强,以满足更严格的安全需求...