`
luweimstr
  • 浏览: 19161 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

最短路径Mapreduce实现

阅读更多

 

¢A map task receives
lKey: node n
lValue: D (distance from start); points-to (list of nodes reachable from n)
¢p \in  points-to: emit (p, D+1)
¢The reduce task gathers possible distances to a given p and selects the minimum one
input文件格式为:<node id><distance to the start node><[node id it can reach]:>
map过程中根据每个node id的distance输出“VALUE”类型键值对:[(node id it can reach):(distance + 1)]。如果distance是inf,则不输出“VALUE”类型的键值对;另外还需要输出node自身的连接信息,即“NODE”类型的键值对:<node id>:<[node id it can reach]:>。
Reduce过程输入为<node id>[VALUE distance]/[NODE node id it can reach:]。计算出VALUE list中的最小值,输出为<node id><min of distance><node id it can reach> (和输入文件格式一致)。
Mapper代码:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DjksMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

	private String MAXDIS = Integer.MAX_VALUE + "";

	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {

	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ
		// @ Maryland)
		// Key is node n
		// Value is D, Points-To
		// For every point (or key), look at everything it points to.
		// Emit or write to the points to variable with the current distance + 1
		Text word = new Text();
		String line = value.toString();// looks like 1 0 2:3:
		String[] sp = line.split(" ");// splits on space
		if (sp[1].compareTo(MAXDIS) != 0) { // we don't care those lines with
											// MAX distance
			int distanceadd = Integer.parseInt(sp[1]) + 1;
			String[] PointsTo = sp[2].split(":");
			for (int i = 0; i < PointsTo.length; i++) {
				word.set("VALUE " + distanceadd);// tells me to look at distance
													// value
				context.write(new LongWritable(Integer.parseInt(PointsTo[i])),
						word);
				word.clear();
			}
		}
		// pass in current node's distance (if it is the lowest distance)
		word.set("VALUE " + sp[1]);
		context.write(new LongWritable(Integer.parseInt(sp[0])), word);
		word.clear();

		word.set("NODES " + sp[2]);// tells me to append on the final tally
		context.write(new LongWritable(Integer.parseInt(sp[0])), word);
		word.clear();

	}
}
Reducer代码:
package com.hadoop.dijkstra;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class DjksReducer extends
		Reducer<LongWritable, Text, LongWritable, Text> {

	public void reduce(LongWritable key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin, Univ
		// @ Maryland)
		// The key is the current point
		// The values are all the possible distances to this point
		// we simply emit the point and the minimum distance value

		String nodes = "UNMODED";
		Text word = new Text();
		int lowest = Integer.MAX_VALUE;// start at infinity

		for (Text val : values) {// looks like NODES/VALUES 1 0 2:3:, we need to
									// use the first as a key
			String[] sp = val.toString().split(" ");// splits on space
			// look at first value
			if (sp[0].equalsIgnoreCase("NODES")) {
				nodes = null;
				nodes = sp[1];
			} else if (sp[0].equalsIgnoreCase("VALUE")) {
				int distance = Integer.parseInt(sp[1]);
				lowest = Math.min(distance, lowest);
			}
		}
		word.set(lowest + " " + nodes);
		context.write(key, word);
		word.clear();
	}
}
 
         算法需要迭代多次最终得到各个顶点到start顶点的最短距离。每个迭代round都是一次mapreduce。后一轮的输入为前一轮的输出,直到结果与上一轮相同。主程序代码:
package com.hadoop.dijkstra;

import java.io.*;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Djkstra {
	public static String OUT = "outfile";
	public static String IN = "inputlarger";

	public static void main(String args[]) throws IOException,
			InterruptedException, ClassNotFoundException {

		// set in and out to args.
		IN = args[0];
		OUT = args[1];

		String infile = IN;
		String outputfile = OUT + System.nanoTime();

		boolean isdone = false;
		@SuppressWarnings("unused")
		boolean success = false;

		HashMap<Integer, Integer> _map = new HashMap<Integer, Integer>();

		while (isdone == false) {
			Configuration conf = new Configuration();
			conf.set("mapred.textoutputformat.separator", " ");// make the key
																// -> value
																// space
																// separated
																// (for
																// iterations)
			Job job = new Job(conf);
			job.setJarByClass(Djkstra.class);
			job.setJobName("Dijkstra");

			job.setMapperClass(DjksMapper.class);
			job.setReducerClass(DjksReducer.class);

			FileInputFormat.addInputPath(job, new Path(infile));
			FileOutputFormat.setOutputPath(job, new Path(outputfile));

			job.setMapOutputKeyClass(LongWritable.class);
			job.setMapOutputValueClass(Text.class);
			job.setOutputKeyClass(LongWritable.class);
			job.setOutputValueClass(Text.class);

			success = job.waitForCompletion(true);

			// remove the input file
			if (infile != IN) {
				String indir = infile.replace("part-r-00000", "");
				Path ddir = new Path(indir);
				FileSystem dfs = FileSystem.get(conf);
				dfs.delete(ddir, true);
			}

			// output path as the input path for next round
			// TODO: what if there are more than one reducers?
			infile = outputfile + "/part-r-00000";
			outputfile = OUT + System.nanoTime();

			// do we need to re-run the job with the new input file??
			isdone = true;// set the job to NOT run again!
			Path ofile = new Path(infile);
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(
					fs.open(ofile)));

			HashMap<Integer, Integer> imap = new HashMap<Integer, Integer>();
			String line = br.readLine();
			while (line != null) {
				// each line looks like 0 1 2:3:
				// we need to verify node -> distance doesn't change
				String[] sp = line.split(" ");
				int node = Integer.parseInt(sp[0]);
				int distance = Integer.parseInt(sp[1]);
				imap.put(node, distance);
				line = br.readLine();
			}
			if (_map.isEmpty()) {
				// first iteration... must do a second iteration regardless!
				isdone = false;
			} else {
				Iterator<Integer> itr = imap.keySet().iterator();
				while (itr.hasNext()) {
					int key = itr.next();
					int val = imap.get(key);
					if (_map.get(key) != val) {
						// values aren't the same... we aren't at convergence
						// yet (iterate until results are the same as last round)
						isdone = false;
					}
				}
			}
			if (isdone == false) {
				_map.putAll(imap);// copy imap to _map for the next iteration
									// (if required)
			}
		}

	}
}
 
 
 
 
 

 

分享到:
评论

相关推荐

    MapReduce实现单元最短路径算法.doc

    《MapReduce实现单元最短路径算法》 MapReduce是一种分布式计算模型,常用于处理大数据集。在本文档中,我们探讨了如何利用MapReduce来解决图论中的单元最短路径问题。单元最短路径问题旨在寻找图中从一个源节点到...

    单源最短路径算法(MapReduce)源代码

    本篇文章主要解析一个关于单源最短路径(Single Source Shortest Path, SSSP)算法在MapReduce框架下的实现源代码。单源最短路径问题是指在一个带有权重的有向图或无向图中找到从一个指定的起点到所有其他顶点的最短...

    算法项目-实时最短路径

    实时最短路径算法是计算机科学领域中的一个重要...通过理解和掌握以上知识点,可以设计并实现一个高效的实时最短路径算法。这个项目可能涵盖了从基础的图论概念到高级的并行计算技术,是深入学习和实践算法的好平台。

    最短路径算法分类体系与研究进展

    此外,分布式系统如MapReduce也可用于大规模图的最短路径计算,将问题分解为多个子任务并行处理。 最后,研究进展部分可能涵盖了最新的优化策略和算法创新,如动态更新最短路径、在线算法、近似算法以及适应复杂...

    基于云计算的大规模交通路网的最短路径算法.pdf

    总之,基于云计算的大规模交通路网最短路径算法,通过利用MapReduce并行编程模型,实现了并行搜索方法,大大提高了大规模交通路网最短路径计算的效率和准确性。这项技术的发展和应用,对于提升智能交通系统的整体...

    基于云计算求解城市物流配送最短路径研究.pdf

    针对物流配送中路径优化的需求,本研究提出了一种基于云计算的求解方法,通过MapReduce并行算法和GIS仿真技术来解决大规模城市路网中的物流配送最短路径问题。这种方法可以有效地处理海量数据,并在有限的时间内给出...

    最短路径系列之一从零开始学习HADOOP

    最短路径系列之一从零开始学习HADOOP,只要有一台能上网的计算机,就可以让读者在最短的时间内,学会Hadoop的初级开发。所以,这本书只讲干货,也就是必须要知道的Hadoop的最核心知识点,包括如何搭建Hadoop,如何写...

    MapReduce求解物流配送单源最短路径研究

    【MapReduce求解物流配送单源最短路径研究】 随着电子商务的快速发展,物流配送成为了一个重要的环节,如何高效地规划配送路线以降低成本和提高效率,成为业界亟待解决的问题。本文针对这一挑战,提出了一种利用...

    最短路径系列之一从零开始学习Hadoop

    通过本书,读者能够在最短的时间内掌握Hadoop的初级知识,并具备搭建Hadoop环境、编写基础MapReduce程序以及查询API的能力。而对于Hadoop更深入的内容和细节问题,可以通过作者提供的书目进行更系统的学习。

    基于云计算的混合并行遗传算法求解最短路径.pdf

    本文提出的基于云计算的细粒度混合并行遗传算法,通过结合MapReduce编程模型、细粒度并行遗传算法和禁忌搜索算法,旨在提高求解最短路径问题的效率。具体来说,该方法首先利用MapReduce模型提升遗传算法编码效率,...

    一种基于Hadoop的大规模图最短路径查询方法

    标题:“一种基于Hadoop的大规模图最短路径查询方法”描述了研究论文的主要内容,即提出一种新的大规模图最短路径查询方法,该方法是基于Hadoop平台来实现的。在大数据环境下,对大规模图数据进行最短路径查询是一个...

    简单的MapReduce程序(Hadoop2.2.0)

    MapReduce是Apache Hadoop的核心组件之一,用于处理和存储大规模数据集。在这个场景中,我们讨论的是一个在Hadoop 2.2.0版本上运行的简单MapReduce程序,名为"MaxTemperature",它通常被用作入门示例来演示MapReduce...

    基于云计算的混合并行遗传算法求解最短路径

    方法采用云计算中Hadoop的MapReduce并行编程模型,提高编码效率,同时将细粒度并行遗传算法和禁忌搜索算法结合,提高了寻优算法的计算速度和局部寻优能力,进而提高最短路径的求解效率。仿真结果表明,该方法在计算...

    license.txt

    - **MapReduce框架**:可以利用MapReduce框架来实现分布式版本的Dijkstra算法或其他最短路径算法,通过将计算任务划分到多个节点上并行执行,显著提高计算效率。 - **Pregel模型**:这是一种基于顶点为中心的图处理...

    MapReduce下的Dijkstra并行算法研究.pdf

    本文主要探讨的是在MapReduce框架下,如何实现Dijkstra算法的并行化,以高效地解决大规模图中的最短路径问题。 Dijkstra算法是一种经典的单源最短路径算法,广泛应用于网络路由、图形理论等领域。其基本思想是从源...

    single-target-shortest-path:Hadoop Map Reduce实施,以找到到达特定目标的最短路径(在输入中给出)

    综上所述,"single-target-shortest-path"项目通过Hadoop MapReduce实现了寻找图中特定目标的最短路径功能,涉及到图论、分布式计算、Java编程等多个IT领域的核心知识点。理解和实现这个项目不仅可以深化对这些技术...

    readme.txt

    - **分布式计算**:通过MapReduce等框架将计算任务分散到多个节点上执行,特别适合处理大规模网络的最短路径问题。 - **近似算法**:对于某些场景,可以接受非精确解而换取更快的计算速度,这时可以考虑使用近似算法...

    MapReduce_SSSP.rar_mapReduce_mapreduce sssp_single_sssp

    MapReduce框架实现的单源最短路径算法(SSSP, Single-source Shortest Path),包含源文件及数据。sssp.c是MapReduce实现,ss seq.cpp是串行的普通实现。

    并行计算-实验指导-实验03-图论.doc

    - **最短路径并行算法**:并行版本通常利用并行优先队列或者分布式数据结构,比如在MapReduce框架下,可以将计算任务分配到多个节点,每个节点计算其负责部分的最短路径,然后在全局范围内同步和整合结果。...

    基于MapReduce框架下的复杂网络社团发现算法

    然而,GN算法在处理大规模网络时存在效率问题,因为它需要计算每对节点之间的最短路径,这在节点数量巨大时变得极其耗时。 为了解决这个问题,研究者提出了基于MapReduce模型的SPB-MRA(Shortest Path Betweenness ...

Global site tag (gtag.js) - Google Analytics