Hlog的相关处理流程:
在对hbase中数据进行更新操作put/delete/append/increment操作时,记录操作日志供日志重播的相关处理。
Hlog的写入通过regionserver实例生成时生成的FSLog为的实例。
通过LogRoller线程定期去检查并删除过期的日志文件。
定期检查的时间间隔通过hbase.regionserver.logroll.period进行配置,默认为3600000ms
如果检查时间还没有达到上面的间隔时间时,线程等待的wake(唤醒)时间,hbase.server.thread.wakefrequency,
默认为10*1000ms
FSLog实例内部有一个FSLog.LogSyncer线程实例,并启动此实例。此线程主要用来把log写入到hdfs中
此线程的执行间隔通过hbase.regionserver.optionallogflushinterval配置,默认为1000ms
如果此值配置为小于0的值,表示实时写入hlog日志
在执行更新操作时如果开启有日志记录,调用appendNoSync-->append写入日志,
每一个日志中记录的seqid默认第一次时为当前rs中所有region中最大的seqid加一(FSHLog.logSeqNum),
每append一次后,logSeqNum的值为加一。同时此值也是flsuh时hfile中的fileinfo中记录的最大的seqid值。
此方法把要记录的日志写入到FSLog.LogSyncer.pendingWrites队列中。等待LogSyncer的run方法去sync
注意:如果是对meta的log写入时,每一次写入都会执行sync操作,保证meta数据的不丢失。
publicvoidrun() {
try {
// awaiting with a timeout doesn't always
// throw exceptions on interrupt
while(!this.isInterrupted() && !closeLogSyncer.get()) {
try {
如果没有最新的数据要写入到HDFS,现在未flush的log的值小于或等于上一次提交的值,表示没有新记录
unflushedEntries在每append一次值会加一。
if (unflushedEntries.get() <= syncedTillHere) {
synchronized (closeLogSyncer) {
线程等待
closeLogSyncer.wait(this.optionalFlushInterval);
}
}
// Calling sync since we waited or had unflushed entries.
// Entries appended but not sync'd are taken care of here AKA
// deferred log flush
否则,执行log的写入HDFS操作。使用unflushedEntries的值当txid
sync();
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
requestLogRoll();
Threads.sleep(this.optionalFlushInterval);
}
}
} catch (InterruptedException e) {
LOG.debug(getName() + " interrupted while waiting for sync requests");
} finally {
LOG.info(getName() + " exiting");
}
}
使用unflushedEntries的值当成txid
private void syncer() throws IOException {
syncer(this.unflushedEntries.get()); // sync all pending items
}
private void syncer(long txid) throws IOException {
// if the transaction that we are interested in is already
// synced, then return immediately.
检查日志是否有更新,如果没有,直接返回,减少检查过程
if (txid <= this.syncedTillHere) {
return;
}
WritertempWriter;
synchronized (this.updateLock) {
if (this.closed) return;
// Guaranteed non-null.
// Note that parallel sync can close tempWriter.
// The current method of dealing with this is to catch exceptions.
// See HBASE-4387, HBASE-5623, HBASE-7329.
tempWriter = this.writer;
}
try {
longdoneUpto;
longnow = EnvironmentEdgeManager.currentTimeMillis();
// First flush all the pending writes to HDFS. Then
// issue the sync to HDFS. If sync is successful, then update
// syncedTillHere to indicate that transactions till this
// number has been successfully synced.
IOException ioe = null;
List<Entry> pending = null;
synchronized (flushLock) {
if (txid <= this.syncedTillHere) {
return;
}
得到当前未flush的最新的txid
doneUpto = this.unflushedEntries.get();
得到需要写入到HDFS的日志记录列表
pending = logSyncer.getPendingWrites();
try {
添加到HLOG的写入OUTPUT中
logSyncer.hlogFlush(tempWriter, pending);
postAppend(pending);
} catch(IOException io) {
ioe = io;
LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
}
}
if (ioe != null && pending != null) {
synchronized (this.updateLock) {
synchronized (flushLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
tempWriter = this.writer;
logSyncer.hlogFlush(tempWriter, pending);
postAppend(pending);
}
}
}
// another thread might have sync'ed avoid double-sync'ing
if (txid <= this.syncedTillHere) {
return;
}
try {
if (tempWriter != null) {
tempWriter.sync();
postSync();
}
} catch(IOException ex) {
synchronized (this.updateLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
// TODO: we don't actually need to do it for concurrent close - what is the point
// of syncing new unrelated writer? Keep behavior for now.
tempWriter = this.writer;
if (tempWriter != null) {
执行HDFS的文件写入sync,表示数据持久化
tempWriter.sync();
postSync();
}
}
}
更新当前syncedTillHere的值为unflushedEntries的值,主要用来检查下次是否需要更新记录
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
// TODO: preserving the old behavior for now, but this check is strange. It's not
// protected by any locks here, so for all we know rolling locks might start
// as soon as we enter the "if". Is this best-effort optimization check?
if (!this.logRollRunning) {
checkLowReplication();
try {
如果当前HLOG文件的大小超过了指定的log文件大小,
通过hbase.regionserver.hlog.blocksize配置,默认hdfs的blocksize大小 *
hbase.regionserver.logroll.multiplier的值,默认0.95
curLogSize = tempWriter.getLength();
if (curLogSize > this.logrollsize) {
调用LogRoller.logRollRequested方法
requestLogRoll();
}
} catch (IOException x) {
LOG.debug("Log roll failed and will be retried. (This is not an error)");
}
}
} catch (IOException e) {
LOG.fatal("Could not sync. Requesting roll of hlog", e);
requestLogRoll();
throwe;
}
}
LogRoller.logRollRequested方法流程:
此方法主要用来叫醒LogRoller线程本身。
public void logRollRequested() {
synchronized (rollLog) {
rollLog.set(true);
rollLog.notifyAll();
}
}
LogRoller.run方法:
public void run() {
while (!server.isStopped()) {
longnow = System.currentTimeMillis();
booleanperiodic = false;
如果是Hlog写入的时候显示调用了 logRollRequested方法,下面的if 不进去
if (!rollLog.get()) {
检查上次更新时间是否大过更新间隔时间,如果没有,线程等待
通过hbase.regionserver.logroll.period进行配置,默认为3600000ms
periodic = (now - this.lastrolltime) > this.rollperiod;
如果是定期检查,同时上一次roll log的时间还不到间隔的时间,线程等待
线程等待超时通过hbase.server.thread.wakefrequency,默认为10*1000ms
if (!periodic) {
synchronized (rollLog) {
try {
rollLog.wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// Fall through
}
}
continue;
}
// Time for periodic roll
if (LOG.isDebugEnabled()) {
LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
}
} elseif (LOG.isDebugEnabled()) {
LOG.debug("HLog roll requested");
}
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
更新最后一次roll log的时间为当前时间
this.lastrolltime = now;
// This is array of actual region names.
调用FSHLog.rollWriter方法,检查文件个数是否达到指定的值,
如果是:把所有region已经flush的hlog文件移动到oldWALs目录下。
并取出未移动同时已经关闭writer的hlog file的最小的一个seqid的hlog path,
把所有未flush的region中seqid小于此值的region返回回来。
byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
if (regionsToFlush != null) {
发起flush请求,强制所有的region进行flush操作。
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
} catch (FailedLogCloseException e) {
server.abort("Failed log close in log roller", e);
} catch (java.net.ConnectException e) {
server.abort("Failed log close in log roller", e);
} catch (IOException ex) {
// Abort if we get here. We probably won't recover an IOE. HBASE-1132
server.abort("IOE in log roller",
RemoteExceptionHandler.checkIOException(ex));
} catch (Exception ex) {
LOG.error("Log rolling failed", ex);
server.abort("Log rolling failed", ex);
} finally {
try {
rollLog.set(false);
} finally {
rollLock.unlock();
}
}
}
LOG.info("LogRoller exiting.");
}
FSHLog.rollWriter方法流程分析:
把所有的region中已经flush完成的最小的seqid与old的hlog文件进行比对,
如果old hlog的seqid小于指定的flushid把这些hlog移动到oldWALs目录下去。
如果关闭的hlog文件大于hbase.regionserver.maxlogs配置的的值,默认为32个,
返回所有的已经关闭的hlog中所有region未进行flush的seqid,
publicbyte [][] rollWriter(booleanforce)
throws FailedLogCloseException, IOException {
synchronized (rollWriterLock) {
// Return if nothing to flush.
FSHLog中的numEntries的值每次append时,都会加一
if (!force && this.writer != null && this.numEntries.get() <= 0) {
returnnull;
}
byte [][] regionsToFlush = null;
if (closed) {
LOG.debug("HLog closed. Skipping rolling of writer");
returnnull;
}
try {
....................此处部分代码没有显示
filenum在第一次是值为-1,第一次进行入是在FSHLog实例生成时通过此方法得到Writer实例
longcurrentFilenum = this.filenum;
Path oldPath = null;
如果是第二次指定此方法,也就是LogRoller显示调用时
if (currentFilenum > 0) {
//computeFilename will take care of meta hlog filename
得到上一个hlog文件的路径,也就是准备关闭的文件路径
oldPath = computeFilename(currentFilenum);
}
生成一个新的HLOG文件名称,并创建文件路径
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
while (fs.exists(newPath)) {
this.filenum++;
newPath = computeFilename();
}
....................此处部分代码没有显示
生成一个新的Writer实例,默认实现是ProtobufLogWriter类,通过hbase.regionserver.hlog.writer.impl配置
FSHLog.WriternextWriter = this.createWriterInstance(fs, newPath, conf);
// Can we get at the dfsclientoutputstream?
FSDataOutputStream nextHdfsOut = null;
if (nextWriterinstanceof ProtobufLogWriter) {
得到writer对应的HDFS路径的OutputStream
nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
// perform the costly sync before we get the lock to roll writers.
try {
nextWriter.sync();
} catch (IOException e) {
// optimization failed, no need to abort here.
LOG.warn("pre-sync failed", e);
}
}
Path oldFile = null;
intoldNumEntries = 0;
synchronized (updateLock) {
// Clean up current writer.
oldNumEntries = this.numEntries.get();
关闭上一个HLOG在HDFS的文件流,并把writer设置为null
同时把关闭的hlog文件的的seqid与路径写入到outputfiles的map列表中
oldFile = cleanupCurrentWriter(currentFilenum);
把当前新的hlog文件的writer与outputstream设置给FSHLOG实例
this.writer = nextWriter;
this.hdfs_out = nextHdfsOut;
this.numEntries.set(0);
}
....................此处部分代码没有显示
....................此处部分代码没有显示
// Can we delete any of the old log files?
如果outputfiles列表中有值,检查log的个数是否达到指定的个数,
if (getNumRolledLogFiles() > 0) {
检查log目录下的文件个数是否达到指定的个数,hbase.regionserver.maxlogs配置的的值,默认为32个
如果达到指定的个数,找到所有region中已经flush完成的最小的一个seqid,
并从outputfiles列表中取出比此seqid小的hlog文件,把这些文件移动到oldWALs路径下。
CleanOldLogs();
取出outputfiles列表中的最小的一个seqid,
检查如果未flush的region的seqid是否小于此seqid,如果是返回这些region
regionsToFlush = getRegionsToForceFlush();
}
} finally {
this.logRollRunning = false;
closeBarrier.endOp();
}
returnregionsToFlush;
}
}
相关推荐
4. **请求处理流程:** - 读取每个请求的头部和内容,然后将其放入优先级队列(PriorityQueue)中。 - 启动一个Responder线程,用于将响应队列中的数据写回到客户端的连接通道中。 - 默认超时时间为15分钟。 - ...
HBase的运行机制涉及系统架构、Region服务器的运行原理、Store和HLog的工作原理。这些机制共同保障了HBase的稳定运行。 4.5 应用方案 HBase在性能优化和性能检测方面有一系列的策略和组件。通过这些方法,可以显著...
基于万能逼近原理的自适应模糊控制算法在多自由度AUV运动控制中的应用与抗干扰补偿Simulink仿真研究,自适应模糊控制算法的万能逼近原理与多自由度AUV运动控制的抗干扰补偿技术——基于Simulink的仿真研究,万能逼近原理自适应模糊控制算法的多自由度AUV运动控制抗干扰补偿simulink仿真 ,核心关键词:万能逼近原理; 自适应模糊控制算法; 多自由度AUV运动控制; 抗干扰补偿; Simulink仿真。,基于万能逼近的模糊控制算法多自由度AUV抗干扰补偿Simulink仿真
deepseek最新资讯、配置方法、使用技巧,持续更新中
deepseek最新资讯、配置方法、使用技巧,持续更新中
结合扩展卡尔曼滤波与滑模观测器的策略:优化电角度估计,反电势波形逼近完美正弦波,结合扩展卡尔曼滤波与滑模观测器的反电势波形优化:正弦波形展现近乎完美精度,电角度估算与实际应用差异微小,扩展卡尔曼滤波与滑模观测器的结合,反电势波形近乎完美的正弦波形,观测器估算转子电角度与实际电角度相差0.3弧度左右,转速跟随效果较好。 ,核心关键词:扩展卡尔曼滤波; 滑模观测器; 反电势波形; 转子电角度估算; 转速跟随效果。,卡尔曼滑模观测器:优化正弦波转子角度与转速估算
毕业设计_基于springboot+vue的**学生公寓管理系统**【源码+sql+可运行】【**50217**】.zip 全部代码均可运行,亲测可用,尽我所能,为你服务; 1.代码压缩包内容 代码:springboo后端代码+vue前端页面代码; 脚本:数据库SQL脚本 效果图:运行结果请看资源详情效果图 2.环境准备: - JDK1.8+ - maven3.6+ - nodejs14+ - mysql5.6+ - redis 3.技术栈 - 后台:springboot+mybatisPlus+Shiro - 前台:vue+iview+Vuex+Axios - 开发工具: idea、navicate 4.功能列表 - 系统设置:用户管理、角色管理、资源管理、系统日志 - **业务管理:业务管理:公寓信息、房间信息、入住记录、学生信息** 3.运行步骤: 步骤一:修改数据库连接信息(ip、port修改) 步骤二:找到启动类xxxApplication启动 4.若不会,可私信博主!!!
1、文件内容:xorg-x11-server-source-1.20.4-29.el7_9.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/xorg-x11-server-source-1.20.4-29.el7_9.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊
1、文件内容:yum-plugin-ps-1.1.31-54.el7_8.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/yum-plugin-ps-1.1.31-54.el7_8.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊
基于模型预测控制(MPC)的无人船与无人车编队一致性协同控制研究(附原文献),基于模型预测控制(MPC)的无人船与无人车编队一致性协同控制研究(附原文献),无人船编队 无人车编队 MPC 模型预测控制 多智能体协同控制 一致性 MATLAB 无人车 USV 带原文献 ,无人船编队; 无人车编队; MPC 模型预测控制; 多智能体协同控制; 一致性; MATLAB; USV; 原文献,无人系统协同控制:MPC模型预测控制下的多智能体编队与一致性研究(原文献支撑)
4套中级通信工程师综合真题及答案(2019,2020,2021,2023),适用于需要考中级通信工程师的人群
deepseek最新资讯,配置方法,使用技巧,持续更新中
基于matlab的锁相环PLL相位噪声拟合仿真代码集合:多个版本建模与仿真,高质量的锁相环PLL仿真代码集合:Matlab与Simulink建模研究,[1]锁相环 PLL 几个版本的matlab相位噪声拟合仿真代码,质量杠杠的,都是好东西 [2]锁相环matlab建模稳定性仿真,好几个版本 [3]锁相环2.4G小数分频 simulink建模仿真 ,PLL; Matlab相位噪声拟合仿真; Matlab建模稳定性仿真; 锁相环2.4G小数分频Simulink建模仿真,MATLAB仿真系列:锁相环PLL及分频器建模仿真
exceptionLogs.zip
基于光伏微网的经济性与并网负荷波动率双目标优化调度策略:蓄电池与V2G协同管理策略仿真研究,MATLAB下光储充微网结合电动汽车V2G的多目标协同调度策略研究:经济性与并网负荷波动性的对比分析,MATLAB代码:考虑V2G的光储充一体化微网多目标优化调度策略 关键词:光储充微网 电电汽车V2G 多目标优化 蓄电池优化 调度 参考文档:《光伏微网下考虑V2G补偿蓄电池容量的双目标优化调度策略》,已经投稿EI会议,中文说明文档可联系我咨询 仿真平台:MATLAB 平台 优势:代码注释详实,适合参考学习,相关成果已经采用,程序非常精品,请仔细辨识 主要内容:过建立光伏微网中以经济性和并网负荷波动率为双目标的蓄电池和V2G的协同调度模型。 采用粒子群算法,对电网、微网调度中心和电动汽车用户三方在无、无序、转移和调度V2G电动汽车负荷四种运行模式下的经济和安全影响进行对比。 最后,根据算例分析,求解四种模式下两级负荷曲线及经济收益表。 对比分析得出,引入V2G可以替代部分容量的蓄电池,使光伏微网在负荷峰谷平抑、三方经济和安全等方面进一步优化。 求解采用的是PSO算法(粒子群算法),求解效果极
javascript 动态网页设计期末大作业(自己手写的,高分期末作业),含有代码注释,新手也可看懂,个人手打98分项目,导师非常认可的高分项目,毕业设计、期末大作业和课程设计高分必看,下载下来,简单部署,就可以使用。该项目可以直接作为毕设、期末大作业使用,代码都在里面,系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值,项目都经过严格调试,确保可以运行! javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期末大作业(自己手写的,高分期末作业)javascript 动态网页设计期
混合智能体系统编队控制:分布式优化与15异构混合阶的挑战,异构混合阶智能体系统编队控制的分布式优化策略研究,15异构混合阶多智能体系统编队控制的分布式优化(无参考文献) ,核心关键词:15异构混合阶; 多智能体系统; 编队控制; 分布式优化; 无参考文献。,15混合阶多智能体系统编队分布式优化控制
javascript 动态网页设计期末大作业(自己手写的,很适合期末作业),含有代码注释,新手也可看懂,个人手打98分项目,导师非常认可的高分项目,毕业设计、期末大作业和课程设计高分必看,下载下来,简单部署,就可以使用。该项目可以直接作为毕设、期末大作业使用,代码都在里面,系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值,项目都经过严格调试,确保可以运行! javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascript 动态网页设计期末大作业(自己手写的,很适合期末作业)javascrip
X光安检OPIXray数据集已经转换为VOC格式,可直接转换为为YOLO
DataX--Web:图形化界面简化大数据任务管理_datax-web