Add stream_content to BaseSession and add tests

This commit is contained in:
gabbhack 2020-01-22 22:54:29 +05:00
parent 213f2a3c16
commit 7ab0db7991
3 changed files with 33 additions and 3 deletions

View file

@ -3,7 +3,7 @@ from __future__ import annotations
import abc import abc
import datetime import datetime
import json import json
from typing import Any, Callable, Optional, TypeVar, Union from typing import Any, AsyncGenerator, Callable, Optional, TypeVar, Union
from aiogram.utils.exceptions import TelegramAPIError from aiogram.utils.exceptions import TelegramAPIError
@ -44,6 +44,12 @@ class BaseSession(abc.ABC):
async def make_request(self, token: str, method: TelegramMethod[T]) -> T: # pragma: no cover async def make_request(self, token: str, method: TelegramMethod[T]) -> T: # pragma: no cover
pass pass
@abc.abstractmethod
async def stream_content(
self, url: str, timeout: int, chunk_size: int
) -> AsyncGenerator[bytes, None]: # pragma: no cover
yield b""
def prepare_value(self, value: Any) -> Union[str, int, bool]: def prepare_value(self, value: Any) -> Union[str, int, bool]:
if isinstance(value, str): if isinstance(value, str):
return value return value

View file

@ -1,5 +1,5 @@
from collections import deque from collections import deque
from typing import TYPE_CHECKING, Deque, Optional, Type from typing import TYPE_CHECKING, AsyncGenerator, Deque, Optional, Type
from aiogram import Bot from aiogram import Bot
from aiogram.api.client.session.base import BaseSession from aiogram.api.client.session.base import BaseSession
@ -29,6 +29,11 @@ class MockedSession(BaseSession):
self.raise_for_status(response) self.raise_for_status(response)
return response.result # type: ignore return response.result # type: ignore
async def stream_content(
self, url: str, timeout: int, chunk_size: int
) -> AsyncGenerator[bytes, None]: # pragma: no cover
yield b""
class MockedBot(Bot): class MockedBot(Bot):
if TYPE_CHECKING: if TYPE_CHECKING:

View file

@ -1,5 +1,5 @@
import datetime import datetime
from typing import AsyncContextManager from typing import AsyncContextManager, AsyncGenerator
import pytest import pytest
@ -22,6 +22,14 @@ class CustomSession(BaseSession):
assert isinstance(token, str) assert isinstance(token, str)
assert isinstance(method, TelegramMethod) assert isinstance(method, TelegramMethod)
async def stream_content(
self, url: str, timeout: int, chunk_size: int
) -> AsyncGenerator[bytes, None]: # pragma: no cover
assert isinstance(url, str)
assert isinstance(timeout, int)
assert isinstance(chunk_size, int)
yield b"\f" * 10
class TestBaseSession(DataMixin): class TestBaseSession(DataMixin):
def test_init_api(self): def test_init_api(self):
@ -100,6 +108,17 @@ class TestBaseSession(DataMixin):
assert await session.make_request("42:TEST", GetMe()) is None assert await session.make_request("42:TEST", GetMe()) is None
@pytest.mark.asyncio
async def test_stream_content(self):
session = CustomSession()
stream = session.stream_content(
"https://www.python.org/static/img/python-logo.png", timeout=5, chunk_size=65536
)
assert isinstance(stream, AsyncGenerator)
async for chunk in stream:
assert isinstance(chunk, bytes)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_context_manager(self): async def test_context_manager(self):
session = CustomSession() session = CustomSession()