Merge branch 'dev-2.x'

# Conflicts:
#	aiogram/__init__.py
This commit is contained in:
Alex Root Junior 2018-12-31 16:26:20 +02:00
commit fc717902c6
14 changed files with 461 additions and 45 deletions

View file

@ -38,5 +38,5 @@ __all__ = [
'utils' 'utils'
] ]
__version__ = '2.0' __version__ = '2.0.1'
__api_version__ = '4.1' __api_version__ = '4.1'

View file

@ -50,8 +50,6 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
self.throttling_rate_limit = throttling_rate_limit self.throttling_rate_limit = throttling_rate_limit
self.no_throttle_error = no_throttle_error self.no_throttle_error = no_throttle_error
self.last_update_id = 0
self.filters_factory: FiltersFactory = filters_factory self.filters_factory: FiltersFactory = filters_factory
self.updates_handler = Handler(self, middleware_key='update') self.updates_handler = Handler(self, middleware_key='update')
self.message_handlers = Handler(self, middleware_key='message') self.message_handlers = Handler(self, middleware_key='message')
@ -120,17 +118,9 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
You can skip old incoming updates from queue. You can skip old incoming updates from queue.
This method is not recommended to use if you use payments or you bot has high-load. This method is not recommended to use if you use payments or you bot has high-load.
:return: count of skipped updates :return: None
""" """
total = 0 await self.bot.get_updates(offset=-1, timeout=1)
updates = await self.bot.get_updates(offset=self.last_update_id, timeout=1)
while updates:
total += len(updates)
for update in updates:
if update.update_id > self.last_update_id:
self.last_update_id = update.update_id
updates = await self.bot.get_updates(offset=self.last_update_id + 1, timeout=1)
return total
async def process_updates(self, updates, fast: typing.Optional[bool] = True): async def process_updates(self, updates, fast: typing.Optional[bool] = True):
""" """
@ -158,7 +148,6 @@ class Dispatcher(DataMixin, ContextInstanceMixin):
:param update: :param update:
:return: :return:
""" """
self.last_update_id = update.update_id
types.Update.set_current(update) types.Update.set_current(update)
try: try:

View file

@ -393,11 +393,10 @@ class ExceptionsFilter(BoundFilter):
key = 'exception' key = 'exception'
def __init__(self, dispatcher, exception): def __init__(self, exception):
super().__init__(dispatcher)
self.exception = exception self.exception = exception
async def check(self, dispatcher, update, exception): async def check(self, update, exception):
try: try:
raise exception raise exception
except self.exception: except self.exception:

View file

@ -3,7 +3,6 @@ import inspect
import typing import typing
from ..handler import Handler from ..handler import Handler
from ...types.base import TelegramObject
class FilterNotPassed(Exception): class FilterNotPassed(Exception):
@ -140,8 +139,8 @@ class AbstractFilter(abc.ABC):
""" """
pass pass
async def __call__(self, obj: TelegramObject) -> bool: async def __call__(self, *args) -> bool:
return await self.check(obj) return await self.check(*args)
def __invert__(self): def __invert__(self):
return NotFilter(self) return NotFilter(self)

View file

@ -21,6 +21,7 @@ from ..utils.exceptions import TimeoutWarning
from ..utils.payload import prepare_arg from ..utils.payload import prepare_arg
DEFAULT_WEB_PATH = '/webhook' DEFAULT_WEB_PATH = '/webhook'
DEFAULT_ROUTE_NAME = 'webhook_handler'
BOT_DISPATCHER_KEY = 'BOT_DISPATCHER' BOT_DISPATCHER_KEY = 'BOT_DISPATCHER'
RESPONSE_TIMEOUT = 55 RESPONSE_TIMEOUT = 55
@ -266,16 +267,17 @@ class GoneRequestHandler(web.View):
raise HTTPGone() raise HTTPGone()
def configure_app(dispatcher, app: web.Application, path=DEFAULT_WEB_PATH): def configure_app(dispatcher, app: web.Application, path=DEFAULT_WEB_PATH, route_name=DEFAULT_ROUTE_NAME):
""" """
You can prepare web.Application for working with webhook handler. You can prepare web.Application for working with webhook handler.
:param dispatcher: Dispatcher instance :param dispatcher: Dispatcher instance
:param app: :class:`aiohttp.web.Application` :param app: :class:`aiohttp.web.Application`
:param path: Path to your webhook. :param path: Path to your webhook.
:param route_name: Name of webhook handler route
:return: :return:
""" """
app.router.add_route('*', path, WebhookRequestHandler, name='webhook_handler') app.router.add_route('*', path, WebhookRequestHandler, name=route_name)
app[BOT_DISPATCHER_KEY] = dispatcher app[BOT_DISPATCHER_KEY] = dispatcher

