本项目涉及的音乐推荐系统的主要功能
1.根据用户的行为来推荐歌曲。
2.根据歌曲找出相似歌曲。
3.预测用户的听歌兴趣。
1.零启动问题。(新歌或几乎没有用户听过)
2.用户兴趣预测问题。
surprise库简介:是python中的一个库,是scikit系列中的一个。
优点:支持多种推荐算法。例如基础算法、基于近邻方法(协同过滤)和矩阵分解方法。
其中基于近邻的方法(协同过滤)可以设定不同的度量准则,分别有cosine(向量余弦距离)、msd(欧氏距离)、pearson(均值)、pearson baseline和Jaccard similarity(交集元素个数或并集元素个数)。
同时还支持不同的评估准则,包括mse、msd和fcp。
一、基本流程:
1.获取原始数据
运用python的爬虫程序获取网易云上的用户音乐数据。
2.数据处理
按分隔符隔开,统一转化为固定简介的格式,最终分解成user item rating timestamp的形式。
2.1 原始数据=>歌单数据
抽取 歌单名称,歌单id,收藏数,所属分类 4个歌单维度的信息
抽取 歌曲id,歌曲名,歌手,歌曲热度 等4个维度信息歌曲的信息
2.2 歌单数据=>推荐系统格式数据
主流的python推荐系统框架,支持的最基本数据格式为movielens dataset,其评分数据格式为 user item rating timestamp,为了简单,我们也把数据处理成这个格式。
1.针对用户推荐网易云音乐(每日30首歌/7首歌)
2.针对歌曲在你听某首歌的时候,找“相似歌曲”
2.3 保存歌单和歌曲信息备用
我们需要保存歌单id=>歌单名和歌曲id=>歌曲名的信息后期备用。
二、在数据集上训练模型
1.用协同过滤构建模型并进行预测
1.1 利用movielens的例子
from surprise import KNNWithMeans
from surprise import Dataset
from surprise import evaluate, print_perf
# 默认载入movielens数据集
data = Dataset.load_builtin('ml-100k')
# k折交叉验证(k=3)
data.split(n_folds=3)
# 试一把SVD矩阵分解
algo = KNNWithMeans()
# 在数据集上测试一下效果
perf = evaluate(algo, data, measures=['RMSE', 'MAE'])
#输出结果
print_perf(perf)
1.2 根据一个歌曲得到相似度最高的歌曲
1)用算法计算相互间的相似度
2)获取歌曲名到歌曲id和歌曲id到歌曲名的映射
3)拿出来具体某首歌曲对应的item id
4)找到最近的10个邻居
5)从近邻的id映射回歌曲名称
1.3 音乐预测的例子
1)重建歌单id到歌单名的映射字典
2)重建歌单名到歌单id的映射字典
3)指定文件格式
4)从文件读取数据
5)计算歌曲和歌曲之间的相似度
1.4寻找相似度最近的歌单
1)取出近邻并映射名字到id
2)取出来对应的内部user id => to_inner_uid
3)把歌曲id转成歌曲名字并将to_raw_uid映射回去
1.5 对用户的兴趣进行预测
1)重建歌曲id到歌曲名的映射字典
2)重建歌曲名到歌曲id的映射字典
三、用矩阵分解进行预测(需要使用NMF)
1)指定文件格式
2)文件读取数据
3)构建数据集和建模
四、模型的存储
import surprise
surprise.dump.dump('./recommendation.model', algo=algo)
# 可以用下面的方式载入
algo = surprise.dump.load('./recommendation.model')
五、不同的推荐系统算法评估
载入数据:指定文件路径、指定文件格式、从文件读取数据、分成5折。
### 使用NormalPredictor
from surprise import NormalPredictor, evaluate
algo = NormalPredictor()
perf = evaluate(algo, music_data, measures=['RMSE', 'MAE'])
### 使用BaselineOnly
from surprise import BaselineOnly, evaluate
algo = BaselineOnly()
perf = evaluate(algo, music_data, measures=['RMSE', 'MAE'])
### 使用基础版协同过滤
from surprise import KNNBasic, evaluate
algo = KNNBasic()
perf = evaluate(algo, music_data, measures=['RMSE', 'MAE'])
### 使用均值协同过滤
from surprise import KNNWithMeans, evaluate
algo = KNNWithMeans()
perf = evaluate(algo, music_data, measures=['RMSE', 'MAE'])
### 使用协同过滤baseline
from surprise import KNNBaseline, evaluate
algo = KNNBaseline()
perf = evaluate(algo, music_data, measures=['RMSE', 'MAE'])
### 使用SVD
from surprise import SVD, evaluate
algo = SVD()
perf = evaluate(algo, music_data, measures=['RMSE', 'MAE'])
六、歌曲序列建模
因为计算机只能处理数字,在自然语言情况(NLP)下,需要将每个词编码成一个向量,这样机器可以出现这样的词。例如,我是中国人=>我 是 中国 人,我=>[1,0,0,0]。但是,有两个词之间可能会有一定的相似度,比如“喜欢”和“稀罕”,这样用向量就看不出相似度。
以上过程称为word to vector,将一个词映射成向量,向量之间可以计算ditstance similarity。
我们在自然语言处理的机器学习问题中,需要把词映射成词向量,最简单的方式是one-hot,而为了达到更好的效果,我们通常需要捕捉一下近义词,该方法就是word2vec。
word2vec的底层原理是,“物以类聚,人以群分”,根据周边的item来判断两者之间的关联度或相似度,在本文中,判断两首歌的相似度就是根据周边的item出现情况。所以针对相似度较高的两个序列,也需要将映射成相似的向量。
word2vec使用方式是,设定一个超参数,判断一个词周边相近的词语,交给机器来学习。其实就是把词映射成一定维度的稠密向量,同时保持住词与词之间的关联度,主要体现在欧氏距离上的远近上。
1.1 从word2vec到song2vec
有一个很重要的python库是gensim,整个word2vec是在其中完成。在本文中的歌曲之间是没有顺序的,所以要用shuffle打乱机制(为了尽可能找到更多的关联歌曲)。
先初始化歌单序列,再进行分割,按照固定格式进行重塑,再不断地解析跟单序列。需要不断地打乱歌单的顺序。最后加到playlist中,它本身就是个列表,里面每个元素也是一个列表。
训练该模型,最后又model和songdic字典部分,最近的歌用most_similiar()。输出歌和相似度。
#coding: utf-8
import multiprocessing
import gensim
import sys
from random import shuffle
def parse_playlist_get_sequence(in_line, playlist_sequence):
song_sequence = []
contents = in_line.strip().split("\t")
# 解析歌单序列
for song in contents[1:]:
try:
song_id, song_name, artist, popularity = song.split(":::")
song_sequence.append(song_id)
except:
print "song format error"
print song+"\n"
for i in range(len(song_sequence)):
shuffle(song_sequence)
playlist_sequence.append(song_sequence)
def train_song2vec(in_file, out_file):
#所有歌单序列
playlist_sequence = []
#遍历所有歌单
for line in open(in_file):
parse_playlist_get_sequence(line, playlist_sequence)
#使用word2vec训练
cores = multiprocessing.cpu_count()#使用cpu跑程序
print "using all "+str(cores)+" cores"
print "Training word2vec model..."
model = gensim.models.Word2Vec(sentences=playlist_sequence, size=150, min_count=3, window=7, workers=cores)
print "Saving model..."
model.save(out_file)
以上代码是word2vec的训练过程,以下是利用该训练结果来寻找相似歌曲的代码。
song_id_list = song_dic.keys()[1000:1500:50]
for song_id in song_id_list:
result_song_list = model.most_similar(song_id)
print song_id, song_dic[song_id]
print "\n相似歌曲 和 相似度 分别为:"
for song in result_song_list:
print "\t", song_dic[song[0]], song[1]
print "\n"
七、序列建模的进一步思考
冷启动问题
描述:一首歌刚刚推送出来,怎么把它推荐给用户;我听完一首很冷门的歌,怎么根据这首歌给我推荐。
总体来说就是有用的信息太少,很少用户在上面发生行为,那就很难关联。
解决方法:
(1)推荐热门的歌曲。因为根据马太效应,人们会去更热门的歌曲。
(2)把song里的歌曲替换成歌手,重新建模,从而能够得到与这个歌手最接近的歌手。
用户兴趣预测问题
每个人的兴趣是有时效性的,不同的年份可能喜欢不同类型的歌曲,如去年我喜欢摇滚类型,今年喜欢二次元动漫风格的。
解决方法:把每个用户喜欢或收藏的歌,沿着时间轴排序好,同时由近到远给不同的衰减因子,同时针对不同的歌曲热度,给定不同的推荐因子,每一首歌都可以拿回一个song2vec的推荐列表和对应的相似度,对相似度以时间衰减因子和热度权重进行加权,最后的结果排序后,展示给用户。
PS:不可能所有的歌曲推荐在online上算好,一般是提前一天就算好了每天的歌曲推荐。
八、Tensorflow
前言:给用户做推荐时,无法一次性全部将歌曲载入到内存。由于受到硬件设备的限制,然而数据是在实时增加的,能不能在online处理一部分的数据。
0)矩阵分解回顾
将m x n的打分矩阵,分解为只跟用户有关的打分数据。
LFM:把用户在item上的打分行为,看作是内部依据的,认为和k个factor有关系,每一个user i会有一个用户的向量(k维),每一个item会有一个item的向量(k维)。因为每个用户会有倾向性,比如我会更喜欢高桥李依配音的动漫。
SVD是矩阵分解的一种方式。
预测公式:用户的偏置项(类似直线的截距)[global+user+item]+用户向量和物品向量的内积。
机器学习肯定要定义损失函数,我们需要最小化的loss计算如下:
$\sum_{u, i} |y_{pred[u, i]} - y_{true[u, i]}|^2 + \lambda(|embedding_{user[u]}|^2 + |embedding_{item[i]}|^2)$
1)获取数据
依旧以movielens为例,数据格式为user item rating timestamp。
2)数据处理部分
import numpy(用于科学计算)和pandas(用于数据分析和处理)。
shuffledataIterator方法用于打乱顺序,并从中取出需要的一批数据。
Oneepochdataiterator方法用于顺序产出数据。
from __future__ import absolute_import, division, print_function
import numpy as np
import pandas as pd
def read_data_and_process(filname, sep="\t"):
col_names = ["user", "item", "rate", "st"]
df = pd.read_csv(filname, sep=sep, header=None, names=col_names, engine='python')
df["user"] -= 1
df["item"] -= 1
for col in ("user", "item"):
df[col] = df[col].astype(np.int32)
df["rate"] = df["rate"].astype(np.float32)
return df
class ShuffleDataIterator(object):
"""
随机生成一个batch一个batch数据
"""
#初始化
def __init__(self, inputs, batch_size=10):
self.inputs = inputs
self.batch_size = batch_size
self.num_cols = len(self.inputs)
self.len = len(self.inputs[0])
self.inputs = np.transpose(np.vstack([np.array(self.inputs[i]) for i in range(self.num_cols)]))
#总样本量
def __len__(self):
return self.len
def __iter__(self):
return self
#取出下一个batch
def __next__(self):
return self.next()
#随机生成batch_size个下标,取出对应的样本
def next(self):
ids = np.random.randint(0, self.len, (self.batch_size,))
out = self.inputs[ids, :]
return [out[:, i] for i in range(self.num_cols)]
class OneEpochDataIterator(ShuffleDataIterator):
"""
顺序产出一个epoch的数据,在测试中可能会用到
"""
def __init__(self, inputs, batch_size=10):
super(OneEpochDataIterator, self).__init__(inputs, batch_size=batch_size)
if batch_size > 0:
self.idx_group = np.array_split(np.arange(self.len), np.ceil(self.len / batch_size))
else:
self.idx_group = [np.arange(self.len)]
self.group_id = 0
def next(self):
if self.group_id >= len(self.idx_group):
self.group_id = 0
raise StopIteration
out = self.inputs[self.idx_group[self.group_id], :]
self.group_id += 1
return [out[:, i] for i in range(self.num_cols)]
3)搭建模型
最终预测的得分为user的向量和item的向量做一个乘积。
后面有点模糊。。。
import tensorflow as tf
# 使用矩阵分解搭建的网络结构
def inference_svd(user_batch, item_batch, user_num, item_num, dim=5, device="/cpu:0"):
#使用CPU
with tf.device("/cpu:0"):
# 初始化几个bias项
global_bias = tf.get_variable("global_bias", shape=[])
w_bias_user = tf.get_variable("embd_bias_user", shape=[user_num])
w_bias_item = tf.get_variable("embd_bias_item", shape=[item_num])
# bias向量
bias_user = tf.nn.embedding_lookup(w_bias_user, user_batch, name="bias_user")
bias_item = tf.nn.embedding_lookup(w_bias_item, item_batch, name="bias_item")
w_user = tf.get_variable("embd_user", shape=[user_num, dim],
initializer=tf.truncated_normal_initializer(stddev=0.02))
w_item = tf.get_variable("embd_item", shape=[item_num, dim],
initializer=tf.truncated_normal_initializer(stddev=0.02))
# user向量与item向量
embd_user = tf.nn.embedding_lookup(w_user, user_batch, name="embedding_user")
embd_item = tf.nn.embedding_lookup(w_item, item_batch, name="embedding_item")
with tf.device(device):
# 按照实际公式进行计算
# 先对user向量和item向量求内积
infer = tf.reduce_sum(tf.multiply(embd_user, embd_item), 1)
# 加上几个偏置项
infer = tf.add(infer, global_bias)
infer = tf.add(infer, bias_user)
infer = tf.add(infer, bias_item, name="svd_inference")
# 加上正则化项
regularizer = tf.add(tf.nn.l2_loss(embd_user), tf.nn.l2_loss(embd_item), name="svd_regularizer")
return infer, regularizer
# 迭代优化部分
def optimization(infer, regularizer, rate_batch, learning_rate=0.001, reg=0.1, device="/cpu:0"):
global_step = tf.train.get_global_step()
assert global_step is not None
# 选择合适的optimizer做优化
with tf.device(device):
cost_l2 = tf.nn.l2_loss(tf.subtract(infer, rate_batch))
penalty = tf.constant(reg, dtype=tf.float32, shape=[], name="l2")
cost = tf.add(cost_l2, tf.multiply(regularizer, penalty))
train_op = tf.train.AdamOptimizer(learning_rate).minimize(cost, global_step=global_step)
return cost, train_op
4)数据上的模型训练
import time
from collections import deque
import numpy as np
import tensorflow as tf
from six import next
from tensorflow.core.framework import summary_pb2
np.random.seed(13575)
# 一批数据的大小
BATCH_SIZE = 2000
# 用户数
USER_NUM = 6040
# 电影数
ITEM_NUM = 3952
# factor维度
DIM = 15
# 最大迭代轮数
EPOCH_MAX = 200
# 使用cpu做训练
DEVICE = "/cpu:0"
# 截断
def clip(x):
return np.clip(x, 1.0, 5.0)
# 这个是方便Tensorboard可视化做的summary
def make_scalar_summary(name, val):
return summary_pb2.Summary(value=[summary_pb2.Summary.Value(tag=name, simple_value=val)])
# 调用上面的函数获取数据
def get_data():
df = read_data_and_process("./movielens/ml-1m/ratings.dat", sep="::")
rows = len(df)
df = df.iloc[np.random.permutation(rows)].reset_index(drop=True)
split_index = int(rows * 0.9)
df_train = df[0:split_index]
df_test = df[split_index:].reset_index(drop=True)
print(df_train.shape, df_test.shape)
return df_train, df_test
# 实际训练过程
def svd(train, test):
samples_per_batch = len(train) // BATCH_SIZE
# 一批一批数据用于训练
iter_train = ShuffleDataIterator([train["user"],
train["item"],
train["rate"]],
batch_size=BATCH_SIZE)
# 测试数据
iter_test = OneEpochDataIterator([test["user"],
test["item"],
test["rate"]],
batch_size=-1)
# user和item batch
user_batch = tf.placeholder(tf.int32, shape=[None], name="id_user")
item_batch = tf.placeholder(tf.int32, shape=[None], name="id_item")
rate_batch = tf.placeholder(tf.float32, shape=[None])
# 构建graph和训练
infer, regularizer = inference_svd(user_batch, item_batch, user_num=USER_NUM, item_num=ITEM_NUM, dim=DIM,
device=DEVICE)
global_step = tf.contrib.framework.get_or_create_global_step()
_, train_op = optimization(infer, regularizer, rate_batch, learning_rate=0.001, reg=0.05, device=DEVICE)
# 初始化所有变量
init_op = tf.global_variables_initializer()
# 开始迭代
with tf.Session() as sess:
sess.run(init_op)
summary_writer = tf.summary.FileWriter(logdir="/tmp/svd/log", graph=sess.graph)
print("{} {} {} {}".format("epoch", "train_error", "val_error", "elapsed_time"))
errors = deque(maxlen=samples_per_batch)
start = time.time()
for i in range(EPOCH_MAX * samples_per_batch):
users, items, rates = next(iter_train)
_, pred_batch = sess.run([train_op, infer], feed_dict={user_batch: users,
item_batch: items,
rate_batch: rates})
pred_batch = clip(pred_batch)
errors.append(np.power(pred_batch - rates, 2))
if i % samples_per_batch == 0:
train_err = np.sqrt(np.mean(errors))
test_err2 = np.array([])
for users, items, rates in iter_test:
pred_batch = sess.run(infer, feed_dict={user_batch: users,
item_batch: items})
pred_batch = clip(pred_batch)
test_err2 = np.append(test_err2, np.power(pred_batch - rates, 2))
end = time.time()
test_err = np.sqrt(np.mean(test_err2))
print("{:3d} {:f} {:f} {:f}(s)".format(i // samples_per_batch, train_err, test_err,
end - start))
train_err_summary = make_scalar_summary("training_error", train_err)
test_err_summary = make_scalar_summary("test_error", test_err)
summary_writer.add_summary(train_err_summary, i)
summary_writer.add_summary(test_err_summary, i)
start = end
最后可以看到[损失值 消耗时间]
九、数据实在太大时利用spark处理
前言:在工业界都会使用分布式方式处理数据,比如spark。
基于用户的协同过滤使用余弦相似度。
参考文献资料
bilibili 机器学习实战
代码全是搬运的