`

RibbitMQ php扩展使用 实现队列生产消费

阅读更多

一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:

首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器

http://www.rabbitmq.com/download.html

默认的端口是55672   访问地址http://127.0.0.1:55672/

默认帐号密码   guest    guest

你可以看到rabbitmq 的管理界面

mq的任务是一个不浪费资源,的一个队列系统!

 

php使用需要下载一个amqp扩展,或者直接点击下面的地址找到适合自己的版本,下载

         http://pecl.php.net/package/amqp/1.4.0/windows

根据当前使用的php版本选择相应的扩展dll,下载后是一个压缩包,里面有两个dll扩展(php_amqp.dll和rabbitmq.1.dll)。

php_amqp

队列开启的过程:

rabbitmq.1.dll   放在C盘windows下

php_amqp.dll    放入php扩展中

开启php_amqp.dll的引用

详细步骤如下:

1.将rabbitmq.1.dll文件放在php的根目录里(也就是ext目录的父级目录),然后修改apache的httpd.con文件,文件尾部添加一行,这里的路径根据情况修改,我这里使用的wampserver软件。

 

1
LoadFile  "d:/wamp/bin/php/php5.5.12/rabbitmq.1.dll"

 

        2.将php_amqp.dll放在php的ext目录里,然后修改php.ini文件,在文件的最后面添加两行

 

1
2
[amqp]
extension=php_amqp.dll

 

3.重启apache,并查看phpinfo信息。只要看到amqp 字样即可。

amqp

 

 

 

 

首先是rabbitmq的生产者:

生产者的逻辑:创建连接-->创建channel-->创建交换机对象-->发送消息

 

创建第一个index文件:然后去mq中查看,如果添加一个test001的队列名信息,就说明已经添加进去了,xx22的信息已经在mq中存储!

    接下来就需要跑数据了。

    createQueue(array('xxx','2222'),'test001');

    echo "ok";

     function createQueue($message,$queueName,$exchangeName = '', $queueKey = '')

    {

        $queueName = self::getQueueName($queueName);

        $conn_args = array('host' =>'localhost', 'port'=> '5672',

            'login' =>'guest',        //mq帐号

            'password'=> '',        //mq密码

             'vhost' => '/');

        $conn = new AMQPConnection($conn_args);

        $conn->connect();

        $channel = new AMQPChannel($conn);

        if (!$exchangeName) {

            $exchangeName = $queueName;

        }

        $queueName = $queueName;

        if (!$queueKey) {

            $queueKey = $queueName;

        }

        $ex = new AMQPExchange($channel);

        $ex->setName($exchangeName);

        $ex->setType(AMQP_EX_TYPE_TOPIC);

        $ex->setFlags(AMQP_DURABLE); //exchange持久化

        $ex->declareExchange();

        $q = new AMQPQueue($channel);

        $q->setName($queueName);

        $q->setFlags(AMQP_DURABLE); //queue持久化

        $q->declareQueue();

        $q->bind($exchangeName, $queueKey);

        $channel->startTransaction();

        /**

         * 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是设置消息的优先级,测试中发现并未起作用。

         * 消息还有其他属性,请参考http://www.php.net/manual/zh/amqpexchange.publish.php

         */

        $result = $ex->publish(json_encode($message), $queueKey, AMQP_NOPARAM, array('delivery_mode'=>2, 'priority'=> 9));

        $channel->commitTransaction();

        $conn->disconnect();

    }  

 

有了生产者,那就有消费者。

脚本如果没有其他的修改或问题,基本上都是常年启动的:

消费者逻辑:创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息

 

消费者基类:

        class WorkerCommand{

        function qInit($q_name,$e_name='',$k_route=''){

                $q_name = Utils::getQueueName($q_name);

                $conn_args = array(

                    'host' => '127.0.0.1',            //mq的配置

                    'port' => '5672',

                    'login' => 'guest',

                    'password' => 'huoxingxing',

                    'vhost' => '/'

                );

              

          

        //创建连接和channel

                $conn = new AMQPConnection($conn_args);

                if (!$conn->connect()) {

                    die("Cannot connect to the broker!\n");

                }

                $channel = new AMQPChannel($conn);

        //创建交换机

                $ex = new AMQPExchange($channel);

                if (!$e_name) {

                    $e_name = $q_name;

                }

                $ex->setName($e_name);

                $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型

                $ex->setFlags(AMQP_DURABLE); //持久化

               // echo "Exchange Status:" . $ex->declareExchange() . "\n";

        //创建队列

                $q = new AMQPQueue($channel);

                $q->setName($q_name);

                $q->setFlags(AMQP_DURABLE); //持久化

               // echo "Message Total:" . $q->declareExchange() . "\n";

                if (!$k_route) {

                    $k_route = $q_name;

                }

        //绑定交换机与队列,并指定路由键

               // echo 'Queue Bind: ' . $q->declareQueue($e_name, $k_route) . "\n";

        //阻塞模式接收消息

                echo "Message:\n";

                while (True) {

                    $q->consume(array($this,'processMessage'));

                    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答

                }

                $conn->disconnect();

        }

}    

 

消费者:

class WorkerWareSyncBackUpCommand extends WorkerCommand {

    function actionIndex()

    {

        $this->qInit('SyncWareBackup');

    }

    function processMessage($envelope, $queue)

    {

        $msg = json_decode($envelope->getBody());

        Utils::doBackUp('back',$msg,'');

        $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答

    }

}

分享到:
评论

相关推荐

    tp6使用rabbitmq

    RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced Message Queuing Protocol)协议,提供高可用性、可靠性和可扩展性。 【描述】: 在TP6中应用RabbitMQ,主要涉及以下几个步骤和知识点: 1. **...

    用PHP收发RabbitMQ消息

    要使用 PHP 语言与 RabbitMQ 实现消息队列,需要安装 AMQP 扩展。AMQP 扩展提供了 PHP 语言与 RabbitMQ 之间的接口,允许开发者使用 PHP 语言来发送和接收消息。 二、消费者:接收消息逻辑 在接收消息时,我们需要...

    centos7 rpm快速安装rabbitmq3.8.5 php安装amqp扩展 添加延迟队列扩展 避开坑

    rabbitmq3.8.5 和下面版本都有一切区别,首先就是erlang语言版本的区别,但还好用的是openssl1.0,...一般centos7都是装的openssl1.0版本,该压缩包,给了详细的安装文档,稍微区别于3.7和3.6,已经增加了延迟队列扩展

    beego环境下 rabbitmq封装以及使用

    通过RabbitMQ,生产者可以发送消息,而消费者可以在准备好处理时接收这些消息,从而解耦了系统的不同部分,提高系统的响应速度和可扩展性。 接下来,我们进入Beego框架。Beego是一个快速开发Go语言Web应用的框架,...

    Redis延时消息队列基于swoole实现的多进程消费端

    标题中的“Redis延时消息队列基于swoole实现的多进程消费端”是指使用Redis作为消息队列,结合Swoole的多进程特性来构建一个高效、可扩展的延迟消息处理系统。在这个系统中,Redis作为一个可靠的键值存储,用于暂存...

    php队列+php-redis队列+php-redis扩展

    本文将详细探讨PHP中的队列技术,包括如何使用PHP-Redis扩展来实现高效的数据处理。 首先,PHP队列是PHP应用程序中用于处理大量任务的一种机制。它遵循先进先出(FIFO)原则,即最早进入队列的任务最先被处理。队列...

    RabbitMQ操作demo

    通过这个简单的"RabbitMQ操作demo",你可以理解如何在PHP中使用RabbitMQ进行消息的生产和消费。在实际项目中,你可以根据需求设置更复杂的路由、交换机和绑定规则,以及处理确认、持久化等高级特性,以实现更高效、...

    PHP基于rabbitmq操作类的生产者和消费者功能示例

    标题和描述中涉及的知识点包括PHP编程语言、RabbitMQ消息队列、生产者(Producer)和消费者(Consumer)模式、PHP的amqp扩展。在详细说明这些知识点前,我们需要了解RabbitMQ是使用高级消息队列协议(AMQP)的一个...

    php7可用 rabbitmq-c 插件

    在PHP7环境中,与RabbitMQ进行交互通常需要一个PHP扩展。`rabbitmq-c`就是这样一个扩展,它允许PHP代码直接调用C语言实现的RabbitMQ客户端库。然而,需要注意的是,`rabbitmq-c`本身并不能直接在PHP中使用,还需要...

    rabbitmq+PHP教程代码.rar

    这个库提供了与RabbitMQ服务器通信的接口,允许我们使用PHP编写生产者和消费者脚本。 `config.php`是配置文件,通常会包含连接RabbitMQ服务器所需的参数,例如主机名、端口、用户名、密码以及虚拟主机等。在实际...

    RabbitMQ实战 高效部署分布式消息队列完整版带书签

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...

    108、AMQP消息队列-RabbitMQ1

    总结:本案例主要介绍了如何在Symfony应用中集成RabbitMQ作为消息队列,利用Docker容器部署RabbitMQ服务,配置`.env`文件进行连接,并通过Symfony Messenger组件实现消息的发送和消费。同时,也展示了使用Doctrine ...

    RabbitMQ 的windows php 扩展php_amqp-1.2.0-5.3-ts-vc9-x86.dll

    本篇主要讲解如何在Windows上安装并使用RabbitMQ的PHP扩展——`php_amqp`,特别是针对PHP 5.3线程安全(TS)版本的VC9编译的`php_amqp-1.2.0-5.3-ts-vc9-x86.dll`扩展。 首先,我们需要确保已经安装了RabbitMQ...

    RabbitMQ 的windows php 扩展php_amqp-1.2.0-5.3-nts-vc9-x86.dll

    在Windows环境下,为了在PHP中利用RabbitMQ的功能,需要安装相应的PHP扩展,这就是`php_amqp`。标题提到的`php_amqp-1.2.0-5.3-nts-vc9-x86.dll`是一个针对PHP 5.3版本、非线程安全(NTS)、Visual C++ 9编译器编译的...

    使用PHP访问RabbitMQ消息队列的方法示例

    本文将详细介绍如何使用PHP来访问RabbitMQ消息队列,包括相关扩展的安装方法、队列的建立、队列的绑定、消息的发送和接收等操作技巧。 首先,要使用PHP访问RabbitMQ,我们需要在服务器上安装PHP的AMQP扩展。在...

    一文读懂PHP使用RabbitMQ

    RabbitMQ的核心作用在于解耦应用程序,使得生产者能够发送消息而无需关心谁是消费者,同样,消费者也无需知道消息来自何处,极大地增强了系统的灵活性和可扩展性。 【RabbitMQ 工作模型】 RabbitMQ工作模型包括几个...

    RabbitMq消息队列指南.docx

    消息队列MQ,全称Message Queue,是一种在应用程序之间用于通信的技术,主要功能是作为生产者与消费者之间的缓冲。在消息队列模型中,生产者不断地向队列中发布消息,而消费者则从队列中获取并处理这些消息。这种...

    RabbitMQ实战 高效部署分布式消息队列

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...

Global site tag (gtag.js) - Google Analytics