You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

396 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from typing import List, Optional
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import and_
import pandas as pd
import logging
from app.models import KlineDaily, KlineWeekly, KlineIntraday, ContractInfo
from app.services.datasource.manager import DataSourceManager
from app.database import SessionLocal
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class KlineService:
    """K-line (candlestick) data service.

    Fetches K-line data from the primary data source, upserts it into the
    database, and answers queries from the database. When a query finds no
    stored rows, a sync from the data source is triggered and the query is
    repeated.
    """

    # Start of the date range used by the automatic sync in ``get_kline``
    # when the caller does not supply ``start_date``.
    DEFAULT_SYNC_START = "2020-01-01"

    # Columns copied from a source row onto every K-line model, and returned
    # (in this order, after "trade_time") by the query methods.
    _BASE_FIELDS = (
        "open",
        "high",
        "low",
        "close",
        "volume",
        "turnover",
        "open_interest",
    )
    # Daily K-lines additionally store the settlement columns.
    _DAILY_FIELDS = _BASE_FIELDS + ("settle", "pre_settle")

    def __init__(self):
        """Create the service together with its own ``DataSourceManager``."""
        self.manager = DataSourceManager()

    def _ensure_source(self):
        """Return the primary data source.

        Raises:
            RuntimeError: if the manager reports no primary source. This is a
                subclass of ``Exception``, so callers catching ``Exception``
                keep working.
        """
        source = self.manager.get_primary_source()
        if not source:
            raise RuntimeError("没有可用的数据源,请先在管理后台配置并启用数据源")
        return source

    # ========== Sync ==========

    def _upsert_rows(
        self,
        df: "pd.DataFrame",
        model,
        time_field: str,
        fields,
        identity: dict,
    ) -> int:
        """Insert or update one ``model`` record per row of ``df``.

        A record is looked up by the ``identity`` columns plus the row's
        ``time_field`` value. An existing record gets its ``fields``
        overwritten and ``updated_at`` refreshed; otherwise a new record is
        added. All rows are committed in a single transaction, which is rolled
        back if anything fails.

        Args:
            df: Rows fetched from the data source; must contain ``time_field``.
            model: Mapped class to write to.
            time_field: Name of the time key, both as a ``df`` column and as a
                ``model`` attribute.
            fields: Names of the value columns to copy; a column missing from
                a row is stored as ``None`` (``row.get`` default).
            identity: Key columns shared by every row, e.g. ``{"symbol": ...}``.

        Returns:
            Number of rows processed (inserted plus updated).
        """
        db = SessionLocal()
        count = 0
        try:
            for _, row in df.iterrows():
                key = dict(identity)
                key[time_field] = row[time_field]
                values = {name: row.get(name) for name in fields}

                record = db.query(model).filter_by(**key).first()
                if record is not None:
                    for name, value in values.items():
                        setattr(record, name, value)
                    record.updated_at = datetime.utcnow()
                else:
                    db.add(model(**key, **values))
                count += 1
            db.commit()
        except Exception:
            db.rollback()
            raise
        finally:
            db.close()
        return count

    def sync_daily(
        self,
        symbol: str,
        start_date: str,
        end_date: str
    ) -> int:
        """Sync daily K-line data for ``symbol`` into the database.

        Returns:
            Number of rows written; 0 when the data source returns no rows.

        Raises:
            RuntimeError: if no data source is available.
            Exception: any data-source or database error, after it is logged.
        """
        logger.info(f"[同步日K线] 开始同步 symbol={symbol}, start_date={start_date}, end_date={end_date}")
        source = self._ensure_source()
        logger.info(f"[同步日K线] 使用数据源: {source.name}")
        df = source.get_kline_daily(symbol, start_date, end_date)
        logger.info(f"[同步日K线] 从数据源获取到 {len(df)} 条记录")
        if df.empty:
            logger.warning(f"[同步日K线] 数据源返回空数据symbol={symbol}")
            return 0

        try:
            count = self._upsert_rows(
                df, KlineDaily, "trade_date", self._DAILY_FIELDS, {"symbol": symbol}
            )
        except Exception as e:
            logger.error(f"[同步日K线] 同步失败: {e}", exc_info=True)
            raise
        logger.info(f"[同步日K线] 成功同步 {count} 条记录到数据库")
        return count

    def sync_weekly(
        self,
        symbol: str,
        start_date: str,
        end_date: str
    ) -> int:
        """Sync weekly K-line data for ``symbol`` into the database.

        Returns:
            Number of rows written; 0 when the data source returns no rows.
        """
        source = self._ensure_source()
        df = source.get_kline_weekly(symbol, start_date, end_date)
        if df.empty:
            return 0
        return self._upsert_rows(
            df, KlineWeekly, "trade_date", self._BASE_FIELDS, {"symbol": symbol}
        )

    def sync_intraday(
        self,
        symbol: str,
        period: str,
        start_date: str,
        end_date: str
    ) -> int:
        """Sync intraday (minute-level) K-line data for ``symbol``/``period``.

        Returns:
            Number of rows written; 0 when the data source returns no rows.
        """
        source = self._ensure_source()
        df = source.get_kline_intraday(symbol, period, start_date, end_date)
        if df.empty:
            return 0
        return self._upsert_rows(
            df,
            KlineIntraday,
            "trade_time",
            self._BASE_FIELDS,
            {"symbol": symbol, "period": period},
        )

    def _sync_period(self, symbol: str, period: str, start_date: str, end_date: str) -> int:
        """Dispatch to the sync method for ``period``.

        ``"daily"`` and ``"weekly"`` have dedicated tables; every other value
        is treated as an intraday period.
        """
        if period == "daily":
            return self.sync_daily(symbol, start_date, end_date)
        if period == "weekly":
            return self.sync_weekly(symbol, start_date, end_date)
        return self.sync_intraday(symbol, period, start_date, end_date)

    # ========== Query ==========

    def get_kline(
        self,
        symbol: str,
        period: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        limit: int = 500
    ) -> List[dict]:
        """Query K-line data, syncing from the data source if none is stored.

        The database is queried first. If it returns nothing, the requested
        range (defaulting to ``DEFAULT_SYNC_START`` .. today) is synced and
        the query is repeated. A failed sync is logged and does not raise;
        the empty result is returned instead.

        Each query pass uses its own session. The sync commits through a
        separate session, and a transaction left open by the first read may
        not observe those newly committed rows under snapshot-style isolation
        levels, so the re-query must not reuse it.

        Returns:
            At most ``limit`` dicts, newest first, with keys ``trade_time``,
            ``open``, ``high``, ``low``, ``close``, ``volume``, ``turnover``
            and ``open_interest``.
        """
        logger.info(f"[查询K线] 开始查询 symbol={symbol}, period={period}, start_date={start_date}, end_date={end_date}, limit={limit}")
        items = self._query_period(symbol, period, start_date, end_date, limit)
        if items:
            return items

        # Nothing stored for this request: try to populate the database.
        logger.info(f"[查询K线] 数据库中没有 {symbol}{period} K线数据开始自动同步")
        try:
            sync_start = start_date or self.DEFAULT_SYNC_START
            sync_end = end_date or datetime.now().strftime("%Y-%m-%d")
            logger.info(f"[查询K线] 自动同步日期范围: {sync_start} ~ {sync_end}")
            count = self._sync_period(symbol, period, sync_start, sync_end)
            if count > 0:
                logger.info(f"[查询K线] 自动同步成功,共同步 {count} 条记录,重新查询数据库")
                items = self._query_period(symbol, period, start_date, end_date, limit)
            else:
                logger.warning("[查询K线] 自动同步完成,但数据源返回空数据")
        except Exception as e:
            # Best effort: a failed sync must not break the query itself.
            logger.error(f"[查询K线] 自动同步失败: {e}", exc_info=True)
        return items

    def _query_period(
        self,
        symbol: str,
        period: str,
        start_date: Optional[str],
        end_date: Optional[str],
        limit: int,
    ) -> List[dict]:
        """Run the query for ``period`` in a fresh, short-lived session."""
        db = SessionLocal()
        try:
            if period == "daily":
                return self._query_daily(db, symbol, start_date, end_date, limit)
            if period == "weekly":
                return self._query_weekly(db, symbol, start_date, end_date, limit)
            return self._query_intraday(db, symbol, period, start_date, end_date, limit)
        finally:
            db.close()

    def _fetch_rows(
        self,
        db: "Session",
        model,
        time_field: str,
        identity: dict,
        start_date: Optional[str],
        end_date: Optional[str],
        limit: int,
    ) -> list:
        """Return up to ``limit`` ``model`` records, newest first.

        Records are matched on the ``identity`` columns; ``start_date`` and
        ``end_date`` are inclusive bounds on ``time_field`` and are skipped
        when falsy.
        """
        time_column = getattr(model, time_field)
        query = db.query(model).filter_by(**identity)
        if start_date:
            query = query.filter(time_column >= start_date)
        if end_date:
            query = query.filter(time_column <= end_date)
        return query.order_by(time_column.desc()).limit(limit).all()

    def _serialize(self, items, time_field: str) -> List[dict]:
        """Convert records to dicts, exposing ``time_field`` as ``trade_time``."""
        return [
            {
                "trade_time": getattr(item, time_field),
                **{name: getattr(item, name) for name in self._BASE_FIELDS},
            }
            for item in items
        ]

    def _query_daily(
        self,
        db: "Session",
        symbol: str,
        start_date: Optional[str],
        end_date: Optional[str],
        limit: int,
    ) -> List[dict]:
        """Query stored daily K-lines for ``symbol`` (newest first)."""
        logger.info(f"[查询日K线] symbol={symbol}, start_date={start_date}, end_date={end_date}, limit={limit}")
        items = self._fetch_rows(
            db, KlineDaily, "trade_date", {"symbol": symbol}, start_date, end_date, limit
        )
        logger.info(f"[查询日K线] 从数据库查询到 {len(items)} 条记录")
        return self._serialize(items, "trade_date")

    def _query_weekly(
        self,
        db: "Session",
        symbol: str,
        start_date: Optional[str],
        end_date: Optional[str],
        limit: int,
    ) -> List[dict]:
        """Query stored weekly K-lines for ``symbol`` (newest first)."""
        items = self._fetch_rows(
            db, KlineWeekly, "trade_date", {"symbol": symbol}, start_date, end_date, limit
        )
        return self._serialize(items, "trade_date")

    def _query_intraday(
        self,
        db: "Session",
        symbol: str,
        period: str,
        start_date: Optional[str],
        end_date: Optional[str],
        limit: int,
    ) -> List[dict]:
        """Query stored intraday K-lines for ``symbol``/``period`` (newest first)."""
        # NOTE(review): with a date-only end_date ("YYYY-MM-DD") the
        # `trade_time <= end_date` bound presumably excludes bars later on
        # that day - confirm against the trade_time column type and callers.
        items = self._fetch_rows(
            db,
            KlineIntraday,
            "trade_time",
            {"symbol": symbol, "period": period},
            start_date,
            end_date,
            limit,
        )
        return self._serialize(items, "trade_time")

    def batch_sync(
        self,
        symbols: List[str],
        period: str,
        start_date: str,
        end_date: str
    ) -> dict:
        """Sync K-line data for several symbols, one after another.

        A failure for one symbol is recorded and logged, and does not stop the
        remaining symbols from being processed.

        Args:
            symbols: Contract codes to sync.
            period: Period (daily/weekly/5m/15m/30m/60m).
            start_date: Start date, YYYY-MM-DD.
            end_date: End date, YYYY-MM-DD.

        Returns:
            A dict with ``total``, ``success``, ``failed``, ``total_records``
            and ``details``; ``details`` holds one
            ``{"symbol", "status", "records", "error"}`` entry per symbol.
        """
        results = {
            "total": len(symbols),
            "success": 0,
            "failed": 0,
            "total_records": 0,
            "details": []
        }
        for symbol in symbols:
            detail = {"symbol": symbol, "status": "success", "records": 0, "error": None}
            try:
                count = self._sync_period(symbol, period, start_date, end_date)
            except Exception as e:
                # Keep going with the other symbols, but leave a trace in the
                # log: weekly/intraday syncs do not log their own failures.
                logger.error(f"[批量同步] symbol={symbol}, period={period} 同步失败: {e}", exc_info=True)
                detail["status"] = "failed"
                detail["error"] = str(e)
                results["failed"] += 1
            else:
                detail["records"] = count
                results["success"] += 1
                results["total_records"] += count
            results["details"].append(detail)
        return results
# Shared module-level service instance. Note that creating it runs
# KlineService.__init__, which constructs a DataSourceManager at import time.
kline_service = KlineService()