mirror of
https://github.com/aiogram/aiogram.git
synced 2025-12-08 17:13:56 +00:00
Add Bot API parser and code-generator
This commit is contained in:
parent
5e9d4e55d9
commit
af2573dbee
15 changed files with 3242 additions and 1 deletions
0
aiogram/_telegram/__init__.py
Normal file
0
aiogram/_telegram/__init__.py
Normal file
2793
aiogram/_telegram/types.py
Normal file
2793
aiogram/_telegram/types.py
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -15,3 +15,5 @@ sphinx-rtd-theme>=0.4.3
|
|||
sphinxcontrib-programoutput>=0.14
|
||||
aiohttp-socks>=0.2.2
|
||||
rethinkdb>=2.4.1
|
||||
lxml==4.3.4
|
||||
requests==2.22.0
|
||||
|
|
|
|||
0
generator/__init__.py
Normal file
0
generator/__init__.py
Normal file
7
generator/__main__.py
Normal file
7
generator/__main__.py
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
"""Package entry point: run the code generator via ``python -m generator``."""
import logging
# NOTE(review): ``logging`` is imported but unused here; generator.cli.main
# calls logging.basicConfig itself — confirm before removing.
import sys

from generator.cli import main

if __name__ == "__main__":
    # Propagate the CLI's return value as the process exit code.
    sys.exit(main(sys.argv))
|
||||
22
generator/cli.py
Normal file
22
generator/cli.py
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
import logging
|
||||
import pathlib
|
||||
import sys
|
||||
import typing
|
||||
|
||||
from generator.generator import Generator
|
||||
from generator.parser import Parser
|
||||
|
||||
script_path = pathlib.Path(__file__).parent
out_dir = script_path.parent / "aiogram" / "_telegram"


def main(argv: typing.List[str]) -> int:
    """Regenerate ``aiogram/_telegram/types.py`` from the live Bot API docs.

    :param argv: process arguments (currently unused).
    :return: process exit code, ``0`` on success.
    """
    logging.basicConfig(level=logging.ERROR, stream=sys.stdout)

    # Download and parse the docs, then feed the parsed tree to the renderer.
    parser = Parser()
    parser.parse()
    generator = Generator(parser)

    target = out_dir / "types.py"
    with target.open("w") as output:
        output.write(generator.render_types())

    return 0
||||
32
generator/consts.py
Normal file
32
generator/consts.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
import re

# Source page that the parser scrapes.
DOCS_URL = "https://core.telegram.org/bots/api"

RE_FLAGS = re.IGNORECASE
# Section anchors live inside <h3>/<h4> headers; group(1) is the header level.
ANCHOR_HEADER_PATTERN = re.compile(r"^h([34])$")
# Tried in order against a method description to extract its return type;
# more specific patterns must come before the generic "Returns X" ones.
RETURN_PATTERNS = [
    re.compile(r"(?P<type>Array of [a-z]+) objects", flags=RE_FLAGS),
    re.compile(r"a (?P<type>[a-z]+) object", flags=RE_FLAGS),
    re.compile(r"Returns (?P<type>[a-z]+) on success", flags=RE_FLAGS),
    re.compile(r"(?P<type>[a-z]+) on success", flags=RE_FLAGS),
    re.compile(
        r"(?P<type>[a-z]+) is returned, otherwise (?P<other>[a-zA-Z]+) is returned", flags=RE_FLAGS
    ),
    re.compile(
        r"returns the edited (?P<type>[a-z]+), otherwise returns (?P<other>[a-zA-Z]+)",
        flags=RE_FLAGS,
    ),
    re.compile(r"(?P<type>[a-z]+) is returned", flags=RE_FLAGS),
    re.compile(r"Returns (?P<type>[a-z]+)", flags=RE_FLAGS),
]
# Mapping from Bot API scalar type names to Python type names.
BUILTIN_TYPES = {
    "String": "str",
    "Integer": "int",
    "Float": "float",
    "Boolean": "bool",
    "InputFile": "types.InputFile",
}
# Strips trailing "More info on ... »" / "More about ... »" / bare "»" links.
# Fixed: flags were combined with `&` (re.MULTILINE & re.IGNORECASE == 0),
# which silently disabled BOTH flags; regex flags must be OR-ed together.
READ_MORE_PATTERN = re.compile(
    r" ((More info on|More about)([\W\w]+»)|»)", flags=re.MULTILINE | re.IGNORECASE
)
# Replace typographic quotes in descriptions with plain apostrophes.
SYMBOLS_MAP = {"“": "'", "”": "'"}
|
||||
34
generator/generator.py
Normal file
34
generator/generator.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
import datetime
|
||||
import pathlib
|
||||
|
||||
import black
|
||||
import jinja2
|
||||
|
||||
from generator.parser import Parser
|
||||
|
||||
templates_dir: pathlib.Path = pathlib.Path(__file__).parent / "templates"


