You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/tests/test_admin_tool_management_...

1374 lines
67 KiB
Python

import unittest
from unittest.mock import patch
from datetime import datetime, timezone
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from admin_app.core import AdminSettings
from admin_app.db.database import AdminBase
from admin_app.db.models import StaffAccount, ToolArtifact, ToolDraft, ToolMetadata, ToolVersion
from admin_app.db.models.tool_artifact import ToolArtifactKind
from admin_app.repositories.tool_artifact_repository import ToolArtifactRepository
from admin_app.repositories.tool_draft_repository import ToolDraftRepository
from admin_app.repositories.tool_metadata_repository import ToolMetadataRepository
from admin_app.repositories.tool_version_repository import ToolVersionRepository
from admin_app.services.tool_management_service import ToolManagementService
from shared.contracts import (
AdminPermission,
GENERATED_TOOL_ENTRYPOINT,
GENERATED_TOOLS_PACKAGE,
StaffRole,
ToolLifecycleStatus,
ToolParameterType,
build_generated_tool_module_name,
build_generated_tool_module_path,
)
class _FakeToolDraftRepository:
    """In-memory substitute for the tool draft repository used by these tests.

    Drafts live in a plain list (``drafts``) and receive sequential integer
    ids. Timestamps are fixed literals instead of the wall clock, so the
    ordering produced by ``list_drafts`` is deterministic.
    """

    def __init__(self):
        self.drafts: list[ToolDraft] = []
        self.next_id = 1

    def list_drafts(self, *, statuses=None) -> list[ToolDraft]:
        """Return the drafts, most recently touched first.

        When ``statuses`` is truthy, only drafts whose status is in that
        collection are kept.
        """
        fallback = datetime.min.replace(tzinfo=timezone.utc)

        def recency(item):
            # Prefer the update time, then the creation time, then a floor.
            return item.updated_at or item.created_at or fallback

        ordered = list(self.drafts)
        ordered.sort(key=recency, reverse=True)
        if not statuses:
            return ordered
        wanted = set(statuses)
        return [item for item in ordered if item.status in wanted]

    def get_by_tool_name(self, tool_name: str) -> ToolDraft | None:
        """Return the first draft matching the trimmed, lower-cased name, or None."""
        wanted_name = str(tool_name or "").strip().lower()
        return next(
            (item for item in self.drafts if item.tool_name == wanted_name),
            None,
        )

    def create(
        self,
        *,
        tool_name: str,
        display_name: str,
        domain: str,
        description: str,
        business_goal: str,
        summary: str,
        parameters_json: list[dict],
        required_parameter_count: int,
        current_version_number: int,
        version_count: int,
        owner_staff_account_id: int,
        owner_display_name: str,
        requires_director_approval: bool = True,
        commit: bool = True,
    ) -> ToolDraft:
        """Build a DRAFT-status ToolDraft, store it and return it.

        ``commit`` is accepted for signature compatibility and is not used.
        """
        assigned_id = self.next_id
        fixed_timestamp = datetime(2026, 3, 31, 15, 0, tzinfo=timezone.utc)
        new_draft = ToolDraft(
            id=assigned_id,
            draft_id=f"draft_fake_{assigned_id}",
            tool_name=tool_name,
            display_name=display_name,
            domain=domain,
            description=description,
            business_goal=business_goal,
            status=ToolLifecycleStatus.DRAFT,
            summary=summary,
            parameters_json=parameters_json,
            required_parameter_count=required_parameter_count,
            current_version_number=current_version_number,
            version_count=version_count,
            requires_director_approval=requires_director_approval,
            owner_staff_account_id=owner_staff_account_id,
            owner_display_name=owner_display_name,
            created_at=fixed_timestamp,
            updated_at=fixed_timestamp,
        )
        self.next_id = assigned_id + 1
        self.drafts.append(new_draft)
        return new_draft

    def update_submission(
        self,
        draft: ToolDraft,
        *,
        display_name: str,
        domain: str,
        description: str,
        business_goal: str,
        summary: str,
        parameters_json: list[dict],
        required_parameter_count: int,
        current_version_number: int,
        version_count: int,
        owner_staff_account_id: int,
        owner_display_name: str,
        requires_director_approval: bool = True,
        commit: bool = True,
    ) -> ToolDraft:
        """Overwrite the draft with a new submission and reset it to DRAFT.

        ``commit`` is accepted for signature compatibility and is not used.
        """
        replacements = (
            ("display_name", display_name),
            ("domain", domain),
            ("description", description),
            ("business_goal", business_goal),
            ("status", ToolLifecycleStatus.DRAFT),
            ("summary", summary),
            ("parameters_json", parameters_json),
            ("required_parameter_count", required_parameter_count),
            ("current_version_number", current_version_number),
            ("version_count", version_count),
            ("requires_director_approval", requires_director_approval),
            ("owner_staff_account_id", owner_staff_account_id),
            ("owner_display_name", owner_display_name),
        )
        for field_name, value in replacements:
            setattr(draft, field_name, value)
        # The version number doubles as the minute so later versions sort later.
        draft.updated_at = datetime(
            2026, 3, 31, 15, current_version_number, tzinfo=timezone.utc
        )
        return draft

    def update_status(self, draft: ToolDraft, *, status: ToolLifecycleStatus, commit: bool = True) -> ToolDraft:
        """Set the draft status and bump ``updated_at``; ``commit`` is not used."""
        draft.status = status
        # 30 seconds past the submission timestamp of the same version.
        draft.updated_at = datetime(
            2026, 3, 31, 15, draft.current_version_number, 30, tzinfo=timezone.utc
        )
        return draft
class _FakeToolVersionRepository:
    """In-memory substitute for the tool version repository used by these tests.

    Versions are stored in ``versions`` with sequential ids and fixed,
    version-derived timestamps.
    """

    def __init__(self):
        self.versions: list[ToolVersion] = []
        self.next_id = 1

    def list_versions(self, *, tool_name=None, draft_id=None, statuses=None) -> list[ToolVersion]:
        """Return versions ordered by version number, then recency, descending.

        Each filter is applied only when given: ``tool_name`` (trimmed and
        lower-cased before comparing), ``draft_id`` (any value other than
        None) and ``statuses`` (membership test).
        """
        fallback = datetime.min.replace(tzinfo=timezone.utc)

        def sort_key(item):
            return (
                item.version_number,
                item.updated_at or item.created_at or fallback,
            )

        selected = list(self.versions)
        selected.sort(key=sort_key, reverse=True)
        if tool_name:
            wanted_name = str(tool_name).strip().lower()
            selected = [item for item in selected if item.tool_name == wanted_name]
        if draft_id is not None:
            selected = [item for item in selected if item.draft_id == draft_id]
        if statuses:
            wanted_statuses = set(statuses)
            selected = [item for item in selected if item.status in wanted_statuses]
        return selected

    def get_next_version_number(self, tool_name: str) -> int:
        """Return one more than the highest stored version number (1 when none)."""
        existing = self.list_versions(tool_name=tool_name)
        if not existing:
            return 1
        return existing[0].version_number + 1

    def get_by_version_id(self, version_id: str) -> ToolVersion | None:
        """Return the version with the trimmed, lower-cased ``version_id``, or None."""
        wanted_id = str(version_id or "").strip().lower()
        return next(
            (item for item in self.versions if item.version_id == wanted_id),
            None,
        )

    def create(
        self,
        *,
        draft_id: int,
        tool_name: str,
        version_number: int,
        summary: str,
        description: str,
        business_goal: str,
        parameters_json: list[dict],
        required_parameter_count: int,
        owner_staff_account_id: int,
        owner_display_name: str,
        status: ToolLifecycleStatus = ToolLifecycleStatus.DRAFT,
        requires_director_approval: bool = True,
        commit: bool = True,
    ) -> ToolVersion:
        """Build a ToolVersion, store it and return it.

        ``commit`` is accepted for signature compatibility and is not used.
        """
        assigned_id = self.next_id
        # The version number doubles as the minute of the fixed timestamp.
        fixed_timestamp = datetime(2026, 3, 31, 16, version_number, tzinfo=timezone.utc)
        new_version = ToolVersion(
            id=assigned_id,
            version_id=self.build_version_id(tool_name, version_number),
            draft_id=draft_id,
            tool_name=tool_name,
            version_number=version_number,
            status=status,
            summary=summary,
            description=description,
            business_goal=business_goal,
            parameters_json=parameters_json,
            required_parameter_count=required_parameter_count,
            requires_director_approval=requires_director_approval,
            owner_staff_account_id=owner_staff_account_id,
            owner_display_name=owner_display_name,
            created_at=fixed_timestamp,
            updated_at=fixed_timestamp,
        )
        self.next_id = assigned_id + 1
        self.versions.append(new_version)
        return new_version

    def update_status(self, version: ToolVersion, *, status: ToolLifecycleStatus, commit: bool = True) -> ToolVersion:
        """Set the version status and bump ``updated_at``; ``commit`` is not used."""
        version.status = status
        version.updated_at = datetime(
            2026, 3, 31, 16, version.version_number, 30, tzinfo=timezone.utc
        )
        return version

    @staticmethod
    def build_version_id(tool_name: str, version_number: int) -> str:
        """Compose the ``tool_version::<name>::v<number>`` identifier."""
        slug = str(tool_name or "").strip().lower()
        number = int(version_number)
        return f"tool_version::{slug}::v{number}"
