写入数据:
public class TestWrit { private static Configuration cfg = new Configuration(); private static final int BLOCK_INDEX_SIZE = 60; private static final int BLOOM_BLOCK_INDEX_SIZE = 10; public TestWrit() { cfg.setInt("hfile.index.block.max.size", BLOCK_INDEX_SIZE); cfg.setInt("io.storefile.bloom.block.size", BLOOM_BLOCK_INDEX_SIZE); //cfg.setBoolean("hbase.regionserver.checksum.verify", true); } public static void main(String[] args) throws IOException { } public void test() throws IOException { //指定写入的路径 Path path = new Path("/data0/hbase/test/myhfile"); FileSystem fs = FileSystem.get(cfg); CacheConfig config = new CacheConfig(cfg); FSDataOutputStream fsdos = fs.create(path); //MyDataOutputStream mdos = new MyDataOutputStream(fsdos); //fsdos = new FSDataOutputStream(mdos); //创建压缩算法,文件块编码,比较器 //HFile默认的比较器是字典排序的,也可以生成一个自定义的比较器,但必须继承KeyComparator Algorithm algorithm = Algorithm.GZ; HFileDataBlockEncoder encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding.DIFF); KeyComparator comparator = new KeyComparator(); ChecksumType check = ChecksumType.CRC32; //创建HFile写实现类,指定写入的数据块大小,多少字节生成一个checksum int blockSize = 100; int checkPerBytes = 16384; HFileWriterV2 v2 = new HFileWriterV2(cfg, config, fs, path, fsdos, blockSize, algorithm, encoder, comparator, check, checkPerBytes, true); /** * HFile默认的比较器是字典排序的,所以插入的key也必须是字典排序,如果不想按照字典排序, * 这里使用红黑树保证key的有序性 String keyPrefix = new String("key"); TreeSet<String> set = new TreeSet<String>(); int len = 100; for(int i=1;i<=len;i++) { set.add(""+i); } for(String key:set) { String generatorKey = keyPrefix+key; v2.append( generator(generatorKey,"c","",System.currentTimeMillis(),VALUES) ); } */ //创建两个布隆过滤器,指定最大的key数为5 int maxKey = 5; BloomFilterWriter bw = BloomFilterFactory.createGeneralBloomAtWrite(cfg, config, BloomType.ROW, maxKey, v2); BloomFilterWriter bw2 = BloomFilterFactory.createDeleteBloomAtWrite(cfg, config, maxKey, v2); //生成KeyValue,插入到HFile中,并保存到布隆过滤器中 KeyValue kv = generator("key111111111111111111111111","value","f",System.currentTimeMillis(),new byte[]{'2'}); addToHFileWirterAndBloomFile(kv,v2,bw,bw2); kv = generator("key222222222222222222222222","value","f",System.currentTimeMillis(),new byte[]{'2'}); addToHFileWirterAndBloomFile(kv,v2,bw,bw2); kv = generator("key333333333333333333333333","value","f",System.currentTimeMillis(),new byte[]{'2'}); addToHFileWirterAndBloomFile(kv,v2,bw,bw2); //生成meta块,布隆过滤器块,删除的布隆过滤器块 //自定义文件信息块的key-value //布隆过滤器加入到HFile.Writer时会判断里面是否有数据,所以要先将key插入到布隆过滤器中,再加入到 //Writerv2中 v2.addGeneralBloomFilter(bw); v2.addDeleteFamilyBloomFilter(bw2); v2.appendMetaBlock("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", new MyWritable()); v2.appendFileInfo(Bytes.toBytes("mykey"), Bytes.toBytes("myvalue")); v2.close(); } /** * 插入一个KeyValue到HFile中,同时将这个key保存到布隆过滤器中 */ public void addToHFileWirterAndBloomFile(KeyValue kv, HFileWriterV2 v2, BloomFilterWriter bw, BloomFilterWriter bw2) throws IOException { v2.append( kv ); byte[] buf = bw.createBloomKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength()); bw.add(buf, 0, buf.length); bw2.add(buf, 0, buf.length); } /** * 生成KeyValue */ public KeyValue generator(String key,String column,String qualifier,long timestamp,byte[] value) { byte[] keyBytes = Bytes.toBytes(key); byte[] familyBytes = Bytes.toBytes(column); byte[] qualifierBytes = Bytes.toBytes(qualifier); Type type = Type.Put; byte[] valueBytes = value; KeyValue kv = new KeyValue(keyBytes, 0, keyBytes.length, familyBytes, 0, familyBytes.length, qualifierBytes, 0, qualifierBytes.length, timestamp, type, valueBytes, 0, valueBytes.length); return kv; } }
写入到磁盘时的内存dump:
读取操作:
public class TestReader { public static String FILE_PATH = "/data0/hbase/test/myhfile"; public Configuration cfg = new Configuration(); private FSReader fsBlockReader; /** * 二级索引长度 */ private static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG; public static void main(String[] args) throws Exception { TestReader t = new TestReader(); t.readBloom(); } /** * 解析布隆过滤器 */ public void readBloom() throws IOException { // 创建读取路径,本地文件系统,两个读取流 Path path = new Path(FILE_PATH); FileSystem fs = FileSystem.getLocal(cfg); CacheConfig config = new CacheConfig(cfg); // 由HFile创建出Reader实现类 Reader reader = HFile.createReader(fs, path, config); // 创建通用布隆过滤器 DataInput bloomMeta = reader.getGeneralBloomFilterMetadata(); BloomFilter bloomFilter = null; if (bloomMeta != null) { bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader); System.out.println(bloomFilter); } //创建删除的布隆过滤器 bloomMeta = reader.getDeleteBloomFilterMetadata(); bloomFilter = null; if (bloomMeta != null) { bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader); System.out.println(bloomFilter); } //meta的读取实现在 HFileReaderV2#getMetaBlock()中 } /** * 使用Scanner读取数据块内容 */ @SuppressWarnings("unchecked") public void readScan() throws IOException, SecurityException, NoSuchMethodException, IllegalArgumentException, IllegalAccessException, InvocationTargetException { // 创建读取路径,本地文件系统,两个读取流 Path path = new Path(FILE_PATH); FileSystem fs = FileSystem.getLocal(cfg); CacheConfig config = new CacheConfig(cfg); FSDataInputStream fsdis = fs.open(path); FSDataInputStream fsdisNoFsChecksum = fsdis; HFileSystem hfs = new HFileSystem(fs); long size = fs.getFileStatus(path).getLen(); // 由读FS读取流,文件长度,就可以读取到尾文件块 FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size); // 根据尾文件块,和其他相关信息,创建HFile.Reader实现 HFileReaderV2 v2 = new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum, size, true, config, DataBlockEncoding.NONE, hfs); System.out.println(v2); // 读取FileInfo中的内容 Method method = v2.getClass().getMethod("loadFileInfo", new Class[] {}); Map<byte[], byte[]> fileInfo = (Map<byte[], byte[]>) method.invoke(v2, new Object[] {}); Iterator<Entry<byte[], byte[]>> iter = fileInfo.entrySet().iterator(); while (iter.hasNext()) { Entry<byte[], byte[]> entry = iter.next(); System.out.println(Bytes.toString(entry.getKey()) + " = " + Bytes.toShort(entry.getValue())); } // 由Reader实现创建扫描器Scanner,负责读取数据块 // 并遍历所有的数据块中的KeyValue HFileScanner scanner = v2.getScanner(false, false); scanner.seekTo(); System.out.println(scanner.getKeyValue()); KeyValue kv = scanner.getKeyValue(); while (scanner.next()) { kv = scanner.getKeyValue(); System.out.println(kv); } v2.close(); } /** * 解析HFile中的数据索引 */ @SuppressWarnings({ "unused", "unchecked" }) public void readIndex() throws Exception { // 创建读取路径,本地文件系统,两个读取流 // 由读FS读取流,文件长度,就可以读取到尾文件块 Path path = new Path(FILE_PATH); FileSystem fs = FileSystem.getLocal(cfg); CacheConfig config = new CacheConfig(cfg); FSDataInputStream fsdis = fs.open(path); FSDataInputStream fsdisNoFsChecksum = fsdis; HFileSystem hfs = new HFileSystem(fs); long size = fs.getFileStatus(path).getLen(); FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size); // 下面创建的一些类,在Reader实现类的构造函数中也可以找到,创建具体文件读取实现FSReader // 由于这个类没有提供对外的创建方式,只能通过反射构造 FSReader Compression.Algorithm compressAlgo = trailer.getCompressionCodec(); Class<?> clazz = Class .forName("org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderV2"); java.lang.reflect.Constructor<FSReader> constructor = (Constructor<FSReader>) clazz .getConstructor(new Class[] { FSDataInputStream.class, FSDataInputStream.class, Compression.Algorithm.class, long.class, int.class, HFileSystem.class, Path.class }); constructor.setAccessible(true); fsBlockReader = constructor.newInstance(fsdis, fsdis, compressAlgo, size, 0, hfs, path); // 创建比较器,比较器是定义在尾文件块中 RawComparator<byte[]> comparator = FixedFileTrailer .createComparator(KeyComparator.class.getName()); // 创建读取数据块的根索引 BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader( comparator, trailer.getNumDataIndexLevels()); // 创建读取元数据快的根索引 BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader( Bytes.BYTES_RAWCOMPARATOR, 1); // 创建 HFileBlock 迭代器 HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange( trailer.getLoadOnOpenDataOffset(), size - trailer.getTrailerSize()); // 读取数据文件根索引 dataBlockIndexReader.readMultiLevelIndexRoot( blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount()); // 读取元数据根索引 metaBlockIndexReader.readRootIndex( blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getMetaIndexCount()); // 读取FileInfo块中的信息 // 由于FileInfo块不是public的,所以定义了一个MyFileInfo,内容跟FileInfo一样 long fileinfoOffset = trailer.getFileInfoOffset(); HFileBlock fileinfoBlock = fsBlockReader.readBlockData(fileinfoOffset, -1, -1, false); MyFileInfo fileinfo = new MyFileInfo(); fileinfo.readFields(fileinfoBlock.getByteStream()); int avgKeyLength = Bytes.toInt(fileinfo.get(MyFileInfo.AVG_KEY_LEN)); int avgValueLength = Bytes .toInt(fileinfo.get(MyFileInfo.AVG_VALUE_LEN)); long entryCount = trailer.getEntryCount(); System.out.println("avg key length=" + avgKeyLength); System.out.println("avg value length=" + avgValueLength); System.out.println("entry count=" + entryCount); int numDataIndexLevels = trailer.getNumDataIndexLevels(); if (numDataIndexLevels > 1) { // 大于一层 iteratorRootIndex(dataBlockIndexReader); } else { // 单根索引 iteratorSingleIndex(dataBlockIndexReader); } fsdis.close(); fsdisNoFsChecksum.close(); } /** * 解析单层索引 */ public void iteratorSingleIndex(BlockIndexReader dataBlockIndex) { for (int i = 0; i < dataBlockIndex.getRootBlockCount(); i++) { byte[] keyCell = dataBlockIndex.getRootBlockKey(i); int blockDataSize = dataBlockIndex.getRootBlockDataSize(i); String rowKey = parseKeyCellRowkey(keyCell); System.out.println("rowkey=" + rowKey + "\tdata size=" + blockDataSize); } } /** * 解析多层索引,首先解析根索引 */ public void iteratorRootIndex(BlockIndexReader dataBlockIndex) throws IOException { for (int i = 0; i < dataBlockIndex.getRootBlockCount(); i++) { long offset = dataBlockIndex.getRootBlockOffset(i); int onDiskSize = dataBlockIndex.getRootBlockDataSize(i); iteratorNonRootIndex(offset, onDiskSize); } } /** * 递归解析每个中间索引 */ public void iteratorNonRootIndex(long offset, int onDiskSize) throws IOException { HFileBlock block = fsBlockReader.readBlockData(offset, onDiskSize, -1, false); if (block.getBlockType().equals(BlockType.LEAF_INDEX)) { parseLeafIndex(block); return; } // 开始计算中间层索引的 每个key位置 ByteBuffer buffer = block.getBufferReadOnly(); buffer = ByteBuffer.wrap(buffer.array(), buffer.arrayOffset() + block.headerSize(), buffer.limit() - block.headerSize()).slice(); int indexCount = buffer.getInt(); // 二级索引全部偏移量,二级索引数据+二级索引总数(int)+索引文件总大小(int) int entriesOffset = Bytes.SIZEOF_INT * (indexCount + 2); for (int i = 0; i < indexCount; i++) { // 二级索引指向的偏移量 // 如当前遍历到第一个key,那么二级索引偏移量就是 第二个int(第一个是索引总数) int indexKeyOffset = buffer.getInt(Bytes.SIZEOF_INT * (i + 1)); long blockOffsetIndex = buffer.getLong(indexKeyOffset + entriesOffset); int blockSizeIndex = buffer.getInt(indexKeyOffset + entriesOffset + Bytes.SIZEOF_LONG); iteratorNonRootIndex(blockOffsetIndex, blockSizeIndex); } } /** * 解析叶索引 */ public void parseLeafIndex(HFileBlock block) { // 开始计算中间层索引的 每个key位置 ByteBuffer buffer = block.getBufferReadOnly(); buffer = ByteBuffer.wrap(buffer.array(), buffer.arrayOffset() + block.headerSize(), buffer.limit() - block.headerSize()).slice(); int indexCount = buffer.getInt(); // 二级索引全部偏移量,二级索引数据+二级索引总数(int)+索引文件总大小(int) int entriesOffset = Bytes.SIZEOF_INT * (indexCount + 2); for (int i = 0; i < indexCount; i++) { // 二级索引指向的偏移量 // 如当前遍历到第一个key,那么二级索引偏移量就是 第二个int(第一个是索引总数) int indexKeyOffset = buffer.getInt(Bytes.SIZEOF_INT * (i + 1)); // 全部二级索引长度+key偏移位置+ 块索引offset(long)+块大小(int) // 可以计算出真实的key的偏移位置 int KeyOffset = entriesOffset + indexKeyOffset + SECONDARY_INDEX_ENTRY_OVERHEAD; // long blockOffsetIndex = // buffer.getLong(indexKeyOffset+entriesOffset); int blockSizeIndex = buffer.getInt(indexKeyOffset + entriesOffset + Bytes.SIZEOF_LONG); // 计算key的长度 int length = buffer.getInt(Bytes.SIZEOF_INT * (i + 2)) - indexKeyOffset - SECONDARY_INDEX_ENTRY_OVERHEAD; // 一个key // cell包含了key长度(2字节),key,family长度(1字节),family,qualifier,timestampe(8字节),keytype(1字节) // 这里只需要key就可以了 byte[] keyCell = new byte[length]; System.arraycopy(buffer.array(), buffer.arrayOffset() + KeyOffset, keyCell, 0, length); String rowKey = parseKeyCellRowkey(keyCell); System.out.println("rowkey=" + rowKey + "\t blockSizeIndex=" + blockSizeIndex); } } /** * 通过keycell,解析出rowkey */ public static String parseKeyCellRowkey(byte[] cell) { if (cell == null || cell.length < 3) { throw new IllegalArgumentException("cell length is illegal"); } int high = (cell[0] >> 8) & 0xFF; int low = cell[1] & 0xFF; int keySize = high + low; byte[] key = new byte[keySize]; System.arraycopy(cell, 2, key, 0, key.length); return Bytes.toString(key); } }
工具类:
/** * 自定义这样的类原因是HBase的实现是非public类 */ public class MyFileInfo extends HbaseMapWritable<byte[], byte[]> { /** * hfile保留的key,以"hfile."开头 */ public static final String RESERVED_PREFIX = "hfile."; /** * hfile前缀的二进制表示 */ public static final byte[] RESERVED_PREFIX_BYTES = Bytes .toBytes(RESERVED_PREFIX); /** * last key */ public static final byte[] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY"); /** * 平均key长度 */ public static final byte[] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN"); /** * 平均value长度 */ public static final byte[] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN"); /** * 比较器 */ public static final byte[] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR"); /** * 增加一个key/value 对到file info中,可选的可以检查key的前缀 */ public MyFileInfo append(final byte[] k, final byte[] v, final boolean checkPrefix) throws IOException { if (k == null || v == null) { throw new NullPointerException("Key nor value may be null"); } if (checkPrefix && isReservedFileInfoKey(k)) { throw new IOException("Keys with a " + SaeFileInfo.RESERVED_PREFIX + " are reserved"); } put(k, v); return this; } /** * 检查当前的key是否以保留的前缀开头的 */ public static boolean isReservedFileInfoKey(byte[] key) { return Bytes.startsWith(key, SaeFileInfo.RESERVED_PREFIX_BYTES); } } /** * 自定义序列化写入实现类 * */ public class MyWritable implements Writable { @Override public void readFields(DataInput input) throws IOException { input.readInt(); } @Override public void write(DataOutput out) throws IOException { out.write(123456); } }
相关推荐
李白高力士脱靴李白贺知章告别课本剧.pptx
1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于人工智能、计算机科学与技术等相关专业,更为适合; 4、下载使用后,可先查看README.md文件(如有),本项目仅用作交流学习参考,请切勿用于商业用途。
C语言项目之超级万年历系统源码,可以做课程设计参考 文章参考:https://www.qqmu.com/4373.html
Jupyter-Notebook
51单片机加减乘除计算器系统设计(proteus8.17,keil5),复制粘贴就可以运行
《中国房地产统计年鉴》面板数据资源-精心整理.zip
Jupyter-Notebook
Jupyter-Notebook
毕业论文答辩ppt,答辩ppt模板,共18套
Jupyter-Notebook
《中国城市统计年鉴》面板数据集(2004-2020年,最新).zip
Python基础 本节课知识点: • set的定义 • Set的解析 • set的操作 • set的函数
1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于人工智能、计算机科学与技术等相关专业,更为适合; 4、下载使用后,可先查看README.md文件(如有),本项目仅用作交流学习参考,请切勿用于商业用途。
兵制与官制研究资料最新版.zip
Jupyter-Notebook
七普人口数据+微观数据+可视化+GIS矢量资源-精心整理.zip
Support package for Hovl Studio assets.unitypackage
土壤数据库最新集.zip
Jupyter-Notebook
1991-2020年中国能源统计年鉴-能源消费量(分省)面板数据-已更至最新.zip