`

广度优先BFS的MapReduce实现

 
阅读更多

社交网络中的图模型经常需要构造一棵树型结构:从一个特定的节点出发,例如,构造mary的朋友以及mary朋友的朋友的一棵树。

为构造这样的一棵树,最简单的方法是使用广度优先算法:

 

经常使用链表来表示图的节点以及节点之间的链接关系,如

 

frank -> {mary, jill}
jill -> {frank, bob, james}
mary -> {william, joe, erin} 

 

 表示,mary有3个朋友,分别是william,joe和erin

 

 将上述关系形式化表示为

 

0-> {1, 2}
2-> {3, 4, 5}
1-> {6, 7, 8} 

 

有了上述链表结构,我们可以得到:

单线程的BFS如下:

1、节点对象建模Node.java

 

import java.util.*;

public class Node {
	public static enum Color {
		WHITE, GRAY, BLACK
	};

	private final int id;
	private int parent = Integer.MAX_VALUE;
	private int distance = Integer.MAX_VALUE;
	private List<Integer> edges = null;
	private Color color = Color.WHITE;

	public Node(int id) {
		this.id = id;
	}

	public int getId() {
		return this.id;
	}

	public int getParent() {
		return this.parent;
	}

	public void setParent(int parent) {
		this.parent = parent;
	}

	public int getDistance() {
		return this.distance;
	}

	public void setDistance(int distance) {
		this.distance = distance;
	}

	public Color getColor() {
		return this.color;
	}

	public void setColor(Color color) {
		this.color = color;
	}

	public List<Integer> getEdges() {
		return this.edges;
	}

	public void setEdges(List<Integer> vertices) {
		this.edges = vertices;
	}
}

 

 

2、BFS算法 Graph.java

 

import java.util.*;

public class Graph {

  private Map<Integer, Node> nodes;

  public Graph() {
    this.nodes = new HashMap<Integer, Node>();
  }

  public void breadthFirstSearch(int source) {

    // Set the initial conditions for the source node
    Node snode = nodes.get(source);
    snode.setColor(Node.Color.GRAY);
    snode.setDistance(0);

    Queue<Integer> q = new LinkedList<Integer>();
    q.add(source);

    while (!q.isEmpty()) {
      Node unode = nodes.get(q.poll());

      for (int v : unode.getEdges()) {
        Node vnode = nodes.get(v);
        if (vnode.getColor() == Node.Color.WHITE) {
          vnode.setColor(Node.Color.GRAY);
          vnode.setDistance(unode.getDistance() + 1);
          vnode.setParent(unode.getId());
          q.add(v);
        }
      }
      unode.setColor(Node.Color.BLACK);
    }
  }

  public void addNode(int id, int[] edges) {

    // A couple lines of hacky code to transform our
    // input integer arrays (which are most comprehensible
    // write out in our main method) into List<Integer>
    List<Integer> list = new ArrayList<Integer>();
    for (int edge : edges)
      list.add(edge);

    Node node = new Node(id);
    node.setEdges(list);
    nodes.put(id, node);
  }
  
  public void print() {
    for (int v : nodes.keySet()) {
      Node vnode = nodes.get(v);
      System.out.printf("v = %2d parent = %2d distance = %2d \n", vnode.getId(), vnode.getParent(),
          vnode.getDistance());
    }
  }

  public static void main(String[] args) {

    Graph graph = new Graph();
    graph.addNode(1, new int[] { 2, 5 });
    graph.addNode(2, new int[] { 1, 5, 3, 4 });
    graph.addNode(3, new int[] { 2, 4 });
    graph.addNode(4, new int[] { 2, 5, 3 });
    graph.addNode(5, new int[] { 4, 1, 2 });

    graph.breadthFirstSearch(1);
    graph.print();
  }
}

 

 

 

但是以上BFS单线程构造树形结构对于大数据的时候,显得苍白无力。

对此,下面提出基于MapReduce的BFS并行构造社交网络中的树图算法

 

使用MapReduce计算图模型,基本思想是在每个Map slot的迭代中“makes a mess” 而在 Reduce slot中“cleans up the mess”

假设,我们用如下方式表示一个节点:

 

ID    EDGES|DISTANCE_FROM_SOURCE|COLOR|

 

 其中,EDGES是一个用“,”隔开的链接到本节点的其他节点链表List,对于我们不知道链表中的节点到本节点的距离,

使用Integer.MAX_VALUE表示"unknown"。

从COLOR,我们可以知道本节点我们计算过没有,WHITE表示计算过。

假设,我们的输入数据如下,我们从节点1开始广度优先搜索,因此,初始时,标记节点1的距离为0,color为GRAY

 

1       2,5|0|GRAY|
2       1,3,4,5|Integer.MAX_VALUE|WHITE|
3       2,4|Integer.MAX_VALUE|WHITE|
4       2,3,5|Integer.MAX_VALUE|WHITE|
5       1,2,4|Integer.MAX_VALUE|WHITE|

 

 map slot负责找出所有COLOR为GEAY的节点。而,对于每个我们计算过的节点,即COLOR为GRAY的节点,对应地,map slot的输出为一个COLOR为BLACK的节点,其中的DISTANCE = DISTANCE + 1。同时,map slot也输出所有不是GEAY的节点,其中距离不变。

因此,上述输入的输出形式如下:

 

1       2,5|0|BLACK|
2       NULL|1|GRAY|
5       NULL|1|GRAY|
2       1,3,4,5|Integer.MAX_VALUE|WHITE|
3       2,4|Integer.MAX_VALUE|WHITE|
4       2,3,5|Integer.MAX_VALUE|WHITE|
5       1,2,4|Integer.MAX_VALUE|WHITE|

 

 

在reduce slot获取的数据都具有同一个key。例如,获取key=2的reduce slot的对应values值为:

 

2       NULL|1|GRAY|
2       1,3,4,5|Integer.MAX_VALUE|WHITE|

 

 reduce slot的任务是从获取到的数据,经过采用:

 

 1、有邻接节点的节点

 2、所有有邻接节点的节点中的最小距离

 3、所有有邻接节点中颜色最深的节点

构造出新的输出,如,经过第一次MapReduce过程,我们得到如下形式的数据:

 

1       2,5,|0|BLACK
2       1,3,4,5,|1|GRAY
3       2,4,|Integer.MAX_VALUE|WHITE
4       2,3,5,|Integer.MAX_VALUE|WHITE
5       1,2,4,|1|GRAY

 

 第二次MapReduce过程,采用上述输出作为输入,以相同的逻辑运算,得到如下结果:

 

1       2,5,|0|BLACK
2       1,3,4,5,|1|BLACK
3       2,4,|2|GRAY
4       2,3,5,|2|GRAY
5       1,2,4,|1|BLACK

 

 第三次的输出为:

 

1       2,5,|0|BLACK
2       1,3,4,5,|1|BLACK
3       2,4,|2|BLACK
4       2,3,5,|2|BLACK
5       1,2,4,|1|BLACK

 

 

MapReduce迭代过程直到所有节点不为GRAY为止。

而如果有节点没有连接到源节点,那么可能迭代过程每次都有COLOR为WHITE的节点。

 

 

MapReduce的代码如下:

1、节点对象建模:Node.java

 

package org.apache.hadoop.examples;

import java.util.*;
import org.apache.hadoop.io.Text;

public class Node {
	public static enum Color {
		WHITE, GRAY, BLACK
	};

	private final int id;
	private int distance;
	private List<Integer> edges = new ArrayList<Integer>();
	private Color color = Color.WHITE;

	public Node(String str) {
		String[] map = str.split("\t");
		String key = map[0];
		String value = map[1];
		String[] tokens = value.split("\\|");
		this.id = Integer.parseInt(key);
		for (String s : tokens[0].split(",")) {
			if (s.length() > 0) {
				edges.add(Integer.parseInt(s));
			}
		}
		if (tokens[1].equals("Integer.MAX_VALUE")) {
			this.distance = Integer.MAX_VALUE;
		} else {
			this.distance = Integer.parseInt(tokens[1]);
		}
		this.color = Color.valueOf(tokens[2]);
	}

	public Node(int id) {
		this.id = id;
	}

	public int getId() {
		return this.id;
	}

	public int getDistance() {
		return this.distance;
	}

	public void setDistance(int distance) {
		this.distance = distance;
	}

	public Color getColor() {
		return this.color;
	}

	public void setColor(Color color) {
		this.color = color;
	}

	public List<Integer> getEdges() {
		return this.edges;
	}

	public void setEdges(List<Integer> edges) {
		this.edges = edges;
	}

	public Text getLine() {
		StringBuffer s = new StringBuffer();
		for (int v : edges) {
			s.append(v).append(",");
		}
		s.append("|");
		if (this.distance < Integer.MAX_VALUE) {
			s.append(this.distance).append("|");
		} else {
			s.append("Integer.MAX_VALUE").append("|");
		}
		s.append(color.toString());
		return new Text(s.toString());
	}
}
 

 

2、MapRecue广度优先搜索:

 

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This is an example Hadoop Map/Reduce application. 
* 
* It inputs a map in adjacency list format, and performs a breadth-first search.
 * The input format is
 * ID   EDGES|DISTANCE|COLOR
 * where
 * ID = the unique identifier for a node (assumed to be an int here)
 * EDGES = the list of edges emanating from the node (e.g. 3,8,9,12)
 * DISTANCE = the to be determined distance of the node from the source
 * COLOR = a simple status tracking field to keep track of when we're finished with a node
 * It assumes that the source node (the node from which to start the search) has
 * been marked with distance 0 and color GRAY in the original input.  All other
 * nodes will have input distance Integer.MAX_VALUE and color WHITE.
 */
public class GraphSearch extends Configured implements Tool {

  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.examples.GraphSearch");

  /**
   * Nodes that are Color.WHITE or Color.BLACK are emitted, as is. For every
   * edge of a Color.GRAY node, we emit a new Node with distance incremented by
   * one. The Color.GRAY node is then colored black and is also emitted.
   */
  public static class MapClass extends MapReduceBase implements
      Mapper<LongWritable, Text, IntWritable, Text> {

    public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output,
        Reporter reporter) throws IOException {

      Node node = new Node(value.toString());

      // For each GRAY node, emit each of the edges as a new node (also GRAY)
      if (node.getColor() == Node.Color.GRAY) {
        for (int v : node.getEdges()) {
          Node vnode = new Node(v);
          vnode.setDistance(node.getDistance() + 1);
          vnode.setColor(Node.Color.GRAY);
          output.collect(new IntWritable(vnode.getId()), vnode.getLine());
        }
        // We're done with this node now, color it BLACK
        node.setColor(Node.Color.BLACK);
      }

      // No matter what, we emit the input node
      // If the node came into this method GRAY, it will be output as BLACK
      output.collect(new IntWritable(node.getId()), node.getLine());

    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends MapReduceBase implements
      Reducer<IntWritable, Text, IntWritable, Text> {

    /**
     * Make a new node which combines all information for this single node id.
     * The new node should have 
     * - The full list of edges 
     * - The minimum distance 
     * - The darkest Color
     */
    public void reduce(IntWritable key, Iterator<Text> values,
        OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {

      List<Integer> edges = null;
      int distance = Integer.MAX_VALUE;
      Node.Color color = Node.Color.WHITE;

      while (values.hasNext()) {
        Text value = values.next();

        Node u = new Node(key.get() + "\t" + value.toString());

        // One (and only one) copy of the node will be the fully expanded
        // version, which includes the edges
        if (u.getEdges().size() > 0) {
          edges = u.getEdges();
        }

        // Save the minimum distance
        if (u.getDistance() < distance) {
          distance = u.getDistance();
        }

        // Save the darkest color
        if (u.getColor().ordinal() > color.ordinal()) {
          color = u.getColor();
        }

      }

      Node n = new Node(key.get());
      n.setDistance(distance);
      n.setEdges(edges);
      n.setColor(color);
      output.collect(key, new Text(n.getLine()));
     
    }
  }

  static int printUsage() {
    System.out.println("graphsearch [-m <num mappers>] [-r <num reducers>]");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  private JobConf getJobConf(String[] args) {
    JobConf conf = new JobConf(getConf(), GraphSearch.class);
    conf.setJobName("graphsearch");

    // the keys are the unique identifiers for a Node (ints in this case).
    conf.setOutputKeyClass(IntWritable.class);
    // the values are the string representation of a Node
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(MapClass.class);
    conf.setReducerClass(Reduce.class);

    for (int i = 0; i < args.length; ++i) {
      if ("-m".equals(args[i])) {
        conf.setNumMapTasks(Integer.parseInt(args[++i]));
      } else if ("-r".equals(args[i])) {
        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
      }
    }

    return conf;
  }

  /**
   * The main driver for word count map/reduce program. Invoke this method to
   * submit the map/reduce job.
   * 
   * @throws IOException
   *           When there is communication problems with the job tracker.
   */
  public int run(String[] args) throws Exception {

    int iterationCount = 0;

    while (keepGoing(iterationCount)) {

      String input;
      if (iterationCount == 0)
        input = "input-graph";
      else
        input = "output-graph-" + iterationCount;

      String output = "output-graph-" + (iterationCount + 1);

      JobConf conf = getJobConf(args);
      FileInputFormat.setInputPaths(conf, new Path(input));
      FileOutputFormat.setOutputPath(conf, new Path(output));
      RunningJob job = JobClient.runJob(conf);

      iterationCount++;
    }

    return 0;
  }
  
  private boolean keepGoing(int iterationCount) {
    if(iterationCount >= 4) {
      return false;
    }
    
    return true;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new GraphSearch(), args);
    System.exit(res);
  }

}

 

参考:

breadth-first graph search using an iterative map-reduce algorithm


分享到:
评论

相关推荐

    MapReduce-algorithms

    并行广度优先搜索(BFS)是一种在分布式环境中寻找图中最短路径的有效方法。通过将图分割成多个子图,并在不同的处理器上并行执行BFS,可以显著提高搜索速度。 ##### 5.3 PageRank算法 PageRank算法最初由Google...

    基于Hadoop的分布式网络爬虫设计与实现.docx

    2. **网络爬虫技术综述**:详细阐述了网络爬虫的基本原理和常用算法,包括深度优先搜索(DFS)、广度优先搜索(BFS)等,并探讨了网络爬虫的数据存储与处理策略。 3. **分布式网络爬虫系统设计**:结合Hadoop的特点,...

    基于MapReduce的可达性保留图计算方法

    他们首先利用BFS(广度优先搜索)结果来获得原图数据集的所有强连通分量,然后通过标签传播算法对这些分量进行压缩,并通过分析基于可达性等价关系的BFS结果来进一步压缩等价类,最终得到一个压缩比良好、可以加速...

    程序员实用算法_源码

    7. **图形与网络流**:可能包括最大流、最小割等网络流算法,以及图的遍历算法如深度优先搜索(DFS)和广度优先搜索(BFS)。 8. **编码与解码**:如哈夫曼编码、LZW压缩、CRC校验等,这些算法在数据传输和存储中至关...

    基于大数据分析的分布式MOLAP技术研究.docx

    - **广度优先搜索(BFS)**:逐层展开维的结构,确保每个维度都被均匀地探索。 这些遍历算法的选择取决于具体的应用场景和性能需求。例如,在需要快速获取顶层概览数据时,BFS可能更为合适;而当需要深入分析某一个...

    基于java的分布式爬虫框架.zip

    常见的策略有深度优先搜索(DFS)、广度优先搜索(BFS)以及基于优先级的队列。 6. **数据库存储**:爬取的数据通常需要存储,以便后续分析。Java可以与多种数据库进行交互,如MySQL、MongoDB等,使用JDBC或NoSQL...

    算法-搜索- 概述.rar

    典型的图搜索算法有深度优先搜索(DFS)和广度优先搜索(BFS)。DFS优先探索分支的深处,而BFS则先访问离起点近的节点。A*搜索算法是一种启发式搜索方法,结合了DFS和BFS的优点,并引入了估价函数以指导搜索,通常...

    数据结构与算法实验报告和实验源代码

    图的遍历算法有深度优先搜索(DFS)和广度优先搜索(BFS),常用于寻找路径或拓扑排序;动态规划则通过解决子问题来求解原问题,如斐波那契数列、背包问题等。 在大数据的背景下,数据结构和算法的重要性更加凸显。...

    java算法大全源码包_近100种算法打包

    2. 搜索算法:如二分查找、广度优先搜索(BFS)、深度优先搜索(DFS)等,这些算法在数据检索和图遍历等方面有着广泛应用。 3. 数据结构:如栈、队列、链表、树(二叉树、平衡树、堆树等)、图等数据结构的实现,...

    Algorithm-awesome-algorithm.zip

    搜索算法是另一大重点,如二分查找、深度优先搜索(DFS)和广度优先搜索(BFS)。二分查找在有序数组中寻找特定元素时展现出高效性,而DFS和BFS则常用于遍历图和树结构,理解它们的不同策略可以帮助我们在处理复杂...

    各大企业大数据面试题目总结

    15. **树相关的算法**:层次遍历的变种可能是深度优先搜索(DFS)或广度优先搜索(BFS)的扩展,例如反向层次遍历、按条件遍历等。 16. **JVM内存布局**:Java虚拟机内存分为堆、栈、方法区、本地方法栈、程序...

    分布式网络爬虫系统的研究现状.pdf

    1. 爬虫策略:包括广度优先搜索(BFS)、深度优先搜索(DFS)等基本图形搜索算法,也包括结合了特定需求的定制化遍历策略。这些策略决定了爬虫如何高效地从网页中抓取信息。 2. 爬虫节点:在分布式网络爬虫系统中,每个...

    scala程序员面试算法宝典代码

    - 广度优先搜索(BFS)和深度优先搜索(DFS):用于遍历或搜索图结构。 - 最短路径算法:Dijkstra算法、Floyd-Warshall算法用于找出图中两点间的最短路径。 - Kruskal's算法和Prim's算法:用于最小生成树问题。 ...

    程序员BAT面试求职算法大全(只介绍算法、不含源代码)

    图论是研究点和边关系的数学分支,包括图的遍历(深度优先搜索DFS和广度优先搜索BFS)、最短路径(Dijkstra算法、Floyd-Warshall算法)、最小生成树(Prim算法、Kruskal算法)以及拓扑排序等。 六、递归 递归是解决...

    数据结构--赵致琢课件.rar

    图数据结构则可能涵盖深度优先搜索(DFS)和广度优先搜索(BFS),以及最小生成树(Prim或Kruskal算法)、最短路径(Dijkstra或Floyd算法)等经典算法。 此外,数据结构的学习离不开算法。排序和查找是两个重要的...

    多核计算中的分布式数据结构

    例如,BFS(广度优先搜索)和DFS(深度优先搜索)可以通过并行化来加速。 6. 分布式锁:在多线程或多进程环境中,分布式锁是保证数据一致性的重要工具。它们确保在任何时刻只有一个核心能够访问特定的资源或执行...

    百度10年商务搜索笔试题

    - **广度优先遍历(Breadth-First Search, BFS)**: - **定义**: 广度优先遍历是从树或图的一个顶点出发,首先访问所有邻近顶点,然后再逐一访问它们的邻近顶点的过程。 - **特点**: - 使用队列来保存访问过的节点...

    开发基于 Nutch 的集群式搜索引擎

    - 从一个或多个种子URL开始,采用广度优先搜索策略(BFS)遍历网络。 - 抓取过程中涉及的操作数据库包括CrawlDB和LinkDB。 - Nutch解析器支持多种格式的文档解析,例如HTML、XML、RSS、PDF等。 - 解析后的文档...

Global site tag (gtag.js) - Google Analytics