python编程:数据库编程
Python数据库编程涉及使用Python语言与各种数据库系统进行交互,执行如创建、读取、更新和删除(CRUD)等操作。Python支持多种数据库,包括各种主流数据库系统:关系型数据库(如MySQL、PostgreSQL、SQLite)和非关系型数据库(如MongoDB、Redis)
·
Python数据库编程
Python数据库编程涉及使用Python语言与各种数据库系统进行交互,执行如创建、读取、更新和删除(CRUD)等操作。Python支持多种数据库,包括各种主流数据库系统:关系型数据库(如MySQL、PostgreSQL、SQLite)和非关系型数据库(如MongoDB、Redis)
1. 常用数据库连接方式
1.1 SQLite (内置数据库)
import sqlite3
# 连接数据库(不存在则创建)
conn = sqlite3.connect('example.db')
# 创建游标
cursor = conn.cursor()
# 执行SQL语句
cursor.execute('''CREATE TABLE IF NOT EXISTS stocks
(date text, trans text, symbol text, qty real, price real)''')
# 插入数据
cursor.execute("INSERT INTO stocks VALUES ('2023-01-01','BUY','RHAT',100,35.14)")
# 提交事务
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM stocks")
print(cursor.fetchall())
# 关闭连接
conn.close()
1.2 MySQL
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host="localhost",
user="username",
password="password",
database="mydatabase"
)
cursor = conn.cursor()
# 执行SQL操作...
cursor.execute("SELECT * FROM customers")
result = cursor.fetchall()
conn.close()
1.3 PostgreSQL
import psycopg2
conn = psycopg2.connect(
host="localhost",
database="mydatabase",
user="username",
password="password"
)
cursor = conn.cursor()
# 执行操作...
conn.close()
1.4 SQL Server
import pyodbc
conn = pyodbc.connect(
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=localhost;'
'DATABASE=mydatabase;'
'UID=username;'
'PWD=password'
)
cursor = conn.cursor()
# 执行操作...
conn.close()
2. ORM (对象关系映射)
2.1 SQLAlchemy
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
# 连接数据库
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 添加数据
new_user = User(name='Alice', age=25)
session.add(new_user)
session.commit()
# 查询数据
users = session.query(User).filter_by(name='Alice').all()
for user in users:
print(user.id, user.name, user.age)
session.close()
2.2 Django ORM
# Django models.py示例
from django.db import models
class Book(models.Model):
title = models.CharField(max_length=100)
author = models.CharField(max_length=100)
published_date = models.DateField()
# 使用示例
book = Book(title='Python编程', author='John Doe', published_date='2023-01-01')
book.save()
# 查询
books = Book.objects.filter(author='John Doe')
3. 数据库工具包
3.1 SQLite3 (内置)
如前面示例所示,Python内置了SQLite支持。
3.2 PyMySQL (MySQL)
import pymysql
conn = pymysql.connect(
host='localhost',
user='username',
password='password',
database='mydatabase'
)
cursor = conn.cursor()
# 执行操作...
conn.close()
3.3 psycopg2 (PostgreSQL)
如前面PostgreSQL示例所示。
4. NoSQL数据库
4.1 MongoDB (PyMongo)
from pymongo import MongoClient
# 连接MongoDB
client = MongoClient('mongodb://localhost:27017/')
# 选择数据库
db = client['mydatabase']
# 选择集合(类似于表)
collection = db['customers']
# 插入文档
customer = {"name": "John", "address": "Highway 37"}
collection.insert_one(customer)
# 查询
for x in collection.find():
print(x)
4.2 Redis (redis-py)
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 设置键值
r.set('foo', 'bar')
# 获取值
value = r.get('foo')
print(value)
5. 数据库最佳实践
-
使用连接池:对于频繁的数据库操作,使用连接池提高性能
-
参数化查询:防止SQL注入
-
异常处理:妥善处理数据库操作中的异常
-
上下文管理器:使用
with语句自动管理连接 -
ORM选择:根据项目规模选择合适的ORM工具
使用上下文管理器示例
import sqlite3
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM stocks")
print(cursor.fetchall())
# 不需要显式调用conn.close()
参数化查询示例
# 错误方式(易受SQL注入攻击)
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
# 正确方式(参数化查询)
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
Python数据库高阶编程
掌握了基础数据库操作后,下面介绍一些Python数据库编程的高级技巧和实践。
1. 高级连接管理
1.1 连接池技术
# 使用SQLAlchemy连接池
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:password@localhost/mydb',
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_timeout=30,
pool_recycle=3600
)
# 使用psycopg2连接池
from psycopg2 import pool
connection_pool = pool.SimpleConnectionPool(
minconn=1,
maxconn=10,
host='localhost',
database='mydb',
user='user',
password='password'
)
# 从池中获取连接
conn = connection_pool.getconn()
# 使用连接...
# 归还连接到池
connection_pool.putconn(conn)
1.2 异步数据库访问
# 使用aiomysql异步MySQL客户端
import asyncio
import aiomysql
async def execute_query():
conn = await aiomysql.connect(
host='localhost',
port=3306,
user='user',
password='password',
db='mydb'
)
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM users")
result = await cursor.fetchall()
print(result)
conn.close()
asyncio.run(execute_query())
# 使用asyncpg异步PostgreSQL客户端
import asyncpg
async def run():
conn = await asyncpg.connect(
user='user',
password='password',
database='mydb',
host='localhost'
)
values = await conn.fetch('SELECT * FROM users')
print(values)
await conn.close()
asyncio.run(run())
2. 高级ORM技巧
2.1 SQLAlchemy高级查询
from sqlalchemy import func, case, and_, or_
# 聚合函数
session.query(func.count(User.id)).scalar()
# 条件表达式
session.query(
User.name,
case(
[(User.age < 18, '未成年'),
(User.age >= 18, '成年')],
else_='未知'
).label('age_group')
)
# 复杂条件查询
session.query(User).filter(
or_(
User.name.like('A%'),
and_(
User.age >= 18,
User.age <= 30
)
)
)
# 子查询
subq = session.query(
func.count(Order.id).label('order_count')
).filter(
Order.user_id == User.id
).correlate(User).as_scalar()
session.query(User, subq)
2.2 批量操作优化
# 批量插入 - 原生SQL方式
data = [{'name': f'user{i}', 'age': i} for i in range(1000)]
conn.execute(
"INSERT INTO users (name, age) VALUES (:name, :age)",
data
)
# SQLAlchemy批量插入
session.bulk_insert_mappings(User, data)
# 批量更新
session.bulk_update_mappings(User, [
{'id': 1, 'name': 'new name1'},
{'id': 2, 'name': 'new name2'}
])
# 使用executemany
cursor.executemany(
"INSERT INTO users (name, age) VALUES (?, ?)",
[('user1', 20), ('user2', 25)]
)
3. 数据库迁移与版本控制
3.1 Alembic (SQLAlchemy迁移工具)
# 初始化迁移环境
# alembic init migrations
# 配置alembic.ini
# sqlalchemy.url = driver://user:pass@localhost/dbname
# 生成迁移脚本
# alembic revision --autogenerate -m "create user table"
# 应用迁移
# alembic upgrade head
# 回滚迁移
# alembic downgrade -1
3.2 Django迁移
# 创建迁移文件
# python manage.py makemigrations
# 应用迁移
# python manage.py migrate
# 查看迁移SQL
# python manage.py sqlmigrate app_name migration_number
4. 高级事务管理
4.1 嵌套事务与保存点
# SQLAlchemy保存点
with session.begin():
session.add(User(name='user1'))
# 创建保存点
savepoint = session.begin_nested()
try:
session.add(User(name='user2'))
savepoint.commit()
except:
savepoint.rollback()
raise
# 使用原生SQL保存点(SQLite示例)
conn.execute("SAVEPOINT my_savepoint")
try:
conn.execute("INSERT INTO users (name) VALUES ('test')")
conn.execute("RELEASE SAVEPOINT my_savepoint")
except:
conn.execute("ROLLBACK TO SAVEPOINT my_savepoint")
4.2 分布式事务
# 使用两阶段提交(XA事务)
from sqlalchemy import create_engine
from sqlalchemy.sql import text
# 配置多个数据库引擎
engine1 = create_engine('postgresql://user:pwd@host1/db1')
engine2 = create_engine('mysql://user:pwd@host2/db2')
# 获取连接
conn1 = engine1.connect()
conn2 = engine2.connect()
# 开始分布式事务
try:
trans1 = conn1.begin()
trans2 = conn2.begin()
# 执行跨库操作
conn1.execute(text("UPDATE accounts SET balance = balance - 100 WHERE id = 1"))
conn2.execute(text("UPDATE accounts SET balance = balance + 100 WHERE id = 2"))
# 提交
trans1.commit()
trans2.commit()
except:
trans1.rollback()
trans2.rollback()
raise
finally:
conn1.close()
conn2.close()
5. 性能优化技巧
5.1 查询优化
# 使用JOIN优化
session.query(User).join(Order).filter(Order.amount > 100)
# 使用子查询优化
subq = session.query(Order.user_id).filter(Order.amount > 100).subquery()
session.query(User).filter(User.id.in_(subq))
# 使用EXISTS
session.query(User).filter(
session.query(Order)
.filter(Order.user_id == User.id)
.exists()
)
# 使用索引提示(SQL Server示例)
session.query(User).with_hint(User, 'WITH (INDEX(ix_user_name))')
5.2 批量操作与流式处理
# 流式处理大量数据
def stream_users(batch_size=1000):
start_id = 0
while True:
users = session.query(User)\
.filter(User.id > start_id)\
.order_by(User.id)\
.limit(batch_size)\
.all()
if not users:
break
yield users
start_id = users[-1].id
for user_batch in stream_users():
process_users(user_batch)
# 使用服务器端游标(PostgreSQL)
conn = psycopg2.connect("...")
cursor = conn.cursor('server_side_cursor')
cursor.execute("SELECT * FROM large_table")
while True:
rows = cursor.fetchmany(1000)
if not rows:
break
process_rows(rows)
6. 数据库监控与分析
6.1 SQL日志与分析
# SQLAlchemy日志配置
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
# 使用EXPLAIN分析查询
explain_stmt = text("EXPLAIN ANALYZE SELECT * FROM users WHERE age > 18")
result = session.execute(explain_stmt)
print(result.fetchall())
# Django查询分析
from django.db import connection
User.objects.filter(age__gt=18)
print(connection.queries)
6.2 性能监控
# 使用SQLAlchemy事件监控
from sqlalchemy import event
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
duration = time.time() - context._query_start_time
if duration > 0.5: # 记录慢查询
print(f"Slow query ({duration:.2f}s): {statement}")
7. 多数据库与分片
7.1 多数据库路由(Django示例)
# settings.py
DATABASE_ROUTERS = ['path.to.MyRouter']
DATABASES = {
'default': {...},
'replica': {...},
'analytics': {...}
}
# routers.py
class MyRouter:
def db_for_read(self, model, **hints):
return 'replica' if random.random() < 0.8 else 'default'
def db_for_write(self, model, **hints):
return 'default'
def allow_relation(self, obj1, obj2, **hints):
return True
7.2 数据库分片
# 简单分片策略
def get_shard(user_id):
return f'shard_{user_id % 4}' # 假设有4个分片
class UserSharded:
@classmethod
def get_session(cls, user_id=None):
shard = get_shard(user_id) if user_id else 'default'
return sessions[shard]
@classmethod
def query(cls, user_id):
return cls.get_session(user_id).query(cls)
# 使用
user = UserSharded.query(123).filter_by(id=123).first()
8. 安全最佳实践
8.1 参数化查询与防注入
# 不安全的方式
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
# 安全的方式 - 使用参数化查询
cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))
# SQLAlchemy安全方式
session.query(User).filter(User.name == user_input)
# 使用ORM的in_操作也要注意安全
# 不安全
session.query(User).filter(User.id.in_(user_input.split(',')))
# 安全方式
ids = [int(id) for id in user_input.split(',') if id.isdigit()]
session.query(User).filter(User.id.in_(ids))
8.2 敏感数据加密
# 使用SQLAlchemy混合属性加密字段
from sqlalchemy.ext.hybrid import hybrid_property
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher_suite = Fernet(key)
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
_email = Column('email', String)
@hybrid_property
def email(self):
return cipher_suite.decrypt(self._email.encode()).decode()
@email.setter
def email(self, value):
self._email = cipher_suite.encrypt(value.encode()).decode()
这些高级技巧可以帮助你构建更高效、更安全、更可靠的Python数据库应用程序。Python的数据库生态系统非常丰富,可以根据项目需求选择合适的数据库和工具库。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)