class Generator:
    """Renders a parsed Bot API tree into Python source via jinja2 templates,
    reformatting the result with black."""

    def __init__(self, parser: Parser):
        self.parser = parser
        self.env = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=[templates_dir]))

    @property
    def context(self):
        """Template rendering context: parsed groups plus a generation timestamp."""
        return {
            "groups": self.parser.groups,
            "timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
        }

    def _render_template(self, template: str) -> str:
        # Look the template up by name in the package's templates directory.
        loaded = self.env.get_template(template)
        return loaded.render(self.context)

    def _reformat_code(self, code: str) -> str:
        # black requires syntactically valid input; templates emit parseable code.
        return black.format_str(code, mode=black.FileMode())

    def render_types(self):
        """Render and reformat the full aiogram/_telegram/types.py module."""
        return self._reformat_code(self._render_template("types.py.jinja2"))
||||
85
generator/normalizers.py
Normal file
85
generator/normalizers.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
import functools
|
||||
|
||||
from generator.consts import BUILTIN_TYPES, RETURN_PATTERNS, READ_MORE_PATTERN, SYMBOLS_MAP
|
||||
|
||||
|
||||
def normalize_description(text: str) -> str:
    """Clean up a docs description: replace typographic quotes and drop
    trailing "More info ... »" links, then strip surrounding whitespace.

    :param text: raw description text extracted from the docs page.
    :return: cleaned description.
    """
    for bad, good in SYMBOLS_MAP.items():
        text = text.replace(bad, good)
    text = READ_MORE_PATTERN.sub("", text)
    # Fixed: the original called ``text.strip()`` without using the result
    # (str.strip returns a new string), so descriptions were never stripped.
    return text.strip()
||||
|
||||
|
||||
def normalize_annotation(item: dict):
    """Normalize one parsed table row in place: lowercase every key and
    clean up the description text."""
    for original_key in tuple(item.keys()):
        item[original_key.lower()] = item.pop(original_key)

    item["description"] = normalize_description(item["description"])
|
||||
|
||||
|
||||
def normalize_method_annotation(item: dict):
    """Normalize one row of a method's parameter table in place."""
    normalize_annotation(item)
    # The docs mark method parameters as either "Yes" or "Optional".
    required_map = {"Optional": False, "Yes": True}
    item["required"] = required_map[item["required"]]
    item["name"] = item.pop("parameter")
|
||||
|
||||
|
||||
def normalize_type_annotation(item: dict):
    """Normalize one row of a type's field table in place.

    Optionality is encoded in the description prefix rather than in a
    dedicated column, so it is detected and stripped here.
    """
    normalize_annotation(item)

    item["name"] = item.pop("field")

    description = item["description"]
    if description.startswith("Optional"):
        item["required"] = False
        # Drop the leading "Optional. " marker (10 characters).
        item["description"] = description[10:]
    else:
        item["required"] = True
