HBase put数据时会先将数据写入内存,其内存结构是一个ConcurrentSkipListMap,其Comparator是KVComparator。
keyvalue对象结构
KVComparator的KeyValue对象比较过程
1.使用KeyComparator比较rowkey,结果是rowkey字节序从小到大
2.如果rowkey一样,则按column family比较,结果是column family字节序从小到大
3.如果column family一样,则按family+qualifier比较,结果是qualifier字节序从小到大
4.如果qualifier也一样,则按timestamp排序,结果是timestamp从大到小排序
5.如果timestamp也一样,则按type排序,delete在put之前
6.以上都一样,则按照memstoreTS排序,memstoreTS是原子递增id,不可能一样,结果是memstoreTS从大到小排序,越新的修改会排前面,方便scan
可见KeyValue对象在内存里其实是已经排序好了,flush生成文件的时候,只是简单的scan一下,设置maxVersion(在这里超过maxVersion的put自动失效了),将每个KeyValue对象写入HDFS
Flush生成HFile的过程大抵如下
1.构造Writer,最新版本是HFileWriterV2,第2版
2.循环将KeyValue对象append到writer,这里会按block写入cache,默认64k,每64k,就要重新new一个block,每次finish一个block,就会添加一条索引记录到block index,到block index超过一定限制(默认124K),则写入一个特殊的InlineBlock,代表这是一个索引块,HFile就是data block和inline block交替结构
3.KeyValue对象写完后,再将索引数据以inline block的形式全部写入,最后写入root index,fileInfo等信息。
HFile V2结构
其实现类图如下
主流程
Scan scan = new Scan(); //最多保留的版本,默认为3 scan.setMaxVersions(scanInfo.getMaxVersions()); // Use a store scanner to find which rows to flush. // Note that we need to retain deletes, hence // treat this as a minor compaction. InternalScanner scanner = new StoreScanner(this, scan, Collections .singletonList(new CollectionBackedScanner(set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); try { // TODO: We can fail in the below block before we complete adding this // flush to list of store files. Add cleanup of anything put on filesystem // if we fail. synchronized (flushLock) { status.setStatus("Flushing " + this + ": creating writer"); // A. Write the map out to the disk writer = createWriterInTmp(set.size()); writer.setTimeRangeTracker(snapshotTimeRangeTracker); pathName = writer.getPath(); try { List<KeyValue> kvs = new ArrayList<KeyValue>(); boolean hasMore; do { hasMore = scanner.next(kvs); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { // If we know that this KV is going to be included always, then let us // set its memstoreTS to 0. This will help us save space when writing to disk. if (kv.getMemstoreTS() <= smallestReadPoint) { // let us not change the original KV. It could be in the memstore // changing its memstoreTS could affect other threads/scanners. kv = kv.shallowCopy(); kv.setMemstoreTS(0); } //append写keyvalue writer.append(kv); flushed += this.memstore.heapSizeChange(kv, true); } kvs.clear(); } } while (hasMore); } finally { // Write out the log sequence number that corresponds to this output // hfile. The hfile is current up to and including logCacheFlushId. status.setStatus("Flushing " + this + ": appending metadata"); writer.appendMetadata(logCacheFlushId, false); status.setStatus("Flushing " + this + ": closing flushed file"); //写入元数据 writer.close(); } } } finally { flushedSize.set(flushed); scanner.close(); }
append过程
private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength, final byte[] value, final int voffset, final int vlength) throws IOException { //检查key顺序,返回是否和上一个key重复 boolean dupKey = checkKey(key, koffset, klength); checkValue(value, voffset, vlength); //如果是新key,才检查block是否超过限制,也就是说同样的key保证在同一个block data里 //如果block超过64K限制,则开始将block写入HDFS的outputstream,不flush,同时更新block index信息 if (!dupKey) { checkBlockBoundary(); } //初始化的时候重置状态,准备开始写入data block了 if (!fsBlockWriter.isWriting()) newBlock(); // Write length of key and value and then actual key and value bytes. // Additionally, we may also write down the memstoreTS. { //userDataStream是临时的,如果block满了之后,会将里面的数据flush到HDFS的outputstream //这里将keyvalue对象顺序写入 DataOutputStream out = fsBlockWriter.getUserDataStream(); out.writeInt(klength); totalKeyLength += klength; out.writeInt(vlength); totalValueLength += vlength; out.write(key, koffset, klength); out.write(value, voffset, vlength); if (this.includeMemstoreTS) { WritableUtils.writeVLong(out, memstoreTS); } } // Are we the first key in this block? //block的第一个key,后续会作为data index的entry属性 if (firstKeyInBlock == null) { // Copy the key. firstKeyInBlock = new byte[klength]; System.arraycopy(key, koffset, firstKeyInBlock, 0, klength); } //上一个keyvalue lastKeyBuffer = key; lastKeyOffset = koffset; lastKeyLength = klength; entryCount++; }
初始化开始写入
/** * Starts writing into the block. The previous block's data is discarded. * * @return the stream the user can write their data into * @throws IOException */ public DataOutputStream startWriting(BlockType newBlockType) throws IOException { if (state == State.BLOCK_READY && startOffset != -1) { // We had a previous block that was written to a stream at a specific // offset. Save that offset as the last offset of a block of that type. //保存着同类型block的上一个block的偏移量 prevOffsetByType[blockType.getId()] = startOffset; } //当前block的偏移量 startOffset = -1; //block类型,主要有data,block,index block,meta block blockType = newBlockType; //临时buffer baosInMemory.reset(); //头数据,这里是占位用,后续finish block的时候会写入正式的header数据 baosInMemory.write(DUMMY_HEADER); //开始写 state = State.WRITING; // We will compress it later in finishBlock() //临时stream userDataStream = new DataOutputStream(baosInMemory); return userDataStream; }
写着写着,可能block就满了,检查data block是否已满
private void checkBlockBoundary() throws IOException { //默认64K if (fsBlockWriter.blockSizeWritten() < blockSize) return; //将之前写入的userDataStream里的data block写入HDFS的outputStream,添加索引记录 finishBlock(); //如果索引快满了,则将index block写入HDFS的outputStream writeInlineBlocks(false); //重置状态,进入’WRITING‘状态,等待写入 newBlock(); }
具体finishBlock过程,flush数据到HDFS的outputstream
/** Clean up the current block */ private void finishBlock() throws IOException { //前置状态’WRITING‘,userDataStream有数据写入 if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; long startTimeNs = System.nanoTime(); // Update the first data block offset for scanning. //第一个data block的偏移量 if (firstDataBlockOffset == -1) { firstDataBlockOffset = outputStream.getPos(); } // Update the last data block offset //上一个data block的偏移量 lastDataBlockOffset = outputStream.getPos(); //这里将userDataStream里的数据flush到HDFS的outputStream fsBlockWriter.writeHeaderAndData(outputStream); //在HDFS上写了多少字节,有可能压缩后,加上了checksum数据 int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader(); //更新data block的索引 dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset, onDiskSize); //未压缩的数据字节数 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); HFile.offerWriteLatency(System.nanoTime() - startTimeNs); HFile.offerWriteData(onDiskSize); //更新block cache if (cacheConf.shouldCacheDataOnWrite()) { doCacheOnWrite(lastDataBlockOffset); } }
写入HDFS stream过程
public void writeHeaderAndData(FSDataOutputStream out) throws IOException { long offset = out.getPos(); if (startOffset != -1 && offset != startOffset) { throw new IOException("A " + blockType + " block written to a " + "stream twice, first at offset " + startOffset + ", then at " + offset); } //这个块的开始位置 startOffset = offset; //写 writeHeaderAndData((DataOutputStream) out); } private void writeHeaderAndData(DataOutputStream out) throws IOException { //这个方法比较重要,当一个block需要flush到HDFS stream的时候,需要做数据做一些处理,比如压缩,编码等,设置状态为’READY‘ ensureBlockReady(); //onDiskBytesWithHeader是处理之后的数据,直接写入HDFS stream out.write(onDiskBytesWithHeader); if (compressAlgo == NONE) { if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) { throw new IOException("A " + blockType + " without compression should have checksums " + " stored separately."); } //不压缩的话,还要写入checksum out.write(onDiskChecksum); } }
对buffer数据处理部分,包括压缩和编码等处理
private void finishBlock() throws IOException { //先flush一下 userDataStream.flush(); // This does an array copy, so it is safe to cache this byte array. //拿到buffer中的数据,也就是当前所有写入的数据,未压缩 uncompressedBytesWithHeader = baosInMemory.toByteArray(); //上一个同类型的block偏移量 prevOffset = prevOffsetByType[blockType.getId()]; // We need to set state before we can package the block up for // cache-on-write. In a way, the block is ready, but not yet encoded or // compressed. //READY,准备flush state = State.BLOCK_READY; //encode encodeDataBlockForDisk(); //压缩和checksum doCompressionAndChecksumming(); }
压缩和checksum,压缩之后,checksum数据直接写入onDiskBytesWithHeader,否则写入onDiskChecksum,不管压缩不压缩,都要写入block的header数据
private void doCompressionAndChecksumming() throws IOException { // do the compression if (compressAlgo != NONE) { compressedByteStream.reset(); compressedByteStream.write(DUMMY_HEADER); compressionStream.resetState(); compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE, uncompressedBytesWithHeader.length - HEADER_SIZE); compressionStream.flush(); compressionStream.finish(); // generate checksums onDiskDataSizeWithHeader = compressedByteStream.size(); // data size // reserve space for checksums in the output byte stream ChecksumUtil.reserveSpaceForChecksums(compressedByteStream, onDiskDataSizeWithHeader, bytesPerChecksum); onDiskBytesWithHeader = compressedByteStream.toByteArray(); putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); // generate checksums for header and data. The checksums are // part of onDiskBytesWithHeader itself. ChecksumUtil.generateChecksums( onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader, onDiskBytesWithHeader, onDiskDataSizeWithHeader, checksumType, bytesPerChecksum); // Checksums are already part of onDiskBytesWithHeader onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; //set the header for the uncompressed bytes (for cache-on-write) putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length + onDiskChecksum.length, uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); } else { // If we are not using any compression, then the // checksums are written to its own array onDiskChecksum. onDiskBytesWithHeader = uncompressedBytesWithHeader; onDiskDataSizeWithHeader = onDiskBytesWithHeader.length; //check份数 int numBytes = (int)ChecksumUtil.numBytes( uncompressedBytesWithHeader.length, bytesPerChecksum); //checksum数据 onDiskChecksum = new byte[numBytes]; //set the header for the uncompressed bytes //修改header putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length + onDiskChecksum.length, uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); ChecksumUtil.generateChecksums( uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length, onDiskChecksum, 0, checksumType, bytesPerChecksum); } }
data block处理完之后,更新索引,索引项由block的firstkey,开始的偏移量,dataSize组成。索引主要有2种,leaf-level chunk和root index chunk
void add(byte[] firstKey, long blockOffset, int onDiskDataSize, long curTotalNumSubEntries) { // Record the offset for the secondary index //二级索引,记每个entry的偏移量 secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize); //下一个entry的偏移地址 curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD + firstKey.length; //给root chunk用 curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT + WritableUtils.getVIntSize(firstKey.length) + firstKey.length; //索引信息记录 blockKeys.add(firstKey); blockOffsets.add(blockOffset); onDiskDataSizes.add(onDiskDataSize); //如果是root index chunk添加索引 if (curTotalNumSubEntries != -1) { numSubEntriesAt.add(curTotalNumSubEntries); // Make sure the parallel arrays are in sync. if (numSubEntriesAt.size() != blockKeys.size()) { throw new IllegalStateException("Only have key/value count " + "stats for " + numSubEntriesAt.size() + " block index " + "entries out of " + blockKeys.size()); } } }
回到前头,finishBlock之后,数据都从buffer中flush到了HDFS的stream里。这个时候给index block一个机会,检查下是否已满,满的话,将索引块flush到HDFS
private void writeInlineBlocks(boolean closing) throws IOException { for (InlineBlockWriter ibw : inlineBlockWriters) { //如果InlineBlock需要flush,就flush while (ibw.shouldWriteBlock(closing)) { long offset = outputStream.getPos(); boolean cacheThisBlock = ibw.cacheOnWrite(); //和data block写入一样,先start,再write ibw.writeInlineBlock(fsBlockWriter.startWriting( ibw.getInlineBlockType())); fsBlockWriter.writeHeaderAndData(outputStream); ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(), fsBlockWriter.getUncompressedSizeWithoutHeader()); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); if (cacheThisBlock) { doCacheOnWrite(offset); } } } }
对于BlockIndexWriter来说规则是,maxChunkSize默认128K
curInlineChunk.getNonRootSize() >= maxChunkSize;
看看BlockIndexWriter是如何写的
public void writeInlineBlock(DataOutput out) throws IOException { if (singleLevelOnly) throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED); // Write the inline block index to the output stream in the non-root // index block format. //以leaf chunk写入 curInlineChunk.writeNonRoot(out); // Save the first key of the inline block so that we can add it to the // parent-level index. //保留这个index block的firstkey,给后续多级索引用 firstKey = curInlineChunk.getBlockKey(0); // Start a new inline index block //curInlineChunk初始化,后续索引数据继续写 curInlineChunk.clear(); }
写入leaf chunk
void writeNonRoot(DataOutput out) throws IOException { // The number of entries in the block. //从索引记录数 out.writeInt(blockKeys.size()); if (secondaryIndexOffsetMarks.size() != blockKeys.size()) { throw new IOException("Corrupted block index chunk writer: " + blockKeys.size() + " entries but " + secondaryIndexOffsetMarks.size() + " secondary index items"); } // For each entry, write a "secondary index" of relative offsets to the // entries from the end of the secondary index. This works, because at // read time we read the number of entries and know where the secondary // index ends. //二级索引数据写入,因为每个索引entry是变长的,这个二级索引记录着每个entry的偏移信息,方便查找 for (int currentSecondaryIndex : secondaryIndexOffsetMarks) out.writeInt(currentSecondaryIndex); // We include one other element in the secondary index to calculate the // size of each entry more easily by subtracting secondary index elements. //总大小 out.writeInt(curTotalNonRootEntrySize); //索引数据 for (int i = 0; i < blockKeys.size(); ++i) { out.writeLong(blockOffsets.get(i)); out.writeInt(onDiskDataSizes.get(i)); out.write(blockKeys.get(i)); } }
索引block写入HDFS stream后,更新rootChunk索引,rootChunk是一个对data block index块的索引结构,所有keyvalue都写完后,rootChunk才会flush
到HDFS stream,会进一步分裂多级结构,但是在循环写入的时候只有2级
public void blockWritten(long offset, int onDiskSize, int uncompressedSize) { // Add leaf index block size totalBlockOnDiskSize += onDiskSize; totalBlockUncompressedSize += uncompressedSize; if (singleLevelOnly) throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED); if (firstKey == null) { throw new IllegalStateException("Trying to add second-level index " + "entry with offset=" + offset + " and onDiskSize=" + onDiskSize + "but the first key was not set in writeInlineBlock"); } //写入的时候,只有2级,因为rootChunk 128K足够存memstore的所有索引信息了 if (rootChunk.getNumEntries() == 0) { // We are writing the first leaf block, so increase index level. expectNumLevels(1); numLevels = 2; } // Add another entry to the second-level index. Include the number of // entries in all previous leaf-level chunks for mid-key calculation. //root索引写入entry,totalNumEntries为当前子节点数,也就是leaf level chunk数目 rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries); firstKey = null; }
以上就是循环的大抵过程,data block和data index block交替写入。当所有数据都写完后,开始做close操作,看HFileWriterV2的close操作
public void close() throws IOException { if (outputStream == null) { return; } // Write out the end of the data blocks, then write meta data blocks. // followed by fileinfo, data block index and meta block index. //结尾数据写入stream finishBlock(); //写index block数据 writeInlineBlocks(true); //trailer信息 FixedFileTrailer trailer = new FixedFileTrailer(2, HFileReaderV2.MAX_MINOR_VERSION); // Write out the metadata blocks if any. //meta block写入,V2里meta没用,V1里是用来存bloomfilter的 if (!metaNames.isEmpty()) { for (int i = 0; i < metaNames.size(); ++i) { // store the beginning offset long offset = outputStream.getPos(); // write the metadata content DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META); metaData.get(i).write(dos); fsBlockWriter.writeHeaderAndData(outputStream); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); // Add the new meta block to the meta index. metaBlockIndexWriter.addEntry(metaNames.get(i), offset, fsBlockWriter.getOnDiskSizeWithHeader()); } } // Load-on-open section. // Data block index. // // In version 2, this section of the file starts with the root level data // block index. We call a function that writes intermediate-level blocks // first, then root level, and returns the offset of the root level block // index. //这里将rootChunk分裂成子树,如果rootChunk很大,按照128K分裂成子block并写入HDFS,如果一开始索引树有2层的话,结果索引树会有3层 //也就是多了一个中间层 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream); trailer.setLoadOnOpenOffset(rootIndexOffset); // Meta block index. //写入meta block metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting( BlockType.ROOT_INDEX), "meta"); fsBlockWriter.writeHeaderAndData(outputStream); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); if (this.includeMemstoreTS) { appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); } //写file info // File info writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO)); fsBlockWriter.writeHeaderAndData(outputStream); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); // Load-on-open data supplied by higher levels, e.g. Bloom filters. for (BlockWritable w : additionalLoadOnOpenData){ fsBlockWriter.writeBlock(w, outputStream); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); } //写trailer // Now finish off the trailer. trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels()); trailer.setUncompressedDataIndexSize( dataBlockIndexWriter.getTotalUncompressedSize()); trailer.setFirstDataBlockOffset(firstDataBlockOffset); trailer.setLastDataBlockOffset(lastDataBlockOffset); trailer.setComparatorClass(comparator.getClass()); trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); finishClose(trailer); fsBlockWriter.releaseCompressor(); }
相关推荐
在处理每一行数据时,你可以将其拆分为键(row key)和值(value),然后使用`Table`对象写入HBase: ```java Table table = connection.getTable(TableName.valueOf(tableName)); // 假设每一行格式为"key\tvalue...
3. 上传文件:使用HBase的Java API或者HBase Shell命令,将图片和小视频数据作为KeyValue存储到表中。数据的Key可以是文件的唯一标识,Value则是文件内容。 4. 查询和读取:当需要访问这些文件时,HBase会自动处理...
在HBase中,表结构由一系列的行组成,每行都有唯一的row key,并且每个单元格都存储在列族下。为了描述这样一个表结构,我们可以使用Java中的Map来表示列族,List来表示具体的列名。以下是一个简单的示例代码: ```...
在HBase中,它要求在访问服务前,用户必须先通过Kerberos Key Distribution Center (KDC)进行身份验证。如果验证成功,用户才能继续访问HBase服务。这种方式提供了很高的安全性,但配置相对复杂,不仅需要配置...
1. **创建表**:使用HBase shell,执行`create 'table_name', 'column_family'`命令创建一个包含指定列族的表。 2. **插入数据**:通过`put 'table_name', 'row_key', 'column_family:qualifier', 'value'`将数据...
hbase(main):002:0> put 'ORDER_INFO', 'row_key', 'ORDER_DETAILS:ORDER_ID', 'order_id_value' hbase(main):003:0> put 'ORDER_INFO', 'row_key', 'ORDER_DETAILS:USER_ID', 'user_id_value' # 重复此过程,用...
8. StoreFile:Region内的数据被组织成StoreFile,是HBase内部的HFile格式,提供高效的数据读写。 二、HBase的客户端依赖 1. HBase客户端库:为了与HBase服务器通信,客户端需要导入HBase的客户端库。这个库包含了...
此外,HBase支持数据的批量加载,如使用HFile或SequenceFile格式预先准备数据,然后使用 bulk load 功能快速导入到表中,这对于初始化大量数据非常有用。 最后,监控和优化HBase性能至关重要。这包括查看Region分布...
例如,`HFile#createReader` 方法的参数、`HFileScanner#getKeyValue` 的移除、`HFile.WriterFactory#withComparator` 的入参类型变化、`HFileReaderImpl#getMetaBlock` 返回类型改变以及 `HColumnDescriptor` 的弃...
- HBase 使用 HFile 作为存储文件格式,而 HFile 内部是以 KeyValue 形式存储数据。 - **选项解析:** - A. 拥有复杂结构的字符串:不准确,KeyValue 中的 Value 并不一定具备复杂结构。 - B. 字符串:过于泛化...