路由

路由时url地址和处理函数之间的映射关系

@app.get("/hello")
async def read_hello():
    return {"msg": 'hello world'}

路径参数

指向唯一的,特定的资源 。 

方法:get

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}

参数限制 Path(from fastapi import FastAPI,Path)

Path 的一些参数

description 对参数的描述

lt 参数需要小于的值

gt 参数需要大于的值

max_length 字符串最长长度

min_length 字符串最短长度

@app.get("/items/{item_id}")
async def read_item(item_id: int = Path(...,gt = 0,lt = 101,description='范围为1到100')):
    return {"item_id": item_id}
@app.get("/items/{name}")
async def read_item(name: str = Path(...,max_length=100,min_length=10)):
    return {"name": f'这是{name}'}

查询参数

查询参数就是附加在网址末尾的条件指令,用来精确指挥服务器帮你实现数据的搜索、过滤和翻页。

@app.get("/news/news_list")
async def get_news_list(skip: int = 0,limit:int = 10):
    return {"skip": skip, "limit": limit}

对于限制为 Query 其参数与Path 一样

@app.get("/news/news_list")
async def get_news_list(skip: int = Query(0, description='跳过的记录记录数'),limit:int = 10):
    return {"skip": skip, "limit": limit}

请求体参数(from pydantic import BaseModel)

方法 Post

请求体就是藏在网络请求内部的“数据包裹”,专门用来安全地向服务器提交大量、复杂或私密的信息(如账号密码、长文本和文件)

通过基础BaseModel来定义想要的值

class User(BaseModel):
    username: str
    password: str
@app.post("/register")
async def register_user(user: User):
    return {"username": user.username, "password": user.password}

限制方法 Field

from fastapi import FastAPI,Path,Request,Query
from pydantic import BaseModel,Field
class User(BaseModel):
    username: str = Field(default= '张三',min_length=2,max_length=10,description='你的名字')
    password: str = Field(min_length= 3,max_length= 12)
@app.post("/register")
async def register_user(user: User):
    return {"username": user.username, "password": user.password}

响应类型

FastAPI 默认返回json格式

指定响应类

from fastapi.responses import HTMLResponse
@app.get("/html",response_class=HTMLResponse)
def read_html():
    return '<h1>这是1级标题<h1>'
from fastapi.responses import FileResponse
@app.get("/files")
async def get_files():
    path = './files/test.txt'
    return FileResponse(path = path)

自定义响应数据格式

使用继承BaseModel进行包装

class News(BaseModel):
    id :int
    name: str
    content: str
@app.post("/news/{id}",response_model=News)
async def create_news(id:int):
    return {
        "id":id,
        'name':f"这是第{id}本书",
        'content':f"这是一本好书"
    }

中间件

作用:为每个请求添加统一的处理逻辑

多个中间件执行逻辑为自下而上

async def middleware(request: Request,call_next):
    print("中间件开始1")
    response = await call_next(request)
    print("中间件结束1")
    return response
@app.middleware("http")
async def middleware2(request: Request,call_next):
    print("中间件开始2")
    response = await call_next(request)
    print("中间件结束2")
    return response
@app.middleware("http")
async def middleware3(request: Request,call_next):
    print("中间件开始3")
    response = await call_next(request)
    print("中间件结束3")
    return response

依赖注入

抽取可复用的组件,实现代码复用,解耦且可轻松替换依赖项进行测试

from fastapi import Depends
async def common_parameters(
        skip: int = Query(0, ge=0),
        limit: int = Query(10, ge=60),
):
    return {"skip": skip, "limit": limit}
@app.get("/news/news_list")
async def get_news_list(commons = Depends(common_parameters)):
    return commons

ORM

定义模型类

基类 继承DeclarativeBase

创建数据库表

从链接池获取异步链接,开启事务,执行ORM操作

FastAPI 应用是开始建表

import datetime
from fastapi import FastAPI,Path,Request,Query
from pydantic import BaseModel,Field
from starlette.responses import HTMLResponse
#
from sqlalchemy.ext.asyncio import create_async_engine
ASYNC_DATABASE_URL = "postgresql+asyncpg://john:jxp317777@localhost/Fast"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo = True,
    pool_size = 10,
    max_overflow = 5,
)
# 定义模型类
from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column
from sqlalchemy import DateTime,func,String,Float
from datetime import datetime
class Base(DeclarativeBase):
    create_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now(),comment='创建时间')
    update_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now(),onupdate=func.now(),comment="更新时间")