|
||||
|
||||
|
||||
@functools.lru_cache()
def normalize_type(string, required=True):
    """Convert a Bot API type string (e.g. "Array of String", "Integer or
    String") to a Python annotation string.

    The checks below are order-sensitive: container/union forms must be
    handled before scalar lookups.

    :param string: type name as written in the docs tables.
    :param required: optional parameters use typing.Optional for unions.
    :return: Python type expression as a string.
    """
    if not string:
        return "typing.Any"

    # Union wrapper used for "X or Y" types.
    union = "typing.Union" if required else "typing.Optional"

    lower = string.lower()
    split = lower.split()

    # "Array of X" -> typing.List[X]; recursion handles nested arrays.
    if split[0] == "array":
        new_string = string[lower.index("of") + 2 :].strip()
        return f"typing.List[{normalize_type(new_string)}]"
    # "X or Y" -> typing.Union[X, Y] (or typing.Optional[...] when not required).
    if "or" in split:
        split_types = string.split(" or ")
        norm_str = ", ".join(map(normalize_type, map(str.strip, split_types)))
        return f"{union}[{norm_str}]"
    # "Integer number" / "Float number" -> drop the word "number" and retry.
    if "number" in lower:
        return normalize_type(string.replace("number", "").strip())
    if lower in ["true", "false"]:
        return "bool"
    # Capitalized non-builtin names are assumed to be Telegram types.
    if string not in BUILTIN_TYPES and string[0].isupper():
        return f"types.{string}"
    elif string in BUILTIN_TYPES:
        return BUILTIN_TYPES[string]
    return "typing.Any"
|
||||
|
||||
|
||||
@functools.lru_cache()
def get_returning(description):
    """Extract the return type from a method description.

    :param description: full method description text.
    :return: tuple ``(return_type, sentence)`` where ``sentence`` is the
        joined "return"-mentioning sentences. NOTE(review): when a sentence
        mentions "return" but no RETURN_PATTERNS entry matches,
        ``return_type`` is None (not "typing.Any") — callers appear to rely
        on normalize_type treating falsy input as typing.Any; confirm.
    """
    # Keep only the sentences that talk about return values.
    parts = list(filter(lambda item: "return" in item.lower(), description.split(".")))
    if not parts:
        return "typing.Any", ""
    sentence = ". ".join(map(str.strip, parts))
    return_type = None

    # Patterns are ordered from most to least specific; first match wins.
    for pattern in RETURN_PATTERNS:
        temp = pattern.search(sentence)
        if temp:
            return_type = temp.group("type")
            # Some methods return one of two types ("... otherwise X is returned").
            if "other" in temp.groupdict():
                otherwise = temp.group("other")
                return_type += f" or {otherwise}"
        if return_type:
            break

    return return_type, sentence + "."
