diff --git a/backend/app/api/v1/cache.py b/backend/app/api/v1/cache.py index 5fcd23b..8def87b 100644 --- a/backend/app/api/v1/cache.py +++ b/backend/app/api/v1/cache.py @@ -9,16 +9,65 @@ from app.db.session import get_db from app.schemas.base import ResponseModel, PaginatedResponse from app.schemas.cache import ( DetectMissingRequest, DetectMissingResponse, - BatchCacheRequest, CacheTaskResponse, CacheStatusResponse + BatchCacheRequest, CacheTaskResponse, CacheStatusResponse, + AllDataRequest ) from app.services.cache_service import CacheService from app.core.security import get_current_user from app.models.user import User -from app.utils.date_utils import parse_date +from app.utils.date_utils import parse_date, format_date router = APIRouter() +@router.post("/detect-all-missing", response_model=ResponseModel) +async def detect_all_missing_data( + request: AllDataRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """一键检测所有数据的缺失情况""" + service = CacheService(db) + start = parse_date(request.start_date) + end = parse_date(request.end_date) + + result = service.detect_all_missing_data( + request.security_type, + request.period_type, + start, + end + ) + + return ResponseModel(data=result) + + +@router.post("/cache-all-missing", response_model=ResponseModel) +async def cache_all_missing_data( + request: AllDataRequest, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user) +): + """一键缓存所有缺失数据""" + service = CacheService(db) + start = parse_date(request.start_date) + end = parse_date(request.end_date) + + task = service.cache_all_missing_data( + request.security_type, + request.period_type, + start, + end + ) + + return ResponseModel(data={ + "task_id": task.id, + "task_name": task.task_name, + "status": task.status, + "total_count": task.total_count, + "progress": task.progress + }) + + @router.post("/detect-missing", response_model=ResponseModel) async def detect_missing_data( request: DetectMissingRequest, diff --git a/backend/app/models/cache.py b/backend/app/models/cache.py index 2c3f418..0d8860d 100644 --- a/backend/app/models/cache.py +++ b/backend/app/models/cache.py @@ -2,7 +2,7 @@ 缓存任务模型 """ from datetime import datetime, date -from sqlalchemy import Column, Integer, BigInteger, String, Numeric, Text, Date, DateTime, ForeignKey +from sqlalchemy import Column, Integer, BigInteger, String, Numeric, Text, Date, DateTime, ForeignKey, Boolean from sqlalchemy.orm import relationship from app.db.base import Base @@ -43,7 +43,7 @@ class CacheTaskDetail(Base): trade_date = Column(Date, nullable=False) expected_count = Column(Integer, default=0) actual_count = Column(Integer, default=0) - is_missing = Column(Integer, default=0) + is_missing = Column(Boolean, default=False) status = Column(String(20), default="pending") # pending, success, failed, skipped error_message = Column(Text) processed_at = Column(DateTime(timezone=True)) diff --git a/backend/app/schemas/cache.py b/backend/app/schemas/cache.py index bff402b..b272617 100644 --- a/backend/app/schemas/cache.py +++ b/backend/app/schemas/cache.py @@ -106,3 +106,11 @@ class BatchCacheRequest(BaseModel): start_date: str end_date: str code_list: List[str] + + +class AllDataRequest(BaseModel): + """一键检测/缓存所有数据请求""" + security_type: str = Field(..., description="证券类型: stock, future") + period_type: str = Field(default="daily", description="周期类型: daily, min1, min5, etc.") + start_date: str = Field(..., description="开始日期 YYYYMMDD") + end_date: str = Field(..., description="结束日期 YYYYMMDD") diff --git a/backend/app/services/cache_service.py b/backend/app/services/cache_service.py index b3fdefc..5edb8c8 100644 --- a/backend/app/services/cache_service.py +++ b/backend/app/services/cache_service.py @@ -13,6 +13,7 @@ from app.models.future import FutureKlineDaily from app.services.base_data_service import BaseDataService from app.services.stock_service import StockService from app.services.future_service import FutureService +from app.services.sdk_manager import sdk_manager from app.utils.date_utils import parse_date, format_date, get_market_from_code from app.config import settings @@ -28,6 +29,365 @@ class CacheService: self.stock_service = StockService(db) self.future_service = FutureService(db) + def get_all_codes(self, security_type: str) -> List[str]: + """ + 获取所有代码列表 + + Args: + security_type: 证券类型 (stock, future) + + Returns: + 代码列表 + """ + adapter = sdk_manager.get_default_connection() + if not adapter: + raise RuntimeError("SDK连接失败") + + if security_type == "stock": + return adapter.get_code_list("EXTRA_STOCK_A") + elif security_type == "future": + return adapter.get_code_list("EXTRA_FUTURE") + else: + return [] + + def detect_all_missing_data( + self, + security_type: str, + period_type: str, + start_date: date, + end_date: date + ) -> Dict: + """ + 一键检测所有数据的缺失情况 + + Args: + security_type: 证券类型 (stock, future) + period_type: 周期类型 (daily, min1, etc.) + start_date: 开始日期 + end_date: 结束日期 + + Returns: + 检测结果字典 + """ + # 获取所有代码 + code_list = self.get_all_codes(security_type) + + if not code_list: + raise ValueError(f"无法获取{security_type}代码列表") + + logger.info(f"获取到{len(code_list)}个{security_type}代码") + + # 创建检测任务 + task = CacheTask( + task_name=f"一键检测所有数据 - {security_type} - {len(code_list)}个代码", + task_type="detect_all_missing", + security_type=security_type, + period_type=period_type, + start_date=start_date, + end_date=end_date, + code_list=",".join(code_list[:100]) + "...", + status="running", + total_count=len(code_list), + started_at=datetime.utcnow() + ) + self.db.add(task) + self.db.commit() + self.db.refresh(task) + + try: + # 获取交易日历 + market = "CFE" if security_type == "future" else "SH" + trading_days = self.base_service.get_trading_calendar(market, start_date, end_date) + expected_count = len(trading_days) + + missing_codes = [] + complete_codes = [] + error_count = 0 + + # 统计每个交易日的缺失情况 + daily_stats = {} + for td in trading_days: + daily_stats[format_date(td)] = { + "expected": len(code_list), + "actual": 0, + "missing": 0 + } + + for i, code in enumerate(code_list): + try: + # 查询实际数据量 + if security_type == "stock" and period_type == "daily": + records = self.db.query(StockKlineDaily).filter( + and_( + StockKlineDaily.code == code, + StockKlineDaily.trade_date >= start_date, + StockKlineDaily.trade_date <= end_date + ) + ).all() + actual_count = len(records) + + # 更新每日统计 + for r in records: + date_key = format_date(r.trade_date) + if date_key in daily_stats: + daily_stats[date_key]["actual"] += 1 + + elif security_type == "future" and period_type == "daily": + records = self.db.query(FutureKlineDaily).filter( + and_( + FutureKlineDaily.code == code, + FutureKlineDaily.trade_date >= start_date, + FutureKlineDaily.trade_date <= end_date + ) + ).all() + actual_count = len(records) + + for r in records: + date_key = format_date(r.trade_date) + if date_key in daily_stats: + daily_stats[date_key]["actual"] += 1 + + else: + actual_count = 0 + + # 判断是否缺失 + is_missing = actual_count < expected_count + + if is_missing: + missing_codes.append({ + "code": code, + "actual_count": actual_count, + "expected_count": expected_count, + "missing_count": expected_count - actual_count, + "missing_ratio": (expected_count - actual_count) / expected_count if expected_count > 0 else 0 + }) + + detail = CacheTaskDetail( + task_id=task.id, + code=code, + trade_date=start_date, + expected_count=expected_count, + actual_count=actual_count, + is_missing=True, + status="pending" + ) + self.db.add(detail) + else: + complete_codes.append(code) + + except Exception as e: + logger.error(f"检测{code}缺失数据失败: {str(e)}") + error_count += 1 + + detail = CacheTaskDetail( + task_id=task.id, + code=code, + trade_date=start_date, + status="failed", + error_message=str(e) + ) + self.db.add(detail) + + # 每100个代码更新一次进度 + if (i + 1) % 100 == 0 or i == len(code_list) - 1: + task.success_count = len(missing_codes) + len(complete_codes) + task.error_count = error_count + task.progress = min(100, int((i + 1) / len(code_list) * 100)) + self.db.commit() + + # 计算每日缺失数 + for date_key in daily_stats: + daily_stats[date_key]["missing"] = daily_stats[date_key]["expected"] - daily_stats[date_key]["actual"] + + task.status = "completed" + task.success_count = len(complete_codes) + task.error_count = error_count + task.completed_at = datetime.utcnow() + self.db.commit() + + logger.info(f"检测完成: 完整{len(complete_codes)}个, 缺失{len(missing_codes)}个, 错误{error_count}个") + + return { + "task_id": task.id, + "task_name": task.task_name, + "status": task.status, + "progress": float(task.progress), + "total_count": task.total_count, + "complete_count": len(complete_codes), + "missing_count": len(missing_codes), + "error_count": error_count, + "expected_days": expected_count, + "start_date": format_date(start_date), + "end_date": format_date(end_date), + "security_type": security_type, + "period_type": period_type, + "daily_stats": daily_stats, + "missing_codes": missing_codes[:100] # 只返回前100个缺失代码 + } + + except Exception as e: + task.status = "failed" + task.error_message = str(e) + task.completed_at = datetime.utcnow() + self.db.commit() + logger.error(f"一键检测缺失数据任务失败: {str(e)}") + + return { + "task_id": task.id, + "task_name": task.task_name, + "status": task.status, + "error_message": str(e) + } + + def cache_all_missing_data( + self, + security_type: str, + period_type: str, + start_date: date, + end_date: date + ) -> CacheTask: + """ + 一键缓存所有缺失数据 + + Args: + security_type: 证券类型 + period_type: 周期类型 + start_date: 开始日期 + end_date: 结束日期 + + Returns: + 缓存任务对象 + """ + # 获取所有代码 + code_list = self.get_all_codes(security_type) + + if not code_list: + raise ValueError(f"无法获取{security_type}代码列表") + + logger.info(f"获取到{len(code_list)}个{security_type}代码,开始缓存") + + # 创建缓存任务 + task = CacheTask( + task_name=f"一键缓存所有数据 - {security_type} - {len(code_list)}个代码", + task_type="cache_all_data", + security_type=security_type, + period_type=period_type, + start_date=start_date, + end_date=end_date, + code_list=",".join(code_list[:100]) + "...", + status="running", + total_count=len(code_list), + started_at=datetime.utcnow() + ) + self.db.add(task) + self.db.commit() + self.db.refresh(task) + + try: + # 获取交易日历 + market = "CFE" if security_type == "future" else "SH" + trading_days = self.base_service.get_trading_calendar(market, start_date, end_date) + expected_count = len(trading_days) + + success_count = 0 + skipped_count = 0 + error_count = 0 + + for i, code in enumerate(code_list): + try: + # 先检查是否已有完整数据 + if security_type == "stock" and period_type == "daily": + actual_count = self.db.query(StockKlineDaily).filter( + and_( + StockKlineDaily.code == code, + StockKlineDaily.trade_date >= start_date, + StockKlineDaily.trade_date <= end_date + ) + ).count() + elif security_type == "future" and period_type == "daily": + actual_count = self.db.query(FutureKlineDaily).filter( + and_( + FutureKlineDaily.code == code, + FutureKlineDaily.trade_date >= start_date, + FutureKlineDaily.trade_date <= end_date + ) + ).count() + else: + actual_count = 0 + + # 如果数据完整,跳过 + if actual_count >= expected_count: + skipped_count += 1 + detail = CacheTaskDetail( + task_id=task.id, + code=code, + trade_date=start_date, + expected_count=expected_count, + actual_count=actual_count, + is_missing=False, + status="skipped" + ) + self.db.add(detail) + continue + + # 获取数据(会自动缓存) + if security_type == "stock": + self.stock_service.get_kline([code], start_date, end_date, period_type) + elif security_type == "future": + self.future_service.get_kline([code], start_date, end_date, period_type) + + success_count += 1 + + detail = CacheTaskDetail( + task_id=task.id, + code=code, + trade_date=start_date, + expected_count=expected_count, + actual_count=actual_count, + is_missing=True, + status="success", + processed_at=datetime.utcnow() + ) + self.db.add(detail) + + except Exception as e: + logger.error(f"缓存{code}数据失败: {str(e)}") + error_count += 1 + + detail = CacheTaskDetail( + task_id=task.id, + code=code, + trade_date=start_date, + status="failed", + error_message=str(e) + ) + self.db.add(detail) + + # 每50个代码更新一次进度 + if (i + 1) % 50 == 0 or i == len(code_list) - 1: + task.success_count = success_count + task.error_count = error_count + task.progress = min(100, int((i + 1) / len(code_list) * 100)) + self.db.commit() + logger.info(f"进度: {i + 1}/{len(code_list)}, 成功: {success_count}, 跳过: {skipped_count}, 错误: {error_count}") + + task.status = "completed" + task.success_count = success_count + task.error_count = error_count + task.completed_at = datetime.utcnow() + self.db.commit() + + logger.info(f"缓存完成: 成功{success_count}个, 跳过{skipped_count}个, 错误{error_count}个") + + except Exception as e: + task.status = "failed" + task.error_message = str(e) + task.completed_at = datetime.utcnow() + self.db.commit() + logger.error(f"一键缓存数据任务失败: {str(e)}") + + return task + def detect_missing_data( self, security_type: str, @@ -111,7 +471,7 @@ class CacheService: trade_date=start_date, expected_count=expected_count, actual_count=actual_count, - is_missing=1 if is_missing else 0, + is_missing=is_missing, status="pending" if is_missing else "skipped" ) self.db.add(detail) diff --git a/frontend/src/api/cache.ts b/frontend/src/api/cache.ts index af4c491..28e1642 100644 --- a/frontend/src/api/cache.ts +++ b/frontend/src/api/cache.ts @@ -20,6 +20,24 @@ export const batchCacheData = (data: { return request.post('/cache/batch-cache', data) } +export const detectAllMissingData = (data: { + security_type: string + period_type: string + start_date: string + end_date: string +}) => { + return request.post('/cache/detect-all-missing', data) +} + +export const cacheAllMissingData = (data: { + security_type: string + period_type: string + start_date: string + end_date: string +}) => { + return request.post('/cache/cache-all-missing', data) +} + export const getCacheTasks = (params?: { page?: number; page_size?: number }) => { return request.get('/cache/tasks', { params }) } diff --git a/frontend/src/views/CacheManager/DetectMissing.vue b/frontend/src/views/CacheManager/DetectMissing.vue index 14ea354..bad7b1b 100644 --- a/frontend/src/views/CacheManager/DetectMissing.vue +++ b/frontend/src/views/CacheManager/DetectMissing.vue @@ -31,6 +31,19 @@ value-format="YYYYMMDD" /> + + + 一键检测所有数据 + + + 一键缓存所有数据 + + + + + 批量检测(指定代码) + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + {{ detectResult.status || '-' }} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -80,15 +197,27 @@ @@ -185,7 +366,19 @@ const showDetail = (row: any) => { padding: 10px; } +.summary-card { + margin-top: 20px; +} + +.daily-card { + margin-top: 20px; +} + +.missing-card { + margin-top: 20px; +} + .result-card { margin-top: 20px; } - + \ No newline at end of file