`
lishiguang
  • 浏览: 193508 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ(PTP) HelloWorld

阅读更多

感谢作者分享 转自 http://blog.sina.com.cn/s/blog_4b5bc0110100kboh.html

 

1.JMS介绍

    JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

1)JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。

2) 消息管理对象提供对消息进行操作的API。JMS API中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

3)消息的生产者和消费者。消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)

4)消息。消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:

  消息头(header)――JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

  属性(property)――用来添加删除消息头以外的附加信息。

  消息体(body)――JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

 

2.Messages 通信方式

上面提到JMS通信方式分为点对点通信和发布/订阅方式

1)点对点方式(point-to-point)

   点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

2)发布/订阅 方式(publish/subscriber Messaging)

    发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

 

3.为什么选用ActiveMQ

   1)ActiveMQ是一个开放源码

   2)基于Apache 2.0 licenced 发布并实现了JMS 1.1。

   3)ActiveMQ现在已经和作为很多项目的异步消息通信核心了

   4)在很多中小型项目中采用ActiveMQ+SPRING+TOMCAT开发模式。

 

4.编程模式

4.1消息产生者向JMS发送消息的步骤

(1)创建连接使用的工厂类JMS ConnectionFactory

(2)使用管理对象JMS ConnectionFactory建立连接Connection

(3)使用连接Connection 建立会话Session

(4)使用会话Session和管理对象Destination创建消息生产者MessageSender

(5)使用消息生产者MessageSender发送消息

4.2消息消费者从JMS接受消息的步骤

(1)创建连接使用的工厂类JMS ConnectionFactory

(2)使用管理对象JMS ConnectionFactory建立连接Connection

(3)使用连接Connection 建立会话Session

(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver

(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver

消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

 

5.ActiveMQ运行

ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。运行�tivemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。

打开http://localhost:8161/admin/queues.jsp ,可以查看相应的queue中是否有消息

6.SendMessage(用于发送消息)

Java代码 

import javax.jms.Connection;   

import javax.jms.Destination;   

import javax.jms.JMSException;   

import javax.jms.MessageProducer;   

import javax.jms.Session;   

import javax.jms.TextMessage;   

import org.apache.activemq.ActiveMQConnectionFactory;   

 

public class SendMessage {   

 private static final String url ="tcp://localhost:61616";   

 private static final String QUEUE_NAME ="choice.queue";   

 protected String expectedBody = "<hello>world!</hello>";   

 public void sendMessage() throws JMSException{   

  Connection connection =null;   

  try{   

   ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory(url);   

   connection = (Connection)connectionFactory.createConnection();   

   connection.start();   

   Session session = (Session)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);   

   Destination destination = session.createQueue(QUEUE_NAME);   

   MessageProducer producer = session.createProducer(destination);   

   TextMessage message = session.createTextMessage(expectedBody);   

   message.setStringProperty("headname", "remoteB");   

   producer.send(message);       

  }catch(Exception e){   

   e.printStackTrace();   

  }   

 }   

 

 public static void main(String[] args){   

  SendMessage sndMsg = new SendMessage();   

  try{   

   sndMsg.sendMessage();   

  }catch(Exception ex){   

   System.out.println(ex.toString());   

  }   

 }   

}  

 

 

 

7.ReceiveMessage(用于接收消息)

Java代码 

import java.io.File;   

import java.io.FileInputStream;   

import java.io.FileOutputStream;   

import java.io.IOException;   

import javax.jms.BytesMessage;   

import javax.jms.Connection;   

import javax.jms.Destination;   

import javax.jms.JMSException;   

import javax.jms.Message;   

import javax.jms.MessageConsumer;   

import javax.jms.Session;   

import javax.jms.TextMessage;   

import org.apache.activemq.ActiveMQConnectionFactory;   

 

public class ReceiveMessage {   

 private static final String url = "tcp://localhost:61616";   

 private static final String QUEUE_NAME = "choice.queue";   

 public void receiveMessage() {   

  Connection connection = null;   

  try {   

   try {   

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);   

    connection = connectionFactory.createConnection();   

   } catch (Exception e) {   

                System.out.println(e.toString());   

   }   

   connection.start();   

   Session session = connection.createSession(false,   

     Session.AUTO_ACKNOWLEDGE);   

   Destination destination = session.createQueue(QUEUE_NAME);   

   MessageConsumer consumer = session.createConsumer(destination);   

   consumeMessagesAndClose(connection, session, consumer);   

  } catch (Exception e) {   

    System.out.println(e.toString());   

  }   

 }   

 

 protected void consumeMessagesAndClose(Connection connection,Session session, MessageConsumer consumer)   

 throws JMSException {   

  for (int i = 0; i < 1;) {   

   Message message = consumer.receive(1000);   

   if (message != null) {   

    i++;   

    onMessage(message);   

   }   

  }   

  System.out.println("Closing connection");   

  consumer.close();   

  session.close();   

  connection.close();   

 }   

 

 public void onMessage(Message message) {   

  try {   

   if (message instanceof TextMessage) {   

    TextMessage txtMsg = (TextMessage) message;   

    String msg = txtMsg.getText();   

    System.out.println("Received: " + msg);   

   }   

  } catch (Exception e) {   

   e.printStackTrace();   

  }   

 

 }   

 

 public static void main(String args[]) {   

  ReceiveMessage rm = new ReceiveMessage();   

  rm.receiveMessage();   

 }   

 

}  

分享到:
评论

相关推荐

    apache activeMQ之初体验(helloworld)

    在这个"apache activeMQ之初体验(helloworld)"中,我们将探索如何使用ActiveMQ进行基本的消息发布与订阅。 在消息中间件中,"Hello World"程序通常用于演示最基本的消息传递概念。ActiveMQ的"Hello World"程序主要...

    ActiveMQ简单的HelloWorld实例

    在本文中,我们将深入探讨如何使用ActiveMQ创建一个基本的"HelloWorld"实例,以便于初学者理解消息队列的工作原理。ActiveMQ是Apache软件基金会开发的一个开源消息代理,它实现了多种消息协议,如OpenWire、AMQP、...

    ActiveMQ_Helloword

    ActiveMQ_HelloWorld 是一个示例项目,它展示了如何使用 Apache ActiveMQ 这个开源消息代理来实现消息队列(MQ)的基本功能。在这个项目中,我们重点探讨了两种主要的消息传递模式:点对点(Point-to-Point, PTP)和...

    ActiveMQ的处理模式:PTP与PUB/SUB

    在SpringBoot中配置和使用ActiveMQ的PTP模式,通常需要创建一个Queue bean,并通过配置JMS模板来发送和接收消息。 其次,发布/订阅(PUB/SUB)模式则是基于主题(Topic)的通信方式。与队列不同,主题可以让多个...

    ActiveMQ 安装 手册 说明

    这个程序创建了一个连接,创建了一个队列目的地,并发送了一条"Hello World!"的消息到名为 "jms_test_queue" 的队列。 总结来说,安装和运行ActiveMQ涉及下载、解压、设置权限、启动和验证服务状态。通过编写简单的...

    activeMQ收发工具.rar

    ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...

    ActiveMQ的activemq.xml详细配置讲解

    **ActiveMQ的activemq.xml配置详解** ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循Java消息服务(JMS)规范,提供可靠的消息传递功能。`activemq.xml`是ActiveMQ的核心配置文件,它定义了服务器的...

    apache-activemq-5.5.1-bin.zip加上入门demo

    解压缩apache-activemq-5.5.1-bin.zip,然后双击...包含了apache-activemq-5.5.1-bin.zip以及ActiveMQ一个helloworld的demo启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

    ActiveMQ 配置文件详解

    **ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...

    mqttjs(activemq测试工具)

    client.publish('test/topic', 'Hello, MQTT World!'); }); // 接收消息事件 client.on('message', function (topic, message) { console.log(`Received message from topic ${topic}: ${message.toString()}`); ...

    apache-activemq-5.16.5

    Apache ActiveMQ是业界广泛使用的开源消息中间件,它基于Java消息服务(JMS)标准,提供了高度可扩展、可靠的异步通信能力。标题"apache-activemq-5.16.5"指的是该软件的一个特定版本,即5.16.5版本,通常每个新版本...

    ActiveMQ路由配置方式

    ActiveMQ路由配置方式 ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 ...

    apache-activemq-5.17.3

    Apache ActiveMQ是开源的、基于Java消息服务(JMS)的应用服务器,它是Apache软件基金会的一部分。这个名为"apache-activemq-5.17.3"的压缩包包含了ActiveMQ的5.17.3版本,这是一个稳定且功能丰富的发布版本。在深入...

    activemqactivemq

    TextMessage message = session.createTextMessage("Hello, ActiveMQ!"); producer.send(message); ``` 在“MQClient”文件中,我们可以期待看到具体的客户端代码示例,这些示例可能包括连接到ActiveMQ服务器、创建...

    jmx监控activeMQ监控

    jmx监控ActiveMQ监控 jmx(Java Management Extensions)是一种Java技术,为Java应用程序提供了管理和监控的功能。ActiveMQ是Apache软件基金会下的一个开源消息队列系统,提供了高效、可靠的消息传递服务。在生产...

    ActiveMQ快速上手 PDF

    ### ActiveMQ 快速上手知识点详解 #### 一、ActiveMQ简介 - **定义**:ActiveMQ 是 Apache 软件基金会所研发的一款开源消息中间件,它完全支持 JMS 1.1 和 J2EE 1.4 规范,能够作为 JMS Provider 实现消息传递功能...

    Jmeter测试ActiveMQ性能报告

    本报告详细阐述了使用JMeter对ActiveMQ进行性能测试的过程和结果,旨在评估ActiveMQ在JMS(Java消息服务)环境下的性能表现。JMeter作为一个强大的负载和性能测试工具,被广泛用于测试各种应用程序,包括消息中间件...

    apache-activemq Linux版本

    Apache ActiveMQ是业界广泛使用的开源消息中间件,尤其在Linux环境下表现出色。它基于Java语言开发,遵循Apache软件基金会的许可证,并且实现了多种消息传递协议,包括OpenWire、STOMP、AMQP和XMPP等。在Linux系统上...

    ActiveMQ客户端

    Apache ActiveMQ是开源的、基于Java消息服务(JMS)的Message Broker,它允许应用程序通过消息传递进行异步通信。ActiveMQ客户端库是用于与ActiveMQ服务器交互的接口,允许开发者在他们的应用中发送和接收消息。这个...

    最新稳定版ActiveMQ5.15.0

    Apache ActiveMQ是开源社区中最流行的Java消息代理,也是企业级消息中间件(Message Broker)的首选之一。在最新的稳定版5.15.0中,它提供了可靠的消息传递功能,适用于分布式应用程序之间的通信,实现了异步处理、...

Global site tag (gtag.js) - Google Analytics