`
dwj147258
  • 浏览: 195595 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

spring与activeMQ(嵌入式启动activemq)

阅读更多

 一、准备工作

        一般的我们发送jms消息都需要启动activemq来当做服务器,消息发送到activemq后,在通过监听activemq的消息来接收到消息,activemq下载地址http://activemq.apache.org/download.html,然后通过activemq.bat来启动, 在这里我将通过一个例子来慢慢的讲解spring嵌入式启动ActiveMQ以及ActiveMQ监听器,发送订阅和点对点消息等,用到的是springMVC,这样我们就不需要启动activemq服务器就能够发送消息了,需要导入springmvc相关的jar包和activemqall包activemqjar包可以在下载的文件夹中找到。

二、从配置文件说起

      首先是web项目的起点说起,web.xml:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
  <display-name>StudentManager</display-name>
  <welcome-file-list>
    <welcome-file>index.html</welcome-file>
  </welcome-file-list>
  <servlet>
		<servlet-name>websocket</servlet-name>
		<servlet-class>main.java.test.websocket.WebSocketInitServlet</servlet-class>
	</servlet>
	<servlet-mapping>
		<servlet-name>websocket</servlet-name>
		<url-pattern>/websocket.ws</url-pattern>
	</servlet-mapping>
	<!-- <context-param>
		<param-name>contextConfigLocation</param-name>
		<param-value>classpath:main/resource/ActiveMQ.xml</param-value>
	</context-param> -->
  	<servlet>  
     <servlet-name>spring</servlet-name>  
      <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
       <init-param>  
            <param-name>contextConfigLocation</param-name>  
            <param-value>classpath:main/resource/spring-servlet.xml</param-value>  
        </init-param>  
      <load-on-startup>1</load-on-startup>  
  	</servlet>
 	 <servlet-mapping>  
      <servlet-name>spring</servlet-name>  
      <url-pattern>/*</url-pattern>  
  	</servlet-mapping>  
  	<context-param>  
      <param-name>contextConfigLocation</param-name>  
      <param-value>classpath:main/resource/applicationContext.xml</param-value>  
  </context-param> 
  
</web-app>

 这里面大家应该都能看懂,配置文件定义了两个servlet一个websocket一个spring,第一个大家可以不用关心,springservlet是springMVC的核心,这里关注的是spring的配置文件,applicationContext.xml这里先贴出配置文件,

<?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"  
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
          xmlns:context="http://www.springframework.org/schema/context"
          xmlns:jms="http://www.springframework.org/schema/jms"
          xmlns:amq="http://activemq.apache.org/schema/core"
        xsi:schemaLocation="  
        	  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
              http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
              http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
  	<import resource="myActiveMQDemo.xml"/>
  </beans>

 这里就是引入了一个activemq相关的配置文件,myActiveMQDemo.xml就是配置spring中的ActiveMQ如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
        
     <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean"> 
   	 <property name="config" value="classpath:main/resource/ActiveMQConfig.xml" /> 
   	 <property name="start"  value="true" /> 
	</bean>  
	 <bean id="myamqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
   		 <property name="brokerURL" value="tcp://localhost:61616"/>  
   		 <property name="trustedPackages">
        <list>
            <value>main.java</value>
        </list>
    </property> 
	</bean>
     <bean id="myconnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="myamqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100" />
    </bean>
    

    <bean id="myjmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="myconnectionFactory" />
        <property name="pubSubDomain" value="false" />
    </bean>

    <bean id="myjmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="myconnectionFactory" />
        <property name="pubSubDomain" value="true" />
    </bean>
    <bean id="mymsgQueueSender" class="main.java.test.jms.mydemo.Service.MsgQueueSenderImp">
    	<property name="jmsTemplate" ref="myjmsQueueTemplate"/>
    </bean>
	 <bean id="mymsgTopicSender" class="main.java.test.jms.mydemo.Service.MsgTopicSenderImp">
    	<property name="jmsTemplate" ref="myjmsTopicTemplate"/>
    </bean>
	<bean id="myqueueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  		  <constructor-arg>  
     		   <value>msg_topic_two</value>  
  		  </constructor-arg>  
	</bean>  
	<!--这个是主题目的地,一对多的-->  
	<bean id="mytopicDestination" class="org.apache.activemq.command.ActiveMQTopic">  
 		   <constructor-arg value="msg_topic_one"/>  
	</bean>
	<bean id="mytopicDestination2" class="org.apache.activemq.command.ActiveMQTopic">  
 		   <constructor-arg value="msg_topic_two"/>  
	</bean>
	
	<!-- 消息监听器 -->  
	<bean id="mytopicConsumer" class="main.java.test.jms.mydemo.consumer.TopicConsumer">
		<property name="destination" ref="mytopicDestination"/>
	</bean>
	<bean id="mytopicConsumer2" class="main.java.test.jms.mydemo.consumer.TopicConsumer2">
		<property name="destination" ref="mytopicDestination"/>
	</bean>       
	
    <bean id="myqueueDestinationMessageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
     <property name="delegate">  
        <bean class="main.java.test.jms.mydemo.consumer.TopicConsumer"/> 
    </property>
     <property name="defaultListenerMethod" value="onMessage"/> 
    </bean>  
    <bean id="myjMSReceiverQueueListenerAdapter" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">    
          <property name="connectionFactory" ref="myconnectionFactory" />    
        <property name="destination" ref="mytopicDestination" />    
        <property name="messageListener" ref="myqueueDestinationMessageListenerAdapter" /> 
    </bean> 
    
     <bean id="myqueueDestinationMessageListenerAdapter2" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
     <property name="delegate">  
        <bean class="main.java.test.jms.mydemo.consumer.TopicConsumer2"/> 
    </property>
     <property name="defaultListenerMethod" value="onMessage"/> 
    </bean>  
    <bean id="myjMSReceiverQueueListenerAdapter2" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">    
          <property name="connectionFactory" ref="myconnectionFactory" />    
        <property name="destination" ref="mytopicDestination" />    
        <property name="messageListener" ref="myqueueDestinationMessageListenerAdapter2" /> 
    </bean>
    
     <bean id="myqueueDestinationMessageListenerAdapter3" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
     <property name="delegate">  
        <bean class="main.java.test.jms.mydemo.consumer.QueueConsumer"/> 
    </property>
     <property name="defaultListenerMethod" value="onMessage"/> 
    </bean>  
    <bean id="myjMSReceiverQueueListenerAdapter3" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">    
          <property name="connectionFactory" ref="myconnectionFactory" />    
        <property name="destination" ref="myqueueDestination" />    
        <property name="messageListener" ref="myqueueDestinationMessageListenerAdapter3" /> 
    </bean>
    <!-- 方式二  end-->
</beans>  

 这个配置文件首先配置了一个broker,这是ActiveMQ服务的关键bean,有了他我们就可以不用启动ActiveMQ服务了,而ActiveMQConfig配置文件的内容如下:

 

 

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <bean
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
    <broker useJmx="false" persistent="false"
        xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
            <transportConnector uri="tcp://localhost:61616"/>
        </transportConnectors>
    </broker>
</beans>

 到这里,配置文件就结束了,看到这里的同学可能觉得有点绕,这里我解释一下,首先进入spring的配置文件,applicationContext.xml这个配置文件配置了spring中配置ActiveMQ的全部配置信息的文件myActiveMQDemo.xml,myActiveMQDemo.xml这个配置文件就是配置所有的ActiveMQ的信息,首先,我们嵌入式启动ActiveMQ就需要定义出他的关键服务:broker,broker是服务我们jms消息的关键服务,classpath:main/resource/ActiveMQConfig.xml这个则是配置了broker服务的关键信息,其实就是配置了一个连接地址,接下来回到myActiveMQDemo.xml这个配置文件,往下走...

 

 

<bean id="myamqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
   		 <property name="brokerURL" value="tcp://localhost:61616"/>  
   		 <property name="trustedPackages">
        <list>
            <value>main.java</value>
        </list>
    </property> 
	</bean>
     <bean id="myconnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="myamqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100" />
    </bean>

 这两个bean是配置ActiveMQ的连接信息,第一个是配置连接工厂,真正连接的是myconnectionFactory这个bean,我们使用的是CachingConnectionFactory,接下来就是配置发送消息的spring模板bean,因为虽然我们配置了连接工厂,就如同spring的数据库连接模板一样,他为你写好了很多的样板代码,我们只需要调用模板中的发送方法就可以发送消息了,模板配置:

<bean id="myjmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="myconnectionFactory" />
        <property name="pubSubDomain" value="true" />
    </bean>
    <bean id="mymsgQueueSender" class="main.java.test.jms.mydemo.Service.MsgQueueSenderImp">
    	<property name="jmsTemplate" ref="myjmsQueueTemplate"/>
    </bean>

 这两个template可以说是一样的,pubSubDomain属性时配置发送订阅消息还是发送点对点消息,第一个bean就表示这是一个可以发送订阅消息的bean,只要把它注入到服务类中,调用他的方法就可以发送订阅消息,第二个bean是配置点对点的template。

 

 

 

<bean id="mymsgQueueSender" class="main.java.test.jms.mydemo.Service.MsgQueueSenderImp">
    	<property name="jmsTemplate" ref="myjmsQueueTemplate"/>
    </bean>
	 <bean id="mymsgTopicSender" class="main.java.test.jms.mydemo.Service.MsgTopicSenderImp">
    	<property name="jmsTemplate" ref="myjmsTopicTemplate"/>
    </bean>

 这里就是将template注入到服务类中,后面会贴出服务类的java代码,接下来就是配置目的地:

<bean id="myqueueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  		  <constructor-arg>  
     		   <value>msg_topic_two</value>  
  		  </constructor-arg>  
	</bean>  
	<!--这个是主题目的地,一对多的-->  
	<bean id="mytopicDestination" class="org.apache.activemq.command.ActiveMQTopic">  
 		   <constructor-arg value="msg_topic_one"/>  
	</bean>
	<bean id="mytopicDestination2" class="org.apache.activemq.command.ActiveMQTopic">  
 		   <constructor-arg value="msg_topic_two"/>  
	</bean>

 配置了三个目的地,分别是点对点消息目的地,msg_topic_two , 订阅消息目的地:msg_topic_one和msg_topic_two接下来就是配置消息监听器,消息监听器有三种,可以参考;

 

http://dwj147258.iteye.com/blog/2330295,这里我们是用的第三种

 

先贴出xml代码:

<bean id="myqueueDestinationMessageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
     <property name="delegate">  
        <bean class="main.java.test.jms.mydemo.consumer.TopicConsumer"/> 
    </property>
     <property name="defaultListenerMethod" value="onMessage"/> 
 <property name="defaultResponseDestination" ref="defaultResponseQueue"/> 
    </bean>  
    <bean id="myjMSReceiverQueueListenerAdapter" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">    
          <property name="connectionFactory" ref="myconnectionFactory" />    
        <property name="destination" ref="mytopicDestination" />    
        <property name="messageListener" ref="myqueueDestinationMessageListenerAdapter" /> 
    </bean> 
    
     <bean id="myqueueDestinationMessageListenerAdapter2" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
     <property name="delegate">  
        <bean class="main.java.test.jms.mydemo.consumer.TopicConsumer2"/> 
    </property>
     <property name="defaultListenerMethod" value="onMessage"/> 
    </bean>  
    <bean id="myjMSReceiverQueueListenerAdapter2" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">    
          <property name="connectionFactory" ref="myconnectionFactory" />    
        <property name="destination" ref="mytopicDestination" />    
        <property name="messageListener" ref="myqueueDestinationMessageListenerAdapter2" /> 
    </bean>
    
     <bean id="myqueueDestinationMessageListenerAdapter3" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
     <property name="delegate">  
        <bean class="main.java.test.jms.mydemo.consumer.QueueConsumer"/> 
    </property>
     <property name="defaultListenerMethod" value="onMessage"/> 
    </bean>  
    <bean id="myjMSReceiverQueueListenerAdapter3" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">    
          <property name="connectionFactory" ref="myconnectionFactory" />    
        <property name="destination" ref="myqueueDestination" />    
        <property name="messageListener" ref="myqueueDestinationMessageListenerAdapter3" /> 
    </bean>

这是定义了三个消息监听器,拿一个出来解释就可以了,myqueueDestinationMessageListenerAdapter是定义了一个消息监听器,里面的属性delegate时指出消息监听类,defaultListenerMethod是指定接收消息的方法,defaultResponseDestination是指定回复消息的目的地,接下来就是配置消息监听器容器,里面的属性分别是配置连接工厂,目的地,以及消息监听器。到这里,配置问价那就已经全部结束。

 

三、前端和后台

 在前端主要就是一个jsp页面:

 

<%@ page language="java" contentType="text/html; charset=utf-8"
    pageEncoding="utf-8"%>
<%
	String path = request.getContextPath();
	System.out.println(path);
	String basePath = request.getScheme() + "://"
			+ request.getServerName() + ":" + request.getServerPort()
			+ path + "/";
	System.out.println(basePath);
%>

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<base href="<%=basePath%>">

<title>ActiveMQ Demo程序</title>

<meta http-equiv="pragma" content="no-cache">
<meta http-equiv="cache-control" content="no-cache">
<meta http-equiv="expires" content="0">
<script type="text/javascript" src="js/jquery.js"></script>
<style type="text/css">
.h1 {
	margin: 0 auto;
}

#producer{
	width: 48%;
 	border: 1px solid blue; 
	height: 80%;
	align:center;
	margin:0 auto;
}

body{
	text-align :center;
} 
div {
	text-align :center;
}
textarea{
	width:80%;
	height:100px;
	border:1px solid gray;
}
button{
	background-color: rgb(62, 156, 66);
	border: none;
	font-weight: bold;
	color: white;
	height:30px;
}
</style>
<script type="text/javascript">
	
	function send(controller){
		if($("#message").val()==""){
			$("#message").css("border","1px solid red");
			return;
		}else{
			$("#message").css("border","1px solid gray");
		}
		debugger
		$.ajax({
			type : "POST",
			url:"activemq/"+controller,
			dataType:'text', 
			data:{"message":$("#message").val()},
			success:function(data){
				if(data=="suc"){
					$("#status").html("<font color=green>发送成功</font>");
					setTimeout(clear,1000);
				}else{
					$("#status").html("<font color=red>"+data+"</font>");
					setTimeout(clear,5000);
				}
			},
			error:function(data){
				$("#status").html("<font color=red>ERROR:"+data["status"]+","+data["statusText"]+"</font>");
				setTimeout(clear,5000);
			}
			
		});
	}
	
	function clear(){
		$("#status").html("");
	}

</script>
</head>

<body>
	<h1>Hello ActiveMQ</h1>
	<div id="producer">
		<h2>Producer</h2>
		<textarea id="message"></textarea>
		<br>
		<button onclick="send('queueSender')">发送Queue消息</button>
		<button onclick="send('topicSender')">发送Topic消息</button>
		<br>
		<span id="status"></span>
	</div>
</body>
</html>

你需要引入你的jquery库,接着是控制器代码:

 

 

package main.java.test.jms.SpringActivemq.controller;



import javax.jms.Destination;

import main.java.bean.LoginUserInfo;
import main.java.test.jms.SpringAcrivemq2.ProducerServiceImpl;
import main.java.test.jms.SpringActivemq.mq.producer.queue.QueueSender;
import main.java.test.jms.SpringActivemq.mq.producer.topic.TopicSender;
import main.java.test.jms.mydemo.Service.MsgQueueSenderImp;
import main.java.test.jms.mydemo.Service.MsgTopicSenderImp;
import main.java.test.jms.mydemo.consumer.ConstUtil;
import main.java.util.BeanUtil;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;


/**
 * 
 * @author liang
 * @description controller娴嬭瘯
 */
@Controller
@RequestMapping("/activemq")
public class ActivemqController {
	
	
	/**
	 * 鍙戦�娑堟伅鍒伴槦鍒�
	 * Queue闃熷垪锛氫粎鏈変竴涓闃呰�浼氭敹鍒版秷鎭紝娑堟伅涓�棪琚鐞嗗氨涓嶄細瀛樺湪闃熷垪涓�
	 * @param message
	 * @return String
	 */
	@ResponseBody
	@RequestMapping("/queueSender")
	public String queueSender(@RequestParam("message")String message){
		String opt="";
		try {
//			queueSender.send("test.queue", message);
			MsgQueueSenderImp sender = (MsgQueueSenderImp)BeanUtil.getBeanByName("mymsgQueueSender");
			 LoginUserInfo user = new LoginUserInfo();
		        user.setId(2);
		        user.setPassWord("123");
		        user.setUserName("dengwei") ;
			sender.send(ConstUtil.MSG_TOPIC_TWO, user) ;
			opt = "suc";
		} catch (Exception e) {
			opt = e.getCause().toString();
		}
		return opt;
	}
	
	/**
	 * 鍙戦�娑堟伅鍒颁富棰�
	 * Topic涓婚 锛氭斁鍏ヤ竴涓秷鎭紝鎵�湁璁㈤槄鑰呴兘浼氭敹鍒�
	 * 杩欎釜鏄富棰樼洰鐨勫湴鏄竴瀵瑰鐨�
	 * @param message
	 * @return String
	 */
	@ResponseBody
	@RequestMapping("/topicSender")
	public String topicSender(@RequestParam("message")String message){
		String opt = "";
		try {
			System.out.println("fasong");
//			topicS.send("topic", message);
			MsgTopicSenderImp sender = (MsgTopicSenderImp)BeanUtil.getBeanByName("mymsgTopicSender");
			 LoginUserInfo user = new LoginUserInfo();
		        user.setId(2);
		        user.setPassWord("123");
		        user.setUserName("dengwei") ;
			sender.send(ConstUtil.MSG_TOPIC_ONE, user) ;
			opt = "suc";
		} catch (Exception e) {
			opt = e.getCause().toString();
		}
		return opt;
	}
	
}

 实例类代码:

package main.java.bean;

import java.io.Serializable;

public class LoginUserInfo implements Serializable{

	private static final long serialVersionUID = -5668675616184265758L;

	private String userName ; 
	
	private String passWord ; 
	
	private String role ; 
	
	private int id  ;

	public String getUserName() {
		return userName;
	}

	public void setUserName(String userName) {
		this.userName = userName;
	}

	public String getPassWord() {
		return passWord;
	}

	public void setPassWord(String passWord) {
		this.passWord = passWord;
	}

	public String getRole() {
		return role;
	}

	public void setRole(String role) {
		this.role = role;
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	@Override
	public String toString() {
		return "LoginUserInfo [userName=" + userName + ", passWord=" + passWord
				+ ", role=" + role + ", id=" + id + "]";
	}
	
	
	
}

 发送订阅消息接口:

package main.java.test.jms.mydemo.interfa;

import java.io.Serializable;

public interface MsgTopicSender {
	
	public void send(String topic , Serializable msg);
	
}

 发送点对点消息接口:

package main.java.test.jms.mydemo.interfa;

import java.io.Serializable;

public interface MsgQueueSender {
	
	public void send(String queue , Serializable msg);
	
}

 消费者接口:

 

package main.java.test.jms.mydemo.interfa;

import java.io.Serializable;

public interface Consumer {
	public void onMessage(Serializable msg);
	
	public String getDest() ;
}

 目的地:

package main.java.test.jms.mydemo.consumer;

public class ConstUtil {
	public static String MSG_TOPIC_ONE = "msg_topic_one" ;
	
	public static String MSG_TOPIC_TWO ="msg_topic_two" ;
}

 订阅消息发送类:

 

package main.java.test.jms.mydemo.Service;

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import main.java.test.jms.mydemo.interfa.MsgTopicSender;

public class MsgTopicSenderImp implements MsgTopicSender{
	
	private JmsTemplate jmsTemplate ;
	
	@Override
	public void send(String topic,final Serializable msg) {
		jmsTemplate.send(topic, new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
            	ObjectMessage message = session.createObjectMessage();
            	message.setObject(msg);
                return message;  
            }  
        });
	}

	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	
}

 

 

 

点对点消息发送类:

 

package main.java.test.jms.mydemo.Service;

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import main.java.test.jms.mydemo.interfa.MsgQueueSender;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class MsgQueueSenderImp implements MsgQueueSender{
	
	private JmsTemplate jmsTemplate ;
	@Override
	public void send(String queue, final Serializable msg) {

		jmsTemplate.send(queue, new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
            	ObjectMessage message = session.createObjectMessage();
            	message.setObject(msg);
                return message;  
            }  
        });
		
	}
	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	
}

 订阅消息消费者1:

package main.java.test.jms.mydemo.consumer;

import java.io.Serializable;

import javax.jms.Destination;

import main.java.bean.LoginUserInfo;
import main.java.test.jms.mydemo.interfa.Consumer;

public class TopicConsumer implements Consumer{
	private Destination destination ;
	
	@Override
	public void onMessage(Serializable msg) {
		System.out.println("Topicconsumer receive");
		if(msg instanceof LoginUserInfo){
			LoginUserInfo user = (LoginUserInfo)msg ;
			System.out.println(user);
		}
	}
	public void handleMessage(Serializable msg){
		System.out.println("handmessage");
	}
	@Override
	public String getDest() {
		return ConstUtil.MSG_TOPIC_ONE ;
	}

	public Destination getDestination() {
		return destination;
	}

	public void setDestination(Destination destination) {
		this.destination = destination;
	}

}

 订阅消息消费者2:

package main.java.test.jms.mydemo.consumer;

import java.io.Serializable;

import javax.jms.Destination;

import main.java.bean.LoginUserInfo;
import main.java.test.jms.mydemo.interfa.Consumer;

public class TopicConsumer2 implements Consumer{
	private Destination destination ;
	@Override
	public void onMessage(Serializable msg) {
		System.out.println("Topicconsumer2 receive");
		if(msg instanceof LoginUserInfo){
			LoginUserInfo user = (LoginUserInfo)msg ;
			System.out.println(user);
		}
	}
	
	@Override
	public String getDest() {
		return ConstUtil.MSG_TOPIC_ONE ;
	}

	public Destination getDestination() {
		return destination;
	}

	public void setDestination(Destination destination) {
		this.destination = destination;
	}

}

 点对点消息消费者:

package main.java.test.jms.mydemo.consumer;

import java.io.Serializable;

import main.java.bean.LoginUserInfo;
import main.java.test.jms.mydemo.interfa.Consumer;

public class QueueConsumer implements Consumer{

	@Override
	public void onMessage(Serializable msg) {
		System.out.println("QueueConsumer receive");
		if(msg instanceof LoginUserInfo){
			LoginUserInfo user = (LoginUserInfo)msg ;
			System.out.println(user);
		}
	}

	@Override
	public String getDest() {
		// TODO Auto-generated method stub
		return null;
	}

}

 附上获取spring管理的bean的工具类:

package main.java.util;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.web.context.support.WebApplicationObjectSupport;

public class BeanUtil extends WebApplicationObjectSupport{
	private static ApplicationContext ac = new FileSystemXmlApplicationContext("classpath:main/resource/applicationContext.xml");
	public static Object getBeanByName(String name ){
		return ac.getBean(name);
	}
	
}

四、一些问题

    过程中遇到一些问题,

①发送一次订阅消息,接收到多次一样的,这是因为获取bean的工具类没有吧applicationContext静态,每调用一次就会加载一次导致创建了多个消息监听bean

②发送消息传输对象的时候总是报序列化异常,这是因为没有吧传输对象的包设置为白名单,具体配置是在配置连接工厂的时候,在brokerURL后跟一个属性,trustedPackages,然后列出白名单包列表。如下:

 <bean id="myamqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
   		 <property name="brokerURL" value="tcp://localhost:61616"/>  
   		 <property name="trustedPackages">
        <list>
            <value>main.java</value>
        </list>
    </property> 

 

这里附上嵌入启动ActiveMQ的另一边值得推荐的博文http://www.cnblogs.com/dead-trap-ramble/archive/2013/11/23/3439060.html

 

 

 

0
0
分享到:
评论

相关推荐

    ActiveMQ学习笔记之四--启动嵌入式Broker(纯代码方式)

    在本篇ActiveMQ学习笔记中,我们将探讨如何通过纯代码方式启动一个嵌入式的Broker,这对于测试、开发或者快速原型构建非常有用。ActiveMQ是一个开源的消息代理,它遵循Java消息服务(JMS)规范,提供了高可靠性的...

    spring+activeMQ 嵌入式配置 完整demo(包含jar包)

    运行ActiveMQ_Demo中的示例项目,你可以看到Spring应用成功地与嵌入式的ActiveMQ服务器交互,实现了消息的发送和接收。这个完整的demo可以帮助开发者快速理解并实践Spring与ActiveMQ的集成,为构建高效、可靠的...

    spring-activemq-integration

    要使用嵌入式ActiveMQ而不是外部代理,请在pom.xml中添加以下依赖项。在Docker上运行ActiveMQ 泊坞窗运行-it --rm -p 8161:8161 -p 61616:61616 vromero / activemq-artemisapplication.properties spring....

    Linux安装ActiveMQ.doc

    在开发环境中,ActiveMQ通常在Windows上通过命令行或集成于Spring框架内以嵌入式方式运行。然而,对于生产环境,部署在Linux服务器上是更稳定的选择。以下是一个在Linux系统上安装ActiveMQ的详细步骤。 首先,安装...

    Apache ActiveMQ Artemis.pdf

    20. Maven插件、单元测试、嵌入式ActiveMQ Artemis:涵盖了如何使用Maven插件,如何进行单元测试,以及如何将ActiveMQ Artemis嵌入到应用程序中。 整体而言,ActiveMQ Artemis用户手册是一份全面的文档,旨在指导...

    activeMQ学习

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递协议(Open Message Broker Protocol,即AMQP...同时,熟悉ActiveMQ与其他开源框架(如Spring、Camel等)的整合也是提升开发能力的重要一步。

    ActiveMQ-Artemis .pdf

    18. **嵌入式代理和集成技术**:提供了如何将ActiveMQ Artemis嵌入到不同的应用程序服务器和框架中,以及如何使用CDI、Spring等集成技术。 19. **操作拦截、数据工具和Maven插件**:介绍了操作拦截以及如何使用数据...

    ActiveMQ电子书

    - 在JVM中嵌入式启动:进入`example`目录,执行`ant embedBroker`命令。 4. **管理界面**:通过浏览器访问`http://localhost:8161/admin`来管理ActiveMQ服务器。 #### 三、运行示例程序 - **Queue消息示例** - ...

    activemq-rar-5.3.1.rar

    4. "spring-osgi-core-1.2.1.jar" 和 "spring-core-2.5.6.jar":Spring的OSGi支持和核心库,使得ActiveMQ可以与OSGi容器集成,提供了模块化的功能。 5. "log4j-1.2.14.jar":日志框架Log4j,用于记录应用程序的运行...

    activemq-rar-5.4.1.rar

    RAR(Resource Adapter Archive)是Java EE应用程序服务器中用于部署资源适配器的格式,它允许ActiveMQ与这些服务器无缝集成。在这个rar包中,包含了运行ActiveMQ所需的各种依赖库,以支持企业级的消息传递功能。 ...

    spring-boot-jms:演示如何在Spring Boot中使用嵌入式activemq并将json消息添加到队列中

    在Spring Boot中如何使用嵌入式ActiveMQ代理的示例: -The example shows how to add JSON string to the queue as well. 先决条件 -Java 8 -Gradle 2.3+ 安装 运行junit $ gradle test 运行演示 $ gradle bootRun

    activemq-rar-4.1.1.rar

    1. **spring-2.0.jar**:这是Spring框架的早期版本,Spring是一个广泛应用的Java企业级应用开发框架,它提供了依赖注入、事务管理、AOP(面向切面编程)等功能,对于ActiveMQ的集成和配置非常关键。 2. **derby-...

    springboot-activemq-demo:Spring Boot 2.x ActiveMQ 5.x演示

    Spring Boot ActiveMQ演示一个使用嵌入式ActiveMQ进行演示的Spring Boot应用程序: 重新送货延迟接收和回复模式跑步启动应用程序并提交一条消息,以查看延迟重新交付。 $ curl -XPOST ...

    spring-boot-artemis-clustered-topic:一个示例项目,以集群模式使用主题(发布-订阅)演示通过Apache ActiveMQ Artemis 2.4.0在两个spring boot应用程序的生产者和使用者之间的异步通信。

    Spring Boot Artemis集群主题一个示例项目,通过集群模式下的topic (发布-订阅),通过Apache ActiveMQ Artemis 2.4.0演示了两个Spring Boot应用程序生产者和使用者之间的异步通信。介绍Apache ActiveMQ Artemis是...

    spring-boot-mmanyexamples.zip

    Spring Boot通过提供默认配置,使得开发者能够快速启动项目。只需添加必要的依赖,就可以创建一个完整的可运行的JAR或WAR。"spring-boot-mmanyexamples"可能包含了多个示例项目,每个项目都是一个独立的Spring Boot...

    spring-integration-jms-and-activemq

    它使用嵌入式 ActiveMQ 代理。 然后系统会提示您运行两个演示之一: 网关演示 通道适配器演示 控制台输出应如下所示: ========================================================= Welcome to the Spring ...

    Spring Boot面试题(2022最新版)-重点

    - `bootstrap.properties` 通常用于 Bootstrap 的上下文中,配置在 Spring 应用启动前就生效的内容,比如云平台配置等。 - `application.properties` 则用于配置 Spring 应用本身的行为。 **2.8 什么是 Spring ...

    Spring3.x 企业级应用开发源码库文件2

    Spring Data JPA或MyBatis等持久层框架可以轻松地与HSQLDB集成,进行快速的本地数据存储。 这些库文件的组合使用,可以构建出一个完整的、功能丰富的Spring 3.x企业级应用系统,涵盖了数据持久化、文件处理、消息...

    activemq-mule-embedded

    mule-starter-app-入门级Spring Boot Mule嵌入式应用程序 内容 博客 请查看。 用法 通过Gradle git clone https://github.com/glawson6/mule-starter-app.git cd mule-starter-app ...使用./gradlew进行./gradlew ...

    1.JeeSpringCloud介绍(推荐)1

    JeeSpringCloud是一款基于SpringBoot、SpringMVC、Mybatis、Redis、ActiveMq和SpringCloud的微服务分布式代码生成的敏捷开发系统架构。它的设计目的是为了简化开发流程,提高开发效率,尤其适用于大型和中型企业。...

Global site tag (gtag.js) - Google Analytics