Implemented features from not announced Telegram Bot API update: sendMediaGroup + InputMedia* + other small changes

This commit is contained in:
Alex Root Junior 2017-11-17 23:01:17 +02:00
parent 7025542e3c
commit f11775fcef
5 changed files with 200 additions and 0 deletions

View file

@ -186,6 +186,7 @@ class Methods(Helper):
SEND_VIDEO = Item() # sendVideo
SEND_VOICE = Item() # sendVoice
SEND_VIDEO_NOTE = Item() # sendVideoNote
SEND_MEDIA_GROUP = Item() # sendMediaGroup
SEND_LOCATION = Item() # sendLocation
EDIT_MESSAGE_LIVE_LOCATION = Item() # editMessageLiveLocation
STOP_MESSAGE_LIVE_LOCATION = Item() # stopMessageLiveLocation

View file

@ -475,6 +475,40 @@ class Bot(BaseBot):
return types.Message(**result)
async def send_media_group(self, chat_id: typing.Union[base.Integer, base.String],
media: typing.Union[types.MediaGroup, typing.List],
disable_notification: typing.Union[base.Boolean, None] = None,
reply_to_message_id: typing.Union[base.Integer,
None] = None) -> typing.List[types.Message]:
"""
Use this method to send a group of photos or videos as an album.
Source: https://core.telegram.org/bots/api#sendmediagroup
:param chat_id: Unique identifier for the target chat or username of the target channel
:type chat_id: :obj:`typing.Union[base.Integer, base.String]`
:param media: A JSON-serialized array describing photos and videos to be sent
:type media: :obj:`typing.Union[types.MediaGroup, typing.List]`
:param disable_notification: Sends the message silently. Users will receive a notification with no sound.
:type disable_notification: :obj:`typing.Union[base.Boolean, None]`
:param reply_to_message_id: If the message is a reply, ID of the original message
:type reply_to_message_id: :obj:`typing.Union[base.Integer, None]`
:return: On success, an array of the sent Messages is returned.
:rtype: typing.List[types.Message]
"""
# Convert list to MediaGroup
if isinstance(media, list):
media = types.MediaGroup(media)
# Extract files
files = media.get_files()
media = prepare_arg(media)
payload = generate_payload(**locals(), exclude=['files'])
result = await self.request(api.Methods.SEND_MEDIA_GROUP, payload, files)
return [types.Message(**message) for message in result]
async def send_location(self, chat_id: typing.Union[base.Integer, base.String], latitude: base.Float,
longitude: base.Float, live_period: typing.Union[base.Integer, None] = None,
disable_notification: typing.Union[base.Boolean, None] = None,
@ -1529,6 +1563,7 @@ class Bot(BaseBot):
async def send_invoice(self, chat_id: base.Integer, title: base.String, description: base.String,
payload: base.String, provider_token: base.String, start_parameter: base.String,
currency: base.String, prices: typing.List[types.LabeledPrice],
provider_data: typing.Union[typing.Dict, None] = None,
photo_url: typing.Union[base.String, None] = None,
photo_size: typing.Union[base.Integer, None] = None,
photo_width: typing.Union[base.Integer, None] = None,
@ -1565,6 +1600,8 @@ class Bot(BaseBot):
:param prices: Price breakdown, a list of components
(e.g. product price, tax, discount, delivery cost, delivery tax, bonus, etc.)
:type prices: :obj:`typing.List[types.LabeledPrice]`
:param provider_data: JSON-encoded data about the invoice, which will be shared with the payment provider.
:type provider_data: :obj:`typing.Union[typing.Dict, None]`
:param photo_url: URL of the product photo for the invoice.
:type photo_url: :obj:`typing.Union[base.String, None]`
:param photo_size: Photo size

View file

@ -23,6 +23,7 @@ from .inline_query_result import InlineQueryResult, InlineQueryResultArticle, In
InlineQueryResultGame, InlineQueryResultGif, InlineQueryResultLocation, InlineQueryResultMpeg4Gif, \
InlineQueryResultPhoto, InlineQueryResultVenue, InlineQueryResultVideo, InlineQueryResultVoice
from .input_file import InputFile
from .input_media import InputMediaPhoto, InputMediaVideo, MediaGroup
from .input_message_content import InputContactMessageContent, InputLocationMessageContent, InputMessageContent, \
InputTextMessageContent, InputVenueMessageContent
from .invoice import Invoice
@ -97,6 +98,8 @@ __all__ = (
'InlineQueryResultVoice',
'InputContactMessageContent',
'InputFile',
'InputMediaPhoto',
'InputMediaVideo',
'InputLocationMessageContent',
'InputMessageContent',
'InputTextMessageContent',
@ -106,6 +109,7 @@ __all__ = (
'LabeledPrice',
'Location',
'MaskPosition',
'MediaGroup',
'Message',
'MessageEntity',
'MessageEntityType',

View file

@ -0,0 +1,157 @@
import io
import secrets
import typing
from . import base
from . import fields
ATTACHMENT_PREFIX = 'attach://'
class InputMedia(base.TelegramObject):
"""
This object represents the content of a media message to be sent. It should be one of
- InputMediaPhoto
- InputMediaVideo
That is only base class.
https://core.telegram.org/bots/api#inputmedia
"""
type: base.String = fields.Field(default='photo')
media: base.String = fields.Field()
caption: base.String = fields.Field()
@property
def file(self):
return getattr(self, '_file', None)
@file.setter
def file(self, file: io.IOBase):
# File must be not closed before sending media.
# Read file into BytesIO
if isinstance(file, io.BufferedIOBase):
# Go to start of file
if file.seekable():
file.seek(0)
# Read
temp_file = io.BytesIO(file.read())
# Reset cursor
file.seek(0)
# Replace variable
file = temp_file
setattr(self, '_file', file)
self.media = ATTACHMENT_PREFIX + secrets.token_urlsafe(16)
@property
def attachment_key(self):
if self.media.startswith(ATTACHMENT_PREFIX):
return self.media[len(ATTACHMENT_PREFIX):]
return None
class InputMediaPhoto(InputMedia):
"""
Represents a photo to be sent.
https://core.telegram.org/bots/api#inputmediaphoto
"""
def __init__(self, media: base.InputFile, caption: base.String = None):
super(InputMediaPhoto, self).__init__(type='photo', media=media, caption=caption)
if isinstance(media, io.IOBase):
self.file = media
class InputMediaVideo(InputMedia):
"""
Represents a video to be sent.
https://core.telegram.org/bots/api#inputmediavideo
"""
width: base.Integer = fields.Field()
height: base.Integer = fields.Field()
duration: base.Integer = fields.Field()
def __init__(self, media: base.InputFile, caption: base.String = None,
width: base.Integer = None, height: base.Integer = None, duration: base.Integer = None):
super(InputMediaVideo, self).__init__(type='video', media=media, caption=caption,
width=width, height=height, duration=duration)
if isinstance(media, io.IOBase):
self.file = media
class MediaGroup(base.TelegramObject):
"""
Helper for sending media group
"""
def __init__(self, media: typing.Optional[typing.List] = None):
super(MediaGroup, self).__init__()
self.media = []
if media:
for item in media:
self._attach(item)
def _attach(self, media: InputMedia):
"""
Attach file
:param media:
:return:
"""
# TODO: Detect media type from dict
self.media.append(media)
def attach_photo(self, photo: typing.Union[InputMediaPhoto, base.InputFile],
caption: base.String = None):
"""
Attach photo
:param photo:
:param caption:
:return:
"""
if not isinstance(photo, InputMedia):
photo = InputMediaPhoto(media=photo, caption=caption)
self._attach(photo)
def attach_video(self, video: typing.Union[InputMediaVideo, base.InputFile],
caption: base.String = None,
width: base.Integer = None, height: base.Integer = None, duration: base.Integer = None):
"""
Attach video
:param video:
:param caption:
:param width:
:param height:
:param duration:
:return:
"""
if not isinstance(video, InputMedia):
video = InputMediaVideo(media=video, caption=caption,
width=width, height=height, duration=duration)
self._attach(video)
def to_python(self) -> typing.List:
"""
Get object as JSON serializable
:return:
"""
self.clean()
result = []
for obj in self.media:
if isinstance(obj, base.TelegramObject):
obj = obj.to_python()
result.append(obj)
return result
def get_files(self):
return {inputmedia.attachment_key: inputmedia.file
for inputmedia in self.media
if isinstance(inputmedia, InputMedia) and inputmedia.file}

View file

@ -41,6 +41,7 @@ class Message(base.TelegramObject):
forward_date: base.Integer = fields.Field()
reply_to_message: 'Message' = fields.Field(base='Message')
edit_date: base.Integer = fields.Field()
media_group_id: base.String = fields.Field()
author_signature: base.String = fields.Field()
text: base.String = fields.Field()
entities: typing.List[MessageEntity] = fields.ListField(base=MessageEntity)