16 lipca 2020

[python] asyncio - Asynchronous I/O

Asynchronieczne funkcje w Pythonie nazywane są corutines (poprzedza jest słowo kluczowe async albo są udekorowane @asyncio.coroutine). Nie można ich wołać jak zwyczajnych funkcji, trzeba skorzystać ze słowa kluczowego await (podobne do yield) i można to robić tylko wewnątrz innych corutines. await przerywa działanie i oddaje sterowanie do "event loop", które zajmuje się zarządzeniem (przekazywaniem sterowania do corutines), i w której są one rejestrowane. Przydatne linki: Biblioteka posiada całe mnóstwo funkcji, dla mnie najważniejsze to:
  • create_task() kolejkuje zadanie
  • run_until_complete() uruchomienie corutine (i wszystkie inne zakolejkowane do tej pory zadania) i czeka aż się zakończy (to konkretna). Jeżeli będą jakiś inne zadania w stanie oczekiwania to run_until_complete() nie będzie na nie czekać.
  • run_forever() uruchamia wszystkie zakolejkowane zadania
Przykład:
import asyncio

async def short_task():
    print('short_task before')
    await asyncio.sleep(2)
    print('short_task after')


async def print_task():
    print('print_task')


async def long_task():
    print('long_task before')
    await asyncio.sleep(5)
    print('long_task after')


async def draw_task():
    print('draw_task')


def main():
    loop = asyncio.get_event_loop()

    loop.create_task(print_task())
    loop.create_task(long_task())

    loop.run_until_complete(short_task())
    loop.run_until_complete(draw_task())

    loop.close()


if __name__ == '__main__':
    main()
Program czeka aż zakończą się dwa zadania: short_task i draw_task, wcześniej uruchamiająć long_task. Ponieważ short_task i draw_task kończą się szybciej dostajemy ostrzeżenie, o wciąż działającym long_taks.
print_task
long_task before
short_task before
short_task after
draw_task
Task was destroyed but it is pending!
task: <Task pending coro=<long_task() done, defined at /home/beru/python_asyncio/run_until.py:13> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f47d9b217d0>()]>>
Bardziej zaawansowany przykład serwera/czatu. Tutaj mamy do czynienia z trzema rodzajami zdarzeń: uruchomienie serwera (otwarcie portów i nasłuchiwanie), oczekiwanie na tekst na stdio oraz oczekiwanie na nadejście wiadomości od klienta. Samo oczekiwanie na tekst składa się z dwóch zdarzeń: pojawienie się tekstu na stdio i zapisanie go do kolejki, oraz odczytanie, gdy coś w tej kolejce się znajduje.

Jest tu kilka kwiatków asyncio, jak np. sposób przekazywanie parametrów do obiektu Chat (przez lambdę). Ale najbardziej dokuczliwym (i dalej nie mam pewności czy zrobiłem to poprawnie) jest sposób zatrzymania programu. Po otrzymaniu komendy "exit" Chat ustawia future na True, co z kolei anuluje najpierw wszystkie zadania, następnie stopuje pętle, a na końcu pętla ta jest jeszcze zatrzymywana.
Jeżeli program jest zabijany przez Ctrl+C, to ustawienie future w obsłudze wyjątku nie zadziałała. Koniecznym stało się wywołanie explicit cancel_all_task(). Samo anulowanie zadań jest też zdaje się być asynchroniczne, więc stop() nie może być zawołane za wcześnie.
import sys
import asyncio


def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()

    # Start monitoring the fd file descriptor for read availability and invoke
    # callback with the specified arguments once fd is available for reading
    loop.add_reader(sys.stdin, got_stdin_data, queue)

    fut = loop.create_future()
    fut.add_done_callback(cancel_all_task)

    coro = loop.create_server(lambda: Chat(loop, queue, fut), '127.0.0.1', 7777)
    server = loop.run_until_complete(coro)

    # Run until Ctrl+C is pressed or loop is stopped
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('[+] Keyboard exception')
        cancel_all_task()

    # Stop server. Closing listening sockets it's done asynchronously, so
    # wait_closed() need to be used to ensure.
    server.close()
    loop.run_until_complete(server.wait_closed())

    loop.close()


def got_stdin_data(queue):
    loop = asyncio.get_event_loop()
    loop.create_task(queue.put(sys.stdin.readline()))


def cancel_all_task(result=None):
    print('[+] Cancel all tasks')
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.create_task(stop_loop())


async def stop_loop():
    print('[+] Stop loop')
    loop = asyncio.get_event_loop()
    loop.stop()


class Chat(asyncio.Protocol):
    def __init__(self, loop, queue, fut):
        self.loop = loop
        self.queue = queue
        self.fut = fut

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('[+] Connection from:', peername)
        self.transport = transport
        self.loop.create_task(self._wait_for_stdin_data())

    def connection_lost(self, exc):
        print('[+] Connection lost')
        self.fut.set_result(True)

    def data_received(self, data):
        message = data.decode()
        print('[+] Data received: {!r}'.format(message))

        if message.strip() == "exit":
            self.fut.set_result(True)

    def _send_reply(self, reply):
        print('[+] Data send: {!r}'.format(reply))
        self.transport.write(reply.encode())
        self.loop.create_task(self._wait_for_stdin_data())

    async def _wait_for_stdin_data(self):
        reply = await self.queue.get()
        self._send_reply(reply)


if __name__ == '__main__':
    main()
W celu połączenia się z serwerm:
nc 127.0.0.1 7777
Działanie:
[+] Connection from: ('127.0.0.1', 45260)
[+] Data received: 'asdf\n'
[+] Data received: 'exit\n'
[+] Cancel all tasks
[+] Stop loop