一、JMS的理解
JMS(Java Message Service)是jcp组织02-03年定义了jsr914规范(http://jcp.org/en/jsr/detail?id=914),它定义了消息的格式和消息传递模式;
消息包括:消息头,消息扩展属性和消息体,其结构看起来与SOAP非常的相似,但一般情况下,SOAP主要关注远程服务调用,而消息则专注于信息的交换;
消息分为:消息生产者,消息服务器和消息消费者。生产者与消费者之间是透明的,生产者在产生消息之后,把消息发送到消息服务器,再由消息服务器发给消费者,因此它们构成了JMS的3点结构;
消息服务器再给消费者时,有2种模式:点到点(ptp: point to point)模式和发布/订阅(publish/subscribe)模式;
ptp:即生产者把消息投递到消息服务器后,这条消息只能由某一个消费者使用;
发布/订阅:顾名思义,就是共享消息了,只要愿意,消费者都可以监听消息;
二、消息服务器(ActiveMQ)
消息服务器在JMS的3点结构中起着重要作用,没有它,生产者的消息不知道如何投递出去,消费者不知道从哪里取得消息,它同样是隔离生产者和消费者的关键部分…………
JMS消息服务器有很多:ActiveMQ、Jboss MQ、Open MQ、RabbitMQ、ZeroMQ等等。
本文介绍的是开源的Java实现的Apache ActiveMQ(http://activemq.apache.org),它的特性在首页就能看到,我就不再介绍了;
1、下载AMQ:http://activemq.apache.org/download.html,最新版本是5.5.0;
2、解压apache-activemq-5.5.0-bin.zip文件到文件系统(比如D:\ActiveMQ-5.5.0);
3、执行bin/activemq.bat脚本即可启动AMQ:
INFO | ActiveMQ 5.5.0 JMS Message Broker (localhost) is starting
......
INFO | Listening for connections at: tcp://SHI-AP33382A:61616
当看到上面的日志输出时,表示AMQ已经启动了;
4、默认情况下,AMQ使用conf/activemq.xml作为配置文件,我们可修改它,然后以 bin/activemq.bat xbean:./conf/my.xml启动AMQ;
三、持久化消息(MySQL)
因为接下来我们修改AMQ的默认配置文件,所以先备份conf/activemq.xml文件;
1、建立MySQL数据库:要使用MySQL存储消息,必须告诉AMQ数据源:
/**
* 创建数据库
*/
CREATE DATABASE misc DEFAULT CHARSET=UTF8;
/**
* 创建用户和授权
*/
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'%' IDENTIFIED BY 'misc_root_pwd';
GRANT ALL PRIVILEGES ON misc.* TO 'misc_root'@'localhost' IDENTIFIED BY 'misc_root_pwd';
通过上面的SQL脚本,我们建立了名为misc的数据库,并且把所有权限都赋予了misc_root的用户;
由于AMQ需要在本数据库中建立数据表,因此用户的权限必须具有建表权限;
2、添加MySQL数据源:默认情况下,AMQ使用KahaDB存储(我对KahaDB不了解),注释到KahaDB的配置方式,改为MySQL的:
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>
该配置表示,我们将要使用一个叫做“MySQL-DS”的JDBC数据源;
3、配置MySQL数据源:在</broker>节点后面,增加MySQL数据源配置:
<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&characterEncoding=UTF-8"/>
<property name="username" value="misc_root"/>
<property name="password" value="misc_root_pwd"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
其实这就是一个Spring的Bean的配置,注意id与上面的保持一致;
整个AMQ的配置文件内容为:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>classpath:/META-INF/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
<!--
For better performances use VM cursor and small memory limit. For more information, see: http://activemq.apache.org/message-cursors.html Also, if your producer is "hanging", it's probably due to producer flow control. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!--
Use VM cursor for better latency For more information, see: http://activemq.apache.org/message-cursors.html <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS" />
</persistenceAdapter>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" />
</transportConnectors>
</broker>
<!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&characterEncoding=UTF-8" />
<property name="username" value="misc_root" />
<property name="password" value="misc_root_pwd" />
<property name="poolPreparedStatements" value="true" />
</bean>
<!--
Enable web consoles, REST and Ajax APIs and demos
It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
四、查看MySQL数据表
重新启动AMQ,启动完成之后,我们发现,misc数据库多了3张数据表:
mysql> SHOW tables;
+----------------+
| Tables_in_misc |
+----------------+
| activemq_acks |
| activemq_lock |
| activemq_msgs |
+----------------+
数据表activemq_msgs即为持久化消息表;
五、持久化消息
系统启动完毕之后,消息表中内容为空:
mysql> SELECT * FROM activemq_msgs;
Empty set
1、发送消息:打开http://127.0.0.1:8161/demo/页面,找到“Send a message”链接,打开页面(http://127.0.0.1:8161/demo/send.html),填写完表格后,点击“Send”按键,即AMQ投递了一个消息;
2、查看消息:发送之后,我们可以看到数据表中多了一条消息:
mysql> SELECT * FROM activemq_msgs;
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
| ID | CONTAINER | MSGID_PROD | MSGID_SEQ | EXPIRATION | MSG | PRIORITY |
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
| 1 | queue://FOO.BAR | ID:SHI-AP33382A-1486-1309840138441-2:2:1:1 | 1 | 0 | | 5 |
+----+-----------------+--------------------------------------------+-----------+------------+-----+----------+
3、取得消息:找到“Receive a message”链接,打开页面(http://127.0.0.1:8161/demo/message/FOO/BAR?readTimeout=10000&type=queue),发现该页面不是一个标准HTML页面,查看其源代码,其内容是不是就是刚才的消息内容?
4、查看消息:消息消费之后,我们可以看到数据表没有消息了:
mysql> SELECT * FROM activemq_msgs;
Empty set
5、我们可以生产多条消息,然后一条一条的消费,发现消息表中的消息一条一条的减少;
6、在发送消息页面,“Destination Type”如果选择“Topic”的话,则消息表中并没有数据,原因在于“Queue”为ptp模式消息,“Topic”为发布/订阅模式消息,当没有订阅者时,消息直接丢掉了。
JMS的内容先介绍到这里,下面我将结合Spring来启动AMQ(即AMQ与应用一同启动,上面介绍的都是单独的启动),通过测试代码来发送和消费消息,敬请期待!
------------------------
欢迎大家批评指正:
http://obullxl.iteye.com
http://www.cnblogs.com/obullxl
http://hi.baidu.com/obullxl
-----------------------
分享到:
相关推荐
标题中的“activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml”指的是Apache ActiveMQ的一个特定版本(5.15.15)配置文件,该配置文件用于实现消息队列的数据持久化,通过JDBC连接MySQL 8.0以上的版本。ActiveMQ...
标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...
总结来说,"activemq-5.15+mysqljdbc配置.zip"提供了ActiveMQ的一个定制化版本,它集成了MySQL数据库作为持久化存储,并使用Durid作为连接池。这种配置适用于那些需要强大数据持久化和恢复能力的场景,但也需要对...
在分布式系统中,消息持久化是指当消息代理(如ActiveMQ)接收到消息后,会将其存储到磁盘中,即使服务器重启或出现故障,也能保证这些消息不会丢失。这在高可用性和容错性方面扮演着关键角色。 要实现ActiveMQ的...
在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...
对于与数据库的集成,如`activemq数据库,验证持久化标准配置.txt`所示,ActiveMQ支持使用JDBC进行持久化,确保在故障恢复时数据的完整性。 总之,`activemq.xml`配置文件是管理ActiveMQ核心行为的核心,通过细致地...
2. **持久化到数据库**:为了提高可靠性,ActiveMQ 还可以配置为将消息存储在关系型数据库(如 MySQL 或 PostgreSQL)中。同样,这需要在 `activemq.xml` 配置文件中进行设置,包括连接数据库的详细信息以及具体的...
2. **持久化**:ActiveMQ提供了强大的持久化机制,即使在服务器宕机后,也能保证消息不丢失。它支持基于文件系统和关系数据库(如MySQL、PostgreSQL)的持久化策略。 3. **协议支持**:除了JMS,ActiveMQ还支持...
- **持久化**:支持文件系统和关系数据库(如MySQL、PostgreSQL)等多种持久化机制,确保消息在系统重启后仍可恢复。 - **多协议支持**:除了JMS,还支持STOMP、AMQP、MQTT和WebSockets等多种协议,方便不同平台和...
- 在这个项目中,Eclipselink用于将聊天记录等数据持久化到MySQL数据库中。 3. **JPA (Java Persistence API)**: - JPA是Java标准,定义了如何在Java应用中管理和持久化对象。 - 它提供了一套API来映射Java类到...
Apache ActiveMQ 是一款非常流行的开源消息中间件,它支持 Java 消息服务 (JMS) 标准,并提供了多种高级功能,例如持久化、集群、故障转移等。ActiveMQ 能够帮助开发者实现解耦、可靠的消息传输以及高性能的应用程序...
还有可能是数据库连接驱动,如`mysql-connector-java.jar`,因为ActiveMQ可以使用关系型数据库来持久化消息,例如MySQL。 在实际应用中,这些jar文件会被添加到项目的类路径中,使得开发人员能够构建基于ActiveMQ的...
这个JAR文件扮演着关键角色,使得ActiveMQ能够利用数据库来持久化消息,确保在系统故障后数据的恢复。 ActiveMQ Store JDBC模块的核心功能在于将消息存储在数据库中,而不是默认的文件系统。这样做的好处包括: 1....
在 ActiveMQ 的众多存储机制中,JDBC 存储是一个重要的组成部分,它允许使用关系数据库来持久化消息。`activemq-store-jdbc-1.4.jar` 是这个存储模块的特定版本,专用于与 JDBC 驱动程序交互,从而将消息数据存储在...
具体改进内容通常可以在发布的变更日志中找到,但在这里我们主要关注其核心功能:通过JDBC接口与数据库进行交互,实现消息的持久化。 三、JDBC存储的优势 1. **数据一致性**:JDBC存储利用了数据库的事务特性,...
此文件名表明该压缩包用于在ActiveMQ中集成JDBM数据库作为持久化存储。 ActiveMQ作为消息中间件,主要职责是接收、存储和转发消息,确保消息的可靠传输。它支持多种协议,如OpenWire、STOMP、AMQP、MQTT等,并且...
在IT行业中,Spring Boot、MyBatis和ActiveMQ是三个非常重要的组件,分别用于简化Spring应用的初始化,处理持久化操作以及实现消息队列。本文将深入探讨如何将这三个技术整合到一个项目中,以便构建高效、可扩展的...
MySQL作为持久化存储,保证数据安全;而ActiveMQ则作为消息总线,协调各个服务之间的通信。SpringBoot的优雅在于它可以将这些复杂的组件无缝地整合在一起,让开发者可以专注于业务逻辑,而不是底层的集成细节。 ...
实验六至实验十进一步涵盖了企业级服务和框架的使用,如EJB(Enterprise JavaBeans)提供组件化开发,JMS实现异步通信,Struts和Hibernate分别处理MVC(Model-View-Controller)架构和持久化,而Spring则整合了这些...