前言:本次的及机器学习实战分享是一个很有意思的实战案例,就是我们的推荐系统中非常好玩的新闻推荐,这个案例来源于阿里天池比赛中的新闻推荐项目,我将使用这个案例带大家完整地体验推荐系统的魅力。

资料来源:DataWhale组队学习之推荐系统实战

Task1:理解赛题

1. 赛题简介

该赛题是以新闻APP中的新闻推荐为背景, 目的是要求我们根据用户历史浏览点击新闻文章的数据信息预测用户未来的点击行为, 即用户的最后一次点击的新闻文章。

2. 数据概况

该数据来自某新闻APP平台的用户交互数据,包括30万用户,近300万次点击,共36万多篇不同的新闻文章,同时每篇新闻文章有对应的embedding向量表示。为了保证比赛的公平性,从中抽取20万用户的点击日志数据作为训练集,5万用户的点击日志数据作为测试集A,5万用户的点击日志数据作为测试集B。

数据表train_click_log.csv:训练集用户点击日志

testA_click_log.csv:测试集用户点击日志

articles.csv:新闻文章信息数据表

articles_emb.csv:新闻文章embedding向量表示

sample_submit.csv:提交样例文件

字段表Field:Description

user_id:用户id

click_article_id:点击文章id

click_timestamp:点击时间戳

click_environment:点击环境

click_deviceGroup:点击设备组

click_os:点击操作系统

click_country:点击城市

click_region:点击地区

click_referrer_type:点击来源类型

article_id:文章id,与click_article_id相对应

category_id:文章类型id

created_at_ts:文章创建时间戳

words_count:文章字数

emb_1,emb_2,…,emb_249:文章embedding向量表示

3. 评价方式理解

评价指标:

equation?tex=score%28user%29+%3D+%5Csum_%7Bk%3D1%7D%5E5+%5Cfrac%7Bs%28user%2C+k%29%7D%7Bk%7D

假如article1就是真实的用户点击文章,也就是article1命中, 则s(user1,1)=1, s(user1,2-4)都是0, 如果article2是用户点击的文章, 则s(user,2)=1/2,s(user,1,3,4,5)都是0。也就是score(user)=命中第几条的倒数。如果都没中, 则score(user1)=0。 这个是合理的, 因为我们希望的就是命中的结果尽量靠前, 而此时分数正好比较高。

4. 赛题理解

首先,我们需要将实际的问题转化成可以学习的机器学习项目。实际的问题是:根据用户历史浏览点击新闻的数据信息预测用户最后一次点击的新闻文章。如果我们把36w篇文章独立当成一个类别,那么我们的人物将会变得异常困难,既然是要预测最后一次点击的文章, 那么如果我们能预测出某个用户最后一次对于某一篇文章会进行点击的概率, 是不是就间接性的解决了这个问题呢?概率最大的那篇文章不就是用户最后一次可能点击的新闻文章吗? 这样就把原问题变成了一个点击率预测的问题(用户, 文章) --> 点击的概率(软分类), 而这个问题, 就是我们所熟悉的监督学习领域分类问题了。常见的逻辑回归,svm,xgb等都可以派上用场了。

补充:协同过滤算法的相关介绍

协同过滤(Collaborative Filtering)推荐算法是最经典、最常用的推荐算法。

所谓协同过滤,基本思想是根据用户之前的喜好以及其他兴趣相近的用户的选择来给用户推荐物品(基于对用户历史行为数据的挖掘发现用户的喜好偏向,并预测用户可能喜好的产品进行推荐),一般是仅 仅基于用户的行为数据(评价、购买、下载等),而不依赖于项的任何附加信息(物品自身特征)或者用户 的任何附加信息(年龄,性别等)。

目前应用比较广泛的协同过滤算法是基于邻域的方法,而这种方法主 要有下面两种算法:基于用户的协同过滤算法 (UserCF): 给用户推荐和他兴趣相似的其他用户喜欢的产品

基于物品的协同过滤算法 (ItemCF): 给用户推荐和他之前喜欢的物品相似的物品

不管是 UserCF 还是 ItemCF 算法,非常重要的步骤之一就是计算用户和用户或者物品和物品之间的 相似度,所以下面先整理常用的相似性度量方法,然后再对每个算法的具体细节进行展开。

(1)基于用户的协同过滤

UserCF 算法主要包括两个步骤:找到和目标用户兴趣相似的集合

找到这个集合中的用户喜欢的,且目标用户没有听说过的物品推荐给目标用户。

UserCF 编程实现

3. 计算前 n 个相似的用户

至此,我们就用代码完成了上面的小例子,有了这个评分,我们其实就可以对该用户做推荐了。这其实就 是微型版的 UserCF 的工作过程了。

下面开始这个比赛相关的代码:

导入相应的工具包

import time, math, os

from tqdm import tqdm # Tqdm 是一个快速,可扩展的Python进度条

import gc # 垃圾回收

import pickle # 用于序列化和反序列化Python对象结构的二进制协议

import random

