对于商品推荐模型,在上一家公司时,就当做课题进行了研究、学习,并取得一定的成果,现在就发博文贴出来,供后面再回头看看,也供大伙参考。
商品推荐模型可以分为两种:基于内容推荐、协同过滤推荐
一:基于内容的推荐
举个简单的小例子,我们已知道
用户u1喜欢的电影是A,B,C
用户u2喜欢的电影是A, C, E, F
用户u3喜欢的电影是B,D
我们需要解决的问题是:决定对u1是不是应该推荐F这部电影
基于内容的做法:要分析F的特征和u1所喜欢的A、B、C的特征,需要知道的信息是A(战争片),B(战争片),C(剧情片),如果F(战争片),那么F很大程度上可以推荐给u1,这是基于内容的做法,你需要对item进行特征建立和建模。
二:基于协同过滤的推荐
举个简单的小例子,我们已知道
用户u1喜欢的电影是A,B,C
用户u2喜欢的电影是A, C, E, F
用户u3喜欢的电影是B,D
我们需要解决的问题是:决定对u1是不是应该推荐F这部电影
协同过滤的做法:那么你完全可以忽略item的建模,因为这种办法的决策是依赖user和item之间的关系,也就是这里的用户和电影之间的关系。我们不再需要知道ABCF哪些是战争片,哪些是剧情片,我们只需要知道用户u1和u2按照item向量表示,他们的相似度比较高,那么我们可以把u2所喜欢的F这部影片推荐给u1。
基于系统过滤的推荐分为以下三类:
1.基于用户的协同过滤推荐
2.基于项目的协同过滤推荐
3.基于模型的协同过滤推荐
1.基于用户的协同过滤推荐
基于用户的协同过滤推荐算法:先使用统计技术寻找与目标用户有相同喜好的邻居。然后根据目标用户的邻居的喜好产生向目标用户的推荐。
基本原理就是利用用户访问行为的相似性来互相推荐用户可能感兴趣的资源,如下图:
上图示意出基于用户的协同过滤推荐机制的基本原理
假设用户 A 喜欢物品 A、物品 C,用户 B 喜欢物品 B,用户 C 喜欢物品 A 、物品 C 和物品 D;那么从用户的历史喜好信息中,我们可以发现用户A和用户C的口味和偏好是比较类似的,同时C还喜欢物品D,那么我们可以推断用户A可能也喜欢物品D,因此可以将物品 D 推荐给用户 A。
2.基于项目的协同过滤推荐
根据所有用户对物品或者信息的评价,发现物品和物品之间的相似度,然后根据用户的历史偏好信息将类似的物品推荐给该用户
上图表明基于项目的协同过滤推荐的基本原理:用户A喜欢物品A和物品C,用户B喜欢物品A、物品B和物品C,用户C喜欢物品A,从这些用户的历史喜好中可以认为物品A与物品C比较类似,喜欢物品A的都喜欢物品C,基于这个判断用户C可能也喜欢物品C,所以推荐系统将物品C推荐给用户C。
3.基于模型的协同过滤推荐
基于模型的协同过滤推荐就是基于样本的用户喜好信息,训练一个推荐模型,然后根据实时的用户喜好的信息进行预测,计算推荐。
基于协同过滤的推荐机制是现今应用最为广泛的推荐机制,它有以下几个显著的优点:
它不需要对物品或者用户进行严格的建模,而且不要求物品的描述是机器可理解的,所以这种方法也是领域无关的。
这种方法计算出来的推荐是开放的,可以共用他人的经验,很好的支持用户发现潜在的兴趣偏好。
它也存在以下几个问题:
a.方法的核心是基于历史数据,所以对新物品和新用户都有“冷启动”的问题。
物品冷启动就是指无法将新加入的物品推荐给相关用户的问题。
用户冷启动就是指无法给新加入的用户进行推荐的问题
b.推荐的效果依赖于用户历史偏好数据的多少和准确性。
c.在大部分的实现中,用户历史偏好是用稀疏矩阵进行存储的,而稀疏矩阵上的计算有些明显的问题,包括可能少部分人的错误偏好会对推荐的准确度有很大的影响等等。
d.对于一些特殊品味的用户不能给予很好的推荐。
e.由于以历史数据为基础,抓取和建模用户的偏好后,很难修改或者根据用户的使用演变,从而导致这个方法不够灵活。
主要流程图如下:
Spark基于协同过滤的推荐主要有以下步骤:
1.准备样本数据(数据来源)
一般情况下,推荐引擎所需要的数据源包括:
a.要推荐物品或内容的元数据,例如关键字,基因描述等;
b.系统用户的基本信息,例如性别,年龄等
c.用户对物品或者信息的偏好,根据应用本身的不同,可能包括用户对物品的评分,用户查看物品的记录,用户的购买记录等。
其实这些用户的偏好信息可以分为两类:
显式的用户反馈:这类是用户在网站上自然浏览或者使用网站以外,显式的提供反馈信息,例如用户对物品的评分,或者对物品的评论。
隐式的用户反馈:这类是用户在使用网站时产生的数据,隐式的反应了用户对物品的喜好,例如用户购买了某物品,用户查看了某物品的信息等等。
抽取的样本数据实例(数据来源与某个网站的电影信息)
0::2::3::1424380312
0::3::1::1424380312
0::5::2::1424380312
0::9::4::1424380312
0::11::1::1424380312
0::12::2::1424380312
0::15::1::1424380312
0::17::1::1424380312
0::19::1::1424380312
0::21::1::1424380312
0::23::1::1424380312
0::26::3::1424380312
以::为分隔符,第一列为用户的id,第二列为商品的id,第三列为用户对于商品的评价,第四列为时间戳
2).建立推荐模型
val model = ALS.train(ratings, ranking, numIterations)
其中:
Ratings:为样本数据的RDD
Ranking:是模型中隐语义因子的个数。
Numiterations:是迭代次数
3).为用户推荐商品
model.recommendProducts(userId, numRecommender).toList
其中:
userId :为用户的id
numRecommerder:为推荐的前n的商品的个数
虽然上述理论和算法都是商品推荐的,但是我这里实际上是拿了部分电影信息和用户信息编写的代码,实现了基于电影推荐、基于用户推荐两种情况,话不多说,直接贴出代码:
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.Rating
import scala.collection.mutable.HashMap
import scala.collection.mutable.ListBuffer
import org.jblas.DoubleMatrix
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.evaluation.RankingMetrics
/**
* 推荐模型训练
*/
class RecommenderModelTrain extends Serializable {
/**
* 求其余弦相似度
*/
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}
/**
* 得到用户因子和物品因子
*/
def obtainFeatures(model: MatrixFactorizationModel) = {
val userFeatures = model.userFeatures;
val productFeatures = model.productFeatures;
(userFeatures, productFeatures)
}
/**
* 求某个物品与各个物品的余弦相似度
*/
def productCosineSimilarity(model: MatrixFactorizationModel, itemId: Int) = {
val itemFactor = model.productFeatures.lookup(itemId).head;
val itemVector = new DoubleMatrix(itemFactor);
val sims = model.productFeatures.map {
case (id, factor) => {
val factorVector = new DoubleMatrix(factor);
val sim = this.cosineSimilarity(factorVector, itemVector)
(id, sim)
}
}
sims;
}
/**
* 求某个用户与各个用户的余弦相似度
*/
def userCosineSimilarity(model: MatrixFactorizationModel, userId: Int) = {
val userFactor = model.userFeatures.lookup(userId).head;
val userVector = new DoubleMatrix(userFactor);
val userSims = model.userFeatures.map {
case (id, factor) => {
val factorVector = new DoubleMatrix(factor);
val sim = cosineSimilarity(factorVector, userVector)
(id, sim)
}
}
userSims
}
/**
* 求与某个物品相似的前N的物品
*/
def similarProduct(model: MatrixFactorizationModel, itemId: Int, N: Int = 10) = {
val sims = this.productCosineSimilarity(model, itemId);
val sortedSims = sims.top(N)(Ordering.by { x => x._2})
sortedSims
}
/**
* 求与某个用户相似的前N个用户
*/
def similarUser(model: MatrixFactorizationModel, userId: Int, N: Int = 10) = {
val userSims = this.userCosineSimilarity(model, userId);
val sortedSims = userSims.top(N)(Ordering.by { x => x._2})
sortedSims
}
/**
* 给单个用户推荐商品
*/
def predictByUserMoive(userId: Int,
model: MatrixFactorizationModel,
numRecommender: Int = 10) = {
val map = new ListBuffer[(String, List[Rating])]()
var rs = model.recommendProducts(userId, numRecommender).toList
val tuple = (userId.toString(), rs)
map += tuple;
map;
}
/**
* 为所有用户推荐商品
*/
def predictAllUserProduct(users: ArrayBuffer[Int],
model: MatrixFactorizationModel,
numRecommender: Int = 10) = {
val map = new ListBuffer[(Int, List[Rating])]()
//给用户推荐电影
users.foreach { userId => {
//为用户推荐电影
var rs = model.recommendProducts(userId, numRecommender).toList
val tuple = (userId, rs)
map += tuple;
}
}
map;
}
/**
* 为所有用户推荐商品
*/
def predictUserProduct(users: List[Int],
model: MatrixFactorizationModel,
numRecommender: Int = 10) = {
val map = new ListBuffer[(Int, List[Rating])]()
//给用户推荐电影
users.foreach { userId => {
//为用户推荐电影
var rs = model.recommendProducts(userId, numRecommender).toList
val tuple = (userId, rs)
map += tuple;
}
}
map;
}
/**
* 预测数据
*/
def predictMoive(sc: SparkContext,
model: MatrixFactorizationModel,
path: String,
numRecommender: Int = 10) = {
//读取需要电影推荐的用户数据 "E:/data/sample_movielens_ratings.txt"
val userData = sc.textFile(path)
.map { x => {
val arr = x.split("::");
arr(0)
}
}
.distinct();
val map = new ListBuffer[(String, List[Rating])]()
//给用户推荐电影
userData.collect().foreach { userId => {
//为用户推荐电影
var rs = model.recommendProducts(userId.toInt, numRecommender).toList
// val recommendations = model.predict(candidates.map((30, _)))
// .collect
// .sortBy(-_.rating)
// .take(10)
val tuple = (userId, rs)
map += tuple;
}
}
map;
}
/**
* 查看用户一共评价了多少部电影
*/
def findMoiveForUser(ratingRdd: RDD[Rating], userId: Int) = {
val moivesForUser = ratingRdd.keyBy { x => x.user}.lookup(userId);
moivesForUser
}
/**
* 模型评估 计算均方差
* 均方差=各个平方误差的和/总数
* 那么 平方误差 = (预测到的评级 - 真实评级)*(预测到的评级 - 真实评级)
* 通过演变 均方差= (预测到的评级 - 真实评级)*(预测到的评级 - 真实评级)+(预测到的评级 - 真实评级)*(预测到的评级 - 真实评级)+.../总的评级次数
* 那么均方根误差 = 均方差 开平方
*/
def evaluateMode(ratings: RDD[Rating], model: MatrixFactorizationModel) = {
//使用训练数据训练模型
val usersProducets = ratings.map(r => r match {
case Rating(user, product, rate) => (user, product)
})
//预测数据
val predictions = model.predict(usersProducets).map(u => u match {
case Rating(user, product, rate) => ((user, product), rate)
})
//将真实分数与预测分数进行合并
val ratesAndPreds = ratings.map(r => r match {
case Rating(user, product, rate) =>
((user, product), rate)
}).join(predictions)
//计算均方差
// val MSE = ratesAndPreds.map(r => r match {
// case ((user, product), (r1, r2)) =>
// var err = (r1 - r2)
// err * err
// }).mean()
//另外一种计算均方差的方式
//首先求得 平方误差和
val squareErrorTotal = ratesAndPreds.map(data => {
val userId = data._1._1;
val moiveId = data._1._2;
val actualRating = data._2._1;
val predictRating = data._2._2;
val diff = predictRating - actualRating;
val squareError = Math.pow(diff, 2.0);
squareError
})
.reduce(_ + _);
val MSE = squareErrorTotal / ratesAndPreds.count();
//计算均方根误差
val sqrtMSE = Math.sqrt(MSE);
//返回出均方差值
sqrtMSE
}
/**
* 查看单个用户的模型评级
*/
def evaluateMode(userId: Int,
ratingRdd: RDD[Rating],
model: MatrixFactorizationModel) = {
val moiveForUser = this.findMoiveForUser(ratingRdd, userId);
//得到该用户的第一个真实评级
val actualRating = moiveForUser.take(1)(0);
println(actualRating.rating) //打印第一条真实评级
//求得模型的预计评级
val predictRating = model.predict(userId, actualRating.product);
println(predictRating) //打印这个预测评级
//最后计算平方误差
val diff = predictRating - actualRating.rating;
val squareError = Math.pow(diff, 2.0)
println(squareError)
}
/**
* 计算k值平均准确率(MAPK)
* actual 为真实的评级数据
* predicted 模型所预测的评级数据
*/
def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
val prek = predicted.take(k);
var score = 0.0;
var numHits = 0.0;
val zipPrek = prek.zipWithIndex;
for ((p, i) <- zipPrek) {
if (actual.contains(p)) {
numHits += 1.0;
score += numHits / (i.toDouble + 1.0)
}
}
if (actual.isEmpty) {
score = 1.0;
} else {
score = score / Math.min(actual.size, k).toDouble
}
score;
}
/**
* 用k值平均准确率来评估模型
* 这是单个用户的实验
*/
def evaluateModeByMAPK(userId: Int,
ratingRdd: RDD[Rating],
model: MatrixFactorizationModel) = {
//得到用户实际评级过的电影id
val moivesForUser = this.findMoiveForUser(ratingRdd, userId)
val actualMovies = moivesForUser.map { x => x.product};
//然后得到推荐的物品列表
val predictedMoives = model.recommendProducts(userId, 10)
.toSeq
.map { x => x.product}
//然后来计算平均准确率
val MAPK = this.avgPrecisionK(actualMovies, predictedMoives, 10);
println("MAPK=" + MAPK)
}
/**
* 用K值平均准确率来评估模型
* 用MAPK作为评估推荐模型时:
* 那么 每个用户将相当于一个查询,前K个推荐物组成的集合则相当于一个查到的文档结果集合
* 用户对电影的实际评级便对应着文档的实际相关性
* 这样MAPK衡量的是 模型对用户感兴趣和会去接触的物品的 预测能力
*/
def evaluateModeByMAPK(ratingRdd: RDD[Rating],
model: MatrixFactorizationModel,
sc: SparkContext) = {
val itemFactors = model.productFeatures.map(data => {
val factor = data._2;
factor
}).collect();
val itemMatrix = new DoubleMatrix(itemFactors);
val imBroadcast = sc.broadcast(itemMatrix);
/**
* 计算 每个用户将相当于一个查询,前K个推荐物组成的集合则相当于一个查到的文档结果集合
* 得到 每个用户的 前K个推荐物组成的集合
*/
val allRecs = model.userFeatures.map(data => {
val userId = data._1;
val userVector = new DoubleMatrix(data._2);
val scores = imBroadcast.value.mmul(userVector);
val sortedScores = scores.data.zipWithIndex.sortBy(-_._1);
val recommendedIds = sortedScores.map(x => x._2 + 1).toSeq;
(userId, recommendedIds)
});
val userMoives = ratingRdd.map { data => {
val userId = data.user;
val productId = data.product;
val rating = data.rating;
(userId, productId)
}
}
.groupBy(data => data._1);
val k = 2000;
val MAPK = allRecs.join(userMoives)
.map(data => {
val userId = data._1;
val predicted = data._2._1;
val actualWithed = data._2._2;
val actual = actualWithed.map(_._2).toSeq
avgPrecisionK(actual, predicted, k);
})
.reduce(_ + _) / allRecs.count();
// println("MAPK="+MAPK)
MAPK
}
/**
* 通过Spark自带的函数对模型进行评估
* 该方法主要是计算均方差 和 均方根误差
*/
def evaluateModeWithFunctionSeekMSE(ratings: RDD[Rating], model: MatrixFactorizationModel) = {
//使用训练数据训练模型
val usersProducets = ratings.map(r => r match {
case Rating(user, product, rate) => (user, product)
})
//预测数据
val predictions = model.predict(usersProducets).map(u => u match {
case Rating(user, product, rate) => ((user, product), rate)
})
//将真实分数与预测分数进行合并
val ratesAndPreds = ratings.map(r => r match {
case Rating(user, product, rate) =>
((user, product), rate)
}).join(predictions)
val predictedAndTrue = ratesAndPreds.map(data => {
val actual = data._2._1;
val predict = data._2._2;
(predict, actual)
});
val regressionMetrics = new RegressionMetrics(predictedAndTrue);
//打印均方差
val MSE = regressionMetrics.meanSquaredError;
//得到均方根误差
val sqrtMSE = regressionMetrics.rootMeanSquaredError;
println(MSE, sqrtMSE);
}
/**
* 通过Spark自带的函数对模型进行评估
* 该方法主要是计算 K值平均准确率
*/
def evaluateModeWithFunctionSeekMAPK(ratingRdd: RDD[Rating],
model: MatrixFactorizationModel,
sc: SparkContext) = {
val itemFactors = model.productFeatures.map(data => {
val factor = data._2;
factor
}).collect();
val itemMatrix = new DoubleMatrix(itemFactors);
val imBroadcast = sc.broadcast(itemMatrix);
/**
* 计算 每个用户将相当于一个查询,前K个推荐物组成的集合则相当于一个查到的文档结果集合
* 得到 每个用户的 前K个推荐物组成的集合
*/
val allRecs = model.userFeatures.map(data => {
val userId = data._1;
val userVector = new DoubleMatrix(data._2);
val scores = imBroadcast.value.mmul(userVector);
val sortedScores = scores.data.zipWithIndex.sortBy(-_._1);
val recommendedIds = sortedScores.map(x => x._2 + 1).toSeq;
(userId, recommendedIds)
});
val userMoives = ratingRdd.map { data => {
val userId = data.user;
val productId = data.product;
val rating = data.rating;
(userId, productId)
}
}
.groupBy(data => data._1);
val predictAndTrue = allRecs.join(userMoives)
.map(data => {
val userId = data._1;
val predict = data._2._1.toArray;
val actual = data._2._2.map(_._2).toArray;
(predict, actual)
})
val rankingMetrics = new RankingMetrics(predictAndTrue);
println(rankingMetrics.meanAveragePrecision);
}
}
import scala.collection.mutable.HashMap
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Get
import javafx.scene.control.cell.CellUtils
import org.apache.hadoop.hbase.CellUtil
import scala.collection.mutable.ListBuffer
/**
* 推荐模型表的相关hbase操作
*/
class RecommenderDao(zk_quorum : String,zk_port : String) extends BaseOperator(zk_quorum,zk_port){
def save(dataMap : RDD[(String,List[Rating])]) {
val putRdd = dataMap.map(data => {
val rowkey = data._1;
val values = data._2;
var strValue = "";
for(rating <- values){
val product = rating.product;
val rate = rating.rating;
strValue = strValue + product + ":" + rate + ",";
}
strValue = strValue.substring(0, strValue.length()-1);
val put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("product"), Bytes.toBytes(strValue));
(new ImmutableBytesWritable,put)
})
this.save(putRdd, "recommender");
}
/**
* 查询全部用户的
*/
def query() = {
val configuration = this.getConfiguration();
// val conn = this.getConnection();
// val table = conn.getTable(TableName.valueOf(tableName))
val table = new HTable(configuration,"recommender");
val scan = new Scan();
val resultScanner = table.getScanner(scan);
val iter = resultScanner.iterator();
while(iter.hasNext()){
val rs = iter.next();
val user = Bytes.toString(rs.getRow);
val product = Bytes.toString(rs.getValue(Bytes.toBytes("cf"), Bytes.toBytes("product")));
println(user + "=" + product);
}
table.close();
// conn.close();
}
/**
* 根据电影Id查询他类似的电影
*/
def querySimMoiveByMoiveId(moiveId : String) = {
val configuration = this.getConfiguration();
val table = new HTable(configuration,"moive_sim");
val get = new Get(Bytes.toBytes(moiveId));
val result = table.get(get);
//得到电影名称
val moiveName = Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")));
//得到与其电影相似的电影
val cells = result.rawCells();
val simMoiveIdsList = new ListBuffer[String]();
for(cell <- cells){
val family = Bytes.toString(CellUtil.cloneFamily(cell));
if("moive".equals(family)){
val simMoiveId = Bytes.toString(CellUtil.cloneQualifier(cell));
simMoiveIdsList += simMoiveId;
}
}
val simMoiveList = new ListBuffer[(String,String,Double)]();
simMoiveIdsList.foreach { simMoiveId => {
val moiveName = Bytes.toString(result.getValue(Bytes.toBytes("moive"), Bytes.toBytes(simMoiveId)));
val moiveRating = Bytes.toDouble(result.getValue(Bytes.toBytes("rating"), Bytes.toBytes(simMoiveId)))
val tuple = (simMoiveId,moiveName,moiveRating);
simMoiveList +=tuple;
} }
table.close();
simMoiveList.sortBy(tuple => -tuple._3)
}
/**
* 查询所有的电影的类似电影
*/
def querySimMoiveByAll() = {
val configuration = this.getConfiguration();
val table = new HTable(configuration,"moive_sim");
val scan = new Scan();
val resultScanner = table.getScanner(scan);
val iter = resultScanner.iterator();
val simAllMoiveMap = new HashMap[String,ListBuffer[(String,String,Double)]]();
while(iter.hasNext()){
val result = iter.next();
val movieId = Bytes.toString(result.getRow);
//得到电影名称
val moiveName = Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")));
//得到与其电影相似的电影
val cells = result.rawCells();
val simMoiveIdsList = new ListBuffer[String]();
for(cell <- cells){
val family = Bytes.toString(CellUtil.cloneFamily(cell));
if("moive".equals(family)){
val simMoiveId = Bytes.toString(CellUtil.cloneQualifier(cell));
simMoiveIdsList += simMoiveId;
}
}
val simMoiveList = new ListBuffer[(String,String,Double)]();
simMoiveIdsList.foreach { simMoiveId => {
val moiveName = Bytes.toString(result.getValue(Bytes.toBytes("moive"), Bytes.toBytes(simMoiveId)));
val moiveRating = Bytes.toDouble(result.getValue(Bytes.toBytes("rating"), Bytes.toBytes(simMoiveId)))
val tailTuple = (simMoiveId,moiveName,moiveRating);
simMoiveList += tailTuple;
} }
simAllMoiveMap.put(movieId, simMoiveList.sortBy(tuple => -tuple._3))
}
table.close();
simAllMoiveMap;
}
/**
* 根据用户Id查询给他推荐的电影
*/
def queryPredictMoiveByUserId(userId : String) = {
val configuration = this.getConfiguration();
val table = new HTable(configuration,"moive_recommender");
val get = new Get(Bytes.toBytes(userId));
val result = table.get(get);
//得到给其推荐的电影
val cells = result.rawCells();
val predictMoiveIdsList = new ListBuffer[String]();
for(cell <- cells){
val family = Bytes.toString(CellUtil.cloneFamily(cell));
if("moive".equals(family)){
val predictMoiveId = Bytes.toString(CellUtil.cloneQualifier(cell));
predictMoiveIdsList += predictMoiveId;
}
}
val predictMoiveList = new ListBuffer[(String,String,Double)]();
predictMoiveIdsList.foreach { predictMoiveId => {
val moiveName = Bytes.toString(result.getValue(Bytes.toBytes("moive"), Bytes.toBytes(predictMoiveId)));
val moiveRating = Bytes.toDouble(result.getValue(Bytes.toBytes("rating"), Bytes.toBytes(predictMoiveId)))
val tuple = (predictMoiveId,moiveName,moiveRating);
predictMoiveList +=tuple;
} }
table.close();
predictMoiveList.sortBy { tuple => -tuple._3};
}
/**
* 查询所有用户的推荐电影
*/
def queryPredictMoiveByAll() = {
val configuration = this.getConfiguration();
val table = new HTable(configuration,"moive_recommender");
val scan = new Scan();
val resultScanner = table.getScanner(scan);
val iter = resultScanner.iterator();
val predictAllMoiveMap = new HashMap[String,ListBuffer[(String,String,Double)]]();
while(iter.hasNext()){
val result = iter.next();
val userId = Bytes.toString(result.getRow);
//得到给其推荐的电影
val cells = result.rawCells();
val predictMoiveIdsList = new ListBuffer[String]();
for(cell <- cells){
val family = Bytes.toString(CellUtil.cloneFamily(cell));
if("moive".equals(family)){
val predictMoiveId = Bytes.toString(CellUtil.cloneQualifier(cell));
predictMoiveIdsList += predictMoiveId;
}
}
val predictMoiveList = new ListBuffer[(String,String,Double)]();
predictMoiveIdsList.foreach { predictMoiveId => {
val moiveName = Bytes.toString(result.getValue(Bytes.toBytes("moive"), Bytes.toBytes(predictMoiveId)));
val moiveRating = Bytes.toDouble(result.getValue(Bytes.toBytes("rating"), Bytes.toBytes(predictMoiveId)))
val tuple = (predictMoiveId,moiveName,moiveRating);
predictMoiveList +=tuple;
} }
predictAllMoiveMap.put(userId, predictMoiveList.sortBy(tuple => -tuple._3))
}
table.close();
predictAllMoiveMap
}
}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
/**
* 用户推荐计算.
* 根据物品相似度、用户评分、指定最大推荐数量进行用户推荐
*/
class RecommendedItem {
/**
* 用户推荐计算.
* @param items_similar 物品相似度
* @param user_prefer 用户评分
* @param r_number 推荐数量
* @param RDD[UserRecomm] 返回用户推荐物品
*
*/
def Recommend(items_similar: RDD[ItemSimi],
user_prefer: RDD[ItemPref],
r_number: Int): (RDD[UserRecomm]) = {
// 0 数据准备
val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
// 1 矩阵计算——i行与j列join
val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
join(user_prefer1.map(f => (f._2, (f._1, f._3))))
// 2 矩阵计算——i行与j列元素相乘
val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
// 3 矩阵计算——用户:元素累加求和
val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
// 4 矩阵计算——用户:对结果过滤已有I2
val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
// 5 矩阵计算——用户:用户对结果排序,过滤
val rdd_app1_R6 = rdd_app1_R5.groupByKey()
val rdd_app1_R7 = rdd_app1_R6.map(f => {
val i2 = f._2.toBuffer
val i2_2 = i2.sortBy(_._2)
if (i2_2.length > r_number) i2_2.remove(0, (i2_2.length - r_number))
(f._1, i2_2.toIterable)
})
val rdd_app1_R8 = rdd_app1_R7.flatMap(f => {
val id2 = f._2
for (w <- id2) yield (f._1, w._1, w._2)
})
rdd_app1_R8.map(f => UserRecomm(f._1, f._2, f._3))
}
/**
* 用户推荐计算.
* @param items_similar 物品相似度
* @param user_prefer 用户评分
* @param RDD[UserRecomm] 返回用户推荐物品
*
*/
def Recommend(items_similar: RDD[ItemSimi],
user_prefer: RDD[ItemPref]): (RDD[UserRecomm]) = {
// 0 数据准备
val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
// 1 矩阵计算——i行与j列join
val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
join(user_prefer1.map(f => (f._2, (f._1, f._3))))
// 2 矩阵计算——i行与j列元素相乘
val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
// 3 矩阵计算——用户:元素累加求和
val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
// 4 矩阵计算——用户:对结果过滤已有I2
val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
// 5 矩阵计算——用户:用户对结果排序,过滤
val rdd_app1_R6 = rdd_app1_R5.map(f => (f._1, f._2._1, f._2._2)).
sortBy(f => (f._1, f._3))
rdd_app1_R6.map(f => UserRecomm(f._1, f._2, f._3))
}
}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.chinasoft.training.RecommenderModelTrain
import org.apache.spark.mllib.recommendation.ALS
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.chinasoft.dao.hbase.RecommenderDao
/**
* 我的电影推荐系统_客户端入口
*/
object MyMoiveRecommenderClient {
def main(args : Array[String]){
//屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
//设置运行环境
val sparkConf = new SparkConf().setAppName("MyMoiveRecommenderClient")
sparkConf.setMaster("local");
val sc = new SparkContext(sparkConf)
//得到用户数据
val userDataRdd = obtainUserData("E:/data/ml-100k/u.user", sc);
val userMap = userDataRdd.collectAsMap();
//得到电影数据
val itemDataRdd = obtainMoiveData("E:/data/ml-100k/u.item", sc);
val itemMap = itemDataRdd.collectAsMap();
//得到评价数据
val ratingRdd = obtainRatingData("E:/data/ml-100k/u.data", sc);
//得到推荐模型
val recommenderModel = buildRecommenderModel(ratingRdd,sc).get;
//将789这位用户评价的 和 模型推荐 做一个对比
// this.contrastGrade(ratingRdd, recommenderModel, itemMap, 789);
//查看得到789单个用户的平方误差
val train = new RecommenderModelTrain();
// train.evaluateMode(789, ratingRdd, recommenderModel)
//查看得到789单个用户的 K值平均率
// train.evaluateModeByMAPK(790, ratingRdd, recommenderModel);
train.evaluateModeWithFunctionSeekMSE(ratingRdd, recommenderModel);
train.evaluateModeWithFunctionSeekMAPK(ratingRdd, recommenderModel, sc);
val dao = new RecommenderDao("host1,host2,host3","2181");
//得到每部电影相类似的电影,然后保存到HBase中
// val simItemPutRdd = findSimMoive(recommenderModel, sc, itemMap);
// dao.save(simItemPutRdd, "moive_sim");
//为每个用户推荐电影,然后保存到HBase中
// val userMoivePutRdd = predictMoive(recommenderModel, userMap, itemMap, sc);
// dao.save(userMoivePutRdd, "moive_recommender");
//得到与每个用户相类似的用户,然后保存到hbase中
// val simUser = this.findSimUser(recommenderModel, sc, userMap);
sc.stop();
}
/**
* 得到用户数据,从用户信息中加载
*/
def obtainUserData(path:String,sc:SparkContext) = {
//得到用户数据
val userRdd = sc.textFile(path);
val userDataRdd = userRdd.map { userInfo => {
val users = userInfo.split("\\|");
val userId = users(0).toInt;
val age = users(1).toInt;
val sex = users(2);
val zhiye = users(3);
val zipCode = users(4);
val subTuple = (age,sex,zhiye,zipCode)
val tuple = (userId,subTuple)
tuple
} }.cache();
userDataRdd;
}
/**
* 得到电影数据,从电影信息中加载
*/
def obtainMoiveData(path:String,sc:SparkContext) = {
//得到电影数据
val itemRdd = sc.textFile(path); //
val itemDataRdd = itemRdd.map { itemInfo => {
val items = itemInfo.split("\\|");
val itemId = items(0).toInt;
val itemName = items(1);
(itemId,itemName)
} }.cache();
itemDataRdd;
}
/**
* 得到评价数据,从评价数据中获取
*/
def obtainRatingData(path:String,sc:SparkContext) = {
val dataRdd = sc.textFile(path) //
val ratingRdd = dataRdd.map { data => {
val datas = data.split(" ");
val userId = datas(0).toInt;
val itemId = datas(1).toInt;
val rating = datas(2).toDouble
Rating(userId,itemId,rating)
} }
ratingRdd;
}
/**
* 构造推荐模型
*/
def buildRecommenderModel(ratingRdd : RDD[Rating],sc:SparkContext) = {
val ranks = List(8, 10)
val lambdas = List(0.01, 10.0)
val numIters = List(8, 10)
var baseModel: Option[MatrixFactorizationModel] = None;
var baseMean = Double.MaxValue
var beseRank = 0
var beseLambda = -1.0
var beseNumIter = -1
//使用ALS建立推荐模型
//也可以使用简单模式 val model = ALS.train(ratings, ranking, numIterations)
val train = new RecommenderModelTrain();
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
val model: MatrixFactorizationModel = new ALS().setRank(rank)
.setIterations(numIter)
.setLambda(lambda)
.run(ratingRdd)
val mean = train.evaluateMode(ratingRdd, model);
val MAPK = train.evaluateModeByMAPK(ratingRdd, model ,sc);
println("MSE="+mean , "MAPK="+MAPK);
if (mean < baseMean) {
baseMean = mean;
baseModel = Some(model);
}
}
baseModel;
}
/**
* 找到每部电影的类似的电影
*/
def findSimMoive(recommenderModel : MatrixFactorizationModel,
sc : SparkContext,
itemMap : scala.collection.Map[Int,String]) = {
val train = new RecommenderModelTrain();
//得到每部电影相类似的电影
val simItemMap = itemMap.map(tuple => {
val sims = train.similarProduct(recommenderModel, tuple._1 , 10);
(tuple._1,sims);
})
val simItemRdd = sc.parallelize(simItemMap.toList)
//将每部电影的类似电影保存到hbase中
val simItemPutRdd = simItemRdd.map(data => {
val moiveId = data._1;
val moiveName = itemMap.getOrElse(moiveId, "");
val simMoiveArr = data._2;
val put = new Put(Bytes.toBytes(moiveId+""));
put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(moiveName));
simMoiveArr.foreach(tuple => {
val simMoiveId = tuple._1;
val simMoiveName = itemMap.getOrElse(simMoiveId, "");
val rating = tuple._2;
put.add(Bytes.toBytes("moive"), Bytes.toBytes(simMoiveId+""), Bytes.toBytes(simMoiveName));
put.add(Bytes.toBytes("rating"), Bytes.toBytes(simMoiveId+""), Bytes.toBytes(rating));
})
(new ImmutableBytesWritable,put);
})
simItemPutRdd
}
/**
* 找到与每个用户相类似的用户
*/
def findSimUser(recommenderModel : MatrixFactorizationModel,
sc : SparkContext,
userMap : scala.collection.Map[Int,(Int,String,String,String)])={
val train = new RecommenderModelTrain();
val sims = train.similarUser(recommenderModel, 1, 10);
sims.foreach(x => println(x))
}
/**
* 为每个用户推荐电影
*/
def predictMoive(recommenderModel : MatrixFactorizationModel,
userMap : scala.collection.Map[Int,(Int,String,String,String)],
itemMap : scala.collection.Map[Int,String],
sc : SparkContext) = {
val train = new RecommenderModelTrain();
val users = userMap.map(tuple => tuple._1).toList
val predictData = train.predictUserProduct(users, recommenderModel, 10)
val predictDataRdd = sc.parallelize(predictData);
val putRdd = predictDataRdd.map(data => {
val userId = data._1;
val moives = data._2;
val userBasicInfo = userMap.getOrElse(userId, (0,"","",""));
val put = new Put(Bytes.toBytes(userId+""));
put.add(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(userBasicInfo._1));
put.add(Bytes.toBytes("basic"), Bytes.toBytes("sex"), Bytes.toBytes(userBasicInfo._2));
put.add(Bytes.toBytes("basic"), Bytes.toBytes("occup"), Bytes.toBytes(userBasicInfo._3));
put.add(Bytes.toBytes("basic"), Bytes.toBytes("code"), Bytes.toBytes(userBasicInfo._4));
moives.foreach { rating => {
val moiveId = rating.product;
val moiveName = itemMap.getOrElse(moiveId, "");
val moiveRating = rating.rating;
put.add(Bytes.toBytes("moive"), Bytes.toBytes(moiveId+""), Bytes.toBytes(moiveName));
put.add(Bytes.toBytes("rating"), Bytes.toBytes(moiveId+""), Bytes.toBytes(moiveRating));
} }
(new ImmutableBytesWritable,put)
})
putRdd;
}
/**
* 将某个用户的评级 与 实际推荐的电影做一个对比
* 看看其效果如何
*/
def contrastGrade(ratingRdd : RDD[Rating],
recommenderModel : MatrixFactorizationModel,
itemMap : scala.collection.Map[Int,String],
userId : Int) = {
//首先得到 其 用户的评级数据 取前10个
val train = new RecommenderModelTrain();
val moivesForUser = train.findMoiveForUser(ratingRdd, userId);
println(moivesForUser.size)
//将用户评价的前十的电影打印出来
moivesForUser.sortBy { x => -x.rating }
.take(10)
.map { rating => {
val moiveId = rating.product;
val rate = rating.rating;
val moiveName = itemMap.getOrElse(moiveId, "");
(moiveName,rate);
}}
.foreach(x => println(x));
println("------------------------");
//得到模型给用户推荐的电影
val predictMoive = train.predictByUserMoive(userId, recommenderModel, 10);
predictMoive.foreach(data => {
val list = data._2;
list.foreach { rating => {
val moiveId = rating.product;
val rate = rating.rating;
val moiveName = itemMap.getOrElse(moiveId, "");
println(moiveName,rate)
} }
})
}
}
import scala.math._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
/**
* 用户评分.
* @param userid 用户
* @param itemid 评分物品
* @param pref 评分
*/
case class ItemPref(
val userid: String,
val itemid: String,
val pref: Double) extends Serializable
/**
* 用户推荐.
* @param userid 用户
* @param itemid 推荐物品
* @param pref 评分
*/
case class UserRecomm(
val userid: String,
val itemid: String,
val pref: Double) extends Serializable
/**
* 相似度.
* @param itemid1 物品
* @param itemid2 物品
* @param similar 相似度
*/
case class ItemSimi(
val itemid1: String,
val itemid2: String,
val similar: Double) extends Serializable
/**
* 相似度计算.
* 支持:同现相似度、欧氏距离相似度、余弦相似度
*
*/
class ItemSimilarity extends Serializable {
/**
* 相似度计算.
* @param user_rdd 用户评分
* @param stype 计算相似度公式
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def Similarity(user_rdd: RDD[ItemPref], stype: String): (RDD[ItemSimi]) = {
val simil_rdd = stype match {
case "cooccurrence" =>
ItemSimilarity.CooccurrenceSimilarity(user_rdd)
case "cosine" =>
ItemSimilarity.CosineSimilarity(user_rdd)
case "euclidean" =>
ItemSimilarity.EuclideanDistanceSimilarity(user_rdd)
case _ =>
ItemSimilarity.CooccurrenceSimilarity(user_rdd)
}
simil_rdd
}
}
object ItemSimilarity {
/**
* 同现相似度矩阵计算.
* w(i,j) = N(i)∩N(j)/sqrt(N(i)*N(j))
* @param user_rdd 用户评分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def CooccurrenceSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
// 0 数据做准备
val user_rdd1 = user_rdd.map(f => (f.userid, f.itemid, f.pref))
val user_rdd2 = user_rdd1.map(f => (f._1, f._2))
// 1 (用户:物品) 笛卡尔积 (用户:物品) => 物品:物品组合
val user_rdd3 = user_rdd2.join(user_rdd2)
val user_rdd4 = user_rdd3.map(f => (f._2, 1))
// 2 物品:物品:频次
val user_rdd5 = user_rdd4.reduceByKey((x, y) => x + y)
// 3 对角矩阵
val user_rdd6 = user_rdd5.filter(f => f._1._1 == f._1._2)
// 4 非对角矩阵
val user_rdd7 = user_rdd5.filter(f => f._1._1 != f._1._2)
// 5 计算同现相似度(物品1,物品2,同现频次)
val user_rdd8 = user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).
join(user_rdd6.map(f => (f._1._1, f._2)))
val user_rdd9 = user_rdd8.map(f => (f._2._1._2, (f._2._1._1,
f._2._1._2, f._2._1._3, f._2._2)))
val user_rdd10 = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
val user_rdd11 = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
val user_rdd12 = user_rdd11.map(f => (f._1, f._2, (f._3 / sqrt(f._4 * f._5))))
// 6 结果返回
val user_rdd13 = user_rdd12.map(f => ItemSimi(f._1, f._2, f._3))
user_rdd13
}
/**
* 同现相似度矩阵计算.
* w(i,j) = N(i)∩N(j)/sqrt(N(i)*N(j))
* @param user_rdd 用户评分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def cooccurrenceSimilarity(user_rdd: RDD[ItemPref]) = {
// 0 数据做准备
val user_rdd1 = user_rdd.map(f => (f.userid, f.itemid, f.pref))
val user_rdd2 = user_rdd1.map(f => (f._1, f._2))
// 1 (用户:物品) 笛卡尔积 (用户:物品) => 物品:物品组合
val user_rdd3 = user_rdd2.join(user_rdd2)
user_rdd3.foreach(x => println(x))
println("--------------------------")
val user_rdd4 = user_rdd3.map(f => (f._2, 1))
// 2 物品:物品:频次
println("得到物品与物品之间发生的频次")
val user_rdd5 = user_rdd4.reduceByKey((x, y) => x + y);
user_rdd5.foreach(x => println(x))
// 3 对角矩阵
val user_rdd6 = user_rdd5.filter(f => f._1._1 == f._1._2)
// 4 非对角矩阵
val user_rdd7 = user_rdd5.filter(f => f._1._1 != f._1._2)
user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2)))
.foreach(x => println(x))
println("user_rdd8")
// 5 计算同现相似度(物品1,物品2,同现频次)
val user_rdd8 = user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).
join(user_rdd6.map(f => (f._1._1, f._2)))
user_rdd8.foreach(x => println(x))
val user_rdd9 = user_rdd8.map(f => (f._2._1._2, (f._2._1._1,
f._2._1._2, f._2._1._3, f._2._2)))
println("user_rdd9=")
user_rdd9.foreach(x => println(x))
user_rdd6.map(f => (f._1._1, f._2)).foreach(x => println(x))
val user_rdd10 = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
val user_rdd11 = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
user_rdd11.foreach(x => println(x))
}
/**
* 余弦相似度矩阵计算.
* T(x,y) = ∑x(i)y(i) / sqrt(∑(x(i)*x(i)) * ∑(y(i)*y(i)))
* @param user_rdd 用户评分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def CosineSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
// 0 数据做准备
val user_rdd1 = user_rdd.map(f => (f.userid, f.itemid, f.pref))
val user_rdd2 = user_rdd1.map(f => (f._1, (f._2, f._3)))
// 1 (用户,物品,评分) 笛卡尔积 (用户,物品,评分) => (物品1,物品2,评分1,评分2)组合
val user_rdd3 = user_rdd2.join(user_rdd2)
val user_rdd4 = user_rdd3.map(f => ((f._2._1._1, f._2._2._1), (f._2._1._2, f._2._2._2)))
// 2 (物品1,物品2,评分1,评分2)组合 => (物品1,物品2,评分1*评分2) 组合 并累加
val user_rdd5 = user_rdd4.map(f => (f._1, f._2._1 * f._2._2)).reduceByKey(_ + _)
// 3 对角矩阵
val user_rdd6 = user_rdd5.filter(f => f._1._1 == f._1._2)
// 4 非对角矩阵
val user_rdd7 = user_rdd5.filter(f => f._1._1 != f._1._2)
// 5 计算相似度
val user_rdd8 = user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).
join(user_rdd6.map(f => (f._1._1, f._2)))
val user_rdd9 = user_rdd8.map(f => (f._2._1._2, (f._2._1._1,
f._2._1._2, f._2._1._3, f._2._2)))
val user_rdd10 = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
val user_rdd11 = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
val user_rdd12 = user_rdd11.map(f => (f._1, f._2, (f._3 / sqrt(f._4 * f._5))))
// 6 结果返回
user_rdd12.map(f => ItemSimi(f._1, f._2, f._3))
}
/**
* 欧氏距离相似度矩阵计算.
* d(x, y) = sqrt(∑((x(i)-y(i)) * (x(i)-y(i))))
* sim(x, y) = n / (1 + d(x, y))
* @param user_rdd 用户评分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def EuclideanDistanceSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
// 0 数据做准备
val user_rdd1 = user_rdd.map(f => (f.userid, f.itemid, f.pref))
val user_rdd2 = user_rdd1.map(f => (f._1, (f._2, f._3)))
// 1 (用户,物品,评分) 笛卡尔积 (用户,物品,评分) => (物品1,物品2,评分1,评分2)组合
val user_rdd3 = user_rdd2 join user_rdd2
val user_rdd4 = user_rdd3.map(f => ((f._2._1._1, f._2._2._1), (f._2._1._2, f._2._2._2)))
// 2 (物品1,物品2,评分1,评分2)组合 => (物品1,物品2,评分1-评分2) 组合 并累加
val user_rdd5 = user_rdd4.map(f => (f._1, (f._2._1 - f._2._2) * (f._2._1 - f._2._2))).reduceByKey(_ + _)
// 3 (物品1,物品2,评分1,评分2)组合 => (物品1,物品2,1) 组合 并累加 计算重叠数
val user_rdd6 = user_rdd4.map(f => (f._1, 1)).reduceByKey(_ + _)
// 4 非对角矩阵
val user_rdd7 = user_rdd5.filter(f => f._1._1 != f._1._2)
// 5 计算相似度
val user_rdd8 = user_rdd7.join(user_rdd6)
val user_rdd9 = user_rdd8.map(f => (f._1._1, f._1._2, f._2._2 / (1 + sqrt(f._2._1))))
// 6 结果返回
val user_rdd10 = user_rdd9.map(f => ItemSimi(f._1, f._2, f._3))
user_rdd10;
}
/**
* 欧氏距离相似度矩阵计算.
* d(x, y) = sqrt(∑((x(i)-y(i)) * (x(i)-y(i))))
* sim(x, y) = n / (1 + d(x, y))
* @param user_rdd 用户评分
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def euclideanDistanceSimilarity(user_rdd: RDD[ItemPref]) = {
// 0 数据做准备
val user_rdd1 = user_rdd.map(f => (f.userid, f.itemid, f.pref))
val user_rdd2 = user_rdd1.map(f => (f._1, (f._2, f._3)))
// 1 (用户,物品,评分) 笛卡尔积 (用户,物品,评分) => (物品1,物品2,评分1,评分2)组合
val user_rdd3 = user_rdd2 join user_rdd2
val user_rdd4 = user_rdd3.map(f => ((f._2._1._1, f._2._2._1), (f._2._1._2, f._2._2._2)))
user_rdd4.foreach(x => println(x))
// 2 (物品1,物品2,评分1,评分2)组合 => (物品1,物品2,评分1-评分2) 组合 并累加
val user_rdd5 = user_rdd4.map(f => (f._1, (f._2._1 - f._2._2) * (f._2._1 - f._2._2))).reduceByKey(_ + _)
user_rdd5.foreach(x => println(x))
// 3 (物品1,物品2,评分1,评分2)组合 => (物品1,物品2,1) 组合 并累加 计算重叠数
val user_rdd6 = user_rdd4.map(f => (f._1, 1)).reduceByKey(_ + _)
user_rdd6.foreach(x => println(x))
// 4 非对角矩阵
val user_rdd7 = user_rdd5.filter(f => f._1._1 != f._1._2)
// 5 计算相似度
val user_rdd8 = user_rdd7.join(user_rdd6)
user_rdd8.foreach(x => println(x))
}
}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.chinasoft.recommender.ItemPref
import org.chinasoft.recommender.ItemSimilarity
import org.chinasoft.recommender.RecommendedItem
object ItemCFClient {
def main(args : Array[String]){
//屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
//设置运行环境
val sparkConf = new SparkConf().setAppName("ItemCFClient")
sparkConf.setMaster("local");
val sc = new SparkContext(sparkConf)
//读取样本数据
val dataRdd = sc.textFile("E:/data/ml-100k/u.data")
val userData = dataRdd.map { x => x.split(" ") }
.map { f => ItemPref(f(0),f(1),f(2).toDouble) }
// val simil_rdd1 = ItemSimilarity.cooccurrenceSimilarity(userData)
val mysimil = new ItemSimilarity()
val simil_rdd1 = mysimil.Similarity(userData, "euclidean")
val recommd = new RecommendedItem();
val recommd_rdd1 = recommd.Recommend(simil_rdd1, userData);
println(s"物品相似度矩阵: ${simil_rdd1.count()}")
simil_rdd1.collect().foreach { ItemSimi =>
println(ItemSimi.itemid1 + ", " + ItemSimi.itemid2 + ", " + ItemSimi.similar)
}
// println(s"用戶推荐列表: ${recommd_rdd1.count()}")
// recommd_rdd1.collect().foreach { UserRecomm =>
// println(UserRecomm.userid + ", " + UserRecomm.itemid + ", " + UserRecomm.pref)
// }
sc.stop();
}
}
import java.sql.Connection
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Connection
import org.apache.tomcat.dbcp.dbcp.ConnectionFactory
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Base64
/**
* 基于HBase的基本操作
*/
class BaseOperator(zk_quorum : String,zk_port : String) extends Serializable{
/**
* 获取初始化配置
*/
protected def getConfiguration() : Configuration = {
val map: LinkedHashMap[String, String] = new LinkedHashMap[String, String]();
val HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.zookeeper.quorum", zk_quorum); //map.getOrElse("zookeeper_quorum", "redhat05,redhat04,redhat03")
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", zk_port);
val configuration = HBaseConfiguration.create(HBASE_CONFIG);
configuration;
}
/**
* 获取作业信息
*/
protected def getJob(tableName: String): Job = {
val configuration = this.getConfiguration();
configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName);
var job = new Job(configuration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job;
}
/**
* 得到HBase的链接
*/
def getConnection(): Connection = {
val conn = ConnectionFactory.createConnection(this.getConfiguration());
conn;
}
/**
* 关闭连接
*/
def closeConnction(conn: Connection) = {
conn.close();
}
/**
* 获取table
*/
def getTable(tableName: String): Table = {
val conn = this.getConnection();
val table = conn.getTable(TableName.valueOf(tableName));
table;
}
/**
* 关闭table
*/
def closeTable(table: Table) = {
table.close();
}
/**
* 通过反射得到HBase的数据对象,针对单条记录,单个列族
*/
def getObject(model: Object, t: Class[_], result: Result, family: String): Object = {
val fields = t.getDeclaredFields();
for (field <- fields) {
field.setAccessible(true); //设置属性可以访问
val fieldValue = result.getValue(Bytes.toBytes(family), Bytes.toBytes(field.getName));
val fieldType = field.getType().toString(); //得到此属性的类型
if (fieldValue != null) {
if (fieldType.endsWith("String")) {
field.set(model, Bytes.toString(fieldValue));
} else {
field.set(model, Bytes.toBigDecimal(fieldValue));
}
}
}
model;
}
/**
* 保存原始数据到HBase
*/
def save(rdd: RDD[(ImmutableBytesWritable, Put)], tableName: String) {
val job = this.getJob(tableName);
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration);
}
/**
* 转换scan为String类型
*/
def convertScanToString(scan: Scan): String = {
var proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
}
以上代码为核心的代码,还有部分展示代码基本与后台和算法无关,所以这里就不贴出来了。
相关推荐
在本项目中,"基于Spark+Scala+MongoDB的大数据实战,商品推荐系统设计与实现.zip",我们探讨了一个利用大数据处理技术构建的商品推荐系统。这个项目的核心是使用Apache Spark框架,结合Scala编程语言和MongoDB...
本项目是针对计算机科学与技术专业学生的毕业设计,主题为“基于Spark+Scala+MongoDB的大数据实战,商品推荐系统设计与实现”。这个系统旨在利用大数据处理技术和机器学习算法,为电商平台提供个性化的商品推荐服务...
在商品推荐系统中,Spark主要负责对大规模用户行为数据进行预处理、清洗、分析和模型训练。它能够处理实时和批处理任务,这使得我们可以实时地更新推荐结果,以反映用户最新行为的变化。Spark的DataFrame和Dataset ...
其核心是利用Apache Spark框架来实施一个电商商品智能分析系统,该系统能够实时处理流式数据,计算商品的关注度,并进行商品推荐与关联分析。 1. **Apache Spark**:Spark是一个分布式计算框架,它提供了内存计算,...
4. **推荐系统**:推荐系统是电商网站提高用户满意度和转化率的重要工具,它通过分析用户历史行为、兴趣偏好等数据,为用户推荐他们可能感兴趣的商品。本项目会涵盖协同过滤、基于内容的推荐、矩阵分解等推荐算法的...
通过以上分析,我们可以看出,这个"Scala版DVD管理系统"充分展示了Scala语言的强大特性和灵活性,实现了对DVD的高效管理。在实际项目中,开发人员可能还需要关注日志记录、错误处理、单元测试和集成测试等方面,以...
具体的内容可能包括Spark作业的Scala或Java代码,用于实现推荐算法;Mlib库的调用示例,如协同过滤、基于内容的推荐或深度学习模型;以及可能的数据处理和模型评估脚本。 在这个项目中,学生可能会经历以下步骤: ...
Scala Scraper是一个强大的工具,专为Scala编程语言设计,用于高效地从HTML页面中提取和解析数据。...在Java开发领域,文档...结合其易用性和灵活性,Scala Scraper能够有效地提升开发效率,实现高效的数据获取和分析。
推荐系统是大数据和人工智能在日常生活中广泛应用的一个领域,它通过分析用户的历史行为、偏好和兴趣,为用户个性化地推荐商品、服务或内容。在这个场景中,我们有两个关键的数据集:`rating.csv` 和 `movie.csv`,...
推荐系统是人工智能领域的一个重要应用,它通过分析用户的行为和偏好,为用户提供个性化的产品或服务推荐。以下是关于这些技术和构建推荐系统的关键知识点: **Apache Spark** Apache Spark是一个用于大数据处理的...
它通过算法团队(由超过80位数据科学家和数据工程师组成)来进行客户偏好的分析,并结合人类策划来为用户提供定制化的商品推荐。Stitch Fix强调其算法在整个业务流程中的重要性,并在其博客上分享了相关的技术和研究...
推荐系统是现代互联网服务中广泛使用的一种个性化技术,它能够根据用户的历史行为和偏好,为用户提供个性化的商品、内容或服务推荐。在这个“推荐系统数据准备【前】的源码资料”中,我们主要探讨的是在构建推荐系统...
《Hadoop》数据分析系统主要关注的是利用大数据技术对海量数据进行高效处理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专为处理和存储大量数据而设计,它支持分布式计算,是当前互联网行业中广泛应用于大...
本项目"Recommender-For-Bigdata-Exercise"聚焦于大数据环境下的商品推荐系统构建,利用Apache Spark、Scala编程语言以及MongoDB数据库等技术,为用户实现个性化推荐,提升用户体验。 首先,我们要理解推荐系统的...
ALS通过分解用户-商品评分矩阵来预测用户可能对未评分商品的兴趣,从而实现个性化推荐。具体来说,ALS将矩阵分解为两个低秩矩阵:用户特征矩阵和商品特征矩阵,这两者的乘积近似原矩阵,从而找出用户和商品之间的...
通过这种方式,我们可以实时地分析用户行为数据,获取每5分钟内点击量最高的商品列表,这对于电商平台的商品推荐、运营决策等具有重要的实际价值。同时,这种实时分析能力是Flink作为流处理框架的一大优势,它能够...