class _FakeToolMetadataRepository:
    """In-memory substitute for the tool metadata repository used by these tests.

    Entries are stored in ``metadata_entries`` with sequential ids and fixed,
    version-derived timestamps.
    """

    def __init__(self):
        self.metadata_entries: list[ToolMetadata] = []
        self.next_id = 1

    def list_metadata(self, *, tool_name=None, statuses=None) -> list[ToolMetadata]:
        """Return entries ordered by version number, then recency, descending.

        ``tool_name`` (trimmed, lower-cased) and ``statuses`` narrow the
        result only when they are truthy.
        """
        fallback = datetime.min.replace(tzinfo=timezone.utc)

        def sort_key(entry):
            return (
                entry.version_number,
                entry.updated_at or entry.created_at or fallback,
            )

        selected = list(self.metadata_entries)
        selected.sort(key=sort_key, reverse=True)
        if tool_name:
            wanted_name = str(tool_name).strip().lower()
            selected = [entry for entry in selected if entry.tool_name == wanted_name]
        if statuses:
            wanted_statuses = set(statuses)
            selected = [entry for entry in selected if entry.status in wanted_statuses]
        return selected

    def get_by_tool_version_id(self, tool_version_id: int) -> ToolMetadata | None:
        """Return the first entry attached to ``tool_version_id``, or None."""
        return next(
            (
                entry
                for entry in self.metadata_entries
                if entry.tool_version_id == tool_version_id
            ),
            None,
        )

    def create(self, **kwargs) -> ToolMetadata:
        """Build a ToolMetadata from ``kwargs``, store it and return it.

        ``kwargs`` must contain ``version_number`` and ``tool_name``; every
        keyword is forwarded unchanged to the ToolMetadata constructor.
        """
        version_number = kwargs["version_number"]
        # The version number doubles as the minute of the fixed timestamp.
        fixed_timestamp = datetime(2026, 3, 31, 17, version_number, tzinfo=timezone.utc)
        assigned_id = self.next_id
        new_entry = ToolMetadata(
            id=assigned_id,
            metadata_id=self.build_metadata_id(kwargs["tool_name"], version_number),
            created_at=fixed_timestamp,
            updated_at=fixed_timestamp,
            **kwargs,
        )
        self.next_id = assigned_id + 1
        self.metadata_entries.append(new_entry)
        return new_entry

    def update_metadata(self, metadata: ToolMetadata, **kwargs) -> ToolMetadata:
        """Copy the editable fields from ``kwargs`` onto ``metadata``.

        All listed keys are required; a missing one raises KeyError.
        """
        copied_fields = (
            "display_name",
            "domain",
            "description",
            "parameters_json",
            "status",
            "author_staff_account_id",
            "author_display_name",
        )
        for field_name in copied_fields:
            setattr(metadata, field_name, kwargs[field_name])
        metadata.updated_at = datetime(
            2026, 3, 31, 17, metadata.version_number, tzinfo=timezone.utc
        )
        return metadata

    def update_status(self, metadata: ToolMetadata, *, status: ToolLifecycleStatus, commit: bool = True) -> ToolMetadata:
        """Set the entry status and bump ``updated_at``; ``commit`` is not used."""
        metadata.status = status
        metadata.updated_at = datetime(
            2026, 3, 31, 17, metadata.version_number, 30, tzinfo=timezone.utc
        )
        return metadata

    def upsert_version_metadata(self, **kwargs) -> ToolMetadata:
        """Update the entry for ``kwargs["tool_version_id"]`` or create one."""
        current = self.get_by_tool_version_id(kwargs["tool_version_id"])
        if current is not None:
            return self.update_metadata(current, **kwargs)
        return self.create(**kwargs)

    @staticmethod
    def build_metadata_id(tool_name: str, version_number: int) -> str:
        """Compose the ``tool_metadata::<name>::v<number>`` identifier."""
        slug = str(tool_name or "").strip().lower()
        number = int(version_number)
        return f"tool_metadata::{slug}::v{number}"
class _FakeToolArtifactRepository:
    """In-memory substitute for the tool artifact repository used by these tests.

    Artifacts are stored in ``artifacts`` with sequential ids. Timestamps are
    fixed literals derived from the version number (minute) and, on creation,
    the artifact id (second), so ordering is deterministic.
    """

    def __init__(self):
        self.artifacts: list[ToolArtifact] = []
        self.next_id = 1

    def list_artifacts(self, *, tool_name=None, tool_version_id=None, artifact_stage=None, artifact_kind=None) -> list[ToolArtifact]:
        """Return artifacts ordered by version number, then recency, descending.

        ``tool_name`` (trimmed, lower-cased), ``artifact_stage`` and
        ``artifact_kind`` filter only when truthy; ``tool_version_id`` filters
        whenever it is not None.
        """
        artifacts = sorted(
            self.artifacts,
            key=lambda artifact: (
                artifact.version_number,
                artifact.updated_at or artifact.created_at or datetime.min.replace(tzinfo=timezone.utc),
            ),
            reverse=True,
        )
        if tool_name:
            normalized = str(tool_name).strip().lower()
            artifacts = [artifact for artifact in artifacts if artifact.tool_name == normalized]
        if tool_version_id is not None:
            artifacts = [artifact for artifact in artifacts if artifact.tool_version_id == tool_version_id]
        if artifact_stage:
            artifacts = [artifact for artifact in artifacts if artifact.artifact_stage == artifact_stage]
        if artifact_kind:
            artifacts = [artifact for artifact in artifacts if artifact.artifact_kind == artifact_kind]
        return artifacts

    def get_by_tool_version_and_kind(self, tool_version_id: int, artifact_kind) -> ToolArtifact | None:
        """Return the first artifact of ``artifact_kind`` for the version, or None."""
        for artifact in self.artifacts:
            if artifact.tool_version_id == tool_version_id and artifact.artifact_kind == artifact_kind:
                return artifact
        return None

    def create(self, **kwargs) -> ToolArtifact:
        """Build a ToolArtifact from ``kwargs``, store it and return it.

        ``kwargs`` must contain ``version_number``, ``tool_name`` and
        ``artifact_kind``. An optional ``checksum`` is honoured when truthy;
        otherwise a placeholder ``fake-checksum-<id>`` is used. The remaining
        keywords are forwarded unchanged to the ToolArtifact constructor.
        """
        # Split ``checksum`` out before forwarding: passing it both explicitly
        # and through ``**`` raises "got multiple values for keyword argument".
        forwarded = dict(kwargs)
        checksum = forwarded.pop("checksum", None) or f"fake-checksum-{self.next_id}"
        version_number = forwarded["version_number"]
        # NOTE: minute = version number and second = artifact id, so datetime()
        # rejects values of 60 or more.
        now = datetime(2026, 3, 31, 18, version_number, self.next_id, tzinfo=timezone.utc)
        artifact = ToolArtifact(
            id=self.next_id,
            artifact_id=self.build_artifact_id(forwarded["tool_name"], version_number, forwarded["artifact_kind"]),
            created_at=now,
            updated_at=now,
            checksum=checksum,
            **forwarded,
        )
        self.next_id += 1
        self.artifacts.append(artifact)
        return artifact

    def update_artifact(self, artifact: ToolArtifact, **kwargs) -> ToolArtifact:
        """Copy the mutable fields from ``kwargs`` onto ``artifact``.

        ``storage_kind`` and ``checksum`` keep their current value when absent
        from ``kwargs``; the other copied keys are required.
        """
        artifact.artifact_status = kwargs["artifact_status"]
        artifact.storage_kind = kwargs.get("storage_kind", artifact.storage_kind)
        artifact.summary = kwargs["summary"]
        artifact.payload_json = kwargs["payload_json"]
        artifact.checksum = kwargs.get("checksum", artifact.checksum)
        artifact.author_staff_account_id = kwargs["author_staff_account_id"]
        artifact.author_display_name = kwargs["author_display_name"]
        artifact.updated_at = datetime(2026, 3, 31, 18, artifact.version_number, tzinfo=timezone.utc)
        return artifact

    def upsert_version_artifact(self, **kwargs) -> ToolArtifact:
        """Update the artifact for the given version and kind, or create one."""
        existing = self.get_by_tool_version_and_kind(kwargs["tool_version_id"], kwargs["artifact_kind"])
        if existing is None:
            return self.create(**kwargs)
        return self.update_artifact(existing, **kwargs)

    @staticmethod
    def build_artifact_id(tool_name: str, version_number: int, artifact_kind) -> str:
        """Compose the ``tool_artifact::<name>::v<number>::<kind value>`` identifier."""
        normalized = str(tool_name or "").strip().lower()
        return f"tool_artifact::{normalized}::v{int(version_number)}::{artifact_kind.value}"
