♻️ refactor(domain): extrair regras de negocio das tools mock

- mover regras de estoque, credito, pedidos e revisao para servicos de dominio dedicados
- manter handlers como camada fina de adaptacao para o ToolRegistry
- centralizar utilitarios compartilhados e o contrato estruturado de erro das tools
main
parent 82fc846e01
commit 11ebde3127

@ -0,0 +1,46 @@
import hashlib
from typing import Any
# Responsabilidade: utilitários genéricos reutilizáveis do domínio
def parse_float(value: Any, default: float = 0.0) -> float:
if value is None:
return default
if isinstance(value, (int, float)):
return float(value)
text = str(value).replace("R$", "").replace(" ", "")
text = text.replace(".", "").replace(",", ".") if "," in text else text
try:
return float(text)
except Exception:
return default
def parse_bool(value: Any, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if value is None:
return default
text = str(value).strip().lower()
if text in {"true", "1", "sim", "yes", "y"}:
return True
if text in {"false", "0", "nao", "no", "n"}:
return False
return default
def stable_int(seed_text: str) -> int:
digest = hashlib.sha256(seed_text.encode("utf-8")).hexdigest()
return int(digest[:16], 16)
def is_legacy_schema_issue(exc: Exception) -> bool:
lowered = str(exc).lower()
return (
"unknown column" in lowered
or "invalid column" in lowered
or "has no column named" in lowered
or "no such column" in lowered
or "column count doesn't match" in lowered
)

@ -0,0 +1,38 @@
from typing import Any
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import Customer
from app.services.domain.common import parse_float, stable_int
from app.services.orchestration.technical_normalizer import normalize_cpf
async def validar_cliente_venda(cpf: str, valor_veiculo: float) -> dict[str, Any]:
cpf_norm = normalize_cpf(cpf)
db = SessionMockLocal()
try:
cliente = db.query(Customer).filter(Customer.cpf == cpf_norm).first()
if cliente:
score = int(cliente.score)
limite = parse_float(cliente.limite_credito, 0.0)
restricao = bool(cliente.possui_restricao)
nome = cliente.nome
else:
entropy = stable_int(cpf_norm)
score = int(300 + (entropy % 550))
limite = float(30000 + (entropy % 150000))
restricao = entropy % 7 == 0
nome = "Cliente Simulado"
aprovado = (not restricao) and (valor_veiculo <= limite)
return {
"aprovado": aprovado,
"cpf": cpf_norm,
"nome": nome,
"score": score,
"limite_credito": limite,
"possui_restricao": restricao,
"valor_veiculo": valor_veiculo,
}
finally:
db.close()

@ -0,0 +1,81 @@
from datetime import datetime
from typing import Any
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import Order, Vehicle
from app.services.domain.common import is_legacy_schema_issue
# regra de crédito.
async def consultar_estoque(
preco_max: float | None = None,
categoria: str | None = None,
ordenar_preco: str | None = None,
limite: int | None = None,
) -> list[dict[str, Any]]:
db = SessionMockLocal()
try:
reserved_vehicle_ids = set()
try:
reserved_vehicle_ids = {
int(vehicle_id)
for (vehicle_id,) in (
db.query(Order.vehicle_id)
.filter(Order.vehicle_id.isnot(None))
.filter(Order.status != "Cancelado")
.all()
)
if vehicle_id is not None
}
except (OperationalError, SQLAlchemyError) as exc:
if not is_legacy_schema_issue(exc):
raise
db.rollback()
query = db.query(Vehicle)
if reserved_vehicle_ids:
query = query.filter(~Vehicle.id.in_(reserved_vehicle_ids))
if preco_max is not None:
query = query.filter(Vehicle.preco <= preco_max)
if categoria:
query = query.filter(Vehicle.categoria == categoria.lower())
if ordenar_preco in {"asc", "desc"}:
query = query.order_by(Vehicle.preco.asc() if ordenar_preco == "asc" else Vehicle.preco.desc())
if limite is not None:
try:
query = query.limit(max(1, int(limite)))
except (TypeError, ValueError):
pass
rows = query.all()
return [
{
"id": row.id,
"modelo": row.modelo,
"categoria": row.categoria,
"preco": float(row.preco),
}
for row in rows
]
finally:
db.close()
async def avaliar_veiculo_troca(modelo: str, ano: int, km: int) -> dict[str, Any]:
ano_atual = datetime.now().year
idade = max(0, ano_atual - ano)
base = 80000.0
valor = base * (0.85 ** idade) - (km * 0.03)
valor = max(5000.0, valor)
return {
"modelo": modelo,
"ano": ano,
"km": km,
"valor_estimado_troca": round(valor, 2),
}

@ -0,0 +1,200 @@
from datetime import datetime
from typing import Any
from uuid import uuid4
from sqlalchemy import text
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import Order, User, Vehicle
from app.services.domain.common import is_legacy_schema_issue
from app.services.domain.credit_service import validar_cliente_venda
from app.services.domain.tool_errors import raise_tool_http_error
from app.services.orchestration.technical_normalizer import normalize_cpf
from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf
# Responsabilidade: regra de pedido.
async def cancelar_pedido(
numero_pedido: str,
motivo: str,
user_id: int | None = None,
) -> dict[str, Any]:
db = SessionMockLocal()
try:
query = db.query(Order).filter(Order.numero_pedido == numero_pedido)
if user_id is not None:
query = query.filter(Order.user_id == user_id)
pedido = query.first()
if not pedido and user_id is not None:
legado = (
db.query(Order)
.filter(Order.numero_pedido == numero_pedido)
.filter(Order.user_id.is_(None))
.first()
)
if legado:
legado.user_id = user_id
db.commit()
db.refresh(legado)
pedido = legado
if not pedido:
raise_tool_http_error(
status_code=404,
code="order_not_found",
message=(
"Pedido nao encontrado para este usuario."
if user_id is not None
else "Pedido nao encontrado na base ficticia."
),
retryable=True,
field="numero_pedido",
)
if pedido.status.lower() == "cancelado":
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"status": pedido.status,
"motivo": pedido.motivo_cancelamento,
"data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None,
}
pedido.status = "Cancelado"
pedido.motivo_cancelamento = motivo
pedido.data_cancelamento = datetime.utcnow()
db.commit()
db.refresh(pedido)
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"status": pedido.status,
"motivo": pedido.motivo_cancelamento,
"data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None,
}
finally:
db.close()
async def realizar_pedido(
cpf: str,
vehicle_id: int,
user_id: int | None = None,
) -> dict[str, Any]:
cpf_norm = normalize_cpf(cpf)
db = SessionMockLocal()
try:
vehicle = db.query(Vehicle).filter(Vehicle.id == vehicle_id).first()
if not vehicle:
raise_tool_http_error(
status_code=404,
code="vehicle_not_found",
message="Veiculo nao encontrado no estoque.",
retryable=True,
field="vehicle_id",
)
existing_order = None
try:
existing_order = (
db.query(Order)
.filter(Order.vehicle_id == vehicle_id)
.filter(Order.status != "Cancelado")
.first()
)
except (OperationalError, SQLAlchemyError) as exc:
if not is_legacy_schema_issue(exc):
raise
db.rollback()
if existing_order:
raise_tool_http_error(
status_code=409,
code="vehicle_already_reserved",
message="Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.",
retryable=True,
field="vehicle_id",
)
valor_veiculo = float(vehicle.preco)
modelo_veiculo = str(vehicle.modelo)
await hydrate_mock_customer_from_cpf(cpf=cpf_norm, user_id=user_id)
avaliacao = await validar_cliente_venda(cpf=cpf_norm, valor_veiculo=valor_veiculo)
if not avaliacao.get("aprovado"):
raise_tool_http_error(
status_code=400,
code="credit_not_approved",
message=(
"Cliente nao aprovado para este valor. "
f"Limite disponivel: R$ {avaliacao.get('limite_credito', 0):.2f}."
),
retryable=False,
field="cpf",
)
numero_pedido = f"PED-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}"
if user_id is not None:
user = db.query(User).filter(User.id == user_id).first()
if user and user.cpf != cpf_norm:
user.cpf = cpf_norm
pedido = Order(
numero_pedido=numero_pedido,
user_id=user_id,
cpf=cpf_norm,
vehicle_id=vehicle.id,
modelo_veiculo=modelo_veiculo,
valor_veiculo=valor_veiculo,
status="Ativo",
)
db.add(pedido)
try:
db.commit()
db.refresh(pedido)
except (OperationalError, SQLAlchemyError) as exc:
db.rollback()
if not is_legacy_schema_issue(exc):
raise
db.execute(
text(
"INSERT INTO orders (numero_pedido, user_id, cpf, status) "
"VALUES (:numero_pedido, :user_id, :cpf, :status)"
),
{
"numero_pedido": numero_pedido,
"user_id": user_id,
"cpf": cpf_norm,
"status": "Ativo",
},
)
db.commit()
return {
"numero_pedido": numero_pedido,
"user_id": user_id,
"cpf": cpf_norm,
"vehicle_id": vehicle.id,
"modelo_veiculo": modelo_veiculo,
"status": "Ativo",
"status_veiculo": "Reservado",
"valor_veiculo": valor_veiculo,
"aprovado_credito": True,
}
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"cpf": pedido.cpf,
"vehicle_id": pedido.vehicle_id,
"modelo_veiculo": pedido.modelo_veiculo,
"status": pedido.status,
"status_veiculo": "Reservado",
"valor_veiculo": pedido.valor_veiculo,
"aprovado_credito": True,
}
finally:
db.close()

