`
huanglz19871030
  • 浏览: 248841 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

开源JMS服务器-openJms介绍和应用举例

    博客分类:
  • JMS
阅读更多

本文介绍开源的JMS服务器openJms,及怎样使用openJms来构建系统之间健全、高度可用的通讯,从而简化企业级应用的研发。openJms符合SUN的JMS API 1.0.2规范,支持消息队列,还支持消息传递的发布/订阅模式,本文先就系统服务的搭建及JMS的非结构化消息发送和接收进行说明。

JMS 有五种消息类型。三种结构化或半结构化的消息类型(MapMessage、ObjectMessage 和 StreamMessage)及两种非结构化的或自由格式的消息类型(TextMessage 和 BytesMessage)。而这里虽然我们只对非结构化消息进行说明,但非结构化的消息格式却能够更好地进行交互操作,因为他们在消息上非常少利用结构,在此基础上和XML再进行结合,将能方便的进行更好的扩展,XML相关简化操作参考《Jaxb来实现Java-XML的转换》。

下面具体来介绍服务器搭建,在http://openjms.sourceforge.net/downloads.html下载openJms,解压后能直接使用,在 \openjms-0.7.6.1\bin 里,有openJms的运行脚本,执行 startup 启动,弹出一个新的窗口,服务就运行在新窗口内,shutdown 为停止命令:

服务运行后,就能开始使用JMS服务了,至此服务搭建完毕,简单得不能再简单了。

下面是消息发送和接收的研发,研发中需要的jar包在\openjms-0.7.6.1\lib里能找到:
openjms-0.7.6.1.jar
jms-1.0.2a.jar
exolabcore-0.3.7.jar
commons-logging-1.0.3.jar

把上面的类包加入到项目中,下面是消息发送服务的代码:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
 * @author Liang.xf 2004-12-24
 * For openJms 演示, Message 发送
 *
www.javayou.com
 */
public class QueueSend {
    public static void main(String[] args) {
        try {
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(
                Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            //openJms默认的端口是1099
            properties.put(Context.PROVIDER_URL,
                 "rmi://localhost:1099/");
            Context context = new InitialContext(properties);

            //获得JMS信息连接队列工厂
            QueueConnectionFactory queueConnectionFactory =
                (QueueConnectionFactory) context.lookup(
                    "JmsQueueConnectionFactory");
            //获得JMS信息连接队列
            QueueConnection queueConnection =
                queueConnectionFactory.createQueueConnection();
            //产生队列Session,设置事务为false,自动应答消息接收
            QueueSession queueSession =
                queueConnection.createQueueSession(
                    false,
                    Session.AUTO_ACKNOWLEDGE);

            //获得默认内建在JMS里的队列之一:queue1
            Queue queue = (Queue) context.lookup("queue1");
            //产生JMS队列发送器
            QueueSender queueSender =
                queueSession.createSender(queue);
            //发送数据到JMS
            TextMessage message = queueSession.createTextMessage();
            message.setText("Hello, I’m openJms.");
            queueSender.send(message);

            System.out.println(
                ""信息写入JMS服务器队列");

            //以下做清除工作,代码略
            // ... ...
                       
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


执行程式发送消息,然后打开JMS控制台,用 admin 命令启动管理平台,点击菜单Actions-Connections-online,出现界面如下:

能看到JSM默认的队列queue1里已有1条消息了,而其他的队列还是空着的。

下面我们来看看消息接收服务的代码:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
 * @author Liang.xf 2004-12-24
 * For openJms 演示, Message接收
 *
www.javayou.com
 */
public class QueueReceiveSynchronous {
    public static void main(String[] args) {
        try {
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(
                Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            properties.put(Context.PROVIDER_URL,
                "rmi://localhost:1099/");
            Context context = new InitialContext(properties);

            //获得JMS信息连接队列工厂
            QueueConnectionFactory queueConnectionFactory =
                (QueueConnectionFactory) context.lookup(
                    "JmsQueueConnectionFactory");

            //获得JMS信息连接队列
            QueueConnection queueConnection =
                queueConnectionFactory.createQueueConnection();

            //启动接收队列线程
            queueConnection.start();
            //产生队列Session,设置事务为false,自动应答消息接收
            QueueSession queueSession =
                queueConnection.createQueueSession(
                    false,
                    Session.AUTO_ACKNOWLEDGE);
            //获得默认内建在JMS里的队列之一:queue1
            Queue queue = (Queue) context.lookup("queue1");
            //产生JMS队列接收器
            QueueReceiver queueReceiver =
                queueSession.createReceiver(queue);
            //通过同步的方法接收消息
            Message message = queueReceiver.receive();
            String messageText = null;
            if (message instanceof TextMessage)
                messageText = ((TextMessage) message).                         
                    getText();
            System.out.println(messageText);
            //以下做清除工作,代码略
            // ... ...
           
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

编译后运行接收信息服务,能看到接收到并打印之前发送的消息,再看看控制台,发现queue1的消息队列变为0,消息已被读取,消息发送和接收到此结束。

JMS 的发布/订阅模型定义了怎么向一个内容节点发布和订阅消息,内容节点也叫主题(topic),主题是为发布者(publisher)和订阅者(subscribe) 提供传输的中介。发布/订阅模型使发布者和订阅者之间不必直接通讯(如RMI)就可确保消息的传送,有效解决系统间耦合问题(当然有这个需要才行),更有就是提供了一对一、一对多的通讯方式,比较灵活。

先介绍JMS里2个概念,持久订阅模式和非持久订阅模式,其实也是发布/订阅模型在可靠性上提供的2种方式:

非持久订阅模式:只有当客户端处于激活状态,也就是和JMS 服务器保持连接的状态下,才能接收到发送到某个Topic的消息,而当客户端处于离线状态时,则这个时间段发到Topic的消息将会永远接收不到。

持久订阅模式:客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS 服务器会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS 服务器时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息,即消息永远能接收到。

下面我们就接着来看openJms在发布/订阅模式上的表现,由于篇幅关系,在这里只讲述非持久订阅模式,持久订阅模式能根据JMS的标准来试。

消息发布的代码如下:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
 * @author Liang.xf 2004-12-27
 * openJms 发布消息演示
 *
www.javayou.com
 */
public class TopicPublish {
    public static void main(String[] args) {
        try {
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(
                Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            //openJms默认的端口是1099
            properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
            Context context = new InitialContext(properties);
            //获得JMS Topic连接队列工厂
            TopicConnectionFactory factory =
                (TopicConnectionFactory) context.lookup(
                    "JmsTopicConnectionFactory");

            //创建一个Topic连接,并启动
            TopicConnection topicConnection = factory.createTopicConnection();
            topicConnection.start();

            //创建一个Topic会话,并设置自动应答
            TopicSession topicSession =
                topicConnection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);

            //lookup 得到 topic1
            Topic topic = (Topic) context.lookup("topic1");
            //用Topic会话生成Topic发布器
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);

            //发布消息到Topic
            System.out.println("消息发布到Topic");
            TextMessage message = topicSession.createTextMessage
                ("你好,欢迎定购Topic类消息");
            topicPublisher.publish(message);

            //资源清除,代码略  ... ...    
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

而订阅消息的接收有同步的和异步2种,他们分别使用receive()和onMessage(Message message)方法来接收消息,具体代码:

同步接收:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
 * @author Liang.xf 2004-12-27
 * openJms 非持久订阅同步接收演示
 *
www.javayou.com
 */
public class TopicSubscribeSynchronous {

    public static void main(String[] args) {
        try {
            System.out.println("定购消息接收启动:");
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
            Context context = new InitialContext(properties);

            //获得Topic工厂和Connection
            TopicConnectionFactory factory =
                (TopicConnectionFactory) context.lookup(
                    "JmsTopicConnectionFactory");
            TopicConnection topicConnection = factory.createTopicConnection();
            topicConnection.start();

            //创建Topic的会话,用于接收信息
            TopicSession topicSession =
                topicConnection.createTopicSession(
                    false,
                    Session.AUTO_ACKNOWLEDGE);

            //lookup topic1
            Topic topic = (Topic) context.lookup("topic1");
                    //创建Topic subscriber
            TopicSubscriber topicSubscriber =
                topicSession.createSubscriber(topic);
            //收满10条订阅消息则退出
            for (int i=0; i<10; i++) {
                //同步消息接收,使用receive方法,堵塞等待,直到接收消息
                TextMessage message = (TextMessage) topicSubscriber.receive();
                System.out.println("接收订阅消息["+i+"]: " + message.getText());
            }
            //资源清除,代码略  ... ...
            System.out.println("订阅接收结束.");
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

非同步接收:

package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
/**
 * @author Liang.xf 2004-12-27
 * openJms 非持久订阅异步接收演示
 *
www.javayou.com
 */
public class TopicSubscribeAsynchronous implements MessageListener {
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private Topic topic;
    private TopicSubscriber topicSubscriber;

    TopicSubscribeAsynchronous() {
        try {
            //取得JNDI上下文和连接
            Hashtable properties = new Hashtable();
            properties.put(
                Context.INITIAL_CONTEXT_FACTORY,
                "org.exolab.jms.jndi.InitialContextFactory");
            properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
            Context context = new InitialContext(properties);

            //取得Topic的连接工厂和连接
            TopicConnectionFactory topicConnectionFactory =
                (TopicConnectionFactory) context.lookup(
                    "JmsTopicConnectionFactory");
            topicConnection = topicConnectionFactory.createTopicConnection();

            //创建Topic的会话,用于接收信息
            topicSession =
                topicConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            topic = (Topic) context.lookup("topic1");

            //创建Topic subscriber
            topicSubscriber = topicSession.createSubscriber(topic);
            //设置订阅监听
            topicSubscriber.setMessageListener(this);

            //启动信息接收
            topicConnection.start();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        System.out.println("非同步定购消息的接收:");
        try {
            TopicSubscribeAsynchronous listener =
                new TopicSubscribeAsynchronous();
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //收到订阅信息后自动调用此方法
    public void onMessage(Message message) {
        try {
            String messageText = null;
            if (message instanceof TextMessage)
                messageText = ((TextMessage) message).getText();
            System.out.println(messageText);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

编译好后,启动openJms服务,打开admin管理台,为了运行方便,这里先列出三个类的运行命令:
java -cp .\; -Djava.ext.dirs=.\lib; javayou.demo.openjms.TopicPublish
java -cp .\; -Djava.ext.dirs=.\lib; javayou.demo.openjms.TopicSubscribeSynchronous
java -cp .\; -Djava.ext.dirs=.\lib; javayou.demo.openjms.TopicSubscribeAsynchronous

先运行2个接收命令,再运行发布命令,能看到控制台的Topic有消息接收,并且接收1和2都有消息接收的提示,到此完成演示,由于是非持久订阅,所以能看到控制台上的Topic消息条数不会减少。

最后,说说openJms的缺点,他不支持XA transactions、集群和热备等高级功能,如果你需要这些特性,最佳还是使用商业的JMS服务器,但不论怎样,openJms为我们提供了一个学习JMS的最佳路径,有兴趣了解JMS的还是来尝试尝试吧。

2
0
分享到:
评论

相关推荐

    开源JMS服务器-openJms

    开源JMS(Java Message Service)服务器openJMS是企业级消息传递系统的一种实现,它提供了标准JMS接口,用于应用程序之间的异步通信。作为一个开源项目,openJMS提供了可扩展且可靠的平台,允许开发者在分布式环境中...

    openjms-0.7.7-beta-1

    "openjms-0.7.7-beta-1" 是一个软件版本标识,其中 "openjms" 是一个开源的Java消息服务(Java Message Service,简称JMS)实现,版本号为0.7.7,标记为beta-1,表示这是一个测试阶段的版本,可能含有未修复的bug或...

    openjms服务器

    OpenJMS服务器是一款开源的消息中间件,主要用于Java消息服务(JMS)的实现。它提供了一个可靠的、可扩展的平台,使得应用程序能够通过消息传递进行通信,从而实现解耦和异步处理。JMS是Java平台上的一种标准接口,...

    openjms-0.7.7-beta-1-src

    通过对OpenJMS 0.7.7-Beta-1源码的阅读和分析,开发者不仅可以掌握JMS规范,还能深入了解消息中间件的内部工作原理,提升在分布式系统和企业级应用开发中的能力。同时,源码中的设计模式和编程实践也是软件工程师...

    JMS OPENJMS的实现例子

    OPENJMS是JMS的一个开源实现,提供了一个轻量级的消息中间件,使得开发者可以方便地在Java应用程序之间实现消息交换。 JMS的核心概念包括生产者(Producer)、消费者(Consumer)、队列(Queue)和主题(Topic)。...

    JMS初级认识,用的是Openjms中间件

    作为一个轻量级的消息传递服务器,OpenJMS适用于小型到大型的企业级应用,具有良好的可扩展性和稳定性。 **JMS核心概念:** 1. **消息(Message)**:JMS的核心元素,是数据的载体,可以包含文本、对象或二进制数据...

    应用openJMS实现JMS消息发布于订阅

    在提供的压缩包文件`openjms-0.7.7-beta-1`中,包含了OpenJMS的库和配置文件。根据官方文档配置OpenJMS服务器,通常需要修改`conf/openjms.xml`配置文件,设置服务器端口、连接工厂等参数。 2. **创建主题(Topic)...

    开源的JMS服务器和源码

    OpenJMS是一个开源的Java Message Service API 1.0.2 规范的实现,它包含有以下特性: *. 它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。 *. 支持同步与异步消息发送 *. JDBC持久性...

    OpenJMS简介

    OpenJMS是一个开源的消息传递系统,它实现了Java消息服务(JMS)规范,为企业级应用程序提供可靠的异步通信。在本文中,我们将深入探讨OpenJMS的特性、工作原理以及如何在实际应用中利用它。 首先,让我们了解什么...

    Openjms.rar_OpenJMS

    OpenJMS是一个开源的消息中间件(Message Oriented Middleware, MOM),它实现了Java消息服务(Java Message Service, JMS)规范,允许应用程序通过异步发送和接收消息进行通信。在Java开发环境中,OpenJMS提供了...

    openJMS Jar

    总结,openJMS作为开源的JMS实现,提供了丰富的功能和高度的灵活性,使得开发者可以轻松地在Java应用中实现消息传递。理解并掌握openJMS的配置、使用和优化,对于提升Java应用的可扩展性和稳定性具有重要意义。通过...

    JMS入门

    【JMS入门】这篇文章主要介绍了Java消息服务(Java Message Service,简称JMS)的基本概念和如何使用开源的JMS服务器OpenJMS进行实践操作。JMS是一种标准接口,用于应用程序之间的异步通信,特别是在分布式环境中,...

    jms.rar_jar j_java jms_jms_jms jar_jms.j

    `openjms-0.7.7-beta-1`则是OpenJMS的一个早期版本,OpenJMS是一个开源的JMS实现,提供了消息中间件的功能,用于处理和传递消息。 现在,我们详细讨论一下JMS的核心概念和应用场景: 1. **消息队列**:JMS通过消息...

    JMS 简单使用指南

    - **OpenJMS**:这是一个开源的JMS实现,遵循JMS 1.0.2规范,适合用于学习和研究JMS。 - **iLinkMQ**:由中国人开发的纯Java实现,完全支持JMS接口规范1.0.2,提供事务和可靠消息传输等功能,适用于企业级应用。 ...

    openjms-开源

    了解了OpenJMS的基本特性和应用场景后,我们来看一下压缩包文件`openjms-0.7.7-beta-1`。这个版本可能包含了OpenJMS的源代码、构建脚本、文档和示例,供开发者学习、调试和定制。通常,这些内容可以帮助开发者更好地...

    解析Tomcat下应用JMS开发技巧

    最后,需要将JMS服务器与Web服务器集成,首先需要配置应用的web.xml文件,添加context-param和listener元素,以便 aktivemq.xml文件的配置。 在web.xml文件中,需要添加以下代码: &lt;context-param&gt; &lt;param-name&gt;...

    OPENJMS操作消息传输示例

    OPENJMS是其中一个开源的JMS实现,它提供了轻量级的消息中间件,支持多种协议和传输方式。本示例将深入探讨如何使用OPENJMS进行消息传输,并通过`JmsTest`这个实例文件来展示具体操作。 首先,我们需要理解JMS的...

    J2EE学习中一些值得研究的开源项目

    在Java领域,特别是针对企业级应用开发的J2EE(Java 2 Platform, Enterprise Edition)框架中,有许多优秀的开源项目可以帮助开发者更好地理解和掌握这一技术栈的核心概念与实践技巧。本文将对一些值得关注的开源...

    JMS开发例子

    为了进行消息的发送和接收,你需要将OpenJMS的库文件(如openjms-0.7.6.1.jar, jms-1.0.2a.jar, exolabcore-0.3.7.jar, 和commons-logging-1.0.3.jar)添加到你的项目类路径中。以下是一个简单的消息发送服务的Java...

Global site tag (gtag.js) - Google Analytics