`
Donald_Draper
  • 浏览: 987909 次
社区版块
存档分类
最新评论

ActiveMQ PTP模式实例二

阅读更多
这篇主要是测试PTP模式下的回复消息,具体测试代码如下:
队列生产者2(消费者回复消息):
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  生存者Producer 
 * @author donald
 *
 */
public class QueueProducer2 {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url =  "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
   private static String replyQueueName =  "replyQueue";
   static {
	  
   }
   public static void main(String[] args)throws Exception {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       // Connection 启动  
       connection.start();  
       System.out.println("Connection is start...");  
       // Session: 一个发送或接收消息的线程  
       Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
       // Queue :消息的目的地;消息发送给谁.  
       Queue  destination = session.createQueue(qname);  
       //消费者接受消息,回复消息到replyQueue队列
       Queue  replyQueue = session.createQueue(replyQueueName);
       // MessageProducer:消息发送者  
       MessageProducer producer = session.createProducer(destination);  
       // 设置持久化,此处学习,实际根据项目决定  
       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        // 构造消息,此处写死,项目就是参数,或者方法获取  
       sendMessage(session, producer,replyQueue);  
       session.commit();  
 
       connection.close();  
       System.out.println("send text ok.");  
   }  
     
   public static void sendMessage(Session session, MessageProducer producer,Queue replyQueue)  
           throws Exception {  
       for (int i = 1; i <= 5; i++) {//有限制,达到1000就不行  
           TextMessage message = session.createTextMessage("向ActiveMq发送的Queue消息" + i);  
           message.setJMSReplyTo(replyQueue);
           // 发送消息到目的地方  
           System.out.println("发送消息:" + "ActiveMq 发送的Queue消息" + i);  
           producer.send(message);  
       }  
   }  
}  


队列消费者3消费消息,并回复消息:

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  消费这Consumer
 * @author donald
 *
 */
public class QueueConsumer3 {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Queue destination=session.createQueue(qname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
                   TextMessage textMessage=(TextMessage)message;  
                   System.out.println("3消费消息:"+textMessage.getText());
                   MessageProducer producer = session.createProducer(message.getJMSReplyTo());  
                   TextMessage replyMessage = session.createTextMessage(textMessage.getText());
                   producer.send(replyMessage);  
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
}  



消费者4,消费消费者回复消息

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  消费这Consumer
 * @author donald
 *
 */
public class QueueConsumer4 {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String qname =  "replyQueue";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Queue destination=session.createQueue(qname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
                   TextMessage textMessage=(TextMessage)message;  
                   System.out.println("4消费者回复消息:"+textMessage.getText());
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
} 

开启消费则3,4监听,再启动生产者2;
控制台输出
生产者2:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.

消费者3:
3消费消息:向ActiveMq发送的Queue消息1
3消费消息:向ActiveMq发送的Queue消息2
3消费消息:向ActiveMq发送的Queue消息3
3消费消息:向ActiveMq发送的Queue消息4
3消费消息:向ActiveMq发送的Queue消息5


消费者4:
4消费者回复消息:向ActiveMq发送的Queue消息1
4消费者回复消息:向ActiveMq发送的Queue消息2
4消费者回复消息:向ActiveMq发送的Queue消息3
4消费者回复消息:向ActiveMq发送的Queue消息4
4消费者回复消息:向ActiveMq发送的Queue消息5
分享到:
评论

相关推荐

    JMS-ActiveMQ入门实例

    在众多JMS提供商中,Apache ActiveMQ是一个非常流行的开源消息代理和集成模式服务器,广泛应用于分布式系统中。 **JMS基础** 1. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point,PTP)和发布/订阅...

    ActiveMQ快速上手 PDF

    - **通过二进制包安装**:从 Apache 官方网站下载最新的 ActiveMQ 二进制包,解压后即可使用。 - **配置示例**:在 `conf` 目录下编辑 `activemq.xml` 文件来配置 ActiveMQ 的各项参数。 - **启动**:通过命令行...

    ActiveMQ使用入门.pdf

    默认情况下,ActiveMQ的管理页面可通过8161端口访问,地址为`http://localhost:8161/admin`,提供监控和管理ActiveMQ实例的功能。 4. 使用ActiveMQ进行消息通信 在实际应用中,开发者会通过编程接口与ActiveMQ交互...

    JMS ActiveMQ演示代码

    本压缩包中的"JMS ActiveMQ演示代码"提供了一个具体的实例,展示了如何在实际项目中使用ActiveMQ来实现消息传递。这个代码可以直接放入工程中运行,帮助开发者理解和实践JMS与ActiveMQ的结合使用。 **点对点(Point...

    activeMQ5.2的jar包及使用实例

    activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。

    ActiveMQ消息总线介绍

    JMS支持两种主要的消息传递模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。这两种模型分别适用于不同类型的应用场景。 #### 三、ActiveMQ简介 Apache ActiveMQ 是一个开源的消息...

    Spring 实现远程访问详解——jms和activemq

    同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀的特性,并且支持JMS1.1和J2EE1.4。具体特性见官网:http://activemq.apache.org/ 2. 什么是JMS JMS的全称是Java ...

    ActiveMQ 实战

    例如,在ActiveMQ中,可以通过实例化ActiveMQConnectionFactory来创建连接工厂。连接封装了应用程序与消息服务之间的虚拟连接。连接可以提供到服务的物理连接,并且可以通过连接工厂来创建。 会话是生产者和消费者...

    activemq-cpp开发手册

    - ActiveMQ C++ API支持多种消息传递模式,包括点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, PUB/SUB)模式,并提供了丰富的API来处理各种消息类型。 - **术语解析:** - **ActiveMQ**:Apache...

    activeMq 实战

    通常,一个应用程序会创建一个 `JMSConnection` 实例,并使用该实例来创建会话 (`Session`)。连接对象是多线程安全的,因此可以在不同的线程间共享。 ##### 1.3 会话 (Session) **会话** (`JMSSession`) 是生产和...

    ActiveMQ生产者

    本文将详细介绍如何使用ActiveMQ创建一个简单的生产者实例,以及涉及到的相关知识点。 1. **JMS简介** Java Message Service (JMS) 是一个标准接口,用于在不同的应用之间传递消息。它定义了生产者(发送消息)、...

    ActiveMq 点对点 发布订阅demo

    2. **发布/订阅通信模式(Publish/Subscribe, Pub/Sub)** 在发布/订阅模式中,消息被广播到多个订阅者,而不是一对一地发送。发布者将消息发送到一个主题(Topic),而多个订阅者可以订阅该主题,从而接收消息。...

    从入门到精通的ActiveMQ.docx

    【ActiveMQ 入门知识详解】 ActiveMQ 是一个开源的消息中间件,由 Apache 软件基金会开发。...在后续的专题中,我们将进一步探讨 ActiveMQ 的 API 使用、消息模式、集群配置以及与其他系统的整合等实战内容。

    ActiveMQ_in_Action_中文

    在ActiveMQ中,`ActiveMQConnectionFactory`是一个具体实例,负责创建与ActiveMQ服务器的连接。它是整个通信流程的起点,通过它,客户端能够获取到JMSConnection实例,进而与ActiveMQ进行交互。 #### 连接...

    activemq 集群配置文档

    #### 二、ActiveMQ集群配置方案 接下来,我们将详细介绍ActiveMQ提供的几种集群配置方案: - **2.1 客户端集群 (Customer Cluster)** - **定义**: 客户端集群主要是指多个客户端应用程序共享一组消息中间件资源的...

    ActiveMQ In Action

    #### 知识点二:Apache ActiveMQ介绍(Chapter 2) **Apache ActiveMQ**是一个开源的消息中间件,它是Apache软件基金会的一个顶级项目,支持多种消息协议如AMQP、STOMP等,并且兼容JMS标准。 - **ActiveMQ特点**:...

    ActiveMQ快速上手

    #### 二、ActiveMQ的安装与基本使用 **2.1 安装方法** - **源码安装**:下载ActiveMQ的源码包,编译并安装。 - **预编译包安装**:直接下载预编译好的ActiveMQ安装包进行安装。 **2.2 配置示例** - **修改配置...

    JMS模拟ActiveMQ代理服务器并实现消息收发

    2. Connection:建立与ActiveMQ服务器的连接。 3. Session:创建会话,设置事务性和确认模式。 4. Destination:可能是Queue或Topic,取决于所使用的消息模型。 5. MessageProducer:创建并配置用于发送消息的...

Global site tag (gtag.js) - Google Analytics