🔐 feat(admin): implementar identidade e seguranca administrativa

feat/self-evolving-tools-foundation
parent 1541948e76
commit 82a12ff464

@ -199,6 +199,7 @@ Nesta fase, o produto continua rodando a partir de `app/` sem migracao de import
A estrutura alvo detalhada esta em [docs/architecture/monorepo-target-structure.md](docs/architecture/monorepo-target-structure.md).
A hierarquia inicial de acesso e os primeiros contratos entre servicos estao em [docs/architecture/shared-contracts-and-access-hierarchy.md](docs/architecture/shared-contracts-and-access-hierarchy.md).
A estrategia de deploy independente entre os dois runtimes esta em [docs/architecture/independent-deploy-strategy.md](docs/architecture/independent-deploy-strategy.md).
A estrategia de credenciais das contas administrativas esta em [docs/architecture/admin-credential-strategy.md](docs/architecture/admin-credential-strategy.md).
## Tools Disponiveis
@ -457,3 +458,6 @@ Os proximos ganhos mais valiosos para o projeto sao:
- criar uma camada de avaliacao semantica e replay de conversas
- integrar o orquestrador com sistemas reais de operacao

@ -1,6 +1,20 @@
from fastapi import Request
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from admin_app.core.settings import AdminSettings, get_admin_settings
from admin_app.core import (
AdminSecurityService,
AdminSettings,
AuthenticatedStaffContext,
AuthenticatedStaffPrincipal,
get_admin_settings,
)
from admin_app.db.database import get_admin_db_session
from admin_app.repositories import AuditLogRepository, StaffAccountRepository, StaffSessionRepository
from admin_app.services import AuditService, AuthService
from shared.contracts import AdminPermission, StaffRole, permissions_for_role, role_has_permission, role_includes
bearer_scheme = HTTPBearer(auto_error=False)
def get_settings(request: Request) -> AdminSettings:
@ -8,3 +22,110 @@ def get_settings(request: Request) -> AdminSettings:
if isinstance(app_settings, AdminSettings):
return app_settings
return get_admin_settings()
def get_admin_db(db: Session = Depends(get_admin_db_session)) -> Session:
return db
def get_security_service(settings: AdminSettings = Depends(get_settings)) -> AdminSecurityService:
return AdminSecurityService(settings)
def get_staff_account_repository(db: Session = Depends(get_admin_db)) -> StaffAccountRepository:
return StaffAccountRepository(db)
def get_staff_session_repository(db: Session = Depends(get_admin_db)) -> StaffSessionRepository:
return StaffSessionRepository(db)
def get_audit_log_repository(db: Session = Depends(get_admin_db)) -> AuditLogRepository:
return AuditLogRepository(db)
def get_audit_service(
repository: AuditLogRepository = Depends(get_audit_log_repository),
) -> AuditService:
return AuditService(repository)
def get_auth_service(
account_repository: StaffAccountRepository = Depends(get_staff_account_repository),
session_repository: StaffSessionRepository = Depends(get_staff_session_repository),
security_service: AdminSecurityService = Depends(get_security_service),
audit_service: AuditService = Depends(get_audit_service),
) -> AuthService:
return AuthService(
account_repository=account_repository,
session_repository=session_repository,
security_service=security_service,
audit_service=audit_service,
)
def get_current_staff_context(
credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext:
if credentials is None or credentials.scheme.lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Autenticacao administrativa obrigatoria.",
headers={"WWW-Authenticate": "Bearer"},
)
try:
return auth_service.get_authenticated_context(credentials.credentials)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token administrativo invalido.",
headers={"WWW-Authenticate": "Bearer"},
) from exc
def get_current_staff_principal(
context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> AuthenticatedStaffPrincipal:
return context.principal
def get_current_staff_session_id(
context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> int:
return context.session_id
def require_staff_role(minimum_role: StaffRole):
def dependency(
current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> AuthenticatedStaffPrincipal:
if not role_includes(current_staff.role, minimum_role):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Acesso administrativo requer papel minimo '{minimum_role.value}'.",
)
return current_staff
return dependency
def require_admin_permission(permission: AdminPermission):
def dependency(
current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> AuthenticatedStaffPrincipal:
if not role_has_permission(current_staff.role, permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permissao administrativa insuficiente: '{permission.value}'.",
)
return current_staff
return dependency
def get_current_staff_permissions(
current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> tuple[str, ...]:
return tuple(permission.value for permission in permissions_for_role(current_staff.role))

@ -1,6 +1,11 @@
from fastapi import APIRouter
from admin_app.api.routes.audit import router as audit_router
from admin_app.api.routes.auth import router as auth_router
from admin_app.api.routes.system import router as system_router
# Agrega as rotas do servico administrativo.
api_router = APIRouter()
api_router.include_router(auth_router)
api_router.include_router(system_router)
api_router.include_router(audit_router)

@ -0,0 +1,38 @@
from fastapi import APIRouter, Depends
from admin_app.api.dependencies import get_audit_service, require_admin_permission
from admin_app.api.schemas import AdminAuditEntryResponse, AdminAuditListResponse
from admin_app.core import AuthenticatedStaffPrincipal
from admin_app.services import AuditService
from shared.contracts import AdminPermission
router = APIRouter(prefix="/audit", tags=["audit"])
@router.get("/events", response_model=AdminAuditListResponse)
def list_audit_events(
audit_service: AuditService = Depends(get_audit_service),
_: AuthenticatedStaffPrincipal = Depends(
require_admin_permission(AdminPermission.VIEW_AUDIT_LOGS)
),
):
events = audit_service.list_recent(limit=50)
return AdminAuditListResponse(
service="orquestrador-admin",
events=[
AdminAuditEntryResponse(
id=event.id,
actor_staff_account_id=event.actor_staff_account_id,
event_type=event.event_type,
resource_type=event.resource_type,
resource_id=event.resource_id,
outcome=event.outcome,
message=event.message,
payload_json=event.payload_json,
ip_address=event.ip_address,
user_agent=event.user_agent,
created_at=event.created_at,
)
for event in events
],
)

@ -0,0 +1,109 @@
from fastapi import APIRouter, Depends, HTTPException, Request, status
from admin_app.api.dependencies import (
get_auth_service,
get_current_staff_context,
get_current_staff_principal,
)
from admin_app.api.schemas import (
AdminAuthenticatedStaffResponse,
AdminLoginRequest,
AdminLogoutResponse,
AdminRefreshTokenRequest,
AdminSessionResponse,
)
from admin_app.core import AuthenticatedStaffContext, AuthenticatedStaffPrincipal
from admin_app.services import AuthService
router = APIRouter(prefix="/auth", tags=["auth"])
def _extract_request_metadata(request: Request) -> tuple[str | None, str | None]:
ip_address = request.client.host if request.client else None
user_agent = request.headers.get("user-agent")
return ip_address, user_agent
@router.post("/login", response_model=AdminSessionResponse)
def login(
payload: AdminLoginRequest,
request: Request,
auth_service: AuthService = Depends(get_auth_service),
):
ip_address, user_agent = _extract_request_metadata(request)
session = auth_service.login(
email=payload.email,
password=payload.password,
ip_address=ip_address,
user_agent=user_agent,
)
if session is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Credenciais administrativas invalidas.",
)
return AdminSessionResponse(
session_id=session.session_id,
access_token=session.access_token,
refresh_token=session.refresh_token,
token_type=session.token_type,
expires_in_seconds=session.expires_in_seconds,
staff_account=AdminAuthenticatedStaffResponse(**session.principal.model_dump()),
)
@router.post("/refresh", response_model=AdminSessionResponse)
def refresh(
payload: AdminRefreshTokenRequest,
request: Request,
auth_service: AuthService = Depends(get_auth_service),
):
ip_address, user_agent = _extract_request_metadata(request)
session = auth_service.refresh_session(
refresh_token=payload.refresh_token,
ip_address=ip_address,
user_agent=user_agent,
)
if session is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token administrativo invalido.",
)
return AdminSessionResponse(
session_id=session.session_id,
access_token=session.access_token,
refresh_token=session.refresh_token,
token_type=session.token_type,
expires_in_seconds=session.expires_in_seconds,
staff_account=AdminAuthenticatedStaffResponse(**session.principal.model_dump()),
)
@router.post("/logout", response_model=AdminLogoutResponse)
def logout(
request: Request,
current_context: AuthenticatedStaffContext = Depends(get_current_staff_context),
auth_service: AuthService = Depends(get_auth_service),
):
ip_address, user_agent = _extract_request_metadata(request)
auth_service.logout(
current_context.session_id,
actor_staff_account_id=current_context.principal.id,
ip_address=ip_address,
user_agent=user_agent,
)
return AdminLogoutResponse(
service="orquestrador-admin",
status="ok",
message="Sessao administrativa encerrada.",
session_id=current_context.session_id,
)
@router.get("/me", response_model=AdminAuthenticatedStaffResponse)
def current_staff(
current_staff_account: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
):
return AdminAuthenticatedStaffResponse(**current_staff_account.model_dump())

@ -1,9 +1,21 @@
from fastapi import APIRouter, Depends
from admin_app.api.dependencies import get_settings
from admin_app.api.schemas import AdminHealthResponse, AdminRootResponse, AdminSystemInfoResponse
from admin_app.api.dependencies import (
get_current_staff_permissions,
get_settings,
require_admin_permission,
)
from admin_app.api.schemas import (
AdminCapabilityResponse,
AdminCurrentAccessResponse,
AdminHealthResponse,
AdminRootResponse,
AdminSystemInfoResponse,
)
from admin_app.core import AuthenticatedStaffPrincipal
from admin_app.core.settings import AdminSettings
from admin_app.services.system_service import SystemService
from shared.contracts import AdminPermission
router = APIRouter(tags=["system"])
@ -22,6 +34,54 @@ def health_check(settings: AdminSettings = Depends(get_settings)):
return _build_service(settings).build_health_payload()
@router.get("/system/info", response_model=AdminSystemInfoResponse)
def system_info(settings: AdminSettings = Depends(get_settings)):
@router.get(
"/system/info",
response_model=AdminSystemInfoResponse,
)
def system_info(
settings: AdminSettings = Depends(get_settings),
_: AuthenticatedStaffPrincipal = Depends(
require_admin_permission(AdminPermission.VIEW_SYSTEM)
),
):
return _build_service(settings).build_system_info_payload()
@router.get(
"/system/access",
response_model=AdminCurrentAccessResponse,
)
def current_access(
current_staff: AuthenticatedStaffPrincipal = Depends(
require_admin_permission(AdminPermission.VIEW_SYSTEM)
),
permissions: tuple[str, ...] = Depends(get_current_staff_permissions),
):
return AdminCurrentAccessResponse(
service="orquestrador-admin",
staff_account={
"id": current_staff.id,
"email": current_staff.email,
"display_name": current_staff.display_name,
"role": current_staff.role,
"is_active": current_staff.is_active,
},
permissions=list(permissions),
)
@router.get(
"/system/admin-capabilities",
response_model=AdminCapabilityResponse,
)
def admin_capabilities(
current_staff: AuthenticatedStaffPrincipal = Depends(
require_admin_permission(AdminPermission.MANAGE_SETTINGS)
),
):
return AdminCapabilityResponse(
service="orquestrador-admin",
action="manage_settings",
allowed=True,
role=current_staff.role,
)

@ -1,4 +1,8 @@
from pydantic import BaseModel
from datetime import datetime
from pydantic import BaseModel, Field, field_validator
from shared.contracts import StaffRole
class AdminRootResponse(BaseModel):
@ -21,3 +25,76 @@ class AdminSystemInfoResponse(BaseModel):
version: str
api_prefix: str
debug: bool
class AdminAuthenticatedStaffResponse(BaseModel):
id: int
email: str
display_name: str
role: StaffRole
is_active: bool
class AdminCurrentAccessResponse(BaseModel):
service: str
staff_account: AdminAuthenticatedStaffResponse
permissions: list[str]
class AdminCapabilityResponse(BaseModel):
service: str
action: str
allowed: bool
role: StaffRole
class AdminAuditEntryResponse(BaseModel):
id: int
actor_staff_account_id: int | None
event_type: str
resource_type: str
resource_id: str | None
outcome: str
message: str | None
payload_json: dict | None
ip_address: str | None
user_agent: str | None
created_at: datetime
class AdminAuditListResponse(BaseModel):
service: str
events: list[AdminAuditEntryResponse]
class AdminLoginRequest(BaseModel):
email: str
password: str = Field(min_length=1)
@field_validator("email")
@classmethod
def validate_email(cls, value: str) -> str:
normalized = value.strip().lower()
if "@" not in normalized or normalized.startswith("@") or normalized.endswith("@"):
raise ValueError("email must be a valid administrative login")
return normalized
class AdminRefreshTokenRequest(BaseModel):
refresh_token: str = Field(min_length=1)
class AdminSessionResponse(BaseModel):
session_id: int
access_token: str
refresh_token: str
token_type: str
expires_in_seconds: int
staff_account: AdminAuthenticatedStaffResponse
class AdminLogoutResponse(BaseModel):
service: str
status: str
message: str
session_id: int

@ -1 +1,22 @@
"""Configuracoes centrais do servico administrativo."""
"""Configuracoes centrais do servico administrativo."""
from admin_app.core.security import (
AdminAccessTokenClaims,
AdminAuthenticatedSession,
AdminCredentialStrategy,
AdminSecurityService,
AuthenticatedStaffContext,
AuthenticatedStaffPrincipal,
)
from admin_app.core.settings import AdminSettings, get_admin_settings
__all__ = [
"AdminAccessTokenClaims",
"AdminAuthenticatedSession",
"AdminCredentialStrategy",
"AdminSecurityService",
"AdminSettings",
"AuthenticatedStaffContext",
"AuthenticatedStaffPrincipal",
"get_admin_settings",
]

@ -0,0 +1,213 @@
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import secrets
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from admin_app.core.settings import AdminSettings
from shared.contracts import StaffRole
class AdminPasswordPolicy(BaseModel):
hash_scheme: str
hash_iterations: int
min_length: int
require_uppercase: bool
require_lowercase: bool
require_digit: bool
require_symbol: bool
pepper_configured: bool
class AdminTokenPolicy(BaseModel):
access_token_ttl_minutes: int
refresh_token_ttl_days: int
refresh_token_bytes: int
issuer: str
class AdminBootstrapPolicy(BaseModel):
enabled: bool
email: str | None
display_name: str | None
role: str
password_configured: bool
class AdminCredentialStrategy(BaseModel):
password: AdminPasswordPolicy
tokens: AdminTokenPolicy
bootstrap: AdminBootstrapPolicy
class AuthenticatedStaffPrincipal(BaseModel):
id: int
email: str
display_name: str
role: StaffRole
is_active: bool
class AuthenticatedStaffContext(BaseModel):
principal: AuthenticatedStaffPrincipal
session_id: int
class AdminAccessTokenClaims(BaseModel):
sub: str
sid: int
email: str
role: StaffRole
token_type: str
iss: str
iat: int
exp: int
class AdminAuthenticatedSession(BaseModel):
session_id: int
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in_seconds: int
principal: AuthenticatedStaffPrincipal
class AdminSecurityService:
def __init__(self, settings: AdminSettings):
self.settings = settings
def build_credential_strategy(self) -> AdminCredentialStrategy:
return AdminCredentialStrategy(
password=AdminPasswordPolicy(
hash_scheme=self.settings.admin_auth_password_hash_scheme,
hash_iterations=self.settings.admin_auth_password_hash_iterations,
min_length=self.settings.admin_auth_password_min_length,
require_uppercase=self.settings.admin_auth_password_require_uppercase,
require_lowercase=self.settings.admin_auth_password_require_lowercase,
require_digit=self.settings.admin_auth_password_require_digit,
require_symbol=self.settings.admin_auth_password_require_symbol,
pepper_configured=bool(self.settings.admin_auth_password_pepper),
),
tokens=AdminTokenPolicy(
access_token_ttl_minutes=self.settings.admin_auth_access_token_ttl_minutes,
refresh_token_ttl_days=self.settings.admin_auth_refresh_token_ttl_days,
refresh_token_bytes=self.settings.admin_auth_refresh_token_bytes,
issuer=self.settings.admin_auth_token_issuer,
),
bootstrap=AdminBootstrapPolicy(
enabled=self.settings.admin_bootstrap_enabled,
email=self.settings.admin_bootstrap_email,
display_name=self.settings.admin_bootstrap_display_name,
role=str(self.settings.admin_bootstrap_role or "admin"),
password_configured=bool(self.settings.admin_bootstrap_password),
),
)
def validate_password_strength(self, password: str) -> None:
if len(password) < self.settings.admin_auth_password_min_length:
raise ValueError("Password does not meet minimum length policy")
if self.settings.admin_auth_password_require_uppercase and not any(char.isupper() for char in password):
raise ValueError("Password must include an uppercase letter")
if self.settings.admin_auth_password_require_lowercase and not any(char.islower() for char in password):
raise ValueError("Password must include a lowercase letter")
if self.settings.admin_auth_password_require_digit and not any(char.isdigit() for char in password):
raise ValueError("Password must include a digit")
if self.settings.admin_auth_password_require_symbol and not any(not char.isalnum() for char in password):
raise ValueError("Password must include a symbol")
def hash_password(self, password: str) -> str:
self.validate_password_strength(password)
if self.settings.admin_auth_password_hash_scheme != "pbkdf2_sha256":
raise ValueError("Unsupported password hash scheme")
salt = secrets.token_hex(16)
digest = self._pbkdf2_digest(password=password, salt=salt, iterations=self.settings.admin_auth_password_hash_iterations)
return f"pbkdf2_sha256${self.settings.admin_auth_password_hash_iterations}${salt}${digest}"
def verify_password(self, password: str, stored_hash: str) -> bool:
try:
scheme, iterations_raw, salt, digest = stored_hash.split("$", 3)
except ValueError:
return False
if scheme != "pbkdf2_sha256":
return False
try:
iterations = int(iterations_raw)
except ValueError:
return False
expected_digest = self._pbkdf2_digest(password=password, salt=salt, iterations=iterations)
return hmac.compare_digest(expected_digest, digest)
def issue_access_token(self, principal: AuthenticatedStaffPrincipal, session_id: int) -> str:
now = datetime.now(timezone.utc)
expires_at = now + timedelta(minutes=self.settings.admin_auth_access_token_ttl_minutes)
payload = {
"sub": str(principal.id),
"sid": session_id,
"email": principal.email,
"role": principal.role.value,
"token_type": "access",
"iss": self.settings.admin_auth_token_issuer,
"iat": int(now.timestamp()),
"exp": int(expires_at.timestamp()),
}
payload_bytes = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
signature = hmac.new(self.settings.admin_auth_token_secret.encode("utf-8"), payload_bytes, hashlib.sha256).digest()
return f"{self._urlsafe_b64encode(payload_bytes)}.{self._urlsafe_b64encode(signature)}"
def decode_access_token(self, token: str) -> AdminAccessTokenClaims:
try:
encoded_payload, encoded_signature = token.split(".", 1)
except ValueError as exc:
raise ValueError("Invalid token format") from exc
payload_bytes = self._urlsafe_b64decode(encoded_payload)
provided_signature = self._urlsafe_b64decode(encoded_signature)
expected_signature = hmac.new(self.settings.admin_auth_token_secret.encode("utf-8"), payload_bytes, hashlib.sha256).digest()
if not hmac.compare_digest(provided_signature, expected_signature):
raise ValueError("Invalid token signature")
payload = json.loads(payload_bytes.decode("utf-8"))
claims = AdminAccessTokenClaims.model_validate(payload)
if claims.iss != self.settings.admin_auth_token_issuer:
raise ValueError("Invalid token issuer")
if claims.token_type != "access":
raise ValueError("Invalid token type")
if claims.exp < int(datetime.now(timezone.utc).timestamp()):
raise ValueError("Expired token")
return claims
def generate_refresh_token(self) -> str:
return secrets.token_urlsafe(self.settings.admin_auth_refresh_token_bytes)
def hash_refresh_token(self, refresh_token: str) -> str:
return hmac.new(
self.settings.admin_auth_token_secret.encode("utf-8"),
refresh_token.encode("utf-8"),
hashlib.sha256,
).hexdigest()
def build_refresh_token_expiry(self) -> datetime:
return datetime.now(timezone.utc) + timedelta(days=self.settings.admin_auth_refresh_token_ttl_days)
def _pbkdf2_digest(self, password: str, salt: str, iterations: int) -> str:
material = password
if self.settings.admin_auth_password_pepper:
material = f"{material}{self.settings.admin_auth_password_pepper}"
digest = hashlib.pbkdf2_hmac("sha256", material.encode("utf-8"), salt.encode("utf-8"), iterations)
return digest.hex()
@staticmethod
def _urlsafe_b64encode(value: bytes) -> str:
return base64.urlsafe_b64encode(value).decode("ascii").rstrip("=")
@staticmethod
def _urlsafe_b64decode(value: str) -> bytes:
padding = "=" * (-len(value) % 4)
return base64.urlsafe_b64decode(f"{value}{padding}")

@ -16,7 +16,6 @@ class AdminSettings(BaseSettings):
admin_version: str = "0.1.0"
admin_api_prefix: str = ""
# Banco administrativo do runtime interno.
admin_db_host: str = "127.0.0.1"
admin_db_port: int = 3306
admin_db_user: str = "root"
@ -24,6 +23,26 @@ class AdminSettings(BaseSettings):
admin_db_name: str = "orquestrador_admin"
admin_db_cloud_sql_connection_name: str | None = None
admin_auth_password_hash_scheme: str = "pbkdf2_sha256"
admin_auth_password_hash_iterations: int = 390000
admin_auth_password_min_length: int = 12
admin_auth_password_require_uppercase: bool = True
admin_auth_password_require_lowercase: bool = True
admin_auth_password_require_digit: bool = True
admin_auth_password_require_symbol: bool = True
admin_auth_password_pepper: str | None = None
admin_auth_token_secret: str = "local-admin-token-secret-change-me"
admin_auth_token_issuer: str = "orquestrador-admin"
admin_auth_access_token_ttl_minutes: int = 30
admin_auth_refresh_token_ttl_days: int = 7
admin_auth_refresh_token_bytes: int = 32
admin_bootstrap_enabled: bool = False
admin_bootstrap_email: str | None = None
admin_bootstrap_display_name: str | None = None
admin_bootstrap_password: str | None = None
admin_bootstrap_role: str = "admin"
@field_validator("admin_debug", mode="before")
@classmethod
def parse_debug_aliases(cls, value):
@ -35,13 +54,70 @@ class AdminSettings(BaseSettings):
return False
return value
@field_validator("admin_environment", "admin_api_prefix", mode="before")
@field_validator(
"admin_environment",
"admin_api_prefix",
"admin_auth_password_hash_scheme",
"admin_auth_token_secret",
"admin_auth_token_issuer",
"admin_bootstrap_role",
mode="before",
)
@classmethod
def normalize_text_settings(cls, value):
def normalize_required_text_settings(cls, value):
if isinstance(value, str):
return value.strip()
return value
@field_validator(
"admin_bootstrap_email",
"admin_bootstrap_display_name",
"admin_bootstrap_password",
"admin_auth_password_pepper",
mode="before",
)
@classmethod
def normalize_optional_text_settings(cls, value):
if isinstance(value, str):
stripped = value.strip()
return stripped or None
return value
@field_validator("admin_auth_password_min_length")
@classmethod
def validate_password_min_length(cls, value: int) -> int:
if value < 12:
raise ValueError("admin_auth_password_min_length must be >= 12")
return value
@field_validator("admin_auth_password_hash_iterations")
@classmethod
def validate_password_hash_iterations(cls, value: int) -> int:
if value < 100_000:
raise ValueError("admin_auth_password_hash_iterations must be >= 100000")
return value
@field_validator("admin_auth_access_token_ttl_minutes")
@classmethod
def validate_access_token_ttl(cls, value: int) -> int:
if value < 5:
raise ValueError("admin_auth_access_token_ttl_minutes must be >= 5")
return value
@field_validator("admin_auth_refresh_token_ttl_days")
@classmethod
def validate_refresh_token_ttl(cls, value: int) -> int:
if value < 1:
raise ValueError("admin_auth_refresh_token_ttl_days must be >= 1")
return value
@field_validator("admin_auth_refresh_token_bytes")
@classmethod
def validate_refresh_token_bytes(cls, value: int) -> int:
if value < 16:
raise ValueError("admin_auth_refresh_token_bytes must be >= 16")
return value
@lru_cache(maxsize=1)
def get_admin_settings() -> AdminSettings:

@ -1,8 +1,12 @@
from collections.abc import Generator
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.orm import Session, declarative_base, sessionmaker
from admin_app.core.settings import get_admin_settings
# monta a conexão do banco administrativo e expõe get_admin_db_session(). Esse generator é o que alimenta as dependências FastAPI para repositórios e serviços.
settings = get_admin_settings()
admin_cloud_sql = settings.admin_db_cloud_sql_connection_name
@ -30,3 +34,11 @@ AdminSessionLocal = sessionmaker(
)
AdminBase = declarative_base()
def get_admin_db_session() -> Generator[Session, None, None]:
db = AdminSessionLocal()
try:
yield db
finally:
db.close()

@ -1,3 +1,6 @@
from admin_app.db.models.audit_log import AuditLog
from admin_app.db.models.base import AdminTimestampedModel
from admin_app.db.models.staff_account import StaffAccount
from admin_app.db.models.staff_session import StaffSession
__all__ = ["AdminTimestampedModel"]
__all__ = ["AdminTimestampedModel", "AuditLog", "StaffAccount", "StaffSession"]

@ -0,0 +1,26 @@
from __future__ import annotations
from sqlalchemy import JSON, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from admin_app.db.models.base import AdminTimestampedModel
class AuditLog(AdminTimestampedModel):
__tablename__ = "admin_audit_logs"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_staff_account_id: Mapped[int | None] = mapped_column(
Integer,
ForeignKey("staff_accounts.id"),
nullable=True,
index=True,
)
event_type: Mapped[str] = mapped_column(String(80), nullable=False, index=True)
resource_type: Mapped[str] = mapped_column(String(80), nullable=False, index=True)
resource_id: Mapped[str | None] = mapped_column(String(120), nullable=True, index=True)
outcome: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
message: Mapped[str | None] = mapped_column(Text, nullable=True)
payload_json: Mapped[dict | None] = mapped_column(JSON, nullable=True)
ip_address: Mapped[str | None] = mapped_column(String(64), nullable=True)
user_agent: Mapped[str | None] = mapped_column(String(512), nullable=True)

@ -0,0 +1,36 @@
from __future__ import annotations
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Enum as SAEnum, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from admin_app.db.models.base import AdminTimestampedModel
from shared.contracts import StaffRole
# Modelo da conta administrativa
# Ele representa o usuário interno do painel
def _staff_role_values(enum_cls) -> list[str]:
return [role.value for role in enum_cls]
class StaffAccount(AdminTimestampedModel):
__tablename__ = "staff_accounts"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True, nullable=False)
display_name: Mapped[str] = mapped_column(String(150), nullable=False)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
role: Mapped[StaffRole] = mapped_column(
SAEnum(
StaffRole,
native_enum=False,
values_callable=_staff_role_values,
length=32,
),
nullable=False,
default=StaffRole.VIEWER,
)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
last_login_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)

