Implements opportunity to change request timeouts

This commit is contained in:
Alex RootJunior 2019-04-17 23:24:23 +03:00
parent 71c49fd08b
commit beca19b5e2
3 changed files with 71 additions and 8 deletions

View file

@ -1,11 +1,14 @@
import asyncio import asyncio
import contextlib
import io import io
import ssl import ssl
import typing import typing
from contextvars import ContextVar
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union
import aiohttp import aiohttp
import certifi import certifi
from aiohttp.helpers import sentinel
from . import api from . import api
from ..types import ParseMode, base from ..types import ParseMode, base
@ -17,6 +20,7 @@ class BaseBot:
""" """
Base class for bot. It's raw bot. Base class for bot. It's raw bot.
""" """
_ctx_timeout = ContextVar('TelegramRequestTimeout')
def __init__( def __init__(
self, self,
@ -27,6 +31,7 @@ class BaseBot:
proxy_auth: Optional[aiohttp.BasicAuth] = None, proxy_auth: Optional[aiohttp.BasicAuth] = None,
validate_token: Optional[base.Boolean] = True, validate_token: Optional[base.Boolean] = True,
parse_mode: typing.Optional[base.String] = None, parse_mode: typing.Optional[base.String] = None,
timeout: typing.Optional[typing.Union[base.Integer, base.Float]] = None
): ):
""" """
Instructions how to get Bot token is found here: https://core.telegram.org/bots#3-how-do-i-create-a-bot Instructions how to get Bot token is found here: https://core.telegram.org/bots#3-how-do-i-create-a-bot
@ -45,6 +50,8 @@ class BaseBot:
:type validate_token: :obj:`bool` :type validate_token: :obj:`bool`
:param parse_mode: You can set default parse mode :param parse_mode: You can set default parse mode
:type parse_mode: :obj:`str` :type parse_mode: :obj:`str`
:param timeout: Request timeout
:type timeout: :obj:`typing.Optional[typing.Union[base.Integer, base.Float]]`
:raise: when token is invalid throw an :obj:`aiogram.utils.exceptions.ValidationError` :raise: when token is invalid throw an :obj:`aiogram.utils.exceptions.ValidationError`
""" """
# Authentication # Authentication
@ -83,11 +90,51 @@ class BaseBot:
self.proxy_auth = None self.proxy_auth = None
else: else:
connector = aiohttp.TCPConnector(limit=connections_limit, ssl=ssl_context, loop=self.loop) connector = aiohttp.TCPConnector(limit=connections_limit, ssl=ssl_context, loop=self.loop)
self._timeout = None
self.timeout = timeout
self.session = aiohttp.ClientSession(connector=connector, loop=self.loop, json_serialize=json.dumps) self.session = aiohttp.ClientSession(connector=connector, loop=self.loop, json_serialize=json.dumps)
self.parse_mode = parse_mode self.parse_mode = parse_mode
@staticmethod
def _prepare_timeout(
value: typing.Optional[typing.Union[base.Integer, base.Float, aiohttp.ClientTimeout]]
) -> typing.Optional[aiohttp.ClientTimeout]:
if value is None or isinstance(value, aiohttp.ClientTimeout):
return value
return aiohttp.ClientTimeout(total=value)
@property
def timeout(self):
timeout = self._ctx_timeout.get(self._timeout)
if timeout is None:
return sentinel
return timeout
@timeout.setter
def timeout(self, value):
self._timeout = self._prepare_timeout(value)
@timeout.deleter
def timeout(self):
self.timeout = None
@contextlib.contextmanager
def request_timeout(self, timeout):
"""
Context manager implements opportunity to change request timeout in current context
:param timeout:
:return:
"""
timeout = self._prepare_timeout(timeout)
token = self._ctx_timeout.set(timeout)
try:
yield
finally:
self._ctx_timeout.reset(token)
async def close(self): async def close(self):
""" """
Close all client sessions Close all client sessions
@ -113,11 +160,11 @@ class BaseBot:
:raise: :obj:`aiogram.exceptions.TelegramApiError` :raise: :obj:`aiogram.exceptions.TelegramApiError`
""" """
return await api.make_request(self.session, self.__token, method, data, files, return await api.make_request(self.session, self.__token, method, data, files,
proxy=self.proxy, proxy_auth=self.proxy_auth, **kwargs) proxy=self.proxy, proxy_auth=self.proxy_auth, timeout=self.timeout, **kwargs)
async def download_file(self, file_path: base.String, async def download_file(self, file_path: base.String,
destination: Optional[base.InputFile] = None, destination: Optional[base.InputFile] = None,
timeout: Optional[base.Integer] = 30, timeout: Optional[base.Integer] = sentinel,
chunk_size: Optional[base.Integer] = 65536, chunk_size: Optional[base.Integer] = 65536,
seek: Optional[base.Boolean] = True) -> Union[io.BytesIO, io.FileIO]: seek: Optional[base.Boolean] = True) -> Union[io.BytesIO, io.FileIO]:
""" """

