Clean obsolete code and change configs

This commit is contained in:
jrootjunior 2019-11-15 12:44:24 +02:00
parent d09b72ce6e
commit 65815e509f
58 changed files with 78 additions and 3959 deletions

2
.gitignore vendored
View file

@ -10,3 +10,5 @@ dist/
*.egg *.egg
.mypy_cache .mypy_cache
.pytest_cache
.coverage

View file

@ -7,14 +7,20 @@ help:
@echo "=======================================================================================" @echo "======================================================================================="
@echo " aiogram build tools " @echo " aiogram build tools "
@echo "=======================================================================================" @echo "======================================================================================="
@echo "Commands list:" @echo "Environment:"
@echo " install: Install development dependencies" @echo " install: Install development dependencies"
@echo ""
@echo "Code quality:"
@echo " isort: Run isort tool" @echo " isort: Run isort tool"
@echo " black: Run black tool" @echo " black: Run black tool"
@echo " flake8: Run flake8 tool" @echo " flake8: Run flake8 tool"
@echo " mypy: Run mypy tool" @echo " mypy: Run mypy tool"
@echo " lint: Run isort, black, flake8 and mypy tools" @echo " lint: Run isort, black, flake8 and mypy tools"
@echo "" @echo ""
@echo "Tests:"
@echo " test: Run tests"
@echo ""
@echo ""
@echo "" @echo ""
.PHONY: install .PHONY: install
@ -40,3 +46,8 @@ mypy:
.PHONY: lint .PHONY: lint
lint: isort black flake8 mypy lint: isort black flake8 mypy
.PHONY: test
test:
poetry run pytest --cov=aiogram --cov-config .coveragerc tests/ -sq

11
README.md Normal file
View file

