Files
IPAToolClient/wrapper.py
2025-11-06 23:15:03 +03:00

126 lines
3.4 KiB
Python

import traceback
from aiohttp import ClientTimeout
from httpwrapper import AsyncClientConfig, BaseAsyncClient
class IPAToolClient(BaseAsyncClient):
    """Async HTTP client for an IPATool service.

    Every method issues a single request through the ``_post`` / ``_get`` /
    ``_delete`` helpers inherited from ``BaseAsyncClient`` and converts the
    response into a plain Python value. A non-200 status, or a JSON body of
    an unexpected type, yields a neutral fallback (``False`` or ``{}``)
    instead of an exception.

    NOTE(review): credentials are passed through ``params`` to a plain
    ``http://`` base URL. Presumably the service is only reachable on a
    trusted network -- confirm before exposing it more widely.
    """

    # Password sent with the file-management endpoints (/app/upload and
    # /app/delete). Kept as the default for backward compatibility; prefer
    # supplying ``service_pswd`` to the constructor instead of relying on it.
    DEFAULT_SERVICE_PSWD = "81928192"

    def __init__(
        self,
        host: str,
        port: int,
        *,
        service_pswd: str = DEFAULT_SERVICE_PSWD,
    ):
        """Create a client bound to ``http://{host}:{port}``.

        Args:
            host: Host name or address of the service.
            port: TCP port of the service.
            service_pswd: Password sent as ``pswd`` by ``app_upload`` and
                ``app_delete``. Defaults to ``DEFAULT_SERVICE_PSWD``.
        """
        super().__init__(
            f"http://{host}:{port}",
            # 300 is ClientTimeout's first positional argument.
            config=AsyncClientConfig(timeout=ClientTimeout(300)),
        )
        self._service_pswd = service_pswd

    @staticmethod
    async def _json_or(r, expected_type, fallback):
        """Return the JSON body of ``r`` if it is usable, else ``fallback``.

        The body is only read when ``r.status`` is 200, and is only returned
        when it is an instance of ``expected_type``.
        """
        if r.status != 200:
            return fallback
        payload = await r.json()
        if isinstance(payload, expected_type):
            return payload
        return fallback

    async def auth(self, user: str, pswd: str) -> bool:
        """Log in via ``POST /auth/login``.

        The password is sent both as ``pswd`` and as ``keychain``.

        Returns:
            The boolean JSON body of a 200 response, otherwise ``False``.
        """
        r = await self._post(
            "/auth/login",
            params={
                "user": user,
                "pswd": pswd,
                "keychain": pswd,
            },
        )
        return await self._json_or(r, bool, False)

    async def auth_check(self, keychain: str) -> bool:
        """Check a keychain via ``POST /auth/check``.

        Returns:
            The boolean JSON body of a 200 response, otherwise ``False``.
        """
        r = await self._post("/auth/check", params={"keychain": keychain})
        return await self._json_or(r, bool, False)

    async def auth_twostep(self, user: str, pswd: str, code: str) -> bool:
        """Submit a two-step code via ``POST /auth/twostep``.

        The password is sent both as ``pswd`` and as ``keychain``.

        Returns:
            The boolean JSON body of a 200 response, otherwise ``False``.
        """
        r = await self._post(
            "/auth/twostep",
            params={
                "user": user,
                "pswd": pswd,
                "code": code,
                "keychain": pswd,
            },
        )
        return await self._json_or(r, bool, False)

    async def app_info(self, query: str, limit: int, keychain: str) -> dict:
        """Search for apps via ``POST /app/search``.

        Returns:
            The JSON object of a 200 response, or an empty dict when the
            status is not 200 or the body is not a JSON object.
        """
        r = await self._post(
            "/app/search",
            params={
                "query": query,
                "limit": limit,
                "keychain": keychain,
            },
        )
        return await self._json_or(r, dict, {})

    async def app_upload(self, output_path: str, save_path: str) -> bool:
        """Fetch ``output_path`` via ``GET /app/upload`` and save it locally.

        Args:
            output_path: Value sent as the ``output_path`` request parameter.
            save_path: Local file path the response body is written to.

        Returns:
            ``True`` once the body has been written to ``save_path``;
            ``False`` on a non-200 status or if reading/writing fails (the
            traceback is printed in that case).
        """
        r = await self._get(
            "/app/upload",
            params={
                "pswd": self._service_pswd,
                "output_path": output_path,
            },
        )
        if r.status != 200:
            return False
        try:
            # The whole body is read at once and then written with a
            # regular (blocking) file write inside this coroutine.
            content = await r.read()
            with open(save_path, "wb") as f:
                f.write(content)
        except Exception:
            # Best-effort: report the failure and signal it via the return
            # value rather than propagating to the caller.
            traceback.print_exc()
            return False
        return True

    async def app_download(
        self,
        bundle_id: str,
        output_path: str,
        keychain: str,
    ) -> bool:
        """Request a download via ``POST /app/download``.

        Returns:
            The truthiness of the ``"success"`` key in the JSON object of a
            200 response; ``False`` when the status is not 200, the body is
            not a JSON object, or the key is missing.
        """
        r = await self._post(
            "/app/download",
            params={
                "keychain": keychain,
                "bundle_id": bundle_id,
                "output_path": output_path,
            },
        )
        payload = await self._json_or(r, dict, {})
        # Coerce so the declared ``bool`` return type always holds, even if
        # the service sends a non-boolean "success" value.
        return bool(payload.get("success", False))

    async def app_delete(self, output_path: str) -> bool:
        """Delete ``output_path`` via ``DELETE /app/delete``.

        Returns:
            The boolean JSON body of a 200 response, otherwise ``False``.
        """
        r = await self._delete(
            "/app/delete",
            params={
                "pswd": self._service_pswd,
                "output_path": output_path,
            },
        )
        return await self._json_or(r, bool, False)