`
jackle_liu
  • 浏览: 147799 次
文章分类
社区版块
存档分类
最新评论

jboss4官方jms程序(Pub-Sub)

阅读更多
两个类:
TopicRecvClient.java(接收类subscriber)
TopicSendClient.java(发送类publisher)
 
TopicRecvClient.java:
package com.rox.jms;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * A JMS client example program that synchronously receives a message a Topic
 *
 *
 * @author Scott.Stark@jboss.org
 *
 * @version $Revision:$
 *
 */
public class TopicRecvClient
{
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    public void setupPubSub()
    throws JMSException, NamingException
    {
        Properties environment = new Properties();
        environment.put(Context.PROVIDER_URL, "localhost:1099");
        environment.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        InitialContext iniCtx = new InitialContext(environment);
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false,
        TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    public void recvSync()
    throws JMSException, NamingException
    {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();
        // Wait upto 5 seconds for the message
        TopicSubscriber recv = session.createSubscriber(topic);
        Message msg = recv.receive(5000);
        if (msg == null)
            System.out.println("Timed out waiting for msg");
        else
            System.out.println("TopicSubscriber.recv, msgt=" + msg);
    }
    public void stop() throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    public static void main(String args[]) throws Exception
    {
        System.out.println("Begin TopicRecvClient, now="
                + System.currentTimeMillis());
        TopicRecvClient client = new TopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End TopicRecvClient");
        System.exit(0);
    }
}
TopicSendClient.java:
package com.rox.jms;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * A JMS client example program that sends a TextMessage to a Topic
 *
 *
 * @author Scott.Stark@jboss.org
 *
 * @version $Revision:$
 *
 */
public class TopicSendClient
{
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    public void setupPubSub()
    throws JMSException, NamingException
    {
        Properties environment = new Properties();
        environment.put(Context.PROVIDER_URL, "localhost:1099");
        environment.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        InitialContext iniCtx = new InitialContext(environment);
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false,
        TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    public void sendAsync(String text)
    throws JMSException, NamingException
    {
        System.out.println("Begin sendAsync");
        // Setup the pub/sub connection, session
        setupPubSub();
        // Send a text msg
        TopicPublisher send = session.createPublisher(topic);
        TextMessage tm = session.createTextMessage(text);
        send.publish(tm);
        System.out.println("sendAsync, sent text="
        + tm.getText());
        send.close();
        System.out.println("End sendAsync");
    }
    public void stop() throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    public static void main(String args[]) throws Exception
    {
        System.out.println("Begin TopicSendClient, now="
                + System.currentTimeMillis());
        TopicSendClient client = new TopicSendClient();
        client.sendAsync("A text msg, now=" + System.currentTimeMillis());
        client.stop();
        System.out.println("End TopicSendClient");
        System.exit(0);
    }
}
  运行的时候先运行TopicRecvClient,然后迅速(5000毫秒之内)运行TopicSendClient,会看到TopicRecvClient的控制台中出现接收的信息.topic的消息只能在发送的时候接收到,如果再发送之后再去接收是收不到任何信息的.
  还有一种介于p2p和pub2sub之间的一种jms类型(durable pub2sub),写一个java类:
DurableTopicRecvClient.java:
package com.rox.jms;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * A JMS client example program that synchronously receives a message a Topic
 *
 *
 *
 * @author Scott.Stark@jboss.org
 *
 * @version $Revision:$
 *
 */
public class DurableTopicRecvClient
{
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    public void setupPubSub()
    throws JMSException, NamingException
    {
        Properties environment = new Properties();
        environment.put(Context.PROVIDER_URL, "localhost:1099");
        environment.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        InitialContext iniCtx = new InitialContext(environment);
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("john", "needle");
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false,
        TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    public void recvSync()
    throws JMSException, NamingException
    {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();
        // Wait upto 5 seconds for the message
        TopicSubscriber recv = session.createDurableSubscriber(topic,
                "chap6-ex1dtps");
        Message msg = recv.receive(5000);
        if (msg == null)
            System.out.println("Timed out waiting for msg");
        else
            System.out.println("DurableTopicRecvClient.recv, msgt=" + msg);
    }
    public void stop() throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    public static void main(String args[]) throws Exception
    {
        System.out.println("Begin DurableTopicRecvClient, now="
                + System.currentTimeMillis());
        DurableTopicRecvClient client = new DurableTopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }
} 
  这时候就可以先运行TopicSendClient了,然后再运行DurableTopicRecvClient,也可以收到消息.
     DurableTopicRecvClient.java中有两句代码需要引起注意:
     conn = tcf.createTopicConnection("john", "needle");
     TopicSubscriber recv = session.createDurableSubscriber(topic,"chap6-ex1dtps");
    
分享到:
评论
1 楼 卒子99 2007-04-20  
不知道你分析过这段代码吗了?我用的是jboss4.0,上面的例子也已经调通了,不过还是有一个问题,就是p/s这种模式下使用Durable订阅,在书上看到是需要指定用户名和密码,就是下面这段代码:
conn = tcf.createTopicConnection("john", "needle");

之前也查阅过一些文章,是要把example/jms下的jbossmq-state.xml复制到服务器的conf下。我已经试过了,如果不部署这个配置文件已经能订阅成功。但是我在配置文件中修改了john的密码,却没有任何作用。所以我想请问,这个jms用户列表是在哪儿定义的呢?

相关推荐

    jboss-jms包

    - **消息模型**:JMS支持两种消息模型,点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。 - P2P模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者消费。消息一旦被消费...

    jboss 7 配置 jms

    - JMS 支持两种主要的消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。 - 主要组件包括:消息生产者、消息消费者、消息代理(如消息队列或主题)、消息和目的地。 2. **...

    JBOSS建立JMS应用实例(附源码)

    - **消息模型**:JMS提供了两种主要的消息模型——点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。P2P模型基于队列,消息由一个生产者发送到队列,然后由一个或多个消费者接收;Pub/Sub...

    基于Jboss的jms编程

    【基于Jboss的JMS编程】是关于Java消息服务(JMS)在Jboss应用服务器上的实现和配置的教程,适合初学者理解JMS的基本概念和应用。JMS是一种标准API,用于在分布式环境中发送、接收和管理消息,提供可靠的数据传输。 ...

    Spring JMS 消息处理-基于JNDI

    Spring JMS可以处理点对点(P2P)和发布/订阅(Pub/Sub)两种消息模式,这两种模式在分布式系统中各有其应用场景。 接下来,我们谈谈JNDI。JNDI是一种服务,它提供了一个接口来查找和管理Java对象的名称,这些对象...

    JBOSS使用指南

    - **下载**: 用户可以从JBoss官方网站(http://www.jboss.org/jbossas/downloads/)免费下载JBoss应用服务器。提供的二进制版本包括`.zip`、`.tar.gz`及`.bz2`格式,根据系统平台选择相应的二进制版本进行下载。 - *...

    JBoss中文文档(pdf版)

    - 支持点对点(P2P)和发布/订阅(Pub/Sub)模型。 - **消息驱动Bean**: - 用于处理JMS消息的EJB组件。 - 可以自动接收并处理来自消息队列的消息。 #### 七、容器管理持久化(CMP) - **CMP**: - 容器管理持久化是一...

    JMS 简单使用指南

    - **消息服务体系结构**:JMS定义了两种主要的消息传送模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)模型。 - **消息传送模型对照表**:PTP模式使用Queue,消息只能被一个消费...

    Jboss EJB3.0 实例教程.pdf

    - **主题消息**:基于发布/订阅(Pub/Sub)的消息传递模型。 #### 六、实体Bean (Entity Bean) - **持久化配置**:`persistence.xml`文件是EJB 3.0实体Bean的重要配置文件之一,用于指定持久化单元。 - **数据源...

    fuse esb mq jms

    - 支持两种消息传递模式:点对点(Point-to-Point, PTP)和发布/订阅(Publish-Subscribe, PUB/SUB)。 - 提供了事务支持,确保消息的可靠传递。 - 可以在不同的消息中间件之间进行互操作。 ### 三、ActiveMQ ...

    JBoss EJB3.0实例教程

    - **消息传递模型**:教程涵盖了点对点(PTP)和发布订阅(Pub/Sub)两种消息传递模型,这是JMS(Java消息服务)的核心概念,适用于构建异步通信和事件驱动的系统。 - **消息驱动Bean**:作为处理JMS消息的组件,...

    李腾飞]JMS与MDB简介.pdf

    JMS支持两种主要的消息模型:点对点(P2P)和发布订阅(Pub/Sub)。在点对点模型中,消息被发送到一个特定的队列,而该队列中的每个消息只能被一个消费者消费。这意味着一旦消息被消费,它就会从队列中移除。而在...

    Message-Driven Bean EJB实例源代码

    在Java企业级应用中,MDBs被用来接收和响应JMS(Java Message Service)消息,从而解耦应用程序的不同部分,提高系统的可扩展性和可靠性。以下是对"Message-Driven Bean EJB实例源代码"的详细解析: 1. **什么是...

    基于JMS(Java Messaging Service)开发JAVAMAIL详解

    JMS支持两种主要的消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模型中,消息从一个生产者发送到一个队列,然后由一个消费者接收。而在发布/订阅模型中,消息从一个...

    EJB3.0中文资料

    - **Topic消息的发送与接收 (Pub/sub消息传递模型)** - JMS是一种消息中间件标准,用于实现应用程序之间的异步通信。 #### 六、实体BEAN (ENTITY BEAN) - **持久化PERSISTENCE.XML配置文件** - **JBOSS数据源的...

Global site tag (gtag.js) - Google Analytics