96 lines
2.9 KiB
Python
96 lines
2.9 KiB
Python
import asyncio
|
|
import getpass
|
|
from pathlib import Path
|
|
|
|
from jsoner import json_read_async, json_write_async
|
|
|
|
from functions import move_ipa
|
|
from models import UdidInfo
|
|
from wrapper import IPAToolClient
|
|
|
|
def _ensure_dir(name: str) -> Path:
    """Return ``Path(name)`` after making sure that directory exists."""
    directory = Path(name)
    # exist_ok=True keeps repeated starts of the script from failing.
    directory.mkdir(exist_ok=True)
    return directory


# Holds one "<udid>.<app_name>.json" file per device/app with the last
# version that was processed.
CACHE_DIR = _ensure_dir("cache")

# Holds the "<app_name>-<udid>.ipa" files produced by the upload step.
APPS_DIR = _ensure_dir("apps")
|
|
|
|
|
|
async def __main(client: IPAToolClient, udid: str, info: UdidInfo) -> bool:
    """Download the app's IPA via *client* and upload it into ``APPS_DIR``.

    The download is attempted up to 30 times with a 3 second pause after
    each failure; the upload is attempted up to 10 times without a pause.

    Args:
        client: Tool client used for the delete/download/upload calls.
        udid: Device identifier; used as a log prefix and in the saved
            file name.
        info: Per-device settings; ``app_name``, ``bundle_id`` and ``pswd``
            are read here.

    Returns:
        True when an upload attempt succeeded, False when either the
        download or the upload ran out of attempts.
    """
    output_path = f"{info.app_name}.ipa"
    # Clear any previous file of the same name before downloading
    # (presumably a stale copy on the tool side -- TODO confirm).
    await client.app_delete(output_path)

    for _ in range(30):
        if await client.app_download(info.bundle_id, output_path, info.pswd):
            break
        print(udid, "download failed, retrying...")
        await asyncio.sleep(3)
    else:
        # Every attempt failed: there is nothing to upload, so stop here
        # instead of reporting "downloaded" and uploading a missing file.
        print(udid, "download failed, giving up")
        return False
    print(udid, "downloaded")

    save_path = APPS_DIR / f"{info.app_name}-{udid}.ipa"
    for _ in range(10):
        if await client.app_upload(output_path, str(save_path)):
            return True
    return False
|
|
|
|
|
|
async def _main(client: IPAToolClient, udid: str, info: UdidInfo) -> bool:
    """Fetch a new version of one device's app if one is available.

    Steps: authenticate, look the app up, find the entry whose
    ``bundleID`` matches ``info.bundle_id``, compare its version with the
    one stored in ``CACHE_DIR``, and for a new version run ``__main``,
    record the version in the cache file and pass the IPA to ``move_ipa``.

    Args:
        client: Tool client used for authentication and app lookup.
        udid: Device identifier; used as a log prefix and in file names.
        info: Per-device settings (``user``, ``pswd``, ``app_name``,
            ``bundle_id``).

    Returns:
        True only when a new version was uploaded and moved; False for
        every other outcome (lookup failure, no matching app, version
        already cached, upload failure).
    """
    authenticated = await client.auth(info.user, info.pswd)
    if not authenticated:
        print(udid, "no auth response")
        return False

    # The literal 1 is presumably a result limit -- TODO confirm against
    # IPAToolClient.app_info.
    lookup = await client.app_info(info.app_name, 1, info.pswd)
    if not lookup:
        print(udid, "no app info response")
        return False

    candidates = lookup.get("apps", [])
    if not candidates:
        print(udid, "no apps in response")
        return False

    for candidate in candidates:
        # Skip search hits that belong to a different app.
        if candidate.get("bundleID", "") != info.bundle_id:
            continue

        latest = candidate.get("version", "")
        if not latest:
            print(udid, "no version in response")
            continue

        cache_file = CACHE_DIR / f"{udid}.{info.app_name}.json"
        cached = await json_read_async(cache_file)
        # A falsy read result is treated as "no version seen yet".
        known = cached.get("version", "") if cached else ""
        if known == latest:
            print(udid, "already have version", latest)
            return False

        print(udid, "new version", latest)
        if not await __main(client, udid, info):
            print(udid, "not uploaded")
            return False

        await json_write_async(cache_file, {"version": latest})
        print(udid, "uploaded")

        ipa_name = f"{info.app_name}-{udid}.ipa"
        source = str(APPS_DIR / ipa_name)
        destination = f"/Users/{getpass.getuser()}/{ipa_name}"
        move_ipa(source, destination)
        print(udid, "moved")
        return True

    # No entry in the response matched the bundle id with a usable version.
    return False
|
|
|
|
|
|
async def main(json_data: dict):
    """Run ``_main`` concurrently for every device entry in *json_data*.

    Args:
        json_data: Mapping of UDID to the keyword arguments for
            ``UdidInfo``. A falsy value (``None`` or an empty mapping) is
            tolerated and results in no work.

    Exceptions raised inside an individual device task do not abort the
    other tasks; they are printed with the UDID they belong to.
    """
    if not json_data:
        print("no device entries to process")
        return

    # Build every (udid, client, info) triple first, so that a bad entry
    # raises before any coroutine object has been created and left
    # un-awaited.
    jobs = []
    for udid, value in json_data.items():
        info = UdidInfo(**value)
        jobs.append((udid, IPAToolClient(info.host, info.port), info))

    results = await asyncio.gather(
        *(_main(client, udid, info) for udid, client, info in jobs),
        return_exceptions=True,
    )

    # return_exceptions=True hands failures back as values; report them
    # instead of dropping them silently.
    for (udid, _client, _info), result in zip(jobs, results):
        if isinstance(result, BaseException):
            print(udid, "failed:", repr(result))
|
|
|
|
|
|
async def start(info_path="info.json", interval: float = 600):
    """Poll forever: reload the device config, process it, then sleep.

    Args:
        info_path: Path of the JSON file handed to ``json_read_async`` at
            the start of every cycle, so edits are picked up without a
            restart. Defaults to ``"info.json"``.
        interval: Seconds to sleep between cycles. Defaults to 600.

    This coroutine never returns on its own.
    """
    while True:
        json_data = await json_read_async(info_path)
        await main(json_data)
        # Built from the parameter so the message cannot drift from the
        # actual delay; prints "sleep 600 seconds" with the default.
        print(f"sleep {interval} seconds")
        await asyncio.sleep(interval)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the polling coroutine on a fresh event
    # loop until the process is stopped.
    poller = start()
    asyncio.run(poller)
|