前边提到了单线程的实现,这里贴出多线程版,此处主要用多线程去处理hash后的小文件:
- package com.kingdee.gmis.mass.data.ips;
- import static com.kingdee.gmis.mass.data.ips.MassIP.K10;
- import static com.kingdee.gmis.mass.data.ips.MassIP.getPartitionFile;
- import static com.kingdee.gmis.mass.data.ips.MassIP.partationCount;
- import static com.kingdee.gmis.mass.data.ips.MassIP.printResult;
- import static com.kingdee.gmis.mass.data.ips.MassIP.*;
- import java.io.BufferedInputStream;
- import java.io.DataInputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicInteger;
- import com.kingdee.gmis.common.TopNHeap;
- import com.kingdee.gmis.mass.data.ips.MassIP.IPInfo;
- public class ConcurrentMassIP {
- private static int workerCnt = 3;
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- // generateMassIp("ip", "ips.txt", 100000000);
- // generatePartitionFile("ip", "ips.txt", 100);
- searchTopN(20);
- }
- private static AtomicInteger curIdx = new AtomicInteger(-1);
- static File[] smallFiles;
- static TopNHeap<IPInfo> destHeap;
- /**
- * 查找出现次数最多的K个ip地址
- *
- * @param count
- * @throws Exception
- */
- public static void searchTopN(int count) throws Exception {
- Thread.sleep(10000);
- long start = System.currentTimeMillis();
- smallFiles = getPartitionFile("ip", partationCount);
- destHeap = new TopNHeap<MassIP.IPInfo>(count);
- int cnt = workerCnt;
- synchronized (lock) {
- for (int i = 0; i < cnt; i++) {
- new Thread(new Worker(i, count)).start();
- }
- lock.wait();
- printResult(destHeap);
- }
- System.out.println("Total spend " + (System.currentTimeMillis() - start) + " ms");
- }
- static Object lock = new Object();
- public static synchronized void mergeToResult(TopNHeap<IPInfo> srcHeap) {
- try {
- destHeap.mergeHeap(srcHeap);
- } finally {
- if (--workerCnt == 0) {
- synchronized (lock) {
- lock.notify();
- }
- }
- }
- }
- static class Worker implements Runnable {
- private int topCount;
- private int id;
- public Worker(int id, int count) {
- this.id = id;
- this.topCount = count;
- }
- @Override
- public void run() {
- int curFileIdx;
- DataInputStream dis = null;
- Map<Integer, Integer> ipCountMap = new HashMap<Integer, Integer>();
- TopNHeap<IPInfo> heap = new TopNHeap<IPInfo>(topCount);
- int processCnt = 0;
- while ((curFileIdx = curIdx.addAndGet(1)) < partationCount) {
- processCnt++;
- ipCountMap.clear();
- try {
- dis = new DataInputStream(new BufferedInputStream(
- new FileInputStream(smallFiles[curFileIdx]), K10));
- while (dis.available() > 0) {
- int ip = dis.readInt();
- Integer cnt = ipCountMap.get(ip);
- ipCountMap.put(ip, cnt == null ? 1 : cnt + 1);
- }
- searchMostCountIps(ipCountMap, heap);
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- if (dis != null) {
- try {
- dis.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- System.out.println("Thread " + this.id + " process " + processCnt
- + " files.");
- mergeToResult(heap);
- }
- }
- }
测试了下,3线程是120s左右,串行是320s左右,的确提高了很多。测试机子是4核cpu,如果启4个线程,机器都变得很卡,cpu居高不下。另jvm启动参数为:
- -server
- -Xmx1024m
- -Xms1024m
- -Xmn600m
- -XX:+UseConcMarkSweepGC
- -XX:ParallelGCThreads=4
因为此处理过程中,大多数对象存活周期并不长,所以可以把新生代设置大一些。堆初始化设置大些,避免minor gc的时候才去扩展内存大小,因为可以预料到程序一旦启动,加载的内存的东西就会很多。
另外,此处垃圾收集器是用了cms,老生代内存回收就用了cms,新生代用了PurNew,并行收集的,设置gc线程数等于cpu内核数。当然这里也可以设置成-XX:+UseParallelGC、-XX:+UseParallelOldGC都可以,此处主要是新生代内存回收频繁,所以一定要把新生代设置成并发或并行版本的。
通过-server把虚拟机启动为server模式,这样运行时候会启用c2级别的JIT优化,能获得更高质量的编译代码。当然server模式下启动的jvm,默认使用的gc收集器跟-XX:UseParallelGC使用的一样
http://yueyemaitian.iteye.com/blog/1207913
相关推荐
标题 "提取出某日访问网站次数最多的那K个IP" 涉及的是数据分析和数据处理方面的技术,主要目标是从海量的日志数据中找出在特定日期内访问网站频率最高的K个IP地址。在这个过程中,我们可以使用多种编程语言和工具来...
1.提取出某日访问百度次数最多的那个IP 2.有一个1G大小的一个文件,里面每一行是一个词 3.给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url? 4.在2.5亿个整数中找出...
在这个场景中,我们有海量日志数据,问题是如何提取出某日访问百度次数最多的那个 IP? 解决方案 1:首先是这一天,并且是访问百度的日志中的 IP 取出来,逐个写入到一个大文件中,然后采用映射的方法,找出每个小...
第一部分、十道海量数据处理面试题 1、海量日志数据,提取出某日访问百度次数最多的那个IP。 此题,在我之前的一篇文章算法里头有所提到,当时给出的方案是:IP的数目还是有限的,最多2^32个,所以可以考虑使用hash...
示例中的代码均基于python2.7.10##问题1海量日志,提取出某日访问百度次数最多的那个IP。 ##问题2寻找热门查询,300万个查询字符串中统计最热门的10个查询。 ##问题3有一个1G大小的一个文件,里面每行是一个词,词的...
大数据面试题目的解决方法可以应用于实际的数据处理中,例如,在海量日志数据中,使用Bloom filter可以快速判断某个元素是否存在于一个集合中,而使用Hashing可以快速提取出某日访问百度次数最多的那个IP。...
1. 提取出某日访问百度次数最多的IP 这是一个典型的分布式计算问题。首先,通过对IP地址取模将海量日志分散到多个小文件中,例如模1000,生成1000个小文件。然后,对每个小文件使用哈希映射(如hash_map)统计IP地址...
一、海量日志数据,提取出某日访问百度次数最多的那个 IP 这是一个典型的 hash 表应用问题,解决方案是使用 hash 表将 IP 直接存入内存,然后进行统计。在实际操作中,可以将大文件映射为多个小文件,然后找出每个...
海量日志数据,提取出某日访问百度次数最多的那个IP。 方案 1:首先是这一天,并且是访问百度的日志中的IP 取出来,逐个写入到一个大文件中。注意到 IP 是 32 位的,最多有个IP。同样可以采用映射的方法,比如模...
提取出某日访问百度次数最多的IP #### 题目描述 从海量日志数据中,找出某天访问百度次数最多的IP地址。 #### 解答思路 - 使用哈希策略将日志数据分布到多个小文件中,例如通过IP地址的哈希值对1024取模,得到...
对于海量日志数据,例如需要提取出某日访问百度次数最多的那个IP,可以使用映射的方法,即%1000将整个大文件映射为1000个小文件,然后逐个写入到一个大文件中,然后对每个小文件中的所有IP进行频率统计,最后在这...
1. 提取出某日访问百度次数最多的 IP: 这题通常需要使用流式处理,如 MapReduce 或者 Hadoop。可以先通过 Map 阶段对每个 IP 进行计数,然后在 Reduce 阶段汇总并找出最大值。 2. 找出频数最高的 100 个词: ...
提取某日访问百度次数最多的那个IP** **问题描述**:给定一天内的大量日志数据,需要从中提取出访问百度次数最多的那个IP。 **解决方案**:考虑到IP地址的数量有限,最多2^32个,可以考虑使用哈希表(如`hash_map...
4. 海量日志数据分析:有海量日志数据,要求提取出某日访问百度次数最多的那个 IP。解决方案是首先提取出这一天的日志中的 IP,然后使用映射的方法将其分割为 1000 个小文件,最后找出每个小文件中出现频率最大的 IP...
2. 海量日志数据,提取出某日访问百度次数最多的那个 IP。 3. 如何根据输入元素个数 n,确定位数组 m 的大小及哈希函数个数 k。 这些问题可以使用上述方法来解决,例如使用 Bloom filter 或 Hashing 等方法来查找、...
4. 给定海量日志数据,需要提取出某日访问百度次数最多的那个IP。解决方法是使用HashMap来统计每个IP的访问次数,然后使用堆排序来输出访问次数最多的IP。 5. 给定2.5亿个整数,需要找出不重复的整数,内存空间不...
**1、海量日志数据,提取出某日访问百度次数最多的那个IP** - **问题概述**: 给定一天内的海量日志数据,从中找出访问百度次数最多的IP地址。 - **解决方案**: - **初步思路**: 将所有IP地址写入一个大文件,考虑...
* 海量日志数据,提取出某日访问百度次数最多的那个IP(大数据处理) * 有10个文件,每个文件1G,每个文件的每一行都存放的是用户的query,每个文件的query都可能重复。如何按照query的频度排序?(大数据处理) ...
问题实例:海量日志数据,提取出某日访问百度次数最多的那个IP。IP的数目还是有限的,最多2^32个,所以可以考虑使用hash将ip直接存入内存,然后进行统计。 3.bit-map: Bit-map是一种常见的大数据处理方法,适用...