Zookeeper In Depth, Part 2: Session Establishment


In the previous post I covered how the ZooKeeper server starts up. This post looks at how the client and the server establish a session, using the DataMonitor example from the official documentation. By following session establishment end to end, you get a fairly complete picture of how the client and the server process requests and how they talk to each other.

The DataMonitor example code from the official documentation:

Executor

 

public class Executor implements Watcher, Runnable,
		DataMonitor.DataMonitorListener {
	String znode;

	DataMonitor dm;

	ZooKeeper zk;

	String filename;

	String exec[];

	Process child;

        //Executor is itself a Watcher, but it delegates all processing to DataMonitor
	public Executor(String hostPort, String znode, String filename,
			String exec[]) throws KeeperException, IOException {
		this.filename = filename;
		this.exec = exec;
                //Initialize the ZooKeeper client. This establishes the connection, creates the session and starts the client-side SendThread, all asynchronously
		zk = new ZooKeeper(hostPort, 3000, this);
                //DataMonitor is the class that does the real work
		dm = new DataMonitor(zk, znode, null, this);
	}
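
 For context, the example is typically launched from a main method roughly like the following (taken in spirit from the official example; the argument handling here is illustrative):

	public static void main(String[] args) {
		if (args.length < 4) {
			System.err.println("USAGE: Executor hostPort znode filename program [args ...]");
			System.exit(2);
		}
		String hostPort = args[0];
		String znode = args[1];
		String filename = args[2];
		String exec[] = new String[args.length - 3];
		System.arraycopy(args, 3, exec, 0, exec.length);
		try {
			//run() blocks until DataMonitor reports that the session is dead (expired or no auth)
			new Executor(hostPort, znode, filename, exec).run();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}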

 

 DataMonitor

 

public class DataMonitor implements Watcher, StatCallback {

.......

	public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
			DataMonitorListener listener) {
......
		// Get things started by checking if the node exists. We are going
		// to be completely event driven: an asynchronous exists() call that registers the watcher and sets the callback
		zk.exists(znode, true, this, null);
	}

......
	//Handle watcher notification events
	public void process(WatchedEvent event) {
		String path = event.getPath();
		if (event.getType() == Event.EventType.None) {
			//Connection state changes are handled here (elided)
			......
		} else {
			//An event registered by exists() fired (NodeCreated/NodeDeleted/NodeDataChanged); re-register the
			//watcher (watches are one-shot) and let the asynchronous callback do the business-level handling
			if (path != null && path.equals(znode)) {
				// Something has changed on the node, let's find out
				zk.exists(znode, true, this, null);
			}
		}
		if (chainedWatcher != null) {
			chainedWatcher.process(event);
		}
	}
	//Handle the callback result of the exists() call
	public void processResult(int rc, String path, Object ctx, Stat stat) {
		boolean exists;
		switch (rc) {
		case Code.Ok:
			exists = true;
			break;
		case Code.NoNode:
			exists = false;
			break;
		case Code.SessionExpired:
		case Code.NoAuth:
			dead = true;
			listener.closing(rc);
			return;
		default:
			// Retry errors
			zk.exists(znode, true, this, null);
			return;
		}
		//If the node exists, fetch its data synchronously
		byte b[] = null;
		if (exists) {
			try {
				b = zk.getData(znode, false, null);
			} catch (KeeperException e) {
				// We don't need to worry about recovering now. The watch
				// callbacks will kick off any exception handling
				e.printStackTrace();
			} catch (InterruptedException e) {
				return;
			}
		}
		//If the data has changed, handle it
		if ((b == null && b != prevData)
				|| (b != null && !Arrays.equals(prevData, b))) {
			listener.exists(b);
			prevData = b;
		}
	}
}

  Starting from this example, let's analyze the first step: how the zookeeper session gets established. This happens mainly in the constructor of the ZooKeeper class.

The ZooKeeper constructor

 

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
	//Set the default watcher
        watchManager.defaultWatcher = watcher;
	
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
	//Parse the configured server list into serverAddresses; note the list is shuffled, so the server order is randomized
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
	//Create the client connection object and initialize SendThread and EventThread
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
	//Start SendThread and EventThread
        cnxn.start();
    }

 Initializing the connection

 

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
	//Client-side sessionId
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
	//Session timeout requested by the client
        this.sessionTimeout = sessionTimeout;
	//Server list
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
	//Connect timeout
        connectTimeout = sessionTimeout / hostProvider.size();
	//Read timeout
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
	//Initialize the client's two core threads: SendThread is the client's I/O thread; EventThread takes events produced by SendThread and invokes the corresponding watchers
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

    }
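
 A quick illustration of the two timeouts computed above (the numbers are hypothetical, not taken from the example):

	//Assume sessionTimeout = 30000ms and a connect string listing 3 servers:
	int sessionTimeout = 30000;
	int serverCount = 3;                                 // hostProvider.size()
	int connectTimeout = sessionTimeout / serverCount;   // 10000ms budget per connection attempt
	int readTimeout = sessionTimeout * 2 / 3;            // 20000ms of silence before the client gives up on this server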

 The SendThread main loop

 

public void run() {
            .....
            while (state.isAlive()) {
                try {
			//If not connected yet, start the connection process. The check is a bit misleading: isConnected() only tests whether the SelectionKey has been registered, so the socket may in fact already be connected to the server
                    if (!clientCnxnSocket.isConnected()) {
                        ......
			//Connect asynchronously
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
			//If the state says connected, we really are connected to the server
                    if (state.isConnected()) {
                        ......
			//Timeout for the next select()
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
			//Not connected yet, so count down the connect timeout
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    //Session timed out (this also covers the connect timeout)
                    if (to <= 0) {
                        throw new SessionTimeoutException(
                                "Client session timed out, have not heard from server in "
                                        + clientCnxnSocket.getIdleRecv() + "ms"
                                        + " for sessionid 0x"
                                        + Long.toHexString(sessionId));
                    }
		    //If the send path has been idle long enough, send a heartbeat (ping)
                    if (state.isConnected()) {
                        int timeToNextPing = readTimeout / 2
                                - clientCnxnSocket.getIdleSend();
                        if (timeToNextPing <= 0) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }

                    // If we are in read-only mode, seek for read/write server
		    //If in read-only mode, look for a read/write server; once found, the old connection is cleaned up and the client reconnects to the R/W server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = System.currentTimeMillis();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
				//Synchronously probe whether the next server is an R/W server; if it is, an RWServerFoundException is thrown
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
			//Handle I/O
                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        if (LOG.isDebugEnabled()) {
                            // closing so this is expected
                            LOG.debug("An exception was thrown while closing send thread for session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " : " + e.getMessage());
                        }
                        break;
                    } else {
                        // this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        } else {
				......
                        }
			//Clean up the current connection and try the next server
                        cleanup();
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                }
            }
     ......
        }

 Connecting, step by step

 

private void startConnect() throws IOException {
		//Switch the state to CONNECTING
            state = States.CONNECTING;
		//Pick the target address
            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);
            }

            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
		......
		//Connect asynchronously
            clientCnxnSocket.connect(addr);
        }

 The actual connect()

 

    void connect(InetSocketAddress addr) throws IOException {
	//Create the client SocketChannel
        SocketChannel sock = createSock();
        try {
		//Register for OP_CONNECT and attempt the connection
           registerAndConnect(sock, addr);
        } catch (IOException e) {
            LOG.error("Unable to open socket to " + addr);
            sock.close();
            throw e;
        }
	//The session has not been initialized yet
        initialized = false;

        /*
         * Reset incomingBuffer
         */
	//Reset the two read buffers to get ready for the next read
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }

 The registerAndConnect step:

 

    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
	//Try to connect
        boolean immediateConnect = sock.connect(addr);
	//If the network is good and the connect completes immediately, send the ConnectRequest asking the server to establish a session
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }

 primeConnection covers what happens right after the socket connects; its main job is to establish the session:

 

void primeConnection() throws IOException {
            ......
		//The client-side sessionId defaults to 0
            long sessId = (seenRwServerBefore) ? sessionId : 0;
		//Build the connect request
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            synchronized (outgoingQueue) {
                ......
		//Wrap it into a transport-level Packet and add it to the outgoing queue; for a ConnectRequest the requestHeader is null
                outgoingQueue.addFirst(new Packet(null, null, conReq,
                            null, null, readOnly));
            }
		//Make sure both read and write events are being listened for
            clientCnxnSocket.enableReadWriteOnly();
            .....
        }
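
 The ConnectRequest built above is a plain jute record sent without any RequestHeader in front of it; its fields (as I read zookeeper.jute, listed here for reference) line up with the constructor arguments used above:

	// int    protocolVersion;  -> 0 in this code path
	// long   lastZxidSeen;     -> the last zxid the client has seen
	// int    timeOut;          -> the session timeout the client is asking for, in ms
	// long   sessionId;        -> 0 for a brand-new session, non-zero when re-establishing one
	// byte[] passwd;           -> the session password (empty for a new session)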

 The ConnectRequest is now sitting in the outgoing queue, and SendThread enters the doTransport flow:

 

 void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
	//select
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
		//If the earlier connect did not complete immediately, the OP_CONNECT event is handled here
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } 
	//If read or write is ready, handle the I/O
	else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }

 Assuming the connection is now established and the WRITE event is ready, SendThread starts sending our ConnectRequest:

 

if (sockKey.isWritable()) {
	    //Synchronized on the outgoing queue
            synchronized(outgoingQueue) {
		//Take a packet from the outgoing queue
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
			//Update the last-send timestamp
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
		//Serialize the Packet into a ByteBuffer
                    if (p.bb == null) {
			//For application requests we need to set the transaction id (xid)
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
			//Serialize
                        p.createBB();
                    }
			//Write the data
                    sock.write(p.bb);
			//Nothing left in the buffer: the packet has been fully sent
                    if (!p.bb.hasRemaining()) {
			//Count of application packets sent
                        sentCount++;
			//Done sending, so remove it from the outgoing queue to make way for the next request
                        outgoingQueue.removeFirstOccurrence(p);
			//Application requests go onto the pending queue so the server's response can be matched up later; other packets (ping, auth, connect) are simply dropped once sent
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
		//If everything has been sent there is no need to keep listening for the OS write event; otherwise keep it enabled so writing can continue
                if (outgoingQueue.isEmpty()) {
                    // No more packets to send: turn off write interest flag.
                    // Will be turned on later by a later call to enableWrite(),
                    // from within ZooKeeperSaslClient (if client is configured
                    // to attempt SASL authentication), or in either doIO() or
                    // in doTransport() if not.
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }

 Serialization details; the ConnectRequest packet carries no protocol header:

 

 public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
		//Write a 4-byte int as a placeholder; once the whole packet has been written it is updated to the packet's real length
                boa.writeInt(-1, "len"); // We'll fill this in later
		//Serialize the protocol header
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
		//Serialize the protocol body
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
		//Produce the ByteBuffer
                this.bb = ByteBuffer.wrap(baos.toByteArray());
		//Overwrite the first 4 bytes of the ByteBuffer with the real length: the total length minus the 4-byte length prefix
                this.bb.putInt(this.bb.capacity() - 4);
		//Rewind so the buffer is ready to be read
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
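
 Putting it together, the frame produced for our ConnectRequest looks roughly like this (ordinary requests would carry a RequestHeader right after the length prefix):

	// [ len : 4 bytes ][ ConnectRequest body (jute-serialized) ][ readOnly : 1-byte bool ]
	// where len counts everything after the 4-byte prefix, i.e. bb.capacity() - 4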

Our first Packet here is the ConnectRequest. The Packet built for it has no header, so it is dropped as soon as it has been sent, but SendThread still has to listen for the server's reply in order to confirm the connection and finish initializing the session. The client is now waiting on the server, so let's see how the server handles the ConnectRequest.

Assuming the server's selector thread is already running, the selector will see a read-ready event, which is the client's ConnectRequest:

else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);

   //inside NIOServerCnxn.doIO(k):
   if (k.isReadable()) {

		//First read 4 bytes from the channel: the length prefix
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
		//The int has been fully read, keep going
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
			//If the two buffers are the same object, this is the start of the next request
                    if (incomingBuffer == lenBuffer) { // start of next request
                        incomingBuffer.flip();
			//Allocate incomingBuffer with the announced length so the rest of the payload can be read into it
                        isPayload = readLength(k);
			//Clear it so it is ready to be filled
                        incomingBuffer.clear();
                    } else {
                        // continuation
                        isPayload = true;
                    }
			//Now read the payload
                    if (isPayload) { // not the case for 4letterword
                        readPayload();
                    }
                    else {
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }
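
 For reference, readLength is roughly the following (a simplified sketch of the 3.4.x NIOServerCnxn, slightly abridged): it interprets the 4 bytes just read as the payload length, checks for a four-letter-word command, and allocates the payload buffer.

    private boolean readLength(SelectionKey k) throws IOException {
		//the length prefix written by the client
        int len = lenBuffer.getInt();
        if (!initialized && checkFourLetterWord(sk, len)) {
			//"ruok", "stat", ... are plain-text commands, not jute packets
            return false;
        }
        if (len < 0 || len > BinaryInputArchive.maxBuffer) { // jute.maxbuffer limit, ~1MB by default
            throw new IOException("Len error " + len);
        }
		//the payload will be read into this buffer
        incomingBuffer = ByteBuffer.allocate(len);
        return true;
    }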

 Reading the payload:

 

/** Read the request payload (everything following the length prefix) */
    private void readPayload() throws IOException, InterruptedException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
		//Try to read it all in one go
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
        }
	//The whole payload was read in a single pass
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
		//Update the server-side packet statistics
            packetReceived();
		//Flip the buffer so it can be consumed
            incomingBuffer.flip();
		//If no ConnectRequest has arrived yet, the first packet must be it
            if (!initialized) {
                readConnectRequest();
            } 
		//Handle any other request
	    else {
                readRequest();
            }
		//Reset the buffers, ready to read the next packet
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }

 The ConnectRequest we sent has now been received by the server, which handles it as follows:

 

 private void readConnectRequest() throws IOException, InterruptedException {
        if (zkServer == null) {
            throw new IOException("ZooKeeperServer not running");
        }
	//Start the ConnectRequest processing chain
        zkServer.processConnectRequest(this, incomingBuffer);
	//Once this returns, the application-level connection has been set up
        initialized = true;
    }

 The processing itself:

 

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
	//The ConnectRequest packet has no header, so deserialize the body directly
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        connReq.deserialize(bia, "connect");
        ...
        boolean readOnly = false;
        try {
		//readOnly flag
            readOnly = bia.readBool("readOnly");
            cnxn.isOldClient = false;
        } catch (IOException e) {
          ....
        }
        ...
        //Apply the session parameters requested by the client, clamped to the server's min/max
        int sessionTimeout = connReq.getTimeOut();
        byte passwd[] = connReq.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
	//Stop reading further requests until the session has been established
        cnxn.disableRecv();
	//Read the client's sessionId
        long sessionId = connReq.getSessionId();
	//A non-zero sessionId means the client is trying to re-establish an existing session
        if (sessionId != 0) {
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            serverCnxnFactory.closeSession(sessionId);
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
		//Create a brand-new session
            createSession(cnxn, passwd, sessionTimeout);
        }
    }
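
 A quick worked example of the timeout negotiation above, assuming the server defaults (minSessionTimeout = 2 * tickTime, maxSessionTimeout = 20 * tickTime, tickTime = 2000ms):

	//Our Executor example asked for 3000ms, which is below the default minimum:
	int requested = 3000;
	int min = 2 * 2000;                                        // 4000ms
	int max = 20 * 2000;                                       // 40000ms
	int negotiated = Math.max(min, Math.min(requested, max));  // -> 4000ms, sent back in the ConnectResponse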

 Creating a new session:

 

    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
	//Create the session on the server side; session ids are handed out by the session tracker and increment from its initial value
        long sessionId = sessionTracker.createSession(timeout);
	//Generate a random session password
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
	//Each server-side connection gets a unique sessionId
        cnxn.setSessionId(sessionId);
	//Submit the request to the downstream processor chain
        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
        return sessionId;
    }
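
 Where does the sessionId come from? In the standalone 3.4.x SessionTrackerImpl (sketched here from memory, so treat it as illustrative rather than authoritative), the first id packs the server id into the high byte and the current time into the remaining bits, and later ids simply increment from there:

    public static long initializeNextSession(long id) {
        long nextSid = 0;
		//millisecond timestamp shifted into the lower 56 bits
        nextSid = (System.currentTimeMillis() << 24) >>> 8;
		//server id in the high byte, so ids from different servers never collide
        nextSid = nextSid | (id << 56);
        return nextSid;
    }

    //createSession(timeout) then essentially does: addSession(nextSessionId, timeout); return nextSessionId++;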

The submit step:

private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }

  The server now runs its processor chain on the internal Request object, whose type at this point is OpCode.createSession:

 

public void submitRequest(Request si) {
       ......
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
		//Hand off to the first processor in the chain; this is generally asynchronous to improve throughput
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
       ......
    }

 The first processor, PrepRequestProcessor:

 

public void run() {
        try {
            while (true) {
                Request request = submittedRequests.take();
                ......
                pRequest(request);
            }
      ......
    }

 Handling of createSession:

 

//create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
		//Here the Request's transaction header and txn are assembled so downstream processors can work with them
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                break;
		......
	request.zxid = zks.getZxid();
	//Pass it on to the next processor, usually asynchronously for performance
        nextProcessor.processRequest(request);
 case OpCode.createSession:
		//Read the session timeout value
                request.request.rewind();
                int to = request.request.getInt();
		//Build the concrete Record implementation, here a CreateSessionTxn, for the downstream processors
                request.txn = new CreateSessionTxn(to);
                request.request.rewind();
                zks.sessionTracker.addSession(request.sessionId, to);
                zks.setOwner(request.sessionId, request.getOwner());
                break;

  As seen above, PrepRequestProcessor is mainly responsible for filling in the Request's header and txn fields; it is essentially a pre-processing step.

The second processor, SyncRequestProcessor:

 

int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
		//If the flush queue is empty, block and wait: every earlier request has already been handled
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } 
		//If it is not empty there are still requests waiting to be flushed, so poll without blocking. Under light load, when no new request happens to arrive, the backlog is flushed right away;
		//under heavy load the flush may not happen until 1000 requests have piled up
		else {
                    si = queuedRequests.poll();
			//The request queue is idle, so flush the backlog
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
			//Append the Request to the transaction-log output stream (serialize, then append); note it has not been flushed to disk yet, it is still in memory
                    if (zks.getZKDatabase().append(si)) {
			//Counter of successful appends
                        logCount++;
			//Once the number of successfully appended requests passes a threshold, roll the transaction log
			//and start a thread that asynchronously writes the in-memory database and session state to a snapshot file, effectively a checkpoint
			//snapCount defaults to 100000
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
			    //Roll over to a new transaction log file
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
			    //Start a thread that asynchronously writes the in-memory database and session state into a snapshot file
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new Thread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    }
		    //If nothing was appended to the log (a read request) and the flush queue is empty, hand it straight to the next processor
		    else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                        continue;
                    }
		    //Write requests, already appended to the log stream above, are added to the flush queue here and flushed in batches later
                    toFlush.add(si);
		    //Under heavy load a flush may only happen once 1000 requests have accumulated; after the flush they can be handled by the next processor
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
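
 The snapCount / randRoll logic above is easier to see with numbers. With the default snapCount of 100000, randRoll is uniform in [0, 50000), so a log roll plus snapshot is triggered somewhere between roughly 50000 and 100000 appended transactions; the randomization keeps ensemble members from all snapshotting at the same moment:

	// snapCount = 100000 (default)
	// randRoll  = r.nextInt(snapCount / 2)        -> uniform in [0, 50000)
	// roll + snapshot once logCount > snapCount / 2 + randRoll, i.e. somewhere in (50000, 100000]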

 The flush itself:

 

private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;
	//Flush the previously appended log entries to disk, closing the old log file handle along the way
        zks.getZKDatabase().commit();
	//With the log flushed, process the Requests waiting in the flush queue
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
		//Run the next processor
            nextProcessor.processRequest(i);
        }
        if (nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }

 Assuming the system is under light load, our ConnectRequest gets processed right away and reaches FinalRequestProcessor:

 

if (request.hdr != null) {
               TxnHeader hdr = request.hdr;
               Record txn = request.txn;
		//For transactional requests, apply the transaction
               rc = zks.processTxn(hdr, txn);
            }

 The processing:

 

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
	//Delegate to the database to apply the transaction
        rc = getZKDatabase().processTxn(hdr, txn);
	//For createSession, register the session with the session tracker
        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst
                        .getTimeOut());
      ......
        return rc;
    }

 public ProcessTxnResult processTxn(TxnHeader header, Record txn)

    {
	//Build a result object to hand back to FinalRequestProcessor
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
	......

 FinalRequestProcessor takes the database's result and carries on:

 

case OpCode.createSession: {
                zks.serverStats().updateLatency(request.createTime);

                lastOp = "SESS";
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                        request.createTime, System.currentTimeMillis());
		//The response is written back here
                zks.finishSessionInit(request.cnxn, true);
                return;
            }

 public void finishSessionInit(ServerCnxn cnxn, boolean valid) {

        ......
		//Build the response: the negotiated sessionTimeout, the unique sessionId and the client's session password
            ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                    : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                            // longer valid
                            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
		//-1 as a length placeholder
            bos.writeInt(-1, "len");
		//Serialize the body
            rsp.serialize(bos, "connect");
            if (!cnxn.isOldClient) {
                bos.writeBool(
                        this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
		//Overwrite the earlier -1 with the real length
            bb.putInt(bb.remaining() - 4).rewind();
		//Write it back through the channel
            cnxn.sendBuffer(bb);    

            ......
		//Re-enable the read event on the selector
            cnxn.enableRecv();
        ......
    }
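
 The ConnectResponse mirrors the ConnectRequest and is framed the same way ([len][body][readOnly]); its fields, which line up with the constructor arguments used above, are:

	// int    protocolVersion;  -> 0
	// int    timeOut;          -> the negotiated session timeout, in ms
	// long   sessionId;        -> the server-assigned session id (0 if the session is no longer valid)
	// byte[] passwd;           -> the password the client must present when reconnecting this session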

 The actual write-back, in the transport layer (NIOServerCnxn):

 

public void sendBuffer(ByteBuffer bb) {
        try {
            if (bb != ServerCnxnFactory.closeConn) {
                // We check if write interest here because if it is NOT set,
                // nothing is queued, so we can try to send the buffer right
                // away without waking up the selector
		//Nothing is queued, so try to write directly
                if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
                    try {
			//Write back to the client
                        sock.write(bb);
                    } catch (IOException e) {
                        // we are just doing best effort right now
                    }
                }
                // if there is nothing left to send, we are done
		//Everything went out in one write
                if (bb.remaining() == 0) {
                    packetSent();
                    return;
                }
            }
		//If it could not all be written at once, add the buffer to the outgoing queue and continue writing later
            synchronized(this.factory){
                sk.selector().wakeup();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                            + " is valid: " + sk.isValid());
                }
                outgoingBuffers.add(bb);
                if (sk.isValid()) {
                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
                }
            }
            
        .......
    }

 The server is now done and has returned a ConnectResponse to the client. The client's SendThread receives the server's response and handles it:

 

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isReadable()) {
		//First read the packet length, a 4-byte int
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
		//The buffer is full. Note that a single packet takes two reads: the first read gets the length, the second gets the body; incomingBuffer is reused for both
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
		//What was just read is the length
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
			//Allocate incomingBuffer with the packet's length
                    readLength();
                } 
		//If not yet initialized, i.e. the session is not established, the server's reply must be a ConnectResponse
		else if (!initialized) {
			//Read the connect result, i.e. deserialize incomingBuffer into a ConnectResponse object
                    readConnectResult();
			//Keep reading subsequent responses
                    enableRead();
			//If there are still packets to send, make sure the write event is enabled
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
			//Get ready to read the next response
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
			//Session establishment is complete
                    initialized = true;
                } else {
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
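
 The "two reads per packet" pattern above works because incomingBuffer initially aliases the 4-byte lenBuffer; once the length has arrived, incomingBuffer is swapped for a freshly allocated buffer of exactly that size. Schematically:

	// read #1: incomingBuffer == lenBuffer (4 bytes)  -> readLength() -> incomingBuffer = allocate(len)
	// read #2: incomingBuffer is the len-byte buffer  -> readConnectResult() / readResponse()
	// then:    lenBuffer.clear(); incomingBuffer = lenBuffer;   //back to reading the next length prefix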

 The read itself:

 

void readConnectResult() throws IOException {
        .....
	//Deserialize incomingBuffer into a ConnectResponse
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }
	//The sessionId returned by the server
        this.sessionId = conRsp.getSessionId();
	//Follow-up processing: initialize some client-side parameters and finally fire a WatchedEvent
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

 The follow-up:

 

void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            ......
		//Initialize the client-side session parameters
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
		//Update the connection state
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            seenRwServerBefore |= !isRO;
            LOG.info("Session establishment complete on server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", sessionid = 0x" + Long.toHexString(sessionId)
                    + ", negotiated timeout = " + negotiatedSessionTimeout
                    + (isRO ? " (READ-ONLY mode)" : ""));
		//Fire a SyncConnected event; the dedicated EventThread will asynchronously notify the registered watchers
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));
        }

 EventThread handling:

 

       public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
		//EventThread keeps its own copy of the session state
            sessionState = event.getState();

            // materialize the watchers based on the event
		//Work out which watchers need to be notified; the event loop then simply calls each watcher's interface
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
		//Queue the (watchers, event) pair for asynchronous processing
            waitingEvents.add(pair);
        }

 The EventThread main loop

 

public void run() {
           try {
              isRunning = true;
              while (true) {
		//Take an event
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
			//Process it
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down");
        }

 Event processing:

 

if (event instanceof WatcherSetEventPair) {
                  // each watcher will process the event
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              } 

 In our example this calls the process method of the Executor watcher, which delegates to DataMonitor; for SyncConnected it does nothing:

 

case SyncConnected:
				// In this particular example we don't need to do anything
				// here - watches are automatically re-registered with
				// server and any watches triggered while the client was
				// disconnected will be delivered (in order of course)
				break;
 

At this point the session between client and server is established and normal operations can begin. Through this example we have seen how the client and the server exchange data; later requests such as create, getData, setData and delete follow essentially the same flow.

The core steps of session establishment (a minimal client-side sketch follows the list):

1. Establish the TCP connection

2. The client sends a ConnectRequest packet

3. The server receives the ConnectRequest, creates the session, and returns the server-side sessionId to the client

4. The client receives the server's response and fires the corresponding SyncConnected event

5. Client-side watchers consume the event
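
From the application's point of view all of this is hidden behind the ZooKeeper constructor. A common pattern (a minimal sketch, not taken from the article's example; imports from java.util.concurrent and org.apache.zookeeper are omitted, matching the style of the excerpts above) is simply to wait for SyncConnected before using the client:

final CountDownLatch connected = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
	public void process(WatchedEvent event) {
		if (event.getState() == Event.KeeperState.SyncConnected) {
			//fired by the EventThread once onConnected() has run
			connected.countDown();
		}
	}
});
connected.await();
//the negotiated session id (and password) are now available
System.out.println("session 0x" + Long.toHexString(zk.getSessionId()));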
