- 浏览: 13940 次
- 性别:
- 来自: 南昌
文章分类
最新评论
一、简述
EJB2.0开始,引进了消息驱动的EJB,简称MDB(message driven bean)。当MOM收到消息时,能够自动传达给这种Bean。跟事件驱动一个道理。注:只是接收驱动。
EJB2.1,MDB又得到了加强,能够处理非JMS的消息,如:MailMessage,SMSMessage,SOAPMessage。
二、例子
见上章提到的源码。
(1)MDBQueueBean.java 这是EJB,它不需要其它什么 Home,Remote等接口。
(2)ejb-jar.xml EJB描述。跟其它类型的EJB的描述大不一样。
(3)jboss.xml JBOSS部署的描述。
需要部署的就以上三个文件。见myMDB.jar
(4)MDBClient.java 这是一个客户端程序,用来发送信息的,执行后,服务端的MDB就能接收消息作出响应了。
三、MDB的不同之处:
在编程上,对比普通程序的JMS,MDB只是在初始化时少了定位消息目标和注册监听的语句而已:
QueueReceiver queueReceiver = queueSession.createReceiver("queue/testQueue");
queueReceiver.setMessageListener(this);
感觉:版本这东西真麻烦。买了一本《J2EE应用开发详解》(电子工业出版社,飞思科技监制),里面的例子行不通,主要是配置文件,简直连编译也通不过。
摸了一天多,终于在jboss网站找到帮助文件:救星呀:
http://docs.jboss.org/jbossas/admindevel326/html/ch6.chapt.html#d0e11155
不过,在我的环境中,我还得做一点点小修改。见源码。。。
附:
(1)MDBQueueBean.java
(3)jboss.xml
(4)MDBClient.java
EJB2.0开始,引进了消息驱动的EJB,简称MDB(message driven bean)。当MOM收到消息时,能够自动传达给这种Bean。跟事件驱动一个道理。注:只是接收驱动。
EJB2.1,MDB又得到了加强,能够处理非JMS的消息,如:MailMessage,SMSMessage,SOAPMessage。
二、例子
见上章提到的源码。
(1)MDBQueueBean.java 这是EJB,它不需要其它什么 Home,Remote等接口。
(2)ejb-jar.xml EJB描述。跟其它类型的EJB的描述大不一样。
(3)jboss.xml JBOSS部署的描述。
需要部署的就以上三个文件。见myMDB.jar
(4)MDBClient.java 这是一个客户端程序,用来发送信息的,执行后,服务端的MDB就能接收消息作出响应了。
三、MDB的不同之处:
在编程上,对比普通程序的JMS,MDB只是在初始化时少了定位消息目标和注册监听的语句而已:
QueueReceiver queueReceiver = queueSession.createReceiver("queue/testQueue");
queueReceiver.setMessageListener(this);
感觉:版本这东西真麻烦。买了一本《J2EE应用开发详解》(电子工业出版社,飞思科技监制),里面的例子行不通,主要是配置文件,简直连编译也通不过。
摸了一天多,终于在jboss网站找到帮助文件:救星呀:
http://docs.jboss.org/jbossas/admindevel326/html/ch6.chapt.html#d0e11155
不过,在我的环境中,我还得做一点点小修改。见源码。。。
附:
(1)MDBQueueBean.java
package study.jms; import javax.ejb.MessageDrivenBean; import javax.jms.MessageListener; import javax.ejb.MessageDrivenContext; //import javax.ejb.CreateException; import javax.jms.Message; /**add by zengabo */ import javax.ejb.EJBException; import javax.naming.InitialContext; import javax.naming.Context; import javax.naming.NamingException; import javax.jms.QueueConnectionFactory; import javax.jms.QueueConnection; import javax.jms.QueueSession; import java.util.HashMap; //import java.util.Hashtable; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueSender; import javax.jms.TextMessage; public class MDBQueueBean implements MessageDrivenBean, MessageListener { MessageDrivenContext messageDrivenContext; /** add by zengabo */ private Context myContext; private QueueConnectionFactory queueConnectionFactory; private String queueConnectionFactoryName = "java:comp/env/jms/QCF"; private QueueConnection queueConnection; private QueueSession queueSession; private int i_count = 0 ; public MDBQueueBean(){ } public void ejbCreate() { //throws CreateException { System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count)); try{ init(); }catch (Exception e) { System.out.println(" init QueueMDB error"+e.toString()); throw new EJBException(" init QueueMDB error", e); } } private void init() throws JMSException, NamingException { myContext = new InitialContext(); /** lookup 1 */ Object obj = myContext.lookup(queueConnectionFactoryName); queueConnectionFactory = (QueueConnectionFactory) obj; queueConnection = queueConnectionFactory.createQueueConnection(); queueConnection.start(); queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE); /** in MDB , these are not use . QueueReceiver queueReceiver =null; queueReceiver = queueSession.createReceiver("queue/testQueue"); queueReceiver.setMessageListener(this); * * */ } public void ejbRemove() { this.messageDrivenContext = null ; try{ if (queueSession != null) { queueSession.close(); } if (queueConnection != null) { queueConnection.close(); } }catch (Exception e) { } } public void onMessage(Message message) { if (message instanceof javax.jms.BytesMessage) { javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message; /** @todo Process bytesMessage here */ System.out.println("**** get bytesMessage:") ; System.out.println(bytesMessage.toString()) ; } else if (message instanceof javax.jms.MapMessage) { javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message; /** @todo Process mapMessage here */ System.out.println("**** get mapMessage:") ; System.out.println(mapMessage.toString()) ; } else if (message instanceof javax.jms.ObjectMessage) { javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message; /** @todo Process objectMessage here */ System.out.println("**** get ObjectMessage:") ; HashMap hm = null; try { hm = (HashMap) objectMessage.getObject(); } catch (JMSException ex2) { System.out.println(ex2.toString()); } System.out.println(hm.get("name")) ; System.out.println(hm.get("age")) ; System.out.println(hm.get("sex")) ; } else if (message instanceof javax.jms.StreamMessage) { javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message; /** @todo Process streamMessage here */ System.out.println("**** get StreamMessage:") ; System.out.println(streamMessage.toString()) ; } else if (message instanceof javax.jms.TextMessage) { javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message; /** @todo Process textMessage here */ String rev = ""; System.out.println("**** get textMessage:") ; try { rev = objectMessage.getText(); System.out.println(rev); } catch (JMSException ex1) { System.out.println(ex1.toString()); } } else{ System.out.println("**** Message is unknow type") ; return ; } /** 回复 */ try{ Queue dest = (Queue) message.getJMSReplyTo(); sendReply(" jboss recv ok", dest); }catch(Exception ee){ System.out.println("**** Reply Message Error"+ee.toString()) ; } } private void sendReply(String text, Queue dest) throws JMSException { QueueSender sender = queueSession.createSender(dest); TextMessage tm = queueSession.createTextMessage(text); sender.send(tm); sender.close(); } public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) { this.messageDrivenContext = messageDrivenContext; } }
(2)ejb-jar.xml <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd"> <ejb-jar> <display-name>myMDB</display-name> <enterprise-beans> <message-driven> <ejb-name>MDBQueue</ejb-name> <ejb-class>study.jms.MDBQueueBean</ejb-class> <transaction-type>Container</transaction-type> <message-driven-destination> <destination-type>javax.jms.Queue</destination-type> </message-driven-destination> <resource-ref> <description>test mdb</description> <res-ref-name>jms/QCF</res-ref-name> <res-type>javax.jms.QueueConnectionFactory</res-type> <res-auth>Container</res-auth> </resource-ref> </message-driven> </enterprise-beans> <assembly-descriptor> <container-transaction> <method> <ejb-name>MDBQueue</ejb-name> <method-name>*</method-name> </method> <trans-attribute>Required</trans-attribute> </container-transaction> </assembly-descriptor> </ejb-jar>
(3)jboss.xml
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd"> <jboss> <enterprise-beans> <message-driven> <ejb-name>MDBQueue</ejb-name> <destination-jndi-name>queue/B</destination-jndi-name> <configuration-name>Standard Message Driven Bean</configuration-name> <resource-ref> <res-ref-name>jms/QCF</res-ref-name> <jndi-name>ConnectionFactory</jndi-name> </resource-ref> </message-driven> </enterprise-beans> </jboss>
(4)MDBClient.java
package study.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; import java.util.Hashtable; import javax.naming.Context; /** * A complete JMS client example program that sends N TextMessages to * a Queue B and asynchronously receives the messages as modified by * TextMDB from Queue A. * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class MDBClient { static final int N = 10; static CountDown done = new CountDown(N); QueueConnection conn; QueueSession session; Queue queA; Queue queB; public static class ExListener implements MessageListener { public void onMessage(Message msg) { done.release(); TextMessage tm = (TextMessage) msg; try { System.out.println("onMessage, recv text="+tm.getText()); } catch(Throwable t) { t.printStackTrace(); } } } public void setupPTP() throws JMSException, NamingException { // InitialContext iniCtx = new InitialContext(); InitialContext iniCtx = getInitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory QueueConnectionFactory qcf = (QueueConnectionFactory) tmp; conn = qcf.createQueueConnection(); queA = (Queue) iniCtx.lookup("queue/A"); queB = (Queue) iniCtx.lookup("queue/B"); session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); conn.start(); } private InitialContext getInitialContext() throws NamingException { Hashtable environment = new Hashtable(); environment.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); environment.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); environment.put(Context.PROVIDER_URL, "jnp://localhost:1099"); return new InitialContext(environment); } public void sendRecvAsync(String textBase) throws JMSException, NamingException, InterruptedException { System.out.println("Begin sendRecvAsync"); // Setup the PTP connection, session setupPTP(); // Set the async listener for queA QueueReceiver recv = session.createReceiver(queA); recv.setMessageListener(new ExListener()); // Send a few text msgs to queB QueueSender send = session.createSender(queB); for(int m = 0; m < 10; m ++) { TextMessage tm = session.createTextMessage(textBase+"#"+m); tm.setJMSReplyTo(queA); send.send(tm); System.out.println("sendRecvAsync, sent text="+tm.getText()); } System.out.println("End sendRecvAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin MDBClient,now=" + System.currentTimeMillis()); MDBClient client = new MDBClient(); client.sendRecvAsync("A text msg"); client.done.acquire(); client.stop(); System.exit(0); System.out.println("End MDBClient"); } }
相关推荐
【描述】"Ex4_to_mq4 -225 use for study only" 暗示这个版本可能是一个学习或研究用的工具,不推荐用于商业或实际交易环境。使用此工具的目的是为了学习和理解EA的工作原理,或者对现有的EA进行修改和优化。由于它...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,用于解耦应用程序的不同组件,提高系统的可扩展性和稳定性。RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的开源消息代理软件,广泛应用...
web web项目继承自spring-boot-study api 接口统一定义,model统一定义 common 公共模块 controller 控制器 dao 数据访问层 service 服务层 #启动项目说明: 要安装mysql,创建数据库,导入doc目录下的sql文件 启动...
在消息队列(MQ)中,"兔子MQ"(RabbitMQ)是一种广泛使用的开源消息代理,它遵循AMQP(Advanced Message Queuing Protocol)协议,能够实现高可用性和可扩展性。作为动物园管理员,我们需要理解和掌握如何设置、...
WebSphere MQ则被用作中间件,提供高可用性的消息传输服务,确保数据在不同系统间的无缝流动。WebSphere Portal Server与Web Services for Remote Portlets(WSRP)结合,为用户提供统一的门户体验,集中管理各种...
ELK已经学习完毕,目前考虑将最近所学的ELK整合Cloud结合MQ和EasyExcel 做一个文件管理项目,或报表项目。目前整合想法是,L同步数据库到ES上, 发生增删改,同时调用L,然后有数据优先找ES,没有就Redis缓存,再...
1.rabbitMQ消息中间件(注意:安装mq之前先安装erlang,原因在于RabbitMQ服务端代码是使用并发式语言erlang编写的) 2.Redis(这里redis的目的是保存消费次数) 二、Mq消息机制: 1、mq消息机制几个重要的组件: 1)、...
改后缀 epub
springboot_studyspringboot学习springboot-security springboot整合权限认证springboot_MQ 整合RabbitMQspringboot_rabbir 整合RabbitMQspringboot_RocketMQ 整合RocketMQ 入门 事务处理springboot_design 设计模式...
“第五讲:企业应用集成模式”可能详细介绍了不同的集成策略,如API Gateway用于统一接口,企业级集成平台(如IBM WebSphere MQ或Apache Kafka)用于消息传递,以及适配器模式以兼容不同系统的接口。 “第六讲:...
- **ESL**: <http://www.nceltr.mq.edu.au/ESL> - **Eleaston.com**: - **Study.com**: <http://www.study.com/resources.html> - **TeachingEnglish**: - **Online Courses**: - **NLL**: - **Wicked Stuff**: ...