from datetime import datetime, timedelta, timezone import hashlib import re from uuid import uuid4 from typing import Any, Dict, List, Optional from fastapi import HTTPException from sqlalchemy import func from app.db.mock_database import SessionMockLocal from app.db.mock_models import Customer, Order, ReviewSchedule, User, Vehicle from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf # Nesse arquivo eu faço a normalização dos dados para persisti-los no DB def normalize_cpf(value: str) -> str: """Normaliza CPF removendo qualquer caractere nao numerico.""" return re.sub(r"\D", "", value or "") def _parse_float(value: Any, default: float = 0.0) -> float: """Converte entradas numericas/textuais para float com fallback padrao.""" if value is None: return default if isinstance(value, (int, float)): return float(value) text = str(value).replace("R$", "").replace(" ", "") text = text.replace(".", "").replace(",", ".") if "," in text else text try: return float(text) except Exception: return default def _stable_int(seed_text: str) -> int: """Gera inteiro deterministico a partir de um texto usando hash SHA-256.""" digest = hashlib.sha256(seed_text.encode("utf-8")).hexdigest() return int(digest[:16], 16) def _parse_bool(value: Any, default: bool = False) -> bool: """Converte valores textuais/booleanos comuns para bool.""" if isinstance(value, bool): return value if value is None: return default text = str(value).strip().lower() if text in {"true", "1", "sim", "yes", "y"}: return True if text in {"false", "0", "nao", "no", "n"}: return False return default def _base_review_price_by_model(modelo: str) -> float: """Define valor base da revisao com heuristica simples pelo modelo/categoria textual.""" text = (modelo or "").lower() premium_brands = ("bmw", "audi", "mercedes", "volvo", "land rover", "lexus") if any(brand in text for brand in premium_brands): return 1200.0 if any(tag in text for tag in ("suv", "pickup", "caminhonete")): return 900.0 if "sedan" in text: return 750.0 if "hatch" in text: return 650.0 return 700.0 def _calculate_review_price( modelo: str, ano: int, km: int, revisao_previa_concessionaria: bool, ) -> float: """Calcula valor da revisao com base em modelo, idade, quilometragem e historico.""" base = _base_review_price_by_model(modelo) ano_atual = datetime.now().year idade = max(0, ano_atual - int(ano)) fator_idade = 1.0 + min(idade * 0.02, 0.30) km_int = max(0, int(km)) if km_int <= 20000: adicional_km = 0.0 elif km_int <= 60000: adicional_km = 150.0 elif km_int <= 100000: adicional_km = 300.0 else: adicional_km = 500.0 subtotal = (base * fator_idade) + adicional_km if revisao_previa_concessionaria: subtotal *= 0.90 return round(max(subtotal, 300.0), 2) async def consultar_estoque( preco_max: Optional[float] = None, categoria: Optional[str] = None, ordenar_preco: Optional[str] = None, limite: Optional[int] = None, ) -> List[Dict[str, Any]]: """Consulta veiculos no estoque com filtros opcionais e ordenacao por preco.""" db = SessionMockLocal() try: query = db.query(Vehicle) if preco_max is not None: query = query.filter(Vehicle.preco <= preco_max) if categoria: query = query.filter(Vehicle.categoria == categoria.lower()) if ordenar_preco in ("asc", "desc"): query = query.order_by(Vehicle.preco.asc() if ordenar_preco == "asc" else Vehicle.preco.desc()) if limite is not None: try: limite = max(1, int(limite)) query = query.limit(limite) except (TypeError, ValueError): pass rows = query.all() return [ { "id": row.id, "modelo": row.modelo, "categoria": row.categoria, "preco": _parse_float(row.preco), } for row in rows ] finally: db.close() async def validar_cliente_venda(cpf: str, valor_veiculo: float) -> Dict[str, Any]: """Avalia aprovacao de compra com base em score, limite e restricao do cliente.""" cpf_norm = normalize_cpf(cpf) db = SessionMockLocal() try: cliente = db.query(Customer).filter(Customer.cpf == cpf_norm).first() if cliente: score = int(cliente.score) limite = _parse_float(cliente.limite_credito, 0.0) restricao = bool(cliente.possui_restricao) nome = cliente.nome else: entropy = _stable_int(cpf_norm) score = int(300 + (entropy % 550)) limite = float(30000 + (entropy % 150000)) restricao = entropy % 7 == 0 nome = "Cliente Simulado" aprovado = (not restricao) and (valor_veiculo <= limite) return { "aprovado": aprovado, "cpf": cpf_norm, "nome": nome, "score": score, "limite_credito": limite, "possui_restricao": restricao, "valor_veiculo": valor_veiculo, } finally: db.close() async def avaliar_veiculo_troca(modelo: str, ano: int, km: int) -> Dict[str, Any]: """Calcula valor estimado de troca usando depreciacao por ano e quilometragem.""" ano_atual = datetime.now().year idade = max(0, ano_atual - ano) base = 80000.0 valor = base * (0.85 ** idade) - (km * 0.03) valor = max(5000.0, valor) return { "modelo": modelo, "ano": ano, "km": km, "valor_estimado_troca": round(valor, 2), } def _parse_tzinfo(offset: Optional[str]) -> Optional[timezone]: if not offset: return None if offset == "Z": return timezone.utc sign = 1 if offset[0] == "+" else -1 hours = int(offset[1:3]) minutes = int(offset[4:6]) return timezone(sign * timedelta(hours=hours, minutes=minutes)) def _parse_data_hora_revisao(value: str) -> datetime: text = (value or "").strip() if not text: raise ValueError("data_hora vazia") normalized = re.sub(r"\s+[aA]s\s+", " ", text) iso_candidates = [text, normalized] for candidate in iso_candidates: try: return datetime.fromisoformat(candidate.replace("Z", "+00:00")) except ValueError: pass patterns = ( r"^(?P\d{1,2})[/-](?P\d{1,2})[/-](?P\d{4})\s+" r"(?P\d{1,2}):(?P\d{2})(?::(?P\d{2}))?" r"(?:\s*(?PZ|[+-]\d{2}:\d{2}))?$", r"^(?P\d{4})[/-](?P\d{1,2})[/-](?P\d{1,2})\s+" r"(?P\d{1,2}):(?P\d{2})(?::(?P\d{2}))?" r"(?:\s*(?PZ|[+-]\d{2}:\d{2}))?$", ) for pattern in patterns: match = re.match(pattern, normalized) if not match: continue parts = match.groupdict() second = int(parts["second"] or 0) tzinfo = _parse_tzinfo(parts.get("tz")) return datetime( year=int(parts["year"]), month=int(parts["month"]), day=int(parts["day"]), hour=int(parts["hour"]), minute=int(parts["minute"]), second=second, tzinfo=tzinfo, ) raise ValueError("formato invalido") def _normalize_review_slot(value: datetime) -> datetime: """Normaliza data/hora de revisao para granularidade de minuto.""" return value.replace(second=0, microsecond=0) def _format_datetime_pt_br(value: datetime) -> str: """Formata datetime em padrao brasileiro para mensagens ao usuario.""" return value.strftime("%d/%m/%Y as %H:%M") def _find_next_available_review_slot( db, requested_dt: datetime, max_attempts: int = 16, step_minutes: int = 30, ) -> Optional[datetime]: """ Procura o proximo horario livre avancando em blocos de 30 minutos. Retorna None se nao encontrar dentro da janela de tentativa. """ for attempt in range(1, max_attempts + 1): candidate = requested_dt + timedelta(minutes=step_minutes * attempt) ocupado = ( db.query(ReviewSchedule) .filter(ReviewSchedule.data_hora == candidate) .filter(func.lower(ReviewSchedule.status) != "cancelado") .first() ) if not ocupado: return candidate return None def _build_review_conflict_detail( requested_dt: datetime, suggested_dt: Optional[datetime] = None, ) -> Dict[str, Any]: if suggested_dt is not None: return { "code": "review_schedule_conflict", "message": ( f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado. " f"Posso agendar em {_format_datetime_pt_br(suggested_dt)}." ), "requested_iso": requested_dt.isoformat(), "suggested_iso": suggested_dt.isoformat(), } return { "code": "review_schedule_conflict", "message": ( f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado e nao encontrei " "disponibilidade nas proximas 8 horas." ), "requested_iso": requested_dt.isoformat(), "suggested_iso": None, } async def agendar_revisao( placa: str, data_hora: str, modelo: str, ano: int, km: int, revisao_previa_concessionaria: bool, user_id: Optional[int] = None, ) -> Dict[str, Any]: """Cria ou reaproveita agendamento de revisao a partir de placa e data/hora.""" try: ano_int = int(ano) km_int = int(km) except (TypeError, ValueError): raise HTTPException(status_code=400, detail="ano e km devem ser valores inteiros validos.") ano_atual = datetime.now().year if ano_int < 1980 or ano_int > ano_atual + 1: raise HTTPException(status_code=400, detail=f"ano invalido. Informe entre 1980 e {ano_atual + 1}.") if km_int < 0: raise HTTPException(status_code=400, detail="km invalido. Informe um valor maior ou igual a zero.") try: dt = _parse_data_hora_revisao(data_hora) except ValueError: raise HTTPException( status_code=400, detail=( "data_hora invalida. Exemplos aceitos: " "2026-03-10T09:00:00-03:00, 2026-03-10 09:00, 10/03/2026 09:00, " "10/03/2026 as 09:00." ), ) dt = _normalize_review_slot(dt) placa_normalizada = placa.upper() revisao_previa = _parse_bool(revisao_previa_concessionaria) valor_revisao = _calculate_review_price( modelo=modelo, ano=ano_int, km=km_int, revisao_previa_concessionaria=revisao_previa, ) dt_canonical = dt.isoformat() entropy = hashlib.md5(f"{user_id}:{placa_normalizada}:{dt_canonical}".encode("utf-8")).hexdigest()[:8].upper() protocolo = f"REV-{dt.strftime('%Y%m%d')}-{entropy}" db = SessionMockLocal() try: conflito_horario = ( db.query(ReviewSchedule) .filter(ReviewSchedule.data_hora == dt) .filter(func.lower(ReviewSchedule.status) != "cancelado") .first() ) if conflito_horario: proximo_horario = _find_next_available_review_slot(db=db, requested_dt=dt) if proximo_horario: raise HTTPException( status_code=409, detail=_build_review_conflict_detail( requested_dt=dt, suggested_dt=proximo_horario, ), ) raise HTTPException( status_code=409, detail=_build_review_conflict_detail(requested_dt=dt), ) existente = db.query(ReviewSchedule).filter(ReviewSchedule.protocolo == protocolo).first() if existente: return { "protocolo": existente.protocolo, "user_id": existente.user_id, "placa": existente.placa, "data_hora": existente.data_hora.isoformat(), "status": existente.status, "modelo": modelo, "ano": ano_int, "km": km_int, "revisao_previa_concessionaria": revisao_previa, "valor_revisao": valor_revisao, } agendamento = ReviewSchedule( protocolo=protocolo, user_id=user_id, placa=placa_normalizada, data_hora=dt, status="agendado", ) db.add(agendamento) db.commit() db.refresh(agendamento) return { "protocolo": agendamento.protocolo, "user_id": agendamento.user_id, "placa": agendamento.placa, "data_hora": agendamento.data_hora.isoformat(), "status": agendamento.status, "modelo": modelo, "ano": ano_int, "km": km_int, "revisao_previa_concessionaria": revisao_previa, "valor_revisao": valor_revisao, } finally: db.close() async def listar_agendamentos_revisao( user_id: Optional[int] = None, placa: Optional[str] = None, status: Optional[str] = None, limite: Optional[int] = 20, ) -> List[Dict[str, Any]]: """Lista agendamentos de revisao do usuario autenticado com filtros opcionais.""" if user_id is None: raise HTTPException(status_code=400, detail="Informe user_id para listar seus agendamentos de revisao.") placa_normalizada = placa.upper().strip() if placa else None status_normalizado = status.lower().strip() if status else None try: limite_int = int(limite) if limite is not None else 20 except (TypeError, ValueError): limite_int = 20 limite_int = max(1, min(limite_int, 100)) db = SessionMockLocal() try: query = db.query(ReviewSchedule).filter(ReviewSchedule.user_id == user_id) if placa_normalizada: query = query.filter(ReviewSchedule.placa == placa_normalizada) if status_normalizado: query = query.filter(func.lower(ReviewSchedule.status) == status_normalizado) agendamentos = ( query.order_by(ReviewSchedule.data_hora.asc()) .limit(limite_int) .all() ) return [ { "protocolo": row.protocolo, "user_id": row.user_id, "placa": row.placa, "data_hora": row.data_hora.isoformat(), "status": row.status, "created_at": row.created_at.isoformat() if row.created_at else None, } for row in agendamentos ] finally: db.close() async def cancelar_agendamento_revisao( protocolo: str, motivo: Optional[str] = None, user_id: Optional[int] = None, ) -> Dict[str, Any]: """Cancela um agendamento de revisao existente pelo protocolo.""" if user_id is None: raise HTTPException(status_code=400, detail="Informe user_id para cancelar seu agendamento de revisao.") db = SessionMockLocal() try: agendamento = ( db.query(ReviewSchedule) .filter(ReviewSchedule.protocolo == protocolo) .filter(ReviewSchedule.user_id == user_id) .first() ) if not agendamento: raise HTTPException(status_code=404, detail="Agendamento de revisao nao encontrado para este usuario.") if agendamento.status.lower() == "cancelado": return { "protocolo": agendamento.protocolo, "user_id": agendamento.user_id, "placa": agendamento.placa, "data_hora": agendamento.data_hora.isoformat(), "status": agendamento.status, "motivo": motivo, } agendamento.status = "cancelado" db.commit() db.refresh(agendamento) return { "protocolo": agendamento.protocolo, "user_id": agendamento.user_id, "placa": agendamento.placa, "data_hora": agendamento.data_hora.isoformat(), "status": agendamento.status, "motivo": motivo, } finally: db.close() async def editar_data_revisao( protocolo: str, nova_data_hora: str, user_id: Optional[int] = None, ) -> Dict[str, Any]: """Edita a data/hora de um agendamento de revisao existente.""" if user_id is None: raise HTTPException(status_code=400, detail="Informe user_id para editar seu agendamento de revisao.") try: nova_data = _normalize_review_slot(_parse_data_hora_revisao(nova_data_hora)) except ValueError: raise HTTPException( status_code=400, detail=( "nova_data_hora invalida. Exemplos aceitos: " "2026-03-10T09:00:00-03:00, 2026-03-10 09:00, 10/03/2026 09:00, " "10/03/2026 as 09:00." ), ) db = SessionMockLocal() try: agendamento = ( db.query(ReviewSchedule) .filter(ReviewSchedule.protocolo == protocolo) .filter(ReviewSchedule.user_id == user_id) .first() ) if not agendamento: raise HTTPException(status_code=404, detail="Agendamento de revisao nao encontrado para este usuario.") if agendamento.status.lower() == "cancelado": raise HTTPException(status_code=400, detail="Nao e possivel editar um agendamento ja cancelado.") conflito = ( db.query(ReviewSchedule) .filter(ReviewSchedule.id != agendamento.id) .filter(ReviewSchedule.data_hora == nova_data) .filter(func.lower(ReviewSchedule.status) != "cancelado") .first() ) if conflito: proximo_horario = _find_next_available_review_slot(db=db, requested_dt=nova_data) if proximo_horario: raise HTTPException( status_code=409, detail=_build_review_conflict_detail( requested_dt=nova_data, suggested_dt=proximo_horario, ), ) raise HTTPException( status_code=409, detail=_build_review_conflict_detail(requested_dt=nova_data), ) agendamento.data_hora = nova_data db.commit() db.refresh(agendamento) return { "protocolo": agendamento.protocolo, "user_id": agendamento.user_id, "placa": agendamento.placa, "data_hora": agendamento.data_hora.isoformat(), "status": agendamento.status, } finally: db.close() async def cancelar_pedido(numero_pedido: str, motivo: str, user_id: Optional[int] = None) -> Dict[str, Any]: """Cancela pedido existente e registra motivo e data de cancelamento.""" db = SessionMockLocal() try: query = db.query(Order).filter(Order.numero_pedido == numero_pedido) if user_id is not None: query = query.filter(Order.user_id == user_id) pedido = query.first() if not pedido and user_id is not None: # Compatibilidade com pedidos antigos que ainda nao possuem user_id. legado = ( db.query(Order) .filter(Order.numero_pedido == numero_pedido) .filter(Order.user_id.is_(None)) .first() ) if legado: legado.user_id = user_id db.commit() db.refresh(legado) pedido = legado if not pedido: if user_id is not None: raise HTTPException(status_code=404, detail="Pedido nao encontrado para este usuario.") raise HTTPException(status_code=404, detail="Pedido nao encontrado na base ficticia.") if pedido.status.lower() == "cancelado": return { "numero_pedido": pedido.numero_pedido, "user_id": pedido.user_id, "status": pedido.status, "motivo": pedido.motivo_cancelamento, "data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None, } pedido.status = "Cancelado" pedido.motivo_cancelamento = motivo pedido.data_cancelamento = datetime.utcnow() db.commit() db.refresh(pedido) return { "numero_pedido": pedido.numero_pedido, "user_id": pedido.user_id, "status": pedido.status, "motivo": pedido.motivo_cancelamento, "data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None, } finally: db.close() async def realizar_pedido(cpf: str, valor_veiculo: float, user_id: Optional[int] = None) -> Dict[str, Any]: """Cria um novo pedido de compra quando o cliente estiver aprovado para o valor informado.""" cpf_norm = normalize_cpf(cpf) await hydrate_mock_customer_from_cpf(cpf=cpf_norm, user_id=user_id) avaliacao = await validar_cliente_venda(cpf=cpf_norm, valor_veiculo=valor_veiculo) if not avaliacao.get("aprovado"): raise HTTPException( status_code=400, detail=( "Cliente nao aprovado para este valor. " f"Limite disponivel: R$ {avaliacao.get('limite_credito', 0):.2f}." ), ) numero_pedido = f"PED-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}" db = SessionMockLocal() try: if user_id is not None: user = db.query(User).filter(User.id == user_id).first() if user and user.cpf != cpf_norm: user.cpf = cpf_norm pedido = Order( numero_pedido=numero_pedido, user_id=user_id, cpf=cpf_norm, status="Ativo", ) db.add(pedido) db.commit() db.refresh(pedido) return { "numero_pedido": pedido.numero_pedido, "user_id": pedido.user_id, "cpf": pedido.cpf, "status": pedido.status, "valor_veiculo": valor_veiculo, "aprovado_credito": True, } finally: db.close()