`

Python使用multiprocessing实现一个最简单的分布式作业调度系统

阅读更多

介绍

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。

想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。

实现

Job

首先创建一个Job类,为了测试简单,只包含一个job id属性,将来可以封装一些作业状态,作业命令,执行用户等属性。

job.py

1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3  
4 class Job:
5     def __init__(self, job_id):
6         self.job_id = job_id

Master

Master用来派发作业和显示运行完成的作业信息

master.py

01 #!/usr/bin/env python
02 # -*- coding: utf-8 -*-
03  
04 from Queue import Queue
05 from multiprocessing.managers import BaseManager
06 from job import Job
07  
08  
09 class Master:
10  
11     def __init__(self):
12         # 派发出去的作业队列
13         self.dispatched_job_queue = Queue()
14         # 完成的作业队列
15         self.finished_job_queue = Queue()
16  
17     def get_dispatched_job_queue(self):
18         return self.dispatched_job_queue
19  
20     def get_finished_job_queue(self):
21         return self.finished_job_queue
22  
23     def start(self):
24         # 把派发作业队列和完成作业队列注册到网络上
25         BaseManager.register('get_dispatched_job_queue',callable=self.get_dispatched_job_queue)
26         BaseManager.register('get_finished_job_queue',callable=self.get_finished_job_queue)
27  
28         # 监听端口和启动服务
29         manager = BaseManager(address=('0.0.0.0'8888), authkey='jobs')
30         manager.start()
31  
32         # 使用上面注册的方法获取队列
33         dispatched_jobs = manager.get_dispatched_job_queue()
34         finished_jobs = manager.get_finished_job_queue()
35  
36         # 这里一次派发10个作业,等到10个作业都运行完后,继续再派发10个作业
37         job_id = 0
38         while True:
39             for in range(010):
40                 job_id = job_id + 1
41                 job = Job(job_id)
42                 print('Dispatch job: %s' % job.job_id)
43                 dispatched_jobs.put(job)
44  
45             while not dispatched_jobs.empty():
46                 job = finished_jobs.get(60)
47                 print('Finished Job: %s' % job.job_id)
48  
49         manager.shutdown()
50  
51 if __name__ == "__main__":
52     master = Master()
53     master.start()

Slave

Slave用来运行master派发的作业并将结果返回

slave.py

01 #!/usr/bin/env python
02 # -*- coding: utf-8 -*-
03  
04 import time
05 from Queue import Queue
06 from multiprocessing.managers import BaseManager
07 from job import Job
08  
09  
10 class Slave:
11  
12     def __init__(self):
13         # 派发出去的作业队列
14         self.dispatched_job_queue = Queue()
15         # 完成的作业队列
16         self.finished_job_queue = Queue()
17  
18     def start(self):
19         # 把派发作业队列和完成作业队列注册到网络上
20         BaseManager.register('get_dispatched_job_queue')
21         BaseManager.register('get_finished_job_queue')
22  
23         # 连接master
24         server = '127.0.0.1'
25         print('Connect to server %s...' % server)
26         manager = BaseManager(address=(server, 8888), authkey='jobs')
27         manager.connect()
28  
29         # 使用上面注册的方法获取队列
30         dispatched_jobs = manager.get_dispatched_job_queue()
31         finished_jobs = manager.get_finished_job_queue()
32  
33         # 运行作业并返回结果,这里只是模拟作业运行,所以返回的是接收到的作业
34         while True:
35             job = dispatched_jobs.get(timeout=1)
36             print('Run job: %s ' % job.job_id)
37             time.sleep(1)
38             finished_jobs.put(job)
39  
40 if __name__ == "__main__":
41     slave = Slave()
42     slave.start()

测试

分别打开三个linux终端,第一个终端运行master,第二个和第三个终端用了运行slave,运行结果如下

master

01 $ python master.py
02 Dispatch job: 1
03 Dispatch job: 2
04 Dispatch job: 3
05 Dispatch job: 4
06 Dispatch job: 5
07 Dispatch job: 6
08 Dispatch job: 7
09 Dispatch job: 8
10 Dispatch job: 9
11 Dispatch job: 10
12 Finished Job: 1
13 Finished Job: 2
14 Finished Job: 3
15 Finished Job: 4
16 Finished Job: 5
17 Finished Job: 6
18 Finished Job: 7
19 Finished Job: 8
20 Finished Job: 9
21 Dispatch job: 11
22 Dispatch job: 12
23 Dispatch job: 13
24 Dispatch job: 14
25 Dispatch job: 15
26 Dispatch job: 16
27 Dispatch job: 17
28 Dispatch job: 18
29 Dispatch job: 19
30 Dispatch job: 20
31 Finished Job: 10
32 Finished Job: 11
33 Finished Job: 12
34 Finished Job: 13
35 Finished Job: 14
36 Finished Job: 15
37 Finished Job: 16
38 Finished Job: 17
39 Finished Job: 18
40 Dispatch job: 21
41 Dispatch job: 22
42 Dispatch job: 23
43 Dispatch job: 24
44 Dispatch job: 25
45 Dispatch job: 26
46 Dispatch job: 27
47 Dispatch job: 28
48 Dispatch job: 29
49 Dispatch job: 30

slave1

01 $ python slave.py
02 Connect to server 127.0.0.1...
03 Run job: 1
04 Run job: 2
05 Run job: 3
06 Run job: 5
07 Run job: 7
08 Run job: 9
09 Run job: 11
10 Run job: 13
11 Run job: 15
12 Run job: 17
13 Run job: 19
14 Run job: 21
15 Run job: 23

slave2

01 $ python slave.py
02 Connect to server 127.0.0.1...
03 Run job: 4
04 Run job: 6
05 Run job: 8
06 Run job: 10
07 Run job: 12
08 Run job: 14
09 Run job: 16
10 Run job: 18
11 Run job: 20
12 Run job: 22
13 Run job: 24

本文地址:http://www.kongxx.info/blog/?p=512

分享到:
评论

相关推荐

    Python利用multiprocessing实现最简单的分布式作业调度系统实例

    想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。在这之前,我们先来详细了解下python中的多进程管理包multiprocessing。 multiprocessing.Process multiprocessing包是Python中的多进程管理包。...

    Python技术的分布式任务调度与并行计算方法.docx

    Celery是一个分布式任务队列,它提供了任务分解、任务分派、执行和结果收集等功能,支持灵活的任务调度策略。而Dask则是一个并行计算库,能够创建分布式集群,自动分配任务并处理大规模数据。 实现分布式任务调度...

    Python分布式文件系统源码.zip

    在深入学习这个Python分布式文件系统的源码时,你需要理解上述各个模块的设计和实现,熟悉分布式系统的基本原理,如CAP理论、Paxos协议、Chubby锁服务等。此外,对Python编程、网络编程、并发控制和数据存储等相关...

    Python如何快速实现分布式任务

    之前,为了在Python中实现生产者-消费者模式,往往就会选择一个额外的队列系统,比如rabbitMQ之类。此外,你有可能还要设计一套任务对象的序列化方式以便塞入队列。如果没有队列的支持,那不排除有些同学不得不从...

    Python之分布式进程共8页.pdf.zip

    在Python中,可以使用`concurrent.futures`模块的`ProcessPoolExecutor`来创建一个进程池,它可以接受函数和参数列表,将任务分发到进程池中的各个进程。结合`multiprocessing.Queue`,可以实现进程间的通信和结果...

    python分布式编程实现过程解析

    接下来,我们将通过一个简单的Python代码示例来展示如何实现一个分布式任务调度系统。我们使用了Python标准库中的`multiprocessing`模块以及其`managers`子模块来实现进程间通信。示例中包括两个主要部分:`task_...

    基于Python和C_C++的分布式计算架构.zip

    6. Python的分布式库:例如,`multiprocessing`库可以实现多进程并行,`concurrent.futures`提供了一种抽象接口来异步执行可等待对象,`Celery`则是一个分布式任务队列,适合周期性任务和后台作业。 7. C/C++的...

    一个还正在完善的项目,采用分布式python扫描全国的HTTP服务-simple_zoomeye.zip

    2. **分布式计算**:项目可能使用了如`multiprocessing`、`concurrent.futures`等Python库来实现分布式任务调度,或者使用了Apache Spark、Hadoop等大数据处理框架进行分布式计算。 3. **HTTP服务探测**:项目可能...

    基于Python的操作系统课程设计

    在本操作系统课程设计中,我们将深入探讨如何使用Python这一强大且灵活的编程语言来实现操作系统的基本功能,特别是进程管理。Python虽然通常不用于构建完整的操作系统,但它的易读性、丰富的库支持以及强大的抽象...

    python3学习笔记之多进程分布式小例子

    一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 master服务端原理:通过managers模块把Queue...

    Python-Parallel一个Rust开发的命令行CPU负载均衡器

    因此,使用Rust编写的“Parallel”可能作为一个中间层,帮助Python更好地利用硬件资源,尤其是多核CPU,实现更高效的并行计算。 描述中提到的“命令行CPU负载均衡器”,暗示了这个工具可能具有以下功能: 1. **...

    在Python程序中实现分布式进程的教程

    一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 举个例子:如果我们已经有一个通过Queue通信的...

    Python-PythonParallelProgrammingCookbook中文版

    线程允许程序同时执行多个任务,但要注意GIL(全局解释器锁)的存在,它限制了同一时刻只有一个线程执行Python字节码,从而在多核环境下并不能实现真正的并行计算。 2. **进程(Processes)**:为了克服GIL的限制,...

    Python-pagmo一个CPython大规模并行优化计算库

    Python-pagmo,全称为Parallel Global Optimization Toolbox,是一个基于C++和Python的开源库,专为处理大规模并行优化问题而设计。它允许用户利用多核处理器、GPU以及分布式计算资源进行高效地优化计算。这个库的...

    详解python分布式进程

    一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。 举个例子:如果我们已经有一个通过Queue通信的...

    Python_开始以更简单的方式构建基于llmems的多代理应用程序.zip

    在提供的"agentscope_main.zip"文件中,可能包含了实现这样一个系统的主程序或框架。"说明.txt"文件可能提供了关于如何运行和配置该系统的指导。为了进一步理解和使用这些资源,你需要解压文件,阅读文档,了解代码...

Global site tag (gtag.js) - Google Analytics