`

activemq-queue

阅读更多
mq意为:消息队列。其中active mq 是apache的一个顶级开源项目,是jms的一个标准实现。

为什么要用mq?
1、通信,mq最本质的需求就是机器间的通信,即消息传递。
2、系统解耦,异步通信。mq消息的发送,不需要等待对方的返回,吞吐量高。
3、生产者和消费者的消息通信可靠性保证。一旦消息到了mq,消息的接收有保障。

mq消息传递的两种模式:
1、PTP:Point to Point,即点对点的消息模型;
    一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。

2、Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型;
    一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。
其中:Pub/Sub又有Nondurable subscription(非持久订阅)和durable subscription (持久化订阅)2种消息处理方式。

非持久订阅:只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。
持久订阅时:客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider 时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。


典型术语:
JMS Provider:实现JMS 接口的消息中间件;
Queue:队列目标;
Topic:主题目标;
  主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(s ubscribe) 从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要 接触即可保证消息的传送。

ConnectionFactory:连接工厂,JMS 用它创建连接;
Connection:JMS 客户端到JMS Provider 的连接;
Destination:消息的目的地;
Session:会话,一个发送或接收消息的线程; 客户端用Session 创建MessageProducer 和MessageConsumer对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收。
MessageProducer:由Session 对象创建的用来发送消息的对象;
MessageConsumer:由Session 对象创建的用来接收消息的对象; 客户端用MessageConsumer 接收队列中的消息,如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到。

Acknowledge:签收;
Transaction:事务。

执行流程:
 按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:
1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
2. 利用factory构造JMS connection
3. 启动connection
4. 通过connection创建JMS session.
5. 指定JMS destination.
6. 创建JMS producer或者创建JMS message并提供destination.
7. 创建JMS consumer或注册JMS message listener.
8. 发送和接收JMS message.
9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

最简单的案例:
Producer code:
public class Producer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final int NUM_MESSAGES_TO_SEND = 100;
    private static final long DELAY = 100;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {

            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
                TextMessage message = session.createTextMessage("Message #" + i);
                System.out.println("Sending message #" + i);
                producer.send(message);
                Thread.sleep(DELAY);
            }

            producer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}


consumer:
public class Consumer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final long TIMEOUT = 20000;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s");
        //MQ 连接工厂对象
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;

        try {

        	//建连接
            connection = connectionFactory.createConnection();
            connection.start();

            //创建会话
            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            //获取消息的目的地
            Destination destination = session.createQueue("test-queue");
            
            //生成消息消费者
            MessageConsumer consumer = session.createConsumer(destination);

            int i = 0;
            while (true) {
            	//从目的地获取消息
                Message message = consumer.receive(TIMEOUT);
                if (message != null) {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Got " + i++ + ". message: " + text);
                    }
                } else {
                    break;
                }
            }

            consumer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}


broker:
/**
 * 嵌入式AMQ broker 启动服务
 * @author xinchun.wang
 *
 */
public class MQBroker {
	public static void main(String[] args) throws Exception {
		BrokerService brokerService = new BrokerService();
		brokerService.setBrokerName("tpBroker");
		brokerService.setPersistent(false);
		brokerService.addConnector("tcp://localhost:61616");
		brokerService.start();
		System.out.println("brokerService start ok ~");
	}

}




activemq 学习资源准备

官方网址:
http://activemq.apache.org/

源码svn:
https://svn.apache.org/repos/asf/activemq/trunk/

下载activemq
http://www.apache.org/dyn/closer.cgi?path=/activemq/5.10.0/apache-activemq-5.10.0-bin.zip

运行MQ:

解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat 即启动了mq的 broker。

另外AMQ 还可以通过代码的方式嵌入式启动:
/**
 * 嵌入式AMQ broker 启动服务
 * @author xinchun.wang
 *
 */
public class MQBroker {
	public static void main(String[] args) throws Exception {
		BrokerService brokerService = new BrokerService();
		brokerService.setBrokerName("tpBroker");
		brokerService.setPersistent(false);
		brokerService.addConnector("tcp://localhost:61616");
		brokerService.start();
		System.out.println("brokerService start ok ~");
	}

}


嵌入式启动AMQ,pom 引入active-all.jar包
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.10.0</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>jcl-over-slf4j</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-api</artifactId>
			</exclusion>

			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>  



client 发送和接受消息需要引入active-client包
pom xml 为:
         <dependency>
            <groupId>org.apache.geronimo.specs</groupId>
            <artifactId>geronimo-jms_1.1_spec</artifactId>
            <version>1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.10.0</version>
        </dependency>
0
0
分享到:
评论

相关推荐

    activemq-cpp-library-3.9.5-src.zip

    《ActiveMQ-CPP库3.9.5源代码解析与应用》 ActiveMQ-CPP库是Apache ActiveMQ项目的一部分,它提供了一套C++接口,用于与ActiveMQ消息代理进行通信。这个库允许开发者在C++应用程序中实现高级消息队列协议(AMQP)和...

    apache-activemq-5.9.0-bin

    1. **消息队列**:ActiveMQ支持多种消息模式,如点对点(Queue)和发布/订阅(Topic)。消息队列确保消息的可靠传输,即使在发送方和接收方之间发生故障时也能保持数据的完整性。 2. **JMS兼容性**:ActiveMQ完全...

    activemq-web-console-5.11.2

    1.一个是admin,用来显示和管理所有的queue、topic、connection等等。 2.一个是demo,有一些使用jms和activemq的简单例子。 3.还有一个fileserver,用来支持通过activemq发送文件时的中转服务器。blob message时配置...

    apache-activemq-5.15.6

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 2. **消息模型**:ActiveMQ支持多种消息模型,包括持久化消息、非持久化消息、事务消息等。持久化消息即使在服务器重启后也能保持,而非...

    apache-activemq-5.16.6-bin.zip

    1. **消息队列(Message Queue)**: 消息队列是ActiveMQ的核心组件,它存储并转发消息,确保消息的可靠传输。消息生产者发送消息到队列,而消费者从队列中接收消息,两者之间无需直接交互。 2. **主题(Topic)与...

    activemq-cpp-library-3.6.0-src.tar.gz_C# ActiveMQ_activemq_activ

    4. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等。这些模式适应了不同场景下的通信需求,如可靠的单次传递、广播或者分布式计算。 5. **高级...

    apache-activemq-5.12.0-bin

    4. **主题与队列**:ActiveMQ支持两种消息模式:主题(Topic)和队列(Queue)。主题用于广播式通信,一条消息可以被多个消费者接收;队列则遵循先入先出(FIFO)原则,消息只能被一个消费者接收。 5. **管理界面**...

    apache-activemq-5.9.0 下载

    3. **JMS支持**:ActiveMQ完全符合JMS 1.1规范,支持Message Queue、Topic、Temporary Queue和Temporary Topic四种类型。用户可以通过JMS API创建连接、会话、生产者、消费者并发送/接收消息。 4. **协议支持**:...

    apache-activemq-5.15.11-bin.tar.gz

    - **队列与主题**:在ActiveMQ中,有两种主要的消息传递模型——队列(Queue)和主题(Topic)。队列采用点对点模型,每个消息只被一个消费者接收;主题采用发布/订阅模型,可以有多个订阅者接收同一消息。 2. **...

    ActiveMQ-Queue点对点消息-Receive+Listener方式

    ActiveMQ-Queues点对点消息-Receive+Listener方式:参考博文:http://blog.csdn.net/ABAP_Brave/article/details/53443725

    boot-example-activemq-queue-2.0.5

    使用SpringBoot方式集成ActiveMQ ActiveMQ消息中间件的点对点模式point to point 消息队列 * 消息消费者从queue中取出并且消费消息 * 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的...

    activeMQ-cpp 测试文件

    cms::Destination* destination = session-&gt;createQueue("TestQueue"); // 创建生产者 cms::MessageProducer* producer = session-&gt;createProducer(destination); // 创建消息 cms::TextMessage* message = ...

    apache-activemq-5.15.3-bin.tar.gz

    消息队列(Message Queue)** 消息队列是ActiveMQ的核心概念,它存储和转发消息。消息在发送方和接收方之间通过队列传输,确保即使发送方和接收方不在同一时间在线,消息也能被正确地保存和传递。 **4. 主要特性**...

    apache-activemq-5.15.0-bin.tar.7z

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS ...1、点对点(queue) 2、一对多(topic)

    apache-activemq-5.15.7-bin

    1. **消息队列(Message Queue)**:ActiveMQ作为消息中间件,主要工作是处理和传递消息。它将生产者发送的消息存储在队列中,然后由消费者按需消费,实现应用间的解耦。 2. **主题(Topic)与队列(Queue)**:...

    apache-activemq-5.3.1-bin.tar.gz

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...

    apache-activemq-5.15.7-bin.tar.gz

    标签"apache activemq mq 消息队列"进一步强调了ActiveMQ作为消息队列(Message Queue, MQ)的角色。消息队列是处理大量并发请求和实现负载均衡的关键技术。在分布式系统中,它能缓存消息,确保即使在发送方和接收方...

    apache-activemq-5.5.1-bin.zip加上入门demo

    解压缩apache-activemq-5.5.1-bin.zip,然后双击...包含了apache-activemq-5.5.1-bin.zip以及ActiveMQ一个helloworld的demo启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

    activemq-client-5.8.0.jar

    Apache ActiveMQ是业界广泛使用的开源消息中间件,它遵循开放消息中间件协议(Open Message Middleware,OMM)标准,如JMS(Java Message Service)。在本案例中,我们关注的是`activemq-client-5.8.0.jar`,这是...

    apache-activemq-5.9.1-bin.tar.gz

    此外,ActiveMQ支持多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。点对点模式下,消息只被一个消费者接收;而在发布/订阅模式下,消息可以被多个订阅者消费。这些模型适用于不同的应用场景。 消息传输的...

Global site tag (gtag.js) - Google Analytics