aiogram/tests/test_fsm/storage/test_key_builder.py
Rishat-F 1ef7655fd7
Added MongoStorage for FSM (#1434)
* Mongo storage included to storages test

* Added few additional checks in storages test

* Added MongoStorage for FSM

* Added changes description

* Fixed error message syntax

Co-authored-by: Alex Root Junior <jroot.junior@gmail.com>

* Resolved mypy check error

* IF/ELSE statement simplified

* Fix ruff linter error: RET505 Unnecessary `elif` after `return` statement

* Fix ruff linter error: E501 Line too long (100 > 99)

* Added mongo storage testing in CI

* Refactoring while review

* Refactoring while review

* Storing FSM state and data together in MongoDB-storage

* Fix CI - MongoDB container action is only supported on Linux

* Refactoring while review

* Enable Macos in pypy-tests section of CI

* Refactoring while review

* Makefile updated

* redis and mongo storages tests do not run in pypy-tests job of CI

* Fix docstring of DefaultKeyBuilder

---------

Co-authored-by: Alex Root Junior <jroot.junior@gmail.com>
2024-05-07 22:42:31 +03:00

106 lines
3.4 KiB
Python

from typing import Literal, Optional
import pytest
from aiogram.fsm.storage.base import DEFAULT_DESTINY, DefaultKeyBuilder, StorageKey
PREFIX = "test"
BOT_ID = 42
CHAT_ID = -1
USER_ID = 2
THREAD_ID = 3
BUSINESS_CONNECTION_ID = "4"
FIELD = "data"
class TestDefaultKeyBuilder:
@pytest.mark.parametrize(
"key_builder,field,result",
[
[
DefaultKeyBuilder(
prefix=PREFIX,
with_bot_id=True,
with_destiny=True,
with_business_connection_id=True,
),
FIELD,
f"{PREFIX}:{BOT_ID}:{BUSINESS_CONNECTION_ID}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}:{FIELD}",
],
[
DefaultKeyBuilder(prefix=PREFIX, with_bot_id=True, with_destiny=True),
None,
f"{PREFIX}:{BOT_ID}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}",
],
[
DefaultKeyBuilder(
prefix=PREFIX, with_bot_id=True, with_business_connection_id=True
),
FIELD,
f"{PREFIX}:{BOT_ID}:{BUSINESS_CONNECTION_ID}:{CHAT_ID}:{USER_ID}:{FIELD}",
],
[
DefaultKeyBuilder(prefix=PREFIX, with_bot_id=True),
None,
f"{PREFIX}:{BOT_ID}:{CHAT_ID}:{USER_ID}",
],
[
DefaultKeyBuilder(
prefix=PREFIX, with_destiny=True, with_business_connection_id=True
),
FIELD,
f"{PREFIX}:{BUSINESS_CONNECTION_ID}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}:{FIELD}",
],
[
DefaultKeyBuilder(prefix=PREFIX, with_destiny=True),
None,
f"{PREFIX}:{CHAT_ID}:{USER_ID}:{DEFAULT_DESTINY}",
],
[
DefaultKeyBuilder(prefix=PREFIX, with_business_connection_id=True),
FIELD,
f"{PREFIX}:{BUSINESS_CONNECTION_ID}:{CHAT_ID}:{USER_ID}:{FIELD}",
],
[DefaultKeyBuilder(prefix=PREFIX), None, f"{PREFIX}:{CHAT_ID}:{USER_ID}"],
],
)
async def test_generate_key(
self,
key_builder: DefaultKeyBuilder,
field: Optional[Literal["data", "state", "lock"]],
result: str,
):
key = StorageKey(
chat_id=CHAT_ID,
user_id=USER_ID,
bot_id=BOT_ID,
business_connection_id=BUSINESS_CONNECTION_ID,
destiny=DEFAULT_DESTINY,
)
assert key_builder.build(key, field) == result
async def test_destiny_check(self):
key_builder = DefaultKeyBuilder(
with_destiny=False,
)
key = StorageKey(chat_id=CHAT_ID, user_id=USER_ID, bot_id=BOT_ID)
assert key_builder.build(key, FIELD)
key = StorageKey(
chat_id=CHAT_ID, user_id=USER_ID, bot_id=BOT_ID, destiny="CUSTOM_TEST_DESTINY"
)
with pytest.raises(ValueError):
key_builder.build(key, FIELD)
def test_thread_id(self):
key_builder = DefaultKeyBuilder(
prefix=PREFIX,
)
key = StorageKey(
chat_id=CHAT_ID,
user_id=USER_ID,
bot_id=BOT_ID,
thread_id=THREAD_ID,
destiny=DEFAULT_DESTINY,
)
assert key_builder.build(key, FIELD) == f"{PREFIX}:{CHAT_ID}:{THREAD_ID}:{USER_ID}:{FIELD}"