class _FailingToolArtifactRepository(ToolArtifactRepository):
    """ToolArtifactRepository that raises on one chosen upsert call.

    ``calls`` counts every ``upsert_version_artifact`` invocation; the call
    whose ordinal equals ``fail_on_call`` raises RuntimeError instead of
    delegating to the parent implementation.
    """

    def __init__(self, db, *, fail_on_call: int):
        super().__init__(db)
        self.calls = 0
        self.fail_on_call = fail_on_call

    def upsert_version_artifact(self, **kwargs):
        """Delegate to the parent unless this is the call configured to fail."""
        self.calls += 1
        if self.calls != self.fail_on_call:
            return super().upsert_version_artifact(**kwargs)
        raise RuntimeError("artifact persistence failure")
class AdminToolManagementServiceTests(unittest.TestCase):
def setUp(self):
    """Wire a ToolManagementService to four fresh in-memory fake repositories."""
    drafts = _FakeToolDraftRepository()
    versions = _FakeToolVersionRepository()
    metadata = _FakeToolMetadataRepository()
    artifacts = _FakeToolArtifactRepository()
    # Keep the fakes on the test case so assertions can inspect their lists.
    self.draft_repository = drafts
    self.version_repository = versions
    self.metadata_repository = metadata
    self.artifact_repository = artifacts
    admin_settings = AdminSettings(admin_api_prefix="/admin")
    self.service = ToolManagementService(
        settings=admin_settings,
        draft_repository=drafts,
        version_repository=versions,
        metadata_repository=metadata,
        artifact_repository=artifacts,
    )
def test_create_draft_submission_persists_initial_tool_version_metadata_and_artifacts(self):
    """A first submission stores one draft, one version, one metadata entry and two artifacts."""
    date_filters = [
        {
            "name": "periodo_inicio",
            "parameter_type": "string",
            "description": "Data inicial usada no filtro.",
            "required": True,
        },
        {
            "name": "periodo_fim",
            "parameter_type": "string",
            "description": "Data final usada no filtro.",
            "required": True,
        },
    ]
    submission = {
        "domain": "vendas",
        "tool_name": "consultar_vendas_periodo",
        "display_name": "Consultar vendas por periodo",
        "description": "Consulta vendas consolidadas por periodo informado no painel.",
        "business_goal": "Ajudar o time interno a acompanhar o desempenho comercial com mais agilidade.",
        "parameters": date_filters,
    }

    payload = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=7,
        owner_name="Equipe Interna",
        owner_role=StaffRole.COLABORADOR,
    )

    # Submission policy: the submitter may only create drafts.
    self.assertEqual(payload["storage_status"], "admin_database")
    policy = payload["submission_policy"]
    self.assertEqual(policy["mode"], "draft_only")
    self.assertEqual(policy["submitter_role"], StaffRole.COLABORADOR)
    self.assertFalse(policy["submitter_can_publish_now"])
    self.assertTrue(policy["direct_publication_blocked"])
    self.assertEqual(policy["required_approver_role"], StaffRole.DIRETOR)
    self.assertEqual(policy["required_publish_permission"], AdminPermission.PUBLISH_TOOLS)

    # Preview of the draft that was just stored.
    preview = payload["draft_preview"]
    self.assertEqual(preview["draft_id"], "draft_fake_1")
    self.assertEqual(preview["version_id"], "tool_version::consultar_vendas_periodo::v1")
    self.assertEqual(preview["version_number"], 1)
    self.assertEqual(preview["version_count"], 1)
    self.assertEqual(preview["status"], ToolLifecycleStatus.DRAFT)
    self.assertEqual(preview["owner_name"], "Equipe Interna")

    # Repository contents.
    self.assertEqual(len(self.draft_repository.drafts), 1)
    self.assertEqual(len(self.version_repository.versions), 1)
    self.assertEqual(len(self.metadata_repository.metadata_entries), 1)
    first_metadata = self.metadata_repository.metadata_entries[0]
    self.assertEqual(first_metadata.author_display_name, "Equipe Interna")
    self.assertEqual(self.metadata_repository.metadata_entries[0].version_number, 1)
    self.assertEqual(len(self.artifact_repository.artifacts), 2)

    by_kind = {}
    for stored in self.artifact_repository.artifacts:
        by_kind[stored.artifact_kind] = stored
    generation_artifact = by_kind[ToolArtifactKind.GENERATION_REQUEST]
    validation_artifact = by_kind[ToolArtifactKind.VALIDATION_REPORT]

    self.assertEqual(generation_artifact.tool_name, "consultar_vendas_periodo")
    self.assertEqual(generation_artifact.payload_json["target_package"], GENERATED_TOOLS_PACKAGE)
    expected_module = build_generated_tool_module_name("consultar_vendas_periodo")
    self.assertEqual(generation_artifact.payload_json["target_module"], expected_module)
    expected_path = build_generated_tool_module_path("consultar_vendas_periodo")
    self.assertEqual(generation_artifact.payload_json["target_file_path"], expected_path)
    self.assertEqual(generation_artifact.payload_json["target_callable"], GENERATED_TOOL_ENTRYPOINT)
    self.assertEqual(generation_artifact.payload_json["reserved_lifecycle_target"], "generated")
    self.assertEqual(generation_artifact.payload_json["parameters"][0]["name"], "periodo_inicio")

    self.assertEqual(validation_artifact.payload_json["validation_status"], "passed")
    self.assertEqual(validation_artifact.payload_json["parameter_count"], 2)
    self.assertEqual(validation_artifact.author_display_name, "Equipe Interna")

    self.assertTrue(generation_artifact.checksum)
    self.assertTrue(validation_artifact.checksum)
def test_create_draft_submission_blocks_tool_name_reserved_by_core_catalog(self):
    """Submitting the name ``consultar_estoque`` raises ValueError and stores nothing."""
    reserved_submission = {
        "domain": "vendas",
        "tool_name": "consultar_estoque",
        "display_name": "Consultar estoque paralelo",
        "description": "Tentativa de sobrescrever a tool core de estoque com uma versao administrativa.",
        "business_goal": "Validar que o admin nao permite reutilizar nomes reservados do runtime core.",
        "parameters": [],
    }

    with self.assertRaisesRegex(ValueError, "catalogo core do sistema"):
        self.service.create_draft_submission(
            reserved_submission,
            owner_staff_account_id=11,
            owner_name="Equipe Interna",
        )

    # The rejection must leave every fake repository empty.
    self.assertEqual(len(self.draft_repository.drafts), 0)
    self.assertEqual(len(self.version_repository.versions), 0)
    self.assertEqual(len(self.metadata_repository.metadata_entries), 0)
    self.assertEqual(len(self.artifact_repository.artifacts), 0)
