`
jaesonchen
  • 浏览: 309910 次
  • 来自: ...
社区版块
存档分类
最新评论

从ZooKeeper源代码看如何实现分布式系统(二)数据的高可用存储

 
阅读更多

    这篇先从数据的高可用存储说起。ZooKeeper提供了分布式的目录服务,它存储的数据相比一个分布式存储系统来说很少,它主要是用来做分布式协同操作的。但是麻雀虽小,五脏俱全,ZooKeeper也必须要提供数据的高可用存储,对数据进行备份和恢复,以防出现服务器宕机导致数据丢失的情况。

    高可用的数据存储有一个比较通用的解决方案,就是数据文件 + 日志文件的方式。比如传统数据库中的数据文件 + undo/redo日志就可以来进行数据备份和恢复,在日志文件中加入检查点checkpoint,可以更加快速地进行数据的恢复。所以对于高可用的数据存储来说,我们要考察3个方面:

  • 数据文件
  • 日志文件
  • 检查点

数据文件

ZooKeeper的数据文件采用快照文件的方式来记录和持久化运行时的数据。顶层接口是SnapShot,提供了对运行时的数据DataTree和session的序列化和反序列化操作。DataTree保存了运行时的数据。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public interface SnapShot {  
  2.       
  3.     long deserialize(DataTree dt, Map<Long, Integer> sessions)   
  4.         throws IOException;  
  5.    
  6.     void serialize(DataTree dt, Map<Long, Integer> sessions,   
  7.             File name)   
  8.         throws IOException;  
  9.       
  10.     File findMostRecentSnapshot() throws IOException;  
  11.       
  12.     void close() throws IOException;  
  13. }   


SnapShot的默认实现类是FileSnapShot,提供了把DataTree和Session持久化到文件的能力。来看一下它的序列化实现

 

1. 创建一个具备校验和的文件输出流

2. 对象的序列化采用apache jute框架,创建一个jute的OutputArchive的实现。下面给出了OutputArchive接口的定义,可以看到它和Thrift的TProtocol的定义基本一致,提供了一系列的write类型和read类型接口,是jute 序列化的顶层接口

3. OutputArchive的默认实现是BinaryOutputArchive,和Thrift的TBinaryProtocol实现基本一致,提供了二进制的序列化协议,内部采用DataOutputStream,把不同的数据类型写到Byte数组中

4. 快照文件的文件头对象FileHeader,包含一个魔数ZKSN, 版本号和dbId。 FileHeader实现了jute的Record接口,提供了serialize和deserialize方法实现

5. 快照文件体使用SerializeUtils这个辅助类来实现,先序列化Session,序列化Session时,先写一个Long类型的SessionId,再写一个int类型的timeout。再序列化DataTree,它也实现了Jute的Record类,实现了序列化自己的serialize方法

6. DataTree的serialize方法,先序列化ACL信息,再序列化DataTree中的DataNode,采用中序遍历的方式递归遍历DataTree的所有节点。最后写入"/"表示文件结尾

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // FileSnapshot  
  2. public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)  
  3.             throws IOException {  
  4.         if (!close) {  
  5.             OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));  
  6.             CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());  
  7.             //CheckedOutputStream cout = new CheckedOutputStream()  
  8.             OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);  
  9.             FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);  
  10.             serialize(dt,sessions,oa, header);  
  11.             long val = crcOut.getChecksum().getValue();  
  12.             oa.writeLong(val, "val");  
  13.             oa.writeString("/""path");  
  14.             sessOS.flush();  
  15.             crcOut.close();  
  16.             sessOS.close();  
  17.         }  
  18.     }  
  19.   
  20.   
  21.  protected void serialize(DataTree dt,Map<Long, Integer> sessions,  
  22.             OutputArchive oa, FileHeader header) throws IOException {  
  23.         // this is really a programmatic error and not something that can  
  24.         // happen at runtime  
  25.         if(header==null)  
  26.             throw new IllegalStateException(  
  27.                     "Snapshot's not open for writing: uninitialized header");  
  28.         header.serialize(oa, "fileheader");  
  29.         SerializeUtils.serializeSnapshot(dt,oa,sessions);  
  30.     }   
  31.   
  32. public class FileHeader implements Record {  
  33.   private int magic;  
  34.   private int version;  
  35.   private long dbid;  
  36.   public FileHeader() {  
  37.   }  
  38.  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {  
  39.     a_.startRecord(this,tag);  
  40.     a_.writeInt(magic,"magic");  
  41.     a_.writeInt(version,"version");  
  42.     a_.writeLong(dbid,"dbid");  
  43.     a_.endRecord(this,tag);  
  44.   }  
  45.   public void deserialize(InputArchive a_, String tag) throws java.io.IOException {  
  46.     a_.startRecord(tag);  
  47.     magic=a_.readInt("magic");  
  48.     version=a_.readInt("version");  
  49.     dbid=a_.readLong("dbid");  
  50.     a_.endRecord(tag);  
  51. }  
  52.   
  53. public interface Record {  
  54.     public void serialize(OutputArchive archive, String tag)  
  55.         throws IOException;  
  56.     public void deserialize(InputArchive archive, String tag)  
  57.         throws IOException;  
  58. }  
  59. public interface OutputArchive {  
  60.     public void writeByte(byte b, String tag) throws IOException;  
  61.     public void writeBool(boolean b, String tag) throws IOException;  
  62.     public void writeInt(int i, String tag) throws IOException;  
  63.     public void writeLong(long l, String tag) throws IOException;  
  64.     public void writeFloat(float f, String tag) throws IOException;  
  65.     public void writeDouble(double d, String tag) throws IOException;  
  66.     public void writeString(String s, String tag) throws IOException;  
  67.     public void writeBuffer(byte buf[], String tag)  
  68.         throws IOException;  
  69.     public void writeRecord(Record r, String tag) throws IOException;  
  70.     public void startRecord(Record r, String tag) throws IOException;  
  71.     public void endRecord(Record r, String tag) throws IOException;  
  72.     public void startVector(List v, String tag) throws IOException;  
  73.     public void endVector(List v, String tag) throws IOException;  
  74.     public void startMap(TreeMap v, String tag) throws IOException;  
  75.     public void endMap(TreeMap v, String tag) throws IOException;  
  76.   
  77. }  
  78.   
  79. public class BinaryOutputArchive implements OutputArchive {  
  80.     private ByteBuffer bb = ByteBuffer.allocate(1024);  
  81.   
  82.     private DataOutput out;  
  83.       
  84.     public static BinaryOutputArchive getArchive(OutputStream strm) {  
  85.         return new BinaryOutputArchive(new DataOutputStream(strm));  
  86.     }  
  87.       
  88.     /** Creates a new instance of BinaryOutputArchive */  
  89.     public BinaryOutputArchive(DataOutput out) {  
  90.         this.out = out;  
  91.     }  
  92.       
  93.     public void writeByte(byte b, String tag) throws IOException {  
  94.         out.writeByte(b);  
  95.     }  
  96.       
  97.     public void writeBool(boolean b, String tag) throws IOException {  
  98.         out.writeBoolean(b);  
  99.     }  
  100.       
  101.     public void writeInt(int i, String tag) throws IOException {  
  102.         out.writeInt(i);  
  103.     }  
  104.       
  105.     public void writeLong(long l, String tag) throws IOException {  
  106.         out.writeLong(l);  
  107.     }  
  108.       
  109.     public void writeFloat(float f, String tag) throws IOException {  
  110.         out.writeFloat(f);  
  111.     }  
  112.       
  113.     public void writeDouble(double d, String tag) throws IOException {  
  114.         out.writeDouble(d);  
  115.     }  
  116.   
  117. // SerializeUtils  
  118.  public static void serializeSnapshot(DataTree dt,OutputArchive oa,  
  119.             Map<Long, Integer> sessions) throws IOException {  
  120.         HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);  
  121.         oa.writeInt(sessSnap.size(), "count");  
  122.         for (Entry<Long, Integer> entry : sessSnap.entrySet()) {  
  123.             oa.writeLong(entry.getKey().longValue(), "id");  
  124.             oa.writeInt(entry.getValue().intValue(), "timeout");  
  125.         }  
  126.         dt.serialize(oa, "tree");  
  127.     }  
  128.   
  129. // DataTree  
  130.  public void serialize(OutputArchive oa, String tag) throws IOException {  
  131.         scount = 0;  
  132.         serializeList(longKeyMap, oa);  
  133.         serializeNode(oa, new StringBuilder(""));  
  134.         // / marks end of stream  
  135.         // we need to check if clear had been called in between the snapshot.  
  136.         if (root != null) {  
  137.             oa.writeString("/""path");  
  138.         }  
  139.     }  
  140.    

 

 

涉及到的几个接口和类



反序列化即把快照文件反序列化成DataTree的过程和序列化的过程正好相反,值得注意的是,反序列化时,找的是最新的,可用的snapshot文件

1. findNValidSnapshots找100个以内的快照文件,并且按照zxid从大到小排列,保证最新的快照最先被处理

2. 如果最新的快照被成功处理,就返回,否则找第二新的快照,直到结束

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // FileSnap  
  2. public long deserialize(DataTree dt, Map<Long, Integer> sessions)  
  3.             throws IOException {  
  4.         // we run through 100 snapshots (not all of them)  
  5.         // if we cannot get it running within 100 snapshots  
  6.         // we should  give up  
  7.         List<File> snapList = findNValidSnapshots(100);  
  8.         if (snapList.size() == 0) {  
  9.             return -1L;  
  10.         }  
  11.         File snap = null;  
  12.         boolean foundValid = false;  
  13.         for (int i = 0; i < snapList.size(); i++) {  
  14.             snap = snapList.get(i);  
  15.             InputStream snapIS = null;  
  16.             CheckedInputStream crcIn = null;  
  17.             try {  
  18.                 LOG.info("Reading snapshot " + snap);  
  19.                 snapIS = new BufferedInputStream(new FileInputStream(snap));  
  20.                 crcIn = new CheckedInputStream(snapIS, new Adler32());  
  21.                 InputArchive ia = BinaryInputArchive.getArchive(crcIn);  
  22.                 deserialize(dt,sessions, ia);  
  23.                 long checkSum = crcIn.getChecksum().getValue();  
  24.                 long val = ia.readLong("val");  
  25.                 if (val != checkSum) {  
  26.                     throw new IOException("CRC corruption in snapshot :  " + snap);  
  27.                 }  
  28.                 foundValid = true;  
  29.                 break;  
  30.             } catch(IOException e) {  
  31.                 LOG.warn("problem reading snap file " + snap, e);  
  32.             } finally {  
  33.                 if (snapIS != null)   
  34.                     snapIS.close();  
  35.                 if (crcIn != null)   
  36.                     crcIn.close();  
  37.             }   
  38.         }  
  39.         if (!foundValid) {  
  40.             throw new IOException("Not able to find valid snapshots in " + snapDir);  
  41.         }  
  42.         dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");  
  43.         return dt.lastProcessedZxid;  
  44.     }  
  45.   
  46. private List<File> findNValidSnapshots(int n) throws IOException {  
  47.         List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot"false);  
  48.         int count = 0;  
  49.         List<File> list = new ArrayList<File>();  
  50.         for (File f : files) {  
  51.             // we should catch the exceptions  
  52.             // from the valid snapshot and continue  
  53.             // until we find a valid one  
  54.             try {  
  55.                 if (Util.isValidSnapshot(f)) {  
  56.                     list.add(f);  
  57.                     count++;  
  58.                     if (count == n) {  
  59.                         break;  
  60.                     }  
  61.                 }  
  62.             } catch (IOException e) {  
  63.                 LOG.info("invalid snapshot " + f, e);  
  64.             }  
  65.         }  
  66.         return list;  
  67.     }  



 


 

日志文件

ZooKeeper将事务类分为两部分,TxnHeader表示事务头,包含了事务的基本信息。xxxTxn类表示具体类型的事务,包含了事务对应的操作路径和数据。

先来看看ZooKeeper中的事务基础类定义Txn,它只包含了type属性和data属性,实现了Jute Record接口,处理自身的序列化操作。具体的增删改操作事务各自定义了单独的类,都实现了Record接口。单独的事务类和Txn类可以根据type和data互相转化。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public class Txn implements Record {  
  2.   private int type;  
  3.   private byte[] data;  
  4.   public void serialize(OutputArchive a_, String tag) throws java.io.IOException {  
  5.     a_.startRecord(this,tag);  
  6.     a_.writeInt(type,"type");  
  7.     a_.writeBuffer(data,"data");  
  8.     a_.endRecord(this,tag);  
  9.   }  
  10.   public void deserialize(InputArchive a_, String tag) throws java.io.IOException {  
  11.     a_.startRecord(tag);  
  12.     type=a_.readInt("type");  
  13.     data=a_.readBuffer("data");  
  14.     a_.endRecord(tag);  
  15. }  
  16.   
  17. public class CreateTxn implements Record {  
  18.   private String path;  
  19.   private byte[] data;  
  20.   private java.util.List<org.apache.zookeeper.data.ACL> acl;  
  21.   private boolean ephemeral;  
  22.   private int parentCVersion;  
  23. }  
  24.   
  25. public class DeleteTxn implements Record {  
  26.   private String path;  
  27. }  
  28. }  
  29.   
  30. public class SetDataTxn implements Record {  
  31.   private String path;  
  32.   private byte[] data;  
  33.   private int version;  
  34. }  


单独事务类和Txn类互相转换的示例如下

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  for (Txn subtxn : txns) {  
  2.     ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());  
  3.     Record record = null;  
  4.     switch (subtxn.getType()) {  
  5.        case OpCode.create:  
  6.             record = new CreateTxn();  
  7.             break;  
  8.        case OpCode.delete:  
  9.             record = new DeleteTxn();  
  10.             break;  
  11.        case OpCode.setData:  
  12.             record = new SetDataTxn();  
  13.             break;  
  14.        case OpCode.error:  
  15.             record = new ErrorTxn();  
  16.             post_failed = true;  
  17.             break;  
  18.        case OpCode.check:  
  19.             record = new CheckVersionTxn();  
  20.             break;  
  21.        default:  
  22.             throw new IOException("Invalid type of op: " + subtxn.getType());  
  23.      }  
  24.                          
  25.   ByteBufferInputStream.byteBuffer2Record(bb, record);  
  26. }  

 

 

TxnHeader类定义了事务的基本信息,通过type可以确定具体的事务类型

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public class TxnHeader implements Record {  
  2.   private long clientId;  
  3.   private int cxid;  
  4.   private long zxid;  
  5.   private long time;  
  6.   private int type;  
  7.   public TxnHeader() {  
  8.   }  
  9.   public void serialize(OutputArchive a_, String tag) throws java.io.IOException {  
  10.     a_.startRecord(this,tag);  
  11.     a_.writeLong(clientId,"clientId");  
  12.     a_.writeInt(cxid,"cxid");  
  13.     a_.writeLong(zxid,"zxid");  
  14.     a_.writeLong(time,"time");  
  15.     a_.writeInt(type,"type");  
  16.     a_.endRecord(this,tag);  
  17.   }  
  18.   public void deserialize(InputArchive a_, String tag) throws java.io.IOException {  
  19.     a_.startRecord(tag);  
  20.     clientId=a_.readLong("clientId");  
  21.     cxid=a_.readInt("cxid");  
  22.     zxid=a_.readLong("zxid");  
  23.     time=a_.readLong("time");  
  24.     type=a_.readInt("type");  
  25.     a_.endRecord(tag);  
  26. }  

 

TxnHeader和具体事务类的交互例子如下:

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. switch (header.getType()) {  
  2.                 case OpCode.create:  
  3.                     CreateTxn createTxn = (CreateTxn) txn;  
  4.                     rc.path = createTxn.getPath();  
  5.                     createNode(  
  6.                             createTxn.getPath(),  
  7.                             createTxn.getData(),  
  8.                             createTxn.getAcl(),  
  9.                             createTxn.getEphemeral() ? header.getClientId() : 0,  
  10.                             createTxn.getParentCVersion(),  
  11.                             header.getZxid(), header.getTime());  
  12.                     break;  
  13.                 case OpCode.delete:  
  14.                     DeleteTxn deleteTxn = (DeleteTxn) txn;  
  15.                     rc.path = deleteTxn.getPath();  
  16.                     deleteNode(deleteTxn.getPath(), header.getZxid());  
  17.                     break;  
  18.                 case OpCode.setData:  
  19.                     SetDataTxn setDataTxn = (SetDataTxn) txn;  
  20.                     rc.path = setDataTxn.getPath();  
  21.                     rc.stat = setData(setDataTxn.getPath(), setDataTxn  
  22.                             .getData(), setDataTxn.getVersion(), header  
  23.                             .getZxid(), header.getTime());  
  24.                     break;  



 

ZooKeeper的日志文件接口是TxnLog接口,它提供了对事务日志的操作。 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public interface TxnLog {  
  2.       
  3.     void rollLog() throws IOException;  
  4.     
  5.     boolean append(TxnHeader hdr, Record r) throws IOException;  
  6.   
  7.     TxnIterator read(long zxid) throws IOException;  
  8.    
  9.     long getLastLoggedZxid() throws IOException;  
  10.       
  11.     boolean truncate(long zxid) throws IOException;  
  12.       
  13.     long getDbId() throws IOException;  
  14.       
  15.     void commit() throws IOException;  
  16.    
  17.     void close() throws IOException;  
  18.  }  
  19.   
  20. public interface TxnIterator {  
  21.         TxnHeader getHeader();  
  22.           
  23.         Record getTxn();  
  24.      
  25.         boolean next() throws IOException;  
  26.     
  27.         void close() throws IOException;  
  28.     }  
  29.   
  30. }  


TxnLog的默认实现类是FileTxnLog,从它的描述可以看到事务日志文件的格式如下:

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. FileHeader TxnList ZeroPad  


其中FileHeader和数据文件中的FileHeader一样,三要素。 

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. FileHeader: {  
  2.      magic 4bytes (ZKLG)  
  3.      version 4bytes  
  4.      dbid 8bytes  
  5.    }  


TxnList是事务列表,Txn表示单个事务,格式如下,Record表示具体的事务类

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. checksum Txnlen TxnHeader Record 0x42  


来看一下FileTxnLog如何写入一条事务日志

 

1. append操作负责写入一条事务日志,一条事务日志包含了TxnHeader和Record两部分,这个方法是同步方法,一次只能有一个线程写日志。

2. lastZxidSeen表示当前这个类处理过的最新的事务id,如果要写入的事务id比lastZxidSeen小,记录warn信息

3. 如果文件输出流为空,就新建一个文件输出流,文件名是log.zxid

4. 先写FileHeader文件头

5. padFile(fos)扩大文件大小,在当前位置距离文件尾部还有4KB的时候会扩大文件。 currentSize记录了当前的文件大小

6. 把TxnHeader和Txn序列化到byte数组

7. 计算checksum

8. 先写checksum,再写事务的byte数组,最后写入0x42表示end of record, 写入一条事务结束

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public synchronized boolean append(TxnHeader hdr, Record txn)  
  2.         throws IOException  
  3.     {  
  4.         if (hdr != null) {  
  5.             if (hdr.getZxid() <= lastZxidSeen) {  
  6.                 LOG.warn("Current zxid " + hdr.getZxid()  
  7.                         + " is <= " + lastZxidSeen + " for "  
  8.                         + hdr.getType());  
  9.             }  
  10.             if (logStream==null) {  
  11.                if(LOG.isInfoEnabled()){  
  12.                     LOG.info("Creating new log file: log." +    
  13.                             Long.toHexString(hdr.getZxid()));  
  14.                }  
  15.                  
  16.                logFileWrite = new File(logDir, ("log." +   
  17.                        Long.toHexString(hdr.getZxid())));  
  18.                fos = new FileOutputStream(logFileWrite);  
  19.                logStream=new BufferedOutputStream(fos);  
  20.                oa = BinaryOutputArchive.getArchive(logStream);  
  21.                FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);  
  22.                fhdr.serialize(oa, "fileheader");  
  23.                // Make sure that the magic number is written before padding.  
  24.                logStream.flush();  
  25.                currentSize = fos.getChannel().position();  
  26.                streamsToFlush.add(fos);  
  27.             }  
  28.             padFile(fos);  
  29.             byte[] buf = Util.marshallTxnEntry(hdr, txn);  
  30.             if (buf == null || buf.length == 0) {  
  31.                 throw new IOException("Faulty serialization for header " +  
  32.                         "and txn");  
  33.             }  
  34.             Checksum crc = makeChecksumAlgorithm();  
  35.             crc.update(buf, 0, buf.length);  
  36.             oa.writeLong(crc.getValue(), "txnEntryCRC");  
  37.             Util.writeTxnBytes(oa, buf);  
  38.               
  39.             return true;  
  40.         }  
  41.         return false;  
  42.     }  


看看如何获取日志文件中最新的事务id

 

1. 先从日志目录下,zxid最大的日志文件名获取zxid

2. 然后根据这个zxid获得从这个事务id开始的事务链TxnIterator。遍历这个事务链表,最后的事务就是最新的事务

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public long getLastLoggedZxid() {  
  2.         File[] files = getLogFiles(logDir.listFiles(), 0);  
  3.         long maxLog=files.length>0?  
  4.                 Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;  
  5.   
  6.         // if a log file is more recent we must scan it to find  
  7.         // the highest zxid  
  8.         long zxid = maxLog;  
  9.         TxnIterator itr = null;  
  10.         try {  
  11.             FileTxnLog txn = new FileTxnLog(logDir);  
  12.             itr = txn.read(maxLog);  
  13.             while (true) {  
  14.                 if(!itr.next())  
  15.                     break;  
  16.                 TxnHeader hdr = itr.getHeader();  
  17.                 zxid = hdr.getZxid();  
  18.             }  
  19.         } catch (IOException e) {  
  20.             LOG.warn("Unexpected exception", e);  
  21.         } finally {  
  22.             close(itr);  
  23.         }  
  24.         return zxid;  
  25.     }  


FileTxnLog的commit方法保证日志文件的持久化,flush方法是将文件文件内容写到页缓存。如果强制更新到硬盘,调用FileChannel的force方法强制从页缓存刷新到硬盘,并且记录写硬盘的时间,如果超过阀值就记录warn信息

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public synchronized void commit() throws IOException {  
  2.        if (logStream != null) {  
  3.            logStream.flush();  
  4.        }  
  5.        for (FileOutputStream log : streamsToFlush) {  
  6.            log.flush();  
  7.            if (forceSync) {  
  8.                long startSyncNS = System.nanoTime();  
  9.   
  10.                log.getChannel().force(false);  
  11.   
  12.                long syncElapsedMS =  
  13.                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);  
  14.                if (syncElapsedMS > fsyncWarningThresholdMS) {  
  15.                    LOG.warn("fsync-ing the write ahead log in "  
  16.                            + Thread.currentThread().getName()  
  17.                            + " took " + syncElapsedMS  
  18.                            + "ms which will adversely effect operation latency. "  
  19.                            + "See the ZooKeeper troubleshooting guide");  
  20.                }  
  21.            }  
  22.        }  
  23.        while (streamsToFlush.size() > 1) {  
  24.            streamsToFlush.removeFirst().close();  
  25.        }  
  26.    }  


检查点和数据恢复

 

这里的检查点就是zxid,可以根据zxid找到对应的事务日志文件,然后在最新的快照文件上进行回放进行数据的恢复。

1. Snapshot.deserialize方法会把最新的快照文件反序列化到DataTree对象和Session中去

2. 快照文件中的最大的zxid作为数据文件目前最大的zxid

3. 用这个最大的zxid + 1去事务日志文件中找事务日志

4. 如果找到了正确的事务日志,使用processTransaction方法进行事务日志的回放

5. PlayBackListener接口提供了在回放时的钩子

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // FileTxnSnapshot  
  2. public long restore(DataTree dt, Map<Long, Integer> sessions,   
  3.             PlayBackListener listener) throws IOException {  
  4.         snapLog.deserialize(dt, sessions);  
  5.         FileTxnLog txnLog = new FileTxnLog(dataDir);  
  6.         TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);  
  7.         long highestZxid = dt.lastProcessedZxid;  
  8.         TxnHeader hdr;  
  9.         try {  
  10.             while (true) {  
  11.                 // iterator points to   
  12.                 // the first valid txn when initialized  
  13.                 hdr = itr.getHeader();  
  14.                 if (hdr == null) {  
  15.                     //empty logs   
  16.                     return dt.lastProcessedZxid;  
  17.                 }  
  18.                 if (hdr.getZxid() < highestZxid && highestZxid != 0) {  
  19.                     LOG.error("{}(higestZxid) > {}(next log) for type {}",  
  20.                             new Object[] { highestZxid, hdr.getZxid(),  
  21.                                     hdr.getType() });  
  22.                 } else {  
  23.                     highestZxid = hdr.getZxid();  
  24.                 }  
  25.                 try {  
  26.                     processTransaction(hdr,dt,sessions, itr.getTxn());  
  27.                 } catch(KeeperException.NoNodeException e) {  
  28.                    throw new IOException("Failed to process transaction type: " +  
  29.                          hdr.getType() + " error: " + e.getMessage(), e);  
  30.                 }  
  31.                 listener.onTxnLoaded(hdr, itr.getTxn());  
  32.                 if (!itr.next())   
  33.                     break;  
  34.             }  
  35.         } finally {  
  36.             if (itr != null) {  
  37.                 itr.close();  
  38.             }  
  39.         }  
  40.         return highestZxid;  
  41.     }  
  42.   
  43.  public void processTransaction(TxnHeader hdr,DataTree dt,  
  44.             Map<Long, Integer> sessions, Record txn)  
  45.         throws KeeperException.NoNodeException {  
  46.         ProcessTxnResult rc;  
  47.         switch (hdr.getType()) {  
  48.         case OpCode.createSession:  
  49.             sessions.put(hdr.getClientId(),  
  50.                     ((CreateSessionTxn) txn).getTimeOut());  
  51.             if (LOG.isTraceEnabled()) {  
  52.                 ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,  
  53.                         "playLog --- create session in log: 0x"  
  54.                                 + Long.toHexString(hdr.getClientId())  
  55.                                 + " with timeout: "  
  56.                                 + ((CreateSessionTxn) txn).getTimeOut());  
  57.             }  
  58.             // give dataTree a chance to sync its lastProcessedZxid  
  59.             rc = dt.processTxn(hdr, txn);  
  60.             break;  
  61.         case OpCode.closeSession:  
  62.             sessions.remove(hdr.getClientId());  
  63.             if (LOG.isTraceEnabled()) {  
  64.                 ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,  
  65.                         "playLog --- close session in log: 0x"  
  66.                                 + Long.toHexString(hdr.getClientId()));  
  67.             }  
  68.             rc = dt.processTxn(hdr, txn);  
  69.             break;  
  70.         default:  
  71.             rc = dt.processTxn(hdr, txn);  
  72.         }  
  73.   
  74.         /** 
  75.          * Snapshots are lazily created. So when a snapshot is in progress, 
  76.          * there is a chance for later transactions to make into the 
  77.          * snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS 
  78.          * errors could occur. It should be safe to ignore these. 
  79.          */  
  80.         if (rc.err != Code.OK.intValue()) {  
  81.             LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()  
  82.                     + ", error: " + rc.err + ", path: " + rc.path);  
  83.         }  
  84.     }  
  85.   
  86.  public interface PlayBackListener {  
  87.         void onTxnLoaded(TxnHeader hdr, Record rec);  
  88.     }  
分享到:
评论

相关推荐

    基于ssm redis solr dubbo zookeeper mysql等大型分布式电商系统.zip

    【压缩包子文件的文件名称列表】: "content_code" 提示我们这个压缩包可能包含了整个项目的源代码,用户可以下载并研究系统是如何利用上述技术实现的。通过查看和分析代码,学习者可以深入理解这些技术在实际项目中...

    Dubbo+Zookeeper+SpringMVC实现分布式服务治理框架(附件含源码).zip

    8. **源码分析**:通过提供的源码,开发者可以深入理解Dubbo、Zookeeper和SpringMVC在实际项目中的集成方式,学习如何配置、调优以及解决可能出现的问题,提升自身的分布式系统开发能力。 综上所述,这个项目不仅...

    maven ssh+ quartz + zookeeper 实现分布式任务调度的管理界面

    在分布式任务调度系统中,Zookeeper用于服务发现、配置管理、分布式锁和群集协调,确保任务调度的高可用性和一致性。 结合这些组件,该系统的工作流程可能是这样的: - 使用Maven管理项目的依赖和构建过程。 - 通过...

    分布式系统实验报告之一

    实验报告中提到的关键源代码可能是实现上述分布式系统特性的实例,例如: 1. RPC框架的客户端和服务器端代码,展示了请求和响应的交互过程。 2. 负载均衡器的实现,可能包含调度算法的代码片段。 3. 一致性算法的...

    一个基于SSM+dubbo+zookeeper的分布式系统.zip

    这是一个基于Java技术栈,融合了Spring Boot、Dubbo和Zookeeper的分布式系统项目。这个系统的核心特点是利用现代的微服务架构来实现高可用和可扩展性。以下将详细阐述涉及的技术点及其应用。 首先,Spring Boot是...

    ZOOKEEPER3.4.5

    在实际项目中,开发者可以通过解压`zookeeper-3.4.5`压缩包,了解ZooKeeper的源代码,学习其实现原理,以便更好地运用到自己的分布式系统设计中。 总之,ZooKeeper 3.4.5 在服务治理和分布式部署中的作用不可忽视,...

    netty、redis、zookeeper高并发实战-源代码

    在分布式系统中,ZooKeeper常用于解决一致性问题,确保在高并发环境下数据的一致性和可用性。 ZooKeeper在高并发中的关键应用包括: 1. **分布式锁**:通过创建临时节点实现分布式锁,保证同一时刻只有一个客户端能...

    zookeeper-3.4.9的源码

    通过深入学习和理解Zookeeper-3.4.9的源码,可以更好地了解分布式协调服务的工作原理,为构建和优化分布式系统提供宝贵的理论基础。同时,源码阅读也有助于开发者调试问题,定制功能,甚至为Zookeeper贡献代码。

    SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zip

    该压缩包文件“SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zip”是基于Java技术栈,利用SpringBoot、Zookeeper和Dubbo框架构建的一个分布式高并发商品秒杀系统的源代码示例。这个系统设计的目标是处理...

    dubbo-zookeeper-spring入门例子源代码

    【标题】"dubbo-zookeeper-spring入门例子源代码"涉及的是使用Dubbo、Zookeeper和Spring框架构建...通过深入研究这个示例,可以进一步理解分布式系统的设计原理和实现方式,为构建大规模、高可用的微服务架构打下基础。

    【Dubbo+Zookeeper的RPC分布式集群服务系统】服务端接口.zip

    【Dubbo+Zookeeper的RPC分布式集群服务系统】服务端接口.zip这个压缩包包含了基于Dubbo和Zookeeper构建的RPC分布式服务系统的源代码。Dubbo是阿里巴巴开源的一个高性能、轻量级的服务框架,它提供了服务治理、负载...

    zookeeper-3.4.9.zip

    Zookeeper 3.4.9版本中包含了完整的源代码、编译脚本、文档、测试用例等,开发者可以根据需要进行编译和定制。同时,该版本已经过广泛的测试,具有良好的稳定性和性能,适用于各种规模的分布式系统。 在实际部署中...

    pg12主从高可用+timescaledb分布式节点

    本文将深入探讨如何在PostgreSQL 12版本中构建主从高可用环境,并结合TimescaleDB插件实现分布式节点,以优化时序数据存储与分析。首先,我们将理解Patroni的作用及其在高可用性设置中的角色,然后详细介绍Timescale...

    zookeeper-3.9.1.zip

    总的来说,Apache ZooKeeper是分布式系统中不可或缺的一部分,它提供的服务对于实现高可用性、一致性以及分布式环境下的数据管理至关重要。这个"zookeeper-3.9.1.zip"文件为Linux 64位用户提供了一个便捷的方式来...

    zookeeper.zip

    《Zookeeper:分布式协调服务详解》 Zookeeper是Apache Hadoop的一个子项目,它是一个分布式的,开放源码的分布式应用程序...通过学习和实践,我们可以掌握其精髓,为构建高可用、高性能的分布式系统打下坚实基础。

    zookeeper-3.4.14.zip

    4. **src**:源代码目录,对于开发者来说,可以查看和理解Zookeeper的内部工作原理。 5. **data**:默认的数据存储目录,Zookeeper在此存储每个节点的数据和元数据。 在分布式商城系统中,Zookeeper作为服务注册...

    zookeeper工具

    开发者可以通过阅读源代码了解其实现细节,或者直接使用预编译的二进制文件在自己的分布式系统中部署ZooKeeper。对于学习和使用ZooKeeper的人来说,这是一个非常有价值的资源。通过深入理解ZooKeeper的工作原理和...

    Redids和Zookepeer分布式锁实现源代码

    本文将深入探讨Redis和ZooKeeper这两种流行的分布式系统中的分布式锁实现。 **Redis分布式锁** Redis是一个高性能的键值存储系统,广泛用于缓存、消息队列等场景。在Redis中实现分布式锁主要依赖于两个命令:`...

    zookeeper+rmi开发

    文件名"ZKRmi"可能包含的是整个示例的源代码,包括ZooKeeper的客户端和服务端代码,以及RMI的实现部分。通过分析这些代码,你可以更深入地了解如何将ZooKeeper和RMI集成到你的分布式系统中,实现服务的注册、发现和...

Global site tag (gtag.js) - Google Analytics