You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

170 lines
5.3 KiB

"""
AmazingData 数据服务平台 - 缓存管理 API
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import func
from typing import Optional
import os
import json
from datetime import datetime
from backend.models.database import get_db
from backend.models.schemas import BaseResponse, CacheFileItem, CacheStats
from backend.models.tables import CacheRecord, User
from backend.auth.dependencies import get_current_user
from backend.config import settings
# Router that the cache-management endpoints below register themselves on.
# NOTE(review): the URL prefix is presumably added where this router is
# included in the application — confirm in the app setup.
router = APIRouter()
@router.get("/list", response_model=BaseResponse)
async def list_cache_files(
    file_type: Optional[str] = Query(None, description="文件类型: stock/future/realtime"),
    trading_day: Optional[str] = Query(None, description="交易日"),
    code: Optional[str] = Query(None, description="代码"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
    current_user: Optional[User] = Depends(get_current_user)
):
    """List cache file records, newest first, with optional filters and paging.

    Args:
        file_type: When non-empty, keep only records whose ``file_type`` matches.
        trading_day: When non-empty, keep only records for this trading day.
        code: When non-empty, keep only records with this code.
        page: 1-based page number.
        page_size: Number of records per page (1-100).
        db: Database session supplied by ``get_db``.
        current_user: Value resolved by ``get_current_user``; not read here.

    Returns:
        A ``BaseResponse`` whose ``data`` contains ``files`` (the requested
        page of records as plain dicts), ``total`` (number of records matching
        the filters), ``page`` and ``page_size``.
    """
    query = db.query(CacheRecord)
    # Falsy filter values (None or "") mean "do not filter on this column".
    if file_type:
        query = query.filter(CacheRecord.file_type == file_type)
    if trading_day:
        query = query.filter(CacheRecord.trading_day == trading_day)
    if code:
        query = query.filter(CacheRecord.code == code)

    # Count before paging so ``total`` reflects the whole filtered set.
    total = query.count()

    # ``id`` is a tie-breaker: without it, rows sharing the same ``created_at``
    # have no defined order and could be repeated or skipped between pages.
    records = (
        query.order_by(CacheRecord.created_at.desc(), CacheRecord.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    files = [
        {
            "id": r.id,
            "filename": r.filename,
            "file_type": r.file_type,
            "trading_day": r.trading_day,
            "code": r.code,
            "period": r.period,
            "record_count": r.record_count,
            "file_size": r.file_size,
            "file_path": r.file_path,
            "created_at": r.created_at.isoformat() if r.created_at else None,
        }
        for r in records
    ]
    return BaseResponse(data={
        "files": files,
        "total": total,
        "page": page,
        "page_size": page_size,
    })
@router.get("/stats", response_model=BaseResponse)
async def get_cache_stats(
    db: Session = Depends(get_db),
    current_user: Optional[User] = Depends(get_current_user)
):
    """Return cache statistics from the database and from the data directory.

    Args:
        db: Database session supplied by ``get_db``.
        current_user: Value resolved by ``get_current_user``; not read here.

    Returns:
        A ``BaseResponse`` whose ``data`` contains:
        ``total_files`` / ``total_size`` (record count and summed ``file_size``
        from the database, 0 when there are no records), ``by_type`` and
        ``by_day`` (record counts grouped by ``file_type`` and by non-null
        ``trading_day``), and ``disk_files`` / ``disk_size`` (count and total
        byte size of ``.json`` files found under ``settings.DATA_SAVE_PATH``).
    """
    # Database totals.
    total_files = db.query(CacheRecord).count()
    total_size = db.query(CacheRecord).with_entities(
        func.coalesce(func.sum(CacheRecord.file_size), 0)
    ).scalar()

    # Record count per file type.
    type_rows = (
        db.query(CacheRecord.file_type, func.count(CacheRecord.id))
        .group_by(CacheRecord.file_type)
        .all()
    )
    by_type = {file_type: count for file_type, count in type_rows}

    # Record count per trading day (records without a trading day are ignored).
    day_rows = (
        db.query(CacheRecord.trading_day, func.count(CacheRecord.id))
        .filter(CacheRecord.trading_day.isnot(None))
        .group_by(CacheRecord.trading_day)
        .all()
    )
    by_day = {day: count for day, count in day_rows}

    # File-system totals. Every path is counted on its own, so files that share
    # a name in different sub-directories are no longer collapsed into one.
    data_path = settings.DATA_SAVE_PATH
    disk_files = 0
    disk_size = 0
    if os.path.exists(data_path):
        for root, _dirs, files in os.walk(data_path):
            for name in files:
                if not name.endswith('.json'):
                    continue
                try:
                    size = os.path.getsize(os.path.join(root, name))
                except OSError:
                    # The file vanished or became unreadable during the walk;
                    # skip it instead of failing the whole request.
                    continue
                disk_files += 1
                disk_size += size

    return BaseResponse(data={
        "total_files": total_files,
        "total_size": total_size,
        "by_type": by_type,
        "by_day": by_day,
        "disk_files": disk_files,
        "disk_size": disk_size,
    })
@router.get("/data/{file_type}/{trading_day}", response_model=BaseResponse)
async def get_cache_data(
    file_type: str,
    trading_day: str,
    db: Session = Depends(get_db),
    current_user: Optional[User] = Depends(get_current_user)
):
    """Return the parsed JSON content of one cached data file.

    The file is looked up under ``settings.DATA_SAVE_PATH`` as
    ``stock/kline_<trading_day>.json`` or ``future/futures_<trading_day>.json``.

    Args:
        file_type: ``"stock"`` or ``"future"``.
        trading_day: Trading day used to build the file name.
        db: Database session supplied by ``get_db``; not read here.
        current_user: Value resolved by ``get_current_user``; not read here.

    Returns:
        A ``BaseResponse`` whose ``data`` is the decoded JSON document.

    Raises:
        HTTPException: 400 for an unknown ``file_type`` or a ``trading_day``
            containing a path separator; 404 when the file does not exist;
            500 when the file cannot be decoded as UTF-8 JSON.
    """
    # Sub-directory name equals the file type; only the file-name prefix differs.
    prefixes = {"stock": "kline", "future": "futures"}
    prefix = prefixes.get(file_type)
    if prefix is None:
        raise HTTPException(status_code=400, detail="Invalid file type")

    filename = f"{prefix}_{trading_day}.json"
    # ``trading_day`` comes straight from the URL. If it carried a path
    # separator the joined path could point outside the cache directory, so
    # the name must be a single path component.
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=400, detail="Invalid trading day")

    filepath = os.path.join(settings.DATA_SAVE_PATH, file_type, filename)
    # isfile (rather than exists) also rejects a directory with that name.
    if not os.path.isfile(filepath):
        raise HTTPException(status_code=404, detail="File not found")

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        # The file was removed between the check above and the open call.
        raise HTTPException(status_code=404, detail="File not found")
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # Report a damaged cache file explicitly instead of an unhandled error.
        raise HTTPException(status_code=500, detail="Cache file is corrupted") from exc

    return BaseResponse(data=data)
@router.delete("/cleanup", response_model=BaseResponse)
async def cleanup_old_cache(
    days: int = Query(30, ge=1),
    db: Session = Depends(get_db),
    current_user: Optional[User] = Depends(get_current_user)
):
    """Delete cache records older than ``days`` days, plus their files on disk.

    File removal is best-effort: a file that cannot be removed is logged and
    its database record is still deleted.

    Args:
        days: Age threshold in days (at least 1); records whose ``created_at``
            is earlier than now minus this many days are removed.
        db: Database session supplied by ``get_db``.
        current_user: Value resolved by ``get_current_user``; not read here.

    Returns:
        A ``BaseResponse`` whose ``data`` contains ``deleted_count``, the
        number of database records deleted.
    """
    import logging
    from datetime import timedelta, timezone

    logger = logging.getLogger(__name__)

    # Naive UTC timestamp, the same value the deprecated ``datetime.utcnow()``
    # produced, so the comparison with ``created_at`` is unchanged.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff_date = now_utc - timedelta(days=days)

    # Records to purge.
    old_records = db.query(CacheRecord).filter(CacheRecord.created_at < cutoff_date).all()

    deleted_count = 0
    for record in old_records:
        # Remove the backing file first, if the record points at one that exists.
        if record.file_path and os.path.exists(record.file_path):
            try:
                os.remove(record.file_path)
            except FileNotFoundError:
                # Removed by someone else after the existence check; nothing to do.
                pass
            except OSError as exc:
                # Keep going, but leave a trace so orphaned files can be found.
                logger.warning(
                    "Failed to remove cache file %s: %s", record.file_path, exc
                )
        db.delete(record)
        deleted_count += 1

    db.commit()
    return BaseResponse(data={"deleted_count": deleted_count})