@router.get("/missing-codes-by-date", response_model=ResponseModel)
async def get_missing_codes_by_date(
    trade_date: str = Query(..., description="交易日 YYYYMMDD"),
    security_type: str = Query("stock", description="证券类型: stock, future"),
    period_type: str = Query("daily", description="周期类型: daily, min1, min5"),
    contract_type: str = Query("all", description="合约类型: all, main"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return the codes whose data is missing for a single trading day.

    Args:
        trade_date: Trading day as a ``YYYYMMDD`` string.
        security_type: ``stock`` or ``future``.
        period_type: ``daily``, ``min1`` or ``min5``.
        contract_type: ``all`` or ``main``.
        db: Request-scoped database session.
        current_user: Authenticated user; only required for access control.

    Returns:
        ``ResponseModel`` wrapping the dict built by
        ``CacheService.get_missing_codes_by_date``, or a ``code=400`` response
        when the trading day cannot be parsed.
    """
    # Assumes parse_date reports malformed input by raising ValueError or
    # TypeError -- TODO confirm against app.utils.date_utils.
    try:
        date_obj = parse_date(trade_date)
    except (ValueError, TypeError):
        return ResponseModel(code=400, message=f"无效的交易日: {trade_date}")

    # NOTE(review): the service method is synchronous and may issue one SDK
    # request per suspect code, so it runs on the event loop of this
    # ``async def`` handler. Consider a plain ``def`` handler or a threadpool.
    result = CacheService(db).get_missing_codes_by_date(
        date_obj,
        security_type,
        period_type,
        contract_type
    )

    return ResponseModel(data=result)


@router.post("/fill-date-batch", response_model=ResponseModel)
async def fill_missing_codes_for_date_batch(
    request: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Schedule a background job that fills every missing code of one trading day.

    Expected request keys: ``trade_date`` (required), ``security_type``
    (default ``stock``), ``period_type`` (default ``daily``) and
    ``missing_codes`` (list of code strings).

    Returns:
        ``ResponseModel`` with the created task's id and name, or a
        ``code=400`` response for invalid input, or a "nothing to do" payload
        when ``missing_codes`` is empty.
    """
    trade_date = request.get("trade_date")
    security_type = request.get("security_type", "stock")
    period_type = request.get("period_type", "daily")
    # ``or []`` also covers an explicit JSON null.
    missing_codes = request.get("missing_codes") or []

    if not trade_date:
        return ResponseModel(code=400, message="缺少必要参数: trade_date")

    # A bare string would otherwise be iterated character by character, and a
    # non-string element would crash the later ",".join(...) with a 500.
    if not isinstance(missing_codes, (list, tuple)) or not all(
        isinstance(code, str) for code in missing_codes
    ):
        return ResponseModel(code=400, message="参数 missing_codes 必须是代码字符串列表")

    if not missing_codes:
        return ResponseModel(data={
            "task_id": None,
            "message": "没有缺失代码需要补齐",
            "missing_count": 0
        })

    from app.utils.date_utils import parse_date
    # Assumes parse_date raises ValueError/TypeError on bad input -- TODO confirm.
    try:
        date_obj = parse_date(trade_date)
    except (ValueError, TypeError):
        return ResponseModel(code=400, message=f"无效的交易日: {trade_date}")

    codes = list(missing_codes)
    task = CacheService(db)._create_batch_fill_task(
        trade_date=date_obj,
        security_type=security_type,
        period_type=period_type,
        missing_codes=codes
    )

    # Capture plain values now: the background job runs after the response,
    # when the request-scoped session owning ``task`` may already be closed.
    task_id = task.id
    task_name = task.task_name

    def run_batch_fill_task():
        """Run the fill job on its own session so it outlives the request."""
        import logging
        from app.db.session import SessionLocal

        db_local = SessionLocal()
        try:
            CacheService(db_local)._execute_batch_fill_task(
                task_id,
                date_obj,
                security_type,
                period_type,
                codes
            )
        except Exception as e:
            # Best-effort job: never propagate, but keep the traceback.
            logging.getLogger(__name__).exception(f"批量补齐交易日数据失败: {str(e)}")
        finally:
            db_local.close()

    background_tasks.add_task(run_batch_fill_task)

    return ResponseModel(data={
        "task_id": task_id,
        "task_name": task_name,
        "trade_date": trade_date,
        "missing_count": len(codes),
        "status": "processing"
    })
@@ -231,8 +238,9 @@ class CacheService: market = "CFE" if security_type == "future" else "SH" trading_days = self.base_service.get_trading_calendar(market, start_date, end_date) expected_count = len(trading_days) + trading_days_set = set(trading_days) - missing_codes = [] + preliminary_missing_codes = [] complete_codes = [] error_count = 0 @@ -241,11 +249,14 @@ class CacheService: daily_stats[format_date(td)] = { "expected": len(code_list), "actual": 0, - "missing": 0 + "missing": 0, + "verified_missing": 0 } push_progress(10, "running", message="查询数据库统计...") + code_existing_dates = {} + if security_type == "stock" and period_type == "daily": from sqlalchemy import func @@ -261,6 +272,21 @@ class CacheService: code_counts = {r.code: r.count for r in code_count_query} + code_dates_query = self.db.query( + StockKlineDaily.code, + StockKlineDaily.trade_date + ).filter( + and_( + StockKlineDaily.trade_date >= start_date, + StockKlineDaily.trade_date <= end_date + ) + ).all() + + for r in code_dates_query: + if r.code not in code_existing_dates: + code_existing_dates[r.code] = set() + code_existing_dates[r.code].add(r.trade_date) + date_count_query = self.db.query( func.date(StockKlineDaily.trade_date).label('trade_date'), func.count(StockKlineDaily.id).label('count') @@ -276,45 +302,38 @@ class CacheService: if date_key in daily_stats: daily_stats[date_key]["actual"] = r.count - push_progress(20, "running", message="分析数据完整性...") + push_progress(20, "running", message="初步分析数据完整性...") for i, code in enumerate(code_list): actual_count = code_counts.get(code, 0) is_missing = actual_count < expected_count if is_missing: - missing_codes.append({ + existing_dates = code_existing_dates.get(code, set()) + missing_dates = trading_days_set - existing_dates + + preliminary_missing_codes.append({ "code": code, "actual_count": actual_count, "expected_count": expected_count, - "missing_count": expected_count - actual_count, - "missing_ratio": (expected_count - actual_count) / 
expected_count if expected_count > 0 else 0 + "missing_count": len(missing_dates), + "missing_dates": sorted(list(missing_dates)), + "missing_ratio": len(missing_dates) / expected_count if expected_count > 0 else 0 }) - - detail = CacheTaskDetail( - task_id=task.id, - code=code, - trade_date=start_date, - expected_count=expected_count, - actual_count=actual_count, - is_missing=True, - status="pending" - ) - self.db.add(detail) else: complete_codes.append(code) if (i + 1) % 500 == 0 or i == len(code_list) - 1: - task.success_count = len(missing_codes) + len(complete_codes) + task.success_count = len(preliminary_missing_codes) + len(complete_codes) task.error_count = error_count - task.progress = min(100, int((i + 1) / len(code_list) * 100)) + task.progress = min(50, int((i + 1) / len(code_list) * 30)) self.db.commit() push_progress( - 20 + int((i + 1) / len(code_list) * 70), + 20 + int((i + 1) / len(code_list) * 30), "running", - processed=len(missing_codes) + len(complete_codes), - missing=len(missing_codes), + processed=len(preliminary_missing_codes) + len(complete_codes), + missing=len(preliminary_missing_codes), complete=len(complete_codes) ) @@ -333,6 +352,21 @@ class CacheService: code_counts = {r.code: r.count for r in code_count_query} + code_dates_query = self.db.query( + FutureKlineDaily.code, + FutureKlineDaily.trade_date + ).filter( + and_( + FutureKlineDaily.trade_date >= start_date, + FutureKlineDaily.trade_date <= end_date + ) + ).all() + + for r in code_dates_query: + if r.code not in code_existing_dates: + code_existing_dates[r.code] = set() + code_existing_dates[r.code].add(r.trade_date) + date_count_query = self.db.query( func.date(FutureKlineDaily.trade_date).label('trade_date'), func.count(FutureKlineDaily.id).label('count') @@ -348,89 +382,160 @@ class CacheService: if date_key in daily_stats: daily_stats[date_key]["actual"] = r.count - push_progress(20, "running", message="分析数据完整性...") + push_progress(20, "running", message="初步分析数据完整性...") for 
i, code in enumerate(code_list): actual_count = code_counts.get(code, 0) is_missing = actual_count < expected_count if is_missing: - missing_codes.append({ + existing_dates = code_existing_dates.get(code, set()) + missing_dates = trading_days_set - existing_dates + + preliminary_missing_codes.append({ "code": code, "actual_count": actual_count, "expected_count": expected_count, - "missing_count": expected_count - actual_count, - "missing_ratio": (expected_count - actual_count) / expected_count if expected_count > 0 else 0 + "missing_count": len(missing_dates), + "missing_dates": sorted(list(missing_dates)), + "missing_ratio": len(missing_dates) / expected_count if expected_count > 0 else 0 }) - - detail = CacheTaskDetail( - task_id=task.id, - code=code, - trade_date=start_date, - expected_count=expected_count, - actual_count=actual_count, - is_missing=True, - status="pending" - ) - self.db.add(detail) else: complete_codes.append(code) if (i + 1) % 500 == 0 or i == len(code_list) - 1: - task.success_count = len(missing_codes) + len(complete_codes) + task.success_count = len(preliminary_missing_codes) + len(complete_codes) task.error_count = error_count - task.progress = min(100, int((i + 1) / len(code_list) * 100)) + task.progress = min(50, int((i + 1) / len(code_list) * 30)) self.db.commit() push_progress( - 20 + int((i + 1) / len(code_list) * 70), + 20 + int((i + 1) / len(code_list) * 30), "running", - processed=len(missing_codes) + len(complete_codes), - missing=len(missing_codes), + processed=len(preliminary_missing_codes) + len(complete_codes), + missing=len(preliminary_missing_codes), complete=len(complete_codes) ) else: for i, code in enumerate(code_list): - actual_count = 0 - is_missing = True - - missing_codes.append({ + preliminary_missing_codes.append({ "code": code, "actual_count": 0, "expected_count": expected_count, "missing_count": expected_count, + "missing_dates": trading_days, "missing_ratio": 1.0 }) - detail = CacheTaskDetail( - task_id=task.id, - 
code=code, - trade_date=start_date, - expected_count=expected_count, - actual_count=0, - is_missing=True, - status="pending" - ) - self.db.add(detail) - if (i + 1) % 500 == 0 or i == len(code_list) - 1: - task.success_count = len(missing_codes) + task.success_count = len(preliminary_missing_codes) task.error_count = error_count - task.progress = min(100, int((i + 1) / len(code_list) * 100)) + task.progress = min(50, int((i + 1) / len(code_list) * 30)) self.db.commit() push_progress( - 20 + int((i + 1) / len(code_list) * 70), + 20 + int((i + 1) / len(code_list) * 30), "running", - processed=len(missing_codes), - missing=len(missing_codes) + processed=len(preliminary_missing_codes), + missing=len(preliminary_missing_codes) + ) + + logger.info(f"初步检测完成: 完整{len(complete_codes)}个, 疑似缺失{len(preliminary_missing_codes)}个") + + push_progress(50, "running", message="开始二次验证疑似缺失数据...") + + verified_missing_codes = [] + verified_complete_codes = [] + + adapter = sdk_manager.get_default_connection() + if not adapter: + logger.warning("SDK连接失败,跳过二次验证,使用初步检测结果") + verified_missing_codes = preliminary_missing_codes + else: + total_preliminary = len(preliminary_missing_codes) + + for i, item in enumerate(preliminary_missing_codes): + code = item["code"] + missing_dates = item["missing_dates"] + + truly_missing_dates = [] + unavailable_dates = [] + + for trade_date in missing_dates: + try: + if security_type == "stock": + kline_data = adapter.get_stock_kline( + code, + format_date(trade_date), + format_date(trade_date), + period_type + ) + elif security_type == "future": + kline_data = adapter.get_future_kline( + code, + format_date(trade_date), + format_date(trade_date), + period_type + ) + else: + kline_data = [] + + if kline_data and len(kline_data) > 0: + truly_missing_dates.append(trade_date) + else: + unavailable_dates.append(trade_date) + + except Exception as e: + logger.debug(f"验证{code} {format_date(trade_date)}时出错: {str(e)}") + unavailable_dates.append(trade_date) + + 
truly_missing_count = len(truly_missing_dates) + + if truly_missing_count > 0: + verified_missing_codes.append({ + "code": code, + "actual_count": item["actual_count"], + "expected_count": item["expected_count"], + "missing_count": truly_missing_count, + "missing_dates": truly_missing_dates, + "unavailable_dates": unavailable_dates, + "missing_ratio": truly_missing_count / expected_count if expected_count > 0 else 0 + }) + + detail = CacheTaskDetail( + task_id=task.id, + code=code, + trade_date=start_date, + expected_count=expected_count, + actual_count=item["actual_count"], + is_missing=True, + status="pending" + ) + self.db.add(detail) + else: + verified_complete_codes.append(code) + complete_codes.append(code) + + if (i + 1) % 50 == 0 or i == total_preliminary - 1: + task.success_count = len(complete_codes) + task.error_count = error_count + task.progress = min(100, 50 + int((i + 1) / total_preliminary * 50)) + self.db.commit() + + push_progress( + 50 + int((i + 1) / total_preliminary * 50), + "running", + message=f"二次验证进度: {i + 1}/{total_preliminary}", + processed=len(complete_codes), + missing=len(verified_missing_codes), + complete=len(complete_codes) ) for date_key in daily_stats: daily_stats[date_key]["missing"] = daily_stats[date_key]["expected"] - daily_stats[date_key]["actual"] - missing_code_list = [m["code"] for m in missing_codes] - task.code_list = ",".join(missing_code_list[:500]) if missing_code_list else "" + verified_missing_code_list = [m["code"] for m in verified_missing_codes] + task.code_list = ",".join(verified_missing_code_list[:500]) if verified_missing_code_list else "" task.status = "completed" task.success_count = len(complete_codes) @@ -439,13 +544,25 @@ class CacheService: self.db.commit() push_progress(100, "completed", - message="检测完成", + message="检测完成(含二次验证)", complete_count=len(complete_codes), - missing_count=len(missing_codes), + missing_count=len(verified_missing_codes), error_count=error_count ) - logger.info(f"检测完成: 
def _fetch_kline_for_date(self, adapter, code, day, security_type, period_type):
    """Fetch the K-line rows of one code for a single trading day.

    Args:
        adapter: SDK connection exposing ``get_stock_kline`` and
            ``get_future_kline``.
        code: Security code to query.
        day: Trading day already formatted for the SDK; it is passed as both
            the start and the end of the requested range.
        security_type: ``"stock"`` or ``"future"``.
        period_type: Period passed through to the adapter unchanged.

    Returns:
        Whatever the adapter returns, or ``[]`` for any other security type.
    """
    if security_type == "stock":
        return adapter.get_stock_kline(code, day, day, period_type)
    if security_type == "future":
        return adapter.get_future_kline(code, day, day, period_type)
    return []


def _describe_sdk_connection_failure(self) -> str:
    """Build and log the user-facing reason why no SDK connection is available."""
    # Only existence matters here, so fetch a single row instead of all rows.
    active_config = self.db.query(SDKConfig).filter(SDKConfig.is_active == True).first()  # noqa: E712
    if not active_config:
        reason = "SDK连接失败:未配置任何SDK连接,请前往配置管理添加SDK配置"
    else:
        default_config = self.db.query(SDKConfig).filter(SDKConfig.is_default == True).first()  # noqa: E712
        if not default_config:
            reason = "SDK连接失败:未设置默认SDK配置,请前往配置管理设置默认配置"
        else:
            reason = f"SDK连接失败:默认配置 '{default_config.name}' 连接失败,请检查配置或测试连接"

    logger.error(reason)
    return reason


def get_missing_codes_by_date(
    self,
    trade_date: date,
    security_type: str,
    period_type: str,
    contract_type: str = "all"
) -> Dict:
    """List the codes whose data is missing for one trading day.

    Codes without a local row for ``trade_date`` are "suspect". When an SDK
    connection is available each suspect is re-checked against the SDK: data
    upstream means the code is truly missing, no data (or an SDK error) means
    it is reported as unavailable. Without a connection all suspects are
    reported as unverified and none is counted as missing.

    Args:
        trade_date: Trading day to inspect.
        security_type: ``stock`` or ``future``.
        period_type: Period type; local rows are only consulted for ``daily``.
        contract_type: ``all`` or ``main``.

    Returns:
        Dict with counts, the (truncated) code lists, ``verification_status``
        (``verified`` or ``skipped``) and ``sdk_connection_error``.

    Raises:
        ValueError: If no code list can be obtained for ``security_type``.
    """
    code_list = self.get_all_codes(security_type, contract_type)

    if not code_list:
        raise ValueError(f"无法获取{security_type}代码列表")

    logger.info(f"获取{trade_date}交易日缺失代码,共{len(code_list)}个代码")

    # Only daily stock/future tables are queried; for every other combination
    # no local rows are consulted, so every code is treated as suspect.
    if security_type == "stock" and period_type == "daily":
        kline_model = StockKlineDaily
    elif security_type == "future" and period_type == "daily":
        kline_model = FutureKlineDaily
    else:
        kline_model = None

    existing_codes = set()
    if kline_model is not None:
        records = self.db.query(kline_model.code).filter(
            kline_model.trade_date == trade_date
        ).all()
        existing_codes = {r.code for r in records}

    preliminary_missing_codes = [code for code in code_list if code not in existing_codes]

    adapter = sdk_manager.get_default_connection()
    truly_missing_codes = []
    unavailable_codes = []
    unverified_codes = []
    sdk_connection_error = None

    if adapter:
        day = format_date(trade_date)  # loop-invariant, format once
        logger.info(f"SDK连接成功,开始二次验证 {len(preliminary_missing_codes)} 个疑似缺失代码")

        for code in preliminary_missing_codes:
            try:
                kline_data = self._fetch_kline_for_date(
                    adapter, code, day, security_type, period_type
                )
            except Exception as e:
                # NOTE(review): an SDK error is reported as "unavailable",
                # the same bucket as "no data upstream" -- confirm intended.
                logger.warning(f"验证{code} {day}时出错: {str(e)}")
                unavailable_codes.append(code)
                continue

            if kline_data:
                truly_missing_codes.append(code)
                logger.debug(f"验证结果: {code} {day} - 接口有数据,确认为缺失")
            else:
                unavailable_codes.append(code)
                logger.debug(f"验证结果: {code} {day} - 接口无数据,可能未上市或停牌")
    else:
        sdk_connection_error = self._describe_sdk_connection_failure()
        unverified_codes = preliminary_missing_codes
        logger.warning(f"由于SDK连接失败,{len(unverified_codes)} 个疑似缺失代码未能验证,暂不标记为真实缺失")

    return {
        "trade_date": format_date(trade_date),
        "security_type": security_type,
        "period_type": period_type,
        "contract_type": contract_type,
        "total_codes": len(code_list),
        "existing_codes": len(existing_codes),
        "missing_count": len(truly_missing_codes),
        "unavailable_count": len(unavailable_codes),
        "unverified_count": len(unverified_codes),
        "missing_codes": truly_missing_codes[:100],
        "missing_code_list": truly_missing_codes,
        "unavailable_codes": unavailable_codes[:50],
        "unverified_codes": unverified_codes[:50],
        "verification_status": "verified" if adapter else "skipped",
        "sdk_connection_error": sdk_connection_error
    }


def _create_batch_fill_task(
    self,
    trade_date: date,
    security_type: str,
    period_type: str,
    missing_codes: List[str]
) -> "CacheTask":
    """Persist and return the task row for a one-day batch fill.

    Only the first 500 codes are stored in ``code_list``; ``total_count``
    still reflects the full number of codes.

    Raises:
        ValueError: If ``missing_codes`` is empty.
    """
    if not missing_codes:
        raise ValueError("没有缺失代码需要补齐")

    task = CacheTask(
        task_name=f"批量补齐交易日数据 - {format_date(trade_date)} - {len(missing_codes)}个代码",
        task_type="batch_fill_date",
        security_type=security_type,
        period_type=period_type,
        start_date=trade_date,
        end_date=trade_date,
        code_list=",".join(missing_codes[:500]),
        status="pending",
        total_count=len(missing_codes),
        started_at=datetime.utcnow()
    )
    self.db.add(task)
    self.db.commit()
    self.db.refresh(task)

    return task


def _execute_batch_fill_task(
    self,
    task_id: int,
    trade_date: date,
    security_type: str,
    period_type: str,
    missing_codes: List[str]
):
    """Fetch and store one trading day's data for every code in ``missing_codes``.

    Progress and counters are written to the task row every 20 codes and at
    the end. A code counts as an error when the SDK returns no data, when
    fetching or saving raises, or when no SDK connection is available. The
    task always finishes with status ``completed``. Returns silently if the
    task row does not exist.
    """
    # Function-scope import: only needed to detect a failed DB transaction.
    from sqlalchemy.exc import SQLAlchemyError

    task = self.db.query(CacheTask).filter(CacheTask.id == task_id).first()
    if not task:
        return

    task.status = "running"
    self.db.commit()

    adapter = sdk_manager.get_default_connection()
    day = format_date(trade_date)  # loop-invariant, format once
    total = len(missing_codes)

    success_count = 0
    error_count = 0

    if not adapter:
        # Nothing can be fetched: log once instead of once per code, and
        # record the same final counters the per-code loop would produce.
        logger.warning(f"SDK连接失败,无法补齐 {day} 的 {total} 个代码")
        error_count = total
        if total:
            task.progress = 100
    else:
        for i, code in enumerate(missing_codes):
            try:
                kline_data = self._fetch_kline_for_date(
                    adapter, code, day, security_type, period_type
                )
                if kline_data:
                    if security_type == "stock":
                        self.stock_service._save_kline_data(code, kline_data, period_type)
                    elif security_type == "future":
                        self.future_service._save_kline_data(code, kline_data, period_type)
                    success_count += 1
                else:
                    error_count += 1
                    logger.debug(f"{code} {day} 无数据")
            except SQLAlchemyError as e:
                # A failed flush/commit leaves the session unusable until it
                # is rolled back; without this every later commit would fail.
                self.db.rollback()
                logger.error(f"补齐{code} {day}失败: {str(e)}")
                error_count += 1
            except Exception as e:
                logger.error(f"补齐{code} {day}失败: {str(e)}")
                error_count += 1

            if (i + 1) % 20 == 0 or i == total - 1:
                task.success_count = success_count
                task.error_count = error_count
                task.progress = min(100, int((i + 1) / total * 100))
                self.db.commit()

    task.status = "completed"
    task.success_count = success_count
    task.error_count = error_count
    task.completed_at = datetime.utcnow()
    self.db.commit()

    logger.info(f"批量补齐完成: {day}, 成功{success_count}个, 失败{error_count}个")
/**
 * Fetch the codes whose data is missing for one trading day.
 *
 * NOTE(review): the neighbouring wrappers in this file call `cacheRequest`;
 * confirm that the plain `request` client is intended for this endpoint.
 */
export const getMissingCodesByDate = (params: {
  trade_date: string
  security_type: string
  period_type: string
  contract_type?: string
}) => request.get('/cache/missing-codes-by-date', { params })

/**
 * Start a background job that fills every listed code for one trading day.
 */
export const fillMissingCodesForDateBatch = (data: {
  trade_date: string
  security_type: string
  period_type: string
  missing_codes: string[]
}) => cacheRequest.post('/cache/fill-date-batch', data)
+ +

正在加载缺失代码数据...

+
+ +
+ + + + + + + + + + {{ dailyMissingCodes.trade_date }} + {{ dailyMissingCodes.total_codes }} + {{ dailyMissingCodes.existing_codes }} + + {{ dailyMissingCodes.missing_count }} + + + {{ dailyMissingCodes.unavailable_count }} + + + + {{ dailyMissingCodes.unverified_count || 0 }} + + + + +
+
+ 缺失代码列表(前100个): + + 补齐该交易日所有缺失数据 + +
+ + + + + + + + + + + +
+ +
+ +
+ +
+ +
+ +
+ 未验证代码(SDK连接失败,无法确认是否缺失) + + {{ code }} + +

+ 这些代码因SDK连接失败未能验证,请先配置SDK连接后再重新检测 +

+
+ +
+ 不可用代码(未上市或停牌,无需补齐) + + {{ code }} + +
+
+ +
+

暂无数据

+
+ + +
+