You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
443 lines
16 KiB
443 lines
16 KiB
|
2 months ago
|
# -*- coding: utf-8 -*-
|
||
|
|
"""
|
||
|
|
===================================
|
||
|
|
AmazingData Custom Data Source Adapter
|
||
|
|
===================================
|
||
|
|
|
||
|
|
Adapter for AmazingData financial data platform API.
|
||
|
|
Provides stock K-line data, realtime quotes, and market statistics.
|
||
|
|
|
||
|
|
API Documentation: docs/customApi/API.md
|
||
|
|
"""
|
||
|
|
|
||
|
|
import logging
|
||
|
|
import time
|
||
|
|
import requests
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
from typing import Optional, List, Dict, Any, Tuple
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
|
||
|
|
from .base import BaseFetcher, DataFetchError, RateLimitError
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class AmazingDataFetcher(BaseFetcher):
|
||
|
|
"""
|
||
|
|
AmazingData data source adapter
|
||
|
|
|
||
|
|
Features:
|
||
|
|
- JWT authentication with auto-refresh
|
||
|
|
- Stock K-line data (daily/minute)
|
||
|
|
- Realtime quotes
|
||
|
|
- Market statistics
|
||
|
|
- Sector rankings
|
||
|
|
|
||
|
|
Configuration (via .env):
|
||
|
|
- AMAZINGDATA_BASE_URL: API base URL (default: http://localhost:8000/api/v1)
|
||
|
|
- AMAZINGDATA_USERNAME: SDK username
|
||
|
|
- AMAZINGDATA_PASSWORD: SDK password
|
||
|
|
- AMAZINGDATA_HOST: SDK server host (default: 140.206.44.234)
|
||
|
|
- AMAZINGDATA_PORT: SDK server port (default: 8600)
|
||
|
|
"""
|
||
|
|
|
||
|
|
name: str = "AmazingDataFetcher"
|
||
|
|
priority: int = 5
|
||
|
|
|
||
|
|
DEFAULT_BASE_URL = "http://localhost:8000/api/v1"
|
||
|
|
DEFAULT_HOST = "140.206.44.234"
|
||
|
|
DEFAULT_PORT = 8600
|
||
|
|
TOKEN_EXPIRY_BUFFER = 300
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
"""Initialize AmazingData fetcher"""
|
||
|
|
self._base_url: Optional[str] = None
|
||
|
|
self._username: Optional[str] = None
|
||
|
|
self._password: Optional[str] = None
|
||
|
|
self._host: Optional[str] = None
|
||
|
|
self._port: Optional[int] = None
|
||
|
|
|
||
|
|
self._token: Optional[str] = None
|
||
|
|
self._token_expires_at: Optional[float] = None
|
||
|
|
|
||
|
|
self._initialized = False
|
||
|
|
self._available = False
|
||
|
|
|
||
|
|
self._init_config()
|
||
|
|
|
||
|
|
def _init_config(self) -> None:
    """Populate connection settings from the project configuration object.

    Reads the ``amazingdata_*`` attributes of ``get_config()``; a missing
    or falsy base URL, host or port falls back to the class default.
    The fetcher is marked initialized and available only when both a
    username and a password are present. Any exception raised while
    loading is logged and leaves the fetcher unavailable.
    """
    try:
        from src.config import get_config

        settings = get_config()

        def read(option: str) -> Any:
            # Absent options behave like unset ones.
            return getattr(settings, option, None)

        self._base_url = read('amazingdata_base_url') or self.DEFAULT_BASE_URL
        self._username = read('amazingdata_username')
        self._password = read('amazingdata_password')
        self._host = read('amazingdata_host') or self.DEFAULT_HOST
        self._port = read('amazingdata_port') or self.DEFAULT_PORT

        if not (self._username and self._password):
            logger.debug(f"[{self.name}] Not configured (missing username/password)")
            self._available = False
            return

        self._initialized = True
        self._available = True
        # NOTE(review): priority is overridden to 0 once credentials exist,
        # presumably so this source is preferred — confirm against the
        # consumer of ``priority``.
        self.priority = 0
        logger.info(f"[{self.name}] Initialized with base_url={self._base_url}")

    except Exception as e:
        logger.warning(f"[{self.name}] Config load failed: {e}")
        self._available = False
