Kombu 源码解析一
玩 Python 的同学可能很多都听说过甚至玩过 Celery,Celery 作为 Python 中最流行的异步消息队列可以说是非常得受欢迎。但是,用得多的同学相信也是有一个感触,那就是时不时会遇到莫名奇妙的坑,而且最后都只能通过重启解决,因为要找到原因实在不是一件容易的事情。
而我作为这受伤人群中的一个,想了解一下这个大坑内部的原理,所以就曾经扒过它内部的实现,但是,由于 Celery 确实有点庞大,而且我认为代码实现得也不是很多,所以,只是学习到了一部分的知识,但是,却是收获良多。所以,最近因为各种情况,我这次深扒一次,所以后续将会有一系列的文章是讲 Celery 实现的。
当你尝试查看 Celery 的源码的时候,你会发现有一个你根本绕不过的坑的,那就是 Kombu,Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,但是,这只是作者的一面说辞,你看完代码就知道了,它是尝试做异步MQ 的兼容 AMQP 的抽象队列。所以,我决定先从 Kombu 的代码写起,而这篇文章是第一篇!
我在这个系列里面讲的都是 Kombu 4.1.0 和 Celery 4.1.0 的事,先提前说一下,以免后续的同学对不上号。
下载 Kombu 代码
Kombu 的代码很好找,因为它和 Celery 是强耦合的,所以,它的代码放在 Celery 的代码组里头,可以很简单得 Github上找到,所以下载也是很简单了:
> git clone https://github.com/celery/kombu.git
> cd kombu
> git checkout v4.1.0
这样,你就得到了我这个系列文章里面的 Kombu 的源代码了,而我后面的所有代码来源以及行数都是对应这个版本的代码的!
整体 Review
拿到代码之后,我就先摒弃所有的非代码部分,直接来整体看看 kombu 这个目录下有什么:
可以看到,在 kombu 这个目录里面大部分都是文件,而只有三个文件夹,分别是:
- async:异步操作的 函数 和 类
- transport:兼容各种 MQ 的类
- utils: 一些辅助 函数 和 类
从这个目录中,我们就发现了一些问题,你说你既然是抽象 MQ 的,你要啥异步操作啊,这有点越界了呀喂。
简单添加/获取消息
OK,概览完我们看一些实际的东西,先尝试用 Kombu 写一个生产/消费的代码吧,这个 DEMO 分为两部分
- simple_receive.py:一个简单的消费者
- simple_send.py:一个简单的生产者
下面看看这两段代码都是怎么写的:
还是老规矩,这两段代码你都可以在我的 Github 中找到。先看看看消费者,消费者的代码比较简单,我们先建立和 MQ 的连接,然后再从 MQ 的指定队列里面将消息拿出来,处理掉。
而生产者则相反,前面也是要先和队列建立连接,但是,有一点不一样的是这里创建了一个 exchange,然后再往队列里面发送消息,而发送消息的同时还是搭配好多参数。
这里就是 Kombu 有意思的一点了,它意图对所有的 MQ 进行抽象,然后通过接口对外暴露出一致的 API,这样我们就不用关心底层用的是什么 MQ 了,Redis/RabbitMQ/MongoDB 之类的随便切换。
Kombu 的 MQ 模型
因为 Kombu 是对 AMQP 进行抽象,所以它必定有抽象的模型,事实上,它大体上和 RabbitMQ 差不多,但是,不完全一样,有一些差别,下面就介绍一下 Konbu 的抽象模型。
在 Kombu 中,存在多个概念,其实我们在前边简单的生产/消费者样例中已经看到了了一些,他们分别是:
- Message:生产消费的基本单位,其实就是我们所谓的一条条消息
- Connection:对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接
- Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例
- Producers: 发送消息的抽象类
- Consumers:接受消息的抽象类
- Exchange:MQ 路由,这个和 RabbitMQ 差不多,支持 5 类型
- Queue:对应的 queue 抽象,其实就是一个字符串的封装
消息是发送到那个 Queue 的
假设我们想要发送一个消息到 Redis 中名为 'test' 的 queue 中,那么 Kombu 是怎么做的,这就设计到 Exchange 的概念了。目前 Kombu 对于不同 MQ 的支持是这样的:
假设我们用的是 Direct,那么我们的 Producer 在生产的时候只需要指定 Queue=test 即可,这样就会发送的 test 这个 queue 中。更多关于 Exchange 的知识可以参考:AMQP 0-9-1 Model Explained
这一篇就先介绍到这里,后续就开始深入到代码层面,看下这个模型中的各个实体是如何实现的。
相关推荐
1. **多协议支持**:Kombu支持AMQP(Advanced Message Queuing Protocol)、Redis、MongoDB、SQLAlchemy、Beanstalkd、RabbitMQ等多种消息队列协议,可以根据项目需求选择合适的后端。 2. **消息类型**:Kombu支持...
Kombu,作为Python的一个开源库,正是实现这一功能的重要组件。本文将深入探讨Kombu 3.0.37版本的特点、功能以及其在实际应用中的作用。 首先,让我们了解什么是Kombu。Kombu是一个为Python设计的消息传递库,它...
python库。 资源全名:kombu-5.1.0b1-py3-none-any.whl
kombu, python的消息库 kombu-- python的消息传递库 版本:站点:下载:关键字:关键字:4.1.0http://kombu.me/https://pypi.python.org/pypi/kombu/
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
资源分类:Python库 所属语言:Python 资源全名:kombu-4.1.0.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
以下是一个简单的例子,展示了如何使用kombu创建一个直接交换机和队列,并发送和接收消息: ```python import kombu connection = kombu.Connection('amqp://guest:guest@localhost:5672//') channel = connection...
1. **多后端支持**:Kombu 可以与多种消息队列服务进行交互,如RabbitMQ、Redis、Amazon SQS、MongoDB、SQLAlchemy等,这使得开发者可以根据项目需求选择最适合的队列服务。 2. **高级路由**:Kombu 提供了基于...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
包括使用本机producer和consumer类的kombu消息处理器,以及用于中继发布挂钩或cachingcelery连接器的consumerproducermixin工作者,ansible是一个简单而强大的自动化引擎。它用于帮助配置管理、应用程序部署和任务...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
Kombu是一个强大的Python消息传递库,它支持多种消息队列和消息代理,如RabbitMQ、SQS(Amazon Simple Queue Service)等。这个库的主要目的是简化分布式系统中的异步任务处理和通信,使得开发者可以方便地在不同...
Kombu Fernet序列化器该库注册了一组序列化程序,这些序列化程序将Kombu内置的序列化程序用于接收,并使用对称地对其进行加密。 可通过KOMBU_FERNET_KEY环境变量访问加密密钥。 设置加密密钥: import osfrom ...
此存储库包含一个简单的程序,该程序显示了如何使用Kombu和RabbitMQ实时传送视频流,并假设发送方和接收方位于同一计算机系统中。 是一个开源消息代理。 是基于python的消息传递库,可以使用RabbitMQ作为传输工具...
教程中包含了源码解析,展示了Celery如何依赖于Kombu完成基本功能,以及如何通过AMQP协议实现消息路由。此外,还提供了关于如何在Django项目中使用Redis作为Celery的消息队列的示例代码,以及如何异步调用Celery任务...
下载 https://pypi.org/project/kombu/来源 https://github.com/celery/kombu/关键字 消息传递、amqp、rabbitmq、redis、mongodb、python、队列关于Kombu 是 Python 的一个消息传递库。Kombu 的目标是通过为 AMQ ...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
离线安装包,测试可用。使用 pip install [完整包名] 进行安装
1. **解压文件**:将下载的源码包或wheel文件解压缩到一个目录中。 2. **创建虚拟环境**(可选):为了保持环境的清洁,建议创建一个Python虚拟环境。使用`python -m venv myenv`创建新的环境,然后通过`source ...