def test_create_draft_submission_reuses_root_draft_and_increments_version(self):
    """A second submission for the same tool keeps one draft and produces version 2."""
    shared_fields = {
        "domain": "locacao",
        "tool_name": "emitir_resumo_locacao",
        "display_name": "Emitir resumo de locacao",
    }
    first_submission = {
        **shared_fields,
        "description": "Resume o contrato atual de locacao para consulta administrativa.",
        "business_goal": "Dar visibilidade rapida ao status do contrato e dos dados principais.",
        "parameters": [],
    }
    second_submission = {
        **shared_fields,
        "description": "Resume o contrato atual de locacao e os principais eventos administrativos.",
        "business_goal": "Dar visibilidade rapida ao status do contrato, do pagamento e dos dados principais.",
        "parameters": [
            {
                "name": "contrato_id",
                "parameter_type": "string",
                "description": "Identificador do contrato consultado.",
                "required": True,
            }
        ],
    }

    self.service.create_draft_submission(
        first_submission,
        owner_staff_account_id=3,
        owner_name="Analista de Locacao",
    )
    payload = self.service.create_draft_submission(
        second_submission,
        owner_staff_account_id=4,
        owner_name="Coordenacao de Locacao",
    )

    preview = payload["draft_preview"]
    self.assertEqual(preview["version_id"], "tool_version::emitir_resumo_locacao::v2")
    self.assertEqual(preview["version_number"], 2)
    self.assertEqual(preview["version_count"], 2)

    # One root draft; versions, metadata and artifacts accumulate per submission.
    self.assertEqual(len(self.draft_repository.drafts), 1)
    self.assertEqual(len(self.version_repository.versions), 2)
    self.assertEqual(len(self.metadata_repository.metadata_entries), 2)
    self.assertEqual(len(self.artifact_repository.artifacts), 4)

    root_draft = self.draft_repository.drafts[0]
    self.assertEqual(root_draft.current_version_number, 2)
    self.assertEqual(root_draft.version_count, 2)
    self.assertEqual(root_draft.owner_display_name, "Coordenacao de Locacao")
def test_build_drafts_payload_returns_versioned_draft_summaries(self):
    """The drafts payload lists a twice-submitted tool once, at version 2."""
    shared_fields = {
        "domain": "orquestracao",
        "tool_name": "priorizar_contato_quente",
        "display_name": "Priorizar contato quente",
    }
    self.service.create_draft_submission(
        {
            **shared_fields,
            "description": "Classifica contatos mais quentes para orientar o proximo passo do atendimento.",
            "business_goal": "Dar mais foco comercial ao time interno ao identificar oportunidades mais urgentes.",
            "parameters": [],
        },
        owner_staff_account_id=5,
        owner_name="Equipe de Tools",
    )
    self.service.create_draft_submission(
        {
            **shared_fields,
            "description": "Classifica contatos mais quentes com sinais adicionais para orientar o atendimento.",
            "business_goal": "Dar mais foco comercial ao time interno ao identificar oportunidades quentes com mais contexto.",
            "parameters": [],
        },
        owner_staff_account_id=6,
        owner_name="Diretoria Comercial",
    )

    payload = self.service.build_drafts_payload()

    self.assertEqual(payload["storage_status"], "admin_database")
    self.assertEqual(len(payload["drafts"]), 1)
    summary = payload["drafts"][0]
    self.assertEqual(summary["tool_name"], "priorizar_contato_quente")
    self.assertEqual(summary["current_version_number"], 2)
    self.assertEqual(summary["version_count"], 2)
    # The owner shown is the author of the latest submission.
    self.assertEqual(summary["owner_name"], "Diretoria Comercial")
    self.assertEqual(payload["supported_statuses"], [ToolLifecycleStatus.DRAFT])
def test_build_publications_payload_keeps_bootstrap_catalog_for_draft_metadata(self):
    """A tool that is still a draft does not appear among the publications."""
    plate_parameter = {
        "name": "placa",
        "parameter_type": "string",
        "description": "Placa usada na busca da revisao.",
        "required": True,
    }
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [plate_parameter],
    }
    self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
    )

    payload = self.service.build_publications_payload()

    self.assertEqual(payload["source"], "bootstrap_catalog")
    self.assertEqual(payload["target_service"], "product")
    self.assertGreaterEqual(len(payload["publications"]), 10)
    published_names = [entry["tool_name"] for entry in payload["publications"]]
    self.assertNotIn("consultar_revisao_aberta", published_names)
def test_build_publications_payload_merges_active_governed_publications_with_bootstrap_catalog(self):
    """Once its metadata is ACTIVE, the submitted tool is listed next to the catalog tools."""
    plate_parameter = {
        "name": "placa",
        "parameter_type": "string",
        "description": "Placa usada na busca da revisao.",
        "required": True,
    }
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [plate_parameter],
    }
    self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
    )
    # Flip the stored metadata to ACTIVE directly, bypassing the service.
    self.metadata_repository.metadata_entries[0].status = ToolLifecycleStatus.ACTIVE

    payload = self.service.build_publications_payload()

    self.assertEqual(payload["source"], "hybrid_runtime_catalog")
    self.assertEqual(payload["target_service"], "product")
    self.assertGreaterEqual(len(payload["publications"]), 11)
    published_names = [entry["tool_name"] for entry in payload["publications"]]
    self.assertIn("consultar_estoque", published_names)

    publication = next(
        filter(
            lambda entry: entry["tool_name"] == "consultar_revisao_aberta",
            payload["publications"],
        )
    )
    self.assertEqual(publication["publication_id"], "tool_metadata::consultar_revisao_aberta::v1")
    self.assertEqual(publication["version"], 1)
    self.assertEqual(publication["status"], ToolLifecycleStatus.ACTIVE)
    self.assertEqual(publication["author_name"], "Operacao de Oficina")
    expected_module = build_generated_tool_module_name("consultar_revisao_aberta")
    self.assertEqual(publication["implementation_module"], expected_module)
    self.assertEqual(publication["implementation_callable"], GENERATED_TOOL_ENTRYPOINT)
    self.assertEqual(publication["parameter_count"], 1)
    self.assertEqual(publication["parameters"][0]["parameter_type"], ToolParameterType.STRING)
def test_build_review_queue_payload_returns_latest_version_pending_director_action(self):
    """A freshly submitted draft shows up as the single review-queue item."""
    plate_parameter = {
        "name": "placa",
        "parameter_type": "string",
        "description": "Placa usada na busca da revisao.",
        "required": True,
    }
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [plate_parameter],
    }
    intake_payload = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
    )

    payload = self.service.build_review_queue_payload()

    self.assertEqual(payload["queue_mode"], "governed_admin_queue")
    self.assertEqual(len(payload["items"]), 1)
    queued = payload["items"][0]
    self.assertEqual(queued["version_id"], intake_payload["draft_preview"]["version_id"])
    self.assertEqual(queued["version_number"], 1)
    self.assertEqual(queued["status"], ToolLifecycleStatus.DRAFT)
    self.assertEqual(queued["gate"], "generation_pipeline_required")
    self.assertIn(ToolLifecycleStatus.APPROVED, payload["supported_statuses"])
