`
xiaoyu966
  • 浏览: 257878 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

python基于mysql实现的简单队列以及跨进程锁

阅读更多

============================================================================

原创作品,允许转载。转载时请务必以超链接形式标明原始出处、以及本声明。

请注明转自:http://yunjianfei.iteye.com/blog/

============================================================================

 

在我们做多进程应用开发的过程中,难免会遇到多个进程访问同一个资源(临界资源)的状况,必须通过加一个全局性的锁,来实现资源的同步访问(同一时间只能有一个进程访问资源)。

 

举个例子:

假设我们用mysql来实现一个任务队列,实现的过程如下:

1. 在Mysql中创建Job表,用于储存队列任务,如下:

create table jobs(
    id auto_increment not null primary key,
    message text not null,
    job_status not null default 0
);

message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列 

 

2. 有一个生产者进程,往job表中放新的数据,进行排队

insert into jobs(message) values('msg1');

 

3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

 

4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。

 

这里我贴出非常好的一篇文章,大家可以参照一下:

https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you

 

=========================华丽的分割线=======================================

 

说道跨进程的锁实现,我们主要有几种实现方式:

1. 信号量

2. 文件锁fcntl

3. socket(端口号绑定)

4. signal

这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。

 

查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈,链接如下:

http://dev.mysql.com/doc/refman/5.0/fr/miscellaneous-functions.html

 

我用python实现了一个demo,如下:

 

文件名:glock.py

#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
#   Author  :   yunjianfei
#   E-mail  :   yunjianfei@126.com
#   Date    :   2014/02/25
#   Desc    :
#

import logging, time
import MySQLdb


class Glock:
    def __init__(self, db):
        self.db = db

    def _execute(self, sql):
        cursor = self.db.cursor()
        try:
            ret = None
            cursor.execute(sql)
            if cursor.rowcount != 1:
                logging.error("Multiple rows returned in mysql lock function.")
                ret = None
            else:
                ret = cursor.fetchone()
            cursor.close()
            return ret
        except Exception, ex:
            logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex))
            cursor.close()
            return None

    def lock(self, lockstr, timeout):
        sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout)
        ret = self._execute(sql)

        if ret[0] == 0:
            logging.debug("Another client has previously locked '%s'.", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock '%s' was obtained successfully.", lockstr)
            return True
        else:
            logging.error("Error occurred!")
            return None

    def unlock(self, lockstr):
        sql = "SELECT RELEASE_LOCK('%s')" % (lockstr)
        ret = self._execute(sql)
        if ret[0] == 0:
            logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock '%s' the lock was released.", lockstr)
            return True
        else:
            logging.error("The lock '%s' did not exist.", lockstr)
            return None

#Init logging
def init_logging():
    sh = logging.StreamHandler()
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel()))

def main():
    init_logging()
    db = MySQLdb.connect(host='localhost', user='root', passwd='')
    lock_name = 'queue'

    l = Glock(db)

    ret = l.lock(lock_name, 10)
    if ret != True:
        logging.error("Can't get lock! exit!")
        quit()
    time.sleep(10)
    logging.info("You can do some synchronization work across processes!")
    ##TODO
    ## you can do something in here ##
    l.unlock(lock_name)

if __name__ == "__main__":
    main()

在main函数里, l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。

 

 

在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的:

   3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

 

这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。

 

测试的时候,启动两个glock.py, 结果如下:

[@tj-10-47 test]# ./glock.py 
2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.

可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。

[@tj-10-47 test]# ./glock.py
2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
[@tj-10-47 test]#

 

 

 

1
2
分享到:
评论
3 楼 gfdice 2014-03-17  
xiaoyu966 写道
gfdice 写道
效率会不会太低


这个看应用场景了,如果你要用于很大并发的分布式系统,且队列只要求满足生成消费的功能,可以采用专业的MQ了,比如rabbitMQ,activeMQ等。但是这些MQ都有自己的特点,比如rabbitMQ我记得是不支持完全优先级队列的。且他们都适合消息刚进来就被消费掉的场景。

我做这个队列是为了满足如下需求:
1. 所有的任务都是长时间的任务,至少要几分钟才能完成。
2. 涉及到了状态机,任务运行中有好几种状态(不只是出入队列的状态),且有相应的处理函数。
3. 消费者端是单进程单线程的,一次只消费一个任务,消费之后修改任务状态,交给后端分布式处理系统处理,然后继续消费新消息。(启动多个消费者进程负载均衡,中间通过以上实现的锁来同步队列)
4. 队列中未出队列的任务,要求随时可以调整优先级,并且可以看到是第几个运行。

这个mysql的锁,我看了很多国外的一些相关讨论,性能应该还可以,但是对于非常庞大的并发可能会有问题。但是我相信这个都可以解决,只要找到造成问题的根源,都可以解决。


http://optimmysql.blogspot.com/2007/11/getlock-family.html
http://www.xaprb.com/blog/2006/07/26/how-to-coordinate-distributed-work-with-mysqls-get_lock/
http://techblog.procurios.nl/k/n618/news/view/41405/14863/mysql-get_lock()-explained.html


ok 感谢回复
2 楼 xiaoyu966 2014-03-17  
gfdice 写道
效率会不会太低


这个看应用场景了,如果你要用于很大并发的分布式系统,且队列只要求满足生成消费的功能,可以采用专业的MQ了,比如rabbitMQ,activeMQ等。但是这些MQ都有自己的特点,比如rabbitMQ我记得是不支持完全优先级队列的。且他们都适合消息刚进来就被消费掉的场景。

我做这个队列是为了满足如下需求:
1. 所有的任务都是长时间的任务,至少要几分钟才能完成。
2. 涉及到了状态机,任务运行中有好几种状态(不只是出入队列的状态),且有相应的处理函数。
3. 消费者端是单进程单线程的,一次只消费一个任务,消费之后修改任务状态,交给后端分布式处理系统处理,然后继续消费新消息。(启动多个消费者进程负载均衡,中间通过以上实现的锁来同步队列)
4. 队列中未出队列的任务,要求随时可以调整优先级,并且可以看到是第几个运行。

这个mysql的锁,我看了很多国外的一些相关讨论,性能应该还可以,但是对于非常庞大的并发可能会有问题。但是我相信这个都可以解决,只要找到造成问题的根源,都可以解决。


http://optimmysql.blogspot.com/2007/11/getlock-family.html
http://www.xaprb.com/blog/2006/07/26/how-to-coordinate-distributed-work-with-mysqls-get_lock/
http://techblog.procurios.nl/k/n618/news/view/41405/14863/mysql-get_lock()-explained.html
1 楼 gfdice 2014-03-15  
效率会不会太低

相关推荐

    python基于mysql实现的简单队列以及跨进程锁实例详解

    在本文中,我们将探讨如何利用MySQL数据库来实现一个简单的队列系统以及跨进程锁,以解决多进程之间的资源同步问题。 首先,我们要建立一个基于MySQL的简单任务队列。队列在多进程环境中的作用是存储待处理的任务,...

    python 工程师技能图谱

    熟悉进程间通信(如队列、管道、共享内存和锁),并了解如何处理并发环境下的问题,如GIL锁对多线程性能的影响。 9. **Web服务与API**:了解RPC原理,特别是gRPC实战,以及使用RabbitMQ实现消息队列。熟悉分布式...

    python-study是本人在学习python的过程中的一些示例代码。从基础的入门,到常用的mysql操作、多线程.zip

    4. **GIL(全局解释器锁)**:理解Python的全局解释器锁对多线程的影响,以及如何通过其他并发模型(如进程)来克服。 5. **线程间的通信**:使用队列(Queue)进行线程间的异步数据交换。 综上所述,"python-...

    Python高级-全部(html版).rar

    在Python中,多进程是通过`multiprocessing`模块实现的。多进程允许程序同时执行多个独立的计算任务,从而提高计算效率,尤其适用于CPU密集型任务。你可以创建子进程,进行进程间通信,例如使用队列或管道进行数据...

    python3.5全栈工程师零基础到项目实战全套

    - **进程同步**:使用锁、队列等工具来同步进程之间的数据交换。 #### 第二阶段:Python3.5 WEB开发篇 ##### 11.Python3.5的消息机制 - **消息队列**:使用消息队列实现异步消息传递。 - **消息中间件**:如...

    Python高级-全部(html版).zip

    理解进程的概念、进程间通信(如队列、管道、信号量)以及如何创建和管理进程是进阶Python开发者的必备技能。 3. **线程**: Python的`threading`模块支持线程,线程是轻量级的并发执行单元,用于在单个进程中执行...

    完整版 Python高级开发课程 高级教程 10 Python Web开发框架Django实战.pptx

    这部分内容会探讨Python中的并发编程,包括线程(threading模块)和进程(multiprocessing模块)的使用,以及GIL(全局解释器锁)对多线程的影响。此外,可能还会涉及到异步I/O和协程(asyncio库)的概念。 ### ...

    python核心编程第三版各章节代码

    - 使用Python操作MySQL:使用pymysql或mysql-connector-python库连接MySQL。 8. **并发编程**: - 进程与线程:理解GIL(全局解释器锁)和多线程、多进程的区别。 - 并发工具:如threading模块的使用,事件...

    python高级编程(第2版) Michal Jaworski

    10. **高级数据结构**:除了基本的列表、字典和集合,Python还提供了`deque`(双端队列)、`heapq`(堆队列)等高级数据结构,以及`bisect`模块用于排序。 11. **模块和包管理**:Python的模块化设计使得代码组织和...

    《零基础:21天搞定Python分布爬虫》课件

    9. **多线程与异步IO**:理解Python的GIL(全局解释器锁)和多线程、多进程的使用,以及如何利用异步库如asyncio提高爬虫效率。 最后,进入分布式爬虫的专题: 10. **分布式爬虫原理**:介绍分布式爬虫的概念,...

    MySQL报警脚本

    系统负载通常由平均负载(load average)来衡量,这是一个反映在特定时间间隔内运行队列中的进程数的统计值。当平均负载超过预设阈值时,脚本会判断系统过载。同时,会话(session)数也是一个重要的指标,过多的...

    完整版 Python高级开发课程 高级教程 01 Python语言开发要点详解.pptx

    8. **Python多线程 多进程开发**: 介绍Python的并发编程,如threading模块的使用,线程同步机制(锁、信号量、事件),以及multiprocessing模块进行多进程编程,探讨GIL(全局解释器锁)的影响和解决策略。...

    完整版 Python高级开发课程 高级教程 05 Python数据采集 网络爬虫 网页爬虫.pptx

    - GIL(全局解释器锁):探讨GIL对Python多线程的影响,以及如何通过多进程规避。 - 并发工具:使用`threading`和`multiprocessing`库实现多线程、多进程。 9. **Python爬虫框架Scrapy实战**: - Scrapy架构:...

    Python高级软件开发技术(数据结构,Linux,网络并发编程,RE模块,MySQL).zip

    Python提供了线程、进程、异步I/O(如asyncio库)等机制来实现并发。理解同步与异步、锁、信号量等概念,并掌握如何避免和解决并发中的问题,如死锁,是这一部分的重点。 4. **RE模块**: Python的re模块提供了...

    多进程爬取在线课程并存入MySQL数据库.rar

    Python的`multiprocessing`模块提供了队列(Queue)、管道(Pipe)等工具,用于进程间交换数据。在这个项目中,可能使用队列来传递待爬取的URL或存储爬取结果。 9. **错误处理与日志记录**:为了确保爬虫的稳定运行...

    python入门到高级全栈工程师培训 第3期 附课件代码

    python入门到高级全栈工程师培训视频学习资料;本资料仅用于学习,请查看后24小时之内删除。 【课程内容】 第1章 01 计算机发展史 02 计算机系统 03 小结 04 数据的概念 05 进制转换 06 原码补码反码 07 物理层和...

    最新Python3.5零基础+高级+完整项目(28周全)培训视频学习资料

    进程锁与进程池详解 协程 协程Gevent 协程之爬虫 协程之Socket IO多路复用 IO模式 Select解析Socket通信 作业 第11周 鸡汤 消息队列介绍 RabbitMQ基本示例 RabbitMQ消息分发轮询 RabbitMQ消息持久化 RabbitMQ ...

    Python Cookbook

    3.14 用Python实现的简单加法器 133 3.15 检查信用卡校验和 136 3.16 查看汇率 137 第4章 Python技巧 139 引言 139 4.1 对象拷贝 140 4.2 通过列表推导构建列表 144 4.3 若列表中某元素存在则返回之 146 ...

Global site tag (gtag.js) - Google Analytics