Python数据库编程

    Python数据库编程是指使用Python语言与各种数据库系统交互,执行创建、读取、更新和删除(CRUD)等操作。Python支持各种主流数据库系统,包括关系型数据库(如MySQL、PostgreSQL、SQLite)和非关系型数据库(如MongoDB、Redis)。

1. 常用数据库连接方式

1.1 SQLite (内置数据库)

import sqlite3
from contextlib import closing

# Connect to the database file (it is created if it does not exist).
# closing() guarantees conn.close() runs even if a statement below raises;
# a sqlite3 connection's own `with` block only commits/rolls back.
with closing(sqlite3.connect('example.db')) as conn:
    # Create a cursor
    cursor = conn.cursor()

    # Execute a DDL statement
    cursor.execute('''CREATE TABLE IF NOT EXISTS stocks
                 (date text, trans text, symbol text, qty real, price real)''')

    # Insert a row; values are bound through ? placeholders, not spliced into the SQL
    cursor.execute(
        "INSERT INTO stocks VALUES (?, ?, ?, ?, ?)",
        ('2023-01-01', 'BUY', 'RHAT', 100, 35.14),
    )

    # Commit the transaction
    conn.commit()

    # Query the data
    cursor.execute("SELECT * FROM stocks")
    print(cursor.fetchall())

1.2 MySQL

import mysql.connector

# Connect to the database
conn = mysql.connector.connect(
    host="localhost",
    user="username",
    password="password",
    database="mydatabase"
)

try:
    cursor = conn.cursor()
    try:
        # Execute SQL operations...
        cursor.execute("SELECT * FROM customers")
        result = cursor.fetchall()
    finally:
        cursor.close()
finally:
    # Release the connection even if the query raised
    conn.close()

1.3 PostgreSQL

import psycopg2

conn = psycopg2.connect(
    host="localhost",
    database="mydatabase",
    user="username",
    password="password"
)

try:
    # `with conn` commits on success / rolls back on error but does NOT close
    # the connection; `with conn.cursor()` closes the cursor on exit.
    with conn, conn.cursor() as cursor:
        # Perform operations...
        pass
finally:
    conn.close()

1.4 SQL Server

import pyodbc

conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=localhost;'
    'DATABASE=mydatabase;'
    'UID=username;'
    'PWD=password'
)

try:
    cursor = conn.cursor()
    # Perform operations...
    # (pyodbc does not autocommit by default: call conn.commit() after writes)
finally:
    # Release the connection even if an operation raised
    conn.close()

2. ORM (对象关系映射)

2.1 SQLAlchemy

from sqlalchemy import create_engine, Column, Integer, String
# declarative_base lives in sqlalchemy.orm since 1.4; the old
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    """ORM mapping for the ``users`` table."""

    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)


# Connect to the database and create any missing tables
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)

# Create a session factory
Session = sessionmaker(bind=engine)

# Using the session as a context manager (1.4+) closes it even on errors
with Session() as session:
    # Add a row
    new_user = User(name='Alice', age=25)
    session.add(new_user)
    session.commit()

    # Query rows
    users = session.query(User).filter_by(name='Alice').all()
    for user in users:
        print(user.id, user.name, user.age)

2.2 Django ORM

# Django models.py example
import datetime

from django.db import models


class Book(models.Model):
    """A book with a title, an author and a publication date."""

    title = models.CharField(max_length=100)
    author = models.CharField(max_length=100)
    published_date = models.DateField()


# Usage example: pass a real date object. A string would be converted only when
# written to the database, so book.published_date would stay a str on the instance.
book = Book(title='Python编程', author='John Doe', published_date=datetime.date(2023, 1, 1))
book.save()

# Query (QuerySets are lazy: the SQL runs when the result is iterated)
books = Book.objects.filter(author='John Doe')

3. 数据库工具包

3.1 SQLite3 (内置)

如前面示例所示,Python通过标准库中的sqlite3模块内置了SQLite支持,无需额外安装驱动。

3.2 PyMySQL (MySQL)

import pymysql

conn = pymysql.connect(
    host='localhost',
    user='username',
    password='password',
    database='mydatabase'
)

try:
    # PyMySQL cursors are context managers and are closed on exit
    with conn.cursor() as cursor:
        # Perform operations...
        pass
finally:
    # Release the connection even if an operation raised
    conn.close()

3.3 psycopg2 (PostgreSQL)

用法见前面1.3节的PostgreSQL示例。

