4  Item based 协同过滤

推荐系统的一般性架构如下:

graph TD
    %% ----- 1. 接入层 -----
    Client(客户端/前端) -->|1.请求推荐| API[API网关]
    API --> Engine{调度中心<br>RecEngine}

    %% ----- 2. 核心在线推荐漏斗 (Online Funnel) -----
    subgraph 在线推荐漏斗 [核心推荐漏斗 Online Funnel]
        direction TB
        
        %% 彻底消除英文空格,使用全角标点,物理阻断渲染器的自动换行机制
        Recall["1.召回层Recall:百万级->千级<br>多路召回:双塔/图网络/I2I/热门等"]
        PreRank["2.粗排层Pre-rank:千级->百级<br>轻量模型:浅层MLP/GBDT树模型"]
        Rank["3.精排层Rank:百级->几十级<br>深度网络:DIN/DIEN/MMoE等"]
        ReRank["4.重排层Re-rank:几十级->Top_K<br>业务规则:多样性打散/强插广告/过滤"]

        %% 内部直连
        Recall -->|去重| PreRank -->|打分截断| Rank -->|精排打分| ReRank
    end

    %% 引擎与漏斗的交互
    Engine -->|2.发起调度| Recall
    ReRank -->|3.返回Top_K| Engine
    Engine -.->|4.数据组装| API
    API -.->|5.页面展示| Client

    %% ----- 3. 底层数据与机器学习平台 (Data & ML Infra) -----
    subgraph 底层支撑平台 [数据与模型平台 Data & ML Infra]
        direction LR
        Log[行为日志Kafka] --> Flink[实时计算Flink] & Spark[离线计算Spark]
        Flink --> FS[(特征中心<br>Feature_Store)]
        Spark --> FS & MR[(模型仓库<br>Model_Repo)]
    end

    %% ----- 4. 跨层交互 (解耦虚线) -----
    Client -.->|用户打点| Log
    FS -.->|特征拉取| 在线推荐漏斗
    MR -.->|权重更新| 在线推荐漏斗
    
    %% 样式调整:注入 white-space: nowrap 强制不换行
    classDef layer fill:#f4f7f6,stroke:#4a90e2,stroke-width:2px,rx:5px,white-space:nowrap;
    class Recall,PreRank,Rank,ReRank layer;
    classDef db fill:#fffbe6,stroke:#d4b106,stroke-width:1px,white-space:nowrap;
    class FS,MR db;

4.1 高效数据处理管道

data.table 是 R 语言中处理大型数据集(GB 级)的首选工具。它以极致的内存效率和计算速度著称,语法精简且高度统一。

它的语法极其精简,初看可能有点反直觉(特别是如果你习惯了 tidyverse 系列包的管道操作),但一旦掌握,你会惊叹于它的优雅和速度。

1. 核心哲学:DT[i, j, by]

data.table 的所有操作都遵循这个简单的公式,就像 SQL 语句的浓缩版:

DT[i, j, by]

  • i: Where? (筛选行)
  • j: Select / Update / Compute? (操作列)
  • by: Group By? (分组依据)

2. 创建与筛选 (i)

library(data.table)
dt <- data.table(ID = 1:5, Group = c("A","A","B","B","A"),
 Score = c(80, 95, 70, 85, 90))

# 筛选行
dt[Score > 85]          # 逻辑筛选
dt[1:3]                 # 按行号筛选
dt[order(-Score)]       # 排序(内置优化,极快)

3. 列操作与计算 (j)

使用 .() 来包裹多个列或计算逻辑(.()list() 的缩写)。

# 选择列
dt[, .(ID, Score)]

# 计算汇总
dt[, .(Avg_Score = mean(Score), Max_Score = max(Score))]

4. 引用赋值 (:=)

data.table 使用“原地修改”机制,不会产生多余的数据副本,内存占用极低。

# 新增或修改列
dt[, Is_High := Score > 85]

