Reposted from: http://sgq0085.iteye.com/blog/2215318
I. Introduction
This article looks at how to speed up line-by-line reading of very large text files. Restricting ourselves to the JDK, there are three common approaches: first, LineNumberReader; second, RandomAccessFile; and third, a memory-mapped file (see http://sgq0085.iteye.com/blog/1318622), obtained by calling getChannel().map(...) on a RandomAccessFile.
1. LineNumberReader
Reads line by line, but can only traverse forward from the first line until it reaches the lines that are actually needed. In my test case, reading 10 million lines in batches of 50,000 took 93 seconds, which in practice is faster than RandomAccessFile; for 100 million lines, however, it is far too slow, because every batch has to re-scan the file from the beginning, and the test was abandoned after running for more than an hour.
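To make the limitation concrete, here is a minimal sketch of this approach (the file name "data.txt", the UTF-8 encoding and the start line are assumptions for illustration); note that "skipping" to the start line still means reading and discarding every line before it:
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.LineNumberReader;
- import java.util.ArrayList;
- import java.util.List;
- public class LineNumberReaderSketch {
-     public static void main(String[] args) throws IOException {
-         int start = 5000000;   // first line we actually want
-         int batchSize = 50000; // lines per batch
-         List<String> batch = new ArrayList<String>();
-         LineNumberReader reader = new LineNumberReader(
-                 new InputStreamReader(new FileInputStream("data.txt"), "UTF-8"));
-         try {
-             String line;
-             while ((line = reader.readLine()) != null) {
-                 // every line before "start" is still read and then simply discarded
-                 if (reader.getLineNumber() > start) {
-                     batch.add(line);
-                     if (batch.size() == batchSize) {
-                         break;
-                     }
-                 }
-             }
-         } finally {
-             reader.close();
-         }
-         System.out.println("read " + batch.size() + " lines");
-     }
- }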
2. RandomAccessFile
Not really suitable for this kind of bulk reading on its own: RandomAccessFile is designed for random access to a file on disk, so raw sequential reading through it is slow, taking 140 seconds for 10 million lines and 1438 seconds for 100 million lines in my tests. On the other hand, getFilePointer() records the current offset and seek() jumps straight back to it, so in principle the API fits the resumable, batch-by-batch reading this scenario needs.
RandomAccessFile.readLine() decodes every byte as ISO-8859-1 ("8859_1"), so the returned string has to be re-encoded into the file's real character set, like this:
- new String(pin.getBytes("8859_1"), "UTF-8")
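Putting it together, the following is a minimal sketch (the file name and the UTF-8 target encoding are assumptions) of a resumable batch read with RandomAccessFile; the FileUtil.readLine method later in this article is the full version of the same pattern:
- import java.io.RandomAccessFile;
- import java.util.ArrayList;
- import java.util.List;
- public class RandomAccessFileSketch {
-     public static void main(String[] args) throws Exception {
-         long pos = 0L;          // offset saved from the previous batch
-         int batchSize = 50000;  // lines per batch
-         RandomAccessFile raf = new RandomAccessFile("data.txt", "r");
-         try {
-             raf.seek(pos);      // resume where the previous batch stopped
-             List<String> batch = new ArrayList<String>();
-             for (int i = 0; i < batchSize; i++) {
-                 String raw = raf.readLine();
-                 if (raw == null) {
-                     break;      // end of file
-                 }
-                 // readLine() decoded the bytes as ISO-8859-1; re-encode to UTF-8
-                 batch.add(new String(raw.getBytes("8859_1"), "UTF-8"));
-             }
-             pos = raf.getFilePointer(); // remember where to start next time
-             System.out.println(batch.size() + " lines, next offset " + pos);
-         } finally {
-             raf.close();
-         }
-     }
- }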
3. Memory-mapped file
Because every line has a different length, a memory-mapped file does not fit this case well: the mapping hands you raw bytes, so the code has to find the line breaks itself. For other scenarios see my earlier post (http://sgq0085.iteye.com/blog/1318622).
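For completeness, below is a minimal sketch of the memory-mapped approach (the file name and the 64 MB mapping size are assumptions): the file is mapped through getChannel().map(...), and the code then has to locate every '\n' in the raw bytes itself, which is exactly the per-line bookkeeping that makes this approach awkward when line lengths vary.
- import java.io.RandomAccessFile;
- import java.nio.MappedByteBuffer;
- import java.nio.channels.FileChannel;
- public class MappedFileSketch {
-     public static void main(String[] args) throws Exception {
-         RandomAccessFile raf = new RandomAccessFile("data.txt", "r");
-         try {
-             FileChannel channel = raf.getChannel();
-             // map (at most) the first 64 MB of the file into memory
-             long size = Math.min(channel.size(), 64L * 1024 * 1024);
-             MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, size);
-             int lineStart = 0;
-             int lines = 0;
-             for (int i = 0; i < size; i++) {
-                 if (buf.get(i) == '\n') { // we must find the line breaks ourselves
-                     byte[] bytes = new byte[i - lineStart];
-                     buf.position(lineStart);
-                     buf.get(bytes);
-                     String line = new String(bytes, "UTF-8");
-                     if (lines == 0) {
-                         System.out.println("first line: " + line);
-                     }
-                     lines++;
-                     lineStart = i + 1;
-                 }
-             }
-             // a final line without a trailing '\n' is ignored in this sketch
-             System.out.println(lines + " lines in the mapped region");
-         } finally {
-             raf.close();
-         }
-     }
- }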
II. Solution
If an internal buffer is integrated on top of RandomAccessFile, throughput improves dramatically: in my tests, 10 million lines took about 1 second and 100 million lines took 103 seconds (roughly 13 times faster than the 1438 seconds above).
BufferedRandomAccessFile
An implementation is already available online; the code is as follows:
- package com.gqshao.file.io;
- import java.io.File;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.RandomAccessFile;
- import java.util.Arrays;
- public class BufferedRandomAccessFile extends RandomAccessFile {
- static final int LogBuffSz_ = 16; // 64K buffer
- public static final int BuffSz_ = (1 << LogBuffSz_);
- static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
- private String path_;
- /*
- * This implementation is based on the buffer implementation in Modula-3's
- * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
- */
- private boolean dirty_; // true iff unflushed bytes exist
- private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
- private long curr_; // current position in file
- private long lo_, hi_; // bounds on characters in "buff"
- private byte[] buff_; // local buffer
- private long maxHi_; // this.lo + this.buff.length
- private boolean hitEOF_; // buffer contains last file block?
- private long diskPos_; // disk position
- /*
- * To describe the above fields, we introduce the following abstractions for
- * the file "f":
- *
- * len(f) the length of the file curr(f) the current position in the file
- * c(f) the abstract contents of the file disk(f) the contents of f's
- * backing disk file closed(f) true iff the file is closed
- *
- * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
- * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
- * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
- * operation has the effect of making "disk(f)" identical to "c(f)".
- *
- * A file is said to be *valid* if the following conditions hold:
- *
- * V1. The "closed" and "curr" fields are correct:
- *
- * f.closed == closed(f) f.curr == curr(f)
- *
- * V2. The current position is either contained in the buffer, or just past
- * the buffer:
- *
- * f.lo <= f.curr <= f.hi
- *
- * V3. Any (possibly) unflushed characters are stored in "f.buff":
- *
- * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
- *
- * V4. For all characters not covered by V3, c(f) and disk(f) agree:
- *
- * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
- * disk(f)[i])
- *
- * V5. "f.dirty" is true iff the buffer contains bytes that should be
- * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
- *
- * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
- *
- * V6. this.maxHi == this.lo + this.buff.length
- *
- * Note that "f.buff" can be "null" in a valid file, since the range of
- * characters in V3 is empty when "f.lo == f.curr".
- *
- * A file is said to be *ready* if the buffer contains the current position,
- * i.e., when:
- *
- * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
- *
- * When a file is ready, reading or writing a single byte can be performed
- * by reading or writing the in-memory buffer without performing a disk
- * operation.
- */
- /**
- * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
- * in mode <code>mode</code>, which should be "r" for reading only, or
- * "rw" for reading and writing.
- */
- public BufferedRandomAccessFile(File file, String mode) throws IOException {
- this(file, mode, 0);
- }
- public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
- super(file, mode);
- path_ = file.getAbsolutePath();
- this.init(size);
- }
- /**
- * Open a new <code>BufferedRandomAccessFile</code> on the file named
- * <code>name</code> in mode <code>mode</code>, which should be "r" for
- * reading only, or "rw" for reading and writing.
- */
- public BufferedRandomAccessFile(String name, String mode) throws IOException {
- this(name, mode, 0);
- }
- public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
- super(name, mode);
- path_ = name;
- this.init(size);
- }
- private void init(int size) {
- this.dirty_ = false;
- this.lo_ = this.curr_ = this.hi_ = 0;
- this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
- this.maxHi_ = (long) BuffSz_;
- this.hitEOF_ = false;
- this.diskPos_ = 0L;
- }
- public String getPath() {
- return path_;
- }
- public void sync() throws IOException {
- if (syncNeeded_) {
- flush();
- getChannel().force(true);
- syncNeeded_ = false;
- }
- }
- // public boolean isEOF() throws IOException
- // {
- // assert getFilePointer() <= length();
- // return getFilePointer() == length();
- // }
- public void close() throws IOException {
- this.flush();
- this.buff_ = null;
- super.close();
- }
- /**
- * Flush any bytes in the file's buffer that have not yet been written to
- * disk. If the file was created read-only, this method is a no-op.
- */
- public void flush() throws IOException {
- this.flushBuffer();
- }
- /* Flush any dirty bytes in the buffer to disk. */
- private void flushBuffer() throws IOException {
- if (this.dirty_) {
- if (this.diskPos_ != this.lo_)
- super.seek(this.lo_);
- int len = (int) (this.curr_ - this.lo_);
- super.write(this.buff_, 0, len);
- this.diskPos_ = this.curr_;
- this.dirty_ = false;
- }
- }
- /*
- * Read at most "this.buff.length" bytes into "this.buff", returning the
- * number of bytes read. If the return result is less than
- * "this.buff.length", then EOF was read.
- */
- private int fillBuffer() throws IOException {
- int cnt = 0;
- int rem = this.buff_.length;
- while (rem > 0) {
- int n = super.read(this.buff_, cnt, rem);
- if (n < 0)
- break;
- cnt += n;
- rem -= n;
- }
- this.hitEOF_ = (cnt < this.buff_.length); // fewer bytes than the buffer size means EOF was reached
- if (this.hitEOF_) {
- // make sure the part of the buffer that wasn't read is initialized with -1
- Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
- }
- this.diskPos_ += cnt;
- return cnt;
- }
- /*
- * This method positions <code>this.curr</code> at position <code>pos</code>.
- * If <code>pos</code> does not fall in the current buffer, it flushes the
- * current buffer and loads the correct one.<p>
- *
- * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
- * is at or past the end-of-file, which can only happen if the file was
- * opened in read-only mode.
- */
- public void seek(long pos) throws IOException {
- if (pos >= this.hi_ || pos < this.lo_) {
- // seeking outside of current buffer -- flush and read
- this.flushBuffer();
- this.lo_ = pos & BuffMask_; // start at BuffSz boundary
- this.maxHi_ = this.lo_ + (long) this.buff_.length;
- if (this.diskPos_ != this.lo_) {
- super.seek(this.lo_);
- this.diskPos_ = this.lo_;
- }
- int n = this.fillBuffer();
- this.hi_ = this.lo_ + (long) n;
- } else {
- // seeking inside current buffer -- no read required
- if (pos < this.curr_) {
- // if seeking backwards, we must flush to maintain V4
- this.flushBuffer();
- }
- }
- this.curr_ = pos;
- }
- public long getFilePointer() {
- return this.curr_;
- }
- public long length() throws IOException {
- // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
- return Math.max(this.curr_, super.length());
- }
- public int read() throws IOException {
- if (this.curr_ >= this.hi_) {
- // test for EOF
- // if (this.hi < this.maxHi) return -1;
- if (this.hitEOF_)
- return -1;
- // slow path -- read another buffer
- this.seek(this.curr_);
- if (this.curr_ == this.hi_)
- return -1;
- }
- byte res = this.buff_[(int) (this.curr_ - this.lo_)];
- this.curr_++;
- return ((int) res) & 0xFF; // convert byte -> int
- }
- public int read(byte[] b) throws IOException {
- return this.read(b, 0, b.length);
- }
- public int read(byte[] b, int off, int len) throws IOException {
- if (this.curr_ >= this.hi_) {
- // test for EOF
- // if (this.hi < this.maxHi) return -1;
- if (this.hitEOF_)
- return -1;
- // slow path -- read another buffer
- this.seek(this.curr_);
- if (this.curr_ == this.hi_)
- return -1;
- }
- len = Math.min(len, (int) (this.hi_ - this.curr_));
- int buffOff = (int) (this.curr_ - this.lo_);
- System.arraycopy(this.buff_, buffOff, b, off, len);
- this.curr_ += len;
- return len;
- }
- public void write(int b) throws IOException {
- if (this.curr_ >= this.hi_) {
- if (this.hitEOF_ && this.hi_ < this.maxHi_) {
- // at EOF -- bump "hi"
- this.hi_++;
- } else {
- // slow path -- write current buffer; read next one
- this.seek(this.curr_);
- if (this.curr_ == this.hi_) {
- // appending to EOF -- bump "hi"
- this.hi_++;
- }
- }
- }
- this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
- this.curr_++;
- this.dirty_ = true;
- syncNeeded_ = true;
- }
- public void write(byte[] b) throws IOException {
- this.write(b, 0, b.length);
- }
- public void write(byte[] b, int off, int len) throws IOException {
- while (len > 0) {
- int n = this.writeAtMost(b, off, len);
- off += n;
- len -= n;
- this.dirty_ = true;
- syncNeeded_ = true;
- }
- }
- /*
- * Write at most "len" bytes to "b" starting at position "off", and return
- * the number of bytes written.
- */
- private int writeAtMost(byte[] b, int off, int len) throws IOException {
- if (this.curr_ >= this.hi_) {
- if (this.hitEOF_ && this.hi_ < this.maxHi_) {
- // at EOF -- bump "hi"
- this.hi_ = this.maxHi_;
- } else {
- // slow path -- write current buffer; read next one
- this.seek(this.curr_);
- if (this.curr_ == this.hi_) {
- // appending to EOF -- bump "hi"
- this.hi_ = this.maxHi_;
- }
- }
- }
- len = Math.min(len, (int) (this.hi_ - this.curr_));
- int buffOff = (int) (this.curr_ - this.lo_);
- System.arraycopy(b, off, this.buff_, buffOff, len);
- this.curr_ += len;
- return len;
- }
- }
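Because the class extends RandomAccessFile, it is a drop-in replacement: the inherited readLine() now goes through the overridden buffered read(), so the seek/readLine/getFilePointer pattern shown earlier works unchanged, only much faster. A minimal usage sketch (assuming a UTF-8 file named data.txt and the class above on the classpath):
- import com.gqshao.file.io.BufferedRandomAccessFile;
- import java.io.File;
- public class BufferedReadSketch {
-     public static void main(String[] args) throws Exception {
-         long pos = 0L;
-         BufferedRandomAccessFile reader =
-                 new BufferedRandomAccessFile(new File("data.txt"), "r");
-         try {
-             reader.seek(pos); // resume from a saved offset
-             for (int i = 0; i < 5; i++) {
-                 // inherited readLine() is now served from the 64K internal buffer
-                 String raw = reader.readLine();
-                 if (raw == null) {
-                     break;
-                 }
-                 System.out.println(new String(raw.getBytes("8859_1"), "UTF-8"));
-             }
-             pos = reader.getFilePointer(); // offset for the next batch
-         } finally {
-             reader.close();
-         }
-     }
- }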
III. Testing
1. FileUtil
Wraps file reading for the three approaches (LineNumberReader, RandomAccessFile, BufferedRandomAccessFile) behind one utility class.
- package com.gqshao.file.util;
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
- import com.gqshao.file.io.BufferedRandomAccessFile;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.lang3.StringUtils;
- import java.io.*;
- import java.util.List;
- import java.util.Map;
- public class FileUtil {
- /**
- * Read the file through BufferedRandomAccessFile (recommended).
- *
- * @param file source file
- * @param encoding file encoding
- * @param pos byte offset to start from
- * @param num number of lines to read
- * @return "pins": the lines read, "pos": the offset after reading
- */
- public static Map<String, Object> BufferedRandomAccessFileReadLine(File file, String encoding, long pos, int num) {
- Map<String, Object> res = Maps.newHashMap();
- List<String> pins = Lists.newArrayList();
- res.put("pins", pins);
- BufferedRandomAccessFile reader = null;
- try {
- reader = new BufferedRandomAccessFile(file, "r");
- reader.seek(pos);
- for (int i = 0; i < num; i++) {
- String pin = reader.readLine();
- if (StringUtils.isBlank(pin)) {
- break;
- }
- pins.add(new String(pin.getBytes("8859_1"), encoding));
- }
- res.put("pos", reader.getFilePointer());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeQuietly(reader);
- }
- return res;
- }
- /**
- * Read the file through a plain RandomAccessFile; handles large files but is slow.
- *
- * @param file source file
- * @param encoding file encoding
- * @param pos byte offset to start from
- * @param num number of lines to read
- * @return "pins": the lines read, "pos": the offset after reading
- */
- public static Map<String, Object> readLine(File file, String encoding, long pos, int num) {
- Map<String, Object> res = Maps.newHashMap();
- List<String> pins = Lists.newArrayList();
- res.put("pins", pins);
- RandomAccessFile reader = null;
- try {
- reader = new RandomAccessFile(file, "r");
- reader.seek(pos);
- for (int i = 0; i < num; i++) {
- String pin = reader.readLine();
- if (StringUtils.isBlank(pin)) {
- break;
- }
- pins.add(new String(pin.getBytes("8859_1"), encoding));
- }
- res.put("pos", reader.getFilePointer());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeQuietly(reader);
- }
- return res;
- }
- /**
- * Read the file with LineNumberReader; faster than RandomAccessFile for 10 million lines,
- * but cannot cope with 100 million lines.
- *
- * @param file source file
- * @param encoding file encoding
- * @param index line number to start from
- * @param num number of lines to read
- * @return the lines read
- */
- public static List<String> readLine(File file, String encoding, int index, int num) {
- List<String> pins = Lists.newArrayList();
- LineNumberReader reader = null;
- try {
- reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), encoding));
- int lines = 0;
- while (true) {
- String pin = reader.readLine();
- if (StringUtils.isBlank(pin)) {
- break;
- }
- if (lines >= index) {
- if (StringUtils.isNotBlank(pin)) {
- pins.add(pin);
- }
- }
- if (num == pins.size()) {
- break;
- }
- lines++;
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeQuietly(reader);
- }
- return pins;
- }
- }
2. RandomAccessFileTest
The test class. The randomFile it refers to is just a text file with some Chinese mixed in; any file you write yourself will do.
- package com.gqshao.file;
- import com.gqshao.file.util.FileUtil;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.collections.MapUtils;
- import org.apache.commons.io.IOUtils;
- import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.*;
- import java.util.List;
- import java.util.Map;
- public class RandomAccessFileTest {
- private static final Logger logger = LoggerFactory.getLogger(RandomAccessFileTest.class);
- private static final String ENCODING = "UTF-8";
- private static final int NUM = 50000;
- private static File file = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "test.txt");
- private static File randomFile = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "RandomFile.txt");
- /**
- * Generate the random test file (the loop below writes 100 million lines;
- * reduce the bound for the 10-million-line test)
- */
- @Test
- public void makePin() {
- String prefix = "_$#";
- OutputStreamWriter out = null;
- try {
- out = new OutputStreamWriter(new FileOutputStream(file, true), ENCODING);
- // one random value below 130,000,000 per line, with the given prefix
- for (int j = 0; j < 100000000; j++) {
- out.write(prefix + (int) (130000000 * Math.random()) + "\n");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeQuietly(out);
- }
- logger.info(file.getAbsolutePath());
- }
- /**
- * Test reading the file with RandomAccessFile
- */
- @Test
- public void testRandomAccessRead() {
- long start = System.currentTimeMillis();
- //
- logger.info(String.valueOf(file.exists()));
- long pos = 0L;
- while (true) {
- Map<String, Object> res = FileUtil.readLine(file, ENCODING, pos, NUM);
- // stop the loop if the result map is empty
- if (MapUtils.isEmpty(res)) {
- break;
- }
- List<String> pins = (List<String>) res.get("pins");
- if (CollectionUtils.isNotEmpty(pins)) {
- // logger.info(Arrays.toString(pins.toArray()));
- if (pins.size() < NUM) {
- break;
- }
- } else {
- break;
- }
- pos = (Long) res.get("pos");
- }
- logger.info(((System.currentTimeMillis() - start) / 1000) + "");
- }
- /**
- * Test reading the file with BufferedRandomAccessFile
- */
- @Test
- public void testBufferedRandomAccessRead() {
- long start = System.currentTimeMillis();
- //
- logger.info(String.valueOf(file.exists()));
- long pos = 0L;
- while (true) {
- Map<String, Object> res = FileUtil.BufferedRandomAccessFileReadLine(file, ENCODING, pos, NUM);
- // stop the loop if the result map is empty
- if (MapUtils.isEmpty(res)) {
- break;
- }
- List<String> pins = (List<String>) res.get("pins");
- if (CollectionUtils.isNotEmpty(pins)) {
- // logger.info(Arrays.toString(pins.toArray()));
- if (pins.size() < NUM) {
- break;
- }
- } else {
- break;
- }
- pos = (Long) res.get("pos");
- }
- logger.info(((System.currentTimeMillis() - start) / 1000) + "");
- }
- /**
- * Test plain sequential reading with LineNumberReader
- */
- @Test
- public void testCommonRead() {
- long start = System.currentTimeMillis();
- logger.info(String.valueOf(randomFile.exists()));
- int index = 0;
- while (true) {
- List<String> pins = FileUtil.readLine(file, ENCODING, index, NUM);
- if (CollectionUtils.isNotEmpty(pins)) {
- // logger.info(Arrays.toString(pins.toArray()));
- if (pins.size() < NUM) {
- break;
- }
- } else {
- break;
- }
- index += NUM;
- }
- logger.info(((System.currentTimeMillis() - start) / 1000) + "");
- }
- }