from datetime import datetime

from operator import itemgetter # Python内部操作符的函数

import numpy as np

import pandas as pd

import warnings

from collections import defaultdict # collections----容器数据类型

import collections

warnings.filterwarnings('ignore')

节约内存的一个标配函数

这个函数的想法其实很简单,就是我们每一列的数字其实用的字节不一样,我们需要为每一列找到合适的储存字节,免得浪费。

# 节约内存的一个标配函数

def reduce_mem(df): # 将int转化为int8或者int16或者int32等,float也一样

starttime = time.time()

numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']

start_mem = df.memory_usage().sum() / 1024**2

for col in df.columns:

col_type = df[col].dtypes

if col_type in numerics:

c_min = df[col].min()

c_max = df[col].max()

if pd.isnull(c_min) or pd.isnull(c_max):

continue

if str(col_type)[:3] == 'int': # 判断这个列需要多大的位数存储

if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:

df[col] = df[col].astype(np.int8)

elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:

df[col] = df[col].astype(np.int16)

elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:

df[col] = df[col].astype(np.int32)

elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:

df[col] = df[col].astype(np.int64)

else:

if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:

df[col] = df[col].astype(np.float16)

elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:

df[col] = df[col].astype(np.float32)

else:

df[col] = df[col].astype(np.float64)

end_mem = df.memory_usage().sum() / 1024**2

print('-- Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction),time spend:{:2.2f} min'.format(end_mem,

100*(start_mem-end_mem)/start_mem,

(time.time()-starttime)/60))

return df

读取采样或全量数据

# 读取采样或全量数据

# debug模式:从训练集中划出一部分数据来调试代码

data_path = "./"

save_path = "./"

def get_all_click_sample(data_path, sample_nums=10000):

"""

训练集中采样一部分数据调试

data_path: 原数据的存储路径

sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做)

"""

all_click = pd.read_csv(data_path + 'train_click_log.csv')

all_user_ids = all_click.user_id.unique()

sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False)

all_click = all_click[all_click['user_id'].isin(sample_user_ids)]

all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))

return all_click

# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中

# 如果是为了线下验证模型的有效性或者特征的有效性,可以只使用训练集

def get_all_click_df(data_path='./data_raw/', offline=True):

if offline:

all_click = pd.read_csv(data_path + 'train_click_log.csv')

else:

trn_click = pd.read_csv(data_path + 'train_click_log.csv')

tst_click = pd.read_csv(data_path + 'testA_click_log.csv')

all_click = trn_click.append(tst_click)

all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))

return all_click

获取全量训练集

# 全量训练集

all_click_df = get_all_click_df(data_path, offline=False)

获取 用户 - 文章 - 点击时间字典

# 获取 用户 - 文章 - 点击时间字典

# 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}

def get_user_item_time(click_df):

click_df = click_df.sort_values('click_timestamp') # 按照时间排序

def make_item_time_pair(df):

return list(zip(df['click_article_id'], df['click_timestamp']))

user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\

.reset_index().rename(columns={0: 'item_time_list'})

user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))

return user_item_time_dict

获取点击最多的topk个文章

# 获取点击最多的topk个文章

# 获取近期点击最多的文章

def get_item_topk_click(click_df, k):

topk_click = click_df['click_article_id'].value_counts().index[:k]

return topk_click

itemcf的物品相似度计算

# itemcf的物品相似度计算

# 协同过滤参照文章:https://www.cnblogs.com/NeilZhang/p/9900537.html

def itemcf_sim(df):

"""

文章与文章之间的相似性矩阵计算

:param df: 数据表

:item_created_time_dict: 文章创建时间的字典

return : 文章与文章的相似性矩阵

思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习), 在多路召回部分会加上关联规则的召回策略

"""

user_item_time_dict = get_user_item_time(df)

# 计算物品相似度

i2i_sim = {}

item_cnt = defaultdict(int) # 当字典的key不存在返回一个int,0

for user, item_time_list in tqdm(user_item_time_dict.items()): # 设置进度条

# 在基于商品的协同过滤优化的时候可以考虑时间因素

for i, i_click_time in item_time_list:

item_cnt[i] += 1

i2i_sim.setdefault(i, {})

for j, j_click_time in item_time_list:

if(i == j):

continue

i2i_sim[i].setdefault(j, 0)

i2i_sim[i][j] += 1 / math.log(len(item_time_list) + 1)

i2i_sim_ = i2i_sim.copy()

for i, related_items in i2i_sim.items():

for j, wij in related_items.items():

i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])

# 将得到的相似性矩阵保存到本地

pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))

return i2i_sim_

i2i_sim = itemcf_sim(all_click_df)

100%|██████████| 250000/250000 [00:58<00:00, 4309.22it/s]

itemcf 的文章推荐

# itemcf 的文章推荐

# 基于商品的召回i2i

def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click):

