`
lws0402
  • 浏览: 108557 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

hornetq 集成 spring

    博客分类:
  • MQ
 
阅读更多
一、简介   
    HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。
• HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!
• HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的以来第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。
• HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
• HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
• HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中

二、软件下载   
    hornet 2.4.0.Final
    下载地址:http://downloads.jboss.org/hornetq/hornetq-2.4.0.Final-bin.zip

三、目录结构
解压后,目录结构如下图:


bin:启动目录包括run.bat、run.sh、stop.bat和stop.sh四个文件,分别为windows和linux下的启动 和 停止 命令。
config:配置文件,直接启动时,使用的是config\stand-alone\non-clustered中的配置文件
examples:示例,很多例子可以用来学习
lib:相关的jar包


四、集成spring(不考虑集群)
   我使用的spring为3.2.9
    1、使用maven构建,在pom.xml文件中增加如下代码:

    <dependency>
      <groupId>io.netty</groupId> 
      <artifactId>netty-all</artifactId> 
      <version>4.0.36.Final</version>  
    </dependency>

    <dependency>
      <groupId>org.hornetq</groupId> 
      <artifactId>hornetq-commons</artifactId> 
      <version>2.4.7.Final</version>  
    </dependency> 

    <dependency>
      <groupId>org.hornetq</groupId>
      <artifactId>hornetq-core-client</artifactId> 
      <version>2.4.7.Final</version>  
    </dependency>
      
    <dependency>
      <groupId>org.hornetq</groupId>
      <artifactId>hornetq-jms-client</artifactId> 
      <version>2.4.1.Final</version>  
    </dependency>
      
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>3.2.9.RELEASE</version>
    </dependency>


    2、spring配置文件
<!--配置jms模板,程序里使用 jmsTemplate来发送消息 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="pubSubDomain" value="true" />      
    </bean>
    
<!—创建队列, testQueue为队列名称,在config\stand-alone\non-clustered\hornetq-jms.xml的文件中配置 -->
    <bean id="messageQueue" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createQueue">
        <constructor-arg value="testQueue" />
    </bean>

    <!—hornetq连接配置 host为队列所在服务器的ip,post为连接端口-->
    <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">  
        <constructor-arg  
            value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />  
        <constructor-arg>  
            <map key-type="java.lang.String" value-type="java.lang.Object">  
                <entry key="host" value="172.18.8.35"></entry>  
                <entry key="port" value="5445"></entry>  
            </map>  
        </constructor-arg>  
    </bean>
    
<!—建立连接工厂 -->
    <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"  
        factory-method="createConnectionFactoryWithoutHA">  
        <constructor-arg type="org.hornetq.api.jms.JMSFactoryType"  
            value="CF" />  
        <constructor-arg ref="transportConfiguration" />  
    </bean>
    
    <!—设置监听,用来接收消息。messageListener为消息监听,采用注解方式声明 -->
    <bean id="jmsContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <property name="destination" ref="messageQueue" />
        <property name="messageListener" ref="messageTestListener" />  
    </bean>


    3、修改hornetq配置
   修改安装目录下config/stand-alone/non-clustered/hornetq-configuration.xml的配置文件:

<connectors>
      <connector name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFa  ctory</factory-class>
         <param key="host"  value="172.18.8.35"/>
         <param key="port"  value="5445"/>
      </connector>
  </connectors>
  <acceptors>
    <acceptor name="netty">      <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
      <param key="host"  value="172.18.8.35"/>
      <param key="port"  value="5445"/>
    </acceptor>
</acceptors>


代码中host改为本机可以访问到的ip,如果设置成127.0.0.1,其它机器将不能访问该服务器的消息队列
    修改hornetq-jms.xml配置,在该配置文件中增加名称为testQueue的队列,增加如下代码,其它配置可以使用默认配置

<queue name="testQueue">
      <entry name="/queue/testQueue"/>
   </queue>


    4、启动hornetq
      windows下直接运行run.bat,linux下运行run.sh

   5、发送消息

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class TestMQController {

	@Resource(name = "jmsTemplate")
	private JmsTemplate jmsTemplate;

	@Resource(name = "messageQueue")
    private Queue queue;

	@RequestMapping(value = "test")
	public String test() {
		jmsTemplate.send(queue, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage om = session.createTextMessage("测试消息");
                return om;
			}
		});
		return "";
	}
}



    6、接收消息
        先写监听器,监听器需要配置在 spring配置文件的 jmsContainer中

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class MessageTestListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		TextMessage textMessage = (TextMessage)message;
		try {
			System.out.println("MessageTestListener = " + textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}



五、异常处理
    收到消息进行处理时,可能会出现异常,比如说数据存取失败等,需要对异常消息进行处理。
    在这里我们实现spring-jms包中的SessionAwareMessageListener接口来进行消息监听,操作session的commit和rollback方法来确认和回滚消息,代码如下:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component
public class MessageTestListener implements SessionAwareMessageListener<Message>{

	public void onMessage(Message message, Session session) throws JMSException {
		TextMessage textMessage = (TextMessage)message;
		try {
			System.out.println("MessageTestListener = " + textMessage.getText());
                     //提交消息,确认消息处理成功
                     session.commit();
		} catch (JMSException e) {
                     //消息回滚
                     session.rollback();
			e.printStackTrace();
		}
	}

}


回滚后后的消息会进行几次重试(好像是5次),5次之后如果还是没有消费成功,就会进入死消息的队列,hornetq默认的死消息队列在hornetq-jms.xml配置文件中有默认配置,如下
<queue name="DLQ">
      <entry name="/queue/DLQ"/>
   </queue>

之后可以在该队列中取出消息,进行分析处理。
使用java代码连接hornetq时,会创建session,创建时可以指定消息确认机制,非事务时为AUTO_ACKNOWLEDGE和CLIENT_ACKNOWLEDGE,前者为自动通知mq,后者为需要手动调用message.acknowledge()进行通知,事务时为SESSION_TRANSACTED,但是spring封装了该操作,默认是AUTO_ACKNOWLEDGE,我们需要改成事务的,所以在spring配置监听的时候加上一个属性,如下:

<bean id="jmsContainer"     
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">     
        <property name="connectionFactory" ref="connectionFactory" />     
        <property name="destination" ref="messageQueue" />   
        <property name="messageListener" ref="messageTestListener" />  
        <property name="sessionTransacted" value="true" />
    </bean>

加入sessionTransacted为true,就可以设置事务的消息,从而解决处理时异常。

六、安全认证
    上述虽然可以进行消息的发送和处理,但是使用的都是默认用户,存在一定的安全隐患,hornetq支持用户的定义和权限的分配,具体的安全配置可以参考hornetq的用户手册(http://hornetq.sourceforge.net/docs/hornetq-2.1.2.Final/user-manual/zh/html/index.html),这里只讲spring如何进行hornetq的安全认证。我们都知道jms的connectionfactory提供两个创建连接的方法,createConnection()和createConnection(username,password),前者是使用默认用户访问,后者是采用了安全策略,根据前面的配置可以发现,spring接收消息的时候使用的是spring-jms包的监听类DefaultMessageListenerContainer,翻看源码发现DefaultMessageListenerContainer类中的connectionFactory是调用createConnection()来创建连接的,不能加入用户名和密码,这样连接时就会报错。解决这个问题需要借助于spring-jms包中的另一个类UserCredentialsConnectionFactoryAdapter,看类名就知道是connectionFactory的用户认证代理类,在这里面可以设置用户名和密码。UserCredentialsConnectionFactoryAdapter继承了javax.jms.ConnectionFactory,所以我们可以使用这个代理类来包装原先的connectionFactory,主要代码如下:
<!-- 建立连接工厂 (原先的工厂类)-->  
    <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"    
        factory-method="createConnectionFactoryWithoutHA">    
        <constructor-arg type="org.hornetq.api.jms.JMSFactoryType"    
            value="CF" />    
        <constructor-arg ref="transportConfiguration" />    
    </bean> 

<!-- 创建用户认证工厂类 -->
<bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">    
    <!-- 这里的目标工厂,就是上面定义的工厂 -->
        <property name="targetConnectionFactory" ref="connectionFactory"/>
        <property name="username" value="mytest"/>
        <property name="password" value="mytest"/>    
    </bean>

<!-- 创建jms模板时,工厂类改为 myConnectionFactory -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
      <property name="connectionFactory" ref="myConnectionFactory" />  
      <property name="pubSubDomain" value="true" />        
    </bean> 

<!-- 创建监听时,也该用新的工厂类 -->  
    <bean id="jmsContainer"    
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
        <property name="connectionFactory" ref="myConnectionFactory" />    
        <property name="destination" ref="messageQueue" />  
        <property name="messageListener" ref="messageTestListener" />    
    </bean>

改为这样配置后,就可以通过用户认证了。

七、其他说明
    1、hornetq支持消息持久化,可以将消息持久化到硬盘,防止hornetq服务器down机后的消息丢失

    2、如果消息已发送,但此时接收端出错,重启接收端服务,如果消息未被消费,依然可以继续处理消息

  • 大小: 72.5 KB
分享到:
评论

相关推荐

    HornetQ2.0.0GA

    6. 容易集成:HornetQ 可以轻松集成到基于Java的任何应用程序中,包括但不限于Spring框架,以及JBoss应用服务器,因为HornetQ 原生是JBoss AS的一部分。 7. 集成开发环境:HornetQ 提供了强大的管理控制台和API,...

    springboot学习思维笔记.xmind

    spring-boot-starter-hornetq spring-boot-starter-integration spring-boot-starter-jdbc spring-boot-starter-jerscy spring-boot-starter-jta-atomikos spring-boot-starter-jta-...

    JMS相关的Source

    总之,这个"JMS相关的Source"可能包含了一个简单的Spring配置文件,展示了如何在JBoss环境中集成JMS服务,并使用Spring的工具进行消息的发送和接收。开发者可以参考这些代码来学习如何在自己的项目中实现异步通信和...

    springboot参考指南

    Spring集成 xiii. 34. 基于JMX的监控和管理 xiv. 35. 测试 Spring Boot参考指南 4 i. 35.1. 测试作用域依赖 ii. 35.2. 测试Spring应用 iii. 35.3. 测试Spring Boot应用 i. 35.3.1. 使用Spock测试Spring Boot应用 iv...

    Jboss事物处理

    本项目“HornetQ-JBoss-TS-MySQL-Tomcat-Spring-master”就是一个关于Jboss事物处理的实践案例,涵盖了多个关键组件。 首先,**HornetQ**是JBoss的一个开源消息传递系统,它提供了高性能、高可用性和可扩展性的消息...

    基于Jboss的jms编程

    1. **Spring集成JMS**: 在使用JMS时,Spring框架提供了强大的支持。为了在Jboss上进行JMS编程,你需要将一系列Spring相关的JAR文件放入服务的lib目录中,包括`spring-aop.jar`, `spring-beans.jar`, `spring-...

    SpringBoot启动器Starters使用及原理解析

    Starters可以理解为启动器,它包含了一系列可以集成到应用里面的依赖包,你可以一站式集成Spring及其他技术,而不需要到处找示例代码和依赖包。例如,如果你想使用Spring JPA访问数据库,只要加入spring-boot-...

    JBoss_Enterprise_Application_Platform-7.0

    6. **安全管理**:JBoss EAP 7.0提供了一套强大的安全机制,包括JAAS认证、Spring Security集成、OAuth 2.0和OpenID Connect支持,确保了企业级应用的安全性。 7. **数据存储**:EAP 7.0集成了Infinispan缓存,用于...

    liferay教程

    此外,Liferay还支持SSO(Single Sign-On)和LDAP集成,以便于企业进行身份验证和权限管理。 在实际应用中,Liferay的自定义开发是必不可少的。开发者可以通过编写Java Portlets、JSR-286 Portlets或者Freemarker...

    jboss资料大全,内容丰富,搜之不易

    它不仅支持Java EE的各种服务,如EJB(Enterprise JavaBeans)、JMS(Java Message Service)、JTA(Java Transaction API)等,还提供了对Spring、Hibernate等开源框架的集成。 2. **安装与配置**:JBoss的安装...

Global site tag (gtag.js) - Google Analytics