import traceback

from aiohttp import ClientTimeout

from httpwrapper import AsyncClientConfig, BaseAsyncClient


class IPAToolClient(BaseAsyncClient):
    """Async HTTP client for a service exposing ``/auth/*`` and ``/app/*`` routes.

    Every method sends its arguments as request ``params`` through the
    ``_post`` / ``_get`` / ``_delete`` helpers inherited from
    ``BaseAsyncClient`` and maps any non-200 status, or a JSON body of an
    unexpected type, to a falsy default (``False`` or ``{}``).
    """

    # Value sent as the ``pswd`` parameter by ``app_upload`` and ``app_delete``.
    # NOTE(review): hard-coded credential; consider loading it from configuration.
    _SERVICE_PSWD = "81928192"

    def __init__(self, host: str, port: int):
        """Create a client rooted at ``http://{host}:{port}``.

        Args:
            host: Host name or address of the service.
            port: TCP port of the service.
        """
        super().__init__(
            f"http://{host}:{port}",
            # presumably a 300-second total timeout; confirm against aiohttp
            config=AsyncClientConfig(timeout=ClientTimeout(300)),
        )

    @staticmethod
    async def _read_json_as(response, expected_type, default):
        """Return the JSON body of *response* if it has the expected type.

        Args:
            response: Object returned by ``_post`` / ``_get`` / ``_delete``;
                must expose ``status`` and an awaitable ``json()``.
            expected_type: Type the decoded body must be an instance of.
            default: Value returned when the status is not 200 or the decoded
                body is not an instance of ``expected_type``.

        Returns:
            The decoded JSON body, or ``default``.
        """
        if response.status != 200:
            return default
        payload = await response.json()
        if isinstance(payload, expected_type):
            return payload
        return default

    async def auth(self, user: str, pswd: str) -> bool:
        """POST ``/auth/login``; ``pswd`` is also sent as ``keychain``.

        Returns:
            The boolean JSON body on a 200 response, otherwise ``False``.
        """
        r = await self._post(
            "/auth/login",
            params={
                "user": user,
                "pswd": pswd,
                "keychain": pswd,
            },
        )
        return await self._read_json_as(r, bool, False)

    async def auth_check(self, user: str) -> bool:
        """POST ``/auth/check`` for ``user``.

        Returns:
            The boolean JSON body on a 200 response, otherwise ``False``.
        """
        r = await self._post("/auth/check", params={"user": user})
        return await self._read_json_as(r, bool, False)

    async def auth_twostep(self, user: str, pswd: str, code: str) -> bool:
        """POST ``/auth/twostep``; ``pswd`` is also sent as ``keychain``.

        Returns:
            The boolean JSON body on a 200 response, otherwise ``False``.
        """
        r = await self._post(
            "/auth/twostep",
            params={
                "user": user,
                "pswd": pswd,
                "code": code,
                "keychain": pswd,
            },
        )
        return await self._read_json_as(r, bool, False)

    async def app_info(self, query: str, limit: int, keychain: str) -> dict:
        """POST ``/app/search`` with ``query``, ``limit`` and ``keychain``.

        Returns:
            The JSON object from a 200 response, otherwise an empty dict.
        """
        r = await self._post(
            "/app/search",
            params={
                "query": query,
                "limit": limit,
                "keychain": keychain,
            },
        )
        return await self._read_json_as(r, dict, {})

    async def app_upload(self, output_path: str, save_path: str) -> bool:
        """GET ``/app/upload`` and write the response body to ``save_path``.

        Args:
            output_path: Sent to the service as the ``output_path`` parameter.
            save_path: Local file path the response bytes are written to.

        Returns:
            ``True`` once the body has been written; ``False`` on a non-200
            status or if reading the body or writing the file raises.
        """
        r = await self._get(
            "/app/upload",
            params={
                "pswd": self._SERVICE_PSWD,
                "output_path": output_path,
            },
        )
        if r.status != 200:
            return False
        # Deliberately best-effort: any failure is reported via the traceback
        # and signalled to the caller as False rather than raised.
        try:
            content = await r.read()
            with open(save_path, "wb") as f:
                f.write(content)
            return True
        except Exception:
            traceback.print_exc()
            return False

    async def app_download(
        self,
        bundle_id: str,
        output_path: str,
        keychain: str,
    ) -> bool:
        """POST ``/app/download`` with ``keychain``, ``bundle_id``, ``output_path``.

        Returns:
            The truthiness of the ``"success"`` key of the JSON object from a
            200 response; ``False`` if the status is not 200, the body is not
            an object, or the key is absent.
        """
        r = await self._post(
            "/app/download",
            params={
                "keychain": keychain,
                "bundle_id": bundle_id,
                "output_path": output_path,
            },
        )
        payload = await self._read_json_as(r, dict, {})
        # Coerce so the declared ``bool`` return type holds for any JSON value.
        return bool(payload.get("success", False))

    async def app_delete(self, output_path: str) -> bool:
        """DELETE ``/app/delete`` for ``output_path``.

        Returns:
            The boolean JSON body on a 200 response, otherwise ``False``.
        """
        r = await self._delete(
            "/app/delete",
            params={
                "pswd": self._SERVICE_PSWD,
                "output_path": output_path,
            },
        )
        return await self._read_json_as(r, bool, False)