def test_run_generation_pipeline_promotes_version_to_generated(self):
    """Running the pipeline on a valid draft yields GENERATED with all four checks passed."""
    check_keys = (
        "tool_contract",
        "tool_signature_schema",
        "tool_import_loading",
        "tool_smoke_tests",
    )
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [
            {
                "name": "placa",
                "parameter_type": "string",
                "description": "Placa usada na busca da revisao.",
                "required": True,
            }
        ],
    }
    intake_payload = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
        owner_role=StaffRole.COLABORADOR,
    )

    version_id = intake_payload["draft_preview"]["version_id"]
    payload = self.service.run_generation_pipeline(
        version_id,
        runner_staff_account_id=8,
        runner_name="Operacao de Oficina",
        runner_role=StaffRole.COLABORADOR,
    )

    # Pipeline outcome reported to the caller.
    self.assertEqual(payload["status"], ToolLifecycleStatus.GENERATED)
    self.assertEqual(payload["current_step"], "validation")
    queue_entry = payload["queue_entry"]
    self.assertEqual(queue_entry["gate"], "validation_required")
    self.assertEqual(queue_entry["automated_validation_status"], "passed")
    self.assertEqual(
        queue_entry["automated_validation_summary"],
        "4/4 validacoes automaticas passaram antes da revisao humana.",
    )
    self.assertEqual(len(payload["automated_validations"]), 4)
    reported_checks = {item["key"]: item for item in payload["automated_validations"]}
    for check_key in check_keys:
        self.assertEqual(reported_checks[check_key]["status"], "passed")

    steps_by_key = {item["key"]: item for item in payload["steps"]}
    self.assertEqual(steps_by_key["manual_intake"]["state"], "completed")
    self.assertEqual(steps_by_key["generation"]["state"], "completed")
    self.assertEqual(steps_by_key["validation"]["state"], "current")

    # The status is propagated to the draft, the version and the metadata.
    self.assertEqual(self.draft_repository.drafts[0].status, ToolLifecycleStatus.GENERATED)
    self.assertEqual(self.version_repository.versions[0].status, ToolLifecycleStatus.GENERATED)
    self.assertEqual(self.metadata_repository.metadata_entries[0].status, ToolLifecycleStatus.GENERATED)

    generation_artifact = next(
        filter(
            lambda stored: stored.artifact_kind == ToolArtifactKind.GENERATION_REQUEST,
            self.artifact_repository.artifacts,
        )
    )
    self.assertEqual(generation_artifact.payload_json["pipeline_status"], "completed")
    self.assertEqual(generation_artifact.payload_json["triggered_by_role"], StaffRole.COLABORADOR.value)

    validation_artifact = next(
        filter(
            lambda stored: stored.artifact_kind == ToolArtifactKind.VALIDATION_REPORT,
            self.artifact_repository.artifacts,
        )
    )
    self.assertEqual(validation_artifact.artifact_status.value, "succeeded")
    self.assertEqual(validation_artifact.payload_json["validation_scope"], "tool_contract")
    stored_checks = {
        item["key"]: item
        for item in validation_artifact.payload_json["automated_checks"]
    }
    for check_key in check_keys:
        self.assertEqual(stored_checks[check_key]["status"], "passed")

    # Details recorded by the individual validation stages.
    self.assertEqual(validation_artifact.payload_json["signature_schema"]["callable_name"], GENERATED_TOOL_ENTRYPOINT)
    self.assertEqual(validation_artifact.payload_json["signature_schema"]["signature"], "run(*, placa)")
    self.assertEqual(validation_artifact.payload_json["import_loading"]["loaded_callable"], GENERATED_TOOL_ENTRYPOINT)
    self.assertEqual(validation_artifact.payload_json["import_loading"]["loaded_signature"], "run(*, placa)")
    self.assertEqual(validation_artifact.payload_json["smoke_tests"]["direct_result_type"], "dict")
    self.assertEqual(validation_artifact.payload_json["smoke_tests"]["runtime_result_type"], "dict")
    self.assertEqual(validation_artifact.payload_json["smoke_tests"]["invocation_arguments"]["placa"], "sample_placa")
    self.assertEqual(validation_artifact.payload_json["publication_envelope"]["target_service"], "product")
def test_run_generation_pipeline_marks_version_failed_when_contract_validation_fails(self):
    """With the stored display name set to "No", the pipeline ends FAILED on the contract check."""
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [],
    }
    intake_payload = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
        owner_role=StaffRole.COLABORADOR,
    )
    # Tamper with the stored metadata after intake so the pipeline sees it.
    self.metadata_repository.metadata_entries[0].display_name = "No"

    version_id = intake_payload["draft_preview"]["version_id"]
    payload = self.service.run_generation_pipeline(
        version_id,
        runner_staff_account_id=8,
        runner_name="Operacao de Oficina",
        runner_role=StaffRole.COLABORADOR,
    )

    self.assertEqual(payload["status"], ToolLifecycleStatus.FAILED)
    self.assertEqual(payload["current_step"], "generation")
    queue_entry = payload["queue_entry"]
    self.assertEqual(queue_entry["gate"], "pipeline_retry_required")
    self.assertEqual(queue_entry["automated_validation_status"], "failed")
    self.assertIn("revisar contrato da tool", queue_entry["automated_validation_summary"].lower())

    reported_checks = {item["key"]: item for item in payload["automated_validations"]}
    expected_statuses = (
        ("tool_contract", "failed"),
        ("tool_signature_schema", "passed"),
        ("tool_import_loading", "passed"),
        ("tool_smoke_tests", "passed"),
    )
    for check_key, expected_status in expected_statuses:
        self.assertEqual(reported_checks[check_key]["status"], expected_status)
    self.assertTrue(reported_checks["tool_contract"]["blocking_issues"])

    # The failure is propagated to the draft, the version and the metadata.
    self.assertEqual(self.draft_repository.drafts[0].status, ToolLifecycleStatus.FAILED)
    self.assertEqual(self.version_repository.versions[0].status, ToolLifecycleStatus.FAILED)
    self.assertEqual(self.metadata_repository.metadata_entries[0].status, ToolLifecycleStatus.FAILED)

    validation_artifact = next(
        filter(
            lambda stored: stored.artifact_kind == ToolArtifactKind.VALIDATION_REPORT,
            self.artifact_repository.artifacts,
        )
    )
    self.assertEqual(validation_artifact.artifact_status.value, "failed")
    self.assertIn("display_name", validation_artifact.payload_json["blocking_issues"][0])
def test_run_generation_pipeline_marks_version_failed_when_signature_schema_validation_fails(self):
    """A parameter named ``user_id`` fails the signature check and the two later checks."""
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [
            {
                "name": "user_id",
                "parameter_type": "string",
                "description": "Identificador do usuario usado no filtro administrativo.",
                "required": True,
            }
        ],
    }
    intake_payload = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
        owner_role=StaffRole.COLABORADOR,
    )

    version_id = intake_payload["draft_preview"]["version_id"]
    payload = self.service.run_generation_pipeline(
        version_id,
        runner_staff_account_id=8,
        runner_name="Operacao de Oficina",
        runner_role=StaffRole.COLABORADOR,
    )

    self.assertEqual(payload["status"], ToolLifecycleStatus.FAILED)
    self.assertEqual(payload["queue_entry"]["gate"], "pipeline_retry_required")

    reported_checks = {item["key"]: item for item in payload["automated_validations"]}
    expected_statuses = (
        ("tool_contract", "passed"),
        ("tool_signature_schema", "failed"),
        ("tool_import_loading", "failed"),
        ("tool_smoke_tests", "failed"),
    )
    for check_key, expected_status in expected_statuses:
        self.assertEqual(reported_checks[check_key]["status"], expected_status)
    self.assertIn("user_id", reported_checks["tool_signature_schema"]["blocking_issues"][0])

    validation_artifact = next(
        filter(
            lambda stored: stored.artifact_kind == ToolArtifactKind.VALIDATION_REPORT,
            self.artifact_repository.artifacts,
        )
    )
    self.assertEqual(validation_artifact.artifact_status.value, "failed")
    self.assertIn("user_id", validation_artifact.payload_json["signature_schema"]["issues"][0])
