|
|
"""
|
|
|
缓存服务
|
|
|
基于 Redis 实现 K 线数据缓存
|
|
|
"""
|
|
|
import json
|
|
|
import logging
|
|
|
from datetime import datetime, timedelta
|
|
|
from typing import List, Dict, Optional, Any
|
|
|
import redis.asyncio as redis
|
|
|
|
|
|
from app.config import settings
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class CacheService:
|
|
|
"""
|
|
|
缓存服务
|
|
|
|
|
|
提供 K 线数据的 Redis 缓存功能
|
|
|
- 缓存键格式:kline:{symbol}:{period}:{start}:{end}
|
|
|
- 默认 TTL:300 秒(5 分钟)
|
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
|
self.redis: Optional[redis.Redis] = None
|
|
|
self._connected = False
|
|
|
self.default_ttl = 300 # 5 分钟
|
|
|
|
|
|
async def connect(self) -> bool:
|
|
|
"""连接 Redis"""
|
|
|
try:
|
|
|
self.redis = redis.from_url(settings.REDIS_URL, decode_responses=True)
|
|
|
await self.redis.ping()
|
|
|
self._connected = True
|
|
|
logger.info("Redis cache connected")
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
logger.error(f"Redis connection failed: {e}")
|
|
|
self._connected = False
|
|
|
return False
|
|
|
|
|
|
async def disconnect(self):
|
|
|
"""断开 Redis 连接"""
|
|
|
if self.redis:
|
|
|
await self.redis.close()
|
|
|
self._connected = False
|
|
|
logger.info("Redis cache disconnected")
|
|
|
|
|
|
def _make_key(
|
|
|
self,
|
|
|
symbol: str,
|
|
|
period: str,
|
|
|
start: str,
|
|
|
end: str
|
|
|
) -> str:
|
|
|
"""生成缓存键"""
|
|
|
return f"kline:{symbol}:{period}:{start}:{end}"
|
|
|
|
|
|
async def get_kline(
|
|
|
self,
|
|
|
symbol: str,
|
|
|
period: str,
|
|
|
start: str,
|
|
|
end: str
|
|
|
) -> Optional[List[Dict]]:
|
|
|
"""
|
|
|
从缓存获取 K 线数据
|
|
|
|
|
|
Args:
|
|
|
symbol: 证券代码
|
|
|
period: 周期
|
|
|
start: 开始时间
|
|
|
end: 结束时间
|
|
|
|
|
|
Returns:
|
|
|
K 线数据列表,缓存未命中返回 None
|
|
|
"""
|
|
|
if not self._connected:
|
|
|
return None
|
|
|
|
|
|
try:
|
|
|
key = self._make_key(symbol, period, start, end)
|
|
|
data = await self.redis.get(key)
|
|
|
|
|
|
if data:
|
|
|
logger.debug(f"Cache hit: {key}")
|
|
|
return json.loads(data)
|
|
|
|
|
|
logger.debug(f"Cache miss: {key}")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Cache get error: {e}")
|
|
|
return None
|
|
|
|
|
|
async def set_kline(
|
|
|
self,
|
|
|
symbol: str,
|
|
|
period: str,
|
|
|
start: str,
|
|
|
end: str,
|
|
|
data: List[Dict],
|
|
|
ttl: Optional[int] = None
|
|
|
) -> bool:
|
|
|
"""
|
|
|
写入 K 线数据到缓存
|
|
|
|
|
|
Args:
|
|
|
symbol: 证券代码
|
|
|
period: 周期
|
|
|
start: 开始时间
|
|
|
end: 结束时间
|
|
|
data: K 线数据
|
|
|
ttl: 过期时间(秒),默认使用 default_ttl
|
|
|
|
|
|
Returns:
|
|
|
是否成功
|
|
|
"""
|
|
|
if not self._connected:
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
key = self._make_key(symbol, period, start, end)
|
|
|
serialized = json.dumps(data, ensure_ascii=False)
|
|
|
ttl = ttl or self.default_ttl
|
|
|
|
|
|
await self.redis.setex(key, ttl, serialized)
|
|
|
logger.debug(f"Cache set: {key} (TTL={ttl}s)")
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Cache set error: {e}")
|
|
|
return False
|
|
|
|
|
|
async def delete_kline(
|
|
|
self,
|
|
|
symbol: str,
|
|
|
period: str,
|
|
|
start: str,
|
|
|
end: str
|
|
|
) -> bool:
|
|
|
"""删除缓存"""
|
|
|
if not self._connected:
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
key = self._make_key(symbol, period, start, end)
|
|
|
await self.redis.delete(key)
|
|
|
logger.debug(f"Cache deleted: {key}")
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
logger.error(f"Cache delete error: {e}")
|
|
|
return False
|
|
|
|
|
|
async def clear_kline_cache(self, symbol: Optional[str] = None) -> int:
|
|
|
"""
|
|
|
清除 K 线缓存
|
|
|
|
|
|
Args:
|
|
|
symbol: 如果指定,只清除该品种的缓存
|
|
|
|
|
|
Returns:
|
|
|
清除的键数量
|
|
|
"""
|
|
|
if not self._connected:
|
|
|
return 0
|
|
|
|
|
|
try:
|
|
|
if symbol:
|
|
|
pattern = f"kline:{symbol}:*"
|
|
|
else:
|
|
|
pattern = "kline:*"
|
|
|
|
|
|
count = 0
|
|
|
async for key in self.redis.scan_iter(match=pattern):
|
|
|
await self.redis.delete(key)
|
|
|
count += 1
|
|
|
|
|
|
logger.info(f"Cleared {count} cache keys (pattern={pattern})")
|
|
|
return count
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Cache clear error: {e}")
|
|
|
return 0
|
|
|
|
|
|
async def get_stats(self) -> Dict[str, Any]:
|
|
|
"""获取缓存统计信息"""
|
|
|
if not self._connected:
|
|
|
return {"connected": False}
|
|
|
|
|
|
try:
|
|
|
info = await self.redis.info("stats")
|
|
|
keys_count = await self.redis.dbsize()
|
|
|
|
|
|
# 统计 kline 相关键
|
|
|
kline_keys_count = 0
|
|
|
async for _ in self.redis.scan_iter(match="kline:*"):
|
|
|
kline_keys_count += 1
|
|
|
|
|
|
return {
|
|
|
"connected": True,
|
|
|
"total_keys": keys_count,
|
|
|
"kline_cache_keys": kline_keys_count,
|
|
|
"hits": info.get("keyspace_hits", 0),
|
|
|
"misses": info.get("keyspace_misses", 0)
|
|
|
}
|
|
|
except Exception as e:
|
|
|
logger.error(f"Cache stats error: {e}")
|
|
|
return {"connected": False, "error": str(e)}
|
|
|
|
|
|
|
|
|
# 全局缓存服务实例
|
|
|
cache_service = CacheService()
|
|
|
|
|
|
|
|
|
async def get_cache_service() -> CacheService:
|
|
|
"""获取缓存服务实例"""
|
|
|
return cache_service
|