"""FastAPI dependency providers for the admin application.

This module wires settings, the database session, repositories and services
together, resolves the authenticated staff context either from a bearer token
or from the panel access cookie, and exposes role/permission guards.
"""

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from admin_app.api.panel_session import get_panel_access_cookie
from admin_app.core import (
    AdminSecurityService,
    AdminSettings,
    AuthenticatedStaffContext,
    AuthenticatedStaffPrincipal,
    get_admin_settings,
)
from admin_app.db.database import get_admin_db_session
from admin_app.repositories import (
    AuditLogRepository,
    StaffAccountRepository,
    StaffSessionRepository,
)
from admin_app.services import AuditService, AuthService
from shared.contracts import (
    AdminPermission,
    StaffRole,
    permissions_for_role,
    role_has_permission,
    role_includes,
)

# auto_error=False: absent credentials are handed to the dependency as None,
# so get_current_staff_context can answer with its own 401 message.
bearer_scheme = HTTPBearer(auto_error=False)


def _unauthorized(detail: str, *, bearer_challenge: bool = False) -> HTTPException:
    """Build (without raising) a 401 error.

    When *bearer_challenge* is true the error carries a
    ``WWW-Authenticate: Bearer`` header; otherwise no headers are set.
    """
    challenge = {"WWW-Authenticate": "Bearer"} if bearer_challenge else None
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers=challenge,
    )


def _ensure_permission(
    staff: AuthenticatedStaffPrincipal,
    permission: AdminPermission,
) -> AuthenticatedStaffPrincipal:
    """Return *staff* if its role grants *permission*, else raise a 403."""
    if role_has_permission(staff.role, permission):
        return staff
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Permissao administrativa insuficiente: '{permission.value}'.",
    )


def get_settings(request: Request) -> AdminSettings:
    """Return the admin settings to use for this request.

    Prefers ``request.app.state.admin_settings`` when that attribute holds an
    ``AdminSettings`` instance; otherwise falls back to ``get_admin_settings()``.
    """
    candidate = getattr(request.app.state, "admin_settings", None)
    return candidate if isinstance(candidate, AdminSettings) else get_admin_settings()


def get_admin_db(db: Session = Depends(get_admin_db_session)) -> Session:
    """Expose the session provided by ``get_admin_db_session`` unchanged."""
    return db


def get_security_service(settings: AdminSettings = Depends(get_settings)) -> AdminSecurityService:
    """Create an ``AdminSecurityService`` from the resolved settings."""
    return AdminSecurityService(settings)


def get_staff_account_repository(db: Session = Depends(get_admin_db)) -> StaffAccountRepository:
    """Create a ``StaffAccountRepository`` bound to the admin session."""
    return StaffAccountRepository(db)


def get_staff_session_repository(db: Session = Depends(get_admin_db)) -> StaffSessionRepository:
    """Create a ``StaffSessionRepository`` bound to the admin session."""
    return StaffSessionRepository(db)


def get_audit_log_repository(db: Session = Depends(get_admin_db)) -> AuditLogRepository:
    """Create an ``AuditLogRepository`` bound to the admin session."""
    return AuditLogRepository(db)


def get_audit_service(
    repository: AuditLogRepository = Depends(get_audit_log_repository),
) -> AuditService:
    """Create an ``AuditService`` on top of the audit log repository."""
    return AuditService(repository)


def get_auth_service(
    account_repository: StaffAccountRepository = Depends(get_staff_account_repository),
    session_repository: StaffSessionRepository = Depends(get_staff_session_repository),
    security_service: AdminSecurityService = Depends(get_security_service),
    audit_service: AuditService = Depends(get_audit_service),
) -> AuthService:
    """Assemble the ``AuthService`` from its repositories and services."""
    collaborators = {
        "account_repository": account_repository,
        "session_repository": session_repository,
        "security_service": security_service,
        "audit_service": audit_service,
    }
    return AuthService(**collaborators)


