From 9316e3e4958f4b3418d2ec569a67d9b95de1d8b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Tue, 10 Mar 2026 11:10:58 -0300 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(orchestration):=20adicionar=20?= =?UTF-8?q?infraestrutura=20de=20estado=20conversacional=20com=20Redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduz a abstracao de repositorio de estado conversacional e a implementacao com Redis para suportar persistencia entre reinicios e execucao em multiplas instancias. Tambem adiciona configuracoes de ambiente, dependencias e suporte local via Docker Compose para alternar entre backend em memoria e Redis de forma explicita. --- .env.example | 12 ++ README.md | 14 ++ app/core/settings.py | 9 ++ .../conversation_state_repository.py | 23 +++ .../orchestration/redis_state_repository.py | 136 ++++++++++++++++++ .../orchestration/state_repository_factory.py | 39 +++++ docker-compose.yml | 18 +++ requirements.txt | 1 + 8 files changed, 252 insertions(+) create mode 100644 app/services/orchestration/conversation_state_repository.py create mode 100644 app/services/orchestration/redis_state_repository.py create mode 100644 app/services/orchestration/state_repository_factory.py diff --git a/.env.example b/.env.example index 0ce4b4d..37526c6 100644 --- a/.env.example +++ b/.env.example @@ -49,3 +49,15 @@ DEBUG=true TELEGRAM_BOT_TOKEN= TELEGRAM_POLLING_TIMEOUT=30 TELEGRAM_REQUEST_TIMEOUT=45 + +# ============================================ +# ESTADO CONVERSACIONAL +# ============================================ +# Valores: memory, redis +CONVERSATION_STATE_BACKEND=memory +CONVERSATION_STATE_TTL_MINUTES=60 + +# Redis usado quando CONVERSATION_STATE_BACKEND=redis +REDIS_URL=redis://127.0.0.1:6379/0 +REDIS_KEY_PREFIX=orquestrador +REDIS_SOCKET_TIMEOUT_SECONDS=5 diff --git a/README.md b/README.md index eebd950..9614bb8 100644 --- a/README.md +++ b/README.md @@ -237,6 +237,12 @@ uvicorn app.main:app --reload docker-compose up ``` +Para testar estado conversacional em Redis com Docker Compose: + +```bash +docker-compose up redis app +``` + Arquivos úteis: - `TEST_CASES.md` - `DEPLOY_SERVIDOR.md` @@ -279,6 +285,14 @@ Principais variáveis: - `TELEGRAM_POLLING_TIMEOUT` - `TELEGRAM_REQUEST_TIMEOUT` +### Estado Conversacional + +- `CONVERSATION_STATE_BACKEND` (`memory` ou `redis`) +- `CONVERSATION_STATE_TTL_MINUTES` +- `REDIS_URL` +- `REDIS_KEY_PREFIX` +- `REDIS_SOCKET_TIMEOUT_SECONDS` + ## Telegram Satellite Service Existe um serviço satélite para atendimento via Telegram em long polling. diff --git a/app/core/settings.py b/app/core/settings.py index df9173b..6ab82ee 100644 --- a/app/core/settings.py +++ b/app/core/settings.py @@ -40,6 +40,15 @@ class Settings(BaseSettings): telegram_polling_timeout: int = 30 telegram_request_timeout: int = 45 + # Conversation state backend + conversation_state_backend: str = "memory" + conversation_state_ttl_minutes: int = 60 + + # Redis conversation state + redis_url: str = "redis://127.0.0.1:6379/0" + redis_key_prefix: str = "orquestrador" + redis_socket_timeout_seconds: int = 5 + class Config: env_file = ".env" extra = "ignore" diff --git a/app/services/orchestration/conversation_state_repository.py b/app/services/orchestration/conversation_state_repository.py new file mode 100644 index 0000000..8293f12 --- /dev/null +++ b/app/services/orchestration/conversation_state_repository.py @@ -0,0 +1,23 @@ +from abc import ABC, abstractmethod + + +class ConversationStateRepository(ABC): + @abstractmethod + def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None: + pass + + @abstractmethod + def get_user_context(self, user_id: int | None) -> dict | None: + pass + + @abstractmethod + def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None: + pass + + @abstractmethod + def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None: + pass + + @abstractmethod + def pop_entry(self, bucket: str, user_id: int | None) -> dict | None: + pass diff --git a/app/services/orchestration/redis_state_repository.py b/app/services/orchestration/redis_state_repository.py new file mode 100644 index 0000000..532b449 --- /dev/null +++ b/app/services/orchestration/redis_state_repository.py @@ -0,0 +1,136 @@ +import json +from datetime import datetime, timedelta +from typing import Any + +from redis import Redis + +from app.services.orchestration.conversation_state_repository import ConversationStateRepository + + +class RedisConversationStateRepository(ConversationStateRepository): + def __init__( + self, + redis_client: Redis, + key_prefix: str = "orquestrador", + default_ttl_minutes: int = 60, + ) -> None: + self.redis = redis_client + self.key_prefix = key_prefix + self.default_ttl_minutes = default_ttl_minutes + + def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None: + if user_id is None: + return + + now = datetime.utcnow() + key = self._bucket_key("user_contexts", user_id) + context = self._load(key) + + if context and self._entry_not_expired(context, now=now): + context["expires_at"] = now + else: + context = { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + "expires_at": now, + } + + expires_at = now.replace(microsecond=0) + self._minutes_delta(ttl_minutes) + context["expires_at"] = expires_at + self._save(key, context, ttl_seconds=ttl_minutes * 60) + + def get_user_context(self, user_id: int | None) -> dict | None: + if user_id is None: + return None + key = self._bucket_key("user_contexts", user_id) + context = self._load(key) + if not context: + return None + if not self._entry_not_expired(context): + self.redis.delete(key) + return None + return context + + def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None: + if user_id is None: + return None + key = self._bucket_key(bucket, user_id) + entry = self._load(key) + if not entry: + return None + if expire and not self._entry_not_expired(entry): + self.redis.delete(key) + return None + return entry + + def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None: + if user_id is None: + return + ttl_seconds = self._ttl_from_entry(value) + if ttl_seconds is None: + ttl_seconds = self.default_ttl_minutes * 60 + self._save(self._bucket_key(bucket, user_id), value, ttl_seconds=ttl_seconds) + + def pop_entry(self, bucket: str, user_id: int | None) -> dict | None: + if user_id is None: + return None + key = self._bucket_key(bucket, user_id) + entry = self._load(key) + self.redis.delete(key) + return entry + + def _bucket_key(self, bucket: str, user_id: int) -> str: + return f"{self.key_prefix}:{bucket}:{user_id}" + + def _load(self, key: str) -> dict | None: + raw = self.redis.get(key) + if not raw: + return None + return self._deserialize(json.loads(raw)) + + def _save(self, key: str, value: dict, ttl_seconds: int) -> None: + payload = json.dumps(self._serialize(value), ensure_ascii=True, separators=(",", ":")) + self.redis.set(name=key, value=payload, ex=max(1, int(ttl_seconds))) + + def _entry_not_expired(self, entry: dict, now: datetime | None = None) -> bool: + expires_at = entry.get("expires_at") + if not isinstance(expires_at, datetime): + return True + return expires_at >= (now or datetime.utcnow()) + + def _ttl_from_entry(self, entry: dict) -> int | None: + expires_at = entry.get("expires_at") + if not isinstance(expires_at, datetime): + return None + delta = int((expires_at - datetime.utcnow()).total_seconds()) + return max(1, delta) + + def _minutes_delta(self, minutes: int): + return timedelta(minutes=minutes) + + def _serialize(self, value: Any): + if isinstance(value, datetime): + return {"__datetime__": value.isoformat()} + if isinstance(value, dict): + return {key: self._serialize(item) for key, item in value.items()} + if isinstance(value, list): + return [self._serialize(item) for item in value] + return value + + def _deserialize(self, value: Any): + if isinstance(value, dict): + if set(value.keys()) == {"__datetime__"}: + try: + return datetime.fromisoformat(value["__datetime__"]) + except ValueError: + return value["__datetime__"] + return {key: self._deserialize(item) for key, item in value.items()} + if isinstance(value, list): + return [self._deserialize(item) for item in value] + return value diff --git a/app/services/orchestration/state_repository_factory.py b/app/services/orchestration/state_repository_factory.py new file mode 100644 index 0000000..346d84b --- /dev/null +++ b/app/services/orchestration/state_repository_factory.py @@ -0,0 +1,39 @@ +from app.core.settings import settings +from app.services.orchestration.conversation_state_repository import ConversationStateRepository +from app.services.orchestration.conversation_state_store import ConversationStateStore + + +_state_repository: ConversationStateRepository | None = None + + +def get_conversation_state_repository() -> ConversationStateRepository: + global _state_repository + if _state_repository is not None: + return _state_repository + + backend = settings.conversation_state_backend.strip().lower() + if backend == "redis": + from redis import Redis + + from app.services.orchestration.redis_state_repository import RedisConversationStateRepository + + redis_client = Redis.from_url( + settings.redis_url, + decode_responses=True, + socket_timeout=settings.redis_socket_timeout_seconds, + ) + try: + redis_client.ping() + except Exception as exc: + raise RuntimeError( + f"Falha ao conectar no Redis configurado em REDIS_URL={settings.redis_url!r}." + ) from exc + _state_repository = RedisConversationStateRepository( + redis_client=redis_client, + key_prefix=settings.redis_key_prefix, + default_ttl_minutes=settings.conversation_state_ttl_minutes, + ) + return _state_repository + + _state_repository = ConversationStateStore() + return _state_repository diff --git a/docker-compose.yml b/docker-compose.yml index 7ebad16..f9ab309 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,17 @@ services: timeout: 5s retries: 5 + redis: + image: redis:7-alpine + container_name: orquestrador_redis + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + app: build: . container_name: orquestrador_app @@ -34,9 +45,16 @@ services: MOCKAROO_API_KEY: ${MOCKAROO_API_KEY} MOCKAROO_BASE_URL: ${MOCKAROO_BASE_URL:-https://my.api.mockaroo.com} USE_MOCKAROO_WRITES: ${USE_MOCKAROO_WRITES:-false} + CONVERSATION_STATE_BACKEND: ${CONVERSATION_STATE_BACKEND:-redis} + CONVERSATION_STATE_TTL_MINUTES: ${CONVERSATION_STATE_TTL_MINUTES:-60} + REDIS_URL: ${REDIS_URL:-redis://redis:6379/0} + REDIS_KEY_PREFIX: ${REDIS_KEY_PREFIX:-orquestrador} + REDIS_SOCKET_TIMEOUT_SECONDS: ${REDIS_SOCKET_TIMEOUT_SECONDS:-5} depends_on: postgres: condition: service_healthy + redis: + condition: service_healthy volumes: - .:/app command: uvicorn app.main:app --host 0.0.0.0 --port 8080 --reload diff --git a/requirements.txt b/requirements.txt index 3bacd60..b8d938c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -58,6 +58,7 @@ python-dateutil==2.9.0.post0 python-dotenv==1.2.1 PyYAML==6.0.3 requests==2.32.5 +redis==6.4.0 shapely==2.1.2 six==1.17.0 sniffio==1.3.1