`
xiaoyu966
  • 浏览: 258158 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

python-gearman之多个GearmanServer问题

阅读更多

============================================================================

原创作品,允许转载。转载时请务必以超链接形式标明原始出处、以及本声明。

请注明转自:http://yunjianfei.iteye.com/blog/

============================================================================

 

问题描述

 

1.开启多个GearmanServer,端口号分别为4730、4731

2.使用python-gearman开发client和worker,开启多个worker连接4730和4731两个Server

3.client发送大量任务到GearmanServer

4.多个worker中的一个或者多个在接收一个或者若干个任务后,再也不接收任务了

 

代码实现

注意:

在编写代码前,应该先安装python-gearman,官网链接如下:

https://pypi.python.org/pypi/gearman/

 

1.文件名:test.bash

 

# test.bash
for i in {0..5}; do
    python gearman_client.py
    sleep 2
done

 

 

 2.文件名:gearman_client.py

 

# gearman_client.py
import multiprocessing
import gearman
import traceback

def start_gearman_client(process_id):
    gm_client = gearman.GearmanClient(['127.0.0.1:4730','127.0.0.1:4731'])
    try:
        requests = []
        for gm_job_id in range(500):
            request = gm_client.submit_job(
                    task='do_task',
                    data='%d_%03d' % (process_id,gm_job_id),
                    unique='%d_%03d' % (process_id,gm_job_id),
                    background=False,wait_until_complete=False)
            requests.append(request)
        gm_client.wait_until_jobs_completed(requests)
    except:
        print traceback.format_exc()
    return 0

def main():
    child_processes = []
    for process_id in range(2):
        p = multiprocessing.Process(target=start_gearman_client, args=(process_id,))
        child_processes.append((process_id,p))
        p.start()

    for (pid,child) in child_processes:
        print 'Confirming that child number %d had died' % pid
        child.join()

if __name__ == '__main__':
    main()

 

 

 3.文件名:gearman_worker.py

 

# gearman_worker.py
import gearman
import multiprocessing
import time
import traceback
from functools import partial

def do_work(gearman_worker,gearman_job,worker_id):
    try:
        print 'Worker %02d processing %s from port %d: %s' % (worker_id,gearman_job.data,gearman_job.connection.gearman_port,gearman_job.unique)
        time.sleep(0.001)
    except:
        print traceback.format_exc()
    return 'Done by worker %d through port %d' % (worker_id,gearman_job.connection.gearman_port)

def start_gearman_worker(worker_id):
    gm_worker = gearman.GearmanWorker(['127.0.0.1:4730','127.0.0.1:4731'])
    gm_worker.register_task('do_task', partial(do_work,worker_id=worker_id))
    print 'Worker %d start working' % worker_id
    gm_worker.work()

if __name__ == '__main__':
    workers = []
    for pid in range(8):
        worker = multiprocessing.Process(target=start_gearman_worker,args=(pid,))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()

 

 

问题复现

1.开启多个GearmanServer,端口号分别为4730、4731

2.运行命令 python gearman_worker.py

3.运行脚本 /bin/bash test.bash

4.查看worker打印输出的log,可以发现log末尾只有部分worker在接收任务并执行,有几个worker是只执行了一次任务就再也不执行了。(注意:这种状况如果没发生,多执行几次test.bash,然后观察)

 

这里我贴出其中一次测试结果:

测试结果
Worker 02 processing 1_467 from port 4731: 1_467
Worker 04 processing 0_484 from port 4731: 0_484
Worker 05 processing 1_468 from port 4731: 1_468
Worker 02 processing 1_469 from port 4731: 1_469
Worker 04 processing 1_470 from port 4731: 1_470
Worker 05 processing 0_486 from port 4731: 0_486
Worker 02 processing 0_487 from port 4731: 0_487
Worker 04 processing 1_474 from port 4731: 1_474
Worker 05 processing 1_473 from port 4731: 1_473
Worker 02 processing 0_490 from port 4731: 0_490
Worker 05 processing 1_476 from port 4731: 1_476
Worker 04 processing 0_491 from port 4731: 0_491
Worker 02 processing 1_478 from port 4731: 1_478
Worker 05 processing 0_492 from port 4731: 0_492
Worker 04 processing 1_479 from port 4731: 1_479
Worker 02 processing 1_484 from port 4731: 1_484
Worker 05 processing 1_485 from port 4731: 1_485
Worker 04 processing 0_497 from port 4731: 0_497
Worker 02 processing 0_498 from port 4731: 0_498
Worker 05 processing 1_489 from port 4731: 1_489
Worker 02 processing 1_492 from port 4731: 1_492
Worker 04 processing 1_493 from port 4731: 1_493
Worker 05 processing 1_495 from port 4731: 1_495
Worker 02 processing 1_498 from port 4731: 1_498
Worker 04 processing 1_499 from port 4731: 1_499

 可以看到,后面只有worker2、4、5在执行任务,其他的worker都不工作了。

 

问题分析与解决

在开始问题分析之前,先了解一下GearmanServer与client、worker之间大概的工作流程。



1.client提交一个Job到GearmanServer

2.GearmanServer找出所有Sleeping的worker

3.GearmanServer向这些Sleeping的worker发送'noop'命令,唤醒worker

4.worker向GearmanServer发送Grab_Job的命令获取Job

 

根据上面的流程,我们可以知道,worker要获取Job,首先必须受到GearmanServer发送的“NOOP”命令。那么,worker不执行任务了,不外乎以下几种情况:

1. worker同GearmanServer断开了连接

2. GearmanServer之中因为某些原因,没有向所有worker发送“NOOP”

 

这两点也是可以通过代码来验证的,我当时在python-gearman中加了一些打印输出,可以确定worker同GearmanServer的连接并没有断开。那么,原因只能是GearmanServer没有向所有的worker发送NOOP。

 

这样的话,有以下两种方案来解决:

1.查阅GearmanServer(C实现),修改源码,对所有worker发送NOOP

2.修改python-gearman的代码,使得所有worker能够主动从GearmanServer获取Job

 

对于第一种方案,显然成本很高,而且风险极大,可能会引发更大的BUG,显然不可取。所以我们选择第二种方案,让worker主动去GearmanServer拉取Job,而不是死等“NOOP”。

 

第二种方案怎么实现呢?这里我就不详细介绍了,一切尽在代码中,以下是我在github发布的源码,是基于python-gearman的官方版本修改出来的,链接如下:

https://github.com/yunjianfei/python-gearman

 

主要特性有:

1.加入了TCP keepalive特性

2.采用epoll,效率更高

3.worker加入主动抓取任务的功能,保证能从多个server获取任务

 

希望大家踊跃使用,如果发现bug,请联系我,谢谢

 

  • 大小: 91.2 KB
1
4
分享到:
评论

相关推荐

    java-gearman-service-0.6.6.zip

    java-gearman-service-0.6.6.zip 包,gearman分为3部分,client - server - worker,创建 java 版本的client和worker部分。 其实在gearman中,client和worker的编写不复杂,但是不同厂商提供的API是不大相同的,本...

    java-gearman-service(gearman-java-service)

    java实现gearman的job实现的jar包,包括gearman server,client和work客户端API

    java-gearman-service-0.6.6.jar

    java实现gearman的job实现的jar包,包括gearman server,client和work客户端API

    java-gearman-service jar

    gearman的java库有两个,一个是gearman service ,一个是gearman java,相比来说service版本更好用一些,并且网上的教程一般是用的这个版本。因此我打好了gearman service的包提供给需要的开发者使用。

    Go-G2-Gearman服务器的一个GO现代化实现

    G2 - Gearman服务器的一个GO现代化实现

    Laravel开发-php-gearman

    Gearman 是一个跨语言的、通用的任务调度框架,它可以将任务分发到多台服务器上,使得大规模应用可以有效地利用系统资源。 ### Gearman 的核心概念 1. **Worker**:工作进程,负责接收并执行 Gearman 服务器分发的...

    Laravel开发-laravel-gearman-rpc

    $client->addServer(config('services.gearman.default.host'), config('services.gearman.default.port')); $job = new GearmanJob('my_task'); $job->setUnique(md5(microtime(true))); $job->setData(json_...

    PyPI 官网下载 | python-libgearman-0.10.5.tar.gz

    其中,Gearman是一个分布式任务队列系统,而Python-libgearman则是Python与Gearman交互的接口库,它使得Python开发者能够轻松地利用Gearman的功能。本文将深入探讨Python-libgearman-0.10.5这一版本的具体内容和应用...

    java-gearman-service-0.6.6.jar gearman java调度工具包

    在项目中使用这个压缩包,你需要包含`java-gearman-service依赖包`中的所有文件,确保运行时库能够正常加载和运行。这可能包括JAR文件和其他必要的资源文件。确保你的构建系统(如Maven或Gradle)正确地配置了依赖...

    go-gearman:戈朗工作调度员

    齿轮手 分片密钥散列到同一队列,每个队列都绑定到一个工作器。 用法 初始装备 ... "github.com/rfyiamcool/go-gearman" ) func main () { gm := gearman . NewGearman ( 10 , 1000 ) gm . Start

    pecl-gearman:libgearmanPHP包装器

    Gearman支持多种编程语言,包括C、Python、Perl、Java等,而pecl-gearman就是PHP对Gearman的官方支持。 **libgearman库** libgearman是Gearman的C语言客户端和服务器库。它提供了一系列的API函数,用于连接...

    Python库 | gearman-1.3.1.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:gearman-1.3.1.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    node-gearman::gear:Gearman客户端和节点的工作程序

    多写一些样板每个客户/员工仅支持1个服务器连接用法例子创建一个客户,创建1个工作,并完成工作const gearman = require ( 'gearman' )let client = gearman ( "localhost" , 4730 , { timeout : 3000 } ) // time...

    gearman-mysql-udf-0.6.tar.gz

    这个名为 "gearman-mysql-udf-0.6.tar.gz" 的压缩包包含了一个 Gearman 与 MySQL 结合使用的用户定义函数(UDF)插件,版本为 0.6。这个插件使得 MySQL 数据库可以直接与 Gearman 服务进行交互,扩展了 MySQL 的功能...

    Python库 | gearman-2.0.1.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:gearman-2.0.1.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    gearman下载gearman下载

    首先,我们看到一个名为`gearman-java-0.04-src.tar.gz`的文件,这是Gearman Java客户端的源代码包。这个版本为0.04,以tar.gz格式压缩,通常用于在Linux或Unix-like系统中进行编译和安装。源代码提供了对Gearman ...

    docker-gearman:Docker Gearman服务器

    码头工人Docker Gearman服务器

    Express-Js-Gearman-样本

    Sample Express Js,Gearman Worker,后端backend =>包含一个使用express.js实现的node.js后端gearman-worker =>包含一个基本的node.js gearman工作,只有一个工作(编码),该工作将输入数据编码为base64后端Api ...

    Gearman Worker实例 C++ vs2008

    Gearman是一个分布式任务队列系统,它允许应用程序将工作分发到多个服务器或进程,以实现负载均衡和异步处理。在本实例中,我们关注的是如何在C++环境中,利用Visual Studio 2008(VS2008)在Windows平台上创建一个...

Global site tag (gtag.js) - Google Analytics