from __future__ import annotations from collections.abc import Iterable from shared.contracts import ( PRODUCT_OPERATIONAL_DATASETS, SYSTEM_FUNCTIONAL_CONFIGURATIONS, FunctionalConfigurationPropagation, ) ALLOWED_ADMIN_WRITE_TABLES: tuple[str, ...] = ( "admin_audit_logs", "staff_accounts", "staff_sessions", "tool_drafts", "tool_versions", ) class AdminWriteGovernanceViolation(RuntimeError): """Raised when the admin runtime attempts an ungoverned direct write.""" def ensure_direct_admin_write_allowed(table_name: str) -> None: normalized_table_name = str(table_name or "").strip().lower() if normalized_table_name in ALLOWED_ADMIN_WRITE_TABLES: return raise AdminWriteGovernanceViolation( "Escrita direta do admin bloqueada para a tabela " f"'{normalized_table_name or 'desconhecida'}'. " "Use um fluxo governado, versionado e auditavel antes de publicar qualquer efeito no product." ) def enforce_admin_session_write_governance( *, new: Iterable[object] = (), dirty: Iterable[object] = (), deleted: Iterable[object] = (), ) -> None: seen_tables: set[str] = set() for instance in (*tuple(new), *tuple(dirty), *tuple(deleted)): table_name = _resolve_table_name(instance) if table_name is None or table_name in seen_tables: continue ensure_direct_admin_write_allowed(table_name) seen_tables.add(table_name) def build_admin_write_governance_payload() -> dict: governed_configuration_keys = sorted( configuration.config_key for configuration in SYSTEM_FUNCTIONAL_CONFIGURATIONS if configuration.propagation == FunctionalConfigurationPropagation.VERSIONED_PUBLICATION ) return { "mode": "admin_internal_tables_only", "allowed_direct_write_tables": list(ALLOWED_ADMIN_WRITE_TABLES), "blocked_operational_dataset_keys": sorted( dataset.dataset_key for dataset in PRODUCT_OPERATIONAL_DATASETS ), "blocked_product_source_tables": sorted( {dataset.source_table for dataset in PRODUCT_OPERATIONAL_DATASETS} ), "governed_configuration_keys": governed_configuration_keys, "enforcement_points": [ "AdminSession.before_flush bloqueia escrita ORM fora do allowlist interno do admin.", "Contratos compartilhados mantem datasets operacionais com write_allowed=false.", "Configuracoes que afetam o runtime do product seguem versioned_publication antes de qualquer efeito operacional.", ], "governance_rules": [ "O admin nao escreve diretamente nas tabelas operacionais do product.", "Toda alteracao com efeito no product nasce como estado administrativo versionado.", "O product consome apenas configuracao publicada e aprovada.", ], } def build_admin_write_governance_source_payload() -> dict: return { "key": "admin_write_governance", "source": "runtime_guard", "mutable": False, "description": ( "Guard no AdminSession bloqueia escrita ORM fora das tabelas internas do admin e preserva a governanca versionada antes de qualquer efeito no product." ), } def _resolve_table_name(instance: object) -> str | None: table = getattr(instance, "__table__", None) if table is not None: table_name = getattr(table, "name", None) if table_name: return str(table_name).strip().lower() class_table_name = getattr(type(instance), "__tablename__", None) if class_table_name: return str(class_table_name).strip().lower() instance_table_name = getattr(instance, "__tablename__", None) if instance_table_name: return str(instance_table_name).strip().lower() return None