# 删除列
dt[, Is_High := NULL]

# 多列同时修改
dt[, `:=`(New_Col1 = 1, New_Col2 = 2)]

5. 分组聚合 (by)

# 按 Group 分组,计算平均分
dt[, .(Mean_S = mean(Score)), by = Group]

6. 链式操作 (Chaining)

无需管道符 %>%,直接链接方括号:

dt[Score > 70, .(Mean_S = mean(Score)), by = Group][order(-Mean_S)]

7. 特殊符号

  • .N: 当前分组的行数(类似 count(*))。
  • .SD: Subset of Data,代表当前分组除分组列外的所有数据。

下表是在 R 体系中,data.tabletidyverse 系列包(如 dplyr)的对比:

特性 data.table dplyr (Tidyverse) Base R
设计核心 速度与内存效率 可读性与一致性 原生支持
语法风格 紧凑、方括号内完成 动词化、管道操作 繁杂、重复引用
内存消耗 极低 (原地修改) 较高 (创建副本) 较高
适用场景 亿级数据、生产环境 探索性分析、小数据集 基础演示
读写性能 fread() 业内最快 read_csv() 适中 read.csv()

关于 R base 的 Copy-on-modify 机制:

df1 <- data.frame(a = 1:3)
df2 <- df1          # 此时两者指向同一块内存
df2$a[1] <- 99      # 当你尝试修改 df2 时,R 会默默地在底层复制一份 df1 赋值给 df2
df1
  a
1 1
2 2
3 3
df2
   a
1 99
2  2
3  3

data.table 使用的是引用传递 (Pass-by-reference)。直接赋值时,并没有复制数据,只是给同一块内存地址起了一个新的别名。

library(data.table)
dt1 <- data.table(a = 1:3)
dt2 <- dt1          # dt2 仅仅是 dt1 的一个“快捷方式”
dt2[, a := c(99, 2, 3)] 
print(dt1) 
       a
   <num>
1:    99
2:     2
3:     3

4.2 构建协同过滤模型

本案例我们使用 MovieLens 10M 数据集

  • 数据集来源:由 GroupLens 研究组(明尼苏达大学)收集并公开。
  • 数据集规模:包含 72,000 名用户对约 10000 部电影的 1 亿条评分。

数据大概长这样(每行一个评分,格式为 用户ID::电影ID::评分::时间戳):

1::122::5::838985046
1::185::5::838983525
1::231::5::838983392
1::292::5::838983421
1::316::5::838983392
1::329::5::838983392

早期推荐系统的研究(如经典的 Netflix Prize)高度依赖显式反馈(Explicit Feedback),即用户明确表达偏好的数据,如我们使用的 MovieLens 数据集中的 1 到 5 星评分。但现实的商业目标往往不是“预测用户会给这部电影打多少分”,而是“在有限的屏幕曝光里,预测用户最有可能点击哪个商品”。这就要求我们将目光转向海量、实时但充满噪音的隐式反馈(Implicit Feedback)数据。

隐式反馈是指用户自然产生的行为脚印:浏览、点击、停留、加入购物车。业务目标的转换,要求我们在数学建模上做出彻底的改变。我们的预测目标不再是“打多少分”,而是“在给定的上下文中,用户是否会发生交互”。如果发生交互,标签 \(y=1\);否则 \(y=0\)。这就把推荐问题从回归任务优雅地转化为二分类任务。

基于物品的协同过滤(Item-Based Collaborative Filtering, IBCF)的逻辑非常直观:“看了电影 A 的人,通常也会看电影 B”。由于其极高的可解释性和易于预计算的特点,占据主导地位很长一段时间。

4.2.1 IBCF 的运行原理

1. 构建交互矩阵 (User-Item Matrix)

首先,将底层的日志数据映射为一个二维矩阵。行通常代表用户 (User),列代表物品 (Item)。矩阵中的值 \(y_{ui}\) 代表用户对物品的偏好度。

