`

Spring整合ActiveMQ

    博客分类:
  • MQ
 
阅读更多

公司有新的项目-智慧公交服务平台。要求实现公交GPS定位,在地图上动态显示订阅的公交车行车轨迹、轨迹回放等等一些功能。这就要用到消息推送服务中间 件ActiveMQ。采用UDP的方式推送消息。先简单介绍一下Spring整合ActiveMQ,后续将介绍Spring+activeMQ+Flex 消息推送的实现。

一.消息监听
Spring提供了三种 AbstractMessageListenerContainer 的子类,每种各有其特点。
 
第一种:SimpleMessageListenerContainer
      这个消息侦听容器是三种中最简单的。它在启动时创建固定数量的JMS session并在容器的整个生命周期中使用它们。这个类不能动态的适应运行时的要求或参与消息接收的事务处理。然而它对JMS提供者的要求也最低。它只需要简单的JMS API。
 
第二种:DefaultMessageListenerContainer
 
      这个消息侦听器使用的最多。和 SimpleMessageListenerContainer 相反,这个子类可以动态适应运行时侯的要求,也可以参与事务管理。每个收到的消息都注册到一个XA事务中(如果使用 JtaTransactionManager 配置过),这样就可以利用XA事务语义的优势了。这个类在对JMS提供者的低要求和提供包括事务参于等的强大功能上取得了很好的平衡。
 
第三种:ServerSessionMessageListenerContainer
 
     这个监听器容器利用JMS ServerSessionPool SPI动态管理JMS Session。 使用者各种消息监听器可以获得运行时动态调优功能,但是这也要求JMS提供者支持ServerSessionPool SPI。如果不需要运行时性能调整,请使用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。 
二.自动将消息转化为Java对象
        转化器在很多组件中都是必不缺少的东西。Spring挺过MessageConverter接口提供了对消息转换的支持。
三.代码
      1.修改activeMQ  conf文件夹下activemq.xml配置文件,加入UDP传输方式(紫的部分)
       
[html]  
<!--     Licensed to the Apache Software Foundation (ASF) under one or more  
    contributor license agreements.  See the NOTICE file distributed with  
    this work for additional information regarding copyright ownership.  
    The ASF licenses this file to You under the Apache License, Version 2.0  
    (the "License"); you may not use this file except in compliance with  
    the License.  You may obtain a copy of the License at  
  
    http://www.apache.org/licenses/LICENSE-2.0  
  
    Unless required by applicable law or agreed to in writing, software  
    distributed under the License is distributed on an "AS IS" BASIS,  
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
    See the License for the specific language governing permissions and  
    limitations under the License.  
  
  -->   
- <!--  START SNIPPET: example  
  -->   
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">  
- <!--  Allows us to use system properties as variables in this configuration file  
  -->   
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
- <property name="locations">  
  <value>file:${activemq.conf}/credentials.properties</value>   
  </property>  
  </bean>  
- <!--  Allows log searching in hawtio console  
  -->   
  <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop" />   
- <!--         The <broker> element is used to configure the ActiveMQ broker. 
     
  -->   
- <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">  
- <destinationPolicy>  
- <policyMap>  
- <policyEntries>  
- <policyEntry topic=">">  
- <!--  The constantPendingMessageLimitStrategy is used to prevent  
                         slow topic consumers to block producers and affect other consumers  
                         by limiting the number of messages that are retained  
                         For more information, see:  
  
                         http://activemq.apache.org/slow-consumer-handling.html  
  
                      
  -->   
- <pendingMessageLimitStrategy>  
  <constantPendingMessageLimitStrategy limit="1000" />   
  </pendingMessageLimitStrategy>  
  </policyEntry>  
  </policyEntries>  
  </policyMap>  
  </destinationPolicy>  
- <!--             The managementContext is used to configure how ActiveMQ is exposed in  
            JMX. By default, ActiveMQ uses the MBean server that is started by  
            the JVM. For more information, see:  
  
            http://activemq.apache.org/jmx.html  
          
  -->   
- <managementContext>  
  <managementContext createConnector="false" />   
  </managementContext>  
- <!--             Configure message persistence for the broker. The default persistence  
            mechanism is the KahaDB store (identified by the kahaDB tag).  
            For more information, see:  
  
            http://activemq.apache.org/persistence.html  
          
  -->   
- <persistenceAdapter>  
  <kahaDB directory="${activemq.data}/kahadb" />   
  </persistenceAdapter>  
- <!--             The systemUsage controls the maximum amount of space the broker will  
            use before disabling caching and/or slowing down producers. For more information, see:  
            http://activemq.apache.org/producer-flow-control.html  
            
  -->   
- <systemUsage>  
- <systemUsage>  
- <memoryUsage>  
  <memoryUsage percentOfJvmHeap="70" />   
  </memoryUsage>  
- <storeUsage>  
  <storeUsage limit="100 gb" />   
  </storeUsage>  
- <tempUsage>  
  <tempUsage limit="50 gb" />   
  </tempUsage>  
  </systemUsage>  
  </systemUsage>  
- <!--             The transport connectors expose ActiveMQ over a given protocol to  
            clients and other brokers. For more information, see:  
  
            http://activemq.apache.org/configuring-transports.html  
          
  -->   
- <transportConnectors>  
- <!--  DOS protection, limit concurrent connections to 1000 and frame size to 100MB  
  -->   
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <strong><span style="color:#cc33cc;"><transportConnector name="udp" uri="udp://0.0.0.0:8123" /> </span></strong>  
  <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />   
  </transportConnectors>  
- <!--  destroy the spring context on shutdown to stop jetty  
  -->   
- <shutdownHooks>  
  <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />   
  </shutdownHooks>  
  </broker>  
- <!--         Enable web consoles, REST and Ajax APIs and demos  
        The web consoles requires by default login, you can disable this in the jetty.xml file  
  
        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details  
      
  -->   
  <import resource="jetty.xml" />   
  </beans>  
- <!--  END SNIPPET: example  
  -->   
 
2. applicationContext.xml 
      
[html]  
  <?xml version="1.0" encoding="UTF-8" ?>   
- <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:flex="http://www.springframework.org/schema/flex" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/flex http://www.springframework.org/schema/flex/spring-flex-1.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">  
- <!-- %%%%%%%%%%%%%%%%%%*********************消息处理 ACTIVEMQ***************************%%%%%%%%%%%%%  
  -->   
- <!--  JMS TOPIC MODEL  
  -->   
- <!--  TOPIC链接工厂  
  -->   
- <bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  <property name="brokerURL" value="udp://localhost:8123" />   
- <!--  UDP传输方式  
  -->   
- <!--    <property name="brokerURL" value="tcp://localhost:61616" />   
  -->   
- <!--  TCP传输方式  
  -->   
  <property name="useAsyncSend" value="true" />   
  </bean>  
- <bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
  <property name="brokerURL" value="udp://localhost:8123" />   
- <!--  UDP传输方式  
  -->   
- <!--    <property name="brokerURL" value="tcp://localhost:61616" />   
  -->   
- <!--  TCP传输方式  
  -->   
  </bean>  
- <!--  定义主题  
  -->   
- <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">  
  <constructor-arg value="normandy.topic" />   
  </bean>  
  <bean id="messageConvertForSys" class="com.tech.gps.util.MessageConvertForSys" />   
- <!--  TOPIC send jms模板  
  -->   
- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  <property name="connectionFactory" ref="topicSendConnectionFactory" />   
  <property name="defaultDestination" ref="myTopic" />   
  <property name="messageConverter" ref="messageConvertForSys" />   
- <!--  发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 
  -->   
  <property name="deliveryMode" value="1" />   
  <property name="pubSubDomain" value="true" />   
- <!--  开启订阅模式  
  -->   
  </bean>  
- <!--  消息发送方  
  -->   
- <bean id="topicSender" class="com.tech.gps.util.MessageSender">  
  <property name="jmsTemplate" ref="jmsTemplate" />   
  </bean>  
- <!--  消息接收方  
  -->   
  <bean id="topicReceiver" class="com.tech.gps.util.MessageReceiver" />   
- <!--  主题消息监听容器  
  -->   
- <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  <property name="connectionFactory" ref="topicListenConnectionFactory" />   
  <property name="pubSubDomain" value="true" />   
- <!-- true 订阅模式  
  -->   
  <property name="destination" ref="myTopic" />   
- <!--  目的地 myTopic  
  -->   
  <property name="subscriptionDurable" value="true" />   
- <!-- -这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉 
  -->   
  <property name="clientId" value="clientId_1" />   
  <property name="messageListener" ref="topicReceiver" />   
  </bean>  
- <!--  Servlet  
  -->   
- <bean id="ControlServlet1" class="com.tech.gps.servlet.ControlServlet1">  
  <property name="topicSender" ref="topicSender" />   
  </bean>  
  </beans>  
 
3. web.xml
       
[html] 
  <?xml version="1.0" encoding="UTF-8" ?>   
- <web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">  
- <!--  加载spring配置文件applicationContext.xml  
  -->   
- <listener>  
  <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>   
  </listener>  
- <!--  指明spring配置文件在何处  
  -->   
- <context-param>  
  <param-name>contextConfigLocation</param-name>   
  <param-value>classpath*:applicationContext.xml</param-value>   
  </context-param>  
- <servlet>  
  <servlet-name>ControlServlet1</servlet-name>   
  <servlet-class>com.tech.gps.servlet.DelegatingServletProxy</servlet-class>   
  </servlet>  
- <servlet-mapping>  
  <servlet-name>ControlServlet1</servlet-name>   
  <url-pattern>/ControlServlet1</url-pattern>   
  </servlet-mapping>  
- <welcome-file-list>  
  <welcome-file>index11.jsp</welcome-file>   
  </welcome-file-list>  
  </web-app>  
 
4. 消息发送
      
[html]  
package com.tech.gps.util;  
  
import org.springframework.jms.core.JmsTemplate;  
  
   
public class MessageSender {    
      
    private JmsTemplate jmsTemplate;    
    
    public void sendMessage(String msg){    
         
        jmsTemplate.convertAndSend(msg);    
    }  
  
    public JmsTemplate getJmsTemplate() {  
        return jmsTemplate;  
    }  
  
    public void setJmsTemplate(JmsTemplate jmsTemplate) {  
        this.jmsTemplate = jmsTemplate;  
    }    
   
}    
   
5. 消息转换
       
[html]  
package com.tech.gps.util;  
  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.ObjectMessage;  
import javax.jms.Session;  
import javax.jms.TopicPublisher;  
  
import org.springframework.jms.support.converter.MessageConversionException;  
import org.springframework.jms.support.converter.MessageConverter;  
  
public class MessageConvertForSys implements MessageConverter {    
      
    public Message toMessage(Object object, Session session)    
            throws JMSException, MessageConversionException {    
          
        System.out.println("sendMessage:"+object.toString());    
        ObjectMessage objectMessage = session.createObjectMessage();    
        objectMessage.setStringProperty("key",object.toString());    
            
        return objectMessage;    
    }    
       
    public Object fromMessage(Message message) throws JMSException,    
            MessageConversionException {    
          
         
        ObjectMessage objectMessage = (ObjectMessage) message;    
        return objectMessage.getObjectProperty("key");    
    }  
   
}   
6. 消息接收
       
[html] 
package com.tech.gps.util;  
  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.ObjectMessage;  
  
public class MessageReceiver implements MessageListener {    
        
    public void onMessage(Message m) {    
          
        ObjectMessage om = (ObjectMessage) m;    
        try {    
            String key = om.getStringProperty("key");   
            System.out.println(" ");  
            System.out.println("receiveMessage:"+key);      
              
        } catch (JMSException e) {    
            e.printStackTrace();    
        }    
    }  
  
       
}    
7. servlet控制器
       
[html]  
package com.tech.gps.servlet;  
  
import java.io.IOException;  
import java.io.PrintWriter;  
  
import javax.servlet.ServletContext;  
import javax.servlet.ServletException;  
import javax.servlet.http.HttpServlet;  
import javax.servlet.http.HttpServletRequest;  
import javax.servlet.http.HttpServletResponse;  
  
import org.springframework.context.ApplicationContext;  
import org.springframework.web.context.WebApplicationContext;  
import org.springframework.web.context.support.WebApplicationContextUtils;  
   
import com.tech.gps.util.MessageSender;  
   
   
public class ControlServlet1 extends HttpServlet {  
      
    private MessageSender topicSender;  
       
    
    public MessageSender getTopicSender() {  
        return topicSender;  
    }  
  
    public void setTopicSender(MessageSender topicSender) {  
        this.topicSender = topicSender;  
    }  
  
    public void init() throws ServletException {  
   
    }  
       
    public void doGet(HttpServletRequest request, HttpServletResponse response)  
            throws ServletException, IOException {  
  
             doPost(request,response);  
    }  
  
       
    public void doPost(HttpServletRequest request, HttpServletResponse response)  
            throws ServletException, IOException {  
    
          request.setCharacterEncoding("utf-8");  
           
          for(int i =0;i<10;i++){  
              
            try {  
                 Thread.sleep(1000);  
              } catch (InterruptedException e) {  
                   
                  e.printStackTrace();  
              }     
                
              topicSender.sendMessage("坐标:118.36582,37.2569812");  
            
          }  
           
    }  
  
}  
8.Spring整合Servlet
  
[html]  
package com.tech.gps.servlet;  
  
import java.io.IOException;  
  
import javax.servlet.GenericServlet;  
import javax.servlet.Servlet;  
import javax.servlet.ServletException;  
import javax.servlet.ServletRequest;  
import javax.servlet.ServletResponse;  
  
import org.springframework.web.context.WebApplicationContext;  
import org.springframework.web.context.support.WebApplicationContextUtils;  
  
public class DelegatingServletProxy extends GenericServlet{  
  
     private String targetBean;    
     private Servlet proxy;   
  
      
    public void service(ServletRequest req, ServletResponse res)  
            throws ServletException, IOException {  
           
          proxy.service(req, res);    
    }    
    
       
    public void init() throws ServletException {    
        this.targetBean = getServletName();    
        getServletBean();    
        proxy.init(getServletConfig());    
    }    
    
    private void getServletBean() {    
        WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());    
        this.proxy = (Servlet) wac.getBean(targetBean);    
    }    
}    
   
9. 输出
       sendMessage:坐标:128.36582,32.2569812
       receiveMessage:坐标:128.36582,32.2569812
分享到:
评论

相关推荐

    Spring集成ActiveMQ配置

    Spring集成ActiveMQ配置详解 Spring框架与ActiveMQ的集成,为开发者提供了一种高效、可靠的JMS消息处理机制。在企业级应用中,这种集成能够极大地提升系统的响应速度和容错能力,特别是在需要异步通信和分布式事务...

    spring整合Activemq源码

    《Spring整合ActiveMQ深度解析》 在现代企业级应用开发中,消息队列(Message Queue)扮演着重要的角色,它能够有效地实现系统间的解耦,提高系统的可扩展性和并发处理能力。Spring作为Java领域的主流框架,与...

    Spring整合activemq

    首先,我们要理解Spring整合ActiveMQ的核心概念。在Spring框架中,我们可以通过使用Spring的JMS模块来配置ActiveMQ。这个过程涉及到以下几个关键点: 1. **JMS配置**:在Spring的配置文件中,我们需要定义一个`...

    Spring和ActiveMQ整合的完整实例

    将Spring与ActiveMQ整合,可以轻松地在Spring应用中实现消息队列的功能,提高系统的可扩展性和可靠性。 首先,让我们了解Spring框架如何支持消息传递。Spring提供了JmsTemplate类,这是一个模板类,用于简化发送和...

    spring 整合activemq实现自定义动态消息队列

    百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...

    Spring集成ActiveMQ配置.docx

    Spring 集成 ActiveMQ 配置 Spring 集成 ActiveMQ 配置是指将 Spring 框架与 ActiveMQ 消息队列集成,以实现基于 JMS(Java Message Service)的消息传递。ActiveMQ 是 Apache 软件基金会的一个开源的消息队列系统...

    Spring整合ActiveMQ简单实例

    **Spring 整合 ActiveMQ 简单实例** 在当今的分布式系统中,消息队列(Message Queue)作为异步处理、解耦组件的关键技术,被广泛应用。Spring 框架与 ActiveMQ 的整合,使得开发者能够轻松地在 Spring 应用程序中...

    spring整合activemq的maven工程

    Spring通过其`spring-jms`模块提供了对JMS的支持,可以方便地与消息代理如ActiveMQ进行集成。主要涉及以下几个核心组件: 1. **ConnectionFactory**: 这是创建JMS连接的工厂,Spring通过`JmsTransactionManager`或`...

    spring 整合 activemq 生产者和消费者 案例源码

    Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...

    spring集成activemq例子demo

    在Spring框架中集成ActiveMQ,我们可以创建一个高效的、可扩展的系统,利用消息队列来解耦生产者和消费者,提高系统的响应速度和可靠性。以下将详细介绍如何进行Spring与ActiveMQ的集成,并提供一些关键知识点。 1....

    spring集成activemq演示queue和topic 持久化

    以上就是Spring集成ActiveMQ的基本流程,涉及Queue和Topic的创建、持久化到MySQL以及消息的发送与接收。这个测试Demo涵盖了消息中间件的核心功能,可以帮助你理解并实践消息传递系统在实际项目中的应用。

    spring整合activemq

    Spring整合ActiveMQ是Java开发中常见的一种技术组合,主要用于实现应用程序间的异步消息通信。Spring框架提供了对ActiveMQ的高度集成,使得开发者能够轻松地在应用中加入消息队列功能,提高系统的可扩展性和可靠性。...

    Spring整合ActiveMQ实现队列和主题发布订阅通信

    本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和主题(Topic)两种不同的通信方式。队列遵循“先进先出”原则,每个消息只有一个消费者;而主题支持多播,允许多个消费者同时接收消息。 首先,你需要...

    Spring集成ActiveMQ

    在Spring集成ActiveMQ时,通常会采用Spring的配置文件形式来配置消息工厂(ConnectionFactory)、消息队列目的地(Destination)、消息监听容器(MessageListenerContainer)等。这样的配置方式可以让Spring管理...

    spring整合activeMQ

    本文将围绕"Spring整合ActiveMQ"这一主题,深入讲解如何在Spring框架中实现ActiveMQ的集成。 1. **Spring与ActiveMQ的整合基础** - Spring框架提供了一套完整的消息编程模型,包括Message、MessageListener、...

Global site tag (gtag.js) - Google Analytics