`

hbase split log源码分析

阅读更多

  split log过程

  

  在hbase hmaster启动的时候,将hlog移动到split log文件夹下,并处理split log

 (在RS加入到dead serverlist的时候,ServerShutdownHandler也会调用 splitlog方法,代码贴在最后)

  Master负责分发split log任务到zk上

 

master处理split log文件,最终写split log到zk上

regionserver从zk上抢split log任务,将split log读入到内存entry,由writer进程写到hdfs上

 

 

// we recover hbase:meta region servers inside master initialization and
    // handle other failed servers in SSH in order to start up master node ASAP
    Set<ServerName> previouslyFailedServers = this.fileSystemManager
        .getFailedServersFromLogFolders();//从splitting log中,获取之前死了的rs
        
    // remove stale recovering regions from previous run删除正在恢复的region
    this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
    
   // log splitting for hbase:meta server先修复meta表
    ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
    if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
      splitMetaLogBeforeAssignment(oldMetaServerLocation);//恢复meta split log
      // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
      // may also host user regions
    }
    
        status.setStatus("Submitting log splitting work for previously failed region servers");
    // Master has recovered hbase:meta region server and we put
    // other failed region servers in a queue to be handled later by SSH
    //之后恢复其他dead server,非meta表
    for (ServerName tmpServer : previouslyFailedServers) {
      this.serverManager.processDeadServer(tmpServer, true);
    }

 

 

    恢复split log都会进入这样的代码:

    两种方式恢复split log,默认是第二种,第一种replay,是让原来的regionserver自己进行split log的恢复

 

 

   if (this.distributedLogReplay) {
      // In log replay mode, we mark hbase:meta region as recovering in ZK
      Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
      regions.add(HRegionInfo.FIRST_META_REGIONINFO);
      this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
    } else {
      // In recovered.edits mode: create recovered edits file for hbase:meta server
      this.fileSystemManager.splitMetaLog(currentMetaServer);
    }

 

 

    先看一下默认的splitlog的方法

    MasterFileSystem类splitLog方法

 

    /**
   * This method is the base split method that splits HLog files matching a filter. Callers should
   * pass the appropriate filter for meta and non-meta HLogs.
   * @param serverNames
   * @param filter
   * @throws IOException
   */
  public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
    long splitTime = 0, splitLogSize = 0;
    List<Path> logDirs = getLogDirs(serverNames);//获得splitting log的path

    splitLogManager.handleDeadWorkers(serverNames);
    splitTime = EnvironmentEdgeManager.currentTimeMillis();
    //发布split log任务
    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
    splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;

    if (this.metricsMasterFilesystem != null) {//记录split log时间,大小
      if (filter == META_FILTER) {//meta 表
        this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
      } else {//非meta 表
        this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
      }
    }
  }

 

插入他getLogDirs方法的代码,是将hlog rename为split log,并返回path

 private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    boolean needReleaseLock = false;
    if (!this.services.isInitialized()) {
      // during master initialization, we could have multiple places splitting a same wal
      this.splitLogLock.lock();
      needReleaseLock = true;
    }
    try {
      for (ServerName serverName : serverNames) {//将hbase的hlog换成split log
        Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
        Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
        // Rename the directory so a rogue RS doesn't create more HLogs
        if (fs.exists(logDir)) {
          if (!this.fs.rename(logDir, splitDir)) {
            throw new IOException("Failed fs.rename for log split: " + logDir);
          }
          logDir = splitDir;
          LOG.debug("Renamed region directory: " + splitDir);
        } else if (!fs.exists(splitDir)) {
          LOG.info("Log dir for server " + serverName + " does not exist");
          continue;
        }
        logDirs.add(splitDir);
      }
    } finally {
      if (needReleaseLock) {
        this.splitLogLock.unlock();
      }
    }
    return logDirs;
  }

 

 

    splitLogManager类负责split log

    splitLogDistributed

 

   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
      .......
    TaskBatch batch = new TaskBatch();
    Boolean isMetaRecovery = (filter == null) ? null : false;
    for (FileStatus lf : logfiles) {
      // TODO If the log file is still being written to - which is most likely
      // the case for the last log file - then its length will show up here
      // as zero. The size of such a file can only be retrieved after
      // recover-lease is done. totalSize will be under in most cases and the
      // metrics that it drives will also be under-reported.
      totalSize += lf.getLen();
      //获得split log路径
      String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
      //split log任务添加到zk上
      if (!enqueueSplitTask(pathToLog, batch)) {
        throw new IOException("duplicate log split scheduled for " + lf.getPath());
      }
    }
    waitForSplittingCompletion(batch, status);//等待zk上的split log任务都完成
    // remove recovering regions from ZK
    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
      // we split meta regions and user regions separately therefore logfiles are either all for
      // meta or user regions but won't for both( we could have mixed situations in tests)
      isMetaRecovery = true;
    }
    this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);//删除已经恢复的regionserver

    if (batch.done != batch.installed) {//有失败的split log抛出异常
      batch.isDead = true;
    .............
    }

 

 

enqueueSplitTask方法,将path放入zk等待rs处理

 

   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
    // This is a znode path under the splitlog dir with the rest of the path made up of an
    // url encoding of the passed in log to split.
    String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);//log path的获得full name
    lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
    Task oldtask = createTaskIfAbsent(path, batch);
    if (oldtask == null) {//生成split node,等待RS的splitlogwork工作
      // publish the task in zk
      createNode(path, zkretries);
      return true;
    }
    return false;
  }

 

 

 

  当设置了distributedLogReplay为true,则使用masterfilesystem的prepareLogReplay方法

  splitlogmanager方法的markRegionsRecoveringInZK,生成的zk node节点如下,多了RSname,regionname,region sequence id

   Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for

   all regions of the passed in region servers

   而且会为RS的每个region,生成一个zk node

 

 void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
      throws KeeperException {
  
  try {
      this.recoveringRegionLock.lock();
      // mark that we're creating recovering znodes
      this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();

      for (HRegionInfo region : userRegions) {
        String regionEncodeName = region.getEncodedName();
        long retries = this.zkretries;

        do {
          String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);//path中添加了rs的region
          long lastRecordedFlushedSequenceId = -1;
          try {
            long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
              regionEncodeName.getBytes());

            /*
             * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
             * flushed sequence id for the server]
             */
            byte[] data = ZKUtil.getData(this.watcher, nodePath);
            if (data == null) {
              ZKUtil.createSetData(this.watcher, nodePath,
                ZKUtil.positionToByteArray(lastSequenceId));
            } else {//根据data更新region的sequence id
              lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
              if (lastRecordedFlushedSequenceId < lastSequenceId) {
                // update last flushed sequence id in the region level
                ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
              }
            }
            // go one level deeper with server name
            nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());//path添加了rs name
            if (lastSequenceId <= lastRecordedFlushedSequenceId) {
              // the newly assigned RS failed even before any flush to the region
              lastSequenceId = lastRecordedFlushedSequenceId;
            }
            ZKUtil.createSetData(this.watcher, nodePath,
              ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));//生成节点,节点data中防止region的sequence id
            LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
                + serverName);

            // break retry loop
            break;
          } catch (KeeperException e) {
            // ignore ZooKeeper exceptions inside retry loop
            if (retries <= 1) {
              throw e;
            }
            // wait a little bit for retry
            try {
              Thread.sleep(20);
            } catch (Exception ignoreE) {
              // ignore
            }
          }
        } while ((--retries) > 0 && (!this.stopper.isStopped()));
      }
    } finally {
      this.recoveringRegionLock.unlock();
    }
    }

 

 

 

    

    下边是恢复非meta表的入口

    serverManager里边processDeadServer,提交ServerShutdownHandler到线程池

 

      public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) {
    // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
    // in-memory region states, region servers could be down. Meta table can and
    // should be re-assigned, log splitting can be done too. However, it is better to
    // wait till the cleanup is done before re-assigning user regions.
    //
    // We should not wait in the server shutdown handler thread since it can clog
    // the handler threads and meta table could not be re-assigned in case
    // the corresponding server is down. So we queue them up here instead.
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      requeuedDeadServers.put(serverName, shouldSplitHlog);
      return;
    }

    this.deadservers.add(serverName);
    this.services.getExecutorService().submit(
      new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
          shouldSplitHlog));//执行handler
  }
  

 

 

 

  在ServerShutdownHandler的process里,进行split log

  

  ------------------------------------------

  2.regionserver的splitlogworker取zk split node,进行恢复

 

 

 
   @Override
  public void run() {
    try {
      LOG.info("SplitLogWorker " + this.serverName + " starting");
      this.watcher.registerListener(this);
      boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
      if (distributedLogReplay) {
        // initialize a new connection for splitlogworker configuration
        HConnectionManager.getConnection(conf);
      }

      // wait for master to create the splitLogZnode
      int res = -1;
      while (res == -1 && !exitWorker) {
        try {
          res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);//等待master创建splitlog zk node
        } catch (KeeperException e) {
          // ignore
          LOG.warn("Exception when checking for " + watcher.splitLogZNode  + " ... retrying", e);
        }
        if (res == -1) {
          try {
            LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
                + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
                "exiting anyway)"));
            exitWorker = true;
            break;
          }
        }
      }

      if (!exitWorker) {
        taskLoop();//处理zk上的splitting zk node
      }
   }

 

 

 

        childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
            this.watcher.splitLogZNode);//所有region server在抢zk上的split log节点

 

 

 

  在看一下taskLoop方法

 

    private void taskLoop() {
    while (!exitWorker) {
      int seq_start = taskReadySeq;
      List<String> paths = getTaskList();//抢得split log task路径
      if (paths == null) {
        LOG.warn("Could not get tasks, did someone remove " +
            this.watcher.splitLogZNode + " ... worker thread exiting.");
        return;
      }
      // pick meta wal firstly
      int offset = (int) (Math.random() * paths.size());//随机取个开始位置
      for(int i = 0; i < paths.size(); i ++){
        if(HLogUtil.isMetaFile(paths.get(i))) {//先处理meta结尾的文件
          offset = i;
          break;
        }
      }
      int numTasks = paths.size();
      for (int i = 0; i < numTasks; i++) {
        int idx = (i + offset) % paths.size();
        // don't call ZKSplitLog.getNodeName() because that will lead to
        // double encoding of the path name
        if (this.calculateAvailableSplitters(numTasks) > 0) {
        	//生成split log线程执行
          grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
        } else {
          LOG.debug("Current region server " + this.serverName + " has "
              + this.tasksInProgress.get() + " tasks in progress and can't take more.");
          break;
        }
        if (exitWorker) {
          return;
        }
      }

 

 

 

  grabTask方法生成HLogSplitterHandler线程类

 

 

 private void grabTask(String path) {
    Stat stat = new Stat();
    long t = -1;
    byte[] data;
    synchronized (grabTaskLock) {
      currentTask = path;
      workerInGrabTask = true;
      if (Thread.interrupted()) {
        return;
      }
    }
    try {
      try {
        if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {//没拿到zk data,返回
          SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
          return;
        }
      } catch (KeeperException e) {
        LOG.warn("Failed to get data for znode " + path, e);
        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
        return;
      }
      SplitLogTask slt;
      try {
        slt = SplitLogTask.parseFrom(data);
      } catch (DeserializationException e) {
        LOG.warn("Failed parse data for znode " + path, e);
        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
        return;
      }
      if (!slt.isUnassigned()) {//split log task状态不为未发布,返回
        SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
        return;
      }

      //尝试设置zk node的version
      currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
      if (currentVersion < 0) {//没能设置zk node version
        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
        return;
      }

      if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
    	  //zk node path没有子目录,为rescan node,直接设置done
        HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
          SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
        return;
      }

      LOG.info("worker " + serverName + " acquired task " + path);
      SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
      getDataSetWatchAsync();
      //处理hlog split,提交一个HLogSplitterHandler进程到线程池
      submitTask(path, currentVersion, this.report_period);

      // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
      try {
        int sleepTime = RandomUtils.nextInt(500) + 500;
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while yielding for other region servers", e);
        Thread.currentThread().interrupt();
      }
    } finally {
      synchronized (grabTaskLock) {
        workerInGrabTask = false;
        // clear the interrupt from stopTask() otherwise the next task will
        // suffer
        Thread.interrupted();
      }
    }
  }

 

 

最终执行类:
HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
            fs, conf, p, sequenceIdChecker, watcher){
            
        //split hlog 的read类 HLog.Reader
        in = getReader(fs, logfile, conf, skipErrors, reporter);
        
      outputSink.setReporter(reporter);//启动split hlog恢复写线程类,负责将内存entry,写入hlog
      outputSink.startWriterThreads();
            
        while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
                           
        entryBuffers.appendEntry(entry);//向buffer中添加一个hlog entry
        }
            
}


HLogSplitter里会初始化writer类

    this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);//生成hlogentry的writer线程,将内存写入hdfs
    if (zkw != null && this.distributedLogReplay) {
      outputSink = new LogReplayOutputSink(numWriterThreads);
    } else {
      if (this.distributedLogReplay) {
        LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
      }
      this.distributedLogReplay = false;
      outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
    }
       

 

 

----------------------------------------

在贴一个split log的入口代码

ServerShutdownHandler process

      try {
        if (this.shouldSplitHlog) {//split log
          LOG.info("Splitting logs for " + serverName + " before assignment.");
          if (this.distributedLogReplay) {
            LOG.info("Mark regions in recovery before assignment.");
            Set<ServerName> serverNames = new HashSet<ServerName>();
            serverNames.add(serverName);
            this.services.getMasterFileSystem().prepareLogReplay(serverNames);
          } else {
            this.services.getMasterFileSystem().splitLog(serverName);
          }
          am.getRegionStates().logSplit(serverName);
        } else {
          LOG.info("Skipping log splitting for " + serverName);
        }
      } catch (IOException ioe) {
        resubmit(serverName, ioe);
      }

 

 

 

 

 

1
0
分享到:
评论

相关推荐

    hbase权威指南源码

    通过分析和实践《HBase权威指南》的源码,读者不仅可以深化理论知识,还能掌握实际操作技巧,为解决实际项目中的问题提供有力支持。对于想深入理解HBase工作原理和优化技巧的开发者来说,这份源码是一份宝贵的资源。

    基于Java语言的HBase数据库设计源码分析

    该系统是一款基于Java语言的HBase数据库设计源码,共计26个文件,其中包含18个Java源文件、5个XML配置文件、2个Git忽略文件和1个属性文件。

    基于Java语言的Apache HBase数据库设计源码分析

    该项目为Apache HBase数据库设计源码,采用Java语言编写,包含5428个文件,其中4589个为Java源文件,222个为Ruby脚本,118个为XML配置文件,其余文件包括Shell脚本、Python脚本、HTML、CSS、JavaScript、C++、PHP、C...

    HBase开启审计日志

    log4j.appender.RFAS.File=${hbase.log.dir}/${hbase.security.log.file} log4j.appender.RFAS.MaxFileSize=${hbase.security.log.maxfilesize} log4j.appender.RFAS.MaxBackupIndex=${hbase.security.log....

    hbase-0.98.1源码包

    5. 并发控制:学习RegionSplitPolicy、RegionSplitter等类,理解HBase如何处理并发请求和Region分裂。 6. 客户端API:研究HBase客户端如何通过Table、Get、Put、Scan等对象进行数据操作。 通过阅读源码,开发者可以...

    HBase源码分析

    HBase源码分析揭示了HBase在RPC通信机制方面的一些关键技术点,这包括了角色分配、通信信道建立、通信接口协议定义、对象序列化、传输控制和会话管理,以及在传输过程中可能出现的错误处理和重试机制。 HBase中的...

    HBase源码分析与开发实战

    HBase源码分析与开发实战视频技术讲解高阶视频教程以及课件,内部讲解资料 内容非常详细 值得想要提高薪水的人去学习了解

    HBase实战源码

    源码分析是理解HBase工作原理和技术细节的重要途径。HBase在大数据领域扮演着关键角色,它能够处理海量数据并提供实时访问。下面,我们将深入探讨HBase的核心概念和源码中的关键组件。 1. **HBase架构**:HBase基于...

    基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip

    【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip

    基于Java语言的HBase分布式数据库设计源码分析

    本项目为基于Java语言的HBase分布式数据库设计源码,包含5415个文件,涵盖了4575个Java源文件、223个Ruby脚本、118个XML...该源码是GitHub上HBase项目的核心代码,适用于需要深入理解和分析HBase架构与实现细节的场景。

    基于kafka和spark streaming和hbase的日志统计分析系统.zip

    基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的...

    基于Java开发的分布式NoSQL数据库HBase设计源码分析

    该项目为基于Java开发的分布式NoSQL数据库HBase的设计源码,包含5289个文件,涵盖各类编程语言和文件类型,其中Java源文件4465个,Ruby脚本221个,XML配置112个,Protobuf定义66个,以及少量Shell、Python、...

    hbase源码分析

    ### HBase源码分析 #### 一、HBase性能测试要点与分析 ##### 1.1 测试环境 - **硬件配置**: - 客户端:1台 - RegionServer:5台 - Master:1台 - ZooKeeper:3台 - **软件配置**: - CPU:每台服务器配备8...

    hbase源码包和测试用例

    HBase的源码分析有助于理解其内部工作原理。例如,`HRegionServer`是数据服务的主要组件,负责Region的管理和数据操作;`HMaster`负责Region的分配和负载均衡;`HStore`管理Column Family,包含一系列的`HStoreFile...

    java操作Hbase之从Hbase中读取数据写入hdfs中源码

    在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...

    hbase 1.2.0源码

    HBase 1.2.0是该数据库的一个稳定版本,包含了众多优化和改进,对于想要深入理解HBase工作原理或者进行大数据分析的学习者来说,研究其源码是非常有价值的。 一、HBase架构与核心概念 1. 表与Region:HBase中的...

    hbase 源码包

    HBase 0.94.4的源码分析有助于我们深入了解其内部机制,从而更好地进行系统设计和优化。无论是对于开发者还是管理员,掌握HBase的核心原理都将极大地提升在大数据领域的实践能力。通过不断学习和实践,我们可以更好...

    Hbase权威指南源码

    《HBase权威指南》是一本深入探讨分布式列式数据库HBase的专业书籍,其源码提供了对HBase工作原理和实现细节的深入理解。HBase,作为Apache Hadoop生态系统中的一个关键组件,是为大规模数据存储设计的高性能、...

Global site tag (gtag.js) - Google Analytics