`
iwinit
  • 浏览: 454813 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

深入浅出Zookeeper之四Create请求和处理

阅读更多

客户端接口

 

public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath, createMode.isSequential());

        final String serverPath = prependChroot(clientPath);
	//请求头
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);
	//请求体
        CreateRequest request = new CreateRequest();
	//CREATE请求需要server端响应
        CreateResponse response = new CreateResponse();
        request.setData(data);
	//node类型
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        if (acl != null && acl.size() == 0) {
            throw new KeeperException.InvalidACLException();
        }
        request.setAcl(acl);
	//同步提交
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        //异常情况
	if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
	//真实路径,对于SEQUENTIAL NODE后面会加上序号
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }

 请求提交过程和之前的exists一样,都是通过sendthread写出去,都会进入pengding队列等待server端返回。

 

    server端处理也是一样,变化的只是RequestProcessor的业务逻辑。也就是说transport层是通用的,变化的是上层的业务层。

    server端执行处理链,PrepRequestProcessor

 

switch (request.type) {
                case OpCode.create:
		//反序列化的对象
                CreateRequest createRequest = new CreateRequest();
		//zxid递增
                pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
                break;
		......
	request.zxid = zks.getZxid();
        nextProcessor.processRequest(request);

 

 具体处理

 

protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
	//构造内部的事务头
        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                    zks.getTime(), type);

        switch (type) {
            case OpCode.create:     
		//检查session是否还有效
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                CreateRequest createRequest = (CreateRequest)record;  
		//反序列化
                if(deserialize)
                    ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                String path = createRequest.getPath();
		//路径规则检查
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
                    LOG.info("Invalid path " + path + " with session 0x" +
                            Long.toHexString(request.sessionId));
                    throw new KeeperException.BadArgumentsException(path);
                }
		//权限去重
                List<ACL> listACL = removeDuplicates(createRequest.getAcl());
                if (!fixupACL(request.authInfo, listACL)) {
                    throw new KeeperException.InvalidACLException(path);
                }
                String parentPath = path.substring(0, lastSlash);
		//父节点的ChangeRecord
                ChangeRecord parentRecord = getRecordForPath(parentPath);
		//权限校验
                checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
                        request.authInfo);
                int parentCVersion = parentRecord.stat.getCversion();
                CreateMode createMode =
                    CreateMode.fromFlag(createRequest.getFlags());
		    //SEQUENCE节点修改路径
                if (createMode.isSequential()) {
                    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                }
                try {
                    PathUtils.validatePath(path);
                } catch(IllegalArgumentException ie) {
                    LOG.info("Invalid path " + path + " with session 0x" +
                            Long.toHexString(request.sessionId));
                    throw new KeeperException.BadArgumentsException(path);
                }
		//当前节点ChangeRecord,一并校验节点是否已经存在
                try {
                    if (getRecordForPath(path) != null) {
                        throw new KeeperException.NodeExistsException(path);
                    }
                } catch (KeeperException.NoNodeException e) {
                    // ignore this one
                }
		//EPHEMERAL节点不允许创建子节点
                boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
                if (ephemeralParent) {
                    throw new KeeperException.NoChildrenForEphemeralsException(path);
                }
		//递增Cversion
                int newCversion = parentRecord.stat.getCversion()+1;
		//构造事务体,后续会被写入log
                request.txn = new CreateTxn(path, createRequest.getData(),
                        listACL,
                        createMode.isEphemeral(), newCversion);
                StatPersisted s = new StatPersisted();
		//如果是临时节点,则owner为sessionId
                if (createMode.isEphemeral()) {
                    s.setEphemeralOwner(request.sessionId);
                }
                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
		//递增parent的child数
                parentRecord.childCount++;
		//递增parent的cversion
                parentRecord.stat.setCversion(newCversion);
		//添加changeRecord到冗余队列
                addChangeRecord(parentRecord);
                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
                        0, listACL));
                break;
		......

 SyncRequestProcessor处理,主要是log写入,和之前的分析类似,不赘述

 

FinalRequestProcessor处理,修改内存中的datatree结构

 

 switch (header.getType()) {
                case OpCode.create:
                    CreateTxn createTxn = (CreateTxn) txn;
                    rc.path = createTxn.getPath();
                    createNode(
                            createTxn.getPath(),
                            createTxn.getData(),
                            createTxn.getAcl(),
                            createTxn.getEphemeral() ? header.getClientId() : 0,
                            createTxn.getParentCVersion(),
                            header.getZxid(), header.getTime());
                    break;

 public String createNode(String path, byte data[], List<ACL> acl,

 

            long ephemeralOwner, int parentCVersion, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
	//各种参数设置
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = new StatPersisted();
        stat.setCtime(time);
        stat.setMtime(time);
        stat.setCzxid(zxid);
        stat.setMzxid(zxid);
        stat.setPzxid(zxid);
        stat.setVersion(0);
        stat.setAversion(0);
        stat.setEphemeralOwner(ephemeralOwner);
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
	//修改parent节点内容
        synchronized (parent) {
            Set<String> children = parent.getChildren();
            if (children != null) {
                if (children.contains(childName)) {
                    throw new KeeperException.NodeExistsException();
                }
            }
            
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }    
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
            Long longval = convertAcls(acl);
		//添加目标节点
            DataNode child = new DataNode(parent, data, longval, stat);
            parent.addChild(childName);
            nodes.put(path, child);
		//添加ephemeral类型节点
            if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
        }
        ......
	//dataWatches和childWatches事件触发
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    }

 事件触发过程

 

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
		//拿节点对应的watcher列表,只通知一次
            watchers = watchTable.remove(path);
            .......
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
	//挨个通知
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }

 

 具体处理,NIOServerCncx

 

    synchronized public void process(WatchedEvent event) {
	//事件通知的响应头
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        ......

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
	//发送响应
        sendResponse(h, e, "notification");
    }

 

 具体发送:

 

synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // Make space for length
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            try {
		//占位
                baos.write(fourBytes);
		//序列化
                bos.writeRecord(h, "header");
                if (r != null) {
                    bos.writeRecord(r, tag);
                }
                baos.close();
            } catch (IOException e) {
                LOG.error("Error serializing response");
            }
		//转换成byte数组
            byte b[] = baos.toByteArray();
            ByteBuffer bb = ByteBuffer.wrap(b);
		//最后写入长度
            bb.putInt(b.length - 4).rewind();
		//通过channel异步写
            sendBuffer(bb);
            if (h.getXid() > 0) {
                synchronized(this){
                    outstandingRequests--;
                }
                // check throttling
                synchronized (this.factory) {        
                    if (zkServer.getInProcess() < outstandingLimit
                            || outstandingRequests < 1) {
                        sk.selector().wakeup();
                        enableRecv();
                    }
                }
            }
         } catch(Exception e) {
            LOG.warn("Unexpected exception. Destruction averted.", e);
         }
    }

 server端就处理完了,接下来client端SendThread收到响应

 

  if (replyHdr.getXid() == -1) {

                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
		//反序列化
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
		//路径切换,如果client用了相对路径的话
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                    	LOG.warn("Got server path " + event.getPath()
                    			+ " which is too short for chroot path "
                    			+ chrootPath);
                    }
                }
		//通过eventThread派发事件,通知之前注册的watcher
                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }

                eventThread.queueEvent( we );
                return;
            }

 eventThread端处理和之前类似

 case NodeCreated:
		//通知dataWatcher,只一次
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
		//通知existWatcher,只一次
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
if (event instanceof WatcherSetEventPair) {
                  // each watcher will process the event
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
			//挨个通知
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              }
 以上就完成了create操作的处理,之后的delete,set,get操作都是类似的,大家有兴趣的直接阅读相关代码即可:)
分享到:
评论

相关推荐

    深入浅出Zookeeper

    ### 深入浅出 Zookeeper #### 一、引言与基础知识 ##### 自序 在深入了解Zookeeper之前,我们不妨先从一位实践者的视角出发。最初接触到Zookeeper时,很多人可能会感到困惑,尤其是当其与Kafka这样的分布式消息...

    深入浅出Zookeeper核心原理 1

    深入浅出Zookeeper核心原理_1

    深入浅出Zookeeper核心原理 2

    深入浅出Zookeeper核心原理_2

    深入浅出Zookeeper核心原理 3

    深入浅出Zookeeper核心原理_3

    深入浅出Zookeeper核心原理 4

    深入浅出Zookeeper核心原理_4

    深入浅出Zookeeper核心原理 5

    深入浅出Zookeeper核心原理_5

    深入浅出Zookeeper核心原理 6

    深入浅出Zookeeper核心原理_6

    深入浅出Zookeeper核心原理 7

    深入浅出Zookeeper核心原理_7

    深入浅出Zookeeper核心原理 8

    深入浅出Zookeeper核心原理_8

    深入浅出Zookeeper核心原理 9

    深入浅出Zookeeper核心原理_9

    ZooKeeper深入浅出.pdf

    它使用了ZooKeeper自己的会话机制,以便处理客户端与服务端之间的连接和超时等状态。 ZooKeeper的数据模型和操作会实现ZooKeeper的协议,确保集群中的节点可以就数据的更新达成一致。这种一致性不是绝对的,而是...

    Hadoop深入浅出之Zookeeper介绍.pptx

    Zookeeper 的使用场景广泛,例如在 Hadoop 中,它确保集群中只有一个 NameNode 来处理元数据,并存储配置信息。在 HBase 中,Zookeeper 用于监控 HMaster 的状态,确保集群的稳定,并管理 HRegionServer 的生命周期...

    zookeeper深入浅出

    Zookeeper是Hadoop分布式调度服务,用来构建分布式应用系统。构建一个分布式应用是一个很复杂的...Zookeeper不能让部分失败的问题彻底消失,但是它提供了一些工具能够让你的分布式应用安全合理的处理部分失败的问题。

    深入浅出Zookeeper(一)Zookeeper架构及FastLeaderElection机制

    本文来自于技术世界,本文介绍了Zookeeper的架构,并组合实例分析了原子广播(ZAB)协议的原理,希望对您的学习有所帮助。Zookeeper是一个分布式协调服务,可用于服务发现,分布式锁,分布式领导选举,配置管理等。这...

    zookeeper 服务监控和管理

    在本文中,我们将深入探讨Zookeeper的服务监控与管理,以及如何有效地利用它来提升系统的稳定性和可扩展性。 一、Zookeeper的基本概念 1.1 ZooKeeper数据模型:Zookeeper的数据模型是一种树形结构,类似于文件系统...

    深入解析ZooKeeper分布式环境搭建+编程知识+技术开发

    zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析...

    ZooKeeper 未授权访问漏洞处理方法

    ### ZooKeeper 未授权访问漏洞处理方法 #### 一、漏洞背景及原理 **ZooKeeper** 是一个分布式的协调服务框架,它提供了一种高效可靠的解决方案来管理和维护分布式环境中不同节点之间的同步与协调问题。然而,由于...

    zookeeper之节点基本操作(一).doc

    2. **数据版本控制**:每当数据发生改变时,ZooKeeper会记录修改的事务ID(mZxid)和数据版本(dataVersion),这有助于确保操作的一致性和原子性。 3. **版本控制**:在执行`set`或`delete`操作时,可以通过指定...

    深入浅出Zookeeper(二)基于Zookeeper的分布式锁与领导选举

    本文来自于技术世界,本文结合实例演示了使用Zookeeper实现分布式锁与领导选举的原理与具体实现方法。如上文《Zookeeper架构及FastLeaderElection机制》所述,Zookeeper提供了一个类似于Linux文件系统的树形结构。该...

    zookeeper 3.6.3 源码下载

    在深入理解源码之前,我们需要先了解ZooKeeper的基本概念和工作原理。 **ZooKeeper的基本概念** 1. **节点(ZNode)**:ZooKeeper 的数据存储结构类似文件系统,由一系列的节点构成,每个节点称为ZNode。每个ZNode...

Global site tag (gtag.js) - Google Analytics