在本例我们转化为了隐式反馈(观看为 1,未观看为 0),\(y_{ui}\) 是一个 \(72000 \times 10000\) 的超大矩阵。

2. 计算物品相似度矩阵 (Item Similarity Computation)

算法将矩阵的每一列看作一个高维空间中的向量。两个向量的夹角越小,说明这两个物品被同一个群体交互的概率越高。在工程实践中,最常用的是余弦相似度 (Cosine Similarity):

\[ sim(i, j) = \frac{\vec{i} \cdot \vec{j}}{\|\vec{i}\| \|\vec{j}\|} \]

其中 \(\vec{i}\)\(\vec{j}\) 分别是物品 \(i\) 和物品 \(j\) 的用户交互向量。这一步产出的结果是一个 \(M \times M\) 的对称相似度矩阵。

3. 预测得分与生成推荐 (Scoring & Recommendation)

当需要为用户 \(u\) 生成推荐时,系统会提取该用户历史上发生过交互(即 \(y=1\))的物品集合。然后,基于第 2 步算出的相似度矩阵,找到与候选物品 \(i\) 最相似的 \(K\) 个物品集合,记为 \(N(i)\)。在隐式反馈场景下,我们预测用户 \(u\) 对候选物品 \(i\) 的交互倾向(即最终用于推荐排序的打分)公式修改为:

\[ \hat{y}_{ui} = \sum_{j \in N(i)} sim(i, j) \cdot y_{uj} \]

这里 \(y_{uj} \in \{0, 1\}\) 表示用户 \(u\) 是否与物品 \(j\) 发生过交互。

4.2.2 数据处理过程

library(vroom)
library(data.table)

DATA_DIR <- "/Users/liusizhe/data/ml-10m100K"
FILE_PATH <- file.path(DATA_DIR, "ratings.dat")

dt_ratings <- vroom(
  FILE_PATH,
  delim = "::",
  col_names = c("user_id", "movie_id", "rating", "timestamp"),
  col_select = c(user_id, movie_id, rating, timestamp),
  show_col_types = FALSE
)
setDT(dt_ratings)

# 提取唯一 ID 并计算维度
unique_users <- unique(dt_ratings$user_id)
unique_items <- unique(dt_ratings$movie_id)
num_users <- length(unique_users)
num_items <- length(unique_items)
all_items_vec <- 1:num_items

# 映射为从 1 开始的连续整数索引
dt_ratings[, user_idx := match(user_id, unique_users)]
dt_ratings[, item_idx := match(movie_id, unique_items)]

在数据中会出现有大量交互异常用户,比如爬虫或者异常偏好的用户。这些用户对模型的训练和评估都是不友好的,需要被剔除。我们可以通过设置交互次数的上下限来剔除这些异常用户。

首先观影次数最高 5% 的用户的数据情况:

# 
dt_ratings[, .(action_cnt = .N), by = user_idx][,action_cnt] |>
 quantile(seq(0.95, 1, by = 0.01))
    95%     96%     97%     98%     99%    100% 
 512.00  575.92  665.69  806.00 1058.23 7359.00 

基于该信息,我们剔除约 2% 的异常用户。

valid_users <- dt_ratings[, .(action_cnt = .N), by = user_idx][
  action_cnt <= 800, user_idx
]
dt_ratings <- dt_ratings[user_idx %in% valid_users]

在真实的商业环境中,严禁使用“未来”的数据预测“过去”。因此,我们放弃随机切分,严格按照每个用户交互的时间戳(Timestamp)进行时序切分(80% 训练,20% 评估),并剔除测试集中从未在训练集出现过的冷启动物品。

# 严谨的时序划分 (80% Train, 20% Test)
setorder(dt_ratings, user_idx, timestamp)
dt_ratings[, `:=`(seq_num = 1:.N, total_num = .N), by = user_idx]

