You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

215 lines
6.8 KiB

"""
AKShare HTTP API 服务
提供股票数据接口
"""
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
import akshare as ak
import pandas as pd
from typing import Optional, List
import json
app = FastAPI(
title="AKShare HTTP API",
description="AKShare 数据接口 HTTP 服务",
version="1.0.0"
)
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def dataframe_to_records(df: pd.DataFrame) -> List[dict]:
"""将 DataFrame 转换为可 JSON 序列化的记录列表"""
if df is None or df.empty:
return []
# 处理 NaN 值
df = df.replace({pd.NaT: None})
df = df.where(pd.notnull(df), None)
return df.to_dict('records')
@app.get("/")
async def root():
"""健康检查"""
return {
"status": "healthy",
"service": "AKShare HTTP API",
"version": "1.0.0"
}
@app.get("/stock_zh_a_spot")
async def stock_zh_a_spot():
"""
获取 A 股实时行情数据
"""
try:
df = ak.stock_zh_a_spot_em()
records = dataframe_to_records(df)
# 字段映射,统一返回格式
mapped_records = []
for record in records:
mapped_records.append({
"code": record.get("代码"),
"name": record.get("名称"),
"price": record.get("最新价", 0),
"change": record.get("涨跌额", 0),
"change_percent": record.get("涨跌幅", 0),
"volume": record.get("成交量", 0),
"turnover": record.get("成交额", 0),
"open": record.get("开盘价", 0),
"high": record.get("最高价", 0),
"low": record.get("最低价", 0),
"pre_close": record.get("昨收", 0),
"turnover_rate": record.get("换手率", 0),
"amplitude": record.get("振幅", 0),
"market_cap": record.get("总市值", 0),
"pe": record.get("市盈率-动态", 0),
"pb": record.get("市净率", 0),
"industry": record.get("行业", ""),
})
return mapped_records
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取数据失败: {str(e)}")
@app.get("/stock_zh_a_hist")
async def stock_zh_a_hist(
symbol: str = Query(..., description="股票代码,如 000001"),
period: str = Query("daily", description="周期: daily/weekly/monthly"),
start_date: Optional[str] = Query(None, description="开始日期 YYYYMMDD"),
end_date: Optional[str] = Query(None, description="结束日期 YYYYMMDD"),
adjust: str = Query("qfq", description="复权方式: qfq-前复权, hfq-后复权, 不复权")
):
"""
获取 A 股历史 K 线数据
"""
try:
# 转换周期参数
period_map = {
"daily": "daily",
"weekly": "weekly",
"monthly": "monthly"
}
ak_period = period_map.get(period, "daily")
# 转换复权参数
adjust_map = {
"qfq": "qfq",
"hfq": "hfq",
"": ""
}
ak_adjust = adjust_map.get(adjust, "qfq")
df = ak.stock_zh_a_hist(
symbol=symbol,
period=ak_period,
start_date=start_date or "19700101",
end_date=end_date or "20500101",
adjust=ak_adjust
)
records = dataframe_to_records(df)
# 字段映射
mapped_records = []
for record in records:
mapped_records.append({
"date": record.get("日期"),
"open": record.get("开盘", 0),
"high": record.get("最高", 0),
"low": record.get("最低", 0),
"close": record.get("收盘", 0),
"volume": record.get("成交量", 0),
"turnover": record.get("成交额", 0),
"amplitude": record.get("振幅", 0),
"change": record.get("涨跌幅", 0),
"change_amount": record.get("涨跌额", 0),
"turnover_rate": record.get("换手率", 0),
})
return mapped_records
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取数据失败: {str(e)}")
@app.get("/stock_zh_index_spot")
async def stock_zh_index_spot():
"""
获取股票指数实时行情
"""
try:
df = ak.index_zh_a_spot_em()
records = dataframe_to_records(df)
mapped_records = []
for record in records:
mapped_records.append({
"code": record.get("代码"),
"name": record.get("名称"),
"price": record.get("最新价", 0),
"change": record.get("涨跌额", 0),
"change_percent": record.get("涨跌幅", 0),
"volume": record.get("成交量", 0),
"turnover": record.get("成交额", 0),
"open": record.get("开盘价", 0),
"high": record.get("最高价", 0),
"low": record.get("最低价", 0),
"pre_close": record.get("昨收", 0),
})
return mapped_records
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取数据失败: {str(e)}")
@app.get("/stock_sector_spot")
async def stock_sector_spot():
"""
获取板块行情数据
"""
try:
df = ak.stock_board_industry_name_em()
records = dataframe_to_records(df)
mapped_records = []
for record in records:
mapped_records.append({
"code": record.get("代码"),
"name": record.get("名称"),
"change_percent": record.get("涨跌幅", 0),
})
return mapped_records
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取数据失败: {str(e)}")
@app.get("/stock_sector_cons")
async def stock_sector_cons(
symbol: str = Query(..., description="板块名称")
):
"""
获取板块成分股
"""
try:
df = ak.stock_board_industry_cons_em(symbol=symbol)
records = dataframe_to_records(df)
mapped_records = []
for record in records:
mapped_records.append({
"code": record.get("代码"),
"name": record.get("名称"),
"price": record.get("最新价", 0),
"change_percent": record.get("涨跌幅", 0),
})
return mapped_records
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取数据失败: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)