You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/admin_app/api/dependencies.py

201 lines
6.8 KiB
Python

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from admin_app.api.panel_session import get_panel_access_cookie
from admin_app.core import (
AdminSecurityService,
AdminSettings,
AuthenticatedStaffContext,
AuthenticatedStaffPrincipal,
get_admin_settings,
)
from admin_app.db.database import get_admin_db_session
from admin_app.repositories import AuditLogRepository, StaffAccountRepository, StaffSessionRepository
from admin_app.services import AuditService, AuthService
from shared.contracts import AdminPermission, StaffRole, permissions_for_role, role_has_permission, role_includes
bearer_scheme = HTTPBearer(auto_error=False)
def get_settings(request: Request) -> AdminSettings:
app_settings = getattr(request.app.state, "admin_settings", None)
if isinstance(app_settings, AdminSettings):
return app_settings
return get_admin_settings()
def get_admin_db(db: Session = Depends(get_admin_db_session)) -> Session:
return db
def get_security_service(settings: AdminSettings = Depends(get_settings)) -> AdminSecurityService:
return AdminSecurityService(settings)
def get_staff_account_repository(db: Session = Depends(get_admin_db)) -> StaffAccountRepository:
return StaffAccountRepository(db)
def get_staff_session_repository(db: Session = Depends(get_admin_db)) -> StaffSessionRepository:
return StaffSessionRepository(db)
def get_audit_log_repository(db: Session = Depends(get_admin_db)) -> AuditLogRepository:
return AuditLogRepository(db)
def get_audit_service(
repository: AuditLogRepository = Depends(get_audit_log_repository),
) -> AuditService:
return AuditService(repository)
def get_auth_service(
account_repository: StaffAccountRepository = Depends(get_staff_account_repository),
session_repository: StaffSessionRepository = Depends(get_staff_session_repository),
security_service: AdminSecurityService = Depends(get_security_service),
audit_service: AuditService = Depends(get_audit_service),
) -> AuthService:
return AuthService(
account_repository=account_repository,
session_repository=session_repository,
security_service=security_service,
audit_service=audit_service,
)
def get_current_staff_context(
credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext:
if credentials is None or credentials.scheme.lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Autenticacao administrativa obrigatoria.",
headers={"WWW-Authenticate": "Bearer"},
)
try:
return auth_service.get_authenticated_context(credentials.credentials)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token administrativo invalido.",
headers={"WWW-Authenticate": "Bearer"},
) from exc
def get_current_panel_staff_context(
request: Request,
auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext:
access_token = get_panel_access_cookie(request)
if not access_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Sessao administrativa web obrigatoria.",
)
try:
return auth_service.get_authenticated_context(access_token)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Sessao administrativa web invalida.",
) from exc
def get_optional_panel_staff_context(
request: Request,
auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext | None:
access_token = get_panel_access_cookie(request)
if not access_token:
return None
try:
return auth_service.get_authenticated_context(access_token)
except ValueError:
return None
def get_current_staff_principal(
context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> AuthenticatedStaffPrincipal:
return context.principal
def get_current_panel_staff_principal(
context: AuthenticatedStaffContext = Depends(get_current_panel_staff_context),
) -> AuthenticatedStaffPrincipal:
return context.principal
def get_optional_panel_staff_principal(
context: AuthenticatedStaffContext | None = Depends(get_optional_panel_staff_context),
) -> AuthenticatedStaffPrincipal | None:
if context is None:
return None
return context.principal
def get_current_staff_session_id(
context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> int:
return context.session_id
def get_current_panel_staff_session_id(
context: AuthenticatedStaffContext = Depends(get_current_panel_staff_context),
) -> int:
return context.session_id
def require_staff_role(minimum_role: StaffRole):
def dependency(
current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> AuthenticatedStaffPrincipal:
if not role_includes(current_staff.role, minimum_role):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Acesso administrativo requer papel minimo '{minimum_role.value}'.",
)
return current_staff
return dependency
def require_admin_permission(permission: AdminPermission):
def dependency(
current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> AuthenticatedStaffPrincipal:
if not role_has_permission(current_staff.role, permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permissao administrativa insuficiente: '{permission.value}'.",
)
return current_staff
return dependency
def require_panel_admin_permission(permission: AdminPermission):
def dependency(
current_staff: AuthenticatedStaffPrincipal = Depends(get_current_panel_staff_principal),
) -> AuthenticatedStaffPrincipal:
if not role_has_permission(current_staff.role, permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permissao administrativa insuficiente: '{permission.value}'.",
)
return current_staff
return dependency
def get_current_staff_permissions(
current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> tuple[str, ...]:
return tuple(permission.value for permission in permissions_for_role(current_staff.role))