class Book(Base):
    __tablename__ = "book"
    id:Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
    bookname: Mapped[str] = mapped_column(String(255),comment="书名")
    author: Mapped[str] = mapped_column(String(255),comment="作者")
    price: Mapped[float] = mapped_column(Float,comment="价格")
    publisher:Mapped[str] = mapped_column(String(255),comment='出版社')
# 建表
from contextlib import asynccontextmanager
async def create_table():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await create_table()
    app.state.engine = async_engine
    yield
    await async_engine.dispose()
#  初始化 FastAPI 应用
app = FastAPI(
    title="我的简易 API",
    description="这是一个非常简单的 FastAPI 基础框架",
    version="1.0.0",
    lifespan=lifespan
)

路由注入会话

#     创造会话工厂
AsyncSessionLocal = async_sessionmaker(
    bind = async_engine,#绑定数据库引擎
    class_ = AsyncSession,#指定会话类
    expire_on_commit = False,#提交后会话不过期,不会重新查询数据库
)
async def get_database():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            raise e
        finally:
            await session.close()
#  初始化 FastAPI 应用
app = FastAPI(
    title="我的简易 API",
    description="这是一个非常简单的 FastAPI 基础框架",
    version="1.0.0",
    lifespan=lifespan
)
@app.get("/book/books")
async def get_books_list(db: AsyncSession = Depends(get_database)):
     result = await db.execute(select(Book))
     book = result.scalars().all()
     return book

ORM的数据库操作

查询

async def get_books_list(db: AsyncSession = Depends(get_database)):
     # result = await db.execute(select(Book))
     # book = result.scalars().all()
     book = await db.get(Book,5)
     return book

条件查询

async def get_books_list(book_id:int,db: AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.id == book_id))
    book = result.scalars().all()
    return book
@app.get("/book/search_book")
async def search_book(db: AsyncSession = Depends(get_database)):
    # result = await db.execute(select(Book).where(Book.author.like("曹%")))#%可以代表1个或多个字符,_代表一个字符
    # result = await db.execute(select(Book).where(Book.author.like("曹%") & Book.price > 100))
    id_list=[1,2,4,6]
    result = await db.execute(select(Book).where(Book.id.in_(id_list)))
    book = result.scalars().all()
    return book

聚合查询

聚合计算:func.方法(模型类.属性)

async def get_book_count(db: AsyncSession = Depends(get_database)):
    result =await db.execute(select(Book.count(Book.id)))
    num = result.scalars()
    return num

分页查询(这里指连续页面中每个页面需要的数据)

async def get_book_list(
        page:int = 1,
        page_size:int = 3,
        db: AsyncSession = Depends(get_database)
):
    skip = (page-1)*page_size
    stmt = select(Book).offset(skip).limit(page_size)
    result = await db.execute(stmt)
    books = result.scalars().all()
    return books

增加数据

步骤:定义ORM对象->添加对象到事物:add(对象)-> commit提交到数据库

@app.post("/book/add_book")
async def add_book(book:BookBase,db: AsyncSession = Depends(get_database)):
    book_obj = Book(**book.__dict__)
    db.add(book_obj)
    await db.commit()
    return book_obj

修改数据

步骤:查询get->属性重新赋值->commit提交到数据库

@app.put("/book/update_book/{book_id}")
async def update_book(book_id:int,data:Bookdatabase,db: AsyncSession = Depends(get_database)):
    db_book = await db.get(Book,book_id)
    if db_book is None:
        raise HTTPException(status_code=404,detail="Book not found")
    else:
        db_book.bookname = data.bookname
        db_book.author = data.author
        db_book.price = data.price
        db_book.publisher = data.publisher
        await db.commit()
    return db_book

删除数据

步骤:查询get->delete(查到的书)->commit提交到数据库

@app.delete("/book/delete_book/{book_id}")
async def delete_book(book_id:int,db: AsyncSession = Depends(get_database)):
    db_book = await db.get(Book,book_id)
    if db_book is None:
        raise HTTPException(status_code=404,detail="Book not found")
    else:
        await db.delete(db_book)
        await db.commit()
    return {"message":"Book deleted"}

Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