114 lines
3.1 KiB
Python
114 lines
3.1 KiB
Python
import asyncio
|
|
import os
|
|
import traceback
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from threading import Lock
|
|
|
|
from camoufox import AsyncCamoufox
|
|
|
|
from envs import DEBUG, GEOIP, PROXY_HOST, PROXY_PSWD, PROXY_USER, THREADS
|
|
|
|
# Input list of phone numbers (read line by line) and the CSV report
# produced by a run.
NUMBERS_FILE = "numbers.txt"
RESULTS_FILE = "results.csv"

# Held by save_result() while a row is appended to RESULTS_FILE.
LOCKER = Lock()
|
|
|
|
|
|
def replace_str(s: str) -> str:
    """Normalise a raw phone-number string to ``+<rest>`` form.

    Spaces, dashes, plus signs and parentheses are removed anywhere in the
    string, remaining leading/trailing whitespace is stripped, and a single
    ``+`` is prepended. An empty input therefore yields ``"+"``.
    """
    # One translation pass deletes every separator character at once.
    separators = str.maketrans("", "", " -+()")
    bare = s.translate(separators).strip()
    return f"+{bare}"
|
|
|
|
|
|
def get_numbers() -> list[str]:
    """Read NUMBERS_FILE and return its phone numbers in normalised form.

    Each line is passed through ``replace_str``. Lines that contain nothing
    but separators/whitespace are skipped so that they are not queued as
    phone numbers.

    Returns:
        The normalised numbers, in file order.

    Raises:
        OSError: if NUMBERS_FILE cannot be opened.
    """
    # "utf-8-sig" decodes plain UTF-8 as well, but also drops a leading BOM
    # that would otherwise end up glued to the first number.
    with open(NUMBERS_FILE, "r", encoding="utf-8-sig") as f:
        normalized = (replace_str(line) for line in f.read().splitlines())
        # replace_str always prepends "+", so a bare "+" means the line held
        # no actual digits/characters to check.
        return [number for number in normalized if number != "+"]
|
|
|
|
|
|
def save_result(phone: str, result: str):
    """Append one ``phone,result`` row to RESULTS_FILE.

    The append happens while LOCKER is held, so only one caller writes to
    the file at a time. The file is opened in append mode with the
    ``utf-8-sig`` encoding.
    """
    row = f"{phone},{result}\n"
    with LOCKER, open(RESULTS_FILE, "a", encoding="utf-8-sig") as out:
        out.write(row)
|
|
|
|
|
|
def _main(phone: str):
    """Check a single phone number and record the outcome.

    Runs the async checker ``__main`` to completion in its own event loop.
    If it raises, the traceback is printed and the number is recorded with
    the result ``"error"``; either way exactly one row is saved.
    """
    outcome = "error"
    try:
        outcome = asyncio.run(__main(phone))
    except Exception:
        # Keep going with the remaining numbers; the row is marked "error".
        traceback.print_exc()
    save_result(phone, outcome)
|
|
|
|
|
|
async def __main(phone: str) -> str:
    """Submit ``phone`` to VK's password-reset form and classify the reply.

    Opens the reset page in a Camoufox browser, fills in the phone number,
    clicks the "next" button and then polls the page (up to 10 times, one
    second apart) for one of the known response texts.

    Returns:
        ``"valid"``, ``"invalid"`` or ``"too_many_attempts"`` depending on
        which text appeared first, or ``"unknown"`` if none showed up.
    """
    # Response text -> verdict; probed in this order on every poll.
    verdicts = (
        ("Enter SMS confirmation code", "valid"),
        ("Account not found", "invalid"),
        ("Too many attempts", "too_many_attempts"),
    )

    proxy = None
    geoip = False
    if PROXY_HOST:
        proxy = {"server": f"http://{PROXY_HOST}"}
        if PROXY_USER and PROXY_PSWD:
            proxy.update(username=PROXY_USER, password=PROXY_PSWD)
        # NOTE(review): the GEOIP check is assumed to live inside the
        # PROXY_HOST branch (geoip only enabled together with a proxy) --
        # confirm against the original layout.
        geoip = bool(GEOIP)

    launch_options = dict(
        humanize=True,
        # NOTE(review): headless mode is switched ON when DEBUG is truthy;
        # confirm this is the intended polarity.
        headless=bool(DEBUG),
        locale="en-US",
        block_images=True,
        block_webrtc=True,
        block_webgl=True,
        i_know_what_im_doing=True,
        geoip=geoip,
        proxy=proxy,
    )

    async with AsyncCamoufox(**launch_options) as browser:
        page = await browser.new_page()
        await page.goto("https://id.vk.com/restore/#/resetPassword")

        phone_input = page.locator("//input[@name='login' or @name='phone']")
        await phone_input.fill(phone)
        await asyncio.sleep(1)

        next_button = page.locator(
            "//button[@type='button' and @data-test-id='nextButton']"
        )
        await next_button.click()

        for _ in range(10):
            for marker, verdict in verdicts:
                xpath = f"//*[contains(text(), '{marker}')]"
                if await page.query_selector_all(xpath):
                    return verdict
            await asyncio.sleep(1)

    return "unknown"
|
|
|
|
|
|
def main():
    """Entry point: check every number in NUMBERS_FILE using a thread pool.

    Any RESULTS_FILE left over from a previous run is deleted first. If
    NUMBERS_FILE does not exist, an empty one is created, a hint is printed
    and nothing else happens. Otherwise each number is handed to ``_main``
    on a pool of THREADS workers, and the call returns once all are done.
    """
    # Start every run with a fresh report.
    if os.path.exists(RESULTS_FILE):
        os.remove(RESULTS_FILE)

    if os.path.exists(NUMBERS_FILE):
        with ThreadPoolExecutor(max_workers=THREADS) as pool:
            for phone in get_numbers():
                pool.submit(_main, phone)
        return None

    # No input yet: leave an empty file behind for the user to fill in.
    with open(NUMBERS_FILE, "w", encoding="utf-8") as f:
        f.write("")
    print("File 'numbers.txt' not found. Please create it and add numbers to it.")
    return None
|
|
|
|
|
|
# Run the checker only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
|