You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
buffer_platform/app/auth_service.py

130 lines
3.4 KiB

"""
用户权限系统 - 认证服务
"""
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.orm import Session
from app.user_models import User, Session as UserSession
# 密码加密工具
def hash_password(password: str) -> str:
    """Return a salted SHA-256 digest of *password* as ``salt$digest``.

    A new random salt (16 random bytes rendered as 32 hex characters) is
    generated on every call, so hashing the same password twice produces
    different strings. The salt is prepended to the password before hashing.

    NOTE(review): a single SHA-256 round is cheap to brute-force. A
    dedicated key-derivation function would be stronger, but the stored
    format is parsed by ``verify_password`` and would have to change in
    lockstep with existing stored hashes.

    Args:
        password: The plain-text password to hash.

    Returns:
        The string ``"<salt_hex>$<sha256_hex>"``.
    """
    salt_hex = secrets.token_hex(16)
    hasher = hashlib.sha256()
    hasher.update((salt_hex + password).encode())
    return "$".join((salt_hex, hasher.hexdigest()))
def verify_password(password: str, hashed: str) -> bool:
    """Check *password* against a stored ``salt$digest`` string.

    The digest is recomputed as SHA-256 over ``salt + password`` and compared
    with the stored digest using a constant-time comparison.

    Args:
        password: The plain-text password supplied by the caller.
        hashed: The stored value, expected to contain exactly one ``$``
            separating the salt from the hex digest.

    Returns:
        ``True`` when the recomputed digest matches; ``False`` when it does
        not or when *hashed* / *password* is malformed (wrong number of
        ``$`` separators, ``None``, non-string, or unencodable input).
    """
    try:
        salt, expected = hashed.split('$')
        actual = hashlib.sha256((salt + password).encode()).hexdigest()
        # Constant-time comparison avoids leaking how many leading
        # characters of the digest matched.
        return secrets.compare_digest(actual, expected)
    except (AttributeError, TypeError, ValueError):
        # Malformed input is a failed verification, not an error. Only these
        # expected exception types are caught, so KeyboardInterrupt,
        # SystemExit and genuine bugs still propagate.
        return False
def generate_token() -> str:
    """Return a fresh random, URL-safe session token.

    The token is produced by ``secrets.token_urlsafe`` from 32 random bytes.
    """
    token_bytes = 32
    return secrets.token_urlsafe(token_bytes)
# 用户操作
def create_user(db: Session, username: str, password: str, email: Optional[str] = None, role: str = 'user') -> User:
    """Create a new active user and persist it.

    The plain-text password is not stored; only the value returned by
    ``hash_password`` is written to ``password_hash``.

    Args:
        db: Database session used to add and commit the new row.
        username: Login name for the new account.
        password: Plain-text password, hashed before storage.
        email: Optional e-mail address.
        role: Role string stored on the user; defaults to ``'user'``.

    Returns:
        The ``User`` instance after it has been committed and refreshed.

    Raises:
        Any exception raised by ``db.commit()`` is re-raised unchanged after
        the session has been rolled back.
    """
    user = User(
        username=username,
        password_hash=hash_password(password),
        email=email,
        role=role,
        is_active=True,
    )
    db.add(user)
    try:
        db.commit()
    except Exception:
        # Roll back so the caller's session is usable again after a failed
        # commit (e.g. a constraint violation), then surface the error.
        db.rollback()
        raise
    db.refresh(user)
    return user
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
    """Look up *username* and check *password* against its stored hash.

    Args:
        db: Database session used for the lookup.
        username: Login name to search for.
        password: Plain-text password to verify.

    Returns:
        The matching ``User`` when the account exists, is active, and the
        password verifies; ``None`` in every other case.
    """
    candidate = db.query(User).filter(User.username == username).first()
    # Checks short-circuit in order: account exists, account is active,
    # password matches.
    if candidate and candidate.is_active and verify_password(password, candidate.password_hash):
        return candidate
    return None
