浏览 3926 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2008-09-25
翻译者:pconlin900 博客:http://pconline900.iteye.com Hadoop是apache的一个开源的map-reduce框架,MapReduce是一个并行计算模型,用来处理海量数据。模型思想来源于google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()两个主要的功能。 这是一个很简单的类似于Hadoop的MapReduce应用例子,应用了mapreduce的基本思想,可以帮助理解hadoop的处理思想和技术,但注意,它没有使用hadoop框架。 例子的功能是创建一些字符串,然后统计这些字符串里面每个字符出现的次数,最后汇总得到总的字符出现次数。 Listing 1. 主程序 public class Main { public static void main(String[] args) { MyMapReduce my = new MyMapReduce(); my.init(); } } Listing 2. MyMapReduce.java import java.util.*; public class MyMapReduce { List buckets = new ArrayList(); List intermediateresults = new ArrayList(); List values = new ArrayList(); public void init() { for(int i = 1; i<=30; i++) { values.add("http://pconline900.iteye.com" + new Integer(i).toString()); } System.out.println("**STEP 1 START**-> Running Conversion into Buckets**"); System.out.println(); List b = step1ConvertIntoBuckets(values,5); System.out.println("************STEP 1 COMPLETE*************"); System.out.println(); System.out.println(); System.out.println("**STEP 2 START**->Running **Map Function** concurrently for all Buckets"); System.out.println(); List res = step2RunMapFunctionForAllBuckets(b); System.out.println("************STEP 2 COMPLETE*************"); System.out.println(); System.out.println(); System.out.println("**STEP 3 START**->Running **Reduce Function** for collating Intermediate Results and Printing Results"); System.out.println(); step3RunReduceFunctionForAllBuckets(res); System.out.println("************STEP 3 COMPLETE*************"); System.out.println("************pconline900 翻译*************"); System.out.println("***********博客:http://pconline900.iteye.com*************"); } public List step1ConvertIntoBuckets(List list,int numberofbuckets) { int n = list.size(); int m = n / numberofbuckets; int rem = n% numberofbuckets; int count = 0; System.out.println("BUCKETS"); for(int j =1; j<= numberofbuckets; j++) { List temp = new ArrayList(); for(int i=1; i<= m; i++) { temp.add((String)values.get(count)); count++; } buckets.add(temp); temp = new ArrayList(); } if(rem != 0) { List temp = new ArrayList(); for(int i =1; i<=rem;i++) { temp.add((String)values.get(count)); count++; } buckets.add(temp); } System.out.println(); System.out.println(buckets); System.out.println(); return buckets; } public List step2RunMapFunctionForAllBuckets(List list) { for(int i=0; i< list.size(); i++) { List elementList = (ArrayList)list.get(i); new StartThread(elementList).start(); } try { Thread.currentThread().sleep(1000); }catch(Exception e) { } return intermediateresults; } public void step3RunReduceFunctionForAllBuckets(List list) { int sum =0; for(int i=0; i< list.size(); i++) { //you can do some processing here, like finding max of all results etc int t = Integer.parseInt((String)list.get(i)); sum += t; } System.out.println(); System.out.println("Total Count is "+ sum); System.out.println(); } class StartThread extends Thread { private List tempList = new ArrayList(); public StartThread(List list) { tempList = list; } public void run() { for(int i=0; i< tempList.size();i++) { String str = (String)tempList.get(i); synchronized(this) { intermediateresults.add(new Integer(str.length()).toString()); } } } } } init()方法创建了一些测试数据,作为测试数据。实际应用中会是海量数据处理。 step1ConvertIntoBuckets()方法将测试数据拆分到5个 bucket中,每个bucket是一个ArrayList(包含6个String数据)。bucket可以保存在内存,磁盘,或者集群中的其他节点; step2RunMapFunctionForAllBuckets()方法创建了5个线程(每个bucket一个),每个线程StartThread处理每个bucket并把处理结果放在intermediateresults这个arraylist中。 如果bucket分配给不同的节点处理,必须有一个master主控节点监控各个节点的计算,汇总各个节点的处理结果,若有节点失败,master必须能够分配计算任务给其他节点计算。\ step3RunReduceFunctionForAllBuckets()方法加载intermediateresults中间处理结果,并进行汇总处理,最后得到最终的计算结果。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2009-05-05
怎么没有继续翻译呢,不知能不能把英文的文章地址给出?谢谢
|
|
返回顶楼 | |