(1)MDBQueueBean.java
package study.jms;
import javax.ejb.MessageDrivenBean;
import javax.jms.MessageListener;
import javax.ejb.MessageDrivenContext;
//import javax.ejb.CreateException;
import javax.jms.Message;
/**add by zengabo */
import javax.ejb.EJBException;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import java.util.HashMap;
//import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
public class MDBQueueBean
implements MessageDrivenBean, MessageListener {
MessageDrivenContext messageDrivenContext;
/** add by zengabo */
private Context myContext;
private QueueConnectionFactory queueConnectionFactory;
private String queueConnectionFactoryName = "java:comp/env/jms/QCF";
private QueueConnection queueConnection;
private QueueSession queueSession;
private int i_count = 0 ;
public MDBQueueBean(){
}
public void ejbCreate() { //throws CreateException {
System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count));
try{
init();
}catch (Exception e) {
System.out.println(" init QueueMDB error"+e.toString());
throw new EJBException(" init QueueMDB error", e);
}
}
private void init() throws JMSException, NamingException {
myContext = new InitialContext();
/** lookup 1 */
Object obj = myContext.lookup(queueConnectionFactoryName);
queueConnectionFactory = (QueueConnectionFactory) obj;
queueConnection = queueConnectionFactory.createQueueConnection();
queueConnection.start();
queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
/** in MDB , these are not use .
QueueReceiver queueReceiver =null;
queueReceiver = queueSession.createReceiver("queue/testQueue");
queueReceiver.setMessageListener(this);
*
* */
}
public void ejbRemove() {
this.messageDrivenContext = null ;
try{
if (queueSession != null) {
queueSession.close();
}
if (queueConnection != null) {
queueConnection.close();
}
}catch (Exception e) {
}
}
public void onMessage(Message message) {
if (message instanceof javax.jms.BytesMessage) {
javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
/** @todo Process bytesMessage here */
System.out.println("**** get bytesMessage:") ;
System.out.println(bytesMessage.toString()) ;
}
else if (message instanceof javax.jms.MapMessage) {
javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;
/** @todo Process mapMessage here */
System.out.println("**** get mapMessage:") ;
System.out.println(mapMessage.toString()) ;
}
else if (message instanceof javax.jms.ObjectMessage) {
javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;
/** @todo Process objectMessage here */
System.out.println("**** get ObjectMessage:") ;
HashMap hm = null;
try {
hm = (HashMap) objectMessage.getObject();
}
catch (JMSException ex2) {
System.out.println(ex2.toString());
}
System.out.println(hm.get("name")) ;
System.out.println(hm.get("age")) ;
System.out.println(hm.get("sex")) ;
}
else if (message instanceof javax.jms.StreamMessage) {
javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;
/** @todo Process streamMessage here */
System.out.println("**** get StreamMessage:") ;
System.out.println(streamMessage.toString()) ;
}
else if (message instanceof javax.jms.TextMessage) {
javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message;
/** @todo Process textMessage here */
String rev = "";
System.out.println("**** get textMessage:") ;
try {
rev = objectMessage.getText();
System.out.println(rev);
}
catch (JMSException ex1) {
System.out.println(ex1.toString());
}
}
else{
System.out.println("**** Message is unknow type") ;
return ;
}
/** 回复 */
try{
Queue dest = (Queue) message.getJMSReplyTo();
sendReply(" jboss recv ok", dest);
}catch(Exception ee){
System.out.println("**** Reply Message Error"+ee.toString()) ;
}
}
private void sendReply(String text, Queue dest) throws JMSException {
QueueSender sender = queueSession.createSender(dest);
TextMessage tm = queueSession.createTextMessage(text);
sender.send(tm);
sender.close();
}
public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
this.messageDrivenContext = messageDrivenContext;
}
}
(2)ejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd">
<ejb-jar>
<display-name>myMDB</display-name>
<enterprise-beans>
<message-driven>
<ejb-name>MDBQueue</ejb-name>
<ejb-class>study.jms.MDBQueueBean</ejb-class>
<transaction-type>Container</transaction-type>
<message-driven-destination>
<destination-type>javax.jms.Queue</destination-type>
</message-driven-destination>
<resource-ref>
<description>test mdb</description>
<res-ref-name>jms/QCF</res-ref-name>
<res-type>javax.jms.QueueConnectionFactory</res-type>
<res-auth>Container</res-auth>
</resource-ref>
</message-driven>
</enterprise-beans>
<assembly-descriptor>
<container-transaction>
<method>
<ejb-name>MDBQueue</ejb-name>
<method-name>*</method-name>
</method>
<trans-attribute>Required</trans-attribute>
</container-transaction>
</assembly-descriptor>
</ejb-jar>
(3)jboss.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd">
<jboss>
<enterprise-beans>
<message-driven>
<ejb-name>MDBQueue</ejb-name>
<destination-jndi-name>queue/B</destination-jndi-name>
<configuration-name>Standard Message Driven Bean</configuration-name>
<resource-ref>
<res-ref-name>jms/QCF</res-ref-name>
<jndi-name>ConnectionFactory</jndi-name>
</resource-ref>
</message-driven>
</enterprise-beans>
</jboss>
(4)MDBClient.java
package study.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import EDU.oswego.cs.dl.util.concurrent.CountDown;
import java.util.Hashtable;
import javax.naming.Context;
/**
* A complete JMS client example program that sends N TextMessages to
* a Queue B and asynchronously receives the messages as modified by
* TextMDB from Queue A.
*
* @author Scott.Stark@jboss.org
* @version $Revision: 1.10 $
*/
public class MDBClient
{
static final int N = 10;
static CountDown done = new CountDown(N);
QueueConnection conn;
QueueSession session;
Queue queA;
Queue queB;
public static class ExListener
implements MessageListener
{
public void onMessage(Message msg)
{
done.release();
TextMessage tm = (TextMessage) msg;
try {
System.out.println("onMessage, recv text="+tm.getText());
} catch(Throwable t) {
t.printStackTrace();
}
}
}
public void setupPTP()
throws JMSException, NamingException
{
// InitialContext iniCtx = new InitialContext();
InitialContext iniCtx = getInitialContext();
Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory
QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
conn = qcf.createQueueConnection();
queA = (Queue) iniCtx.lookup("queue/A");
queB = (Queue) iniCtx.lookup("queue/B");
session = conn.createQueueSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
conn.start();
}
private InitialContext getInitialContext() throws NamingException {
Hashtable environment = new Hashtable();
environment.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
environment.put(Context.URL_PKG_PREFIXES,
"org.jboss.naming:org.jnp.interfaces");
environment.put(Context.PROVIDER_URL, "jnp://localhost:1099");
return new InitialContext(environment);
}
public void sendRecvAsync(String textBase)
throws JMSException, NamingException, InterruptedException
{
System.out.println("Begin sendRecvAsync");
// Setup the PTP connection, session
setupPTP();
// Set the async listener for queA
QueueReceiver recv = session.createReceiver(queA);
recv.setMessageListener(new ExListener());
// Send a few text msgs to queB
QueueSender send = session.createSender(queB);
for(int m = 0; m < 10; m ++) {
TextMessage tm = session.createTextMessage(textBase+"#"+m);
tm.setJMSReplyTo(queA);
send.send(tm);
System.out.println("sendRecvAsync, sent text="+tm.getText());
}
System.out.println("End sendRecvAsync");
}
public void stop()
throws JMSException
{
conn.stop();
session.close();
conn.close();
}
public static void main(String args[])
throws Exception
{
System.out.println("Begin MDBClient,now=" +
System.currentTimeMillis());
MDBClient client = new MDBClient();
client.sendRecvAsync("A text msg");
client.done.acquire();
client.stop();
System.exit(0);
System.out.println("End MDBClient");
}
}
分享到:
相关推荐
下面我们将通过一个简单的Java JMS代码示例来理解其基本用法: 1. **JMS提供者选择** 在开始之前,你需要选择一个JMS提供者,例如Apache ActiveMQ、IBM WebSphere MQ或RabbitMQ。这里假设我们使用ActiveMQ,首先...
在本文中,我们将深入探讨SpringJMS的基本概念、如何与ActiveMQ集成,以及如何通过示例代码理解其工作原理。 1. **SpringJMS简介** SpringJMS是Spring框架对JMS API的包装,它简化了生产者和消费者之间的消息通信...
本示例代码将帮助我们深入理解JMS的基本用法和概念。 JMS的核心组件包括生产者(Producer)、消费者(Consumer)和消息队列(Message Queue)。生产者创建并发送消息,消费者接收并处理这些消息,而消息队列则作为...
`javax.jms.jar` 包是实现JMS规范的核心库,包含了JMS接口和实现类,使得开发者可以轻松地在Java应用程序中集成消息服务。 JMS主要包含两种消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe...
在IT领域,特别是Java企业级应用开发中,JMS(Java Message Service)是一个不可或缺的技术,它为应用程序提供了异步通信的能力。从给定的代码片段来看,主要涉及到JMS消息驱动Bean(MDB)的实现,这在Java EE环境中...
Java消息服务(Java Message Service,简称JMS)是...通过深入学习这个教程和研究源代码,开发者可以更好地理解JMS的工作方式,并能在实际项目中有效地利用JMS进行应用程序间的数据交换,提高系统的可靠性和扩展性。
基于Java通讯开发jms源代码 (jms通讯开发源码) java,net,socket,通讯开发,jms /* * @(#)Message.java 1.60 02/04/09 * * Copyright 1997-2002 Sun Microsystems, Inc. All Rights Reserved. * * SUN ...
spring-jmsspring-jmsspring-jmsspring-jmsspring-jmsspring-jms
在“flex用到JMS代码下载”这个主题中,我们讨论的是如何在Flex应用中集成JMS来实现分布式通信。以下是一些关键知识点: 1. **Flex与JMS的结合**: Flex作为客户端技术,通过AMF(Action Message Format)或HTTP等...
JMS允许应用程序创建、发送、接收和读取消息,它是异步通信的一种方式,能够有效地解耦发送者和接收者,使得系统更加健壮和可扩展。 在JMS中,主要有两种类型的消息模型:点对点(Point-to-Point,P2P)和发布/订阅...
在实际开发中,开发者会使用JMS API或者第三方库(如Spring JMS)来简化上述步骤,提高代码的可读性和可维护性。通过学习和理解JMS的基础知识,我们可以构建出稳定、可靠的分布式系统,实现不同服务间的解耦和异步...
JMS允许应用程序在不关心彼此的位置、速度或执行时间的情况下进行通信,这在分布式系统中尤为重要,因为它们可以处理大量并发请求并确保数据的可靠传输。 JMS的核心概念包括以下几点: 1. **消息**:消息是数据的...
Java聊天程序代码是一个基于JavaEE技术栈开发的实时通信应用,它允许用户通过网络进行文本、语音甚至视频的交流。这个完整的压缩包包含了所有必要的组件,使得开发者可以在解压后直接在特定环境中运行,比如Eclipse...
Classes contained in javax.jms.jar: javax.transaction.xa.XAResource.class javax.jms.BytesMessage.class javax.jms.Message.class javax.jms.JMSException.class javax.jms.Destination.class javax.jms....
**JMS(Java Message Service)** 是一个Java平台上的标准接口,它定义了一种统一的API,使得应用程序可以与各种消息中间件进行交互。JMS主要用于在分布式环境中传递消息,提供可靠的异步通信机制。 **ActiveMQ** ...
通过使用JMS,开发者可以专注于业务逻辑,而不必关心底层的消息传输细节,这极大地提高了代码的可移植性和系统的灵活性。同时,JMS提供的消息持久化和优先级机制确保了消息的重要性和可靠性。在WebLogic这样的应用...
JMS是Java平台中用于异步通信的标准API,它允许应用程序创建、发送、接收和读取消息。在WebLogic环境中配置JMS并进行测试是确保分布式系统可靠性和可扩展性的重要步骤。以下将详细讲解这个过程。 1. **JMS基本概念*...
通过JMS,开发人员能够编写不依赖于特定消息服务提供商的应用程序,实现跨平台和跨应用的通信。 2. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub...
Java Message Service(JMS)是Java平台中用于创建、发送、接收和读取消息的应用程序接口。它提供了一种标准的方式来解耦...在实践中,理解并熟练运用JMS消息服务代码,能够有效提升Java企业级应用的开发效率和质量。
javax.jms.BytesMessage.class javax.jms.Connection.class javax.jms.ConnectionConsumer.class javax.jms.ConnectionFactory.class javax.jms.ConnectionMetaData.class javax.jms.DeliveryMode.class javax.jms....