`
san_yun
  • 浏览: 2652283 次
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

python-memcached的线程安全问题

 
阅读更多

答案是肯定的,前提你在使用Python 2.4+和python-memcached 1.36+
为什么我们需要线程安全的memcached client,因为我们的实际应用一般是多线程的模型,例如cherrypy、twisted,如果python-memcached不是线程安全的话, 引起的问题不仅仅是并发修改共享变量这么简单,是外部socket链接的数据流的混乱
python-memcached怎么实现线程安全的呢?查看源代码看到

try:
    # Only exists in Python 2.4+
    from threading import local
except ImportError:
    # TODO:  add the pure-python local implementation
    class local(object):
        pass

 class Client(local): 很取巧的让Client类继承threading.local,也就是Client里面的每一个属性都是跟当前线程绑定的。实现虽然不太优雅,但是很实在
但是别以为这样就可以随便在线程里面用python-memcached了,因为这种thread local的做法,你的应用必须要使用thread pool的模式,而不能不停创建销毁thread,因为每一个新线程的创建,对于就会使用一个全新的Client,也就是一个全新的socket链接,如 果不停打开创建销毁thread的话,就会导致不停的创建销毁socket链接,导致性能大量下降。幸好,无论是cherrypy还是twisted,都 是使用了thread pool的模式。

 

但是不幸的是gevent不是thread pool的模式,这导致不停的创建销毁socket链接。

gevent
# ThreadID: 47316030715200
=============================
File: "/data1/duitang/dist/app/test/7199/duitang/common/templatetags/myTags.py", line 525, in statichtml
return StaticHTML_Tag(static_name)
File: "/data1/duitang/dist/app/test/7199/duitang/common/templatetags/myTags.py", line 507, in __init__
sh = StaticHTML.objects.get(name=name)
File: "/data1/duitang/dist/app/test/7199/duitang/statichtml/manager.py", line 18, in get
model = key and cache.get(key)
File: "/data1/duitang/dist/app/test/7199/duitang/perf/memcached.py", line 25, in get
rp = super(MemcachedCache, self).get(key, deault, version)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/cache/backends/memcached.py", line 58, in get
val = self._cache.get(key)
File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 180, in __getattribute__
_init_locals(self)
File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 167, in _init_locals
cls.__init__(self, *args, **kw)
File: "/data1/duitang/dist/app/test/7199/duitang/memcache.py", line 168, in __init__
print cf1.stacktraces()
File: "/data1/duitang/dist/app/test/7199/duitang/cf1.py", line 8, in stacktraces
for filename, lineno, name, line in traceback.extract_stack(stack):

========================================================
# ThreadID: 47316030715200
File: "build/bdist.linux-x86_64/egg/gevent/greenlet.py", line 390, in run
  result = self._run(*self.args, **self.kwargs)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/async.py", line 44, in handle
  self.handle_request(req, client, addr)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/ggevent.py", line 88, in handle_request
  super(GeventWorker, self).handle_request(*args)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/async.py", line 78, in handle_request
  respiter = self.wsgi(environ, resp.start_response)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/handlers/wsgi.py", line 273, in __call__
  response = self.get_response(request)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response
  response = callback(request, *callback_args, **callback_kwargs)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/utils/decorators.py", line 93, in _wrapped_view
  response = view_func(request, *args, **kwargs)
File: "/data1/duitang/dist/app/test/7199/duitang/people/views_people.py", line 45, in peopleIndex
  people = AuthUser.objects.get(id=UserProfile.get_real_id(user_id))
File: "/data1/duitang/dist/app/test/7199/duitang/common/user.py", line 19, in get
  model = key and cache.get(key)
File: "/data1/duitang/dist/app/test/7199/duitang/perf/memcached.py", line 25, in get
  rp = super(MemcachedCache, self).get(key, deault, version)
File: "/duitang/dist/sys/python/lib/python2.7/site-packages/django/core/cache/backends/memcached.py", line 58, in get
  val = self._cache.get(key)
File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 180, in __getattribute__
  _init_locals(self)
File: "build/bdist.linux-x86_64/egg/gevent/local.py", line 167, in _init_locals
  cls.__init__(self, *args, **kw)
File: "/data1/duitang/dist/app/test/7199/duitang/memcache.py", line 168, in __init__
  print cf1.stacktraces()
File: "/data1/duitang/dist/app/test/7199/duitang/cf1.py", line 8, in stacktraces
  for filename, lineno, name, line in traceback.extract_stack(stack):
   

可以发现threading local最后委托给gevent/local.py实现,导致重复初始化Client。

 

而uwsgi只有初始化时才调用,后续刷新页面不再触发Client.init()

 

google了一下,老外的解决方案: https://groups.google.com/forum/?fromgroups=#!topic/gevent/ULKUPvbaQ7I

 

 

 

 

python memcache客户端比较

对于memcached,redis,hessian,mongo等各种客户端都要考虑一个问题,如何高效安全地和server进行通信?有几种策略:

1. 每次调用都建立和关闭连接
优点:实现简单,不存在线程安全问题。django的BaseMemcachedCache实现就是每次执行完都会close掉连接。
缺点:出现大量TIME_WAIT,花费在建立网络连接上的开销比较大,在有些情况下会导致性能瓶颈。

2.单一socket长线程
优点:不会出现大量TIME_WAIT,不会频繁开关socket,减轻服务器的压力。
缺点:远程调用一般是请求/问答式,为了保证线程安全必须加同步锁,如果远程服务器响应慢的话会导致大量线程竞争同一个socket资源,socket成为瓶颈。

3.单一socket长线程+连接池
为了解决方案1和方案2的问题,我们采用方案3:保持长连接不变,但采用连接池来保存长连接,这样系统内有一批socket供程序调用,避免大量线程竞争同一个socket。

那么python memcached client采用那种形式呢?

首先python memcached client有好几个,在memcached的官方网站提供了client列表(http://code.google.com/p/memcached/wiki/Clients):
a.libmemcached: 最受欢迎的memcached的C语言版本的客户端的,高性能,线程安全。很多语言都有对这个版本的wrapper。
b.pylibmc:是对libmemcached的wrapper。 http://sendapatch.se/projects/pylibmc/
c.python-memcached: 是100%纯python的版本,也是我们现在正在使用的版本。     http://www.tummy.com/Community/software/python-memcached/
d.Python libmemcached: 豆瓣提供的,对libmemcached的wrapper版本。http://code.google.com/p/python-libmemcached/
e.django cahce: django对memcached的支持,其实django只是提供了一个统一的cache接口,并没有具体实现,相关实现类在django.core.cache.backends.memcached.BaseMemcachedCache ,具体参考:https://docs.djangoproject.com/en/dev/topics/cache/ 不过django的问题很多。

python-memcached
python-memcached不会在每次get/set操作完成之后主动关闭连接,他是一种长连接,但他如果保证线程安全呢?一般我们是这样使用它:
import memcache
mc = memcache.Client(['127.0.0.1:11211'], debug=0)
答案在于他很取巧的让Client类继承threading.local,也就是Client里面的每一个属性都是跟当前线程绑定。这样每个线程都只会看到本地的Client,变相的实现了连接池。
这个池的大小取决于系统有多少个线程。为了验证这个说法,我们写一个程序来测试。
[code]
import memcache
import threading
mc = memcache.Client(['127.0.0.1:11211'], debug=1)

class TestThread(threading.Thread): 
    def __init__(self,redis_cache): 
        threading.Thread.__init__(self) 
        self.redis_cache = redis_cache 
 
    def run(self): 
        while True: 
            obj = self.redis_cache.get("uid:1002")
            if not obj:
                self.redis_cache.set("uid:1002","test",1)
            print obj

for i in xrange(8): 
    t = TestThread(mc) 
    t.start()   
[/code]

执行这段程序,然后通过netstat -an查看,刚好有一个8个ESTABLISHED,每个进程一个socket。
python的locale和java的ThreadLocl是一个意思,把值设置到线程中,每个线程只能看到自己线程保存的值,这里面的实现是这样的,由于Client继承于local,当访问Cient的任何一个方法或者属性都会进入到__getattribute__(),其内部实现是获取到当前线程并绑定 相关代码如下:

def _patch(self):
    key = object.__getattribute__(self, '_local__key')
    d = current_thread().__dict__.get(key)
    if d is None:
        d = {}
        current_thread().__dict__[key] = d
        object.__setattr__(self, '__dict__', d)

        # we have a new instance dict, so call out __init__ if we have
        # one
        cls = type(self)
        if cls.__init__ is not object.__init__:
            args, kw = object.__getattribute__(self, '_local__args')
            cls.__init__(self, *args, **kw)
    else:
        object.__setattr__(self, '__dict__', d)

 

cls是当前对象的class,object是基类class,我打印了清单:

new :  <class 'duitang.memcache.Client'> | <type 'object'>
--------------------
cached: <class 'gevent.local.local'> | <slot wrapper '__init__' of 'object' objects> | <slot wrapper '__init__' of 'object' objects>


yunpeng@yunpeng-duitang:~/test2$ netstat -an | grep 11211
tcp        0      0 0.0.0.0:11211           0.0.0.0:*               LISTEN    
tcp       31      0 127.0.0.1:46835         127.0.0.1:11211         ESTABLISHED
tcp        0      0 127.0.0.1:46832         127.0.0.1:11211         ESTABLISHED
tcp        0      0 127.0.0.1:46831         127.0.0.1:11211         ESTABLISHED
tcp        0      0 127.0.0.1:46829         127.0.0.1:11211         ESTABLISHED
tcp        0     14 127.0.0.1:46833         127.0.0.1:11211         ESTABLISHED
tcp       31      0 127.0.0.1:46828         127.0.0.1:11211         ESTABLISHED
tcp        0      0 127.0.0.1:46834         127.0.0.1:11211         ESTABLISHED
tcp       31      0 127.0.0.1:46830         127.0.0.1:11211         ESTABLISHED

当python进程退出socket会被自动关闭。
yunpeng@yunpeng-duitang:/duitang/dist/app/trunk/duitang$ netstat -an | grep 11211
tcp        0      0 0.0.0.0:11211           0.0.0.0:*               LISTEN    
tcp        0      0 127.0.0.1:44900         127.0.0.1:11211         TIME_WAIT 


但是很明显,采用threading locale这种方式来保证线程安全存在一些缺陷:

1.要求web server采用thread pool的方式,如果thread每次执行完之后就结束了,这会导致不停的创建销毁socket链接。
2.要求使用python thread locale的语义,但不幸的是python的thread语义很容易被改变,gevent就可以直接把python的一个thread转换成greenlet。


gevent的monkey提供的patch方法

patch_all() 调用所有的monkey patch
   
patch_os() os.fork()替换成gevent.fork


patch_select(aggressive=False) `select.select`替换成`gevent.select.select`

patch_socket(dns=True, aggressive=True)  标准的socket object 替换成 gevent's cooperative sockets.

patch_thread(threading=True, _threading_local=True) thread` module 替换成 gevent's thread

