`
hongs_yang
  • 浏览: 60588 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

Hlog的相关处理流程不完全分析

阅读更多

 

Hlog的相关处理流程:

 

在对hbase中数据进行更新操作put/delete/append/increment操作时,记录操作日志供日志重播的相关处理。

 

Hlog的写入通过regionserver实例生成时生成的FSLog为的实例。

 

通过LogRoller线程定期去检查并删除过期的日志文件。

 

定期检查的时间间隔通过hbase.regionserver.logroll.period进行配置,默认为3600000ms

 

如果检查时间还没有达到上面的间隔时间时,线程等待的wake(唤醒)时间,hbase.server.thread.wakefrequency

 

默认为10*1000ms

 

FSLog实例内部有一个FSLog.LogSyncer线程实例,并启动此实例。此线程主要用来把log写入到hdfs

 

此线程的执行间隔通过hbase.regionserver.optionallogflushinterval配置,默认为1000ms

 

如果此值配置为小于0的值,表示实时写入hlog日志

 

 

 

在执行更新操作时如果开启有日志记录,调用appendNoSync-->append写入日志,

每一个日志中记录的seqid默认第一次时为当前rs中所有region中最大的seqid加一(FSHLog.logSeqNum),

 

每append一次后,logSeqNum的值为加一。同时此值也是flsuh时hfile中的fileinfo中记录的最大的seqid值。

 

此方法把要记录的日志写入到FSLog.LogSyncer.pendingWrites队列中。等待LogSyncerrun方法去sync

 

注意:如果是对metalog写入时,每一次写入都会执行sync操作,保证meta数据的不丢失。

 

 

 

publicvoidrun() {

 

try {

 

// awaiting with a timeout doesn't always

 

// throw exceptions on interrupt

 

while(!this.isInterrupted() && !closeLogSyncer.get()) {

 

 

 

try {

 

如果没有最新的数据要写入到HDFS,现在未flushlog的值小于或等于上一次提交的值,表示没有新记录

 

unflushedEntries在每append一次值会加一。

 

 

 

if (unflushedEntries.get() <= syncedTillHere) {

 

synchronized (closeLogSyncer) {

 

线程等待

 

closeLogSyncer.wait(this.optionalFlushInterval);

 

}

 

}

 

// Calling sync since we waited or had unflushed entries.

 

// Entries appended but not sync'd are taken care of here AKA

 

// deferred log flush

 

否则,执行log的写入HDFS操作。使用unflushedEntries的值当txid

 

sync();

 

} catch (IOException e) {

 

LOG.error("Error while syncing, requesting close of hlog ", e);

 

requestLogRoll();

 

Threads.sleep(this.optionalFlushInterval);

 

}

 

}

 

} catch (InterruptedException e) {

 

LOG.debug(getName() + " interrupted while waiting for sync requests");

 

} finally {

 

LOG.info(getName() + " exiting");

 

}

 

}

 

 

 

使用unflushedEntries的值当成txid

 

private void syncer() throws IOException {

 

syncer(this.unflushedEntries.get()); // sync all pending items

 

}

 

 

 

 

 

private void syncer(long txid) throws IOException {

 

// if the transaction that we are interested in is already

 

// synced, then return immediately.

 

检查日志是否有更新,如果没有,直接返回,减少检查过程

 

if (txid <= this.syncedTillHere) {

 

return;

 

}

 

WritertempWriter;

 

synchronized (this.updateLock) {

 

if (this.closed) return;

 

// Guaranteed non-null.

 

// Note that parallel sync can close tempWriter.

 

// The current method of dealing with this is to catch exceptions.

 

// See HBASE-4387, HBASE-5623, HBASE-7329.

 

tempWriter = this.writer;

 

}

 

try {

 

longdoneUpto;

 

longnow = EnvironmentEdgeManager.currentTimeMillis();

 

// First flush all the pending writes to HDFS. Then

 

// issue the sync to HDFS. If sync is successful, then update

 

// syncedTillHere to indicate that transactions till this

 

// number has been successfully synced.

 

IOException ioe = null;

 

List<Entry> pending = null;

 

synchronized (flushLock) {

 

if (txid <= this.syncedTillHere) {

 

return;

 

}

 

得到当前未flush的最新的txid

 

doneUpto = this.unflushedEntries.get();

 

得到需要写入到HDFS的日志记录列表

 

pending = logSyncer.getPendingWrites();

 

try {

 

添加到HLOG的写入OUTPUT

 

logSyncer.hlogFlush(tempWriter, pending);

 

postAppend(pending);

 

} catch(IOException io) {

 

ioe = io;

 

LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);

 

}

 

}

 

if (ioe != null && pending != null) {

 

synchronized (this.updateLock) {

 

synchronized (flushLock) {

 

// HBASE-4387, HBASE-5623, retry with updateLock held

 

tempWriter = this.writer;

 

logSyncer.hlogFlush(tempWriter, pending);

 

postAppend(pending);

 

}

 

}

 

}

 

// another thread might have sync'ed avoid double-sync'ing

 

if (txid <= this.syncedTillHere) {

 

return;

 

}

 

try {

 

if (tempWriter != null) {

 

tempWriter.sync();

 

postSync();

 

}

 

} catch(IOException ex) {

 

synchronized (this.updateLock) {

 

// HBASE-4387, HBASE-5623, retry with updateLock held

 

// TODO: we don't actually need to do it for concurrent close - what is the point

 

// of syncing new unrelated writer? Keep behavior for now.

 

tempWriter = this.writer;

 

if (tempWriter != null) {

 

执行HDFS的文件写入sync,表示数据持久化

 

tempWriter.sync();

 

postSync();

 

}

 

}

 

}

 

更新当前syncedTillHere的值为unflushedEntries的值,主要用来检查下次是否需要更新记录

 

 

 

this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);

 

 

 

this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);

 

// TODO: preserving the old behavior for now, but this check is strange. It's not

 

// protected by any locks here, so for all we know rolling locks might start

 

// as soon as we enter the "if". Is this best-effort optimization check?

 

if (!this.logRollRunning) {

 

checkLowReplication();

 

try {

 

如果当前HLOG文件的大小超过了指定的log文件大小,

 

通过hbase.regionserver.hlog.blocksize配置,默认hdfsblocksize大小 *

 

hbase.regionserver.logroll.multiplier的值,默认0.95

 

curLogSize = tempWriter.getLength();

 

if (curLogSize > this.logrollsize) {

 

调用LogRoller.logRollRequested方法

 

requestLogRoll();

 

}

 

} catch (IOException x) {

 

LOG.debug("Log roll failed and will be retried. (This is not an error)");

 

}

 

}

 

} catch (IOException e) {

 

LOG.fatal("Could not sync. Requesting roll of hlog", e);

 

requestLogRoll();

 

throwe;

 

}

 

}

 

 

 

LogRoller.logRollRequested方法流程:

 

此方法主要用来叫醒LogRoller线程本身。

 

public void logRollRequested() {

 

synchronized (rollLog) {

 

rollLog.set(true);

 

rollLog.notifyAll();

 

}

 

}

 

 

 

LogRoller.run方法:

 

public void run() {

 

while (!server.isStopped()) {

 

longnow = System.currentTimeMillis();

 

booleanperiodic = false;

 

如果是Hlog写入的时候显示调用了 logRollRequested方法,下面的if 不进去

 

if (!rollLog.get()) {

 

检查上次更新时间是否大过更新间隔时间,如果没有,线程等待

 

通过hbase.regionserver.logroll.period进行配置,默认为3600000ms

 

periodic = (now - this.lastrolltime) > this.rollperiod;

 

如果是定期检查,同时上一次roll log的时间还不到间隔的时间,线程等待

 

线程等待超时通过hbase.server.thread.wakefrequency,默认为10*1000ms

 

if (!periodic) {

 

synchronized (rollLog) {

 

try {

 

rollLog.wait(this.threadWakeFrequency);

 

} catch (InterruptedException e) {

 

// Fall through

 

}

 

}

 

continue;

 

}

 

// Time for periodic roll

 

if (LOG.isDebugEnabled()) {

 

LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");

 

}

 

} elseif (LOG.isDebugEnabled()) {

 

LOG.debug("HLog roll requested");

 

}

 

rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH

 

try {

 

更新最后一次roll log的时间为当前时间

 

this.lastrolltime = now;

 

// This is array of actual region names.

 

调用FSHLog.rollWriter方法,检查文件个数是否达到指定的值,

 

如果是:把所有region已经flushhlog文件移动到oldWALs目录下。

 

并取出未移动同时已经关闭writerhlog file的最小的一个seqidhlog path,

 

把所有未flushregionseqid小于此值的region返回回来。

 

byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());

 

if (regionsToFlush != null) {

 

发起flush请求,强制所有的region进行flush操作。

 

for (byte [] r: regionsToFlush) scheduleFlush(r);

 

}

 

} catch (FailedLogCloseException e) {

 

server.abort("Failed log close in log roller", e);

 

} catch (java.net.ConnectException e) {

 

server.abort("Failed log close in log roller", e);

 

} catch (IOException ex) {

 

// Abort if we get here. We probably won't recover an IOE. HBASE-1132

 

server.abort("IOE in log roller",

 

RemoteExceptionHandler.checkIOException(ex));

 

} catch (Exception ex) {

 

LOG.error("Log rolling failed", ex);

 

server.abort("Log rolling failed", ex);

 

} finally {

 

try {

 

rollLog.set(false);

 

} finally {

 

rollLock.unlock();

 

}

 

}

 

}

 

LOG.info("LogRoller exiting.");

 

}

 

 

 

FSHLog.rollWriter方法流程分析:

 

把所有的region中已经flush完成的最小的seqidoldhlog文件进行比对,

 

如果old hlogseqid小于指定的flushid把这些hlog移动到oldWALs目录下去。

 

如果关闭的hlog文件大于hbase.regionserver.maxlogs配置的的值,默认为32个,

 

返回所有的已经关闭的hlog中所有region未进行flushseqid,

 

 

 

 

 

publicbyte [][] rollWriter(booleanforce)

 

throws FailedLogCloseException, IOException {

 

synchronized (rollWriterLock) {

 

// Return if nothing to flush.

 

FSHLog中的numEntries的值每次append时,都会加一

 

if (!force && this.writer != null && this.numEntries.get() <= 0) {

 

returnnull;

 

}

 

byte [][] regionsToFlush = null;

 

if (closed) {

 

LOG.debug("HLog closed. Skipping rolling of writer");

 

returnnull;

 

}

 

try {

 

....................此处部分代码没有显示

 

 

 

filenum在第一次是值为-1,第一次进行入是在FSHLog实例生成时通过此方法得到Writer实例

 

longcurrentFilenum = this.filenum;

 

Path oldPath = null;

 

如果是第二次指定此方法,也就是LogRoller显示调用时

 

if (currentFilenum > 0) {

 

//computeFilename will take care of meta hlog filename

 

得到上一个hlog文件的路径,也就是准备关闭的文件路径

 

oldPath = computeFilename(currentFilenum);

 

}

 

生成一个新的HLOG文件名称,并创建文件路径

 

this.filenum = System.currentTimeMillis();

 

Path newPath = computeFilename();

 

while (fs.exists(newPath)) {

 

this.filenum++;

 

newPath = computeFilename();

 

}

 

 

 

....................此处部分代码没有显示

 

 

 

生成一个新的Writer实例,默认实现是ProtobufLogWriter,通过hbase.regionserver.hlog.writer.impl配置

 

FSHLog.WriternextWriter = this.createWriterInstance(fs, newPath, conf);

 

// Can we get at the dfsclientoutputstream?

 

FSDataOutputStream nextHdfsOut = null;

 

if (nextWriterinstanceof ProtobufLogWriter) {

 

得到writer对应的HDFS路径的OutputStream

 

nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();

 

// perform the costly sync before we get the lock to roll writers.

 

try {

 

nextWriter.sync();

 

} catch (IOException e) {

 

// optimization failed, no need to abort here.

 

LOG.warn("pre-sync failed", e);

 

}

 

}

 

 

 

Path oldFile = null;

 

intoldNumEntries = 0;

 

synchronized (updateLock) {

 

// Clean up current writer.

 

oldNumEntries = this.numEntries.get();

 

关闭上一个HLOGHDFS的文件流,并把writer设置为null

 

同时把关闭的hlog文件的的seqid与路径写入到outputfilesmap列表中

 

oldFile = cleanupCurrentWriter(currentFilenum);

 

把当前新的hlog文件的writeroutputstream设置给FSHLOG实例

 

this.writer = nextWriter;

 

this.hdfs_out = nextHdfsOut;

 

this.numEntries.set(0);

 

}

 

 

 

....................此处部分代码没有显示

 

 

 

....................此处部分代码没有显示

 

 

 

// Can we delete any of the old log files?

 

如果outputfiles列表中有值,检查log的个数是否达到指定的个数,

 

if (getNumRolledLogFiles() > 0) {

 

检查log目录下的文件个数是否达到指定的个数,hbase.regionserver.maxlogs配置的的值,默认为32

 

如果达到指定的个数,找到所有region中已经flush完成的最小的一个seqid,

 

并从outputfiles列表中取出比此seqid小的hlog文件,把这些文件移动到oldWALs路径下。

 

CleanOldLogs();

 

取出outputfiles列表中的最小的一个seqid,

 

检查如果未flushregionseqid是否小于此seqid,如果是返回这些region

 

regionsToFlush = getRegionsToForceFlush();

 

}

 

} finally {

 

this.logRollRunning = false;

 

closeBarrier.endOp();

 

}

 

returnregionsToFlush;

 

}

 

}

 

 

0
0
分享到:
评论

相关推荐

    hlog,日志,调试

    在IT行业中,日志(log)是开发和调试过程中不可或缺的一部分。日志系统,如“hlog”,是用来记录应用程序运行时的各种事件、错误、警告和信息的重要工具。它为开发者提供了一种跟踪程序行为,诊断问题,以及优化...

    4- 远程配置_hlog.dll_海康威视相机参数远程配置_远程配置海康

    在海康威视的系统中,“hlog.dll”可能用于处理与摄像机通信、参数设置等相关操作。 远程配置海康威视相机,意味着我们可以不在物理位置上直接接触设备,而是通过网络来调整和管理相机的设置。这在大规模监控系统中...

    HLog.rar_日志

    "HLog.rar_日志"这个压缩包可能包含了一个专为多线程环境设计的日志记录插件,我们可以从以下几个方面来理解和探讨相关知识点: 1. **日志的重要性**:日志记录是系统运行过程中的足迹,它记录了程序的执行轨迹,...

    HLog:一个简单的c日志工具

    在软件开发过程中,日志记录是必不可少的一个环节,它能够帮助开发者追踪程序运行状态、定位错误、分析性能等。本文将详细介绍一款名为HLog的C语言实现的日志工具,带你了解其设计理念、功能特性以及如何在项目中...

    hlog:用 Go 编写的(现已不存在的)Hakka Logs Web 服务的简单命令行客户端

    hlog是一个超级简单的 Hakka Logs CLI 客户端。 只需设置您的~/.hakkarc文件,然后运行hlog 来发布日志。 配置文件 hlog 在$HOME/.hakkarc .hakkarc 中查找其配置,格式如下: [logs] token = your_hakka_logs_...

    hbase的rowkey设计与hbase的协处理器运用.docx

    HBase 协处理器是一种高级的数据处理机制,可以对数据进行实时处理和分析。协处理器可以应用于数据清洁、数据转换、数据聚合等场景。 HBase 是一个功能强大且高性能的 NoSQL 数据库,具有广泛的应用前景。 RowKey ...

    HLog:Burpsuite HTTP 插件,主要用于内网测试,可定制Content-Type和Response Content

    导入后,可以在BurpSuite的界面中找到HLog的相关选项,设置自定义的Content-Type和Response Content。在进行内网测试时,插件会自动处理经过的HTTP请求,按照设定的方式改变Content-Type和响应内容,帮助测试者快速...

    4- 远程配置_hlog.dll_海康威视相机参数远程配置_远程配置海康_源码.rar.rar

    标题中的“4- 远程配置_hlog.dll_海康威视相机参数远程配置_远程配置海康_源码.rar.rar”暗示了本压缩包文件包含的是关于海康威视相机参数远程配置的相关资料,尤其是涉及到一个名为“hlog.dll”的动态链接库文件。...

    4- 远程配置_hlog.dll_海康威视相机参数远程配置_远程配置海康_源码.zip

    标题 "4- 远程配置_hlog.dll_海康威视相机参数远程配置_远程配置海康_源码.zip" 暗示了这个压缩包包含了一组与海康威视相机远程配置相关的源代码和可能的动态链接库文件(hlog.dll)。海康威视是一家知名的安防监控...

    南开大学20秋学期《大数据开发技术(一)》在线作业-1.pdf

    这篇资料涉及到了大数据开发技术相关的多个知识点,包括数据库类型、云数据库服务、大数据特性、数据库优点、计算模式、可视化工具、物联网技术、MapReduce工作流程以及大数据处理框架和工具等。下面将对这些知识点...

    HBase原理——要弄懂的sequenceId

    有的朋友要问为什么需要关联这两者,那笔者这里提出三个相关问题:1.Memstore中的数据flush到HDFS文件中后HLog对应的数据是不是就可以被删除了?不然HLog会无限增长!那问题来了,Memstore中被flush到HDFS的数据,如何...

    7.2-列存储数据库-3.pptx

    "列存储数据库-HBASE关键算法和应用" HBASE是基于列存储的NoSQL数据库,它...2. 实时数据处理和分析。 3. 海量数据存储和检索。 HBASE是一个基于列存储的NoSQL数据库,它的关键算法和应用场景在这里进行了详细介绍。

    微信3.1.0.72 运行日志HOOK 源码

    微信3.1.0.72 运行日志HOOK源码分析 微信作为中国乃至全球最流行的即时通讯工具之一,其内部的运行机制一直是开发者和技术爱好者...但务必注意,任何与隐私和许可相关的操作都需要谨慎处理,以避免不必要的法律风险。

    大数据分布并行处理试题及答案.docx

    - HDFS设计运行在廉价硬件上,高效存储大量大文件,流式访问数据是其优势之一,但不擅长处理大量小文件。 - HDFS的最小存储单位是Block,而不是元数据节点。 - 创建HDFS文件夹的命令是`$hadoop fs -mkdir ...

    HBase源码分析

    4. **请求处理流程:** - 读取每个请求的头部和内容,然后将其放入优先级队列(PriorityQueue)中。 - 启动一个Responder线程,用于将响应队列中的数据写回到客户端的连接通道中。 - 默认超时时间为15分钟。 - ...

    南开大学20秋学期《大数据开发技术(一)》在线作业-1.docx

    本文主要涉及大数据开发技术的相关知识点,包括数据库类型、云计算服务、大数据处理模式、可视化工具、物联网技术以及大数据框架的工作流程。 1. 数据库类型: - 列族数据库(Column Family Database):如HBase,...

    hbase源码分析

    ### HBase源码分析 #### 一、HBase性能测试要点与分析 ##### 1.1 测试环境 - **硬件配置**: - 客户端:1台 - RegionServer:5台 - Master:1台 - ZooKeeper:3台 - **软件配置**: - CPU:每台服务器配备8...

    大数据分布并行处理试题及答案.pdf

    - Hadoop生态系统包括HBase、Zookeeper、Flume等工具,Storm也是一个大数据处理框架,但不属于Hadoop的直接组成部分。 - 在Linux发行版中,Red Hat、CentOS和Ubuntu是常见的选择,而IOS是一个操作系统,不属于...

    大数据技术分享22.pptx

    大数据技术是现代信息技术领域的重要组成部分,它涉及到数据的收集、存储、处理和分析等多个环节。本分享将主要围绕数据存储、数据分析、实时计算、数据传输和数据采集这五个方面展开。 首先,数据存储是大数据的...

    21春南开大学《大数据开发技术(一)》在线作业参考答案.docx

    - 批量数据处理是指对大规模数据进行一次性处理,常用于数据分析、报表生成等场景。MySQL和Oracle是传统的关系型数据库,适合结构化数据的存储和查询,但它们在处理大规模批量数据时可能效率较低。 - Hbase是一种...

Global site tag (gtag.js) - Google Analytics