
A Brief Analysis of RCFile

 
       RCFile is a data format devised at Facebook and used in systems such as Hive and Pig. It combines the strengths of row storage and column storage by splitting data both by row and by column: the row split groups a number of consecutive rows, and within each row split the data is further split by column. That is, one row split represents multiple rows of data, and a single stored "row" inside a row split holds one column's values. Note that the "key" and "value" in RCFile generally do not correspond to MapReduce's key and value; they refer to the group of keys and the group of values belonging to one row split.

        The hierarchy of the file format in RCFile's javadoc is not laid out very clearly; it should look as follows:
    - Header
    - Record
        - Key part
            - Record length in bytes
            - Key length in bytes
            - Number_of_rows_in_this_record (vint)
            - Column_1_ondisk_length (vint)
            - Column_1_row_1_value_plain_length
            - Column_1_row_2_value_plain_length
            - ...
            - Column_2_ondisk_length (vint)
            - Column_2_row_1_value_plain_length
            - Column_2_row_2_value_plain_length
            - ...
        - Value part
            - Compressed or plain data of [column_1_row_1_value, column_1_row_2_value, ...]
            - Compressed or plain data of [column_2_row_1_value, column_2_row_2_value, ...]
        - Key part
            ...
        - Value part
            ...
        ...

            With this diagram the structure is quite clear: an RCFile consists of two parts, the Header and the Records.

    Header:

    • version - 3 bytes of magic header RCF, followed by 1 byte of actual version number (e.g. RCF1)
    • compression - A boolean which specifies if compression is turned on for keys/values in this file.
    • compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
    • metadata - Metadata for this file.
    • sync - A sync marker to denote end of the header.

            version: 4 bytes; the first 3 are 'R', 'C', 'F' (earlier versions reused SequenceFile's 'S', 'E', 'Q' magic directly), and the 4th byte is the version number. This field identifies the file as an RCFile.
            compression: a boolean indicating whether the data is compressed.
            compression codec: if compression is true, this field identifies the codec; if compression is false, the field is absent.
            metadata: the file's metadata, held in memory as a TreeMap.
            sync: marks the end of the Header; generated from a unique UID + '@' + the current time in milliseconds.
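
            To make the field order concrete, the following is a minimal header-walking sketch. It only mirrors the layout described above; the exact on-disk encodings of the codec class name, the metadata, and the sync marker are assumptions here, not lifted from the RCFile source.

    import java.io.DataInputStream;
    import java.io.IOException;

    // a sketch of reading the Header fields in the order described above
    static void readHeaderSketch(DataInputStream in) throws IOException {
        byte[] magic = new byte[3];
        in.readFully(magic);                    // must be 'R', 'C', 'F'
        byte version = in.readByte();           // actual version number
        boolean compressed = in.readBoolean();  // compression enabled for keys/values?
        if (compressed) {
            String codecClass = in.readUTF();   // codec class name (assumed encoding); absent otherwise
        }
        // metadata: (key, value) Text pairs, kept in memory as a TreeMap
        // sync: a marker derived from a unique UID + '@' + current time in millis,
        // marking the end of the Header
    }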

    Record:

            The Record sections hold the actual data and contain two kinds of parts: key parts and value parts. Keys and values appear in pairs, and the Record portion of the file consists of multiple key part / value part pairs.

    Key part:

            Generally speaking, the keys and values produced by MapReduce both carry real payload (generally, at least), but RCFile ignores the user-generated key and stores only values. The key part therefore holds bookkeeping information about the values rather than a user key. Its fields mean the following:

    • Record length in bytes: total bytes occupied by the key part and the value part
    • Key length in bytes: total bytes occupied by the key part
    • Number_of_rows_in_this_record (vint): the number of rows in this row split
    • Column_1_ondisk_length (vint): total bytes occupied by column 1
    • Column_1_row_1_value_plain_length: bytes occupied by the element at column 1, row 1 (a single cell identified by its two coordinates)
    • Column_1_row_2_value_plain_length: bytes occupied by column 1, row 2
    • ...
    • Column_2_ondisk_length (vint): total bytes occupied by column 2
    • Column_2_row_1_value_plain_length: bytes occupied by the element at column 2, row 1
    • Column_2_row_2_value_plain_length: bytes occupied by column 2, row 2

            As these fields show, the key part stores the lengths from which offsets into the value part can be computed.
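
            A quick sketch with made-up lengths shows how the per-row offsets within a column's value bytes fall straight out of these length fields:

    // hypothetical Column_1_row_i_value_plain_length values for three rows
    int[] rowPlainLengths = {4, 7, 3};
    int[] rowOffsets = new int[rowPlainLengths.length];
    int offset = 0;
    for (int i = 0; i < rowPlainLengths.length; i++) {
        rowOffsets[i] = offset;          // row i starts at the running sum of earlier lengths
        offset += rowPlainLengths[i];
    }
    // rowOffsets == {0, 4, 11}; offset (14) equals the column's total plain length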

    Value part:

            As the name suggests, the value part holds the actual data.

    Writer:

            Writer is a static inner class of RCFile. It has several constructors; the complete one is:

    public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
            short replication, long blockSize, Progressable progress,
            Metadata metadata, CompressionCodec codec) throws IOException {

        RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);

        columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0); // total number of columns in the table

        // metadata can also be built with createMetadata(Text... values); arguments must come in pairs
        if (metadata == null) {
            metadata = new Metadata();
        }
        metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
                + columnNumber));

        // columnsBufferSize is the upper bound on the size of one row split
        columnsBufferSize = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
                4 * 1024 * 1024);

        // uncompressed length of each column's buffered data
        columnValuePlainLength = new int[columnNumber];

        // buffers for the value part: one ColumnBuffer per column
        columnBuffers = new ColumnBuffer[columnNumber];
        for (int i = 0; i < columnNumber; i++) {
            columnBuffers[i] = new ColumnBuffer();
        }

        // initialize the output stream, codec and metadata
        init(conf, fs.create(name, true, bufferSize, replication,
                blockSize, progress), codec, metadata);

        // the following three calls build and write the Header
        initializeFileHeader();
        writeFileHeader();
        finalizeFileHeader();

        // buffer for the key part; unlike ColumnBuffer, a single KeyBuffer covers the whole key part
        key = new KeyBuffer(columnNumber);

        // total uncompressed size of each column
        plainTotalColumnLength = new int[columnNumber];
        // total compressed size of each column
        comprTotalColumnLength = new int[columnNumber];
    }
    

            As shown above, the constructor mainly sets up the various parameters and writes the Header.
            The Writer itself is fairly simple; its main method is append.

    public void append(Writable val) throws IOException {

        // RCFile requires the appended data to be a BytesRefArrayWritable instance
        if (!(val instanceof BytesRefArrayWritable)) {
            throw new UnsupportedOperationException(
                    "Currently the writer can only accept BytesRefArrayWritable");
        }
        BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
        int size = columns.size();
        for (int i = 0; i < size; i++) {
            BytesRefWritable cu = columns.get(i);
            int plainLen = cu.getLength();
            columnBufferSize += plainLen;
            columnValuePlainLength[i] += plainLen;
            columnBuffers[i].append(cu);
        }
        // if the row has fewer columns than configured, pad the missing columns with zero-length values
        if (size < columnNumber) {
            for (int i = columns.size(); i < columnNumber; i++) {
                columnBuffers[i].append(BytesRefWritable.ZeroBytesRefWritable);
            }
        }

        bufferedRecords++;
        // flush once the buffered row split exceeds the size threshold,
        // or the number of buffered rows reaches RECORD_INTERVAL
        if ((columnBufferSize > columnsBufferSize)
                || (bufferedRecords >= RECORD_INTERVAL)) {
            flushRecords(); // write the buffered row split out
        }
    }

        BytesRefArrayWritable is essentially an array of BytesRefWritable, and a BytesRefWritable is essentially a reference into a byte array. Each append() call passes one row, with one BytesRefWritable per column; the buffered rows accumulate into a row split.
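
        A minimal write-path sketch follows; the file path and cell values are made up, while the class names are the standard Hive ones this article discusses:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.RCFile;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

    public class RCFileWriteDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, 3); // declare the column count up front
            FileSystem fs = FileSystem.get(conf);
            RCFile.Writer writer = new RCFile.Writer(fs, conf, new Path("/tmp/demo.rc"));

            BytesRefArrayWritable row = new BytesRefArrayWritable(3); // one row: one value per column
            row.set(0, new BytesRefWritable("1".getBytes("UTF-8")));
            row.set(1, new BytesRefWritable("paddy".getBytes("UTF-8")));
            row.set(2, new BytesRefWritable("beijing".getBytes("UTF-8")));
            writer.append(row); // buffered in ColumnBuffers; flushRecords() runs once a threshold is crossed
            writer.close();
        }
    }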

        The actual writing of data to the file is done by flushRecords.

    	private void flushRecords() throws IOException {
    
    		key.numberRows = bufferedRecords;
    		Compressor compressor = null;
    		NonSyncDataOutputBuffer valueBuffer = null;
    		CompressionOutputStream deflateFilter = null;
    		DataOutputStream deflateOut = null;
    		boolean isCompressed = isCompressed();
    		int valueLength = 0;
    		if (isCompressed) {
    			ReflectionUtils.setConf(codec, this.conf);
    			compressor = CodecPool.getCompressor(codec);
    			valueBuffer = new NonSyncDataOutputBuffer();
    			deflateFilter = codec.createOutputStream(valueBuffer, compressor);
    			deflateOut = new DataOutputStream(deflateFilter);
    		}
    
    		for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
    			ColumnBuffer currentBuf = columnBuffers[columnIndex];
    			currentBuf.flushGroup();
    
    			NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
    			int colLen;
    			int plainLen = columnValuePlainLength[columnIndex];
    
    			if (isCompressed) {
    				if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
    					((SchemaAwareCompressionOutputStream)deflateFilter).
    					setColumnIndex(columnIndex);
    				}
    				deflateFilter.resetState();
    				deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
    				deflateOut.flush();
    				deflateFilter.finish();
    				// find how much compressed data was added for this column
    				colLen = valueBuffer.getLength() - valueLength;
    			} else {
    				colLen = columnValuePlainLength[columnIndex];
    			}
    			valueLength += colLen;
    			key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
    					columnIndex);
    			plainTotalColumnLength[columnIndex] += plainLen;
    			comprTotalColumnLength[columnIndex] += colLen;
    			columnValuePlainLength[columnIndex] = 0;
    		}
    
    		int keyLength = key.getSize();
    		if (keyLength < 0) {
    			throw new IOException("negative length keys not allowed: " + key);
    		}
    		if (compressor != null) {
    			CodecPool.returnCompressor(compressor);
    		}
    
    		// Write the key out: this is where data actually reaches the file
    		writeKey(key, keyLength + valueLength, keyLength);
    		// write the value out
    		if (isCompressed) {
    			out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
    		} else {
    			for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) {
    				NonSyncDataOutputBuffer buf =
    						columnBuffers[columnIndex].columnValBuffer;
    				out.write(buf.getData(), 0, buf.getLength());
    			}
    		}
    		// clear the columnBuffers
    		clearColumnBuffers();
    		bufferedRecords = 0;
    		columnBufferSize = 0;
    	}
    


            Note that although the first part of the method performs flush operations, the data is written into a ByteArrayOutputStream and thus still sits in memory; the real write to the file happens only at the end. This completes the write path.

    Reader:

        The Reader involves more operations than the Writer because there are separate methods for reading by row and by column. Row reads return one row at a time, while column reads buffer an entire row split. First, the constructor:

    public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
            long start, long length) throws IOException {

        tolerateCorruptions = conf.getBoolean(TOLERATE_CORRUPTIONS_CONF_STR, false);
        conf.setInt("io.file.buffer.size", bufferSize);
        this.file = file;
        in = openFile(fs, file, bufferSize, length);
        this.conf = conf;
        end = start + length;
        boolean succeed = false;
        try {
            if (start > 0) {
                seek(0);
                init(); // reads the header and related information
                seek(start);
            } else {
                init();
            }
            succeed = true;
        } finally {
            if (!succeed) {
                if (in != null) {
                    try {
                        in.close();
                    } catch(IOException e) {
                        if (LOG != null && LOG.isDebugEnabled()) {
                            LOG.debug("Exception in closing " + in, e);
                        }
                    }
                }
            }
        }

        columnNumber = Integer.parseInt(metadata.get(
                new Text(COLUMN_NUMBER_METADATA_STR)).toString()); // total column count

        // indexes of the columns to read; before reading, the caller should record the
        // wanted columns in the configuration via ColumnProjectionUtils.setReadColumnIDs
        java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils
                .getReadColumnIDs(conf);
        // marks whether each column is skipped; the name is easy to misread
        boolean[] skippedColIDs = new boolean[columnNumber];

        // first mark every column skipped, then clear the flag for the selected ones
        if (notSkipIDs.size() > 0) {
            for (int i = 0; i < skippedColIDs.length; i++) {
                skippedColIDs[i] = true;
            }
            for (int read : notSkipIDs) {
                if (read < columnNumber) {
                    skippedColIDs[read] = false;
                }
            }
        } else {
            // TODO: if no column name is specified e.g, in select count(1) from tt;
            // skip all columns, this should be distinguished from the case:
            // select * from tt;
            for (int i = 0; i < skippedColIDs.length; i++) {
                skippedColIDs[i] = false;
            }
        }

        // set loadColumnNum, then subtract the skipped columns
        loadColumnNum = columnNumber;
        if (skippedColIDs.length > 0) {
            for (boolean skippedColID : skippedColIDs) {
                if (skippedColID) {
                    loadColumnNum -= 1;
                }
            }
        }

        // revPrjColIDs maps original column IDs to indexes among the selected columns
        revPrjColIDs = new int[columnNumber];
        // get list of selected column IDs
        selectedColumns = new SelectedColumn[loadColumnNum];
        colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];

        // unselected columns map to -1; selected columns are renumbered consecutively,
        // e.g. with columns a b c d e numbered 1 2 3 4 5, selecting b and d
        // renumbers them from 2 and 4 to 1 and 2
        for (int i = 0, j = 0; i < columnNumber; ++i) {
            if (!skippedColIDs[i]) {
                SelectedColumn col = new SelectedColumn();
                col.colIndex = i;
                col.runLength = 0;
                col.prvLength = -1;
                col.rowReadIndex = 0;
                selectedColumns[j] = col;
                colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
                revPrjColIDs[i] = j;
                j++;
            } else {
                revPrjColIDs[i] = -1;
            }
        }

        currentKey = createKeyBuffer();
        boolean lazyDecompress = !tolerateCorruptions;
        currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec, lazyDecompress);
    }
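
            As the comment above notes, the columns to read must be recorded in the configuration before the Reader is constructed. A minimal sketch follows; the column IDs and path are made up, and it assumes the ColumnProjectionUtils variant that takes an ArrayList together with the three-argument Reader constructor:

    import java.util.ArrayList;
    import java.util.Arrays;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

    // select only columns 1 and 3; every other column will be marked skipped
    ArrayList<Integer> wanted = new ArrayList<Integer>(Arrays.asList(1, 3));
    ColumnProjectionUtils.setReadColumnIDs(conf, wanted);
    RCFile.Reader reader = new RCFile.Reader(fs, new Path("/tmp/demo.rc"), conf);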
    


            The Reader relies on a data structure called ValueBuffer, the counterpart of the Writer's ColumnBuffer. A ValueBuffer keeps a reference to a KeyBuffer so that it can obtain the offsets, lengths, and other information needed to read the data.

            The two operations for reading row data are next(LongWritable) and getCurrentRow(BytesRefArrayWritable).

            next() effectively returns two pieces of information: 1) whether there are more rows to read, and 2) how many rows have been "read" through next() so far (a count that is not exact).

    public synchronized boolean next(LongWritable readRows) throws IOException {

        // any rows left in the buffer? (returns readRowsIndexInBuffer < recordsNumInValBuffer)
        if (hasRecordsInBuffer()) {
            readRows.set(passedRowsNum);
            readRowsIndexInBuffer++; // index of the current row within the buffer
            passedRowsNum++;         // number of rows "read" so far
            rowFetched = false;
            return true;
        } else {
            keyInit = false;
        }

        // the current row split is exhausted: read the next row split's key part into memory
        int ret = -1;
        if (tolerateCorruptions) {
            ret = nextKeyValueTolerateCorruptions();
        } else {
            try {
                ret = nextKeyBuffer();
            } catch (EOFException eof) {
                eof.printStackTrace();
            }
        }
        return (ret > 0) && next(readRows);
    }
    


            As the code shows, only one row split's information is held in memory at a time, or more precisely, one key part's information, since the key part carries the value part's offsets and lengths. The actual data is loaded into memory only when a row is requested via getCurrentRow().

    public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {

        if (!keyInit || rowFetched) {
            return;
        }

        if (tolerateCorruptions) {
            if (!currentValue.inited) {
                currentValueBuffer(); // reads the value part's data into memory
            }
            ret.resetValid(columnNumber);
        } else {
            if (!currentValue.inited) { // first access: initialize
                currentValueBuffer();
                // do this only when not initialized, but we may need to find a way to
                // tell the caller how to initialize the valid size
                ret.resetValid(columnNumber);
            }
        }

        // we do not use BytesWritable here to avoid the byte-copy from
        // DataOutputStream to BytesWritable
        if (currentValue.numCompressed > 0) {
            for (int j = 0; j < selectedColumns.length; ++j) {
                SelectedColumn col = selectedColumns[j];
                int i = col.colIndex;

                BytesRefWritable ref = ret.unCheckedGet(i);

                colAdvanceRow(j, col);

                if (currentValue.decompressedFlag[j]) {
                    ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
                            col.rowReadIndex, col.prvLength);
                } else {
                    ref.set(currentValue.lazyDecompressCallbackObjs[j],
                            col.rowReadIndex, col.prvLength);
                }
                col.rowReadIndex += col.prvLength;
            }
        } else {
            // This version of the loop eliminates a condition check and branch
            // and is measurably faster (20% or so)
            for (int j = 0; j < selectedColumns.length; ++j) {
                SelectedColumn col = selectedColumns[j];
                int i = col.colIndex;

                BytesRefWritable ref = ret.unCheckedGet(i);

                colAdvanceRow(j, col);
                ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
                        col.rowReadIndex, col.prvLength); // point ref at the requested row's slice
                col.rowReadIndex += col.prvLength;
            }
        }
        rowFetched = true;
    }
    


            So the first read of a row split initializes the ValueBuffer and loads that row split's data for all user-selected columns into memory; subsequent reads of the same row split are served straight from memory.

            As mentioned earlier, a BytesRefArrayWritable is effectively a two-dimensional byte structure; on the read path each BytesRefWritable inside it references the bytes of one column.

    ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
            col.rowReadIndex, col.prvLength); // point ref at the requested row's slice

    public void set(byte[] newData, int offset, int len) {
        bytes = newData;
        start = offset;
        length = len;
        lazyDecompressObj = null;
    }


            ref still holds a reference to the buffer containing all rows of a column; the offset and length select the element of a single row. BytesRefArrayWritable provides no interface for transposing column arrays into row arrays, so users who want row-shaped data must assemble it themselves.
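
            Putting next() and getCurrentRow() together, a typical row-reading loop looks roughly like the following sketch (it continues the hypothetical reader built above and assembles each row from its per-column cells):

    import org.apache.hadoop.io.LongWritable;

    LongWritable rowID = new LongWritable();
    BytesRefArrayWritable cols = new BytesRefArrayWritable();
    while (reader.next(rowID)) {                 // advance one row; loads the next key part when needed
        reader.getCurrentRow(cols);              // loads the value part on first touch of a row split
        for (int i = 0; i < cols.size(); i++) {
            BytesRefWritable cell = cols.get(i); // column i of the current row
            String v = new String(cell.getData(), cell.getStart(), cell.getLength(), "UTF-8");
            // assemble the row from the per-column cells yourself
        }
    }
    reader.close();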

            next() and getCurrentRow() handle row data; getColumn() and nextColumnsBatch() are the methods for retrieving column data.

    	public BytesRefArrayWritable getColumn(int columnID, BytesRefArrayWritable rest) throws IOException {
    
    		int selColIdx = revPrjColIDs[columnID];
    		if (selColIdx == -1) {
    			return null;
    		}
    
    		if (rest == null) {
    			rest = new BytesRefArrayWritable();
    		}
    
    		rest.resetValid(recordsNumInValBuffer);
    
    		if (!currentValue.inited) {
    			currentValueBuffer();
    		}
    
    		int columnNextRowStart = 0;
    		fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
    				.getData(), currentKey.allCellValLenBuffer[columnID].getLength());
    		SelectedColumn selCol = selectedColumns[selColIdx];
    		byte[] uncompData = null;
    		ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
    		boolean decompressed = currentValue.decompressedFlag[selColIdx];
    		if (decompressed) {
    			uncompData =
    					currentValue.loadedColumnsValueBuffer[selColIdx].getData();
    		} else {
    			decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
    		}
    		for (int i = 0; i < recordsNumInValBuffer; i++) {
    			colAdvanceRow(selColIdx, selCol);
    			int length = selCol.prvLength;
    
    			BytesRefWritable currentCell = rest.get(i);
    
    			if (decompressed) {
    				currentCell.set(uncompData, columnNextRowStart, length);
    			} else {
    				currentCell.set(decompCallBack, columnNextRowStart, length);
    			}
    			columnNextRowStart = columnNextRowStart + length;
    		}
    		return rest;
    	}
    


            getColumn() fetches the data of a specified column. The columnID parameter is the column's original index; rest receives the fetched column data and is also what the method returns, so when rest is non-null the return value is the same object as rest (and null is returned if the column was not selected).

            nextColumnsBatch() is straightforward: it reads the next row split's key part.

    	public synchronized boolean nextColumnsBatch() throws IOException {
    		passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
    		return nextKeyBuffer() > 0;
    	}
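
            Putting the two together, a column-oriented scan over the whole file can be sketched as below (continuing the hypothetical reader above; the column ID must be one of the projected columns, otherwise getColumn() returns null):

    // scan column 1 across all row splits
    BytesRefArrayWritable colData = new BytesRefArrayWritable();
    while (reader.nextColumnsBatch()) {          // load the next row split's key part
        colData = reader.getColumn(1, colData);  // every buffered row of column 1
        for (int i = 0; i < colData.size(); i++) {
            BytesRefWritable cell = colData.get(i);
            // process cell.getData() / cell.getStart() / cell.getLength() ...
        }
    }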
    
