`
bo_hai
  • 浏览: 564635 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

spark基于ALS的协同过虑实例

 
阅读更多

最近在学习spark,网上有很多关于ALS的实例,大多数千篇一例,决定自己动手写一个实例,力求做到可以运行,有结果。

1、数据集准备:

http://grouplens.org/datasets/movielens/ 下载电影评分数据,查看一下README里面有对数据集的介绍。

对数据进行预处理:

 

cat u1.base | awk -F "\t" '{print $1"::"$2"::"$3"::"$4}' > ratings.dat  
cat u.item | awk -F "|" '{print $1"\t"$2"\t"$3}' > movies.dat  

 结果如下:

 

 

[root@hongboVM ml-100k]# head -10 ratings.data 
1::1::5::874965758
1::2::3::876893171
1::3::4::878542960
1::4::3::876893119
1::5::3::889751712
1::7::4::875071561
1::8::1::875072484
1::9::5::878543541
1::11::2::875072262
1::13::5::875071805

 

[root@hongboVM ml-100k]# head -10 movies.data 
1::Toy Story (1995)::01-Jan-1995
2::GoldenEye (1995)::01-Jan-1995
3::Four Rooms (1995)::01-Jan-1995
4::Get Shorty (1995)::01-Jan-1995
5::Copycat (1995)::01-Jan-1995
6::Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)::01-Jan-1995
7::Twelve Monkeys (1995)::01-Jan-1995
8::Babe (1995)::01-Jan-1995
9::Dead Man Walking (1995)::01-Jan-1995
10::Richard III (1995)::22-Jan-1996

 

[root@hongboVM ml-100k]# head -10 user.data 
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213
6|42|M|executive|98101
7|57|M|administrator|91344
8|36|M|administrator|05201
9|29|M|student|01002
10|53|M|lawyer|90703

 

  将数据上传到hdfs上。

2、数据处理基于思路:

  先使用ratings数据训练模型,然后使用模型做预测。打印出推荐信息:

3、代码如下:

 

package com.bohai.mllib

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.{SparkConf, SparkContext}

object MoviesRecommondNew {
  def main(args: Array[String]) {
    //屏蔽日志,由于结果是打印在控制台上的,为了方便查看结果,将spark日志输出关掉
    //解决spark日志输出的问题最好的解决办法是:修改spark日志文件,将日志写入文件中
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("MoviesRecommondNew")
    val sc = new SparkContext(conf)
    //ratings.data的数据
    val data = sc.textFile("/data/ratings.data")
    val test_data = sc.textFile("/data/test.data")
    //注意这里的分隔符
    val user_data = sc.textFile("/data/user.data").map(x => x.split("[|]") match {
      case Array(userId, age, gender, occupation, zipCode) => Users(userId.toInt, age.toInt, gender)
    })
    val movie_data = sc.textFile("/data/movies.data").map(x =>x.split("::"))

    println("rate data count is : " + data.count())
    val ratings = data.map(x => x.split("::") match {
      case Array(user, item, rate, ts) => Rating(user.toInt, item.toInt, rate.toDouble)
    })

    val test_ratings = test_data.map(x => x.split("::") match {
      case Array(user, item, rate, ts) => Rating(user.toInt, item.toInt, rate.toDouble)
    })
    println("test rate data count is : " +test_ratings.count())
    //println("test rate data is : " + test_ratings.take(2))

    val userIds = user_data.map(_.id)

    println("user data is : " + userIds.count())

    //生成k,v形式,便于通过movieID找到movieName
    val movieDataMap = movie_data.map(x => (x(0).toInt,x(1))).collectAsMap()
    //broadcast进行广播
    val bMovieDataMap = sc.broadcast(movieDataMap)

    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratings, rank, numIterations, 0.001)
    val usersProducts = ratings.map { case Rating(user, prod, rate) => (user, prod) }
    val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate) }
    val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)
    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
      val err = r1 - r2
      err * err
    }.mean()
    println(s"Mean squared Error = $MSE")


    val userID = 384
    val moviesForUser = ratings.keyBy(_.user).lookup(userID)
    println(s"用户$userID 评价过的电影:\n")
    for (movieID <- moviesForUser.map(f => f.product)) {
      //movie_data.filter{x  => x(0).toInt == movieID}.map(x => x(1)).collect().foreach(println)
      println(bMovieDataMap.value.getOrElse(movieID,""))
    }

    println(s"用户$userID 推荐的电影:\n")
    val recommendProds:Array[Rating] = model.recommendProducts(userID,20)
    for (recommend <- recommendProds) {
      //println(recommend.user + "," + recommend.product + "," + recommend.rating)
      //movie_data.filter{x  => x(0).toInt == recommend.product}.map(x => x(1)).collect().foreach(println)
      println(bMovieDataMap.value.getOrElse(recommend.product,""))
    }

    println("为每个用户推荐10个电影:\n")
    val allRecommendations = model.recommendProductsForUsers(10).map{
      case (userId,recommends) =>
        val str = new StringBuilder()
        for (r <- recommends) {
          if (str.nonEmpty) {
            str.append("::")
          }
          str.append(r.product)
        }
        (userId,str.toString())
    }
    allRecommendations.take(10).foreach(println)
  }

  //样例类,用作SparkSQL隐式转换
  case class Ratings(userId: Int, movieId: Int, rating: Int)

  case class Movies(id: Int, moveTitle: String, releaseDate: String)

  case class Users(id: Int, age: Int, gender: String)

}

   提交到spark上进行测试:

 

 

spark-submit --master spark://172.4.23.99:7077 --num-executors 4 --executor-cores 2 --class com.bohai.mllib.MoviesRecommondNew  ./simple-project_2.10-1.0.jar

 运行结果如下 :

 

 

rate data count is : 80000                                                      
test rate data count is : 20000
user data is : 943
Mean squared Error = 0.44838904095188975                                        
用户384 评价过的电影:

Contact (1997)
Starship Troopers (1997)
English Patient, The (1996)
Evita (1996)
Air Force One (1997)
L.A. Confidential (1997)
Titanic (1997)
As Good As It Gets (1997)
Cop Land (1997)
Conspiracy Theory (1997)
Desperate Measures (1998)
Game, The (1997)
Tomorrow Never Dies (1997)
That Darn Cat! (1997)
Peacemaker, The (1997)
Cats Don't Dance (1997)
用户384 推荐的电影:

Amos & Andrew (1993)
I'm Not Rappaport (1996)
Ruling Class, The (1972)
Amateur (1994)
Englishman Who Went Up a Hill, But Came Down a Mountain, The (1995)
To Live (Huozhe) (1994)
Stupids, The (1996)
Cemetery Man (Dellamorte Dellamore) (1994)
Eye for an Eye (1996)
M. Butterfly (1993)
Crooklyn (1994)
8 1/2 (1963)
Herbie Rides Again (1974)
City of Lost Children, The (1995)
Vanya on 42nd Street (1994)
Afterglow (1997)
Addiction, The (1995)
Die xue shuang xiong (Killer, The) (1989)
Haunted World of Edward D. Wood Jr., The (1995)
Mute Witness (1994)
为每个用户推荐10个电影:

(656,1313::998::1480::1206::149::253::1411::1451::974::401)                     
(692,1192::1058::1286::1425::1483::703::1113::1404::960::753)
(932,1313::57::1643::947::601::1128::1131::954::1224::965)
(772,1131::860::1192::1512::1205::1129::967::1128::57::445)
(324,1019::1192::904::1022::982::1262::320::1404::786::1298)
(180,1426::394::1195::1184::793::1389::1069::1208::1245::1120)
(340,860::1192::974::998::1131::1273::440::1178::296::1483)
(320,1286::1056::115::1425::320::916::534::962::960::703)
(752,1286::1643::115::906::800::1160::296::1129::767::1049)
(744,1131::1380::624::1211::1137::782::860::1192::1113::630)

 总结:(1)下载下来的数据是使用 \t 进行分隔的,需要进行预处理;(2)为方便通过movieID找到movieName,我们将movieID、movieName封装成map,并对map进行 broadcast ,提交检索的性能。(3)没有计算预测的结果与实际评分的误 差,有空补上;

参考:

http://blog.csdn.net/oopsoom/article/details/34462329

http://blog.javachen.com/2015/06/01/how-to-implement-collaborative-filtering-using-spark-als.html

 

0
0
分享到:
评论
2 楼 bo_hai 2016-07-27  
bo_hai 写道
movies.dat 数据中有一条无效的数据,在267行。不删除会引起case时发生错误。

使用filter找出有问题的数据。
1 楼 bo_hai 2016-07-26  
movies.dat 数据中有一条无效的数据,在267行。不删除会引起case时发生错误。

相关推荐

    spark mllib 协同过滤推荐算法(ALS) python 实现 完整实例程序

    在本实例中,我们将探讨如何使用 PySpark(Python 接口)实现基于 MLlib 的协同过滤推荐算法——交替最小二乘法(Alternating Least Squares, ALS),用于用户和物品的推荐。 协同过滤是推荐系统中最常用的方法之一...

    使用协同过滤和lfm(sparkmllibALS)的电影推荐演示_Python_下载.zip

    在本项目中,我们将深入探讨如何使用Python编程语言和Apache Spark的机器学习库MMLib中的协同过滤(Collaborative Filtering, CF)以及latent factor models(LFM)算法,特别是Alternating Least Squares(ALS)来...

    基于spark的大数据过滤引擎推荐系统.zip

    5. **Spark MLlib的使用**:Spark的MLlib库提供了推荐算法的实现,例如基于ALS(交替最小二乘法)的协同过滤算法。开发者需要理解如何使用MLlib创建数据集,调用相应的算法模型,训练并优化参数,最终生成推荐结果。...

    SparkALS-EtuSandbox-:SparkALS-EtuSandbox-

    在SparkALS-EtuSandbox--master这个项目中,开发者提供了一个完整的Spark ALS实现框架,包括数据读取、模型训练、预测和评估等步骤。这为我们理解和实践Spark ALS提供了一个理想的沙箱环境。 总的来说,Spark ALS是...

    基于Apache Spark的Netflix电影的离线与实时推荐系统.zip

    总之,"基于Apache Spark的Netflix电影的离线与实时推荐系统"是一个全面展示Spark功能和推荐系统算法的实例,对于学习者和开发者来说,是一个极具价值的学习资源。通过深入研究和实践,不仅能够提升技能,还能为解决...

    Sparkml实战

    Spark MLlib 提供了 ALS(Alternating Least Squares)算法实现协同过滤。 #### 1.3.2 实例介绍 本实例将使用 ALS 算法来实现一个基于用户和物品的推荐系统。具体步骤包括: 1. **加载数据**: 从 CSV 文件中读取...

    光环国际spark大数据&机器学习PPT

    在实例介绍中,通过对用户行为和物料信息的分析,Spark MLlib能够在给定用户和物品的交互信息中,采用协同过滤算法生成个性化的推荐。例如,根据用户A的历史行为和相似用户的偏好,可以推荐那些用户A未曾有过交互但...

    spark隐语义模型推荐.zip

    Spark隐语义模型推荐系统是一种基于大数据处理框架Apache Spark的推荐算法实现,它结合了机器学习中的隐语义模型(Latent Factor Models)与分布式计算的优势。本项目着重讲解如何利用Python编程语言来构建这样的...

    《大数据》第4章大数据挖掘工具.pptx

    最后,协同过滤算法是推荐系统的重要组成部分,Mahout提供了基于物品的协同过滤和基于ALS的矩阵分解算法,这些方法能够根据用户的历史行为预测他们可能感兴趣的新物品。 综上所述,本章《大数据》深入探讨了大数据...

    spark-recommendation

    总之,"火花推荐"项目是一个使用Java API实现的基于Spark MLlib的推荐系统实例,旨在展示如何利用协同过滤技术在大数据上构建高效的推荐引擎。通过理解Spark的分布式计算特性、MLlib的机器学习功能以及Java API的...

    POIRecommendSystem.rar

    矩阵分解(如SVD、ALS)是协同过滤的一种高效实现方式,通过将用户-POI交互矩阵分解为低秩矩阵,捕捉用户和POI之间的隐含关系。这种方法能有效处理稀疏数据,并且有助于发现潜在的兴趣模式。 7. **系统优化** ...

    大数据挖掘工具培训课件.pptx

    协同过滤算法中,Mahout支持基于用户和物品的协同过滤,并实现了ALS矩阵分解的不同变体。 **Mahout的安装与使用** 在Linux操作系统(例如CentOS 6.5)和Hadoop平台(如Hadoop 2.5.1)上安装Mahout,可以通过下载...

    PySpark 机器学习、自然语言处理与推荐系统配套代码+数据集.zip

    在推荐系统中,PySpark的协同过滤算法如Alternating Least Squares (ALS)被广泛使用。通过用户对物品的评分数据,ALS可以学习出用户和物品的隐向量,从而预测未知评分,实现个性化推荐。此外,还可以结合其他特征,...

    Sparkify_capstone

    根据项目需求,可能会使用协同过滤、基于内容的推荐或其他机器学习算法,如矩阵分解(如ALS)来构建推荐模型。这些模型基于用户的历史行为,预测他们可能喜欢的歌曲,从而实现个性化推荐。 八、模型评估与优化 通过...

Global site tag (gtag.js) - Google Analytics