You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

304 lines
11 KiB

"""
缓存管理服务
"""
import logging
from typing import List, Dict, Optional
from datetime import date, datetime
from sqlalchemy.orm import Session
from sqlalchemy import and_, func
from app.models.cache import CacheTask, CacheTaskDetail
from app.models.stock import StockKlineDaily
from app.models.future import FutureKlineDaily
from app.services.base_data_service import BaseDataService
from app.services.stock_service import StockService
from app.services.future_service import FutureService
from app.utils.date_utils import parse_date, format_date, get_market_from_code
from app.config import settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CacheService:
    """Run and inspect cache maintenance tasks for stock and future data.

    Each workflow (`detect_missing_data`, `batch_cache_data`) is recorded as
    one ``CacheTask`` row plus one ``CacheTaskDetail`` row per security code.
    Progress is committed after every code so it can be read back through
    `get_task` / `get_task_details` while the workflow is still running.
    """

    def __init__(self, db: Session):
        """Bind the service to a database session.

        Args:
            db: Session used for all task bookkeeping; the same session is
                handed to the base/stock/future data services created here.
        """
        self.db = db
        self.base_service = BaseDataService(db)
        self.stock_service = StockService(db)
        self.future_service = FutureService(db)

    # ------------------------------------------------------------------
    # Private helpers shared by the task workflows
    # ------------------------------------------------------------------

    @staticmethod
    def _progress_percent(processed: int, total: int) -> int:
        """Return the integer completion percentage, clamped to 0..100."""
        if total <= 0:
            # Nothing to process means there is nothing left to do.
            return 100
        # Integer arithmetic avoids float truncation such as
        # int(29 / 100 * 100) == 28.
        return max(0, min(100, processed * 100 // total))

    @staticmethod
    def _missing_ratio(expected_count: int, actual_count: int) -> float:
        """Return the fraction of expected rows that are absent.

        Yields 0.0 when nothing is expected, so an empty trading calendar
        never flags a code as missing.
        """
        if expected_count <= 0:
            return 0.0
        return (expected_count - actual_count) / expected_count

    def _create_task(
        self,
        task_name: str,
        task_type: str,
        security_type: str,
        period_type: str,
        start_date: date,
        end_date: date,
        code_list: List[str],
    ) -> CacheTask:
        """Insert a task row in the "running" state and return it refreshed."""
        task = CacheTask(
            task_name=task_name,
            task_type=task_type,
            security_type=security_type,
            period_type=period_type,
            start_date=start_date,
            end_date=end_date,
            code_list=",".join(code_list),
            status="running",
            total_count=len(code_list),
            started_at=datetime.utcnow(),
        )
        self.db.add(task)
        self.db.commit()
        self.db.refresh(task)
        return task

    def _count_cached_rows(
        self,
        security_type: str,
        period_type: str,
        code: str,
        start_date: date,
        end_date: date,
    ) -> int:
        """Count stored daily K-line rows for ``code`` within the date range.

        Only the stock/daily and future/daily combinations are backed by a
        table here; every other combination reports 0 rows.
        """
        if period_type != "daily":
            return 0
        if security_type == "stock":
            model = StockKlineDaily
        elif security_type == "future":
            model = FutureKlineDaily
        else:
            return 0
        return self.db.query(model).filter(
            and_(
                model.code == code,
                model.trade_date >= start_date,
                model.trade_date <= end_date,
            )
        ).count()

    def _record_failure(
        self, task_id: int, code: str, trade_date: date, exc: Exception
    ) -> None:
        """Queue a "failed" detail row for ``code``.

        The session is rolled back first: if the failure came from the
        database, the session cannot be used again until it is rolled back,
        and any half-written work for this code must not be committed.
        """
        self.db.rollback()
        self.db.add(
            CacheTaskDetail(
                task_id=task_id,
                code=code,
                trade_date=trade_date,
                status="failed",
                error_message=str(exc),
            )
        )

    def _save_progress(
        self,
        task: CacheTask,
        success_count: int,
        error_count: int,
        processed: int,
        total: int,
    ) -> None:
        """Persist the running counters and completion percentage."""
        task.success_count = success_count
        task.error_count = error_count
        task.progress = self._progress_percent(processed, total)
        self.db.commit()

    def _mark_completed(self, task: CacheTask) -> None:
        """Persist the terminal "completed" state."""
        task.status = "completed"
        # Also covers an empty code list, where the loop never updates progress.
        task.progress = 100
        task.completed_at = datetime.utcnow()
        self.db.commit()

    def _mark_failed(self, task: CacheTask, exc: Exception) -> None:
        """Persist the terminal "failed" state after discarding pending work."""
        # Roll back first so the status update is not blocked by the failed
        # transaction that brought us here.
        self.db.rollback()
        task.status = "failed"
        task.error_message = str(exc)
        task.completed_at = datetime.utcnow()
        self.db.commit()

    # ------------------------------------------------------------------
    # Task workflows
    # ------------------------------------------------------------------

    def detect_missing_data(
        self,
        security_type: str,
        period_type: str,
        start_date: date,
        end_date: date,
        code_list: List[str]
    ) -> CacheTask:
        """Check, per code, how much K-line data is missing in a date range.

        The expected row count is the number of trading days returned by the
        base service ("CFE" calendar for futures, "SH" otherwise). A code is
        flagged as missing when its missing ratio exceeds
        ``settings.CACHE_MISSING_THRESHOLD``.

        Args:
            security_type: Security type ("stock" or "future").
            period_type: Period type ("daily", "min1", ...). Only "daily" has
                a backing table here; other values count as 0 stored rows.
            start_date: First date of the range (inclusive).
            end_date: Last date of the range (inclusive).
            code_list: Security codes to check.

        Returns:
            The task row, in state "completed" or "failed". Its
            ``success_count`` is the number of codes flagged as missing (not
            the number of codes checked); ``progress`` tracks codes processed.
        """
        task = self._create_task(
            f"检测缺失数据 - {security_type} - {len(code_list)}个代码",
            "detect_missing",
            security_type,
            period_type,
            start_date,
            end_date,
            code_list,
        )
        # Read the id once: rollbacks below expire the task's attributes.
        task_id = task.id
        total = len(code_list)

        try:
            market = "CFE" if security_type == "future" else "SH"
            trading_days = self.base_service.get_trading_calendar(
                market, start_date, end_date
            )
            expected_count = len(trading_days)

            success_count = 0
            error_count = 0
            for processed, code in enumerate(code_list, start=1):
                try:
                    actual_count = self._count_cached_rows(
                        security_type, period_type, code, start_date, end_date
                    )
                    ratio = self._missing_ratio(expected_count, actual_count)
                    is_missing = ratio > settings.CACHE_MISSING_THRESHOLD

                    self.db.add(
                        CacheTaskDetail(
                            task_id=task_id,
                            code=code,
                            trade_date=start_date,
                            expected_count=expected_count,
                            actual_count=actual_count,
                            is_missing=1 if is_missing else 0,
                            status="pending" if is_missing else "skipped",
                        )
                    )
                    if is_missing:
                        success_count += 1
                except Exception as e:
                    # Best effort per code: record the failure and carry on.
                    logger.exception(f"检测{code}缺失数据失败: {str(e)}")
                    error_count += 1
                    self._record_failure(task_id, code, start_date, e)

                # Progress follows the number of codes processed, so it
                # reaches 100 even when no code is flagged as missing.
                self._save_progress(
                    task, success_count, error_count, processed, total
                )

            self._mark_completed(task)
        except Exception as e:
            # Log before touching the database so the error is never lost.
            logger.exception(f"检测缺失数据任务失败: {str(e)}")
            self._mark_failed(task, e)

        return task

    def batch_cache_data(
        self,
        security_type: str,
        period_type: str,
        start_date: date,
        end_date: date,
        code_list: List[str]
    ) -> CacheTask:
        """Fetch K-line data code by code so the data services cache it.

        Args:
            security_type: Security type ("stock" or "future"). Any other
                value fails the whole task instead of reporting success
                without fetching anything.
            period_type: Period type passed through to ``get_kline``.
            start_date: First date of the range.
            end_date: Last date of the range.
            code_list: Security codes to fetch.

        Returns:
            The task row, in state "completed" or "failed", with one detail
            row per processed code.
        """
        task = self._create_task(
            f"批量缓存数据 - {security_type} - {len(code_list)}个代码",
            "cache_data",
            security_type,
            period_type,
            start_date,
            end_date,
            code_list,
        )
        # Read the id once: rollbacks below expire the task's attributes.
        task_id = task.id
        total = len(code_list)

        try:
            if security_type == "stock":
                data_service = self.stock_service
            elif security_type == "future":
                data_service = self.future_service
            else:
                raise ValueError(f"unsupported security_type: {security_type}")

            success_count = 0
            error_count = 0
            for processed, code in enumerate(code_list, start=1):
                try:
                    # The fetch is made for its caching side effect; the
                    # returned data is intentionally discarded.
                    data_service.get_kline(
                        [code], start_date, end_date, period_type
                    )
                    self.db.add(
                        CacheTaskDetail(
                            task_id=task_id,
                            code=code,
                            trade_date=start_date,
                            status="success",
                            processed_at=datetime.utcnow(),
                        )
                    )
                    # Count only after the detail row is queued, so a code
                    # can never be counted as both success and failure.
                    success_count += 1
                except Exception as e:
                    # Best effort per code: record the failure and carry on.
                    logger.exception(f"缓存{code}数据失败: {str(e)}")
                    error_count += 1
                    self._record_failure(task_id, code, start_date, e)

                self._save_progress(
                    task, success_count, error_count, processed, total
                )

            self._mark_completed(task)
        except Exception as e:
            # Log before touching the database so the error is never lost.
            logger.exception(f"批量缓存数据任务失败: {str(e)}")
            self._mark_failed(task, e)

        return task

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_tasks(
        self,
        page: int = 1,
        page_size: int = 20
    ) -> Dict:
        """Return one page of tasks, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.

        Returns:
            Dict with keys "items", "total", "page", "page_size" and
            "total_pages"; "page" and "page_size" hold the values used.
        """
        # Clamp so bad input cannot produce a negative offset or a division
        # by zero in the page count.
        page = max(1, page)
        page_size = max(1, page_size)

        query = self.db.query(CacheTask).order_by(CacheTask.created_at.desc())
        total = query.count()
        tasks = query.offset((page - 1) * page_size).limit(page_size).all()

        return {
            "items": tasks,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": (total + page_size - 1) // page_size
        }

    def get_task(self, task_id: int) -> Optional[CacheTask]:
        """Return the task with the given id, or None if there is none."""
        return self.db.query(CacheTask).filter(CacheTask.id == task_id).first()

    def get_task_details(self, task_id: int) -> List[CacheTaskDetail]:
        """Return every detail row that belongs to the given task."""
        return self.db.query(CacheTaskDetail).filter(
            CacheTaskDetail.task_id == task_id
        ).all()

    def cancel_task(self, task_id: int) -> bool:
        """Mark a running task as cancelled.

        Only the stored status is changed. The loops in `detect_missing_data`
        and `batch_cache_data` do not check this status and set "completed"
        when they finish, so a task that is still executing is not stopped.

        Returns:
            True if the task existed and was "running", otherwise False.
        """
        task = self.db.query(CacheTask).filter(CacheTask.id == task_id).first()
        if task and task.status == "running":
            task.status = "cancelled"
            task.completed_at = datetime.utcnow()
            self.db.commit()
            return True
        return False

    def get_cache_status(self, code: str, security_type: str, period_type: str) -> Dict:
        """Return the cache status of one code.

        Delegates to the stock or future service; for any other security
        type an empty status (zero records, no dates) is returned.
        """
        if security_type == "stock":
            return self.stock_service.get_cache_status(code, period_type)
        if security_type == "future":
            return self.future_service.get_cache_status(code, period_type)
        return {
            "code": code,
            "security_type": security_type,
            "period_type": period_type,
            "record_count": 0,
            "min_date": None,
            "max_date": None
        }