Implement slow-mode polling (If need to save ordering of updates)

This commit is contained in:
Alex Root Junior 2018-09-13 22:15:00 +03:00
parent ce2d862333
commit 7fe12ba3d2

View file

@ -142,17 +142,24 @@ class Dispatcher:
updates = await self.bot.get_updates(offset=self.last_update_id + 1, timeout=1) updates = await self.bot.get_updates(offset=self.last_update_id + 1, timeout=1)
return total return total
async def process_updates(self, updates): async def process_updates(self, updates, fast: typing.Optional[bool] = True):
""" """
Process list of updates Process list of updates
:param updates: :param updates:
:param fast:
:return: :return:
""" """
tasks = [] if fast:
tasks = []
for update in updates:
tasks.append(self.updates_handler.notify(update))
return await asyncio.gather(*tasks)
results = []
for update in updates: for update in updates:
tasks.append(self.updates_handler.notify(update)) results.append(await self.updates_handler.notify(update))
return await asyncio.gather(*tasks) return results
async def process_update(self, update: types.Update): async def process_update(self, update: types.Update):
""" """
@ -216,7 +223,8 @@ class Dispatcher:
return await self.bot.delete_webhook() return await self.bot.delete_webhook()
async def start_polling(self, timeout=20, relax=0.1, limit=None, reset_webhook=None): async def start_polling(self, timeout=20, relax=0.1, limit=None, reset_webhook=None,
fast: typing.Optional[bool] = True):
""" """
Start long-polling Start long-polling
@ -224,6 +232,7 @@ class Dispatcher:
:param relax: :param relax:
:param limit: :param limit:
:param reset_webhook: :param reset_webhook:
:param fast:
:return: :return:
""" """
if self._polling: if self._polling:
@ -255,7 +264,7 @@ class Dispatcher:
log.debug(f"Received {len(updates)} updates.") log.debug(f"Received {len(updates)} updates.")
offset = updates[-1].update_id + 1 offset = updates[-1].update_id + 1
self.loop.create_task(self._process_polling_updates(updates)) self.loop.create_task(self._process_polling_updates(updates, fast))
if relax: if relax:
await asyncio.sleep(relax) await asyncio.sleep(relax)
@ -263,14 +272,15 @@ class Dispatcher:
self._close_waiter._set_result(None) self._close_waiter._set_result(None)
log.warning('Polling is stopped.') log.warning('Polling is stopped.')
async def _process_polling_updates(self, updates): async def _process_polling_updates(self, updates, fast: typing.Optional[bool] = True):
""" """
Process updates received from long-polling. Process updates received from long-polling.
:param updates: list of updates. :param updates: list of updates.
:param fast:
""" """
need_to_call = [] need_to_call = []
for responses in itertools.chain.from_iterable(await self.process_updates(updates)): for responses in itertools.chain.from_iterable(await self.process_updates(updates, fast)):
for response in responses: for response in responses:
if not isinstance(response, BaseResponse): if not isinstance(response, BaseResponse):
continue continue