- 浏览: 576880 次
- 性别:
- 来自: 广州杭州
文章分类
最新评论
-
bohc:
谢谢,搞了两天了,现在才算是找到问题所在,解决了。
文件在使用FileChannel.map后不能被删除(Windows上) -
zhang0000jun:
在jdk1.8中执行正好和楼主的结果相反,请指教
从Java视角理解CPU缓存(CPU Cache) -
在世界的中心呼喚愛:
forenroll 写道请问楼主的那个分析工具cachemis ...
从Java视角理解CPU缓存(CPU Cache) -
xgj1988:
我这里打出的结果是: 0 L1-dcache-load-mis ...
从Java视角理解CPU缓存(CPU Cache) -
thebye85:
请教下大神,为什么频繁的park会导致大量context sw ...
从Java视角理解CPU上下文切换(Context Switch)
用户推荐越来越热, Google使用MinHash, PLSI, LDA, SVD, SVM等算法,分析用户的喜好, 实现新闻的自动分类;新浪也用Slope One以及一些Item-based的算法对音乐进行推荐; 淘宝定期会启动MapReduce作业分析前一天或者一个月用户收藏的宝贝,给相同喜好的买家提供推荐服务。
本文要描述的Slope One 算法是一种对评分进行预测的算法, 它相对于SVD, PLSI, LDA这一类model-based算法来说有以下特点:
1. 简单, 容易实现
2. 训练得到的模型可以增量更新
3. 预测速度很快
4. 用户可以只做过一两次评分,就可以获得推荐.
5. 准确度比较理想
okay, 找到一篇介绍算法的:http://www.fuchaoqun.com/2008/09/slope_one/
讲的不错,就不再重复了。
英文wiki上也有介绍http://en.wikipedia.org/wiki/Slope_One
其中python的实现比较简洁
# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>. # # This software may be used and distributed according to the terms # of the GNU General Public License, version 2 or later, which is # incorporated herein by reference. class SlopeOne(object): def __init__(self): self.diffs = {} self.freqs = {} def predict(self, userprefs): preds, freqs = {}, {} for item, rating in userprefs.iteritems(): for diffitem, diffratings in self.diffs.iteritems(): try: freq = self.freqs[diffitem][item] except KeyError: continue preds.setdefault(diffitem, 0.0) freqs.setdefault(diffitem, 0) preds[diffitem] += freq * (diffratings[item] + rating) freqs[diffitem] += freq return dict([(item, value / freqs[item]) for item, value in preds.iteritems() if item not in userprefs and freqs[item] > 0]) def update(self, userdata): for ratings in userdata.itervalues(): for item1, rating1 in ratings.iteritems(): self.freqs.setdefault(item1, {}) self.diffs.setdefault(item1, {}) for item2, rating2 in ratings.iteritems(): self.freqs[item1].setdefault(item2, 0) self.diffs[item1].setdefault(item2, 0.0) self.freqs[item1][item2] += 1 self.diffs[item1][item2] += rating1 - rating2 print self.diffs[item1][item2] for item1, ratings in self.diffs.iteritems(): for item2 in ratings: ratings[item2] /= self.freqs[item1][item2] if __name__ == '__main__': userdata = dict( alice=dict(squid=1.0, cuttlefish=0.5, octopus=0.2), bob=dict(squid=1.0, octopus=0.5, nautilus=0.2), carole=dict(squid=0.2, octopus=1.0, cuttlefish=0.4, nautilus=0.4), dave=dict(cuttlefish=0.9, octopus=0.4, nautilus=0.5), ) s = SlopeOne() s.update(userdata) print s.predict(dict(octopus=0.4)
现在分析一下Slope One训练的空间及时间复杂度,
如果有m个用户,分别对n件物品进行了评分。每个用户得进行 n 2 次计算,将产生n(n-1)/2级别的数据量(由于diff是个对角矩阵,可以只取下三角)。所以对m个用户来说, CPU计算时间是m n 2 , 产生的中间数据是mn(n-1)/2,最后合并m个用户的这些数据,产生的数据量是n(n-1)/2。
这个算法的计算量对物品数据是呈平方级别地增长,对用户数量是线性的。比较恐怖的是它产生的中间数据,如果某用户物品评价数据为1MB左右, 且数据是double型占8字节, 则有1MB / 8B = 128K,此用户将产生的数据是1MB * (128K - 1) / 2 约为64GB数据量, 这部分中间数据是不可能放在内存的,只能通过磁盘,然而磁盘读写与主存完全不是一个级别,速度上又造成一个瓶颈。
当然也不必这么悲观, Slope One是一个可以进行增量的算法。假设已经对y件物品进行了训练,则当前训练的时间复杂度不会超过n 2 +my 2 . 撇开增量算法不管, 我们可以用MapReduce的优势分布式地进行训练(可以同时使用增量和MapReduce)。以Netflix Prize 的数据为例, 它包含480189个用户对17770部影片的评分数据,训练数据有17770个文件,每个文件代表一部影片, 其中第一行是影片的id, 其余行是各用户对此影片的评分记录。
MovieID:
CustomerID,Rating,Date
这些文件都比较小,最大的不过4793673字节,最小的才70字节,而MapReduce的文件块为64MB。小文件对于mapreduce任务来说是不利的,将会产生太多mapper. 这儿有一种解决办法,将tar包转成sequecefile .
省略此步,直接把解压后的文件put到HDFS,然后使用一道mapreduce把数据转成我们需要的格式。
hadoop dfs -put $NETFLIX_HOME/training_set /user/zhoumin/netflix-source # 将附件中的代码成slopeone-0.00.1-dev.jar后运行 hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOnePreproccessor /user/zhoumin/netflix-source/user/zhoumin/netflix
然后用SlopeOneTrainer进行训练。
SlopeOneTrainer的原理每个mapper计算一个用户各item的diff矩阵。了解hadoop中mapper运行机制的人就会发现,有的用户数据量大,很有可能产生上面说的数十GB的中间数据, 远远超过io.sort.mb的值。会造成mapper不停地merge数据,致使速度较慢, 使用36台个slaves的集群运行netflix的这部分训练花了4个多小时,绝大部分时间会花在mapper之上,特别是mapper的merge阶段.
于是假如把中间数据交给reducer去处理,更为理想,其实此步训练相当于一个join操作。于是使用hive比较方便。先将原始数据转成hive所需要的格式.
hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOneHive /user/zhoumin/netflix-source /user/zhoumin/netflix-hive
然后再建立两张表,netflix是处理后的netflix训练数据, freq_diff是训练后的模型矩阵
CREATE EXTERNAL TABLE netflix( movie_id STRING, user_id STRING, rate DOUBLE, rate_date STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/user/zhoumin/netflix-hive'; CREATE TABLE freq_diff ( movie_id1 STRING, movie_id2 STRING, freq DOUBLE, diff DOUBLE );
okay,运行训练SQL
INSERT OVERWRITE TABLE freq_diff SELECT nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1) FROM netflix nf1 JOIN netflix nf2 ON nf1.user_id = nf2.user_id WHERE nf1.movie_id > nf2.movie_id GROUP BY nf1.movie_id, nf2.movie_id;
此SQL将会产生两道mapreduce job,使用 explain命令即可以看到, 第一道主要做join的工作,在reduce端会输
出所有的中间数据。Hive自动会调整reducer的数量,但这儿的reducer为3, 跑得比较慢(超过9小时),可以将reducer显式地设大些,我这儿设为160,再跑上面的训练SQL.
set mapred.reduce.tasks=160;
两道job第一道花了33mins, 35sec,第二道花了1hrs, 29mins, 29sec,训练时间总共约2小时,可以接受。
训练完毕,就可以试一试预测功能了。假设某用户给影片1000评了2分,那么他将会对其它影片评多少分呢? 他将喜欢哪些影片呢?
okay,先做些准备工作
CREATE TABLE predict( movie_id STRING, rate FLOAT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE; echo "1000,2" > predict_data LOAD DATA LOCAL INPATH './predict_data' OVERWRITE INTO TABLE predict;
然后就可以进行预测了:
CREATE TABLE slopeone_result( movie_id STRING, freq DOUBLE, pref DOUBLE, rate DOUBLE ); INSERT OVERWRITE TABLE slopeone_result SELECT /*+ MAPJOIN(p) */ movie_id1 as movie_id, sum(freq) as freq, sum(freq*(diff + rate)) as pref, sum(freq*(diff + rate))/sum(freq) as rate FROM predict p JOIN freq_diff fd ON fd.movie_id2 = p.movie_id GROUP BY movie_id1
注意上面使用了一个Map-Side Join的hint, 因为predict表非常小,只需要跑一个map only的job就可以完成join,无需shuffle数据给reduce. 这一步把用户自身的movie_id也参与计算,由于hive不支持in,所以结果有些偏差。可以用一道MapReduce作业来做预测这一步。
最后select .. order by一下就知道此用户喜欢哪些影片了。
结论:
1. 使用mapreduce,将运算移至reduce端, 避免map端的merge可以有效地提高训练速度
2. Slope One是一种简单易实现的用户推荐算法,而且可以增量训练
3. 结合以上两点,加上BigTable, HyperTable, Voldermort, Cassendera这种分布式key-value存储库,完全可以做到实时用户推荐(HBase甭提了)。
-----------------------------------------------------------------------------------------------------
附: hive生成的mr job描述.
hive> explain
> INSERT OVERWRITE TABLE freq_diff
> SELECT
> nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
> FROM
> netflix nf1
> JOIN
> netflix nf2 ON nf1.user_id = nf2.user_id
> WHERE nf1.movie_id > nf2.movie_id
> GROUP BY nf1.movie_id, nf2.movie_id;
OK
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF netflix nf1) (TOK_TABREF netflix nf2) (= (. (TOK_TABLE_OR_COL nf1) user_id) (. (TOK_TABLE_OR_COL nf2) user_id)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB freq_diff)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf1) movie_id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf2) movie_id)) (TOK_SELEXPR (TOK_FUNCTION count 1)) (TOK_SELEXPR (/ (TOK_FUNCTION sum (- (. (TOK_TABLE_OR_COL nf1) rate) (. (TOK_TABLE_OR_COL nf2) rate))) (TOK_FUNCTION count 1)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))))
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
nf2
TableScan
alias: nf2
Reduce Output Operator
key expressions:
expr: user_id
type: string
sort order: +
Map-reduce partition columns:
expr: user_id
type: string
tag: 1
value expressions:
expr: movie_id
type: string
expr: rate
type: double
nf1
TableScan
alias: nf1
Reduce Output Operator
key expressions:
expr: user_id
type: string
sort order: +
Map-reduce partition columns:
expr: user_id
type: string
tag: 0
value expressions:
expr: movie_id
type: string
expr: rate
type: double
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col2}
1 {VALUE._col0} {VALUE._col2}
outputColumnNames: _col0, _col2, _col4, _col6
Filter Operator
predicate:
expr: (_col0 > _col4)
type: boolean
Select Operator
expressions:
expr: _col0
type: string
expr: _col4
type: string
expr: _col2
type: double
expr: _col6
type: double
outputColumnNames: _col0, _col4, _col2, _col6
Group By Operator
aggregations:
expr: count(1)
expr: sum((_col2 - _col6))
keys:
expr: _col0
type: string
expr: _col4
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://xxx:9000/user/zhoumin/hive-tmp/22895032/10002
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: string
expr: _col1
type: string
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: double
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: sum(VALUE._col1)
keys:
expr: KEY._col0
type: string
expr: KEY._col1
type: string
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: _col2
type: bigint
expr: (_col3 / _col2)
type: double
outputColumnNames: _col0, _col1, _col2, _col3
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: UDFToDouble(_col2)
type: double
expr: _col3
type: double
outputColumnNames: _col0, _col1, _col2, _col3
File Output Operator
compressed: true
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: freq_diff
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: freq_diff
- slopeone.zip (4.1 KB)
- 下载次数: 279
评论
我这边19个节点,8核,16G,160G硬盘,
mapred.tasktracker.map.tasks.maximum = 7
mapred.tasktracker.reduce.tasks.maximum = 7
mapred.reduce.tasks =126
mapred.child.java.opts = -Xmx1024m
训练SQL到了74%,然后往后倒退到68%左右挂掉。。
是否您还做了其他的优化?
急~
Google一下, 搜得到
并思考如何在分布式的环境中使用
这文章可以当教程收藏一下了
发表评论
-
并行支持向量机
2009-04-13 12:48 2137学校开题的东西,分享一下。 -
SVM的并行化
2009-03-10 13:33 7999目前我在SVM的并行化方面已经有解法. SVM在数学上的本质是 ... -
关于redpoll中使用mahout模块,而没有沿用其中算法的解答
2008-11-07 19:03 3248接到mail, 公布出来省得再有提问 :) 首先, ... -
关于canopy聚类的几点思考
2008-05-10 12:47 50951. 首先是轻量距离量度的选择,是选择数据模型其中的一个属性, ... -
canopy-clustering执行顺序
2008-05-08 14:08 5386好记性不如烂笔头,记一下: NetflixDataPrep ... -
popular clustering techniques
2008-04-27 13:24 4126k-Means, k-Medoids, Kernel Clus ... -
hama -- a parallel matrix computational package
2008-04-02 01:42 4487Today, I accidently found an in ... -
redpoll and mahout
2008-04-02 01:20 4386Days before, I've submitted an ... -
redpoll is launched
2008-03-23 14:30 4380We are pleased to introduce a n ...
相关推荐
根据提供的文件信息,本文将详细解析“基于Slope One算法的电影推荐系统”的核心知识点,包括Slope One算法的基本原理、实现步骤以及在电影推荐系统中的应用等方面。 ### Slope One算法简介 Slope One算法是一种...
加权SlopeOne算法是协同过滤推荐系统中的一种优化策略,旨在解决原始SlopeOne算法在处理大规模数据集时存在的问题。协同过滤是推荐系统中最常见的技术之一,它基于用户的历史行为来预测他们可能对未评价物品的兴趣。...
在Ruby编程语言中实现SlopeOne算法,可以方便地集成到各种Web应用或服务中,为用户提供个性化的商品或内容推荐。 一、SlopeOne算法简介 1. 基本原理:SlopeOne算法基于用户对物品的评分差异进行预测。对于用户u和...
相较于传统的协同过滤算法,Slope One 算法具有执行效率高、易于实现和维护的优点,同时推荐的准确性也相对较高。加权 Slope One 算法是 Slope One 算法的改进版本,它考虑到了对任意两个项目同时评分的用户数量,将...
《Slope One算法在协同过滤中的应用与PHP实现》 协同过滤是一种广泛应用于推荐系统中的机器学习算法,它通过分析用户的历史行为数据来预测用户可能对未评价项目的态度。其中,Slope One算法作为协同过滤的一种简单...
首先计算项目间的属性相似度,并将其与Slope One算法相融合以提高预测精度,然后在Hadoop平台上对改进算法基于MapReduce进行并行化实现.在MovieLens数据集上的实验结果表明,相对于Slope One算法和加权Slope One...
本项目重点介绍了Mahout中的User-Based Collaborative Filtering(用户基协同过滤,UserCF)、Item-Based Collaborative Filtering(物品基协同过滤,ItemCF)以及Slope One算法的实现。 1. **User-Based ...
在实际应用中,SlopeOne算法可以与其他推荐系统算法如Item-Based CF(基于物品的协同过滤)或Matrix Factorization(矩阵分解)结合使用,以提高预测精度。同时,为了优化性能,可以考虑使用缓存策略和并行计算技术...
Slope One算法是一种用于在线评分推荐系统的协同过滤算法。它通过利用用户对已有物品的评分,来预测用户对未知物品的可能评分。Slope One算法的核心思想是,预先计算出不同物品之间评分的平均差异,这些差异来源于...
【Slope One 算法】Slope One算法是一种基于协同过滤的推荐系统方法,它依赖于简单的线性回归模型来预测用户对未评分项的评分。算法的核心思想是通过计算用户对已评分项目的评分差值(即斜率)来预测用户对未评分...