`
Donald_Draper
  • 浏览: 981507 次
社区版块
存档分类
最新评论

JMS(ActiveMQ) PTP和PUB/SUB模式实例

阅读更多
深入浅出JMS(一)——JMS简介 :http://blog.csdn.net/aking21alinjuju/article/details/6051421
深入浅出JMS(二)——JMS的组成 :http://blog.csdn.net/aking21alinjuju/article/details/6071123
JMS消息详解 :http://blog.csdn.net/buoymp/article/details/784696
深入掌握JMS(一):JSM基础 :http://blog.csdn.net/zhangxs_3/article/details/4034713
深入掌握JMS(二):一个JMS例子:http://blog.csdn.net/zhangxs_3/article/details/4034775
深入掌握JMS(三):MessageListener:http://blog.csdn.net/zhangxs_3/article/details/4034788
深入掌握JMS(四):实战Queue:http://blog.csdn.net/zhangxs_3/article/details/4034801
深入掌握JMS(五):实战Topic :http://blog.csdn.net/zhangxs_3/article/details/4034811
深入掌握JMS(六):消息头 :http://blog.csdn.net/zhangxs_3/article/details/4034834
深入掌握JMS(七):DeliveryMode例子:http://blog.csdn.net/zhangxs_3/article/details/4034837
深入掌握JMS(八):JMSReplyTo :http://blog.csdn.net/zhangxs_3/article/details/4034847
JMS应用示例教程:http://qidaoxp.iteye.com/blog/480047
JMS与MQ详解:http://www.fx114.net/qa-48-91234.aspx
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在做下面的测试之前要先安装ActiveMQ,下载apache-activemq-5.12.1-bin.tar.gz包,解压后
目录如下:
[root@zabbix apache-activemq-5.12.1]# ls
activemq-all-5.12.1.jar  bin  conf  data  docs  examples  lib  LICENSE  NOTICE  README.txt  tmp  webapps  webapps-demo
[root@zabbix apache-activemq-5.12.1]# cd bin


启动ActiveMQ
[root@zabbix bin]# ./activemq start
INFO: Loading '/acivemq/apache-activemq-5.12.1//bin/env'
INFO: Using java '/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/acivemq/apache-activemq-5.12.1//data/activemq.pid' (pid ' 4047')

查看ActiveMQ状态
[root@zabbix bin]# ./activemq status
INFO: Loading '/acivemq/apache-activemq-5.12.1//bin/env'
INFO: Using java '/bin/java'
ActiveMQ is running (pid '4047')
[root@zabbix bin]# 

ActiveMQ相关进程8161控制台用户admin,密码admin(http://192.168.126.128:8161),
ActiveMq 监听端口61616
[root@zabbix bin]# netstat -ntlp | grep 4047
tcp6       0      0 :::5672                 :::*                    LISTEN      4047/java           
tcp6       0      0 :::61613                :::*                    LISTEN      4047/java           
tcp6       0      0 :::61614                :::*                    LISTEN      4047/java           
tcp6       0      0 :::61616                :::*                    LISTEN      4047/java           
tcp6       0      0 :::42614                :::*                    LISTEN      4047/java           
tcp6       0      0 :::1883                 :::*                    LISTEN      4047/java           
tcp6       0      0 :::8161                 :::*                    LISTEN      4047/java   

关闭ActiveMq,原ActiveMq配置,这个命令会报错
[root@zabbix bin]# ./activemq stop

修改conf/activemq.xml文件的如下配置即可:
 
  <!--   
         The managementContext is used to configure how ActiveMQ is exposed in   
         JMX. By default, ActiveMQ uses the MBean server that is started by   
         the JVM. For more information, see:   
      
             http://activemq.apache.org/jmx.html   
     -->  
     <managementContext>  
       <managementContext createConnector="false"/>  
     </managementContext>  

修改 createConnector="true" 然后重新启动activemq,即可


需要引入的jar包如下:




PTP生产者

package mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  生存者Producer 
 * @author donald
 *
 */
public class QueueProducer {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url =  "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
 
   public static void main(String[] args)throws Exception {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       // Connection 启动  
       connection.start();  
       System.out.println("Connection is start...");  
      //创建一个session
//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
//第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
 
       Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
       // Queue :消息的目的地;消息发送给谁.  
       Queue  destination = session.createQueue(qname);  
       // MessageProducer:消息发送者  
       MessageProducer producer = session.createProducer(destination);  
       //设置生产者的模式,有两种可选
//DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
//DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 
       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        // 构造消息,此处写死,项目就是参数,或者方法获取  
       sendMessage(session, producer);  
       session.commit();  
 
       connection.close();  
       System.out.println("send text ok.");  
   }  
     
   public static void sendMessage(Session session, MessageProducer producer)  
           throws Exception {  
       for (int i = 1; i <= 5; i++) {//有限制,达到1000就不行  
           TextMessage message = session.createTextMessage("向ActiveMq发送的Queue消息" + i);  
           // 发送消息到目的地方  
           System.out.println("发送消息:" + "ActiveMq 发送的Queue消息" + i);  
           producer.send(message);  
       }  
   }  
} 

PTP消费者1:
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  消费这Consumer
 * @author donald
 *
 */
public class QueueConsumer {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Queue destination=session.createQueue(qname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
                   TextMessage textMessage=(TextMessage)message;  
                   System.out.println("消费消息:"+textMessage.getText());  
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
}  

PTP消费者2:

public class QueueConsumer2 {  
      .... 除了下面这一句,其他与QueueConsumer相同
      System.out.println("2消费消息:"+textMessage.getText()); 
      ....
}  


控制台输出

PTP生产者:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.

PTP消费者1:

消费消息:向ActiveMq发送的Queue消息1
消费消息:向ActiveMq发送的Queue消息3
消费消息:向ActiveMq发送的Queue消息5

PTP消费者2:
2消费消息:向ActiveMq发送的Queue消息2
2消费消息:向ActiveMq发送的Queue消息4

从上面可以看出,Queue的消息,只要被消费者,消费一次


PUB/SUB生产者
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import mq.enity.Order;
import mq.enity.User;

/**
 * Topic(发布/订阅)方式  发布者Publisher  
 * @author donald
 *
 */
public class TopicPublisher {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url =  "tcp://192.168.126.128:61616";  
   private static String tname =  "testTopic";
   public static void main(String[] args)throws Exception {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       // Connection 启动  
       connection.start();  
       System.out.println("Connection is start...");  
       // Session: 一个发送或接收消息的线程  
       Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
       // Topicr :消息的目的地;消息发送给谁.  
       Topic  destination = session.createTopic(tname);  
       // MessageProducer:消息发送者  
       MessageProducer producer = session.createProducer(destination);  
       // 设置持久化,此处学习,实际根据项目决定  
       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        // 构造消息,此处写死,项目就是参数,或者方法获取  
       sendMessage(session, producer);  
       session.commit();  
 
       connection.close();  
       System.out.println("send Order ok.");  
   }  
   /**
    * 
    * @param session
    * @param producer
    * @throws Exception
    */
   public static void sendMessage(Session session, MessageProducer producer)  
           throws Exception {  
       Order order = new Order();
       order.setId(1);
       order.setAmount(150.62);
       order.setGoodsId(15);
       order.setGoodsAmount(2);
       order.setShopId(5656);
       //我们也可以将Object转换为Json String,作为TextMessage来传送,在消费再反Json String 为Obejct
       ObjectMessage orderMess = session.createObjectMessage(order);
       System.out.println("向ActiveMq:"+tname+"发送订单信息:" + "ActiveMq 发送的Topic消息"); 
       producer.send(orderMess); 
   }  
 
}  

PUB/SUB 订阅者1
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import mq.enity.Order;
import mq.enity.User;

/**
 * Topic(发布/订阅)方式  订阅者TopicSubscriber  
 * @author donald
 *
 */
public class TopicSubscriber {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String tname =  "testTopic";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Topic destination=session.createTopic(tname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
            	   ObjectMessage objMessage=(ObjectMessage)message;  
                   Order order = (Order)objMessage.getObject();
                   System.out.println("消费订单信息:"+order.toString()); 
                   
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
}  


PUB/SUB 订阅者2

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import mq.enity.Order;
import mq.enity.User;

/**
 * Topic(发布/订阅)方式  订阅者TopicSubscriber  
 * @author donald
 *
 */
public class TopicSubscriber2 {  
     ...其他与TopicSubscriber相同
     System.out.println("2消费订单信息:"+order.toString());  
     ...
              
}  


订单信息类:
package mq.enity;

import java.io.Serializable;

public class Order implements Serializable{
	
	/**
	 * 
	 */
	private static final long serialVersionUID = -343247274477730446L;
	private Integer id;//订单id
	private Double amount;
	private Integer goodsId;//商品id
	private Integer goodsAmount;//商品数量
	private Integer shopId;//店铺名
	public Integer getId() {
		return id;
	}
	public void setId(Integer id) {
		this.id = id;
	}
	public Double getAmount() {
		return amount;
	}
	public void setAmount(Double amount) {
		this.amount = amount;
	}
	public Integer getGoodsId() {
		return goodsId;
	}
	public void setGoodsId(Integer goodsId) {
		this.goodsId = goodsId;
	}
	public Integer getGoodsAmount() {
		return goodsAmount;
	}
	public void setGoodsAmount(Integer goodsAmount) {
		this.goodsAmount = goodsAmount;
	}
	public Integer getShopId() {
		return shopId;
	}
	public void setShopId(Integer shopId) {
		this.shopId = shopId;
	}
	public String toString(){
		return "订单id:"+this.id+","+"金额(元):"+this.amount+","+"商品id:"+
	          this.shopId+","+"商品数量:"+this.goodsAmount+","+"店铺id:"+this.shopId;
	}
	
}


控制台输出:

PUB/SUB生产者:

Connection is start...
向ActiveMq:testTopic发送订单信息:ActiveMq 发送的Topic消息
send Order ok.


PUB/SUB 订阅者1:

消费订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656

PUB/SUB 订阅者2:
2消费订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656
  • 大小: 4.2 KB
分享到:
评论

相关推荐

    Spring 实现远程访问详解——jms和activemq

    同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀的特性,并且支持JMS1.1和J2EE1.4。具体特性见官网:http://activemq.apache.org/ 2. 什么是JMS JMS的全称是Java ...

    JMS-ActiveMQ入门实例

    1. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point,PTP)和发布/订阅(Publish/Subscribe,Pub/Sub)。点对点模型中,消息被发送到一个队列,每个消息只能被一个消费者接收;而在发布/订阅模型中,...

    JMS模拟ActiveMQ代理服务器并实现消息收发

    JMS中有两种消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模型中,消息从一个生产者发送到一个队列,然后由一个消费者接收;而在发布/订阅模型中,消息从发布者发送...

    ActiveMQ快速上手 PDF

    - **消息模式**:主要有 PTP(Point-to-Point)和 Pub/Sub(Publish/Subscribe)两种。 - PTP:每个消息只被一个消费者消费。 - Pub/Sub:发布者发送消息到一个主题,所有订阅该主题的消费者都能接收到消息。 - **...

    ActiveMQ消息总线介绍

    JMS支持两种主要的消息传递模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。这两种模型分别适用于不同类型的应用场景。 #### 三、ActiveMQ简介 Apache ActiveMQ 是一个开源的消息...

    JMS.rar_answers_jms

    3. **两种消息类型**:JMS支持两种消息模型——点对点(Point-to-Point,PTP)和发布/订阅(Publish/Subscribe,Pub/Sub)模式。PTP模式下,每个消息只有一个消费者,而Pub/Sub模式中,一个消息可以被多个订阅者接收...

    ActiveMQ使用入门.pdf

    - **Pub/Sub (Publish/Subscribe)**:发布/订阅模型,一个消息可以被多个订阅者接收。 - **Queue**:队列,FIFO(先进先出)结构,每个消息仅由一个消费者消费。 - **Topic**:主题,发布/订阅模型中的概念,多个...

    ActiveMQ生产者

    JMS提供两种消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。 2. **ActiveMQ安装与配置** 首先,你需要从官方网站下载ActiveMQ的最新版本并解压。然后,启动ActiveMQ服务器,...

    ActiveMQ 实战

    目的地是消息的发送目标和接收来源,根据JMS1.0.2规范定义的两种消息传递域,包括点对点(PTP)消息传递域和发布/订阅(Pub/Sub)消息传递域。在点对点消息传递域中,目的地被称为队列(queue),消息只能被一个消费...

    ActiveMq 点对点 发布订阅demo

    2. **发布/订阅通信模式(Publish/Subscribe, Pub/Sub)** 在发布/订阅模式中,消息被广播到多个订阅者,而不是一对一地发送。发布者将消息发送到一个主题(Topic),而多个订阅者可以订阅该主题,从而接收消息。...

    activeMq 实战

    根据 JMS 1.0.2 规范,有两种消息传递模型:点对点 (Point-to-Point, PTP) 和发布/订阅 (Publish/Subscribe, PUB/SUB)。 - **点对点** 模型的特点包括: - 每个消息只能有一个消费者。 - 消费者和生产者之间没有...

    JMS规范培训教程 中文版

    JMS提供两种消息模式:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模式下,消息从一个单一的生产者发送到一个队列,然后被一个或多个消费者接收。而发布/订阅模式下,消息...

    从入门到精通的ActiveMQ.docx

    3. **高可用性**:通过集群配置,可以实现 ActiveMQ 实例间的故障转移和负载均衡。 4. **安全性**:ActiveMQ 支持用户身份验证和权限控制,确保消息传输的安全。 5. **Spring 整合**:ActiveMQ 可以无缝集成到 ...

    JMS学习笔记精心总结

    JMS提供两种消息模型:点对点(Point-to-Point,PTP)和发布/订阅(Publish/Subscribe,Pub/Sub)。在点对点模型中,消息由一个生产者发送到一个队列,然后由一个或多个消费者从队列中接收。发布/订阅模型中,消息由...

    一个简单的JMS客户端应用

    它支持两种消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。点对点模型基于队列,消息被一个生产者发送到队列,然后由一个消费者接收;而发布/订阅模型则涉及主题,多个订阅者...

    消息中间件介绍和代码示例

    2. 发布/订阅(Publish/Subscribe,Pub/Sub):在这种模式下,消息通过主题(Topic)进行广播。一个消息可以被多个订阅者(Subscriber)接收,每个订阅者都可以独立地消费消息。与点对点模式不同,发布者和订阅者...

    JMS开发例子.rar

    JMS提供了两种主要的消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。 1. 点对点模型:在这个模型中,消息被发送到一个称为队列(Queue)的实体。每个消息只被一个消费者接收...

Global site tag (gtag.js) - Google Analytics