diff --git a/app/analysis_models.py b/app/analysis_models.py index eb9bfb7..9557f0b 100644 --- a/app/analysis_models.py +++ b/app/analysis_models.py @@ -95,6 +95,7 @@ class AIAnalysisCache(AnalysisBase): id = Column(Integer, primary_key=True, autoincrement=True) symbol = Column(String(32), nullable=False, index=True, comment="品种合约代码") analysis_data = Column(JSON, nullable=False, comment="AI分析结果数据") + kline_timestamp = Column(DateTime, nullable=True, comment="分析时K线数据的时间戳") created_at = Column(DateTime, nullable=False, default=datetime.now, index=True, comment="分析时间") def __repr__(self): diff --git a/app/api/__pycache__/futures_analysis.cpython-311.pyc b/app/api/__pycache__/futures_analysis.cpython-311.pyc index f684a90..4291edb 100644 Binary files a/app/api/__pycache__/futures_analysis.cpython-311.pyc and b/app/api/__pycache__/futures_analysis.cpython-311.pyc differ diff --git a/app/api/futures_analysis.py b/app/api/futures_analysis.py index d64af42..24d4fc0 100644 --- a/app/api/futures_analysis.py +++ b/app/api/futures_analysis.py @@ -818,11 +818,12 @@ def run_ai_analysis(symbol: str, db: Session = Depends(get_db), analysis_db: Ses @router.get("/ai-analysis/{symbol}") def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depends(get_db), analysis_db: Session = Depends(get_analysis_db)): - """获取AI分析结果(可选择是否强制刷新)""" + """获取AI分析结果(智能判断是否需要重新分析)""" try: analyzer = AIFuturesAnalyzer(db, analysis_db) if force_refresh: + logger.info(f"强制刷新: {symbol}") result = analyzer.analyze(symbol) if result.get("success"): return { @@ -836,8 +837,35 @@ def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depe "error": result.get("error", "AI分析失败") } + # 获取最新缓存 cache = analyzer.get_latest_cache(symbol) if cache: + # 智能判断是否需要重新分析 + if analyzer.should_reanalyze(symbol, cache): + logger.info(f"检测到数据变化或超时,自动重新分析: {symbol}") + result = analyzer.analyze(symbol) + if result.get("success"): + return { + "success": True, + "data": result["data"], + "is_cached": False + } + else: + # 如果重新分析失败,返回旧缓存 + logger.warning(f"重新分析失败,返回旧缓存: {symbol}") + return { + "success": True, + "data": { + "id": cache.id, + "symbol": cache.symbol, + "analysis_time": cache.created_at.isoformat(), + "result": cache.analysis_data + }, + "is_cached": True, + "warning": "分析数据可能不是最新的" + } + + # 返回缓存数据 return { "success": True, "data": { @@ -849,6 +877,7 @@ def get_ai_analysis(symbol: str, force_refresh: bool = False, db: Session = Depe "is_cached": True } + # 没有缓存,执行分析 result = analyzer.analyze(symbol) if result.get("success"): return { diff --git a/app/services/ai_analysis.py b/app/services/ai_analysis.py index bdce2ab..fced968 100644 --- a/app/services/ai_analysis.py +++ b/app/services/ai_analysis.py @@ -382,11 +382,12 @@ class AIFuturesAnalyzer: logger.error(f"解析AI响应失败: {e}") return None - def save_analysis_cache(self, symbol: str, analysis_data: Dict) -> AIAnalysisCache: + def save_analysis_cache(self, symbol: str, analysis_data: Dict, kline_timestamp: datetime) -> AIAnalysisCache: """保存AI分析结果到缓存""" cache = AIAnalysisCache( symbol=symbol, analysis_data=analysis_data, + kline_timestamp=kline_timestamp, created_at=datetime.now() ) @@ -402,10 +403,61 @@ class AIFuturesAnalyzer: AIAnalysisCache.symbol == symbol ).order_by(AIAnalysisCache.created_at.desc()).first() + def get_latest_kline_timestamp(self, symbol: str) -> Optional[datetime]: + """获取当前K线数据的最新时间戳""" + cached_data = get_cached_data( + self.db, + symbol, + "futures", + ["5min", "15min", "30min", "60min", "daily"] + ) + + if not cached_data or not cached_data.get("timeframes"): + return None + + latest_time = None + for period, candles in cached_data.get("timeframes", {}).items(): + if candles and len(candles) > 0: + last_candle = candles[-1] + time_str = last_candle.get("datetime", last_candle.get("time", "")) + if time_str: + try: + candle_time = datetime.fromisoformat(time_str.replace("Z", "+00:00")) + if latest_time is None or candle_time > latest_time: + latest_time = candle_time + except: + pass + + return latest_time + + def should_reanalyze(self, symbol: str, cache: AIAnalysisCache) -> bool: + """判断是否需要重新分析""" + if not cache: + return True + + # 1. 检查时间是否超过15分钟 + time_since_analysis = (datetime.now() - cache.created_at).total_seconds() + if time_since_analysis > 900: # 15分钟 = 900秒 + logger.info(f"分析时间已超过15分钟,需要重新分析") + return True + + # 2. 检查K线数据是否有变化 + current_kline_time = self.get_latest_kline_timestamp(symbol) + if current_kline_time and cache.kline_timestamp: + if current_kline_time > cache.kline_timestamp: + logger.info(f"K线数据已更新(当前: {current_kline_time}, 缓存: {cache.kline_timestamp}),需要重新分析") + return True + + return False + def analyze(self, symbol: str) -> Dict: """执行完整的AI分析流程""" logger.info(f"===== 开始AI分析: {symbol} =====") + # 获取当前K线数据的时间戳 + kline_timestamp = self.get_latest_kline_timestamp(symbol) + logger.info(f"当前K线数据时间戳: {kline_timestamp}") + model = self.get_active_model() if not model: logger.error("未找到激活的AI模型配置") @@ -450,8 +502,8 @@ class AIFuturesAnalyzer: logger.info(f"AI响应解析成功") - cache = self.save_analysis_cache(symbol, analysis_result) - logger.info(f"分析结果已保存到缓存,ID: {cache.id}") + cache = self.save_analysis_cache(symbol, analysis_result, kline_timestamp) + logger.info(f"分析结果已保存到缓存,ID: {cache.id}, K线时间戳: {kline_timestamp}") logger.info(f"===== AI分析完成: {symbol} =====") return { diff --git a/data/buffer.db b/data/buffer.db index 6ef27ad..961cf3b 100644 Binary files a/data/buffer.db and b/data/buffer.db differ diff --git a/update_schema.py b/update_schema.py new file mode 100644 index 0000000..40974a1 --- /dev/null +++ b/update_schema.py @@ -0,0 +1,31 @@ +""" +更新AI分析缓存表结构 +添加 kline_timestamp 字段 +""" +import sqlite3 +from pathlib import Path + +project_root = Path(__file__).parent +ANALYSIS_DB_PATH = project_root / "data" / "futures_analysis.db" + +def add_kline_timestamp_column(): + """添加 kline_timestamp 列""" + conn = sqlite3.connect(str(ANALYSIS_DB_PATH)) + cursor = conn.cursor() + + # 检查列是否已存在 + cursor.execute("PRAGMA table_info(ai_analysis_cache)") + columns = [col[1] for col in cursor.fetchall()] + + if 'kline_timestamp' in columns: + print("✅ kline_timestamp 列已存在") + else: + print("添加 kline_timestamp 列...") + cursor.execute("ALTER TABLE ai_analysis_cache ADD COLUMN kline_timestamp DATETIME") + conn.commit() + print("✅ kline_timestamp 列添加成功") + + conn.close() + +if __name__ == "__main__": + add_kline_timestamp_column()