关于上一篇数据去重复的问题,在结尾的时候提到,另一种思路:在url-->hashCode 根据范围写入文件的时候,不用迭代二分法,采用平均算法,也就是说根据url的大概行数,设置一个单位区间,循环遍历行的时候,根据hashCode 值,放入不同的空间,然后再放入内存去除重复,写入汇总文件。
去个例子,我文件数据2G,1.5亿行,自己设定一个区间值1000W。 也就是说0~1000W、1000W~2000W 。。。为一个区间,每行都会通过计算放入不同的区间,这样在数据均匀的情况下,分布式很均匀的,如果某一个区间值过多,超过内存限制,继续切割,这样会大大减少第一种算法带来的文件数据的重复读取的效率。
看代码,这里生产文件的代码参考前一篇文章。
package com.files; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.Reader; import java.io.Writer; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; public class PartFile2 { // 内存监控 final static Runtime currRuntime = Runtime.getRuntime (); // 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用 final static long MEMERY_LIMIT = 1024 * 1024 * 3; // 内存限制,我内存最多容纳的文件大小 static final long FILE_LIMIT_SIZE = 1024*1024*1; // 文件写入缓冲区 ,我默认 static final int CACHE_SIZE = 1024; // 默认文件后缀 static final String FILE_SUFFIX = ".txt"; // 临时分割的文件目录,可以删除~。~ static final String FILE_PREFIX = "test/"; // 汇总的文件名 static final String REQUST_FILE_NAME = "resultFile.txt"; // 存放小文件的,驱除重复数据 static Map<String,String> fileLinesMap = new HashMap<String,String>(10000); // 按hashCode 分割的单位 static int HASH_UNIT = 1000*1000*10; // 收缩率,HASH_UNIT 在一定范围内,数据过大,用这个调节之后继续分割 static final int shrinkage_factor = 1000; // 存放分割d static Map<File,BufferedWriter> map = new HashMap<File, BufferedWriter>(); // 存放大文件 引用,以及分割位置 static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>(); static final String origFileName = "xxx.txt"; public static void main(String[] args) { long begin = System.currentTimeMillis(); new PartFile2().partFile(new File(origFileName)); long result = System.currentTimeMillis()-begin; System.out.println("除去重复时间为:"+result +" 毫秒"); // 除去重复时间为:931594 毫秒 } // 按hashCode 范围分割 public void partFile(File origFile) { String line = null; BufferedReader reader = null; int hashCode; try { reader = new BufferedReader(new FileReader(origFile)); while ((line = reader.readLine()) != null) { hashCode = line.hashCode(); File file = getFileByHashCode(hashCode); writeToFile(getFileWriterByHashCode(hashCode,file), line); } // 将集合对象可以内存排序的全部写入汇总文件,大于内存的继续分割 Iterator<Entry<File, BufferedWriter>> it = map.entrySet().iterator(); while(it.hasNext()){ Entry<File, BufferedWriter> entry = it.next(); File file = entry.getKey(); BufferedWriter writer = entry.getValue(); if(isSurpassFileSize(file)){ // 这里本想用再次细化空间,进行分割,发现当大量重复的元素的时候会溢出 // 极端的情况下,全是重复元素,即使以前的方法也不能执行,需要另外控制 // HASH_UNIT = HASH_UNIT/shrinkage_factor; // partFile(file); // 如果超出,继续分割 String[] number = file.getName().split("_"); long maxNum = Long.parseLong(number[0])*HASH_UNIT; long minNum = Long.parseLong(number[1].substring(0, number[1].indexOf(FILE_SUFFIX)))*HASH_UNIT; partFile(file,maxNum,minNum); }else{ writer.flush(); orderAndWriteToFiles(file); // 关闭刷新流 closeWriter(writer); file.delete(); } } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ closeReader(reader); shotDown(); } } // 清除临时文件 public void shotDown(){ Iterator<File> it = map.keySet().iterator(); while(it.hasNext()){ File file = it.next(); file.delete(); } } // 按hashCode 范围分割 public void partFile(File origFile,long maxNum,long minNum) { String line = null; long hashCode = 0; long max_left_hashCode = 0; long min_left_hashCode = 0; long max_right_hashCode = 0; long min_right_hashCode = 0; BufferedWriter rightWriter = null; BufferedWriter leftWriter = null; BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(origFile)); long midNum = (maxNum+minNum)/2; // 以文件hashCode 范围作为子文件名 File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX); File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX); leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE); rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE); ChildFile leftChild = new ChildFile(leftFile); ChildFile rightChild = new ChildFile(rightFile); // hashCode 的范围作为分割线 while ((line = reader.readLine()) != null) { hashCode = line.hashCode(); if (hashCode > midNum) { if(max_right_hashCode < hashCode || max_right_hashCode == 0){ max_right_hashCode = hashCode; }else if(min_right_hashCode > hashCode || min_right_hashCode == 0){ min_right_hashCode = hashCode; } // 按行写入缓存 writeToFile(rightWriter, line); }else { if(max_left_hashCode < hashCode || max_left_hashCode == 0){ max_left_hashCode = hashCode; }else if(min_left_hashCode > hashCode || min_left_hashCode == 0){ min_left_hashCode = hashCode; } writeToFile(leftWriter, line); } } // 保存子文件信息 leftChild.setHashCode(min_left_hashCode, max_left_hashCode); rightChild.setHashCode(min_right_hashCode, max_right_hashCode); closeWriter(rightWriter); closeWriter(leftWriter); closeReader(reader); // 删除原始文件,保留最原始的文件 if(!origFile.getName().equals(origFileName)){ origFile.delete(); } // 分析子文件信息,是否写入或者迭代 analyseChildFile(rightChild, leftChild); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } // 分析子文件信息 public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){ // 将分割后 还是大于内存的文件保存 继续分割 File rightFile = rightChild.getChildFile(); if(isSurpassFileSize(rightFile)){ bigChildFiles.add(rightChild); }else if(rightFile.length()>0){ orderAndWriteToFiles(rightFile); } File leftFile = leftChild.getChildFile(); if(isSurpassFileSize(leftFile)){ bigChildFiles.add(leftChild); }else if(leftFile.length()>0){ orderAndWriteToFiles(leftFile); } // 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出 if(bigChildFiles.size() > 0 ){ ChildFile e = bigChildFiles.get(bigChildFiles.size()-1); bigChildFiles.remove(e); // 迭代分割 partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode()); } } // 根据hashCode 值,计算存放的位置 public File getFileByHashCode(int hashCode){ int i = hashCode/HASH_UNIT; return new File(FILE_PREFIX+i+"_"+i+1+FILE_SUFFIX); } // 获得缓存流,这里缓存大小不能太大 public BufferedWriter getFileWriterByHashCode(int hashCode,File file) throws IOException{ BufferedWriter writer = null; writer = map.get(file); if(writer == null){ writer = new BufferedWriter(new FileWriter(file,true),CACHE_SIZE); map.put(file, writer); } return writer; } // 将小文件读到内存排序除重复 public void orderAndWriteToFiles(File file){ BufferedReader reader = null; String line = null; BufferedWriter totalWriter = null; StringBuilder sb = new StringBuilder(1000000); try { totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE); reader = new BufferedReader(new FileReader(file)); while((line = reader.readLine()) != null){ if(!fileLinesMap.containsKey(line)){ fileLinesMap.put(line, null); //sb.append(line+"\r\n"); totalWriter.write(line+"\r\n"); } } //totalWriter.write(sb.toString()); fileLinesMap.clear(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ closeReader(reader); closeWriter(totalWriter); file.delete(); } } // 判断该文件是否超过 内存限制 public boolean isSurpassFileSize(File file){ if(file.length() == 0){ System.out.println(file.delete());; return false; } return FILE_LIMIT_SIZE < file.length(); } // 将数据写入文件 public void writeToFile(BufferedWriter writer, String writeInfo) { try { writer.write(writeInfo+"\r\n"); } catch (IOException e) { e.printStackTrace(); }finally{ } } // 关闭流 public void closeReader(Reader reader) { if (reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } // 关闭流 public void closeWriter(Writer writer) { if (writer != null) { try { writer.flush(); writer.close(); } catch (IOException e) { e.printStackTrace(); } } } // 内部类,记录子文件信息 class ChildFile{ // 文件 和 内容 hash 分布 File childFile; long maxHashCode; long minHashCode; public ChildFile(File childFile){ this.childFile = childFile; } public ChildFile(File childFile, long maxHashCode, long minHashCode) { super(); this.childFile = childFile; this.maxHashCode = maxHashCode; this.minHashCode = minHashCode; } public File getChildFile() { return childFile; } public void setChildFile(File childFile) { this.childFile = childFile; } public long getMaxHashCode() { return maxHashCode; } public void setMaxHashCode(long maxHashCode) { this.maxHashCode = maxHashCode; } public long getMinHashCode() { return minHashCode; } public void setMinHashCode(long minHashCode) { this.minHashCode = minHashCode; } public void setHashCode(long minHashCode,long maxHashCode){ this.setMaxHashCode(maxHashCode); this.setMinHashCode(minHashCode); } } }
在数据分布均匀的均匀,不会有大量重复数据的情况下,比如用上一次的2G 文件测试,能节约5分钟左右的时间,如果全是重复数据会溢出失败,不太稳定,需要对数据做一定量的分析。
小结:
1.这是对实战一方法的一些改进,在一定程度上能提高速度。小数据(100m)和第一种速度相当,在数据hashCode分布均匀,速度较快。
2.在处理这类大数据的问题上,主要是分离法,hash分离 成内存可以容纳的文件,然后分别处理,不同的要求有不同的处理办法,但是大致方法类似。
3.有问题,请留言,共同探讨!
相关推荐
- 重复记录去除。 - **处理脏数据**:使用数据清洗工具,如Apache Beam或PySpark,根据业务需求定制清洗逻辑。 **3. 数据存储有哪些方式?请简要介绍每种方式的特点和适用场景。** - **关系型数据库**:适用于...
3. **数据预处理**:抓取到的数据往往需要清洗,包括去除重复项、处理缺失值、标准化格式等。这一步可能用到Pandas等数据处理库。 4. **数据存储**:大数据项目通常会用到分布式文件系统HDFS(Hadoop Distributed ...
- **数据清洗**:提供多种数据清洗工具,如去除重复值、填充缺失值等。 - **数据转换**:支持数据格式转换、列选择、聚合等操作,满足数据预处理需求。 - **数据分析**:内置统计分析和机器学习模型,可用于数据...
在数据清洗阶段,Map 阶段对原始数据进行拆分和映射,Reduce 阶段则对映射后的数据进行聚合和整理,以去除重复、错误或无用的信息。 2. **Hive 基本语法**:Hive 是基于 Hadoop 的数据仓库工具,提供SQL-like 查询...
数据清洗部分,源代码可能包含去除重复值、处理缺失值和异常值的函数。数据集成则可能涉及到不同数据源的数据融合,这需要理解MATLAB的数据导入导出功能。统计建模和预测分析部分,可能涵盖了线性回归、决策树、随机...
数据清洗是去除无效、重复或错误的数据;数据存储则根据数据类型选择合适的存储系统;数据分析利用统计学和机器学习算法发现数据规律;最后,通过图表等形式将结果展示出来,便于决策者理解。 四、大数据应用场景 ...
- 去除重复数据。 - 填补缺失值。 - 纠正错误数据。 - 格式统一化。 - 异常值处理。 - **工具**: Pandas(Python库)、SQL等。 ##### 3.2 数据挖掘 - **定义**: 从大量数据中提取有用信息的过程。 - **技术**:...
- **数据清洗**:包括去除重复值、填补缺失值、异常值处理等。 - **数据转换**:如数据类型转换、数据格式统一等。 #### 2.2 统计分析基础 - **描述性统计**:平均数、中位数、众数等基本统计量的计算及意义。 - **...
2. **预处理流程**:介绍大数据预处理的基本步骤,如数据清洗(去除重复、错误、不完整数据)、数据集成(合并来自不同来源的数据)、数据转换(规范化、归一化、编码)和数据规约(降维、采样)。 3. **数据清洗**...
在分析过程中,生成的txt文件可能包含了初步的数据清洗和预处理结果,如去除重复项、填充缺失值或进行数据类型转换等。而png图片文件则用于数据可视化,将复杂的数据信息转化为直观的图表,例如职位需求量的时间序列...
选取目标数据集,然后进行数据清洗,去除噪声,处理缺失值,确保数据的质量和一致性。 3. **数据挖掘**:根据数据的特性和挖掘目标,选择合适的挖掘算法,如神经网络、遗传算法、决策树、粗集方法等,对预处理后的...
本实战教程的源代码涵盖了以上各个知识点的具体实现,通过实际操作和调试,可以帮助开发者深入理解PySpark的工作原理,提升大数据处理能力。通过学习和实践这些源代码,你可以更好地掌握PySpark在大数据处理中的应用...
2. 数据清洗:数据清洗涉及去除重复数据、填充缺失值、修正错误和异常值,以确保数据质量。 3. 数据转化:数据可能需要转化为适合分析的格式,如标准化、归一化或编码,以便进行后续分析。 4. 数据分析:使用统计学...
2. **数据清洗**:通过JavaEE开发的数据处理组件对收集到的数据进行预处理,包括去除重复数据、填充缺失值等。 3. **数据分析**:利用Hadoop MapReduce编写Java程序对清洗后的数据进行统计分析,如用户行为分析、...
4. 数据清洗:数据清洗是预处理步骤,包括去除重复值、处理缺失值、解决格式不一致等问题,以提高数据质量。 5. 数据分析:这一阶段使用统计学和机器学习方法,如回归分析、聚类、分类、关联规则挖掘等,从数据中...
首先,将两个文件的内容合并为一个DataFrame或RDD,然后通过`reduceByKey(_ + _)`对键值对进行合并,最后用`distinct()`去除重复项。 2. 求平均值:这个任务需要计算多个文件中所有学生的平均成绩。首先,将所有...
2. 数据清洗:对大数据进行预处理,去除重复、错误和不一致的数据。 3. 数据加载:高效地将大量数据加载到大数据存储系统,如Hadoop HDFS或HBase。 4. 实时数据处理:通过Kafka等实时数据流平台,实现大数据的实时...
在本项目实战中,我们将深入探讨如何利用Python和KMeans算法进行文本聚类。文本聚类是无监督学习的一种应用,旨在将相似的文本分组到一起,无需预先指定类别。这个项目涵盖了从数据获取、预处理到模型构建的全过程,...
1. 数据清洗:在进行数据可视化前,首先要对原始数据进行清洗,去除错误、重复或无关的数据,确保分析结果的准确性。Excel的“查找和替换”、“删除重复项”等功能在此过程中起着关键作用。 2. 数据整理:数据应以...
【大数据离线分析项目(Hadoop)】是一个针对互联网行业的数据处理解决方案,主要涉及Hadoop生态系统中的多个组件,如Hadoop、Hive、Flume、Kafka、Shell等,旨在从海量用户行为数据中提取有价值的信息,例如页面浏览...