You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/integrations/telegram_satellite_service.py

855 lines
33 KiB
Python

import asyncio
import logging
import os
import tempfile
from datetime import timedelta
from time import perf_counter
from typing import Any, Dict, List
import aiohttp
from fastapi import HTTPException
from app.core.settings import settings
from app.core.time_utils import utc_now
from app.db.database import SessionLocal
from app.db.mock_database import SessionMockLocal
from app.services.ai.llm_service import (
IMAGE_ANALYSIS_BLOCKING_PREFIXES,
LLMService,
)
from app.services.orchestration.conversation_state_repository import ConversationStateRepository
from app.services.orchestration.orquestrador_service import OrquestradorService
from app.services.orchestration.sensitive_data import mask_sensitive_payload
from app.services.orchestration.state_repository_factory import get_conversation_state_repository
from app.services.user.user_service import UserService
logger = logging.getLogger(__name__)
TELEGRAM_MESSAGE_SAFE_LIMIT = 3800
TELEGRAM_MAX_CONCURRENT_CHATS = 8
TELEGRAM_IDEMPOTENCY_BUCKET = "telegram_processed_messages"
TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100
TELEGRAM_RUNTIME_BUCKET = "telegram_runtime_state"
TELEGRAM_RUNTIME_OWNER_ID = 0
TELEGRAM_RUNTIME_CURSOR_TTL_DAYS = 30
TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS = 3
TELEGRAM_SEND_MESSAGE_RETRY_BASE_SECONDS = 1.0
def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]:
normalized = str(text or "").strip()
if not normalized:
return [""]
if len(normalized) <= limit:
return [normalized]
chunks: List[str] = []
current = ""
def flush_current() -> None:
nonlocal current
if current:
chunks.append(current)
current = ""
paragraphs = normalized.split("\n\n")
for paragraph in paragraphs:
candidate = paragraph if not current else f"{current}\n\n{paragraph}"
if len(candidate) <= limit:
current = candidate
continue
flush_current()
if len(paragraph) <= limit:
current = paragraph
continue
line_buffer = ""
for line in paragraph.split("\n"):
line_candidate = line if not line_buffer else f"{line_buffer}\n{line}"
if len(line_candidate) <= limit:
line_buffer = line_candidate
continue
if line_buffer:
chunks.append(line_buffer)
line_buffer = ""
while len(line) > limit:
chunks.append(line[:limit])
line = line[limit:]
line_buffer = line
if line_buffer:
current = line_buffer
flush_current()
return chunks or [normalized[:limit]]
def _ensure_supported_runtime_configuration() -> None:
"""
Em producao, o satelite nao deve operar com estado conversacional apenas em memoria,
porque isso quebra continuidade entre reinicios e instancias.
"""
if settings.environment == "production" and settings.conversation_state_backend == "memory":
raise RuntimeError(
"Telegram satellite em producao exige conversation_state_backend diferente de memory."
)
def _acquire_single_instance_lock(lock_name: str):
"""
Garante apenas uma instancia local do satelite por lock de arquivo.
Retorna o handle do arquivo para manter o lock vivo.
"""
lock_path = os.path.join(tempfile.gettempdir(), lock_name)
handle = open(lock_path, "a+")
try:
import msvcrt
try:
msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
except OSError:
handle.close()
raise RuntimeError(
"Ja existe uma instancia do Telegram satellite em execucao neste host."
)
return handle
except ImportError:
import fcntl
try:
fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
handle.close()
raise RuntimeError(
"Ja existe uma instancia do Telegram satellite em execucao neste host."
)
return handle
class TelegramSatelliteService:
"""
Interface satelite para Telegram.
Processa mensagens direto no OrquestradorService e publica respostas no chat.
"""
def __init__(
self,
token: str,
state_repository: ConversationStateRepository | None = None,
):
"""Configura cliente Telegram com URL base e timeouts padrao."""
self.base_url = f"https://api.telegram.org/bot{token}"
self.file_base_url = f"https://api.telegram.org/file/bot{token}"
self.polling_timeout = settings.telegram_polling_timeout
self.request_timeout = settings.telegram_request_timeout
self.state = state_repository or get_conversation_state_repository()
self._last_update_id = -1
self._chat_queues: dict[int, asyncio.Queue[Dict[str, Any]]] = {}
self._chat_workers: dict[int, asyncio.Task[None]] = {}
self._chat_workers_lock = asyncio.Lock()
self._chat_processing_semaphore = asyncio.Semaphore(TELEGRAM_MAX_CONCURRENT_CHATS)
def _log_telegram_event(self, event: str, **payload) -> None:
logger.info("telegram_event=%s payload=%s", event, mask_sensitive_payload(payload))
def _elapsed_ms(self, started_at: float) -> float:
return round((perf_counter() - started_at) * 1000, 2)
async def run(self) -> None:
"""Inicia loop de long polling para consumir atualizacoes do bot."""
logger.info("Telegram satellite iniciado com long polling.")
await self._warmup_llm()
offset = None
timeout = aiohttp.ClientTimeout(total=self.request_timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
offset = await self._initialize_offset(session=session)
while True:
updates = await self._get_updates(session=session, offset=offset)
for update in updates:
update_id = update.get("update_id")
if not isinstance(update_id, int):
continue
if update_id <= self._last_update_id:
continue
self._last_update_id = update_id
offset = update_id + 1
await self._schedule_update_processing(session=session, update=update)
finally:
await self._shutdown_chat_workers()
def _extract_chat_id(self, update: Dict[str, Any]) -> int | None:
message = update.get("message", {})
chat = message.get("chat", {})
chat_id = chat.get("id")
return chat_id if isinstance(chat_id, int) else None
def _build_update_idempotency_key(self, update: Dict[str, Any]) -> str | None:
chat_id = self._extract_chat_id(update)
message = update.get("message", {})
message_id = message.get("message_id")
if isinstance(chat_id, int) and isinstance(message_id, int):
return f"telegram:message:{chat_id}:{message_id}"
update_id = update.get("update_id")
if isinstance(update_id, int):
return f"telegram:update:{update_id}"
return None
def _idempotency_owner_id(self, update: Dict[str, Any]) -> int | None:
chat_id = self._extract_chat_id(update)
if isinstance(chat_id, int):
return chat_id
update_id = update.get("update_id")
return update_id if isinstance(update_id, int) else None
def _get_processed_update(self, update: Dict[str, Any]) -> dict | None:
owner_id = self._idempotency_owner_id(update)
idempotency_key = self._build_update_idempotency_key(update)
if owner_id is None or not idempotency_key:
return None
entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True)
if not isinstance(entry, dict):
return None
items = entry.get("items")
if not isinstance(items, dict):
return None
payload = items.get(idempotency_key)
return payload if isinstance(payload, dict) else None
def _store_processed_update(self, update: Dict[str, Any], answer: str) -> None:
owner_id = self._idempotency_owner_id(update)
idempotency_key = self._build_update_idempotency_key(update)
if owner_id is None or not idempotency_key:
return
now = utc_now().replace(microsecond=0)
expires_at = now + timedelta(minutes=settings.conversation_state_ttl_minutes)
entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True) or {}
items = dict(entry.get("items") or {})
items[idempotency_key] = {
"answer": str(answer or ""),
"processed_at": now,
}
if len(items) > TELEGRAM_IDEMPOTENCY_CACHE_LIMIT:
ordered = sorted(
items.items(),
key=lambda item: item[1].get("processed_at") or now,
reverse=True,
)
items = dict(ordered[:TELEGRAM_IDEMPOTENCY_CACHE_LIMIT])
self.state.set_entry(
TELEGRAM_IDEMPOTENCY_BUCKET,
owner_id,
{
"items": items,
"expires_at": expires_at,
},
)
def _get_runtime_state(self) -> dict:
entry = self.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
return entry if isinstance(entry, dict) else {}
def _persist_last_processed_update_id(self, update_id: int) -> None:
if update_id < 0:
return
entry = self._get_runtime_state()
current_last_update_id = entry.get("last_update_id")
if isinstance(current_last_update_id, int) and current_last_update_id >= update_id:
self._last_update_id = max(self._last_update_id, current_last_update_id)
return
now = utc_now().replace(microsecond=0)
expires_at = now + timedelta(days=TELEGRAM_RUNTIME_CURSOR_TTL_DAYS)
self.state.set_entry(
TELEGRAM_RUNTIME_BUCKET,
TELEGRAM_RUNTIME_OWNER_ID,
{
"last_update_id": update_id,
"updated_at": now,
"expires_at": expires_at,
},
)
self._last_update_id = max(self._last_update_id, update_id)
async def _schedule_update_processing(
self,
session: aiohttp.ClientSession,
update: Dict[str, Any],
) -> None:
chat_id = self._extract_chat_id(update)
if chat_id is None:
async with self._chat_processing_semaphore:
await self._handle_update(session=session, update=update)
return
async with self._chat_workers_lock:
queue = self._chat_queues.get(chat_id)
if queue is None:
queue = asyncio.Queue()
self._chat_queues[chat_id] = queue
update["_orq_enqueued_at_perf"] = perf_counter()
queue.put_nowait(update)
queue_size = queue.qsize()
worker = self._chat_workers.get(chat_id)
worker_active = worker is not None and not worker.done()
self._log_telegram_event(
"chat_update_enqueued",
chat_id=chat_id,
update_id=update.get("update_id"),
queue_size=queue_size,
worker_active=worker_active,
)
if worker is None or worker.done():
self._chat_workers[chat_id] = asyncio.create_task(
self._run_chat_worker(
chat_id=chat_id,
session=session,
queue=queue,
)
)
async def _run_chat_worker(
self,
*,
chat_id: int,
session: aiohttp.ClientSession,
queue: asyncio.Queue[Dict[str, Any]],
) -> None:
current_task = asyncio.current_task()
try:
while True:
update = await queue.get()
queued_at_perf = update.get("_orq_enqueued_at_perf")
queue_wait_ms = (
round((perf_counter() - queued_at_perf) * 1000, 2)
if isinstance(queued_at_perf, (int, float))
else None
)
self._log_telegram_event(
"chat_update_dequeued",
chat_id=chat_id,
update_id=update.get("update_id"),
queue_wait_ms=queue_wait_ms,
queue_size=queue.qsize(),
)
try:
async with self._chat_processing_semaphore:
await self._handle_update(session=session, update=update)
finally:
queue.task_done()
async with self._chat_workers_lock:
if queue.empty():
if self._chat_workers.get(chat_id) is current_task:
self._chat_workers.pop(chat_id, None)
if self._chat_queues.get(chat_id) is queue:
self._chat_queues.pop(chat_id, None)
return
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Falha inesperada no worker do chat %s.", chat_id)
finally:
async with self._chat_workers_lock:
if self._chat_workers.get(chat_id) is current_task:
self._chat_workers.pop(chat_id, None)
if self._chat_queues.get(chat_id) is queue and queue.empty():
self._chat_queues.pop(chat_id, None)
async def _shutdown_chat_workers(self) -> None:
async with self._chat_workers_lock:
workers = list(self._chat_workers.values())
self._chat_workers = {}
self._chat_queues = {}
for worker in workers:
worker.cancel()
if workers:
await asyncio.gather(*workers, return_exceptions=True)
async def _warmup_llm(self) -> None:
"""Preaquece o LLM no startup do satelite para reduzir latencia do primeiro usuario."""
try:
await LLMService().warmup()
logger.info("Warmup de LLM concluido no Telegram satellite.")
except Exception:
logger.exception("Falha no warmup de LLM do Telegram satellite.")
async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None:
"""
Retoma o polling a partir do ultimo update persistido.
Sem cursor salvo, faz um bootstrap conservador e registra o ponto inicial.
"""
runtime_state = self._get_runtime_state()
last_update_id = runtime_state.get("last_update_id")
if isinstance(last_update_id, int) and last_update_id >= 0:
self._last_update_id = last_update_id
logger.info("Retomando polling do Telegram a partir do update_id persistido %s.", last_update_id)
return last_update_id + 1
payload: Dict[str, Any] = {
"timeout": 0,
"limit": 100,
"allowed_updates": ["message"],
}
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
data = await response.json()
if not data.get("ok"):
logger.warning("Falha ao inicializar offset no Telegram: %s", data)
return None
updates = data.get("result", [])
if not updates:
return None
last_id = max(u.get("update_id", -1) for u in updates if isinstance(u.get("update_id"), int))
if last_id < 0:
return None
self._persist_last_processed_update_id(last_id)
logger.info(
"Bootstrap inicial do Telegram sem cursor persistido: %s update(s) anteriores ignorados.",
len(updates),
)
return last_id + 1
async def _get_updates(
self,
session: aiohttp.ClientSession,
offset: int | None,
) -> List[Dict[str, Any]]:
"""Busca novas mensagens no Telegram a partir do offset informado."""
payload: Dict[str, Any] = {
"timeout": self.polling_timeout,
"limit": 100,
"allowed_updates": ["message"],
}
if offset is not None:
payload["offset"] = offset
started_at = perf_counter()
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
data = await response.json()
if not data.get("ok"):
self._log_telegram_event(
"get_updates_failed",
offset=offset,
elapsed_ms=self._elapsed_ms(started_at),
response=data,
)
logger.warning("Falha em getUpdates: %s", data)
return []
updates = data.get("result", [])
if updates:
self._log_telegram_event(
"get_updates_completed",
offset=offset,
updates_count=len(updates),
elapsed_ms=self._elapsed_ms(started_at),
)
return updates
async def _handle_update(
self,
session: aiohttp.ClientSession,
update: Dict[str, Any],
) -> None:
"""Processa uma atualizacao recebida e envia resposta ao chat."""
started_at = perf_counter()
message = update.get("message", {})
text = message.get("text") or message.get("caption")
chat = message.get("chat", {})
chat_id = chat.get("id")
sender = message.get("from", {})
update_id = update.get("update_id")
if not chat_id:
return
cached_update = self._get_processed_update(update)
if cached_update:
cached_answer = str(cached_update.get("answer") or "").strip()
if cached_answer:
logger.info(
"Reutilizando resposta em reentrega do Telegram. chat_id=%s update_key=%s",
chat_id,
self._build_update_idempotency_key(update),
)
await self._deliver_message(session=session, chat_id=chat_id, text=cached_answer)
self._log_telegram_event(
"update_completed",
update_id=update_id,
chat_id=chat_id,
cached_hit=True,
elapsed_ms=self._elapsed_ms(started_at),
input_chars=len(str(text or "")),
answer_chars=len(cached_answer),
image_count=0,
)
return
image_attachments = await self._extract_image_attachments(session=session, message=message)
image_count = len(image_attachments)
if not text and not image_attachments:
self._log_telegram_event(
"update_ignored",
update_id=update_id,
chat_id=chat_id,
reason="empty_text_and_no_image",
elapsed_ms=self._elapsed_ms(started_at),
)
return
try:
answer = await self._process_message(
text=text or "",
sender=sender,
chat_id=chat_id,
image_attachments=image_attachments,
)
except HTTPException as exc:
logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", mask_sensitive_payload(exc.detail))
answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada."
except Exception:
logger.exception("Erro ao processar mensagem do Telegram.")
answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
self._store_processed_update(update=update, answer=answer)
if isinstance(update_id, int):
self._persist_last_processed_update_id(update_id)
await self._deliver_message(session=session, chat_id=chat_id, text=answer)
self._log_telegram_event(
"update_completed",
update_id=update_id,
chat_id=chat_id,
cached_hit=False,
elapsed_ms=self._elapsed_ms(started_at),
input_chars=len(str(text or "")),
answer_chars=len(str(answer or "")),
image_count=image_count,
)
async def _deliver_message(
self,
*,
session: aiohttp.ClientSession,
chat_id: int,
text: str,
) -> None:
"""Entrega a resposta ao Telegram sem deixar falhas de transporte derrubarem o worker."""
try:
await self._send_message(session=session, chat_id=chat_id, text=text)
except Exception:
logger.exception("Falha inesperada ao entregar mensagem ao Telegram. chat_id=%s", chat_id)
async def _send_message(
self,
session: aiohttp.ClientSession,
chat_id: int,
text: str,
) -> None:
"""Envia mensagem de texto para o chat informado no Telegram."""
chunks = _split_telegram_text(text)
started_at = perf_counter()
total_attempts = 0
successful_chunks = 0
for chunk_index, chunk in enumerate(chunks, start=1):
payload = {
"chat_id": chat_id,
"text": chunk,
}
for attempt in range(1, TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS + 1):
total_attempts += 1
try:
async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
data = await response.json()
if not data.get("ok"):
logger.warning("Falha em sendMessage: %s", data)
else:
successful_chunks += 1
break
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc:
if attempt >= TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS:
logger.warning(
"Falha de transporte ao enviar mensagem ao Telegram apos %s tentativa(s). chat_id=%s chunk=%s erro=%s",
TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS,
chat_id,
chunk_index,
exc,
)
break
delay_seconds = TELEGRAM_SEND_MESSAGE_RETRY_BASE_SECONDS * attempt
logger.warning(
"Falha temporaria ao enviar mensagem ao Telegram. chat_id=%s chunk=%s tentativa=%s/%s retry_em=%.1fs erro=%s",
chat_id,
chunk_index,
attempt,
TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS,
delay_seconds,
exc,
)
await asyncio.sleep(delay_seconds)
self._log_telegram_event(
"send_message_completed",
chat_id=chat_id,
chunk_count=len(chunks),
successful_chunks=successful_chunks,
total_attempts=total_attempts,
elapsed_ms=self._elapsed_ms(started_at),
text_chars=len(str(text or "")),
)
# Processa uma mensagem do Telegram e injeta o texto extraido de imagens quando houver.
async def _process_message(
self,
text: str,
sender: Dict[str, Any],
chat_id: int,
image_attachments: List[Dict[str, Any]] | None = None,
) -> str:
"""Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta."""
started_at = perf_counter()
message_text = text
image_processing_ms = None
if image_attachments:
image_started_at = perf_counter()
image_message = await self._build_orchestration_message_from_image(
caption=text,
image_attachments=image_attachments,
)
image_processing_ms = self._elapsed_ms(image_started_at)
if self._is_image_analysis_failure_message(image_message):
self._log_telegram_event(
"process_message_completed",
chat_id=chat_id,
elapsed_ms=self._elapsed_ms(started_at),
image_processing_ms=image_processing_ms,
orchestration_offloaded=False,
used_image_attachments=True,
input_chars=len(str(text or "")),
response_chars=len(image_message),
)
return image_message
message_text = image_message
orchestration_started_at = perf_counter()
answer = await asyncio.to_thread(
self._run_blocking_orchestration_turn,
message_text=message_text,
sender=sender,
chat_id=chat_id,
)
self._log_telegram_event(
"process_message_completed",
chat_id=chat_id,
elapsed_ms=self._elapsed_ms(started_at),
image_processing_ms=image_processing_ms,
orchestration_ms=self._elapsed_ms(orchestration_started_at),
orchestration_offloaded=True,
used_image_attachments=bool(image_attachments),
input_chars=len(message_text),
response_chars=len(str(answer or "")),
)
return answer
def _run_blocking_orchestration_turn(
self,
*,
message_text: str,
sender: Dict[str, Any],
chat_id: int,
) -> str:
"""
Executa o turno do orquestrador fora do loop async principal.
Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes.
"""
started_at = perf_counter()
tools_db = SessionLocal()
mock_db = SessionMockLocal()
try:
user_resolution_started_at = perf_counter()
user_service = UserService(mock_db)
external_id = str(sender.get("id") or chat_id)
first_name = (sender.get("first_name") or "").strip()
last_name = (sender.get("last_name") or "").strip()
display_name = f"{first_name} {last_name}".strip() or None
username = sender.get("username")
user = user_service.get_or_create(
channel="telegram",
external_id=external_id,
name=display_name,
username=username,
)
user_resolution_ms = self._elapsed_ms(user_resolution_started_at)
service_init_started_at = perf_counter()
service = OrquestradorService(
tools_db,
state_repository=self.state,
)
service_init_ms = self._elapsed_ms(service_init_started_at)
orchestration_started_at = perf_counter()
response = asyncio.run(service.handle_message(message=message_text, user_id=user.id))
orchestration_ms = self._elapsed_ms(orchestration_started_at)
self._log_telegram_event(
"blocking_turn_completed",
chat_id=chat_id,
user_id=user.id,
elapsed_ms=self._elapsed_ms(started_at),
user_resolution_ms=user_resolution_ms,
service_init_ms=service_init_ms,
orchestration_ms=orchestration_ms,
input_chars=len(message_text),
response_chars=len(str(response or "")),
)
return response
finally:
tools_db.close()
mock_db.close()
# Filtra documentos do Telegram para aceitar apenas imagens.
def _is_supported_image_document(self, document: Dict[str, Any]) -> bool:
mime_type = str((document or {}).get("mime_type") or "").strip().lower()
return mime_type.startswith("image/")
# Reconhece a resposta padrao quando a leitura da imagem falha.
def _is_image_analysis_failure_message(self, text: str) -> bool:
normalized = str(text or "").strip().lower()
return any(normalized.startswith(prefix) for prefix in IMAGE_ANALYSIS_BLOCKING_PREFIXES)
# Extrai a melhor foto e documentos de imagem anexados na mensagem.
async def _extract_image_attachments(
self,
session: aiohttp.ClientSession,
message: Dict[str, Any],
) -> List[Dict[str, Any]]:
attachments: List[Dict[str, Any]] = []
photos = message.get("photo") or []
if isinstance(photos, list) and photos:
best_photo = max(
(item for item in photos if isinstance(item, dict) and item.get("file_id")),
key=lambda item: int(item.get("file_size") or 0),
default=None,
)
if best_photo is not None:
attachment = await self._download_image_attachment(
session=session,
file_id=str(best_photo.get("file_id")),
mime_type="image/jpeg",
file_name="telegram_photo.jpg",
)
if attachment:
attachments.append(attachment)
document = message.get("document") or {}
if isinstance(document, dict) and document.get("file_id") and self._is_supported_image_document(document):
attachment = await self._download_image_attachment(
session=session,
file_id=str(document.get("file_id")),
mime_type=str(document.get("mime_type") or "image/jpeg"),
file_name=str(document.get("file_name") or "telegram_image"),
)
if attachment:
attachments.append(attachment)
return attachments
# Baixa um anexo de imagem do Telegram e devolve seu payload bruto.
async def _download_image_attachment(
self,
session: aiohttp.ClientSession,
file_id: str,
mime_type: str,
file_name: str,
) -> Dict[str, Any] | None:
async with session.post(f"{self.base_url}/getFile", json={"file_id": file_id}) as response:
data = await response.json()
if not data.get("ok"):
logger.warning("Falha em getFile para imagem do Telegram: %s", data)
return None
file_path = str((data.get("result") or {}).get("file_path") or "").strip()
if not file_path:
return None
async with session.get(f"{self.file_base_url}/{file_path}") as file_response:
if file_response.status >= 400:
logger.warning("Falha ao baixar arquivo do Telegram: status=%s path=%s", file_response.status, file_path)
return None
payload = await file_response.read()
if not payload:
return None
return {
"mime_type": mime_type,
"file_name": file_name,
"data": payload,
}
# Combina legenda e texto extraido da imagem em uma mensagem unica para o fluxo.
async def _build_orchestration_message_from_image(
self,
*,
caption: str,
image_attachments: List[Dict[str, Any]],
) -> str:
extracted_message = await LLMService().extract_image_workflow_message(
caption=caption,
attachments=image_attachments,
)
caption_text = str(caption or "").strip()
extracted_text = str(extracted_message or "").strip()
if self._is_image_analysis_failure_message(extracted_text):
return extracted_text
if caption_text and extracted_text:
return (
"[imagem recebida no telegram]\n"
f"Legenda do usuario: {caption_text}\n"
f"Dados extraidos da imagem: {extracted_text}"
)
if extracted_text:
return f"[imagem recebida no telegram]\nDados extraidos da imagem: {extracted_text}"
if caption_text:
return caption_text
return "Recebi uma imagem, mas preciso que voce descreva o documento ou envie uma foto mais nitida."
async def main() -> None:
"""Inicializa servico satelite do Telegram e inicia processamento continuo."""
token = settings.telegram_bot_token
if not token:
raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.")
_ensure_supported_runtime_configuration()
lock_handle = _acquire_single_instance_lock("orquestrador_telegram_satellite.lock")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
service = TelegramSatelliteService(token=token)
try:
await service.run()
finally:
lock_handle.close()
if __name__ == "__main__":
asyncio.run(main())