Dev 3.x flat package (#961)

* Move packages

* Added changelog

* Update examples/echo_bot.py

Co-authored-by: Oleg A. <t0rr@mail.ru>

* Rename `handler` -> `handlers`

* Update __init__.py

Co-authored-by: Oleg A. <t0rr@mail.ru>
This commit is contained in:
Alex Root Junior 2022-08-14 01:07:52 +03:00 committed by GitHub
parent 5e7932ca20
commit 4315ecf1a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
111 changed files with 376 additions and 390 deletions

View file

@ -5,8 +5,8 @@ import pytest
from aiogram import F
from aiogram.dispatcher.event.handler import CallableMixin, FilterObject, HandlerObject
from aiogram.dispatcher.filters.base import BaseFilter
from aiogram.dispatcher.handler.base import BaseHandler
from aiogram.filters import BaseFilter
from aiogram.handlers import BaseHandler
from aiogram.types import Update
pytestmark = pytest.mark.asyncio

View file

@ -7,9 +7,9 @@ import pytest
from aiogram.dispatcher.event.bases import REJECTED, SkipHandler
from aiogram.dispatcher.event.handler import HandlerObject
from aiogram.dispatcher.event.telegram import TelegramEventObserver
from aiogram.dispatcher.filters.base import BaseFilter
from aiogram.dispatcher.router import Router
from aiogram.exceptions import FiltersResolveError
from aiogram.filters import BaseFilter
from aiogram.types import Chat, Message, User
pytestmark = pytest.mark.asyncio

View file

@ -1,35 +0,0 @@
from typing import Awaitable
import pytest
from aiogram.dispatcher.filters.base import BaseFilter
try:
from asynctest import CoroutineMock, patch
except ImportError:
from unittest.mock import AsyncMock as CoroutineMock # type: ignore
from unittest.mock import patch
pytestmark = pytest.mark.asyncio
class MyFilter(BaseFilter):
foo: str
async def __call__(self, event: str):
return
class TestBaseFilter:
async def test_awaitable(self):
my_filter = MyFilter(foo="bar")
assert isinstance(my_filter, Awaitable)
with patch(
"tests.test_dispatcher.test_filters.test_base.MyFilter.__call__",
new_callable=CoroutineMock,
) as mocked_call:
call = my_filter(event="test")
await call
mocked_call.assert_awaited_with(event="test")

View file

@ -1,181 +0,0 @@
from decimal import Decimal
from enum import Enum, auto
from fractions import Fraction
from typing import Optional
from uuid import UUID
import pytest
from magic_filter import MagicFilter
from pydantic import ValidationError
from aiogram import F
from aiogram.dispatcher.filters.callback_data import CallbackData
from aiogram.types import CallbackQuery, User
pytestmark = pytest.mark.asyncio
class MyIntEnum(Enum):
FOO = auto()
class MyStringEnum(str, Enum):
FOO = "FOO"
class MyCallback(CallbackData, prefix="test"):
foo: str
bar: int
class TestCallbackData:
def test_init_subclass_prefix_required(self):
assert MyCallback.__prefix__ == "test"
with pytest.raises(ValueError, match="prefix required.+"):
class MyInvalidCallback(CallbackData):
pass
def test_init_subclass_sep_validation(self):
assert MyCallback.__separator__ == ":"
class MyCallback2(CallbackData, prefix="test2", sep="@"):
pass
assert MyCallback2.__separator__ == "@"
with pytest.raises(ValueError, match="Separator symbol '@' .+ 'sp@m'"):
class MyInvalidCallback(CallbackData, prefix="sp@m", sep="@"):
pass
@pytest.mark.parametrize(
"value,success,expected",
[
[None, True, ""],
[42, True, "42"],
["test", True, "test"],
[9.99, True, "9.99"],
[Decimal("9.99"), True, "9.99"],
[Fraction("3/2"), True, "3/2"],
[
UUID("123e4567-e89b-12d3-a456-426655440000"),
True,
"123e4567-e89b-12d3-a456-426655440000",
],
[MyIntEnum.FOO, True, "1"],
[MyStringEnum.FOO, True, "FOO"],
[..., False, "..."],
[object, False, "..."],
[object(), False, "..."],
[User(id=42, is_bot=False, first_name="test"), False, "..."],
],
)
def test_encode_value(self, value, success, expected):
callback = MyCallback(foo="test", bar=42)
if success:
assert callback._encode_value("test", value) == expected
else:
with pytest.raises(ValueError):
assert callback._encode_value("test", value) == expected
def test_pack(self):
with pytest.raises(ValueError, match="Separator symbol .+"):
assert MyCallback(foo="te:st", bar=42).pack()
with pytest.raises(ValueError, match=".+is too long.+"):
assert MyCallback(foo="test" * 32, bar=42).pack()
assert MyCallback(foo="test", bar=42).pack() == "test:test:42"
def test_pack_optional(self):
class MyCallback1(CallbackData, prefix="test1"):
foo: str
bar: Optional[int] = None
assert MyCallback1(foo="spam").pack() == "test1:spam:"
assert MyCallback1(foo="spam", bar=42).pack() == "test1:spam:42"
class MyCallback2(CallbackData, prefix="test2"):
foo: Optional[str] = None
bar: int
assert MyCallback2(bar=42).pack() == "test2::42"
assert MyCallback2(foo="spam", bar=42).pack() == "test2:spam:42"
class MyCallback3(CallbackData, prefix="test3"):
foo: Optional[str] = "experiment"
bar: int
assert MyCallback3(bar=42).pack() == "test3:experiment:42"
assert MyCallback3(foo="spam", bar=42).pack() == "test3:spam:42"
def test_unpack(self):
with pytest.raises(TypeError, match=".+ takes 2 arguments but 3 were given"):
MyCallback.unpack("test:test:test:test")
with pytest.raises(ValueError, match="Bad prefix .+"):
MyCallback.unpack("spam:test:test")
assert MyCallback.unpack("test:test:42") == MyCallback(foo="test", bar=42)
def test_unpack_optional(self):
with pytest.raises(ValidationError):
assert MyCallback.unpack("test:test:")
class MyCallback1(CallbackData, prefix="test1"):
foo: str
bar: Optional[int] = None
assert MyCallback1.unpack("test1:spam:") == MyCallback1(foo="spam")
assert MyCallback1.unpack("test1:spam:42") == MyCallback1(foo="spam", bar=42)
class MyCallback2(CallbackData, prefix="test2"):
foo: Optional[str] = None
bar: int
assert MyCallback2.unpack("test2::42") == MyCallback2(bar=42)
assert MyCallback2.unpack("test2:spam:42") == MyCallback2(foo="spam", bar=42)
class MyCallback3(CallbackData, prefix="test3"):
foo: Optional[str] = "experiment"
bar: int
assert MyCallback3.unpack("test3:experiment:42") == MyCallback3(bar=42)
assert MyCallback3.unpack("test3:spam:42") == MyCallback3(foo="spam", bar=42)
def test_build_filter(self):
filter_object = MyCallback.filter(F.foo == "test")
assert isinstance(filter_object.rule, MagicFilter)
assert filter_object.callback_data is MyCallback
class TestCallbackDataFilter:
@pytest.mark.parametrize(
"query,rule,result",
[
["test", F.foo == "test", False],
["test:spam:42", F.foo == "test", False],
["test:test:42", F.foo == "test", {"callback_data": MyCallback(foo="test", bar=42)}],
["test:test:42", None, {"callback_data": MyCallback(foo="test", bar=42)}],
["test:test:777", None, {"callback_data": MyCallback(foo="test", bar=777)}],
["spam:test:777", None, False],
["test:test:", F.foo == "test", False],
["test:test:", None, False],
],
)
async def test_call(self, query, rule, result):
callback_query = CallbackQuery(
id="1",
from_user=User(id=42, is_bot=False, first_name="test"),
data=query,
chat_instance="test",
)
filter_object = MyCallback.filter(rule)
assert await filter_object(callback_query) == result
async def test_invalid_call(self):
filter_object = MyCallback.filter(F.test)
assert not await filter_object(User(id=42, is_bot=False, first_name="test"))

View file

@ -1,345 +0,0 @@
from datetime import datetime
import pytest
from aiogram.dispatcher.filters.chat_member_updated import (
ADMINISTRATOR,
IS_MEMBER,
JOIN_TRANSITION,
LEAVE_TRANSITION,
ChatMemberUpdatedFilter,
_MemberStatusGroupMarker,
_MemberStatusMarker,
_MemberStatusTransition,
)
from aiogram.types import Chat, ChatMember, ChatMemberUpdated, User
class TestMemberStatusMarker:
def test_str(self):
marker = _MemberStatusMarker("test")
assert str(marker) == "TEST"
assert str(+marker) == "+TEST"
assert str(-marker) == "-TEST"
def test_pos(self):
marker = _MemberStatusMarker("test")
assert marker.is_member is None
positive_marker = +marker
assert positive_marker is not marker
assert marker.is_member is None
assert positive_marker.is_member is True
def test_neg(self):
marker = _MemberStatusMarker("test")
assert marker.is_member is None
negative_marker = -marker
assert negative_marker is not marker
assert marker.is_member is None
assert negative_marker.is_member is False
def test_or(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
combination = marker1 | marker2
assert isinstance(combination, _MemberStatusGroupMarker)
assert marker1 in combination.statuses
assert marker2 in combination.statuses
combination2 = marker1 | marker1
assert isinstance(combination2, _MemberStatusGroupMarker)
assert len(combination2.statuses) == 1
marker3 = _MemberStatusMarker("test3")
combination3 = marker3 | combination
assert isinstance(combination3, _MemberStatusGroupMarker)
assert marker3 in combination3.statuses
assert len(combination3.statuses) == 3
assert combination3 is not combination
with pytest.raises(TypeError):
marker1 | 42
def test_rshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
transition = marker1 >> marker2
assert isinstance(transition, _MemberStatusTransition)
assert marker1 in transition.old.statuses
assert marker2 in transition.new.statuses
transition2 = marker1 >> (marker2 | marker3)
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
marker1 >> 42
def test_lshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
transition = marker1 << marker2
assert isinstance(transition, _MemberStatusTransition)
assert marker2 in transition.old.statuses
assert marker1 in transition.new.statuses
transition2 = marker1 << (marker2 | marker3)
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
marker1 << 42
def test_hash(self):
marker1 = _MemberStatusMarker("test1")
marker1_1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
assert hash(marker1) != hash(marker2)
assert hash(marker1) == hash(marker1_1)
assert hash(marker1) != hash(-marker1)
@pytest.mark.parametrize(
"name,is_member,member,result",
[
["test", None, ChatMember(status="member"), False],
["test", None, ChatMember(status="test"), True],
["test", True, ChatMember(status="test"), False],
["test", True, ChatMember(status="test", is_member=True), True],
["test", True, ChatMember(status="test", is_member=False), False],
],
)
def test_check(self, name, is_member, member, result):
marker = _MemberStatusMarker(name, is_member=is_member)
assert marker.check(member=member) == result
class TestMemberStatusGroupMarker:
def test_init_unique(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group = _MemberStatusGroupMarker(marker1, marker1, marker2, marker3)
assert len(group.statuses) == 3
def test_init_empty(self):
with pytest.raises(ValueError):
_MemberStatusGroupMarker()
def test_or(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
marker4 = _MemberStatusMarker("test4")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker3, marker4)
group3 = group1 | marker3
assert isinstance(group3, _MemberStatusGroupMarker)
assert len(group3.statuses) == 3
group4 = group1 | group2
assert isinstance(group4, _MemberStatusGroupMarker)
assert len(group4.statuses) == 4
with pytest.raises(TypeError):
group4 | 42
def test_rshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker1, marker3)
transition1 = group1 >> marker1
assert isinstance(transition1, _MemberStatusTransition)
assert transition1.old is group1
assert marker1 in transition1.new.statuses
transition2 = group1 >> group2
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
group1 >> 42
def test_lshift(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
marker3 = _MemberStatusMarker("test3")
group1 = _MemberStatusGroupMarker(marker1, marker2)
group2 = _MemberStatusGroupMarker(marker1, marker3)
transition1 = group1 << marker1
assert isinstance(transition1, _MemberStatusTransition)
assert transition1.new is group1
assert marker1 in transition1.old.statuses
transition2 = group1 << group2
assert isinstance(transition2, _MemberStatusTransition)
with pytest.raises(TypeError):
group1 << 42
def test_str(self):
marker1 = _MemberStatusMarker("test1")
marker1_1 = +marker1
marker2 = _MemberStatusMarker("test2")
group1 = marker1 | marker1
assert str(group1) == "TEST1"
group2 = marker1 | marker2
assert str(group2) == "(TEST1 | TEST2)"
group3 = marker1 | marker1_1
assert str(group3) == "(+TEST1 | TEST1)"
@pytest.mark.parametrize(
"status,result",
[
["test", False],
["test1", True],
["test2", True],
],
)
def test_check(self, status, result):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
group = marker1 | marker2
assert group.check(member=ChatMember(status=status)) is result
class TestMemberStatusTransition:
def test_invert(self):
marker1 = _MemberStatusMarker("test1")
marker2 = _MemberStatusMarker("test2")
transition1 = marker1 >> marker2
transition2 = ~transition1
assert transition1 is not transition2
assert transition1.old == transition2.new
assert transition1.new == transition2.old
assert str(transition1) == "TEST1 >> TEST2"
assert str(transition2) == "TEST2 >> TEST1"
@pytest.mark.parametrize(
"transition,old,new,result",
[
[JOIN_TRANSITION, ChatMember(status="left"), ChatMember(status="member"), True],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=True),
ChatMember(status="member"),
False,
],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
[
JOIN_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
False,
],
[
LEAVE_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
True,
],
],
)
def test_check(self, transition, old, new, result):
assert transition.check(old=old, new=new) == result
class TestChatMemberUpdatedStatusFilter:
@pytest.mark.asyncio
@pytest.mark.parametrize(
"transition,old,new,result",
[
[JOIN_TRANSITION, ChatMember(status="left"), ChatMember(status="member"), True],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=True),
ChatMember(status="member"),
False,
],
[
JOIN_TRANSITION,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
[
JOIN_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
False,
],
[
LEAVE_TRANSITION,
ChatMember(status="member"),
ChatMember(status="restricted", is_member=False),
True,
],
[
ADMINISTRATOR,
ChatMember(status="member"),
ChatMember(status="administrator"),
True,
],
[
IS_MEMBER,
ChatMember(status="restricted", is_member=False),
ChatMember(status="member"),
True,
],
],
)
async def test_call(self, transition, old, new, result):
updated_filter = ChatMemberUpdatedFilter(member_status_changed=transition)
user = User(id=42, first_name="Test", is_bot=False)
update = {
"user": user,
"until_date": datetime.now(),
"is_anonymous": False,
"can_be_edited": True,
"can_manage_chat": True,
"can_delete_messages": True,
"can_manage_video_chats": True,
"can_restrict_members": True,
"can_promote_members": True,
"can_change_info": True,
"can_invite_users": True,
"can_post_messages": True,
"can_edit_messages": True,
"can_pin_messages": True,
"can_send_messages": True,
"can_send_media_messages": True,
"can_send_polls": True,
"can_send_other_messages": True,
"can_add_web_page_previews": True,
}
event = ChatMemberUpdated(
chat=Chat(id=42, type="test"),
from_user=user,
old_chat_member=old.copy(update=update),
new_chat_member=new.copy(update=update),
date=datetime.now(),
)
assert await updated_filter(event) is result

