🧾 feat(audit): persistir historico de conversas no banco mock
- adiciona a tabela conversation_turns ao schema mock com request_id, conversation_id, user_id, canal, mensagem, resposta, status, intent, domain, action, tool, erro e latencia por turno - integra o OrquestradorService para registrar historico tanto em turnos concluidos quanto em falhas, aproveitando o trace do turno e os metadados de execucao - cria o ConversationHistoryService com persistencia e consulta list_turns com filtros simples para auditoria interna - inclui cobertura para persistencia do historico, leitura filtrada e registro de turnos completed/failed na camada de orquestracaomain
parent
9b6b2a643b
commit
65cd775b2a
@ -0,0 +1,170 @@
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from app.db.mock_database import SessionMockLocal
|
||||
from app.db.mock_models import ConversationTurn, User
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConversationHistoryService:
|
||||
"""Persiste uma trilha simples de auditoria por turno no banco mock."""
|
||||
|
||||
def record_turn(
|
||||
self,
|
||||
*,
|
||||
request_id: str,
|
||||
conversation_id: str,
|
||||
user_id: int | None,
|
||||
user_message: str,
|
||||
assistant_response: str | None,
|
||||
turn_status: str,
|
||||
intent: str | None = None,
|
||||
domain: str | None = None,
|
||||
action: str | None = None,
|
||||
tool_name: str | None = None,
|
||||
tool_arguments: dict[str, Any] | None = None,
|
||||
error_detail: str | None = None,
|
||||
started_at: datetime | None = None,
|
||||
completed_at: datetime | None = None,
|
||||
elapsed_ms: float | None = None,
|
||||
) -> None:
|
||||
db = SessionMockLocal()
|
||||
try:
|
||||
channel = None
|
||||
external_id = None
|
||||
username = None
|
||||
if user_id is not None:
|
||||
user = db.query(User).filter(User.id == user_id).first()
|
||||
if user:
|
||||
channel = user.channel
|
||||
external_id = user.external_id
|
||||
username = user.username
|
||||
|
||||
payload = {
|
||||
"request_id": str(request_id or ""),
|
||||
"conversation_id": str(conversation_id or "anonymous"),
|
||||
"user_id": user_id,
|
||||
"channel": channel,
|
||||
"external_id": external_id,
|
||||
"username": username,
|
||||
"user_message": str(user_message or ""),
|
||||
"assistant_response": assistant_response,
|
||||
"turn_status": str(turn_status or "completed"),
|
||||
"intent": self._clean_text(intent),
|
||||
"domain": self._clean_text(domain),
|
||||
"action": self._clean_text(action),
|
||||
"tool_name": self._clean_text(tool_name),
|
||||
"tool_arguments": self._serialize_json(tool_arguments),
|
||||
"error_detail": error_detail,
|
||||
}
|
||||
if started_at is not None:
|
||||
payload["started_at"] = started_at
|
||||
if completed_at is not None:
|
||||
payload["completed_at"] = completed_at
|
||||
if elapsed_ms is not None:
|
||||
payload["elapsed_ms"] = elapsed_ms
|
||||
|
||||
record = ConversationTurn(**payload)
|
||||
db.add(record)
|
||||
db.commit()
|
||||
except Exception:
|
||||
db.rollback()
|
||||
logger.exception(
|
||||
"Falha ao persistir historico de conversa.",
|
||||
extra={
|
||||
"request_id": request_id,
|
||||
"conversation_id": conversation_id,
|
||||
"user_id": user_id,
|
||||
},
|
||||
)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
def list_turns(
|
||||
self,
|
||||
*,
|
||||
user_id: int | None = None,
|
||||
conversation_id: str | None = None,
|
||||
request_id: str | None = None,
|
||||
turn_status: str | None = None,
|
||||
limit: int = 20,
|
||||
) -> list[dict[str, Any]]:
|
||||
db = SessionMockLocal()
|
||||
try:
|
||||
query = db.query(ConversationTurn)
|
||||
|
||||
if user_id is not None:
|
||||
query = query.filter(ConversationTurn.user_id == user_id)
|
||||
|
||||
normalized_conversation_id = self._clean_text(conversation_id)
|
||||
if normalized_conversation_id:
|
||||
query = query.filter(ConversationTurn.conversation_id == normalized_conversation_id)
|
||||
|
||||
normalized_request_id = self._clean_text(request_id)
|
||||
if normalized_request_id:
|
||||
query = query.filter(ConversationTurn.request_id == normalized_request_id)
|
||||
|
||||
normalized_turn_status = self._clean_text(turn_status)
|
||||
if normalized_turn_status:
|
||||
query = query.filter(ConversationTurn.turn_status == normalized_turn_status)
|
||||
|
||||
safe_limit = max(1, min(int(limit or 20), 100))
|
||||
rows = (
|
||||
query.order_by(ConversationTurn.started_at.desc(), ConversationTurn.id.desc())
|
||||
.limit(safe_limit)
|
||||
.all()
|
||||
)
|
||||
return [self._serialize_row(row) for row in rows]
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
def _clean_text(self, value: str | None) -> str | None:
|
||||
text = str(value or "").strip()
|
||||
return text or None
|
||||
|
||||
def _serialize_row(self, row: ConversationTurn) -> dict[str, Any]:
|
||||
return {
|
||||
"id": row.id,
|
||||
"request_id": row.request_id,
|
||||
"conversation_id": row.conversation_id,
|
||||
"user_id": row.user_id,
|
||||
"channel": row.channel,
|
||||
"external_id": row.external_id,
|
||||
"username": row.username,
|
||||
"user_message": row.user_message,
|
||||
"assistant_response": row.assistant_response,
|
||||
"turn_status": row.turn_status,
|
||||
"intent": row.intent,
|
||||
"domain": row.domain,
|
||||
"action": row.action,
|
||||
"tool_name": row.tool_name,
|
||||
"tool_arguments": self._deserialize_json(row.tool_arguments),
|
||||
"error_detail": row.error_detail,
|
||||
"elapsed_ms": row.elapsed_ms,
|
||||
"started_at": row.started_at.isoformat() if row.started_at else None,
|
||||
"completed_at": row.completed_at.isoformat() if row.completed_at else None,
|
||||
}
|
||||
|
||||
def _deserialize_json(self, value: str | None) -> dict[str, Any] | None:
|
||||
text = str(value or "").strip()
|
||||
if not text:
|
||||
return None
|
||||
try:
|
||||
payload = json.loads(text)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return payload if isinstance(payload, dict) else None
|
||||
|
||||
def _serialize_json(self, value: dict[str, Any] | None) -> str | None:
|
||||
if not isinstance(value, dict) or not value:
|
||||
return None
|
||||
return json.dumps(
|
||||
value,
|
||||
ensure_ascii=True,
|
||||
separators=(",", ":"),
|
||||
default=str,
|
||||
)
|
||||
@ -0,0 +1,226 @@
|
||||
import unittest
|
||||
from datetime import datetime
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from app.db.mock_database import MockBase
|
||||
from app.db.mock_models import ConversationTurn, User
|
||||
from app.services.orchestration.conversation_history_service import ConversationHistoryService
|
||||
|
||||
|
||||
class _FakeQuery:
|
||||
def __init__(self, result):
|
||||
self.result = result
|
||||
|
||||
def filter(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return self.result
|
||||
|
||||
|
||||
class _FakeSession:
|
||||
def __init__(self, user=None):
|
||||
self.user = user
|
||||
self.added = []
|
||||
self.committed = False
|
||||
self.rolled_back = False
|
||||
self.closed = False
|
||||
|
||||
def query(self, model):
|
||||
return _FakeQuery(self.user)
|
||||
|
||||
def add(self, item):
|
||||
self.added.append(item)
|
||||
|
||||
def commit(self):
|
||||
self.committed = True
|
||||
|
||||
def rollback(self):
|
||||
self.rolled_back = True
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
|
||||
class ConversationHistoryServiceTests(unittest.TestCase):
|
||||
def test_record_turn_persists_conversation_audit_record(self):
|
||||
session = _FakeSession(
|
||||
user=SimpleNamespace(
|
||||
id=7,
|
||||
channel="telegram",
|
||||
external_id="12345",
|
||||
username="cliente_teste",
|
||||
)
|
||||
)
|
||||
service = ConversationHistoryService()
|
||||
started_at = datetime(2026, 3, 16, 18, 0, 0)
|
||||
completed_at = datetime(2026, 3, 16, 18, 0, 5)
|
||||
|
||||
with patch(
|
||||
"app.services.orchestration.conversation_history_service.SessionMockLocal",
|
||||
return_value=session,
|
||||
):
|
||||
service.record_turn(
|
||||
request_id="req-123",
|
||||
conversation_id="user:7",
|
||||
user_id=7,
|
||||
user_message="quero comprar um carro",
|
||||
assistant_response="Encontrei 2 veiculo(s).",
|
||||
turn_status="completed",
|
||||
intent="order_create",
|
||||
domain="sales",
|
||||
action="collect_order_create",
|
||||
tool_name="consultar_estoque",
|
||||
tool_arguments={"preco_max": 80000},
|
||||
error_detail=None,
|
||||
started_at=started_at,
|
||||
completed_at=completed_at,
|
||||
elapsed_ms=512.4,
|
||||
)
|
||||
|
||||
self.assertTrue(session.committed)
|
||||
self.assertTrue(session.closed)
|
||||
self.assertEqual(len(session.added), 1)
|
||||
record = session.added[0]
|
||||
self.assertEqual(record.request_id, "req-123")
|
||||
self.assertEqual(record.conversation_id, "user:7")
|
||||
self.assertEqual(record.user_id, 7)
|
||||
self.assertEqual(record.channel, "telegram")
|
||||
self.assertEqual(record.external_id, "12345")
|
||||
self.assertEqual(record.username, "cliente_teste")
|
||||
self.assertEqual(record.intent, "order_create")
|
||||
self.assertEqual(record.domain, "sales")
|
||||
self.assertEqual(record.action, "collect_order_create")
|
||||
self.assertEqual(record.tool_name, "consultar_estoque")
|
||||
self.assertEqual(record.tool_arguments, '{"preco_max":80000}')
|
||||
self.assertEqual(record.started_at, started_at)
|
||||
self.assertEqual(record.completed_at, completed_at)
|
||||
self.assertEqual(record.elapsed_ms, 512.4)
|
||||
|
||||
def test_list_turns_filters_and_orders_recent_first(self):
|
||||
engine = create_engine("sqlite:///:memory:")
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
MockBase.metadata.create_all(bind=engine)
|
||||
self.addCleanup(engine.dispose)
|
||||
|
||||
db = SessionLocal()
|
||||
user = User(channel="telegram", external_id="999", name="Cliente Teste", username="cliente_teste")
|
||||
db.add(user)
|
||||
db.commit()
|
||||
db.refresh(user)
|
||||
user_id = user.id
|
||||
db.add_all(
|
||||
[
|
||||
ConversationTurn(
|
||||
request_id="req-old",
|
||||
conversation_id=f"user:{user_id}",
|
||||
user_id=user_id,
|
||||
channel="telegram",
|
||||
external_id="999",
|
||||
username="cliente_teste",
|
||||
user_message="oi",
|
||||
assistant_response="ola",
|
||||
turn_status="completed",
|
||||
intent="general",
|
||||
domain="general",
|
||||
action="answer_user",
|
||||
tool_name=None,
|
||||
tool_arguments=None,
|
||||
started_at=datetime(2026, 3, 16, 10, 0, 0),
|
||||
completed_at=datetime(2026, 3, 16, 10, 0, 1),
|
||||
elapsed_ms=100.0,
|
||||
),
|
||||
ConversationTurn(
|
||||
request_id="req-new",
|
||||
conversation_id=f"user:{user_id}",
|
||||
user_id=user_id,
|
||||
channel="telegram",
|
||||
external_id="999",
|
||||
username="cliente_teste",
|
||||
user_message="quero comprar",
|
||||
assistant_response="pedido criado",
|
||||
turn_status="completed",
|
||||
intent="order_create",
|
||||
domain="sales",
|
||||
action="call_tool",
|
||||
tool_name="realizar_pedido",
|
||||
tool_arguments='{"vehicle_id":1}',
|
||||
started_at=datetime(2026, 3, 16, 11, 0, 0),
|
||||
completed_at=datetime(2026, 3, 16, 11, 0, 2),
|
||||
elapsed_ms=230.0,
|
||||
),
|
||||
ConversationTurn(
|
||||
request_id="req-failed",
|
||||
conversation_id="user:999",
|
||||
user_id=None,
|
||||
channel=None,
|
||||
external_id=None,
|
||||
username=None,
|
||||
user_message="erro",
|
||||
assistant_response=None,
|
||||
turn_status="failed",
|
||||
intent="general",
|
||||
domain="general",
|
||||
action="answer_user",
|
||||
tool_name=None,
|
||||
tool_arguments=None,
|
||||
error_detail="RuntimeError: boom",
|
||||
started_at=datetime(2026, 3, 16, 12, 0, 0),
|
||||
completed_at=datetime(2026, 3, 16, 12, 0, 1),
|
||||
elapsed_ms=300.0,
|
||||
),
|
||||
]
|
||||
)
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
service = ConversationHistoryService()
|
||||
with patch(
|
||||
"app.services.orchestration.conversation_history_service.SessionMockLocal",
|
||||
SessionLocal,
|
||||
):
|
||||
items = service.list_turns(user_id=user_id, turn_status="completed", limit=5)
|
||||
|
||||
self.assertEqual(len(items), 2)
|
||||
self.assertEqual(items[0]["request_id"], "req-new")
|
||||
self.assertEqual(items[1]["request_id"], "req-old")
|
||||
self.assertEqual(items[0]["tool_arguments"], {"vehicle_id": 1})
|
||||
self.assertEqual(items[0]["turn_status"], "completed")
|
||||
self.assertEqual(items[0]["user_id"], user_id)
|
||||
|
||||
def test_list_turns_can_filter_by_request_id(self):
|
||||
engine = create_engine("sqlite:///:memory:")
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
MockBase.metadata.create_all(bind=engine)
|
||||
self.addCleanup(engine.dispose)
|
||||
|
||||
db = SessionLocal()
|
||||
db.add(
|
||||
ConversationTurn(
|
||||
request_id="req-only",
|
||||
conversation_id="user:42",
|
||||
user_id=None,
|
||||
user_message="teste",
|
||||
assistant_response="ok",
|
||||
turn_status="completed",
|
||||
started_at=datetime(2026, 3, 16, 13, 0, 0),
|
||||
completed_at=datetime(2026, 3, 16, 13, 0, 1),
|
||||
)
|
||||
)
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
service = ConversationHistoryService()
|
||||
with patch(
|
||||
"app.services.orchestration.conversation_history_service.SessionMockLocal",
|
||||
SessionLocal,
|
||||
):
|
||||
items = service.list_turns(request_id="req-only", limit=5)
|
||||
|
||||
self.assertEqual(len(items), 1)
|
||||
self.assertEqual(items[0]["request_id"], "req-only")
|
||||
self.assertEqual(items[0]["conversation_id"], "user:42")
|
||||
Loading…
Reference in New Issue