import random
from datetime import datetime, timedelta
from typing import List, Optional

from loguru import logger

from adapters.base import BaseDataAdapter
from models import (
    MarketOverview,
    MarketIndex,
    SentimentData,
    MomentumData,
    HighLowStock,
    PriceDistribution,
    AIAnalysis,
    KLineData,
    NewsItem,
    HotNews,
    SentimentTrend,
    Stock,
    RecommendationType,
    TrendType,
    SentimentType,
    TargetPrice,
)
from config import settings


class THSAdapter(BaseDataAdapter):
    """Adapter labelled "同花顺" that returns randomly generated placeholder data.

    Every ``fetch_*`` method builds its result from the ``random`` module;
    nothing in this class performs network I/O.
    """

    def __init__(self):
        """Pass the source name and ``settings.THS_API`` to the base adapter."""
        super().__init__("同花顺", settings.THS_API)

    async def fetch_market_overview(self) -> MarketOverview:
        """Return market breadth counts jittered around fixed base values.

        Returns:
            A ``MarketOverview`` whose ``update_time`` is the current local
            time in ISO-8601 format.
        """
        return MarketOverview(
            up_count=2856 + random.randint(-100, 100),
            down_count=1987 + random.randint(-100, 100),
            flat_count=157 + random.randint(-20, 20),
            limit_up_count=68 + random.randint(-10, 10),
            limit_down_count=12 + random.randint(-5, 5),
            update_time=datetime.now().isoformat(),
        )

    async def fetch_market_indices(self) -> List[MarketIndex]:
        """Return one randomized ``MarketIndex`` per configured index.

        The price is the configured base moved by up to ±2%, and ``change``
        is derived from that price and a percentage drawn from ±1.5%.
        """
        indices_config = [
            {"code": "000001", "name": "上证指数", "base": 3052.58},
            {"code": "399001", "name": "深证成指", "base": 9856.32},
            {"code": "399006", "name": "创业板指", "base": 1856.28},
            {"code": "000688", "name": "科创50", "base": 856.32},
            {"code": "899050", "name": "北证50", "base": 756.85},
        ]
        result = []
        for idx in indices_config:
            price = idx["base"] * (1 + random.uniform(-0.02, 0.02))
            change_percent = random.uniform(-1.5, 1.5)
            result.append(
                MarketIndex(
                    code=idx["code"],
                    name=idx["name"],
                    price=round(price, 2),
                    change=round(price * change_percent / 100, 2),
                    change_percent=round(change_percent, 2),
                    volume=random.randint(1000000000, 5000000000),
                    up_count=random.randint(800, 1500),
                    down_count=random.randint(500, 1000),
                    flat_count=random.randint(10, 100),
                )
            )
        return result

    async def fetch_sentiment(self) -> SentimentData:
        """Return a random sentiment score with its label, text and color.

        The score is drawn from the inclusive range 30..70 and bucketed at
        70, 50 and 30.
        """
        value = random.randint(30, 70)
        if value >= 70:
            label, description, color = "乐观", "市场情绪高涨", "#22c55e"
        elif value >= 50:
            label, description, color = "中性", "市场情绪平稳", "#f97316"
        elif value >= 30:
            label, description, color = "谨慎", "市场观望情绪浓厚", "#eab308"
        else:
            # Not reachable with the 30..70 draw above; kept so the
            # classification stays complete if that range is widened.
            label, description, color = "悲观", "市场信心不足", "#ef4444"
        return SentimentData(
            value=value, label=label, description=description, color=color
        )

    async def fetch_sentiment_trend(self, days: int = 15) -> List[SentimentTrend]:
        """Return one random sentiment point per day for the last ``days`` days.

        Args:
            days: Number of points to generate.

        Returns:
            Points labelled ``MM-DD``, starting ``days`` days ago and ending
            yesterday, each with a value in 35..65.
        """
        base_date = datetime.now() - timedelta(days=days)
        return [
            SentimentTrend(
                date=(base_date + timedelta(days=offset)).strftime("%m-%d"),
                value=random.randint(35, 65),
            )
            for offset in range(days)
        ]

    async def fetch_momentum_data(self) -> List[MomentumData]:
        """Return randomized momentum figures for a fixed list of sectors.

        ``stocks`` and ``top_stocks`` are always empty lists.
        """
        sectors = [
            "半导体",
            "新能源",
            "人工智能",
            "医药生物",
            "消费电子",
            "银行",
            "房地产",
            "钢铁",
            "煤炭",
            "石油石化",
        ]
        return [
            MomentumData(
                sector=sector,
                change_percent=round(random.uniform(-3, 5), 2),
                momentum=round(random.uniform(15, 95), 1),
                stocks=[],
                up_count=random.randint(10, 60),
                down_count=random.randint(5, 30),
                flat_count=random.randint(0, 5),
                top_stocks=[],
            )
            for sector in sectors
        ]

    async def fetch_high_stocks(self, limit: int = 10) -> List[HighLowStock]:
        """Return ``limit`` placeholder stocks presented as new highs.

        Args:
            limit: Number of entries to generate.

        Returns:
            Entries with a ``688xxx`` code, a positive change percent and
            ``high_low_price`` equal to the generated price.
        """
        result = []
        for i in range(limit):
            price = round(random.uniform(50, 500), 2)
            result.append(
                HighLowStock(
                    code=f"688{random.randint(100, 999)}",
                    name=f"新高股票{i+1}",
                    price=price,
                    change_percent=round(random.uniform(3, 10), 2),
                    high_low_price=price,
                    industry="示例行业",
                    days_to_high_low=random.randint(0, 5),
                )
            )
        return result

    async def fetch_low_stocks(self, limit: int = 10) -> List[HighLowStock]:
        """Return ``limit`` placeholder stocks presented as new lows.

        Args:
            limit: Number of entries to generate.

        Returns:
            Entries with a ``000xxx`` code, a negative change percent and
            ``high_low_price`` equal to the generated price.
        """
        result = []
        for i in range(limit):
            price = round(random.uniform(5, 50), 2)
            result.append(
                HighLowStock(
                    code=f"000{random.randint(100, 999)}",
                    name=f"新低股票{i+1}",
                    price=price,
                    change_percent=round(random.uniform(-5, -1), 2),
                    high_low_price=price,
                    industry="示例行业",
                    days_to_high_low=random.randint(0, 7),
                )
            )
        return result

    async def fetch_price_distribution(self) -> List[PriceDistribution]:
        """Return stock counts per price-change bucket with random jitter.

        Each bucket's base count is shifted by up to ±20 and clamped at zero,
        because several base counts are smaller than the jitter and would
        otherwise produce negative counts.
        """
        # (range label, is_up flag, base count)
        ranges = [
            (">10%", True, 68),
            ("9%~10%", True, 45),
            ("8%~9%", True, 52),
            ("7%~8%", True, 78),
            ("6%~7%", True, 96),
            ("5%~6%", True, 128),
            ("4%~5%", True, 186),
            ("3%~4%", True, 268),
            ("2%~3%", True, 385),
            ("1%~2%", True, 528),
            ("0~1%", True, 1022),
            ("平盘", False, 157),
            ("-1%~0", False, 856),
            ("-2%~-1%", False, 568),
            ("-3%~-2%", False, 325),
            ("-4%~-3%", False, 128),
            ("-5%~-4%", False, 56),
            ("-6%~-5%", False, 28),
            ("-7%~-6%", False, 15),
            ("-8%~-7%", False, 8),
            ("-9%~-8%", False, 5),
            ("-10%~-9%", False, 3),
            ("< -10%", False, 12),
        ]
        return [
            PriceDistribution(
                range=label,
                count=max(0, base_count + random.randint(-20, 20)),
                is_up=is_up,
            )
            for label, is_up, base_count in ranges
        ]

    async def fetch_stock_detail(self, code: str) -> Optional[Stock]:
        """Return a placeholder ``Stock`` for ``code``.

        Prices are rounded to two decimals and ``change`` is derived from the
        price and change percent, matching ``fetch_hot_stocks`` and
        ``fetch_cold_stocks``.

        Args:
            code: Stock code echoed back in the result.

        Returns:
            A ``Stock``; this implementation never returns ``None``.
        """
        price = random.uniform(10, 100)
        change_percent = random.uniform(-5, 5)
        return Stock(
            code=code,
            name="示例股票",
            price=round(price, 2),
            change=round(price * change_percent / 100, 2),
            change_percent=round(change_percent, 2),
            volume=random.randint(100000, 1000000),
            market_cap=random.randint(1000000000, 100000000000),
            industry="示例行业",
        )

    async def fetch_kline_data(self, code: str, days: int = 30) -> List[KLineData]:
        """Return a random-walk daily K-line series covering ``days`` days.

        Args:
            code: Stock code; not used when generating the series.
            days: Number of calendar days to walk, starting ``days`` days ago.

        Returns:
            One ``KLineData`` per weekday in the window. Saturdays and Sundays
            are skipped, so the list can be shorter than ``days``.
        """
        result = []
        base_date = datetime.now() - timedelta(days=days)
        base_price = random.uniform(8, 12)
        for i in range(days):
            date = base_date + timedelta(days=i)
            if date.weekday() >= 5:  # Saturday or Sunday
                continue
            daily_change = random.uniform(-0.15, 0.15)
            open_price = base_price
            close_price = base_price * (1 + daily_change)
            result.append(
                KLineData(
                    date=date.strftime("%Y-%m-%d"),
                    open=round(open_price, 2),
                    # High/low sit 2% outside the open/close envelope.
                    high=round(max(open_price, close_price) * 1.02, 2),
                    low=round(min(open_price, close_price) * 0.98, 2),
                    close=round(close_price, 2),
                    volume=random.randint(500000, 2000000),
                )
            )
            # The next bar opens at this bar's (unrounded) close.
            base_price = close_price
        return result

    async def fetch_ai_analysis(self, code: str) -> AIAnalysis:
        """Return a placeholder ``AIAnalysis`` for ``code``.

        Trend and recommendation are picked at random; target prices are
        fixed multiples of the generated current price.

        Args:
            code: Stock code echoed back as ``stock_code``.
        """
        current_price = random.uniform(8, 12)
        trend = random.choice([TrendType.UP, TrendType.DOWN, TrendType.SIDEWAYS])
        recommendation = random.choice(
            [RecommendationType.BUY, RecommendationType.SELL, RecommendationType.HOLD]
        )
        recommendation_labels = {"buy": "买入", "sell": "卖出", "hold": "持有"}
        trend_labels = {"up": "上涨", "down": "下跌", "sideways": "震荡"}
        return AIAnalysis(
            stock_code=code,
            stock_name="示例股票",
            current_price=round(current_price, 2),
            change_percent=round(random.uniform(-8, 8), 2),
            insights="该股走势分析...",
            recommendation=recommendation,
            recommendation_text=recommendation_labels[recommendation.value],
            trend=trend,
            trend_text=trend_labels[trend.value],
            target_price=TargetPrice(
                ideal_buy=round(current_price * 0.92, 2),
                second_buy=round(current_price * 0.88, 2),
                stop_loss=round(current_price * 0.85, 2),
                take_profit=round(current_price * 1.18, 2),
            ),
            confidence=random.randint(60, 85),
        )

    async def fetch_news(self, limit: int = 10) -> List[NewsItem]:
        """Return ``limit`` placeholder news items with random sentiment.

        Args:
            limit: Number of items to generate; ids run from "1" to
                ``str(limit)``.
        """
        sentiment_choices = [
            SentimentType.POSITIVE,
            SentimentType.NEGATIVE,
            SentimentType.NEUTRAL,
        ]
        sentiment_labels = {"positive": "正面", "negative": "负面", "neutral": "中性"}
        result = []
        for i in range(limit):
            sentiment = random.choice(sentiment_choices)
            result.append(
                NewsItem(
                    id=str(i + 1),
                    title=f"新闻标题{i+1}",
                    source="同花顺资讯",
                    time=f"{random.randint(1, 24)}小时前",
                    sentiment=sentiment,
                    sentiment_text=sentiment_labels[sentiment.value],
                )
            )
        return result

    async def fetch_hot_news(self, limit: int = 10) -> List[HotNews]:
        """Return ``limit`` placeholder hot-news items.

        Args:
            limit: Number of items to generate; ``related_stocks`` is always
                an empty list.
        """
        return [
            HotNews(
                id=str(i + 1),
                title=f"热点新闻{i+1}",
                heat=random.randint(6000, 10000),
                related_stocks=[],
                time=f"{random.randint(15, 120)}分钟前",
            )
            for i in range(limit)
        ]

    async def fetch_hot_stocks(self, limit: int = 10) -> List[Stock]:
        """Return ``limit`` placeholder stocks with a positive change percent.

        Args:
            limit: Number of entries to generate.
        """
        result = []
        for i in range(limit):
            price = random.uniform(50, 500)
            change_percent = random.uniform(3, 10)
            result.append(
                Stock(
                    code=f"688{random.randint(100, 999)}",
                    name=f"热门股票{i+1}",
                    price=round(price, 2),
                    change=round(price * change_percent / 100, 2),
                    change_percent=round(change_percent, 2),
                    volume=random.randint(100000, 2000000),
                    market_cap=random.randint(10000000000, 1000000000000),
                    industry="热门行业",
                )
            )
        return result

    async def fetch_cold_stocks(self, limit: int = 10) -> List[Stock]:
        """Return ``limit`` placeholder stocks with a negative change percent.

        Args:
            limit: Number of entries to generate.
        """
        result = []
        for i in range(limit):
            price = random.uniform(5, 50)
            change_percent = random.uniform(-5, -1)
            result.append(
                Stock(
                    code=f"000{random.randint(100, 999)}",
                    name=f"冷门股票{i+1}",
                    price=round(price, 2),
                    change=round(price * change_percent / 100, 2),
                    change_percent=round(change_percent, 2),
                    volume=random.randint(500000, 3000000),
                    market_cap=random.randint(100000000000, 1500000000000),
                    industry="冷门行业",
                )
            )
        return result

    async def is_available(self) -> bool:
        """Report the adapter as available; always returns ``True``."""
        return True