`

Celery手动配置路由

阅读更多
 
1、需求

我们有多种不同的任务,这些任务优先级不同,比如我们有视频上传和压缩任务等,照片压缩上传等任务还有其他不重要的任务。这些任务耗时不同需要使用不同的worker去处理。只是用celery默认的队列就不能满足我们的需求了。
这就需要我们将不同的task路由到不同队列,让不同的worker处理不同种类的task

2、创建队列和交换机

关于交换机和队列可以先看看http://rabbitmq.mr-ping.com/

default_exchange = Exchange('dedfault', type='direct')

# 定义一个媒体交换机,类型是直连交换机
media_exchange = Exchange('media', type='direct')

# 创建三个队列,一个是默认队列,一个是video、一个image
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
# 定义默认队列和默认的交换机routing_key
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
3、创建task
from celery import Celery
import time

app = Celery()
app.config_from_object('celeryconfig')

# 视频压缩
@app.task
def video_compress(video_name):
    time.sleep(10)
    print 'Compressing the:', video_name
    return 'success'

@app.task
def video_upload(video_name):
    time.sleep(5)
    print u'正在上传视频'
    return 'success'

# 压缩照片
@app.task
def image_compress(image_name):
    time.sleep(10)
    print 'Compressing the:', image_name
    return 'success'

# 其他任务
@app.task
def other(str):
    time.sleep(10)
    print 'Do other things'
    return 'success'

我们已经定义了三个队列,现在我们想将操作视频和操作照片的task分别路由到特定的队列。

4、指定路由
CELERY_ROUTES = ({'tasks.image_compress': {
                        'queue': 'images',
                        'routing_key': 'media.image'
                 }},{'tasks.video_upload': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }},{'tasks.video_compress': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }}, )

通过CELERY_ROUTES来为每一个task指定队列,如果有任务到达时,通过任务的名字来让指定的worker来处理。

5、task注册

关于任务的名字可以看看这篇文档http://docs.jinkan.org/docs/celery/userguide/tasks.html
celery可以自动生成名字,如果任务没有注册,就会出错。搜索后发现有人使用下面方法解决。

CELERY_IMPORTS = ("tasks",)

tasks是我保存任务的模块名,这样在创建worker时就可以将任务注册到worker,如下面这样:


 


图片中的[tasks]下面的几个任务就是我tasks文件中的任务。

6、完整代码

tasks.py

from celery import Celery
import time


app = Celery()
app.config_from_object('celeryconfig')

# 视频压缩
@app.task
def video_compress(video_name):
    time.sleep(10)
    print 'Compressing the:', video_name
    return 'success'

@app.task
def video_upload(video_name):
    time.sleep(5)
    print u'正在上传视频'
    return 'success'

# 压缩照片
@app.task
def image_compress(image_name):
    time.sleep(10)
    print 'Compressing the:', image_name
    return 'success'

# 其他任务
@app.task
def other(str):
    time.sleep(10)
    print 'Do other things'
    return 'success'

celeryconfig.py

from kombu import Exchange, Queue
from routers import MyRouter

# 配置市区
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_BROKER = 'amqp://localhost'
# 定义一个默认交换机
default_exchange = Exchange('dedfault', type='direct')

# 定义一个媒体交换机
media_exchange = Exchange('media', type='direct')

# 创建三个队列,一个是默认队列,一个是video、一个image
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
#
CELERY_ROUTES = ({'tasks.image_compress': {
                        'queue': 'images',
                        'routing_key': 'media.image'
                 }},{'tasks.video_upload': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }},{'tasks.video_compress': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }}, )

# 在出现worker接受到的message出现没有注册的错误时,使用下面一句能解决
CELERY_IMPORTS = ("tasks",)

注意在启动worker的时候需要制定队列,需要在保存任务的目录中打开终端启动worker,这个问题我还没有搞明白!

# 启动worker1
celery worker -Q default --loglevel=info
celery worker -Q videos --loglevel=info

启动处理图片的worker

celery worker -Q images --loglevel=info

这样我们就可以把不同类的任务路由到不同的worker上处理了。

遇到的坑

在你修改了配置文件需要重启worker时,记得把python shell也关掉重启。
如果调用任务但是worker出现这样的错误

unregistered task of type

就说明你的任务没有注册,需要加上我上面提到的CELERY_IMPORTS = ("tasks",)到配置文件中。
建议将还是自己多动手调试,把芹菜的官方文档看看,里面的东西会让我们看了豁然开朗。


作者:嘿嘿_小余同学
链接:http://www.jianshu.com/p/11b420aea529
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
分享到:
评论

相关推荐

    celery的基础配置以及启动

    Celery 基础配置和启动 Celery 是一个基于分布式消息队列的异步任务队列,能够帮助我们实现任务的异步执行,降低系统的耦合度和提高系统的可扩展性。Celery 的基础配置和启动是使用 Celery 的第一步。 首先,我们...

    celery手动安装依赖包

    4. **PyYAML**(可选):如果你的配置文件是YAML格式,那么Celery可能需要这个库来解析配置。 5. **Click**(可选):Celery命令行工具依赖Click库,用于生成命令行界面。 6. **six**:这个库提供了对Python 2和...

    Django+celery+rabbitmq配置文档

    ### Django+Celery+RabbitMQ配置文档 #### 一、环境搭建与配置 ##### 1.1 系统环境 - **操作系统**: Ubuntu 14.04 - **开发工具**: PyCharm 5.0 - **虚拟环境**: 使用虚拟环境隔离项目依赖,避免环境冲突。 #####...

    celery配置文档,主要用于配置celery

    celery配置文档,主要用于配置celery

    详解django+django-celery+celery的整合实战

    1. **创建Celery实例**:初始化Celery实例,配置必要的参数,如Broker地址、Backend地址等。 2. **启动Celery Worker**:运行Celery Worker进程,使其能够监听任务队列并执行任务。 3. **应用程序调用异步任务**:在...

    Celery简介

    2. **任务路由**:Celery根据配置的路由策略将任务发送到相应的队列。 3. **任务消费**:工作者从队列中取出任务,执行任务代码。 4. **结果存储**:如果使用结果后端,工作者会将任务结果存储在那里。 5. **结果...

    celery的基本使用

    **Celery 基础与配置详解** Celery 是一个分布式任务队列,它专注于实时操作,但也支持调度。在Python开发中,Celery常用于处理异步任务,提高应用程序的响应速度和效率。本文将深入探讨Celery的基本使用方法及其...

    Celery Document V4.0

    总的来说,Celery文档V4.0为读者提供了全面的分布式任务处理解决方案的介绍,以及如何安装、配置和使用Celery框架的详细指南。通过上述内容的总结,读者可以获得Celery框架的核心知识,并根据自己的需求进行实践应用...

    celery-v4.3.0中文.pdf

    文档中提供了对Celery的基本介绍、安装步骤、配置指导和使用教程,是学习如何使用Celery来创建和维护分布式系统的宝贵资源。文档中还提到了项目贡献者和开发社区,鼓励读者通过邮件列表或IRC频道加入社区进行交流。 ...

    celery+ rabbitMq + python window是下配置

    标题 "celery+ rabbitMq + python windows环境配置" 涉及到的是在Windows操作系统上集成使用Celery和RabbitMQ的流程。Celery是一个分布式任务队列,主要用于处理异步任务,而RabbitMQ则是一个开源的消息代理,常被...

    celery——cmd命令.zip

    这个压缩包文件"celery——cmd命令.zip"显然包含了在Windows环境下使用Django框架和Celery进行分布式任务处理的相关配置和启动脚本。我们将深入探讨如何在IIS(Internet Information Services)服务器上设置和运行...

    Celery课件笔记

    2. **任务路由**: 根据配置的路由规则,任务被分发到对应的队列。 3. **任务接收**: 工作者监听队列,一旦有新任务到达,就会从队列中取出并开始执行。 4. **任务执行**: 工作者执行任务,这个过程是异步的,不会...

    celery异步任务构建

    在提供的压缩包文件"Celery框架构建一部任务服务程序"中,可能包含了完整的示例代码、配置文件以及运行说明,你可以参考这些内容进一步了解和实践Celery的使用。通过深入学习和实践,你将能够熟练掌握利用Celery构建...

    django celery celery beat项目

    **标题解析:** "django celery celery beat项目" 这个标题揭示了我们正在讨论一个基于Django框架构建的项目,该项目集成了Celery和Celery Beat。Celery是一个分布式任务队列,它允许我们将异步任务分解为可并行执行...

    celery异步代码实测

    **Celery 异步代码实测** 在现代的 Web 应用开发中,为了提高系统的响应速度和用户体验,异步处理已经成为不可或缺的一部分。...记得在生产环境中,对 Celery 的配置和监控也需要相应的重视,以确保系统的稳定运行。

    celery定时任务使用总结

    # Celery 配置 CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # Redis 连接 URL CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 结果存储的 Redis 连接 URL CELERY_ACCEPT_CONTENT = ['json'] CELERY_...

    celery-template:简单的celery模板来演示链接和任务路由

    该模板主要关注符合任务路由以及基本任务链,其中任务可以依赖于其他任务并且需要按顺序执行。 组织 calc # Application root ├── __init__.py # Celery app instance is imported here ├── celery.py # ...

    django_celery_demo

    它可能包含了一个完整的示例,演示了如何配置 Django 与 Celery 的集成,以及如何定义、调度和执行 Celery 任务。 在 Django 中使用 Celery,首先需要安装 Celery 和相关的消息中间件(如 RabbitMQ 或 Redis)。...

    django-celery

    2. **配置**:在Django项目的settings.py中配置Celery,包括设置消息中间件、结果后端、worker配置等。 3. **任务注册**:在Django应用中定义任务函数,并使用`@app.task`装饰器将其注册到Celery应用中。 4. **...

Global site tag (gtag.js) - Google Analytics