mirror of
https://github.com/aiogram/aiogram.git
synced 2025-12-08 17:13:56 +00:00
Full coverage of observers
This commit is contained in:
parent
a1e840af89
commit
23dbd88487
2 changed files with 77 additions and 16 deletions
|
|
@ -37,6 +37,7 @@ class EventObserver:
|
|||
callback=callback, filters=[FilterObject(filter_) for filter_ in filters]
|
||||
)
|
||||
)
|
||||
return callback
|
||||
|
||||
async def trigger(self, *args, **kwargs):
|
||||
for handler in self.handlers:
|
||||
|
|
|
|||
|
|
@ -21,6 +21,21 @@ async def pipe_handler(*args, **kwargs):
|
|||
return args, kwargs
|
||||
|
||||
|
||||
class MyFilter1(BaseFilter):
|
||||
test: str
|
||||
|
||||
async def __call__(self, *args: Any, **kwargs: Any) -> Union[bool, Dict[str, Any]]:
|
||||
return True
|
||||
|
||||
|
||||
class MyFilter2(MyFilter1):
|
||||
pass
|
||||
|
||||
|
||||
class MyFilter3(MyFilter1):
|
||||
pass
|
||||
|
||||
|
||||
class TestEventObserver:
|
||||
@pytest.mark.parametrize(
|
||||
"count,handler,filters",
|
||||
|
|
@ -133,18 +148,6 @@ class TestTelegramEventObserver:
|
|||
router1.include_router(router2)
|
||||
router2.include_router(router3)
|
||||
|
||||
class MyFilter1(BaseFilter):
|
||||
test: str
|
||||
|
||||
async def __call__(self, *args: Any, **kwargs: Any) -> Union[bool, Dict[str, Any]]:
|
||||
return True
|
||||
|
||||
class MyFilter2(MyFilter1):
|
||||
pass
|
||||
|
||||
class MyFilter3(MyFilter1):
|
||||
pass
|
||||
|
||||
router1.message_handler.bind_filter(MyFilter1)
|
||||
router1.message_handler.bind_filter(MyFilter2)
|
||||
router2.message_handler.bind_filter(MyFilter2)
|
||||
|
|
@ -159,14 +162,71 @@ class TestTelegramEventObserver:
|
|||
assert filters_chain3 == [MyFilter3, MyFilter2, MyFilter1]
|
||||
|
||||
def test_resolve_filters(self):
|
||||
pass
|
||||
router = Router()
|
||||
observer = router.message_handler
|
||||
observer.bind_filter(MyFilter1)
|
||||
|
||||
resolved = observer.resolve_filters({"test": "PASS"})
|
||||
assert isinstance(resolved, list)
|
||||
assert len(resolved) == 1
|
||||
assert isinstance(resolved[0], MyFilter1)
|
||||
assert resolved[0].test == "PASS"
|
||||
|
||||
# Unknown filter
|
||||
with pytest.raises(ValueError, match="Unknown filters: {'@bad'}"):
|
||||
assert observer.resolve_filters({"@bad": "very"})
|
||||
|
||||
# Unknown filter
|
||||
with pytest.raises(ValueError, match="Unknown filters: {'@bad'}"):
|
||||
assert observer.resolve_filters({"test": "ok", "@bad": "very"})
|
||||
|
||||
# Bad argument type
|
||||
with pytest.raises(ValueError, match="Unknown filters: {'test'}"):
|
||||
assert observer.resolve_filters({"test": ...})
|
||||
|
||||
def test_register(self):
|
||||
pass
|
||||
router = Router()
|
||||
observer = router.message_handler
|
||||
observer.bind_filter(MyFilter1)
|
||||
|
||||
assert observer.register(my_handler) == my_handler
|
||||
assert isinstance(observer.handlers[0], HandlerObject)
|
||||
assert not observer.handlers[0].filters
|
||||
|
||||
f = MyFilter1(test="ok")
|
||||
observer.register(my_handler, f)
|
||||
assert isinstance(observer.handlers[1], HandlerObject)
|
||||
assert len(observer.handlers[1].filters) == 1
|
||||
assert observer.handlers[1].filters[0].callback == f
|
||||
|
||||
observer.register(my_handler, test="PASS")
|
||||
assert isinstance(observer.handlers[2], HandlerObject)
|
||||
assert len(observer.handlers[2].filters) == 1
|
||||
assert observer.handlers[2].filters[0].callback == MyFilter1(test="PASS")
|
||||
|
||||
observer.register(my_handler, f, test="PASS")
|
||||
assert isinstance(observer.handlers[3], HandlerObject)
|
||||
assert len(observer.handlers[3].filters) == 2
|
||||
assert observer.handlers[3].filters[0].callback == f
|
||||
assert observer.handlers[3].filters[1].callback == MyFilter1(test="PASS")
|
||||
|
||||
def test_register_decorator(self):
|
||||
pass
|
||||
router = Router()
|
||||
observer = router.message_handler
|
||||
|
||||
@observer()
|
||||
async def my_handler(event: Any):
|
||||
pass
|
||||
|
||||
assert len(observer.handlers) == 1
|
||||
assert observer.handlers[0].callback == my_handler
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trigger(self):
|
||||
pass
|
||||
router = Router()
|
||||
observer = router.message_handler
|
||||
observer.bind_filter(MyFilter1)
|
||||
observer.register(my_handler, test="ok")
|
||||
|
||||
results = [result async for result in observer.trigger(42)]
|
||||
assert results == [42]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue