81 lines
3.1 KiB
Python
81 lines
3.1 KiB
Python
import asyncio
|
||
import contextlib
|
||
from pathlib import Path
|
||
from typing import AsyncGenerator
|
||
|
||
from telethon import TelegramClient
|
||
from telethon.sessions import StringSession
|
||
|
||
from thon.models import Session
|
||
from thon.proxy import ProxyParser
|
||
from thon.session import ThonSession
|
||
from thon.utils import json_write
|
||
|
||
|
||
class Converter:
|
||
def __init__(self, sessions_folder: Path = Path("sessions"), proxy: str = ""):
|
||
"""
|
||
Конвертер сессий в json
|
||
sessions_folder: директория с сессиями
|
||
proxy: прокси для подключения формат: http:ip:port:user:pswd
|
||
если proxy "debug", то прокси не используется
|
||
"""
|
||
self.__thon_session = ThonSession(sessions_folder)
|
||
self.__api_id, self.__api_hash = 2040, "b18441a1ff607e10a989891a5462e627"
|
||
try:
|
||
self.__is_debug = proxy == "debug"
|
||
proxy_parser = ProxyParser(proxy)
|
||
self.__proxy = proxy_parser.thon
|
||
except Exception as e:
|
||
if not self.__is_debug:
|
||
raise e
|
||
self.__proxy = {}
|
||
|
||
async def _main(self, session: Session) -> Session:
|
||
"""
|
||
Конвертация сессии в json (string_session)
|
||
Возвращает объект Session
|
||
"""
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
client = TelegramClient(str(session.item), self.__api_id, self.__api_hash)
|
||
ss = StringSession()
|
||
ss._server_address = client.session.server_address # type: ignore
|
||
ss._takeout_id = client.session.takeout_id # type: ignore
|
||
ss._auth_key = client.session.auth_key # type: ignore
|
||
ss._dc_id = client.session.dc_id # type: ignore
|
||
ss._port = client.session.port # type: ignore
|
||
string_session = ss.save()
|
||
with contextlib.suppress(Exception):
|
||
await client.disconnect() # type: ignore # ty:ignore[unused-ignore-comment]
|
||
del client, ss
|
||
loop.close()
|
||
session.json_data["proxy"] = self.__proxy
|
||
session.json_data["string_session"] = string_session
|
||
await json_write(session.json_file, session.json_data)
|
||
if not self.__proxy and self.__is_debug:
|
||
session.json_data["proxy"] = "debug"
|
||
return session
|
||
|
||
async def __aiter__(self) -> AsyncGenerator[Session, None]:
|
||
"""
|
||
Поиск сессий в sessions_folder + конвертация сессии в json (string_session)
|
||
Возвращает генератор с объектами Session
|
||
json_data содержит:
|
||
string_session: str
|
||
proxy: dict
|
||
"""
|
||
async for session in self.__thon_session:
|
||
yield await self._main(session)
|
||
|
||
async def main(self) -> int:
|
||
"""
|
||
Основная функция для записи сессий в json файлы
|
||
Возвращает количество записанных сессий
|
||
"""
|
||
count = 0
|
||
async for session in self.__thon_session:
|
||
await self._main(session)
|
||
count += 1
|
||
return count
|