`
greemranqq
  • 浏览: 975552 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

大数据去除重复--实战(二)

阅读更多

           关于上一篇数据去重复的问题,在结尾的时候提到,另一种思路:在url-->hashCode 根据范围写入文件的时候,不用迭代二分法,采用平均算法,也就是说根据url的大概行数,设置一个单位区间,循环遍历行的时候,根据hashCode 值,放入不同的空间,然后再放入内存去除重复,写入汇总文件。

          去个例子,我文件数据2G,1.5亿行,自己设定一个区间值1000W。 也就是说0~1000W、1000W~2000W 。。。为一个区间,每行都会通过计算放入不同的区间,这样在数据均匀的情况下,分布式很均匀的,如果某一个区间值过多,超过内存限制,继续切割,这样会大大减少第一种算法带来的文件数据的重复读取的效率。

          看代码,这里生产文件的代码参考前一篇文章。

          

        

package com.files;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class PartFile2 {
	// 内存监控
	final static Runtime currRuntime = Runtime.getRuntime ();
	// 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用
	final static long MEMERY_LIMIT = 1024 * 1024 * 3;
	// 内存限制,我内存最多容纳的文件大小
	static final long FILE_LIMIT_SIZE = 1024*1024*1;
	// 文件写入缓冲区 ,我默认
	static final int CACHE_SIZE = 1024;
	// 默认文件后缀
	static final String FILE_SUFFIX = ".txt";
	// 临时分割的文件目录,可以删除~。~
	static final String FILE_PREFIX = "test/";
	// 汇总的文件名
	static final String REQUST_FILE_NAME = "resultFile.txt";

	
	// 存放小文件的,驱除重复数据
	static Map<String,String> fileLinesMap = new HashMap<String,String>(10000);
	// 按hashCode 分割的单位
	static int HASH_UNIT = 1000*1000*10;
	// 收缩率,HASH_UNIT 在一定范围内,数据过大,用这个调节之后继续分割
	static final int shrinkage_factor = 1000;
	// 存放分割d
	static Map<File,BufferedWriter> map = new HashMap<File, BufferedWriter>();
	
	// 存放大文件 引用,以及分割位置
	static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>();
	
	static final String origFileName = "xxx.txt";

	public static void main(String[] args) {
		long begin = System.currentTimeMillis();
		new PartFile2().partFile(new File(origFileName));
		long result = System.currentTimeMillis()-begin;
		System.out.println("除去重复时间为:"+result +" 毫秒");
		// 除去重复时间为:931594 毫秒
	}
     
	
	// 按hashCode 范围分割
	public  void partFile(File origFile) {
		String line = null;
		BufferedReader reader = null;
		int hashCode;
		try {
			reader = new BufferedReader(new FileReader(origFile));
			while ((line = reader.readLine()) != null) {
				hashCode = line.hashCode(); 
				File file = getFileByHashCode(hashCode);
				writeToFile(getFileWriterByHashCode(hashCode,file), line);
			}
			// 将集合对象可以内存排序的全部写入汇总文件,大于内存的继续分割
			Iterator<Entry<File, BufferedWriter>> it = map.entrySet().iterator();
			while(it.hasNext()){
				Entry<File, BufferedWriter> entry = it.next();
				File file = entry.getKey();
				BufferedWriter writer = entry.getValue();
				if(isSurpassFileSize(file)){
					// 这里本想用再次细化空间,进行分割,发现当大量重复的元素的时候会溢出
					// 极端的情况下,全是重复元素,即使以前的方法也不能执行,需要另外控制
					// HASH_UNIT = HASH_UNIT/shrinkage_factor;
					// partFile(file);
					
					// 如果超出,继续分割
					String[] number = file.getName().split("_");
					long maxNum = Long.parseLong(number[0])*HASH_UNIT;
					long minNum = Long.parseLong(number[1].substring(0, number[1].indexOf(FILE_SUFFIX)))*HASH_UNIT;
					partFile(file,maxNum,minNum);
				}else{
					writer.flush();
					orderAndWriteToFiles(file);
					// 关闭刷新流
					closeWriter(writer);
					file.delete();
				}
			}
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			closeReader(reader);
			shotDown();
		}
	}
	
	// 清除临时文件
	public void shotDown(){
		Iterator<File> it = map.keySet().iterator();
		while(it.hasNext()){
			File file = it.next();
			file.delete();
		}
	}
	
	// 按hashCode 范围分割
	public  void partFile(File origFile,long maxNum,long minNum) {
		String line = null;
		long hashCode = 0;
		long max_left_hashCode = 0;
		long min_left_hashCode = 0;
		long max_right_hashCode = 0;
		long min_right_hashCode = 0;
		BufferedWriter rightWriter = null;
		BufferedWriter leftWriter = null;
		BufferedReader reader = null;
		try {
			reader = new BufferedReader(new FileReader(origFile));
			long midNum = (maxNum+minNum)/2;
			// 以文件hashCode 范围作为子文件名
			File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX);
			File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX);
			
			leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE);
			rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE);
			
			ChildFile leftChild = new ChildFile(leftFile);
			ChildFile rightChild = new ChildFile(rightFile);
			
			// hashCode 的范围作为分割线
			while ((line = reader.readLine()) != null) {
				hashCode = line.hashCode();
					if (hashCode > midNum) {
						if(max_right_hashCode < hashCode || max_right_hashCode == 0){
							max_right_hashCode = hashCode;
						}else if(min_right_hashCode > hashCode || min_right_hashCode == 0){
							min_right_hashCode = hashCode;
						}
						// 按行写入缓存
						writeToFile(rightWriter, line);
				}else {
						if(max_left_hashCode < hashCode || max_left_hashCode == 0){
							max_left_hashCode = hashCode;
						}else if(min_left_hashCode > hashCode || min_left_hashCode == 0){
							min_left_hashCode = hashCode;
						}
						writeToFile(leftWriter, line);
				}
			}
			// 保存子文件信息
			leftChild.setHashCode(min_left_hashCode, max_left_hashCode);
			rightChild.setHashCode(min_right_hashCode, max_right_hashCode);
			
			closeWriter(rightWriter);
			closeWriter(leftWriter);
			closeReader(reader);

			
			// 删除原始文件,保留最原始的文件
			
			if(!origFile.getName().equals(origFileName)){
				origFile.delete();
			}
			
			// 分析子文件信息,是否写入或者迭代
			analyseChildFile(rightChild, leftChild);

		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	
	// 分析子文件信息
	public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){
		// 将分割后 还是大于内存的文件保存  继续分割
		File rightFile = rightChild.getChildFile();
		if(isSurpassFileSize(rightFile)){
			bigChildFiles.add(rightChild);
		}else if(rightFile.length()>0){
			orderAndWriteToFiles(rightFile);
		}
		
		File leftFile = leftChild.getChildFile();
		if(isSurpassFileSize(leftFile)){
			bigChildFiles.add(leftChild);
		}else if(leftFile.length()>0){
			orderAndWriteToFiles(leftFile);
		}
		
		// 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出
		if(bigChildFiles.size() > 0 ){
			ChildFile e  = bigChildFiles.get(bigChildFiles.size()-1);
			bigChildFiles.remove(e);
			// 迭代分割
			partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode());
		}
	}
	
	
	// 根据hashCode 值,计算存放的位置
	public File getFileByHashCode(int hashCode){
		int i = hashCode/HASH_UNIT;
		return new File(FILE_PREFIX+i+"_"+i+1+FILE_SUFFIX);
	}
	
	// 获得缓存流,这里缓存大小不能太大
	public BufferedWriter getFileWriterByHashCode(int hashCode,File file) throws IOException{
		BufferedWriter writer = null;
		writer = map.get(file);
		if(writer == null){
			writer = new BufferedWriter(new FileWriter(file,true),CACHE_SIZE);
			map.put(file, writer);
		}
		return writer;
	}
	

	// 将小文件读到内存排序除重复
	public  void orderAndWriteToFiles(File file){
		BufferedReader reader = null;
		String line = null;
		BufferedWriter totalWriter = null;
		StringBuilder sb = new StringBuilder(1000000);
		try {
			totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE);
			reader = new BufferedReader(new FileReader(file));
			while((line = reader.readLine()) != null){
				if(!fileLinesMap.containsKey(line)){
					fileLinesMap.put(line, null);
					//sb.append(line+"\r\n");
					totalWriter.write(line+"\r\n");
				}
			}
			//totalWriter.write(sb.toString());
			fileLinesMap.clear();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			closeReader(reader);
			closeWriter(totalWriter);
			file.delete();
		}
	}
	

	
	// 判断该文件是否超过 内存限制
	public  boolean isSurpassFileSize(File file){
		if(file.length() == 0){
			System.out.println(file.delete());;
			return false;
		}
		return FILE_LIMIT_SIZE <  file.length();
	}
	

	// 将数据写入文件
	public  void writeToFile(BufferedWriter writer, String writeInfo) {
		try {
			writer.write(writeInfo+"\r\n");
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
		}
	}

	// 关闭流
	public  void closeReader(Reader reader) {
		if (reader != null) {
			try {
				reader.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	// 关闭流
	public  void closeWriter(Writer writer) {
		if (writer != null) {
			try {
				writer.flush();
				writer.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	// 内部类,记录子文件信息
	class ChildFile{
		// 文件 和 内容 hash 分布
		File childFile;
		long maxHashCode;
		long minHashCode;
		
		public ChildFile(File childFile){
			this.childFile = childFile;
		}
		
		public ChildFile(File childFile, long maxHashCode, long minHashCode) {
			super();
			this.childFile = childFile;
			this.maxHashCode = maxHashCode;
			this.minHashCode = minHashCode;
		}
		
		public File getChildFile() {
			return childFile;
		}
		public void setChildFile(File childFile) {
			this.childFile = childFile;
		}
		public long getMaxHashCode() {
			return maxHashCode;
		}
		public void setMaxHashCode(long maxHashCode) {
			this.maxHashCode = maxHashCode;
		}
		public long getMinHashCode() {
			return minHashCode;
		}
		public void setMinHashCode(long minHashCode) {
			this.minHashCode = minHashCode;
		}
		
		public void setHashCode(long minHashCode,long maxHashCode){
			this.setMaxHashCode(maxHashCode);
			this.setMinHashCode(minHashCode);
		}
		
	}

}

 

在数据分布均匀的均匀,不会有大量重复数据的情况下,比如用上一次的2G 文件测试,能节约5分钟左右的时间,如果全是重复数据会溢出失败,不太稳定,需要对数据做一定量的分析。

 

 

 

小结:

           1.这是对实战一方法的一些改进,在一定程度上能提高速度。小数据(100m)和第一种速度相当,在数据hashCode分布均匀,速度较快。

           2.在处理这类大数据的问题上,主要是分离法,hash分离 成内存可以容纳的文件,然后分别处理,不同的要求有不同的处理办法,但是大致方法类似。

           3.有问题,请留言,共同探讨!

分享到:
评论

相关推荐

    史上最全的大数据面试题-大数据开发者必看.docx

    - 重复记录去除。 - **处理脏数据**:使用数据清洗工具,如Apache Beam或PySpark,根据业务需求定制清洗逻辑。 **3. 数据存储有哪些方式?请简要介绍每种方式的特点和适用场景。** - **关系型数据库**:适用于...

    大数据项目实战—招聘网站大数据职位分析

    3. **数据预处理**:抓取到的数据往往需要清洗,包括去除重复项、处理缺失值、标准化格式等。这一步可能用到Pandas等数据处理库。 4. **数据存储**:大数据项目通常会用到分布式文件系统HDFS(Hadoop Distributed ...

    大数据连接工具 waterdrop

    - **数据清洗**:提供多种数据清洗工具,如去除重复值、填充缺失值等。 - **数据转换**:支持数据格式转换、列选择、聚合等操作,满足数据预处理需求。 - **数据分析**:内置统计分析和机器学习模型,可用于数据...

    第17章 大数据平台运行与应用实战.docx

    在数据清洗阶段,Map 阶段对原始数据进行拆分和映射,Reduce 阶段则对映射后的数据进行聚合和整理,以去除重复、错误或无用的信息。 2. **Hive 基本语法**:Hive 是基于 Hadoop 的数据仓库工具,提供SQL-like 查询...

    《实战大数据》课本源代码

    数据清洗部分,源代码可能包含去除重复值、处理缺失值和异常值的函数。数据集成则可能涉及到不同数据源的数据融合,这需要理解MATLAB的数据导入导出功能。统计建模和预测分析部分,可能涵盖了线性回归、决策树、随机...

    大数据课程课件PDF...

    数据清洗是去除无效、重复或错误的数据;数据存储则根据数据类型选择合适的存储系统;数据分析利用统计学和机器学习算法发现数据规律;最后,通过图表等形式将结果展示出来,便于决策者理解。 四、大数据应用场景 ...

    大数据开发教程.docx

    - 去除重复数据。 - 填补缺失值。 - 纠正错误数据。 - 格式统一化。 - 异常值处理。 - **工具**: Pandas(Python库)、SQL等。 ##### 3.2 数据挖掘 - **定义**: 从大量数据中提取有用信息的过程。 - **技术**:...

    1_Level Ⅰ业务数据分析师 24.9G的学习资料 资料全面,包含大纲和学习计划表 百度网盘

    - **数据清洗**:包括去除重复值、填补缺失值、异常值处理等。 - **数据转换**:如数据类型转换、数据格式统一等。 #### 2.2 统计分析基础 - **描述性统计**:平均数、中位数、众数等基本统计量的计算及意义。 - **...

    大数据预处理PPT、讲稿、脚本等资源.zip

    2. **预处理流程**:介绍大数据预处理的基本步骤,如数据清洗(去除重复、错误、不完整数据)、数据集成(合并来自不同来源的数据)、数据转换(规范化、归一化、编码)和数据规约(降维、采样)。 3. **数据清洗**...

    51job大数据分析师岗位情况项目.rar

    在分析过程中,生成的txt文件可能包含了初步的数据清洗和预处理结果,如去除重复项、填充缺失值或进行数据类型转换等。而png图片文件则用于数据可视化,将复杂的数据信息转化为直观的图表,例如职位需求量的时间序列...

    大数据实验报告总结体会-大数据挖掘流程及方法总结.pdf

    选取目标数据集,然后进行数据清洗,去除噪声,处理缺失值,确保数据的质量和一致性。 3. **数据挖掘**:根据数据的特性和挖掘目标,选择合适的挖掘算法,如神经网络、遗传算法、决策树、粗集方法等,对预处理后的...

    Python大数据处理库 PySpark实战-源代码.rar

    本实战教程的源代码涵盖了以上各个知识点的具体实现,通过实际操作和调试,可以帮助开发者深入理解PySpark的工作原理,提升大数据处理能力。通过学习和实践这些源代码,你可以更好地掌握PySpark在大数据处理中的应用...

    专题资料(2021-2022年)大数据数据分析方法、数据处理流程实战案例.docx

    2. 数据清洗:数据清洗涉及去除重复数据、填充缺失值、修正错误和异常值,以确保数据质量。 3. 数据转化:数据可能需要转化为适合分析的格式,如标准化、归一化或编码,以便进行后续分析。 4. 数据分析:使用统计学...

    JavaEE大数据

    2. **数据清洗**:通过JavaEE开发的数据处理组件对收集到的数据进行预处理,包括去除重复数据、填充缺失值等。 3. **数据分析**:利用Hadoop MapReduce编写Java程序对清洗后的数据进行统计分析,如用户行为分析、...

    免费大数据资源.zip

    4. 数据清洗:数据清洗是预处理步骤,包括去除重复值、处理缺失值、解决格式不一致等问题,以提高数据质量。 5. 数据分析:这一阶段使用统计学和机器学习方法,如回归分析、聚类、分类、关联规则挖掘等,从数据中...

    大数据技术原理及应用课实验7 :Spark初级编程实践

    首先,将两个文件的内容合并为一个DataFrame或RDD,然后通过`reduceByKey(_ + _)`对键值对进行合并,最后用`distinct()`去除重复项。 2. 求平均值:这个任务需要计算多个文件中所有学生的平均成绩。首先,将所有...

    大数据ETL开发之图解Kettle工具(入门到精通)

    2. 数据清洗:对大数据进行预处理,去除重复、错误和不一致的数据。 3. 数据加载:高效地将大量数据加载到大数据存储系统,如Hadoop HDFS或HBase。 4. 实时数据处理:通过Kafka等实时数据流平台,实现大数据的实时...

    【项目实战】Python基于KMeans算法进行文本聚类项目实战

    在本项目实战中,我们将深入探讨如何利用Python和KMeans算法进行文本聚类。文本聚类是无监督学习的一种应用,旨在将相似的文本分组到一起,无需预先指定类别。这个项目涵盖了从数据获取、预处理到模型构建的全过程,...

    福建师范大学精品大数据导论课程系列 (8.7.1)--7.3.2 《Excel数据可视化方法与应用》.rar

    1. 数据清洗:在进行数据可视化前,首先要对原始数据进行清洗,去除错误、重复或无关的数据,确保分析结果的准确性。Excel的“查找和替换”、“删除重复项”等功能在此过程中起着关键作用。 2. 数据整理:数据应以...

    大数据离线分析项目(Hadoop) (3).docx

    【大数据离线分析项目(Hadoop)】是一个针对互联网行业的数据处理解决方案,主要涉及Hadoop生态系统中的多个组件,如Hadoop、Hive、Flume、Kafka、Shell等,旨在从海量用户行为数据中提取有价值的信息,例如页面浏览...

Global site tag (gtag.js) - Google Analytics