`
bit1129
  • 浏览: 1068081 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

[Zookeeper学习笔记十]Zookeeper源代码分析之ClientCnxn数据序列化和反序列化

 
阅读更多

ClientCnxn是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,1. SendThread 2. EventThread, SendThread负责客户端和服务器端的数据通信,也包括事件信息的传输,EventThread主要在客户端回调注册的Watchers进行通知处理

 

ClientCnxn构造方法

 

 

    /**
     * Creates a connection object. The actual network connect doesn't get
     * established until needed. The start() instance method must be called
     * subsequent to construction.
     *
     * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
     * @param hostProvider
     *                the list of ZooKeeper servers to connect to
     * @param sessionTimeout
     *                the timeout for connections.
     * @param zooKeeper
     *                the zookeeper object that this connection is related to.
     * @param watcher watcher for this connection
     * @param clientCnxnSocket
     *                the socket implementation used (e.g. NIO/Netty)
     * @param sessionId session id if re-establishing session
     * @param sessionPasswd session passwd if re-establishing session
     * @param canBeReadOnly
     *                whether the connection is allowed to go to read-only
     *                mode in case of partitioning
     * @throws IOException
     */
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
        //如果zookeeper集群有1000台,那么会话超时时间岂不是要设置的很大?因此,zookeeper一般不会很大,3台或者5台足亦
        connectTimeout = sessionTimeout / hostProvider.size();//链接超时时间是会话超时时间除以Zookeeper集群数
        //读超时是会话超时的2/3
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
   
        //读线程,在ClientCnxnd的start方法中启动
        sendThread = new SendThread(clientCnxnSocket);
        //会话线程,在ClientCnxnd的start方法中启动
        eventThread = new EventThread();

    }
 

 对于SendThread数据传输线程包含两方面的内容,1是基于TCP/IP的Socket的数据传输,2.传输的数据内容。首先关注传输的数据内容,在TCP/IP传输的数据都是字节,因此,在SendThread发送数据之前,需要将要传输的数据结构进行序列化成字节流,服务器端会反序列化成相应的数据结构。当客户端收到服务器返回的字节流时,客户端将其反序列化为相应的数据结构。

 

首先看看数据的序列化和反序列化,接口定义:

package org.apache.jute //该借口不是Zookeeper原生提供的,是Apache的jute提供的
import java.io.IOException;

/**
 * Interface that is implemented by generated classes.
 * 
 */
public interface Record {
    public void serialize(OutputArchive archive, String tag) //序列化,tag在XmlInputArchive序列化器中,用作xml元素标签
        throws IOException;
    public void deserialize(InputArchive archive, String tag)//反序列话
        throws IOException;
}

 OutputArchive接口是数据结构序列化为字节流的字节流写入器,InputArchive接口是字节流反序列化为数据结构的字节流读取器。OutputArchive和InputArchive接口有三个成对使用的实现类

BinaryOutputArchive和BinaryInputArchive底层使用DataOutput和DataInput作为字节容器

 

XmlOutputArchive和XmlInputArchive底层使用PrintStream,XmlInputArchive使用Xml解析的方式得到相应的数据结构

CsvOutputArchive和CsvInputArchive底层使用PrintStream作为自己容器

 

Zookeeper客户端向服务器端发送请求,包含请求头和请求正文两部分,每个请求的请求头的类型都是一样的,而请求正文根据请求的不同,分为多种类型。

Zookeeper服务器端向客户端返回响应数据,包含响应头和响应正文两部分,每个响应的响应头的类型都是一样的,而响应正文根据请求的不同,分为多种类型。

 

请求头,各种请求正文,响应头和响应正文因为要在Socket上进行数据传输,所以它们应该都是可序列化和反序列话的,因此它们都是可序列化的

 

 

请求头:

public class RequestHeader implements Record {
  private int xid; //请求的事务id,具体的含义和功能接下来分析
  private int type; //请求类型?
  public RequestHeader() {
  }
  public RequestHeader(
        int xid,
        int type) {
    this.xid=xid;
    this.type=type;
  }
  public int getXid() {
    return xid;
  }
  public void setXid(int m_) {
    xid=m_;
  }
  public int getType() {
    return type;
  }
  public void setType(int m_) {
    type=m_;
  }
  //序列化操作,将xid和type序列化到OutputArchive中,
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag); //对于最常使用的BinaryOutputArchive,此方法空实现
    a_.writeInt(xid,"xid");
    a_.writeInt(type,"type");
    a_.endRecord(this,tag);//对于最常使用的BinaryOutputArchive,此方法空实现
  }
  //序列化操作,将xid和type反序列化
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现
    xid=a_.readInt("xid");
    type=a_.readInt("type");
    a_.endRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现
}
  public String toString() {
    try {
      java.io.ByteArrayOutputStream s =
        new java.io.ByteArrayOutputStream();
      CsvOutputArchive a_ = 
        new CsvOutputArchive(s);
      a_.startRecord(this,"");//对于CsvOutputArchive,startRecord方法
    a_.writeInt(xid,"xid");
    a_.writeInt(type,"type");
      a_.endRecord(this,"");
      return new String(s.toByteArray(), "UTF-8");
    } catch (Throwable ex) {
      ex.printStackTrace();
    }
    return "ERROR";
  }
  public void write(java.io.DataOutput out) throws java.io.IOException {
    BinaryOutputArchive archive = new BinaryOutputArchive(out);
    serialize(archive, "");
  }
  public void readFields(java.io.DataInput in) throws java.io.IOException {
    BinaryInputArchive archive = new BinaryInputArchive(in);
    deserialize(archive, "");
  }
  public int compareTo (Object peer_) throws ClassCastException {
    if (!(peer_ instanceof RequestHeader)) {
      throw new ClassCastException("Comparing different types of records.");
    }
    RequestHeader peer = (RequestHeader) peer_;
    int ret = 0;
    ret = (xid == peer.xid)? 0 :((xid<peer.xid)?-1:1);
    if (ret != 0) return ret;
    ret = (type == peer.type)? 0 :((type<peer.type)?-1:1);
    if (ret != 0) return ret;
     return ret;
  }
  public boolean equals(Object peer_) {
    if (!(peer_ instanceof RequestHeader)) {
      return false;
    }
    if (peer_ == this) {
      return true;
    }
    RequestHeader peer = (RequestHeader) peer_;
    boolean ret = false;
    ret = (xid==peer.xid);
    if (!ret) return ret;
    ret = (type==peer.type);
    if (!ret) return ret;
     return ret;
  }
  public int hashCode() {
    int result = 17;
    int ret;
    ret = (int)xid;
    result = 37*result + ret;
    ret = (int)type;
    result = 37*result + ret;
    return result;
  }
  public static String signature() {
    return "LRequestHeader(ii)";
  }
}

 

请求正文有很多,比如

  • 链接请求ConnectRequest
  • 创建znode请求CreateRequest
  • 节点是否存在请求ExistsRequest
  • 删除znode请求DeleteRequest
  • 获取child znodes请求GetChildrenRequest
  • 设置znode数据SetDataRequest
  • 事件WatcherEvent

以CreateRequest为例进行分析

 

public class CreateRequest implements Record {
  private String path; //创建znode节点的path
  private byte[] data; //创建znode节点时的节点数据
  private java.util.List<org.apache.zookeeper.data.ACL> acl; //创建znode节点时的ACL
  private int flags;//这个参数干啥的?
  public CreateRequest() {
  }
  public CreateRequest(
        String path,
        byte[] data,
        java.util.List<org.apache.zookeeper.data.ACL> acl,
        int flags) {
    this.path=path;
    this.data=data;
    this.acl=acl;
    this.flags=flags;
  }
  public String getPath() {
    return path;
  }
  public void setPath(String m_) {
    path=m_;
  }
  public byte[] getData() {
    return data;
  }
  public void setData(byte[] m_) {
    data=m_;
  }
  public java.util.List<org.apache.zookeeper.data.ACL> getAcl() {
    return acl;
  }
  public void setAcl(java.util.List<org.apache.zookeeper.data.ACL> m_) {
    acl=m_;
  }
  public int getFlags() {
    return flags;
  }
  public void setFlags(int m_) {
    flags=m_;
  }
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeString(path,"path");//写入path
    a_.writeBuffer(data,"data");//写入data字节数组
    {
      a_.startVector(acl,"acl");//写入acl,acl是List类型
      if (acl!= null) {        
          int len1 = acl.size();
          for(int vidx1 = 0; vidx1<len1; vidx1++) {
            org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);
            a_.writeRecord(e1,"e1");//ACL也是一个Record
          }
      }
      a_.endVector(acl,"acl");
    }
    a_.writeInt(flags,"flags");//写入flags
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    path=a_.readString("path");
    data=a_.readBuffer("data");
    {
      Index vidx1 = a_.startVector("acl");
      if (vidx1!= null) {          acl=new java.util.ArrayList<org.apache.zookeeper.data.ACL>();
          for (; !vidx1.done(); vidx1.incr()) {
    org.apache.zookeeper.data.ACL e1;
    e1= new org.apache.zookeeper.data.ACL();
    a_.readRecord(e1,"e1");
            acl.add(e1);
          }
      }
    a_.endVector("acl");
    }
    flags=a_.readInt("flags");
    a_.endRecord(tag);
}
  public String toString() {
    try {
      java.io.ByteArrayOutputStream s =
        new java.io.ByteArrayOutputStream();
      CsvOutputArchive a_ = 
        new CsvOutputArchive(s);
      a_.startRecord(this,"");
    a_.writeString(path,"path");
    a_.writeBuffer(data,"data");
    {
      a_.startVector(acl,"acl");
      if (acl!= null) {          int len1 = acl.size();
          for(int vidx1 = 0; vidx1<len1; vidx1++) {
            org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);
    a_.writeRecord(e1,"e1");
          }
      }
      a_.endVector(acl,"acl");
    }
    a_.writeInt(flags,"flags");
      a_.endRecord(this,"");
      return new String(s.toByteArray(), "UTF-8");
    } catch (Throwable ex) {
      ex.printStackTrace();
    }
    return "ERROR";
  }
  public void write(java.io.DataOutput out) throws java.io.IOException {
    BinaryOutputArchive archive = new BinaryOutputArchive(out);
    serialize(archive, "");
  }
  public void readFields(java.io.DataInput in) throws java.io.IOException {
    BinaryInputArchive archive = new BinaryInputArchive(in);
    deserialize(archive, "");
  }
  public int compareTo (Object peer_) throws ClassCastException {
    throw new UnsupportedOperationException("comparing CreateRequest is unimplemented");
  }
  public boolean equals(Object peer_) {
    if (!(peer_ instanceof CreateRequest)) {
      return false;
    }
    if (peer_ == this) {
      return true;
    }
    CreateRequest peer = (CreateRequest) peer_;
    boolean ret = false;
    ret = path.equals(peer.path);
    if (!ret) return ret;
    ret = org.apache.jute.Utils.bufEquals(data,peer.data);
    if (!ret) return ret;
    ret = acl.equals(peer.acl);
    if (!ret) return ret;
    ret = (flags==peer.flags);
    if (!ret) return ret;
     return ret;
  }
  public int hashCode() {
    int result = 17;
    int ret;
    ret = path.hashCode();
    result = 37*result + ret;
    ret = java.util.Arrays.toString(data).hashCode();
    result = 37*result + ret;
    ret = acl.hashCode();
    result = 37*result + ret;
    ret = (int)flags;
    result = 37*result + ret;
    return result;
  }
  public static String signature() {
    return "LCreateRequest(sB[LACL(iLId(ss))]i)";
  }
}
 

 

 ConnectRequest的请求数据:

  private int protocolVersion;
  private long lastZxidSeen; //客户端保存的Zxid最近时间,zxid有什么用呢?
  private int timeOut;//会话超时时间
  private long sessionId;
  private byte[] passwd;

 

 

 

 ClientCnxn的内部类Packet类封装了请求头,响应头,请求正文和响应征正文

   static class Packet {
        RequestHeader requestHeader;//请求头

        ReplyHeader replyHeader; //响应头

        Record request;//请求正文

        Record response; //响应正文

        ByteBuffer bb;//上面四部分序列化的字节流

        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Servers's view of the path (may differ due to chroot) **/
        String serverPath;

        boolean finished;

        AsyncCallback cb;//异步请求的响应Callback

        Object ctx;

        WatchRegistration watchRegistration;

        public boolean readOnly;

        /** Convenience ctor */
        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration) {
            this(requestHeader, replyHeader, request, response,
                 watchRegistration, false);
        }

        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration, boolean readOnly) {

            this.requestHeader = requestHeader;
            this.replyHeader = replyHeader;
            this.request = request;
            this.response = response;
            this.readOnly = readOnly;
            this.watchRegistration = watchRegistration;
        }

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);//序列化字节流容器
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());//将字节流容器中的字节流复制给bb
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();

            sb.append("clientPath:" + clientPath);
            sb.append(" serverPath:" + serverPath);
            sb.append(" finished:" + finished);

            sb.append(" header:: " + requestHeader);
            sb.append(" replyHeader:: " + replyHeader);
            sb.append(" request:: " + request);
            sb.append(" response:: " + response);

            // jute toString is horrible, remove unnecessary newlines
            return sb.toString().replaceAll("\r*\n+", " ");
        }
    }

    ClientCnxn类包含两个队列(LinkedList),队列中的元素都是Packet类型,pengdingQueue表示请求已经发送,等待响应结果;outgoingQueue表示等待发送请求的请求序列

 

  

/**
     * These are the packets that have been sent and are waiting for a response.
     */
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

    /**
     * These are the packets that need to be sent.
     */
    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

 

 

 ClientCnxn的Socket的数据传输,将另外一篇进行单独分析

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    zookeeper示例代码。

    4. **序列化**:在ZooKeeper中,存储的数据需要进行序列化。可以使用Java内置的序列化,或者自定义序列化方式,例如JSON、protobuf等,将对象转换为字节流以便在网络间传输和存储。 5. **ACL权限**:ZooKeeper提供...

    Hadoop源代码分析完整版

    这种自定义的序列化机制优化了数据在网络传输过程中的效率,减少了序列化和反序列化带来的开销,对于提高MapReduce任务的执行速度具有重要意义。 通过以上分析,我们可以看出Hadoop的设计理念和架构复杂性,同时也...

    Hadoop源代码分析(完整版)

    ### Hadoop源代码分析知识点详解 #### 一、Hadoop背景与关键技术介绍 Hadoop作为一款开源的大数据处理框架,其设计灵感源自Google的一系列核心论文。这些论文详细阐述了Google构建其基础设施的方法论和技术原理,...

    Hadoop源代码分析(完整版).pdf

    ### Hadoop源代码分析知识点概览 #### 一、Hadoop背景与关键技术 - **Google核心技术**:Google凭借其先进的计算平台在业界确立了领先地位,其中主要包括以下几项关键技术: - **Google Cluster**:提供了关于...

    zookeeper源码分析

    第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...

    Zookeeper-release-3.5.4 源码,部分有注释

    源码中包含了这些功能的实现,如`org.apache.zookeeper.server`包下的`PackedDataInputStream`和`PackedDataOutputStream`用于数据序列化和反序列化,`org.apache.zookeeper.server.quorum`包下的类则涉及到了集群中...

    Zookeeper

    为了提高Zookeeper的性能,可以从网络通信优化、数据序列化方式、硬件配置等方面入手。例如,合理设置`tickTime`参数以优化心跳检测,选择高效的序列化库,以及使用SSD硬盘来提升磁盘I/O速度。 通过以上对Zookeeper...

    zookeeper 3.8.4

    - **性能提升**:包括更高效的网络通信库和优化的数据序列化,使得数据读写速度更快。 - **稳定性增强**:对并发控制和错误处理进行了改进,降低了服务中断的风险。 - **安全增强**:支持 SASL 认证和 ACL 策略,...

    zookeeper第四节课原理源码分析资料1

    Zookeeper内部使用了Java的序列化机制,但同时也提供了自定义序列化器的接口,用户可以根据需求定制数据的序列化和反序列化策略。这使得Zookeeper能够适应各种复杂的数据结构和格式。 在运维方面,Zookeeper提供了...

    zookeeper3.5.5.zip

    7. **数据序列化与反序列化**:Zookeeper不处理数据的序列化,因此在存储和读取数据时,开发者需要自己处理这个问题。 在实际应用中,Zookeeper常用于分布式锁、配置管理、集群管理等场景,其C库为非Java应用提供了...

    Apache Log4j反序列化命令执行漏洞

    vulhub靶机里的这个漏洞里缺少了curl或者wget这些命令

    Hadoop 源代码分析 [完整版]

    Hadoop源代码分析涉及多个方面,包括包依赖关系、配置管理、文件系统抽象、分布式文件系统(HDFS)、IPC(进程间通信)、序列化机制、网络功能、安全性和数据描述语言(DDL)。 Hadoop的包依赖关系复杂,HDFS作为一...

    zookeeper-3.4.14.zip

    5. **lib** 目录:存放 ZooKeeper 运行所需的依赖库,这些库支持 ZooKeeper 的网络通信、序列化和其他功能。 6. **build** 目录(可能有):包含构建 ZooKeeper 的输出结果,比如编译后的 JAR 文件。 7. **LICENSE...

    Hadoop源代码分析 高清完整中文版PDF下载

    任何实现了Writable接口的类,都需要定义`write(DataOutput out)`和`readFields(DataInput in)`方法,来实现序列化和反序列化的逻辑。 以上内容为根据提供的文件部分描述的Hadoop源代码分析的知识点。该文件内容...

    Hadoop源代码分析

    ### Hadoop源代码分析 #### 一、Hadoop与Google的核心技术 Hadoop是一个开源的分布式计算框架,其设计初衷是为了模拟Google所采用的核心技术。Google通过一系列文章介绍了自己的技术栈,包括分布式集群管理...

    zookeeper+rpc学习demo

    1. **序列化与反序列化**:RPC通信过程中,对象需要转换成网络传输的字节流,再在接收端还原为对象,这就需要序列化和反序列化机制。 2. **网络通信**:RPC框架通常使用HTTP、TCP或自定义协议进行网络通信,如gRPC...

    Zookeeper实现简单的分布式RPC框架

    为了实现这个过程,我们需要设计一套序列化和反序列化机制,以便将对象转换为网络传输的数据格式,如JSON或protobuf。 在源码层面,我们可以采用Java的反射机制来动态生成代理类,使得客户端可以直接调用远程服务的...

    Hadoop之HDFS源代码分析 pdf

    Avro是一个跨语言的远程过程调用(RPC)和数据序列化系统。ZooKeeper是一个分布式协调服务,用于解决分布式系统中的一致性问题。HBase是一个基于列的分布式数据库,提供快速随机访问和大规模数据分析。HDFS是一个高...

Global site tag (gtag.js) - Google Analytics