initial commit
This commit is contained in:
112
main.py
Normal file
112
main.py
Normal file
@ -0,0 +1,112 @@
|
||||
import asyncio
|
||||
import os
|
||||
import traceback
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from threading import Lock
|
||||
|
||||
from camoufox import AsyncCamoufox
|
||||
|
||||
from envs import PROXY_HOST, PROXY_PSWD, PROXY_USER, THREADS
|
||||
|
||||
LOCKER = Lock()
|
||||
|
||||
RESULTS_FILE = "results.csv"
|
||||
NUMBERS_FILE = "numbers.txt"
|
||||
|
||||
|
||||
def replace_str(s: str) -> str:
|
||||
return (
|
||||
"+"
|
||||
+ (
|
||||
s.replace(" ", "")
|
||||
.replace("-", "")
|
||||
.replace("+", "")
|
||||
.replace("(", "")
|
||||
.replace(")", "")
|
||||
).strip()
|
||||
)
|
||||
|
||||
|
||||
def get_numbers() -> list[str]:
|
||||
with open(NUMBERS_FILE, "r", encoding="utf-8") as f:
|
||||
return list(map(replace_str, f.read().splitlines()))
|
||||
|
||||
|
||||
def save_result(phone: str, result: str):
|
||||
with LOCKER:
|
||||
with open(RESULTS_FILE, "a", encoding="utf-8-sig") as f:
|
||||
f.write(f"{phone},{result}\n")
|
||||
|
||||
|
||||
def _main(phone: str):
|
||||
try:
|
||||
is_valid = asyncio.run(__main(phone))
|
||||
except Exception:
|
||||
is_valid = "error"
|
||||
traceback.print_exc()
|
||||
save_result(phone, is_valid)
|
||||
|
||||
|
||||
async def __main(phone: str) -> str:
|
||||
is_valid = "unknown"
|
||||
proxy = None
|
||||
geoip = False
|
||||
if PROXY_HOST:
|
||||
proxy = {"server": f"http://{PROXY_HOST}"}
|
||||
if PROXY_USER and PROXY_PSWD:
|
||||
proxy["username"] = PROXY_USER
|
||||
proxy["password"] = PROXY_PSWD
|
||||
geoip = False
|
||||
async with AsyncCamoufox(
|
||||
humanize=True,
|
||||
headless=False,
|
||||
locale="en-US",
|
||||
block_images=True,
|
||||
block_webrtc=True,
|
||||
block_webgl=True,
|
||||
i_know_what_im_doing=True,
|
||||
geoip=geoip,
|
||||
proxy=proxy,
|
||||
) as browser:
|
||||
page = await browser.new_page()
|
||||
await page.goto("https://id.vk.com/restore/#/resetPassword")
|
||||
await page.locator("//input[@name='login']").fill(phone)
|
||||
await asyncio.sleep(1)
|
||||
await page.locator(
|
||||
"//button[@type='button' and @data-test-id='nextButton']"
|
||||
).click()
|
||||
for _ in range(10):
|
||||
if await page.query_selector_all(
|
||||
"//*[contains(text(), 'Enter SMS confirmation code')]"
|
||||
):
|
||||
is_valid = "valid"
|
||||
break
|
||||
if await page.query_selector_all(
|
||||
"//*[contains(text(), 'Account not found')]"
|
||||
):
|
||||
is_valid = "invalid"
|
||||
break
|
||||
if await page.query_selector_all(
|
||||
"//*[contains(text(), 'Too many attempts')]"
|
||||
):
|
||||
is_valid = "too_many_attempts"
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
return is_valid
|
||||
|
||||
|
||||
def main():
|
||||
if os.path.exists(RESULTS_FILE):
|
||||
os.remove(RESULTS_FILE)
|
||||
if not os.path.exists(NUMBERS_FILE):
|
||||
with open(NUMBERS_FILE, "w", encoding="utf-8") as f:
|
||||
f.write("")
|
||||
return print(
|
||||
"File 'numbers.txt' not found. Please create it and add numbers to it."
|
||||
)
|
||||
with ThreadPoolExecutor(max_workers=THREADS) as executor:
|
||||
executor.map(_main, get_numbers())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user