文章目录

lib/maple_tree.c 高性能B树实现(High-Performance B-Tree Implementation) VMA管理的核心数据结构

在这里插入图片描述

https://github.com/wdfk-prog/linux-study

历史与背景

这项技术是为了解决什么特定问题而诞生的?

Maple Tree(枫树)是为了解决Linux内核内存管理子系统中的一个核心性能瓶颈而设计的。具体来说,它是为了取代管理虚拟内存区域(Virtual Memory Areas, VMAs)的红黑树(Red-Black Tree)

在Maple Tree出现之前,每个进程的内存地址空间(struct mm_struct)都使用一棵红黑树来组织其所有的VMA。当进程的VMA数量非常多时(例如大型数据库或科学计算应用),以及在多线程环境下,这棵红-黑树成为了一个严重的争用点:

  • 锁争用(Lock Contention):对VMA树的任何修改(如mmap, munmap)都需要获取一个重量级的读写信号量(mmap_sem,现在是mmap_lock)。在写者持有锁期间,所有其他试图查找或修改VMA的线程都会被阻塞,这严重限制了内存密集型应用的可扩展性。
  • 缓存效率低下(Cache Inefficiency):红黑树是一种指针密集型的数据结构。树的每个节点都是一个单独的、小的内存分配。遍历树的过程涉及到在内存中进行大量的指针“跳跃”,这会导致很差的CPU缓存局部性(cache locality),从而频繁地从主内存加载数据,拖慢查找速度。
  • 范围操作复杂:虽然红黑树可以进行范围查找,但其实现和性能并非最优。

Maple Tree被设计为一种在现代多核CPU上表现更佳的数据结构,专门优化以解决以上所有问题。

它的发展经历了哪些重要的里程碑或版本迭代?

Maple Tree是由内核开发者Liam R. Howlett主导设计和实现的。

  • 构思与开发:该项目旨在创建一个专为VMA管理优化的、兼顾高性能和高并发性的数据结构。
  • 正式引入:Maple Tree在Linux 5.14内核版本中首次被引入,并开始逐步替换内存管理中的红黑树。这是一个重要的里程碑,标志着内核核心数据结构的现代化演进。
  • 成为VMA默认:经过几个内核版本的迭代和稳定,Maple Tree最终成为了管理VMA的默认数据结构,完全取代了原来的红黑树实现。
  • 通用化:除了VMA,Maple Tree也被设计成一个通用的库(lib/maple_tree.c),可供内核其他需要高性能范围映射的子系统使用。
目前该技术的社区活跃度和主流应用情况如何?

Maple Tree是当前Linux内核内存管理子系统中一个非常核心和活跃的组件。

  • 核心应用:它是所有现代Linux内核管理进程虚拟地址空间的标准方式。
  • 持续优化:社区和主要作者仍在不断对其进行性能优化和功能增强。
  • 逐步推广:由于其优越的性能,其他内核子系统也开始考虑或已经在使用Maple Tree来管理地址范围或其他类型的范围数据。

核心原理与设计

它的核心工作原理是什么?

Maple Tree是一种缓存优化的B树(Cache-conscious B-Tree)变体,专门用于存储和查询非重叠的范围(non-overlapping ranges)

  1. B树结构:与每个节点只有一个数据项的二叉树(如红黑树)不同,B树的每个节点内部都是一个数组(称为“槽位”,slots)。这使得每个节点可以存储多个范围的分界点(pivots)和指向子节点的指针(或在叶子节点中直接存储数据)。
  2. 缓存友好:将多个指针或数据项紧凑地存储在一个节点数组中,极大地提高了CPU缓存的利用率。当CPU加载一个节点到缓存时,它一次性获取了大量相关信息,后续的查找操作可以在缓存中快速完成,减少了对主内存的访问。
  3. 为范围而生:它的节点设计天然适合范围操作。一个节点内的多个分界点可以快速地将一个大的范围划分成多个子范围,引导查找操作高效地进入对应的子树。
  4. 读-拷贝-更新(RCU)友好:这是Maple Tree最重要的设计之一。
    • 无锁读取:对Maple Tree的查找和遍历操作是无锁的。它们在RCU的保护下进行,不会被写操作阻塞。这是对红黑树mmap_lock巨大瓶颈的根本性改进。
    • 写操作:当需要修改树时(插入或删除一个VMA),写者会复制从根节点到目标叶子节点路径上的所有节点(这就是“拷贝”)。然后在新复制的节点上进行修改,最后通过一次原子的指针交换操作,将树的根指向新的、修改后的路径。旧的路径会在所有读者都离开后被RCU机制安全地回收。这个过程虽然对写者来说更复杂,但它保证了读者可以一直无锁地、安全地访问树的一个稳定快照。
它的主要优势体现在哪些方面?
  • 极高的并发性:通过RCU实现的无锁读取,极大地提高了多线程应用在内存操作上的可扩展性。
  • 卓越的缓存性能:B树的节点结构显著减少了指针追逐,提高了缓存命中率,从而加速了查找操作。
  • 高效的范围操作:其数据结构天然适合快速定位和遍历一个地址范围内的所有VMA。
  • 写操作锁粒度优化:虽然写操作仍需同步,但其锁定机制比旧的mmap_lock更精细,且不会阻塞读者。
它存在哪些已知的劣势、局限性或在特定场景下的不适用性?
  • 实现复杂:RCU和写时复制的更新机制使得Maple Tree的内部实现比红黑树复杂得多,理解和调试的门槛更高。
  • 写操作开销:对于写操作,需要复制路径上的多个节点,这在理论上比红黑树的“原地”旋转和重新着色有更高的开销。但在高并发场景下,这种开销被无锁读取带来的巨大收益所弥补。
  • 不适合单值查找:虽然可以做到,但如果需求仅仅是单个键到值的映射(非范围),那么哈希表通常是更简单、更高效的选择。

