`
baobeituping
  • 浏览: 1068819 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

JMS-TOPIC生产者端(JMS服务器采用activeMQ)

阅读更多

1.首先启动JMS(ACTIVEMQ)服务器

2.编写代码如下:

package com.active;
import java.util.Arrays;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;

public class ProducerTool {

    private Destination destination;
    private int messageCount = 10;
    private long sleepTime;
    private boolean verbose = true;
    private int messageSize = 255;
    private long timeToLive;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "topic1";
    private boolean topic =true;
    private boolean transacted;
    private boolean persistent =true;//表示是否要持久化消息。

    /*
 ProducerTool [url] broker的地址,默认的是tcp://localhost:61616
    [true|flase] 是否使用topic,默认是false
    [subject] subject的名字,默认是TOOL.DEFAULT
    [durabl] 是否持久化消息,默认是false
    [messagecount] 发送消息数量,默认是10
    [messagesize] 消息长度,默认是255
    [clientID] durable为true的时候,需要配置clientID
    [timeToLive] 消息存活时间
    [sleepTime] 发送消息中间的休眠时间
    [transacte]  是否采用事务

    ConsumerTool [url] broker的地址,默认的是tcp://localhost:61616
    [true|flase] 是否使用topic,默认是false
    [subject] subject的名字,默认是TOOL.DEFAULT
    [durabl] 是否持久化消息,默认是false
    [maxiumMessages] 接受最大消息数量,0表示不限制
    [clientID] durable为true的时候,需要配置clientID
    [transacte]  是否采用事务
    [sleepTime]  接受消息中间的休眠时间,默认是0,onMeesage方法不休眠
    [receiveTimeOut] 接受超时


 */

    public static void main(String[] args) {
        ProducerTool producerTool = new ProducerTool();
        String[] unknown = CommandLineSupport.setOptions(producerTool, args);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        producerTool.run();
    }

    public void run() {
        Connection connection = null;
        try {
            System.out.println("Connecting to URL: " + url);
            System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
            System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
            System.out.println("Sleeping between publish " + sleepTime + " ms");
            if (timeToLive != 0) {
                System.out.println("Messages time to live " + timeToLive + " ms");
            }

            // Create the connection.首先创建一个连接
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            // Create the session.创建一个SESSION
            Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);

           //如果是TOPIC的方式则创建TOPIC连接.否则创建QUEUE连接
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            // Create the producer.
            MessageProducer producer = session.createProducer(destination);
            if (persistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            //我们如果设置Mode的方式为DeliveryMode.PERSISTENT.(我采用的是MYSQL的持久化方式)那么服务器所发送的消息将都保存到MYSQL数据库中的activemq_msgs表中
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //如果不是采用持久化保存的方式,那么当服务器重新启动的时候,由于客户端不在线时服务器所发的消息将不再保留.因为那些消息是保存在了服务器内存中的
            }
            if (timeToLive != 0) {
                producer.setTimeToLive(timeToLive);
            }

            // Start sending messages
            sendLoop(session, producer);

            System.out.println("Done.");

            // Use the ActiveMQConnection interface to dump the connection
            // stats.
           // ActiveMQConnection c = (ActiveMQConnection)connection;
           // c.getConnectionStats().dump(new IndentPrinter());

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            try {
                connection.close();

            注意发送消息完后要释放资源。
            } catch (Throwable ignore) {
            }
        }
    }

    protected void sendLoop(Session session, MessageProducer producer) throws Exception {

            TextMessage message = session.createTextMessage();
            message.setText("i love wuweiling");
            producer.send(message);
    }
    public void setPersistent(boolean durable) {
        this.persistent = durable;
    }

    public void setMessageCount(int messageCount) {
        this.messageCount = messageCount;
    }

    public void setMessageSize(int messageSize) {
        this.messageSize = messageSize;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setTimeToLive(long timeToLive) {
        this.timeToLive = timeToLive;
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }
}

分享到:
评论

相关推荐

    apache-activemq-5.16.6-bin.zip

    3. **创建消息生产者和消费者**: 使用JMS API或者支持的其他协议创建应用程序,发送和接收消息。 4. **访问Web Console**: 浏览器中输入`http://localhost:8161/admin`访问管理界面,监控和管理队列、主题和连接。 ...

    apache-activemq-5.15.6

    ActiveMQ在企业级应用中扮演着重要的角色,因为它允许应用程序通过异步通信来解耦生产者和消费者,从而提高系统的可扩展性和可靠性。 在"apache-activemq-5.15.6"这个版本中,我们可以探讨以下几个关键知识点: 1....

    jms-1.1.jar(jms工具jar包)

    在实际应用中,`javax.jms-1.1.jar` 提供了JMS API的实现,开发人员可以使用这些API创建消息、连接到消息代理、创建生产者和消费者,以及进行消息的发送和接收。例如,使用`ConnectionFactory`创建连接工厂,然后...

    apache-activemq-5.9.0 下载

    2. **核心组件**:ActiveMQ的核心组件包括Broker(消息代理)、Producer(生产者)、Consumer(消费者)、Topic(主题)和Queue(队列)。Broker负责路由和存储消息,生产者发送消息,消费者接收消息。Topic适用于...

    apache-activemq-5.15.11-bin.tar.gz

    - **生产者与消费者**:使用Java或其他语言编写应用程序,通过JMS API创建消息生产和消费逻辑。 - **监控**:通过Web控制台,可以查看消息队列状态,管理消费者,以及监控系统性能。 - **消息策略**:设置消息...

    apache-activemq-5.15.7-bin

    4. **客户端连接**:开发者可以通过JMS API或其他支持的协议创建生产者和消费者,与ActiveMQ进行交互。 5. **监控管理**:ActiveMQ内置了一个Web控制台,可以在浏览器中访问`http://localhost:8161/admin`进行管理和...

    JMS-ActiveMQ入门实例

    **发送端** 的实现会创建一个JMS连接,然后创建一个生产者对象,用来向队列或主题发送消息。消息可以是文本、对象或者二进制数据。生产者将消息封装成`Message`对象,然后通过`send()`方法发送出去。 **接收端** 则...

    apache-activemq-5.15.10-bin.tar.gz

    5. **创建和配置目的地** - 在Web控制台中,可以创建Queue或Topic,并配置相关的消费者和生产者。 6. **停止服务** - 当完成测试或配置后,使用`bin/activemq stop`命令关闭服务。 **在应用开发中使用ActiveMQ:**...

    apache-activemq-5.15.1-bin.tar.gz

    1. **消息中间件**:ActiveMQ作为一个消息中间件,其主要任务是处理应用程序之间的通信,通过消息传递来解耦生产者和消费者,使得系统更加灵活和可扩展。 2. **JMS支持**:JMS是一个标准接口,定义了如何在分布式...

    apache-activemq-5.3.0-bin.zip

    - **Java API**:使用JMS API直接与ActiveMQ交互,创建生产者和消费者实例。 - **其他语言支持**:ActiveMQ支持多种编程语言,如Python、Ruby、C#等,通过相应的客户端库进行集成。 - **Spring框架集成**:Spring...

    apache-activemq-5.2.0.zip_activeMq 5.2.0_apache mq

    JMS提供了一种解耦通信的方式,使得生产者和消费者可以在不同时刻运行,甚至不知道彼此的存在。ActiveMQ作为JMS的实现,支持多种消息模式,包括点对点(Queue)和发布/订阅(Topic)。 点对点模式下,每个消息只有...

    apache-activemq-4.1.2

    这个压缩包文件"apache-activemq-4.1.2"包含了该版本的完整源码和二进制文件,允许用户在本地环境中安装和运行ActiveMQ服务器。 1. **Apache ActiveMQ基本概念**: - **JMS(Java Message Service)**:JMS是Java...

    apache-activemq-5.15.5-bin.tar.gz

    此外,理解JMS规范中的概念,如队列(Queue)、主题(Topic)、生产者(Producer)、消费者(Consumer)以及消息模型(点对点、发布/订阅),对于有效利用ActiveMQ至关重要。 总之,Apache ActiveMQ是企业级消息...

    ActiveMQ 中javax.jms的源码 javax.jms-sources-1.1.zip

    而Topic的实现则采用了发布/订阅模型,多个消费者可以同时订阅同一个Topic,消息会被广播给所有订阅者。 通过深入研究ActiveMQ的javax.jms源码,我们可以更深入地理解JMS的工作原理,了解如何利用ActiveMQ实现高效...

    apache-activemq-5.9.1-bin.tar.gz

    这个“apache-activemq-5.9.1-bin.tar.gz”压缩包包含了在Linux环境下安装和运行ActiveMQ所需的所有文件。 首先,让我们了解一下ActiveMQ的核心概念和功能。ActiveMQ是一个中间件,它作为应用程序之间的通信桥梁,...

    apache-activemq-5.15.2-bin.zip

    6. **JMS接口**:ActiveMQ支持JMS接口,这意味着开发者可以使用Java、Python、C++、.NET等多种语言创建生产者和消费者来发送和接收消息。JMS接口提供了一种标准的方式来处理异步消息,确保数据传输的可靠性和持久性...

    apache-activemq-5.15.2.zip

    通过JMS API,开发者可以创建消息生产者、消费者,并发送和接收消息。 2. **持久化机制**:ActiveMQ支持多种持久化策略,包括本地文件系统存储、LevelDB、JDBC以及KahaDB等。这确保了即使在服务宕机后,消息也不会...

    ActiveMQ-Topic订阅发布模式Demo

    在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这个主题,从而接收到这些消息。这种模式适用于需要广播消息或通知所有感兴趣方的情况。 在博客链接中...

    apache-activemq-5.12.0.zip

    开发人员可以利用这些库创建生产者和消费者,实现消息的发布与订阅。 7. **消息模式**:ActiveMQ支持点对点(Queue)、发布/订阅(Topic)等多种消息模式,满足不同场景的需求。 8. **监控与调试**:通过Web控制台...

    apache-activemq-5.15.2下的demo

    - **JMS接口**:JMS是Java平台上的标准API,用于应用程序之间的异步通信,提供了消息生产者、消费者和消息队列的概念。 - **Queues与Topics**:在ActiveMQ中,Queues支持一对一的消息传递,而Topics支持一对多的...

Global site tag (gtag.js) - Google Analytics