""" AmazingData 数据服务平台 - 历史数据 API """ from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import Optional, List from backend.models.database import get_db from backend.models.schemas import ( BaseResponse, SingleKlineRequest, BatchStockRequest, BatchFutureRequest, KlineDataResponse ) from backend.services.data_service import data_service from backend.auth.dependencies import get_current_user from backend.models.tables import User router = APIRouter() @router.post("/single", response_model=BaseResponse) async def get_single_kline( request: SingleKlineRequest, current_user: Optional[User] = Depends(get_current_user) ): """获取单只股票/期货K线数据""" result = data_service.get_single_kline( code=request.code, trading_day=request.trading_day, period=request.period.value, save_path=request.save_path ) if "error" in result: return BaseResponse(code=500, message=result["error"], data=result) return BaseResponse(data=result) @router.post("/batch-stocks", response_model=BaseResponse) async def batch_get_stock_kline( request: BatchStockRequest, current_user: Optional[User] = Depends(get_current_user) ): """批量获取股票K线数据""" result = data_service.batch_get_stock_kline( codes=request.codes, trading_days=request.trading_days, save_path=request.save_path, batch_size=request.batch_size ) if "error" in result: return BaseResponse(code=500, message=result["error"], data=result) return BaseResponse(data={"files": result, "count": len(result)}) @router.post("/batch-futures", response_model=BaseResponse) async def batch_get_future_kline( request: BatchFutureRequest, current_user: Optional[User] = Depends(get_current_user) ): """批量获取期货K线数据""" result = data_service.batch_get_future_kline( underlying_codes=request.underlying_codes, use_main_contract=request.use_main_contract, trading_days=request.trading_days, save_path=request.save_path ) if "error" in result: return BaseResponse(code=500, message=result["error"], data=result) return BaseResponse(data={"files": result, "count": len(result)}) @router.get("/stock-codes", response_model=BaseResponse) async def get_stock_codes(current_user: Optional[User] = Depends(get_current_user)): """获取股票代码列表""" codes = data_service.get_stock_codes() return BaseResponse(data={"codes": codes, "count": len(codes)}) @router.get("/future-codes", response_model=BaseResponse) async def get_future_codes(current_user: Optional[User] = Depends(get_current_user)): """获取期货代码列表""" codes = data_service.get_future_codes() return BaseResponse(data={"codes": codes, "count": len(codes)}) @router.get("/trading-days", response_model=BaseResponse) async def get_trading_days( year: Optional[int] = None, month: Optional[int] = None, current_user: Optional[User] = Depends(get_current_user) ): """获取交易日列表(简化实现)""" from datetime import date, timedelta today = date.today() if year: target_date = date(year, month or 1, 1) else: target_date = today # 简单返回最近30天(实际应查询交易日历) trading_days = [] for i in range(30): d = target_date - timedelta(days=i) if d.weekday() < 5: # 排除周末 trading_days.append(d.strftime('%Y%m%d')) return BaseResponse(data={"trading_days": trading_days})