使用场景

在哪些具体的业务或技术场景下,它是首選解決方案?請舉例說明。

在内核中,任何需要管理大量非重叠范围,并且对并发读性能有极高要求的场景,Maple Tree都是首选解决方案。

  • 虚拟内存区域(VMA)管理:这是其设计的初衷和最核心的应用。当一个进程访问一个内存地址时,内核需要快速地从数千甚至数万个VMA中找到包含该地址的VMA。在多线程环境下,多个线程可能同时在进行这种查找,Maple Tree的无锁读取特性完美地满足了这一需求。
  • 其他潜在应用
    • 高级中断管理:管理中断号范围到处理程序的映射。
    • 设备内存映射:管理物理设备地址范围到驱动程序的映射。
    • 文件系统范围锁:管理文件内字节范围的锁。
是否有不推荐使用该技术的场景?为什么?
  • 简单的键值对映射:如果你的键没有范围的概念(例如,通过PID查找进程),哈希表是更好的选择,因为它平均查找时间是O(1)。
  • 需要严格有序遍历且并发度不高:如果主要操作是按键排序遍历,且写操作不频繁、并发度低,那么红黑树因为实现更简单,可能是一个更易于维护的选择。
  • 数据量非常小:对于少量的数据,使用简单的数据结构如链表可能就足够了,引入Maple Tree的复杂性得不偿shī。

对比分析

请将其 与 其他相似技术 进行详细对比。
特性 Maple Tree 红黑树 (Red-Black Tree) 哈希表 (Hash Table) 基数树/XArray
核心功能 并发的、缓存优化的范围映射 有序的单键映射 无序的单键映射 针对整数/指针键的、空间优化的有序映射
数据模型 范围 到 值 单个键 到 值 单个键 到 值 整数索引 到 值
并发/锁 RCU保护,读取无锁 需读写锁或自旋锁保护 需锁保护(通常是分桶锁) RCU保护,读取通常无锁
缓存效率 (节点内是数组) (指针密集,追逐指针) (数组访问) (Trie结构,路径压缩)
查找性能 O(log N) - 但常数因子小,缓存友好 O(log N) - 但常数因子大,缓存不友好 平均 O(1) O(k) (k为索引位数)
范围查找 非常高效 支持,但效率不如B树 不支持 支持,效率高
实现复杂性 非常高 中等 较低
典型用途 VMA管理,高性能范围锁 定时器管理,文件描述符管理 dcache,PID表,连接跟踪 页缓存,文件描述符表

maple_treeRadix Tree的区别、相似点以及各自的应用场景。

maple_treeRadix Tree 是 Linux 内核中两种高效的树形数据结构,它们各自有不同的设计目标、实现方式和应用场景。以下是两者的区别、相似点以及各自的应用场景。


1. 设计目标

1.1 maple_tree

  • 目标: 提供高效的范围操作、低内存占用以及并发友好性。
  • 特点:
    • 支持范围(range)存储,适合稀疏或连续数据的管理。
    • 通过多叉树结构减少树的高度,提升查找效率。
    • 使用 RCU(Read-Copy-Update)机制支持无锁读操作,适合高并发场景。

1.2 Radix Tree

  • 目标: 高效管理稀疏数据,提供快速的查找、插入和删除操作。
  • 特点:
    • 基于压缩前缀树(Compressed Prefix Tree),通过共享公共前缀节省内存。
    • 适合存储稀疏数据,例如文件系统页缓存或 ID 分配。
    • 主要用于单值存储,而不是范围操作。

2. 实现方式

2.1 数据结构

maple_tree
  • 多叉树结构: 每个节点可以存储多个键值对或范围,减少树的高度。
  • 节点类型:
    • 稠密节点(dense nodes): 存储连续的索引值。
    • 范围节点(range nodes): 存储范围数据。
    • 叶子节点和内部节点明确分开。
Radix Tree
  • 压缩前缀树: 通过共享前缀减少存储空间。
  • 节点结构:
    • 每个节点表示路径中的一部分。
    • 支持路径压缩,减少树的深度。

2.2 并发支持

maple_tree
  • RCU 支持: 读操作无锁,写操作延迟生效,适合高并发环境。
  • 写操作锁: 使用 spinlock 确保写操作的安全性。
Radix Tree
  • RCU 支持: 允许无锁读操作。
  • 写操作: 传统写操作需要加锁,部分操作可能没有 maple_tree 那么高效。

2.3 范围操作支持

maple_tree
  • 原生支持范围存储和操作,例如范围插入、范围删除等。
  • 适合管理连续的或稀疏的范围数据。
Radix Tree
  • 不支持范围存储,每个键值对需要单独存储。
  • 适合管理单一索引的稀疏数据。

3. 应用场景

3.1 maple_tree 的应用场景

  • 虚拟内存管理(VMA):
    • 管理虚拟地址空间的范围映射。
    • 替代红黑树(rb_tree)以提高效率。
  • 文件系统元数据:
    • 存储文件的索引和元数据。
    • 支持范围操作的场景。
  • 内存分配器:
    • 管理内存块的分配和回收。
  • 高并发读写场景:
    • 需要频繁读写且要求高性能的场景。

3.2 Radix Tree 的应用场景

  • 页缓存(Page Cache):
    • 管理文件系统的页缓存,存储稀疏的页索引。
  • ID 分配(IDR/IDA):
    • 实现 ID 分配器(IDR)和 ID 分配助手(IDA)。
  • 稀疏数据管理:
    • 存储稀疏的键值对,例如网络设备的索引。
  • 低复杂度操作:
    • 适合不需要范围操作的简单稀疏数据存储。

4. 异同点

