简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
特性
-
多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
-
完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
-
对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
-
通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
-
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
-
支持通过JDBC和journal提供高速的消息持久化
-
从设计上保证了高性能的集群,客户端-服务器,点对点
-
支持Ajax
-
支持与Axis的整合
-
可以很容易得调用内嵌JMS provider,进行测试
JMS简介
JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。
JMS服务提供者
JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。
消息管理对象
消息管理对象提供对消息进行操作的API。JMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。
消息的生产者消费者
消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。
JMS消息
息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:消息头(header)、属性(property)和消息体(body)。
消息标头
消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其
中一些字段的值,其它值则由JMS提供程序填写。
JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表
示消息、设置优先权和失效时间等等,并且为消息确定路由。
JMSDestination: 由Send方法设置。指定消息的目的地,由JMS提供程序填写
JMSDeliveryMode: 由Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。
JMSMessageID: 由Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写
JMSTimeStamp: 由Send 方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写
JMSCorrelationID: 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID
JMSReplyTo: 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段
JMSRedelivered: 由JMS提供程序设置。指出该消息先前被发送过
JMSType: 由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求
JMSExpiration: Send 方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息
JMSPriority: Send 方法设置。包含客户端在发送消息时所设置有限级值
消息属性
消息属性,用来添加删除消息头以外的附加信息。除了上面的属性,还可以自定义属
性,以便进行消息的选择 。
一般通过setXXXProperty方法来定义消息属性,XXX取值为:Boolean、Byte、
Double、Float、Int、Long、Object、Short及String。
每一属性均由字符串名字和相关的值组成 ,例如:
TextMessage msg = tsession.createTextMessage();
msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);
String customer = msg.getStringProperty(“CUSTOMER_NAME”);
其中的”CUSTOMER_NAME”和”MyCustomer”就是消息当中对应的key和value。
消息主体
消息主体包含了消息的核心数据。
JMS 定义了5中消息类型: TextMessage、MapMessage、BytesMessage、
StreamMessage和ObjectMessage
选择最合适的消息类型可以使JMS最有效 的处理消息。
将数据作为简单字符串存放在主体中(XML就可以作为字符串发)
TextMessage msg = session.createTextMessage();
msg.setText(text);
有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响
移植性。
只自己定义了两个方法setText(String s)、getText()
使用一张映射表来存放其主体内容(参照Jms API)
MapMessage msg = session.createMapMessage();
msg.setString(“CUSTOMER_NAME”,”John”);
msg.setInt(“CUSTOMER_AGE”,12);
String s = msg.getString(“CUSTOMER_NAME”);
int age = msg.getInt(“CUSTOMER_AGE”);
将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有
消息格式保持一致等(参照Jms API)
byte[] data;
BytesMessage msg = session.createBytesMessage();
msg.wirte(data);
byte[] msgData = new byte[256];
int bytesRead = msg.readBytes(msgData);
用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这种
消息格式时,收发双方事先协商好字段的顺序,以保证写读顺序相同(参照Jms API)
StringMessage msg = session.createStreamMessage();
msg.writeString(“John”);
msg.writeInt(12);
String s = msg.readString();
Int age = msg.readInt();
(PS:个人认为有点像socket的信息收发)
用于往消息中写入可序列化的对象。
消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个
集合写入消息。
客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写
message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写
自己只单独定义了两个方法:getObject()和setObject(Serializable s)
ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessage的body无效 (从两个方法可以看出,这种消息已经强制要你实现java.io. Serializable接口)
Message只读时被set抛出MessageNotWriteableException;
set和get时,如果对象序列化失败抛出MessageFormatException
消息的通信方式(点对点通信和发布/订阅方式)
点对点方式(point-to-point)
点对点的消息发送方式主要建立在 Message Queue、Sender、Receiver上,
Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue ,而 Receiver Client从Queue中接收消息和"发送消息已接受"到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。
发布/订阅方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户
端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。
编程模式
消息产生者向JMS发送消息的步骤
-
创建连接使用的工厂类JMS ConnectionFactory
-
使用管理对象JMS ConnectionFactory建立连接Connection
-
使用连接Connection建立会话Session
-
使用会话Session和管理对象Destination创建消息生产者MessageSender
-
使用消息生产者MessageSender发送消息
消息消费者从JMS接受消息的步骤
-
创建连接使用的工厂类JMS ConnectionFactory
-
使用管理对象JMS ConnectionFactory建立连接Connection
-
使用连接Connection 建立会话Session
-
使用会话Session和管理对象Destination创建消息消费者MessageReceiver
-
使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。
ActiveMQ运行
ActiveMQ5.3版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于
监控ActiveMQ的admin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。
打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有
消息,如下截图
简单的例子
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Test {
public static void main(String[] args) throws JMSException {
new Receiver().beginReceiveMsg();
new Sender().send();
}
}
class Sender {
static String subject = "TEST";
static String user = ActiveMQConnection.DEFAULT_USER;
static String password = ActiveMQConnection.DEFAULT_PASSWORD;
// failover://tcp://localhost:61616
static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
boolean transacted = true;
boolean persistent = true;
int ackMode = Session.AUTO_ACKNOWLEDGE;
Session session;
Destination destination;
MessageProducer producer;
public Sender() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(transacted, ackMode);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
public void send() throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("test", "test");
message.setStringProperty("RECEIVER_CHAR", " TEST ");
producer.send(message);
session.commit();
}
}
class Receiver implements MessageListener {
static String subject = "TEST";
static String user = ActiveMQConnection.DEFAULT_USER;
static String password = ActiveMQConnection.DEFAULT_PASSWORD;
// failover://tcp://localhost:61616
static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
boolean transacted = true;
int ackMode = Session.AUTO_ACKNOWLEDGE;
Session session;
Destination destination;
MessageConsumer consumer;
public Receiver() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(transacted, ackMode);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination, "RECEIVER_CHAR = TEST");
}
public void beginReceiveMsg() throws JMSException {
consumer.setMessageListener(this);
}
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
MapMessage msg = (MapMessage) message;
try {
System.out.println(msg.getString("test"));
} catch (JMSException e) {
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
e.printStackTrace();
} finally {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
}
其他产品
1、开源JMS供应商
2、商业JMS供应商
- 大小: 33.4 KB
分享到:
相关推荐
ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ...
在本篇讨论中,我们将关注如何利用Spring框架和ActiveMQ来实现JMS消息队列。 首先,我们需要理解Spring框架中的JMS支持。Spring提供了一个强大的抽象层,简化了与各种MQ提供商(如ActiveMQ、RabbitMQ等)的集成。它...
消息队列:ActiveMQ:消息队列的生产者与消费者模型.docx
ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS...了解和掌握这些知识点,有助于面试者在面试中展示对ActiveMQ的深入理解和实际应用能力,同时也是确保在日常开发工作中正确、高效使用消息队列的重要基础。
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。...通过理解和实践上述知识点,你将能够熟练地在SpringBoot项目中运用ActiveMQ实现消息队列的功能。
ActiveMQ中的生产者(Producer)发送消息到队列或主题(Topic),而消费者(Consumer)则从这些队列或主题中接收消息。队列遵循FIFO(先进先出)原则,每个消息只能被一个消费者接收;主题则支持多播,多个订阅者...
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386
在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它负责在不同服务之间异步传输数据,从而提高系统的响应速度和可扩展性。ActiveMQ是Apache出品的一款开源、高性能、支持多种协议的消息中间件...
本项目基于Spring这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。 包含的特性如下: 1.开启activeMQ,访问...
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。
### ActiveMQ 消息队列概述 #### 一、ActiveMQ简介 ActiveMQ是由Apache基金会提供的一个开源消息中间件,其作为业界最成熟且功能强大的消息总线之一,在分布式系统和微服务架构中发挥着核心作用。ActiveMQ遵循Java...
在本教程中,我们将探讨如何整合Spring框架与ActiveMQ消息队列,实现前后台的消息传递。这有助于提升系统的可扩展性和响应速度,降低不同组件之间的耦合度。 首先,Spring框架是Java企业级应用开发的事实标准,它...
在这个"activeMq消息队列demo"中,我们可以学习到如何在实际开发环境中运用ActiveMQ。 1. **ActiveMQ基本概念** - **消息队列**:是一种先进先出(FIFO)的数据结构,消息按照发送顺序被存储和消费。 - **生产者*...
**消息队列基础** 在分布式系统中,消息队列(Message Queue, MQ)扮演着重要的角色,它是一种异步通信机制,能够有效地解耦应用程序,提高系统的可扩展性和容错性。消息队列允许生产者将消息发送到队列,然后由...
作为消息中间件的MQ在java开发中起着举足轻重的地位,无论是ActiveMQ、RabbitMQ、还是RokcetMQ至少要会一个,否则别说自己是java程序员。Java自学网整理了目前行业最常用的消息中间件视频供大家学习。
本文档详细介绍了在Apache ActiveMQ 5.15.3版本中如何进行消息过期时间的设置,以及如何配置自动清除机制,特别是针对死信队列的处理方式。 #### 1. 消息过期设置 ##### 参数详解 - **Message 过期则客户端不能...
本文将深入探讨如何使用PHP的过滤器(selector)来接收ActiveMQ中的特定队列或主题消息。 首先,我们需要理解ActiveMQ中的队列和主题概念。队列(Queue)是点对点通信模式,每个消息只能被一个消费者接收并处理,而...
本文将深入探讨ActiveMQ中的队列(Queue)模式,包括事务、应答、转发以及MessageConsumer的receive阻塞消息处理方式。 ### 1. ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个...