`

用PHP尝试RabbitMQ(amqp扩展)实现消息的发送和接收

 
阅读更多

消费者:接收消息

逻辑:
创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息

<?php
/*************************************
* PHP amqp(RabbitMQ) Demo - consumer
* Author: Linvo
* Date: 2012/7/30
*************************************/
//配置信息
$conn_args = array(
    'host' => '192.168.1.93',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交换机名
$q_name = 'q_linvo'; //队列名
$k_route = 'key_1'; //路由key

//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";

//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";

//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

//阻塞模式接收消息
echo "Message:\n";
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();

/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

生产者:发送消息

逻辑:
创建连接-->创建channel-->创建交换机对象-->发送消息

<?php
/*************************************
* PHP amqp(RabbitMQ) Demo - publisher
* Author: Linvo
* Date: 2012/7/30
*************************************/
//配置信息
$conn_args = array(
    'host' => '192.168.1.93',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交换机名
//$q_name = 'q_linvo'; //无需队列名
$k_route = 'key_1'; //路由key

//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//消息内容
$message = "TEST MESSAGE! 测试消息!";

//创建交换机对象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);

//发送消息
//$channel->startTransaction(); //开始事务
for($i=0; $i<5; ++$i){
    echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事务

$conn->disconnect();

需要注意的地方是:

queue对象有两个方法可用于取消息:consume和get。
前者是阻塞的,无消息时会被挂起,适合循环中使用;

后者则是非阻塞的,取消息时有则取,无则返回false。

测试截图

运行消费者:

1343873405_2469

运行生产者,发消息:

1343873408_3083

消费者接收到消息:

1343873412_6992

 

http://nonfu.me/p/9722.html

分享到:
评论

相关推荐

    用PHP收发RabbitMQ消息

    AMQP 扩展提供了 PHP 语言与 RabbitMQ 之间的接口,允许开发者使用 PHP 语言来发送和接收消息。 二、消费者:接收消息逻辑 在接收消息时,我们需要创建一个连接,创建一个 channel,创建一个交换机,创建一个队列...

    php7 测试可用的amqp 扩展

    标题 "php7 测试可用的amqp 扩展" 指的是在PHP7环境中能够正常工作的AMQP扩展,这是用于实现先进消息队列协议(Advanced Message Queuing Protocol)的一个关键组件。AMQP允许分布式系统中的组件通过异步消息传递...

    RabbitMQ 的windows php 扩展php_amqp-1.2.0-5.3-nts-vc9-x86.dll

    2. **php_amqp扩展:** `php_amqp`是PHP与RabbitMQ交互的扩展库,它提供了PHP与AMQP服务器通信的接口,使得开发者可以在PHP代码中创建连接、通道、队列、发布和接收消息等。 3. **版本兼容性:** `...

    amqp.so扩展

    通过安装amqp.so扩展,PHP程序员能够利用PHP语言直接操作RabbitMQ,发送和接收消息,从而构建出高效能、可扩展的应用程序。 在Mac电脑上,开发者通常会使用MAMP(Mac OS X Apache MySQL PHP)这个集成环境来搭建...

    php扩展amqp

    **PHP扩展AMQP详解** PHP扩展AMQP是用于在PHP应用程序中与AMQP(Advanced Message Queuing Protocol)消息队列进行交互的一种工具。...理解并熟练使用AMQP扩展,可以帮助开发者构建更加健壮和高效的分布式系统。

    108、AMQP消息队列-RabbitMQ1

    总结:本案例主要介绍了如何在Symfony应用中集成RabbitMQ作为消息队列,利用Docker容器部署RabbitMQ服务,配置`.env`文件进行连接,并通过Symfony Messenger组件实现消息的发送和消费。同时,也展示了使用Doctrine ...

    tp6使用rabbitmq

    RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced Message Queuing Protocol)协议,提供高可用性、可靠性和可扩展性。 【描述】: 在TP6中应用RabbitMQ,主要涉及以下几个步骤和知识点: 1. **...

    RabbitmQ相关Amqp的PHP实例代码

    以下是一个简单的PHP实例代码,展示如何发送和接收消息: ```php &lt;?php require_once 'vendor/autoload.php'; // 如果使用Composer安装php-amqp use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\...

    php AMQP 实例

    在PHP中,通过amqp扩展,我们可以轻松地发送和接收消息到RabbitMQ服务器,从而实现任务的异步处理,提高系统的可扩展性和响应速度。 以下是一些关于使用PHP和AMQP进行RabbitMQ操作的关键知识点: 1. **安装AMQP...

    php7可用 rabbitmq-c 插件

    通过`rabbitmq-c`和`amqp-1.9.0`扩展,PHP7开发者可以充分利用RabbitMQ的强大功能,构建出可靠、可扩展的消息传递系统。理解如何配置和使用这些工具,对于构建高性能、高可用性的分布式系统至关重要。

    rabbitmq的接口函数说明,api参数使用说明

    **声明**:`amqp_exchange_declare_ok_t* amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t ...

    amqp-1.4.0.tgz

    3. **PHP AMQP扩展**(php-amqp):这个扩展为PHP提供了与RabbitMQ交互的接口,允许开发者创建连接、声明交换机、队列,发送和接收消息,以及管理其他AMQP相关任务。使用这个扩展,开发者可以轻松地将异步处理和消息...

    php_amqp-1.4.0-5.5-ts-vc11-x64

    通过安装这个扩展并配置好相关设置,PHP开发者可以使用AMQP类和方法创建连接、声明交换机、绑定队列、发布和接收消息等。例如,你可以创建一个生产者来发布消息到指定的交换机,然后由消费者从队列中获取并处理这些...

    amqp1.6源码

    6. **回调函数**:在异步消费模式下,PHP-amqp扩展通常使用回调函数处理接收到的消息。源码中会有如何注册和执行这些回调的示例。 7. **错误处理**:理解源码中的错误处理机制可以帮助我们在遇到问题时快速定位和...

    beego环境下 rabbitmq封装以及使用

    通过RabbitMQ,生产者可以发送消息,而消费者可以在准备好处理时接收这些消息,从而解耦了系统的不同部分,提高系统的响应速度和可扩展性。 接下来,我们进入Beego框架。Beego是一个快速开发Go语言Web应用的框架,...

    rabbitmq+PHP教程代码.rar

    RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的开源消息队列服务器,它允许应用程序之间进行异步通信,从而提高系统的响应速度和可扩展性。通过创建消息通道,生产者可以将任务发送到队列,而...

    一文读懂PHP使用RabbitMQ

    RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理和队列服务器,它采用Erlang编程语言开发,提供了多种语言的客户端,包括PHP。RabbitMQ的核心作用在于解耦应用程序,使得生产者能够发送...

    RabbitMQ操作demo

    接着,我们需要在PHP环境中安装AMQP扩展,这是PHP与RabbitMQ交互的关键。在大多数情况下,你可以使用Composer,PHP的依赖管理工具,来添加`php-amqp`库。运行以下命令: ```bash composer require ...

    PHPAMQP一个纯PHP实现的AMQP库

    PHP AMQP库是一个用于在PHP中与AMQP(Advanced Message Queuing Protocol)消息队列交互的纯PHP实现。AMQP是一种开放标准,它定义了一种二进制应用层协议,用于在分布式系统中进行高效、可靠的消息传递。PHP AMQP库...

Global site tag (gtag.js) - Google Analytics