4.1 相似点

  • 目标: 都是为了高效管理稀疏数据。
  • 并发支持: 都使用 RCU 支持无锁读操作。
  • 内核应用: 都广泛应用于 Linux 内核的核心子系统(如内存管理、文件系统等)。

4.2 不同点

特性 maple_tree Radix Tree
数据结构 多叉树 压缩前缀树
范围操作 原生支持 不支持
内存占用 更低(通过多叉节点减少树的高度) 较高(每个键值对单独存储)
并发支持 RCU + 写锁,读写性能更高 RCU + 写锁,写操作性能稍逊
典型应用 虚拟地址空间(VMA)、文件元数据 页缓存(Page Cache)、ID 分配

5. 总结与选择建议

  • 选择 maple_tree:

    • 如果需要范围操作(如虚拟地址空间管理)。
    • 如果需要更低的内存占用和更高的查找效率。
    • 如果需要在高并发环境下高效读写。
  • 选择 Radix Tree:

    • 如果需要管理稀疏的单值数据(如页缓存或 ID 分配)。
    • 如果范围操作不是主要需求。

maple_tree 是对 Radix Tree 的一种优化和扩展,特别适合范围操作和高并发场景,而 Radix Tree 在单一索引稀疏数据的管理中仍然表现优异。两者各有优势,根据具体的应用场景选择合适的数据结构可以更好地满足系统需求。

include/linux/maple_tree.h

maple_status Maple State 状态

/*
 * Maple State 状态
 * ma_active 表示 Maple 状态指向一个节点和偏移量,并且可以继续在树上运行。
 * ma_start 表示我们尚未搜索树。
 * ma_root 表示我们已经搜索了树,我们找到的条目位于树的根中(即它的索引为 0,长度为 1,并且是树中唯一的条目)。
 * ma_none 表示我们已经搜索了树,并且树中没有此条目的节点。 例如,我们在空树中搜索索引 1。 或者我们有一个指向完整叶节点的树,我们搜索的条目大于该叶节点中可以包含的条目。
 * ma_pause 表示 Maple 状态内的数据可能已过时,请重新启动作
 * ma_overflow 表示搜索已达到搜索上限 ma_overflow 表示搜索已达到搜索下限 
 * ma_error 表示出现错误,请检查节点是否有错误号。
 */ma
enum maple_status {
	ma_active,
	ma_start,
	ma_root,
	ma_none,
	ma_pause,
	ma_overflow,
	ma_underflow,
	ma_error,
};

maple_range_64 叶节点

/*
 * 叶节点不存储指向节点的指针,它们存储用户数据。 用户几乎可以存储任何位模式。 
 * 如上所述,对于底部两位设置为“10”的数据,无法优化在根指针中存储 0 的条目。 
 * 我们还保留了底部两位设置为低于 4096 (即 2, 6, 10 .. 4094) 的 '10' 的值供内部使用。 
 * 某些 API 将 errnos 作为向右移动两位的负 errno 返回,并将底部的两位设置为“10”,虽然选择将这些值存储在数组中不是错误,但如果您使用 mas_is_err() 测试错误,可能会导致混淆。
 *
 * 非叶节点存储指向的节点的类型(以位 3-6 为单位的枚举maple_type),位 2 是保留的。 这使得 0-1 位暂时未使用。
 *
* 在常规的 B 树术语中,枢轴称为键。 术语 pivot 用于表示树正在指定范围,Pivots 可能出现在子树中,并带有附加到值的条目,而键对于 B 树的特定位置是唯一的。 Pivot 值包括具有相同索引的槽。
 */
struct maple_range_64 {
	struct maple_pnode *parent;
	unsigned long pivot[MAPLE_RANGE64_SLOTS - 1];	/* 表示树正在指定范围 */
	union {
		void __rcu *slot[MAPLE_RANGE64_SLOTS];		/* 实际存储内容 */
		struct {
			void __rcu *pad[MAPLE_RANGE64_SLOTS - 1];
			struct maple_metadata meta;
		};
	};
};

struct ma_state

/*
 * maple 状态在结构体 ma_state 中定义,用于在作期间跟踪信息,甚至在使用高级 API 时跟踪作之间的信息。
 *
 * 如果 state->node 设置了位 0,则它引用的树位置不是节点(例如根)。 如果设置了位 1,则其余位为负 errno。 位 2 ('unallocated slots' 位) 是清晰的。 第 3-6 位表示节点类型。
 *
 * state->alloc 具有请求数量的节点或已分配的节点。 如果 stat->alloc 具有请求的节点数,则将设置第一位 (0x1),其余位为值。 
 * 如果 state->alloc 是一个节点,则该节点的类型为 maple_alloc。 maple_alloc 具有 MAPLE_NODE_SLOTS - 1,用于存储更多已分配的节点、分配的节点总数以及此节点中的node_count。 node_count 是此节点中已分配的节点数。 
 * 超过 MAPLE_NODE_SLOTS - 1 的扩展是通过将更多节点存储到 state->alloc->slot[0] 的节点来处理的。 
 * 通过从 state->alloc 节点中删除节点,直到 state->alloc->node_count 为 1,此时返回 state->alloc 并且 state->alloc->slot[0] 提升为 state->alloc,从而从 state->alloc 中获取节点。 
 * 通过将当前 state->alloc 放入被推送节点的 slot[0] 中,节点被推送到 state->alloc 上。
 *
 * state 还包含 state->node 的隐含 min/max、此搜索的深度和偏移量。隐含的 min/max 要么来自父节点,要么对于根节点为 0-oo。 每次向下或向上遍历节点时,深度都会增加或减少。 偏移量是节点中感兴趣的槽/枢轴 - 用于读取或写入。
 *
 * 返回值时,maple state index 和 last 分别包含条目范围的开始和结束。 范围在 Maple Tree 中是包含的。
 *
 * 状态的状态用于确定下一个作应如何处理状态。 例如,如果状态为 ma_start,则下一个作应从树的根开始并向下遍历。 如果状态为 ma_pause,则该节点可能是过时的数据,应丢弃。 如果状态为 ma_overflow,则最后一个作达到上限。
 *
 */
