`
xiaoyu966
  • 浏览: 258171 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

python基于mysql实现的简单队列以及跨进程锁

阅读更多

============================================================================

原创作品,允许转载。转载时请务必以超链接形式标明原始出处、以及本声明。

请注明转自:http://yunjianfei.iteye.com/blog/

============================================================================

 

在我们做多进程应用开发的过程中,难免会遇到多个进程访问同一个资源(临界资源)的状况,必须通过加一个全局性的锁,来实现资源的同步访问(同一时间只能有一个进程访问资源)。

 

举个例子:

假设我们用mysql来实现一个任务队列,实现的过程如下:

1. 在Mysql中创建Job表,用于储存队列任务,如下:

create table jobs(
    id auto_increment not null primary key,
    message text not null,
    job_status not null default 0
);

message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列 

 

2. 有一个生产者进程,往job表中放新的数据,进行排队

insert into jobs(message) values('msg1');

 

3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

 

4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。

 

这里我贴出非常好的一篇文章,大家可以参照一下:

https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you

 

=========================华丽的分割线=======================================

 

说道跨进程的锁实现,我们主要有几种实现方式:

1. 信号量

2. 文件锁fcntl

3. socket(端口号绑定)

4. signal

这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。

 

查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈,链接如下:

http://dev.mysql.com/doc/refman/5.0/fr/miscellaneous-functions.html

 

我用python实现了一个demo,如下:

 

文件名:glock.py

#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
#   Author  :   yunjianfei
#   E-mail  :   yunjianfei@126.com
#   Date    :   2014/02/25
#   Desc    :
#

import logging, time
import MySQLdb


class Glock:
    def __init__(self, db):
        self.db = db

    def _execute(self, sql):
        cursor = self.db.cursor()
        try:
            ret = None
            cursor.execute(sql)
            if cursor.rowcount != 1:
                logging.error("Multiple rows returned in mysql lock function.")
                ret = None
            else:
                ret = cursor.fetchone()
            cursor.close()
            return ret
        except Exception, ex:
            logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex))
            cursor.close()
            return None

    def lock(self, lockstr, timeout):
        sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout)
        ret = self._execute(sql)

        if ret[0] == 0:
            logging.debug("Another client has previously locked '%s'.", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock '%s' was obtained successfully.", lockstr)
            return True
        else:
            logging.error("Error occurred!")
            return None

    def unlock(self, lockstr):
        sql = "SELECT RELEASE_LOCK('%s')" % (lockstr)
        ret = self._execute(sql)
        if ret[0] == 0:
            logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock '%s' the lock was released.", lockstr)
            return True
        else:
            logging.error("The lock '%s' did not exist.", lockstr)
            return None

#Init logging
def init_logging():
    sh = logging.StreamHandler()
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel()))

def main():
    init_logging()
    db = MySQLdb.connect(host='localhost', user='root', passwd='')
    lock_name = 'queue'

    l = Glock(db)

    ret = l.lock(lock_name, 10)
    if ret != True:
        logging.error("Can't get lock! exit!")
        quit()
    time.sleep(10)
    logging.info("You can do some synchronization work across processes!")
    ##TODO
    ## you can do something in here ##
    l.unlock(lock_name)

if __name__ == "__main__":
    main()

在main函数里, l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。

 

 

在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的:

   3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

 

这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。

 

测试的时候,启动两个glock.py, 结果如下:

[@tj-10-47 test]# ./glock.py 
2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.

可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。

[@tj-10-47 test]# ./glock.py
2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
[@tj-10-47 test]#

 

 

 

1
2
分享到:
评论
3 楼 gfdice 2014-03-17  
xiaoyu966 写道
gfdice 写道
效率会不会太低


这个看应用场景了,如果你要用于很大并发的分布式系统,且队列只要求满足生成消费的功能,可以采用专业的MQ了,比如rabbitMQ,activeMQ等。但是这些MQ都有自己的特点,比如rabbitMQ我记得是不支持完全优先级队列的。且他们都适合消息刚进来就被消费掉的场景。

我做这个队列是为了满足如下需求:
1. 所有的任务都是长时间的任务,至少要几分钟才能完成。
2. 涉及到了状态机,任务运行中有好几种状态(不只是出入队列的状态),且有相应的处理函数。
3. 消费者端是单进程单线程的,一次只消费一个任务,消费之后修改任务状态,交给后端分布式处理系统处理,然后继续消费新消息。(启动多个消费者进程负载均衡,中间通过以上实现的锁来同步队列)
4. 队列中未出队列的任务,要求随时可以调整优先级,并且可以看到是第几个运行。

这个mysql的锁,我看了很多国外的一些相关讨论,性能应该还可以,但是对于非常庞大的并发可能会有问题。但是我相信这个都可以解决,只要找到造成问题的根源,都可以解决。


http://optimmysql.blogspot.com/2007/11/getlock-family.html
http://www.xaprb.com/blog/2006/07/26/how-to-coordinate-distributed-work-with-mysqls-get_lock/
http://techblog.procurios.nl/k/n618/news/view/41405/14863/mysql-get_lock()-explained.html


ok 感谢回复
2 楼 xiaoyu966 2014-03-17  
gfdice 写道
效率会不会太低


这个看应用场景了,如果你要用于很大并发的分布式系统,且队列只要求满足生成消费的功能,可以采用专业的MQ了,比如rabbitMQ,activeMQ等。但是这些MQ都有自己的特点,比如rabbitMQ我记得是不支持完全优先级队列的。且他们都适合消息刚进来就被消费掉的场景。

我做这个队列是为了满足如下需求:
1. 所有的任务都是长时间的任务,至少要几分钟才能完成。
2. 涉及到了状态机,任务运行中有好几种状态(不只是出入队列的状态),且有相应的处理函数。
3. 消费者端是单进程单线程的,一次只消费一个任务,消费之后修改任务状态,交给后端分布式处理系统处理,然后继续消费新消息。(启动多个消费者进程负载均衡,中间通过以上实现的锁来同步队列)
4. 队列中未出队列的任务,要求随时可以调整优先级,并且可以看到是第几个运行。

这个mysql的锁,我看了很多国外的一些相关讨论,性能应该还可以,但是对于非常庞大的并发可能会有问题。但是我相信这个都可以解决,只要找到造成问题的根源,都可以解决。


http://optimmysql.blogspot.com/2007/11/getlock-family.html
http://www.xaprb.com/blog/2006/07/26/how-to-coordinate-distributed-work-with-mysqls-get_lock/
http://techblog.procurios.nl/k/n618/news/view/41405/14863/mysql-get_lock()-explained.html
1 楼 gfdice 2014-03-15  
效率会不会太低

相关推荐

Global site tag (gtag.js) - Google Analytics