|
||||
134
generator/parser.py
Normal file
134
generator/parser.py
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
import logging
|
||||
|
||||
import requests
|
||||
from lxml import html
|
||||
from lxml.html import HtmlElement
|
||||
|
||||
from generator.consts import DOCS_URL, ANCHOR_HEADER_PATTERN
|
||||
from generator.normalizers import (
|
||||
normalize_type_annotation,
|
||||
normalize_method_annotation,
|
||||
normalize_description,
|
||||
)
|
||||
from generator.structures import Group, Entity, Annotation
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Parser:
    """Scrapes https://core.telegram.org/bots/api and builds a tree of
    Group (h3 section) -> Entity (h4 type/method) -> Annotation objects."""

    def __init__(self):
        # Download and parse the docs page immediately; parse() fills groups.
        self.docs = self.load(DOCS_URL)
        self.groups = []

    @staticmethod
    def load_page(url: str) -> str:
        """Fetch *url* and return the response body; raises on HTTP errors."""
        log.info("Load page %r", url)
        response = requests.get(url)
        response.raise_for_status()
        return response.text

    @staticmethod
    def to_html(content: str, url: str) -> HtmlElement:
        """Parse *content* into an lxml tree, converting <br> tags to
        newlines so that text_content() preserves line breaks."""
        page = html.fromstring(content, url)

        for br in page.xpath("*//br"):
            br.tail = "\n" + br.tail if br.tail else "\n"

        return page

    def load(self, url: str) -> HtmlElement:
        """Download *url* and return it as a parsed HTML tree."""
        content = self.load_page(url)
        return self.to_html(content, url)

    def optimize_group(self, group: Group):
        """Post-process a finished group: drop it if empty, or promote a
        description-only first child to the group's own description."""
        if not group.childs:
            log.warning("Remove empty %s", group)
            self.groups.remove(group)
            return

        # A first child without an annotation table is the section's intro
        # text (captured by the h3 fall-through in parse() below).
        if not group.childs[0].annotations:
            log.warning("Update group %r description from first child element", group.title)
            group.description = group.childs[0].description
            group.childs.pop(0)

    def parse(self):
        """Walk all anchor links on the docs page and build self.groups.

        h3 headers open a new Group; h4 headers become child entities.
        """
        self.groups.clear()

        group = None

        for item in self.docs.xpath("//a[@class='anchor']"):  # type: HtmlElement
            parent_tag: HtmlElement = item.getparent()
            anchor_name = item.get("name", None)
            matches = ANCHOR_HEADER_PATTERN.match(parent_tag.tag)
            if not matches or not anchor_name:
                continue
            level = int(matches.group(1))
            title = item.tail

            if level == 3:
                # Finish the previous section before opening a new one.
                if group:
                    self.optimize_group(group)

                log.info("Parse group %r (#%s)", title, anchor_name)
                group = Group(title=title, anchor=anchor_name)
                self.groups.append(group)

            # Skip multi-word h4 headers (prose subsections, not types/methods).
            if level == 4 and len(title.split()) > 1:
                continue

            # NOTE(review): for level-3 headers this elif also runs (unless the
            # anchor is in the skip list), parsing the section intro as a child;
            # optimize_group() later promotes it to the group description.
            elif anchor_name not in ["recent-changes", "authorizing-your-bot", "making-requests"]:
                child = self._parse_child(parent_tag, anchor_name)
                group.childs.append(child)

        return self.groups

    def _parse_child(self, start_tag: HtmlElement, anchor: str):
        """Parse one h4 block (a type or a method) into an Entity."""
        name = start_tag.text_content()
        description = []
        annotations = []

        # Methods are camelCase, types are PascalCase on the docs page.
        is_method = name[0].islower()

        log.info("Parse block: %r (#%s)", name, anchor)

        for item in self._parse_tags_group(start_tag):
            if item.tag == "table":
                # The table carries the field/parameter annotations.
                for raw in self._parse_table(item):
                    if is_method:
                        normalize_method_annotation(raw)
                    else:
                        normalize_type_annotation(raw)
                    annotations.append(Annotation(**raw))

            elif item.tag == "p":
                description.extend(item.text_content().splitlines())
            elif item.tag == "blockquote":
                description.extend(self._parse_blockquote(item))
            elif item.tag == "ul":
                description.extend(self._parse_list(item))

        description = normalize_description("\n".join(description))
        block = Entity(anchor=anchor, name=name, description=description, annotations=annotations)
        log.info("%s", block)
        return block

    def _parse_tags_group(self, start_tag: HtmlElement):
        """Yield sibling tags following *start_tag* until the next h3/h4 header."""
        tag: HtmlElement = start_tag.getnext()
        while tag is not None and tag.tag not in ["h3", "h4"]:
            yield tag
            tag: HtmlElement = tag.getnext()

    def _parse_table(self, table: HtmlElement):
        """Yield one dict per body row, keyed by the table's header cells."""
        head, body = table.getchildren()  # type: HtmlElement, HtmlElement
        header = [item.text_content() for item in head.getchildren()[0]]

        for body_item in body:
            yield {k: v for k, v in zip(header, [item.text_content() for item in body_item])}

    def _parse_blockquote(self, blockquote: HtmlElement):
        """Yield the text lines of every child element of a blockquote."""
        for item in blockquote.getchildren():
            yield from item.text_content().splitlines()

    def _parse_list(self, data: HtmlElement):
        """Yield list items rendered as ' - <text>' bullet lines."""
        for item in data.getchildren():
            yield " - " + item.text_content()
|
||||
90
generator/structures.py
Normal file
90
generator/structures.py
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from generator.normalizers import normalize_type, get_returning
|
||||
|
||||
|
||||
@dataclass
class Annotation:
    """One field (for types) or parameter (for methods) parsed from a docs table."""

    name: str
    type: str
    description: str
    required: bool = True

    @property
    def python_name(self):
        # "from" is a Python keyword; expose it as "from_user" instead.
        return "from_user" if self.name == "from" else self.name

    @property
    def python_type(self) -> str:
        """Python annotation string for this field's type."""
        return normalize_type(self.type, self.required)

    @property
    def python_argument(self):
        """Render this annotation as a Python attribute/parameter declaration."""
        default = "" if self.required else "None"
        # The "from" field needs a pydantic alias to keep the wire name.
        if self.name == "from":
            default = f"pydantic.Schema({default or '...'}, alias=\"from\")"

        declaration = f"{self.python_name}: {self.python_type}"
        if default:
            declaration += f" = {default}"
        return declaration
