`

ActiveMQ 即时通讯服务 浅析

 
阅读更多
一、 概述与介绍

ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。


二、 特性

1、 多种语言和协议编写客户端。语言: Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP

2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4、通过了常见J2EE服务器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

5、支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA

6、支持通过JDBC和journal提供高速的消息持久化

7、从设计上保证了高性能的集群,客户端-服务器,点对点

8、支持Ajax

9、支持与Axis的整合

10、可以很容易得调用内嵌JMS provider,进行测试

三、 安装

开发环境:

System:Windows

JDK:1.6+

IDE:eclipse

apache ActiveMQ 5.8

Email:hoojo_@126.com

Blog:http://blog.csdn.net/IBM_hoojo

http://hoojo.cnblogs.com/

1、 下载ActiveMQ,下载地址:http://www.apache.org/dyn/closer.cgi?path=/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.zip

2、 解压apache-activemq-5.8.0.zip即可完成ActiveMQ的安装

3、 解压后目录结构如下

image

+bin (windows下面的bat和unix/linux下面的sh) 启动ActiveMQ的启动服务就在这里

+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

+data (默认是空的)

+docs (index,replease版本里面没有文档)

+example (几个例子)

+lib (activeMQ使用到的lib)

+webapps (系统管理员控制台代码)

+webapps-demo(系统示例代码)

-activemq-all-5.8.0.jar (ActiveMQ的binary)

-user-guide.html (部署指引)

-LICENSE.txt

-NOTICE.txt

-README.txt

其他文件就不相信介绍了,搞Java的应该都知道干什么用的。

你可以进入bin目录,使用activemq.bat双击启动(windows用户可以选择系统位数,如果你是linux的话,就用命令行的发送去启动),如果一切顺利,你就会看见类似下面的信息:

image

如果你看到这个,那么恭喜你成功了。如果你启动看到了异常信息:

Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind

那么我告诉你,很不幸,你的端口被占用了。接下来你大概想知道是哪个程序占用了你的端口,并kill掉该进程或服务。或者你要尝试修改ActiveMQ的默认端口61616(ActiveMQ使用的默认端口是61616),在大多数情况下,占用61616端口的是Internet Connection Sharing (ICS) 这个Windows服务,你只需停止它就可以启动ActiveMQ了。

4、 启动成功就可以访问管理员界面:http://localhost:8161/admin,默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改即可。

image

其中在导航菜单中,Queues是队列方式消息。Topics是主题方式消息。Subscribers消息订阅监控查询。Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。Network是网络链接数监控。Send可以发送消息数据。

5、 运行demo示例,在dos控制台输入activemq.bat xbean:../conf/activemq-demo.xml 即可启动demo示例。官方提供的user-guide.html中的access the web console中是提示输入:activemq.bat console xbean:conf/activemq-demo.xml,我用这种方式不成功。

当然你还可以用绝对的文件目录方式:activemq.bat xbean:file:D:/mq/conf/activemq-demo.xml

image

如果提示conf/activemq-demo.xml没有找到,你可以尝试改变下路径,也就是去掉上面的“..”。通过http://localhost:8161/demo/ 就可以访问示例了。

image


四、 消息示例

1、ActiviteMQ消息有3中形式

JMS 公共

点对点域

发布/订阅域

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

Queue

Topic

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber

(1)、点对点方式(point-to-point)

点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

(2)、发布/订阅 方式(publish/subscriber Messaging)

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

2、ActiviteMQ接收和发送消息基本流程

image

发送消息的基本步骤:

(1)、创建连接使用的工厂类JMS ConnectionFactory

(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

(3)、使用连接Connection 建立会话Session

(4)、使用会话Session和管理对象Destination创建消息生产者MessageSender

(5)、使用消息生产者MessageSender发送消息



消息接收者从JMS接受消息的步骤

(1)、创建连接使用的工厂类JMS ConnectionFactory

(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

(3)、使用连接Connection 建立会话Session

(4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver

(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

五、 代码示例

在代码开始,我们先建一个project,在这个project中添加如下jar包

image

添加完jar包后就可以开始实际的代码工作了。

1、 使用JMS方式发送接收消息

消息发送者

package com.hoo.mq.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* <b>function:</b> 消息发送者
* @author hoojo
* @createDate 2013-6-19 上午11:26:43
* @file MessageSender.java
* @package com.hoo.mq.jms
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class MessageSender {

    // 发送次数
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "hoo.mq.queue";
   
    /**
     * <b>function:</b> 发送消息
     * @author hoojo
     * @createDate 2013-6-19 下午12:05:42
     * @param session
     * @param producer
     * @throws Exception
     */   
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送消息第" + (i + 1) + "条";
            TextMessage text = session.createTextMessage(message);
           
            System.out.println(message);
            producer.send(text);
        }
    }
   
    public static void run() throws Exception {
       
        Connection connection = null;
        Session session = null;
        try {
            // 创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(DESTINATION);
            // 创建消息制作者
            MessageProducer producer = session.createProducer(destination);
            // 设置持久化模式
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
            // 提交会话
            session.commit();
           
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
   
    public static void main(String[] args) throws Exception {
        MessageSender.run();
    }
}


接受者

package com.hoo.mq.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* <b>function:</b> 消息接收者
* @author hoojo
* @createDate 2013-6-19 下午01:34:27
* @file MessageReceiver.java
* @package com.hoo.mq.jms
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class MessageReceiver {

    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "hoo.mq.queue";
   
   
    public static void run() throws Exception {
       
        Connection connection = null;
        Session session = null;
        try {
            // 创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(DESTINATION);
            // 创建消息制作者
            MessageConsumer consumer = session.createConsumer(destination);
           
            while (true) {
                // 接收数据的时间(等待) 100 ms
                Message message = consumer.receive(1000 * 100);
               
                TextMessage text = (TextMessage) message;
                if (text != null) {
                    System.out.println("接收:" + text.getText());
                } else {
                    break;
                }
            }
           
            // 提交会话
            session.commit();
           
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
   
    public static void main(String[] args) throws Exception {
        MessageReceiver.run();
    }
}


2、 Queue队列方式发送点对点消息数据

发送方

package com.hoo.mq.queue;

import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* <b>function:</b> Queue 方式消息发送者
* @author hoojo
* @createDate 2013-6-19 下午04:34:36
* @file QueueSender.java
* @package com.hoo.mq.queue
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class QueueSender {
   
    // 发送次数
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "hoo.mq.queue";
   
    /**
     * <b>function:</b> 发送消息
     * @author hoojo
     * @createDate 2013-6-19 下午12:05:42
     * @param session
     * @param sender
     * @throws Exception
     */   
    public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送消息第" + (i + 1) + "条";
           
            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);
           
            sender.send(map);
        }
    }
   
    public static void run() throws Exception {
       
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Queue queue = session.createQueue(DESTINATION);
            // 创建消息发送者
            javax.jms.QueueSender sender = session.createSender(queue);
            // 设置持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, sender);
            // 提交会话
            session.commit();
           
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
   
    public static void main(String[] args) throws Exception {
        QueueSender.run();
    }
}
接收方

package com.hoo.mq.queue;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* <b>function:</b> 消息接收者; 依赖hawtbuf-1.9.jar
* @author hoojo
* @createDate 2013-6-19 下午01:34:27
* @file MessageReceiver.java
* @package com.hoo.mq.queue
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class QueueReceiver {

    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String TARGET = "hoo.mq.queue";
   
   
    public static void run() throws Exception {
       
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Queue queue = session.createQueue(TARGET);
            // 创建消息制作者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
           
            receiver.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg != null) {
                        MapMessage map = (MapMessage) msg;
                        try {
                            System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠100ms再关闭
            Thread.sleep(1000 * 100);
           
            // 提交会话
            session.commit();
           
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
   
    public static void main(String[] args) throws Exception {
        QueueReceiver.run();
    }
}


3、 Topic主题发布和订阅消息

消息发送方

package com.hoo.mq.topic;

import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


/**
* <b>function:</b> Queue 方式消息发送者
* @author hoojo
* @createDate 2013-6-19 下午04:34:36
* @file QueueSender.java
* @package com.hoo.mq.topic
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class TopicSender {
   
    // 发送次数
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "hoo.mq.topic";
   
    /**
     * <b>function:</b> 发送消息
     * @author hoojo
     * @createDate 2013-6-19 下午12:05:42
     * @param session 会话
     * @param publisher 发布者
     * @throws Exception
     */   
    public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送消息第" + (i + 1) + "条";
           
            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);
           
            publisher.send(map);
        }
    }
   
    public static void run() throws Exception {
       
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Topic topic = session.createTopic(DESTINATION);
            // 创建消息发送者
            TopicPublisher publisher = session.createPublisher(topic);
            // 设置持久化模式
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, publisher);
            // 提交会话
            session.commit();
           
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
   
    public static void main(String[] args) throws Exception {
        TopicSender.run();
    }
}
接收方

package com.hoo.mq.topic;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* <b>function:</b> 消息接收者; 依赖hawtbuf-1.9.jar
* @author hoojo
* @createDate 2013-6-19 下午01:34:27
* @file MessageReceiver.java
* @package com.hoo.mq.topic
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class TopicReceiver {

    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String TARGET = "hoo.mq.topic";
   
   
    public static void run() throws Exception {
       
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Topic topic = session.createTopic(TARGET);
            // 创建消息制作者
            TopicSubscriber subscriber = session.createSubscriber(topic);
           
            subscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg != null) {
                        MapMessage map = (MapMessage) msg;
                        try {
                            System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠100ms再关闭
            Thread.sleep(1000 * 100);
           
            // 提交会话
            session.commit();
           
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
   
    public static void main(String[] args) throws Exception {
        TopicReceiver.run();
    }
}


4、 整合Spring实现消息发送和接收,在整合之前我们需要添加jar包,需要的jar包如下

image

这些jar包可以在D:\apache-activemq-5.8.0\lib这个lib目录中找到,添加完jar包后就开始编码工作。

消息发送者

package com.hoo.mq.spring.support;

import java.util.Date;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
* <b>function:</b> Spring JMSTemplate 消息发送者
* @author hoojo
* @createDate 2013-6-24 下午02:18:48
* @file Sender.java
* @package com.hoo.mq.spring.support
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class Sender {

    public static void main(String[] args) {
        ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");

        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                MapMessage message = session.createMapMessage();
                message.setString("message", "current system time: " + new Date().getTime());
               
                return message;
            }
        });
    }
}
消息接收者

package com.hoo.mq.spring.support;

import java.util.Map;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/**
* <b>function:</b> Spring JMSTemplate 消息接收者
* @author hoojo
* @createDate 2013-6-24 下午02:22:32
* @file Receiver.java
* @package com.hoo.mq.spring.support
* @project ActiveMQ-5.8
* @blog http://blog.csdn.net/IBM_hoojo
* @email hoojo_@126.com
* @version 1.0
*/
public class Receiver {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml"); 
         
        JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate"); 
        while(true) { 
            Map<String, Object> map =  (Map<String, Object>) jmsTemplate.receiveAndConvert(); 
           
            System.out.println("收到消息:" + map.get("message")); 
        } 
    }
}
这里主要是用到了JmsTemplate这个消息模板,这个对象在spring的IoC容器中管理,所以要从spring的容器上下文中获取。下面看看spring的配置文件applicationContext-beans.xml内容:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">
   
    <!-- 连接池  -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 
        <property name="connectionFactory"> 
            <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 
                <property name="brokerURL" value="tcp://localhost:61616" /> 
            </bean> 
        </property> 
    </bean> 
     
    <!-- 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
        <property name="brokerURL" value="tcp://localhost:61616" /> 
    </bean> 
   
    <!-- 配置消息目标 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 
        <!-- 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp -->
        <constructor-arg index="0" value="hoo.mq.queue" /> 
    </bean> 

    <!-- 消息模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
        <property name="connectionFactory" ref="activeMQConnectionFactory" /> 
        <property name="defaultDestination" ref="destination" /> 
        <property name="messageConverter"> 
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property> 
    </bean> 
</beans>
这里的整合就比较简单了,如果你是web工程,那你在需要用jms的时候,只需用注入jmsTemplate即可。
转自http://www.cnblogs.com/hoojo/p/active_mq_jms_apache_activemq.html

p2p 点对点模式,  也就是一对一,服务端发送一条消息到Destination 即Queue  

虽然可能有n个客户端在队列中侦听消息,但只有一个可以读取到消息,之后消息将不存在,其他人没法读取

publish/substrib   则可以理解为一对多

1、P2P模型
在P2P模型中,有下列概念:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队 列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时 。
 每个消息只有一个消费者 (Consumer)(即一旦被消费,消息就不再在消息队列中)
 发送者和接收者之间在时间上没有依赖性 ,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。
 接收者在成功接收消息之后需向队列应答成功
如果你希望发送的每个消息都应该被成功处理 的话,那么你需要P2P模型。

适用场合:想让接收者进行且只进行一次处理组件之间进行同步通信

2   在Pub/Sub模型中,有下列概念: 主题(Topic)、发布者(Publisher)、订阅者(Subscriber)。客户端将消息发送到主题。多个发布者将消息发送到Topic,系统 将这些消息传递给多个订阅者。
 每个消息可以有多个消费者
 发布者和订阅者之间有时间上的依赖性 。针对某个主题(Topic)的订阅者,它必须创建一个订阅之后,才能消费发布者的消息,而且,为了消费消息,订阅者 必须保持运行的状态。
当然,为了缓和这种严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消 息。
如果你希望发送的消息可以不被做任何处理、或者被一个消费者处理、或者可以被多个消费者处理 的话,那么可以采用Pub/Sub模型(也就是说发布者不关心有多少侦听者)。
关于时间 的依赖性
          二种模型的实现结果:
         对于p2p 模型的每个消息只能有一个消费者  如果我们定义二个消息接受者的Bean那么只能有一端会接收到消息。当你把部署在Jboss中的消息接收Bean去掉以后,然后发送消息 此时消息在队列中,一旦你重新部署他会立刻就接收到刚刚发送的消息 所以它没有时间的依赖性,
           pub/sub 模型可以有多个消费者 在这个模型中如果我们定义多个接收消息的Bean当我们在客户端发送消息的时候二个bean都会接收到消息,所以他有多个消费者 但是如果你把Jboss部署中的消息接收bean去掉之后,发送消息。然后在重新部署,那么消息也无法接收到 ,所以说他有时间的依赖性
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    ActiveMQ消息服务器 v6.0.1.zip

    1. 微服务通信:在微服务架构中,ActiveMQ作为服务间通信的桥梁,实现异步解耦和数据同步。 2. 流程工作流:在复杂的业务流程中,ActiveMQ可以作为任务调度工具,管理不同阶段的任务分配和执行。 3. 应用监控:通过...

    MQ和openfire 即时通讯【升级版】

    即时通讯在IT行业中扮演着至关重要的角色,尤其是在企业级应用和服务中。MQ(Message Queue)和Openfire是两种常用于构建即时通讯系统的组件。本文将深入探讨MQ与Openfire的结合,以及如何实现一个增强版的即时通讯...

    ajax,java即时通讯web qq

    【Ajax与Java即时通讯在Web QQ中的应用】 Ajax(Asynchronous JavaScript and XML)是一种在无需刷新整个网页的情况下,能够更新部分网页的技术。它通过在后台与服务器进行少量数据交换,使得网页实现异步更新,...

    activemq监控服务器状态,应用异常并发送邮件

    activemq监控服务器状态,应用异常并发送邮件详细Linux配置过程。 主要支持功能: 1、服务器CPU异常预警 2、服务器硬盘不足预警 3、tomcat进程自动关闭后自动启动及预警 4、数据库异常预警等等。

    ActiveMQ消息服务器 v5.17.6.zip

    Apache ActiveMQ是业界广泛使用的开源消息代理,它遵循Java Message Service(JMS)规范,提供了可靠的消息传递服务。ActiveMQ v5.17.6是该服务器的一个稳定版本,包含了丰富的功能和优化,是开发和部署企业级消息...

    ActiveMQ消息服务器 v5.18.3.zip

    ActiveMQ提供了高度可靠的消息传递服务,支持多种消息协议,如JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)、STOMP(Simple Text Oriented Messaging Protocol)等,能够帮助开发者构建...

    ActiveMQ消息服务配置

    ### ActiveMQ消息服务配置详解 #### 一、ActiveMQ配置概览 ActiveMQ是一款非常流行的开源消息中间件,它基于Java开发,支持多种消息传递模式,如点对点(P2P)、发布/订阅(Pub/Sub)等。本文将详细介绍ActiveMQ的配置...

    activemq5.3.1整合应用服务器详解

    3. **资源管理**:注意监控数据目录的空间使用情况,避免因磁盘空间不足导致服务不可用。 4. **性能调优**:根据实际应用场景调整ActiveMQ的各项配置,例如消息持久化策略、连接池大小等,以达到最佳性能。 #### 五...

    Linux下activeMQ的启动和停止.docx

    本文将详细介绍如何在Linux上启动和停止ActiveMQ服务。 首先,为了启动或停止ActiveMQ,你需要确保已经正确安装了Apache ActiveMQ,并且它的二进制目录位于`/opt/Founder/install/mq/apache-activemq-5.7/bin`。这...

    Spring整合Blazeds实现ActiveMQ JMS消息服务

    标题中的“Spring整合Blazeds实现ActiveMQ JMS消息服务”指的是在Java应用程序中使用Spring框架与Blazeds(一个Flex和Java之间的消息传递中间件)集成,通过ActiveMQ(一个流行的开源JMS提供商)来实现消息队列服务...

    即时通讯源码

    即时通讯(Instant Messaging,简称IM)源码是用于构建实时通信系统的软件代码,通常涉及到文本、语音、视频等多种通信方式。在本场景中,我们关注的是基于Java编程语言的即时通讯解决方案。Java作为一种多平台、...

    activeMQ-activeMQ

    ActiveMQ还支持多种协议,如AMQP(Advanced Message Queuing Protocol),STOMP(Simple Text Oriented Message Protocol)适合Web应用程序,XMPP(Extensible Messaging and Presence Protocol)常用于即时通讯。...

    activeMQ收发工具.rar

    ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...

    activemq+mqtt+android通信

    但总体来说,实现“activemq+mqtt+android通信”涉及的知识点包括:ActiveMQ服务器配置、MQTT协议原理、Android MQTT客户端库的使用、Android服务的创建与管理、网络连接的建立与维护等。在实际开发中,还需考虑性能...

    ActiveMQ入门

    3. **启动**:运行 ActiveMQ 的启动脚本,确保服务正常启动。 4. **测试**:使用测试工具验证 ActiveMQ 是否能正常接收和发送消息。 #### 五、ActiveMQ 的使用场景 **场景示例**: 1. **订单处理**:电商平台接收...

    activemq

    它为 C++ 开发者提供了访问 ActiveMQ 的接口,从而使开发者能够在 C++ 应用程序中轻松地集成消息队列服务。 - **Winkeemq-cpp**:这是一个基于 ActiveMQ-CPP 封装的高级 API 库,它简化了许多常见的初始化和清理工作...

    基于ActiveMQ的jms通讯

    在本实例中,我们使用的ActiveMQ是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,提供了可靠的消息传递服务。ActiveMQ支持多种协议,如TCP/IP,且能与其他Java应用服务器集成。 在开发环境中,我们...

    WebSocket协议接收ActiveMQ

    2. 客户端连接:客户端通过WebSocket API建立到ActiveMQ的连接,指定目标URL通常是ws://或者wss://(如果是加密连接)加上ActiveMQ服务器的地址和WebSocket端口。 3. 订阅主题或队列:连接建立后,客户端可以订阅想...

    JMS事例,ActiveMQ服务器

    ActiveMQ是Apache软件基金会开发的一个开源JMS消息代理,它实现了多种消息协议,如OpenWire、STOMP、AMQP和MQTT,为开发者提供了高度可扩展且可靠的跨语言消息传递服务。 在Java开发中,JMS和ActiveMQ的结合使用能...

    ActiveMQ的activemq.xml详细配置讲解

    ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循Java消息服务(JMS)规范,提供可靠的消息传递功能。`activemq.xml`是ActiveMQ的核心配置文件,它定义了服务器的行为、网络连接、存储策略以及消息路由...

Global site tag (gtag.js) - Google Analytics