Compare commits
No commits in common. '7e380a9c65ff52e99e2a7307dba6f28da9dd1a4d' and '640e42249819e29774bbd2f4b02f14b2b754eb87' have entirely different histories.
7e380a9c65
...
640e422498
@ -1,444 +0,0 @@
|
|||||||
"""Serviço isolado de geração de tools via LLM para o runtime administrativo.
|
|
||||||
|
|
||||||
Este módulo é a única camada do admin_app que conversa com o Vertex AI para fins
|
|
||||||
de geração de código. Ele é completamente separado do LLMService do product
|
|
||||||
(app.services.ai.llm_service) e usa configurações próprias do AdminSettings.
|
|
||||||
|
|
||||||
Separação arquitetural garantida por:
|
|
||||||
- shared.contracts.model_runtime_separation.ModelRuntimeTarget.TOOL_GENERATION
|
|
||||||
- config keys: admin_tool_generation_model / admin_tool_generation_fallback_model
|
|
||||||
- Nenhuma importação de app.* é permitida neste módulo.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import re
|
|
||||||
from time import perf_counter
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import vertexai
|
|
||||||
from google.api_core.exceptions import GoogleAPIError, NotFound
|
|
||||||
from vertexai.generative_models import GenerationConfig, GenerativeModel
|
|
||||||
|
|
||||||
from admin_app.core.settings import AdminSettings
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# ---- Constantes de geração ---------------------------------------------------
|
|
||||||
|
|
||||||
_PYTHON_BLOCK_RE = re.compile(
|
|
||||||
r"```python\s*\n(.*?)```",
|
|
||||||
re.DOTALL | re.IGNORECASE,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Padrões que o código gerado não pode conter.
|
|
||||||
# Aplicados antes das validações automáticas existentes no ToolManagementService.
|
|
||||||
_DANGEROUS_PATTERNS: tuple[tuple[str, str], ...] = (
|
|
||||||
(r"\bexec\s*\(", "uso de exec() proibido em tools geradas"),
|
|
||||||
(r"\beval\s*\(", "uso de eval() proibido em tools geradas"),
|
|
||||||
(r"\b__import__\s*\(", "uso de __import__() proibido em tools geradas"),
|
|
||||||
(r"os\.system\s*\(", "chamada a os.system() proibida em tools geradas"),
|
|
||||||
(r"os\.popen\s*\(", "chamada a os.popen() proibida em tools geradas"),
|
|
||||||
(r"\bsubprocess\b", "uso de subprocess proibido em tools geradas"),
|
|
||||||
(r"from\s+app\.", "importação de app.* proibida em tools geradas"),
|
|
||||||
(r"from\s+admin_app\.", "importação de admin_app.* proibida em tools geradas"),
|
|
||||||
(r"import\s+app\b", "importação direta de app proibida em tools geradas"),
|
|
||||||
(r"import\s+admin_app\b", "importação direta de admin_app proibida em tools geradas"),
|
|
||||||
(r"\bopen\s*\(", "acesso a sistema de arquivos via open() proibido em tools geradas"),
|
|
||||||
(r"__builtins__", "acesso a __builtins__ proibido em tools geradas"),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Mapeamento de tipo de parâmetro para anotação Python legível
|
|
||||||
_TYPE_ANNOTATION_MAP: dict[str, str] = {
|
|
||||||
"string": "str",
|
|
||||||
"integer": "int",
|
|
||||||
"number": "float",
|
|
||||||
"boolean": "bool",
|
|
||||||
"object": "dict",
|
|
||||||
"array": "list",
|
|
||||||
}
|
|
||||||
|
|
||||||
# Cache de modelos Vertex AI instanciados (por nome de modelo)
|
|
||||||
_MODEL_CACHE: dict[str, GenerativeModel] = {}
|
|
||||||
|
|
||||||
# Flag de controle de inicialização do SDK (evita reinit por instância)
|
|
||||||
_VERTEX_INITIALIZED: bool = False
|
|
||||||
|
|
||||||
|
|
||||||
class ToolGenerationService:
|
|
||||||
"""Gera implementações de tools via Vertex AI no contexto administrativo.
|
|
||||||
|
|
||||||
Responsabilidades:
|
|
||||||
- Construir prompt estruturado a partir dos metadados da tool
|
|
||||||
- Chamar o modelo LLM de geração (separado do modelo de atendimento)
|
|
||||||
- Extrair o bloco de código Python da resposta
|
|
||||||
- Aplicar linting de segurança antes de devolver o código
|
|
||||||
- Retornar resultado estruturado para o ToolManagementService
|
|
||||||
|
|
||||||
Não faz:
|
|
||||||
- Não persiste artefatos (responsabilidade do ToolManagementService)
|
|
||||||
- Não valida contrato nem assinatura (responsabilidade do ToolManagementService)
|
|
||||||
- Não executa o código gerado
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, settings: AdminSettings) -> None:
|
|
||||||
self.settings = settings
|
|
||||||
self._ensure_vertex_initialized()
|
|
||||||
|
|
||||||
def _ensure_vertex_initialized(self) -> None:
|
|
||||||
global _VERTEX_INITIALIZED
|
|
||||||
if _VERTEX_INITIALIZED:
|
|
||||||
return
|
|
||||||
# Reutiliza as credenciais do projeto Google já configuradas nas settings
|
|
||||||
# do admin (que leem do .env, idêntico ao product). O isolamento é nos
|
|
||||||
# parâmetros de modelo e temperatura — não na conta GCP.
|
|
||||||
try:
|
|
||||||
import os
|
|
||||||
project_id = os.environ.get("GOOGLE_PROJECT_ID", "")
|
|
||||||
location = os.environ.get("GOOGLE_LOCATION", "us-central1")
|
|
||||||
vertexai.init(project=project_id, location=location)
|
|
||||||
_VERTEX_INITIALIZED = True
|
|
||||||
logger.info(
|
|
||||||
"tool_generation_service_event=vertex_initialized project=%s location=%s",
|
|
||||||
project_id,
|
|
||||||
location,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning(
|
|
||||||
"tool_generation_service_event=vertex_init_warning error=%s",
|
|
||||||
exc,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_model(self, model_name: str) -> GenerativeModel:
|
|
||||||
model = _MODEL_CACHE.get(model_name)
|
|
||||||
if model is None:
|
|
||||||
model = GenerativeModel(model_name)
|
|
||||||
_MODEL_CACHE[model_name] = model
|
|
||||||
return model
|
|
||||||
|
|
||||||
def _build_model_sequence(self, preferred_model: str | None) -> list[str]:
|
|
||||||
"""Constrói a sequência de modelos a tentar, respeitando o preferred e o fallback."""
|
|
||||||
sequence: list[str] = []
|
|
||||||
candidates = [
|
|
||||||
preferred_model,
|
|
||||||
self.settings.admin_tool_generation_model,
|
|
||||||
self.settings.admin_tool_generation_fallback_model,
|
|
||||||
]
|
|
||||||
for candidate in candidates:
|
|
||||||
normalized = str(candidate or "").strip()
|
|
||||||
if normalized and normalized not in sequence:
|
|
||||||
sequence.append(normalized)
|
|
||||||
return sequence
|
|
||||||
|
|
||||||
def _build_generation_prompt(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
tool_name: str,
|
|
||||||
display_name: str,
|
|
||||||
domain: str,
|
|
||||||
description: str,
|
|
||||||
business_goal: str,
|
|
||||||
parameters: list[dict],
|
|
||||||
) -> str:
|
|
||||||
"""Monta o prompt estruturado de geração enviado ao modelo.
|
|
||||||
|
|
||||||
O prompt descreve o contrato esperado, os restrições de importação,
|
|
||||||
os parâmetros e o objetivo operacional da tool.
|
|
||||||
"""
|
|
||||||
signature_parts: list[str] = []
|
|
||||||
parameter_lines: list[str] = []
|
|
||||||
|
|
||||||
for param in parameters:
|
|
||||||
name = str(param.get("name") or "").strip().lower()
|
|
||||||
if not name:
|
|
||||||
continue
|
|
||||||
param_type = str(param.get("parameter_type") or "string").strip().lower()
|
|
||||||
description_param = str(param.get("description") or "").strip()
|
|
||||||
required = bool(param.get("required", True))
|
|
||||||
annotation = _TYPE_ANNOTATION_MAP.get(param_type, "str")
|
|
||||||
|
|
||||||
if required:
|
|
||||||
signature_parts.append(f"{name}: {annotation}")
|
|
||||||
else:
|
|
||||||
signature_parts.append(f"{name}: {annotation} | None = None")
|
|
||||||
|
|
||||||
required_label = "obrigatório" if required else "opcional"
|
|
||||||
parameter_lines.append(
|
|
||||||
f" - {name} ({annotation}, {required_label}): {description_param}"
|
|
||||||
)
|
|
||||||
|
|
||||||
signature = ", ".join(signature_parts)
|
|
||||||
if signature:
|
|
||||||
full_signature = f"async def run(*, {signature}) -> dict:"
|
|
||||||
else:
|
|
||||||
full_signature = "async def run() -> dict:"
|
|
||||||
|
|
||||||
parameters_block = (
|
|
||||||
"\n".join(parameter_lines)
|
|
||||||
if parameter_lines
|
|
||||||
else " (nenhum parâmetro — a tool não recebe entrada contextual)"
|
|
||||||
)
|
|
||||||
|
|
||||||
domain_context_map = {
|
|
||||||
"vendas": (
|
|
||||||
"O bot atua em um sistema de atendimento para concessionária automotiva. "
|
|
||||||
"A tool opera no domínio de vendas: estoque de veículos, negociações, pedidos e cancelamentos."
|
|
||||||
),
|
|
||||||
"revisao": (
|
|
||||||
"O bot atua em um sistema de atendimento de oficina automotiva. "
|
|
||||||
"A tool opera no domínio de revisão: agendamentos, remarcações, listagem de serviços."
|
|
||||||
),
|
|
||||||
"locacao": (
|
|
||||||
"O bot atua em um sistema de atendimento de locadora de veículos. "
|
|
||||||
"A tool opera no domínio de locação: frota, contratos, pagamentos e devoluções."
|
|
||||||
),
|
|
||||||
"orquestracao": (
|
|
||||||
"O bot atua em um sistema de orquestração conversacional. "
|
|
||||||
"A tool opera no domínio de orquestração: controla fluxo, contexto e estado da conversa."
|
|
||||||
),
|
|
||||||
}
|
|
||||||
domain_context = domain_context_map.get(
|
|
||||||
str(domain or "").strip().lower(),
|
|
||||||
"O bot atua em um sistema de atendimento automatizado.",
|
|
||||||
)
|
|
||||||
|
|
||||||
return (
|
|
||||||
"Você é um especialista em Python que gera implementações realistas de tools "
|
|
||||||
"para um bot de atendimento.\n\n"
|
|
||||||
f"CONTEXTO DO DOMÍNIO:\n{domain_context}\n\n"
|
|
||||||
"CONTRATO OBRIGATÓRIO:\n"
|
|
||||||
"- A função deve ser assíncrona: async def run(...)\n"
|
|
||||||
"- Todos os parâmetros devem ser keyword-only (após *)\n"
|
|
||||||
"- O tipo de retorno deve ser dict (JSON-serializável)\n"
|
|
||||||
"- O módulo pode importar apenas stdlib (datetime, json, re, math, uuid, etc.)\n"
|
|
||||||
"- Proibido importar: app.*, admin_app.*, subprocess, os.system, os.popen\n"
|
|
||||||
"- Proibido usar: exec(), eval(), __import__(), open()\n\n"
|
|
||||||
"TOOL A IMPLEMENTAR:\n"
|
|
||||||
f"- Nome técnico: {tool_name}\n"
|
|
||||||
f"- Nome de exibição: {display_name}\n"
|
|
||||||
f"- Domínio: {domain}\n"
|
|
||||||
f"- Descrição funcional: {description}\n"
|
|
||||||
f"- Objetivo de negócio: {business_goal}\n\n"
|
|
||||||
f"PARÂMETROS DA TOOL:\n{parameters_block}\n\n"
|
|
||||||
f"ASSINATURA ESPERADA:\n{full_signature}\n\n"
|
|
||||||
"INSTRUÇÕES DE GERAÇÃO:\n"
|
|
||||||
"- Gere uma implementação realista que simule o comportamento esperado da tool.\n"
|
|
||||||
"- O retorno deve incluir os campos relevantes ao domínio (não apenas echo dos argumentos).\n"
|
|
||||||
"- Use dados fictícios mas verossímeis para simular a resposta operacional.\n"
|
|
||||||
"- Nenhuma explicação ou comentário fora do código. Retorne apenas o bloco Python.\n"
|
|
||||||
"- O módulo deve começar com um docstring descritivo.\n"
|
|
||||||
"- Envolva o código em ```python ... ```.\n"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _extract_python_block(self, raw_response: str) -> str | None:
|
|
||||||
"""Extrai o primeiro bloco ```python ... ``` da resposta do modelo."""
|
|
||||||
normalized = str(raw_response or "").strip()
|
|
||||||
match = _PYTHON_BLOCK_RE.search(normalized)
|
|
||||||
if match:
|
|
||||||
return match.group(1).strip()
|
|
||||||
# Fallback: se não há marcador de código mas o conteúdo parece Python
|
|
||||||
if normalized.startswith("async def run") or normalized.startswith('"""'):
|
|
||||||
return normalized
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _apply_safety_linting(self, source_code: str) -> list[str]:
|
|
||||||
"""Verifica padrões perigosos no código gerado antes da validação formal.
|
|
||||||
|
|
||||||
Retorna lista de issues. Lista vazia = linting passou.
|
|
||||||
"""
|
|
||||||
issues: list[str] = []
|
|
||||||
for pattern, description in _DANGEROUS_PATTERNS:
|
|
||||||
if re.search(pattern, source_code, re.MULTILINE):
|
|
||||||
issues.append(f"linting: {description}.")
|
|
||||||
return issues
|
|
||||||
|
|
||||||
async def generate_tool_source(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
tool_name: str,
|
|
||||||
display_name: str,
|
|
||||||
domain: str,
|
|
||||||
description: str,
|
|
||||||
business_goal: str,
|
|
||||||
parameters: list[dict],
|
|
||||||
preferred_model: str | None = None,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Gera o código Python da tool a partir dos metadados do draft.
|
|
||||||
|
|
||||||
Retorna um dicionário com:
|
|
||||||
- passed (bool): True se o código foi gerado e passou no linting
|
|
||||||
- generated_source_code (str | None): código Python gerado
|
|
||||||
- generation_model_used (str | None): modelo que gerou o código
|
|
||||||
- prompt_rendered (str): prompt enviado ao modelo (para auditoria)
|
|
||||||
- issues (list[str]): problemas encontrados (geração ou linting)
|
|
||||||
- elapsed_ms (float): tempo total de geração em milissegundos
|
|
||||||
"""
|
|
||||||
prompt = self._build_generation_prompt(
|
|
||||||
tool_name=tool_name,
|
|
||||||
display_name=display_name,
|
|
||||||
domain=domain,
|
|
||||||
description=description,
|
|
||||||
business_goal=business_goal,
|
|
||||||
parameters=parameters,
|
|
||||||
)
|
|
||||||
|
|
||||||
model_sequence = self._build_model_sequence(preferred_model)
|
|
||||||
generation_config = GenerationConfig(
|
|
||||||
temperature=self.settings.admin_tool_generation_temperature,
|
|
||||||
max_output_tokens=self.settings.admin_tool_generation_max_output_tokens,
|
|
||||||
)
|
|
||||||
|
|
||||||
raw_response: str | None = None
|
|
||||||
generation_model_used: str | None = None
|
|
||||||
last_error: Exception | None = None
|
|
||||||
started_at = perf_counter()
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
for model_name in model_sequence:
|
|
||||||
try:
|
|
||||||
model = self._get_model(model_name)
|
|
||||||
response = await asyncio.wait_for(
|
|
||||||
asyncio.to_thread(
|
|
||||||
model.generate_content,
|
|
||||||
prompt,
|
|
||||||
generation_config=generation_config,
|
|
||||||
),
|
|
||||||
timeout=float(self.settings.admin_tool_generation_timeout_seconds),
|
|
||||||
)
|
|
||||||
candidate = (
|
|
||||||
response.candidates[0]
|
|
||||||
if getattr(response, "candidates", None)
|
|
||||||
else None
|
|
||||||
)
|
|
||||||
content = getattr(candidate, "content", None)
|
|
||||||
parts = list(getattr(content, "parts", None) or [])
|
|
||||||
text_parts = [
|
|
||||||
getattr(part, "text", None)
|
|
||||||
for part in parts
|
|
||||||
if isinstance(getattr(part, "text", None), str)
|
|
||||||
]
|
|
||||||
raw_response = "\n".join(
|
|
||||||
t for t in text_parts if t and t.strip()
|
|
||||||
).strip() or None
|
|
||||||
|
|
||||||
if raw_response is None:
|
|
||||||
# Fallback para o atributo .text raiz
|
|
||||||
try:
|
|
||||||
raw_response = str(response.text or "").strip() or None
|
|
||||||
except (AttributeError, ValueError):
|
|
||||||
raw_response = None
|
|
||||||
|
|
||||||
generation_model_used = model_name
|
|
||||||
break
|
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
last_error = TimeoutError(
|
|
||||||
f"modelo '{model_name}' excedeu o timeout de "
|
|
||||||
f"{self.settings.admin_tool_generation_timeout_seconds}s para geração de tools."
|
|
||||||
)
|
|
||||||
logger.warning(
|
|
||||||
"tool_generation_service_event=timeout model=%s timeout_seconds=%s",
|
|
||||||
model_name,
|
|
||||||
self.settings.admin_tool_generation_timeout_seconds,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
except NotFound as exc:
|
|
||||||
last_error = exc
|
|
||||||
_MODEL_CACHE.pop(model_name, None)
|
|
||||||
logger.warning(
|
|
||||||
"tool_generation_service_event=model_not_found model=%s error=%s",
|
|
||||||
model_name,
|
|
||||||
exc,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
except GoogleAPIError as exc:
|
|
||||||
last_error = exc
|
|
||||||
logger.warning(
|
|
||||||
"tool_generation_service_event=api_error model=%s error=%s",
|
|
||||||
model_name,
|
|
||||||
exc,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
except Exception as exc:
|
|
||||||
last_error = exc
|
|
||||||
logger.warning(
|
|
||||||
"tool_generation_service_event=unexpected_error model=%s error=%s class=%s",
|
|
||||||
model_name,
|
|
||||||
exc,
|
|
||||||
exc.__class__.__name__,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
|
|
||||||
|
|
||||||
if raw_response is None or generation_model_used is None:
|
|
||||||
error_detail = str(last_error) if last_error else "nenhum modelo disponivel respondeu"
|
|
||||||
logger.error(
|
|
||||||
"tool_generation_service_event=generation_failed tool_name=%s elapsed_ms=%s error=%s",
|
|
||||||
tool_name,
|
|
||||||
elapsed_ms,
|
|
||||||
error_detail,
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"passed": False,
|
|
||||||
"generated_source_code": None,
|
|
||||||
"generation_model_used": None,
|
|
||||||
"prompt_rendered": prompt,
|
|
||||||
"issues": [f"falha na geração via LLM: {error_detail}"],
|
|
||||||
"elapsed_ms": elapsed_ms,
|
|
||||||
}
|
|
||||||
|
|
||||||
generated_source_code = self._extract_python_block(raw_response)
|
|
||||||
if generated_source_code is None:
|
|
||||||
logger.warning(
|
|
||||||
"tool_generation_service_event=no_code_block tool_name=%s model=%s elapsed_ms=%s",
|
|
||||||
tool_name,
|
|
||||||
generation_model_used,
|
|
||||||
elapsed_ms,
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"passed": False,
|
|
||||||
"generated_source_code": None,
|
|
||||||
"generation_model_used": generation_model_used,
|
|
||||||
"prompt_rendered": prompt,
|
|
||||||
"issues": ["o modelo não retornou um bloco de código Python identificável."],
|
|
||||||
"elapsed_ms": elapsed_ms,
|
|
||||||
}
|
|
||||||
|
|
||||||
linting_issues = self._apply_safety_linting(generated_source_code)
|
|
||||||
if linting_issues:
|
|
||||||
logger.warning(
|
|
||||||
"tool_generation_service_event=linting_failed tool_name=%s model=%s issues=%s elapsed_ms=%s",
|
|
||||||
tool_name,
|
|
||||||
generation_model_used,
|
|
||||||
linting_issues,
|
|
||||||
elapsed_ms,
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"passed": False,
|
|
||||||
"generated_source_code": generated_source_code,
|
|
||||||
"generation_model_used": generation_model_used,
|
|
||||||
"prompt_rendered": prompt,
|
|
||||||
"issues": linting_issues,
|
|
||||||
"elapsed_ms": elapsed_ms,
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"tool_generation_service_event=generation_succeeded tool_name=%s model=%s elapsed_ms=%s",
|
|
||||||
tool_name,
|
|
||||||
generation_model_used,
|
|
||||||
elapsed_ms,
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"passed": True,
|
|
||||||
"generated_source_code": generated_source_code,
|
|
||||||
"generation_model_used": generation_model_used,
|
|
||||||
"prompt_rendered": prompt,
|
|
||||||
"issues": [],
|
|
||||||
"elapsed_ms": elapsed_ms,
|
|
||||||
}
|
|
||||||
@ -1,266 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import threading
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
from time import perf_counter
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from admin_app.core.settings import AdminSettings
|
|
||||||
from admin_app.db.database import AdminSessionLocal
|
|
||||||
from admin_app.repositories import (
|
|
||||||
ToolArtifactRepository,
|
|
||||||
ToolDraftRepository,
|
|
||||||
ToolMetadataRepository,
|
|
||||||
ToolVersionRepository,
|
|
||||||
)
|
|
||||||
from admin_app.services.tool_generation_service import ToolGenerationService
|
|
||||||
|
|
||||||
|
|
||||||
class ToolGenerationWorkerService:
|
|
||||||
"""Executa a pipeline de geracao em um worker dedicado do runtime admin.
|
|
||||||
|
|
||||||
O worker abre a propria sessao administrativa e cria uma instancia isolada do
|
|
||||||
ToolManagementService dentro da thread dedicada. Assim, a geracao e as
|
|
||||||
validacoes nao compartilham a sessao SQLAlchemy da request web nem o pool de
|
|
||||||
threads padrao usado pelas rotas sync do FastAPI.
|
|
||||||
"""
|
|
||||||
|
|
||||||
_THREAD_NAME_PREFIX = "admin-tool-generation-worker"
|
|
||||||
_DEFAULT_POLL_AFTER_MS = 1200
|
|
||||||
|
|
||||||
def __init__(self, settings: AdminSettings) -> None:
|
|
||||||
self.settings = settings
|
|
||||||
self.max_workers = max(1, int(settings.admin_tool_generation_worker_max_workers))
|
|
||||||
self._executor = ThreadPoolExecutor(
|
|
||||||
max_workers=self.max_workers,
|
|
||||||
thread_name_prefix=self._THREAD_NAME_PREFIX,
|
|
||||||
)
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
self._pending_jobs = 0
|
|
||||||
self._jobs: dict[str, dict[str, Any]] = {}
|
|
||||||
|
|
||||||
def shutdown(self, *, wait: bool = False) -> None:
|
|
||||||
self._executor.shutdown(wait=wait, cancel_futures=True)
|
|
||||||
|
|
||||||
def execute_generation_pipeline(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
version_id: str,
|
|
||||||
runner_staff_account_id: int,
|
|
||||||
runner_name: str,
|
|
||||||
runner_role,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
submitted_at = datetime.now(UTC).isoformat()
|
|
||||||
with self._lock:
|
|
||||||
self._pending_jobs += 1
|
|
||||||
queued_jobs_before_submit = max(self._pending_jobs - 1, 0)
|
|
||||||
|
|
||||||
started_at = perf_counter()
|
|
||||||
future = self._executor.submit(
|
|
||||||
self._run_generation_pipeline_job,
|
|
||||||
version_id,
|
|
||||||
runner_staff_account_id,
|
|
||||||
runner_name,
|
|
||||||
runner_role,
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
payload = future.result()
|
|
||||||
finally:
|
|
||||||
with self._lock:
|
|
||||||
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
|
||||||
pending_jobs_after_completion = self._pending_jobs
|
|
||||||
|
|
||||||
execution = {
|
|
||||||
"mode": "dedicated_generation_worker",
|
|
||||||
"target": "admin_tool_generation_worker",
|
|
||||||
"dispatch_state": "completed",
|
|
||||||
"worker_max_workers": self.max_workers,
|
|
||||||
"worker_pending_jobs": pending_jobs_after_completion,
|
|
||||||
"queued_jobs_before_submit": queued_jobs_before_submit,
|
|
||||||
"submitted_at": submitted_at,
|
|
||||||
"started_at": submitted_at,
|
|
||||||
"completed_at": datetime.now(UTC).isoformat(),
|
|
||||||
"elapsed_ms": round((perf_counter() - started_at) * 1000, 2),
|
|
||||||
"worker_thread_name": str(payload.pop("_worker_thread_name", "")) or None,
|
|
||||||
"poll_after_ms": None,
|
|
||||||
"last_error": None,
|
|
||||||
}
|
|
||||||
enriched_payload = dict(payload)
|
|
||||||
enriched_payload["execution"] = execution
|
|
||||||
return enriched_payload
|
|
||||||
|
|
||||||
def dispatch_generation_pipeline(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
version_id: str,
|
|
||||||
runner_staff_account_id: int,
|
|
||||||
runner_name: str,
|
|
||||||
runner_role,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
normalized_version_id = str(version_id or "").strip().lower()
|
|
||||||
if not normalized_version_id:
|
|
||||||
raise ValueError("Versao administrativa invalida para o worker de geracao.")
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
existing_job = self._jobs.get(normalized_version_id)
|
|
||||||
if existing_job is not None and existing_job.get("dispatch_state") in {"queued", "running"}:
|
|
||||||
return self._build_dispatch_snapshot_locked(existing_job)
|
|
||||||
|
|
||||||
self._pending_jobs += 1
|
|
||||||
queued_jobs_before_submit = max(self._pending_jobs - 1, 0)
|
|
||||||
job = {
|
|
||||||
"version_id": normalized_version_id,
|
|
||||||
"dispatch_state": "queued",
|
|
||||||
"queued_jobs_before_submit": queued_jobs_before_submit,
|
|
||||||
"submitted_at": datetime.now(UTC).isoformat(),
|
|
||||||
"started_at": None,
|
|
||||||
"completed_at": None,
|
|
||||||
"elapsed_ms": None,
|
|
||||||
"worker_thread_name": None,
|
|
||||||
"last_error": None,
|
|
||||||
"result_payload": None,
|
|
||||||
}
|
|
||||||
self._jobs[normalized_version_id] = job
|
|
||||||
self._executor.submit(
|
|
||||||
self._run_generation_pipeline_job_async,
|
|
||||||
normalized_version_id,
|
|
||||||
runner_staff_account_id,
|
|
||||||
runner_name,
|
|
||||||
runner_role,
|
|
||||||
)
|
|
||||||
return self._build_dispatch_snapshot_locked(job)
|
|
||||||
|
|
||||||
def get_generation_pipeline_dispatch(self, version_id: str) -> dict[str, Any] | None:
|
|
||||||
normalized_version_id = str(version_id or "").strip().lower()
|
|
||||||
if not normalized_version_id:
|
|
||||||
return None
|
|
||||||
with self._lock:
|
|
||||||
job = self._jobs.get(normalized_version_id)
|
|
||||||
if job is None:
|
|
||||||
return None
|
|
||||||
return self._build_dispatch_snapshot_locked(job)
|
|
||||||
|
|
||||||
def _run_generation_pipeline_job_async(
|
|
||||||
self,
|
|
||||||
version_id: str,
|
|
||||||
runner_staff_account_id: int,
|
|
||||||
runner_name: str,
|
|
||||||
runner_role,
|
|
||||||
) -> None:
|
|
||||||
self._mark_job_running(version_id)
|
|
||||||
try:
|
|
||||||
payload = self._run_generation_pipeline_job(
|
|
||||||
version_id,
|
|
||||||
runner_staff_account_id,
|
|
||||||
runner_name,
|
|
||||||
runner_role,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
self._mark_job_failed(version_id, exc)
|
|
||||||
return
|
|
||||||
self._mark_job_completed(version_id, payload)
|
|
||||||
|
|
||||||
def _mark_job_running(self, version_id: str) -> None:
|
|
||||||
with self._lock:
|
|
||||||
job = self._jobs.get(version_id)
|
|
||||||
if job is None:
|
|
||||||
return
|
|
||||||
job["dispatch_state"] = "running"
|
|
||||||
job["started_at"] = datetime.now(UTC).isoformat()
|
|
||||||
job["worker_thread_name"] = threading.current_thread().name
|
|
||||||
|
|
||||||
def _mark_job_completed(self, version_id: str, payload: dict[str, Any]) -> None:
|
|
||||||
with self._lock:
|
|
||||||
job = self._jobs.get(version_id)
|
|
||||||
if job is None:
|
|
||||||
return
|
|
||||||
completed_at = datetime.now(UTC).isoformat()
|
|
||||||
started_reference = self._parse_job_timestamp(job.get("started_at")) or self._parse_job_timestamp(job.get("submitted_at"))
|
|
||||||
elapsed_ms = None
|
|
||||||
if started_reference is not None:
|
|
||||||
elapsed_ms = round((datetime.now(UTC) - started_reference).total_seconds() * 1000, 2)
|
|
||||||
job["dispatch_state"] = "completed"
|
|
||||||
job["completed_at"] = completed_at
|
|
||||||
job["elapsed_ms"] = elapsed_ms
|
|
||||||
job["result_payload"] = dict(payload)
|
|
||||||
job["last_error"] = None
|
|
||||||
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
|
||||||
|
|
||||||
def _mark_job_failed(self, version_id: str, exc: Exception) -> None:
|
|
||||||
with self._lock:
|
|
||||||
job = self._jobs.get(version_id)
|
|
||||||
if job is None:
|
|
||||||
return
|
|
||||||
completed_at = datetime.now(UTC).isoformat()
|
|
||||||
started_reference = self._parse_job_timestamp(job.get("started_at")) or self._parse_job_timestamp(job.get("submitted_at"))
|
|
||||||
elapsed_ms = None
|
|
||||||
if started_reference is not None:
|
|
||||||
elapsed_ms = round((datetime.now(UTC) - started_reference).total_seconds() * 1000, 2)
|
|
||||||
job["dispatch_state"] = "failed"
|
|
||||||
job["completed_at"] = completed_at
|
|
||||||
job["elapsed_ms"] = elapsed_ms
|
|
||||||
job["last_error"] = f"{type(exc).__name__}: {exc}"
|
|
||||||
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
|
||||||
|
|
||||||
def _build_dispatch_snapshot_locked(self, job: dict[str, Any]) -> dict[str, Any]:
|
|
||||||
dispatch_state = str(job.get("dispatch_state") or "queued")
|
|
||||||
snapshot = {
|
|
||||||
"mode": "dedicated_generation_worker_async",
|
|
||||||
"target": "admin_tool_generation_worker",
|
|
||||||
"dispatch_state": dispatch_state,
|
|
||||||
"worker_max_workers": self.max_workers,
|
|
||||||
"worker_pending_jobs": self._pending_jobs,
|
|
||||||
"queued_jobs_before_submit": job.get("queued_jobs_before_submit", 0),
|
|
||||||
"submitted_at": job.get("submitted_at"),
|
|
||||||
"started_at": job.get("started_at"),
|
|
||||||
"completed_at": job.get("completed_at"),
|
|
||||||
"elapsed_ms": job.get("elapsed_ms"),
|
|
||||||
"worker_thread_name": job.get("worker_thread_name"),
|
|
||||||
"poll_after_ms": self._DEFAULT_POLL_AFTER_MS if dispatch_state in {"queued", "running"} else None,
|
|
||||||
"last_error": job.get("last_error"),
|
|
||||||
}
|
|
||||||
result_payload = job.get("result_payload")
|
|
||||||
if isinstance(result_payload, dict):
|
|
||||||
snapshot["result_payload"] = dict(result_payload)
|
|
||||||
return snapshot
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_job_timestamp(value: Any) -> datetime | None:
|
|
||||||
if not isinstance(value, str) or not value.strip():
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
return datetime.fromisoformat(value)
|
|
||||||
except ValueError:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _run_generation_pipeline_job(
|
|
||||||
self,
|
|
||||||
version_id: str,
|
|
||||||
runner_staff_account_id: int,
|
|
||||||
runner_name: str,
|
|
||||||
runner_role,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
from admin_app.services.tool_management_service import ToolManagementService
|
|
||||||
|
|
||||||
db = AdminSessionLocal()
|
|
||||||
try:
|
|
||||||
service = ToolManagementService(
|
|
||||||
settings=self.settings,
|
|
||||||
draft_repository=ToolDraftRepository(db),
|
|
||||||
version_repository=ToolVersionRepository(db),
|
|
||||||
metadata_repository=ToolMetadataRepository(db),
|
|
||||||
artifact_repository=ToolArtifactRepository(db),
|
|
||||||
tool_generation_service=ToolGenerationService(self.settings),
|
|
||||||
)
|
|
||||||
payload = service.run_generation_pipeline(
|
|
||||||
version_id,
|
|
||||||
runner_staff_account_id=runner_staff_account_id,
|
|
||||||
runner_name=runner_name,
|
|
||||||
runner_role=runner_role,
|
|
||||||
)
|
|
||||||
payload = dict(payload)
|
|
||||||
payload["_worker_thread_name"] = threading.current_thread().name
|
|
||||||
return payload
|
|
||||||
finally:
|
|
||||||
db.close()
|
|
||||||
File diff suppressed because it is too large
Load Diff
@ -1,180 +0,0 @@
|
|||||||
import threading
|
|
||||||
import unittest
|
|
||||||
from types import SimpleNamespace
|
|
||||||
from unittest.mock import Mock, patch
|
|
||||||
|
|
||||||
from admin_app.core import AdminSettings
|
|
||||||
from admin_app.services.tool_generation_worker_service import ToolGenerationWorkerService
|
|
||||||
from admin_app.services.tool_management_service import ToolManagementService
|
|
||||||
from shared.contracts import StaffRole, ToolLifecycleStatus
|
|
||||||
|
|
||||||
|
|
||||||
class ToolGenerationWorkerServiceTests(unittest.TestCase):
|
|
||||||
def test_execute_generation_pipeline_uses_dedicated_worker_metadata(self):
|
|
||||||
worker = ToolGenerationWorkerService(AdminSettings())
|
|
||||||
main_thread_name = threading.current_thread().name
|
|
||||||
|
|
||||||
def fake_job(version_id, runner_staff_account_id, runner_name, runner_role):
|
|
||||||
self.assertNotEqual(threading.current_thread().name, main_thread_name)
|
|
||||||
self.assertEqual(version_id, 'tool_version::resumo::v1')
|
|
||||||
self.assertEqual(runner_staff_account_id, 7)
|
|
||||||
self.assertEqual(runner_name, 'Diretoria')
|
|
||||||
self.assertEqual(runner_role, StaffRole.DIRETOR)
|
|
||||||
return {
|
|
||||||
'service': 'admin_tool_governance',
|
|
||||||
'version_id': version_id,
|
|
||||||
'status': 'generated',
|
|
||||||
'_worker_thread_name': threading.current_thread().name,
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
with patch.object(worker, '_run_generation_pipeline_job', side_effect=fake_job):
|
|
||||||
payload = worker.execute_generation_pipeline(
|
|
||||||
version_id='tool_version::resumo::v1',
|
|
||||||
runner_staff_account_id=7,
|
|
||||||
runner_name='Diretoria',
|
|
||||||
runner_role=StaffRole.DIRETOR,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
worker.shutdown(wait=True)
|
|
||||||
|
|
||||||
execution = payload['execution']
|
|
||||||
self.assertEqual(execution['mode'], 'dedicated_generation_worker')
|
|
||||||
self.assertEqual(execution['target'], 'admin_tool_generation_worker')
|
|
||||||
self.assertEqual(execution['dispatch_state'], 'completed')
|
|
||||||
self.assertEqual(execution['worker_max_workers'], 1)
|
|
||||||
self.assertEqual(execution['queued_jobs_before_submit'], 0)
|
|
||||||
self.assertEqual(execution['worker_pending_jobs'], 0)
|
|
||||||
self.assertIsNotNone(execution['submitted_at'])
|
|
||||||
self.assertGreaterEqual(execution['elapsed_ms'], 0)
|
|
||||||
self.assertTrue(execution['worker_thread_name'].startswith('admin-tool-generation-worker'))
|
|
||||||
self.assertEqual(payload['version_id'], 'tool_version::resumo::v1')
|
|
||||||
self.assertEqual(payload['status'], 'generated')
|
|
||||||
|
|
||||||
def test_dispatch_generation_pipeline_returns_queued_snapshot_without_waiting_completion(self):
|
|
||||||
worker = ToolGenerationWorkerService(AdminSettings())
|
|
||||||
job_started = threading.Event()
|
|
||||||
release_job = threading.Event()
|
|
||||||
|
|
||||||
def fake_job(version_id, runner_staff_account_id, runner_name, runner_role):
|
|
||||||
job_started.set()
|
|
||||||
release_job.wait(timeout=2)
|
|
||||||
return {
|
|
||||||
'service': 'admin_tool_governance',
|
|
||||||
'version_id': version_id,
|
|
||||||
'status': 'generated',
|
|
||||||
'_worker_thread_name': threading.current_thread().name,
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
with patch.object(worker, '_run_generation_pipeline_job', side_effect=fake_job):
|
|
||||||
dispatch = worker.dispatch_generation_pipeline(
|
|
||||||
version_id='tool_version::assinc::v1',
|
|
||||||
runner_staff_account_id=9,
|
|
||||||
runner_name='Diretoria',
|
|
||||||
runner_role=StaffRole.DIRETOR,
|
|
||||||
)
|
|
||||||
self.assertEqual(dispatch['mode'], 'dedicated_generation_worker_async')
|
|
||||||
self.assertIn(dispatch['dispatch_state'], {'queued', 'running'})
|
|
||||||
self.assertEqual(dispatch['target'], 'admin_tool_generation_worker')
|
|
||||||
self.assertEqual(dispatch['queued_jobs_before_submit'], 0)
|
|
||||||
self.assertGreaterEqual(dispatch['poll_after_ms'], 1)
|
|
||||||
job_started.wait(timeout=2)
|
|
||||||
running_snapshot = worker.get_generation_pipeline_dispatch('tool_version::assinc::v1')
|
|
||||||
self.assertIsNotNone(running_snapshot)
|
|
||||||
self.assertIn(running_snapshot['dispatch_state'], {'running', 'completed'})
|
|
||||||
release_job.set()
|
|
||||||
release_job.wait(timeout=2)
|
|
||||||
finally:
|
|
||||||
worker.shutdown(wait=True)
|
|
||||||
|
|
||||||
|
|
||||||
class ToolManagementServiceWorkerFallbackTests(unittest.TestCase):
|
|
||||||
def test_run_generation_pipeline_in_worker_falls_back_to_inline_execution_metadata(self):
|
|
||||||
service = ToolManagementService(settings=AdminSettings())
|
|
||||||
|
|
||||||
with patch.object(
|
|
||||||
service,
|
|
||||||
'run_generation_pipeline',
|
|
||||||
return_value={
|
|
||||||
'service': 'admin_tool_governance',
|
|
||||||
'message': 'ok',
|
|
||||||
'version_id': 'tool_version::inline::v1',
|
|
||||||
'status': 'generated',
|
|
||||||
},
|
|
||||||
) as run_generation_pipeline:
|
|
||||||
payload = service.run_generation_pipeline_in_worker(
|
|
||||||
'tool_version::inline::v1',
|
|
||||||
runner_staff_account_id=4,
|
|
||||||
runner_name='Colaborador',
|
|
||||||
runner_role=StaffRole.COLABORADOR,
|
|
||||||
)
|
|
||||||
|
|
||||||
run_generation_pipeline.assert_called_once_with(
|
|
||||||
'tool_version::inline::v1',
|
|
||||||
runner_staff_account_id=4,
|
|
||||||
runner_name='Colaborador',
|
|
||||||
runner_role=StaffRole.COLABORADOR,
|
|
||||||
)
|
|
||||||
self.assertEqual(payload['execution']['mode'], 'inline_admin_service')
|
|
||||||
self.assertEqual(payload['execution']['target'], 'admin_inline_generation_pipeline')
|
|
||||||
self.assertEqual(payload['execution']['dispatch_state'], 'completed')
|
|
||||||
self.assertEqual(payload['execution']['queued_jobs_before_submit'], 0)
|
|
||||||
self.assertIsNone(payload['execution']['worker_max_workers'])
|
|
||||||
self.assertEqual(payload['version_id'], 'tool_version::inline::v1')
|
|
||||||
|
|
||||||
def test_run_generation_pipeline_in_worker_returns_queued_snapshot_when_dedicated_worker_accepts_job(self):
|
|
||||||
version = SimpleNamespace(
|
|
||||||
id=11,
|
|
||||||
version_id='tool_version::fila::v1',
|
|
||||||
tool_name='emitir_resumo_locacao',
|
|
||||||
version_number=1,
|
|
||||||
status=ToolLifecycleStatus.DRAFT,
|
|
||||||
summary='Resumo governado em fila.',
|
|
||||||
owner_display_name='Diretoria',
|
|
||||||
updated_at=None,
|
|
||||||
created_at=None,
|
|
||||||
)
|
|
||||||
draft_repository = Mock()
|
|
||||||
draft_repository.get_by_tool_name.return_value = SimpleNamespace(id=3)
|
|
||||||
version_repository = Mock()
|
|
||||||
version_repository.get_by_version_id.return_value = version
|
|
||||||
version_repository.list_versions.return_value = [version]
|
|
||||||
metadata_repository = Mock()
|
|
||||||
metadata_repository.get_by_tool_version_id.return_value = SimpleNamespace(display_name='Emitir resumo locacao')
|
|
||||||
worker = Mock()
|
|
||||||
worker.dispatch_generation_pipeline.return_value = {
|
|
||||||
'mode': 'dedicated_generation_worker_async',
|
|
||||||
'target': 'admin_tool_generation_worker',
|
|
||||||
'dispatch_state': 'queued',
|
|
||||||
'worker_max_workers': 1,
|
|
||||||
'worker_pending_jobs': 1,
|
|
||||||
'queued_jobs_before_submit': 0,
|
|
||||||
'submitted_at': '2026-04-02T10:00:00+00:00',
|
|
||||||
'started_at': None,
|
|
||||||
'completed_at': None,
|
|
||||||
'elapsed_ms': None,
|
|
||||||
'worker_thread_name': None,
|
|
||||||
'poll_after_ms': 1200,
|
|
||||||
'last_error': None,
|
|
||||||
}
|
|
||||||
service = ToolManagementService(
|
|
||||||
settings=AdminSettings(),
|
|
||||||
draft_repository=draft_repository,
|
|
||||||
version_repository=version_repository,
|
|
||||||
metadata_repository=metadata_repository,
|
|
||||||
tool_generation_worker_service=worker,
|
|
||||||
)
|
|
||||||
|
|
||||||
payload = service.run_generation_pipeline_in_worker(
|
|
||||||
'tool_version::fila::v1',
|
|
||||||
runner_staff_account_id=1,
|
|
||||||
runner_name='Diretoria',
|
|
||||||
runner_role=StaffRole.DIRETOR,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertEqual(payload['status'], ToolLifecycleStatus.DRAFT)
|
|
||||||
self.assertEqual(payload['execution']['dispatch_state'], 'queued')
|
|
||||||
self.assertEqual(payload['queue_entry']['gate'], 'generation_pipeline_queued')
|
|
||||||
self.assertEqual(payload['queue_entry']['automated_validation_status'], 'pending')
|
|
||||||
self.assertIn('request foi liberada', payload['message'])
|
|
||||||
Loading…
Reference in New Issue