`
农村外出务工男JAVA
  • 浏览: 105653 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

zookeeper事件监控源码解析

阅读更多

一:zk的客户端注册Watcher源码剖析
     zookeeper有很多中方式都可以传入一个watcher对象,比如exist,getData等,下面我们以getData的源码来剖析zk的client和server端是如何处理实现整个watcher机制的。
   原生的zookeeper API在获得节点数据的时候,可以通过getData方法获取,并且可以传入一个watcher对象,表示对当前节点进行监听,当节点的数据发送了改变,客户端会收到server端的通知事件告知该节点状态发生了改变。下面我们根据getData()方法来看看zk是如何进行事件监听处理的。
     在开始下面的讲解之前,先了解一下几个概念:
  1.1 Packet
     Packet是Zookeeper中用来通信的最小单元,所以任何需要网络进行传输的对象都需要包装成Packet对象。
  1.2 SendThead
     SendThread是Zookeeper中专门用来接收事件通知的线程,当服务端响应了客户端的请求后,会交给SendThread处理。
  1.3 EventThread
     EventThread是Zookeeper专门用来处理事件通知的线程,SendThread在接收到通知事件后会将事件传给EventThread进行处理。
  1.4:ServerCnxn
    ServerCnxn代表的是一个客户端和服务端的连接,客户端像服务端注册Watcher的时候,并不会真正将Watcher对象传递到服务端,而服务端也仅仅是保存了与当前客户端连接的ServerCnxn对象。
    getData()方法如下:

public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
       .....................
        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
        final String serverPath = prependChroot(clientPath);
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        ....................
        return response.getData();
    }

   (1)当客户端传入一个watcher对象的时候,如果不为null,首先会暂时将watcher包装成一个WatchRegistration,WatchRegistration是一个抽象类,里面保存了watcher对象和节点路径之间的关系。
   (2)标记request,如果watcher不为null,将该请求标记为需要使用watcher监听,而RequestHeader标记的是一些头信息。
   (3)ClientCnxn提交请求,封装成packet对象
    

  public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }

     其中重点看下queuePacket方法,它的源代码如下:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        ..............
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }

    (4) SendThread.readResponse()方法接收服务器端返回

     class SendThread extends Thread {
        ............
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
            ..............
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            + replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet );
                }
              ................
            } finally {
                finishPacket(packet);
            }
        }

   可以看到它将WatchRegistration对象封装成Packet对象,然后放到发送队列里面去,这里需要注意的是zk并不是把整个WatchRegistration对象都封装到Packet中:
   其调用ClientCnxnSocket的sendPacket方法将封装好的packet对象传入, 源码如下: 

  @Override
    void sendPacket(Packet p) throws IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        p.createBB();
        ByteBuffer pbb = p.bb;
        sock.write(pbb);
    }
    public void createBB() {
         try {
            ............
            if (requestHeader != null) {
                requestHeader.serialize(boa, "header");
            }
            if (request instanceof ConnectRequest) {
                request.serialize(boa, "connect");
                // append "am-I-allowed-to-be-readonly" flag
                boa.writeBool(readOnly, "readOnly");
            } else if (request != null) {
                request.serialize(boa, "request");
            }
            ........
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }

      源码里的finishPacket()方法会将从刚刚我们封装的packet中取出来,通过调用 p.watchRegistration.register()会将我们暂时保存的Watcher对象
   保存到ZKWatcherManager中,从这里我们也可以看到,实际上zk并没有真正把watcher对象传递到了服务端,而是会将watcher存在本地的WatcherManager中等收到服务端的响应后再从本地的ZKWatcherManager中取。
    

private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }
       ............
    }

       本例中的getData对应的是DataWatchRegistration

class DataWatchRegistration extends WatchRegistration {
        public DataWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
        }
    }

       会放到本地ZKWatcherManager的dataWatches中去,其中dataWatches是一个Map,存储了数据节点路径和Watcher的映射关系
  

 private static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        .............
    }   

   二:服务端Watcher处理
         当服务端收到了客户端的请求后,如果客户端标记了需要使用Watcher监听,服务端会触发相应的事件
     

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
       WatchedEvent e = new WatchedEvent(type,
               KeeperState.SyncConnected, path);
       HashSet<Watcher> watchers;
       synchronized (this) {
           watchers = watchTable.remove(path);
           if (watchers == null || watchers.isEmpty()) {
               if (LOG.isTraceEnabled()) {
                   ZooTrace.logTraceMessage(LOG,
                           ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                           "No watchers for " + path);
               }
               return null;
           }
           for (Watcher w : watchers) {
               HashSet<String> paths = watch2Paths.get(w);
               if (paths != null) {
                   paths.remove(path);
               }
           }
       }
       for (Watcher w : watchers) {
           if (supress != null && supress.contains(w)) {
               continue;
           }
           w.process(e);
       }
       return watchers;
   }  

        第一步:先将事件类型,路径和通知的状态封装成WatchedEvent对象
    第二步:从watchTable中获取对应的watcher
    第三步:调用process方法回调

  @Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");
    }

    在回调里面会将WatchedEvent反序列化成WatcherEvent便于网络传输,而且还会在请求头里面设置一个标记-1,代表的是通知请求。
    最后调用sendResponse()方法发送该通知。
    这样,一个服务端处理Watcher的过程就走完了,可以看到并没有涉及任何处理Wacther的真正的业务逻辑,因为这块是在客户端执行的。

 

三:客户端回调
       当服务端触发了watcher后,服务端使用ServerCnxn对应的TCP连接像客户端发送了一个WatcherEvent事件,当客户端收到后会进行下面的处理:
    前面我们已经讲的,客户端的SendThread线程用来接收事件的通知

class SendThread extends Thread {
        private long lastPingSentNs;
        private final ClientCnxnSocket clientCnxnSocket;
        private Random r = new Random(System.nanoTime());       
        private boolean isFirstConnect = true;
        //用来接收服务端的事件
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket              
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;                   
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
                            Watcher.Event.KeeperState.AuthFailed, null) );                                       
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                //反序列化
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }
                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                eventThread.queueEvent( we );
                return;
            }
         }
    }

     对应请求头标记为-1的事件,首先要将字节流转为WatcherEvent对象,然后还原成WatchedEvent,最后将WatchedEvent传递给EventThread线程进行处理:

  public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();

            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }
     }

   EventThread线程首先会从ZKWatcherManager中取出相应的Watcher

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            Set<Watcher> result = new HashSet<Watcher>();
            switch (type) {
            case None:
             //......................
                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }
                return result;
            case NodeDataChanged:
            //..............
            case NodeCreated:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            }

            return result;
        }
    }

       从客户端回调的方法中可以看到,客户端判断出事件类型后,会从相应的存储Watcher的Map中remove掉,也就是Watcher监听是一次性的。
    当获取了Watcher后,将其放入waitingEvents队列里,这是一个基于链表的有界队列。当队列里面有Watcher的时候,SendThread线程的run()方法则不断从队列里面取数据进行处理。

  @Override
   public void run() {
      try {
         isRunning = true;
         while (true) {
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
               wasKilled = true;
            } else {
               processEvent(event);
            }
            if (wasKilled)
               synchronized (waitingEvents) {
                  if (waitingEvents.isEmpty()) {
                     isRunning = false;
                     break;
                  }
               }
         }
      } catch (InterruptedException e) {
         LOG.error("Event thread exiting due to interruption", e);
      }
       LOG.info("EventThread shut down");
   }

     这个地方是一个串行同步的处理方式,所以不能因为某一个watcher的长时间处理而影响了客户端回调队列里其它的Watcher。 

分享到:
评论

相关推荐

    zookeeper 服务监控和管理

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将简单易用的接口和性能高效、功能稳定的系统提供给用户。...

    zookeeper3.4.5:原始解析-源码解析

    《Zookeeper 3.4.5:原始解析与源码剖析》 Zookeeper,作为一款分布式协调服务,广泛应用于分布式系统中,为集群提供统一命名服务、配置管理、分布式锁和组服务等核心功能。本解析主要针对Zookeeper 3.4.5版本,...

    ZooKeeper源码

    五、ZooKeeper监控与故障恢复 ZooKeeper提供了监控接口,允许用户监控服务器状态、连接状态等。当节点出现故障时,可以通过监控信息快速定位问题并进行恢复。 六、ZooKeeper的应用场景 1. 分布式锁:利用临时节点...

    dubbo源码解析2

    ### Dubbo源码解析2 #### 一、源码阅读路径 在开始深入解析Dubbo源码之前,首先需要明确的是,Dubbo虽然代码量不算庞大,但是它涉及的技术领域非常广泛,对于初学者来说,可能需要具备一定的前置知识才能更好地...

    dubbo+zookeeper案例,dubbo和Zookeeper详解,Java源码.zip

    《Dubbo与Zookeeper深度解析:Java源码实践》 在现代分布式系统中,服务治理是不可或缺的一部分。Dubbo和Zookeeper作为两个重要的组件,分别承担了服务治理和注册中心的角色,它们共同构建了高效、稳定的微服务体系...

    【Zookeeper管理工具】

    通过阅读源码,我们可以了解到如何与Zookeeper服务器通信,如何解析和展示数据,以及如何实现图形化操作的逻辑。这对于理解和开发类似的分布式协调服务管理工具具有很大的价值。 总结来说,Zookeeper管理工具是管理...

    Zookeeper_安装和配置

    - **集群管理**:Zookeeper 可以监控集群中节点的状态,当节点发生变化时,可以通过事件通知其他节点,实现集群的动态扩展和故障恢复。 ### Zookeeper 集群模式 在实际生产环境中,Zookeeper 通常运行在真正的集群...

    zookeeper限制ip版

    本篇文章将深入探讨如何在Zookeeper 3.4.14版本中添加IP黑白名单功能,并详细解析源码改造过程。 1. **IP限制需求分析** 在默认情况下,Zookeeper允许任何IP地址进行连接。为了提高系统的安全性,我们需要实现对...

    笔记_zookeeper_源码.zip

    《深入解析Zookeeper源码》 Zookeeper是一个分布式协调服务,广泛应用于分布式系统中,如Hadoop、HBase、Kafka等。它提供了一种可靠的方式来管理分布式环境中的配置信息、命名服务、集群状态同步以及分布式锁等功能...

    zookeeper-3.4.6.tar

    《Zookeeper 3.4.6:分布式协调服务的核心解析》 Zookeeper,作为一个高度可靠的分布式协调服务,是Apache Hadoop项目的重要组成部分。在版本3.4.6中,它继续为分布式应用程序提供了稳定和高效的服务。这个版本的...

    【No0057】HBase源码解析与开发实战.txt

    ### HBase源码解析与开发实战 #### 一、HBase简介 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了...

    zookeeper-3.4.9

    《Zookeeper 3.4.9:分布式协调服务的核心解析》 Zookeeper,这个在分布式系统领域中举足轻重的名字,是Apache软件基金会的一个关键项目,它为分布式应用程序提供了高效且可靠的协调服务。Zookeeper 3.4.9版本是其...

    dubbo源码解析

    《Dubbo源码解析》是一本深度探讨Dubbo框架核心机制的专业资料,旨在帮助开发者深入理解这个高性能、轻量级的Java开源RPC框架。Dubbo是阿里巴巴贡献的著名微服务框架,广泛应用于大型互联网公司的服务治理。随着...

    prettyZoo,zookeeper window客户端

    描述中提到,"快速查看zookeeper内容" 指的是prettyZoo的主要功能之一,即能迅速地浏览Zookeeper服务器上的节点信息,包括节点的数据、子节点等,这对于监控和调试Zookeeper服务非常有用。"git 地址 ...

    zookeeper-3.4.7

    《Zookeeper-3.4.7:分布式协调服务的核心解析》 Zookeeper是Apache软件基金会的一个开源项目,作为分布式协调服务,它在分布式系统中扮演着至关重要的角色。Zookeeper-3.4.7是该项目的一个稳定版本,包含了丰富的...

    dubbo+zookeeper案例,dubbo和Zookeeper详解,Java

    《Dubbo与Zookeeper在分布式环境中的应用解析》 Dubbo和Zookeeper是两个在分布式系统中广泛应用的技术,它们在构建高效、可扩展的服务架构中起着关键作用。本篇文章将深入探讨这两个技术以及如何结合使用,以实现一...

    zookeeper-3.4.14.zip

    《Zookeeper 3.4.14:Dubbo 集成与配置解析》 Apache ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将...

    zookeeper3.5

    《Zookeeper 3.5深度解析》 Zookeeper,作为分布式协调服务的杰出代表,是Apache软件基金会的重要项目之一。本文将围绕Zookeeper 3.5版本进行深入探讨,揭示其核心特性、设计理念以及在实际应用中的关键作用。 1. ...

Global site tag (gtag.js) - Google Analytics