`
keezzm
  • 浏览: 14474 次
  • 性别: Icon_minigender_1
  • 来自: 广州
文章分类
社区版块
存档分类
最新评论

ActiveMQ——消息队列基础篇

阅读更多

简介

ActiveMQ Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

特性

  1.  多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2.  完全支持JMS1.1J2EE 1.4规范 (持久化,XA消息,事务)
  3. Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器( Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBCjournal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9.  支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

JMS简介

     JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

JMS服务提供者

     JMS服务提供者实现消息队列和通知,同时实现消息管理的APIJMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。

消息管理对象

     消息管理对象提供对消息进行操作的APIJMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactoryTopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

消息的生产者消费者

     消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。

JMS消息

     息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:消息头(header)、属性(property)和消息体(body)。

消息标头

消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其

中一些字段的值,其它值则由JMS提供程序填写。

JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表

示消息、设置优先权和失效时间等等,并且为消息确定路由。

 

JMSDestination Send方法设置。指定消息的目的地,由JMS提供程序填写

JMSDeliveryMode Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。

JMSMessageID Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写

JMSTimeStamp Send 方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写

JMSCorrelationID 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID

JMSReplyTo 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段

JMSRedelivered JMS提供程序设置。指出该消息先前被发送过

JMSType 由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求

JMSExpiration Send 方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息

JMSPriority Send 方法设置。包含客户端在发送消息时所设置有限级值

消息属性

消息属性,用来添加删除消息头以外的附加信息。除了上面的属性,还可以自定义属

性,以便进行消息的选择

一般通过setXXXProperty方法来定义消息属性,XXX取值为:BooleanByte

DoubleFloatIntLongObjectShortString

每一属性均由字符串名字和相关的值组成 ,例如:

TextMessage msg = tsession.createTextMessage();

msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);

String customer = msg.getStringProperty(“CUSTOMER_NAME”);

其中的”CUSTOMER_NAME””MyCustomer”就是消息当中对应的keyvalue

消息主体

消息主体包含了消息的核心数据。

JMS 定义了5中消息类型: TextMessageMapMessageBytesMessage

StreamMessageObjectMessage

选择最合适的消息类型可以使JMS最有效 的处理消息。

 

  • TextMessage(文本消息)

将数据作为简单字符串存放在主体中(XML就可以作为字符串发)

TextMessage msg = session.createTextMessage();

msg.setText(text);

有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响

移植性。

只自己定义了两个方法setText(String s)getText()

 

  • MapMessage(映射消息)

使用一张映射表来存放其主体内容(参照Jms API

MapMessage msg = session.createMapMessage();

msg.setString(“CUSTOMER_NAME”,”John”);

msg.setInt(“CUSTOMER_AGE”,12);

String s = msg.getString(“CUSTOMER_NAME”);

int age = msg.getInt(“CUSTOMER_AGE”);

 

  • BytesMessage(字节消息)

将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有

消息格式保持一致等(参照Jms API

byte[] data;

BytesMessage msg = session.createBytesMessage();

msg.wirte(data);

byte[] msgData = new byte[256];

int bytesRead = msg.readBytes(msgData);

 

  •  StreamMessage(流消息)

用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这种

消息格式时,收发双方事先协商好字段的顺序,以保证写读顺序相同(参照Jms API

StringMessage msg = session.createStreamMessage();

msg.writeString(“John”);

msg.writeInt(12);

String s = msg.readString();

Int age = msg.readInt();

PS:个人认为有点像socket的信息收发)

 

  •  ObjectMessage(对象消息)

用于往消息中写入可序列化的对象。

消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个

集合写入消息。

客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写

message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写

自己只单独定义了两个方法:getObject()setObject(Serializable s)

ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessagebody无效 (从两个方法可以看出,这种消息已经强制要你实现java.io. Serializable接口)

Message只读时被set抛出MessageNotWriteableException;

setget时,如果对象序列化失败抛出MessageFormatException

消息的通信方式(点对点通信和发布/订阅方式)

点对点方式(point-to-point

点对点的消息发送方式主要建立在 Message QueueSenderReceiver上,

Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue , Receiver ClientQueue中接收消息和"发送消息已接受"Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。

发布/订阅方式(publish/subscriber Messaging

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户

端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destinationreceive方法,和实现message listener 接口的onMessage 方法。

编程模式

消息产生者向JMS发送消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection建立会话Session
  4. 使用会话Session和管理对象Destination创建消息生产者MessageSender
  5. 使用消息生产者MessageSender发送消息

消息消费者从JMS接受消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection 建立会话Session
  4. 使用会话Session和管理对象Destination创建消息消费者MessageReceiver
  5. 使用消息消费者MessageReceiver接受消息,需要用setMessageListenerMessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

ActiveMQ运行

ActiveMQ5.3版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于

监控ActiveMQadmin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。

打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有

消息,如下截图



  

简单的例子

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Test {
	public static void main(String[] args) throws JMSException {
		new Receiver().beginReceiveMsg();
		new Sender().send();
	}
}

class Sender {

	static String subject = "TEST";
	static String user = ActiveMQConnection.DEFAULT_USER;
	static String password = ActiveMQConnection.DEFAULT_PASSWORD;
	// failover://tcp://localhost:61616
	static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	boolean transacted = true;
	boolean persistent = true;
	int ackMode = Session.AUTO_ACKNOWLEDGE;

	Session session;
	Destination destination;
	MessageProducer producer;

	public Sender() throws JMSException {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		Connection connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(transacted, ackMode);
		destination = session.createQueue(subject);
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
	}

	public void send() throws JMSException {
		MapMessage message = session.createMapMessage();
		message.setString("test", "test");
		message.setStringProperty("RECEIVER_CHAR", " TEST ");
		producer.send(message);
		session.commit();
	}
}

class Receiver implements MessageListener {

	static String subject = "TEST";
	static String user = ActiveMQConnection.DEFAULT_USER;
	static String password = ActiveMQConnection.DEFAULT_PASSWORD;
	// failover://tcp://localhost:61616
	static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	boolean transacted = true;
	int ackMode = Session.AUTO_ACKNOWLEDGE;

	Session session;
	Destination destination;
	MessageConsumer consumer;

	public Receiver() throws JMSException {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				user, password, url);
		Connection connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(transacted, ackMode);
		destination = session.createQueue(subject);
		consumer = session.createConsumer(destination, "RECEIVER_CHAR = TEST");
	}

	public void beginReceiveMsg() throws JMSException {
		consumer.setMessageListener(this);
	}

	@Override
	public void onMessage(Message message) {
		if (message instanceof MapMessage) {
			MapMessage msg = (MapMessage) message;
			try {
				System.out.println(msg.getString("test"));
			} catch (JMSException e) {
				try {
					session.rollback();
				} catch (JMSException e1) {
					e1.printStackTrace();
				}
				e.printStackTrace();
			} finally {
				if (session != null) {
					try {
						session.close();
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		}
	}
}

其他产品

1、开源JMS供应商

  • jbossmq(jboss 4)
  • jboss messaging (jboss 5)
  • joram-4.3.21 2006-09-22
  • openjms-0.7.7-alpha-3.zip December 26, 2005
  • mantamq
  • ubermq
  • SomnifugiJMS 2005-7-27

2、商业JMS供应商

  • IBM WebSphere MQ
  • BEA WebLogic JMS
  • Oracle AQ
  • NonStop Server for Java Message Service(JMS)
  • Sun Java System Message Queue
  • Sonic jms
  • TIBCO Enterprise For JMS
  • iLinkMQ (国内)
  • TongLink/Q(北京东方通科技)

 

 

  • 大小: 33.4 KB
分享到:
评论
2 楼 asuschb 2011-06-30  
对于初学者学习MQ还是有帮助的
1 楼 zhendell 2011-06-29  
不符合直观思维

相关推荐

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ...

    JMS之Spring +activeMQ实现消息队列

    在本篇讨论中,我们将关注如何利用Spring框架和ActiveMQ来实现JMS消息队列。 首先,我们需要理解Spring框架中的JMS支持。Spring提供了一个强大的抽象层,简化了与各种MQ提供商(如ActiveMQ、RabbitMQ等)的集成。它...

    消息队列:ActiveMQ:消息队列的生产者与消费者模型.docx

    消息队列:ActiveMQ:消息队列的生产者与消费者模型.docx

    7道消息队列ActiveMQ面试题!

    ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS...了解和掌握这些知识点,有助于面试者在面试中展示对ActiveMQ的深入理解和实际应用能力,同时也是确保在日常开发工作中正确、高效使用消息队列的重要基础。

    SpringBoot快速玩转ActiveMQ消息队列

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。...通过理解和实践上述知识点,你将能够熟练地在SpringBoot项目中运用ActiveMQ实现消息队列的功能。

    ActiveMQ消息队列主题订阅Spring整合

    ActiveMQ中的生产者(Producer)发送消息到队列或主题(Topic),而消费者(Consumer)则从这些队列或主题中接收消息。队列遵循FIFO(先进先出)原则,每个消息只能被一个消费者接收;主题则支持多播,多个订阅者...

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386

    activemq配置组合队列(复制)、负载均衡

    在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它负责在不同服务之间异步传输数据,从而提高系统的响应速度和可扩展性。ActiveMQ是Apache出品的一款开源、高性能、支持多种协议的消息中间件...

    Spring boot+ActiveMQ整合消息队列实现发布订阅、生产者消费者模型(适合开发人员了解学习ActiveMQ机制)

    本项目基于Spring这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。 包含的特性如下: 1.开启activeMQ,访问...

    spring 整合activemq实现自定义动态消息队列

    百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...

    MSMQ、RabbitMQ、ActiveMQ消息队列调试工具

    可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。

    ActiveMQ 消息队列

    ### ActiveMQ 消息队列概述 #### 一、ActiveMQ简介 ActiveMQ是由Apache基金会提供的一个开源消息中间件,其作为业界最成熟且功能强大的消息总线之一,在分布式系统和微服务架构中发挥着核心作用。ActiveMQ遵循Java...

    Spring+ActiveMQ消息队列+前台接收消息

    在本教程中,我们将探讨如何整合Spring框架与ActiveMQ消息队列,实现前后台的消息传递。这有助于提升系统的可扩展性和响应速度,降低不同组件之间的耦合度。 首先,Spring框架是Java企业级应用开发的事实标准,它...

    activeMq消息队列demo

    在这个"activeMq消息队列demo"中,我们可以学习到如何在实际开发环境中运用ActiveMQ。 1. **ActiveMQ基本概念** - **消息队列**:是一种先进先出(FIFO)的数据结构,消息按照发送顺序被存储和消费。 - **生产者*...

    消息队列介绍和SpringBoot2.x整合RockketMQ、ActiveMQ

    **消息队列基础** 在分布式系统中,消息队列(Message Queue, MQ)扮演着重要的角色,它是一种异步通信机制,能够有效地解耦应用程序,提高系统的可扩展性和容错性。消息队列允许生产者将消息发送到队列,然后由...

    ActiveMQ RabbitMQ RokcetMQ 消息队列中间件视频教程

    作为消息中间件的MQ在java开发中起着举足轻重的地位,无论是ActiveMQ、RabbitMQ、还是RokcetMQ至少要会一个,否则别说自己是java程序员。Java自学网整理了目前行业最常用的消息中间件视频供大家学习。

    ActiveMQ消息过期时间设置和自动清除解决方案

    本文档详细介绍了在Apache ActiveMQ 5.15.3版本中如何进行消息过期时间的设置,以及如何配置自动清除机制,特别是针对死信队列的处理方式。 #### 1. 消息过期设置 ##### 参数详解 - **Message 过期则客户端不能...

    PHP过滤(selector)接收ActiveMQ的指定队列或者主题消息

    本文将深入探讨如何使用PHP的过滤器(selector)来接收ActiveMQ中的特定队列或主题消息。 首先,我们需要理解ActiveMQ中的队列和主题概念。队列(Queue)是点对点通信模式,每个消息只能被一个消费者接收并处理,而...

    ActiveMQ的队列queue模式(事务、应答、转发模式、阻塞消息)

    本文将深入探讨ActiveMQ中的队列(Queue)模式,包括事务、应答、转发以及MessageConsumer的receive阻塞消息处理方式。 ### 1. ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个...

Global site tag (gtag.js) - Google Analytics