def test_run_generation_pipeline_marks_version_failed_when_import_loading_validation_fails(self):
    # Scenario: the renderer is patched to return module source that cannot
    # be parsed. Contract and signature/schema checks are expected to pass,
    # import/loading must fail with a SyntaxError, and the smoke tests must
    # be reported as failed because import/loading did not pass.
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [
            {
                "name": "placa",
                "parameter_type": "string",
                "description": "Placa usada na busca da revisao.",
                "required": True,
            }
        ],
    }
    draft = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
        owner_role=StaffRole.COLABORADOR,
    )
    version_id = draft["draft_preview"]["version_id"]

    # Deliberately malformed: the parameter list is never closed.
    broken_source = "async def run(:\n pass\n"
    renderer_patch = patch.object(
        self.service,
        "_render_generated_tool_module_source",
        return_value=broken_source,
    )
    with renderer_patch:
        result = self.service.run_generation_pipeline(
            version_id,
            runner_staff_account_id=8,
            runner_name="Operacao de Oficina",
            runner_role=StaffRole.COLABORADOR,
        )

    self.assertEqual(result["status"], ToolLifecycleStatus.FAILED)
    self.assertEqual(result["queue_entry"]["gate"], "pipeline_retry_required")

    # Index the automated checks by key so each one can be asserted by name.
    checks_by_key = {}
    for check in result["automated_validations"]:
        checks_by_key[check["key"]] = check
    expected_statuses = (
        ("tool_contract", "passed"),
        ("tool_signature_schema", "passed"),
        ("tool_import_loading", "failed"),
        ("tool_smoke_tests", "failed"),
    )
    for check_key, expected_status in expected_statuses:
        self.assertEqual(checks_by_key[check_key]["status"], expected_status)
    self.assertIn("SyntaxError", checks_by_key["tool_import_loading"]["blocking_issues"][0])

    # The persisted validation report must mirror both failures.
    report = next(
        filter(
            lambda candidate: candidate.artifact_kind == ToolArtifactKind.VALIDATION_REPORT,
            self.artifact_repository.artifacts,
        )
    )
    self.assertEqual(report.artifact_status.value, "failed")
    self.assertIn("SyntaxError", report.payload_json["import_loading"]["issues"][0])
    self.assertIn(
        "import/loading validation did not pass",
        report.payload_json["smoke_tests"]["issues"][0],
    )
def test_run_generation_pipeline_marks_version_failed_when_smoke_tests_fail(self):
    # Scenario: the renderer is patched to return a syntactically valid
    # module whose run() raises RuntimeError. Every check up to and including
    # import/loading is expected to pass; only the smoke tests must fail.
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [
            {
                "name": "placa",
                "parameter_type": "string",
                "description": "Placa usada na busca da revisao.",
                "required": True,
            }
        ],
    }
    draft = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
        owner_role=StaffRole.COLABORADOR,
    )
    version_id = draft["draft_preview"]["version_id"]

    # Loads fine, but blows up as soon as it is executed.
    exploding_source = 'async def run(*, placa):\n raise RuntimeError("smoke test boom")\n'
    renderer_patch = patch.object(
        self.service,
        "_render_generated_tool_module_source",
        return_value=exploding_source,
    )
    with renderer_patch:
        result = self.service.run_generation_pipeline(
            version_id,
            runner_staff_account_id=8,
            runner_name="Operacao de Oficina",
            runner_role=StaffRole.COLABORADOR,
        )

    self.assertEqual(result["status"], ToolLifecycleStatus.FAILED)
    self.assertEqual(result["queue_entry"]["gate"], "pipeline_retry_required")

    # Index the automated checks by key so each one can be asserted by name.
    checks_by_key = {}
    for check in result["automated_validations"]:
        checks_by_key[check["key"]] = check
    expected_statuses = (
        ("tool_contract", "passed"),
        ("tool_signature_schema", "passed"),
        ("tool_import_loading", "passed"),
        ("tool_smoke_tests", "failed"),
    )
    for check_key, expected_status in expected_statuses:
        self.assertEqual(checks_by_key[check_key]["status"], expected_status)
    self.assertIn("RuntimeError", checks_by_key["tool_smoke_tests"]["blocking_issues"][0])

    # The persisted validation report must record the smoke-test failure.
    report = next(
        filter(
            lambda candidate: candidate.artifact_kind == ToolArtifactKind.VALIDATION_REPORT,
            self.artifact_repository.artifacts,
        )
    )
    self.assertEqual(report.artifact_status.value, "failed")
    self.assertIn("RuntimeError", report.payload_json["smoke_tests"]["issues"][0])
def test_review_requires_generation_pipeline_before_validation(self):
    # A draft that was submitted but never sent through the generation
    # pipeline must be rejected by review_version with a ValueError whose
    # message matches "generated".
    submission = {
        "domain": "locacao",
        "tool_name": "emitir_resumo_locacao",
        "display_name": "Emitir resumo de locacao",
        "description": "Resume contratos de locacao com filtros operacionais para o time interno.",
        "business_goal": "Dar visibilidade rapida aos contratos e aos principais dados da locacao.",
        "parameters": [],
    }
    draft = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=3,
        owner_name="Equipe Interna",
    )
    version_id = draft["draft_preview"]["version_id"]

    review_request = {
        "reviewer_staff_account_id": 99,
        "reviewer_name": "Diretoria",
        "reviewer_role": StaffRole.DIRETOR,
        "decision_notes": "Aguardando a geracao controlada da funcao.",
        "reviewed_generated_code": True,
    }
    with self.assertRaisesRegex(ValueError, "generated"):
        self.service.review_version(version_id, **review_request)
def test_director_must_review_approve_and_publish_before_activation(self):
    # Happy path through the governed flow: a COLABORADOR runs the generation
    # pipeline, then a DIRETOR reviews, approves and publishes. Each step's
    # payload, the stored rows and the recorded artifacts are checked at the
    # end, after all four service calls have completed.
    submission = {
        "domain": "locacao",
        "tool_name": "emitir_resumo_locacao",
        "display_name": "Emitir resumo de locacao",
        "description": "Resume contratos de locacao com filtros operacionais para o time interno.",
        "business_goal": "Dar visibilidade rapida aos contratos e aos principais dados da locacao.",
        "parameters": [
            {
                "name": "contrato_id",
                "parameter_type": "string",
                "description": "Identificador do contrato consultado.",
                "required": True,
            }
        ],
    }
    draft = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=3,
        owner_name="Equipe Interna",
    )
    version_id = draft["draft_preview"]["version_id"]

    generated = self.service.run_generation_pipeline(
        version_id,
        runner_staff_account_id=3,
        runner_name="Equipe Interna",
        runner_role=StaffRole.COLABORADOR,
    )
    reviewed = self.service.review_version(
        version_id,
        reviewer_staff_account_id=99,
        reviewer_name="Diretoria",
        reviewer_role=StaffRole.DIRETOR,
        decision_notes="Analisei o codigo completo gerado e a estrutura esta aderente ao fluxo governado.",
        reviewed_generated_code=True,
    )
    approved = self.service.approve_version(
        version_id,
        approver_staff_account_id=99,
        approver_name="Diretoria",
        approver_role=StaffRole.DIRETOR,
        decision_notes="Aprovacao formal registrada apos revisao tecnica e leitura integral do codigo.",
    )
    published = self.service.publish_version(
        version_id,
        publisher_staff_account_id=99,
        publisher_name="Diretoria",
        publisher_role=StaffRole.DIRETOR,
    )

    # Generation: four automated checks, all green.
    self.assertEqual(generated["status"], ToolLifecycleStatus.GENERATED)
    self.assertEqual(len(generated["automated_validations"]), 4)
    self.assertTrue(
        all(entry["status"] == "passed" for entry in generated["automated_validations"])
    )

    # Review and approval each move the version to the next human gate.
    self.assertEqual(reviewed["status"], ToolLifecycleStatus.VALIDATED)
    self.assertEqual(reviewed["queue_entry"]["gate"], "director_approval_required")
    self.assertEqual(approved["status"], ToolLifecycleStatus.APPROVED)
    self.assertEqual(approved["queue_entry"]["gate"], "director_publication_required")

    # Publication activates the tool and clears the queue entry.
    self.assertEqual(published["status"], ToolLifecycleStatus.ACTIVE)
    self.assertIsNone(published["queue_entry"])
    self.assertEqual(published["publication"]["tool_name"], "emitir_resumo_locacao")

    # Draft, version and metadata rows all end up ACTIVE.
    self.assertEqual(self.draft_repository.drafts[0].status, ToolLifecycleStatus.ACTIVE)
    self.assertEqual(self.version_repository.versions[0].status, ToolLifecycleStatus.ACTIVE)
    self.assertEqual(self.metadata_repository.metadata_entries[0].status, ToolLifecycleStatus.ACTIVE)

    # Every director decision leaves an artifact behind.
    recorded_kinds = {entry.artifact_kind for entry in self.artifact_repository.artifacts}
    self.assertIn(ToolArtifactKind.DIRECTOR_REVIEW, recorded_kinds)
    self.assertIn(ToolArtifactKind.DIRECTOR_APPROVAL, recorded_kinds)
    self.assertIn(ToolArtifactKind.PUBLICATION_RELEASE, recorded_kinds)
