自動化無しに生活無し

WEB開発関係を中心に備忘録をまとめています

Python非同期プログラミングのルール

thumbnail

非同期プログラミングには守るべきルールがある。

同期プログラミングと同じように作るとエラーが出るため、ルール・作法をここにまとめる。

非同期処理のルール一覧

  • I/Oバウンドな同期処理は非同期処理にする
  • 非同期関数を定義するには、async def
  • 非同期関数を実行するには、await
  • 同期関数から非同期関数を実行するには、 asyncio.run()
  • 非同期関数をまとめて実行するには、 asyncio.gather()
  • 同期関数を非同期化させるには、@sync_to_async
  • 非同期的に待機するには、asyncio.sleep()
  • HTTPリクエストを非同期化させるには、aiohttp
  • ファイル操作を非同期化させるには、aiofiles
  • DjangoORMで得られたモデルオブジェクトをそのままreturnしてはいけない

I/Oバウンドな同期処理は非同期処理にする

「I/Oバウンドな同期処理」というのは、I/O待機が発生する、

  • ネットワークへのリクエスト
  • データベースアクセス
  • ファイルの読み書き
  • USBデバイスの操作

などである。これらは、入出力操作に束縛され、動作完了まで待機される。

結果、このI/Oバウンドな処理を同期的に行っていると、処理速度が遅くなる。

非同期処理の概念に関しては下記も参考に。

参照: 非同期処理(async、await)とマルチスレッド(threading)とマルチプロセス(multiprocessing)の違い

Pythonのほとんどの処理は同期処理である。print文も同期処理だ。

ただし、同期処理の中にも、I/Oバウンドなものと、そうでないものがある。

非同期処理をするべきは、先に挙げた「I/Oバウンドな同期処理」であり、そうでないものは非同期処理による高速化は限られる。

print文は「I/Oバウンドではない同期処理」であり、print文を非同期化しても高速化はそれほど期待できない。

非同期関数を定義するには、async def

非同期関数を定義するには、通常のdef 関数に async をつける。

async def fetch_data():
    return "Hello, Async!"

非同期関数を実行するには、await

async def fetch_data():
    return "Hello, Async!"


async def run():
    message = await fetch_data()
    print(message)

このようにawait を使って非同期関数を実行する。await を使うことで、非同期関数終了まで待機できる。

ちなみに、await を使わずに実行することはできない。

async def fetch_data():
    return "Hello, Async!"

async def run():
    message = fetch_data()
    print(message)

このように、同期関数の実行のように書いても実行されない。

これは非同期関数のコルーチンオブジェクトが返されるだけである。

同期関数から非同期関数を実行するには、 asyncio.run()

import asyncio

async def async_function():
    await asyncio.sleep(1)
    return "Async Done"

def sync_function():
    result = asyncio.run(async_function())
    print(result)

sync_function()

asyncio.run() を使い、非同期関数を引数に入れることで同期関数内でも実行できる。

これは、Pythonファイルが同期的に動作していれば、グローバルスコープでも同じ。

import asyncio

async def async_function():
    await asyncio.sleep(1)
    return "Async Done"


result = asyncio.run(async_function())
print(result)

このように呼び出しできる。

FastAPIは非同期的に動作しているため、FastAPIのグローバルスコープでこの asyncio.run() は使ってはいけない(エラーになる)

非同期処理の中、もしくはイベントループ( asyncio.run() )が既に実行中の場合、エラーになってしまう。

import asyncio

async def async_function():
    await asyncio.sleep(1)
    return "Async Done"


result = asyncio.run(async_function())
print(result)

# これはエラー。
result = asyncio.run(async_function())
print(result)

非同期関数をまとめて実行するには、 asyncio.gather()

非同期処理をまとめて実行することができる。これが非同期並行処理である。

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 Done"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 Done"

async def main():

    results = await asyncio.gather(task1(), task2())
    print(results)

asyncio.run(main())

このように、main関数では2つの非同期関数をまとめて実行することができる。

リスト型に非同期関数のコルーチンオブジェクトをappendして、後でまとめて実行する方法も有効。

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 Done"


async def main():

    tasks = [ task1() for i in range(10) ]

    results = await asyncio.gather(*tasks)
    print(results)

このリストの内包表記で、10個分のtask1のコルーチンオブジェクトを含んだリストを作る。

.gather() を使ってまとめて実行している。

*tasks とすることでリストを展開して、引数に与えることができる。 Djangoの*args, **kwargs と同じアンパッキングの概念である。

同期関数を非同期化させるには、@sync_to_async

同期関数があり、とにかく非同期化させたい場合は、 @sync_to_async デコレータでラップすれば良い。

from asgiref.sync import sync_to_async

@sync_to_async
def sync_function():
    return "Sync to Async"


async def main():
    result = await sync_function()
    print(result)

asyncio.run(main())

なお、デコレータを使わなくても同期関数の非同期化はできる。

from asgiref.sync import sync_to_async

def sync_function():
    return "Sync to Async"

async_function = sync_to_async(sync_function)

async def main():
    result = await async_function()
    print(result)

asyncio.run(main())

これで、非同期関数と同じように呼び出せるが、内部的には同期動作になってしまうため、直列的に動作する点に注意。

非同期的に待機するには、asyncio.sleep()

非同期関数内で、time.sleep() をしてしまうと、ブロッキングが発生してしまう。

これでは、他の非同期処理にタスクが回らない。

非同期関数内では、 asyncio.sleep() を使う。

import asyncio

async def wait():
    print("Waiting...")
    await asyncio.sleep(2)
    print("Done")

asyncio.run(wait())

HTTPリクエストを非同期化させるには、aiohttp

HTTPリクエストを送信できる、requests ライブラリは同期動作である。

つまり、ブロッキングが発生し、I/O待機時に他の非同期処理にタスクが回らない。

そこで、aiohttp ライブラリを使う。

pip install aiohttp
import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

asyncio.run(fetch("https://example.com"))

ただし、非同期的にリクエストを送るということは、同期リクエストよりも高速にリクエストを送るということ。

並列処理のマルチスレッドよりも非常に高速なため、同一のリモートホストに対して非同期リクエストを送ってBANされないように。

参照: 非同期リクエスト vs マルチスレッドリクエスト vs 直列リクエスト

ファイル操作を非同期化させるには、aiofiles

ファイル操作も同期動作である。

そのため、非同期関数内で実行すると、ブロッキングが発生。他の非同期処理にタスクが回らない。

そこで、非同期でファイルの操作をするライブラリ、aiofiles を使う。

pip install aiofiles
import aiofiles
import asyncio

async def write_file():
    async with aiofiles.open("example.txt", "w") as f:
        await f.write("Hello, aiofiles!")

asyncio.run(write_file())

DjangoORMで得られたモデルオブジェクトをそのままreturnしてはいけない

DjangoのORMは同期動作。

そのため、非同期関数からモデルオブジェクトをそのままreturn すると、エラーが出る。

async def load_topics():
    # モデルオブジェクトをそのまま返しているのでエラー。
    # SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
    return Topic.objects.all()

async def main():
    topics = await load_topics()
    print(topics)

内部的に同期処理が行われるので、そのまま返すのではなく、シリアライズするか配列に変換して返す。

async def load_topics():
    # モデルオブジェクトを配列に変換して返す。
    return list(Topic.objects.all().values())

async def main():
    topics = await load_topics()
    print(topics)
スポンサーリンク