Implement connectors mechanism.

This commit is contained in:
Alex Root Junior 2018-07-08 18:40:54 +03:00
parent afa7ef9ebe
commit cd4fee5eaa
3 changed files with 144 additions and 107 deletions

View file

@ -1,8 +1,14 @@
import abc
import asyncio
import logging
import os
import ssl
from asyncio import AbstractEventLoop
from http import HTTPStatus
from typing import Optional, Tuple
import aiohttp
import certifi
from .. import types
from ..utils import exceptions
@ -34,58 +40,138 @@ def check_token(token: str) -> bool:
return True
async def _check_result(method_name, response):
class AbstractConnector(abc.ABC):
"""
Checks whether `result` is a valid API response.
A result is considered invalid if:
- The server returned an HTTP response code other than 200
- The content of the result is invalid JSON.
- The method call was unsuccessful (The JSON 'ok' field equals False)
:raises ApiException: if one of the above listed cases is applicable
:param method_name: The name of the method called
:param response: The returned response of the method request
:return: The result parsed to a JSON dictionary.
Abstract connector class
"""
body = await response.text()
log.debug(f"Response for {method_name}: [{response.status}] {body}")
if response.content_type != 'application/json':
raise exceptions.NetworkError(f"Invalid response with content type {response.content_type}: \"{body}\"")
def __init__(self, loop: Optional[AbstractEventLoop] = None, *args, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
self.loop = loop
self._args = args
self._kwargs = kwargs
try:
result_json = await response.json(loads=json.loads)
except ValueError:
result_json = {}
async def make_request(self, token, method, data=None, files=None, **kwargs):
log.debug(f"Make request: '{method}' with data: {data} and files {files}")
url = Methods.api_url(token=token, method=method)
content_type, status, data = await self.request(url, data, files, **kwargs)
return await self.check_result(method, content_type, status, data)
description = result_json.get('description') or body
parameters = types.ResponseParameters(**result_json.get('parameters', {}) or {})
@abc.abstractmethod
async def request(self, url, data=None, files=None, **kwargs) -> Tuple[str, int, str]:
pass
if HTTPStatus.OK <= response.status <= HTTPStatus.IM_USED:
return result_json.get('result')
elif parameters.retry_after:
raise exceptions.RetryAfter(parameters.retry_after)
elif parameters.migrate_to_chat_id:
raise exceptions.MigrateToChat(parameters.migrate_to_chat_id)
elif response.status == HTTPStatus.BAD_REQUEST:
exceptions.BadRequest.detect(description)
elif response.status == HTTPStatus.NOT_FOUND:
exceptions.NotFound.detect(description)
elif response.status == HTTPStatus.CONFLICT:
exceptions.ConflictError.detect(description)
elif response.status in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]:
exceptions.Unauthorized.detect(description)
elif response.status == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
raise exceptions.NetworkError('File too large for uploading. '
'Check telegram api limits https://core.telegram.org/bots/api#senddocument')
elif response.status >= HTTPStatus.INTERNAL_SERVER_ERROR:
if 'restart' in description:
raise exceptions.RestartingTelegram()
raise exceptions.TelegramAPIError(description)
raise exceptions.TelegramAPIError(f"{description} [{response.status}]")
async def check_result(self, method_name: str, content_type: str, status_code: int, body: str):
"""
Checks whether `result` is a valid API response.
A result is considered invalid if:
- The server returned an HTTP response code other than 200
- The content of the result is invalid JSON.
- The method call was unsuccessful (The JSON 'ok' field equals False)
:raises ApiException: if one of the above listed cases is applicable
:param method_name: The name of the method called
:return: The result parsed to a JSON dictionary.
"""
log.debug(f"Response for {method_name}: [{status_code}] {body}")
if content_type != 'application/json':
raise exceptions.NetworkError(f"Invalid response with content type {content_type}: \"{body}\"")
try:
result_json = json.loads(body)
except ValueError:
result_json = {}
description = result_json.get('description') or body
parameters = types.ResponseParameters(**result_json.get('parameters', {}) or {})
if HTTPStatus.OK <= status_code <= HTTPStatus.IM_USED:
return result_json.get('result')
elif parameters.retry_after:
raise exceptions.RetryAfter(parameters.retry_after)
elif parameters.migrate_to_chat_id:
raise exceptions.MigrateToChat(parameters.migrate_to_chat_id)
elif status_code == HTTPStatus.BAD_REQUEST:
exceptions.BadRequest.detect(description)
elif status_code == HTTPStatus.NOT_FOUND:
exceptions.NotFound.detect(description)
elif status_code == HTTPStatus.CONFLICT:
exceptions.ConflictError.detect(description)
elif status_code in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]:
exceptions.Unauthorized.detect(description)
elif status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
raise exceptions.NetworkError('File too large for uploading. '
'Check telegram api limits https://core.telegram.org/bots/api#senddocument')
elif status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
if 'restart' in description:
raise exceptions.RestartingTelegram()
raise exceptions.TelegramAPIError(description)
raise exceptions.TelegramAPIError(f"{description} [{status_code}]")
@abc.abstractmethod
async def close(self):
pass
def _guess_filename(obj):
class AiohttpConnector(AbstractConnector):
def __init__(self, loop: Optional[AbstractEventLoop] = None,
proxy: Optional[str] = None, proxy_auth: Optional[aiohttp.BasicAuth] = None,
connections_limit: Optional[int] = None, *args, **kwargs):
super(AiohttpConnector, self).__init__(loop, *args, **kwargs)
self.proxy = proxy
self.proxy_auth = proxy_auth
# aiohttp main session
ssl_context = ssl.create_default_context(cafile=certifi.where())
if isinstance(proxy, str) and proxy.startswith('socks5://'):
from aiosocksy.connector import ProxyClientRequest, ProxyConnector
connector = ProxyConnector(limit=connections_limit, ssl_context=ssl_context,
loop=self.loop)
request_class = ProxyClientRequest
else:
connector = aiohttp.TCPConnector(limit=connections_limit, ssl_context=ssl_context,
loop=self.loop)
request_class = aiohttp.ClientRequest
self.session = aiohttp.ClientSession(connector=connector, request_class=request_class,
loop=self.loop, json_serialize=json.dumps)
async def request(self, url, data=None, files=None, **kwargs):
"""
Make request to API
That make request with Content-Type:
application/x-www-form-urlencoded - For simple request
and multipart/form-data - for files uploading
https://core.telegram.org/bots/api#making-requests
:param url: requested URL
:type url: :obj:`str`
:param data: request payload
:type data: :obj:`dict`
:param files: files
:type files: :obj:`dict`
:return: result
:rtype :obj:`bool` or :obj:`dict`
"""
req = compose_data(data, files)
try:
async with self.session.post(url, data=req, **kwargs) as response:
return response.content_type, response.status, await response.text()
except aiohttp.ClientError as e:
raise exceptions.NetworkError(f"aiohttp client throws an error: {e.__class__.__name__}: {e}")
async def close(self):
if self.session and not self.session.closed:
await self.session.close()
def guess_filename(obj):
"""
Get file name from object
@ -97,7 +183,7 @@ def _guess_filename(obj):
return os.path.basename(name)
def _compose_data(params=None, files=None):
def compose_data(params=None, files=None):
"""
Prepare request data
@ -121,47 +207,13 @@ def _compose_data(params=None, files=None):
elif isinstance(f, types.InputFile):
filename, fileobj = f.filename, f.file
else:
filename, fileobj = _guess_filename(f) or key, f
filename, fileobj = guess_filename(f) or key, f
data.add_field(key, fileobj, filename=filename)
return data
async def request(session, token, method, data=None, files=None, **kwargs) -> bool or dict:
"""
Make request to API
That make request with Content-Type:
application/x-www-form-urlencoded - For simple request
and multipart/form-data - for files uploading
https://core.telegram.org/bots/api#making-requests
:param session: HTTP Client session
:type session: :obj:`aiohttp.ClientSession`
:param token: BOT token
:type token: :obj:`str`
:param method: API method
:type method: :obj:`str`
:param data: request payload
:type data: :obj:`dict`
:param files: files
:type files: :obj:`dict`
:return: result
:rtype :obj:`bool` or :obj:`dict`
"""
log.debug("Make request: '{0}' with data: {1} and files {2}".format(
method, data or {}, files or {}))
data = _compose_data(data, files)
url = Methods.api_url(token=token, method=method)
try:
async with session.post(url, data=data, **kwargs) as response:
return await _check_result(method, response)
except aiohttp.ClientError as e:
raise exceptions.NetworkError(f"aiohttp client throws an error: {e.__class__.__name__}: {e}")
class Methods(Helper):
"""
Helper for Telegram API Methods listed on https://core.telegram.org/bots/api

View file

@ -19,6 +19,7 @@ class BaseBot:
def __init__(self, token: base.String,
loop: Optional[Union[asyncio.BaseEventLoop, asyncio.AbstractEventLoop]] = None,
connector: Optional[api.AbstractConnector] = None,
connections_limit: Optional[base.Integer] = None,
proxy: Optional[base.String] = None, proxy_auth: Optional[aiohttp.BasicAuth] = None,
validate_token: Optional[base.Boolean] = True,
@ -47,45 +48,30 @@ class BaseBot:
api.check_token(token)
self.__token = token
# Proxy settings
self.proxy = proxy
self.proxy_auth = proxy_auth
if connector and any((connections_limit, proxy, proxy_auth)):
raise ValueError('Connector instance can\'t be passed with connection settings in one time.')
elif connector:
self.connector = connector
else:
connector = api.AiohttpConnector(loop=loop, proxy=proxy, proxy_auth=proxy_auth,
connections_limit=connections_limit)
self.connector = connector
# Asyncio loop instance
if loop is None:
loop = asyncio.get_event_loop()
self.loop = loop
# aiohttp main session
ssl_context = ssl.create_default_context(cafile=certifi.where())
if isinstance(proxy, str) and proxy.startswith('socks5://'):
from aiosocksy.connector import ProxyClientRequest, ProxyConnector
connector = ProxyConnector(limit=connections_limit, ssl_context=ssl_context, loop=self.loop)
request_class = ProxyClientRequest
else:
connector = aiohttp.TCPConnector(limit=connections_limit, ssl_context=ssl_context,
loop=self.loop)
request_class = aiohttp.ClientRequest
self.session = aiohttp.ClientSession(connector=connector, request_class=request_class,
loop=self.loop, json_serialize=json.dumps)
# Data stored in bot instance
self._data = {}
self.parse_mode = parse_mode
def __del__(self):
# asyncio.ensure_future(self.close())
pass
async def close(self):
"""
Close all client sessions
"""
if self.session and not self.session.closed:
await self.session.close()
await self.connector.close()
async def request(self, method: base.String,
data: Optional[Dict] = None,
@ -105,8 +91,7 @@ class BaseBot:
:rtype: Union[List, Dict]
:raise: :obj:`aiogram.exceptions.TelegramApiError`
"""
return await api.request(self.session, self.__token, method, data, files,
proxy=self.proxy, proxy_auth=self.proxy_auth)
return await self.connector.make_request(self.__token, method, data, files)
async def download_file(self, file_path: base.String,
destination: Optional[base.InputFile] = None,

View file

@ -54,7 +54,7 @@ class InputFile(base.TelegramObject):
@property
def filename(self):
if self._filename is None:
self._filename = api._guess_filename(self._file)
self._filename = api.guess_filename(self._file)
return self._filename
@filename.setter