View file

@ -5,6 +5,9 @@ import logging
import time import time
import typing import typing
import aiohttp
from aiohttp.helpers import sentinel
from .filters import Command, ContentTypeFilter, ExceptionsFilter, FiltersFactory, FuncFilter, HashTag, Regexp, \ from .filters import Command, ContentTypeFilter, ExceptionsFilter, FiltersFactory, FuncFilter, HashTag, Regexp, \
RegexpCommandsFilter, StateFilter, Text RegexpCommandsFilter, StateFilter, Text
from .handler import Handler from .handler import Handler
@ -209,8 +212,13 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
return await self.bot.delete_webhook() return await self.bot.delete_webhook()
async def start_polling(self, timeout=20, relax=0.1, limit=None, reset_webhook=None, async def start_polling(self,
fast: typing.Optional[bool] = True): timeout=20,
relax=0.1,
limit=None,
reset_webhook=None,
fast: typing.Optional[bool] = True,
error_sleep: int = 5):
""" """
Start long-polling Start long-polling
@ -238,12 +246,19 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
self._polling = True self._polling = True
offset = None offset = None
try: try:
current_request_timeout = self.bot.timeout
if current_request_timeout is not sentinel and timeout is not None:
request_timeout = aiohttp.ClientTimeout(total=current_request_timeout.total + timeout or 1)
else:
request_timeout = None
while self._polling: while self._polling:
try: try:
updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout) with self.bot.request_timeout(request_timeout):
updates = await self.bot.get_updates(limit=limit, offset=offset, timeout=timeout)
except: except:
log.exception('Cause exception while getting updates.') log.exception('Cause exception while getting updates.')
await asyncio.sleep(15) await asyncio.sleep(error_sleep)
continue continue
if updates: if updates:
@ -254,6 +269,7 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
if relax: if relax:
await asyncio.sleep(relax) await asyncio.sleep(relax)
finally: finally:
self._close_waiter._set_result(None) self._close_waiter._set_result(None)
log.warning('Polling is stopped.') log.warning('Polling is stopped.')

View file

@ -23,7 +23,7 @@ def _setup_callbacks(executor, on_startup=None, on_shutdown=None):
def start_polling(dispatcher, *, loop=None, skip_updates=False, reset_webhook=True, def start_polling(dispatcher, *, loop=None, skip_updates=False, reset_webhook=True,
on_startup=None, on_shutdown=None, timeout=None, fast=True): on_startup=None, on_shutdown=None, timeout=20, fast=True):
""" """
Start bot in long-polling mode Start bot in long-polling mode
@ -291,7 +291,7 @@ class Executor:
self.set_webhook(webhook_path=webhook_path, request_handler=request_handler, route_name=route_name) self.set_webhook(webhook_path=webhook_path, request_handler=request_handler, route_name=route_name)
self.run_app(**kwargs) self.run_app(**kwargs)
def start_polling(self, reset_webhook=None, timeout=None, fast=True): def start_polling(self, reset_webhook=None, timeout=20, fast=True):
""" """
Start bot in long-polling mode Start bot in long-polling mode