Added AdminFilter and example of its usage

This commit is contained in:
Arslan 'Ars2014' Sakhapov 2019-08-08 19:12:49 +05:00
parent 8823c8f22a
commit bd032f066e
4 changed files with 76 additions and 2 deletions

View file

@ -9,7 +9,7 @@ import aiohttp
from aiohttp.helpers import sentinel
from .filters import Command, ContentTypeFilter, ExceptionsFilter, FiltersFactory, HashTag, Regexp, \
RegexpCommandsFilter, StateFilter, Text, IdFilter
RegexpCommandsFilter, StateFilter, Text, IdFilter, AdminFilter
from .handler import Handler
from .middlewares import MiddlewareManager
from .storage import BaseStorage, DELTA, DisabledStorage, EXCEEDED_COUNT, FSMContext, \
@ -119,6 +119,11 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
self.channel_post_handlers, self.edited_channel_post_handlers,
self.callback_query_handlers, self.inline_query_handlers
])
filters_factory.bind(AdminFilter, event_handlers=[
self.message_handlers, self.edited_message_handlers,
self.channel_post_handlers, self.edited_channel_post_handlers,
self.callback_query_handlers, self.inline_query_handlers
])
def __del__(self):
self.stop_polling()

View file

@ -1,5 +1,5 @@
from .builtin import Command, CommandHelp, CommandPrivacy, CommandSettings, CommandStart, ContentTypeFilter, \
ExceptionsFilter, HashTag, Regexp, RegexpCommandsFilter, StateFilter, Text, IdFilter
ExceptionsFilter, HashTag, Regexp, RegexpCommandsFilter, StateFilter, Text, IdFilter, AdminFilter
from .factory import FiltersFactory
from .filters import AbstractFilter, BoundFilter, Filter, FilterNotPassed, FilterRecord, execute_filter, \
check_filters, get_filter_spec, get_filters_spec
@ -24,6 +24,7 @@ __all__ = [
'StateFilter',
'Text',
'IdFilter',
'AdminFilter',
'get_filter_spec',
'get_filters_spec',
'execute_filter',

View file

@ -562,3 +562,43 @@ class IdFilter(Filter):
return chat_id in self.chat_id
return False
class AdminFilter(Filter):
"""
Checks if user is admin in a chat.
If chat_id is not set, the filter will check in the current chat (correct only for messages).
chat_id is required for InlineQuery.
"""
def __init__(self, admin_chat_id: typing.Optional[int] = None, admin_current_chat: typing.Optional[bool] = False):
self.chat_id = admin_chat_id
self.check_current_chat = admin_current_chat
@classmethod
def validate(cls, full_config: typing.Dict[str, typing.Any]) -> typing.Optional[typing.Dict[str, typing.Any]]:
result = {}
if "admin_chat_id" in full_config: # use prefix 'admin' to not conflict with IdFilter
result["admin_chat_id"] = full_config.pop("admin_chat_id")
if "admin_current_chat" in full_config: # set True if need to check current chat
result["admin_current_chat"] = full_config.pop("admin_current_chat")
return result
async def check(self, obj: Union[Message, CallbackQuery, InlineQuery]) -> bool:
user_id = obj.from_user.id
chat_id = self.chat_id
if not chat_id or self.check_current_chat:
if isinstance(obj, Message):
chat_id = obj.chat.id
elif isinstance(obj, CallbackQuery):
if obj.message:
chat_id = obj.message.chat.id
else:
raise ValueError("Cannot get current chat in a InlineQuery")
admins = [member.user.id for member in await obj.bot.get_chat_administrators(chat_id)]
return user_id in admins

View file

@ -0,0 +1,28 @@
import logging
from aiogram import Bot, Dispatcher, types, executor
API_TOKEN = 'API TOKEN HERE'
logging.basicConfig(level=logging.DEBUG)
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot=bot)
chat_id = -1001241113577
# checks specified chat
@dp.message_handler(admin_chat_id=chat_id)
async def handler(msg: types.Message):
await msg.reply(f"You are an admin of the chat '{chat_id}'", reply=False)
# checks current chat
@dp.message_handler(admin_current_chat=True)
async def handler(msg: types.Message):
await msg.reply("You are an admin of the current chat!", reply=False)
if __name__ == '__main__':
executor.start_polling(dp)