struct ma_state {
	struct maple_tree *tree;	/* 我们正在作的树 */
	unsigned long index;		/* 我们正在作的索引 - 范围 start*/
	unsigned long last;			/* 我们正在作的最后一个索引 - range end*/
	struct maple_enode *node;	/* 包含此条目的节点*/
	unsigned long min;			/* 该节点的最小索引 - 隐含枢轴最小值 */
	unsigned long max;			/* 该节点的最大索引 - 隐含枢轴最大值 */
	struct maple_alloc *alloc;	/* 为此作分配的节点 */
	enum maple_status status;	/* 状态的状态(活动、启动、无等) */
	unsigned char depth;		/* 写入期间 tree descent 的深度*/
	unsigned char offset;
	unsigned char mas_flags;
	unsigned char end;			/* 节点的结束 */
	enum store_type store_type;	/* 此作所需的存储类型 */
};

struct ma_wr_state {
	struct ma_state *mas;
	struct maple_node *node;	/*解码的 mas->node */
	unsigned long r_min;		/* 最小范围 */
	unsigned long r_max;		/* 最大范围 */
	enum maple_type type;		/* mas->节点类型 */
	unsigned char offset_end;	/* 写入结束的偏移量 */
	unsigned long *pivots;		/* mas->node->pivots 指针 */
	unsigned long end_piv;		/* 偏移端的枢轴 */
	void __rcu **slots;			/* mas->node->slots 指针 */
	void *entry;				/* 要写入的条目 */
	void *content;				/* 正在被覆盖的现有条目 */
};

MA_STATE 表示 Maple Tree 的操作状态

#define MA_STATE(name, mt, first, end)					\
	struct ma_state name = {					\
		.tree = mt,						\
		.index = first,						\
		.last = end,						\
		.node = NULL,						\
		.status = ma_start,					\
		.min = 0,						\
		.max = ULONG_MAX,					\
		.alloc = NULL,						\
		.mas_flags = 0,						\
		.store_type = wr_invalid,				\
	}

MA_WR_STATE 表示写操作状态

#define MA_WR_STATE(name, ma_state, wr_entry)				\
	struct ma_wr_state name = {					\
		.mas = ma_state,					\
		.content = NULL,					\
		.entry = wr_entry,					\
	}

MTREE_INIT 初始化 Maple Tree

/*
 * 如果树在索引 0 处包含单个条目,则通常存储在 tree->ma_root 中。
 * 为了优化页面缓存,以 '00'、'01' 或 '11' 结尾的条目将存储在根目录中,但以 '10' 结尾的条目将存储在节点中。
 * 位 3-6 用于存储枚举maple_type。
 *
 * 标志既用于存储有关此树的一些不可变信息(在树创建时设置),也用于在旋转锁下设置的动态信息。
 *
 * 标志的另一个用途是指示树的全局状态。 MT_FLAGS_USE_RCU 标志就是这种情况,它表示树当前处于 RCU 模式。 
 * 添加此模式是为了允许树重用节点,而不是在存在单个用户时重新分配和 RCU 释放节点。
 */
struct maple_tree {
	union {
		spinlock_t	ma_lock;
		lockdep_map_p	ma_external_lock;
	};
	unsigned int	ma_flags;
	void __rcu      *ma_root;
};

/**
 * MTREE_INIT() - 初始化一棵枫树
 * @name:枫树名称
 * @__flags:枫树标志
 *
 */
#define MTREE_INIT(name, __flags) {					\
	.ma_lock = __SPIN_LOCK_UNLOCKED((name).ma_lock),		\
	.ma_flags = __flags,						\
	.ma_root = NULL,						\
}

#define MTREE_INIT_EXT(name, __flags, __lock)	MTREE_INIT(name, __flags)

lib/maple_tree.c

maple_tree_init

void __init maple_tree_init(void)
{
	maple_node_cache = kmem_cache_create("maple_node",
			sizeof(struct maple_node), sizeof(struct maple_node),
			SLAB_PANIC, NULL);
}

mas_root 获取枫树根

/*
 * mas_root() - 获取枫树根。
 * @mas:枫树状态。
 *
 * Return:指向树根的指针
 */
static __always_inline void *mas_root(struct ma_state *mas)
{
	return rcu_dereference_check(mas->tree->ma_root, mt_locked(mas->tree));
}

mas_start 设置 Maple 状态

/*
 * mas_start() - 为作设置 Maple 状态。
 * @mas:枫树状态。
 *
 * 如果 mas->status == 马_start,则将 min、max 和 depth 设置为
 *违约。
 *
 *返回:
 * - 如果 mas->node 是错误或未mas_start,则返回 NULL。
 * - 如果它是一个空树:NULL & mas->status == ma_none
 * - 如果它是一个单独的条目:条目&mas->status == ma_none
 * - 如果是一棵树:NULL & mas->status == ma_active
 */
static inline struct maple_enode *mas_start(struct ma_state *mas)
{
	if (likely(mas_is_start(mas))) {
		struct maple_enode *root;

		mas->min = 0;
		mas->max = ULONG_MAX;

retry:
		mas->depth = 0;
		root = mas_root(mas);
		/* Tree with nodes */
		/* 多节点树 */
		if (likely(xa_is_node(root))) {
			mas->depth = 1;
			mas->status = ma_active;
			mas->node = mte_safe_root(root);
			mas->offset = 0;
			if (mte_dead_node(mas->node))
				goto retry;

			return NULL;
		}

		mas->node = NULL;
		/* 空树 */
		if (unlikely(!root)) {
			mas->status = ma_none;
			mas->offset = MAPLE_NODE_SLOTS;
			return NULL;
		}

		/* 单节点树 */
		mas->status = ma_root;
		mas->offset = MAPLE_NODE_SLOTS;

		/* Single entry tree. */
		if (mas->index > 0)
			return NULL;

		return root;
	}

	return NULL;
}

