solr4.2的update大致流程:
execute--->RequestHandlerBase.handleRequest--->ContentStreamHandlerBase.handleRequestBody
--->JavabinLoader.load-->JavaBinUpdateRequestCodec.StreamingUpdateHandler.update
--->LogUpdateProcessor.processAdd
--->DistributedUpdateProcessor.processAdd
--->RunUpdateProcessor.processAdd
ContentStreamHandlerBase.handleRequestBody
SolrParams params = req.getParams();
/**
* 根据update.chain 获取不同的处理链
* langid --> 包含TikaLanguageIdentifierUpdateProcessorFactory
* script --> 包含StatelessScriptUpdateProcessorFactory
* dedupe --> 包含SignatureUpdateProcessorFactory
* .....
*/
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
/**
* 系统默认的更新处理链包含3个,按顺序执行
* LogUpdateProcessorFactory ---> 记录更新日志
* DistributedUpdateProcessorFactory ---> 分布式更新,包含请求转发,版本控制等
* RunUpdateProcessorFactory ---> 写入索引
*/
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
processorChain.createProcessor:
public UpdateRequestProcessor createProcessor(SolrQueryRequest req,
SolrQueryResponse rsp)
{
UpdateRequestProcessor processor = null;
UpdateRequestProcessor last = null;
final String distribPhase = req.getParams().get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM); // 判断是否请求转发而来的
final boolean skipToDistrib = distribPhase != null;
boolean afterDistrib = true; // we iterate backwards, so true to start
for (int i = chain.length-1; i>=0; i--) { // 从下往上迭代构造处理链
UpdateRequestProcessorFactory factory = chain[i];
if (skipToDistrib) {
if (afterDistrib) {
if (factory instanceof DistributingUpdateProcessorFactory) {
afterDistrib = false;
}
} else if (!(factory instanceof LogUpdateProcessorFactory)) { // TODO: use a marker interface for this?
// skip anything that is not the log factory
continue; // 请求转发的就不需要记录更新日志,忽略LogUpdateProcessor
}
}
processor = factory.getInstance(req, rsp, last); // 构造处理链,将下个的UpdateRequestProcessor通过构造函数赋值给当前的next 当next为null 代表处理完毕
last = processor == null ? last : processor;
}
return last; // 返回配置文件中的第一个更新处理链
}
LogUpdateProcessor:
记录更新日志
DistributedUpdateProcessor:
DistributedUpdateProcessor是一个分布式更新组件,主要是进行请求的转发。
发生请求转发的情况
implicit:
客户端直接提交到非leader节点,就需要通过该leader来分发数据
请求leader节点,但是该slice带有replicas
compositeId:
hash后的节点,不是当前提交的节点
所以为了不让更新请求不会转发来转发去。需要满足3个条件
1.只提交给所有leader节点
2.该leader节点不带有replicas节点
3.没有配置numShards参数,或者numShards不大于1
public void processAdd(AddUpdateCommand cmd) throws IOException {
updateCommand = cmd;
if (zkEnabled) { // cloud模式下
zkCheck(); // 检查zk连接状态
nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument()); // 判断更新请求来源,获取待转发的节点
} else {
isLeader = getNonZkLeaderAssumption(req);
}
boolean dropCmd = false;
if (!forwardToLeader) { //是否有需要写入本地数据
dropCmd = versionAdd(cmd); // 在这个方法判断版本号信息.然后在调用RunUpdateProcessord写入索引
}
if (dropCmd) { // 丢弃该请求,不分发到各个节点上面
return;
}
ModifiableSolrParams params = null;
if (nodes != null) {
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, (isLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString())); // 转发请求,可以分为从leader转发,还是发到leader
if (isLeader) {
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
}
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName())); // 分布式更新提交标志
cmdDistrib.distribAdd(cmd, nodes, params); // 将记录分发到各个节点上面
}
/**
* 返回update的响应内容
* 可以通过设置UpdateParams.VERSIONS=true来返回添加信息,默认不返回
*/
if (returnVersions && rsp != null && idField != null) {
if (addsResponse == null) {
addsResponse = new NamedList<String>();
rsp.add("adds",addsResponse);
}
if (scratch == null) scratch = new CharsRef();
idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
addsResponse.add(scratch.toString(), cmd.getVersion()); // 返回内容adds={uniqueKey=版本号,...} key值和版本号
}
}
setupRequest()
private List<Node> setupRequest(String id, SolrInputDocument doc) {
List<Node> nodes = null;
// if we are in zk mode...
if (zkEnabled) {
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
forwardToLeader = false;
return nodes;
}
String coreName = req.getCore().getName();
ClusterState cstate = zkController.getClusterState();
numNodes = cstate.getLiveNodes().size();
DocCollection coll = cstate.getCollection(collection);
/**
* ImplicitDocRouter ---> 和 uniqueKey无关,可以在请求参数或者SolrInputDocument中添加_shard_参数获取slice ,(router="implicit")
* CompositeIdRouter ---> 根据uniqueKey的hash值获取slice, (在指定numshards参数会自动切换到router="compositeId")
*
* 在compositeId模式下,update更新基本都是靠转发来完成的,只有当本身的请求节点与hash后获取的节点一样时,才从本地写入
*/
Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
if (slice == null) {
String shardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
slice = coll.getSlice(shardId);
if (slice == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
}
}
String shardId = slice.getName();
try {
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, shardId);
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(leaderReplica);
String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor());
/**
* isleader有两种情况:
* 1.在implicit模式下 请求的节点本身就是leader
* 2.在compositeId模式下,请求的节点与hash后的节点 一样
*/
isLeader = coreNodeName.equals(leaderReplica.getName());
DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); // 可以分为fromleader和toleader
doDefensiveChecks(phase);
if (DistribPhase.FROMLEADER == phase) { // 请求来自leader转发,FROMLEADER,不需要转发给leader,也不需要转发给其它replicas
forwardToLeader = false;
} else if (isLeader) { // 请求不是来自leader,但自己就是leader,那么就需要将请求写到本地,顺便分发给其他的replicas.
forwardToLeader = false;
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
.getReplicaProps(collection, shardId, coreNodeName,
coreName, null, ZkStateReader.DOWN);
if (replicaProps != null) { // 带有replicaProps节点时候,需要请求转发
nodes = new ArrayList<Node>(replicaProps.size());
String[] skipList = req.getParams().getParams("test.distrib.skip.servers");
Set<String> skipListSet = null;
if (skipList != null) {
skipListSet = new HashSet<String>(skipList.length);
skipListSet.addAll(Arrays.asList(skipList));
}
for (ZkCoreNodeProps props : replicaProps) {
if (skipList != null) {
if (!skipListSet.contains(props.getCoreUrl())) {
nodes.add(new StdNode(props));
}
} else {
nodes.add(new StdNode(props));
}
}
}
} else {
/**
* 有两种情况:
* 1.请求不是来自leader,同时自己又不是leader,即该更新请求是客户端直接提交过来的
* 2.compositeId下,hash后的节点不是当前节点
*
* 发生这种情况时候,请求需要转发
*/
nodes = new ArrayList<Node>(1);
nodes.add(new RetryNode(leaderProps, zkController.getZkStateReader(), collection, shardId)); // 获取待转发的节点implicit下为leader节点,compositeId下为hash后 的节点
forwardToLeader = true;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
}
}
return nodes;
}
versionAdd():
private boolean versionAdd(AddUpdateCommand cmd) throws IOException {
BytesRef idBytes = cmd.getIndexedId();
if (idBytes == null) {
super.processAdd(cmd);
return false;
}
if (vinfo == null) { // <updateLog/>配置后就会自动初始化vinfo
if (isAtomicUpdate(cmd)) {
throw new SolrException
(SolrException.ErrorCode.BAD_REQUEST,
"Atomic document updates are not supported unless <updateLog/> is configured");
} else {
super.processAdd(cmd); // 非cloud模式下 事务日志没有配置的话 直接进行提交
return false;
}
}
int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0); // 对主键进行hash
long versionOnUpdate = cmd.getVersion(); // 获取更新的版本号,如果没有进行配置的话为0
if (versionOnUpdate == 0) {
SolrInputField versionField = cmd.getSolrInputDocument().getField(VersionInfo.VERSION_FIELD); // 判断添加的SolrInputDocument中是否带有VERSION_FIELD信息
if (versionField != null) {
Object o = versionField.getValue();
versionOnUpdate = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
} else {
// Find the version
String versionOnUpdateS = req.getParams().get(VERSION_FIELD); // 查看请求中是否带有VERSION_FIELD信息
versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
}
}
// 如果在请求中没有特定的去指定VERSION_FIELD信息 versionOnUpdate还是为0
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
VersionBucket bucket = vinfo.bucket(bucketHash); // 根据主键的hash值获取不同的bucket对象
/**
* 读锁, 按query进行删除的时候会持有写锁,其他地方持有写锁的时候,对性能基本没什么影响,即在调用deleteByQuery的时候,对写入性能有较大的影响
*/
vinfo.lockForUpdate();
try {
synchronized (bucket) { // 如果主键相同,则获取的bucket对象相同,需要同步处理,判断版本号 ---- 这块还没有搞清楚
boolean checkDeleteByQueries = false;
if (versionsStored) { // 是否带有_version_信息
long bucketVersion = bucket.highest;
if (leaderLogic) {
boolean updated = getUpdatedDocument(cmd, versionOnUpdate);
if (versionOnUpdate != 0) {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
long foundVersion = lastVersion == null ? -1 : lastVersion;
if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) {
} else {
throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion);
}
}
long version = vinfo.getNewClock();
cmd.setVersion(version);
cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
bucket.updateHighest(version);
} else {
// The leader forwarded us this update.
cmd.setVersion(versionOnUpdate);
if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
return true;
}
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
bucket.updateHighest(versionOnUpdate);
} else {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
// This update is a repeat, or was reordered. We need to drop this update.
return true;
}
checkDeleteByQueries = true;
}
}
}
boolean willDistrib = isLeader && nodes != null && nodes.size() > 0; // 是否需要分发其他replica节点
SolrInputDocument clonedDoc = null;
if (willDistrib) {
clonedDoc = cmd.solrDoc.deepCopy(); // 深拷贝,
}
// TODO: possibly set checkDeleteByQueries as a flag on the command?
doLocalAdd(cmd); // 调用RunUpdateProcessor写入索引
if (willDistrib) {
cmd.solrDoc = clonedDoc; // 重新赋值给cmd.solrDoc
}
} // end synchronized (bucket)
} finally {
vinfo.unlockForUpdate();
}
return false;
}
RunUpdateProcessor:
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (DistributedUpdateProcessor.isAtomicUpdate(cmd)) {
throw new SolrException
(SolrException.ErrorCode.BAD_REQUEST,
"RunUpdateProcessor has recieved an AddUpdateCommand containing a document that appears to still contain Atomic document update operations, most likely because DistributedUpdateProcessorFactory was explicitly disabled from this updateRequestProcessorChain");
}
updateHandler.addDoc(cmd); // 调用DirectUpdateHandler2写入索引
super.processAdd(cmd); // 如果有下个处理链的话,继续往下执行
changesSinceCommit = true;
}
分享到:
相关推荐
介绍Solr的应用场景和使用流程。 3.2 一个简单的例子 包括Solr Schema设计、构建索引和搜索测试。 3.3 搜索引擎的规划设计 3.3.1 定义业务模型 3.3.2 定制索引服务 3.3.3 定制搜索服务 3.4 搜索引擎配置 3.5 如何...
- **3.1.2 SOLR的使用过程说明**:一般流程包括创建索引、添加文档、执行查询和获取结果等步骤。 **3.2 一个简单的例子** - **3.2.1 SolrSchema设计**:定义文档的结构,包括字段名称、字段类型等。 - **3.2.2 ...
- **3.1.2 SOLR的使用过程说明**:概述了从搭建环境到实际应用Solr的整个流程。 **3.2 一个简单的例子** - **3.2.1 SolrSchema设计**:通过一个具体的例子展示如何设计SolrSchema。 - **3.2.2 构建索引**:演示...
- **集成能力**:Solr易于与其他企业应用集成,如CRM、ERP等系统,从而增强整体业务流程的效率。 ### 4. Solr的企业级信息检索系统设计 #### 4.1 系统架构 系统通常包含索引模块、搜索模块、缓存模块和管理模块。...
- **3.1.2 SOLR的使用过程说明**:从索引建立到搜索结果呈现的整个流程。 ##### 3.2 一个简单的例子 - **3.2.1 SolrSchema设计**:为示例项目设计一个简单的索引结构。 - **3.2.2 构建索引**:向Solr添加文档以构建...
##### 4.2 MyBatis的数据持久化 MyBatis框架提供了一种简洁的方式来处理数据持久化操作。通过简单的XML映射文件或注解即可完成对象与数据库表之间的映射,极大地提高了开发效率。 ##### 4.3 Shiro的安全管理 ...
Lucene社区活跃,不断有新的改进和扩展,如Solr和Elasticsearch,它们基于Lucene提供更高级的服务,如分布式搜索、集群管理和数据分析。 总结,通过对“lucene-2.9.0-src.tar.gz”的研究,我们可以深入学习全文检索...
- Spring MVC 4.2 版本可以通过注解 `@CrossOrigin` 来简化跨域配置。 - `@CrossOrigin` 注解可以指定允许的源地址 `origins` 和是否允许携带凭据 `allowCredentials`。 ### 4. 用户购物车逻辑 - **购物车模型**...
然而,随着数据量的增长,性能优化、分布式索引、实时索引等挑战也随之而来,需要开发者灵活运用Lucene的高级特性以及与其他技术(如Solr、Elasticsearch)的集成来解决。 总的来说,“基于lucene 的简单搜索引擎....
##### 4.2 Nutch 工作流程 - **爬虫**:负责抓取网页并解析链接,将数据存入爬取数据库。 - **索引**:将爬取的网页内容转换为索引格式,便于后续搜索。 - **搜索**:用户通过 Web 界面向 Nutch 发送查询请求,...
1.2 分类任务的处理流程 ……·· ……·7 1.3 算法:朴素贝叶斯和 K最近邻……·8 1.3.1 朴素贝叶斯….......……… …….. 8 1.3.2 K 最近邻……………………... 9 1.4 分类效果评估…………·…….. 10 1.5 ...
- 利用Solr、Elasticsearch等分布式搜索引擎。 - 分布式架构可以有效提高系统的扩展性和可用性。 **6.6 本章小结** - 本章详细介绍了如何使用Lucene创建和管理索引库,包括索引库的设计、创建...
Maven定义了软件开发的整套流程体系,并进行了封装,开发人员只需要指定项目的构建流程,无需针对每个流程编写自己的构建脚本。 2、依赖管理。除了项目构建,Maven最核心的功能是软件包的依赖管理,能够自动分析...
36 4.2 Linux 内核编译流程............................................................................... 38 4.2.1 获取内核源码............................................................................