刚开始接触hadoop的mapreduce并行计算的编程框架,使用的是java语言,对于一些简单的日志文档处理,相当的容易上手,但是经过一段时间的学习调研,发现用其实现一些图的算法,相当蹩脚,效率很低。。。
下面我列出下mapreduce实现图的单源最短路径的算法(伪代码)这里假设的是每个节点之间是单源节点
1: class Mapper
2: method Map(nid n,node N)
3: d ! N.Distance
4: Emit(nid n,N) Passalonggraphstructure
5: forall nodeid m ! N.AdjacencyListdo
6: Emit(nid m,d +1) Emitdistancestoreachablenodes
1: class Reducer
2: method Reduce(nid m,[d ,d ,...])
3: dmin=Max
4: M=0
5: forall d ! counts [d ,d ,...] do
6: if IsNode(d) then
7: M=d
8: elseif d<dmin then
dmin=d
9: M.Distance=dmin
10: Emit(nid m,node M)
其中的要点是(采用的广度优先搜索的思想)
这只是其中的一次迭代,每次迭代都是一个job,而且每个job之间需要传输一个特殊的键值对 包含整个图的完整信息。
还有一个问题整体的迭代次数,什么时候停止呢 如何判断,我这里采用的Counter计数器,记录到达源节点的距离为Max的个数,根据这个个数来判断是否该停止工作,工作之间的数据传输,最好采用二进制文件来传输方便处理。图在mapreduce中存储是采用临界表的方式进行存储,比较方便
如果实现节点之间的边带权值的也很简单 只需要将上述的伪代码 d+1 改为d+对应的全知即可
整个工作需要一个驱动程序来控制作业。。下面是一个简单的实现 伪代码
public static void main(String[] args) throws Exception
{
long count = 0;
int it=1;
String input="/Graph/input/";
String output="/Graph/output1";
do
{
Job job = getNodeJob(input, output);
job.waitForCompletion(true);
Counters counter = job.getCounters();
count = counter.findCounter(ReachableNodes.Reduce).getValue();
input="/Graph/output"+it+"/part-r-00000";
it++;
output="/Graph/output"+it;
} while (count != agr[0]);//所有节点都可达的时候结束迭代--
}
public static Job getNodeJob(String inPath,String outPath) throws Exception
{
Configuration conf = new Configuration();
Job job=new Job(conf,"Iterable Node");
job.setJarByClass(GraphDrive.class);
job.setNumReduceTasks(1);
job.setMapperClass(GraphMapper.class);
job.setReducerClass(GraphReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(BFSNode.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(BFSNode.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(inPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除
return job;
}
下面我来说说我的问题,能帮我解惑的朋友谢谢指正,当把一个图存放到一个文件的时候 也就是一个map的时候 ,比较容易实现--很容易收敛,,但是如果采用多个map任务的时候 收敛会很慢 甚至死循环。。。很困惑
分享到:
相关推荐
MapReduce是一种分布式计算框架,由Google开发,用于处理和生成大型数据集。它将复杂的计算任务分解为两个主要阶段:Map和Reduce,以及一个中间的Shuffle和Sort步骤。Map阶段将输入数据拆分为键值对,然后进行局部...
文献[12]提出了一种云均匀分布式词汇序列树算法,该算法使用两阶段MapReduce框架来发现序列模式,能在云中提供完美的负载平衡和高度的可扩展性。 综合来看,本文的研究重点在于大数据环境下的增量序列模式挖掘问题...
5,Mahout前一阶段表示从现在起他们将不再接受任何形式的以MapReduce形式实现的算法,另外一方面,Mahout宣布新的算法基于Spark; 6,Cloudera的机器学习框架Oryx的执行引擎也将由Hadoop的MapReduce替换成Spark; ...
此外,Spark技术栈非常强大,包含SQL查询、流式计算、机器学习(MLlib)和图算法(GraphX)组件。这些组件可以无缝整合到同一个应用中,以应对各种复杂的计算需求。通过将数据处理、数据分析、机器学习等多种计算...
2. **DAGScheduler**:依赖关系图调度器负责将用户的程序转换成一系列的任务,并将这些任务划分成不同的阶段进行调度。 3. **TaskScheduler**:任务调度器负责具体的任务调度,如任务的分配和执行等。 4. **Block ...
Map阶段将数据分解为键值对,Reduce阶段则将这些对进行聚合,以生成最终结果。Hadoop因其可扩展性和灵活性,被广泛应用于互联网公司、科研机构和企业内部的数据处理。 Storm是实时大数据处理系统,它可以连续处理...
1. **云计算兴起**:2009年至2010年正值云计算技术快速发展阶段,杂志可能详细讨论了云计算的定义、分类(如IaaS、PaaS、SaaS),以及Amazon AWS、Google App Engine等早期云服务提供商的案例分析。 2. **SOA(面向...