下面的例子程序取自 JBoss 的官方文档，并做了简单改造。
文档地址:http://www.jboss.org/file-access/default/members/jbossas/freezone/docs/Server_Configuration_Guide/4/html/index.html
Please refer to Chapter 7.
package org.liuxt.jboss;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
/**
 * A complete JMS client example program that sends a batch of
 * TextMessages to a Queue and asynchronously receives them from the
 * same Queue through a MessageListener.
 *
 * <p>Flow used by {@code main}: {@code setupPTP()} looks up the connection
 * factory and queue and starts the connection, a sender and a receiver are
 * created on the shared session, {@code startMessageTest()} sends
 * {@code messageCount} messages, and the main thread then blocks on
 * {@code blockQueue} until the listener has seen {@code messageCount}
 * deliveries. {@code stop()} releases the JMS resources.
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.9 $
 */
public class PTPMessage
{
    static final Logger log = Logger.getLogger(PTPMessage.class);

    QueueConnection conn;
    QueueSession session;
    Queue que;
    QueueSender sender;
    QueueReceiver receiver;

    /** Sequence number of the message currently being sent (1-based). */
    int i = 0;
    /** Number of messages sent by one test run and awaited by the listener. */
    int messageCount = 100;
    /** Hand-off used by the listener to tell the main thread that all messages arrived. */
    final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(100);

    /**
     * Listener that logs every received message and, once
     * {@code messageCount} deliveries have been counted, puts the token
     * {@code "over"} on {@code blockQueue}.
     */
    public class ReceiverListener implements MessageListener
    {
        /** Number of deliveries seen so far by this listener. */
        int j = 0;

        /**
         * Logs the text of the delivered message. Every delivery is counted,
         * including ones that are not text messages or whose text cannot be
         * read, so that the waiting thread is always released after
         * {@code messageCount} deliveries.
         *
         * @param msg the delivered message
         */
        public void onMessage(Message msg)
        {
            try {
                // Check the type inside the try block: a failing cast outside it
                // would skip the counter below and leave main() waiting forever.
                if (msg instanceof TextMessage) {
                    log.info("onMessage, received message:=" + ((TextMessage) msg).getText());
                } else {
                    log.warn("onMessage, ignoring non-text message: " + msg);
                }
            } catch (Exception e) {
                // Listener boundary: report through the logger instead of letting
                // the exception escape into the JMS provider.
                log.error("onMessage, failed to process message", e);
            } finally {
                if (++j == messageCount) {
                    try {
                        blockQueue.put("over");
                    } catch (InterruptedException e) {
                        // Preserve the interrupt so the delivering thread can react to it.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /**
     * Sets up the PTP connection and session: looks up
     * {@code "ConnectionFactory"} and {@code "queue/testQueue"} in JNDI,
     * creates a non-transacted, auto-acknowledge queue session and starts
     * the connection.
     *
     * @throws JMSException if the connection or session cannot be created or started
     * @throws NamingException if a JNDI lookup fails or the context cannot be closed
     */
    public void setupPTP()
        throws JMSException,
               NamingException
    {
        InitialContext iniCtx = new InitialContext();
        QueueConnectionFactory qcf;
        try {
            // Do both lookups before opening the connection so that a failed
            // lookup cannot leave an open connection behind.
            qcf = (QueueConnectionFactory) iniCtx.lookup("ConnectionFactory");
            que = (Queue) iniCtx.lookup("queue/testQueue");
        } finally {
            iniCtx.close();
        }
        conn = qcf.createQueueConnection();
        session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    /**
     * Sends {@code messageCount} text messages, numbered from 1, to the queue.
     * The counter {@code i} is reset on every call, so the method can be
     * invoked more than once.
     *
     * @throws JMSException if a message cannot be created or sent
     * @throws NamingException never thrown by this implementation; kept for
     *         signature compatibility
     */
    public void startMessageTest() throws JMSException, NamingException
    {
        for (i = 1; i <= this.messageCount; i++) {
            TextMessage tm = session.createTextMessage("...message..." + i);
            this.sender.send(tm);
        }
    }

    /**
     * Creates the queue sender on the shared session.
     *
     * @throws JMSException if the sender cannot be created
     */
    private void createSender() throws JMSException {
        this.sender = this.session.createSender(que);
    }

    /**
     * Creates the queue receiver on the shared session and registers a
     * {@link ReceiverListener} on it.
     *
     * <p>NOTE(review): the listener is registered on the same session that the
     * main thread uses for sending; the JMS specification describes sessions
     * as single-threaded, so a dedicated receiver session may be preferable.
     * TODO confirm against the provider in use.
     *
     * @throws JMSException if the receiver cannot be created or the listener
     *         cannot be registered
     */
    private void createReceiver() throws JMSException {
        this.receiver = this.session.createReceiver(que);
        this.receiver.setMessageListener(new ReceiverListener());
    }

    /**
     * Stops delivery and closes the sender, receiver, session and connection.
     * Safe to call after a partially completed setup: resources that were
     * never created are skipped, and the connection is closed even when an
     * earlier close fails.
     *
     * @throws JMSException if stopping or closing a JMS resource fails
     */
    public void stop()
        throws JMSException
    {
        if (conn == null) {
            return;
        }
        try {
            conn.stop();
            if (this.sender != null) {
                this.sender.close();
            }
            if (this.receiver != null) {
                this.receiver.close();
            }
            if (session != null) {
                session.close();
            }
        } finally {
            conn.close();
        }
    }

    /**
     * Runs the example: set up, send all messages, wait until the listener
     * reports completion, then release the JMS resources.
     *
     * @param args ignored
     * @throws Exception if setup, sending, waiting or shutdown fails
     */
    public static void main(String[] args)
        throws Exception
    {
        PTPMessage client = new PTPMessage();
        try {
            client.setupPTP();
            client.createSender();
            client.createReceiver();
            client.startMessageTest();
            // Blocks until the listener has counted messageCount deliveries.
            client.blockQueue.take();
        } finally {
            // Always release the connection, even when setup or sending failed.
            client.stop();
        }
    }
}
分享到:
评论