""" AmazingData 数据服务平台 - 缓存管理 API """ from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from sqlalchemy import func from typing import Optional import os import json from datetime import datetime from backend.models.database import get_db from backend.models.schemas import BaseResponse, CacheFileItem, CacheStats from backend.models.tables import CacheRecord, User from backend.auth.dependencies import get_current_user from backend.config import settings router = APIRouter() @router.get("/list", response_model=BaseResponse) async def list_cache_files( file_type: Optional[str] = Query(None, description="文件类型: stock/future/realtime"), trading_day: Optional[str] = Query(None, description="交易日"), code: Optional[str] = Query(None, description="代码"), page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user) ): """列出缓存文件""" query = db.query(CacheRecord) if file_type: query = query.filter(CacheRecord.file_type == file_type) if trading_day: query = query.filter(CacheRecord.trading_day == trading_day) if code: query = query.filter(CacheRecord.code == code) total = query.count() records = query.order_by(CacheRecord.created_at.desc())\ .offset((page - 1) * page_size)\ .limit(page_size)\ .all() return BaseResponse(data={ "files": [ { "id": r.id, "filename": r.filename, "file_type": r.file_type, "trading_day": r.trading_day, "code": r.code, "period": r.period, "record_count": r.record_count, "file_size": r.file_size, "file_path": r.file_path, "created_at": r.created_at.isoformat() if r.created_at else None } for r in records ], "total": total, "page": page, "page_size": page_size }) @router.get("/stats", response_model=BaseResponse) async def get_cache_stats( db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user) ): """获取缓存统计""" # 数据库统计 total_files = db.query(CacheRecord).count() total_size = db.query(CacheRecord).with_entities( func.coalesce(func.sum(CacheRecord.file_size), 0) ).scalar() # 按类型统计 by_type = {} type_stats = db.query(CacheRecord.file_type, func.count(CacheRecord.id))\ .group_by(CacheRecord.file_type).all() for t, count in type_stats: by_type[t] = count # 按日期统计 by_day = {} day_stats = db.query(CacheRecord.trading_day, func.count(CacheRecord.id))\ .filter(CacheRecord.trading_day.isnot(None))\ .group_by(CacheRecord.trading_day).all() for d, count in day_stats: by_day[d] = count # 文件系统统计 data_path = settings.DATA_SAVE_PATH disk_stats = {} if os.path.exists(data_path): for root, dirs, files in os.walk(data_path): for f in files: if f.endswith('.json'): filepath = os.path.join(root, f) disk_stats[f] = os.path.getsize(filepath) return BaseResponse(data={ "total_files": total_files, "total_size": total_size, "by_type": by_type, "by_day": by_day, "disk_files": len(disk_stats), "disk_size": sum(disk_stats.values()) }) @router.get("/data/{file_type}/{trading_day}", response_model=BaseResponse) async def get_cache_data( file_type: str, trading_day: str, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user) ): """获取缓存数据""" # 查找文件 if file_type == "stock": filename = f"kline_{trading_day}.json" filepath = os.path.join(settings.DATA_SAVE_PATH, "stock", filename) elif file_type == "future": filename = f"futures_{trading_day}.json" filepath = os.path.join(settings.DATA_SAVE_PATH, "future", filename) else: raise HTTPException(status_code=400, detail="Invalid file type") if not os.path.exists(filepath): raise HTTPException(status_code=404, detail="File not found") with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) return BaseResponse(data=data) @router.delete("/cleanup", response_model=BaseResponse) async def cleanup_old_cache( days: int = Query(30, ge=1), db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user) ): """清理旧缓存""" from datetime import timedelta cutoff_date = datetime.utcnow() - timedelta(days=days) # 删除数据库记录 old_records = db.query(CacheRecord).filter(CacheRecord.created_at < cutoff_date).all() deleted_count = 0 for record in old_records: # 删除文件 if record.file_path and os.path.exists(record.file_path): try: os.remove(record.file_path) except Exception: pass db.delete(record) deleted_count += 1 db.commit() return BaseResponse(data={"deleted_count": deleted_count})