@ -0,0 +1,11 @@
# aiogram 3.0 [WIP]
[![MIT License](https://img.shields.io/pypi/l/aiogram.svg?style=flat-square)](https://opensource.org/licenses/MIT)
[![Supported python versions](https://img.shields.io/pypi/pyversions/aiogram.svg?style=flat-square)](https://pypi.python.org/pypi/aiogram)
[![Telegram Bot API](https://img.shields.io/badge/Telegram%20Bot%20API-4.4-blue.svg?style=flat-square&logo=telegram)](https://core.telegram.org/bots/api)
[![PyPi Package Version](https://img.shields.io/pypi/v/aiogram.svg?style=flat-square)](https://pypi.python.org/pypi/aiogram)
[![PyPi status](https://img.shields.io/pypi/status/aiogram.svg?style=flat-square)](https://pypi.python.org/pypi/aiogram)
[![Downloads](https://img.shields.io/pypi/dm/aiogram.svg?style=flat-square)](https://pypi.python.org/pypi/aiogram)
[![\[Telegram\] aiogram live](https://img.shields.io/badge/telegram-aiogram-blue.svg?style=flat-square)](https://t.me/aiogram_live)
**aiogram** modern and fully asynchronous framework for [Telegram Bot API](https://core.telegram.org/bots/api) written in Python 3.7 with [asyncio](https://docs.python.org/3/library/asyncio.html) and [aiohttp](https://github.com/aio-libs/aiohttp). It helps you to make your bots faster and simpler.

View file

@ -1,55 +0,0 @@
AIOGramBot
==========
.. image:: https://img.shields.io/badge/telegram-aiogram-blue.svg?style=flat-square
:target: https://t.me/aiogram_live
:alt: [Telegram] aiogram live
.. image:: https://img.shields.io/pypi/v/aiogram.svg?style=flat-square
:target: https://pypi.python.org/pypi/aiogram
:alt: PyPi Package Version
.. image:: https://img.shields.io/pypi/status/aiogram.svg?style=flat-square
:target: https://pypi.python.org/pypi/aiogram
:alt: PyPi status
.. image:: https://img.shields.io/pypi/dm/aiogram.svg?style=flat-square
:target: https://pypi.python.org/pypi/aiogram
:alt: PyPi downloads
.. image:: https://img.shields.io/pypi/pyversions/aiogram.svg?style=flat-square
:target: https://pypi.python.org/pypi/aiogram
:alt: Supported python versions
.. image:: https://img.shields.io/badge/Telegram%20Bot%20API-4.4-blue.svg?style=flat-square&logo=telegram
:target: https://core.telegram.org/bots/api
:alt: Telegram Bot API
.. image:: https://img.shields.io/readthedocs/aiogram?style=flat-square
:target: http://aiogram.readthedocs.io/en/latest/?badge=latest
:alt: Documentation Status
.. image:: https://img.shields.io/github/issues/aiogram/aiogram.svg?style=flat-square
:target: https://github.com/aiogram/aiogram/issues
:alt: Github issues
.. image:: https://img.shields.io/pypi/l/aiogram.svg?style=flat-square
:target: https://opensource.org/licenses/MIT
:alt: MIT License
**aiogram** is a pretty simple and fully asynchronous framework for `Telegram Bot API <https://core.telegram.org/bots/api>`_ written in Python 3.7 with `asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://github.com/aio-libs/aiohttp>`_. It helps you to make your bots faster and simpler.
You can `read the docs here <http://aiogram.readthedocs.io/en/latest/>`_.
Official aiogram resources
--------------------------
- News: `@aiogram_live <https://t.me/aiogram_live>`_
- Community: `@aiogram <https://t.me/aiogram>`_
- Russian community: `@aiogram_ru <https://t.me/aiogram_ru>`_
- Pip: `aiogram <https://pypi.python.org/pypi/aiogram>`_
- Docs: `ReadTheDocs <http://aiogram.readthedocs.io>`_
- Source: `Github repo <https://github.com/aiogram/aiogram>`_
- Issues/Bug tracker: `Github issues tracker <https://github.com/aiogram/aiogram/issues>`_
- Test bot: `@aiogram_bot <https://t.me/aiogram_bot>`_

View file

@ -1,11 +1,10 @@
from __future__ import annotations from __future__ import annotations
import abc import abc
import io
import secrets import secrets
from typing import TYPE_CHECKING, Any, Dict, Generic, Optional, Type, TypeVar, Union from typing import TYPE_CHECKING, Any, Dict, Generic, Optional, Type, TypeVar
from pydantic import BaseConfig, BaseModel, Extra from pydantic import BaseConfig, BaseModel
from pydantic.generics import GenericModel from pydantic.generics import GenericModel
from ..types import InputFile, ResponseParameters from ..types import InputFile, ResponseParameters

View file

@ -1,3 +1,4 @@
import secrets
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
from ..types import InputFile, InputMediaPhoto, InputMediaVideo, Message from ..types import InputFile, InputMediaPhoto, InputMediaVideo, Message

View file

@ -1,33 +0,0 @@
import logging
from aiogram import Bot, Dispatcher, types, executor
API_TOKEN = 'API_TOKEN_HERE'
logging.basicConfig(level=logging.DEBUG)
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot=bot)
# checks specified chat
@dp.message_handler(is_chat_admin=-1001241113577)
async def handle_specified(msg: types.Message):
await msg.answer("You are an admin of the specified chat!")
# checks multiple chats
@dp.message_handler(is_chat_admin=[-1001241113577, -320463906])
async def handle_multiple(msg: types.Message):
await msg.answer("You are an admin of multiple chats!")
# checks current chat
@dp.message_handler(is_chat_admin=True)
async def handler3(msg: types.Message):
await msg.answer("You are an admin of the current chat!")
if __name__ == '__main__':
executor.start_polling(dp)

View file

@ -1,142 +0,0 @@
#!/usr/bin/env python3
"""
**This example is outdated**
In this example used ArgumentParser for configuring Your bot.
Provided to start bot with webhook:
python advanced_executor_example.py \
--token TOKEN_HERE \
--host 0.0.0.0 \
--port 8084 \
--host-name example.com \
--webhook-port 443
Or long polling:
python advanced_executor_example.py --token TOKEN_HERE
So... In this example found small trouble:
can't get bot instance in handlers.
If you want to automatic change getting updates method use executor utils (from aiogram.utils.executor)
"""
# TODO: Move token to environment variables.
import argparse
import logging
import ssl
import sys
from aiogram import Bot
from aiogram.dispatcher import Dispatcher
from aiogram.dispatcher.webhook import *
from aiogram.utils.executor import start_polling, start_webhook
logging.basicConfig(level=logging.INFO)
# Configure arguments parser.
parser = argparse.ArgumentParser(description="Python telegram bot")
parser.add_argument(
"--token", "-t", nargs="?", type=str, default=None, help="Set working directory"
)
parser.add_argument("--sock", help="UNIX Socket path")
parser.add_argument("--host", help="Webserver host")
parser.add_argument("--port", type=int, help="Webserver port")
parser.add_argument("--cert", help="Path to SSL certificate")
parser.add_argument("--pkey", help="Path to SSL private key")
parser.add_argument("--host-name", help="Set webhook host name")
parser.add_argument("--webhook-port", type=int, help="Port for webhook (default=port)")
parser.add_argument("--webhook-path", default="/webhook", help="Port for webhook (default=port)")
async def cmd_start(message: types.Message):
return SendMessage(message.chat.id, f"Hello, {message.from_user.full_name}!")
def setup_handlers(dispatcher: Dispatcher):
# This example has only one messages handler
dispatcher.register_message_handler(cmd_start, commands=["start", "welcome"])
async def on_startup(dispatcher, url=None, cert=None):
setup_handlers(dispatcher)
bot = dispatcher.bot
# Get current webhook status
webhook = await bot.get_webhook_info()
if url:
# If URL is bad
if webhook.url != url:
# If URL doesnt match with by current remove webhook
if not webhook.url:
await bot.delete_webhook()
# Set new URL for webhook
if cert:
with open(cert, "rb") as cert_file:
await bot.set_webhook(url, certificate=cert_file)
else:
await bot.set_webhook(url)
elif webhook.url:
# Otherwise remove webhook.
await bot.delete_webhook()
async def on_shutdown(dispatcher):
print("Shutdown.")
def main(arguments):
args = parser.parse_args(arguments)
token = args.token
sock = args.sock
host = args.host
port = args.port
cert = args.cert
pkey = args.pkey
host_name = args.host_name or host
webhook_port = args.webhook_port or port
webhook_path = args.webhook_path
# Fi webhook path
if not webhook_path.startswith("/"):
webhook_path = "/" + webhook_path
# Generate webhook URL
webhook_url = f"https://{host_name}:{webhook_port}{webhook_path}"
# Create bot & dispatcher instances.
bot = Bot(token)
dispatcher = Dispatcher(bot)
if (sock or host) and host_name:
if cert and pkey:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(cert, pkey)
else:
ssl_context = None
start_webhook(
dispatcher,
webhook_path,
on_startup=functools.partial(on_startup, url=webhook_url, cert=cert),
on_shutdown=on_shutdown,
host=host,
port=port,
path=sock,
ssl_context=ssl_context,
)
else:
start_polling(dispatcher, on_startup=on_startup, on_shutdown=on_shutdown)
if __name__ == "__main__":
argv = sys.argv[1:]
if not len(argv):
parser.print_help()
sys.exit(1)
main(argv)

View file

@ -1,74 +0,0 @@
import asyncio
import logging
from aiogram import Bot, Dispatcher, types
from aiogram.utils import exceptions, executor
API_TOKEN = "BOT TOKEN HERE"
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("broadcast")
bot = Bot(token=API_TOKEN, parse_mode=types.ParseMode.HTML)
dp = Dispatcher(bot)
def get_users():
"""
Return users list
In this example returns some random ID's
"""
yield from (61043901, 78238238, 78378343, 98765431, 12345678)
async def send_message(user_id: int, text: str, disable_notification: bool = False) -> bool:
"""
Safe messages sender
:param user_id:
:param text:
:param disable_notification:
:return:
"""
try:
await bot.send_message(user_id, text, disable_notification=disable_notification)
except exceptions.BotBlocked:
log.error(f"Target [ID:{user_id}]: blocked by user")
except exceptions.ChatNotFound:
log.error(f"Target [ID:{user_id}]: invalid user ID")
except exceptions.RetryAfter as e:
log.error(f"Target [ID:{user_id}]: Flood limit is exceeded. Sleep {e.timeout} seconds.")
await asyncio.sleep(e.timeout)
return await send_message(user_id, text) # Recursive call
except exceptions.UserDeactivated:
log.error(f"Target [ID:{user_id}]: user is deactivated")
except exceptions.TelegramAPIError:
log.exception(f"Target [ID:{user_id}]: failed")
else:
log.info(f"Target [ID:{user_id}]: success")
return True
return False
async def broadcaster() -> int:
"""
Simple broadcaster
:return: Count of messages
"""
count = 0
try:
for user_id in get_users():
if await send_message(user_id, "<b>Hello!</b>"):
count += 1
await asyncio.sleep(0.05) # 20 messages per second (Limit: 30 messages per second)
finally:
log.info(f"{count} messages successful sent.")
return count
if __name__ == "__main__":
# Execute broadcaster
executor.start(dp, broadcaster())

View file

@ -1,119 +0,0 @@
import logging
import random
import uuid
from aiogram import Bot, Dispatcher, executor, md, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.utils.callback_data import CallbackData
from aiogram.utils.exceptions import MessageNotModified, Throttled
logging.basicConfig(level=logging.INFO)
API_TOKEN = 'BOT TOKEN HERE'
bot = Bot(token=API_TOKEN, parse_mode=types.ParseMode.HTML)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
dp.middleware.setup(LoggingMiddleware())
POSTS = {
str(uuid.uuid4()): {
'title': f'Post {index}',
'body': 'Lorem ipsum dolor sit amet, '
'consectetur adipiscing elit, '
'sed do eiusmod tempor incididunt ut '
'labore et dolore magna aliqua',
'votes': random.randint(-2, 5),
} for index in range(1, 6)
}
posts_cb = CallbackData('post', 'id', 'action') # post:<id>:<action>
def get_keyboard() -> types.InlineKeyboardMarkup:
"""
Generate keyboard with list of posts
"""
markup = types.InlineKeyboardMarkup()
for post_id, post in POSTS.items():
markup.add(
types.InlineKeyboardButton(
post['title'],
callback_data=posts_cb.new(id=post_id, action='view')),
)
return markup
def format_post(post_id: str, post: dict) -> (str, types.InlineKeyboardMarkup):
text = md.text(
md.hbold(post['title']),
md.quote_html(post['body']),
'', # just new empty line
f"Votes: {post['votes']}",
sep = '\n',
)
markup = types.InlineKeyboardMarkup()
markup.row(
types.InlineKeyboardButton('👍', callback_data=posts_cb.new(id=post_id, action='like')),
types.InlineKeyboardButton('👎', callback_data=posts_cb.new(id=post_id, action='dislike')),
)
markup.add(types.InlineKeyboardButton('<< Back', callback_data=posts_cb.new(id='-', action='list')))
return text, markup
@dp.message_handler(commands='start')
async def cmd_start(message: types.Message):
await message.reply('Posts', reply_markup=get_keyboard())
@dp.callback_query_handler(posts_cb.filter(action='list'))
async def query_show_list(query: types.CallbackQuery):
await query.message.edit_text('Posts', reply_markup=get_keyboard())
@dp.callback_query_handler(posts_cb.filter(action='view'))
async def query_view(query: types.CallbackQuery, callback_data: dict):
post_id = callback_data['id']
post = POSTS.get(post_id, None)
if not post:
return await query.answer('Unknown post!')
text, markup = format_post(post_id, post)
await query.message.edit_text(text, reply_markup=markup)
@dp.callback_query_handler(posts_cb.filter(action=['like', 'dislike']))
async def query_post_vote(query: types.CallbackQuery, callback_data: dict):
try:
await dp.throttle('vote', rate=1)
except Throttled:
return await query.answer('Too many requests.')
post_id = callback_data['id']
action = callback_data['action']
post = POSTS.get(post_id, None)
if not post:
return await query.answer('Unknown post!')
if action == 'like':
post['votes'] += 1
elif action == 'dislike':
post['votes'] -= 1
await query.answer('Vote accepted')
text, markup = format_post(post_id, post)
await query.message.edit_text(text, reply_markup=markup)
@dp.errors_handler(exception=MessageNotModified)
async def message_not_modified_handler(update, error):
return True
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,68 +0,0 @@
"""
This is a simple example of usage of CallbackData factory
For more comprehensive example see callback_data_factory.py
"""
import logging
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.utils.callback_data import CallbackData
from aiogram.utils.exceptions import MessageNotModified
logging.basicConfig(level=logging.INFO)
API_TOKEN = 'BOT_TOKEN_HERE'
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
dp.middleware.setup(LoggingMiddleware())
vote_cb = CallbackData('vote', 'action') # vote:<action>
likes = {} # user_id: amount_of_likes
def get_keyboard():
return types.InlineKeyboardMarkup().row(
types.InlineKeyboardButton('👍', callback_data=vote_cb.new(action='up')),
types.InlineKeyboardButton('👎', callback_data=vote_cb.new(action='down')),
)
@dp.message_handler(commands=['start'])
async def cmd_start(message: types.Message):
amount_of_likes = likes.get(message.from_user.id, 0) # get value if key exists else set to 0
await message.reply(f'Vote! You have {amount_of_likes} votes now.', reply_markup=get_keyboard())
@dp.callback_query_handler(vote_cb.filter(action=['up', 'down']))
async def callback_vote_action(query: types.CallbackQuery, callback_data: dict):
logging.info('Got this callback data: %r', callback_data) # callback_data contains all info from callback data
await query.answer() # don't forget to answer callback query as soon as possible
callback_data_action = callback_data['action']
likes_count = likes.get(query.from_user.id, 0)
if callback_data_action == 'up':
likes_count += 1
else:
likes_count -= 1
likes[query.from_user.id] = likes_count # update amount of likes in storage
await bot.edit_message_text(
f'You voted {callback_data_action}! Now you have {likes_count} vote[s].',
query.from_user.id,
query.message.message_id,
reply_markup=get_keyboard(),
)
@dp.errors_handler(exception=MessageNotModified) # handle the cases when this exception raises
async def message_not_modified_handler(update, error):
return True
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,33 +0,0 @@
"""
Babel is required.
"""
import logging
from aiogram import Bot, Dispatcher, executor, md, types
API_TOKEN = 'BOT TOKEN HERE'
logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN, parse_mode=types.ParseMode.MARKDOWN)
dp = Dispatcher(bot)
@dp.message_handler()
async def check_language(message: types.Message):
locale = message.from_user.locale
await message.reply(md.text(
md.bold('Info about your language:'),
md.text('🔸', md.bold('Code:'), md.code(locale.language)),
md.text('🔸', md.bold('Territory:'), md.code(locale.territory or 'Unknown')),
md.text('🔸', md.bold('Language name:'), md.code(locale.language_name)),
md.text('🔸', md.bold('English language name:'), md.code(locale.english_name)),
sep='\n',
))
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,53 +0,0 @@
"""
This is a echo bot.
It echoes any incoming text messages.
"""
import logging
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = 'BOT TOKEN HERE'
# Configure logging
logging.basicConfig(level=logging.INFO)
# Initialize bot and dispatcher
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
@dp.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
"""
This handler will be called when user sends `/start` or `/help` command
"""
await message.reply("Hi!\nI'm EchoBot!\nPowered by aiogram.")
@dp.message_handler(regexp='(^cat[s]?$|puss)')
async def cats(message: types.Message):
with open('data/cats.jpg', 'rb') as photo:
'''
# Old fashioned way:
await bot.send_photo(
message.chat.id,
photo,
caption='Cats are here 😺',
reply_to_message_id=message.message_id,
)
'''
await message.reply_photo(photo, caption='Cats are here 😺')
@dp.message_handler()
async def echo(message: types.Message):
# old style:
# await bot.send_message(message.chat.id, message.text)
await message.reply(message.text, reply=False)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,129 +0,0 @@
import logging
import aiogram.utils.markdown as md
from aiogram import Bot, Dispatcher, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters import Text
from aiogram.dispatcher.filters.state import State, StatesGroup
from aiogram.types import ParseMode
from aiogram.utils import executor
logging.basicConfig(level=logging.INFO)
API_TOKEN = 'BOT TOKEN HERE'
bot = Bot(token=API_TOKEN)
# For example use simple MemoryStorage for Dispatcher.
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
# States
class Form(StatesGroup):
name = State() # Will be represented in storage as 'Form:name'
age = State() # Will be represented in storage as 'Form:age'
gender = State() # Will be represented in storage as 'Form:gender'
@dp.message_handler(commands='start')
async def cmd_start(message: types.Message):
"""
Conversation's entry point
"""
# Set state
await Form.name.set()
await message.reply("Hi there! What's your name?")
# You can use state '*' if you need to handle all states
@dp.message_handler(state='*', commands='cancel')
@dp.message_handler(Text(equals='cancel', ignore_case=True), state='*')
async def cancel_handler(message: types.Message, state: FSMContext):
"""
Allow user to cancel any action
"""
current_state = await state.get_state()
if current_state is None:
return
logging.info('Cancelling state %r', current_state)
# Cancel state and inform user about it
await state.finish()
# And remove keyboard (just in case)
await message.reply('Cancelled.', reply_markup=types.ReplyKeyboardRemove())
@dp.message_handler(state=Form.name)
async def process_name(message: types.Message, state: FSMContext):
"""
Process user name
"""
async with state.proxy() as data:
data['name'] = message.text
await Form.next()
await message.reply("How old are you?")
# Check age. Age gotta be digit
@dp.message_handler(lambda message: not message.text.isdigit(), state=Form.age)
async def process_age_invalid(message: types.Message):
"""
If age is invalid
"""
return await message.reply("Age gotta be a number.\nHow old are you? (digits only)")
@dp.message_handler(lambda message: message.text.isdigit(), state=Form.age)
async def process_age(message: types.Message, state: FSMContext):
# Update state and data
await Form.next()
await state.update_data(age=int(message.text))
# Configure ReplyKeyboardMarkup
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, selective=True)
markup.add("Male", "Female")
markup.add("Other")
await message.reply("What is your gender?", reply_markup=markup)
@dp.message_handler(lambda message: message.text not in ["Male", "Female", "Other"], state=Form.gender)
async def process_gender_invalid(message: types.Message):
"""
In this example gender has to be one of: Male, Female, Other.
"""
return await message.reply("Bad gender name. Choose your gender from the keyboard.")
@dp.message_handler(state=Form.gender)
async def process_gender(message: types.Message, state: FSMContext):
async with state.proxy() as data:
data['gender'] = message.text
# Remove keyboard
markup = types.ReplyKeyboardRemove()
# And send message
await bot.send_message(
message.chat.id,
md.text(
md.text('Hi! Nice to meet you,', md.bold(data['name'])),
md.text('Age:', md.code(data['age'])),
md.text('Gender:', data['gender']),
sep='\n',
),
reply_markup=markup,
parse_mode=ParseMode.MARKDOWN,
)
# Finish conversation
await state.finish()
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,98 +0,0 @@
"""
Internalize your bot
Step 1: extract texts
# pybabel extract i18n_example.py -o locales/mybot.pot
Some useful options:
- Extract texts with pluralization support
# -k __:1,2
- Add comments for translators, you can use another tag if you want (TR)
# --add-comments=NOTE
- Disable comments with string location in code
# --no-location
- Set project name
# --project=MySuperBot
- Set version
# --version=2.2
Step 2: create *.po files. For e.g. create en, ru, uk locales.
# echo {en,ru,uk} | xargs -n1 pybabel init -i locales/mybot.pot -d locales -D mybot -l
Step 3: translate texts
Step 4: compile translations
# pybabel compile -d locales -D mybot
Step 5: When you change the code of your bot you need to update po & mo files.
Step 5.1: regenerate pot file:
command from step 1
Step 5.2: update po files
# pybabel update -d locales -D mybot -i locales/mybot.pot
Step 5.3: update your translations
Step 5.4: compile mo files
command from step 4
"""
from pathlib import Path
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.middlewares.i18n import I18nMiddleware
TOKEN = 'BOT_TOKEN_HERE'
I18N_DOMAIN = 'mybot'
BASE_DIR = Path(__file__).parent
LOCALES_DIR = BASE_DIR / 'locales'
bot = Bot(TOKEN, parse_mode=types.ParseMode.HTML)
dp = Dispatcher(bot)
# Setup i18n middleware
i18n = I18nMiddleware(I18N_DOMAIN, LOCALES_DIR)
dp.middleware.setup(i18n)
# Alias for gettext method
_ = i18n.gettext
@dp.message_handler(commands='start')
async def cmd_start(message: types.Message):
# Simply use `_('message')` instead of `'message'` and never use f-strings for translatable texts.
await message.reply(_('Hello, <b>{user}</b>!').format(user=message.from_user.full_name))
@dp.message_handler(commands='lang')
async def cmd_lang(message: types.Message, locale):
# For setting custom lang you have to modify i18n middleware, like this:
# https://github.com/aiogram/EventsTrackerBot/blob/master/modules/base/middlewares.py
await message.reply(_('Your current language: <i>{language}</i>').format(language=locale))
# If you care about pluralization, here's small handler
# And also, there's and example of comments for translators. Most translation tools support them.
# Alias for gettext method, parser will understand double underscore as plural (aka ngettext)
__ = i18n.gettext
# some likes manager
LIKES_STORAGE = {'count': 0}
def get_likes() -> int:
return LIKES_STORAGE['count']
def increase_likes() -> int:
LIKES_STORAGE['count'] += 1
return get_likes()
#
@dp.message_handler(commands='like')
async def cmd_like(message: types.Message, locale):
likes = increase_likes()
# NOTE: This is comment for a translator
await message.reply(__('Aiogram has {number} like!', 'Aiogram has {number} likes!', likes).format(number=likes))
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,36 +0,0 @@
from aiogram import Bot, Dispatcher, executor, types
from aiogram.dispatcher.handler import SkipHandler
API_TOKEN = 'BOT_TOKEN_HERE'
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
user_id_required = None # TODO: Set id here
chat_id_required = user_id_required # Change for use in groups (user_id == chat_id in pm)
@dp.message_handler(user_id=user_id_required)
async def handler1(msg: types.Message):
await bot.send_message(msg.chat.id, "Hello, checking with user_id=")
raise SkipHandler # just for demo
@dp.message_handler(chat_id=chat_id_required)
async def handler2(msg: types.Message):
await bot.send_message(msg.chat.id, "Hello, checking with chat_id=")
raise SkipHandler # just for demo
@dp.message_handler(user_id=user_id_required, chat_id=chat_id_required)
async def handler3(msg: types.Message):
await msg.reply("Hello from user= & chat_id=", reply=False)
@dp.message_handler(user_id=[user_id_required, 42]) # TODO: You can add any number of ids here
async def handler4(msg: types.Message):
await msg.reply("Checked user_id with list!", reply=False)
if __name__ == '__main__':
executor.start_polling(dp)

View file

@ -1,37 +0,0 @@
import hashlib
import logging
from aiogram import Bot, Dispatcher, executor
from aiogram.types import InlineQuery, \
InputTextMessageContent, InlineQueryResultArticle
API_TOKEN = 'BOT_TOKEN_HERE'
logging.basicConfig(level=logging.DEBUG)
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
@dp.inline_handler()
async def inline_echo(inline_query: InlineQuery):
# id affects both preview and content,
# so it has to be unique for each result
# (Unique identifier for this result, 1-64 Bytes)
# you can set your unique id's
# but for example i'll generate it based on text because I know, that
# only text will be passed in this example
text = inline_query.query or 'echo'
input_content = InputTextMessageContent(text)
result_id: str = hashlib.md5(text.encode()).hexdigest()
item = InlineQueryResultArticle(
id=result_id,
title=f'Result {text!r}',
input_message_content=input_content,
)
# don't forget to set cache_time=1 for testing (default is 300s or 5m)
await bot.answer_inline_query(inline_query.id, results=[item], cache_time=1)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,62 +0,0 @@
"""
This bot is created for the demonstration of a usage of inline keyboards.
"""
import logging
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = 'BOT_TOKEN_HERE'
# Configure logging
logging.basicConfig(level=logging.INFO)
# Initialize bot and dispatcher
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
@dp.message_handler(commands='start')
async def start_cmd_handler(message: types.Message):
keyboard_markup = types.InlineKeyboardMarkup(row_width=3)
# default row_width is 3, so here we can omit it actually
# kept for clearness
text_and_data = (
('Yes!', 'yes'),
('No!', 'no'),
)
# in real life for the callback_data the callback data factory should be used
# here the raw string is used for the simplicity
row_btns = (types.InlineKeyboardButton(text, callback_data=data) for text, data in text_and_data)
keyboard_markup.row(*row_btns)
keyboard_markup.add(
# url buttons have no callback data
types.InlineKeyboardButton('aiogram source', url='https://github.com/aiogram/aiogram'),
)
await message.reply("Hi!\nDo you love aiogram?", reply_markup=keyboard_markup)
# Use multiple registrators. Handler will execute when one of the filters is OK
@dp.callback_query_handler(text='no') # if cb.data == 'no'
@dp.callback_query_handler(text='yes') # if cb.data == 'yes'
async def inline_kb_answer_callback_handler(query: types.CallbackQuery):
answer_data = query.data
# always answer callback queries, even if you have nothing to say
await query.answer(f'You answered with {answer_data!r}')
if answer_data == 'yes':
text = 'Great, me too!'
elif answer_data == 'no':
text = 'Oh no...Why so?'
else:
text = f'Unexpected callback data {answer_data!r}!'
await bot.send_message(query.from_user.id, text)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,28 +0,0 @@
# English translations for PROJECT.
# Copyright (C) 2018 ORGANIZATION
# This file is distributed under the same license as the PROJECT project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2018.
#
msgid ""
msgstr ""
"Project-Id-Version: PROJECT VERSION\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2018-06-30 03:50+0300\n"
"PO-Revision-Date: 2018-06-30 03:43+0300\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language: en\n"
"Language-Team: en <LL@li.org>\n"
"Plural-Forms: nplurals=2; plural=(n != 1)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.6.0\n"
#: i18n_example.py:48
msgid "Hello, <b>{user}</b>!"
msgstr ""
#: i18n_example.py:53
msgid "Your current language: <i>{language}</i>"
msgstr ""

View file

@ -1,26 +0,0 @@
# Translations template for PROJECT.
# Copyright (C) 2019 ORGANIZATION
# This file is distributed under the same license as the PROJECT project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2019.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: PROJECT VERSION\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2019-08-10 17:51+0300\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.7.0\n"
#: i18n_example.py:60
msgid "Hello, <b>{user}</b>!"
msgstr ""
#: i18n_example.py:67
msgid "Your current language: <i>{language}</i>"
msgstr ""

View file

@ -1,35 +0,0 @@
# Russian translations for PROJECT.
# Copyright (C) 2019 ORGANIZATION
# This file is distributed under the same license as the PROJECT project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2019.
#
msgid ""
msgstr ""
"Project-Id-Version: PROJECT VERSION\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2019-08-10 17:51+0300\n"
"PO-Revision-Date: 2019-08-10 17:52+0300\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language: ru\n"
"Language-Team: ru <LL@li.org>\n"
"Plural-Forms: nplurals=3; plural=(n%10==1 && n%100!=11 ? 0 : n%10>=2 && "
"n%10<=4 && (n%100<10 || n%100>=20) ? 1 : 2)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.7.0\n"
#: i18n_example.py:60
msgid "Hello, <b>{user}</b>!"
msgstr "Привет, <b>{user}</b>!"
#: i18n_example.py:67
msgid "Your current language: <i>{language}</i>"
msgstr "Твой язык: <i>{language}</i>"
#: i18n_example.py:95
msgid "Aiogram has {number} like!"
msgid_plural "Aiogram has {number} likes!"
msgstr[0] "Aiogram имеет {number} лайк!"
msgstr[1] "Aiogram имеет {number} лайка!"
msgstr[2] "Aiogram имеет {number} лайков!"

View file

@ -1,29 +0,0 @@
# Ukrainian translations for PROJECT.
# Copyright (C) 2018 ORGANIZATION
# This file is distributed under the same license as the PROJECT project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2018.
#
msgid ""
msgstr ""
"Project-Id-Version: PROJECT VERSION\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2018-06-30 03:50+0300\n"
"PO-Revision-Date: 2018-06-30 03:43+0300\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language: uk\n"
"Language-Team: uk <LL@li.org>\n"
"Plural-Forms: nplurals=3; plural=(n%10==1 && n%100!=11 ? 0 : n%10>=2 && "
"n%10<=4 && (n%100<10 || n%100>=20) ? 1 : 2)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.6.0\n"
#: i18n_example.py:48
msgid "Hello, <b>{user}</b>!"
msgstr "Привіт, <b>{user}</b>!"
#: i18n_example.py:53
msgid "Your current language: <i>{language}</i>"
msgstr "Твоя мова: <i>{language}</i>"

View file

@ -1,43 +0,0 @@
import asyncio
from aiogram import Bot, Dispatcher, executor, filters, types
API_TOKEN = 'BOT_TOKEN_HERE'
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
@dp.message_handler(filters.CommandStart())
async def send_welcome(message: types.Message):
# So... At first I want to send something like this:
await message.reply("Do you want to see many pussies? Are you ready?")
# Wait a little...
await asyncio.sleep(1)
# Good bots should send chat actions...
await types.ChatActions.upload_photo()
# Create media group
media = types.MediaGroup()
# Attach local file
media.attach_photo(types.InputFile('data/cat.jpg'), 'Cat!')
# More local files and more cats!
media.attach_photo(types.InputFile('data/cats.jpg'), 'More cats!')
# You can also use URL's
# For example: get random puss:
media.attach_photo('http://lorempixel.com/400/200/cats/', 'Random cat.')
# And you can also use file ID:
# media.attach_photo('<file_id>', 'cat-cat-cat.')
# Done! Send media group
await message.reply_media_group(media=media)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,120 +0,0 @@
import asyncio
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.redis import RedisStorage2
from aiogram.dispatcher import DEFAULT_RATE_LIMIT
from aiogram.dispatcher.handler import CancelHandler, current_handler
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram.utils.exceptions import Throttled
TOKEN = 'BOT_TOKEN_HERE'
# In this example Redis storage is used
storage = RedisStorage2(db=5)
bot = Bot(token=TOKEN)
dp = Dispatcher(bot, storage=storage)
def rate_limit(limit: int, key=None):
"""
Decorator for configuring rate limit and key in different functions.
:param limit:
:param key:
:return:
"""
def decorator(func):
setattr(func, 'throttling_rate_limit', limit)
if key:
setattr(func, 'throttling_key', key)
return func
return decorator
class ThrottlingMiddleware(BaseMiddleware):
"""
Simple middleware
"""
def __init__(self, limit=DEFAULT_RATE_LIMIT, key_prefix='antiflood_'):
self.rate_limit = limit
self.prefix = key_prefix
super(ThrottlingMiddleware, self).__init__()
async def on_process_message(self, message: types.Message, data: dict):
"""
This handler is called when dispatcher receives a message
:param message:
"""
# Get current handler
handler = current_handler.get()
# Get dispatcher from context
dispatcher = Dispatcher.get_current()
# If handler was configured, get rate limit and key from handler
if handler:
limit = getattr(handler, 'throttling_rate_limit', self.rate_limit)
key = getattr(handler, 'throttling_key', f"{self.prefix}_{handler.__name__}")
else:
limit = self.rate_limit
key = f"{self.prefix}_message"
# Use Dispatcher.throttle method.
try:
await dispatcher.throttle(key, rate=limit)
except Throttled as t:
# Execute action
await self.message_throttled(message, t)
# Cancel current handler
raise CancelHandler()
async def message_throttled(self, message: types.Message, throttled: Throttled):
"""
Notify user only on first exceed and notify about unlocking only on last exceed
:param message:
:param throttled:
"""
handler = current_handler.get()
dispatcher = Dispatcher.get_current()
if handler:
key = getattr(handler, 'throttling_key', f"{self.prefix}_{handler.__name__}")
else:
key = f"{self.prefix}_message"
# Calculate how many time is left till the block ends
delta = throttled.rate - throttled.delta
# Prevent flooding
if throttled.exceeded_count <= 2:
await message.reply('Too many requests! ')
# Sleep.
await asyncio.sleep(delta)
# Check lock status
thr = await dispatcher.check_key(key)
# If current message is not last with current key - do not send message
if thr.exceeded_count == throttled.exceeded_count:
await message.reply('Unlocked.')
@dp.message_handler(commands=['start'])
@rate_limit(5, 'start') # this is not required but you can configure throttling manager for current handler using it
async def cmd_test(message: types.Message):
# You can use this command every 5 seconds
await message.reply('Test passed! You can use this command every 5 seconds.')
if __name__ == '__main__':
# Setup middleware
dp.middleware.setup(ThrottlingMiddleware())
# Start long-polling
executor.start_polling(dp)

View file

@ -1,97 +0,0 @@
from aiogram import Bot
from aiogram import types
from aiogram.dispatcher import Dispatcher
from aiogram.types.message import ContentTypes
from aiogram.utils import executor
BOT_TOKEN = 'BOT_TOKEN_HERE'
PAYMENTS_PROVIDER_TOKEN = '123456789:TEST:1422'
bot = Bot(BOT_TOKEN)
dp = Dispatcher(bot)
# Setup prices
prices = [
types.LabeledPrice(label='Working Time Machine', amount=5750),
types.LabeledPrice(label='Gift wrapping', amount=500),
]
# Setup shipping options
shipping_options = [
types.ShippingOption(id='instant', title='WorldWide Teleporter').add(types.LabeledPrice('Teleporter', 1000)),
types.ShippingOption(id='pickup', title='Local pickup').add(types.LabeledPrice('Pickup', 300)),
]
@dp.message_handler(commands=['start'])
async def cmd_start(message: types.Message):
await bot.send_message(message.chat.id,
"Hello, I'm the demo merchant bot."
" I can sell you a Time Machine."
" Use /buy to order one, /terms for Terms and Conditions")
@dp.message_handler(commands=['terms'])
async def cmd_terms(message: types.Message):
await bot.send_message(message.chat.id,
'Thank you for shopping with our demo bot. We hope you like your new time machine!\n'
'1. If your time machine was not delivered on time, please rethink your concept of time'
' and try again.\n'
'2. If you find that your time machine is not working, kindly contact our future service'
' workshops on Trappist-1e. They will be accessible anywhere between'
' May 2075 and November 4000 C.E.\n'
'3. If you would like a refund, kindly apply for one yesterday and we will have sent it'
' to you immediately.')
@dp.message_handler(commands=['buy'])
async def cmd_buy(message: types.Message):
await bot.send_message(message.chat.id,
"Real cards won't work with me, no money will be debited from your account."
" Use this test card number to pay for your Time Machine: `4242 4242 4242 4242`"
"\n\nThis is your demo invoice:", parse_mode='Markdown')
await bot.send_invoice(message.chat.id, title='Working Time Machine',
description='Want to visit your great-great-great-grandparents?'
' Make a fortune at the races?'
' Shake hands with Hammurabi and take a stroll in the Hanging Gardens?'
' Order our Working Time Machine today!',
provider_token=PAYMENTS_PROVIDER_TOKEN,
currency='usd',
photo_url='https://telegra.ph/file/d08ff863531f10bf2ea4b.jpg',
photo_height=512, # !=0/None or picture won't be shown
photo_width=512,
photo_size=512,
is_flexible=True, # True If you need to set up Shipping Fee
prices=prices,
start_parameter='time-machine-example',
payload='HAPPY FRIDAYS COUPON')
@dp.shipping_query_handler(lambda query: True)
async def shipping(shipping_query: types.ShippingQuery):
await bot.answer_shipping_query(shipping_query.id, ok=True, shipping_options=shipping_options,
error_message='Oh, seems like our Dog couriers are having a lunch right now.'
' Try again later!')
@dp.pre_checkout_query_handler(lambda query: True)
async def checkout(pre_checkout_query: types.PreCheckoutQuery):
await bot.answer_pre_checkout_query(pre_checkout_query.id, ok=True,
error_message="Aliens tried to steal your card's CVV,"
" but we successfully protected your credentials,"
" try to pay again in a few minutes, we need a small rest.")
@dp.message_handler(content_types=ContentTypes.SUCCESSFUL_PAYMENT)
async def got_payment(message: types.Message):
await bot.send_message(message.chat.id,
'Hoooooray! Thanks for payment! We will proceed your order for `{} {}`'
' as fast as possible! Stay in touch.'
'\n\nUse /buy again to get a Time Machine for your friend!'.format(
message.successful_payment.total_amount / 100, message.successful_payment.currency),
parse_mode='Markdown')
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,69 +0,0 @@
import logging
import aiohttp
from aiogram import Bot, types
from aiogram.dispatcher import Dispatcher
from aiogram.types import ParseMode
from aiogram.utils.emoji import emojize
from aiogram.utils.executor import start_polling
from aiogram.utils.markdown import bold, code, italic, text
# Configure bot here
API_TOKEN = 'BOT_TOKEN_HERE'
PROXY_URL = 'http://PROXY_URL' # Or 'socks5://host:port'
# NOTE: If authentication is required in your proxy then uncomment next line and change login/password for it
# PROXY_AUTH = aiohttp.BasicAuth(login='login', password='password')
# And add `proxy_auth=PROXY_AUTH` argument in line 30, like this:
# >>> bot = Bot(token=API_TOKEN, proxy=PROXY_URL, proxy_auth=PROXY_AUTH)
# Also you can use Socks5 proxy but you need manually install aiohttp_socks package.
# Get my ip URL
GET_IP_URL = 'http://bot.whatismyipaddress.com/'
logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN, proxy=PROXY_URL)
# If auth is required:
# bot = Bot(token=API_TOKEN, proxy=PROXY_URL, proxy_auth=PROXY_AUTH)
dp = Dispatcher(bot)
async def fetch(url, session):
async with session.get(url) as response:
return await response.text()
@dp.message_handler(commands=['start'])
async def cmd_start(message: types.Message):
# fetching urls will take some time, so notify user that everything is OK
await types.ChatActions.typing()
content = []
# Make request (without proxy)
async with aiohttp.ClientSession() as session:
ip = await fetch(GET_IP_URL, session)
content.append(text(':globe_showing_Americas:', bold('IP:'), code(ip)))
# This line is formatted to '🌎 *IP:* `YOUR IP`'
# Make request through bot's proxy
ip = await fetch(GET_IP_URL, bot.session)
content.append(text(':locked_with_key:', bold('IP:'), code(ip), italic('via proxy')))
# This line is formatted to '🔐 *IP:* `YOUR IP` _via proxy_'
# Send content
await bot.send_message(message.chat.id, emojize(text(*content, sep='\n')), parse_mode=ParseMode.MARKDOWN)
# In this example you can see emoji codes: ":globe_showing_Americas:" and ":locked_with_key:"
# You can find full emoji cheat sheet at https://www.webpagefx.com/tools/emoji-cheat-sheet/
# For representing emoji codes into real emoji use emoji util (aiogram.utils.emoji)
# (you have to install emoji module)
# For example emojize('Moon face :new_moon_face:') is transformed to 'Moon face 🌚'
if __name__ == '__main__':
start_polling(dp, skip_updates=True)

View file

@ -1,29 +0,0 @@
from aiogram import Bot, types
from aiogram.dispatcher import Dispatcher, filters
from aiogram.utils import executor
bot = Bot(token='BOT_TOKEN_HERE', parse_mode=types.ParseMode.HTML)
dp = Dispatcher(bot)
@dp.message_handler(filters.RegexpCommandsFilter(regexp_commands=['item_([0-9]*)']))
async def send_welcome(message: types.Message, regexp_command):
await message.reply(f"You have requested an item with id <code>{regexp_command.group(1)}</code>")
@dp.message_handler(commands='start')
async def create_deeplink(message: types.Message):
bot_user = await bot.me
bot_username = bot_user.username
deeplink = f'https://t.me/{bot_username}?start=item_12345'
text = (
f'Either send a command /item_1234 or follow this link {deeplink} and then click start\n'
'It also can be hidden in a inline button\n\n'
'Or just send <code>/start item_123</code>'
)
await message.reply(text, disable_web_page_preview=True)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,67 +0,0 @@
"""
This bot is created for the demonstration of a usage of regular keyboards.
"""
import logging
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = 'BOT_TOKEN_HERE'
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Initialize bot and dispatcher
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
@dp.message_handler(commands='start')
async def start_cmd_handler(message: types.Message):
keyboard_markup = types.ReplyKeyboardMarkup(row_width=3)
# default row_width is 3, so here we can omit it actually
# kept for clearness
btns_text = ('Yes!', 'No!')
keyboard_markup.row(*(types.KeyboardButton(text) for text in btns_text))
# adds buttons as a new row to the existing keyboard
# the behaviour doesn't depend on row_width attribute
more_btns_text = (
"I don't know",
"Who am i?",
"Where am i?",
"Who is there?",
)
keyboard_markup.add(*(types.KeyboardButton(text) for text in more_btns_text))
# adds buttons. New rows are formed according to row_width parameter
await message.reply("Hi!\nDo you like aiogram?", reply_markup=keyboard_markup)
@dp.message_handler()
async def all_msg_handler(message: types.Message):
# pressing of a KeyboardButton is the same as sending the regular message with the same text
# so, to handle the responses from the keyboard, we need to use a message_handler
# in real bot, it's better to define message_handler(text="...") for each button
# but here for the simplicity only one handler is defined
button_text = message.text
logger.debug('The answer is %r', button_text) # print the text we've got
if button_text == 'Yes!':
reply_text = "That's great"
elif button_text == 'No!':
reply_text = "Oh no! Why?"
else:
reply_text = "Keep calm...Everything is fine"
await message.reply(reply_text, reply_markup=types.ReplyKeyboardRemove())
# with message, we send types.ReplyKeyboardRemove() to hide the keyboard
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,53 +0,0 @@
"""
This is a bot to show the usage of the builtin Text filter
Instead of a list, a single element can be passed to any filter, it will be treated as list with an element
"""
import logging
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = 'BOT_TOKEN_HERE'
# Configure logging
logging.basicConfig(level=logging.INFO)
# Initialize bot and dispatcher
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
# if the text from user in the list
@dp.message_handler(text=['text1', 'text2'])
async def text_in_handler(message: types.Message):
await message.answer("The message text equals to one of in the list!")
# if the text contains any string
@dp.message_handler(text_contains='example1')
@dp.message_handler(text_contains='example2')
async def text_contains_any_handler(message: types.Message):
await message.answer("The message text contains any of strings")
# if the text contains all the strings from the list
@dp.message_handler(text_contains=['str1', 'str2'])
async def text_contains_all_handler(message: types.Message):
await message.answer("The message text contains all strings from the list")
# if the text starts with any string from the list
@dp.message_handler(text_startswith=['prefix1', 'prefix2'])
async def text_startswith_handler(message: types.Message):
await message.answer("The message text starts with any of prefixes")
# if the text ends with any string from the list
@dp.message_handler(text_endswith=['postfix1', 'postfix2'])
async def text_endswith_handler(message: types.Message):
await message.answer("The message text ends with any of postfixes")
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View file

@ -1,71 +0,0 @@
"""
Example for throttling manager.
You can use that for flood controlling.
"""
import logging
from aiogram import Bot, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import Dispatcher
from aiogram.utils.exceptions import Throttled
from aiogram.utils.executor import start_polling
API_TOKEN = 'BOT_TOKEN_HERE'
logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN)
# Throttling manager does not work without Leaky Bucket.
# You need to use a storage. For example use simple in-memory storage.
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
@dp.message_handler(commands=['start'])
async def send_welcome(message: types.Message):
try:
# Execute throttling manager with rate-limit equal to 2 seconds for key "start"
await dp.throttle('start', rate=2)
except Throttled:
# If request is throttled, the `Throttled` exception will be raised
await message.reply('Too many requests!')
else:
# Otherwise do something
await message.reply("Hi!\nI'm EchoBot!\nPowered by aiogram.")
@dp.message_handler(commands=['hi'])
@dp.throttled(lambda msg, loop, *args, **kwargs: loop.create_task(bot.send_message(msg.from_user.id, "Throttled")),
rate=5)
# loop is added to the function to run coroutines from it
async def say_hi(message: types.Message):
await message.answer("Hi")
# the on_throttled object can be either a regular function or coroutine
async def hello_throttled(*args, **kwargs):
# args will be the same as in the original handler
# kwargs will be updated with parameters given to .throttled (rate, key, user_id, chat_id)
print(f"hello_throttled was called with args={args} and kwargs={kwargs}")
message = args[0] # as message was the first argument in the original handler
await message.answer("Throttled")
@dp.message_handler(commands=['hello'])
@dp.throttled(hello_throttled, rate=4)
async def say_hello(message: types.Message):
await message.answer("Hello!")
@dp.message_handler(commands=['help'])
@dp.throttled(rate=5)
# nothing will happen if the handler will be throttled
async def help_handler(message: types.Message):
await message.answer('Help!')
if __name__ == '__main__':
start_polling(dp, skip_updates=True)

View file

@ -1,66 +0,0 @@
import logging
from aiogram import Bot, types
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.dispatcher import Dispatcher
from aiogram.dispatcher.webhook import SendMessage
from aiogram.utils.executor import start_webhook
API_TOKEN = 'BOT_TOKEN_HERE'
# webhook settings
WEBHOOK_HOST = 'https://your.domain'
WEBHOOK_PATH = '/path/to/api'
WEBHOOK_URL = f"{WEBHOOK_HOST}{WEBHOOK_PATH}"
# webserver settings
WEBAPP_HOST = 'localhost' # or ip
WEBAPP_PORT = 3001
logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
dp.middleware.setup(LoggingMiddleware())
@dp.message_handler()
async def echo(message: types.Message):
# Regular request
# await bot.send_message(message.chat.id, message.text)
# or reply INTO webhook
return SendMessage(message.chat.id, message.text)
async def on_startup(dp):
await bot.set_webhook(WEBHOOK_URL)
# insert code here to run it after start
async def on_shutdown(dp):
logging.warning('Shutting down..')
# insert code here to run it before shutdown
# Remove webhook (not acceptable in some cases)
await bot.delete_webhook()
# Close DB connection (if used)
await dp.storage.close()
await dp.storage.wait_closed()
logging.warning('Bye!')
if __name__ == '__main__':
start_webhook(
dispatcher=dp,
webhook_path=WEBHOOK_PATH,
on_startup=on_startup,
on_shutdown=on_shutdown,
skip_updates=True,
host=WEBAPP_HOST,
port=WEBAPP_PORT,
)

View file

@ -1,176 +0,0 @@
"""
Example outdated
"""
import asyncio
import ssl
import sys
from aiohttp import web
import aiogram
from aiogram import Bot, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import Dispatcher
from aiogram.dispatcher.webhook import get_new_configured_app, SendMessage
from aiogram.types import ChatType, ParseMode, ContentTypes
from aiogram.utils.markdown import hbold, bold, text, link
TOKEN = 'BOT TOKEN HERE'
WEBHOOK_HOST = 'example.com' # Domain name or IP addres which your bot is located.
WEBHOOK_PORT = 443 # Telegram Bot API allows only for usage next ports: 443, 80, 88 or 8443
WEBHOOK_URL_PATH = '/webhook' # Part of URL
# This options needed if you use self-signed SSL certificate
# Instructions: https://core.telegram.org/bots/self-signed
WEBHOOK_SSL_CERT = './webhook_cert.pem' # Path to the ssl certificate
WEBHOOK_SSL_PRIV = './webhook_pkey.pem' # Path to the ssl private key
WEBHOOK_URL = f"https://{WEBHOOK_HOST}:{WEBHOOK_PORT}{WEBHOOK_URL_PATH}"
# Web app settings:
# Use LAN address to listen webhooks
# User any available port in range from 1024 to 49151 if you're using proxy, or WEBHOOK_PORT if you're using direct webhook handling
WEBAPP_HOST = 'localhost'
WEBAPP_PORT = 3001
BAD_CONTENT = ContentTypes.PHOTO & ContentTypes.DOCUMENT & ContentTypes.STICKER & ContentTypes.AUDIO
bot = Bot(TOKEN)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
async def cmd_start(message: types.Message):
# Yep. aiogram allows to respond into webhook.
# https://core.telegram.org/bots/api#making-requests-when-getting-updates
return SendMessage(chat_id=message.chat.id, text='Hi from webhook!',
reply_to_message_id=message.message_id)
async def cmd_about(message: types.Message):
# In this function markdown utils are userd for formatting message text
return SendMessage(message.chat.id, text(
bold('Hi! I\'m just a simple telegram bot.'),
'',
text('I\'m powered by', bold('Python', Version(*sys.version_info[:]))),
text('With', link(text('aiogram', aiogram.VERSION), 'https://github.com/aiogram/aiogram')),
sep='\n'
), parse_mode=ParseMode.MARKDOWN)
async def cancel(message: types.Message):
# Get current state context
state = dp.current_state(chat=message.chat.id, user=message.from_user.id)
# If current user in any state - cancel it.
if await state.get_state() is not None:
await state.set_state(state=None)
return SendMessage(message.chat.id, 'Current action is canceled.')
# Otherwise do nothing
async def unknown(message: types.Message):
"""
Handler for unknown messages.
"""
return SendMessage(message.chat.id,
f"I don\'t know what to do with content type `{message.content_type()}`. Sorry :c")
async def cmd_id(message: types.Message):
"""
Return info about user.
"""
if message.reply_to_message:
target = message.reply_to_message.from_user
chat = message.chat
elif message.forward_from and message.chat.type == ChatType.PRIVATE:
target = message.forward_from
chat = message.forward_from or message.chat
else:
target = message.from_user
chat = message.chat
result_msg = [hbold('Info about user:'),
f"First name: {target.first_name}"]
if target.last_name:
result_msg.append(f"Last name: {target.last_name}")
if target.username:
result_msg.append(f"Username: {target.mention}")
result_msg.append(f"User ID: {target.id}")
result_msg.extend([hbold('Chat:'),
f"Type: {chat.type}",
f"Chat ID: {chat.id}"])
if chat.type != ChatType.PRIVATE:
result_msg.append(f"Title: {chat.title}")
else:
result_msg.append(f"Title: {chat.full_name}")
return SendMessage(message.chat.id, '\n'.join(result_msg), reply_to_message_id=message.message_id,
parse_mode=ParseMode.HTML)
async def on_startup(app):
# Demonstrate one of the available methods for registering handlers
# This command available only in main state (state=None)
dp.register_message_handler(cmd_start, commands=['start'])
# This handler is available in all states at any time.
dp.register_message_handler(cmd_about, commands=['help', 'about'], state='*')
dp.register_message_handler(unknown, content_types=BAD_CONTENT,
func=lambda message: message.chat.type == ChatType.PRIVATE)
# You are able to register one function handler for multiple conditions
dp.register_message_handler(cancel, commands=['cancel'], state='*')
dp.register_message_handler(cancel, func=lambda message: message.text.lower().strip() in ['cancel'], state='*')
dp.register_message_handler(cmd_id, commands=['id'], state='*')
dp.register_message_handler(cmd_id, func=lambda message: message.forward_from or
message.reply_to_message and
message.chat.type == ChatType.PRIVATE, state='*')
# Get current webhook status
webhook = await bot.get_webhook_info()
# If URL is bad
if webhook.url != WEBHOOK_URL:
# If URL doesnt match current - remove webhook
if not webhook.url:
await bot.delete_webhook()
# Set new URL for webhook
await bot.set_webhook(WEBHOOK_URL, certificate=open(WEBHOOK_SSL_CERT, 'rb'))
# If you want to use free certificate signed by LetsEncrypt you need to set only URL without sending certificate.
async def on_shutdown(app):
"""
Graceful shutdown. This method is recommended by aiohttp docs.
"""
# Remove webhook.
await bot.delete_webhook()
# Close Redis connection.
await dp.storage.close()
await dp.storage.wait_closed()
if __name__ == '__main__':
# Get instance of :class:`aiohttp.web.Application` with configured router.
app = get_new_configured_app(dispatcher=dp, path=WEBHOOK_URL_PATH)
# Setup event handlers.
app.on_startup.append(on_startup)
app.on_shutdown.append(on_shutdown)
# Generate SSL context
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.load_cert_chain(WEBHOOK_SSL_CERT, WEBHOOK_SSL_PRIV)
# Start web-application.
web.run_app(app, host=WEBAPP_HOST, port=WEBAPP_PORT, ssl_context=context)
# Note:
# If you start your bot using nginx or Apache web server, SSL context is not required.
# Otherwise you need to set `ssl_context` parameter.

48
poetry.lock generated
View file

@ -42,7 +42,7 @@ python-versions = ">=3.5.3"
version = "3.0.1" version = "3.0.1"
[[package]] [[package]]
category = "dev" category = "main"
description = "Atomic file writes." description = "Atomic file writes."
name = "atomicwrites" name = "atomicwrites"
optional = false optional = false
@ -99,7 +99,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
version = "7.0" version = "7.0"
[[package]] [[package]]
category = "dev" category = "main"
description = "Cross-platform colored terminal text." description = "Cross-platform colored terminal text."
marker = "sys_platform == \"win32\"" marker = "sys_platform == \"win32\""
name = "colorama" name = "colorama"
@ -107,6 +107,14 @@ optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
version = "0.4.1" version = "0.4.1"
[[package]]
category = "main"
description = "Code coverage measurement for Python"
name = "coverage"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, <4"
version = "4.5.4"
[[package]] [[package]]
category = "dev" category = "dev"
description = "Discover and load entry points from installed packages." description = "Discover and load entry points from installed packages."
@ -146,7 +154,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
version = "2.8" version = "2.8"
[[package]] [[package]]
category = "dev" category = "main"
description = "Read metadata from Python packages" description = "Read metadata from Python packages"
marker = "python_version < \"3.8\"" marker = "python_version < \"3.8\""
name = "importlib-metadata" name = "importlib-metadata"
@ -267,7 +275,7 @@ jsmin = ">=2.2.2"
mkdocs = ">=1.0.4" mkdocs = ">=1.0.4"
[[package]] [[package]]
category = "dev" category = "main"
description = "More routines for operating on iterables, beyond itertools" description = "More routines for operating on iterables, beyond itertools"
name = "more-itertools" name = "more-itertools"
optional = false optional = false
@ -304,7 +312,7 @@ python-versions = "*"
version = "0.4.3" version = "0.4.3"
[[package]] [[package]]
category = "dev" category = "main"
description = "Core utilities for Python packages" description = "Core utilities for Python packages"
name = "packaging" name = "packaging"
optional = false optional = false
@ -324,7 +332,7 @@ python-versions = "*"
version = "1.0" version = "1.0"
[[package]] [[package]]
category = "dev" category = "main"
description = "plugin and hook calling mechanisms for python" description = "plugin and hook calling mechanisms for python"
name = "pluggy" name = "pluggy"
optional = false optional = false
@ -337,7 +345,7 @@ python = "<3.8"
version = ">=0.12" version = ">=0.12"
[[package]] [[package]]
category = "dev" category = "main"
description = "library with cross-python path, ini-parsing, io, code, log facilities" description = "library with cross-python path, ini-parsing, io, code, log facilities"
name = "py" name = "py"
optional = false optional = false
@ -389,7 +397,7 @@ Markdown = ">=3.0.1"
pep562 = "*" pep562 = "*"
[[package]] [[package]]
category = "dev" category = "main"
description = "Python parsing module" description = "Python parsing module"
name = "pyparsing" name = "pyparsing"
optional = false optional = false
@ -397,7 +405,7 @@ python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
version = "2.4.5" version = "2.4.5"
[[package]] [[package]]
category = "dev" category = "main"
description = "pytest: simple powerful testing with Python" description = "pytest: simple powerful testing with Python"
name = "pytest" name = "pytest"
optional = false optional = false
@ -429,6 +437,18 @@ version = "0.10.0"
[package.dependencies] [package.dependencies]
pytest = ">=3.0.6" pytest = ">=3.0.6"
[[package]]
category = "main"
description = "Pytest plugin for measuring coverage."
name = "pytest-cov"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
version = "2.8.1"
[package.dependencies]
coverage = ">=4.4"
pytest = ">=3.6"
[[package]] [[package]]
category = "main" category = "main"
description = "World timezone definitions, modern and historical" description = "World timezone definitions, modern and historical"
@ -446,7 +466,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
version = "5.1.2" version = "5.1.2"
[[package]] [[package]]
category = "dev" category = "main"
description = "Python 2 and 3 compatibility utilities" description = "Python 2 and 3 compatibility utilities"
name = "six" name = "six"
optional = false optional = false
@ -494,7 +514,7 @@ python-versions = "*"
version = "0.14.0" version = "0.14.0"
[[package]] [[package]]
category = "dev" category = "main"
description = "Measures number of Terminal column cells of wide-character codes" description = "Measures number of Terminal column cells of wide-character codes"
name = "wcwidth" name = "wcwidth"
optional = false optional = false
@ -514,7 +534,7 @@ idna = ">=2.0"
multidict = ">=4.0" multidict = ">=4.0"
[[package]] [[package]]
category = "dev" category = "main"
description = "Backport of pathlib-compatible object wrapper for zip files" description = "Backport of pathlib-compatible object wrapper for zip files"
marker = "python_version < \"3.8\"" marker = "python_version < \"3.8\""
name = "zipp" name = "zipp"
@ -526,7 +546,7 @@ version = "0.6.0"
more-itertools = "*" more-itertools = "*"
[metadata] [metadata]
content-hash = "39221d7aa0d3e63fb6e8ab693e36ec6a9af686f3b23aadbdbfc5c58722f51cca" content-hash = "32dbfc6a25dd6930178818c8d0ba8aeac600853f992b058e46569fd6ea7ee208"
python-versions = "^3.7" python-versions = "^3.7"
[metadata.hashes] [metadata.hashes]
@ -541,6 +561,7 @@ black = ["817243426042db1d36617910df579a54f1afd659adb96fc5032fcf4b36209739", "e0
chardet = ["84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", "fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"] chardet = ["84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", "fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"]
click = ["2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13", "5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7"] click = ["2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13", "5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7"]
colorama = ["05eed71e2e327246ad6b38c540c4a3117230b19679b875190486ddd2d721422d", "f8ac84de7840f5b9c4e3347b3c1eaa50f7e49c2b07596221daec5edaabbd7c48"] colorama = ["05eed71e2e327246ad6b38c540c4a3117230b19679b875190486ddd2d721422d", "f8ac84de7840f5b9c4e3347b3c1eaa50f7e49c2b07596221daec5edaabbd7c48"]
coverage = ["08907593569fe59baca0bf152c43f3863201efb6113ecb38ce7e97ce339805a6", "0be0f1ed45fc0c185cfd4ecc19a1d6532d72f86a2bac9de7e24541febad72650", "141f08ed3c4b1847015e2cd62ec06d35e67a3ac185c26f7635f4406b90afa9c5", "19e4df788a0581238e9390c85a7a09af39c7b539b29f25c89209e6c3e371270d", "23cc09ed395b03424d1ae30dcc292615c1372bfba7141eb85e11e50efaa6b351", "245388cda02af78276b479f299bbf3783ef0a6a6273037d7c60dc73b8d8d7755", "331cb5115673a20fb131dadd22f5bcaf7677ef758741312bee4937d71a14b2ef", "386e2e4090f0bc5df274e720105c342263423e77ee8826002dcffe0c9533dbca", "3a794ce50daee01c74a494919d5ebdc23d58873747fa0e288318728533a3e1ca", "60851187677b24c6085248f0a0b9b98d49cba7ecc7ec60ba6b9d2e5574ac1ee9", "63a9a5fc43b58735f65ed63d2cf43508f462dc49857da70b8980ad78d41d52fc", "6b62544bb68106e3f00b21c8930e83e584fdca005d4fffd29bb39fb3ffa03cb5", "6ba744056423ef8d450cf627289166da65903885272055fb4b5e113137cfa14f", "7494b0b0274c5072bddbfd5b4a6c6f18fbbe1ab1d22a41e99cd2d00c8f96ecfe", "826f32b9547c8091679ff292a82aca9c7b9650f9fda3e2ca6bf2ac905b7ce888", "93715dffbcd0678057f947f496484e906bf9509f5c1c38fc9ba3922893cda5f5", "9a334d6c83dfeadae576b4d633a71620d40d1c379129d587faa42ee3e2a85cce", "af7ed8a8aa6957aac47b4268631fa1df984643f07ef00acd374e456364b373f5", "bf0a7aed7f5521c7ca67febd57db473af4762b9622254291fbcbb8cd0ba5e33e", "bf1ef9eb901113a9805287e090452c05547578eaab1b62e4ad456fcc049a9b7e", "c0afd27bc0e307a1ffc04ca5ec010a290e49e3afbe841c5cafc5c5a80ecd81c9", "dd579709a87092c6dbee09d1b7cfa81831040705ffa12a1b248935274aee0437", "df6712284b2e44a065097846488f66840445eb987eb81b3cc6e4149e7b6982e1", "e07d9f1a23e9e93ab5c62902833bf3e4b1f65502927379148b6622686223125c", "e2ede7c1d45e65e209d6093b762e98e8318ddeff95317d07a27a2140b80cfd24", "e4ef9c164eb55123c62411f5936b5c2e521b12356037b6e1c2617cef45523d47", "eca2b7343524e7ba246cab8ff00cab47a2d6d54ada3b02772e908a45675722e2", "eee64c616adeff7db37cc37da4180a3a5b6177f5c46b187894e633f088fb5b28", "ef824cad1f980d27f26166f86856efe11eff9912c4fed97d3804820d43fa550c", "efc89291bd5a08855829a3c522df16d856455297cf35ae827a37edac45f466a7", "fa964bae817babece5aa2e8c1af841bebb6d0b9add8e637548809d040443fee0", "ff37757e068ae606659c28c3bd0d923f9d29a85de79bf25b2b34b148473b5025"]
entrypoints = ["589f874b313739ad35be6e0cd7efde2a4e9b6fea91edcc34e58ecbb8dbe56d19", "c70dd71abe5a8c85e55e12c19bd91ccfeec11a6e99044204511f9ed547d48451"] entrypoints = ["589f874b313739ad35be6e0cd7efde2a4e9b6fea91edcc34e58ecbb8dbe56d19", "c70dd71abe5a8c85e55e12c19bd91ccfeec11a6e99044204511f9ed547d48451"]
flake8 = ["45681a117ecc81e870cbf1262835ae4af5e7a8b08e40b944a8a6e6b895914cfb", "49356e766643ad15072a789a20915d3c91dc89fd313ccd71802303fd67e4deca"] flake8 = ["45681a117ecc81e870cbf1262835ae4af5e7a8b08e40b944a8a6e6b895914cfb", "49356e766643ad15072a789a20915d3c91dc89fd313ccd71802303fd67e4deca"]
htmlmin = ["50c1ef4630374a5d723900096a961cff426dff46b48f34d194a81bbe14eca178"] htmlmin = ["50c1ef4630374a5d723900096a961cff426dff46b48f34d194a81bbe14eca178"]
@ -572,6 +593,7 @@ pymdown-extensions = ["24c1a0afbae101c4e2b2675ff4dd936470a90133f93398b9cbe0c855e
pyparsing = ["20f995ecd72f2a1f4bf6b072b63b22e2eb457836601e76d6e5dfcd75436acc1f", "4ca62001be367f01bd3e92ecbb79070272a9d4964dce6a48a82ff0b8bc7e683a"] pyparsing = ["20f995ecd72f2a1f4bf6b072b63b22e2eb457836601e76d6e5dfcd75436acc1f", "4ca62001be367f01bd3e92ecbb79070272a9d4964dce6a48a82ff0b8bc7e683a"]
pytest = ["15837d2880cb94821087bc07476892ea740696b20e90288fd6c19e44b435abdb", "b6cf7ad9064049ee486586b3a0ddd70dc5136c40e1147e7d286efd77ba66c5eb"] pytest = ["15837d2880cb94821087bc07476892ea740696b20e90288fd6c19e44b435abdb", "b6cf7ad9064049ee486586b3a0ddd70dc5136c40e1147e7d286efd77ba66c5eb"]
pytest-asyncio = ["9fac5100fd716cbecf6ef89233e8590a4ad61d729d1732e0a96b84182df1daaf", "d734718e25cfc32d2bf78d346e99d33724deeba774cc4afdf491530c6184b63b"] pytest-asyncio = ["9fac5100fd716cbecf6ef89233e8590a4ad61d729d1732e0a96b84182df1daaf", "d734718e25cfc32d2bf78d346e99d33724deeba774cc4afdf491530c6184b63b"]
pytest-cov = ["cc6742d8bac45070217169f5f72ceee1e0e55b0221f54bcf24845972d3a47f2b", "cdbdef4f870408ebdbfeb44e63e07eb18bb4619fae852f6e760645fa36172626"]
pytz = ["1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d", "b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be"] pytz = ["1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d", "b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be"]
pyyaml = ["0113bc0ec2ad727182326b61326afa3d1d8280ae1122493553fd6f4397f33df9", "01adf0b6c6f61bd11af6e10ca52b7d4057dd0be0343eb9283c878cf3af56aee4", "5124373960b0b3f4aa7df1707e63e9f109b5263eca5976c66e08b1c552d4eaf8", "5ca4f10adbddae56d824b2c09668e91219bb178a1eee1faa56af6f99f11bf696", "7907be34ffa3c5a32b60b95f4d95ea25361c951383a894fec31be7252b2b6f34", "7ec9b2a4ed5cad025c2278a1e6a19c011c80a3caaac804fd2d329e9cc2c287c9", "87ae4c829bb25b9fe99cf71fbb2140c448f534e24c998cc60f39ae4f94396a73", "9de9919becc9cc2ff03637872a440195ac4241c80536632fffeb6a1e25a74299", "a5a85b10e450c66b49f98846937e8cfca1db3127a9d5d1e31ca45c3d0bef4c5b", "b0997827b4f6a7c286c01c5f60384d218dca4ed7d9efa945c3e1aa623d5709ae", "b631ef96d3222e62861443cc89d6563ba3eeb816eeb96b2629345ab795e53681", "bf47c0607522fdbca6c9e817a6e81b08491de50f3766a7a0e6a5be7905961b41", "f81025eddd0327c7d4cfe9b62cf33190e1e736cc6e97502b3ec425f574b3e7a8"] pyyaml = ["0113bc0ec2ad727182326b61326afa3d1d8280ae1122493553fd6f4397f33df9", "01adf0b6c6f61bd11af6e10ca52b7d4057dd0be0343eb9283c878cf3af56aee4", "5124373960b0b3f4aa7df1707e63e9f109b5263eca5976c66e08b1c552d4eaf8", "5ca4f10adbddae56d824b2c09668e91219bb178a1eee1faa56af6f99f11bf696", "7907be34ffa3c5a32b60b95f4d95ea25361c951383a894fec31be7252b2b6f34", "7ec9b2a4ed5cad025c2278a1e6a19c011c80a3caaac804fd2d329e9cc2c287c9", "87ae4c829bb25b9fe99cf71fbb2140c448f534e24c998cc60f39ae4f94396a73", "9de9919becc9cc2ff03637872a440195ac4241c80536632fffeb6a1e25a74299", "a5a85b10e450c66b49f98846937e8cfca1db3127a9d5d1e31ca45c3d0bef4c5b", "b0997827b4f6a7c286c01c5f60384d218dca4ed7d9efa945c3e1aa623d5709ae", "b631ef96d3222e62861443cc89d6563ba3eeb816eeb96b2629345ab795e53681", "bf47c0607522fdbca6c9e817a6e81b08491de50f3766a7a0e6a5be7905961b41", "f81025eddd0327c7d4cfe9b62cf33190e1e736cc6e97502b3ec425f574b3e7a8"]
six = ["1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd", "30f610279e8b2578cab6db20741130331735c781b56053c59c4076da27f06b66"] six = ["1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd", "30f610279e8b2578cab6db20741130331735c781b56053c59c4076da27f06b66"]

View file

@ -4,12 +4,17 @@ version = "3.0.0-alpha.0"
description = "is a pretty simple and fully asynchronous framework for Telegram Bot API" description = "is a pretty simple and fully asynchronous framework for Telegram Bot API"
authors = ["Alex Root Junior <jroot.junior@gmail.com>"] authors = ["Alex Root Junior <jroot.junior@gmail.com>"]
license = "MIT" license = "MIT"
readme = "README.md"
homepage = "https://aiogram.readthedocs.io/"
documentation = "https://aiogram.readthedocs.io/"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.7" python = "^3.7"
aiohttp = "^3.6" aiohttp = "^3.6"
pydantic = "^1.1" pydantic = "^1.1"
Babel = "^2.7" Babel = "^2.7"
pytest-cov = "^2.8"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
black = {version = "^18.3-alpha.0", allows-prereleases = true} black = {version = "^18.3-alpha.0", allows-prereleases = true}

View file

@ -1,42 +0,0 @@
import aresponses
from aiogram import Bot
TOKEN = "123456789:AABBCCDDEEFFaabbccddeeff-1234567890"
class FakeTelegram(aresponses.ResponsesMockServer):
def __init__(self, message_dict, bot=None, **kwargs):
super().__init__(**kwargs)
self._body, self._headers = self.parse_data(message_dict)
if isinstance(bot, Bot):
Bot.set_current(bot)
async def __aenter__(self):
await super().__aenter__()
_response = self.Response(text=self._body, headers=self._headers, status=200, reason="OK")
self.add(self.ANY, response=_response)
async def __aexit__(self, exc_type, exc_val, exc_tb):
if hasattr(self, "monkeypatch"):
self.monkeypatch.undo()
await super().__aexit__(exc_type, exc_val, exc_tb)
@staticmethod
def parse_data(message_dict):
import json
_body = '{"ok":true,"result":' + json.dumps(message_dict) + "}"
_headers = {
"Server": "nginx/1.12.2",
"Date": "Tue, 03 Apr 2018 16:59:54 GMT",
"Content-Type": "application/json",
"Content-Length": str(len(_body)),
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Expose-Headers": "Content-Length,Content-Type,Date,Server,Connection",
"Strict-Transport-Security": "max-age=31536000; includeSubdomains",
}
return _body, _headers

View file

@ -1 +0,0 @@
# pytest_plugins = "pytest_asyncio.plugin"

View file

@ -1,681 +0,0 @@
import aresponses
import pytest
from aiogram import Bot, types
TOKEN = "123456789:AABBCCDDEEFFaabbccddeeff-1234567890"
class FakeTelegram(aresponses.ResponsesMockServer):
def __init__(self, message_dict, **kwargs):
super().__init__(**kwargs)
self._body, self._headers = self.parse_data(message_dict)
async def __aenter__(self):
await super().__aenter__()
_response = self.Response(text=self._body, headers=self._headers, status=200, reason="OK")
self.add(self.ANY, response=_response)
@staticmethod
def parse_data(message_dict):
import json
_body = '{"ok":true,"result":' + json.dumps(message_dict) + "}"
_headers = {
"Server": "nginx/1.12.2",
"Date": "Tue, 03 Apr 2018 16:59:54 GMT",
"Content-Type": "application/json",
"Content-Length": str(len(_body)),
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Expose-Headers": "Content-Length,Content-Type,Date,Server,Connection",
"Strict-Transport-Security": "max-age=31536000; includeSubdomains",
}
return _body, _headers
@pytest.yield_fixture()
@pytest.mark.asyncio
async def bot(event_loop):
""" Bot fixture """
_bot = Bot(TOKEN, loop=event_loop, parse_mode=types.ParseMode.MARKDOWN)
yield _bot
await _bot.close()
@pytest.mark.asyncio
async def test_get_me(bot: Bot, event_loop):
""" getMe method test """
from .types.dataset import USER
user = types.User(**USER)
async with FakeTelegram(message_dict=USER, loop=event_loop):
result = await bot.me
assert result == user
@pytest.mark.asyncio
async def test_send_message(bot: Bot, event_loop):
""" sendMessage method test """
from .types.dataset import MESSAGE
msg = types.Message(**MESSAGE)
async with FakeTelegram(message_dict=MESSAGE, loop=event_loop):
result = await bot.send_message(chat_id=msg.chat.id, text=msg.text)
assert result == msg
@pytest.mark.asyncio
async def test_forward_message(bot: Bot, event_loop):
""" forwardMessage method test """
from .types.dataset import FORWARDED_MESSAGE
msg = types.Message(**FORWARDED_MESSAGE)
async with FakeTelegram(message_dict=FORWARDED_MESSAGE, loop=event_loop):
result = await bot.forward_message(
chat_id=msg.chat.id,
from_chat_id=msg.forward_from_chat.id,
message_id=msg.forward_from_message_id,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_photo(bot: Bot, event_loop):
""" sendPhoto method test with file_id """
from .types.dataset import MESSAGE_WITH_PHOTO, PHOTO
msg = types.Message(**MESSAGE_WITH_PHOTO)
photo = types.PhotoSize(**PHOTO)
async with FakeTelegram(message_dict=MESSAGE_WITH_PHOTO, loop=event_loop):
result = await bot.send_photo(
msg.chat.id,
photo=photo.file_id,
caption=msg.caption,
parse_mode=types.ParseMode.HTML,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_audio(bot: Bot, event_loop):
""" sendAudio method test with file_id """
from .types.dataset import MESSAGE_WITH_AUDIO
msg = types.Message(**MESSAGE_WITH_AUDIO)
async with FakeTelegram(message_dict=MESSAGE_WITH_AUDIO, loop=event_loop):
result = await bot.send_audio(
chat_id=msg.chat.id,
audio=msg.audio.file_id,
caption=msg.caption,
parse_mode=types.ParseMode.HTML,
duration=msg.audio.duration,
performer=msg.audio.performer,
title=msg.audio.title,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_document(bot: Bot, event_loop):
""" sendDocument method test with file_id """
from .types.dataset import MESSAGE_WITH_DOCUMENT
msg = types.Message(**MESSAGE_WITH_DOCUMENT)
async with FakeTelegram(message_dict=MESSAGE_WITH_DOCUMENT, loop=event_loop):
result = await bot.send_document(
chat_id=msg.chat.id,
document=msg.document.file_id,
caption=msg.caption,
parse_mode=types.ParseMode.HTML,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_video(bot: Bot, event_loop):
""" sendVideo method test with file_id """
from .types.dataset import MESSAGE_WITH_VIDEO, VIDEO
msg = types.Message(**MESSAGE_WITH_VIDEO)
video = types.Video(**VIDEO)
async with FakeTelegram(message_dict=MESSAGE_WITH_VIDEO, loop=event_loop):
result = await bot.send_video(
chat_id=msg.chat.id,
video=video.file_id,
duration=video.duration,
width=video.width,
height=video.height,
caption=msg.caption,
parse_mode=types.ParseMode.HTML,
supports_streaming=True,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_voice(bot: Bot, event_loop):
""" sendVoice method test with file_id """
from .types.dataset import MESSAGE_WITH_VOICE, VOICE
msg = types.Message(**MESSAGE_WITH_VOICE)
voice = types.Voice(**VOICE)
async with FakeTelegram(message_dict=MESSAGE_WITH_VOICE, loop=event_loop):
result = await bot.send_voice(
chat_id=msg.chat.id,
voice=voice.file_id,
caption=msg.caption,
parse_mode=types.ParseMode.HTML,
duration=voice.duration,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_video_note(bot: Bot, event_loop):
""" sendVideoNote method test with file_id """
from .types.dataset import MESSAGE_WITH_VIDEO_NOTE, VIDEO_NOTE
msg = types.Message(**MESSAGE_WITH_VIDEO_NOTE)
video_note = types.VideoNote(**VIDEO_NOTE)
async with FakeTelegram(message_dict=MESSAGE_WITH_VIDEO_NOTE, loop=event_loop):
result = await bot.send_video_note(
chat_id=msg.chat.id,
video_note=video_note.file_id,
duration=video_note.duration,
length=video_note.length,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_media_group(bot: Bot, event_loop):
""" sendMediaGroup method test with file_id """
from .types.dataset import MESSAGE_WITH_MEDIA_GROUP, PHOTO
msg = types.Message(**MESSAGE_WITH_MEDIA_GROUP)
photo = types.PhotoSize(**PHOTO)
media = [
types.InputMediaPhoto(media=photo.file_id),
types.InputMediaPhoto(media=photo.file_id),
]
async with FakeTelegram(
message_dict=[MESSAGE_WITH_MEDIA_GROUP, MESSAGE_WITH_MEDIA_GROUP], loop=event_loop
):
result = await bot.send_media_group(msg.chat.id, media=media, disable_notification=False)
assert len(result) == len(media)
assert result.pop().media_group_id
@pytest.mark.asyncio
async def test_send_location(bot: Bot, event_loop):
""" sendLocation method test """
from .types.dataset import MESSAGE_WITH_LOCATION, LOCATION
msg = types.Message(**MESSAGE_WITH_LOCATION)
location = types.Location(**LOCATION)
async with FakeTelegram(message_dict=MESSAGE_WITH_LOCATION, loop=event_loop):
result = await bot.send_location(
msg.chat.id,
latitude=location.latitude,
longitude=location.longitude,
live_period=10,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_edit_message_live_location_by_bot(bot: Bot, event_loop):
""" editMessageLiveLocation method test """
from .types.dataset import MESSAGE_WITH_LOCATION, LOCATION
msg = types.Message(**MESSAGE_WITH_LOCATION)
location = types.Location(**LOCATION)
# editing bot message
async with FakeTelegram(message_dict=MESSAGE_WITH_LOCATION, loop=event_loop):
result = await bot.edit_message_live_location(
chat_id=msg.chat.id,
message_id=msg.message_id,
latitude=location.latitude,
longitude=location.longitude,
)
assert result == msg
@pytest.mark.asyncio
async def test_edit_message_live_location_by_user(bot: Bot, event_loop):
""" editMessageLiveLocation method test """
from .types.dataset import MESSAGE_WITH_LOCATION, LOCATION
msg = types.Message(**MESSAGE_WITH_LOCATION)
location = types.Location(**LOCATION)
# editing user's message
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.edit_message_live_location(
chat_id=msg.chat.id,
message_id=msg.message_id,
latitude=location.latitude,
longitude=location.longitude,
)
assert isinstance(result, bool) and result is True
@pytest.mark.asyncio
async def test_stop_message_live_location_by_bot(bot: Bot, event_loop):
""" stopMessageLiveLocation method test """
from .types.dataset import MESSAGE_WITH_LOCATION
msg = types.Message(**MESSAGE_WITH_LOCATION)
# stopping bot message
async with FakeTelegram(message_dict=MESSAGE_WITH_LOCATION, loop=event_loop):
result = await bot.stop_message_live_location(
chat_id=msg.chat.id, message_id=msg.message_id
)
assert result == msg
@pytest.mark.asyncio
async def test_stop_message_live_location_by_user(bot: Bot, event_loop):
""" stopMessageLiveLocation method test """
from .types.dataset import MESSAGE_WITH_LOCATION
msg = types.Message(**MESSAGE_WITH_LOCATION)
# stopping user's message
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.stop_message_live_location(
chat_id=msg.chat.id, message_id=msg.message_id
)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_send_venue(bot: Bot, event_loop):
""" sendVenue method test """
from .types.dataset import MESSAGE_WITH_VENUE, VENUE, LOCATION
msg = types.Message(**MESSAGE_WITH_VENUE)
location = types.Location(**LOCATION)
venue = types.Venue(**VENUE)
async with FakeTelegram(message_dict=MESSAGE_WITH_VENUE, loop=event_loop):
result = await bot.send_venue(
msg.chat.id,
latitude=location.latitude,
longitude=location.longitude,
title=venue.title,
address=venue.address,
foursquare_id=venue.foursquare_id,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_contact(bot: Bot, event_loop):
""" sendContact method test """
from .types.dataset import MESSAGE_WITH_CONTACT, CONTACT
msg = types.Message(**MESSAGE_WITH_CONTACT)
contact = types.Contact(**CONTACT)
async with FakeTelegram(message_dict=MESSAGE_WITH_CONTACT, loop=event_loop):
result = await bot.send_contact(
msg.chat.id,
phone_number=contact.phone_number,
first_name=contact.first_name,
last_name=contact.last_name,
disable_notification=False,
)
assert result == msg
@pytest.mark.asyncio
async def test_send_chat_action(bot: Bot, event_loop):
""" sendChatAction method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.send_chat_action(chat_id=chat.id, action=types.ChatActions.TYPING)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_get_user_profile_photo(bot: Bot, event_loop):
""" getUserProfilePhotos method test """
from .types.dataset import USER_PROFILE_PHOTOS, USER
user = types.User(**USER)
async with FakeTelegram(message_dict=USER_PROFILE_PHOTOS, loop=event_loop):
result = await bot.get_user_profile_photos(user_id=user.id, offset=1, limit=1)
assert isinstance(result, types.UserProfilePhotos)
@pytest.mark.asyncio
async def test_get_file(bot: Bot, event_loop):
""" getFile method test """
from .types.dataset import FILE
file = types.File(**FILE)
async with FakeTelegram(message_dict=FILE, loop=event_loop):
result = await bot.get_file(file_id=file.file_id)
assert isinstance(result, types.File)
@pytest.mark.asyncio
async def test_kick_chat_member(bot: Bot, event_loop):
""" kickChatMember method test """
from .types.dataset import USER, CHAT
user = types.User(**USER)
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.kick_chat_member(chat_id=chat.id, user_id=user.id, until_date=123)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_unban_chat_member(bot: Bot, event_loop):
""" unbanChatMember method test """
from .types.dataset import USER, CHAT
user = types.User(**USER)
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.unban_chat_member(chat_id=chat.id, user_id=user.id)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_restrict_chat_member(bot: Bot, event_loop):
""" restrictChatMember method test """
from .types.dataset import USER, CHAT
user = types.User(**USER)
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.restrict_chat_member(
chat_id=chat.id,
user_id=user.id,
permissions=types.ChatPermissions(
can_add_web_page_previews=False,
can_send_media_messages=False,
can_send_messages=False,
can_send_other_messages=False,
),
until_date=123,
)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_promote_chat_member(bot: Bot, event_loop):
""" promoteChatMember method test """
from .types.dataset import USER, CHAT
user = types.User(**USER)
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.promote_chat_member(
chat_id=chat.id,
user_id=user.id,
can_change_info=True,
can_delete_messages=True,
can_edit_messages=True,
can_invite_users=True,
can_pin_messages=True,
can_post_messages=True,
can_promote_members=True,
can_restrict_members=True,
)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_export_chat_invite_link(bot: Bot, event_loop):
""" exportChatInviteLink method test """
from .types.dataset import CHAT, INVITE_LINK
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=INVITE_LINK, loop=event_loop):
result = await bot.export_chat_invite_link(chat_id=chat.id)
assert result == INVITE_LINK
@pytest.mark.asyncio
async def test_delete_chat_photo(bot: Bot, event_loop):
""" deleteChatPhoto method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.delete_chat_photo(chat_id=chat.id)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_set_chat_title(bot: Bot, event_loop):
""" setChatTitle method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.set_chat_title(chat_id=chat.id, title="Test title")
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_set_chat_description(bot: Bot, event_loop):
""" setChatDescription method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.set_chat_description(chat_id=chat.id, description="Test description")
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_pin_chat_message(bot: Bot, event_loop):
""" pinChatMessage method test """
from .types.dataset import MESSAGE
message = types.Message(**MESSAGE)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.pin_chat_message(
chat_id=message.chat.id, message_id=message.message_id, disable_notification=False
)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_unpin_chat_message(bot: Bot, event_loop):
""" unpinChatMessage method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.unpin_chat_message(chat_id=chat.id)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_leave_chat(bot: Bot, event_loop):
""" leaveChat method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.leave_chat(chat_id=chat.id)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_get_chat(bot: Bot, event_loop):
""" getChat method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=CHAT, loop=event_loop):
result = await bot.get_chat(chat_id=chat.id)
assert result == chat
@pytest.mark.asyncio
async def test_get_chat_administrators(bot: Bot, event_loop):
""" getChatAdministrators method test """
from .types.dataset import CHAT, CHAT_MEMBER
chat = types.Chat(**CHAT)
member = types.ChatMember(**CHAT_MEMBER)
async with FakeTelegram(message_dict=[CHAT_MEMBER, CHAT_MEMBER], loop=event_loop):
result = await bot.get_chat_administrators(chat_id=chat.id)
assert result[0] == member
assert len(result) == 2
@pytest.mark.asyncio
async def test_get_chat_members_count(bot: Bot, event_loop):
""" getChatMembersCount method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
count = 5
async with FakeTelegram(message_dict=count, loop=event_loop):
result = await bot.get_chat_members_count(chat_id=chat.id)
assert result == count
@pytest.mark.asyncio
async def test_get_chat_member(bot: Bot, event_loop):
""" getChatMember method test """
from .types.dataset import CHAT, CHAT_MEMBER
chat = types.Chat(**CHAT)
member = types.ChatMember(**CHAT_MEMBER)
async with FakeTelegram(message_dict=CHAT_MEMBER, loop=event_loop):
result = await bot.get_chat_member(chat_id=chat.id, user_id=member.user.id)
assert isinstance(result, types.ChatMember)
assert result == member
@pytest.mark.asyncio
async def test_set_chat_sticker_set(bot: Bot, event_loop):
""" setChatStickerSet method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.set_chat_sticker_set(
chat_id=chat.id, sticker_set_name="aiogram_stickers"
)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_delete_chat_sticker_set(bot: Bot, event_loop):
""" setChatStickerSet method test """
from .types.dataset import CHAT
chat = types.Chat(**CHAT)
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.delete_chat_sticker_set(chat_id=chat.id)
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_answer_callback_query(bot: Bot, event_loop):
""" answerCallbackQuery method test """
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.answer_callback_query(callback_query_id="QuERyId", text="Test Answer")
assert isinstance(result, bool)
assert result is True
@pytest.mark.asyncio
async def test_edit_message_text_by_bot(bot: Bot, event_loop):
""" editMessageText method test """
from .types.dataset import EDITED_MESSAGE
msg = types.Message(**EDITED_MESSAGE)
# message by bot
async with FakeTelegram(message_dict=EDITED_MESSAGE, loop=event_loop):
result = await bot.edit_message_text(
text=msg.text, chat_id=msg.chat.id, message_id=msg.message_id
)
assert result == msg
@pytest.mark.asyncio
async def test_edit_message_text_by_user(bot: Bot, event_loop):
""" editMessageText method test """
from .types.dataset import EDITED_MESSAGE
msg = types.Message(**EDITED_MESSAGE)
# message by user
async with FakeTelegram(message_dict=True, loop=event_loop):
result = await bot.edit_message_text(
text=msg.text, chat_id=msg.chat.id, message_id=msg.message_id
)
assert isinstance(result, bool)
assert result is True

View file

@ -1,16 +0,0 @@
import pytest
from aiogram.bot.api import check_token
from aiogram.utils.exceptions import ValidationError
VALID_TOKEN = "123456789:AABBCCDDEEFFaabbccddeeff-1234567890"
INVALID_TOKEN = "123456789:AABBCCDDEEFFaabbccddeeff 123456789" # Space in token and wrong length
class Test_check_token:
def test_valid(self):
assert check_token(VALID_TOKEN) is True
def test_invalid_token(self):
with pytest.raises(ValidationError):
check_token(INVALID_TOKEN)

View file

@ -1,34 +0,0 @@
import pytest
from aiogram import Bot, Dispatcher
pytestmark = pytest.mark.asyncio
@pytest.yield_fixture()
async def bot(event_loop):
""" Bot fixture """
_bot = Bot(token="123456789:AABBCCDDEEFFaabbccddeeff-1234567890", loop=event_loop)
yield _bot
await _bot.close()
class TestDispatcherInit:
async def test_successful_init(self, bot):
"""
Success __init__ case
:param bot: bot instance
:type bot: Bot
"""
dp = Dispatcher(bot=bot)
assert isinstance(dp, Dispatcher)
@pytest.mark.parametrize("bot_instance", [None, Bot, 123, "abc"])
async def test_wrong_bot_instance(self, bot_instance):
"""
User provides wrong data to 'bot' argument.
:return: TypeError with reason
"""
with pytest.raises(TypeError):
_ = Dispatcher(bot=bot_instance)

View file

@ -1,20 +0,0 @@
import pytest
from aiogram.dispatcher.filters.builtin import Text
class TestText:
@pytest.mark.parametrize(
"param, key",
[
("text", "equals"),
("text_contains", "contains"),
("text_startswith", "startswith"),
("text_endswith", "endswith"),
],
)
def test_validate(self, param, key):
value = "spam and eggs"
config = {param: value}
res = Text.validate(config)
assert res == {key: value}

View file

@ -1,17 +0,0 @@
from aiogram.dispatcher.filters.state import StatesGroup
class TestStatesGroup:
def test_all_childs(self):
class InnerState1(StatesGroup):
pass
class InnerState2(InnerState1):
pass
class Form(StatesGroup):
inner1 = InnerState1
inner2 = InnerState2
form_childs = Form.all_childs
assert form_childs == (InnerState1, InnerState2)

View file

@ -1,264 +0,0 @@
import pytest
from aiogram.dispatcher.filters import Text
from aiogram.types import CallbackQuery, InlineQuery, Message, Poll
def data_sample_1():
return [
("", ""),
("", "exAmple_string"),
("example_string", "example_string"),
("example_string", "exAmple_string"),
("exAmple_string", "example_string"),
("example_string", "example_string_dsf"),
("example_string", "example_striNG_dsf"),
("example_striNG", "example_string_dsf"),
("example_string", "not_example_string"),
("example_string", "not_eXample_string"),
("EXample_string", "not_example_string"),
]
class TestTextFilter:
async def _run_check(self, check, test_text):
assert await check(Message(text=test_text))
assert await check(CallbackQuery(data=test_text))
assert await check(InlineQuery(query=test_text))
assert await check(Poll(question=test_text))
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize("test_prefix, test_text", data_sample_1())
async def test_startswith(self, test_prefix, test_text, ignore_case):
test_filter = Text(startswith=test_prefix, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_prefix = test_prefix.lower()
_test_text = test_text.lower()
else:
_test_prefix = test_prefix
_test_text = test_text
return result is _test_text.startswith(_test_prefix)
await self._run_check(check, test_text)
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize(
"test_prefix_list, test_text",
[
(["not_example", ""], ""),
(["", "not_example"], "exAmple_string"),
(["not_example", "example_string"], "example_string"),
(["example_string", "not_example"], "exAmple_string"),
(["not_example", "exAmple_string"], "example_string"),
(["not_example", "example_string"], "example_string_dsf"),
(["example_string", "not_example"], "example_striNG_dsf"),
(["not_example", "example_striNG"], "example_string_dsf"),
(["not_example", "example_string"], "not_example_string"),
(["example_string", "not_example"], "not_eXample_string"),
(["not_example", "EXample_string"], "not_example_string"),
],
)
async def test_startswith_list(self, test_prefix_list, test_text, ignore_case):
test_filter = Text(startswith=test_prefix_list, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_prefix_list = map(str.lower, test_prefix_list)
_test_text = test_text.lower()
else:
_test_prefix_list = test_prefix_list
_test_text = test_text
return result is any(map(_test_text.startswith, _test_prefix_list))
await self._run_check(check, test_text)
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize("test_postfix, test_text", data_sample_1())
async def test_endswith(self, test_postfix, test_text, ignore_case):
test_filter = Text(endswith=test_postfix, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_postfix = test_postfix.lower()
_test_text = test_text.lower()
else:
_test_postfix = test_postfix
_test_text = test_text
return result is _test_text.endswith(_test_postfix)
await self._run_check(check, test_text)
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize(
"test_postfix_list, test_text",
[
(["", "not_example"], ""),
(["not_example", ""], "exAmple_string"),
(["example_string", "not_example"], "example_string"),
(["not_example", "example_string"], "exAmple_string"),
(["exAmple_string", "not_example"], "example_string"),
(["not_example", "example_string"], "example_string_dsf"),
(["example_string", "not_example"], "example_striNG_dsf"),
(["not_example", "example_striNG"], "example_string_dsf"),
(["not_example", "example_string"], "not_example_string"),
(["example_string", "not_example"], "not_eXample_string"),
(["not_example", "EXample_string"], "not_example_string"),
],
)
async def test_endswith_list(self, test_postfix_list, test_text, ignore_case):
test_filter = Text(endswith=test_postfix_list, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_postfix_list = map(str.lower, test_postfix_list)
_test_text = test_text.lower()
else:
_test_postfix_list = test_postfix_list
_test_text = test_text
return result is any(map(_test_text.endswith, _test_postfix_list))
await self._run_check(check, test_text)
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize(
"test_string, test_text",
[
("", ""),
("", "exAmple_string"),
("example_string", "example_string"),
("example_string", "exAmple_string"),
("exAmple_string", "example_string"),
("example_string", "example_string_dsf"),
("example_string", "example_striNG_dsf"),
("example_striNG", "example_string_dsf"),
("example_string", "not_example_strin"),
("example_string", "not_eXample_strin"),
("EXample_string", "not_example_strin"),
],
)
async def test_contains(self, test_string, test_text, ignore_case):
test_filter = Text(contains=test_string, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_string = test_string.lower()
_test_text = test_text.lower()
else:
_test_string = test_string
_test_text = test_text
return result is (_test_string in _test_text)
await self._run_check(check, test_text)
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize(
"test_filter_list, test_text",
[
(["a", "ab", "abc"], "A"),
(["a", "ab", "abc"], "ab"),
(["a", "ab", "abc"], "aBc"),
(["a", "ab", "abc"], "d"),
],
)
async def test_contains_list(self, test_filter_list, test_text, ignore_case):
test_filter = Text(contains=test_filter_list, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_filter_list = list(map(str.lower, test_filter_list))
_test_text = test_text.lower()
else:
_test_filter_list = test_filter_list
_test_text = test_text
return result is all(map(_test_text.__contains__, _test_filter_list))
await self._run_check(check, test_text)
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize(
"test_filter_text, test_text",
[
("", ""),
("", "exAmple_string"),
("example_string", "example_string"),
("example_string", "exAmple_string"),
("exAmple_string", "example_string"),
("example_string", "not_example_string"),
("example_string", "not_eXample_string"),
("EXample_string", "not_example_string"),
],
)
async def test_equals_string(self, test_filter_text, test_text, ignore_case):
test_filter = Text(equals=test_filter_text, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_filter_text = test_filter_text.lower()
_test_text = test_text.lower()
else:
_test_filter_text = test_filter_text
_test_text = test_text
return result is (_test_text == _test_filter_text)
await self._run_check(check, test_text)
@pytest.mark.asyncio
@pytest.mark.parametrize("ignore_case", (True, False))
@pytest.mark.parametrize(
"test_filter_list, test_text",
[
(["new_string", ""], ""),
(["", "new_string"], "exAmple_string"),
(["example_string"], "example_string"),
(["example_string"], "exAmple_string"),
(["exAmple_string"], "example_string"),
(["example_string"], "not_example_string"),
(["example_string"], "not_eXample_string"),
(["EXample_string"], "not_example_string"),
(["example_string", "new_string"], "example_string"),
(["new_string", "example_string"], "exAmple_string"),
(["exAmple_string", "new_string"], "example_string"),
(["example_string", "new_string"], "not_example_string"),
(["new_string", "example_string"], "not_eXample_string"),
(["EXample_string", "new_string"], "not_example_string"),
],
)
async def test_equals_list(self, test_filter_list, test_text, ignore_case):
test_filter = Text(equals=test_filter_list, ignore_case=ignore_case)
async def check(obj):
result = await test_filter.check(obj)
if ignore_case:
_test_filter_list = list(map(str.lower, test_filter_list))
_test_text = test_text.lower()
else:
_test_filter_list = test_filter_list
_test_text = test_text
assert result is (_test_text in _test_filter_list)
await check(Message(text=test_text))
await check(CallbackQuery(data=test_text))
await check(InlineQuery(query=test_text))
await check(Poll(question=test_text))

View file

@ -1,49 +0,0 @@
from asyncio import BaseEventLoop
import pytest
from aiogram import Bot, types
from . import TOKEN, FakeTelegram
pytestmark = pytest.mark.asyncio
@pytest.yield_fixture()
async def bot(event_loop):
""" Bot fixture """
_bot = Bot(TOKEN, loop=event_loop, parse_mode=types.ParseMode.HTML)
yield _bot
await _bot.close()
@pytest.yield_fixture()
async def message(bot, event_loop):
"""
Message fixture
:param bot: Telegram bot fixture
:type bot: Bot
:param event_loop: asyncio event loop
:type event_loop: BaseEventLoop
"""
from .types.dataset import MESSAGE
msg = types.Message(**MESSAGE)
async with FakeTelegram(message_dict=MESSAGE, loop=event_loop):
_message = await bot.send_message(chat_id=msg.chat.id, text=msg.text)
yield _message
class TestMiscCases:
async def test_calling_bot_not_from_context(self, message):
"""
Calling any helper method without bot instance in context.
:param message: message fixture
:type message: types.Message
:return: RuntimeError with reason and help
"""
with pytest.raises(RuntimeError):
await message.edit_text("test_calling_bot_not_from_context")

10
tests/test_nothing.py Normal file
View file

@ -0,0 +1,10 @@
import asyncio
import pytest
class TestNothing:
@pytest.mark.asyncio
async def test_nothing(self):
result = await asyncio.sleep(1, result=42)
assert result == 42

View file

@ -1,114 +0,0 @@
import pytest
from aiogram.dispatcher.filters.state import State, StatesGroup, any_state, default_state
class MyGroup(StatesGroup):
state = State()
state_1 = State()
state_2 = State()
class MySubGroup(StatesGroup):
sub_state = State()
sub_state_1 = State()
sub_state_2 = State()
in_custom_group = State(group_name="custom_group")
class NewGroup(StatesGroup):
spam = State()
renamed_state = State(state="spam_state")
alone_state = State("alone")
alone_in_group = State("alone", group_name="home")
def test_default_state():
assert default_state.state is None
def test_any_state():
assert any_state.state == "*"
def test_alone_state():
assert alone_state.state == "@:alone"
assert alone_in_group.state == "home:alone"
def test_group_names():
assert MyGroup.__group_name__ == "MyGroup"
assert MyGroup.__full_group_name__ == "MyGroup"
assert MyGroup.MySubGroup.__group_name__ == "MySubGroup"
assert MyGroup.MySubGroup.__full_group_name__ == "MyGroup.MySubGroup"
assert MyGroup.MySubGroup.NewGroup.__group_name__ == "NewGroup"
assert MyGroup.MySubGroup.NewGroup.__full_group_name__ == "MyGroup.MySubGroup.NewGroup"
def test_custom_group_in_group():
assert MyGroup.MySubGroup.in_custom_group.state == "custom_group:in_custom_group"
def test_custom_state_name_in_group():
assert (
MyGroup.MySubGroup.NewGroup.renamed_state.state == "MyGroup.MySubGroup.NewGroup:spam_state"
)
def test_group_states_names():
assert len(MyGroup.states) == 3
assert len(MyGroup.all_states) == 9
assert MyGroup.states_names == ("MyGroup:state", "MyGroup:state_1", "MyGroup:state_2")
assert MyGroup.MySubGroup.states_names == (
"MyGroup.MySubGroup:sub_state",
"MyGroup.MySubGroup:sub_state_1",
"MyGroup.MySubGroup:sub_state_2",
"custom_group:in_custom_group",
)
assert MyGroup.MySubGroup.NewGroup.states_names == (
"MyGroup.MySubGroup.NewGroup:spam",
"MyGroup.MySubGroup.NewGroup:spam_state",
)
assert MyGroup.all_states_names == (
"MyGroup:state",
"MyGroup:state_1",
"MyGroup:state_2",
"MyGroup.MySubGroup:sub_state",
"MyGroup.MySubGroup:sub_state_1",
"MyGroup.MySubGroup:sub_state_2",
"custom_group:in_custom_group",
"MyGroup.MySubGroup.NewGroup:spam",
"MyGroup.MySubGroup.NewGroup:spam_state",
)
assert MyGroup.MySubGroup.all_states_names == (
"MyGroup.MySubGroup:sub_state",
"MyGroup.MySubGroup:sub_state_1",
"MyGroup.MySubGroup:sub_state_2",
"custom_group:in_custom_group",
"MyGroup.MySubGroup.NewGroup:spam",
"MyGroup.MySubGroup.NewGroup:spam_state",
)
assert MyGroup.MySubGroup.NewGroup.all_states_names == (
"MyGroup.MySubGroup.NewGroup:spam",
"MyGroup.MySubGroup.NewGroup:spam_state",
)
def test_root_element():
root = MyGroup.MySubGroup.NewGroup.spam.get_root()
assert issubclass(root, StatesGroup)
assert root == MyGroup
assert root == MyGroup.state.get_root()
assert root == MyGroup.MySubGroup.get_root()
with pytest.raises(RuntimeError):
any_state.get_root()

View file

@ -1,45 +0,0 @@
import pytest
from aiogram.utils.auth_widget import check_integrity, check_token, generate_hash
TOKEN = "123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11"
@pytest.fixture
def data():
return {
"id": "42",
"first_name": "John",
"last_name": "Smith",
"username": "username",
"photo_url": "https://t.me/i/userpic/320/picname.jpg",
"auth_date": "1565810688",
"hash": "c303db2b5a06fe41d23a9b14f7c545cfc11dcc7473c07c9c5034ae60062461ce",
}
def test_generate_hash(data):
res = generate_hash(data, TOKEN)
assert res == data["hash"]
class Test_check_token:
"""
This case gonna be deleted
"""
def test_ok(self, data):
assert check_token(data, TOKEN) is True
def test_fail(self, data):
data.pop("username")
assert check_token(data, TOKEN) is False
class Test_check_integrity:
def test_ok(self, data):
assert check_integrity(TOKEN, data) is True
def test_fail(self, data):
data.pop("username")
assert check_integrity(TOKEN, data) is False

View file

@ -1,21 +0,0 @@
from aiogram.utils.helper import Item, ListItem, OrderedHelper
class TestOrderedHelper:
def test_items_are_ordered(self):
class Helper(OrderedHelper):
A = Item()
D = Item()
C = Item()
B = Item()
assert Helper.all() == ["A", "D", "C", "B"]
def test_list_items_are_ordered(self):
class Helper(OrderedHelper):
A = ListItem()
D = ListItem()
C = ListItem()
B = ListItem()
assert Helper.all() == ["A", "D", "C", "B"]

View file

@ -1,402 +0,0 @@
""""
Dict data set for Telegram message types
"""
USER = {
"id": 12345678,
"is_bot": False,
"first_name": "FirstName",
"last_name": "LastName",
"username": "username",
"language_code": "ru",
}
CHAT = {
"id": 12345678,
"first_name": "FirstName",
"last_name": "LastName",
"username": "username",
"type": "private",
}
PHOTO = {
"file_id": "AgADBAADFak0G88YZAf8OAug7bHyS9x2ZxkABHVfpJywcloRAAGAAQABAg",
"file_size": 1101,
"width": 90,
"height": 51,
}
AUDIO = {
"duration": 236,
"mime_type": "audio/mpeg3",
"title": "The Best Song",
"performer": "The Best Singer",
"file_id": "CQADAgADbQEAAsnrIUpNoRRNsH7_hAI",
"file_size": 9507774,
}
CHAT_MEMBER = {
"user": USER,
"status": "administrator",
"can_be_edited": False,
"can_change_info": True,
"can_delete_messages": True,
"can_invite_users": True,
"can_restrict_members": True,
"can_pin_messages": True,
"can_promote_members": False,
}
CONTACT = {"phone_number": "88005553535", "first_name": "John", "last_name": "Smith"}
DOCUMENT = {
"file_name": "test.docx",
"mime_type": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"file_id": "BQADAgADpgADy_JxS66XQTBRHFleAg",
"file_size": 21331,
}
ANIMATION = {
"file_name": "a9b0e0ca537aa344338f80978f0896b7.gif.mp4",
"mime_type": "video/mp4",
"thumb": PHOTO,
"file_id": "CgADBAAD4DUAAoceZAe2WiE9y0crrAI",
"file_size": 65837,
}
ENTITY_BOLD = {"offset": 5, "length": 2, "type": "bold"}
ENTITY_ITALIC = {"offset": 8, "length": 1, "type": "italic"}
ENTITY_LINK = {"offset": 10, "length": 6, "type": "text_link", "url": "http://google.com/"}
ENTITY_CODE = {"offset": 17, "length": 7, "type": "code"}
ENTITY_PRE = {"offset": 30, "length": 4, "type": "pre"}
ENTITY_MENTION = {"offset": 47, "length": 9, "type": "mention"}
GAME = {
"title": "Karate Kido",
"description": "No trees were harmed in the making of this game :)",
"photo": [PHOTO, PHOTO, PHOTO],
"animation": ANIMATION,
}
INVOICE = {
"title": "Working Time Machine",
"description": "Want to visit your great-great-great-grandparents? "
"Make a fortune at the races? "
"Shake hands with Hammurabi and take a stroll in the Hanging Gardens? "
"Order our Working Time Machine today!",
"start_parameter": "time-machine-example",
"currency": "USD",
"total_amount": 6250,
}
LOCATION = {"latitude": 50.693416, "longitude": 30.624605}
VENUE = {
"location": LOCATION,
"title": "Venue Name",
"address": "Venue Address",
"foursquare_id": "4e6f2cec483bad563d150f98",
}
SHIPPING_ADDRESS = {
"country_code": "US",
"state": "State",
"city": "DefaultCity",
"street_line1": "Central",
"street_line2": "Middle",
"post_code": "424242",
}
STICKER = {
"width": 512,
"height": 512,
"emoji": "🛠",
"set_name": "StickerSet",
"thumb": {
"file_id": "AAbbCCddEEffGGhh1234567890",
"file_size": 1234,
"width": 128,
"height": 128,
},
"file_id": "AAbbCCddEEffGGhh1234567890",
"file_size": 12345,
}
SUCCESSFUL_PAYMENT = {
"currency": "USD",
"total_amount": 6250,
"invoice_payload": "HAPPY FRIDAYS COUPON",
"telegram_payment_charge_id": "_",
"provider_payment_charge_id": "12345678901234_test",
}
VIDEO = {
"duration": 52,
"width": 853,
"height": 480,
"mime_type": "video/quicktime",
"thumb": PHOTO,
"file_id": "BAADAgpAADdawy_JxS72kRvV3cortAg",
"file_size": 10099782,
}
VIDEO_NOTE = {
"duration": 4,
"length": 240,
"thumb": PHOTO,
"file_id": "AbCdEfGhIjKlMnOpQrStUvWxYz",
"file_size": 186562,
}
VOICE = {
"duration": 1,
"mime_type": "audio/ogg",
"file_id": "AwADawAgADADy_JxS2gopIVIIxlhAg",
"file_size": 4321,
}
CALLBACK_QUERY = {}
CHANNEL_POST = {}
CHOSEN_INLINE_RESULT = {}
EDITED_CHANNEL_POST = {}
EDITED_MESSAGE = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508825372,
"edit_date": 1508825379,
"text": "hi there (edited)",
}
FORWARDED_MESSAGE = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1522828529,
"forward_from_chat": CHAT,
"forward_from_message_id": 123,
"forward_date": 1522749037,
"text": "Forwarded text with entities from public channel ",
"entities": [
ENTITY_BOLD,
ENTITY_CODE,
ENTITY_ITALIC,
ENTITY_LINK,
ENTITY_LINK,
ENTITY_MENTION,
ENTITY_PRE,
],
}
INLINE_QUERY = {}
MESSAGE = {
"message_id": 11223,
"from": USER,
"chat": CHAT,
"date": 1508709711,
"text": "Hi, world!",
}
MESSAGE_WITH_AUDIO = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508739776,
"audio": AUDIO,
"caption": "This is my favourite song",
}
MESSAGE_WITH_AUTHOR_SIGNATURE = {}
MESSAGE_WITH_CHANNEL_CHAT_CREATED = {}
MESSAGE_WITH_CONTACT = {
"message_id": 56006,
"from": USER,
"chat": CHAT,
"date": 1522850298,
"contact": CONTACT,
}
MESSAGE_WITH_DELETE_CHAT_PHOTO = {}
MESSAGE_WITH_DOCUMENT = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508768012,
"document": DOCUMENT,
"caption": "Read my document",
}
MESSAGE_WITH_EDIT_DATE = {}
MESSAGE_WITH_ENTITIES = {}
MESSAGE_WITH_GAME = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508824810,
"game": GAME,
}
MESSAGE_WITH_GROUP_CHAT_CREATED = {}
MESSAGE_WITH_INVOICE = {
"message_id": 9772,
"from": USER,
"chat": CHAT,
"date": 1508761719,
"invoice": INVOICE,
}
MESSAGE_WITH_LEFT_CHAT_MEMBER = {}
MESSAGE_WITH_LOCATION = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508755473,
"location": LOCATION,
}
MESSAGE_WITH_MIGRATE_TO_CHAT_ID = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1526943253,
"migrate_to_chat_id": -1234567890987,
}
MESSAGE_WITH_MIGRATE_FROM_CHAT_ID = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1526943253,
"migrate_from_chat_id": -123456789,
}
MESSAGE_WITH_NEW_CHAT_MEMBERS = {}
MESSAGE_WITH_NEW_CHAT_PHOTO = {}
MESSAGE_WITH_NEW_CHAT_TITLE = {}
MESSAGE_WITH_PHOTO = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508825154,
"photo": [PHOTO, PHOTO, PHOTO, PHOTO],
"caption": "photo description",
}
MESSAGE_WITH_MEDIA_GROUP = {
"message_id": 55966,
"from": USER,
"chat": CHAT,
"date": 1522843665,
"media_group_id": "12182749320567362",
"photo": [PHOTO, PHOTO, PHOTO, PHOTO],
}
MESSAGE_WITH_PINNED_MESSAGE = {}
MESSAGE_WITH_REPLY_TO_MESSAGE = {}
MESSAGE_WITH_STICKER = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508771450,
"sticker": STICKER,
}
MESSAGE_WITH_SUCCESSFUL_PAYMENT = {
"message_id": 9768,
"from": USER,
"chat": CHAT,
"date": 1508761169,
"successful_payment": SUCCESSFUL_PAYMENT,
}
MESSAGE_WITH_SUPERGROUP_CHAT_CREATED = {}
MESSAGE_WITH_VENUE = {
"message_id": 56004,
"from": USER,
"chat": CHAT,
"date": 1522849819,
"location": LOCATION,
"venue": VENUE,
}
MESSAGE_WITH_VIDEO = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508756494,
"video": VIDEO,
"caption": "description",
}
MESSAGE_WITH_VIDEO_NOTE = {
"message_id": 55934,
"from": USER,
"chat": CHAT,
"date": 1522835890,
"video_note": VIDEO_NOTE,
}
MESSAGE_WITH_VOICE = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508768403,
"voice": VOICE,
}
PRE_CHECKOUT_QUERY = {
"id": "262181558630368727",
"from": USER,
"currency": "USD",
"total_amount": 6250,
"invoice_payload": "HAPPY FRIDAYS COUPON",
}
REPLY_MESSAGE = {
"message_id": 12345,
"from": USER,
"chat": CHAT,
"date": 1508751866,
"reply_to_message": MESSAGE,
"text": "Reply to quoted message",
}
SHIPPING_QUERY = {
"id": "262181558684397422",
"from": USER,
"invoice_payload": "HAPPY FRIDAYS COUPON",
"shipping_address": SHIPPING_ADDRESS,
}
USER_PROFILE_PHOTOS = {"total_count": 1, "photos": [[PHOTO, PHOTO, PHOTO]]}
FILE = {"file_id": "XXXYYYZZZ", "file_size": 5254, "file_path": "voice/file_8"}
INVITE_LINK = "https://t.me/joinchat/AbCdEfjKILDADwdd123"
UPDATE = {"update_id": 123456789, "message": MESSAGE}
WEBHOOK_INFO = {"url": "", "has_custom_certificate": False, "pending_update_count": 0}

View file

@ -1,40 +0,0 @@
from aiogram import types
from .dataset import ANIMATION
animation = types.Animation(**ANIMATION)
def test_export():
exported = animation.to_python()
assert isinstance(exported, dict)
assert exported == ANIMATION
def test_file_name():
assert isinstance(animation.file_name, str)
assert animation.file_name == ANIMATION["file_name"]
def test_mime_type():
assert isinstance(animation.mime_type, str)
assert animation.mime_type == ANIMATION["mime_type"]
def test_file_id():
assert isinstance(animation.file_id, str)
# assert hash(animation) == ANIMATION['file_id']
assert animation.file_id == ANIMATION["file_id"]
def test_file_size():
assert isinstance(animation.file_size, int)
assert animation.file_size == ANIMATION["file_size"]
def test_thumb():
assert isinstance(animation.thumb, types.PhotoSize)
assert animation.thumb.file_id == ANIMATION["thumb"]["file_id"]
assert animation.thumb.width == ANIMATION["thumb"]["width"]
assert animation.thumb.height == ANIMATION["thumb"]["height"]
assert animation.thumb.file_size == ANIMATION["thumb"]["file_size"]

View file

@ -1,63 +0,0 @@
from aiogram import types
from .dataset import CHAT
chat = types.Chat(**CHAT)
def test_export():
exported = chat.to_python()
assert isinstance(exported, dict)
assert exported == CHAT
def test_id():
assert isinstance(chat.id, int)
assert chat.id == CHAT["id"]
# assert hash(chat) == CHAT['id']
def test_name():
assert isinstance(chat.first_name, str)
assert chat.first_name == CHAT["first_name"]
assert isinstance(chat.last_name, str)
assert chat.last_name == CHAT["last_name"]
assert isinstance(chat.username, str)
assert chat.username == CHAT["username"]
def test_type():
assert isinstance(chat.type, str)
assert chat.type == CHAT["type"]
def test_chat_types():
assert types.ChatType.PRIVATE == "private"
assert types.ChatType.GROUP == "group"
assert types.ChatType.SUPER_GROUP == "supergroup"
assert types.ChatType.CHANNEL == "channel"
def test_chat_type_filters():
from . import test_message
assert types.ChatType.is_private(test_message.message)
assert not types.ChatType.is_group(test_message.message)
assert not types.ChatType.is_super_group(test_message.message)
assert not types.ChatType.is_group_or_super_group(test_message.message)
assert not types.ChatType.is_channel(test_message.message)
def test_chat_actions():
assert types.ChatActions.TYPING == "typing"
assert types.ChatActions.UPLOAD_PHOTO == "upload_photo"
assert types.ChatActions.RECORD_VIDEO == "record_video"
assert types.ChatActions.UPLOAD_VIDEO == "upload_video"
assert types.ChatActions.RECORD_AUDIO == "record_audio"
assert types.ChatActions.UPLOAD_AUDIO == "upload_audio"
assert types.ChatActions.UPLOAD_DOCUMENT == "upload_document"
assert types.ChatActions.FIND_LOCATION == "find_location"
assert types.ChatActions.RECORD_VIDEO_NOTE == "record_video_note"
assert types.ChatActions.UPLOAD_VIDEO_NOTE == "upload_video_note"

View file

@ -1,78 +0,0 @@
from aiogram import types
from .dataset import CHAT_MEMBER
chat_member = types.ChatMember(**CHAT_MEMBER)
def test_export():
exported = chat_member.to_python()
assert isinstance(exported, dict)
assert exported == CHAT_MEMBER
def test_user():
assert isinstance(chat_member.user, types.User)
def test_status():
assert isinstance(chat_member.status, str)
assert chat_member.status == CHAT_MEMBER["status"]
def test_privileges():
assert isinstance(chat_member.can_be_edited, bool)
assert chat_member.can_be_edited == CHAT_MEMBER["can_be_edited"]
assert isinstance(chat_member.can_change_info, bool)
assert chat_member.can_change_info == CHAT_MEMBER["can_change_info"]
assert isinstance(chat_member.can_delete_messages, bool)
assert chat_member.can_delete_messages == CHAT_MEMBER["can_delete_messages"]
assert isinstance(chat_member.can_invite_users, bool)
assert chat_member.can_invite_users == CHAT_MEMBER["can_invite_users"]
assert isinstance(chat_member.can_restrict_members, bool)
assert chat_member.can_restrict_members == CHAT_MEMBER["can_restrict_members"]
assert isinstance(chat_member.can_pin_messages, bool)
assert chat_member.can_pin_messages == CHAT_MEMBER["can_pin_messages"]
assert isinstance(chat_member.can_promote_members, bool)
assert chat_member.can_promote_members == CHAT_MEMBER["can_promote_members"]
def test_int():
assert int(chat_member) == chat_member.user.id
assert isinstance(int(chat_member), int)
def test_chat_member_status():
assert types.ChatMemberStatus.CREATOR == "creator"
assert types.ChatMemberStatus.ADMINISTRATOR == "administrator"
assert types.ChatMemberStatus.MEMBER == "member"
assert types.ChatMemberStatus.RESTRICTED == "restricted"
assert types.ChatMemberStatus.LEFT == "left"
assert types.ChatMemberStatus.KICKED == "kicked"
def test_chat_member_status_filters():
assert types.ChatMemberStatus.is_chat_admin(chat_member.status)
assert types.ChatMemberStatus.is_chat_member(chat_member.status)
assert types.ChatMemberStatus.is_chat_admin(types.ChatMemberStatus.CREATOR)
assert types.ChatMemberStatus.is_chat_admin(types.ChatMemberStatus.ADMINISTRATOR)
assert types.ChatMemberStatus.is_chat_member(types.ChatMemberStatus.CREATOR)
assert types.ChatMemberStatus.is_chat_member(types.ChatMemberStatus.ADMINISTRATOR)
assert types.ChatMemberStatus.is_chat_member(types.ChatMemberStatus.MEMBER)
assert types.ChatMemberStatus.is_chat_member(types.ChatMemberStatus.RESTRICTED)
assert not types.ChatMemberStatus.is_chat_member(types.ChatMemberStatus.LEFT)
assert not types.ChatMemberStatus.is_chat_member(types.ChatMemberStatus.KICKED)
def test_chat_member_filters():
assert chat_member.is_chat_admin()
assert chat_member.is_chat_member()

View file

@ -1,36 +0,0 @@
from aiogram import types
from .dataset import DOCUMENT
document = types.Document(**DOCUMENT)
def test_export():
exported = document.to_python()
assert isinstance(exported, dict)
assert exported == DOCUMENT
def test_file_name():
assert isinstance(document.file_name, str)
assert document.file_name == DOCUMENT["file_name"]
def test_mime_type():
assert isinstance(document.mime_type, str)
assert document.mime_type == DOCUMENT["mime_type"]
def test_file_id():
assert isinstance(document.file_id, str)
# assert hash(document) == DOCUMENT['file_id']
assert document.file_id == DOCUMENT["file_id"]
def test_file_size():
assert isinstance(document.file_size, int)
assert document.file_size == DOCUMENT["file_size"]
def test_thumb():
assert document.thumb is None

View file

@ -1,31 +0,0 @@
from aiogram import types
from .dataset import GAME
game = types.Game(**GAME)
def test_export():
exported = game.to_python()
assert isinstance(exported, dict)
assert exported == GAME
def test_title():
assert isinstance(game.title, str)
assert game.title == GAME["title"]
def test_description():
assert isinstance(game.description, str)
assert game.description == GAME["description"]
def test_photo():
assert isinstance(game.photo, list)
assert len(game.photo) == len(GAME["photo"])
assert all(map(lambda t: isinstance(t, types.PhotoSize), game.photo))
def test_animation():
assert isinstance(game.animation, types.Animation)

View file

@ -1,40 +0,0 @@
import datetime
from aiogram import types
from .dataset import MESSAGE
message = types.Message(**MESSAGE)
def test_export():
exported_chat = message.to_python()
assert isinstance(exported_chat, dict)
assert exported_chat == MESSAGE
def test_message_id():
# assert hash(message) == MESSAGE['message_id']
assert message.message_id == MESSAGE["message_id"]
assert message["message_id"] == MESSAGE["message_id"]
def test_from():
assert isinstance(message.from_user, types.User)
assert message.from_user == message["from"]
def test_chat():
assert isinstance(message.chat, types.Chat)
assert message.chat == message["chat"]
def test_date():
assert isinstance(message.date, datetime.datetime)
assert int(message.date.timestamp()) == MESSAGE["date"]
assert message.date == message["date"]
def test_text():
assert message.text == MESSAGE["text"]
assert message["text"] == MESSAGE["text"]

View file

@ -1,28 +0,0 @@
from aiogram import types
from .dataset import PHOTO
photo = types.PhotoSize(**PHOTO)
def test_export():
exported = photo.to_python()
assert isinstance(exported, dict)
assert exported == PHOTO
def test_file_id():
assert isinstance(photo.file_id, str)
assert photo.file_id == PHOTO["file_id"]
def test_file_size():
assert isinstance(photo.file_size, int)
assert photo.file_size == PHOTO["file_size"]
def test_size():
assert isinstance(photo.width, int)
assert isinstance(photo.height, int)
assert photo.width == PHOTO["width"]
assert photo.height == PHOTO["height"]

View file

@ -1,21 +0,0 @@
from aiogram import types
from .dataset import UPDATE
update = types.Update(**UPDATE)
def test_export():
exported = update.to_python()
assert isinstance(exported, dict)
assert exported == UPDATE
def test_update_id():
assert isinstance(update.update_id, int)
# assert hash(update) == UPDATE['update_id']
assert update.update_id == UPDATE["update_id"]
def test_message():
assert isinstance(update.message, types.Message)

View file

@ -1,51 +0,0 @@
from babel import Locale
from aiogram import types
from .dataset import USER
user = types.User(**USER)
def test_export():
exported = user.to_python()
assert isinstance(exported, dict)
assert exported == USER
def test_id():
assert isinstance(user.id, int)
assert user.id == USER["id"]
# assert hash(user) == USER['id']
def test_bot():
assert isinstance(user.is_bot, bool)
assert user.is_bot == USER["is_bot"]
def test_name():
assert user.first_name == USER["first_name"]
assert user.last_name == USER["last_name"]
assert user.username == USER["username"]
def test_language_code():
assert user.language_code == USER["language_code"]
assert user.locale == Locale.parse(USER["language_code"], sep="-")
def test_full_name():
assert user.full_name == f"{USER['first_name']} {USER['last_name']}"
def test_mention():
assert user.mention == f"@{USER['username']}"
assert user.get_mention("foo", as_html=False) == f"[foo](tg://user?id={USER['id']})"
assert (
user.get_mention("foo", as_html=True) == f"<a href=\"tg://user?id={USER['id']}\">foo</a>"
)
def test_url():
assert user.url == f"tg://user?id={USER['id']}"