mas_wr_prealloc_setup mas写预分配设置

static inline void mas_wr_prealloc_setup(struct ma_wr_state *wr_mas)
{
	struct ma_state *mas = wr_mas->mas;

	if (!mas_is_active(mas)) {
		/* 初始化时, 是这个状态 */
		if (mas_is_start(mas))
			goto set_content;

		if (unlikely(mas_is_paused(mas)))
			goto reset;

		if (unlikely(mas_is_none(mas)))
			goto reset;

		if (unlikely(mas_is_overflow(mas)))
			goto reset;

		if (unlikely(mas_is_underflow(mas)))
			goto reset;
	}

	/*
	 * 不太严格的 mas_is_span_wr() 版本,我们允许在此节点内进行跨写。 
	 * 这是为了阻止 mas_prealloc() 中的部分遍历被重置。
	 */
	if (mas->last > mas->max)
		goto reset;

	if (wr_mas->entry)
		goto set_content;

	if (mte_is_leaf(mas->node) && mas->last == mas->max)
		goto reset;

	goto set_content;

reset:
	mas_reset(mas);
set_content:
	wr_mas->content = mas_start(mas);
}

mas_wr_store_type 获取存储类型

/*
 * mas_wr_store_type() - 确定给定存储作的存储类型
 * @wr_mas:maple 写入状态
 *
 * 返回:作所需的 store 类型
 */
static inline enum store_type mas_wr_store_type(struct ma_wr_state *wr_mas)
{
	struct ma_state *mas = wr_mas->mas;
	unsigned char new_end;

	if (unlikely(mas_is_none(mas) || mas_is_ptr(mas)))
		return wr_store_root;

	if (unlikely(!mas_wr_walk(wr_mas)))
		return wr_spanning_store;

	/* 此时,我们位于需要更改的叶节点处。 */
	mas_wr_end_piv(wr_mas);
	if (!wr_mas->entry)
		mas_wr_extend_null(wr_mas);

	if ((wr_mas->r_min == mas->index) && (wr_mas->r_max == mas->last))
		return wr_exact_fit;

	if (unlikely(!mas->index && mas->last == ULONG_MAX))
		return wr_new_root;

	new_end = mas_wr_new_end(wr_mas);
	/* 潜在跨度再平衡折叠节点 */
	if (new_end < mt_min_slots[wr_mas->type]) {
		if (!mte_is_root(mas->node) && !(mas->mas_flags & MA_STATE_BULK))
			return  wr_rebalance;
		return wr_node_store;
	}

	if (new_end >= mt_slots[wr_mas->type])
		return wr_split_store;

	if (!mt_in_rcu(mas->tree) && (mas->offset == mas->end))
		return wr_append;

	if ((new_end == mas->end) && (!mt_in_rcu(mas->tree) ||
		(wr_mas->offset_end - mas->offset == 1)))
		return wr_slot_store;

	return wr_node_store;
}

mas_prealloc_calc 计算给定 store 运行所需的节点数

/**
 * mas_prealloc_calc() - 计算给定 store 运行所需的节点数
 * @mas:枫树状态
 * @entry:要存储到树中的入口
 *
 * 返回:预分配所需的节点数。
 */
static inline int mas_prealloc_calc(struct ma_state *mas, void *entry)
{
	int ret = mas_mt_height(mas) * 3 + 1;

	switch (mas->store_type) {
	case wr_invalid:
		WARN_ON_ONCE(1);
		break;
	case wr_new_root:
		ret = 1;
		break;
	case wr_store_root:
		if (likely((mas->last != 0) || (mas->index != 0)))
			ret = 1;
		else if (((unsigned long) (entry) & 3) == 2)
			ret = 1;
		else
			ret = 0;
		break;
	case wr_spanning_store:
		ret =  mas_mt_height(mas) * 3 + 1;
		break;
	case wr_split_store:
		ret =  mas_mt_height(mas) * 2 + 1;
		break;
	case wr_rebalance:
		ret =  mas_mt_height(mas) * 2 - 1;
		break;
	case wr_node_store:
		ret = mt_in_rcu(mas->tree) ? 1 : 0;
		break;
	case wr_append:
	case wr_exact_fit:
	case wr_slot_store:
		ret = 0;
	}

	return ret;
}

mas_allocated 获取在 Maple 状态下分配的节点数

/*
 * mas_allocated() - 获取在 Maple 状态下分配的节点数。
 * @mas:枫树状态
 *
 * ma_state alloc 成员重载以保存指向第一个已分配节点或要分配的请求节点数的指针。
 * 如果设置了 bit 0,则 alloc 包含请求的节点数。
 * 如果存在已分配的节点,则已分配的节点总数位于该节点中。
 *
 * 返回:分配的节点总数
 */
static inline unsigned long mas_allocated(const struct ma_state *mas)
{
	if (!mas->alloc || ((unsigned long)mas->alloc & 0x1))
		return 0;

	return mas->alloc->total;
}

mas_set_alloc_req 设置请求的分配数

/*
 * mas_set_alloc_req() - 设置请求的分配数量。
 * @mas:枫树州
 * @count:分配数量。
 *
 * 请求的分配数量要么位于位于 @mas->alloc->request_count 中的第一个已分配节点中,要么在没有已分配节点的情况下直接位于 @mas->alloc 中。 
 * 在节点中设置请求,或执行必要的编码以直接存储在 @mas->alloc 中。
 */
