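I. Preface
This article is about optimizing line-by-line reading of very large text files. Within the JDK there are three conventional options: LineNumberReader, RandomAccessFile, and memory-mapped files (see http://sgq0085.iteye.com/blog/1318622), which are obtained by calling getChannel().map(...) on a RandomAccessFile.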
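1. LineNumberReader
It reads line by line, but it can only traverse forward from the first line: it skips ahead until it reaches the lines that are wanted, then reads until the batch is complete. In my test case, reading 10 million lines in batches of 50,000 took 93 seconds, which was measurably faster than RandomAccessFile. With 100 million lines, however, it was far too slow, because every batch starts scanning from the beginning of the file again; the test had been running for more than an hour when I gave up.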
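A minimal sketch of the LineNumberReader approach described above. The class name LineWindowReader and the method readWindow are hypothetical names chosen for illustration, not part of the original code. It shows why the cost grows with the start index: each call has to re-read every line in front of the requested window.

```java
import java.io.IOException;
import java.io.LineNumberReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class LineWindowReader {

    /**
     * Returns up to num lines, starting at the 0-based line index.
     * Every call scans from the first line, so a later window costs more.
     */
    public static List<String> readWindow(Path file, Charset charset, int index, int num)
            throws IOException {
        List<String> lines = new ArrayList<>();
        try (LineNumberReader reader = new LineNumberReader(Files.newBufferedReader(file, charset))) {
            String line;
            while (lines.size() < num && (line = reader.readLine()) != null) {
                // getLineNumber() counts the lines read so far, so the line
                // just returned has the 0-based index getLineNumber() - 1.
                if (reader.getLineNumber() - 1 >= index) {
                    lines.add(line);
                }
            }
        }
        return lines;
    }
}
```

Reading a file in batches this way means batch k re-reads roughly k times the batch size in lines before it returns anything, which matches the slowdown observed on the 100-million-line file.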
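2. RandomAccessFile
RandomAccessFile is not really suited to bulk reading of this kind. It is designed for random access to disk files, and its readLine() fetches one byte at a time with no buffering, so it is slow: the 10-million-line test took 140 seconds and the 100-million-line test took 1,438 seconds. On the other hand, getFilePointer can record the current position and seek can jump straight to a given position, so in principle it fits the scenario of reading a large file line by line in batches.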
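RandomAccessFile.readLine() can only decode bytes as 8859_1 (ISO-8859-1), so each line has to be re-encoded, as follows (encoding is the file's actual character set):
new String(pin.getBytes("8859_1"), encoding)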
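The following is a small sketch of the seek / getFilePointer pattern combined with the re-encoding step above. The names OffsetLineReader, Page and readPage are assumptions made for this illustration; the original code returns a Map instead. It reads one batch starting at a byte offset and reports the offset where the next batch begins.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class OffsetLineReader {

    /** One batch of lines plus the byte offset at which the next batch starts. */
    public static final class Page {
        public final List<String> lines;
        public final long nextPos;

        Page(List<String> lines, long nextPos) {
            this.lines = lines;
            this.nextPos = nextPos;
        }
    }

    public static Page readPage(String path, Charset charset, long pos, int num)
            throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
            raf.seek(pos); // jump straight to the saved offset, no rescanning
            String raw;
            while (lines.size() < num && (raw = raf.readLine()) != null) {
                // readLine() turns each byte into one char (ISO-8859-1 style),
                // so recover the original bytes and decode them properly.
                lines.add(new String(raw.getBytes(StandardCharsets.ISO_8859_1), charset));
            }
            return new Page(lines, raf.getFilePointer());
        }
    }
}
```

A caller would loop, passing each returned nextPos as the pos of the following call, until a page comes back with fewer than num lines.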
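3. Memory-mapped files
Because every line has a different size, memory-mapped files are not a good fit for this case. For other cases, see my blog post (http://sgq0085.iteye.com/blog/1318622).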
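II. Solution
Adding an internal buffer on top of RandomAccessFile improves throughput considerably. In my tests, 10 million lines took 1 second and 100 million lines took 103 seconds (about 13 times faster than the 1,438 seconds measured with plain RandomAccessFile).
BufferedRandomAccessFile
An implementation is already available online; the code is as follows: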
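To make the buffering idea concrete before the full class, here is a deliberately simplified, read-only sketch. SimpleBufferedLineReader and its methods are hypothetical names for this illustration and are not the implementation used in the tests. It assumes an encoding such as UTF-8 in which the byte 0x0A only ever means a line feed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;

/** Read-only sketch: a byte buffer placed in front of RandomAccessFile. */
public class SimpleBufferedLineReader implements AutoCloseable {
    private final RandomAccessFile file;
    private final byte[] buf = new byte[1 << 16]; // 64 KB per disk read
    private long bufStart = 0; // file offset of buf[0]
    private int bufLen = 0;    // number of valid bytes in buf
    private int bufPos = 0;    // index of the next byte to hand out

    public SimpleBufferedLineReader(String path) throws IOException {
        this.file = new RandomAccessFile(path, "r");
    }

    /** Moves to an absolute byte offset and discards the buffered bytes. */
    public void seek(long pos) throws IOException {
        file.seek(pos);
        bufStart = pos;
        bufLen = 0;
        bufPos = 0;
    }

    /** Logical position: the offset at which the next read will start. */
    public long position() {
        return bufStart + bufPos;
    }

    /** Returns the next byte as 0..255, or -1 at end of file. */
    private int nextByte() throws IOException {
        if (bufPos >= bufLen) {
            bufStart += bufLen; // the old buffer is fully consumed
            bufPos = 0;
            int n = file.read(buf, 0, buf.length); // one disk read refills many bytes
            bufLen = Math.max(n, 0);
            if (n <= 0) {
                return -1;
            }
        }
        return buf[bufPos++] & 0xFF;
    }

    /** Reads one line ending in \n or \r\n, or returns null at end of file. */
    public String readLine(Charset charset) throws IOException {
        int b = nextByte();
        if (b < 0) {
            return null;
        }
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        while (b >= 0 && b != '\n') {
            line.write(b);
            b = nextByte();
        }
        byte[] bytes = line.toByteArray();
        int len = bytes.length;
        if (len > 0 && bytes[len - 1] == '\r') {
            len--; // drop the CR of a CRLF pair
        }
        return new String(bytes, 0, len, charset);
    }

    @Override
    public void close() throws IOException {
        file.close();
    }
}
```

The point of the design is that position() and seek() still work in byte offsets, as with RandomAccessFile, while most calls to nextByte() are served from memory instead of going to the operating system.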
package com.gqshao.file.io;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

public class BufferedRandomAccessFile extends RandomAccessFile {
    static final int LogBuffSz_ = 16; // 64K buffer
    public static final int BuffSz_ = (1 << LogBuffSz_);
    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);

    private String path_;

    /*
     * This implementation is based on the buffer implementation in Modula-3's
     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
     */
    private boolean dirty_; // true iff unflushed bytes exist
    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
    private long curr_; // current position in file
    private long lo_, hi_; // bounds on characters in "buff"
    private byte[] buff_; // local buffer
    private long maxHi_; // this.lo + this.buff.length
    private boolean hitEOF_; // buffer contains last file block?
    private long diskPos_; // disk position

    /*
     * To describe the above fields, we introduce the following abstractions for
     * the file "f":
     *
     * len(f) the length of the file
     * curr(f) the current position in the file
     * c(f) the abstract contents of the file
     * disk(f) the contents of f's backing disk file
     * closed(f) true iff the file is closed
     *
     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
     * operation has the effect of making "disk(f)" identical to "c(f)".
     *
     * A file is said to be *valid* if the following conditions hold:
     *
     * V1. The "closed" and "curr" fields are correct:
     *
     * f.closed == closed(f)
     * f.curr == curr(f)
     *
     * V2. The current position is either contained in the buffer, or just past
     * the buffer:
     *
     * f.lo <= f.curr <= f.hi
     *
     * V3. Any (possibly) unflushed characters are stored in "f.buff":
     *
     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
     *
     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
     *
     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
     * disk(f)[i])
     *
     * V5. "f.dirty" is true iff the buffer contains bytes that should be
     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
     *
     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
     *
     * V6. this.maxHi == this.lo + this.buff.length
     *
     * Note that "f.buff" can be "null" in a valid file, since the range of
     * characters in V3 is empty when "f.lo == f.curr".
     *
     * A file is said to be *ready* if the buffer contains the current position,
     * i.e., when:
     *
     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
     *
     * When a file is ready, reading or writing a single byte can be performed
     * by reading or writing the in-memory buffer without performing a disk
     * operation.
     */

    /**
     * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
     * in mode <code>mode</code>, which should be "r" for reading only, or
     * "rw" for reading and writing.
     */
    public BufferedRandomAccessFile(File file, String mode) throws IOException {
        this(file, mode, 0);
    }

    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
        super(file, mode);
        path_ = file.getAbsolutePath();
        this.init(size);
    }

    /**
     * Open a new <code>BufferedRandomAccessFile</code> on the file named
     * <code>name</code> in mode <code>mode</code>, which should be "r" for
     * reading only, or "rw" for reading and writing.
     */
    public BufferedRandomAccessFile(String name, String mode) throws IOException {
        this(name, mode, 0);
    }

    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
        super(name, mode);
        path_ = name;
        this.init(size);
    }

    private void init(int size) {
        this.dirty_ = false;
        this.lo_ = this.curr_ = this.hi_ = 0;
        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
        this.maxHi_ = (long) BuffSz_;
        this.hitEOF_ = false;
        this.diskPos_ = 0L;
    }

    public String getPath() {
        return path_;
    }

    public void sync() throws IOException {
        if (syncNeeded_) {
            flush();
            getChannel().force(true);
            syncNeeded_ = false;
        }
    }

    // public boolean isEOF() throws IOException
    // {
    //     assert getFilePointer() <= length();
    //     return getFilePointer() == length();
    // }

    public void close() throws IOException {
        this.flush();
        this.buff_ = null;
        super.close();
    }

    /**
     * Flush any bytes in the file's buffer that have not yet been written to
     * disk. If the file was created read-only, this method is a no-op.
     */
    public void flush() throws IOException {
        this.flushBuffer();
    }

    /* Flush any dirty bytes in the buffer to disk. */
    private void flushBuffer() throws IOException {
        if (this.dirty_) {
            if (this.diskPos_ != this.lo_)
                super.seek(this.lo_);
            int len = (int) (this.curr_ - this.lo_);
            super.write(this.buff_, 0, len);
            this.diskPos_ = this.curr_;
            this.dirty_ = false;
        }
    }

    /*
     * Read at most "this.buff.length" bytes into "this.buff", returning the
     * number of bytes read. If the return result is less than
     * "this.buff.length", then EOF was read.
     */
    private int fillBuffer() throws IOException {
        int cnt = 0;
        int rem = this.buff_.length;
        while (rem > 0) {
            int n = super.read(this.buff_, cnt, rem);
            if (n < 0)
                break;
            cnt += n;
            rem -= n;
        }
        // NOTE: cnt is never negative here, so as written this branch never
        // runs and hitEOF_ stays false; callers still detect EOF because
        // curr_ == hi_ after seek().
        if ((cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length))) {
            // make sure buffer that wasn't read is initialized with -1
            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
        }
        this.diskPos_ += cnt;
        return cnt;
    }

    /*
     * This method positions <code>this.curr</code> at position <code>pos</code>.
     * If <code>pos</code> does not fall in the current buffer, it flushes the
     * current buffer and loads the correct one.<p>
     *
     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
     * is at or past the end-of-file, which can only happen if the file was
     * opened in read-only mode.
     */
    public void seek(long pos) throws IOException {
        if (pos >= this.hi_ || pos < this.lo_) {
            // seeking outside of current buffer -- flush and read
            this.flushBuffer();
            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
            this.maxHi_ = this.lo_ + (long) this.buff_.length;
            if (this.diskPos_ != this.lo_) {
                super.seek(this.lo_);
                this.diskPos_ = this.lo_;
            }
            int n = this.fillBuffer();
            this.hi_ = this.lo_ + (long) n;
        } else {
            // seeking inside current buffer -- no read required
            if (pos < this.curr_) {
                // if seeking backwards, we must flush to maintain V4
                this.flushBuffer();
            }
        }
        this.curr_ = pos;
    }

    public long getFilePointer() {
        return this.curr_;
    }

    public long length() throws IOException {
        // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
        return Math.max(this.curr_, super.length());
    }

    public int read() throws IOException {
        if (this.curr_ >= this.hi_) {
            // test for EOF
            // if (this.hi < this.maxHi) return -1;
            if (this.hitEOF_)
                return -1;
            // slow path -- read another buffer
            this.seek(this.curr_);
            if (this.curr_ == this.hi_)
                return -1;
        }
        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
        this.curr_++;
        return ((int) res) & 0xFF; // convert byte -> int
    }

    public int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    public int read(byte[] b, int off, int len) throws IOException {
        if (this.curr_ >= this.hi_) {
            // test for EOF
            // if (this.hi < this.maxHi) return -1;
            if (this.hitEOF_)
                return -1;
            // slow path -- read another buffer
            this.seek(this.curr_);
            if (this.curr_ == this.hi_)
                return -1;
        }
        len = Math.min(len, (int) (this.hi_ - this.curr_));
        int buffOff = (int) (this.curr_ - this.lo_);
        System.arraycopy(this.buff_, buffOff, b, off, len);
        this.curr_ += len;
        return len;
    }

    public void write(int b) throws IOException {
        if (this.curr_ >= this.hi_) {
            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                // at EOF -- bump "hi"
                this.hi_++;
            } else {
                // slow path -- write current buffer; read next one
                this.seek(this.curr_);
                if (this.curr_ == this.hi_) {
                    // appending to EOF -- bump "hi"
                    this.hi_++;
                }
            }
        }
        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
        this.curr_++;
        this.dirty_ = true;
        syncNeeded_ = true;
    }

    public void write(byte[] b) throws IOException {
        this.write(b, 0, b.length);
    }

    public void write(byte[] b, int off, int len) throws IOException {
        while (len > 0) {
            int n = this.writeAtMost(b, off, len);
            off += n;
            len -= n;
            this.dirty_ = true;
            syncNeeded_ = true;
        }
    }

    /*
     * Write at most "len" bytes to "b" starting at position "off", and return
     * the number of bytes written.
     */
    private int writeAtMost(byte[] b, int off, int len) throws IOException {
        if (this.curr_ >= this.hi_) {
            if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                // at EOF -- bump "hi"
                this.hi_ = this.maxHi_;
            } else {
                // slow path -- write current buffer; read next one
                this.seek(this.curr_);
                if (this.curr_ == this.hi_) {
                    // appending to EOF -- bump "hi"
                    this.hi_ = this.maxHi_;
                }
            }
        }
        len = Math.min(len, (int) (this.hi_ - this.curr_));
        int buffOff = (int) (this.curr_ - this.lo_);
        System.arraycopy(b, off, this.buff_, buffOff, len);
        this.curr_ += len;
        return len;
    }
}
III. Tests
1. FileUtil
Wraps the three approaches (LineNumberReader, RandomAccessFile, BufferedRandomAccessFile) for reading a file.
package com.gqshao.file.util;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.gqshao.file.io.BufferedRandomAccessFile;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.util.List;
import java.util.Map;

public class FileUtil {

    /**
     * Reads a file through BufferedRandomAccessFile (recommended).
     *
     * @param file     source file
     * @param encoding file encoding
     * @param pos      byte offset to start from
     * @param num      number of lines to read
     * @return "pins": the lines read; "pos": the offset after the last line read
     */
    public static Map<String, Object> BufferedRandomAccessFileReadLine(File file, String encoding, long pos, int num) {
        Map<String, Object> res = Maps.newHashMap();
        List<String> pins = Lists.newArrayList();
        res.put("pins", pins);
        BufferedRandomAccessFile reader = null;
        try {
            reader = new BufferedRandomAccessFile(file, "r");
            reader.seek(pos);
            for (int i = 0; i < num; i++) {
                String pin = reader.readLine();
                // stops at end of file, and also at the first blank line
                if (StringUtils.isBlank(pin)) {
                    break;
                }
                // readLine() decodes as 8859_1, so re-encode with the real charset
                pins.add(new String(pin.getBytes("8859_1"), encoding));
            }
            res.put("pos", reader.getFilePointer());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(reader);
        }
        return res;
    }

    /**
     * Reads a file through RandomAccessFile; it can handle very large files,
     * but it is slow.
     *
     * @param file     source file
     * @param encoding file encoding
     * @param pos      byte offset to start from
     * @param num      number of lines to read
     * @return "pins": the lines read; "pos": the offset after the last line read
     */
    public static Map<String, Object> readLine(File file, String encoding, long pos, int num) {
        Map<String, Object> res = Maps.newHashMap();
        List<String> pins = Lists.newArrayList();
        res.put("pins", pins);
        RandomAccessFile reader = null;
        try {
            reader = new RandomAccessFile(file, "r");
            reader.seek(pos);
            for (int i = 0; i < num; i++) {
                String pin = reader.readLine();
                // stops at end of file, and also at the first blank line
                if (StringUtils.isBlank(pin)) {
                    break;
                }
                // readLine() decodes as 8859_1, so re-encode with the real charset
                pins.add(new String(pin.getBytes("8859_1"), encoding));
            }
            res.put("pos", reader.getFilePointer());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(reader);
        }
        return res;
    }

    /**
     * Reads a file through LineNumberReader; faster than RandomAccessFile for
     * 10 million lines, but unable to cope with 100 million lines.
     *
     * @param file     source file
     * @param encoding file encoding
     * @param index    0-based line index to start from
     * @param num      number of lines to read
     * @return the lines read
     */
    public static List<String> readLine(File file, String encoding, int index, int num) {
        List<String> pins = Lists.newArrayList();
        LineNumberReader reader = null;
        try {
            reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), encoding));
            int lines = 0;
            // every call scans from the first line until it reaches index
            while (true) {
                String pin = reader.readLine();
                if (StringUtils.isBlank(pin)) {
                    break;
                }
                if (lines >= index) {
                    if (StringUtils.isNotBlank(pin)) {
                        pins.add(pin);
                    }
                }
                if (num == pins.size()) {
                    break;
                }
                lines++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(reader);
        }
        return pins;
    }
}
2. RandomAccessFileTest
The test methods. The randomFile referenced here is just a text file with some Chinese mixed in; you can write one yourself.
package com.gqshao.file;

import com.gqshao.file.util.FileUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.List;
import java.util.Map;

public class RandomAccessFileTest {
    private static final Logger logger = LoggerFactory.getLogger(RandomAccessFileTest.class);
    private static final String ENCODING = "UTF-8";
    private static final int NUM = 50000;

    private static File file = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "test.txt");
    private static File randomFile = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "RandomFile.txt");

    /**
     * Generates the random text file used by the tests (100 million lines as
     * written; lower the loop bound for the 10-million-line test).
     */
    @Test
    public void makePin() {
        String prefix = "_$#";
        OutputStreamWriter out = null;
        try {
            out = new OutputStreamWriter(new FileOutputStream(file, true), ENCODING);
            // each line is the prefix plus a random integer in [0, 130000000)
            for (int j = 0; j < 100000000; j++) {
                out.write(prefix + (int) (130000000 * Math.random()) + "\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(out);
        }
        logger.info(file.getAbsolutePath());
    }

    /**
     * Tests reading the file with RandomAccessFile.
     */
    @Test
    public void testRandomAccessRead() {
        long start = System.currentTimeMillis();
        // logger.info(String.valueOf(file.exists()));
        long pos = 0L;
        while (true) {
            Map<String, Object> res = FileUtil.readLine(file, ENCODING, pos, NUM);
            // stop if the result is empty
            if (MapUtils.isEmpty(res)) {
                break;
            }
            List<String> pins = (List<String>) res.get("pins");
            if (CollectionUtils.isNotEmpty(pins)) {
                // logger.info(Arrays.toString(pins.toArray()));
                if (pins.size() < NUM) {
                    break;
                }
            } else {
                break;
            }
            pos = (Long) res.get("pos");
        }
        logger.info(((System.currentTimeMillis() - start) / 1000) + "");
    }

    /**
     * Tests reading the file with BufferedRandomAccessFile.
     */
    @Test
    public void testBufferedRandomAccessRead() {
        long start = System.currentTimeMillis();
        // logger.info(String.valueOf(file.exists()));
        long pos = 0L;
        while (true) {
            Map<String, Object> res = FileUtil.BufferedRandomAccessFileReadLine(file, ENCODING, pos, NUM);
            // stop if the result is empty
            if (MapUtils.isEmpty(res)) {
                break;
            }
            List<String> pins = (List<String>) res.get("pins");
            if (CollectionUtils.isNotEmpty(pins)) {
                // logger.info(Arrays.toString(pins.toArray()));
                if (pins.size() < NUM) {
                    break;
                }
            } else {
                break;
            }
            pos = (Long) res.get("pos");
        }
        logger.info(((System.currentTimeMillis() - start) / 1000) + "");
    }

    /**
     * Tests ordinary reading with LineNumberReader.
     */
    @Test
    public void testCommonRead() {
        long start = System.currentTimeMillis();
        logger.info(String.valueOf(randomFile.exists()));
        int index = 0;
        while (true) {
            List<String> pins = FileUtil.readLine(file, ENCODING, index, NUM);
            if (CollectionUtils.isNotEmpty(pins)) {
                // logger.info(Arrays.toString(pins.toArray()));
                if (pins.size() < NUM) {
                    break;
                }
            } else {
                break;
            }
            index += NUM;
        }
        logger.info(((System.currentTimeMillis() - start) / 1000) + "");
    }
}