- 浏览: 60549 次
- 性别:
- 来自: 广州
文章分类
最新评论
laravel/lumen的事件、任务调度等都是基于队列来实现的,但由于php是进程模式,除非部署第三方模块,否则无法像java那样通过创建线程来实现并发执行。这对于laravel来说实在非常不方便,只能一个任务执行完再执行下一个任务,效率极其低下,而且容易阻塞。但并发任务的需求又非常常见,下面就基于laravel自行实现一个并发任务模块。
1、要实现并发,就要先解决在PHP进程内创建新进程的问题。通过PHP本身是没有办法的,因此采用执行外部Shell脚本的方式启动新的PHP进程。但默认情况下,system、exec等方式执行shell时都是阻塞的,按网上文章说的用popen测试,也不成功。最后采用的是
上面的方法只能用在linux/unix环境,因为windows没有/dev/null这个空设备句柄。
2、任务模块不再依赖laravel队列来实现,触发任务时使用数据库保存任务信息,表结构如下:
CREATE TABLE `sq_task` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`creationtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
`modifiedtime` int(11) NOT NULL DEFAULT '0' COMMENT '最后更新时间',
`starttime` int(11) NOT NULL DEFAULT '0' COMMENT '启动时间',
`endtime` int(11) NOT NULL DEFAULT '0' COMMENT '完成时间',
`status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '状态,0未执行,1正在执行,2已完成,3出错',
`task` varchar(50) NOT NULL DEFAULT '' COMMENT '任务名,与Commands中的$signature一致',
`parameters` varchar(1000) NOT NULL DEFAULT '' COMMENT '启动参数',
`total_num` int(11) NOT NULL DEFAULT '0' COMMENT '总共完成子任务数量',
`success` int(11) NOT NULL DEFAULT '0' COMMENT '成功数量',
`fail` int(11) NOT NULL DEFAULT '0' COMMENT '失败数量',
`memo` varchar(2000) NOT NULL DEFAULT '' COMMENT '备注',
PRIMARY KEY (`id`),
KEY `idx_status` (`status`),
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
3、还需要一个单独执行的Command来替代原有队列的功能,负责检查并执行待执行的任务
4、为了符合laravel的编程习惯,shell命令使用的是artisan命令行方式开发。使用artisan的好处是可以直接复用laravel里面的各种类,并且开发出来的命令,也可以单独执行。为了统一命令的行为,自定义了一个Command基类
5、剩下的就是开发具体的任务了,下面是一个发放向用户优惠券示例:
1、要实现并发,就要先解决在PHP进程内创建新进程的问题。通过PHP本身是没有办法的,因此采用执行外部Shell脚本的方式启动新的PHP进程。但默认情况下,system、exec等方式执行shell时都是阻塞的,按网上文章说的用popen测试,也不成功。最后采用的是
shell_exec($shell . ' > /dev/null 2>&1 &');
上面的方法只能用在linux/unix环境,因为windows没有/dev/null这个空设备句柄。
2、任务模块不再依赖laravel队列来实现,触发任务时使用数据库保存任务信息,表结构如下:
引用
CREATE TABLE `sq_task` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`creationtime` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
`modifiedtime` int(11) NOT NULL DEFAULT '0' COMMENT '最后更新时间',
`starttime` int(11) NOT NULL DEFAULT '0' COMMENT '启动时间',
`endtime` int(11) NOT NULL DEFAULT '0' COMMENT '完成时间',
`status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '状态,0未执行,1正在执行,2已完成,3出错',
`task` varchar(50) NOT NULL DEFAULT '' COMMENT '任务名,与Commands中的$signature一致',
`parameters` varchar(1000) NOT NULL DEFAULT '' COMMENT '启动参数',
`total_num` int(11) NOT NULL DEFAULT '0' COMMENT '总共完成子任务数量',
`success` int(11) NOT NULL DEFAULT '0' COMMENT '成功数量',
`fail` int(11) NOT NULL DEFAULT '0' COMMENT '失败数量',
`memo` varchar(2000) NOT NULL DEFAULT '' COMMENT '备注',
PRIMARY KEY (`id`),
KEY `idx_status` (`status`),
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
<?php namespace App\Http\Model; use App\Http\Model\BaseModel; use Illuminate\Support\Facades\DB; class TaskModel extends BaseModel { public $table = 'sq_task'; public $primaryKey = 'id'; const CREATED_AT = 'creationtime'; const UPDATED_AT = 'modifiedtime'; public function __construct() { parent::__construct(); } public function search($shopId=false, $status=false, $skip=0, $limit=10){ $db = DB::table($this->table); if($shopId !== false){ $db->where('shopid', $shopId); } if($status !== false){ $db->where('status', $status); } return $db->skip($skip)->take($limit)->get()->toArray(); } /** * 保存商家管理员 * @param $data * @return mixed */ public function add($data){ return DB::table($this->table)->insertGetId($data); } /** * @param $id * @return mixed */ public function get($id){ return DB::table($this->table)->where('id', $id)->first(); } /** * @param $id * @param $array * @return mixed */ public function modify($id, $array){ if ($array) { $array['modifiedtime'] = time(); } return DB::table($this->table) ->where('id', $id) ->update($array); } public function incrSuccess($id){ return DB::table($this->table) ->where('id', $id) ->increment('success'); } public function incrFail($id){ return DB::table($this->table) ->where('id', $id) ->increment('fail'); } }
3、还需要一个单独执行的Command来替代原有队列的功能,负责检查并执行待执行的任务
<?php namespace App\Console\Commands; use App\Http\Model\TaskModel; class ShopCommandHandler extends Command { protected $signature = 'CommandHandler'; protected $description = '任务触发器'; public function handle() { info($this->description . "启动"); $taskModel = new TaskModel(); while(true) { $tasks = $taskModel->search(false, 0, 0, 10); foreach ($tasks as $task) { try { $shell = 'php artisan ' . $task['task'] . ' ' . $task['id']; shell_exec($shell . ' > /dev/null 2>&1 &'); info("准备执行任务:" . $task['task'] . '(' . $task['id'] . ')'); } catch(\Exception $e){ info(json_encode($task) . '===>' . $e->getMessage() ." : " . $e->getTraceAsString()); } } sleep(2); } info($this->description ."结束"); } }
4、为了符合laravel的编程习惯,shell命令使用的是artisan命令行方式开发。使用artisan的好处是可以直接复用laravel里面的各种类,并且开发出来的命令,也可以单独执行。为了统一命令的行为,自定义了一个Command基类
<?php namespace App\Console\Commands; use App\Http\Model\TaskModel; abstract class BaseCommand extends Command{ private $key = null; private $subTasks = []; private $taskModel; protected $task; protected $taskId; /** * Create a new command instance. */ public function __construct(){ parent::__construct(); $this->taskModel = new TaskModel(); } /** * 获取待处理对象 * @return array - 处理对象集合 */ abstract protected function getSubTasks(); /** * 执行单个子任务 * @param $item - 单个处理对象 * @return bool - 处理结果 */ abstract protected function do($item); public function handle(){ try{ // 获取命令行参数 $this->task = $this->getTask(); if(!$this->task){ return; } info($this->logName); info('任务开始:' . $this->signature . $this->taskId); // 设置任务名称 $name = $this->task['task']; $this->key = $this->lockPrefix . $name . '_' . $this->task['id']; // 获取任务信息 $this->subTasks = $this->getSubTasks(); // 计算任务子任务数量 $this->updateSubTaskTotal(); // 更新任务信息(子任务数量、任务状态、启动时间) $this->taskModel->modify($this->taskId, ['starttime'=>time(), 'status' => 1]); // 设置任务锁 $this->setOrCheckLocked($this->key, $ttlSecond = 60 * 60 * 24); // 执行任务 foreach ($this->subTasks as $item) { // 执行子任务 try { $r = $this->do($item); } catch(\Exception $e){ info('任务异常,' . json_encode($item) . '===>' . $e->getMessage() .': ' . $e->getTraceAsString()); $r = false; } // 更新当前进度 $this->incrProcess($r); } // 更新任务信息(任务状态、完成时间) $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 2]); info('任务结束:' . $this->signature . $this->taskId); } catch(\Exception $e){ // 更新任务信息(任务状态、完成时间) $this->taskModel->modify($this->taskId, ['endtime'=>time(), 'status' => 3, 'memo'=>$e->getMessage()]); info('任务异常(' . $e->getMessage() .'): ' . $e->getTraceAsString()); } finally{ // 释放任务锁 if($this->key) { $this->releaseLocked($this->key); } } } /** * @throws \Exception */ private function getTaskId(){ $taskId = $this->argument('id'); if(!$taskId){ throw new \Exception('获取任务ID失败', 9901); } return $taskId; } /** * @throws \Exception */ private function getTask(){ $this->taskId = $this->getTaskId(); $task = $this->taskModel->get($this->taskId); if(!$task){ throw new \Exception('任务不存在', 9902); } if($task['status'] === 2){ $this->log("任务[$this->taskId]已完成,放弃执行。"); return false; } if($task['status'] === 1 && time()-$task['starttime']<86400*2){ info("任务[$this->taskId]正在执行,且未超过2小时,放弃执行。"); return false; } return $task; } private function updateSubTaskTotal(){ $this->taskModel->modify($this->taskId, ['total_num'=>count($this->subTasks)]); } private function incrProcess($result){ if($result) { $this->taskModel->incrSuccess($this->taskId); } else { $this->taskModel->incrFail($this->taskId); } } protected function getConf($name){ $conf = json_decode($this->task['parameters'],true); if($conf && isset($conf[$name])){ return $conf[$name]; } else { return null; } } }
5、剩下的就是开发具体的任务了,下面是一个发放向用户优惠券示例:
<?php <?php namespace App\Console\Commands; use App\Http\Service\OrderService; use App\Libs\MicroService\CouponMicroService; class SendActiveUserCoupon extends BaseCommand { protected $signature = 'SendActiveUserCoupon {id}'; protected $description = '向活跃用户发放优惠券'; /** * 获取待处理对象 * @return array - 处理对象集合 */ protected function getSubTasks(){ $result = (new OrderService())->searchUser($this->task['shopid'], $this->getConf('conditions')); return array_column($result, 'cid'); } /** * 执行单个子任务 * @param $item - 单个处理对象 * @return bool - 处理结果 * @throws */ protected function do($item){ $couponMicroService = new CouponMicroService(); $couponMicroService->entity_create($this->getConf('provide_id'), $item); return true; } }
发表评论
-
ElementUI上传组件在Lumen环境下跨域问题的解决
2020-08-25 11:10 909在后台的路由中间件中要增加跨域设置: namespace ... -
lumen集成结巴分词
2020-03-31 16:46 304常规的方法是通过compoer集成 composer re ... -
Lumen/laravel动态分库的实现
2019-07-04 10:39 793lumen默认支持多数据源,但如果系统存在多个结构相同的数据库 ... -
lumen操作mongodb
2019-01-30 14:40 848前提:使用Eloquent访问mongodb class ... -
lumen5.5使用rabbitmq
2019-01-21 10:53 1097在composer.json中的require中增加以下语句 ... -
lumen使用mongodb
2019-01-09 16:16 17171. 安装mongodb扩展 执行sudo pecl in ... -
在lumen中开发和执行artisan命令行任务
2018-12-22 17:28 1912lumen是laravel的简化版,其中artisan部分删除 ... -
在lumen安装阿里云短信服务SDK
2018-12-12 12:15 8431、下载SDK:https://help.aliyun.com ... -
lumen日志权限冲突问题
2016-11-02 11:44 1762运行lumen项目一般使用nginx作为webserver,因 ... -
lumen中使用调度任务
2016-04-22 12:21 3013需要在crontab中增加一行 * * * * * php ... -
在lumen中使用smtp方式发送txt/plain邮件
2016-04-22 11:46 12721、安装邮件组件 修改composer.json,在re ... -
lumen下操作excel
2016-04-22 11:34 20181、安装excel组件 修改composer.json, ... -
lumen中使用redis队列
2016-04-22 11:18 18491、采用redis作为队列驱动 修改.env文件 QU ... -
lumen中安装及使用redis作为cache
2016-04-22 10:54 21161、安装redis模块 在compose.json的re ...
相关推荐
在 Laravel 框架中,分布式并发锁是一个关键的概念,特别是在高并发的场景下,它能够确保多个请求在处理同一资源时不会发生冲突。这里,我们深入探讨 Laravel 的并发锁机制,以及如何在 PHP 开发中利用这个特性。 ...
要在Laravel项目中使用ClickHouse,首先需要安装一个适配器包,例如`yandex/laravel-clickhouse`。这个包允许Laravel通过Eloquent ORM与ClickHouse进行交互,提供了一套与原生MySQL相似的API,方便开发者操作数据库...
当一个任务正在处理时,其他相同任务会被放入队列,等待前一个任务完成。这样避免了多个任务同时操作相同数据。 总结来说,Laravel的锁定机制涵盖了多种层面,包括Redis、数据库事务、乐观锁、互斥锁和信号量,以及...
总的来说,`laravel-queue-pool`是Laravel开发中的一个重要工具,尤其对于处理大量并发任务的应用来说,它能够提升效率,保证服务的稳定性和可靠性。通过合理配置和使用,开发者可以优化队列处理流程,提高整个系统...
**正文** 在IT行业中,Laravel是一个非常流行的PHP框架,以其优雅的设计和强大的功能深受开发者喜爱。...对于处理大量并发请求、执行耗时操作或者需要解耦组件的项目来说,`laravel-amqp` 是一个不可或缺的工具。
Laravel-resque是Laravel社区为了更好地利用Resque(一个由GitHub上的Chris Jones开发的Redis驱动的任务队列系统)而创建的一个连接器。这个项目的主要目标是为Laravel提供对Resque的无缝集成,让开发者可以利用...
在“laravel-reactphp-master”这个项目中,很可能是实现了一个Laravel应用程序,同时集成了ReactPHP作为其后端服务器。这种架构可以充分利用ReactPHP的异步I/O特性,同时保留Laravel的强大功能,如路由、ORM和...
使用`Redis::lock()`方法,可以创建一个可设置超时时间的锁,当一个任务完成或者异常发生时,必须记得释放锁,以避免死锁问题。 3. **Mutex和Semaphore锁**:Laravel还提供了基于文件系统的Mutex和Semaphore锁,...
描述中提到的“支持存储队列的Laravel Azure包”暗示我们正在处理一个扩展包,这个包可能是为了帮助开发者更好地在Azure上利用存储队列服务。在Laravel中,队列可以用来处理耗时的任务,如发送电子邮件或执行后台...
而`laravel-hprose`是针对Laravel或Lumen框架的一个扩展,旨在引入HPRose服务的支持,以便实现高性能、轻量级的RPC(远程过程调用)通信。在这个项目中,我们将深入探讨Laravel、Lumen、HPRose以及它们如何协同工作...
在Laravel框架中,`Laravel Worker`是一个关键组件,主要负责处理后台的任务,比如队列任务和周期性作业。这个组件使得开发者可以将耗时的操作(如发送邮件、处理大量数据或执行复杂的计算)从主线程中分离出来,...
总的来说,"laravel-counter"项目是Laravel生态系统的一个实践案例,它结合了Laravel的多种特性,提供了一个方便的计数解决方案。通过学习这个项目,开发者不仅可以了解Laravel的基础,还能掌握构建高效、可扩展的...
`laravel-swoole`是一个专为Laravel设计的插件,它利用了Swoole扩展的强大功能,将PHP应用程序转换为高性能的HTTP服务器。 首先,了解Swoole是什么至关重要。Swoole是一个开源的、高性能的异步并行的PHP扩展,它...
6. **接收短信**:如果laravel-sms支持接收短信,开发者需要设置一个路由来处理短信的回复,通常涉及到短信验证码的验证或事件监听。 7. **测试**:为了确保短信服务的正常工作,开发者需要编写单元测试和功能测试...
这个项目是为 Laravel 框架提供 ZeroMQ(ZMQ)支持的一个驱动程序,使得 Laravel 应用能够利用 ZMQ 的强大功能进行消息传递和事件处理。 首先,我们需要了解 ZeroMQ(ZMQ)。ZeroMQ 是一个高性能、轻量级的消息队列...
"Laravel Datastore"在此上下文中可能指的是Laravel与Google Cloud Datastore的集成,这是一个NoSQL数据库服务,用于存储非结构化数据。在这个主题中,我们将深入探讨Laravel框架与Datastore的结合使用,以及相关的...
在 Laravel 中创建一个队列任务,你需要创建一个新的 `App\Jobs` 类,例如 `SendWelcomeEmail`,并实现 `handle` 方法来定义任务的具体操作: ```php namespace App\Jobs; use Illuminate\Bus\Queueable; use ...
【Laravel开发-laravel-crawler】是一个基于Laravel框架构建的分布式Web爬虫项目,它利用了Laravel的队列功能来实现高效的数据抓取。在这个项目中,开发者可以利用Laravel的强大特性和灵活的架构设计,构建出可扩展...