`
zzc1684
  • 浏览: 1224220 次
  • 性别: Icon_minigender_1
  • 来自: 广州
文章分类
社区版块
存档分类
最新评论

(转)消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用 PHP demo

    博客分类:
  • PHP
阅读更多
原文链接:http://blog.csdn.net/shagoo/article/details/6077686

 

随 着互联网企业业务量的不断扩大,企业信息网络系统的愈加复杂,性能问题也就越来越凸显出来,串行的业务处理方式显然已经成为主要的瓶颈,我们需要更多 异 步的并行处理来提高企业信息系统的业务处理能力,因此独立的消息处理系统也就应运而生,ActiveMQ  就是诸多开源消息系统的佼佼者。对于我们 的技术选型来说,稳定和适应性是最重要的考虑因素,因此由 Apache  组织背景而且支持发布/订阅(Pub/Sub)模式以及异步 Stomp 协 议(Streaming Text Orientated  Messaging Protocol)和 REST 方式的 ActiveMQ  就成为 了首选的消息处理器,经测试在异步模式下,整个系统的信息处理吞吐量还是很理想的(一台普通服务器能达到每秒收发 4000 个消息,每个消 息  4K 字节这样的速度)。

下面我们来看看 ActiveMQ 的基本配置与使用,由于我们公司大部分的内部服务都是使用脚本语言写 的,比如 PHP/Python  等,所以我们使用 Stomp 协议来处理;另外,为了尽可能提高消息“生产者”的效率,我们采用了异 步 nio  模式;还有关于并发处理以及负载均衡方面还有很多需要注意的配置和选项,我们后面会另找时间补充。

1、下载 ActiveMQ(最新版本 5.4.1)

2、解压安装到 /usr/local/activemq 目录下

3、安装初始化配置文件 ./bin/activemq setup /etc/default/activemq (保存至默认位置,以后可微调启动参数)

4、编辑 ./conf/activemq.xml 加入如下段(authentication / stomp+nio):
...
<plugins>
<!-- Add By James ; Configure authentication; Username, passwords and groups -->
    <simpleAuthenticationPlugin>
        <users>
            <authenticationUser username="system" password="${activemq.password}" groups="users,admins"/>
            <authenticationUser username="user" password="${guest.password}" groups="users"/>
            <authenticationUser username="guest" password="${guest.password}" groups="guests"/>
        </users>
    </simpleAuthenticationPlugin>
</plugins>
...
<!-- Add by James for support stomp protocol -->
<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
...

5、使用 Stomp 库对 ActiveMQ 进行收发消息测试(这里不提供 Java 的例子,因为 Google 上已经有太多了,我们以 PHP 作为脚本语言的程序范例)。

a> 以下是 Stomp 协议的简单命令:

* SEND
* SUBSCRIBE
* UNSUBSCRIBE
* BEGIN
* COMMIT
* ABORT
* ACK
* DISCONNECT

b> PHP 实例(transaction / persistent / ack)

关于 Stomp 的 PHP 库可以到 Google Code 上下载,使用以下测试用例的时候可以配合 http://activemq-hostname:8161/admin/ 查看消息的数量和状态。

c> amq_sent.php(生产者示例)
...
// include a library
require_once("Stomp.php");

// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");

// connect
$con->connect("guest", "password");

// begin transaction
$con->begin("tx1");

try {
    
    // build a message by map
    require_once("Stomp/Message/Map.php");
    $msgBody = array("city"=>"Belgrade", "name"=>"Dejan");
    $msgHeader["persistent"] = "true"; // VERY IMPORTANT : Because By  default, Stomp produced messages are set to non-persistent.
    $msgHeader["transformation"] = "jms-map-json";
    $mapMessage = new StompMessageMap($msgBody, $msgHeader);
    
    // send the message to the queue
    $con->send("/queue/test", $mapMessage, array("transaction" => "tx1"));
    
    // test rollback logic
//    throw new Exception("Sending failed ...");
    
    // commit sending message
    $con->commit("tx1");
    
    // print message
    echo "Sent message : ";
    print_r($msgBody);
    
} catch (Exception $e) {
    
    // rollback sending
    $con->abort("tx1");

    // print message
    echo $e->getMessage();
}

// disconnect
$con->disconnect();
...

d> amq_recv.php(消费者示例)
...
// include a library
require_once("Stomp.php");

// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");

// connect with authentication
$con->connect("guest", "password");

// set read timeout
$con->setReadTimeout(1);

// subscribe to the queue
$con->subscribe("/queue/test", array("transformation" => "jms-map-json"));

// receive a message from the queue
$msg = $con->readFrame();

// do what you want with the message
if ( $msg != null) {
    echo "Received message : ";
    print_r($msg);
    // mark the message as received in the queue
    $con->ack($msg);
} else {
    echo "Failed to receive a message/n";
}

// disconnect
$con->disconnect();
...

6、 目前 ActiveMQ 最新版的消息数据是保存在一个快速的文本数据库 kahadb  里面的,使用虽然速度很快但是一旦出错恢复起来总是有很多的问 题。所以我建议大家还是把主流数据库的持久化选项打开,虽然慢一点但是数据至少恢复起来要简 单不少,我们使用 Mysql 来保存持久化的消息数据,数 据库是 activemq,配置在 ./conf/activemq.xml 中,如下:
...
<persistenceFactory>
    <journalPersistenceAdapterFactory dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceFactory>
...
<!-- MySql DataSource Sample Setup -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://192.168.1.10:11811/activemq?relaxAutoCommit=true"/>
    <property name="username" value="admin"/>
    <property name="password" value="ihush2010"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
...

总结&注意事项:
1、Stomp 协议构造的消息默认是非持久化的,如果要让消息持久化必须加上 "persistent:true" 的消息头,这个搞了我半天时间~
2、编程时候还是需要注意“消费者”的运行效率,因为大量“慢消费者”会在非持久的 Topics 上出现问题,特别在多消费者的情况下,容易造成 ActiveMQ 的反应变慢~
3、如果不是使用 Stomp 协议和 ActiveMQ 建立通讯的情况下尽量使用长连接,Connection 的 start() 和 stop() 方法代价很高。
4、如果使用集群 master-slave 也是可以的,但是总是不够稳定,建议还是使用单台服务器的模式。

分享到:
评论

相关推荐

    php 版本 activemq+stomp.php

    总之,这个压缩包中的内容将为PHP开发者提供一个全面了解如何在PHP环境中使用ActiveMQ和STOMP协议的起点,包括源代码示例、XML解析技巧和实际应用场景,对于构建分布式系统和消息驱动的应用程序非常有帮助。

    activemq_demo,activeMQ的简单demo

    ActiveMQ提供了多种消息协议的支持,包括开放消息中间件协议(OpenWire)、Java消息服务(JMS)以及STOMP等,使得它能够与多种语言和平台无缝对接。 【描述】中提到的"activeMQ的简单demo"是指一个基础的示例项目,...

    activemq-demo

    ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循多种消息协议,如AMQP、STOMP、XMPP等,为分布式系统提供可靠的异步通信能力。 【描述】在这个"activemq-demo"项目中,开发者使用了Spring框架来简化配置...

    ActiveMQ使用mqtt协议的实现发布消息的三种方式.txt

    java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)

    activeMQ初学使用demo

    本初学使用DEMO将带你走进ActiveMQ的世界,通过队列(Queue)和主题(Topic)两种消息模型来了解其基本用法。 1. **ActiveMQ简介**: - ActiveMQ 是Apache软件基金会的一个项目,它提供了一个跨语言、跨平台的消息...

    ActiveMQ Demo (C#)

    - **多种协议支持**:除了JMS,ActiveMQ还支持AMQP、STOMP、MQTT等多种消息协议。 6. **Demo中的Form配置** - 在这个Demo中,开发者可能创建了一个带有图形界面的Form,用户可以配置ActiveMQ连接参数,如服务器...

    go语言实现使用activemq 收发消息

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,支持多种协议,包括我们这里提到的STOMP(Simple Text Oriented Messaging Protocol)。STOMP是一种简单易用的网络消息传输协议,适合多种编程语言,包括Go。 ...

    springboot集成activemq实现消息接收demo

    本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们需要在Spring Boot项目中引入ActiveMQ的相关依赖。在`pom.xml`文件中添加以下Maven依赖: ```xml &lt;groupId&gt;org.spring...

    ActiveMQ APR STOMP技术文档

    1. ActiveMQ 的简单介绍 ...APR 提供了底层的系统接口,ActiveMQ 则作为消息中间件处理和转发消息,而STOMP简化了客户端与ActiveMQ的通信。这样的组合对于构建高性能、可扩展的分布式系统具有重要意义。

    activemq-stomp-5.10.0.jar

    标签:activemq-stomp-5.10.0.jar,activemq,stomp,5.10.0,jar包下载,依赖包

    activemq C#客户端使用demo

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它支持多种协议,包括AMQP、STOMP、MQTT、OpenWire等,并且提供了多种语言的客户端API,包括Java、C#、Python、Ruby等。在本示例中,我们将专注于讲解如何在C#...

    ActiveMQ使用SSL加密文件Demo

    **ActiveMQ 使用 SSL 加密文件 Demo** ActiveMQ 是一个开源的消息代理服务器,它遵循 Java Message Service(JMS)规范,提供了可靠的消息传递功能。在实际的生产环境中,为了确保消息传输的安全性,我们通常会使用...

    ActiveMQ+Camel+Spring+jms Demo(二)

    首先,ActiveMQ是Apache软件基金会开发的开源消息代理,它是JMS的实现,支持多种协议,如AMQP、STOMP等。ActiveMQ允许应用程序通过发布/订阅或点对点模型交换消息,从而实现异步通信和解耦。在本示例中,ActiveMQ将...

    自己写的ActiveMQ的Demo例子

    ActiveMQ 支持多种协议,如 OpenWire、STOMP、AMQP 和 MQTT,允许不同平台的应用程序进行通信。它提供了消息的可靠传递、高可用性以及可伸缩性,是企业级应用程序中广泛使用的组件。 **ActiveMQ 的核心概念** 1. *...

    使用WebSocket协议接收ActiveMQ消息

    ActiveMQ是Apache软件基金会开发的消息队列产品,它遵循开放标准,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),提供跨语言的API和协议支持,可以处理各种消息传递模式,如点对点、...

    ActiveMQ整合spring的Demo

    ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的Java消息服务(JMS)提供者,支持多种消息协议,如AMQP、STOMP等。Spring框架则是广泛使用的Java应用程序开发框架,它提供了丰富的功能,包括依赖注入...

    activemq-stomp-5.10.0-sources.jar

    标签:activemq-stomp-5.10.0-sources.jar,activemq,stomp,5.10.0,sources,jar包下载,依赖包

    消息中间件activemq项目demo

    "前后两个项目的对比,凸显出spring的优点"意味着项目中可能包含了一个没有使用Spring的对照版本,通过对比,能够更好地理解Spring在处理消息传递方面的优势。 **标签解析:** "active jms spring java" 这些标签...

Global site tag (gtag.js) - Google Analytics