`

大量小文件的实时同步方案

阅读更多

传统的文件同步方案有rsync(单向) 和 unison(双向)等,它们需要扫描所有文件后进行比对,差量传输。如果文件数量达到了百万甚至千万量级,扫描所有文件将非常耗时。而且正在发生变化的往往是其中很少的一部分,这是非常低效的方式。

之前看了Amazon的Dynamo的设计文档,它们每个节点的数据是通过Hash Tree来实现同步,既有通过日志来同步的软实时特点(msyql, bdb等),也可以保证最终数据的一致性(rsync, unison等)。Hash Tree的大体思路是将所有数据存储成树状结构,每个节点的Hash是其所有子节点的Hash的Hash,叶子节点的Hash是其内容的Hash。这样一旦某个节点发生变化,其Hash的变化会迅速传播到根节点。需要同步的系统只需要不断查询跟节点的hash,一旦有变化,顺着树状结构就能够在logN级别的时间找到发生变化的内容,马上同步。

文件系统天然的是树状结构,尽管不是平衡的数。如果文件的修改时间是可靠的,可以表征文件的变化,那就可以用它作为文件的Hash值。另一方面,文件的修改通常是按顺序执行的,后修改的文件比早修改的文件具有更大的修改时间,这样就可以把一个目录内的最大修改时间作为它的修改时间,以实现Hash Tree。这样,一旦某个文件被修改,修改时间的信息就会迅速传播到根目录。

一般的文件系统都不是这样做的,目录的修改时间表示的是目录结构最后发生变化的时间,不包括子目录,否则会不堪重负。因为我们需要自己实现这个功能,利用Linux 2.6内核的新特性inotify获得某个目录内文件发生变化的信息,并把其修改时间传播到它的上级目录(以及再上级目录)。Python 有 pyinotify,watch.py的代码如下:

 

  1. #!/usr/bin/python   
  2.   
  3. from pyinotify import *   
  4. import osos.path   
  5.   
  6. flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW   
  7. dirs = {}   
  8. base = '/log/lighttpd/cache/images/icon/u241'   
  9. base = 'tmp'   
  10.   
  11. class UpdateParentDir(ProcessEvent):   
  12.     def process_IN_CLOSE_WRITE(self, event):   
  13.         print 'modify', event.pathname   
  14.         mtime = os.path.getmtime(event.pathname)   
  15.         p = event.path   
  16.         while p.startswith(base):   
  17.             m = os.path.getmtime(p)   
  18.             if m < mtime:   
  19.                 print 'update', p   
  20.                 os.utime(p, (mtime,mtime))   
  21.             elif m > mtime:   
  22.                 mtime = m   
  23.             p = os.path.dirname(p)   
  24.        
  25.     process_IN_MODIFY = process_IN_CLOSE_WRITE   
  26.   
  27.     def process_IN_Q_OVERFLOW(self, event):   
  28.         print 'over flow'   
  29.         max_queued_events.value *= 2   
  30.   
  31.     def process_default(self, event):   
  32.         pass  
  33.   
  34. wm = WatchManager()   
  35. notifier = Notifier(wm, UpdateParentDir())   
  36. dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))   
  37.   
  38. notifier.loop()   

<textarea class="Python" style="display: none;" cols="80" rows="40" name="code" readonly="65535">#!/usr/bin/python from pyinotify import * import os, os.path flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW dirs = {} base = '/log/lighttpd/cache/images/icon/u241' base = 'tmp' class UpdateParentDir(ProcessEvent): def process_IN_CLOSE_WRITE(self, event): print 'modify', event.pathname mtime = os.path.getmtime(event.pathname) p = event.path while p.startswith(base): m = os.path.getmtime(p) if m < mtime: print 'update', p os.utime(p, (mtime,mtime)) elif m > mtime: mtime = m p = os.path.dirname(p) process_IN_MODIFY = process_IN_CLOSE_WRITE def process_IN_Q_OVERFLOW(self, event): print 'over flow' max_queued_events.value *= 2 def process_default(self, event): pass wm = WatchManager() notifier = Notifier(wm, UpdateParentDir()) dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True)) notifier.loop() </textarea>

 

在已经有Hash Tree的时候,同步就比较简单了,不停地获取根目录的修改时间并顺着目录结构往下找即可。需要注意的是,在更新完文件后,需要设置修改时间为原文件的修改时间,目录也是,保证Hash Tree的一致性,否则没法同步。mirror.py的代码如下

  1. #!/usr/bin/python   
  2.   
  3. import sys,time,re,urllib  
  4. import os,os.path   
  5. from os.path import exists, isdir, getmtime   
  6.   
  7. src = sys.argv[1]   
  8. dst = sys.argv[2]   
  9.   
  10. def local_mirror(src, dst):   
  11.     if exists(dst) and mtime == getmtime(dst):   
  12.         return  
  13.     if not isdir(src):   
  14.         print 'update:', dst   
  15.         open(dst,'wb').write(open(src).read())   
  16.     else:   
  17.         if not exists(dst):   
  18.             os.makedirs(dst)   
  19.         for filename in os.listdir(src):   
  20.             local_mirror(os.path.join(src,filename), os.path.join(dst,filename))   
  21.     os.utime(dst, (mtime,mtime))   
  22.   
  23. def get_info(path):   
  24.     f = urllib.urlopen(path)   
  25.     mtime = f.headers.get('Last-Modified')   
  26.     if mtime:   
  27.         mtime = time.mktime(time.strptime(mtime, '%a, %d %b %Y %H:%M:%S %Z'))   
  28.     content = f.read()   
  29.     f.close()   
  30.     return int(mtime), content   
  31.   
  32. p = re.compile(r'([\d.]+?) +([\w/]+)')   
  33.   
  34. def remote_mirror(src, dst):   
  35.     mtime, content = get_info(src)   
  36.     if exists(dst) and mtime == int(getmtime(dst)):   
  37.         return  
  38.     print 'update:', dst, src   
  39.     if not src.endswith('/'):   
  40.         open(dst,'wb').write(content)   
  41.     else:   
  42.         if not exists(dst):   
  43.             os.makedirs(dst)   
  44.         for mt,filename in p.findall(content):   
  45.             mt = int(float(mt))   
  46.             lpath = dst+filename   
  47.             if not exists(lpath) or int(getmtime(lpath)) != mt:   
  48.                 remote_mirror(src+filename, lpath)   
  49.     os.utime(dst, (mtime,mtime))   
  50.   
  51. if src.startswith('http://'):   
  52.     mirror = remote_mirror   
  53. else:   
  54.     mirror = local_mirror   
  55.   
  56. while True:   
  57.     mirror(src, dst)   
  58.     time.sleep(1)   

<textarea class="Python" style="display: none;" cols="80" rows="60" name="code" readonly="65535">#!/usr/bin/python import sys,time,re,urllib import os,os.path from os.path import exists, isdir, getmtime src = sys.argv[1] dst = sys.argv[2] def local_mirror(src, dst): if exists(dst) and mtime == getmtime(dst): return if not isdir(src): print 'update:', dst open(dst,'wb').write(open(src).read()) else: if not exists(dst): os.makedirs(dst) for filename in os.listdir(src): local_mirror(os.path.join(src,filename), os.path.join(dst,filename)) os.utime(dst, (mtime,mtime)) def get_info(path): f = urllib.urlopen(path) mtime = f.headers.get('Last-Modified') if mtime: mtime = time.mktime(time.strptime(mtime, '%a, %d %b %Y %H:%M:%S %Z')) content = f.read() f.close() return int(mtime), content p = re.compile(r'([\d.]+?) +([\w/]+)') def remote_mirror(src, dst): mtime, content = get_info(src) if exists(dst) and mtime == int(getmtime(dst)): return print 'update:', dst, src if not src.endswith('/'): open(dst,'wb').write(content) else: if not exists(dst): os.makedirs(dst) for mt,filename in p.findall(content): mt = int(float(mt)) lpath = dst+filename if not exists(lpath) or int(getmtime(lpath)) != mt: remote_mirror(src+filename, lpath) os.utime(dst, (mtime,mtime)) if src.startswith('http://'): mirror = remote_mirror else: mirror = local_mirror while True: mirror(src, dst) time.sleep(1) </textarea>

 

如果源文件不在同一台机器上,可以通过NFS等共享过来。或者可以通过支持列目录的HTTP服务器来访问远程目录,mirror.py 已经支持这种访问方式。server.py 是用webpy做的一个简单的只是列目录的文件服务器。由于瓶颈在IO上,它的性能不是关键。server.py的代码如下:

  1. #!/usr/bin/python   
  2.   
  3. import os,os.path   
  4. import web   
  5. import time  
  6.   
  7. root = 'tmp'   
  8.   
  9. HTTP_HEADER_TIME = '%a, %d %b %Y %H:%M:%S %Z'   
  10.   
  11. class FileServer:   
  12.     def GET(self, path):   
  13.         path = root + path   
  14.         if not os.path.exists(path):   
  15.             return 404   
  16.         mtime = time.localtime(os.path.getmtime(path))   
  17.         web.header('Last-Modified', time.strftime(HTTP_HEADER_TIME, mtime))   
  18.         if os.path.isdir(path):   
  19.             for file in os.listdir(path):   
  20.                 if file.startswith('.'): continue  
  21.                 p = os.path.join(path,file)   
  22.                 m = os.path.getmtime(p)   
  23.                 if os.path.isdir(p):   
  24.                     file += '/'   
  25.                 print m, file  
  26.         else:   
  27.             print open(path,'rb').read()   
  28.   
  29. urls = (   
  30.    "(/.*)", "FileServer",   
  31. )   
  32.   
  33. if __name__ == '__main__':   
  34.     web.run(urls, globals())   

<textarea class="Python" style="display: none;" cols="80" rows="35" name="code" readonly="65535">#!/usr/bin/python import os,os.path import web import time root = 'tmp' HTTP_HEADER_TIME = '%a, %d %b %Y %H:%M:%S %Z' class FileServer: def GET(self, path): path = root + path if not os.path.exists(path): return 404 mtime = time.localtime(os.path.getmtime(path)) web.header('Last-Modified', time.strftime(HTTP_HEADER_TIME, mtime)) if os.path.isdir(path): for file in os.listdir(path): if file.startswith('.'): continue p = os.path.join(path,file) m = os.path.getmtime(p) if os.path.isdir(p): file += '/' print m, file else: print open(path,'rb').read() urls = ( "(/.*)", "FileServer", ) if __name__ == '__main__': web.run(urls, globals()) </textarea>

 

为了获得更好性能,以达到更好的实时性,Hash Tree最好是平衡的,比如BTree。如果一个文件发生变化,同步它需要进行的IO操作为N*M,其中N为数的层数,M为每层的文件数目。现在我们N为2,M最大为10000,适当减少它可以获得更好的性能,比如N为4,M为100。在以后创建目录结构时,最好能够考虑这方面的因素。

之前hongqn推荐过一个利用inotify的文件同步方案,同步方式类似于mysql和bdb等,由于过于复杂导致不可靠而没有采用。上面这个方案只用了一百多行Python代码就基本解决问题了,是不是很帅?:-)

分享到:
评论

相关推荐

    linux下Rsync+sersync实现文件数据实时同步

    在Linux环境中,文件数据的实时同步是系统管理中的重要...两者结合使用,可以构建出既实时又稳定的数据同步方案,满足各种复杂的企业级需求。在实际操作中,应根据具体的业务场景和需求,灵活选择和配置这两个工具。

    hashtree的小文件同步方案

    综上所述,"hashtree的小文件同步方案"利用哈希树数据结构和文件监控技术,实现了一个高效、实时的文件同步系统。通过Python脚本`mirror.py`、`watch.py`和`server.py`,我们可以了解到这一方案的具体实现细节,包括...

    unison实现双系统文件实时同步

    2. 考虑网络状况,避免在带宽有限的环境中进行大量文件同步。 3. 对于敏感数据,使用加密的SSH连接以保护数据安全。 4. 定期检查同步日志,确保同步过程无误。 总之,通过Unison、inotify-tools、SSH和其他相关工具...

    C#实现的文件同步软件

    本项目"用C#实现的文件同步软件"聚焦于这个需求,旨在提供一种高效、可靠的文件传输解决方案。C#作为.NET框架的主要编程语言,具有丰富的类库和强大的跨平台能力,使得它成为开发此类应用的理想选择。 首先,我们要...

    跨Windows和Linux操作系统的开源文件同步工具

    综上所述,这个开源文件同步工具提供了一种跨平台、高效率的解决方案,适用于网站备份、多服务器环境的数据一致性维护,以及在Windows和Linux系统间的数据迁移。其内置的rsync特性,结合开源社区的支持,使其在性能...

    基于 Electron & Vue.js 的文件同步客户端

    例如,对敏感的云服务凭据进行加密存储,使用安全的传输协议(如 HTTPS)进行网络通信,以及合理管理内存和磁盘资源,避免因大量文件同步导致的性能瓶颈。 总的来说,这个基于 Electron & Vue.js 的文件同步客户端...

    rsync+inotify实时同步

    rsync+inotify 的实时同步方案解决了传统 rsync 的效率和实时性问题,特别是在处理大量文件时。通过 inotify 实时监控文件系统事件,一旦检测到变化,rsync 立即进行同步,确保了数据的一致性。这种方法特别适合对...

    个人文件同步备份系统

    个人文件同步备份系统的应用广泛,无论是日常办公、学习还是创作,都可以提供高效的数据管理解决方案。尤其对于那些依赖大量文件的专业人士,如设计师、程序员、作家等,这样的系统更是不可或缺。 总的来说,个人...

    FTP文件同步服务

    FTP文件同步服务是一种基于FTP(File Transfer Protocol)协议的解决方案,用于在本地计算机与远程服务器之间自动保持文件和文件夹的同步。FTP是互联网上广泛使用的标准协议之一,主要用于在不同系统之间交换文件。...

    文件系统同步器

    这种方法虽然简单,但在处理大量文件时可能会效率低下,因为每次同步都需要检查所有文件。 第二个解决方案,"FileSystemSyncher2",可能在前一个基础上进行了优化。它可能引入了更高级的同步算法,例如哈希检查,...

    用SWT写的文件同步程序

    通过上述分析,我们可以看到这个基于SWT的文件同步程序涵盖了文件系统操作、同步算法、用户界面设计等多个方面的技术知识,对于开发人员来说,这些都是构建高效、可靠的文件同步解决方案所需掌握的关键技能。

    自同步局域网免费文件同步工具

    【自同步局域网免费文件同步工具】是一种实用的解决方案,尤其对于那些在同一个局域网环境下工作的用户。它能够实现在多台计算机之间自动、实时地同步指定文件夹的内容,确保所有设备上的文件数量和内容保持一致。...

    文件同步下载程序示例.rar_文件同步

    在IT领域,文件同步下载程序是一个常见的需求,特别是在分布式系统、多设备协同工作或云存储服务中。...通过这个示例,开发者可以了解到文件同步的基本原理和技术实现,为构建自己的同步解决方案打下基础。

    eclipse,Myeclipse修改js文件同步插件

    "eclipse,Myeclipse修改js文件同步插件"就是为了解决这个问题而存在的,它允许开发者在不重启项目的情况下即时查看和测试JavaScript文件的修改效果。这个插件适用于Eclipse和MyEclipse集成开发环境(IDE),极大地...

    一款轻量级的开源分布式文件系统,功能包括:文件存储、文件同步、文件上传、文件下载等,解决了文件大容量存储和高性能访问问题

    2. **文件同步**:文件的更新和修改会自动同步到集群中的其他节点,确保所有副本的一致性。这种实时同步机制使得在多台服务器之间保持文件的最新状态变得简单。 3. **文件上传**:FastDFS提供了一套简单易用的API...

    数据库同步热备解决方案(某区政府)

    1. **日志传输**:这是通过复制数据库的日志文件来实现同步的方法。主数据库的操作会被记录在日志中,这些日志会被发送到备份数据库,然后应用到备份数据库上,以保持同步。 2. **事务复制**:这种方法关注的是单个...

    多线程的批量线程同步解决方案

    在这种情况下,可能会采用批处理技术,将大量小任务合并为少数大任务,减少线程切换开销。以下是一些策略: 1. **工作窃取队列(Work Stealing Queue)**:线程可以从公共队列中获取任务,也可以从其他线程的工作队列...

    FTP自动文件同步软件FileGee.docx

    文件同步对于企业而言至关重要,尤其是对于那些依赖大量数据运行的企业,如数据库管理,能够实时保护文件免受意外丢失或损坏。 文件同步的基本概念是指将两个或多个文件夹内的内容保持一致。这可以通过在一台计算机...

    NAS海量文件存储数据异地容灾方案.docx

    另一种实时同步方案则通过在所有前端业务服务器上安装监控客户端,结合一台专门的“英方NAS监控服务器”。当任何一台服务器修改数据时,监控服务器会实时读取并发送到容灾端,实现秒级同步。这种方式旁路监听,不对...

Global site tag (gtag.js) - Google Analytics