View file

@ -1,153 +0,0 @@
import datetime
import re
import pytest
from aiogram import F
from aiogram.dispatcher.filters import Command, CommandObject
from aiogram.dispatcher.filters.command import CommandStart
from aiogram.types import Chat, Message, User
from tests.mocked_bot import MockedBot
pytestmark = pytest.mark.asyncio
class TestCommandFilter:
def test_convert_to_list(self):
cmd = Command(commands="start")
assert cmd.commands
assert isinstance(cmd.commands, list)
assert cmd.commands[0] == "start"
assert cmd == Command(commands=["start"])
@pytest.mark.parametrize(
"text,command,result",
[
["/test@tbot", Command(commands=["test"], commands_prefix="/"), True],
["!test", Command(commands=["test"], commands_prefix="/"), False],
["/test@mention", Command(commands=["test"], commands_prefix="/"), False],
["/tests", Command(commands=["test"], commands_prefix="/"), False],
["/", Command(commands=["test"], commands_prefix="/"), False],
["/ test", Command(commands=["test"], commands_prefix="/"), False],
["", Command(commands=["test"], commands_prefix="/"), False],
[" ", Command(commands=["test"], commands_prefix="/"), False],
["test", Command(commands=["test"], commands_prefix="/"), False],
[" test", Command(commands=["test"], commands_prefix="/"), False],
["a", Command(commands=["test"], commands_prefix="/"), False],
["/test@tbot some args", Command(commands=["test"]), True],
["/test42@tbot some args", Command(commands=[re.compile(r"test(\d+)")]), True],
[
"/test42@tbot some args",
Command(commands=[re.compile(r"test(\d+)")], command_magic=F.args == "some args"),
True,
],
[
"/test42@tbot some args",
Command(commands=[re.compile(r"test(\d+)")], command_magic=F.args == "test"),
False,
],
["/start test", CommandStart(), True],
["/start", CommandStart(deep_link=True), False],
["/start test", CommandStart(deep_link=True), True],
["/start test", CommandStart(deep_link=True, deep_link_encoded=True), False],
["/start dGVzdA", CommandStart(deep_link=True, deep_link_encoded=True), True],
],
)
async def test_parse_command(self, bot: MockedBot, text: str, result: bool, command: Command):
# TODO: test ignore case
# TODO: test ignore mention
message = Message(
message_id=0, text=text, chat=Chat(id=42, type="private"), date=datetime.datetime.now()
)
response = await command(message, bot)
assert bool(response) is result
@pytest.mark.parametrize(
"message,result",
[
[
Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
False,
],
[
Message(
message_id=42,
date=datetime.datetime.now(),
text="/test",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
],
)
async def test_call(self, message: Message, result: bool, bot: MockedBot):
command = Command(commands=["test"])
assert bool(await command(message=message, bot=bot)) is result
async def test_command_magic_result(self, bot: MockedBot):
message = Message(
message_id=0,
text="/test 42",
chat=Chat(id=42, type="private"),
date=datetime.datetime.now(),
)
command = Command(commands=["test"], command_magic=(F.args.as_("args")))
result = await command(message=message, bot=bot)
assert "args" in result
assert result["args"] == "42"
class TestCommandObject:
@pytest.mark.parametrize(
"obj,result",
[
[CommandObject(prefix="/", command="command", mention="mention", args="args"), True],
[CommandObject(prefix="/", command="command", args="args"), False],
],
)
def test_mentioned(self, obj: CommandObject, result: bool):
assert isinstance(obj.mentioned, bool)
assert obj.mentioned is result
@pytest.mark.parametrize(
"obj,result",
[
[
CommandObject(prefix="/", command="command", mention="mention", args="args"),
"/command@mention args",
],
[
CommandObject(prefix="/", command="command", mention="mention", args=None),
"/command@mention",
],
[
CommandObject(prefix="/", command="command", mention=None, args="args"),
"/command args",
],
[CommandObject(prefix="/", command="command", mention=None, args=None), "/command"],
[CommandObject(prefix="!", command="command", mention=None, args=None), "!command"],
],
)
def test_text(self, obj: CommandObject, result: str):
assert obj.text == result
def test_update_handler_flags(self):
cmd = Command(commands=["start"])
flags = {}
cmd.update_handler_flags(flags)
assert "commands" in flags
assert isinstance(flags["commands"], list)
assert len(flags["commands"]) == 1
assert flags["commands"][0] is cmd
cmd.update_handler_flags(flags)
assert len(flags["commands"]) == 2

View file

@ -1,52 +0,0 @@
from dataclasses import dataclass
from typing import cast
import pytest
from pydantic import ValidationError
from aiogram.dispatcher.filters import ContentTypesFilter
from aiogram.types import ContentType, Message
pytestmark = pytest.mark.asyncio
@dataclass
class MinimalMessage:
content_type: str
class TestContentTypesFilter:
def test_validator_empty_list(self):
filter_ = ContentTypesFilter(content_types=[])
assert filter_.content_types == []
def test_convert_to_list(self):
filter_ = ContentTypesFilter(content_types="text")
assert filter_.content_types
assert isinstance(filter_.content_types, list)
assert filter_.content_types[0] == "text"
assert filter_ == ContentTypesFilter(content_types=["text"])
@pytest.mark.parametrize("values", [["text", "photo"], ["sticker"]])
def test_validator_with_values(self, values):
filter_ = ContentTypesFilter(content_types=values)
assert filter_.content_types == values
@pytest.mark.parametrize("values", [["test"], ["text", "test"], ["TEXT"]])
def test_validator_with_bad_values(self, values):
with pytest.raises(ValidationError):
ContentTypesFilter(content_types=values)
@pytest.mark.parametrize(
"values,content_type,result",
[
[[ContentType.TEXT], ContentType.TEXT, True],
[[ContentType.PHOTO], ContentType.TEXT, False],
[[ContentType.ANY], ContentType.TEXT, True],
[[ContentType.TEXT, ContentType.PHOTO, ContentType.DOCUMENT], ContentType.TEXT, True],
[[ContentType.ANY, ContentType.PHOTO, ContentType.DOCUMENT], ContentType.TEXT, True],
],
)
async def test_call(self, values, content_type, result):
filter_ = ContentTypesFilter(content_types=values)
assert await filter_(cast(Message, MinimalMessage(content_type=content_type))) == result

View file

@ -1,68 +0,0 @@
import re
import pytest
from aiogram import Dispatcher
from aiogram.dispatcher.filters import ExceptionMessageFilter, ExceptionTypeFilter
from aiogram.types import Update
pytestmark = pytest.mark.asyncio
class TestExceptionMessageFilter:
@pytest.mark.parametrize("value", ["value", re.compile("value")])
def test_converter(self, value):
obj = ExceptionMessageFilter(pattern=value)
assert isinstance(obj.pattern, re.Pattern)
async def test_match(self):
obj = ExceptionMessageFilter(pattern="KABOOM")
result = await obj(Update(update_id=0), exception=Exception())
assert not result
result = await obj(Update(update_id=0), exception=Exception("KABOOM"))
assert isinstance(result, dict)
assert "match_exception" in result
class MyException(Exception):
pass
class MyAnotherException(MyException):
pass
class TestExceptionTypeFilter:
@pytest.mark.parametrize(
"exception,value",
[
[Exception(), False],
[ValueError(), False],
[TypeError(), False],
[MyException(), True],
[MyAnotherException(), True],
],
)
async def test_check(self, exception: Exception, value: bool):
obj = ExceptionTypeFilter(exception=MyException)
result = await obj(Update(update_id=0), exception=exception)
assert result == value
class TestDispatchException:
async def test_handle_exception(self, bot):
dp = Dispatcher()
@dp.update()
async def update_handler(update):
raise ValueError("KABOOM")
@dp.errors(ExceptionMessageFilter(pattern="KABOOM"))
async def handler0(update, exception):
return "Handled"
assert await dp.feed_update(bot, Update(update_id=0)) == "Handled"

View file

@ -1,37 +0,0 @@
import pytest
from aiogram.dispatcher.filters import Text, and_f, invert_f, or_f
from aiogram.dispatcher.filters.logic import _AndFilter, _InvertFilter, _OrFilter
class TestLogic:
@pytest.mark.parametrize(
"obj,case,result",
[
[True, and_f(lambda t: t is True, lambda t: t is True), True],
[True, and_f(lambda t: t is True, lambda t: t is False), False],
[True, and_f(lambda t: t is False, lambda t: t is False), False],
[True, and_f(lambda t: {"t": t}, lambda t: t is False), False],
[True, and_f(lambda t: {"t": t}, lambda t: t is True), {"t": True}],
[True, or_f(lambda t: t is True, lambda t: t is True), True],
[True, or_f(lambda t: t is True, lambda t: t is False), True],
[True, or_f(lambda t: t is False, lambda t: t is False), False],
[True, or_f(lambda t: t is False, lambda t: t is True), True],
[True, or_f(lambda t: t is False, lambda t: {"t": t}), {"t": True}],
[True, or_f(lambda t: {"t": t}, lambda t: {"a": 42}), {"t": True}],
[True, invert_f(lambda t: t is False), True],
],
)
async def test_logic(self, obj, case, result):
assert await case(obj) == result
@pytest.mark.parametrize(
"case,type_",
[
[Text(text="test") | Text(text="test"), _OrFilter],
[Text(text="test") & Text(text="test"), _AndFilter],
[~Text(text="test"), _InvertFilter],
],
)
def test_dunder_methods(self, case, type_):
assert isinstance(case, type_)

View file

@ -1,30 +0,0 @@
import pytest
from magic_filter import AttrDict
from aiogram import F
from aiogram.dispatcher.filters import MagicData
from aiogram.types import Update
class TestMagicDataFilter:
@pytest.mark.asyncio
async def test_call(self):
called = False
def check(value):
nonlocal called
called = True
assert isinstance(value, AttrDict)
assert value[0] == "foo"
assert value[1] == "bar"
assert value["spam"] is True
assert value.spam is True
return value
f = MagicData(magic_data=F.func(check).as_("test"))
result = await f(Update(update_id=123), "foo", "bar", spam=True)
assert called
assert isinstance(result, dict)
assert result["test"]

View file

@ -1,74 +0,0 @@
from copy import copy
from inspect import isclass
import pytest
from aiogram.dispatcher.event.handler import FilterObject
from aiogram.dispatcher.filters import StateFilter
from aiogram.dispatcher.fsm.state import State, StatesGroup
from aiogram.types import Update
pytestmark = pytest.mark.asyncio
class MyGroup(StatesGroup):
state = State()
class TestStateFilter:
@pytest.mark.parametrize(
"state", [None, State("test"), MyGroup, MyGroup(), "state", ["state"]]
)
def test_validator(self, state):
f = StateFilter(state=state)
assert isinstance(f.state, list)
value = f.state[0]
assert (
isinstance(value, (State, str, MyGroup))
or (isclass(value) and issubclass(value, StatesGroup))
or value is None
)
@pytest.mark.parametrize(
"state,current_state,result",
[
[State("state"), "@:state", True],
[[State("state")], "@:state", True],
[MyGroup, "MyGroup:state", True],
[[MyGroup], "MyGroup:state", True],
[MyGroup(), "MyGroup:state", True],
[[MyGroup()], "MyGroup:state", True],
["*", "state", True],
[None, None, True],
[[None], None, True],
[None, "state", False],
[[], "state", False],
[[State("state"), "state"], "state", True],
[[MyGroup(), State("state")], "@:state", True],
[[MyGroup, State("state")], "state", False],
],
)
@pytestmark
async def test_filter(self, state, current_state, result):
f = StateFilter(state=state)
assert bool(await f(obj=Update(update_id=42), raw_state=current_state)) is result
@pytestmark
async def test_create_filter_from_state(self):
FilterObject(callback=State(state="state"))
@pytestmark
async def test_state_copy(self):
class SG(StatesGroup):
state = State()
assert SG.state == copy(SG.state)
assert SG.state == "SG:state"
assert "SG:state" == SG.state
assert State() == State()
assert SG.state != 1
states = {SG.state: "OK"}
assert states.get(copy(SG.state)) == "OK"

View file

@ -1,247 +0,0 @@
import datetime
from itertools import permutations
from typing import Sequence, Type
import pytest
from pydantic import ValidationError
from aiogram.dispatcher.filters import BUILTIN_FILTERS
from aiogram.dispatcher.filters.text import Text
from aiogram.types import CallbackQuery, Chat, InlineQuery, Message, Poll, PollOption, User
pytestmark = pytest.mark.asyncio
class TestText:
def test_default_for_observer(self):
registered_for = {
update_type for update_type, filters in BUILTIN_FILTERS.items() if Text in filters
}
assert registered_for == {
"message",
"edited_message",
"channel_post",
"edited_channel_post",
"inline_query",
"callback_query",
}
def test_validator_not_enough_arguments(self):
with pytest.raises(ValidationError):
Text()
with pytest.raises(ValidationError):
Text(text_ignore_case=True)
@pytest.mark.parametrize(
"first,last",
permutations(["text", "text_contains", "text_startswith", "text_endswith"], 2),
)
@pytest.mark.parametrize("ignore_case", [True, False])
def test_validator_too_few_arguments(self, first, last, ignore_case):
kwargs = {first: "test", last: "test"}
if ignore_case:
kwargs["text_ignore_case"] = True
with pytest.raises(ValidationError):
Text(**kwargs)
@pytest.mark.parametrize(
"argument", ["text", "text_contains", "text_startswith", "text_endswith"]
)
@pytest.mark.parametrize("input_type", [str, list, tuple])
def test_validator_convert_to_list(self, argument: str, input_type: Type):
text = Text(**{argument: input_type("test")})
assert hasattr(text, argument)
assert isinstance(getattr(text, argument), Sequence)
@pytest.mark.parametrize(
"argument,ignore_case,input_value,update_type,result",
[
[
"text",
False,
"test",
Message(
message_id=42,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
False,
],
[
"text",
False,
"test",
Message(
message_id=42,
date=datetime.datetime.now(),
caption="test",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
[
"text",
False,
"test",
Message(
message_id=42,
date=datetime.datetime.now(),
text="test",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
[
"text",
True,
"TEst",
Message(
message_id=42,
date=datetime.datetime.now(),
text="tesT",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
[
"text",
False,
"TEst",
Message(
message_id=42,
date=datetime.datetime.now(),
text="tesT",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
False,
],
[
"text_startswith",
False,
"test",
Message(
message_id=42,
date=datetime.datetime.now(),
text="test case",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
[
"text_endswith",
False,
"case",
Message(
message_id=42,
date=datetime.datetime.now(),
text="test case",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
[
"text_contains",
False,
" ",
Message(
message_id=42,
date=datetime.datetime.now(),
text="test case",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
[
"text_startswith",
True,
"question",
Message(
message_id=42,
date=datetime.datetime.now(),
poll=Poll(
id="poll id",
question="Question?",
options=[PollOption(text="A", voter_count=0)],
is_closed=False,
is_anonymous=False,
type="regular",
allows_multiple_answers=False,
total_voter_count=0,
),
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
True,
],
[
"text_startswith",
True,
"callback:",
CallbackQuery(
id="query id",
from_user=User(id=42, is_bot=False, first_name="Test"),
chat_instance="instance",
data="callback:data",
),
True,
],
[
"text_startswith",
True,
"query",
InlineQuery(
id="query id",
from_user=User(id=42, is_bot=False, first_name="Test"),
query="query line",
offset="offset",
),
True,
],
[
"text",
True,
"question",
Poll(
id="poll id",
question="Question",
options=[PollOption(text="A", voter_count=0)],
is_closed=False,
is_anonymous=False,
type="regular",
allows_multiple_answers=False,
total_voter_count=0,
),
True,
],
[
"text",
True,
["question", "another question"],
Poll(
id="poll id",
question="Another question",
options=[PollOption(text="A", voter_count=0)],
is_closed=False,
is_anonymous=False,
type="quiz",
allows_multiple_answers=False,
total_voter_count=0,
correct_option_id=0,
),
True,
],
["text", True, ["question", "another question"], object(), False],
],
)
async def test_check_text(self, argument, ignore_case, input_value, result, update_type):
text = Text(**{argument: input_value}, text_ignore_case=ignore_case)
assert await text(obj=update_type) is result

View file

@ -1,66 +0,0 @@
import pytest
from aiogram.dispatcher.flags.flag import Flag, FlagDecorator, FlagGenerator
@pytest.fixture(name="flag")
def flag_fixture() -> Flag:
return Flag("test", True)
@pytest.fixture(name="flag_decorator")
def flag_decorator_fixture(flag: Flag) -> FlagDecorator:
return FlagDecorator(flag)
@pytest.fixture(name="flag_generator")
def flag_flag_generator() -> FlagGenerator:
return FlagGenerator()
class TestFlagDecorator:
def test_with_value(self, flag_decorator: FlagDecorator):
new_decorator = flag_decorator._with_value(True)
assert new_decorator is not flag_decorator
assert new_decorator.flag is not flag_decorator.flag
assert new_decorator.flag
def test_call_invalid(self, flag_decorator: FlagDecorator):
with pytest.raises(ValueError):
flag_decorator(True, test=True)
def test_call_with_function(self, flag_decorator: FlagDecorator):
def func():
pass
decorated = flag_decorator(func)
assert decorated is func
assert hasattr(decorated, "aiogram_flag")
def test_call_with_arg(self, flag_decorator: FlagDecorator):
new_decorator = flag_decorator("hello")
assert new_decorator is not flag_decorator
assert new_decorator.flag.value == "hello"
def test_call_with_kwargs(self, flag_decorator: FlagDecorator):
new_decorator = flag_decorator(test=True)
assert new_decorator is not flag_decorator
assert isinstance(new_decorator.flag.value, dict)
assert "test" in new_decorator.flag.value
class TestFlagGenerator:
def test_getattr(self):
generator = FlagGenerator()
assert isinstance(generator.foo, FlagDecorator)
assert isinstance(generator.bar, FlagDecorator)
assert generator.foo is not generator.foo
assert generator.foo is not generator.bar
def test_failed_getattr(self):
generator = FlagGenerator()
with pytest.raises(AttributeError):
generator._something

View file

@ -1,64 +0,0 @@
from unittest.mock import patch
import pytest
from aiogram import F
from aiogram.dispatcher.event.handler import HandlerObject
from aiogram.dispatcher.flags.getter import (
check_flags,
extract_flags,
extract_flags_from_object,
get_flag,
)
class TestGetters:
def test_extract_flags_from_object(self):
def func():
pass
assert extract_flags_from_object(func) == {}
func.aiogram_flag = {"test": True}
assert extract_flags_from_object(func) == func.aiogram_flag
@pytest.mark.parametrize(
"obj,result",
[
[None, {}],
[{}, {}],
[{"handler": None}, {}],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, {"test": True}],
],
)
def test_extract_flags(self, obj, result):
assert extract_flags(obj) == result
@pytest.mark.parametrize(
"obj,name,default,result",
[
[None, "test", None, None],
[None, "test", 42, 42],
[{}, "test", None, None],
[{}, "test", 42, 42],
[{"handler": None}, "test", None, None],
[{"handler": None}, "test", 42, 42],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, "test", None, True],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, "test2", None, None],
[{"handler": HandlerObject(lambda: True, flags={"test": True})}, "test2", 42, 42],
],
)
def test_get_flag(self, obj, name, default, result):
assert get_flag(obj, name, default=default) == result
@pytest.mark.parametrize(
"flags,magic,result",
[
[{}, F.test, None],
[{"test": True}, F.test, True],
[{"test": True}, F.spam, None],
],
)
def test_check_flag(self, flags, magic, result):
with patch("aiogram.dispatcher.flags.getter.extract_flags", return_value=flags):
assert check_flags(object(), magic) == result

View file

@ -1,30 +0,0 @@
import pytest
from aiogram.dispatcher.fsm.storage.base import BaseEventIsolation, StorageKey
from tests.mocked_bot import MockedBot
pytestmark = pytest.mark.asyncio
@pytest.fixture(name="storage_key")
def create_storate_key(bot: MockedBot):
return StorageKey(chat_id=-42, user_id=42, bot_id=bot.id)
@pytest.mark.parametrize(
"isolation",
[
pytest.lazy_fixture("redis_isolation"),
pytest.lazy_fixture("lock_isolation"),
pytest.lazy_fixture("disabled_isolation"),
],
)
class TestIsolations:
async def test_lock(
self,
bot: MockedBot,
isolation: BaseEventIsolation,
storage_key: StorageKey,
):
async with isolation.lock(bot=bot, key=storage_key):
assert True, "You are kidding me?"

View file

@ -1,57 +0,0 @@
import pytest
from aiogram.dispatcher.fsm.storage.base import DEFAULT_DESTINY, StorageKey
from aiogram.dispatcher.fsm.storage.redis import (
DefaultKeyBuilder,
RedisEventIsolation,
RedisStorage,
)
pytestmark = pytest.mark.asyncio
PREFIX = "test"
BOT_ID = 42
CHAT_ID = -1
USER_ID = 2
FIELD = "data"
class TestRedisDefaultKeyBuilder:
@pytest.mark.parametrize(
"with_bot_id,with_destiny,result",
[
[False, False, f"{PREFIX}:{CHAT_ID}:{USER_ID}:{FIELD}"],
[True, False, f"{PREFIX}:{BOT_ID}:{CHAT_ID}:{USER_ID}:{FIELD}"],
[True, True, f"{PREFIX}:{BOT_ID}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}:{FIELD}"],
[False, True, f"{PREFIX}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}:{FIELD}"],
],
)
async def test_generate_key(self, with_bot_id: bool, with_destiny: bool, result: str):
key_builder = DefaultKeyBuilder(
prefix=PREFIX,
with_bot_id=with_bot_id,
with_destiny=with_destiny,
)
key = StorageKey(chat_id=CHAT_ID, user_id=USER_ID, bot_id=BOT_ID, destiny=DEFAULT_DESTINY)
assert key_builder.build(key, FIELD) == result
async def test_destiny_check(self):
key_builder = DefaultKeyBuilder(
with_destiny=False,
)
key = StorageKey(chat_id=CHAT_ID, user_id=USER_ID, bot_id=BOT_ID)
assert key_builder.build(key, FIELD)
key = StorageKey(
chat_id=CHAT_ID, user_id=USER_ID, bot_id=BOT_ID, destiny="CUSTOM_TEST_DESTINY"
)
with pytest.raises(ValueError):
key_builder.build(key, FIELD)
def test_create_isolation(self):
fake_redis = object()
storage = RedisStorage(redis=fake_redis)
isolation = storage.create_isolation()
assert isinstance(isolation, RedisEventIsolation)
assert isolation.redis is fake_redis
assert isolation.key_builder is storage.key_builder

View file

@ -1,49 +0,0 @@
import pytest
from aiogram.dispatcher.fsm.storage.base import BaseStorage, StorageKey
from tests.mocked_bot import MockedBot
pytestmark = pytest.mark.asyncio
@pytest.fixture(name="storage_key")
def create_storate_key(bot: MockedBot):
return StorageKey(chat_id=-42, user_id=42, bot_id=bot.id)
@pytest.mark.parametrize(
"storage",
[pytest.lazy_fixture("redis_storage"), pytest.lazy_fixture("memory_storage")],
)
class TestStorages:
async def test_set_state(self, bot: MockedBot, storage: BaseStorage, storage_key: StorageKey):
assert await storage.get_state(bot=bot, key=storage_key) is None
await storage.set_state(bot=bot, key=storage_key, state="state")
assert await storage.get_state(bot=bot, key=storage_key) == "state"
await storage.set_state(bot=bot, key=storage_key, state=None)
assert await storage.get_state(bot=bot, key=storage_key) is None
async def test_set_data(self, bot: MockedBot, storage: BaseStorage, storage_key: StorageKey):
assert await storage.get_data(bot=bot, key=storage_key) == {}
await storage.set_data(bot=bot, key=storage_key, data={"foo": "bar"})
assert await storage.get_data(bot=bot, key=storage_key) == {"foo": "bar"}
await storage.set_data(bot=bot, key=storage_key, data={})
assert await storage.get_data(bot=bot, key=storage_key) == {}
async def test_update_data(
self, bot: MockedBot, storage: BaseStorage, storage_key: StorageKey
):
assert await storage.get_data(bot=bot, key=storage_key) == {}
assert await storage.update_data(bot=bot, key=storage_key, data={"foo": "bar"}) == {
"foo": "bar"
}
assert await storage.update_data(bot=bot, key=storage_key, data={"baz": "spam"}) == {
"foo": "bar",
"baz": "spam",
}
assert await storage.get_data(bot=bot, key=storage_key) == {
"foo": "bar",
"baz": "spam",
}

View file

@ -1,59 +0,0 @@
import pytest
from aiogram.dispatcher.fsm.context import FSMContext
from aiogram.dispatcher.fsm.storage.base import StorageKey
from aiogram.dispatcher.fsm.storage.memory import MemoryStorage
from tests.mocked_bot import MockedBot
pytestmark = pytest.mark.asyncio
@pytest.fixture()
def state(bot: MockedBot):
storage = MemoryStorage()
key = StorageKey(user_id=42, chat_id=-42, bot_id=bot.id)
ctx = storage.storage[key]
ctx.state = "test"
ctx.data = {"foo": "bar"}
return FSMContext(bot=bot, storage=storage, key=key)
class TestFSMContext:
async def test_address_mapping(self, bot: MockedBot):
storage = MemoryStorage()
ctx = storage.storage[StorageKey(chat_id=-42, user_id=42, bot_id=bot.id)]
ctx.state = "test"
ctx.data = {"foo": "bar"}
state = FSMContext(
bot=bot, storage=storage, key=StorageKey(chat_id=-42, user_id=42, bot_id=bot.id)
)
state2 = FSMContext(
bot=bot, storage=storage, key=StorageKey(chat_id=42, user_id=42, bot_id=bot.id)
)
state3 = FSMContext(
bot=bot, storage=storage, key=StorageKey(chat_id=69, user_id=69, bot_id=bot.id)
)
assert await state.get_state() == "test"
assert await state2.get_state() is None
assert await state3.get_state() is None
assert await state.get_data() == {"foo": "bar"}
assert await state2.get_data() == {}
assert await state3.get_data() == {}
await state2.set_state("experiments")
assert await state.get_state() == "test"
assert await state3.get_state() is None
await state3.set_data({"key": "value"})
assert await state2.get_data() == {}
await state.update_data({"key": "value"})
assert await state.get_data() == {"foo": "bar", "key": "value"}
await state.clear()
assert await state.get_state() is None
assert await state.get_data() == {}
assert await state2.get_state() == "experiments"

View file

@ -1,191 +0,0 @@
import pytest
from aiogram.dispatcher.fsm.state import State, StatesGroup, any_state
class TestState:
def test_empty(self):
state = State()
assert state._state is None
assert state._group_name is None
assert state._group is None
with pytest.raises(RuntimeError):
assert state.group
assert state.state is None
assert str(state) == "<State ''>"
def test_star(self):
state = State(state="*")
assert state._state == "*"
assert state._group_name is None
assert state._group is None
with pytest.raises(RuntimeError):
assert state.group
assert state.state == "*"
assert str(state) == "<State '*'>"
def test_star_filter(self):
assert any_state(None, "foo")
assert any_state(None, "bar")
assert any_state(None, "baz")
def test_alone(self):
state = State("test")
assert state._state == "test"
assert state._group_name is None
assert state._group is None
with pytest.raises(RuntimeError):
assert state.group
assert state.state == "@:test"
assert str(state) == "<State '@:test'>"
def test_alone_with_group(self):
state = State("test", group_name="Test")
assert state._state == "test"
assert state._group_name == "Test"
assert state._group is None
with pytest.raises(RuntimeError):
assert state.group == "Test"
assert state.state == "Test:test"
assert str(state) == "<State 'Test:test'>"
@pytest.mark.parametrize(
"state,check,result",
[
[State("test"), "test", False],
[State("test"), "@:test", True],
[State("test"), "test1", False],
[State("test", group_name="test"), "test:test", True],
[State("test", group_name="test"), "test:test2", False],
[State("test", group_name="test"), "test2:test", False],
[State("test", group_name="test"), "test2:test2", False],
],
)
def test_filter(self, state, check, result):
assert state(None, check) is result
def test_state_in_unknown_class(self):
with pytest.raises(RuntimeError):
class MyClass:
state1 = State()
class TestStatesGroup:
def test_empty(self):
class MyGroup(StatesGroup):
pass
assert MyGroup.__states__ == ()
assert MyGroup.__state_names__ == ()
assert MyGroup.__all_childs__ == ()
assert MyGroup.__all_states__ == ()
assert MyGroup.__all_states_names__ == ()
assert MyGroup.__parent__ is None
assert MyGroup.__full_group_name__ == "MyGroup"
assert str(MyGroup) == "<StatesGroup 'MyGroup'>"
def test_with_state(self):
class MyGroup(StatesGroup):
state1 = State()
assert MyGroup.__states__ == (MyGroup.state1,)
assert MyGroup.__state_names__ == ("MyGroup:state1",)
assert MyGroup.__all_childs__ == ()
assert MyGroup.__all_states__ == (MyGroup.state1,)
assert MyGroup.__parent__ is None
assert MyGroup.__full_group_name__ == "MyGroup"
assert str(MyGroup) == "<StatesGroup 'MyGroup'>"
assert MyGroup.state1.state == "MyGroup:state1"
assert MyGroup.state1.group == MyGroup
def test_nested_group(self):
class MyGroup(StatesGroup):
state1 = State()
class MyNestedGroup(StatesGroup):
state1 = State()
assert MyGroup.__states__ == (MyGroup.state1,)
assert MyGroup.__state_names__ == ("MyGroup:state1",)
assert MyGroup.__all_childs__ == (MyGroup.MyNestedGroup,)
assert MyGroup.__all_states__ == (MyGroup.state1, MyGroup.MyNestedGroup.state1)
assert MyGroup.__parent__ is None
assert MyGroup.MyNestedGroup.__parent__ is MyGroup
assert MyGroup.__full_group_name__ == "MyGroup"
assert MyGroup.MyNestedGroup.__full_group_name__ == "MyGroup.MyNestedGroup"
assert str(MyGroup) == "<StatesGroup 'MyGroup'>"
assert str(MyGroup.MyNestedGroup) == "<StatesGroup 'MyGroup.MyNestedGroup'>"
assert MyGroup.state1.state == "MyGroup:state1"
assert MyGroup.state1.group == MyGroup
assert MyGroup.MyNestedGroup.state1.state == "MyGroup.MyNestedGroup:state1"
assert MyGroup.MyNestedGroup.state1.group == MyGroup.MyNestedGroup
assert MyGroup.MyNestedGroup.state1 in MyGroup.MyNestedGroup
assert MyGroup.MyNestedGroup.state1 in MyGroup
assert MyGroup.state1 not in MyGroup.MyNestedGroup
assert MyGroup.state1 in MyGroup
assert MyGroup.MyNestedGroup in MyGroup
assert "MyGroup.MyNestedGroup:state1" in MyGroup
assert "MyGroup.MyNestedGroup:state1" in MyGroup.MyNestedGroup
assert MyGroup.state1 not in MyGroup.MyNestedGroup
assert "test" not in MyGroup
assert 42 not in MyGroup
assert MyGroup.MyNestedGroup.get_root() is MyGroup
def test_iterable(self):
class Group(StatesGroup):
x = State()
y = State()
assert set(Group) == {Group.x, Group.y}
def test_empty_filter(self):
class MyGroup(StatesGroup):
pass
assert str(MyGroup()) == "StatesGroup MyGroup"
def test_with_state_filter(self):
class MyGroup(StatesGroup):
state1 = State()
state2 = State()
assert MyGroup()(None, "MyGroup:state1")
assert MyGroup()(None, "MyGroup:state2")
assert not MyGroup()(None, "MyGroup:state3")
assert str(MyGroup()) == "StatesGroup MyGroup"
def test_nested_group_filter(self):
class MyGroup(StatesGroup):
state1 = State()
class MyNestedGroup(StatesGroup):
state1 = State()
assert MyGroup()(None, "MyGroup:state1")
assert MyGroup()(None, "MyGroup.MyNestedGroup:state1")
assert not MyGroup()(None, "MyGroup:state2")
assert MyGroup.MyNestedGroup()(None, "MyGroup.MyNestedGroup:state1")
assert not MyGroup.MyNestedGroup()(None, "MyGroup:state1")
assert str(MyGroup()) == "StatesGroup MyGroup"
assert str(MyGroup.MyNestedGroup()) == "StatesGroup MyGroup.MyNestedGroup"

View file

@ -1,20 +0,0 @@
import pytest
from aiogram.dispatcher.fsm.strategy import FSMStrategy, apply_strategy
class TestStrategy:
@pytest.mark.parametrize(
"strategy,case,expected",
[
[FSMStrategy.USER_IN_CHAT, (-42, 42), (-42, 42)],
[FSMStrategy.CHAT, (-42, 42), (-42, -42)],
[FSMStrategy.GLOBAL_USER, (-42, 42), (42, 42)],
[FSMStrategy.USER_IN_CHAT, (42, 42), (42, 42)],
[FSMStrategy.CHAT, (42, 42), (42, 42)],
[FSMStrategy.GLOBAL_USER, (42, 42), (42, 42)],
],
)
def test_strategy(self, strategy, case, expected):
chat_id, user_id = case
assert apply_strategy(chat_id=chat_id, user_id=user_id, strategy=strategy) == expected

View file

@ -1,64 +0,0 @@
import asyncio
import datetime
from functools import wraps
from typing import Any
import pytest
from aiogram import Bot
from aiogram.dispatcher.event.handler import HandlerObject
from aiogram.dispatcher.handler.base import BaseHandler
from aiogram.types import Chat, Message, Update
pytestmark = pytest.mark.asyncio
class MyHandler(BaseHandler):
async def handle(self) -> Any:
await asyncio.sleep(0.1)
return 42
class TestBaseClassBasedHandler:
async def test_base_handler(self):
event = Update(update_id=42)
handler = MyHandler(event=event, key=42)
assert handler.event == event
assert handler.data["key"] == 42
assert not hasattr(handler, "filters")
assert await handler == 42
async def test_bot_from_context(self):
event = Update(update_id=42)
handler = MyHandler(event=event, key=42)
bot = Bot("42:TEST")
with pytest.raises(LookupError):
handler.bot
Bot.set_current(bot)
assert handler.bot == bot
async def test_bot_from_data(self):
event = Update(update_id=42)
bot = Bot("42:TEST")
handler = MyHandler(event=event, key=42, bot=bot)
assert "bot" in handler.data
assert handler.bot == bot
def test_update_from_data(self):
event = Message(
message_id=42, chat=Chat(id=42, type="private"), date=datetime.datetime.now()
)
update = Update(update_id=42, message=event)
handler = MyHandler(event=event, update=update)
assert handler.event == event
assert handler.update == update
async def test_wrapped_handler(self):
# wrap the handler on dummy function
handler = wraps(MyHandler)(lambda: None)
assert HandlerObject(handler).awaitable is True

View file

@ -1,28 +0,0 @@
from typing import Any
import pytest
from aiogram.dispatcher.handler import CallbackQueryHandler
from aiogram.types import CallbackQuery, User
pytestmark = pytest.mark.asyncio
class TestCallbackQueryHandler:
async def test_attributes_aliases(self):
event = CallbackQuery(
id="chosen",
from_user=User(id=42, is_bot=False, first_name="Test"),
data="test",
chat_instance="test",
)
class MyHandler(CallbackQueryHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.from_user == self.event.from_user
assert self.callback_data == self.event.data
assert self.message == self.message
return True
assert await MyHandler(event)

View file

@ -1,29 +0,0 @@
import datetime
from typing import Any
import pytest
from aiogram.dispatcher.handler.chat_member import ChatMemberHandler
from aiogram.types import Chat, ChatMemberMember, ChatMemberUpdated, User
pytestmark = pytest.mark.asyncio
class TestChatMemberUpdated:
async def test_attributes_aliases(self):
event = ChatMemberUpdated(
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
date=datetime.datetime.now(),
old_chat_member=ChatMemberMember(user=User(id=42, is_bot=False, first_name="Test")),
new_chat_member=ChatMemberMember(user=User(id=42, is_bot=False, first_name="Test")),
)
class MyHandler(ChatMemberHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.from_user == self.event.from_user
return True
assert await MyHandler(event)

View file

@ -1,26 +0,0 @@
from typing import Any
import pytest
from aiogram.dispatcher.handler import ChosenInlineResultHandler
from aiogram.types import ChosenInlineResult, User
pytestmark = pytest.mark.asyncio
class TestChosenInlineResultHandler:
async def test_attributes_aliases(self):
event = ChosenInlineResult(
result_id="chosen",
from_user=User(id=42, is_bot=False, first_name="Test"),
query="test",
)
class MyHandler(ChosenInlineResultHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.from_user == self.event.from_user
assert self.query == self.event.query
return True
assert await MyHandler(event)

View file

@ -1,21 +0,0 @@
from typing import Any
import pytest
from aiogram.dispatcher.handler import ErrorHandler
pytestmark = pytest.mark.asyncio
class TestErrorHandler:
async def test_extensions(self):
event = KeyError("kaboom")
class MyHandler(ErrorHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.exception_name == event.__class__.__name__
assert self.exception_message == str(event)
return True
assert await MyHandler(event)

View file

@ -1,27 +0,0 @@
from typing import Any
import pytest
from aiogram.dispatcher.handler import InlineQueryHandler
from aiogram.types import InlineQuery, User
pytestmark = pytest.mark.asyncio
class TestCallbackQueryHandler:
async def test_attributes_aliases(self):
event = InlineQuery(
id="query",
from_user=User(id=42, is_bot=False, first_name="Test"),
query="query",
offset="0",
)
class MyHandler(InlineQueryHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.from_user == self.event.from_user
assert self.query == self.event.query
return True
assert await MyHandler(event)

View file

@ -1,65 +0,0 @@
import datetime
from typing import Any
import pytest
from aiogram.dispatcher.filters import CommandObject
from aiogram.dispatcher.handler.message import MessageHandler, MessageHandlerCommandMixin
from aiogram.types import Chat, Message, User
pytestmark = pytest.mark.asyncio
class MyHandler(MessageHandler):
async def handle(self) -> Any:
return self.event.text
class TestClassBasedMessageHandler:
async def test_message_handler(self):
event = Message(
message_id=42,
date=datetime.datetime.now(),
text="test",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
handler = MyHandler(event=event)
assert handler.from_user == event.from_user
assert handler.chat == event.chat
class HandlerWithCommand(MessageHandlerCommandMixin, MessageHandler):
async def handle(self) -> Any:
return self.command
class TestBaseMessageHandlerCommandMixin:
def test_command_accessible(self):
handler = HandlerWithCommand(
Message(
message_id=42,
date=datetime.datetime.now(),
text="/test args",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
),
command=CommandObject(prefix="/", command="command", args="args"),
)
assert isinstance(handler.command, CommandObject)
assert handler.command.command == "command"
def test_command_not_presented(self):
handler = HandlerWithCommand(
Message(
message_id=42,
date=datetime.datetime.now(),
text="test",
chat=Chat(id=42, type="private"),
from_user=User(id=42, is_bot=False, first_name="Test"),
)
)
assert handler.command is None

View file

@ -1,33 +0,0 @@
from typing import Any
import pytest
from aiogram.dispatcher.handler import PollHandler
from aiogram.types import Poll, PollOption
pytestmark = pytest.mark.asyncio
class TestShippingQueryHandler:
async def test_attributes_aliases(self):
event = Poll(
id="query",
question="Q?",
options=[PollOption(text="A1", voter_count=1)],
is_closed=True,
is_anonymous=False,
type="quiz",
allows_multiple_answers=False,
total_voter_count=0,
correct_option_id=0,
)
class MyHandler(PollHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.question == self.event.question
assert self.options == self.event.options
return True
assert await MyHandler(event)

View file

@ -1,27 +0,0 @@
from typing import Any
import pytest
from aiogram.dispatcher.handler import PreCheckoutQueryHandler
from aiogram.types import PreCheckoutQuery, User
pytestmark = pytest.mark.asyncio
class TestPreCheckoutQueryHandler:
async def test_attributes_aliases(self):
event = PreCheckoutQuery(
id="query",
from_user=User(id=42, is_bot=False, first_name="Test"),
currency="BTC",
total_amount=7,
invoice_payload="payload",
)
class MyHandler(PreCheckoutQueryHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.from_user == self.event.from_user
return True
assert await MyHandler(event)

View file

@ -1,33 +0,0 @@
from typing import Any
import pytest
from aiogram.dispatcher.handler import ShippingQueryHandler
from aiogram.types import ShippingAddress, ShippingQuery, User
pytestmark = pytest.mark.asyncio
class TestShippingQueryHandler:
async def test_attributes_aliases(self):
event = ShippingQuery(
id="query",
from_user=User(id=42, is_bot=False, first_name="Test"),
invoice_payload="payload",
shipping_address=ShippingAddress(
country_code="country_code",
state="state",
city="city",
street_line1="street_line1",
street_line2="street_line2",
post_code="post_code",
),
)
class MyHandler(ShippingQueryHandler):
async def handle(self) -> Any:
assert self.event == event
assert self.from_user == self.event.from_user
return True
assert await MyHandler(event)

View file

@ -1,179 +0,0 @@
import asyncio
import time
from asyncio import Event
from dataclasses import dataclass
from typing import Any, Dict
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient
from aiohttp.web_app import Application
from aiogram import Dispatcher, F
from aiogram.dispatcher.webhook.aiohttp_server import (
SimpleRequestHandler,
TokenBasedRequestHandler,
ip_filter_middleware,
setup_application,
)
from aiogram.dispatcher.webhook.security import IPFilter
from aiogram.methods import GetMe, Request
from aiogram.types import Message, User
from tests.mocked_bot import MockedBot
try:
from asynctest import CoroutineMock, patch
except ImportError:
from unittest.mock import AsyncMock as CoroutineMock # type: ignore
from unittest.mock import patch
class TestAiohttpServer:
def test_setup_application(self):
app = Application()
dp = Dispatcher()
setup_application(app, dp)
assert len(app.router.routes()) == 0
assert len(app.on_startup) == 2
assert len(app.on_shutdown) == 1
async def test_middleware(self, aiohttp_client):
app = Application()
ip_filter = IPFilter.default()
app.middlewares.append(ip_filter_middleware(ip_filter))
async def handler(request: Request):
return web.json_response({"ok": True})
app.router.add_route("POST", "/webhook", handler)
client: TestClient = await aiohttp_client(app)
resp = await client.post("/webhook")
assert resp.status == 401
resp = await client.post("/webhook", headers={"X-Forwarded-For": "149.154.167.220"})
assert resp.status == 200
resp = await client.post(
"/webhook", headers={"X-Forwarded-For": "149.154.167.220,10.111.0.2"}
)
assert resp.status == 200
class TestSimpleRequestHandler:
async def make_reqest(self, client: TestClient, text: str = "test"):
return await client.post(
"/webhook",
json={
"update_id": 0,
"message": {
"message_id": 0,
"from": {"id": 42, "first_name": "Test", "is_bot": False},
"chat": {"id": 42, "is_bot": False, "type": "private"},
"date": int(time.time()),
"text": text,
},
},
)
async def test(self, bot: MockedBot, aiohttp_client):
app = Application()
dp = Dispatcher()
handler_event = Event()
@dp.message(F.text == "test")
def handle_message(msg: Message):
handler_event.set()
return msg.answer("PASS")
handler = SimpleRequestHandler(
dispatcher=dp,
bot=bot,
handle_in_background=False,
)
handler.register(app, path="/webhook")
client: TestClient = await aiohttp_client(app)
resp = await self.make_reqest(client=client)
assert resp.status == 200
result = await resp.json()
assert result["method"] == "sendMessage"
resp = await self.make_reqest(client=client, text="spam")
assert resp.status == 200
result = await resp.json()
assert not result
handler.handle_in_background = True
with patch(
"aiogram.dispatcher.dispatcher.Dispatcher.silent_call_request",
new_callable=CoroutineMock,
) as mocked_silent_call_request:
handler_event.clear()
resp = await self.make_reqest(client=client)
assert resp.status == 200
await asyncio.wait_for(handler_event.wait(), timeout=1)
mocked_silent_call_request.assert_awaited()
result = await resp.json()
assert not result
class TestTokenBasedRequestHandler:
async def test_register(self):
dispatcher = Dispatcher()
app = Application()
handler = TokenBasedRequestHandler(dispatcher=dispatcher)
assert len(app.router.routes()) == 0
with pytest.raises(ValueError):
handler.register(app, path="/webhook")
assert len(app.router.routes()) == 0
handler.register(app, path="/webhook/{bot_token}")
assert len(app.router.routes()) == 1
async def test_close(self):
dispatcher = Dispatcher()
handler = TokenBasedRequestHandler(dispatcher=dispatcher)
bot1 = handler.bots["42:TEST"] = MockedBot(token="42:TEST")
bot1.add_result_for(GetMe, ok=True, result=User(id=42, is_bot=True, first_name="Test"))
assert await bot1.get_me()
assert not bot1.session.closed
bot2 = handler.bots["1337:TEST"] = MockedBot(token="1337:TEST")
bot2.add_result_for(GetMe, ok=True, result=User(id=1337, is_bot=True, first_name="Test"))
assert await bot2.get_me()
assert not bot2.session.closed
await handler.close()
assert bot1.session.closed
assert bot2.session.closed
async def test_resolve_bot(self):
dispatcher = Dispatcher()
handler = TokenBasedRequestHandler(dispatcher=dispatcher)
@dataclass
class FakeRequest:
match_info: Dict[str, Any]
bot1 = await handler.resolve_bot(request=FakeRequest(match_info={"bot_token": "42:TEST"}))
assert bot1.id == 42
bot2 = await handler.resolve_bot(
request=FakeRequest(match_info={"bot_token": "1337:TEST"})
)
assert bot2.id == 1337
bot3 = await handler.resolve_bot(
request=FakeRequest(match_info={"bot_token": "1337:TEST"})
)
assert bot3.id == 1337
assert bot2 == bot3
assert len(handler.bots) == 2

View file

@ -1,57 +0,0 @@
from ipaddress import IPv4Address, IPv4Network
import pytest
from aiogram.dispatcher.webhook.security import IPFilter
class TestSecurity:
def test_empty_init(self):
ip_filter = IPFilter()
assert not ip_filter._allowed_ips
@pytest.mark.parametrize(
"ip,result",
[
("127.0.0.1", True),
("127.0.0.2", False),
(IPv4Address("127.0.0.1"), True),
(IPv4Address("127.0.0.2"), False),
(IPv4Address("192.168.0.32"), True),
("192.168.0.33", False),
("10.111.0.5", True),
("10.111.0.100", True),
("10.111.1.100", False),
],
)
def test_check_ip(self, ip, result):
ip_filter = IPFilter(
ips=["127.0.0.1", IPv4Address("192.168.0.32"), IPv4Network("10.111.0.0/24")]
)
assert (ip in ip_filter) is result
def test_default(self):
ip_filter = IPFilter.default()
assert isinstance(ip_filter, IPFilter)
assert len(ip_filter._allowed_ips) == 5116
assert "91.108.4.50" in ip_filter
assert "149.154.160.20" in ip_filter
@pytest.mark.parametrize(
"ip,ip_range",
[
["127.0.0.1", {IPv4Address("127.0.0.1")}],
["91.108.4.0/22", set(IPv4Network("91.108.4.0/22").hosts())],
[IPv4Address("91.108.4.5"), {IPv4Address("91.108.4.5")}],
[IPv4Network("91.108.4.0/22"), set(IPv4Network("91.108.4.0/22").hosts())],
[42, set()],
],
)
def test_allow_ip(self, ip, ip_range):
ip_filter = IPFilter()
if not ip_range:
with pytest.raises(ValueError):
ip_filter.allow_ip(ip)
else:
ip_filter.allow_ip(ip)
assert ip_filter._allowed_ips == ip_range