实现高可用数据存储有一个比较通用的方案:数据文件 + 日志文件。例如传统数据库就是用数据文件 + undo/redo日志来进行数据的备份和恢复;如果再在日志中加入检查点(checkpoint),恢复时只需从最近的检查点开始回放日志,而不必回放全部日志,因此可以更快地完成数据恢复。所以对于高可用的数据存储,我们要考察3个方面:
- 数据文件
- 日志文件
- 检查点
- /**
- * Storage abstraction for snapshots, i.e. the "data file" half of the data file + log scheme.
- */
- public interface SnapShot {
- // Loads snapshot state into dt and sessions. The FileSnap implementation returns the zxid
- // taken from the snapshot file name, or -1 when no snapshot file exists.
- long deserialize(DataTree dt, Map<Long, Integer> sessions)
- throws IOException;
- // Persists dt and sessions into the given file.
- void serialize(DataTree dt, Map<Long, Integer> sessions,
- File name)
- throws IOException;
- // NOTE(review): implementation not shown; presumably returns the newest snapshot file — confirm.
- File findMostRecentSnapshot() throws IOException;
- // NOTE(review): FileSnap.serialize checks a `close` flag; presumably this sets it — confirm.
- void close() throws IOException;
- }
1. 创建一个具备校验和的文件输出流
2. 对象的序列化采用Apache Jute框架:创建一个Jute的OutputArchive实现。下面给出了OutputArchive接口的定义,可以看到它和Thrift的TProtocol的定义基本一致,提供了一系列按数据类型写入的write方法(对应的read方法定义在InputArchive接口中),是Jute序列化的顶层接口
3. OutputArchive的默认实现是BinaryOutputArchive,和Thrift的TBinaryProtocol实现基本一致,提供了二进制的序列化协议:内部使用DataOutputStream,把不同的数据类型以二进制形式写入底层输出流(tag参数在二进制格式中被忽略)
4. 快照文件的文件头对象FileHeader,包含一个魔数ZKSN, 版本号和dbId。 FileHeader实现了jute的Record接口,提供了serialize和deserialize方法实现
5. 快照文件体由SerializeUtils这个辅助类负责写入。先序列化Session:先写入Session的数量(int),然后对每个Session依次写入一个long类型的SessionId和一个int类型的timeout。再序列化DataTree:DataTree同样实现了Jute的Record接口,通过自己的serialize方法完成序列化
6. DataTree的serialize方法先序列化ACL信息,再序列化DataTree中的DataNode。它从根节点开始,以先序(深度优先)遍历的方式递归访问所有节点,先写父节点,再写其子节点。最后写入路径"/"作为节点序列的结束标记。注意这并不是文件结尾:FileSnapshot.serialize随后还会写入校验和以及另一个"/"
- // FileSnapshot
- /**
- * Writes a snapshot file: the file header, the session table and the DataTree, followed by
- * the Adler32 checksum of everything written so far and a trailing "/" marker.
- * Does nothing once this snapshot store has been closed.
- *
- * @param dt the tree to persist
- * @param sessions session id -> timeout map to persist
- * @param snapShot destination file
- * @throws IOException if writing fails; the file is closed in every case
- */
- public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
- throws IOException {
- if (close) {
- return;
- }
- OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
- CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
- try {
- OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
- FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
- serialize(dt, sessions, oa, header);
- // The checksum covers header + sessions + tree; deserialize recomputes and compares it.
- long val = crcOut.getChecksum().getValue();
- oa.writeLong(val, "val");
- oa.writeString("/", "path");
- sessOS.flush();
- } finally {
- // Closing the outermost stream also closes sessOS and the underlying file, so the
- // descriptor is released even when serialization throws part-way through.
- crcOut.close();
- }
- }
- /**
- * Writes the file header, then the session table and the DataTree, to the given archive.
- *
- * @throws IllegalStateException if header is null, which can only be a programming error
- */
- protected void serialize(DataTree dt,Map<Long, Integer> sessions,
- OutputArchive oa, FileHeader header) throws IOException {
- if (header != null) {
- header.serialize(oa, "fileheader");
- SerializeUtils.serializeSnapshot(dt, oa, sessions);
- return;
- }
- // A missing header means the caller never initialized it, so fail fast.
- throw new IllegalStateException(
- "Snapshot's not open for writing: uninitialized header");
- }
- /**
- * Header at the start of snapshot and transaction-log files:
- * magic (int), version (int), dbid (long), in that order.
- */
- public class FileHeader implements Record {
- // Identifies the file kind, e.g. ZKSN for snapshots and ZKLG for transaction logs.
- private int magic;
- private int version;
- private long dbid;
- public FileHeader() {
- }
- // The write order here defines the on-disk layout; deserialize reads in the same order.
- public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
- a_.startRecord(this,tag);
- a_.writeInt(magic,"magic");
- a_.writeInt(version,"version");
- a_.writeLong(dbid,"dbid");
- a_.endRecord(this,tag);
- }
- public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
- a_.startRecord(tag);
- magic=a_.readInt("magic");
- version=a_.readInt("version");
- dbid=a_.readLong("dbid");
- a_.endRecord(tag);
- }
- /**
- * Jute serialization contract: a type that writes itself to an OutputArchive and
- * reads itself back from an InputArchive.
- */
- public interface Record {
- // tag names the record being written; the binary archive's write methods ignore it.
- public void serialize(OutputArchive archive, String tag)
- throws IOException;
- public void deserialize(InputArchive archive, String tag)
- throws IOException;
- }
- /**
- * Top-level Jute output API: one write method per primitive type, plus start/end markers
- * that bracket records, vectors and maps. Every method takes a tag naming the value.
- */
- public interface OutputArchive {
- public void writeByte(byte b, String tag) throws IOException;
- public void writeBool(boolean b, String tag) throws IOException;
- public void writeInt(int i, String tag) throws IOException;
- public void writeLong(long l, String tag) throws IOException;
- public void writeFloat(float f, String tag) throws IOException;
- public void writeDouble(double d, String tag) throws IOException;
- public void writeString(String s, String tag) throws IOException;
- public void writeBuffer(byte buf[], String tag)
- throws IOException;
- public void writeRecord(Record r, String tag) throws IOException;
- // Structural markers around a nested record, a list, or a map.
- public void startRecord(Record r, String tag) throws IOException;
- public void endRecord(Record r, String tag) throws IOException;
- public void startVector(List v, String tag) throws IOException;
- public void endVector(List v, String tag) throws IOException;
- public void startMap(TreeMap v, String tag) throws IOException;
- public void endMap(TreeMap v, String tag) throws IOException;
- }
- /**
- * Binary OutputArchive: writes each value through a DataOutput (big-endian, as the
- * DataOutput contract specifies) and ignores the tag names entirely.
- */
- public class BinaryOutputArchive implements OutputArchive {
- // NOTE(review): not used by the methods shown; presumably a scratch buffer for methods
- // outside this excerpt (e.g. writeString) — confirm.
- private ByteBuffer bb = ByteBuffer.allocate(1024);
- private DataOutput out;
- // Wraps a raw OutputStream in a DataOutputStream; no buffering is added here, so callers
- // pass an already-buffered stream.
- public static BinaryOutputArchive getArchive(OutputStream strm) {
- return new BinaryOutputArchive(new DataOutputStream(strm));
- }
- /** Creates a new instance of BinaryOutputArchive */
- public BinaryOutputArchive(DataOutput out) {
- this.out = out;
- }
- // Each primitive writer delegates directly to DataOutput; the tag is not written.
- public void writeByte(byte b, String tag) throws IOException {
- out.writeByte(b);
- }
- public void writeBool(boolean b, String tag) throws IOException {
- out.writeBoolean(b);
- }
- public void writeInt(int i, String tag) throws IOException {
- out.writeInt(i);
- }
- public void writeLong(long l, String tag) throws IOException {
- out.writeLong(l);
- }
- public void writeFloat(float f, String tag) throws IOException {
- out.writeFloat(f);
- }
- public void writeDouble(double d, String tag) throws IOException {
- out.writeDouble(d);
- }
- // SerializeUtils
- /**
- * Writes the snapshot body: the session count, one (id, timeout) pair per session,
- * and finally the DataTree under the tag "tree".
- */
- public static void serializeSnapshot(DataTree dt,OutputArchive oa,
- Map<Long, Integer> sessions) throws IOException {
- // Work on a private copy so the count and the entries written come from the same map.
- Map<Long, Integer> copy = new HashMap<Long, Integer>(sessions);
- oa.writeInt(copy.size(), "count");
- for (Map.Entry<Long, Integer> session : copy.entrySet()) {
- oa.writeLong(session.getKey(), "id");
- oa.writeInt(session.getValue(), "timeout");
- }
- dt.serialize(oa, "tree");
- }
- // DataTree
- /**
- * Writes the ACL list, then every node, then a "/" end-of-nodes marker.
- * The marker is written only while root is non-null.
- */
- public void serialize(OutputArchive oa, String tag) throws IOException {
- scount = 0;
- serializeList(longKeyMap, oa);
- serializeNode(oa, new StringBuilder());
- // "/" marks the end of the node stream. root may have been cleared while the snapshot
- // was in progress, in which case no marker is written.
- if (root == null) {
- return;
- }
- oa.writeString("/", "path");
- }
1. findNValidSnapshots找100个以内的快照文件,并且按照zxid从大到小排列,保证最新的快照最先被处理
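2. 依次尝试这些快照:如果某个快照能被完整反序列化,且文件中保存的Adler32校验和与计算结果一致,就使用它,并返回其文件名中的zxid;否则记录警告并尝试次新的快照,以此类推。如果一个快照文件都没有,则返回-1;如果所有候选快照都无效,则抛出IOException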
- // FileSnap
- /**
- * Loads the newest usable snapshot into dt and sessions.
- * Candidates come from findNValidSnapshots(100), newest first. A candidate is accepted only
- * if it deserializes without error and its stored Adler32 checksum matches.
- *
- * @param dt tree to populate
- * @param sessions session id -> timeout map to populate
- * @return the zxid encoded in the accepted snapshot's file name (also stored in
- * dt.lastProcessedZxid), or -1 if there are no snapshot files at all
- * @throws IOException if snapshots exist but none of them could be loaded
- */
- public long deserialize(DataTree dt, Map<Long, Integer> sessions)
- throws IOException {
- // we run through 100 snapshots (not all of them)
- // if we cannot get it running within 100 snapshots
- // we should give up
- List<File> snapList = findNValidSnapshots(100);
- if (snapList.isEmpty()) {
- return -1L;
- }
- File snap = null;
- boolean foundValid = false;
- for (int i = 0; i < snapList.size(); i++) {
- snap = snapList.get(i);
- InputStream snapIS = null;
- CheckedInputStream crcIn = null;
- try {
- LOG.info("Reading snapshot " + snap);
- snapIS = new BufferedInputStream(new FileInputStream(snap));
- crcIn = new CheckedInputStream(snapIS, new Adler32());
- InputArchive ia = BinaryInputArchive.getArchive(crcIn);
- deserialize(dt,sessions, ia);
- // Capture the checksum of everything read so far before reading the stored value,
- // which is itself not part of the checksummed data.
- long checkSum = crcIn.getChecksum().getValue();
- long val = ia.readLong("val");
- if (val != checkSum) {
- throw new IOException("CRC corruption in snapshot : " + snap);
- }
- foundValid = true;
- break;
- } catch(IOException e) {
- LOG.warn("problem reading snap file " + snap, e);
- } finally {
- // crcIn wraps snapIS, so closing the outermost stream releases the file exactly once.
- // A close failure on this read-only stream is only logged. It must not discard a
- // snapshot that already passed the checksum, nor abort the search for the next one.
- InputStream outermost = (crcIn != null) ? crcIn : snapIS;
- if (outermost != null) {
- try {
- outermost.close();
- } catch (IOException closeErr) {
- LOG.warn("problem closing snap file " + snap, closeErr);
- }
- }
- }
- }
- if (!foundValid) {
- throw new IOException("Not able to find valid snapshots in " + snapDir);
- }
- dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
- return dt.lastProcessedZxid;
- }
- /**
- * Collects up to n snapshot files that pass Util.isValidSnapshot, in the order returned by
- * Util.sortDataDir(..., "snapshot", false) (newest first, per the surrounding description).
- * A file whose validity check throws is logged and skipped so the search can continue.
- */
- private List<File> findNValidSnapshots(int n) throws IOException {
- List<File> candidates = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
- List<File> valid = new ArrayList<File>();
- for (File candidate : candidates) {
- boolean usable;
- try {
- usable = Util.isValidSnapshot(candidate);
- } catch (IOException e) {
- LOG.info("invalid snapshot " + candidate, e);
- continue;
- }
- if (!usable) {
- continue;
- }
- valid.add(candidate);
- if (valid.size() == n) {
- break;
- }
- }
- return valid;
- }
先来看看ZooKeeper中的事务基础类定义Txn,它只包含了type属性和data属性,实现了Jute Record接口,处理自身的序列化操作。具体的增删改操作事务各自定义了单独的类,都实现了Record接口。单独的事务类和Txn类可以根据type和data互相转化。
- /**
- * Generic transaction envelope: an op type plus the serialized bytes of the concrete
- * transaction Record. The type selects which Record class the bytes decode into.
- */
- public class Txn implements Record {
- private int type;
- private byte[] data;
- public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
- a_.startRecord(this,tag);
- a_.writeInt(type,"type");
- a_.writeBuffer(data,"data");
- a_.endRecord(this,tag);
- }
- public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
- a_.startRecord(tag);
- type=a_.readInt("type");
- data=a_.readBuffer("data");
- a_.endRecord(tag);
- }
- // NOTE(review): in this excerpt CreateTxn and DeleteTxn appear nested inside Txn; they are
- // presumably separate top-level classes in the real source (like SetDataTxn) — confirm.
- public class CreateTxn implements Record {
- private String path;
- private byte[] data;
- private java.util.List<org.apache.zookeeper.data.ACL> acl;
- // During replay, true makes the header's clientId the node owner; false passes 0.
- private boolean ephemeral;
- // Passed through to createNode during replay.
- private int parentCVersion;
- }
- public class DeleteTxn implements Record {
- private String path;
- }
- }
- // Payload of a setData transaction: target path, new data, and the version value handed to
- // DataTree.setData during replay.
- public class SetDataTxn implements Record {
- private String path;
- private byte[] data;
- private int version;
- }
- // Excerpt (enclosing method not shown): decodes each sub-transaction in txns, presumably the
- // parts of a multi-op. A Txn carries only an op type and raw bytes, so the type selects which
- // concrete Record to instantiate before the bytes are deserialized into it.
- for (Txn subtxn : txns) {
- ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
- Record record = null;
- switch (subtxn.getType()) {
- case OpCode.create:
- record = new CreateTxn();
- break;
- case OpCode.delete:
- record = new DeleteTxn();
- break;
- case OpCode.setData:
- record = new SetDataTxn();
- break;
- case OpCode.error:
- record = new ErrorTxn();
- // post_failed is declared outside this excerpt; presumably flags that the batch failed.
- post_failed = true;
- break;
- case OpCode.check:
- record = new CheckVersionTxn();
- break;
- default:
- // Unknown op types abort decoding with an IOException.
- throw new IOException("Invalid type of op: " + subtxn.getType());
- }
- // Fill the freshly created record from the raw bytes.
- // NOTE(review): record is not used after this point in the excerpt; presumably the
- // enclosing code keeps it — confirm.
- ByteBufferInputStream.byteBuffer2Record(bb, record);
- }
- /**
- * Header serialized in front of every transaction record in the log.
- * Field order in serialize/deserialize defines the on-disk layout.
- */
- public class TxnHeader implements Record {
- // Session that issued the transaction (used as the session id on createSession replay).
- private long clientId;
- // NOTE(review): presumably the client-side request counter — confirm.
- private int cxid;
- private long zxid;
- // Timestamp applied to the affected node during replay.
- private long time;
- // OpCode of the transaction; selects how the record is applied.
- private int type;
- public TxnHeader() {
- }
- public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
- a_.startRecord(this,tag);
- a_.writeLong(clientId,"clientId");
- a_.writeInt(cxid,"cxid");
- a_.writeLong(zxid,"zxid");
- a_.writeLong(time,"time");
- a_.writeInt(type,"type");
- a_.endRecord(this,tag);
- }
- public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
- a_.startRecord(tag);
- clientId=a_.readLong("clientId");
- cxid=a_.readInt("cxid");
- zxid=a_.readLong("zxid");
- time=a_.readLong("time");
- type=a_.readInt("type");
- a_.endRecord(tag);
- }
- // Excerpt, presumably from DataTree.processTxn: each case casts the generic Record to its
- // concrete Txn type, records the affected path in rc, and applies the change to the tree.
- switch (header.getType()) {
- case OpCode.create:
- CreateTxn createTxn = (CreateTxn) txn;
- rc.path = createTxn.getPath();
- // Ephemeral nodes are tied to the creating session (clientId); 0 presumably means
- // "no owning session", i.e. a persistent node.
- createNode(
- createTxn.getPath(),
- createTxn.getData(),
- createTxn.getAcl(),
- createTxn.getEphemeral() ? header.getClientId() : 0,
- createTxn.getParentCVersion(),
- header.getZxid(), header.getTime());
- break;
- case OpCode.delete:
- DeleteTxn deleteTxn = (DeleteTxn) txn;
- rc.path = deleteTxn.getPath();
- deleteNode(deleteTxn.getPath(), header.getZxid());
- break;
- case OpCode.setData:
- SetDataTxn setDataTxn = (SetDataTxn) txn;
- rc.path = setDataTxn.getPath();
- // The resulting Stat is returned to the caller via rc.
- rc.stat = setData(setDataTxn.getPath(), setDataTxn
- .getData(), setDataTxn.getVersion(), header
- .getZxid(), header.getTime());
- break;
- /**
- * Write-ahead transaction log API (the "log file" half of the data file + log scheme).
- */
- public interface TxnLog {
- // NOTE(review): implementation not shown; presumably starts a new log file — confirm.
- void rollLog() throws IOException;
- // Appends one entry; FileTxnLog writes nothing and returns false when hdr is null.
- boolean append(TxnHeader hdr, Record r) throws IOException;
- // Returns an iterator over logged transactions starting at zxid (restore passes
- // lastProcessedZxid + 1).
- TxnIterator read(long zxid) throws IOException;
- // Highest zxid in the logs; FileTxnLog starts from -1 when no log file exists.
- long getLastLoggedZxid() throws IOException;
- // NOTE(review): implementation not shown; presumably drops entries after zxid — confirm.
- boolean truncate(long zxid) throws IOException;
- long getDbId() throws IOException;
- // Flushes appended entries and, when forceSync is enabled, fsyncs the log files.
- void commit() throws IOException;
- void close() throws IOException;
- }
- /**
- * Cursor over logged transactions. As used by restore(), it is already positioned on the
- * first transaction when returned by read(), and getHeader() returns null if there is none.
- */
- public interface TxnIterator {
- TxnHeader getHeader();
- Record getTxn();
- // Advances to the next transaction; returns false when the log is exhausted.
- boolean next() throws IOException;
- void close() throws IOException;
- }
- // NOTE(review): this brace has no opening counterpart in the excerpt; presumably it closes an
- // enclosing TxnLog declaration in the real source.
- }
- FileHeader TxnList ZeroPad
- FileHeader: {
- magic 4bytes (ZKLG)
- version 4bytes
- dbid 8bytes
- }
- checksum Txnlen TxnHeader Record 0x42
1. append操作负责写入一条事务日志,一条事务日志包含了TxnHeader和Record两部分,这个方法是同步方法,一次只能有一个线程写日志。
2. lastZxidSeen表示当前这个类处理过的最新的事务id;如果要写入的事务id小于或等于lastZxidSeen,就记录warn信息,否则用它更新lastZxidSeen
3. 如果文件输出流为空,就新建一个文件输出流,文件名是"log."加上当前事务zxid的十六进制表示(例如log.100000001)
4. 先写FileHeader文件头
5. padFile(fos)负责预分配文件空间:当前写入位置距离文件末尾不足4KB时扩大文件(即文件格式中的ZeroPad部分)。currentSize记录了当前的文件大小
6. 把TxnHeader和Txn序列化到byte数组
7. 计算checksum
8. 先写checksum,再写事务的byte数组,最后写入0x42表示end of record, 写入一条事务结束
- /**
- * Appends one transaction entry (header + record) to the current log file.
- * When no log stream is open yet, creates "log.<hex zxid>" and writes the file header first.
- * Entries go through a BufferedOutputStream; commit() flushes and optionally fsyncs them.
- *
- * @param hdr transaction header; when null nothing is written
- * @param txn transaction body serialized after the header
- * @return true if an entry was appended, false if hdr was null
- * @throws IOException if serialization produced no bytes or writing fails
- */
- public synchronized boolean append(TxnHeader hdr, Record txn)
- throws IOException
- {
- if (hdr == null) {
- return false;
- }
- if (hdr.getZxid() <= lastZxidSeen) {
- LOG.warn("Current zxid " + hdr.getZxid()
- + " is <= " + lastZxidSeen + " for "
- + hdr.getType());
- } else {
- // Remember the newest zxid appended. Without this update the check above would always
- // compare against the initial value and could never detect out-of-order appends.
- lastZxidSeen = hdr.getZxid();
- }
- if (logStream==null) {
- if(LOG.isInfoEnabled()){
- LOG.info("Creating new log file: log." +
- Long.toHexString(hdr.getZxid()));
- }
- logFileWrite = new File(logDir, ("log." +
- Long.toHexString(hdr.getZxid())));
- fos = new FileOutputStream(logFileWrite);
- logStream=new BufferedOutputStream(fos);
- oa = BinaryOutputArchive.getArchive(logStream);
- FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
- fhdr.serialize(oa, "fileheader");
- // Make sure that the magic number is written before padding.
- logStream.flush();
- currentSize = fos.getChannel().position();
- // Registered so that commit() flushes (and optionally fsyncs) this file.
- streamsToFlush.add(fos);
- }
- // Pre-allocate file space ahead of the write position.
- padFile(fos);
- byte[] buf = Util.marshallTxnEntry(hdr, txn);
- if (buf == null || buf.length == 0) {
- throw new IOException("Faulty serialization for header " +
- "and txn");
- }
- // Entry layout: checksum of the serialized bytes, then the bytes themselves (followed by
- // the end-of-record marker, per the log format).
- Checksum crc = makeChecksumAlgorithm();
- crc.update(buf, 0, buf.length);
- oa.writeLong(crc.getValue(), "txnEntryCRC");
- Util.writeTxnBytes(oa, buf);
- return true;
- }
1. 先从日志目录下,zxid最大的日志文件名获取zxid
2. 然后根据这个zxid获得从这个事务id开始的事务链TxnIterator。遍历这个事务链表,最后的事务就是最新的事务
- /**
- * Returns the zxid of the last transaction found by scanning the last file returned by
- * getLogFiles (presumably the newest). Falls back to the zxid in that file's name when the
- * scan finds no later entry, and returns -1 when there are no log files. Read errors are
- * logged and the best value found so far is returned.
- */
- public long getLastLoggedZxid() {
- File[] files = getLogFiles(logDir.listFiles(), 0);
- long maxLog = -1;
- if (files.length > 0) {
- maxLog = Util.getZxidFromName(files[files.length - 1].getName(), "log");
- }
- // A log file can hold transactions newer than the zxid in its name, so scan it to the end.
- long lastZxid = maxLog;
- TxnIterator itr = null;
- try {
- FileTxnLog reader = new FileTxnLog(logDir);
- itr = reader.read(maxLog);
- while (itr.next()) {
- lastZxid = itr.getHeader().getZxid();
- }
- } catch (IOException e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- close(itr);
- }
- return lastZxid;
- }
- public synchronized void commit() throws IOException {
- if (logStream != null) {
- logStream.flush();
- }
- for (FileOutputStream log : streamsToFlush) {
- log.flush();
- if (forceSync) {
- long startSyncNS = System.nanoTime();
- log.getChannel().force(false);
- long syncElapsedMS =
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
- if (syncElapsedMS > fsyncWarningThresholdMS) {
- LOG.warn("fsync-ing the write ahead log in "
- + Thread.currentThread().getName()
- + " took " + syncElapsedMS
- + "ms which will adversely effect operation latency. "
- + "See the ZooKeeper troubleshooting guide");
- }
- }
- }
- while (streamsToFlush.size() > 1) {
- streamsToFlush.removeFirst().close();
- }
- }
1. Snapshot.deserialize方法会把最新的快照文件反序列化到DataTree对象和Session中去
2. 快照文件名中的zxid被设置为dt.lastProcessedZxid,作为当前数据已处理到的最大zxid
3. 用这个最大的zxid + 1去事务日志文件中找事务日志
4. 如果找到了正确的事务日志,使用processTransaction方法进行事务日志的回放
5. PlayBackListener接口提供了在回放时的钩子
- // FileTxnSnapshot
- /**
- * Rebuilds dt and sessions: loads the latest snapshot, then replays every logged transaction
- * after dt.lastProcessedZxid, notifying listener after each one is applied.
- *
- * @return the highest zxid replayed, or dt.lastProcessedZxid when there is nothing to replay
- * @throws IOException if reading fails or a replayed transaction hits NoNodeException
- */
- public long restore(DataTree dt, Map<Long, Integer> sessions,
- PlayBackListener listener) throws IOException {
- snapLog.deserialize(dt, sessions);
- FileTxnLog txnLog = new FileTxnLog(dataDir);
- TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
- long highestZxid = dt.lastProcessedZxid;
- try {
- // The iterator already points at the first candidate transaction, so the body runs
- // once before the first call to next().
- do {
- TxnHeader hdr = itr.getHeader();
- if (hdr == null) {
- //empty logs
- return dt.lastProcessedZxid;
- }
- boolean outOfOrder = highestZxid != 0 && hdr.getZxid() < highestZxid;
- if (outOfOrder) {
- LOG.error("{}(higestZxid) > {}(next log) for type {}",
- new Object[] { highestZxid, hdr.getZxid(),
- hdr.getType() });
- } else {
- highestZxid = hdr.getZxid();
- }
- try {
- processTransaction(hdr,dt,sessions, itr.getTxn());
- } catch(KeeperException.NoNodeException e) {
- throw new IOException("Failed to process transaction type: " +
- hdr.getType() + " error: " + e.getMessage(), e);
- }
- listener.onTxnLoaded(hdr, itr.getTxn());
- } while (itr.next());
- } finally {
- if (itr != null) {
- itr.close();
- }
- }
- return highestZxid;
- }
- /**
- * Applies one logged transaction to dt. For createSession and closeSession it first updates
- * the sessions map. A non-OK result from the tree is logged at debug level and ignored.
- */
- public void processTransaction(TxnHeader hdr,DataTree dt,
- Map<Long, Integer> sessions, Record txn)
- throws KeeperException.NoNodeException {
- int opType = hdr.getType();
- if (opType == OpCode.createSession) {
- sessions.put(hdr.getClientId(),
- ((CreateSessionTxn) txn).getTimeOut());
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
- "playLog --- create session in log: 0x"
- + Long.toHexString(hdr.getClientId())
- + " with timeout: "
- + ((CreateSessionTxn) txn).getTimeOut());
- }
- } else if (opType == OpCode.closeSession) {
- sessions.remove(hdr.getClientId());
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
- "playLog --- close session in log: 0x"
- + Long.toHexString(hdr.getClientId()));
- }
- }
- // Every transaction, session ones included, goes through the tree; this also gives the
- // DataTree a chance to sync its lastProcessedZxid.
- ProcessTxnResult rc = dt.processTxn(hdr, txn);
- // Snapshots are taken lazily, so transactions later than the snapshot point may already
- // be in it. Replaying them can then yield NONODE/NODEEXISTS, which is safe to ignore.
- if (rc.err != Code.OK.intValue()) {
- LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()
- + ", error: " + rc.err + ", path: " + rc.path);
- }
- }
- /**
- * Hook that restore() calls after each replayed transaction has been applied.
- */
- public interface PlayBackListener {
- void onTxnLoaded(TxnHeader hdr, Record rec);
- }
