You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
170 lines
5.3 KiB
170 lines
5.3 KiB
"""
|
|
AmazingData 数据服务平台 - 缓存管理 API
|
|
"""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
from typing import Optional
|
|
import os
|
|
import json
|
|
from datetime import datetime
|
|
from backend.models.database import get_db
|
|
from backend.models.schemas import BaseResponse, CacheFileItem, CacheStats
|
|
from backend.models.tables import CacheRecord, User
|
|
from backend.auth.dependencies import get_current_user
|
|
from backend.config import settings
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/list", response_model=BaseResponse)
|
|
async def list_cache_files(
|
|
file_type: Optional[str] = Query(None, description="文件类型: stock/future/realtime"),
|
|
trading_day: Optional[str] = Query(None, description="交易日"),
|
|
code: Optional[str] = Query(None, description="代码"),
|
|
page: int = Query(1, ge=1),
|
|
page_size: int = Query(20, ge=1, le=100),
|
|
db: Session = Depends(get_db),
|
|
current_user: Optional[User] = Depends(get_current_user)
|
|
):
|
|
"""列出缓存文件"""
|
|
query = db.query(CacheRecord)
|
|
|
|
if file_type:
|
|
query = query.filter(CacheRecord.file_type == file_type)
|
|
if trading_day:
|
|
query = query.filter(CacheRecord.trading_day == trading_day)
|
|
if code:
|
|
query = query.filter(CacheRecord.code == code)
|
|
|
|
total = query.count()
|
|
records = query.order_by(CacheRecord.created_at.desc())\
|
|
.offset((page - 1) * page_size)\
|
|
.limit(page_size)\
|
|
.all()
|
|
|
|
return BaseResponse(data={
|
|
"files": [
|
|
{
|
|
"id": r.id,
|
|
"filename": r.filename,
|
|
"file_type": r.file_type,
|
|
"trading_day": r.trading_day,
|
|
"code": r.code,
|
|
"period": r.period,
|
|
"record_count": r.record_count,
|
|
"file_size": r.file_size,
|
|
"file_path": r.file_path,
|
|
"created_at": r.created_at.isoformat() if r.created_at else None
|
|
}
|
|
for r in records
|
|
],
|
|
"total": total,
|
|
"page": page,
|
|
"page_size": page_size
|
|
})
|
|
|
|
|
|
@router.get("/stats", response_model=BaseResponse)
|
|
async def get_cache_stats(
|
|
db: Session = Depends(get_db),
|
|
current_user: Optional[User] = Depends(get_current_user)
|
|
):
|
|
"""获取缓存统计"""
|
|
# 数据库统计
|
|
total_files = db.query(CacheRecord).count()
|
|
total_size = db.query(CacheRecord).with_entities(
|
|
func.coalesce(func.sum(CacheRecord.file_size), 0)
|
|
).scalar()
|
|
|
|
# 按类型统计
|
|
by_type = {}
|
|
type_stats = db.query(CacheRecord.file_type, func.count(CacheRecord.id))\
|
|
.group_by(CacheRecord.file_type).all()
|
|
for t, count in type_stats:
|
|
by_type[t] = count
|
|
|
|
# 按日期统计
|
|
by_day = {}
|
|
day_stats = db.query(CacheRecord.trading_day, func.count(CacheRecord.id))\
|
|
.filter(CacheRecord.trading_day.isnot(None))\
|
|
.group_by(CacheRecord.trading_day).all()
|
|
for d, count in day_stats:
|
|
by_day[d] = count
|
|
|
|
# 文件系统统计
|
|
data_path = settings.DATA_SAVE_PATH
|
|
disk_stats = {}
|
|
if os.path.exists(data_path):
|
|
for root, dirs, files in os.walk(data_path):
|
|
for f in files:
|
|
if f.endswith('.json'):
|
|
filepath = os.path.join(root, f)
|
|
disk_stats[f] = os.path.getsize(filepath)
|
|
|
|
return BaseResponse(data={
|
|
"total_files": total_files,
|
|
"total_size": total_size,
|
|
"by_type": by_type,
|
|
"by_day": by_day,
|
|
"disk_files": len(disk_stats),
|
|
"disk_size": sum(disk_stats.values())
|
|
})
|
|
|
|
|
|
@router.get("/data/{file_type}/{trading_day}", response_model=BaseResponse)
|
|
async def get_cache_data(
|
|
file_type: str,
|
|
trading_day: str,
|
|
db: Session = Depends(get_db),
|
|
current_user: Optional[User] = Depends(get_current_user)
|
|
):
|
|
"""获取缓存数据"""
|
|
# 查找文件
|
|
if file_type == "stock":
|
|
filename = f"kline_{trading_day}.json"
|
|
filepath = os.path.join(settings.DATA_SAVE_PATH, "stock", filename)
|
|
elif file_type == "future":
|
|
filename = f"futures_{trading_day}.json"
|
|
filepath = os.path.join(settings.DATA_SAVE_PATH, "future", filename)
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Invalid file type")
|
|
|
|
if not os.path.exists(filepath):
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
|
|
return BaseResponse(data=data)
|
|
|
|
|
|
@router.delete("/cleanup", response_model=BaseResponse)
|
|
async def cleanup_old_cache(
|
|
days: int = Query(30, ge=1),
|
|
db: Session = Depends(get_db),
|
|
current_user: Optional[User] = Depends(get_current_user)
|
|
):
|
|
"""清理旧缓存"""
|
|
from datetime import timedelta
|
|
|
|
cutoff_date = datetime.utcnow() - timedelta(days=days)
|
|
|
|
# 删除数据库记录
|
|
old_records = db.query(CacheRecord).filter(CacheRecord.created_at < cutoff_date).all()
|
|
|
|
deleted_count = 0
|
|
for record in old_records:
|
|
# 删除文件
|
|
if record.file_path and os.path.exists(record.file_path):
|
|
try:
|
|
os.remove(record.file_path)
|
|
except Exception:
|
|
pass
|
|
db.delete(record)
|
|
deleted_count += 1
|
|
|
|
db.commit()
|
|
|
|
return BaseResponse(data={"deleted_count": deleted_count}) |