You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/services/mockaroo_client.py

61 lines
2.1 KiB
Python

from typing import Any, Dict, List, Optional
import httpx
from app.core.settings import settings
class MockarooClient:
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
):
self.api_key = api_key or settings.mockaroo_api_key
self.base_url = (base_url or settings.mockaroo_base_url).rstrip("/")
async def fetch_schema_data(
self,
schema_name: str,
count: int = 100,
extra_params: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
url = f"{self.base_url}/{schema_name}"
params: Dict[str, Any] = {
"key": self.api_key,
"count": count,
}
if extra_params:
params.update(extra_params)
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
if isinstance(data, list):
return data
return [data]
async def post_json(self, route: str, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self.base_url}/{route}"
params = {"key": self.api_key}
async with httpx.AsyncClient() as client:
response = await client.post(url, params=params, json=payload)
response.raise_for_status()
return response.json()
async def put_json(self, route: str, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self.base_url}/{route}"
params = {"key": self.api_key}
async with httpx.AsyncClient() as client:
response = await client.put(url, params=params, json=payload)
response.raise_for_status()
return response.json()
async def delete_json(self, route: str, payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{route}"
params = {"key": self.api_key}
async with httpx.AsyncClient() as client:
response = await client.delete(url, params=params, json=payload or {})
response.raise_for_status()
return response.json()