相关:
《Postgresql源码(66)insert on conflict语法介绍与内核执行流程解析》
《Postgresql源码(70)逻辑复制DecodeXLOG主要流程和数据结构》

本篇只简单介绍逻辑复制解析过程,这里面的比较有意思的历史快照还没有看,后面在补一篇。

0 总结速查

  • 优先理解数据结构,整体流程就是一个解析–>半成品挂链–>回调处理生成成品的过程。
  • 调pg_logical_slot_get_changes走decode这部分代码逻辑简单且相对独立,可以参考的地方:
    • 经典查找结构hashtable entry中挂dlist的数据结构。
    • 从XLOG反解并拼出tuple的几个函数调用。
    • 完整独立的Xlog解析接口的调用、hash表、dlist的操作;不同日志类型的处理方式。

1 解析流程

逻辑复制数据结构稍复杂,流程逻辑很简单。(主要是眼花了把reorder看成了record,困惑了很久😭)

walsender进程不断的从自己的复制槽中获取新产生的wal record,通过LogicalDecodingProcessRecord()函数进行wal record的初步过滤和解析,解析结果为一个ReorderBufferChange(解析半成品)(对于DML语句而言这个结构里面主要的信息为oldtuple和newtuple),并将这个解析结果放置到当前事务ID对应的解析buf中去。

当某个事务发生了提交,先通过ReorderBuffer维护的哈希表,通过XID找到ReorderBufferTXN,ReorderBufferTXN维护的链中的ReorderBufferChange(半成品)会被指定的plugin做二次处理(成品),并将处理结果按照指定的途径输出。

分解具体分成几步:

  1. XLogReadRecord模块拿到一条完整的的Xlog record
  2. LogicalDecodingProcessRecord收到record,将record数据内容识别出来,封装到ReorderBufferChange结构中。ReorderBufferChange对应一次修改。
  3. ReorderBufferChange串成链表,挂在ReorderBufferTXN结构上。
  4. ReorderBuffer维护一个哈希表,key为xid,values为ReorderBufferTXN
  5. 在发现事务提交XLOG时,从ReorderBuffer的哈希表中,用xid找到ReorderBufferTXN,在把维护的链表上的所有ReorderBufferChange拿到,把半成品ReorderBufferChange传入解析插件的回调函数,完成一个事务的处理。
  6. (insert on conflict的解析场景稍微特殊一些,这种spec insert会产生两条日志,参考这篇:《Postgresql源码(66)insert on conflict语法介绍与内核执行流程解析》
    在这里插入图片描述

2 实例

拿一个具体用例来看

drop table decoding_test;
CREATE TABLE decoding_test(x integer, y text);
SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');

-- 测试开始
SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);
 pg_logical_slot_get_changes 
-----------------------------
(0 rows)

INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,2) s;

-- 调试位置
SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);
                             pg_logical_slot_get_changes                             
-------------------------------------------------------------------------------------
 (7/80F087D0,4002996,"BEGIN 4002996")
 (7/80F087D0,4002996,"table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'")
 (7/80F08A00,4002996,"table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'")
 (7/80F08A70,4002996,"COMMIT 4002996")

2 关键数据结构

  1. ReorderBuffer中的哈希表维护了xid到TXN的映射。
  2. 每个TXN结构维护了DLIST,元素是Change结构。
  3. Change结构记录了修改的具体内容,也可以理解为逻辑解析的半成品。
  4. Change结构在事务提交时,会全部拿出来给试下指定的逻辑复制解析插件,按插件自己定制解析后输出逻辑解析成品。

在这里插入图片描述

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