def get_current_staff_context(
    credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
    auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext:
    """Resolve the authenticated staff context from bearer credentials.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            credentials are missing, use a scheme other than bearer, or the
            auth service rejects the token with ``ValueError``.
    """
    if credentials is None or credentials.scheme.lower() != "bearer":
        raise _unauthorized(
            "Autenticacao administrativa obrigatoria.",
            bearer_challenge=True,
        )

    try:
        context = auth_service.get_authenticated_context(credentials.credentials)
    except ValueError as exc:
        raise _unauthorized(
            "Token administrativo invalido.",
            bearer_challenge=True,
        ) from exc
    return context


def get_current_panel_staff_context(
    request: Request,
    auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext:
    """Resolve the authenticated staff context for the web panel.

    The token comes from ``get_panel_access_cookie(request)``.

    Raises:
        HTTPException: 401 (no challenge header) when no token is available or
            the auth service rejects it with ``ValueError``.
    """
    token = get_panel_access_cookie(request)
    if not token:
        raise _unauthorized("Sessao administrativa web obrigatoria.")

    try:
        context = auth_service.get_authenticated_context(token)
    except ValueError as exc:
        raise _unauthorized("Sessao administrativa web invalida.") from exc
    return context


def get_optional_panel_staff_context(
    request: Request,
    auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext | None:
    """Like ``get_current_panel_staff_context`` but never raises a 401.

    Returns ``None`` when no token is available or when the auth service
    rejects the token with ``ValueError``.
    """
    token = get_panel_access_cookie(request)
    if not token:
        return None

    try:
        context = auth_service.get_authenticated_context(token)
    except ValueError:
        # A rejected token is treated the same as an absent one here.
        return None
    return context


def get_current_staff_principal(
    context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> AuthenticatedStaffPrincipal:
    """Return the principal of the bearer-authenticated context."""
    return context.principal


def get_current_panel_staff_principal(
    context: AuthenticatedStaffContext = Depends(get_current_panel_staff_context),
) -> AuthenticatedStaffPrincipal:
    """Return the principal of the panel-authenticated context."""
    return context.principal


def get_optional_panel_staff_principal(
    context: AuthenticatedStaffContext | None = Depends(get_optional_panel_staff_context),
) -> AuthenticatedStaffPrincipal | None:
    """Return the panel principal, or ``None`` when there is no context."""
    return None if context is None else context.principal


def get_current_staff_session_id(
    context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> int:
    """Return the session id of the bearer-authenticated context."""
    return context.session_id


def get_current_panel_staff_session_id(
    context: AuthenticatedStaffContext = Depends(get_current_panel_staff_context),
) -> int:
    """Return the session id of the panel-authenticated context."""
    return context.session_id


def require_staff_role(minimum_role: StaffRole):
    """Build a dependency that admits only staff whose role includes *minimum_role*.

    The returned dependency yields the bearer-authenticated principal, or
    raises a 403 ``HTTPException`` when ``role_includes`` is false.
    """

    def dependency(
        current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
    ) -> AuthenticatedStaffPrincipal:
        if role_includes(current_staff.role, minimum_role):
            return current_staff
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Acesso administrativo requer papel minimo '{minimum_role.value}'.",
        )

    return dependency


def require_admin_permission(permission: AdminPermission):
    """Build a dependency requiring *permission* for bearer-authenticated staff.

    The returned dependency yields the principal, or raises a 403
    ``HTTPException`` when the principal's role lacks the permission.
    """

    def dependency(
        current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
    ) -> AuthenticatedStaffPrincipal:
        return _ensure_permission(current_staff, permission)

    return dependency


def require_panel_admin_permission(permission: AdminPermission):
    """Build a dependency requiring *permission* for panel-authenticated staff.

    Same check as ``require_admin_permission``, but the principal comes from
    the panel context instead of bearer credentials.
    """

    def dependency(
        current_staff: AuthenticatedStaffPrincipal = Depends(get_current_panel_staff_principal),
    ) -> AuthenticatedStaffPrincipal:
        return _ensure_permission(current_staff, permission)

    return dependency


def get_current_staff_permissions(
    current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> tuple[str, ...]:
    """Return the ``.value`` of every permission granted to the staff role."""
    granted = permissions_for_role(current_staff.role)
    return tuple(entry.value for entry in granted)