import asyncio
import contextlib
import logging
from datetime import datetime, timedelta, timezone
from random import randint
from typing import Self  # ty:ignore[unresolved-import]

from telethon import TelegramClient
from telethon.tl.functions.account import UpdateStatusRequest
from telethon.tl.functions.help import GetCountriesListRequest, GetNearestDcRequest
from telethon.tl.functions.langpack import GetLangPackRequest
from telethon.tl.types import (
    InputPeerUser,
    JsonNumber,
    JsonObject,
    JsonObjectValue,
    JsonString,
    User,
)

from thon.data import Data
from thon.exceptions import ThonBannedError
from thon.models.misc import ThonSearchCodeOptions
from thon.models.options import ThonOptions
from thon.models.session import Session
from thon.utils import extract_verification_code

logger = logging.getLogger("thon")

# Maps a Telegram api_id to the lang-pack name sent with the init request.
API_PACKS = {
    4: "android",
    5: "android",
    6: "android",
    8: "ios",
    9: "macos",
    2834: "macos",
    2040: "tdesktop",
    10840: "ios",
    17349: "tdesktop",
    21724: "android",
    16623: "android",
    2496: "",
}


class ProcessThon(Data):
    """Telethon client wrapper built from a ``Session`` and ``ThonOptions``.

    Usable as an async context manager: ``__aenter__`` runs :meth:`check`
    and yields either this instance (on ``"OK"``) or the error string.

    Attributes such as ``app_id``, ``proxy``, ``session_file`` or
    ``tz_offset`` are read from ``self`` and are expected to be provided by
    the ``Data`` base class (not visible in this file).
    """

    def __init__(self, session: Session, options: ThonOptions):
        """Store connection options, initialise ``Data`` and build the client.

        Args:
            session: Source of the session item and its JSON data.
            options: Retry count, request timeout and overall check timeout.
        """
        self.__item = session.item
        self.__retries = options.retries
        self.__timeout = options.timeout
        self._async_check_timeout = options.check_timeout
        super().__init__(session.json_data)
        self.__client = self.__get_client()
        # Filled in by a successful check(); None until then.
        self.me: User | InputPeerUser | None = None

    @property
    def client(self) -> TelegramClient:
        """Return the underlying ``TelegramClient``.

        Raises ``AttributeError`` once :meth:`disconnect` has deleted it.
        """
        return self.__client

    def __get_client(self) -> TelegramClient:
        """Create the ``TelegramClient`` and customise its init request."""
        # Prefer the session item (as a string) over the string session.
        session_ref = str(self.__item) if self.__item else self.string_session
        proxy = self.proxy
        logger.debug(f"{self.session_file} | {proxy}")

        client = TelegramClient(
            session=session_ref,
            api_id=self.app_id,
            api_hash=self.app_hash,
            device_model=self.device,
            app_version=self.app_version,
            system_lang_code=self.system_lang_code,
            lang_code=self.lang_pack,
            connection_retries=self.__retries,
            request_retries=self.__retries,
            proxy=proxy,
            timeout=self.__timeout,
        )

        app_id = self.app_id
        tz = self.tz_offset

        # NOTE(review): the App Store values below are selected for api ids
        # 1 and 8, but the installer/package entries are only sent for api
        # ids 4 and 6, so they are currently never transmitted.
        if app_id in (1, 8):
            installer_id = "com.apple.AppStore"
            package_id = "ph.telegra.Telegraph"
        else:
            installer_id = "com.android.vending"
            package_id = "org.telegram.messenger"
        installer = JsonObjectValue("installer", JsonString(installer_id))
        package = JsonObjectValue("package_id", JsonString(package_id))
        perf_cat = JsonObjectValue("perf_cat", JsonNumber(2))
        tz_value = JsonObjectValue("tz_offset", JsonNumber(tz))

        # With a timezone offset set, it is sent for every api id; api ids
        # 4 and 6 additionally send installer, package and perf_cat.
        init_params = None
        if tz:
            init_params = [tz_value]
        if app_id in (4, 6):
            init_params = [installer, package, perf_cat]
            if tz:
                init_params.append(tz_value)
        if init_params is not None:
            # noinspection PyProtectedMember
            client._init_request.params = JsonObject(init_params)

        # NOTE(review): unknown api ids default to "android" here but to ""
        # in get_additional_data().
        # noinspection PyProtectedMember
        client._init_request.lang_pack = API_PACKS.get(app_id, "android")
        return client

    async def get_additional_data(self):
        """Issue lang-pack, nearest-DC and countries-list requests.

        Each request is best-effort: any exception it raises is suppressed.
        """
        lang_pack = API_PACKS.get(self.app_id, "")
        with contextlib.suppress(Exception):
            await self.client(GetLangPackRequest(lang_pack, self.lang_pack))
        with contextlib.suppress(Exception):
            await self.client(GetNearestDcRequest())
        with contextlib.suppress(Exception):
            await self.client(GetCountriesListRequest(self.lang_pack, 0))

    async def __check(self) -> str:
        """Connect, verify authorization and load ``self.me``.

        Returns:
            ``"OK"`` on success, otherwise a message prefixed with the
            session file describing the failure.
        """
        try:
            await self.client.connect()
            if not await self.client.is_user_authorized():
                # NOTE(review): unlike the exception paths below, this path
                # returns without calling disconnect(); __aenter__ does it.
                return f"{self.session_file} | Забанен / Banned"
            await self.get_additional_data()
            # Going online is best-effort; failure does not fail the check.
            with contextlib.suppress(Exception):
                await self.client(UpdateStatusRequest(offline=False))
            self.me = await self.client.get_me()
            return "OK"
        except ConnectionError:
            await self.disconnect()
            return f"{self.session_file} | Ошибка подключения / ConnectionError"
        except ThonBannedError:
            await self.disconnect()
            return f"{self.session_file} | Забанен / Banned"
        except Exception as e:
            await self.disconnect()
            logger.exception(e)
            return f"{self.session_file} | Ошибка авторизации / AuthorizationError: {e}"

    async def check(self) -> str:
        """Run the session check, bounded by the check timeout if one is set.

        Returns:
            ``"OK"`` on success, otherwise an error message; a timeout
            yields the ``Таймаут / Timeout`` message.
        """
        if not self._async_check_timeout:
            return await self.__check()
        try:
            return await asyncio.wait_for(self.__check(), self._async_check_timeout)
        except asyncio.TimeoutError:
            # NOTE(review): the client is not disconnected here; __aenter__
            # disconnects on any non-"OK" result.
            return f"{self.session_file} | Таймаут / Timeout"

    @property
    def phone(self) -> str:
        """Return the phone of ``self.me`` when it is a ``User``, else ``""``."""
        if isinstance(self.me, User):
            return self.me.phone or ""
        return ""

    async def search_code(self, options: ThonSearchCodeOptions | None = None) -> str:
        """Poll recent messages until a verification code is found.

        Args:
            options: Search settings (entity, limit, wait_time, date_delta,
                regexp, sleep_time); defaults to ``ThonSearchCodeOptions()``.

        Returns:
            The extracted code, or ``""`` if none was found within
            ``options.wait_time`` seconds.
        """
        options = options or ThonSearchCodeOptions()
        # Use the loop's monotonic clock so system clock adjustments cannot
        # stretch or shrink the wait window.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + options.wait_time
        while loop.time() < deadline:
            async for message in self.client.iter_messages(
                entity=options.entity, limit=options.limit
            ):
                if not message.date:
                    continue
                # Skip messages older than date_delta seconds.
                cutoff_date = datetime.now(tz=timezone.utc) - timedelta(
                    seconds=options.date_delta
                )
                if message.date < cutoff_date:
                    continue
                if code := extract_verification_code(message.text, options.regexp):
                    return code
            # Never sleep past the deadline: no further poll would follow.
            remaining = max(deadline - loop.time(), 0)
            await asyncio.sleep(min(randint(*options.sleep_time), remaining))
        return ""

    async def disconnect(self):
        """Fully disconnect the client and release its resources.

        Every step is best-effort, so calling this repeatedly is harmless.
        """
        # First switch the account to offline (if a client exists).
        with contextlib.suppress(Exception):
            await self.client(UpdateStatusRequest(offline=True))
        # Then close the connection (if any).
        with contextlib.suppress(Exception):
            await self.client.disconnect()
        # Explicitly drop the client reference.
        with contextlib.suppress(Exception):
            del self.__client

    async def __aenter__(self) -> Self | str:
        """Run :meth:`check`; return ``self`` on ``"OK"``, else the error text."""
        result = await self.check()
        logger.info(f"{self.session_file} | {result}")
        if result != "OK":
            await self.disconnect()
            return result
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect the client when leaving the ``async with`` block."""
        await self.disconnect()