`
phinecos
  • 浏览: 351744 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

深入剖析SolrCloud(三)

    博客分类:
  • Java
 
阅读更多

 

在上一篇中介绍了SolrCloud的第一个模块---构建管理solr集群状态信息的zookeeper集群。当我们在solr服务器启动时拥有了这样一个Zookeeper集群后,显然我们需要连接到Zookeeper集群的方便手段,在这一篇中我将对Zookeeper客户端相关的各个封装类进行分析。

SolrZkClient类是Solr服务器用来与Zookeeper集群进行通信的接口类,它包含的主要组件有:

 

private ConnectionManager connManager;
private volatile SolrZooKeeper keeper;
private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();

 其中ConnectionManager是Watcher的实现类,主要负责对客户端与Zookeeper集群之间连接的状态变化信息进行响应,关于Watcher的详细介绍,可以参考http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkWatches,

 

SolrZooKeeper类是一个包装类,没有实际意义,ZkCmdExecutor类是负责在连接失败的情况下,重试某种操作特定次数,具体的操作是ZkOperation这个抽象类的具体实现子类,其execute方法中包含了具体操作步骤,这些操作包括新建一个Znode节点,读取Znode节点数据,创建Znode路径,删除Znode节点等Zookeeper操作。

首先来看它的构造函数,先创建ConnectionManager对象来响应两端之间的状态变化信息,然后ZkClientConnectionStrategy类是一个连接策略抽象类,它包含连接和重连两种策略,并且采用模板方法模式,具体的实现是通过静态累不类ZkUpdate来实现的,DefaultConnectionStrategy是它的一个实现子类,它覆写了connect和reconnect两个连接策略方法。

 

public SolrZkClient(String zkServerAddress, int zkClientTimeout,
      ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
      TimeoutException, IOException {
    connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
        + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
    strat.connect(zkServerAddress, zkClientTimeout, connManager,
        new ZkUpdate() {
          @Override
          public void update(SolrZooKeeper zooKeeper) {
            SolrZooKeeper oldKeeper = keeper;
            keeper = zooKeeper;
            if (oldKeeper != null) {
              try {
                oldKeeper.close();
              } catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                log.error("", e);
                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                    "", e);
              }
            }
          }
        });
    connManager.waitForConnected(clientConnectTimeout);
    numOpens.incrementAndGet();
  }

  值得注意的是,构造函数中生成的ZkUpdate匿名类对象,它的update方法会被调用,在这个方法里,会首先将已有的老的SolrZooKeeperg关闭掉,然后放置上一个新的SolrZooKeeper。做好这些准备工作以后,就会去连接Zookeeper服务器集群,

 

 

connManager.waitForConnected(clientConnectTimeout);//连接zk服务器集群,默认30秒超时时间

 

其实具体的连接动作是new SolrZooKeeper(serverAddress, timeout, watcher)引发的,上面那句代码只是在等待指定时间,看是否已经连接上。

如果连接Zookeeper服务器集群成功,那么就可以进行Zookeeper的常规操作了:

1) 是否已经连接

 

public boolean isConnected() {
    return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
  }

 2)  是否存在某个路径的Znode

public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
    if (retryOnConnLoss) {
      return zkCmdExecutor.retryOperation(new ZkOperation() {
        @Override
        public Stat execute() throws KeeperException, InterruptedException {
          return keeper.exists(path, watcher);
        }
      });
    } else {
      return keeper.exists(path, watcher);
    }
  }

 3) 创建一个Znode节点

public String create(final String path, final byte data[], final List<ACL> acl, final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
    if (retryOnConnLoss) {
      return zkCmdExecutor.retryOperation(new ZkOperation() {
        @Override
        public String execute() throws KeeperException, InterruptedException {
          return keeper.create(path, data, acl, createMode);
        }
      });
    } else {
      return keeper.create(path, data, acl, createMode);
    }
  }

 4)  获取指定路径下的孩子Znode节点

public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
    if (retryOnConnLoss) {
      return zkCmdExecutor.retryOperation(new ZkOperation() {
        @Override
        public List<String> execute() throws KeeperException, InterruptedException {
          return keeper.getChildren(path, watcher);
        }
      });
    } else {
      return keeper.getChildren(path, watcher);
    }
  }

 5) 获取指定Znode上附加的数据

public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
    if (retryOnConnLoss) {
      return zkCmdExecutor.retryOperation(new ZkOperation() {
        @Override
        public byte[] execute() throws KeeperException, InterruptedException {
          return keeper.getData(path, watcher, stat);
        }
      });
    } else {
      return keeper.getData(path, watcher, stat);
    }
  }

 6)  在指定Znode上设置数据

public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
    if (retryOnConnLoss) {
      return zkCmdExecutor.retryOperation(new ZkOperation() {
        @Override
        public Stat execute() throws KeeperException, InterruptedException {
          return keeper.setData(path, data, version);
        }
      });
    } else {
      return keeper.setData(path, data, version);
    }
  }

 7) 创建路径

public void makePath(String path, byte[] data, CreateMode createMode, Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
    if (log.isInfoEnabled()) {
      log.info("makePath: " + path);
    }
    boolean retry = true;
    
    if (path.startsWith("/")) {
      path = path.substring(1, path.length());
    }
    String[] paths = path.split("/");
    StringBuilder sbPath = new StringBuilder();
    for (int i = 0; i < paths.length; i++) {
      byte[] bytes = null;
      String pathPiece = paths[i];
      sbPath.append("/" + pathPiece);
      final String currentPath = sbPath.toString();
      Object exists = exists(currentPath, watcher, retryOnConnLoss);
      if (exists == null || ((i == paths.length -1) && failOnExists)) {
        CreateMode mode = CreateMode.PERSISTENT;
        if (i == paths.length - 1) {
          mode = createMode;
          bytes = data;
          if (!retryOnConnLoss) retry = false;
        }
        try {
          if (retry) {
            final CreateMode finalMode = mode;
            final byte[] finalBytes = bytes;
            zkCmdExecutor.retryOperation(new ZkOperation() {
              @Override
              public Object execute() throws KeeperException, InterruptedException {
                keeper.create(currentPath, finalBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);
                return null;
              }
            });
          } else {
            keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
          }
        } catch (NodeExistsException e) {
          
          if (!failOnExists) {
            // TODO: version ? for now, don't worry about race
            setData(currentPath, data, -1, retryOnConnLoss);
            // set new watch
            exists(currentPath, watcher, retryOnConnLoss);
            return;
          }
          
          // ignore unless it's the last node in the path
          if (i == paths.length - 1) {
            throw e;
          }
        }
        if(i == paths.length -1) {
          // set new watch
          exists(currentPath, watcher, retryOnConnLoss);
        }
      } else if (i == paths.length - 1) {
        // TODO: version ? for now, don't worry about race
        setData(currentPath, data, -1, retryOnConnLoss);
        // set new watch
        exists(currentPath, watcher, retryOnConnLoss);
      }
    }
  }

 8) 删除指定Znode

public void delete(final String path, final int version, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
    if (retryOnConnLoss) {
      zkCmdExecutor.retryOperation(new ZkOperation() {
        @Override
        public Stat execute() throws KeeperException, InterruptedException {
          keeper.delete(path, version);
          return null;
        }
      });
    } else {
      keeper.delete(path, version);
    }
  }

   我们再回过头来看看ConnectionManager类是如何响应两端的连接状态信息的变化的,它最重要的方法是process方法,当它被触发回调时,会从WatchedEvent参数中得到事件的各种状态信息,比如连接成功,会话过期(此时需要进行重连),连接断开等。

public synchronized void process(WatchedEvent event) {
    if (log.isInfoEnabled()) {
      log.info("Watcher " + this + " name:" + name + " got event " + event + " path:" + event.getPath() + " type:" + event.getType());
    }

    state = event.getState();
    if (state == KeeperState.SyncConnected) {
      connected = true;
      clientConnected.countDown();
    } else if (state == KeeperState.Expired) {
      connected = false;
      log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
      //尝试重新连接zk服务器
      try {
        connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
            new ZkClientConnectionStrategy.ZkUpdate() {
              @Override
              public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
                synchronized (connectionStrategy) {
                  waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
                  client.updateKeeper(keeper);
                  if (onReconnect != null) {
                    onReconnect.command();
                  }
                  synchronized (ConnectionManager.this) {
                    ConnectionManager.this.connected = true;
                  }
                }
                
              }
            });
      } catch (Exception e) {
        SolrException.log(log, "", e);
      }
      log.info("Connected:" + connected);
    } else if (state == KeeperState.Disconnected) {
      connected = false;
    } else {
      connected = false;
    }
    notifyAll();
  }

 

 

 

 

作者:洞庭散人

出处:http://phinecos.cnblogs.com/    

本博客遵从Creative Commons Attribution 3.0 License,若用于非商业目的,您可以自由转载,但请保留原作者信息和文章链接URL。

 

 

分享到:
评论
2 楼 CFJAVA 2013-04-04  
分析的真仔细,拜读了!
1 楼 cc3514772b 2012-04-27  
目前也在使用solr ,想跟lz交流下。。

相关推荐

    深入剖析NGINGX

    深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入剖析NGINGX深入...

    《深入剖析 Tomcat》PDF版本下载.txt

    根据提供的文件信息,本文将对《深入剖析 Tomcat》这一资料进行详细的知识点解析。Tomcat作为一款开源的Servlet容器,被广泛应用于Java Web应用程序的部署与运行环境中。本资料旨在帮助读者深入了解Tomcat的工作原理...

    《深入剖析TOMCAT.pdf》(中文版,带目录)

    带目录的《深入剖析TOMCAT》中文版,带目录,便于大家阅读

    《计算的本质:深入剖析程序和计算机》_编程的本质_《计算的本质:深入剖析程序和计算机》_计算机_tunegua_

    《计算的本质:深入剖析程序和计算机》是一本深入探讨编程理论和计算机科学核心概念的书籍。作者通过使用 Ruby 语言的实例,使复杂的计算概念变得易于理解,旨在帮助读者掌握编程语言设计的基础知识。 首先,我们要...

    计算的本质:深入剖析程序和计算机(完整版 电子书 文字版)Tom Stuart

    《计算的本质:深入剖析程序和计算机》借助简单的Ruby代码示例,全面、深入地介绍计算理论和编程语言设计。作者注重实用性,在读者熟知的背景知识下,以明晰的可工作代码阐释了形式语义、自动机理论,以及通过lambda...

    全面深入剖析SaaS

    深入剖析SaaS之三: SaaS的特性 18 深入剖析SaaS之四: SaaS的好处 22 SaaS在中国抓住“长尾”需特殊商业模式 30 SaaS在中国的产生及发展 32 IBM博士孙伟:软件产业正经历裂变 SaaS渐成生态 33 SaaS之后是什么 39 SaaS...

    tomcat深入剖析

    tomcat深入剖析tomcat深入剖析tomcat深入剖析tomcat深入剖析tomcat深入剖析tomcat深入剖析tomcat深入剖析

    linux深入剖析基于0.11

    linux深入剖析基于0.11linux深入剖析基于0.11linux深入剖析基于0.11

    Oracle11g体系结构深入剖析和运维管理(一)

    资源名称:Oracle 11g体系结构深入剖析和运维管理(一)资源目录:【】1_Oracle学习前奏【】2_Oracle软件安装准备工作【】3_Oracle软件安装和数据库的创建【】4_Oracle数据库及相关软件的启动和关闭【】5_计算机工作...

    内存管理深入剖析内存管理深入剖析内存管理深入剖析

    _内存管理深入剖析 第1章 内存初学者指南 计算机内存的类型: 长期或短期 计算机内存的发展 应用程序如何寻找内存 内存管理程序如何工作 典型问题 第2章 计算机如何看待内存 微处理器 INTEL微处理器系列的发展 内存...

    深入剖析TOMCAT_高清中文_带完整章节目录多版本

    《深入剖析Tomcat》是一本专门针对Java领域的Web服务器Tomcat进行深度解析的权威书籍。这本书以高清中文的形式呈现,包含完整的章节目录,旨在帮助读者全面理解和掌握Tomcat的内部工作原理及其在实际开发中的应用。...

    深入剖析Tomcat-高清-书签

    深入剖析Tomcat 高清带书签 深入剖析Tomcat 高清带书签

    Windows深入剖析(初始化篇).

    Windows深入剖析(初始化篇).Windows深入剖析(初始化篇).

    WindowsXP 深入剖析

    WindowsXP 深入剖析-王達時

    深入剖析Visual C++编程技术及应用实例

    《深入剖析Visual C++编程技术及应用实例》通过主流开发领域内的若干具有代表性的实例,向读者深入细致地讲解了利用Visual C++6.0进行Windows高级软件开发的相关技术。本书涉及的领域包括用户界面、文件系统、图形...

    Oracle11g体系结构深入剖析和运维管理(五)

    资源名称:Oracle 11g体系结构深入剖析和运维管理(五)资源目录:【】37_深入剖析事务槽及Oracle多种提交方式【】38_OracleIMU及RedoPrivateStrands技术【】39_读一致性(ORA-01555错误机制分析)及Undo表空间大小设置...

    深入剖析tomcat (完整目录)

    【深入剖析Tomcat——完整目录】 Tomcat是Apache软件基金会的Jakarta项目中的一个核心项目,是一个开源的、免费的Web应用服务器,主要用于运行Java Servlet和JavaServer Pages(JSP)。Tomcat以其轻量级、易用性...

    玩转电商系统 深入剖析智慧电商平台_完整版 PDF电子书下载 带书签目录

    通过以上分析可以看出,《玩转电商系统深入剖析智慧电商平台》这本书不仅涵盖了电商平台的核心技术要点,还针对实际应用场景提出了具体的实践方案。无论是对于想要深入了解该领域的技术人员还是正在寻求解决方案的...

    深入剖析Tomcat+源码

    《深入剖析Tomcat》是一本专注于Java Web服务器Tomcat的深度解析资料,包含了对Tomcat源码的细致分析。此资料包提供了多个文件,包括"深入剖析Tomcat源码.rar","深入剖析tomcat.pdf",以及"apache-tomcat-7.0.32-...

Global site tag (gtag.js) - Google Analytics