浏览 4798 次
锁定老帖子 主题:MongoDB 内存解析 python
精华帖 (0) :: 良好帖 (1) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-10-15
#coding=utf-8
# ---------------------------------------------------------------------------
# Article text (translated from the original Chinese post)
#
# While developing with MongoDB we saw extremely high memory usage: on a
# machine with 64 GB of RAM, 99% of the memory was in use. After some reading
# and investigation I understood how MongoDB's memory handling works, and I
# am sharing the analysis here.
#
# MongoDB uses mmap to map its data files into memory.
#
# [image placeholder in the original post]
#
# I wrote a small Python program that imitates the memory side of MongoDB's
# implementation. The code below is for reference only.
#
# 1. Start the "mongodb" service:
#        python _mongo_.py
# 2. Connect to the service:
#        telnet 0.0.0.0 8900
# 3. Start debugging:
#        ps x|grep python      # find the process id of the service
#    Next, inspect the memory usage with vmmap [mac] / pmap [linux]:
#        vmmap -resident 2546|grep wiyun
#        mapped file 0000000101000000-000000010d801000 [200.0M 8K] rw-/rwx SM=PRV /Users/liuzheng/py.work.dir/wgit/wiyun0.db
#        mapped file 000000010d801000-000000011a002000 [200.0M 8K] rw-/rwx SM=PRV /Users/liuzheng/py.work.dir/wgit/wiyun1.db
#    Here you can see that the files start out at 200M while only 8K of
#    memory is in use.
#    Next, operate on the database:
#        telnet 0.0.0.0 8900
#        set a=saddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsadd
#        vmmap -resident 2546|grep wiyun
#        mapped file 0000000101000000-000000010d801000 [200.0M 64.1M] rw-/rwx SM=PRV /Users/liuzheng/py.work.dir/wgit/wiyun0.db
#        mapped file 000000010d801000-000000011a002000 [200.0M 8K] rw-/rwx SM=PRV /Users/liuzheng/py.work.dir/wgit/wiyun1.db
#    Here the resident (RSS) memory usage has grown to 64M.
# ---------------------------------------------------------------------------
#
# NOTE: this is a Python 2 script (it imports the ``Queue`` module) and uses
# the callback-style Tornado IOStream API (``read_until(delimiter, callback)``).

import os, sys
import errno
import functools
import socket
import time
import os
import traceback
import memcache
from tornado import ioloop, iostream
from threading import Thread
from Queue import Queue, Empty
import logging
import mmap
import random

# str(stream) -> (stream, count); registry of the currently open streams.
streamDict = {}
# str(stream) -> Queue of raw request lines waiting to be processed.
streamRequests = {}
# The Tornado IOLoop serving the listening socket; assigned in __main__.
io_loop = None
# The MMapping instance backing the key/value store; assigned in __main__.
mmap_obj = None


def printLog(log):
    """Print *log* prefixed with the current local time."""
    tstr = time.strftime("%Y-%m-%d %X", time.localtime())
    print("%s %s" % (tstr, log))


def stream_add(stream, count):
    """Register *stream* and create its request queue.

    The registry key is ``str(stream)``. *count* is stored next to the
    stream; the only caller in this file passes 0.
    """
    streamkey = str(stream)
    streamDict[streamkey] = (stream, count)
    streamRequests[streamkey] = Queue()


def stream_remove(streamkey, stream):
    """Close *stream* and drop it from both registries.

    Safe to call for a key that has already been removed.
    """
    print("stream closed %s" % (stream.__dict__,))
    stream.close()
    streamDict.pop(streamkey, None)
    streamRequests.pop(streamkey, None)


class MMapping(object):
    """Toy key/value store that appends records to memory-mapped files.

    Every ``set`` appends ``key=value`` to a mapped data file and records the
    value in an in-memory dict; ``get`` (any non-``set`` command) is answered
    from that dict only.
    """

    # Offset of the single padding byte written to each data file, so every
    # file (and its mapping) is DATA_FILE_SIZE + 1 bytes long.
    DATA_FILE_SIZE = 1024 * 1024 * 200
    # Number of "<db><i>.db" data files created and mapped.
    DATA_FILE_COUNT = 2

    def __init__(self, data_path, db):
        """Create and map the data files for database *db* under *data_path*."""
        # Per-instance state; these used to be mutable class attributes and
        # were therefore shared between all instances.
        self.data_map = []
        self._hash = {}
        self.read_ns_file(data_path, db)

    def read_ns_file(self, data_path, db):
        """Create the ``.ns`` file and the mapped data files.

        All files are opened with ``"w+b"``, so existing content is
        truncated on every start.
        """
        # The namespace file is only created (or truncated); nothing is
        # written to it.
        with open(os.path.join(data_path, db + ".ns"), "w+b"):
            pass

        for i in range(self.DATA_FILE_COUNT):
            db_name = "%s%s.db" % (db, i)
            with open(os.path.join(data_path, db_name), "w+b") as f:
                f.write(db_name)
                # Grow the file by writing one byte at the target offset.
                f.seek(self.DATA_FILE_SIZE)
                f.write("\0")
                f.flush()
                # NOTE(review): the mapping's cursor starts at offset 0, so
                # the first record overwrites the db_name header written
                # above -- confirm whether that is intended.
                # The mapping stays usable after the file object is closed.
                self.data_map.append(mmap.mmap(f.fileno(), 0))

    def parse_data(self, data):
        """Split one request line into ``(command, key, value)``.

        ``"set k=v"`` yields ``("set", "k", "v")``; any other command yields
        ``(command, key, None)``. Surrounding whitespace, including the
        trailing line delimiter, is stripped. Only the first space and the
        first ``=`` are treated as separators, so values may contain both.

        Raises:
            ValueError: if a ``set`` request has no ``=``.
        """
        command, _, values = data.strip().partition(" ")
        if command == "set":
            key, sep, value = values.partition("=")
            if not sep:
                raise ValueError("set expects key=value")
            return command, key, value
        return command, values.strip(), None

    def flush_data(self, data):
        """Append *data* to the first mapped file that has room for it.

        Raises:
            ValueError: if no mapped file has enough space left.
        """
        for region in self.data_map:
            if len(region) - region.tell() >= len(data):
                region.write(data)
                return
        raise ValueError("data files are full")

    def get_data(self, data):
        """Execute one request line and return the reply text.

        ``set`` stores the pair and returns ``"True"``; any other command
        returns the stored value for the key, or ``"no match"``.
        """
        command, key, value = self.parse_data(data)
        print("command,key,value %s %s %s" % (command, key, value))
        if command == "set":
            self.flush_data("%s=%s" % (key, value))
            self._hash[key] = value
            return "True"
        return self._hash.get(key, "no match")


