from __future__ import annotations import re from datetime import UTC, datetime from sqlalchemy.orm import Session from admin_app.catalogs import BOOTSTRAP_TOOL_CATALOG, INTAKE_DOMAIN_OPTIONS from admin_app.core.settings import AdminSettings from admin_app.db.models import ToolDraft, ToolMetadata, ToolVersion from admin_app.db.models.tool_artifact import ( ToolArtifactKind, ToolArtifactStage, ToolArtifactStatus, ) from admin_app.repositories.tool_artifact_repository import ToolArtifactRepository from admin_app.repositories.tool_draft_repository import ToolDraftRepository from admin_app.repositories.tool_metadata_repository import ToolMetadataRepository from admin_app.repositories.tool_version_repository import ToolVersionRepository from shared.contracts import ( AdminPermission, GENERATED_TOOL_ENTRYPOINT, GENERATED_TOOLS_PACKAGE, ServiceName, StaffRole, TOOL_LIFECYCLE_STAGES, ToolLifecycleStatus, ToolParameterType, build_generated_tool_module_name, build_generated_tool_module_path, normalize_staff_role, role_has_permission, ) _PARAMETER_TYPE_DESCRIPTIONS = { ToolParameterType.STRING: "Texto livre, codigos e identificadores.", ToolParameterType.INTEGER: "Valores inteiros para limites, anos e contagens.", ToolParameterType.NUMBER: "Valores numericos decimais, como preco e diaria.", ToolParameterType.BOOLEAN: "Marcadores verdadeiro ou falso para decisoes operacionais.", ToolParameterType.OBJECT: "Estruturas compostas para payloads complexos.", ToolParameterType.ARRAY: "Colecoes ordenadas de valores.", } _TOOL_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]{2,63}$") _PARAMETER_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]{1,63}$") _RESERVED_CORE_TOOL_NAMES = frozenset(entry.tool_name for entry in BOOTSTRAP_TOOL_CATALOG) _PUBLISHED_TOOL_STATUSES = (ToolLifecycleStatus.ACTIVE,) _REVIEW_QUEUE_STATUSES = ( ToolLifecycleStatus.DRAFT, ToolLifecycleStatus.GENERATED, ToolLifecycleStatus.VALIDATED, ToolLifecycleStatus.APPROVED, ToolLifecycleStatus.FAILED, ) class ToolManagementService: def __init__( self, settings: AdminSettings, draft_repository: ToolDraftRepository | None = None, version_repository: ToolVersionRepository | None = None, metadata_repository: ToolMetadataRepository | None = None, artifact_repository: ToolArtifactRepository | None = None, ): self.settings = settings self.draft_repository = draft_repository self.version_repository = version_repository self.metadata_repository = metadata_repository self.artifact_repository = artifact_repository def _resolve_repository_session(self) -> Session | None: repository_sessions = [ repository.db for repository in ( self.draft_repository, self.version_repository, self.metadata_repository, self.artifact_repository, ) if getattr(repository, "db", None) is not None ] if not repository_sessions: return None primary_session = repository_sessions[0] for repository_session in repository_sessions[1:]: if repository_session is not primary_session: raise RuntimeError("Tool governance repositories must share the same admin database session.") return primary_session @staticmethod def _commit_repository_session( repository_session: Session, *, draft: ToolDraft, version: ToolVersion | None = None, ) -> None: repository_session.commit() repository_session.refresh(draft) if version is not None: repository_session.refresh(version) def _build_submission_policy( self, *, submitter_role: StaffRole | str | None = None, ) -> dict: normalized_role = normalize_staff_role(submitter_role) if submitter_role is not None else None submitter_can_publish_now = ( role_has_permission(normalized_role, AdminPermission.PUBLISH_TOOLS) if normalized_role is not None else False ) return { "mode": "draft_only", "submitter_role": normalized_role, "submitter_can_publish_now": submitter_can_publish_now, "direct_publication_blocked": True, "requires_director_approval": True, "required_approver_role": StaffRole.DIRETOR, "required_review_permission": AdminPermission.REVIEW_TOOL_GENERATIONS, "required_publish_permission": AdminPermission.PUBLISH_TOOLS, } def build_overview_payload(self) -> dict: catalog_payload = self.build_publications_payload() catalog = catalog_payload["publications"] persisted_draft_count = len(self.draft_repository.list_drafts()) if self.draft_repository else 0 persisted_version_count = 0 if self.version_repository is not None: persisted_version_count = len(self.version_repository.list_versions()) elif self.draft_repository is not None: persisted_version_count = sum(draft.version_count for draft in self.draft_repository.list_drafts()) persisted_metadata_count = len(self.metadata_repository.list_metadata()) if self.metadata_repository else 0 persisted_artifact_count = len(self.artifact_repository.list_artifacts()) if self.artifact_repository else 0 return { "mode": "admin_tool_draft_governance", "metrics": [ { "key": "active_catalog", "label": "Tools mapeadas", "value": str(len(catalog)), "description": "Catalogo governado persistido quando disponivel, com fallback bootstrap enquanto o admin ainda nao tiver metadados proprios.", }, { "key": "lifecycle_stages", "label": "Etapas de lifecycle", "value": str(len(TOOL_LIFECYCLE_STAGES)), "description": "Estados compartilhados entre governanca administrativa e publicacao.", }, { "key": "parameter_types", "label": "Tipos de parametro", "value": str(len(ToolParameterType)), "description": "Tipos aceitos pelo contrato inicial de publicacao de tools.", }, { "key": "persisted_drafts", "label": "Drafts persistidos", "value": str(persisted_draft_count), "description": "Pre-cadastros administrativos ja gravados no armazenamento proprio do admin.", }, { "key": "persisted_versions", "label": "Versoes administrativas", "value": str(persisted_version_count), "description": "Historico versionado das iteracoes de cada tool governada pelo admin.", }, { "key": "persisted_metadata", "label": "Metadados persistidos", "value": str(persisted_metadata_count), "description": "Snapshots canonicos por versao com nome, descricao, parametros, status e autor da tool.", }, { "key": "persisted_artifacts", "label": "Artefatos auditaveis", "value": str(persisted_artifact_count), "description": "Manifestos de geracao e relatorios de validacao gravados por versao para trilha administrativa.", }, ], "workflow": self.build_lifecycle_payload(), "next_steps": [ "Persistir artefatos de geracao e validacao por versao sem perder o historico administrativo.", "Abrir filas de revisao, aprovacao e ativacao com auditoria ponta a ponta.", "Conectar publicacoes versionadas ao runtime de produto com rollback controlado.", ], } def build_contracts_payload(self) -> dict: return { "publication_source_service": ServiceName.ADMIN, "publication_target_service": ServiceName.PRODUCT, "lifecycle_statuses": self.build_lifecycle_payload(), "parameter_types": [ { "code": parameter_type, "label": parameter_type.value.upper(), "description": _PARAMETER_TYPE_DESCRIPTIONS[parameter_type], } for parameter_type in ToolParameterType ], "publication_fields": [ "source_service", "target_service", "publication_id", "published_tool", "emitted_at", ], "published_tool_fields": [ "tool_name", "display_name", "description", "version", "status", "parameters", "implementation_module", "implementation_callable", "checksum", "published_at", "published_by", ], } def build_draft_form_payload( self, *, submitter_role: StaffRole | str | None = None, ) -> dict: submission_policy = self._build_submission_policy(submitter_role=submitter_role) submitter_note = ( "Sua sessao pode cadastrar e salvar o draft, mas nao publica a tool diretamente." if not submission_policy["submitter_can_publish_now"] else "Mesmo com permissao de publicacao, este formulario sempre salva a tool primeiro como draft versionado." ) return { "mode": "validated_preview", "submission_policy": submission_policy, "domain_options": [ { "value": option.value, "label": option.label, "description": option.description, } for option in INTAKE_DOMAIN_OPTIONS ], "parameter_types": [ { "code": parameter_type, "label": parameter_type.value.upper(), "description": _PARAMETER_TYPE_DESCRIPTIONS[parameter_type], } for parameter_type in ToolParameterType ], "naming_rules": [ "tool_name deve usar snake_case minusculo, sem espacos, com 3 a 64 caracteres.", "tool_name nao pode reutilizar nomes reservados pelo catalogo core ja publicado.", "display_name deve explicar claramente a acao operacional que o bot vai executar.", "Cada parametro precisa de nome, tipo, descricao e marcador de obrigatoriedade.", ], "submission_notes": [ "O colaborador pode preencher, validar e persistir o draft da tool no painel.", submitter_note, "Toda tool nova segue para revisao e aprovacao de um diretor antes de qualquer publicacao.", "Reenvios da mesma tool reaproveitam o draft raiz e geram uma nova versao administrativa.", ], "approval_notes": [ "Diretor revisa objetivo, parametros e aderencia ao contrato compartilhado.", "A publicacao para o runtime de produto so pode acontecer apos aprovacao humana.", "Campos livres e payloads complexos exigem criterio maior na etapa de revisao.", ], } def build_drafts_payload(self) -> dict: if self.draft_repository is None: return { "storage_status": "pending_persistence", "message": ( "A nova tela de cadastro ja valida o pre-cadastro da tool no painel, mas a persistencia de ToolDraft ainda nao foi conectada neste runtime." ), "drafts": [], "supported_statuses": [ToolLifecycleStatus.DRAFT], } drafts = self.draft_repository.list_drafts(statuses=(ToolLifecycleStatus.DRAFT,)) message = ( "Nenhum draft administrativo salvo ainda." if not drafts else f"{len(drafts)} draft(s) administrativo(s) salvo(s) no admin com historico versionado." ) return { "storage_status": "admin_database", "message": message, "drafts": [self._serialize_draft_summary(draft) for draft in drafts], "supported_statuses": [ToolLifecycleStatus.DRAFT], } def build_review_queue_payload(self) -> dict: queued_versions = self._list_latest_versions(statuses=_REVIEW_QUEUE_STATUSES) message = ( "Nenhuma versao aguardando revisao, aprovacao ou publicacao de diretor." if not queued_versions else f"{len(queued_versions)} versao(oes) aguardando atuacao de diretor antes da ativacao." ) return { "queue_mode": "governed_admin_queue", "message": message, "items": [self._serialize_review_queue_entry(version) for version in queued_versions], "supported_statuses": list(_REVIEW_QUEUE_STATUSES), } def build_publications_payload(self) -> dict: publications_by_tool_name = { publication["tool_name"]: publication for publication in self.list_publication_catalog() } published_metadata_entries = self._list_latest_metadata_entries( statuses=_PUBLISHED_TOOL_STATUSES, ) if published_metadata_entries: for metadata in published_metadata_entries: publications_by_tool_name[metadata.tool_name] = self._serialize_metadata_publication( metadata ) return { "source": "hybrid_runtime_catalog", "target_service": ServiceName.PRODUCT, "publications": list(publications_by_tool_name.values()), } return { "source": "bootstrap_catalog", "target_service": ServiceName.PRODUCT, "publications": list(publications_by_tool_name.values()), } def review_version( self, version_id: str, *, reviewer_staff_account_id: int, reviewer_name: str, reviewer_role: StaffRole | str, ) -> dict: return self._transition_version_status( version_id, target_status=ToolLifecycleStatus.VALIDATED, allowed_current_statuses=( ToolLifecycleStatus.DRAFT, ToolLifecycleStatus.GENERATED, ), actor_staff_account_id=reviewer_staff_account_id, actor_name=reviewer_name, actor_role=reviewer_role, required_permission=AdminPermission.REVIEW_TOOL_GENERATIONS, artifact_kind=ToolArtifactKind.DIRECTOR_REVIEW, artifact_summary="Revisao inicial de diretor registrada para a versao governada.", success_message="Versao revisada por diretor com sucesso e pronta para aprovacao.", next_steps=[ "A diretoria ainda precisa aprovar formalmente a versao antes da publicacao.", "Depois da aprovacao, a publicacao ativa a tool no catalogo governado do produto.", ], ) def approve_version( self, version_id: str, *, approver_staff_account_id: int, approver_name: str, approver_role: StaffRole | str, ) -> dict: return self._transition_version_status( version_id, target_status=ToolLifecycleStatus.APPROVED, allowed_current_statuses=(ToolLifecycleStatus.VALIDATED,), actor_staff_account_id=approver_staff_account_id, actor_name=approver_name, actor_role=approver_role, required_permission=AdminPermission.REVIEW_TOOL_GENERATIONS, artifact_kind=ToolArtifactKind.DIRECTOR_APPROVAL, artifact_summary="Aprovacao de diretor registrada para a versao governada.", success_message="Versao aprovada por diretor com sucesso e pronta para publicacao.", next_steps=[ "A publicacao administrativa ainda precisa ser executada antes da ativacao.", "Enquanto a versao estiver apenas aprovada, ela permanece fora do catalogo ativo do produto.", ], ) def publish_version( self, version_id: str, *, publisher_staff_account_id: int, publisher_name: str, publisher_role: StaffRole | str, ) -> dict: return self._transition_version_status( version_id, target_status=ToolLifecycleStatus.ACTIVE, allowed_current_statuses=(ToolLifecycleStatus.APPROVED,), actor_staff_account_id=publisher_staff_account_id, actor_name=publisher_name, actor_role=publisher_role, required_permission=AdminPermission.PUBLISH_TOOLS, artifact_kind=ToolArtifactKind.PUBLICATION_RELEASE, artifact_summary="Publicacao administrativa concluida pela diretoria antes da ativacao.", success_message="Versao publicada com sucesso e ativada no catalogo governado.", next_steps=[ "A versao ativa agora pode ser consumida pelo runtime governado do produto.", "Se uma nova versao for publicada para a mesma tool, a ativa anterior sera arquivada automaticamente.", ], ) def _transition_version_status( self, version_id: str, *, target_status: ToolLifecycleStatus, allowed_current_statuses: tuple[ToolLifecycleStatus, ...], actor_staff_account_id: int, actor_name: str, actor_role: StaffRole | str, required_permission: AdminPermission, artifact_kind: ToolArtifactKind, artifact_summary: str, success_message: str, next_steps: list[str], ) -> dict: normalized_role = normalize_staff_role(actor_role) if not role_has_permission(normalized_role, required_permission): raise PermissionError( f"Papel '{normalized_role.value}' sem permissao administrativa '{required_permission.value}'." ) if ( self.draft_repository is None or self.version_repository is None or self.metadata_repository is None ): raise RuntimeError( "Fluxo de governanca de tools ainda nao esta completamente conectado ao armazenamento administrativo." ) normalized_version_id = str(version_id or "").strip().lower() version = self.version_repository.get_by_version_id(normalized_version_id) if version is None: raise LookupError("Versao administrativa nao encontrada.") latest_versions_for_tool = self.version_repository.list_versions(tool_name=version.tool_name) if latest_versions_for_tool and latest_versions_for_tool[0].version_id != version.version_id: raise ValueError( "Somente a versao mais recente da tool pode seguir para revisao, aprovacao e publicacao." ) if version.status not in allowed_current_statuses: expected_statuses = ", ".join(status.value for status in allowed_current_statuses) raise ValueError( f"A transicao solicitada exige status em ({expected_statuses}), mas a versao esta em '{version.status.value}'." ) draft = self.draft_repository.get_by_tool_name(version.tool_name) if draft is None: raise RuntimeError("Draft raiz da tool nao encontrado para a versao governada.") metadata = self.metadata_repository.get_by_tool_version_id(version.id) if metadata is None: raise RuntimeError("Metadados persistidos da versao nao encontrados para a governanca administrativa.") previous_status = version.status repository_session = self._resolve_repository_session() atomic_write_options = {"commit": False} if repository_session is not None else {} artifact_commit = False if repository_session is not None else None try: if target_status == ToolLifecycleStatus.ACTIVE: self._archive_active_publications( tool_name=version.tool_name, excluding_version_id=version.id, **atomic_write_options, ) self.version_repository.update_status( version, status=target_status, **atomic_write_options, ) self.metadata_repository.update_status( metadata, status=target_status, **atomic_write_options, ) self.draft_repository.update_status( draft, status=target_status, **atomic_write_options, ) self._persist_governance_artifact( draft=draft, version=version, artifact_kind=artifact_kind, summary=artifact_summary, previous_status=previous_status, current_status=target_status, actor_staff_account_id=actor_staff_account_id, actor_name=actor_name, actor_role=normalized_role, commit=artifact_commit, ) if repository_session is not None: self._commit_repository_session( repository_session, draft=draft, version=version, ) except Exception: if repository_session is not None: repository_session.rollback() raise queue_entry = None publication = None if target_status == ToolLifecycleStatus.ACTIVE: publication = self._serialize_metadata_publication(metadata) else: queue_entry = self._serialize_review_queue_entry(version) return { "message": success_message, "version_id": version.version_id, "tool_name": version.tool_name, "version_number": version.version_number, "status": target_status, "queue_entry": queue_entry, "publication": publication, "next_steps": next_steps, } def create_draft_submission( self, payload: dict, *, owner_staff_account_id: int | None = None, owner_name: str | None = None, owner_role: StaffRole | str | None = None, ) -> dict: normalized = self._normalize_draft_payload(payload) warnings = self._build_intake_warnings(normalized) required_parameter_count = sum(1 for parameter in normalized["parameters"] if parameter["required"]) summary = self._build_draft_summary(normalized) stored_parameters = self._serialize_parameters_for_storage(normalized["parameters"]) submission_policy = self._build_submission_policy(submitter_role=owner_role) if self.draft_repository is None: version_number = 1 version_count = 1 version_id = self._build_preview_version_id(normalized["tool_name"], version_number) return { "storage_status": "validated_preview", "message": "Pre-cadastro validado no painel sem publicacao direta. A persistencia definitiva entra na fase de governanca de tools.", "submission_policy": submission_policy, "draft_preview": { "draft_id": f"preview::{normalized['tool_name']}", "version_id": version_id, "tool_name": normalized["tool_name"], "display_name": normalized["display_name"], "domain": normalized["domain"], "status": ToolLifecycleStatus.DRAFT, "summary": summary, "business_goal": normalized["business_goal"], "version_number": version_number, "version_count": version_count, "parameter_count": len(normalized["parameters"]), "required_parameter_count": required_parameter_count, "requires_director_approval": True, "owner_name": owner_name, "parameters": normalized["parameters"], }, "warnings": warnings, "next_steps": [ "Persistir o draft administrativo em armazenamento proprio do admin na fase 5.", "Encaminhar a tool para revisao e aprovacao de um diretor.", "Executar pipeline de geracao, validacao e publicacao antes da ativacao no produto.", ], } if owner_staff_account_id is None: raise ValueError("owner_staff_account_id e obrigatorio para persistir o draft.") repository_session = self._resolve_repository_session() atomic_write_options = {"commit": False} if repository_session is not None else {} artifact_commit = False if repository_session is not None else None owner_display_name = owner_name or "Autor administrativo" existing_draft = self.draft_repository.get_by_tool_name(normalized["tool_name"]) next_version_number = self._resolve_next_version_number(normalized["tool_name"], existing_draft) next_version_count = next_version_number if existing_draft is None else max(existing_draft.version_count + 1, next_version_number) try: if existing_draft is None: draft = self.draft_repository.create( tool_name=normalized["tool_name"], display_name=normalized["display_name"], domain=normalized["domain"], description=normalized["description"], business_goal=normalized["business_goal"], summary=summary, parameters_json=stored_parameters, required_parameter_count=required_parameter_count, current_version_number=next_version_number, version_count=next_version_count, owner_staff_account_id=owner_staff_account_id, owner_display_name=owner_display_name, requires_director_approval=True, **atomic_write_options, ) else: draft = self.draft_repository.update_submission( existing_draft, display_name=normalized["display_name"], domain=normalized["domain"], description=normalized["description"], business_goal=normalized["business_goal"], summary=summary, parameters_json=stored_parameters, required_parameter_count=required_parameter_count, current_version_number=next_version_number, version_count=next_version_count, owner_staff_account_id=owner_staff_account_id, owner_display_name=owner_display_name, requires_director_approval=True, **atomic_write_options, ) version = None if self.version_repository is not None: version = self.version_repository.create( draft_id=draft.id, tool_name=draft.tool_name, version_number=next_version_number, summary=summary, description=normalized["description"], business_goal=normalized["business_goal"], parameters_json=stored_parameters, required_parameter_count=required_parameter_count, owner_staff_account_id=owner_staff_account_id, owner_display_name=owner_display_name, status=ToolLifecycleStatus.DRAFT, requires_director_approval=True, **atomic_write_options, ) if version is not None and self.metadata_repository is not None: self.metadata_repository.upsert_version_metadata( draft_id=draft.id, tool_version_id=version.id, tool_name=draft.tool_name, display_name=draft.display_name, domain=draft.domain, description=draft.description, parameters_json=stored_parameters, version_number=version.version_number, status=version.status, author_staff_account_id=version.owner_staff_account_id, author_display_name=version.owner_display_name, **atomic_write_options, ) if version is not None and self.artifact_repository is not None: self._persist_initial_version_artifacts( draft=draft, version=version, summary=summary, warnings=warnings, stored_parameters=stored_parameters, required_parameter_count=required_parameter_count, owner_staff_account_id=owner_staff_account_id, owner_name=owner_display_name, commit=artifact_commit, ) if repository_session is not None: self._commit_repository_session( repository_session, draft=draft, version=version, ) except Exception: if repository_session is not None: repository_session.rollback() raise return { "storage_status": "admin_database", "message": "Draft administrativo persistido com sucesso sem publicacao direta, em fluxo versionado e governado.", "submission_policy": submission_policy, "draft_preview": self._serialize_draft_preview(draft, version), "warnings": warnings, "next_steps": [ f"Encaminhar a versao v{draft.current_version_number} para revisao e aprovacao de um diretor.", "Conectar a versao persistida ao pipeline de geracao e validacao automatica da tool.", "Persistir artefatos e publicacoes associados a cada versao governada.", ], } def preview_draft_submission( self, payload: dict, *, owner_name: str | None = None, owner_role: StaffRole | str | None = None, ) -> dict: normalized = self._normalize_draft_payload(payload) warnings = self._build_intake_warnings(normalized) required_parameter_count = sum(1 for parameter in normalized["parameters"] if parameter["required"]) summary = self._build_draft_summary(normalized) submission_policy = self._build_submission_policy(submitter_role=owner_role) existing_draft = None if self.draft_repository is not None: existing_draft = self.draft_repository.get_by_tool_name(normalized["tool_name"]) version_number = self._resolve_next_version_number(normalized["tool_name"], existing_draft) version_count = version_number if existing_draft is None else max(existing_draft.version_count + 1, version_number) return { "storage_status": "validated_preview", "message": "Pre-cadastro validado no painel com numeracao de versao reservada para a tool, sem publicacao direta nesta etapa.", "submission_policy": submission_policy, "draft_preview": { "draft_id": existing_draft.draft_id if existing_draft is not None else f"preview::{normalized['tool_name']}", "version_id": self._build_preview_version_id(normalized["tool_name"], version_number), "tool_name": normalized["tool_name"], "display_name": normalized["display_name"], "domain": normalized["domain"], "status": ToolLifecycleStatus.DRAFT, "summary": summary, "business_goal": normalized["business_goal"], "version_number": version_number, "version_count": version_count, "parameter_count": len(normalized["parameters"]), "required_parameter_count": required_parameter_count, "requires_director_approval": True, "owner_name": owner_name, "parameters": normalized["parameters"], }, "warnings": warnings, "next_steps": [ "Persistir a nova versao administrativa para consolidar o historico da tool.", "Encaminhar a versao para revisao e aprovacao de um diretor.", "Executar pipeline de geracao, validacao e publicacao antes da ativacao no produto.", ], } def build_lifecycle_payload(self) -> list[dict]: return [ { "code": stage.code, "label": stage.label, "description": stage.description, "order": stage.order, "terminal": stage.terminal, } for stage in TOOL_LIFECYCLE_STAGES ] def list_publication_catalog(self) -> list[dict]: published_at = datetime.now(UTC) return [ { "publication_id": f"bootstrap::{entry.tool_name}::v1", "tool_name": entry.tool_name, "display_name": entry.display_name, "description": entry.description, "domain": entry.domain, "version": 1, "status": ToolLifecycleStatus.ACTIVE, "parameter_count": entry.parameter_count, "implementation_module": "app.services.tools.handlers", "implementation_callable": entry.tool_name, "published_by": "bootstrap_catalog", "published_at": published_at, } for entry in BOOTSTRAP_TOOL_CATALOG ] def _archive_active_publications( self, *, tool_name: str, excluding_version_id: int, commit: bool = True, ) -> None: if self.version_repository is not None: for active_version in self.version_repository.list_versions( tool_name=tool_name, statuses=(ToolLifecycleStatus.ACTIVE,), ): if active_version.id == excluding_version_id: continue self.version_repository.update_status( active_version, status=ToolLifecycleStatus.ARCHIVED, commit=commit, ) if self.metadata_repository is not None: for active_metadata in self.metadata_repository.list_metadata( tool_name=tool_name, statuses=(ToolLifecycleStatus.ACTIVE,), ): if active_metadata.tool_version_id == excluding_version_id: continue self.metadata_repository.update_status( active_metadata, status=ToolLifecycleStatus.ARCHIVED, commit=commit, ) def _persist_governance_artifact( self, *, draft: ToolDraft, version: ToolVersion, artifact_kind: ToolArtifactKind, summary: str, previous_status: ToolLifecycleStatus, current_status: ToolLifecycleStatus, actor_staff_account_id: int, actor_name: str, actor_role: StaffRole, commit: bool | None = None, ) -> None: if self.artifact_repository is None: return artifact_write_options = {"commit": commit} if commit is not None else {} self.artifact_repository.upsert_version_artifact( draft_id=draft.id, tool_version_id=version.id, tool_name=version.tool_name, version_number=version.version_number, artifact_stage=ToolArtifactStage.GOVERNANCE, artifact_kind=artifact_kind, artifact_status=ToolArtifactStatus.SUCCEEDED, summary=summary, payload_json=self._build_governance_artifact_payload( version=version, artifact_kind=artifact_kind, previous_status=previous_status, current_status=current_status, actor_staff_account_id=actor_staff_account_id, actor_name=actor_name, actor_role=actor_role, ), author_staff_account_id=actor_staff_account_id, author_display_name=actor_name, **artifact_write_options, ) @staticmethod def _build_governance_artifact_payload( *, version: ToolVersion, artifact_kind: ToolArtifactKind, previous_status: ToolLifecycleStatus, current_status: ToolLifecycleStatus, actor_staff_account_id: int, actor_name: str, actor_role: StaffRole, ) -> dict: return { "source": "director_governance", "action": artifact_kind.value, "tool_name": version.tool_name, "version_id": version.version_id, "version_number": version.version_number, "previous_status": previous_status.value, "current_status": current_status.value, "actor_staff_account_id": actor_staff_account_id, "actor_display_name": actor_name, "actor_role": actor_role.value, } def _persist_initial_version_artifacts( self, *, draft: ToolDraft, version: ToolVersion, summary: str, warnings: list[str], stored_parameters: list[dict], required_parameter_count: int, owner_staff_account_id: int, owner_name: str, commit: bool | None = None, ) -> None: if self.artifact_repository is None: return artifact_write_options = {"commit": commit} if commit is not None else {} generation_payload = self._build_generation_artifact_payload( draft=draft, version=version, summary=summary, stored_parameters=stored_parameters, ) validation_payload = self._build_validation_artifact_payload( draft=draft, version=version, warnings=warnings, stored_parameters=stored_parameters, required_parameter_count=required_parameter_count, ) self.artifact_repository.upsert_version_artifact( draft_id=draft.id, tool_version_id=version.id, tool_name=draft.tool_name, version_number=version.version_number, artifact_stage=ToolArtifactStage.GENERATION, artifact_kind=ToolArtifactKind.GENERATION_REQUEST, artifact_status=ToolArtifactStatus.PENDING, summary="Manifesto inicial de geracao persistido para auditoria da versao.", payload_json=generation_payload, author_staff_account_id=owner_staff_account_id, author_display_name=owner_name, **artifact_write_options, ) self.artifact_repository.upsert_version_artifact( draft_id=draft.id, tool_version_id=version.id, tool_name=draft.tool_name, version_number=version.version_number, artifact_stage=ToolArtifactStage.VALIDATION, artifact_kind=ToolArtifactKind.VALIDATION_REPORT, artifact_status=ToolArtifactStatus.SUCCEEDED, summary="Relatorio de validacao do pre-cadastro persistido para auditoria da versao.", payload_json=validation_payload, author_staff_account_id=owner_staff_account_id, author_display_name=owner_name, **artifact_write_options, ) @staticmethod def _build_generation_artifact_payload( *, draft: ToolDraft, version: ToolVersion, summary: str, stored_parameters: list[dict], ) -> dict: return { "source": "admin_draft_intake", "tool_name": draft.tool_name, "display_name": draft.display_name, "domain": draft.domain, "version_number": version.version_number, "draft_id": draft.draft_id, "version_id": version.version_id, "business_goal": draft.business_goal, "description": draft.description, "summary": summary, "parameters": list(stored_parameters), "requires_director_approval": draft.requires_director_approval, "target_package": GENERATED_TOOLS_PACKAGE, "target_module": build_generated_tool_module_name(draft.tool_name), "target_file_path": build_generated_tool_module_path(draft.tool_name), "target_callable": GENERATED_TOOL_ENTRYPOINT, "reserved_lifecycle_target": ToolLifecycleStatus.GENERATED.value, } @staticmethod def _build_validation_artifact_payload( *, draft: ToolDraft, version: ToolVersion, warnings: list[str], stored_parameters: list[dict], required_parameter_count: int, ) -> dict: return { "source": "admin_draft_intake", "tool_name": draft.tool_name, "version_number": version.version_number, "draft_id": draft.draft_id, "version_id": version.version_id, "validation_status": "passed", "warnings": list(warnings), "parameter_count": len(stored_parameters), "required_parameter_count": required_parameter_count, "checked_rules": [ "tool_name_snake_case", "display_name_min_length", "domain_catalog", "description_min_length", "business_goal_min_length", "parameter_contracts", ], } def _list_latest_versions( self, *, statuses: tuple[ToolLifecycleStatus, ...] | None = None, ) -> list[ToolVersion]: if self.version_repository is None: return [] latest_by_tool_name: dict[str, ToolVersion | None] = {} for version in self.version_repository.list_versions(): normalized_tool_name = str(version.tool_name or "").strip().lower() if normalized_tool_name in latest_by_tool_name: continue if statuses is not None and version.status not in statuses: latest_by_tool_name[normalized_tool_name] = None continue latest_by_tool_name[normalized_tool_name] = version return [version for version in latest_by_tool_name.values() if version is not None] def _serialize_review_queue_entry(self, version: ToolVersion) -> dict: metadata = ( self.metadata_repository.get_by_tool_version_id(version.id) if self.metadata_repository is not None else None ) display_name = metadata.display_name if metadata is not None else version.tool_name.replace("_", " ").title() return { "entry_id": version.version_id, "version_id": version.version_id, "version_number": version.version_number, "tool_name": version.tool_name, "display_name": display_name, "status": version.status, "gate": self._build_review_gate(version.status), "summary": version.summary, "owner_name": version.owner_display_name, "queued_at": version.updated_at or version.created_at, } @staticmethod def _build_review_gate(status: ToolLifecycleStatus) -> str: gate_by_status = { ToolLifecycleStatus.DRAFT: "director_review_required", ToolLifecycleStatus.GENERATED: "validation_confirmation_required", ToolLifecycleStatus.VALIDATED: "director_approval_required", ToolLifecycleStatus.APPROVED: "director_publication_required", ToolLifecycleStatus.FAILED: "revision_required", } return gate_by_status.get(status, "governance_required") def _list_latest_metadata_entries( self, *, statuses: tuple[ToolLifecycleStatus, ...] | None = None, ) -> list[ToolMetadata]: if self.metadata_repository is None: return [] latest_by_tool_name: dict[str, ToolMetadata] = {} for metadata in self.metadata_repository.list_metadata(statuses=statuses): normalized_tool_name = str(metadata.tool_name or "").strip().lower() if normalized_tool_name in latest_by_tool_name: continue latest_by_tool_name[normalized_tool_name] = metadata return list(latest_by_tool_name.values()) def _serialize_metadata_publication(self, metadata: ToolMetadata) -> dict: parameters = self._serialize_parameters_for_response(metadata.parameters_json) return { "publication_id": metadata.metadata_id, "tool_name": metadata.tool_name, "display_name": metadata.display_name, "description": metadata.description, "domain": metadata.domain, "version": metadata.version_number, "status": metadata.status, "parameter_count": len(parameters), "parameters": parameters, "author_name": metadata.author_display_name, "implementation_module": build_generated_tool_module_name(metadata.tool_name), "implementation_callable": GENERATED_TOOL_ENTRYPOINT, "published_by": metadata.author_display_name, "published_at": metadata.updated_at or metadata.created_at, } def _serialize_draft_summary(self, draft: ToolDraft) -> dict: return { "draft_id": draft.draft_id, "tool_name": draft.tool_name, "display_name": draft.display_name, "status": draft.status, "summary": draft.summary, "current_version_number": draft.current_version_number, "version_count": draft.version_count, "owner_name": draft.owner_display_name, "updated_at": draft.updated_at, } def _serialize_draft_preview( self, draft: ToolDraft, version: ToolVersion | None = None, ) -> dict: parameters = self._serialize_parameters_for_response(draft.parameters_json) version_id = version.version_id if version is not None else self._build_preview_version_id( draft.tool_name, draft.current_version_number, ) version_number = version.version_number if version is not None else draft.current_version_number return { "draft_id": draft.draft_id, "version_id": version_id, "tool_name": draft.tool_name, "display_name": draft.display_name, "domain": draft.domain, "status": draft.status, "summary": draft.summary, "business_goal": draft.business_goal, "version_number": version_number, "version_count": draft.version_count, "parameter_count": len(parameters), "required_parameter_count": draft.required_parameter_count, "requires_director_approval": draft.requires_director_approval, "owner_name": draft.owner_display_name, "parameters": parameters, } @staticmethod def _serialize_parameters_for_storage(parameters: list[dict]) -> list[dict]: return [ { "name": parameter["name"], "parameter_type": parameter["parameter_type"].value, "description": parameter["description"], "required": parameter["required"], } for parameter in parameters ] @staticmethod def _serialize_parameters_for_response(parameters_json: list[dict] | None) -> list[dict]: return [ { "name": str((parameter or {}).get("name") or "").strip().lower(), "parameter_type": ToolParameterType(str((parameter or {}).get("parameter_type") or "string").strip().lower()), "description": str((parameter or {}).get("description") or "").strip(), "required": bool((parameter or {}).get("required", True)), } for parameter in (parameters_json or []) ] @staticmethod def _build_draft_summary(payload: dict) -> str: return ( f"{payload['display_name']} pronta para seguir como draft com {len(payload['parameters'])} parametro(s) e revisao obrigatoria de diretor." ) @staticmethod def _build_preview_version_id(tool_name: str, version_number: int) -> str: return f"tool_version::{str(tool_name or '').strip().lower()}::v{int(version_number)}" def _resolve_next_version_number( self, tool_name: str, existing_draft: ToolDraft | None, ) -> int: repository_version = ( self.version_repository.get_next_version_number(tool_name) if self.version_repository is not None else 1 ) if existing_draft is None: return repository_version return max(repository_version, existing_draft.current_version_number + 1) def _normalize_draft_payload(self, payload: dict) -> dict: tool_name = str(payload.get("tool_name") or "").strip().lower() if not _TOOL_NAME_PATTERN.fullmatch(tool_name): raise ValueError("tool_name deve usar snake_case minusculo com 3 a 64 caracteres.") if tool_name in _RESERVED_CORE_TOOL_NAMES: raise ValueError( "tool_name reservado pelo catalogo core do sistema. Gere uma nova tool sem sobrescrever uma capability interna." ) display_name = str(payload.get("display_name") or "").strip() if len(display_name) < 4: raise ValueError("display_name precisa ter pelo menos 4 caracteres.") domain = str(payload.get("domain") or "").strip().lower() valid_domains = {option.value for option in INTAKE_DOMAIN_OPTIONS} if domain not in valid_domains: raise ValueError("Selecione um dominio valido para a nova tool.") description = str(payload.get("description") or "").strip() if len(description) < 16: raise ValueError("A descricao precisa ter pelo menos 16 caracteres para contextualizar a tool.") business_goal = str(payload.get("business_goal") or "").strip() if len(business_goal) < 12: raise ValueError("Explique o objetivo operacional da tool com pelo menos 12 caracteres.") raw_parameters = payload.get("parameters") or [] if not isinstance(raw_parameters, list): raise ValueError("Os parametros enviados para a tool sao invalidos.") seen_parameter_names: set[str] = set() parameters: list[dict] = [] for raw_parameter in raw_parameters: name = str((raw_parameter or {}).get("name") or "").strip().lower() if not name: continue if not _PARAMETER_NAME_PATTERN.fullmatch(name): raise ValueError("Cada parametro deve usar snake_case minusculo com pelo menos 2 caracteres.") if name in seen_parameter_names: raise ValueError("Nao e permitido repetir nomes de parametro na mesma tool.") seen_parameter_names.add(name) raw_parameter_type = (raw_parameter or {}).get("parameter_type") or "" parameter_type = ( raw_parameter_type if isinstance(raw_parameter_type, ToolParameterType) else ToolParameterType(str(raw_parameter_type).strip().lower()) ) parameter_description = str((raw_parameter or {}).get("description") or "").strip() if len(parameter_description) < 8: raise ValueError("Cada parametro precisa de uma descricao com pelo menos 8 caracteres.") parameters.append( { "name": name, "parameter_type": parameter_type, "description": parameter_description, "required": bool((raw_parameter or {}).get("required", True)), } ) if len(parameters) > 10: raise ValueError("A fase inicial do painel aceita no maximo 10 parametros por tool.") return { "tool_name": tool_name, "display_name": display_name, "domain": domain, "description": description, "business_goal": business_goal, "parameters": parameters, } def _build_intake_warnings(self, payload: dict) -> list[str]: warnings: list[str] = [] parameters = payload["parameters"] if not parameters: warnings.append("A tool foi cadastrada sem parametros. Confirme se a acao realmente nao exige entrada contextual.") if len(parameters) >= 6: warnings.append("A quantidade de parametros ja pede uma revisao mais cuidadosa antes da aprovacao de diretor.") if any(parameter["parameter_type"] in {ToolParameterType.OBJECT, ToolParameterType.ARRAY} for parameter in parameters): warnings.append("Parametros compostos exigem atencao extra na revisao porque podem esconder payloads mais sensiveis.") if payload["domain"] == "orquestracao": warnings.append("Tools de orquestracao precisam confirmar claramente como afetam o fluxo do bot antes da ativacao.") return warnings