aiogram/examples/multibot.py
Alex Root Junior f4251382e8
Remove filters factory, introduce docs translation (#978)
* Rewrite filters

* Update README.rst

* Fixed tests

* Small optimization of the Text filter (TY to @bomzheg)

* Remove dataclass slots argument in due to the only Python 3.10 has an slots argument

* Fixed mypy

* Update tests

* Disable Python 3.11

* Fixed #1013: Empty mention should be None instead of empty string.

* Added #990 to the changelog

* Added #942 to the changelog

* Fixed coverage

* Update poetry and dependencies

* Fixed mypy

* Remove deprecated code

* Added more tests, update pyproject.toml

* Partial update docs

* Added initial Docs translation files

* Added more changes

* Added log message when connection is established in polling process

* Fixed action

* Disable lint for PyPy

* Added changelog for docs translation
2022-10-02 00:04:31 +03:00

85 lines
2.8 KiB
Python

from os import getenv
from typing import Any, Dict, Union
from aiohttp import web
from finite_state_machine import form_router
from aiogram import Bot, Dispatcher, F, Router
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.exceptions import TelegramUnauthorizedError
from aiogram.filters import Command, CommandObject
from aiogram.fsm.storage.redis import DefaultKeyBuilder, RedisStorage
from aiogram.types import Message
from aiogram.utils.token import TokenValidationError, validate_token
from aiogram.webhook.aiohttp_server import (
SimpleRequestHandler,
TokenBasedRequestHandler,
setup_application,
)
main_router = Router()
BASE_URL = getenv("BASE_URL", "https://example.com")
MAIN_BOT_TOKEN = getenv("TELEGRAM_TOKEN")
WEB_SERVER_HOST = "127.0.0.1"
WEB_SERVER_PORT = 8080
MAIN_BOT_PATH = "/webhook/main"
OTHER_BOTS_PATH = "/webhook/bot/{bot_token}"
REDIS_DSN = "redis://127.0.0.1:6479"
OTHER_BOTS_URL = f"{BASE_URL}{OTHER_BOTS_PATH}"
def is_bot_token(value: str) -> Union[bool, Dict[str, Any]]:
try:
validate_token(value)
except TokenValidationError:
return False
return True
@main_router.message(Command(commands=["add"], magic=F.args.func(is_bot_token)))
async def command_add_bot(message: Message, command: CommandObject, bot: Bot) -> Any:
new_bot = Bot(token=command.args, session=bot.session)
try:
bot_user = await new_bot.get_me()
except TelegramUnauthorizedError:
return message.answer("Invalid token")
await new_bot.delete_webhook(drop_pending_updates=True)
await new_bot.set_webhook(OTHER_BOTS_URL.format(bot_token=command.args))
await message.answer(f"Bot @{bot_user.username} successful added")
async def on_startup(dispatcher: Dispatcher, bot: Bot):
await bot.set_webhook(f"{BASE_URL}{MAIN_BOT_PATH}")
def main():
session = AiohttpSession()
bot_settings = {"session": session, "parse_mode": "HTML"}
bot = Bot(token=MAIN_BOT_TOKEN, **bot_settings)
storage = RedisStorage.from_url(REDIS_DSN, key_builder=DefaultKeyBuilder(with_bot_id=True))
main_dispatcher = Dispatcher(storage=storage)
main_dispatcher.include_router(main_router)
main_dispatcher.startup.register(on_startup)
multibot_dispatcher = Dispatcher(storage=storage)
multibot_dispatcher.include_router(form_router)
app = web.Application()
SimpleRequestHandler(dispatcher=main_dispatcher, bot=bot).register(app, path=MAIN_BOT_PATH)
TokenBasedRequestHandler(
dispatcher=multibot_dispatcher,
bot_settings=bot_settings,
).register(app, path=OTHER_BOTS_PATH)
setup_application(app, main_dispatcher, bot=bot)
setup_application(app, multibot_dispatcher)
web.run_app(app, host=WEB_SERVER_HOST, port=WEB_SERVER_PORT)
if __name__ == "__main__":
main()