# 按照用户观影的时间顺序,预留 20% 做 test 数据集。
dt_train_raw <- dt_ratings[seq_num <= floor(0.8 * total_num)]
dt_test_raw  <- dt_ratings[seq_num > floor(0.8 * total_num)]

# 过滤测试集中的冷启动物品
valid_train_items <- unique(dt_train_raw$item_idx)
dt_test_raw <- dt_test_raw[item_idx %in% valid_train_items]

# 预先计算每个用户的全局交互历史,用于负采样时避开已交互物品
dt_user_history <- dt_ratings[, .(interacted = list(item_idx)), by = user_idx]

4.2.3 相似度矩阵计算

先看一下这个亿级用户和物品的数据在 R 中是如何存储的。如果使用标准的稠密矩阵会导致内存瞬间溢出,借助 R 的 Matrix 包,可以利用极其节约内存的稀疏矩阵(Sparse Matrix)来完成数据表达和高效的后续处理。

library(Matrix)

# 使用 dt_train_raw (隐式反馈,所有交互视为 1)
# 使用 sparseMatrix 极大降低内存占用并加速矩阵运算
R_train <- sparseMatrix(
  i = dt_train_raw$user_idx,
  j = dt_train_raw$item_idx,
  x = rep(1, nrow(dt_train_raw)), 
  dims = c(num_users, num_items)
)

构建“物品-物品相似度矩阵”:按照常规思维,我们会写两层 for 循环,挨个抽出两列,算分子、算分母、再相除。但这在面对上万个物品时会非常慢。我们换个思路,让矩阵里的“所有列”都除以自己的模长(列归一化)。

\[ sim(i, j) = \frac{\vec{i} \cdot \vec{j}}{\|\vec{i}\| \|\vec{j}\|} \]

# 1. 计算物品列的 L2 模长
item_norms <- sqrt(colSums(R_train^2))
item_norms[item_norms == 0] <- 1e-9 # 防止除以零

# 2. 列归一化:R_norm = R * D_item^(-1/2)
# 在 sparseMatrix 中,乘以对角矩阵是最高效的列缩放方式
R_norm <- R_train %*% Diagonal(x = 1 / item_norms)

t0 <- Sys.time()
# 3. 计算 Cosine 相似度:Sim = R_norm^T * R_norm
S_cosine <- crossprod(R_norm)
Sys.time() - t0
Time difference of 3.881818 secs

如果这个矩阵相乘的操作在 GPU 中计算(如 NV3080),耗时仅为于 1/10。另外如果开始不做异常用户剔除,耗时会到 7s 左右。

# 4. 移除自相似度 (对角线置 0)
diag(S_cosine) <- 0 

得到相似度矩阵 S_cosine 后,我们只需要用用户的历史交互矩阵 R_train 与其相乘,就能得到该用户对所有未交互物品的预测打分。

这里需要注意对象的大小:

object.size(S_cosine) |> format(units = 'MB')
[1] "225.7 Mb"
# 正常大小为:
10000^2 * 8/1024/1024 # 单位:MB
[1] 762.9395

R 为了保证数据安全,采用的是 Copy-on-Modify (修改即拷贝)。如果你对这个矩阵进行原地修改(例如 m[1,1] <- 0),R 并不会像 C++ 那样在原地修改内存。它会触发“写时复制”机制,导致系统瞬间需要分配两块内存来完成更新。如果你在循环中频繁修改矩阵,内存会被爆掉。

4.3 效果评估

对于评估集,业界最严苛也是最标准的做法是留一法(Leave-One-Out)配合负采样:取用户在测试集中的最后一次真实交互作为唯一的正样本(标签为 1),并为其随机抽取 99 个该用户从未见过的物品作为负样本(标签为 0)。

# 构造评估集 (严格留一法: 1 正 + 99 负) 
set.seed(42)
# 取测试集第一条作为目标正样本
dt_eval_pos <- dt_test_raw[, .SD[1], by = user_idx][, .(user_idx,item_idx,label=1)]

