Zookeeper Demystified, Part 1: Server Startup
The famous Zookeeper is a powerful tool for solving distributed-systems problems. I recently took a quick pass through its code and would like to share what I found. If anything here is wrong, please point it out.
This topic will be covered as a multi-part series, since there are too many details to fit into a single article. For Zookeeper use cases, see http://rdc.taobao.com/team/jm/archives/1232; for API usage, see the official manual at http://zookeeper.apache.org/doc/trunk/. The code discussed here is the latest release at the time of writing, zookeeper 3.4.5.
This first installment walks through how the zookeeper server starts up, using standalone mode as the example; the distributed (quorum) mode will be analyzed separately in later posts.
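For orientation, a minimal standalone zoo.cfg looks something like this (the values and paths below are illustrative, not taken from the post):

```properties
# Basic time unit in milliseconds; session timeouts are multiples of this
tickTime=2000
# Directory for snapshot files (and txn logs too, if dataLogDir is unset)
dataDir=/var/lib/zookeeper/data
# Optional separate directory for transaction logs
dataLogDir=/var/lib/zookeeper/log
# Port that clients connect to
clientPort=2181
```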
The standalone startup class is ZooKeeperServerMain:
```java
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    // parse the zoo.cfg config file
    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    // start up
    runFromConfig(config);
}
```
The parse step:
```java
public void parse(String path) throws ConfigException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    config.parse(path);

    // let qpconfig parse the file and then pull the stuff we are
    // interested in
    readFrom(config);
}
```
Then the startup:
```java
public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        ZooKeeperServer zkServer = new ZooKeeperServer();

        // two sets of files: the txn log files and the snapshot (data) files
        FileTxnSnapLog ftxn = new FileTxnSnapLog(new File(config.dataLogDir),
                new File(config.dataDir));
        zkServer.setTxnLogFactory(ftxn);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);

        // connection factory; NIOServerCnxnFactory by default
        cnxnFactory = ServerCnxnFactory.createFactory();
        // initialize the main thread, open the selector, bind the port,
        // and enable NIO ACCEPT notification
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        // start the main I/O thread, restore the in-memory database and
        // session structures from the snapshot and log files, and write
        // out a fresh snapshot
        cnxnFactory.startup(zkServer);
        // wait here for the main thread started above to finish
        cnxnFactory.join();
        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    }
}
```
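Since both ServerConfig.parse and runFromConfig are public, the same startup path can be driven programmatically to embed a standalone server. A minimal sketch, assuming a zoo.cfg at an illustrative path:

```java
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;

public class EmbeddedZk {
    public static void main(String[] args) throws Exception {
        ServerConfig config = new ServerConfig();
        config.parse("conf/zoo.cfg");                    // illustrative path
        new ZooKeeperServerMain().runFromConfig(config); // blocks until shutdown
    }
}
```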
The startup flow:
```java
public void startup(ZooKeeperServer zks) throws IOException,
        InterruptedException {
    // start the main I/O thread
    start();
    // restore the database and sessions from the log and snapshot files,
    // then write out a fresh snapshot
    zks.startdata();
    // start the sessionTracker thread, set up the chain of request
    // processors, and start each processor thread
    zks.startup();
    setZooKeeperServer(zks);
}
```
The recovery entry point:
```java
public void startdata() throws IOException, InterruptedException {
    // check to see if zkDb is not null
    if (zkDb == null) {
        // initialize the database
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    if (!zkDb.isInitialized()) {
        loadData();
    }
}
```
DataTree is backed by a Map whose key is the node path and whose value is a DataNode; each DataNode has a parent reference pointing to its parent node and a children set pointing to all of its children. A concrete sketch of this structure follows the constructor below.
```java
public DataTree() {
    /* Rather than fight it, let root have an alias */
    // initialize the three system nodes: '/', '/zookeeper' and '/zookeeper/quota'
    nodes.put("", root);
    nodes.put(rootZookeeper, root);

    /** add the proc node and quota node */
    root.addChild(procChildZookeeper);
    nodes.put(procZookeeper, procDataNode);

    procDataNode.addChild(quotaChildZookeeper);
    nodes.put(quotaZookeeper, quotaDataNode);
}
```
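To make the structure concrete, here is a toy sketch of the map-of-paths idea; SimpleDataTree and SimpleDataNode are hypothetical simplifications, not the real ZooKeeper classes (the real DataNode also keeps a parent reference):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification of DataTree: the full path is the map key,
// and each node stores only the *names* of its children.
class SimpleDataNode {
    byte[] data;
    final Set<String> children = new HashSet<>();
}

class SimpleDataTree {
    private final ConcurrentHashMap<String, SimpleDataNode> nodes =
            new ConcurrentHashMap<>();

    SimpleDataTree() {
        nodes.put("/", new SimpleDataNode()); // root
    }

    void createNode(String path, byte[] data) {
        int idx = path.lastIndexOf('/');
        String parentPath = (idx == 0) ? "/" : path.substring(0, idx);
        SimpleDataNode parent = nodes.get(parentPath);
        if (parent == null) {
            throw new IllegalStateException("no parent: " + parentPath);
        }
        SimpleDataNode node = new SimpleDataNode();
        node.data = data;
        parent.children.add(path.substring(idx + 1)); // parent -> child by name
        nodes.put(path, node);                        // O(1) lookup by full path
    }

    SimpleDataNode get(String path) {
        return nodes.get(path);
    }
}
```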
Now the actual data recovery:
```java
public void loadData() throws IOException, InterruptedException {
    // run the recovery and record the latest transaction id
    setZxid(zkDb.loadDataBase());

    // Clean up dead sessions
    LinkedList<Long> deadSessions = new LinkedList<Long>();
    for (Long session : zkDb.getSessions()) {
        if (zkDb.getSessionWithTimeOuts().get(session) == null) {
            deadSessions.add(session);
        }
    }
    zkDb.setDataTreeInit(true);
    for (long session : deadSessions) {
        // XXX: Is lastProcessedZxid really the best thing to use?
        killSession(session, zkDb.getDataTreeLastProcessedZxid());
    }

    // generate a fresh snapshot file
    // Make a clean snapshot
    takeSnapshot();
}
```
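The dead-session check is a simple set difference: any session id the recovered database knows about that has no entry in the timeout map is dead. A toy illustration with made-up session ids:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DeadSessionDemo {
    public static void main(String[] args) {
        // all session ids known to the recovered database (hypothetical data)
        Set<Long> allSessions = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        // only sessions whose createSession txn was replayed carry a timeout
        Map<Long, Integer> sessionsWithTimeouts = new HashMap<>();
        sessionsWithTimeouts.put(1L, 30000);
        sessionsWithTimeouts.put(3L, 30000);

        // a session without a timeout entry is dead and must be killed
        List<Long> deadSessions = new ArrayList<>();
        for (long s : allSessions) {
            if (!sessionsWithTimeouts.containsKey(s)) {
                deadSessions.add(s);
            }
        }
        System.out.println(deadSessions); // prints [2]
    }
}
```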
The loadDataBase step:
```java
public long loadDataBase() throws IOException {
    // during the load, replayed transactions are re-added as committed
    // proposals; for the standalone server this can be ignored for now
    PlayBackListener listener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn) {
            Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(),
                    null, null);
            r.txn = txn;
            r.hdr = hdr;
            r.zxid = hdr.getZxid();
            addCommittedProposal(r);
        }
    };

    // load the data
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, listener);
    initialized = true;
    return zxid;
}
```
The restore step:
```java
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // recover from the FileSnap first
    snapLog.deserialize(dt, sessions);

    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;

    // starting from the first transaction after the one recorded in the
    // snapshot, merge the transactions in the log into the datatree
    while (true) {
        // iterator points to the first valid txn when initialized
        hdr = itr.getHeader();
        if (hdr == null) {
            // empty logs
            return dt.lastProcessedZxid;
        }
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr, dt, sessions, itr.getTxn());
        } catch (KeeperException.NoNodeException e) {
            throw new IOException("Failed to process transaction type: "
                    + hdr.getType() + " error: " + e.getMessage(), e);
        }
        listener.onTxnLoaded(hdr, itr.getTxn());
        if (!itr.next())
            break;
    }
    return highestZxid;
}
```
The FileSnap recovery:
```java
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should give up
    // find up to 100 snapshot files, sorted descending: the newest file first
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }

    // start from the newest file; if it deserializes cleanly and the
    // checksum matches, recovery is done
    File snap = null;
    boolean foundValid = false;
    for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
            LOG.info("Reading snapshot " + snap);
            snapIS = new BufferedInputStream(new FileInputStream(snap));
            crcIn = new CheckedInputStream(snapIS, new Adler32());
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            deserialize(dt, sessions, ia);
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot : " + snap);
            }
            foundValid = true;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        } finally {
            if (snapIS != null)
                snapIS.close();
            if (crcIn != null)
                crcIn.close();
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }

    // the snapshot file name itself records the latest zxid
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
    return dt.lastProcessedZxid;
}
```
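The checksum handling is plain java.util.zip: the payload is read through a CheckedInputStream wrapping an Adler32, and the running checksum is compared against the stored trailing value. A self-contained sketch of the same pattern, assuming a made-up "payload then trailing long" layout rather than ZooKeeper's real snapshot format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

public class ChecksumDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = "snapshot-bytes".getBytes(StandardCharsets.UTF_8);

        // Write: the payload through the checksum, then the checksum
        // itself directly, as a trailing long.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        CheckedOutputStream cout = new CheckedOutputStream(buf, new Adler32());
        cout.write(payload);
        new DataOutputStream(buf).writeLong(cout.getChecksum().getValue());

        // Read: consume the payload through a CheckedInputStream, capture
        // the running checksum, then compare it with the stored long.
        CheckedInputStream cin = new CheckedInputStream(
                new ByteArrayInputStream(buf.toByteArray()), new Adler32());
        byte[] read = new byte[payload.length];
        new DataInputStream(cin).readFully(read);
        long computed = cin.getChecksum().getValue(); // before reading the long
        long stored = new DataInputStream(cin).readLong();
        if (stored != computed) {
            throw new IOException("CRC corruption, stored=" + stored
                    + " computed=" + computed);
        }
        System.out.println("checksum ok: " + computed);
    }
}
```

Note that the checksum value is captured before the trailing long is read, which mirrors the ordering in deserialize above.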
Processing a single transaction:
```java
public void processTransaction(TxnHeader hdr, DataTree dt,
        Map<Long, Integer> sessions, Record txn)
        throws KeeperException.NoNodeException {
    ProcessTxnResult rc;
    switch (hdr.getType()) {
    // session creation
    case OpCode.createSession:
        sessions.put(hdr.getClientId(),
                ((CreateSessionTxn) txn).getTimeOut());
        // ......
        // give dataTree a chance to sync its lastProcessedZxid
        rc = dt.processTxn(hdr, txn);
        break;
    case OpCode.closeSession:
        sessions.remove(hdr.getClientId());
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "playLog --- close session in log: 0x"
                            + Long.toHexString(hdr.getClientId()));
        }
        rc = dt.processTxn(hdr, txn);
        break;
    default:
        rc = dt.processTxn(hdr, txn);
    }
    // ......
}
```
How DataTree applies a single transaction:
```java
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
    ProcessTxnResult rc = new ProcessTxnResult();
    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(
                    createTxn.getPath(),
                    createTxn.getData(),
                    createTxn.getAcl(),
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(),
                    header.getZxid(), header.getTime());
            break;
        case OpCode.delete:
            DeleteTxn deleteTxn = (DeleteTxn) txn;
            rc.path = deleteTxn.getPath();
            deleteNode(deleteTxn.getPath(), header.getZxid());
            break;
        case OpCode.setData:
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(),
                    setDataTxn.getVersion(), header.getZxid(),
                    header.getTime());
            break;
        // ...... (remaining op codes and error handling elided)
        }
    // ......

    /*
     * A snapshot might be in progress while we are modifying the data
     * tree. If we set lastProcessedZxid prior to making corresponding
     * change to the tree, then the zxid associated with the snapshot
     * file will be ahead of its contents. Thus, while restoring from
     * the snapshot, the restore method will not apply the transaction
     * for zxid associated with the snapshot file, since the restore
     * method assumes that transaction to be present in the snapshot.
     *
     * To avoid this, we first apply the transaction and then modify
     * lastProcessedZxid. During restore, we correctly handle the
     * case where the snapshot contains data ahead of the zxid associated
     * with the file.
     */
    // Apply the transaction first, then advance lastProcessedZxid. If the
    // order were reversed and the async snapshot thread flushed the datatree
    // right after the zxid was bumped but before the transaction was applied,
    // the snapshot's zxid would be newer than its contents; since restore
    // resumes from the recorded zxid + 1, that transaction would be lost.
    if (rc.zxid > lastProcessedZxid) {
        lastProcessedZxid = rc.zxid;
    }
    // ......
    return rc;
}
```
That completes the server's data recovery process, which is the essence of LSM (log-structured merge): a snapshot plus a replayable log.
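Boiled down, the pattern is "load the newest valid snapshot, then replay every log entry newer than it". A toy sketch with hypothetical Snapshot and LogEntry types (not ZooKeeper's):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of snapshot-plus-log recovery; Snapshot and LogEntry are
// hypothetical stand-ins, not ZooKeeper classes.
class Recovery {
    static class Snapshot {
        final long zxid;
        final Map<String, String> state;
        Snapshot(long zxid, Map<String, String> state) {
            this.zxid = zxid;
            this.state = state;
        }
    }

    static class LogEntry {
        final long zxid;
        final String key, value;
        LogEntry(long zxid, String key, String value) {
            this.zxid = zxid;
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, String> recover(Snapshot snap, List<LogEntry> log) {
        Map<String, String> tree = new TreeMap<String, String>(snap.state);
        long highestZxid = snap.zxid;
        for (LogEntry e : log) {
            if (e.zxid <= highestZxid) continue; // already in the snapshot
            tree.put(e.key, e.value);            // apply the transaction first...
            highestZxid = e.zxid;                // ...then advance the zxid
        }
        return tree;
    }
}
```

Note the apply-then-advance order inside the loop; it mirrors the lastProcessedZxid discussion in processTxn above.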
Next, the server starts the sessionTracker thread and the request processing chain:
```java
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
```
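The chain wired up here is a classic chain of responsibility: PrepRequestProcessor validates and prepares, SyncRequestProcessor logs to disk, FinalRequestProcessor applies the change and replies. A minimal sketch of the pattern with a hypothetical Stage interface (not the real RequestProcessor API):

```java
// Each stage does its piece of work and hands the request to the next.
interface Stage {
    void process(String request);
}

class PrepStage implements Stage {
    private final Stage next;
    PrepStage(Stage next) { this.next = next; }
    public void process(String request) {
        // validate the request and turn it into a transaction, then pass on
        next.process("prepped(" + request + ")");
    }
}

class SyncStage implements Stage {
    private final Stage next;
    SyncStage(Stage next) { this.next = next; }
    public void process(String request) {
        // persist to the transaction log before the change takes effect
        next.process("logged(" + request + ")");
    }
}

class FinalStage implements Stage {
    public void process(String request) {
        // apply to the in-memory tree and answer the client
        System.out.println("applied " + request);
    }
}

class PipelineDemo {
    public static void main(String[] args) {
        // same wiring order as setupRequestProcessors(): Prep -> Sync -> Final
        Stage first = new PrepStage(new SyncStage(new FinalStage()));
        first.process("create /foo");
    }
}
```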
The core I/O thread:
```java
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            // the select step
            selector.select(1000);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                    selected);
            // shuffle so no connection is consistently favored
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) {
                // a new connection has arrived: accept it
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k
                            .channel()).accept();
                    InetAddress ia = sc.socket().getInetAddress();
                    int cnxncount = getClientCnxnCount(ia);
                    // check whether this client exceeds the per-client
                    // connection limit
                    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                        LOG.warn("Too many connections from " + ia
                                + " - max is " + maxClientCnxns);
                        sc.close();
                    } else {
                        LOG.info("Accepted socket connection from "
                                + sc.socket().getRemoteSocketAddress());
                        // non-blocking mode
                        sc.configureBlocking(false);
                        // listen for read events
                        SelectionKey sk = sc.register(selector,
                                SelectionKey.OP_READ);
                        // create the internal connection object
                        NIOServerCnxn cnxn = createConnection(sc, sk);
                        sk.attach(cnxn);
                        // add it to the connection table for bookkeeping
                        addCnxn(cnxn);
                    }
                }
                // read and write events are handled here
                else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select " + k.readyOps());
                    }
                }
            }
            // get ready for the next round of I/O
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}
```
The details of the I/O handling will be covered in a later post, together with concrete examples.
At this point the server has fully started and is waiting for clients to connect. The core job of server startup is restoring the datatree from the snapshot and log files, and the key to that is the zxid: a classic LSM-style design.
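As a small illustration of how the zxid rides along in file names: snapshot files carry the hex-encoded last zxid as a suffix (for example snapshot.1b), which is what Util.getZxidFromName extracts during recovery. A hedged re-implementation of that idea, not the actual utility:

```java
public class ZxidFromName {
    // Parse "snapshot.<hex zxid>" style names; returns -1 on mismatch,
    // matching the convention used in the deserialize code above.
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length == 2 && parts[0].equals(prefix)) {
            try {
                return Long.parseLong(parts[1], 16); // the zxid is hex-encoded
            } catch (NumberFormatException e) {
                // fall through to the error value
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println(zxidFromName("snapshot.1b", "snapshot")); // 27
    }
}
```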
Comments

#2 victory_gwb (2016-02-25): In the 3.4.6 version I'm reading, takeSnapshot() is referenced in SyncRequestProcessor, not in loadData().

#1 迪伦少校 (2015-10-26): Such a good article and nobody has upvoted it yet; here's a +1! I've also been studying zookeeper recently and hope we can exchange notes!