一、为什么要用celery
celery是一个简单、灵活、可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必须工具。他是一个专注于实时处理的任务队列,同时也支持任务调度。
celery是异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。celery用于生产系统每天处理数以百万计的任务。
【注:何为任务队列?任务队列是一种在线程或机器间分发任务的机制。消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。)】。
Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。
Celery 需要一个发送和接受消息的传输者。RabbitMQ 和 Redis 中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用 SQLite 进行本地开发。
Celery 可以单机运行,也可以在多台机器上运行,甚至可以跨越数据中心运行。
二、celery适用于那些场景
应用场景一:我们知道大型网站的性能非常重要,然而有时不得不做一些相当耗时的操作。 比如SNS网站的“新鲜事儿”系统,我发帖之后,会给所有关注我的人推送一条通知。乍一看没什么难的,发帖之后找出关注我的人, 然后生成相应的消息记录就行了。但问题是,100个人关注我,就要执行100条INSERT查询,更要命的是,Web服务器是同步的, 这100条查询执行完成之前,用户是看不到结果的。怎么办呢,这时就轮到消息队列上场了。发帖之后只需给队列发送一条消息, 告诉队列“我发帖子了”,然后把发帖的结果返回给用户。 这时另一个叫做worker的进程会取出这条消息并执行那100条INSERT查询。这样,推送通知的操作在后台异步执行, 用户就能立即看到发帖结果。更精彩的是,可以运行多个worker实现分布式,多繁重的任务都不在话下了。将Celery 与RabbitMQ 结合,将会产出很好的效果,可以实现类似新浪微博大数据量的消息推送。(这里就可以采用RabbitMQ消息队列系统负责存储消息;采用celery的worken进程,同时提供在webapp中创建任务的功能)。
应用场景二:很多做开发和运维的都会涉及一件事:crontab, 也就是在服务器上设定定时任务,按期执行一些任务.但是假如你有上千台的服务器, 你有上千种任务,那么对于这个定时任务的管理恐怕是一件很头疼的事情.哪怕你只是几十个任务分配的不同的机器上怎么样合理的管理和实现以下功能呢:①查看定时任务的执行情况.比如执行是否成功,当前状态,执行花费的时间;②一个友好的界面或者命令行下实现添加,删除任务;③怎么样简单实现不同的机器设定不同种任务,某些机器执行不同的队列;④假如你需要生成一个任务怎么样不阻塞剩下来的过程(异步了呗);⑤怎么样并发的执行任务。RabbitMQ,ZeroMQ这样的消息队列总是出现在我们视线中, 其实意义是很简单: 消息就是一个要传送的数据,celery是一个分布式的任务队列.这个”任务”其实就是一种消息, 任务被生成到队列中,被RabbitMQ等容器接收和存储,在适当的时候又被要执行的机器把这个消息取走。
以上是两种典型的应用场景。通过上面两种场景的分析,在大量异步任务处理和大量定时任务管理的情况下,我们可以优先考虑采用celery和rabbitMq解决这些问题。
三、celery特点
- 简单:Celery 易于使用和维护,并且它不需要配置文件
- 高可用性:倘若连接丢失或失败,进程和客户端会自动重试,并且通过主/主或主/从方式复制来提高可用性
- 快速:单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级
- 灵活:Celery 几乎所有部分都可以扩展或单独使用。可以自制连接池、序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、中间人传输或更多。
四、工作原理
它的基本工作就是管理分配任务到不同的服务器,并且取得结果。至于说服务器之间是如何进行通信的?这个Celery本身不能解决。所以,RabbitMQ作为一个消息队列管理工具被引入到和Celery集成,负责处理服务器之间的通信任务。和rabbitmq的关系只是在于,celery没有消息存储功能,他需要介质,比如rabbitmq、redis、mysql、mongodb 都是可以的。推荐使用rabbitmq,他的速度和可用性都很高。
五、celery安装配置
- 用pip安装:$ pip install -U Celery
- 用easy_install 安装:$ easy_install -U Celery
- 捆绑式安装--Celery 也定义了一组用于安装 Celery 和给定特性依赖的捆绑:$ pip install celery[librabbitmq] 或者 $ pip install celery[librabbitmq,redis,auth,msgpack]。
- 注意:有关celery的捆绑详解,请查看:http://docs.torriacg.org/docs/celery/getting-started/introduction.html 页面中捆绑。
六、应用
from celery import Celery app = Celery('tasks', broker='amqp://root:123456@*.*.*.*:5672/myhost') @app.task def add(x, y): return x + y #启动: celery -A tasks worker --loglevel=info from tasks import add add.delay(4, 4) #执行: python run.py
七、使用模块配置
BROKER_URL = 'amqp://' broker设置 CELERY_RESULT_BACKEND = 'amqp://' 存储任务结果 CELERY_TASK_RESULT_EXPIRES = 18000 celery任务结果有效期 CELERY_TASK_SERIALIZER = 'json' 任务序列化结构 CELERY_RESULT_SERIALIZER = 'json' 结果序列化结构 CELERY_ACCEPT_CONTENT=['json'] celery接收内容类型 CELERY_TIMEZONE = 'Asia/Shanghai' celery使用的时区 CELERY_ENABLE_UTC = True 启动时区设置 CELERYD_LOG_FILE="/var/log/celery/celery.log" celery日志存储位置 from kombu.common import Broadcast CELERY_QUEUES = (Broadcast('broadcast_logger'), ) 任务队列的类型 CELERY_ROUTES = { 任务队列 'log_analysis.run': {'queue': 'api.log'}, 'logrotate': {'queue': 'broadcast_logger'}, } CELERY_SEND_TASK_ERROR_EMAILS = True celery接收错误邮件 ADMINS = ( ("*****", "*****@***.com"), celery接收错误邮件地址 ) SERVER_EMAIL = ****@***.com 从哪里发送的错误地址 EMAIL_HOST = "*.*.*.*" EMAIL_PORT = 25 EMAIL_HOST_USER = SERVER_EMAIL CELERYBEAT_SCHEDULE = { 定期执行任务 # 接口中心每小时 'api.hour':{'task': 'api.hour', 'schedule': crontab(minute=15), 'args': ()}, # 接口中心每日 'api.day':{'task': 'api.day', 'schedule': crontab(minute=30, hour=0), 'args': ()}, } celery = Celery() celery.config_from_object('celeryconfig1') celery配置文档
八、Crontab
Example |
Meaning |
crontab() |
每分钟 |
crontab(minute=0, hour=0) |
每天零时 |
crontab(minute=0, hour='*/3') |
每3个小时 |
crontab(minute=0, hour='0,3,6,9,12,15,18,21') |
每3个小时 |
crontab(minute='*/15') |
每15分钟 |
crontab(day_of_week='sunday') |
周日每分钟 |
crontab(minute='*', hour='*', day_of_week='sun') |
周日每分钟 |
crontab(minute='*/10', hour='3,17,22',day_of_week='thu,fri') |
Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays. |
crontab(minute=0, hour='*/2,*/3') |
Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5') |
Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5). |
crontab(minute=0, hour='*/3,8-17') |
Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
crontab(day_of_month='2') |
Execute on the second day of every month. |
crontab(day_of_month='1-7,15-21') |
Execute on the first and third weeks of the month. |
crontab(day_of_month='11', month_of_year='5') |
Execute on 11th of May every year. |
crontab(month_of_year='*/3') |
Execute on the first month of every quarter. |
九、启动
celery -A tasks worker --loglevel=info celery beat
任务:组成celery的核心,任务都有唯一的名字
@app.task(serializer='json') def create_user(username, password): User.objects.create(username=username, password=password)
流程:①celerybeat生成任务消息,然后发送消息到一个exchange(交换机);②交换机决定那个(些)队列会接收这个消息,这个其实就是根据下面的exchange的类型和绑定到这个交换机所用的bindingkey;
序列化:格式json、pickle、yaml、msgpack
add.apply_async((10, 10), serializer='json')
压缩:格式zlib、gzip、bzip2
add.apply_async((2, 2), compression='zlib')
十、高级用法
1、group
from celery import group >>> res = group(add.s(i, i) for i in xrange(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 是多个相同任务
2、chain
>>> from celery import chain # 2 + 2 + 4 + 8 >>> res = chain(add.s(2, 2), add.s(4), add.s(8))() >>> res.get() 16 是一个任务
3、chord
>>> from celery import chord >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() >>> res.get() 90 多个不同任务,必须有backend配置,配置文件中增加CELERY_CHORD_PROPAGATES = True
十一、celery amqp
$ celery amqp 读取celeryconfig配置 -> connecting to amqp://guest@localhost:5672/. -> connected. 1> exchange.declare testexchange direct 定义交换机 ok. 2> queue.declare testqueue 定义队列 ok. queue:testqueue messages:0 consumers:0. 3> queue.bind testqueue testexchange testkey 绑定队列 ok. 4> basic.publish 'This is a message!' testexchange testkey 发布消息 ok. 5> basic.get testqueue 消费 {'body': 'This is a message!', 'delivery_info': {'delivery_tag': 1, 'exchange': u'testexchange', 'message_count': 0, 'redelivered': False, 'routing_key': u'testkey'}, 'properties': {}} 6> basic.ack 1 回馈 ok. 7> queue.delete testqueue 删除队列 ok. 0 messages deleted. 8> exchange.delete testexchange 删除交换机 ok.
十二、celery界面监控
-
安装flower:pip install flower
-
启动flower:celery flower
-
访问 http://host:5555
十三、celery队列
1、CELERY_QUEUES(定义celery队列)
from kombu import Queue CELERY_DEFAULT_QUEUE = 'default' CELERY_QUEUES = ( Queue('default', routing_key='task.#'), Queue('feed_tasks', routing_key='feed.#'), ) CELERY_DEFAULT_EXCHANGE = 'tasks' CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' CELERY_DEFAULT_ROUTING_KEY = 'task.default'2、CELERY_ROUTES(用来决定在任务哪个队列上执行)
CELERY_ROUTES = { 'feeds.tasks.import_feed': { 'queue': 'feed_tasks', 'routing_key': 'feed.import', }, }3、只让队列单独工作:celery worker -Q feed_tasks
相关推荐
与之相比,使用Celery来管理定时任务和周期任务会更加灵活和方便,它能够在任务执行结束后随时获取任务结果。 本文通过一个简单的示例来展示如何使用Celery来定义和执行异步任务。首先定义了一个Celery实例,并配置...
以上介绍了在Python Celery中如何设置和使用定时任务,包括定时任务的创建、配置以及管理和调度方式。Celery定时任务的使用可以大大简化后台任务的处理,使得程序可以更加灵活高效地处理周期性和延时性的任务。
这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 hello, 小伙伴们, 好久不更新了,这一次带来...
本系统是一个主要使用python3, celery和requests来爬取职位数据的爬虫,实现了定时任务,出错重试,日志记录,自动更改Cookies等的功能,并使用ECharts Bootstrap 来构建前端页面,来展示爬取到的数据。
Celery支持多种消息代理,如RabbitMQ、Redis等,可以处理各种复杂的任务调度,如定时任务、周期性任务和即时任务。 Flower作为Celery的可视化监控工具,具备以下核心功能: 1. **实时监控**:Flower可以实时显示...
Celery 是一个分布式任务...总的来说,这个项目是一个全面的教程,涵盖了使用Celery进行异步和定时任务处理的关键方面,以及异常处理和通知机制。对于想要学习或已经在使用Celery的开发者来说,这是一个宝贵的资源。
本篇将深入探讨如何在Windows上配置和运行基于Redis作为消息中间件的Celery定时任务。 首先,我们需要了解Celery的基本概念。Celery是一个开源的分布式任务队列,它专注于实时操作,但也支持调度。它使用AMQP...
内容涵盖 Celery 的基本概念、配置方法、异步任务和定时任务的调用、任务流设计、日志处理、错误监控(Sentry)和与 Flask 的结合使用。此外,还介绍了 Celery 中的五个角色(Task、Broker、Worker、Beat、Backend)...
Celery可以处理异步任务,包括定时任务(定时器)和周期性任务(定期执行)。它的主要特点是分布式、可扩展和容错性。 WebSocket是一种在客户端和服务器之间建立长连接的协议,允许双向通信。在实时监控Celery任务...
【标题】:“不使用数据库动态设置定时任务-celery_demo.zip”揭示了如何在不依赖数据库的情况下使用Celery框架创建和管理定时任务。Celery是一个分布式任务队列,广泛用于处理异步任务和定时调度。 【描述】:“不...
Supervisor、Celery和Flower这三款工具的结合使用,可以为开发者提供一个高效、可视化的解决方案,用于处理异步任务和定时任务。现在,我们将深入探讨这些工具的配置与应用。 首先,**Supervisor** 是一个用Python...
在Python中,Celery提供了一种灵活的调度器,可以设置定时任务或者间隔任务。例如,你可以定义一个任务,使其每天特定时间运行一次,或者在完成某个任务后触发另一个任务。 **Python环境** 在这个项目中,推荐使用...
4. **任务调度**:通过Celery的调度能力,Crawlab能够设置定时任务,自动执行爬虫,满足周期性数据抓取需求。 5. **错误处理**:Crawlab提供错误捕获和恢复机制,当爬虫在执行过程中遇到问题时,可以自动重试或记录...
- **定时任务**: 使用 Celery 的 Beat 服务,可以定期执行任务,实现计划任务功能。 - **并发处理**: 当需要处理大量并发请求时,Celery 可以有效地扩展应用的处理能力。 - **错误处理与重试**: Celery 支持任务重...
它主要由Python编写,并且主要的使用场景包括网站后台任务处理和定时任务。Python开发者常用它来实现异步任务,例如发送邮件、处理文件、调用外部API等耗时操作,这些操作可以异步执行,不阻塞主线程的处理。 首先...
标题中的“celery+ rabbitMq + python linux 下 例子 crontab例子”涉及的是一个在Linux环境下使用Python、Celery以及RabbitMQ构建任务调度系统,并结合crontab进行定时任务设置的示例。接下来,我们将深入探讨这些...
Django是Python领域广泛应用的Web框架,而Celery则是一个分布式任务队列,特别适用于处理异步任务和定时任务。 【描述】中的“源码”意味着我们将深入探讨实际的代码实现,了解如何在Django项目中集成Celery,以及...