SQLAlchemy连接mysql数据库并进行CRUD 流程
本文介绍了使用SQLAlchemy操作MySQL数据库的完整流程。首先安装必要的Python库(pymysql和sqlalchemy),然后创建数据库连接配置。通过定义User模型类映射数据库表,使用create_all()方法创建表结构。演示了完整的CRUD操作:创建新用户、查询所有用户/条件查询、更新用户信息和删除用户记录。最后展示了如何执行原始SQL查询。代码包含异常处理和事务管理(comm
·
环境准备
首先,请确保你的开发环境中安装了 Python。然后,我们需要安装两个核心库:
- SQLAlchemy: ORM 框架本身。
- PyMySQL: 一个纯 Python 实现的 MySQL 客户端库,SQLAlchemy 需要它来与 MySQL 数据库通信。
打开你的终端或命令行,运行以下命令进行安装:
pip install sqlalchemy pymysql
数据库准备
在开始编码前,你需要手动创建一个数据库,执行以下 SQL 命令:
CREATE DATABASE IF NOT EXISTS my_test_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
说明:utf8mb4 是推荐的字符集,它支持包括 Emoji 在内的所有 Unicode 字符。
Python 代码实现
1.数据库连接配置
import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import sessionmaker, declarative_base
# --- 1. 数据库连接配置 ---
# 请根据你的实际情况修改以下配置
DB_USER = 'root' # MySQL 用户名
DB_PASSWORD = '123456'# MySQL 密码
DB_HOST = 'localhost' # 数据库主机地址
DB_PORT = 3306 # MySQL服务端口
DB_NAME = 'my_test_db'# 数据库名称
DATABASE_URL = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
2.创建引擎
engine = create_engine(
DATABASE_URL,
echo=True, # 打印执行的 SQL 语句,方便调试
pool_pre_ping=True # 检查连接有效性
)
3.声明基类并定义数据模型
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True, comment='用户ID')
name = Column(String(50), nullable=False, unique=True, comment='用户名')
email = Column(String(120), nullable=False, comment='用户邮箱')
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"
4.创建数据库表,会话并进行数据清理
print("--- 创建数据库表 ---")
try:
# 如果表不存在则创建,存在则不做任何操作
Base.metadata.create_all(engine)
print("表创建成功或已存在。")
except sqlalchemy.exc.SQLAlchemyError as e:
print(f"创建表时出错: {e}")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
print("\n--- 清理 users 表中的旧数据 ---")
with SessionLocal() as db_cleanup:
try:
# 删除表中所有记录
db_cleanup.query(User).delete()
db_cleanup.commit()
print("旧数据清理完成。")
except sqlalchemy.exc.SQLAlchemyError as e:
db_cleanup.rollback()
print(f"清理数据时出错: {e}")
5.执行CRUD操作
print("\n--- 开始执行 CRUD 操作 ---")
with SessionLocal() as db:
# --- C: Create (创建) ---
print("\n--- [C] 创建新用户 ---")
new_user1 = User(name="Alice", email="alice@example.com")
new_user2 = User(name="Bob", email="bob@example.com")
db.add(new_user1)
db.add(new_user2)
db.commit()
# 刷新会话,获取数据库生成的 id 等信息
db.refresh(new_user1)
db.refresh(new_user2)
print(f"创建成功: {new_user1}, {new_user2}")
# --- R
print("\n--- [R] 读取用户 ---")
# 查询所有用户
all_users = db.query(User).all()
print(f"所有用户: {all_users}")
# 根据主键查询单个用户
user_id = new_user1.id
user_by_id = db.get(User, user_id)
print(f"根据ID {user_id} 查询: {user_by_id}")
# 条件查询 (返回第一个结果)
user_by_name = db.query(User).filter(User.name == "Bob").first()
print(f"查询名字为 'Bob' 的用户: {user_by_name}")
# 条件查询 (返回所有结果)
users_with_example_email = db.query(User).filter(User.email.like('%@example.com')).all()
print(f"查询所有邮箱包含 '@example.com' 的用户: {users_with_example_email}")
print("\n--- [U] 更新用户 ---")
user_to_update = db.query(User).filter(User.name == "Alice").first()
if user_to_update:
user_to_update.email = "alice.smith@example.com"
db.commit()
db.refresh(user_to_update)
print(f"更新成功: {user_to_update}")
print("\n--- [D] 删除用户 ---")
user_to_delete = db.query(User).filter(User.name == "Bob").first()
if user_to_delete:
db.delete(user_to_delete)
db.commit()
print(f"删除用户 '{user_to_delete.name}' 成功。")
# 验证删除结果
deleted_user = db.query(User).filter(User.name == "Bob").first()
print(f"删除后查询 'Bob': {deleted_user} (应为 None)")
6.执行原始SQL查询
print("\n--- 执行原始 SQL 查询 ---")
with engine.connect() as conn:
try:
result = conn.execute(text("SELECT * FROM users"))
rows = result.fetchall()
print("原始 SQL 查询结果 (当前表中剩余的用户):")
for row in rows:
print(row)
# 对于 SELECT 语句,commit() 不是必须的,但执行 DML 后需要
conn.commit()
except sqlalchemy.exc.SQLAlchemyError as e:
print(f"执行原始 SQL 时出错: {e}")
完整代码为:
import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String, text
from sqlalchemy.orm import sessionmaker, declarative_base
# --- 1. 数据库连接配置 ---
# 请根据你的实际情况修改以下配置
DB_USER = 'root' # MySQL 用户名
DB_PASSWORD = '123456'# MySQL 密码
DB_HOST = 'localhost' # 数据库主机地址
DB_PORT = 3306 # MySQL服务端口
DB_NAME = 'my_test_db'# 数据库名称
DATABASE_URL = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
# --- 2. 创建引擎 (Engine) ---
engine = create_engine(
DATABASE_URL,
echo=True, # 打印执行的 SQL 语句,方便调试
pool_pre_ping=True # 检查连接有效性
)
# --- 3. 声明性基类 (Declarative Base) ---
Base = declarative_base()
# --- 4. 定义数据模型 (Model) ---
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True, comment='用户ID')
name = Column(String(50), nullable=False, unique=True, comment='用户名')
email = Column(String(120), nullable=False, comment='用户邮箱')
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"
# --- 5. 创建数据库表 ---
print("--- 创建数据库表 ---")
try:
# 如果表不存在则创建,存在则不做任何操作
Base.metadata.create_all(engine)
print("表创建成功或已存在。")
except sqlalchemy.exc.SQLAlchemyError as e:
print(f"创建表时出错: {e}")
# --- 6. 创建会话工厂 (Session Factory) ---
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# --- 7. 数据清理(用于演示/测试,每次运行前确保表为空)---
print("\n--- 清理 users 表中的旧数据 ---")
with SessionLocal() as db_cleanup:
try:
# 删除表中所有记录
db_cleanup.query(User).delete()
db_cleanup.commit()
print("旧数据清理完成。")
except sqlalchemy.exc.SQLAlchemyError as e:
db_cleanup.rollback()
print(f"清理数据时出错: {e}")
# --- 8. 执行 CRUD 操作 ---
print("\n--- 开始执行 CRUD 操作 ---")
with SessionLocal() as db:
# --- C: Create (创建) ---
print("\n--- [C] 创建新用户 ---")
new_user1 = User(name="Alice", email="alice@example.com")
new_user2 = User(name="Bob", email="bob@example.com")
db.add(new_user1)
db.add(new_user2)
db.commit()
# 刷新会话,获取数据库生成的 id 等信息
db.refresh(new_user1)
db.refresh(new_user2)
print(f"创建成功: {new_user1}, {new_user2}")
# --- R
print("\n--- [R] 读取用户 ---")
# 查询所有用户
all_users = db.query(User).all()
print(f"所有用户: {all_users}")
# 根据主键查询单个用户
user_id = new_user1.id
user_by_id = db.get(User, user_id)
print(f"根据ID {user_id} 查询: {user_by_id}")
# 条件查询 (返回第一个结果)
user_by_name = db.query(User).filter(User.name == "Bob").first()
print(f"查询名字为 'Bob' 的用户: {user_by_name}")
# 条件查询 (返回所有结果)
users_with_example_email = db.query(User).filter(User.email.like('%@example.com')).all()
print(f"查询所有邮箱包含 '@example.com' 的用户: {users_with_example_email}")
print("\n--- [U] 更新用户 ---")
user_to_update = db.query(User).filter(User.name == "Alice").first()
if user_to_update:
user_to_update.email = "alice.smith@example.com"
db.commit()
db.refresh(user_to_update)
print(f"更新成功: {user_to_update}")
print("\n--- [D] 删除用户 ---")
user_to_delete = db.query(User).filter(User.name == "Bob").first()
if user_to_delete:
db.delete(user_to_delete)
db.commit()
print(f"删除用户 '{user_to_delete.name}' 成功。")
# 验证删除结果
deleted_user = db.query(User).filter(User.name == "Bob").first()
print(f"删除后查询 'Bob': {deleted_user} (应为 None)")
# --- 9. 执行原始 SQL 查询 ---
print("\n--- 执行原始 SQL 查询 ---")
with engine.connect() as conn:
try:
result = conn.execute(text("SELECT * FROM users"))
rows = result.fetchall()
print("原始 SQL 查询结果 (当前表中剩余的用户):")
for row in rows:
print(row)
# 对于 SELECT 语句,commit() 不是必须的,但执行 DML 后需要
conn.commit()
except sqlalchemy.exc.SQLAlchemyError as e:
print(f"执行原始 SQL 时出错: {e}")
print("\n--- 所有操作完成 ---")
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)