refactor: remove MediaGroupFilter and fix rst heading

This commit is contained in:
Vitaly312 2026-04-05 18:15:59 +03:00
parent 24fdd285fd
commit 28626d124a
4 changed files with 6 additions and 147 deletions

View file

@ -18,7 +18,6 @@ from .command import Command, CommandObject, CommandStart
from .exception import ExceptionMessageFilter, ExceptionTypeFilter
from .logic import and_f, invert_f, or_f
from .magic_data import MagicData
from .media_group import MediaGroupFilter
from .state import StateFilter
BaseFilter = Filter
@ -45,7 +44,6 @@ __all__ = (
"ExceptionTypeFilter",
"Filter",
"MagicData",
"MediaGroupFilter",
"StateFilter",
"and_f",
"invert_f",

View file

@ -1,64 +0,0 @@
from typing import Any, Literal
from aiogram.filters.base import Filter
from aiogram.types import Message
MIN_MEDIA_COUNT = 2
DEFAULT_MAX_MEDIA_COUNT = 10
class MediaGroupFilter(Filter):
"""
This filter helps to handle media groups.
Works only with :class:`aiogram.types.message.Message` events which have the :code:`album`
in the handler context.
"""
__slots__ = ("min_media_count", "max_media_count")
def __init__(
self,
count: int | None = None,
min_media_count: int | None = None,
max_media_count: int | None = None,
):
"""
:param count: expected count of media in the group.
:param min_media_count: min count of media in the group, inclusively
:param max_media_count: max count of media in the group, inclusively
"""
if count is None:
if min_media_count is None:
min_media_count = MIN_MEDIA_COUNT
if max_media_count is None:
max_media_count = max(DEFAULT_MAX_MEDIA_COUNT, min_media_count)
else:
if min_media_count is not None or max_media_count is not None:
raise ValueError(
"count and min_media_count or max_media_count can not be used together"
)
if count < MIN_MEDIA_COUNT:
raise ValueError(f"count should be greater or equal to {MIN_MEDIA_COUNT}")
min_media_count = max_media_count = count
if min_media_count < MIN_MEDIA_COUNT:
raise ValueError(f"min_media_count should be greater or equal to {MIN_MEDIA_COUNT}")
if max_media_count < min_media_count:
raise ValueError("max_media_count should be greater or equal to min_media_count")
self.min_media_count = min_media_count
self.max_media_count = max_media_count
def __str__(self) -> str:
if self.min_media_count == self.max_media_count:
return self._signature_to_string(count=self.min_media_count)
return self._signature_to_string(
min_media_count=self.min_media_count, max_media_count=self.max_media_count
)
async def __call__(
self, message: Message, album: list[Message] | None = None
) -> Literal[False] | dict[str, Any]:
media_count = len(album or [])
if not (self.min_media_count <= media_count <= self.max_media_count):
return False
return {"media_count": media_count}

View file

@ -1,6 +1,6 @@
===================
===========
Media group
===================
===========
This module provides tools for media groups.
@ -45,7 +45,7 @@ it will be used as ``caption`` for first media in group.
Handling media groups
======================
=====================
By default each media in the group is processed separately.
@ -56,7 +56,7 @@ other messages with the same media group ID will be suppressed. There are two op
- :class:`aiogram.dispatcher.middlewares.media_group.MemoryMediaGroupAggregator` - simple in-memory storage, used by default
- :class:`aiogram.dispatcher.middlewares.media_group.RedisMediaGroupAggregator` - support distributed environment
You also can use :class:`aiogram.filters.media_group.MediaGroupFilter`
You also can use :class:`aiogram.filters.magic_data.MagicData` with ``F.album``
to filter media groups.
Usage
@ -69,13 +69,13 @@ Usage
# register middleware
from aiogram.dispatcher.middlewares.media_group import MediaGroupAggregatorMiddleware
from aiogram.filters import MediaGroupFilter
from aiogram.filters import MagicData
router.message.outer_middleware(MediaGroupAggregatorMiddleware())
# use middleware
@router.message(
MediaGroupFilter(max_media_count=5),
MagicData(F.album.len() <= 5),
F.caption == "album_caption" # other filters will be applied to the first message in the group
)
async def start(message: Message, album: list[Message]):
@ -94,8 +94,6 @@ References
:members:
.. autoclass:: aiogram.dispatcher.middlewares.media_group.MediaGroupAggregatorMiddleware
:members:
.. autoclass:: aiogram.filters.media_group.MediaGroupFilter
:members:
.. autoclass:: aiogram.dispatcher.middlewares.media_group.MemoryMediaGroupAggregator
:members:
.. autoclass:: aiogram.dispatcher.middlewares.media_group.RedisMediaGroupAggregator

View file

@ -1,73 +0,0 @@
import datetime
import pytest
from aiogram.filters.media_group import DEFAULT_MAX_MEDIA_COUNT, MIN_MEDIA_COUNT, MediaGroupFilter
from aiogram.types import Chat, Message
class TestMediaGroupFilter:
@pytest.mark.parametrize(
"args,min_count,max_count",
[
((), MIN_MEDIA_COUNT, DEFAULT_MAX_MEDIA_COUNT),
((3,), 3, 3),
((11,), 11, 11),
((None, 11, None), 11, 11),
((None, 3), 3, DEFAULT_MAX_MEDIA_COUNT),
((None, None, 3), MIN_MEDIA_COUNT, 3),
],
)
def test_init_range(self, args, min_count, max_count):
filter = MediaGroupFilter(*args)
assert filter.max_media_count == max_count
assert filter.min_media_count == min_count
@pytest.mark.parametrize(
"count,min_count,max_count",
[
(1, None, 1),
(1, 1, None),
(None, 1, None),
(None, None, 1),
(1, None, None),
(None, 5, 3),
],
)
def test_raise_error(self, count, min_count, max_count):
with pytest.raises(ValueError):
MediaGroupFilter(count, min_count, max_count)
@pytest.mark.parametrize(
"min_count,max_count,media_count,result",
[
[2, 2, 1, False],
[2, 2, 2, True],
[2, 2, 3, False],
[2, 5, 2, True],
[2, 5, 5, True],
[2, 5, 6, False],
],
)
async def test_call(self, min_count, max_count, media_count, result):
filter = MediaGroupFilter(min_media_count=min_count, max_media_count=max_count)
album = [
Message(
message_id=i,
date=datetime.datetime.now(),
chat=Chat(id=42, type="private"),
)
for i in range(media_count)
]
response = await filter(album[0], album)
assert bool(response) is result
if result:
assert response.get("media_count") == media_count
def test_str_count(self):
filter = MediaGroupFilter(5)
assert str(filter) == "MediaGroupFilter(count=5)"
def test_str_range(self):
filter = MediaGroupFilter(min_media_count=2, max_media_count=5)
assert str(filter) == "MediaGroupFilter(min_media_count=2, max_media_count=5)"