4. NoSQL数据库

4.1 MongoDB (PyMongo)

from pymongo import MongoClient

# Connect to MongoDB; leaving the with block closes the client
with MongoClient('mongodb://localhost:27017/') as client:
    # Select the database
    db = client['mydatabase']

    # Select the collection (roughly analogous to a table)
    collection = db['customers']

    # Insert a document (insert_one adds an '_id' key to the dict if missing)
    customer = {"name": "John", "address": "Highway 37"}
    collection.insert_one(customer)

    # Query
    for x in collection.find():
        print(x)

4.2 Redis (redis-py)

import redis

# decode_responses=True makes reads return str instead of raw bytes
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Set a key
r.set('foo', 'bar')

# Get the value
value = r.get('foo')
print(value)  # 'bar' (without decode_responses this would be b'bar')

5. 数据库最佳实践

  1. 使用连接池:对于频繁的数据库操作,使用连接池提高性能

  2. 参数化查询:防止SQL注入

  3. 异常处理:妥善处理数据库操作中的异常

  4. 上下文管理器:使用with语句自动管理事务和资源释放(注意:sqlite3连接对象的with语句只负责提交或回滚事务,并不会关闭连接,需要配合contextlib.closing使用)

  5. ORM选择:根据项目规模选择合适的ORM工具

使用上下文管理器示例

import sqlite3
from contextlib import closing

# A sqlite3 connection's own `with` block only commits on success or rolls
# back on error -- it does NOT close the connection. closing() does.
with closing(sqlite3.connect('example.db')) as conn:
    with conn:  # transaction scope: commit / rollback
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM stocks")
        print(cursor.fetchall())
# No explicit conn.close() needed: closing() called it on exit

参数化查询示例

# Wrong way (vulnerable to SQL injection): user input is spliced into the SQL text
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")

# Right way (parameterized query): the driver sends user_input as a bound value.
# The placeholder style depends on the driver: sqlite3 uses ?, MySQL/PostgreSQL drivers use %s.
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))

Python数据库高阶编程

掌握了基础数据库操作后,下面介绍一些Python数据库编程的高级技巧和实践。

1. 高级连接管理

1.1 连接池技术

# Connection pool with SQLAlchemy
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:password@localhost/mydb',
    poolclass=QueuePool,   # explicit here; QueuePool is already the default for PostgreSQL
    pool_size=5,           # connections kept open in the pool
    max_overflow=10,       # extra connections allowed beyond pool_size under load
    pool_timeout=30,       # seconds to wait for a free connection before giving up
    pool_recycle=3600      # replace connections older than this many seconds on checkout
)

# Connection pool with psycopg2. SimpleConnectionPool must not be shared
# across threads; use pool.ThreadedConnectionPool for multi-threaded code.
from psycopg2 import pool

connection_pool = pool.SimpleConnectionPool(
    minconn=1,
    maxconn=10,
    host='localhost',
    database='mydb',
    user='user',
    password='password'
)

# Borrow a connection from the pool
conn = connection_pool.getconn()
try:
    # Use the connection...
    pass
finally:
    # Always give the connection back; otherwise an exception leaks it from the pool
    connection_pool.putconn(conn)

# On application shutdown, close every pooled connection:
# connection_pool.closeall()

1.2 异步数据库访问

# Async MySQL client: aiomysql
import asyncio
import aiomysql


async def execute_query():
    """Connect, run one SELECT, print the rows, and always close the connection."""
    conn = await aiomysql.connect(
        host='localhost',
        port=3306,
        user='user',
        password='password',
        db='mydb'
    )
    try:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM users")
            result = await cursor.fetchall()
            print(result)
    finally:
        # Close even if the query raises so the connection is not leaked
        conn.close()

asyncio.run(execute_query())

# Async PostgreSQL client: asyncpg
import asyncio

import asyncpg


async def run():
    """Connect, fetch all users, print them, and always close the connection."""
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost'
    )
    try:
        values = await conn.fetch('SELECT * FROM users')
        print(values)
    finally:
        # Close even if the query raises so the connection is not leaked
        await conn.close()

asyncio.run(run())

2. 高级ORM技巧

2.1 SQLAlchemy高级查询

from sqlalchemy import func, case, and_, or_

# NOTE: apart from the .scalar() call, each expression below only builds a
# Query; call .all()/.first() (or iterate it) to actually run the SQL.