View file

@ -1,5 +1,8 @@
import typing
from . import base from . import base
from . import fields from . import fields
from .inline_query_result import InlineQueryResult
from .location import Location from .location import Location
from .user import User from .user import User
@ -17,3 +20,47 @@ class InlineQuery(base.TelegramObject):
location: Location = fields.Field(base=Location) location: Location = fields.Field(base=Location)
query: base.String = fields.Field() query: base.String = fields.Field()
offset: base.String = fields.Field() offset: base.String = fields.Field()
async def answer(self,
results: typing.List[InlineQueryResult],
cache_time: typing.Union[base.Integer, None] = None,
is_personal: typing.Union[base.Boolean, None] = None,
next_offset: typing.Union[base.String, None] = None,
switch_pm_text: typing.Union[base.String, None] = None,
switch_pm_parameter: typing.Union[base.String, None] = None):
"""
Use this method to send answers to an inline query.
No more than 50 results per query are allowed.
Source: https://core.telegram.org/bots/api#answerinlinequery
:param results: A JSON-serialized array of results for the inline query
:type results: :obj:`typing.List[types.InlineQueryResult]`
:param cache_time: The maximum amount of time in seconds that the result of the
inline query may be cached on the server. Defaults to 300.
:type cache_time: :obj:`typing.Union[base.Integer, None]`
:param is_personal: Pass True, if results may be cached on the server side only
for the user that sent the query. By default, results may be returned to any user who sends the same query
:type is_personal: :obj:`typing.Union[base.Boolean, None]`
:param next_offset: Pass the offset that a client should send in the
next query with the same text to receive more results.
Pass an empty string if there are no more results or if you dont support pagination.
Offset length cant exceed 64 bytes.
:type next_offset: :obj:`typing.Union[base.String, None]`
:param switch_pm_text: If passed, clients will display a button with specified text that
switches the user to a private chat with the bot and sends the bot a start message
with the parameter switch_pm_parameter
:type switch_pm_text: :obj:`typing.Union[base.String, None]`
:param switch_pm_parameter: Deep-linking parameter for the /start message sent to the bot when
user presses the switch button. 1-64 characters, only A-Z, a-z, 0-9, _ and - are allowed.
:type switch_pm_parameter: :obj:`typing.Union[base.String, None]`
:return: On success, True is returned
:rtype: :obj:`base.Boolean`
"""
return await self.bot.answer_inline_query(self.id,
results=results,
cache_time=cache_time,
is_personal=is_personal,
next_offset=next_offset,
switch_pm_text=switch_pm_text,
switch_pm_parameter=switch_pm_parameter)

View file

