- https://stackabuse.com/python-async-await-tutorial/
- https://stackoverflow.com/questions/40897428/please-explain-task-was-destroyed-but-it-is-pending/40946916#40946916
- create_task() kolejkuje zadanie
- run_until_complete() uruchomienie corutine (i wszystkie inne zakolejkowane do tej pory zadania) i czeka aż się zakończy (to konkretna). Jeżeli będą jakiś inne zadania w stanie oczekiwania to run_until_complete() nie będzie na nie czekać.
- run_forever() uruchamia wszystkie zakolejkowane zadania
import asyncio async def short_task(): print('short_task before') await asyncio.sleep(2) print('short_task after') async def print_task(): print('print_task') async def long_task(): print('long_task before') await asyncio.sleep(5) print('long_task after') async def draw_task(): print('draw_task') def main(): loop = asyncio.get_event_loop() loop.create_task(print_task()) loop.create_task(long_task()) loop.run_until_complete(short_task()) loop.run_until_complete(draw_task()) loop.close() if __name__ == '__main__': main()Program czeka aż zakończą się dwa zadania: short_task i draw_task, wcześniej uruchamiająć long_task. Ponieważ short_task i draw_task kończą się szybciej dostajemy ostrzeżenie, o wciąż działającym long_taks.
print_task long_task before short_task before short_task after draw_task Task was destroyed but it is pending! task: <Task pending coro=<long_task() done, defined at /home/beru/python_asyncio/run_until.py:13> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f47d9b217d0>()]>>Bardziej zaawansowany przykład serwera/czatu. Tutaj mamy do czynienia z trzema rodzajami zdarzeń: uruchomienie serwera (otwarcie portów i nasłuchiwanie), oczekiwanie na tekst na stdio oraz oczekiwanie na nadejście wiadomości od klienta. Samo oczekiwanie na tekst składa się z dwóch zdarzeń: pojawienie się tekstu na stdio i zapisanie go do kolejki, oraz odczytanie, gdy coś w tej kolejce się znajduje.
Jest tu kilka kwiatków asyncio, jak np. sposób przekazywanie parametrów do obiektu Chat (przez lambdę). Ale najbardziej dokuczliwym (i dalej nie mam pewności czy zrobiłem to poprawnie) jest sposób zatrzymania programu. Po otrzymaniu komendy "exit" Chat ustawia future na True, co z kolei anuluje najpierw wszystkie zadania, następnie stopuje pętle, a na końcu pętla ta jest jeszcze zatrzymywana.
Jeżeli program jest zabijany przez Ctrl+C, to ustawienie future w obsłudze wyjątku nie zadziałała. Koniecznym stało się wywołanie explicit cancel_all_task(). Samo anulowanie zadań jest też zdaje się być asynchroniczne, więc stop() nie może być zawołane za wcześnie.
import sys import asyncio def main(): queue = asyncio.Queue() loop = asyncio.get_event_loop() # Start monitoring the fd file descriptor for read availability and invoke # callback with the specified arguments once fd is available for reading loop.add_reader(sys.stdin, got_stdin_data, queue) fut = loop.create_future() fut.add_done_callback(cancel_all_task) coro = loop.create_server(lambda: Chat(loop, queue, fut), '127.0.0.1', 7777) server = loop.run_until_complete(coro) # Run until Ctrl+C is pressed or loop is stopped try: loop.run_forever() except KeyboardInterrupt: print('[+] Keyboard exception') cancel_all_task() # Stop server. Closing listening sockets it's done asynchronously, so # wait_closed() need to be used to ensure. server.close() loop.run_until_complete(server.wait_closed()) loop.close() def got_stdin_data(queue): loop = asyncio.get_event_loop() loop.create_task(queue.put(sys.stdin.readline())) def cancel_all_task(result=None): print('[+] Cancel all tasks') loop = asyncio.get_event_loop() for task in asyncio.Task.all_tasks(): task.cancel() loop.create_task(stop_loop()) async def stop_loop(): print('[+] Stop loop') loop = asyncio.get_event_loop() loop.stop() class Chat(asyncio.Protocol): def __init__(self, loop, queue, fut): self.loop = loop self.queue = queue self.fut = fut def connection_made(self, transport): peername = transport.get_extra_info('peername') print('[+] Connection from:', peername) self.transport = transport self.loop.create_task(self._wait_for_stdin_data()) def connection_lost(self, exc): print('[+] Connection lost') self.fut.set_result(True) def data_received(self, data): message = data.decode() print('[+] Data received: {!r}'.format(message)) if message.strip() == "exit": self.fut.set_result(True) def _send_reply(self, reply): print('[+] Data send: {!r}'.format(reply)) self.transport.write(reply.encode()) self.loop.create_task(self._wait_for_stdin_data()) async def _wait_for_stdin_data(self): reply = await self.queue.get() self._send_reply(reply) if __name__ == '__main__': main()W celu połączenia się z serwerm:
nc 127.0.0.1 7777Działanie:
[+] Connection from: ('127.0.0.1', 45260) [+] Data received: 'asdf\n' [+] Data received: 'exit\n' [+] Cancel all tasks [+] Stop loop
Brak komentarzy:
Prześlij komentarz