mirror of
https://github.com/aiogram/aiogram.git
synced 2025-12-08 17:13:56 +00:00
Add commands filter and mixin for handlers
This commit is contained in:
parent
ea7b20e114
commit
6cb1f96ded
5 changed files with 157 additions and 6 deletions
|
|
@ -2,7 +2,7 @@ from __future__ import annotations
|
|||
|
||||
from typing import Any, Optional, TypeVar
|
||||
|
||||
from ...utils.mixins import ContextInstanceMixin
|
||||
from ...utils.mixins import ContextInstanceMixin, DataMixin
|
||||
from ...utils.token import extract_bot_id, validate_token
|
||||
from ..methods import TelegramMethod
|
||||
from .session.aiohttp import AiohttpSession
|
||||
|
|
@ -11,7 +11,7 @@ from .session.base import BaseSession
|
|||
T = TypeVar("T")
|
||||
|
||||
|
||||
class BaseBot(ContextInstanceMixin):
|
||||
class BaseBot(ContextInstanceMixin, DataMixin):
|
||||
def __init__(self, token: str, session: BaseSession = None, parse_mode: Optional[str] = None):
|
||||
validate_token(token)
|
||||
|
||||
|
|
|
|||
|
|
@ -103,6 +103,11 @@ class Bot(BaseBot):
|
|||
Class where located all API methods
|
||||
"""
|
||||
|
||||
async def me(self) -> User:
|
||||
if self not in self:
|
||||
self[self] = await self.get_me()
|
||||
return self[self]
|
||||
|
||||
# =============================================================================================
|
||||
# Group: Getting updates
|
||||
# Source: https://core.telegram.org/bots/api#getting-updates
|
||||
|
|
|
|||
|
|
@ -1,14 +1,15 @@
|
|||
from typing import Dict, Tuple, Union
|
||||
|
||||
from .base import BaseFilter
|
||||
from .command import Command, CommandObject
|
||||
from .text import Text
|
||||
|
||||
__all__ = ("BUILTIN_FILTERS", "BaseFilter", "Text")
|
||||
__all__ = ("BUILTIN_FILTERS", "BaseFilter", "Text", "Command", "CommandObject")
|
||||
|
||||
BUILTIN_FILTERS: Dict[str, Union[Tuple[BaseFilter], Tuple]] = {
|
||||
"update": (),
|
||||
"message": (Text,),
|
||||
"edited_message": (Text,),
|
||||
"message": (Text, Command),
|
||||
"edited_message": (Text, Command),
|
||||
"channel_post": (Text,),
|
||||
"edited_channel_post": (Text,),
|
||||
"inline_query": (Text,),
|
||||
|
|
|
|||
135
aiogram/dispatcher/filters/command.py
Normal file
135
aiogram/dispatcher/filters/command.py
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, AnyStr, Dict, List, Match, Optional, Pattern, Union
|
||||
|
||||
from pydantic import root_validator
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.api.types import Message
|
||||
from aiogram.dispatcher.filters import BaseFilter
|
||||
|
||||
CommandPatterType = Union[str, re.Pattern]
|
||||
|
||||
|
||||
class Command(BaseFilter):
|
||||
commands: List[CommandPatterType]
|
||||
commands_prefix: str = "/"
|
||||
commands_ignore_case: bool = False
|
||||
commands_ignore_mention: bool = False
|
||||
|
||||
@root_validator
|
||||
def validate_constraints(cls, values: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if "commands" not in values:
|
||||
raise ValueError("Commands required")
|
||||
if not isinstance(values["commands"], list):
|
||||
values["commands"] = [values["commands"]]
|
||||
return values
|
||||
|
||||
async def __call__(self, message: Message, bot: Bot) -> Union[bool, Dict[str, Any]]:
|
||||
if not message.text:
|
||||
return False
|
||||
|
||||
return await self.parse_command(text=message.text, bot=bot)
|
||||
|
||||
async def parse_command(self, text: str, bot: Bot) -> Union[bool, Dict[str, CommandObject]]:
|
||||
"""
|
||||
Extract command from the text and validate
|
||||
|
||||
:param text:
|
||||
:param bot:
|
||||
:return:
|
||||
"""
|
||||
# First step: separate command with arguments
|
||||
# "/command@mention arg1 arg2" -> "/command@mention", ["arg1 arg2"]
|
||||
full_command, *args = text.split(maxsplit=1)
|
||||
|
||||
# Separate command into valuable parts
|
||||
# "/command@mention" -> "/", ("command", "@", "mention")
|
||||
prefix, (command, _, mention) = full_command[0], full_command[1:].partition("@")
|
||||
|
||||
# Validate prefixes
|
||||
if prefix not in self.commands_prefix:
|
||||
return False
|
||||
|
||||
# Validate mention
|
||||
if (
|
||||
mention
|
||||
and not self.commands_ignore_mention
|
||||
and mention.lower() != (await bot.me()).username.lower()
|
||||
):
|
||||
return False
|
||||
|
||||
# Validate command
|
||||
for allowed_command in self.commands:
|
||||
# Command can be presented as regexp pattern or raw string
|
||||
# then need to validate that in different ways
|
||||
if isinstance(allowed_command, Pattern): # Regexp
|
||||
result = allowed_command.match(command)
|
||||
if result:
|
||||
return {
|
||||
"command": CommandObject(
|
||||
prefix=prefix,
|
||||
command=command,
|
||||
mention=mention,
|
||||
args=args[0] if args else None,
|
||||
match=result,
|
||||
)
|
||||
}
|
||||
|
||||
elif command == allowed_command: # String
|
||||
return {
|
||||
"command": CommandObject(
|
||||
prefix=prefix,
|
||||
command=command,
|
||||
mention=mention,
|
||||
args=args[0] if args else None,
|
||||
match=None,
|
||||
)
|
||||
}
|
||||
|
||||
return False
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandObject:
|
||||
"""
|
||||
Instance of this object is always has command and it prefix.
|
||||
Can be passed as keyword argument ``command`` to the handler
|
||||
"""
|
||||
|
||||
prefix: str = "/"
|
||||
"""Command prefix"""
|
||||
command: str = ""
|
||||
"""Command without prefix and mention"""
|
||||
mention: str = None
|
||||
"""Mention (if available)"""
|
||||
args: str = field(repr=False, default=None)
|
||||
"""Command argument"""
|
||||
match: Optional[Match[AnyStr]] = None
|
||||
"""Will be presented match result if the command is presented as regexp in filter"""
|
||||
|
||||
@property
|
||||
def mentioned(self) -> bool:
|
||||
"""
|
||||
This command has mention?
|
||||
:return:
|
||||
"""
|
||||
return bool(self.mention)
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
"""
|
||||
Generate original text from object
|
||||
:return:
|
||||
"""
|
||||
line = self.prefix + self.command
|
||||
if self.mentioned:
|
||||
line += "@" + self.mention
|
||||
if self.args:
|
||||
line += " " + self.args
|
||||
return line
|
||||
|
|
@ -1,7 +1,9 @@
|
|||
from abc import ABC
|
||||
from typing import Optional
|
||||
|
||||
from aiogram.api.types import Message
|
||||
from aiogram.dispatcher.handler.base import BaseHandler
|
||||
from aiogram.dispatcher.filters import CommandObject
|
||||
from aiogram.dispatcher.handler.base import BaseHandler, BaseHandlerMixin
|
||||
|
||||
|
||||
class MessageHandler(BaseHandler, ABC):
|
||||
|
|
@ -14,3 +16,11 @@ class MessageHandler(BaseHandler, ABC):
|
|||
@property
|
||||
def chat(self):
|
||||
return self.event.chat
|
||||
|
||||
|
||||
class MessageHandlerCommandMixin(BaseHandlerMixin):
|
||||
@property
|
||||
def command(self) -> Optional[CommandObject]:
|
||||
if "command" in self.data:
|
||||
return self.command["data"]
|
||||
return None
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue