`
baobeituping
  • 浏览: 1065162 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

JMS-TOPIC消费者端(JMS服务器采用activeMQ)

阅读更多

package com.active;

import java.io.IOException;
import java.util.Arrays;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTool implements MessageListener, ExceptionListener {

    private boolean running;

    private Session session;
    private Destination destination;
    private MessageProducer replyProducer;

    private boolean pauseBeforeShutdown;
    private boolean verbose = true;
    private int maxiumMessages =10;//该参数表示当接受到几条消息以后关闭客户端
    private String subject = "topic1";
    private boolean topic =true;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private boolean transacted;
    private boolean durable =true;//表示是否持久化消息.这个参数要和setClientID方法同时使用
    private String clientId="3161";
    private int ackMode = Session.AUTO_ACKNOWLEDGE;
    private String consumerName = "James";
    private long sleepTime;
    private long receiveTimeOut;

    /*
 ProducerTool [url] broker的地址,默认的是tcp://localhost:61616
    [true|flase] 是否使用topic,默认是false
    [subject] subject的名字,默认是TOOL.DEFAULT
    [durabl] 是否持久化消息,默认是false
    [messagecount] 发送消息数量,默认是10
    [messagesize] 消息长度,默认是255
    [clientID] durable为true的时候,需要配置clientID
    [timeToLive] 消息存活时间
    [sleepTime] 发送消息中间的休眠时间
    [transacte]  是否采用事务

    ConsumerTool [url] broker的地址,默认的是tcp://localhost:61616
    [true|flase] 是否使用topic,默认是false
    [subject] subject的名字,默认是TOOL.DEFAULT
    [durabl] 是否持久化消息,默认是false
    [maxiumMessages] 接受最大消息数量,0表示不限制
    [clientID] durable为true的时候,需要配置clientID
    [transacte]  是否采用事务
    [sleepTime]  接受消息中间的休眠时间,默认是0,onMeesage方法不休眠
    [receiveTimeOut] 接受超时


 */

    public static void main(String[] args) {
        ConsumerTool consumerTool = new ConsumerTool();
        String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        consumerTool.run();
    }

    public void run() {
        try {
            running = true;
            System.out.println("Connecting to URL: " + url);
            System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
            System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");

            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            Connection connection = connectionFactory.createConnection();

             //首先建立一个连接
            if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
                connection.setClientID(clientId);

//使用这个方法的时候就是客户端告诉服务端保留我客户端一个ID的引用.该客户端每次都会以该ID去连接服务器
//如果服务器发送某个消息我客户端不在线,那么服务器将客户端未接受的消息保存,然后当有ID号的客户端上线//后,服务器将把客户断没有接受的消息发送给注册了ID号的客户端 

           }
            connection.setExceptionListener(this);
            connection.start();

            session = connection.createSession(transacted, ackMode);
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            MessageConsumer consumer = null;
            if (durable && topic) {
                consumer = session.createDurableSubscriber((Topic)destination, consumerName);

//只有TOPIC的方式才有持久化的做法.
            }
else {
                consumer = session.createConsumer(destination);
            }

            if (maxiumMessages > 0) {
                consumeMessagesAndClose(connection, session, consumer);
            } else {
                if (receiveTimeOut == 0) {
                    consumer.setMessageListener(this);
                } else {
                    consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
                }
            }

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    public void onMessage(Message message) {

    //这是监听的方法.客户端通过注册一个监听器.只要服务器发送了消息.客户端就会调用该方法来得到消息.
        try {

            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage)message;
                if (verbose) {

                    String msg = txtMsg.getText();
                    if (msg.length() > 50) {
                        msg = msg.substring(0, 50) + "...";
                    }

                    System.out.println("Received: " + msg);
                }
            } else {
                if (verbose) {
                    System.out.println("Received: " + message);
                }
            }

            if (message.getJMSReplyTo() != null) {
                replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
            }

            if (transacted) {
                session.commit();
            } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
                message.acknowledge();
            }

        } catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    public synchronized void onException(JMSException ex) {
        System.out.println("JMS Exception occured.  Shutting down client.");
        running = false;
    }

    synchronized boolean isRunning() {
        return running;
    }

    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
        System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");

        for (int i = 0; i < maxiumMessages && isRunning();) {
            Message message = consumer.receive(1000);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
        System.out.println("Closing connection");
        consumer.close();
        session.close();
        connection.close();
        if (pauseBeforeShutdown) {
            System.out.println("Press return to shut down");
            System.in.read();
        }
    }

    protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
        System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");

        Message message;
        while ((message = consumer.receive(timeout)) != null) {
            onMessage(message);
        }

        System.out.println("Closing connection");
        consumer.close();
        session.close();
        connection.close();
        if (pauseBeforeShutdown) {
            System.out.println("Press return to shut down");
            System.in.read();
        }
    }

    public void setAckMode(String ackMode) {
        if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.CLIENT_ACKNOWLEDGE;
        }
        if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.AUTO_ACKNOWLEDGE;
        }
        if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
            this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
        }
        if ("SESSION_TRANSACTED".equals(ackMode)) {
            this.ackMode = Session.SESSION_TRANSACTED;
        }
    }

    public void setClientId(String clientID) {
        this.clientId = clientID;
    }

    public void setConsumerName(String consumerName) {
        this.consumerName = consumerName;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public void setMaxiumMessages(int maxiumMessages) {
        this.maxiumMessages = maxiumMessages;
    }

    public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
        this.pauseBeforeShutdown = pauseBeforeShutdown;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setReceiveTimeOut(long receiveTimeOut) {
        this.receiveTimeOut = receiveTimeOut;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }

}

分享到:
评论

相关推荐

    apache-activemq-5.9.0-bin

    这个“apache-activemq-5.9.0-bin”压缩包包含了Apache ActiveMQ 5.9.0版本的完整二进制文件,用于在本地或网络环境中安装和运行。 Apache ActiveMQ的核心功能包括: 1. **消息队列**:ActiveMQ支持多种消息模式,...

    apache-activemq-5.16.6-bin.zip

    这个"apache-activemq-5.16.6-bin.zip"文件包含了ActiveMQ的最新稳定版本5.16.6的二进制发行版,主要用于在各种环境中部署和运行。 **Apache ActiveMQ核心概念** 1. **消息队列(Message Queue)**: 消息队列是...

    apache-activemq-5.15.6

    在"apache-activemq-5.15.6"这个版本中,我们可以探讨以下几个关键知识点: 1. **JMS规范**:JMS是Java平台上的标准接口,用于与消息队列交互。它定义了生产者如何发送消息以及消费者如何接收消息的规则。ActiveMQ...

    apache-activemq-5.15.0-bin.tar.7z

    ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种语言编写客户端 2、对spring的支持,很容易和spring整合 3、支持多种传输协议:TCP,SSL,NIO,UDP等 4、支持...

    jms-1.1.jar(jms工具jar包)

    5. **主题(Topic)**:支持多播,允许多个消费者同时接收相同的消息。发布/订阅模型,发布者发布消息到主题,多个订阅者可以订阅该主题并接收消息。 6. **消息代理(Message Broker)**:也称为消息中间件,是负责...

    apache-activemq-5.9.0 下载

    2. **核心组件**:ActiveMQ的核心组件包括Broker(消息代理)、Producer(生产者)、Consumer(消费者)、Topic(主题)和Queue(队列)。Broker负责路由和存储消息,生产者发送消息,消费者接收消息。Topic适用于...

    apache-activemq-5.15.11-bin.tar.gz

    - **生产者与消费者**:使用Java或其他语言编写应用程序,通过JMS API创建消息生产和消费逻辑。 - **监控**:通过Web控制台,可以查看消息队列状态,管理消费者,以及监控系统性能。 - **消息策略**:设置消息...

    apache-activemq-5.12.0-bin

    在“apache-activemq-5.12.0-bin”这个压缩包中,包含了运行Apache ActiveMQ所需的所有文件,适用于Windows操作系统。 Apache ActiveMQ作为消息队列的实现,主要功能包括: 1. **消息传输**:ActiveMQ允许应用程序...

    apache-activemq-5.15.7-bin

    4. **客户端连接**:开发者可以通过JMS API或其他支持的协议创建生产者和消费者,与ActiveMQ进行交互。 5. **监控管理**:ActiveMQ内置了一个Web控制台,可以在浏览器中访问`http://localhost:8161/admin`进行管理和...

    JMS-ActiveMQ入门实例

    **接收端** 则需要创建一个消费者来监听队列或主题。对于队列,消费者使用`MessageConsumer`对象接收消息;对于主题,消费者需要先创建一个`MessageListener`,当有新消息到达时,`onMessage()`方法会被调用。 实例...

    activemq-web-console-5.11.2

    activemq-web-console的默认使用方式是通过在activemq.xml中导入jetty.xml配置一个jetty server来实现的。其实activemq-web-console完全可以和activemq-broker分开来部署。 activemq-web-console包含3个apps, 1.一...

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    apache-activemq-5.3.1-bin.tar.gz

    这个压缩包“apache-activemq-5.3.1-bin.tar.gz”是针对Linux/Unix系统的二进制发行版,通常包含运行和管理ActiveMQ所需的所有文件。 **1. ActiveMQ简介** Apache ActiveMQ是一个基于标准的消息中间件,它实现了JMS...

    apache-activemq-5.15.1-bin.tar.gz

    这个压缩包"apache-activemq-5.15.1-bin.tar.gz"是针对Linux操作系统的安装包,包含了运行和管理ActiveMQ所需的所有文件。 首先,让我们详细了解ActiveMQ的基本概念和功能: 1. **消息中间件**:ActiveMQ作为一个...

    apache-activemq-5.15.10-bin.tar.gz

    5. **创建和配置目的地** - 在Web控制台中,可以创建Queue或Topic,并配置相关的消费者和生产者。 6. **停止服务** - 当完成测试或配置后,使用`bin/activemq stop`命令关闭服务。 **在应用开发中使用ActiveMQ:**...

    apache-activemq-5.2.0.zip_activeMq 5.2.0_apache mq

    在压缩包`apache-activemq-5.2.0`中,你将找到以下内容: - bin目录:包含了启动和停止ActiveMQ服务器的脚本。 - conf目录:存放配置文件,如activemq.xml,用于配置broker和各种设置。 - lib目录:包含运行ActiveMQ...

    javax.jms-1.1.jar

    9. **集成JMS**:在Java应用中,可以通过导入`javax.jms-1.1.jar`,然后配置相应的JMS提供者(如ActiveMQ、RabbitMQ等),就可以利用JMS API进行消息传递。在Spring框架中,还可以利用Spring JMS模块简化JMS的使用。...

    apache-activemq-5.3.0-bin.zip

    - **Java API**:使用JMS API直接与ActiveMQ交互,创建生产者和消费者实例。 - **其他语言支持**:ActiveMQ支持多种编程语言,如Python、Ruby、C#等,通过相应的客户端库进行集成。 - **Spring框架集成**:Spring...

    apache-activemq-5.15.5-bin.tar.gz

    此外,理解JMS规范中的概念,如队列(Queue)、主题(Topic)、生产者(Producer)、消费者(Consumer)以及消息模型(点对点、发布/订阅),对于有效利用ActiveMQ至关重要。 总之,Apache ActiveMQ是企业级消息...

    ActiveMQ 中javax.jms的源码 javax.jms-sources-1.1.zip

    而Topic的实现则采用了发布/订阅模型,多个消费者可以同时订阅同一个Topic,消息会被广播给所有订阅者。 通过深入研究ActiveMQ的javax.jms源码,我们可以更深入地理解JMS的工作原理,了解如何利用ActiveMQ实现高效...

Global site tag (gtag.js) - Google Analytics