|
||
|
|
|
||
|
|
def _login(self) -> bool:
|
||
|
|
"""Authenticate and obtain JWT token"""
|
||
|
|
if not self._initialized:
|
||
|
|
return False
|
||
|
|
|
||
|
|
try:
|
||
|
|
response = requests.post(
|
||
|
|
f"{self._base_url}/auth/login",
|
||
|
|
json={"username": self._username, "password": self._password},
|
||
|
|
timeout=30
|
||
|
|
)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
if data.get("code") == 200:
|
||
|
|
token_data = data.get("data", {})
|
||
|
|
self._token = token_data.get("access_token")
|
||
|
|
expires_in = token_data.get("expires_in", 86400)
|
||
|
|
self._token_expires_at = time.time() + expires_in - self.TOKEN_EXPIRY_BUFFER
|
||
|
|
logger.info(f"[{self.name}] Login successful, token valid for {expires_in}s")
|
||
|
|
return True
|
||
|
|
else:
|
||
|
|
logger.warning(f"[{self.name}] Login failed: {data.get('message')}")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
logger.warning(f"[{self.name}] Login HTTP error: {response.status_code}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"[{self.name}] Login exception: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def _get_headers(self) -> Dict[str, str]:
|
||
|
|
"""Get authorization headers"""
|
||
|
|
if not self._token or (self._token_expires_at and time.time() > self._token_expires_at):
|
||
|
|
if not self._login():
|
||
|
|
raise DataFetchError(f"[{self.name}] Authentication failed")
|
||
|
|
|
||
|
|
return {
|
||
|
|
"Authorization": f"Bearer {self._token}",
|
||
|
|
"Content-Type": "application/json"
|
||
|
|
}
|
||
|
|
|
||
|
|
def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
    """Send an authenticated request and return the decoded JSON body.

    Args:
        method: HTTP method name, e.g. ``"GET"``.
        path: Path appended to the configured base URL.
        **kwargs: Passed through to ``requests.request``. ``headers`` is
            always replaced with the authorization headers; ``timeout``
            defaults to 60 seconds when not supplied.

    Returns:
        The parsed JSON response.

    Raises:
        DataFetchError: on authentication failure, timeout, transport
            error, a non-200 status, or a body that is not valid JSON.
            The underlying exception is kept as ``__cause__``.
    """
    kwargs["headers"] = self._get_headers()
    kwargs.setdefault("timeout", 60)

    url = f"{self._base_url}{path}"

    try:
        response = requests.request(method, url, **kwargs)

        if response.status_code == 401:
            # The server rejected the cached token: drop it so that
            # _get_headers() is forced to log in again, then retry once.
            logger.warning(f"[{self.name}] Token expired, re-authenticating...")
            self._token = None
            kwargs["headers"] = self._get_headers()
            response = requests.request(method, url, **kwargs)

        if response.status_code != 200:
            raise DataFetchError(f"[{self.name}] API error {response.status_code}: {response.text}")

        try:
            return response.json()
        except ValueError as e:
            # JSON decode errors are ValueError subclasses; on older
            # requests versions they are not RequestException, so they
            # used to escape this method unwrapped.
            raise DataFetchError(f"[{self.name}] Invalid JSON in response: {e}") from e

    except requests.Timeout as e:
        raise DataFetchError(f"[{self.name}] Request timeout") from e
    except requests.RequestException as e:
        raise DataFetchError(f"[{self.name}] Request failed: {e}") from e
|
||
|
|
|
||
|
|
def _fetch_raw_data(self, stock_code: str, start_date: str, end_date: str) -> pd.DataFrame:
|
||
|
|
"""
|
||
|
|
Fetch raw K-line data from AmazingData
|
||
|
|
|
||
|
|
Args:
|
||
|
|
stock_code: Stock code (e.g., '600519')
|
||
|
|
start_date: Start date (YYYY-MM-DD)
|
||
|
|
end_date: End date (YYYY-MM-DD)
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
Raw DataFrame with K-line data
|
||
|
|
"""
|
||
|
|
if not self._available:
|
||
|
|
raise DataFetchError(f"[{self.name}] Not configured")
|
||
|
|
|
||
|
|
code_with_suffix = self._add_exchange_suffix(stock_code)
|
||
|
|
start_dt = start_date.replace("-", "")
|
||
|
|
end_dt = end_date.replace("-", "")
|
||
|
|
|
||
|
|
try:
|
||
|
|
response = self._request("GET", "/stock/kline", params={
|
||
|
|
"codes": code_with_suffix,
|
||
|
|
"start_date": start_dt,
|
||
|
|
"end_date": end_dt,
|
||
|
|
"period": "daily"
|
||
|
|
})
|
||
|
|
|
||
|
|
data = response.get("data", {})
|
||
|
|
kline_list = data.get(code_with_suffix, [])
|
||
|
|
|
||
|
|
if not kline_list:
|
||
|
|
raise DataFetchError(f"[{self.name}] No data for {stock_code}")
|
||
|
|
|
||
|
|
df = pd.DataFrame(kline_list)
|
||
|
|
return df
|
||
|
|
|
||
|
|
except DataFetchError:
|
||
|
|
raise
|
||
|
|
except Exception as e:
|
||
|
|
raise DataFetchError(f"[{self.name}] Fetch failed: {e}")
|
||
|
|
|
||
|
|
def _normalize_data(self, df: pd.DataFrame, stock_code: str) -> pd.DataFrame:
|
||
|
|
"""
|
||
|
|
Normalize column names to standard format
|
||
|
|
|
||
|
|
Standard columns: date, open, high, low, close, volume, amount, pct_chg
|
||
|
|
"""
|
||
|
|
column_mapping = {
|
||
|
|
"trade_date": "date",
|
||
|
|
"open": "open",
|
||
|
|
"high": "high",
|
||
|
|
"low": "low",
|
||
|
|
"close": "close",
|
||
|
|
"volume": "volume",
|
||
|
|
"amount": "amount",
|
||
|
|
}
|
||
|
|
|
||
|
|
df = df.copy()
|
||
|
|
|
||
|
|
for old_col, new_col in column_mapping.items():
|
||
|
|
if old_col in df.columns and new_col not in df.columns:
|
||
|
|
df[new_col] = df[old_col]
|
||
|
|
|
||
|
|
if "pct_chg" not in df.columns:
|
||
|
|
if "close" in df.columns and len(df) > 1:
|
||
|
|
df["pct_chg"] = df["close"].pct_change() * 100
|
||
|
|
else:
|
||
|
|
df["pct_chg"] = 0.0
|
||
|
|
|
||
|
|
required_cols = ["date", "open", "high", "low", "close", "volume"]
|
||
|
|
missing_cols = [col for col in required_cols if col not in df.columns]
|
||
|
|
if missing_cols:
|
||
|
|
raise DataFetchError(f"[{self.name}] Missing columns: {missing_cols}")
|
||
|
|
|
||
|
|
return df
|
||
|
|
|
||
|
|
def _add_exchange_suffix(self, stock_code: str) -> str:
|
||
|
|
"""Add exchange suffix to stock code"""
|
||
|
|
code = stock_code.strip()
|
||
|
|
if "." in code:
|
||
|
|
return code
|
||
|
|
|
||
|
|
if code.startswith("6"):
|
||
|
|
return f"{code}.SH"
|
||
|
|
elif code.startswith(("0", "3")):
|
||
|
|
return f"{code}.SZ"
|
||
|
|
elif code.startswith("68"):
|
||
|
|
return f"{code}.SH"
|
||
|
|
else:
|
||
|
|
return f"{code}.SH"
|
||
|
|
|
||
|
|
def get_realtime_quote(self, stock_code: str):
    """Build a quote for a stock from its most recent daily K-line bars.

    The quote is derived from the ``/stock/kline`` daily endpoint: the
    last bar supplies price/open/high/low/volume/amount and the bar
    before it supplies ``prev_close``. With only one bar available the
    bar is compared with itself, giving a change of 0.

    Args:
        stock_code: Stock code, with or without exchange suffix.

    Returns:
        UnifiedRealtimeQuote object, or None when the fetcher is not
        available, no bars are returned, or any error occurs (errors are
        logged, not raised).
    """
    if not self._available:
        return None

    try:
        code_with_suffix = self._add_exchange_suffix(stock_code)

        # Exchange holiday closures can exceed a week, so a 7-day window
        # could contain a single bar on the first session afterwards and
        # silently report prev_close == close. 15 days leaves headroom.
        now = datetime.now()
        today = now.strftime("%Y%m%d")
        window_start = (now - timedelta(days=15)).strftime("%Y%m%d")

        response = self._request("GET", "/stock/kline", params={
            "codes": code_with_suffix,
            "start_date": window_start,
            "end_date": today,
            "period": "daily"
        })

        # Tolerate a null "data" payload or a null per-code entry.
        data = response.get("data") or {}
        kline_list = data.get(code_with_suffix) or []

        if not kline_list:
            return None

        # assumes the API returns bars in ascending date order — TODO confirm
        latest = kline_list[-1]
        prev = kline_list[-2] if len(kline_list) > 1 else latest

        from .realtime_types import UnifiedRealtimeQuote

        close = float(latest.get("close", 0))
        prev_close = float(prev.get("close", close))
        change = close - prev_close
        # Guard against division by zero when the previous close is 0.
        change_pct = (change / prev_close * 100) if prev_close else 0

        quote = UnifiedRealtimeQuote(
            code=stock_code,
            name=None,
            price=close,
            open=float(latest.get("open", 0)),
            high=float(latest.get("high", 0)),
            low=float(latest.get("low", 0)),
            prev_close=prev_close,
            volume=float(latest.get("volume", 0) or 0),
            amount=float(latest.get("amount", 0) or 0),
            change_amount=change,
            change_pct=change_pct,
        )

        return quote

    except Exception as e:
        logger.warning(f"[{self.name}] Realtime quote failed for {stock_code}: {e}")
        return None
|
||
|
|
|
||
|
|
def get_main_indices(self) -> Optional[List[Dict[str, Any]]]:
    """Return the latest daily snapshot of the major market indices.

    Values come from the ``/stock/kline`` daily endpoint: for each index
    the last bar is compared with the bar before it (or with itself when
    only one bar is returned).

    Returns:
        A list of dicts with keys ``code``, ``name``, ``current``,
        ``change``, ``change_pct``, ``volume`` and ``amount``, one per
        index that returned data; None when the fetcher is unavailable,
        no index returned data, or an error occurred (logged).
    """
    if not self._available:
        return None

    # Single source of truth for codes and display names. The STAR 50
    # index is 000688.SH; the code previously paired with this label
    # (000698.SH) is a different index.
    index_names = {
        "000001.SH": "上证指数",
        "399001.SZ": "深证成指",
        "399006.SZ": "创业板指",
        "000688.SH": "科创50",
    }
    indices_codes = list(index_names)

    try:
        # Exchange holiday closures can exceed a week; a 7-day window
        # could hold a single bar afterwards and report a change of 0.
        now = datetime.now()
        today = now.strftime("%Y%m%d")
        window_start = (now - timedelta(days=15)).strftime("%Y%m%d")

        response = self._request("GET", "/stock/kline", params={
            "codes": ",".join(indices_codes),
            "start_date": window_start,
            "end_date": today,
            "period": "daily"
        })

        # Tolerate a null "data" payload.
        data = response.get("data") or {}
        result = []

        for code in indices_codes:
            kline_list = data.get(code) or []
            if not kline_list:
                continue

            # assumes bars are in ascending date order — TODO confirm
            latest = kline_list[-1]
            prev = kline_list[-2] if len(kline_list) > 1 else latest

            close = float(latest.get("close", 0))
            prev_close = float(prev.get("close", close))
            change = close - prev_close
            # Guard against division by zero when the previous close is 0.
            change_pct = (change / prev_close * 100) if prev_close else 0

            result.append({
                "code": code,
                "name": index_names.get(code, code),
                "current": close,
                "change": change,
                "change_pct": change_pct,
                "volume": float(latest.get("volume", 0) or 0),
                "amount": float(latest.get("amount", 0) or 0),
            })

        return result if result else None

    except Exception as e:
        logger.warning(f"[{self.name}] Get indices failed: {e}")
        return None
|
||
|
|
|
||
|
|
def get_market_stats(self) -> Optional[Dict[str, Any]]:
    """Return market up/down statistics.

    Not implemented for this data source: no request is made and the
    result is always None.
    """
    return None
|
||
|
|
|
||
|
|
def get_sector_rankings(self, n: int = 5) -> Optional[Tuple[List[Dict], List[Dict]]]:
    """Return sector rankings.

    Not implemented for this data source: ``n`` is ignored, no request is
    made and the result is always None.
    """
    return None
|
||
|
|
|
||
|
|
def get_stock_name(self, stock_code: str) -> Optional[str]:
    """Look up a stock's display name via ``/base/codes/<code>/info``.

    Args:
        stock_code: Stock code, with or without exchange suffix.

    Returns:
        The ``name`` field of the response's ``data`` object, or None when
        the fetcher is unavailable, the name is missing or empty, or the
        lookup fails (failures are logged at debug level).
    """
    if not self._available:
        return None

    try:
        full_code = self._add_exchange_suffix(stock_code)
        payload = self._request("GET", f"/base/codes/{full_code}/info")
        name = payload.get("data", {}).get("name")

        if not name:
            return None

        logger.debug(f"[{self.name}] Got name for {stock_code}: {name}")
        return name

    except Exception as e:
        logger.debug(f"[{self.name}] Get stock name failed: {e}")
        return None
|
||
|
|
|
||
|
|
def get_kline_chart_data(self, stock_code: str, days: int = 60) -> Optional[Dict[str, Any]]:
    """Fetch chart-ready daily K-line series for a stock.

    Calls ``/stock/kline/<code>/chart`` for a window that ends today and
    starts ``days * 2`` calendar days earlier.

    Args:
        stock_code: Stock code, with or without exchange suffix.
        days: Day count used to size the request window.

    Returns:
        Dict with ``categoryData``, ``values`` and ``volumes`` (each an
        empty list when absent from the response) plus ``stock_name``;
        None when the fetcher is unavailable, the response carries no
        data, or the request fails (logged as a warning).
    """
    if not self._available:
        return None

    try:
        full_code = self._add_exchange_suffix(stock_code)
        window_end = datetime.now().strftime("%Y%m%d")
        window_start = (datetime.now() - timedelta(days=days * 2)).strftime("%Y%m%d")
        query = {
            "start_date": window_start,
            "end_date": window_end,
            "period": "daily",
        }

        payload = self._request("GET", f"/stock/kline/{full_code}/chart", params=query)
        series = payload.get("data", {})
        if not series:
            return None

        chart = {key: series.get(key, []) for key in ("categoryData", "values", "volumes")}
        chart["stock_name"] = series.get("stock_name")
        return chart

    except Exception as e:
        logger.warning(f"[{self.name}] Get kline chart failed: {e}")
        return None
|