from fastapi import APIRouter, Depends, HTTPException, Request, Response, status

from admin_app.api.dependencies import (
    get_auth_service,
    get_current_panel_staff_context,
    get_settings,
)
from admin_app.api.panel_session import (
    clear_panel_auth_cookies,
    get_panel_access_cookie,
    get_panel_refresh_cookie,
    set_panel_auth_cookies,
)
from admin_app.api.schemas import (
    AdminAuthenticatedStaffResponse,
    AdminLoginRequest,
    AdminPanelLogoutResponse,
    AdminPanelWebSessionResponse,
)
from admin_app.core import AdminAuthenticatedSession, AdminSettings, AuthenticatedStaffContext
from admin_app.services import AuthService

# NOTE: the route handlers below are documented with comments rather than
# docstrings on purpose. FastAPI publishes a handler's docstring as the
# operation description in the generated OpenAPI schema, so adding one would
# change what the service emits at runtime.

router = APIRouter(prefix="/panel/auth", tags=["panel-auth"])


# POST /login
# Authenticates a staff member with the e-mail/password from the request body,
# forwarding the caller's IP address and user agent to the auth service.
# A `None` result from the auth service is answered with HTTP 401. On success
# the panel auth cookies are written to the response and the session payload
# is returned with a redirect to the prefixed "/panel/admin" path.
@router.post("/login", response_model=AdminPanelWebSessionResponse)
def panel_login(
    payload: AdminLoginRequest,
    request: Request,
    response: Response,
    settings: AdminSettings = Depends(get_settings),
    auth_service: AuthService = Depends(get_auth_service),
):
    client_ip, client_agent = _extract_request_metadata(request)
    authenticated = auth_service.login(
        email=payload.email,
        password=payload.password,
        ip_address=client_ip,
        user_agent=client_agent,
    )
    if authenticated is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Credenciais administrativas invalidas.",
        )

    set_panel_auth_cookies(response, authenticated, settings)
    landing_path = _build_prefixed_path(settings.admin_api_prefix, "/panel/admin")
    return _build_panel_session_response(
        session=authenticated,
        message="Sessao administrativa web iniciada.",
        redirect_to=landing_path,
    )


# POST /refresh
# Renews the panel session from the refresh cookie. A missing/empty cookie and
# a `None` result from the auth service are both answered with HTTP 401 (with
# distinct detail messages). On success the auth cookies are rewritten and the
# session payload is returned without a redirect target.
@router.post("/refresh", response_model=AdminPanelWebSessionResponse)
def panel_refresh(
    request: Request,
    response: Response,
    settings: AdminSettings = Depends(get_settings),
    auth_service: AuthService = Depends(get_auth_service),
):
    refresh_cookie = get_panel_refresh_cookie(request)
    if not refresh_cookie:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Sessao administrativa web sem refresh token.",
        )

    client_ip, client_agent = _extract_request_metadata(request)
    renewed = auth_service.refresh_session(
        refresh_token=refresh_cookie,
        ip_address=client_ip,
        user_agent=client_agent,
    )
    if renewed is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Sessao administrativa web invalida para refresh.",
        )

    set_panel_auth_cookies(response, renewed, settings)
    return _build_panel_session_response(
        session=renewed,
        message="Sessao administrativa web renovada.",
    )


# GET /session
# Describes the session already resolved by the
# `get_current_panel_staff_context` dependency. No cookies are touched here.
@router.get("/session", response_model=AdminPanelWebSessionResponse)
def panel_session(
    current_context: AuthenticatedStaffContext = Depends(get_current_panel_staff_context),
    settings: AdminSettings = Depends(get_settings),
):
    active_session_id = current_context.session_id
    # Derived from the configured access-token TTL (minutes -> seconds), not
    # read from the session itself.
    ttl_seconds = settings.admin_auth_access_token_ttl_minutes * 60
    staff_payload = AdminAuthenticatedStaffResponse(**current_context.principal.model_dump())
    return AdminPanelWebSessionResponse(
        service="orquestrador-admin",
        status="ok",
        message="Sessao administrativa web ativa.",
        session_id=active_session_id,
        expires_in_seconds=ttl_seconds,
        staff_account=staff_payload,
        redirect_to=None,
    )


