`

建立一个支持并发的Laravel任务模块

 
阅读更多
laravel/lumen的事件、任务调度等都是基于队列来实现的,但由于php是进程模式,除非部署第三方模块,否则无法像java那样通过创建线程来实现并发执行。这对于laravel来说实在非常不方便,只能一个任务执行完再执行下一个任务,效率极其低下,而且容易阻塞。但并发任务的需求又非常常见,下面就基于laravel自行实现一个并发任务模块。

1、要实现并发,就要先解决在PHP进程内创建新进程的问题。通过PHP本身是没有办法的,因此采用执行外部Shell脚本的方式启动新的PHP进程。但默认情况下,system、exec等方式执行shell时都是阻塞的,按网上文章说的用popen测试,也不成功。最后采用的是
shell_exec($shell . ' > /dev/null 2>&1 &');

上面的方法只能用在linux/unix环境,因为windows没有/dev/null这个空设备句柄。

2、任务模块不再依赖laravel队列来实现,触发任务时使用数据库保存任务信息,表结构如下:
引用

CREATE TABLE `sq_task` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `creationtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
  `modifiedtime` int(11) NOT NULL DEFAULT '0' COMMENT '最后更新时间',
  `starttime` int(11) NOT NULL DEFAULT '0' COMMENT '启动时间',
  `endtime` int(11) NOT NULL DEFAULT '0' COMMENT '完成时间',
  `status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '状态,0未执行,1正在执行,2已完成,3出错',
  `task` varchar(50) NOT NULL DEFAULT '' COMMENT '任务名,与Commands中的$signature一致',
  `parameters` varchar(1000) NOT NULL DEFAULT '' COMMENT '启动参数',
  `total_num` int(11) NOT NULL DEFAULT '0' COMMENT '总共完成子任务数量',
  `success` int(11) NOT NULL DEFAULT '0' COMMENT '成功数量',
  `fail` int(11) NOT NULL DEFAULT '0' COMMENT '失败数量',
  `memo` varchar(2000) NOT NULL DEFAULT '' COMMENT '备注',
  PRIMARY KEY (`id`),
  KEY `idx_status` (`status`),
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;


<?php

namespace App\Http\Model;

use App\Http\Model\BaseModel;
use Illuminate\Support\Facades\DB;

class TaskModel extends BaseModel
{
    public $table = 'sq_task';
    public $primaryKey = 'id';
    const CREATED_AT = 'creationtime';
    const UPDATED_AT = 'modifiedtime';


    public function __construct()
    {
        parent::__construct();
    }

    public function search($shopId=false, $status=false, $skip=0, $limit=10){
        $db = DB::table($this->table);
        if($shopId !== false){
            $db->where('shopid', $shopId);
        }
        if($status !== false){
            $db->where('status', $status);
        }
        return $db->skip($skip)->take($limit)->get()->toArray();
    }

    /**
     * 保存商家管理员
     * @param $data
     * @return mixed
     */
    public function add($data){
        return DB::table($this->table)->insertGetId($data);
    }

    /**
     * @param $id
     * @return mixed
     */
    public function get($id){
        return DB::table($this->table)->where('id', $id)->first();
    }

    /**
     * @param $id
     * @param $array
     * @return mixed
     */
    public function modify($id, $array){
        if ($array) {
            $array['modifiedtime'] = time();
        }
        return DB::table($this->table)
            ->where('id', $id)
            ->update($array);
    }

    public function incrSuccess($id){
        return DB::table($this->table)
            ->where('id', $id)
            ->increment('success');
    }

    public function incrFail($id){
        return DB::table($this->table)
            ->where('id', $id)
            ->increment('fail');
    }
}


3、还需要一个单独执行的Command来替代原有队列的功能,负责检查并执行待执行的任务
<?php
namespace App\Console\Commands;


use App\Http\Model\TaskModel;

class ShopCommandHandler extends Command
{
    protected $signature = 'CommandHandler';

    protected $description = '任务触发器';

    public function handle()
    {
        info($this->description . "启动");

        $taskModel = new TaskModel();
        while(true) {
            $tasks = $taskModel->search(false, 0, 0, 10);
            foreach ($tasks as $task) {
                try {
                    $shell = 'php artisan ' . $task['task'] . ' ' . $task['id'];
                    shell_exec($shell . ' > /dev/null 2>&1 &');
                    info("准备执行任务:" . $task['task'] . '(' . $task['id'] . ')');
                } catch(\Exception $e){
                    info(json_encode($task) . '===>' . $e->getMessage() ."  :  " . $e->getTraceAsString());
                }
            }
            sleep(2);
        }
        info($this->description  ."结束");
    }
}


4、为了符合laravel的编程习惯,shell命令使用的是artisan命令行方式开发。使用artisan的好处是可以直接复用laravel里面的各种类,并且开发出来的命令,也可以单独执行。为了统一命令的行为,自定义了一个Command基类
<?php

namespace App\Console\Commands;

use App\Http\Model\TaskModel;

abstract class BaseCommand extends Command{
    private $key = null;
    private $subTasks = [];
    private $taskModel;
    protected $task;
    protected $taskId;
	/**
	 * Create a new command instance.
	 */
	public function __construct(){
		parent::__construct();

                $this->taskModel = new TaskModel();
	}

    /**
     * 获取待处理对象
     * @return array - 处理对象集合
     */
    abstract protected function getSubTasks();

    /**
     * 执行单个子任务
     * @param $item - 单个处理对象
     * @return bool - 处理结果
     */
    abstract protected function do($item);

    public function handle(){
        try{
            // 获取命令行参数
            $this->task = $this->getTask();
            if(!$this->task){
                return;
            }
            info($this->logName);
            info('任务开始:' . $this->signature . $this->taskId);

            // 设置任务名称
            $name = $this->task['task'];
            $this->key = $this->lockPrefix . $name . '_' . $this->task['id'];

            // 获取任务信息
            $this->subTasks = $this->getSubTasks();

            // 计算任务子任务数量
            $this->updateSubTaskTotal();

            // 更新任务信息(子任务数量、任务状态、启动时间)
            $this->taskModel->modify($this->taskId, ['starttime'=>time(), 'status' => 1]);

            // 设置任务锁
            $this->setOrCheckLocked($this->key, $ttlSecond = 60 * 60 * 24);

            // 执行任务
            foreach ($this->subTasks as $item) {
                // 执行子任务
                try {
                    $r = $this->do($item);
                } catch(\Exception $e){
                    info('任务异常,' . json_encode($item) . '===>' . $e->getMessage() .': ' . $e->getTraceAsString());
                    $r = false;
                }
                // 更新当前进度
                $this->incrProcess($r);
            }
            // 更新任务信息(任务状态、完成时间)
            $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 2]);

            info('任务结束:' . $this->signature . $this->taskId);
        } catch(\Exception $e){
            // 更新任务信息(任务状态、完成时间)
            $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 3, 'memo'=>$e->getMessage()]);

            info('任务异常(' . $e->getMessage() .'): ' . $e->getTraceAsString());
        } finally{
            // 释放任务锁
            if($this->key) {
                $this->releaseLocked($this->key);
            }
        }
    }

    /**
     * @throws \Exception
     */
    private function getTaskId(){
        $taskId = $this->argument('id');
        if(!$taskId){
            throw new \Exception('获取任务ID失败', 9901);
        }
        return $taskId;
    }

    /**
     * @throws \Exception
     */
    private function getTask(){
        $this->taskId = $this->getTaskId();
        $task = $this->taskModel->get($this->taskId);
        if(!$task){
            throw new \Exception('任务不存在', 9902);
        }
        if($task['status'] === 2){
            $this->log("任务[$this->taskId]已完成,放弃执行。");
            return false;
        }
        if($task['status'] === 1 && time()-$task['starttime']<86400*2){
            info("任务[$this->taskId]正在执行,且未超过2小时,放弃执行。");
            return false;
        }
        return $task;
    }

    private function updateSubTaskTotal(){
        $this->taskModel->modify($this->taskId, ['total_num'=>count($this->subTasks)]);
    }

    private function incrProcess($result){
        if($result) {
            $this->taskModel->incrSuccess($this->taskId);
        } else {
            $this->taskModel->incrFail($this->taskId);
        }
    }

    protected function getConf($name){
        $conf = json_decode($this->task['parameters'],true);
        if($conf && isset($conf[$name])){
            return $conf[$name];
        } else {
            return null;
        }
    }
}


5、剩下的就是开发具体的任务了,下面是一个发放向用户优惠券示例:
<?php
<?php
namespace App\Console\Commands;

use App\Http\Service\OrderService;
use App\Libs\MicroService\CouponMicroService;

class SendActiveUserCoupon extends BaseCommand
{
    protected $signature = 'SendActiveUserCoupon {id}';

    protected $description = '向活跃用户发放优惠券';


    /**
     * 获取待处理对象
     * @return array - 处理对象集合
     */
    protected function getSubTasks(){

        $result = (new OrderService())->searchUser($this->task['shopid'], $this->getConf('conditions'));
        return array_column($result, 'cid');

    }

    /**
     * 执行单个子任务
     * @param $item - 单个处理对象
     * @return bool - 处理结果
     * @throws
     */
    protected function do($item){
        $couponMicroService = new CouponMicroService();
        $couponMicroService->entity_create($this->getConf('provide_id'), $item);
        return true;
    }

}
分享到:
评论

相关推荐

    laravel分布式并发锁

    在 Laravel 框架中,分布式并发锁是一个关键的概念,特别是在高并发的场景下,它能够确保多个请求在处理同一资源时不会发生冲突。这里,我们深入探讨 Laravel 的并发锁机制,以及如何在 PHP 开发中利用这个特性。 ...

    Laravel开发-laravel-clickhouse

    要在Laravel项目中使用ClickHouse,首先需要安装一个适配器包,例如`yandex/laravel-clickhouse`。这个包允许Laravel通过Eloquent ORM与ClickHouse进行交互,提供了一套与原生MySQL相似的API,方便开发者操作数据库...

    Laravel开发-laravel-locking

    当一个任务正在处理时,其他相同任务会被放入队列,等待前一个任务完成。这样避免了多个任务同时操作相同数据。 总结来说,Laravel的锁定机制涵盖了多种层面,包括Redis、数据库事务、乐观锁、互斥锁和信号量,以及...

    Laravel开发-laravel-queue-pool

    总的来说,`laravel-queue-pool`是Laravel开发中的一个重要工具,尤其对于处理大量并发任务的应用来说,它能够提升效率,保证服务的稳定性和可靠性。通过合理配置和使用,开发者可以优化队列处理流程,提高整个系统...

    Laravel开发-laravel-amqp

    **正文** 在IT行业中,Laravel是一个非常流行的PHP框架,以其优雅的设计和强大的功能深受开发者喜爱。...对于处理大量并发请求、执行耗时操作或者需要解耦组件的项目来说,`laravel-amqp` 是一个不可或缺的工具。

    Laravel开发-laravel-resque-redis

    Laravel-resque是Laravel社区为了更好地利用Resque(一个由GitHub上的Chris Jones开发的Redis驱动的任务队列系统)而创建的一个连接器。这个项目的主要目标是为Laravel提供对Resque的无缝集成,让开发者可以利用...

    Laravel开发-laravel-reactphp

    在“laravel-reactphp-master”这个项目中,很可能是实现了一个Laravel应用程序,同时集成了ReactPHP作为其后端服务器。这种架构可以充分利用ReactPHP的异步I/O特性,同时保留Laravel的强大功能,如路由、ORM和...

    Laravel开发-lock-laravel

    使用`Redis::lock()`方法,可以创建一个可设置超时时间的锁,当一个任务完成或者异常发生时,必须记得释放锁,以避免死锁问题。 3. **Mutex和Semaphore锁**:Laravel还提供了基于文件系统的Mutex和Semaphore锁,...

    Laravel开发-azure-laravel

    描述中提到的“支持存储队列的Laravel Azure包”暗示我们正在处理一个扩展包,这个包可能是为了帮助开发者更好地在Azure上利用存储队列服务。在Laravel中,队列可以用来处理耗时的任务,如发送电子邮件或执行后台...

    Laravel开发-laravel-hprose

    而`laravel-hprose`是针对Laravel或Lumen框架的一个扩展,旨在引入HPRose服务的支持,以便实现高性能、轻量级的RPC(远程过程调用)通信。在这个项目中,我们将深入探讨Laravel、Lumen、HPRose以及它们如何协同工作...

    Laravel开发-laravel-worker

    在Laravel框架中,`Laravel Worker`是一个关键组件,主要负责处理后台的任务,比如队列任务和周期性作业。这个组件使得开发者可以将耗时的操作(如发送邮件、处理大量数据或执行复杂的计算)从主线程中分离出来,...

    Laravel开发-laravel-counter

    总的来说,"laravel-counter"项目是Laravel生态系统的一个实践案例,它结合了Laravel的多种特性,提供了一个方便的计数解决方案。通过学习这个项目,开发者不仅可以了解Laravel的基础,还能掌握构建高效、可扩展的...

    Laravel开发-laravel-swoole

    `laravel-swoole`是一个专为Laravel设计的插件,它利用了Swoole扩展的强大功能,将PHP应用程序转换为高性能的HTTP服务器。 首先,了解Swoole是什么至关重要。Swoole是一个开源的、高性能的异步并行的PHP扩展,它...

    Laravel开发-laravel-sms

    6. **接收短信**:如果laravel-sms支持接收短信,开发者需要设置一个路由来处理短信的回复,通常涉及到短信验证码的验证或事件监听。 7. **测试**:为了确保短信服务的正常工作,开发者需要编写单元测试和功能测试...

    Laravel开发-laravel-zmq-driver

    这个项目是为 Laravel 框架提供 ZeroMQ(ZMQ)支持的一个驱动程序,使得 Laravel 应用能够利用 ZMQ 的强大功能进行消息传递和事件处理。 首先,我们需要了解 ZeroMQ(ZMQ)。ZeroMQ 是一个高性能、轻量级的消息队列...

    Laravel开发-laravel-datastore

    "Laravel Datastore"在此上下文中可能指的是Laravel与Google Cloud Datastore的集成,这是一个NoSQL数据库服务,用于存储非结构化数据。在这个主题中,我们将深入探讨Laravel框架与Datastore的结合使用,以及相关的...

    Laravel开发-laravel-resque-ex

    在 Laravel 中创建一个队列任务,你需要创建一个新的 `App\Jobs` 类,例如 `SendWelcomeEmail`,并实现 `handle` 方法来定义任务的具体操作: ```php namespace App\Jobs; use Illuminate\Bus\Queueable; use ...

    Laravel开发-laravel-crawler

    【Laravel开发-laravel-crawler】是一个基于Laravel框架构建的分布式Web爬虫项目,它利用了Laravel的队列功能来实现高效的数据抓取。在这个项目中,开发者可以利用Laravel的强大特性和灵活的架构设计,构建出可扩展...

Global site tag (gtag.js) - Google Analytics