openmetadata 获取数据库表元数据代码分析-mysql为例
openmetadata 获取数据库表元数据-mysql为例
利用了 SQLAlchemy 这个 Python ORM 框架的**反射(Reflection)**机制,并对其进行了扩展和修补。
简单来说,通过“增强工具”和“继承通用逻辑”来实现元数据提取。
以下是提取 Metadata 的具体技术原理和步骤拆解:
🧩 1. 核心机制:利用 SQLAlchemy 的反射
代码的注释和结构明确指出,它通过扩展 SQLAlchemy 的反射功能来工作。
- 什么是反射: SQLAlchemy 可以连接到数据库,查询数据库的 information_schema(系统表),并自动将物理表映射为 Python 对象。
- 基础能力: 原生的 SQLAlchemy 已经能处理大部分标准的元数据提取(如获取表名、列名)。
🛠️ 2. 关键技术手段:代码是如何“修补”和“增强”提取能力的?
openmetadata最核心的工作不是从头写提取逻辑,而是通过**猴子补丁(Monkey Patching)**来修改 SQLAlchemy 的默认行为,以解决 MySQL 特有的兼容性问题。
🔧 手段一:扩展数据类型识别
ischema_names.update(col_type_map)
- 问题: MySQL 有一些特殊的数据类型(如 GEOMETRY, POINT, ENUM 等),标准的 SQLAlchemy 可能无法正确识别或映射。
- 解决: 这行代码将自定义的类型映射字典 col_type_map 更新到了 SQLAlchemy 的内部类型注册表 ischema_names 中。
- 效果: 在提取元数据时,如果遇到 MySQL 特有的类型,SQLAlchemy 不会报错或识别为未知类型,而是能正确提取并转换为 OpenMetadata 能理解的格式。
🔧 手段二:重写列解析逻辑
MySQLTableDefinitionParser._parse_column = parse_column
- 问题: 默认的列解析方法可能无法提取某些详细的元数据(例如列的原始定义、字符集、排序规则或注释的完整信息)。
- 解决: 代码用自定义的 parse_column 函数替换了 SQLAlchemy 内部的 _parse_column 方法。
- 效果: 在提取字段级详细信息(如字段类型精度、是否为无符号整数等)时,会执行这个自定义函数,从而提取到更丰富、更符合 MySQL 实际情况的元数据。
🔧 手段三:注入 DDL 获取能力
Inspector.get_table_ddl = get_table_ddl
- 问题: SQLAlchemy 的标准 Inspector(检查器)主要用于获取结构,不直接提供获取表创建语句(SHOW CREATE TABLE)的方法。
- 解决: 代码动态地给 Inspector 类添加了一个新方法 get_table_ddl。
- 效果: 在元数据采集流程中,连接器现在可以调用这个方法,直接执行 SHOW CREATE TABLE 语句,从而提取出表的完整 DDL(数据定义语言) 语句。这对于数据血缘和表结构对比非常关键。
📦 3. 流程执行:MysqlSource 类
代码最后定义的 MysqlSource 类继承自 CommonDbSourceService。
- 继承机制: CommonDbSourceService 已经实现了通用的元数据提取流程(如:连接数据库 -> 获取数据库列表 -> 获取表列表 -> 获取列信息 -> 发送到服务器)。
- 定制化: MysqlSource 主要负责配置验证(create 方法)。它确保传入的配置确实是 MySQL 的配置。
- 协同工作: 当 OpenMetadata 的摄取框架运行时:
- 它会实例化 MysqlSource。
- 利用前面“修补”好的 SQLAlchemy 组件(能识别特殊类型、能获取 DDL)。
- 执行标准的扫描流程,从而安全、准确地提取出 MySQL 的元数据。
📝 总结
openmetadata提取 Metadata 的方式可以概括为:
- 利用引擎: 基于 SQLAlchemy 的数据库反射能力作为基础。
- 修补短板: 通过“猴子补丁”技术,动态替换 SQLAlchemy 的内部方法,以支持 MySQL 特有的数据类型和 DDL 提取需求。
- 执行流程: 通过继承通用服务类,利用修补后的引擎自动执行扫描任务。
一句话总结: 它通过“改装” SQLAlchemy 这个工具箱,使其能完美适配 MySQL 的各种特性,然后利用这个改装后的工具箱去读取数据库的系统表和结构信息。
SQLAlchemy 功能实例
为了让你更直观地理解这段代码背后的技术原理,我将通过一个具体的 Python 代码示例 来演示 SQLAlchemy 是如何查询 information_schema 并将物理表映射为 Python 对象的。
在这个过程中,SQLAlchemy 充当了一个“翻译官”,让你可以用 Python 代码代替原生的 SQL 语句来获取数据库的元数据。
🧩 核心概念拆解
在 SQLAlchemy 中,这个过程主要依赖于 Inspector 和 MetaData 两个组件:
- Inspector(检查器): 负责连接数据库并“查看”数据库里有什么(即查询 information_schema)。
- MetaData(元数据): 负责保存从数据库中“看”到的结构信息(表名、列名等)。
- 映射: 将数据库中的“行”和“列”变成 Python 中的字典、列表或类实例。
🧪 实际代码演示
假设我们有一个 MySQL 数据库 my_test_db,里面有一张表叫 users。
第一步:建立连接
首先,我们需要创建一个引擎(Engine),这是 SQLAlchemy 连接数据库的入口。
from sqlalchemy import create_engine
数据库连接字符串 (Dialect + Driver)
格式:mysql+驱动://用户名:密码@地址:端口/数据库名
DATABASE_URL = “mysql+pymysql://root:password@localhost:3306/my_test_db”
创建引擎
engine = create_engine(DATABASE_URL)
第二步:使用 Inspector 查询 information_schema (提取元数据)
这里就是对应你代码中提到的“反射”过程。我们不需要写 SELECT * FROM information_schema.columns…,而是调用 SQLAlchemy 的方法。
from sqlalchemy import inspect
创建 Inspector 对象
inspector = inspect(engine)
- 获取数据库中所有的表名
这一步相当于执行了: SELECT table_name FROM information_schema.tables WHERE table_schema = ‘my_test_db’
tables = inspector.get_table_names()
print(“数据库中的表:”, tables)
输出: [‘users’, ‘orders’] (Python 列表对象)
第三步:将物理表结构映射为 Python 对象
这是最核心的一步。SQLAlchemy 可以把表的结构(列名、类型、是否为空等)读取出来,变成 Python 的字典或对象。
- 获取 ‘users’ 表的列信息
这一步相当于执行了复杂的查询: SELECT column_name, data_type, is_nullable, column_key FROM information_schema.columns WHERE table_name = ‘users’
columns = inspector.get_columns(‘users’)
print(“users 表的结构详情:”)
for col in columns:
# ‘col’ 是一个 Python 字典对象,包含了该列的所有元数据
print(f"字段名: {col[‘name’]}, "
f"数据类型: {col[‘type’]} (Python对象), "
f"是否为空: {col[‘nullable’]}, "
f"默认值: {col[‘default’]}")
输出结果示例:
字段名: id, 数据类型: INTEGER(), 是否为空: False, 默认值: None
字段名: name, 数据类型: VARCHAR(length=50), 是否为空: True, 默认值: None
💡 技术揭秘:
注意 数据类型 这一栏。SQLAlchemy 并没有把类型存成字符串 “INT” 或 “VARCHAR”,而是将其映射成了 Python 的 对象(INTEGER() 和 VARCHAR() 对象)。这就是代码中 ischema_names 的作用——它是一个字典,定义了如何将数据库的字符串类型(如 ‘INT’)转换为 Python 的类(如 Integer)。
第四步:高级映射 - 将表映射为类 (ORM)
除了获取结构,SQLAlchemy 还能将整个表映射为一个 Python 类。当你查询数据时,每一行数据都会变成这个类的一个实例(对象)。
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
Base = declarative_base()
定义一个 Python 类来映射数据库中的 users 表
class User(Base):
tablename = ‘users’
# 这里的定义对应了数据库的字段
id = Column(Integer, primary_key=True)
name = Column(String(50))
现在,我们可以查询数据了
Session = sessionmaker(bind=engine)
session = Session()
查询所有用户
users = session.query(User).all()
for user in users:
# ‘user’ 是一个 Python 对象
# 我们可以直接用点号访问属性,而不需要用索引
print(f"用户对象: ID={user.id}, Name={user.name}")
📌 总结:对应到你的代码
回到你提供的 OpenMetadata 代码,它的工作流程正是基于上述原理,但做了增强:
- inspector.get_table_names() -> 对应代码中扫描所有表的逻辑。
- inspector.get_columns() -> 对应代码中提取字段级详细信息的逻辑。
- ischema_names.update(…) -> 对应代码中为了让 SQLAlchemy 能识别 MySQL 特有类型(如 GEOMETRY)而做的修补。
- Inspector 扩展 -> 对应代码中注入的 get_table_ddl 方法,用来获取 SHOW CREATE TABLE 的结果。
通过这种方式,OpenMetadata 避免了手动拼接复杂的 SQL 语句去查询 information_schema,而是利用 SQLAlchemy 这个成熟的工具,以面向对象的方式高效、安全地提取了元数据。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)