# POST /logout
# Ends the panel session and always clears the auth cookies.
# The access cookie is tried first: if it resolves to an authenticated
# context, that session is logged out. Otherwise (no session id obtained) the
# refresh cookie, when present, is used as a fallback. The response carries
# the id of the session that was ended (possibly `None`) and a redirect to the
# prefixed "/login" path.
@router.post("/logout", response_model=AdminPanelLogoutResponse)
def panel_logout(
    request: Request,
    response: Response,
    settings: AdminSettings = Depends(get_settings),
    auth_service: AuthService = Depends(get_auth_service),
):
    client_ip, client_agent = _extract_request_metadata(request)
    ended_session_id: int | None = None

    access_cookie = get_panel_access_cookie(request)
    if access_cookie:
        # Only the token resolution is guarded: a ValueError here is treated
        # as "no usable access token", while errors from logout() propagate.
        try:
            staff_context = auth_service.get_authenticated_context(access_cookie)
        except ValueError:
            staff_context = None

        if staff_context is not None:
            auth_service.logout(
                staff_context.session_id,
                actor_staff_account_id=staff_context.principal.id,
                ip_address=client_ip,
                user_agent=client_agent,
            )
            ended_session_id = staff_context.session_id

    refresh_cookie = get_panel_refresh_cookie(request)
    if ended_session_id is None and refresh_cookie:
        ended_session_id = auth_service.logout_by_refresh_token(
            refresh_cookie,
            ip_address=client_ip,
            user_agent=client_agent,
        )

    clear_panel_auth_cookies(response, settings)
    login_path = _build_prefixed_path(settings.admin_api_prefix, "/login")
    return AdminPanelLogoutResponse(
        service="orquestrador-admin",
        status="ok",
        message="Sessao administrativa web encerrada.",
        session_id=ended_session_id,
        redirect_to=login_path,
    )


def _build_panel_session_response(
    session: AdminAuthenticatedSession,
    *,
    message: str,
    redirect_to: str | None = None,
) -> AdminPanelWebSessionResponse:
    """Wrap an authenticated session in the panel web-session response schema.

    Args:
        session: Session whose id, expiry and principal populate the response.
        message: Human-readable status message placed in the response.
        redirect_to: Optional path the client should navigate to next.

    Returns:
        The populated ``AdminPanelWebSessionResponse``.
    """
    staff_payload = AdminAuthenticatedStaffResponse(**session.principal.model_dump())
    return AdminPanelWebSessionResponse(
        service="orquestrador-admin",
        status="ok",
        message=message,
        session_id=session.session_id,
        expires_in_seconds=session.expires_in_seconds,
        staff_account=staff_payload,
        redirect_to=redirect_to,
    )


def _extract_request_metadata(request: Request) -> tuple[str | None, str | None]:
    """Return ``(ip_address, user_agent)`` for the incoming request.

    The IP address is ``None`` when the request has no client information;
    the user agent is ``None`` when the ``user-agent`` header is absent.
    """
    peer = request.client
    if peer:
        client_ip = peer.host
    else:
        client_ip = None
    return client_ip, request.headers.get("user-agent")


def _build_prefixed_path(api_prefix: str, path: str) -> str:
    """Join ``api_prefix`` and ``path`` into a single slash-separated path.

    Trailing slashes are stripped from the prefix and the path is given a
    leading slash when it lacks one, so an empty prefix yields just the
    normalized path and a ``"/"`` path yields ``"<prefix>/"``.
    """
    trimmed_prefix = api_prefix.rstrip("/")
    rooted_path = path if path.startswith("/") else f"/{path}"
    return f"{trimmed_prefix}{rooted_path}"