`
zxxapple
  • 浏览: 80282 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

现阶段MapReduce框架 实现简单图的算法

阅读更多

刚开始接触hadoop的mapreduce并行计算的编程框架,使用的是java语言,对于一些简单的日志文档处理,相当的容易上手,但是经过一段时间的学习调研,发现用其实现一些图的算法,相当蹩脚,效率很低。。。

 

下面我列出下mapreduce实现图的单源最短路径的算法(伪代码)这里假设的是每个节点之间是单源节点

1: class Mapper
2: method Map(nid n,node N)
3: d ! N.Distance
4: Emit(nid n,N) Passalonggraphstructure
5: forall nodeid m ! N.AdjacencyListdo
6: Emit(nid m,d +1) Emitdistancestoreachablenodes

1: class Reducer
2:       method Reduce(nid m,[d ,d ,...])
3:      dmin=Max
4:      M=0
5:      forall d ! counts [d ,d ,...] do
6:     if IsNode(d) then
7:     M=d 
8:    elseif d<dmin then 
       dmin=d
9:    M.Distance=dmin
10:  Emit(nid m,node M)

 其中的要点是(采用的广度优先搜索的思想)

 

这只是其中的一次迭代,每次迭代都是一个job,而且每个job之间需要传输一个特殊的键值对  包含整个图的完整信息。

还有一个问题整体的迭代次数,什么时候停止呢  如何判断,我这里采用的Counter计数器,记录到达源节点的距离为Max的个数,根据这个个数来判断是否该停止工作,工作之间的数据传输,最好采用二进制文件来传输方便处理。图在mapreduce中存储是采用临界表的方式进行存储,比较方便

 

如果实现节点之间的边带权值的也很简单  只需要将上述的伪代码  d+1 改为d+对应的全知即可

 

整个工作需要一个驱动程序来控制作业。。下面是一个简单的实现 伪代码

public static void main(String[] args) throws Exception
	{
		long count = 0;
		int it=1;
		String input="/Graph/input/";
		String output="/Graph/output1";
		do
		{
			Job job = getNodeJob(input, output);
			job.waitForCompletion(true);
			Counters counter = job.getCounters();
			count = counter.findCounter(ReachableNodes.Reduce).getValue();
			
			input="/Graph/output"+it+"/part-r-00000";
			it++;
			output="/Graph/output"+it;
		} while (count != agr[0]);//所有节点都可达的时候结束迭代--
	}
	
	public static Job getNodeJob(String inPath,String outPath) throws Exception
	{
		Configuration conf = new Configuration();
		Job job=new Job(conf,"Iterable Node");
		
		job.setJarByClass(GraphDrive.class);
		
		job.setNumReduceTasks(1);

		job.setMapperClass(GraphMapper.class);
		job.setReducerClass(GraphReducer.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(BFSNode.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(BFSNode.class);
		
		job.setInputFormatClass(SequenceFileInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		
		FileInputFormat.addInputPath(job, new Path(inPath));
		FileOutputFormat.setOutputPath(job, new Path(outPath));
		
		FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除
		
		return job;	
	}

 

 

下面我来说说我的问题,能帮我解惑的朋友谢谢指正,当把一个图存放到一个文件的时候 也就是一个map的时候 ,比较容易实现--很容易收敛,,但是如果采用多个map任务的时候  收敛会很慢  甚至死循环。。。很困惑

 

分享到:
评论

相关推荐

    Ch5-MapReduce算法设计1

    MapReduce是一种分布式计算框架,由Google开发,用于处理和生成大型数据集。它将复杂的计算任务分解为两个主要阶段:Map和Reduce,以及一个中间的Shuffle和Sort步骤。Map阶段将输入数据拆分为键值对,然后进行局部...

    大数据环境下的高效分布式增量序列挖掘.pdf

    文献[12]提出了一种云均匀分布式词汇序列树算法,该算法使用两阶段MapReduce框架来发现序列模式,能在云中提供完美的负载平衡和高度的可扩展性。 综合来看,本文的研究重点在于大数据环境下的增量序列模式挖掘问题...

    Hadoop从业者为什么需要Spark?

    5,Mahout前一阶段表示从现在起他们将不再接受任何形式的以MapReduce形式实现的算法,另外一方面,Mahout宣布新的算法基于Spark; 6,Cloudera的机器学习框架Oryx的执行引擎也将由Hadoop的MapReduce替换成Spark; ...

    spark技术原理(精华版)~亚当.pdf

    此外,Spark技术栈非常强大,包含SQL查询、流式计算、机器学习(MLlib)和图算法(GraphX)组件。这些组件可以无缝整合到同一个应用中,以应对各种复杂的计算需求。通过将数据处理、数据分析、机器学习等多种计算...

    spark大数据实践

    2. **DAGScheduler**:依赖关系图调度器负责将用户的程序转换成一系列的任务,并将这些任务划分成不同的阶段进行调度。 3. **TaskScheduler**:任务调度器负责具体的任务调度,如任务的分配和执行等。 4. **Block ...

    大数据相关的一些资料

    Map阶段将数据分解为键值对,Reduce阶段则将这些对进行聚合,以生成最终结果。Hadoop因其可扩展性和灵活性,被广泛应用于互联网公司、科研机构和企业内部的数据处理。 Storm是实时大数据处理系统,它可以连续处理...

    架构师月刊200907-201002期

    1. **云计算兴起**:2009年至2010年正值云计算技术快速发展阶段,杂志可能详细讨论了云计算的定义、分类(如IaaS、PaaS、SaaS),以及Amazon AWS、Google App Engine等早期云服务提供商的案例分析。 2. **SOA(面向...

Global site tag (gtag.js) - Google Analytics