You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

379 lines
14 KiB

# 数据存储模块
import sqlite3
import json
import os
from datetime import datetime
from typing import Dict, Optional, List
import pandas as pd
from qihuo_analyzer.utils.config_manager import config_manager
class DataStorage:
    """SQLite-backed storage manager.

    Persists analysis results, K-line bars, trade recommendations and
    risk-monitoring snapshots in the database file at ``self.db_path``.
    Every public method opens its own short-lived connection, reports
    failures by printing a message, and returns ``False`` (writers) or an
    empty ``DataFrame`` (readers) instead of raising.
    """

    # DDL statements executed (idempotently) by _init_database.
    _SCHEMA = (
        # Analysis results
        '''
        CREATE TABLE IF NOT EXISTS analysis_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            trend TEXT,
            probability REAL,
            direction TEXT,
            cycle TEXT,
            atr REAL,
            adx REAL,
            support REAL,
            resistance REAL,
            stop_loss REAL,
            target_price REAL,
            position_size REAL,
            risk_ratio REAL,
            fund_flow TEXT,
            signals TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Historical K-line data
        '''
        CREATE TABLE IF NOT EXISTS kline_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            duration TEXT NOT NULL,
            datetime TEXT NOT NULL,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume INTEGER,
            open_interest INTEGER,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(symbol, duration, datetime)
        )
        ''',
        # Trade recommendations
        '''
        CREATE TABLE IF NOT EXISTS trade_recommendations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            direction TEXT,
            entry_price REAL,
            stop_loss REAL,
            target_price REAL,
            position_size REAL,
            execution_plan TEXT,
            risk_tips TEXT,
            status TEXT DEFAULT 'pending',
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Risk monitoring
        '''
        CREATE TABLE IF NOT EXISTS risk_monitoring (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            current_price REAL,
            entry_price REAL,
            stop_loss REAL,
            target_price REAL,
            current_profit REAL,
            risk_status TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
    )

    # Tables purged by delete_old_data (all carry a created_at column).
    _TABLES = (
        'analysis_results',
        'kline_data',
        'trade_recommendations',
        'risk_monitoring',
    )

    # DataFrame columns persisted for each K-line bar, in insert order.
    _KLINE_COLUMNS = ['open', 'high', 'low', 'close', 'volume', 'open_interest']

    def __init__(self):
        """Read the database path from the config manager and create tables."""
        self.db_path = config_manager.db_path
        self._init_database()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _write(self, sql: str, params=(), many: bool = False) -> None:
        """Run one write statement in its own transaction.

        Args:
            sql: Parameterized SQL statement.
            params: Bound parameters, or a sequence of parameter tuples
                when ``many`` is true.
            many: Use ``executemany`` instead of ``execute``.

        Raises:
            sqlite3.Error: Propagated to the caller after rollback.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # The connection context manager commits on success and rolls
            # back on error; it does NOT close, hence the finally below.
            with conn:
                if many:
                    conn.executemany(sql, params)
                else:
                    conn.execute(sql, params)
        finally:
            conn.close()

    def _read(self, sql: str, params=()) -> pd.DataFrame:
        """Run a SELECT and return the rows as a DataFrame."""
        conn = sqlite3.connect(self.db_path)
        try:
            return pd.read_sql_query(sql, conn, params=params)
        finally:
            conn.close()

    def _init_database(self):
        """Create the database directory and all tables if missing."""
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            # exist_ok avoids a race between the existence check and mkdir.
            os.makedirs(db_dir, exist_ok=True)

        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                for ddl in self._SCHEMA:
                    conn.execute(ddl)
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Writers
    # ------------------------------------------------------------------
    def save_analysis_result(self, result: Dict) -> bool:
        """Insert one analysis result.

        Args:
            result: Mapping with the analysis fields. Missing keys are
                stored as NULL; ``symbol`` defaults to ``''`` and
                ``timestamp`` to the current local time in ISO format.
                ``fund_flow`` and ``signals`` are stored as JSON text
                (NULL when absent or empty).

        Returns:
            True on success, False if anything failed.
        """
        try:
            fund_flow = result.get('fund_flow')
            signals = result.get('signals')
            row = (
                result.get('symbol', ''),
                result.get('timestamp', datetime.now().isoformat()),
                result.get('trend'),
                result.get('probability'),
                result.get('direction'),
                result.get('cycle'),
                result.get('atr'),
                result.get('adx'),
                result.get('support'),
                result.get('resistance'),
                result.get('stop_loss'),
                result.get('target_price'),
                result.get('position_size'),
                result.get('risk_ratio'),
                json.dumps(fund_flow) if fund_flow else None,
                json.dumps(signals) if signals else None,
            )
            self._write(
                '''
                INSERT INTO analysis_results (
                    symbol, timestamp, trend, probability, direction, cycle,
                    atr, adx, support, resistance, stop_loss, target_price,
                    position_size, risk_ratio, fund_flow, signals
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                row,
            )
            return True
        except Exception as e:
            print(f"保存分析结果失败:{e}")
            return False

    def save_kline_data(self, symbol: str, duration: str, df: pd.DataFrame) -> bool:
        """Bulk-insert K-line bars, ignoring rows that already exist.

        Args:
            symbol: Instrument identifier.
            duration: Bar duration label.
            df: Bars with columns ``open``, ``high``, ``low``, ``close``,
                ``volume`` and ``open_interest``; each index value must
                provide ``isoformat()`` (e.g. a ``DatetimeIndex``).

        Returns:
            True on success (including an empty ``df``), False on failure.
        """
        try:
            if df.empty:
                return True

            # itertuples yields native Python scalars column by column, so
            # integer columns stay integers and sqlite3 can bind every value
            # (iterrows would upcast the row and may yield numpy scalars).
            bars = df[self._KLINE_COLUMNS].itertuples(index=False, name=None)
            rows = [
                (symbol, duration, stamp.isoformat(), *values)
                for stamp, values in zip(df.index, bars)
            ]
            self._write(
                '''
                INSERT OR IGNORE INTO kline_data (
                    symbol, duration, datetime, open, high, low, close,
                    volume, open_interest
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                rows,
                many=True,
            )
            return True
        except Exception as e:
            print(f"保存K线数据失败:{e}")
            return False

    def save_trade_recommendation(self, recommendation: Dict) -> bool:
        """Insert one trade recommendation (status defaults to 'pending').

        Args:
            recommendation: Mapping with the recommendation fields; missing
                keys are stored as NULL, ``symbol`` defaults to ``''`` and
                ``timestamp`` to the current local time in ISO format.

        Returns:
            True on success, False if anything failed.
        """
        try:
            row = (
                recommendation.get('symbol', ''),
                recommendation.get('timestamp', datetime.now().isoformat()),
                recommendation.get('direction'),
                recommendation.get('entry_price'),
                recommendation.get('stop_loss'),
                recommendation.get('target_price'),
                recommendation.get('position_size'),
                recommendation.get('execution_plan'),
                recommendation.get('risk_tips'),
            )
            self._write(
                '''
                INSERT INTO trade_recommendations (
                    symbol, timestamp, direction, entry_price, stop_loss,
                    target_price, position_size, execution_plan, risk_tips
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                row,
            )
            return True
        except Exception as e:
            print(f"保存交易建议失败:{e}")
            return False

    def save_risk_monitoring(self, monitoring_data: Dict) -> bool:
        """Insert one risk-monitoring snapshot.

        Args:
            monitoring_data: Mapping with the snapshot fields; missing keys
                are stored as NULL, ``symbol`` defaults to ``''`` and
                ``timestamp`` to the current local time in ISO format.

        Returns:
            True on success, False if anything failed.
        """
        try:
            row = (
                monitoring_data.get('symbol', ''),
                monitoring_data.get('timestamp', datetime.now().isoformat()),
                monitoring_data.get('current_price'),
                monitoring_data.get('entry_price'),
                monitoring_data.get('stop_loss'),
                monitoring_data.get('target_price'),
                monitoring_data.get('current_profit'),
                monitoring_data.get('risk_status'),
            )
            self._write(
                '''
                INSERT INTO risk_monitoring (
                    symbol, timestamp, current_price, entry_price, stop_loss,
                    target_price, current_profit, risk_status
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                row,
            )
            return True
        except Exception as e:
            print(f"保存风险监控数据失败:{e}")
            return False

    # ------------------------------------------------------------------
    # Readers
    # ------------------------------------------------------------------
    def get_analysis_results(self, symbol: str, limit: int = 100) -> pd.DataFrame:
        """Return the newest analysis results for ``symbol``.

        Args:
            symbol: Instrument identifier to filter on.
            limit: Maximum number of rows, newest ``timestamp`` first.

        Returns:
            DataFrame of matching rows with ``fund_flow`` and ``signals``
            decoded from JSON (``{}`` when NULL); empty on failure.
        """
        try:
            frame = self._read(
                '''
                SELECT * FROM analysis_results
                WHERE symbol = ?
                ORDER BY timestamp DESC
                LIMIT ?
                ''',
                (symbol, limit),
            )
            if not frame.empty:
                for column in ('fund_flow', 'signals'):
                    frame[column] = frame[column].apply(
                        lambda raw: json.loads(raw) if raw else {}
                    )
            return frame
        except Exception as e:
            print(f"获取分析结果失败:{e}")
            return pd.DataFrame()

    def get_kline_data(self, symbol: str, duration: str, limit: int = 200) -> pd.DataFrame:
        """Return the latest ``limit`` bars for ``symbol``/``duration``.

        Returns:
            DataFrame indexed by ``datetime`` in ascending order with the
            columns ``open``, ``high``, ``low``, ``close``, ``volume`` and
            ``open_interest``; an empty DataFrame when nothing matches or
            on failure.
        """
        try:
            frame = self._read(
                '''
                SELECT * FROM kline_data
                WHERE symbol = ? AND duration = ?
                ORDER BY datetime DESC
                LIMIT ?
                ''',
                (symbol, duration, limit),
            )
            if not frame.empty:
                # The query takes the newest rows; callers get them oldest
                # first, indexed by timestamp.
                frame['datetime'] = pd.to_datetime(frame['datetime'])
                frame = frame.sort_values('datetime').set_index('datetime')
                frame = frame[self._KLINE_COLUMNS]
            return frame
        except Exception as e:
            print(f"获取K线数据失败:{e}")
            return pd.DataFrame()

    def get_trade_recommendations(self, symbol: str, status: Optional[str] = None) -> pd.DataFrame:
        """Return trade recommendations for ``symbol``, newest first.

        Args:
            symbol: Instrument identifier to filter on.
            status: When given (and non-empty), only rows with this status.

        Returns:
            DataFrame of matching rows; empty on failure.
        """
        try:
            sql = 'SELECT * FROM trade_recommendations WHERE symbol = ?'
            params = [symbol]
            if status:
                sql += ' AND status = ?'
                params.append(status)
            sql += ' ORDER BY timestamp DESC'
            return self._read(sql, params)
        except Exception as e:
            print(f"获取交易建议失败:{e}")
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------
    def update_recommendation_status(self, recommendation_id: int, status: str) -> bool:
        """Set the status of the recommendation with the given id.

        Returns:
            True if the statement ran (even when no row matched), False on
            failure.
        """
        try:
            self._write(
                '''
                UPDATE trade_recommendations
                SET status = ?
                WHERE id = ?
                ''',
                (status, recommendation_id),
            )
            return True
        except Exception as e:
            print(f"更新交易建议状态失败:{e}")
            return False

    def delete_old_data(self, days: int = 30) -> bool:
        """Delete rows older than ``days`` days from every table.

        ``created_at`` is filled by SQLite's ``CURRENT_TIMESTAMP``, i.e. UTC
        text in ``YYYY-MM-DD HH:MM:SS`` form. The cutoff is therefore
        computed by SQLite's ``datetime('now', ...)``, which produces the
        same format and timezone, so the text comparison is exact.

        Returns:
            True on success, False if anything failed.
        """
        try:
            # Signed modifier such as "-30 days"; the explicit sign keeps the
            # modifier well-formed for zero or negative input as well.
            offset = f"{-days:+} days"
            conn = sqlite3.connect(self.db_path)
            try:
                with conn:  # all four deletes commit or roll back together
                    for table in self._TABLES:
                        conn.execute(
                            f"DELETE FROM {table} "
                            "WHERE created_at < datetime('now', ?)",
                            (offset,),
                        )
            finally:
                conn.close()
            return True
        except Exception as e:
            print(f"删除旧数据失败:{e}")
            return False