`
iwinit
  • 浏览: 455049 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

深入浅出Zookeeper之七分布式CREATE事务处理

阅读更多

前面几篇文章讲了follower和leader之间如何选举和初始化的,这一篇将以之前描述过的CREATE请求作为例子来描述在集群环境下是如何处理事务的。

关于client和zookeeper server的描述前几篇文章已经涉及了。这里不就不再赘述了。假设client和某一个follower建立了连接,并发送了CREATE请求。在follower端,IO线程拿到请求开始执行处理链,Follower处理链如下

初始化代码:

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true);
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

 第一个处理器是FollowerRequestProcessor,处理如下

while (!finished) {
                Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {
                    break;
                }
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
		//先交给CommitProcessor,最终投票通过后,会通过CommitProcessor的commit方法最终提交事务
                nextProcessor.processRequest(request);
                
                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this follower has pending, so we
                // add it to pendingSyncs.
		//只有事务请求才转发给leader,进行投票
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.delete:
                case OpCode.setData:
                case OpCode.setACL:
                case OpCode.createSession:
                case OpCode.closeSession:
                case OpCode.multi:
                    zks.getFollower().request(request);
                    break;
                }

 转发事务请求给leader

   void request(Request request) throws IOException {
	//反序列化
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(request.sessionId);
        oa.writeInt(request.cxid);
        oa.writeInt(request.type);
        if (request.request != null) {
            request.request.rewind();
            int len = request.request.remaining();
            byte b[] = new byte[len];
            request.request.get(b);
            request.request.rewind();
            oa.write(b);
        }
        oa.close();
        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
                .toByteArray(), request.authInfo);
        writePacket(qp, true);
    }

 在CommitProcessor中主要是等待缓存请求,并等待该请求被commit

 while (!finished) {
                int len = toProcess.size();
		//最终的请求处理交给FinalRequestProcessor
                for (int i = 0; i < len; i++) {
                    nextProcessor.processRequest(toProcess.get(i));
                }
                toProcess.clear();
                synchronized (this) {
			//如果没有commit请求,则wait,直到commit请求的时候唤醒
                    if ((queuedRequests.size() == 0 || nextPending != null)
                            && committedRequests.size() == 0) {
                        wait();
                        continue;
                    }
                    // First check and see if the commit came in for the pending
                    // request
		    //有commit请求,则添加到最终队列,下一轮处理
                    if ((queuedRequests.size() == 0 || nextPending != null)
                            && committedRequests.size() > 0) {
                        Request r = committedRequests.remove();
                        /*
                         * We match with nextPending so that we can move to the
                         * next request when it is committed. We also want to
                         * use nextPending because it has the cnxn member set
                         * properly.
                         */
			 //如果是自己的请求,则使用之前的Request,以为之前的Request带client的连接信息,可以写回响应
                        if (nextPending != null
                                && nextPending.sessionId == r.sessionId
                                && nextPending.cxid == r.cxid) {
                            // we want to send our version of the request.
                            // the pointer to the connection in the request
                            nextPending.hdr = r.hdr;
                            nextPending.txn = r.txn;
                            nextPending.zxid = r.zxid;
                            toProcess.add(nextPending);
                            nextPending = null;
                        }
			//如果是别人的请求,则使用新的Request,不带连接信息,无法发送响应
			else {
                            // this request came from someone else so just
                            // send the commit packet
                            toProcess.add(r);
                        }
                    }
                }

                // We haven't matched the pending requests, so go back to
                // waiting
		//有pending请求,但是该请求还未commit,则继续
                if (nextPending != null) {
                    continue;
                }
		//从队列中拿待处理请求
                synchronized (this) {
                    // Process the next requests in the queuedRequests
                    while (nextPending == null && queuedRequests.size() > 0) {
                        Request request = queuedRequests.remove();
                        switch (request.type) {
                        case OpCode.create:
                        case OpCode.delete:
                        case OpCode.setData:
                        case OpCode.multi:
                        case OpCode.setACL:
                        case OpCode.createSession:
                        case OpCode.closeSession:
                            nextPending = request;
                            break;
                        case OpCode.sync:
                            if (matchSyncs) {
                                nextPending = request;
                            } else {
                                toProcess.add(request);
                            }
                            break;
                        default:
                            toProcess.add(request);
                        }
                    }
                }

 在这个场景中,CREATE请求先到了queuedRequests中,然后nextPending会指向这个请求,但是此时还未commit,所以CommitProcessor会wait,直到该请求投票被通过,然后被commit。

此时leader收到了转发的请求,在LearnerHandler中

case Leader.REQUEST:                    
			//反序列化
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if(type == OpCode.sync){
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
		    //提交给执行链处理
                    leader.zk.submitRequest(si);
                    break;

 Leader端的执行链如下

PrepRequestProcessor在之前的文章已经分析过了,主要是根据请求类型,拼装不同的Request,这里是CreateRequest

接下来ProposalRequestProcessor执行,ProposalRequestProcessor主要是发起投票

public void processRequest(Request request) throws RequestProcessorException {

	......                
        
        /* In the following IF-THEN-ELSE block, we process syncs on the leader. 
         * If the sync is coming from a follower, then the follower
         * handler adds it to syncHandler. Otherwise, if it is a client of
         * the leader that issued the sync command, then syncHandler won't 
         * contain the handler. In this case, we add it to syncHandler, and 
         * call processRequest on the next processor.
         */
        
        if(request instanceof LearnerSyncRequest){
            zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {
		//先交给CommitProcessor处理下,此时还未提交
                nextProcessor.processRequest(request);
            if (request.hdr != null) {
                // We need to sync and get consensus on any transactions
                try {
			//发起一个投票
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
		//先写日志
                syncProcessor.processRequest(request);
            }
        }
    }

 leader发起投票

 public Proposal propose(Request request) throws XidRolloverException {
        .......

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        try {
            request.hdr.serialize(boa, "hdr");
            if (request.txn != null) {
                request.txn.serialize(boa, "txn");
            }
            baos.close();
        } catch (IOException e) {
            LOG.warn("This really should be impossible", e);
        }
	//投票包
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, 
                baos.toByteArray(), null);
        
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        synchronized (this) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }

            lastProposed = p.packet.getZxid();
	    //添加到投票箱,后续leader收到选票时会检查这个投票箱里的投票是否满足条件
            outstandingProposals.put(lastProposed, p);
	    //给每个follower发一个投票包,让他们投票
            sendPacket(pp);
        }
        return p;
    }

 leader发完投票后,通过SyncRequestProcessor将事务写入日志文件,本地写成功后,投票成功。

SyncRequestProcessor之前文章已经分析过了,主要是将事务顺序写入日志文件。主要看之后的AckRequestProcessor

    public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if(self != null)
		//本地日志写成功后,认为自己成功了
            leader.processAck(self.getId(), request.zxid, null);
        else
            LOG.error("Null QuorumPeer");
    }

 leader的processAck方法比较关键,之前也有分析,这里再强调下

   synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
       .......
       //当有选票进来时,先看看是哪个投票的
        Proposal p = outstandingProposals.get(zxid);
        if (p == null) {
            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                    Long.toHexString(zxid), followerAddr);
            return;
        }
        //把票投上
        p.ackSet.add(sid);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Count for zxid: 0x{} is {}",
                    Long.toHexString(zxid), p.ackSet.size());
        }
	//如果满足投票结束条件,默认是半数server统一,则提交事务
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){             
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            if (p.request != null) {
		//先添加到带提交队列
                toBeApplied.add(p);
            }
            // We don't commit the new leader proposal
            if ((zxid & 0xffffffffL) != 0) {
                if (p.request == null) {
                    LOG.warn("Going to commmit null request for proposal: {}", p);
                }
		//事务提交,通知follower提交事务
                commit(zxid);
		//通知Observer
                inform(p);
		//leader commit事务
		zk.commitProcessor.commit(p.request);
            ......
        }
    }

 通知follower提交事务

    public void commit(long zxid) {
        synchronized(this){
            lastCommitted = zxid;
        }
	//发送COMMIT包
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp);
    }

 此时Follower收到proposal包,follower中处理投票

case Leader.PROPOSAL:            
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
	    //记录事务日志,成功后发送ACK包
            fzk.logRequest(hdr, txn);
            break;

 

    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null, null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
	//还是通过SyncRequestProcessor将事务写入本地文件,再发送ack包
        syncProcessor.processRequest(request);
    }

 日志写成功后,SendAckRequestProcessor发送ACK包

    public void processRequest(Request si) {
        if(si.type != OpCode.sync){
		//ACK包
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                null);
            try {
		//发送
                learner.writePacket(qp, false);
            } catch (IOException e) {
                LOG.warn("Closing connection to leader, exception during packet send", e);
                try {
                    if (!learner.sock.isClosed()) {
                        learner.sock.close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
                }
            }
        }
    }

 此时,leader收到ack包,LearnerHandler线程中

   case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Received ACK from Observer  " + this.sid);
                        }
                    }
                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;

 还是调用了processAck方法,由于之前已经有了leader自己的投票,此时follower再投一票,3台机器的集群即认为投票成功,leader开始发送commit操作,也就是发送commit包给follower。

follower收到commit包

case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;

    public void commit(long zxid) {
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element().zxid;
        if (firstElementZxid != zxid) {
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
            System.exit(12);
        }
	//从Pending队列中拿到待commit请求
        Request request = pendingTxns.remove();
	//commit这个请求,这个请求将交给FinalRequestProcessor处理
        commitProcessor.commit(request);
    }

 Commit之后请求将交给FinalRequestProcessor处理,修改最后的内存db结构,如果是本机请求则写回响应,如果不是则不用写回响应

  • 大小: 29.2 KB
  • 大小: 39.6 KB
分享到:
评论
1 楼 小黄牛 2016-11-16  
分布式事务解决方案演示效果:http://www.iqiyi.com/w_19rsveqlhh.html

相关推荐

    深入解析ZooKeeper分布式环境搭建+编程知识+技术开发

    zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析...

    Apache ZooKeeper分布式环境搭建教程

    zookeeper之分布式环境搭建:Apache ZooKeeper分布式环境搭建教程; zookeeper之分布式环境搭建:Apache ZooKeeper分布式环境搭建教程; zookeeper之分布式环境搭建:Apache ZooKeeper分布式环境搭建教程; ...

    深入浅出Zookeeper

    ### 深入浅出 Zookeeper #### 一、引言与基础知识 ##### 自序 在深入了解Zookeeper之前,我们不妨先从一位实践者的视角出发。最初接触到Zookeeper时,很多人可能会感到困惑,尤其是当其与Kafka这样的分布式消息...

    ZooKeeper-分布式过程协同技术详解 和从Paxos到Zookeeper

    《ZooKeeper:分布式过程协同技术详解》与《从Paxos到Zookeeper:分布式一致性原理与实践》这两本书深入探讨了分布式系统中的关键组件ZooKeeper及其背后的一致性算法Paxos。ZooKeeper是由Apache软件基金会开发的一个...

    springboot+dubbo+zookeeper构建的分布式调用服务框架

    【SpringBoot + Dubbo + ZooKeeper 构建的分布式调用服务框架】 SpringBoot 是一个由 Pivotal 团队创建的 Java 框架,它简化了在 Spring 框架上创建独立的、生产级别的基于 Java 的应用程序。SpringBoot 通过提供...

    从Paxos到Zookeeper分布式一致性原理与实践 + ZooKeeper-分布式过程协同技术详解 pdf

    《从Paxos到Zookeeper分布式一致性原理与实践》与《ZooKeeper-分布式过程协同技术详解》这两本书深入探讨了分布式系统中的一个重要概念——一致性,以及如何通过ZooKeeper这一工具来实现高效的分布式协同。...

    zookeeper伪分布式搭建(1)1

    Zookeeper 是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将简单易用的接口和性能高效、功能稳定的系统提供给用户。...

    zookeeper+dubbo分布式demo可直接运行

    【标题】"zookeeper+dubbo分布式demo可直接运行"揭示了这个压缩包文件是一个实际运行的示例,它结合了Zookeeper和Dubbo两大分布式技术,用于演示如何在分布式环境中搭建和运行应用。 【描述】提到"zookeeper+dubbo...

    zookeeper开源的分布式协调服务之分布式环境搭建

    ### ZooKeeper 分布式协调服务的关键特性与应用场景 #### 一、一致性 ZooKeeper 提供了一种强一致性的数据模型。在这个模型中,所有的更新操作都是原子性的,这意味着一旦某个客户端发起了一次更新请求,那么这个...

    使用ZooKeeper实现分布式锁

    这里,我们将深入探讨如何利用ZooKeeper这一强大的分布式协调服务来实现分布式锁,以解决订单编号的唯一性问题。 ZooKeeper是由Apache Hadoop项目孵化的开源项目,它提供了一个高可用、高性能的分布式协调服务。其...

    基于zookeeper的分布式锁简单实现

    - **测试代码**:展示了如何在实际应用中使用Zookeeper实现分布式锁的示例,包括创建锁、获取锁、释放锁以及异常处理等操作。 - **实用工具类**:封装了与Zookeeper交互的常用方法,如创建节点、设置监听、检查节点...

    基于zookeeper实现的分布式读写锁

    本文将深入探讨基于Zookeeper实现的分布式读写锁,并利用Zkclient客户端进行操作。Zookeeper是一个分布式服务协调框架,它提供了一种简单且高效的方式来实现分布式锁。 **一、Zookeeper简介** Zookeeper是由Apache...

    Zookeeper伪分布式集群环境搭建过程

    ZooKeeper是一个分布式的、开源的应用程序协调服务,被广泛应用于多种分布式场景之中,例如配置维护、域名服务、分布式同步、组服务等。它能够提供一个简单统一的服务接口,让分布式应用能够更加便捷地实现关键功能...

    zookeeper分布式锁实例源码

    在分布式系统中,ZooKeeper 是一个至关重要的组件,它为分布式协调提供了强大的服务,包括配置管理、命名服务、分布式同步、组服务等。在这个场景下,我们将关注ZooKeeper如何实现分布式锁,特别是不可重入锁、可重...

    ZooKeeper-分布式过程协同技术详解(高清).zip

    《ZooKeeper:分布式过程协同技术详解》是一本深入探讨Zookeeper这一分布式协调服务的书籍。Zookeeper在当今的分布式系统中扮演着至关重要的角色,它提供了一种可靠的、高可用的服务发现、配置管理以及分布式锁等...

    分布式服务框架 Zookeeper — 管理分布式环境中的数据.doc

    【分布式服务框架 Zookeeper — 管理...3. **深入 ZooKeeper 原理**:通过源码分析,开发分布式应用。 通过逐步学习,开发者可以掌握 Zookeeper 的精髓,将其应用于实际的分布式系统中,提升系统的稳定性和可用性。

    zookeeper做分布式锁

    本文将深入探讨如何利用ZooKeeper来构建分布式锁,并讨论其背后的关键概念和技术细节。 **1. ZooKeeper概述** ZooKeeper是一个高可用、高性能的分布式协调服务,它提供了诸如命名服务、配置管理、分布式同步、组...

    陶隽-基于Apache Zookeeper的分布式协调原理及应用

    Apache ZooKeeper是一款开源的分布式协调服务,其设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,提供给分布式应用简单的接口。本文将介绍ZooKeeper的原理和应用。 首先,我们需要了解分布式协调问题...

    fat:基于springboot,zookeeper,redis分布式事务强一致性方案

    脂肪FAT,基于springboot,使用zookeeper,redis,spring异步,spring transactionManager的强一致性分布式事务解决方案框架介绍纯编码方式,强一致性。使用redis / zookeeper作为注册中心,代理事务的执行,使用...

    springBoot+dubbo+zookeeper分布式微服务

    在IT行业中,分布式微服务架构已经成为了现代企业级应用开发的重要模式。本项目"springBoot+dubbo+zookeeper...开发者可以通过研究这些文件,学习如何将这三个组件整合在一起,构建出高效运行的分布式微服务架构。

Global site tag (gtag.js) - Google Analytics