前一篇介绍了zookeeper的client和server之间session是如何建立的。在DataMonitor的例子中,DataMonitor通过exists异步接口和server端交互,本文将介绍exists操作是如何完成。
dataMonitor开始exist操作
- public void exists(final String path, Watcher watcher,
- StatCallback cb, Object ctx)
- {
- ......
- //exist请求头
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.exists);
- //请求体
- ExistsRequest request = new ExistsRequest();
- request.setPath(serverPath);
- request.setWatch(watcher != null);
- SetDataResponse response = new SetDataResponse();
- //添加到发送队列
- cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
- clientPath, serverPath, ctx, wcb);
- }
添加过程
- Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
- Record response, AsyncCallback cb, String clientPath,
- String serverPath, Object ctx, WatchRegistration watchRegistration)
- {
- Packet packet = null;
- // Note that we do not generate the Xid for the packet yet. It is
- // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
- // where the packet is actually sent.
- synchronized (outgoingQueue) {
- //转换成Packet
- packet = new Packet(h, r, request, response, watchRegistration);
- packet.cb = cb;
- packet.ctx = ctx;
- packet.clientPath = clientPath;
- packet.serverPath = serverPath;
- if (!state.isAlive() || closing) {
- conLossPacket(packet);
- } else {
- // If the client is asking to close the session then
- // mark as closing
- if (h.getType() == OpCode.closeSession) {
- closing = true;
- }
- //添加到发送队列
- outgoingQueue.add(packet);
- }
- }
- //唤醒下Selector,快点处理
- sendThread.getClientCnxnSocket().wakeupCnxn();
- return packet;
- }
接下来还是回到SendThread的发送过程,之前Session建立时已经分析过,这里有一点要注意下:
- //重要的业务请求,需要设置事务id
- if ((p.requestHeader != null) &&
- (p.requestHeader.getType() != OpCode.ping) &&
- (p.requestHeader.getType() != OpCode.auth)) {
- p.requestHeader.setXid(cnxn.getXid());
- }
接下来Server端IO线程拿到请求,处理,过程和之前session建立时一样,就不赘述了。变化的是后续的处理链。
PrepRequestProcessor预处理
- //All the rest don't need to create a Txn - just verify session
- /读请求,不需要创建事务,只是检查了下session是否还在,此时事务头和事务体都是null
- case OpCode.sync:
- case OpCode.exists:
- case OpCode.getData:
- case OpCode.getACL:
- case OpCode.getChildren:
- case OpCode.getChildren2:
- case OpCode.ping:
- case OpCode.setWatches:
- zks.sessionTracker.checkSession(request.sessionId,
- request.getOwner());
- break;
SyncRequestProcessor处理逻辑之前已经分析过了,这里就挑重点说一下
- //试图将其写log,由于ExistsRequest并不是一个事务型请求,所以这里直接返回false,也就是说ExistsRequest不会被记录到log文件中
- zks.getZKDatabase().append(si)
- case OpCode.exists: {
- lastOp = "EXIS";
- // TODO we need to figure out the security requirement for this!
- ExistsRequest existsRequest = new ExistsRequest();
- //反序列化
- ByteBufferInputStream.byteBuffer2Record(request.request,
- existsRequest);
- String path = existsRequest.getPath();
- if (path.indexOf('\0') != -1) {
- throw new KeeperException.BadArgumentsException();
- }
- //拿对应node的状态,并设置是否watch
- Stat stat = zks.getZKDatabase().statNode(path, existsRequest
- .getWatch() ? cnxn : null);
- //结果
- rsp = new ExistsResponse(stat);
- break;
- }
- ......
- //当前处理zxid
- long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
- //构造相应头
- ReplyHeader hdr =
- new ReplyHeader(request.cxid, lastZxid, err.intValue());
- zks.serverStats().updateLatency(request.createTime);
- cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
- request.createTime, System.currentTimeMillis());
- try {
- //写回响应
- cnxn.sendResponse(hdr, rsp, "response");
statNode过程
- //此处的watcher就是对应ServerCnxn,代表是哪个client,server端需要notify的时候,直接往对应ServerCnxn写数据即可
- public Stat statNode(String path, Watcher watcher)
- throws KeeperException.NoNodeException {
- Stat stat = new Stat();
- DataNode n = nodes.get(path);
- if (watcher != null) {
- //对于exists请求,需要监听data变化事件,添加watcher
- dataWatches.addWatch(path, watcher);
- }
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (n) {
- n.copyStat(stat);
- return stat;
- }
- }
好了,以上server端就完成了ExistsRequest的处理了。接下来client端SendThread收到ExistsResponse进行处理
- if (sockKey.isReadable()) {
- int rc = sock.read(incomingBuffer);
- ......
- else if (!initialized) {
- readConnectResult();
- enableRead();
- if (findSendablePacket(outgoingQueue,
- cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
- // Since SASL authentication has completed (if client is configured to do so),
- // outgoing packets waiting in the outgoingQueue can now be sent.
- enableWrite();
- }
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- updateLastHeard();
- initialized = true;
- }
- //此时client session已经建立,init完成
- else {
- sendThread.readResponse(incomingBuffer);
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- updateLastHeard();
- }
- }
- }
具体读取:
- void readResponse(ByteBuffer incomingBuffer) throws IOException {
- ByteBufferInputStream bbis = new ByteBufferInputStream(
- incomingBuffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ReplyHeader replyHdr = new ReplyHeader();
- //先读响应头,先对特殊的xid进行处理
- replyHdr.deserialize(bbia, "header");
- ......
- //从头拿之前入队的ExistsRequest请求Packet,由于client和server都是单线程处理,多队列处理,所以认为全局有序
- Packet packet;
- synchronized (pendingQueue) {
- if (pendingQueue.size() == 0) {
- throw new IOException("Nothing in the queue, but got "
- + replyHdr.getXid());
- }
- packet = pendingQueue.remove();
- }
- /*
- * Since requests are processed in order, we better get a response
- * to the first request!
- */
- try {
- //检查一下是否一样
- if (packet.requestHeader.getXid() != replyHdr.getXid()) {
- packet.replyHeader.setErr(
- KeeperException.Code.CONNECTIONLOSS.intValue());
- throw new IOException("Xid out of order. Got Xid "
- + replyHdr.getXid() + " with err " +
- + replyHdr.getErr() +
- " expected Xid "
- + packet.requestHeader.getXid()
- + " for a packet with details: "
- + packet );
- }
- packet.replyHeader.setXid(replyHdr.getXid());
- packet.replyHeader.setErr(replyHdr.getErr());
- packet.replyHeader.setZxid(replyHdr.getZxid());
- //更新client端的最新zxid
- if (replyHdr.getZxid() > 0) {
- lastZxid = replyHdr.getZxid();
- }
- //反序列化响应
- if (packet.response != null && replyHdr.getErr() == 0) {
- packet.response.deserialize(bbia, "response");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reading reply sessionid:0x"
- + Long.toHexString(sessionId) + ", packet:: " + packet);
- }
- } finally {
- //处理packet,主要是注册watcher,回调,触发事件
- finishPacket(packet);
- }
- }
finishPacket过程
- private void finishPacket(Packet p) {
- //注册watcher,如果是exists请求,则注册到dataWatches中
- if (p.watchRegistration != null) {
- p.watchRegistration.register(p.replyHeader.getErr());
- }
- //如果是同步接口,则唤醒等待的业务线程
- if (p.cb == null) {
- synchronized (p) {
- p.finished = true;
- p.notifyAll();
- }
- }
- //如果是异步请求,则发送异步事件
- else {
- p.finished = true;
- eventThread.queuePacket(p);
- }
- }
EventThread端
- else if (p.response instanceof ExistsResponse
- || p.response instanceof SetDataResponse
- || p.response instanceof SetACLResponse) {
- StatCallback cb = (StatCallback) p.cb;
- //如果成功,增加node stat的回调参数
- if (rc == 0) {
- if (p.response instanceof ExistsResponse) {
- cb.processResult(rc, clientPath, p.ctx,
- ((ExistsResponse) p.response)
- .getStat());
- } else if (p.response instanceof SetDataResponse) {
- cb.processResult(rc, clientPath, p.ctx,
- ((SetDataResponse) p.response)
- .getStat());
- } else if (p.response instanceof SetACLResponse) {
- cb.processResult(rc, clientPath, p.ctx,
- ((SetACLResponse) p.response)
- .getStat());
- }
- }
- //如果响应失败,stat为null
- else {
- cb.processResult(rc, clientPath, p.ctx, null);
- }
- }
在DataMonitor例子中,它本身就是一个StatCallback
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- boolean exists;
- switch (rc) {
- case Code.Ok:
- exists = true;
- break;
- case Code.NoNode:
- exists = false;
- break;
- case Code.SessionExpired:
- case Code.NoAuth:
- dead = true;
- listener.closing(rc);
- return;
- default:
- // Retry errors
- zk.exists(znode, true, this, null);
- return;
- }
Exists过程大致就是上面描述的,主要注意点:
1.客户端Request发送完之后会进入Pending队列,等待响应之后拿出来继续处理
2.同步接口是使用Packet.wait()实现的
3.server端exists操作不是事务型的操作,不会写入log
4.server端的watcher就是一个客户端连接ServerCxcn,代表一个客户端,notify的时候直接往连接里写数据即可
http://iwinit.iteye.com/blog/1759908
相关推荐
### 深入浅出 Zookeeper #### 一、引言与基础知识 ##### 自序 在深入了解Zookeeper之前,我们不妨先从一位实践者的视角出发。最初接触到Zookeeper时,很多人可能会感到困惑,尤其是当其与Kafka这样的分布式消息...
深入浅出Zookeeper核心原理_1
深入浅出Zookeeper核心原理_2
深入浅出Zookeeper核心原理_3
深入浅出Zookeeper核心原理_4
深入浅出Zookeeper核心原理_5
深入浅出Zookeeper核心原理_6
深入浅出Zookeeper核心原理_7
深入浅出Zookeeper核心原理_8
深入浅出Zookeeper核心原理_9
它使用了ZooKeeper自己的会话机制,以便处理客户端与服务端之间的连接和超时等状态。 ZooKeeper的数据模型和操作会实现ZooKeeper的协议,确保集群中的节点可以就数据的更新达成一致。这种一致性不是绝对的,而是...
Zookeeper 的使用场景广泛,例如在 Hadoop 中,它确保集群中只有一个 NameNode 来处理元数据,并存储配置信息。在 HBase 中,Zookeeper 用于监控 HMaster 的状态,确保集群的稳定,并管理 HRegionServer 的生命周期...
Zookeeper是Hadoop分布式调度服务,用来构建分布式应用系统。构建一个分布式应用是一个很复杂的...Zookeeper不能让部分失败的问题彻底消失,但是它提供了一些工具能够让你的分布式应用安全合理的处理部分失败的问题。
本文来自于技术世界,本文介绍了Zookeeper的架构,并组合实例分析了原子广播(ZAB)协议的原理,希望对您的学习有所帮助。Zookeeper是一个分布式协调服务,可用于服务发现,分布式锁,分布式领导选举,配置管理等。这...
zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析...
本文来自于技术世界,本文结合实例演示了使用Zookeeper实现分布式锁与领导选举的原理与具体实现方法。如上文《Zookeeper架构及FastLeaderElection机制》所述,Zookeeper提供了一个类似于Linux文件系统的树形结构。该...
在本文中,我们将深入探讨Zookeeper的服务监控与管理,以及如何有效地利用它来提升系统的稳定性和可扩展性。 一、Zookeeper的基本概念 1.1 ZooKeeper数据模型:Zookeeper的数据模型是一种树形结构,类似于文件系统...
### ZooKeeper 未授权访问漏洞处理方法 #### 一、漏洞背景及原理 **ZooKeeper** 是一个分布式的协调服务框架,它提供了一种高效可靠的解决方案来管理和维护分布式环境中不同节点之间的同步与协调问题。然而,由于...
### 深入分析Zookeeper实现原理 #### 初识Zookeeper 在深入了解Zookeeper之前,我们先简要介绍下Zookeeper以及它所处的分布式环境的一些特点。 **分布式环境的特点:** - **分布性:** 系统由多个通过网络连接的...