
Mahout: distributed item-based algorithm 2

 
  • generating user vectors

Input format

userID: itemID1 itemID2 itemID3 ....

Output format

The reducer builds a Vector from all item IDs for the user and outputs the user ID mapped to the user's preference vector. All values in this vector are 0 or 1. For example, 98955 / [590:1.0, 22:1.0, 9059:1.0]

package mia.recommender.ch06;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

public final class WikipediaToItemPrefsMapper extends
		Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

	private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		Matcher m = NUMBERS.matcher(line);
		// The first number on the line is the user ID
		m.find();
		VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
		VarLongWritable itemID = new VarLongWritable();
		// Every following number is an item ID; emit one (userID, itemID) pair each
		while (m.find()) {
			itemID.set(Long.parseLong(m.group()));
			context.write(userID, itemID);
		}
	}

}

 

package mia.recommender.ch06;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;

public class WikipediaToUserVectorReducer
		extends
		Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

	@Override
	public void reduce(VarLongWritable userID,
			Iterable<VarLongWritable> itemPrefs, Context context)
			throws IOException, InterruptedException {
		// Sparse vector indexed by item ID; 100 is only an initial-capacity hint
		Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
		for (VarLongWritable itemPref : itemPrefs) {
			userVector.set((int) itemPref.get(), 1.0f);
		}
		context.write(userID, new VectorWritable(userVector));
	}
}

 

  • calculating co-occurrence

Input format

user IDs mapped to Vectors of user preferences—the output of the above MapReduce. For example, 98955 / [590:1.0,22:1.0,9059:1.0]

Output format

rows (or columns) of the co-occurrence matrix. For example, 590 / [22:3.0,95:1.0,...,9059:1.0,...]

package mia.recommender.ch06;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.Vector;

public class UserVectorToCooccurrenceMapper extends
		Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {

	public void map(VarLongWritable userID, VectorWritable userVector,
			Context context) throws IOException, InterruptedException {
		// In older Mahout versions this was userVector.get().iterateNonZero()
		Iterator<Vector.Element> it = userVector.get().nonZeroes().iterator();
		while (it.hasNext()) {
			int index1 = it.next().index();
			// Pair every nonzero item with every other nonzero item for this user
			Iterator<Vector.Element> it2 = userVector.get().nonZeroes().iterator();
			while (it2.hasNext()) {
				int index2 = it2.next().index();
				context.write(new IntWritable(index1), new IntWritable(index2));
			}
		}
	}
}

 

package mia.recommender.ch06;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class UserVectorToCooccurrenceReducer extends
		Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {

	public void reduce(IntWritable itemIndex1,
			Iterable<IntWritable> itemIndex2s, Context context)
			throws IOException, InterruptedException {
		Vector cooccurrenceRow = new RandomAccessSparseVector(
				Integer.MAX_VALUE, 100);
		for (IntWritable intWritable : itemIndex2s) {
			int itemIndex2 = intWritable.get();
			cooccurrenceRow.set(itemIndex2,
					cooccurrenceRow.get(itemIndex2) + 1.0);
		}
		context.write(itemIndex1, new VectorWritable(cooccurrenceRow));
	}
}

 

  •  rethinking matrix multiplication

 conventional matrix multiplication

 [Figure: conventional matrix multiplication, computing R = C * U as one dot product per row of the co-occurrence matrix C with the user vector U]

 The conventional algorithm necessarily touches the entire co-occurrence matrix, because it needs to perform a vector dot product with each row.
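
To make that concrete, here is a minimal in-memory sketch of the conventional product R = C * U, using plain arrays rather than the Mahout pipeline (the class and method names are illustrative only):

// Illustration only, not part of the Mahout pipeline.
public class ConventionalMultiply {

	// R = C * U: one dot product per row of the co-occurrence matrix C.
	// Every row of C is touched, even when most entries of U are zero.
	public static double[] multiply(double[][] cooccurrence, double[] userVector) {
		double[] recommendation = new double[cooccurrence.length];
		for (int row = 0; row < cooccurrence.length; row++) {
			double dot = 0.0;
			for (int col = 0; col < userVector.length; col++) {
				dot += cooccurrence[row][col] * userVector[col];
			}
			recommendation[row] = dot;
		}
		return recommendation;
	}
}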

 

 alternative matrix computation

 [Figure: alternative computation of R as a sum of co-occurrence columns, scaled by the nonzero entries of the user vector]

  But note that wherever element i of the user vector is 0, the loop iteration can be skipped entirely, because the product will be the zero vector and doesn’t affect the result. So this loop need only execute for each nonzero element of the user vector. The number of columns loaded will be equal to the number of preferences that the user expresses, which is far smaller than the total number of columns when the user vector is sparse.
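
Under the same illustrative setup as above, the column-oriented computation looks like this; only the columns corresponding to the user's nonzero preferences are ever touched:

// Illustration only: R = sum over i of (column i of C) * U[i].
public class ColumnWiseMultiply {

	public static double[] multiply(double[][] cooccurrence, double[] userVector) {
		double[] recommendation = new double[cooccurrence.length];
		for (int i = 0; i < userVector.length; i++) {
			if (userVector[i] == 0.0) {
				continue; // this column contributes nothing to the result
			}
			// Add column i of the co-occurrence matrix, scaled by the preference
			for (int row = 0; row < cooccurrence.length; row++) {
				recommendation[row] += cooccurrence[row][i] * userVector[i];
			}
		}
		return recommendation;
	}
}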

 

  •  matrix multiplication by partial products

 The columns of the co-occurrence matrix are available from an earlier step. Because the matrix is symmetric, the rows and columns are identical, so this output can be viewed as either rows or columns, conceptually. The columns are keyed by item ID, and the algorithm must multiply each by every nonzero preference value for that item, across all user vectors. That is, it must map item IDs to a user ID and preference value, and then collect them together in a reducer. After multiplying the co-occurrence column by each value, it produces a vector that forms part of the final recommendation vector R for one user.

 

The difficult part here is that two different kinds of data are joined in one computation: co-occurrence column vectors and user preference values. This isn't naturally possible in Hadoop, because the values in a reducer can be of only one Writable type. A clever implementation can get around this by crafting a Writable that contains either one or the other kind of data: a VectorOrPrefWritable.
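
Mahout provides this class as org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable, and it's used as-is in the mappers below. To show the idea, here is a simplified sketch of such a dual-payload Writable (the class name and serialization details are illustrative, not Mahout's actual implementation):

package mia.recommender.ch06;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Simplified sketch of the idea behind VectorOrPrefWritable: exactly one of
// the two payloads is present at a time, and a flag records which.
public final class VectorOrPrefSketch implements Writable {

	private Vector vector;   // non-null when carrying a co-occurrence column
	private long userID;     // meaningful when carrying a preference instead
	private float value;

	public void set(Vector vector) {
		this.vector = vector;
	}

	public void set(long userID, float value) {
		this.vector = null;
		this.userID = userID;
		this.value = value;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		boolean hasVector = vector != null;
		out.writeBoolean(hasVector);
		if (hasVector) {
			new VectorWritable(vector).write(out);
		} else {
			out.writeLong(userID);
			out.writeFloat(value);
		}
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		if (in.readBoolean()) {
			VectorWritable vectorWritable = new VectorWritable();
			vectorWritable.readFields(in);
			vector = vectorWritable.get();
		} else {
			userID = in.readLong();
			value = in.readFloat();
		}
	}
}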

 

The mapper phase here will actually contain two mappers, each producing different types of reducer input:

  1.  Input for mapper 1 is the co-occurrence matrix: item IDs as keys, mapped to columns as Vectors. For example, 590 / [22:3.0,95:1.0,...,9059:1.0,...]. The map function simply echoes its input, but with the Vector wrapped in a VectorOrPrefWritable.
  2. Input for mapper 2 is again the user vectors: user IDs as keys, mapped to preference Vectors. For example, 98955 / [590:1.0,22:1.0,9059:1.0]. For each nonzero value in the user vector, the map function outputs an item ID mapped to the user ID and preference value, wrapped in a VectorOrPrefWritable. For example, 590 / [98955:1.0].
  3. The framework collects together, by item ID, the co-occurrence column and all user ID–preference value pairs.
  4. The reducer collects this information into one output record and stores it.

 

package mia.recommender.ch06;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VectorWritable;

public class CooccurrenceColumnWrapperMapper extends
		Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

	public void map(IntWritable key, VectorWritable value, Context context)
			throws IOException, InterruptedException {
		// Echo the column, wrapped so it can share a reducer with preference data
		context.write(key, new VectorOrPrefWritable(value.get()));
	}
}
 
package mia.recommender.ch06;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class UserVectorSplitterMapper
		extends
		Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

	public void map(VarLongWritable key, VectorWritable value, Context context)
			throws IOException, InterruptedException {
		long userID = key.get();
		Vector userVector = value.get();
		// In older Mahout versions this was userVector.iterateNonZero()
		Iterator<Vector.Element> it = userVector.nonZeroes().iterator();
		IntWritable itemIndexWritable = new IntWritable();
		while (it.hasNext()) {
			Vector.Element e = it.next();
			int itemIndex = e.index();
			float preferenceValue = (float) e.get();
			itemIndexWritable.set(itemIndex);
			context.write(itemIndexWritable, 
					new VectorOrPrefWritable(userID, preferenceValue));
		}
	}
}
Technically speaking, there is no real Reducer following these two Mappers, because the output of two different Mappers can't be fed into a single Reducer. Instead, they're run separately, and the output of each is passed through a no-op Reducer and saved to two locations. Those two locations are then used as input to another MapReduce job, whose Mapper likewise does nothing, and whose Reducer collects a co-occurrence column vector for an item, plus all users and preference values for that item, into a single entity called VectorAndPrefsWritable. Mahout's ToVectorAndPrefReducer implements this step; for brevity, that code isn't reproduced here:

org.apache.mahout.cf.taste.hadoop.item.ToVectorAndPrefReducer
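
Its overall shape can still be sketched. The following is only an approximation of what that reducer does (it assumes, as in Mahout's class, that getVector() returns null for the preference-carrying entries); see the real source for the details:

package mia.recommender.ch06;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.Vector;

// Sketch only; Mahout's ToVectorAndPrefReducer is the real implementation.
public class ToVectorAndPrefSketchReducer
		extends
		Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorAndPrefsWritable> {

	public void reduce(IntWritable itemIndex,
			Iterable<VectorOrPrefWritable> values, Context context)
			throws IOException, InterruptedException {
		Vector cooccurrenceColumn = null;
		List<Long> userIDs = new ArrayList<Long>();
		List<Float> prefValues = new ArrayList<Float>();
		for (VectorOrPrefWritable value : values) {
			if (value.getVector() != null) {
				cooccurrenceColumn = value.getVector(); // the column for this item
			} else {
				userIDs.add(value.getUserID());         // one (user, preference) pair
				prefValues.add(value.getValue());
			}
		}
		if (cooccurrenceColumn != null) {
			context.write(itemIndex, new VectorAndPrefsWritable(
					cooccurrenceColumn, userIDs, prefValues));
		}
	}
}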

 

With columns of the co-occurrence matrix and user preferences in hand, both keyed by item ID, the algorithm proceeds by feeding both into a mapper that will output the product of the column and the user’s preference, for each given user ID.

  1.  Input to the mapper is all co-occurrence matrix columns and user preferences by item. For example, 590 / [22:3.0,95:1.0,...,9059:1.0,...] and 590 / [98955:1.0].
  2. The mapper outputs the co-occurrence column for each associated user, times the preference value. For example, 98955 / [22:3.0,95:1.0,...,9059:1.0,...].
  3. The framework collects these partial products together, by user.
  4. The reducer unpacks this input and sums all the vectors, which gives the user's final recommendation vector (call it R). For example, 98955 / [22:4.0,45:3.0,95:11.0,...,9059:1.0,...].
package mia.recommender.ch06;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class PartialMultiplyMapper
		extends
		Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {
	public void map(IntWritable key,
			VectorAndPrefsWritable vectorAndPrefsWritable, Context context)
			throws IOException, InterruptedException {
		Vector cooccurrenceColumn = vectorAndPrefsWritable.getVector();
		List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
		List<Float> prefValues = vectorAndPrefsWritable.getValues();
		
		for (int i = 0; i < userIDs.size(); i++) {
			long userID = userIDs.get(i);
			float prefValue = prefValues.get(i);
			// One partial product per associated user: column times preference value
			Vector partialProduct = cooccurrenceColumn.times(prefValue);
			context.write(new VarLongWritable(userID), 
					new VectorWritable(partialProduct));
		}
	}
}

This mapper writes a very large amount of data. For each user-item association, it outputs one copy of an entire column of the co-occurrence matrix. That's more or less necessary; those copies must be grouped together with copies of other columns in a reducer, and summed, to produce the recommendation vector.
But one optimization comes into play at this stage: a combiner. Because vector addition is associative and commutative, partial products for the same user can be summed on the map side without changing the final result, which shrinks the amount of data shuffled to the reducers.

 

package mia.recommender.ch06;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class AggregateCombiner
		extends
		Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {

	public void reduce(VarLongWritable key, Iterable<VectorWritable> values,
			Context context) throws IOException, InterruptedException {
		// Sum whatever partial products happen to be available for this user
		Vector partial = null;
		for (VectorWritable vectorWritable : values) {
			partial = partial == null ? vectorWritable.get() : partial
					.plus(vectorWritable.get());
		}
		context.write(key, new VectorWritable(partial));
	}
}

 

  •  making recommendations

At last, the pieces of the recommendation vector must be assembled for each user so that the algorithm can make recommendations.

 

package mia.recommender.ch06;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntLongHashMap;

public class AggregateAndRecommendReducer
		extends
		Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable> {

	private int recommendationsPerUser = 10;
	private OpenIntLongHashMap indexItemIDMap;
	static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
	static final String NUM_RECOMMENDATIONS = "numRecommendations";
	static final int DEFAULT_NUM_RECOMMENDATIONS = 10;

	protected void setup(Context context) throws IOException {
		Configuration jobConf = context.getConfiguration();
		recommendationsPerUser = jobConf.getInt(NUM_RECOMMENDATIONS,
				DEFAULT_NUM_RECOMMENDATIONS);
		// In older Mahout versions this was TasteHadoopUtils.readItemIDIndexMap(...)
		indexItemIDMap = TasteHadoopUtils.readIDIndexMap(
				jobConf.get(ITEMID_INDEX_PATH), jobConf);
	}

	public void reduce(VarLongWritable key, Iterable<VectorWritable> values,
			Context context) throws IOException, InterruptedException {

		Vector recommendationVector = null;
		for (VectorWritable vectorWritable : values) {
			recommendationVector = recommendationVector == null ? vectorWritable
					.get() : recommendationVector.plus(vectorWritable.get());
		}

		Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(
				recommendationsPerUser + 1,
				Collections.reverseOrder(ByValueRecommendedItemComparator
						.getInstance()));

		// In older Mahout versions this was recommendationVector.iterateNonZero()
		Iterator<Vector.Element> recommendationVectorIterator =
				recommendationVector.nonZeroes().iterator();
		while (recommendationVectorIterator.hasNext()) {
			Vector.Element element = recommendationVectorIterator.next();
			int index = element.index();
			float value = (float) element.get();
			if (topItems.size() < recommendationsPerUser) {
				topItems.add(new GenericRecommendedItem(indexItemIDMap
						.get(index), value));
			} else if (value > topItems.peek().getValue()) {
				topItems.add(new GenericRecommendedItem(indexItemIDMap
						.get(index), value));
				topItems.poll();
			}
		}

		List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(
				topItems.size());
		recommendations.addAll(topItems);
		Collections.sort(recommendations,
				ByValueRecommendedItemComparator.getInstance());
		context.write(key, new RecommendedItemsWritable(recommendations));
	}
}
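
None of the listings above show how these jobs are chained together. As a rough illustration, a driver for just the first phase (generating user vectors) might look like the sketch below; the job name and paths are placeholders, it assumes Hadoop 2's Job API, and in practice Mahout's org.apache.mahout.cf.taste.hadoop.item.RecommenderJob wires up the entire pipeline for you.

package mia.recommender.ch06;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

// Hypothetical driver for the first phase only; paths come from the command line.
public class UserVectorDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "wikipedia-to-user-vectors");
		job.setJarByClass(UserVectorDriver.class);

		job.setInputFormatClass(TextInputFormat.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));

		job.setMapperClass(WikipediaToItemPrefsMapper.class);
		job.setMapOutputKeyClass(VarLongWritable.class);
		job.setMapOutputValueClass(VarLongWritable.class);

		job.setReducerClass(WikipediaToUserVectorReducer.class);
		job.setOutputKeyClass(VarLongWritable.class);
		job.setOutputValueClass(VectorWritable.class);

		// SequenceFile output, so the co-occurrence job can read
		// (VarLongWritable, VectorWritable) pairs directly
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}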

 

 

 
