
A simple MapReduce implementation of item-based collaborative filtering

 

Every e-commerce site now shows you recommended items on its pages. How are those recommendations produced? This post introduces one collaborative filtering algorithm: item-based collaborative filtering. In short, it recommends items similar to the ones the user has already bought and rated highly. The main idea of the algorithm is:

1.   Build the item co-occurrence matrix

Group the ratings by user, and for every pair of items count how many users' rating lists contain both items.

2.   Build the user-item rating matrix

That is, each user's rating of each item.

3.   Multiply the two matrices to get the result.
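In matrix terms: if C is the co-occurrence matrix and R[u] is user u's rating vector, the predicted preference of user u for item i is score(u, i) = Σj C[i][j] × R[u][j]; items the user has not rated yet are then ranked by this score.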

Below we work through a small concrete example.

A sample set of user ratings:

1  101  4
1  106  3
2  105  3
2  101  1
2  106  2
2  103  2
2  104  5
2  102  2
3  101  2
3  106  5
3  103  4
4  101  2
4  102  5
4  105  4
4  104  5
5  105  4
5  104  5
6  102  1
6  104  1
6  101  4
6  103  1
7  104  4
7  101  1
7  102  2
7  105  5
7  103  2
7  106  1
8  101  2

The first field is the user id, the second is the item id, and the third is the rating.

1. Build the item co-occurrence matrix

    101 102 103 104 105 106
101   7   4   4   4   3   4
102   4   4   3   4   3   2
103   4   3   4   3   2   3
104   4   4   3   5   4   2
105   3   3   2   4   4   2
106   4   2   3   2   2   4
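Before moving to MapReduce, here is a small standalone Java sketch (my own, not from the original post; the class name CooccurrenceCheck is made up) that recomputes these co-occurrence counts from the rating list above. It is handy for checking the matrix:

import java.util.*;

// Standalone check (not MapReduce): recompute the co-occurrence counts
// from the sample ratings above. Only (userId, itemId) matters here;
// the rating value itself is not used for co-occurrence.
public class CooccurrenceCheck {
	public static void main(String[] args) {
		int[][] data = {
			{1,101},{1,106},{2,105},{2,101},{2,106},{2,103},{2,104},{2,102},
			{3,101},{3,106},{3,103},{4,101},{4,102},{4,105},{4,104},
			{5,105},{5,104},{6,102},{6,104},{6,101},{6,103},
			{7,104},{7,101},{7,102},{7,105},{7,103},{7,106},{8,101}
		};
		// group item ids by user
		Map<Integer, Set<Integer>> userItems = new HashMap<Integer, Set<Integer>>();
		for (int[] row : data) {
			Set<Integer> items = userItems.get(row[0]);
			if (items == null) {
				items = new TreeSet<Integer>();
				userItems.put(row[0], items);
			}
			items.add(row[1]);
		}
		// every ordered pair of items rated by the same user co-occurs once
		Map<String, Integer> cooccur = new TreeMap<String, Integer>();
		for (Set<Integer> items : userItems.values()) {
			for (int a : items) {
				for (int b : items) {
					String key = a + ":" + b;
					Integer count = cooccur.get(key);
					cooccur.put(key, count == null ? 1 : count + 1);
				}
			}
		}
		for (Map.Entry<String, Integer> e : cooccur.entrySet()) {
			System.out.println(e.getKey() + "\t" + e.getValue());
		}
	}
}

Its output ("101:101 7", "101:102 4", ...) is exactly the matrix above in sparse form, and it mirrors what the step-2 MapReduce job later computes.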

 

2. Build the user rating matrix

 

Here we take user 4 as an example:

        u4
101 2
102 5
103 0
104 5
105 4
106 0

 

3.  Multiply the matrices

 

      101 102 103 104 105 106            u4
101     7   4   4   4   3   4       101   2
102     4   4   3   4   3   2       102   5
103     4   3   4   3   2   3   X   103   0
104     4   4   3   5   4   2       104   5
105     3   3   2   4   4   2       105   4
106     4   2   3   2   2   4       106   0
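This product can be checked locally with a few lines of plain Java (again just a sanity check, not part of the MapReduce pipeline; the class name MultiplyCheck is mine):

// Quick local check of the multiplication above (items in order 101..106).
public class MultiplyCheck {
	public static void main(String[] args) {
		int[][] c = {
			{7, 4, 4, 4, 3, 4},
			{4, 4, 3, 4, 3, 2},
			{4, 3, 4, 3, 2, 3},
			{4, 4, 3, 5, 4, 2},
			{3, 3, 2, 4, 4, 2},
			{4, 2, 3, 2, 2, 4}
		};
		int[] u4 = {2, 5, 0, 5, 4, 0};          // user 4's ratings for items 101..106
		int[] items = {101, 102, 103, 104, 105, 106};
		for (int i = 0; i < c.length; i++) {
			int score = 0;
			for (int j = 0; j < u4.length; j++) {
				score += c[i][j] * u4[j];
			}
			System.out.println(items[i] + "\t" + score);
		}
	}
}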

Carrying out the multiplication gives one score per item:

101  66
102  60
103  46
104  69
105  57
106  36

Items 103 and 106 are the only ones user 4 has not rated yet, so item 103 (score 46) would be recommended ahead of item 106 (score 36).

Next, the algorithm above is implemented as a series of MapReduce jobs.

1. Upload the data

Upload the user-item rating file to /test/cf/moive.txt on HDFS.

2.  Write the MapReduce jobs

The MR program consists of four steps:

Step 1: group by user and compute each user's score for each item.

Step 2: take every pair of items and count how often they occur together, building the co-occurrence matrix.

Step 3: multiply the two matrices.

Step 4: pick out the items to recommend.

 

The MR code:

Step 1: the input is /test/twtbgn/cf/in and the output is /test/twtbgn/cf/step1

// imports for the old "mapred" API used throughout this post (omitted in the original);
// HdfsDaoImpl is the author's own HDFS helper class, so its import is not shown
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class RecommendStep1 {
		
	public static class Step1Mapper extends MapReduceBase
								implements Mapper<LongWritable, Text, Text, Text>{
		private Text key = new Text();
		private Text value = new Text();
		
		public void map(LongWritable lineNum, Text line,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			// each input line: userId itemId rating, separated by whitespace
			String[] strings = line.toString().split("\\s+");
//			String[] strings = line.toString().split(" ");
			if(strings.length >= 3){
				key.set(strings[0]);
				value.set(strings[1]+":"+strings[2]);
				System.out.println("map key: " + strings[0]);
				System.out.println("map value: " + strings[1]+":"+strings[2]);
				output.collect(key, value);
			}
		}
	}
	
	public static class Step1Reducer extends MapReduceBase
								implements Reducer<Text, Text, Text, Text>{

		public void reduce(Text key, Iterator<Text> value,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			// gather this user's ratings per item, then average each item's scores
			Map<String, List<Float>> movieScoreMap = new HashMap<String, List<Float>>();
			while (value.hasNext()) {
				Text text = (Text) value.next();
				String str = text.toString();
//				System.out.println("moviescore: " + str);
				String[] arr = str.split(":");
				if(arr.length > 1){
					List<Float> scoreList = movieScoreMap.get(arr[0]);
					if(scoreList == null){
						scoreList = new ArrayList<Float>();
						movieScoreMap.put(arr[0], scoreList);
					}
					scoreList.add(Float.valueOf(arr[1]));
				}
			}
			Set<Entry<String, List<Float>>> set = movieScoreMap.entrySet();
			StringBuffer valuebuBuffer = new StringBuffer();
			for(Entry<String, List<Float>> entry : set){
				String movieId = entry.getKey();
				List<Float> scoreList = entry.getValue();
				int length = scoreList.size();
				Float sum = new Float(0);
				for(Float score : scoreList){
					sum += score;
				}
				Float avg = sum/length;
				valuebuBuffer.append(movieId).append(":")
								.append(avg).append(",");
			}
			String valueStr = valuebuBuffer.toString();
//			System.out.println(valueStr);
			output.collect(key, new Text(valueStr.substring(0, valueStr.length()-1)));
		}
	}
	
	public static void main(String[] args) {
		String input = "hdfs://host261:9000/test/twtbgn/cf/in";
		String output = "hdfs://host261:9000/test/twtbgn/cf/step1";
		
		HdfsDaoImpl daoImpl = new HdfsDaoImpl();
		try {
			daoImpl.rmr(output);
			Thread.sleep(5000);
		} catch (IOException e1) {
			e1.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		JobConf conf = new JobConf(RecommendStep1.class);
		conf.setJobName("RecommendStep1");
		conf.addResource("classpath:/hadoop/core-site.xml");
		conf.addResource("classpath:/hadoop/hdfs-site.xml");
		conf.addResource("classpath:/hadoop/mapred-site.xml");
		
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		
		conf.setMapperClass(Step1Mapper.class);
//		conf.setCombinerClass(Step1Reducer.class);
		conf.setReducerClass(Step1Reducer.class);
		
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		
		FileInputFormat.setInputPaths(conf, new org.apache.hadoop.fs.Path(input));
		FileOutputFormat.setOutputPath(conf, new Path(output));
		
		try {
			JobClient.runJob(conf);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
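With the sample data, each output line of step 1 is a user id followed by that user's average score for each item he rated (the item order depends on HashMap iteration). User 4's line, for example, would look roughly like:

4	101:2.0,102:5.0,104:5.0,105:4.0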

 

Step 2: the input is /test/twtbgn/cf/step1 and the output is /test/twtbgn/cf/step2

 

// imports omitted in the original post (HdfsDaoImpl is again the author's own helper class)
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class RecommendStep2 {
	
	public static class Step2Mapper extends MapReduceBase 
								implements Mapper<LongWritable, Text, Text, IntWritable>{
		private static final IntWritable ONE = new IntWritable(1);
		
		public void map(LongWritable linenum, Text line,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			// each step1 line: userId followed by its item:avgScore pairs
			String[] arr = line.toString().split("\\s+");
			if(arr.length < 2){
				return;
			}
			String[] movieScores = arr[1].split(",");
			int length = movieScores.length;
			for(int i = 0; i < length; i++){
				String movie = movieScores[i].split(":")[0];
				for(int j=0; j<length; j++){
					String movie1 = movieScores[j].split(":")[0];
					output.collect(getKey(movie, movie1), ONE);
				}
			}
		}
		
		private Text getKey(String str1, String str2){
			Text key = new Text();
			key.set(str1 + ":" + str2);
			return key;
		}
	}
	
	
	public static class Step2Reducer extends MapReduceBase
								implements Reducer<Text, IntWritable, Text, IntWritable>{

		public void reduce(Text text, Iterator<IntWritable> iterator,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			// sum up how many users this item pair co-occurred for
			int sum = 0;
			while (iterator.hasNext()) {
				IntWritable one = iterator.next();
				sum += one.get();
			}
			output.collect(text, new IntWritable(sum));
		}
	}
	
	public static void main(String[] args) {
		String input = "hdfs://host261:9000/test/twtbgn/cf/step1";
		String output = "hdfs://host261:9000/test/twtbgn/cf/step2";
		
		HdfsDaoImpl daoImpl = new HdfsDaoImpl();
		try {
			daoImpl.rmr(output);
			Thread.sleep(5000);
		} catch (IOException e1) {
			e1.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		JobConf conf = new JobConf(RecommendStep2.class);
		conf.setJobName("RecommendStep2");
		conf.addResource("classpath:/hadoop/core-site.xml");
		conf.addResource("classpath:/hadoop/hdfs-site.xml");
		conf.addResource("classpath:/hadoop/mapred-site.xml");
		
		conf.setMapOutputKeyClass(Text.class);
		conf.setMapOutputValueClass(IntWritable.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		
		conf.setMapperClass(Step2Mapper.class);
		conf.setReducerClass(Step2Reducer.class);
		
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		
		FileInputFormat.setInputPaths(conf, new org.apache.hadoop.fs.Path(input));
		FileOutputFormat.setOutputPath(conf, new Path(output));
		
		try {
			JobClient.runJob(conf);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
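Step 2 emits one line per item pair with its co-occurrence count, i.e. the co-occurrence matrix from earlier in sparse form. For the sample data the first lines are roughly:

101:101	7
101:102	4
101:103	4
101:104	4
101:105	3
101:106	4
...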

 

Step 3: the inputs are /test/cf/step1 and /test/cf/step2, and the output is /test/cf/step3
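The original post does not include the code for this step, so here is one possible sketch in the same old mapred API, not the author's implementation. It assumes the item-item co-occurrence matrix from step 2 is small enough to load into memory in configure() (passed in via a cf.step2.path property I made up), it only scores items the user has not rated yet, and it keeps the /test/twtbgn/cf/... paths used by the step 1 and 2 drivers. The names RecommendStep3, Step3Mapper and cf.step2.path are mine.

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class RecommendStep3 {

	public static class Step3Mapper extends MapReduceBase
								implements Mapper<LongWritable, Text, Text, Text>{
		// item pair "i:j" -> co-occurrence count, loaded from the step2 output
		private Map<String, Integer> cooccur = new HashMap<String, Integer>();
		// every item id that appears in the co-occurrence matrix
		private Set<String> items = new TreeSet<String>();

		public void configure(JobConf job){
			String step2Dir = job.get("cf.step2.path");
			try {
				FileSystem fs = new Path(step2Dir).getFileSystem(job);
				for(FileStatus status : fs.listStatus(new Path(step2Dir))){
					if(status.isDir() || status.getPath().getName().startsWith("_")){
						continue;	// skip subdirectories and _SUCCESS/_logs
					}
					BufferedReader reader = new BufferedReader(
							new InputStreamReader(fs.open(status.getPath())));
					String line;
					while((line = reader.readLine()) != null){
						String[] kv = line.split("\\s+");	// e.g. "101:102	4"
						if(kv.length >= 2){
							cooccur.put(kv[0], Integer.valueOf(kv[1]));
							items.add(kv[0].split(":")[0]);
						}
					}
					reader.close();
				}
			} catch (IOException e) {
				throw new RuntimeException("failed to load the co-occurrence matrix", e);
			}
		}

		public void map(LongWritable lineNum, Text line,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			// each step1 line: userId followed by item:avgScore pairs
			String[] arr = line.toString().split("\\s+");
			if(arr.length < 2){
				return;
			}
			Map<String, Float> userScores = new HashMap<String, Float>();
			for(String ms : arr[1].split(",")){
				String[] pair = ms.split(":");
				if(pair.length == 2){
					userScores.put(pair[0], Float.valueOf(pair[1]));
				}
			}
			// score(item) = sum over the user's rated items j of cooccur[item:j] * score(j)
			StringBuilder sb = new StringBuilder();
			for(String item : items){
				if(userScores.containsKey(item)){
					continue;	// only items the user has not rated are candidates
				}
				float score = 0f;
				for(Map.Entry<String, Float> e : userScores.entrySet()){
					Integer count = cooccur.get(item + ":" + e.getKey());
					if(count != null){
						score += count * e.getValue();
					}
				}
				sb.append(item).append(":").append(score).append(",");
			}
			if(sb.length() > 0){
				output.collect(new Text(arr[0]), new Text(sb.substring(0, sb.length()-1)));
			}
		}
	}

	public static void main(String[] args) throws IOException {
		String step1 = "hdfs://host261:9000/test/twtbgn/cf/step1";
		String step2 = "hdfs://host261:9000/test/twtbgn/cf/step2";
		String output = "hdfs://host261:9000/test/twtbgn/cf/step3";
		// as in the earlier steps, delete the output directory first if it already exists

		JobConf conf = new JobConf(RecommendStep3.class);
		conf.setJobName("RecommendStep3");
		conf.set("cf.step2.path", step2);
		conf.setNumReduceTasks(0);	// map-only: each step1 line already covers one whole user
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		conf.setMapperClass(Step3Mapper.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(conf, new Path(step1));
		FileOutputFormat.setOutputPath(conf, new Path(output));
		JobClient.runJob(conf);
	}
}

For a very large item catalogue the in-memory join above would not fit; in that case a reduce-side join keyed by item id would be needed instead.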

 

Step 4: the input is /test/cf/step3 and the output is /test/cf/step4
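The step 4 code is also missing from the post. Since the step 3 sketch above already drops items the user has rated, one simple hypothetical version (class name RecommendStep4 and the TOP_N setting are mine) just sorts each user's candidate scores and keeps the top N:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class RecommendStep4 {

	public static class Step4Mapper extends MapReduceBase
								implements Mapper<LongWritable, Text, Text, Text>{
		private static final int TOP_N = 10;	// how many items to recommend per user

		public void map(LongWritable lineNum, Text line,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			// each step3 line: userId followed by item:score pairs for unrated items
			String[] arr = line.toString().split("\\s+");
			if(arr.length < 2){
				return;
			}
			List<String[]> candidates = new ArrayList<String[]>();
			for(String is : arr[1].split(",")){
				String[] pair = is.split(":");
				if(pair.length == 2){
					candidates.add(pair);
				}
			}
			// sort the candidate items by score, highest first
			Collections.sort(candidates, new Comparator<String[]>() {
				public int compare(String[] a, String[] b) {
					return Float.compare(Float.parseFloat(b[1]), Float.parseFloat(a[1]));
				}
			});
			StringBuilder sb = new StringBuilder();
			int limit = Math.min(TOP_N, candidates.size());
			for(int i = 0; i < limit; i++){
				if(i > 0){
					sb.append(",");
				}
				sb.append(candidates.get(i)[0]).append(":").append(candidates.get(i)[1]);
			}
			output.collect(new Text(arr[0]), new Text(sb.toString()));
		}
	}

	public static void main(String[] args) throws IOException {
		String input = "hdfs://host261:9000/test/twtbgn/cf/step3";
		String output = "hdfs://host261:9000/test/twtbgn/cf/step4";
		// as before, delete the output directory first if it already exists

		JobConf conf = new JobConf(RecommendStep4.class);
		conf.setJobName("RecommendStep4");
		conf.setNumReduceTasks(0);	// map-only: one input line per user
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		conf.setMapperClass(Step4Mapper.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(conf, new Path(input));
		FileOutputFormat.setOutputPath(conf, new Path(output));
		JobClient.runJob(conf);
	}
}

With the sample data this would leave user 4 with "103:46.0,106:36.0", matching the worked example above.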

 
