From 82a12ff464f5172b8be9a057e6325517f6fb11dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Thu, 26 Mar 2026 16:49:45 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=90=20feat(admin):=20implementar=20ide?= =?UTF-8?q?ntidade=20e=20seguranca=20administrativa?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 + admin_app/api/dependencies.py | 125 +++++++++- admin_app/api/router.py | 5 + admin_app/api/routes/audit.py | 38 ++++ admin_app/api/routes/auth.py | 109 +++++++++ admin_app/api/routes/system.py | 68 +++++- admin_app/api/schemas.py | 79 ++++++- admin_app/core/__init__.py | 23 +- admin_app/core/security.py | 213 ++++++++++++++++++ admin_app/core/settings.py | 82 ++++++- admin_app/db/database.py | 14 +- admin_app/db/models/__init__.py | 5 +- admin_app/db/models/audit_log.py | 26 +++ admin_app/db/models/staff_account.py | 36 +++ admin_app/db/models/staff_session.py | 31 +++ admin_app/repositories/__init__.py | 10 +- .../repositories/audit_log_repository.py | 39 ++++ .../repositories/staff_account_repository.py | 22 ++ .../repositories/staff_session_repository.py | 45 ++++ admin_app/services/__init__.py | 14 +- admin_app/services/audit_service.py | 153 +++++++++++++ admin_app/services/auth_service.py | 210 +++++++++++++++++ .../architecture/admin-credential-strategy.md | 164 ++++++++++++++ .../architecture/monorepo-target-structure.md | 6 +- .../shared-contracts-and-access-hierarchy.md | 1 + shared/contracts/access_control.py | 4 +- tests/test_admin_app_bootstrap.py | 13 ++ tests/test_admin_audit_log_model.py | 26 +++ tests/test_admin_audit_service.py | 76 +++++++ tests/test_admin_auth_service.py | 206 +++++++++++++++++ tests/test_admin_auth_web.py | 150 ++++++++++++ tests/test_admin_authorization_web.py | 100 ++++++++ tests/test_admin_credential_strategy.py | 62 +++++ tests/test_admin_identity_isolation.py | 63 ++++++ tests/test_admin_security_service.py | 56 +++++ tests/test_staff_account_model.py | 49 ++++ tests/test_staff_session_model.py | 24 ++ 37 files changed, 2331 insertions(+), 20 deletions(-) create mode 100644 admin_app/api/routes/audit.py create mode 100644 admin_app/api/routes/auth.py create mode 100644 admin_app/core/security.py create mode 100644 admin_app/db/models/audit_log.py create mode 100644 admin_app/db/models/staff_account.py create mode 100644 admin_app/db/models/staff_session.py create mode 100644 admin_app/repositories/audit_log_repository.py create mode 100644 admin_app/repositories/staff_account_repository.py create mode 100644 admin_app/repositories/staff_session_repository.py create mode 100644 admin_app/services/audit_service.py create mode 100644 admin_app/services/auth_service.py create mode 100644 docs/architecture/admin-credential-strategy.md create mode 100644 tests/test_admin_audit_log_model.py create mode 100644 tests/test_admin_audit_service.py create mode 100644 tests/test_admin_auth_service.py create mode 100644 tests/test_admin_auth_web.py create mode 100644 tests/test_admin_authorization_web.py create mode 100644 tests/test_admin_credential_strategy.py create mode 100644 tests/test_admin_identity_isolation.py create mode 100644 tests/test_admin_security_service.py create mode 100644 tests/test_staff_account_model.py create mode 100644 tests/test_staff_session_model.py diff --git a/README.md b/README.md index 33ff038..b7bed55 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,7 @@ Nesta fase, o produto continua rodando a partir de `app/` sem migracao de import A estrutura alvo detalhada esta em [docs/architecture/monorepo-target-structure.md](docs/architecture/monorepo-target-structure.md). A hierarquia inicial de acesso e os primeiros contratos entre servicos estao em [docs/architecture/shared-contracts-and-access-hierarchy.md](docs/architecture/shared-contracts-and-access-hierarchy.md). A estrategia de deploy independente entre os dois runtimes esta em [docs/architecture/independent-deploy-strategy.md](docs/architecture/independent-deploy-strategy.md). +A estrategia de credenciais das contas administrativas esta em [docs/architecture/admin-credential-strategy.md](docs/architecture/admin-credential-strategy.md). ## Tools Disponiveis @@ -457,3 +458,6 @@ Os proximos ganhos mais valiosos para o projeto sao: - criar uma camada de avaliacao semantica e replay de conversas - integrar o orquestrador com sistemas reais de operacao + + + diff --git a/admin_app/api/dependencies.py b/admin_app/api/dependencies.py index 264ebeb..d4df90a 100644 --- a/admin_app/api/dependencies.py +++ b/admin_app/api/dependencies.py @@ -1,6 +1,20 @@ -from fastapi import Request +from fastapi import Depends, HTTPException, Request, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from sqlalchemy.orm import Session -from admin_app.core.settings import AdminSettings, get_admin_settings +from admin_app.core import ( + AdminSecurityService, + AdminSettings, + AuthenticatedStaffContext, + AuthenticatedStaffPrincipal, + get_admin_settings, +) +from admin_app.db.database import get_admin_db_session +from admin_app.repositories import AuditLogRepository, StaffAccountRepository, StaffSessionRepository +from admin_app.services import AuditService, AuthService +from shared.contracts import AdminPermission, StaffRole, permissions_for_role, role_has_permission, role_includes + +bearer_scheme = HTTPBearer(auto_error=False) def get_settings(request: Request) -> AdminSettings: @@ -8,3 +22,110 @@ def get_settings(request: Request) -> AdminSettings: if isinstance(app_settings, AdminSettings): return app_settings return get_admin_settings() + + +def get_admin_db(db: Session = Depends(get_admin_db_session)) -> Session: + return db + + +def get_security_service(settings: AdminSettings = Depends(get_settings)) -> AdminSecurityService: + return AdminSecurityService(settings) + + +def get_staff_account_repository(db: Session = Depends(get_admin_db)) -> StaffAccountRepository: + return StaffAccountRepository(db) + + +def get_staff_session_repository(db: Session = Depends(get_admin_db)) -> StaffSessionRepository: + return StaffSessionRepository(db) + + +def get_audit_log_repository(db: Session = Depends(get_admin_db)) -> AuditLogRepository: + return AuditLogRepository(db) + + +def get_audit_service( + repository: AuditLogRepository = Depends(get_audit_log_repository), +) -> AuditService: + return AuditService(repository) + + +def get_auth_service( + account_repository: StaffAccountRepository = Depends(get_staff_account_repository), + session_repository: StaffSessionRepository = Depends(get_staff_session_repository), + security_service: AdminSecurityService = Depends(get_security_service), + audit_service: AuditService = Depends(get_audit_service), +) -> AuthService: + return AuthService( + account_repository=account_repository, + session_repository=session_repository, + security_service=security_service, + audit_service=audit_service, + ) + + +def get_current_staff_context( + credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme), + auth_service: AuthService = Depends(get_auth_service), +) -> AuthenticatedStaffContext: + if credentials is None or credentials.scheme.lower() != "bearer": + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Autenticacao administrativa obrigatoria.", + headers={"WWW-Authenticate": "Bearer"}, + ) + + try: + return auth_service.get_authenticated_context(credentials.credentials) + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Token administrativo invalido.", + headers={"WWW-Authenticate": "Bearer"}, + ) from exc + + +def get_current_staff_principal( + context: AuthenticatedStaffContext = Depends(get_current_staff_context), +) -> AuthenticatedStaffPrincipal: + return context.principal + + +def get_current_staff_session_id( + context: AuthenticatedStaffContext = Depends(get_current_staff_context), +) -> int: + return context.session_id + + +def require_staff_role(minimum_role: StaffRole): + def dependency( + current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal), + ) -> AuthenticatedStaffPrincipal: + if not role_includes(current_staff.role, minimum_role): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Acesso administrativo requer papel minimo '{minimum_role.value}'.", + ) + return current_staff + + return dependency + + +def require_admin_permission(permission: AdminPermission): + def dependency( + current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal), + ) -> AuthenticatedStaffPrincipal: + if not role_has_permission(current_staff.role, permission): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Permissao administrativa insuficiente: '{permission.value}'.", + ) + return current_staff + + return dependency + + +def get_current_staff_permissions( + current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal), +) -> tuple[str, ...]: + return tuple(permission.value for permission in permissions_for_role(current_staff.role)) \ No newline at end of file diff --git a/admin_app/api/router.py b/admin_app/api/router.py index 41463ba..a7a97c6 100644 --- a/admin_app/api/router.py +++ b/admin_app/api/router.py @@ -1,6 +1,11 @@ from fastapi import APIRouter +from admin_app.api.routes.audit import router as audit_router +from admin_app.api.routes.auth import router as auth_router from admin_app.api.routes.system import router as system_router +# Agrega as rotas do servico administrativo. api_router = APIRouter() +api_router.include_router(auth_router) api_router.include_router(system_router) +api_router.include_router(audit_router) \ No newline at end of file diff --git a/admin_app/api/routes/audit.py b/admin_app/api/routes/audit.py new file mode 100644 index 0000000..53fd4fb --- /dev/null +++ b/admin_app/api/routes/audit.py @@ -0,0 +1,38 @@ +from fastapi import APIRouter, Depends + +from admin_app.api.dependencies import get_audit_service, require_admin_permission +from admin_app.api.schemas import AdminAuditEntryResponse, AdminAuditListResponse +from admin_app.core import AuthenticatedStaffPrincipal +from admin_app.services import AuditService +from shared.contracts import AdminPermission + +router = APIRouter(prefix="/audit", tags=["audit"]) + + +@router.get("/events", response_model=AdminAuditListResponse) +def list_audit_events( + audit_service: AuditService = Depends(get_audit_service), + _: AuthenticatedStaffPrincipal = Depends( + require_admin_permission(AdminPermission.VIEW_AUDIT_LOGS) + ), +): + events = audit_service.list_recent(limit=50) + return AdminAuditListResponse( + service="orquestrador-admin", + events=[ + AdminAuditEntryResponse( + id=event.id, + actor_staff_account_id=event.actor_staff_account_id, + event_type=event.event_type, + resource_type=event.resource_type, + resource_id=event.resource_id, + outcome=event.outcome, + message=event.message, + payload_json=event.payload_json, + ip_address=event.ip_address, + user_agent=event.user_agent, + created_at=event.created_at, + ) + for event in events + ], + ) \ No newline at end of file diff --git a/admin_app/api/routes/auth.py b/admin_app/api/routes/auth.py new file mode 100644 index 0000000..a35c355 --- /dev/null +++ b/admin_app/api/routes/auth.py @@ -0,0 +1,109 @@ +from fastapi import APIRouter, Depends, HTTPException, Request, status + +from admin_app.api.dependencies import ( + get_auth_service, + get_current_staff_context, + get_current_staff_principal, +) +from admin_app.api.schemas import ( + AdminAuthenticatedStaffResponse, + AdminLoginRequest, + AdminLogoutResponse, + AdminRefreshTokenRequest, + AdminSessionResponse, +) +from admin_app.core import AuthenticatedStaffContext, AuthenticatedStaffPrincipal +from admin_app.services import AuthService + +router = APIRouter(prefix="/auth", tags=["auth"]) + + +def _extract_request_metadata(request: Request) -> tuple[str | None, str | None]: + ip_address = request.client.host if request.client else None + user_agent = request.headers.get("user-agent") + return ip_address, user_agent + + +@router.post("/login", response_model=AdminSessionResponse) +def login( + payload: AdminLoginRequest, + request: Request, + auth_service: AuthService = Depends(get_auth_service), +): + ip_address, user_agent = _extract_request_metadata(request) + session = auth_service.login( + email=payload.email, + password=payload.password, + ip_address=ip_address, + user_agent=user_agent, + ) + if session is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Credenciais administrativas invalidas.", + ) + + return AdminSessionResponse( + session_id=session.session_id, + access_token=session.access_token, + refresh_token=session.refresh_token, + token_type=session.token_type, + expires_in_seconds=session.expires_in_seconds, + staff_account=AdminAuthenticatedStaffResponse(**session.principal.model_dump()), + ) + + +@router.post("/refresh", response_model=AdminSessionResponse) +def refresh( + payload: AdminRefreshTokenRequest, + request: Request, + auth_service: AuthService = Depends(get_auth_service), +): + ip_address, user_agent = _extract_request_metadata(request) + session = auth_service.refresh_session( + refresh_token=payload.refresh_token, + ip_address=ip_address, + user_agent=user_agent, + ) + if session is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Refresh token administrativo invalido.", + ) + + return AdminSessionResponse( + session_id=session.session_id, + access_token=session.access_token, + refresh_token=session.refresh_token, + token_type=session.token_type, + expires_in_seconds=session.expires_in_seconds, + staff_account=AdminAuthenticatedStaffResponse(**session.principal.model_dump()), + ) + + +@router.post("/logout", response_model=AdminLogoutResponse) +def logout( + request: Request, + current_context: AuthenticatedStaffContext = Depends(get_current_staff_context), + auth_service: AuthService = Depends(get_auth_service), +): + ip_address, user_agent = _extract_request_metadata(request) + auth_service.logout( + current_context.session_id, + actor_staff_account_id=current_context.principal.id, + ip_address=ip_address, + user_agent=user_agent, + ) + return AdminLogoutResponse( + service="orquestrador-admin", + status="ok", + message="Sessao administrativa encerrada.", + session_id=current_context.session_id, + ) + + +@router.get("/me", response_model=AdminAuthenticatedStaffResponse) +def current_staff( + current_staff_account: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal), +): + return AdminAuthenticatedStaffResponse(**current_staff_account.model_dump()) \ No newline at end of file diff --git a/admin_app/api/routes/system.py b/admin_app/api/routes/system.py index 1c01d62..c694267 100644 --- a/admin_app/api/routes/system.py +++ b/admin_app/api/routes/system.py @@ -1,9 +1,21 @@ from fastapi import APIRouter, Depends -from admin_app.api.dependencies import get_settings -from admin_app.api.schemas import AdminHealthResponse, AdminRootResponse, AdminSystemInfoResponse +from admin_app.api.dependencies import ( + get_current_staff_permissions, + get_settings, + require_admin_permission, +) +from admin_app.api.schemas import ( + AdminCapabilityResponse, + AdminCurrentAccessResponse, + AdminHealthResponse, + AdminRootResponse, + AdminSystemInfoResponse, +) +from admin_app.core import AuthenticatedStaffPrincipal from admin_app.core.settings import AdminSettings from admin_app.services.system_service import SystemService +from shared.contracts import AdminPermission router = APIRouter(tags=["system"]) @@ -22,6 +34,54 @@ def health_check(settings: AdminSettings = Depends(get_settings)): return _build_service(settings).build_health_payload() -@router.get("/system/info", response_model=AdminSystemInfoResponse) -def system_info(settings: AdminSettings = Depends(get_settings)): +@router.get( + "/system/info", + response_model=AdminSystemInfoResponse, +) +def system_info( + settings: AdminSettings = Depends(get_settings), + _: AuthenticatedStaffPrincipal = Depends( + require_admin_permission(AdminPermission.VIEW_SYSTEM) + ), +): return _build_service(settings).build_system_info_payload() + + +@router.get( + "/system/access", + response_model=AdminCurrentAccessResponse, +) +def current_access( + current_staff: AuthenticatedStaffPrincipal = Depends( + require_admin_permission(AdminPermission.VIEW_SYSTEM) + ), + permissions: tuple[str, ...] = Depends(get_current_staff_permissions), +): + return AdminCurrentAccessResponse( + service="orquestrador-admin", + staff_account={ + "id": current_staff.id, + "email": current_staff.email, + "display_name": current_staff.display_name, + "role": current_staff.role, + "is_active": current_staff.is_active, + }, + permissions=list(permissions), + ) + + +@router.get( + "/system/admin-capabilities", + response_model=AdminCapabilityResponse, +) +def admin_capabilities( + current_staff: AuthenticatedStaffPrincipal = Depends( + require_admin_permission(AdminPermission.MANAGE_SETTINGS) + ), +): + return AdminCapabilityResponse( + service="orquestrador-admin", + action="manage_settings", + allowed=True, + role=current_staff.role, + ) diff --git a/admin_app/api/schemas.py b/admin_app/api/schemas.py index 26939e7..26c90f8 100644 --- a/admin_app/api/schemas.py +++ b/admin_app/api/schemas.py @@ -1,4 +1,8 @@ -from pydantic import BaseModel +from datetime import datetime + +from pydantic import BaseModel, Field, field_validator + +from shared.contracts import StaffRole class AdminRootResponse(BaseModel): @@ -21,3 +25,76 @@ class AdminSystemInfoResponse(BaseModel): version: str api_prefix: str debug: bool + + +class AdminAuthenticatedStaffResponse(BaseModel): + id: int + email: str + display_name: str + role: StaffRole + is_active: bool + + +class AdminCurrentAccessResponse(BaseModel): + service: str + staff_account: AdminAuthenticatedStaffResponse + permissions: list[str] + + +class AdminCapabilityResponse(BaseModel): + service: str + action: str + allowed: bool + role: StaffRole + + +class AdminAuditEntryResponse(BaseModel): + id: int + actor_staff_account_id: int | None + event_type: str + resource_type: str + resource_id: str | None + outcome: str + message: str | None + payload_json: dict | None + ip_address: str | None + user_agent: str | None + created_at: datetime + + +class AdminAuditListResponse(BaseModel): + service: str + events: list[AdminAuditEntryResponse] + + +class AdminLoginRequest(BaseModel): + email: str + password: str = Field(min_length=1) + + @field_validator("email") + @classmethod + def validate_email(cls, value: str) -> str: + normalized = value.strip().lower() + if "@" not in normalized or normalized.startswith("@") or normalized.endswith("@"): + raise ValueError("email must be a valid administrative login") + return normalized + + +class AdminRefreshTokenRequest(BaseModel): + refresh_token: str = Field(min_length=1) + + +class AdminSessionResponse(BaseModel): + session_id: int + access_token: str + refresh_token: str + token_type: str + expires_in_seconds: int + staff_account: AdminAuthenticatedStaffResponse + + +class AdminLogoutResponse(BaseModel): + service: str + status: str + message: str + session_id: int \ No newline at end of file diff --git a/admin_app/core/__init__.py b/admin_app/core/__init__.py index 46b00f0..c5d1c39 100644 --- a/admin_app/core/__init__.py +++ b/admin_app/core/__init__.py @@ -1 +1,22 @@ -"""Configuracoes centrais do servico administrativo.""" +"""Configuracoes centrais do servico administrativo.""" + +from admin_app.core.security import ( + AdminAccessTokenClaims, + AdminAuthenticatedSession, + AdminCredentialStrategy, + AdminSecurityService, + AuthenticatedStaffContext, + AuthenticatedStaffPrincipal, +) +from admin_app.core.settings import AdminSettings, get_admin_settings + +__all__ = [ + "AdminAccessTokenClaims", + "AdminAuthenticatedSession", + "AdminCredentialStrategy", + "AdminSecurityService", + "AdminSettings", + "AuthenticatedStaffContext", + "AuthenticatedStaffPrincipal", + "get_admin_settings", +] diff --git a/admin_app/core/security.py b/admin_app/core/security.py new file mode 100644 index 0000000..66abc30 --- /dev/null +++ b/admin_app/core/security.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import secrets +from datetime import datetime, timedelta, timezone + +from pydantic import BaseModel + +from admin_app.core.settings import AdminSettings +from shared.contracts import StaffRole + + +class AdminPasswordPolicy(BaseModel): + hash_scheme: str + hash_iterations: int + min_length: int + require_uppercase: bool + require_lowercase: bool + require_digit: bool + require_symbol: bool + pepper_configured: bool + + +class AdminTokenPolicy(BaseModel): + access_token_ttl_minutes: int + refresh_token_ttl_days: int + refresh_token_bytes: int + issuer: str + + +class AdminBootstrapPolicy(BaseModel): + enabled: bool + email: str | None + display_name: str | None + role: str + password_configured: bool + + +class AdminCredentialStrategy(BaseModel): + password: AdminPasswordPolicy + tokens: AdminTokenPolicy + bootstrap: AdminBootstrapPolicy + + +class AuthenticatedStaffPrincipal(BaseModel): + id: int + email: str + display_name: str + role: StaffRole + is_active: bool + + +class AuthenticatedStaffContext(BaseModel): + principal: AuthenticatedStaffPrincipal + session_id: int + + +class AdminAccessTokenClaims(BaseModel): + sub: str + sid: int + email: str + role: StaffRole + token_type: str + iss: str + iat: int + exp: int + + +class AdminAuthenticatedSession(BaseModel): + session_id: int + access_token: str + refresh_token: str + token_type: str = "bearer" + expires_in_seconds: int + principal: AuthenticatedStaffPrincipal + + +class AdminSecurityService: + def __init__(self, settings: AdminSettings): + self.settings = settings + + def build_credential_strategy(self) -> AdminCredentialStrategy: + return AdminCredentialStrategy( + password=AdminPasswordPolicy( + hash_scheme=self.settings.admin_auth_password_hash_scheme, + hash_iterations=self.settings.admin_auth_password_hash_iterations, + min_length=self.settings.admin_auth_password_min_length, + require_uppercase=self.settings.admin_auth_password_require_uppercase, + require_lowercase=self.settings.admin_auth_password_require_lowercase, + require_digit=self.settings.admin_auth_password_require_digit, + require_symbol=self.settings.admin_auth_password_require_symbol, + pepper_configured=bool(self.settings.admin_auth_password_pepper), + ), + tokens=AdminTokenPolicy( + access_token_ttl_minutes=self.settings.admin_auth_access_token_ttl_minutes, + refresh_token_ttl_days=self.settings.admin_auth_refresh_token_ttl_days, + refresh_token_bytes=self.settings.admin_auth_refresh_token_bytes, + issuer=self.settings.admin_auth_token_issuer, + ), + bootstrap=AdminBootstrapPolicy( + enabled=self.settings.admin_bootstrap_enabled, + email=self.settings.admin_bootstrap_email, + display_name=self.settings.admin_bootstrap_display_name, + role=str(self.settings.admin_bootstrap_role or "admin"), + password_configured=bool(self.settings.admin_bootstrap_password), + ), + ) + + def validate_password_strength(self, password: str) -> None: + if len(password) < self.settings.admin_auth_password_min_length: + raise ValueError("Password does not meet minimum length policy") + if self.settings.admin_auth_password_require_uppercase and not any(char.isupper() for char in password): + raise ValueError("Password must include an uppercase letter") + if self.settings.admin_auth_password_require_lowercase and not any(char.islower() for char in password): + raise ValueError("Password must include a lowercase letter") + if self.settings.admin_auth_password_require_digit and not any(char.isdigit() for char in password): + raise ValueError("Password must include a digit") + if self.settings.admin_auth_password_require_symbol and not any(not char.isalnum() for char in password): + raise ValueError("Password must include a symbol") + + def hash_password(self, password: str) -> str: + self.validate_password_strength(password) + if self.settings.admin_auth_password_hash_scheme != "pbkdf2_sha256": + raise ValueError("Unsupported password hash scheme") + + salt = secrets.token_hex(16) + digest = self._pbkdf2_digest(password=password, salt=salt, iterations=self.settings.admin_auth_password_hash_iterations) + return f"pbkdf2_sha256${self.settings.admin_auth_password_hash_iterations}${salt}${digest}" + + def verify_password(self, password: str, stored_hash: str) -> bool: + try: + scheme, iterations_raw, salt, digest = stored_hash.split("$", 3) + except ValueError: + return False + if scheme != "pbkdf2_sha256": + return False + try: + iterations = int(iterations_raw) + except ValueError: + return False + expected_digest = self._pbkdf2_digest(password=password, salt=salt, iterations=iterations) + return hmac.compare_digest(expected_digest, digest) + + def issue_access_token(self, principal: AuthenticatedStaffPrincipal, session_id: int) -> str: + now = datetime.now(timezone.utc) + expires_at = now + timedelta(minutes=self.settings.admin_auth_access_token_ttl_minutes) + payload = { + "sub": str(principal.id), + "sid": session_id, + "email": principal.email, + "role": principal.role.value, + "token_type": "access", + "iss": self.settings.admin_auth_token_issuer, + "iat": int(now.timestamp()), + "exp": int(expires_at.timestamp()), + } + payload_bytes = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8") + signature = hmac.new(self.settings.admin_auth_token_secret.encode("utf-8"), payload_bytes, hashlib.sha256).digest() + return f"{self._urlsafe_b64encode(payload_bytes)}.{self._urlsafe_b64encode(signature)}" + + def decode_access_token(self, token: str) -> AdminAccessTokenClaims: + try: + encoded_payload, encoded_signature = token.split(".", 1) + except ValueError as exc: + raise ValueError("Invalid token format") from exc + + payload_bytes = self._urlsafe_b64decode(encoded_payload) + provided_signature = self._urlsafe_b64decode(encoded_signature) + expected_signature = hmac.new(self.settings.admin_auth_token_secret.encode("utf-8"), payload_bytes, hashlib.sha256).digest() + if not hmac.compare_digest(provided_signature, expected_signature): + raise ValueError("Invalid token signature") + + payload = json.loads(payload_bytes.decode("utf-8")) + claims = AdminAccessTokenClaims.model_validate(payload) + if claims.iss != self.settings.admin_auth_token_issuer: + raise ValueError("Invalid token issuer") + if claims.token_type != "access": + raise ValueError("Invalid token type") + if claims.exp < int(datetime.now(timezone.utc).timestamp()): + raise ValueError("Expired token") + return claims + + def generate_refresh_token(self) -> str: + return secrets.token_urlsafe(self.settings.admin_auth_refresh_token_bytes) + + def hash_refresh_token(self, refresh_token: str) -> str: + return hmac.new( + self.settings.admin_auth_token_secret.encode("utf-8"), + refresh_token.encode("utf-8"), + hashlib.sha256, + ).hexdigest() + + def build_refresh_token_expiry(self) -> datetime: + return datetime.now(timezone.utc) + timedelta(days=self.settings.admin_auth_refresh_token_ttl_days) + + def _pbkdf2_digest(self, password: str, salt: str, iterations: int) -> str: + material = password + if self.settings.admin_auth_password_pepper: + material = f"{material}{self.settings.admin_auth_password_pepper}" + digest = hashlib.pbkdf2_hmac("sha256", material.encode("utf-8"), salt.encode("utf-8"), iterations) + return digest.hex() + + @staticmethod + def _urlsafe_b64encode(value: bytes) -> str: + return base64.urlsafe_b64encode(value).decode("ascii").rstrip("=") + + @staticmethod + def _urlsafe_b64decode(value: str) -> bytes: + padding = "=" * (-len(value) % 4) + return base64.urlsafe_b64decode(f"{value}{padding}") diff --git a/admin_app/core/settings.py b/admin_app/core/settings.py index f6fb0a4..5c50a1e 100644 --- a/admin_app/core/settings.py +++ b/admin_app/core/settings.py @@ -16,7 +16,6 @@ class AdminSettings(BaseSettings): admin_version: str = "0.1.0" admin_api_prefix: str = "" - # Banco administrativo do runtime interno. admin_db_host: str = "127.0.0.1" admin_db_port: int = 3306 admin_db_user: str = "root" @@ -24,6 +23,26 @@ class AdminSettings(BaseSettings): admin_db_name: str = "orquestrador_admin" admin_db_cloud_sql_connection_name: str | None = None + admin_auth_password_hash_scheme: str = "pbkdf2_sha256" + admin_auth_password_hash_iterations: int = 390000 + admin_auth_password_min_length: int = 12 + admin_auth_password_require_uppercase: bool = True + admin_auth_password_require_lowercase: bool = True + admin_auth_password_require_digit: bool = True + admin_auth_password_require_symbol: bool = True + admin_auth_password_pepper: str | None = None + admin_auth_token_secret: str = "local-admin-token-secret-change-me" + admin_auth_token_issuer: str = "orquestrador-admin" + admin_auth_access_token_ttl_minutes: int = 30 + admin_auth_refresh_token_ttl_days: int = 7 + admin_auth_refresh_token_bytes: int = 32 + + admin_bootstrap_enabled: bool = False + admin_bootstrap_email: str | None = None + admin_bootstrap_display_name: str | None = None + admin_bootstrap_password: str | None = None + admin_bootstrap_role: str = "admin" + @field_validator("admin_debug", mode="before") @classmethod def parse_debug_aliases(cls, value): @@ -35,13 +54,70 @@ class AdminSettings(BaseSettings): return False return value - @field_validator("admin_environment", "admin_api_prefix", mode="before") + @field_validator( + "admin_environment", + "admin_api_prefix", + "admin_auth_password_hash_scheme", + "admin_auth_token_secret", + "admin_auth_token_issuer", + "admin_bootstrap_role", + mode="before", + ) @classmethod - def normalize_text_settings(cls, value): + def normalize_required_text_settings(cls, value): if isinstance(value, str): return value.strip() return value + @field_validator( + "admin_bootstrap_email", + "admin_bootstrap_display_name", + "admin_bootstrap_password", + "admin_auth_password_pepper", + mode="before", + ) + @classmethod + def normalize_optional_text_settings(cls, value): + if isinstance(value, str): + stripped = value.strip() + return stripped or None + return value + + @field_validator("admin_auth_password_min_length") + @classmethod + def validate_password_min_length(cls, value: int) -> int: + if value < 12: + raise ValueError("admin_auth_password_min_length must be >= 12") + return value + + @field_validator("admin_auth_password_hash_iterations") + @classmethod + def validate_password_hash_iterations(cls, value: int) -> int: + if value < 100_000: + raise ValueError("admin_auth_password_hash_iterations must be >= 100000") + return value + + @field_validator("admin_auth_access_token_ttl_minutes") + @classmethod + def validate_access_token_ttl(cls, value: int) -> int: + if value < 5: + raise ValueError("admin_auth_access_token_ttl_minutes must be >= 5") + return value + + @field_validator("admin_auth_refresh_token_ttl_days") + @classmethod + def validate_refresh_token_ttl(cls, value: int) -> int: + if value < 1: + raise ValueError("admin_auth_refresh_token_ttl_days must be >= 1") + return value + + @field_validator("admin_auth_refresh_token_bytes") + @classmethod + def validate_refresh_token_bytes(cls, value: int) -> int: + if value < 16: + raise ValueError("admin_auth_refresh_token_bytes must be >= 16") + return value + @lru_cache(maxsize=1) def get_admin_settings() -> AdminSettings: diff --git a/admin_app/db/database.py b/admin_app/db/database.py index fa939cb..95fc261 100644 --- a/admin_app/db/database.py +++ b/admin_app/db/database.py @@ -1,8 +1,12 @@ +from collections.abc import Generator + from sqlalchemy import create_engine -from sqlalchemy.orm import declarative_base, sessionmaker +from sqlalchemy.orm import Session, declarative_base, sessionmaker from admin_app.core.settings import get_admin_settings +# monta a conexão do banco administrativo e expõe get_admin_db_session(). Esse generator é o que alimenta as dependências FastAPI para repositórios e serviços. + settings = get_admin_settings() admin_cloud_sql = settings.admin_db_cloud_sql_connection_name @@ -30,3 +34,11 @@ AdminSessionLocal = sessionmaker( ) AdminBase = declarative_base() + + +def get_admin_db_session() -> Generator[Session, None, None]: + db = AdminSessionLocal() + try: + yield db + finally: + db.close() diff --git a/admin_app/db/models/__init__.py b/admin_app/db/models/__init__.py index 93e1e84..2746396 100644 --- a/admin_app/db/models/__init__.py +++ b/admin_app/db/models/__init__.py @@ -1,3 +1,6 @@ +from admin_app.db.models.audit_log import AuditLog from admin_app.db.models.base import AdminTimestampedModel +from admin_app.db.models.staff_account import StaffAccount +from admin_app.db.models.staff_session import StaffSession -__all__ = ["AdminTimestampedModel"] +__all__ = ["AdminTimestampedModel", "AuditLog", "StaffAccount", "StaffSession"] diff --git a/admin_app/db/models/audit_log.py b/admin_app/db/models/audit_log.py new file mode 100644 index 0000000..4452a0f --- /dev/null +++ b/admin_app/db/models/audit_log.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from sqlalchemy import JSON, ForeignKey, Integer, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from admin_app.db.models.base import AdminTimestampedModel + + +class AuditLog(AdminTimestampedModel): + __tablename__ = "admin_audit_logs" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_staff_account_id: Mapped[int | None] = mapped_column( + Integer, + ForeignKey("staff_accounts.id"), + nullable=True, + index=True, + ) + event_type: Mapped[str] = mapped_column(String(80), nullable=False, index=True) + resource_type: Mapped[str] = mapped_column(String(80), nullable=False, index=True) + resource_id: Mapped[str | None] = mapped_column(String(120), nullable=True, index=True) + outcome: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + message: Mapped[str | None] = mapped_column(Text, nullable=True) + payload_json: Mapped[dict | None] = mapped_column(JSON, nullable=True) + ip_address: Mapped[str | None] = mapped_column(String(64), nullable=True) + user_agent: Mapped[str | None] = mapped_column(String(512), nullable=True) diff --git a/admin_app/db/models/staff_account.py b/admin_app/db/models/staff_account.py new file mode 100644 index 0000000..928a97c --- /dev/null +++ b/admin_app/db/models/staff_account.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import Boolean, DateTime, Enum as SAEnum, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from admin_app.db.models.base import AdminTimestampedModel +from shared.contracts import StaffRole + +# Modelo da conta administrativa +# Ele representa o usuário interno do painel + +def _staff_role_values(enum_cls) -> list[str]: + return [role.value for role in enum_cls] + + +class StaffAccount(AdminTimestampedModel): + __tablename__ = "staff_accounts" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + email: Mapped[str] = mapped_column(String(255), unique=True, index=True, nullable=False) + display_name: Mapped[str] = mapped_column(String(150), nullable=False) + password_hash: Mapped[str] = mapped_column(String(255), nullable=False) + role: Mapped[StaffRole] = mapped_column( + SAEnum( + StaffRole, + native_enum=False, + values_callable=_staff_role_values, + length=32, + ), + nullable=False, + default=StaffRole.VIEWER, + ) + is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + last_login_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) diff --git a/admin_app/db/models/staff_session.py b/admin_app/db/models/staff_session.py new file mode 100644 index 0000000..a32e396 --- /dev/null +++ b/admin_app/db/models/staff_session.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from admin_app.db.models.base import AdminTimestampedModel + + +class StaffSession(AdminTimestampedModel): + __tablename__ = "staff_sessions" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + staff_account_id: Mapped[int] = mapped_column( + Integer, + ForeignKey("staff_accounts.id"), + nullable=False, + index=True, + ) + refresh_token_hash: Mapped[str] = mapped_column( + String(255), + unique=True, + index=True, + nullable=False, + ) + expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) + last_used_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + revoked_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + ip_address: Mapped[str | None] = mapped_column(String(64), nullable=True) + user_agent: Mapped[str | None] = mapped_column(String(512), nullable=True) diff --git a/admin_app/repositories/__init__.py b/admin_app/repositories/__init__.py index 60eb35b..82290b4 100644 --- a/admin_app/repositories/__init__.py +++ b/admin_app/repositories/__init__.py @@ -1,3 +1,11 @@ +from admin_app.repositories.audit_log_repository import AuditLogRepository from admin_app.repositories.base_repository import BaseRepository +from admin_app.repositories.staff_account_repository import StaffAccountRepository +from admin_app.repositories.staff_session_repository import StaffSessionRepository -__all__ = ["BaseRepository"] +__all__ = [ + "AuditLogRepository", + "BaseRepository", + "StaffAccountRepository", + "StaffSessionRepository", +] diff --git a/admin_app/repositories/audit_log_repository.py b/admin_app/repositories/audit_log_repository.py new file mode 100644 index 0000000..685f726 --- /dev/null +++ b/admin_app/repositories/audit_log_repository.py @@ -0,0 +1,39 @@ +from sqlalchemy import desc, select + +from admin_app.db.models import AuditLog +from admin_app.repositories.base_repository import BaseRepository + + +class AuditLogRepository(BaseRepository): + def create( + self, + *, + actor_staff_account_id: int | None, + event_type: str, + resource_type: str, + resource_id: str | None, + outcome: str, + message: str | None, + payload_json: dict | None, + ip_address: str | None, + user_agent: str | None, + ) -> AuditLog: + audit_log = AuditLog( + actor_staff_account_id=actor_staff_account_id, + event_type=event_type, + resource_type=resource_type, + resource_id=resource_id, + outcome=outcome, + message=message, + payload_json=payload_json, + ip_address=ip_address, + user_agent=user_agent, + ) + self.db.add(audit_log) + self.db.commit() + self.db.refresh(audit_log) + return audit_log + + def list_recent(self, limit: int = 50) -> list[AuditLog]: + statement = select(AuditLog).order_by(desc(AuditLog.created_at)).limit(limit) + return list(self.db.execute(statement).scalars().all()) diff --git a/admin_app/repositories/staff_account_repository.py b/admin_app/repositories/staff_account_repository.py new file mode 100644 index 0000000..77c9616 --- /dev/null +++ b/admin_app/repositories/staff_account_repository.py @@ -0,0 +1,22 @@ +from sqlalchemy import select +from sqlalchemy.orm import Session + +from admin_app.db.models import StaffAccount +from admin_app.repositories.base_repository import BaseRepository + +# encapsula o acesso a StaffAccount (busca por e-mail e id) + +class StaffAccountRepository(BaseRepository): + def get_by_email(self, email: str) -> StaffAccount | None: + statement = select(StaffAccount).where(StaffAccount.email == email) + return self.db.execute(statement).scalar_one_or_none() + + def get_by_id(self, staff_account_id: int) -> StaffAccount | None: + statement = select(StaffAccount).where(StaffAccount.id == staff_account_id) + return self.db.execute(statement).scalar_one_or_none() + + def update_last_login(self, staff_account: StaffAccount) -> StaffAccount: + self.db.add(staff_account) + self.db.commit() + self.db.refresh(staff_account) + return staff_account diff --git a/admin_app/repositories/staff_session_repository.py b/admin_app/repositories/staff_session_repository.py new file mode 100644 index 0000000..945a1ac --- /dev/null +++ b/admin_app/repositories/staff_session_repository.py @@ -0,0 +1,45 @@ +from datetime import datetime + +from sqlalchemy import select + +from admin_app.db.models import StaffSession +from admin_app.repositories.base_repository import BaseRepository + + +class StaffSessionRepository(BaseRepository): + def create( + self, + *, + staff_account_id: int, + refresh_token_hash: str, + expires_at: datetime, + ip_address: str | None, + user_agent: str | None, + ) -> StaffSession: + session = StaffSession( + staff_account_id=staff_account_id, + refresh_token_hash=refresh_token_hash, + expires_at=expires_at, + ip_address=ip_address, + user_agent=user_agent, + ) + self.db.add(session) + self.db.commit() + self.db.refresh(session) + return session + + def get_by_id(self, session_id: int) -> StaffSession | None: + statement = select(StaffSession).where(StaffSession.id == session_id) + return self.db.execute(statement).scalar_one_or_none() + + def get_by_refresh_token_hash(self, refresh_token_hash: str) -> StaffSession | None: + statement = select(StaffSession).where( + StaffSession.refresh_token_hash == refresh_token_hash + ) + return self.db.execute(statement).scalar_one_or_none() + + def save(self, staff_session: StaffSession) -> StaffSession: + self.db.add(staff_session) + self.db.commit() + self.db.refresh(staff_session) + return staff_session diff --git a/admin_app/services/__init__.py b/admin_app/services/__init__.py index b211bd7..37b3cd8 100644 --- a/admin_app/services/__init__.py +++ b/admin_app/services/__init__.py @@ -1,3 +1,15 @@ +from admin_app.services.audit_service import ( + AdminAuditEventType, + AdminAuditOutcome, + AuditService, +) +from admin_app.services.auth_service import AuthService from admin_app.services.system_service import SystemService -__all__ = ["SystemService"] +__all__ = [ + "AdminAuditEventType", + "AdminAuditOutcome", + "AuditService", + "AuthService", + "SystemService", +] diff --git a/admin_app/services/audit_service.py b/admin_app/services/audit_service.py new file mode 100644 index 0000000..d63ff69 --- /dev/null +++ b/admin_app/services/audit_service.py @@ -0,0 +1,153 @@ +from enum import Enum + +from admin_app.db.models import AuditLog +from admin_app.repositories import AuditLogRepository + + +class AdminAuditEventType(str, Enum): + STAFF_LOGIN_SUCCEEDED = "staff.login.succeeded" + STAFF_LOGIN_FAILED = "staff.login.failed" + STAFF_LOGOUT_SUCCEEDED = "staff.logout.succeeded" + TOOL_APPROVAL_RECORDED = "tool.approval.recorded" + TOOL_PUBLICATION_RECORDED = "tool.publication.recorded" + + +class AdminAuditOutcome(str, Enum): + SUCCESS = "success" + FAILED = "failed" + + +class AuditService: + def __init__(self, repository: AuditLogRepository): + self.repository = repository + + def record_event( + self, + *, + actor_staff_account_id: int | None, + event_type: AdminAuditEventType, + resource_type: str, + resource_id: str | None, + outcome: AdminAuditOutcome, + message: str | None, + payload_json: dict | None, + ip_address: str | None, + user_agent: str | None, + ) -> AuditLog: + return self.repository.create( + actor_staff_account_id=actor_staff_account_id, + event_type=event_type.value, + resource_type=resource_type, + resource_id=resource_id, + outcome=outcome.value, + message=message, + payload_json=payload_json, + ip_address=ip_address, + user_agent=user_agent, + ) + + def record_login_succeeded( + self, + *, + actor_staff_account_id: int, + session_id: int, + email: str, + role: str, + ip_address: str | None, + user_agent: str | None, + ) -> AuditLog: + return self.record_event( + actor_staff_account_id=actor_staff_account_id, + event_type=AdminAuditEventType.STAFF_LOGIN_SUCCEEDED, + resource_type="staff_account", + resource_id=str(actor_staff_account_id), + outcome=AdminAuditOutcome.SUCCESS, + message="Login administrativo concluido.", + payload_json={"session_id": session_id, "email": email, "role": role}, + ip_address=ip_address, + user_agent=user_agent, + ) + + def record_login_failed( + self, + *, + email: str, + ip_address: str | None, + user_agent: str | None, + ) -> AuditLog: + return self.record_event( + actor_staff_account_id=None, + event_type=AdminAuditEventType.STAFF_LOGIN_FAILED, + resource_type="staff_account", + resource_id=email, + outcome=AdminAuditOutcome.FAILED, + message="Tentativa de login administrativo falhou.", + payload_json={"email": email}, + ip_address=ip_address, + user_agent=user_agent, + ) + + def record_logout_succeeded( + self, + *, + actor_staff_account_id: int, + session_id: int, + ip_address: str | None, + user_agent: str | None, + ) -> AuditLog: + return self.record_event( + actor_staff_account_id=actor_staff_account_id, + event_type=AdminAuditEventType.STAFF_LOGOUT_SUCCEEDED, + resource_type="staff_session", + resource_id=str(session_id), + outcome=AdminAuditOutcome.SUCCESS, + message="Sessao administrativa encerrada.", + payload_json={"session_id": session_id}, + ip_address=ip_address, + user_agent=user_agent, + ) + + def record_tool_approval( + self, + *, + actor_staff_account_id: int, + tool_name: str, + tool_version: int, + ip_address: str | None, + user_agent: str | None, + ) -> AuditLog: + return self.record_event( + actor_staff_account_id=actor_staff_account_id, + event_type=AdminAuditEventType.TOOL_APPROVAL_RECORDED, + resource_type="tool_publication", + resource_id=tool_name, + outcome=AdminAuditOutcome.SUCCESS, + message="Aprovacao de tool registrada.", + payload_json={"tool_name": tool_name, "tool_version": tool_version}, + ip_address=ip_address, + user_agent=user_agent, + ) + + def record_tool_publication( + self, + *, + actor_staff_account_id: int, + tool_name: str, + tool_version: int, + ip_address: str | None, + user_agent: str | None, + ) -> AuditLog: + return self.record_event( + actor_staff_account_id=actor_staff_account_id, + event_type=AdminAuditEventType.TOOL_PUBLICATION_RECORDED, + resource_type="tool_publication", + resource_id=tool_name, + outcome=AdminAuditOutcome.SUCCESS, + message="Publicacao de tool registrada.", + payload_json={"tool_name": tool_name, "tool_version": tool_version}, + ip_address=ip_address, + user_agent=user_agent, + ) + + def list_recent(self, limit: int = 50) -> list[AuditLog]: + return self.repository.list_recent(limit=limit) diff --git a/admin_app/services/auth_service.py b/admin_app/services/auth_service.py new file mode 100644 index 0000000..9451df9 --- /dev/null +++ b/admin_app/services/auth_service.py @@ -0,0 +1,210 @@ +from datetime import datetime, timezone + +from admin_app.core import ( + AdminAuthenticatedSession, + AdminSecurityService, + AuthenticatedStaffContext, + AuthenticatedStaffPrincipal, +) +from admin_app.db.models import StaffAccount, StaffSession +from admin_app.repositories import StaffAccountRepository, StaffSessionRepository +from admin_app.services.audit_service import AuditService + + +class AuthService: + def __init__( + self, + account_repository: StaffAccountRepository, + session_repository: StaffSessionRepository, + security_service: AdminSecurityService, + audit_service: AuditService, + ): + self.account_repository = account_repository + self.session_repository = session_repository + self.security_service = security_service + self.audit_service = audit_service + + def login( + self, + email: str, + password: str, + *, + ip_address: str | None, + user_agent: str | None, + ) -> AdminAuthenticatedSession | None: + normalized_email = self.normalize_email(email) + account = self.authenticate_account(email=normalized_email, password=password) + if account is None: + self.audit_service.record_login_failed( + email=normalized_email, + ip_address=ip_address, + user_agent=user_agent, + ) + return None + + session = self._create_authenticated_session( + account, + ip_address=ip_address, + user_agent=user_agent, + ) + self.audit_service.record_login_succeeded( + actor_staff_account_id=account.id, + session_id=session.session_id, + email=account.email, + role=account.role.value, + ip_address=ip_address, + user_agent=user_agent, + ) + return session + + def refresh_session( + self, + refresh_token: str, + *, + ip_address: str | None, + user_agent: str | None, + ) -> AdminAuthenticatedSession | None: + token_hash = self.security_service.hash_refresh_token(refresh_token) + staff_session = self.session_repository.get_by_refresh_token_hash(token_hash) + if not self._is_session_active(staff_session): + return None + + account = self.account_repository.get_by_id(staff_session.staff_account_id) + if account is None or not account.is_active: + return None + + return self._rotate_session( + staff_session, + account, + ip_address=ip_address, + user_agent=user_agent, + ) + + def logout( + self, + session_id: int, + *, + actor_staff_account_id: int | None, + ip_address: str | None, + user_agent: str | None, + ) -> bool: + staff_session = self.session_repository.get_by_id(session_id) + if staff_session is None: + return False + + if staff_session.revoked_at is None: + staff_session.revoked_at = datetime.now(timezone.utc) + self.session_repository.save(staff_session) + if actor_staff_account_id is not None: + self.audit_service.record_logout_succeeded( + actor_staff_account_id=actor_staff_account_id, + session_id=session_id, + ip_address=ip_address, + user_agent=user_agent, + ) + return True + + def get_authenticated_context(self, access_token: str) -> AuthenticatedStaffContext: + claims = self.security_service.decode_access_token(access_token) + staff_session = self.session_repository.get_by_id(claims.sid) + if not self._is_session_active(staff_session): + raise ValueError("Administrative session is not available") + + account = self.account_repository.get_by_id(int(claims.sub)) + if account is None or not account.is_active: + raise ValueError("Staff account is not available") + if self.normalize_email(account.email) != self.normalize_email(claims.email): + raise ValueError("Token principal mismatch") + + return AuthenticatedStaffContext( + principal=self.build_principal(account), + session_id=staff_session.id, + ) + + def authenticate_account(self, email: str, password: str) -> StaffAccount | None: + normalized_email = self.normalize_email(email) + account = self.account_repository.get_by_email(normalized_email) + if account is None or not account.is_active: + return None + if not self.security_service.verify_password(password, account.password_hash): + return None + + account.last_login_at = datetime.now(timezone.utc) + return self.account_repository.update_last_login(account) + + @staticmethod + def normalize_email(email: str) -> str: + return email.strip().lower() + + @staticmethod + def build_principal(account: StaffAccount) -> AuthenticatedStaffPrincipal: + return AuthenticatedStaffPrincipal( + id=account.id, + email=account.email, + display_name=account.display_name, + role=account.role, + is_active=account.is_active, + ) + + def _create_authenticated_session( + self, + account: StaffAccount, + *, + ip_address: str | None, + user_agent: str | None, + ) -> AdminAuthenticatedSession: + refresh_token = self.security_service.generate_refresh_token() + refresh_token_hash = self.security_service.hash_refresh_token(refresh_token) + expires_at = self.security_service.build_refresh_token_expiry() + staff_session = self.session_repository.create( + staff_account_id=account.id, + refresh_token_hash=refresh_token_hash, + expires_at=expires_at, + ip_address=ip_address, + user_agent=user_agent, + ) + principal = self.build_principal(account) + access_token = self.security_service.issue_access_token(principal, staff_session.id) + return AdminAuthenticatedSession( + session_id=staff_session.id, + access_token=access_token, + refresh_token=refresh_token, + token_type="bearer", + expires_in_seconds=self.security_service.settings.admin_auth_access_token_ttl_minutes * 60, + principal=principal, + ) + + def _rotate_session( + self, + staff_session: StaffSession, + account: StaffAccount, + *, + ip_address: str | None, + user_agent: str | None, + ) -> AdminAuthenticatedSession: + refresh_token = self.security_service.generate_refresh_token() + staff_session.refresh_token_hash = self.security_service.hash_refresh_token(refresh_token) + staff_session.expires_at = self.security_service.build_refresh_token_expiry() + staff_session.last_used_at = datetime.now(timezone.utc) + staff_session.ip_address = ip_address + staff_session.user_agent = user_agent + persisted_session = self.session_repository.save(staff_session) + + principal = self.build_principal(account) + access_token = self.security_service.issue_access_token(principal, persisted_session.id) + return AdminAuthenticatedSession( + session_id=persisted_session.id, + access_token=access_token, + refresh_token=refresh_token, + token_type="bearer", + expires_in_seconds=self.security_service.settings.admin_auth_access_token_ttl_minutes * 60, + principal=principal, + ) + + @staticmethod + def _is_session_active(staff_session: StaffSession | None) -> bool: + if staff_session is None: + return False + if staff_session.revoked_at is not None: + return False + return staff_session.expires_at >= datetime.now(timezone.utc) \ No newline at end of file diff --git a/docs/architecture/admin-credential-strategy.md b/docs/architecture/admin-credential-strategy.md new file mode 100644 index 0000000..4b9526e --- /dev/null +++ b/docs/architecture/admin-credential-strategy.md @@ -0,0 +1,164 @@ +# Estrategia De Credenciais Para StaffAccount + +Este documento define a estrategia inicial de credenciais para contas administrativas internas (`StaffAccount`). + +## Objetivo + +Permitir autenticacao web no `orquestrador-admin` sem misturar a identidade do painel com o `User` do atendimento. + +## O que e o StaffAccount + +`StaffAccount` e a conta administrativa interna do sistema. + +Ela representa um funcionario ou administrador da empresa e existe apenas no dominio administrativo. +Nao deve ser usada para identificar cliente do atendimento. + +Responsabilidades principais do `StaffAccount`: + +- autenticar acesso ao painel administrativo +- carregar papel de autorizacao (`viewer`, `staff`, `admin`) +- auditar quem executou uma acao interna +- governar drafts, configuracoes, relatorios e publicacao de tools + +## O que e a StaffSession + +`StaffSession` representa uma sessao administrativa ativa derivada de um login do `StaffAccount`. + +Cada sessao possui: + +- `session_id` +- `refresh_token_hash` +- data de expiracao +- marcador de revogacao +- metadados simples de IP e user-agent + +Isso permite tratar login, refresh e logout sem depender apenas de um access token solto. + +## Identificador de login + +O identificador principal de login sera: + +- `email` + +Regras: + +- deve ser unico no banco administrativo +- deve ser normalizado em lowercase na camada de servico e repositorio +- nao sera compartilhado com a identidade do atendimento + +## Segredo primario + +O segredo primario inicial sera: + +- senha gerenciada internamente + +A senha nunca deve ser armazenada em texto puro. + +## Estrategia de hash + +A estrategia inicial definida para `password_hash` e: + +- esquema: `pbkdf2_sha256` +- iteracoes padrao: `390000` +- salt aleatorio por conta +- pepper opcional via ambiente: `ADMIN_AUTH_PASSWORD_PEPPER` + +Formato de persistencia adotado: + +- `pbkdf2_sha256$$$` + +## Politica inicial de senha + +Politica minima definida: + +- tamanho minimo: `12` +- exigir letra maiuscula +- exigir letra minuscula +- exigir digito +- exigir simbolo + +A validacao dessa politica ja esta refletida no runtime administrativo. + +## Tokens e sessao + +Estrategia inicial para a autenticacao web: + +- access token curto: `30` minutos +- refresh token: `7` dias +- secret de assinatura dedicado: `ADMIN_AUTH_TOKEN_SECRET` +- issuer do admin runtime: `ADMIN_AUTH_TOKEN_ISSUER` +- tamanho configuravel do refresh token: `ADMIN_AUTH_REFRESH_TOKEN_BYTES` + +Nesta etapa, o runtime administrativo ja faz: + +- emissao de access token assinado por HMAC-SHA256 +- emissao de refresh token opaco +- armazenamento hash do refresh token em `StaffSession` +- rotacao do refresh token no endpoint de refresh +- revogacao da sessao no logout + +## Bootstrap do primeiro admin + +O primeiro admin deve nascer por fluxo controlado, nunca por startup automatico. + +Variaveis previstas: + +- `ADMIN_BOOTSTRAP_ENABLED` +- `ADMIN_BOOTSTRAP_EMAIL` +- `ADMIN_BOOTSTRAP_DISPLAY_NAME` +- `ADMIN_BOOTSTRAP_PASSWORD` +- `ADMIN_BOOTSTRAP_ROLE` + +Regras: + +- o bootstrap deve ser executado por comando explicito no futuro +- nao deve criar conta automaticamente ao subir o servico +- o papel padrao do bootstrap e `admin` + +## Relacao com papeis e permissoes + +A conta `StaffAccount` continua acoplada a hierarquia compartilhada: + +- `viewer` +- `staff` +- `admin` + +A senha autentica a identidade; o `role` governa autorizacao. + +## Configuracoes adicionadas ao admin runtime + +As configuracoes de credenciais passam a existir em `admin_app/core/settings.py`: + +- `admin_auth_password_hash_scheme` +- `admin_auth_password_hash_iterations` +- `admin_auth_password_min_length` +- `admin_auth_password_require_uppercase` +- `admin_auth_password_require_lowercase` +- `admin_auth_password_require_digit` +- `admin_auth_password_require_symbol` +- `admin_auth_password_pepper` +- `admin_auth_token_secret` +- `admin_auth_token_issuer` +- `admin_auth_access_token_ttl_minutes` +- `admin_auth_refresh_token_ttl_days` +- `admin_auth_refresh_token_bytes` +- `admin_bootstrap_enabled` +- `admin_bootstrap_email` +- `admin_bootstrap_display_name` +- `admin_bootstrap_password` +- `admin_bootstrap_role` + +## Endpoints iniciais de autenticacao + +Nesta etapa, o `orquestrador-admin` passa a expor: + +- `POST /auth/login` +- `POST /auth/refresh` +- `POST /auth/logout` +- `GET /auth/me` + +## Proximos passos naturais + +- implementar autorizacao por papel nas demais rotas administrativas +- implementar fluxo explicito de bootstrap do primeiro admin +- implementar gestao de sessoes revogaveis por tela administrativa diff --git a/docs/architecture/monorepo-target-structure.md b/docs/architecture/monorepo-target-structure.md index 48ab962..4c3f33b 100644 --- a/docs/architecture/monorepo-target-structure.md +++ b/docs/architecture/monorepo-target-structure.md @@ -64,10 +64,10 @@ admin_app/ # reports.py core/ settings.py - # security.py + security.py db/ models/ - # staff_account.py + staff_account.py # tool_draft.py # tool_generation_job.py # tool_publication.py @@ -145,3 +145,5 @@ Quando o admin ganhar runtime real, o deploy vai evoluir para dois servicos dist Sem mover o runtime atual de `app/` nesta etapa. + + diff --git a/docs/architecture/shared-contracts-and-access-hierarchy.md b/docs/architecture/shared-contracts-and-access-hierarchy.md index ac4c94c..fd9207e 100644 --- a/docs/architecture/shared-contracts-and-access-hierarchy.md +++ b/docs/architecture/shared-contracts-and-access-hierarchy.md @@ -92,3 +92,4 @@ Ele cobre: - criar a entidade `StaffAccount` - plugar a role do usuario interno ao contrato compartilhado - modelar a persistencia de drafts/publicacoes de tool + diff --git a/shared/contracts/access_control.py b/shared/contracts/access_control.py index 136dcbd..2a5baed 100644 --- a/shared/contracts/access_control.py +++ b/shared/contracts/access_control.py @@ -81,8 +81,6 @@ def role_includes(role: StaffRole | str, minimum_role: StaffRole | str) -> bool: return _ROLE_HIERARCHY[normalized_role] >= _ROLE_HIERARCHY[normalized_minimum] -def role_has_permission( - role: StaffRole | str, permission: AdminPermission | str -) -> bool: +def role_has_permission(role: StaffRole | str, permission: AdminPermission | str) -> bool: normalized_permission = normalize_admin_permission(permission) return normalized_permission in permissions_for_role(role) diff --git a/tests/test_admin_app_bootstrap.py b/tests/test_admin_app_bootstrap.py index 8df6f43..9bf6b0d 100644 --- a/tests/test_admin_app_bootstrap.py +++ b/tests/test_admin_app_bootstrap.py @@ -2,8 +2,10 @@ import unittest from fastapi.testclient import TestClient +from admin_app.api.dependencies import get_current_staff_principal from admin_app.app_factory import create_app from admin_app.core.settings import AdminSettings +from shared.contracts import StaffRole class AdminAppBootstrapTests(unittest.TestCase): @@ -45,6 +47,17 @@ class AdminAppBootstrapTests(unittest.TestCase): admin_debug=True, ) app = create_app(settings) + app.dependency_overrides[get_current_staff_principal] = lambda: type( + "Principal", + (), + { + "id": 1, + "email": "viewer@empresa.com", + "display_name": "Viewer", + "role": StaffRole.VIEWER, + "is_active": True, + }, + )() client = TestClient(app) response = client.get("/admin/system/info") diff --git a/tests/test_admin_audit_log_model.py b/tests/test_admin_audit_log_model.py new file mode 100644 index 0000000..ebfbd1e --- /dev/null +++ b/tests/test_admin_audit_log_model.py @@ -0,0 +1,26 @@ +import unittest + +from admin_app.db.models import AuditLog + + +class AuditLogModelTests(unittest.TestCase): + def test_audit_log_declares_expected_table_and_columns(self): + self.assertEqual(AuditLog.__tablename__, "admin_audit_logs") + self.assertIn("actor_staff_account_id", AuditLog.__table__.columns) + self.assertIn("event_type", AuditLog.__table__.columns) + self.assertIn("resource_type", AuditLog.__table__.columns) + self.assertIn("resource_id", AuditLog.__table__.columns) + self.assertIn("outcome", AuditLog.__table__.columns) + self.assertIn("message", AuditLog.__table__.columns) + self.assertIn("payload_json", AuditLog.__table__.columns) + self.assertIn("ip_address", AuditLog.__table__.columns) + self.assertIn("user_agent", AuditLog.__table__.columns) + + def test_audit_log_uses_staff_account_foreign_key(self): + foreign_keys = list(AuditLog.__table__.columns["actor_staff_account_id"].foreign_keys) + self.assertEqual(len(foreign_keys), 1) + self.assertEqual(str(foreign_keys[0].target_fullname), "staff_accounts.id") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_admin_audit_service.py b/tests/test_admin_audit_service.py new file mode 100644 index 0000000..bee971b --- /dev/null +++ b/tests/test_admin_audit_service.py @@ -0,0 +1,76 @@ +import unittest + +from admin_app.db.models import AuditLog +from admin_app.services import AuditService + + +class _FakeAuditLogRepository: + def __init__(self): + self.entries: list[AuditLog] = [] + self._next_id = 1 + + def create(self, **kwargs) -> AuditLog: + audit_log = AuditLog(id=self._next_id, **kwargs) + self.entries.append(audit_log) + self._next_id += 1 + return audit_log + + def list_recent(self, limit: int = 50) -> list[AuditLog]: + return list(reversed(self.entries))[:limit] + + +class AdminAuditServiceTests(unittest.TestCase): + def setUp(self): + self.repository = _FakeAuditLogRepository() + self.service = AuditService(self.repository) + + def test_record_tool_approval_keeps_actor_and_version_metadata(self): + audit_entry = self.service.record_tool_approval( + actor_staff_account_id=7, + tool_name="consultar_clientes_vip", + tool_version=3, + ip_address="127.0.0.1", + user_agent="pytest", + ) + + self.assertEqual(audit_entry.event_type, "tool.approval.recorded") + self.assertEqual(audit_entry.resource_id, "consultar_clientes_vip") + self.assertEqual(audit_entry.payload_json["tool_version"], 3) + self.assertEqual(audit_entry.actor_staff_account_id, 7) + + def test_record_tool_publication_keeps_actor_and_version_metadata(self): + audit_entry = self.service.record_tool_publication( + actor_staff_account_id=9, + tool_name="emitir_relatorio_receita", + tool_version=5, + ip_address="127.0.0.1", + user_agent="pytest", + ) + + self.assertEqual(audit_entry.event_type, "tool.publication.recorded") + self.assertEqual(audit_entry.resource_id, "emitir_relatorio_receita") + self.assertEqual(audit_entry.payload_json["tool_version"], 5) + self.assertEqual(audit_entry.actor_staff_account_id, 9) + + def test_list_recent_returns_newest_first(self): + self.service.record_login_failed( + email="viewer@empresa.com", + ip_address="127.0.0.1", + user_agent="pytest", + ) + self.service.record_tool_publication( + actor_staff_account_id=1, + tool_name="publicar_x", + tool_version=1, + ip_address="127.0.0.1", + user_agent="pytest", + ) + + recent = self.service.list_recent(limit=2) + + self.assertEqual(recent[0].event_type, "tool.publication.recorded") + self.assertEqual(recent[1].event_type, "staff.login.failed") + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_admin_auth_service.py b/tests/test_admin_auth_service.py new file mode 100644 index 0000000..604b0f8 --- /dev/null +++ b/tests/test_admin_auth_service.py @@ -0,0 +1,206 @@ +import unittest +from datetime import datetime, timedelta, timezone + +from admin_app.core import AdminSecurityService, AdminSettings +from admin_app.db.models import AuditLog, StaffAccount, StaffSession +from admin_app.services import AuditService +from admin_app.services.auth_service import AuthService +from shared.contracts import StaffRole + + +class _FakeStaffAccountRepository: + def __init__(self, account: StaffAccount | None): + self.account = account + + def get_by_email(self, email: str) -> StaffAccount | None: + if self.account and self.account.email == email: + return self.account + return None + + def get_by_id(self, staff_account_id: int) -> StaffAccount | None: + if self.account and self.account.id == staff_account_id: + return self.account + return None + + def update_last_login(self, staff_account: StaffAccount) -> StaffAccount: + self.account = staff_account + return staff_account + + +class _FakeStaffSessionRepository: + def __init__(self): + self.sessions: dict[int, StaffSession] = {} + self._next_id = 1 + + def create(self, *, staff_account_id: int, refresh_token_hash: str, expires_at: datetime, ip_address: str | None, user_agent: str | None) -> StaffSession: + session = StaffSession( + id=self._next_id, + staff_account_id=staff_account_id, + refresh_token_hash=refresh_token_hash, + expires_at=expires_at, + ip_address=ip_address, + user_agent=user_agent, + ) + self.sessions[session.id] = session + self._next_id += 1 + return session + + def get_by_id(self, session_id: int) -> StaffSession | None: + return self.sessions.get(session_id) + + def get_by_refresh_token_hash(self, refresh_token_hash: str) -> StaffSession | None: + for session in self.sessions.values(): + if session.refresh_token_hash == refresh_token_hash: + return session + return None + + def save(self, staff_session: StaffSession) -> StaffSession: + self.sessions[staff_session.id] = staff_session + return staff_session + + +class _FakeAuditLogRepository: + def __init__(self): + self.entries: list[AuditLog] = [] + self._next_id = 1 + + def create(self, **kwargs) -> AuditLog: + audit_log = AuditLog(id=self._next_id, **kwargs) + self.entries.append(audit_log) + self._next_id += 1 + return audit_log + + def list_recent(self, limit: int = 50) -> list[AuditLog]: + return list(reversed(self.entries))[:limit] + + +class AdminAuthServiceTests(unittest.TestCase): + def setUp(self): + self.security_service = AdminSecurityService( + AdminSettings( + admin_auth_token_secret="test-secret", + admin_auth_password_pepper="pepper", + ) + ) + self.account = StaffAccount( + id=1, + email="admin@empresa.com", + display_name="Administrador", + password_hash=self.security_service.hash_password("SenhaMuitoSegura!123"), + role=StaffRole.ADMIN, + is_active=True, + ) + self.account_repository = _FakeStaffAccountRepository(self.account) + self.session_repository = _FakeStaffSessionRepository() + self.audit_repository = _FakeAuditLogRepository() + self.audit_service = AuditService(self.audit_repository) + self.auth_service = AuthService( + account_repository=self.account_repository, + session_repository=self.session_repository, + security_service=self.security_service, + audit_service=self.audit_service, + ) + + def test_login_creates_authenticated_session_with_refresh_token(self): + session = self.auth_service.login( + email="admin@empresa.com", + password="SenhaMuitoSegura!123", + ip_address="127.0.0.1", + user_agent="unittest", + ) + + self.assertIsNotNone(session) + self.assertEqual(session.session_id, 1) + self.assertTrue(session.access_token) + self.assertTrue(session.refresh_token) + self.assertEqual(session.principal.role, StaffRole.ADMIN) + self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.login.succeeded") + + def test_login_failure_creates_audit_entry(self): + session = self.auth_service.login( + email="admin@empresa.com", + password="senha-incorreta", + ip_address="127.0.0.1", + user_agent="unittest", + ) + + self.assertIsNone(session) + self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.login.failed") + self.assertEqual(self.audit_repository.entries[-1].outcome, "failed") + + def test_refresh_session_rotates_refresh_token(self): + session = self.auth_service.login( + email="admin@empresa.com", + password="SenhaMuitoSegura!123", + ip_address="127.0.0.1", + user_agent="unittest", + ) + + refreshed = self.auth_service.refresh_session( + refresh_token=session.refresh_token, + ip_address="127.0.0.1", + user_agent="unittest-refresh", + ) + + self.assertIsNotNone(refreshed) + self.assertEqual(refreshed.session_id, session.session_id) + self.assertNotEqual(refreshed.refresh_token, session.refresh_token) + + def test_logout_revokes_session_and_creates_audit_entry(self): + session = self.auth_service.login( + email="admin@empresa.com", + password="SenhaMuitoSegura!123", + ip_address="127.0.0.1", + user_agent="unittest", + ) + + self.assertTrue( + self.auth_service.logout( + session.session_id, + actor_staff_account_id=self.account.id, + ip_address="127.0.0.1", + user_agent="unittest", + ) + ) + self.assertIsNotNone(self.session_repository.get_by_id(session.session_id).revoked_at) + self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.logout.succeeded") + + def test_get_authenticated_context_rejects_revoked_session(self): + session = self.auth_service.login( + email="admin@empresa.com", + password="SenhaMuitoSegura!123", + ip_address="127.0.0.1", + user_agent="unittest", + ) + self.auth_service.logout( + session.session_id, + actor_staff_account_id=self.account.id, + ip_address="127.0.0.1", + user_agent="unittest", + ) + + with self.assertRaises(ValueError): + self.auth_service.get_authenticated_context(session.access_token) + + def test_refresh_session_rejects_expired_session(self): + session = self.auth_service.login( + email="admin@empresa.com", + password="SenhaMuitoSegura!123", + ip_address="127.0.0.1", + user_agent="unittest", + ) + stored_session = self.session_repository.get_by_id(session.session_id) + stored_session.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1) + self.session_repository.save(stored_session) + + refreshed = self.auth_service.refresh_session( + refresh_token=session.refresh_token, + ip_address="127.0.0.1", + user_agent="unittest-refresh", + ) + + self.assertIsNone(refreshed) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_admin_auth_web.py b/tests/test_admin_auth_web.py new file mode 100644 index 0000000..63d7bb8 --- /dev/null +++ b/tests/test_admin_auth_web.py @@ -0,0 +1,150 @@ +import unittest + +from fastapi.testclient import TestClient + +from admin_app.api.dependencies import get_auth_service, get_current_staff_context, get_current_staff_principal +from admin_app.app_factory import create_app +from admin_app.core import ( + AdminAuthenticatedSession, + AdminSettings, + AuthenticatedStaffContext, + AuthenticatedStaffPrincipal, +) +from shared.contracts import StaffRole + + +class _FakeAuthService: + def login(self, email: str, password: str, *, ip_address: str | None, user_agent: str | None): + if email == "admin@empresa.com" and password == "SenhaMuitoSegura!123": + principal = AuthenticatedStaffPrincipal( + id=1, + email="admin@empresa.com", + display_name="Administrador", + role=StaffRole.ADMIN, + is_active=True, + ) + return AdminAuthenticatedSession( + session_id=77, + access_token="token-abc", + refresh_token="refresh-abc", + token_type="bearer", + expires_in_seconds=1800, + principal=principal, + ) + return None + + def refresh_session(self, refresh_token: str, *, ip_address: str | None, user_agent: str | None): + if refresh_token == "refresh-abc": + principal = AuthenticatedStaffPrincipal( + id=1, + email="admin@empresa.com", + display_name="Administrador", + role=StaffRole.ADMIN, + is_active=True, + ) + return AdminAuthenticatedSession( + session_id=77, + access_token="token-new", + refresh_token="refresh-new", + token_type="bearer", + expires_in_seconds=1800, + principal=principal, + ) + return None + + def logout( + self, + session_id: int, + *, + actor_staff_account_id: int | None, + ip_address: str | None, + user_agent: str | None, + ) -> bool: + return session_id == 77 and actor_staff_account_id == 1 + + +class AdminAuthWebTests(unittest.TestCase): + def setUp(self): + app = create_app(AdminSettings(admin_auth_token_secret="test-secret")) + app.dependency_overrides[get_auth_service] = lambda: _FakeAuthService() + app.dependency_overrides[get_current_staff_principal] = lambda: AuthenticatedStaffPrincipal( + id=1, + email="admin@empresa.com", + display_name="Administrador", + role=StaffRole.ADMIN, + is_active=True, + ) + app.dependency_overrides[get_current_staff_context] = lambda: AuthenticatedStaffContext( + principal=AuthenticatedStaffPrincipal( + id=1, + email="admin@empresa.com", + display_name="Administrador", + role=StaffRole.ADMIN, + is_active=True, + ), + session_id=77, + ) + self.client = TestClient(app) + self.app = app + + def tearDown(self): + self.app.dependency_overrides.clear() + + def test_login_returns_tokens_and_staff_account(self): + response = self.client.post( + "/auth/login", + json={"email": "admin@empresa.com", "password": "SenhaMuitoSegura!123"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["session_id"], 77) + self.assertEqual(response.json()["token_type"], "bearer") + self.assertEqual(response.json()["refresh_token"], "refresh-abc") + self.assertEqual(response.json()["staff_account"]["role"], "admin") + + def test_refresh_returns_rotated_tokens(self): + response = self.client.post( + "/auth/refresh", + json={"refresh_token": "refresh-abc"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["access_token"], "token-new") + self.assertEqual(response.json()["refresh_token"], "refresh-new") + + def test_refresh_rejects_invalid_token(self): + response = self.client.post( + "/auth/refresh", + json={"refresh_token": "refresh-invalido"}, + ) + + self.assertEqual(response.status_code, 401) + self.assertEqual(response.json()["detail"], "Refresh token administrativo invalido.") + + def test_logout_revokes_current_session(self): + response = self.client.post( + "/auth/logout", + headers={"Authorization": "Bearer token-abc", "User-Agent": "pytest"}, + ) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["session_id"], 77) + self.assertEqual(response.json()["status"], "ok") + + def test_me_returns_authenticated_staff_account(self): + response = self.client.get("/auth/me", headers={"Authorization": "Bearer token-abc"}) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["email"], "admin@empresa.com") + self.assertEqual(response.json()["role"], "admin") + + def test_system_access_returns_permissions_for_authenticated_staff(self): + response = self.client.get("/system/access", headers={"Authorization": "Bearer token-abc"}) + + self.assertEqual(response.status_code, 200) + self.assertIn("manage_settings", response.json()["permissions"]) + self.assertEqual(response.json()["staff_account"]["role"], "admin") + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_admin_authorization_web.py b/tests/test_admin_authorization_web.py new file mode 100644 index 0000000..2af4dc7 --- /dev/null +++ b/tests/test_admin_authorization_web.py @@ -0,0 +1,100 @@ +import unittest +from datetime import datetime, timezone + +from fastapi.testclient import TestClient + +from admin_app.api.dependencies import get_audit_service, get_current_staff_principal +from admin_app.app_factory import create_app +from admin_app.core import AuthenticatedStaffPrincipal, AdminSettings +from admin_app.db.models import AuditLog +from shared.contracts import StaffRole + + +class _FakeAuditService: + def list_recent(self, limit: int = 50) -> list[AuditLog]: + return [ + AuditLog( + id=1, + actor_staff_account_id=10, + event_type="staff.login.succeeded", + resource_type="staff_account", + resource_id="10", + outcome="success", + message="Login administrativo concluido.", + payload_json={"session_id": 77}, + ip_address="127.0.0.1", + user_agent="pytest", + created_at=datetime(2026, 3, 26, 12, 0, tzinfo=timezone.utc), + ) + ] + + +class AdminAuthorizationWebTests(unittest.TestCase): + def _build_client_with_role(self, role: StaffRole) -> tuple[TestClient, object]: + app = create_app(AdminSettings(admin_auth_token_secret="test-secret")) + app.dependency_overrides[get_current_staff_principal] = lambda: AuthenticatedStaffPrincipal( + id=10, + email="staff@empresa.com", + display_name="Equipe Interna", + role=role, + is_active=True, + ) + app.dependency_overrides[get_audit_service] = lambda: _FakeAuditService() + return TestClient(app), app + + def test_system_info_requires_authentication(self): + app = create_app(AdminSettings(admin_auth_token_secret="test-secret")) + client = TestClient(app) + + response = client.get("/system/info") + + self.assertEqual(response.status_code, 401) + self.assertEqual(response.json()["detail"], "Autenticacao administrativa obrigatoria.") + + def test_viewer_can_access_system_info(self): + client, app = self._build_client_with_role(StaffRole.VIEWER) + try: + response = client.get("/system/info", headers={"Authorization": "Bearer token"}) + finally: + app.dependency_overrides.clear() + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["service"], "orquestrador-admin") + + def test_viewer_can_access_audit_events(self): + client, app = self._build_client_with_role(StaffRole.VIEWER) + try: + response = client.get("/audit/events", headers={"Authorization": "Bearer token"}) + finally: + app.dependency_overrides.clear() + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["events"][0]["event_type"], "staff.login.succeeded") + + def test_staff_cannot_access_admin_only_capability(self): + client, app = self._build_client_with_role(StaffRole.STAFF) + try: + response = client.get("/system/admin-capabilities", headers={"Authorization": "Bearer token"}) + finally: + app.dependency_overrides.clear() + + self.assertEqual(response.status_code, 403) + self.assertEqual( + response.json()["detail"], + "Permissao administrativa insuficiente: 'manage_settings'.", + ) + + def test_admin_can_access_admin_only_capability(self): + client, app = self._build_client_with_role(StaffRole.ADMIN) + try: + response = client.get("/system/admin-capabilities", headers={"Authorization": "Bearer token"}) + finally: + app.dependency_overrides.clear() + + self.assertEqual(response.status_code, 200) + self.assertTrue(response.json()["allowed"]) + self.assertEqual(response.json()["role"], "admin") + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_admin_credential_strategy.py b/tests/test_admin_credential_strategy.py new file mode 100644 index 0000000..ed2e9d5 --- /dev/null +++ b/tests/test_admin_credential_strategy.py @@ -0,0 +1,62 @@ +import unittest + +from pydantic import ValidationError + +from admin_app.core.security import AdminSecurityService +from admin_app.core.settings import AdminSettings + + +class AdminCredentialStrategyTests(unittest.TestCase): + def test_admin_settings_expose_secure_defaults_for_credentials(self): + settings = AdminSettings() + + self.assertEqual(settings.admin_auth_password_hash_scheme, "pbkdf2_sha256") + self.assertEqual(settings.admin_auth_password_hash_iterations, 390000) + self.assertEqual(settings.admin_auth_password_min_length, 12) + self.assertEqual(settings.admin_auth_access_token_ttl_minutes, 30) + self.assertEqual(settings.admin_auth_refresh_token_ttl_days, 7) + self.assertFalse(settings.admin_bootstrap_enabled) + self.assertEqual(settings.admin_bootstrap_role, "admin") + + def test_admin_settings_reject_insecure_password_policy(self): + with self.assertRaises(ValidationError): + AdminSettings(admin_auth_password_min_length=8) + + with self.assertRaises(ValidationError): + AdminSettings(admin_auth_password_hash_iterations=50000) + + def test_admin_settings_normalize_optional_bootstrap_values(self): + settings = AdminSettings( + admin_bootstrap_email=" ", + admin_bootstrap_display_name=" ", + admin_bootstrap_password=" ", + admin_auth_password_pepper=" ", + ) + + self.assertIsNone(settings.admin_bootstrap_email) + self.assertIsNone(settings.admin_bootstrap_display_name) + self.assertIsNone(settings.admin_bootstrap_password) + self.assertIsNone(settings.admin_auth_password_pepper) + + def test_admin_security_service_builds_runtime_credential_strategy(self): + settings = AdminSettings( + admin_auth_password_pepper="secret-pepper", + admin_bootstrap_enabled=True, + admin_bootstrap_email="admin@empresa.com", + admin_bootstrap_display_name="Admin Inicial", + admin_bootstrap_password="SenhaMuitoSegura!123", + ) + + strategy = AdminSecurityService(settings).build_credential_strategy() + + self.assertEqual(strategy.password.hash_scheme, "pbkdf2_sha256") + self.assertTrue(strategy.password.pepper_configured) + self.assertEqual(strategy.tokens.access_token_ttl_minutes, 30) + self.assertTrue(strategy.bootstrap.enabled) + self.assertEqual(strategy.bootstrap.email, "admin@empresa.com") + self.assertTrue(strategy.bootstrap.password_configured) + self.assertEqual(strategy.bootstrap.role, "admin") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_admin_identity_isolation.py b/tests/test_admin_identity_isolation.py new file mode 100644 index 0000000..b8367db --- /dev/null +++ b/tests/test_admin_identity_isolation.py @@ -0,0 +1,63 @@ +import ast +import unittest +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +ADMIN_ROOT = REPO_ROOT / "admin_app" +PRODUCT_ROOT = REPO_ROOT / "app" + +FORBIDDEN_ADMIN_IMPORT_PREFIXES = ( + "app.repositories.user_repository", + "app.services.user", +) +FORBIDDEN_ADMIN_MODULE_IMPORTS = { + ("app.db.mock_models", "User"), +} + + +def _iter_python_files(root: Path): + for path in root.rglob("*.py"): + if "__pycache__" in path.parts: + continue + yield path + + +def _iter_imports(path: Path): + source = path.read_text(encoding="utf-8-sig") + tree = ast.parse(source, filename=str(path)) + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + yield ("import", alias.name, None) + elif isinstance(node, ast.ImportFrom): + module = node.module or "" + for alias in node.names: + yield ("from", module, alias.name) + + +class AdminIdentityIsolationTests(unittest.TestCase): + def test_admin_app_does_not_depend_on_product_user_identity_modules(self): + violations: list[str] = [] + for path in _iter_python_files(ADMIN_ROOT): + for import_kind, module, imported_name in _iter_imports(path): + if import_kind == "import" and module.startswith(FORBIDDEN_ADMIN_IMPORT_PREFIXES): + violations.append(f"{path.relative_to(REPO_ROOT)} imports {module}") + if import_kind == "from" and module.startswith(FORBIDDEN_ADMIN_IMPORT_PREFIXES): + violations.append(f"{path.relative_to(REPO_ROOT)} imports from {module}") + if import_kind == "from" and (module, imported_name) in FORBIDDEN_ADMIN_MODULE_IMPORTS: + violations.append(f"{path.relative_to(REPO_ROOT)} imports {imported_name} from {module}") + + self.assertEqual(violations, [], "\n".join(violations)) + + def test_product_runtime_does_not_depend_on_admin_app_modules(self): + violations: list[str] = [] + for path in _iter_python_files(PRODUCT_ROOT): + for _, module, _ in _iter_imports(path): + if module.startswith("admin_app"): + violations.append(f"{path.relative_to(REPO_ROOT)} imports {module}") + + self.assertEqual(violations, [], "\n".join(violations)) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_admin_security_service.py b/tests/test_admin_security_service.py new file mode 100644 index 0000000..75215eb --- /dev/null +++ b/tests/test_admin_security_service.py @@ -0,0 +1,56 @@ +import unittest +from datetime import datetime, timedelta, timezone + +from admin_app.core import AdminSecurityService, AdminSettings, AuthenticatedStaffPrincipal +from shared.contracts import StaffRole + + +class AdminSecurityServiceTests(unittest.TestCase): + def setUp(self): + self.settings = AdminSettings( + admin_auth_token_secret="test-secret", + admin_auth_password_pepper="pepper", + ) + self.security_service = AdminSecurityService(self.settings) + + def test_hash_password_and_verify_round_trip(self): + password_hash = self.security_service.hash_password("SenhaMuitoSegura!123") + self.assertTrue(self.security_service.verify_password("SenhaMuitoSegura!123", password_hash)) + self.assertFalse(self.security_service.verify_password("senha-errada", password_hash)) + + def test_validate_password_strength_rejects_weak_password(self): + with self.assertRaises(ValueError): + self.security_service.validate_password_strength("fraca") + + def test_issue_and_decode_access_token_round_trip(self): + principal = AuthenticatedStaffPrincipal( + id=7, + email="admin@empresa.com", + display_name="Admin", + role=StaffRole.ADMIN, + is_active=True, + ) + token = self.security_service.issue_access_token(principal, session_id=99) + claims = self.security_service.decode_access_token(token) + + self.assertEqual(claims.sub, "7") + self.assertEqual(claims.sid, 99) + self.assertEqual(claims.email, "admin@empresa.com") + self.assertEqual(claims.role, StaffRole.ADMIN) + self.assertEqual(claims.token_type, "access") + + def test_refresh_token_hash_is_stable_for_same_token(self): + refresh_token = self.security_service.generate_refresh_token() + self.assertEqual( + self.security_service.hash_refresh_token(refresh_token), + self.security_service.hash_refresh_token(refresh_token), + ) + + def test_build_refresh_token_expiry_uses_refresh_ttl(self): + expires_at = self.security_service.build_refresh_token_expiry() + min_expected = datetime.now(timezone.utc) + timedelta(days=6) + self.assertGreater(expires_at, min_expected) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_staff_account_model.py b/tests/test_staff_account_model.py new file mode 100644 index 0000000..2d76c26 --- /dev/null +++ b/tests/test_staff_account_model.py @@ -0,0 +1,49 @@ +import unittest + +from admin_app.db.models import StaffAccount +from shared.contracts import StaffRole + + +class StaffAccountModelTests(unittest.TestCase): + def test_staff_account_declares_expected_table_and_columns(self): + self.assertEqual(StaffAccount.__tablename__, "staff_accounts") + self.assertIn("email", StaffAccount.__table__.columns) + self.assertIn("display_name", StaffAccount.__table__.columns) + self.assertIn("password_hash", StaffAccount.__table__.columns) + self.assertIn("role", StaffAccount.__table__.columns) + self.assertIn("is_active", StaffAccount.__table__.columns) + self.assertIn("last_login_at", StaffAccount.__table__.columns) + self.assertIn("created_at", StaffAccount.__table__.columns) + self.assertIn("updated_at", StaffAccount.__table__.columns) + + def test_staff_account_role_column_uses_shared_hierarchy_values(self): + role_column = StaffAccount.__table__.columns["role"] + enum_values = tuple(role_column.type.enums) + + self.assertEqual( + enum_values, + ( + StaffRole.VIEWER.value, + StaffRole.STAFF.value, + StaffRole.ADMIN.value, + ), + ) + + def test_staff_account_can_hold_admin_identity_fields(self): + account = StaffAccount( + email="admin@empresa.com", + display_name="Administrador Interno", + password_hash="hashed-secret", + role=StaffRole.ADMIN, + is_active=True, + ) + + self.assertEqual(account.email, "admin@empresa.com") + self.assertEqual(account.display_name, "Administrador Interno") + self.assertEqual(account.password_hash, "hashed-secret") + self.assertEqual(account.role, StaffRole.ADMIN) + self.assertTrue(account.is_active) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_staff_session_model.py b/tests/test_staff_session_model.py new file mode 100644 index 0000000..9bcc63a --- /dev/null +++ b/tests/test_staff_session_model.py @@ -0,0 +1,24 @@ +import unittest + +from admin_app.db.models import StaffSession + + +class StaffSessionModelTests(unittest.TestCase): + def test_staff_session_declares_expected_table_and_columns(self): + self.assertEqual(StaffSession.__tablename__, "staff_sessions") + self.assertIn("staff_account_id", StaffSession.__table__.columns) + self.assertIn("refresh_token_hash", StaffSession.__table__.columns) + self.assertIn("expires_at", StaffSession.__table__.columns) + self.assertIn("last_used_at", StaffSession.__table__.columns) + self.assertIn("revoked_at", StaffSession.__table__.columns) + self.assertIn("ip_address", StaffSession.__table__.columns) + self.assertIn("user_agent", StaffSession.__table__.columns) + + def test_staff_session_uses_staff_account_foreign_key(self): + foreign_keys = list(StaffSession.__table__.columns["staff_account_id"].foreign_keys) + self.assertEqual(len(foreign_keys), 1) + self.assertEqual(str(foreign_keys[0].target_fullname), "staff_accounts.id") + + +if __name__ == "__main__": + unittest.main()