消费者:接收消息
逻辑:
创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息
<?php /************************************* * PHP amqp(RabbitMQ) Demo - consumer * Author: Linvo * Date: 2012/7/30 *************************************/ //配置信息 $conn_args = array( 'host' => '192.168.1.93', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交换机名 $q_name = 'q_linvo'; //队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n"; //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declare()."\n"; //绑定交换机与队列,并指定路由键 echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息 echo "Message:\n"; while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 } $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }
生产者:发送消息
逻辑:
创建连接-->创建channel-->创建交换机对象-->发送消息
<?php /************************************* * PHP amqp(RabbitMQ) Demo - publisher * Author: Linvo * Date: 2012/7/30 *************************************/ //配置信息 $conn_args = array( 'host' => '192.168.1.93', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交换机名 //$q_name = 'q_linvo'; //无需队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //消息内容 $message = "TEST MESSAGE! 测试消息!"; //创建交换机对象 $ex = new AMQPExchange($channel); $ex->setName($e_name); //发送消息 //$channel->startTransaction(); //开始事务 for($i=0; $i<5; ++$i){ echo "Send Message:".$ex->publish($message, $k_route)."\n"; } //$channel->commitTransaction(); //提交事务 $conn->disconnect();
需要注意的地方是:
queue对象有两个方法可用于取消息:consume和get。
前者是阻塞的,无消息时会被挂起,适合循环中使用;
后者则是非阻塞的,取消息时有则取,无则返回false。
测试截图
运行消费者:
运行生产者,发消息:
消费者接收到消息:
http://nonfu.me/p/9722.html
相关推荐
AMQP 扩展提供了 PHP 语言与 RabbitMQ 之间的接口,允许开发者使用 PHP 语言来发送和接收消息。 二、消费者:接收消息逻辑 在接收消息时,我们需要创建一个连接,创建一个 channel,创建一个交换机,创建一个队列...
标题 "php7 测试可用的amqp 扩展" 指的是在PHP7环境中能够正常工作的AMQP扩展,这是用于实现先进消息队列协议(Advanced Message Queuing Protocol)的一个关键组件。AMQP允许分布式系统中的组件通过异步消息传递...
2. **php_amqp扩展:** `php_amqp`是PHP与RabbitMQ交互的扩展库,它提供了PHP与AMQP服务器通信的接口,使得开发者可以在PHP代码中创建连接、通道、队列、发布和接收消息等。 3. **版本兼容性:** `...
通过安装amqp.so扩展,PHP程序员能够利用PHP语言直接操作RabbitMQ,发送和接收消息,从而构建出高效能、可扩展的应用程序。 在Mac电脑上,开发者通常会使用MAMP(Mac OS X Apache MySQL PHP)这个集成环境来搭建...
**PHP扩展AMQP详解** PHP扩展AMQP是用于在PHP应用程序中与AMQP(Advanced Message Queuing Protocol)消息队列进行交互的一种工具。...理解并熟练使用AMQP扩展,可以帮助开发者构建更加健壮和高效的分布式系统。
总结:本案例主要介绍了如何在Symfony应用中集成RabbitMQ作为消息队列,利用Docker容器部署RabbitMQ服务,配置`.env`文件进行连接,并通过Symfony Messenger组件实现消息的发送和消费。同时,也展示了使用Doctrine ...
RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced Message Queuing Protocol)协议,提供高可用性、可靠性和可扩展性。 【描述】: 在TP6中应用RabbitMQ,主要涉及以下几个步骤和知识点: 1. **...
以下是一个简单的PHP实例代码,展示如何发送和接收消息: ```php <?php require_once 'vendor/autoload.php'; // 如果使用Composer安装php-amqp use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\...
在PHP中,通过amqp扩展,我们可以轻松地发送和接收消息到RabbitMQ服务器,从而实现任务的异步处理,提高系统的可扩展性和响应速度。 以下是一些关于使用PHP和AMQP进行RabbitMQ操作的关键知识点: 1. **安装AMQP...
通过`rabbitmq-c`和`amqp-1.9.0`扩展,PHP7开发者可以充分利用RabbitMQ的强大功能,构建出可靠、可扩展的消息传递系统。理解如何配置和使用这些工具,对于构建高性能、高可用性的分布式系统至关重要。
**声明**:`amqp_exchange_declare_ok_t* amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t ...
3. **PHP AMQP扩展**(php-amqp):这个扩展为PHP提供了与RabbitMQ交互的接口,允许开发者创建连接、声明交换机、队列,发送和接收消息,以及管理其他AMQP相关任务。使用这个扩展,开发者可以轻松地将异步处理和消息...
通过安装这个扩展并配置好相关设置,PHP开发者可以使用AMQP类和方法创建连接、声明交换机、绑定队列、发布和接收消息等。例如,你可以创建一个生产者来发布消息到指定的交换机,然后由消费者从队列中获取并处理这些...
6. **回调函数**:在异步消费模式下,PHP-amqp扩展通常使用回调函数处理接收到的消息。源码中会有如何注册和执行这些回调的示例。 7. **错误处理**:理解源码中的错误处理机制可以帮助我们在遇到问题时快速定位和...
通过RabbitMQ,生产者可以发送消息,而消费者可以在准备好处理时接收这些消息,从而解耦了系统的不同部分,提高系统的响应速度和可扩展性。 接下来,我们进入Beego框架。Beego是一个快速开发Go语言Web应用的框架,...
RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的开源消息队列服务器,它允许应用程序之间进行异步通信,从而提高系统的响应速度和可扩展性。通过创建消息通道,生产者可以将任务发送到队列,而...
RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理和队列服务器,它采用Erlang编程语言开发,提供了多种语言的客户端,包括PHP。RabbitMQ的核心作用在于解耦应用程序,使得生产者能够发送...
接着,我们需要在PHP环境中安装AMQP扩展,这是PHP与RabbitMQ交互的关键。在大多数情况下,你可以使用Composer,PHP的依赖管理工具,来添加`php-amqp`库。运行以下命令: ```bash composer require ...
PHP AMQP库是一个用于在PHP中与AMQP(Advanced Message Queuing Protocol)消息队列交互的纯PHP实现。AMQP是一种开放标准,它定义了一种二进制应用层协议,用于在分布式系统中进行高效、可靠的消息传递。PHP AMQP库...