`
xieye
  • 浏览: 835460 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

php轻量队列(4)-pheanstalk

    博客分类:
  • PHP
阅读更多
-- 前面的队列文章 --

php队列使用php-resque(1)
php队列使用php-resque(2)
php队列使用php-resque(3)- by supervisor


-- 总体说明 --
下面是一个网上的中文的说明
www.fzb.me/2015-7-31-beanstalkd-faq.html

这里分为我们常见的服务端和客户端。
服务端是使用人很多的beanstalkd,其实叫做消息队列比较合适。
它和我们前面介绍的php-resque差别:
1、php-resque不区分明显的客户端和服务端,beanstalkd区分,较符合我们常用的思维方式。
2、php-resque只需安装redis并运行,而beanstalkd是自己写的服务端与其他软件无关,也不需要。
3、php-resque不能持久化,而beanstalkd可以,但实际不用为好,因为太慢。
4、php-resque和beanstalkd都很轻量级,都很容易使用。
5、最重要的php-resque不能使用延时队列(或者说需要其他组件补充),而beanstalkd可以。
6、php-resque许久不更新了,而beanstalkd在composer上次更新是2015年,好不少。
7、php-resque没有优先级,而beanstalkd有优先级,但我也不用。
8、为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行。


介绍一下延时队列,我可以放一个消息,但希望该消息在某个特定时点才真正进入队列,从而被取出。
而普通的队列,你放入消息到队列里,就无法控制它,只要它前面没有别的消息,它就立刻会被取出。

客户端可以选用多种语言,php也有多个类库,但composer仓库下载最多的是pda/pheanstalk
==============================
服务端说明:
1、一个消息有READY, RESERVED, DELAYED, BURIED四种状态
2、当producer直接put一个消息时,消息就处于READY状态
3、如果选择延迟put,消息就先到DELAYED状态,等待时间过后才迁移到READY状态。任务状态会从 READY 变为 RESERVED(预定),其他人就无法获取。 PUT 产生消息的时候,携带了 ttr(time to run),如果这个时间内,消费者没有发送 delete, release 或者 buried 命令。 任务会自动回到 READY 状态,其他人可以继续获取。(注意:消费者返回 release 命令或者不返回,就回到 READY/DELAYED 状态,可以重新被消费!!这可能是我们不希望的)
4、如果获取了当前READY的消息后,该消息的状态就迁移到RESERVED,这样其他的任何进程就不能再操作该消息。确保唯一使用。
5、BURIED这个,为防止一个消息被调用多次,可以使用此接口。
6、程序有一个delete操作,一个消息被delete之后,就完全消失,不属于上面的4种情况中任何一种,并且,一般来说,就应该在获取消息之后对其delete,绝对不可以省略。

Beanstalkd的job状态多样化,支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。

它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,因此有很高的性能。

关于服务端使用的内存,只取决于队列大小,不限制。
缺点:无最大内存控制, 如果有消息堆积或者业务使用方式有误,而导致内存暴涨拖垮机器





-- 服务端安装 --

完整安装步骤,必须是类unix系统。

yum install beanstalkd
service beanstalkd start

上面的写法,会读取下列配置文件。
配置文件 /etc/sysconfig/beanstalkd

还有一种启动方法是直接启动,自己设置参数


当开启-b启动持久化后,其会将binlog每隔N秒(N可配置为0)刷入到硬盘,类似redis的aof持久化方式,一般不需要的。

-- 服务端监控 --
有很多
https://github.com/kr/beanstalkd/wiki/Tools
我挑选第一个下载
https://github.com/EdwinHoksberg/beanstalkd-cli/releases/tag/1.2.10

wget https://github.com/EdwinHoksberg/beanstalkd-cli/releases/download/1.2.10/beanstalkd-cli_linux_386
mv beanstalkd-cli_linux_386 beanstalkd-cli
chmod a+x beanstalkd-cli
mv beanstalkd-cli /usr/local/bin/beanstalkd-cli


使用极其简单,无需安装。
只要把文件解压即可,执行命令
./beanstalkd-cli --server 127.0.0.1 --port 11300  stats
或者
./beanstalkd-cli  --server 127.0.0.1 --port 11300  monitor
但是不太直观。

-- 服务端监控的一个php的web监控控制台,超棒 --
使用方法,首先,电脑必须按照composer
然后,
composer create-project ptrofimov/beanstalk_console -s dev 目标目录
这个目标目录通常位于web根目录下。一句命令就把控制台安装成功。
然后,
打开浏览器
http://域名/目标目录/public/index.php
神奇的控制台就出现了。
php好棒!



-- 客户端安装 --

composer安装
"pda/pheanstalk":"3.1.0"



-- 客户端代码 --
为简单,写一个文件里了,真实项目肯定分开
<?php
namespace app\test\controller;
use Pheanstalk\Pheanstalk;
/**
 * 这是门面程序。
 * 使用之前,需要beanstalkd服务已经在本机启动。
 * 我是用框架执行的,如果没有框架,需写成两个php文件。
 * 
 * @author Administrator
 *
 */
class Beanstalkd
{
    /**
     * 这是添加消息到队列。
     */
    public function add()   {
        $beanstalkd = new Pheanstalk('127.0.0.1', '11300');
        //这是消息数据,在本demo中,type不能省略。区分任务类型。
        $data = array(
            'type'   => mt_rand(1,2),
            'mobile' => '13051662435',
            'id'     => mt_rand(100,200),
            'time'   => date("Y-m-d H:i:s"),
        );
        $delay = (int) strtotime($data['time']) - time();
        $delay=0;
          
        // 把消息放入队列,1024 是优先级
         //$delay 非常重要,假设设置10,则该消息10秒后才被放入队列!非常好使。
        $beanstalkd->useTube( Worker::queue_name  )
            ->put(serialize($data), 1024, $delay);
    }
    
    /**
     * 这是运行队列管理器。
     */
    public function run()
    {
        $worker = new Worker();
        $worker->run();
    }
    
    
    
}

/**
 * 这是真正的任务执行1
 * @author Administrator
 *
 */
class WorkerJob1
{
    public function excute($data)
    {
        echo "---\n";
        echo  ('job:type1:'.print_r($data, 1));
        echo "---\n";
    }
}

/**
 * 这是真正的任务执行2
 * @author Administrator
 *
 */
class WorkerJob2
{
    public function excute($data)
    {
        echo "---\n";
        echo  ('job:type2:'.print_r($data, 1));
        echo "---\n";
    }
}

/**
 * 这是队列管理器守护进程,最好用supervisord这个软件支持一下。
   且应该放在linux的后台执行
 * @author Administrator
 *
 */
class Worker {

    private $job1; //任务对象
    private $job2; //这是任务对象
    private $pheanstalk;//这是服务
    // 设置队列名称。随意起
    const queue_name ="queue1";

    public function __construct() {
       
        $this->log('starting');
        $this->pheanstalk = new Pheanstalk('127.0.0.1:11300');
        $this->job1 = new WorkerJob1();
        $this->job2 = new WorkerJob2();
    }

    public  function excute($data)
    {
        if ($data['type']==1) {
            $this->job1->excute($data);
        }elseif ($data['type']==2) {
            $this->job2->excute($data);
        }
    }

    public function run() {
        $this->log('starting to run');

        while(true) {
            $job = $this->pheanstalk->watch( self::queue_name )->ignore('default')->reserve();
            $job_data = unserialize( $job->getData());
            
            // 真正工作的就这句。
            $this->excute($job_data);
            //删除千万不能忘记。
            $this->pheanstalk->delete($job);

            $memory = memory_get_usage();
            //这是一个保险措施。也可以去除。
            if($memory > 100000000) {
                $this->log('exiting run due to memory limit');
                exit;
            }
        }
    }

    private function log($txt) {
        echo "worker: ".$txt."\n";
    }
}


-- 浏览器输出 --

  • 大小: 15.5 KB
0
0
分享到:
评论

相关推荐

    tp5.1消息队列 think-queue

    标题 "tp5.1消息队列 think-queue" 指的是使用ThinkPHP5.1框架集成的消息队列组件——think-queue。消息队列在软件开发中扮演着重要角色,它允许应用程序异步处理耗时任务,提高系统响应速度和整体性能。think-queue...

    php队列+php-redis队列+php-redis扩展

    在IT行业中,队列是一种常见的数据结构,尤其在处理大量并发任务或后台异步操作时,它的作用尤为重要。本文将详细探讨PHP中的队列技术,包括如何使用PHP-Redis扩展来实现高效的数据处理。 首先,PHP队列是PHP应用...

    队列应用---划分冲突子集问题 为运动会安排日程

    环队列结构,解决“划分冲突子集问题”,这是一种典型的队列应用,常用于日程安排、资源调度等问题。在给定的运动会日程安排场景中,我们需要将比赛项目分配到不同的时间,使得每个运动员能参加的所有项目不会在同一...

    XXL-MQ是一款轻量级分布式消息队列支持串行并行和广播等多种消息模型

    XXL-MQ是一款专为Java开发设计的轻量级分布式消息队列系统,它提供了高效、稳定的消息传递功能,能够帮助开发者在分布式环境下构建可靠的数据通信。作为一款强大的中间件,XXL-MQ旨在解决微服务架构中的异步处理、...

    队列应用--舞伴配对

    在IT领域,队列是一种非常基础且重要的数据结构,它遵循先进先出(FIFO,First In First Out)的原则。在“队列应用--舞伴配对”这个场景中,我们探讨的是如何利用C语言来实现一个舞伴配对的算法,这涉及到队列的...

    队列---适合初学者

    4. **阻塞队列(Blocking Queue)**:在多线程环境下,当队列为空时,出队操作会阻塞,直到有新的元素入队;同样,当队列满时,入队操作也会阻塞,直到有元素出队。 ### 队列的应用 1. **任务调度**:操作系统中的...

    Pheanstalk一个Beanstalkd队列PHP客户端库

    Pheanstalk是一个针对Beanstalkd轻量级工作队列的PHP客户端库。Beanstalkd是一款高性能、轻量级的分布式消息队列系统,它允许应用程序通过将任务放入队列来解耦任务的生产者和消费者,从而提高系统的可扩展性和可靠...

    同步队列-无锁队列-循环数组无锁队列.zip

    配套代码讲解:https://blog.csdn.net/songchuwang1868/article/details/90200251 ...同步队列-无锁队列-循环数组无锁队列 同步队列-无锁队列-循环数组无锁队列 同步队列-无锁队列-循环数组无锁队列

    Go-GoQ在Golang中的轻量级消息队列

    GoQ是Golang中一个轻量级的消息队列实现,专为简单、高效以及易于集成到Go应用程序中而设计。消息队列作为一种中间件,它允许不同组件之间异步通信,提高了系统的可扩展性和容错性。GoQ的出现使得开发者能够更便捷地...

    数据结构--队列--思维导图.pdf

    数据结构--队列--思维导图.pdf 数据结构中,队列(Queue)是一种重要的抽象数据类型,具有重要的应用价值。队列是一种特殊的线性表,它只允许在一端进行插入,另一端进行删除。队列的基本概念和操作将在下面详细...

    消息队列源码-Linux.zip

    4. 资源管理:查看如何分配和释放消息队列相关的内存资源,以及如何处理错误情况。 5. 持久化:分析消息队列如何实现数据的持久化,即使在系统重启后也能恢复消息。 通过阅读和理解这份源码,开发者能够深入理解...

    PHP消息队列服务php-queue.zip

    php-queue 是 PHP开发的磁盘存储消息队列服务,基于leveldb和swoole ,在4核机器上处理能力可以达到2.5W/s 。leveldb: ...

    - 为什么使用消息队列? - 消息队列有什么优点和缺点? - Kafka、ActiveMQ、RabbitMQ、RocketMQ

    - 为什么使用消息队列? - 消息队列有什么优点和缺点? - Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,以及适合哪些场景?

    Go-Netlog-一个轻量级HTTP-centric基于日志(Kafka风格)的消息队列

    **Go-Netlog:构建轻量级HTTP-Centric Kafka风格消息队列** Go-Netlog 是一款用 Go 语言开发的轻量级消息队列,它借鉴了 Apache Kafka 的设计理念,但更注重HTTP协议,使其更适合现代Web服务的集成与通信。在本文中...

    FreeRTOS动态静态创建任务-二值信号量-队列-内存管理

    FreeRTOS动态静态创建任务-二值信号量-队列-内存管理

    分布式消息队列----如何保证消息的顺序性?

    如何保证消息的顺序性?

    基于python实现的sqlite队列sqlite-queue-python-master

    根据提供的标签,“sqlite”和“python”,我们可以推断这个项目专注于SQLite数据库的Python开发,适用于那些需要在Python应用中使用轻量级、文件存储型数据库的场合。 尽管没有具体的代码示例,我们可以推测该...

    数据结构-03-123栈队列数组-691.ppt

    数据结构-03-123栈队列数组-691.ppt

    消息队列简介-培训材料

    消息队列简介 消息队列,作为信息技术领域中一种重要的工具,主要用于实现分布式系统中的异步服务间通信。它基于先进先出(FIFO)的数据结构——队列,允许应用程序将数据(即消息)放入队列,而其他应用程序可以从...

Global site tag (gtag.js) - Google Analytics