Моделируем тиканье бомбы

В статье про корутины были разобраны основные понятия, встречающиеся в асинхронном программировании. В этой статье мы воспользуемся этими понятиями для решения конкретной задачи: смоделировать бомбу.

Постановка задачи

Любая бомба делает три вещи:

  • ведёт обратный отсчёт до взрыва
  • издаёт устрашающее «тик-тик»
  • взрывается, когда доходит до 0

Чтобы бомбу смоделировать аккуратно, нам понадобится асинхронное программирование. Бомб может быть несколько, работать они будут независимо друг от друга, и взрываться они будут независимо друг от друга. В общем, самое оно, чтобы углубить понимание корутин.

Тиканье через корутину

Начнём с того, что напишем простую корутину: бомба должна пять раз тикнуть и взорваться.

import asyncio


async def do_ticking(amount_of_ticks=5):
    for _ in range(amount_of_ticks):
        print("tick")
        await asyncio.sleep(0)

Ключевое слово async используется, чтобы показать, что функция будет асинхронной. Строка await asyncio.sleep(0) означает «верни управление вызывающей функции до повторного вызова coroutine.send(None)».

Если вы знакомы с итераторами, то заметите: await чем-то напоминает yield . В примере выше прямо хочется написать

# ...
print("tick")
yield

Теперь включим тиканье бомбы:

ticking = do_ticking()
while True:
    try:
        ticking.send(None)
    except StopIteration:
        break  # корутина истощилась, прерываем event loop

print("BOOM!")

На первой строке мы создаём корутину, дальше вызываем её до истощения и взрываем бомбу. Цикл, в котором мы вызываем асинхронный метод раз за разом, называется event loop.

Но теперь проблема: как вставить интервал длиной в секунду между двумя тиками? Казалось бы, заменить await asyncio.sleep(0) на await asyncio.sleep(1), и всё должно сработать:

import asyncio


async def do_ticking(amount_of_ticks=5):
    for _ in range(amount_of_ticks):
        print("tick")
        await asyncio.sleep(1)  # здесь мы изменили интервал на 1 секунду


ticking = do_ticking()

while True:
    try:
        ticking.send(None)
    except StopIteration:
        break  # корутина истощилась, прерываем event loop

print('BOOM!')

На деле мы получаем ошибку:

AssertionError: yield from wasn"t used with future

Проблема в том, что мы используем свой event loop, а не тот, который предоставляет нам asyncio. Но почему asyncio требует свой event loop? Сейчас мы разберемся в этом вопросе, а для этого напишем свою версию команды sleep, совместимую с нашим самодельным event loop.

Создаём Sleep

Итак, создадим awaitable сущность. В документации awaitable определён как «класс, имеющий метод __await__, возвращающий итератор». В коде это выглядит так:

class EventLoopCommand():

    def __await__(self):
        return (yield self)

Наша команда Sleep — это дочерний класс EventLoopCommand. Он хранит количество секунд:

class Sleep(EventLoopCommand):

    def __init__(self, seconds):
        self.seconds = seconds

Теперь мы можем использовать наш собственный Sleep на месте asyncio.sleep:

async def do_ticking(amount_of_ticks=5):
    for _ in range(amount_of_ticks):
        print("tick")
        await Sleep(1)

Обрабатываем Sleep

Теперь, когда наша корутина вызывает Sleep, наш event loop должен обработать эту команду. С помощью магии генераторов, наша функция возвращает Sleep после каждого запуска:

sleep_command = ticking.send(None)
print(sleep_command.seconds)

Итак, наш цикл знает, сколько секунд ему надо подождать, перед тем как сделать ещё один тик. Этот же цикл и обеспечит ожидание:

ticking = do_ticking()

while True:
    try:
        sleep_command = ticking.send(None)
        seconds_to_sleep = sleep_command.seconds
        time.sleep(seconds_to_sleep)
    except StopIteration:
        break  # корутина истощилась, прерываем event loop

print('BOOM!')

Сначала этот цикл узнаёт у корутины сколько ему ждать следующего «тика», после чего засыпает до следующей итерации. И так пока не исчерпается корутина. Вот полный код примера:

import time


class EventLoopCommand():

    def __await__(self):
        return (yield self)


class Sleep(EventLoopCommand):

    def __init__(self, seconds):
        self.seconds = seconds


async def do_ticking(amount_of_ticks=5):
    for _ in range(amount_of_ticks):
        print("tick")
        await Sleep(1)


ticking = do_ticking()

while True:
    try:
        sleep_command = ticking.send(None)
        seconds_to_sleep = sleep_command.seconds
        time.sleep(seconds_to_sleep)
    except StopIteration:
        break  # корутина истощилась, прерываем event loop

print("BOOM!")

Изолируем бомбу

Наконец, модель была бы неполной без качественной декомпозиции. Сейчас она у нас никудышная: бомба тикает внутри функции, а взрывается снаружи. Соберём всё, что относится к бомбе внутри новой функции bang_the_bomb:

async def do_ticking(amount_of_ticks=5):
    for _ in range(amount_of_ticks):
        print("tick")
        await Sleep(1)


async def bang_the_bomb():
    clock = do_ticking()
    await clock
    print("BOOM!")


bomb = bang_the_bomb()

