import threading
from collections.abc import Callable

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from admin_app.api.panel_session import get_panel_access_cookie
from admin_app.core import (
    AdminSecurityService,
    AdminSettings,
    AuthenticatedStaffContext,
    AuthenticatedStaffPrincipal,
    get_admin_settings,
)
from admin_app.db.database import get_admin_db_session
from admin_app.repositories import (
    AuditLogRepository,
    StaffAccountRepository,
    StaffSessionRepository,
    ToolArtifactRepository,
    ToolDraftRepository,
    ToolMetadataRepository,
    ToolVersionRepository,
)
from admin_app.services import (
    AuditService,
    AuthService,
    CollaboratorManagementService,
    ToolGenerationService,
    ToolGenerationWorkerService,
    ToolManagementService,
)
from shared.contracts import AdminPermission, StaffRole, permissions_for_role, role_has_permission, role_includes

# Dependency providers: inject services, repositories and settings.

# auto_error=False so a missing Authorization header reaches
# get_current_staff_context, which raises its own 401 with a custom detail.
bearer_scheme = HTTPBearer(auto_error=False)

# Process-wide worker service cache, guarded by the lock below. The config
# tuple records the settings values the cached worker was built with.
_tool_generation_worker_lock = threading.Lock()
_tool_generation_worker_service: ToolGenerationWorkerService | None = None
_tool_generation_worker_config: tuple[int, str, str, int, int, float] | None = None


def get_settings(request: Request) -> AdminSettings:
    """Return the admin settings for the current request.

    Prefers the ``admin_settings`` object stored on ``request.app.state`` when
    it is an ``AdminSettings`` instance; otherwise falls back to
    ``get_admin_settings()``.
    """
    app_settings = getattr(request.app.state, "admin_settings", None)
    if isinstance(app_settings, AdminSettings):
        return app_settings
    return get_admin_settings()


def get_admin_db(db: Session = Depends(get_admin_db_session)) -> Session:
    """Return the admin database session resolved by ``get_admin_db_session``."""
    return db


def get_security_service(settings: AdminSettings = Depends(get_settings)) -> AdminSecurityService:
    """Build an ``AdminSecurityService`` from the resolved settings."""
    return AdminSecurityService(settings)


def get_staff_account_repository(db: Session = Depends(get_admin_db)) -> StaffAccountRepository:
    """Build a ``StaffAccountRepository`` bound to the admin DB session."""
    return StaffAccountRepository(db)


def get_staff_session_repository(db: Session = Depends(get_admin_db)) -> StaffSessionRepository:
    """Build a ``StaffSessionRepository`` bound to the admin DB session."""
    return StaffSessionRepository(db)


def get_audit_log_repository(db: Session = Depends(get_admin_db)) -> AuditLogRepository:
    """Build an ``AuditLogRepository`` bound to the admin DB session."""
    return AuditLogRepository(db)


def get_tool_draft_repository(db: Session = Depends(get_admin_db)) -> ToolDraftRepository:
    """Build a ``ToolDraftRepository`` bound to the admin DB session."""
    return ToolDraftRepository(db)


def get_tool_version_repository(db: Session = Depends(get_admin_db)) -> ToolVersionRepository:
    """Build a ``ToolVersionRepository`` bound to the admin DB session."""
    return ToolVersionRepository(db)


def get_tool_metadata_repository(db: Session = Depends(get_admin_db)) -> ToolMetadataRepository:
    """Build a ``ToolMetadataRepository`` bound to the admin DB session."""
    return ToolMetadataRepository(db)


def get_tool_artifact_repository(db: Session = Depends(get_admin_db)) -> ToolArtifactRepository:
    """Build a ``ToolArtifactRepository`` bound to the admin DB session."""
    return ToolArtifactRepository(db)


def get_audit_service(
    repository: AuditLogRepository = Depends(get_audit_log_repository),
) -> AuditService:
    """Build an ``AuditService`` on top of the audit log repository."""
    return AuditService(repository)


def get_auth_service(
    account_repository: StaffAccountRepository = Depends(get_staff_account_repository),
    session_repository: StaffSessionRepository = Depends(get_staff_session_repository),
    security_service: AdminSecurityService = Depends(get_security_service),
    audit_service: AuditService = Depends(get_audit_service),
) -> AuthService:
    """Assemble the ``AuthService`` from its repositories and helper services."""
    return AuthService(
        account_repository=account_repository,
        session_repository=session_repository,
        security_service=security_service,
        audit_service=audit_service,
    )


