import asyncio import json from typing import Dict, Any, List, Optional import vertexai from google.api_core.exceptions import NotFound from vertexai.generative_models import FunctionDeclaration, GenerativeModel, Part, Tool from app.core.settings import settings from app.models.tool_model import ToolDefinition # Essa classe encapsula a integracao com o Vertex AI: # inicializacao, cache de modelos e serializacao das tools. class LLMService: _vertex_initialized = False _models: dict[str, GenerativeModel] = {} _vertex_tools_cache: dict[str, Optional[List[Tool]]] = {} def __init__(self): """Inicializa o cliente Vertex AI e define modelos de fallback.""" if not LLMService._vertex_initialized: vertexai.init( project=settings.google_project_id, location=settings.google_location, ) LLMService._vertex_initialized = True configured = settings.vertex_model_name.strip() fallback_models = ["gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.0-flash-001"] self.model_names = [configured] + [m for m in fallback_models if m != configured] async def extract_image_workflow_message( self, *, caption: str | None, attachments: List[Dict[str, Any]], ) -> str: """Analisa imagem(ns) e devolve uma mensagem textual pronta para o orquestrador.""" if not attachments: return str(caption or "").strip() prompt = self._build_image_workflow_prompt(caption=caption) contents: List[Any] = [prompt] for attachment in attachments: raw_data = attachment.get("data") mime_type = str(attachment.get("mime_type") or "image/jpeg").strip() or "image/jpeg" if not isinstance(raw_data, (bytes, bytearray)) or not raw_data: continue contents.append(Part.from_data(data=bytes(raw_data), mime_type=mime_type)) if len(contents) == 1: return "Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida." response = None last_error = None for model_name in self.model_names: try: model = self._get_model(model_name) response = await asyncio.to_thread(model.generate_content, contents) break except NotFound as err: last_error = err LLMService._models.pop(model_name, None) continue if response is None: if last_error: raise RuntimeError( f"Nenhum modelo Vertex disponivel para analise de imagem. Erro: {last_error}" ) from last_error raise RuntimeError("Falha ao analisar imagem no Vertex AI.") payload = self._extract_response_payload(response) return (payload.get("response") or "").strip() or (caption or "").strip() def _build_image_workflow_prompt(self, *, caption: str | None) -> str: normalized_caption = (caption or "").strip() or "sem legenda" return ( "Voce esta preparando uma mensagem textual curta para um orquestrador de atendimento automotivo e locacao. " "Analise a imagem enviada pelo usuario e a legenda opcional. " "Se for comprovante de pagamento de aluguel, responda com uma frase objetiva em portugues no formato: " "Registrar pagamento de aluguel: contrato <...>; placa <...>; valor <...>; data_pagamento <...>; favorecido <...>; identificador_comprovante <...>; observacoes <...>. " "Se a data de pagamento incluir hora e minuto visiveis na imagem, preserve a data e a hora no campo data_pagamento no formato DD/MM/AAAA HH:MM. " "Nao reduza para somente a data quando a hora estiver visivel. " "Se apenas a data estiver visivel, use somente a data. " "Se for multa de transito relacionada a carro alugado, responda com uma frase objetiva em portugues no formato: " "Registrar multa de aluguel: placa <...>; contrato <...>; auto_infracao <...>; orgao_emissor <...>; valor <...>; data_infracao <...>; vencimento <...>; observacoes <...>. " "Se for outro documento automotivo util, resuma em uma frase com os dados importantes. " "Se nao conseguir identificar com seguranca, responda exatamente: Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida. " "Use apenas dados observaveis e nao invente informacoes. " f"Legenda do usuario: {normalized_caption}" ) def build_vertex_tools(self, tools: List[ToolDefinition]) -> Optional[List[Tool]]: """Converte tools internas para o formato esperado pelo Vertex AI.""" # Vertex espera uma lista de Tool, com function_declarations agrupadas em um unico Tool. if not tools: return None cache_key = json.dumps( [ { "name": tool.name, "description": tool.description, "parameters": tool.parameters, } for tool in tools ], sort_keys=True, ensure_ascii=True, separators=(",", ":"), ) cached = LLMService._vertex_tools_cache.get(cache_key) if cached is not None: return cached function_declarations = [ FunctionDeclaration( name=tool.name, description=tool.description, parameters=tool.parameters, ) for tool in tools ] vertex_tools = [Tool(function_declarations=function_declarations)] LLMService._vertex_tools_cache[cache_key] = vertex_tools return vertex_tools def _get_model(self, model_name: str) -> GenerativeModel: model = LLMService._models.get(model_name) if model is None: model = GenerativeModel(model_name) LLMService._models[model_name] = model return model def _extract_response_payload(self, response) -> Dict[str, Any]: candidate = response.candidates[0] if getattr(response, "candidates", None) else None content = getattr(candidate, "content", None) parts = list(getattr(content, "parts", None) or []) tool_call = None text_parts: list[str] = [] for part in parts: function_call = getattr(part, "function_call", None) if function_call is not None and tool_call is None: tool_call = { "name": function_call.name, "arguments": dict(function_call.args), } text_value = getattr(part, "text", None) if isinstance(text_value, str) and text_value.strip(): text_parts.append(text_value) response_text = "\n".join(text_parts).strip() or None return { "response": response_text, "tool_call": tool_call, } async def generate_response( self, message: str, tools: List[ToolDefinition], history: List[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Gera resposta textual ou chamada de tool a partir da mensagem do usuario.""" vertex_tools = self.build_vertex_tools(tools) response = None last_error = None # Tenta o modelo configurado e cai para nomes alternativos # quando o principal nao estiver disponivel no projeto/regiao. for model_name in self.model_names: try: model = self._get_model(model_name) chat = model.start_chat(history=history or []) send_kwargs = {"tools": vertex_tools} if vertex_tools else {} response = await asyncio.to_thread(chat.send_message, message, **send_kwargs) break except NotFound as err: last_error = err LLMService._models.pop(model_name, None) continue if response is None: if last_error: raise RuntimeError( f"Nenhum modelo Vertex disponivel. Verifique VERTEX_MODEL_NAME e acesso no projeto. Erro: {last_error}" ) from last_error raise RuntimeError("Falha ao gerar resposta no Vertex AI.") return self._extract_response_payload(response) async def warmup(self) -> None: """Preaquece conexao/modelo para reduzir latencia da primeira requisicao real.""" try: await self.generate_response( message="Responda apenas: ok", tools=[], ) except Exception: # Warmup e melhor esforco; falhas nao devem bloquear inicializacao. return