while True:
    try:
        sleep_command = bomb.send(None)
        seconds_to_sleep = sleep_command.seconds
        time.sleep(seconds_to_sleep)
    except StopIteration:
        break  # корутина истощилась, прерываем event loop

Обратите внимание, что функция bang_the_bomb по-прежнему возвращает объект класса Sleep, хотя его даже нет внутри функции. Фокус в том, что вызов await clock создаёт цепочку передачи объектов из внутренних функций во внешние. Что бы не «выкинула» корутина clock это сначала попадёт в bang_the_bomb , а затем улетит дальше наружу — во внешний цикл event loop. Благодаря await даже самая дальняя и вложенная асинхронная функция всегда может связаться с внешним event loop и попросить его «заснуть». Магия…

Добавляем другие бомбы

Теперь сделаем много бомб. А чтобы отличать их по звуку сразу добавим в код параметр sound:

async def do_ticking(amount_of_ticks, sound):
    for _ in range(amount_of_ticks):
        print(sound)
        await Sleep(1)


async def bang_the_bomb(amount_of_ticks=5, sound='tick'):
    clock = do_ticking(amount_of_ticks, sound)
    await clock
    print("BOOM!")


bombs = [
    bang_the_bomb(amount_of_ticks=3)
    bang_the_bomb(amount_of_ticks=5, sound='chick'),
    bang_the_bomb(amount_of_ticks=9, sound='click'),
]

Теперь у нас есть три бомбы. Они включаются одновременно сразу после запуска программы, затем через три секунды взрывается первая из них, а ещё через шесть — последняя.

Код внутри цикла event loop придётся сильно переписать. Старая версия умела работать только с одной бомбой, теперь же придётся запускать их сразу много и параллельно.

Второе важное изменение связано с вызовом time.sleep. Раньше время задержки выбирала одна корутина, теперь же придётся учитывать запросы сразу от всех бомб. Каждая бомба сообщит нам сколько ждать её следующего тика, а мы запомним числа и сравним, выберем из всех бомб самую быструю. Для этого понадобится новая структура данных sleeping_bombs, выглядит она примерно так:

sleeping_bombs = [
    # store timeout for each bomb coroutine
    [0, bomb_1],
    [1, bomb_2],
    [0, bomb_3],
]

И вот, наконец, последняя версия программы. Все ключевые изменения сосредоточены внутри цикла event loop:

import time


class EventLoopCommand():

    def __await__(self):
        return (yield self)


class Sleep(EventLoopCommand):

    def __init__(self, seconds):
        self.seconds = seconds


async def do_ticking(amount_of_ticks, sound):
    for _ in range(amount_of_ticks):
        print(sound)
        await Sleep(1)


async def bang_the_bomb(amount_of_ticks=5, sound='tick'):
    clock = do_ticking(amount_of_ticks, sound)
    await clock
    print('BOOM!')


bombs = [
    bang_the_bomb(amount_of_ticks=9, sound='click'),
    bang_the_bomb(amount_of_ticks=5, sound='chick'),
    bang_the_bomb(amount_of_ticks=3)
]

# store timeout for each bomb coroutine
sleeping_bombs = [[0, bomb] for bomb in bombs]

while sleeping_bombs:
    # осторожно засыпаем так, чтобы не пропустить активацию бомб
    min_delay, _ = min(sleeping_bombs, key=lambda pair: pair[0])
    sleeping_bombs = [[timeout - min_delay, bomb] for timeout, bomb in sleeping_bombs]
    time.sleep(min_delay)

    # делим бомбы на активные и спящие
    active_bombs = [[timeout, bomb] for timeout, bomb in sleeping_bombs if timeout <= 0]
    sleeping_bombs = [[timeout, bomb] for timeout, bomb in sleeping_bombs if timeout > 0]

    for _, bomb in active_bombs:
        try:
            sleep_command = bomb.send(None)
        except StopIteration:
            continue  # выкидываем истощившуюся корутину
        seconds_to_sleep = sleep_command.seconds
        sleeping_bombs.append([seconds_to_sleep, bomb])

Если вы запустите пример выше, то увидите, как одновременно тикают три разные бомбы. Пустые строки здесь добавлены ради удобства:

click
chick
tick

click
chick
tick

click
chick
tick

click
chick
BOOM!

click
chick

click
BOOM!

click

click

click

BOOM!

Последняя версия кода уже куда больше похожа на то, как работают полновесные асинхронные фреймворки — asyncio и trio. Теперь цикл event loop знает какая корутина чего ждёт и сам подстраивается под них - вовремя засыпает и вовремя просыпается. В других «взрослых» event loop есть дополнительные механизмы для работы с событиями ОС, обработки исключений и отмены корутин. Всё это доступно прямо из коробки, достаточно лишь запустить их цикл event loop.

Зачем time.sleep

Во всех примерах кода мы неизменно вызывали time.sleep. Он нужен для того, чтобы дождаться того самого момента когда произойдёт взрыв. Того же можно добиться и циклом без time.sleep, но тогда CPU будет перегружен бесполезной работой. Вызов time.sleep высвобождает время процессора на время, пока все корутины спят. Без него — никак.

Читать дальше

Попробуйте альтернативные источники по теме:


Попробуйте бесплатные уроки по Python

Получите крутое код-ревью от практикующих программистов с разбором ошибок и рекомендациями, на что обратить внимание — бесплатно.

Переходите на страницу учебных модулей «Девмана» и выбирайте тему.