# 仅为有效的评估用户生成 99 个负样本
valid_eval_users <- dt_eval_pos$user_idx
dt_eval_neg <- dt_user_history[user_idx %in% valid_eval_users,
                               .(item_idx = sample(
                                 setdiff(
                                   all_items_vec, unlist(interacted)
                                   ), 99, replace = FALSE),
                                      label = 0), by = user_idx]

dt_eval <- rbindlist(list(dt_eval_pos, dt_eval_neg), use.names = TRUE)
setorder(dt_eval, user_idx, -label) # 保证每个用户 100 行连续排列

因为每个用户在评估集中只有唯一一个正样本,我们将采用命中率(Hit Ratio, HR)作为核心评估指标1。评估时,我们计算模型预测得分排名前 10 的物品中是否包含了这个正样本,即 HR@10。如果命中得分为 1,未命中得分为 0,最后对所有用户求均值。

1 本章的模型主要用于推荐系统的召回阶段,其核心目标是“广度和漏斗”:只要能把用户可能喜欢的物品从海量库里捞出来,扔进前 \(K\) 个候选集里交给下游的精排模型(比如 DCN、DIN 等)去打分就可以了。如果使用 NDCG 指标,会对排在第 9、第 10 名的召回结果施加极大的对数惩罚,这对召回模型来说有些“苛求”了。

# 1. 预先分配预测分列,初始化为 NA
dt_eval_ibcf <- copy(dt_eval) # 深拷贝,避免修改原始数据
dt_eval_ibcf[, pred_score := NA_real_]

# 2. 设置 Batch Size (根据你的内存大小调整,比如每次算 1000 个用户)
batch_size <- 1000
unique_users <- unique(dt_eval_ibcf$user_idx)
num_batches <- ceiling(length(unique_users) / batch_size)

# 3. 分批计算与提取
for (b in 1:num_batches) {
  # 打印当前 batch 开始执行的时间和进度
  current_time <- format(Sys.time(), "%H:%M:%S")
  cat(sprintf("[%s] 正在处理 Batch %d / %d ... ", current_time, b, num_batches))
  
  # 获取当前批次的用户 ID
  start_idx <- (b - 1) * batch_size + 1
  end_idx <- min(b * batch_size, length(unique_users))
  batch_users <- unique_users[start_idx:end_idx]
  
  # 矩阵乘法
  R_batch <- R_train[batch_users, , drop = FALSE]
  Pred_batch <- R_batch %*% S_cosine
  
  # 提取索引并赋值
  row_indices_in_dt <- which(dt_eval_ibcf$user_idx %in% batch_users)
  u_idx_mapped <- match(dt_eval_ibcf$user_idx[row_indices_in_dt], batch_users)
  i_idx_mapped <- dt_eval_ibcf$item_idx[row_indices_in_dt]
  
  # 稀疏矩阵索引提取
  dt_eval_ibcf$pred_score[row_indices_in_dt] <-
   Pred_batch[cbind(u_idx_mapped, i_idx_mapped)]
  
  # 释放内存
  rm(R_batch, Pred_batch)
  gc()
  
  # 打印该 batch 完成标志
  cat("完成!\n")
}
[16:49:58] 正在处理 Batch 67 / 70 ... 完成!
[16:50:00] 正在处理 Batch 68 / 70 ... 完成!
[16:50:02] 正在处理 Batch 69 / 70 ... 完成!
[16:50:03] 正在处理 Batch 70 / 70 ... 完成!
# 4. 计算 HR@10
dt_eval_ibcf[, rank := frank(-pred_score, ties.method = "random"), by = user_idx]
recall_10_ibcf <- dt_eval_ibcf[label == 1, mean(rank <= 10)]
cat(sprintf("=> Item-CF HR@10: %.4f\n", recall_10_ibcf))
# => Item-CF HR@10: 0.8470 # 不做异常用户剔除的结果
=> Item-CF HR@10: 0.8720