static inline void mas_set_alloc_req(struct ma_state *mas, unsigned long count)
{
	if (!mas->alloc || ((unsigned long)mas->alloc & 0x1)) {
		if (!count)
			mas->alloc = NULL;
		else
			mas->alloc = (struct maple_alloc *)(((count) << 1U) | 1U);
		return;
	}

	mas->alloc->request_count = count;
}

mas_alloc_req 获取请求的分配数

/*
 * mas_alloc_req() - 获取请求的分配数量。
 * @mas:枫树状态
 *
 * alloc count 要么直接存储在 @mas 中,要么存储在 @mas->alloc->request_count 中(如果至少分配了一个节点)。 
 * 如果请求计数直接存储在 @mas->alloc 中,请解码请求计数。
 *
 * 返回:分配请求计数。
 */
static inline unsigned int mas_alloc_req(const struct ma_state *mas)
{
	if ((unsigned long)mas->alloc & 0x1)
		return (unsigned long)(mas->alloc) >> 1;
	else if (mas->alloc)
		return mas->alloc->request_count;
	return 0;
}

mt_alloc_one 分配一个 Maple 节点

static inline struct maple_node *mt_alloc_one(gfp_t gfp)
{
	return kmem_cache_alloc(maple_node_cache, gfp);
}

mas_alloc_nodes 将节点分配到 Maple 状态

/*
 * mas_alloc_nodes - 将节点分配到 Maple 状态
 * @mas:枫树状态
 * @gfp:GFP 标志
 */
static inline void mas_alloc_nodes(struct ma_state *mas, gfp_t gfp)
{
	struct maple_alloc *node;
	unsigned long allocated = mas_allocated(mas);
	unsigned int requested = mas_alloc_req(mas);
	unsigned int count;
	void **slots = NULL;
	unsigned int max_req = 0;

	if (!requested)
		return;
	/* 清除当前的分配请求,避免重复分配 */
	mas_set_alloc_req(mas, 0);
	/* 预分配模式检查 */
	if (mas->mas_flags & MA_STATE_PREALLOC) {
		if (allocated)
			return;
		WARN_ON(!allocated);
	}
	/* 如果尚无分配的节点 或当前分配的节点已满 */
	if (!allocated || mas->alloc->node_count == MAPLE_ALLOC_SLOTS) {
		/* 分配一个新节点 */
		node = (struct maple_alloc *)mt_alloc_one(gfp);
		if (!node)
			goto nomem_one;

		if (allocated) {
			node->slot[0] = mas->alloc;
			node->node_count = 1;
		} else {
			node->node_count = 0;
		}

		mas->alloc = node;
		node->total = ++allocated;
		node->request_count = 0;
		requested--;
	}

	node = mas->alloc;
	/* 批量分配节点 */
	while (requested) {
		max_req = MAPLE_ALLOC_SLOTS - node->node_count;
		slots = (void **)&node->slot[node->node_count];
		max_req = min(requested, max_req);
		count = mt_alloc_bulk(gfp, max_req, slots);
		if (!count)
			goto nomem_bulk;

		if (node->node_count == 0) {
			node->slot[0]->node_count = 0;
			node->slot[0]->request_count = 0;
		}

		node->node_count += count;
		allocated += count;
		/* find a non-full node*/
		do {
			node = node->slot[0];
		} while (unlikely(node->node_count == MAPLE_ALLOC_SLOTS));
		requested -= count;
	}
	mas->alloc->total = allocated;
	return;

nomem_bulk:
	/* Clean up potential freed allocations on bulk failure */
	memset(slots, 0, max_req * sizeof(unsigned long));
	mas->alloc->total = allocated;
nomem_one:
	mas_set_alloc_req(mas, requested);
	mas_set_err(mas, -ENOMEM);
}

mas_node_count 计算节点数

/*
 * mas_node_count_gfp() - 检查是否分配了足够的节点,如果节点数不足,则请求更多节点。
 * @mas:枫树状态
 * @count:所需的节点数
 * @gfp:GFP 标志
 */
static void mas_node_count_gfp(struct ma_state *mas, int count, gfp_t gfp)
{
	unsigned long allocated = mas_allocated(mas);

	if (allocated < count) {
		mas_set_alloc_req(mas, count - allocated);
		mas_alloc_nodes(mas, gfp);
	}
}

/*
 * mas_node_count() - 检查是否分配了足够的节点,如果节点数不足,则请求更多节点。
 * @mas:枫树状态
 * @count:所需的节点数
 *
 * 注意:使用GFP_NOWAIT |__GFP_NOWARN 用于 gfp 标志。
 */
static void mas_node_count(struct ma_state *mas, int count)
{
	return mas_node_count_gfp(mas, count, GFP_NOWAIT | __GFP_NOWARN);
}

mas_wr_preallocate 为存储作预分配足够的节点

/**
 * mas_wr_preallocate() - 为存储作预分配足够的节点
 * @wr_mas:maple 写入状态
 * @entry:将要存储的条目
 *
 */
static inline void mas_wr_preallocate(struct ma_wr_state *wr_mas, void *entry)
{
	struct ma_state *mas = wr_mas->mas;
	int request;

	mas_wr_prealloc_setup(wr_mas);
	mas->store_type = mas_wr_store_type(wr_mas);
	/* 计算写操作所需的资源请求量 */
	request = mas_prealloc_calc(mas, entry);
	if (!request)
		return;
	/* 分配所需的节点资源 */
	mas_node_count(mas, request);
}

mas_pop_node 从 maple 状态获取先前分配的 maple 节点

