Compare commits
2 Commits
640e422498
...
7e380a9c65
| Author | SHA1 | Date |
|---|---|---|
|
|
7e380a9c65 | 1 week ago |
|
|
de455b8566 | 1 week ago |
@ -0,0 +1,444 @@
|
||||
"""Serviço isolado de geração de tools via LLM para o runtime administrativo.
|
||||
|
||||
Este módulo é a única camada do admin_app que conversa com o Vertex AI para fins
|
||||
de geração de código. Ele é completamente separado do LLMService do product
|
||||
(app.services.ai.llm_service) e usa configurações próprias do AdminSettings.
|
||||
|
||||
Separação arquitetural garantida por:
|
||||
- shared.contracts.model_runtime_separation.ModelRuntimeTarget.TOOL_GENERATION
|
||||
- config keys: admin_tool_generation_model / admin_tool_generation_fallback_model
|
||||
- Nenhuma importação de app.* é permitida neste módulo.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from time import perf_counter
|
||||
from typing import Any
|
||||
|
||||
import vertexai
|
||||
from google.api_core.exceptions import GoogleAPIError, NotFound
|
||||
from vertexai.generative_models import GenerationConfig, GenerativeModel
|
||||
|
||||
from admin_app.core.settings import AdminSettings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---- Constantes de geração ---------------------------------------------------
|
||||
|
||||
_PYTHON_BLOCK_RE = re.compile(
|
||||
r"```python\s*\n(.*?)```",
|
||||
re.DOTALL | re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Padrões que o código gerado não pode conter.
|
||||
# Aplicados antes das validações automáticas existentes no ToolManagementService.
|
||||
_DANGEROUS_PATTERNS: tuple[tuple[str, str], ...] = (
|
||||
(r"\bexec\s*\(", "uso de exec() proibido em tools geradas"),
|
||||
(r"\beval\s*\(", "uso de eval() proibido em tools geradas"),
|
||||
(r"\b__import__\s*\(", "uso de __import__() proibido em tools geradas"),
|
||||
(r"os\.system\s*\(", "chamada a os.system() proibida em tools geradas"),
|
||||
(r"os\.popen\s*\(", "chamada a os.popen() proibida em tools geradas"),
|
||||
(r"\bsubprocess\b", "uso de subprocess proibido em tools geradas"),
|
||||
(r"from\s+app\.", "importação de app.* proibida em tools geradas"),
|
||||
(r"from\s+admin_app\.", "importação de admin_app.* proibida em tools geradas"),
|
||||
(r"import\s+app\b", "importação direta de app proibida em tools geradas"),
|
||||
(r"import\s+admin_app\b", "importação direta de admin_app proibida em tools geradas"),
|
||||
(r"\bopen\s*\(", "acesso a sistema de arquivos via open() proibido em tools geradas"),
|
||||
(r"__builtins__", "acesso a __builtins__ proibido em tools geradas"),
|
||||
)
|
||||
|
||||
# Mapeamento de tipo de parâmetro para anotação Python legível
|
||||
_TYPE_ANNOTATION_MAP: dict[str, str] = {
|
||||
"string": "str",
|
||||
"integer": "int",
|
||||
"number": "float",
|
||||
"boolean": "bool",
|
||||
"object": "dict",
|
||||
"array": "list",
|
||||
}
|
||||
|
||||
# Cache de modelos Vertex AI instanciados (por nome de modelo)
|
||||
_MODEL_CACHE: dict[str, GenerativeModel] = {}
|
||||
|
||||
# Flag de controle de inicialização do SDK (evita reinit por instância)
|
||||
_VERTEX_INITIALIZED: bool = False
|
||||
|
||||
|
||||
class ToolGenerationService:
|
||||
"""Gera implementações de tools via Vertex AI no contexto administrativo.
|
||||
|
||||
Responsabilidades:
|
||||
- Construir prompt estruturado a partir dos metadados da tool
|
||||
- Chamar o modelo LLM de geração (separado do modelo de atendimento)
|
||||
- Extrair o bloco de código Python da resposta
|
||||
- Aplicar linting de segurança antes de devolver o código
|
||||
- Retornar resultado estruturado para o ToolManagementService
|
||||
|
||||
Não faz:
|
||||
- Não persiste artefatos (responsabilidade do ToolManagementService)
|
||||
- Não valida contrato nem assinatura (responsabilidade do ToolManagementService)
|
||||
- Não executa o código gerado
|
||||
"""
|
||||
|
||||
def __init__(self, settings: AdminSettings) -> None:
|
||||
self.settings = settings
|
||||
self._ensure_vertex_initialized()
|
||||
|
||||
def _ensure_vertex_initialized(self) -> None:
|
||||
global _VERTEX_INITIALIZED
|
||||
if _VERTEX_INITIALIZED:
|
||||
return
|
||||
# Reutiliza as credenciais do projeto Google já configuradas nas settings
|
||||
# do admin (que leem do .env, idêntico ao product). O isolamento é nos
|
||||
# parâmetros de modelo e temperatura — não na conta GCP.
|
||||
try:
|
||||
import os
|
||||
project_id = os.environ.get("GOOGLE_PROJECT_ID", "")
|
||||
location = os.environ.get("GOOGLE_LOCATION", "us-central1")
|
||||
vertexai.init(project=project_id, location=location)
|
||||
_VERTEX_INITIALIZED = True
|
||||
logger.info(
|
||||
"tool_generation_service_event=vertex_initialized project=%s location=%s",
|
||||
project_id,
|
||||
location,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"tool_generation_service_event=vertex_init_warning error=%s",
|
||||
exc,
|
||||
)
|
||||
|
||||
def _get_model(self, model_name: str) -> GenerativeModel:
|
||||
model = _MODEL_CACHE.get(model_name)
|
||||
if model is None:
|
||||
model = GenerativeModel(model_name)
|
||||
_MODEL_CACHE[model_name] = model
|
||||
return model
|
||||
|
||||
def _build_model_sequence(self, preferred_model: str | None) -> list[str]:
|
||||
"""Constrói a sequência de modelos a tentar, respeitando o preferred e o fallback."""
|
||||
sequence: list[str] = []
|
||||
candidates = [
|
||||
preferred_model,
|
||||
self.settings.admin_tool_generation_model,
|
||||
self.settings.admin_tool_generation_fallback_model,
|
||||
]
|
||||
for candidate in candidates:
|
||||
normalized = str(candidate or "").strip()
|
||||
if normalized and normalized not in sequence:
|
||||
sequence.append(normalized)
|
||||
return sequence
|
||||
|
||||
def _build_generation_prompt(
|
||||
self,
|
||||
*,
|
||||
tool_name: str,
|
||||
display_name: str,
|
||||
domain: str,
|
||||
description: str,
|
||||
business_goal: str,
|
||||
parameters: list[dict],
|
||||
) -> str:
|
||||
"""Monta o prompt estruturado de geração enviado ao modelo.
|
||||
|
||||
O prompt descreve o contrato esperado, os restrições de importação,
|
||||
os parâmetros e o objetivo operacional da tool.
|
||||
"""
|
||||
signature_parts: list[str] = []
|
||||
parameter_lines: list[str] = []
|
||||
|
||||
for param in parameters:
|
||||
name = str(param.get("name") or "").strip().lower()
|
||||
if not name:
|
||||
continue
|
||||
param_type = str(param.get("parameter_type") or "string").strip().lower()
|
||||
description_param = str(param.get("description") or "").strip()
|
||||
required = bool(param.get("required", True))
|
||||
annotation = _TYPE_ANNOTATION_MAP.get(param_type, "str")
|
||||
|
||||
if required:
|
||||
signature_parts.append(f"{name}: {annotation}")
|
||||
else:
|
||||
signature_parts.append(f"{name}: {annotation} | None = None")
|
||||
|
||||
required_label = "obrigatório" if required else "opcional"
|
||||
parameter_lines.append(
|
||||
f" - {name} ({annotation}, {required_label}): {description_param}"
|
||||
)
|
||||
|
||||
signature = ", ".join(signature_parts)
|
||||
if signature:
|
||||
full_signature = f"async def run(*, {signature}) -> dict:"
|
||||
else:
|
||||
full_signature = "async def run() -> dict:"
|
||||
|
||||
parameters_block = (
|
||||
"\n".join(parameter_lines)
|
||||
if parameter_lines
|
||||
else " (nenhum parâmetro — a tool não recebe entrada contextual)"
|
||||
)
|
||||
|
||||
domain_context_map = {
|
||||
"vendas": (
|
||||
"O bot atua em um sistema de atendimento para concessionária automotiva. "
|
||||
"A tool opera no domínio de vendas: estoque de veículos, negociações, pedidos e cancelamentos."
|
||||
),
|
||||
"revisao": (
|
||||
"O bot atua em um sistema de atendimento de oficina automotiva. "
|
||||
"A tool opera no domínio de revisão: agendamentos, remarcações, listagem de serviços."
|
||||
),
|
||||
"locacao": (
|
||||
"O bot atua em um sistema de atendimento de locadora de veículos. "
|
||||
"A tool opera no domínio de locação: frota, contratos, pagamentos e devoluções."
|
||||
),
|
||||
"orquestracao": (
|
||||
"O bot atua em um sistema de orquestração conversacional. "
|
||||
"A tool opera no domínio de orquestração: controla fluxo, contexto e estado da conversa."
|
||||
),
|
||||
}
|
||||
domain_context = domain_context_map.get(
|
||||
str(domain or "").strip().lower(),
|
||||
"O bot atua em um sistema de atendimento automatizado.",
|
||||
)
|
||||
|
||||
return (
|
||||
"Você é um especialista em Python que gera implementações realistas de tools "
|
||||
"para um bot de atendimento.\n\n"
|
||||
f"CONTEXTO DO DOMÍNIO:\n{domain_context}\n\n"
|
||||
"CONTRATO OBRIGATÓRIO:\n"
|
||||
"- A função deve ser assíncrona: async def run(...)\n"
|
||||
"- Todos os parâmetros devem ser keyword-only (após *)\n"
|
||||
"- O tipo de retorno deve ser dict (JSON-serializável)\n"
|
||||
"- O módulo pode importar apenas stdlib (datetime, json, re, math, uuid, etc.)\n"
|
||||
"- Proibido importar: app.*, admin_app.*, subprocess, os.system, os.popen\n"
|
||||
"- Proibido usar: exec(), eval(), __import__(), open()\n\n"
|
||||
"TOOL A IMPLEMENTAR:\n"
|
||||
f"- Nome técnico: {tool_name}\n"
|
||||
f"- Nome de exibição: {display_name}\n"
|
||||
f"- Domínio: {domain}\n"
|
||||
f"- Descrição funcional: {description}\n"
|
||||
f"- Objetivo de negócio: {business_goal}\n\n"
|
||||
f"PARÂMETROS DA TOOL:\n{parameters_block}\n\n"
|
||||
f"ASSINATURA ESPERADA:\n{full_signature}\n\n"
|
||||
"INSTRUÇÕES DE GERAÇÃO:\n"
|
||||
"- Gere uma implementação realista que simule o comportamento esperado da tool.\n"
|
||||
"- O retorno deve incluir os campos relevantes ao domínio (não apenas echo dos argumentos).\n"
|
||||
"- Use dados fictícios mas verossímeis para simular a resposta operacional.\n"
|
||||
"- Nenhuma explicação ou comentário fora do código. Retorne apenas o bloco Python.\n"
|
||||
"- O módulo deve começar com um docstring descritivo.\n"
|
||||
"- Envolva o código em ```python ... ```.\n"
|
||||
)
|
||||
|
||||
def _extract_python_block(self, raw_response: str) -> str | None:
|
||||
"""Extrai o primeiro bloco ```python ... ``` da resposta do modelo."""
|
||||
normalized = str(raw_response or "").strip()
|
||||
match = _PYTHON_BLOCK_RE.search(normalized)
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
# Fallback: se não há marcador de código mas o conteúdo parece Python
|
||||
if normalized.startswith("async def run") or normalized.startswith('"""'):
|
||||
return normalized
|
||||
return None
|
||||
|
||||
def _apply_safety_linting(self, source_code: str) -> list[str]:
|
||||
"""Verifica padrões perigosos no código gerado antes da validação formal.
|
||||
|
||||
Retorna lista de issues. Lista vazia = linting passou.
|
||||
"""
|
||||
issues: list[str] = []
|
||||
for pattern, description in _DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, source_code, re.MULTILINE):
|
||||
issues.append(f"linting: {description}.")
|
||||
return issues
|
||||
|
||||
async def generate_tool_source(
|
||||
self,
|
||||
*,
|
||||
tool_name: str,
|
||||
display_name: str,
|
||||
domain: str,
|
||||
description: str,
|
||||
business_goal: str,
|
||||
parameters: list[dict],
|
||||
preferred_model: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Gera o código Python da tool a partir dos metadados do draft.
|
||||
|
||||
Retorna um dicionário com:
|
||||
- passed (bool): True se o código foi gerado e passou no linting
|
||||
- generated_source_code (str | None): código Python gerado
|
||||
- generation_model_used (str | None): modelo que gerou o código
|
||||
- prompt_rendered (str): prompt enviado ao modelo (para auditoria)
|
||||
- issues (list[str]): problemas encontrados (geração ou linting)
|
||||
- elapsed_ms (float): tempo total de geração em milissegundos
|
||||
"""
|
||||
prompt = self._build_generation_prompt(
|
||||
tool_name=tool_name,
|
||||
display_name=display_name,
|
||||
domain=domain,
|
||||
description=description,
|
||||
business_goal=business_goal,
|
||||
parameters=parameters,
|
||||
)
|
||||
|
||||
model_sequence = self._build_model_sequence(preferred_model)
|
||||
generation_config = GenerationConfig(
|
||||
temperature=self.settings.admin_tool_generation_temperature,
|
||||
max_output_tokens=self.settings.admin_tool_generation_max_output_tokens,
|
||||
)
|
||||
|
||||
raw_response: str | None = None
|
||||
generation_model_used: str | None = None
|
||||
last_error: Exception | None = None
|
||||
started_at = perf_counter()
|
||||
|
||||
import asyncio
|
||||
|
||||
for model_name in model_sequence:
|
||||
try:
|
||||
model = self._get_model(model_name)
|
||||
response = await asyncio.wait_for(
|
||||
asyncio.to_thread(
|
||||
model.generate_content,
|
||||
prompt,
|
||||
generation_config=generation_config,
|
||||
),
|
||||
timeout=float(self.settings.admin_tool_generation_timeout_seconds),
|
||||
)
|
||||
candidate = (
|
||||
response.candidates[0]
|
||||
if getattr(response, "candidates", None)
|
||||
else None
|
||||
)
|
||||
content = getattr(candidate, "content", None)
|
||||
parts = list(getattr(content, "parts", None) or [])
|
||||
text_parts = [
|
||||
getattr(part, "text", None)
|
||||
for part in parts
|
||||
if isinstance(getattr(part, "text", None), str)
|
||||
]
|
||||
raw_response = "\n".join(
|
||||
t for t in text_parts if t and t.strip()
|
||||
).strip() or None
|
||||
|
||||
if raw_response is None:
|
||||
# Fallback para o atributo .text raiz
|
||||
try:
|
||||
raw_response = str(response.text or "").strip() or None
|
||||
except (AttributeError, ValueError):
|
||||
raw_response = None
|
||||
|
||||
generation_model_used = model_name
|
||||
break
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
last_error = TimeoutError(
|
||||
f"modelo '{model_name}' excedeu o timeout de "
|
||||
f"{self.settings.admin_tool_generation_timeout_seconds}s para geração de tools."
|
||||
)
|
||||
logger.warning(
|
||||
"tool_generation_service_event=timeout model=%s timeout_seconds=%s",
|
||||
model_name,
|
||||
self.settings.admin_tool_generation_timeout_seconds,
|
||||
)
|
||||
continue
|
||||
|
||||
except NotFound as exc:
|
||||
last_error = exc
|
||||
_MODEL_CACHE.pop(model_name, None)
|
||||
logger.warning(
|
||||
"tool_generation_service_event=model_not_found model=%s error=%s",
|
||||
model_name,
|
||||
exc,
|
||||
)
|
||||
continue
|
||||
|
||||
except GoogleAPIError as exc:
|
||||
last_error = exc
|
||||
logger.warning(
|
||||
"tool_generation_service_event=api_error model=%s error=%s",
|
||||
model_name,
|
||||
exc,
|
||||
)
|
||||
continue
|
||||
|
||||
except Exception as exc:
|
||||
last_error = exc
|
||||
logger.warning(
|
||||
"tool_generation_service_event=unexpected_error model=%s error=%s class=%s",
|
||||
model_name,
|
||||
exc,
|
||||
exc.__class__.__name__,
|
||||
)
|
||||
continue
|
||||
|
||||
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
|
||||
|
||||
if raw_response is None or generation_model_used is None:
|
||||
error_detail = str(last_error) if last_error else "nenhum modelo disponivel respondeu"
|
||||
logger.error(
|
||||
"tool_generation_service_event=generation_failed tool_name=%s elapsed_ms=%s error=%s",
|
||||
tool_name,
|
||||
elapsed_ms,
|
||||
error_detail,
|
||||
)
|
||||
return {
|
||||
"passed": False,
|
||||
"generated_source_code": None,
|
||||
"generation_model_used": None,
|
||||
"prompt_rendered": prompt,
|
||||
"issues": [f"falha na geração via LLM: {error_detail}"],
|
||||
"elapsed_ms": elapsed_ms,
|
||||
}
|
||||
|
||||
generated_source_code = self._extract_python_block(raw_response)
|
||||
if generated_source_code is None:
|
||||
logger.warning(
|
||||
"tool_generation_service_event=no_code_block tool_name=%s model=%s elapsed_ms=%s",
|
||||
tool_name,
|
||||
generation_model_used,
|
||||
elapsed_ms,
|
||||
)
|
||||
return {
|
||||
"passed": False,
|
||||
"generated_source_code": None,
|
||||
"generation_model_used": generation_model_used,
|
||||
"prompt_rendered": prompt,
|
||||
"issues": ["o modelo não retornou um bloco de código Python identificável."],
|
||||
"elapsed_ms": elapsed_ms,
|
||||
}
|
||||
|
||||
linting_issues = self._apply_safety_linting(generated_source_code)
|
||||
if linting_issues:
|
||||
logger.warning(
|
||||
"tool_generation_service_event=linting_failed tool_name=%s model=%s issues=%s elapsed_ms=%s",
|
||||
tool_name,
|
||||
generation_model_used,
|
||||
linting_issues,
|
||||
elapsed_ms,
|
||||
)
|
||||
return {
|
||||
"passed": False,
|
||||
"generated_source_code": generated_source_code,
|
||||
"generation_model_used": generation_model_used,
|
||||
"prompt_rendered": prompt,
|
||||
"issues": linting_issues,
|
||||
"elapsed_ms": elapsed_ms,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"tool_generation_service_event=generation_succeeded tool_name=%s model=%s elapsed_ms=%s",
|
||||
tool_name,
|
||||
generation_model_used,
|
||||
elapsed_ms,
|
||||
)
|
||||
return {
|
||||
"passed": True,
|
||||
"generated_source_code": generated_source_code,
|
||||
"generation_model_used": generation_model_used,
|
||||
"prompt_rendered": prompt,
|
||||
"issues": [],
|
||||
"elapsed_ms": elapsed_ms,
|
||||
}
|
||||
@ -0,0 +1,266 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import UTC, datetime
|
||||
from time import perf_counter
|
||||
from typing import Any
|
||||
|
||||
from admin_app.core.settings import AdminSettings
|
||||
from admin_app.db.database import AdminSessionLocal
|
||||
from admin_app.repositories import (
|
||||
ToolArtifactRepository,
|
||||
ToolDraftRepository,
|
||||
ToolMetadataRepository,
|
||||
ToolVersionRepository,
|
||||
)
|
||||
from admin_app.services.tool_generation_service import ToolGenerationService
|
||||
|
||||
|
||||
class ToolGenerationWorkerService:
|
||||
"""Executa a pipeline de geracao em um worker dedicado do runtime admin.
|
||||
|
||||
O worker abre a propria sessao administrativa e cria uma instancia isolada do
|
||||
ToolManagementService dentro da thread dedicada. Assim, a geracao e as
|
||||
validacoes nao compartilham a sessao SQLAlchemy da request web nem o pool de
|
||||
threads padrao usado pelas rotas sync do FastAPI.
|
||||
"""
|
||||
|
||||
_THREAD_NAME_PREFIX = "admin-tool-generation-worker"
|
||||
_DEFAULT_POLL_AFTER_MS = 1200
|
||||
|
||||
def __init__(self, settings: AdminSettings) -> None:
|
||||
self.settings = settings
|
||||
self.max_workers = max(1, int(settings.admin_tool_generation_worker_max_workers))
|
||||
self._executor = ThreadPoolExecutor(
|
||||
max_workers=self.max_workers,
|
||||
thread_name_prefix=self._THREAD_NAME_PREFIX,
|
||||
)
|
||||
self._lock = threading.Lock()
|
||||
self._pending_jobs = 0
|
||||
self._jobs: dict[str, dict[str, Any]] = {}
|
||||
|
||||
def shutdown(self, *, wait: bool = False) -> None:
|
||||
self._executor.shutdown(wait=wait, cancel_futures=True)
|
||||
|
||||
def execute_generation_pipeline(
|
||||
self,
|
||||
*,
|
||||
version_id: str,
|
||||
runner_staff_account_id: int,
|
||||
runner_name: str,
|
||||
runner_role,
|
||||
) -> dict[str, Any]:
|
||||
submitted_at = datetime.now(UTC).isoformat()
|
||||
with self._lock:
|
||||
self._pending_jobs += 1
|
||||
queued_jobs_before_submit = max(self._pending_jobs - 1, 0)
|
||||
|
||||
started_at = perf_counter()
|
||||
future = self._executor.submit(
|
||||
self._run_generation_pipeline_job,
|
||||
version_id,
|
||||
runner_staff_account_id,
|
||||
runner_name,
|
||||
runner_role,
|
||||
)
|
||||
try:
|
||||
payload = future.result()
|
||||
finally:
|
||||
with self._lock:
|
||||
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
||||
pending_jobs_after_completion = self._pending_jobs
|
||||
|
||||
execution = {
|
||||
"mode": "dedicated_generation_worker",
|
||||
"target": "admin_tool_generation_worker",
|
||||
"dispatch_state": "completed",
|
||||
"worker_max_workers": self.max_workers,
|
||||
"worker_pending_jobs": pending_jobs_after_completion,
|
||||
"queued_jobs_before_submit": queued_jobs_before_submit,
|
||||
"submitted_at": submitted_at,
|
||||
"started_at": submitted_at,
|
||||
"completed_at": datetime.now(UTC).isoformat(),
|
||||
"elapsed_ms": round((perf_counter() - started_at) * 1000, 2),
|
||||
"worker_thread_name": str(payload.pop("_worker_thread_name", "")) or None,
|
||||
"poll_after_ms": None,
|
||||
"last_error": None,
|
||||
}
|
||||
enriched_payload = dict(payload)
|
||||
enriched_payload["execution"] = execution
|
||||
return enriched_payload
|
||||
|
||||
def dispatch_generation_pipeline(
|
||||
self,
|
||||
*,
|
||||
version_id: str,
|
||||
runner_staff_account_id: int,
|
||||
runner_name: str,
|
||||
runner_role,
|
||||
) -> dict[str, Any]:
|
||||
normalized_version_id = str(version_id or "").strip().lower()
|
||||
if not normalized_version_id:
|
||||
raise ValueError("Versao administrativa invalida para o worker de geracao.")
|
||||
|
||||
with self._lock:
|
||||
existing_job = self._jobs.get(normalized_version_id)
|
||||
if existing_job is not None and existing_job.get("dispatch_state") in {"queued", "running"}:
|
||||
return self._build_dispatch_snapshot_locked(existing_job)
|
||||
|
||||
self._pending_jobs += 1
|
||||
queued_jobs_before_submit = max(self._pending_jobs - 1, 0)
|
||||
job = {
|
||||
"version_id": normalized_version_id,
|
||||
"dispatch_state": "queued",
|
||||
"queued_jobs_before_submit": queued_jobs_before_submit,
|
||||
"submitted_at": datetime.now(UTC).isoformat(),
|
||||
"started_at": None,
|
||||
"completed_at": None,
|
||||
"elapsed_ms": None,
|
||||
"worker_thread_name": None,
|
||||
"last_error": None,
|
||||
"result_payload": None,
|
||||
}
|
||||
self._jobs[normalized_version_id] = job
|
||||
self._executor.submit(
|
||||
self._run_generation_pipeline_job_async,
|
||||
normalized_version_id,
|
||||
runner_staff_account_id,
|
||||
runner_name,
|
||||
runner_role,
|
||||
)
|
||||
return self._build_dispatch_snapshot_locked(job)
|
||||
|
||||
def get_generation_pipeline_dispatch(self, version_id: str) -> dict[str, Any] | None:
|
||||
normalized_version_id = str(version_id or "").strip().lower()
|
||||
if not normalized_version_id:
|
||||
return None
|
||||
with self._lock:
|
||||
job = self._jobs.get(normalized_version_id)
|
||||
if job is None:
|
||||
return None
|
||||
return self._build_dispatch_snapshot_locked(job)
|
||||
|
||||
def _run_generation_pipeline_job_async(
|
||||
self,
|
||||
version_id: str,
|
||||
runner_staff_account_id: int,
|
||||
runner_name: str,
|
||||
runner_role,
|
||||
) -> None:
|
||||
self._mark_job_running(version_id)
|
||||
try:
|
||||
payload = self._run_generation_pipeline_job(
|
||||
version_id,
|
||||
runner_staff_account_id,
|
||||
runner_name,
|
||||
runner_role,
|
||||
)
|
||||
except Exception as exc:
|
||||
self._mark_job_failed(version_id, exc)
|
||||
return
|
||||
self._mark_job_completed(version_id, payload)
|
||||
|
||||
def _mark_job_running(self, version_id: str) -> None:
|
||||
with self._lock:
|
||||
job = self._jobs.get(version_id)
|
||||
if job is None:
|
||||
return
|
||||
job["dispatch_state"] = "running"
|
||||
job["started_at"] = datetime.now(UTC).isoformat()
|
||||
job["worker_thread_name"] = threading.current_thread().name
|
||||
|
||||
def _mark_job_completed(self, version_id: str, payload: dict[str, Any]) -> None:
|
||||
with self._lock:
|
||||
job = self._jobs.get(version_id)
|
||||
if job is None:
|
||||
return
|
||||
completed_at = datetime.now(UTC).isoformat()
|
||||
started_reference = self._parse_job_timestamp(job.get("started_at")) or self._parse_job_timestamp(job.get("submitted_at"))
|
||||
elapsed_ms = None
|
||||
if started_reference is not None:
|
||||
elapsed_ms = round((datetime.now(UTC) - started_reference).total_seconds() * 1000, 2)
|
||||
job["dispatch_state"] = "completed"
|
||||
job["completed_at"] = completed_at
|
||||
job["elapsed_ms"] = elapsed_ms
|
||||
job["result_payload"] = dict(payload)
|
||||
job["last_error"] = None
|
||||
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
||||
|
||||
def _mark_job_failed(self, version_id: str, exc: Exception) -> None:
|
||||
with self._lock:
|
||||
job = self._jobs.get(version_id)
|
||||
if job is None:
|
||||
return
|
||||
completed_at = datetime.now(UTC).isoformat()
|
||||
started_reference = self._parse_job_timestamp(job.get("started_at")) or self._parse_job_timestamp(job.get("submitted_at"))
|
||||
elapsed_ms = None
|
||||
if started_reference is not None:
|
||||
elapsed_ms = round((datetime.now(UTC) - started_reference).total_seconds() * 1000, 2)
|
||||
job["dispatch_state"] = "failed"
|
||||
job["completed_at"] = completed_at
|
||||
job["elapsed_ms"] = elapsed_ms
|
||||
job["last_error"] = f"{type(exc).__name__}: {exc}"
|
||||
self._pending_jobs = max(self._pending_jobs - 1, 0)
|
||||
|
||||
def _build_dispatch_snapshot_locked(self, job: dict[str, Any]) -> dict[str, Any]:
|
||||
dispatch_state = str(job.get("dispatch_state") or "queued")
|
||||
snapshot = {
|
||||
"mode": "dedicated_generation_worker_async",
|
||||
"target": "admin_tool_generation_worker",
|
||||
"dispatch_state": dispatch_state,
|
||||
"worker_max_workers": self.max_workers,
|
||||
"worker_pending_jobs": self._pending_jobs,
|
||||
"queued_jobs_before_submit": job.get("queued_jobs_before_submit", 0),
|
||||
"submitted_at": job.get("submitted_at"),
|
||||
"started_at": job.get("started_at"),
|
||||
"completed_at": job.get("completed_at"),
|
||||
"elapsed_ms": job.get("elapsed_ms"),
|
||||
"worker_thread_name": job.get("worker_thread_name"),
|
||||
"poll_after_ms": self._DEFAULT_POLL_AFTER_MS if dispatch_state in {"queued", "running"} else None,
|
||||
"last_error": job.get("last_error"),
|
||||
}
|
||||
result_payload = job.get("result_payload")
|
||||
if isinstance(result_payload, dict):
|
||||
snapshot["result_payload"] = dict(result_payload)
|
||||
return snapshot
|
||||
|
||||
@staticmethod
|
||||
def _parse_job_timestamp(value: Any) -> datetime | None:
|
||||
if not isinstance(value, str) or not value.strip():
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(value)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _run_generation_pipeline_job(
|
||||
self,
|
||||
version_id: str,
|
||||
runner_staff_account_id: int,
|
||||
runner_name: str,
|
||||
runner_role,
|
||||
) -> dict[str, Any]:
|
||||
from admin_app.services.tool_management_service import ToolManagementService
|
||||
|
||||
db = AdminSessionLocal()
|
||||
try:
|
||||
service = ToolManagementService(
|
||||
settings=self.settings,
|
||||
draft_repository=ToolDraftRepository(db),
|
||||
version_repository=ToolVersionRepository(db),
|
||||
metadata_repository=ToolMetadataRepository(db),
|
||||
artifact_repository=ToolArtifactRepository(db),
|
||||
tool_generation_service=ToolGenerationService(self.settings),
|
||||
)
|
||||
payload = service.run_generation_pipeline(
|
||||
version_id,
|
||||
runner_staff_account_id=runner_staff_account_id,
|
||||
runner_name=runner_name,
|
||||
runner_role=runner_role,
|
||||
)
|
||||
payload = dict(payload)
|
||||
payload["_worker_thread_name"] = threading.current_thread().name
|
||||
return payload
|
||||
finally:
|
||||
db.close()
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,180 @@
|
||||
import threading
|
||||
import unittest
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from admin_app.core import AdminSettings
|
||||
from admin_app.services.tool_generation_worker_service import ToolGenerationWorkerService
|
||||
from admin_app.services.tool_management_service import ToolManagementService
|
||||
from shared.contracts import StaffRole, ToolLifecycleStatus
|
||||
|
||||
|
||||
class ToolGenerationWorkerServiceTests(unittest.TestCase):
|
||||
def test_execute_generation_pipeline_uses_dedicated_worker_metadata(self):
|
||||
worker = ToolGenerationWorkerService(AdminSettings())
|
||||
main_thread_name = threading.current_thread().name
|
||||
|
||||
def fake_job(version_id, runner_staff_account_id, runner_name, runner_role):
|
||||
self.assertNotEqual(threading.current_thread().name, main_thread_name)
|
||||
self.assertEqual(version_id, 'tool_version::resumo::v1')
|
||||
self.assertEqual(runner_staff_account_id, 7)
|
||||
self.assertEqual(runner_name, 'Diretoria')
|
||||
self.assertEqual(runner_role, StaffRole.DIRETOR)
|
||||
return {
|
||||
'service': 'admin_tool_governance',
|
||||
'version_id': version_id,
|
||||
'status': 'generated',
|
||||
'_worker_thread_name': threading.current_thread().name,
|
||||
}
|
||||
|
||||
try:
|
||||
with patch.object(worker, '_run_generation_pipeline_job', side_effect=fake_job):
|
||||
payload = worker.execute_generation_pipeline(
|
||||
version_id='tool_version::resumo::v1',
|
||||
runner_staff_account_id=7,
|
||||
runner_name='Diretoria',
|
||||
runner_role=StaffRole.DIRETOR,
|
||||
)
|
||||
finally:
|
||||
worker.shutdown(wait=True)
|
||||
|
||||
execution = payload['execution']
|
||||
self.assertEqual(execution['mode'], 'dedicated_generation_worker')
|
||||
self.assertEqual(execution['target'], 'admin_tool_generation_worker')
|
||||
self.assertEqual(execution['dispatch_state'], 'completed')
|
||||
self.assertEqual(execution['worker_max_workers'], 1)
|
||||
self.assertEqual(execution['queued_jobs_before_submit'], 0)
|
||||
self.assertEqual(execution['worker_pending_jobs'], 0)
|
||||
self.assertIsNotNone(execution['submitted_at'])
|
||||
self.assertGreaterEqual(execution['elapsed_ms'], 0)
|
||||
self.assertTrue(execution['worker_thread_name'].startswith('admin-tool-generation-worker'))
|
||||
self.assertEqual(payload['version_id'], 'tool_version::resumo::v1')
|
||||
self.assertEqual(payload['status'], 'generated')
|
||||
|
||||
def test_dispatch_generation_pipeline_returns_queued_snapshot_without_waiting_completion(self):
|
||||
worker = ToolGenerationWorkerService(AdminSettings())
|
||||
job_started = threading.Event()
|
||||
release_job = threading.Event()
|
||||
|
||||
def fake_job(version_id, runner_staff_account_id, runner_name, runner_role):
|
||||
job_started.set()
|
||||
release_job.wait(timeout=2)
|
||||
return {
|
||||
'service': 'admin_tool_governance',
|
||||
'version_id': version_id,
|
||||
'status': 'generated',
|
||||
'_worker_thread_name': threading.current_thread().name,
|
||||
}
|
||||
|
||||
try:
|
||||
with patch.object(worker, '_run_generation_pipeline_job', side_effect=fake_job):
|
||||
dispatch = worker.dispatch_generation_pipeline(
|
||||
version_id='tool_version::assinc::v1',
|
||||
runner_staff_account_id=9,
|
||||
runner_name='Diretoria',
|
||||
runner_role=StaffRole.DIRETOR,
|
||||
)
|
||||
self.assertEqual(dispatch['mode'], 'dedicated_generation_worker_async')
|
||||
self.assertIn(dispatch['dispatch_state'], {'queued', 'running'})
|
||||
self.assertEqual(dispatch['target'], 'admin_tool_generation_worker')
|
||||
self.assertEqual(dispatch['queued_jobs_before_submit'], 0)
|
||||
self.assertGreaterEqual(dispatch['poll_after_ms'], 1)
|
||||
job_started.wait(timeout=2)
|
||||
running_snapshot = worker.get_generation_pipeline_dispatch('tool_version::assinc::v1')
|
||||
self.assertIsNotNone(running_snapshot)
|
||||
self.assertIn(running_snapshot['dispatch_state'], {'running', 'completed'})
|
||||
release_job.set()
|
||||
release_job.wait(timeout=2)
|
||||
finally:
|
||||
worker.shutdown(wait=True)
|
||||
|
||||
|
||||
class ToolManagementServiceWorkerFallbackTests(unittest.TestCase):
|
||||
def test_run_generation_pipeline_in_worker_falls_back_to_inline_execution_metadata(self):
|
||||
service = ToolManagementService(settings=AdminSettings())
|
||||
|
||||
with patch.object(
|
||||
service,
|
||||
'run_generation_pipeline',
|
||||
return_value={
|
||||
'service': 'admin_tool_governance',
|
||||
'message': 'ok',
|
||||
'version_id': 'tool_version::inline::v1',
|
||||
'status': 'generated',
|
||||
},
|
||||
) as run_generation_pipeline:
|
||||
payload = service.run_generation_pipeline_in_worker(
|
||||
'tool_version::inline::v1',
|
||||
runner_staff_account_id=4,
|
||||
runner_name='Colaborador',
|
||||
runner_role=StaffRole.COLABORADOR,
|
||||
)
|
||||
|
||||
run_generation_pipeline.assert_called_once_with(
|
||||
'tool_version::inline::v1',
|
||||
runner_staff_account_id=4,
|
||||
runner_name='Colaborador',
|
||||
runner_role=StaffRole.COLABORADOR,
|
||||
)
|
||||
self.assertEqual(payload['execution']['mode'], 'inline_admin_service')
|
||||
self.assertEqual(payload['execution']['target'], 'admin_inline_generation_pipeline')
|
||||
self.assertEqual(payload['execution']['dispatch_state'], 'completed')
|
||||
self.assertEqual(payload['execution']['queued_jobs_before_submit'], 0)
|
||||
self.assertIsNone(payload['execution']['worker_max_workers'])
|
||||
self.assertEqual(payload['version_id'], 'tool_version::inline::v1')
|
||||
|
||||
def test_run_generation_pipeline_in_worker_returns_queued_snapshot_when_dedicated_worker_accepts_job(self):
|
||||
version = SimpleNamespace(
|
||||
id=11,
|
||||
version_id='tool_version::fila::v1',
|
||||
tool_name='emitir_resumo_locacao',
|
||||
version_number=1,
|
||||
status=ToolLifecycleStatus.DRAFT,
|
||||
summary='Resumo governado em fila.',
|
||||
owner_display_name='Diretoria',
|
||||
updated_at=None,
|
||||
created_at=None,
|
||||
)
|
||||
draft_repository = Mock()
|
||||
draft_repository.get_by_tool_name.return_value = SimpleNamespace(id=3)
|
||||
version_repository = Mock()
|
||||
version_repository.get_by_version_id.return_value = version
|
||||
version_repository.list_versions.return_value = [version]
|
||||
metadata_repository = Mock()
|
||||
metadata_repository.get_by_tool_version_id.return_value = SimpleNamespace(display_name='Emitir resumo locacao')
|
||||
worker = Mock()
|
||||
worker.dispatch_generation_pipeline.return_value = {
|
||||
'mode': 'dedicated_generation_worker_async',
|
||||
'target': 'admin_tool_generation_worker',
|
||||
'dispatch_state': 'queued',
|
||||
'worker_max_workers': 1,
|
||||
'worker_pending_jobs': 1,
|
||||
'queued_jobs_before_submit': 0,
|
||||
'submitted_at': '2026-04-02T10:00:00+00:00',
|
||||
'started_at': None,
|
||||
'completed_at': None,
|
||||
'elapsed_ms': None,
|
||||
'worker_thread_name': None,
|
||||
'poll_after_ms': 1200,
|
||||
'last_error': None,
|
||||
}
|
||||
service = ToolManagementService(
|
||||
settings=AdminSettings(),
|
||||
draft_repository=draft_repository,
|
||||
version_repository=version_repository,
|
||||
metadata_repository=metadata_repository,
|
||||
tool_generation_worker_service=worker,
|
||||
)
|
||||
|
||||
payload = service.run_generation_pipeline_in_worker(
|
||||
'tool_version::fila::v1',
|
||||
runner_staff_account_id=1,
|
||||
runner_name='Diretoria',
|
||||
runner_role=StaffRole.DIRETOR,
|
||||
)
|
||||
|
||||
self.assertEqual(payload['status'], ToolLifecycleStatus.DRAFT)
|
||||
self.assertEqual(payload['execution']['dispatch_state'], 'queued')
|
||||
self.assertEqual(payload['queue_entry']['gate'], 'generation_pipeline_queued')
|
||||
self.assertEqual(payload['queue_entry']['automated_validation_status'], 'pending')
|
||||
self.assertIn('request foi liberada', payload['message'])
|
||||
Loading…
Reference in New Issue