`
yushl
  • 浏览: 11582 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

HornetQ初体验

    博客分类:
  • mq
 
阅读更多
技术介绍

下面来自百度百科

HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMSHornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。[1]

HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!

HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的依赖第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。

HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。

HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。

HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中。

 

 

应用场景

首先HornetQ是一种消息服务中间件,高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 本文档主要涉及到HornetQJMS功能的使用,HornetQJMS只是对HornetQ 的一种封装,适配了javaJMS协议。

 

如何集成到项目

HornetQ目前大致有三种方式:standalone embedded Integrated  with JBoss as

我个人倾向于 standalone方式,因为:

1)  可以有更多的资源供HornetQ单独使用

2)  管理的话只需要关注HornetQ这一个产品的问题就行,而无需引入其他的复杂度。

3)  原项目中也是把消息中间件作为一个单独的模块部署,对原来的流程可以做到无缝承接。

目前我只是关注了HornetQ standalone这一模式,其他的暂且没有 深入。

 

使用HornetQ服务端很简单,直接运行% HornetQ _HOME%/bin下的bat/sh就可以启动(优化问题暂时没有考虑)

 

客户端推荐用HornetQclientSpring做集成,spring的配置文件内容大致如下所示:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

 

    <bean id="listener" class="org.hornetq.jms.example.ExampleListener"/>

   

 

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

        <property name="connectionFactory" ref="ConnectionFactory"/>

        <property name="destination" ref="/queue/exampleQueue"/>

        <property name="messageListener" ref="listener"/>

    </bean>

   

   

    <bean id="queueTarget" class="org.springframework.jndi.JndiObjectTargetSource">

   

        <property name="jndiName">

   

            <value>queue/testQueue</value>

   

        </property>

   

        <property name="jndiTemplate">

   

            <ref local="jndiTemplate"/>

   

        </property>

   

    </bean>

   

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">

   

        <property name="environment">

   

            <props>

   

                <prop key="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory</prop>

   

                <prop key="java.naming.provider.url">jnp://localhost:1099</prop>

   

                <prop key="java.naming.factory.url.pkgs">org.jboss.naming:org.jnp.interfaces</prop>

   

            </props>

   

        </property>

 

</bean>

</beans>

 

因为HornetQclient主要是以JNDI和服务端进行连接,所以以上我们都是通过Spring提供的JMS模板类和JNDI模板类来对HornetQclient进行配置与管理。

 

使用步骤

   具体示例主要是以本地main方法为主,用spring来管理的话也很简单.

首先加入HornetQ客户端必须使用到的HornetQ 工程的jar

 



 

除了jboss-client.jar ,其他的都可以在HornetQ的下载包里找到,jboss-client.jar需要单独的下载JBoss AS,我下载的是JBoss AS7jboss-client.jar的目录为% JBoss AS7_HOME%/bin/client

 JMS Queue

1)       Queue Provider

 

publicstaticvoid main(String[] args) throws Exception{

      

           //初始化JNDI

           Properties properties = new Properties(); 

            properties.put("java.naming.factory.initial", 

                    "org.jnp.interfaces.NamingContextFactory"); 

            properties.put("java.naming.factory.url.pkgs", 

                    "org.jboss.naming:org.jnp.interfaces"); 

            properties.put("java.naming.provider.url", "jnp://localhost:1099"); 

            InitialContext ic = new InitialContext(properties);

           

            //建立ConnectionFactory

            ConnectionFactory cf = (ConnectionFactory) ic 

                    .lookup("/ConnectionFactory");

           

            //建立到Queue连接

            Queue orderQueue = (Queue) ic.lookup("queue/ExpiryQueue");

           

            //通过Queue建立Connection

            Connection connection = cf.createConnection(); 

           

            //通过Connection建立session

            Session session = connection.createSession(false, 

                    Session.AUTO_ACKNOWLEDGE);

           

            //建立JMS生产者

            MessageProducer producer = session.createProducer(orderQueue); 

            //这一步必须,启动connection

            connection.start(); 

           

            TextMessage message =  session.createTextMessage("First hornetq");

            producer.send(message);

            System.out.println("send success");

 

 

2)  Queue Consumer

 

        //初始化JNDI

       Properties properties = new Properties(); 

        properties.put("java.naming.factory.initial", 

                "org.jnp.interfaces.NamingContextFactory"); 

        properties.put("java.naming.factory.url.pkgs", 

                "org.jboss.naming:org.jnp.interfaces"); 

        properties.put("java.naming.provider.url", "jnp://localhost:1099"); 

        InitialContext ic = new InitialContext(properties); 

            ConnectionFactory cf = (ConnectionFactory) ic 

                    .lookup("/ConnectionFactory"); 

            Queue orderQueue = (Queue) ic.lookup("queue/ExpiryQueue"); 

            Connection connection = cf.createConnection(); 

            Session session = connection.createSession(false, 

                    Session.AUTO_ACKNOWLEDGE); 

            MessageConsumer consumer = session.createConsumer(orderQueue); 

            connection.start(); 

           

上面建立连接的部分的注释参考Queue Provider

 

/*          Message message =  consumer.receive();*/

           

            consumer.setMessageListener(new MessageListener() {

             

              @Override

              publicvoid onMessage(Message message) {

 

                  if(message instanceof TextMessage) {

                   TextMessage textMessage =  (TextMessage)message;

                   String text;

                     try {

                         text = textMessage.getText();

                      System.out.println("Get Text message" + text);

                     } catch (JMSException e) {

                         e.printStackTrace();

                     }

 

                  

                   }else {

                   System.out.println("Get message" + message);

                  }

                 

             

                 

              }

           });

           

            Thread.sleep(30000);

 

以上可以看到Consumer有二种方式

一种是调用receive,这样会阻塞,直到有消息为止,第二种是注册一个回调函数,实现MessageListener接口,这一种是异步的。

 

 

1)Topic Provider

 

    publicstaticvoid main(String[] args) throws Exception{

       Properties properties = new Properties(); 

        properties.put("java.naming.factory.initial", 

                "org.jnp.interfaces.NamingContextFactory"); 

        properties.put("java.naming.factory.url.pkgs", 

                "org.jboss.naming:org.jnp.interfaces"); 

        properties.put("java.naming.provider.url", "jnp://localhost:1099"); 

        InitialContext initialContext = new InitialContext(properties);

        // Step 2. perform a lookup on the topic

//        Topic topic = (Topic)initialContext.lookup("/topic/exampleTopic");

 

        // Step 3. perform a lookup on the Connection Factory

        ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");

 

        // Step 4. Create a JMS Connection

        Connection connection = cf.createConnection();

       

        // Step 11. Start the Connection

        connection.start();

 

        // Step 5. Create a JMS Session

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

       

//        Topic topic =  session.l("/topic/exampleTopic");

        Topic topic =  (Topic)initialContext.lookup("/topic/exampleTopic2");

 

        // Step 6. Create a Message Producer

        MessageProducer producer = session.createProducer(topic);

 

        // Step 9. Create a Text Message

        TextMessage message = session.createTextMessage("This is a text message");

 

        System.out.println("Sent message: " + message.getText());

 

        // Step 10. Send the Message

        producer.send(message);

       

        System.out.println("Topic send success");

 

 

     }

 

TopicProviderQueueProvider基本类似,只是一个是获得Queue,另外一个是获得Topic

 

2)Topic Consumer

 

    publicstaticvoid main(String[] args) throws Exception{

       Properties properties = new Properties(); 

        properties.put("java.naming.factory.initial", 

                "org.jnp.interfaces.NamingContextFactory"); 

        properties.put("java.naming.factory.url.pkgs", 

                "org.jboss.naming:org.jnp.interfaces"); 

        properties.put("java.naming.provider.url", "jnp://localhost:1099"); 

        InitialContext initialContext = new InitialContext(properties);

        // Step 2. perform a lookup on the topic

        Topic topic = (Topic)initialContext.lookup("/topic/exampleTopic2");

 

        // Step 3. perform a lookup on the Connection Factory

        ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");

 

        // Step 4. Create a JMS Connection

        Connection connection = cf.createConnection();

 

        // Step 5. Create a JMS Session

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

       

        // Step 7. Create a JMS Message Consumer

        MessageConsumer messageConsumer1 = session.createConsumer(topic);

 

        // Step 8. Create a JMS Message Consumer

        MessageConsumer messageConsumer2 = session.createConsumer(topic);

       

     // Step 11. Start the Connection

        connection.start();

       

        // Step 12. Receive the message

        TextMessage messageReceived = (TextMessage)messageConsumer1.receive();

 

        System.out.println("Consumer 1 Received message: " + messageReceived.getText());

 

        // Step 13. Receive the message

        messageReceived = (TextMessage)messageConsumer2.receive();

 

        System.out.println("Consumer 2 Received message: " + messageReceived.getText());

}

Topic 模式的测试必须是consumer先启动,然后provider再启动,consumer才能获得消息。这是由于Topic的特性而决定的。

 

后记:到目前为止,我还没有找到方法像ActiveMQ那样动态的创建Queue或者Topic的,ActiveMQ中,如果向服务端发送请求,如果服务端没有这个Queue或者Topic,那么服务端会自动创建一个,但是HornetQ中没有这个功能,必须在配置文件中配置想要的Queue或者Topic HornetQ服务端会热加载配置文件。

如果直接启动的话,HornetQ默认加载%HornetQ_HOME%\config\stand-alone\non-clustered\ hornetq-jms.xml

具体配置如下:

  <queue name="DLQ">

      <entry name="/queue/DLQ"/>

   </queue>

  

   <queue name="ExpiryQueue">

      <entry name="/queue/ExpiryQueue"/>

   </queue>

  

   <topic name="exampleTopic">

      <entry name="/topic/exampleTopic"/>

   </topic>

  

   <topic name="exampleTopic2">

      <entry name="/topic/exampleTopic2"/>

   </topic>

 

HornetQ本身并没有提供想ActiveMQ那样的网页管理界面,它必须和JBoss asapplication server集成,或者自己写程序调用它提供的接口,但这样无形提高了工作量。

有个简单变通的方法,因为HornetQ里面集成了JMX,所以可以通过jdk的工具jconsole来查看里面的一些信息,和操作里面的一些属性,达到管理的目的



 

 

 

 

 

  • 大小: 18.2 KB
  • 大小: 29.5 KB
  • 大小: 47.3 KB
  • 大小: 51.3 KB
分享到:
评论

相关推荐

    HornetQ官方学习资料

    ### HornetQ官方学习资料知识点概述 #### 一、HornetQ简介 HornetQ是一款由JBoss开发并维护的消息中间件,它具备高度的可扩展性和灵活性,能够支持集群部署以及多种消息传递协议。HornetQ不仅完全支持JMS(Java ...

    HornetQ2.3 API 文档

    HornetQ 2.3.0 Alpha 发布,这不是一个简单的 Alpha 版本,同时也是一个大的发行版本。该版本对 2.2.0 进行了重构,引入一些原子故障迁移特性和大量企业特性改进。详细的新特性介绍请看发行说明。 HornetQ是一个...

    HornetQ 2.1 中文文档

    HornetQ是JBoss社区所研发的开放源代码消息中间件;HornetQ是以Java 5 编写,因此只需要操作系统支持Java虚拟机,HornetQ便可运行。 支持Java消息服务 (JMS) 1.1 版本 集群 (Clustering) 支持庞大的消息(Message)...

    Hornetq2.1中文手册

    HornetQ 2.1 用户手册是一份详尽的指南,主要针对那些对消息中间件感兴趣的开发者,尤其是对英文文档阅读有困难的用户。HornetQ 是 JBoss 社区开发的一个开源消息传递系统,它支持 Message Queuing (MQ) 协议,能够...

    HornetQ2.1中文手册

    ### HornetQ2.1中文手册关键知识点解析 #### 一、消息的相关概念 HornetQ2.1中文手册中详细介绍了消息处理系统的基本概念和技术细节,这对于理解和使用HornetQ至关重要。 ##### 4.1 消息相关的概念 - **消息**:在...

    hornetq 实例

    HornetQ是一款开源的消息中间件,它在Java消息服务(JMS)规范下提供高效、可伸缩、高可用性的消息传递功能。HornetQ的设计目标是为各种规模的应用提供灵活、高性能且易于使用的消息传递解决方案。下面将详细探讨...

    hermes 监听hornetq JMS配置步奏

    在IT行业中,消息传递系统是分布式应用程序之间进行通信的关键组件,而HornetQ和Hermes都是此类系统的重要组成部分。HornetQ是一个高性能、轻量级且完全开源的消息中间件,它提供了JMS(Java消息服务)接口,允许...

    hornetq-2.3.0.Final-bin.zip

    HornetQ是JBoss公司开发的一个开源的消息中间件(Message Broker),它提供高效、可扩展、高可用性的消息传递服务。在hornetq-2.3.0.Final-bin.zip这个压缩包中,包含了HornetQ 2.3.0 Final版本的所有组件和必要的...

    hornetq 2.4.0免安装

    HornetQ 2.4.0 是一款轻量级且高效的开源消息中间件(Message Queuing,简称MQ),它提供了可靠的消息传递服务,适用于分布式系统中的数据通信。这款MQ解决方案设计目标是高吞吐量、低延迟以及可扩展性,使得在大...

    ActiveMQ和HornetQ性能对比

    ### ActiveMQ与HornetQ性能对比分析 #### 概述 本文旨在通过一系列测试数据对比分析ActiveMQ与HornetQ在不同消息大小及数量下的性能表现。测试环境为相同的硬件配置,确保了测试结果的公正性。通过对比两者的发送...

    .net 连接HornetQ,需要的dll

    .NET 连接HornetQ是一项关键的技术任务,HornetQ是一款开源的消息中间件,它提供了高效、可扩展和高可用性的消息传递服务。在.NET环境中与HornetQ进行交互,通常需要借助特定的客户端库,如Apache.NMS.Stomp。下面将...

    HornetQ2.1中文手册.7z

    HornetQ是JBoss公司开发的一个开源消息中间件,它在Java消息服务(JMS)规范的基础上提供了高效、可扩展且高度可靠的异步通信功能。这个“HornetQ 2.1中文手册”是一个压缩包文件,包含了对HornetQ 2.1版本的详细...

    hornetq-2.2.5.Final.zip

    hornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.zip

    hornetq-2.4.0.Final-bin.tar

    hornetq安装包, hornetq-2.4.0.Final-bin.tar 消息中间件 供项目中数据交互使用

    HornetQ集群配置

    HornetQ是一款高性能、可伸缩且开源的消息中间件,它被广泛用于构建分布式系统中的消息传递。在HornetQ中,集群配置是一种重要的特性,它允许多个HornetQ服务器形成一个集群,共享资源,提高可用性和可扩展性。本篇...

    hornetq-transports-2.0.0.GA.jar

    hornetq-transports-2.0.0.GA.jar

    hornetq-journal-2.3.19.Final.zip

    【标题】"hornetq-journal-2.3.19.Final.zip" 提供的是HornetQ消息中间件的一个组件——Journal模块的特定版本。HornetQ是JBoss社区开发的一个高性能、可扩展且功能丰富的开源消息传递系统,它被广泛用于企业级应用...

    HornetQ Messaging Developer's Guide.pdf

    HornetQ是java开源实现的消息系统框架,性能上比ActiveQ要好一些,被集成到JBoss的消息服务中。 Table of Contents Preface 1 Chapter 1: Getting Started with HornetQ 9 Chapter 2: Setting Up HornetQ 31 ...

    HornetQ 2_1用户手册

    什么是HornetQ? * HornetQ 是一个开源的软件项目。它的目标是一个多协议、可嵌入、高性能、可集群的异步消息系统。 * HornetQ 是一个消息中间件(MoM)。有关MoM和其它消息相关的概念解释请参见 Chapter 4, ...

    hornetq-2.4.0

    hornetQ 2.4.0

Global site tag (gtag.js) - Google Analytics