环境准备

首先,请确保你的开发环境中安装了 Python。然后,我们需要安装两个核心库:

  • SQLAlchemy: ORM 框架本身。
  • PyMySQL: 一个纯 Python 实现的 MySQL 客户端库,SQLAlchemy 需要它来与 MySQL 数据库通信。

打开你的终端或命令行,运行以下命令进行安装:

pip install sqlalchemy pymysql

 数据库准备

在开始编码前,你需要手动创建一个数据库,执行以下 SQL 命令:

CREATE DATABASE IF NOT EXISTS my_test_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

说明utf8mb4 是推荐的字符集,它支持包括 Emoji 在内的所有 Unicode 字符。

Python 代码实现

1.数据库连接配置

import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import sessionmaker, declarative_base

# --- 1. 数据库连接配置 ---
# 请根据你的实际情况修改以下配置
DB_USER = 'root'      # MySQL 用户名
DB_PASSWORD = '123456'# MySQL 密码
DB_HOST = 'localhost' # 数据库主机地址
DB_PORT = 3306        # MySQL服务端口
DB_NAME = 'my_test_db'# 数据库名称

DATABASE_URL = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

2.创建引擎

engine = create_engine(
    DATABASE_URL,
    echo=True,         # 打印执行的 SQL 语句,方便调试
    pool_pre_ping=True # 检查连接有效性
)

3.声明基类并定义数据模型

Base = declarative_base()


class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True, comment='用户ID')
    name = Column(String(50), nullable=False, unique=True, comment='用户名')
    email = Column(String(120), nullable=False, comment='用户邮箱')

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"

4.创建数据库表,会话并进行数据清理

print("--- 创建数据库表 ---")
try:
    # 如果表不存在则创建,存在则不做任何操作
    Base.metadata.create_all(engine)
    print("表创建成功或已存在。")
except sqlalchemy.exc.SQLAlchemyError as e:
    print(f"创建表时出错: {e}")

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

print("\n--- 清理 users 表中的旧数据 ---")
with SessionLocal() as db_cleanup:
    try:
        # 删除表中所有记录
        db_cleanup.query(User).delete()
        db_cleanup.commit()
        print("旧数据清理完成。")
    except sqlalchemy.exc.SQLAlchemyError as e:
        db_cleanup.rollback()
        print(f"清理数据时出错: {e}")

5.执行CRUD操作

print("\n--- 开始执行 CRUD 操作 ---")
with SessionLocal() as db:
    # --- C: Create (创建) ---
    print("\n--- [C] 创建新用户 ---")
    new_user1 = User(name="Alice", email="alice@example.com")
    new_user2 = User(name="Bob", email="bob@example.com")

    db.add(new_user1)
    db.add(new_user2)
    db.commit()
    # 刷新会话,获取数据库生成的 id 等信息
    db.refresh(new_user1)
    db.refresh(new_user2)
    print(f"创建成功: {new_user1}, {new_user2}")

    # --- R
    print("\n--- [R] 读取用户 ---")
    # 查询所有用户
    all_users = db.query(User).all()
    print(f"所有用户: {all_users}")

    # 根据主键查询单个用户
    user_id = new_user1.id
    user_by_id = db.get(User, user_id)
    print(f"根据ID {user_id} 查询: {user_by_id}")

    # 条件查询 (返回第一个结果)
    user_by_name = db.query(User).filter(User.name == "Bob").first()
    print(f"查询名字为 'Bob' 的用户: {user_by_name}")

    # 条件查询 (返回所有结果)
    users_with_example_email = db.query(User).filter(User.email.like('%@example.com')).all()
    print(f"查询所有邮箱包含 '@example.com' 的用户: {users_with_example_email}")

    print("\n--- [U] 更新用户 ---")
    user_to_update = db.query(User).filter(User.name == "Alice").first()
    if user_to_update:
        user_to_update.email = "alice.smith@example.com"
        db.commit()
        db.refresh(user_to_update)
        print(f"更新成功: {user_to_update}")

    print("\n--- [D] 删除用户 ---")
    user_to_delete = db.query(User).filter(User.name == "Bob").first()
    if user_to_delete:
        db.delete(user_to_delete)
        db.commit()
        print(f"删除用户 '{user_to_delete.name}' 成功。")

    # 验证删除结果
    deleted_user = db.query(User).filter(User.name == "Bob").first()
    print(f"删除后查询 'Bob': {deleted_user} (应为 None)")

6.执行原始SQL查询

print("\n--- 执行原始 SQL 查询 ---")
with engine.connect() as conn:
    try:
        result = conn.execute(text("SELECT * FROM users"))
        rows = result.fetchall()
        print("原始 SQL 查询结果 (当前表中剩余的用户):")
        for row in rows:
            print(row)
        # 对于 SELECT 语句,commit() 不是必须的,但执行 DML 后需要
        conn.commit()
    except sqlalchemy.exc.SQLAlchemyError as e:
        print(f"执行原始 SQL 时出错: {e}")