@ -0,0 +1,31 @@
from __future__ import annotations
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from admin_app.db.models.base import AdminTimestampedModel
class StaffSession(AdminTimestampedModel):
__tablename__ = "staff_sessions"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
staff_account_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("staff_accounts.id"),
nullable=False,
index=True,
)
refresh_token_hash: Mapped[str] = mapped_column(
String(255),
unique=True,
index=True,
nullable=False,
)
expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
last_used_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
revoked_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
ip_address: Mapped[str | None] = mapped_column(String(64), nullable=True)
user_agent: Mapped[str | None] = mapped_column(String(512), nullable=True)

@ -1,3 +1,11 @@
from admin_app.repositories.audit_log_repository import AuditLogRepository
from admin_app.repositories.base_repository import BaseRepository
from admin_app.repositories.staff_account_repository import StaffAccountRepository
from admin_app.repositories.staff_session_repository import StaffSessionRepository
__all__ = ["BaseRepository"]
__all__ = [
"AuditLogRepository",
"BaseRepository",
"StaffAccountRepository",
"StaffSessionRepository",
]

@ -0,0 +1,39 @@
from sqlalchemy import desc, select
from admin_app.db.models import AuditLog
from admin_app.repositories.base_repository import BaseRepository
class AuditLogRepository(BaseRepository):
def create(
self,
*,
actor_staff_account_id: int | None,
event_type: str,
resource_type: str,
resource_id: str | None,
outcome: str,
message: str | None,
payload_json: dict | None,
ip_address: str | None,
user_agent: str | None,
) -> AuditLog:
audit_log = AuditLog(
actor_staff_account_id=actor_staff_account_id,
event_type=event_type,
resource_type=resource_type,
resource_id=resource_id,
outcome=outcome,
message=message,
payload_json=payload_json,
ip_address=ip_address,
user_agent=user_agent,
)
self.db.add(audit_log)
self.db.commit()
self.db.refresh(audit_log)
return audit_log
def list_recent(self, limit: int = 50) -> list[AuditLog]:
statement = select(AuditLog).order_by(desc(AuditLog.created_at)).limit(limit)
return list(self.db.execute(statement).scalars().all())

