reference:
http://skyrover.me/post/19/
Celery是一个实时处理和任务调度的分布式任务队列。任务就是消息,消息中的有效载荷中包含要执行任务需要的全部数据。
这是其使用场景:
- web应用,需要较长时间完成的任务,就可以作为任务交给celery异步执行,执行完返回给用户。
- 网站的定时任务
- 异步执行的其他任务。比如清理/设置缓存
有以下特点:
- 任务执行情况
- 管理后台管理任务
- 任务和配置管理相关联
- 多进程,EventLet和Gevent三种模式并发执行
- 错误处理机制
- 任务原语,任务分组,拆分和调用链
Celery的架构
- Celery Beat,任务调度器,Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
- Celery Worker,执行任务的消费者
- Broker,消息代理,接受生产者的任务消息,存进队列然后按序发送给消费者
- Producer,定时任务或者调用了API产生任务交给任务队列进行处理
- Result Backend,任务处理完后保存状态信息和结果
整体机制就是,任务发布者(一般是web应用)或者任务调度,即定时任务将任务发布到消息代理上(使用Redis或者RabbitMQ),然后消息代理将任务按序发送给Worker执行,Worker执行完后将结果存储到Backend中,也可以用Redis。
Celery的序列化
一般使用json, yaml, msgpack,其中msgpack是一个二进制的json序列化方案,比json数据结构更小,更快。序列化的作用是在客户端和消费者之间传输数据的途中需要序列化和反序列化。
一个例子
- 主程序(实例化Celery)
一种方式,直接进行,简单粗暴
from celery import Celery
from config import REDISHOST, REDISDB, REDISPORT
celery = Celery('backend', broker='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB),backend='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB))
另一种方式,使用配置文件,类似于Flask应用实例的方式吧
from celery import Celery
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')
# celery.conf.update(app.config)
- Celery配置文件
如果都在实例化的时候指定好了配置,那么就不需要了,如果需要指定额外的参数,那么就可以放在配置文件里,以下是几个常用的参数:
# 导入tasks
from celery.schedules import crontab
CELERY_IMPORTS = ("tasks", "graph_data_tasks")
BROKER_URL = ''
CELERY_RESULT_BACKEND = ''
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
# A value of None or 0 means results will never expire
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务结果过期时间
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# celery worker的并发数
CELERYD_CONCURRENCY = 2
# 使用任务调度,使用Beat进程自动生成任务
CELERYBEAT_SCHEDULE = {
'graph_data': {
'task': 'graph_data_tasks.sync_graph',
'schedule': timedelta(minutes=60),
'args': ()
},
'rank_for_guchong': {
'task': 'backend.celerytasks.rank_for_guchong.calc_ability_schedule',
'schedule': crontab(hour=11, minute=55),
'args': ()
}
}
- Celery的tasks
将实例化的celery实例导入进来之后,使用celery的装饰器task()即可完成tasks的注册
@celery.task()
def calc_capacity(strategy_id, start_date, end_date, assets, flows, stocks, output_path):
c = CalcAbility(strategy_id, assets, flows, stocks, output_path)
c.run(start_date, end_date)
return {'current': 1, 'total': 1, 'status': 'success', 'reason': ''}
- 使用tasks
@be_data.route('/data/graph', methods=['GET'])
# @token_required
def gen_megapool_from_summary():
'''直接从summary文件计算这些汇总信息'''
pie_tuple = Cache.get_summary_data(name='summary_data', data_type=float)
meta = {}
if not pie_tuple:
task = sync_graph.apply_async(args=[])
return gen_response(data=meta, message="success", errorcode=0), 200
elif pie_tuple[11] != db_clients.hsize('summary_ability'):
sync_graph.apply_async(args=[])
return gen_response(data=meta, message="success", errorcode=0), 200
另外可以通过task.id来获取task的运行状态
task = calc_ability.AsyncResult(task_id)
response = {
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'time': task.info.get('time', 0),
'start_time': start_time,
'fundid_nums': calc_task.fundid_nums,
'status': task.info.get('status', ''),
'reason': task.info.get('reason', '')
}
- Celery的执行
celery --app=backend.celery worker --loglevel=DEBUG --config=celery_settings --beat
- 指定队列
Celery通常使用默认名为celery的队列来存放任务,可以通过CELERY_DEFAULT_QUEUE修改,可以使用优先级不同的队列来确保高优先级的任务不需要等待就可以得到相应。
from kombu import Queue
CELERY_QUEUES = (
Queue('default', routing_key='task.#'),
Queue('web_tasks', routing_key='web.#')
)
# 默认交换机名字为tasks
CELERY_DEFAULT_EXCHANGE = 'tasks'
# 交换类型是topic
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
# 默认的路由键是task.default
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
CELERY_ROUTES = {
'projq.tasks.add': {
'queue': 'web_tasks',
'routing_key': 'web.add',
}
}
指定队列方式启动消费者进程:celery -A projq worker -Q web_tasks -l info
celery中的任务绑定
任务可以通过 app.task 装饰器进行注册,需要注意的一点是,当函数有多个装饰器时,为了保证 Celery 的正常运行,app.task 装饰器需要在最外层。其中有一个bind 参数,当设置了 bind 参数,则会为这个任务绑定一个 Task 实例,通过第一个 self 参数传入,可以通过这个 self 参数访问到 Task 对象的所有属性。绑定任务用于尝试重新执行任务(使用app.Task.retry()),绑定了任务就可以访问当前请求的任务信息和任何你添加到指定任务基类中的方法。也可以使用self.update_state()
方法来更新状态信息
@celery.task(bind=True)
def calc_ability(self, fundIds, start_date, end_date, increment_end_date, maxsize):
start_date = parse_date(start_date)
end_date = parse_date(end_date)
self.update_state(state='PROGRESS',
meta={'current': index, 'total': total,
'time': used_time.used_time() / 60, 'status': 'running', 'reason': ''})
停止任务
rs = add.delay(1, 2)
rs.revoke() # 只是撤销,如果任务已经在执行,则撤销无效
rs.task_id # 任务id
app.control.revoke(rs.task_id) # 通过task_id撤销
app.control.revoke(rs.task_id, terminate=True) # 撤销正在执行的任务,默认使用TERM信号
app.control.revoke(rs.task_id, terminate=True, signal='SIGKILL') # 撤销正在执行的任务,使用KILL信号
# 而在最新的celery3版本中,这样停止一个任务
celery.control.terminate(task_id)
# 其实本质还是调用了revoke
return self.revoke(task_id, destination=destination, terminate=True, signal=signal, **kwargs)
Celery监控工具Flower
pip install flower
需要在celery_settings中指定CELERY_SEND_TASK_SENT_EVENT = True
,然后和启动celery同样的目录下运行flower -A backend.celery --port=5555
,即可看到管理界面了。访问http://localhost:5555
使用自动扩展
celery -A proj worker -l info --autoscale=6,3
表示平时保持3个进程,最大并发进程数可以达到6个。
在Celery tasks里面使用多进程
from celery import Celery
import time
import multiprocessing as mp
app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")
def test_func(i):
print "beg...:", i
time.sleep(5)
print "....end:", i
return i * 5
@app.task
def fun_1(n):
curr_proc = mp.current_process()
curr_proc.daemon = False
p = mp.Pool(mp.cpu_count())
curr_proc.daemon = True
for i in range(n):
p.apply_async(test_func, args=(i,))
p.close()
p.join()
return 1
if __name__ == "__main__":
app.start()
直接启动多进程是肯定不可以的,因为是守候进程curr_proc.daemon=True,所以启多进程之前主动设置为非守候进程curr_proc.daemon=False,启动了以后再设为守候进程
相关推荐
【分布式任务队列Celery详解】 Celery是一个强大的分布式任务队列系统,它设计用于处理大量消息,并且具有高可靠性和灵活性。Celery的核心功能是实现实时任务处理和任务调度,采用生产者-消费者模式运行。在这个...
总的来说,这个资料包旨在帮助开发者掌握使用Python和Celery构建分布式任务队列的方法,提升系统处理大规模异步任务的能力,从而在高并发场景下实现高效、可靠的业务处理。通过学习和实践,你可以了解到如何将 ...
由于此类Web应用程序中经常使用Celery分布式任务,因此该库使您既可以实现celery工作者,又可以在Go中提交celery任务。 您还可以将此库用作纯go分布式任务队列。 芹菜工人行动起来 支持的经纪人/后端 现在支持...
### Python Celery 分布式任务队列的使用详解 #### 一、Celery介绍与基本使用 ##### 1.1 Celery简介 Celery 是一个高级的分布式任务队列,它支持多种消息传递机制(例如 RabbitMQ 和 Redis),并且能够很好地处理...
Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。 在 Python 中定义 Celery 的...
Celery是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。 这是一个任务队列,着重于实时处理,同时还支持任务调度。 Celery有庞大的用户和贡献者社区,您应该加入IRC或...
5. **发布任务**:在需要异步执行的地方,调用Celery任务的apply_async方法,将任务放入队列。 6. **处理任务**:Celery的工作节点会从队列中取出任务,执行并返回结果。 在实际应用中,thorns项目可以应用于各种...
Celery 是一个强大的开源分布式任务队列,主要用Python编写,设计用于处理大量异步任务。它通过消息传递系统(如 RabbitMQ、Redis 或 Amazon SQS)作为中间人来协调工作,使得应用程序可以将任务发布到队列,然后在...
在实际项目中,Flower可以帮助开发者更有效地管理和优化Celery任务队列,提升整个系统的效率和可靠性。通过监控任务的执行情况,可以及时发现潜在的问题,如任务堆积、worker过载等,并采取相应措施进行调整。同时,...
Celery是一个基于Python开发的分布式任务队列系统,它支持后台任务的异步处理、周期性任务的调度和定时任务的执行。它使得任务的执行不受限于用户界面或用户交互,从而提高应用程序的响应性和效率。 Celery的核心...
分布式知乎爬虫,python3,使用celery进行分布式任务分发 使用sqlalchemy做orm框架, db模块为model、以及相关的存储、创建。 使用redis作为cookies的存储,利用过期时间。 使用redis作为celery的broker,backend。 ...
在Django中使用异步任务队列Celery可以显著提升网站性能,特别是在处理耗时较长的任务时,能够避免用户长时间等待,提供更好的用户体验。Celery是一个分布式任务队列,允许开发者将任务放入队列,由后台worker异步...
RabbitMQ实战 高效部署分布式消息队列 pdf全85M,无法一次性上传,压缩成两个包。 请同时下载两个文件,然后一并解压。另一个文件在我上传的资源里面找,谢谢
5. **任务定义**:在Celery中,任务是通过Python函数定义的,可以使用`@app.task`装饰器将其标记为Celery任务。 6. **任务调度**:不使用数据库进行任务调度,可能意味着将任务配置写入代码,然后通过程序启动时...
在 《分布式任务队列Celery使用说明》 中介绍了在 Python 中使用 Celery 来实验异步任务和定时任务功能。本文介绍如何在 Django 中使用 Celery。 安装 pip install django-celery 这个命令使用的依赖是 Celery 3.x...
Celery是一个分布式任务队列,它提供了任务分解、任务分派、执行和结果收集等功能,支持灵活的任务调度策略。而Dask则是一个并行计算库,能够创建分布式集群,自动分配任务并处理大规模数据。 实现分布式任务调度...
标题和描述中提到的知识点主要涉及Django、Celery以及Redis在部署定时任务方面的应用,重点是如何使用这些工具实现多worker和多队列机制。以下是详细的知识点说明。 首先,Celery是一个基于消息队列的异步任务队列/...