def test_publish_requires_prior_director_review_and_approval(self):
    # Publishing straight after intake, with no pipeline run, review or
    # approval, must raise a ValueError whose message matches "approved".
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [],
    }
    draft = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
    )
    version_id = draft["draft_preview"]["version_id"]

    publish_request = {
        "publisher_staff_account_id": 99,
        "publisher_name": "Diretoria",
        "publisher_role": StaffRole.DIRETOR,
    }
    with self.assertRaisesRegex(ValueError, "approved"):
        self.service.publish_version(version_id, **publish_request)
def test_build_review_detail_payload_exposes_generated_source_and_human_history(self):
    # After generation, director review and director approval, the review
    # detail payload must expose the generated source, the four automated
    # validations and a two-entry decision history (review, then approval).
    submission = {
        "domain": "revisao",
        "tool_name": "consultar_revisao_aberta",
        "display_name": "Consultar revisao aberta",
        "description": "Consulta revisoes abertas com filtros administrativos para a oficina.",
        "business_goal": "Ajudar o time a localizar revisoes abertas com mais contexto operacional.",
        "parameters": [
            {
                "name": "placa",
                "parameter_type": "string",
                "description": "Placa usada na busca da revisao.",
                "required": True,
            }
        ],
    }
    draft = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=8,
        owner_name="Operacao de Oficina",
    )
    version_id = draft["draft_preview"]["version_id"]

    self.service.run_generation_pipeline(
        version_id,
        runner_staff_account_id=8,
        runner_name="Operacao de Oficina",
        runner_role=StaffRole.COLABORADOR,
    )
    self.service.review_version(
        version_id,
        reviewer_staff_account_id=99,
        reviewer_name="Diretoria",
        reviewer_role=StaffRole.DIRETOR,
        decision_notes="Analise completa do codigo gerado antes da validacao humana.",
        reviewed_generated_code=True,
    )
    self.service.approve_version(
        version_id,
        approver_staff_account_id=99,
        approver_name="Diretoria",
        approver_role=StaffRole.DIRETOR,
        decision_notes="Aprovacao formal da versao apos revisao humana detalhada.",
    )

    detail = self.service.build_review_detail_payload(version_id)

    self.assertEqual(detail["status"], ToolLifecycleStatus.APPROVED)
    self.assertEqual(detail["human_gate"]["publication_action_available"], True)
    self.assertIn("async def run", detail["generated_source_code"])
    self.assertEqual(len(detail["automated_validations"]), 4)

    # History is asserted positionally: entry 0 is the review, entry 1 the approval.
    history = detail["decision_history"]
    self.assertEqual(len(history), 2)
    review_entry = history[0]
    self.assertEqual(review_entry["action_key"], ToolArtifactKind.DIRECTOR_REVIEW.value)
    self.assertTrue(review_entry["reviewed_generated_code"])
    self.assertIn("aprovacao formal", history[1]["decision_notes"].lower())
def test_publishing_new_version_archives_previous_active_version(self):
    # Publishes two successive versions of the same tool and checks that the
    # first one (version and metadata rows) is ARCHIVED once the second one
    # becomes ACTIVE.

    def release_version(description, business_goal, review_notes, approval_notes):
        # Drive one submission through intake, generation, review, approval
        # and publication, in that order, and hand back its version id.
        intake = self.service.create_draft_submission(
            {
                "domain": "vendas",
                "tool_name": "consultar_funil_comercial",
                "display_name": "Consultar funil comercial",
                "description": description,
                "business_goal": business_goal,
                "parameters": [],
            },
            owner_staff_account_id=7,
            owner_name="Equipe Interna",
        )
        released_id = intake["draft_preview"]["version_id"]
        self.service.run_generation_pipeline(
            released_id,
            runner_staff_account_id=7,
            runner_name="Equipe Interna",
            runner_role=StaffRole.COLABORADOR,
        )
        self.service.review_version(
            released_id,
            reviewer_staff_account_id=99,
            reviewer_name="Diretoria",
            reviewer_role=StaffRole.DIRETOR,
            decision_notes=review_notes,
            reviewed_generated_code=True,
        )
        self.service.approve_version(
            released_id,
            approver_staff_account_id=99,
            approver_name="Diretoria",
            approver_role=StaffRole.DIRETOR,
            decision_notes=approval_notes,
        )
        self.service.publish_version(
            released_id,
            publisher_staff_account_id=99,
            publisher_name="Diretoria",
            publisher_role=StaffRole.DIRETOR,
        )
        return released_id

    release_version(
        "Consulta o funil comercial consolidado para acompanhamento administrativo.",
        "Dar visibilidade ao time interno sobre os principais gargalos do funil.",
        "Primeira versao revisada com leitura integral do codigo gerado.",
        "Primeira versao aprovada para ativacao controlada.",
    )
    release_version(
        "Consulta o funil comercial consolidado com campos adicionais para acompanhamento administrativo.",
        "Dar visibilidade ao time interno sobre gargalos, volume e conversao do funil.",
        "Nova versao revisada com comparativo do codigo completo gerado.",
        "Nova versao aprovada para substituir a publicacao anterior.",
    )

    versions = {row.version_number: row for row in self.version_repository.versions}
    metadata = {row.version_number: row for row in self.metadata_repository.metadata_entries}
    expected_by_number = (
        (1, ToolLifecycleStatus.ARCHIVED),
        (2, ToolLifecycleStatus.ACTIVE),
    )
    for version_number, expected_status in expected_by_number:
        self.assertEqual(versions[version_number].status, expected_status)
        self.assertEqual(metadata[version_number].status, expected_status)
def test_deactivating_active_version_archives_publication_and_removes_tool_from_catalog(self):
    # Publishes a tool, deactivates it as DIRETOR and verifies that the
    # version is ARCHIVED, that no further deactivation/rollback action is
    # offered, that the last history entry is the deactivation, and that the
    # tool is gone from the publications payload.
    submission = {
        "domain": "locacao",
        "tool_name": "emitir_resumo_locacao",
        "display_name": "Emitir resumo de locacao",
        "description": "Resume contratos de locacao com filtros operacionais para o time interno.",
        "business_goal": "Dar visibilidade rapida aos contratos e aos principais dados da locacao.",
        "parameters": [],
    }
    draft = self.service.create_draft_submission(
        submission,
        owner_staff_account_id=7,
        owner_name="Equipe Interna",
    )
    version_id = draft["draft_preview"]["version_id"]

    # Bring the version to ACTIVE through the full governed flow.
    self.service.run_generation_pipeline(
        version_id,
        runner_staff_account_id=7,
        runner_name="Equipe Interna",
        runner_role=StaffRole.COLABORADOR,
    )
    self.service.review_version(
        version_id,
        reviewer_staff_account_id=99,
        reviewer_name="Diretoria",
        reviewer_role=StaffRole.DIRETOR,
        decision_notes="Analisei a versao ativa antes da desativacao controlada.",
        reviewed_generated_code=True,
    )
    self.service.approve_version(
        version_id,
        approver_staff_account_id=99,
        approver_name="Diretoria",
        approver_role=StaffRole.DIRETOR,
        decision_notes="Aprovacao formal para ativar e depois validar a desativacao controlada.",
    )
    self.service.publish_version(
        version_id,
        publisher_staff_account_id=99,
        publisher_name="Diretoria",
        publisher_role=StaffRole.DIRETOR,
    )

    deactivation = self.service.deactivate_version(
        version_id,
        actor_staff_account_id=99,
        actor_name="Diretoria",
        actor_role=StaffRole.DIRETOR,
        decision_notes="Desativacao controlada da tool apos encerramento temporario do uso.",
    )
    self.assertEqual(deactivation["status"], ToolLifecycleStatus.ARCHIVED)
    self.assertIsNone(deactivation["queue_entry"])

    review_detail = self.service.build_review_detail_payload(version_id)
    self.assertEqual(review_detail["status"], ToolLifecycleStatus.ARCHIVED)
    human_gate = review_detail["human_gate"]
    self.assertFalse(human_gate["deactivation_action_available"])
    self.assertFalse(human_gate["rollback_action_available"])
    latest_decision = review_detail["decision_history"][-1]
    self.assertEqual(latest_decision["action_key"], ToolArtifactKind.PUBLICATION_DEACTIVATION.value)

    catalog = self.service.build_publications_payload()
    published_names = [entry["tool_name"] for entry in catalog["publications"]]
    self.assertNotIn("emitir_resumo_locacao", published_names)