# Aggregate function
session.query(func.count(User.id)).scalar()

# Conditional expression. Since SQLAlchemy 1.4 the WHEN clauses are passed to
# case() as positional (condition, value) tuples; the old list form is
# deprecated and no longer accepted in 2.0.
session.query(
    User.name,
    case(
        (User.age < 18, '未成年'),
        (User.age >= 18, '成年'),
        else_='未知'  # only reached when age is NULL
    ).label('age_group')
)

# Compound condition: name starts with 'A', OR age between 18 and 30
session.query(User).filter(
    or_(
        User.name.like('A%'),
        and_(
            User.age >= 18,
            User.age <= 30
        )
    )
)

# Correlated scalar subquery: number of orders for each user.
# scalar_subquery() replaces as_scalar(), which is deprecated since 1.4.
subq = session.query(
    func.count(Order.id).label('order_count')
).filter(
    Order.user_id == User.id
).correlate(User).scalar_subquery()

session.query(User, subq)

2.2 批量操作优化

from sqlalchemy import text

# Bulk insert - raw SQL. The string must be wrapped in text() (required in
# SQLAlchemy 2.0); passing a list of dicts makes the Connection run it as an
# executemany. Assumes conn is a SQLAlchemy Connection: commit it afterwards.
data = [{'name': f'user{i}', 'age': i} for i in range(1000)]
conn.execute(
    text("INSERT INTO users (name, age) VALUES (:name, :age)"),
    data
)

# ORM bulk insert (bypasses most of the unit-of-work bookkeeping)
session.bulk_insert_mappings(User, data)

# Bulk update: every mapping must contain the primary key
session.bulk_update_mappings(User, [
    {'id': 1, 'name': 'new name1'},
    {'id': 2, 'name': 'new name2'}
])

# The bulk_* methods do not commit on their own
session.commit()

# DB-API executemany (qmark ? placeholders, as used by sqlite3)
cursor.executemany(
    "INSERT INTO users (name, age) VALUES (?, ?)",
    [('user1', 20), ('user2', 25)]
)

3. 数据库迁移与版本控制

3.1 Alembic (SQLAlchemy迁移工具)

# 初始化迁移环境
# alembic init migrations

# 配置alembic.ini
# sqlalchemy.url = driver://user:pass@localhost/dbname

# 生成迁移脚本
# alembic revision --autogenerate -m "create user table"

# 应用迁移
# alembic upgrade head

# 回滚迁移
# alembic downgrade -1

3.2 Django迁移

# 创建迁移文件
# python manage.py makemigrations

# 应用迁移
# python manage.py migrate

# 查看迁移SQL
# python manage.py sqlmigrate app_name migration_number

4. 高级事务管理

4.1 嵌套事务与保存点

import sqlite3

from sqlalchemy.exc import IntegrityError

# SQLAlchemy savepoints: begin_nested() emits SAVEPOINT. As a context manager
# it releases the savepoint on success, or rolls back to it on error and
# re-raises the exception.
with session.begin():
    session.add(User(name='user1'))

    try:
        with session.begin_nested():
            session.add(User(name='user2'))
    except IntegrityError:
        # Only the work inside the savepoint (user2) is undone; user1 is still
        # committed when the outer session.begin() block exits.
        print("user2 skipped: constraint violation")

# Raw SQL savepoints (SQLite example); assumes conn is a sqlite3.Connection
conn.execute("SAVEPOINT my_savepoint")
try:
    conn.execute("INSERT INTO users (name) VALUES ('test')")
except sqlite3.Error as exc:
    # Undo everything since the savepoint, then drop the savepoint itself so
    # it does not linger on the savepoint stack. Report instead of silently
    # swallowing the error.
    conn.execute("ROLLBACK TO SAVEPOINT my_savepoint")
    conn.execute("RELEASE SAVEPOINT my_savepoint")
    print(f"insert rolled back to savepoint: {exc}")
else:
    conn.execute("RELEASE SAVEPOINT my_savepoint")

4.2 分布式事务

# Two-phase commit (XA-style transaction) across two databases
from sqlalchemy import create_engine
from sqlalchemy.sql import text

# Configure one engine per database
engine1 = create_engine('postgresql://user:pwd@host1/db1')
engine2 = create_engine('mysql://user:pwd@host2/db2')

