`
m635674608
  • 浏览: 5003481 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

ElasticSearch Recovery 分析

 
阅读更多

上周出现了一次故障,recovery的过程比较慢,然后发现Shard 在做恢复的过程一般都是卡在TRANSLOG阶段,所以好奇这块是怎么完成的,于是有了这篇文章

这是一篇源码分析类的文章,大家需要先建立一个整体的概念,建议参看 这篇文章

另外你可能还需要了解下 Recovery 阶段迁移过程:

INIT -> INDEX -> VERIFY_INDEX -> TRANSLOG -> FINALIZE -> DONE

概览

Recovery 其实有两种:

  1. Primary的迁移/Replication的生成和迁移
  2. Primary的恢复

org.elasticsearch.indices.cluster.IndicesClusterStateService.clusterChanged 被触发后,会触发applyNewOrUpdatedShards 函数的调用。大家可以跑进去看看,然后跟着文章去浏览整个代码体系,基本能够帮助大家串起整个流程了。

Primary的恢复

这个是一般出现故障集群重启的时候可能遇到的。首先需要从Store里进行恢复。

if (isPeerRecovery(shardRouting)) {
   ......
}
else {
  //走的这个分支
  indexService.shard(shardId).recoverFromStore(shardRouting, 
  new StoreRecoveryService.RecoveryListener() {
}

Primary 进行自我恢复,所以并不需要其他节点的支持。所以判定的函数叫做 isPeerRecovery其实还是挺合适的。

indexService.shard(shardId).recoverFromStore调用的是 org.elasticsearch.index.shard.IndexShard的方法。

  public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryListener recoveryListener) {
        // we are the first primary, recover from the gateway
        // if its post api allocation, the index should exists
        assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
        final boolean shouldExist = shard.allocatedPostIndexCreate();
        storeRecoveryService.recover(this, shouldExist, recoveryListener);
    }

逻辑还是很清晰的,接着我们进入 org.elasticsearch.index.shard.StoreRecoveryService.recover方法里,这里有个细节需要了解下:

 if (indexShard.routingEntry().restoreSource() != null) {
                indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
            } else {
                indexShard.recovering("from store", RecoveryState.Type.STORE, clusterService.localNode());
            }

我们会根据restoreSource 决定是从SNAPSHOT或者从Store里进行恢复。这里的 indexShard.recovering并没有执行真正的recovering 操作,而是返回了一个recover的信息对象,比如节点什么的。

这正执行recovering 操作是使用了一个新的线程:

threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    final RecoveryState recoveryState = indexShard.recoveryState();
                    if (indexShard.routingEntry().restoreSource() != null) {
                        logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
                        restore(indexShard, recoveryState);
                    } else {
                        logger.debug("starting recovery from shard_store ...");
                        recoverFromStore(indexShard, indexShouldExists, recoveryState);
                    }

这里我们只走一条线,进入 recoverFromStore 方法,该方法会执行索引文件的恢复动作,本质上是进入了 INDEXStage.

接着进行TranslogRecovery了

typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();

我们进入 indexShard.performTranslogRecovery 方法:

  public Map<String, Mapping> performTranslogRecovery(boolean indexExists) {
        if (indexExists == false) {
            // note: these are set when recovering from the translog
            final RecoveryState.Translog translogStats = recoveryState().getTranslog();
            translogStats.totalOperations(0);
            translogStats.totalOperationsOnStart(0);
        }
        final Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(false, indexExists);
        assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
        return recoveredTypes;
    }

这个方法里面,最核心的是 internalPerformTranslogRecovery方法,进入该方法后先进入 VERIFY_INDEXStage,进行索引的校验,校验如果没有问题,就会进入我们期待的 TRANSLOG状态了。

进入 TRANSLOG后,先进行一些设置:

engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);

这里的GC 指的是tranlog日志的删除问题,也就是不允许删除translog,接着会创建一个新的InternalEngine了,然后返回调用 org.elasticsearch.index.shard.TranslogRecoveryPerformer.getRecoveredTypes

不过你看这个代码会比较疑惑,其实我一开始看也觉得纳闷:

  if (skipTranslogRecovery == false) {
            // This will activate our shard so we get our fair share of the indexing buffer during recovery:
            markLastWrite();
        }
        createNewEngine(skipTranslogRecovery, engineConfig);
        return       engineConfig.getTranslogRecoveryPerformer().getRecoveredTypes();

我们并没有看到做translog replay的地方,而从上层的调用方来看:

typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();

performTranslogRecovery 返回后,就立马进入扫尾阶段。 里面唯一的动作是createNewEngine,并且传递了 skipTranslogRecovery参数。 也就说,真正的translog replay动作是在createNewEngine里完成,我们经过探索,发现是在InternalEngine 的初始化过程完成的,具体代码如下:

try {
                if (skipInitialTranslogRecovery) {
                    // make sure we point at the latest translog from now on..
                    commitIndexWriter(writer, translog, lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID));
                } else {
                    recoverFromTranslog(engineConfig, translogGeneration);
                }
            } catch (IOException | EngineException ex) {
                throw new EngineCreationFailureException(shardId, "failed to recover from translog", ex);
            }

里面有个recoverFromTranslog,我们进去瞅瞅:

   final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer();
        try (Translog.Snapshot snapshot = translog.newSnapshot()) {
            opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
        } catch (Throwable e) {
            throw new EngineException(shardId, "failed to recover from translog", e);
        }

目前来看,所有的Translog recovery 动作其实都是由 TranslogRecoveryPerformer来 完成的。当然这个名字也比较好,翻译过来就是 TranslogRecovery 执行者。先对translog 做一个snapshot,然后根据这个snapshot开始进行恢复,进入 recoveryFromSnapshot 方法我们查看细节,然后会引导你进入

下面的方法:

 public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
        try {
            switch (operation.opType()) {
                case CREATE:
                    Translog.Create create = (Translog.Create) operation;
                    Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()),
                            source(create.source()).index(shardId.getIndex()).type(create.type()).id(create.id())
                                    .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
                            create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
                    maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
                    if (logger.isTraceEnabled()) {
                        logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id());
                    }
                    engine.create(engineCreate);
                    break;

终于看到了实际的translog replay 逻辑了。这里调用了标准的InternalEngine.create 方法进行日志的恢复。其实比较有意思的是,我们在日志回放的过程中,依然会继续写translog。这里就会导致一个问题,如果我在做日志回放的过程中, 服务器由当掉了(或者ES instance 重启了),那么就会导致translog 变多了。这个地方是否可以再优化下?

假设我们完成了Translog 回放后,如果确实有重放,那么就行flush动作,删除translog,否则就commit Index。具体逻辑由如下的代码来完成:

if (opsRecovered > 0) {
            logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
                    opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog
                            .currentFileGeneration());
            flush(true, true);
        } else if (translog.isCurrent(translogGeneration) == false) {
            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
        }

接着就进入了finalizeRecovery,然后,就没然后了。

 indexShard.finalizeRecovery();
            String indexName = indexShard.shardId().index().name();
            for (Map.Entry<String, Mapping> entry : typesToUpdate.entrySet()) {
                validateMappingUpdate(indexName, entry.getKey(), entry.getValue());
            }
            indexShard.postRecovery("post recovery from shard_store");

Primary的迁移/Replication的生成和迁移

一般这种recovery其实就是发生relocation或者调整副本的时候发生的。所以集群是在正常状态,一定有健康的primary shard存在,所以我们也把这种recovery叫做Peer Recovery。 入口和前面的Primary恢复是一样的,代码如下:

if (isPeerRecovery(shardRouting)) {
 //走的这个分支
.....
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
                recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
......           
}
else {
 ......
}

核心代码自然是 recoveryTarget.startRecovery。这里的recoveryTarget的类型是: org.elasticsearch.indices.recovery.RecoveryTarget

startRecovery方法的核心代码是:

threadPool.generic().execute(new RecoveryRunner(recoveryId));

也是启动一个县城异步执行的。RecoveryRunner调用的是RecoveryTarget的 doRecovery方法,在该方法里,会发出一个RPC请求:

final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),        false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());

recoveryStatus.indexShard().prepareForIndexRecovery();
            recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() {
                @Override
                public void run() throws InterruptedException {
                    responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
                        @Override
                        public RecoveryResponse newInstance() {
                            return new RecoveryResponse();
                        }
                    }).txGet());
                }
            });

这个时候进入 INDEX Stage。 那谁接受处理的呢? 我们先看看现在的类名叫啥? RecoveryTarget。 我们想当然的想,是不是有RecoverySource呢? 发现确实有,而且该类确实也有一个处理类:

 class StartRecoveryTransportRequestHandler extends TransportRequestHandler<StartRecoveryRequest> {
        @Override
        public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
            RecoveryResponse response = recover(request);
            channel.sendResponse(response);
        }
    }

ES里这种通过Netty进行交互的方式,大家可以看看我之前写文章 ElasticSearch Rest/RPC 接口解析

这里我们进入RecoverSource对象的recover方法:

 private RecoveryResponse recover(final StartRecoveryRequest request) {
      .....
      if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) {
            handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
        } else {
            handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
        }
        ongoingRecoveries.add(shard, handler);
        try {
            return handler.recoverToTarget();
        } finally {
            ongoingRecoveries.remove(shard, handler);
        }
 }

我们看到具体负责处理的类是RecoverySourceHandler,之后调用该类的recoverToTarget方法。我对下面的代码做了精简,方便大家看清楚。

public RecoveryResponse recoverToTarget() {
        final Engine engine = shard.engine();
        assert engine.getTranslog() != null : "translog must not be null";
        try (Translog.View translogView = engine.getTranslog().newView()) {

            final SnapshotIndexCommit phase1Snapshot;
            phase1Snapshot = shard.snapshotIndex(false);
            phase1(phase1Snapshot, translogView);

            try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
                phase2(phase2Snapshot);
            } catch (Throwable e) {
                throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
            }

            finalizeRecovery();
        }
        return response;
    }

首先创建一个Translog的视图(创建视图的细节我现在也还没研究),接着的话对当前的索引进行snapshot。 然后进入phase1阶段,该阶段是把索引文件和请求的进行对比,然后得出有差异的部分,主动将数据推送给请求方。之后进入文件清理阶段,然后就进入 translog 阶段:

protected void prepareTargetForTranslog(final Translog.View translogView) {

接着进入第二阶段:

try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
                phase2(phase2Snapshot);           
            }

对当前的translogView 进行一次snapshot,然后进行translog发送:

int totalOperations = sendSnapshot(snapshot);



http://www.w2bc.com/article/155209
分享到:
评论

相关推荐

    elasticsearch-5.1.1.zip

    总结来说,Elasticsearch 5.1.1是一个强大的分布式搜索平台,适用于大数据时代的数据检索、分析和可视化需求。其强大的可扩展性和灵活性,使得它在各种应用场景中都表现出色。无论是小型项目还是大型企业,Elastic...

    【elasticsearch】- Learning elasticsearch

    Elasticsearch 是一款基于 Lucene 的开源搜索和分析引擎,它提供了分布式、实时的全文搜索功能,并且可以扩展到上百台服务器,处理PB级别的结构化或非结构化数据。 **1.2 开始使用 Elasticsearch** 本章节将介绍...

    ElasticSearch使用说明书 - 整合篇

    Elasticsearch(ES)是一种分布式、RESTful风格的搜索和数据分析引擎,常用于大数据分析和实时搜索场景。它不依赖传统的SQL数据库语句,而是采用JSON文档格式和HTTP协议进行交互。本文将涵盖ES的安装、配置、Kibana...

    ElasticSearch面试题.pdf

    Elasticsearch 是一个基于 Lucene 的实时的分布式搜索和分析引擎,设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。基于RESTful 接口。 一、Elasticsearch 的介绍 Elasticsearch 是一个基于...

    ElasticSearch、Logstash、Kibana

    - **Recovery(恢复)**:当节点出现故障或新节点加入集群时,Elasticsearch会自动进行数据恢复或重新分布。 - **River**:一种数据源概念,用于将其他存储系统中的数据同步到Elasticsearch。 #### 3. Logstash与...

    Elasticsearch 5.0

    Elasticsearch 5.0 是一个重要的版本更新,它在搜索引擎和大数据分析领域有着广泛的应用。这个版本带来了许多新特性和改进,旨在提升性能、易用性和可扩展性。以下是关于Elasticsearch 5.0的一些关键知识点: 1. **...

    elasticsearch-definitive-guide-master.rar

    9. **Recovery and Replication**:Elasticsearch 的副本机制确保数据冗余,提高系统的耐久性。当主分片失败时,副本分片可以晋升为主分片继续服务。恢复机制则负责在节点故障后重建数据。 10. **插件扩展**:...

    elasticSearch 详细介绍,集群部署,常用API 常用插件 SQL使用

    ElasticSearch (简称 ES) 是一款基于 Lucene 的开源搜索和分析引擎,它提供了强大的分布式搜索功能,适用于大规模数据集的实时搜索场景。ES 使用 Java 开发,并在 Apache 许可下免费发布。作为一个成熟的企业级解决...

    掌控数据流:如何管理 Elasticsearch 的副本

    Elasticsearch 是一款强大的开源搜索与分析引擎,基于 Lucene 开发,适用于大规模数据处理场景。它以其分布式架构、实时搜索能力、高度可扩展性以及丰富的功能集(如全文搜索、复杂查询、聚合分析等)而闻名。其中,...

    Elasticsearch核心知识篇、项目中如何使用、如何优化进阶

    Elasticsearch是一个基于Lucene的开源搜索引擎,它提供了一套完整的分布式环境下的搜索和分析解决方案。由于其高度的可扩展性和强大的实时搜索能力,Elasticsearch已经成为大数据处理领域的热门工具之一。 - **...

    源码es请检查

    **源码分析 Elasticsearch 2.4.5** Elasticsearch 是一个开源的全文搜索引擎,基于 Lucene 库,设计用于分布式、实时、可扩展的数据存储和搜索。在深入理解 Elasticsearch 源码之前,我们需要先了解其核心概念,如...

    Advanced Secondary Voltage Recovery Control.rar_recovery voltage

    本文为直流微电网中的多个dc-ES建立了分布式协同控制框架,并给出了系统的小信号稳定性分析。 主要级别实现下垂控制以协调多个dc-ES的操作。 次级控制基于一致性算法来调节直流母线电压基准,并结合dc-ES之间的充电...

    Learning Apache Flink.pdf

    Apache Flink的生态系统非常丰富,它支持包括Akka、Elasticsearch、Hadoop HDFS、Kafka等在内的多种系统集成。文档可能会讨论这些集成点,包括它们如何与其他系统协作来构建更大型的数据处理解决方案。 在学习和...

    上海工程技术大学在全院范围实施MATLAB和Simulink.pdf

    上海工程技术大学(SUES)在全院范围内实施了MathWorks公司的MATLAB和Simulink软件,这些产品广泛应用于汽车行业。学校此举意在提供给师生和实验室人员进行分析、设计、仿真、代码生成及验证的专业工具。 2. MATLAB...

    文件恢复源码 c# .net

    描述中提到,这段代码可能不如同类专业软件如EaseUS Data Recovery Wizard(简称es)功能强大,但它的优点在于开源和可定制性。对于学习和研究,或者针对特定需求进行修改的开发者来说,这样的源码具有很高的价值。...

    platform-tools_r35.0.0-windows.zip

    这个工具专门用于追踪OpenGL ES的调用,帮助开发者分析和优化图形渲染性能。它可以生成详细的调用日志,显示每个渲染调用的时间和其他相关信息。 6. **Proguard** Proguard是一个代码混淆工具,可以减小APK的大小...

    Oracle9i RAC数据库集群系统死机案例研究.pdf

    软件配置有Oracle9i Release 9.2.0.4企业版带Real Application Clusters,IBM的AIX 5.1操作系统,HACMP/ES CRM 4.4.1等。系统采用卷组管理,如rootvg用于AIX和HACMP,oravg用于Oracle9i系统软件和归档日志,datavg...

    数据库灾备技术方案.pdf

    总的来说,数据库灾备技术方案涵盖了各种灾难恢复体系,如传统数据库分类下的Oracle、MySQL和SQL Server,以及现代的Elasticsearch和MariaDB等。通过分析不同类型的故障场景和相应的解决方法,此方案提供了全面的...

    盖国强-Oracle数据库的DevOps实践-V2.zip

    5. **监控与日志管理**:通过Prometheus、Grafana等工具收集Oracle数据库的性能指标,结合ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志分析,以便快速识别和解决问题。 6. **安全性**:DevOps流程应考虑...

Global site tag (gtag.js) - Google Analytics