"""Administrative database wiring.

Builds the connection to the administrative database and exposes
``get_admin_db_session()``. That generator is what feeds the FastAPI
dependencies used by repositories and services.
"""

from collections.abc import Generator
from urllib.parse import quote

from sqlalchemy import create_engine
from sqlalchemy.orm import Session, declarative_base, sessionmaker

from admin_app.core.settings import get_admin_settings


def _escape_url_component(value: object) -> str:
    """Percent-encode *value* for use as the user or password part of a URL.

    Every reserved character is escaped (``safe=""``), so credentials that
    contain ``@``, ``:``, ``/``, ``?``, ``#`` or ``%`` no longer corrupt the
    connection URL. Values made only of unreserved characters are returned
    unchanged.
    """
    return quote(str(value), safe="")


settings = get_admin_settings()
admin_cloud_sql = settings.admin_db_cloud_sql_connection_name

# Credentials are escaped once here and shared by both URL variants.
# NOTE(review): this assumes the settings hold the raw (not already
# percent-encoded) user and password - confirm against the deployed config.
_admin_db_user = _escape_url_component(settings.admin_db_user)
_admin_db_password = _escape_url_component(settings.admin_db_password)

if admin_cloud_sql:
    # A Cloud SQL connection name is configured: connect through the unix
    # socket under /cloudsql/ instead of a host and port.
    ADMIN_DATABASE_URL = (
        f"mysql+pymysql://{_admin_db_user}:{_admin_db_password}@/{settings.admin_db_name}"
        f"?unix_socket=/cloudsql/{admin_cloud_sql}"
    )
else:
    # No Cloud SQL connection name: connect over TCP to the configured
    # host and port.
    ADMIN_DATABASE_URL = (
        f"mysql+pymysql://{_admin_db_user}:{_admin_db_password}@"
        f"{settings.admin_db_host}:{settings.admin_db_port}/{settings.admin_db_name}"
    )

admin_engine = create_engine(
    ADMIN_DATABASE_URL,
    pool_pre_ping=True,
    # Forwarded to the DBAPI connect call.
    connect_args={"connect_timeout": 5},
)

AdminSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=admin_engine,
)

AdminBase = declarative_base()


def get_admin_db_session() -> Generator[Session, None, None]:
    """Yield a session from ``AdminSessionLocal`` and close it afterwards.

    The session is closed in a ``finally`` clause, so it is released whether
    the consumer finishes normally or raises. This function neither commits
    nor rolls back; that is left to the caller.

    Yields:
        Session: a session bound to ``admin_engine``.
    """
    db = AdminSessionLocal()
    try:
        yield db
    finally:
        db.close()