From 06fbe0d9cd302537e769530f83d56735c62a838f Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Mon, 25 Jun 2018 01:31:57 +0300 Subject: [PATCH] Remove context util. --- aiogram/contrib/middlewares/context.py | 3 +- aiogram/dispatcher/__init__.py | 1108 +----------------------- aiogram/dispatcher/ctx.py | 42 - aiogram/dispatcher/dispatcher.py | 1076 +++++++++++++++++++++++ aiogram/dispatcher/filters.py | 0 aiogram/dispatcher/filters/builtin.py | 16 +- aiogram/dispatcher/filters/factory.py | 27 +- aiogram/dispatcher/filters/filters.py | 4 + aiogram/dispatcher/handler.py | 5 +- aiogram/dispatcher/webhook.py | 26 +- aiogram/types/base.py | 13 + aiogram/types/chat.py | 9 +- aiogram/types/message.py | 2 +- aiogram/types/update.py | 12 - aiogram/types/user.py | 2 + aiogram/utils/context.py | 140 --- aiogram/utils/executor.py | 5 +- examples/middleware_and_antiflood.py | 12 +- 18 files changed, 1169 insertions(+), 1333 deletions(-) delete mode 100644 aiogram/dispatcher/ctx.py create mode 100644 aiogram/dispatcher/dispatcher.py delete mode 100644 aiogram/dispatcher/filters.py delete mode 100644 aiogram/utils/context.py diff --git a/aiogram/contrib/middlewares/context.py b/aiogram/contrib/middlewares/context.py index 8e6dce7a..54fca52d 100644 --- a/aiogram/contrib/middlewares/context.py +++ b/aiogram/contrib/middlewares/context.py @@ -1,5 +1,4 @@ from aiogram import types -from aiogram.dispatcher import ctx from aiogram.dispatcher.middlewares import BaseMiddleware OBJ_KEY = '_context_data' @@ -46,7 +45,7 @@ class ContextMiddleware(BaseMiddleware): :return: """ - update = ctx.get_update() + update = types.Update.current() obj = update.conf.get(OBJ_KEY, None) if obj is None: obj = self._configure_update(update) diff --git a/aiogram/dispatcher/__init__.py b/aiogram/dispatcher/__init__.py index e49a9f30..7a2c2dc9 100644 --- a/aiogram/dispatcher/__init__.py +++ b/aiogram/dispatcher/__init__.py @@ -1,1091 +1,17 @@ -import asyncio -import functools -import itertools -import logging -import time -import typing -from contextvars import ContextVar - -from aiogram import types -from .filters import CommandsFilter, ContentTypeFilter, ExceptionsFilter, FiltersFactory, RegexpFilter -from .handler import CancelHandler, Handler, SkipHandler -from .middlewares import MiddlewareManager -from .storage import BaseStorage, DELTA, DisabledStorage, EXCEEDED_COUNT, FSMContext, \ - LAST_CALL, RATE_LIMIT, RESULT -from .webhook import BaseResponse -from ..bot import Bot -from ..types.message import ContentType -from ..utils import context -from ..utils.exceptions import TelegramAPIError, Throttled - -log = logging.getLogger(__name__) - -MODE = 'MODE' -LONG_POLLING = 'long-polling' -UPDATE_OBJECT = 'update_object' - -DEFAULT_RATE_LIMIT = .1 - - -class Dispatcher: - """ - Simple Updates dispatcher - - It will process incoming updates: messages, edited messages, channel posts, edited channel posts, - inline queries, chosen inline results, callback queries, shipping queries, pre-checkout queries. - """ - - def __init__(self, bot, loop=None, storage: typing.Optional[BaseStorage] = None, - run_tasks_by_default: bool = False, - throttling_rate_limit=DEFAULT_RATE_LIMIT, no_throttle_error=False, - filters_factory=None): - - if loop is None: - loop = bot.loop - if storage is None: - storage = DisabledStorage() - if filters_factory is None: - filters_factory = FiltersFactory(self) - - self.bot: Bot = bot - self.loop = loop - self.storage = storage - self.run_tasks_by_default = run_tasks_by_default - - self.throttling_rate_limit = throttling_rate_limit - self.no_throttle_error = no_throttle_error - - self.last_update_id = 0 - - self.filters_factory: FiltersFactory = filters_factory - self.updates_handler = Handler(self, middleware_key='update') - self.message_handlers = Handler(self, middleware_key='message') - self.edited_message_handlers = Handler(self, middleware_key='edited_message') - self.channel_post_handlers = Handler(self, middleware_key='channel_post') - self.edited_channel_post_handlers = Handler(self, middleware_key='edited_channel_post') - self.inline_query_handlers = Handler(self, middleware_key='inline_query') - self.chosen_inline_result_handlers = Handler(self, middleware_key='chosen_inline_result') - self.callback_query_handlers = Handler(self, middleware_key='callback_query') - self.shipping_query_handlers = Handler(self, middleware_key='shipping_query') - self.pre_checkout_query_handlers = Handler(self, middleware_key='pre_checkout_query') - self.errors_handlers = Handler(self, once=False, middleware_key='error') - - self.middleware = MiddlewareManager(self) - - self.updates_handler.register(self.process_update) - - self._polling = False - self._closed = True - self._close_waiter = loop.create_future() - - filters_factory.bind(filters.CommandsFilter, event_handlers=[ - self.message_handlers, self.edited_message_handlers - ]) - filters_factory.bind(filters.RegexpFilter, event_handlers=[ - self.message_handlers, self.edited_message_handlers, - self.channel_post_handlers, self.edited_channel_post_handlers, - self.callback_query_handlers - - ]) - filters_factory.bind(filters.RegexpCommandsFilter, event_handlers=[ - self.message_handlers, self.edited_message_handlers - ]) - filters_factory.bind(filters.ContentTypeFilter, event_handlers=[ - self.message_handlers, self.edited_message_handlers, - self.channel_post_handlers, self.edited_channel_post_handlers, - ]) - filters_factory.bind(filters.StateFilter) - filters_factory.bind(filters.ExceptionsFilter, event_handlers=[ - self.errors_handlers - ]) - - def __del__(self): - self.stop_polling() - - @property - def data(self): - return self.bot.data - - def __setitem__(self, key, value): - self.bot.data[key] = value - - def __getitem__(self, item): - return self.bot.data[item] - - def get(self, key, default=None): - return self.bot.data.get(key, default) - - @classmethod - def current(cls): - return dispatcher.get() - - async def skip_updates(self): - """ - You can skip old incoming updates from queue. - This method is not recommended to use if you use payments or you bot has high-load. - - :return: count of skipped updates - """ - total = 0 - updates = await self.bot.get_updates(offset=self.last_update_id, timeout=1) - while updates: - total += len(updates) - for update in updates: - if update.update_id > self.last_update_id: - self.last_update_id = update.update_id - updates = await self.bot.get_updates(offset=self.last_update_id + 1, timeout=1) - return total - - async def process_updates(self, updates): - """ - Process list of updates - - :param updates: - :return: - """ - tasks = [] - for update in updates: - tasks.append(self.updates_handler.notify(update)) - return await asyncio.gather(*tasks) - - async def process_update(self, update): - """ - Process single update object - - :param update: - :return: - """ - self.last_update_id = update.update_id - context.set_value(UPDATE_OBJECT, update) - - types.Update.set_current(update) - try: - if update.message: - state = await self.storage.get_state(chat=update.message.chat.id, - user=update.message.from_user.id) - context.update_state(chat=update.message.chat.id, - user=update.message.from_user.id, - state=state) - return await self.message_handlers.notify(update.message) - if update.edited_message: - state = await self.storage.get_state(chat=update.edited_message.chat.id, - user=update.edited_message.from_user.id) - context.update_state(chat=update.edited_message.chat.id, - user=update.edited_message.from_user.id, - state=state) - return await self.edited_message_handlers.notify(update.edited_message) - if update.channel_post: - state = await self.storage.get_state(chat=update.channel_post.chat.id) - context.update_state(chat=update.channel_post.chat.id, - state=state) - return await self.channel_post_handlers.notify(update.channel_post) - if update.edited_channel_post: - state = await self.storage.get_state(chat=update.edited_channel_post.chat.id) - context.update_state(chat=update.edited_channel_post.chat.id, - state=state) - return await self.edited_channel_post_handlers.notify(update.edited_channel_post) - if update.inline_query: - state = await self.storage.get_state(user=update.inline_query.from_user.id) - context.update_state(user=update.inline_query.from_user.id, - state=state) - return await self.inline_query_handlers.notify(update.inline_query) - if update.chosen_inline_result: - state = await self.storage.get_state(user=update.chosen_inline_result.from_user.id) - context.update_state(user=update.chosen_inline_result.from_user.id, - state=state) - return await self.chosen_inline_result_handlers.notify(update.chosen_inline_result) - if update.callback_query: - state = await self.storage.get_state( - chat=update.callback_query.message.chat.id if update.callback_query.message else None, - user=update.callback_query.from_user.id) - context.update_state(user=update.callback_query.from_user.id, - state=state) - return await self.callback_query_handlers.notify(update.callback_query) - if update.shipping_query: - state = await self.storage.get_state(user=update.shipping_query.from_user.id) - context.update_state(user=update.shipping_query.from_user.id, - state=state) - return await self.shipping_query_handlers.notify(update.shipping_query) - if update.pre_checkout_query: - state = await self.storage.get_state(user=update.pre_checkout_query.from_user.id) - context.update_state(user=update.pre_checkout_query.from_user.id, - state=state) - return await self.pre_checkout_query_handlers.notify(update.pre_checkout_query) - except Exception as e: - err = await self.errors_handlers.notify(self, update, e) - if err: - return err - raise - - async def reset_webhook(self, check=True) -> bool: - """ - Reset webhook - - :param check: check before deleting - :return: - """ - if check: - wh = await self.bot.get_webhook_info() - if not wh.url: - return False - - return await self.bot.delete_webhook() - - async def start_polling(self, timeout=20, relax=0.1, limit=None, reset_webhook=None): - """ - Start long-polling - - :param timeout: - :param relax: - :param limit: - :param reset_webhook: - :return: - """ - if self._polling: - raise RuntimeError('Polling already started') - - log.info('Start polling.') - - context.set_value(MODE, LONG_POLLING) - context.set_value('dispatcher', self) - context.set_value('bot', self.bot) - - if reset_webhook is None: - await self.reset_webhook(check=False) - if reset_webhook: - await self.reset_webhook(check=True) - - self._polling = True - offset = None - try: - while self._polling: - try: - updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout) - except: - log.exception('Cause exception while getting updates.') - await asyncio.sleep(15) - continue - - if updates: - log.debug(f"Received {len(updates)} updates.") - offset = updates[-1].update_id + 1 - - self.loop.create_task(self._process_polling_updates(updates)) - - if relax: - await asyncio.sleep(relax) - finally: - self._close_waiter._set_result(None) - log.warning('Polling is stopped.') - - async def _process_polling_updates(self, updates): - """ - Process updates received from long-polling. - - :param updates: list of updates. - """ - need_to_call = [] - for responses in itertools.chain.from_iterable(await self.process_updates(updates)): - for response in responses: - if not isinstance(response, BaseResponse): - continue - need_to_call.append(response.execute_response(self.bot)) - if need_to_call: - try: - asyncio.gather(*need_to_call) - except TelegramAPIError: - log.exception('Cause exception while processing updates.') - - def stop_polling(self): - """ - Break long-polling process. - - :return: - """ - if self._polling: - log.info('Stop polling...') - self._polling = False - - async def wait_closed(self): - """ - Wait for the long-polling to close - - :return: - """ - await asyncio.shield(self._close_waiter, loop=self.loop) - - def is_polling(self): - """ - Check if polling is enabled - - :return: - """ - return self._polling - - def register_message_handler(self, callback, *custom_filters, commands=None, regexp=None, content_types=None, - func=None, state=None, run_task=None, **kwargs): - """ - Register handler for message - - .. code-block:: python3 - - # This handler works only if state is None (by default). - dp.register_message_handler(cmd_start, commands=['start', 'about']) - dp.register_message_handler(entry_point, commands=['setup']) - - # This handler works only if current state is "first_step" - dp.register_message_handler(step_handler_1, state="first_step") - - # If you want to handle all states by one handler, use `state="*"`. - dp.register_message_handler(cancel_handler, commands=['cancel'], state="*") - dp.register_message_handler(cancel_handler, func=lambda msg: msg.text.lower() == 'cancel', state="*") - - :param callback: - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param custom_filters: list of custom filters - :param kwargs: - :param state: - :return: decorated function - """ - if content_types is None: - content_types = ContentType.TEXT - if func is not None: - custom_filters = list(custom_filters) - custom_filters.append(func) - - filters_set = self.filters_factory.resolve(self.message_handlers, - *custom_filters, - commands=commands, - regexp=regexp, - content_types=content_types, - state=state, - **kwargs) - self.message_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def message_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, func=None, state=None, - run_task=None, **kwargs): - """ - Decorator for message handler - - Examples: - - Simple commands handler: - - .. code-block:: python3 - - @dp.message_handler(commands=['start', 'welcome', 'about']) - async def cmd_handler(message: types.Message): - - Filter messages by regular expression: - - .. code-block:: python3 - - @dp.message_handler(rexexp='^[a-z]+-[0-9]+') - async def msg_handler(message: types.Message): - - Filter messages by command regular expression: - - .. code-block:: python3 - - @dp.message_handler(filters.RegexpCommandsFilter(regexp_commands=['item_([0-9]*)'])) - async def send_welcome(message: types.Message): - - Filter by content type: - - .. code-block:: python3 - - @dp.message_handler(content_types=ContentType.PHOTO | ContentType.DOCUMENT) - async def audio_handler(message: types.Message): - - Filter by custom function: - - .. code-block:: python3 - - @dp.message_handler(func=lambda message: message.text and 'hello' in message.text.lower()) - async def text_handler(message: types.Message): - - Use multiple filters: - - .. code-block:: python3 - - @dp.message_handler(commands=['command'], content_types=ContentType.TEXT) - async def text_handler(message: types.Message): - - Register multiple filters set for one handler: - - .. code-block:: python3 - - @dp.message_handler(commands=['command']) - @dp.message_handler(func=lambda message: demojize(message.text) == ':new_moon_with_face:') - async def text_handler(message: types.Message): - - This handler will be called if the message starts with '/command' OR is some emoji - - By default content_type is :class:`ContentType.TEXT` - - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param custom_filters: list of custom filters - :param kwargs: - :param state: - :param run_task: run callback in task (no wait results) - :return: decorated function - """ - - def decorator(callback): - self.register_message_handler(callback, *custom_filters, - commands=commands, regexp=regexp, content_types=content_types, - func=func, state=state, run_task=run_task, - **kwargs) - return callback - - return decorator - - def register_edited_message_handler(self, callback, *, commands=None, regexp=None, content_types=None, func=None, - state=None, custom_filters=None, run_task=None, **kwargs): - """ - Register handler for edited message - - :param callback: - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param state: - :param custom_filters: list of custom filters - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - if content_types is None: - content_types = ContentType.TEXT - if custom_filters is None: - custom_filters = [] - - filters_set = generate_default_filters(self, - *custom_filters, - commands=commands, - regexp=regexp, - content_types=content_types, - func=func, - state=state, - **kwargs) - self.edited_message_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def edited_message_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, func=None, - state=None, run_task=None, **kwargs): - """ - Decorator for edited message handler - - You can use combination of different handlers - - .. code-block:: python3 - - @dp.message_handler() - @dp.edited_message_handler() - async def msg_handler(message: types.Message): - - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param state: - :param custom_filters: list of custom filters - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - - def decorator(callback): - self.register_edited_message_handler(callback, commands=commands, regexp=regexp, - content_types=content_types, func=func, state=state, - custom_filters=custom_filters, run_task=run_task, **kwargs) - return callback - - return decorator - - def register_channel_post_handler(self, callback, *, commands=None, regexp=None, content_types=None, func=None, - state=None, custom_filters=None, run_task=None, **kwargs): - """ - Register handler for channel post - - :param callback: - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param state: - :param custom_filters: list of custom filters - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - if content_types is None: - content_types = ContentType.TEXT - if custom_filters is None: - custom_filters = [] - - filters_set = generate_default_filters(self, - *custom_filters, - commands=commands, - regexp=regexp, - content_types=content_types, - func=func, - state=state, - **kwargs) - self.channel_post_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def channel_post_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, func=None, - state=None, run_task=None, **kwargs): - """ - Decorator for channel post handler - - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param state: - :param custom_filters: list of custom filters - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - - def decorator(callback): - self.register_channel_post_handler(callback, commands=commands, regexp=regexp, content_types=content_types, - func=func, state=state, custom_filters=custom_filters, - run_task=run_task, **kwargs) - return callback - - return decorator - - def register_edited_channel_post_handler(self, callback, *, commands=None, regexp=None, content_types=None, - func=None, state=None, custom_filters=None, run_task=None, **kwargs): - """ - Register handler for edited channel post - - :param callback: - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param state: - :param custom_filters: list of custom filters - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - if content_types is None: - content_types = ContentType.TEXT - if custom_filters is None: - custom_filters = [] - - filters_set = generate_default_filters(self, - *custom_filters, - commands=commands, - regexp=regexp, - content_types=content_types, - func=func, - state=state, - **kwargs) - self.edited_channel_post_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def edited_channel_post_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, func=None, - state=None, run_task=None, **kwargs): - """ - Decorator for edited channel post handler - - :param commands: list of commands - :param regexp: REGEXP - :param content_types: List of content types. - :param func: custom any callable object - :param custom_filters: list of custom filters - :param state: - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - - def decorator(callback): - self.register_edited_channel_post_handler(callback, commands=commands, regexp=regexp, - content_types=content_types, func=func, state=state, - custom_filters=custom_filters, run_task=run_task, **kwargs) - return callback - - return decorator - - def register_inline_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, **kwargs): - """ - Register handler for inline query - - Example: - - .. code-block:: python3 - - dp.register_inline_handler(some_inline_handler, func=lambda inline_query: True) - - :param callback: - :param func: custom any callable object - :param custom_filters: list of custom filters - :param state: - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - if custom_filters is None: - custom_filters = [] - filters_set = generate_default_filters(self, - *custom_filters, - func=func, - state=state, - **kwargs) - self.inline_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def inline_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): - """ - Decorator for inline query handler - - Example: - - .. code-block:: python3 - - @dp.inline_handler(func=lambda inline_query: True) - async def some_inline_handler(inline_query: types.InlineQuery) - - :param func: custom any callable object - :param state: - :param custom_filters: list of custom filters - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: decorated function - """ - - def decorator(callback): - self.register_inline_handler(callback, func=func, state=state, custom_filters=custom_filters, - run_task=run_task, **kwargs) - return callback - - return decorator - - def register_chosen_inline_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, - **kwargs): - """ - Register handler for chosen inline query - - Example: - - .. code-block:: python3 - - dp.register_chosen_inline_handler(some_chosen_inline_handler, func=lambda chosen_inline_query: True) - - :param callback: - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: - """ - if custom_filters is None: - custom_filters = [] - filters_set = generate_default_filters(self, - *custom_filters, - func=func, - state=state, - **kwargs) - self.chosen_inline_result_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def chosen_inline_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): - """ - Decorator for chosen inline query handler - - Example: - - .. code-block:: python3 - - @dp.chosen_inline_handler(func=lambda chosen_inline_query: True) - async def some_chosen_inline_handler(chosen_inline_query: types.ChosenInlineResult) - - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - :return: - """ - - def decorator(callback): - self.register_chosen_inline_handler(callback, func=func, state=state, custom_filters=custom_filters, - run_task=run_task, **kwargs) - return callback - - return decorator - - def register_callback_query_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, - **kwargs): - """ - Register handler for callback query - - Example: - - .. code-block:: python3 - - dp.register_callback_query_handler(some_callback_handler, func=lambda callback_query: True) - - :param callback: - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - """ - if custom_filters is None: - custom_filters = [] - filters_set = generate_default_filters(self, - *custom_filters, - func=func, - state=state, - **kwargs) - self.callback_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def callback_query_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): - """ - Decorator for callback query handler - - Example: - - .. code-block:: python3 - - @dp.callback_query_handler(func=lambda callback_query: True) - async def some_callback_handler(callback_query: types.CallbackQuery) - - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - """ - - def decorator(callback): - self.register_callback_query_handler(callback, func=func, state=state, custom_filters=custom_filters, - run_task=run_task, **kwargs) - return callback - - return decorator - - def register_shipping_query_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, - **kwargs): - """ - Register handler for shipping query - - Example: - - .. code-block:: python3 - - dp.register_shipping_query_handler(some_shipping_query_handler, func=lambda shipping_query: True) - - :param callback: - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - """ - if custom_filters is None: - custom_filters = [] - filters_set = generate_default_filters(self, - *custom_filters, - func=func, - state=state, - **kwargs) - self.shipping_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def shipping_query_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): - """ - Decorator for shipping query handler - - Example: - - .. code-block:: python3 - - @dp.shipping_query_handler(func=lambda shipping_query: True) - async def some_shipping_query_handler(shipping_query: types.ShippingQuery) - - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - """ - - def decorator(callback): - self.register_shipping_query_handler(callback, func=func, state=state, custom_filters=custom_filters, - run_task=run_task, **kwargs) - return callback - - return decorator - - def register_pre_checkout_query_handler(self, callback, *, func=None, state=None, custom_filters=None, - run_task=None, **kwargs): - """ - Register handler for pre-checkout query - - Example: - - .. code-block:: python3 - - dp.register_pre_checkout_query_handler(some_pre_checkout_query_handler, func=lambda shipping_query: True) - - :param callback: - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - """ - if custom_filters is None: - custom_filters = [] - filters_set = generate_default_filters(self, - *custom_filters, - func=func, - state=state, - **kwargs) - self.pre_checkout_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def pre_checkout_query_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): - """ - Decorator for pre-checkout query handler - - Example: - - .. code-block:: python3 - - @dp.pre_checkout_query_handler(func=lambda shipping_query: True) - async def some_pre_checkout_query_handler(shipping_query: types.ShippingQuery) - - :param func: custom any callable object - :param state: - :param custom_filters: - :param run_task: run callback in task (no wait results) - :param kwargs: - """ - - def decorator(callback): - self.register_pre_checkout_query_handler(callback, func=func, state=state, custom_filters=custom_filters, - run_task=run_task, **kwargs) - return callback - - return decorator - - def register_errors_handler(self, callback, *, func=None, exception=None, run_task=None): - """ - Register handler for errors - - :param callback: - :param func: - :param exception: you can make handler for specific errors type - :param run_task: run callback in task (no wait results) - """ - filters_set = [] - if func is not None: - filters_set.append(func) - if exception is not None: - filters_set.append(ExceptionsFilter(exception)) - self.errors_handlers.register(self._wrap_async_task(callback, run_task), filters_set) - - def errors_handler(self, func=None, exception=None, run_task=None): - """ - Decorator for errors handler - - :param func: - :param exception: you can make handler for specific errors type - :param run_task: run callback in task (no wait results) - :return: - """ - - def decorator(callback): - self.register_errors_handler(self._wrap_async_task(callback, run_task), - func=func, exception=exception) - return callback - - return decorator - - def current_state(self, *, - chat: typing.Union[str, int, None] = None, - user: typing.Union[str, int, None] = None) -> FSMContext: - """ - Get current state for user in chat as context - - .. code-block:: python3 - - with dp.current_state(chat=message.chat.id, user=message.user.id) as state: - pass - - state = dp.current_state() - state.set_state('my_state') - - :param chat: - :param user: - :return: - """ - if chat is None: - from .ctx import get_chat - chat = get_chat() - if user is None: - from .ctx import get_user - user = get_user() - - return FSMContext(storage=self.storage, chat=chat, user=user) - - async def throttle(self, key, *, rate=None, user=None, chat=None, no_error=None) -> bool: - """ - Execute throttling manager. - Returns True if limit has not exceeded otherwise raises ThrottleError or returns False - - :param key: key in storage - :param rate: limit (by default is equal to default rate limit) - :param user: user id - :param chat: chat id - :param no_error: return boolean value instead of raising error - :return: bool - """ - if not self.storage.has_bucket(): - raise RuntimeError('This storage does not provide Leaky Bucket') - - if no_error is None: - no_error = self.no_throttle_error - if rate is None: - rate = self.throttling_rate_limit - if user is None and chat is None: - from . import ctx - user = ctx.get_user() - chat = ctx.get_chat() - - # Detect current time - now = time.time() - - bucket = await self.storage.get_bucket(chat=chat, user=user) - - # Fix bucket - if bucket is None: - bucket = {key: {}} - if key not in bucket: - bucket[key] = {} - data = bucket[key] - - # Calculate - called = data.get(LAST_CALL, now) - delta = now - called - result = delta >= rate or delta <= 0 - - # Save results - data[RESULT] = result - data[RATE_LIMIT] = rate - data[LAST_CALL] = now - data[DELTA] = delta - if not result: - data[EXCEEDED_COUNT] += 1 - else: - data[EXCEEDED_COUNT] = 1 - bucket[key].update(data) - await self.storage.set_bucket(chat=chat, user=user, bucket=bucket) - - if not result and not no_error: - # Raise if it is allowed - raise Throttled(key=key, chat=chat, user=user, **data) - return result - - async def check_key(self, key, chat=None, user=None): - """ - Get information about key in bucket - - :param key: - :param chat: - :param user: - :return: - """ - if not self.storage.has_bucket(): - raise RuntimeError('This storage does not provide Leaky Bucket') - - if user is None and chat is None: - from . import ctx - user = ctx.get_user() - chat = ctx.get_chat() - - bucket = await self.storage.get_bucket(chat=chat, user=user) - data = bucket.get(key, {}) - return Throttled(key=key, chat=chat, user=user, **data) - - async def release_key(self, key, chat=None, user=None): - """ - Release blocked key - - :param key: - :param chat: - :param user: - :return: - """ - if not self.storage.has_bucket(): - raise RuntimeError('This storage does not provide Leaky Bucket') - - if user is None and chat is None: - from . import ctx - user = ctx.get_user() - chat = ctx.get_chat() - - bucket = await self.storage.get_bucket(chat=chat, user=user) - if bucket and key in bucket: - del bucket['key'] - await self.storage.set_bucket(chat=chat, user=user, bucket=bucket) - return True - return False - - def async_task(self, func): - """ - Execute handler as task and return None. - Use this decorator for slow handlers (with timeouts) - - .. code-block:: python3 - - @dp.message_handler(commands=['command']) - @dp.async_task - async def cmd_with_timeout(message: types.Message): - await asyncio.sleep(120) - return SendMessage(message.chat.id, 'KABOOM').reply(message) - - :param func: - :return: - """ - - def process_response(task): - try: - response = task.result() - except Exception as e: - self.loop.create_task( - self.errors_handlers.notify(self, task.context.get(UPDATE_OBJECT, None), e)) - else: - if isinstance(response, BaseResponse): - self.loop.create_task(response.execute_response(self.bot)) - - @functools.wraps(func) - async def wrapper(*args, **kwargs): - task = self.loop.create_task(func(*args, **kwargs)) - task.add_done_callback(process_response) - - return wrapper - - def _wrap_async_task(self, callback, run_task=None) -> callable: - if run_task is None: - run_task = self.run_tasks_by_default - - if run_task: - return self.async_task(callback) - return callback - - -dispatcher: ContextVar[Dispatcher] = ContextVar('dispatcher_instance', default=None) +from . import filters +from . import handler +from . import middlewares +from . import storage +from . import webhook +from .dispatcher import Dispatcher, dispatcher, FSMContext + +__all__ = [ + 'Dispatcher', + 'dispatcher', + 'FSMContext', + 'filters', + 'handler', + 'middlewares', + 'storage', + 'webhook' +] diff --git a/aiogram/dispatcher/ctx.py b/aiogram/dispatcher/ctx.py deleted file mode 100644 index 18229125..00000000 --- a/aiogram/dispatcher/ctx.py +++ /dev/null @@ -1,42 +0,0 @@ -from . import Bot -from .. import types -from ..dispatcher import Dispatcher, FSMContext, MODE, UPDATE_OBJECT -from ..utils import context - - -def _get(key, default=None, no_error=False): - result = context.get_value(key, default) - if not no_error and result is None: - raise RuntimeError(f"Key '{key}' does not exist in the current execution context!\n" - f"Maybe asyncio task factory is not configured!\n" - f"\t>>> from aiogram.utils import context\n" - f"\t>>> loop.set_task_factory(context.task_factory)") - return result - - -def get_bot() -> Bot: - return _get('bot') - - -def get_dispatcher() -> Dispatcher: - return _get('dispatcher') - - -def get_update() -> types.Update: - return _get(UPDATE_OBJECT) - - -def get_mode() -> str: - return _get(MODE, 'unknown') - - -def get_chat() -> int: - return _get('chat', no_error=True) - - -def get_user() -> int: - return _get('user', no_error=True) - - -def get_state() -> FSMContext: - return get_dispatcher().current_state() diff --git a/aiogram/dispatcher/dispatcher.py b/aiogram/dispatcher/dispatcher.py new file mode 100644 index 00000000..d643b3dd --- /dev/null +++ b/aiogram/dispatcher/dispatcher.py @@ -0,0 +1,1076 @@ +import asyncio +import functools +import itertools +import logging +import time +import typing +from contextvars import ContextVar + +from aiogram.dispatcher.filters import RegexpCommandsFilter, StateFilter +from .filters import CommandsFilter, ContentTypeFilter, ExceptionsFilter, FiltersFactory, RegexpFilter +from .handler import Handler +from .middlewares import MiddlewareManager +from .storage import BaseStorage, DELTA, DisabledStorage, EXCEEDED_COUNT, FSMContext, \ + LAST_CALL, RATE_LIMIT, RESULT +from .webhook import BaseResponse +from .. import types +from ..bot import Bot, bot +from ..types.message import ContentType +from ..utils.exceptions import TelegramAPIError, Throttled + +log = logging.getLogger(__name__) + +MODE = 'MODE' +LONG_POLLING = 'long-polling' +UPDATE_OBJECT = 'update_object' + +DEFAULT_RATE_LIMIT = .1 + +current_user: ContextVar[int] = ContextVar('current_user_id', default=None) +current_chat: ContextVar[int] = ContextVar('current_chat_id', default=None) +current_state: ContextVar[int] = ContextVar('current_state', default=None) + + +class Dispatcher: + """ + Simple Updates dispatcher + + It will process incoming updates: messages, edited messages, channel posts, edited channel posts, + inline queries, chosen inline results, callback queries, shipping queries, pre-checkout queries. + """ + + def __init__(self, bot, loop=None, storage: typing.Optional[BaseStorage] = None, + run_tasks_by_default: bool = False, + throttling_rate_limit=DEFAULT_RATE_LIMIT, no_throttle_error=False, + filters_factory=None): + + if loop is None: + loop = bot.loop + if storage is None: + storage = DisabledStorage() + if filters_factory is None: + filters_factory = FiltersFactory(self) + + self.bot: Bot = bot + self.loop = loop + self.storage = storage + self.run_tasks_by_default = run_tasks_by_default + + self.throttling_rate_limit = throttling_rate_limit + self.no_throttle_error = no_throttle_error + + self.last_update_id = 0 + + self.filters_factory: FiltersFactory = filters_factory + self.updates_handler = Handler(self, middleware_key='update') + self.message_handlers = Handler(self, middleware_key='message') + self.edited_message_handlers = Handler(self, middleware_key='edited_message') + self.channel_post_handlers = Handler(self, middleware_key='channel_post') + self.edited_channel_post_handlers = Handler(self, middleware_key='edited_channel_post') + self.inline_query_handlers = Handler(self, middleware_key='inline_query') + self.chosen_inline_result_handlers = Handler(self, middleware_key='chosen_inline_result') + self.callback_query_handlers = Handler(self, middleware_key='callback_query') + self.shipping_query_handlers = Handler(self, middleware_key='shipping_query') + self.pre_checkout_query_handlers = Handler(self, middleware_key='pre_checkout_query') + self.errors_handlers = Handler(self, once=False, middleware_key='error') + + self.middleware = MiddlewareManager(self) + + self.updates_handler.register(self.process_update) + + self._polling = False + self._closed = True + self._close_waiter = loop.create_future() + + filters_factory.bind(CommandsFilter, event_handlers=[ + self.message_handlers, self.edited_message_handlers + ]) + filters_factory.bind(RegexpFilter, event_handlers=[ + self.message_handlers, self.edited_message_handlers, + self.channel_post_handlers, self.edited_channel_post_handlers, + self.callback_query_handlers + + ]) + filters_factory.bind(RegexpCommandsFilter, event_handlers=[ + self.message_handlers, self.edited_message_handlers + ]) + filters_factory.bind(ContentTypeFilter, event_handlers=[ + self.message_handlers, self.edited_message_handlers, + self.channel_post_handlers, self.edited_channel_post_handlers, + ]) + filters_factory.bind(StateFilter) + filters_factory.bind(ExceptionsFilter, event_handlers=[ + self.errors_handlers + ]) + + def __del__(self): + self.stop_polling() + + @property + def data(self): + return self.bot.data + + def __setitem__(self, key, value): + self.bot.data[key] = value + + def __getitem__(self, item): + return self.bot.data[item] + + def get(self, key, default=None): + return self.bot.data.get(key, default) + + @classmethod + def current(cls): + return dispatcher.get() + + async def skip_updates(self): + """ + You can skip old incoming updates from queue. + This method is not recommended to use if you use payments or you bot has high-load. + + :return: count of skipped updates + """ + total = 0 + updates = await self.bot.get_updates(offset=self.last_update_id, timeout=1) + while updates: + total += len(updates) + for update in updates: + if update.update_id > self.last_update_id: + self.last_update_id = update.update_id + updates = await self.bot.get_updates(offset=self.last_update_id + 1, timeout=1) + return total + + async def process_updates(self, updates): + """ + Process list of updates + + :param updates: + :return: + """ + tasks = [] + for update in updates: + tasks.append(self.updates_handler.notify(update)) + return await asyncio.gather(*tasks) + + async def process_update(self, update: types.Update): + """ + Process single update object + + :param update: + :return: + """ + self.last_update_id = update.update_id + types.Update.set_current(update) + + try: + if update.message: + # state = await self.storage.get_state(chat=update.message.chat.id, + # user=update.message.from_user.id) + types.User.set_current(update.message.from_user) + types.Chat.set_current(update.message.chat) + return await self.message_handlers.notify(update.message) + if update.edited_message: + # state = await self.storage.get_state(chat=update.edited_message.chat.id, + # user=update.edited_message.from_user.id) + types.User.set_current(update.edited_message.from_user) + types.Chat.set_current(update.edited_message.chat) + return await self.edited_message_handlers.notify(update.edited_message) + if update.channel_post: + # state = await self.storage.get_state(chat=update.channel_post.chat.id) + types.Chat.set_current(update.channel_post.chat) + return await self.channel_post_handlers.notify(update.channel_post) + if update.edited_channel_post: + # state = await self.storage.get_state(chat=update.edited_channel_post.chat.id) + types.Chat.set_current(update.edited_channel_post.chat) + return await self.edited_channel_post_handlers.notify(update.edited_channel_post) + if update.inline_query: + # state = await self.storage.get_state(user=update.inline_query.from_user.id) + types.User.set_current(update.inline_query.from_user) + return await self.inline_query_handlers.notify(update.inline_query) + if update.chosen_inline_result: + # state = await self.storage.get_state(user=update.chosen_inline_result.from_user.id) + types.User.set_current(update.chosen_inline_result.from_user) + return await self.chosen_inline_result_handlers.notify(update.chosen_inline_result) + if update.callback_query: + # state = await self.storage.get_state( + # chat=update.callback_query.message.chat.id if update.callback_query.message else None, + # user=update.callback_query.from_user.id) + if update.callback_query.message: + types.Chat.set_current(update.callback_query.message.chat) + types.User.set_current(update.callback_query.from_user) + return await self.callback_query_handlers.notify(update.callback_query) + if update.shipping_query: + # state = await self.storage.get_state(user=update.shipping_query.from_user.id) + types.User.set_current(update.shipping_query.from_user) + return await self.shipping_query_handlers.notify(update.shipping_query) + if update.pre_checkout_query: + # state = await self.storage.get_state(user=update.pre_checkout_query.from_user.id) + types.User.set_current(update.pre_checkout_query.from_user) + return await self.pre_checkout_query_handlers.notify(update.pre_checkout_query) + except Exception as e: + err = await self.errors_handlers.notify(self, update, e) + if err: + return err + raise + + async def reset_webhook(self, check=True) -> bool: + """ + Reset webhook + + :param check: check before deleting + :return: + """ + if check: + wh = await self.bot.get_webhook_info() + if not wh.url: + return False + + return await self.bot.delete_webhook() + + async def start_polling(self, timeout=20, relax=0.1, limit=None, reset_webhook=None): + """ + Start long-polling + + :param timeout: + :param relax: + :param limit: + :param reset_webhook: + :return: + """ + if self._polling: + raise RuntimeError('Polling already started') + + log.info('Start polling.') + + # context.set_value(MODE, LONG_POLLING) + dispatcher.set(self) + bot.bot.set(self.bot) + + if reset_webhook is None: + await self.reset_webhook(check=False) + if reset_webhook: + await self.reset_webhook(check=True) + + self._polling = True + offset = None + try: + while self._polling: + try: + updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout) + except: + log.exception('Cause exception while getting updates.') + await asyncio.sleep(15) + continue + + if updates: + log.debug(f"Received {len(updates)} updates.") + offset = updates[-1].update_id + 1 + + self.loop.create_task(self._process_polling_updates(updates)) + + if relax: + await asyncio.sleep(relax) + finally: + self._close_waiter._set_result(None) + log.warning('Polling is stopped.') + + async def _process_polling_updates(self, updates): + """ + Process updates received from long-polling. + + :param updates: list of updates. + """ + need_to_call = [] + for responses in itertools.chain.from_iterable(await self.process_updates(updates)): + for response in responses: + if not isinstance(response, BaseResponse): + continue + need_to_call.append(response.execute_response(self.bot)) + if need_to_call: + try: + asyncio.gather(*need_to_call) + except TelegramAPIError: + log.exception('Cause exception while processing updates.') + + def stop_polling(self): + """ + Break long-polling process. + + :return: + """ + if self._polling: + log.info('Stop polling...') + self._polling = False + + async def wait_closed(self): + """ + Wait for the long-polling to close + + :return: + """ + await asyncio.shield(self._close_waiter, loop=self.loop) + + def is_polling(self): + """ + Check if polling is enabled + + :return: + """ + return self._polling + + def register_message_handler(self, callback, *custom_filters, commands=None, regexp=None, content_types=None, + state=None, run_task=None, **kwargs): + """ + Register handler for message + + .. code-block:: python3 + + # This handler works only if state is None (by default). + dp.register_message_handler(cmd_start, commands=['start', 'about']) + dp.register_message_handler(entry_point, commands=['setup']) + + # This handler works only if current state is "first_step" + dp.register_message_handler(step_handler_1, state="first_step") + + # If you want to handle all states by one handler, use `state="*"`. + dp.register_message_handler(cancel_handler, commands=['cancel'], state="*") + dp.register_message_handler(cancel_handler, func=lambda msg: msg.text.lower() == 'cancel', state="*") + + :param callback: + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param custom_filters: list of custom filters + :param kwargs: + :param state: + :return: decorated function + """ + if content_types is None: + content_types = ContentType.TEXT + + filters_set = self.filters_factory.resolve(self.message_handlers, + *custom_filters, + commands=commands, + regexp=regexp, + content_types=content_types, + state=state, + **kwargs) + self.message_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def message_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, state=None, + run_task=None, **kwargs): + """ + Decorator for message handler + + Examples: + + Simple commands handler: + + .. code-block:: python3 + + @dp.message_handler(commands=['start', 'welcome', 'about']) + async def cmd_handler(message: types.Message): + + Filter messages by regular expression: + + .. code-block:: python3 + + @dp.message_handler(rexexp='^[a-z]+-[0-9]+') + async def msg_handler(message: types.Message): + + Filter messages by command regular expression: + + .. code-block:: python3 + + @dp.message_handler(filters.RegexpCommandsFilter(regexp_commands=['item_([0-9]*)'])) + async def send_welcome(message: types.Message): + + Filter by content type: + + .. code-block:: python3 + + @dp.message_handler(content_types=ContentType.PHOTO | ContentType.DOCUMENT) + async def audio_handler(message: types.Message): + + Filter by custom function: + + .. code-block:: python3 + + @dp.message_handler(func=lambda message: message.text and 'hello' in message.text.lower()) + async def text_handler(message: types.Message): + + Use multiple filters: + + .. code-block:: python3 + + @dp.message_handler(commands=['command'], content_types=ContentType.TEXT) + async def text_handler(message: types.Message): + + Register multiple filters set for one handler: + + .. code-block:: python3 + + @dp.message_handler(commands=['command']) + @dp.message_handler(func=lambda message: demojize(message.text) == ':new_moon_with_face:') + async def text_handler(message: types.Message): + + This handler will be called if the message starts with '/command' OR is some emoji + + By default content_type is :class:`ContentType.TEXT` + + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param custom_filters: list of custom filters + :param kwargs: + :param state: + :param run_task: run callback in task (no wait results) + :return: decorated function + """ + + def decorator(callback): + self.register_message_handler(callback, *custom_filters, + commands=commands, regexp=regexp, content_types=content_types, + state=state, run_task=run_task, **kwargs) + return callback + + return decorator + + def register_edited_message_handler(self, callback, *, commands=None, regexp=None, content_types=None, func=None, + state=None, custom_filters=None, run_task=None, **kwargs): + """ + Register handler for edited message + + :param callback: + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param func: custom any callable object + :param state: + :param custom_filters: list of custom filters + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + if content_types is None: + content_types = ContentType.TEXT + if custom_filters is None: + custom_filters = [] + + filters_set = generate_default_filters(self, + *custom_filters, + commands=commands, + regexp=regexp, + content_types=content_types, + func=func, + state=state, + **kwargs) + self.edited_message_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def edited_message_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, func=None, + state=None, run_task=None, **kwargs): + """ + Decorator for edited message handler + + You can use combination of different handlers + + .. code-block:: python3 + + @dp.message_handler() + @dp.edited_message_handler() + async def msg_handler(message: types.Message): + + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param func: custom any callable object + :param state: + :param custom_filters: list of custom filters + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + + def decorator(callback): + self.register_edited_message_handler(callback, commands=commands, regexp=regexp, + content_types=content_types, func=func, state=state, + custom_filters=custom_filters, run_task=run_task, **kwargs) + return callback + + return decorator + + def register_channel_post_handler(self, callback, *, commands=None, regexp=None, content_types=None, func=None, + state=None, custom_filters=None, run_task=None, **kwargs): + """ + Register handler for channel post + + :param callback: + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param func: custom any callable object + :param state: + :param custom_filters: list of custom filters + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + if content_types is None: + content_types = ContentType.TEXT + if custom_filters is None: + custom_filters = [] + + filters_set = generate_default_filters(self, + *custom_filters, + commands=commands, + regexp=regexp, + content_types=content_types, + func=func, + state=state, + **kwargs) + self.channel_post_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def channel_post_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, func=None, + state=None, run_task=None, **kwargs): + """ + Decorator for channel post handler + + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param func: custom any callable object + :param state: + :param custom_filters: list of custom filters + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + + def decorator(callback): + self.register_channel_post_handler(callback, commands=commands, regexp=regexp, content_types=content_types, + func=func, state=state, custom_filters=custom_filters, + run_task=run_task, **kwargs) + return callback + + return decorator + + def register_edited_channel_post_handler(self, callback, *, commands=None, regexp=None, content_types=None, + func=None, state=None, custom_filters=None, run_task=None, **kwargs): + """ + Register handler for edited channel post + + :param callback: + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param func: custom any callable object + :param state: + :param custom_filters: list of custom filters + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + if content_types is None: + content_types = ContentType.TEXT + if custom_filters is None: + custom_filters = [] + + filters_set = generate_default_filters(self, + *custom_filters, + commands=commands, + regexp=regexp, + content_types=content_types, + func=func, + state=state, + **kwargs) + self.edited_channel_post_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def edited_channel_post_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, func=None, + state=None, run_task=None, **kwargs): + """ + Decorator for edited channel post handler + + :param commands: list of commands + :param regexp: REGEXP + :param content_types: List of content types. + :param func: custom any callable object + :param custom_filters: list of custom filters + :param state: + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + + def decorator(callback): + self.register_edited_channel_post_handler(callback, commands=commands, regexp=regexp, + content_types=content_types, func=func, state=state, + custom_filters=custom_filters, run_task=run_task, **kwargs) + return callback + + return decorator + + def register_inline_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, **kwargs): + """ + Register handler for inline query + + Example: + + .. code-block:: python3 + + dp.register_inline_handler(some_inline_handler, func=lambda inline_query: True) + + :param callback: + :param func: custom any callable object + :param custom_filters: list of custom filters + :param state: + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + if custom_filters is None: + custom_filters = [] + filters_set = generate_default_filters(self, + *custom_filters, + func=func, + state=state, + **kwargs) + self.inline_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def inline_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): + """ + Decorator for inline query handler + + Example: + + .. code-block:: python3 + + @dp.inline_handler(func=lambda inline_query: True) + async def some_inline_handler(inline_query: types.InlineQuery) + + :param func: custom any callable object + :param state: + :param custom_filters: list of custom filters + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: decorated function + """ + + def decorator(callback): + self.register_inline_handler(callback, func=func, state=state, custom_filters=custom_filters, + run_task=run_task, **kwargs) + return callback + + return decorator + + def register_chosen_inline_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, + **kwargs): + """ + Register handler for chosen inline query + + Example: + + .. code-block:: python3 + + dp.register_chosen_inline_handler(some_chosen_inline_handler, func=lambda chosen_inline_query: True) + + :param callback: + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: + """ + if custom_filters is None: + custom_filters = [] + filters_set = generate_default_filters(self, + *custom_filters, + func=func, + state=state, + **kwargs) + self.chosen_inline_result_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def chosen_inline_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): + """ + Decorator for chosen inline query handler + + Example: + + .. code-block:: python3 + + @dp.chosen_inline_handler(func=lambda chosen_inline_query: True) + async def some_chosen_inline_handler(chosen_inline_query: types.ChosenInlineResult) + + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + :return: + """ + + def decorator(callback): + self.register_chosen_inline_handler(callback, func=func, state=state, custom_filters=custom_filters, + run_task=run_task, **kwargs) + return callback + + return decorator + + def register_callback_query_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, + **kwargs): + """ + Register handler for callback query + + Example: + + .. code-block:: python3 + + dp.register_callback_query_handler(some_callback_handler, func=lambda callback_query: True) + + :param callback: + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + """ + if custom_filters is None: + custom_filters = [] + filters_set = generate_default_filters(self, + *custom_filters, + func=func, + state=state, + **kwargs) + self.callback_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def callback_query_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): + """ + Decorator for callback query handler + + Example: + + .. code-block:: python3 + + @dp.callback_query_handler(func=lambda callback_query: True) + async def some_callback_handler(callback_query: types.CallbackQuery) + + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + """ + + def decorator(callback): + self.register_callback_query_handler(callback, func=func, state=state, custom_filters=custom_filters, + run_task=run_task, **kwargs) + return callback + + return decorator + + def register_shipping_query_handler(self, callback, *, func=None, state=None, custom_filters=None, run_task=None, + **kwargs): + """ + Register handler for shipping query + + Example: + + .. code-block:: python3 + + dp.register_shipping_query_handler(some_shipping_query_handler, func=lambda shipping_query: True) + + :param callback: + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + """ + if custom_filters is None: + custom_filters = [] + filters_set = generate_default_filters(self, + *custom_filters, + func=func, + state=state, + **kwargs) + self.shipping_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def shipping_query_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): + """ + Decorator for shipping query handler + + Example: + + .. code-block:: python3 + + @dp.shipping_query_handler(func=lambda shipping_query: True) + async def some_shipping_query_handler(shipping_query: types.ShippingQuery) + + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + """ + + def decorator(callback): + self.register_shipping_query_handler(callback, func=func, state=state, custom_filters=custom_filters, + run_task=run_task, **kwargs) + return callback + + return decorator + + def register_pre_checkout_query_handler(self, callback, *, func=None, state=None, custom_filters=None, + run_task=None, **kwargs): + """ + Register handler for pre-checkout query + + Example: + + .. code-block:: python3 + + dp.register_pre_checkout_query_handler(some_pre_checkout_query_handler, func=lambda shipping_query: True) + + :param callback: + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + """ + if custom_filters is None: + custom_filters = [] + filters_set = generate_default_filters(self, + *custom_filters, + func=func, + state=state, + **kwargs) + self.pre_checkout_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def pre_checkout_query_handler(self, *custom_filters, func=None, state=None, run_task=None, **kwargs): + """ + Decorator for pre-checkout query handler + + Example: + + .. code-block:: python3 + + @dp.pre_checkout_query_handler(func=lambda shipping_query: True) + async def some_pre_checkout_query_handler(shipping_query: types.ShippingQuery) + + :param func: custom any callable object + :param state: + :param custom_filters: + :param run_task: run callback in task (no wait results) + :param kwargs: + """ + + def decorator(callback): + self.register_pre_checkout_query_handler(callback, func=func, state=state, custom_filters=custom_filters, + run_task=run_task, **kwargs) + return callback + + return decorator + + def register_errors_handler(self, callback, *, func=None, exception=None, run_task=None): + """ + Register handler for errors + + :param callback: + :param func: + :param exception: you can make handler for specific errors type + :param run_task: run callback in task (no wait results) + """ + filters_set = [] + if func is not None: + filters_set.append(func) + if exception is not None: + filters_set.append(ExceptionsFilter(exception)) + self.errors_handlers.register(self._wrap_async_task(callback, run_task), filters_set) + + def errors_handler(self, func=None, exception=None, run_task=None): + """ + Decorator for errors handler + + :param func: + :param exception: you can make handler for specific errors type + :param run_task: run callback in task (no wait results) + :return: + """ + + def decorator(callback): + self.register_errors_handler(self._wrap_async_task(callback, run_task), + func=func, exception=exception) + return callback + + return decorator + + def current_state(self, *, + chat: typing.Union[str, int, None] = None, + user: typing.Union[str, int, None] = None) -> FSMContext: + """ + Get current state for user in chat as context + + .. code-block:: python3 + + with dp.current_state(chat=message.chat.id, user=message.user.id) as state: + pass + + state = dp.current_state() + state.set_state('my_state') + + :param chat: + :param user: + :return: + """ + if chat is None: + chat = types.Chat.current() + if user is None: + user = types.User.current() + + return FSMContext(storage=self.storage, chat=chat, user=user) + + async def throttle(self, key, *, rate=None, user=None, chat=None, no_error=None) -> bool: + """ + Execute throttling manager. + Returns True if limit has not exceeded otherwise raises ThrottleError or returns False + + :param key: key in storage + :param rate: limit (by default is equal to default rate limit) + :param user: user id + :param chat: chat id + :param no_error: return boolean value instead of raising error + :return: bool + """ + if not self.storage.has_bucket(): + raise RuntimeError('This storage does not provide Leaky Bucket') + + if no_error is None: + no_error = self.no_throttle_error + if rate is None: + rate = self.throttling_rate_limit + if user is None and chat is None: + user = types.User.current() + chat = types.Chat.current() + + # Detect current time + now = time.time() + + bucket = await self.storage.get_bucket(chat=chat, user=user) + + # Fix bucket + if bucket is None: + bucket = {key: {}} + if key not in bucket: + bucket[key] = {} + data = bucket[key] + + # Calculate + called = data.get(LAST_CALL, now) + delta = now - called + result = delta >= rate or delta <= 0 + + # Save results + data[RESULT] = result + data[RATE_LIMIT] = rate + data[LAST_CALL] = now + data[DELTA] = delta + if not result: + data[EXCEEDED_COUNT] += 1 + else: + data[EXCEEDED_COUNT] = 1 + bucket[key].update(data) + await self.storage.set_bucket(chat=chat, user=user, bucket=bucket) + + if not result and not no_error: + # Raise if it is allowed + raise Throttled(key=key, chat=chat, user=user, **data) + return result + + async def check_key(self, key, chat=None, user=None): + """ + Get information about key in bucket + + :param key: + :param chat: + :param user: + :return: + """ + if not self.storage.has_bucket(): + raise RuntimeError('This storage does not provide Leaky Bucket') + + if user is None and chat is None: + user = types.User.current() + chat = types.Chat.current() + + bucket = await self.storage.get_bucket(chat=chat, user=user) + data = bucket.get(key, {}) + return Throttled(key=key, chat=chat, user=user, **data) + + async def release_key(self, key, chat=None, user=None): + """ + Release blocked key + + :param key: + :param chat: + :param user: + :return: + """ + if not self.storage.has_bucket(): + raise RuntimeError('This storage does not provide Leaky Bucket') + + if user is None and chat is None: + user = types.User.current() + chat = types.Chat.current() + + bucket = await self.storage.get_bucket(chat=chat, user=user) + if bucket and key in bucket: + del bucket['key'] + await self.storage.set_bucket(chat=chat, user=user, bucket=bucket) + return True + return False + + def async_task(self, func): + """ + Execute handler as task and return None. + Use this decorator for slow handlers (with timeouts) + + .. code-block:: python3 + + @dp.message_handler(commands=['command']) + @dp.async_task + async def cmd_with_timeout(message: types.Message): + await asyncio.sleep(120) + return SendMessage(message.chat.id, 'KABOOM').reply(message) + + :param func: + :return: + """ + + def process_response(task): + try: + response = task.result() + except Exception as e: + self.loop.create_task( + self.errors_handlers.notify(self, types.Update.current(), e)) + else: + if isinstance(response, BaseResponse): + self.loop.create_task(response.execute_response(self.bot)) + + @functools.wraps(func) + async def wrapper(*args, **kwargs): + task = self.loop.create_task(func(*args, **kwargs)) + task.add_done_callback(process_response) + + return wrapper + + def _wrap_async_task(self, callback, run_task=None) -> callable: + if run_task is None: + run_task = self.run_tasks_by_default + + if run_task: + return self.async_task(callback) + return callback + + +dispatcher: ContextVar[Dispatcher] = ContextVar('dispatcher_instance', default=None) diff --git a/aiogram/dispatcher/filters.py b/aiogram/dispatcher/filters.py deleted file mode 100644 index e69de29b..00000000 diff --git a/aiogram/dispatcher/filters/builtin.py b/aiogram/dispatcher/filters/builtin.py index 00725b5b..135a7338 100644 --- a/aiogram/dispatcher/filters/builtin.py +++ b/aiogram/dispatcher/filters/builtin.py @@ -1,9 +1,9 @@ import asyncio import re +from _contextvars import ContextVar from aiogram.dispatcher.filters.filters import BaseFilter, Filter, check_filter from aiogram.types import CallbackQuery, ContentType, Message -from aiogram.utils import context USER_STATE = 'USER_STATE' @@ -130,6 +130,8 @@ class StateFilter(BaseFilter): """ key = 'state' + ctx_state = ContextVar('user_state') + def __init__(self, dispatcher, state): super().__init__(dispatcher) if isinstance(state, str): @@ -143,14 +145,16 @@ class StateFilter(BaseFilter): if self.state == '*': return True - if context.check_value(USER_STATE): - context_state = context.get_value(USER_STATE) - return self.state == context_state - else: + try: + return self.state == self.ctx_state.get() + except LookupError: chat, user = self.get_target(obj) if chat or user: - return await self.dispatcher.storage.get_state(chat=chat, user=user) in self.state + state = await self.dispatcher.storage.get_state(chat=chat, user=user) in self.state + self.ctx_state.set(state) + return state == self.state + return False diff --git a/aiogram/dispatcher/filters/factory.py b/aiogram/dispatcher/filters/factory.py index 949318ee..88d1f141 100644 --- a/aiogram/dispatcher/filters/factory.py +++ b/aiogram/dispatcher/filters/factory.py @@ -4,7 +4,6 @@ from .filters import AbstractFilter, FilterRecord from ..handler import Handler -# TODO: provide to set default filters (Like state. It will be always be added to filters set) # TODO: move check_filter/check_filters functions to FiltersFactory class class FiltersFactory: @@ -18,15 +17,17 @@ class FiltersFactory: def bind(self, callback: typing.Union[typing.Callable, AbstractFilter], validator: typing.Optional[typing.Callable] = None, - event_handlers: typing.Optional[typing.List[Handler]] = None): + event_handlers: typing.Optional[typing.List[Handler]] = None, + exclude_event_handlers: typing.Optional[typing.Iterable[Handler]] = None): """ Register filter :param callback: callable or subclass of :obj:`AbstractFilter` :param validator: custom validator. :param event_handlers: list of instances of :obj:`Handler` + :param exclude_event_handlers: list of excluded event handlers (:obj:`Handler`) """ - record = FilterRecord(callback, validator, event_handlers) + record = FilterRecord(callback, validator, event_handlers, exclude_event_handlers) self._registered.append(record) def unbind(self, callback: typing.Union[typing.Callable, AbstractFilter]): @@ -52,17 +53,21 @@ class FiltersFactory: filters_set = [] if custom_filters: filters_set.extend(custom_filters) - if full_config: - filters_set.extend(self._resolve_registered(self._dispatcher, event_handler, - {k: v for k, v in full_config.items() if v is not None})) + filters_set.extend(self._resolve_registered(event_handler, + {k: v for k, v in full_config.items() if v is not None})) + return filters_set - def _resolve_registered(self, dispatcher, event_handler, full_config) -> typing.Generator: - for record in self._registered: - if not full_config: - break + def _resolve_registered(self, event_handler, full_config) -> typing.Generator: + """ + Resolve registered filters - filter_ = record.resolve(dispatcher, event_handler, full_config) + :param event_handler: + :param full_config: + :return: + """ + for record in self._registered: + filter_ = record.resolve(self._dispatcher, event_handler, full_config) if filter_: yield filter_ diff --git a/aiogram/dispatcher/filters/filters.py b/aiogram/dispatcher/filters/filters.py index 84e0c669..bbdf909f 100644 --- a/aiogram/dispatcher/filters/filters.py +++ b/aiogram/dispatcher/filters/filters.py @@ -72,6 +72,10 @@ class FilterRecord: return config = self.resolver(full_config) if config: + for key in config: + if key in full_config: + full_config.pop(key) + return self.callback(dispatcher, **config) def _check_event_handler(self, event_handler) -> bool: diff --git a/aiogram/dispatcher/handler.py b/aiogram/dispatcher/handler.py index bee98981..58b285a6 100644 --- a/aiogram/dispatcher/handler.py +++ b/aiogram/dispatcher/handler.py @@ -1,6 +1,3 @@ -from ..utils import context - - class SkipHandler(BaseException): pass @@ -70,7 +67,7 @@ class Handler: if await check_filters(filters, args): try: if self.middleware_key: - context.set_value('handler', handler) + # context.set_value('handler', handler) await self.dispatcher.middleware.trigger(f"process_{self.middleware_key}", args) response = await handler(*args) if response is not None: diff --git a/aiogram/dispatcher/webhook.py b/aiogram/dispatcher/webhook.py index ca717202..833da19e 100644 --- a/aiogram/dispatcher/webhook.py +++ b/aiogram/dispatcher/webhook.py @@ -8,11 +8,13 @@ from typing import Dict, List, Optional, Union from aiohttp import web +from aiogram import Bot +from aiogram.bot import bot +from aiogram.dispatcher import dispatcher from .. import types from ..bot import api from ..types import ParseMode from ..types.base import Boolean, Float, Integer, String -from ..utils import context from ..utils import helper, markdown from ..utils import json from ..utils.deprecated import warn_deprecated as warn @@ -88,8 +90,8 @@ class WebhookRequestHandler(web.View): """ dp = self.request.app[BOT_DISPATCHER_KEY] try: - context.set_value('dispatcher', dp) - context.set_value('bot', dp.bot) + dispatcher.set(dp) + bot.bot.set(dp.bot) except RuntimeError: pass return dp @@ -116,9 +118,9 @@ class WebhookRequestHandler(web.View): """ self.validate_ip() - context.update_state({'CALLER': WEBHOOK, - WEBHOOK_CONNECTION: True, - WEBHOOK_REQUEST: self.request}) + # context.update_state({'CALLER': WEBHOOK, + # WEBHOOK_CONNECTION: True, + # WEBHOOK_REQUEST: self.request}) dispatcher = self.get_dispatcher() update = await self.parse_update(dispatcher.bot) @@ -170,7 +172,7 @@ class WebhookRequestHandler(web.View): if fut.done(): return fut.result() else: - context.set_value(WEBHOOK_CONNECTION, False) + # context.set_value(WEBHOOK_CONNECTION, False) fut.remove_done_callback(cb) fut.add_done_callback(self.respond_via_request) finally: @@ -195,7 +197,7 @@ class WebhookRequestHandler(web.View): results = task.result() except Exception as e: loop.create_task( - dispatcher.errors_handlers.notify(dispatcher, context.get_value('update_object'), e)) + dispatcher.errors_handlers.notify(dispatcher, types.Update.current(), e)) else: response = self.get_response(results) if response is not None: @@ -242,7 +244,7 @@ class WebhookRequestHandler(web.View): ip_address, accept = self.check_ip() if not accept: raise web.HTTPUnauthorized() - context.set_value('TELEGRAM_IP', ip_address) + # context.set_value('TELEGRAM_IP', ip_address) def configure_app(dispatcher, app: web.Application, path=DEFAULT_WEB_PATH): @@ -332,8 +334,8 @@ class BaseResponse: async def __call__(self, bot=None): if bot is None: - from aiogram.dispatcher import ctx - bot = ctx.get_bot() + from aiogram import Bot + bot = Bot.current() return await self.execute_response(bot) async def __aenter__(self): @@ -426,7 +428,7 @@ class ParseModeMixin: :return: """ - bot = context.get_value('bot', None) + bot = Bot.current() if bot is not None: return bot.parse_mode diff --git a/aiogram/types/base.py b/aiogram/types/base.py index 4451fc36..9982ad35 100644 --- a/aiogram/types/base.py +++ b/aiogram/types/base.py @@ -1,5 +1,8 @@ +from __future__ import annotations + import io import typing +from contextvars import ContextVar from typing import TypeVar from .fields import BaseField @@ -53,6 +56,8 @@ class MetaTelegramObject(type): setattr(cls, ALIASES_ATTR_NAME, aliases) mcs._objects[cls.__name__] = cls + + cls._current = ContextVar('current_' + cls.__name__, default=None) # Maybe need to set default=None? return cls @property @@ -88,6 +93,14 @@ class TelegramObject(metaclass=MetaTelegramObject): if value.default and key not in self.values: self.values[key] = value.default + @classmethod + def current(cls): + return cls._current.get() + + @classmethod + def set_current(cls, obj: TelegramObject): + return cls._current.set(obj) + @property def conf(self) -> typing.Dict[str, typing.Any]: return self._conf diff --git a/aiogram/types/chat.py b/aiogram/types/chat.py index ae70c519..947add4d 100644 --- a/aiogram/types/chat.py +++ b/aiogram/types/chat.py @@ -1,5 +1,8 @@ +from __future__ import annotations + import asyncio import typing +from contextvars import ContextVar from . import base from . import fields @@ -64,7 +67,7 @@ class Chat(base.TelegramObject): if as_html: return markdown.hlink(name, self.user_url) return markdown.link(name, self.user_url) - + async def get_url(self): """ Use this method to get chat link. @@ -507,8 +510,8 @@ class ChatActions(helper.Helper): @classmethod async def _do(cls, action: str, sleep=None): - from ..dispatcher.ctx import get_bot, get_chat - await get_bot().send_chat_action(get_chat(), action) + from aiogram import Bot + await Bot.current().send_chat_action(Chat.current(), action) if sleep: await asyncio.sleep(sleep) diff --git a/aiogram/types/message.py b/aiogram/types/message.py index e4f3e4b1..94cc9170 100644 --- a/aiogram/types/message.py +++ b/aiogram/types/message.py @@ -190,7 +190,7 @@ class Message(base.TelegramObject): return text async def reply(self, text, parse_mode=None, disable_web_page_preview=None, - disable_notification=None, reply_markup=None, reply=True) -> 'Message': + disable_notification=None, reply_markup=None, reply=True) -> Message: """ Reply to this message diff --git a/aiogram/types/update.py b/aiogram/types/update.py index 879a0b89..2753ae5f 100644 --- a/aiogram/types/update.py +++ b/aiogram/types/update.py @@ -1,7 +1,5 @@ from __future__ import annotations -from contextvars import ContextVar - from . import base from . import fields from .callback_query import CallbackQuery @@ -12,8 +10,6 @@ from .pre_checkout_query import PreCheckoutQuery from .shipping_query import ShippingQuery from ..utils import helper -current_update: ContextVar[Update] = ContextVar('current_update_object', default=None) - class Update(base.TelegramObject): """ @@ -33,14 +29,6 @@ class Update(base.TelegramObject): shipping_query: ShippingQuery = fields.Field(base=ShippingQuery) pre_checkout_query: PreCheckoutQuery = fields.Field(base=PreCheckoutQuery) - @classmethod - def current(cls): - return current_update.get() - - @classmethod - def set_current(cls, update: Update): - return current_update.set(update) - def __hash__(self): return self.update_id diff --git a/aiogram/types/user.py b/aiogram/types/user.py index c4c64844..441c275f 100644 --- a/aiogram/types/user.py +++ b/aiogram/types/user.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import babel from . import base diff --git a/aiogram/utils/context.py b/aiogram/utils/context.py deleted file mode 100644 index 376d9aa9..00000000 --- a/aiogram/utils/context.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -You need to setup task factory: - >>> from aiogram.utils import context - >>> loop = asyncio.get_event_loop() - >>> loop.set_task_factory(context.task_factory) -""" - -import asyncio -import typing - -CONFIGURED = '@CONFIGURED_TASK_FACTORY' - - -def task_factory(loop: asyncio.BaseEventLoop, coro: typing.Coroutine): - """ - Task factory for implementing context processor - - :param loop: - :param coro: - :return: new task - :rtype: :obj:`asyncio.Task` - """ - # Is not allowed when loop is closed. - if loop.is_closed(): - raise RuntimeError('Event loop is closed.') - - task = asyncio.Task(coro, loop=loop) - - # Hide factory - if task._source_traceback: - del task._source_traceback[-1] - - try: - task.context = asyncio.Task.current_task().context.copy() - except AttributeError: - task.context = {CONFIGURED: True} - - return task - - -def get_current_state() -> typing.Dict: - """ - Get current execution context from task - - :return: context - :rtype: :obj:`dict` - """ - task = asyncio.Task.current_task() - if task is None: - raise RuntimeError('Can be used only in Task context.') - context_ = getattr(task, 'context', None) - if context_ is None: - context_ = task.context = {} - return context_ - - -def get_value(key, default=None): - """ - Get value from task - - :param key: - :param default: - :return: value - """ - return get_current_state().get(key, default) - - -def check_value(key): - """ - Key in context? - - :param key: - :return: - """ - return key in get_current_state() - - -def set_value(key, value): - """ - Set value - - :param key: - :param value: - :return: - """ - get_current_state()[key] = value - - -def del_value(key): - """ - Remove value from context - - :param key: - :return: - """ - del get_current_state()[key] - - -def update_state(data=None, **kwargs): - """ - Update multiple state items - - :param data: - :param kwargs: - :return: - """ - if data is None: - data = {} - state = get_current_state() - state.update(data, **kwargs) - - -def check_configured(): - """ - Check loop is configured - :return: - """ - return get_value(CONFIGURED) - - -class _Context: - """ - Other things for interactions with the execution context. - """ - - def __getitem__(self, item): - return get_value(item) - - def __setitem__(self, key, value): - set_value(key, value) - - def __delitem__(self, key): - del_value(key) - - @staticmethod - def get_context(): - return get_current_state() - - -context = _Context() diff --git a/aiogram/utils/executor.py b/aiogram/utils/executor.py index 57cb9a65..ee231f95 100644 --- a/aiogram/utils/executor.py +++ b/aiogram/utils/executor.py @@ -6,7 +6,6 @@ from warnings import warn from aiohttp import web -from . import context from ..bot.api import log from ..dispatcher.webhook import BOT_DISPATCHER_KEY, WebhookRequestHandler @@ -179,13 +178,13 @@ class Executor: self._check_frozen() self._freeze = True - self.loop.set_task_factory(context.task_factory) + # self.loop.set_task_factory(context.task_factory) def _prepare_webhook(self, path=None, handler=WebhookRequestHandler): self._check_frozen() self._freeze = True - self.loop.set_task_factory(context.task_factory) + # self.loop.set_task_factory(context.task_factory) app = self._web_app if app is None: diff --git a/examples/middleware_and_antiflood.py b/examples/middleware_and_antiflood.py index 2d0f002c..d0a8ad08 100644 --- a/examples/middleware_and_antiflood.py +++ b/examples/middleware_and_antiflood.py @@ -2,9 +2,9 @@ import asyncio from aiogram import Bot, types from aiogram.contrib.fsm_storage.redis import RedisStorage2 -from aiogram.dispatcher import CancelHandler, DEFAULT_RATE_LIMIT, Dispatcher, ctx +from aiogram.dispatcher import CancelHandler, DEFAULT_RATE_LIMIT, Dispatcher from aiogram.dispatcher.middlewares import BaseMiddleware -from aiogram.utils import context, executor +from aiogram.utils import executor from aiogram.utils.exceptions import Throttled TOKEN = 'BOT TOKEN HERE' @@ -53,10 +53,10 @@ class ThrottlingMiddleware(BaseMiddleware): :param message: """ # Get current handler - handler = context.get_value('handler') + # handler = context.get_value('handler') # Get dispatcher from context - dispatcher = ctx.get_dispatcher() + dispatcher = Dispatcher.current() # If handler was configured, get rate limit and key from handler if handler: @@ -83,8 +83,8 @@ class ThrottlingMiddleware(BaseMiddleware): :param message: :param throttled: """ - handler = context.get_value('handler') - dispatcher = ctx.get_dispatcher() + # handler = context.get_value('handler') + dispatcher = Dispatcher.current() if handler: key = getattr(handler, 'throttling_key', f"{self.prefix}_{handler.__name__}") else: