A movie recommendation system built on Spark MLlib with the Java API (Spark 1.5.2, JDK 1.7)
I have recently been learning Spark MLlib. I read through some of its algorithms but didn't know how to apply them, and most of the examples online are written in Scala, with no Java code. I found an article online describing a movie recommendation system based on Spark MLlib and SparkSQL, also written in Scala (which I don't know well and don't have much time to learn right now). Fortunately, the author explains everything in great detail and provides download links for the test data, so I won't repeat that here. Below is my Java rewrite of that code.
The Maven dependencies are as follows (some may not be needed; at the time I was also looking into other parts of Spark):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
<!--
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.2.0</version>
</dependency>

Some of these jars may fail to download directly; you can install them manually, or retry the download over a VPN.
The Java code is as follows:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

import scala.Tuple2;
import scala.Tuple3;

/**
 * Collaborative filtering via alternating least squares (ALS).
 */
public class Test {

    public static void main(String[] args) {
        // Create the Spark entry point
        SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the ratings data. It is used in three parts: 60% for training, 20% for
        // validation, and the final 20% for testing. Taking timestamp % 10 gives an
        // approximate split into 10 equal buckets, which drives the three-way split.
        String path = "C:/Users/dulinan/Desktop/movie/ratings.dat";
        JavaRDD<String> data = sc.textFile(path);
        JavaRDD<Tuple2<Integer, Rating>> ratingsTrain_KV = data.map(new Function<String, Tuple2<Integer, Rating>>() {
            @Override
            public Tuple2<Integer, Rating> call(String line) throws Exception {
                String[] fields = line.split("::");
                if (fields.length != 4) {
                    throw new IllegalArgumentException("Each line must contain exactly 4 fields");
                }
                int userId = Integer.parseInt(fields[0]);
                int movieId = Integer.parseInt(fields[1]);
                double rating = Float.parseFloat(fields[2]);
                int timestamp = (int) (Long.parseLong(fields[3]) % 10);
                return new Tuple2<Integer, Rating>(timestamp, new Rating(userId, movieId, rating));
            }
        });
        // Count distinct users and distinct movies (the original code mistakenly
        // called distinct() on the whole RDD for both counts)
        long numUsers = ratingsTrain_KV.map(new Function<Tuple2<Integer, Rating>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Rating> v1) throws Exception {
                return v1._2.user();
            }
        }).distinct().count();
        long numMovies = ratingsTrain_KV.map(new Function<Tuple2<Integer, Rating>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Rating> v1) throws Exception {
                return v1._2.product();
            }
        }).distinct().count();
        System.out.println("got " + ratingsTrain_KV.count() + " ratings from "
                + numUsers + " users on " + numMovies + " movies");

        // Load my own ratings
        String mypath = "C:/Users/dulinan/Desktop/movie/test.dat";
        JavaRDD<String> mydata = sc.textFile(mypath);
        JavaRDD<Rating> myRatedData_Rating = mydata.map(new Function<String, Rating>() {
            @Override
            public Rating call(String line) throws Exception {
                String[] fields = line.split("::");
                if (fields.length != 4) {
                    throw new IllegalArgumentException("Each line must contain exactly 4 fields");
                }
                int userId = Integer.parseInt(fields[0]);
                int movieId = Integer.parseInt(fields[1]);
                double rating = Float.parseFloat(fields[2]);
                return new Rating(userId, movieId, rating);
            }
        });

        // Number of partitions
        int numPartitions = 3;

        // Keys below 6 (~60% of the data) are used for training
        JavaRDD<Rating> trainingData_Rating = JavaPairRDD.fromJavaRDD(
                ratingsTrain_KV.filter(new Function<Tuple2<Integer, Rating>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<Integer, Rating> v1) throws Exception {
                        return v1._1 < 6;
                    }
                })).values().union(myRatedData_Rating).repartition(numPartitions).cache();

        // Keys of 6 or 7 (~20%) are used for validation
        JavaRDD<Rating> validateData_Rating = JavaPairRDD.fromJavaRDD(
                ratingsTrain_KV.filter(new Function<Tuple2<Integer, Rating>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<Integer, Rating> v1) throws Exception {
                        return v1._1 >= 6 && v1._1 < 8;
                    }
                })).values().repartition(numPartitions).cache();

        // Keys of 8 or 9 (~20%) are used for testing
        JavaRDD<Rating> testData_Rating = JavaPairRDD.fromJavaRDD(
                ratingsTrain_KV.filter(new Function<Tuple2<Integer, Rating>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<Integer, Rating> v1) throws Exception {
                        return v1._1 >= 8;
                    }
                })).values().cache();

        System.out.println("training data's num: " + trainingData_Rating.count()
                + " validate data's num: " + validateData_Rating.count()
                + " test data's num: " + testData_Rating.count());

        // Training parameters: two candidate values per parameter, so the three
        // nested loops below perform 8 training runs in total
        List<Integer> ranks = new ArrayList<Integer>();
        ranks.add(8);
        ranks.add(22);
        List<Double> lambdas = new ArrayList<Double>();
        lambdas.add(0.1);
        lambdas.add(10.0);
        List<Integer> iters = new ArrayList<Integer>();
        iters.add(5);
        iters.add(7);

        // Initialize the best model and its parameters
        MatrixFactorizationModel bestModel = null;
        double bestValidateRnse = Double.MAX_VALUE;
        int bestRank = 0;
        double bestLambda = -1.0;
        int bestIter = -1;

        for (int i = 0; i < ranks.size(); i++) {
            for (int j = 0; j < lambdas.size(); j++) {
                for (int k = 0; k < iters.size(); k++) {
                    // Train a model. Note the indices: it must be i, k, and j here; the
                    // original code passed i for all three parameters, so only two of
                    // the eight combinations were actually tried.
                    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(trainingData_Rating),
                            ranks.get(i), iters.get(k), lambdas.get(j));
                    // Score the model on the validation set validateData_Rating via RMSE,
                    // to judge how good it is; the method is defined at the bottom
                    double validateRnse = variance(model, validateData_Rating, validateData_Rating.count());
                    System.out.println("validation = " + validateRnse
                            + " for the model trained with rank = " + ranks.get(i)
                            + " lambda = " + lambdas.get(j) + " and numIter = " + iters.get(k));
                    // Remember the parameters of the best model so far
                    if (validateRnse < bestValidateRnse) {
                        bestModel = model;
                        bestValidateRnse = validateRnse;
                        bestRank = ranks.get(i);
                        bestLambda = lambdas.get(j);
                        bestIter = iters.get(k);
                    }
                }
            }
        }

        // After the 8 runs, score the best model on the test set testData_Rating
        double testDataRnse = variance(bestModel, testData_Rating, testData_Rating.count());
        System.out.println("the best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
                + " and numIter = " + bestIter + " and Rnse on the test data is " + testDataRnse);

        // Mean rating over the training and validation data
        final double meanRating = trainingData_Rating.union(validateData_Rating)
                .mapToDouble(new DoubleFunction<Rating>() {
                    @Override
                    public double call(Rating t) throws Exception {
                        return t.rating();
                    }
                }).mean();

        // Baseline RMSE obtained by always predicting the mean rating
        double baseLineRnse = Math.sqrt(testData_Rating.mapToDouble(new DoubleFunction<Rating>() {
            @Override
            public double call(Rating t) throws Exception {
                return (meanRating - t.rating()) * (meanRating - t.rating());
            }
        }).mean());

        // How much the model improves on the baseline
        double improvement = (baseLineRnse - testDataRnse) / baseLineRnse * 100;
        System.out.println("the best model improves the baseline by " + improvement + "%");

        // Load the movie data
        String moviepath = "C:/Users/dulinan/Desktop/movie/movies.dat";
        JavaRDD<String> moviedata = sc.textFile(moviepath);

        // Keep each movie's id, title, and genre as a triple
        JavaRDD<Tuple3<Integer, String, String>> movieList_Tuple = moviedata.map(new Function<String, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> call(String line) throws Exception {
                String[] fields = line.split("::");
                if (fields.length != 3) {
                    throw new IllegalArgumentException("Each line must contain 3 fields");
                }
                int id = Integer.parseInt(fields[0]);
                String title = fields[1];
                String type = fields[2];
                return new Tuple3<Integer, String, String>(id, title, type);
            }
        });

        // Keep each movie's id and title as a pair
        JavaRDD<Tuple2<Integer, String>> movies_Map = movieList_Tuple.map(new Function<Tuple3<Integer, String, String>, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> call(Tuple3<Integer, String, String> v1) throws Exception {
                return new Tuple2<Integer, String>(v1._1(), v1._2());
            }
        });

        System.out.println("movies recommended for you:");

        // Collect the ids of the movies I have already rated
        final List<Integer> movieIds = myRatedData_Rating.map(new Function<Rating, Integer>() {
            @Override
            public Integer call(Rating v1) throws Exception {
                return v1.product();
            }
        }).collect();

        // Drop the movies I have already seen from the movie data
        JavaRDD<Tuple2<Integer, String>> movieIdList = movies_Map.filter(new Function<Tuple2<Integer, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<Integer, String> v1) throws Exception {
                return !movieIds.contains(v1._1);
            }
        });

        // Build (user, product) pairs for prediction: user is 0, product is the movie id
        JavaPairRDD<Integer, Integer> recommondList = JavaPairRDD.fromJavaRDD(
                movieIdList.map(new Function<Tuple2<Integer, String>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, String> v1) throws Exception {
                        return new Tuple2<Integer, Integer>(0, v1._1);
                    }
                }));

        // Predict user 0's rating for every movie, sort by rating in descending
        // order, and take the top 10 movie ids
        final List<Integer> list = bestModel.predict(recommondList).sortBy(new Function<Rating, Double>() {
            @Override
            public Double call(Rating v1) throws Exception {
                return v1.rating();
            }
        }, false, 1).map(new Function<Rating, Integer>() {
            @Override
            public Integer call(Rating v1) throws Exception {
                return v1.product();
            }
        }).take(10);

        if (list != null && !list.isEmpty()) {
            // Filter those 10 movies out of the movie data and print them
            movieList_Tuple.filter(new Function<Tuple3<Integer, String, String>, Boolean>() {
                @Override
                public Boolean call(Tuple3<Integer, String, String> v1) throws Exception {
                    return list.contains(v1._1());
                }
            }).foreach(new VoidFunction<Tuple3<Integer, String, String>>() {
                @Override
                public void call(Tuple3<Integer, String, String> t) throws Exception {
                    System.out.println("\nmovie name --> " + t._2() + "\nmovie type --> " + t._3());
                }
            });
        }
    }

    // RMSE helper (the original article calls this the "variance" method): the
    // root-mean-square error of the model's predictions against the actual ratings
    public static double variance(MatrixFactorizationModel model, JavaRDD<Rating> predictionData, long n) {
        // Convert predictionData into (user, product) pairs for prediction
        JavaRDD<Tuple2<Object, Object>> userProducts = predictionData.map(new Function<Rating, Tuple2<Object, Object>>() {
            public Tuple2<Object, Object> call(Rating r) {
                return new Tuple2<Object, Object>(r.user(), r.product());
            }
        });
        // Predict ratings with the model, keyed by (user, product)
        JavaPairRDD<Tuple2<Integer, Integer>, Double> prediction = JavaPairRDD.fromJavaRDD(
                model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
                        .map(new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                            public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                                return new Tuple2<Tuple2<Integer, Integer>, Double>(
                                        new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
                            }
                        }));
        // Inner-join the actual ratings with the predicted ones
        JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
                predictionData.map(new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                    public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                        return new Tuple2<Tuple2<Integer, Integer>, Double>(
                                new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
                    }
                })).join(prediction).values();
        // Sum the squared differences and return the RMSE
        Double sumSq = ratesAndPreds.map(new Function<Tuple2<Double, Double>, Double>() {
            @Override
            public Double call(Tuple2<Double, Double> v1) throws Exception {
                return (v1._1 - v1._2) * (v1._1 - v1._2);
            }
        }).reduce(new Function2<Double, Double, Double>() {
            @Override
            public Double call(Double v1, Double v2) throws Exception {
                return v1 + v2;
            }
        });
        return Math.sqrt(sumSq / n);
    }
}