Files
2025-11-20 11:42:18 +08:00

123 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Query, Body, HTTPException
from uuid import UUID
from .schema import Create, Update, Out, OutList
from ..models import Food
from utils.decorators import handle_exceptions_unified
from utils.time_tool import parse_time
from utils.out_base import CommonOut
app = APIRouter()
# 创建食物
@app.post("", response_model=Out, description='创建食物', summary='创建食物')
@handle_exceptions_unified()
async def post(item: Create = Body(..., description='创建数据')):
"""
创建食物记录
"""
res = await Food.create(**item.model_dump())
if not res:
raise HTTPException(status_code=400, detail='创建失败')
return res
# 查询食物
@app.get("", response_model=OutList, description='获取食物', summary='获取食物')
@handle_exceptions_unified()
async def gets(
id: UUID | None = Query(None, description='主键ID'),
name: str | None = Query(None, description='食物名称'),
order_by: str | None = Query('create_time', description='排序字段',
regex='^(-)?(id|name|create_time|update_time)$'),
res_count: bool = Query(False, description='是否返回总数'),
create_time_start: str | int | None = Query(
None, description='创建时间开始 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
create_time_end: str | int | None = Query(
None, description='创建时间结束 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
update_time_start: str | int | None = Query(
None, description='更新时间开始 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
update_time_end: str | int | None = Query(
None, description='更新时间结束 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
page: int = Query(1, ge=1, description='页码'),
limit: int = Query(10, ge=1, le=1000, description='每页数量'),
):
"""
获取食物列表
"""
query = Food.all()
if id:
query = query.filter(id=id)
if name:
query = query.filter(name=name)
if create_time_start:
query = query.filter(create_time__gte=parse_time(create_time_start))
if create_time_end:
query = query.filter(create_time__lte=parse_time(
create_time_end, is_end=True))
if update_time_start:
query = query.filter(update_time__gte=parse_time(update_time_start))
if update_time_end:
query = query.filter(update_time__lte=parse_time(
update_time_end, is_end=True))
if order_by:
query = query.order_by(order_by)
if res_count:
count = await query.count()
else:
count = -1
offset = (page - 1) * limit # 计算偏移量
query = query.limit(limit).offset(offset) # 应用分页
res = await query
if not res:
raise HTTPException(status_code=404, detail='食物不存在')
num = len(res)
return OutList(count=count, num=num, items=res)
# 更新食物
@app.put("", response_model=Out, description='更新食物', summary='更新食物')
@handle_exceptions_unified()
async def put(id: UUID = Query(..., description='主键ID'),
item: Update = Body(..., description='更新数据'),
):
"""
部分更新食物,只更新传入的非空字段
"""
# 检查食物是否存在
secret = await Food.get_or_none(id=id)
if not secret:
raise HTTPException(status_code=404, detail='食物不存在')
# 获取要更新的字段排除None值的字段
update_data = item.model_dump(exclude_unset=True)
# 如果没有要更新的字段
if not update_data:
raise HTTPException(status_code=400, detail='没有要更新的字段')
# 更新食物字段
await secret.update_from_dict(update_data)
await secret.save()
return secret
# 删除食物
@app.delete("", response_model=CommonOut, description='删除食物', summary='删除食物')
@handle_exceptions_unified()
async def delete(id: UUID = Query(..., description='主键ID'),
):
"""删除食物"""
secret = await Food.get_or_none(id=id)
if not secret:
raise HTTPException(status_code=404, detail='食物不存在')
await secret.delete()
# Tortoise ORM 单个实例的 delete() 方法返回 None而不是删除的记录数
# 删除成功时手动返回 1如果有异常会被装饰器捕获
return CommonOut(count=1)