/*
 * mas_pop_node() - 从 maple 状态获取先前分配的 maple 节点。
 * @mas:枫树状态
 *
 * Return:指向 Maple 节点的指针。
 */
static inline struct maple_node *mas_pop_node(struct ma_state *mas)
{
	struct maple_alloc *ret, *node = mas->alloc;
	unsigned long total = mas_allocated(mas);
	unsigned int req = mas_alloc_req(mas);

	/* nothing or a request pending. */
	if (WARN_ON(!total))
		return NULL;

	if (total == 1) {
		/* single allocation in this ma_state */
		mas->alloc = NULL;
		ret = node;
		goto single_node;
	}

	if (node->node_count == 1) {
		/* Single allocation in this node. */
		mas->alloc = node->slot[0];
		mas->alloc->total = node->total - 1;
		ret = node;
		goto new_head;
	}
	node->total--;
	ret = node->slot[--node->node_count];
	node->slot[node->node_count] = NULL;

single_node:
new_head:
	if (req) {
		req++;
		mas_set_alloc_req(mas, req);
	}

	memset(ret, 0, sizeof(*ret));
	return (struct maple_node *)ret;
}

mas_root_expand 将根扩展为节点

/*
 * mas_root_expand() - 将根扩展为节点
 * @mas:枫树状态
 * @entry:要存储到树中的入口
 */
static inline void mas_root_expand(struct ma_state *mas, void *entry)
{
	/* 获取 mas>tree->ma_root */
	void *contents = mas_root_locked(mas);
	enum maple_type type = maple_leaf_64;
	struct maple_node *node;
	void __rcu **slots;
	unsigned long *pivots;
	int slot = 0;

	node = mas_pop_node(mas);
	pivots = ma_pivots(node, type);
	slots = ma_slots(node, type);
	node->parent = ma_parent_ptr(mas_tree_parent(mas));
	mas->node = mt_mk_node(node, type);
	mas->status = ma_active;

	if (mas->index) {
		if (contents) {
			/* 将其存储到新节点的槽位中 */
			rcu_assign_pointer(slots[slot], contents);
			if (likely(mas->index > 1))
				slot++;
		}
		/* 增加槽位索引 */
		pivots[slot++] = mas->index - 1;
	}
	/* 将新条目(entry)存储到当前槽位 */
	rcu_assign_pointer(slots[slot], entry);
	mas->offset = slot;
	/* 设置枢轴值(pivots),表示条目的范围 */
	pivots[slot] = mas->last;
	/* 如果范围未覆盖 ULONG_MAX,为其添加一个额外的枢轴 */
	if (mas->last != ULONG_MAX)
		pivots[++slot] = ULONG_MAX;
	/* 更新树的元数据 */
	/* 设置树的深度为 1(mas->depth = 1),表示树现在有一个内部节点 */
	mas->depth = 1;
	/*  更新树的高度 */
	mas_set_height(mas);
	/* 设置新节点的元数据 */
	ma_set_meta(node, maple_leaf_64, 0, slot);
	/* 将新根交换到树中*/
	rcu_assign_pointer(mas->tree->ma_root, mte_mk_root(mas->node));
	return;
}

mas_store_root 将值存储到 root 中

/*
 * mas_store_root() - 将值存储到 root 中。
 * @mas:枫树状态
 * @entry:要存储的入口。
 *
 * 现在没有根节点,我们正在将一个值存储到根节点中 - 这个
 * 函数分配指针或扩展为节点。
 */
static inline void mas_store_root(struct ma_state *mas, void *entry)
{
	if (!entry) {
		if (!mas->index)
			rcu_assign_pointer(mas->tree->ma_root, NULL);
	} else if (likely((mas->last != 0) || (mas->index != 0)))
		mas_root_expand(mas, entry);
	else if (((unsigned long) (entry) & 3) == 2)
		mas_root_expand(mas, entry);
	else {
		rcu_assign_pointer(mas->tree->ma_root, entry);
		mas->status = ma_start;
	}
}

mas_wr_store_entry 用于存储值的内部调用

/*
 * mas_wr_store_entry() - 用于存储值的内部调用
 * @wr_mas:maple 写入状态
 */
static inline void mas_wr_store_entry(struct ma_wr_state *wr_mas)
{
	struct ma_state *mas = wr_mas->mas;
	unsigned char new_end = mas_wr_new_end(wr_mas);

	switch (mas->store_type) {
	case wr_invalid:
		MT_BUG_ON(mas->tree, 1);
		return;
	case wr_new_root:
		mas_new_root(mas, wr_mas->entry);
		break;
	case wr_store_root:
		mas_store_root(mas, wr_mas->entry);
		break;
	case wr_exact_fit:
		rcu_assign_pointer(wr_mas->slots[mas->offset], wr_mas->entry);
		if (!!wr_mas->entry ^ !!wr_mas->content)
			mas_update_gap(mas);
		break;
	case wr_append:
		mas_wr_append(wr_mas, new_end);
		break;
	case wr_slot_store:
		mas_wr_slot_store(wr_mas);
		break;
	case wr_node_store:
		mas_wr_node_store(wr_mas, new_end);
		break;
	case wr_spanning_store:
		mas_wr_spanning_store(wr_mas);
		break;
	case wr_split_store:
	case wr_rebalance:
		mas_wr_bnode(wr_mas);
		break;
	}

	return;
}

mas_store_gfp 将值存储到树中

/**
 * mas_store_gfp() - 将值存储到树中。
 * @mas:枫树状态
 * @entry:商店入口
 * @gfp:必要时用于分配的GFP_FLAGS。
 *
 * 返回:成功时为 0,无效请求时为 -EINVAL,内存无法时为 -ENOMEM
 * 被分配。
 */
