`

openJms 介绍(二)

阅读更多
上篇openJms介绍 (一)  提到了openJms的构建及消息的发送和接收,这篇主要了解消息的发布和订阅。JMS 的发布/订阅模型定义了如何向一个内容节点发布和订阅消息,内容节点也叫主题(topic),主题是为发布者(publisher)和订阅者 (subscribe) 提供传输的中介。发布/订阅模型使发布者和订阅者之间不需要直接通讯(如RMI)就可保证消息的传送,有效解决系统间耦合问题(当然有这个需要才行),还有就是提供了一对一、一对多的通讯方式,比较灵活。

先介绍JMS里2个概念,持久订阅模式和非持久订阅模式,其实也是发布/订阅模型在可靠性上提供的2种方式:

非持久订阅模式:只有当客户端处于激活状态,也就是和JMS 服务器保持连接的状态下,才能接收到发送到某个Topic的消息,而当客户端处于离线状态时,则这个时间段发到Topic的消息将会永远接收不到。

持久订阅模式:客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS 服务器会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS 服务器时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息,即消息永远能接收到。

下面我们就接着来看openJms在发布/订阅模式上的表现,由于篇幅关系,在这里只讲述非持久订阅模式,持久订阅模式可以根据JMS的标准来试。

消息发布的代码如下:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
* @author Liang.xf 2004-12-27
* openJms 发布消息演示
* www.javayou.com
*/
public class TopicPublish {
    public static void main(String[] args) {
        try {
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(
                Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            //openJms默认的端口是1099
            properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
            Context context = new InitialContext(properties);
            //获得JMS Topic连接队列工厂
            TopicConnectionFactory factory =
                (TopicConnectionFactory) context.lookup(
                    "JmsTopicConnectionFactory");

            //创建一个Topic连接,并启动
            TopicConnection topicConnection = factory.createTopicConnection();
            topicConnection.start();

            //创建一个Topic会话,并设置自动应答
            TopicSession topicSession =
                topicConnection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);

            //lookup 得到 topic1
            Topic topic = (Topic) context.lookup("topic1");
            //用Topic会话生成Topic发布器
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);

            //发布消息到Topic
            System.out.println("消息发布到Topic");
            TextMessage message = topicSession.createTextMessage
                ("你好,欢迎定购Topic类消息");
            topicPublisher.publish(message);

            //资源清除,代码略  ... ...   
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

而订阅消息的接收有同步的和异步2种,他们分别使用receive()和onMessage(Message message)方法来接收消息,具体代码:

同步接收:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
* @author Liang.xf 2004-12-27
* openJms 非持久订阅同步接收演示
* www.javayou.com
*/
public class TopicSubscribeSynchronous {

    public static void main(String[] args) {
        try {
            System.out.println("定购消息接收启动:");
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
            Context context = new InitialContext(properties);

            //获得Topic工厂和Connection
            TopicConnectionFactory factory =
                (TopicConnectionFactory) context.lookup(
                    "JmsTopicConnectionFactory");
            TopicConnection topicConnection = factory.createTopicConnection();
            topicConnection.start();

            //创建Topic的会话,用于接收信息
            TopicSession topicSession =
                topicConnection.createTopicSession(
                    false,
                    Session.AUTO_ACKNOWLEDGE);

            //lookup topic1
            Topic topic = (Topic) context.lookup("topic1");
                    //创建Topic subscriber
            TopicSubscriber topicSubscriber =
                topicSession.createSubscriber(topic);
            //收满10条订阅消息则退出
            for (int i=0; i<10; i++) {
                //同步消息接收,使用receive方法,堵塞等待,直到接收消息
                TextMessage message = (TextMessage) topicSubscriber.receive();
                System.out.println("接收订阅消息["+i+"]: " + message.getText());
            }
            //资源清除,代码略  ... ...
            System.out.println("订阅接收结束.");
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

非同步接收:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
* @author Liang.xf 2004-12-27
* openJms 非持久订阅异步接收演示
* www.javayou.com
*/
public class TopicSubscribeAsynchronous implements MessageListener {
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private Topic topic;
    private TopicSubscriber topicSubscriber;

    TopicSubscribeAsynchronous() {
        try {
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(
                Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
            Context context = new InitialContext(properties);

            //取得Topic的连接工厂和连接
            TopicConnectionFactory topicConnectionFactory =
                (TopicConnectionFactory) context.lookup(
                    "JmsTopicConnectionFactory");
            topicConnection = topicConnectionFactory.createTopicConnection();

            //创建Topic的会话,用于接收信息
            topicSession =
                topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            topic = (Topic) context.lookup("topic1");

            //创建Topic subscriber
            topicSubscriber = topicSession.createSubscriber(topic);
            //设置订阅监听
            topicSubscriber.setMessageListener(this);

            //启动信息接收
            topicConnection.start();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        System.out.println("非同步定购消息的接收:");
        try {
            TopicSubscribeAsynchronous listener =
                new TopicSubscribeAsynchronous();
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //收到订阅信息后自动调用此方法
    public void onMessage(Message message) {
        try {
            String messageText = null;
            if (message instanceof TextMessage)
                messageText = ((TextMessage) message).getText();
            System.out.println(messageText);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

编译好后,启动openJms服务,打开admin管理台,为了运行方便,这里先列出三个类的运行命令:
java -cp .\; -Djava.ext.dirs=.\lib; javayou.demo.openjms.TopicPublish
java -cp .\; -Djava.ext.dirs=.\lib; javayou.demo.openjms.TopicSubscribeSynchronous
java -cp .\; -Djava.ext.dirs=.\lib; javayou.demo.openjms.TopicSubscribeAsynchronous

先运行2个接收命令,再运行发布命令,可以看到控制台的Topic有消息接收,并且接收1和2都有消息接收的提示,到此完成演示,由于是非持久订阅,所以可以看到控制台上的Topic消息条数不会减少。

最后,说说openJms的缺点,它不支持XA transactions、集群和热备等高级功能,如果你需要这些特性,最好还是使用商业的JMS服务器,但不论怎样,openJms为我们提供了一个学习JMS的最好路径,有兴趣了解JMS的还是来尝试尝试吧。
分享到:
评论

相关推荐

    【openjms笔记二】代码范例讲解

    【openjms笔记二】代码范例讲解 OpenJMS是Java消息服务(Java Message Service)的一个开源实现,它提供了一种在分布式环境中传递可靠消息的机制。本篇笔记将通过具体的代码示例来深入理解OpenJMS的核心概念和用法...

    JMS OPENJMS的实现例子

    消息是数据载体,可以是文本、对象或者二进制数据。 在实践中,使用OPENJMS的步骤通常如下: 1. **配置OPENJMS**:设置服务器配置文件,包括定义连接工厂、目的地等,并启动OPENJMS服务器。 2. **建立连接**:...

    Openjms.rar_OpenJMS

    1. **安装与环境准备**:首先,你需要下载OpenJMS的源代码或预编译的二进制包,并确保你的开发环境中已经安装了Java开发工具包(JDK)。 2. **构建项目**:如果你使用的是源代码,需要使用Maven或Ant来编译并构建...

    openjms服务器

    4. **消息类型**:JMS定义了两种基本的消息类型:TextMessage(包含纯文本内容)和BytesMessage(用于传输二进制数据)。还有MapMessage(键值对形式的数据)、ObjectMessage(序列化的Java对象)和StreamMessage...

    openjms-0.7.7-beta-1-src

    《OpenJMS 0.7.7-Beta-1 源码解析与学习指南》 OpenJMS,全称为Open Java Message Service,是一个开源的Java消息服务实现,它提供了符合JMS(Java Message Service)规范的消息传递功能。OpenJMS 0.7.7-Beta-1是该...

    OpenJMS简介

    OpenJMS是一个开源的消息传递系统,它实现了Java消息服务(JMS)规范,为企业级应用程序提供可靠的异步通信。在本文中,我们将深入探讨OpenJMS的特性、工作原理以及如何在实际应用中利用它。 首先,让我们了解什么...

    openjms-0.7.7-beta-1

    由于仅给出文件名 "openjms-0.7.7-beta-1",我们可以假设这是整个OpenJMS 0.7.7 beta-1版本的压缩包,里面可能包含源代码、编译后的二进制文件、文档、示例、配置文件等资源。通常,这样的压缩包会包括以下部分: 1...

    JMS初级认识,用的是Openjms中间件

    1. **消息(Message)**:JMS的核心元素,是数据的载体,可以包含文本、对象或二进制数据。消息可以通过不同的消息类型进行传输,如点对点(Queue)和发布/订阅(Topic)。 2. **生产者(Producer)**:也称为消息发送...

    openJMS Jar

    二、openJMS的JMS特性 1. 消息模型:openJMS支持点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)两种消息模式,满足不同应用场景的需求。 2. 队列和主题:队列确保消息被单一接收者消费,而主题允许消息...

    开源JMS服务器-openJms

    **二、openJMS特性** 1. **高性能**:openJMS设计时考虑了性能优化,能够处理大量并发的消息发送和接收。 2. **高可用性**:支持集群和故障转移,确保服务的持续可用性。 3. **安全**:提供了用户认证和授权机制,...

    OPENJMS操作消息传输示例

    **OPENJMS操作消息传输示例** 在Java世界中,JMS(Java Message Service)是一种标准,用于在分布式环境中提供可靠的消息传递。它允许应用程序通过消息代理进行异步通信,从而实现解耦和高可用性。OPENJMS是其中一...

    应用openJMS实现JMS消息发布于订阅

    根据官方文档配置OpenJMS服务器,通常需要修改`conf/openjms.xml`配置文件,设置服务器端口、连接工厂等参数。 2. **创建主题(Topic)**: 在OpenJMS中,可以通过JMS API动态创建主题。例如,可以使用`...

    openjms-开源

    通常,这些内容可以帮助开发者更好地理解OpenJMS的工作机制,对其进行二次开发,或者根据项目需求进行配置和优化。 总之,OpenJMS作为一款开源的JMS实现,以其丰富的特性、灵活的部署方式和广泛的数据库支持,为...

    OpenJMS:作业管理系统(JMS)的开源版本

    OpenJMS OpenJMS是作业管理系统(JMS)的开源版本,它是用于安置临时工或合同工的企业解决方案。 OpenJMS是备受推崇的JMS2006桌面应用程序的完全重写版本,旨在在简单的本地网络(例如工作组)上运行,并使用了...

    tomcat+opemjms+mysql配置

    根据提供的文档内容,我们可以总结出以下关键知识点,这些知识点涵盖了如何配置Tomcat、OpenJMS 和 MySQL 的步骤和注意事项。 ### 一、Tomcat + OpenJMS + MySQL 配置 #### 1. 概述 - **目标**: 本文档旨在指导...

    openjms-example-project:一个使用ant build管理开放式jms示例的项目。 纯娱乐。 好好享受

    解压openjms-0.7.7-beta-1.zip 然后通过cmd直接start.sh即可,可以看到已经在监听端口。 也可以打开admin.bat 然后通过菜单Actions启动 编译 直接在根目录下进行编译,输入 ant 或者ant -f build.xml ###启动...

    Jms的例子 不错的例子

    消息是数据载体,可以是文本、二进制或对象形式。目的地是消息发送的目的地,可以是队列(Queue)或主题(Topic)。队列遵循一对一的模型,每个消息仅由一个消费者接收;主题遵循一对多模型,多个订阅者可以接收同一...

    JMS开发例子.pdf

    #### 二、JMS消息类型 JMS定义了几种不同类型的消息,包括: 1. **TextMessage**:用于携带纯文本消息。 2. **BytesMessage**:用于携带任意字节流数据。 3. **MapMessage**:用于携带一系列名称/值对。 4. **...

    JMS入门

    【JMS入门】这篇文章主要介绍了Java消息服务(Java Message Service,简称JMS)的基本概念和如何使用开源的JMS服务器OpenJMS进行实践操作。JMS是一种标准接口,用于应用程序之间的异步通信,特别是在分布式环境中,...

Global site tag (gtag.js) - Google Analytics