`
longgangbai
  • 浏览: 7349303 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ模板和Velocity整合使用

 
阅读更多
ActiveMQ模板使用
    在ActiveMQ中AMQ将消息存在文件中的方法,将每一个消息创建一个对象,使用Velocity模板将信息替换之后追加到文件中。
  在AMQJournalTool中关于各种信息的格式如下:
private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
	private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
	private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
	private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
	private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
	private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
代码如下:
		VelocityContext context = new VelocityContext();
		List keys = Arrays.asList(context.getKeys());

		for (Iterator iterator = System.getProperties().entrySet()
				.iterator(); iterator.hasNext();) {
			Map.Entry kv = (Map.Entry) iterator.next();
			String name = (String) kv.getKey();
			String value = (String) kv.getValue();

			if (!keys.contains(name)) {
				context.put(name, value);
			}
		}
		
		VelocityEngine velocity = new VelocityEngine();
		velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
		velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
		velocity.init();


		resources.put("message", messageFormat);
		resources.put("topicAck", topicAckFormat);
		resources.put("queueAck", queueAckFormat);
		resources.put("transaction", transactionFormat);
		resources.put("trace", traceFormat);
		resources.put("unknown", unknownFormat);
将信息写入文件:
private void display(Entry entry) throws Exception {

		if (entry.getQuery() != null) {
			List list = Collections.singletonList(entry);
			List results = entry.getQuery().execute(list).getResults();
			if (results.isEmpty()) {
				return;
			}
		}

		CustomResourceLoader.setResources(resources);
		try {

			context.put("location", entry.getLocation());
			context.put("record", entry.getRecord());
			context.put("type", entry.getType());
			if (entry.getRecord() instanceof ActiveMQMessage) {
				context.put("body", new MessageBodyFormatter(
						(ActiveMQMessage) entry.getRecord()));
			}

			Template template = velocity.getTemplate(entry.getFormater());
			PrintWriter writer = new PrintWriter(System.out);
			template.merge(context, writer);
			writer.println();
			writer.flush();
		} finally {
			CustomResourceLoader.setResources(null);
		}
	}
附代码如下:
package org.apache.activemq.console.command.store.amq;

import java.io.File;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.Velocity;
import org.apache.velocity.app.VelocityEngine;
import org.josql.Query;


public class AMQJournalTool {

	private final ArrayList<File> dirs = new ArrayList<File>();
	private final WireFormat wireFormat = new OpenWireFormat();
	private final HashMap<String, String> resources = new HashMap<String, String>();

	private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
	private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
	private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
	private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
	private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
	private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
	private String where;
	private VelocityContext context;
	private VelocityEngine velocity;
	private boolean help;

	public static void main(String[] args) throws Exception {
		AMQJournalTool consumerTool = new AMQJournalTool();
		String[] directories = CommandLineSupport
				.setOptions(consumerTool, args);
		if (directories.length < 1) {
			System.out
					.println("Please specify the directories with journal data to scan");
			return;
		}
		for (int i = 0; i < directories.length; i++) {
			consumerTool.getDirs().add(new File(directories[i]));
		}
		consumerTool.execute();
	}

	public void execute() throws Exception {

		if( help ) {
			showHelp();
			return;
		}
		
		if (getDirs().size() < 1) {
			System.out.println("");
			System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
			System.out.println("");
			showHelp();
			return;
		}

		for (File dir : getDirs()) {
			if( !dir.exists() ) {
				System.out.println("");
				System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
				System.out.println("");
				showHelp();
				return;
			}
			if( !dir.isDirectory() ) {
				System.out.println("");
				System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
				System.out.println("");
				showHelp();
				return;
			}
		}
		
		
		context = new VelocityContext();
		List keys = Arrays.asList(context.getKeys());

		for (Iterator iterator = System.getProperties().entrySet()
				.iterator(); iterator.hasNext();) {
			Map.Entry kv = (Map.Entry) iterator.next();
			String name = (String) kv.getKey();
			String value = (String) kv.getValue();

			if (!keys.contains(name)) {
				context.put(name, value);
			}
		}
		
		velocity = new VelocityEngine();
		velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
		velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
		velocity.init();


		resources.put("message", messageFormat);
		resources.put("topicAck", topicAckFormat);
		resources.put("queueAck", queueAckFormat);
		resources.put("transaction", transactionFormat);
		resources.put("trace", traceFormat);
		resources.put("unknown", unknownFormat);

		Query query = null;
		if (where != null) {
			query = new Query();
			query.parse("select * from "+Entry.class.getName()+" where "+where);

		}

		ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
		manager.start();
		try {
			Location curr = manager.getFirstLocation();
			while (curr != null) {

				ByteSequence data = manager.read(curr);
				DataStructure c = (DataStructure) wireFormat.unmarshal(data);

				Entry entry = new Entry();
				entry.setLocation(curr);
				entry.setRecord(c);
				entry.setData(data);
				entry.setQuery(query);
				process(entry);

				curr = manager.getNextLocation(curr);
			}
		} finally {
			manager.close();
		}
	}

	private void showHelp() {
		InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
		Scanner scanner = new Scanner(is);
		while (scanner.hasNextLine()) {
			String line = scanner.nextLine();
			System.out.println(line);
		}
		scanner.close();	}

	private void process(Entry entry) throws Exception {

		Location location = entry.getLocation();
		DataStructure record = entry.getRecord();

		switch (record.getDataStructureType()) {
		case ActiveMQMessage.DATA_STRUCTURE_TYPE:
			entry.setType("ActiveMQMessage");
			entry.setFormater("message");
			display(entry);
			break;
		case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
			entry.setType("ActiveMQBytesMessage");
			entry.setFormater("message");
			display(entry);
			break;
		case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
			entry.setType("ActiveMQBlobMessage");
			entry.setFormater("message");
			display(entry);
			break;
		case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
			entry.setType("ActiveMQMapMessage");
			entry.setFormater("message");
			display(entry);
			break;
		case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
			entry.setType("ActiveMQObjectMessage");
			entry.setFormater("message");
			display(entry);
			break;
		case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
			entry.setType("ActiveMQStreamMessage");
			entry.setFormater("message");
			display(entry);
			break;
		case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
			entry.setType("ActiveMQTextMessage");
			entry.setFormater("message");
			display(entry);
			break;
		case JournalQueueAck.DATA_STRUCTURE_TYPE:
			entry.setType("Queue Ack");
			entry.setFormater("queueAck");
			display(entry);
			break;
		case JournalTopicAck.DATA_STRUCTURE_TYPE:
			entry.setType("Topic Ack");
			entry.setFormater("topicAck");
			display(entry);
			break;
		case JournalTransaction.DATA_STRUCTURE_TYPE:
			entry.setType(getType((JournalTransaction) record));
			entry.setFormater("transaction");
			display(entry);
			break;
		case JournalTrace.DATA_STRUCTURE_TYPE:
			entry.setType("Trace");
			entry.setFormater("trace");
			display(entry);
			break;
		default:
			entry.setType("Unknown");
			entry.setFormater("unknown");
			display(entry);
			break;
		}
	}

	private String getType(JournalTransaction record) {
		switch (record.getType()) {
		case JournalTransaction.XA_PREPARE:
			return "XA Prepare";
		case JournalTransaction.XA_COMMIT:
			return "XA Commit";
		case JournalTransaction.XA_ROLLBACK:
			return "XA Rollback";
		case JournalTransaction.LOCAL_COMMIT:
			return "Commit";
		case JournalTransaction.LOCAL_ROLLBACK:
			return "Rollback";
		}
		return "Unknown Transaction";
	}

	private void display(Entry entry) throws Exception {

		if (entry.getQuery() != null) {
			List list = Collections.singletonList(entry);
			List results = entry.getQuery().execute(list).getResults();
			if (results.isEmpty()) {
				return;
			}
		}

		CustomResourceLoader.setResources(resources);
		try {

			context.put("location", entry.getLocation());
			context.put("record", entry.getRecord());
			context.put("type", entry.getType());
			if (entry.getRecord() instanceof ActiveMQMessage) {
				context.put("body", new MessageBodyFormatter(
						(ActiveMQMessage) entry.getRecord()));
			}

			Template template = velocity.getTemplate(entry.getFormater());
			PrintWriter writer = new PrintWriter(System.out);
			template.merge(context, writer);
			writer.println();
			writer.flush();
		} finally {
			CustomResourceLoader.setResources(null);
		}
	}

	public void setMessageFormat(String messageFormat) {
		this.messageFormat = messageFormat;
	}

	public void setTopicAckFormat(String ackFormat) {
		this.topicAckFormat = ackFormat;
	}

	public void setTransactionFormat(String transactionFormat) {
		this.transactionFormat = transactionFormat;
	}

	public void setTraceFormat(String traceFormat) {
		this.traceFormat = traceFormat;
	}

	public void setUnknownFormat(String unknownFormat) {
		this.unknownFormat = unknownFormat;
	}

	public void setQueueAckFormat(String queueAckFormat) {
		this.queueAckFormat = queueAckFormat;
	}

	public String getQuery() {
		return where;
	}

	public void setWhere(String query) {
		this.where = query;
	}

	public boolean isHelp() {
		return help;
	}

	public void setHelp(boolean help) {
		this.help = help;
	}

	/**
	 * @return the dirs
	 */
	public ArrayList<File> getDirs() {
		return dirs;
	}

}

 

分享到:
评论

相关推荐

    JMS教程+activemq以及activemq和tomcat的整合

    通过整合ActiveMQ和Tomcat,可以在Web应用中充分利用消息队列的优势,实现异步处理、提高系统的可扩展性和可靠性。同时,正确配置持久化消息和Tomcat服务器对于保证服务的稳定性和数据的完整性至关重要。在实际项目...

    zabbix-activemq监控模板

    zabbix-activemq监控模板zabbix-activemq监控模板zabbix-activemq监控模板

    ActiveMQ与spring整合封装

    以上就是ActiveMQ与Spring整合封装的基本过程,通过这样的方式,我们不仅实现了全注解开发,简化了操作,还提高了系统的性能和可扩展性。在实际应用中,还可以根据需求进行更深入的定制,比如添加事务支持、错误处理...

    Spring和ActiveMQ整合的完整实例

    将Spring与ActiveMQ整合,可以轻松地在Spring应用中实现消息队列的功能,提高系统的可扩展性和可靠性。 首先,让我们了解Spring框架如何支持消息传递。Spring提供了JmsTemplate类,这是一个模板类,用于简化发送和...

    activemq与spring的整合案例

    接下来,我们需要配置Spring的ApplicationContext.xml文件,声明一个ActiveMQ的ConnectionFactory和一个JMS模板(JMSTemplate)。ConnectionFactory是创建连接到消息代理的工厂,而JMSTemplate则是Spring提供的发送...

    activemq与tomcat整合

    整合ActiveMQ和Tomcat的好处在于,它允许Web应用程序利用ActiveMQ的强大功能,如发布/订阅模式和点对点模式的消息传递,以及高可用性和故障恢复能力。同时,由于ActiveMQ是独立于Tomcat运行的,因此可以与其他应用...

    ActiveMQ与Spring线程池整合实例

    ActiveMQ与Spring线程池整合的一个实例。 lib库没有上传。 对于实例的讲解,在竹子的论坛有我对这个实例的帖子(http://www.java2000.net/viewthread.jsp?tid=1167) lib中包含: apache-activemq-4.1.1.jar ...

    JMS教程+activemq以及activemq和tomcat的整合+整合实例代码+持久化消息配置以及工程+tomcat服务器的配置

    JMS简明教程+JMS规范教程+activemq以及activemq和tomcat的整合+整合实例代码+持久化消息配置以及工程+tomcat服务器的配置+整合需要的lib文件+部署多个tomcat服务器方案等

    activeMQ+spring整合

    在IT行业中,ActiveMQ和Spring的整合是企业级应用中常见的消息中间件解决方案。这个项目是基于Maven构建的,涵盖了ActiveMQ与Spring框架的集成,同时也涉及到了MyBatis的使用,使得数据访问更加便捷。下面将详细介绍...

    activemq与spring整合源代码

    ActiveMQ和Spring的整合是企业级应用中常见的一种技术组合,尤其在分布式系统和微服务架构中,消息队列(Message Broker)如ActiveMQ扮演着至关重要的角色。它能有效地实现系统间的异步通信,提高系统的可扩展性和...

    Springboot和ActiveMQ的整合实例

    在IT行业中,Spring Boot和ActiveMQ的整合是一个常见的任务,特别是在构建分布式系统和微服务架构时。Spring Boot以其简化配置和快速开发的特性受到广泛欢迎,而ActiveMQ作为一款开源的消息中间件,提供了消息队列...

    Spring和ActiveMQ的整合实例源码

    10. **Tomcat服务器**:Tomcat是一个流行的Java Web服务器,它可以部署和运行使用Spring和ActiveMQ的Web应用程序。 通过上述知识点,我们可以理解如何在Spring环境中利用ActiveMQ进行消息传递,实现高并发、解耦的...

    java springboot整合activemq工程

    java springboot整合activemq工程 #activemq配置 #默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring....

    ActiveMQ与Tomcat整合教程

    首先,为了使Tomcat能够识别和使用ActiveMQ,你需要将ActiveMQ的必要库文件引入到Tomcat的环境中。具体操作是将ActiveMQ lib目录下的5个关键jar包(activemq-core-5.1.0.jar, activemq-web-5.1.0.jar, geronimo-j2ee...

    Spring Boot整合ActiveMQ

    Spring Boot 整合 ActiveMQ 的过程涉及到多个技术栈的集成,包括前端模板引擎Thymeleaf、数据库连接池Druid、事务管理以及消息队列的使用。以下将详细阐述这些知识点。 1. **Spring Boot**: Spring Boot 是由 ...

    springboot整合activemq案例

    总结起来,这个Spring Boot整合ActiveMQ的案例涵盖了如何配置和使用Queue与Topic,以及通过定时任务和Controller请求来发送消息。理解并掌握这些知识点,有助于我们在实际项目中构建高效、可靠的分布式消息传递系统...

    activemq5.5.1 Spring模板

    《ActiveMQ 5.5.1与Spring模板的深度整合》 在当今的企业级应用开发中,消息中间件起着至关重要的作用,它能够有效地解耦应用程序,提高系统的可扩展性和可靠性。Apache ActiveMQ作为开源社区中最受欢迎的消息...

    Maven+Spring+ActiveMQ整合

    maven spring mq整合,注意最新版的mq的jar包是集成了spring的,我用的5.11.1的。 运行之前,先要下载mq服务本地运行http://apache.fayea.com//activemq/5.14.3/apache-activemq-5.14.3-bin.zip

    ActiveMQ安装和使用

    ### ActiveMQ 安装与使用详解 #### 一、ActiveMQ简介 ActiveMQ 是Apache出品的一款优秀的开源消息中间件,支持多种消息传输协议,并且具备...通过以上步骤,可以有效地部署和使用ActiveMQ来实现消息中间件的功能。

Global site tag (gtag.js) - Google Analytics