patch_time()  把标准的`time.sleep` 替换成`gevent.sleep`.


gunicorn如何使用gevent?
代码:
/duitang/dist/sys/python/lib/python2.7/site-packages/gunicorn-0.14.6-py2.7.egg/gunicorn/workers/ggevent.py

# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

from __future__ import with_statement

import os
import sys
from datetime import datetime

# workaround on osx, disable kqueue
if sys.platform == "darwin":
    os.environ['EVENT_NOKQUEUE'] = "1"

try:
    import gevent
except ImportError:
    raise RuntimeError("You need gevent installed to use this worker.")
from gevent.pool import Pool
from gevent.server import StreamServer
from gevent import pywsgi

import gunicorn
from gunicorn.workers.async import AsyncWorker

VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)

BASE_WSGI_ENV = {
    'GATEWAY_INTERFACE': 'CGI/1.1',
    'SERVER_SOFTWARE': VERSION,
    'SCRIPT_NAME': '',
    'wsgi.version': (1, 0),
    'wsgi.multithread': False,
    'wsgi.multiprocess': False,
    'wsgi.run_once': False
}

class GeventWorker(AsyncWorker):

    server_class = None
    wsgi_handler = None

    @classmethod
    def setup(cls):
        from gevent import monkey
        monkey.noisy = False
        monkey.patch_all()

    def timeout_ctx(self):
        return gevent.Timeout(self.cfg.keepalive, False)

    def run(self):
        self.socket.setblocking(1)

        pool = Pool(self.worker_connections)
        if self.server_class is not None:
            server = self.server_class(
                self.socket, application=self.wsgi, spawn=pool, log=self.log,
                handler_class=self.wsgi_handler)
        else:
            server = StreamServer(self.socket, handle=self.handle, spawn=pool)

        server.start()
        pid = os.getpid()
        try:
            while self.alive:
                self.notify()

                if  pid == os.getpid() and self.ppid != os.getppid():
                    self.log.info("Parent changed, shutting down: %s", self)
                    break

                gevent.sleep(1.0)

        except KeyboardInterrupt:
            pass

        try:
            # Try to stop connections until timeout
            self.notify()
            server.stop(timeout=self.cfg.graceful_timeout)
        except:
            pass

    def handle_request(self, *args):
        try:
            super(GeventWorker, self).handle_request(*args)
        except gevent.GreenletExit:
            pass

        if gevent.version_info[0] == 0:

            def init_process(self):
                #gevent 0.13 and older doesn't reinitialize dns for us after forking
                #here's the workaround
                import gevent.core
                gevent.core.dns_shutdown(fail_requests=1)
                gevent.core.dns_init()
                super(GeventWorker, self).init_process()


