`

JMS-activeMQ

阅读更多
使用场景:
1.通过JMS facade更新数据库,完全解耦了客户端和服务器端处理过程。
2.与遗留老式系统接口
3.应付大访问量交易系统
4.协同多系统之间处理效率
5、数据同步等

ActiveMQ 是Apache出品,是目前最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

特性及优势
1 、实现 JMS1.1 规范,支持 J2EE1.4 以上
2 、可运行于任何 jvm 和大部分 web 容器( ActiveMQ works great in any JVM )
3 、支持多种语言客户端( java, C, C++, AJAX, ACTIONSCRIPT 等等)
4 、支持多种协议( stomp , openwire , REST )
5 、良好的 spring 支持( ActiveMQ has great Spring Support )
6 、速度很快, JBossMQ 的十倍( ActiveMQ is very fast; often 10x faster than JBossMQ. )
7 、与 OpenJMS 、 JbossMQ 等开源 jms provider 相比, ActiveMQ 有 Apache 的支持,持续发展的优势明显。



2、ActiveMQ中的两种消息处理模式
 Queue 与 Topic 的比较
1 、 JMS Queue 执行 load balancer 语义:
一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
2 、 Topic 实现 publish 和 subscribe 语义:
一条消息被 publish 时,它将发到所有感兴趣的订阅者,所以零到多个 subscriber 将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的 subscriber 能够获得消息的一个拷贝。
3 、分别对应两种消息模式:
Point-to-Point ( 点对点 ),Publisher/Subscriber Model ( 发布 / 订阅者 )
其中在 Publicher/Subscriber 模式下又有 Nondurable subscription (非持久订阅)和 durable subscription ( 持久化订阅 )2 种消息处理方式。


接下来开始MQ的实例
1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

3、配置conf\activemq.xml  web管理界面启动IP和端口
  
 <transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61616"/>
     </transportConnectors>

4、配置conf\jetty.xml  通讯端口
 
  <property name="connectors">
      <list>
        <bean id="Connector"  
               class="org.eclipse.jetty.server.nio.SelectChannelConnector">
            <property name="port" value="8161" />
         </bean>
       </list>
     </property>
     注:一般保持默认 不修改8161端口
     

5、在eclipse中新建项目,将apache-activemq-5.5.1\lib中所需要的jar包复制到项目中.
   5.1 Class Sender
     import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
	
	public static void main(String[] args) throws JMSException{
		ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
		        ActiveMQConnection.DEFAULT_USER,  
		        ActiveMQConnection.DEFAULT_PASSWORD,  
		        "tcp://localhost:61616");  
		  
		//连接到JMS提供者  
		Connection conn = connFactory.createConnection();  
		conn.start();  
		  
		//事务性会话,自动确认消息  
		Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
		  
		//消息的目的地  
		Destination destination = session.createQueue("queue.hello");  
		  
		//消息生产者  
		MessageProducer producer = session.createProducer(destination);  
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化
		
		//发送消息
		sendText(session, producer);
		
		session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地  
		producer.close();  
		session.close();  
		conn.close();  
	}
	
	
	//对象消息  
	public static void sendObject(Session session,MessageProducer producer) throws JMSException{
		User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口  
		ObjectMessage objectMessage = session.createObjectMessage();  
		objectMessage.setObject(user);  
		producer.send(objectMessage);  
	}
	
	//字节消息  
	public static void sendBytes(Session session,MessageProducer producer) throws JMSException{
		String s = "BytesMessage字节消息";  
		BytesMessage bytesMessage = session.createBytesMessage();  
		bytesMessage.writeBytes(s.getBytes());  
		producer.send(bytesMessage);  
	}
	 
	//流消息  
	public static void setStream(Session session,MessageProducer producer)throws JMSException{
		StreamMessage streamMessage = session.createStreamMessage();  
		streamMessage.writeString("streamMessage流消息");  
		streamMessage.writeLong(55);  
		producer.send(streamMessage);  
	}
	
	//键值对消息  
	public static void sendMap(Session session,MessageProducer producer)throws JMSException{
		MapMessage mapMessage = session.createMapMessage();  
		mapMessage.setLong("age", new Long(32));  
		mapMessage.setDouble("sarray", new Double(5867.15));  
		mapMessage.setString("username", "键值对消息");  
		producer.send(mapMessage);  
	}
	
	//文本消息  
	public static void sendText(Session session,MessageProducer producer)throws JMSException{  
		TextMessage textMessage = session.createTextMessage("文本消息");  
		producer.send(textMessage);  
	}
} 


5.2 Class Receiver
 
  import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

    public class Receiver implements MessageListener{  
        private boolean stop = false;  
          
        public void execute() throws Exception {  
            //连接工厂  
            ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                    ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD,  
                    "tcp://localhost:61616");  
              
            //连接到JMS提供者  
            Connection conn = connFactory.createConnection();  
            conn.start();  
              
            //事务性会话,自动确认消息  
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
              
            //消息的来源地  
            Destination destination = session.createQueue("queue.hello");  
              
            //消息消费者  
            MessageConsumer consumer = session.createConsumer(destination);  
            consumer.setMessageListener(this);  
              
            //等待接收消息  
            while(!stop){  
                Thread.sleep(5000);  
            }  
              
            session.commit();  
              
            consumer.close();  
            session.close();  
            conn.close();  
        }  
      
        public void onMessage(Message m) {  
            try{  
                if(m instanceof TextMessage){ //接收文本消息  
                    TextMessage message = (TextMessage)m;  
                    System.out.println(message.getText());  
                }else if(m instanceof MapMessage){ //接收键值对消息  
                    MapMessage message = (MapMessage)m;  
                    System.out.println(message.getLong("age"));  
                    System.out.println(message.getDouble("sarray"));  
                    System.out.println(message.getString("username"));  
                }else if(m instanceof StreamMessage){ //接收流消息  
                    StreamMessage message = (StreamMessage)m;  
                    System.out.println(message.readString());  
                    System.out.println(message.readLong());  
                }else if(m instanceof BytesMessage){ //接收字节消息  
                    byte[] b = new byte[1024];  
                    int len = -1;  
                    BytesMessage message = (BytesMessage)m;  
                    while((len=message.readBytes(b))!=-1){  
                        System.out.println(new String(b, 0, len));  
                    }  
                }else if(m instanceof ObjectMessage){ //接收对象消息  
                    ObjectMessage message = (ObjectMessage)m;  
                    User user = (User)message.getObject();  
                    System.out.println(user.getName() + " _ " + user.getInfo());  
                }else{  
                    System.out.println(m);  
                }  
                  
                stop = true;  
            }catch(JMSException e){  
                stop = true;  
                e.printStackTrace();  
            }  
        }  
        
        
        public static void main(String[] args) throws Exception{
        	Receiver r = new Receiver();
        	r.execute();
        	
        }
    }  


  5.3、实体对象User,必须实现Serializable
    import java.io.Serializable;

@SuppressWarnings("serial")
public class User implements Serializable{
	private String name;
	private String info;
   
	public User(String name,String info){
		this.name = name;
		this.info = info;
	}
	
	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getInfo() {
		return info;
	}

	public void setInfo(String info) {
		this.info = info;
	}

	 
}


5.4、运行serder 与receiver......同事可以登录mq后台管理界面,看到相应的消息信息
http://ip地址:8161


分享到:
评论

相关推荐

    JMS-activemq 实例(分ppt,eclipse工程,说明三部分)

    1. **PPTX文件(activemq.pptx)** - 这通常是一个演示文稿,详细介绍了JMS和ActiveMQ的基础知识、工作原理以及如何使用它们。它可能包含概念解释、架构图、配置示例和使用步骤等内容,帮助学习者理解ActiveMQ的核心...

    ApacheCamel-JMS-ActiveMQ

    ActiveMQ 是一个流行的开源JMS提供者,它支持多种协议,如AMQP、STOMP和OpenWire等。 在"Apache Camel JMS ActiveMQ"的使用样例中,我们有两个主要的场景: 1. **从本地读取信息推送到MQ中**:这一部分涉及到了...

    JMS-ActiveMQ入门实例

    **ActiveMQ** 是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,提供了多种协议的支持,如AMQP、STOMP、MQTT等。ActiveMQ因其高效、稳定和易用的特点,被广泛应用在企业级的消息传递系统中。 本入门...

    apache-activemq-5.8.0-bin.zip

    这个压缩包"apache-activemq-5.8.0-bin.zip"包含了ActiveMQ 5.8.0版本的二进制发行版,供用户在本地计算机上安装和运行。 1. **Apache ActiveMQ简介** - Apache ActiveMQ是业界广泛使用的消息代理,提供可靠的消息...

    apache-activemq-5.9.0-bin

    这个“apache-activemq-5.9.0-bin”压缩包包含了Apache ActiveMQ 5.9.0版本的完整二进制文件,用于在本地或网络环境中安装和运行。 Apache ActiveMQ的核心功能包括: 1. **消息队列**:ActiveMQ支持多种消息模式,...

    apache-activemq-5.15.9.rar

    在本文中,我们将深入探讨Apache ActiveMQ,特别是针对“apache-activemq-5.15.9-bin.zip”这个版本在Windows平台上作为MQTT服务器的使用。 首先,让我们理解MQTT(Message Queuing Telemetry Transport)。MQTT是...

    apache-activemq-5.15.8-bin.zip

    这个"apache-activemq-5.15.8-bin.zip"文件包含了ActiveMQ的可执行版本,用于在本地计算机上安装和运行ActiveMQ服务。 首先,我们需要了解ActiveMQ的核心概念。它是一个消息代理,扮演着消息生产者与消费者之间的...

    apache-activemq-5.16.6-bin.zip

    这个"apache-activemq-5.16.6-bin.zip"文件包含了ActiveMQ的最新稳定版本5.16.6的二进制发行版,主要用于在各种环境中部署和运行。 **Apache ActiveMQ核心概念** 1. **消息队列(Message Queue)**: 消息队列是...

    apache-activemq-5.13.2-bin.tar.gz

    这个`apache-activemq-5.13.2-bin.tar.gz`压缩包包含了ActiveMQ的可执行版本,适用于运行在Linux环境下的Java应用程序。版本号5.13.2意味着这是该软件的特定稳定版本,它可能包含了bug修复和性能优化。 ActiveMQ的...

    apache-activemq-5.14.3-bin.zip

    这个"apache-activemq-5.14.3-bin.zip"压缩包包含了在Windows环境下部署和运行ActiveMQ所需的所有文件。让我们深入探讨一下这个版本的ActiveMQ及其在Java消息服务中的应用。 首先,Java消息服务(JMS)是一种标准...

    apache-activemq-5.16.5

    标题"apache-activemq-5.16.5"指的是该软件的一个特定版本,即5.16.5版本,通常每个新版本都会包含错误修复、性能提升以及新功能的添加。 描述中提到"启动要求jdk版本8+", 这意味着在运行Apache ActiveMQ 5.16.5...

    apache-activemq-5.16.5-bin.tar.gz 下载(5积分)

    ActiveMQ是JMS的一个具体实现,支持JMS的两种消息模型。ActiveMQ使用AMQP协议集成多平台应用,使用STOMP协议通过websockets在Web应用程序之间交换消息,使用MQTT协议管理物联网设备 ActiviMq消息队列,可解决服务解...

    apache-activemq-5.15.3-bin.tar.gz

    这个压缩包“apache-activemq-5.15.3-bin.tar.gz”包含了Apache ActiveMQ 5.15.3版本的源代码和可执行文件,适合在Linux环境下部署和使用。 **1. Apache ActiveMQ简介** Apache ActiveMQ是Apache软件基金会的一个...

    apache-activemq-5.15.15二进制包,安装包

    这个“apache-activemq-5.15.15二进制包,安装包”包含了运行和配置ActiveMQ所需的所有组件,方便用户在本地计算机或服务器上快速部署和使用。该版本5.15.15是Apache ActiveMQ的一个稳定版本,提供了许多增强的功能...

    apache-activemq-5.15.0-bin.tar.7z

    ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种语言编写客户端 2、对spring的支持,很容易和spring整合 3、支持多种传输协议:TCP,SSL,NIO,UDP等 4、支持...

    apache-activemq-5.9.0 下载

    在描述中提到的`apache-activemq-5.9.0`是ActiveMQ的一个特定版本,发布于2014年,这个版本包括了对32位和64位系统的支持,确保了在不同硬件环境下的兼容性。ActiveMQ 5.9.0版本提供了许多增强功能和改进,包括性能...

    apache-activemq-5.15.7-bin

    在你提到的`apache-activemq-5.15.7-bin`压缩包中,包含的是ActiveMQ的可执行版本,无需安装,解压后即可使用,这为用户提供了便捷的部署方式。 **ActiveMQ核心概念与功能:** 1. **消息队列(Message Queue)**:...

    apache-activemq-5.3.1-bin.tar.gz

    这个压缩包“apache-activemq-5.3.1-bin.tar.gz”是针对Linux/Unix系统的二进制发行版,通常包含运行和管理ActiveMQ所需的所有文件。 **1. ActiveMQ简介** Apache ActiveMQ是一个基于标准的消息中间件,它实现了JMS...

Global site tag (gtag.js) - Google Analytics