Files
IPAToolClient/main.py
2025-11-06 23:26:27 +03:00

96 lines
2.9 KiB
Python

import asyncio
import getpass
from pathlib import Path
from jsoner import json_read_async, json_write_async
from functions import move_ipa
from models import UdidInfo
from wrapper import IPAToolClient
# Working directories, relative to the current working directory:
#   CACHE_DIR - per-device JSON files remembering the last processed version
#   APPS_DIR  - destination passed to the client's upload call for .ipa files
CACHE_DIR = Path("cache")
APPS_DIR = Path("apps")

# Create both directories at import time; already-existing ones are fine.
for _directory in (CACHE_DIR, APPS_DIR):
    _directory.mkdir(exist_ok=True)
async def __main(client: IPAToolClient, udid: str, info: UdidInfo) -> bool:
    """Download the app's IPA via ``client`` and then upload it into ``APPS_DIR``.

    The sequence is:

    1. ``client.app_delete`` is called for ``"<app_name>.ipa"`` before anything
       else.
    2. ``client.app_download`` is attempted up to 30 times, sleeping 3 seconds
       after every failed attempt.
    3. ``client.app_upload`` is attempted up to 10 times (no delay between
       attempts) with the target ``APPS_DIR / "<app_name>-<udid>.ipa"``.

    Args:
        client: Client used for the delete / download / upload calls.
        udid: Device identifier; used in log output and in the saved file name.
        info: Per-device settings; ``app_name``, ``bundle_id`` and ``pswd`` are
            read here.

    Returns:
        ``True`` as soon as an upload attempt succeeds, ``False`` if the
        download never succeeded or every upload attempt failed.
    """
    output_path = f"{info.app_name}.ipa"
    await client.app_delete(output_path)

    for _ in range(30):
        if await client.app_download(info.bundle_id, output_path, info.pswd):
            break
        print(udid, "download failed, retrying...")
        await asyncio.sleep(3)
    else:
        # Every attempt failed: do not claim success and do not try to upload
        # a file that was never downloaded.
        print(udid, "download failed, giving up")
        return False
    print(udid, "downloaded")

    save_path = APPS_DIR / f"{info.app_name}-{udid}.ipa"
    for _ in range(10):
        if await client.app_upload(output_path, str(save_path)):
            return True
    return False
async def _main(client: IPAToolClient, udid: str, info: UdidInfo) -> bool:
    """Check one device entry for a new app version and fetch it if needed.

    Steps, in order: authenticate with ``client.auth``; query
    ``client.app_info``; find the entry in the response's ``"apps"`` list
    whose ``"bundleID"`` equals ``info.bundle_id``; compare its ``"version"``
    with the one stored in ``CACHE_DIR / "<udid>.<app_name>.json"``; if it
    differs, run ``__main`` and, on success, store the new version in the
    cache file and hand the IPA to ``move_ipa`` with the destination
    ``/Users/<current user>/<app_name>-<udid>.ipa``.

    Args:
        client: Client used for authentication, lookup and transfer.
        udid: Device identifier; used for logging and file names.
        info: Per-device settings (``user``, ``pswd``, ``app_name``,
            ``bundle_id`` are read here).

    Returns:
        ``True`` only when a new version was transferred and moved; ``False``
        in every other case (no auth, no data, no matching app, version
        already cached, or transfer failed).
    """
    authorized = await client.auth(info.user, info.pswd)
    if not authorized:
        print(udid, "no auth response")
        return False

    lookup = await client.app_info(info.app_name, 1, info.pswd)
    if not lookup:
        print(udid, "no app info response")
        return False

    candidates = lookup.get("apps", [])
    if not candidates:
        print(udid, "no apps in response")
        return False

    for entry in candidates:
        # Only the entry for the configured bundle is of interest.
        if entry.get("bundleID", "") != info.bundle_id:
            continue

        latest = entry.get("version", "")
        if not latest:
            print(udid, "no version in response")
            continue

        # Look up the version recorded by the previous successful run, if any.
        cache_file = CACHE_DIR / f"{udid}.{info.app_name}.json"
        cached = await json_read_async(cache_file)
        known = cached.get("version", "") if cached else ""
        if known == latest:
            print(udid, "already have version", latest)
            return False

        print(udid, "new version", latest)
        if not await __main(client, udid, info):
            print(udid, "not uploaded")
            return False

        # Record the version first, then move the file to the user's home.
        await json_write_async(cache_file, {"version": latest})
        print(udid, "uploaded")
        ipa_name = f"{info.app_name}-{udid}.ipa"
        move_ipa(str(APPS_DIR / ipa_name), f"/Users/{getpass.getuser()}/{ipa_name}")
        print(udid, "moved")
        return True

    # No entry matched the bundle id (or none of the matches had a version).
    return False
async def _process_entry(udid: str, value: dict) -> bool:
    """Build the settings object and client for one entry and run ``_main``.

    Construction happens inside the coroutine so that a malformed entry fails
    only its own task instead of aborting the whole batch.
    """
    info = UdidInfo(**value)
    client = IPAToolClient(info.host, info.port)
    return await _main(client, udid, info)


async def main(json_data: dict):
    """Process every device entry of ``json_data`` concurrently.

    Args:
        json_data: Mapping of UDID to a dict of keyword arguments for
            ``UdidInfo``. A falsy value (``None`` or an empty mapping) is
            accepted and results in no work being done.

    Failures are isolated per entry: an exception raised while handling one
    UDID is reported on stdout and does not affect the other entries.
    """
    if not json_data:
        print("no entries to process")
        return

    udids = list(json_data)
    results = await asyncio.gather(
        *(_process_entry(udid, json_data[udid]) for udid in udids),
        return_exceptions=True,
    )

    # With return_exceptions=True failures come back as result values; report
    # them instead of dropping them silently.
    for udid, result in zip(udids, results):
        if isinstance(result, BaseException):
            print(udid, "failed:", repr(result))
async def start(interval: float = 600):
    """Run the update check forever, pausing ``interval`` seconds between runs.

    Each cycle re-reads ``info.json`` via ``json_read_async`` and passes the
    result to ``main``.

    Args:
        interval: Seconds to sleep between cycles. Defaults to 600.
    """
    while True:
        json_data = await json_read_async("info.json")
        if json_data:
            await main(json_data)
        else:
            # A falsy read result is skipped so one bad read cannot end the
            # loop; the next cycle simply tries again.
            print("no data read from info.json")
        print(f"sleep {interval} seconds")
        await asyncio.sleep(interval)
if __name__ == "__main__":
    # Script entry point: drive the endless polling loop on a fresh event loop.
    polling_loop = start()
    asyncio.run(polling_loop)