`
suichangkele
  • 浏览: 201445 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

solrCloud中CompsiteId路由策略的collection的操作分析

    博客分类:
  • solr
阅读更多

       solrCloud有多个shard,一个shard有一个或者多个replica,那么再solrj发起添加documnet(这里称作update请求)或者是查询(这里叫做query请求)的时候,是如何向最终的各个solr的不同的shard的replica发送的请求呢?我看了看solrj中的CloudSolrServer的处理请求的源码以及solr服务端的部分源码,终于弄懂了,几个笔记如下,方便大家和我,这篇博客只是写CompositeIdDocRouter的collection,对于ImplicitRouter的集合在另一篇博客中。

 

       先说一下solrj的实现原理和过程,solrj是使用一个叫做solrServer的类发起请求的,无论是update还是query,都是一个请求,他们只包含一些请求的参数,都需要一个server来具体的域solr服务器交互,solrServer就是起到这样的一个功能。solrServer是一个接口,他有多个实现,比如我们在单机版的solr中使用的HttpSolrServer,就是直接使用http协议发起的请求。在solrj提供的实现中还有一个是LBHttpSolrServer,他也是使用http协议发起的请求,只不过他的内部是有多个url,也就是服务器是有多个的,多个url中他是随机抽取一个进行请求的,如果随机抽取的url可以访问成功,那就操作成功,否则会将当前访问的url设置为僵尸的,然后继续访问下一个url,我们看一下LBhttpSolrServer的request方法:

  @Override
  public NamedList<Object> request(final SolrRequest request)//处理请求,请求可以是update的或者是query的。
          throws SolrServerException, IOException {
    Exception ex = null;
    ServerWrapper[] serverList = aliveServerList;//当前可以访问的所有的url
    
    int maxTries = serverList.length;
    Map<String,ServerWrapper> justFailed = null;

    for (int attempts=0; attempts<maxTries; attempts++) {//尝试所有的可能的url。
      int count = counter.incrementAndGet();//当前的LBHttpSolrServer发起http请求的总次数
      ServerWrapper wrapper = serverList[count % serverList.length];//根据次数计算随机值,获得一个随机的HttpSolrServer,ServerWrapper就是封装的HttpSolrServer,
      wrapper.lastUsed = System.currentTimeMillis();

      try {
        return wrapper.solrServer.request(request);//调用获得server发起请求,如果成功,则返回
      } catch (SolrException e) {//http请求成功,但是参数不对,则抛出这个异常
        // Server is alive but the request was malformed or invalid
        throw e;
      } catch (SolrServerException e) {
        if (e.getRootCause() instanceof IOException) {//http请求失败,比如是网络出现问题,或者当前的url代表的server已经死亡
          ex = e;
          moveAliveToDead(wrapper);//将这个url设置为死亡状态
          if (justFailed == null) justFailed = new HashMap<String,ServerWrapper>();
          justFailed.put(wrapper.getKey(), wrapper);
        } else {
          throw e;
        }
      } catch (Exception e) {
        throw new SolrServerException(e);
      }
    }

    // try other standard servers that we didn't try just now  如果上面的所有的url都不能访问成功,则继续访问那些之前别标记为死亡的url,看看他们能否访问成功。
    for (ServerWrapper wrapper : zombieServers.values()) {
      if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue;
      try {
        NamedList<Object> rsp = wrapper.solrServer.request(request);
        // remove from zombie list *before* adding to alive to avoid a race that could lose a server
        zombieServers.remove(wrapper.getKey());//如果访问成功,则从死亡的server中移除
        addToAlive(wrapper);//添加到存活的url中
        return rsp;
      } catch (SolrException e) {
        // Server is alive but the request was malformed or invalid
        throw e;
      } catch (SolrServerException e) {
        if (e.getRootCause() instanceof IOException) {
          ex = e;
          // still dead
        } else {
          throw e;
        }
      } catch (Exception e) {
        throw new SolrServerException(e);
      }
    }

    //如果所有的url都访问了,还是失败,则报错。
    if (ex == null) {
      throw new SolrServerException("No live SolrServers available to handle this request");
    } else {
      throw new SolrServerException("No live SolrServers available to handle this request", ex);
    }
  }

经过上面之后,可以对LBHttpSolrServer 做一个初步的了解了,他就是对所有的url做轮训的,可以这么理解。

 

接下来便是CloudSolrServer了,他在处理请求的时候,对于update的,会在客户端根据document的id进行hash,然后将其添加到对应的shard中,如果一个shard有超过一个replica,会强行选择leader,这一块涉及到的代码非常多,我这里就不再贴代码了。对于query的请求,则是将当前collection的所有的shard的所有的replica的url都获取到,然后使用一个LBHttpSolrServer进行查询任意一个replica,也就是查询的时候,solr是在服务端进行的组装来自于不同的shard的document,在某一个replica中会向其他的shard发起请求,然后再返回组装好的response,对于客户端是透明的。

代码如下:

 

  @Override
  public NamedList<Object> request(SolrRequest request)
      throws SolrServerException, IOException {
    connect();
    ClusterState clusterState = zkStateReader.getClusterState();//读取zk中的集群状态节点
    boolean sendToLeaders = false;
    List<String> replicas = null;
    
    if (request instanceof IsUpdateRequest) {//如果这次的请求是update的,则进入if
      if (request instanceof UpdateRequest) {
        NamedList response = directUpdate((AbstractUpdateRequest) request,//直接调用directUpdate,
            clusterState);
        if (response != null) {
          return response;
        }
      }
      sendToLeaders = true;
      replicas = new ArrayList<String>();
    }
    SolrParams reqParams = request.getParams();
    if (reqParams == null) {
      reqParams = new ModifiableSolrParams();
    }
    List<String> theUrlList = new ArrayList<String>();
    if (request.getPath().equals("/admin/collections")
        || request.getPath().equals("/admin/cores")) {
      Set<String> liveNodes = clusterState.getLiveNodes();
      for (String liveNode : liveNodes) {
        theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode));
      }
    } else {//下面的是处理的query的
      String collection = reqParams.get("collection", defaultCollection);//如果没有对此CloudSolrServer指定collection,也没有在参数中设置collection的参数,则报错
      if (collection == null) {
        throw new SolrServerException(
            "No collection param specified on request and no default collection has been set.");
      }
      //根据上面的collection字符串来获得集合的名字,我这里假设此CloudSolrServer只有一个collection
      Set<String> collectionsList = getCollectionList(clusterState, collection);
      if (collectionsList.size() == 0) {
        throw new SolrException(ErrorCode.BAD_REQUEST,
            "Could not find collection: " + collection);
      }

      // TODO: not a big deal because of the caching, but we could avoid looking
      // at every shard
      // when getting leaders if we tweaked some things
      
      // Retrieve slices from the cloud state and, for each collection
      // specified,
      // add it to the Map of slices.
      Map<String,Slice> slices = new HashMap<String,Slice>();
      for (String collectionName : collectionsList) {//假设只有一个collection,实际情况也是只有一个
        Collection<Slice> colSlices = clusterState.getActiveSlices(collectionName);//获得此集合的所有的存活的shard,每个slice就是shard的意思
        if (colSlices == null) {
          throw new SolrServerException("Could not find collection:"
              + collectionName);
        }
        ClientUtils.addSlices(slices, collectionName, colSlices, true);//将colSlice添加到上面的slices中
      }
      Set<String> liveNodes = clusterState.getLiveNodes();
      
      List<String> leaderUrlList = null;
      List<String> urlList = null;
      List<String> replicasList = null;
      
      // build a map of unique nodes
      // TODO: allow filtering by group, role, etc
      Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
      List<String> urlList2 = new ArrayList<String>();
      for (Slice slice : slices.values()) {//循环所有的shard,
        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {//对一个shard,获得所有的replica的url
          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
          String node = coreNodeProps.getNodeName();
          if (!liveNodes.contains(coreNodeProps.getNodeName())
              || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
          if (nodes.put(node, nodeProps) == null) {
            if (!sendToLeaders || (sendToLeaders && coreNodeProps.isLeader())) {//在查询的时候sendToLeaders是false,进入if
              String url;
              if (reqParams.get("collection") == null) {
                url = ZkCoreNodeProps.getCoreUrl(
                    nodeProps.getStr(ZkStateReader.BASE_URL_PROP),
                    defaultCollection);
              } else {
                url = coreNodeProps.getCoreUrl();//获得此replica的url
              }
              urlList2.add(url);//
            } else if (sendToLeaders) {
              String url;
              if (reqParams.get("collection") == null) {
                url = ZkCoreNodeProps.getCoreUrl(
                    nodeProps.getStr(ZkStateReader.BASE_URL_PROP),
                    defaultCollection);
              } else {
                url = coreNodeProps.getCoreUrl();
              }
              replicas.add(url);
            }
          }
        }
      }
      
      if (sendToLeaders) {
        leaderUrlList = urlList2;
        replicasList = replicas;
      } else {
        urlList = urlList2;//设置为所有的shard的所有的replica的url
      }
      
      if (sendToLeaders) {
        theUrlList = new ArrayList<String>(leaderUrlList.size());
        theUrlList.addAll(leaderUrlList);
      } else {
        theUrlList = new ArrayList<String>(urlList.size());
        theUrlList.addAll(urlList);
      }
      Collections.shuffle(theUrlList, rand);
      if (sendToLeaders) {
        ArrayList<String> theReplicas = new ArrayList<String>(replicasList.size());
        theReplicas.addAll(replicasList);
        Collections.shuffle(theReplicas, rand);
        theUrlList.addAll(theReplicas);
      }
    }
    
    LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, theUrlList);//最后发起的请求就是轮训所有的shard的所有的replica的url,并不是每一个shard一个,所以可以总结是在服务端进行的封装。
    LBHttpSolrServer.Rsp rsp = lbServer.request(req);
    return rsp.getResponse();
  }

经过上面已经看到对于query的请求的操作了,即将请求随机的发到一个replica中,在replica中进行分发到各个shard中,这个想象也是这样,因为CompositeIdRouter是计算的id的hash值,无法根据hash值直接判断某个shard中是否存在某个doc,所以在查询的时候必须向所有的shard发请求。

 

下面看下update的操作方法,即directUpdate方法 ,在这个方法中有个很关键的方法是:Map<String, LBHttpSolrServer.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);解释一下,updateRequest表示本次的更新操作,里面含有要添加的doc和删除的id、query,router表示当前的集合的router,col表示当前的集合的描述对象,urlMap表示当前集合的多个shard对应的对个的replica,这个方法用来对所有要更新的document找到对应的slice,也就是shard,我们进入到 updateRequest.getRoutes方法(只截取了一部分)

Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();//当前的request里要添加的documents
for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {//循环所有的doc
        SolrInputDocument doc = entry.getKey();
        Object id = doc.getFieldValue(idField);//得到idField的值
        if (id == null) {
          return null;
        }
        Slice slice = router.getTargetSlice(id.toString(), doc, null, col);//router根据id值计算hash值,然后根据hash值得到一个slice,这个部分的实现看CompositeIdRouter的博客
        if (slice == null) {
          return null;
        }
        List<String> urls = urlMap.get(slice.getName());//根据得到的slice的名字得到这个slice的多个replica的url,第一个表示leader
        String leaderUrl = urls.get(0);//某个slice的leader
        LBHttpSolrServer.Req request = (LBHttpSolrServer.Req) routes.get(leaderUrl);//将添加操作转发到leader上。
       。。。。。。//没有截取
      }

 上面的部分解释了update的请求,会将一个doc根据其id域计算hash值并找到对应的slice(这部分的实现在CompositeIdRouter中实现),然后将请求转发到这个slice的leader上。

 

看到这里,就看懂了在router是CompositeIDRouter的情况下CloudSOlrServer的客户端(solrj中)实现原理了。

 

 

 

 

 

分享到:
评论

相关推荐

    什么是路由策略?路由策略和策略路由有什么区别?如何配置路由策略?.doc

    在实际应用中,路由策略可以应用于各种场景,例如,在双上行的组网结构中,使用路由策略可以实现对路由信息进行过滤和修改路由的属性,例如,允许来自某个路由器的路由,拒绝来自某个路由器的路由,或者将来自某个...

    Linux策略路由的分析及其应用.pdf

    Linux 策略路由的分析及其应用 Linux 策略路由是指基于策略的路由选择机制,它允许管理员根据 IP 包的源地址、目的地址或其他信息来选择路由。Linux 作为一个开源的操作系统,具有强大的功能,包括策略路由功能。 ...

    路由策略学生实验-路由策略控制.rar

    在IT行业中,路由策略是网络管理员用来控制路由信息传播、优化网络流量以及提高网络稳定性的重要工具。本实验针对“路由策略学生实验”旨在帮助学习者深入理解路由策略的原理和应用,尤其对于HCNP(华为认证网络专业...

    路由策略改进与传统路由对比_路由策略改进_路由策略_

    在IT行业中,路由策略是网络通信的核心组成部分,它决定了数据包如何在互联网上高效、可靠地传输。本文将深入探讨路由策略的改进...在实际操作中,结合仿真工具进行性能评估,有助于找到最适合特定网络环境的路由策略。

    在RIP中使用路由策略配置指导

    *管理员需要在 RIP 路由协议中应用路由策略,包括配置路由策略和路由优先级等。 *管理员需要监控网络状态,实时调整路由策略和路由优先级等。 注意事项 ------ 使用 RIP 路由策略配置指导需要注意以下事项: *...

    华为路由器路由策略和策略路由.pdf

    路由策略是华为路由器中的一种机制,旨在实现路由过滤和路由属性设置等功能。通过改变路由属性,路由策略可以改变网络流量的路径,使网络流量更合理化。路由策略在发布、接收和引入路由信息时,根据实际组网需求实施...

    5、华为路由器路由策略和策略路由配置与管理.docx

    路由策略简介,基本原理,配置路由策略-配置地址前缀列表、配置AS属性过滤器\配置团体属性过滤器

    策略路由&路由策的区别和日常用途

    策略路由:一般情况下,数据包的转发是根据目的...路由策略:如下图,一个运行了路由协议的网络中,RTF会收到RTA和RTB发布的路由信息。如果用户要求RTF仅接收RTA发布的路由,那么,可以通过在RTC上部署路由策略来实现。

    详解CISCO路由交换机的策略路由配置语句

    策略路由配置语句在CISCO路由交换机中的应用 策略路由是一种基于源地址的路由技术,它可以根据不同的源地址将数据包发送到不同的目标网络。CISCO路由交换机支持策略路由,并提供了灵活的配置选项来满足不同的网络...

    路由策略与策略路由知识总结

    路由策略与策略路由知识总结

    华为公司使用路由策略.pptx

    路由策略是一种基于策略的路由选择机制,它可以改变实体——假如一个数据包匹配了路由策略中给定的标准,就会执行一些操作改变数据包。路由策略和访问控制列表非常相似,它们都运行“if……then”的程序语句:声明...

    基于域名路由策略.zip

    在IT行业中,网络路由策略是网络通信的核心组成部分,它决定了数据包如何在互联网上找到目的地。通常,我们使用的路由策略是基于IP地址的,但随着网络环境的复杂性增加,基于域名的路由策略逐渐受到关注。这篇内容将...

    华为 ME60 V800R010C10SPC500 配置指南 - 路由策略配置

    地址前缀列表是路由策略中的一个重要组件,它根据路由的目的地址进行匹配。在需要根据目的地址控制路由发布和接收的场景下,配置地址前缀列表是必要的。地址前缀列表类似于访问控制列表(ACL),但更加灵活,可以...

    三层技术-IP路由配置指导-路由策略配

    - **管理引入的路由**:不同路由协议之间的相互转换通常需要通过路由策略来调整路由信息的属性,以便它们能够正确地被引入到新的协议环境中。 - **设置路由的属性**:路由策略还可以用来修改路由的某些属性,例如...

    路由策略和策略路由。。。。

    在IT行业中,路由策略和策略路由是网络管理中的重要概念,尤其在华为设备上具有广泛的应用。本文将详细探讨这两个概念及其在实际网络部署中的应用。 路由策略(Routing Policy)是一种控制路由信息交换的方式,它...

    H3C 路由策略命令

    本文将详细解读《H3C 路由策略命令》手册中的关键知识点,涵盖路由策略的公共配置命令、IPv4路由策略配置命令以及IPv6路由策略配置命令。 #### 路由策略公共配置命令详解 1. **apply as-path**:此命令用于配置BGP...

    华为路由器路由策略和策略路由配置与管理.pdf

    路由策略是华为路由器中的一个重要功能,主要实现了路由过滤和路由属性设置等功能。它通过改变路由属性(包括可达性)来改变网络流量所经过的路径。路由协议在发布、接收和引入路由信息时,根据实际组网需求实施一些...

    策略路由与路由策略原理PPT学习教案.pptx

    策略路由和路由策略是网络管理中的重要概念,它们用于对数据包的转发路径和路由信息进行精细化控制,以优化网络性能和提高安全性。本课程旨在帮助你理解这两种机制的基本原理,并能实际应用到网络管理中。 ### 第一...

Global site tag (gtag.js) - Google Analytics