`
bh三少
  • 浏览: 102073 次
  • 性别: Icon_minigender_1
  • 来自: 北海
社区版块
存档分类
最新评论

ActiveMQ基础实践

 
阅读更多

一、背景知识

1.JMS

JMSJava Message ServiceJava消息服务,是一组应用程序接口规范,是Java平台中关于面向消息中间件(MOM)的API,用于在多个应用程序之间,或分布式系统中发送和接收消息,进行异步通信。JMS是一套与平台无关的API,目前绝大部分的MOM厂商都对JMS提供了支持。

(可以下载JMS的规范文档,里面详细介绍了JMS的更多特性)

2.ActiveMQ

ActiveMQApache社区的开源消息产品,是一个完全支持JMS1.1Java EE1.4规范的JMS Provider的实现。ActiveMQ支持多种语言和协议编写的客户端,如,语言:JavaC/C++C#RubyPerlPythonPHP,协议:OpenWireSSLStompTCPUDPXMPPRESTNIOWS等。ActiveMQ易于与中间件服务器、Spring等进行集成,是ServiceMixGeronimo默认的消息系统。

更多ActiveMQ的特性与应用,访问:http://activemq.apache.org/index.html

二、实践环境配置

    本教程使用的中间件服务器为Tomcat,三个Tomcat分别部署三个独立的应用,一个用于生产消息,另外两个用于消费消息;一个独立部署的ActiveMQ服务器。本教程使用的技术主要有:JMSActiveMQSpring IOCSpring JMS等。

1.ActiveMQ的安装

    Apache官网下载ActiveMQ,解压下载的Apache ActiveMQ包,到${ACTIVEMQ_HOME}/bin目录运行activemq.bat,启动ActiveMQ服务。

2.Tomcat的配置

如果是在一台机器实践本教程,需要Tomcat的服务端口,否则会发生端口占用的异常,到${TOMCAT_HOME}/conf/server.xml下修改端口。

    Tomcat容器中配置JMS管理对象,JMS管理对象包括两个:ConnectionFactoryDestinationDestination包括QueueTopic。配置在容器中的JMS管理对象,在应用中可以通过JNDI的方式获取。当然还有其他的方式,后面会有介绍。在${TOMCAT_HOME}/confcontext.xml文件中添加以下内容(三个Tomcat都要做相同的配置):

<Resource name="jms/QueueConnectionFactory"
		auth="Container"
		type="org.apache.activemq.ActiveMQConnectionFactory"
		description="JMS Queue ConnectionFactory"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100"
		brokerName="QueueActiveMQBroker" />
<Resource name="jms/TopicConnectionFactory"
		auth="Container"
		type="org.apache.activemq.ActiveMQConnectionFactory"
		description="JMS Topic ConnectionFactory"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100"
		brokerName="TopicActiveMQBroker" />
<Resource name="jms/TestQueue1"
		auth="Container"
		type="org.apache.activemq.command.ActiveMQQueue"
		description="Test JMS Queue"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		physicalName="testQueue1" />
<Resource name="jms/TestTopic1"
		auth="Container"
		type="org.apache.activemq.command.ActiveMQTopic"
		description="Test JMS Topic"
		factory="org.apache.activemq.jndi.JNDIReferenceFactory"
		physicalName="testTopic1" />

说明:上面配置了两个ConnectionFactory,一个用于Queue,一个用于Topic,配置了一个队列testQueue1和一个主题testTopic161616ActiveMQ默认的监听连接的端口;failover连接机制可以在ActiveMQ服务down掉并恢复后尝试去连接ActiveMQ服务,initialReconnectDelay表示重新连接延迟时间。
    配置完成后,我们可以启动其中的一个Tomcat,在浏览器中输入:http://localhost:61616/admin,就会看到ActiveMQweb管理界面,分别点击QueuesTopics,可以看到我们创建的QueueTopic,如下图:

三、编写应用

1.编写消息生产者应用

    编写一个简单的消息生产者应用并部署到Tomcat

(1)添加依赖包

    添加ActiveMQSpringJMS等依赖包,如下图所示:

说明:ActiveMQjar包在${ACTIVEMQ_HOME}/lib目录下可以找到;servletjar包不要添加到工程的WebContent\WEB-INF\lib路径下,因为我们部署到Tomcat的时候不需要这个包,只是在开发的时候需要,可以新建一个文件夹存放,然后buildpath中。

(2)编写消息生产者类

    编写两个消息生产者类,一个发送到Queue,另一个发送到Topic。例子中使用SpringJMS模板类进行消息的发送。

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class QueueMessageProducer {

	private JmsTemplate jmsTemplate;
	
	private Destination defaultDestination;
	
	public void sendMsg(final String message) {
		MessageCreator creator = new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		};
		jmsTemplate.send(defaultDestination, creator);
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	public void setDefaultDestination(Destination defaultDestination) {
		this.defaultDestination = defaultDestination;
	}
}
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class TopicMessageProducer {

	private JmsTemplate jmsTemplate;
	
	private Destination defaultDestination;
	
	public void sendMsg(final String message) {
		MessageCreator creator = new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		};
		jmsTemplate.send(defaultDestination, creator);
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void setDefaultDestination(Destination defaultDestination) {
		this.defaultDestination = defaultDestination;
	}
}

 (3)编写Spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans					http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
						http://activemq.apache.org/schema/core
				http://activemq.apache.org/schema/core/activemq-core.xsd">
	
	<!-- 通过JNDI获取ConnectionFactory -->
	<bean id="queueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/QueueConnectionFactory" />
	</bean>
	<bean id="topicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TopicConnectionFactory" />
	</bean>
	
	<!-- 通过JNDI获取Destination -->
	<bean id="testQueue1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestQueue1" />
	</bean>
	<bean id="testTopic1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestTopic1" />
	</bean>
	
	<!-- 配置JMS模板 -->
	<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="queueConnectionFactory" />
	</bean>
	<bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="topicConnectionFactory" />
	</bean>
	
	<bean id="queueMessageProducer" class="com.zcl.mqproducer.service.QueueMessageProducer">
		<property name="jmsTemplate" ref="queueJmsTemplate" />
		<property name="defaultDestination" ref="testQueue1" />
	</bean>
	<bean id="topicMessageProducer" class="com.zcl.mqproducer.service.TopicMessageProducer">
		<property name="jmsTemplate" ref="topicJmsTemplate" />
		<property name="defaultDestination" ref="testTopic1" />
	</bean>
</beans>

 (4)编写Servlet

    编写一个Servlet类,响应客户端的请求。代码如下:

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import com.zcl.mqproducer.service.QueueMessageProducer;
import com.zcl.mqproducer.service.TopicMessageProducer;

public class MessageServlet extends HttpServlet {

	private static final long serialVersionUID = 3231052945892531021L;

	private QueueMessageProducer queueMsgProducer;
	
	private TopicMessageProducer topicMsgProducer;
	
	public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
		String param = request.getParameter("operate");
		if("queueSend".equals(param)) {
			String message = request.getParameter("message");
			queueMsgProducer.sendMsg(message);
			response.sendRedirect("index.jsp");
		} else if("topicSend".equals(param)) {
			String message = request.getParameter("message");
			topicMsgProducer.sendMsg(message);
			response.sendRedirect("index.jsp");
		} else {
			response.sendRedirect("index.jsp");
		}
	}
	
	public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
		doGet(request, response);
	}
	
	public void init() {
		ApplicationContext context = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());
		queueMsgProducer = (QueueMessageProducer) context.getBean("queueMessageProducer");
		topicMsgProducer = (TopicMessageProducer) context.getBean("topicMessageProducer");
	}
}

 (5)编写JSP文件和web.xml文件

编写首页:index.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>欢迎页面</title>
	</head>
	<body>
		<a href="sendQueueMsg.jsp">发送Queue消息</a><br><br>
		<a href="sendTopicMsg.jsp">发送Topic消息</a><br>
	</body>
</html>

 

 编写发送Queue消息的页面:sendQueueMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>发送Queue消息</title>
	</head>
	<body>
		<form method="POST" action="message?operate=queueSend">
			<table>
				<tr>
					<td><textarea style="width:95%;" name="message"></textarea></td>
				</tr>
				<tr>
					<td><input type="submit" value="发送Queue消息" /></td>
				</tr>
			</table>
		</form>
	</body>
</html>

 

 编写发送Topic消息的页面:sendTopicMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>发送Topic消息</title>
	</head>
	<body>
		<form method="POST" action="message?operate=topicSend">
			<table>
				<tr>
					<td><textarea style="width:95%;" name="message"></textarea></td>
				</tr>
				<tr>
					<td><input type="submit" value="发送Topic消息" /></td>
				</tr>
			</table>
		</form>
	</body>
</html>

 

 编写web.xml文件,加载Spring配置文件和Servlet类。

 

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">
  <display-name>mqproducer</display-name>
  <!-- 加载spring配置文件 -->
	<context-param>
		<param-name>contextConfigLocation</param-name>
		<param-value>classpath:/spring/applicationContext.xml</param-value>
	</context-param>
	
	 <servlet>
  		<servlet-name>messageServlet</servlet-name>
  		<servlet-class>com.zcl.mqproducer.servlet.MessageServlet</servlet-class>
  	</servlet>
  	<servlet-mapping>
  		<servlet-name>messageServlet</servlet-name>
  		<url-pattern>/message</url-pattern>
  	</servlet-mapping>
	
	<!-- spring上下文监听器 -->
	<listener>
		<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
	</listener>
  
  	<welcome-file-list>
    	<welcome-file>index.jsp</welcome-file>
  	</welcome-file-list>
</web-app>

 

2.编写消息消费者应用

(1)添加依赖包

    和消息生产者的依赖包相同。

(2)编写消息消费者类

    编写两个消息消费者类,一个消费Queue消息,一个消费Topic消息。消费者类实现MessageListener接口。

 

import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueMessageConsumer implements MessageListener {

	private static List<String> queueMsgs = new ArrayList<String>();
	
	public void onMessage(Message message) {
		if(message instanceof TextMessage) {
			TextMessage text = (TextMessage)message;
			try {
				handleMsg(text);
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	
	public List<String> showAllQueueMsgs() {
		return queueMsgs;
	}
	
	private void handleMsg(TextMessage message) throws JMSException {
		queueMsgs.add(message.getText());
	}
}

 import java.util.ArrayList;

import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TopicMessageConsumer implements MessageListener {
	
	private List<String> topicMsgs = new ArrayList<String>();

	public void onMessage(Message message) {
		if(message instanceof TextMessage) {
			TextMessage text = (TextMessage)message;
			try {
				handleMsg(text);
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	
	public List<String> showAllTopicMsgs() {
		return topicMsgs;
	}
	
	private void handleMsg(TextMessage message) throws JMSException {
		topicMsgs.add(message.getText());
	}
}

 (3)编写Spring配置文件

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
			http://activemq.apache.org/schema/core
			http://activemq.apache.org/schema/core/activemq-core.xsd">
	
	<!-- 通过JNDI获取ConnectionFactory -->
	<bean id="queueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/QueueConnectionFactory" />
	</bean>
	<bean id="topicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TopicConnectionFactory" />
	</bean>
	
	<!-- 通过JNDI获取Destination -->
	<bean id="testQueue1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestQueue1" />
	</bean>
	<bean id="testTopic1" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiName" value="java:comp/env/jms/TestTopic1" />
	</bean>
	
	<bean id="queueMessageConsumer" class="com.zcl.mqconsumer01.service.QueueMessageConsumer" />
	<bean id="topicMessageConsumer" class="com.zcl.mqconsumer01.service.TopicMessageConsumer" />
	
	<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="queueConnectionFactory" />
		<property name="destination" ref="testQueue1" />
		<property name="messageListener" ref="queueMessageConsumer" />
	</bean>
	<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="topicConnectionFactory" />
		<property name="destination" ref="testTopic1" />
		<property name="messageListener" ref="topicMessageConsumer" />
	</bean>
</beans>

 

 说明:上面配置了ListenerContainer,这是一个消息监听器,从指定的QueueTopic监听消息,自动接收消息,也可以使用JmsTemplate主动从QueueTopic获取消息。

(4)编写Servlet

    编写一个Servlet类,响应客户端的请求。代码如下:

 

import java.io.IOException;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

import com.zcl.mqconsumer01.service.QueueMessageConsumer;
import com.zcl.mqconsumer01.service.TopicMessageConsumer;

public class MessageServlet extends HttpServlet {

	private static final long serialVersionUID = 1266665483325197008L;
	
	private QueueMessageConsumer queueMsgConsumer;
	
	private TopicMessageConsumer topicMsgConsumer;

	public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
		String param = request.getParameter("operate");
		if("queueReceive".equals(param)) {
			List<String> queueMsgs = queueMsgConsumer.showAllQueueMsgs();
			request.setAttribute("queueMsgs", queueMsgs);
			getServletContext().getRequestDispatcher("/receiveQueueMsg.jsp").forward(request, response);
		} else if("topicReceive".equals(param)) {
			List<String> topicMsgs = topicMsgConsumer.showAllTopicMsgs();
			request.setAttribute("topicMsgs", topicMsgs);
			getServletContext().getRequestDispatcher("/receiveTopicMsg.jsp").forward(request, response);
		} else {
			response.sendRedirect("index.jsp");
		}
	}
	
	public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
		doGet(request, response);
	}
	
	public void init() {
		ApplicationContext context = WebApplicationContextUtils.getRequiredWebApplicationContext(getServletContext());
		queueMsgConsumer = (QueueMessageConsumer) context.getBean("queueMessageConsumer");
		topicMsgConsumer = (TopicMessageConsumer) context.getBean("topicMessageConsumer");
	}
}

 (5)编写JSP文件和web.xml文件

编写首页:index.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>欢迎页面</title>
	</head>
	<body>
		<a href="message?operate=queueReceive">查看Queue接收消息</a><br><br>
		<a href="message?operate=topicReceive">查看Topic接收消息</a><br>
	</body>
</html>

 

 编写查看Queue消息界面:receiveQueueMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>Queue消息列表</title>
	</head>
	<body>
		<table>
			<thead>
				<tr>
					<td>Queue消息列表</td>
				</tr>
			</thead>
			<tbody>
				<c:forEach var="message" items="${queueMsgs}">
				<tr>
					<td>${message}</td>
				</tr>
				</c:forEach>
			</tbody>
		</table>
	</body>
</html>

 

 编写查看Topic消息界面:receiveTopicMsg.jsp

 

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
	<head>
		<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
		<title>Topic消息列表</title>
	</head>
	<body>
		<table>
			<thead>
				<tr>
					<td>Topic消息列表</td>
				</tr>
			</thead>
			<tbody>
				<c:forEach var="message" items="${topicMsgs}">
				<tr>
					<td>${message}</td>
				</tr>
				</c:forEach>
			</tbody>
		</table>
	</body>
</html>

 

         web.xml和消息生产者的工程一样。

3.部署应用

    创建多一个消息消费者工程,内容和以上的消息消费者一样。把三个工程分包部署到三个Tomcat。启动ActiveMQ服务,再分别启动三个Tomcat,分别发送Queue消息和Topic消息,Queue消息一次只能有一个消费者,Topic消息是发送给所有的订阅者。

四、在应用中设置JMS管理对象

 

在上面的例子中,我们使用中间件服务器Tomcat配置JMS管理对象ConnectionFactoryDestination,我们也可以在应用配置JMS的管理对象,在Spring的配置文件配置。在JDBC中,针对数据库操作的重要资源Connection我们采用数据库连接池,以提高性能。同样的,在ActiveMQ中,针对SessionConnection,也有池的实现组件--activemq-pool,下面我们就使用activemq-pool来管理ConnectionSession对象。

 

    先注释之前在${TOMCAT_HOME}/conf/context.xml中配置的JMS管理对象,在应用的Spring配置文件中注释ConnectionFactoryDestination的配置,添加以下内容:

 

<bean id="queuePoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
</bean>
<bean id="topicPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
</bean>
	
<bean id="testQueue1" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="testQueue1" />
</bean>
<bean id="testTopic1" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="testTopic1" />
</bean>

 

更改JmsTemplateListenerContainer所引用的ConnectionFactory。到${ACTIVEMQ_HOME}/lib/optional目录下复制两个jaractivemq-pool-5.6.0.jarcommons-pool-1.5.6.jar到应用的WebContent\WEB-INF\lib路径下。重启三个Tomcat就可以了。

五、关于消息的持久化

 

ActiveMQ的消息默认是持久化的,默认采用内存数据库kahaDB。生产一个Queue消息,如果被消费者消费了,就会从数据库中删除,如果所有的消费者尚未激活,该消息则一直保存在数据库中,直到有任意一个消费者激活,ActiveMQ就会将该消息推送给该消费者。而生产一个Topic消息,所有连接到该Topic的订阅者都将收到该消息,如果当前没有连接到该Topic的订阅者,则该消息会丢失,就算后面有订阅者重新连接到该Topic也不会收到该消息,也就是说订阅者只能消费连接到Topic后的消息,而之前发送的Topic消息则不能消费。有时候为了保证订阅者因失效而重新连接该Topic后也能消费之前发送的消息,我们需要对Topic消息的订阅者做一些处理,后面会谈到。

打开${ACTIVEMQ_HOME}/conf/activemq.xml文件,找到:

<persistenceAdapter>

            <kahaDB directory="${activemq.data}/kahadb"/>

 

</persistenceAdapter>

 

说明ActiveMQ默认是采用kahaDB进行持久化的,我们也可以关闭消息的持久化,在<broker>节点处添加:

 

Persistent=”false”

我们把ActiveMQ的持久化介质改为mysql,需要在${ACTIVEMQ_HOME}/conf/activemq.xml文件的<broker>节点之外添加(可以参考${ACTIVEMQ_HOME}/conf/activemq-jdbc.xml文件):

 

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
		<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
		<property name="url" value="jdbc:mysql://localhost:3306/activemq_jdbc?relaxAutoCommit=true"/>
		<property name="username" value="root"/>
		<property name="password" value="root"/>
		<property name="maxActive" value="200"/>
		<property name="poolPreparedStatements" value="true"/>
</bean>

 这里使用的是dbcp数据源,也可以更改其它的。然后注释掉:

 

<persistenceAdapter>

            <kahaDB directory="${activemq.data}/kahadb"/>

</persistenceAdapter>

添加jdbc持久化适配器:

 

<persistenceAdapter>
     <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data/activemq_jdbc/kahadb" dataSource="#mysql-ds" />
</persistenceAdapter>

 

添加mysql驱动包和dbcp的依赖包到${ACTIVEMQ_HOME}/lib目录下,在mysql中创建一个数据库,如:activemq_jdbc,重新启动ActiveMQ服务,就会在mysql数据库中生成三张表,如下:

 

activemq_acksActiveMQ的签收信息;

activemq_lock:ActiveMQ的锁信息;

activemq_msgs:ActiveMQ的消息的信息。

 

重启消息生产者的Tomcat,关闭所有消息消费者的Tomcat,利用消息生成者发送几个消息,然后查看activemq_msgs表的数据,就会看到发送的消息,如下:

 

 

在订阅者尚未连接到发布者之前,发布者之前发布的消息不能被后来重新激活连接的订阅者消费,订阅者只能消费激活连接后发布者发布的消息。但我们可以改变这种情况,获得更高的消息可靠性。

 

   我们需要的订阅者端设置一个客户端ID(只能针对Topic消息),如(红色部分):

 

<bean id="topicPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="failover:(tcp://localhost:61616)?initialReconnectDelay=100" />
				<property name="useAsyncSend" value="true" />
				<property name="clientID" value="consumer_02" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
</bean>

 

 然后在监听容器中设置一些属性,如下(红色部分):

 

<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="topicPoolConnectionFactory" />
		<property name="destination" ref="testTopic1" />
		<property name="messageListener" ref="topicMessageConsumer" />
		<!-- 开启发布订阅模式 -->
		<property name="pubSubDomain" value="true" />
		<!-- 开启订阅持久化 -->
		<property name="subscriptionDurable" value="true"/>
		<property name="clientId" value="consumer_02" />
		<property name="durableSubscriptionName" value="consumer_02" />
		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

 

 重启订阅者的Tomcat进行对比测试,就能看见效果。

六、关于消息的安全配置

 

以上的JMS应用都是不加入安全机制的,任何连入Internet的人,只要知道消息服务的具体地址(包括IP、端口、消息地址),就可以随意地发送和接收消息,这会造成非常严重的后果。

 

        ActiveMQ支持可插拔的安全机制,只要在${ACTIVEMQ_HOME}/conf/activemq.xml配置即可。以下采用JAAS的安全机制去配置ActiveMQ的验证与授权。在activemq.xmlbroker节点中添加如下内容:

 

<!-- 添加认证与授权 -->
<plugins>
	<!-- 采用JAAS的管理机制配置角色权限 -->
	<jaasAuthenticationPlugin configuration="activemq-domain" />
	<authorizationPlugin>
		<map>
			<authorizationMap>
				<authorizationEntries>
					<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
					<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
					<authorizationEntry queue="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users" />
					<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users" />
					<authorizationEntry queue="testQueue1" read="guests" write=" users " admin="users,guests" />
					<authorizationEntry topic="testTopic1" read="guests" write="users" admin="users,guests" />
				</authorizationEntries>
			</authorizationMap>
		</map>
	</authorizationPlugin>
</plugins>

 

 说明:read,write,admin(里面的内容表示都是用户组)表示的意思:

 

“>”是通配符,代表所有;

ActiveMQ.Advisory.>ActiveMQ默认创建的Destination(包括QueueTopic);

 

配置中用到了三个用户组:admins,users,guests

    在${ACTIVEMQ_HOME}/conf目录下创建一个login.config文件,如下:

 

activemq-domain {
	org.apache.activemq.jaas.PropertiesLoginModule required
	debug=true
	org.apache.activemq.jaas.properties.user="users.properties"
	org.apache.activemq.jaas.properties.group="groups.properties";
};

 

      ${ACTIVEMQ_HOME}/conf目录下创建一个users.properties文件,用于存放用户和密码,如:

 

admin=admin
keven=keven
zyq=zyq

 

   在${ACTIVEMQ_HOME}/conf目录下创建一个groups.properties文件,用于存放用户组和对应的用户,如:

 

admins=admin
users=keven
guests=zyq

 

修改Spring配置文件,在<bean id=”connectionFactory”…>中设置访问的用户名和密码,如:

 

<property name="userName" value="zyq" />
<property name="password" value="zyq" />

 这里设置的访问用户要和activemq.xml文件中的配置对应,如果设置不正确,在启动Tomcat后发送消息时会在ActiveMQ的控制台看到提示信息。

 

 

 

 

  • 大小: 8.3 KB
  • 大小: 28.8 KB
  • 大小: 61.8 KB
  • 大小: 14.6 KB
  • 大小: 53.6 KB
  • 大小: 21.5 KB
分享到:
评论

相关推荐

    ActiveMQ实践入门指南_ActiveMQ实践入门指南_源码

    **ActiveMQ实践入门指南** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。ActiveMQ以其高性能、高可靠性和易于管理的特点,在企业级应用中备受青睐。...

    ActiveMQ实践入门指南

    这一过程虽然简单,但却是理解ActiveMQ工作原理的基础。随着对ActiveMQ的深入了解,用户可以逐步探索其高级功能,如消息过滤、事务管理、集群配置等,以满足更复杂的应用需求。 总之,ActiveMQ不仅是一款功能强大的...

    ActiveMQ与REST API实践

    **ActiveMQ与REST API实践** ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息中间件标准——Java Message Service(JMS)。ActiveMQ以其高性能、稳定性和丰富的特性在分布式系统中广泛使用,...

    ActiveMQ基础入门

    **ActiveMQ基础入门** Apache ActiveMQ是开源的、基于Java的消息中间件,它遵循JMS(Java消息服务)规范,提供了高效、可靠的异步通信能力。在本文中,我们将深入探讨ActiveMQ的基础知识,帮助你快速上手。 1. **...

    activemq_demo,activeMQ的简单demo

    【标题】"activemq_demo,activeMQ的简单demo"涉及的是Apache ActiveMQ的实践应用,这是一款开源的消息中间件,广泛应用于分布式系统中的异步通信。ActiveMQ提供了多种消息协议的支持,包括开放消息中间件协议...

    分享一些ActiveMQ的资料

    "ActiveMQ in Action"是一本权威的ActiveMQ专著,深入探讨了ActiveMQ的高级特性,如集群、持久化、网络连接等,适合已经有一定基础并希望深入了解的开发者。 通过这些资料,用户可以系统性地学习和掌握ActiveMQ,...

    ActiveMQ In Action及其源码

    1. ActiveMQ基础:介绍ActiveMQ的基本架构和核心概念。 2. 安装与配置:讲解如何在不同的操作系统上部署和配置ActiveMQ。 3. 消息模型:详细解释点对点(Queue)和发布/订阅(Topic)两种消息模型。 4. 高级特性:...

    ActiveMQ讲义.ppt

    **ActiveMQ深度解析** ...通过本讲义的学习,读者将能够全面了解ActiveMQ的基本原理和操作,为在实际项目中应用ActiveMQ打下坚实的基础。在实践中,不断探索和优化,才能充分发挥ActiveMQ在复杂分布式系统中的价值。

    Apache ActiveMQ Artemis.pdf

    整体而言,ActiveMQ Artemis用户手册是一份全面的文档,旨在指导开发者如何使用ActiveMQ Artemis进行服务器端编程,涵盖了从基础概念、架构理解、配置使用到性能调优和安全配置等众多方面。开发者可以通过这份手册...

    activemq5.5.1 Spring模板

    一、ActiveMQ基础介绍 ActiveMQ是Apache软件基金会下的一个项目,遵循JMS(Java消息服务)规范,支持多种协议如OpenWire、STOMP、AMQP等,能与各种编程语言无缝对接。ActiveMQ具备高可用性、高性能和可伸缩性,支持...

    activemq demo

    在这个“guo-mq02”压缩包中,包含了实现这些功能的源代码,通过分析和运行这些代码,你将深入理解ActiveMQ的工作原理和使用方式,为实际项目中的应用打下坚实基础。对于初学者来说,这是一个绝佳的学习资源,不仅...

    activeMQ视频

    通过这节视频教程,你将掌握ActiveMQ的基础知识,为进一步探索其高级特性,如消息过滤、消息组、消息分发策略等奠定基础。在实际项目中,ActiveMQ可以帮助你构建解耦、可靠的分布式系统,提升系统的稳定性和可扩展性...

    activemq_basic.rar

    总的来说,"activemq_basic.rar"是初学者学习ActiveMQ的宝贵资源,它通过实际操作帮助你掌握消息中间件的基础知识和实践技巧,对于理解分布式系统中的消息传递和解耦具有重要意义。通过深入学习和实践,你将能够熟练...

    【BAT必备】activeMQ面试题

    #### 一、ActiveMQ基础概念 **1.1 什么是ActiveMQ?** ActiveMQ是Apache出品的一款开源的消息中间件,它实现了JMS(Java消息服务)1.1和J2EE 1.4规范,完全支持JMS API,允许任何JMS客户端与ActiveMQ服务器进行...

    activeMQ学习资料

    1. **ActiveMQ基础概念** - **消息队列**:ActiveMQ的核心是消息队列,它允许应用程序之间通过发送和接收消息进行通信,解耦了生产者和消费者。 - **JMS接口**:ActiveMQ实现了JMS接口,提供了消息生产者、消费者...

    linux-apache-activemq-5.15.3和 linux-jdk1.8

    了解这些基础知识后,开发者可以进一步学习如何创建和管理队列、主题,设置消费者和生产者,以及利用ActiveMQ提供的高级特性如虚拟主题、网络连接和代理集群等。同时,掌握JDK 1.8的新特性和最佳实践,能帮助编写更...

    activemq入门实例,有源代码

    Apache ActiveMQ是一个开源的消息中间件,它遵循Java Message Service (JMS) 规范,用于在分布式系统中传输消息。ActiveMQ提供了高可靠性和高性能的...记得结合`activemq.doc`文档逐步实践,以便更好地掌握这些知识。

    JMS 使用 ActiveMQ 传送文件

    **标题:“JMS 使用 ActiveMQ 传送文件”** 在IT领域,Java消息服务(Java ...通过这些知识点的学习和实践,开发者可以掌握使用JMS和ActiveMQ进行文件传输的核心技能,为构建可靠的、分布式的应用打下坚实的基础。

    activeMQ初学使用demo

    ActiveMQ是中国最流行的开源消息中间件之一,它基于Java Message Service (JMS) 规范,为分布式系统提供高效、可靠的消息传递...实践是检验真理的唯一标准,动手操作这个DEMO,你将更好地理解和掌握ActiveMQ的魅力。

    ActiveMQ in Action

    1. ActiveMQ的基础概念:了解ActiveMQ的起源、设计理念及其在企业级应用中的作用。 2. JMS规范:解释了JMS的用途和它如何为不同类型的基于消息的应用程序提供了一个标准的、统一的API。JMS提供了一组公共的接口和...

Global site tag (gtag.js) - Google Analytics