aiogram/tests/test_fsm/storage/test_isolation.py
Rishat-F 1df3adaba1
Fail redis and mongo tests if incorrect URI provided + some storages tests refactoring (#1510)
* Smaller timeout for MongoStorage connection

By default serverSelectionTimeoutMS=30000. This is too much

* Correct ConnectionError for RedisStorage in tests

* Remove unused import in conftest.py

* Refactor skipping redis and mongo tests

* Fail redis and mongo tests if incorrect URI

If incorrect URIs provided to "--redis" and/or "--mongo" options
tests should fail with ERRORs instead of skipping.
Otherwise the next scenario is possible:
  1) developer breaks RedisStorage and/or MongoStorage code
  2) tests are run with incorrect redis and/or mongo URIs
     provided by "--redis" and "--mongo" options.
     For example, wrong port specified.
  3) tests pass because skipping doesn't fail tests run
  4) developer or reviewer doesn't notice
     that redis and/or mongo tests were skipped
  5) broken code gets in codebase

* Remove unused fixtures passing in storages tests

* Define create_storage_key fixture in conftest.py

* Linters formatting

* Changes description

* Revert "Smaller timeout for MongoStorage connection"

This reverts commit d88b7ec612.

* Smaller timeout for MongoStorage connection in tests

The default 30s timeout is too long

* Add test for MongoStorage for 100% coverage

* Linters formatting

* Move skipping redis/mongo tests in earlier fixtures

* Replace vars with constants in conftest.py

* Linters formatting
2024-06-17 00:55:59 +03:00

65 lines
2.2 KiB
Python

from unittest import mock
from unittest.mock import AsyncMock, patch
import pytest
from aiogram.fsm.storage.base import BaseEventIsolation, StorageKey
from aiogram.fsm.storage.redis import RedisEventIsolation, RedisStorage
@pytest.mark.parametrize(
"isolation",
[
pytest.lazy_fixture("redis_isolation"),
pytest.lazy_fixture("lock_isolation"),
pytest.lazy_fixture("disabled_isolation"),
],
)
class TestIsolations:
async def test_lock(
self,
isolation: BaseEventIsolation,
storage_key: StorageKey,
):
async with isolation.lock(key=storage_key):
assert True, "Are you kidding me?"
class TestRedisEventIsolation:
def test_create_isolation(self):
fake_redis = object()
storage = RedisStorage(redis=fake_redis)
isolation = storage.create_isolation()
assert isinstance(isolation, RedisEventIsolation)
assert isolation.redis is fake_redis
assert isolation.key_builder is storage.key_builder
def test_init_without_key_builder(self):
redis = AsyncMock()
isolation = RedisEventIsolation(redis=redis)
assert isolation.redis is redis
assert isolation.key_builder is not None
def test_init_with_key_builder(self):
redis = AsyncMock()
key_builder = AsyncMock()
isolation = RedisEventIsolation(redis=redis, key_builder=key_builder)
assert isolation.redis is redis
assert isolation.key_builder is key_builder
def test_create_from_url(self):
with patch("redis.asyncio.connection.ConnectionPool.from_url") as pool:
isolation = RedisEventIsolation.from_url("redis://localhost:6379/0")
assert isinstance(isolation, RedisEventIsolation)
assert isolation.redis is not None
assert isolation.key_builder is not None
pool.assert_called_once_with("redis://localhost:6379/0")
async def test_close(self):
isolation = RedisEventIsolation(redis=AsyncMock())
await isolation.close()
# close is not called because connection should be closed from the storage
# assert isolation.redis.close.called_once()