@ -0,0 +1,22 @@
from sqlalchemy import select
from sqlalchemy.orm import Session
from admin_app.db.models import StaffAccount
from admin_app.repositories.base_repository import BaseRepository
# encapsula o acesso a StaffAccount (busca por e-mail e id)
class StaffAccountRepository(BaseRepository):
def get_by_email(self, email: str) -> StaffAccount | None:
statement = select(StaffAccount).where(StaffAccount.email == email)
return self.db.execute(statement).scalar_one_or_none()
def get_by_id(self, staff_account_id: int) -> StaffAccount | None:
statement = select(StaffAccount).where(StaffAccount.id == staff_account_id)
return self.db.execute(statement).scalar_one_or_none()
def update_last_login(self, staff_account: StaffAccount) -> StaffAccount:
self.db.add(staff_account)
self.db.commit()
self.db.refresh(staff_account)
return staff_account

@ -0,0 +1,45 @@
from datetime import datetime
from sqlalchemy import select
from admin_app.db.models import StaffSession
from admin_app.repositories.base_repository import BaseRepository
class StaffSessionRepository(BaseRepository):
def create(
self,
*,
staff_account_id: int,
refresh_token_hash: str,
expires_at: datetime,
ip_address: str | None,
user_agent: str | None,
) -> StaffSession:
session = StaffSession(
staff_account_id=staff_account_id,
refresh_token_hash=refresh_token_hash,
expires_at=expires_at,
ip_address=ip_address,
user_agent=user_agent,
)
self.db.add(session)
self.db.commit()
self.db.refresh(session)
return session
def get_by_id(self, session_id: int) -> StaffSession | None:
statement = select(StaffSession).where(StaffSession.id == session_id)
return self.db.execute(statement).scalar_one_or_none()
def get_by_refresh_token_hash(self, refresh_token_hash: str) -> StaffSession | None:
statement = select(StaffSession).where(
StaffSession.refresh_token_hash == refresh_token_hash
)
return self.db.execute(statement).scalar_one_or_none()
def save(self, staff_session: StaffSession) -> StaffSession:
self.db.add(staff_session)
self.db.commit()
self.db.refresh(staff_session)
return staff_session