def get_collaborator_management_service(
    account_repository: StaffAccountRepository = Depends(get_staff_account_repository),
    security_service: AdminSecurityService = Depends(get_security_service),
    audit_service: AuditService = Depends(get_audit_service),
) -> CollaboratorManagementService:
    """Assemble the ``CollaboratorManagementService`` from its collaborators."""
    return CollaboratorManagementService(
        account_repository=account_repository,
        security_service=security_service,
        audit_service=audit_service,
    )


def get_tool_generation_service(
    settings: AdminSettings = Depends(get_settings),
) -> ToolGenerationService:
    """Instantiate the isolated LLM-based generation service of the admin runtime.

    Completely separate from the product's LLMService
    (app.services.ai.llm_service).
    Uses the admin_tool_generation_model / admin_tool_generation_fallback_model
    settings.
    Mapped to the tool_generation_runtime_profile of the
    model_runtime_separation contract.
    """
    return ToolGenerationService(settings)


def get_tool_generation_worker_service(
    settings: AdminSettings = Depends(get_settings),
) -> ToolGenerationWorkerService:
    """Return the shared tool-generation worker, rebuilding it on config change.

    A single ``ToolGenerationWorkerService`` is cached at module level. It is
    (re)created when none exists yet or when any of the settings captured in
    the config tuple differ from those the cached instance was built with; in
    the latter case the previous instance is shut down with ``wait=False``.

    Raises:
        Whatever ``ToolGenerationWorkerService(settings)`` or ``shutdown``
        raise. In that case no stale instance is left cached, so the next call
        attempts a fresh construction.
    """
    global _tool_generation_worker_service, _tool_generation_worker_config

    config = (
        int(settings.admin_tool_generation_worker_max_workers),
        str(settings.admin_tool_generation_model),
        str(settings.admin_tool_generation_fallback_model),
        int(settings.admin_tool_generation_timeout_seconds),
        int(settings.admin_tool_generation_max_output_tokens),
        float(settings.admin_tool_generation_temperature),
    )
    with _tool_generation_worker_lock:
        service = _tool_generation_worker_service
        if service is None or _tool_generation_worker_config != config:
            if service is not None:
                # Forget the old worker *before* retiring it: if shutdown or
                # the constructor below raises, the cache must not keep
                # pointing at an already shut-down instance that a later
                # request with the old config would otherwise receive.
                _tool_generation_worker_service = None
                _tool_generation_worker_config = None
                service.shutdown(wait=False)
            service = ToolGenerationWorkerService(settings)
            _tool_generation_worker_service = service
            _tool_generation_worker_config = config
        return service


def get_tool_management_service(
    settings: AdminSettings = Depends(get_settings),
    draft_repository: ToolDraftRepository = Depends(get_tool_draft_repository),
    version_repository: ToolVersionRepository = Depends(get_tool_version_repository),
    metadata_repository: ToolMetadataRepository = Depends(get_tool_metadata_repository),
    artifact_repository: ToolArtifactRepository = Depends(get_tool_artifact_repository),
    tool_generation_service: ToolGenerationService = Depends(get_tool_generation_service),
    tool_generation_worker_service: ToolGenerationWorkerService = Depends(get_tool_generation_worker_service),
) -> ToolManagementService:
    """Assemble the ``ToolManagementService`` from settings, repositories and generation services."""
    return ToolManagementService(
        settings=settings,
        draft_repository=draft_repository,
        version_repository=version_repository,
        metadata_repository=metadata_repository,
        artifact_repository=artifact_repository,
        tool_generation_service=tool_generation_service,
        tool_generation_worker_service=tool_generation_worker_service,
    )


def get_current_staff_context(
    credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
    auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext:
    """Authenticate an API request from its ``Authorization: Bearer`` header.

    Raises:
        HTTPException: 401 when no bearer credentials are present, or when
            ``auth_service.get_authenticated_context`` rejects the token with
            a ``ValueError``.
    """
    if credentials is None or credentials.scheme.lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Autenticacao administrativa obrigatoria.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    try:
        return auth_service.get_authenticated_context(credentials.credentials)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token administrativo invalido.",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc


def get_current_panel_staff_context(
    request: Request,
    auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext:
    """Authenticate a web-panel request from the panel access cookie.

    Raises:
        HTTPException: 401 when the cookie is missing/empty, or when
            ``auth_service.get_authenticated_context`` rejects the token with
            a ``ValueError``.
    """
    access_token = get_panel_access_cookie(request)
    if not access_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Sessao administrativa web obrigatoria.",
        )
    try:
        return auth_service.get_authenticated_context(access_token)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Sessao administrativa web invalida.",
        ) from exc


