介绍
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。
想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。
实现
Job
首先创建一个Job类,为了测试简单,只包含一个job id属性,将来可以封装一些作业状态,作业命令,执行用户等属性。
job.py
1 |
#!/usr/bin/env python |
2 |
# -*- coding: utf-8 -*- |
3 |
4 |
class Job:
|
5 |
def __init__( self , job_id):
|
6 |
self .job_id = job_id
|
Master
Master用来派发作业和显示运行完成的作业信息
master.py
01 |
#!/usr/bin/env python |
02 |
# -*- coding: utf-8 -*- |
03 |
04 |
from Queue import Queue
|
05 |
from multiprocessing.managers import BaseManager
|
06 |
from job import Job
|
07 |
08 |
09 |
class Master:
|
10 |
11 |
def __init__( self ):
|
12 |
# 派发出去的作业队列
|
13 |
self .dispatched_job_queue = Queue()
|
14 |
# 完成的作业队列
|
15 |
self .finished_job_queue = Queue()
|
16 |
17 |
def get_dispatched_job_queue( self ):
|
18 |
return self .dispatched_job_queue
|
19 |
20 |
def get_finished_job_queue( self ):
|
21 |
return self .finished_job_queue
|
22 |
23 |
def start( self ):
|
24 |
# 把派发作业队列和完成作业队列注册到网络上
|
25 |
BaseManager.register( 'get_dispatched_job_queue' , callable = self .get_dispatched_job_queue)
|
26 |
BaseManager.register( 'get_finished_job_queue' , callable = self .get_finished_job_queue)
|
27 |
28 |
# 监听端口和启动服务
|
29 |
manager = BaseManager(address = ( '0.0.0.0' , 8888 ), authkey = 'jobs' )
|
30 |
manager.start()
|
31 |
32 |
# 使用上面注册的方法获取队列
|
33 |
dispatched_jobs = manager.get_dispatched_job_queue()
|
34 |
finished_jobs = manager.get_finished_job_queue()
|
35 |
36 |
# 这里一次派发10个作业,等到10个作业都运行完后,继续再派发10个作业
|
37 |
job_id = 0
|
38 |
while True :
|
39 |
for i in range ( 0 , 10 ):
|
40 |
job_id = job_id + 1
|
41 |
job = Job(job_id)
|
42 |
print ( 'Dispatch job: %s' % job.job_id)
|
43 |
dispatched_jobs.put(job)
|
44 |
45 |
while not dispatched_jobs.empty():
|
46 |
job = finished_jobs.get( 60 )
|
47 |
print ( 'Finished Job: %s' % job.job_id)
|
48 |
49 |
manager.shutdown()
|
50 |
51 |
if __name__ = = "__main__" :
|
52 |
master = Master()
|
53 |
master.start()
|
Slave
Slave用来运行master派发的作业并将结果返回
slave.py
01 |
#!/usr/bin/env python |
02 |
# -*- coding: utf-8 -*- |
03 |
04 |
import time
|
05 |
from Queue import Queue
|
06 |
from multiprocessing.managers import BaseManager
|
07 |
from job import Job
|
08 |
09 |
10 |
class Slave:
|
11 |
12 |
def __init__( self ):
|
13 |
# 派发出去的作业队列
|
14 |
self .dispatched_job_queue = Queue()
|
15 |
# 完成的作业队列
|
16 |
self .finished_job_queue = Queue()
|
17 |
18 |
def start( self ):
|
19 |
# 把派发作业队列和完成作业队列注册到网络上
|
20 |
BaseManager.register( 'get_dispatched_job_queue' )
|
21 |
BaseManager.register( 'get_finished_job_queue' )
|
22 |
23 |
# 连接master
|
24 |
server = '127.0.0.1'
|
25 |
print ( 'Connect to server %s...' % server)
|
26 |
manager = BaseManager(address = (server, 8888 ), authkey = 'jobs' )
|
27 |
manager.connect()
|
28 |
29 |
# 使用上面注册的方法获取队列
|
30 |
dispatched_jobs = manager.get_dispatched_job_queue()
|
31 |
finished_jobs = manager.get_finished_job_queue()
|
32 |
33 |
# 运行作业并返回结果,这里只是模拟作业运行,所以返回的是接收到的作业
|
34 |
while True :
|
35 |
job = dispatched_jobs.get(timeout = 1 )
|
36 |
print ( 'Run job: %s ' % job.job_id)
|
37 |
time.sleep( 1 )
|
38 |
finished_jobs.put(job)
|
39 |
40 |
if __name__ = = "__main__" :
|
41 |
slave = Slave()
|
42 |
slave.start()
|
测试
分别打开三个linux终端,第一个终端运行master,第二个和第三个终端用了运行slave,运行结果如下
master
01 |
$ python master.py |
02 |
Dispatch job: 1
|
03 |
Dispatch job: 2
|
04 |
Dispatch job: 3
|
05 |
Dispatch job: 4
|
06 |
Dispatch job: 5
|
07 |
Dispatch job: 6
|
08 |
Dispatch job: 7
|
09 |
Dispatch job: 8
|
10 |
Dispatch job: 9
|
11 |
Dispatch job: 10
|
12 |
Finished Job: 1
|
13 |
Finished Job: 2
|
14 |
Finished Job: 3
|
15 |
Finished Job: 4
|
16 |
Finished Job: 5
|
17 |
Finished Job: 6
|
18 |
Finished Job: 7
|
19 |
Finished Job: 8
|
20 |
Finished Job: 9
|
21 |
Dispatch job: 11
|
22 |
Dispatch job: 12
|
23 |
Dispatch job: 13
|
24 |
Dispatch job: 14
|
25 |
Dispatch job: 15
|
26 |
Dispatch job: 16
|
27 |
Dispatch job: 17
|
28 |
Dispatch job: 18
|
29 |
Dispatch job: 19
|
30 |
Dispatch job: 20
|
31 |
Finished Job: 10
|
32 |
Finished Job: 11
|
33 |
Finished Job: 12
|
34 |
Finished Job: 13
|
35 |
Finished Job: 14
|
36 |
Finished Job: 15
|
37 |
Finished Job: 16
|
38 |
Finished Job: 17
|
39 |
Finished Job: 18
|
40 |
Dispatch job: 21
|
41 |
Dispatch job: 22
|
42 |
Dispatch job: 23
|
43 |
Dispatch job: 24
|
44 |
Dispatch job: 25
|
45 |
Dispatch job: 26
|
46 |
Dispatch job: 27
|
47 |
Dispatch job: 28
|
48 |
Dispatch job: 29
|
49 |
Dispatch job: 30
|
slave1
01 |
$ python slave.py |
02 |
Connect to server 127.0 . 0.1 ...
|
03 |
Run job: 1
|
04 |
Run job: 2
|
05 |
Run job: 3
|
06 |
Run job: 5
|
07 |
Run job: 7
|
08 |
Run job: 9
|
09 |
Run job: 11
|
10 |
Run job: 13
|
11 |
Run job: 15
|
12 |
Run job: 17
|
13 |
Run job: 19
|
14 |
Run job: 21
|
15 |
Run job: 23
|
slave2
01 |
$ python slave.py |
02 |
Connect to server 127.0 . 0.1 ...
|
03 |
Run job: 4
|
04 |
Run job: 6
|
05 |
Run job: 8
|
06 |
Run job: 10
|
07 |
Run job: 12
|
08 |
Run job: 14
|
09 |
Run job: 16
|
10 |
Run job: 18
|
11 |
Run job: 20
|
12 |
Run job: 22
|
13 |
Run job: 24
|
相关推荐
想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。在这之前,我们先来详细了解下python中的多进程管理包multiprocessing。 multiprocessing.Process multiprocessing包是Python中的多进程管理包。...
Celery是一个分布式任务队列,它提供了任务分解、任务分派、执行和结果收集等功能,支持灵活的任务调度策略。而Dask则是一个并行计算库,能够创建分布式集群,自动分配任务并处理大规模数据。 实现分布式任务调度...
在深入学习这个Python分布式文件系统的源码时,你需要理解上述各个模块的设计和实现,熟悉分布式系统的基本原理,如CAP理论、Paxos协议、Chubby锁服务等。此外,对Python编程、网络编程、并发控制和数据存储等相关...
之前,为了在Python中实现生产者-消费者模式,往往就会选择一个额外的队列系统,比如rabbitMQ之类。此外,你有可能还要设计一套任务对象的序列化方式以便塞入队列。如果没有队列的支持,那不排除有些同学不得不从...
在Python中,可以使用`concurrent.futures`模块的`ProcessPoolExecutor`来创建一个进程池,它可以接受函数和参数列表,将任务分发到进程池中的各个进程。结合`multiprocessing.Queue`,可以实现进程间的通信和结果...
接下来,我们将通过一个简单的Python代码示例来展示如何实现一个分布式任务调度系统。我们使用了Python标准库中的`multiprocessing`模块以及其`managers`子模块来实现进程间通信。示例中包括两个主要部分:`task_...
6. Python的分布式库:例如,`multiprocessing`库可以实现多进程并行,`concurrent.futures`提供了一种抽象接口来异步执行可等待对象,`Celery`则是一个分布式任务队列,适合周期性任务和后台作业。 7. C/C++的...
2. **分布式计算**:项目可能使用了如`multiprocessing`、`concurrent.futures`等Python库来实现分布式任务调度,或者使用了Apache Spark、Hadoop等大数据处理框架进行分布式计算。 3. **HTTP服务探测**:项目可能...
在本操作系统课程设计中,我们将深入探讨如何使用Python这一强大且灵活的编程语言来实现操作系统的基本功能,特别是进程管理。Python虽然通常不用于构建完整的操作系统,但它的易读性、丰富的库支持以及强大的抽象...
一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 master服务端原理:通过managers模块把Queue...
因此,使用Rust编写的“Parallel”可能作为一个中间层,帮助Python更好地利用硬件资源,尤其是多核CPU,实现更高效的并行计算。 描述中提到的“命令行CPU负载均衡器”,暗示了这个工具可能具有以下功能: 1. **...
一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 举个例子:如果我们已经有一个通过Queue通信的...
线程允许程序同时执行多个任务,但要注意GIL(全局解释器锁)的存在,它限制了同一时刻只有一个线程执行Python字节码,从而在多核环境下并不能实现真正的并行计算。 2. **进程(Processes)**:为了克服GIL的限制,...
Python-pagmo,全称为Parallel Global Optimization Toolbox,是一个基于C++和Python的开源库,专为处理大规模并行优化问题而设计。它允许用户利用多核处理器、GPU以及分布式计算资源进行高效地优化计算。这个库的...
一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 举个例子:如果我们已经有一个通过Queue通信的...
在提供的"agentscope_main.zip"文件中,可能包含了实现这样一个系统的主程序或框架。"说明.txt"文件可能提供了关于如何运行和配置该系统的指导。为了进一步理解和使用这些资源,你需要解压文件,阅读文档,了解代码...