@ -1,3 +1,15 @@
from admin_app.services.audit_service import (
AdminAuditEventType,
AdminAuditOutcome,
AuditService,
)
from admin_app.services.auth_service import AuthService
from admin_app.services.system_service import SystemService
__all__ = ["SystemService"]
__all__ = [
"AdminAuditEventType",
"AdminAuditOutcome",
"AuditService",
"AuthService",
"SystemService",
]

@ -0,0 +1,153 @@
from enum import Enum
from admin_app.db.models import AuditLog
from admin_app.repositories import AuditLogRepository
class AdminAuditEventType(str, Enum):
STAFF_LOGIN_SUCCEEDED = "staff.login.succeeded"
STAFF_LOGIN_FAILED = "staff.login.failed"
STAFF_LOGOUT_SUCCEEDED = "staff.logout.succeeded"
TOOL_APPROVAL_RECORDED = "tool.approval.recorded"
TOOL_PUBLICATION_RECORDED = "tool.publication.recorded"
class AdminAuditOutcome(str, Enum):
SUCCESS = "success"
FAILED = "failed"
class AuditService:
def __init__(self, repository: AuditLogRepository):
self.repository = repository
def record_event(
self,
*,
actor_staff_account_id: int | None,
event_type: AdminAuditEventType,
resource_type: str,
resource_id: str | None,
outcome: AdminAuditOutcome,
message: str | None,
payload_json: dict | None,
ip_address: str | None,
user_agent: str | None,
) -> AuditLog:
return self.repository.create(
actor_staff_account_id=actor_staff_account_id,
event_type=event_type.value,
resource_type=resource_type,
resource_id=resource_id,
outcome=outcome.value,
message=message,
payload_json=payload_json,
ip_address=ip_address,
user_agent=user_agent,
)
def record_login_succeeded(
self,
*,
actor_staff_account_id: int,
session_id: int,
email: str,
role: str,
ip_address: str | None,
user_agent: str | None,
) -> AuditLog:
return self.record_event(
actor_staff_account_id=actor_staff_account_id,
event_type=AdminAuditEventType.STAFF_LOGIN_SUCCEEDED,
resource_type="staff_account",
resource_id=str(actor_staff_account_id),
outcome=AdminAuditOutcome.SUCCESS,
message="Login administrativo concluido.",
payload_json={"session_id": session_id, "email": email, "role": role},
ip_address=ip_address,
user_agent=user_agent,
)
def record_login_failed(
self,
*,
email: str,
ip_address: str | None,
user_agent: str | None,
) -> AuditLog:
return self.record_event(
actor_staff_account_id=None,
event_type=AdminAuditEventType.STAFF_LOGIN_FAILED,
resource_type="staff_account",
resource_id=email,
outcome=AdminAuditOutcome.FAILED,
message="Tentativa de login administrativo falhou.",
payload_json={"email": email},
ip_address=ip_address,
user_agent=user_agent,
)
def record_logout_succeeded(
self,
*,
actor_staff_account_id: int,
session_id: int,
ip_address: str | None,
user_agent: str | None,
) -> AuditLog:
return self.record_event(
actor_staff_account_id=actor_staff_account_id,
event_type=AdminAuditEventType.STAFF_LOGOUT_SUCCEEDED,
resource_type="staff_session",
resource_id=str(session_id),
outcome=AdminAuditOutcome.SUCCESS,
message="Sessao administrativa encerrada.",
payload_json={"session_id": session_id},
ip_address=ip_address,
user_agent=user_agent,
)
def record_tool_approval(
self,
*,
actor_staff_account_id: int,
tool_name: str,
tool_version: int,
ip_address: str | None,
user_agent: str | None,
) -> AuditLog:
return self.record_event(
actor_staff_account_id=actor_staff_account_id,
event_type=AdminAuditEventType.TOOL_APPROVAL_RECORDED,
resource_type="tool_publication",
resource_id=tool_name,
outcome=AdminAuditOutcome.SUCCESS,
message="Aprovacao de tool registrada.",
payload_json={"tool_name": tool_name, "tool_version": tool_version},
ip_address=ip_address,
user_agent=user_agent,
)
def record_tool_publication(
self,
*,
actor_staff_account_id: int,
tool_name: str,
tool_version: int,
ip_address: str | None,
user_agent: str | None,
) -> AuditLog:
return self.record_event(
actor_staff_account_id=actor_staff_account_id,
event_type=AdminAuditEventType.TOOL_PUBLICATION_RECORDED,
resource_type="tool_publication",
resource_id=tool_name,
outcome=AdminAuditOutcome.SUCCESS,
message="Publicacao de tool registrada.",
payload_json={"tool_name": tool_name, "tool_version": tool_version},
ip_address=ip_address,
user_agent=user_agent,
)
def list_recent(self, limit: int = 50) -> list[AuditLog]:
return self.repository.list_recent(limit=limit)

