Skip to main content

连接 MySQL

安装数据库驱动

Python 安装包中没有集成 MySQL 数据库驱动,而 ORM 框架 SQLAlchemy 依赖于数据库驱动,所以第一步需要安装 MySQL 驱动。在 Python 3.x 中推荐使用 PyMySQL,其遵循 Python 数据库 API v2.0 规范,并完全使用 Python 代码实现。

pip install pymysql

示例目录结构如下:

.
├── sql_app
│ ├── __init__.py
│ ├── database.py
│ ├── models.py
│ ├── schemas.py
│ ├── crud.py
│ ├── routers.py
│ └── sql_app.db
└── main.py

database.py 连接数据库

__init__.py
import pymysql               # 导入数据库驱动
pymysql.install_as_MySQLdb() # 将数据库驱动注册为 MySQLdb 模式
database.py
# 第一步,导入SQLAlchemy组件包
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 第二步,创建数据连接引擎
engine = create_engine(f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}?charset=utf8mb4')

# 第三步,创建本地会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 第四步,创建数据模型基础类
Base = declarative_base()

models.py 创建 SQLAlchemy 数据库模型

models.py
# 第一步,导入SQLAlchemy组件包
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
# 第二步,从database模块中导入基类Base
from .database import Base

# 声明User模型,继承自Base
class User(Base):
# 指定数据库中的表名
__tablename__ = "user"
# 定义类的属性,对应表中的字段
id = Column(Integer, primary_key=True, index=True)
email = Column(String(50), unique=True, index=True)
hashed_password = Column(String(50))
is_active = Column(Boolean, default=True)
# 定义一对多关系
books = relationship("Book", back_populates="owner")

# 声明Book模型,继承自Base类
class Book(Base):
# 指定数据库中对应的表名
__tablename__ = "book"
# 定义类的属性,对应表中的字段
id = Column(Integer, primary_key=True, index=True)
title = Column(String(50), index=True)
description = Column(String(200), index=True)
owner_id = Column(Integer, ForeignKey("user.id"))
# 定义关联
owner = relationship("User", back_populates="books")

schemas.py 创建 Pydantic 数据格式模型

schemas.py
# 第一步,导入相关的模块
from typing import List, Optional
from pydantic import BaseModel

# 第二步,声明BookBase模型,从BaseModel继承
class BookBase(BaseModel):
# 第三步,声明模型的属性
title: str
description: Optional[str] = None

# 第四步,声明ItemCreate模型,从ItemBase继承
class BookCreate(BookBase):
pass
# 第五步,声明Item模型,从ItemBase继承
class Book(BookBase):
id: int
owner_id: int
# 第六步,配置项中起用orm模式
class Config:
orm_mode = True

# 第七步,同样的方式声明一组User模型
class UserBase(BaseModel):
email: str

class UserCreate(UserBase):
password: str

class User(UserBase):
id: int
is_active: bool
books: List[Book] = []
# 配置项中起用orm模式
class Config:
orm_mode = True

crud.py 数据操作

crud.py
# 第一步,导入会话组件
from sqlalchemy.orm import Session
# 第二步,导入前两步定义的models和schemas模块
from . import models, schemas

# 第三步,读取数据的函数
# 读取单个用户
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()

# 通过email读取单个用户
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()

# 读取带分页的用户列表
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()

# 读取图书列表
def get_books(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Book).offset(skip).limit(limit).all()

# 第四步,创建数据的函数
# 创建一个用户
def create_user(db: Session, user: schemas.UserCreate):
# 模拟生成密码的过程,并没有真正生成加密值
fake_hashed_password = user.password + "notreallyhashed"
# 第一步,根据数据创建数据库模型的实例
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
# 第二步,将实例添加到会话
db.add(db_user)
# 第三步,提交会话
db.commit()
# 第四步,刷新实例,用于获取数据或者生成数据库中的ID
db.refresh(db_user)
return db_user

# 创建用户相关的一本图书
def create_user_book(db: Session, book: schemas.BookCreate, user_id: int):
db_row = models.Book(**book.dict(), owner_id=user_id)
db.add(db_row)
db.commit()
db.refresh(db_row)
return db_row

# 更新图书的标题
def update_book_title(db: Session, book: schemas.Book):
db.query(models.Book).filter(models.Book.id==book.id).update({"title": book.title})
db.commit()
return 1

# 删除图书
def delete_book(db: Session, book: schemas.Book):
res = db.query(models.Book).filter(models.Book.id==book.id).delete()
print(res)
db.commit()
return 1

routers.py FastAPI 请求函数

routers.py
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
# 导入本程序的模块
from sql_app import crud, models, schemas
from sql_app.database import SessionLocal, engine
# 生成数据库中的表
models.Base.metadata.create_all(bind=engine)

router = APIRouter()

# 创建依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

# 定义路径操作函数,并注册路由路径 创建用户
@router.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)

# 定义路径操作函数,并注册路由路径 获取用户列表
@router.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users

# 定义路径操作函数,并注册路由路径 获取用户信息
@router.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user

# 定义路径操作函数,并注册路由路径 创建用户相关的项目
@router.post("/users/{user_id}/books/", response_model=schemas.Book)
def create_book_for_user(
user_id: int, book: schemas.BookCreate, db: Session = Depends(get_db)
):
return crud.create_user_book(db=db, book=book, user_id=user_id)

# 定义路径操作函数,并注册路由路径 获取项目列表
@router.get("/books/", response_model=List[schemas.Book])
def read_books(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
books = crud.get_books(db, skip=skip, limit=limit)
return books

# 定义路径操作函数,并注册路由路径 修改图书标题
@router.put("/books/")
def update_book_title(book: schemas.Book, db: Session = Depends(get_db)):
return crud.update_book_title(db, book)

# 定义路径操作函数,并注册路由路径 删除图书
@router.delete("/books/")
def delete_book(book: schemas.Book, db: Session = Depends(get_db)):
return crud.delete_book(db, book)

main.py

main.py
from fastapi import FastAPI
import uvicorn
import routers

app = FastAPI()

app.include_router(routers.router)

if __name__ == '__main__':
uvicorn.run(app=app)