From e3f40c4beede1df1296288aa13d84787c9cd11eb Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Fri, 26 May 2017 10:55:07 +0300 Subject: [PATCH] Add skeleton for Dispatcher. --- aiogram/dispatcher/__init__.py | 62 ++++++++++++++++++++++++++++++++++ aiogram/dispatcher/handler.py | 44 ++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 aiogram/dispatcher/__init__.py create mode 100644 aiogram/dispatcher/handler.py diff --git a/aiogram/dispatcher/__init__.py b/aiogram/dispatcher/__init__.py new file mode 100644 index 00000000..d99775a8 --- /dev/null +++ b/aiogram/dispatcher/__init__.py @@ -0,0 +1,62 @@ +import asyncio +import logging + +from .handler import Handler +from ..bot import AIOGramBot + +log = logging.getLogger(__name__) + + +class Dispatcher: + def __init__(self, bot): + self.bot: AIOGramBot = bot + + self.last_update_id = 0 + self.updates = Handler(self) + self.messages = Handler(self) + self.commands = Handler(self) + + self.updates.register(self.process_update) + + self._pooling = False + + async def skip_updates(self): + total = 0 + updates = await self.bot.get_updates(offset=self.last_update_id, timeout=1) + while updates: + total += len(updates) + for update in updates: + if update.update_id > self.last_update_id: + self.last_update_id = update.update_id + updates = await self.bot.get_updates(offset=self.last_update_id + 1, timeout=1) + return total + + async def process_updates(self, updates): + for update in updates: + self.bot.loop.create_task(self.updates.notify(update)) + + async def process_update(self, update): + if update.message: + await self.messages.notify(update.message) + + async def start_pooling(self, timeout=20, relax=0.1): + if self._pooling: + raise RuntimeError('Pooling already started') + self._pooling = True + offset = None + while self._pooling: + try: + updates = await self.bot.get_updates(offset=offset, timeout=timeout) + except Exception as e: + log.exception('Cause exception while getting updates') + await asyncio.sleep(relax) + continue + + if updates: + offset = updates[-1].update_id + 1 + await self.process_updates(updates) + + await asyncio.sleep(relax) + + def stop_pooling(self): + self._pooling = False diff --git a/aiogram/dispatcher/handler.py b/aiogram/dispatcher/handler.py new file mode 100644 index 00000000..006c199e --- /dev/null +++ b/aiogram/dispatcher/handler.py @@ -0,0 +1,44 @@ +import inspect + + +async def check_filter(filter_, args, kwargs): + if inspect.iscoroutinefunction(filter_): + return await filter_(*args, **kwargs) + elif callable(filter_): + return filter_(*args, **kwargs) + else: + return True + + +async def check_filters(filters, args, kwargs): + if filters is not None: + for filter_ in filters: + f = await check_filter(filter_, args, kwargs) + if not f: + return False + return True + + +class Handler: + def __init__(self, dispatcher): + self.dispatcher = dispatcher + + self.handlers = [] + + def register(self, handler, filters=None): + if filters and not isinstance(filters, (list, tuple, set)): + filters = [filters] + self.handlers.append((filters, handler)) + + def unregister(self, handler): + for handler_with_filters in self.handlers: + _, registered = handler_with_filters + if handler is registered: + self.handlers.remove(handler_with_filters) + return True + raise ValueError('This handler is not registered!') + + async def notify(self, *args, **kwargs): + for filters, handler in self.handlers: + if await check_filters(filters, args, kwargs): + await handler(*args, **kwargs)