@ -0,0 +1,210 @@
from datetime import datetime, timezone
from admin_app.core import (
AdminAuthenticatedSession,
AdminSecurityService,
AuthenticatedStaffContext,
AuthenticatedStaffPrincipal,
)
from admin_app.db.models import StaffAccount, StaffSession
from admin_app.repositories import StaffAccountRepository, StaffSessionRepository
from admin_app.services.audit_service import AuditService
class AuthService:
def __init__(
self,
account_repository: StaffAccountRepository,
session_repository: StaffSessionRepository,
security_service: AdminSecurityService,
audit_service: AuditService,
):
self.account_repository = account_repository
self.session_repository = session_repository
self.security_service = security_service
self.audit_service = audit_service
def login(
self,
email: str,
password: str,
*,
ip_address: str | None,
user_agent: str | None,
) -> AdminAuthenticatedSession | None:
normalized_email = self.normalize_email(email)
account = self.authenticate_account(email=normalized_email, password=password)
if account is None:
self.audit_service.record_login_failed(
email=normalized_email,
ip_address=ip_address,
user_agent=user_agent,
)
return None
session = self._create_authenticated_session(
account,
ip_address=ip_address,
user_agent=user_agent,
)
self.audit_service.record_login_succeeded(
actor_staff_account_id=account.id,
session_id=session.session_id,
email=account.email,
role=account.role.value,
ip_address=ip_address,
user_agent=user_agent,
)
return session
def refresh_session(
self,
refresh_token: str,
*,
ip_address: str | None,
user_agent: str | None,
) -> AdminAuthenticatedSession | None:
token_hash = self.security_service.hash_refresh_token(refresh_token)
staff_session = self.session_repository.get_by_refresh_token_hash(token_hash)
if not self._is_session_active(staff_session):
return None
account = self.account_repository.get_by_id(staff_session.staff_account_id)
if account is None or not account.is_active:
return None
return self._rotate_session(
staff_session,
account,
ip_address=ip_address,
user_agent=user_agent,
)
def logout(
self,
session_id: int,
*,
actor_staff_account_id: int | None,
ip_address: str | None,
user_agent: str | None,
) -> bool:
staff_session = self.session_repository.get_by_id(session_id)
if staff_session is None:
return False
if staff_session.revoked_at is None:
staff_session.revoked_at = datetime.now(timezone.utc)
self.session_repository.save(staff_session)
if actor_staff_account_id is not None:
self.audit_service.record_logout_succeeded(
actor_staff_account_id=actor_staff_account_id,
session_id=session_id,
ip_address=ip_address,
user_agent=user_agent,
)
return True
def get_authenticated_context(self, access_token: str) -> AuthenticatedStaffContext:
claims = self.security_service.decode_access_token(access_token)
staff_session = self.session_repository.get_by_id(claims.sid)
if not self._is_session_active(staff_session):
raise ValueError("Administrative session is not available")
account = self.account_repository.get_by_id(int(claims.sub))
if account is None or not account.is_active:
raise ValueError("Staff account is not available")
if self.normalize_email(account.email) != self.normalize_email(claims.email):
raise ValueError("Token principal mismatch")
return AuthenticatedStaffContext(
principal=self.build_principal(account),
session_id=staff_session.id,
)
def authenticate_account(self, email: str, password: str) -> StaffAccount | None:
normalized_email = self.normalize_email(email)
account = self.account_repository.get_by_email(normalized_email)
if account is None or not account.is_active:
return None
if not self.security_service.verify_password(password, account.password_hash):
return None
account.last_login_at = datetime.now(timezone.utc)
return self.account_repository.update_last_login(account)
@staticmethod
def normalize_email(email: str) -> str:
return email.strip().lower()
@staticmethod
def build_principal(account: StaffAccount) -> AuthenticatedStaffPrincipal:
return AuthenticatedStaffPrincipal(
id=account.id,
email=account.email,
display_name=account.display_name,
role=account.role,
is_active=account.is_active,
)
def _create_authenticated_session(
self,
account: StaffAccount,
*,
ip_address: str | None,
user_agent: str | None,
) -> AdminAuthenticatedSession:
refresh_token = self.security_service.generate_refresh_token()
refresh_token_hash = self.security_service.hash_refresh_token(refresh_token)
expires_at = self.security_service.build_refresh_token_expiry()
staff_session = self.session_repository.create(
staff_account_id=account.id,
refresh_token_hash=refresh_token_hash,
expires_at=expires_at,
ip_address=ip_address,
user_agent=user_agent,
)
principal = self.build_principal(account)
access_token = self.security_service.issue_access_token(principal, staff_session.id)
return AdminAuthenticatedSession(
session_id=staff_session.id,
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in_seconds=self.security_service.settings.admin_auth_access_token_ttl_minutes * 60,
principal=principal,
)
def _rotate_session(
self,
staff_session: StaffSession,
account: StaffAccount,
*,
ip_address: str | None,
user_agent: str | None,
) -> AdminAuthenticatedSession:
refresh_token = self.security_service.generate_refresh_token()
staff_session.refresh_token_hash = self.security_service.hash_refresh_token(refresh_token)
staff_session.expires_at = self.security_service.build_refresh_token_expiry()
staff_session.last_used_at = datetime.now(timezone.utc)
staff_session.ip_address = ip_address
staff_session.user_agent = user_agent
persisted_session = self.session_repository.save(staff_session)
principal = self.build_principal(account)
access_token = self.security_service.issue_access_token(principal, persisted_session.id)
return AdminAuthenticatedSession(
session_id=persisted_session.id,
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in_seconds=self.security_service.settings.admin_auth_access_token_ttl_minutes * 60,
principal=principal,
)
@staticmethod
def _is_session_active(staff_session: StaffSession | None) -> bool:
if staff_session is None:
return False
if staff_session.revoked_at is not None:
return False
return staff_session.expires_at >= datetime.now(timezone.utc)

@ -0,0 +1,164 @@
# Estrategia De Credenciais Para StaffAccount
Este documento define a estrategia inicial de credenciais para contas administrativas internas (`StaffAccount`).
## Objetivo
Permitir autenticacao web no `orquestrador-admin` sem misturar a identidade do painel com o `User` do atendimento.
## O que e o StaffAccount
`StaffAccount` e a conta administrativa interna do sistema.
Ela representa um funcionario ou administrador da empresa e existe apenas no dominio administrativo.
Nao deve ser usada para identificar cliente do atendimento.
Responsabilidades principais do `StaffAccount`:
- autenticar acesso ao painel administrativo
- carregar papel de autorizacao (`viewer`, `staff`, `admin`)
- auditar quem executou uma acao interna
- governar drafts, configuracoes, relatorios e publicacao de tools
## O que e a StaffSession
`StaffSession` representa uma sessao administrativa ativa derivada de um login do `StaffAccount`.
Cada sessao possui:
- `session_id`
- `refresh_token_hash`
- data de expiracao
- marcador de revogacao
- metadados simples de IP e user-agent
Isso permite tratar login, refresh e logout sem depender apenas de um access token solto.
## Identificador de login
O identificador principal de login sera:
- `email`
Regras:
- deve ser unico no banco administrativo
- deve ser normalizado em lowercase na camada de servico e repositorio
- nao sera compartilhado com a identidade do atendimento
## Segredo primario
O segredo primario inicial sera:
- senha gerenciada internamente
A senha nunca deve ser armazenada em texto puro.
## Estrategia de hash
A estrategia inicial definida para `password_hash` e:
- esquema: `pbkdf2_sha256`
- iteracoes padrao: `390000`
- salt aleatorio por conta
- pepper opcional via ambiente: `ADMIN_AUTH_PASSWORD_PEPPER`
Formato de persistencia adotado:
- `pbkdf2_sha256$<iterations>$<salt>$<digest>`
## Politica inicial de senha
Politica minima definida:
- tamanho minimo: `12`
- exigir letra maiuscula
- exigir letra minuscula
- exigir digito
- exigir simbolo
A validacao dessa politica ja esta refletida no runtime administrativo.
## Tokens e sessao
Estrategia inicial para a autenticacao web:
- access token curto: `30` minutos
- refresh token: `7` dias
- secret de assinatura dedicado: `ADMIN_AUTH_TOKEN_SECRET`
- issuer do admin runtime: `ADMIN_AUTH_TOKEN_ISSUER`
- tamanho configuravel do refresh token: `ADMIN_AUTH_REFRESH_TOKEN_BYTES`
Nesta etapa, o runtime administrativo ja faz:
- emissao de access token assinado por HMAC-SHA256
- emissao de refresh token opaco
- armazenamento hash do refresh token em `StaffSession`
- rotacao do refresh token no endpoint de refresh
- revogacao da sessao no logout
## Bootstrap do primeiro admin
O primeiro admin deve nascer por fluxo controlado, nunca por startup automatico.
Variaveis previstas:
- `ADMIN_BOOTSTRAP_ENABLED`
- `ADMIN_BOOTSTRAP_EMAIL`
- `ADMIN_BOOTSTRAP_DISPLAY_NAME`
- `ADMIN_BOOTSTRAP_PASSWORD`
- `ADMIN_BOOTSTRAP_ROLE`
Regras:
- o bootstrap deve ser executado por comando explicito no futuro
- nao deve criar conta automaticamente ao subir o servico
- o papel padrao do bootstrap e `admin`
## Relacao com papeis e permissoes
A conta `StaffAccount` continua acoplada a hierarquia compartilhada:
- `viewer`
- `staff`
- `admin`
A senha autentica a identidade; o `role` governa autorizacao.
## Configuracoes adicionadas ao admin runtime
As configuracoes de credenciais passam a existir em `admin_app/core/settings.py`:
- `admin_auth_password_hash_scheme`
- `admin_auth_password_hash_iterations`
- `admin_auth_password_min_length`
- `admin_auth_password_require_uppercase`
- `admin_auth_password_require_lowercase`
- `admin_auth_password_require_digit`
- `admin_auth_password_require_symbol`
- `admin_auth_password_pepper`
- `admin_auth_token_secret`
- `admin_auth_token_issuer`
- `admin_auth_access_token_ttl_minutes`
- `admin_auth_refresh_token_ttl_days`
- `admin_auth_refresh_token_bytes`
- `admin_bootstrap_enabled`
- `admin_bootstrap_email`
- `admin_bootstrap_display_name`
- `admin_bootstrap_password`
- `admin_bootstrap_role`
## Endpoints iniciais de autenticacao
Nesta etapa, o `orquestrador-admin` passa a expor:
- `POST /auth/login`
- `POST /auth/refresh`
- `POST /auth/logout`
- `GET /auth/me`
## Proximos passos naturais
- implementar autorizacao por papel nas demais rotas administrativas
- implementar fluxo explicito de bootstrap do primeiro admin
- implementar gestao de sessoes revogaveis por tela administrativa

