FastAPI 学习
请求体就是藏在网络请求内部的“数据包裹”,专门用来安全地向服务器提交大量、复杂或私密的信息(如账号密码、长文本和文件)查询参数就是附加在网址末尾的条件指令,用来精确指挥服务器帮你实现数据的搜索、过滤和翻页。步骤:定义ORM对象->添加对象到事物:add(对象)-> commit提交到数据库。步骤:查询get->delete(查到的书)->commit提交到数据库。抽取可复用的组件,实现代码复用,解
路由
路由时url地址和处理函数之间的映射关系
@app.get("/hello")
async def read_hello():
return {"msg": 'hello world'}
路径参数
指向唯一的,特定的资源 。
方法:get
@app.get("/items/{item_id}")
async def read_item(item_id: int):
return {"item_id": item_id}
参数限制 Path(from fastapi import FastAPI,Path)
Path 的一些参数
description 对参数的描述
lt 参数需要小于的值
gt 参数需要大于的值
max_length 字符串最长长度
min_length 字符串最短长度
@app.get("/items/{item_id}")
async def read_item(item_id: int = Path(...,gt = 0,lt = 101,description='范围为1到100')):
return {"item_id": item_id}
@app.get("/items/{name}")
async def read_item(name: str = Path(...,max_length=100,min_length=10)):
return {"name": f'这是{name}'}
查询参数
查询参数就是附加在网址末尾的条件指令,用来精确指挥服务器帮你实现数据的搜索、过滤和翻页。
@app.get("/news/news_list")
async def get_news_list(skip: int = 0,limit:int = 10):
return {"skip": skip, "limit": limit}
对于限制为 Query 其参数与Path 一样
@app.get("/news/news_list")
async def get_news_list(skip: int = Query(0, description='跳过的记录记录数'),limit:int = 10):
return {"skip": skip, "limit": limit}
请求体参数(from pydantic import BaseModel)
方法 Post
请求体就是藏在网络请求内部的“数据包裹”,专门用来安全地向服务器提交大量、复杂或私密的信息(如账号密码、长文本和文件)
通过基础BaseModel来定义想要的值
class User(BaseModel):
username: str
password: str
@app.post("/register")
async def register_user(user: User):
return {"username": user.username, "password": user.password}
限制方法 Field
from fastapi import FastAPI,Path,Request,Query
from pydantic import BaseModel,Field
class User(BaseModel):
username: str = Field(default= '张三',min_length=2,max_length=10,description='你的名字')
password: str = Field(min_length= 3,max_length= 12)
@app.post("/register")
async def register_user(user: User):
return {"username": user.username, "password": user.password}
响应类型
FastAPI 默认返回json格式
指定响应类
from fastapi.responses import HTMLResponse
@app.get("/html",response_class=HTMLResponse)
def read_html():
return '<h1>这是1级标题<h1>'
from fastapi.responses import FileResponse
@app.get("/files")
async def get_files():
path = './files/test.txt'
return FileResponse(path = path)
自定义响应数据格式
使用继承BaseModel进行包装
class News(BaseModel):
id :int
name: str
content: str
@app.post("/news/{id}",response_model=News)
async def create_news(id:int):
return {
"id":id,
'name':f"这是第{id}本书",
'content':f"这是一本好书"
}
中间件
作用:为每个请求添加统一的处理逻辑
多个中间件执行逻辑为自下而上
async def middleware(request: Request,call_next):
print("中间件开始1")
response = await call_next(request)
print("中间件结束1")
return response
@app.middleware("http")
async def middleware2(request: Request,call_next):
print("中间件开始2")
response = await call_next(request)
print("中间件结束2")
return response
@app.middleware("http")
async def middleware3(request: Request,call_next):
print("中间件开始3")
response = await call_next(request)
print("中间件结束3")
return response
依赖注入
抽取可复用的组件,实现代码复用,解耦且可轻松替换依赖项进行测试
from fastapi import Depends
async def common_parameters(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=60),
):
return {"skip": skip, "limit": limit}
@app.get("/news/news_list")
async def get_news_list(commons = Depends(common_parameters)):
return commons
ORM
定义模型类
基类 继承DeclarativeBase
创建数据库表
从链接池获取异步链接,开启事务,执行ORM操作
FastAPI 应用是开始建表
import datetime
from fastapi import FastAPI,Path,Request,Query
from pydantic import BaseModel,Field
from starlette.responses import HTMLResponse
#
from sqlalchemy.ext.asyncio import create_async_engine
ASYNC_DATABASE_URL = "postgresql+asyncpg://john:jxp317777@localhost/Fast"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo = True,
pool_size = 10,
max_overflow = 5,
)
# 定义模型类
from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column
from sqlalchemy import DateTime,func,String,Float
from datetime import datetime
class Base(DeclarativeBase):
create_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now(),comment='创建时间')
update_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now(),onupdate=func.now(),comment="更新时间")
class Book(Base):
__tablename__ = "book"
id:Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
bookname: Mapped[str] = mapped_column(String(255),comment="书名")
author: Mapped[str] = mapped_column(String(255),comment="作者")
price: Mapped[float] = mapped_column(Float,comment="价格")
publisher:Mapped[str] = mapped_column(String(255),comment='出版社')
# 建表
from contextlib import asynccontextmanager
async def create_table():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@asynccontextmanager
async def lifespan(app: FastAPI):
await create_table()
app.state.engine = async_engine
yield
await async_engine.dispose()
# 初始化 FastAPI 应用
app = FastAPI(
title="我的简易 API",
description="这是一个非常简单的 FastAPI 基础框架",
version="1.0.0",
lifespan=lifespan
)
路由注入会话
# 创造会话工厂
AsyncSessionLocal = async_sessionmaker(
bind = async_engine,#绑定数据库引擎
class_ = AsyncSession,#指定会话类
expire_on_commit = False,#提交后会话不过期,不会重新查询数据库
)
async def get_database():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
# 初始化 FastAPI 应用
app = FastAPI(
title="我的简易 API",
description="这是一个非常简单的 FastAPI 基础框架",
version="1.0.0",
lifespan=lifespan
)
@app.get("/book/books")
async def get_books_list(db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book))
book = result.scalars().all()
return book
ORM的数据库操作
查询
async def get_books_list(db: AsyncSession = Depends(get_database)):
# result = await db.execute(select(Book))
# book = result.scalars().all()
book = await db.get(Book,5)
return book
条件查询
async def get_books_list(book_id:int,db: AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.id == book_id))
book = result.scalars().all()
return book
@app.get("/book/search_book")
async def search_book(db: AsyncSession = Depends(get_database)):
# result = await db.execute(select(Book).where(Book.author.like("曹%")))#%可以代表1个或多个字符,_代表一个字符
# result = await db.execute(select(Book).where(Book.author.like("曹%") & Book.price > 100))
id_list=[1,2,4,6]
result = await db.execute(select(Book).where(Book.id.in_(id_list)))
book = result.scalars().all()
return book
聚合查询
聚合计算:func.方法(模型类.属性)
async def get_book_count(db: AsyncSession = Depends(get_database)):
result =await db.execute(select(Book.count(Book.id)))
num = result.scalars()
return num
分页查询(这里指连续页面中每个页面需要的数据)
async def get_book_list(
page:int = 1,
page_size:int = 3,
db: AsyncSession = Depends(get_database)
):
skip = (page-1)*page_size
stmt = select(Book).offset(skip).limit(page_size)
result = await db.execute(stmt)
books = result.scalars().all()
return books
增加数据
步骤:定义ORM对象->添加对象到事物:add(对象)-> commit提交到数据库
@app.post("/book/add_book")
async def add_book(book:BookBase,db: AsyncSession = Depends(get_database)):
book_obj = Book(**book.__dict__)
db.add(book_obj)
await db.commit()
return book_obj
修改数据
步骤:查询get->属性重新赋值->commit提交到数据库
@app.put("/book/update_book/{book_id}")
async def update_book(book_id:int,data:Bookdatabase,db: AsyncSession = Depends(get_database)):
db_book = await db.get(Book,book_id)
if db_book is None:
raise HTTPException(status_code=404,detail="Book not found")
else:
db_book.bookname = data.bookname
db_book.author = data.author
db_book.price = data.price
db_book.publisher = data.publisher
await db.commit()
return db_book
删除数据
步骤:查询get->delete(查到的书)->commit提交到数据库
@app.delete("/book/delete_book/{book_id}")
async def delete_book(book_id:int,db: AsyncSession = Depends(get_database)):
db_book = await db.get(Book,book_id)
if db_book is None:
raise HTTPException(status_code=404,detail="Book not found")
else:
await db.delete(db_book)
await db.commit()
return {"message":"Book deleted"}
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐



所有评论(0)