`
zxxapple
  • 浏览: 79066 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

MapReduce框架中PageRank算法的简单实现

阅读更多

主要实现思想在另一篇博客中已经提到:

 

具体实现每次迭代包括两个Job

第一个分散各个节点的PR值

 

第二个用于将dangling节点的PR值分散到其它节点

 

主要包括5个类

PageRankNode:图中的节点类-代表一个页面

PageRankJob:实现分散各个节点的PR值的类

DistributionPRMass:实现dangling节点的PR值分散到其它节点的Job类

RangePartitioner:partition类  将连续的节点分配到同一个reduce中

PageRankDirver:整个工作的驱动类(主函数)

 

package com.zxx.PageRank;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class PageRankJob
{
	public static final double d = 0.85;
	private static final double nodecount = 10;
	private static final double threshold=0.01;//收敛邻接点
	
	public static enum MidNodes
	{
		 // 记录已经收敛的个数
			Map, Reduce
	};

	public static class PageRankMaper extends Mapper<Object, Text, Text, Text>
	{
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			PageRankNode node = PageRankNode.InstanceFormString(value.toString());
			node.setOldPR(node.getNewPR());
			context.write(new Text(node.getId()), new Text(PageRankNode.toStringWithOutID(node)));

			for (String str : node.getDestNodes())
			{
				String outPR = new Double(node.getNewPR() / (double)node.getNumDest()).toString();
				context.write(new Text(str), new Text(outPR));
			}
		}
	}

	public static class PageRankJobReducer extends Reducer<Text, Text, Text, Text>
	{
		private double totalMass = Double.NEGATIVE_INFINITY; // 缓存每个key从其它点得到的全部PR值
		private double missMass=Double.NEGATIVE_INFINITY;

		@Override
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
		{
			PageRankNode currentNode = new PageRankNode(key.toString());
			double inPR = 0.0;

			for (Text val : values)
			{
				String[] temp = val.toString().trim().split("\\s+");
				if (temp.length == 1) // 此时候只输出一个PR值
				{
					inPR += Double.valueOf(temp[0]);
				} else if (temp.length >= 4)
				{// 此时输出的是含有邻接点的节点全信息
					currentNode = PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString());
				} else if (temp.length == 3)
				{ // 此时输出的点没有出度
					context.getCounter("PageRankJobReducer", "errornode").increment(1);
					currentNode=PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString());
				}
			}
            if (currentNode.getNumDest()>=1)
			{
            	double newPRofD = (1 - PageRankJob.d) /(double) PageRankJob.nodecount + PageRankJob.d * inPR;
    			currentNode.setNewPR(newPRofD);
    			context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode)));
			}else if (currentNode.getNumDest()==0) {
				
				missMass=currentNode.getOldPR();//得到dangling节点的上一次的PR值,传播到下一个分布Pr的job
			}
			
			totalMass += inPR;
			double partPR=(currentNode.getNewPR()-currentNode.getOldPR())*(currentNode.getNewPR()-currentNode.getOldPR());
			if (partPR<=threshold)
			{
				context.getCounter(MidNodes.Reduce).increment(1);
			}
		}

		@Override
		public void cleanup(Context context) throws IOException, InterruptedException
		{
			// 将total记录到文件中
			Configuration conf = context.getConfiguration();
			String taskId = conf.get("mapred.task.id");
			String path = conf.get("PageRankMassPath");// 注意此处的path路径设置------------------
            
			if (missMass==Double.NEGATIVE_INFINITY)
			{
				return;
			}
			FileSystem fs = FileSystem.get(context.getConfiguration());
			FSDataOutputStream out = fs.create(new Path(path + "/"+"missMass"), false);
			out.writeDouble(missMass);
			out.close();
		}
	}
}

 

package com.zxx.PageRank;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

import com.zxx.Graph.ArrayListOfInts;
import com.zxx.Graph.BFSNode;
import com.zxx.Graph.HMapII;
import com.zxx.Graph.MapII;
import com.zxx.Graph.ReachableNodes;

public class DistributionPRMass
{
	public class GraphMapper extends Mapper<Object, Text, Text, Text>
	{
		private double missingMass = 0.0;
	    private int nodeCnt = 0;
		@Override
		public void setup(Context context) throws IOException, InterruptedException
		{
			Configuration conf = context.getConfiguration();

		    missingMass = (double)conf.getFloat("MissingMass", 0.0f);//该值等于1-totalMass
		    nodeCnt = conf.getInt("NodeCount", 0);
		}
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			PageRankNode currentNode=PageRankNode.InstanceFormString(value.toString().trim());
			currentNode.setOldPR(currentNode.getNewPR());
			
			double p=currentNode.getNewPR();
			double pnew=(1-PageRankJob.d)/(double)(nodeCnt-1)+PageRankJob.d*missingMass/(double)(nodeCnt-1);
			//double pnew=missingMass/(double)(nodeCnt-1);
			currentNode.setNewPR(p+pnew);
			context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode)));
		}

		@Override
		public void cleanup(Context context) throws IOException, InterruptedException
		{
			
		}
	}
}

 

package com.zxx.PageRank;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Partitioner;

public class RangePartitioner extends Partitioner<Text, Text> implements Configurable
{

	private int nodeCnt = 0;
	private Configuration conf;

	public RangePartitioner() {}
	@Override
	public Configuration getConf()
	{
		return conf;
	}

	@Override
	public void setConf(Configuration arg0)
	{
		this.conf = arg0;
	    configure();
	}

	@Override
	public int getPartition(Text arg0, Text arg1, int arg2)
	{
		return (int) ((float)(Integer.parseInt(arg0.toString()) / (float) nodeCnt) * arg2) % arg2;
	}
	private void configure()   //获得节点的总数
	{
		nodeCnt = conf.getInt("NodeCount", 0);
	}

}

 

 

 

package com.zxx.PageRank;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class PageRankDirver
{
    public static final int numNodes=5;  //节点数
    public static final int maxiter=10;   //最大收敛次数
	public static void main(String[] args) throws Exception
	{
		long count=0;  //缓存已经接近收敛的节点个数
		int it=1;
		int num=1;
		String input="/Graph/input/";
		String output="/Graph/output1";
		do{
			Job job=getPageRankJob(input, output);
			job.waitForCompletion(true);
			
			Counters counter = job.getCounters();
			count = counter.findCounter(PageRankJob.MidNodes.Reduce).getValue();
			
			
			input="/Graph/output"+it;
			it++;
			output="/Graph/output"+it;
			
		    Job job1=getDistrbuteJob(input,output);
		    job1.waitForCompletion(true);
		    
		    input="/Graph/output"+it;
			it++;
			output="/Graph/output"+it;
			
			if(num<maxiter)
			System.out.println("it:"+it+" "+count);
			num++;
		}while(count!=numNodes);

	}
	
	public static Job getPageRankJob(String inPath,String outPath) throws Exception
	{
		Configuration conf = new Configuration();
		Job job=new Job(conf,"PageRank job");
		
		job.getConfiguration().setInt("NodeCount", numNodes);
	    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
	    job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
	    
	    job.getConfiguration().set("PageRankMassPath", "/mass");
	    

		job.setJarByClass(PageRankDirver.class);
		
		job.setNumReduceTasks(5);

		job.setMapperClass(PageRankJob.PageRankMaper.class);
		job.setReducerClass(PageRankJob.PageRankJobReducer.class);
		job.setPartitionerClass(RangePartitioner.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		
		FileInputFormat.addInputPath(job, new Path(inPath));
		FileOutputFormat.setOutputPath(job, new Path(outPath));
		
		FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除
		
		return job;	
	}

	public static Job getDistrbuteJob(String inPath,String outPath) throws Exception
	{
		Configuration conf = new Configuration();
		Job job=new Job(conf,"Ditribute job");
		
		double mass = Double.NEGATIVE_INFINITY;                //一下是读取dangling节点的PR值,将其分配到其他节点
	    FileSystem fs = FileSystem.get(conf);
	    for (FileStatus f : fs.listStatus(new Path("/mass/missMass")))
	    {
	      FSDataInputStream fin = fs.open(f.getPath());
	      mass = fin.readDouble();
	      fin.close();
	    }
	    job.getConfiguration().setFloat("MissingMass",(float)mass);
		job.getConfiguration().setInt("NodeCount", numNodes);
		job.getConfiguration().setInt("NodeCount", numNodes);
	    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
	    job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
	    
	    job.getConfiguration().set("PageRankMassPath", "/mass");
	    

		job.setJarByClass(PageRankDirver.class);
		
		job.setNumReduceTasks(5);

		job.setMapperClass(PageRankJob.PageRankMaper.class);
		job.setReducerClass(PageRankJob.PageRankJobReducer.class);
		job.setPartitionerClass(RangePartitioner.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		
		FileInputFormat.addInputPath(job, new Path(inPath));
		FileOutputFormat.setOutputPath(job, new Path(outPath));
		
		FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除
		
		return job;	
	}
}
 

 

 

 

分享到:
评论
6 楼 zhaohuiweixiao 2013-05-20  
博主,输入文件的格式是什么样的呀?能举例说明一下吗?急用。谢谢!
5 楼 bleath2046 2013-04-15  
博主你好,最近在学hadoop你上面程序的输入文件应该是设么样的,希望方便时告诉我一下,我邮箱是381568990@qq.com  谢谢!
4 楼 schaha 2012-07-02  
前段时间忙,现在又开始看pagerank了。
楼主,程序我没看明白,在Hadoop环境中运行有错,求指导!
3 楼 schaha123 2012-05-18  
谢谢啊,这几天仔细研究一下。
2 楼 zxxapple 2012-05-18  
实现很简单的

我在这里给你贴出来吧
package com.zxx.PageRank;

import java.util.*;

import javax.naming.spi.DirStateFactory.Result;
import javax.xml.soap.Node;

public class PageRankNode
{
	private String id;
	private List<String> destNodes=new ArrayList<String>();
    private double oldPR;
    private double newPR;
    private int numDest;
    
    public PageRankNode()
    {
    	
    }
    
    public PageRankNode(String id)
    {
        this.id=id;	
    }
    
    public static String toStringWithOutID(PageRankNode node)
    {
    	StringBuffer temp=new StringBuffer();
    	temp.append(node.getOldPR());
    	temp.append("\t"+node.getNewPR());
    	temp.append("\t"+node.getNumDest());
    	for(String dest:node.getDestNodes())
    	{
    		temp.append("\t"+dest);
    	}
    	return temp.toString();
    }

    public static PageRankNode InstanceFormString(String nodeStr)
    {
    	PageRankNode node=new PageRankNode();
    	String[] res=nodeStr.split("\\s+");
    	node.setId(res[0]);
    	
    	if (res.length==2)
    	{
    		node.setNewPR(Double.valueOf(res[1]));
    	}else if (res.length>4)
		{
			node.setOldPR(Double.valueOf(res[1]));
			node.setNewPR(Double.valueOf(res[2]));
			node.setNumDest(Integer.valueOf(res[3]));
			for (int i = 4; i < res.length; i++)
			{
				node.getDestNodes().add(res[i]);
			}
			assert(node.getNumDest()==node.getDestNodes().size());
		}
    	return node;
    }
	public String getId()
	{
		return id;
	}

	public void setId(String id)
	{
		this.id = id;
	}

	public List<String> getDestNodes()
	{
		return destNodes;
	}

	public void setDestNodes(List<String> destNodes)
	{
		this.destNodes = destNodes;
	}

	public double getOldPR()
	{
		return oldPR;
	}

	public void setOldPR(double oldPR)
	{
		this.oldPR = oldPR;
	}

	public double getNewPR()
	{
		return newPR;
	}

	public void setNewPR(double newPR)
	{
		this.newPR = newPR;
	}

	public int getNumDest()
	{
		return numDest;
	}

	public void setNumDest(int numDest)
	{
		this.numDest = numDest;
	}
    
    
}

1 楼 schaha123 2012-05-18  
您好,楼主,我现在正在做Nutch中实现PageRank算法,想看看你在MapReduce编程模式上是怎么实现的,好像少写一个类啊--PageRankNode类。

相关推荐

    Hadoop-MapReduce下的PageRank矩阵分块算法

    PageRank算法作为Web结构挖掘领域的经典算法,在Google搜索引擎中的成功应用已经充分证明了其在评估网页重要性方面的价值。然而,传统的PageRank算法在处理大规模数据集时,面临着迭代次数多、时间与空间消耗大的...

    MapReduce 之PageRank 算法概述

    为了更好地理解PageRank算法的工作原理,我们可以通过一个简化的例子来进行说明。 1. **假设场景**:假设一个小型网络由四个网页组成(A、B、C、D),这些网页之间的链接关系如下图所示: ``` A ----&gt; B | | | v v...

    山东大学大数据实验三:Hadoop实现PageRank

    在Hadoop MapReduce框架中,迭代器通常用于处理需要多次迭代直到收敛的情况,如PageRank算法。在这个类中,可能包含了计算和更新每个网页PageRank值的逻辑,以及检查收敛条件的代码。 3. **GraphBuilder.java**:此...

    PageRank算法的mapreduce实现

    在PageRank算法的实现中,MapReduce框架发挥了重要作用,因为它能够高效地处理海量数据并行计算。 Map阶段: 在这个阶段,输入数据是互联网上的网页集合,每个网页通常包含一个URL和指向其他网页的链接。Map函数的...

    java实现和Matlab语言实现的pagerank算法

    在MATLAB的`pagerank.m`文件中,实现PageRank算法可能包括以下步骤: 1. **导入数据**:读取网页链接关系的数据,这通常是一个稀疏矩阵,节省内存空间。 2. **设置参数**:定义迭代次数上限、衰减因子等参数。 3. *...

    pagerank_大数据pagerank算法代码_pageRank_

    在Python中实现PageRank,通常包括以下几个步骤: 1. **构建邻接矩阵**:邻接矩阵表示网页间的链接关系,行代表源网页,列代表目标网页,值表示链接权重。 2. **初始化PageRank**:所有网页初始PageRank值平均分配...

    基于分布式PageRank算法的可疑目标挖掘.pdf

    分布式PageRank算法利用MapReduce框架,将链接分析任务分配到成千上万个计算节点上进行,从而实现对大量URL的快速分析和处理。 此外,分布式PageRank算法通过适应大矩阵计算,可以有效地分析挖掘可疑URL目标。在...

    基于MapReduce的网页排序算法

    通过上述方式,我们可以利用MapReduce在并行计算环境中高效地处理大规模网页排序问题,实现了分布式环境下PageRank算法的高效执行。这种方法不仅提高了计算速度,还确保了在互联网规模增长时算法的可扩展性。

    pagerank.zip

    总结来说,本项目是使用Java和MapReduce技术实现PageRank算法,处理的数据集来源于"web-Google.txt"文件,它展示了如何在分布式环境中高效处理大规模的网络链接数据,评估网页的重要性。这不仅加深了对PageRank算法...

    PageRank算法实时大数据实验报告广工(Map Reduce)(附源码)

    实验内容 1. 采用基于“抽税”法在MapReduce框架下,分析图1的网页PageRank排名; 2. 图1中,若节点②和节点⑤是主题节点,采用面向主题的PageRank算法重新计算所有节点的PageRank值。

    PageRank:MapReduce 中的 PageRank 实现

    在MapReduce框架中实现PageRank,可以处理大规模的互联网数据,有效地计算网页之间的链接关系,从而评估每个网页的重要性。 PageRank的基本思想是,一个网页被其他网页链接的数量和质量反映了它的影响力。每个网页...

    MiniGoogle:由搜寻器,索引器,PageRank,MapReduce框架,搜索算法和前端组成

    迷你谷歌由搜寻器,索引器,PageRank,MapReduce框架,搜索算法和前端组成。 搜寻器,索引器和pagerank作为mapreduce作业运行。 该系统利用了一个主节点和八个工作/数据节点,这些节点通过REST调用进行通信。

    基于Spark+PageRank算法构建仿微博用户好友的分布式推荐系统.zip

    本项目“基于Spark+PageRank算法构建仿微博用户好友的分布式推荐系统”就是这样一个例子,它将大数据处理框架Spark与经典的PageRank算法相结合,旨在实现大规模社交网络中的用户好友推荐。 首先,Spark是Apache ...

    PageRank:Google 的 PageRank 算法在 MapReduce 范式中的实现。 Apache Hadoop、Java

    在本文中,我们将深入探讨PageRank算法及其在MapReduce范式中的实现,同时关注Apache Hadoop和Java在其中的应用。 PageRank算法的核心思想是,一个被许多其他网页链接的页面具有较高的权威性,因为它被视为有价值的...

    《MapReduce数据密集型文本处理》.pdf

    4. 图算法:探索了如何在MapReduce框架下实现图算法,如并行广度优先搜索(Parallel Breadth-First Search)和PageRank算法。这在社交网络分析、网络拓扑等领域尤为重要。 5. EM算法在文本处理中的应用:介绍了期望...

    Experiment-of-a-Comparison-with-Iterative-MR-frameworks:我们为在迭代 MapReduce 框架上运行迭代算法而实现的示例代码

    5. **实验设计**:项目中的实验可能包括设置不同的迭代算法,如朴素贝叶斯分类、K-means聚类等,并在多种迭代MapReduce框架上运行,记录和比较其运行时间、资源消耗以及最终结果的准确性。 6. **代码结构**:压缩包...

    Pagerank实验.zip

    4. 分布式计算:如果涉及Hadoop,会讨论如何在Hadoop MapReduce框架下分布式执行PageRank算法,以处理大数据量。 5. 结果分析:展示计算出的PageRank值,并可能与其他方法的结果进行比较,讨论其意义和局限性。 ...

    云计算 mapreduce - <Data-Intensive[1].Text.Processing.With.MapReduce>

    - **EM算法在MapReduce中的应用**:分析如何在MapReduce框架中实现EM算法。 - **案例研究:词对齐**:通过一个具体的例子——词对齐,来展示EM算法的实际应用,特别是在统计机器翻译中的作用。 通过以上核心知识...

Global site tag (gtag.js) - Google Analytics