@ -64,10 +64,10 @@ admin_app/
# reports.py
core/
settings.py
# security.py
security.py
db/
models/
# staff_account.py
staff_account.py
# tool_draft.py
# tool_generation_job.py
# tool_publication.py
@ -145,3 +145,5 @@ Quando o admin ganhar runtime real, o deploy vai evoluir para dois servicos dist
Sem mover o runtime atual de `app/` nesta etapa.

@ -92,3 +92,4 @@ Ele cobre:
- criar a entidade `StaffAccount`
- plugar a role do usuario interno ao contrato compartilhado
- modelar a persistencia de drafts/publicacoes de tool

@ -81,8 +81,6 @@ def role_includes(role: StaffRole | str, minimum_role: StaffRole | str) -> bool:
return _ROLE_HIERARCHY[normalized_role] >= _ROLE_HIERARCHY[normalized_minimum]
def role_has_permission(
role: StaffRole | str, permission: AdminPermission | str
) -> bool:
def role_has_permission(role: StaffRole | str, permission: AdminPermission | str) -> bool:
normalized_permission = normalize_admin_permission(permission)
return normalized_permission in permissions_for_role(role)

@ -2,8 +2,10 @@ import unittest
from fastapi.testclient import TestClient
from admin_app.api.dependencies import get_current_staff_principal
from admin_app.app_factory import create_app
from admin_app.core.settings import AdminSettings
from shared.contracts import StaffRole
class AdminAppBootstrapTests(unittest.TestCase):
@ -45,6 +47,17 @@ class AdminAppBootstrapTests(unittest.TestCase):
admin_debug=True,
)
app = create_app(settings)
app.dependency_overrides[get_current_staff_principal] = lambda: type(
"Principal",
(),
{
"id": 1,
"email": "viewer@empresa.com",
"display_name": "Viewer",
"role": StaffRole.VIEWER,
"is_active": True,
},
)()
client = TestClient(app)
response = client.get("/admin/system/info")

