You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

215 lines
10 KiB

import random
from datetime import datetime, timedelta
from typing import List, Optional
from adapters.base import BaseDataAdapter
from models import (
MarketOverview, MarketIndex, SentimentData, MomentumData, HighLowStock,
PriceDistribution, AIAnalysis, KLineData, NewsItem, HotNews, SentimentTrend, Stock,
RecommendationType, TrendType, SentimentType, TargetPrice,
)
from config import settings
class TencentAdapter(BaseDataAdapter):
    """Data adapter registered under the source name "腾讯财经".

    Every ``fetch_*`` method in this class builds its result from hard-coded
    baselines plus ``random`` noise; none of them performs any I/O.  The
    values are therefore placeholders, not real market data.
    """

    # (index code, display name, baseline level) used by fetch_market_indices.
    _INDEX_BASELINES = (
        ("000001", "上证指数", 3052.58),
        ("399001", "深证成指", 9856.32),
        ("399006", "创业板指", 1856.28),
        ("000688", "科创50", 856.32),
        ("899050", "北证50", 756.85),
    )

    # Sector names reported by fetch_momentum_data, in output order.
    _SECTORS = (
        "半导体", "新能源", "人工智能", "医药生物", "消费电子",
        "银行", "房地产", "钢铁", "煤炭", "石油石化",
    )

    # (range label, is_up flag, baseline count) used by fetch_price_distribution.
    _PRICE_BUCKETS = (
        (">10%", True, 68), ("9%~10%", True, 45), ("8%~9%", True, 52),
        ("7%~8%", True, 78), ("6%~7%", True, 96), ("5%~6%", True, 128),
        ("4%~5%", True, 186), ("3%~4%", True, 268), ("2%~3%", True, 385),
        ("1%~2%", True, 528), ("0~1%", True, 1022), ("平盘", False, 157),
        ("-1%~0", False, 856), ("-2%~-1%", False, 568), ("-3%~-2%", False, 325),
        ("-4%~-3%", False, 128), ("-5%~-4%", False, 56), ("-6%~-5%", False, 28),
        ("-7%~-6%", False, 15), ("-8%~-7%", False, 8), ("-9%~-8%", False, 5),
        ("-10%~-9%", False, 3), ("< -10%", False, 12),
    )

    # Display text keyed by the ``.value`` of the corresponding enum member.
    _RECOMMENDATION_TEXT = {"buy": "买入", "sell": "卖出", "hold": "持有"}
    _TREND_TEXT = {"up": "上涨", "down": "下跌", "sideways": "震荡"}
    _SENTIMENT_TEXT = {"positive": "正面", "negative": "负面", "neutral": "中性"}

    def __init__(self):
        """Initialise the base adapter with the source name and ``settings.TENCENT_API``."""
        super().__init__("腾讯财经", settings.TENCENT_API)

    async def fetch_market_overview(self) -> MarketOverview:
        """Return advance/decline counts jittered around fixed baselines.

        ``update_time`` is the current local time in ISO-8601 format.
        """
        return MarketOverview(
            up_count=2856 + random.randint(-100, 100),
            down_count=1987 + random.randint(-100, 100),
            flat_count=157 + random.randint(-20, 20),
            limit_up_count=68 + random.randint(-10, 10),
            limit_down_count=12 + random.randint(-5, 5),
            update_time=datetime.now().isoformat(),
        )

    async def fetch_market_indices(self) -> List[MarketIndex]:
        """Return one ``MarketIndex`` per entry in ``_INDEX_BASELINES``.

        Each price is the baseline moved by up to +/-2%, and ``change`` is
        derived from that price and a random percentage in [-1.5, 1.5].
        """
        indices = []
        for code, name, base in self._INDEX_BASELINES:
            price = base * (1 + random.uniform(-0.02, 0.02))
            change_percent = random.uniform(-1.5, 1.5)
            indices.append(MarketIndex(
                code=code,
                name=name,
                price=round(price, 2),
                change=round(price * change_percent / 100, 2),
                change_percent=round(change_percent, 2),
                volume=random.randint(1000000000, 5000000000),
                up_count=random.randint(800, 1500),
                down_count=random.randint(500, 1000),
                flat_count=random.randint(10, 100),
            ))
        return indices

    @staticmethod
    def _sentiment_band(value: int):
        """Map a sentiment score to its ``(label, description, color)`` triple."""
        if value >= 70:
            return "乐观", "市场情绪高涨", "#22c55e"
        if value >= 50:
            return "中性", "市场情绪平稳", "#f97316"
        if value >= 30:
            return "谨慎", "市场观望情绪浓厚", "#eab308"
        return "悲观", "市场信心不足", "#ef4444"

    async def fetch_sentiment(self) -> SentimentData:
        """Return a random sentiment score in [30, 70] with its label and color.

        NOTE: because the score is drawn from 30..70 inclusive, the "悲观"
        band (< 30) is never produced here and "乐观" only occurs at exactly
        70.  The full mapping is kept so it stays total if the range changes.
        """
        value = random.randint(30, 70)
        label, description, color = self._sentiment_band(value)
        return SentimentData(value=value, label=label, description=description, color=color)

    async def fetch_sentiment_trend(self, days: int = 15) -> List[SentimentTrend]:
        """Return ``days`` consecutive daily sentiment points.

        The series starts ``days`` days before now, so its last entry is dated
        one day before the current date.  Dates are formatted as ``MM-DD`` and
        values are random integers in [35, 65].  A non-positive ``days``
        yields an empty list.
        """
        start = datetime.now() - timedelta(days=days)
        return [
            SentimentTrend(
                date=(start + timedelta(days=offset)).strftime("%m-%d"),
                value=random.randint(35, 65),
            )
            for offset in range(days)
        ]

    async def fetch_momentum_data(self) -> List[MomentumData]:
        """Return one ``MomentumData`` per sector in ``_SECTORS``.

        ``stocks`` and ``top_stocks`` are always empty lists.
        """
        return [
            MomentumData(
                sector=sector,
                change_percent=round(random.uniform(-3, 5), 2),
                momentum=round(random.uniform(15, 95), 1),
                stocks=[],
                up_count=random.randint(10, 60),
                down_count=random.randint(5, 30),
                flat_count=random.randint(0, 5),
                top_stocks=[],
            )
            for sector in self._SECTORS
        ]

    @staticmethod
    def _mock_high_low_stocks(
        limit: int,
        *,
        code_prefix: str,
        name_prefix: str,
        price_range: tuple,
        change_range: tuple,
        max_days: int,
    ) -> List[HighLowStock]:
        """Build ``limit`` placeholder ``HighLowStock`` rows from the given ranges."""
        rows = []
        for rank in range(1, limit + 1):
            price = round(random.uniform(*price_range), 2)
            rows.append(HighLowStock(
                code=f"{code_prefix}{random.randint(100, 999)}",
                name=f"{name_prefix}{rank}",
                price=price,
                change_percent=round(random.uniform(*change_range), 2),
                # The current price doubles as the high/low price.
                high_low_price=price,
                industry="示例行业",
                days_to_high_low=random.randint(0, max_days),
            ))
        return rows

    async def fetch_high_stocks(self, limit: int = 10) -> List[HighLowStock]:
        """Return ``limit`` placeholder new-high stocks (codes prefixed ``688``).

        Codes are drawn at random and are not guaranteed to be unique.
        """
        return self._mock_high_low_stocks(
            limit,
            code_prefix="688",
            name_prefix="新高股票",
            price_range=(50, 500),
            change_range=(3, 10),
            max_days=5,
        )

    async def fetch_low_stocks(self, limit: int = 10) -> List[HighLowStock]:
        """Return ``limit`` placeholder new-low stocks (codes prefixed ``000``).

        Codes are drawn at random and are not guaranteed to be unique.
        """
        return self._mock_high_low_stocks(
            limit,
            code_prefix="000",
            name_prefix="新低股票",
            price_range=(5, 50),
            change_range=(-5, -1),
            max_days=7,
        )

    async def fetch_price_distribution(self) -> List[PriceDistribution]:
        """Return one ``PriceDistribution`` per bucket in ``_PRICE_BUCKETS``.

        Each count is the bucket baseline plus noise in [-20, 20].  Several
        baselines are smaller than 20, so the result is clamped at zero to
        avoid reporting a negative number of stocks.
        """
        return [
            PriceDistribution(
                range=label,
                count=max(0, baseline + random.randint(-20, 20)),
                is_up=is_up,
            )
            for label, is_up, baseline in self._PRICE_BUCKETS
        ]

    async def fetch_stock_detail(self, code: str) -> Optional[Stock]:
        """Return a placeholder ``Stock`` carrying the requested ``code``.

        As in ``fetch_hot_stocks`` / ``fetch_cold_stocks``, ``change`` is
        derived from the price and the percentage change, and monetary
        values are rounded to two decimals.  This implementation never
        returns ``None``.
        """
        price = random.uniform(10, 100)
        change_percent = random.uniform(-5, 5)
        return Stock(
            code=code,
            name="示例股票",
            price=round(price, 2),
            change=round(price * change_percent / 100, 2),
            change_percent=round(change_percent, 2),
            volume=random.randint(100000, 1000000),
            market_cap=random.randint(1000000000, 100000000000),
            industry="示例行业",
        )

    async def fetch_kline_data(self, code: str, days: int = 30) -> List[KLineData]:
        """Return a random-walk daily K-line series.

        The walk covers the ``days`` calendar days that start ``days`` days
        before now.  Saturdays and Sundays are skipped, so fewer than ``days``
        candles are usually returned.  ``code`` is accepted for interface
        compatibility but does not influence the generated data.
        """
        candles = []
        start = datetime.now() - timedelta(days=days)
        last_close = random.uniform(8, 12)
        for offset in range(days):
            day = start + timedelta(days=offset)
            if day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
                continue
            daily_change = random.uniform(-0.15, 0.15)
            # Each candle opens at the previous candle's close.
            open_price = last_close
            close_price = last_close * (1 + daily_change)
            candles.append(KLineData(
                date=day.strftime("%Y-%m-%d"),
                open=round(open_price, 2),
                high=round(max(open_price, close_price) * 1.02, 2),
                low=round(min(open_price, close_price) * 0.98, 2),
                close=round(close_price, 2),
                volume=random.randint(500000, 2000000),
            ))
            last_close = close_price
        return candles

    async def fetch_ai_analysis(self, code: str) -> AIAnalysis:
        """Return a placeholder ``AIAnalysis`` for ``code``.

        Trend and recommendation are picked at random; the target prices are
        fixed multiples of a random current price in [8, 12].
        """
        current_price = random.uniform(8, 12)
        trend = random.choice([TrendType.UP, TrendType.DOWN, TrendType.SIDEWAYS])
        recommendation = random.choice(
            [RecommendationType.BUY, RecommendationType.SELL, RecommendationType.HOLD]
        )
        return AIAnalysis(
            stock_code=code,
            stock_name="示例股票",
            current_price=round(current_price, 2),
            change_percent=round(random.uniform(-8, 8), 2),
            insights="腾讯财经数据分析...",
            recommendation=recommendation,
            recommendation_text=self._RECOMMENDATION_TEXT[recommendation.value],
            trend=trend,
            trend_text=self._TREND_TEXT[trend.value],
            target_price=TargetPrice(
                ideal_buy=round(current_price * 0.92, 2),
                second_buy=round(current_price * 0.88, 2),
                stop_loss=round(current_price * 0.85, 2),
                take_profit=round(current_price * 1.18, 2),
            ),
            confidence=random.randint(60, 85),
        )

    async def fetch_news(self, limit: int = 10) -> List[NewsItem]:
        """Return ``limit`` placeholder news items with random sentiment.

        Item ids are the 1-based position rendered as a string.
        """
        items = []
        for rank in range(1, limit + 1):
            sentiment = random.choice(
                [SentimentType.POSITIVE, SentimentType.NEGATIVE, SentimentType.NEUTRAL]
            )
            items.append(NewsItem(
                id=str(rank),
                title=f"腾讯新闻{rank}",
                source="腾讯财经",
                time=f"{random.randint(1, 24)}小时前",
                sentiment=sentiment,
                sentiment_text=self._SENTIMENT_TEXT[sentiment.value],
            ))
        return items

    async def fetch_hot_news(self, limit: int = 10) -> List[HotNews]:
        """Return ``limit`` placeholder hot-news entries.

        ``related_stocks`` is always an empty list.
        """
        return [
            HotNews(
                id=str(rank),
                title=f"腾讯热点{rank}",
                heat=random.randint(6000, 10000),
                related_stocks=[],
                time=f"{random.randint(15, 120)}分钟前",
            )
            for rank in range(1, limit + 1)
        ]

    @staticmethod
    def _mock_stocks(
        limit: int,
        *,
        code_prefix: str,
        name_prefix: str,
        industry: str,
        price_range: tuple,
        change_range: tuple,
        volume_range: tuple,
        market_cap_range: tuple,
    ) -> List[Stock]:
        """Build ``limit`` placeholder ``Stock`` rows from the given ranges."""
        rows = []
        for rank in range(1, limit + 1):
            price = random.uniform(*price_range)
            change_percent = random.uniform(*change_range)
            rows.append(Stock(
                code=f"{code_prefix}{random.randint(100, 999)}",
                name=f"{name_prefix}{rank}",
                price=round(price, 2),
                change=round(price * change_percent / 100, 2),
                change_percent=round(change_percent, 2),
                volume=random.randint(*volume_range),
                market_cap=random.randint(*market_cap_range),
                industry=industry,
            ))
        return rows

    async def fetch_hot_stocks(self, limit: int = 10) -> List[Stock]:
        """Return ``limit`` placeholder rising stocks (codes prefixed ``688``)."""
        return self._mock_stocks(
            limit,
            code_prefix="688",
            name_prefix="热门股票",
            industry="热门行业",
            price_range=(50, 500),
            change_range=(3, 10),
            volume_range=(100000, 2000000),
            market_cap_range=(10000000000, 1000000000000),
        )

    async def fetch_cold_stocks(self, limit: int = 10) -> List[Stock]:
        """Return ``limit`` placeholder falling stocks (codes prefixed ``000``)."""
        return self._mock_stocks(
            limit,
            code_prefix="000",
            name_prefix="冷门股票",
            industry="冷门行业",
            price_range=(5, 50),
            change_range=(-5, -1),
            volume_range=(500000, 3000000),
            market_cap_range=(100000000000, 1500000000000),
        )

    async def is_available(self) -> bool:
        """Report the adapter as available; this implementation always returns ``True``."""
        return True