The main steps of HBase's batch put operation
1. Puts targeting the same region are grouped into a single batch.
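For context, a minimal client-side sketch of submitting a batch of puts (0.94-era HTable API assumed; the table and column names are placeholders). The client library splits the list by target region, so each region's sub-list arrives at the region server as one batch, which is where step 1 begins:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "my_table"); // table name is an assumption
    try {
      List<Put> puts = new ArrayList<Put>();
      for (int i = 0; i < 100; i++) {
        Put p = new Put(Bytes.toBytes("row-" + i));
        p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
        puts.add(p);
      }
      // the client groups these puts by region/region server; each region's
      // sub-list is processed server-side as one batch (step 1)
      table.put(puts);
    } finally {
      table.close();
    }
  }
}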
2. Sort the operations in each batch by rowkey, in byte order:
Collections.sort(actionsForRegion);
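The ordering used here is the unsigned byte order of the rowkeys. A hypothetical stand-in comparator (not the actual natural ordering of the action objects) that expresses the same ordering with Bytes.compareTo:

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyOrder {
  // Hypothetical equivalent of the ordering applied by Collections.sort above:
  // puts are compared by the unsigned byte order of their rowkeys.
  static final Comparator<Put> BY_ROWKEY = new Comparator<Put>() {
    @Override
    public int compare(Put a, Put b) {
      return Bytes.compareTo(a.getRow(), b.getRow());
    }
  };

  static void sortByRowKey(List<Put> putsForRegion) {
    Collections.sort(putsForRegion, BY_ROWKEY);
  }
}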
3. Check whether the region server's global memstore usage exceeds the configured threshold; if so, wake up the flush thread to flush:
public void reclaimMemStoreMemory() {
  // If above the high water mark (default 0.4 of the heap), block the RPC thread
  // until memory drops back below the expected level
  if (isAboveHighWaterMark()) {
    lock.lock();
    try {
      boolean blocked = false;
      long startTime = 0;
      while (isAboveHighWaterMark() && !server.isStopped()) {
        .....
        // submit a task to the flush thread
        wakeupFlushThread();
        try {
          // we should be able to wait forever, but we've seen a bug where
          // we miss a notify, so put a 5 second bound on it at least.
          flushOccurred.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
      ....
    } finally {
      lock.unlock();
    }
  }
  // If only above the low water mark (default 0.35 of the heap), submit a task
  // to the flush thread without blocking the caller
  else if (isAboveLowWaterMark()) {
    wakeupFlushThread();
  }
}
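The two water marks are simple fractions of the heap. A rough sketch of the arithmetic, assuming the 0.94-era configuration keys hbase.regionserver.global.memstore.upperLimit and hbase.regionserver.global.memstore.lowerLimit (property names and defaults are assumptions, not taken from the code above):

import org.apache.hadoop.conf.Configuration;

public class MemStoreWaterMarks {
  // Sketch: derive the global memstore limits from the heap size.
  static long[] computeLimits(Configuration conf) {
    long maxHeap = Runtime.getRuntime().maxMemory();
    // assumed property names and defaults (0.4 / 0.35 of the heap)
    float upper = conf.getFloat("hbase.regionserver.global.memstore.upperLimit", 0.4f);
    float lower = conf.getFloat("hbase.regionserver.global.memstore.lowerLimit", 0.35f);
    long highWaterMark = (long) (maxHeap * upper); // above this: block updates and flush
    long lowWaterMark  = (long) (maxHeap * lower); // above this: flush without blocking
    return new long[] { highWaterMark, lowWaterMark };
  }
}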
4. Check whether this region's memstore exceeds its blocking limit; if so, request a flush of the region (the flush itself runs asynchronously) and block updates until the size drops:
private void checkResources() throws RegionTooBusyException, InterruptedIOException {
  .....
  boolean blocked = false;
  long startTime = 0;
  // The region's memstore exceeds blockingMemStoreSize, which defaults to
  // twice memstoreFlushSize (memstoreFlushSize defaults to 128 MB)
  while (this.memstoreSize.get() > this.blockingMemStoreSize) {
    // send a request to the flush thread
    requestFlush();
    .....
    blocked = true;
    // wait for a while (up to 10 s)
    synchronized(this) {
      try {
        wait(Math.min(timeToWait, threadWakeFrequency));
      } catch (InterruptedException ie) {
        final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
        if (totalTime > 0) {
          this.updatesBlockedMs.add(totalTime);
        }
        LOG.info("Interrupted while waiting to unblock updates for region "
            + this + " '" + Thread.currentThread().getName() + "'");
        InterruptedIOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }
  }
  ......
}
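The blocking threshold is derived from the flush size. A hedged sketch, assuming the 0.94-era properties hbase.hregion.memstore.flush.size and hbase.hregion.memstore.block.multiplier (names and defaults are assumptions matching the description above):

import org.apache.hadoop.conf.Configuration;

public class BlockingMemStoreSize {
  // Sketch: blockingMemStoreSize = memstoreFlushSize * block multiplier
  // (128 MB flush size and multiplier 2 by default, per the comment above).
  static long blockingMemStoreSize(Configuration conf) {
    long flushSize = conf.getLong("hbase.hregion.memstore.flush.size", 128L * 1024 * 1024);
    long multiplier = conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
    return flushSize * multiplier;
  }
}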
5. Acquire the row lock; if it cannot be obtained, the put is skipped:
private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) throws IOException {
  // check that the row falls within this region's range
  checkRow(row, "row lock");
  startRegionOperation();
  try {
    HashedBytes rowKey = new HashedBytes(row);
    // the row lock is a latch; releasing it counts the latch down, which wakes up waiting threads
    CountDownLatch rowLatch = new CountDownLatch(1);
    // loop until we acquire the row lock (unless !waitForLock)
    while (true) {
      // try to install our latch
      CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
      // no existing latch means we now hold the lock
      if (existingLatch == null) {
        break;
      }
      // the row is already locked: wait for the lock to be released, or time out
      else {
        // row already locked
        if (!waitForLock) {
          return null;
        }
        try {
          if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
            throw new IOException("Timed out on getting lock for row="
                + Bytes.toStringBinary(row));
          }
        } catch (InterruptedException ie) {
          // Empty
        }
      }
    }

    // loop until we generate an unused lock id
    // the lock id is an atomically incremented integer
    while (true) {
      Integer lockId = lockIdGenerator.incrementAndGet();
      HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
      if (existingRowKey == null) {
        return lockId;
      } else {
        // lockId already in use, jump generator to a new spot
        lockIdGenerator.set(rand.nextInt());
      }
    }
  } finally {
    closeRegionOperation();
  }
}

6. Update each KeyValue's timestamp to the current time.
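For illustration of step 6 only: a Put built without an explicit timestamp carries the latest-timestamp placeholder, which the region server overwrites with its current time; a Put with an explicit timestamp keeps it. A hedged client-side sketch (0.94-era Put API assumed):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class TimestampExample {
  static Put buildPut() {
    Put p = new Put(Bytes.toBytes("row-1"));
    // no timestamp supplied: the cell carries the LATEST_TIMESTAMP placeholder
    // until the region server rewrites it to "now" in step 6
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
    // explicit timestamp: kept as-is by the server
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("q2"), 1234567890L, Bytes.toBytes("v2"));
    return p;
  }
}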
7. Obtain an MVCC write transaction id (write number):
public WriteEntry beginMemstoreInsert() {
  synchronized (writeQueue) {
    // the write number is a monotonically increasing long
    long nextWriteNumber = ++memstoreWrite;
    // the entry tracks the state of this transaction (completed or not)
    WriteEntry e = new WriteEntry(nextWriteNumber);
    writeQueue.add(e);
    return e;
  }
}

8. Insert the KeyValues into the memstore's in-memory kv set:
private long internalAdd(final KeyValue toAdd) {
  // how much heap this addition consumed
  long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
  timeRangeTracker.includeTimestamp(toAdd);
  this.size.addAndGet(s);
  return s;
}

9. Append the edit to the HLog (WAL), but do not sync it to HDFS yet; it remains in memory:
private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
    final long now, HTableDescriptor htd, boolean doSync) throws IOException {
  ......
  long txid = 0;
  synchronized (this.updateLock) {
    // sequence number of this log entry
    long seqNum = obtainSeqNum();
    // The 'lastSeqWritten' map holds the sequence number of the oldest
    // write for each region (i.e. the first edit added to the particular
    // memstore). When the cache is flushed, the entry for the
    // region being flushed is removed if the sequence number of the flush
    // is greater than or equal to the value in lastSeqWritten.
    // Use encoded name. Its shorter, guaranteed unique and a subset of
    // actual name.
    byte [] encodedRegionName = info.getEncodedNameAsBytes();
    // sequence number of the region's first unflushed edit; when the memstore is
    // flushed, all entries with a sequence number >= this value end up in the flush file
    this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
    HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
    doWrite(info, logKey, edits, htd);
    this.numEntries.incrementAndGet();
    // transaction id: the ordinal of this log entry
    txid = this.unflushedEntries.incrementAndGet();
    if (htd.isDeferredLogFlush()) {
      lastDeferredTxid = txid;
    }
  }
  // Sync if catalog region, and if not then check if that table supports
  // deferred log flushing
  if (doSync && (info.isMetaRegion() || !htd.isDeferredLogFlush())) {
    // sync txn to file system
    this.sync(txid);
  }
  return txid;
}
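As the code shows, tables with deferred log flush enabled skip the per-put sync and rely on a periodic background sync instead. A hedged sketch of enabling it when defining a table (0.94-era HTableDescriptor API assumed; table and family names are placeholders); this trades a small window of possible data loss for lower write latency:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;

public class DeferredLogFlushExample {
  static HTableDescriptor lowLatencyTable() {
    HTableDescriptor htd = new HTableDescriptor("my_table");
    htd.addFamily(new HColumnDescriptor("cf"));
    // with deferred log flush, append() above does not call sync(txid) per put;
    // a background thread syncs the WAL periodically instead
    htd.setDeferredLogFlush(true);
    return htd;
  }
}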
The HLog writer caches pending writes in its own queue:

// appends new writes to the pendingWrites. It is better to keep it in
// our own queue rather than writing it to the HDFS output stream because
// HDFSOutputStream.writeChunk is not lightweight at all.
synchronized void append(Entry e) throws IOException {
  pendingWrites.add(e);
}

10. Release the row lock:
public void releaseRowLock(final Integer lockId) {
  if (lockId == null) return; // null lock id, do nothing
  // first remove the lock id
  HashedBytes rowKey = lockIds.remove(lockId);
  if (rowKey == null) {
    LOG.warn("Release unknown lockId: " + lockId);
    return;
  }
  // then remove the lock itself
  CountDownLatch rowLatch = lockedRows.remove(rowKey);
  if (rowLatch == null) {
    LOG.error("Releases row not locked, lockId: " + lockId + " row: " + rowKey);
    return;
  }
  // release the lock by counting the latch down
  rowLatch.countDown();
}

11. Sync the HLog to HDFS:
// sync all transactions up to the specified txid
private void syncer(long txid) throws IOException {
  Writer tempWriter;
  synchronized (this.updateLock) {
    if (this.closed) return;
    tempWriter = this.writer; // guaranteed non-null
  }
  // if the transaction that we are interested in is already
  // synced, then return immediately.
  // another RPC thread may already have synced past our txid
  if (txid <= this.syncedTillHere) {
    return;
  }
  try {
    long doneUpto;
    long now = System.currentTimeMillis();
    // First flush all the pending writes to HDFS. Then
    // issue the sync to HDFS. If sync is successful, then update
    // syncedTillHere to indicate that transactions till this
    // number has been successfully synced.
    synchronized (flushLock) {
      if (txid <= this.syncedTillHere) {
        return;
      }
      doneUpto = this.unflushedEntries.get();
      // all log entries currently cached
      List<Entry> pending = logSyncerThread.getPendingWrites();
      try {
        // write the entries to the writer, but do not sync them to HDFS yet
        logSyncerThread.hlogFlush(tempWriter, pending);
      } catch(IOException io) {
        synchronized (this.updateLock) {
          // HBASE-4387, HBASE-5623, retry with updateLock held
          tempWriter = this.writer;
          logSyncerThread.hlogFlush(tempWriter, pending);
        }
      }
    }
    // another thread might have sync'ed avoid double-sync'ing
    if (txid <= this.syncedTillHere) {
      return;
    }
    try {
      // sync to HDFS; on failure, retry once with the updateLock held
      tempWriter.sync();
    } catch(IOException io) {
      synchronized (this.updateLock) {
        // HBASE-4387, HBASE-5623, retry with updateLock held
        tempWriter = this.writer;
        tempWriter.sync();
      }
    }
    // record the highest txid that has been synced so far
    this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
    ......
  } catch (IOException e) {
    LOG.fatal("Could not sync. Requesting close of hlog", e);
    // request a log roll
    requestLogRoll();
    throw e;
  }
}
@Override
public void append(HLog.Entry entry) throws IOException {
  entry.setCompressionContext(compressionContext);
  try {
    // write the entry through the underlying SequenceFile writer
    this.writer.append(entry.getKey(), entry.getEdit());
  } catch (NullPointerException npe) {
    // Concurrent close...
    throw new IOException(npe);
  }
}
12. Advance the MVCC read point (read transaction id):
public void completeMemstoreInsert(WriteEntry e) {
  // advance the read point
  advanceMemstore(e);
  // wait for all earlier writes to complete
  waitForRead(e);
}
boolean advanceMemstore(WriteEntry e) {
  synchronized (writeQueue) {
    // mark this transaction as completed
    e.markCompleted();
    long nextReadValue = -1;
    boolean ranOnce = false;
    // walk the queue for the highest consecutive completed write number; if an earlier
    // write is still in flight, the value found may be smaller than this entry's number
    while (!writeQueue.isEmpty()) {
      ranOnce = true;
      WriteEntry queueFirst = writeQueue.getFirst();
      if (nextReadValue > 0) {
        if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
              + nextReadValue + " next: " + queueFirst.getWriteNumber());
        }
      }
      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber();
        writeQueue.removeFirst();
      } else {
        break;
      }
    }
    if (!ranOnce) {
      throw new RuntimeException("never was a first");
    }
    // advance the read point: all transactions up to this id have completed
    // and are now visible to reads
    if (nextReadValue > 0) {
      synchronized (readWaiters) {
        memstoreRead = nextReadValue;
        readWaiters.notifyAll();
      }
    }
    if (memstoreRead >= e.getWriteNumber()) {
      return true;
    }
    return false;
  }
}
/**
 * Wait for the global readPoint to advance up to
 * the specified transaction number.
 */
public void waitForRead(WriteEntry e) {
  boolean interrupted = false;
  synchronized (readWaiters) {
    // if earlier writes have not finished yet, wait for them
    while (memstoreRead < e.getWriteNumber()) {
      try {
        readWaiters.wait(0);
      } catch (InterruptedException ie) {
        // We were interrupted... finish the loop -- i.e. cleanup --and then
        // on our way out, reset the interrupt flag.
        interrupted = true;
      }
    }
  }
  if (interrupted) Thread.currentThread().interrupt();
}
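Putting the MVCC pieces together, the write path wraps the memstore insert in a begin/complete pair. A hedged sketch of the call pattern, assuming the 0.94 class org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl (whose beginMemstoreInsert/completeMemstoreInsert are the methods shown above; visibility of the nested WriteEntry type is an assumption, and applyToMemstore is a hypothetical placeholder):

import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;

public class MvccWritePattern {
  private final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

  // Sketch of the begin/complete pattern wrapped around steps 8-11.
  void writeWithMvcc() {
    WriteEntry w = mvcc.beginMemstoreInsert(); // step 7: obtain the write number
    try {
      applyToMemstore();                       // steps 8-11: memstore insert, WAL append/sync
    } finally {
      // step 12: mark the write complete and advance the read point so the
      // new cells become visible to readers
      mvcc.completeMemstoreInsert(w);
    }
  }

  // hypothetical placeholder for the memstore insert and WAL work
  private void applyToMemstore() {
  }
}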
13. Check whether the memstore size now exceeds memstoreFlushSize; if so, request an asynchronous flush (sketched below).
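A hedged sketch of that check, not the actual HRegion code; the field and helper names (memstoreSize, memstoreFlushSize, requestFlush) mirror the earlier snippets but are declared locally here for a self-contained example:

import java.util.concurrent.atomic.AtomicLong;

public class FlushSizeCheck {
  // assumed fields, mirroring the snippets above
  private final AtomicLong memstoreSize = new AtomicLong();
  private final long memstoreFlushSize = 128L * 1024 * 1024; // default 128 MB

  // Step 13: after the batch is applied, compare the accumulated memstore size
  // against the flush threshold and, if exceeded, request a flush.
  void checkFlushAfterPut(long addedSize) {
    long newSize = memstoreSize.addAndGet(addedSize);
    if (newSize > memstoreFlushSize) {
      requestFlush(); // only enqueues a request; the flush thread does the actual flush
    }
  }

  // placeholder for the requestFlush() seen in checkResources() above
  private void requestFlush() {
  }
}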
14. Return the results; for any put that failed to acquire its row lock, the corresponding result is null.