You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/admin_app/services/tool_management_service.py

3167 lines
156 KiB
Python

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import asyncio
import hashlib
import inspect
import json
import re
import sys
import types
from datetime import UTC, datetime
from pydantic import ValidationError
from sqlalchemy.orm import Session
from admin_app.catalogs import BOOTSTRAP_TOOL_CATALOG, INTAKE_DOMAIN_OPTIONS
from admin_app.core.settings import AdminSettings
from admin_app.db.models import ToolDraft, ToolMetadata, ToolVersion
from admin_app.db.models.tool_artifact import (
ToolArtifactKind,
ToolArtifactStage,
ToolArtifactStatus,
)
from admin_app.repositories.tool_artifact_repository import ToolArtifactRepository
from admin_app.repositories.tool_draft_repository import ToolDraftRepository
from admin_app.repositories.tool_metadata_repository import ToolMetadataRepository
from admin_app.repositories.tool_version_repository import ToolVersionRepository
from app.services.tools.tool_registry import GeneratedToolCoreBoundaryViolation, ToolRegistry
from shared.contracts import (
AdminPermission,
GENERATED_TOOL_ENTRYPOINT,
GENERATED_TOOLS_PACKAGE,
PublishedToolContract,
ServiceName,
StaffRole,
TOOL_LIFECYCLE_STAGES,
ToolLifecycleStatus,
ToolParameterContract,
ToolParameterType,
ToolPublicationEnvelope,
ToolRuntimePublicationManifest,
build_generated_tool_file_path,
build_generated_tool_module_name,
build_generated_tool_module_path,
get_generated_tool_publication_manifest_path,
get_generated_tools_runtime_dir,
normalize_staff_role,
role_has_permission,
)
_PARAMETER_TYPE_DESCRIPTIONS = {
ToolParameterType.STRING: "Texto livre, codigos e identificadores.",
ToolParameterType.INTEGER: "Valores inteiros para limites, anos e contagens.",
ToolParameterType.NUMBER: "Valores numericos decimais, como preco e diaria.",
ToolParameterType.BOOLEAN: "Marcadores verdadeiro ou falso para decisoes operacionais.",
ToolParameterType.OBJECT: "Estruturas compostas para payloads complexos.",
ToolParameterType.ARRAY: "Colecoes ordenadas de valores.",
}
_TOOL_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]{2,63}$")
_PARAMETER_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]{1,63}$")
_RESERVED_CORE_TOOL_NAMES = frozenset(entry.tool_name for entry in BOOTSTRAP_TOOL_CATALOG)
_PUBLISHED_TOOL_STATUSES = (ToolLifecycleStatus.ACTIVE,)
_AUTOMATED_CONTRACT_VALIDATION_RULES = (
"publication_envelope_contract",
"published_tool_contract",
"generated_namespace_contract",
"generated_entrypoint_contract",
"metadata_identifier_contract",
"parameter_contract_rules",
)
_AUTOMATED_SIGNATURE_SCHEMA_VALIDATION_RULES = (
"generated_entrypoint_signature",
"reserved_runtime_parameter_names",
"parameter_schema_projection",
"required_parameter_alignment",
)
_AUTOMATED_IMPORT_LOADING_VALIDATION_RULES = (
"generated_module_render",
"generated_module_import",
"generated_entrypoint_load",
"generated_runtime_registry_boundary",
)
_AUTOMATED_SMOKE_TEST_RULES = (
"generated_entrypoint_execution",
"generated_runtime_dispatch_execution",
"generated_result_json_serialization",
)
_PARAMETER_SCHEMA_TYPE_MAPPING = {
ToolParameterType.STRING: "string",
ToolParameterType.INTEGER: "integer",
ToolParameterType.NUMBER: "number",
ToolParameterType.BOOLEAN: "boolean",
ToolParameterType.OBJECT: "object",
ToolParameterType.ARRAY: "array",
}
_SIGNATURE_RESERVED_PARAMETER_NAMES = frozenset({"user_id"})
_REVIEW_QUEUE_STATUSES = (
ToolLifecycleStatus.DRAFT,
ToolLifecycleStatus.GENERATED,
ToolLifecycleStatus.VALIDATED,
ToolLifecycleStatus.APPROVED,
ToolLifecycleStatus.FAILED,
)
_HUMAN_DECISION_NOTES_MIN_LENGTH = 12
class ToolManagementService:
def __init__(
self,
settings: AdminSettings,
draft_repository: ToolDraftRepository | None = None,
version_repository: ToolVersionRepository | None = None,
metadata_repository: ToolMetadataRepository | None = None,
artifact_repository: ToolArtifactRepository | None = None,
tool_generation_service=None, # ToolGenerationService | None
tool_generation_worker_service=None, # ToolGenerationWorkerService | None
):
self.settings = settings
self.draft_repository = draft_repository
self.version_repository = version_repository
self.metadata_repository = metadata_repository
self.artifact_repository = artifact_repository
# ServiÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§o isolado de geraÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o via LLM (runtime tool_generation, separado do atendimento).
# Pode ser None para manter compatibilidade retroativa (usa stub de validaÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o).
self.tool_generation_service = tool_generation_service
# Worker dedicado para executar a pipeline em thread prÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ³pria do admin.
self.tool_generation_worker_service = tool_generation_worker_service
def run_generation_pipeline_in_worker(
self,
version_id: str,
*,
runner_staff_account_id: int,
runner_name: str,
runner_role: StaffRole | str,
) -> dict:
normalized_role = normalize_staff_role(runner_role)
if not role_has_permission(normalized_role, AdminPermission.MANAGE_TOOL_DRAFTS):
raise PermissionError(
f"Papel '{normalized_role.value}' sem permissao administrativa '{AdminPermission.MANAGE_TOOL_DRAFTS.value}'."
)
if self.tool_generation_worker_service is None:
payload = dict(
self.run_generation_pipeline(
version_id,
runner_staff_account_id=runner_staff_account_id,
runner_name=runner_name,
runner_role=runner_role,
)
)
payload["execution"] = {
"mode": "inline_admin_service",
"target": "admin_inline_generation_pipeline",
"dispatch_state": "completed",
"worker_max_workers": None,
"worker_pending_jobs": None,
"queued_jobs_before_submit": 0,
"submitted_at": None,
"started_at": None,
"completed_at": None,
"elapsed_ms": None,
"worker_thread_name": None,
"poll_after_ms": None,
"last_error": None,
}
return payload
if self.tool_generation_worker_service is not None and (
self.draft_repository is None
or self.version_repository is None
or self.metadata_repository is None
):
raise RuntimeError(
"Pipeline de geracao ainda nao esta completamente conectado ao armazenamento administrativo."
)
normalized_version_id = str(version_id or "").strip().lower()
version = self.version_repository.get_by_version_id(normalized_version_id)
if version is None:
raise LookupError("Versao administrativa nao encontrada.")
latest_versions_for_tool = self.version_repository.list_versions(tool_name=version.tool_name)
if latest_versions_for_tool and latest_versions_for_tool[0].version_id != version.version_id:
raise ValueError(
"Somente a versao mais recente da tool pode seguir pelo pipeline de geracao."
)
if version.status not in {ToolLifecycleStatus.DRAFT, ToolLifecycleStatus.FAILED}:
raise ValueError(
f"A pipeline de geracao exige status em (draft, failed), mas a versao esta em '{version.status.value}'."
)
draft = self.draft_repository.get_by_tool_name(version.tool_name)
if draft is None:
raise RuntimeError("Draft raiz da tool nao encontrado para a pipeline de geracao.")
metadata = self.metadata_repository.get_by_tool_version_id(version.id)
if metadata is None:
raise RuntimeError("Metadados persistidos da versao nao encontrados para a pipeline de geracao.")
execution = self.tool_generation_worker_service.dispatch_generation_pipeline(
version_id=version_id,
runner_staff_account_id=runner_staff_account_id,
runner_name=runner_name,
runner_role=runner_role,
)
result_payload = execution.pop("result_payload", None)
if execution.get("dispatch_state") == "completed" and isinstance(result_payload, dict):
payload = dict(result_payload)
payload["execution"] = execution
return payload
pipeline_snapshot = self._build_pipeline_snapshot(version.status)
dispatch_state = str(execution.get("dispatch_state") or "queued").strip().lower()
message_by_dispatch_state = {
"queued": "Pipeline enfileirada no worker dedicado do admin. A request foi liberada sem esperar a geracao terminar.",
"running": "Pipeline em execucao no worker dedicado do admin. A request foi liberada sem bloquear o runtime administrativo.",
"failed": "O worker dedicado falhou antes de concluir a pipeline. Revise o erro registrado e tente novamente.",
}
next_steps_by_dispatch_state = {
"queued": [
"Acompanhe a fila de revisao para ver quando a versao sair de draft e entrar em generated.",
"Enquanto a job estiver na fila dedicada, o atendimento continua desacoplado da carga de geracao.",
],
"running": [
"A pipeline ja esta sendo executada em background no worker dedicado do admin.",
"Atualize a leitura da fila ou do detalhe para acompanhar a transicao da versao quando o worker concluir.",
],
"failed": [
"Revise o erro do worker dedicado e reenvie a pipeline quando a causa for corrigida.",
"Enquanto a versao permanecer sem uma geracao concluida, ela continua fora da revisao humana e da ativacao.",
],
}
return {
"message": message_by_dispatch_state.get(
dispatch_state,
"Pipeline encaminhada para o worker dedicado do admin.",
),
"version_id": version.version_id,
"tool_name": version.tool_name,
"version_number": version.version_number,
"status": version.status,
"current_step": pipeline_snapshot["current_step"],
"steps": pipeline_snapshot["steps"],
"queue_entry": self._serialize_review_queue_entry(version, worker_execution=execution),
"automated_validations": [],
"execution": execution,
"next_steps": next_steps_by_dispatch_state.get(
dispatch_state,
["Atualize a fila administrativa para acompanhar a pipeline dedicada."],
),
}
def _get_generation_pipeline_worker_execution(self, version_id: str) -> dict | None:
if self.tool_generation_worker_service is None:
return None
getter = getattr(self.tool_generation_worker_service, "get_generation_pipeline_dispatch", None)
if not callable(getter):
return None
execution = getter(version_id)
if not isinstance(execution, dict):
return None
return dict(execution)
def _resolve_repository_session(self) -> Session | None:
repository_sessions = [
repository.db
for repository in (
self.draft_repository,
self.version_repository,
self.metadata_repository,
self.artifact_repository,
)
if getattr(repository, "db", None) is not None
]
if not repository_sessions:
return None
primary_session = repository_sessions[0]
for repository_session in repository_sessions[1:]:
if repository_session is not primary_session:
raise RuntimeError("Tool governance repositories must share the same admin database session.")
return primary_session
@staticmethod
def _commit_repository_session(
repository_session: Session,
*,
draft: ToolDraft,
version: ToolVersion | None = None,
) -> None:
repository_session.commit()
repository_session.refresh(draft)
if version is not None:
repository_session.refresh(version)
def _build_submission_policy(
self,
*,
submitter_role: StaffRole | str | None = None,
) -> dict:
normalized_role = normalize_staff_role(submitter_role) if submitter_role is not None else None
submitter_can_publish_now = (
role_has_permission(normalized_role, AdminPermission.PUBLISH_TOOLS)
if normalized_role is not None
else False
)
return {
"mode": "draft_only",
"submitter_role": normalized_role,
"submitter_can_publish_now": submitter_can_publish_now,
"direct_publication_blocked": True,
"requires_director_approval": True,
"required_approver_role": StaffRole.DIRETOR,
"required_review_permission": AdminPermission.REVIEW_TOOL_GENERATIONS,
"required_publish_permission": AdminPermission.PUBLISH_TOOLS,
}
def build_overview_payload(self) -> dict:
catalog_payload = self.build_publications_payload()
catalog = catalog_payload["publications"]
persisted_draft_count = len(self.draft_repository.list_drafts()) if self.draft_repository else 0
persisted_version_count = 0
if self.version_repository is not None:
persisted_version_count = len(self.version_repository.list_versions())
elif self.draft_repository is not None:
persisted_version_count = sum(draft.version_count for draft in self.draft_repository.list_drafts())
persisted_metadata_count = len(self.metadata_repository.list_metadata()) if self.metadata_repository else 0
persisted_artifact_count = len(self.artifact_repository.list_artifacts()) if self.artifact_repository else 0
return {
"mode": "admin_tool_draft_governance",
"metrics": [
{
"key": "active_catalog",
"label": "Tools mapeadas",
"value": str(len(catalog)),
"description": "Catalogo governado persistido quando disponivel, com fallback bootstrap enquanto o admin ainda nao tiver metadados proprios.",
},
{
"key": "lifecycle_stages",
"label": "Etapas de lifecycle",
"value": str(len(TOOL_LIFECYCLE_STAGES)),
"description": "Estados compartilhados entre governanca administrativa e publicacao.",
},
{
"key": "parameter_types",
"label": "Tipos de parametro",
"value": str(len(ToolParameterType)),
"description": "Tipos aceitos pelo contrato inicial de publicacao de tools.",
},
{
"key": "persisted_drafts",
"label": "Drafts persistidos",
"value": str(persisted_draft_count),
"description": "Pre-cadastros administrativos ja gravados no armazenamento proprio do admin.",
},
{
"key": "persisted_versions",
"label": "Versoes administrativas",
"value": str(persisted_version_count),
"description": "Historico versionado das iteracoes de cada tool governada pelo admin.",
},
{
"key": "persisted_metadata",
"label": "Metadados persistidos",
"value": str(persisted_metadata_count),
"description": "Snapshots canonicos por versao com nome, descricao, parametros, status e autor da tool.",
},
{
"key": "persisted_artifacts",
"label": "Artefatos auditaveis",
"value": str(persisted_artifact_count),
"description": "Manifestos de geracao e relatorios de validacao gravados por versao para trilha administrativa.",
},
],
"workflow": self.build_lifecycle_payload(),
"next_steps": [
"Executar a pipeline de geracao entre o cadastro manual e a validacao da versao.",
"Usar a fila de revisao para acompanhar geracao, validacao, aprovacao e ativacao de cada tool.",
"Conectar publicacoes versionadas ao runtime de produto com rollback controlado.",
],
}
def build_contracts_payload(self) -> dict:
return {
"publication_source_service": ServiceName.ADMIN,
"publication_target_service": ServiceName.PRODUCT,
"lifecycle_statuses": self.build_lifecycle_payload(),
"parameter_types": [
{
"code": parameter_type,
"label": parameter_type.value.upper(),
"description": _PARAMETER_TYPE_DESCRIPTIONS[parameter_type],
}
for parameter_type in ToolParameterType
],
"publication_fields": [
"source_service",
"target_service",
"publication_id",
"published_tool",
"emitted_at",
],
"published_tool_fields": [
"tool_name",
"display_name",
"description",
"version",
"status",
"parameters",
"implementation_module",
"implementation_callable",
"checksum",
"published_at",
"published_by",
],
}
def build_draft_form_payload(
self,
*,
submitter_role: StaffRole | str | None = None,
) -> dict:
submission_policy = self._build_submission_policy(submitter_role=submitter_role)
submitter_note = (
"Sua sessao pode cadastrar e salvar o draft, mas nao publica a tool diretamente."
if not submission_policy["submitter_can_publish_now"]
else "Mesmo com permissao de publicacao, este formulario sempre salva a tool primeiro como draft versionado."
)
return {
"mode": "validated_preview",
"submission_policy": submission_policy,
"domain_options": [
{
"value": option.value,
"label": option.label,
"description": option.description,
}
for option in INTAKE_DOMAIN_OPTIONS
],
"parameter_types": [
{
"code": parameter_type,
"label": parameter_type.value.upper(),
"description": _PARAMETER_TYPE_DESCRIPTIONS[parameter_type],
}
for parameter_type in ToolParameterType
],
"naming_rules": [
"tool_name deve usar snake_case minusculo, sem espacos, com 3 a 64 caracteres.",
"tool_name nao pode reutilizar nomes reservados pelo catalogo core ja publicado.",
"display_name deve explicar claramente a acao operacional que o bot vai executar.",
"Cada parametro precisa de nome, tipo, descricao e marcador de obrigatoriedade.",
],
"submission_notes": [
"O colaborador pode preencher, validar e persistir o draft da tool no painel.",
submitter_note,
"Toda tool nova segue para revisao e aprovacao de um diretor antes de qualquer publicacao.",
"Reenvios da mesma tool reaproveitam o draft raiz e geram uma nova versao administrativa.",
],
"approval_notes": [
"Diretor revisa objetivo, parametros e aderencia ao contrato compartilhado.",
"A publicacao para o runtime de produto so pode acontecer apos aprovacao humana.",
"Campos livres e payloads complexos exigem criterio maior na etapa de revisao.",
],
}
def build_drafts_payload(self) -> dict:
if self.draft_repository is None:
return {
"storage_status": "pending_persistence",
"message": (
"A nova tela de cadastro ja valida o pre-cadastro da tool no painel, mas a persistencia de ToolDraft ainda nao foi conectada neste runtime."
),
"drafts": [],
"supported_statuses": [ToolLifecycleStatus.DRAFT],
}
drafts = self.draft_repository.list_drafts(statuses=(ToolLifecycleStatus.DRAFT,))
message = (
"Nenhum draft administrativo salvo ainda."
if not drafts
else f"{len(drafts)} draft(s) administrativo(s) salvo(s) no admin com historico versionado."
)
return {
"storage_status": "admin_database",
"message": message,
"drafts": [self._serialize_draft_summary(draft) for draft in drafts],
"supported_statuses": [ToolLifecycleStatus.DRAFT],
}
def build_review_queue_payload(self) -> dict:
queued_versions = self._list_latest_versions(statuses=_REVIEW_QUEUE_STATUSES)
message = (
"Nenhuma versao aguardando execucao do pipeline, validacao, aprovacao ou publicacao."
if not queued_versions
else f"{len(queued_versions)} versao(oes) em alguma etapa do pipeline antes da ativacao."
)
return {
"queue_mode": "governed_admin_queue",
"message": message,
"items": [self._serialize_review_queue_entry(version) for version in queued_versions],
"supported_statuses": list(_REVIEW_QUEUE_STATUSES),
}
def build_review_detail_payload(self, version_id: str) -> dict:
if (
self.draft_repository is None
or self.version_repository is None
or self.metadata_repository is None
):
raise RuntimeError(
"Fluxo de governanca de tools ainda nao esta completamente conectado ao armazenamento administrativo."
)
normalized_version_id = str(version_id or "").strip().lower()
version = self.version_repository.get_by_version_id(normalized_version_id)
if version is None:
raise LookupError("Versao administrativa nao encontrada.")
draft = self.draft_repository.get_by_tool_name(version.tool_name)
if draft is None:
raise RuntimeError("Draft raiz da tool nao encontrado para a versao governada.")
metadata = self.metadata_repository.get_by_tool_version_id(version.id)
if metadata is None:
raise RuntimeError("Metadados persistidos da versao nao encontrados para a governanca administrativa.")
validation_payload = {}
if self.artifact_repository is not None:
validation_artifact = self.artifact_repository.get_by_tool_version_and_kind(
version.id,
ToolArtifactKind.VALIDATION_REPORT,
)
if validation_artifact is not None:
validation_payload = dict(validation_artifact.payload_json or {})
automated_validation = self._extract_latest_automated_validation(version.id)
generated_source_code = str(validation_payload.get("generated_source_code") or "").strip()
worker_execution = self._get_generation_pipeline_worker_execution(version.version_id)
automated_validation_summary = automated_validation.get("summary")
if not generated_source_code and isinstance(worker_execution, dict):
dispatch_state = str(worker_execution.get("dispatch_state") or "").strip().lower()
if dispatch_state == "queued":
automated_validation_summary = "Pipeline enfileirada no worker dedicado aguardando execucao."
elif dispatch_state == "running":
automated_validation_summary = "Pipeline em execucao no worker dedicado do admin."
elif dispatch_state == "failed":
automated_validation_summary = worker_execution.get("last_error") or (
"O worker dedicado falhou antes de concluir a pipeline."
)
return {
"version_id": version.version_id,
"tool_name": version.tool_name,
"display_name": metadata.display_name,
"domain": metadata.domain,
"version_number": version.version_number,
"status": version.status,
"summary": version.summary,
"description": metadata.description,
"business_goal": version.business_goal,
"owner_name": version.owner_display_name,
"parameters": self._serialize_parameters_for_response(metadata.parameters_json),
"queue_entry": self._serialize_review_queue_entry(version, worker_execution=worker_execution),
"automated_validations": list(validation_payload.get("automated_checks") or []),
"automated_validation_summary": automated_validation_summary,
"generated_module": build_generated_tool_module_name(version.tool_name),
"generated_callable": GENERATED_TOOL_ENTRYPOINT,
"generated_source_code": generated_source_code,
"execution": worker_execution,
"human_gate": self._build_human_review_gate(version),
"decision_history": self._list_governance_history_entries(version.id),
"next_steps": self._build_review_detail_next_steps(
version,
bool(generated_source_code),
worker_execution=worker_execution,
),
}
def build_publications_payload(self) -> dict:
publications_by_tool_name = {
publication["tool_name"]: publication
for publication in self.list_publication_catalog()
}
published_metadata_entries = self._list_latest_metadata_entries(
statuses=_PUBLISHED_TOOL_STATUSES,
)
if published_metadata_entries:
for metadata in published_metadata_entries:
publications_by_tool_name[metadata.tool_name] = self._serialize_metadata_publication(
metadata
)
return {
"source": "hybrid_runtime_catalog",
"target_service": ServiceName.PRODUCT,
"publications": list(publications_by_tool_name.values()),
}
return {
"source": "bootstrap_catalog",
"target_service": ServiceName.PRODUCT,
"publications": list(publications_by_tool_name.values()),
}
def run_generation_pipeline(
self,
version_id: str,
*,
runner_staff_account_id: int,
runner_name: str,
runner_role: StaffRole | str,
) -> dict:
normalized_role = normalize_staff_role(runner_role)
if not role_has_permission(normalized_role, AdminPermission.MANAGE_TOOL_DRAFTS):
raise PermissionError(
f"Papel '{normalized_role.value}' sem permissao administrativa '{AdminPermission.MANAGE_TOOL_DRAFTS.value}'."
)
if (
self.draft_repository is None
or self.version_repository is None
or self.metadata_repository is None
):
raise RuntimeError(
"Pipeline de geracao ainda nao esta completamente conectado ao armazenamento administrativo."
)
normalized_version_id = str(version_id or "").strip().lower()
version = self.version_repository.get_by_version_id(normalized_version_id)
if version is None:
raise LookupError("Versao administrativa nao encontrada.")
latest_versions_for_tool = self.version_repository.list_versions(tool_name=version.tool_name)
if latest_versions_for_tool and latest_versions_for_tool[0].version_id != version.version_id:
raise ValueError(
"Somente a versao mais recente da tool pode seguir pelo pipeline de geracao."
)
if version.status not in {ToolLifecycleStatus.DRAFT, ToolLifecycleStatus.FAILED}:
raise ValueError(
f"A pipeline de geracao exige status em (draft, failed), mas a versao esta em '{version.status.value}'."
)
draft = self.draft_repository.get_by_tool_name(version.tool_name)
if draft is None:
raise RuntimeError("Draft raiz da tool nao encontrado para a pipeline de geracao.")
metadata = self.metadata_repository.get_by_tool_version_id(version.id)
if metadata is None:
raise RuntimeError("Metadados persistidos da versao nao encontrados para a pipeline de geracao.")
# ---- Fase 7: GeraÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o de cÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ³digo via LLM isolado do runtime de atendimento ----
# O tool_generation_service é None em modo de compatibilidade (usa stub).
# Quando presente, gera cÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ³digo real usando o modelo do runtime de geraÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o.
llm_generated_source: str | None = None
llm_generation_model: str | None = None
llm_generation_issues: list[str] = []
if self.tool_generation_service is not None:
preferred_model = str(version.generation_model or "").strip() or None
generation_result = asyncio.run(
self.tool_generation_service.generate_tool_source(
tool_name=version.tool_name,
display_name=metadata.display_name,
domain=metadata.domain,
description=metadata.description,
business_goal=version.business_goal,
parameters=list(metadata.parameters_json or []),
preferred_model=preferred_model,
)
)
llm_generated_source = generation_result.get("generated_source_code")
llm_generation_model = generation_result.get("generation_model_used")
llm_generation_issues = list(generation_result.get("issues") or [])
# ---- fim Fase 7 ----
repository_session = self._resolve_repository_session()
atomic_write_options = {"commit": False} if repository_session is not None else {}
artifact_commit = False if repository_session is not None else None
automated_validation_result: dict | None = None
# Se o LLM falhou (issues presentes e sem cÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ³digo), marca FAILED imediatamente.
if llm_generation_issues and not llm_generated_source:
try:
self.version_repository.update_status(
version,
status=ToolLifecycleStatus.FAILED,
**atomic_write_options,
)
self.metadata_repository.update_status(
metadata,
status=ToolLifecycleStatus.FAILED,
**atomic_write_options,
)
self.draft_repository.update_status(
draft,
status=ToolLifecycleStatus.FAILED,
**atomic_write_options,
)
self._persist_generation_pipeline_artifact(
draft=draft,
version=version,
actor_staff_account_id=runner_staff_account_id,
actor_name=runner_name,
actor_role=normalized_role,
llm_generated_source=None,
llm_generation_model=llm_generation_model,
llm_generation_issues=llm_generation_issues,
commit=artifact_commit,
)
if repository_session is not None:
self._commit_repository_session(
repository_session,
draft=draft,
version=version,
)
except Exception:
if repository_session is not None:
repository_session.rollback()
raise
pipeline_snapshot = self._build_pipeline_snapshot(ToolLifecycleStatus.FAILED)
return {
"message": (
"A geraÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o via LLM falhou antes das validaÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃµes automÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ¡ticas. "
"Verifique os issues de geraÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o e execute a pipeline novamente."
),
"version_id": version.version_id,
"tool_name": version.tool_name,
"version_number": version.version_number,
"status": ToolLifecycleStatus.FAILED,
"current_step": pipeline_snapshot["current_step"],
"steps": pipeline_snapshot["steps"],
"queue_entry": self._serialize_review_queue_entry(version),
"automated_validations": [],
"next_steps": [
"Verifique o modelo de geraÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o configurado e se o Vertex AI estÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ¡ acessÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ­vel.",
*[f"Issue: {issue}" for issue in llm_generation_issues],
],
}
try:
self._persist_generation_pipeline_artifact(
draft=draft,
version=version,
actor_staff_account_id=runner_staff_account_id,
actor_name=runner_name,
actor_role=normalized_role,
llm_generated_source=llm_generated_source,
llm_generation_model=llm_generation_model,
llm_generation_issues=llm_generation_issues,
commit=artifact_commit,
)
automated_validation_result = self._execute_automated_contract_validation(
draft=draft,
version=version,
metadata=metadata,
actor_staff_account_id=runner_staff_account_id,
actor_name=runner_name,
llm_generated_source=llm_generated_source,
commit=artifact_commit,
)
pipeline_status = (
ToolLifecycleStatus.GENERATED
if automated_validation_result["passed"]
else ToolLifecycleStatus.FAILED
)
self.version_repository.update_status(
version,
status=pipeline_status,
**atomic_write_options,
)
self.metadata_repository.update_status(
metadata,
status=pipeline_status,
**atomic_write_options,
)
self.draft_repository.update_status(
draft,
status=pipeline_status,
**atomic_write_options,
)
if repository_session is not None:
self._commit_repository_session(
repository_session,
draft=draft,
version=version,
)
except Exception:
if repository_session is not None:
repository_session.rollback()
raise
pipeline_snapshot = self._build_pipeline_snapshot(version.status)
if automated_validation_result and automated_validation_result["passed"]:
message = (
"Pipeline de geracao executado com sucesso e as validacoes automaticas de contrato, assinatura, importacao e smoke tests passaram. "
"A versao agora segue para a proxima etapa de validacao governada."
)
next_steps = [
"Usar a fila de revisao para concluir a validacao governada antes da aprovacao da diretoria.",
"Apenas versoes validadas podem seguir para aprovacao e ativacao no catalogo governado.",
]
else:
message = (
"Pipeline de geracao executado, mas alguma validacao automatica de contrato, assinatura, importacao ou smoke test falhou. "
"A versao foi marcada como failed para ajuste e nova tentativa."
)
next_steps = [
"Ajustar metadados, assinatura esperada, importacao do modulo e smoke tests antes de rodar o pipeline novamente.",
"Enquanto alguma validacao automatica falhar, a versao nao pode seguir para aprovacao e ativacao.",
]
return {
"message": message,
"version_id": version.version_id,
"tool_name": version.tool_name,
"version_number": version.version_number,
"status": version.status,
"current_step": pipeline_snapshot["current_step"],
"steps": pipeline_snapshot["steps"],
"queue_entry": self._serialize_review_queue_entry(version),
"automated_validations": list((automated_validation_result or {}).get("automated_checks") or []),
"next_steps": next_steps,
}
def review_version(
self,
version_id: str,
*,
reviewer_staff_account_id: int,
reviewer_name: str,
reviewer_role: StaffRole | str,
decision_notes: str,
reviewed_generated_code: bool,
) -> dict:
normalized_notes = self._normalize_human_decision_notes(decision_notes)
normalized_version_id = str(version_id or "").strip().lower()
version = (
self.version_repository.get_by_version_id(normalized_version_id)
if self.version_repository is not None
else None
)
if version is not None and version.status == ToolLifecycleStatus.GENERATED:
if not reviewed_generated_code:
raise ValueError(
"A revisao humana exige confirmar que o codigo gerado foi analisado antes da validacao."
)
if not self._version_has_generated_source(normalized_version_id):
raise ValueError(
"A revisao humana exige que a pipeline tenha registrado o codigo completo gerado para esta versao."
)
return self._transition_version_status(
normalized_version_id,
target_status=ToolLifecycleStatus.VALIDATED,
allowed_current_statuses=(ToolLifecycleStatus.GENERATED,),
actor_staff_account_id=reviewer_staff_account_id,
actor_name=reviewer_name,
actor_role=reviewer_role,
required_permission=AdminPermission.REVIEW_TOOL_GENERATIONS,
artifact_kind=ToolArtifactKind.DIRECTOR_REVIEW,
artifact_summary="Revisao inicial de diretor registrada para a versao governada.",
success_message="Versao revisada por diretor com sucesso e pronta para aprovacao.",
decision_notes=normalized_notes,
reviewed_generated_code=reviewed_generated_code,
next_steps=[
"A diretoria ainda precisa aprovar formalmente a versao antes da publicacao.",
"Depois da aprovacao, a publicacao ativa a tool no catalogo governado do produto.",
],
)
def approve_version(
self,
version_id: str,
*,
approver_staff_account_id: int,
approver_name: str,
approver_role: StaffRole | str,
decision_notes: str,
) -> dict:
normalized_notes = self._normalize_human_decision_notes(decision_notes)
return self._transition_version_status(
version_id,
target_status=ToolLifecycleStatus.APPROVED,
allowed_current_statuses=(ToolLifecycleStatus.VALIDATED,),
actor_staff_account_id=approver_staff_account_id,
actor_name=approver_name,
actor_role=approver_role,
required_permission=AdminPermission.REVIEW_TOOL_GENERATIONS,
artifact_kind=ToolArtifactKind.DIRECTOR_APPROVAL,
artifact_summary="Aprovacao de diretor registrada para a versao governada.",
success_message="Versao aprovada por diretor com sucesso e pronta para publicacao.",
decision_notes=normalized_notes,
next_steps=[
"A publicacao administrativa ainda precisa ser executada antes da ativacao.",
"Enquanto a versao estiver apenas aprovada, ela permanece fora do catalogo ativo do produto.",
],
)
def publish_version(
self,
version_id: str,
*,
publisher_staff_account_id: int,
publisher_name: str,
publisher_role: StaffRole | str,
) -> dict:
return self._transition_version_status(
version_id,
target_status=ToolLifecycleStatus.ACTIVE,
allowed_current_statuses=(ToolLifecycleStatus.APPROVED,),
actor_staff_account_id=publisher_staff_account_id,
actor_name=publisher_name,
actor_role=publisher_role,
required_permission=AdminPermission.PUBLISH_TOOLS,
artifact_kind=ToolArtifactKind.PUBLICATION_RELEASE,
artifact_summary="Publicacao administrativa concluida pela diretoria antes da ativacao.",
success_message="Versao publicada com sucesso e ativada no catalogo governado.",
next_steps=[
"A versao ativa agora pode ser consumida pelo runtime governado do produto.",
"Se uma nova versao for publicada para a mesma tool, a ativa anterior sera arquivada automaticamente.",
],
)
def deactivate_version(
self,
version_id: str,
*,
actor_staff_account_id: int,
actor_name: str,
actor_role: StaffRole | str,
decision_notes: str,
) -> dict:
payload = self._transition_version_status(
version_id,
target_status=ToolLifecycleStatus.ARCHIVED,
allowed_current_statuses=(ToolLifecycleStatus.ACTIVE,),
actor_staff_account_id=actor_staff_account_id,
actor_name=actor_name,
actor_role=actor_role,
required_permission=AdminPermission.PUBLISH_TOOLS,
artifact_kind=ToolArtifactKind.PUBLICATION_DEACTIVATION,
artifact_summary="Publicacao ativa desativada pela diretoria.",
success_message="Versao ativa desativada com sucesso e retirada do catalogo governado.",
decision_notes=self._normalize_human_decision_notes(decision_notes),
next_steps=[
"A versao saiu do catalogo ativo e agora permanece apenas para historico e auditoria.",
"Se houver uma versao arquivada anterior da mesma tool, a diretoria pode executar rollback controlado quando necessario.",
],
)
payload["queue_entry"] = None
return payload
def rollback_version(
self,
version_id: str,
*,
actor_staff_account_id: int,
actor_name: str,
actor_role: StaffRole | str,
decision_notes: str,
) -> dict:
normalized_role = normalize_staff_role(actor_role)
if not role_has_permission(normalized_role, AdminPermission.PUBLISH_TOOLS):
raise PermissionError(
f"Papel '{normalized_role.value}' sem permissao administrativa '{AdminPermission.PUBLISH_TOOLS.value}'."
)
if (
self.draft_repository is None
or self.version_repository is None
or self.metadata_repository is None
):
raise RuntimeError(
"Fluxo de governanca de tools ainda nao esta completamente conectado ao armazenamento administrativo."
)
normalized_version_id = str(version_id or "").strip().lower()
current_version = self.version_repository.get_by_version_id(normalized_version_id)
if current_version is None:
raise LookupError("Versao administrativa nao encontrada.")
if current_version.status != ToolLifecycleStatus.ACTIVE:
raise ValueError(
f"O rollback exige uma versao atualmente active, mas a versao esta em '{current_version.status.value}'."
)
rollback_version = self._find_latest_archived_version(
tool_name=current_version.tool_name,
excluding_version_id=current_version.id,
)
if rollback_version is None:
raise ValueError("Nenhuma versao arquivada disponivel para rollback controlado desta tool.")
draft = self.draft_repository.get_by_tool_name(current_version.tool_name)
if draft is None:
raise RuntimeError("Draft raiz da tool nao encontrado para o rollback governado.")
current_metadata = self.metadata_repository.get_by_tool_version_id(current_version.id)
rollback_metadata = self.metadata_repository.get_by_tool_version_id(rollback_version.id)
if current_metadata is None or rollback_metadata is None:
raise RuntimeError("Metadados persistidos nao encontrados para executar o rollback governado.")
normalized_notes = self._normalize_human_decision_notes(decision_notes)
repository_session = self._resolve_repository_session()
atomic_write_options = {"commit": False} if repository_session is not None else {}
artifact_commit = False if repository_session is not None else None
try:
self._ensure_human_governance_ready_for_activation(rollback_version.id)
self.version_repository.update_status(
current_version,
status=ToolLifecycleStatus.ARCHIVED,
**atomic_write_options,
)
self.metadata_repository.update_status(
current_metadata,
status=ToolLifecycleStatus.ARCHIVED,
**atomic_write_options,
)
self.version_repository.update_status(
rollback_version,
status=ToolLifecycleStatus.ACTIVE,
**atomic_write_options,
)
self.metadata_repository.update_status(
rollback_metadata,
status=ToolLifecycleStatus.ACTIVE,
**atomic_write_options,
)
self.draft_repository.update_status(
draft,
status=ToolLifecycleStatus.ACTIVE,
**atomic_write_options,
)
self._persist_governance_artifact(
draft=draft,
version=current_version,
artifact_kind=ToolArtifactKind.PUBLICATION_DEACTIVATION,
summary="Versao ativa desativada para permitir rollback controlado.",
previous_status=ToolLifecycleStatus.ACTIVE,
current_status=ToolLifecycleStatus.ARCHIVED,
actor_staff_account_id=actor_staff_account_id,
actor_name=actor_name,
actor_role=normalized_role,
decision_notes=normalized_notes,
extra_payload={
"deactivated_for_rollback": True,
"rollback_target_version_id": rollback_version.version_id,
"rollback_target_version_number": rollback_version.version_number,
},
commit=artifact_commit,
)
self._persist_governance_artifact(
draft=draft,
version=rollback_version,
artifact_kind=ToolArtifactKind.PUBLICATION_ROLLBACK,
summary="Rollback controlado executado para restaurar a versao arquivada no catalogo ativo.",
previous_status=ToolLifecycleStatus.ARCHIVED,
current_status=ToolLifecycleStatus.ACTIVE,
actor_staff_account_id=actor_staff_account_id,
actor_name=actor_name,
actor_role=normalized_role,
decision_notes=normalized_notes,
extra_payload={
"rollback_from_version_id": current_version.version_id,
"rollback_from_version_number": current_version.version_number,
},
commit=artifact_commit,
)
if repository_session is not None:
self._commit_repository_session(
repository_session,
draft=draft,
version=rollback_version,
)
except Exception:
if repository_session is not None:
repository_session.rollback()
raise
self._synchronize_product_runtime_publication_snapshot()
return {
"message": (
"Rollback executado com sucesso e a versao arquivada voltou ao catalogo governado como ativa."
),
"version_id": rollback_version.version_id,
"tool_name": rollback_version.tool_name,
"version_number": rollback_version.version_number,
"status": rollback_version.status,
"queue_entry": None,
"publication": self._serialize_metadata_publication(rollback_metadata),
"next_steps": [
"A versao restaurada voltou ao catalogo ativo do produto sob governanca da diretoria.",
"A versao que estava ativa foi arquivada para manter trilha auditavel do rollback controlado.",
],
}
def _transition_version_status(
self,
version_id: str,
*,
target_status: ToolLifecycleStatus,
allowed_current_statuses: tuple[ToolLifecycleStatus, ...],
actor_staff_account_id: int,
actor_name: str,
actor_role: StaffRole | str,
required_permission: AdminPermission,
artifact_kind: ToolArtifactKind,
artifact_summary: str,
success_message: str,
decision_notes: str | None = None,
reviewed_generated_code: bool | None = None,
next_steps: list[str],
) -> dict:
normalized_role = normalize_staff_role(actor_role)
if not role_has_permission(normalized_role, required_permission):
raise PermissionError(
f"Papel '{normalized_role.value}' sem permissao administrativa '{required_permission.value}'."
)
if (
self.draft_repository is None
or self.version_repository is None
or self.metadata_repository is None
):
raise RuntimeError(
"Fluxo de governanca de tools ainda nao esta completamente conectado ao armazenamento administrativo."
)
normalized_version_id = str(version_id or "").strip().lower()
version = self.version_repository.get_by_version_id(normalized_version_id)
if version is None:
raise LookupError("Versao administrativa nao encontrada.")
latest_versions_for_tool = self.version_repository.list_versions(tool_name=version.tool_name)
if latest_versions_for_tool and latest_versions_for_tool[0].version_id != version.version_id:
raise ValueError(
"Somente a versao mais recente da tool pode seguir para revisao, aprovacao e publicacao."
)
if version.status not in allowed_current_statuses:
expected_statuses = ", ".join(status.value for status in allowed_current_statuses)
raise ValueError(
f"A transicao solicitada exige status em ({expected_statuses}), mas a versao esta em '{version.status.value}'."
)
draft = self.draft_repository.get_by_tool_name(version.tool_name)
if draft is None:
raise RuntimeError("Draft raiz da tool nao encontrado para a versao governada.")
metadata = self.metadata_repository.get_by_tool_version_id(version.id)
if metadata is None:
raise RuntimeError("Metadados persistidos da versao nao encontrados para a governanca administrativa.")
previous_status = version.status
repository_session = self._resolve_repository_session()
atomic_write_options = {"commit": False} if repository_session is not None else {}
artifact_commit = False if repository_session is not None else None
try:
if target_status == ToolLifecycleStatus.ACTIVE:
self._ensure_human_governance_ready_for_activation(version.id)
self._archive_active_publications(
tool_name=version.tool_name,
excluding_version_id=version.id,
**atomic_write_options,
)
self.version_repository.update_status(
version,
status=target_status,
**atomic_write_options,
)
self.metadata_repository.update_status(
metadata,
status=target_status,
**atomic_write_options,
)
self.draft_repository.update_status(
draft,
status=target_status,
**atomic_write_options,
)
self._persist_governance_artifact(
draft=draft,
version=version,
artifact_kind=artifact_kind,
summary=artifact_summary,
previous_status=previous_status,
current_status=target_status,
actor_staff_account_id=actor_staff_account_id,
actor_name=actor_name,
actor_role=normalized_role,
decision_notes=decision_notes,
reviewed_generated_code=reviewed_generated_code,
commit=artifact_commit,
)
if repository_session is not None:
self._commit_repository_session(
repository_session,
draft=draft,
version=version,
)
except Exception:
if repository_session is not None:
repository_session.rollback()
raise
if target_status in {ToolLifecycleStatus.ACTIVE, ToolLifecycleStatus.ARCHIVED}:
self._synchronize_product_runtime_publication_snapshot()
queue_entry = None
publication = None
if target_status == ToolLifecycleStatus.ACTIVE:
publication = self._serialize_metadata_publication(metadata)
else:
queue_entry = self._serialize_review_queue_entry(version)
return {
"message": success_message,
"version_id": version.version_id,
"tool_name": version.tool_name,
"version_number": version.version_number,
"status": target_status,
"queue_entry": queue_entry,
"publication": publication,
"next_steps": next_steps,
}
def create_draft_submission(
self,
payload: dict,
*,
owner_staff_account_id: int | None = None,
owner_name: str | None = None,
owner_role: StaffRole | str | None = None,
) -> dict:
normalized = self._normalize_draft_payload(payload)
warnings = self._build_intake_warnings(normalized)
required_parameter_count = sum(1 for parameter in normalized["parameters"] if parameter["required"])
summary = self._build_draft_summary(normalized)
stored_parameters = self._serialize_parameters_for_storage(normalized["parameters"])
generation_model = normalized["generation_model"]
submission_policy = self._build_submission_policy(submitter_role=owner_role)
if self.draft_repository is None:
version_number = 1
version_count = 1
version_id = self._build_preview_version_id(normalized["tool_name"], version_number)
return {
"storage_status": "validated_preview",
"message": "Pre-cadastro validado no painel sem publicacao direta. A persistencia definitiva entra na fase de governanca de tools.",
"submission_policy": submission_policy,
"draft_preview": {
"draft_id": f"preview::{normalized['tool_name']}",
"version_id": version_id,
"tool_name": normalized["tool_name"],
"display_name": normalized["display_name"],
"domain": normalized["domain"],
"status": ToolLifecycleStatus.DRAFT,
"summary": summary,
"business_goal": normalized["business_goal"],
"version_number": version_number,
"version_count": version_count,
"parameter_count": len(normalized["parameters"]),
"required_parameter_count": required_parameter_count,
"generation_model": generation_model,
"requires_director_approval": True,
"owner_name": owner_name,
"parameters": normalized["parameters"],
},
"warnings": warnings,
"next_steps": [
"Persistir o draft administrativo em armazenamento proprio do admin na fase 5.",
"Encaminhar a tool para revisao e aprovacao de um diretor.",
"Executar pipeline de geracao, validacao e publicacao antes da ativacao no produto.",
],
}
if owner_staff_account_id is None:
raise ValueError("owner_staff_account_id e obrigatorio para persistir o draft.")
repository_session = self._resolve_repository_session()
atomic_write_options = {"commit": False} if repository_session is not None else {}
artifact_commit = False if repository_session is not None else None
owner_display_name = owner_name or "Autor administrativo"
existing_draft = self.draft_repository.get_by_tool_name(normalized["tool_name"])
next_version_number = self._resolve_next_version_number(normalized["tool_name"], existing_draft)
next_version_count = next_version_number if existing_draft is None else max(existing_draft.version_count + 1, next_version_number)
try:
if existing_draft is None:
draft = self.draft_repository.create(
tool_name=normalized["tool_name"],
display_name=normalized["display_name"],
domain=normalized["domain"],
description=normalized["description"],
business_goal=normalized["business_goal"],
summary=summary,
parameters_json=stored_parameters,
required_parameter_count=required_parameter_count,
current_version_number=next_version_number,
version_count=next_version_count,
generation_model=generation_model,
owner_staff_account_id=owner_staff_account_id,
owner_display_name=owner_display_name,
requires_director_approval=True,
**atomic_write_options,
)
else:
draft = self.draft_repository.update_submission(
existing_draft,
display_name=normalized["display_name"],
domain=normalized["domain"],
description=normalized["description"],
business_goal=normalized["business_goal"],
summary=summary,
parameters_json=stored_parameters,
required_parameter_count=required_parameter_count,
current_version_number=next_version_number,
version_count=next_version_count,
generation_model=generation_model,
owner_staff_account_id=owner_staff_account_id,
owner_display_name=owner_display_name,
requires_director_approval=True,
**atomic_write_options,
)
version = None
if self.version_repository is not None:
version = self.version_repository.create(
draft_id=draft.id,
tool_name=draft.tool_name,
version_number=next_version_number,
summary=summary,
description=normalized["description"],
business_goal=normalized["business_goal"],
parameters_json=stored_parameters,
required_parameter_count=required_parameter_count,
generation_model=generation_model,
owner_staff_account_id=owner_staff_account_id,
owner_display_name=owner_display_name,
status=ToolLifecycleStatus.DRAFT,
requires_director_approval=True,
**atomic_write_options,
)
if version is not None and self.metadata_repository is not None:
self.metadata_repository.upsert_version_metadata(
draft_id=draft.id,
tool_version_id=version.id,
tool_name=draft.tool_name,
display_name=draft.display_name,
domain=draft.domain,
description=draft.description,
parameters_json=stored_parameters,
version_number=version.version_number,
status=version.status,
author_staff_account_id=version.owner_staff_account_id,
author_display_name=version.owner_display_name,
**atomic_write_options,
)
if version is not None and self.artifact_repository is not None:
self._persist_initial_version_artifacts(
draft=draft,
version=version,
summary=summary,
warnings=warnings,
stored_parameters=stored_parameters,
required_parameter_count=required_parameter_count,
generation_model=generation_model,
owner_staff_account_id=owner_staff_account_id,
owner_name=owner_display_name,
commit=artifact_commit,
)
if repository_session is not None:
self._commit_repository_session(
repository_session,
draft=draft,
version=version,
)
except Exception:
if repository_session is not None:
repository_session.rollback()
raise
return {
"storage_status": "admin_database",
"message": "Draft administrativo persistido com sucesso sem publicacao direta, em fluxo versionado e governado.",
"submission_policy": submission_policy,
"draft_preview": self._serialize_draft_preview(draft, version),
"warnings": warnings,
"next_steps": [
f"Executar a pipeline de geracao para a versao v{draft.current_version_number} antes da validacao.",
"Depois da geracao, validar a versao e encaminhar para aprovacao de diretor.",
"Persistir artefatos e publicacoes associados a cada versao governada.",
],
}
def preview_draft_submission(
self,
payload: dict,
*,
owner_name: str | None = None,
owner_role: StaffRole | str | None = None,
) -> dict:
normalized = self._normalize_draft_payload(payload)
warnings = self._build_intake_warnings(normalized)
required_parameter_count = sum(1 for parameter in normalized["parameters"] if parameter["required"])
summary = self._build_draft_summary(normalized)
generation_model = normalized["generation_model"]
submission_policy = self._build_submission_policy(submitter_role=owner_role)
existing_draft = None
if self.draft_repository is not None:
existing_draft = self.draft_repository.get_by_tool_name(normalized["tool_name"])
version_number = self._resolve_next_version_number(normalized["tool_name"], existing_draft)
version_count = version_number if existing_draft is None else max(existing_draft.version_count + 1, version_number)
return {
"storage_status": "validated_preview",
"message": "Pre-cadastro validado no painel com numeracao de versao reservada para a tool, sem publicacao direta nesta etapa.",
"submission_policy": submission_policy,
"draft_preview": {
"draft_id": existing_draft.draft_id if existing_draft is not None else f"preview::{normalized['tool_name']}",
"version_id": self._build_preview_version_id(normalized["tool_name"], version_number),
"tool_name": normalized["tool_name"],
"display_name": normalized["display_name"],
"domain": normalized["domain"],
"status": ToolLifecycleStatus.DRAFT,
"summary": summary,
"business_goal": normalized["business_goal"],
"version_number": version_number,
"version_count": version_count,
"parameter_count": len(normalized["parameters"]),
"required_parameter_count": required_parameter_count,
"generation_model": generation_model,
"requires_director_approval": True,
"owner_name": owner_name,
"parameters": normalized["parameters"],
},
"warnings": warnings,
"next_steps": [
"Persistir a nova versao administrativa para consolidar o historico da tool.",
"Encaminhar a versao para revisao e aprovacao de um diretor.",
"Executar pipeline de geracao, validacao e publicacao antes da ativacao no produto.",
],
}
def build_lifecycle_payload(self) -> list[dict]:
return [
{
"code": stage.code,
"label": stage.label,
"description": stage.description,
"order": stage.order,
"terminal": stage.terminal,
}
for stage in TOOL_LIFECYCLE_STAGES
]
def list_publication_catalog(self) -> list[dict]:
published_at = datetime.now(UTC)
return [
{
"publication_id": f"bootstrap::{entry.tool_name}::v1",
"tool_name": entry.tool_name,
"display_name": entry.display_name,
"description": entry.description,
"domain": entry.domain,
"version": 1,
"status": ToolLifecycleStatus.ACTIVE,
"parameter_count": entry.parameter_count,
"implementation_module": "app.services.tools.handlers",
"implementation_callable": entry.tool_name,
"published_by": "bootstrap_catalog",
"published_at": published_at,
}
for entry in BOOTSTRAP_TOOL_CATALOG
]
def _archive_active_publications(
self,
*,
tool_name: str,
excluding_version_id: int,
commit: bool = True,
) -> None:
if self.version_repository is not None:
for active_version in self.version_repository.list_versions(
tool_name=tool_name,
statuses=(ToolLifecycleStatus.ACTIVE,),
):
if active_version.id == excluding_version_id:
continue
self.version_repository.update_status(
active_version,
status=ToolLifecycleStatus.ARCHIVED,
commit=commit,
)
if self.metadata_repository is not None:
for active_metadata in self.metadata_repository.list_metadata(
tool_name=tool_name,
statuses=(ToolLifecycleStatus.ACTIVE,),
):
if active_metadata.tool_version_id == excluding_version_id:
continue
self.metadata_repository.update_status(
active_metadata,
status=ToolLifecycleStatus.ARCHIVED,
commit=commit,
)
def _persist_governance_artifact(
self,
*,
draft: ToolDraft,
version: ToolVersion,
artifact_kind: ToolArtifactKind,
summary: str,
previous_status: ToolLifecycleStatus,
current_status: ToolLifecycleStatus,
actor_staff_account_id: int,
actor_name: str,
actor_role: StaffRole,
decision_notes: str | None = None,
reviewed_generated_code: bool | None = None,
extra_payload: dict | None = None,
commit: bool | None = None,
) -> None:
if self.artifact_repository is None:
return
artifact_write_options = {"commit": commit} if commit is not None else {}
self.artifact_repository.upsert_version_artifact(
draft_id=draft.id,
tool_version_id=version.id,
tool_name=version.tool_name,
version_number=version.version_number,
artifact_stage=ToolArtifactStage.GOVERNANCE,
artifact_kind=artifact_kind,
artifact_status=ToolArtifactStatus.SUCCEEDED,
summary=summary,
payload_json=self._build_governance_artifact_payload(
version=version,
artifact_kind=artifact_kind,
previous_status=previous_status,
current_status=current_status,
actor_staff_account_id=actor_staff_account_id,
actor_name=actor_name,
actor_role=actor_role,
decision_notes=decision_notes,
reviewed_generated_code=reviewed_generated_code,
extra_payload=extra_payload,
),
author_staff_account_id=actor_staff_account_id,
author_display_name=actor_name,
**artifact_write_options,
)
@staticmethod
def _build_governance_artifact_payload(
*,
version: ToolVersion,
artifact_kind: ToolArtifactKind,
previous_status: ToolLifecycleStatus,
current_status: ToolLifecycleStatus,
actor_staff_account_id: int,
actor_name: str,
actor_role: StaffRole,
decision_notes: str | None = None,
reviewed_generated_code: bool | None = None,
extra_payload: dict | None = None,
) -> dict:
payload = {
"source": "director_governance",
"action": artifact_kind.value,
"tool_name": version.tool_name,
"version_id": version.version_id,
"version_number": version.version_number,
"previous_status": previous_status.value,
"current_status": current_status.value,
"actor_staff_account_id": actor_staff_account_id,
"actor_display_name": actor_name,
"actor_role": actor_role.value,
"decision_notes": str(decision_notes or "").strip() or None,
"reviewed_generated_code": reviewed_generated_code,
}
if extra_payload:
payload.update(extra_payload)
return payload
def _persist_initial_version_artifacts(
self,
*,
draft: ToolDraft,
version: ToolVersion,
summary: str,
warnings: list[str],
stored_parameters: list[dict],
required_parameter_count: int,
generation_model: str | None = None,
owner_staff_account_id: int,
owner_name: str,
commit: bool | None = None,
) -> None:
if self.artifact_repository is None:
return
artifact_write_options = {"commit": commit} if commit is not None else {}
generation_payload = self._build_generation_artifact_payload(
draft=draft,
version=version,
summary=summary,
stored_parameters=stored_parameters,
generation_model=generation_model,
)
validation_payload = self._build_validation_artifact_payload(
draft=draft,
version=version,
warnings=warnings,
stored_parameters=stored_parameters,
required_parameter_count=required_parameter_count,
)
self.artifact_repository.upsert_version_artifact(
draft_id=draft.id,
tool_version_id=version.id,
tool_name=draft.tool_name,
version_number=version.version_number,
artifact_stage=ToolArtifactStage.GENERATION,
artifact_kind=ToolArtifactKind.GENERATION_REQUEST,
artifact_status=ToolArtifactStatus.PENDING,
summary="Manifesto inicial de geracao persistido para auditoria da versao.",
payload_json=generation_payload,
author_staff_account_id=owner_staff_account_id,
author_display_name=owner_name,
**artifact_write_options,
)
self.artifact_repository.upsert_version_artifact(
draft_id=draft.id,
tool_version_id=version.id,
tool_name=draft.tool_name,
version_number=version.version_number,
artifact_stage=ToolArtifactStage.VALIDATION,
artifact_kind=ToolArtifactKind.VALIDATION_REPORT,
artifact_status=ToolArtifactStatus.SUCCEEDED,
summary="Relatorio de validacao do pre-cadastro persistido para auditoria da versao.",
payload_json=validation_payload,
author_staff_account_id=owner_staff_account_id,
author_display_name=owner_name,
**artifact_write_options,
)
@staticmethod
def _build_generation_artifact_payload(
*,
draft: ToolDraft,
version: ToolVersion,
summary: str,
stored_parameters: list[dict],
generation_model: str | None = None,
) -> dict:
return {
"source": "admin_draft_intake",
"tool_name": draft.tool_name,
"display_name": draft.display_name,
"domain": draft.domain,
"version_number": version.version_number,
"draft_id": draft.draft_id,
"version_id": version.version_id,
"business_goal": draft.business_goal,
"description": draft.description,
"summary": summary,
"generation_model": generation_model,
"parameters": list(stored_parameters),
"requires_director_approval": draft.requires_director_approval,
"target_package": GENERATED_TOOLS_PACKAGE,
"target_module": build_generated_tool_module_name(draft.tool_name),
"target_file_path": build_generated_tool_module_path(draft.tool_name),
"target_callable": GENERATED_TOOL_ENTRYPOINT,
"reserved_lifecycle_target": ToolLifecycleStatus.GENERATED.value,
}
@staticmethod
def _build_validation_artifact_payload(
*,
draft: ToolDraft,
version: ToolVersion,
warnings: list[str],
stored_parameters: list[dict],
required_parameter_count: int,
) -> dict:
return {
"source": "admin_draft_intake",
"tool_name": draft.tool_name,
"version_number": version.version_number,
"draft_id": draft.draft_id,
"version_id": version.version_id,
"validation_status": "passed",
"warnings": list(warnings),
"parameter_count": len(stored_parameters),
"required_parameter_count": required_parameter_count,
"checked_rules": [
"tool_name_snake_case",
"display_name_min_length",
"domain_catalog",
"description_min_length",
"business_goal_min_length",
"parameter_contracts",
],
}
def _execute_automated_contract_validation(
self,
*,
draft: ToolDraft,
version: ToolVersion,
metadata: ToolMetadata,
actor_staff_account_id: int,
actor_name: str,
llm_generated_source: str | None = None,
commit: bool | None = None,
) -> dict:
previous_validation_payload = {}
if self.artifact_repository is not None:
existing_validation_artifact = self.artifact_repository.get_by_tool_version_and_kind(
version.id,
ToolArtifactKind.VALIDATION_REPORT,
)
if existing_validation_artifact is not None:
previous_validation_payload = dict(existing_validation_artifact.payload_json or {})
contract_validation_issues = self._collect_tool_contract_validation_issues(
version=version,
metadata=metadata,
)
signature_schema_blueprint = self._build_generated_signature_and_parameter_schema(
metadata=metadata,
)
signature_schema_issues = list(signature_schema_blueprint["issues"])
import_loading_result = self._validate_generated_tool_import_loading(
version=version,
metadata=metadata,
signature_schema_blueprint=signature_schema_blueprint,
llm_generated_source=llm_generated_source,
)
smoke_test_result = self._run_generated_tool_minimal_smoke_tests(
version=version,
metadata=metadata,
signature_schema_blueprint=signature_schema_blueprint,
import_loading_result=import_loading_result,
llm_generated_source=llm_generated_source,
)
automated_checks = [
{
"key": "tool_contract",
"label": "Contrato da tool",
"status": "passed" if not contract_validation_issues else "failed",
"summary": (
"O contrato compartilhado da tool foi validado automaticamente com sucesso."
if not contract_validation_issues
else "A validacao automatica do contrato encontrou inconsistencias bloqueantes."
),
"blocking_issues": list(contract_validation_issues),
},
{
"key": "tool_signature_schema",
"label": "Assinatura e schema de parametros",
"status": "passed" if not signature_schema_issues else "failed",
"summary": (
"A assinatura esperada do entrypoint run e o schema dos parametros foram validados automaticamente."
if not signature_schema_issues
else "A validacao automatica da assinatura esperada e do schema dos parametros encontrou inconsistencias bloqueantes."
),
"blocking_issues": list(signature_schema_issues),
},
{
"key": "tool_import_loading",
"label": "Importacao e carregamento da tool",
"status": "passed" if import_loading_result["passed"] else "failed",
"summary": (
"O modulo gerado pode ser importado e o runtime conseguiu carregar o entrypoint run."
if import_loading_result["passed"]
else "A validacao automatica de importacao e carregamento da tool encontrou inconsistencias bloqueantes."
),
"blocking_issues": list(import_loading_result["issues"]),
},
{
"key": "tool_smoke_tests",
"label": "Testes minimos automaticos",
"status": "passed" if smoke_test_result["passed"] else "failed",
"summary": (
"Os testes minimos automaticos executaram o entrypoint gerado e o runtime sandboxado com sucesso."
if smoke_test_result["passed"]
else "Os testes minimos automaticos da tool encontraram inconsistencias bloqueantes."
),
"blocking_issues": list(smoke_test_result["issues"]),
},
]
all_validation_issues = [
*contract_validation_issues,
*signature_schema_issues,
*import_loading_result["issues"],
*smoke_test_result["issues"],
]
passed = all(check["status"] == "passed" for check in automated_checks)
validation_payload = self._build_automated_validation_artifact_payload(
draft=draft,
version=version,
metadata=metadata,
intake_validation=previous_validation_payload,
automated_checks=automated_checks,
validation_issues=all_validation_issues,
signature_schema_blueprint=signature_schema_blueprint,
import_loading_result=import_loading_result,
smoke_test_result=smoke_test_result,
)
if self.artifact_repository is not None:
artifact_write_options = {"commit": commit} if commit is not None else {}
self.artifact_repository.upsert_version_artifact(
draft_id=draft.id,
tool_version_id=version.id,
tool_name=draft.tool_name,
version_number=version.version_number,
artifact_stage=ToolArtifactStage.VALIDATION,
artifact_kind=ToolArtifactKind.VALIDATION_REPORT,
artifact_status=(
ToolArtifactStatus.SUCCEEDED if passed else ToolArtifactStatus.FAILED
),
summary=(
"Validacoes automaticas de contrato, assinatura, importacao e testes minimos concluidas para a versao governada."
if passed
else "Validacoes automaticas de contrato, assinatura, importacao e testes minimos falharam para a versao governada."
),
payload_json=validation_payload,
author_staff_account_id=actor_staff_account_id,
author_display_name=actor_name,
**artifact_write_options,
)
return {
"passed": passed,
"automated_checks": automated_checks,
"validation_payload": validation_payload,
}
def _build_automated_validation_artifact_payload(
self,
*,
draft: ToolDraft,
version: ToolVersion,
metadata: ToolMetadata,
intake_validation: dict,
automated_checks: list[dict],
validation_issues: list[str],
signature_schema_blueprint: dict,
import_loading_result: dict,
smoke_test_result: dict,
) -> dict:
publication_envelope = None
if not validation_issues:
publication_envelope = self._build_generated_publication_envelope(
version=version,
metadata=metadata,
).model_dump(mode="json")
return {
"source": "admin_generation_pipeline",
"tool_name": draft.tool_name,
"version_number": version.version_number,
"draft_id": draft.draft_id,
"version_id": version.version_id,
"validation_status": "passed" if not validation_issues else "failed",
"validation_scope": "tool_contract",
"warnings": list((intake_validation or {}).get("warnings") or []),
"blocking_issues": list(validation_issues),
"parameter_count": len(version.parameters_json or []),
"required_parameter_count": version.required_parameter_count,
"checked_rules": list(
_AUTOMATED_CONTRACT_VALIDATION_RULES
+ _AUTOMATED_SIGNATURE_SCHEMA_VALIDATION_RULES
+ _AUTOMATED_IMPORT_LOADING_VALIDATION_RULES
+ _AUTOMATED_SMOKE_TEST_RULES
),
"intake_validation": dict(intake_validation or {}),
"automated_checks": list(automated_checks),
"signature_schema": dict(signature_schema_blueprint),
"import_loading": dict(import_loading_result),
"smoke_tests": dict(smoke_test_result),
"generated_source_code": (
str(import_loading_result.get("rendered_source") or "").strip()
or self._render_generated_tool_module_source(
version=version,
metadata=metadata,
signature_schema_blueprint=signature_schema_blueprint,
)
),
"publication_envelope": publication_envelope,
}
def _build_generated_signature_and_parameter_schema(
self,
*,
metadata: ToolMetadata,
) -> dict:
serialized_parameters = self._serialize_parameters_for_response(metadata.parameters_json)
signature_parameters: list[inspect.Parameter] = []
required_parameters: list[str] = []
optional_parameters: list[str] = []
parameter_schema_properties: dict[str, dict] = {}
issues: list[str] = []
for parameter in serialized_parameters:
parameter_name = parameter["name"]
parameter_type = parameter["parameter_type"]
parameter_schema_properties[parameter_name] = {
"type": _PARAMETER_SCHEMA_TYPE_MAPPING[parameter_type],
"description": parameter["description"],
}
if parameter_type == ToolParameterType.OBJECT:
parameter_schema_properties[parameter_name]["additionalProperties"] = True
if parameter_name in _SIGNATURE_RESERVED_PARAMETER_NAMES:
issues.append(
f"parameter '{parameter_name}' is reserved for runtime-injected context and cannot be declared in the generated tool signature."
)
if parameter["required"]:
required_parameters.append(parameter_name)
else:
optional_parameters.append(parameter_name)
try:
signature_parameters.append(
inspect.Parameter(
parameter_name,
inspect.Parameter.KEYWORD_ONLY,
default=(
inspect.Parameter.empty
if parameter["required"]
else None
),
)
)
except ValueError as exc:
issues.append(
f"parameter '{parameter_name}' cannot be represented in the generated entrypoint signature: {exc}"
)
try:
generated_signature = inspect.Signature(parameters=signature_parameters)
signature_text = f"{GENERATED_TOOL_ENTRYPOINT}{generated_signature}"
except ValueError as exc:
signature_text = None
issues.append(f"generated entrypoint signature is invalid: {exc}")
return {
"callable_name": GENERATED_TOOL_ENTRYPOINT,
"signature": signature_text,
"parameter_mode": "keyword_only",
"runtime_injected_arguments": ["user_id"],
"required_parameters": required_parameters,
"optional_parameters": optional_parameters,
"parameter_schema": {
"type": "object",
"properties": parameter_schema_properties,
"required": required_parameters,
"additionalProperties": False,
},
"issues": issues,
}
def _load_generated_tool_handler_in_memory(
self,
*,
version: ToolVersion,
metadata: ToolMetadata,
signature_schema_blueprint: dict,
llm_generated_source: str | None = None,
) -> dict:
module_name = build_generated_tool_module_name(version.tool_name)
module_path = build_generated_tool_module_path(version.tool_name)
package_name = GENERATED_TOOLS_PACKAGE
rendered_source = self._render_generated_tool_module_source(
version=version,
metadata=metadata,
signature_schema_blueprint=signature_schema_blueprint,
pregenerated_source=llm_generated_source,
)
issues: list[str] = []
handler = None
loaded_signature = None
sandbox_package_root = f"in_memory::{package_name}"
previous_package_module = sys.modules.pop(package_name, None)
previous_tool_module = sys.modules.pop(module_name, None)
try:
package_module = types.ModuleType(package_name)
package_module.__file__ = f"{package_name}/__init__.py"
package_module.__package__ = package_name
package_module.__path__ = [sandbox_package_root]
sys.modules[package_name] = package_module
module = types.ModuleType(module_name)
module.__file__ = module_path
module.__package__ = package_name
sys.modules[module_name] = module
compiled_module = compile(rendered_source, module_path, "exec")
exec(compiled_module, module.__dict__)
handler = getattr(module, GENERATED_TOOL_ENTRYPOINT, None)
if handler is None:
issues.append(
f"generated module '{module_name}' does not expose the governed entrypoint '{GENERATED_TOOL_ENTRYPOINT}'."
)
else:
loaded_signature = f"{handler.__name__}{inspect.signature(handler)}"
if not inspect.iscoroutinefunction(handler):
issues.append(
f"generated module '{module_name}' must expose an async '{GENERATED_TOOL_ENTRYPOINT}' callable."
)
except Exception as exc:
issues.append(
f"generated module import failed: {exc.__class__.__name__}: {exc}"
)
finally:
sys.modules.pop(module_name, None)
sys.modules.pop(package_name, None)
if previous_package_module is not None:
sys.modules[package_name] = previous_package_module
if previous_tool_module is not None:
sys.modules[module_name] = previous_tool_module
return {
"module_name": module_name,
"module_path": module_path,
"loaded_callable": GENERATED_TOOL_ENTRYPOINT,
"loaded_signature": loaded_signature,
"sandbox_package_root": sandbox_package_root,
"rendered_source": rendered_source,
"handler": handler,
"issues": issues,
}
def _validate_generated_tool_import_loading(
self,
*,
version: ToolVersion,
metadata: ToolMetadata,
signature_schema_blueprint: dict,
llm_generated_source: str | None = None,
) -> dict:
if signature_schema_blueprint["issues"]:
return {
"passed": False,
"module_name": build_generated_tool_module_name(version.tool_name),
"module_path": build_generated_tool_module_path(version.tool_name),
"loaded_callable": GENERATED_TOOL_ENTRYPOINT,
"loaded_signature": None,
"sandbox_package_root": None,
"rendered_source": self._render_generated_tool_module_source(
version=version,
metadata=metadata,
signature_schema_blueprint=signature_schema_blueprint,
pregenerated_source=llm_generated_source,
),
"issues": [
"generated import/loading validation skipped because the signature/schema blueprint is invalid."
],
}
load_result = self._load_generated_tool_handler_in_memory(
version=version,
metadata=metadata,
signature_schema_blueprint=signature_schema_blueprint,
llm_generated_source=llm_generated_source,
)
issues = list(load_result["issues"])
handler = load_result["handler"]
loaded_signature = load_result["loaded_signature"]
if handler is not None and loaded_signature != signature_schema_blueprint["signature"]:
issues.append(
"loaded entrypoint signature differs from the validated signature/schema blueprint."
)
if handler is not None and not issues:
try:
registry = ToolRegistry.__new__(ToolRegistry)
registry._tools = []
registry.register_generated_tool(
name=version.tool_name,
description=metadata.description,
parameters=list(metadata.parameters_json or []),
handler=handler,
)
except GeneratedToolCoreBoundaryViolation as exc:
issues.append(str(exc))
return {
"passed": not issues,
"module_name": load_result["module_name"],
"module_path": load_result["module_path"],
"loaded_callable": load_result["loaded_callable"],
"loaded_signature": loaded_signature,
"sandbox_package_root": load_result["sandbox_package_root"],
"rendered_source": load_result["rendered_source"],
"issues": issues,
}
def _run_generated_tool_minimal_smoke_tests(
self,
*,
version: ToolVersion,
metadata: ToolMetadata,
signature_schema_blueprint: dict,
import_loading_result: dict,
llm_generated_source: str | None = None,
) -> dict:
if signature_schema_blueprint["issues"]:
return {
"passed": False,
"module_name": build_generated_tool_module_name(version.tool_name),
"module_path": build_generated_tool_module_path(version.tool_name),
"sandbox_package_root": None,
"invocation_arguments": {},
"direct_result_type": None,
"runtime_result_type": None,
"issues": [
"generated smoke tests skipped because the signature/schema blueprint is invalid."
],
}
if not import_loading_result["passed"]:
return {
"passed": False,
"module_name": build_generated_tool_module_name(version.tool_name),
"module_path": build_generated_tool_module_path(version.tool_name),
"sandbox_package_root": import_loading_result.get("sandbox_package_root"),
"invocation_arguments": {},
"direct_result_type": None,
"runtime_result_type": None,
"issues": [
"generated smoke tests skipped because import/loading validation did not pass."
],
}
load_result = self._load_generated_tool_handler_in_memory(
version=version,
metadata=metadata,
signature_schema_blueprint=signature_schema_blueprint,
llm_generated_source=llm_generated_source,
)
issues = list(load_result["issues"])
handler = load_result["handler"]
invocation_arguments = self._build_generated_tool_smoke_test_arguments(metadata.parameters_json)
direct_result_type = None
runtime_result_type = None
if handler is not None and not issues:
try:
direct_result = asyncio.run(handler(**invocation_arguments))
direct_result_type = type(direct_result).__name__
if direct_result is None:
issues.append("generated entrypoint smoke test returned no payload.")
else:
json.dumps(direct_result)
except TypeError as exc:
issues.append(
f"generated entrypoint smoke test returned a non-JSON-serializable payload: {exc}"
)
except Exception as exc:
issues.append(
f"generated entrypoint smoke test failed: {exc.__class__.__name__}: {exc}"
)
if handler is not None and not issues:
try:
registry = ToolRegistry.__new__(ToolRegistry)
registry._tools = []
registry.register_generated_tool(
name=version.tool_name,
description=metadata.description,
parameters=list(metadata.parameters_json or []),
handler=handler,
)
runtime_result = asyncio.run(
registry.execute(version.tool_name, invocation_arguments)
)
runtime_result_type = type(runtime_result).__name__
if runtime_result is None:
issues.append("generated runtime smoke test returned no payload.")
else:
json.dumps(runtime_result)
except TypeError as exc:
issues.append(
f"generated runtime smoke test returned a non-JSON-serializable payload: {exc}"
)
except Exception as exc:
issues.append(
f"generated runtime smoke test failed: {exc.__class__.__name__}: {exc}"
)
return {
"passed": not issues,
"module_name": load_result["module_name"],
"module_path": load_result["module_path"],
"sandbox_package_root": load_result["sandbox_package_root"],
"invocation_arguments": dict(invocation_arguments),
"direct_result_type": direct_result_type,
"runtime_result_type": runtime_result_type,
"issues": issues,
}
def _build_generated_tool_smoke_test_arguments(
self,
parameters_json: list[dict] | None,
) -> dict[str, object]:
serialized_parameters = self._serialize_parameters_for_response(parameters_json)
return {
parameter["name"]: self._build_generated_tool_smoke_test_argument_value(parameter)
for parameter in serialized_parameters
}
@staticmethod
def _build_generated_tool_smoke_test_argument_value(parameter: dict) -> object:
parameter_name = str(parameter.get("name") or "value").strip().lower() or "value"
parameter_type = parameter.get("parameter_type", ToolParameterType.STRING)
if parameter_type == ToolParameterType.INTEGER:
return 1
if parameter_type == ToolParameterType.NUMBER:
return 1.5
if parameter_type == ToolParameterType.BOOLEAN:
return True
if parameter_type == ToolParameterType.OBJECT:
return {"sample": parameter_name}
if parameter_type == ToolParameterType.ARRAY:
return [f"sample_{parameter_name}"]
return f"sample_{parameter_name}"
def _render_generated_tool_module_source(
self,
*,
version: ToolVersion,
metadata: ToolMetadata,
signature_schema_blueprint: dict,
pregenerated_source: str | None = None,
) -> str:
# Fase 7: quando o LLM gerou cÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ³digo real, usa diretamente.
# O smoke test e o import loading validarÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o o cÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ³digo LLM com o mesmo rigor do stub.
if pregenerated_source:
return pregenerated_source
# Fallback: stub de validaÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o estrutural (sem tool_generation_service ou se gerou None).
serialized_parameters = self._serialize_parameters_for_response(metadata.parameters_json)
if serialized_parameters:
signature_tokens = []
response_argument_lines = []
for parameter in serialized_parameters:
parameter_name = parameter["name"]
if parameter["required"]:
signature_tokens.append(parameter_name)
else:
signature_tokens.append(f"{parameter_name}=None")
response_argument_lines.append(f' "{parameter_name}": {parameter_name},')
function_signature = f"*, {', '.join(signature_tokens)}"
response_arguments = "\n".join(response_argument_lines)
response_payload = (
' "received_arguments": {\n'
f"{response_arguments}\n"
' },\n'
)
else:
function_signature = ""
response_payload = ' "received_arguments": {},\n'
return (
f'"""Admin-governed generated tool scaffold for {version.tool_name} v{version.version_number}."""\n\n'
f"async def {GENERATED_TOOL_ENTRYPOINT}({function_signature}):\n"
" return {\n"
f' "tool_name": "{version.tool_name}",\n'
f' "version": {version.version_number},\n'
' "runtime_status": "generated_validation_stub",\n'
f"{response_payload}"
" }\n"
)
def _collect_tool_contract_validation_issues(
self,
*,
version: ToolVersion,
metadata: ToolMetadata,
) -> list[str]:
issues: list[str] = []
tool_name = str(metadata.tool_name or "").strip().lower()
display_name = str(metadata.display_name or "").strip()
description = str(metadata.description or "").strip()
expected_metadata_id = f"tool_metadata::{tool_name}::v{int(metadata.version_number)}"
if not _TOOL_NAME_PATTERN.fullmatch(tool_name):
issues.append("tool_name persisted is invalid for the shared publication contract.")
if len(display_name) < 4:
issues.append("display_name persisted must have at least 4 characters for publication.")
if len(description) < 16:
issues.append("description persisted must have at least 16 characters for publication.")
if str(metadata.metadata_id or "").strip().lower() != expected_metadata_id:
issues.append("metadata_id persisted is inconsistent with the governed version identifier.")
seen_parameter_names: set[str] = set()
for raw_parameter in metadata.parameters_json or []:
parameter_name = str((raw_parameter or {}).get("name") or "").strip().lower()
parameter_description = str((raw_parameter or {}).get("description") or "").strip()
parameter_type = str((raw_parameter or {}).get("parameter_type") or "").strip().lower()
if not _PARAMETER_NAME_PATTERN.fullmatch(parameter_name):
issues.append(f"parameter '{parameter_name or '<empty>'}' violates the shared naming contract.")
if parameter_name in seen_parameter_names:
issues.append(f"parameter '{parameter_name}' is duplicated in the persisted contract.")
seen_parameter_names.add(parameter_name)
if parameter_type not in {item.value for item in ToolParameterType}:
issues.append(f"parameter '{parameter_name or '<empty>'}' uses an unsupported parameter_type.")
if len(parameter_description) < 8:
issues.append(f"parameter '{parameter_name or '<empty>'}' must describe its contract with at least 8 characters.")
try:
self._build_generated_publication_envelope(version=version, metadata=metadata)
except (ValidationError, ValueError) as exc:
issues.extend(self._format_contract_validation_errors(exc))
return issues
def _build_generated_publication_envelope(
self,
*,
version: ToolVersion,
metadata: ToolMetadata,
) -> ToolPublicationEnvelope:
parameters = tuple(
ToolParameterContract(
name=parameter["name"],
parameter_type=parameter["parameter_type"],
description=parameter["description"],
required=parameter["required"],
)
for parameter in self._serialize_parameters_for_response(metadata.parameters_json)
)
published_tool = PublishedToolContract(
tool_name=metadata.tool_name,
display_name=metadata.display_name,
description=metadata.description,
version=metadata.version_number,
status=ToolLifecycleStatus.GENERATED,
parameters=parameters,
implementation_module=build_generated_tool_module_name(version.tool_name),
implementation_callable=GENERATED_TOOL_ENTRYPOINT,
)
return ToolPublicationEnvelope(
source_service=ServiceName.ADMIN,
target_service=ServiceName.PRODUCT,
publication_id=metadata.metadata_id,
published_tool=published_tool,
emitted_at=datetime.now(UTC),
)
def _get_version_by_tool_version_id(self, tool_version_id: int) -> ToolVersion | None:
if self.version_repository is None:
return None
for version in self.version_repository.list_versions():
if version.id == tool_version_id:
return version
return None
def _get_generated_source_code_for_version(self, tool_version_id: int) -> str:
if self.artifact_repository is None:
raise RuntimeError(
"Nao foi possivel sincronizar o runtime do product sem os artefatos de validacao da tool."
)
validation_artifact = self.artifact_repository.get_by_tool_version_and_kind(
tool_version_id,
ToolArtifactKind.VALIDATION_REPORT,
)
if validation_artifact is None:
raise RuntimeError("Artefato de validacao nao encontrado para sincronizar a tool publicada no product.")
generated_source_code = str((validation_artifact.payload_json or {}).get("generated_source_code") or "").strip()
if not generated_source_code:
raise RuntimeError("O codigo gerado da tool publicada nao foi encontrado para sincronizacao do runtime.")
return generated_source_code
def _build_published_runtime_envelope(
self,
*,
version: ToolVersion,
metadata: ToolMetadata,
generated_source_code: str,
) -> ToolPublicationEnvelope:
generated_envelope = self._build_generated_publication_envelope(version=version, metadata=metadata)
published_tool = generated_envelope.published_tool.model_copy(
update={
"status": ToolLifecycleStatus.ACTIVE,
"checksum": hashlib.sha256(generated_source_code.encode("utf-8")).hexdigest(),
"published_at": metadata.updated_at or metadata.created_at,
"published_by": metadata.author_display_name,
}
)
return generated_envelope.model_copy(
update={
"published_tool": published_tool,
"emitted_at": datetime.now(UTC),
}
)
@staticmethod
def _write_runtime_snapshot_file(path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
temp_path = path.with_suffix(f"{path.suffix}.tmp")
temp_path.write_text(content, encoding="utf-8")
temp_path.replace(path)
def _synchronize_product_runtime_publication_snapshot(self) -> None:
runtime_dir = get_generated_tools_runtime_dir()
runtime_dir.mkdir(parents=True, exist_ok=True)
init_file = runtime_dir / "__init__.py"
if not init_file.exists():
init_file.write_text(
"\"\"\"Isolated runtime package for admin-governed generated tools.\"\"\"\\n",
encoding="utf-8",
)
active_metadata_entries = self._list_latest_metadata_entries(statuses=_PUBLISHED_TOOL_STATUSES)
publication_envelopes: list[ToolPublicationEnvelope] = []
for metadata in active_metadata_entries:
version = self._get_version_by_tool_version_id(metadata.tool_version_id)
if version is None:
raise RuntimeError(
f"Versao publicada nao encontrada para sincronizar o runtime da tool '{metadata.tool_name}'."
)
generated_source_code = self._get_generated_source_code_for_version(version.id)
self._write_runtime_snapshot_file(
build_generated_tool_file_path(metadata.tool_name),
generated_source_code,
)
publication_envelopes.append(
self._build_published_runtime_envelope(
version=version,
metadata=metadata,
generated_source_code=generated_source_code,
)
)
manifest = ToolRuntimePublicationManifest(
source_service=ServiceName.ADMIN,
target_service=ServiceName.PRODUCT,
emitted_at=datetime.now(UTC),
publications=tuple(publication_envelopes),
)
self._write_runtime_snapshot_file(
get_generated_tool_publication_manifest_path(),
json.dumps(manifest.model_dump(mode="json"), ensure_ascii=True, indent=2, sort_keys=True),
)
@staticmethod
def _format_contract_validation_errors(error: ValidationError | ValueError) -> list[str]:
if isinstance(error, ValidationError):
return [
f"{'.'.join(str(item) for item in issue['loc'])}: {issue['msg']}"
for issue in error.errors()
]
return [str(error)]
def _persist_generation_pipeline_artifact(
self,
*,
draft: ToolDraft,
version: ToolVersion,
actor_staff_account_id: int,
actor_name: str,
actor_role: StaffRole,
llm_generated_source: str | None = None,
llm_generation_model: str | None = None,
llm_generation_issues: list[str] | None = None,
commit: bool | None = None,
) -> None:
if self.artifact_repository is None:
return
artifact_write_options = {"commit": commit} if commit is not None else {}
generation_payload = self._build_generation_artifact_payload(
draft=draft,
version=version,
summary=version.summary,
stored_parameters=list(version.parameters_json or []),
)
generation_payload.update(
{
"source": "admin_generation_pipeline_llm" if llm_generated_source else "admin_generation_pipeline",
"pipeline_status": "completed",
"triggered_by": actor_name,
"triggered_by_role": actor_role.value,
"generated_at": datetime.now(UTC).isoformat(),
# ---- Fase 7: rastreabilidade da geraÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ§ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢â¬Â ÃƒÂ¢Ã¢â€šÂ¬Ã¢â€žÂ¢ÃƒÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã ÃÆÃ¢Ã¢ââ¬Å¡Ã¬Ã¢ââ¬Å¾Ã¢ÃÆÃ†â€™Ãƒâ€ Ã¢â¬â„¢ÃƒÆÃ¢Ã¢ââ¬Å¡Ã¬Ã…Ã¡ÃÆÃ†â€™ÃƒÂ¢Ã¢â€šÂ¬Ã…¡ÃÆÃ¢â¬Å¡Ãƒâ€šÃ£o via LLM ----
"generation_model_used": llm_generation_model,
"generation_issues": list(llm_generation_issues or []),
"generation_source": "llm" if llm_generated_source else "stub",
}
)
self.artifact_repository.upsert_version_artifact(
draft_id=draft.id,
tool_version_id=version.id,
tool_name=version.tool_name,
version_number=version.version_number,
artifact_stage=ToolArtifactStage.GENERATION,
artifact_kind=ToolArtifactKind.GENERATION_REQUEST,
artifact_status=ToolArtifactStatus.SUCCEEDED,
summary="Pipeline de geracao concluido para a versao administrativa.",
payload_json=generation_payload,
author_staff_account_id=actor_staff_account_id,
author_display_name=actor_name,
**artifact_write_options,
)
def _build_pipeline_snapshot(self, status: ToolLifecycleStatus) -> dict:
normalized_status = (
status
if isinstance(status, ToolLifecycleStatus)
else ToolLifecycleStatus(str(status or "").strip().lower())
)
current_step_by_status = {
ToolLifecycleStatus.DRAFT: "generation",
ToolLifecycleStatus.GENERATED: "validation",
ToolLifecycleStatus.VALIDATED: "approval",
ToolLifecycleStatus.APPROVED: "activation",
ToolLifecycleStatus.ACTIVE: "activation",
ToolLifecycleStatus.FAILED: "generation",
ToolLifecycleStatus.ARCHIVED: "activation",
}
step_states_by_status = {
ToolLifecycleStatus.DRAFT: {
"manual_intake": "completed",
"generation": "current",
"validation": "pending",
"approval": "pending",
"activation": "pending",
},
ToolLifecycleStatus.GENERATED: {
"manual_intake": "completed",
"generation": "completed",
"validation": "current",
"approval": "pending",
"activation": "pending",
},
ToolLifecycleStatus.VALIDATED: {
"manual_intake": "completed",
"generation": "completed",
"validation": "completed",
"approval": "current",
"activation": "pending",
},
ToolLifecycleStatus.APPROVED: {
"manual_intake": "completed",
"generation": "completed",
"validation": "completed",
"approval": "completed",
"activation": "current",
},
ToolLifecycleStatus.ACTIVE: {
"manual_intake": "completed",
"generation": "completed",
"validation": "completed",
"approval": "completed",
"activation": "completed",
},
ToolLifecycleStatus.FAILED: {
"manual_intake": "completed",
"generation": "failed",
"validation": "pending",
"approval": "pending",
"activation": "pending",
},
ToolLifecycleStatus.ARCHIVED: {
"manual_intake": "completed",
"generation": "completed",
"validation": "completed",
"approval": "completed",
"activation": "completed",
},
}
descriptions = {
"manual_intake": "Cadastro manual consolidado no admin e pronto para seguir no pipeline.",
"generation": "Geracao da implementacao isolada da tool dentro do namespace governado.",
"validation": "Validacao da versao gerada antes da aprovacao humana e da ativacao.",
"approval": "Aprovacao humana da diretoria antes da publicacao controlada.",
"activation": "Ativacao da versao aprovada no catalogo governado do produto.",
}
labels = {
"manual_intake": "Cadastro manual",
"generation": "Geracao",
"validation": "Validacao",
"approval": "Aprovacao",
"activation": "Ativacao",
}
step_states = step_states_by_status[normalized_status]
return {
"current_step": current_step_by_status[normalized_status],
"steps": [
{
"key": step_key,
"label": labels[step_key],
"state": step_states[step_key],
"description": descriptions[step_key],
}
for step_key in ("manual_intake", "generation", "validation", "approval", "activation")
],
}
def _list_latest_versions(
self,
*,
statuses: tuple[ToolLifecycleStatus, ...] | None = None,
) -> list[ToolVersion]:
if self.version_repository is None:
return []
latest_by_tool_name: dict[str, ToolVersion | None] = {}
for version in self.version_repository.list_versions():
normalized_tool_name = str(version.tool_name or "").strip().lower()
if normalized_tool_name in latest_by_tool_name:
continue
if statuses is not None and version.status not in statuses:
latest_by_tool_name[normalized_tool_name] = None
continue
latest_by_tool_name[normalized_tool_name] = version
return [version for version in latest_by_tool_name.values() if version is not None]
def _serialize_review_queue_entry(
self,
version: ToolVersion,
*,
worker_execution: dict | None = None,
) -> dict:
metadata = (
self.metadata_repository.get_by_tool_version_id(version.id)
if self.metadata_repository is not None
else None
)
display_name = metadata.display_name if metadata is not None else version.tool_name.replace("_", " ").title()
automated_validation = self._extract_latest_automated_validation(version.id)
gate = self._build_review_gate(version.status)
automated_validation_status = automated_validation.get("status")
automated_validation_summary = automated_validation.get("summary")
effective_worker_execution = worker_execution
if effective_worker_execution is None and version.status in {
ToolLifecycleStatus.DRAFT,
ToolLifecycleStatus.FAILED,
}:
effective_worker_execution = self._get_generation_pipeline_worker_execution(version.version_id)
if isinstance(effective_worker_execution, dict):
dispatch_state = str(effective_worker_execution.get("dispatch_state") or "").strip().lower()
if dispatch_state == "queued":
gate = "generation_pipeline_queued"
automated_validation_status = "pending"
automated_validation_summary = "Pipeline enfileirada no worker dedicado aguardando execucao."
elif dispatch_state == "running":
gate = "generation_pipeline_running"
automated_validation_status = "running"
automated_validation_summary = "Pipeline em execucao no worker dedicado do admin."
elif dispatch_state == "failed":
gate = "generation_worker_failed"
automated_validation_status = "failed"
automated_validation_summary = effective_worker_execution.get("last_error") or (
"O worker dedicado falhou antes de concluir a pipeline."
)
return {
"entry_id": version.version_id,
"version_id": version.version_id,
"version_number": version.version_number,
"tool_name": version.tool_name,
"display_name": display_name,
"status": version.status,
"gate": gate,
"summary": version.summary,
"owner_name": version.owner_display_name,
"automated_validation_status": automated_validation_status,
"automated_validation_summary": automated_validation_summary,
"queued_at": version.updated_at or version.created_at,
}
def _version_has_generated_source(self, version_id: str) -> bool:
if self.version_repository is None or self.artifact_repository is None:
return False
normalized_version_id = str(version_id or "").strip().lower()
version = self.version_repository.get_by_version_id(normalized_version_id)
if version is None:
return False
validation_artifact = self.artifact_repository.get_by_tool_version_and_kind(
version.id,
ToolArtifactKind.VALIDATION_REPORT,
)
if validation_artifact is None:
return False
generated_source_code = str((validation_artifact.payload_json or {}).get("generated_source_code") or "").strip()
return bool(generated_source_code)
def _find_latest_archived_version(
self,
*,
tool_name: str,
excluding_version_id: int | None = None,
) -> ToolVersion | None:
if self.version_repository is None:
return None
for archived_version in self.version_repository.list_versions(
tool_name=tool_name,
statuses=(ToolLifecycleStatus.ARCHIVED,),
):
if excluding_version_id is not None and archived_version.id == excluding_version_id:
continue
return archived_version
return None
@staticmethod
def _normalize_human_decision_notes(decision_notes: str) -> str:
normalized_notes = str(decision_notes or "").strip()
if len(normalized_notes) < _HUMAN_DECISION_NOTES_MIN_LENGTH:
raise ValueError(
"A decisao humana precisa registrar um parecer com pelo menos "
f"{_HUMAN_DECISION_NOTES_MIN_LENGTH} caracteres."
)
return normalized_notes
def _ensure_human_governance_ready_for_activation(self, tool_version_id: int) -> None:
if self.artifact_repository is None:
raise RuntimeError(
"A ativacao governada exige trilha de auditoria habilitada para validar a aprovacao humana."
)
review_artifact = self.artifact_repository.get_by_tool_version_and_kind(
tool_version_id,
ToolArtifactKind.DIRECTOR_REVIEW,
)
approval_artifact = self.artifact_repository.get_by_tool_version_and_kind(
tool_version_id,
ToolArtifactKind.DIRECTOR_APPROVAL,
)
review_payload = dict(review_artifact.payload_json or {}) if review_artifact is not None else {}
approval_payload = dict(approval_artifact.payload_json or {}) if approval_artifact is not None else {}
if not review_payload.get("decision_notes") or not bool(review_payload.get("reviewed_generated_code")):
raise ValueError(
"A ativacao exige uma revisao humana registrada com parecer e confirmacao de leitura do codigo gerado."
)
if not approval_payload.get("decision_notes"):
raise ValueError(
"A ativacao exige uma aprovacao humana registrada com parecer explicito da diretoria."
)
def _build_human_review_gate(self, version: ToolVersion) -> dict:
rollback_candidate = None
if version.status == ToolLifecycleStatus.ACTIVE:
rollback_candidate = self._find_latest_archived_version(
tool_name=version.tool_name,
excluding_version_id=version.id,
)
return {
"current_gate": ToolManagementService._build_review_gate(version.status),
"review_action_available": version.status == ToolLifecycleStatus.GENERATED,
"approval_action_available": version.status == ToolLifecycleStatus.VALIDATED,
"publication_action_available": version.status == ToolLifecycleStatus.APPROVED,
"deactivation_action_available": version.status == ToolLifecycleStatus.ACTIVE,
"rollback_action_available": version.status == ToolLifecycleStatus.ACTIVE and rollback_candidate is not None,
"rollback_target_version_id": rollback_candidate.version_id if rollback_candidate is not None else None,
"rollback_target_version_number": rollback_candidate.version_number if rollback_candidate is not None else None,
"requires_decision_notes": version.status in {
ToolLifecycleStatus.GENERATED,
ToolLifecycleStatus.VALIDATED,
ToolLifecycleStatus.ACTIVE,
},
"requires_code_review_confirmation": version.status == ToolLifecycleStatus.GENERATED,
}
def _build_review_detail_next_steps(
self,
version: ToolVersion,
generated_source_available: bool,
*,
worker_execution: dict | None = None,
) -> list[str]:
status = version.status
dispatch_state = ""
if isinstance(worker_execution, dict):
dispatch_state = str(worker_execution.get("dispatch_state") or "").strip().lower()
if status in {ToolLifecycleStatus.DRAFT, ToolLifecycleStatus.FAILED} and dispatch_state == "queued":
next_steps = [
"A pipeline esta enfileirada no worker dedicado do admin e ainda nao entrou na etapa de validacao.",
"Atualize este detalhe apos a execucao para acompanhar quando a versao sair de draft e entrar em generated.",
]
elif status in {ToolLifecycleStatus.DRAFT, ToolLifecycleStatus.FAILED} and dispatch_state == "running":
next_steps = [
"A pipeline esta sendo executada em background no worker dedicado do admin.",
"Atualize este detalhe quando a geracao concluir para revisar o codigo e as validacoes automaticas.",
]
elif status in {ToolLifecycleStatus.DRAFT, ToolLifecycleStatus.FAILED} and dispatch_state == "failed":
next_steps = [
"O worker dedicado falhou antes de concluir a pipeline. Corrija a causa e reenvie a geracao.",
"Enquanto a execucao dedicada nao concluir com sucesso, a versao permanece fora da revisao humana e da ativacao.",
]
else:
next_steps_by_status = {
ToolLifecycleStatus.DRAFT: [
"Execute a pipeline de geracao para produzir o modulo governado antes da revisao humana.",
"Enquanto a versao estiver em draft, ela permanece fora da aprovacao e da ativacao.",
],
ToolLifecycleStatus.GENERATED: [
"Analise o codigo completo gerado, confirme a leitura manual e registre a revisao da diretoria.",
"Somente depois da revisao humana a versao pode seguir para aprovacao formal.",
],
ToolLifecycleStatus.VALIDATED: [
"Registre o parecer final de aprovacao da diretoria antes da publicacao.",
"A ativacao continua bloqueada ate existir aprovacao humana explicita.",
],
ToolLifecycleStatus.APPROVED: [
"A versao ja foi aprovada pela diretoria e agora pode seguir para publicacao controlada.",
"A ativacao vai validar novamente a trilha de revisao e aprovacao humana antes de entrar no catalogo.",
],
ToolLifecycleStatus.ACTIVE: [
"A versao esta ativa no catalogo governado e pode ser desativada com parecer explicito da diretoria.",
"Quando houver uma versao arquivada anterior, o rollback controlado pode restaurar rapidamente a publicacao anterior.",
],
ToolLifecycleStatus.ARCHIVED: [
"Esta versao foi retirada do catalogo ativo e permanece arquivada para historico e auditoria.",
"A diretoria pode restaurar uma versao arquivada por rollback controlado a partir da publicacao ativa correspondente.",
],
ToolLifecycleStatus.FAILED: [
"Corrija os bloqueios da pipeline e execute uma nova geracao antes de voltar para a revisao humana.",
"Enquanto a versao estiver em failed, a aprovacao e a ativacao permanecem indisponiveis.",
],
}
next_steps = list(
next_steps_by_status.get(status, ["Acompanhe a governanca da versao pela trilha administrativa."])
)
if status == ToolLifecycleStatus.ACTIVE:
rollback_candidate = self._find_latest_archived_version(
tool_name=version.tool_name,
excluding_version_id=version.id,
)
if rollback_candidate is not None:
next_steps.append(
f"Ha uma versao arquivada disponivel para rollback: v{rollback_candidate.version_number}."
)
if not generated_source_available:
next_steps.append("O codigo completo aparece aqui assim que a pipeline gerar e registrar a funcao governada.")
return next_steps
def _list_governance_history_entries(self, tool_version_id: int) -> list[dict]:
if self.artifact_repository is None:
return []
history_entries = self.artifact_repository.list_artifacts(
tool_version_id=tool_version_id,
artifact_stage=ToolArtifactStage.GOVERNANCE,
)
return [
self._serialize_governance_history_entry(artifact)
for artifact in reversed(history_entries)
]
@staticmethod
def _serialize_governance_history_entry(artifact) -> dict:
payload = dict(artifact.payload_json or {})
label_by_kind = {
ToolArtifactKind.DIRECTOR_REVIEW: "Revisao humana registrada",
ToolArtifactKind.DIRECTOR_APPROVAL: "Aprovacao humana registrada",
ToolArtifactKind.PUBLICATION_RELEASE: "Publicacao administrativa registrada",
ToolArtifactKind.PUBLICATION_DEACTIVATION: "Desativacao registrada",
ToolArtifactKind.PUBLICATION_ROLLBACK: "Rollback registrado",
}
return {
"action_key": artifact.artifact_kind.value,
"label": label_by_kind.get(artifact.artifact_kind, "Governanca registrada"),
"summary": artifact.summary,
"previous_status": payload.get("previous_status"),
"current_status": payload.get("current_status"),
"actor_name": payload.get("actor_display_name"),
"actor_role": payload.get("actor_role"),
"decision_notes": payload.get("decision_notes"),
"reviewed_generated_code": payload.get("reviewed_generated_code"),
"recorded_at": artifact.updated_at or artifact.created_at,
}
@staticmethod
def _build_review_gate(status: ToolLifecycleStatus) -> str:
gate_by_status = {
ToolLifecycleStatus.DRAFT: "generation_pipeline_required",
ToolLifecycleStatus.GENERATED: "validation_required",
ToolLifecycleStatus.VALIDATED: "director_approval_required",
ToolLifecycleStatus.APPROVED: "director_publication_required",
ToolLifecycleStatus.ACTIVE: "publication_active",
ToolLifecycleStatus.ARCHIVED: "archived_history",
ToolLifecycleStatus.FAILED: "pipeline_retry_required",
}
return gate_by_status.get(status, "governance_required")
def _extract_latest_automated_validation(self, tool_version_id: int) -> dict:
if self.artifact_repository is None:
return {}
validation_artifact = self.artifact_repository.get_by_tool_version_and_kind(
tool_version_id,
ToolArtifactKind.VALIDATION_REPORT,
)
if validation_artifact is None:
return {}
automated_checks = list((validation_artifact.payload_json or {}).get("automated_checks") or [])
if not automated_checks:
return {}
passed_count = sum(
1
for check in automated_checks
if str((check or {}).get("status") or "").strip().lower() == "passed"
)
total_checks = len(automated_checks)
overall_status = "passed" if passed_count == total_checks else "failed"
if overall_status == "passed":
summary = f"{passed_count}/{total_checks} validacoes automaticas passaram antes da revisao humana."
else:
failed_labels = [
str((check or {}).get("label") or "validacao automatica").strip().lower()
for check in automated_checks
if str((check or {}).get("status") or "").strip().lower() != "passed"
]
summary = f"{passed_count}/{total_checks} validacoes automaticas passaram; revisar {', '.join(failed_labels)}."
return {
"status": overall_status,
"summary": summary,
}
def _list_latest_metadata_entries(
self,
*,
statuses: tuple[ToolLifecycleStatus, ...] | None = None,
) -> list[ToolMetadata]:
if self.metadata_repository is None:
return []
latest_by_tool_name: dict[str, ToolMetadata] = {}
for metadata in self.metadata_repository.list_metadata(statuses=statuses):
normalized_tool_name = str(metadata.tool_name or "").strip().lower()
if normalized_tool_name in latest_by_tool_name:
continue
latest_by_tool_name[normalized_tool_name] = metadata
return list(latest_by_tool_name.values())
def _serialize_metadata_publication(self, metadata: ToolMetadata) -> dict:
parameters = self._serialize_parameters_for_response(metadata.parameters_json)
version_record = None
if self.version_repository is not None:
for candidate in self.version_repository.list_versions(tool_name=metadata.tool_name):
if candidate.id == metadata.tool_version_id:
version_record = candidate
break
rollback_candidate = None
if metadata.status == ToolLifecycleStatus.ACTIVE:
rollback_candidate = self._find_latest_archived_version(
tool_name=metadata.tool_name,
excluding_version_id=metadata.tool_version_id,
)
return {
"publication_id": metadata.metadata_id,
"tool_name": metadata.tool_name,
"display_name": metadata.display_name,
"description": metadata.description,
"domain": metadata.domain,
"version": metadata.version_number,
"status": metadata.status,
"version_id": version_record.version_id if version_record is not None else None,
"parameter_count": len(parameters),
"parameters": parameters,
"author_name": metadata.author_display_name,
"implementation_module": build_generated_tool_module_name(metadata.tool_name),
"implementation_callable": GENERATED_TOOL_ENTRYPOINT,
"published_by": metadata.author_display_name,
"published_at": metadata.updated_at or metadata.created_at,
"deactivation_action_available": metadata.status == ToolLifecycleStatus.ACTIVE and version_record is not None,
"rollback_action_available": metadata.status == ToolLifecycleStatus.ACTIVE and rollback_candidate is not None,
"rollback_target_version_id": rollback_candidate.version_id if rollback_candidate is not None else None,
"rollback_target_version_number": rollback_candidate.version_number if rollback_candidate is not None else None,
}
def _serialize_draft_summary(self, draft: ToolDraft) -> dict:
return {
"draft_id": draft.draft_id,
"tool_name": draft.tool_name,
"display_name": draft.display_name,
"status": draft.status,
"summary": draft.summary,
"current_version_number": draft.current_version_number,
"version_count": draft.version_count,
"owner_name": draft.owner_display_name,
"updated_at": draft.updated_at,
}
def _serialize_draft_preview(
self,
draft: ToolDraft,
version: ToolVersion | None = None,
) -> dict:
parameters = self._serialize_parameters_for_response(draft.parameters_json)
version_id = version.version_id if version is not None else self._build_preview_version_id(
draft.tool_name,
draft.current_version_number,
)
version_number = version.version_number if version is not None else draft.current_version_number
return {
"draft_id": draft.draft_id,
"version_id": version_id,
"tool_name": draft.tool_name,
"display_name": draft.display_name,
"domain": draft.domain,
"status": draft.status,
"summary": draft.summary,
"business_goal": draft.business_goal,
"version_number": version_number,
"version_count": draft.version_count,
"parameter_count": len(parameters),
"required_parameter_count": draft.required_parameter_count,
"generation_model": version.generation_model if version is not None else draft.generation_model,
"requires_director_approval": draft.requires_director_approval,
"owner_name": draft.owner_display_name,
"parameters": parameters,
}
@staticmethod
def _serialize_parameters_for_storage(parameters: list[dict]) -> list[dict]:
return [
{
"name": parameter["name"],
"parameter_type": parameter["parameter_type"].value,
"description": parameter["description"],
"required": parameter["required"],
}
for parameter in parameters
]
@staticmethod
def _serialize_parameters_for_response(parameters_json: list[dict] | None) -> list[dict]:
return [
{
"name": str((parameter or {}).get("name") or "").strip().lower(),
"parameter_type": ToolParameterType(str((parameter or {}).get("parameter_type") or "string").strip().lower()),
"description": str((parameter or {}).get("description") or "").strip(),
"required": bool((parameter or {}).get("required", True)),
}
for parameter in (parameters_json or [])
]
@staticmethod
def _build_draft_summary(payload: dict) -> str:
return (
f"{payload['display_name']} pronta para seguir como draft com {len(payload['parameters'])} parametro(s) e revisao obrigatoria de diretor."
)
@staticmethod
def _build_preview_version_id(tool_name: str, version_number: int) -> str:
return f"tool_version::{str(tool_name or '').strip().lower()}::v{int(version_number)}"
def _resolve_next_version_number(
self,
tool_name: str,
existing_draft: ToolDraft | None,
) -> int:
repository_version = (
self.version_repository.get_next_version_number(tool_name)
if self.version_repository is not None
else 1
)
if existing_draft is None:
return repository_version
return max(repository_version, existing_draft.current_version_number + 1)
def _normalize_draft_payload(self, payload: dict) -> dict:
tool_name = str(payload.get("tool_name") or "").strip().lower()
if not _TOOL_NAME_PATTERN.fullmatch(tool_name):
raise ValueError("tool_name deve usar snake_case minusculo com 3 a 64 caracteres.")
if tool_name in _RESERVED_CORE_TOOL_NAMES:
raise ValueError(
"tool_name reservado pelo catalogo core do sistema. Gere uma nova tool sem sobrescrever uma capability interna."
)
display_name = str(payload.get("display_name") or "").strip()
if len(display_name) < 4:
raise ValueError("display_name precisa ter pelo menos 4 caracteres.")
domain = str(payload.get("domain") or "").strip().lower()
valid_domains = {option.value for option in INTAKE_DOMAIN_OPTIONS}
if domain not in valid_domains:
raise ValueError("Selecione um dominio valido para a nova tool.")
description = str(payload.get("description") or "").strip()
if len(description) < 16:
raise ValueError("A descricao precisa ter pelo menos 16 caracteres para contextualizar a tool.")
business_goal = str(payload.get("business_goal") or "").strip()
if len(business_goal) < 12:
raise ValueError("Explique o objetivo operacional da tool com pelo menos 12 caracteres.")
raw_parameters = payload.get("parameters") or []
if not isinstance(raw_parameters, list):
raise ValueError("Os parametros enviados para a tool sao invalidos.")
seen_parameter_names: set[str] = set()
parameters: list[dict] = []
for raw_parameter in raw_parameters:
name = str((raw_parameter or {}).get("name") or "").strip().lower()
if not name:
continue
if not _PARAMETER_NAME_PATTERN.fullmatch(name):
raise ValueError("Cada parametro deve usar snake_case minusculo com pelo menos 2 caracteres.")
if name in seen_parameter_names:
raise ValueError("Nao e permitido repetir nomes de parametro na mesma tool.")
seen_parameter_names.add(name)
raw_parameter_type = (raw_parameter or {}).get("parameter_type") or ""
parameter_type = (
raw_parameter_type
if isinstance(raw_parameter_type, ToolParameterType)
else ToolParameterType(str(raw_parameter_type).strip().lower())
)
parameter_description = str((raw_parameter or {}).get("description") or "").strip()
if len(parameter_description) < 8:
raise ValueError("Cada parametro precisa de uma descricao com pelo menos 8 caracteres.")
parameters.append(
{
"name": name,
"parameter_type": parameter_type,
"description": parameter_description,
"required": bool((raw_parameter or {}).get("required", True)),
}
)
if len(parameters) > 10:
raise ValueError("A fase inicial do painel aceita no maximo 10 parametros por tool.")
return {
"tool_name": tool_name,
"display_name": display_name,
"domain": domain,
"description": description,
"business_goal": business_goal,
"generation_model": (
str(payload.get("generation_model") or "").strip() or None
),
"parameters": parameters,
}
def _build_intake_warnings(self, payload: dict) -> list[str]:
warnings: list[str] = []
parameters = payload["parameters"]
if not parameters:
warnings.append("A tool foi cadastrada sem parametros. Confirme se a acao realmente nao exige entrada contextual.")
if len(parameters) >= 6:
warnings.append("A quantidade de parametros ja pede uma revisao mais cuidadosa antes da aprovacao de diretor.")
if any(parameter["parameter_type"] in {ToolParameterType.OBJECT, ToolParameterType.ARRAY} for parameter in parameters):
warnings.append("Parametros compostos exigem atencao extra na revisao porque podem esconder payloads mais sensiveis.")
if payload["domain"] == "orquestracao":
warnings.append("Tools de orquestracao precisam confirmar claramente como afetam o fluxo do bot antes da ativacao.")
return warnings