# Both connections are closed (rolling back anything unfinished) when the
# with block exits.
with engine1.connect() as conn1, engine2.connect() as conn2:
    # begin_twophase() starts a transaction that can be PREPAREd. Plain
    # begin() followed by two commit() calls is NOT atomic: if the second
    # commit fails, the first database has already committed.
    # (PostgreSQL must be configured with max_prepared_transactions > 0.)
    trans1 = conn1.begin_twophase()
    trans2 = conn2.begin_twophase()
    try:
        # Cross-database operations
        conn1.execute(text("UPDATE accounts SET balance = balance - 100 WHERE id = 1"))
        conn2.execute(text("UPDATE accounts SET balance = balance + 100 WHERE id = 2"))

        # Phase 1: each database durably promises that it can commit
        trans1.prepare()
        trans2.prepare()
    except Exception:
        # Something failed before both sides were prepared: undo both
        trans1.rollback()
        trans2.rollback()
        raise

    # Phase 2: commit both. A crash between these calls leaves a prepared
    # ("in-doubt") transaction that a transaction coordinator must resolve.
    trans1.commit()
    trans2.commit()

5. 性能优化技巧

5.1 查询优化

# Optimize with a JOIN
session.query(User).join(Order).filter(Order.amount > 100)

# Optimize with an IN subquery: pass the Query itself. Handing in_() a
# .subquery() object makes SQLAlchemy 1.4+ warn about implicit coercion.
subq = session.query(Order.user_id).filter(Order.amount > 100)
session.query(User).filter(User.id.in_(subq))

# Use EXISTS
session.query(User).filter(
    session.query(Order)
    .filter(Order.user_id == User.id)
    .exists()
)

# Index hint (SQL Server example). dialect_name='mssql' emits the hint only
# when compiling for SQL Server, so the query still works on other backends.
session.query(User).with_hint(User, 'WITH (INDEX(ix_user_name))', dialect_name='mssql')

5.2 批量操作与流式处理

# Stream a large table in fixed-size batches
def stream_users(batch_size=1000):
    """Yield lists of up to ``batch_size`` User rows in ascending ``id`` order.

    Keyset pagination: each batch resumes after the last id already yielded,
    so no OFFSET is needed. Reads the module-level ``session``.
    """
    last_seen_id = 0
    while True:
        batch = (
            session.query(User)
            .filter(User.id > last_seen_id)
            .order_by(User.id)
            .limit(batch_size)
            .all()
        )
        if len(batch) == 0:
            return
        yield batch
        last_seen_id = batch[-1].id

for user_batch in stream_users():
    process_users(user_batch)

# Server-side (named) cursor with psycopg2: rows are fetched from the server
# in chunks instead of loading the whole result set into client memory.
conn = psycopg2.connect("...")
try:
    # `with conn` wraps the work in a transaction (named cursors need one);
    # `with conn.cursor(...)` closes the server-side cursor on exit.
    with conn, conn.cursor('server_side_cursor') as cursor:
        cursor.execute("SELECT * FROM large_table")
        while True:
            rows = cursor.fetchmany(1000)
            if not rows:
                break
            process_rows(rows)
finally:
    conn.close()

6. 数据库监控与分析

6.1 SQL日志与分析

# SQLAlchemy logging: INFO on 'sqlalchemy.engine' logs every emitted statement
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Analyze a query with EXPLAIN (PostgreSQL syntax). Note that EXPLAIN ANALYZE
# actually executes the statement.
from sqlalchemy import text
explain_stmt = text("EXPLAIN ANALYZE SELECT * FROM users WHERE age > 18")
result = session.execute(explain_stmt)
print(result.fetchall())

# Django query inspection (connection.queries is only recorded when DEBUG=True)
from django.db import connection
# QuerySets are lazy: evaluate it, otherwise no SQL is sent and nothing is recorded
list(User.objects.filter(age__gt=18))
print(connection.queries)

6.2 性能监控

# Monitor query timing with SQLAlchemy events (listening on the Engine class
# applies the hooks to every engine)
import time

from sqlalchemy import event
from sqlalchemy.engine import Engine

SLOW_QUERY_THRESHOLD = 0.5  # seconds; slower statements are reported


@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Stamp the execution context with a start time before the statement runs."""
    # perf_counter() is monotonic, so durations are not skewed by clock changes
    context._query_start_time = time.perf_counter()


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Report statements whose elapsed time exceeds SLOW_QUERY_THRESHOLD."""
    duration = time.perf_counter() - context._query_start_time
    if duration > SLOW_QUERY_THRESHOLD:  # record slow queries
        print(f"Slow query ({duration:.2f}s): {statement}")

