from typing import Type, TypeVar, Generic, Sequence, Optional, Callable, Awaitable import math from fastapi import Query from pydantic import BaseModel from tortoise.queryset import QuerySet from tortoise.models import Model T = TypeVar('T', bound=BaseModel) M = TypeVar('M', bound=Model) class PaginationPydantic(BaseModel, Generic[T]): """分页模型""" status_code: int = 200 message: str = "Success" total: int page: int size: int total_pages: int data: Sequence[T] class Params(BaseModel): """传参""" page: int = Query(1, ge=1, description="Page number") size: int = Query(10, gt=0, le=200, description="Page size") order_by: Optional[str] = Query(None, max_length=32, description="Sort key") async def pagination( pydantic_model: Type[T], query_set: QuerySet[M], params: Params, callback: Optional[Callable[[QuerySet[M]], Awaitable[QuerySet[M]]]] = None ) -> PaginationPydantic[T]: """分页响应""" page: int = params.page size: int = params.size order_by: Optional[str] = params.order_by total = await query_set.count() # 计算总页数 total_pages = math.ceil(total / size) if total > 0 else 1 if page > total_pages and total > 0: raise ValueError("页数输入有误") # 排序 if order_by: order_by_fields = order_by.split(',') query_set = query_set.order_by(*order_by_fields) # 分页 query_set = query_set.offset((page - 1) * size).limit(size) # 回调处理 if callback: query_set = await callback(query_set) # 获取数据 data = await pydantic_model.from_queryset(query_set) return PaginationPydantic[T]( total=total, page=page, size=size, total_pages=total_pages, data=data )