`
absolute
  • 浏览: 190799 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

一段同步接收和发送MQ消息的代码

阅读更多
java 代码
  1. package com.sdb.payment.core.mq;   
  2.   
  3. import org.apache.log4j.Logger;   
  4.   
  5. import com.ibm.mq.MQC;   
  6. import com.ibm.mq.MQEnvironment;   
  7. import com.ibm.mq.MQException;   
  8. import com.ibm.mq.MQGetMessageOptions;   
  9. import com.ibm.mq.MQMessage;   
  10. import com.ibm.mq.MQPutMessageOptions;   
  11. import com.ibm.mq.MQQueue;   
  12. import com.ibm.mq.MQQueueManager;   
  13.   
  14. public class MessageQueueService {   
  15.     private static Logger logger = Logger.getLogger(MessageQueueService.class);   
  16.   
  17.     private String hostname = "192.168.0.117";   
  18.   
  19.     private String channel = "CHL.SVRCONN";   
  20.   
  21.     private String queueManager = "QM_SERVER";   
  22.   
  23.     private String sendQueue = "OMP.QRMT";   
  24.   
  25.     private String recvQueue = "OMP.QLCA";   
  26.   
  27.     private int port = 24100;   
  28.   
  29.     private int ccsid = 1381;   
  30.   
  31.     private int failedCount = 5;   
  32.   
  33.     private int intervalTime = 1000;   
  34.   
  35.     public MessageQueueService() {   
  36.         MQEnvironment.hostname = hostname;   
  37.         MQEnvironment.channel = channel;   
  38.         MQEnvironment.CCSID = ccsid;   
  39.         MQEnvironment.port = port;   
  40.     }   
  41.   
  42.     public String send(String sendMsg) throws Exception {   
  43.         MQQueueManager qManager = new MQQueueManager(queueManager);   
  44.            
  45.         // send message   
  46.         int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;   
  47.         MQQueue sQueue = qManager.accessQueue(sendQueue, openOptions);   
  48.         MQPutMessageOptions pmo = new MQPutMessageOptions();   
  49.         MQMessage send = new MQMessage();   
  50.         send.write(sendMsg.getBytes());   
  51.         System.out.println("send message : " + sendMsg);   
  52.         sQueue.put(send, pmo);   
  53.         sQueue.close();   
  54.         System.out.println("send message Id");   
  55.         for (int i = 0; i<send.messageId.length; i++) {   
  56.             System.out.print(send.messageId[i]);   
  57.         }   
  58.         System.out.println();   
  59.         System.out.println("send message Id");   
  60.         // fetch message   
  61.         openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING   
  62.                 + MQC.MQOO_INPUT_SHARED;   
  63.         MQQueue rQueue = qManager.accessQueue(recvQueue, openOptions);   
  64.         MQGetMessageOptions getOptions = new MQGetMessageOptions();   
  65.         getOptions.options = MQC.MQGMO_WAIT;   
  66.         getOptions.waitInterval = intervalTime;   
  67.            
  68.         MQMessage recvMsg = new MQMessage();   
  69.         recvMsg.messageId = send.messageId;//这里是关键,要保持接收的msgid跟发送的msgid值是一样的,   
  70.                                            //这样就会根据msgId来取队列的消息了,而不会取到别的消息   
  71.         send.clearMessage();   
  72.            
  73.         boolean received = false;   
  74.         int fetchCount = 0;   
  75.         while (!received) {   
  76.             try {   
  77.                 fetchCount++;   
  78.                 rQueue.get(recvMsg, getOptions);   
  79.                 //logger.debug("the " + fetchCount + " time fetch message!");   
  80.                 System.out.println("fetch message !!!");   
  81.                 received = true;   
  82.             } catch (MQException me) {   
  83.                 if (me.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {   
  84.                     if (fetchCount > failedCount) {   
  85.                         recvMsg.clearMessage();   
  86.                         rQueue.close();   
  87.                         qManager.disconnect();   
  88.                         //logger.error("can't fetch message for " + me.getMessage());   
  89.                         return null;   
  90.                     }   
  91.                 }   
  92.             } catch (Exception ex) {   
  93.                 recvMsg.clearMessage();   
  94.                 rQueue.close();   
  95.                 qManager.disconnect();   
  96.                 //logger.error("can't fetch message for " + ex.getMessage());   
  97.                 return null;   
  98.             }   
  99.         }   
  100.            
  101.         byte[] bMsg = new byte[recvMsg.getMessageLength()];   
  102.         recvMsg.readFully(bMsg);   
  103.         System.out.println("rec correlationId Id");   
  104.         for (int i = 0; i<recvMsg.correlationId.length; i++) {   
  105.             System.out.print(recvMsg.correlationId[i]);   
  106.         }   
  107.         System.out.println();   
  108.         System.out.println("rec correlationId Id");   
  109.         String recv = new String(bMsg);   
  110.         recvMsg.clearMessage();   
  111.         rQueue.close();   
  112.            
  113.         qManager.disconnect();   
  114.            
  115.         return recv;   
  116.     }   
  117.   
  118.     public void setChannel(String channel) {   
  119.         this.channel = channel;   
  120.     }   
  121.   
  122.     public void setHostname(String hostname) {   
  123.         this.hostname = hostname;   
  124.     }   
  125.   
  126.     public void setQueueManager(String queueManager) {   
  127.         this.queueManager = queueManager;   
  128.     }   
  129.   
  130.     public void setPort(int port) {   
  131.         this.port = port;   
  132.     }   
  133.   
  134.     public void setIntervalTime(int intervalTime) {   
  135.         this.intervalTime = intervalTime;   
  136.     }   
  137.   
  138.     public void setFailedCount(int failedCount) {   
  139.         this.failedCount = failedCount;   
  140.     }   
  141.   
  142.     public void setRecvQueue(String recvQueue) {   
  143.         this.recvQueue = recvQueue;   
  144.     }   
  145.   
  146.     public void setSendQueue(String sendQueue) {   
  147.         this.sendQueue = sendQueue;   
  148.     }   
  149. }  
分享到:
评论

相关推荐

    [重要]基于Websphere MQ非持久化消息实现异步转同步

    以下是一段简单的Java代码示例,展示了如何发送和接收非持久化消息: ```java // 创建WMQ连接工厂和队列管理器 ConnectionFactory cf = new com.ibm.mq.jms.MQConnectionFactory(); cf.setStringProperty...

    基于Go实现的分布式MQ消息队列

    - **队列模式(点对点)**:这种模式类似于Redis中的List结构,通过“rpush”和“lpop”操作来实现消息的发送和接收。但是,它无法满足某些场景下消息需要被多个消费者同时消费的需求。 - **发布/订阅模式(PUB/SUB...

    基于JavaSpring的MQ消息处理框架.zip

    系统支持多种消息中间件(如Kafka和ActiveMQ),并提供了灵活的消息发送和接收策略。主要功能包括 1. 持久化事件框架基于Google的EventBus封装的同步事件框架,支持持久化事件,并通过任务调度系统定时重试执行持久...

    netMQ订阅和发布

    - 多线程使用需谨慎,因为NetMQ是线程不安全的,需要正确管理和同步对消息队列的访问。 7. **调试与日志**: - 由于`NetMQPub-Sub.zip`已经经过调试,可以正常运行,所以代码应该包含了必要的错误处理和日志记录...

    阿里云消息队列MQ

    定时/延时消息功能允许生产者设定消息在未来特定时间点或延迟一段时间后投递,增加了消息发送的灵活性。需要注意的是,设置的投递时间必须在当前时间戳之后,否则消息会立即投递。 要使用阿里云消息队列MQ,开发者...

    消息缓冲通信机制代码

    6. 消息发送和接收:该代码中实现了消息的发送和接收,使用`send`函数发送消息,使用`receive`函数接收消息。 7. 线程创建和销毁:该代码中实现了线程的创建和销毁,使用`Create`函数创建线程,使用`Destroy`函数...

    WebShpere MQ Java Interface日文版

    - **发送端与接收端Bean**:包括发送消息并接收响应的发送端Bean(Sender Bean),以及接收消息并发送响应的接收端Bean(Receiver Bean)。 - **简化设置**:为开发者隐藏了消息传递(JMS)的具体细节,简化了开发流程。 ...

    互斥锁的消息队列,很综合的程序源码

    在IT行业中,互斥锁(Mutex)和消息队列是两种关键的同步机制,它们在多线程编程和进程通信中发挥着重要作用。本程序源码综合运用了这些概念,结合了延时处理和容错处理,使得系统更加稳定和高效。 首先,互斥锁是...

    RabbitMQ_Mirror机制分析

    新节点加入允许新的slave节点中途加入到集群中,新加入的slave节点并不同步master节点的所有在该slave加入之前存在的消息,只对新来的消息保持同步,随着旧的消息被消费,经过一段时间后,slave节点就会与master节点...

    浅谈消息中间件IBM WebSphere MQ

    例如,QM1和QM2分别定义本地队列和远程队列,通过定义发送和接收通道,允许两个队列管理器之间来回交换消息。 配置示例: QM1: - 定义本地队列QL_QM1和SENDER1,其中SENDER1作为传输队列。 - 定义远程队列REMOTE1...

    关于分布式异构数据库数据同步系统的研究

    1. **消息服务器(JMSServer)**:作为系统核心,遵循JMS标准,负责数据的异步传输和消息的存储转发。 2. **名字服务器(LDAP/JNDI Provider)**:用于服务发现和资源定位,确保系统组件间的通信畅通。 3. **管理...

    C# MSMQ 简单事例

    事务性消息允许在一组操作中发送和接收消息,确保消息的原子性。消息的持久性保证即使服务重启,消息也不会丢失。而过滤功能则允许你根据消息的属性或标签来筛选消息。 为了更好地理解这个过程,你可以参考`...

    WebSphere MQ

    - 前端Web服务器将客户提交的交易请求封装成消息,通过MQ通道发送给后端相应的队列管理器。 - 后端队列管理器根据消息内容将其分发至相应处理系统。 - 处理完成后,返回结果消息再次通过MQ通道传回前端。 #### ...

    ZeroMQ指导

    为了确保消息的准确传递,0MQ支持定义合同与协议,这样就可以在发送和接收消息之前协商好数据交换的规则。 #### 面向服务器的可靠的队列(管家模式) 管家模式为服务器提供了一种方式,通过它管理消息队列,以确保...

    CDC-MQ数据库生僻字转码&#158;;

    在IT行业中,数据库管理和消息队列(MQ)的交互是一个常见的任务,特别是在数据同步和流处理的场景下。本文将详细探讨标题“CDC-MQ数据库生僻字转码”所涉及的知识点,以及如何解决描述中提到的生僻字乱码问题。 **...

    Spring-JMS把企业消息处理变容易.doc

    模板类提供了发送消息、同步接收消息以及访问JMS会话和消息生产者的辅助方法。在需要更复杂操作时,模板类会将控制权交给用户实现的回调接口,如`MessageCreator`和`SessionCallback`,以实现自定义的行为。 `org....

    JMS详细教程

    - **消息**:在JMS中,消息是数据的载体,它包含了一段要传输的信息。 - **生产者(Producer)**:生产者是创建并发送消息的应用程序。 - **消费者(Consumer)**:消费者是接收并处理消息的应用程序。 - **队列...

    企业号回调模式

    回调模式允许程序在特定事件发生时执行一段代码,而不是等待该事件完成后再进行下一步操作。这种模式在处理网络请求、数据同步、任务调度等方面非常常见。 在“企业号回调模式”中,我们主要关注的是如何在企业应用...

    异步消息实战

    2. **2.0版本**:在此基础上引入了Broker作为中间层,客户端不再直接与数据库交互,而是通过Broker进行消息的发送和接收。这一改进解决了许多1.0版本中存在的问题,例如: - 提高了系统的稳定性和可用性。 - 减少...

Global site tag (gtag.js) - Google Analytics