概述
Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。
下面是实现上的想法
MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。
MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。
channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。
channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。
Master代码
09 |
REDIS_HOST = 'localhost'
|
12 |
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
|
13 |
CHANNEL_RESULT = 'CHANNEL_RESULT'
|
21 |
MyServerResultHandleThread().start()
|
22 |
MyServerDispatchThread().start()
|
25 |
class MyServerDispatchThread(threading.Thread):
|
27 |
threading.Thread.__init__( self )
|
30 |
r = redis.StrictRedis(host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB)
|
31 |
for i in range ( 1 , 100 ):
|
32 |
channel = CHANNEL_DISPATCH + '_' + str (random.randint( 1 , 3 ))
|
33 |
print ( "Dispatch job %s to %s" % ( str (i), channel))
|
34 |
ret = r.publish(channel, str (i))
|
36 |
print ( "Dispatch job %s failed." % str (i))
|
40 |
class MyServerResultHandleThread(threading.Thread):
|
42 |
threading.Thread.__init__( self )
|
45 |
r = redis.StrictRedis(host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB)
|
47 |
p.subscribe(CHANNEL_RESULT)
|
48 |
for message in p.listen():
|
49 |
if message[ 'type' ] ! = 'message' :
|
51 |
print ( "Received finished job %s" % message[ 'data' ])
|
54 |
if __name__ = = "__main__" :
|
说明
MyMaster类 – master主程序,用来启动dispatch和resulthandler的线程
MyServerDispatchThread类 – 派发作业线程,产生作业并派发到计算节点
MyServerResultHandleThread类 – 作业运行结果处理线程,从channel里获取作业结果并显示
Slave代码
03 |
from datetime import datetime
|
10 |
REDIS_HOST = 'localhost'
|
13 |
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
|
14 |
CHANNEL_RESULT = 'CHANNEL_RESULT'
|
23 |
MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str (i)).start()
|
26 |
class MyJobWorkerThread(threading.Thread):
|
28 |
def __init__( self , channel):
|
29 |
threading.Thread.__init__( self )
|
30 |
self .channel = channel
|
33 |
r = redis.StrictRedis(host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB)
|
35 |
p.subscribe( self .channel)
|
36 |
for message in p.listen():
|
37 |
if message[ 'type' ] ! = 'message' :
|
39 |
print ( "%s: Received dispatched job %s " % ( self .channel, message[ 'data' ]))
|
40 |
print ( "%s: Run dispatched job %s " % ( self .channel, message[ 'data' ]))
|
42 |
print ( "%s: Send finished job %s " % ( self .channel, message[ 'data' ]))
|
43 |
ret = r.publish(CHANNEL_RESULT, message[ 'data' ])
|
45 |
print ( "%s: Send finished job %s failed." % ( self .channel, message[ 'data' ]))
|
48 |
if __name__ = = "__main__" :
|
说明
MySlave类 – slave节点主程序,用来启动MyJobWorkerThread的线程
MyJobWorkerThread类 – 从channel里获取派发的作业并将运行结果发送回master
测试
首先运行MySlave来定义派发作业channel。
然后运行MyMaster派发作业并显示执行结果。
转自:http://www.kongxx.info/blog/?p=522
分享到:
相关推荐
Redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的Web应用程序的完美解决方案。 Redis从它的许多竞争继承来的三个主要特点: Redis数据库完全在内存中,使用磁盘仅用于持久性。 相比许多键值数据...
`aredis` 是一个针对 Python 3.5 及以上版本设计的高效、异步的 Redis 客户端库。它利用了 Python 的协程(coroutines)和事件循环(event loop)技术,为开发者提供了非阻塞的 I/O 操作,极大地提升了程序执行效率...
Redis 和 Python 在爬虫系统中的整合是一个常见的技术实践,它利用了 Redis 的高效数据存储和 Python 的强大编程能力。在本套源码中,我们可能会看到如何构建一个基于 Redis 的爬虫队列来管理和调度爬取任务,以及...
RQ 调度器RQ Scheduler是一个小包,它为RQ(一个基于Redis 的Python 排队库)添加了作业调度功能。支持RQ调度如果您发现rq-scheduler有用,请考虑通过Tidelift支持其开发。要求请求权限安装你可以通过 pip安装RQ ...
在IT行业中,Python是一种强大的...总的来说,结合Python、Redis和Paramiko,我们可以构建一个强大的系统监控运维平台,实现对多台远程服务器的实时监控、自动化任务执行和异常检测,从而提升整体的运维效率和安全性。
Python基于Tornado实现的系统核心调度能够有效地支持分布式扩展,这是一种高效、轻量级的解决方案,尤其适合处理大量并发连接。Tornado是一个Python Web框架和异步网络库,由FriendFeed团队开发,后来被Facebook收购...
redis.smove('tags', 'tags1', 'Coffee') # 将元素从一个集合移动到另一个 redis.scard('tags') # 获取集合大小 redis.sismember('tags', 'Book') # 判断元素是否存在 redis.sinter('tags', 'tags1') # 集合交集 ...
Python-MrQueue是一个基于Python的分布式worker任务队列,它利用了Redis作为中央消息代理以及gevent库来实现高并发的异步处理。这个框架设计的主要目标是为了解决大型项目中的任务调度、异步执行和工作负载均衡问题...
标题中的“基于redis实现的消息队列”指的是使用Redis这一开源数据结构存储系统来构建消息队列(Message Queue, MQ)的解决方案。Redis以其高性能、丰富的数据结构和内存存储特性,常被用作构建消息队列的底层技术。...
python 开发的 Scrapy 框架来开发,使用 Xpath 技术对下载的网页进行提取解析,运用 Redis 数据库做分布式, 设计并实现了针对当当图书网的分布式爬虫程序,scrapy-redis是一个基于redis的scrapy组件,通过它可以...
python 开发的 Scrapy 框架来开发,使用 Xpath 技术对下载的网页进行提取解析,运用 Redis 数据库做分布式, 设计并实现了针对当当图书网的分布式爬虫程序,scrapy-redis是一个基于redis的scrapy组件,通过它可以...
python 开发的 Scrapy 框架来开发,使用 Xpath 技术对下载的网页进行提取解析,运用 Redis 数据库做分布式, 设计并实现了针对当当图书网的分布式爬虫程序,scrapy-redis是一个基于redis的scrapy组件,通过它可以...
python 开发的 Scrapy 框架来开发,使用 Xpath 技术对下载的网页进行提取解析,运用 Redis 数据库做分布式, 设计并实现了针对当当图书网的分布式爬虫程序,scrapy-redis是一个基于redis的scrapy组件,通过它可以...
python 开发的 Scrapy 框架来开发,使用 Xpath 技术对下载的网页进行提取解析,运用 Redis 数据库做分布式, 设计并实现了针对当当图书网的分布式爬虫程序,scrapy-redis是一个基于redis的scrapy组件,通过它可以...
iredis 是一个强大的 Python 库,它为开发者提供了一个高效且易于使用的接口,以便与 Redis 数据库进行交互。Redis 是一种高性能的键值对存储系统,常用于数据缓存、消息队列以及数据库等多个场景。iredis 充分利用...
总结来说,这个压缩包提供了一个完整的Scrapy分布式爬虫项目,包含了从使用Python和Scrapy构建爬虫,通过Redis实现分布式爬取,再到数据存储到MongoDB的整个流程。对于学习和实践Python爬虫开发,尤其是分布式爬虫,...