|
||||
|
||||
|
||||
@dataclass
class Entity:
    """One parsed docs block: either a Telegram type or a Bot API method."""

    name: str
    anchor: str
    description: str = None
    annotations: typing.List[Annotation] = field(default_factory=list)

    @property
    def is_method(self) -> bool:
        # Methods are camelCase ("sendMessage"); types are PascalCase ("Message").
        return self.name[0].islower()

    @property
    def is_type(self) -> bool:
        return not self.is_method

    @property
    def python_name(self) -> str:
        return self.name

    def _get_returning(self):
        # A type "returns" itself; a method's return type is mined from its
        # description text.
        if self.is_method:
            return get_returning(self.description)
        return self.name, ""

    @property
    def returning(self):
        """Sentence(s) of the description that describe the return value."""
        _, sentence = self._get_returning()
        return sentence

    @property
    def returning_type(self):
        """Raw docs-style return type name."""
        kind, _ = self._get_returning()
        return kind

    @property
    def python_returning_type(self):
        """Python annotation string for the return type."""
        return normalize_type(self.returning_type)
|
||||
|
||||
|
||||
@dataclass
class Group:
    """A top-level docs section (h3 header) and its child types/methods."""

    title: str
    anchor: str
    description: str = None
    childs: typing.List[Entity] = field(default_factory=list)

    @property
    def has_methods(self):
        """True if any child entity is an API method."""
        return any(entity.is_method for entity in self.childs)

    @property
    def has_types(self):
        """True if any child entity is a type.

        Fixed: this previously checked ``entity.is_method``, making it an
        exact duplicate of :attr:`has_methods`.
        """
        return any(entity.is_type for entity in self.childs)
|
||||
12
generator/templates/type.py.jinja2
Normal file
12
generator/templates/type.py.jinja2
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
{# Renders one Bot API type as a pydantic model class; output is later
   reformatted with black, so exact whitespace here is not critical. #}
class {{ entity.python_name }}(pydantic.BaseModel):
    """
    {{ entity.description|indent(width=4) }}

    Source: https://core.telegram.org/bots/api#{{ entity.anchor }}
    """
    {% for annotation in entity.annotations %}
    {{ annotation.python_argument }}
    """{{ annotation.description|indent(width=4) }}"""
    {% else %}
    pass
    {% endfor %}
|
||||
20
generator/templates/types.py.jinja2
Normal file
20
generator/templates/types.py.jinja2
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
{# Root template for aiogram/_telegram/types.py: module docstring, imports,
   __all__ listing every generated type, then one region per docs section. #}
"""
!!! DO NOT EDIT THIS FILE !!!
This file is autogenerated from Docs of Telegram Bot API at {{ timestamp }}
"""
import typing

import pydantic

from aiogram import types

__all__ = [
{% for group in groups %}{% for entity in group.childs %}{% if entity.is_type %}
    "{{ entity.python_name }}",
{% endif %}{% endfor %}{% endfor %}
]


{% for group in groups %}
{% include 'types_group.py.jinja2' %}
{% endfor %}
|
||||
10
generator/templates/types_group.py.jinja2
Normal file
10
generator/templates/types_group.py.jinja2
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
{# Renders one docs section: a region marker comment, the section's
   description and link, then every type class in the section. #}
# %% Region: '{{ group.title }}'
"""{% if group.description %}
{{ group.description }}
{% endif %}
link: https://core.telegram.org/bots/api#{{ group.anchor }}
"""
{% for entity in group.childs %}{% if entity.is_type %}
{% include 'type.py.jinja2' %}
{% endif %}{% endfor %}
# %% End of region '{{ group.title }}'
|
||||
2
setup.py
2
setup.py
|
|
@ -63,7 +63,7 @@ def get_requirements(filename=None):
|
|||
setup(
|
||||
name="aiogram",
|
||||
version=get_version(),
|
||||
packages=find_packages(exclude=("tests", "tests.*", "examples.*", "docs")),
|
||||
packages=find_packages(exclude=("tests", "tests.*", "examples.*", "docs", "generator")),
|
||||
url="https://github.com/aiogram/aiogram",
|
||||
license="MIT",
|
||||
author="Alex Root Junior",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue