`
fwuwen
  • 浏览: 16333 次
  • 来自: 厦门
文章分类
社区版块
存档分类
最新评论

solr4.2更新流程

 
阅读更多

solr4.2的update大致流程:

execute--->RequestHandlerBase.handleRequest--->ContentStreamHandlerBase.handleRequestBody
	--->JavabinLoader.load-->JavaBinUpdateRequestCodec.StreamingUpdateHandler.update
		--->LogUpdateProcessor.processAdd
			--->DistributedUpdateProcessor.processAdd
				--->RunUpdateProcessor.processAdd

  

ContentStreamHandlerBase.handleRequestBody

SolrParams params = req.getParams();
    /**
     * 根据update.chain 获取不同的处理链
     * langid -->  包含TikaLanguageIdentifierUpdateProcessorFactory
     * script -->  包含StatelessScriptUpdateProcessorFactory
     * dedupe -->  包含SignatureUpdateProcessorFactory
     * .....
     */ 
    UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));	 
    /**
     * 系统默认的更新处理链包含3个,按顺序执行
     * LogUpdateProcessorFactory	  ---> 记录更新日志
     * DistributedUpdateProcessorFactory  ---> 分布式更新,包含请求转发,版本控制等
     * RunUpdateProcessorFactory 	  ---> 写入索引
     */
    UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);

 

processorChain.createProcessor:

 public UpdateRequestProcessor createProcessor(SolrQueryRequest req, 
                                                SolrQueryResponse rsp) 
  {
    UpdateRequestProcessor processor = null;
    UpdateRequestProcessor last = null;
    
    final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);	// 判断是否请求转发而来的
    final boolean skipToDistrib = distribPhase != null;
    boolean afterDistrib = true;  // we iterate backwards, so true to start

    for (int i = chain.length-1; i>=0; i--) {	// 从下往上迭代构造处理链
      UpdateRequestProcessorFactory factory = chain[i];
      if (skipToDistrib) {
        if (afterDistrib) {
          if (factory instanceof DistributingUpdateProcessorFactory) {
            afterDistrib = false;
          }
        } else if (!(factory instanceof LogUpdateProcessorFactory)) {    // TODO: use a marker interface for this?
          // skip anything that is not the log factory
          continue;			// 请求转发的就不需要记录更新日志,忽略LogUpdateProcessor
        }
      }

      processor = factory.getInstance(req, rsp, last);	// 构造处理链,将下个的UpdateRequestProcessor通过构造函数赋值给当前的next 当next为null 代表处理完毕
      last = processor == null ? last : processor;
    }

    return last;	// 返回配置文件中的第一个更新处理链
  }

 

 LogUpdateProcessor:

 

 记录更新日志

 

 DistributedUpdateProcessor:

 

DistributedUpdateProcessor是一个分布式更新组件,主要是进行请求的转发。

 

发生请求转发的情况

implicit:

客户端直接提交到非leader节点,就需要通过该leader来分发数据

请求leader节点,但是该slice带有replicas

compositeId:

hash后的节点,不是当前提交的节点

 

 所以为了不让更新请求不会转发来转发去。需要满足3个条件

1.只提交给所有leader节点

2.该leader节点不带有replicas节点

3.没有配置numShards参数,或者numShards不大于1

 

 public void processAdd(AddUpdateCommand cmd) throws IOException {
    updateCommand = cmd;

    if (zkEnabled) {	// cloud模式下
      zkCheck();		// 检查zk连接状态
      nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());	// 判断更新请求来源,获取待转发的节点
    } else {
      isLeader = getNonZkLeaderAssumption(req);
    }
    
    boolean dropCmd = false;
    if (!forwardToLeader) {	 		//是否有需要写入本地数据
      dropCmd = versionAdd(cmd);	// 在这个方法判断版本号信息.然后在调用RunUpdateProcessord写入索引
    }

    if (dropCmd) {			// 丢弃该请求,不分发到各个节点上面	
      return;
    }
    
    ModifiableSolrParams params = null;
    if (nodes != null) {	
      params = new ModifiableSolrParams(filterParams(req.getParams()));
      params.set(DISTRIB_UPDATE_PARAM, (isLeader ?  DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString())); // 转发请求,可以分为从leader转发,还是发到leader
      if (isLeader) {
        params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
      }

      params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));	// 分布式更新提交标志
    
      cmdDistrib.distribAdd(cmd, nodes, params);	   // 将记录分发到各个节点上面
    }
    
    /**
     * 返回update的响应内容
     * 可以通过设置UpdateParams.VERSIONS=true来返回添加信息,默认不返回
     */
    if (returnVersions && rsp != null && idField != null) {
      if (addsResponse == null) {
        addsResponse = new NamedList<String>();
        rsp.add("adds",addsResponse);
      }
      if (scratch == null) scratch = new CharsRef();
      idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
      addsResponse.add(scratch.toString(), cmd.getVersion());	// 返回内容adds={uniqueKey=版本号,...} key值和版本号
    }
  }
 setupRequest()
private List<Node> setupRequest(String id, SolrInputDocument doc) {
    List<Node> nodes = null;

    // if we are in zk mode...
    if (zkEnabled) {

      if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
        isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
        forwardToLeader = false;
        return nodes;
      }

      String coreName = req.getCore().getName();	

      ClusterState cstate = zkController.getClusterState();
      numNodes = cstate.getLiveNodes().size();
      DocCollection coll = cstate.getCollection(collection);
      /**
       * ImplicitDocRouter	---> 和 uniqueKey无关,可以在请求参数或者SolrInputDocument中添加_shard_参数获取slice ,(router="implicit")
       * CompositeIdRouter	---> 根据uniqueKey的hash值获取slice,	(在指定numshards参数会自动切换到router="compositeId")
       * 
       * 在compositeId模式下,update更新基本都是靠转发来完成的,只有当本身的请求节点与hash后获取的节点一样时,才从本地写入
       */
      Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);

      if (slice == null) {
        String shardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
        slice = coll.getSlice(shardId);
        if (slice == null) {
          throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
        }
      }

      String shardId = slice.getName();	

      try {
        Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
            collection, shardId);
        ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(leaderReplica);
        String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor());
        /**
         * isleader有两种情况:
         * 1.在implicit模式下 请求的节点本身就是leader
         * 2.在compositeId模式下,请求的节点与hash后的节点 一样
         */
        isLeader = coreNodeName.equals(leaderReplica.getName());

        DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));	// 可以分为fromleader和toleader

        doDefensiveChecks(phase);

        if (DistribPhase.FROMLEADER == phase) {		// 请求来自leader转发,FROMLEADER,不需要转发给leader,也不需要转发给其它replicas
          forwardToLeader = false;
        } else if (isLeader) {						// 请求不是来自leader,但自己就是leader,那么就需要将请求写到本地,顺便分发给其他的replicas.
          forwardToLeader = false;
          List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
              .getReplicaProps(collection, shardId, coreNodeName,
                  coreName, null, ZkStateReader.DOWN);
          if (replicaProps != null) {				// 带有replicaProps节点时候,需要请求转发
            nodes = new ArrayList<Node>(replicaProps.size());
            String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
            Set<String> skipListSet = null;
            if (skipList != null) {
              skipListSet = new HashSet<String>(skipList.length);
              skipListSet.addAll(Arrays.asList(skipList));
            }

            for (ZkCoreNodeProps props : replicaProps) {
              if (skipList != null) {
                if (!skipListSet.contains(props.getCoreUrl())) {
                  nodes.add(new StdNode(props));
                }
              } else {
                nodes.add(new StdNode(props));
              }
            }
          }

        } else {	
        /**
         * 有两种情况:
         * 1.请求不是来自leader,同时自己又不是leader,即该更新请求是客户端直接提交过来的
         * 2.compositeId下,hash后的节点不是当前节点
         * 
         * 发生这种情况时候,请求需要转发
         */
          nodes = new ArrayList<Node>(1);
          nodes.add(new RetryNode(leaderProps, zkController.getZkStateReader(), collection, shardId));	// 获取待转发的节点implicit下为leader节点,compositeId下为hash后 的节点
          forwardToLeader = true;
        }

      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
            e);
      }
    }

    return nodes;
  }
 

 

versionAdd():

private boolean versionAdd(AddUpdateCommand cmd) throws IOException {
    BytesRef idBytes = cmd.getIndexedId();	

    if (idBytes == null) {
      super.processAdd(cmd);
      return false;
    }

    if (vinfo == null) {	// <updateLog/>配置后就会自动初始化vinfo
      if (isAtomicUpdate(cmd)) {
        throw new SolrException
          (SolrException.ErrorCode.BAD_REQUEST,
           "Atomic document updates are not supported unless <updateLog/> is configured");
      } else {
        super.processAdd(cmd);	// 非cloud模式下 事务日志没有配置的话 直接进行提交
        return false;
      }
    }

    int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0); // 对主键进行hash

    long versionOnUpdate = cmd.getVersion();	// 获取更新的版本号,如果没有进行配置的话为0

    if (versionOnUpdate == 0) {
      SolrInputField versionField = cmd.getSolrInputDocument().getField(VersionInfo.VERSION_FIELD);	// 判断添加的SolrInputDocument中是否带有VERSION_FIELD信息
      if (versionField != null) {
        Object o = versionField.getValue();
        versionOnUpdate = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
      } else {
        // Find the version
        String versionOnUpdateS = req.getParams().get(VERSION_FIELD);	// 查看请求中是否带有VERSION_FIELD信息
        versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
      }
    }

    // 如果在请求中没有特定的去指定VERSION_FIELD信息 versionOnUpdate还是为0
    
    boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
    boolean leaderLogic = isLeader && !isReplayOrPeersync;


    VersionBucket bucket = vinfo.bucket(bucketHash);	// 根据主键的hash值获取不同的bucket对象

    /**
     * 读锁, 按query进行删除的时候会持有写锁,其他地方持有写锁的时候,对性能基本没什么影响,即在调用deleteByQuery的时候,对写入性能有较大的影响
     */
    vinfo.lockForUpdate();	
    try {
      synchronized (bucket) {	// 如果主键相同,则获取的bucket对象相同,需要同步处理,判断版本号 ---- 这块还没有搞清楚
        boolean checkDeleteByQueries = false;

        if (versionsStored) {	// 是否带有_version_信息

          long bucketVersion = bucket.highest;

          if (leaderLogic) {

            boolean updated = getUpdatedDocument(cmd, versionOnUpdate);

            if (versionOnUpdate != 0) {
              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
              long foundVersion = lastVersion == null ? -1 : lastVersion;
              if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) {
              } else {
                throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion);
              }
            }


            long version = vinfo.getNewClock();
            cmd.setVersion(version);
            cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
            bucket.updateHighest(version);
          } else {
            // The leader forwarded us this update.
            cmd.setVersion(versionOnUpdate);

            if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
              // we're not in an active state, and this update isn't from a replay, so buffer it.
              cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
              ulog.add(cmd);
              return true;
            }

            if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
              bucket.updateHighest(versionOnUpdate);
            } else {
              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
              if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
                // This update is a repeat, or was reordered.  We need to drop this update.
                return true;
              }

              checkDeleteByQueries = true;
            }
          }
        }
        
        boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;	// 是否需要分发其他replica节点
        
        SolrInputDocument clonedDoc = null;
        if (willDistrib) {	
          clonedDoc = cmd.solrDoc.deepCopy();	// 深拷贝,
        }
        
        // TODO: possibly set checkDeleteByQueries as a flag on the command?
        doLocalAdd(cmd);						// 调用RunUpdateProcessor写入索引
        
        if (willDistrib) {
          cmd.solrDoc = clonedDoc;				// 重新赋值给cmd.solrDoc
        }

      }  // end synchronized (bucket)
    } finally {
      vinfo.unlockForUpdate();
    }
    return false;
  }

 

 

RunUpdateProcessor:

 

 public void processAdd(AddUpdateCommand cmd) throws IOException {
    if (DistributedUpdateProcessor.isAtomicUpdate(cmd)) {
      throw new SolrException
        (SolrException.ErrorCode.BAD_REQUEST,
         "RunUpdateProcessor has recieved an AddUpdateCommand containing a document that appears to still contain Atomic document update operations, most likely because DistributedUpdateProcessorFactory was explicitly disabled from this updateRequestProcessorChain");
    }

    updateHandler.addDoc(cmd);	// 调用DirectUpdateHandler2写入索引
    super.processAdd(cmd);		// 如果有下个处理链的话,继续往下执行
    changesSinceCommit = true;
  }

 

分享到:
评论
1 楼 weislyn 2016-08-16  
processorChain.createProcessor:

if (!(factory instanceof LogUpdateProcessorFactory)) {    // TODO: use a marker interface for this? 
         // skip anything that is not the log factory 
         continue;          // 请求转发的就不需要记录更新日志,忽略LogUpdateProcessor 
       } 
此处说是  // 请求转发的就不需要记录更新日志,忽略
判断factory不是LogUpdateProcessorFactory 类型 才执行下一次循环

相关推荐

    SOLR的应用教程

    介绍Solr的应用场景和使用流程。 3.2 一个简单的例子 包括Solr Schema设计、构建索引和搜索测试。 3.3 搜索引擎的规划设计 3.3.1 定义业务模型 3.3.2 定制索引服务 3.3.3 定制搜索服务 3.4 搜索引擎配置 3.5 如何...

    Solr3.5开发应用指导

    - **3.1.2 SOLR的使用过程说明**:一般流程包括创建索引、添加文档、执行查询和获取结果等步骤。 **3.2 一个简单的例子** - **3.2.1 SolrSchema设计**:定义文档的结构,包括字段名称、字段类型等。 - **3.2.2 ...

    solr教材-PDF版

    - **3.1.2 SOLR的使用过程说明**:概述了从搭建环境到实际应用Solr的整个流程。 **3.2 一个简单的例子** - **3.2.1 SolrSchema设计**:通过一个具体的例子展示如何设计SolrSchema。 - **3.2.2 构建索引**:演示...

    基于Solr的企业级信息检索的设计与实现

    - **集成能力**:Solr易于与其他企业应用集成,如CRM、ERP等系统,从而增强整体业务流程的效率。 ### 4. Solr的企业级信息检索系统设计 #### 4.1 系统架构 系统通常包含索引模块、搜索模块、缓存模块和管理模块。...

    Solrj 中文教程

    - **3.1.2 SOLR的使用过程说明**:从索引建立到搜索结果呈现的整个流程。 ##### 3.2 一个简单的例子 - **3.2.1 SolrSchema设计**:为示例项目设计一个简单的索引结构。 - **3.2.2 构建索引**:向Solr添加文档以构建...

    基于SpringBoot博客系统的设计与实现.doc

    ##### 4.2 MyBatis的数据持久化 MyBatis框架提供了一种简洁的方式来处理数据持久化操作。通过简单的XML映射文件或注解即可完成对象与数据库表之间的映射,极大地提高了开发效率。 ##### 4.3 Shiro的安全管理 ...

    lucene-2.9.0-src.tar.gz

    Lucene社区活跃,不断有新的改进和扩展,如Solr和Elasticsearch,它们基于Lucene提供更高级的服务,如分布式搜索、集群管理和数据分析。 总结,通过对“lucene-2.9.0-src.tar.gz”的研究,我们可以深入学习全文检索...

    XXX02_项目话术.txt

    - Spring MVC 4.2 版本可以通过注解 `@CrossOrigin` 来简化跨域配置。 - `@CrossOrigin` 注解可以指定允许的源地址 `origins` 和是否允许携带凭据 `allowCredentials`。 ### 4. 用户购物车逻辑 - **购物车模型**...

    基于lucene 的简单搜索引擎.rar

    然而,随着数据量的增长,性能优化、分布式索引、实时索引等挑战也随之而来,需要开发者灵活运用Lucene的高级特性以及与其他技术(如Solr、Elasticsearch)的集成来解决。 总的来说,“基于lucene 的简单搜索引擎....

    nutch入门学习

    ##### 4.2 Nutch 工作流程 - **爬虫**:负责抓取网页并解析链接,将数据存入爬取数据库。 - **索引**:将爬取的网页内容转换为索引格式,便于后续搜索。 - **搜索**:用户通过 Web 界面向 Nutch 发送查询请求,...

    百度云盘 pdf《大数据架构和算法实现之路:电商系统的技术实战》百度云盘-带标签目录

    1.2 分类任务的处理流程 ……·· ……·7 1.3 算法:朴素贝叶斯和 K最近邻……·8 1.3.1 朴素贝叶斯….......……… …….. 8 1.3.2 K 最近邻……………………... 9 1.4 分类效果评估…………·…….. 10 1.5 ...

    一个专业搜索公司关于lucene+solar资料(1)

    - 利用Solr、Elasticsearch等分布式搜索引擎。 - 分布式架构可以有效提高系统的扩展性和可用性。 **6.6 本章小结** - 本章详细介绍了如何使用Lucene创建和管理索引库,包括索引库的设计、创建...

    t淘淘商城项目 商城项目 视频和源码教程 详细

    Maven定义了软件开发的整套流程体系,并进行了封装,开发人员只需要指定项目的构建流程,无需针对每个流程编写自己的构建脚本。 2、依赖管理。除了项目构建,Maven最核心的功能是软件包的依赖管理,能够自动分析...

    嵌入式体验入门班.pdf

    36 4.2 Linux 内核编译流程............................................................................... 38 4.2.1 获取内核源码............................................................................

Global site tag (gtag.js) - Google Analytics