mirror of
https://github.com/aiogram/aiogram.git
synced 2025-12-06 07:50:32 +00:00
In this commit, a new function `get_mounted_bot` was added to `context_controller.py` that returns the bot mounted in context. This function was needed to bypass the limitation in pydantic BaseModel's properties, which neither support computed fields nor serialization/validation. Various tests were also updated to compare models using `model_dump_json()` method rather than comparing the models directly. This change provides more accurate comparisons by considering default values in the models. Further, the dispatcher was adjusted to enforce update re-mounting if the mounted bot differs from the current update. This allows shortcuts to be used in the bot's current instance and ensures the correct propagation of the context to all the nested objects and attributes of Updates.
97 lines
3 KiB
Python
97 lines
3 KiB
Python
from collections import deque
|
|
from typing import TYPE_CHECKING, Any, AsyncGenerator, Deque, Dict, Optional, Type
|
|
|
|
from aiogram import Bot
|
|
from aiogram.client.session.base import BaseSession
|
|
from aiogram.methods import TelegramMethod
|
|
from aiogram.methods.base import Response, TelegramType
|
|
from aiogram.types import UNSET_PARSE_MODE, ResponseParameters, User
|
|
|
|
|
|
class MockedSession(BaseSession):
|
|
def __init__(self):
|
|
super(MockedSession, self).__init__()
|
|
self.responses: Deque[Response[TelegramType]] = deque()
|
|
self.requests: Deque[TelegramMethod[TelegramType]] = deque()
|
|
self.closed = True
|
|
|
|
def add_result(self, response: Response[TelegramType]) -> Response[TelegramType]:
|
|
self.responses.append(response)
|
|
return response
|
|
|
|
def get_request(self) -> TelegramMethod[TelegramType]:
|
|
return self.requests.pop()
|
|
|
|
async def close(self):
|
|
self.closed = True
|
|
|
|
async def make_request(
|
|
self,
|
|
bot: Bot,
|
|
method: TelegramMethod[TelegramType],
|
|
timeout: Optional[int] = UNSET_PARSE_MODE,
|
|
) -> TelegramType:
|
|
self.closed = False
|
|
self.requests.append(method)
|
|
response: Response[TelegramType] = self.responses.pop()
|
|
self.check_response(
|
|
bot=bot,
|
|
method=method,
|
|
status_code=response.error_code,
|
|
content=response.model_dump_json(),
|
|
)
|
|
return response.result # type: ignore
|
|
|
|
async def stream_content(
|
|
self,
|
|
url: str,
|
|
headers: Optional[Dict[str, Any]] = None,
|
|
timeout: int = 30,
|
|
chunk_size: int = 65536,
|
|
raise_for_status: bool = True,
|
|
) -> AsyncGenerator[bytes, None]: # pragma: no cover
|
|
yield b""
|
|
|
|
|
|
class MockedBot(Bot):
|
|
if TYPE_CHECKING:
|
|
session: MockedSession
|
|
|
|
def __init__(self, **kwargs):
|
|
super(MockedBot, self).__init__(
|
|
kwargs.pop("token", "42:TEST"), session=MockedSession(), **kwargs
|
|
)
|
|
self._me = User(
|
|
id=self.id,
|
|
is_bot=True,
|
|
first_name="FirstName",
|
|
last_name="LastName",
|
|
username="tbot",
|
|
language_code="uk-UA",
|
|
)
|
|
|
|
def add_result_for(
|
|
self,
|
|
method: Type[TelegramMethod[TelegramType]],
|
|
ok: bool,
|
|
result: TelegramType = None,
|
|
description: Optional[str] = None,
|
|
error_code: int = 200,
|
|
migrate_to_chat_id: Optional[int] = None,
|
|
retry_after: Optional[int] = None,
|
|
) -> Response[TelegramType]:
|
|
response = Response[method.__returning__]( # type: ignore
|
|
ok=ok,
|
|
result=result,
|
|
description=description,
|
|
error_code=error_code,
|
|
parameters=ResponseParameters(
|
|
migrate_to_chat_id=migrate_to_chat_id,
|
|
retry_after=retry_after,
|
|
),
|
|
)
|
|
self.session.add_result(response)
|
|
return response
|
|
|
|
def get_request(self) -> TelegramMethod[TelegramType]:
|
|
return self.session.get_request()
|