class GeventResponse(object):

    status = None
    headers = None
    response_length = None


    def __init__(self, status, headers, clength):
        self.status = status
        self.headers = headers
        self.response_length = clength

class PyWSGIHandler(pywsgi.WSGIHandler):

    def log_request(self):
        start = datetime.fromtimestamp(self.time_start)
        finish = datetime.fromtimestamp(self.time_finish)
        response_time = finish - start
        resp = GeventResponse(self.status, self.response_headers,
                self.response_length)
        req_headers = [h.split(":", 1) for h in self.headers.headers]
        self.server.log.access(resp, req_headers, self.environ, response_time)

    def get_environ(self):
        env = super(PyWSGIHandler, self).get_environ()
        env['gunicorn.sock'] = self.socket
        env['RAW_URI'] = self.path
        return env

class PyWSGIServer(pywsgi.WSGIServer):
    base_env = BASE_WSGI_ENV

class GeventPyWSGIWorker(GeventWorker):
    "The Gevent StreamServer based workers."
    server_class = PyWSGIServer
    wsgi_handler = PyWSGIHandler
 

测试:ab -n100 http://7199.t.duitang.com:7199/cache/

[admin@server2 duitang]$ netstat -an| grep 11211 |wc -l
306


总结:使用了gevent之后thread local有太多不可控.
gevent代码:/duitang/dist/sys/python/lib/python2.7/site-packages/gevent
gevent+django:http://www.slideshare.net/mahendram/scaling-django-with-gevent