@ -333,6 +333,65 @@ class Message(base.TelegramObject):
reply_to_message_id=self.message_id if reply else None, reply_to_message_id=self.message_id if reply else None,
reply_markup=reply_markup) reply_markup=reply_markup)
async def send_animation(self,
animation: typing.Union[base.InputFile, base.String],
duration: typing.Union[base.Integer, None] = None,
width: typing.Union[base.Integer, None] = None,
height: typing.Union[base.Integer, None] = None,
thumb: typing.Union[typing.Union[base.InputFile, base.String], None] = None,
caption: typing.Union[base.String, None] = None,
parse_mode: typing.Union[base.String, None] = None,
disable_notification: typing.Union[base.Boolean, None] = None,
reply_markup=None,
reply=True) -> Message:
"""
Use this method to send animation files (GIF or H.264/MPEG-4 AVC video without sound).
On success, the sent Message is returned.
Bots can currently send animation files of up to 50 MB in size, this limit may be changed in the future.
Source https://core.telegram.org/bots/api#sendanimation
:param animation: Animation to send. Pass a file_id as String to send an animation that exists
on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get an animation
from the Internet, or upload a new animation using multipart/form-data
:type animation: :obj:`typing.Union[base.InputFile, base.String]`
:param duration: Duration of sent animation in seconds
:type duration: :obj:`typing.Union[base.Integer, None]`
:param width: Animation width
:type width: :obj:`typing.Union[base.Integer, None]`
:param height: Animation height
:type height: :obj:`typing.Union[base.Integer, None]`
:param thumb: Thumbnail of the file sent. The thumbnail should be in JPEG format and less than 200 kB in size.
A thumbnails width and height should not exceed 90.
:type thumb: :obj:`typing.Union[typing.Union[base.InputFile, base.String], None]`
:param caption: Animation caption (may also be used when resending animation by file_id), 0-1024 characters
:type caption: :obj:`typing.Union[base.String, None]`
:param parse_mode: Send Markdown or HTML, if you want Telegram apps to show bold, italic,
fixed-width text or inline URLs in the media caption
:type parse_mode: :obj:`typing.Union[base.String, None]`
:param disable_notification: Sends the message silently. Users will receive a notification with no sound
:type disable_notification: :obj:`typing.Union[base.Boolean, None]`
:param reply_markup: Additional interface options. A JSON-serialized object for an inline keyboard,
custom reply keyboard, instructions to remove reply keyboard or to force a reply from the user
:type reply_markup: :obj:`typing.Union[typing.Union[types.InlineKeyboardMarkup, types.ReplyKeyboardMarkup,
types.ReplyKeyboardRemove, types.ForceReply], None]`
:param reply: fill 'reply_to_message_id'
:return: On success, the sent Message is returned
:rtype: :obj:`types.Message`
"""
return await self.bot.send_animation(self.chat.id, animation=animation,
duration=duration,
width=width,
height=height,
thumb=thumb,
caption=caption,
parse_mode=parse_mode,
disable_notification=disable_notification,
reply_to_message_id=self.message_id if reply else None,
reply_markup=reply_markup
)
async def reply_document(self, document: typing.Union[base.InputFile, base.String], async def reply_document(self, document: typing.Union[base.InputFile, base.String],
caption: typing.Union[base.String, None] = None, caption: typing.Union[base.String, None] = None,
disable_notification: typing.Union[base.Boolean, None] = None, disable_notification: typing.Union[base.Boolean, None] = None,

View file

@ -0,0 +1,136 @@
"""
Callback data factory
Usage:
Create instance of factory with prefix and element names:
>>> posts_query = CallbackData('post', 'post_id', 'action')
Then you can generate callback data:
>>> posts_query.new('32feff9b-92fa-48d9-9d29-621dc713743a', action='view')
<<< post:32feff9b-92fa-48d9-9d29-621dc713743a:view
Also you can generate filters:
>>> posts_query.filter(action='delete')
This filter can handle callback data by pattern: post:*:delete
"""
from __future__ import annotations
import typing
from aiogram import types
from aiogram.dispatcher.filters import Filter
class CallbackData:
"""
Callback data factory
"""
def __init__(self, prefix, *parts, sep=':'):
if not isinstance(prefix, str):
raise TypeError(f"Prefix must be instance of str not {type(prefix).__name__}")
elif not prefix:
raise ValueError('Prefix can\'t be empty')
elif len(sep) != 1:
raise ValueError(f"Length of sep should be equals to 1")
elif sep in prefix:
raise ValueError(f"Symbol '{sep}' can't be used in prefix")
elif not parts:
raise TypeError('Parts is not passed!')
self.prefix = prefix
self.sep = sep
self._part_names = parts
def new(self, *args, **kwargs) -> str:
"""
Generate callback data
:param args:
:param kwargs:
:return:
"""
args = list(args)
data = [self.prefix]
for part in self._part_names:
value = kwargs.pop(part, None)
if not value:
if args:
value = args.pop(0)
else:
raise ValueError(f"Value for '{part}' is not passed!")
if not isinstance(value, str):
raise TypeError(f"Value must be instance of str not {type(value).__name__}")
elif not value:
raise ValueError(f"Value for part {part} can't be empty!'")
elif self.sep in value:
raise ValueError(f"Symbol defined as separator can't be used in values of parts")
data.append(value)
if args or kwargs:
raise TypeError('Too many arguments is passed!')
callback_data = self.sep.join(data)
if len(callback_data) > 64:
raise ValueError('Resulted callback data is too long!')
return callback_data
def parse(self, callback_data: str) -> typing.Dict[str, str]:
"""
Parse data from the callback data
:param callback_data:
:return:
"""
prefix, *parts = callback_data.split(self.sep)
if prefix != self.prefix:
raise ValueError("Passed callback data can't be parsed with that prefix.")
elif len(parts) != len(self._part_names):
raise ValueError('Invalid parts count!')
result = {'@': prefix}
result.update(zip(self._part_names, parts))
return result
def filter(self, **config) -> CallbackDataFilter:
"""
Generate filter
:param config:
:return:
"""
for key in config.keys():
if key not in self._part_names:
raise ValueError(f"Invalid field name '{key}'")
return CallbackDataFilter(self, config)
class CallbackDataFilter(Filter):
def __init__(self, factory: CallbackData, config: typing.Dict[str, str]):
self.config = config
self.factory = factory
@classmethod
def validate(cls, full_config: typing.Dict[str, typing.Any]):
raise ValueError('That filter can\'t be used in filters factory!')
async def check(self, query: types.CallbackQuery):
try:
data = self.factory.parse(query.data)
except ValueError:
return False
else:
for key, value in self.config.items():
if isinstance(value, (list, tuple, set)):
if data.get(key) not in value:
return False
else:
if value != data.get(key):
return False
return {'callback_data': data}

View file

@ -16,6 +16,7 @@ TelegramAPIError
ObjectExpectedAsReplyMarkup ObjectExpectedAsReplyMarkup
InlineKeyboardExpected InlineKeyboardExpected
ChatNotFound ChatNotFound
ChatIdIsEmpty
ChatDescriptionIsNotModified ChatDescriptionIsNotModified
InvalidQueryID InvalidQueryID
InvalidPeerID InvalidPeerID
@ -40,6 +41,7 @@ TelegramAPIError
NotEnoughRightsToPinMessage NotEnoughRightsToPinMessage
CantDemoteChatCreator CantDemoteChatCreator
CantRestrictSelf CantRestrictSelf
NotEnoughRightsToRestrict
PhotoDimensions PhotoDimensions
UnavailableMembers UnavailableMembers
TypeOfFileMismatch TypeOfFileMismatch
@ -205,6 +207,10 @@ class ChatNotFound(BadRequest):
match = 'chat not found' match = 'chat not found'
class ChatIdIsEmpty(BadRequest):
match = 'chat_id is empty'
class ChatDescriptionIsNotModified(BadRequest): class ChatDescriptionIsNotModified(BadRequest):
match = 'chat description is not modified' match = 'chat description is not modified'
@ -284,6 +290,10 @@ class CantRestrictSelf(BadRequest):
text = "Admin can't restrict self." text = "Admin can't restrict self."
class NotEnoughRightsToRestrict(BadRequest):
match = 'not enough rights to restrict/unrestrict chat member'
class PhotoDimensions(BadRequest): class PhotoDimensions(BadRequest):
match = 'PHOTO_INVALID_DIMENSIONS' match = 'PHOTO_INVALID_DIMENSIONS'
text = 'Invalid photo dimensions' text = 'Invalid photo dimensions'

View file

@ -2,12 +2,15 @@ import asyncio
import datetime import datetime
import functools import functools
import secrets import secrets
from typing import Callable, Union, Optional, Any
from warnings import warn from warnings import warn
from aiohttp import web from aiohttp import web
from aiohttp.web_app import Application
from ..bot.api import log from ..bot.api import log
from ..dispatcher.webhook import BOT_DISPATCHER_KEY, WebhookRequestHandler from ..dispatcher.dispatcher import Dispatcher
from ..dispatcher.webhook import BOT_DISPATCHER_KEY, DEFAULT_ROUTE_NAME, WebhookRequestHandler
APP_EXECUTOR_KEY = 'APP_EXECUTOR' APP_EXECUTOR_KEY = 'APP_EXECUTOR'
@ -38,8 +41,37 @@ def start_polling(dispatcher, *, loop=None, skip_updates=False, reset_webhook=Tr
executor.start_polling(reset_webhook=reset_webhook, timeout=timeout, fast=fast) executor.start_polling(reset_webhook=reset_webhook, timeout=timeout, fast=fast)
def set_webhook(dispatcher: Dispatcher, webhook_path: str, *, loop: Optional[asyncio.AbstractEventLoop] = None,
skip_updates: bool = None, on_startup: Optional[Callable] = None,
on_shutdown: Optional[Callable] = None, check_ip: bool = False,
retry_after: Optional[Union[str, int]] = None, route_name: str = DEFAULT_ROUTE_NAME,
web_app: Optional[Application] = None):
"""
Set webhook for bot
:param dispatcher: Dispatcher
:param webhook_path: str
:param loop: Optional[asyncio.AbstractEventLoop] (default: None)
:param skip_updates: bool (default: None)
:param on_startup: Optional[Callable] (default: None)
:param on_shutdown: Optional[Callable] (default: None)
:param check_ip: bool (default: False)
:param retry_after: Optional[Union[str, int]] See https://tools.ietf.org/html/rfc7231#section-7.1.3 (default: None)
:param route_name: str (default: 'webhook_handler')
:param web_app: Optional[Application] (default: None)
:return:
"""
executor = Executor(dispatcher, skip_updates=skip_updates, check_ip=check_ip, retry_after=retry_after,
loop=loop)
_setup_callbacks(executor, on_startup, on_shutdown)
executor.set_webhook(webhook_path, route_name=route_name, web_app=web_app)
return executor
def start_webhook(dispatcher, webhook_path, *, loop=None, skip_updates=None, def start_webhook(dispatcher, webhook_path, *, loop=None, skip_updates=None,
on_startup=None, on_shutdown=None, check_ip=False, retry_after=None, **kwargs): on_startup=None, on_shutdown=None, check_ip=False, retry_after=None, route_name=DEFAULT_ROUTE_NAME,
**kwargs):
""" """
Start bot in webhook mode Start bot in webhook mode
@ -50,14 +82,20 @@ def start_webhook(dispatcher, webhook_path, *, loop=None, skip_updates=None,
:param on_startup: :param on_startup:
:param on_shutdown: :param on_shutdown:
:param check_ip: :param check_ip:
:param route_name:
:param kwargs: :param kwargs:
:return: :return:
""" """
executor = Executor(dispatcher, skip_updates=skip_updates, check_ip=check_ip, retry_after=retry_after, executor = set_webhook(dispatcher=dispatcher,
loop=loop) webhook_path=webhook_path,
_setup_callbacks(executor, on_startup, on_shutdown) loop=loop,
skip_updates=skip_updates,
executor.start_webhook(webhook_path, **kwargs) on_startup=on_startup,
on_shutdown=on_shutdown,
check_ip=check_ip,
retry_after=retry_after,
route_name=route_name)
executor.run_app(**kwargs)
def start(dispatcher, future, *, loop=None, skip_updates=None, def start(dispatcher, future, *, loop=None, skip_updates=None,
@ -181,15 +219,18 @@ class Executor:
# self.loop.set_task_factory(context.task_factory) # self.loop.set_task_factory(context.task_factory)
def _prepare_webhook(self, path=None, handler=WebhookRequestHandler): def _prepare_webhook(self, path=None, handler=WebhookRequestHandler, route_name=DEFAULT_ROUTE_NAME, app=None):
self._check_frozen() self._check_frozen()
self._freeze = True self._freeze = True
# self.loop.set_task_factory(context.task_factory) # self.loop.set_task_factory(context.task_factory)
app = self._web_app if app is not None:
if app is None: self._web_app = app
elif self._web_app is None:
self._web_app = app = web.Application() self._web_app = app = web.Application()
else:
raise RuntimeError("web.Application() is already configured!")
if self.retry_after: if self.retry_after:
app['RETRY_AFTER'] = self.retry_after app['RETRY_AFTER'] = self.retry_after
@ -199,7 +240,7 @@ class Executor:
return return
if path is not None: if path is not None:
app.router.add_route('*', path, handler, name='webhook_handler') app.router.add_route('*', path, handler, name=route_name)
async def _wrap_callback(cb, _): async def _wrap_callback(cb, _):
return await cb(self.dispatcher) return await cb(self.dispatcher)
@ -219,19 +260,36 @@ class Executor:
app[self._identity] = datetime.datetime.now() app[self._identity] = datetime.datetime.now()
app['_check_ip'] = self.check_ip app['_check_ip'] = self.check_ip
def start_webhook(self, webhook_path=None, request_handler=WebhookRequestHandler, **kwargs): def set_webhook(self, webhook_path: Optional[str] = None, request_handler: Any = WebhookRequestHandler,
route_name: str = DEFAULT_ROUTE_NAME, web_app: Optional[Application] = None):
"""
Set webhook for bot
:param webhook_path: Optional[str] (default: None)
:param request_handler: Any (default: WebhookRequestHandler)
:param route_name: str Name of webhook handler route (default: 'webhook_handler')
:param web_app: Optional[Application] (default: None)
:return:
"""
self._prepare_webhook(webhook_path, request_handler, route_name, web_app)
self.loop.run_until_complete(self._startup_webhook())
def run_app(self, **kwargs):
web.run_app(self._web_app, **kwargs)
def start_webhook(self, webhook_path=None, request_handler=WebhookRequestHandler, route_name=DEFAULT_ROUTE_NAME,
**kwargs):
""" """
Start bot in webhook mode Start bot in webhook mode
:param webhook_path: :param webhook_path:
:param request_handler: :param request_handler:
:param route_name: Name of webhook handler route
:param kwargs: :param kwargs:
:return: :return:
""" """
self._prepare_webhook(webhook_path, request_handler) self.set_webhook(webhook_path=webhook_path, request_handler=request_handler, route_name=route_name)
self.loop.run_until_complete(self._startup_webhook()) self.run_app(**kwargs)
web.run_app(self._web_app, **kwargs)
def start_polling(self, reset_webhook=None, timeout=None, fast=True): def start_polling(self, reset_webhook=None, timeout=None, fast=True):
""" """
@ -280,10 +338,8 @@ class Executor:
async def _skip_updates(self): async def _skip_updates(self):
await self.dispatcher.reset_webhook(True) await self.dispatcher.reset_webhook(True)
count = await self.dispatcher.skip_updates() await self.dispatcher.skip_updates()
if count: log.warning(f"Updates are skipped successfully.")
log.warning(f"Skipped {count} updates.")
return count
async def _welcome(self): async def _welcome(self):
user = await self.dispatcher.bot.me user = await self.dispatcher.bot.me

View file

@ -31,7 +31,7 @@ Welcome to aiogram's documentation!
:alt: MIT License :alt: MIT License
**aiogram** is a pretty simple and fully asynchronous library for `Telegram Bot API <https://core.telegram.org/bots/api>`_ written in Python 3.6 with `asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://github.com/aio-libs/aiohttp>`_. It helps you to make your bots faster and simpler. **aiogram** is a pretty simple and fully asynchronous library for `Telegram Bot API <https://core.telegram.org/bots/api>`_ written in Python 3.7 with `asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://github.com/aio-libs/aiohttp>`_. It helps you to make your bots faster and simpler.
Official aiogram resources Official aiogram resources

View file

@ -0,0 +1,117 @@
import asyncio
import logging
import random
import uuid
from aiogram import Bot, Dispatcher, executor, md, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.utils.callback_data import CallbackData
from aiogram.utils.exceptions import MessageNotModified, Throttled
logging.basicConfig(level=logging.INFO)
API_TOKEN = 'BOT TOKEN HERE'
loop = asyncio.get_event_loop()
bot = Bot(token=API_TOKEN, loop=loop, parse_mode=types.ParseMode.HTML)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
dp.middleware.setup(LoggingMiddleware())
POSTS = {
str(uuid.uuid4()): {
'title': f"Post {index}",
'body': 'Lorem ipsum dolor sit amet, '
'consectetur adipiscing elit, '
'sed do eiusmod tempor incididunt ut '
'labore et dolore magna aliqua',
'votes': random.randint(-2, 5)
} for index in range(1, 6)
}
posts_cb = CallbackData('post', 'id', 'action') # post:<id>:<action>
def get_keyboard() -> types.InlineKeyboardMarkup:
"""
Generate keyboard with list of posts
"""
markup = types.InlineKeyboardMarkup()
for post_id, post in POSTS.items():
markup.add(
types.InlineKeyboardButton(
post['title'],
callback_data=posts_cb.new(id=post_id, action='view'))
)
return markup
def format_post(post_id: str, post: dict) -> (str, types.InlineKeyboardMarkup):
text = f"{md.hbold(post['title'])}\n" \
f"{md.quote_html(post['body'])}\n" \
f"\n" \
f"Votes: {post['votes']}"
markup = types.InlineKeyboardMarkup()
markup.row(
types.InlineKeyboardButton('👍', callback_data=posts_cb.new(id=post_id, action='like')),
types.InlineKeyboardButton('👎', callback_data=posts_cb.new(id=post_id, action='unlike')),
)
markup.add(types.InlineKeyboardButton('<< Back', callback_data=posts_cb.new(id='-', action='list')))
return text, markup
@dp.message_handler(commands='start')
async def cmd_start(message: types.Message):
await message.reply('Posts', reply_markup=get_keyboard())
@dp.callback_query_handler(posts_cb.filter(action='list'))
async def query_show_list(query: types.CallbackQuery):
await query.message.edit_text('Posts', reply_markup=get_keyboard())
@dp.callback_query_handler(posts_cb.filter(action='view'))
async def query_view(query: types.CallbackQuery, callback_data: dict):
post_id = callback_data['id']
post = POSTS.get(post_id, None)
if not post:
return await query.answer('Unknown post!')
text, markup = format_post(post_id, post)
await query.message.edit_text(text, reply_markup=markup)
@dp.callback_query_handler(posts_cb.filter(action=['like', 'unlike']))
async def query_post_vote(query: types.CallbackQuery, callback_data: dict):
try:
await dp.throttle('vote', rate=1)
except Throttled:
return await query.answer('Too many requests.')
post_id = callback_data['id']
action = callback_data['action']
post = POSTS.get(post_id, None)
if not post:
return await query.answer('Unknown post!')
if action == 'like':
post['votes'] += 1
elif action == 'unlike':
post['votes'] -= 1
await query.answer('Voted.')
text, markup = format_post(post_id, post)
await query.message.edit_text(text, reply_markup=markup)
@dp.errors_handler(exception=MessageNotModified)
async def message_not_modified_handler(update, error):
return True
if __name__ == '__main__':
executor.start_polling(dp, loop=loop, skip_updates=True)

View file

@ -46,7 +46,7 @@ class ThrottlingMiddleware(BaseMiddleware):
self.prefix = key_prefix self.prefix = key_prefix
super(ThrottlingMiddleware, self).__init__() super(ThrottlingMiddleware, self).__init__()
async def on_process_message(self, message: types.Message): async def on_process_message(self, message: types.Message, data: dict):
""" """
This handler is called when dispatcher receives a message This handler is called when dispatcher receives a message

View file

@ -79,5 +79,7 @@ setup(
'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.7',
'Topic :: Software Development :: Libraries :: Application Frameworks', 'Topic :: Software Development :: Libraries :: Application Frameworks',
], ],
install_requires=get_requirements() install_requires=get_requirements(),
package_data={'': ['requirements.txt']},
include_package_data=False,
) )