完整代码为:

import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import sessionmaker, declarative_base

# --- 1. 数据库连接配置 ---
# 请根据你的实际情况修改以下配置
DB_USER = 'root'      # MySQL 用户名
DB_PASSWORD = '123456'# MySQL 密码
DB_HOST = 'localhost' # 数据库主机地址
DB_PORT = 3306        # MySQL服务端口
DB_NAME = 'my_test_db'# 数据库名称

DATABASE_URL = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

# --- 2. 创建引擎 (Engine) ---
engine = create_engine(
    DATABASE_URL,
    echo=True,         # 打印执行的 SQL 语句,方便调试
    pool_pre_ping=True # 检查连接有效性
)

# --- 3. 声明性基类 (Declarative Base) ---
Base = declarative_base()

# --- 4. 定义数据模型 (Model) ---
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True, comment='用户ID')
    name = Column(String(50), nullable=False, unique=True, comment='用户名')
    email = Column(String(120), nullable=False, comment='用户邮箱')

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"

# --- 5. 创建数据库表 ---
print("--- 创建数据库表 ---")
try:
    # 如果表不存在则创建,存在则不做任何操作
    Base.metadata.create_all(engine)
    print("表创建成功或已存在。")
except sqlalchemy.exc.SQLAlchemyError as e:
    print(f"创建表时出错: {e}")

# --- 6. 创建会话工厂 (Session Factory) ---
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# --- 7. 数据清理(用于演示/测试,每次运行前确保表为空)---
print("\n--- 清理 users 表中的旧数据 ---")
with SessionLocal() as db_cleanup:
    try:
        # 删除表中所有记录
        db_cleanup.query(User).delete()
        db_cleanup.commit()
        print("旧数据清理完成。")
    except sqlalchemy.exc.SQLAlchemyError as e:
        db_cleanup.rollback()
        print(f"清理数据时出错: {e}")

# --- 8. 执行 CRUD 操作 ---
print("\n--- 开始执行 CRUD 操作 ---")
with SessionLocal() as db:
    # --- C: Create (创建) ---
    print("\n--- [C] 创建新用户 ---")
    new_user1 = User(name="Alice", email="alice@example.com")
    new_user2 = User(name="Bob", email="bob@example.com")

    db.add(new_user1)
    db.add(new_user2)
    db.commit()
    # 刷新会话,获取数据库生成的 id 等信息
    db.refresh(new_user1)
    db.refresh(new_user2)
    print(f"创建成功: {new_user1}, {new_user2}")

    # --- R
    print("\n--- [R] 读取用户 ---")
    # 查询所有用户
    all_users = db.query(User).all()
    print(f"所有用户: {all_users}")

    # 根据主键查询单个用户
    user_id = new_user1.id
    user_by_id = db.get(User, user_id)
    print(f"根据ID {user_id} 查询: {user_by_id}")

    # 条件查询 (返回第一个结果)
    user_by_name = db.query(User).filter(User.name == "Bob").first()
    print(f"查询名字为 'Bob' 的用户: {user_by_name}")

    # 条件查询 (返回所有结果)
    users_with_example_email = db.query(User).filter(User.email.like('%@example.com')).all()
    print(f"查询所有邮箱包含 '@example.com' 的用户: {users_with_example_email}")

    print("\n--- [U] 更新用户 ---")
    user_to_update = db.query(User).filter(User.name == "Alice").first()
    if user_to_update:
        user_to_update.email = "alice.smith@example.com"
        db.commit()
        db.refresh(user_to_update)
        print(f"更新成功: {user_to_update}")

    print("\n--- [D] 删除用户 ---")
    user_to_delete = db.query(User).filter(User.name == "Bob").first()
    if user_to_delete:
        db.delete(user_to_delete)
        db.commit()
        print(f"删除用户 '{user_to_delete.name}' 成功。")

    # 验证删除结果
    deleted_user = db.query(User).filter(User.name == "Bob").first()
    print(f"删除后查询 'Bob': {deleted_user} (应为 None)")

# --- 9. 执行原始 SQL 查询 ---
print("\n--- 执行原始 SQL 查询 ---")
with engine.connect() as conn:
    try:
        result = conn.execute(text("SELECT * FROM users"))
        rows = result.fetchall()
        print("原始 SQL 查询结果 (当前表中剩余的用户):")
        for row in rows:
            print(row)
        # 对于 SELECT 语句,commit() 不是必须的,但执行 DML 后需要
        conn.commit()
    except sqlalchemy.exc.SQLAlchemyError as e:
        print(f"执行原始 SQL 时出错: {e}")

print("\n--- 所有操作完成 ---")

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