You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/services/handlers.py

478 lines
16 KiB
Python

from datetime import datetime, timedelta, timezone
import hashlib
import re
from uuid import uuid4
from typing import Any, Dict, List, Optional
from fastapi import HTTPException
from sqlalchemy import func
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import Customer, Order, ReviewSchedule, Vehicle
# Nesse arquivo eu faço a limpeza dos dados para persisti-los no DB
def normalize_cpf(value: str) -> str:
"""Normaliza CPF removendo qualquer caractere nao numerico."""
return re.sub(r"\D", "", value or "")
def _parse_float(value: Any, default: float = 0.0) -> float:
"""Converte entradas numericas/textuais para float com fallback padrao."""
if value is None:
return default
if isinstance(value, (int, float)):
return float(value)
text = str(value).replace("R$", "").replace(" ", "")
text = text.replace(".", "").replace(",", ".") if "," in text else text
try:
return float(text)
except Exception:
return default
def _stable_int(seed_text: str) -> int:
"""Gera inteiro deterministico a partir de um texto usando hash SHA-256."""
digest = hashlib.sha256(seed_text.encode("utf-8")).hexdigest()
return int(digest[:16], 16)
def _parse_bool(value: Any, default: bool = False) -> bool:
"""Converte valores textuais/booleanos comuns para bool."""
if isinstance(value, bool):
return value
if value is None:
return default
text = str(value).strip().lower()
if text in {"true", "1", "sim", "yes", "y"}:
return True
if text in {"false", "0", "nao", "no", "n"}:
return False
return default
def _base_review_price_by_model(modelo: str) -> float:
"""Define valor base da revisao com heuristica simples pelo modelo/categoria textual."""
text = (modelo or "").lower()
premium_brands = ("bmw", "audi", "mercedes", "volvo", "land rover", "lexus")
if any(brand in text for brand in premium_brands):
return 1200.0
if any(tag in text for tag in ("suv", "pickup", "caminhonete")):
return 900.0
if "sedan" in text:
return 750.0
if "hatch" in text:
return 650.0
return 700.0
def _calculate_review_price(
modelo: str,
ano: int,
km: int,
revisao_previa_concessionaria: bool,
) -> float:
"""Calcula valor da revisao com base em modelo, idade, quilometragem e historico."""
base = _base_review_price_by_model(modelo)
ano_atual = datetime.now().year
idade = max(0, ano_atual - int(ano))
fator_idade = 1.0 + min(idade * 0.02, 0.30)
km_int = max(0, int(km))
if km_int <= 20000:
adicional_km = 0.0
elif km_int <= 60000:
adicional_km = 150.0
elif km_int <= 100000:
adicional_km = 300.0
else:
adicional_km = 500.0
subtotal = (base * fator_idade) + adicional_km
if revisao_previa_concessionaria:
subtotal *= 0.90
return round(max(subtotal, 300.0), 2)
async def consultar_estoque(
preco_max: Optional[float] = None,
categoria: Optional[str] = None,
ordenar_preco: Optional[str] = None,
limite: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Consulta veiculos no estoque com filtros opcionais e ordenacao por preco."""
db = SessionMockLocal()
try:
query = db.query(Vehicle)
if preco_max is not None:
query = query.filter(Vehicle.preco <= preco_max)
if categoria:
query = query.filter(Vehicle.categoria == categoria.lower())
if ordenar_preco in ("asc", "desc"):
query = query.order_by(Vehicle.preco.asc() if ordenar_preco == "asc" else Vehicle.preco.desc())
if limite is not None:
try:
limite = max(1, int(limite))
query = query.limit(limite)
except (TypeError, ValueError):
pass
rows = query.all()
return [
{
"id": row.id,
"modelo": row.modelo,
"categoria": row.categoria,
"preco": _parse_float(row.preco),
}
for row in rows
]
finally:
db.close()
async def validar_cliente_venda(cpf: str, valor_veiculo: float) -> Dict[str, Any]:
"""Avalia aprovacao de compra com base em score, limite e restricao do cliente."""
cpf_norm = normalize_cpf(cpf)
db = SessionMockLocal()
try:
cliente = db.query(Customer).filter(Customer.cpf == cpf_norm).first()
if cliente:
score = int(cliente.score)
limite = _parse_float(cliente.limite_credito, 0.0)
restricao = bool(cliente.possui_restricao)
nome = cliente.nome
else:
entropy = _stable_int(cpf_norm)
score = int(300 + (entropy % 550))
limite = float(30000 + (entropy % 150000))
restricao = entropy % 7 == 0
nome = "Cliente Simulado"
aprovado = (not restricao) and (valor_veiculo <= limite)
return {
"aprovado": aprovado,
"cpf": cpf_norm,
"nome": nome,
"score": score,
"limite_credito": limite,
"possui_restricao": restricao,
"valor_veiculo": valor_veiculo,
}
finally:
db.close()
async def avaliar_veiculo_troca(modelo: str, ano: int, km: int) -> Dict[str, Any]:
"""Calcula valor estimado de troca usando depreciacao por ano e quilometragem."""
ano_atual = datetime.now().year
idade = max(0, ano_atual - ano)
base = 80000.0
valor = base * (0.85 ** idade) - (km * 0.03)
valor = max(5000.0, valor)
return {
"modelo": modelo,
"ano": ano,
"km": km,
"valor_estimado_troca": round(valor, 2),
}
def _parse_tzinfo(offset: Optional[str]) -> Optional[timezone]:
if not offset:
return None
if offset == "Z":
return timezone.utc
sign = 1 if offset[0] == "+" else -1
hours = int(offset[1:3])
minutes = int(offset[4:6])
return timezone(sign * timedelta(hours=hours, minutes=minutes))
def _parse_data_hora_revisao(value: str) -> datetime:
text = (value or "").strip()
if not text:
raise ValueError("data_hora vazia")
normalized = re.sub(r"\s+[aA]s\s+", " ", text)
iso_candidates = [text, normalized]
for candidate in iso_candidates:
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError:
pass
patterns = (
r"^(?P<day>\d{1,2})[/-](?P<month>\d{1,2})[/-](?P<year>\d{4})\s+"
r"(?P<hour>\d{1,2}):(?P<minute>\d{2})(?::(?P<second>\d{2}))?"
r"(?:\s*(?P<tz>Z|[+-]\d{2}:\d{2}))?$",
r"^(?P<year>\d{4})[/-](?P<month>\d{1,2})[/-](?P<day>\d{1,2})\s+"
r"(?P<hour>\d{1,2}):(?P<minute>\d{2})(?::(?P<second>\d{2}))?"
r"(?:\s*(?P<tz>Z|[+-]\d{2}:\d{2}))?$",
)
for pattern in patterns:
match = re.match(pattern, normalized)
if not match:
continue
parts = match.groupdict()
second = int(parts["second"] or 0)
tzinfo = _parse_tzinfo(parts.get("tz"))
return datetime(
year=int(parts["year"]),
month=int(parts["month"]),
day=int(parts["day"]),
hour=int(parts["hour"]),
minute=int(parts["minute"]),
second=second,
tzinfo=tzinfo,
)
raise ValueError("formato invalido")
def _normalize_review_slot(value: datetime) -> datetime:
"""Normaliza data/hora de revisao para granularidade de minuto."""
return value.replace(second=0, microsecond=0)
def _format_datetime_pt_br(value: datetime) -> str:
"""Formata datetime em padrao brasileiro para mensagens ao usuario."""
return value.strftime("%d/%m/%Y as %H:%M")
def _find_next_available_review_slot(
db,
requested_dt: datetime,
max_attempts: int = 16,
step_minutes: int = 30,
) -> Optional[datetime]:
"""
Procura o proximo horario livre avancando em blocos de 30 minutos.
Retorna None se nao encontrar dentro da janela de tentativa.
"""
for attempt in range(1, max_attempts + 1):
candidate = requested_dt + timedelta(minutes=step_minutes * attempt)
ocupado = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.data_hora == candidate)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if not ocupado:
return candidate
return None
async def agendar_revisao(
placa: str,
data_hora: str,
modelo: str,
ano: int,
km: int,
revisao_previa_concessionaria: bool,
user_id: Optional[int] = None,
) -> Dict[str, Any]:
"""Cria ou reaproveita agendamento de revisao a partir de placa e data/hora."""
try:
ano_int = int(ano)
km_int = int(km)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail="ano e km devem ser valores inteiros validos.")
ano_atual = datetime.now().year
if ano_int < 1980 or ano_int > ano_atual + 1:
raise HTTPException(status_code=400, detail=f"ano invalido. Informe entre 1980 e {ano_atual + 1}.")
if km_int < 0:
raise HTTPException(status_code=400, detail="km invalido. Informe um valor maior ou igual a zero.")
try:
dt = _parse_data_hora_revisao(data_hora)
except ValueError:
raise HTTPException(
status_code=400,
detail=(
"data_hora invalida. Exemplos aceitos: "
"2026-03-10T09:00:00-03:00, 2026-03-10 09:00, 10/03/2026 09:00, "
"10/03/2026 as 09:00."
),
)
dt = _normalize_review_slot(dt)
placa_normalizada = placa.upper()
revisao_previa = _parse_bool(revisao_previa_concessionaria)
valor_revisao = _calculate_review_price(
modelo=modelo,
ano=ano_int,
km=km_int,
revisao_previa_concessionaria=revisao_previa,
)
dt_canonical = dt.isoformat()
entropy = hashlib.md5(f"{user_id}:{placa_normalizada}:{dt_canonical}".encode("utf-8")).hexdigest()[:8].upper()
protocolo = f"REV-{dt.strftime('%Y%m%d')}-{entropy}"
db = SessionMockLocal()
try:
conflito_horario = (
db.query(ReviewSchedule)
.filter(ReviewSchedule.data_hora == dt)
.filter(func.lower(ReviewSchedule.status) != "cancelado")
.first()
)
if conflito_horario:
proximo_horario = _find_next_available_review_slot(db=db, requested_dt=dt)
if proximo_horario:
raise HTTPException(
status_code=409,
detail=(
f"O horario {_format_datetime_pt_br(dt)} ja esta ocupado. "
f"Posso agendar em {_format_datetime_pt_br(proximo_horario)} "
f"(ISO: {proximo_horario.isoformat()})."
),
)
raise HTTPException(
status_code=409,
detail=(
f"O horario {_format_datetime_pt_br(dt)} ja esta ocupado e nao encontrei "
"disponibilidade nas proximas 8 horas."
),
)
existente = db.query(ReviewSchedule).filter(ReviewSchedule.protocolo == protocolo).first()
if existente:
return {
"protocolo": existente.protocolo,
"user_id": existente.user_id,
"placa": existente.placa,
"data_hora": existente.data_hora.isoformat(),
"status": existente.status,
"modelo": modelo,
"ano": ano_int,
"km": km_int,
"revisao_previa_concessionaria": revisao_previa,
"valor_revisao": valor_revisao,
}
agendamento = ReviewSchedule(
protocolo=protocolo,
user_id=user_id,
placa=placa_normalizada,
data_hora=dt,
status="agendado",
)
db.add(agendamento)
db.commit()
db.refresh(agendamento)
return {
"protocolo": agendamento.protocolo,
"user_id": agendamento.user_id,
"placa": agendamento.placa,
"data_hora": agendamento.data_hora.isoformat(),
"status": agendamento.status,
"modelo": modelo,
"ano": ano_int,
"km": km_int,
"revisao_previa_concessionaria": revisao_previa,
"valor_revisao": valor_revisao,
}
finally:
db.close()
async def cancelar_pedido(numero_pedido: str, motivo: str, user_id: Optional[int] = None) -> Dict[str, Any]:
"""Cancela pedido existente e registra motivo e data de cancelamento."""
db = SessionMockLocal()
try:
query = db.query(Order).filter(Order.numero_pedido == numero_pedido)
if user_id is not None:
query = query.filter(Order.user_id == user_id)
pedido = query.first()
if not pedido and user_id is not None:
# Compatibilidade com pedidos antigos que ainda nao possuem user_id.
legado = (
db.query(Order)
.filter(Order.numero_pedido == numero_pedido)
.filter(Order.user_id.is_(None))
.first()
)
if legado:
legado.user_id = user_id
db.commit()
db.refresh(legado)
pedido = legado
if not pedido:
if user_id is not None:
raise HTTPException(status_code=404, detail="Pedido nao encontrado para este usuario.")
raise HTTPException(status_code=404, detail="Pedido nao encontrado na base ficticia.")
if pedido.status.lower() == "cancelado":
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"status": pedido.status,
"motivo": pedido.motivo_cancelamento,
"data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None,
}
pedido.status = "Cancelado"
pedido.motivo_cancelamento = motivo
pedido.data_cancelamento = datetime.utcnow()
db.commit()
db.refresh(pedido)
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"status": pedido.status,
"motivo": pedido.motivo_cancelamento,
"data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None,
}
finally:
db.close()
async def realizar_pedido(cpf: str, valor_veiculo: float, user_id: Optional[int] = None) -> Dict[str, Any]:
"""Cria um novo pedido de compra quando o cliente estiver aprovado para o valor informado."""
cpf_norm = normalize_cpf(cpf)
avaliacao = await validar_cliente_venda(cpf=cpf_norm, valor_veiculo=valor_veiculo)
if not avaliacao.get("aprovado"):
raise HTTPException(
status_code=400,
detail=(
"Cliente nao aprovado para este valor. "
f"Limite disponivel: R$ {avaliacao.get('limite_credito', 0):.2f}."
),
)
numero_pedido = f"PED-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}"
db = SessionMockLocal()
try:
pedido = Order(
numero_pedido=numero_pedido,
user_id=user_id,
cpf=cpf_norm,
status="Ativo",
)
db.add(pedido)
db.commit()
db.refresh(pedido)
return {
"numero_pedido": pedido.numero_pedido,
"user_id": pedido.user_id,
"cpf": pedido.cpf,
"status": pedido.status,
"valor_veiculo": valor_veiculo,
"aprovado_credito": True,
}
finally:
db.close()