def update_last_login(db: Session, user: User):
    """Set ``user.last_login`` to the current UTC time and commit.

    Args:
        db: Database session to commit.
        user: The user whose ``last_login`` attribute is updated.
    """
    # datetime.utcnow() yields a naive datetime (no tzinfo attached).
    login_time = datetime.utcnow()
    user.last_login = login_time
    db.commit()
# 会话操作
def create_session(db: Session, user_id: int, expires_hours: int = 24) -> str:
    """Create and persist a login session for *user_id*.

    Args:
        db: Database session used to add and commit the new row.
        user_id: Identifier of the user the session belongs to.
        expires_hours: Lifetime of the session in hours, counted from the
            current UTC time; defaults to 24.

    Returns:
        The newly generated session token.
    """
    new_token = generate_token()
    expiry = datetime.utcnow() + timedelta(hours=expires_hours)
    record = UserSession(user_id=user_id, token=new_token, expires_at=expiry, is_valid=True)
    db.add(record)
    db.commit()
    return new_token
def validate_session(db: Session, token: str) -> Optional[User]:
    """Resolve a session token to its active user.

    A session matches only when its token equals *token*, it is still
    flagged valid, and its expiry is strictly later than the current UTC
    time.

    Args:
        db: Database session used for both lookups.
        token: The session token presented by the caller.

    Returns:
        The owning ``User`` when a matching session exists and that user is
        active; ``None`` otherwise.
    """
    now = datetime.utcnow()
    live_session = (
        db.query(UserSession)
        .filter(
            UserSession.token == token,
            UserSession.is_valid == True,  # noqa: E712 - builds a query expression
            UserSession.expires_at > now,
        )
        .first()
    )
    if not live_session:
        return None

    owner = db.query(User).filter(User.id == live_session.user_id).first()
    if owner and owner.is_active:
        return owner
    return None
def invalidate_session(db: Session, token: str):
    """Mark the session identified by *token* as invalid (logout).

    When no session has that token, nothing is changed and no commit is
    issued.

    Args:
        db: Database session used for the lookup and commit.
        token: The session token to invalidate.
    """
    target = db.query(UserSession).filter(UserSession.token == token).first()
    if not target:
        return
    target.is_valid = False
    db.commit()
def cleanup_expired_sessions(db: Session):
    """Flag every expired session as invalid and commit.

    A session counts as expired when its ``expires_at`` is strictly earlier
    than the current UTC time. Rows are updated in bulk, not deleted.

    Args:
        db: Database session used for the bulk update and commit.
    """
    cutoff = datetime.utcnow()
    expired = db.query(UserSession).filter(UserSession.expires_at < cutoff)
    expired.update({"is_valid": False})
    db.commit()
# 默认用户创建
# Built-in bootstrap password. NOTE(review): this is a well-known weak
# default; deployments should pass their own ``password`` or change it
# immediately after first login.
_DEFAULT_ADMIN_PASSWORD = 'admin123'


def create_default_admin(
    db: Session,
    *,
    username: str = 'lxy_root',
    password: str = _DEFAULT_ADMIN_PASSWORD,
    email: str = 'admin@system.local',
):
    """Create the bootstrap administrator account if it does not exist yet.

    Called with only ``db``, this behaves as before: it creates
    ``lxy_root`` with the built-in password and prints the credentials.
    The keyword-only parameters let a deployment supply its own values
    instead of the hard-coded defaults.

    Args:
        db: Database session used for the lookup and for creating the user.
        username: Login name of the administrator account.
        password: Plain-text password for the account.
        email: E-mail address stored on the account.
    """
    existing = db.query(User).filter(User.username == username).first()
    if existing:
        return

    create_user(
        db=db,
        username=username,
        password=password,
        email=email,
        role='admin',
    )
    if password == _DEFAULT_ADMIN_PASSWORD:
        # Only the publicly known built-in password is echoed.
        print(f"✓ 默认管理员账户已创建: {username} / {password}")
    else:
        # Never write a caller-supplied password to stdout or logs.
        print(f"✓ 默认管理员账户已创建: {username}")