`

Zookeeper故障总结

 
阅读更多

背景:

10月29号,系统切换的时候,发生了master信息的变更,有个监听master信息的系统未收到zookeeper的通知。

 

分析:

原代码如下所示, 监听系统中用以下代码来监听path中数据的变更。当数据发生变更的时候,会回调process方法,然后处理相应的业务。(我们使用的是curator的jar包)

		client = CuratorFrameworkFactory.newClient("*:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();
		try {
			client.getData().usingWatcher(this).inBackground().forPath("/id/master");
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

 问题点: 

    以上代码再第一次发生变更的时候,可以接收到zookeeper的变更通知,但是在zookeeper server会将watcher删除,在以后的变更中将不会继续通知。

 

源码分析:

   当调用forPath函数时将会调用GetDataBuilderImpl的forPath方法,这个方法中会异步处理操作

  

    @Override
    public byte[] forPath(String path) throws Exception
    {
        path = client.fixForNamespace(path);

        byte[]      responseData = null;
        if ( backgrounding.inBackground() )
        {
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
        }
        else
        {
            responseData = pathInForeground(path);
        }
        return responseData;
    }

    

    异步调用GetDataBuilderImpl的performBackgroundOperation方法。然后会调用appcahe原生zookeeper的jar包。

    @Override
    public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
    {
        if ( watching.isWatched() )
        {
            client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
        }
    }

    然后会调用zookeeper.java中的getData函数,发送请求到zookeeper server. setWatch为true

 public void getData(final String path, Watcher watcher,
            DataCallback cb, Object ctx)
    {
        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                clientPath, serverPath, ctx, wcb);
    }

    zookeeper server接收到客户端的getData请求时,会调用FinalRequestProcessor类中的以下代码处理

case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest); 
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
                rsp = new GetDataResponse(b, stat);
                break;
            }

    接下去会调用DataTree中的getData函数,这个函数中会将监听的节点加入到WatcherManager中,这样我们就添加了对某个路径的监听

    

public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path); 
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);
            }
            return n.data;
        }
    }

    当另一个客户端调用setData方法时,Zookeeper客户端将会调用DataTree类中的setData方法。这个方法将会变更数据,然后调用triggerWatch方法,通知变更。

 public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path); 
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

    然后调用WatcherManager中的triggerWatch方法,方法中将会删除监听。然后调用watcher的process方法

 public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {                 
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }

    调用NIOServerCnxn类中的process方法,发送通知到客户端。

    

 synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");
    }

   

     客户端接收到notification时,会调用ClientCnxn类的下面代码处理。queueEvent方法中会调用watcher.materialize方法确定watcherEvent的处理类

     

            if (replyHdr.getXid() == -1) {                 
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response"); 
                WatchedEvent we = new WatchedEvent(event); 
                eventThread.queueEvent( we );
                return;
            }

          public void queueEvent(WatchedEvent event) {
            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }

     调用zookeeper的materialize方法获取watcher对象,并且重watcher列表中删除。

       @Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {
            case None:
                result.add(defaultWatcher);
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;

                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }

                return result;
        }

     总结:

     从上面源码,我们可以看出,zookeeper的每次只能监听一次变更。

 

      解决: 

      curator为我们封装了重复监听的类NodeCache.java. 它是怎么实现的呢?

NodeCache nc = new NodeCache(zkclient.getClient(), path);
nc.start(true);

     start方法会获取初始值,然后调用reset方法

public void     start(boolean buildInitial) throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        ensurePath.ensure(client.getZookeeperClient());

        client.getConnectionStateListenable().addListener(connectionStateListener);

        if ( buildInitial )
        {
            internalRebuild();
        }
        reset();
    }

    reset方法会调用zookeeper的forpath方法,并用包装的watcher类

private void     reset() throws Exception
    {
        if ( (state.get() == State.STARTED) && isConnected.get() )
        {
            client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
        }
    }

    当配置发生变化时,watcher类中会重新调用reset方法,再次监听

    private final CuratorWatcher watcher = new CuratorWatcher()
    {
        @Override
        public void process(WatchedEvent event) throws Exception
        {
            reset();
        }
    };

 

 

 

分享到:
评论

相关推荐

    zookeeper 系列整理总结

    ZooKeeper通过 zab(ZooKeeper Atomic Broadcast)协议保证了强一致性,而Leader选举则是ZooKeeper集群中实现高可用性的关键过程,确保即使有服务器故障,服务仍能继续运行。 总的来说,ZooKeeper是一个强大的工具...

    zookeeper使用总结

    ### Zookeeper 使用总结 #### ZOOKEEPER 概述 - **Zookeeper 介绍** Zookeeper 是一个分布式协调服务框架,旨在简化分布式应用程序的开发。它提供了一个高性能的协同工作系统,使得开发者能够专注于应用程序的...

    Zookeeper_安装和配置

    - **集群管理**:Zookeeper 可以监控集群中节点的状态,当节点发生变化时,可以通过事件通知其他节点,实现集群的动态扩展和故障恢复。 ### Zookeeper 集群模式 在实际生产环境中,Zookeeper 通常运行在真正的集群...

    zookeeper3.6.0-linux版本

    总结,ZooKeeper是分布式系统中不可或缺的组件,它的稳定性和高性能使其在大数据、云计算等领域广泛应用。理解并熟练掌握ZooKeeper的原理和使用,对于Linux运维人员来说至关重要,能够帮助构建更加可靠的分布式服务...

    linux中zookeeper安装包zookeeper-3.4.8.tar

    总结来说,Zookeeper-3.4.8是一个强大的分布式协调服务,其安装包中的`zookeeper-3.4.8.jar`和`slf4j-api-1.6.1.jar`是运行和管理Zookeeper所必需的。通过理解和掌握Zookeeper的工作原理及其配置,开发者可以有效地...

    zookeeper-3.4.6.rar

    在使用ZooKeeper时,需要注意一些最佳实践,例如,为了保证高可用性,通常会部署一个由多个节点组成的ZooKeeper集群,每个节点都应配置奇数个,以防止因节点故障导致半数以上节点不可用的情况。此外,合理规划ZNode...

    zookeeper解压版安装包

    总结,Zookeeper 3.4.6作为一款强大的分布式协调服务,对于搭建Dubbo这样的分布式应用至关重要。了解其安装、配置、集群搭建以及与Dubbo的集成方法,是成功构建和维护分布式系统的关键步骤。通过深入理解Zookeeper的...

    zookeeper-3.4.14.zip

    总结来说,Zookeeper在Kafka中的作用不可忽视,它是Kafka实现高效、可靠、可扩展的关键因素。通过理解Zookeeper的工作原理,我们可以更好地优化Kafka集群的配置和管理,提高系统的稳定性和性能。在实际操作中,我们...

    zookeeper-3.4.6.tar

    同时,Zookeeper与Dubbo的结合,使得服务治理变得更加简便,例如服务注册与发现、负载均衡、故障转移等功能,大大提升了分布式服务的可扩展性和可靠性。 在Zookeeper 3.4.6的压缩包`zookeeper-3.4.6.tar`中,包含了...

    zookeeper-3.4.12--.rar

    总结,Zookeeper 3.4.12 是一个强大的分布式协调框架,它为开发者提供了构建高可用分布式系统的基础。了解并熟练掌握其核心概念和用法,对于构建可扩展、容错的分布式应用至关重要。通过深入学习和实践,我们可以更...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践&zookeeper;-3.4.6总结

    例如,在Hadoop中,ZooKeeper用于管理NameNode的元数据,防止单点故障;在Spark中,它负责任务调度和集群管理;在HBase中,它帮助管理RegionServer的状态和分配。 总的来说,从Paxos到ZooKeeper的演进体现了分布式...

    zookeeper客户端 图形化界面

    总结,Zookeeper客户端图形化界面为Zookeeper的管理和维护提供了便利,通过直观的图形展示和便捷的操作,使得Zookeeper的管理变得更加高效和人性化。无论是开发人员还是运维人员,都能从中受益,提升工作效率。在...

    dubbo2.6.0 + Zookeeper3.4.9 + Zookeeper3.8.0 + Zookeeper3.7.1

    Dubbo的优秀服务治理功能和Zookeeper的协调能力共同确保了系统的高效运行和故障恢复。在使用过程中,根据项目需求和兼容性选择合适的组件版本是关键,同时保持对最新技术趋势的关注也是必要的,以便及时利用新功能和...

    zookeeper3.4.5.rar

    这种设计使得Zookeeper具有高可用性,即使部分服务器故障,系统仍能正常运行。 三、Zookeeper数据模型 Zookeeper的数据模型是一个层次化的命名空间,类似于文件系统的目录结构,称为ZNode。每个ZNode都可以存储数据...

    zookeeper3.4.13

    总结,Zookeeper 3.4.13是分布式系统中不可或缺的工具,其强大的功能和稳定的性能使得它在云计算、大数据等领域广泛应用。理解和掌握Zookeeper的使用,对于构建和维护大规模分布式系统具有重要意义。

    Zookeeper中文开发指南

    **Zookeeper中文开发指南** ...总结来说,“Zookeeper中文开发指南”是一本全面介绍Zookeeper的参考资料,它将帮助开发者深入理解Zookeeper的原理、使用方法和最佳实践,对于构建和管理分布式系统具有很高的参考价值。

    zookeeper for windows 3.4.8

    总结,Zookeeper for Windows 3.4.8虽然提供了在Windows环境下的运行可能,但考虑到其性能和稳定性,推荐在生产环境中使用Linux版本。通过理解和掌握Zookeeper的基本概念、安装配置以及使用方法,开发者能够更好地在...

Global site tag (gtag.js) - Google Analytics