import asyncio
import logging
import os
import tempfile
from datetime import timedelta
from typing import Any, Dict, List

import aiohttp
from fastapi import HTTPException

from app.core.settings import settings
from app.core.time_utils import utc_now
from app.db.database import SessionLocal
from app.db.mock_database import SessionMockLocal
from app.services.ai.llm_service import (
    IMAGE_ANALYSIS_BLOCKING_PREFIXES,
    LLMService,
)
from app.services.orchestration.conversation_state_repository import ConversationStateRepository
from app.services.orchestration.orquestrador_service import OrquestradorService
from app.services.orchestration.sensitive_data import mask_sensitive_payload
from app.services.orchestration.state_repository_factory import get_conversation_state_repository
from app.services.user.user_service import UserService

logger = logging.getLogger(__name__)

TELEGRAM_MESSAGE_SAFE_LIMIT = 3800
TELEGRAM_MAX_CONCURRENT_CHATS = 8
TELEGRAM_IDEMPOTENCY_BUCKET = "telegram_processed_messages"
TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100
TELEGRAM_RUNTIME_BUCKET = "telegram_runtime_state"
TELEGRAM_RUNTIME_OWNER_ID = 0
TELEGRAM_RUNTIME_CURSOR_TTL_DAYS = 30
TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS = 3
TELEGRAM_SEND_MESSAGE_RETRY_BASE_SECONDS = 1.0


def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]:
    """Split ``text`` into chunks of at most ``limit`` characters.

    Paragraphs (separated by a blank line) are packed together while they
    fit; an oversized paragraph is packed line by line; an oversized line is
    hard-cut every ``limit`` characters.

    Args:
        text: Text to split. ``None``/empty values are treated as ``""``.
        limit: Maximum length of each chunk; must be positive.

    Returns:
        The chunks in order. Empty input yields ``[""]``.

    Raises:
        ValueError: If ``limit`` is not positive and there is text to split.
    """
    normalized = str(text or "").strip()
    if not normalized:
        return [""]
    if limit <= 0:
        # A non-positive limit would make the hard-cut loop below spin forever.
        raise ValueError("limit must be a positive integer")
    if len(normalized) <= limit:
        return [normalized]

    chunks: List[str] = []
    current = ""

    def flush_current() -> None:
        """Move the pending paragraph buffer, if any, into ``chunks``."""
        nonlocal current
        if current:
            chunks.append(current)
            current = ""

    for paragraph in normalized.split("\n\n"):
        candidate = paragraph if not current else f"{current}\n\n{paragraph}"
        if len(candidate) <= limit:
            current = candidate
            continue

        flush_current()
        if len(paragraph) <= limit:
            current = paragraph
            continue

        # The paragraph alone exceeds the limit: pack it line by line.
        line_buffer = ""
        for line in paragraph.split("\n"):
            line_candidate = line if not line_buffer else f"{line_buffer}\n{line}"
            if len(line_candidate) <= limit:
                line_buffer = line_candidate
                continue
            if line_buffer:
                chunks.append(line_buffer)
                line_buffer = ""
            # A single line longer than the limit is cut at fixed offsets.
            while len(line) > limit:
                chunks.append(line[:limit])
                line = line[limit:]
            line_buffer = line
        if line_buffer:
            # The tail of this paragraph may still be merged with the next one.
            current = line_buffer

    flush_current()
    return chunks or [normalized[:limit]]


def _ensure_supported_runtime_configuration() -> None:
    """Reject an in-memory conversation state backend in production.

    Raises:
        RuntimeError: If ``settings.environment`` is ``"production"`` and
            ``settings.conversation_state_backend`` is ``"memory"``.
    """
    if settings.environment == "production" and settings.conversation_state_backend == "memory":
        raise RuntimeError(
            "Telegram satellite em producao exige conversation_state_backend diferente de memory."
        )


def _acquire_single_instance_lock(lock_name: str):
    """Take a non-blocking exclusive lock on a file in the temp directory.

    Uses ``msvcrt.locking`` when ``msvcrt`` can be imported and
    ``fcntl.flock`` otherwise.

    Args:
        lock_name: File name of the lock inside ``tempfile.gettempdir()``.

    Returns:
        The open file handle; the caller keeps it open to hold the lock.

    Raises:
        RuntimeError: If the lock cannot be acquired.
    """
    lock_path = os.path.join(tempfile.gettempdir(), lock_name)
    handle = open(lock_path, "a+")
    try:
        try:
            import msvcrt
        except ImportError:
            import fcntl

            fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        else:
            msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
    except OSError as exc:
        handle.close()
        raise RuntimeError(
            "Ja existe uma instancia do Telegram satellite em execucao neste host."
        ) from exc
    return handle


