From 0fcb75e997a29ba680b3fd8f2516e34b8d78db22 Mon Sep 17 00:00:00 2001 From: Alex Root Junior Date: Sat, 26 Aug 2017 18:01:30 +0300 Subject: [PATCH] Implement tasks context manager. --- aiogram/utils/context.py | 105 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 aiogram/utils/context.py diff --git a/aiogram/utils/context.py b/aiogram/utils/context.py new file mode 100644 index 00000000..7e38ec62 --- /dev/null +++ b/aiogram/utils/context.py @@ -0,0 +1,105 @@ +import asyncio +import typing + +CONFIGURED = '@CONFIGURED_TASK_FACTORY' + + +def task_factory(loop: asyncio.BaseEventLoop, coro: typing.Coroutine): + """ + Task factory for implementing context processor + + :param loop: + :param coro: + :return: new task + :rtype: :obj:`asyncio.Task` + """ + # Is not allowed when loop is closed. + loop._check_closed() + + task = asyncio.Task(coro, loop=loop) + + # Hide factory + if task._source_traceback: + del task._source_traceback[-1] + + try: + task.context = asyncio.Task.current_task().context + except AttributeError: + task.context = {CONFIGURED: True} + + return task + + +def get_current_state() -> typing.Dict: + """ + Get current execution context from task + + :return: context + :rtype: :obj:`dict` + """ + task = asyncio.Task.current_task() + context = getattr(task, 'context', None) + if context is None: + context = task.context = {} + return context + + +def get_value(key, default=None): + """ + Get value from task + + :param key: + :param default: + :return: value + """ + return get_current_state().get(key, default) + + +def check_value(key): + """ + Key in context? + + :param key: + :return: + """ + return key in get_current_state() + + +def set_value(key, value): + """ + Set value + + :param key: + :param value: + :return: + """ + get_current_state()[key] = value + + +def del_value(key): + """ + Remove value from context + + :param key: + :return: + """ + del get_current_state()[key] + + +def update_state(data=None, **kwargs): + """ + Update multiple state items + + :param data: + :param kwargs: + :return: + """ + if data is None: + data = {} + state = get_current_state() + state.update(data, **kwargs) + + +def check_configured(): + print('CONFIGURED', get_value(CONFIGURED)) + return get_value(CONFIGURED)