`

JMS发布订阅模式

    博客分类:
  • JMS
 
阅读更多
方式一:
package com.deppon.test04.jms.topicpublish;
发布者:
import java.util.Properties;

import javax.jms.DeliveryMode; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.Session; 
import javax.jms.Topic; 
import javax.jms.TopicConnection; 
import javax.jms.TopicConnectionFactory; 
import javax.jms.TopicPublisher; 
import javax.jms.TopicSession; 
import javax.jms.TopicSubscriber; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
 
public class SimplePublisher { 
 
    private TopicPublisher producer; 
    private TopicSession session; 
    private TopicConnection connection; 
    private boolean isOpen = true; 
     
    public SimplePublisher() throws Exception{ 
       
        Properties props = new Properties();
//        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
//        props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
        props.setProperty("connectionFactoryNames","TopicCF");
        props.setProperty("queue.queue1","jms.queue1");
        props.setProperty("topic.topic1","jms.topic1");
        javax.naming.Context context = new InitialContext(props);
       
//        Context context = new InitialContext(); 
        TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF"); 
        connection = connectionFactory.createTopicConnection(); 
        connection.setClientID("OK111"); 
        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
        Topic topic = (Topic)context.lookup("topic1"); 
        producer = session.createPublisher(topic);//non durable 
        producer.setDeliveryMode(DeliveryMode.PERSISTENT); 
        connection.start(); 
         
    } 
     
     
    public boolean send(String text) { 
        if(!isOpen){ 
            throw new RuntimeException("session has been closed!"); 
        } 
        try{ 
            Message message = session.createTextMessage(text); 
            producer.send(message); 
            return true; 
        }catch(Exception e){ 
            return false; 
        } 
    } 
     
    public synchronized void close(){ 
        try{ 
            if(isOpen){ 
                isOpen = false; 
            } 
            session.close(); 
            connection.close(); 
        }catch (Exception e) { 
            // 
        } 
    } 
     


订阅者:
package com.deppon.test04.jms.topicpublish;

import java.util.Properties;

import javax.jms.Session; 
import javax.jms.Topic; 
import javax.jms.TopicConnection; 
import javax.jms.TopicConnectionFactory; 
import javax.jms.TopicSession; 
import javax.jms.TopicSubscriber; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
   
public class SimpleSubscriber { 
 
    private TopicConnection connection; 
    private TopicSession session; 
    private TopicSubscriber consumer; 
     
    private boolean isStarted; 
     
    public SimpleSubscriber(String clientId) throws Exception{ 
       
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
        props.setProperty("connectionFactoryNames","TopicCF");
        props.setProperty("queue.queue1","jms.queue1");
        props.setProperty("topic.topic1","jms.topic1");
        javax.naming.Context context = new InitialContext(props);
       
//        Context context = new InitialContext(); 
        TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF");
        connection = connectionFactory.createTopicConnection(); 
        connection.setClientID(clientId); 
        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
        Topic topic = (Topic)context.lookup("topic1"); 
        consumer = session.createDurableSubscriber(topic, "Test-subscriber"); 
        consumer.setMessageListener(new TopicMessageListener()); 
    } 
     
     
    public synchronized boolean start(){ 
        if(isStarted){ 
            return true; 
        } 
        try{ 
            connection.start(); 
            isStarted = true; 
            return true; 
       }catch(Exception e){ 
            return false; 
        } 
    } 
     
    public synchronized void close(){ 
        isStarted = false; 
        try{ 
            session.close(); 
            connection.close(); 
        }catch(Exception e){ 
            // 
        } 
    } 


监听消息类:
package com.deppon.test04.jms.topicpublish;

import javax.jms.Message;
import javax.jms.MessageListener;

public class TopicMessageListener implements MessageListener {

    MessageListener paramMessageListener;

    public MessageListener getParamMessageListener() {
        return paramMessageListener;
    }

    public void setParamMessageListener(MessageListener paramMessageListener) {
        this.paramMessageListener = paramMessageListener;
    }

    @Override
    public void onMessage(Message paramMessage) {
//        this.paramMessageListener = paramMessage.get;
        // TODO Auto-generated method stub
        System.out.println("AAAAAAAAA BBBBB");
        System.out.println("AAAAAAAAA CCCCC");
    }
   
}
测试类:
package com.deppon.test04.jms.topicpublish;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SimpleTestMain { 
      /**
     * @param args
     */ 
    public static void main(String[] args) throws Exception{ 
               
        SimpleSubscriber consumer = new SimpleSubscriber("TestClientId");
       
//        consumer.close();
        consumer.start(); 
         
        SimplePublisher productor = new SimplePublisher(); 
       
        for(int i=0; i<10; i++){ 
            productor.send("message content:" + i); 
        } 
        productor.close(); 
        //consumer.close(); 
    } 
     
 


方式二:

发布者:
package com.deppon.test04.jms.topicpublish;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendTopic {
    private static final int SEND_NUMBER = 5;
    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <=SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq发送的消息" + i);
            //发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }
  
    public static void main(String[] args) {
        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination:消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            //构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            //启动
            connection.start();
            //获取操作连接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //获取session注意参数值FirstTopic是一个服务器的topic(与queue消息的发送相比,这里是唯一的不同)
            destination = session.createTopic("FirstTopic");
            //得到消息生成者【发送者】
            producer = session.createProducer(destination);
            //设置不持久化,此处学习,实际根据项目决定
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //构造消息,此处写死,项目就是参数,或者方法获取
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

订阅者:
package com.deppon.test04.jms.topicpublish;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveTopic implements Runnable {
    private String threadName;

    ReceiveTopic(String threadName) {
         this.threadName = threadName;
    }

    public void run() {
         // ConnectionFactory:连接工厂,JMS用它创建连接
         ConnectionFactory connectionFactory;
         // Connection:JMS客户端到JMS Provider的连接
         Connection connection =null;
         // Session:一个发送或接收消息的线程
         Session session;
         // Destination:消息的目的地;消息发送给谁.
         Destination destination;
         //消费者,消息接收者
         MessageConsumer consumer;
         connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
         try {
               //构造从工厂得到连接对象
               connection = connectionFactory.createConnection();
               //启动
               connection.start();
               //获取操作连接,默认自动向服务器发送接收成功的响应
               session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
               //获取session注意参数值FirstTopic是一个服务器的topic
               destination = session.createTopic("FirstTopic");
               consumer = session.createConsumer(destination);
               while (true) {
                    //设置接收者接收消息的时间,为了便于测试,这里设定为100s
                    TextMessage message = (TextMessage) consumer
                                .receive(100 * 1000);
                    if (null != message) {
                          System.out.println("线程"+threadName+"收到消息:" + message.getText());
                    } else {
                          continue;
                    }
               }
         } catch (Exception e) {
               e.printStackTrace();
         } finally {
               try {
                    if (null != connection)
                          connection.close();
               } catch (Throwable ignore) {
               }
         }
    }

    public static void main(String[] args) {
          //这里启动3个线程来监听FirstTopic的消息,与queue的方式不一样三个线程都能收到同样的消息
         ReceiveTopic receive1=new ReceiveTopic("thread1");
         ReceiveTopic receive2=new ReceiveTopic("thread2");
         ReceiveTopic receive3=new ReceiveTopic("thread3");
         Thread thread1=new Thread(receive1);
         Thread thread2=new Thread(receive2);
         Thread thread3=new Thread(receive3);
         thread1.start();
         thread2.start();
         thread3.start();
    }
}
参考:
http://activemq.apache.org/jndi-support.html
http://shift-alt-ctrl.iteye.com/blog/1915240
http://coderbase64.iteye.com/blog/2081937
分享到:
评论

相关推荐

    JMS IBM MQ 订阅模式

    **JMS IBM MQ 订阅模式详解** Java Message Service(JMS)是一种API,它为在分布式环境中进行异步通信提供了标准接口。IBM MQ是IBM提供的一个消息中间件产品,它支持多种消息传递模型,包括点对点和发布/订阅模型...

    MQ JMS 发布订阅配置、代码

    以下是一个配置MQ JMS发布订阅的详细步骤: 1. **创建队列管理器**:队列管理器是MQ的核心组件,负责消息的存储、路由和传输。在WebSphere MQ资源管理器中,可以通过“新建”-&gt;“队列管理器”向导创建,例如命名为`...

    JMS实现的信息的广播订阅

    总结起来,JMS的广播订阅模式通过Topic实现了高效的消息广播,使得多个订阅者能同时接收到相同的消息。这一特性在构建大型分布式系统、事件驱动架构以及实时通知系统中非常有价值。理解和熟练运用JMS广播订阅,能够...

    ActiveMQ-Topic订阅发布模式Demo

    本示例“ActiveMQ-Topic订阅发布模式Demo”主要关注的是发布/订阅模式,这是一种一对多的消息传递方式。在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这...

    订阅发布模式

    订阅发布模式(Publish/Subscribe,简称Pub/Sub)是一种在分布式系统中进行消息传递的设计模式,它允许消息生产者(发布者)将消息发送到中间件(通常是一个消息代理),然后由多个消息消费者(订阅者)根据自己的...

    ActiveMQ的点对点与发布/订阅模式小demo

    在这个"ActiveMQ的点对点与发布/订阅模式小demo"中,我们将深入理解这两种基本的消息传递模型,并了解如何在实践中运用ActiveMQ。 1. **点对点模式(Point-to-Point,P2P)**: 点对点模式是基于队列(Queue)的...

    ActiveMQ订阅模式持久化实现

    在发布/订阅模式下,消息生产者(Publisher)发送消息到主题(Topic),而多个消息消费者(Subscriber)可以订阅该主题,从而接收消息。这种模式特别适用于广播式通信,让所有订阅者都能接收到相同的消息。 **持久...

    activeMq点对点和发布/订阅模式demo

    在`activeMQ_demo`这个压缩包中,可能包含了一些示例代码,用于演示如何使用ActiveMQ实现点对点和发布/订阅模式。这些示例可能包括了以下内容: 1. 生产者(Producer):创建和发送消息到队列或主题的代码,展示了...

    jms培训PPT

    - **主题(Topic)**: 支持发布/订阅模式,多个消费者可以订阅同一个主题,当一个生产者发送消息到主题时,所有订阅该主题的消费者都能接收到消息。 - **队列(Queue)**: 支持点对点模型,每个消息只能被一个消费...

    PublisherSubscriber(发布订阅者)消息模式开发流程

    在这个模式中,有两个主要角色:发布者(Publisher)和订阅者(Subscriber)。发布者负责发送消息,而订阅者负责接收消息。这种模式广泛应用于各个领域,例如:聊天室、股票交易平台、物流系统等等。 在本文中,...

    JMS的建立及其发布

    发布/订阅模式下,消息发布者将消息发送到一个主题,多个订阅者可以订阅该主题,从而接收到消息。 要建立JMS,首先你需要选择一个JMS提供商,如ActiveMQ、RabbitMQ或Apache Qpid等。每个提供商都有自己的实现,但都...

    jms-test.zip_jms activemq_jms test

    描述中提到,“jms测试程序,将tomcat和activeMq整合在一起做的一个发送接受的发布订阅的例子”,这表明项目是基于Tomcat服务器,并且通过ActiveMQ实现了一个发布/订阅模式的消息传递。Tomcat是一个流行的Java应用...

    SpringMvc+redis+activeMq实现消息发布订阅(测试通过)

    在发布订阅模式下,生产者发布消息到主题,而订阅者可以接收到这些消息。 实现SpringMvc、Redis和ActiveMQ的消息发布订阅,首先需要在项目中引入相应的依赖。对于SpringMvc,通常需要添加Spring Web和Spring ...

    JMS 简介以及Weblogic配置JMS图解

    综上所述,JMS为Java开发者提供了一套标准的接口,用于构建可靠的消息传递系统,无论是简单的点对点通信还是复杂的发布/订阅模式。通过使用JMS,开发者可以专注于业务逻辑,而不必关心底层的消息传输细节,这极大地...

    activemq和spring整合发布消息和订阅消息demo

    3. **发布/订阅模式**:在JMS中,有两种消息模型:点对点(Queue)和发布/订阅(Topic)。在这个示例中,我们关注的是发布/订阅模式。在这种模式下,一个消息发布者将消息发送到主题,而多个订阅者可以订阅该主题以...

    消息订阅发布

    【消息订阅发布】是一种在分布式系统中广泛使用的通信模式,它允许不同的应用程序之间共享和交换信息,无需直接互相了解对方的细节。在这个模式下,发布者(publisher)发送消息到一个主题(topic)或者队列(queue...

    JMS消息模型 JMS学习.doc

    3. **JMS Domains**:JMS有两种主要的消息模型,即点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)模式。 - **Point-to-Point (PTP)模式**:在这种模型中,消息从一个生产者发送到一个...

    activemq_spring.rar_Spring和ActiveMQ_spring_消息中间件_消息发布订阅_消息订阅

    6. **实现消息的发布订阅模式**:在ActiveMQ中,有两种消息传递模式:点对点(Queue)和发布订阅(Topic)。点对点模式下,每个消息只有一个消费者;发布订阅模式下,一个消息可以被多个消费者(订阅者)接收。在...

    基于发布订阅的分布式监控主动消息交互研究.pdf

    分布式系统是一种计算架构,其中多个计算节点之间通过网络互连,协同完成任务,而非...通过采用发布订阅模式和公共信息模型,可以有效提升电力系统监控的实时性和准确性,对于保障电网的安全、可靠运行具有关键作用。

Global site tag (gtag.js) - Google Analytics