import asyncio import json import logging from time import perf_counter from typing import Dict, Any, List, Optional import vertexai from google.api_core.exceptions import NotFound from vertexai.generative_models import FunctionDeclaration, GenerativeModel, Part, Tool from app.core.settings import settings from app.models.tool_model import ToolDefinition logger = logging.getLogger(__name__) IMAGE_ANALYSIS_FAILURE_MESSAGE = "Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida." INVALID_RECEIPT_WATERMARK_MESSAGE = "O comprovante enviado nao e valido. Envie um comprovante valido com a marca d'agua SysaltiIA visivel." VALID_RECEIPT_WATERMARK_MARKER = "[watermark_sysaltiia_ok]" IMAGE_ANALYSIS_BLOCKING_PREFIXES = ( IMAGE_ANALYSIS_FAILURE_MESSAGE.lower(), INVALID_RECEIPT_WATERMARK_MESSAGE.lower(), ) # Essa classe encapsula a integracao com o Vertex AI: # inicializacao, cache de modelos e serializacao das tools. class LLMService: _vertex_initialized = False _models: dict[str, GenerativeModel] = {} _vertex_tools_cache: dict[str, Optional[List[Tool]]] = {} def __init__(self): """Inicializa o cliente Vertex AI e define modelos de fallback.""" if not LLMService._vertex_initialized: vertexai.init( project=settings.google_project_id, location=settings.google_location, ) LLMService._vertex_initialized = True configured = settings.vertex_model_name.strip() fallback_models = ["gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.0-flash-001"] self.model_names = [configured] + [m for m in fallback_models if m != configured] def _log_llm_event(self, event: str, **payload) -> None: logger.info("llm_service_event=%s payload=%s", event, payload) # Transforma anexos de imagem em uma mensagem textual pronta para o orquestrador. async def extract_image_workflow_message( self, *, caption: str | None, attachments: List[Dict[str, Any]], ) -> str: """Analisa imagem(ns) e devolve uma mensagem textual pronta para o orquestrador.""" if not attachments: return str(caption or "").strip() prompt = self._build_image_workflow_prompt(caption=caption) contents: List[Any] = [prompt] for attachment in attachments: raw_data = attachment.get("data") mime_type = str(attachment.get("mime_type") or "image/jpeg").strip() or "image/jpeg" if not isinstance(raw_data, (bytes, bytearray)) or not raw_data: continue contents.append(Part.from_data(data=bytes(raw_data), mime_type=mime_type)) if len(contents) == 1: return IMAGE_ANALYSIS_FAILURE_MESSAGE response = None last_error = None selected_model_name = None attempts = 0 started_at = perf_counter() for model_name in self.model_names: attempts += 1 try: model = self._get_model(model_name) response = await asyncio.to_thread(model.generate_content, contents) selected_model_name = model_name break except NotFound as err: last_error = err LLMService._models.pop(model_name, None) continue if response is None: self._log_llm_event( "image_workflow_failed", elapsed_ms=round((perf_counter() - started_at) * 1000, 2), attempts=attempts, attachments_count=len(attachments), caption_present=bool(str(caption or "").strip()), ) if last_error: raise RuntimeError( f"Nenhum modelo Vertex disponivel para analise de imagem. Erro: {last_error}" ) from last_error raise RuntimeError("Falha ao analisar imagem no Vertex AI.") payload = self._extract_response_payload(response) self._log_llm_event( "image_workflow_completed", model_name=selected_model_name, elapsed_ms=round((perf_counter() - started_at) * 1000, 2), attempts=attempts, attachments_count=len(attachments), caption_present=bool(str(caption or "").strip()), ) extracted_text = (payload.get("response") or "").strip() or (caption or "").strip() return self._coerce_image_workflow_response(extracted_text) # Define o prompt de extracao usado para comprovantes e multas em imagem. def _build_image_workflow_prompt(self, *, caption: str | None) -> str: normalized_caption = (caption or "").strip() or "sem legenda" return ( "Voce esta preparando uma mensagem textual curta para um orquestrador de atendimento automotivo e locacao. " "Analise a imagem enviada pelo usuario e a legenda opcional. " "Se a imagem for comprovante de pagamento ou nota fiscal, so considere o documento valido quando houver no fundo a marca d'agua exatamente escrita como SysaltiIA, com essa mesma grafia. " f"Se essa marca d'agua SysaltiIA nao estiver visivel com clareza, responda exatamente: {INVALID_RECEIPT_WATERMARK_MESSAGE} " f"Se o comprovante de pagamento ou a nota fiscal estiver valido com a marca d'agua correta, prefixe a resposta exatamente com {VALID_RECEIPT_WATERMARK_MARKER} e um espaco antes do texto final. " "Se for comprovante de pagamento de aluguel, responda com uma frase objetiva em portugues no formato: " "Registrar pagamento de aluguel: contrato <...>; placa <...>; valor <...>; data_pagamento <...>; favorecido <...>; identificador_comprovante <...>; observacoes <...>. " "Se a data de pagamento incluir hora e minuto visiveis na imagem, preserve a data e a hora no campo data_pagamento no formato DD/MM/AAAA HH:MM. " "Nao reduza para somente a data quando a hora estiver visivel. " "Se apenas a data estiver visivel, use somente a data. " "Se for multa de transito relacionada a carro alugado, responda com uma frase objetiva em portugues no formato: " "Registrar multa de aluguel: placa <...>; contrato <...>; auto_infracao <...>; orgao_emissor <...>; valor <...>; data_infracao <...>; vencimento <...>; observacoes <...>. " "Se for outro documento automotivo util, resuma em uma frase com os dados importantes. " f"Se nao conseguir identificar com seguranca, responda exatamente: {IMAGE_ANALYSIS_FAILURE_MESSAGE} " "Use apenas dados observaveis e nao invente informacoes. " f"Legenda do usuario: {normalized_caption}" ) # Aplica validacoes extras ao retorno multimodal antes de acionar o orquestrador. def _coerce_image_workflow_response(self, text: str) -> str: normalized = str(text or "").strip() if not normalized: return "" lowered = normalized.lower() marker = VALID_RECEIPT_WATERMARK_MARKER.lower() if lowered.startswith(marker): return normalized[len(VALID_RECEIPT_WATERMARK_MARKER):].strip() if lowered.startswith(IMAGE_ANALYSIS_FAILURE_MESSAGE.lower()) or lowered.startswith( INVALID_RECEIPT_WATERMARK_MESSAGE.lower() ): return normalized if self._looks_like_watermark_sensitive_image_response(normalized): return INVALID_RECEIPT_WATERMARK_MESSAGE return normalized # Reconhece respostas que so deveriam seguir com a confirmacao da watermark. def _looks_like_watermark_sensitive_image_response(self, text: str) -> bool: normalized = str(text or "").strip().lower() return bool( normalized.startswith("registrar pagamento de aluguel:") or normalized.startswith("nota fiscal") or normalized.startswith("comprovante") or "nota fiscal" in normalized or "comprovante" in normalized ) def build_vertex_tools(self, tools: List[ToolDefinition]) -> Optional[List[Tool]]: """Converte tools internas para o formato esperado pelo Vertex AI.""" # Vertex espera uma lista de Tool, com function_declarations agrupadas em um unico Tool. if not tools: return None cache_key = json.dumps( [ { "name": tool.name, "description": tool.description, "parameters": tool.parameters, } for tool in tools ], sort_keys=True, ensure_ascii=True, separators=(",", ":"), ) cached = LLMService._vertex_tools_cache.get(cache_key) if cached is not None: return cached function_declarations = [ FunctionDeclaration( name=tool.name, description=tool.description, parameters=tool.parameters, ) for tool in tools ] vertex_tools = [Tool(function_declarations=function_declarations)] LLMService._vertex_tools_cache[cache_key] = vertex_tools return vertex_tools def _get_model(self, model_name: str) -> GenerativeModel: model = LLMService._models.get(model_name) if model is None: model = GenerativeModel(model_name) LLMService._models[model_name] = model return model def _extract_response_payload(self, response) -> Dict[str, Any]: candidate = response.candidates[0] if getattr(response, "candidates", None) else None content = getattr(candidate, "content", None) parts = list(getattr(content, "parts", None) or []) tool_call = None text_parts: list[str] = [] for part in parts: function_call = getattr(part, "function_call", None) if function_call is not None and tool_call is None: tool_call = { "name": function_call.name, "arguments": dict(function_call.args), } text_value = getattr(part, "text", None) if isinstance(text_value, str) and text_value.strip(): text_parts.append(text_value) response_text = "\n".join(text_parts).strip() or None return { "response": response_text, "tool_call": tool_call, } async def generate_response( self, message: str, tools: List[ToolDefinition], history: List[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Gera resposta textual ou chamada de tool a partir da mensagem do usuario.""" vertex_tools = self.build_vertex_tools(tools) response = None last_error = None selected_model_name = None attempts = 0 started_at = perf_counter() # Tenta o modelo configurado e cai para nomes alternativos # quando o principal nao estiver disponivel no projeto/regiao. for model_name in self.model_names: attempts += 1 try: model = self._get_model(model_name) chat = model.start_chat(history=history or []) send_kwargs = {"tools": vertex_tools} if vertex_tools else {} response = await asyncio.to_thread(chat.send_message, message, **send_kwargs) selected_model_name = model_name break except NotFound as err: last_error = err LLMService._models.pop(model_name, None) continue if response is None: self._log_llm_event( "generate_response_failed", elapsed_ms=round((perf_counter() - started_at) * 1000, 2), attempts=attempts, tools_count=len(tools or []), history_count=len(history or []), ) if last_error: raise RuntimeError( f"Nenhum modelo Vertex disponivel. Verifique VERTEX_MODEL_NAME e acesso no projeto. Erro: {last_error}" ) from last_error raise RuntimeError("Falha ao gerar resposta no Vertex AI.") payload = self._extract_response_payload(response) self._log_llm_event( "generate_response_completed", model_name=selected_model_name, elapsed_ms=round((perf_counter() - started_at) * 1000, 2), attempts=attempts, tools_count=len(tools or []), history_count=len(history or []), tool_call=bool(payload.get("tool_call")), ) return payload async def warmup(self) -> None: """Preaquece conexao/modelo para reduzir latencia da primeira requisicao real.""" try: await self.generate_response( message="Responda apenas: ok", tools=[], ) except Exception: # Warmup e melhor esforco; falhas nao devem bloquear inicializacao. return