def _write_reply(stream, payload):
    """Write *payload* to *stream* unless the stream is already closed."""
    if stream.closed():
        return
    try:
        stream.write(payload)
    except IOError as e:
        logging.error(e)


class ClockResponse(Thread):
    """Worker thread that polls the request queues and answers requests."""

    def __init__(self):
        Thread.__init__(self)
        self.flag = True   # cleared by stop() to end the polling loop
        self.count = 0     # pass counter, wraps at 10000

    def run(self):
        """Poll every stream's queue until stop() is called."""
        while self.flag:
            # Log the number of connections once every 10000 passes.
            ct = self.count % 10000
            if ct == 0:
                print('now connections:%s' % (len(streamDict)))
            self.count = ct + 1

            # Iterate over a snapshot: the IOLoop thread adds and removes
            # entries concurrently.
            for streamkey, (stream, num) in list(streamDict.items()):
                queue = streamRequests.get(streamkey)
                if queue is None:
                    continue
                try:
                    data = queue.get(False)
                except Empty:
                    continue
                if not data:
                    continue
                print("get data %s" % data)
                try:
                    reply = mmap_obj.get_data(data)
                except Exception as e:
                    # Report the failure to the client instead of killing
                    # the worker thread.
                    logging.error(e)
                    reply = str(e)
                # Stream methods must run on the IOLoop thread;
                # add_callback hands the write over to it.
                io_loop.add_callback(
                    functools.partial(_write_reply, stream, reply + "\r\n"))
            time.sleep(0.01)

    def stop(self):
        """Ask the polling loop to finish after its current pass."""
        self.flag = False


class SocketRequest(object):
    """Reads delimiter-terminated requests from one stream and queues them."""

    delimiter = "\r\n"
    # Requests are dropped while more than this many are already queued.
    max_pending = 10000

    def __init__(self, stream, address):
        self.stream = stream
        self.streamkey = str(stream)
        self.address = address
        self.stream.read_until(SocketRequest.delimiter, self.on_body)

    def on_body(self, data):
        """Queue one received request line and wait for the next one."""
        queue = streamRequests.get(self.streamkey)
        if queue is None:
            # The stream was removed while this callback was pending.
            return
        if queue.qsize() <= self.max_pending:
            print("put in queue %s " % data)
            queue.put(data)
        else:
            logging.warning("request queue full, dropping request from %s",
                            self.address)
        try:
            self.stream.read_until(SocketRequest.delimiter, self.on_body)
        except Exception as e:
            print("in read %s" % e)
            stream_remove(self.streamkey, self.stream)


def connection_ready(sock, fd, events):
    """IOLoop handler: accept every pending connection on *sock*."""
    while True:
        try:
            connection, address = sock.accept()
        except socket.error as e:
            # EWOULDBLOCK/EAGAIN just means nothing is left to accept.
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
        print("connection on: %s" % (address,))
        # non-blocking
        connection.setblocking(0)
        stream = iostream.IOStream(connection, io_loop)
        stream_add(stream, 0)
        skey = str(stream)
        scallback = functools.partial(stream_remove, skey, stream)
        stream.set_close_callback(scallback)
        SocketRequest(stream, address)


def run_thread_worker(n=1):
    """Start *n* ClockResponse workers and return them so callers can stop them."""
    threads = [ClockResponse() for _ in range(n)]
    for t in threads:
        t.start()
    return threads


if __name__ == '__main__':
    server_port = 8900
    # init data: the data files live next to this script
    data_path = os.path.abspath(os.path.dirname(__file__))
    mmap_obj = MMapping(data_path, "wiyun")

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                         socket.IPPROTO_TCP)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(0)
    sock.bind(("", server_port))
    sock.listen(9999)

    io_loop = ioloop.IOLoop.instance()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)

    # Start the workers only once the loop and the socket exist, so a failed
    # bind cannot leave a non-daemon thread keeping the process alive.
    workers = run_thread_worker(1)
    exit_code = 0
    try:
        io_loop.start()
    except KeyboardInterrupt:
        io_loop.stop()
        print("exited cleanly")
    except Exception as e:
        logging.exception(e)
        exit_code = 1
    finally:
        for worker in workers:
            worker.stop()
    sys.exit(exit_code)

# Notice (translated from the original post): the copyright of ITeye articles
# belongs to their authors and is protected by law. Reproduction is not
# permitted without the author's written permission.
推荐链接
|
|
返回顶楼 | |
发表时间:2011-10-20
不错啊!很详细!
|
|
返回顶楼 | |
发表时间:2011-10-28
这孩子人不错啊,例子详细!
|
|
返回顶楼 | |