Welcome to aparke’s blog!
前言
本章实现的是歌曲推荐,使用的是ALS算法,ALS是spark.mllib中唯一的推荐算法,因为只有ALS算法可以进行并行运算。
使用的算法
- 交替最小二乘推荐算法
现在我们要给这个隐式反馈数据选择一个合适的推荐算法。这个数据及只记录了用户和歌曲之间的交互情况。
除了歌曲ID的名字外,数据集没有包含用户的信息,也没有提供歌手的其他任何信息。我们要找的学习算法不需要用户和艺术家的属性信息。这类算法通常称为协同过滤算法。举个例子:根据两个用户的年龄相同来判断他们可能有相似的偏好,这不叫协同过滤。相反,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这才叫协同过滤
- 矩阵分解模型
现在我们的用户和产品数据可以视为一个大矩阵A,矩阵第i 行和第j 列上的元素都有值,代表用户i 播放过艺术家 j 的音乐。可以预料到,矩阵A 是稀疏的,里面大多数元素为0.因为相对于所有可能的 用户 – 艺术家 组合,只有很少一部分组合会出现在数据中。这里我们使用的是一种矩阵分解模型( Non-negative matrix factorization)。
- 潜在因素
潜在因素(factor analysis)模型:
潜在因素模型试图通过数量相对较少的未被观察到的底层原因,来解释大量用户和产品之间可观察到的交互。
例如:有几千个专辑可选,为什么数百万人偏偏只买其中某些专辑?可以用对类别(可能只有数十种)的偏好来解释用户和专辑的关系,其中偏好信息并不能直接观察到,而数据也没有给出这些信息。
这里的偏好,其实就是我们上述要求的 k 里的值。这个k就是潜在因素,用于解释数据中的交互关系。由于k 很小,矩阵分解算法只能是某种近似。所以上述图里用的约等于号。
- 矩阵补全算法
矩阵分解算法有时也称为矩阵补全(matrix completion)算法。因为原始矩阵A 可能非常稀疏,但是乘积 是稠密的。
因为在我们计算出所有列里的系数(最优解)后,在使用 XYT 还原A时,大部分数据会被补全。
两个矩阵分别有一行对应每个用户和每个艺术家。每行的值很少,只有 k 个。每个值代表了对应模型的一个隐含特征。
因此行表示了用户和艺术家怎样关联到这些隐含特征,而隐含特征可能就对应偏好或类别。
于是问题就简化为 用户 – 特征矩阵 和 特征 – 艺术家矩阵 的乘积,该乘积的结果是对整个稠密的 用户 – 艺术家 相互关系矩阵的完整估计。
不幸的是,A = XYT 通常根本没有解。原因就是X 和 Y 通常不够大(严格来讲,就是矩阵的阶太小),无法完美表示 A。
这其实也是好事。A 只是所有可能出现的交互关系的一个微小样本。在某种程度上我们认为 A 是对基本事实的一次观察,它太稀疏,因此很难解释这个基本事实。但用少数几个因素(k 个)就能很好地解释这个基本事实。
XYT 应该尽可能逼近 A,毕竟这是所有后续工作的基础,但它不能也不应该完全复制A。然而同样不幸的是,想直接得到 X 和 Y 的最优解是不可能的。好消息是,如果 Y 已知,求 X 的最优解是非常容易的,反之亦然。但 X 和 Y 事先都是未知的
当然,我们仍有办法求解X 和 Y,其中之一便是交替最小二乘(Alternating Least Squares, ALS)法。Spark Mllib 的ALS 就是此算法的实现。
虽然 Y 是未知的,但我们可以把它初始化为随机行向量矩阵。接着运用简单的线性代数,就能在给定 A 和 Y 的条件下求出 X 的最优解。
实际上,X 的第 i 行是 A 的第 i 行和 Y 的函数,因此可以很容易分开计算 X 的每一行。因为 X 的每一行可以分开计算,所以我们可以将其并行化,而并行化是大规模计算的一大优点。
AiY(YTY)-1 = Xi
想要两边精确相等是不可能的,因此实际的目标是最小化 | AiY(YTY)-1 - Xi | ,或者最小化它们的平方误差(为了计算优化方便)。这也就是著名的最小二乘法。
这里给出的方程式只是为了说明行向量的计算方法,但实践中从来不会对矩阵求逆,我们会借助于QR 分解之类的方法,这种方法速度更快且更直接。
同理,我们可以由X 计算 Yj 。然后又可以由 Y 计算 X,这样反复下去。这就是算法中“交替”的由来。
这里 Y 是随机的,X是由最优化计算出来的,如此迭代下去,X 和 Y 最终会收敛得到一个合适的结果。
将 ALS 算法用于隐形数据矩阵分解时,ALS 矩阵分解稍微复杂一点。它不是直接分解输入矩阵 A,而是分解由 0 和 1 组成的矩阵 P,当 A 中元素为正时,P 中对应元素为 1,否则为 0。A 中的具体值会在后面以权重的形式反映出来。
ALS 算法也可以利用输入数据是稀疏的这一特点。稀疏的输入数据、可以用简单的线性代数运算求最优解,以及数据本身可并行化,这三点使得算法在大规模数据上速度非常快
##环境准备阶段
实验环境:spark-shell 单节点伪分布式 需要此环境在8G以上
数据集:https://github.com/libaoquan95/aasPractice/tree/master/c3/profiledata_06-May-2005
数据文件 | 数据文件 |
---|---|
user_artist_data.txt | 包含该的是(用户ID、歌曲ID、用户听的次数) |
artist_data.txt | 这个文件包含的是(歌曲ID,歌曲名字) |
artist_alias.txt | 输入错误,或者不同某种原因,同一首歌曲可能具有不同ID,这个是歌曲勘误(bad_id, good_id) |
上传数据集至HDFS
1.启动hadoop
start-all.sh |
2.上传数据到hdfs中
3.在hdfs中创建aparke文件,将三个数据集上传到hdfs中
hadoop fs -mkdir /aparke |
确认上传无误后,启动spark-shell
由于数据集比较大,启动的时候需指定内存为6g以上
1.启动spark-shell
spark-shell --driver-memory 6g |
2.读入三个数据
val rawUserArtistData =sc.textFile("hdfs://localhost:9000/aparke/user_artist_data_1.txt") |
rawUserArtistData.map(_.split(' ')(0).toDouble).stats() |
//stats方法会返回每一列的最大值,最小值,均值、方差、总数等
查看核心数据第一行,数据各代表什么
val first = rawUserArtistData.first |
分别代表用户id 歌曲id 听歌次数
数据处理阶段
1.数据的缺失值处理,把空值和异常值用None代替
val artistByID = rawArtistData.flatMap { line => |
2.若字符串为 null,则返回None:
构建模型阶段
1.导入依赖包
import org.apache.spark.mllib.recommendation._ |
数据集的形式完全符合Spark Mllib 的ALS 算法实现的要求,但我们还需额外做两个转换:
第一, 如果歌曲ID 存在一个不同的正规 ID,我们要用别名数据集对象将所有的歌曲ID 转换成正规 ID。
第二, 需要把数据转成 Rating 对象,Rating 对象是ALS 算法实现对“用户 – 产品 – 值” 的抽象,其中产品指“向人们推荐的物品”。
val bArtistAlias = sc.broadcast(artistAlias) |
val trainData = rawUserArtistData.map { line => |
这里用了一个广播变量,将artistAlias 变量作为一个广播变量。广播变量的作用如下:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
Explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
广播变量主要用于在迭代中一直需要被访问的只读变量。它将此变量缓存在每个executor 里,以减少集群网络传输消耗Spark 执行一个阶段(stage)时,会为待执行函数建立闭包,也就是该阶段所有任务所需信息的二进制形式。这个闭包包括驱动程序里函数引用的所有数据结构。Spark 把这个闭包发送到集群的每个executor 上。
当许多任务需要访问同一个(不可变的)数据结构时,我们应该使用广播变量。它对任务闭包的常规处理进行扩展,是我们能够:1.在每个 executor 上将数据缓存为原始的 Java 对象,这样就不用为每个人物执行反序列化
2.在多个作业和阶段之间缓存数据在函数最后,我们调用了 cache() 以指示 Spark 在 RDD 计算好后将其暂时存储在集群的内存里。这样是有益的,因为 ALS 算法是 迭代的,通常情况下至少要访问该数据 10 次以上。如果不调用 cache(),那么每次要用到 RDD 时都需要从原始数据中重新计算。
2.构建模型
现在我们已有了训练数据 Rating,格式如下:
Rating(1000002,1,55.0)
分别对应用户id,歌曲id,听的次数
然后我们构建模型:
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0) |
找出用户ID为2093760的数据
val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter { case Array(user,_,_) => user.toInt == 2093760 } |
把歌曲的ID号转为int型
val existingProducts =rawArtistsForUser.map { |
根据artist_data.txt打印歌曲名
artistByID.filter { case (id, name) => |
利用刚刚训练好的模型给2093760用户推荐5首歌曲
val recommendations = model.recommendProducts(2093760, 5) |
输出结果是
Rating(2093760,2814,0.03569566367141015), Rating(2093760,1300642,0.03488258629334969), Rating(2093760,1001819,0.0345300595926977), Rating(2093760,1007614,0.03371890215740113), Rating(2093760,4605,0.033588342423490986)
每个Rating 包括用户id,艺术家id,以及一个评分值
结果中每行最后的数值是0到1之间的模糊值,值越大,推荐质量越好,分数越高代表用户越喜欢。
val recommendedProductIDs = recommendations.map(_.product).toSet |
artistByID.filter { case (id, name) => |
看得出这些推荐,虽然这些用户都比较受欢迎,而且是hippop之类的歌曲。但好像并没有针对用户的收听习惯进行个性化
参考文献:Spark高级数据分析第三章 音乐推荐和Audioscrobbler数据集
参考博客:https://www.cnblogs.com/zackstang/p/9425299.html
https://blog.csdn.net/u014779006/article/details/75095363
https://www.cnblogs.com/mr-totoro/p/5775759.html