def test_rollback_restores_latest_archived_version_into_active_catalog(self):
    # Publishes two versions of the same tool, rolls back from the second one
    # and verifies that version 1 is ACTIVE again, version 2 is ARCHIVED, the
    # rollback is the last history entry of the restored version, and the
    # publications payload points at the restored version.

    # One row per release: (description, business goal, review notes, approval notes).
    releases = (
        (
            "Consulta o funil comercial consolidado para acompanhamento administrativo.",
            "Dar visibilidade ao time interno sobre os principais gargalos do funil.",
            "Primeira versao revisada antes da futura ativacao controlada.",
            "Primeira versao aprovada para publicacao inicial.",
        ),
        (
            "Consulta o funil comercial consolidado com campos adicionais para acompanhamento administrativo.",
            "Dar visibilidade ao time interno sobre gargalos, volume e conversao do funil.",
            "Nova versao revisada com leitura integral antes da substituicao.",
            "Nova versao aprovada para substituir a publicacao anterior.",
        ),
    )
    published_ids = []
    for description, business_goal, review_notes, approval_notes in releases:
        intake = self.service.create_draft_submission(
            {
                "domain": "vendas",
                "tool_name": "consultar_funil_comercial",
                "display_name": "Consultar funil comercial",
                "description": description,
                "business_goal": business_goal,
                "parameters": [],
            },
            owner_staff_account_id=7,
            owner_name="Equipe Interna",
        )
        released_id = intake["draft_preview"]["version_id"]
        self.service.run_generation_pipeline(
            released_id,
            runner_staff_account_id=7,
            runner_name="Equipe Interna",
            runner_role=StaffRole.COLABORADOR,
        )
        self.service.review_version(
            released_id,
            reviewer_staff_account_id=99,
            reviewer_name="Diretoria",
            reviewer_role=StaffRole.DIRETOR,
            decision_notes=review_notes,
            reviewed_generated_code=True,
        )
        self.service.approve_version(
            released_id,
            approver_staff_account_id=99,
            approver_name="Diretoria",
            approver_role=StaffRole.DIRETOR,
            decision_notes=approval_notes,
        )
        self.service.publish_version(
            released_id,
            publisher_staff_account_id=99,
            publisher_name="Diretoria",
            publisher_role=StaffRole.DIRETOR,
        )
        published_ids.append(released_id)
    first_version_id, second_version_id = published_ids

    # Before the rollback, the active version offers both actions and names
    # version 1 as the rollback target.
    gate_before_rollback = self.service.build_review_detail_payload(second_version_id)["human_gate"]
    self.assertTrue(gate_before_rollback["deactivation_action_available"])
    self.assertTrue(gate_before_rollback["rollback_action_available"])
    self.assertEqual(gate_before_rollback["rollback_target_version_number"], 1)

    rollback_result = self.service.rollback_version(
        second_version_id,
        actor_staff_account_id=99,
        actor_name="Diretoria",
        actor_role=StaffRole.DIRETOR,
        decision_notes="Rollback controlado para restaurar a versao anterior mais estavel.",
    )

    versions = {row.version_number: row for row in self.version_repository.versions}
    metadata = {row.version_number: row for row in self.metadata_repository.metadata_entries}
    self.assertEqual(rollback_result["status"], ToolLifecycleStatus.ACTIVE)
    self.assertEqual(rollback_result["version_id"], first_version_id)
    expected_by_number = (
        (1, ToolLifecycleStatus.ACTIVE),
        (2, ToolLifecycleStatus.ARCHIVED),
    )
    for version_number, expected_status in expected_by_number:
        self.assertEqual(versions[version_number].status, expected_status)
        self.assertEqual(metadata[version_number].status, expected_status)

    restored_detail = self.service.build_review_detail_payload(first_version_id)
    latest_decision = restored_detail["decision_history"][-1]
    self.assertEqual(latest_decision["action_key"], ToolArtifactKind.PUBLICATION_ROLLBACK.value)

    catalog = self.service.build_publications_payload()
    restored_publication = next(
        filter(
            lambda entry: entry["tool_name"] == "consultar_funil_comercial",
            catalog["publications"],
        )
    )
    self.assertEqual(restored_publication["version_id"], first_version_id)
    self.assertTrue(restored_publication["deactivation_action_available"])
class AdminToolManagementTransactionalPersistenceTests(unittest.TestCase):
    """Transactional behaviour of ToolManagementService on an in-memory SQLite database.

    The draft, version and metadata repositories are built on a shared session;
    the artifact repository is a ``_FailingToolArtifactRepository`` so the test
    can observe what happens when artifact persistence raises.
    """

    def setUp(self):
        self.engine = create_engine("sqlite:///:memory:")
        # Register cleanups as soon as each resource exists: unittest skips
        # tearDown() when setUp() raises, but it still runs cleanups, so a
        # failure in any later step no longer leaks the engine or the session.
        self.addCleanup(self.engine.dispose)
        AdminBase.metadata.create_all(bind=self.engine)

        session_factory = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        self.db = session_factory()
        # Cleanups run last-in-first-out, so the session is closed before the
        # engine is disposed.
        self.addCleanup(self.db.close)

        # A persisted staff account provides the owner id and name for the submission.
        owner = StaffAccount(
            email="tools-admin@example.com",
            display_name="Equipe Interna",
            password_hash="hash",
            role=StaffRole.COLABORADOR,
            is_active=True,
        )
        self.db.add(owner)
        self.db.commit()
        self.db.refresh(owner)
        self.owner = owner

        self.draft_repository = ToolDraftRepository(self.db)
        self.version_repository = ToolVersionRepository(self.db)
        self.metadata_repository = ToolMetadataRepository(self.db)
        # NOTE(review): fail_on_call=2 presumably makes the second persistence
        # call raise; confirm against _FailingToolArtifactRepository.
        self.artifact_repository = _FailingToolArtifactRepository(self.db, fail_on_call=2)
        self.service = ToolManagementService(
            settings=AdminSettings(admin_api_prefix="/admin"),
            draft_repository=self.draft_repository,
            version_repository=self.version_repository,
            metadata_repository=self.metadata_repository,
            artifact_repository=self.artifact_repository,
        )

    def test_create_draft_submission_rolls_back_all_rows_when_artifact_persistence_fails(self):
        # The submission must surface the repository's RuntimeError and leave
        # no draft, version, metadata or artifact rows behind.
        submission = {
            "domain": "vendas",
            "tool_name": "consolidar_funil_interno",
            "display_name": "Consolidar funil interno",
            "description": "Consolida indicadores internos do funil comercial para acompanhamento administrativo.",
            "business_goal": "Permitir que o time acompanhe a saude do funil sem depender de consultas manuais repetidas.",
            "parameters": [
                {
                    "name": "periodo_inicio",
                    "parameter_type": "string",
                    "description": "Data inicial usada na consolidacao.",
                    "required": True,
                }
            ],
        }
        with self.assertRaisesRegex(RuntimeError, "artifact persistence failure"):
            self.service.create_draft_submission(
                submission,
                owner_staff_account_id=self.owner.id,
                owner_name=self.owner.display_name,
            )

        self.assertEqual(self.draft_repository.list_drafts(), [])
        self.assertEqual(self.version_repository.list_versions(), [])
        self.assertEqual(self.metadata_repository.list_metadata(), [])
        self.assertEqual(self.artifact_repository.list_artifacts(), [])
# Run the whole suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()