`
greemranqq
  • 浏览: 977633 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

大数据去除重复--实战(一)

阅读更多

          最近快过年了,来了一个紧急任务,加班加点的一周,终于上线了。也没多少时间去研究出去重复数据的算法,上一篇文章的算法,理论是可以的!但是由于我采用的行迭代的方式,JVM 会出现栈的深度溢出,我就换了一种方式,这里再次介绍给大家:

          回顾一下题目:超过内存限制的URL,去除重复数据!

   

          我的方法是根据hashCode 范围进行分组。比如文件A,假设有1亿行,A作为原始文件,然后循环读取每一行,根据hashCode 值分成两部分文件。初始我以int 的最大值和最小值为边界,0 作为分界线开始分,hashCode 大于0的放右边A-right,小于0的放一边,A-left

         假设第一次分割成 A-right , A-left 两个 文件,范围分别是2^31-0  和 0-2^31,然后记录两个文件hashCode 的范围,作为下一次分割条件,然后继续判断A-right 和A-left 文件是否是内存范围,迭代此方法,直到分割成内存能够读取的文件A-N,然后读入内存进行除重复,追加进汇总文件。

         这里临时画个图解释(图有点丑,谅解~。~):

       

 

这里我先临时写了一个生成文件的类,数据格式:www.+随即数字+.com 的形式

package com.file;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.Random;

/**
 * 生成内存不能一次读取的文件
 * @author Ran
 *
 */
public class InitFiles {
	
	private final static String PREFIX = "www.";
	private final static String SUFFIX = ".com";
	
	private final static Random RANDOM = new Random();
	// 每一次写入的行数,方便监控
	private final static int STRING_LINE_NUM = 1000*100*4;
	
	// 随即生成的数字 ,假设是网站地址
	public static final int RANDOM_NUM = Integer.MAX_VALUE;
	
	// 缓冲区大小
	static final int CACHE_SIZE = 1024*1024;
	
	// 生成文件的大小,这里就2G吧
	static final long FILE_MAX_LENGTH = 1024*1024*1024*2l;
	// 记录总行数
	static long file_lines = 0;
	public static void main(String[] args)   {
           // 看看运行状态
	    printMem();
	    long begin = System.currentTimeMillis();
	    writeFile("bigDate.txt");
	    long result = System.currentTimeMillis()-begin;
	    System.out.println("总行数"+file_lines);
	    System.out.println("生成文件总时间:"+result+"毫秒");
	}
	
	
	/**
	 * 循环写入文件,这里临时写死,生成的大概是1G左右的文件
	 */
	public static void writeFile(String fileName){
		Writer writer = null;
		File file = null;
		// 停止标志
		boolean isWrite = true;
		try {
			file = new File(fileName);
			writer = new BufferedWriter(new FileWriter(file),CACHE_SIZE);
			while(isWrite){
				writer.write(initStr());
				if(file.length() > FILE_MAX_LENGTH){
					isWrite = false;
				}
			}
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally{
			try {
				if(writer != null){
					writer.flush();
					writer.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 生成字符串,大量字符串 写入,会快一些
	 * @return
	 */
	public static String initStr(){
		StringBuilder sb = new StringBuilder(1000000);
		int i = 0;
		// 默认10W行 一次写入
		while(i<STRING_LINE_NUM){
			sb.append(PREFIX);
			sb.append(getContext());
			sb.append(SUFFIX);
			sb.append("\r\n");
			i++;
		}
		file_lines += i;
		return sb.toString();
	}
	
	/**
	 * 生成随机数,作为网站中间地址
	 * 参数 自己调节,测试方便
	 * @return
	 */
	public static String getContext(){
		String context = String.valueOf(RANDOM.nextInt(RANDOM_NUM));
		return context;
	}

       /**
	 * 每次写入时 信息
	 */
	public static void printMem(){
		print("已用内存:"+currRuntime.totalMemory()/1024/1024+" MB");
		print("最大内存:"+currRuntime.maxMemory()/1024/1024+" MB");
		print("可用内存:"+currRuntime.freeMemory()/1024/1024+" MB");
	}

       /**
	 * 方便我测试 是否打印,影响速度
	 * @param str
	 */
	public static void print(String str){
		System.out.println(str);
	}
}

 

生成2G 文件,我这里花了:

总行数:136800000 行

生成文件总时间:75812毫秒

 

下面是除去重复的类代码:

package com.files;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartFile {
	// 内存监控
	final static Runtime currRuntime = Runtime.getRuntime ();
	// 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用
	final static long MEMERY_LIMIT = 1024 * 1024 * 3;
	// 内存限制,我内存最多容纳的文件大小
	static final long FILE_LIMIT_SIZE = 1024*1024*20;
	// 文件写入缓冲区 ,我默认1M
	static final int CACHE_SIZE = 1024*1024;
	// 默认文件后缀
	static final String FILE_SUFFIX = ".txt";
	// 临时分割的文件目录,可以删除~。~
	static final String FILE_PREFIX = "test/";
	// 汇总的文件名
	static final String REQUST_FILE_NAME = "resultFile.txt";
	
	// 存放大文件 引用,以及分割位置
	static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>();
	
	// 存放小文件的,驱除重复数据
	static Map<String,String> fileLinesMap = new HashMap<String,String>(10000);
	

	public static void main(String[] args) {
	long begin = System.currentTimeMillis();
	new PartFile().partFile(new File("bigData.txt"),
                   Integer.MAX_VALUE,Integer.MIN_VALUE);
	long result = System.currentTimeMillis()-begin;
	System.out.println("除去重复时间为:"+result +" 毫秒");
	}
     
	
	// 按hashCode 范围分割
	public  void partFile(File origFile,long maxNum,long minNum) {
		String line = null;
		long hashCode = 0;
		long max_left_hashCode = 0;
		long min_left_hashCode = 0;
		long max_right_hashCode = 0;
		long min_right_hashCode = 0;
		BufferedWriter rightWriter = null;
		BufferedWriter leftWriter = null;
		BufferedReader reader = null;
		try {
		reader = new BufferedReader(new FileReader(origFile));
		long midNum = (maxNum+minNum)/2;
		// 以文件hashCode 范围作为子文件名
		File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX);
		File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX);
			
		leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE);
		rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE);
			
		ChildFile leftChild = new ChildFile(leftFile);
		ChildFile rightChild = new ChildFile(rightFile);
			
		// 字符串 组合写入也行
		// StringBuilder leftStr = new StringBuilder(100000);
		// StringBuilder rightStr = new StringBuilder(100000);
		// hashCode 的范围作为分割线
			while ((line = reader.readLine()) != null) {
				hashCode = line.hashCode();
					if (hashCode > midNum) {
						if(max_right_hashCode < hashCode || max_right_hashCode == 0){
							max_right_hashCode = hashCode;
						}else if(min_right_hashCode > hashCode || min_right_hashCode == 0){
							min_right_hashCode = hashCode;
						}
						// 按行写入缓存
						writeToFile(rightWriter, line);
				}else {
						if(max_left_hashCode < hashCode || max_left_hashCode == 0){
							max_left_hashCode = hashCode;
						}else if(min_left_hashCode > hashCode || min_left_hashCode == 0){
							min_left_hashCode = hashCode;
						}
						writeToFile(leftWriter, line);
				}
			}
			// 保存子文件信息
			leftChild.setHashCode(min_left_hashCode, max_left_hashCode);
			rightChild.setHashCode(min_right_hashCode, max_right_hashCode);
			
			closeWriter(rightWriter);
			closeWriter(leftWriter);
			closeReader(reader);
			
			
			// 删除原始文件,保留最原始的文件
			
			if(!origFile.getName().equals("bigData.txt")){
				origFile.delete();
			}
			
			// 分析子文件信息,是否写入或者迭代
			analyseChildFile(rightChild, leftChild);

		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	// 分析子文件信息
	public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){
		// 将分割后 还是大于内存的文件保存  继续分割
		File rightFile = rightChild.getChildFile();
		if(isSurpassFileSize(rightFile)){
			bigChildFiles.add(rightChild);
		}else if(rightFile.length()>0){
			orderAndWriteToFiles(rightFile);
		}
		
		File leftFile = leftChild.getChildFile();
		if(isSurpassFileSize(leftFile)){
			bigChildFiles.add(leftChild);
		}else if(leftFile.length()>0){
			orderAndWriteToFiles(leftFile);
		}
		
		// 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出
		if(bigChildFiles.size() > 0 ){
			ChildFile e  = bigChildFiles.get(bigChildFiles.size()-1);
			bigChildFiles.remove(e);
			// 迭代分割
			partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode());
		}
	}
	

	// 将小文件读到内存排序除重复
	public  void orderAndWriteToFiles(File file){
		BufferedReader reader = null;
		String line = null;
		BufferedWriter totalWriter = null;
		StringBuilder sb = new StringBuilder(1000000);
		try {
			totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE);
			reader = new BufferedReader(new FileReader(file));
			while((line = reader.readLine()) != null){
				if(!fileLinesMap.containsKey(line)){
					fileLinesMap.put(line, null);
					sb.append(line+"\r\n");
					//totalWriter.write(line+"\r\n");
				}
			}
			totalWriter.write(sb.toString());
			fileLinesMap.clear();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			closeReader(reader);
			closeWriter(totalWriter);
			// 删除子文件
			file.delete();
		}
	}
	

	
	// 判断该文件是否超过 内存限制
	public  boolean isSurpassFileSize(File file){
		return FILE_LIMIT_SIZE <  file.length();
	}
	

	// 将数据写入文件
	public  void writeToFile(BufferedWriter writer, String writeInfo) {
		try {
			writer.write(writeInfo+"\r\n");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	// 关闭流
	public  void closeReader(Reader reader) {
		if (reader != null) {
			try {
				reader.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	// 关闭流
	public  void closeWriter(Writer writer) {
		if (writer != null) {
			try {
				writer.flush();
				writer.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	
	// 内部类,记录子文件信息
	class ChildFile{
		// 文件 和 内容 hash 分布
		File childFile;
		long maxHashCode;
		long minHashCode;
		
		public ChildFile(File childFile){
			this.childFile = childFile;
		}
		
		public ChildFile(File childFile, long maxHashCode, long minHashCode) {
			super();
			this.childFile = childFile;
			this.maxHashCode = maxHashCode;
			this.minHashCode = minHashCode;
		}
		
		public File getChildFile() {
			return childFile;
		}
		public void setChildFile(File childFile) {
			this.childFile = childFile;
		}
		public long getMaxHashCode() {
			return maxHashCode;
		}
		public void setMaxHashCode(long maxHashCode) {
			this.maxHashCode = maxHashCode;
		}
		public long getMinHashCode() {
			return minHashCode;
		}
		public void setMinHashCode(long minHashCode) {
			this.minHashCode = minHashCode;
		}
		
		public void setHashCode(long minHashCode,long maxHashCode){
			this.setMaxHashCode(maxHashCode);
			this.setMinHashCode(minHashCode);
		}
		
	}
}

 

 

除去重复时间为:1228984 毫秒,20 多分钟

我的老电脑,总内存小,JVM 才几十M,大一些应该会快。 

 

方法分析:

       1.采用hashCode 范围迭代分割的方式,可以分割成内存可以容纳的小文件,然后完成功能

       2.我们发现每次迭代,相当于重复读取里面的文件,然后再进行分割,这样浪费了很多时间,那么有没有更好的方式呢? 我们可以这样设计,假设我们知道文件的总大小,已经大概的行数,比如2G,1亿行,我们一开始就分配区间,在分配完全均匀的情况下,1亿行数据,最多占用1亿个空间,那么可以这样分配,用hashCode 的范围,也就是Integer的最大值和最小值进行模拟分配。分配范围 根据内存进行,比如:读取第一行的的hashCode 值为100,那么,我们可以分配到1-1000000,(这里以100W 为单位),也就是说只要hashCode 范围在这个区间的都分配到这里,同理,读到任何一个hashCode 值,除以单位(100W),就能找到你的区间,比如hasCode 是 2345678,那么就是200W-300W的区间。这里有些区间可能空的,就删除了,有些区间很多,就用上面的迭代,空间足够 就直接写入汇总文件。当然区间单位的颗粒度划分,根据内存和数据总量 自己弄,这样下来就会一次读取 ,就能尽量的分配均匀,就不会大量迭代读取浪费时间了。

 

       3.我们发现分割的时候是直接写入,没有进行任何排序或者其他操作,如果我们在分割的时候保存一部分到集合内存,发现有重复的,就不写入分割文件了,如果写入量超过集合限制了,就清空集合,这样能保证单个小文件 一次达到除重复的效果,大文件部分除重复,减少子文件的数据量,如果重复数据较多,可以采用,极端的情况下完全不重复,那么集合会浪费你的空间,并且除重复的时候会浪费你的时间,这里自己对数据进行整体考虑。

 

       4.这里内存的控制是我测试进行控制,用JDK 的方法进行内存监控,因为涉及到回收频率 以及时间上的问题,我没有动态的对集合进行监控,占了多少内存,还剩多少内存等等,可以尝试。

 

 

小结:

        1.首先很感谢Jason.Hao     的的帮助,主动去实践,才会发现一些理论和实践的差距,更加体会深刻。

        2.在程序设计,以及一些异常控制上没多做处理,仅仅测试一下理论功能。

        3.这是根据hashCode 范围分离的方法,如果有其他方法或者建议,欢迎大家共同实践哦

 

        最后我的新书spring 到了,兴奋啊。关于《spring in action 》 我大概看了一下,主要是介绍spring 的一些使用技巧,和一些常用功能,也就是说教你如何使用,使用方便。

        关于《spring 核心技术内幕》 里面主要对spring 的一些架构设计进行分析,包括bean 加载, Ioc,AOP等,也就是说能加深你对源码的一些理解,稍微深一点。记得还有一本关于 精通AOP的书名字忘记了,有好的,欢迎分享,推荐哦。

        

  • 大小: 6.8 KB
分享到:
评论
1 楼 woommy 2014-01-26  
很好,有始有终。这个方法的核心原理是,hashcode不同的字符串一定不是相同的字符串,所以经过hashcode分开后的文件之间不存在重复的字符串,然后每个文件又可以读入内存,处理起来就容易了。但是如果对性能有要求还有优化的空间。

相关推荐

    史上最全的大数据面试题-大数据开发者必看.docx

    - 重复记录去除。 - **处理脏数据**:使用数据清洗工具,如Apache Beam或PySpark,根据业务需求定制清洗逻辑。 **3. 数据存储有哪些方式?请简要介绍每种方式的特点和适用场景。** - **关系型数据库**:适用于...

    大数据项目实战—招聘网站大数据职位分析

    在大数据项目实战中,"招聘网站大数据职位分析"是一个典型的案例,它涵盖了多个重要的IT知识点。这个项目旨在通过收集、处理、分析招聘网站上的职位信息,挖掘出行业趋势、热门技能以及人才需求等有价值的数据。 1....

    大数据连接工具 waterdrop

    - **数据清洗**:提供多种数据清洗工具,如去除重复值、填充缺失值等。 - **数据转换**:支持数据格式转换、列选择、聚合等操作,满足数据预处理需求。 - **数据分析**:内置统计分析和机器学习模型,可用于数据...

    第17章 大数据平台运行与应用实战.docx

    在数据清洗阶段,Map 阶段对原始数据进行拆分和映射,Reduce 阶段则对映射后的数据进行聚合和整理,以去除重复、错误或无用的信息。 2. **Hive 基本语法**:Hive 是基于 Hadoop 的数据仓库工具,提供SQL-like 查询...

    Python大数据处理库 PySpark实战-源代码.rar

    本实战教程的源代码涵盖了以上各个知识点的具体实现,通过实际操作和调试,可以帮助开发者深入理解PySpark的工作原理,提升大数据处理能力。通过学习和实践这些源代码,你可以更好地掌握PySpark在大数据处理中的应用...

    《实战大数据》课本源代码

    《实战大数据》这本书是针对大数据处理的一本实践性教材,主要使用MATLAB作为工具进行学习和探索。MATLAB是一款强大的数学计算和数据分析软件,尤其在数值分析、图像处理、信号处理以及机器学习等领域有着广泛的应用...

    大数据开发教程.docx

    - 去除重复数据。 - 填补缺失值。 - 纠正错误数据。 - 格式统一化。 - 异常值处理。 - **工具**: Pandas(Python库)、SQL等。 ##### 3.2 数据挖掘 - **定义**: 从大量数据中提取有用信息的过程。 - **技术**:...

    大数据课程课件PDF...

    数据清洗是去除无效、重复或错误的数据;数据存储则根据数据类型选择合适的存储系统;数据分析利用统计学和机器学习算法发现数据规律;最后,通过图表等形式将结果展示出来,便于决策者理解。 四、大数据应用场景 ...

    51job大数据分析师岗位情况项目.rar

    总的来说,51job大数据分析师岗位情况项目为我们提供了一个全面了解大数据分析师市场需求的窗口,通过Python的实战应用,揭示了这个岗位的动态变化和潜在机会。对于有意从事或正在从事大数据分析工作的专业人士,该...

    大数据实验报告总结体会-大数据挖掘流程及方法总结.pdf

    选取目标数据集,然后进行数据清洗,去除噪声,处理缺失值,确保数据的质量和一致性。 3. **数据挖掘**:根据数据的特性和挖掘目标,选择合适的挖掘算法,如神经网络、遗传算法、决策树、粗集方法等,对预处理后的...

    1_Level Ⅰ业务数据分析师 24.9G的学习资料 资料全面,包含大纲和学习计划表 百度网盘

    - **数据清洗**:包括去除重复值、填补缺失值、异常值处理等。 - **数据转换**:如数据类型转换、数据格式统一等。 #### 2.2 统计分析基础 - **描述性统计**:平均数、中位数、众数等基本统计量的计算及意义。 - **...

    JavaEE大数据

    2. **数据清洗**:通过JavaEE开发的数据处理组件对收集到的数据进行预处理,包括去除重复数据、填充缺失值等。 3. **数据分析**:利用Hadoop MapReduce编写Java程序对清洗后的数据进行统计分析,如用户行为分析、...

    大数据预处理PPT、讲稿、脚本等资源.zip

    2. **预处理流程**:介绍大数据预处理的基本步骤,如数据清洗(去除重复、错误、不完整数据)、数据集成(合并来自不同来源的数据)、数据转换(规范化、归一化、编码)和数据规约(降维、采样)。 3. **数据清洗**...

    免费大数据资源.zip

    4. 数据清洗:数据清洗是预处理步骤,包括去除重复值、处理缺失值、解决格式不一致等问题,以提高数据质量。 5. 数据分析:这一阶段使用统计学和机器学习方法,如回归分析、聚类、分类、关联规则挖掘等,从数据中...

    大数据技术原理及应用课实验7 :Spark初级编程实践

    首先,将两个文件的内容合并为一个DataFrame或RDD,然后通过`reduceByKey(_ + _)`对键值对进行合并,最后用`distinct()`去除重复项。 2. 求平均值:这个任务需要计算多个文件中所有学生的平均成绩。首先,将所有...

    专题资料(2021-2022年)大数据数据分析方法、数据处理流程实战案例.docx

    2. 数据清洗:数据清洗涉及去除重复数据、填充缺失值、修正错误和异常值,以确保数据质量。 3. 数据转化:数据可能需要转化为适合分析的格式,如标准化、归一化或编码,以便进行后续分析。 4. 数据分析:使用统计学...

    【项目实战】Python基于KMeans算法进行文本聚类项目实战

    在本项目实战中,我们将深入探讨如何利用Python和KMeans算法进行文本聚类。文本聚类是无监督学习的一种应用,旨在将相似的文本分组到一起,无需预先指定类别。这个项目涵盖了从数据获取、预处理到模型构建的全过程,...

    大数据ETL开发之图解Kettle工具(入门到精通)

    2. 数据清洗:对大数据进行预处理,去除重复、错误和不一致的数据。 3. 数据加载:高效地将大量数据加载到大数据存储系统,如Hadoop HDFS或HBase。 4. 实时数据处理:通过Kafka等实时数据流平台,实现大数据的实时...

    福建师范大学精品大数据导论课程系列 (8.7.1)--7.3.2 《Excel数据可视化方法与应用》.rar

    《Excel数据可视化方法与应用》是福建师范大学精品大数据导论课程系列的一部分,专注于利用Excel工具进行数据可视化。在大数据时代,数据可视化已经成为理解和解析复杂数据的关键技能,而Excel作为广泛使用的数据...

    大数据离线分析项目(Hadoop) (3).docx

    【大数据离线分析项目(Hadoop)】是一个针对互联网行业的数据处理解决方案,主要涉及Hadoop生态系统中的多个组件,如Hadoop、Hive、Flume、Kafka、Shell等,旨在从海量用户行为数据中提取有价值的信息,例如页面浏览...

Global site tag (gtag.js) - Google Analytics