class TelegramSatelliteService:
    """Telegram satellite interface.

    Polls the Bot API for messages, runs them through ``OrquestradorService``
    and posts the answers back to the originating chat.
    """

    def __init__(
        self,
        token: str,
        state_repository: ConversationStateRepository | None = None,
    ):
        """Set up Bot API URLs, timeouts, state repository and worker registry.

        Args:
            token: Telegram bot token, embedded in the API base URLs.
            state_repository: Conversation state store; when omitted the one
                from ``get_conversation_state_repository()`` is used.
        """
        self.base_url = f"https://api.telegram.org/bot{token}"
        self.file_base_url = f"https://api.telegram.org/file/bot{token}"
        self.polling_timeout = settings.telegram_polling_timeout
        self.request_timeout = settings.telegram_request_timeout
        self.state = state_repository or get_conversation_state_repository()
        self._last_update_id = -1
        # One queue and one worker task per chat keep per-chat ordering.
        self._chat_queues: dict[int, asyncio.Queue[Dict[str, Any]]] = {}
        self._chat_workers: dict[int, asyncio.Task[None]] = {}
        self._chat_workers_lock = asyncio.Lock()
        self._chat_processing_semaphore = asyncio.Semaphore(TELEGRAM_MAX_CONCURRENT_CHATS)

    async def run(self) -> None:
        """Run the long-polling loop until cancelled or an error escapes.

        Chat workers are always shut down on exit.
        """
        logger.info("Telegram satellite iniciado com long polling.")
        await self._warmup_llm()
        offset = None
        timeout = aiohttp.ClientTimeout(total=self.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                offset = await self._initialize_offset(session=session)
                while True:
                    updates = await self._get_updates(session=session, offset=offset)
                    for update in updates:
                        update_id = update.get("update_id")
                        if not isinstance(update_id, int):
                            continue
                        if update_id <= self._last_update_id:
                            # Already seen in this process; skip the duplicate.
                            continue
                        self._last_update_id = update_id
                        offset = update_id + 1
                        await self._schedule_update_processing(session=session, update=update)
            finally:
                await self._shutdown_chat_workers()

    def _extract_chat_id(self, update: Dict[str, Any]) -> int | None:
        """Return ``update["message"]["chat"]["id"]`` when it is an int, else ``None``."""
        message = update.get("message", {})
        chat = message.get("chat", {})
        chat_id = chat.get("id")
        return chat_id if isinstance(chat_id, int) else None

    def _build_update_idempotency_key(self, update: Dict[str, Any]) -> str | None:
        """Build the idempotency key for an update.

        Prefers the chat id plus message id; falls back to the update id.
        Returns ``None`` when neither is available.
        """
        chat_id = self._extract_chat_id(update)
        message = update.get("message", {})
        message_id = message.get("message_id")
        if isinstance(chat_id, int) and isinstance(message_id, int):
            return f"telegram:message:{chat_id}:{message_id}"
        update_id = update.get("update_id")
        if isinstance(update_id, int):
            return f"telegram:update:{update_id}"
        return None

    def _idempotency_owner_id(self, update: Dict[str, Any]) -> int | None:
        """Return the state-entry owner for an update: chat id, else update id."""
        chat_id = self._extract_chat_id(update)
        if isinstance(chat_id, int):
            return chat_id
        update_id = update.get("update_id")
        return update_id if isinstance(update_id, int) else None

    def _get_processed_update(self, update: Dict[str, Any]) -> dict | None:
        """Return the cached result for an already-processed update, if any."""
        owner_id = self._idempotency_owner_id(update)
        idempotency_key = self._build_update_idempotency_key(update)
        if owner_id is None or not idempotency_key:
            return None
        # NOTE(review): expire=True presumably drops expired entries - confirm
        # against the ConversationStateRepository implementation.
        entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True)
        if not isinstance(entry, dict):
            return None
        items = entry.get("items")
        if not isinstance(items, dict):
            return None
        payload = items.get(idempotency_key)
        return payload if isinstance(payload, dict) else None

    def _store_processed_update(self, update: Dict[str, Any], answer: str) -> None:
        """Record the answer produced for an update in the idempotency bucket.

        Keeps at most ``TELEGRAM_IDEMPOTENCY_CACHE_LIMIT`` items per owner,
        discarding the ones with the oldest ``processed_at``.
        """
        owner_id = self._idempotency_owner_id(update)
        idempotency_key = self._build_update_idempotency_key(update)
        if owner_id is None or not idempotency_key:
            return

        now = utc_now().replace(microsecond=0)
        expires_at = now + timedelta(minutes=settings.conversation_state_ttl_minutes)
        entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True) or {}
        items = dict(entry.get("items") or {})
        items[idempotency_key] = {
            "answer": str(answer or ""),
            "processed_at": now,
        }
        if len(items) > TELEGRAM_IDEMPOTENCY_CACHE_LIMIT:
            # Newest first, then truncate to the cache limit.
            ordered = sorted(
                items.items(),
                key=lambda item: item[1].get("processed_at") or now,
                reverse=True,
            )
            items = dict(ordered[:TELEGRAM_IDEMPOTENCY_CACHE_LIMIT])

        self.state.set_entry(
            TELEGRAM_IDEMPOTENCY_BUCKET,
            owner_id,
            {
                "items": items,
                "expires_at": expires_at,
            },
        )

    def _get_runtime_state(self) -> dict:
        """Return the persisted runtime entry, or ``{}`` when absent or not a dict."""
        entry = self.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
        return entry if isinstance(entry, dict) else {}

    def _persist_last_processed_update_id(self, update_id: int) -> None:
        """Persist ``update_id`` as the polling cursor if it advances the stored one.

        Negative ids are ignored. The in-memory cursor is raised to the
        highest known value either way.
        """
        if update_id < 0:
            return

        entry = self._get_runtime_state()
        current_last_update_id = entry.get("last_update_id")
        if isinstance(current_last_update_id, int) and current_last_update_id >= update_id:
            # Never move the persisted cursor backwards.
            self._last_update_id = max(self._last_update_id, current_last_update_id)
            return

        now = utc_now().replace(microsecond=0)
        expires_at = now + timedelta(days=TELEGRAM_RUNTIME_CURSOR_TTL_DAYS)
        self.state.set_entry(
            TELEGRAM_RUNTIME_BUCKET,
            TELEGRAM_RUNTIME_OWNER_ID,
            {
                "last_update_id": update_id,
                "updated_at": now,
                "expires_at": expires_at,
            },
        )
        self._last_update_id = max(self._last_update_id, update_id)

    async def _schedule_update_processing(
        self,
        session: aiohttp.ClientSession,
        update: Dict[str, Any],
    ) -> None:
        """Queue an update on its chat's worker, starting the worker if needed.

        Updates without a chat id are handled inline under the concurrency
        semaphore.
        """
        chat_id = self._extract_chat_id(update)
        if chat_id is None:
            async with self._chat_processing_semaphore:
                await self._handle_update(session=session, update=update)
            return

        async with self._chat_workers_lock:
            queue = self._chat_queues.get(chat_id)
            if queue is None:
                queue = asyncio.Queue()
                self._chat_queues[chat_id] = queue
            queue.put_nowait(update)

            worker = self._chat_workers.get(chat_id)
            if worker is None or worker.done():
                self._chat_workers[chat_id] = asyncio.create_task(
                    self._run_chat_worker(
                        chat_id=chat_id,
                        session=session,
                        queue=queue,
                    )
                )

    async def _run_chat_worker(
        self,
        *,
        chat_id: int,
        session: aiohttp.ClientSession,
        queue: asyncio.Queue[Dict[str, Any]],
    ) -> None:
        """Drain one chat's queue sequentially, then unregister and exit.

        A failure while handling one update is logged and the worker moves on
        to the next queued update, so one bad message cannot leave later
        messages of the same chat unprocessed.
        """
        current_task = asyncio.current_task()
        try:
            while True:
                update = await queue.get()
                try:
                    async with self._chat_processing_semaphore:
                        await self._handle_update(session=session, update=update)
                except Exception:
                    # CancelledError is a BaseException and still propagates.
                    logger.exception("Falha inesperada no worker do chat %s.", chat_id)
                finally:
                    queue.task_done()

                async with self._chat_workers_lock:
                    if queue.empty():
                        # Unregister only what this task still owns.
                        if self._chat_workers.get(chat_id) is current_task:
                            self._chat_workers.pop(chat_id, None)
                        if self._chat_queues.get(chat_id) is queue:
                            self._chat_queues.pop(chat_id, None)
                        return
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("Falha inesperada no worker do chat %s.", chat_id)
        finally:
            async with self._chat_workers_lock:
                if self._chat_workers.get(chat_id) is current_task:
                    self._chat_workers.pop(chat_id, None)
                if self._chat_queues.get(chat_id) is queue and queue.empty():
                    self._chat_queues.pop(chat_id, None)

    async def _shutdown_chat_workers(self) -> None:
        """Cancel every chat worker and wait for all of them to finish."""
        async with self._chat_workers_lock:
            workers = list(self._chat_workers.values())
            self._chat_workers = {}
            self._chat_queues = {}

        for worker in workers:
            worker.cancel()
        if workers:
            # Awaited outside the lock: each worker's cleanup acquires it.
            await asyncio.gather(*workers, return_exceptions=True)

    async def _warmup_llm(self) -> None:
        """Call ``LLMService().warmup()`` at startup; failures are logged, not raised."""
        try:
            await LLMService().warmup()
            logger.info("Warmup de LLM concluido no Telegram satellite.")
        except Exception:
            logger.exception("Falha no warmup de LLM do Telegram satellite.")

    async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None:
        """Compute the initial ``getUpdates`` offset.

        Resumes from the persisted cursor when one exists. Otherwise fetches
        the pending updates once, persists the highest ``update_id`` as the
        cursor and starts after it, so those earlier updates are skipped.

        Returns:
            The offset to poll from, or ``None`` when none can be determined.
        """
        runtime_state = self._get_runtime_state()
        last_update_id = runtime_state.get("last_update_id")
        if isinstance(last_update_id, int) and last_update_id >= 0:
            self._last_update_id = last_update_id
            logger.info("Retomando polling do Telegram a partir do update_id persistido %s.", last_update_id)
            return last_update_id + 1

        payload: Dict[str, Any] = {
            "timeout": 0,
            "limit": 100,
            "allowed_updates": ["message"],
        }
        async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
            data = await response.json()
        if not data.get("ok"):
            logger.warning("Falha ao inicializar offset no Telegram: %s", data)
            return None

        updates = data.get("result", [])
        if not updates:
            return None

        # default=-1 covers a batch in which no item has an integer update_id;
        # without it max() would raise ValueError on the empty sequence.
        last_id = max(
            (
                item["update_id"]
                for item in updates
                if isinstance(item, dict) and isinstance(item.get("update_id"), int)
            ),
            default=-1,
        )
        if last_id < 0:
            return None

        self._persist_last_processed_update_id(last_id)
        logger.info(
            "Bootstrap inicial do Telegram sem cursor persistido: %s update(s) anteriores ignorados.",
            len(updates),
        )
        return last_id + 1

    async def _get_updates(
        self,
        session: aiohttp.ClientSession,
        offset: int | None,
    ) -> List[Dict[str, Any]]:
        """Long-poll ``getUpdates`` for new messages starting at ``offset``.

        Returns:
            The ``result`` list, or ``[]`` when the API reports a failure.
        """
        payload: Dict[str, Any] = {
            "timeout": self.polling_timeout,
            "limit": 100,
            "allowed_updates": ["message"],
        }
        if offset is not None:
            payload["offset"] = offset

        async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
            data = await response.json()
        if not data.get("ok"):
            logger.warning("Falha em getUpdates: %s", data)
            return []
        return data.get("result", [])

    async def _handle_update(
        self,
        session: aiohttp.ClientSession,
        update: Dict[str, Any],
    ) -> None:
        """Process one update and send the answer to its chat.

        An update that was already processed is not run again; its cached
        answer, when non-empty, is re-sent. Processing errors are turned into
        a user-facing message instead of being raised.
        """
        message = update.get("message", {})
        text = message.get("text") or message.get("caption")
        chat = message.get("chat", {})
        chat_id = chat.get("id")
        sender = message.get("from", {})

        if not chat_id:
            return

        cached_update = self._get_processed_update(update)
        if cached_update:
            cached_answer = str(cached_update.get("answer") or "").strip()
            if cached_answer:
                logger.info(
                    "Reutilizando resposta em reentrega do Telegram. chat_id=%s update_key=%s",
                    chat_id,
                    self._build_update_idempotency_key(update),
                )
                await self._deliver_message(session=session, chat_id=chat_id, text=cached_answer)
            # Redelivery of a processed update: never re-run the orchestrator.
            return

        image_attachments = await self._extract_image_attachments(session=session, message=message)
        if not text and not image_attachments:
            return

        try:
            answer = await self._process_message(
                text=text or "",
                sender=sender,
                chat_id=chat_id,
                image_attachments=image_attachments,
            )
        except HTTPException as exc:
            logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", mask_sensitive_payload(exc.detail))
            answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada."
        except Exception:
            logger.exception("Erro ao processar mensagem do Telegram.")
            answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."

        # Record the result before sending, so a redelivery reuses the answer.
        self._store_processed_update(update=update, answer=answer)
        update_id = update.get("update_id")
        if isinstance(update_id, int):
            self._persist_last_processed_update_id(update_id)
        await self._deliver_message(session=session, chat_id=chat_id, text=answer)

    async def _deliver_message(
        self,
        *,
        session: aiohttp.ClientSession,
        chat_id: int,
        text: str,
    ) -> None:
        """Send ``text`` to the chat, logging instead of raising on any failure."""
        try:
            await self._send_message(session=session, chat_id=chat_id, text=text)
        except Exception:
            logger.exception("Falha inesperada ao entregar mensagem ao Telegram. chat_id=%s", chat_id)

    async def _send_message(
        self,
        session: aiohttp.ClientSession,
        chat_id: int,
        text: str,
    ) -> None:
        """Send ``text`` to the chat via ``sendMessage``, split into safe chunks.

        Each chunk is retried on transport errors up to
        ``TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS`` times with a linearly growing
        delay. An API-level failure (``ok`` false) is logged and not retried.
        """
        for chunk_index, chunk in enumerate(_split_telegram_text(text), start=1):
            payload = {
                "chat_id": chat_id,
                "text": chunk,
            }
            for attempt in range(1, TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS + 1):
                try:
                    async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
                        data = await response.json()
                    if not data.get("ok"):
                        logger.warning("Falha em sendMessage: %s", data)
                    break
                except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc:
                    if attempt >= TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS:
                        logger.warning(
                            "Falha de transporte ao enviar mensagem ao Telegram apos %s tentativa(s). chat_id=%s chunk=%s erro=%s",
                            TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS,
                            chat_id,
                            chunk_index,
                            exc,
                        )
                        break
                    delay_seconds = TELEGRAM_SEND_MESSAGE_RETRY_BASE_SECONDS * attempt
                    logger.warning(
                        "Falha temporaria ao enviar mensagem ao Telegram. chat_id=%s chunk=%s tentativa=%s/%s retry_em=%.1fs erro=%s",
                        chat_id,
                        chunk_index,
                        attempt,
                        TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS,
                        delay_seconds,
                        exc,
                    )
                    await asyncio.sleep(delay_seconds)

    # Process a Telegram message and inject the text extracted from images, when present.
