You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

379 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Data storage module
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import Dict, Optional, List

import pandas as pd

from qihuo_analyzer.utils.config_manager import config_manager
class DataStorage:
    """SQLite-backed persistence for analysis output.

    Manages four tables in the database file at ``self.db_path``:
    ``analysis_results``, ``kline_data``, ``trade_recommendations`` and
    ``risk_monitoring``. Every method opens its own short-lived connection
    and always closes it again, including when the operation fails.

    Error reporting convention: the ``save_*``, ``update_*`` and ``delete_*``
    methods print a message and return ``False`` on failure; the ``get_*``
    methods print a message and return an empty ``DataFrame``.
    """

    # Table definitions, applied idempotently by ``_init_database``.
    # ``created_at`` is filled in by SQLite's CURRENT_TIMESTAMP, which is UTC
    # text in the form "YYYY-MM-DD HH:MM:SS".
    _SCHEMA = (
        # Analysis results
        '''
        CREATE TABLE IF NOT EXISTS analysis_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            trend TEXT,
            probability REAL,
            direction TEXT,
            cycle TEXT,
            atr REAL,
            adx REAL,
            support REAL,
            resistance REAL,
            stop_loss REAL,
            target_price REAL,
            position_size REAL,
            risk_ratio REAL,
            fund_flow TEXT,
            signals TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Historical K-line (candlestick) data
        '''
        CREATE TABLE IF NOT EXISTS kline_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            duration TEXT NOT NULL,
            datetime TEXT NOT NULL,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume INTEGER,
            open_interest INTEGER,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(symbol, duration, datetime)
        )
        ''',
        # Trade recommendations
        '''
        CREATE TABLE IF NOT EXISTS trade_recommendations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            direction TEXT,
            entry_price REAL,
            stop_loss REAL,
            target_price REAL,
            position_size REAL,
            execution_plan TEXT,
            risk_tips TEXT,
            status TEXT DEFAULT 'pending',
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Risk monitoring snapshots
        '''
        CREATE TABLE IF NOT EXISTS risk_monitoring (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            current_price REAL,
            entry_price REAL,
            stop_loss REAL,
            target_price REAL,
            current_profit REAL,
            risk_status TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
    )

    # Tables purged by ``delete_old_data``; fixed names, never user input.
    _PURGED_TABLES = (
        'analysis_results',
        'kline_data',
        'trade_recommendations',
        'risk_monitoring',
    )

    def __init__(self):
        """Read the database path from ``config_manager`` and create the schema."""
        self.db_path = config_manager.db_path
        self._init_database()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _execute_write(self, sql: str, params: tuple = ()) -> None:
        """Run one write statement in its own transaction.

        The inner ``conn`` context manager commits on success and rolls back
        on error; ``closing`` guarantees the connection is released either way.
        """
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute(sql, params)

    def _query_frame(self, sql: str, params: tuple) -> pd.DataFrame:
        """Run a parameterized SELECT and return the rows as a DataFrame."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            return pd.read_sql_query(sql, conn, params=params)

    @staticmethod
    def _to_native(value):
        """Convert a NumPy scalar to the equivalent plain Python value.

        ``DataFrame.iterrows`` can yield NumPy scalars (e.g. ``numpy.int64``),
        which sqlite3 does not bind as numbers; anything exposing ``.item()``
        is unwrapped, everything else is returned unchanged.
        """
        return value.item() if hasattr(value, 'item') else value

    def _init_database(self):
        """Create the database directory and all tables if they are missing.

        Safe to call repeatedly: every statement uses ``IF NOT EXISTS``.
        Errors are not caught here and propagate to the caller.
        """
        # Make sure the directory that will hold the database file exists.
        # exist_ok avoids a race between checking and creating.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            cursor = conn.cursor()
            for ddl in self._SCHEMA:
                cursor.execute(ddl)

    # ------------------------------------------------------------------
    # Writers
    # ------------------------------------------------------------------
    def save_analysis_result(self, result: Dict) -> bool:
        """Insert one analysis result row.

        Args:
            result: Mapping with any of the ``analysis_results`` column names
                as keys. Missing keys are stored as NULL, except ``symbol``
                (defaults to ``''``) and ``timestamp`` (defaults to the
                current local time in ISO format). Truthy ``fund_flow`` and
                ``signals`` values are stored as JSON text; falsy ones as NULL.

        Returns:
            ``True`` on success, ``False`` if anything failed.
        """
        try:
            fund_flow = result.get('fund_flow')
            signals = result.get('signals')
            values = (
                result.get('symbol', ''),
                result.get('timestamp', datetime.now().isoformat()),
                result.get('trend'),
                result.get('probability'),
                result.get('direction'),
                result.get('cycle'),
                result.get('atr'),
                result.get('adx'),
                result.get('support'),
                result.get('resistance'),
                result.get('stop_loss'),
                result.get('target_price'),
                result.get('position_size'),
                result.get('risk_ratio'),
                json.dumps(fund_flow) if fund_flow else None,
                json.dumps(signals) if signals else None,
            )
            self._execute_write('''
                INSERT INTO analysis_results (
                    symbol, timestamp, trend, probability, direction, cycle,
                    atr, adx, support, resistance, stop_loss, target_price,
                    position_size, risk_ratio, fund_flow, signals
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', values)
            return True
        except Exception as e:
            print(f"保存分析结果失败:{e}")
            return False

    def save_kline_data(self, symbol: str, duration: str, df: pd.DataFrame) -> bool:
        """Bulk-insert K-line bars, skipping ones that are already stored.

        Args:
            symbol: Instrument identifier stored with every bar.
            duration: Bar period label stored with every bar.
            df: Frame whose index entries provide ``isoformat()`` and whose
                columns include ``open``, ``high``, ``low``, ``close``,
                ``volume`` and ``open_interest``.

        Returns:
            ``True`` on success (including an empty frame), ``False`` on error.
        """
        try:
            to_native = self._to_native
            rows = [
                (
                    symbol, duration, idx.isoformat(),
                    to_native(row['open']), to_native(row['high']),
                    to_native(row['low']), to_native(row['close']),
                    to_native(row['volume']), to_native(row['open_interest']),
                )
                for idx, row in df.iterrows()
            ]
            # One transaction for the whole batch; INSERT OR IGNORE relies on
            # the UNIQUE(symbol, duration, datetime) constraint to drop dupes.
            with closing(sqlite3.connect(self.db_path)) as conn, conn:
                if rows:
                    conn.executemany('''
                        INSERT OR IGNORE INTO kline_data (
                            symbol, duration, datetime, open, high, low, close, volume, open_interest
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', rows)
            return True
        except Exception as e:
            print(f"保存K线数据失败{e}")
            return False

    def save_trade_recommendation(self, recommendation: Dict) -> bool:
        """Insert one trade recommendation row.

        Args:
            recommendation: Mapping with any of the ``trade_recommendations``
                column names as keys. ``symbol`` defaults to ``''`` and
                ``timestamp`` to the current local ISO time; other missing
                keys are stored as NULL. ``status`` is not written here, so
                the table default (``'pending'``) applies.

        Returns:
            ``True`` on success, ``False`` if anything failed.
        """
        try:
            values = (
                recommendation.get('symbol', ''),
                recommendation.get('timestamp', datetime.now().isoformat()),
                recommendation.get('direction'),
                recommendation.get('entry_price'),
                recommendation.get('stop_loss'),
                recommendation.get('target_price'),
                recommendation.get('position_size'),
                recommendation.get('execution_plan'),
                recommendation.get('risk_tips'),
            )
            self._execute_write('''
                INSERT INTO trade_recommendations (
                    symbol, timestamp, direction, entry_price, stop_loss,
                    target_price, position_size, execution_plan, risk_tips
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', values)
            return True
        except Exception as e:
            print(f"保存交易建议失败:{e}")
            return False

    def save_risk_monitoring(self, monitoring_data: Dict) -> bool:
        """Insert one risk-monitoring snapshot.

        Args:
            monitoring_data: Mapping with any of the ``risk_monitoring``
                column names as keys. ``symbol`` defaults to ``''`` and
                ``timestamp`` to the current local ISO time; other missing
                keys are stored as NULL.

        Returns:
            ``True`` on success, ``False`` if anything failed.
        """
        try:
            values = (
                monitoring_data.get('symbol', ''),
                monitoring_data.get('timestamp', datetime.now().isoformat()),
                monitoring_data.get('current_price'),
                monitoring_data.get('entry_price'),
                monitoring_data.get('stop_loss'),
                monitoring_data.get('target_price'),
                monitoring_data.get('current_profit'),
                monitoring_data.get('risk_status'),
            )
            self._execute_write('''
                INSERT INTO risk_monitoring (
                    symbol, timestamp, current_price, entry_price, stop_loss,
                    target_price, current_profit, risk_status
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', values)
            return True
        except Exception as e:
            print(f"保存风险监控数据失败:{e}")
            return False

    # ------------------------------------------------------------------
    # Readers
    # ------------------------------------------------------------------
    def get_analysis_results(self, symbol: str, limit: int = 100) -> pd.DataFrame:
        """Return the most recent analysis results for ``symbol``.

        Rows are ordered by ``timestamp`` descending and capped at ``limit``.
        The ``fund_flow`` and ``signals`` columns are decoded from JSON; NULL
        or empty values become ``{}``.

        Returns:
            The matching rows, or an empty ``DataFrame`` on error.
        """
        try:
            df = self._query_frame('''
                SELECT * FROM analysis_results
                WHERE symbol = ?
                ORDER BY timestamp DESC
                LIMIT ?
            ''', (symbol, limit))
            if not df.empty:
                for column in ('fund_flow', 'signals'):
                    df[column] = df[column].apply(
                        lambda raw: json.loads(raw) if raw else {}
                    )
            return df
        except Exception as e:
            print(f"获取分析结果失败:{e}")
            return pd.DataFrame()

    def get_kline_data(self, symbol: str, duration: str, limit: int = 200) -> pd.DataFrame:
        """Return the latest ``limit`` K-line bars for ``symbol``/``duration``.

        The newest bars are selected, then re-sorted ascending and indexed by
        the parsed ``datetime`` column. When rows exist, only the ``open``,
        ``high``, ``low``, ``close``, ``volume`` and ``open_interest`` columns
        are kept; when none exist the raw (empty) query frame is returned.

        Returns:
            The bars, or an empty ``DataFrame`` on error.
        """
        try:
            df = self._query_frame('''
                SELECT * FROM kline_data
                WHERE symbol = ? AND duration = ?
                ORDER BY datetime DESC
                LIMIT ?
            ''', (symbol, duration, limit))
            if not df.empty:
                # Parse timestamps, restore chronological order, index by time.
                df['datetime'] = pd.to_datetime(df['datetime'])
                df = df.sort_values('datetime')
                df.set_index('datetime', inplace=True)
                df = df[['open', 'high', 'low', 'close', 'volume', 'open_interest']]
            return df
        except Exception as e:
            print(f"获取K线数据失败{e}")
            return pd.DataFrame()

    def get_trade_recommendations(self, symbol: str, status: Optional[str] = None) -> pd.DataFrame:
        """Return trade recommendations for ``symbol``, newest first.

        Args:
            symbol: Instrument identifier to filter on.
            status: When truthy, only rows with this status are returned;
                ``None`` or ``''`` disables the status filter.

        Returns:
            The matching rows, or an empty ``DataFrame`` on error.
        """
        try:
            if status:
                return self._query_frame('''
                    SELECT * FROM trade_recommendations
                    WHERE symbol = ? AND status = ?
                    ORDER BY timestamp DESC
                ''', (symbol, status))
            return self._query_frame('''
                SELECT * FROM trade_recommendations
                WHERE symbol = ?
                ORDER BY timestamp DESC
            ''', (symbol,))
        except Exception as e:
            print(f"获取交易建议失败:{e}")
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------
    def update_recommendation_status(self, recommendation_id: int, status: str) -> bool:
        """Set the ``status`` of the recommendation with the given ``id``.

        Returns:
            ``True`` if the statement ran (even when no row matched the id),
            ``False`` on error.
        """
        try:
            self._execute_write('''
                UPDATE trade_recommendations
                SET status = ?
                WHERE id = ?
            ''', (status, recommendation_id))
            return True
        except Exception as e:
            print(f"更新交易建议状态失败:{e}")
            return False

    def delete_old_data(self, days: int = 30) -> bool:
        """Delete rows inserted more than ``days`` days ago from every table.

        The cutoff is computed by SQLite itself (``datetime('now', ...)``) so
        it has the same timezone (UTC) and text format as the ``created_at``
        values produced by ``CURRENT_TIMESTAMP``. Comparing against a
        local-time ISO string with a ``T`` separator would mis-order rows
        that fall on the cutoff date.

        Returns:
            ``True`` on success, ``False`` on error. All four deletes share
            one transaction, so a failure leaves every table untouched.
        """
        try:
            # e.g. days=30 -> "-30 days"
            age_modifier = f"{-days} days"
            with closing(sqlite3.connect(self.db_path)) as conn, conn:
                for table in self._PURGED_TABLES:
                    conn.execute(
                        f"DELETE FROM {table} WHERE created_at < datetime('now', ?)",
                        (age_modifier,),
                    )
            return True
        except Exception as e:
            print(f"删除旧数据失败:{e}")
            return False