`

RabbitMQ与PHP(二)—— 相关服务安装及如何用PHP作为守护模式处理消息

 
阅读更多

上一节中,详细介绍了RabbitMQ的exchange/routingkey/queue等概念,以及示例了如何使用PHP发送和处理消息的代码。这一节,将介绍在项目中如何使用PHP多线程的进行消息实时处理,以及简要介绍一些RabbitMQ的安装相关。熟悉的可以将安装这部分跳过。

一、RabbitMQ的安装:

需要首先安装erlang

#安装jdk环境
sudo apt-get install openjdk-7-jdk
#安装相关类库和工具包
sudo apt-get install libncurses5-dev m4 fop freeglut3-dev libwxgtk2.8-dev g++ libssl-dev xsltproc build-essential tk8.5  unixodbc unixodbc-dev libxml2-utils
#下载erlang安装包并安装
wget http://www.erlang.org/download/otp_src_R16B03-1.tar.gz
tar -xzvf otp_src_R16B03-1.tar.gz
cd otp_src_R16B03-1
./configure --prefix=/usr/local/erlang
make
sudo make install

注意: 这里将erlang安装到了指定的目录: /usr/local/erlang,而不是使用默认的路径。这是一个好的习惯,对于版本控制等都会有好处。但是这会导致后面 rabbitMQ报错:找不到erl 执行文件,需要多做一些处理才行。

更新环境变量:

sudo vi /etc/profile

在最后一行加上

export PATH=/usr/local/erlang/bin:$PATH

保存退出后

source /etc/profile

然后到命令行中输入erl看是否安装成功

安装RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.3/rabbitmq-server-generic-unix-3.2.3.tar.gz
tar -xzvf rabbitmq-server-generic-unix-3.2.3.tar.gz
mv rabbitmq_server-3.2.3/ /usr/local/
cd /usr/local/rabbitmq_server-3.2.3/sbin
./rabbitmq-server

到这里如果出现一个报错信息: ./rabbitmq-server: line 86: erl: command not found  这是因为erlang指定了安装路径,在系统的PATH中找不到。只要export PATH=$PATH:/usr/local/erlang/bin 就可以了。

如果为了rc.local启动方便,可以将 export PATH=$PATH:/usr/local/erlang/bin 这一行写入到 rabbitmq-server 文件中:

1f178a82b9014a90f9dd87cda8773912b21beedb

执行后,ps -aux 一下,看到进程中有/usr/local/erlang/lib/erlang/erts-5.10.1/bin/epmd -daemon 和 /usr/local/erlang/lib/erlang/erts-5.10.1/bin/beam.smp 就OK了。

sbin目录下还有一个脚本: rabbitmqctl 也很常用,与 rabbitmq-server 一样需要指明erlang的路径才能正确工作。

常用的方法:

rabbitmqctl start_app 启动后,执行一下这个比较保险
rabbitmqctl list_exchanges 显示当前所有的交换机
rabbitmqctl list_queue 查看当前有效队列情况

二、 PHP extension的安装:

PHP操作rabbitmq需要AMQP扩展的支持。下载扩展: http://pecl.php.net/package/amqp ,安装过程与一般扩展一样,

/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config
sudo make && make install

在编译安装过程中如果报错信息,可参考这篇文章:Ubuntu 12.04安装RabbitMQ的PHP扩展出现的问题及解决办法

然后编辑php.ini 插入:

[amqp]
extension = amqp.so

重启apache或nginx,查看phpinfo其中有关于anqp的段落,就OK了。

0df431adcbef760936f25aef2fdda3cc7dd99ec1

三、如何使用PHP进行实时后端消息处理

首先,要保证PHP文件可以正确的以堵塞方式处理消息。代码可参见上一节RabbitMQ与PHP(一)

然后,我们来借助Python实现一个多线程的守护进程,由这个守护进程来调用PHP,把PHP作为工作线程。

启动脚本:start_jobs.py

# _*_ coding:utf-8 _*_
'''
yoka at 实现多线程处理任务的守护进程
Created on 2012-4-7
@author: xwarrior
@update: jimmy.dong@gmail.com
'''
#在此引入项目需要的数据包
from MyJobs import MyJobs
from MyThread import MyThread
import logging
import time

def main():
    logger = logging.getLogger('main()')
    logger.info('server start!')
    worker_threads = 2 #定义需要启动的线程数量
    timeline = 2 #线程检查时间间隔,秒
    thread_pool = {}

    for i in range(0, worker_threads ):
        param = 'some param'
        job = MyJobs( param )
        thread = MyThread( job, i )
        thread.setDaemon = True
        thread.start()
        logger.info('start thread %s' %( thread.getName() ))
        thread_pool[i] = thread

    #干完就结束模式
    #for eachKey in thread_pool.keys():
    # thread_pool[eachKey].join()

    #保持线程数量模式
    while 1:
        time.sleep(timeline)
        # 检查每一个线程
        for eachKey in thread_pool.keys():
            if thread_pool[eachKey].isAlive():
                print 'thread alive:' + str(i)
            else:
                print 'thread down:' + str(i)
                thread_pool[eachKey].run()

    logger.info('main exist!')
    return

if __name__ == '__main__':
    #init config format
    FORMAT = '%(asctime)-15s %(name)s %(levelname)s file %(filename)s:lineno %(lineno)s - %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)

    main()
    pass

线程脚本: MyThread.py

# _*_ coding:utf-8 _*_
'''
Created on 2013-03-25
@author: jimmy.dong@gmail.com
'''
from threading import Thread

class MyThread(Thread):
    '''
    创建线程
    '''

    def __init__(self,job,thread_id):
        '''
        Constructor
        '''
        self.job = job
        Thread.__init__(self, name = 'my_thread_%s' %(thread_id))

    def run(self):
        self.job.run()

    def stop(self):
        self.job.exit()

任务脚本: MyJobs.py

# _*_ coding:utf-8 _*_
'''
Created on 2013-03-25
@author: jimmy.dong@gmail.com
'''
import os
import urllib2

class MyJobs(object):

    def __init__(self, param ):
        #do something
        self.param = param

    def __del__(self):
        ''' destruct '''
        self.exit()

    def exit(self):
        ''' 退出'''
        self.quit = True

    def run(self):
        ''' 开始处理 '''
        #使用shell模式
        #cmd = '/usr/bin/curl "http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param) + '"'
        cmd = '/usr/local/php/bin/php -c /usr/local/php/lib/nginx.ini /home/jimmy/at/DocumentRoot/try/amqp_consume.php ' + str(self.param)
        re = os.system(cmd)

        #使用web模式
        #req = urllib2.Request('http://at.yoka.com/try/amqp_consume.php?key=' + str(self.param))
        #response = urllib2.urlopen(req)
        #re = response.read()
        #print re

在任务调度(start_jobs.py)中,设计了两种工作模式:

一种工作模式是一共启动N个线程去干活,适合于尽快完成一个大任务;

另一种是保持进程数量,当发现某个进程完成后,再重新将进程启动起来。显然,用户守护处理消息适合这种模式。

具体工作在MyJob.py中,提供了系统Shell调用和采用URL调用两种方式。推荐使用shell直接调用php的方式,这样可以灵活控制Php.ini,比如增加auto_prepend_file、增长max_execution_time等。

实际项目中,假定有5种类型的消息,可以启动20个线程,将thread_id当作参数传递给PHP。PHP将thread_id%5当作待处理类型,就可以得到每种类型有4个线程工作的场景了。

考虑到PHP的执行时间限制及内存泄露问题,可以将consume.php脚本进行一下改进,让PHP脚本每次处理指定数量的消息后就退出,由Python多线程框架重新启动线程,以保证运行稳定可靠。另外,将应答改为手工应答,确保消息获得正确有效处理。

/$q->consume('processMessage'); //需手动应答

/**
* 消费回调函数
*/
function processMessage($envelope, $queue) {
    global $counter;
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    if($counter++ > 5)return FALSE; //处理5个消息后退出
}

用 两个线程,检查间隔2秒,SHELL模式 测试运行结果:

f703738da9773912b0eab410f9198618377ae2f6

由上图可见,运行开始后,检查2个线程都处于活跃状态,并对消息进行了正确处理,当处理到一定数量后,PHP程序结束,父进程检查到有进程处于完成状态,重新将其启动(第二个绿色框)。完全与预期相符。

 

http://nonfu.me/p/8838.html

分享到:
评论

相关推荐

    用PHP收发RabbitMQ消息

    要使用 PHP 语言与 RabbitMQ 实现消息队列,需要安装 AMQP 扩展。AMQP 扩展提供了 PHP 语言与 RabbitMQ 之间的接口,允许开发者使用 PHP 语言来发送和接收消息。 二、消费者:接收消息逻辑 在接收消息时,我们需要...

    RabbitMq安装包及超详细安装教程

    确保你安装的Erlang版本与RabbitMQ当前版本兼容。你可以访问Erlang Solutions的官方网站下载适合你操作系统的Erlang版本。 接下来,获取RabbitMQ的安装包。RabbitMQ提供了针对Windows、Linux和macOS等操作系统的...

    windows下安装RabbitMQ消息服务器

    - `rabbitmq-service.bat install`:重新安装服务 - `rabbitmq-service.bat start`:启动服务 #### 四、测试验证 - **访问管理界面**:安装及配置完成后,可以通过浏览器访问 RabbitMQ 的管理界面,地址为 `...

    RabbitMQ消息服务安装使用手册.docx

    **RabbitMQ 消息服务安装使用手册** RabbitMQ 是一个开源的消息代理和队列服务器,它基于 AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中可靠地传递消息。本手册将详细介绍如何在Linux环境中...

    tp6使用rabbitmq

    当消息被正确处理后,消费者需要发送一个确认信号给RabbitMQ,否则消息将被重新投递。同时,应妥善处理异常,避免因为错误导致消息丢失。 9. **队列监控**:通过RabbitMQ的管理界面或者第三方工具,可以实时监控...

    php 连接rabbitmq例子含PhpAmqpLib库

    通过这些基本操作,你可以构建一个高效的消息传递系统,实现任务异步处理、负载均衡或服务间通信等功能。在实际项目中,你可能还需要考虑错误处理、重试机制以及性能优化等问题。理解`PhpAmqpLib`和RabbitMQ的工作...

    RabbitMQ消息模式之Confirm确认消息

    理解Confirm消息确认机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心...

    Windows c++中RabbitMQ-c特殊消息队列的保姆教程(csdn)————程序.pdf

    在Windows环境下,使用C++与RabbitMQ进行通信,我们需要首先了解RabbitMQ的基本概念,它是一个开源的消息代理和队列服务器,常用于处理异步任务和分布式系统中的消息传递。RabbitMQ-c是一个C语言编写的轻量级...

    rabbitmq+PHP教程代码.rar

    4. **RPC模式**:模拟远程过程调用,使服务间能通过消息进行通信,实现异步RPC。 5. **死信队列**:了解如何处理未被正确消费的消息,避免数据丢失。 6. **持久化与消息确认**:设置消息和队列的持久化,确保在...

    RabbitMQ自动安装脚本

    自动安装脚本是为了简化RabbitMQ在服务器上的部署过程,通常由bash或shell脚本编写,它可以自动处理下载、解压、配置、安装依赖和启动服务等一系列步骤。这样可以避免手动操作带来的繁琐和错误,提高效率。对于离线...

    RabbitMQ3.9.15安装包及安装文档

    1. **RabbitMQ概述**:RabbitMQ是一个用Erlang编程语言开发的消息中间件,它的核心功能是作为消息代理,接收并转发应用程序之间的消息。它支持多种消息协议,但最常用的是AMQP,该协议定义了消息的格式和交换方式。 ...

    rabbitmq windows服务器安装文档.doc

    RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议,被广泛应用于分布式系统中,用于处理异步任务、消息传递和负载均衡。本文将详细介绍如何在Windows服务器上安装RabbitMQ。 ...

    离线安装rabbitmq全过程,包含python环境和er环境以及安装包的安装过程

    在IT行业中,RabbitMQ是一种广泛应用的消息队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,用于处理应用程序之间的异步通信。在没有网络连接或者网络环境不稳定的情况下,离线安装RabbitMQ就...

    RabbitMQ Linux安装教程

    在所有组件都成功安装并配置好之后,RabbitMQ就可以作为消息中间件在Linux环境中使用,实现可靠的消息传输和处理。记得在生产环境中,还需要配置防火墙规则、监控和日志管理,以确保RabbitMQ的安全和稳定性。

    rabbitMq安装教程以及软件

    - 安装过程中,系统会自动配置RabbitMQ服务,并启动服务。 3. **验证安装**: - 打开命令行窗口,输入`rabbitmqctl status`,如果显示RabbitMQ服务器的状态信息,说明安装成功。 - 通过浏览器访问`...

    RabbitMQ消息中间件技术精讲

    RabbitMQ支持多种消息传递模式,包括简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式等,适用于各种复杂的应用场景。 #### 二、RabbitMQ的工作原理 RabbitMQ作为一款消息中间件,其核心组件主要包括...

    rabbitmq 实现消息插队

    本篇文章将深入探讨如何使用RabbitMQ实现消息插队,以"多人投资"为例,重点讲解手动投资与自动投资的消息处理流程。 首先,我们需要理解RabbitMQ的基本概念。在RabbitMQ中,生产者(Producer)负责发布消息,消费者...

    php7可用 rabbitmq-c 插件

    在PHP7中,为了实现与消息队列服务RabbitMQ的交互,我们可以使用`rabbitmq-c`这个C语言编写的客户端库。RabbitMQ是一种广泛使用的开源消息代理,它遵循AMQP(Advanced Message Queuing Protocol)协议,用于在分布式...

    RabbitMQ-3.6.8 安装包及安装教程

    提供的`RabbitMQ安装.docx`文档将深入解释整个过程,对于学习和实践RabbitMQ的安装与配置非常有帮助。通过熟练掌握RabbitMQ,你可以实现高效、可靠的系统间通信,提升整体架构的灵活性和稳定性。

    RabbitMQ消息服务用户手册.docx

    【RabbitMQ 消息服务用户手册】 RabbitMQ 是一个开源的消息代理和队列服务器,它基于 Advanced Message Queuing Protocol (AMQP) 协议,广泛用于分布式系统中的消息传递。RabbitMQ 提供了高可靠性和可扩展性的解决...

Global site tag (gtag.js) - Google Analytics