@ -0,0 +1,26 @@
import unittest
from admin_app.db.models import AuditLog
class AuditLogModelTests(unittest.TestCase):
def test_audit_log_declares_expected_table_and_columns(self):
self.assertEqual(AuditLog.__tablename__, "admin_audit_logs")
self.assertIn("actor_staff_account_id", AuditLog.__table__.columns)
self.assertIn("event_type", AuditLog.__table__.columns)
self.assertIn("resource_type", AuditLog.__table__.columns)
self.assertIn("resource_id", AuditLog.__table__.columns)
self.assertIn("outcome", AuditLog.__table__.columns)
self.assertIn("message", AuditLog.__table__.columns)
self.assertIn("payload_json", AuditLog.__table__.columns)
self.assertIn("ip_address", AuditLog.__table__.columns)
self.assertIn("user_agent", AuditLog.__table__.columns)
def test_audit_log_uses_staff_account_foreign_key(self):
foreign_keys = list(AuditLog.__table__.columns["actor_staff_account_id"].foreign_keys)
self.assertEqual(len(foreign_keys), 1)
self.assertEqual(str(foreign_keys[0].target_fullname), "staff_accounts.id")
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,76 @@
import unittest
from admin_app.db.models import AuditLog
from admin_app.services import AuditService
class _FakeAuditLogRepository:
def __init__(self):
self.entries: list[AuditLog] = []
self._next_id = 1
def create(self, **kwargs) -> AuditLog:
audit_log = AuditLog(id=self._next_id, **kwargs)
self.entries.append(audit_log)
self._next_id += 1
return audit_log
def list_recent(self, limit: int = 50) -> list[AuditLog]:
return list(reversed(self.entries))[:limit]
class AdminAuditServiceTests(unittest.TestCase):
def setUp(self):
self.repository = _FakeAuditLogRepository()
self.service = AuditService(self.repository)
def test_record_tool_approval_keeps_actor_and_version_metadata(self):
audit_entry = self.service.record_tool_approval(
actor_staff_account_id=7,
tool_name="consultar_clientes_vip",
tool_version=3,
ip_address="127.0.0.1",
user_agent="pytest",
)
self.assertEqual(audit_entry.event_type, "tool.approval.recorded")
self.assertEqual(audit_entry.resource_id, "consultar_clientes_vip")
self.assertEqual(audit_entry.payload_json["tool_version"], 3)
self.assertEqual(audit_entry.actor_staff_account_id, 7)
def test_record_tool_publication_keeps_actor_and_version_metadata(self):
audit_entry = self.service.record_tool_publication(
actor_staff_account_id=9,
tool_name="emitir_relatorio_receita",
tool_version=5,
ip_address="127.0.0.1",
user_agent="pytest",
)
self.assertEqual(audit_entry.event_type, "tool.publication.recorded")
self.assertEqual(audit_entry.resource_id, "emitir_relatorio_receita")
self.assertEqual(audit_entry.payload_json["tool_version"], 5)
self.assertEqual(audit_entry.actor_staff_account_id, 9)
def test_list_recent_returns_newest_first(self):
self.service.record_login_failed(
email="viewer@empresa.com",
ip_address="127.0.0.1",
user_agent="pytest",
)
self.service.record_tool_publication(
actor_staff_account_id=1,
tool_name="publicar_x",
tool_version=1,
ip_address="127.0.0.1",
user_agent="pytest",
)
recent = self.service.list_recent(limit=2)
self.assertEqual(recent[0].event_type, "tool.publication.recorded")
self.assertEqual(recent[1].event_type, "staff.login.failed")
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,206 @@
import unittest
from datetime import datetime, timedelta, timezone
from admin_app.core import AdminSecurityService, AdminSettings
from admin_app.db.models import AuditLog, StaffAccount, StaffSession
from admin_app.services import AuditService
from admin_app.services.auth_service import AuthService
from shared.contracts import StaffRole
class _FakeStaffAccountRepository:
def __init__(self, account: StaffAccount | None):
self.account = account
def get_by_email(self, email: str) -> StaffAccount | None:
if self.account and self.account.email == email:
return self.account
return None
def get_by_id(self, staff_account_id: int) -> StaffAccount | None:
if self.account and self.account.id == staff_account_id:
return self.account
return None
def update_last_login(self, staff_account: StaffAccount) -> StaffAccount:
self.account = staff_account
return staff_account
class _FakeStaffSessionRepository:
def __init__(self):
self.sessions: dict[int, StaffSession] = {}
self._next_id = 1
def create(self, *, staff_account_id: int, refresh_token_hash: str, expires_at: datetime, ip_address: str | None, user_agent: str | None) -> StaffSession:
session = StaffSession(
id=self._next_id,
staff_account_id=staff_account_id,
refresh_token_hash=refresh_token_hash,
expires_at=expires_at,
ip_address=ip_address,
user_agent=user_agent,
)
self.sessions[session.id] = session
self._next_id += 1
return session
def get_by_id(self, session_id: int) -> StaffSession | None:
return self.sessions.get(session_id)
def get_by_refresh_token_hash(self, refresh_token_hash: str) -> StaffSession | None:
for session in self.sessions.values():
if session.refresh_token_hash == refresh_token_hash:
return session
return None
def save(self, staff_session: StaffSession) -> StaffSession:
self.sessions[staff_session.id] = staff_session
return staff_session
class _FakeAuditLogRepository:
def __init__(self):
self.entries: list[AuditLog] = []
self._next_id = 1
def create(self, **kwargs) -> AuditLog:
audit_log = AuditLog(id=self._next_id, **kwargs)
self.entries.append(audit_log)
self._next_id += 1
return audit_log
def list_recent(self, limit: int = 50) -> list[AuditLog]:
return list(reversed(self.entries))[:limit]
class AdminAuthServiceTests(unittest.TestCase):
def setUp(self):
self.security_service = AdminSecurityService(
AdminSettings(
admin_auth_token_secret="test-secret",
admin_auth_password_pepper="pepper",
)
)
self.account = StaffAccount(
id=1,
email="admin@empresa.com",
display_name="Administrador",
password_hash=self.security_service.hash_password("SenhaMuitoSegura!123"),
role=StaffRole.ADMIN,
is_active=True,
)
self.account_repository = _FakeStaffAccountRepository(self.account)
self.session_repository = _FakeStaffSessionRepository()
self.audit_repository = _FakeAuditLogRepository()
self.audit_service = AuditService(self.audit_repository)
self.auth_service = AuthService(
account_repository=self.account_repository,
session_repository=self.session_repository,
security_service=self.security_service,
audit_service=self.audit_service,
)
def test_login_creates_authenticated_session_with_refresh_token(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.assertIsNotNone(session)
self.assertEqual(session.session_id, 1)
self.assertTrue(session.access_token)
self.assertTrue(session.refresh_token)
self.assertEqual(session.principal.role, StaffRole.ADMIN)
self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.login.succeeded")
def test_login_failure_creates_audit_entry(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="senha-incorreta",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.assertIsNone(session)
self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.login.failed")
self.assertEqual(self.audit_repository.entries[-1].outcome, "failed")
def test_refresh_session_rotates_refresh_token(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
refreshed = self.auth_service.refresh_session(
refresh_token=session.refresh_token,
ip_address="127.0.0.1",
user_agent="unittest-refresh",
)
self.assertIsNotNone(refreshed)
self.assertEqual(refreshed.session_id, session.session_id)
self.assertNotEqual(refreshed.refresh_token, session.refresh_token)
def test_logout_revokes_session_and_creates_audit_entry(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.assertTrue(
self.auth_service.logout(
session.session_id,
actor_staff_account_id=self.account.id,
ip_address="127.0.0.1",
user_agent="unittest",
)
)
self.assertIsNotNone(self.session_repository.get_by_id(session.session_id).revoked_at)
self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.logout.succeeded")
def test_get_authenticated_context_rejects_revoked_session(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.auth_service.logout(
session.session_id,
actor_staff_account_id=self.account.id,
ip_address="127.0.0.1",
user_agent="unittest",
)
with self.assertRaises(ValueError):
self.auth_service.get_authenticated_context(session.access_token)
def test_refresh_session_rejects_expired_session(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
stored_session = self.session_repository.get_by_id(session.session_id)
stored_session.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1)
self.session_repository.save(stored_session)
refreshed = self.auth_service.refresh_session(
refresh_token=session.refresh_token,
ip_address="127.0.0.1",
user_agent="unittest-refresh",
)
self.assertIsNone(refreshed)
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,150 @@
import unittest
from fastapi.testclient import TestClient
from admin_app.api.dependencies import get_auth_service, get_current_staff_context, get_current_staff_principal
from admin_app.app_factory import create_app
from admin_app.core import (
AdminAuthenticatedSession,
AdminSettings,
AuthenticatedStaffContext,
AuthenticatedStaffPrincipal,
)
from shared.contracts import StaffRole
class _FakeAuthService:
def login(self, email: str, password: str, *, ip_address: str | None, user_agent: str | None):
if email == "admin@empresa.com" and password == "SenhaMuitoSegura!123":
principal = AuthenticatedStaffPrincipal(
id=1,
email="admin@empresa.com",
display_name="Administrador",
role=StaffRole.ADMIN,
is_active=True,
)
return AdminAuthenticatedSession(
session_id=77,
access_token="token-abc",
refresh_token="refresh-abc",
token_type="bearer",
expires_in_seconds=1800,
principal=principal,
)
return None
def refresh_session(self, refresh_token: str, *, ip_address: str | None, user_agent: str | None):
if refresh_token == "refresh-abc":
principal = AuthenticatedStaffPrincipal(
id=1,
email="admin@empresa.com",
display_name="Administrador",
role=StaffRole.ADMIN,
is_active=True,
)
return AdminAuthenticatedSession(
session_id=77,
access_token="token-new",
refresh_token="refresh-new",
token_type="bearer",
expires_in_seconds=1800,
principal=principal,
)
return None
def logout(
self,
session_id: int,
*,
actor_staff_account_id: int | None,
ip_address: str | None,
user_agent: str | None,
) -> bool:
return session_id == 77 and actor_staff_account_id == 1
class AdminAuthWebTests(unittest.TestCase):
def setUp(self):
app = create_app(AdminSettings(admin_auth_token_secret="test-secret"))
app.dependency_overrides[get_auth_service] = lambda: _FakeAuthService()
app.dependency_overrides[get_current_staff_principal] = lambda: AuthenticatedStaffPrincipal(
id=1,
email="admin@empresa.com",
display_name="Administrador",
role=StaffRole.ADMIN,
is_active=True,
)
app.dependency_overrides[get_current_staff_context] = lambda: AuthenticatedStaffContext(
principal=AuthenticatedStaffPrincipal(
id=1,
email="admin@empresa.com",
display_name="Administrador",
role=StaffRole.ADMIN,
is_active=True,
),
session_id=77,
)
self.client = TestClient(app)
self.app = app
def tearDown(self):
self.app.dependency_overrides.clear()
def test_login_returns_tokens_and_staff_account(self):
response = self.client.post(
"/auth/login",
json={"email": "admin@empresa.com", "password": "SenhaMuitoSegura!123"},
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["session_id"], 77)
self.assertEqual(response.json()["token_type"], "bearer")
self.assertEqual(response.json()["refresh_token"], "refresh-abc")
self.assertEqual(response.json()["staff_account"]["role"], "admin")
def test_refresh_returns_rotated_tokens(self):
response = self.client.post(
"/auth/refresh",
json={"refresh_token": "refresh-abc"},
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["access_token"], "token-new")
self.assertEqual(response.json()["refresh_token"], "refresh-new")
def test_refresh_rejects_invalid_token(self):
response = self.client.post(
"/auth/refresh",
json={"refresh_token": "refresh-invalido"},
)
self.assertEqual(response.status_code, 401)
self.assertEqual(response.json()["detail"], "Refresh token administrativo invalido.")
def test_logout_revokes_current_session(self):
response = self.client.post(
"/auth/logout",
headers={"Authorization": "Bearer token-abc", "User-Agent": "pytest"},
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["session_id"], 77)
self.assertEqual(response.json()["status"], "ok")
def test_me_returns_authenticated_staff_account(self):
response = self.client.get("/auth/me", headers={"Authorization": "Bearer token-abc"})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["email"], "admin@empresa.com")
self.assertEqual(response.json()["role"], "admin")
def test_system_access_returns_permissions_for_authenticated_staff(self):
response = self.client.get("/system/access", headers={"Authorization": "Bearer token-abc"})
self.assertEqual(response.status_code, 200)
self.assertIn("manage_settings", response.json()["permissions"])
self.assertEqual(response.json()["staff_account"]["role"], "admin")
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,100 @@
import unittest
from datetime import datetime, timezone
from fastapi.testclient import TestClient
from admin_app.api.dependencies import get_audit_service, get_current_staff_principal
from admin_app.app_factory import create_app
from admin_app.core import AuthenticatedStaffPrincipal, AdminSettings
from admin_app.db.models import AuditLog
from shared.contracts import StaffRole
class _FakeAuditService:
def list_recent(self, limit: int = 50) -> list[AuditLog]:
return [
AuditLog(
id=1,
actor_staff_account_id=10,
event_type="staff.login.succeeded",
resource_type="staff_account",
resource_id="10",
outcome="success",
message="Login administrativo concluido.",
payload_json={"session_id": 77},
ip_address="127.0.0.1",
user_agent="pytest",
created_at=datetime(2026, 3, 26, 12, 0, tzinfo=timezone.utc),
)
]
class AdminAuthorizationWebTests(unittest.TestCase):
def _build_client_with_role(self, role: StaffRole) -> tuple[TestClient, object]:
app = create_app(AdminSettings(admin_auth_token_secret="test-secret"))
app.dependency_overrides[get_current_staff_principal] = lambda: AuthenticatedStaffPrincipal(
id=10,
email="staff@empresa.com",
display_name="Equipe Interna",
role=role,
is_active=True,
)
app.dependency_overrides[get_audit_service] = lambda: _FakeAuditService()
return TestClient(app), app
def test_system_info_requires_authentication(self):
app = create_app(AdminSettings(admin_auth_token_secret="test-secret"))
client = TestClient(app)
response = client.get("/system/info")
self.assertEqual(response.status_code, 401)
self.assertEqual(response.json()["detail"], "Autenticacao administrativa obrigatoria.")
def test_viewer_can_access_system_info(self):
client, app = self._build_client_with_role(StaffRole.VIEWER)
try:
response = client.get("/system/info", headers={"Authorization": "Bearer token"})
finally:
app.dependency_overrides.clear()
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["service"], "orquestrador-admin")
def test_viewer_can_access_audit_events(self):
client, app = self._build_client_with_role(StaffRole.VIEWER)
try:
response = client.get("/audit/events", headers={"Authorization": "Bearer token"})
finally:
app.dependency_overrides.clear()
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["events"][0]["event_type"], "staff.login.succeeded")
def test_staff_cannot_access_admin_only_capability(self):
client, app = self._build_client_with_role(StaffRole.STAFF)
try:
response = client.get("/system/admin-capabilities", headers={"Authorization": "Bearer token"})
finally:
app.dependency_overrides.clear()
self.assertEqual(response.status_code, 403)
self.assertEqual(
response.json()["detail"],
"Permissao administrativa insuficiente: 'manage_settings'.",
)
def test_admin_can_access_admin_only_capability(self):
client, app = self._build_client_with_role(StaffRole.ADMIN)
try:
response = client.get("/system/admin-capabilities", headers={"Authorization": "Bearer token"})
finally:
app.dependency_overrides.clear()
self.assertEqual(response.status_code, 200)
self.assertTrue(response.json()["allowed"])
self.assertEqual(response.json()["role"], "admin")
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,62 @@
import unittest
from pydantic import ValidationError
from admin_app.core.security import AdminSecurityService
from admin_app.core.settings import AdminSettings
class AdminCredentialStrategyTests(unittest.TestCase):
def test_admin_settings_expose_secure_defaults_for_credentials(self):
settings = AdminSettings()
self.assertEqual(settings.admin_auth_password_hash_scheme, "pbkdf2_sha256")
self.assertEqual(settings.admin_auth_password_hash_iterations, 390000)
self.assertEqual(settings.admin_auth_password_min_length, 12)
self.assertEqual(settings.admin_auth_access_token_ttl_minutes, 30)
self.assertEqual(settings.admin_auth_refresh_token_ttl_days, 7)
self.assertFalse(settings.admin_bootstrap_enabled)
self.assertEqual(settings.admin_bootstrap_role, "admin")
def test_admin_settings_reject_insecure_password_policy(self):
with self.assertRaises(ValidationError):
AdminSettings(admin_auth_password_min_length=8)
with self.assertRaises(ValidationError):
AdminSettings(admin_auth_password_hash_iterations=50000)
def test_admin_settings_normalize_optional_bootstrap_values(self):
settings = AdminSettings(
admin_bootstrap_email=" ",
admin_bootstrap_display_name=" ",
admin_bootstrap_password=" ",
admin_auth_password_pepper=" ",
)
self.assertIsNone(settings.admin_bootstrap_email)
self.assertIsNone(settings.admin_bootstrap_display_name)
self.assertIsNone(settings.admin_bootstrap_password)
self.assertIsNone(settings.admin_auth_password_pepper)
def test_admin_security_service_builds_runtime_credential_strategy(self):
settings = AdminSettings(
admin_auth_password_pepper="secret-pepper",
admin_bootstrap_enabled=True,
admin_bootstrap_email="admin@empresa.com",
admin_bootstrap_display_name="Admin Inicial",
admin_bootstrap_password="SenhaMuitoSegura!123",
)
strategy = AdminSecurityService(settings).build_credential_strategy()
self.assertEqual(strategy.password.hash_scheme, "pbkdf2_sha256")
self.assertTrue(strategy.password.pepper_configured)
self.assertEqual(strategy.tokens.access_token_ttl_minutes, 30)
self.assertTrue(strategy.bootstrap.enabled)
self.assertEqual(strategy.bootstrap.email, "admin@empresa.com")
self.assertTrue(strategy.bootstrap.password_configured)
self.assertEqual(strategy.bootstrap.role, "admin")
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,63 @@
import ast
import unittest
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
ADMIN_ROOT = REPO_ROOT / "admin_app"
PRODUCT_ROOT = REPO_ROOT / "app"
FORBIDDEN_ADMIN_IMPORT_PREFIXES = (
"app.repositories.user_repository",
"app.services.user",
)
FORBIDDEN_ADMIN_MODULE_IMPORTS = {
("app.db.mock_models", "User"),
}
def _iter_python_files(root: Path):
for path in root.rglob("*.py"):
if "__pycache__" in path.parts:
continue
yield path
def _iter_imports(path: Path):
source = path.read_text(encoding="utf-8-sig")
tree = ast.parse(source, filename=str(path))
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
yield ("import", alias.name, None)
elif isinstance(node, ast.ImportFrom):
module = node.module or ""
for alias in node.names:
yield ("from", module, alias.name)
class AdminIdentityIsolationTests(unittest.TestCase):
def test_admin_app_does_not_depend_on_product_user_identity_modules(self):
violations: list[str] = []
for path in _iter_python_files(ADMIN_ROOT):
for import_kind, module, imported_name in _iter_imports(path):
if import_kind == "import" and module.startswith(FORBIDDEN_ADMIN_IMPORT_PREFIXES):
violations.append(f"{path.relative_to(REPO_ROOT)} imports {module}")
if import_kind == "from" and module.startswith(FORBIDDEN_ADMIN_IMPORT_PREFIXES):
violations.append(f"{path.relative_to(REPO_ROOT)} imports from {module}")
if import_kind == "from" and (module, imported_name) in FORBIDDEN_ADMIN_MODULE_IMPORTS:
violations.append(f"{path.relative_to(REPO_ROOT)} imports {imported_name} from {module}")
self.assertEqual(violations, [], "\n".join(violations))
def test_product_runtime_does_not_depend_on_admin_app_modules(self):
violations: list[str] = []
for path in _iter_python_files(PRODUCT_ROOT):
for _, module, _ in _iter_imports(path):
if module.startswith("admin_app"):
violations.append(f"{path.relative_to(REPO_ROOT)} imports {module}")
self.assertEqual(violations, [], "\n".join(violations))
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,56 @@
import unittest
from datetime import datetime, timedelta, timezone
from admin_app.core import AdminSecurityService, AdminSettings, AuthenticatedStaffPrincipal
from shared.contracts import StaffRole
class AdminSecurityServiceTests(unittest.TestCase):
def setUp(self):
self.settings = AdminSettings(
admin_auth_token_secret="test-secret",
admin_auth_password_pepper="pepper",
)
self.security_service = AdminSecurityService(self.settings)
def test_hash_password_and_verify_round_trip(self):
password_hash = self.security_service.hash_password("SenhaMuitoSegura!123")
self.assertTrue(self.security_service.verify_password("SenhaMuitoSegura!123", password_hash))
self.assertFalse(self.security_service.verify_password("senha-errada", password_hash))
def test_validate_password_strength_rejects_weak_password(self):
with self.assertRaises(ValueError):
self.security_service.validate_password_strength("fraca")
def test_issue_and_decode_access_token_round_trip(self):
principal = AuthenticatedStaffPrincipal(
id=7,
email="admin@empresa.com",
display_name="Admin",
role=StaffRole.ADMIN,
is_active=True,
)
token = self.security_service.issue_access_token(principal, session_id=99)
claims = self.security_service.decode_access_token(token)
self.assertEqual(claims.sub, "7")
self.assertEqual(claims.sid, 99)
self.assertEqual(claims.email, "admin@empresa.com")
self.assertEqual(claims.role, StaffRole.ADMIN)
self.assertEqual(claims.token_type, "access")
def test_refresh_token_hash_is_stable_for_same_token(self):
refresh_token = self.security_service.generate_refresh_token()
self.assertEqual(
self.security_service.hash_refresh_token(refresh_token),
self.security_service.hash_refresh_token(refresh_token),
)
def test_build_refresh_token_expiry_uses_refresh_ttl(self):
expires_at = self.security_service.build_refresh_token_expiry()
min_expected = datetime.now(timezone.utc) + timedelta(days=6)
self.assertGreater(expires_at, min_expected)
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,49 @@
import unittest
from admin_app.db.models import StaffAccount
from shared.contracts import StaffRole
class StaffAccountModelTests(unittest.TestCase):
def test_staff_account_declares_expected_table_and_columns(self):
self.assertEqual(StaffAccount.__tablename__, "staff_accounts")
self.assertIn("email", StaffAccount.__table__.columns)
self.assertIn("display_name", StaffAccount.__table__.columns)
self.assertIn("password_hash", StaffAccount.__table__.columns)
self.assertIn("role", StaffAccount.__table__.columns)
self.assertIn("is_active", StaffAccount.__table__.columns)
self.assertIn("last_login_at", StaffAccount.__table__.columns)
self.assertIn("created_at", StaffAccount.__table__.columns)
self.assertIn("updated_at", StaffAccount.__table__.columns)
def test_staff_account_role_column_uses_shared_hierarchy_values(self):
role_column = StaffAccount.__table__.columns["role"]
enum_values = tuple(role_column.type.enums)
self.assertEqual(
enum_values,
(
StaffRole.VIEWER.value,
StaffRole.STAFF.value,
StaffRole.ADMIN.value,
),
)
def test_staff_account_can_hold_admin_identity_fields(self):
account = StaffAccount(
email="admin@empresa.com",
display_name="Administrador Interno",
password_hash="hashed-secret",
role=StaffRole.ADMIN,
is_active=True,
)
self.assertEqual(account.email, "admin@empresa.com")
self.assertEqual(account.display_name, "Administrador Interno")
self.assertEqual(account.password_hash, "hashed-secret")
self.assertEqual(account.role, StaffRole.ADMIN)
self.assertTrue(account.is_active)
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,24 @@
import unittest
from admin_app.db.models import StaffSession
class StaffSessionModelTests(unittest.TestCase):
def test_staff_session_declares_expected_table_and_columns(self):
self.assertEqual(StaffSession.__tablename__, "staff_sessions")
self.assertIn("staff_account_id", StaffSession.__table__.columns)
self.assertIn("refresh_token_hash", StaffSession.__table__.columns)
self.assertIn("expires_at", StaffSession.__table__.columns)
self.assertIn("last_used_at", StaffSession.__table__.columns)
self.assertIn("revoked_at", StaffSession.__table__.columns)
self.assertIn("ip_address", StaffSession.__table__.columns)
self.assertIn("user_agent", StaffSession.__table__.columns)
def test_staff_session_uses_staff_account_foreign_key(self):
foreign_keys = list(StaffSession.__table__.columns["staff_account_id"].foreign_keys)
self.assertEqual(len(foreign_keys), 1)
self.assertEqual(str(foreign_keys[0].target_fullname), "staff_accounts.id")
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save