diff --git a/aiogram/bot/base.py b/aiogram/bot/base.py index bbec8b31..0617dee9 100644 --- a/aiogram/bot/base.py +++ b/aiogram/bot/base.py @@ -62,8 +62,11 @@ class BaseBot: self._data = {} def __del__(self): + self.close() + + def close(self): """ - When bot object is deleting - need close all sessions + Close all client sessions """ for session in self._temp_sessions: if not session.closed: @@ -71,17 +74,19 @@ class BaseBot: if self.session and not self.session.closed: self.session.close() - def create_temp_session(self, limit: int = 1) -> aiohttp.ClientSession: + def create_temp_session(self, limit: int = 1, force_close: bool = False) -> aiohttp.ClientSession: """ Create temporary session :param limit: Limit of connections :type limit: :obj:`int` + :param force_close: Set to True to force close and do reconnect after each request (and between redirects). + :type force_close: :obj:`bool` :return: New session :rtype: :obj:`aiohttp.TCPConnector` """ session = aiohttp.ClientSession( - connector=aiohttp.TCPConnector(limit=limit, force_close=True), + connector=aiohttp.TCPConnector(limit=limit, force_close=force_close), loop=self.loop, json_serialize=json.dumps) self._temp_sessions.append(session) return session diff --git a/aiogram/dispatcher/__init__.py b/aiogram/dispatcher/__init__.py index a7323606..9cf328be 100644 --- a/aiogram/dispatcher/__init__.py +++ b/aiogram/dispatcher/__init__.py @@ -71,9 +71,11 @@ class Dispatcher: self.updates_handler.register(self.process_update) self._polling = False + self._closed = True + self._close_waiter = loop.create_future() def __del__(self): - self._polling = False + self.stop_polling() @property def data(self): @@ -243,24 +245,26 @@ class Dispatcher: self._polling = True offset = None - while self._polling: - try: - updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout) - except NetworkError: - log.exception('Cause exception while getting updates.') - await asyncio.sleep(15) - continue + try: + while self._polling: + try: + updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout) + except NetworkError: + log.exception('Cause exception while getting updates.') + await asyncio.sleep(15) + continue - if updates: - log.debug(f"Received {len(updates)} updates.") - offset = updates[-1].update_id + 1 + if updates: + log.debug(f"Received {len(updates)} updates.") + offset = updates[-1].update_id + 1 - self.loop.create_task(self._process_polling_updates(updates)) + self.loop.create_task(self._process_polling_updates(updates)) - if relax: - await asyncio.sleep(relax) - - log.warning('Polling is stopped.') + if relax: + await asyncio.sleep(relax) + finally: + self._close_waiter.set_result(None) + log.warning('Polling is stopped.') async def _process_polling_updates(self, updates): """ @@ -287,12 +291,21 @@ class Dispatcher: def stop_polling(self): """ Break long-polling process. + :return: """ if self._polling: - log.info('Stop polling.') + log.info('Stop polling...') self._polling = False + async def wait_closed(self): + """ + Wait closing the long polling + + :return: + """ + await asyncio.shield(self._close_waiter, loop=self.loop) + @deprecated('The old method was renamed to `is_polling`') def is_pooling(self): return self.is_polling() diff --git a/aiogram/utils/executor.py b/aiogram/utils/executor.py index 45fdf7da..2f31875c 100644 --- a/aiogram/utils/executor.py +++ b/aiogram/utils/executor.py @@ -34,6 +34,7 @@ async def _shutdown(dispatcher: Dispatcher, callback=None): if dispatcher.is_polling(): dispatcher.stop_polling() + await dispatcher.wait_closed() await dispatcher.storage.close() await dispatcher.storage.wait_closed()