"""

基于文章协同过滤的召回

:param user_id: 用户id

:param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}

:param i2i_sim: 字典,文章相似性矩阵

:param sim_item_topk: 整数, 选择与当前文章最相似的前k篇文章

:param recall_item_num: 整数, 最后的召回文章数量

:param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全

return: 召回的文章列表 {item1:score1, item2: score2...}

注意: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习), 在多路召回部分会加上关联规则的召回策略

"""

# 获取用户历史交互的文章

user_hist_items = user_item_time_dict[user_id]

user_hist_items_ = {user_id for user_id, _ in user_hist_items}

item_rank = {}

for loc, (i, click_time) in enumerate(user_hist_items):

for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:

if j in user_hist_items_:

continue

item_rank.setdefault(j, 0)

item_rank[j] += wij

# 不足10个,用热门商品补全

if len(item_rank) < recall_item_num:

for i, item in enumerate(item_topk_click):

if item in item_rank.items(): # 填充的item应该不在原来的列表中

continue

item_rank[item] = - i - 100 # 随便给个负数就行

if len(item_rank) == recall_item_num:

break

item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]

return item_rank

给每个用户根据物品的协同过滤推荐文章

# 给每个用户根据物品的协同过滤推荐文章

# 定义

user_recall_items_dict = collections.defaultdict(dict)

# 获取 用户 - 文章 - 点击时间的字典

user_item_time_dict = get_user_item_time(all_click_df)

# 去取文章相似度

i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))

# 相似文章的数量

sim_item_topk = 10

# 召回文章数量

recall_item_num = 10

# 用户热度补全

item_topk_click = get_item_topk_click(all_click_df, k=50)

for user in tqdm(all_click_df['user_id'].unique()):

user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim,

sim_item_topk, recall_item_num, item_topk_click)

100%|██████████| 250000/250000 [59:14<00:00, 70.33it/s]

召回字典转换成df

# 召回字典转换成df

# 将字典的形式转换成df

user_item_score_list = []

for user, items in tqdm(user_recall_items_dict.items()):

for item, score in items:

user_item_score_list.append([user, item, score])

recall_df = pd.DataFrame(user_item_score_list, columns=['user_id', 'click_article_id', 'pred_score'])

100%|██████████| 250000/250000 [00:14<00:00, 16721.23it/s]

生成提交文件

# 生成提交文件

# 生成提交文件

def submit(recall_df, topk=5, model_name=None):

recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])

recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')

# 判断是不是每个用户都有5篇文章及以上

tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())

assert tmp.min() >= topk

del recall_df['pred_score']

submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()

submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]

# 按照提交格式定义列名

submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',

3: 'article_3', 4: 'article_4', 5: 'article_5'})

save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d') + '.csv'

submit.to_csv(save_name, index=False, header=True)

# 获取测试集

tst_click = pd.read_csv(data_path + 'testA_click_log.csv')

tst_users = tst_click['user_id'].unique()

# 从所有的召回数据中将测试集中的用户选出来

tst_recall = recall_df[recall_df['user_id'].isin(tst_users)]

# 生成提交文件

submit(tst_recall, topk=5, model_name='itemcf_baseline')

总结:

本节内容主要包括赛题简介,数据概况,评价方式以及对该赛题进行了一个总体上的思路分析,作为竞赛前的预热,旨在帮助学习者们能够更好切入该赛题,为后面的学习内容打下一个良好的基础。最后我们给出了关于本赛题的一个简易Baseline, 帮助学习者们先了解一下新闻推荐比赛的一个整理流程, 接下来我们就对于流程中的每个步骤进行详细的介绍。

今天的学习比较简单,下面整理一下关于赛题理解的一些经验:

赛题理解究竟是在理解什么?理解赛题:从直观上对问题进行梳理, 分析问题的目标,到底要让做什么事情, 这个非常重要

理解数据:对赛题数据有一个初步了解,知道和任务相关的数据字段和数据字段的类型, 数据之间的内在关联等,大体梳理一下哪些数据会对我们解决问题非常有用,方便后面我们的数据分析和特征工程。

理解评估指标:评估指标是检验我们提出的方法,我们给出结果好坏的标准,只有正确的理解了评估指标,我们才能进行更好的训练模型,更好的进行预测。此外,很多情况下,线上验证是有一定的时间和次数限制的,所以在比赛中构建一个合理的本地的验证集和验证的评价指标是很关键的步骤,能有效的节省很多时间。 不同的指标对于同样的预测结果是具有误差敏感的差异性的所以不同的评价指标会影响后续一些预测的侧重点。

有了赛题理解之后,我们该做什么?

在对于赛题有了一定的了解后,分析清楚了问题的类型性质和对于数据理解 的这一基础上,我们可以梳理一个解决赛题的一个大题思路和框架

我们至少要有一些相应的理解分析,比如这题的难点可能在哪里,关键点可能在哪里,哪些地方可以挖掘更好的特征.

用什么样得线下验证方式更为稳定,出现了过拟合或者其他问题,估摸可以用什么方法去解决这些问题。

这时是在一个宏观的大体下分析的,有助于摸清整个题的思路脉络,以及后续的分析方向。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