连接 MySQL
安装数据库驱动
Python 安装包中没有集成 MySQL 数据库驱动,而 ORM 框架 SQLAlchemy 依赖于数据库驱动,所以第一步需要安装 MySQL 驱动。在 Python 3.x 中推荐使用 PyMySQL,其遵循 Python 数据库 API v2.0 规范,并完全使用 Python 代码实现。
pip install pymysql
示例目录结构如下:
.
├── sql_app
│ ├── __init__.py
│ ├── database.py
│ ├── models.py
│ ├── schemas.py
│ ├── crud.py
│ ├── routers.py
│ └── sql_app.db
└── main.py
database.py
连接数据库
__init__.py
import pymysql # 导入数据库驱动
pymysql.install_as_MySQLdb() # 将数据库驱动注册为 MySQLdb 模式
database.py
# 第一步,导入SQLAlchemy组件包
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 第二步,创建数据连接引擎
engine = create_engine(f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}?charset=utf8mb4')
# 第三步,创建本地会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 第四步,创建数据模型基础类
Base = declarative_base()
models.py
创建 SQLAlchemy 数据库模型
models.py
# 第一步,导入SQLAlchemy组件包
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
# 第二步,从database模块中导入基类Base
from .database import Base
# 声明User模型,继承自Base
class User(Base):
# 指定数据库中的表名
__tablename__ = "user"
# 定义类的属性,对应表中的字段
id = Column(Integer, primary_key=True, index=True)
email = Column(String(50), unique=True, index=True)
hashed_password = Column(String(50))
is_active = Column(Boolean, default=True)
# 定义一对多关系
books = relationship("Book", back_populates="owner")
# 声明Book模型,继承自Base类
class Book(Base):
# 指定数据库中对应的表名
__tablename__ = "book"
# 定义类的属性,对应表中的字段
id = Column(Integer, primary_key=True, index=True)
title = Column(String(50), index=True)
description = Column(String(200), index=True)
owner_id = Column(Integer, ForeignKey("user.id"))
# 定义关联
owner = relationship("User", back_populates="books")
schemas.py
创建 Pydantic 数据格式模型
schemas.py
# 第一步,导入相关的模块
from typing import List, Optional
from pydantic import BaseModel
# 第二步,声明BookBase模型,从BaseModel继承
class BookBase(BaseModel):
# 第三步,声明模型的属性
title: str
description: Optional[str] = None
# 第四步,声明ItemCreate模型,从ItemBase继承
class BookCreate(BookBase):
pass
# 第五步,声明Item模型,从ItemBase继承
class Book(BookBase):
id: int
owner_id: int
# 第六步,配置项中起用orm模式
class Config:
orm_mode = True
# 第七步,同样的方式声明一组User模型
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
books: List[Book] = []
# 配置项中起用orm模式
class Config:
orm_mode = True
crud.py
数据操作
crud.py
# 第一步,导入会话组件
from sqlalchemy.orm import Session
# 第二步,导入前两步定义的models和schemas模块
from . import models, schemas
# 第三步,读取数据的函数
# 读取单个用户
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
# 通过email读取单个用户
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
# 读取带分页的用户列表
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
# 读取图书列表
def get_books(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Book).offset(skip).limit(limit).all()
# 第四步,创建数据的函数
# 创建一个用户
def create_user(db: Session, user: schemas.UserCreate):
# 模拟生成密码的过程,并没有真正生成加密值
fake_hashed_password = user.password + "notreallyhashed"
# 第一步,根据数据创建数据库模型的实例
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
# 第二步,将实例添加到会话
db.add(db_user)
# 第三步,提交会话
db.commit()
# 第四步,刷新实例,用于获取数据或者生成数据库中的ID
db.refresh(db_user)
return db_user
# 创建用户相关的一本图书
def create_user_book(db: Session, book: schemas.BookCreate, user_id: int):
db_row = models.Book(**book.dict(), owner_id=user_id)
db.add(db_row)
db.commit()
db.refresh(db_row)
return db_row
# 更新图书的标题
def update_book_title(db: Session, book: schemas.Book):
db.query(models.Book).filter(models.Book.id==book.id).update({"title": book.title})
db.commit()
return 1
# 删除图书
def delete_book(db: Session, book: schemas.Book):
res = db.query(models.Book).filter(models.Book.id==book.id).delete()
print(res)
db.commit()
return 1
routers.py
FastAPI 请求函数
routers.py
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
# 导入本程序的模块
from sql_app import crud, models, schemas
from sql_app.database import SessionLocal, engine
# 生成数据库中的表
models.Base.metadata.create_all(bind=engine)
router = APIRouter()
# 创建依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 定义路径操作函数,并注册路由路径 创建用户
@router.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user)
# 定义路径操作函数,并注册路由路径 获取用户列表
@router.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
# 定义路径操作函数,并注册路由路径 获取用户信息
@router.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
# 定义路径操作函数,并注册路由路径 创建用户相关的项目
@router.post("/users/{user_id}/books/", response_model=schemas.Book)
def create_book_for_user(
user_id: int, book: schemas.BookCreate, db: Session = Depends(get_db)
):
return crud.create_user_book(db=db, book=book, user_id=user_id)
# 定义路径操作函数,并注册路由路径 获取项目列表
@router.get("/books/", response_model=List[schemas.Book])
def read_books(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
books = crud.get_books(db, skip=skip, limit=limit)
return books
# 定义路径操作函数,并注册路由路径 修改图书标题
@router.put("/books/")
def update_book_title(book: schemas.Book, db: Session = Depends(get_db)):
return crud.update_book_title(db, book)
# 定义路径操作函数,并注册路由路径 删除图书
@router.delete("/books/")
def delete_book(book: schemas.Book, db: Session = Depends(get_db)):
return crud.delete_book(db, book)
main.py
main.py
from fastapi import FastAPI
import uvicorn
import routers
app = FastAPI()
app.include_router(routers.router)
if __name__ == '__main__':
uvicorn.run(app=app)