int mas_store_gfp(struct ma_state *mas, void *entry, gfp_t gfp)
{
	/* 保存 mas 的索引范围(index 和 last),用于后续操作 */
	unsigned long index = mas->index;
	unsigned long last = mas->last;
	/* 初始化写入状态 */
	MA_WR_STATE(wr_mas, mas, entry);
	int ret = 0;

retry:
	mas_wr_preallocate(&wr_mas, entry);
	/* 检查是否需要分配内存 */
	if (unlikely(mas_nomem(mas, gfp))) {
		if (!entry)
		/*  设置范围,然后跳转到 retry 重新尝试 */
			__mas_set_range(mas, index, last);
		goto retry;
	}

	if (mas_is_err(mas)) {
		ret = xa_err(mas->node);
		goto out;
	}
	/* 将值存储到树中 */
	mas_wr_store_entry(&wr_mas);
out:
	mas_destroy(mas);
	return ret;
}
EXPORT_SYMBOL_GPL(mas_store_gfp);

tree_load: 从枫树中加载一个值

此函数是Linux内核中“枫树”(Maple Tree)数据结构的核心读取操作接口。它的作用是根据给定的索引(index),在指定的枫树(mt)中高效地查找并返回存储在该索引处的数据条目(一个指针)。枫树是一种为稀疏、大范围的无符号长整型索引设计的、对CPU缓存友好的B-树,它的查找性能非常高。

/**
 * mtree_load() - 从一个枫树中加载一个值
 * @mt: 目标枫树
 * @index: 要加载的条目所对应的索引
 *
 * 返回: 找到的数据条目, 如果未找到则返回 %NULL
 */
void *mtree_load(struct maple_tree *mt, unsigned long index)
{
	/*
	 * MA_STATE 是一个宏, 用于在栈上定义并初始化一个名为'mas'的枫树状态机(Maple State)结构体.
	 * 这个状态机将跟踪查找过程中的所有状态.
	 *  mas: 状态机变量名.
	 *  mt: 要操作的枫树.
	 *  index, index: 查找范围的起始和结束. 因为是加载单个值, 所以起始和结束是同一个索引.
	 */
	MA_STATE(mas, mt, index, index);
	/*
	 * 定义一个 void 指针变量 entry, 它将用于存储最终找到的数据条目.
	 */
	void *entry;

	/*
	 * 调用跟踪点函数, 用于在内核跟踪(tracing)启用时记录本次读操作, 便于调试和性能分析.
	 */
	trace_ma_read(__func__, &mas);
	/*
	 * 调用 rcu_read_lock() 进入RCU的读端临界区.
	 * 在单核抢占式系统上, 这通常通过禁用内核抢占来实现.
	 * 这可以防止在遍历树的节点时, 被一个正在修改此树的写者任务抢占, 从而保证了读取过程的一致性.
	 */
	rcu_read_lock();
/*
 * 定义一个goto标签, 用于在查找因并发修改而失败时进行重试.
 */
retry:
	/*
	 * 调用 mas_start(), 开始查找过程. 它会定位到树的根节点, 并准备开始遍历.
	 * 对于一些特殊情况(如树只有一个节点), 它可能直接返回结果.
	 */
	entry = mas_start(&mas);
	/*
	 * 检查状态机是否处于'mas_is_none'状态, 这表示查找立即失败(例如树为空, 或索引超出范围).
	 * unlikely() 是给编译器的优化提示, 表示此分支很少发生.
	 */
	if (unlikely(mas_is_none(&mas)))
		goto unlock; // 如果是, 直接跳转到解锁步骤.

	/*
	 * 检查状态机是否处于'mas_is_ptr'状态. 这是针对树只有一个条目的优化情况,
	 * 该条目直接存储在树的根指针中.
	 */
	if (unlikely(mas_is_ptr(&mas))) {
		/*
		 * 对于单条目树, 该条目只能在索引0处. 如果要查找的索引不是0, 则视为未找到.
		 */
		if (index)
			entry = NULL;

		goto unlock; // 处理完毕, 跳转到解锁步骤.
	}

	/*
	 * 调用 mtree_lookup_walk(), 这是主要的遍历函数.
	 * 它会从当前状态机(mas)的位置开始, 沿着树向下遍历, 直到找到包含目标索引的叶子节点.
	 */
	entry = mtree_lookup_walk(&mas);
	/*
	 * 检查遍历后的结果. 如果没有找到条目(entry为NULL), 并且状态机报告说它被重置回了
	 * 起始状态(unlikely(mas_is_start(&mas))), 这表明在遍历过程中, 我们所在的树路径
	 * 被一个并发的写者修改了, 导致遍历失败.
	 */
	if (!entry && unlikely(mas_is_start(&mas)))
		goto retry; // 在这种情况下, 我们必须跳转到retry标签, 重新进行整个查找过程.
/*
 * 定义一个goto标签, 作为所有退出路径的统一出口.
 */
unlock:
	/*
	 * 调用 rcu_read_unlock(), 退出RCU读端临界区. 在单核系统上, 这会重新启用内核抢占.
	 */
	rcu_read_unlock();
	/*
	 * 枫树的底层实现(XArray)用一个特殊的非NULL指针来表示"零值条目".
	 * xa_is_zero() 用于检查返回的条目是否是这个特殊的"零值条目".
	 * 如果是, 我们必须将其转换成一个标准的 NULL 指针返回给调用者.
	 */
	if (xa_is_zero(entry))
		return NULL;

	/*
	 * 返回找到的数据条目指针. 如果未找到, 此处将是NULL.
	 */
	return entry;
}
/*
 * EXPORT_SYMBOL 是一个宏, 用于将 mtree_load 函数的符号导出到内核的符号表中.
 * 这使得其他内核模块(LKM)可以调用这个函数, 因为它是一个基础数据结构的核心API.
 */
EXPORT_SYMBOL(mtree_load);
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