More graceful stopping of long polling.

This commit is contained in:
Alex Root Junior 2018-01-06 14:31:59 +02:00
parent 62d5dda84d
commit 16a61b092f
3 changed files with 39 additions and 20 deletions

View file

@ -62,8 +62,11 @@ class BaseBot:
self._data = {}
def __del__(self):
self.close()
def close(self):
"""
When bot object is deleting - need close all sessions
Close all client sessions
"""
for session in self._temp_sessions:
if not session.closed:
@ -71,17 +74,19 @@ class BaseBot:
if self.session and not self.session.closed:
self.session.close()
def create_temp_session(self, limit: int = 1) -> aiohttp.ClientSession:
def create_temp_session(self, limit: int = 1, force_close: bool = False) -> aiohttp.ClientSession:
"""
Create temporary session
:param limit: Limit of connections
:type limit: :obj:`int`
:param force_close: Set to True to force close and do reconnect after each request (and between redirects).
:type force_close: :obj:`bool`
:return: New session
:rtype: :obj:`aiohttp.TCPConnector`
"""
session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=limit, force_close=True),
connector=aiohttp.TCPConnector(limit=limit, force_close=force_close),
loop=self.loop, json_serialize=json.dumps)
self._temp_sessions.append(session)
return session

View file

@ -71,9 +71,11 @@ class Dispatcher:
self.updates_handler.register(self.process_update)
self._polling = False
self._closed = True
self._close_waiter = loop.create_future()
def __del__(self):
self._polling = False
self.stop_polling()
@property
def data(self):
@ -243,24 +245,26 @@ class Dispatcher:
self._polling = True
offset = None
while self._polling:
try:
updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout)
except NetworkError:
log.exception('Cause exception while getting updates.')
await asyncio.sleep(15)
continue
try:
while self._polling:
try:
updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout)
except NetworkError:
log.exception('Cause exception while getting updates.')
await asyncio.sleep(15)
continue
if updates:
log.debug(f"Received {len(updates)} updates.")
offset = updates[-1].update_id + 1
if updates:
log.debug(f"Received {len(updates)} updates.")
offset = updates[-1].update_id + 1
self.loop.create_task(self._process_polling_updates(updates))
self.loop.create_task(self._process_polling_updates(updates))
if relax:
await asyncio.sleep(relax)
log.warning('Polling is stopped.')
if relax:
await asyncio.sleep(relax)
finally:
self._close_waiter.set_result(None)
log.warning('Polling is stopped.')
async def _process_polling_updates(self, updates):
"""
@ -287,12 +291,21 @@ class Dispatcher:
def stop_polling(self):
"""
Break long-polling process.
:return:
"""
if self._polling:
log.info('Stop polling.')
log.info('Stop polling...')
self._polling = False
async def wait_closed(self):
"""
Wait closing the long polling
:return:
"""
await asyncio.shield(self._close_waiter, loop=self.loop)
@deprecated('The old method was renamed to `is_polling`')
def is_pooling(self):
return self.is_polling()

View file

@ -34,6 +34,7 @@ async def _shutdown(dispatcher: Dispatcher, callback=None):
if dispatcher.is_polling():
dispatcher.stop_polling()
await dispatcher.wait_closed()
await dispatcher.storage.close()
await dispatcher.storage.wait_closed()