@ -0,0 +1,468 @@
import hashlib
import re
from datetime import datetime, timedelta, timezone
from typing import Any
from fastapi import HTTPException
from sqlalchemy import func
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import ReviewSchedule
from app.services.domain.common import parse_bool
from app.services.domain.tool_errors import build_tool_error, raise_tool_http_error
# Responsabilidade: tudo que é regra de revisão.
def _base_review_price_by_model(modelo: str) -> float:
text = (modelo or "").lower()
premium_brands = ("bmw", "audi", "mercedes", "volvo", "land rover", "lexus")
if any(brand in text for brand in premium_brands):
return 1200.0
if any(tag in text for tag in ("suv", "pickup", "caminhonete")):
return 900.0
if "sedan" in text:
return 750.0
if "hatch" in text:
return 650.0
return 700.0
def _calculate_review_price(
modelo: str,
ano: int,
km: int,
revisao_previa_concessionaria: bool,
) -> float:
base = _base_review_price_by_model(modelo)
ano_atual = datetime.now().year
idade = max(0, ano_atual - int(ano))
fator_idade = 1.0 + min(idade * 0.02, 0.30)
km_int = max(0, int(km))
if km_int <= 20000:
adicional_km = 0.0
elif km_int <= 60000:
adicional_km = 150.0
elif km_int <= 100000:
adicional_km = 300.0
else:
adicional_km = 500.0
subtotal = (base * fator_idade) + adicional_km
if revisao_previa_concessionaria:
subtotal *= 0.90
return round(max(subtotal, 300.0), 2)
def _parse_tzinfo(offset: str | None) -> timezone | None:
if not offset:
return None
if offset == "Z":
return timezone.utc
sign = 1 if offset[0] == "+" else -1
hours = int(offset[1:3])
minutes = int(offset[4:6])
return timezone(sign * timedelta(hours=hours, minutes=minutes))
def parse_review_datetime(value: str) -> datetime:
text = (value or "").strip()
if not text:
raise ValueError("data_hora vazia")
normalized = re.sub(r"\s+[aàáâã]s\s+", " ", text, flags=re.IGNORECASE)
for candidate in (text, normalized):
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError:
pass
patterns = (
r"^(?P<day>\d{1,2})[/-](?P<month>\d{1,2})[/-](?P<year>\d{4})\s+"
r"(?P<hour>\d{1,2}):(?P<minute>\d{2})(?::(?P<second>\d{2}))?"
r"(?:\s*(?P<tz>Z|[+-]\d{2}:\d{2}))?$",
r"^(?P<year>\d{4})[/-](?P<month>\d{1,2})[/-](?P<day>\d{1,2})\s+"
r"(?P<hour>\d{1,2}):(?P<minute>\d{2})(?::(?P<second>\d{2}))?"
r"(?:\s*(?P<tz>Z|[+-]\d{2}:\d{2}))?$",
)
for pattern in patterns:
match = re.match(pattern, normalized)
if not match:
continue
parts = match.groupdict()
return datetime(
year=int(parts["year"]),
month=int(parts["month"]),
day=int(parts["day"]),
hour=int(parts["hour"]),
minute=int(parts["minute"]),
second=int(parts["second"] or 0),
tzinfo=_parse_tzinfo(parts.get("tz")),
)
raise ValueError("formato invalido")
def _normalize_review_slot(value: datetime) -> datetime:
return value.replace(second=0, microsecond=0)
def _format_datetime_pt_br(value: datetime) -> str:
return value.strftime("%d/%m/%Y as %H:%M")
def _find_next_available_review_slot(
db,
requested_dt: datetime,
max_attempts: int = 16,
step_minutes: int = 30,
) -> datetime | None:
for attempt in range(1, max_attempts + 1):
candidate = requested_dt + timedelta(minutes=step_minutes * attempt)
ocupado = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.data_hora == candidate)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if not ocupado:
return candidate
return None
def build_review_conflict_detail(
requested_dt: datetime,
suggested_dt: datetime | None = None,
) -> dict[str, Any]:
message = (
f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado. "
f"Posso agendar em {_format_datetime_pt_br(suggested_dt)}."
if suggested_dt is not None
else (
f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado e nao encontrei "
"disponibilidade nas proximas 8 horas."
)
)
return build_tool_error(
code="review_schedule_conflict",
message=message,
retryable=True,
field="data_hora",
meta={
"requested_iso": requested_dt.isoformat(),
"suggested_iso": suggested_dt.isoformat() if suggested_dt is not None else None,
},
)
async def agendar_revisao(
placa: str,
data_hora: str,
modelo: str,
ano: int,
km: int,
revisao_previa_concessionaria: bool,
user_id: int | None = None,
) -> dict[str, Any]:
try:
ano_int = int(ano)
km_int = int(km)
except (TypeError, ValueError):
raise_tool_http_error(
status_code=400,
code="invalid_vehicle_data",
message="ano e km devem ser valores inteiros validos.",
retryable=True,
)
ano_atual = datetime.now().year
if ano_int < 1980 or ano_int > ano_atual + 1:
raise_tool_http_error(
status_code=400,
code="invalid_year",
message=f"ano invalido. Informe entre 1980 e {ano_atual + 1}.",
retryable=True,
field="ano",
)
if km_int < 0:
raise_tool_http_error(
status_code=400,
code="invalid_km",
message="km invalido. Informe um valor maior ou igual a zero.",
retryable=True,
field="km",
)
try:
dt = _normalize_review_slot(parse_review_datetime(data_hora))
except ValueError:
raise_tool_http_error(
status_code=400,
code="invalid_review_datetime",
message=(
"data_hora invalida. Exemplos aceitos: "
"2026-03-10T09:00:00-03:00, 2026-03-10 09:00, 10/03/2026 09:00, "
"10/03/2026 as 09:00."
),
retryable=True,
field="data_hora",
)
placa_normalizada = placa.upper()
revisao_previa = parse_bool(revisao_previa_concessionaria)
valor_revisao = _calculate_review_price(
modelo=modelo,
ano=ano_int,
km=km_int,
revisao_previa_concessionaria=revisao_previa,
)
dt_canonical = dt.isoformat()
entropy = hashlib.md5(f"{user_id}:{placa_normalizada}:{dt_canonical}".encode("utf-8")).hexdigest()[:8].upper()
protocolo = f"REV-{dt.strftime('%Y%m%d')}-{entropy}"
db = SessionMockLocal()
try:
conflito_horario = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.data_hora == dt)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if conflito_horario:
proximo_horario = _find_next_available_review_slot(db=db, requested_dt=dt)
raise HTTPException(
status_code=409,
detail=build_review_conflict_detail(
requested_dt=dt,
suggested_dt=proximo_horario,
),
)
existente = db.query(ReviewSchedule).filter(ReviewSchedule.protocolo == protocolo).first()
if existente:
return {
"protocolo": existente.protocolo,
"user_id": existente.user_id,
"placa": existente.placa,
"data_hora": existente.data_hora.isoformat(),
"status": existente.status,
"modelo": modelo,
"ano": ano_int,
"km": km_int,
"revisao_previa_concessionaria": revisao_previa,
"valor_revisao": valor_revisao,
}
agendamento = ReviewSchedule(
protocolo=protocolo,
user_id=user_id,
placa=placa_normalizada,
data_hora=dt,
status="agendado",
)
db.add(agendamento)
db.commit()
db.refresh(agendamento)
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
"modelo": modelo,
"ano": ano_int,
"km": km_int,
"revisao_previa_concessionaria": revisao_previa,
"valor_revisao": valor_revisao,
}
finally:
db.close()
async def listar_agendamentos_revisao(
user_id: int | None = None,
placa: str | None = None,
status: str | None = None,
limite: int | None = 20,
) -> list[dict[str, Any]]:
if user_id is None:
raise_tool_http_error(
status_code=400,
code="missing_user_id",
message="Informe user_id para listar seus agendamentos de revisao.",
retryable=False,
)
placa_normalizada = placa.upper().strip() if placa else None
status_normalizado = status.lower().strip() if status else None
try:
limite_int = int(limite) if limite is not None else 20
except (TypeError, ValueError):
limite_int = 20
limite_int = max(1, min(limite_int, 100))
db = SessionMockLocal()
try:
query = db.query(ReviewSchedule).filter(ReviewSchedule.user_id == user_id)
if placa_normalizada:
query = query.filter(ReviewSchedule.placa == placa_normalizada)
if status_normalizado:
query = query.filter(func.lower(ReviewSchedule.status) == status_normalizado)
agendamentos = query.order_by(ReviewSchedule.data_hora.asc()).limit(limite_int).all()
return [
{
"protocolo": row.protocolo,
"user_id": row.user_id,
"placa": row.placa,
"data_hora": row.data_hora.isoformat(),
"status": row.status,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
for row in agendamentos
]
finally:
db.close()
async def cancelar_agendamento_revisao(
protocolo: str,
motivo: str | None = None,
user_id: int | None = None,
) -> dict[str, Any]:
if user_id is None:
raise_tool_http_error(
status_code=400,
code="missing_user_id",
message="Informe user_id para cancelar seu agendamento de revisao.",
retryable=False,
)
db = SessionMockLocal()
try:
agendamento = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.protocolo == protocolo)
.filter(ReviewSchedule.user_id == user_id)
.first()
)
if not agendamento:
raise_tool_http_error(
status_code=404,
code="review_not_found",
message="Agendamento de revisao nao encontrado para este usuario.",
retryable=True,
field="protocolo",
)
if agendamento.status.lower() == "cancelado":
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
"motivo": motivo,
}
agendamento.status = "cancelado"
db.commit()
db.refresh(agendamento)
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
"motivo": motivo,
}
finally:
db.close()
async def editar_data_revisao(
protocolo: str,
nova_data_hora: str,
user_id: int | None = None,
) -> dict[str, Any]:
if user_id is None:
raise_tool_http_error(
status_code=400,
code="missing_user_id",
message="Informe user_id para editar seu agendamento de revisao.",
retryable=False,
)
try:
nova_data = _normalize_review_slot(parse_review_datetime(nova_data_hora))
except ValueError:
raise_tool_http_error(
status_code=400,
code="invalid_review_datetime",
message=(
"nova_data_hora invalida. Exemplos aceitos: "
"2026-03-10T09:00:00-03:00, 2026-03-10 09:00, 10/03/2026 09:00, "
"10/03/2026 as 09:00."
),
retryable=True,
field="nova_data_hora",
)
db = SessionMockLocal()
try:
agendamento = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.protocolo == protocolo)
.filter(ReviewSchedule.user_id == user_id)
.first()
)
if not agendamento:
raise_tool_http_error(
status_code=404,
code="review_not_found",
message="Agendamento de revisao nao encontrado para este usuario.",
retryable=True,
field="protocolo",
)
if agendamento.status.lower() == "cancelado":
raise_tool_http_error(
status_code=400,
code="review_already_cancelled",
message="Nao e possivel editar um agendamento ja cancelado.",
retryable=False,
)
conflito = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.id != agendamento.id)
.filter(ReviewSchedule.data_hora == nova_data)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if conflito:
proximo_horario = _find_next_available_review_slot(db=db, requested_dt=nova_data)
raise HTTPException(
status_code=409,
detail=build_review_conflict_detail(
requested_dt=nova_data,
suggested_dt=proximo_horario,
),
)
agendamento.data_hora = nova_data
db.commit()
db.refresh(agendamento)
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
}
finally:
db.close()

