mirror of
https://github.com/aiogram/aiogram.git
synced 2025-12-12 10:11:52 +00:00
* Update API, added some new features * Fixed unknown chat_action value * Separate events from dispatcher messages * Disabled cache for I18n LazyProxy * Rework events isolation * Added chat member status changed filter, update Bot API 5.7, other small changes * Improve exceptions in chat member status filter * Fixed tests, covered flags and events isolation modules * Try to fix flake8 unused type ignore * Fixed linter error * Cover chat member updated filter * Cover chat action sender * Added docs for chat action util * Try to fix tests for python <= 3.9 * Fixed headers * Added docs for flags functionality * Added docs for chat_member_updated filter * Added change notes * Update dependencies and fix mypy checks * Bump version
57 lines
1.9 KiB
Python
57 lines
1.9 KiB
Python
import pytest
|
|
|
|
from aiogram.dispatcher.fsm.storage.base import DEFAULT_DESTINY, StorageKey
|
|
from aiogram.dispatcher.fsm.storage.redis import (
|
|
DefaultKeyBuilder,
|
|
RedisEventIsolation,
|
|
RedisStorage,
|
|
)
|
|
|
|
# Apply the asyncio mark to every test collected from this module.
pytestmark = pytest.mark.asyncio

# String fragments used when composing the expected Redis keys.
PREFIX, FIELD = "test", "data"

# Identifiers fed into StorageKey by the tests in this module.
BOT_ID, CHAT_ID, USER_ID = 42, -1, 2
|
|
|
|
|
|
class TestRedisDefaultKeyBuilder:
    """Tests for ``DefaultKeyBuilder`` and ``RedisStorage.create_isolation``."""

    @pytest.mark.parametrize(
        "with_bot_id,with_destiny,result",
        [
            (False, False, f"{PREFIX}:{CHAT_ID}:{USER_ID}:{FIELD}"),
            (True, False, f"{PREFIX}:{BOT_ID}:{CHAT_ID}:{USER_ID}:{FIELD}"),
            (True, True, f"{PREFIX}:{BOT_ID}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}:{FIELD}"),
            (False, True, f"{PREFIX}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}:{FIELD}"),
        ],
    )
    async def test_generate_key(self, with_bot_id: bool, with_destiny: bool, result: str):
        """Each flag combination must produce the expected colon-joined key."""
        storage_key = StorageKey(
            chat_id=CHAT_ID,
            user_id=USER_ID,
            bot_id=BOT_ID,
            destiny=DEFAULT_DESTINY,
        )
        builder = DefaultKeyBuilder(
            prefix=PREFIX,
            with_bot_id=with_bot_id,
            with_destiny=with_destiny,
        )

        built = builder.build(storage_key, FIELD)

        assert built == result

    async def test_destiny_check(self):
        """With ``with_destiny=False`` a custom destiny is expected to raise."""
        builder = DefaultKeyBuilder(with_destiny=False)

        # A key created without an explicit destiny must still build.
        plain_key = StorageKey(chat_id=CHAT_ID, user_id=USER_ID, bot_id=BOT_ID)
        assert builder.build(plain_key, FIELD)

        # A key carrying a custom destiny must be rejected with ValueError.
        custom_key = StorageKey(
            chat_id=CHAT_ID,
            user_id=USER_ID,
            bot_id=BOT_ID,
            destiny="CUSTOM_TEST_DESTINY",
        )
        with pytest.raises(ValueError):
            builder.build(custom_key, FIELD)

    def test_create_isolation(self):
        """The isolation object must share the storage's redis and key builder."""
        # A bare object is enough here: only identity is checked below.
        redis_stub = object()
        storage = RedisStorage(redis=redis_stub)

        isolation = storage.create_isolation()

        assert isinstance(isolation, RedisEventIsolation)
        assert isolation.redis is redis_stub
        assert isolation.key_builder is storage.key_builder
|