search code

This commit is contained in:
2025-05-17 17:29:31 +03:00
parent c84f29c4cf
commit c9c2349d26

View File

@ -1,6 +1,10 @@
import asyncio
import contextlib
import logging
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from random import randint
from typing import Self
from telethon import TelegramClient
@ -207,6 +211,32 @@ class BaseThon(BaseData):
self._logger.exception(e)
return f"ERROR_AUTH:{e}"
async def search_code(
self,
limit: int = 1,
date_delta: int = 60,
wait_time: int = 300,
chat_id: int = 777000,
regexp: str = r"\b\d{5,6}\b",
sleep_time: tuple[int, int] = (20, 30),
) -> str:
future = datetime.now() + timedelta(seconds=wait_time)
while future > datetime.now():
async for message in self.client.iter_messages(chat_id, limit=limit):
if not message.date:
continue
date = datetime.now(tz=timezone.utc) - timedelta(minutes=date_delta)
if message.date < date:
continue
if not (match := re.search(regexp, message.text)):
continue
_code = match.group()
_code = _code.replace(" ", "").replace("-", "").replace("_", "")
if _code.isdigit():
return _code
await asyncio.sleep(randint(*sleep_time))
return ""
async def disconnect(self):
with contextlib.suppress(Exception):
await self.client(UpdateStatusRequest(offline=True))