`

activeMQ嵌入Java代码的使用

 
阅读更多

初学amq,根据网上学习到的一些,整理了一下amq嵌入Java代码的使用,如有错误请指出。

 

首先需要导入两个jar包:

activemq-all-5.5.1.jar
slf4j-nop-1.4.3.jar

 

import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;

public class AmqManager {
	
	public static BrokerService broker = new BrokerService();
	private final static AmqManager amq = new AmqManager();
	/** 连接器 */
	private static Connection connection = null;
	/** 按队列名获取session */
	private static Map<String, Session> sessionMap = new ConcurrentHashMap<String, Session>();
	/** 按队列名称获取生产者对象 */
	private static Map<String, MessageProducer> producerMap = new ConcurrentHashMap<String, MessageProducer>();
	/** 按队列名称获取消费者对象 */
	private static Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();
	
	private AmqManager(){};
	
	public static synchronized AmqManager getAMQ() throws Exception {
		return amq;
	}
	/**
	 * 获取连接器
	 * @param brokerUri
	 * @param clientID
	 * @return
	 * @throws Exception 
	 */
	public synchronized Connection initConnection(String brokerUri, String clientID) throws Exception {
		if (null == connection) {
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri);
			connection = connectionFactory.createConnection();
			connection.setClientID(clientID);
			connection.start();
		}
		return connection;
	}
	
	/**
	 * 初始化生产者
	 * @param queueName
	 * @param acknowledgeMode
	 * @param deliveryMode
	 * @return
	 * @throws JMSException
	 */
	public MessageProducer initProducer(String queueName, int acknowledgeMode, int deliveryMode) throws JMSException {
		MessageProducer producer = null;
		if (connection != null) {
			Session session = connection.createSession(false, acknowledgeMode);
			Destination destination = session.createQueue(queueName);
			producer = session.createProducer(destination);
	        producer.setDeliveryMode(deliveryMode);
	        sessionMap.put(queueName, session);
	        producerMap.put(queueName, producer);
		}
		return producer;
	}
	
	/**
	 * 初始化消费者
	 * @param queueName
	 * @param acknowledgeMode
	 * @param deliveryMode
	 * @return
	 * @throws JMSException
	 */
	public MessageConsumer initConsumer(String queueName, int acknowledgeMode) throws JMSException {
		MessageConsumer consumer = null;
		if (connection != null) {
			Session session = connection.createSession(false, acknowledgeMode);
			Destination destination = session.createQueue(queueName);
			consumer = session.createConsumer(destination);
			consumerMap.put(queueName, consumer);
		}
		return consumer;
	}

	/**
	 * 接收消息
	 * @param consumer
	 * @throws JMSException
	 */
	public void getMessage(String queueName) throws JMSException
    {
	    MessageConsumer consumer = consumerMap.get(queueName);
	    while (true) {
	        TextMessage textMessage = (TextMessage) consumer.receive(100000);
	        if(textMessage != null){
	            System.out.println("收到消息:" + textMessage.getText());
	        }else {
	            System.out.println("接收消息异常");
	            break;
	        }
	    }
    }

	/**
	 * 发送消息
	 * @param queueName
	 * @param message
	 * @throws JMSException
	 */
	public void sendMessage(String queueName, String message) throws JMSException {
	    TextMessage msg = sessionMap.get(queueName).createTextMessage(message);
	    //msg.setStringProperty("sqlId", "comstar-market-data-feedhub");
	    producerMap.get(queueName).send(msg);
	}
	
	/**
	 * 发送消息
	 * @param queueName
	 * @param message
	 * @param headName
	 * @param headValue
	 * @throws JMSException
	 */
	public void sendMessage(String queueName, String message, String headName,
	                String headValue) throws JMSException {
		TextMessage msg = sessionMap.get(queueName).createTextMessage(message);
		msg.setStringProperty(headName, headValue);
		producerMap.get(queueName).send(msg);
	}

	/**
	 * 关闭session
	 * @param queueName
	 * @throws JMSException
	 */
	public void close(String queueName) throws JMSException {
		Session session = sessionMap.get(queueName);
		if (null != session) {
			session.close();
			sessionMap.remove(queueName);
			destroy();
		}
	}
	
	/**
	 * 销毁连接
	 * note:如果session都没有了,则销毁连接
	 * @throws JMSException
	 */
	private synchronized void destroy() throws JMSException {
		if (connection != null) {
			if (0 == sessionMap.size()) {
				connection.close();
				connection = null;
			}
		}
	}
	
	public synchronized static void initAMQ() {
		try {
			TransportConnector connector = new TransportConnector();
			connector.setUri(new URI("tcp://localhost:61616"));
			broker.addConnector(connector);
			broker.start();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void destoryAMQ() {
		try {
			//close("");
			broker.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		try {
			getAMQ();
			//BrokerService broker = new BrokerService();
			TransportConnector connector = new TransportConnector();
			connector.setUri(new URI("tcp://localhost:61616"));
			broker.addConnector(connector);
			broker.start();
			amq.initConnection("failover://tcp://localhost:61616", "cning");
			amq.initProducer("queue.test", Session.AUTO_ACKNOWLEDGE, DeliveryMode.NON_PERSISTENT);
			amq.initConsumer("queue.test", Session.AUTO_ACKNOWLEDGE);
			amq.sendMessage("queue.test", "hahahahahahahhahaha");
			amq.getMessage("queue.test");
			amq.destoryAMQ();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

分享到:
评论

相关推荐

    spring+activeMQ 嵌入式配置 完整demo(包含jar包)

    要将ActiveMQ嵌入到Spring应用中,首先需要在项目中引入ActiveMQ的相关依赖。在Maven项目中,可以在pom.xml文件中添加以下依赖: ```xml &lt;groupId&gt;org.apache.activemq &lt;artifactId&gt;activemq-spring &lt;version&gt;5...

    ActiveMQ In Action精简版

    使用ActiveMQ创建Java应用时,可以利用其提供的JMS API,通过简单的代码即可实现消息的发送和接收。此外,ActiveMQ还可以嵌入到Java容器中,如Web应用服务器,为容器内的应用提供消息传递服务。除了Java,ActiveMQ还...

    JMS ActiveMQ演示代码

    本压缩包中的"JMS ActiveMQ演示代码"提供了一个具体的实例,展示了如何在实际项目中使用ActiveMQ来实现消息传递。这个代码可以直接放入工程中运行,帮助开发者理解和实践JMS与ActiveMQ的结合使用。 **点对点(Point...

    ActiveMQ5.0 监视的JSP支持中文

    2. **JSP(Java Server Pages)**:JSP是Java的一种Web开发技术,允许在HTML页面中嵌入Java代码,用于动态生成网页内容。在ActiveMQ中,JSP常用于创建管理控制台,提供监控和管理消息队列的功能。 3. **国际化(i18...

    ActiveMQ消息总线介绍

    - **Java配置**:除了传统的XML配置外,还可以通过Java代码的方式进行配置。这种方式更加轻量级,易于集成到Java应用程序中。 #### 五、ActiveMQ与Spring框架的集成 Spring框架提供了对JMS的支持,使得开发者可以...

    ActiveMQ手册 - 开发文档

    - **创建生产者和消费者**:学习如何在Java代码中创建消息生产者和消费者,以及如何设置消息属性。 - **事务处理**:了解如何使用JMS事务保证消息的一致性。 - **消息选择器**:学习如何使用消息选择器筛选接收特定...

    activeMQ整合Spring

    ActiveMQ 可以作为独立服务器运行,也可以嵌入到应用中,支持多种语言的客户端,包括 Java、C++、Python 等。 二、Spring 框架与消息集成 Spring 提供了一个名为 `Spring JMS` 的模块,它简化了与 JMS 提供者(如 ...

    ActiveMQ简介.docx

    ActiveMQ 是一款由Apache软件基金会开发的开源消息中间件,它是基于Java的,可以在任何支持Java虚拟机的操作系统上运行。消息中间件的作用在于提供一个可靠的消息传递平台,使得应用程序之间可以通过消息进行通信,...

    ActiveMQ 消息队列

    3. **与Spring框架的无缝集成**:ActiveMQ内置了对Spring的支持,使得开发人员能够轻松地将ActiveMQ嵌入到基于Spring的应用程序中,利用Spring的强大功能,如依赖注入和面向切面编程。 4. **广泛的J2EE服务器兼容...

    activemq-all-5.15.2.jar 和 jms-1.1.jar

    这两个文件是Apache ActiveMQ项目的一部分,ActiveMQ是业界广泛使用的开放源代码消息代理,它实现了Java消息服务(JMS)规范。 1. **Apache ActiveMQ**: ActiveMQ是Apache软件基金会的一个项目,旨在提供一个高性能...

    apache-activemq-5.14.1资源下载

    10. **嵌入式使用**: ActiveMQ 可以作为应用程序的一部分被嵌入,这样可以减少网络延迟,提高性能,同时简化部署。 在实际应用中,Apache ActiveMQ 5.14.1 可以用于构建分布式系统、微服务架构、事件驱动系统,或者...

    ActiveMQ In Action

    **使用ActiveMQ**主要涉及如何创建Java应用、嵌入到Java容器中以及使用其他语言与ActiveMQ交互等内容。 - **创建Java应用**: - 使用JMS API编写代码来发送和接收消息。 - 示例展示了如何使用ActiveMQ提供的...

    ActiveMQ学习

    3. **与 Spring 集成**:ActiveMQ 可轻松嵌入到使用 Spring 框架的系统中,并且支持 Spring 2.0 的特性,简化了在 Spring 应用中的部署和配置。 4. **高效的消息持久化**:ActiveMQ 支持通过 JDBC 和 journal 实现...

    ActiveMQ——Broker

    ActiveMQ 是一个开源的消息...它可以独立运行,也可以嵌入到 Java 应用中,提供灵活的配置和强大的消息处理能力。理解和掌握 Broker 的工作原理及其配置方式,对于利用 ActiveMQ 构建高效、可靠的分布式系统至关重要。

    JavaEE开发技术与案例教程第2版-课件和代码.rar

    它将Java代码嵌入到HTML中,简化了页面的呈现逻辑。学习JSP时,你将了解EL(Expression Language)和JSTL(JavaServer Pages Standard Tag Library),这两个工具可以增强JSP的可读性和可维护性。 3. **EJB**:EJB...

    Tomcat与Java_Web开发技术详解3.pdf

    2. **JavaServer Pages (JSP)**:JSP是基于Java Servlet的另一种技术,它允许开发者将Java代码嵌入到HTML页面中,实现动态内容的生成。JSP页面在服务器端被编译成Servlet,然后执行。 3. **JavaBean**:JavaBean是一...

    activemq-store-bdbn-2.1.jar.zip

    这个版本的存储模块特别引入了Berkeley DB Java Edition(BDBNJ),它是一个高性能、可嵌入的数据存储解决方案,适用于需要持久化消息存储的应用场景。 Apache ActiveMQ是Apache软件基金会开发的一个开源消息代理,...

    经典JAVA实现BBS论坛系统

    JSP(JavaServer Pages)则是一种动态网页技术,将Java代码嵌入HTML中,使得视图层可以动态生成。 5. **用户认证和授权**:BBS系统必须有用户登录和注册功能,这就需要实现用户认证。同时,还需要控制用户权限,如...

    Java EE开发成才路线图

    Servlet是服务器端程序,JSP则允许在HTML中嵌入Java代码,简化视图层开发。 2. **Web开发** - **MVC模式**:了解Model-View-Controller架构,它是Java EE应用中常见的设计模式,用于分离业务逻辑、数据模型和用户...

Global site tag (gtag.js) - Google Analytics