`
sharong
  • 浏览: 494408 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
D1667ae2-8cfc-3b68-ac7c-5e282789fa4a
论开源
浏览量:8747
7eb53364-fe48-371c-9623-887640be0185
Spring-data-j...
浏览量:13098
社区版块
存档分类
最新评论

分治和hash-从海量数据大文件中查出5分钟内连续登陆超过阈值的ip地址

 
阅读更多
一个很大的文件,例如10G,仅包含ip地址和访问时间二列,格式如下:
127.0.0.1   2013-07-22 14:00
127.0.0.1   2013-07-22 14:02
127.0.0.1   2013-07-22 14:03
127.0.0.3   2013-07-22 14:03
127.0.0.1   2013-07-22 14:04
127.0.0.1   2013-07-22 14:05
127.0.0.1   2013-07-22 14:06
127.0.0.1   2013-07-22 14:07
127.0.0.1   2013-07-22 14:08
127.0.0.1   2013-07-22 14:09
127.0.0.1   2013-07-22 14:10
127.0.0.1   2013-07-22 14:11
127.0.0.1   2013-07-22 14:12
127.0.0.1   2013-07-22 14:13
127.0.0.4   2013-07-22 14:13
127.0.0.1   2013-07-22 14:15
127.0.0.1   2013-07-22 14:16
127.0.0.4   2013-07-22 14:17
... ...

从文件里查出在5分钟内连续登陆10次以上的ip地址集合并输出。这类问题是一个很常见的应用,通常都是从大的log日志文件中找出有攻击嫌疑的ip。

这类应用因为要处理分析的文件非常大,显然不能将整个文件全部读入内存,然后进行下一步工作。常见的比较成熟的解决方案有:分治+Hash,Bloom filter,2-Bitmap等。可参考
http://blog.csdn.net/v_JULY_v/article/details/6279498
这里就使用第一种方式来解决。
下面是分治与hash的代码
public class DuplicateIP {
	private String delimiter = " ";
	private String FILE_PRE = "ip_";
	
	private int MAGIC = 10,BATCH_MAGIC = 500;
	private String root = "/DuplicateIP/";
	
	private String filename = "";
	
	public DuplicateIP(final String filename) {
		this.filename = filename;
	}
	
	/**
	 * 将大文件拆分成较小的文件,进行预处理
	 * @throws IOException
	 */
	private void preProcess() throws IOException {
		//Path newfile = FileSystems.getDefault().getPath(filename);
		BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));			
		// 用5M的缓冲读取文本文件
		BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024);
		
		//假设文件是10G,那么先根据hashcode拆成小文件,再进行读写判断
		//如果不拆分文件,将ip地址当做key,访问时间当做value存到hashmap时,
		//当来访的ip地址足够多的情况下,内存开销吃不消
//		List<Entity> entities = new ArrayList<Entity>();
		
		//存放ip的hashcode->accessTimes集合
		Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>();
		String line = "";
		int count = 0;
		while((line = reader.readLine()) != null){
			String split[] = line.split(delimiter);
			if(split != null && split.length >= 2){
				//根据ip的hashcode这样拆分文件,拆分后的文件大小在1G上下波动
				//极端情况是整个文件的ip地址全都相同,只有一个,那么拆分后还是只有一个文件
				int serial = split[0].trim().hashCode() % MAGIC;

				String splitFilename = FILE_PRE + serial;
				List<String> lines = hashcodeMap.get(splitFilename);
				if(lines == null){
					lines = new ArrayList<String>();
					
					hashcodeMap.put(splitFilename, lines);
				}
				lines.add(line);
			}
			
			count ++;
			if(count > 0 && count % BATCH_MAGIC == 0){
			    for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){  
			        //System.out.println(entry.getKey()+"--->"+entry.getValue());
			    	DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));	
			    }  
				//一次操作500之后清空,重新执行
				hashcodeMap.clear();
			}
		}
		
		reader.close();
		fis.close();
	}
	
	private boolean process() throws IOException{
		Path target = Paths.get(root);
		
		//ip -> List<Date>
		Map<String,List<Date>> resMap = new HashMap<String,List<Date>>();
		this.recurseFile(target,resMap);
		
		for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){
			System.out.println(entry.getKey());
			for(Date date : entry.getValue()){
				System.out.println(date);
			}			
		}
		
		return true;
	}
	
	/**
	 * 递归执行,将5分钟内访问超过阈值的ip找出来
	 * 
	 * @param parent
	 * @return
	 * @throws IOException 
	 */
	private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{
		//Path target = Paths.get(dir);
		if(!Files.exists(parent) || !Files.isDirectory(parent)){
			return;
		}
		
		Iterator<Path> targets = parent.iterator();
		for(;targets.hasNext();){
			Path path = targets.next();
			if(Files.isDirectory(parent)){
				//如果还是目录,递归
				recurseFile(path.toAbsolutePath(),resMap);
			}else {
				//将一个文件中所有的行读上来
				List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8"));
				judgeAndcollection(lines,resMap);
			}			
		}
	}
	
	/**
	 * 根据从较小文件读上来的每行ip accessTimes进行判断符合条件的ip
	 * 并放入resMap
	 * 
	 * @param lines
	 * @param resMap
	 */
	private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
		if(lines != null){
			//ip->List<String>accessTimes
			Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
			for(String line : lines){
				line = line.trim();
				int space = line.indexOf(delimiter);
				String ip = line.substring(0, space);
				
				List<String> accessTimes = judgeMap.get(ip);
				if(accessTimes == null){
					accessTimes = new ArrayList<String>();					
				}
				accessTimes.add(line.substring(space + 1).trim());
				judgeMap.put(ip, accessTimes);
			}
			
			if(judgeMap.size() == 0){
				return;
			}
			
			for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
				List<String> acessTimes = entry.getValue();
				//相同ip,先判断整体大于10个
				if(acessTimes != null && acessTimes.size() >= MAGIC){
					//开始判断在List集合中,5分钟内访问超过MAGIC=10
					List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC);
					if(attackTimes != null){
						resMap.put(entry.getKey(), attackTimes);
					}
				}
			}
		}
	}
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		String filename = "/DuplicateIP/log.txt";
		DuplicateIP dip = new DuplicateIP(filename);
		try {
			dip.preProcess();
			dip.process();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

下面是工具类,提供了一些文件读写及查找的功能
public class DuplicateUtils {
	/**
	 * 根据给出的数据,往给定的文件形参中追加一行或者几行数据
	 * 
	 * @param file
	 * @throws IOException 
	 */
	public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException {
		if(accessTimes != null){
			Path target = Paths.get(splitFilename);
			if(target == null){
				createFile(splitFilename);
			}
			return Files.write(target, accessTimes, cs);//, options)
		}
		
		return null;
	}
	
	/**
	 * 创建文件
	 * @throws IOException 
	 */
	public static void createFile(String splitFilename) throws IOException {
		Path target = Paths.get(splitFilename);
		Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-");
		FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
		Files.createFile(target, attr);
	}
	
	public static Date stringToDate(String dateStr,String dateStyle){
		if(dateStr == null || "".equals(dateStr))
			return null;
		
		DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss"); 
		try {
			return format.parse(dateStr);
		} catch (ParseException e) {
			e.printStackTrace();
			return null;
		}  
	}
	
	public static String dateToString(Date date,String dateStyle){
		if(date == null)
			return null;
		
		DateFormat format = new SimpleDateFormat(dateStyle); 
		return format.format(date);
	}
	
	/**
	 * 根据间隔时间,判断列表中的数据是否已经大于magic给出的魔法数
	 * 返回true or false
	 * 
	 * @param dates
	 * @param intervalDate
	 * @param magic
	 * @return
	 * @throws ParseException 
	 */
	public static boolean attack(List<String> dateStrs,long intervalDate,int magic) {
		if(dateStrs == null || dateStrs.size() < magic){
			return false;
		}
		
		List<Date> dates = new ArrayList<Date>();
		for(String date : dateStrs){
			if(date != null && !"".equals(date))
				dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
		}
		
		Collections.sort(dates);
		return judgeAttack(dates,intervalDate,magic);
	}
	
	public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){
		if(sequenceDates == null || sequenceDates.size() < magic){
			return false;
		}
		
		for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
			Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate);
			
			int count = 1;
			for(int i = x + 1;i< sequenceDates.size();i++){
				Date compareDate = sequenceDates.get(i);
			
				if(compareDate.before(dateAfter5))
					count ++ ;
				else 
					break;
			}
			
			if(count >= magic)
				return true;
		}

		return false;
	}
	
	/**
	 * 判断在间隔时间内,是否有大于magic的上限的数据集合,
	 * 如果有,则返回满足条件的集合
	 * 如果找不到满足条件的,就返回null
	 * 
	 * @param sequenceDates 已经按照时间顺序排序了的数组
	 * @param intervalDate
	 * @param magic
	 * @return
	 */
	public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
		if(sequenceDates == null || sequenceDates.size() < magic){
			return null;
		}
		
		List<Date> res = new ArrayList<Date>();
		for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
			Date souceDate = sequenceDates.get(x);			
			Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
			
			res.add(souceDate);
			
			for(int i = x + 1;i< sequenceDates.size();i++){
				Date compareDate = sequenceDates.get(i);
			
				if(compareDate.before(dateAfter5)){
					res.add(compareDate);
				}else 
					break;
			}
			
			if(res.size() >= magic)
				return res;
			else 
				res.clear();
		}

		return null;
	}

	public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
		if(dateStrs == null || dateStrs.size() < magic){
			return null;
		}
		
		List<Date> dates = new ArrayList<Date>();
		for(String date : dateStrs){
			if(date != null && !"".equals(date))
				dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss"));
		}
		
		Collections.sort(dates);
		return attackTimes(dates,intervalDate,magic);
	}
}
分享到:
评论

相关推荐

    使用Hash散列从海量IP地址中查找IP地址

    标题中的“使用Hash散列从海量IP地址中查找IP地址”是一个关于数据结构与算法的实践应用,主要涉及如何高效地在大量IP地址中进行查找操作。在这个问题中,使用了哈希(Hash)技术,它是一种将任意大小的数据映射到...

    python_geohash-0.8.5-cp310-cp310-win_amd64.whl.zip

    4. **范围查询**:通过设置一个Geohash的边界,可以获取该区域内所有其他Geohash,这对于地图应用和地理大数据分析非常关键。 在实际应用中,Geohash常用于地理信息系统(GIS)、社交网络的附近好友查找、物流配送...

    python_geohash-0.8.5-cp38-cp38-win_amd64.whl.zip

    `python_geohash-0.8.5-cp38-cp38-win_amd64.whl` 是一个专门为Python 3.8编译的Windows 64位平台的安装包,`.whl` 文件是Python的Wheel格式,它是预编译的Python包,可以直接通过pip安装,避免了编译过程,提高了...

    海量数据处理:十道面试题与十个海量数据处理方法总结

    - **问题概述**: 给定一天内的海量日志数据,从中找出访问百度次数最多的IP地址。 - **解决方案**: - **初步思路**: 将所有IP地址写入一个大文件,考虑到IP地址的范围为2^32,可以使用映射的方法将这些IP分散到较小...

    python_geohash-0.8.5-cp312-cp312-win_amd64.whl.zip

    `python_geohash-0.8.5-cp312-cp312-win_amd64.whl` 文件包含了Python Geohash库的源代码和所需的依赖项,已经预先编译为特定平台的字节码,用户可以通过`pip`工具轻松安装到Python环境中。例如,使用以下命令可以...

    python_geohash-0.8.5-cp39-cp39-win_amd64.whl.zip

    标题中的"python_geohash-0.8.5-cp39-cp39-win_amd64.whl.zip"表明这是一个与Python相关的压缩包,其中包含了一个名为"python_geohash-0.8.5-cp39-cp39-win_amd64.whl"的文件,这个文件是Python的轮子(wheel)格式...

    python_geohash-0.8.5-cp37-cp37m-win_amd64.whl.zip

    标题中的"python_geohash-0.8.5-cp37-cp37m-win_amd64.whl.zip"表明这是一个与Python相关的压缩包,特别提到了`geohash`,它是一个用于处理地理位置数据的库。版本号是0.8.5,`cp37`和`cp37m`指的是它适用于Python ...

    python_geohash-0.8.5-cp311-cp311-win_amd64.whl.zip

    标题中的"python_geohash-0.8.5-cp311-cp311-win_amd64.whl.zip"是一个Python软件包的压缩文件,它包含了Python的GeoHash库的一个特定版本。GeoHash是一种地理位置编码技术,用于将经纬度坐标转换成可存储和查询的...

    python_geohash-0.8.5-cp36-cp36m-win_amd64.whl.zip

    标题中的"python_geohash-0.8.5-cp36-cp36m-win_amd64.whl.zip"表明这是一个与Python相关的库,名为`geohash`,版本为0.8.5。它是一个适用于Python 3.6(cp36代表Python 3.6)且构建于Windows AMD64架构(win_amd64...

    海量数据处理

    海量数据处理是指在合理的时间内,对大规模数据集进行高效存储、管理和分析的技术过程。这种处理方式不仅涉及到数据的收集、清洗和存储,更重要的是通过各种算法和技术来实现数据分析和挖掘。 #### 二、海量数据...

    thinkphp5-使用SimHash进行海量内容数据查重

    在IT行业中,尤其是在大数据处理和搜索引擎优化的领域,内容查重是一个重要的问题。ThinkPHP5是一个流行的PHP框架,它提供了丰富的功能和工具来帮助开发者构建高效的应用程序。本篇文章将详细探讨如何在ThinkPHP5中...

    shiro-crypto-hash-1.4.0-API文档-中文版.zip

    赠送jar包:shiro-crypto-hash-1.4.0.jar; 赠送原API文档:shiro-crypto-hash-1.4.0-javadoc.jar; 赠送源代码:shiro-crypto-hash-1.4.0-sources.jar; 赠送Maven依赖信息文件:shiro-crypto-hash-1.4.0.pom; ...

    前端开源库-hash-stream

    总结,`hash-stream`是一个强大的工具,它简化了前端开发者在处理文件和数据流时计算哈希值的过程。通过流式处理,`hash-stream`不仅提高了效率,还降低了内存占用,是构建安全、可靠前端应用的重要组件。

    十道海量数据处理面试题

    例如,对于访问日志数据提取IP的问题,可以采用哈希(Hash)方法将数据分散到1024个小文件中,每个文件分别统计IP出现频率,最后合并结果找出频率最高的IP。 其次,需要掌握如哈希表、堆(Heap)、trie树等数据结构...

    文件hash算法,对大文件进行hash计算

    文件哈希算法是信息安全领域中的一个重要概念,它用于验证数据的完整性和一致性。在处理大文件时,这种算法尤其有用,因为它们能快速地生成一个固定长度的唯一标识符(哈希值),即使文件内容有微小变化,哈希值也会...

    python_geohash-0.8.5-cp35-cp35m-win_amd64

    python_geohash-0.8.5-cp35-cp35m-win_amd64

    Hash-1.0.4文件校验

    “可检大文件”意味着该工具功能强大,能够处理大容量的文件,这对于检查大型数据文件的完整性和一致性至关重要。“同时显示MD5,SHA1,CRC32”是关键特性,这些是三种常见的哈希算法,用于生成文件的数字指纹。MD5...

    python_geohash-0.8.5-cp37-cp37m-win_amd64

    python_geohash-0.8.5-cp37-cp37m-win_amd64

    shiro-crypto-hash-1.4.0-API文档-中英对照版.zip

    赠送jar包:shiro-crypto-hash-1.4.0.jar; 赠送原API文档:shiro-crypto-hash-1.4.0-javadoc.jar; 赠送源代码:shiro-crypto-hash-1.4.0-sources.jar; 赠送Maven依赖信息文件:shiro-crypto-hash-1.4.0.pom; ...

Global site tag (gtag.js) - Google Analytics