spark-submit --class com.ones.soc.cf.MoiveRecommender --master yarn --num-executors 3 --driver-memory 5g --executor-memory 4g /root/bigData.jar 2 5 0.01 /ones/mldata/1u.user /ones/mldata/1u.data /ones/result/1
package com.ones.soc.cf
import com.ones.soc.json.JSONObject
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.{ PairRDDFunctions, RDD }
import org.apache.spark.SparkContext
import scala.collection.mutable.HashMap
import java.util.List
import java.util.ArrayList
/**
* Created by tom
*/
object MoiveRecommender {
val numRecommender = 10
case class Params(
input: String = null,
numIterations: Int = 20,
lambda: Double = 1.0,
rank: Int = 10,
numUserBlocks: Int = -1,
numProductBlocks: Int = -1,
implicitPrefs: Boolean = false,
userDataInput: String = null)
def main(args: Array[String]) {
run(args: Array[String])
}
def run(args: Array[String]) {
val confighdfs = new Configuration();
val fs=FileSystem.get(confighdfs) ;
if(args(5) != null && args(5).trim().length > 1){
val output = new Path(args(5));
if(fs.exists(output)){ //删除输出目录
fs.delete(output, true);
}
}
var input: String = null
var numIterations: Int = 20
var lambda: Double = 1.0
var rank: Int = 10
var numUserBlocks: Int = -1
var numProductBlocks: Int = -1
var implicitPrefs: Boolean = false
var userDataInput: String = null
rank=args(0).toInt
numIterations=args(1).toInt
lambda=args(2).toDouble
userDataInput=args(3).toString
input=args(4).toString
var outpath=args(5).toString
//本地运行模式,读取本地的spark主目录
var conf = new SparkConf().setAppName("Moive Recommendation")
//.setSparkHome("D:\\work\\hadoop_lib\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
//conf.setMaster("local[*]")
//集群运行模式,读取spark集群的环境变量
//var conf = new SparkConf().setAppName("Moive Recommendation")
val context = new SparkContext(conf)
//加载数据
val data = context.textFile(input)
/**
* *MovieLens ratings are on a scale of 1-5:
* 5: Must see
* 4: Will enjoy
* 3: It's okay
* 2: Fairly bad
* 1: Awful
*/
val ratings = data.map(_.split("\t") match {
case Array(user, item, rate, time) => Rating(user.toInt, item.toInt, rate.toDouble)
})
//使用ALS建立推荐模型
//也可以使用简单模式 val model = ALS.train(ratings, ranking, numIterations)
val model = new ALS()
.setRank(rank)
.setIterations(numIterations)
.setLambda(lambda)
.setImplicitPrefs(implicitPrefs)
.setUserBlocks(numUserBlocks)
.setProductBlocks(numProductBlocks)
.run(ratings)
//预测数据并保存
predictMoive(userDataInput, context, model,fs,outpath)
//模型评估
evaluateMode(ratings, model)
//clean up
context.stop()
}
/**
* 模型评估
*/
private def evaluateMode(ratings: RDD[Rating], model: MatrixFactorizationModel) {
//使用训练数据训练模型
val usersProducets = ratings.map(r => r match {
case Rating(user, product, rate) => (user, product)
})
//预测数据
val predictions = model.predict(usersProducets).map(u => u match {
case Rating(user, product, rate) => ((user, product), rate)
})
//将真实分数与预测分数进行合并
val ratesAndPreds = ratings.map(r => r match {
case Rating(user, product, rate) =>
((user, product), rate)
}).join(predictions)
//计算均方差
val MSE = ratesAndPreds.map(r => r match {
case ((user, product), (r1, r2)) =>
var err = (r1 - r2)
err * err
}).mean()
//打印出均方差值
println("Mean Squared Error = " + MSE)
}
/**
* 预测数据并保存
*/
private def predictMoive(userDataInput: String, context: SparkContext, model: MatrixFactorizationModel,fs:FileSystem,outpath:String) {
var recommenders = new ArrayList[java.util.Map[String, String]]();
var sb=new StringBuilder
//读取需要进行电影推荐的用户数据
val userData = context.textFile(userDataInput) //u.user
userData.map(_.split("\\|") match {
case Array(id, age, sex, job, x) => (id)
}).collect().foreach(id => {
//为用户推荐电影
var rs = model.recommendProducts(id.toInt, numRecommender)
var value = ""
var key = 0
rs.foreach(r => {
key = r.user
value = value + r.product + ":" + r.rating + ","
})
sb.append("user="+key+"\t"+"value="+value).append("\r\n")
//成功,则封装put对象,等待插入到Hbase中
/*
if (!value.equals("")) {
var put = new java.util.HashMap[String, String]()
put.put("rowKey", key.toString)
put.put("t:info", value)
recommenders.add(put)
}
*/
})
outputHdfs(fs,sb.toString(),outpath)
//保存到到HBase的[recommender]表中
//recommenders是返回的java的ArrayList,可以自己用Java或者Scala写HBase的操作工具类,这里我就不给出具体的代码了,应该可以很快的写出
//HbaseUtil.saveListMap("recommender", recommenders)
}
def outputHdfs(fs:FileSystem,text:String,textdir:String):Unit={
try{
val fsDataOutputStream = fs.create(new Path(textdir+"/result.txt"), true);
val s=text.getBytes("UTF-8")
fsDataOutputStream.write(s,0,s.length)
fsDataOutputStream.hflush();
}catch{
case e:Exception =>
}
}
}
分享到:
相关推荐
基于Spark MLlib自带的ALS最小二乘法矩阵分解算法的Netflix Prize电影推荐系统(包括离线推荐与实时推荐)Netflix-Recommender-with-Spark-master.zip
增广最小二乘法(Augmented Least Squares, ALS)通过将原始数据矩阵进行增广处理,能够解决一些特定问题,例如处理含有缺失数据或异常值的情况。 6. 广义最小二乘法 广义最小二乘法(Generalized Least Squares, ...
4. **增广最小二乘法(Augmented Least Squares, ALS)**:在处理具有输入延迟或非最小相位系统的辨识时,ALS是一种有效的策略。它通过增加额外的虚拟输入或输出来扩展原系统的状态空间表示,从而解决这些问题。...
递推最小二乘法(Recursive Least Squares, RLS)和增广最小二乘法(Augmented Least Squares, ALS)是其两种重要的变体,分别针对在线估计和处理复杂模型的情况。 递推最小二乘法是一种动态更新参数的方法,适用于...
交替最小二乘法(Alternating Least Squares,ALS)是实现MCR的一种常用算法。该方法基于最小化残差平方和的目标函数,通过交替更新矩阵的分解来进行迭代优化。在MCR-ALS过程中,我们通常假设数据是由少数纯组分的...
增广最小二乘法(Augmented Least Squares, ALS)是一种在系统辨识和估计理论中广泛应用的数学方法,尤其在处理线性非最小相位系统时。MATLAB作为一种强大的数值计算和数据可视化环境,是实现此类算法的理想平台。本...
增广最小二乘法(ALS)将原问题转化为一个增广的线性模型,通过对原模型进行增广来提高参数估计的精度,适用于含有附加信息或约束的情况。 6. 广义最小二乘法 广义最小二乘法(GLS)是在最小二乘的基础上考虑了数据...
在这个“als.zip_als_reachjj5_交替最小二乘法_协同训练_协同过滤”的资料中,主要讨论的是如何利用交替最小二乘法(ALS)来优化协同过滤算法。 交替最小二乘法(Alternating Least Squares)是一种在协同过滤中...
基于Spark的电影推荐,ALS交替最小二乘法,基于矩阵分解的协同过滤推荐。全部资料+详细文档.zip 【备注】 1、该项目是个人高分项目源码,已获导师指导认可通过,答辩评审分达到95分 2、该资源内项目代码都经过测试...
增广最小二乘法(Augmented Least Squares, ALS)是处理带有约束条件的最小二乘问题的方法,如非负权重约束或正交性约束。在系统辨识中,这些约束可能来源于物理系统的性质。ALS通过将约束转化为额外的方程来处理...
4. **增广最小二乘法**(Augmented Least Squares, ALS):通过增加附加项来处理非线性问题,使问题线性化。 5. **广义最小二乘法**(Generalized Least Squares, GLS):引入白化滤波器,将有色噪声转化为白噪声,...
为解决此问题,提出了基于LU分解和交替最小二乘法(ALS)的分布式奇异值分解(SVD)推荐算法。 交替最小二乘法(ALS)是推荐系统中常用的一种优化算法,它通过交替地固定其他变量来优化一个变量的方式最小化目标...
4. 增广最小二乘法(Augmented Least Squares, ALS):通过扩展模型来同时估计参数和噪声模型,适用于非线性系统。 5. 广义最小二乘法(Generalized Least Squares, GLS):通过引入白化滤波器将有色噪声转化为白...
增广最小二乘法(Augmented Least Squares, ALS)是另一种常用的系统辨识方法,特别是在非线性系统中。它通过添加适当的辅助变量,将非线性问题转化为线性问题,然后应用最小二乘法求解。这种方法对于处理非线性模型...
在数值分析和优化领域,增广最小二乘法(Augmented Least Squares,ALS)是一种常用的算法,尤其在处理线性回归问题时效果显著。它通过扩展原始的最小二乘问题来解决因数据误差积累而带来的挑战。在MATLAB环境中,...
- 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,...
- 基于Spark MLlib自带的ALS最小二乘法矩阵分解算法,比较简单,文档也比较多,就不详细说了:-D。。。 - 选择的迭代次数iterations=5,物品、用户特征个数K=50 - 使用了RMSE方法评价推荐精度,结果为0.5757 不同...
基于Spark MLlib自带的ALS最小二乘法矩阵分解算法,比较简单,文档也比较多,就不详细说了:-D。。。 选择的迭代次数iterations=5,物品、用户特征个数K=50 使用了RMSE方法评价推荐精度,结果为0.5757 实时推荐...
协同过滤算法(Collaborative Filtering)是一种经典的推荐算法,其基本原理是“协同大家的反馈、评价和意见,一起对海量的信息进行过滤,从中筛选出用户可能感兴趣的信息”。它主要依赖于用户和物品之间的行为关系...