package com.sesoft.dev.mq.app;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Hashtable;
import oracle.jdbc.xa.client.OracleXADataSource;
import com.ibm.mq.MQC;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
* Copyright (c) 2010,Selenium Soft All rights reserved.
* @author eric.wang
* @date 2010-9-25
* @说明 MQ与ORACLE数据库的XA事务操作
*/
public class MQOrclXA {
public static final String qmName = "QM_eric";
public static final String queue = "default";
private static MQQueueManager qmgr = null;
private static Connection conn = null;
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
}
public static void initialize() throws Exception {
Hashtable properties = new Hashtable();
// 设置THREAD_AFFINITY_PROPERTY=TRUE,意思是多阶段事务不支持Shared方式的连接
properties.put(MQC.THREAD_AFFINITY_PROPERTY, new Boolean(true));
//下面可以取二个值MQC.TRANSPORT_MQSERIES_BINDINGS or MQC.TRANSPORT_MQSERIES_CLIENT
properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_BINDINGS);
qmgr = new MQQueueManager("QM_eric", properties);
//准备XA数据库
OracleXADataSource xads = new OracleXADataSource();
String dburl2 = "jdbc:oracle:thin:@192.168.21.226:1521:test";
xads.setURL(dburl2);
xads.setUser("edientjoin");
xads.setPassword("edientjoin");
// 得到连接
conn = qmgr.getJDBCConnection(xads);
// 关闭自动提交,必须
conn.setAutoCommit(false);
// all done
System.out.println("初始化完成");
}
public void doTransaction(){
try {
// theQueueManager.begin();
String sql = "update employees t set t.last_name='wang' where t.employee_id='1'";
PreparedStatement stmt = conn.prepareStatement(sql);
String msg = "A Msg";
stmt.execute();
stmt.close();
sendMsg(msg);
//不用提交数据库的事务,由消息中间件负责提交事务。
qmgr.commit();
// qmgr.backout();
System.out.println(" 事务操作成功");
}
catch (Exception e) {
// 发送失败的操作
System.out.println("事务操作失败,backout,错误信息是:");
e.printStackTrace();
try {
qmgr.backout();
}
catch (MQException me) {
System.out.println("backout失败:");
me.printStackTrace();
}
}
finally {
System.out.println("准备关闭资源: ");
try {
qmgr.disconnect();
conn.close();
}
catch (Exception e) {
System.out.println("关闭失败,错误信息如下:");
e.printStackTrace();
}
}
}
private static void sendMsg(String text) throws Exception {
MQQueue queue = qmgr.accessQueue(qmName, MQC.MQOO_INPUT_AS_Q_DEF
| MQC.MQOO_OUTPUT, null, null, null);
MQMessage msg = new MQMessage();
msg.writeUTF(text);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options += MQC.MQPMO_SYNCPOINT;
queue.put(msg, pmo);
}
}
这儿的事务处理代码实际上是有问题的。
还要相关的配置才行。
如果是ORACLE数据库本身相关的XA事务,ORACLE提供有相关的DEMO。请参阅。
分享到:
相关推荐
总之,“MQ Cert Part IV”深入浅出地讲解了WebSphere MQ Solution Designer认证考试中关于MQI调用的关键知识点,包括房务调用、消息处理调用、队列操作以及控制消息检索的高级技术,为准备参加认证考试的中级设计师...
这篇文章将介绍什么是分布式事务,分布式...相信耐心看完这篇文章,谈到分布式事务,不再只是有“2PC”、“3PC”、“MQ的消息事 务”、“最终一致性”、“TCC”等这些知识碎片,而是能够将知识连成一片,形成知识体系。
刀靴├──刀api-API模块├──├──商品api-商品API├──├──order-api-订单API├──刀auth-鉴权服务├──├──授权服务器-鉴权服务器├──├──auth-biz-鉴权逻辑├──刀务-业务服务├──├──刀品-...
- **MQ集群:**采用三个MQ集群,实现消息队列的高可用性。 - **应用服务集群:**提高应用服务的稳定性和可靠性。 - **负载均衡器:**确保系统能够承受高并发访问,提供稳定的性能。 ##### 4.3 系统实现的主要功能 ...