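The split log process
When the HBase HMaster starts, it moves the HLogs into a splitting directory (by renaming the log directory) and then processes the split logs.
(When an RS is added to the dead server list, ServerShutdownHandler also calls the splitLog method; that code is pasted at the end.)
The master is responsible for publishing split log tasks to ZK.
The master processes the split log files and finally writes the split log tasks to ZK.
RegionServers grab split log tasks from ZK, read the split log into in-memory entries, and writer threads write those entries to HDFS.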
// we recover hbase:meta region servers inside master initialization and
// handle other failed servers in SSH in order to start up master node ASAP
Set<ServerName> previouslyFailedServers = this.fileSystemManager
    .getFailedServersFromLogFolders(); // collect the previously dead RSes from the log folders

// remove stale recovering regions from previous run
this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);

// log splitting for hbase:meta server (the meta table is repaired first)
ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
  splitMetaLogBeforeAssignment(oldMetaServerLocation); // recover the meta split log
  // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
  // may also host user regions
}

status.setStatus("Submitting log splitting work for previously failed region servers");
// Master has recovered hbase:meta region server and we put
// other failed region servers in a queue to be handled later by SSH
// (the other dead servers, hosting non-meta tables, are recovered afterwards)
for (ServerName tmpServer : previouslyFailedServers) {
  this.serverManager.processDeadServer(tmpServer, true);
}
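Recovering the split log always goes through code like this:
There are two ways to recover the split log, and the second is the default. The first, log replay, marks the regions as recovering in ZK so that the WAL edits are replayed directly to the RegionServers the regions are reassigned to (the original RegionServer is dead and cannot do this itself). The second writes recovered.edits files.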
if (this.distributedLogReplay) {
  // In log replay mode, we mark hbase:meta region as recovering in ZK
  Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
  regions.add(HRegionInfo.FIRST_META_REGIONINFO);
  this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
} else {
  // In recovered.edits mode: create recovered edits file for hbase:meta server
  this.fileSystemManager.splitMetaLog(currentMetaServer);
}
First, look at the default splitLog method.
The splitLog method of the MasterFileSystem class:
/**
 * This method is the base split method that splits HLog files matching a filter. Callers should
 * pass the appropriate filter for meta and non-meta HLogs.
 * @param serverNames
 * @param filter
 * @throws IOException
 */
public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
  long splitTime = 0, splitLogSize = 0;
  List<Path> logDirs = getLogDirs(serverNames); // get the paths of the splitting log dirs
  splitLogManager.handleDeadWorkers(serverNames);
  splitTime = EnvironmentEdgeManager.currentTimeMillis();
  // publish the split log tasks
  splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
  splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
  if (this.metricsMasterFilesystem != null) { // record the split log time and size
    if (filter == META_FILTER) { // meta table
      this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
    } else { // non-meta tables
      this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
    }
  }
}
Here is the code of its getLogDirs method, which renames each HLog directory to a splitting directory and returns the paths:
private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
  List<Path> logDirs = new ArrayList<Path>();
  boolean needReleaseLock = false;
  if (!this.services.isInitialized()) {
    // during master initialization, we could have multiple places splitting a same wal
    this.splitLogLock.lock();
    needReleaseLock = true;
  }
  try {
    for (ServerName serverName : serverNames) { // rename each HLog dir to a splitting dir
      Path logDir = new Path(this.rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
      Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
      // Rename the directory so a rogue RS doesn't create more HLogs
      if (fs.exists(logDir)) {
        if (!this.fs.rename(logDir, splitDir)) {
          throw new IOException("Failed fs.rename for log split: " + logDir);
        }
        logDir = splitDir;
        LOG.debug("Renamed region directory: " + splitDir);
      } else if (!fs.exists(splitDir)) {
        LOG.info("Log dir for server " + serverName + " does not exist");
        continue;
      }
      logDirs.add(splitDir);
    }
  } finally {
    if (needReleaseLock) {
      this.splitLogLock.unlock();
    }
  }
  return logDirs;
}
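The rename step in getLogDirs can be tried out on a local filesystem. The following is only a minimal sketch using java.nio instead of the Hadoop FileSystem API; the class name SplittingDirSketch, the method toSplittingDir, and the "-splitting" suffix value are assumptions made for illustration, since the excerpt does not show the value of HLog.SPLITTING_EXT.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Local-filesystem sketch of the "rename before splitting" step (not the HDFS API). */
final class SplittingDirSketch {
    // Assumed suffix; the excerpt only references HLog.SPLITTING_EXT without its value.
    static final String SPLITTING_EXT = "-splitting";

    /**
     * Returns the splitting directory for logDir, renaming logDir first if it still exists.
     * Returns null when neither directory exists (nothing to split for that server).
     */
    static Path toSplittingDir(Path logDir) {
        Path splitDir = logDir.resolveSibling(logDir.getFileName() + SPLITTING_EXT);
        if (Files.exists(logDir)) {
            try {
                // After the rename, a rogue writer can no longer add files under the old name.
                Files.move(logDir, splitDir);
            } catch (IOException e) {
                throw new UncheckedIOException("Failed rename for log split: " + logDir, e);
            }
            return splitDir;
        }
        // An earlier attempt may already have renamed the directory: reuse it if present.
        return Files.exists(splitDir) ? splitDir : null;
    }
}
```

Calling toSplittingDir twice for the same server directory returns the same splitting path, which mirrors how the original tolerates a directory that was already renamed by a previous run.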
The SplitLogManager class is responsible for the split log work.
splitLogDistributed
public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
    PathFilter filter) throws IOException {
  .......
  TaskBatch batch = new TaskBatch();
  Boolean isMetaRecovery = (filter == null) ? null : false;
  for (FileStatus lf : logfiles) {
    // TODO If the log file is still being written to - which is most likely
    // the case for the last log file - then its length will show up here
    // as zero. The size of such a file can only be retrieved after
    // recover-lease is done. totalSize will be under in most cases and the
    // metrics that it drives will also be under-reported.
    totalSize += lf.getLen();
    // get the split log path
    String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
    // add the split log task to ZK
    if (!enqueueSplitTask(pathToLog, batch)) {
      throw new IOException("duplicate log split scheduled for " + lf.getPath());
    }
  }
  waitForSplittingCompletion(batch, status); // wait until all split log tasks on ZK are finished
  // remove recovering regions from ZK
  if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
    // we split meta regions and user regions separately therefore logfiles are either all for
    // meta or user regions but won't for both( we could have mixed situations in tests)
    isMetaRecovery = true;
  }
  this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery); // remove the recovering-region znodes of the recovered servers
  if (batch.done != batch.installed) { // some split log tasks failed: throw an exception
    batch.isDead = true;
    .............
}
The enqueueSplitTask method puts the path into ZK, where it waits for an RS to process it:
boolean enqueueSplitTask(String taskname, TaskBatch batch) {
  SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
  // This is a znode path under the splitlog dir with the rest of the path made up of an
  // url encoding of the passed in log to split.
  String path = ZKSplitLog.getEncodedNodeName(watcher, taskname); // full znode name for the log path
  lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
  Task oldtask = createTaskIfAbsent(path, batch);
  if (oldtask == null) { // create the split node and wait for the RS SplitLogWorker to pick it up
    // publish the task in zk
    createNode(path, zkretries);
    return true;
  }
  return false;
}
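The "create the task only if it is absent" idea in enqueueSplitTask can be sketched without ZooKeeper. This is an illustrative in-memory model only: SplitTaskRegistrySketch, its methods, and the "/hbase/splitlog" parent path used in the usage note are hypothetical names, and the URL encoding follows the source comment ("an url encoding of the passed in log to split") rather than the actual ZKSplitLog implementation.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for the master's task registry (no ZooKeeper involved). */
final class SplitTaskRegistrySketch {
    private final String splitLogZNode; // hypothetical parent znode path
    private final ConcurrentMap<String, String> tasks = new ConcurrentHashMap<>();

    SplitTaskRegistrySketch(String splitLogZNode) {
        this.splitLogZNode = splitLogZNode;
    }

    /** Builds the task node name: parent path plus the URL-encoded log path. */
    String encodedNodeName(String logPath) {
        try {
            return splitLogZNode + "/" + URLEncoder.encode(logPath, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Returns true if the task was newly registered, false if it was already scheduled. */
    boolean enqueue(String logPath) {
        // putIfAbsent is atomic, so two concurrent callers cannot both register the same log.
        return tasks.putIfAbsent(encodedNodeName(logPath), "UNASSIGNED") == null;
    }
}
```

With a registry created as new SplitTaskRegistrySketch("/hbase/splitlog"), the first enqueue of a log path returns true and a second enqueue of the same path returns false, which is the condition the original turns into the "duplicate log split scheduled" exception.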
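When distributedLogReplay is set to true, the prepareLogReplay method of MasterFileSystem is used instead.
SplitLogManager's markRegionsRecoveringInZK method creates the znodes described below. Compared with the split log task nodes, they additionally carry the RS name, the region name, and the region sequence id:
Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for all regions of the passed in region servers
One znode is created for every region of the RS.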
void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
    throws KeeperException {
  try {
    this.recoveringRegionLock.lock();
    // mark that we're creating recovering znodes
    this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
    for (HRegionInfo region : userRegions) {
      String regionEncodeName = region.getEncodedName();
      long retries = this.zkretries;
      do {
        // the path gets the RS's region name appended
        String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
        long lastRecordedFlushedSequenceId = -1;
        try {
          long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
              regionEncodeName.getBytes());
          /*
           * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
           * flushed sequence id for the server]
           */
          byte[] data = ZKUtil.getData(this.watcher, nodePath);
          if (data == null) {
            ZKUtil.createSetData(this.watcher, nodePath,
                ZKUtil.positionToByteArray(lastSequenceId));
          } else { // update the region's sequence id based on the existing data
            lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
            if (lastRecordedFlushedSequenceId < lastSequenceId) {
              // update last flushed sequence id in the region level
              ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
            }
          }
          // go one level deeper with server name (the path gets the RS name appended)
          nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
          if (lastSequenceId <= lastRecordedFlushedSequenceId) {
            // the newly assigned RS failed even before any flush to the region
            lastSequenceId = lastRecordedFlushedSequenceId;
          }
          // create the node; its data stores the region's sequence id
          ZKUtil.createSetData(this.watcher, nodePath,
              ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
          LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
              + serverName);
          // break retry loop
          break;
        } catch (KeeperException e) {
          // ignore ZooKeeper exceptions inside retry loop
          if (retries <= 1) {
            throw e;
          }
          // wait a little bit for retry
          try {
            Thread.sleep(20);
          } catch (Exception ignoreE) {
            // ignore
          }
        }
      } while ((--retries) > 0 && (!this.stopper.isStopped()));
    }
  } finally {
    this.recoveringRegionLock.unlock();
  }
}
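The sequence id bookkeeping in markRegionsRecoveringInZK is easier to follow in isolation. Below is a rough in-memory sketch of that logic, assuming a plain map in place of znode data; RecoveringRegionsSketch, markRecovering, and the "/recovering-regions" prefix are illustrative names, and the retry loop and locking of the original are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the two-level recovering-regions layout (znode data becomes a Long). */
final class RecoveringRegionsSketch {
    // path -> last flushed sequence id stored at that node
    final Map<String, Long> znodes = new HashMap<>();

    void markRecovering(String region, String failedServer, long lastFlushedSeqId) {
        String regionPath = "/recovering-regions/" + region;
        Long recorded = znodes.get(regionPath);
        long effective = lastFlushedSeqId;
        if (recorded == null || recorded < lastFlushedSeqId) {
            // First failure for this region, or a newer flush is known: raise the region-level id.
            znodes.put(regionPath, lastFlushedSeqId);
        } else {
            // The newly assigned server failed before flushing anything: keep the recorded id.
            effective = recorded;
        }
        // One level deeper: a node per failed server, holding the id replay should start from.
        znodes.put(regionPath + "/" + failedServer, effective);
    }
}
```

The region-level value only ever moves forward, so a second server that dies before flushing inherits the previously recorded id instead of lowering it.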
Below is the entry point for recovering non-meta tables.
processDeadServer in ServerManager submits a ServerShutdownHandler to the thread pool:
public synchronized void processDeadServer(final ServerName serverName,
    boolean shouldSplitHlog) {
  // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
  // in-memory region states, region servers could be down. Meta table can and
  // should be re-assigned, log splitting can be done too. However, it is better to
  // wait till the cleanup is done before re-assigning user regions.
  //
  // We should not wait in the server shutdown handler thread since it can clog
  // the handler threads and meta table could not be re-assigned in case
  // the corresponding server is down. So we queue them up here instead.
  if (!services.getAssignmentManager().isFailoverCleanupDone()) {
    requeuedDeadServers.put(serverName, shouldSplitHlog);
    return;
  }

  this.deadservers.add(serverName);
  this.services.getExecutorService().submit(
      new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
          shouldSplitHlog)); // run the handler
}
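The "queue now, handle after cleanup" pattern in processDeadServer can be modelled as follows. This is a simplified sketch: DeadServerQueueSketch and its members are hypothetical, a list stands in for the executor service, and the drain step in failoverCleanupDone is an assumption about how the queued servers get processed later, since the excerpt does not show that part.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of deferring dead-server handling until failover cleanup has finished. */
final class DeadServerQueueSketch {
    private boolean failoverCleanupDone = false;
    private final Map<String, Boolean> requeued = new LinkedHashMap<>();
    // Stands in for submitting a shutdown handler to an executor.
    final List<String> submitted = new ArrayList<>();

    synchronized void processDeadServer(String server, boolean shouldSplitLog) {
        if (!failoverCleanupDone) {
            // Do not block a handler thread: remember the server and return immediately.
            requeued.put(server, shouldSplitLog);
            return;
        }
        submitted.add(server + ":" + shouldSplitLog);
    }

    /** Assumed drain step: once cleanup is done, replay everything that was queued. */
    synchronized void failoverCleanupDone() {
        failoverCleanupDone = true;
        Map<String, Boolean> pending = new LinkedHashMap<>(requeued);
        requeued.clear();
        pending.forEach(this::processDeadServer);
    }
}
```

Servers reported before the cleanup flag flips are held back and submitted in arrival order afterwards; servers reported later are submitted right away.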
The split log work is then done in the process method of ServerShutdownHandler.
------------------------------------------
2. The RegionServer's SplitLogWorker takes split znodes from ZK and performs the recovery
@Override
public void run() {
  try {
    LOG.info("SplitLogWorker " + this.serverName + " starting");
    this.watcher.registerListener(this);
    boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
    if (distributedLogReplay) {
      // initialize a new connection for splitlogworker configuration
      HConnectionManager.getConnection(conf);
    }

    // wait for master to create the splitLogZnode
    int res = -1;
    while (res == -1 && !exitWorker) {
      try {
        // wait for the master to create the splitlog znode
        res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
      } catch (KeeperException e) {
        // ignore
        LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);
      }
      if (res == -1) {
        try {
          LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
              + (exitWorker ? "" : " (ERROR: exitWorker is not set, " + "exiting anyway)"));
          exitWorker = true;
          break;
        }
      }
    }

    if (!exitWorker) {
      taskLoop(); // process the splitting znodes on ZK
    }
  }
  // (the excerpt ends here; the rest of the method is not shown)
// all region servers compete for the split log nodes on ZK
childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
    this.watcher.splitLogZNode);
Now look at the taskLoop method:
private void taskLoop() {
  while (!exitWorker) {
    int seq_start = taskReadySeq;
    List<String> paths = getTaskList(); // fetch the split log task paths
    if (paths == null) {
      LOG.warn("Could not get tasks, did someone remove " + this.watcher.splitLogZNode
          + " ... worker thread exiting.");
      return;
    }
    // pick meta wal firstly
    int offset = (int) (Math.random() * paths.size()); // pick a random start position
    for (int i = 0; i < paths.size(); i++) {
      if (HLogUtil.isMetaFile(paths.get(i))) { // handle files ending in meta first
        offset = i;
        break;
      }
    }
    int numTasks = paths.size();
    for (int i = 0; i < numTasks; i++) {
      int idx = (i + offset) % paths.size();
      // don't call ZKSplitLog.getNodeName() because that will lead to
      // double encoding of the path name
      if (this.calculateAvailableSplitters(numTasks) > 0) {
        // try to grab the task; on success a split log handler is submitted
        grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
      } else {
        LOG.debug("Current region server " + this.serverName + " has "
            + this.tasksInProgress.get() + " tasks in progress and can't take more.");
        break;
      }
      if (exitWorker) {
        return;
      }
    }
    // (the excerpt ends here; the rest of the loop is not shown)
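The order in which taskLoop visits tasks (a random start, overridden by the first meta WAL, then a circular walk) can be sketched as a pure function. TaskOrderSketch and visitOrder are illustrative names, the random offset is passed in so the behaviour is reproducible, and the ".meta" suffix check is an assumption standing in for HLogUtil.isMetaFile.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the task visiting order used by a split log worker. */
final class TaskOrderSketch {
    // Assumption: meta WALs are recognised by a ".meta" suffix.
    static boolean isMetaFile(String path) {
        return path.endsWith(".meta");
    }

    /** Returns the paths in visiting order; randomOffset plays the role of Math.random(). */
    static List<String> visitOrder(List<String> paths, int randomOffset) {
        int offset = randomOffset;
        for (int i = 0; i < paths.size(); i++) {
            if (isMetaFile(paths.get(i))) {
                offset = i; // a meta WAL always wins over the random start
                break;
            }
        }
        List<String> order = new ArrayList<>();
        for (int i = 0; i < paths.size(); i++) {
            // Circular walk so every task is visited exactly once.
            order.add(paths.get((i + offset) % paths.size()));
        }
        return order;
    }
}
```

The random start spreads workers over different tasks so they do not all contend for the first znode, while the meta override keeps recovery of hbase:meta from waiting behind user WALs.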
The grabTask method claims a task and submits an HLogSplitterHandler to the thread pool:
private void grabTask(String path) {
  Stat stat = new Stat();
  long t = -1;
  byte[] data;
  synchronized (grabTaskLock) {
    currentTask = path;
    workerInGrabTask = true;
    if (Thread.interrupted()) {
      return;
    }
  }
  try {
    try {
      if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
        // no data on the znode: give up on this task
        SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("Failed to get data for znode " + path, e);
      SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
      return;
    }
    SplitLogTask slt;
    try {
      slt = SplitLogTask.parseFrom(data);
    } catch (DeserializationException e) {
      LOG.warn("Failed parse data for znode " + path, e);
      SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
      return;
    }
    if (!slt.isUnassigned()) { // the task is not in the unassigned state: give up
      SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
      return;
    }

    // try to take ownership of the znode, using the version that was just read
    currentVersion = attemptToOwnTask(true, watcher, serverName, path, stat.getVersion());
    if (currentVersion < 0) { // another worker updated the znode first
      SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
      return;
    }

    if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
      // this is a rescan node: mark it done right away
      HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName),
          SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
      return;
    }

    LOG.info("worker " + serverName + " acquired task " + path);
    SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
    getDataSetWatchAsync();

    // do the HLog split: submit an HLogSplitterHandler task to the thread pool
    submitTask(path, currentVersion, this.report_period);

    // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
    try {
      int sleepTime = RandomUtils.nextInt(500) + 500;
      Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while yielding for other region servers", e);
      Thread.currentThread().interrupt();
    }
  } finally {
    synchronized (grabTaskLock) {
      workerInGrabTask = false;
      // clear the interrupt from stopTask() otherwise the next task will
      // suffer
      Thread.interrupted();
    }
  }
}
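The race in grabTask is decided by a version-checked update: a worker reads the znode, remembers its version, and only becomes owner if nobody changed the znode in between. The following is a minimal in-memory sketch of that idea using an AtomicReference in place of ZooKeeper; VersionedTaskNodeSketch and attemptToOwn are hypothetical names and do not reproduce the real attemptToOwnTask signature.

```java
import java.util.concurrent.atomic.AtomicReference;

/** In-memory model of a task znode whose ownership is claimed by a versioned update. */
final class VersionedTaskNodeSketch {
    /** Immutable snapshot of the node: owner == null means the task is unassigned. */
    static final class State {
        final String owner;
        final int version;

        State(String owner, int version) {
            this.owner = owner;
            this.version = version;
        }
    }

    private final AtomicReference<State> state = new AtomicReference<>(new State(null, 0));

    int version() {
        return state.get().version;
    }

    String owner() {
        return state.get().owner;
    }

    /** Returns the new version on success, or -1 if another worker changed the node first. */
    int attemptToOwn(String worker, int expectedVersion) {
        State current = state.get();
        if (current.version != expectedVersion) {
            return -1; // the node changed since this worker read it
        }
        State next = new State(worker, expectedVersion + 1);
        // compareAndSet makes "check version, then write" a single atomic step.
        return state.compareAndSet(current, next) ? next.version : -1;
    }
}
```

If two workers both read version 0, only the first attemptToOwn succeeds; the second sees a stale version and gets -1, which corresponds to the "lost race" branch in the original.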
The class that finally does the work:
HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), fs, conf, p,
    sequenceIdChecker, watcher) {
  // the reader for the HLog being split
  HLog.Reader in = getReader(fs, logfile, conf, skipErrors, reporter);
  outputSink.setReporter(reporter);
  // start the writer threads, which write the in-memory entries to HDFS
  outputSink.startWriterThreads();
  while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
    entryBuffers.appendEntry(entry); // append one HLog entry to the buffer
  }
}
HLogSplitter initializes the writer (output sink) as follows:
// number of writer threads that write the buffered HLog entries from memory to HDFS
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (zkw != null && this.distributedLogReplay) {
  outputSink = new LogReplayOutputSink(numWriterThreads);
} else {
  if (this.distributedLogReplay) {
    LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
  }
  this.distributedLogReplay = false;
  outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
}
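The reader and writer-thread arrangement above is a producer/consumer pipeline. Here is a small self-contained sketch of that shape, assuming a single shared queue and an in-memory map as the "output"; SplitPipelineSketch and split are illustrative names, and the real entry buffers and output sinks are considerably more involved (for example, this sketch does not preserve edit order within a region when several writers run).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch: one reader feeds a buffer, several writer threads drain it into per-region outputs. */
final class SplitPipelineSketch {
    private static final String[] POISON = new String[0]; // tells a writer thread to stop

    /** Each entry is a {region, edit} pair; the result groups edits by region. */
    static Map<String, List<String>> split(List<String[]> entries, int numWriterThreads) {
        BlockingQueue<String[]> buffer = new LinkedBlockingQueue<>();
        Map<String, List<String>> outputs = new ConcurrentHashMap<>();
        List<Thread> writers = new ArrayList<>();
        for (int i = 0; i < numWriterThreads; i++) {
            Thread writer = new Thread(() -> {
                try {
                    for (String[] e = buffer.take(); e != POISON; e = buffer.take()) {
                        List<String> out = outputs.computeIfAbsent(e[0], k -> new ArrayList<>());
                        synchronized (out) {
                            out.add(e[1]); // "write" the edit to its region's output
                        }
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            writer.start();
            writers.add(writer);
        }
        try {
            for (String[] entry : entries) {
                buffer.put(entry); // reader side: append each log entry to the buffer
            }
            for (int i = 0; i < numWriterThreads; i++) {
                buffer.put(POISON); // one stop marker per writer
            }
            for (Thread writer : writers) {
                writer.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return outputs;
    }
}
```

Calling split(entries, 3) uses three writer threads, matching the default of 3 for hbase.regionserver.hlog.splitlog.writer.threads shown in the excerpt.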
----------------------------------------
Finally, here is the entry-point code for split log:
ServerShutdownHandler process
try {
  if (this.shouldSplitHlog) { // split log
    LOG.info("Splitting logs for " + serverName + " before assignment.");
    if (this.distributedLogReplay) {
      LOG.info("Mark regions in recovery before assignment.");
      Set<ServerName> serverNames = new HashSet<ServerName>();
      serverNames.add(serverName);
      this.services.getMasterFileSystem().prepareLogReplay(serverNames);
    } else {
      this.services.getMasterFileSystem().splitLog(serverName);
    }
    am.getRegionStates().logSplit(serverName);
  } else {
    LOG.info("Skipping log splitting for " + serverName);
  }
} catch (IOException ioe) {
  resubmit(serverName, ioe);
}