Queue实现的是点到点模型,在下面的例子中,启动2个消费者共同监听一个Queue,然后循环给这个Queue中发送多个消息,我们依然采用ActiveMQ。
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class QueueTest {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
// TODO Auto-generated method stub
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER
, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
//JMS 客户端到JMS Provider 的连接
Connection connection = factory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Queue queue = new ActiveMQQueue("queueTest");
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("Consumer 1 get " + ((TextMessage)message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
});
MessageConsumer comsumer2 = session.createConsumer(queue);
comsumer2.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("Consumer 2 get " + ((TextMessage)message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
});
MessageProducer producer = session.createProducer(queue);
for (int i=0; i<10;) {
producer.send(session.createTextMessage("Message:" + ++i));
}
session.close();
connection.close();
}
}
运行这个例子会得到下面的输出结果:
Consumer 1 get Message:1
Consumer 2 get Message:2
Consumer 1 get Message:3
Consumer 2 get Message:4
Consumer 1 get Message:5
Consumer 2 get Message:6
Consumer 1 get Message:7
Consumer 2 get Message:8
Consumer 1 get Message:9
Consumer 2 get Message:10
可以看出每个消息直被消费了一次,但是如果有多个消费者同时监听一个Queue的话,无法确定一个消息最终会被哪一个消费者消费。
分享到:
相关推荐
MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战
测评开始时,测评程序会启动10~20个Producer,每个Producer在一条线程中,然后每个Producer随机生产某个Topic或者附属于Queue的消息并发送到消息引擎; Topic: 消息主题。 Queue: 队列。抽象的概念,消息可以发送到...
4. **集合框架**:Java集合框架包括List、Set、Queue等接口和ArrayList、LinkedList、HashSet、HashMap等实现类,书中的案例会演示它们的使用方法和场景。 5. **IO流**:理解输入输出流的概念,学习File类、字节流...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在实际项目中高效地运用这一强大的消息中间件。书中不仅涵盖了RabbitMQ的基础知识,还深入探讨了其在分布式...
随书源码“RabbitMQ实战-随书源码”和“RabbitMQ in action 和 RabbitMQ实战-高效部署分布式消息队列”的配套代码,提供了丰富的示例,帮助读者深入理解RabbitMQ的使用和实践。 1. **RabbitMQ基础** - **AMQP协议*...
5. 队列(Command Queue):队列用于提交任务到设备执行,可以是同步或异步的。 6. 编程语言:OpenCL内核是用C99语言扩展的OpenCL C编写,允许声明并行函数和数据结构。 7. 主机与设备交互:主机程序(通常用C或...
### 实战应用 在实际的嵌入式项目中,例如网络设备驱动,当数据包到达但接收缓冲区为空时,驱动会将接收请求放入队列,并阻塞等待数据。另一方面,当发送缓冲区满时,驱动会阻止新数据的发送,直到缓冲区有空间。 #...
RocketMQ 介绍与实战 RocketMQ 是阿里巴巴中间件团队自研的一款高性能、高吞吐量、低延迟、高可用、高可靠(具备金融级稳定性)的分布式消息中间件。RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁...
会话创建成功后,根据消息类型(队列Queue或主题Topic),创建相应的目的地(Destination)。然后创建消息生产者(MessageProducer),并设置其为非持久性模式(DeliveryMode.NON_PERSISTENT),意味着消息将不会被...
1. **FreeRTOS核心概念**:首先,我们需要理解FreeRTOS的核心组件,如任务(Task)、信号量(Semaphore)、互斥锁(Mutex)、队列(Queue)、事件标志组(Event Group)以及定时器(Timer)。这些是构建实时系统的...
- **队列(Queue)**:存储消息的地方,消息只能从交换器路由到队列,不能直接由生产者发送到队列。 - **绑定(Binding)**:连接交换器和队列的关系,定义了消息如何从交换器流向队列的规则。 - **生产者...
《RabbitMQ实战高效部署分布式消息队列》这本书籍详细阐述了如何在实际环境中高效地部署和使用RabbitMQ这一流行的消息中间件。RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议实现的开源消息队列系统...
在点对点消息传递域中,目的地被称为队列(queue),消息只能被一个消费者消费;而在发布/订阅消息传递域中,目的地被称为主题(topic),消息可被多个消费者消费。 消息生产者是会话创建的对象,用于将消息发送到...
本知识点将深入探讨RocketMQ的基础概念、工作原理、实战应用以及源码解析。 一、RocketMQ基础 1. 概念:RocketMQ是一个基于发布/订阅模式的消息队列,支持事务消息、定时/延时消息、顺序消息等多种特性,广泛应用...
对于Java集合框架,本书将深入剖析ArrayList、LinkedList、HashMap等常用数据结构,以及Set和Queue接口的实现。这些内容是处理复杂数据结构和算法的关键。同时,读者还将学习到文件和I/O流的操作,这对于读写文件和...
9. 异步编程:深入探讨回调函数、Promise、async/await,以及Event Loop和Callback Queue的工作原理。 10. 模块化:学习CommonJS、AMD和ES6模块,以及如何在不同场景下选择合适的模块系统。 11. 错误处理:了解try...
4. **集合框架**:Java集合框架是处理数据的重要工具,包括List、Set、Queue、Map等接口及其实现类,如ArrayList、LinkedList、HashSet、HashMap等,以及泛型和迭代器的使用。 5. **IO流与NIO**:Java的IO流系统...
5. 适配器(Adapters):如stack、queue、priority_queue,它们将基本容器包装成特定的逻辑结构。 6. 原语(Algorithms):包括内存管理和分配器,如allocators,它们管理内存分配和释放。 书中通过丰富的习题案例...
### ActiveMQ 实战 #### JMS 基本构件概览 **ActiveMQ** 是一个高性能、功能丰富的开源消息中间件,它实现了 **Java Message Service (JMS)** 规范。JMS 规范定义了一组接口,这些接口提供了一个标准的方式来进行...