使用场景:
1.通过JMS facade更新数据库,完全解耦了客户端和服务器端处理过程。
2.与遗留老式系统接口
3.应付大访问量交易系统
4.协同多系统之间处理效率
5、数据同步等
ActiveMQ 是Apache出品,是目前最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
特性及优势
1 、实现 JMS1.1 规范,支持 J2EE1.4 以上
2 、可运行于任何 jvm 和大部分 web 容器( ActiveMQ works great in any JVM )
3 、支持多种语言客户端( java, C, C++, AJAX, ACTIONSCRIPT 等等)
4 、支持多种协议( stomp , openwire , REST )
5 、良好的 spring 支持( ActiveMQ has great Spring Support )
6 、速度很快, JBossMQ 的十倍( ActiveMQ is very fast; often 10x faster than JBossMQ. )
7 、与 OpenJMS 、 JbossMQ 等开源 jms provider 相比, ActiveMQ 有 Apache 的支持,持续发展的优势明显。
2、ActiveMQ中的两种消息处理模式
Queue 与 Topic 的比较
1 、 JMS Queue 执行 load balancer 语义:
一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
2 、 Topic 实现 publish 和 subscribe 语义:
一条消息被 publish 时,它将发到所有感兴趣的订阅者,所以零到多个 subscriber 将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的 subscriber 能够获得消息的一个拷贝。
3 、分别对应两种消息模式:
Point-to-Point ( 点对点 ),Publisher/Subscriber Model ( 发布 / 订阅者 )
其中在 Publicher/Subscriber 模式下又有 Nondurable subscription (非持久订阅)和 durable subscription ( 持久化订阅 )2 种消息处理方式。
接下来开始MQ的实例
1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/
2.运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。
3、配置conf\activemq.xml web管理界面启动IP和端口
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
</transportConnectors>
4、配置conf\jetty.xml 通讯端口
<property name="connectors">
<list>
<bean id="Connector"
class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8161" />
</bean>
</list>
</property>
注:一般保持默认 不修改8161端口
5、在eclipse中新建项目,将apache-activemq-5.5.1\lib中所需要的jar包复制到项目中.
5.1 Class Sender
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
public static void main(String[] args) throws JMSException{
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//连接到JMS提供者
Connection conn = connFactory.createConnection();
conn.start();
//事务性会话,自动确认消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的目的地
Destination destination = session.createQueue("queue.hello");
//消息生产者
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化
//发送消息
sendText(session, producer);
session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地
producer.close();
session.close();
conn.close();
}
//对象消息
public static void sendObject(Session session,MessageProducer producer) throws JMSException{
User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(user);
producer.send(objectMessage);
}
//字节消息
public static void sendBytes(Session session,MessageProducer producer) throws JMSException{
String s = "BytesMessage字节消息";
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(s.getBytes());
producer.send(bytesMessage);
}
//流消息
public static void setStream(Session session,MessageProducer producer)throws JMSException{
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("streamMessage流消息");
streamMessage.writeLong(55);
producer.send(streamMessage);
}
//键值对消息
public static void sendMap(Session session,MessageProducer producer)throws JMSException{
MapMessage mapMessage = session.createMapMessage();
mapMessage.setLong("age", new Long(32));
mapMessage.setDouble("sarray", new Double(5867.15));
mapMessage.setString("username", "键值对消息");
producer.send(mapMessage);
}
//文本消息
public static void sendText(Session session,MessageProducer producer)throws JMSException{
TextMessage textMessage = session.createTextMessage("文本消息");
producer.send(textMessage);
}
}
5.2 Class Receiver
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver implements MessageListener{
private boolean stop = false;
public void execute() throws Exception {
//连接工厂
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//连接到JMS提供者
Connection conn = connFactory.createConnection();
conn.start();
//事务性会话,自动确认消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的来源地
Destination destination = session.createQueue("queue.hello");
//消息消费者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
//等待接收消息
while(!stop){
Thread.sleep(5000);
}
session.commit();
consumer.close();
session.close();
conn.close();
}
public void onMessage(Message m) {
try{
if(m instanceof TextMessage){ //接收文本消息
TextMessage message = (TextMessage)m;
System.out.println(message.getText());
}else if(m instanceof MapMessage){ //接收键值对消息
MapMessage message = (MapMessage)m;
System.out.println(message.getLong("age"));
System.out.println(message.getDouble("sarray"));
System.out.println(message.getString("username"));
}else if(m instanceof StreamMessage){ //接收流消息
StreamMessage message = (StreamMessage)m;
System.out.println(message.readString());
System.out.println(message.readLong());
}else if(m instanceof BytesMessage){ //接收字节消息
byte[] b = new byte[1024];
int len = -1;
BytesMessage message = (BytesMessage)m;
while((len=message.readBytes(b))!=-1){
System.out.println(new String(b, 0, len));
}
}else if(m instanceof ObjectMessage){ //接收对象消息
ObjectMessage message = (ObjectMessage)m;
User user = (User)message.getObject();
System.out.println(user.getName() + " _ " + user.getInfo());
}else{
System.out.println(m);
}
stop = true;
}catch(JMSException e){
stop = true;
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
Receiver r = new Receiver();
r.execute();
}
}
5.3、实体对象User,必须实现Serializable
import java.io.Serializable;
@SuppressWarnings("serial")
public class User implements Serializable{
private String name;
private String info;
public User(String name,String info){
this.name = name;
this.info = info;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
}
5.4、运行serder 与receiver......同事可以登录mq后台管理界面,看到相应的消息信息
http://ip地址:8161
分享到:
相关推荐
1. **PPTX文件(activemq.pptx)** - 这通常是一个演示文稿,详细介绍了JMS和ActiveMQ的基础知识、工作原理以及如何使用它们。它可能包含概念解释、架构图、配置示例和使用步骤等内容,帮助学习者理解ActiveMQ的核心...
ActiveMQ 是一个流行的开源JMS提供者,它支持多种协议,如AMQP、STOMP和OpenWire等。 在"Apache Camel JMS ActiveMQ"的使用样例中,我们有两个主要的场景: 1. **从本地读取信息推送到MQ中**:这一部分涉及到了...
**ActiveMQ** 是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,提供了多种协议的支持,如AMQP、STOMP、MQTT等。ActiveMQ因其高效、稳定和易用的特点,被广泛应用在企业级的消息传递系统中。 本入门...
这个压缩包"apache-activemq-5.8.0-bin.zip"包含了ActiveMQ 5.8.0版本的二进制发行版,供用户在本地计算机上安装和运行。 1. **Apache ActiveMQ简介** - Apache ActiveMQ是业界广泛使用的消息代理,提供可靠的消息...
这个“apache-activemq-5.9.0-bin”压缩包包含了Apache ActiveMQ 5.9.0版本的完整二进制文件,用于在本地或网络环境中安装和运行。 Apache ActiveMQ的核心功能包括: 1. **消息队列**:ActiveMQ支持多种消息模式,...
配置文件位于`/opt/apache-activemq-5.x.x/conf`目录下,主要关注`activemq.xml`。这个文件定义了ActiveMQ的核心配置,包括消息存储、网络连接等。根据需求,你可以调整这些设置以优化性能或安全性。 3. **启动和...
在本文中,我们将深入探讨Apache ActiveMQ,特别是针对“apache-activemq-5.15.9-bin.zip”这个版本在Windows平台上作为MQTT服务器的使用。 首先,让我们理解MQTT(Message Queuing Telemetry Transport)。MQTT是...
这个"apache-activemq-5.15.8-bin.zip"文件包含了ActiveMQ的可执行版本,用于在本地计算机上安装和运行ActiveMQ服务。 首先,我们需要了解ActiveMQ的核心概念。它是一个消息代理,扮演着消息生产者与消费者之间的...
标题"apache-activemq-5.16.5"指的是该软件的一个特定版本,即5.16.5版本,通常每个新版本都会包含错误修复、性能提升以及新功能的添加。 描述中提到"启动要求jdk版本8+", 这意味着在运行Apache ActiveMQ 5.16.5...
这个"apache-activemq-5.16.6-bin.zip"文件包含了ActiveMQ的最新稳定版本5.16.6的二进制发行版,主要用于在各种环境中部署和运行。 **Apache ActiveMQ核心概念** 1. **消息队列(Message Queue)**: 消息队列是...
这个`apache-activemq-5.13.2-bin.tar.gz`压缩包包含了ActiveMQ的可执行版本,适用于运行在Linux环境下的Java应用程序。版本号5.13.2意味着这是该软件的特定稳定版本,它可能包含了bug修复和性能优化。 ActiveMQ的...
这个"apache-activemq-5.14.3-bin.zip"压缩包包含了在Windows环境下部署和运行ActiveMQ所需的所有文件。让我们深入探讨一下这个版本的ActiveMQ及其在Java消息服务中的应用。 首先,Java消息服务(JMS)是一种标准...
ActiveMQ是JMS的一个具体实现,支持JMS的两种消息模型。ActiveMQ使用AMQP协议集成多平台应用,使用STOMP协议通过websockets在Web应用程序之间交换消息,使用MQTT协议管理物联网设备 ActiviMq消息队列,可解决服务解...
ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种语言编写客户端 2、对spring的支持,很容易和spring整合 3、支持多种传输协议:TCP,SSL,NIO,UDP等 4、支持...
这个压缩包“apache-activemq-5.15.3-bin.tar.gz”包含了Apache ActiveMQ 5.15.3版本的源代码和可执行文件,适合在Linux环境下部署和使用。 **1. Apache ActiveMQ简介** Apache ActiveMQ是Apache软件基金会的一个...
这个“apache-activemq-5.15.15二进制包,安装包”包含了运行和配置ActiveMQ所需的所有组件,方便用户在本地计算机或服务器上快速部署和使用。该版本5.15.15是Apache ActiveMQ的一个稳定版本,提供了许多增强的功能...
在描述中提到的`apache-activemq-5.9.0`是ActiveMQ的一个特定版本,发布于2014年,这个版本包括了对32位和64位系统的支持,确保了在不同硬件环境下的兼容性。ActiveMQ 5.9.0版本提供了许多增强功能和改进,包括性能...
在你提到的`apache-activemq-5.15.7-bin`压缩包中,包含的是ActiveMQ的可执行版本,无需安装,解压后即可使用,这为用户提供了便捷的部署方式。 **ActiveMQ核心概念与功能:** 1. **消息队列(Message Queue)**:...