- 浏览: 987909 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
这篇主要是测试PTP模式下的回复消息,具体测试代码如下:
队列生产者2(消费者回复消息):
队列消费者3消费消息,并回复消息:
消费者4,消费消费者回复消息
开启消费则3,4监听,再启动生产者2;
控制台输出
生产者2:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.
消费者3:
3消费消息:向ActiveMq发送的Queue消息1
3消费消息:向ActiveMq发送的Queue消息2
3消费消息:向ActiveMq发送的Queue消息3
3消费消息:向ActiveMq发送的Queue消息4
3消费消息:向ActiveMq发送的Queue消息5
消费者4:
4消费者回复消息:向ActiveMq发送的Queue消息1
4消费者回复消息:向ActiveMq发送的Queue消息2
4消费者回复消息:向ActiveMq发送的Queue消息3
4消费者回复消息:向ActiveMq发送的Queue消息4
4消费者回复消息:向ActiveMq发送的Queue消息5
队列生产者2(消费者回复消息):
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue(点对点)方式 生存者Producer * @author donald * */ public class QueueProducer2 { private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://192.168.126.128:61616"; private static String qname = "testQueue"; private static String replyQueueName = "replyQueue"; static { } public static void main(String[] args)throws Exception { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); // Connection 启动 connection.start(); System.out.println("Connection is start..."); // Session: 一个发送或接收消息的线程 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); // Queue :消息的目的地;消息发送给谁. Queue destination = session.createQueue(qname); //消费者接受消息,回复消息到replyQueue队列 Queue replyQueue = session.createQueue(replyQueueName); // MessageProducer:消息发送者 MessageProducer producer = session.createProducer(destination); // 设置持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer,replyQueue); session.commit(); connection.close(); System.out.println("send text ok."); } public static void sendMessage(Session session, MessageProducer producer,Queue replyQueue) throws Exception { for (int i = 1; i <= 5; i++) {//有限制,达到1000就不行 TextMessage message = session.createTextMessage("向ActiveMq发送的Queue消息" + i); message.setJMSReplyTo(replyQueue); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的Queue消息" + i); producer.send(message); } } }
队列消费者3消费消息,并回复消息:
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue(点对点)方式 消费这Consumer * @author donald * */ public class QueueConsumer3 { private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://192.168.126.128:61616"; private static String qname = "testQueue"; public static void main(String[] args) throws Exception{ // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. Queue destination=session.createQueue(qname); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener(){//有事务限制 @Override public void onMessage(Message message) { try { TextMessage textMessage=(TextMessage)message; System.out.println("3消费消息:"+textMessage.getText()); MessageProducer producer = session.createProducer(message.getJMSReplyTo()); TextMessage replyMessage = session.createTextMessage(textMessage.getText()); producer.send(replyMessage); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); /* 另外一种接受方式 * while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } }*/ } }
消费者4,消费消费者回复消息
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue(点对点)方式 消费这Consumer * @author donald * */ public class QueueConsumer4 { private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://192.168.126.128:61616"; private static String qname = "replyQueue"; public static void main(String[] args) throws Exception{ // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. Queue destination=session.createQueue(qname); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener(){//有事务限制 @Override public void onMessage(Message message) { try { TextMessage textMessage=(TextMessage)message; System.out.println("4消费者回复消息:"+textMessage.getText()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); /* 另外一种接受方式 * while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } }*/ } }
开启消费则3,4监听,再启动生产者2;
控制台输出
生产者2:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.
消费者3:
3消费消息:向ActiveMq发送的Queue消息1
3消费消息:向ActiveMq发送的Queue消息2
3消费消息:向ActiveMq发送的Queue消息3
3消费消息:向ActiveMq发送的Queue消息4
3消费消息:向ActiveMq发送的Queue消息5
消费者4:
4消费者回复消息:向ActiveMq发送的Queue消息1
4消费者回复消息:向ActiveMq发送的Queue消息2
4消费者回复消息:向ActiveMq发送的Queue消息3
4消费者回复消息:向ActiveMq发送的Queue消息4
4消费者回复消息:向ActiveMq发送的Queue消息5
发表评论
-
Spring与ActiveMQ的集成详解二
2017-01-03 10:07 2092JMS(ActiveMQ) PTP和PUB/SUB模 ... -
Spring与ActiveMQ的集成详解一
2017-01-02 17:19 4604JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ Broker发送消息给消费者过程详解
2017-01-02 15:30 6294JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Server启动过程详解
2017-01-02 12:43 6519JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ消费者详解
2017-01-01 14:38 8674JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ生产者详解
2017-01-01 12:29 6874JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ会话初始化详解
2016-12-31 20:26 4797JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ连接工厂、连接详解
2016-12-29 16:09 12063JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
基于LevelDB的高可用ActiveMQ集群
2016-12-28 18:34 4288ActiveMQ实现负载均衡+高可用部署方案:http://w ... -
ActiveMQ 目录配置文件
2016-12-28 12:47 7801下载apache-activemq-5.12.1.tar.gz ... -
Spring与ActiveMQ的集成
2016-12-27 18:09 1732JMS与MQ详解:http://www.fx114.net/q ... -
JMS(ActiveMQ) PTP和PUB/SUB模式实例
2016-12-27 09:02 3131深入浅出JMS(一)——JMS简介 :http://blog. ...
相关推荐
在众多JMS提供商中,Apache ActiveMQ是一个非常流行的开源消息代理和集成模式服务器,广泛应用于分布式系统中。 **JMS基础** 1. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point,PTP)和发布/订阅...
- **通过二进制包安装**:从 Apache 官方网站下载最新的 ActiveMQ 二进制包,解压后即可使用。 - **配置示例**:在 `conf` 目录下编辑 `activemq.xml` 文件来配置 ActiveMQ 的各项参数。 - **启动**:通过命令行...
默认情况下,ActiveMQ的管理页面可通过8161端口访问,地址为`http://localhost:8161/admin`,提供监控和管理ActiveMQ实例的功能。 4. 使用ActiveMQ进行消息通信 在实际应用中,开发者会通过编程接口与ActiveMQ交互...
本压缩包中的"JMS ActiveMQ演示代码"提供了一个具体的实例,展示了如何在实际项目中使用ActiveMQ来实现消息传递。这个代码可以直接放入工程中运行,帮助开发者理解和实践JMS与ActiveMQ的结合使用。 **点对点(Point...
activeMQ5.2的jar包及使用实例,它既支持点到点(point-to-point)(PTP)模型和发布/订阅(Pub/Sub)模型。支持同步与异步消息发送。JDBC持久性管理使用数据库表来存储消息 。
JMS支持两种主要的消息传递模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。这两种模型分别适用于不同类型的应用场景。 #### 三、ActiveMQ简介 Apache ActiveMQ 是一个开源的消息...
同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀的特性,并且支持JMS1.1和J2EE1.4。具体特性见官网:http://activemq.apache.org/ 2. 什么是JMS JMS的全称是Java ...
例如,在ActiveMQ中,可以通过实例化ActiveMQConnectionFactory来创建连接工厂。连接封装了应用程序与消息服务之间的虚拟连接。连接可以提供到服务的物理连接,并且可以通过连接工厂来创建。 会话是生产者和消费者...
- ActiveMQ C++ API支持多种消息传递模式,包括点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, PUB/SUB)模式,并提供了丰富的API来处理各种消息类型。 - **术语解析:** - **ActiveMQ**:Apache...
通常,一个应用程序会创建一个 `JMSConnection` 实例,并使用该实例来创建会话 (`Session`)。连接对象是多线程安全的,因此可以在不同的线程间共享。 ##### 1.3 会话 (Session) **会话** (`JMSSession`) 是生产和...
本文将详细介绍如何使用ActiveMQ创建一个简单的生产者实例,以及涉及到的相关知识点。 1. **JMS简介** Java Message Service (JMS) 是一个标准接口,用于在不同的应用之间传递消息。它定义了生产者(发送消息)、...
2. **发布/订阅通信模式(Publish/Subscribe, Pub/Sub)** 在发布/订阅模式中,消息被广播到多个订阅者,而不是一对一地发送。发布者将消息发送到一个主题(Topic),而多个订阅者可以订阅该主题,从而接收消息。...
【ActiveMQ 入门知识详解】 ActiveMQ 是一个开源的消息中间件,由 Apache 软件基金会开发。...在后续的专题中,我们将进一步探讨 ActiveMQ 的 API 使用、消息模式、集群配置以及与其他系统的整合等实战内容。
在ActiveMQ中,`ActiveMQConnectionFactory`是一个具体实例,负责创建与ActiveMQ服务器的连接。它是整个通信流程的起点,通过它,客户端能够获取到JMSConnection实例,进而与ActiveMQ进行交互。 #### 连接...
#### 二、ActiveMQ集群配置方案 接下来,我们将详细介绍ActiveMQ提供的几种集群配置方案: - **2.1 客户端集群 (Customer Cluster)** - **定义**: 客户端集群主要是指多个客户端应用程序共享一组消息中间件资源的...
#### 知识点二:Apache ActiveMQ介绍(Chapter 2) **Apache ActiveMQ**是一个开源的消息中间件,它是Apache软件基金会的一个顶级项目,支持多种消息协议如AMQP、STOMP等,并且兼容JMS标准。 - **ActiveMQ特点**:...
#### 二、ActiveMQ的安装与基本使用 **2.1 安装方法** - **源码安装**:下载ActiveMQ的源码包,编译并安装。 - **预编译包安装**:直接下载预编译好的ActiveMQ安装包进行安装。 **2.2 配置示例** - **修改配置...
2. Connection:建立与ActiveMQ服务器的连接。 3. Session:创建会话,设置事务性和确认模式。 4. Destination:可能是Queue或Topic,取决于所使用的消息模型。 5. MessageProducer:创建并配置用于发送消息的...