1、需求
我们有多种不同的任务,这些任务优先级不同,比如我们有视频上传和压缩任务等,照片压缩上传等任务还有其他不重要的任务。这些任务耗时不同需要使用不同的worker
去处理。只是用celery
默认的队列就不能满足我们的需求了。
这就需要我们将不同的task
路由到不同队列,让不同的worker
处理不同种类的task
。
2、创建队列和交换机
关于交换机和队列可以先看看http://rabbitmq.mr-ping.com/
default_exchange = Exchange('dedfault', type='direct')
# 定义一个媒体交换机,类型是直连交换机
media_exchange = Exchange('media', type='direct')
# 创建三个队列,一个是默认队列,一个是video、一个image
CELERY_QUEUES = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image')
)
# 定义默认队列和默认的交换机routing_key
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
3、创建task
from celery import Celery
import time
app = Celery()
app.config_from_object('celeryconfig')
# 视频压缩
@app.task
def video_compress(video_name):
time.sleep(10)
print 'Compressing the:', video_name
return 'success'
@app.task
def video_upload(video_name):
time.sleep(5)
print u'正在上传视频'
return 'success'
# 压缩照片
@app.task
def image_compress(image_name):
time.sleep(10)
print 'Compressing the:', image_name
return 'success'
# 其他任务
@app.task
def other(str):
time.sleep(10)
print 'Do other things'
return 'success'
我们已经定义了三个队列,现在我们想将操作视频和操作照片的task
分别路由到特定的队列。
4、指定路由
CELERY_ROUTES = ({'tasks.image_compress': {
'queue': 'images',
'routing_key': 'media.image'
}},{'tasks.video_upload': {
'queue': 'videos',
'routing_key': 'media.video'
}},{'tasks.video_compress': {
'queue': 'videos',
'routing_key': 'media.video'
}}, )
通过CELERY_ROUTES
来为每一个task
指定队列,如果有任务到达时,通过任务的名字来让指定的worker
来处理。
5、task
注册
关于任务的名字可以看看这篇文档http://docs.jinkan.org/docs/celery/userguide/tasks.html
celery可以自动生成名字,如果任务没有注册,就会出错。搜索后发现有人使用下面方法解决。
CELERY_IMPORTS = ("tasks",)
tasks是我保存任务的模块名,这样在创建worker
时就可以将任务注册到worker,如下面这样:
图片中的[tasks]下面的几个任务就是我tasks文件中的任务。
6、完整代码
tasks.py
from celery import Celery
import time
app = Celery()
app.config_from_object('celeryconfig')
# 视频压缩
@app.task
def video_compress(video_name):
time.sleep(10)
print 'Compressing the:', video_name
return 'success'
@app.task
def video_upload(video_name):
time.sleep(5)
print u'正在上传视频'
return 'success'
# 压缩照片
@app.task
def image_compress(image_name):
time.sleep(10)
print 'Compressing the:', image_name
return 'success'
# 其他任务
@app.task
def other(str):
time.sleep(10)
print 'Do other things'
return 'success'
celeryconfig.py
from kombu import Exchange, Queue
from routers import MyRouter
# 配置市区
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_BROKER = 'amqp://localhost'
# 定义一个默认交换机
default_exchange = Exchange('dedfault', type='direct')
# 定义一个媒体交换机
media_exchange = Exchange('media', type='direct')
# 创建三个队列,一个是默认队列,一个是video、一个image
CELERY_QUEUES = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image')
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
#
CELERY_ROUTES = ({'tasks.image_compress': {
'queue': 'images',
'routing_key': 'media.image'
}},{'tasks.video_upload': {
'queue': 'videos',
'routing_key': 'media.video'
}},{'tasks.video_compress': {
'queue': 'videos',
'routing_key': 'media.video'
}}, )
# 在出现worker接受到的message出现没有注册的错误时,使用下面一句能解决
CELERY_IMPORTS = ("tasks",)
注意在启动worker
的时候需要制定队列,需要在保存任务的目录中打开终端启动worker,这个问题我还没有搞明白!
# 启动worker1
celery worker -Q default --loglevel=info
celery worker -Q videos --loglevel=info
启动处理图片的worker
celery worker -Q images --loglevel=info
这样我们就可以把不同类的任务路由到不同的worker上处理了。
遇到的坑
在你修改了配置文件需要重启worker时,记得把python shell也关掉重启。
如果调用任务但是worker出现这样的错误
unregistered task of type
就说明你的任务没有注册,需要加上我上面提到的CELERY_IMPORTS = ("tasks",)
到配置文件中。
建议将还是自己多动手调试,把芹菜的官方文档看看,里面的东西会让我们看了豁然开朗。
作者:嘿嘿_小余同学 链接:http://www.jianshu.com/p/11b420aea529 來源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
相关推荐
Celery 基础配置和启动 Celery 是一个基于分布式消息队列的异步任务队列,能够帮助我们实现任务的异步执行,降低系统的耦合度和提高系统的可扩展性。Celery 的基础配置和启动是使用 Celery 的第一步。 首先,我们...
4. **PyYAML**(可选):如果你的配置文件是YAML格式,那么Celery可能需要这个库来解析配置。 5. **Click**(可选):Celery命令行工具依赖Click库,用于生成命令行界面。 6. **six**:这个库提供了对Python 2和...
### Django+Celery+RabbitMQ配置文档 #### 一、环境搭建与配置 ##### 1.1 系统环境 - **操作系统**: Ubuntu 14.04 - **开发工具**: PyCharm 5.0 - **虚拟环境**: 使用虚拟环境隔离项目依赖,避免环境冲突。 #####...
celery配置文档,主要用于配置celery
1. **创建Celery实例**:初始化Celery实例,配置必要的参数,如Broker地址、Backend地址等。 2. **启动Celery Worker**:运行Celery Worker进程,使其能够监听任务队列并执行任务。 3. **应用程序调用异步任务**:在...
2. **任务路由**:Celery根据配置的路由策略将任务发送到相应的队列。 3. **任务消费**:工作者从队列中取出任务,执行任务代码。 4. **结果存储**:如果使用结果后端,工作者会将任务结果存储在那里。 5. **结果...
**Celery 基础与配置详解** Celery 是一个分布式任务队列,它专注于实时操作,但也支持调度。在Python开发中,Celery常用于处理异步任务,提高应用程序的响应速度和效率。本文将深入探讨Celery的基本使用方法及其...
# Celery 配置 CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Redis 连接 URL CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 结果存储的 Redis 连接 URL CELERY_ACCEPT_CONTENT = ['json'] CELERY_...
总的来说,Celery文档V4.0为读者提供了全面的分布式任务处理解决方案的介绍,以及如何安装、配置和使用Celery框架的详细指南。通过上述内容的总结,读者可以获得Celery框架的核心知识,并根据自己的需求进行实践应用...
文档中提供了对Celery的基本介绍、安装步骤、配置指导和使用教程,是学习如何使用Celery来创建和维护分布式系统的宝贵资源。文档中还提到了项目贡献者和开发社区,鼓励读者通过邮件列表或IRC频道加入社区进行交流。 ...
标题 "celery+ rabbitMq + python windows环境配置" 涉及到的是在Windows操作系统上集成使用Celery和RabbitMQ的流程。Celery是一个分布式任务队列,主要用于处理异步任务,而RabbitMQ则是一个开源的消息代理,常被...
这个压缩包文件"celery——cmd命令.zip"显然包含了在Windows环境下使用Django框架和Celery进行分布式任务处理的相关配置和启动脚本。我们将深入探讨如何在IIS(Internet Information Services)服务器上设置和运行...
2. **任务路由**: 根据配置的路由规则,任务被分发到对应的队列。 3. **任务接收**: 工作者监听队列,一旦有新任务到达,就会从队列中取出并开始执行。 4. **任务执行**: 工作者执行任务,这个过程是异步的,不会...
在提供的压缩包文件"Celery框架构建一部任务服务程序"中,可能包含了完整的示例代码、配置文件以及运行说明,你可以参考这些内容进一步了解和实践Celery的使用。通过深入学习和实践,你将能够熟练掌握利用Celery构建...
**标题解析:** "django celery celery beat项目" 这个标题揭示了我们正在讨论一个基于Django框架构建的项目,该项目集成了Celery和Celery Beat。Celery是一个分布式任务队列,它允许我们将异步任务分解为可并行执行...
**Celery 异步代码实测** 在现代的 Web 应用开发中,为了提高系统的响应速度和用户体验,异步处理已经成为不可或缺的一部分。...记得在生产环境中,对 Celery 的配置和监控也需要相应的重视,以确保系统的稳定运行。
该模板主要关注符合任务路由以及基本任务链,其中任务可以依赖于其他任务并且需要按顺序执行。 组织 calc # Application root ├── __init__.py # Celery app instance is imported here ├── celery.py # ...
2. **配置Celery**:在项目中设置Celery,包括选择消息中间件,定义任务,设置定时任务。 3. **启动Celery worker**:使用`celery -A your_app worker --loglevel=info`命令启动Celery工作进程。 4. **启动Flower**...
它可能包含了一个完整的示例,演示了如何配置 Django 与 Celery 的集成,以及如何定义、调度和执行 Celery 任务。 在 Django 中使用 Celery,首先需要安装 Celery 和相关的消息中间件(如 RabbitMQ 或 Redis)。...
5. **配置管理**:Flower允许用户在界面上动态调整Celery的配置参数,无需重启服务,提高了运维效率。 6. **安全性**:Flower支持身份验证和授权,确保只有授权的用户可以访问和操作Celery集群。 安装和使用Flower...