`
longgangbai
  • 浏览: 7339001 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ的各种表SQL的管理

 
阅读更多

      ActiveMQ为了方便的切换数据库,更为了深入了解ActiveMQSQL语句的详细的信息,可以通过Statements获取各种SQL语句。在ActivMQ第一次加载的时候,通过Statements生产响应的Spring Bean,加载到内存中。在ActiveMQ的管理和监控的时候,从内容获取相关的SQL语句,简化了ActiveMQ的消息的管理和监控。

 

package com.easyway.activemq.proxy;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.regex.Pattern;

import org.apache.activemq.store.jdbc.Statements;

/**
 * 获取ActiveMQ中SQL相关的各种语句
 * @author longgangbai
 *
 */
public class ActiveMQSQLManager {
	 /**
	  * 
	  * @param statement
	  * @return
	  */
	 public static String returnStatement(Object statement){
	    	return ((String)statement).replace("<", "&lt;").replace(">", "&gt;");
	    	
	    }
		/**
		 * 
		 * @param args
		 */
		public static void main(String[] args) throws Exception{
	    	createSQLForActiveMQ();
		}
		/**
		 * 创建ActiveMQ中语句
		 * @throws IllegalAccessException
		 * @throws InvocationTargetException
		 */
		private static void createSQLForActiveMQ() throws IllegalAccessException, InvocationTargetException {
			Statements s=new Statements();
	    	//设置表的前缀
	    	s.setTablePrefix("ACTIVEMQ.");
	    	//获取Statements中创建的语句的集合
	    	String[] stats=s.getCreateSchemaStatements();
	    	//构建Statements的Bean
	    	System.out.println("<bean id=\"statements\" class=\"org.apache.activemq.store.jdbc.Statements\">");
	    	createSQLForTableIndex(stats);
	    	
	    	createSQLForActiveMQBussine(s);
	    	//end of generating because of typo
	    	createSQLOfDropTable(s);
	    	System.out.println("</bean>");
		}
		/**
		 * 创建ActiveMQ监控和管理使用的SQL语句
		 * @param s
		 * @throws IllegalAccessException
		 * @throws InvocationTargetException
		 */
		private static void createSQLForActiveMQBussine(Statements s) throws IllegalAccessException, InvocationTargetException {
			//获取Statements的方法
	    	Method[] methods=Statements.class.getMethods();
	    	//
	    	Pattern sPattern= Pattern.compile("get.*Statement$");
	    	Pattern setPattern= Pattern.compile("set.*Statement$");
	    	//获取存储Set方法
	    	ArrayList<String> setMethods=new ArrayList<String>();
	    	for(int i=0; i<methods.length;i++){
	    		if(setPattern.matcher(methods[i].getName()).find()){
	    			setMethods.add(methods[i].getName());
	    		}
	    	}
	    	//
	    	for(int i=0; i<methods.length;i++){
	    		if(sPattern.matcher(methods[i].getName()).find()&&setMethods.contains(methods[i].getName().replace("get","set"))){
	    			System.out.println("<property name=\""+methods[i].getName().substring(3,4).toLowerCase()+methods[i].getName().substring(4)+"\" value=\""+returnStatement(methods[i].invoke(s, (Object[])null))+"\" />");
	    		}
	    	}
	    	//for a typo is not needed if removeMessageStatment typo is corrected
	    	//获取get的方法
	    	Pattern sPattern2= Pattern.compile("get.*Statment$");
	    	for(int i=0; i<methods.length;i++){
	    		if(sPattern2.matcher(methods[i].getName()).find()){
	    			System.out.println("<property name=\""+methods[i].getName().substring(3,4).toLowerCase()+methods[i].getName().substring(4)+"\" value=\""+returnStatement(methods[i].invoke(s, (Object[])null))+"\" />");
	    		}
	    	}
		}
		/**
		 * 创建表的创建语句和索引语句
		 * @param stats
		 */
		private static void createSQLForTableIndex(String[] stats) {
			//获取创建表和索引的时候的Schema语句
	    	System.out.println("<property name=\"createSchemaStatements\">");
	    	System.out.println("<list>");
	    	for(int i=0; i<stats.length;i++){
	    		System.out.println("<value>"+stats[i]+"</value>");
	    	}
	    	System.out.println("</list>");
	    	System.out.println("</property>");
		}
		/**
		 * 获取删除语句
		 * @param s
		 */
		private static void createSQLOfDropTable(Statements s) {
			//构建删除语句
	    	String[] statsDrop=s.getDropSchemaStatements();
	    	System.out.println("<property name=\"dropSchemaStatements\">");
	    	System.out.println("<list>");
	    	for(int i=0; i<statsDrop.length;i++){
	    		System.out.println("<value>"+statsDrop[i]+"</value>");
	    	}
	    	System.out.println("</list>");
	    	System.out.println("</property>");
		}
}

 

 

生成的spring bean如下:

<bean id="statements" class="org.apache.activemq.store.jdbc.Statements">
<property name="createSchemaStatements">
<list>
<value>CREATE TABLE ACTIVEMQ.ACTIVEMQ_MSGS(ID BIGINT NOT NULL, CONTAINER VARCHAR(250), MSGID_PROD VARCHAR(250), MSGID_SEQ BIGINT, EXPIRATION BIGINT, MSG BLOB, PRIMARY KEY ( ID ) )</value>
<value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_MIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (MSGID_PROD,MSGID_SEQ)</value>
<value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_CIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (CONTAINER)</value>
<value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_EIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (EXPIRATION)</value>
<value>CREATE TABLE ACTIVEMQ.ACTIVEMQ_ACKS(CONTAINER VARCHAR(250) NOT NULL, SUB_DEST VARCHAR(250), CLIENT_ID VARCHAR(250) NOT NULL, SUB_NAME VARCHAR(250) NOT NULL, SELECTOR VARCHAR(250), LAST_ACKED_ID BIGINT, PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))</value>
<value>CREATE TABLE ACTIVEMQ.ACTIVEMQ_LOCK( ID BIGINT NOT NULL, TIME BIGINT, BROKER_NAME VARCHAR(250), PRIMARY KEY (ID) )</value>
<value>INSERT INTO ACTIVEMQ.ACTIVEMQ_LOCK(ID) VALUES (1)</value>
<value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_MSGS ADD PRIORITY BIGINT</value>
<value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_PIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (PRIORITY)</value>
<value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_ACKS ADD PRIORITY BIGINT DEFAULT 5 NOT NULL</value>
<value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_ACKS DROP PRIMARY KEY</value>
<value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_ACKS ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)</value>
</list>
</property>
<property name="addMessageStatement" value="INSERT INTO ACTIVEMQ.ACTIVEMQ_MSGS(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)" />
<property name="updateMessageStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_MSGS SET MSG=? WHERE ID=?" />
<property name="findMessageSequenceIdStatement" value="SELECT ID, PRIORITY FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?" />
<property name="findMessageStatement" value="SELECT MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE MSGID_PROD=? AND MSGID_SEQ=?" />
<property name="findMessageByIdStatement" value="SELECT MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE ID=?" />
<property name="findAllMessagesStatement" value="SELECT ID, MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=? ORDER BY ID" />
<property name="findLastSequenceIdInMsgsStatement" value="SELECT MAX(ID) FROM ACTIVEMQ.ACTIVEMQ_MSGS" />
<property name="lastProducerSequenceIdStatement" value="SELECT MAX(MSGID_SEQ) FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE MSGID_PROD=?" />
<property name="findLastSequenceIdInAcksStatement" value="SELECT MAX(LAST_ACKED_ID) FROM ACTIVEMQ.ACTIVEMQ_ACKS" />
<property name="createDurableSubStatement" value="INSERT INTO ACTIVEMQ.ACTIVEMQ_ACKS(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY) VALUES (?, ?, ?, ?, ?, ?, ?)" />
<property name="findDurableSubStatement" value="SELECT SELECTOR, SUB_DEST FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />
<property name="findAllDurableSubsStatement" value="SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND PRIORITY=0" />
<property name="updateLastPriorityAckRowOfDurableSubStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_ACKS SET LAST_ACKED_ID=? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?" />
<property name="deleteSubscriptionStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />
<property name="findAllDurableSubMessagesStatement" value="SELECT M.ID, M.MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER AND M.ID &gt; D.LAST_ACKED_ID ORDER BY M.PRIORITY DESC, M.ID" />
<property name="findDurableSubMessagesStatement" value="SELECT M.ID, M.MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER AND M.ID &gt; D.LAST_ACKED_ID AND M.ID &gt; ? ORDER BY M.ID" />
<property name="nextDurableSubscriberMessageStatement" value="SELECT M.ID, M.MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER AND M.ID &gt; ? ORDER BY M.ID " />
<property name="durableSubscriberMessageCountStatement" value="SELECT COUNT(*) FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER      AND M.ID &gt;          ( SELECT LAST_ACKED_ID FROM ACTIVEMQ.ACTIVEMQ_ACKS           WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID           AND SUB_NAME=D.SUB_NAME )" />
<property name="findAllDestinationsStatement" value="SELECT DISTINCT CONTAINER FROM ACTIVEMQ.ACTIVEMQ_ACKS" />
<property name="removeAllMessagesStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=?" />
<property name="removeAllSubscriptionsStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=?" />
<property name="deleteOldMessagesStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE ( EXPIRATION&lt;&gt;0 AND EXPIRATION&lt;?) OR (ID &lt;=      ( SELECT min(ACTIVEMQ.ACTIVEMQ_ACKS.LAST_ACKED_ID)       FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE ACTIVEMQ.ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ.ACTIVEMQ_MSGS.CONTAINER )   )" />
<property name="lockCreateStatement" value="SELECT * FROM ACTIVEMQ.ACTIVEMQ_LOCK FOR UPDATE" />
<property name="lockUpdateStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_LOCK SET TIME = ? WHERE ID = 1" />
<property name="destinationMessageCountStatement" value="SELECT COUNT(*) FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=?" />
<property name="findNextMessagesStatement" value="SELECT ID, MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=? AND ID &gt; ? ORDER BY ID" />
<property name="lastAckedDurableSubscriberMessageStatement" value="SELECT MAX(LAST_ACKED_ID) FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />
<property name="selectDurablePriorityAckStatement" value="SELECT LAST_ACKED_ID FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY = ?" />
<property name="insertDurablePriorityAckStatement" value="INSERT INTO ACTIVEMQ.ACTIVEMQ_ACKS(CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY) VALUES (?, ?, ?, ?)" />
<property name="updateDurableLastAckStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_ACKS SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />
<property name="dropSchemaStatements">
<list>
<value>DROP TABLE ACTIVEMQ.ACTIVEMQ_ACKS</value>
<value>DROP TABLE ACTIVEMQ.ACTIVEMQ_MSGS</value>
<value>DROP TABLE ACTIVEMQ.ACTIVEMQ_LOCK</value>
</list>
</property>
</bean>

 

分享到:
评论
1 楼 hibluse 2013-01-06  
你好,我想陪着在mq中配置statements改变MSGS的类型为varchar。请问如何配置。还有。这样配置会不会提高数据库的持久化的性能。目前数据库持久化性能很低。不知什么原因。谢谢

相关推荐

    宜立方商城(淘淘商城) 包含 ActiveMQ dubbo SQL文件

    商城中的各种业务服务,例如商品展示、用户管理、交易处理等,都可通过dubbo框架提供的接口进行透明通信,使得系统维护和功能迭代更加高效和安全。在淘淘商城的架构设计中,dubbo不仅保障了服务之间的高效协作,还...

    activemq activeMq笔记

    通过编辑此文件可以修改 ActiveMQ 的各种配置选项,例如 TCP 端口、持久化策略等。 #### 配置示例 - **修改TCP端口**:如果需要修改 ActiveMQ 的默认监听端口(61616),可以在 `activemq.xml` 文件中找到 `...

    activeMQ+spring整合

    接着,编写Mapper接口和对应的XML映射文件,就可以在Spring管理的Bean中直接调用SQL语句了。 ```xml ``` 通过这样的整合,我们可以构建出一个高效、可扩展的企业级应用,利用ActiveMQ进行消息传递,...

    ActiveMQ5.12.1 安装与配置.docx

    - ActiveMQ提供了一个基于Web的管理控制台,其默认运行在`8161`端口上。可以通过浏览器访问`http://服务器IP地址:8161`来登录控制台。 ```bash http://192.168.119.128:8161 ``` - 登录页面会要求输入用户名和...

    Spring Boot整合ActiveMQ

    Spring Boot 整合 ActiveMQ 的过程涉及到多个技术栈的集成,包括前端模板引擎Thymeleaf、数据库连接池Druid、事务管理以及消息队列的使用。以下将详细阐述这些知识点。 1. **Spring Boot**: Spring Boot 是由 ...

    apache-activemq-5.15.0-bin

    - **消息过滤**:通过主题订阅者可以使用SQL92标准的过滤表达式来筛选接收的消息。 3. **安装Apache ActiveMQ 5.15.0 on Windows** - 下载:首先,从Apache官方网站下载`apache-activemq-5.15.0-bin.zip`压缩包,...

    activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml.pdf

    描述中提到的“activemq.xml”是ActiveMQ的配置文件,用于设置服务器的各种参数,包括数据存储方式。在这个例子中,配置文件包含了如何使用JDBC Persistence Adapter来保存和恢复消息到MySQL数据库的信息。 **JDBC...

    apache-activemq-5.13.0-bin.tar.gz

    2. **支持多种协议**:ActiveMQ不仅支持JMS,还支持OpenWire、STOMP、AMQP、MQTT、WS-MQ等多样化的协议,适应各种应用场景。 3. **持久化机制**:ActiveMQ提供强大的持久化能力,即使在服务器宕机的情况下,也能...

    activeMQ使用JDBC所需要的jar包

    5. **数据库表创建**:首次启动时,ActiveMQ会尝试自动创建必要的数据库表。如果没有自动创建,需要手动执行ActiveMQ提供的SQL脚本来创建。 请注意,JDBC持久化虽然强大,但也存在性能开销,因为它涉及频繁的数据库...

    apache-activemq-5.3.0-bin.zip

    Apache ActiveMQ是世界上最流行的开源消息代理和队列服务器,它基于Java Message Service(JMS)规范,提供可靠的消息传递服务。...同时,通过持续监控和调优,可以确保ActiveMQ在各种复杂的业务环境中稳定运行。

    activemq_master-slave集群安装文档

    3. 创建数据库表,ActiveMQ会使用这些表来存储消息和元数据。这通常通过执行提供的SQL脚本来完成,这些脚本可以在ActiveMQ的源码包中的db目录下找到,针对不同数据库有不同的脚本。 4. 配置Slave Broker(在HOST02...

    apache-activemq-5.11.4-bin.zip

    6. **跨语言支持**:ActiveMQ的多种协议支持使其可以与Java、Python、Ruby、C#等各种语言编写的应用程序无缝集成。 **ActiveMQ 5.11.4的主要改进和特性** 1. **性能提升**:此版本可能包含对内部算法和数据结构的...

    ActiveMQ订阅模式持久化实现

    6. **事务管理**:为了保证消息的可靠传递,ActiveMQ支持事务。在发送或接收消息时,可以包裹在JMS事务中,确保消息要么全部成功,要么全部回滚。 7. **高可用性与故障转移**:使用ActiveMQ的网络连接器(Network ...

    java+ftpServer+derby+ActiveMQ

    Derby基于Java语言,因此它是跨平台的,可以在各种操作系统上运行。它适用于小型应用程序,如嵌入式系统或开发环境,因为它不需要复杂的安装过程,占用资源少,易于管理和使用。Derby支持SQL标准,提供了事务处理、...

    ActiveMQ与Tomcat整合教程.docx

    HibernateHibernate 是一个流行的 Java 对象关系映射(ORM)框架,它简化了数据库操作,使得开发者可以通过 Java 对象来处理数据库事务,而无需直接编写 SQL 语句。Hibernate 提供了数据持久化的功能,能够自动...

    SpringMVC+Hibernate+ActiveMQ+spider

    通过Hibernate,开发者可以用Java对象来操作数据库,而无需编写大量的SQL语句。这不仅降低了开发难度,也使得代码更加灵活,能够适应不同的数据库系统。 ActiveMQ则是Apache出品的一个开源消息中间件,它实现了JMS...

    ActiveMQ+zookeeper实现高可用和负载均衡(代码和测试)

    - **JDBC**:通过数据库(如MySQL、SQL Server、Oracle等)来存储消息。适用于需要跨服务器集群共享数据的场景。 - **LevelDB**:基于索引的数据存储方式,相较于KahaDB性能更优。本案例将采用LevelDB作为数据存储...

    Spring MVC + JPA + MQ + redis +activemq 集成项目实例

    在高并发场景下,ActiveMQ能够有效地管理和调度消息,确保消息的顺序和一致性。 集成这些技术的关键在于配置和协调。例如,我们需要配置Spring MVC的DispatcherServlet、JPA的数据源和实体管理,设置消息队列的连接...

    springmvc+dubbo/zookeeper+activemq+redis+mybatis+druid

    构建了一个复杂的企业级应用架构,包括前端展示(SpringMVC)、后端服务(Dubbo)、服务注册与发现(Zookeeper)、消息中间件(ActiveMQ)、高速缓存(Redis)、数据库访问(MyBatis)以及数据库连接管理(Druid)。...

Global site tag (gtag.js) - Google Analytics