`
coderplay
  • 浏览: 576880 次
  • 性别: Icon_minigender_1
  • 来自: 广州杭州
社区版块
存档分类
最新评论

用户推荐Slope One算法与mapreduce&hive实现

阅读更多

下载本文代码

用户推荐越来越热, Google使用MinHash, PLSI, LDA, SVD, SVM等算法,分析用户的喜好, 实现新闻的自动分类;新浪也用Slope One以及一些Item-based的算法对音乐进行推荐; 淘宝定期会启动MapReduce作业分析前一天或者一个月用户收藏的宝贝,给相同喜好的买家提供推荐服务。

本文要描述的Slope One 算法是一种对评分进行预测的算法, 它相对于SVD, PLSI, LDA这一类model-based算法来说有以下特点:

1. 简单, 容易实现

2. 训练得到的模型可以增量更新

3. 预测速度很快

4. 用户可以只做过一两次评分,就可以获得推荐.

5. 准确度比较理想

 

okay,  找到一篇介绍算法的:http://www.fuchaoqun.com/2008/09/slope_one/

讲的不错,就不再重复了。

英文wiki上也有介绍http://en.wikipedia.org/wiki/Slope_One

其中python的实现比较简洁

# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>.
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, version 2 or later, which is
# incorporated herein by reference.

class SlopeOne(object):
    def __init__(self):
        self.diffs = {}
        self.freqs = {}

    def predict(self, userprefs):
        preds, freqs = {}, {}
        for item, rating in userprefs.iteritems():
            for diffitem, diffratings in self.diffs.iteritems():
                try:
                    freq = self.freqs[diffitem][item]
                except KeyError:
                    continue
                preds.setdefault(diffitem, 0.0)
                freqs.setdefault(diffitem, 0)
                preds[diffitem] += freq * (diffratings[item] + rating)
                freqs[diffitem] += freq
        return dict([(item, value / freqs[item])
                     for item, value in preds.iteritems()
                     if item not in userprefs and freqs[item] > 0])

    def update(self, userdata):
        for ratings in userdata.itervalues():
            for item1, rating1 in ratings.iteritems():
                self.freqs.setdefault(item1, {})
                self.diffs.setdefault(item1, {})
                for item2, rating2 in ratings.iteritems():
                    self.freqs[item1].setdefault(item2, 0)
                    self.diffs[item1].setdefault(item2, 0.0)
                    self.freqs[item1][item2] += 1
                    self.diffs[item1][item2] += rating1 - rating2
                    print self.diffs[item1][item2]
        for item1, ratings in self.diffs.iteritems():
            for item2 in ratings:
                ratings[item2] /= self.freqs[item1][item2]

if __name__ == '__main__':
    userdata = dict(
        alice=dict(squid=1.0,
                   cuttlefish=0.5,
                   octopus=0.2),
        bob=dict(squid=1.0,
                 octopus=0.5,
                 nautilus=0.2),
        carole=dict(squid=0.2,
                    octopus=1.0,
                    cuttlefish=0.4,
                    nautilus=0.4),
        dave=dict(cuttlefish=0.9,
                  octopus=0.4,
                  nautilus=0.5),
        )
    s = SlopeOne()
    s.update(userdata)
    print s.predict(dict(octopus=0.4)
   

 

现在分析一下Slope One训练的空间及时间复杂度,

如果有m个用户,分别对n件物品进行了评分。每个用户得进行 n 2 次计算,将产生n(n-1)/2级别的数据量(由于diff是个对角矩阵,可以只取下三角)。所以对m个用户来说, CPU计算时间是m n 2 , 产生的中间数据是mn(n-1)/2,最后合并m个用户的这些数据,产生的数据量是n(n-1)/2。

这个算法的计算量对物品数据是呈平方级别地增长,对用户数量是线性的。比较恐怖的是它产生的中间数据,如果某用户物品评价数据为1MB左右, 且数据是double型占8字节, 则有1MB / 8B = 128K,此用户将产生的数据是1MB * (128K - 1) / 2 约为64GB数据量, 这部分中间数据是不可能放在内存的,只能通过磁盘,然而磁盘读写与主存完全不是一个级别,速度上又造成一个瓶颈。

 

当然也不必这么悲观, Slope One是一个可以进行增量的算法。假设已经对y件物品进行了训练,则当前训练的时间复杂度不会超过n 2 +my 2 . 撇开增量算法不管, 我们可以用MapReduce的优势分布式地进行训练(可以同时使用增量和MapReduce)。以Netflix Prize 的数据为例, 它包含480189个用户对17770部影片的评分数据,训练数据有17770个文件,每个文件代表一部影片, 其中第一行是影片的id, 其余行是各用户对此影片的评分记录。

MovieID:

CustomerID,Rating,Date

 

这些文件都比较小,最大的不过4793673字节,最小的才70字节,而MapReduce的文件块为64MB。小文件对于mapreduce任务来说是不利的,将会产生太多mapper. 这儿有一种解决办法,将tar包转成sequecefile .

 

省略此步,直接把解压后的文件put到HDFS,然后使用一道mapreduce把数据转成我们需要的格式。

hadoop dfs -put $NETFLIX_HOME/training_set /user/zhoumin/netflix-source
# 将附件中的代码成slopeone-0.00.1-dev.jar后运行
hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOnePreproccessor /user/zhoumin/netflix-source/user/zhoumin/netflix

  然后用SlopeOneTrainer进行训练。

SlopeOneTrainer的原理每个mapper计算一个用户各item的diff矩阵。了解hadoop中mapper运行机制的人就会发现,有的用户数据量大,很有可能产生上面说的数十GB的中间数据, 远远超过io.sort.mb的值。会造成mapper不停地merge数据,致使速度较慢, 使用36台个slaves的集群运行netflix的这部分训练花了4个多小时,绝大部分时间会花在mapper之上,特别是mapper的merge阶段.

 

于是假如把中间数据交给reducer去处理,更为理想,其实此步训练相当于一个join操作。于是使用hive比较方便。先将原始数据转成hive所需要的格式.

hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOneHive /user/zhoumin/netflix-source /user/zhoumin/netflix-hive
 

然后再建立两张表,netflix是处理后的netflix训练数据, freq_diff是训练后的模型矩阵

CREATE EXTERNAL TABLE netflix(
  movie_id STRING,
  user_id STRING,
  rate DOUBLE,
  rate_date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' 
STORED AS TEXTFILE 
LOCATION '/user/zhoumin/netflix-hive'; 

CREATE TABLE freq_diff (
    movie_id1 STRING,
    movie_id2 STRING,
    freq     DOUBLE,
    diff     DOUBLE
);

  okay,运行训练SQL

INSERT OVERWRITE TABLE freq_diff
SELECT
  nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
FROM
  netflix nf1 
JOIN 
  netflix nf2 ON nf1.user_id = nf2.user_id 
WHERE nf1.movie_id > nf2.movie_id
GROUP BY nf1.movie_id, nf2.movie_id;

 此SQL将会产生两道mapreduce job,使用 explain命令即可以看到, 第一道主要做join的工作,在reduce端会输

出所有的中间数据。Hive自动会调整reducer的数量,但这儿的reducer为3, 跑得比较慢(超过9小时),可以将reducer显式地设大些,我这儿设为160,再跑上面的训练SQL.

set mapred.reduce.tasks=160;

 两道job第一道花了33mins, 35sec,第二道花了1hrs, 29mins, 29sec,训练时间总共约2小时,可以接受。

 

 

训练完毕,就可以试一试预测功能了。假设某用户给影片1000评了2分,那么他将会对其它影片评多少分呢? 他将喜欢哪些影片呢?

 

okay,先做些准备工作

CREATE TABLE predict(
  movie_id STRING,
  rate FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

echo "1000,2" > predict_data

LOAD DATA LOCAL INPATH './predict_data' OVERWRITE INTO TABLE predict;
 

然后就可以进行预测了:

 

CREATE TABLE slopeone_result(
    movie_id STRING,
    freq     DOUBLE,
    pref     DOUBLE,
    rate     DOUBLE
);

INSERT OVERWRITE TABLE slopeone_result
SELECT
    /*+ MAPJOIN(p) */
    movie_id1                          as movie_id,
    sum(freq)                          as freq,
    sum(freq*(diff + rate))            as pref,
    sum(freq*(diff + rate))/sum(freq)  as rate
FROM
    predict p 
JOIN freq_diff fd ON fd.movie_id2 = p.movie_id
GROUP BY movie_id1

 注意上面使用了一个Map-Side Join的hint, 因为predict表非常小,只需要跑一个map only的job就可以完成join,无需shuffle数据给reduce. 这一步把用户自身的movie_id也参与计算,由于hive不支持in,所以结果有些偏差。可以用一道MapReduce作业来做预测这一步。

最后select .. order by一下就知道此用户喜欢哪些影片了。

 

结论:

1. 使用mapreduce,将运算移至reduce端, 避免map端的merge可以有效地提高训练速度

2. Slope One是一种简单易实现的用户推荐算法,而且可以增量训练

3. 结合以上两点,加上BigTable, HyperTable, Voldermort, Cassendera这种分布式key-value存储库,完全可以做到实时用户推荐(HBase甭提了)。

 

 

 

 

-----------------------------------------------------------------------------------------------------

附: hive生成的mr job描述.

hive> explain
    > INSERT OVERWRITE TABLE freq_diff
    > SELECT
    >   nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
    > FROM
    >   netflix nf1
    > JOIN
    >   netflix nf2 ON nf1.user_id = nf2.user_id
    > WHERE nf1.movie_id > nf2.movie_id
    > GROUP BY nf1.movie_id, nf2.movie_id;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF netflix nf1) (TOK_TABREF netflix nf2) (= (. (TOK_TABLE_OR_COL nf1) user_id) (. (TOK_TABLE_OR_COL nf2) user_id)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB freq_diff)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf1) movie_id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf2) movie_id)) (TOK_SELEXPR (TOK_FUNCTION count 1)) (TOK_SELEXPR (/ (TOK_FUNCTION sum (- (. (TOK_TABLE_OR_COL nf1) rate) (. (TOK_TABLE_OR_COL nf2) rate))) (TOK_FUNCTION count 1)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        nf2
          TableScan
            alias: nf2
            Reduce Output Operator
              key expressions:
                    expr: user_id
                    type: string
              sort order: +
              Map-reduce partition columns:
                    expr: user_id
                    type: string
              tag: 1
              value expressions:
                    expr: movie_id
                    type: string
                    expr: rate
                    type: double
        nf1
          TableScan
            alias: nf1
            Reduce Output Operator
              key expressions:
                    expr: user_id
                    type: string
              sort order: +
              Map-reduce partition columns:
                    expr: user_id
                    type: string
              tag: 0
              value expressions:
                    expr: movie_id
                    type: string
                    expr: rate
                    type: double
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col2}
            1 {VALUE._col0} {VALUE._col2}
          outputColumnNames: _col0, _col2, _col4, _col6
          Filter Operator
            predicate:
                expr: (_col0 > _col4)
                type: boolean
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col4
                    type: string
                    expr: _col2
                    type: double
                    expr: _col6
                    type: double
              outputColumnNames: _col0, _col4, _col2, _col6
              Group By Operator
                aggregations:
                      expr: count(1)
                      expr: sum((_col2 - _col6))
                keys:
                      expr: _col0
                      type: string
                      expr: _col4
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                File Output Operator
                  compressed: false
                  GlobalTableId: 0
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://xxx:9000/user/zhoumin/hive-tmp/22895032/10002
            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              sort order: ++
              Map-reduce partition columns:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              tag: -1
              value expressions:
                    expr: _col2
                    type: bigint
                    expr: _col3
                    type: double
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: sum(VALUE._col1)
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col2
                  type: bigint
                  expr: (_col3 / _col2)
                  type: double
            outputColumnNames: _col0, _col1, _col2, _col3
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
                    expr: UDFToDouble(_col2)
                    type: double
                    expr: _col3
                    type: double
              outputColumnNames: _col0, _col1, _col2, _col3
              File Output Operator
                compressed: true
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: freq_diff

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: freq_diff

 

 

8
1
分享到:
评论
6 楼 egoegmdslls 2012-05-24  
你好,我想请问你使用hive方法,每个节点的配置是怎么样的?
我这边19个节点,8核,16G,160G硬盘,
mapred.tasktracker.map.tasks.maximum = 7
mapred.tasktracker.reduce.tasks.maximum =  7
mapred.reduce.tasks =126
mapred.child.java.opts = -Xmx1024m
训练SQL到了74%,然后往后倒退到68%左右挂掉。。
是否您还做了其他的优化?
急~
5 楼 coderplay 2012-05-11  
cuijiwei1989 写道
您好,请问如何拿到原始数据文件?Netflix Prize 的数据

Google一下, 搜得到
4 楼 cuijiwei1989 2012-05-08  
您好,请问如何拿到原始数据文件?Netflix Prize 的数据
3 楼 iamicebergs 2010-11-08  
我在做分布式计算和数据挖掘方向的学习,每次都能搜到楼主的帖子,这个帖子挺不错的!
2 楼 asdfsx 2010-10-13  
这几天看《集体智慧编程》,正在研究相关的算法知识
并思考如何在分布式的环境中使用
这文章可以当教程收藏一下了
1 楼 itstarting 2009-11-18  
看起来有点吃力,但还是看完了,赞一个

相关推荐

    文档基于Slopeone算法的电影推荐系统电影推荐系统

    根据提供的文件信息,本文将详细解析“基于Slope One算法的电影推荐系统”的核心知识点,包括Slope One算法的基本原理、实现步骤以及在电影推荐系统中的应用等方面。 ### Slope One算法简介 Slope One算法是一种...

    一种基于机器学习的加权SlopeOne算法改进.doc

    加权SlopeOne算法是协同过滤推荐系统中的一种优化策略,旨在解决原始SlopeOne算法在处理大规模数据集时存在的问题。协同过滤是推荐系统中最常见的技术之一,它基于用户的历史行为来预测他们可能对未评价物品的兴趣。...

    SlopeOne推荐算法的Ruby实现_Ruby_下载.zip

    在Ruby编程语言中实现SlopeOne算法,可以方便地集成到各种Web应用或服务中,为用户提供个性化的商品或内容推荐。 一、SlopeOne算法简介 1. 基本原理:SlopeOne算法基于用户对物品的评分差异进行预测。对于用户u和...

    融合改进加权Slope One的协同过滤算法.docx

    相较于传统的协同过滤算法,Slope One 算法具有执行效率高、易于实现和维护的优点,同时推荐的准确性也相对较高。加权 Slope One 算法是 Slope One 算法的改进版本,它考虑到了对任意两个项目同时评分的用户数量,将...

    Algorithm-slopeone.zip

    《Slope One算法在协同过滤中的应用与PHP实现》 协同过滤是一种广泛应用于推荐系统中的机器学习算法,它通过分析用户的历史行为数据来预测用户可能对未评价项目的态度。其中,Slope One算法作为协同过滤的一种简单...

    基于项目属性相似和MapReduce并行化的Slope One算法 (2015年)

    首先计算项目间的属性相似度,并将其与Slope One算法相融合以提高预测精度,然后在Hadoop平台上对改进算法基于MapReduce进行并行化实现.在MovieLens数据集上的实验结果表明,相对于Slope One算法和加权Slope One...

    Mahout推荐算法usercf itemcf,slopeone三种算法实现

    本项目重点介绍了Mahout中的User-Based Collaborative Filtering(用户基协同过滤,UserCF)、Item-Based Collaborative Filtering(物品基协同过滤,ItemCF)以及Slope One算法的实现。 1. **User-Based ...

    SlopeOne算法

    在实际应用中,SlopeOne算法可以与其他推荐系统算法如Item-Based CF(基于物品的协同过滤)或Matrix Factorization(矩阵分解)结合使用,以提高预测精度。同时,为了优化性能,可以考虑使用缓存策略和并行计算技术...

    Slope One Predictors for Online Rating-Based Collaborative Filtering

    Slope One算法是一种用于在线评分推荐系统的协同过滤算法。它通过利用用户对已有物品的评分,来预测用户对未知物品的可能评分。Slope One算法的核心思想是,预先计算出不同物品之间评分的平均差异,这些差异来源于...

    基于用户相似度加权的Slope One 算法 (2016年)

    【Slope One 算法】Slope One算法是一种基于协同过滤的推荐系统方法,它依赖于简单的线性回归模型来预测用户对未评分项的评分。算法的核心思想是通过计算用户对已评分项目的评分差值(即斜率)来预测用户对未评分...

Global site tag (gtag.js) - Google Analytics