You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

151 lines
5.3 KiB

"""
指数数据查询路由
"""
from typing import List
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from sqlalchemy import and_
from datetime import date
from app.db.session import get_db
from app.schemas.base import ResponseModel
from app.models.stock_basic import IndexBasic, IndexTrade
from app.core.security import get_current_user
from app.models.user import User
from app.utils.date_utils import parse_date, format_date
router = APIRouter()
@router.get("/list", response_model=ResponseModel)
async def get_index_list(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取指数列表"""
indexes = db.query(IndexBasic).order_by(IndexBasic.code).all()
result = []
for idx in indexes:
result.append({
"code": idx.code,
"name": idx.name,
"component_count": idx.component_count
})
return ResponseModel(data=result)
@router.get("/trade", response_model=ResponseModel)
async def get_index_trade_data(
codes: str = Query(..., description="指数代码列表,逗号分隔"),
start_date: str = Query(..., description="开始日期 YYYYMMDD"),
end_date: str = Query(..., description="结束日期 YYYYMMDD"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取指数交易数据"""
code_list = codes.split(",")
start = parse_date(start_date)
end = parse_date(end_date)
result = {}
for code in code_list:
code = code.strip()
index_basic = db.query(IndexBasic).filter(IndexBasic.code == code).first()
trades = db.query(IndexTrade).filter(
and_(
IndexTrade.index_code == code,
IndexTrade.trade_date >= start,
IndexTrade.trade_date <= end
)
).order_by(IndexTrade.trade_date).all()
trade_list = []
for trade in trades:
trade_list.append({
"trade_date": format_date(trade.trade_date),
"open": float(trade.open) if trade.open else None,
"close": float(trade.close) if trade.close else None,
"high": float(trade.high) if trade.high else None,
"low": float(trade.low) if trade.low else None,
"change_pct": float(trade.change_pct) if trade.change_pct else None,
"volume": trade.volume,
"amount": float(trade.amount) if trade.amount else None,
"total_market_value": float(trade.total_market_value) if trade.total_market_value else None,
"float_market_value": float(trade.float_market_value) if trade.float_market_value else None,
"up_count": trade.up_count,
"down_count": trade.down_count,
"flat_count": trade.flat_count,
"limit_up_count": trade.limit_up_count,
"limit_down_count": trade.limit_down_count,
"suspend_count": trade.suspend_count,
"pe_ratio": float(trade.pe_ratio) if trade.pe_ratio else None,
"pe_median": float(trade.pe_median) if trade.pe_median else None,
"is_new_high": trade.is_new_high,
"is_new_low": trade.is_new_low
})
result[code] = {
"basic": {
"code": index_basic.code if index_basic else code,
"name": index_basic.name if index_basic else None,
"component_count": index_basic.component_count if index_basic else None
},
"trades": trade_list
}
return ResponseModel(data=result)
@router.get("/{code}/chart", response_model=ResponseModel)
async def get_index_chart_data(
code: str,
start_date: str = Query(..., description="开始日期 YYYYMMDD"),
end_date: str = Query(..., description="结束日期 YYYYMMDD"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""获取指数K线图表数据(ECharts格式)"""
start = parse_date(start_date)
end = parse_date(end_date)
trades = db.query(IndexTrade).filter(
and_(
IndexTrade.index_code == code,
IndexTrade.trade_date >= start,
IndexTrade.trade_date <= end
)
).order_by(IndexTrade.trade_date).all()
category_data = []
values = []
volumes = []
for trade in trades:
category_data.append(format_date(trade.trade_date))
values.append([
float(trade.open) if trade.open else 0,
float(trade.close) if trade.close else 0,
float(trade.low) if trade.low else 0,
float(trade.high) if trade.high else 0,
trade.volume if trade.volume else 0
])
volumes.append([
trade.volume if trade.volume else 0,
1 if (trade.close and trade.open and trade.close >= trade.open) else -1
])
index_basic = db.query(IndexBasic).filter(IndexBasic.code == code).first()
return ResponseModel(data={
"code": code,
"name": index_basic.name if index_basic else None,
"component_count": index_basic.component_count if index_basic else None,
"categoryData": category_data,
"values": values,
"volumes": volumes
})