Compare commits
2 Commits
640e422498
...
7e380a9c65
| Author | SHA1 | Date |
|---|---|---|
|
|
7e380a9c65 | 2 weeks ago |
|
|
de455b8566 | 2 weeks ago |
@ -0,0 +1,444 @@
|
|||||||
|
"""Serviço isolado de geração de tools via LLM para o runtime administrativo.
|
||||||
|
|
||||||
|
Este módulo é a única camada do admin_app que conversa com o Vertex AI para fins
|
||||||
|
de geração de código. Ele é completamente separado do LLMService do product
|
||||||
|
(app.services.ai.llm_service) e usa configurações próprias do AdminSettings.
|
||||||
|
|
||||||
|
Separação arquitetural garantida por:
|
||||||
|
- shared.contracts.model_runtime_separation.ModelRuntimeTarget.TOOL_GENERATION
|
||||||
|
- config keys: admin_tool_generation_model / admin_tool_generation_fallback_model
|
||||||
|
- Nenhuma importação de app.* é permitida neste módulo.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from time import perf_counter
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import vertexai
|
||||||
|
from google.api_core.exceptions import GoogleAPIError, NotFound
|
||||||
|
from vertexai.generative_models import GenerationConfig, GenerativeModel
|
||||||
|
|
||||||
|
from admin_app.core.settings import AdminSettings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ---- Constantes de geração ---------------------------------------------------
|
||||||
|
|
||||||
|
_PYTHON_BLOCK_RE = re.compile(
|
||||||
|
r"```python\s*\n(.*?)```",
|
||||||
|
re.DOTALL | re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Padrões que o código gerado não pode conter.
|
||||||
|
# Aplicados antes das validações automáticas existentes no ToolManagementService.
|
||||||
|
_DANGEROUS_PATTERNS: tuple[tuple[str, str], ...] = (
|
||||||
|
(r"\bexec\s*\(", "uso de exec() proibido em tools geradas"),
|
||||||
|
(r"\beval\s*\(", "uso de eval() proibido em tools geradas"),
|
||||||
|
(r"\b__import__\s*\(", "uso de __import__() proibido em tools geradas"),
|
||||||
|
(r"os\.system\s*\(", "chamada a os.system() proibida em tools geradas"),
|
||||||
|
(r"os\.popen\s*\(", "chamada a os.popen() proibida em tools geradas"),
|
||||||
|
(r"\bsubprocess\b", "uso de subprocess proibido em tools geradas"),
|
||||||
|
(r"from\s+app\.", "importação de app.* proibida em tools geradas"),
|
||||||
|
(r"from\s+admin_app\.", "importação de admin_app.* proibida em tools geradas"),
|
||||||
|
(r"import\s+app\b", "importação direta de app proibida em tools geradas"),
|
||||||
|
(r"import\s+admin_app\b", "importação direta de admin_app proibida em tools geradas"),
|
||||||
|
(r"\bopen\s*\(", "acesso a sistema de arquivos via open() proibido em tools geradas"),
|
||||||
|
(r"__builtins__", "acesso a __builtins__ proibido em tools geradas"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mapeamento de tipo de parâmetro para anotação Python legível
|
||||||
|
_TYPE_ANNOTATION_MAP: dict[str, str] = {
|
||||||
|
"string": "str",
|
||||||
|
"integer": "int",
|
||||||
|
"number": "float",
|
||||||
|
"boolean": "bool",
|
||||||
|
"object": "dict",
|
||||||
|
"array": "list",
|
||||||
|
}
|
||||||
|
|
||||||
|
# Cache de modelos Vertex AI instanciados (por nome de modelo)
|
||||||
|
_MODEL_CACHE: dict[str, GenerativeModel] = {}
|
||||||
|
|
||||||
|
# Flag de controle de inicialização do SDK (evita reinit por instância)
|
||||||
|
_VERTEX_INITIALIZED: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
class ToolGenerationService:
|
||||||
|
"""Gera implementações de tools via Vertex AI no contexto administrativo.
|
||||||
|
|
||||||
|
Responsabilidades:
|
||||||
|
- Construir prompt estruturado a partir dos metadados da tool
|
||||||
|
- Chamar o modelo LLM de geração (separado do modelo de atendimento)
|
||||||
|
- Extrair o bloco de código Python da resposta
|
||||||
|
- Aplicar linting de segurança antes de devolver o código
|
||||||
|
- Retornar resultado estruturado para o ToolManagementService
|
||||||
|
|
||||||
|
Não faz:
|
||||||
|
- Não persiste artefatos (responsabilidade do ToolManagementService)
|
||||||
|
- Não valida contrato nem assinatura (responsabilidade do ToolManagementService)
|
||||||
|
- Não executa o código gerado
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, settings: AdminSettings) -> None:
|
||||||
|
self.settings = settings
|
||||||
|
self._ensure_vertex_initialized()
|
||||||
|
|
||||||
|
def _ensure_vertex_initialized(self) -> None:
|
||||||
|
global _VERTEX_INITIALIZED
|
||||||
|
if _VERTEX_INITIALIZED:
|
||||||
|
return
|
||||||
|
# Reutiliza as credenciais do projeto Google já configuradas nas settings
|
||||||
|
# do admin (que leem do .env, idêntico ao product). O isolamento é nos
|
||||||
|
# parâmetros de modelo e temperatura — não na conta GCP.
|
||||||
|
try:
|
||||||
|
import os
|
||||||
|
project_id = os.environ.get("GOOGLE_PROJECT_ID", "")
|
||||||
|
location = os.environ.get("GOOGLE_LOCATION", "us-central1")
|
||||||
|
vertexai.init(project=project_id, location=location)
|
||||||
|
_VERTEX_INITIALIZED = True
|
||||||
|
logger.info(
|
||||||
|
"tool_generation_service_event=vertex_initialized project=%s location=%s",
|
||||||
|
project_id,
|
||||||
|
location,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(
|
||||||
|
"tool_generation_service_event=vertex_init_warning error=%s",
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_model(self, model_name: str) -> GenerativeModel:
|
||||||
|
model = _MODEL_CACHE.get(model_name)
|
||||||
|
if model is None:
|
||||||
|
model = GenerativeModel(model_name)
|
||||||
|
_MODEL_CACHE[model_name] = model
|
||||||
|
return model
|
||||||
|
|
||||||
|
def _build_model_sequence(self, preferred_model: str | None) -> list[str]:
|
||||||
|
"""Constrói a sequência de modelos a tentar, respeitando o preferred e o fallback."""
|
||||||
|
sequence: list[str] = []
|
||||||
|
candidates = [
|
||||||
|
preferred_model,
|
||||||
|
self.settings.admin_tool_generation_model,
|
||||||
|
self.settings.admin_tool_generation_fallback_model,
|
||||||
|
]
|
||||||
|
for candidate in candidates:
|
||||||
|
normalized = str(candidate or "").strip()
|
||||||
|
if normalized and normalized not in sequence:
|
||||||
|
sequence.append(normalized)
|
||||||
|
return sequence
|
||||||
|
|
||||||
|
def _build_generation_prompt(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
tool_name: str,
|
||||||
|
display_name: str,
|
||||||
|
domain: str,
|
||||||
|
description: str,
|
||||||
|
business_goal: str,
|
||||||
|
parameters: list[dict],
|
||||||
|
) -> str:
|
||||||
|
"""Monta o prompt estruturado de geração enviado ao modelo.
|
||||||
|
|
||||||
|
O prompt descreve o contrato esperado, os restrições de importação,
|
||||||
|
os parâmetros e o objetivo operacional da tool.
|
||||||
|
"""
|
||||||
|
signature_parts: list[str] = []
|
||||||
|
parameter_lines: list[str] = []
|
||||||
|
|
||||||
|
for param in parameters:
|
||||||
|
name = str(param.get("name") or "").strip().lower()
|
||||||
|
if not name:
|
||||||
|
continue
|
||||||
|
param_type = str(param.get("parameter_type") or "string").strip().lower()
|
||||||
|
description_param = str(param.get("description") or "").strip()
|
||||||
|
required = bool(param.get("required", True))
|
||||||
|
annotation = _TYPE_ANNOTATION_MAP.get(param_type, "str")
|
||||||
|
|
||||||
|
if required:
|
||||||
|
signature_parts.append(f"{name}: {annotation}")
|
||||||
|
else:
|
||||||
|
signature_parts.append(f"{name}: {annotation} | None = None")
|
||||||
|
|
||||||
|
required_label = "obrigatório" if required else "opcional"
|
||||||
|
parameter_lines.append(
|
||||||
|
f" - {name} ({annotation}, {required_label}): {description_param}"
|
||||||
|
)
|
||||||
|
|
||||||
|
signature = ", ".join(signature_parts)
|
||||||
|
if signature:
|
||||||
|
full_signature = f"async def run(*, {signature}) -> dict:"
|
||||||
|
else:
|
||||||
|
full_signature = "async def run() -> dict:"
|
||||||
|
|
||||||
|
parameters_block = (
|
||||||
|
"\n".join(parameter_lines)
|
||||||
|
if parameter_lines
|
||||||
|
else " (nenhum parâmetro — a tool não recebe entrada contextual)"
|
||||||
|
)
|
||||||
|
|
||||||
|
domain_context_map = {
|
||||||
|
"vendas": (
|
||||||
|
"O bot atua em um sistema de atendimento para concessionária automotiva. "
|
||||||
|
"A tool opera no domínio de vendas: estoque de veículos, negociações, pedidos e cancelamentos."
|
||||||
|
),
|
||||||
|
"revisao": (
|
||||||
|
"O bot atua em um sistema de atendimento de oficina automotiva. "
|
||||||
|
"A tool opera no domínio de revisão: agendamentos, remarcações, listagem de serviços."
|
||||||
|
),
|
||||||
|
"locacao": (
|
||||||
|
"O bot atua em um sistema de atendimento de locadora de veículos. "
|
||||||
|
"A tool opera no domínio de locação: frota, contratos, pagamentos e devoluções."
|
||||||
|
),
|
||||||
|
"orquestracao": (
|
||||||
|
"O bot atua em um sistema de orquestração conversacional. "
|
||||||
|
"A tool opera no domínio de orquestração: controla fluxo, contexto e estado da conversa."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
domain_context = domain_context_map.get(
|
||||||
|
str(domain or "").strip().lower(),
|
||||||
|
"O bot atua em um sistema de atendimento automatizado.",
|
||||||
|
)
|
||||||
|
|
||||||
|
return (
|
||||||
|
"Você é um especialista em Python que gera implementações realistas de tools "
|
||||||
|
"para um bot de atendimento.\n\n"
|
||||||
|
f"CONTEXTO DO DOMÍNIO:\n{domain_context}\n\n"
|
||||||
|
"CONTRATO OBRIGATÓRIO:\n"
|
||||||
|
"- A função deve ser assíncrona: async def run(...)\n"
|
||||||
|
"- Todos os parâmetros devem ser keyword-only (após *)\n"
|
||||||
|
"- O tipo de retorno deve ser dict (JSON-serializável)\n"
|
||||||
|
"- O módulo pode importar apenas stdlib (datetime, json, re, math, uuid, etc.)\n"
|
||||||
|
"- Proibido importar: app.*, admin_app.*, subprocess, os.system, os.popen\n"
|
||||||
|
"- Proibido usar: exec(), eval(), __import__(), open()\n\n"
|
||||||
|
"TOOL A IMPLEMENTAR:\n"
|
||||||
|
f"- Nome técnico: {tool_name}\n"
|
||||||
|
f"- Nome de exibição: {display_name}\n"
|
||||||
|
f"- Domínio: {domain}\n"
|
||||||
|
f"- Descrição funcional: {description}\n"
|
||||||
|
f"- Objetivo de negócio: {business_goal}\n\n"
|
||||||
|
f"PARÂMETROS DA TOOL:\n{parameters_block}\n\n"
|
||||||
|
f"ASSINATURA ESPERADA:\n{full_signature}\n\n"
|
||||||
|
"INSTRUÇÕES DE GERAÇÃO:\n"
|
||||||
|
"- Gere uma implementação realista que simule o comportamento esperado da tool.\n"
|
||||||
|
"- O retorno deve incluir os campos relevantes ao domínio (não apenas echo dos argumentos).\n"
|
||||||
|
"- Use dados fictícios mas verossímeis para simular a resposta operacional.\n"
|
||||||
|
"- Nenhuma explicação ou comentário fora do código. Retorne apenas o bloco Python.\n"
|
||||||
|
"- O módulo deve começar com um docstring descritivo.\n"
|
||||||
|
"- Envolva o código em ```python ... ```.\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _extract_python_block(self, raw_response: str) -> str | None:
|
||||||
|
"""Extrai o primeiro bloco ```python ... ``` da resposta do modelo."""
|
||||||
|
normalized = str(raw_response or "").strip()
|
||||||
|
match = _PYTHON_BLOCK_RE.search(normalized)
|
||||||
|
if match:
|
||||||
|
return match.group(1).strip()
|
||||||
|
# Fallback: se não há marcador de código mas o conteúdo parece Python
|
||||||
|
if normalized.startswith("async def run") or normalized.startswith('"""'):
|
||||||
|
return normalized
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _apply_safety_linting(self, source_code: str) -> list[str]:
|
||||||
|
"""Verifica padrões perigosos no código gerado antes da validação formal.
|
||||||
|
|
||||||
|
Retorna lista de issues. Lista vazia = linting passou.
|
||||||
|
"""
|
||||||
|
issues: list[str] = []
|
||||||
|
for pattern, description in _DANGEROUS_PATTERNS:
|
||||||
|
if re.search(pattern, source_code, re.MULTILINE):
|
||||||
|
issues.append(f"linting: {description}.")
|
||||||
|
return issues
|
||||||
|
|
||||||
|
async def generate_tool_source(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
tool_name: str,
|
||||||
|
display_name: str,
|
||||||
|
domain: str,
|
||||||
|
description: str,
|
||||||
|
business_goal: str,
|
||||||
|
parameters: list[dict],
|
||||||
|
preferred_model: str | None = None,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Gera o código Python da tool a partir dos metadados do draft.
|
||||||
|
|
||||||
|
Retorna um dicionário com:
|
||||||
|
- passed (bool): True se o código foi gerado e passou no linting
|
||||||
|
- generated_source_code (str | None): código Python gerado
|
||||||
|
- generation_model_used (str | None): modelo que gerou o código
|
||||||
|
- prompt_rendered (str): prompt enviado ao modelo (para auditoria)
|
||||||
|
- issues (list[str]): problemas encontrados (geração ou linting)
|
||||||
|
- elapsed_ms (float): tempo total de geração em milissegundos
|
||||||
|
"""
|
||||||
|
prompt = self._build_generation_prompt(
|
||||||
|
tool_name=tool_name,
|
||||||
|
display_name=display_name,
|
||||||
|
domain=domain,
|
||||||
|
description=description,
|
||||||
|
business_goal=business_goal,
|
||||||
|
parameters=parameters,
|
||||||
|
)
|
||||||
|
|
||||||
|
model_sequence = self._build_model_sequence(preferred_model)
|
||||||
|
generation_config = GenerationConfig(
|
||||||
|
temperature=self.settings.admin_tool_generation_temperature,
|
||||||
|
max_output_tokens=self.settings.admin_tool_generation_max_output_tokens,
|
||||||
|
)
|
||||||
|
|
||||||
|
raw_response: str | None = None
|
||||||
|
generation_model_used: str | None = None
|
||||||
|
last_error: Exception | None = None
|
||||||
|
started_at = perf_counter()
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
for model_name in model_sequence:
|
||||||
|
try:
|
||||||
|
model = self._get_model(model_name)
|
||||||
|
response = await asyncio.wait_for(
|
||||||
|
asyncio.to_thread(
|
||||||
|
model.generate_content,
|
||||||
|
prompt,
|
||||||
|
generation_config=generation_config,
|
||||||
|
),
|
||||||
|
timeout=float(self.settings.admin_tool_generation_timeout_seconds),
|
||||||
|
)
|
||||||
|
candidate = (
|
||||||
|
response.candidates[0]
|
||||||
|
if getattr(response, "candidates", None)
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
content = getattr(candidate, "content", None)
|
||||||
|
parts = list(getattr(content, "parts", None) or [])
|
||||||
|
text_parts = [
|
||||||
|
getattr(part, "text", None)
|
||||||
|
for part in parts
|
||||||
|
if isinstance(getattr(part, "text", None), str)
|
||||||
|
]
|
||||||
|
raw_response = "\n".join(
|
||||||
|
t for t in text_parts if t and t.strip()
|
||||||
|
).strip() or None
|
||||||
|
|
||||||
|
if raw_response is None:
|
||||||
|
# Fallback para o atributo .text raiz
|
||||||
|
try:
|
||||||
|
raw_response = str(response.text or "").strip() or None
|
||||||
|
except (AttributeError, ValueError):
|
||||||
|
raw_response = None
|
||||||
|
|
||||||
|
generation_model_used = model_name
|
||||||
|
break
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
last_error = TimeoutError(
|
||||||
|
f"modelo '{model_name}' excedeu o timeout de "
|
||||||
|
f"{self.settings.admin_tool_generation_timeout_seconds}s para geração de tools."
|
||||||
|
)
|
||||||
|
logger.warning(
|
||||||
|
"tool_generation_service_event=timeout model=%s timeout_seconds=%s",
|
||||||
|
model_name,
|
||||||
|
self.settings.admin_tool_generation_timeout_seconds,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
except NotFound as exc:
|
||||||
|
last_error = exc
|
||||||
|
_MODEL_CACHE.pop(model_name, None)
|
||||||
|
logger.warning(
|
||||||
|
"tool_generation_service_event=model_not_found model=%s error=%s",
|
||||||
|
model_name,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
except GoogleAPIError as exc:
|
||||||
|
last_error = exc
|
||||||
|
logger.warning(
|
||||||
|
"tool_generation_service_event=api_error model=%s error=%s",
|
||||||
|
model_name,
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
last_error = exc
|
||||||
|
logger.warning(
|
||||||
|
"tool_generation_service_event=unexpected_error model=%s error=%s class=%s",
|
||||||
|
model_name,
|
||||||
|
exc,
|
||||||
|
exc.__class__.__name__,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
|
||||||
|
|
||||||
|
if raw_response is None or generation_model_used is None:
|
||||||
|
error_detail = str(last_error) if last_error else "nenhum modelo disponivel respondeu"
|
||||||
|
logger.error(
|
||||||
|
"tool_generation_service_event=generation_failed tool_name=%s elapsed_ms=%s error=%s",
|
||||||
|
tool_name,
|
||||||
|
elapsed_ms,
|
||||||
|
error_detail,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"passed": False,
|
||||||
|
"generated_source_code": None,
|
||||||
|
"generation_model_used": None,
|
||||||
|
"prompt_rendered": prompt,
|
||||||
|
"issues": [f"falha na geração via LLM: {error_detail}"],
|
||||||
|
"elapsed_ms": elapsed_ms,
|
||||||
|
}
|
||||||
|
|
||||||
|
generated_source_code = self._extract_python_block(raw_response)
|
||||||
|
if generated_source_code is None:
|
||||||
|
logger.warning(
|
||||||
|
"tool_generation_service_event=no_code_block tool_name=%s model=%s elapsed_ms=%s",
|
||||||
|
tool_name,
|
||||||
|
generation_model_used,
|
||||||
|
elapsed_ms,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"passed": False,
|
||||||
|
"generated_source_code": None,
|
||||||
|
"generation_model_used": generation_model_used,
|
||||||
|
"prompt_rendered": prompt,
|
||||||
|
"issues": ["o modelo não retornou um bloco de código Python identificável."],
|
||||||
|
"elapsed_ms": elapsed_ms,
|
||||||
|
}
|
||||||
|
|
||||||
|
linting_issues = self._apply_safety_linting(generated_source_code)
|
||||||
|
if linting_issues:
|
||||||
|
logger.warning(
|
||||||
|
"tool_generation_service_event=linting_failed tool_name=%s model=%s issues=%s elapsed_ms=%s",
|
||||||
|
tool_name,
|
||||||
|
generation_model_used,
|
||||||
|
linting_issues,
|
||||||
|
elapsed_ms,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"passed": False,
|
||||||
|
"generated_source_code": generated_source_code,
|
||||||
|
"generation_model_used": generation_model_used,
|
||||||
|
"prompt_rendered": prompt,
|
||||||
|
"issues": linting_issues,
|
||||||
|
"elapsed_ms": elapsed_ms,
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"tool_generation_service_event=generation_succeeded tool_name=%s model=%s elapsed_ms=%s",
|
||||||
|
tool_name,
|
||||||
|
generation_model_used,
|
||||||
|
elapsed_ms,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"passed": True,
|
||||||
|
"generated_source_code": generated_source_code,
|
||||||
|
"generation_model_used": generation_model_used,
|
||||||
|
"prompt_rendered": prompt,
|
||||||
|
"issues": [],
|
||||||
|
"elapsed_ms": elapsed_ms,
|
||||||
|
}
|
||||||
@ -0,0 +1,266 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from time import perf_counter
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from admin_app.core.settings import AdminSettings
|
||||||
|
from admin_app.db.database import AdminSessionLocal
|
||||||
|
from admin_app.repositories import (
|
||||||
|
ToolArtifactRepository,
|
||||||
|
ToolDraftRepository,
|
||||||
|
ToolMetadataRepository,
|
||||||
|
ToolVersionRepository,
|
||||||
|
)
|
||||||
|
from admin_app.services.tool_generation_service import ToolGenerationService
|
||||||
|
|
||||||
|
|
||||||
|
class ToolGenerationWorkerService:
|
||||||
|
"""Executa a pipeline de geracao em um worker dedicado do runtime admin.
|
||||||
|
|
||||||
|
O worker abre a propria sessao administrativa e cria uma instancia isolada do
|
||||||
|
ToolManagementService dentro da thread dedicada. Assim, a geracao e as
|
||||||
|
validacoes nao compartilham a sessao SQLAlchemy da request web nem o pool de
|
||||||
|
threads padrao usado pelas rotas sync do FastAPI.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_THREAD_NAME_PREFIX = "admin-tool-generation-worker"
|
||||||
|
_DEFAULT_POLL_AFTER_MS = 1200
|
||||||
|
|
||||||
|
def __init__(self, settings: AdminSettings) -> None:
|
||||||
|
self.settings = settings
|
||||||
|
self.max_workers = max(1, int(settings.admin_tool_generation_worker_max_workers))
|
||||||
|
self._executor = ThreadPoolExecutor(
|
||||||
|
max_workers=self.max_workers,
|
||||||
|
thread_name_prefix=self._THREAD_NAME_PREFIX,
|
||||||
|
)
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._pending_jobs = 0
|
||||||
|
self._jobs: dict[str, dict[str, Any]] = {}
|
||||||
|
|
||||||
|
def shutdown(self, *, wait: bool = False) -> None:
|
||||||
|
self._executor.shutdown(wait=wait, cancel_futures=True)
|
||||||
|
|
||||||
|
def execute_generation_pipeline(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
version_id: str,
|
||||||
|
runner_staff_account_id: int,
|
||||||
|
runner_name: str,
|
||||||
|
runner_role,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
submitted_at = datetime.now(UTC).isoformat()
|
||||||
|
with self._lock:
|
||||||
|
self._pending_jobs += 1
|
||||||
|
queued_jobs_before_submit = max(self._pending_jobs - 1, 0)
|
||||||
|
|
||||||
|
started_at = perf_counter()
|
||||||
|
future = self._executor.submit(
|
||||||
|
self._run_generation_pipeline_job,
|
||||||
|
version_id,
|
||||||
|
runner_staff_account_id,
|
||||||
|
runner_name,
|
||||||
|
runner_role,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
payload = future.result()
|
||||||
|
finally:
|
||||||
|
with self._lock:
|
||||||
|
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
||||||
|
pending_jobs_after_completion = self._pending_jobs
|
||||||
|
|
||||||
|
execution = {
|
||||||
|
"mode": "dedicated_generation_worker",
|
||||||
|
"target": "admin_tool_generation_worker",
|
||||||
|
"dispatch_state": "completed",
|
||||||
|
"worker_max_workers": self.max_workers,
|
||||||
|
"worker_pending_jobs": pending_jobs_after_completion,
|
||||||
|
"queued_jobs_before_submit": queued_jobs_before_submit,
|
||||||
|
"submitted_at": submitted_at,
|
||||||
|
"started_at": submitted_at,
|
||||||
|
"completed_at": datetime.now(UTC).isoformat(),
|
||||||
|
"elapsed_ms": round((perf_counter() - started_at) * 1000, 2),
|
||||||
|
"worker_thread_name": str(payload.pop("_worker_thread_name", "")) or None,
|
||||||
|
"poll_after_ms": None,
|
||||||
|
"last_error": None,
|
||||||
|
}
|
||||||
|
enriched_payload = dict(payload)
|
||||||
|
enriched_payload["execution"] = execution
|
||||||
|
return enriched_payload
|
||||||
|
|
||||||
|
def dispatch_generation_pipeline(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
version_id: str,
|
||||||
|
runner_staff_account_id: int,
|
||||||
|
runner_name: str,
|
||||||
|
runner_role,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
normalized_version_id = str(version_id or "").strip().lower()
|
||||||
|
if not normalized_version_id:
|
||||||
|
raise ValueError("Versao administrativa invalida para o worker de geracao.")
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
existing_job = self._jobs.get(normalized_version_id)
|
||||||
|
if existing_job is not None and existing_job.get("dispatch_state") in {"queued", "running"}:
|
||||||
|
return self._build_dispatch_snapshot_locked(existing_job)
|
||||||
|
|
||||||
|
self._pending_jobs += 1
|
||||||
|
queued_jobs_before_submit = max(self._pending_jobs - 1, 0)
|
||||||
|
job = {
|
||||||
|
"version_id": normalized_version_id,
|
||||||
|
"dispatch_state": "queued",
|
||||||
|
"queued_jobs_before_submit": queued_jobs_before_submit,
|
||||||
|
"submitted_at": datetime.now(UTC).isoformat(),
|
||||||
|
"started_at": None,
|
||||||
|
"completed_at": None,
|
||||||
|
"elapsed_ms": None,
|
||||||
|
"worker_thread_name": None,
|
||||||
|
"last_error": None,
|
||||||
|
"result_payload": None,
|
||||||
|
}
|
||||||
|
self._jobs[normalized_version_id] = job
|
||||||
|
self._executor.submit(
|
||||||
|
self._run_generation_pipeline_job_async,
|
||||||
|
normalized_version_id,
|
||||||
|
runner_staff_account_id,
|
||||||
|
runner_name,
|
||||||
|
runner_role,
|
||||||
|
)
|
||||||
|
return self._build_dispatch_snapshot_locked(job)
|
||||||
|
|
||||||
|
def get_generation_pipeline_dispatch(self, version_id: str) -> dict[str, Any] | None:
|
||||||
|
normalized_version_id = str(version_id or "").strip().lower()
|
||||||
|
if not normalized_version_id:
|
||||||
|
return None
|
||||||
|
with self._lock:
|
||||||
|
job = self._jobs.get(normalized_version_id)
|
||||||
|
if job is None:
|
||||||
|
return None
|
||||||
|
return self._build_dispatch_snapshot_locked(job)
|
||||||
|
|
||||||
|
def _run_generation_pipeline_job_async(
|
||||||
|
self,
|
||||||
|
version_id: str,
|
||||||
|
runner_staff_account_id: int,
|
||||||
|
runner_name: str,
|
||||||
|
runner_role,
|
||||||
|
) -> None:
|
||||||
|
self._mark_job_running(version_id)
|
||||||
|
try:
|
||||||
|
payload = self._run_generation_pipeline_job(
|
||||||
|
version_id,
|
||||||
|
runner_staff_account_id,
|
||||||
|
runner_name,
|
||||||
|
runner_role,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
self._mark_job_failed(version_id, exc)
|
||||||
|
return
|
||||||
|
self._mark_job_completed(version_id, payload)
|
||||||
|
|
||||||
|
def _mark_job_running(self, version_id: str) -> None:
|
||||||
|
with self._lock:
|
||||||
|
job = self._jobs.get(version_id)
|
||||||
|
if job is None:
|
||||||
|
return
|
||||||
|
job["dispatch_state"] = "running"
|
||||||
|
job["started_at"] = datetime.now(UTC).isoformat()
|
||||||
|
job["worker_thread_name"] = threading.current_thread().name
|
||||||
|
|
||||||
|
def _mark_job_completed(self, version_id: str, payload: dict[str, Any]) -> None:
|
||||||
|
with self._lock:
|
||||||
|
job = self._jobs.get(version_id)
|
||||||
|
if job is None:
|
||||||
|
return
|
||||||
|
completed_at = datetime.now(UTC).isoformat()
|
||||||
|
started_reference = self._parse_job_timestamp(job.get("started_at")) or self._parse_job_timestamp(job.get("submitted_at"))
|
||||||
|
elapsed_ms = None
|
||||||
|
if started_reference is not None:
|
||||||
|
elapsed_ms = round((datetime.now(UTC) - started_reference).total_seconds() * 1000, 2)
|
||||||
|
job["dispatch_state"] = "completed"
|
||||||
|
job["completed_at"] = completed_at
|
||||||
|
job["elapsed_ms"] = elapsed_ms
|
||||||
|
job["result_payload"] = dict(payload)
|
||||||
|
job["last_error"] = None
|
||||||
|
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
||||||
|
|
||||||
|
def _mark_job_failed(self, version_id: str, exc: Exception) -> None:
|
||||||
|
with self._lock:
|
||||||
|
job = self._jobs.get(version_id)
|
||||||
|
if job is None:
|
||||||
|
return
|
||||||
|
completed_at = datetime.now(UTC).isoformat()
|
||||||
|
started_reference = self._parse_job_timestamp(job.get("started_at")) or self._parse_job_timestamp(job.get("submitted_at"))
|
||||||
|
elapsed_ms = None
|
||||||
|
if started_reference is not None:
|
||||||
|
elapsed_ms = round((datetime.now(UTC) - started_reference).total_seconds() * 1000, 2)
|
||||||
|
job["dispatch_state"] = "failed"
|
||||||
|
job["completed_at"] = completed_at
|
||||||
|
job["elapsed_ms"] = elapsed_ms
|
||||||
|
job["last_error"] = f"{type(exc).__name__}: {exc}"
|
||||||
|
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
||||||
|
|
||||||
|
def _build_dispatch_snapshot_locked(self, job: dict[str, Any]) -> dict[str, Any]:
|
||||||
|
dispatch_state = str(job.get("dispatch_state") or "queued")
|
||||||
|
snapshot = {
|
||||||
|
"mode": "dedicated_generation_worker_async",
|
||||||
|
"target": "admin_tool_generation_worker",
|
||||||
|
"dispatch_state": dispatch_state,
|
||||||
|
"worker_max_workers": self.max_workers,
|
||||||
|
"worker_pending_jobs": self._pending_jobs,
|
||||||
|
"queued_jobs_before_submit": job.get("queued_jobs_before_submit", 0),
|
||||||
|
"submitted_at": job.get("submitted_at"),
|
||||||
|
"started_at": job.get("started_at"),
|
||||||
|
"completed_at": job.get("completed_at"),
|
||||||
|
"elapsed_ms": job.get("elapsed_ms"),
|
||||||
|
"worker_thread_name": job.get("worker_thread_name"),
|
||||||
|
"poll_after_ms": self._DEFAULT_POLL_AFTER_MS if dispatch_state in {"queued", "running"} else None,
|
||||||
|
"last_error": job.get("last_error"),
|
||||||
|
}
|
||||||
|
result_payload = job.get("result_payload")
|
||||||
|
if isinstance(result_payload, dict):
|
||||||
|
snapshot["result_payload"] = dict(result_payload)
|
||||||
|
return snapshot
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_job_timestamp(value: Any) -> datetime | None:
|
||||||
|
if not isinstance(value, str) or not value.strip():
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _run_generation_pipeline_job(
|
||||||
|
self,
|
||||||
|
version_id: str,
|
||||||
|
runner_staff_account_id: int,
|
||||||
|
runner_name: str,
|
||||||
|
runner_role,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
from admin_app.services.tool_management_service import ToolManagementService
|
||||||
|
|
||||||
|
db = AdminSessionLocal()
|
||||||
|
try:
|
||||||
|
service = ToolManagementService(
|
||||||
|
settings=self.settings,
|
||||||
|
draft_repository=ToolDraftRepository(db),
|
||||||
|
version_repository=ToolVersionRepository(db),
|
||||||
|
metadata_repository=ToolMetadataRepository(db),
|
||||||
|
artifact_repository=ToolArtifactRepository(db),
|
||||||
|
tool_generation_service=ToolGenerationService(self.settings),
|
||||||
|
)
|
||||||
|
payload = service.run_generation_pipeline(
|
||||||
|
version_id,
|
||||||
|
runner_staff_account_id=runner_staff_account_id,
|
||||||
|
runner_name=runner_name,
|
||||||
|
runner_role=runner_role,
|
||||||
|
)
|
||||||
|
payload = dict(payload)
|
||||||
|
payload["_worker_thread_name"] = threading.current_thread().name
|
||||||
|
return payload
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,180 @@
|
|||||||
|
import threading
|
||||||
|
import unittest
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
from admin_app.core import AdminSettings
|
||||||
|
from admin_app.services.tool_generation_worker_service import ToolGenerationWorkerService
|
||||||
|
from admin_app.services.tool_management_service import ToolManagementService
|
||||||
|
from shared.contracts import StaffRole, ToolLifecycleStatus
|
||||||
|
|
||||||
|
|
||||||
|
class ToolGenerationWorkerServiceTests(unittest.TestCase):
|
||||||
|
def test_execute_generation_pipeline_uses_dedicated_worker_metadata(self):
|
||||||
|
worker = ToolGenerationWorkerService(AdminSettings())
|
||||||
|
main_thread_name = threading.current_thread().name
|
||||||
|
|
||||||
|
def fake_job(version_id, runner_staff_account_id, runner_name, runner_role):
|
||||||
|
self.assertNotEqual(threading.current_thread().name, main_thread_name)
|
||||||
|
self.assertEqual(version_id, 'tool_version::resumo::v1')
|
||||||
|
self.assertEqual(runner_staff_account_id, 7)
|
||||||
|
self.assertEqual(runner_name, 'Diretoria')
|
||||||
|
self.assertEqual(runner_role, StaffRole.DIRETOR)
|
||||||
|
return {
|
||||||
|
'service': 'admin_tool_governance',
|
||||||
|
'version_id': version_id,
|
||||||
|
'status': 'generated',
|
||||||
|
'_worker_thread_name': threading.current_thread().name,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
with patch.object(worker, '_run_generation_pipeline_job', side_effect=fake_job):
|
||||||
|
payload = worker.execute_generation_pipeline(
|
||||||
|
version_id='tool_version::resumo::v1',
|
||||||
|
runner_staff_account_id=7,
|
||||||
|
runner_name='Diretoria',
|
||||||
|
runner_role=StaffRole.DIRETOR,
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
worker.shutdown(wait=True)
|
||||||
|
|
||||||
|
execution = payload['execution']
|
||||||
|
self.assertEqual(execution['mode'], 'dedicated_generation_worker')
|
||||||
|
self.assertEqual(execution['target'], 'admin_tool_generation_worker')
|
||||||
|
self.assertEqual(execution['dispatch_state'], 'completed')
|
||||||
|
self.assertEqual(execution['worker_max_workers'], 1)
|
||||||
|
self.assertEqual(execution['queued_jobs_before_submit'], 0)
|
||||||
|
self.assertEqual(execution['worker_pending_jobs'], 0)
|
||||||
|
self.assertIsNotNone(execution['submitted_at'])
|
||||||
|
self.assertGreaterEqual(execution['elapsed_ms'], 0)
|
||||||
|
self.assertTrue(execution['worker_thread_name'].startswith('admin-tool-generation-worker'))
|
||||||
|
self.assertEqual(payload['version_id'], 'tool_version::resumo::v1')
|
||||||
|
self.assertEqual(payload['status'], 'generated')
|
||||||
|
|
||||||
|
def test_dispatch_generation_pipeline_returns_queued_snapshot_without_waiting_completion(self):
|
||||||
|
worker = ToolGenerationWorkerService(AdminSettings())
|
||||||
|
job_started = threading.Event()
|
||||||
|
release_job = threading.Event()
|
||||||
|
|
||||||
|
def fake_job(version_id, runner_staff_account_id, runner_name, runner_role):
|
||||||
|
job_started.set()
|
||||||
|
release_job.wait(timeout=2)
|
||||||
|
return {
|
||||||
|
'service': 'admin_tool_governance',
|
||||||
|
'version_id': version_id,
|
||||||
|
'status': 'generated',
|
||||||
|
'_worker_thread_name': threading.current_thread().name,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
with patch.object(worker, '_run_generation_pipeline_job', side_effect=fake_job):
|
||||||
|
dispatch = worker.dispatch_generation_pipeline(
|
||||||
|
version_id='tool_version::assinc::v1',
|
||||||
|
runner_staff_account_id=9,
|
||||||
|
runner_name='Diretoria',
|
||||||
|
runner_role=StaffRole.DIRETOR,
|
||||||
|
)
|
||||||
|
self.assertEqual(dispatch['mode'], 'dedicated_generation_worker_async')
|
||||||
|
self.assertIn(dispatch['dispatch_state'], {'queued', 'running'})
|
||||||
|
self.assertEqual(dispatch['target'], 'admin_tool_generation_worker')
|
||||||
|
self.assertEqual(dispatch['queued_jobs_before_submit'], 0)
|
||||||
|
self.assertGreaterEqual(dispatch['poll_after_ms'], 1)
|
||||||
|
job_started.wait(timeout=2)
|
||||||
|
running_snapshot = worker.get_generation_pipeline_dispatch('tool_version::assinc::v1')
|
||||||
|
self.assertIsNotNone(running_snapshot)
|
||||||
|
self.assertIn(running_snapshot['dispatch_state'], {'running', 'completed'})
|
||||||
|
release_job.set()
|
||||||
|
release_job.wait(timeout=2)
|
||||||
|
finally:
|
||||||
|
worker.shutdown(wait=True)
|
||||||
|
|
||||||
|
|
||||||
|
class ToolManagementServiceWorkerFallbackTests(unittest.TestCase):
|
||||||
|
def test_run_generation_pipeline_in_worker_falls_back_to_inline_execution_metadata(self):
|
||||||
|
service = ToolManagementService(settings=AdminSettings())
|
||||||
|
|
||||||
|
with patch.object(
|
||||||
|
service,
|
||||||
|
'run_generation_pipeline',
|
||||||
|
return_value={
|
||||||
|
'service': 'admin_tool_governance',
|
||||||
|
'message': 'ok',
|
||||||
|
'version_id': 'tool_version::inline::v1',
|
||||||
|
'status': 'generated',
|
||||||
|
},
|
||||||
|
) as run_generation_pipeline:
|
||||||
|
payload = service.run_generation_pipeline_in_worker(
|
||||||
|
'tool_version::inline::v1',
|
||||||
|
runner_staff_account_id=4,
|
||||||
|
runner_name='Colaborador',
|
||||||
|
runner_role=StaffRole.COLABORADOR,
|
||||||
|
)
|
||||||
|
|
||||||
|
run_generation_pipeline.assert_called_once_with(
|
||||||
|
'tool_version::inline::v1',
|
||||||
|
runner_staff_account_id=4,
|
||||||
|
runner_name='Colaborador',
|
||||||
|
runner_role=StaffRole.COLABORADOR,
|
||||||
|
)
|
||||||
|
self.assertEqual(payload['execution']['mode'], 'inline_admin_service')
|
||||||
|
self.assertEqual(payload['execution']['target'], 'admin_inline_generation_pipeline')
|
||||||
|
self.assertEqual(payload['execution']['dispatch_state'], 'completed')
|
||||||
|
self.assertEqual(payload['execution']['queued_jobs_before_submit'], 0)
|
||||||
|
self.assertIsNone(payload['execution']['worker_max_workers'])
|
||||||
|
self.assertEqual(payload['version_id'], 'tool_version::inline::v1')
|
||||||
|
|
||||||
|
def test_run_generation_pipeline_in_worker_returns_queued_snapshot_when_dedicated_worker_accepts_job(self):
|
||||||
|
version = SimpleNamespace(
|
||||||
|
id=11,
|
||||||
|
version_id='tool_version::fila::v1',
|
||||||
|
tool_name='emitir_resumo_locacao',
|
||||||
|
version_number=1,
|
||||||
|
status=ToolLifecycleStatus.DRAFT,
|
||||||
|
summary='Resumo governado em fila.',
|
||||||
|
owner_display_name='Diretoria',
|
||||||
|
updated_at=None,
|
||||||
|
created_at=None,
|
||||||
|
)
|
||||||
|
draft_repository = Mock()
|
||||||
|
draft_repository.get_by_tool_name.return_value = SimpleNamespace(id=3)
|
||||||
|
version_repository = Mock()
|
||||||
|
version_repository.get_by_version_id.return_value = version
|
||||||
|
version_repository.list_versions.return_value = [version]
|
||||||
|
metadata_repository = Mock()
|
||||||
|
metadata_repository.get_by_tool_version_id.return_value = SimpleNamespace(display_name='Emitir resumo locacao')
|
||||||
|
worker = Mock()
|
||||||
|
worker.dispatch_generation_pipeline.return_value = {
|
||||||
|
'mode': 'dedicated_generation_worker_async',
|
||||||
|
'target': 'admin_tool_generation_worker',
|
||||||
|
'dispatch_state': 'queued',
|
||||||
|
'worker_max_workers': 1,
|
||||||
|
'worker_pending_jobs': 1,
|
||||||
|
'queued_jobs_before_submit': 0,
|
||||||
|
'submitted_at': '2026-04-02T10:00:00+00:00',
|
||||||
|
'started_at': None,
|
||||||
|
'completed_at': None,
|
||||||
|
'elapsed_ms': None,
|
||||||
|
'worker_thread_name': None,
|
||||||
|
'poll_after_ms': 1200,
|
||||||
|
'last_error': None,
|
||||||
|
}
|
||||||
|
service = ToolManagementService(
|
||||||
|
settings=AdminSettings(),
|
||||||
|
draft_repository=draft_repository,
|
||||||
|
version_repository=version_repository,
|
||||||
|
metadata_repository=metadata_repository,
|
||||||
|
tool_generation_worker_service=worker,
|
||||||
|
)
|
||||||
|
|
||||||
|
payload = service.run_generation_pipeline_in_worker(
|
||||||
|
'tool_version::fila::v1',
|
||||||
|
runner_staff_account_id=1,
|
||||||
|
runner_name='Diretoria',
|
||||||
|
runner_role=StaffRole.DIRETOR,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(payload['status'], ToolLifecycleStatus.DRAFT)
|
||||||
|
self.assertEqual(payload['execution']['dispatch_state'], 'queued')
|
||||||
|
self.assertEqual(payload['queue_entry']['gate'], 'generation_pipeline_queued')
|
||||||
|
self.assertEqual(payload['queue_entry']['automated_validation_status'], 'pending')
|
||||||
|
self.assertIn('request foi liberada', payload['message'])
|
||||||
Loading…
Reference in New Issue