分享到:
评论

相关推荐

    python-memcached python-memcached

    4. **原子操作**:Python-memcached支持原子操作,如增加(incr)、减少(decr)整数值,这些操作在多线程环境下是安全的。 5. **前缀支持**:通过设置键的前缀,可以在同一个Memcached实例中隔离不同应用的数据,...

    PyPI 官网下载 | tencentcloud-sdk-python-memcached-3.0.315.tar.gz

    《PyPI上的腾讯云Python Memcached SDK:tencentcloud-sdk-python-memcached-3.0.315详解》 PyPI(Python Package Index)是Python开发者常用的软件包仓库,提供了一个广泛且丰富的Python库集合,供全球的开发人员...

    python3-memcached-master

    10. 性能优化:Python3-memcached-master利用多线程,允许并行执行多个操作,以提高性能。此外,它使用了有效的序列化和反序列化策略,如使用pickle模块,来转换Python对象。 总之,Python3-memcached-master是...

    python-binary-memcached:一个纯Python模块(线程安全),可通过其二进制协议访问具有SASL身份验证的memcached

    一个纯python模块(线程安全),可以通过具有SASL auth支持的二进制文件访问memcached。 该模块的主要目的是能够与使用二进制协议的memcached通信并支持身份验证,因此它可以与Heroku一起使用。 有关在阅读文档的...

    Python-强大的memcached客户端拥有shdict缓存层和许多其他功能

    shdict可能提供线程安全、快速的缓存操作,以优化对memcached的访问。 3. **Python编程**:熟悉Python语言基础,包括变量、数据类型、控制流、函数和类,这些都是编写客户端库的基础。 4. **异步编程**:由于...

    memcached-1.5.4

    - `memcached`的核心设计理念是简单而高效,提供了一个键值对存储的接口,支持多线程服务,使用libevent库处理网络事件。 - `1.5.4`版本是一个稳定版,修复了前一版本中的已知问题,提高了系统的稳定性和性能。 2...

    memcached客户端

    1. **Python** - `python-memcached` 和 `pylibmc`:Python社区提供了两个常用的Memcached客户端库。`python-memcached` 是一个轻量级的客户端,简单易用;而 `pylibmc` 增强了错误处理和更好的内存管理,支持多线程...

    Python-一个简单小巧可定制化轻量级的基于内存的Python缓存组件

    4. **并发与线程安全**:由于Python的全局解释器锁(GIL),多线程环境下的缓存设计需要考虑线程安全,确保在并发访问时数据的一致性和完整性。 5. **序列化与反序列化**:如果缓存的数据类型不是基本类型,可能...

    缓存应用--Memcached分布式缓存简介(二)

    - **封装示例**:以Python为例,使用python-memcached库可以非常方便地与Memcached服务器交互,包括设置键值对、获取值等基本操作。 以上内容详细介绍了Memcached的状态查看方法、存储机制以及内存资源的有效利用...

    memcached-1.2.1-win32.rar

    - 数据分布是基于一致性哈希,使得数据能够均匀分布在多个memcached服务器上,避免了热点问题。 4. **性能优势** - 使用内存存储,读写速度快,且支持多线程,能处理大量并发请求。 - 轻量级,对服务器资源需求...

    Python-csssdbpycssdbpy用Cython编写的SSDB客户端

    4. **并发处理**:通过多线程或者异步IO(如Python的asyncio库)实现并行操作,提升批量操作时的吞吐量。 **使用CSSDBPy** 在Python项目中使用`csssdbpy`,首先需要安装这个库,通常可以通过`pip`进行: ```bash ...

    memcached-1.5.12.tar.gz

    - memcached 使用 TCP 或 UDP 协议在客户端和服务器之间通信,支持多线程处理,具有低延迟和高吞吐量的特性。 - 由于其内存存储的特性,它不适合存储大量的持久化数据,但非常适合缓存短期、频繁访问的数据。 2. ...

    多线程精品资源--多线程与高并发.zip

    然而,多线程也带来了挑战,如线程安全问题。当多个线程共享同一资源时,可能会出现竞态条件、死锁或活锁等问题。为此,我们需要使用同步机制,如Java的`synchronized`关键字、互斥锁、信号量等,来控制对共享资源的...

    memcached资源demo(已调试通过)

    3. **并发处理**:Memcached是线程安全的,可以处理多线程并发请求。 4. **性能优化**:合理设置缓存大小、减少网络延迟、优化数据结构和编码方式等都可以提升性能。 通过本示例,你可以了解到如何在实际项目中集成...

    memcached连接demo

    使用`python-memcached`库,首先需要安装:`pip install python-memcached`。然后,你可以创建一个客户端实例并执行基本操作: ```python import memcache mc = memcache.Client(['127.0.0.1:11211'], debug=0) ...

    Python示例-从基础到高手PDF

    第 21 章 服务端 socket 开发之多线程和 gevent 框架并发测试[python 语言] 第 22 章 利用 pypy 提高 python 脚本的执行速度及测试性能 第 23 章 python 实现 select 和 epoll 模型 socket 网络编程 第 24 章 对 ...

    memcached-3.0.4.tgz

    Memcached 3.0.4是这个开源项目的最新迭代,它在性能、稳定性和安全性方面进行了优化。这个版本的发布,旨在进一步提升系统响应速度,增强系统健壮性,确保数据处理的高效与可靠。对于依赖于快速数据访问的大型Web...

    memcached-1.4.34.tar.gz

    7. **客户端连接**:应用可以通过各种语言(如 PHP、Python、Java 等)的客户端库与 memcached 通信,进行数据的存取。 memcached 支持的特性包括: - **TCP/IP 协议**:作为网络服务,它通过 TCP 连接与客户端通信...

    memcached-1.4.17

    5. **高性能**: Memcached设计简洁,无锁操作和多线程模型使其能处理大量并发请求。 6. **多语言支持**:提供了多种编程语言的客户端库,如PHP、Python、Java、Ruby、C++等,方便开发者集成到各种应用中。 **...

Global site tag (gtag.js) - Google Analytics