数据分析学习笔记2:基于电影评分的推荐系统
本实验基于协同过滤算法成功构建了一个电影评分预测模型与推荐模型。未来改进方向:1.预测分数均为5.0,与现实情况相比比较失真,算法仍需改进;2.后续可进行前端系统的构建,提高系统的可使用性。
一、实验背景与目的
评分系统是一种常见的推荐系统。本实验使用PYTHON等语言基于协同过滤算法来构建一个电影评分预测模型。
数据来源:明尼苏达州大学的社会化计算研究中心官网,网站链接为https://grouplens.org/datasets/movielens/
数据集内容:本实验使用官网下载的movies.csv与ratings.csv中的数据进行系统的搭建。其中,movie.csv中包含movieId(电影序号)、title(电影标题)、genres(电影分级)3列变量;ratings.csv中包含userId(用户序号)、movieId(电影序号)、rating(电影评分)、timestamp(时间戳)。
二、代码实现
import sys
import random
import math
import os
from operator import itemgetter
from collections import defaultdict
class MovieRecommender:
def __init__(self):
"""初始化推荐系统"""
self.user_avg = {} # 用户平均评分
self.movies = {} # 电影ID到标题的映射
self.ratings = [] # 评分数据列表,格式为(userId, movieId, rating)
self.user_ratings = defaultdict(dict) # 用户评分数据:userId -> {movieId: rating}
self.movie_ratings = defaultdict(dict) # 电影评分数据:movieId -> {userId: rating}
self.user_similarities = {} # 用户相似度缓存
def load_and_preprocess_data(self, movies_file, ratings_file):
"""加载并预处理数据
参数:
movies_file: 电影数据文件路径
ratings_file: 评分数据文件路径
处理步骤:
1. 加载原始数据
2. 处理缺失值
3. 处理异常值
4. 数据格式转换
"""
# 1. 加载电影数据并预处理
movie_ids = set()
with open(movies_file, 'r', encoding='utf-8') as f:
header = next(f) # 读取标题行
for line_num, line in enumerate(f, 2): # 从第2行开始
try:
parts = line.strip().split(',')
# 检查字段数量是否足够
if len(parts) < 3:
print(f"警告: 电影文件第{line_num}行数据不完整,已跳过")
continue
movie_id = int(parts[0])
# 检查重复的电影ID
if movie_id in movie_ids:
print(f"警告: 重复的电影ID {movie_id},已跳过")
continue
movie_ids.add(movie_id)
title = ','.join(parts[1:-1]) # 处理标题中可能包含的逗号
genres = parts[-1]
# 检查标题是否为空
if not title.strip():
print(f"警告: 电影ID {movie_id} 标题为空,已跳过")
continue
self.movies[movie_id] = title
except (ValueError, IndexError) as e:
print(f"错误: 处理电影文件第{line_num}行时出错 - {str(e)},已跳过")
continue
# 2. 加载评分数据并预处理
with open(ratings_file, 'r', encoding='utf-8') as f:
header = next(f) # 读取标题行
for line_num, line in enumerate(f, 2): # 从第2行开始
try:
parts = line.strip().split(',')
# 检查字段数量是否足够
if len(parts) < 4:
print(f"警告: 评分文件第{line_num}行数据不完整,已跳过")
continue
user_id = int(parts[0])
movie_id = int(parts[1])
rating = float(parts[2])
timestamp = parts[3] # 时间戳暂时不使用
# 检查电影ID是否存在
if movie_id not in self.movies:
print(f"警告: 电影ID {movie_id} 不存在于电影数据中,已跳过")
continue
# 检查评分是否在有效范围内(0.5-5.0)
if rating < 0.5 or rating > 5.0:
print(f"警告: 用户 {user_id} 对电影 {movie_id} 的评分 {rating} 超出范围,已跳过")
continue
# 检查评分是否为0.5的倍数
if not math.isclose(rating * 2, round(rating * 2)):
print(f"警告: 用户 {user_id} 对电影 {movie_id} 的评分 {rating} 不是0.5的倍数,已跳过")
continue
self.ratings.append((user_id, movie_id, rating))
self.user_ratings[user_id][movie_id] = rating
self.movie_ratings[movie_id][user_id] = rating
except (ValueError, IndexError) as e:
print(f"错误: 处理评分文件第{line_num}行时出错 - {str(e)},已跳过")
continue
# 3. 后处理 - 移除评分数量过少的用户和电影
self._remove_sparse_data()
# 计算用户平均评分
for user in self.user_ratings:
ratings = self.user_ratings[user].values()
self.user_avg[user] = sum(ratings) / len(ratings)
def _remove_sparse_data(self, min_user_ratings=5, min_movie_ratings=5):
"""移除评分数量过少的用户和电影
参数:
min_user_ratings: 用户保留的最小评分数量
min_movie_ratings: 电影保留的最小评分数量
"""
print("\n正在进行数据稀疏性处理...")
# 初始统计
original_user_count = len(self.user_ratings)
original_movie_count = len(self.movie_ratings)
original_rating_count = len(self.ratings)
# 过滤评分过少的用户
users_to_keep = set()
for user, ratings in self.user_ratings.items():
if len(ratings) >= min_user_ratings:
users_to_keep.add(user)
# 过滤评分过少的电影
movies_to_keep = set()
for movie, ratings in self.movie_ratings.items():
if len(ratings) >= min_movie_ratings:
movies_to_keep.add(movie)
# 重建数据结构
new_ratings = []
new_user_ratings = defaultdict(dict)
new_movie_ratings = defaultdict(dict)
for user, movie, rating in self.ratings:
if user in users_to_keep and movie in movies_to_keep:
new_ratings.append((user, movie, rating))
new_user_ratings[user][movie] = rating
new_movie_ratings[movie][user] = rating
# 更新数据
self.ratings = new_ratings
self.user_ratings = new_user_ratings
self.movie_ratings = new_movie_ratings
# 更新电影字典,只保留存在的电影
self.movies = {k: v for k, v in self.movies.items() if k in movies_to_keep}
# 打印处理结果
print(f"原始数据 - 用户: {original_user_count}, 电影: {original_movie_count}, 评分: {original_rating_count}")
print(f"处理后数据 - 用户: {len(self.user_ratings)}, 电影: {len(self.movie_ratings)}, 评分: {len(self.ratings)}")
print(f"过滤掉了 {original_user_count - len(self.user_ratings)} 个用户和 {original_movie_count - len(self.movie_ratings)} 部电影")
def cosine_similarity(self, user1, user2):
"""计算两个用户之间的余弦相似度
参数:
user1: 用户1的ID
user2: 用户2的ID
返回:
两个用户的余弦相似度值(0-1之间)
"""
# 获取两个用户共同评价过的电影
common_movies = set(self.user_ratings[user1].keys()) & set(self.user_ratings[user2].keys())
if not common_movies:
return 0 # 如果没有共同评价的电影,相似度为0
# 计算点积和模长
dot_product = 0
mag1 = 0
mag2 = 0
for movie in common_movies:
rating1 = self.user_ratings[user1][movie]
rating2 = self.user_ratings[user2][movie]
dot_product += rating1 * rating2
mag1 += rating1 ** 2
mag2 += rating2 ** 2
mag1 = math.sqrt(mag1)
mag2 = math.sqrt(mag2)
if mag1 == 0 or mag2 == 0:
return 0
return dot_product / (mag1 * mag2)
def get_similar_users(self, target_user, k=5):
"""找出与目标用户最相似的k个用户
参数:
target_user: 目标用户ID
k: 返回的相似用户数量
返回:
包含(userId, 相似度)的列表,按相似度降序排列
"""
if target_user in self.user_similarities:
return self.user_similarities[target_user]
similarities = []
for user in self.user_ratings:
if user == target_user:
continue
similarity = self.cosine_similarity(target_user, user)
similarities.append((user, similarity))
# 按相似度降序排序
similarities.sort(key=itemgetter(1), reverse=True)
# 缓存结果
self.user_similarities[target_user] = similarities[:k]
return similarities[:k]
def predict_rating(self, target_user, target_movie, k=5):
# 如果用户已经评价过这部电影,直接返回实际评分
if target_movie in self.user_ratings[target_user]:
return self.user_ratings[target_user][target_movie]
similar_users = self.get_similar_users(target_user, k)
# 获取目标用户的平均评分(默认使用全局平均)
target_avg = self.user_avg.get(target_user, sum(r for _,_,r in self.ratings)/len(self.ratings))
numerator = 0 # 分子
denominator = 0 # 分母
valid_count = 0 # 有效相似用户计数
# 基于相似用户的评分计算加权偏差
for user, similarity in similar_users:
if target_movie in self.user_ratings[user]:
user_avg = self.user_avg.get(user, target_avg)
rating = self.user_ratings[user][target_movie]
# 使用偏差(评分-用户平均)而不是原始评分
numerator += similarity * (rating - user_avg)
denominator += abs(similarity)
valid_count += 1
if valid_count == 0:
# 如果没有相似用户评价过这部电影
if target_movie in self.movie_ratings:
# 返回这部电影的平均评分,并约束在[0.5,5.0]范围内
ratings = list(self.movie_ratings[target_movie].values())
return max(0.5, min(5.0, sum(ratings) / len(ratings)))
else:
# 如果电影没有任何评分,返回用户平均,并约束在[0.5,5.0]范围内
return max(0.5, min(5.0, target_avg))
# 预测值 = 用户平均 + 加权偏差,并约束在[0.5,5.0]范围内
raw_prediction = target_avg + (numerator / denominator if denominator != 0 else 0)
return max(0.5, min(5.0, raw_prediction))
def recommend_movies(self, target_user, n=20,k=5):
"""为目标用户推荐电影
参数:
target_user: 目标用户ID
n: 推荐的电影数量
k: 使用的相似用户数量
返回:
推荐电影列表,包含电影ID、标题和预测评分
"""
# 获取用户未评价的电影
rated_movies = set(self.user_ratings[target_user].keys())
all_movies = set(self.movies.keys())
unrated_movies = all_movies - rated_movies
# 预测用户对未评价电影的评分
predictions = []
for movie in unrated_movies:
pred = self.predict_rating(target_user, movie, k)
predictions.append((movie, pred))
# 按预测评分降序排序
predictions.sort(key=itemgetter(1), reverse=True)
# 返回前n个推荐
recommendations = []
for movie_id, rating in predictions[:n]:
recommendations.append({
'movie_id': movie_id,
'title': self.movies[movie_id],
'predicted_rating': rating
})
return recommendations
def evaluate(self, test_ratio=0.2, k=5):
"""评估推荐系统的预测准确性
参数:
test_ratio: 测试集比例
k: 使用的相似用户数量
返回:
均方根误差(RMSE)
"""
# 随机划分训练集和测试集
random.shuffle(self.ratings)
split_idx = int(len(self.ratings) * (1 - test_ratio))
train = self.ratings[:split_idx]
test = self.ratings[split_idx:]
# 使用训练集重建用户和电影评分数据
self.user_ratings = defaultdict(dict)
self.movie_ratings = defaultdict(dict)
for user, movie, rating in train:
self.user_ratings[user][movie] = rating
self.movie_ratings[movie][user] = rating
# 在测试集上计算RMSE
squared_errors = 0
for user, movie, actual in test:
predicted = self.predict_rating(user, movie, k)
squared_errors += (predicted - actual) ** 2
rmse = math.sqrt(squared_errors / len(test))
return rmse
def main():
"""主函数"""
# 初始化推荐系统
print("正在加载数据...")
recommender = MovieRecommender()
recommender.load_and_preprocess_data('movies.csv', 'ratings.csv')
# 评估推荐系统
print("\n正在评估推荐系统...")
rmse = recommender.evaluate()
print(f"均方根误差(RMSE): {rmse:.4f}")
# 为学号为49用户生成推荐
print(f"\n为用户 {49}推荐的电影:")
recommendations = recommender.recommend_movies(49)
for rec in recommendations:
print(f"{rec['title']} (预测评分: {rec['predicted_rating']:.2f})")
# 检查相似用户的实际评分
test_user = 49
sim_users = recommender.get_similar_users(test_user)
print(f"用户{test_user}的相似用户及其评分示例:")
for uid, sim in sim_users:
print(f"用户{uid}(相似度:{sim:.2f}) 平均评分:{sum(recommender.user_ratings[uid].values())/len(recommender.user_ratings[uid]):.2f}")
if __name__ == "__main__":
main()
以用户ID=49作为示例,输出结果如下:

可以看出,实验搭建的推荐平台较好的完成了任务。
三、总结
本实验基于协同过滤算法成功构建了一个电影评分预测模型与推荐模型。
未来改进方向:
1.预测分数均为5.0,与现实情况相比比较失真,算法仍需改进;
2.后续可进行前端系统的构建,提高系统的可使用性。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)