async def _process_message( self, text: str, sender: Dict[str, Any], chat_id: int, image_attachments: List[Dict[str, Any]] | None = None, ) -> str: """Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta.""" message_text = text if image_attachments: image_message = await self._build_orchestration_message_from_image( caption=text, image_attachments=image_attachments, ) if self._is_image_analysis_failure_message(image_message): return image_message message_text = image_message return await asyncio.to_thread( self._run_blocking_orchestration_turn, message_text=message_text, sender=sender, chat_id=chat_id, ) def _run_blocking_orchestration_turn( self, *, message_text: str, sender: Dict[str, Any], chat_id: int, ) -> str: """ Executa o turno do orquestrador fora do loop async principal. Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes. """ tools_db = SessionLocal() mock_db = SessionMockLocal() try: user_service = UserService(mock_db) external_id = str(sender.get("id") or chat_id) first_name = (sender.get("first_name") or "").strip() last_name = (sender.get("last_name") or "").strip() display_name = f"{first_name} {last_name}".strip() or None username = sender.get("username") user = user_service.get_or_create( channel="telegram", external_id=external_id, name=display_name, username=username, ) service = OrquestradorService( tools_db, state_repository=self.state, ) return asyncio.run(service.handle_message(message=message_text, user_id=user.id)) finally: tools_db.close() mock_db.close() # Filtra documentos do Telegram para aceitar apenas imagens. def _is_supported_image_document(self, document: Dict[str, Any]) -> bool: mime_type = str((document or {}).get("mime_type") or "").strip().lower() return mime_type.startswith("image/") # Reconhece a resposta padrao quando a leitura da imagem falha. 
def _is_image_analysis_failure_message(self, text: str) -> bool: normalized = str(text or "").strip().lower() return any(normalized.startswith(prefix) for prefix in IMAGE_ANALYSIS_BLOCKING_PREFIXES) # Extrai a melhor foto e documentos de imagem anexados na mensagem. async def _extract_image_attachments( self, session: aiohttp.ClientSession, message: Dict[str, Any], ) -> List[Dict[str, Any]]: attachments: List[Dict[str, Any]] = [] photos = message.get("photo") or [] if isinstance(photos, list) and photos: best_photo = max( (item for item in photos if isinstance(item, dict) and item.get("file_id")), key=lambda item: int(item.get("file_size") or 0), default=None, ) if best_photo is not None: attachment = await self._download_image_attachment( session=session, file_id=str(best_photo.get("file_id")), mime_type="image/jpeg", file_name="telegram_photo.jpg", ) if attachment: attachments.append(attachment) document = message.get("document") or {} if isinstance(document, dict) and document.get("file_id") and self._is_supported_image_document(document): attachment = await self._download_image_attachment( session=session, file_id=str(document.get("file_id")), mime_type=str(document.get("mime_type") or "image/jpeg"), file_name=str(document.get("file_name") or "telegram_image"), ) if attachment: attachments.append(attachment) return attachments # Baixa um anexo de imagem do Telegram e devolve seu payload bruto. 
async def _download_image_attachment( self, session: aiohttp.ClientSession, file_id: str, mime_type: str, file_name: str, ) -> Dict[str, Any] | None: async with session.post(f"{self.base_url}/getFile", json={"file_id": file_id}) as response: data = await response.json() if not data.get("ok"): logger.warning("Falha em getFile para imagem do Telegram: %s", data) return None file_path = str((data.get("result") or {}).get("file_path") or "").strip() if not file_path: return None async with session.get(f"{self.file_base_url}/{file_path}") as file_response: if file_response.status >= 400: logger.warning("Falha ao baixar arquivo do Telegram: status=%s path=%s", file_response.status, file_path) return None payload = await file_response.read() if not payload: return None return { "mime_type": mime_type, "file_name": file_name, "data": payload, } # Combina legenda e texto extraido da imagem em uma mensagem unica para o fluxo. async def _build_orchestration_message_from_image( self, *, caption: str, image_attachments: List[Dict[str, Any]], ) -> str: extracted_message = await LLMService().extract_image_workflow_message( caption=caption, attachments=image_attachments, ) caption_text = str(caption or "").strip() extracted_text = str(extracted_message or "").strip() if self._is_image_analysis_failure_message(extracted_text): return extracted_text if caption_text and extracted_text: return ( "[imagem recebida no telegram]\n" f"Legenda do usuario: {caption_text}\n" f"Dados extraidos da imagem: {extracted_text}" ) if extracted_text: return f"[imagem recebida no telegram]\nDados extraidos da imagem: {extracted_text}" if caption_text: return caption_text return "Recebi uma imagem, mas preciso que voce descreva o documento ou envie uma foto mais nitida." 
async def main() -> None:
    """Start the Telegram satellite and keep it running.

    Requires the bot token, validates the runtime configuration and takes the
    single-instance lock before configuring logging and starting the service.
    The lock handle is closed when the service stops.

    Raises:
        RuntimeError: If ``settings.telegram_bot_token`` is empty.
    """
    bot_token = settings.telegram_bot_token
    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.")

    _ensure_supported_runtime_configuration()
    instance_lock = _acquire_single_instance_lock("orquestrador_telegram_satellite.lock")
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
    satellite = TelegramSatelliteService(token=bot_token)

    # Leaving the block closes the handle, which releases the file lock.
    with instance_lock:
        await satellite.run()


if __name__ == "__main__":
    asyncio.run(main())