@ -0,0 +1,45 @@
from typing import Any
from fastapi import HTTPException
# Responsabilidade: padronizar erros do domínio
def build_tool_error(
*,
code: str,
message: str,
retryable: bool,
field: str | None = None,
meta: dict[str, Any] | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"code": code,
"message": message,
"retryable": retryable,
"field": field,
}
if isinstance(meta, dict):
payload.update(meta)
return payload
def raise_tool_http_error(
*,
status_code: int,
code: str,
message: str,
retryable: bool,
field: str | None = None,
meta: dict[str, Any] | None = None,
) -> None:
raise HTTPException(
status_code=status_code,
detail=build_tool_error(
code=code,
message=message,
retryable=retryable,
field=field,
meta=meta,
),
)

@ -1,768 +1,33 @@
from datetime import datetime, timedelta, timezone
import hashlib
import logging
import re
from uuid import uuid4
from typing import Any, Dict, List, Optional
from fastapi import HTTPException
from sqlalchemy import func
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.sql import text
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import Customer, Order, ReviewSchedule, User, Vehicle
from app.services.orchestration.technical_normalizer import normalize_cpf
from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf
# Nesse arquivo eu faço a normalização dos dados para persisti-los no DB
# Tambem ficam as tools mock que simulam regras de negocio do dominio.
logger = logging.getLogger(__name__)
def _parse_float(value: Any, default: float = 0.0) -> float:
"""Converte entradas numericas/textuais para float com fallback padrao."""
if value is None:
return default
if isinstance(value, (int, float)):
return float(value)
text = str(value).replace("R$", "").replace(" ", "")
text = text.replace(".", "").replace(",", ".") if "," in text else text
try:
return float(text)
except Exception:
return default
def _is_legacy_schema_issue(exc: Exception) -> bool:
lowered = str(exc).lower()
return (
"unknown column" in lowered
or "invalid column" in lowered
or "has no column named" in lowered
or "no such column" in lowered
or "column count doesn't match" in lowered
)
def _stable_int(seed_text: str) -> int:
"""Gera inteiro deterministico a partir de um texto usando hash SHA-256."""
digest = hashlib.sha256(seed_text.encode("utf-8")).hexdigest()
return int(digest[:16], 16)
def _parse_bool(value: Any, default: bool = False) -> bool:
"""Converte valores textuais/booleanos comuns para bool."""
if isinstance(value, bool):
return value
if value is None:
return default
text = str(value).strip().lower()
if text in {"true", "1", "sim", "yes", "y"}:
return True
if text in {"false", "0", "nao", "no", "n"}:
return False
return default
def _base_review_price_by_model(modelo: str) -> float:
"""Define valor base da revisao com heuristica simples pelo modelo/categoria textual."""
text = (modelo or "").lower()
premium_brands = ("bmw", "audi", "mercedes", "volvo", "land rover", "lexus")
if any(brand in text for brand in premium_brands):
return 1200.0
if any(tag in text for tag in ("suv", "pickup", "caminhonete")):
return 900.0
if "sedan" in text:
return 750.0
if "hatch" in text:
return 650.0
return 700.0
def _calculate_review_price(
modelo: str,
ano: int,
km: int,
revisao_previa_concessionaria: bool,
) -> float:
"""Calcula valor da revisao com base em modelo, idade, quilometragem e historico."""
base = _base_review_price_by_model(modelo)
ano_atual = datetime.now().year
idade = max(0, ano_atual - int(ano))
fator_idade = 1.0 + min(idade * 0.02, 0.30)
km_int = max(0, int(km))
if km_int <= 20000:
adicional_km = 0.0
elif km_int <= 60000:
adicional_km = 150.0
elif km_int <= 100000:
adicional_km = 300.0
else:
adicional_km = 500.0
subtotal = (base * fator_idade) + adicional_km
if revisao_previa_concessionaria:
subtotal *= 0.90
return round(max(subtotal, 300.0), 2)
async def consultar_estoque(
preco_max: Optional[float] = None,
categoria: Optional[str] = None,
ordenar_preco: Optional[str] = None,
limite: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Consulta veiculos no estoque com filtros opcionais e ordenacao por preco."""
db = SessionMockLocal()
try:
reserved_vehicle_ids = set()
try:
reserved_vehicle_ids = {
int(vehicle_id)
for (vehicle_id,) in (
db.query(Order.vehicle_id)
.filter(Order.vehicle_id.isnot(None))
.filter(Order.status != "Cancelado")
.all()
)
if vehicle_id is not None
}
except (OperationalError, SQLAlchemyError) as exc:
if not _is_legacy_schema_issue(exc):
raise
db.rollback()
logger.warning("Schema legado sem vehicle_id em orders; estoque nao filtrara reservas.")
query = db.query(Vehicle)
if reserved_vehicle_ids:
query = query.filter(~Vehicle.id.in_(reserved_vehicle_ids))
if preco_max is not None:
query = query.filter(Vehicle.preco <= preco_max)
if categoria:
query = query.filter(Vehicle.categoria == categoria.lower())
if ordenar_preco in ("asc", "desc"):
query = query.order_by(Vehicle.preco.asc() if ordenar_preco == "asc" else Vehicle.preco.desc())
if limite is not None:
try:
limite = max(1, int(limite))
query = query.limit(limite)
except (TypeError, ValueError):
pass
rows = query.all()
return [
{
"id": row.id,
"modelo": row.modelo,
"categoria": row.categoria,
"preco": _parse_float(row.preco),
}
for row in rows
]
finally:
db.close()
async def validar_cliente_venda(cpf: str, valor_veiculo: float) -> Dict[str, Any]:
"""Avalia aprovacao de compra com base em score, limite e restricao do cliente."""
cpf_norm = normalize_cpf(cpf)
db = SessionMockLocal()
try:
cliente = db.query(Customer).filter(Customer.cpf == cpf_norm).first()
if cliente:
score = int(cliente.score)
limite = _parse_float(cliente.limite_credito, 0.0)
restricao = bool(cliente.possui_restricao)
nome = cliente.nome
else:
entropy = _stable_int(cpf_norm)
score = int(300 + (entropy % 550))
limite = float(30000 + (entropy % 150000))
restricao = entropy % 7 == 0
nome = "Cliente Simulado"
aprovado = (not restricao) and (valor_veiculo <= limite)
return {
"aprovado": aprovado,
"cpf": cpf_norm,
"nome": nome,
"score": score,
"limite_credito": limite,
"possui_restricao": restricao,
"valor_veiculo": valor_veiculo,
}
finally:
db.close()
async def avaliar_veiculo_troca(modelo: str, ano: int, km: int) -> Dict[str, Any]:
"""Calcula valor estimado de troca usando depreciacao por ano e quilometragem."""
ano_atual = datetime.now().year
idade = max(0, ano_atual - ano)
base = 80000.0
valor = base * (0.85 ** idade) - (km * 0.03)
valor = max(5000.0, valor)
return {
"modelo": modelo,
"ano": ano,
"km": km,
"valor_estimado_troca": round(valor, 2),
}
def _parse_tzinfo(offset: Optional[str]) -> Optional[timezone]:
if not offset:
return None
if offset == "Z":
return timezone.utc
sign = 1 if offset[0] == "+" else -1
hours = int(offset[1:3])
minutes = int(offset[4:6])
return timezone(sign * timedelta(hours=hours, minutes=minutes))
def _parse_data_hora_revisao(value: str) -> datetime:
text = (value or "").strip()
if not text:
raise ValueError("data_hora vazia")
normalized = re.sub(r"\s+[aàáâã]s\s+", " ", text, flags=re.IGNORECASE)
iso_candidates = [text, normalized]
for candidate in iso_candidates:
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError:
pass
patterns = (
r"^(?P<day>\d{1,2})[/-](?P<month>\d{1,2})[/-](?P<year>\d{4})\s+"
r"(?P<hour>\d{1,2}):(?P<minute>\d{2})(?::(?P<second>\d{2}))?"
r"(?:\s*(?P<tz>Z|[+-]\d{2}:\d{2}))?$",
r"^(?P<year>\d{4})[/-](?P<month>\d{1,2})[/-](?P<day>\d{1,2})\s+"
r"(?P<hour>\d{1,2}):(?P<minute>\d{2})(?::(?P<second>\d{2}))?"
r"(?:\s*(?P<tz>Z|[+-]\d{2}:\d{2}))?$",
)
for pattern in patterns:
match = re.match(pattern, normalized)
if not match:
continue
parts = match.groupdict()
second = int(parts["second"] or 0)
tzinfo = _parse_tzinfo(parts.get("tz"))
return datetime(
year=int(parts["year"]),
month=int(parts["month"]),
day=int(parts["day"]),
hour=int(parts["hour"]),
minute=int(parts["minute"]),
second=second,
tzinfo=tzinfo,
)
raise ValueError("formato invalido")
def _normalize_review_slot(value: datetime) -> datetime:
"""Normaliza data/hora de revisao para granularidade de minuto."""
return value.replace(second=0, microsecond=0)
def _format_datetime_pt_br(value: datetime) -> str:
"""Formata datetime em padrao brasileiro para mensagens ao usuario."""
return value.strftime("%d/%m/%Y as %H:%M")
def _find_next_available_review_slot(
db,
requested_dt: datetime,
max_attempts: int = 16,
step_minutes: int = 30,
) -> Optional[datetime]:
"""
Procura o proximo horario livre avancando em blocos de 30 minutos.
Retorna None se nao encontrar dentro da janela de tentativa.
"""
for attempt in range(1, max_attempts + 1):
candidate = requested_dt + timedelta(minutes=step_minutes * attempt)
ocupado = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.data_hora == candidate)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if not ocupado:
return candidate
return None
def _build_review_conflict_detail(
requested_dt: datetime,
suggested_dt: Optional[datetime] = None,
) -> Dict[str, Any]:
if suggested_dt is not None:
return {
"code": "review_schedule_conflict",
"message": (
f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado. "
f"Posso agendar em {_format_datetime_pt_br(suggested_dt)}."
),
"requested_iso": requested_dt.isoformat(),
"suggested_iso": suggested_dt.isoformat(),
}
return {
"code": "review_schedule_conflict",
"message": (
f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado e nao encontrei "
"disponibilidade nas proximas 8 horas."
),
"requested_iso": requested_dt.isoformat(),
"suggested_iso": None,
}
async def agendar_revisao(
placa: str,
data_hora: str,
modelo: str,
ano: int,
km: int,
revisao_previa_concessionaria: bool,
user_id: Optional[int] = None,
) -> Dict[str, Any]:
"""Cria ou reaproveita agendamento de revisao a partir de placa e data/hora."""
try:
ano_int = int(ano)
km_int = int(km)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail="ano e km devem ser valores inteiros validos.")
ano_atual = datetime.now().year
if ano_int < 1980 or ano_int > ano_atual + 1:
raise HTTPException(status_code=400, detail=f"ano invalido. Informe entre 1980 e {ano_atual + 1}.")
if km_int < 0:
raise HTTPException(status_code=400, detail="km invalido. Informe um valor maior ou igual a zero.")
try:
dt = _parse_data_hora_revisao(data_hora)
except ValueError:
raise HTTPException(
status_code=400,
detail=(
"data_hora invalida. Exemplos aceitos: "
"2026-03-10T09:00:00-03:00, 2026-03-10 09:00, 10/03/2026 09:00, "
"10/03/2026 as 09:00."
),
)
dt = _normalize_review_slot(dt)
placa_normalizada = placa.upper()
revisao_previa = _parse_bool(revisao_previa_concessionaria)
valor_revisao = _calculate_review_price(
modelo=modelo,
ano=ano_int,
km=km_int,
revisao_previa_concessionaria=revisao_previa,
)
dt_canonical = dt.isoformat()
entropy = hashlib.md5(f"{user_id}:{placa_normalizada}:{dt_canonical}".encode("utf-8")).hexdigest()[:8].upper()
protocolo = f"REV-{dt.strftime('%Y%m%d')}-{entropy}"
db = SessionMockLocal()
try:
conflito_horario = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.data_hora == dt)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if conflito_horario:
proximo_horario = _find_next_available_review_slot(db=db, requested_dt=dt)
if proximo_horario:
raise HTTPException(
status_code=409,
detail=_build_review_conflict_detail(
requested_dt=dt,
suggested_dt=proximo_horario,
),
)
raise HTTPException(
status_code=409,
detail=_build_review_conflict_detail(requested_dt=dt),
)
existente = db.query(ReviewSchedule).filter(ReviewSchedule.protocolo == protocolo).first()
if existente:
return {
"protocolo": existente.protocolo,
"user_id": existente.user_id,
"placa": existente.placa,
"data_hora": existente.data_hora.isoformat(),
"status": existente.status,
"modelo": modelo,
"ano": ano_int,
"km": km_int,
"revisao_previa_concessionaria": revisao_previa,
"valor_revisao": valor_revisao,
}
agendamento = ReviewSchedule(
protocolo=protocolo,
user_id=user_id,
placa=placa_normalizada,
data_hora=dt,
status="agendado",
)
db.add(agendamento)
db.commit()
db.refresh(agendamento)
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
"modelo": modelo,
"ano": ano_int,
"km": km_int,
"revisao_previa_concessionaria": revisao_previa,
"valor_revisao": valor_revisao,
}
finally:
db.close()
async def listar_agendamentos_revisao(
user_id: Optional[int] = None,
placa: Optional[str] = None,
status: Optional[str] = None,
limite: Optional[int] = 20,
) -> List[Dict[str, Any]]:
"""Lista agendamentos de revisao do usuario autenticado com filtros opcionais."""
if user_id is None:
raise HTTPException(status_code=400, detail="Informe user_id para listar seus agendamentos de revisao.")
placa_normalizada = placa.upper().strip() if placa else None
status_normalizado = status.lower().strip() if status else None
try:
limite_int = int(limite) if limite is not None else 20
except (TypeError, ValueError):
limite_int = 20
limite_int = max(1, min(limite_int, 100))
db = SessionMockLocal()
try:
query = db.query(ReviewSchedule).filter(ReviewSchedule.user_id == user_id)
if placa_normalizada:
query = query.filter(ReviewSchedule.placa == placa_normalizada)
if status_normalizado:
query = query.filter(func.lower(ReviewSchedule.status) == status_normalizado)
agendamentos = (
query.order_by(ReviewSchedule.data_hora.asc())
.limit(limite_int)
.all()
)
return [
{
"protocolo": row.protocolo,
"user_id": row.user_id,
"placa": row.placa,
"data_hora": row.data_hora.isoformat(),
"status": row.status,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
for row in agendamentos
]
finally:
db.close()
async def cancelar_agendamento_revisao(
protocolo: str,
motivo: Optional[str] = None,
user_id: Optional[int] = None,
) -> Dict[str, Any]:
"""Cancela um agendamento de revisao existente pelo protocolo."""
if user_id is None:
raise HTTPException(status_code=400, detail="Informe user_id para cancelar seu agendamento de revisao.")
db = SessionMockLocal()
try:
agendamento = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.protocolo == protocolo)
.filter(ReviewSchedule.user_id == user_id)
.first()
)
if not agendamento:
raise HTTPException(status_code=404, detail="Agendamento de revisao nao encontrado para este usuario.")
if agendamento.status.lower() == "cancelado":
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
"motivo": motivo,
}
agendamento.status = "cancelado"
db.commit()
db.refresh(agendamento)
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
"motivo": motivo,
}
finally:
db.close()
async def editar_data_revisao(
protocolo: str,
nova_data_hora: str,
user_id: Optional[int] = None,
) -> Dict[str, Any]:
"""Edita a data/hora de um agendamento de revisao existente."""
if user_id is None:
raise HTTPException(status_code=400, detail="Informe user_id para editar seu agendamento de revisao.")
try:
nova_data = _normalize_review_slot(_parse_data_hora_revisao(nova_data_hora))
except ValueError:
raise HTTPException(
status_code=400,
detail=(
"nova_data_hora invalida. Exemplos aceitos: "
"2026-03-10T09:00:00-03:00, 2026-03-10 09:00, 10/03/2026 09:00, "
"10/03/2026 as 09:00."
),
)
db = SessionMockLocal()
try:
agendamento = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.protocolo == protocolo)
.filter(ReviewSchedule.user_id == user_id)
.first()
)
if not agendamento:
raise HTTPException(status_code=404, detail="Agendamento de revisao nao encontrado para este usuario.")
if agendamento.status.lower() == "cancelado":
raise HTTPException(status_code=400, detail="Nao e possivel editar um agendamento ja cancelado.")
conflito = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.id != agendamento.id)
.filter(ReviewSchedule.data_hora == nova_data)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if conflito:
proximo_horario = _find_next_available_review_slot(db=db, requested_dt=nova_data)
if proximo_horario:
raise HTTPException(
status_code=409,
detail=_build_review_conflict_detail(
requested_dt=nova_data,
suggested_dt=proximo_horario,
),
)
raise HTTPException(
status_code=409,
detail=_build_review_conflict_detail(requested_dt=nova_data),
)
agendamento.data_hora = nova_data
db.commit()
db.refresh(agendamento)
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
}
finally:
db.close()
async def cancelar_pedido(numero_pedido: str, motivo: str, user_id: Optional[int] = None) -> Dict[str, Any]:
"""Cancela pedido existente e registra motivo e data de cancelamento."""
db = SessionMockLocal()
try:
query = db.query(Order).filter(Order.numero_pedido == numero_pedido)
if user_id is not None:
query = query.filter(Order.user_id == user_id)
pedido = query.first()
if not pedido and user_id is not None:
# Compatibilidade com pedidos antigos que ainda nao possuem user_id.
legado = (
db.query(Order)
.filter(Order.numero_pedido == numero_pedido)
.filter(Order.user_id.is_(None))
.first()
)
if legado:
legado.user_id = user_id
db.commit()
db.refresh(legado)
pedido = legado
if not pedido:
if user_id is not None:
raise HTTPException(status_code=404, detail="Pedido nao encontrado para este usuario.")
raise HTTPException(status_code=404, detail="Pedido nao encontrado na base ficticia.")
if pedido.status.lower() == "cancelado":
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"status": pedido.status,
"motivo": pedido.motivo_cancelamento,
"data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None,
}
pedido.status = "Cancelado"
pedido.motivo_cancelamento = motivo
pedido.data_cancelamento = datetime.utcnow()
db.commit()
db.refresh(pedido)
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"status": pedido.status,
"motivo": pedido.motivo_cancelamento,
"data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None,
}
finally:
db.close()
async def realizar_pedido(cpf: str, vehicle_id: int, user_id: Optional[int] = None) -> Dict[str, Any]:
"""Cria um novo pedido de compra quando o cliente estiver aprovado para o veiculo selecionado."""
cpf_norm = normalize_cpf(cpf)
db = SessionMockLocal()
try:
vehicle = db.query(Vehicle).filter(Vehicle.id == vehicle_id).first()
if not vehicle:
raise HTTPException(status_code=404, detail="Veiculo nao encontrado no estoque.")
existing_order = None
try:
existing_order = (
db.query(Order)
.filter(Order.vehicle_id == vehicle_id)
.filter(Order.status != "Cancelado")
.first()
)
except (OperationalError, SQLAlchemyError) as exc:
if not _is_legacy_schema_issue(exc):
raise
db.rollback()
logger.warning("Schema legado sem vehicle_id em orders; reserva exclusiva de veiculo desativada.")
if existing_order:
raise HTTPException(
status_code=409,
detail="Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.",
)
valor_veiculo = float(vehicle.preco)
modelo_veiculo = str(vehicle.modelo)
await hydrate_mock_customer_from_cpf(cpf=cpf_norm, user_id=user_id)
avaliacao = await validar_cliente_venda(cpf=cpf_norm, valor_veiculo=valor_veiculo)
if not avaliacao.get("aprovado"):
raise HTTPException(
status_code=400,
detail=(
"Cliente nao aprovado para este valor. "
f"Limite disponivel: R$ {avaliacao.get('limite_credito', 0):.2f}."
),
)
numero_pedido = f"PED-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}"
if user_id is not None:
user = db.query(User).filter(User.id == user_id).first()
if user and user.cpf != cpf_norm:
user.cpf = cpf_norm
# Tenta gravar no schema novo; se a tabela ainda estiver
# no formato legado, cai para um insert minimo compativel.
pedido = Order(
numero_pedido=numero_pedido,
user_id=user_id,
cpf=cpf_norm,
vehicle_id=vehicle.id,
modelo_veiculo=modelo_veiculo,
valor_veiculo=valor_veiculo,
status="Ativo",
)
db.add(pedido)
try:
db.commit()
db.refresh(pedido)
except (OperationalError, SQLAlchemyError) as exc:
db.rollback()
legacy_schema_issue = _is_legacy_schema_issue(exc)
if not legacy_schema_issue:
raise
db.execute(
text(
"INSERT INTO orders (numero_pedido, user_id, cpf, status) "
"VALUES (:numero_pedido, :user_id, :cpf, :status)"
),
{
"numero_pedido": numero_pedido,
"user_id": user_id,
"cpf": cpf_norm,
"status": "Ativo",
},
)
db.commit()
return {
"numero_pedido": numero_pedido,
"user_id": user_id,
"cpf": cpf_norm,
"vehicle_id": vehicle.id,
"modelo_veiculo": modelo_veiculo,
"status": "Ativo",
"status_veiculo": "Reservado",
"valor_veiculo": valor_veiculo,
"aprovado_credito": True,
}
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"cpf": pedido.cpf,
"vehicle_id": pedido.vehicle_id,
"modelo_veiculo": pedido.modelo_veiculo,
"status": pedido.status,
"status_veiculo": "Reservado",
"valor_veiculo": pedido.valor_veiculo,
"aprovado_credito": True,
}
finally:
db.close()
from typing import Any
from app.services.domain.credit_service import validar_cliente_venda
from app.services.domain.inventory_service import avaliar_veiculo_troca, consultar_estoque
from app.services.domain.order_service import cancelar_pedido, realizar_pedido
from app.services.domain.review_service import (
agendar_revisao,
cancelar_agendamento_revisao,
editar_data_revisao,
listar_agendamentos_revisao,
parse_review_datetime,
)
# Camada de adaptacao das tools: mantem a API esperada pelo registry
# enquanto a regra de negocio fica em servicos de dominio separados.
# Ele praticamente só expõe as funções esperadas pelo ToolRegistry
def _parse_data_hora_revisao(value: str):
return parse_review_datetime(value)
__all__ = [
"_parse_data_hora_revisao",
"agendar_revisao",
"avaliar_veiculo_troca",
"cancelar_agendamento_revisao",
"cancelar_pedido",
"consultar_estoque",
"editar_data_revisao",
"listar_agendamentos_revisao",
"realizar_pedido",
"validar_cliente_venda",
]

Loading…
Cancel
Save