diff --git a/app/adapters/__pycache__/internal_data_service.cpython-311.pyc b/app/adapters/__pycache__/internal_data_service.cpython-311.pyc index e39274a..0c26ca1 100644 Binary files a/app/adapters/__pycache__/internal_data_service.cpython-311.pyc and b/app/adapters/__pycache__/internal_data_service.cpython-311.pyc differ diff --git a/app/adapters/internal_data_service.py b/app/adapters/internal_data_service.py index 5ff0dbf..f408eca 100644 --- a/app/adapters/internal_data_service.py +++ b/app/adapters/internal_data_service.py @@ -3,12 +3,21 @@ 将 AmazingData SDK 的调用封装为内部接口 对外接口不直接调用 SDK,而是通过内部接口调用 """ +import time +import functools import pandas as pd from datetime import datetime, timedelta from typing import List, Optional, Dict, Any from app.core.logger import info, error +# 导入指标统计 +try: + from app.core.metrics import record_internal_call + METRICS_AVAILABLE = True +except ImportError: + METRICS_AVAILABLE = False + # 数据库相关导入(可选,如果数据库未配置则回退到SDK) try: @@ -20,12 +29,43 @@ except ImportError: DB_AVAILABLE = False +def track_internal_call(category: str): + """对内接口调用统计装饰器 + + Args: + category: 接口类别 (market/base/info) + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + if not METRICS_AVAILABLE: + return func(*args, **kwargs) + + method_name = func.__name__ + start_time = time.time() + status = "success" + + try: + result = func(*args, **kwargs) + return result + except Exception as e: + status = "error" + raise e + finally: + duration = time.time() - start_time + record_internal_call(category, method_name, status, duration) + + return wrapper + return decorator + + class _MarketDataInternal: """市场数据内部接口 - 封装 _market_data""" def __init__(self, market_data): self._market_data = market_data + @track_internal_call("market") def login(self, username: str, password: str, ip: str, port: str) -> bool: """登录""" try: @@ -39,6 +79,7 @@ class _MarketDataInternal: error(f"[_MarketDataInternal] Login failed: {e}") raise + @track_internal_call("market") def is_login(self) -> bool: """检查登录状态""" try: @@ -46,6 +87,7 @@ class _MarketDataInternal: except Exception: return False + @track_internal_call("market") def query_kline( self, code_list: List[str], @@ -65,6 +107,7 @@ class _MarketDataInternal: error(f"[_MarketDataInternal] Query kline failed: {e}") return {} + @track_internal_call("market") def query_snapshot( self, code_list: List[str], @@ -89,6 +132,7 @@ class _BaseDataInternal: def __init__(self, base_data): self._base_data = base_data + @track_internal_call("base") def get_code_list(self, security_type: str) -> List[str]: """获取代码列表 - 优先从数据库获取,无数据则从SDK获取并缓存""" # 1. 先尝试从数据库获取 @@ -171,6 +215,7 @@ class _BaseDataInternal: finally: db.close() + @track_internal_call("base") def get_future_code_list(self, security_type: str) -> List[str]: """获取期货代码列表""" try: @@ -179,6 +224,7 @@ class _BaseDataInternal: error(f"[_BaseDataInternal] Get future code list failed: {e}") return [] + @track_internal_call("base") def get_code_info(self, security_type: str) -> pd.DataFrame: """获取代码信息""" try: @@ -187,6 +233,7 @@ class _BaseDataInternal: error(f"[_BaseDataInternal] Get code info failed: {e}") return pd.DataFrame() + @track_internal_call("base") def get_calendar(self, market: str) -> List[int]: """获取交易日历 - 优先从数据库获取,无数据则从SDK获取并缓存""" # 1. 先尝试从数据库获取 @@ -270,6 +317,7 @@ class _BaseDataInternal: finally: db.close() + @track_internal_call("base") def get_adj_factor( self, code_list: List[str], @@ -287,6 +335,7 @@ class _BaseDataInternal: error(f"[_BaseDataInternal] Get adj factor failed: {e}") return pd.DataFrame() + @track_internal_call("base") def get_backward_factor( self, code_list: List[str], @@ -304,6 +353,7 @@ class _BaseDataInternal: error(f"[_BaseDataInternal] Get backward factor failed: {e}") return pd.DataFrame() + @track_internal_call("base") def get_etf_pcf(self, code_list: List[str]) -> tuple: """获取ETF申赎数据""" try: @@ -312,6 +362,7 @@ class _BaseDataInternal: error(f"[_BaseDataInternal] Get ETF PCF failed: {e}") return ({}, {}) + @track_internal_call("base") def get_hist_code_list(self, security_type: str) -> pd.DataFrame: """获取历史代码列表""" try: @@ -327,6 +378,7 @@ class _InfoDataInternal: def __init__(self, info_data): self._info_data = info_data + @track_internal_call("info") def get_equity_structure( self, code_list: List[str], @@ -344,6 +396,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get equity structure failed: {e}") return {} + @track_internal_call("info") def get_share_holder( self, code_list: List[str], @@ -368,6 +421,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get share holder failed: {e}") return {} + @track_internal_call("info") def get_holder_num( self, code_list: List[str], @@ -389,6 +443,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get holder num failed: {e}") return {} + @track_internal_call("info") def get_income( self, code_list: List[str], @@ -410,6 +465,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get income failed: {e}") return {} + @track_internal_call("info") def get_balance_sheet( self, code_list: List[str], @@ -431,6 +487,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get balance sheet failed: {e}") return {} + @track_internal_call("info") def get_cash_flow( self, code_list: List[str], @@ -452,6 +509,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get cash flow failed: {e}") return {} + @track_internal_call("info") def get_profit_express( self, code_list: List[str], @@ -473,6 +531,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get profit express failed: {e}") return {} + @track_internal_call("info") def get_profit_notice( self, code_list: List[str], @@ -494,6 +553,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get profit notice failed: {e}") return {} + @track_internal_call("info") def get_margin_summary( self, local_path: str, @@ -513,6 +573,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get margin summary failed: {e}") return pd.DataFrame() + @track_internal_call("info") def get_margin_detail( self, code_list: List[str], @@ -534,6 +595,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get margin detail failed: {e}") return {} + @track_internal_call("info") def get_long_hu_bang( self, code_list: List[str], @@ -555,6 +617,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get long hu bang failed: {e}") return pd.DataFrame() + @track_internal_call("info") def get_block_trading( self, code_list: List[str], @@ -576,6 +639,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get block trading failed: {e}") return pd.DataFrame() + @track_internal_call("info") def get_index_constituent( self, code_list: List[str], @@ -593,6 +657,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get index constituent failed: {e}") return {} + @track_internal_call("info") def get_index_weight( self, code_list: List[str], @@ -614,6 +679,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get index weight failed: {e}") return {} + @track_internal_call("info") def get_fund_share( self, code_list: List[str], @@ -635,6 +701,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get fund share failed: {e}") return {} + @track_internal_call("info") def get_kzz_issuance( self, code_list: List[str], @@ -652,6 +719,7 @@ class _InfoDataInternal: error(f"[_InfoDataInternal] Get kzz issuance failed: {e}") return {} + @track_internal_call("info") def get_history_stock_status( self, code_list: List[str], diff --git a/app/core/__pycache__/metrics.cpython-311.pyc b/app/core/__pycache__/metrics.cpython-311.pyc index 01eaa79..b7c6c33 100644 Binary files a/app/core/__pycache__/metrics.cpython-311.pyc and b/app/core/__pycache__/metrics.cpython-311.pyc differ diff --git a/app/core/metrics.py b/app/core/metrics.py index 1f3b5b3..31c4113 100644 --- a/app/core/metrics.py +++ b/app/core/metrics.py @@ -3,7 +3,36 @@ from contextvars import ContextVar from typing import Callable, Optional import time -from prometheus_client import Counter, Histogram, Gauge, Info, generate_latest, CONTENT_TYPE_LATEST +# 尝试导入 prometheus_client,如果不存在则提供空实现 +try: + from prometheus_client import Counter, Histogram, Gauge, Info, generate_latest, CONTENT_TYPE_LATEST + PROMETHEUS_AVAILABLE = True +except ImportError: + PROMETHEUS_AVAILABLE = False + # 提供空的指标类 + class _MockMetric: + def __init__(self, *args, **kwargs): + pass + def labels(self, *args, **kwargs): + return self + def inc(self, *args, **kwargs): + pass + def dec(self, *args, **kwargs): + pass + def set(self, *args, **kwargs): + pass + def observe(self, *args, **kwargs): + pass + def info(self, *args, **kwargs): + pass + + Counter = _MockMetric + Histogram = _MockMetric + Gauge = _MockMetric + Info = _MockMetric + generate_latest = lambda: b"" + CONTENT_TYPE_LATEST = "text/plain" + from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware @@ -75,6 +104,25 @@ cache_hit_ratio = Gauge( ['cache_type'] ) +# ============================================ +# 对内接口调用统计(Internal SDK Interface) +# ============================================ + +# 对内接口调用计数器 +internal_calls_total = Counter( + 'internal_calls_total', + 'Total internal SDK interface calls', + ['category', 'method', 'status'] # category: market/base/info, method: 方法名, status: success/error +) + +# 对内接口调用持续时间 +internal_call_duration_seconds = Histogram( + 'internal_call_duration_seconds', + 'Internal SDK interface call duration', + ['category', 'method'], + buckets=[.001, .005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0, 10.0] +) + # 应用信息 app_info = Info( 'market_data_service', @@ -151,6 +199,31 @@ def set_cache_hit_ratio(cache_type: str, ratio: float): cache_hit_ratio.labels(cache_type=cache_type).set(ratio) +# ============================================ +# 对内接口调用统计函数 +# ============================================ + +def record_internal_call(category: str, method: str, status: str, duration: float): + """记录对内接口调用指标 + + Args: + category: 接口类别 (market/base/info) + method: 方法名 + status: 状态 (success/error) + duration: 调用耗时(秒) + """ + internal_calls_total.labels( + category=category, + method=method, + status=status + ).inc() + + internal_call_duration_seconds.labels( + category=category, + method=method + ).observe(duration) + + def set_app_info(version: str, build_time: str, git_commit: str = ""): """设置应用信息""" app_info.info({