def get_optional_panel_staff_context(
    request: Request,
    auth_service: AuthService = Depends(get_auth_service),
) -> AuthenticatedStaffContext | None:
    """Like ``get_current_panel_staff_context`` but return ``None`` instead of raising.

    ``None`` is returned both when the panel cookie is absent and when the
    token is rejected with a ``ValueError``.
    """
    access_token = get_panel_access_cookie(request)
    if not access_token:
        return None
    try:
        return auth_service.get_authenticated_context(access_token)
    except ValueError:
        return None


def get_current_staff_principal(
    context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> AuthenticatedStaffPrincipal:
    """Return the principal of the bearer-authenticated staff context."""
    return context.principal


def get_current_panel_staff_principal(
    context: AuthenticatedStaffContext = Depends(get_current_panel_staff_context),
) -> AuthenticatedStaffPrincipal:
    """Return the principal of the cookie-authenticated panel context."""
    return context.principal


def get_optional_panel_staff_principal(
    context: AuthenticatedStaffContext | None = Depends(get_optional_panel_staff_context),
) -> AuthenticatedStaffPrincipal | None:
    """Return the panel principal, or ``None`` when there is no valid panel session."""
    if context is None:
        return None
    return context.principal


def get_current_staff_session_id(
    context: AuthenticatedStaffContext = Depends(get_current_staff_context),
) -> int:
    """Return the session id of the bearer-authenticated staff context."""
    return context.session_id


def get_current_panel_staff_session_id(
    context: AuthenticatedStaffContext = Depends(get_current_panel_staff_context),
) -> int:
    """Return the session id of the cookie-authenticated panel context."""
    return context.session_id


def _ensure_permission(
    staff: AuthenticatedStaffPrincipal,
    permission: AdminPermission,
) -> AuthenticatedStaffPrincipal:
    """Return ``staff`` if its role grants ``permission``; otherwise raise 403."""
    if not role_has_permission(staff.role, permission):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Permissao administrativa insuficiente: '{permission.value}'.",
        )
    return staff


def require_staff_role(minimum_role: StaffRole) -> Callable[..., AuthenticatedStaffPrincipal]:
    """Build a dependency that enforces a minimum staff role on API requests.

    The returned dependency yields the authenticated principal, or raises a
    403 ``HTTPException`` when ``role_includes`` reports that the principal's
    role does not include ``minimum_role``.
    """

    def dependency(
        current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
    ) -> AuthenticatedStaffPrincipal:
        if not role_includes(current_staff.role, minimum_role):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Acesso administrativo requer papel minimo '{minimum_role.value}'.",
            )
        return current_staff

    return dependency


def require_admin_permission(permission: AdminPermission) -> Callable[..., AuthenticatedStaffPrincipal]:
    """Build a dependency that enforces ``permission`` on bearer-authenticated requests.

    The returned dependency yields the authenticated principal, or raises a
    403 ``HTTPException`` when the principal's role lacks the permission.
    """

    def dependency(
        current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
    ) -> AuthenticatedStaffPrincipal:
        return _ensure_permission(current_staff, permission)

    return dependency


def require_panel_admin_permission(permission: AdminPermission) -> Callable[..., AuthenticatedStaffPrincipal]:
    """Build a dependency that enforces ``permission`` on cookie-authenticated panel requests.

    The returned dependency yields the authenticated principal, or raises a
    403 ``HTTPException`` when the principal's role lacks the permission.
    """

    def dependency(
        current_staff: AuthenticatedStaffPrincipal = Depends(get_current_panel_staff_principal),
    ) -> AuthenticatedStaffPrincipal:
        return _ensure_permission(current_staff, permission)

    return dependency


def get_current_staff_permissions(
    current_staff: AuthenticatedStaffPrincipal = Depends(get_current_staff_principal),
) -> tuple[str, ...]:
    """Return the ``.value`` of every permission granted to the current staff role."""
    return tuple(permission.value for permission in permissions_for_role(current_staff.role))