`

异步任务神器 Celery

阅读更多

        

Celery

在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发一封激活邮件,而发邮件是个 IO 阻塞式任务,如果直接把它放到应用当中,就需要等邮件发出去之后才能进行下一步操作,此时用户只能等待再等待。更好的方式是在业务逻辑中触发一个发邮件的异步任务,而主程序可以继续往下运行。

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图:

Celery_frameworkCelery_framework

可以看到,Celery 主要包含以下几个模块:

  • 任务模块 Task

    包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列

  • 消息中间件 Broker

    Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

  • 任务执行单元 Worker

    Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它

  • 任务结果存储 Backend

    Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

异步任务

使用 Celery 实现异步任务主要包含三个步骤:

  1. 创建一个 Celery 实例
  2. 启动 Celery Worker
  3. 应用程序调用异步任务

快速入门

为了简单起见,对于 Broker 和 Backend,这里都使用 redis。在运行下面的例子之前,请确保 redis 已正确安装,并开启 redis 服务,当然,celery 也是要安装的。可以使用下面的命令来安装 celery 及相关依赖:

1
$ pip install 'celery[redis]'

创建 Celery 实例

将下面的代码保存为文件 tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# -*- coding: utf-8 -*-
 
import time
from celery import Celery
 
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'
 
app = Celery('my_task', broker=broker, backend=backend)
 
@app.task
def add(x, y):
time.sleep(5) # 模拟耗时操作
return x + y

上面的代码做了几件事:

  • 创建了一个 Celery 实例 app,名称为 my_task
  • 指定消息中间件用 redis,URL 为 redis://127.0.0.1:6379
  • 指定存储用 redis,URL 为 redis://127.0.0.1:6379/0
  • 创建了一个 Celery 任务 add,当函数被 @app.task 装饰后,就成为可被 Celery 调度的任务;

启动 Celery Worker

在当前目录,使用如下方式启动 Celery Worker:

1
$ celery worker -A tasks --loglevel=info

其中:

  • 参数 -A 指定了 Celery 实例的位置,本例是在 tasks.py 中,Celery 会自动在该文件中寻找 Celery 对象实例,当然,我们也可以自己指定,在本例,使用 -A tasks.app
  • 参数 --loglevel 指定了日志级别,默认为 warning,也可以使用 -l info 来表示;

在生产环境中,我们通常会使用 Supervisor 来控制 Celery Worker 进程。

启动成功后,控制台会显示如下输出:

celerycelery

调用任务

现在,我们可以在应用程序中使用 delay() 或 apply_async() 方法来调用任务。

在当前目录打开 Python 控制台,输入以下代码:

1
2
3
>>> from tasks import add
>>> add.delay(2, 8)
<AsyncResult: 2272ddce-8be5-493f-b5ff-35a0d9fe600f>

在上面,我们从 tasks.py 文件中导入了 add 任务对象,然后使用 delay() 方法将任务发送到消息中间件(Broker),Celery Worker 进程监控到该任务后,就会进行执行。我们将窗口切换到 Worker 的启动窗口,会看到多了两条日志:

1
2
[2016-12-10 12:00:50,376: INFO/MainProcess] Received task: tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f]
[2016-12-10 12:00:55,385: INFO/PoolWorker-4] Task tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f] succeeded in 5.00642602402s: 10

这说明任务已经被调度并执行成功。

另外,我们如果想获取执行后的结果,可以这样做:

1
2
3
4
5
6
7
8
9
>>> result = add.delay(2, 6)
>>> result.ready() # 使用 ready() 判断任务是否执行完毕
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get() # 使用 get() 获取任务结果
8

在上面,我们是在 Python 的环境中调用任务。事实上,我们通常在应用程序中调用任务。比如,将下面的代码保存为 client.py:

1
2
3
4
5
6
7
8
# -*- coding: utf-8 -*-
 
from tasks import add
 
# 异步任务
add.delay(2, 8)
 
print 'hello world'

运行命令 $ python client.py,可以看到,虽然任务函数 add 需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行 print 语句,打印出结果。

使用配置

在上面的例子中,我们直接把 Broker 和 Backend 的配置写在了程序当中,更好的做法是将配置项统一写入到一个配置文件中,通常我们将该文件命名为 celeryconfig.py。Celery 的配置比较多,可以在官方文档查询每个配置项的含义。

下面,我们再看一个例子。项目结构如下:

1
2
3
4
5
6
7
celery_demo # 项目根目录
├── celery_app # 存放 celery 相关文件
│   ├── __init__.py
│   ├── celeryconfig.py # 配置文件
│   ├── task1.py # 任务文件 1
│   └── task2.py # 任务文件 2
└── client.py # 应用程序

__init__.py 代码如下:

1
2
3
4
5
6
# -*- coding: utf-8 -*-
 
from celery import Celery
 
app = Celery('demo') # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块

celeryconfig.py 代码如下:

1
2
3
4
5
6
7
8
9
10
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend
 
CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,默认是 UTC
# CELERY_TIMEZONE='UTC'
 
CELERY_IMPORTS = ( # 指定导入的任务模块
'celery_app.task1',
'celery_app.task2'
)

task1.py 代码如下:

1
2
3
4
5
6
7
import time
from celery_app import app
 
@app.task
def add(x, y):
time.sleep(2)
return x + y

task2.py 代码如下:

1
2
3
4
5
6
7
import time
from celery_app import app
 
@app.task
def multiply(x, y):
time.sleep(2)
return x * y

client.py 代码如下:

1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
 
from celery_app import task1
from celery_app import task2
 
task1.add.apply_async(args=[2, 8]) # 也可用 task1.add.delay(2, 8)
task2.multiply.apply_async(args=[3, 7]) # 也可用 task2.multiply.delay(3, 7)
 
print 'hello world'

现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:

1
celery_demo $ celery -A celery_app worker --loglevel=info

接着,运行 $ python client.py,它会发送两个异步任务到 Broker,在 Worker 的窗口我们可以看到如下输出:

1
2
3
4
[2016-12-10 13:51:58,939: INFO/MainProcess] Received task: celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa]
[2016-12-10 13:51:58,941: INFO/MainProcess] Received task: celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a]
[2016-12-10 13:52:00,948: INFO/PoolWorker-3] Task celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa] succeeded in 2.00600231002s: 10
[2016-12-10 13:52:00,949: INFO/PoolWorker-4] Task celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a] succeeded in 2.00601326401s: 21

delay 和 apply_async

在前面的例子中,我们使用 delay() 或 apply_async() 方法来调用任务。事实上,delay 方法封装了 apply_async,如下:

1
2
3
def delay(self, *partial_args, **partial_kwargs):
"""Shortcut to :meth:`apply_async` using star arguments."""
return self.apply_async(partial_args, partial_kwargs)

也就是说,delay 是使用 apply_async 的快捷方式。apply_async 支持更多的参数,它的一般形式如下:

1
apply_async(args=(), kwargs={}, route_name=None, **options)

apply_async 常用的参数如下:

  • countdown:指定多少秒后执行任务
1
task1.apply_async(args=(2, 3), countdown=5) # 5 秒后执行任务
  • eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime
1
2
3
4
from datetime import datetime, timedelta
 
# 当前 UTC 时间再加 10 秒后执行任务
task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))
  • expires:任务过期时间,参数类型可以是 int,也可以是 datetime
1
task1.multiply.apply_async(args=[3, 7], expires=10) # 10 秒后过期

更多的参数列表可以在官方文档中查看。

定时任务

Celery 除了可以执行异步任务,也支持执行周期性任务(Periodic Tasks),或者说定时任务。Celery Beat 进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。

让我们看看例子,项目结构如下:

1
2
3
4
5
6
celery_demo # 项目根目录
├── celery_app # 存放 celery 相关文件
   ├── __init__.py
   ├── celeryconfig.py # 配置文件
   ├── task1.py # 任务文件
   └── task2.py # 任务文件

__init__.py 代码如下:

1
2
3
4
5
6
# -*- coding: utf-8 -*-
 
from celery import Celery
 
app = Celery('demo')
app.config_from_object('celery_app.celeryconfig')

celeryconfig.py 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# -*- coding: utf-8 -*-
 
from datetime import timedelta
from celery.schedules import crontab
 
# Broker and Backend
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
 
# Timezone
CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'
 
# import
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
 
# schedules
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'celery_app.task1.add',
'schedule': timedelta(seconds=30), # 每 30 秒执行一次
'args': (5, 8) # 任务函数参数
},
'multiply-at-some-time': {
'task': 'celery_app.task2.multiply',
'schedule': crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次
'args': (3, 7) # 任务函数参数
}
}

task1.py 代码如下:

1
2
3
4
5
6
7
import time
from celery_app import app
 
@app.task
def add(x, y):
time.sleep(2)
return x + y

task2.py 代码如下:

1
2
3
4
5
6
7
import time
from celery_app import app
 
@app.task
def multiply(x, y):
time.sleep(2)
return x * y

现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:

1
celery_demo $ celery -A celery_app worker --loglevel=info

接着,启动 Celery Beat 进程,定时将任务发送到 Broker,在项目根目录下执行下面命令:

1
2
3
4
5
6
7
8
9
10
11
celery_demo $ celery beat -A celery_app
celery beat v4.0.1 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2016-12-11 09:48:16
Configuration ->
. broker -> redis://127.0.0.1:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)

之后,在 Worker 窗口我们可以看到,任务 task1 每 30 秒执行一次,而 task2 每天早上 9 点 50 分执行一次。

在上面,我们用两个命令启动了 Worker 进程和 Beat 进程,我们也可以将它们放在一个命令中:

1
$ celery -B -A celery_app worker --loglevel=info

Celery 周期性任务也有多个配置项,可参考官方文档

参考资料

 

分享到:
评论

相关推荐

    Python并行分布式框架Celery详解

    Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。 在 Python 中定义 Celery 的...

    一个基于Qt Creator(qt,C++)实现中国象棋人机对战

    qt 一个基于Qt Creator(qt,C++)实现中国象棋人机对战.

    热带雨林自驾游自然奇观探索.doc

    热带雨林自驾游自然奇观探索

    冰川湖自驾游冰雪交融景象.doc

    冰川湖自驾游冰雪交融景象

    C51 单片机数码管使用 Keil项目C语言源码

    C51 单片机数码管使用 Keil项目C语言源码

    基于智能算法的无人机路径规划研究 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    前端分析-2023071100789s12

    前端分析-2023071100789s12

    Delphi 12.3控件之Laz-制作了一些窗体和对话框样式.7z

    Laz_制作了一些窗体和对话框样式.7z

    ocaml-docs-4.05.0-6.el7.x64-86.rpm.tar.gz

    1、文件内容:ocaml-docs-4.05.0-6.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/ocaml-docs-4.05.0-6.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊

    学习笔记-沁恒第六讲-米醋

    学习笔记-沁恒第六讲-米醋

    工业机器人技术讲解【36页】.pptx

    工业机器人技术讲解【36页】

    基于CentOS 7和Docker环境下安装和配置Elasticsearch数据库

    内容概要:本文档详细介绍了在 CentOS 7 上利用 Docker 容器化环境来部署和配置 Elasticsearch 数据库的过程。首先概述了 Elasticsearch 的特点及其主要应用场景如全文检索、日志和数据分析等,并强调了其分布式架构带来的高性能与可扩展性。之后针对具体的安装流程进行了讲解,涉及创建所需的工作目录,准备docker-compose.yml文件以及通过docker-compose工具自动化完成镜像下载和服务启动的一系列命令;同时对可能出现的问题提供了应对策略并附带解决了分词功能出现的问题。 适合人群:从事IT运维工作的技术人员或对NoSQL数据库感兴趣的开发者。 使用场景及目标:该教程旨在帮助读者掌握如何在一个Linux系统中使用现代化的应用交付方式搭建企业级搜索引擎解决方案,特别适用于希望深入了解Elastic Stack生态体系的个人研究与团队项目实践中。 阅读建议:建议按照文中给出的具体步骤进行实验验证,尤其是要注意调整相关参数配置适配自身环境。对于初次接触此话题的朋友来说,应该提前熟悉一下Linux操作系统的基础命令行知识和Docker的相关基础知识

    基于CNN和FNN的进化神经元模型的快速响应尖峰神经网络 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    网络小说的类型创新、情节设计与角色塑造.doc

    网络小说的类型创新、情节设计与角色塑造

    毕业设计-基于springboot+vue开发的学生考勤管理系统【源码+sql+可运行】50311.zip

    毕业设计_基于springboot+vue开发的学生考勤管理系统【源码+sql+可运行】【50311】.zip 全部代码均可运行,亲测可用,尽我所能,为你服务; 1.代码压缩包内容 代码:springboo后端代码+vue前端页面代码 脚本:数据库SQL脚本 效果图:运行结果请看资源详情效果图 2.环境准备: - JDK1.8+ - maven3.6+ - nodejs14+ - mysql5.6+ - redis 3.技术栈 - 后台:springboot+mybatisPlus+Shiro - 前台:vue+iview+Vuex+Axios - 开发工具: idea、navicate 4.功能列表 - 系统设置:用户管理、角色管理、资源管理、系统日志 - 业务管理:班级信息、学生信息、课程信息、考勤记录、假期信息、公告信息 3.运行步骤: 步骤一:修改数据库连接信息(ip、port修改) 步骤二:找到启动类xxxApplication启动 4.若不会,可私信博主!!!

    57页-智慧办公园区智能化设计方案.pdf

    在智慧城市建设的大潮中,智慧园区作为其中的璀璨明珠,正以其独特的魅力引领着产业园区的新一轮变革。想象一下,一个集绿色、高端、智能、创新于一体的未来园区,它不仅融合了科技研发、商业居住、办公文创等多种功能,更通过深度应用信息技术,实现了从传统到智慧的华丽转身。 智慧园区通过“四化”建设——即园区运营精细化、园区体验智能化、园区服务专业化和园区设施信息化,彻底颠覆了传统园区的管理模式。在这里,基础设施的数据收集与分析让管理变得更加主动和高效,从温湿度监控到烟雾报警,从消防水箱液位监测到消防栓防盗水装置,每一处细节都彰显着智能的力量。而远程抄表、空调和变配电的智能化管控,更是在节能降耗的同时,极大地提升了园区的运维效率。更令人兴奋的是,通过智慧监控、人流统计和自动访客系统等高科技手段,园区的安全防范能力得到了质的飞跃,让每一位入驻企业和个人都能享受到“拎包入住”般的便捷与安心。 更令人瞩目的是,智慧园区还构建了集信息服务、企业服务、物业服务于一体的综合服务体系。无论是通过园区门户进行信息查询、投诉反馈,还是享受便捷的电商服务、法律咨询和融资支持,亦或是利用云ERP和云OA系统提升企业的管理水平和运营效率,智慧园区都以其全面、专业、高效的服务,为企业的发展插上了腾飞的翅膀。而这一切的背后,是大数据、云计算、人工智能等前沿技术的深度融合与应用,它们如同智慧的大脑,让园区的管理和服务变得更加聪明、更加贴心。走进智慧园区,就像踏入了一个充满无限可能的未来世界,这里不仅有科技的魅力,更有生活的温度,让人不禁对未来充满了无限的憧憬与期待。

    一种欠定盲源分离方法及其在模态识别中的应用 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    Matlab实现基于BO贝叶斯优化Transformer结合GRU门控循环单元时间序列预测的详细项目实例(含完整的程序,GUI设计和代码详解)

    内容概要:本文介绍了使用 Matlab 实现基于 BO(贝叶斯优化)的 Transformer 结合 GRU 门控循环单元时间序列预测的具体项目案例。文章首先介绍了时间序列预测的重要性及其现有方法存在的限制,随后深入阐述了该项目的目标、挑战与特色。重点描述了项目中采用的技术手段——结合 Transformer 和 GRU 模型的优点,通过贝叶斯优化进行超参数调整。文中给出了模型的具体实现步骤、代码示例以及完整的项目流程。同时强调了数据预处理、特征提取、窗口化分割、超参数搜索等关键技术点,并讨论了系统的设计部署细节、可视化界面制作等内容。 适合人群:具有一定机器学习基础,尤其是熟悉时间序列预测与深度学习的科研工作者或从业者。 使用场景及目标:适用于金融、医疗、能源等多个行业的高精度时间序列预测。该模型可通过捕捉长时间跨度下的复杂模式,提供更为精准的趋势预判,辅助相关机构作出合理的前瞻规划。 其他说明:此项目还涵盖了从数据采集到模型发布的全流程讲解,以及GUI图形用户界面的设计实现,有助于用户友好性提升和技术应用落地。此外,文档包含了详尽的操作指南和丰富的附录资料,包括完整的程序清单、性能评价指标等,便于读者动手实践。

    漫画与青少年教育关系.doc

    漫画与青少年教育关系

    励志图书的成功案例分享、人生智慧提炼与自我提升策略.doc

    励志图书的成功案例分享、人生智慧提炼与自我提升策略

Global site tag (gtag.js) - Google Analytics