You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

84 lines
2.4 KiB

"""
认证服务
"""
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional
import jwt
from passlib.context import CryptContext
from app.config import settings
from app.models import User
from app.db.init_db import SQLiteSessionLocal
# 密码加密上下文 - 使用 bcrypt 算法(更安全)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""创建访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "type": "access"})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def create_refresh_token(data: dict) -> str:
"""创建刷新令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> Optional[dict]:
"""解码令牌"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def authenticate_user(username: str, password: str) -> Optional[User]:
"""认证用户"""
with SQLiteSessionLocal() as db:
user = db.query(User).filter(User.username == username).first()
if not user:
return None
if not verify_password(password, user.password_hash):
return None
if not user.is_active:
return None
return user
def generate_api_key() -> str:
"""生成 API Key"""
return f"ak_{secrets.token_urlsafe(32)}"
def hash_api_key(api_key: str) -> str:
"""哈希 API Key"""
return hashlib.sha256(api_key.encode()).hexdigest()