Files
VkChecker/main.py
2025-11-12 20:30:14 +03:00

114 lines
3.1 KiB
Python

import asyncio
import os
import traceback
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from camoufox import AsyncCamoufox
from envs import DEBUG, GEOIP, PROXY_HOST, PROXY_PSWD, PROXY_USER, THREADS
# Guards appends to RESULTS_FILE; save_result() is called from worker threads.
LOCKER = Lock()
# Output CSV: save_result() appends one "phone,result" row per checked number.
RESULTS_FILE = "results.csv"
# Input file read by get_numbers(), one phone number per line.
NUMBERS_FILE = "numbers.txt"
def replace_str(s: str) -> str:
    """Normalize a raw phone string to ``+<rest>`` form.

    Spaces, dashes, plus signs and parentheses are removed anywhere in the
    string, remaining leading/trailing whitespace is trimmed, and a single
    ``+`` is put in front. No validation is performed: an empty input
    yields just ``"+"``.
    """
    # One translate pass deletes every formatting character at once.
    drop_formatting = str.maketrans("", "", " -+()")
    cleaned = s.translate(drop_formatting).strip()
    return f"+{cleaned}"
def get_numbers() -> list[str]:
    """Read NUMBERS_FILE and return its phone numbers in normalized form.

    Each non-blank line is passed through ``replace_str``. Blank or
    whitespace-only lines (such as a trailing empty line) are skipped so
    they do not turn into bogus ``"+"`` entries that would each be checked
    in a browser session.

    Returns:
        The normalized numbers, in file order.

    Raises:
        OSError: If NUMBERS_FILE cannot be opened.
    """
    # "utf-8-sig" also reads plain UTF-8, but additionally drops a leading
    # BOM (commonly added by Windows editors) that would otherwise end up
    # glued to the first phone number.
    with open(NUMBERS_FILE, "r", encoding="utf-8-sig") as f:
        return [
            replace_str(line)
            for line in f.read().splitlines()
            if line.strip()
        ]
def save_result(phone: str, result: str):
    """Append one ``phone,result`` line to RESULTS_FILE.

    The write happens while holding LOCKER, so concurrent callers append
    one at a time.
    """
    row = f"{phone},{result}\n"
    with LOCKER, open(RESULTS_FILE, "a", encoding="utf-8-sig") as out:
        out.write(row)
def _main(phone: str):
    """Check a single phone number and record the outcome.

    Runs the async ``__main`` coroutine to completion via ``asyncio.run``
    and saves whatever status it returns. If the check raises, the
    traceback is printed and the number is recorded as ``"error"``.
    """
    # Assume failure up front; overwritten only when the check completes.
    outcome = "error"
    try:
        outcome = asyncio.run(__main(phone))
    except Exception:
        traceback.print_exc()
    save_result(phone, outcome)
async def __main(phone: str) -> str:
    """Submit *phone* to VK's password-restore page and classify the reply.

    Opens the restore page in a Camoufox browser, fills in the phone number,
    clicks the "next" button and then polls the page for known texts.

    Returns:
        ``"valid"`` when the SMS-code prompt appears, ``"invalid"`` when the
        page reports that the account was not found, ``"too_many_attempts"``
        when the page reports too many attempts, or ``"unknown"`` when none
        of these texts shows up within ten polls.
    """
    # (XPath, status) pairs, probed in this order on every poll.
    # The texts are English; presumably they match because the browser is
    # started with locale="en-US" -- confirm if the locale is ever changed.
    outcome_markers = (
        ("//*[contains(text(), 'Enter SMS confirmation code')]", "valid"),
        ("//*[contains(text(), 'Account not found')]", "invalid"),
        ("//*[contains(text(), 'Too many attempts')]", "too_many_attempts"),
    )

    # Proxy and geoip are only configured when a proxy host is set.
    proxy_settings = None
    use_geoip = False
    if PROXY_HOST:
        proxy_settings = {"server": f"http://{PROXY_HOST}"}
        if PROXY_USER and PROXY_PSWD:
            proxy_settings.update(username=PROXY_USER, password=PROXY_PSWD)
        use_geoip = bool(GEOIP)

    browser_options = dict(
        humanize=True,
        # NOTE(review): the browser runs headless when DEBUG is truthy, which
        # looks inverted (debug runs usually want a visible window) -- confirm
        # the intent before changing it.
        headless=bool(DEBUG),
        locale="en-US",
        block_images=True,
        block_webrtc=True,
        block_webgl=True,
        i_know_what_im_doing=True,
        geoip=use_geoip,
        proxy=proxy_settings,
    )

    status = "unknown"
    async with AsyncCamoufox(**browser_options) as browser:
        tab = await browser.new_page()
        await tab.goto("https://id.vk.com/restore/#/resetPassword")

        phone_input = tab.locator("//input[@name='login' or @name='phone']")
        await phone_input.fill(phone)
        await asyncio.sleep(1)

        next_button = tab.locator(
            "//button[@type='button' and @data-test-id='nextButton']"
        )
        await next_button.click()

        # Poll up to ten times, pausing one second after each poll that
        # finds none of the marker texts.
        polls_left = 10
        while polls_left and status == "unknown":
            for xpath, label in outcome_markers:
                if await tab.query_selector_all(xpath):
                    status = label
                    break
            else:
                await asyncio.sleep(1)
            polls_left -= 1
    return status
def main():
    """Check every number from NUMBERS_FILE and write a fresh RESULTS_FILE.

    If NUMBERS_FILE does not exist, an empty one is created, a hint is
    printed and the function returns without touching RESULTS_FILE, so the
    results of a previous run are not lost when there is nothing to check.
    Otherwise any existing RESULTS_FILE is removed and each number is
    handed to ``_main`` on a pool of THREADS worker threads.
    """
    # Validate the input first: deleting old results is only justified once
    # we know a new run will actually start.
    if not os.path.exists(NUMBERS_FILE):
        # Leave an empty file behind so the user only has to fill it in.
        with open(NUMBERS_FILE, "w", encoding="utf-8") as f:
            f.write("")
        print(
            "File 'numbers.txt' not found. Please create it and add numbers to it."
        )
        return None

    # Start from a clean results file; a missing file is not an error.
    try:
        os.remove(RESULTS_FILE)
    except FileNotFoundError:
        pass

    # Leaving the ``with`` block waits for all submitted checks to finish.
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        executor.map(_main, get_numbers())
# Start the checker only when this file is executed as a script.
if __name__ == "__main__":
    main()