7. 多数据库与分片

7.1 多数据库路由(Django示例)

# settings.py
# Router classes Django consults, in list order, when choosing a database
DATABASE_ROUTERS = ['path.to.MyRouter']
# Each {...} is a placeholder for a full connection dict (ENGINE, NAME, HOST, ...)
DATABASES = {
    'default': {...},
    'replica': {...},
    'analytics': {...}
}

# routers.py
import random


class MyRouter:
    """Primary/replica router: writes go to 'default', most reads to 'replica'."""

    # Fraction of reads sent to the replica; the rest go to the primary
    replica_read_ratio = 0.8
    # Databases that hold the same (replicated) data
    replicated_dbs = {'default', 'replica'}

    def db_for_read(self, model, **hints):
        return 'replica' if random.random() < self.replica_read_ratio else 'default'

    def db_for_write(self, model, **hints):
        return 'default'

    def allow_relation(self, obj1, obj2, **hints):
        # Relations are only safe between databases holding the same data;
        # return None (no opinion) for anything else, e.g. 'analytics'.
        if obj1._state.db in self.replicated_dbs and obj2._state.db in self.replicated_dbs:
            return True
        return None

7.2 数据库分片

# Simple modulo sharding strategy
def get_shard(user_id, num_shards=4):
    """Return the shard name for ``user_id``, e.g. ``'shard_3'``.

    ``num_shards`` defaults to 4, the shard count this example assumes.
    """
    return f'shard_{user_id % num_shards}'

class UserSharded:
    """Route queries to a per-shard session chosen from the user id."""

    @classmethod
    def get_session(cls, user_id=None):
        # `is not None` so that user_id 0 maps to shard_0 instead of falling
        # through to 'default' (0 is falsy).
        shard = get_shard(user_id) if user_id is not None else 'default'
        # assumes `sessions` is a module-level dict of shard name -> Session
        return sessions[shard]

    @classmethod
    def query(cls, user_id):
        # NOTE(review): Session.query(cls) requires cls to be an ORM-mapped
        # class; confirm UserSharded is mapped (or query the mapped User).
        return cls.get_session(user_id).query(cls)

# Usage
user = UserSharded.query(123).filter_by(id=123).first()

8. 安全最佳实践

8.1 参数化查询与防注入

# Unsafe: user input is spliced into the SQL text
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")

# Safe - parameterized query (%s is the placeholder style of MySQL/PostgreSQL drivers)
cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))

# Safe in SQLAlchemy: the compared value is sent as a bound parameter
session.query(User).filter(User.name == user_input)

# in_() also binds its values, but the input should still be validated
# Unvalidated: arbitrary strings are passed through to the database
session.query(User).filter(User.id.in_(user_input.split(',')))

# Validated: keep only integer ids. strip() tolerates "1, 2"; isdecimal()
# (unlike isdigit()) accepts only characters int() can parse, so input such
# as '²' is skipped instead of raising ValueError.
parts = (part.strip() for part in user_input.split(','))
ids = [int(part) for part in parts if part.isdecimal()]
session.query(User).filter(User.id.in_(ids))

8.2 敏感数据加密

# Encrypt a column through a SQLAlchemy hybrid property
import os

from sqlalchemy.ext.hybrid import hybrid_property
from cryptography.fernet import Fernet

# Load a persistent key: generate it once with Fernet.generate_key() and keep
# it in a secret store. Generating a fresh key at import time would make every
# value written by an earlier process undecryptable.
key = os.environ['FERNET_KEY']
cipher_suite = Fernet(key)


class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    # Ciphertext (Fernet token) stored in the 'email' column
    _email = Column('email', String)

    # NOTE: Fernet tokens are randomized, so `email` only works on instances;
    # it cannot be compared against in SQL filters.
    @hybrid_property
    def email(self):
        """Decrypted email address, or None when no email is stored."""
        if self._email is None:
            return None
        return cipher_suite.decrypt(self._email.encode()).decode()

    @email.setter
    def email(self, value):
        """Encrypt ``value`` before storing it; assigning None clears the column."""
        self._email = None if value is None else cipher_suite.encrypt(value.encode()).decode()

这些高级技巧可以帮助你构建更高效、更安全、更可靠的Python数据库应用程序。Python的数据库生态系统非常丰富,可以根据项目需求选择合适的数据库和工具库。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