✨ feat(orchestration): adicionar infraestrutura de estado conversacional com Redis
Introduz a abstracao de repositorio de estado conversacional e a implementacao com Redis para suportar persistencia entre reinicios e execucao em multiplas instancias. Tambem adiciona configuracoes de ambiente, dependencias e suporte local via Docker Compose para alternar entre backend em memoria e Redis de forma explicita.main
parent
6537808963
commit
9316e3e495
@ -0,0 +1,23 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class ConversationStateRepository(ABC):
|
||||
@abstractmethod
|
||||
def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_user_context(self, user_id: int | None) -> dict | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def pop_entry(self, bucket: str, user_id: int | None) -> dict | None:
|
||||
pass
|
||||
@ -0,0 +1,136 @@
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from redis import Redis
|
||||
|
||||
from app.services.orchestration.conversation_state_repository import ConversationStateRepository
|
||||
|
||||
|
||||
class RedisConversationStateRepository(ConversationStateRepository):
|
||||
def __init__(
|
||||
self,
|
||||
redis_client: Redis,
|
||||
key_prefix: str = "orquestrador",
|
||||
default_ttl_minutes: int = 60,
|
||||
) -> None:
|
||||
self.redis = redis_client
|
||||
self.key_prefix = key_prefix
|
||||
self.default_ttl_minutes = default_ttl_minutes
|
||||
|
||||
def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None:
|
||||
if user_id is None:
|
||||
return
|
||||
|
||||
now = datetime.utcnow()
|
||||
key = self._bucket_key("user_contexts", user_id)
|
||||
context = self._load(key)
|
||||
|
||||
if context and self._entry_not_expired(context, now=now):
|
||||
context["expires_at"] = now
|
||||
else:
|
||||
context = {
|
||||
"active_domain": "general",
|
||||
"generic_memory": {},
|
||||
"shared_memory": {},
|
||||
"order_queue": [],
|
||||
"pending_order_selection": None,
|
||||
"pending_switch": None,
|
||||
"last_stock_results": [],
|
||||
"selected_vehicle": None,
|
||||
"expires_at": now,
|
||||
}
|
||||
|
||||
expires_at = now.replace(microsecond=0) + self._minutes_delta(ttl_minutes)
|
||||
context["expires_at"] = expires_at
|
||||
self._save(key, context, ttl_seconds=ttl_minutes * 60)
|
||||
|
||||
def get_user_context(self, user_id: int | None) -> dict | None:
|
||||
if user_id is None:
|
||||
return None
|
||||
key = self._bucket_key("user_contexts", user_id)
|
||||
context = self._load(key)
|
||||
if not context:
|
||||
return None
|
||||
if not self._entry_not_expired(context):
|
||||
self.redis.delete(key)
|
||||
return None
|
||||
return context
|
||||
|
||||
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None:
|
||||
if user_id is None:
|
||||
return None
|
||||
key = self._bucket_key(bucket, user_id)
|
||||
entry = self._load(key)
|
||||
if not entry:
|
||||
return None
|
||||
if expire and not self._entry_not_expired(entry):
|
||||
self.redis.delete(key)
|
||||
return None
|
||||
return entry
|
||||
|
||||
def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None:
|
||||
if user_id is None:
|
||||
return
|
||||
ttl_seconds = self._ttl_from_entry(value)
|
||||
if ttl_seconds is None:
|
||||
ttl_seconds = self.default_ttl_minutes * 60
|
||||
self._save(self._bucket_key(bucket, user_id), value, ttl_seconds=ttl_seconds)
|
||||
|
||||
def pop_entry(self, bucket: str, user_id: int | None) -> dict | None:
|
||||
if user_id is None:
|
||||
return None
|
||||
key = self._bucket_key(bucket, user_id)
|
||||
entry = self._load(key)
|
||||
self.redis.delete(key)
|
||||
return entry
|
||||
|
||||
def _bucket_key(self, bucket: str, user_id: int) -> str:
|
||||
return f"{self.key_prefix}:{bucket}:{user_id}"
|
||||
|
||||
def _load(self, key: str) -> dict | None:
|
||||
raw = self.redis.get(key)
|
||||
if not raw:
|
||||
return None
|
||||
return self._deserialize(json.loads(raw))
|
||||
|
||||
def _save(self, key: str, value: dict, ttl_seconds: int) -> None:
|
||||
payload = json.dumps(self._serialize(value), ensure_ascii=True, separators=(",", ":"))
|
||||
self.redis.set(name=key, value=payload, ex=max(1, int(ttl_seconds)))
|
||||
|
||||
def _entry_not_expired(self, entry: dict, now: datetime | None = None) -> bool:
|
||||
expires_at = entry.get("expires_at")
|
||||
if not isinstance(expires_at, datetime):
|
||||
return True
|
||||
return expires_at >= (now or datetime.utcnow())
|
||||
|
||||
def _ttl_from_entry(self, entry: dict) -> int | None:
|
||||
expires_at = entry.get("expires_at")
|
||||
if not isinstance(expires_at, datetime):
|
||||
return None
|
||||
delta = int((expires_at - datetime.utcnow()).total_seconds())
|
||||
return max(1, delta)
|
||||
|
||||
def _minutes_delta(self, minutes: int):
|
||||
return timedelta(minutes=minutes)
|
||||
|
||||
def _serialize(self, value: Any):
|
||||
if isinstance(value, datetime):
|
||||
return {"__datetime__": value.isoformat()}
|
||||
if isinstance(value, dict):
|
||||
return {key: self._serialize(item) for key, item in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [self._serialize(item) for item in value]
|
||||
return value
|
||||
|
||||
def _deserialize(self, value: Any):
|
||||
if isinstance(value, dict):
|
||||
if set(value.keys()) == {"__datetime__"}:
|
||||
try:
|
||||
return datetime.fromisoformat(value["__datetime__"])
|
||||
except ValueError:
|
||||
return value["__datetime__"]
|
||||
return {key: self._deserialize(item) for key, item in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [self._deserialize(item) for item in value]
|
||||
return value
|
||||
@ -0,0 +1,39 @@
|
||||
from app.core.settings import settings
|
||||
from app.services.orchestration.conversation_state_repository import ConversationStateRepository
|
||||
from app.services.orchestration.conversation_state_store import ConversationStateStore
|
||||
|
||||
|
||||
_state_repository: ConversationStateRepository | None = None
|
||||
|
||||
|
||||
def get_conversation_state_repository() -> ConversationStateRepository:
|
||||
global _state_repository
|
||||
if _state_repository is not None:
|
||||
return _state_repository
|
||||
|
||||
backend = settings.conversation_state_backend.strip().lower()
|
||||
if backend == "redis":
|
||||
from redis import Redis
|
||||
|
||||
from app.services.orchestration.redis_state_repository import RedisConversationStateRepository
|
||||
|
||||
redis_client = Redis.from_url(
|
||||
settings.redis_url,
|
||||
decode_responses=True,
|
||||
socket_timeout=settings.redis_socket_timeout_seconds,
|
||||
)
|
||||
try:
|
||||
redis_client.ping()
|
||||
except Exception as exc:
|
||||
raise RuntimeError(
|
||||
f"Falha ao conectar no Redis configurado em REDIS_URL={settings.redis_url!r}."
|
||||
) from exc
|
||||
_state_repository = RedisConversationStateRepository(
|
||||
redis_client=redis_client,
|
||||
key_prefix=settings.redis_key_prefix,
|
||||
default_ttl_minutes=settings.conversation_state_ttl_minutes,
|
||||
)
|
||||
return _state_repository
|
||||
|
||||
_state_repository = ConversationStateStore()
|
||||
return _state_repository
|
||||
Loading…
Reference in New Issue