You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
orquestrador/app/services/fakerapi_client.py

58 lines
2.0 KiB
Python

from typing import Any, Dict, List, Optional
import httpx
from app.core.settings import settings
class FakerApiClient:
    """Small async HTTP client that fetches generated records from a FakerAPI-style service.

    Constructor arguments that are left as ``None`` fall back to the
    corresponding values on the application ``settings`` object.
    """

    # Upper bound applied to ``_quantity`` on the single retry that follows a
    # read timeout.
    _RETRY_MAX_QUANTITY = 20

    def __init__(
        self,
        base_url: Optional[str] = None,
        locale: Optional[str] = None,
        seed: Optional[int] = None,
    ):
        """Store the connection defaults used by every request.

        Args:
            base_url: Root URL of the API; trailing slashes are removed.
                Falls back to ``settings.fakerapi_base_url`` when falsy.
            locale: Value sent as ``_locale``. Falls back to
                ``settings.fakerapi_locale`` when falsy.
            seed: Value sent as ``_seed``. Falls back to
                ``settings.fakerapi_seed`` only when ``None``, so an explicit
                ``0`` is honoured.
        """
        self.base_url = (base_url or settings.fakerapi_base_url).rstrip("/")
        self.locale = locale or settings.fakerapi_locale
        self.seed = settings.fakerapi_seed if seed is None else seed

    def _build_params(
        self,
        quantity: int,
        extra_params: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Return the query parameters for a request, with caller overrides applied last."""
        params: Dict[str, Any] = {
            "_quantity": quantity,
            "_locale": self.locale,
        }
        # httpx encodes a None query value as an empty string, which would put
        # a bare `_seed=` on the URL. Leave the key out when no seed is set.
        if self.seed is not None:
            params["_seed"] = self.seed
        if extra_params:
            params.update(extra_params)
        return params

    @staticmethod
    def _extract_items(payload: Any) -> List[Dict[str, Any]]:
        """Return the record list from a decoded JSON payload, or ``[]`` for any other shape."""
        if isinstance(payload, dict) and isinstance(payload.get("data"), list):
            return payload["data"]
        if isinstance(payload, list):
            return payload
        return []

    async def fetch_resource(
        self,
        resource: str,
        quantity: int,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch ``quantity`` records of ``resource`` and return them as a list.

        Args:
            resource: Path of the resource relative to ``base_url``; leading
                slashes are ignored.
            quantity: Number of records requested via ``_quantity``.
            extra_params: Additional query parameters. They are merged after
                the defaults, so they can override ``_quantity``, ``_locale``
                and ``_seed``.

        Returns:
            The list under the payload's ``"data"`` key, or the payload itself
            when it is already a list. Any other payload shape yields ``[]``.

        Raises:
            httpx.HTTPStatusError: The response had an error status code.
            httpx.ReadTimeout: The retry attempt also timed out.
            ValueError: The response body was not valid JSON.
        """
        url = f"{self.base_url}/{resource.lstrip('/')}"
        params = self._build_params(quantity, extra_params)
        timeout = httpx.Timeout(connect=5.0, read=15.0, write=10.0, pool=5.0)
        headers = {
            "Accept": "application/json",
            "User-Agent": "orquestrador-fakerapi-client/1.0",
        }
        async with httpx.AsyncClient(timeout=timeout, headers=headers) as client:
            try:
                response = await client.get(url, params=params)
            except httpx.ReadTimeout:
                # Retry once with a smaller payload to reduce timeout risk on
                # free/public APIs. The cap is forced even if extra_params
                # had overridden `_quantity`.
                retry_params = {
                    **params,
                    "_quantity": min(quantity, self._RETRY_MAX_QUANTITY),
                }
                response = await client.get(url, params=retry_params)
            response.raise_for_status()
            payload = response.json()
        return self._extract_items(payload)