`

开源消息中间件ActiveMQ回顾:Java客户端实现

 
阅读更多

前一段时间工作中经常使用到Apache ActiveMQ用作消息传输。今天在公司不是很忙,于是又深入研究了一下,总结一下分享出来。

基于ActiveMQ的Java客户端实现例子。

接口定义:

public interface MQService {
    public void start();
    public void sendQueueMessage(String text) throws JMSException;
    public void publishTopicMessage(String text) throws JMSException;
    public void destroy();
}

实现类(部分代码折行了,大家将就着看吧):

public class ActiveMQServiceImpl implements MQService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQServiceImpl.class);
    private static MQService instance = null;
    
    private InitialContext initialContext;
    
    private QueueConnectionFactory queueConnectionFactory;
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private Queue queue;
    private QueueSender queueSender;
    private QueueReceiver queueReceiver;
    
    private TopicConnectionFactory topicConnectionFactory;
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private Topic topic;
    private TopicPublisher topicPublisher;
    private TopicSubscriber topicSubscriber;
    
    protected ActiveMQServiceImpl() {
        try {
        	initialContext = new InitialContext(getInitalContextTable());
            initQueue();
            initTopic();
            LOGGER.info("AMQ init complete!");
        } catch (Exception e) {
            LOGGER.error("failed to connect mq:",e);
        }
    }
    
    private static Hashtable<String,String> getInitalContextTable(){
    	Hashtable<String,String> table=new Hashtable<String,String>();
    	table.put("java.naming.factory.initial", (String)ConfigProject.AMQ_CONFIG.getString("java.naming.factory.initial"));
    	table.put("java.naming.provider.url", (String)ConfigProject.AMQ_CONFIG.getString("java.naming.provider.url"));
    	return table;
    }
    
    public static MQService getInstance(){
    	if(instance==null){
    		instance = new ActiveMQServiceImpl();
    		return instance;
    	}
    	return instance;
    }
    
    protected void initQueue() throws Exception{
    	LOGGER.info("initQueue begin...");
    	//获取queueConnectionFactory,通过javax.jms.ConnectionFactory.
    	queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory");
    	
    	//创建queueConnection
    	if (ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal")!=null) {
            queueConnection = queueConnectionFactory.createQueueConnection(
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal"),
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.credentials")
            		);
        } else {
            queueConnection = queueConnectionFactory.createQueueConnection();
        }
        
        //设置queueConnection的clientId持久化消息
        String clientID = "queue@";
        try {
            clientID += InetAddress.getLocalHost();
        } catch (Exception e) {
            clientID += UUID.randomUUID().toString();
        }
        queueConnection.setClientID(clientID);
        
        //创建queueSession
        queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        
        //创建queue
        queue = queueSession.createQueue((String)ConfigProject.AMQ_CONFIG.getProperty("test.notification.queue"));
        
        //创建queueSender发送消息
        queueSender = queueSession.createSender(queue);
        //设置消息是否持久化:如果消费端没有接收到消息,设置持久化后即使AMQ服务器重启,消费端启动后也会收到消息.
        //如果设置非持久化,消息只保存在AMQ服务器内存中,一旦服务器重启,消费端将收不到消息.
        queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        //创建queueReceiver接收消息
        queueReceiver = queueSession.createReceiver(queue);
        queueReceiver.setMessageListener(new QueueReceiveListener());
        LOGGER.info("initQueue end...");
    }
    
    protected void initTopic() throws Exception{
    	LOGGER.info("initTopic begin...");
    	//获取topicConnectionFactory
        topicConnectionFactory = (TopicConnectionFactory) initialContext.lookup("ConnectionFactory");
        //创建topicConnection
        if (ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal") != null) {
            topicConnection = topicConnectionFactory.createTopicConnection(
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.principal"),
            		(String)ConfigProject.AMQ_CONFIG.getProperty("java.naming.security.credentials"));
        } else {
            topicConnection = topicConnectionFactory.createTopicConnection();
        }
        
        //设置clientId
        String clientID = "topic@";
        try {
            clientID += InetAddress.getLocalHost();
        } catch (Exception e) {
            clientID += UUID.randomUUID().toString();
        }
        topicConnection.setClientID(clientID);
        
        //创建topicSession
        topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建topic
        topic = topicSession.createTopic((String)ConfigProject.AMQ_CONFIG.getProperty("test.notification.topic"));
        
        //创建发布者
        topicPublisher = topicSession.createPublisher(topic);
        topicPublisher.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        //创建消费者
        topicSubscriber  = topicSession.createSubscriber(topic);
        topicSubscriber.setMessageListener(new TopicReciverListener());
        LOGGER.info("initTopic end...");
    }
    

    public void start() {
        if(queueConnection!=null)
        {
            try {
                queueConnection.start();
            } catch (JMSException e) {
            	LOGGER.error("failed to start queue connection!",e);
            }
        }
        if(topicConnection!=null)
        {
            try {
                topicConnection.start();
            } catch (JMSException e) {
            	LOGGER.error("failed to start topic connection!",e);
            }
        }
    }

    public void sendQueueMessage(String text) throws JMSException {
        queueSender.send(queueSession.createTextMessage(text));
    }
    
    public void publishTopicMessage(String text) throws JMSException {
    	topicPublisher.publish(topicSession.createTextMessage(text));
    }

    public void destroy() {
        try {
        	this.queueSender.close();
        	this.queueReceiver.close();
            this.queueSession.close();
            this.queueConnection.close();
            
            this.topicPublisher.close();
            this.topicSubscriber.close();
            this.topicSession.close();
            this.topicConnection.close();
            
            this.initialContext.close();
        } catch (NamingException e) {
            LOGGER.warn("failed to shutdown mq", e);
        } catch (Exception e) {
            LOGGER.warn("failed to shutdown mq", e);
        }
    }
}

上面程序中:

ConfigProject是个配置工具类,读取properties中的AMQ配置信息。

JNDI初始化InitialContext时一定要注意加载amq的InitialContextFactory和请求url地址。

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

java.naming.provider.url=failover://(nio://1x.xx.xx.x3:61616,nio://1x.xx.xx.x4:61616)randomize=false

日志用的是logback+slf4j这2个开源组件。

QueueReceiveListener是队列消费端的监听器,TopicReciverListener是主题订阅者的监听器。

QueueReceiveListener.java:

/**
 * 消息监听类:
 * 如果需要回复,则可以作为内部类实现.
 */
public class QueueReceiveListener implements MessageListener{
	
	private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveListener.class);
	
	public void onMessage(Message message) {
		LOGGER.info("QueueReceiveListener onMessage begin...");
		try{
			if(message instanceof TextMessage){
				TextMessage textMessage = (TextMessage) message;
				LOGGER.info("textMessage:"+textMessage.getText());
			}
		}catch(Exception e){
			e.printStackTrace();
		}
		LOGGER.info("QueueReceiveListener onMessage end...");
	}
	
}

 TopicReciverListener.java:

public class TopicReciverListener implements MessageListener{
	
	private static final Logger LOGGER = LoggerFactory.getLogger(TopicReciverListener.class);
	
	public void onMessage(Message message) {
		LOGGER.info("TopicReciverListener onMessage begin...");
		try{
			if(message instanceof TextMessage){
				TextMessage textMessage = (TextMessage) message;
				LOGGER.info("textMessage:"+textMessage.getText());
			}
		}catch(Exception e){
			e.printStackTrace();
		}
		LOGGER.info("TopicReciverListener onMessage end...");
	}

}

同样,这两个监听类也可以通过内部类的形式写在实现类内部,例如需要回复收到的消息时。此处因为没有回复消息的功能,所以就单独写了一个类。

最后,定义一个工具类方便调用:

public class MQServiceUtils {
        //此处同样可以使用IoC框架注入
	private static MQService mqService = ActiveMQServiceImpl.getInstance();
	
	public static void sendMessage(String message){
		try{
			mqService.sendQueueMessage(message);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
	public static void publishMessage(String message){
		try{
			mqService.publishTopicMessage(message);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

 

 

附一些不错的参考资料:

http://www.iteye.com/topic/836509 基于ActiveMQ 的发布/订阅实例

http://wenku.baidu.com/view/a41c0a29bd64783e09122bbb.htmls  p2p/topic开发流程.
http://wenku.baidu.com/view/6c763565f5335a8102d22059.html  AMQ PPT文档.
http://wenku.baidu.com/view/4e63b6dbad51f01dc281f10d.html  AMQ in Action中文版
http://www.huaishao8.com/config/activemq/143.html AMQ使用详解(连载)
http://wenku.baidu.com/view/a41c0a29bd64783e09122bbb.htmls  ActiveMQ教程
http://jinguo.iteye.com/blog/233188 
分享到:
评论

相关推荐

    实验三 消息中间件应用开发:ActiveMQ实现单线程多队列

    本实验主要关注的是如何使用ActiveMQ这一开源的消息中间件来实现单线程多队列的场景。ActiveMQ是由Apache软件基金会开发的,它是Java消息服务(JMS)的实现,支持多种协议,并且可以跨平台运行。 【描述】:...

    消息中间件应用开发: ActiveMQ实现单 线程多队列-Java代码类资源

    ActiveMQ是由Apache软件基金会开发的一款开源消息中间件,它支持多种消息协议,如AMQP、STOMP、MQTT等,并且提供了丰富的API和客户端库,便于集成到各种Java应用程序中。在单线程多队列的设置下,ActiveMQ可以有效地...

    Java消息中间件ActiveMQ学习资料

    Java消息中间件ActiveMQ是Apache软件基金会开发的一款开源消息队列系统,广泛应用于分布式系统中的异步处理和解耦。ActiveMQ提供了多种协议支持,包括开放标准的JMS(Java Message Service)和AMQP(Advanced ...

    ActiveMQ客户端

    Apache ActiveMQ是开源的、基于Java消息服务(JMS)的Message Broker,它允许应用程序通过消息传递进行异步通信。ActiveMQ客户端库是用于与ActiveMQ服务器交互的接口,允许开发者在他们的应用中发送和接收消息。这个...

    自己实现的 ActiveMQ 多线程客户端 包含生产消息客户端和消费者消息客户端

    ActiveMQ 是一个开源的消息中间件,它遵循开放消息模型(JMS)标准,提供高性能、高可用性和可扩展性的消息传递服务。在这个项目中,我们看到的是一个自己实现的 ActiveMQ 客户端,它特别关注多线程的实现,这在处理...

    简单的java实现消息中间件activemq的收发.zip

    简单的java实现消息中间件activemq的收发

    工作学习-消息中间件activeMQ学习总结

    ActiveMQ是Apache软件基金会提供的一个开源消息中间件,支持多种协议和编程语言。ActiveMQ提供了高可用性、容错性和持久化等特性。 四、JMS规范 JMS(Java Message Service)是一个基于Java平台的消息中间件API,...

    消息队列:ActiveMQ:ActiveMQ与Java消息服务(JMS)API教程.docx

    消息队列:ActiveMQ:ActiveMQ与Java消息服务(JMS)API教程.docx

    java中间件之activemq

    Java中间件领域的ActiveMQ是一款由Apache开发的开源消息中间件,它为企业级应用提供高效、可扩展、稳定且安全的消息通信服务。ActiveMQ的核心目标是实现标准的、面向消息的集成,支持多语言环境,确保不同平台之间的...

    本项目基于Spring这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能

    本项目正是利用Spring平台来集成开源消息队列中间件ActiveMQ,以实现消息的发送和接收功能,同时也对消息队列的两种主要工作模式——生产者-消费者模式和发布-订阅模式进行了深入探讨。 首先,ActiveMQ是Apache软件...

    消息队列:ActiveMQ:ActiveMQ消息类型:点对点与发布订阅.docx

    消息队列:ActiveMQ:ActiveMQ消息类型:点对点与发布订阅.docx

    消息中间件activemq项目demo

    1. **ActiveMQ**:ActiveMQ是Apache软件基金会的一个开源项目,是一款高效、灵活且功能丰富的消息中间件,它支持多种协议,如OpenWire、STOMP、AMQP、MQTT等,用于在分布式系统中进行可靠的消息传递。 2. **Java ...

    activeMQ 服务端客户端 java代码

    Apache ActiveMQ 是一款开源的消息中间件,它遵循开放消息中间件协议(Open Message Broker Protocol),支持多种消息协议,如 OpenWire、STOMP、AMQP、MQTT 和 WebSocket 等。在 Java 开发环境中,ActiveMQ 提供了...

    消息队列:ActiveMQ:ActiveMQ的Web控制台使用.docx

    消息队列:ActiveMQ:ActiveMQ的Web控制台使用.docx

    ActiveMQ消息中间件

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。 下载本压缩包后解压运行里面的activemq.bat即可。 http://192.168.0.61:8161/ 是管理ActiveMQ的后台管理系统入口, 如需在JAVA中使用,请下载本人的 ...

    ActiveMQ消息中间件面试专题.pdf

    作为面向消息的中间件(MOM),ActiveMQ实现了JMS(Java Message Service)1.1规范,这一规范定义了企业级消息服务的标准接口和行为。 JMS定义了一套API,应用程序可以通过这些API发送和接收消息。消息通常被分为两...

    java消息中间件教程-activemq

    - **3-1 JMS规范**:Java消息服务(Java Message Service, JMS)是Java平台中关于面向消息中间件(MOM)的标准客户端接口,它的主要目的是让Java应用程序能够与实现JMS的应用程序服务器通信。JMS定义了两种消息模型...

    activemq spring 客户端配置

    ActiveMQ是Apache软件基金会开发的一个开源消息代理,它实现了多种消息协议,如JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)等,用于在分布式系统中传递消息,实现解耦和异步处理。...

    消息队列中间件ActiveMQ入门到精通视频教程及资料

    003-顺序消费+消息过滤SELECTOR+MessageConsumer+MySql持久化;004-p2p模式+pulish-subscribe发布订阅模式+与spring集成;005-集群部署1;006-集群部署2;007-集群部署3;activemq集群配置文档.pdf;ActiveMQ(中文)...

Global site tag (gtag.js) - Google Analytics