とっくの昔に、threadingを使ったマルチスレッド処理について記事を書いていたつもりだったのに、まだ書いてないことに気づきました。(そして、マルチプロセスの処理についてもまだ書いてませんでした。)
それでは気づいたこのタイミングで記事にしようと思ったのですが、改めてドキュメントを見てみると、concurrent.futures
というより高レベルなモジュールがあるとのことでしたので、こちらを利用したマルチスレッド処理について紹介します。
先に言っておきますが、PythonにはGIL (Global Interpreter Lock) という制約があって、マルチスレッドにしたとしても、Pythonインタープリタは一度に1つのスレッドしか実行できません。なので、Pythonで完結するプログラムはマルチスレッドしても高速化の恩恵はありません。では、いつマルチスレッドは使うのかというと、Python外部のリソース(ストレージとかOSの処理とかWebアクセスとか)の待ち時間が発生する場合になります。
前置きが長くなってきましたが、実際に、concurrent.futures
を使ったマルチスレッドの並列処理のサンプルコードを紹介します。concurrent.futures.ThreadPoolExecutor というのを使います。
参考: concurrent.futures.ThreadPoolExecutor
5つのサイトへのアクセスを並列でやってみましょう。
import concurrent.futures
import requests
import time
# 取得するURLのリスト
URLS = [
'http://www.example.com',
'http://www.python.org',
'http://www.openai.com',
'http://www.wikipedia.org',
'http://www.github.com'
]
# URLからコンテンツを取得する関数
def fetch_url(url):
print(f"実行開始: {url}")
response = requests.get(url)
print(f"実行完了: {url}")
return url, response.status_code, len(response.content)
# マルチスレッドでURLを並列取得する
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 各URLに対してfetch_url関数を並列実行
futures = {executor.submit(fetch_url, url): url for url in URLS}
for future in concurrent.futures.as_completed(futures):
url = futures[future]
try:
url, status, content_length = future.result()
print(f"URL: {url}, Status: {status}, length: {content_length}")
except Exception as e:
print(f"{url}でエラーが発生しました: {e}")
print(f"処理時間: {time.time() - start_time}秒")
# 以下結果
"""
実行開始: http://www.example.com
実行開始: http://www.python.org
実行開始: http://www.openai.com
実行開始: http://www.wikipedia.org
実行開始: http://www.github.com
実行完了: http://www.python.org
URL: http://www.python.org, Status: 200, length: 50928
実行完了: http://www.github.com
URL: http://www.github.com, Status: 200, length: 254186
実行完了: http://www.openai.com
URL: http://www.openai.com, Status: 403, length: 14186
実行完了: http://www.example.com
URL: http://www.example.com, Status: 200, length: 1256
実行完了: http://www.wikipedia.org
URL: http://www.wikipedia.org, Status: 200, length: 78458
処理時間: 0.49734020233154297秒
"""
ドキュメントのコードをもとにしていますが、fetch_url()メソッドの最初と最後にprit文を差し込んで5つのURLについて同時に処理が進んでいるのが分かるようにしました。開始と終了が異なる順番で結果がprintされていて、並列で動いてた感がありますね。
さて、上記コードの fetch_url() がマルチスレッドで実行されていた関数本体ですが、 肝心のThreadPoolExecutorはかなり使い方にクセがあります。
oncurrent.futures.ThreadPoolExecutor(max_workers=5) でエグゼキューターを作って、submit()や、as_completed()というメソッドを使っていますね。
submit() は実行キューへタスクを送信するメソッドです。
そして、もう一つ、oncurrent.futures.as_completed() というのを使っています。
こちらは、送信された非同期タスクが完了した順にFuture
オブジェクトを返すジェネレータ関数です。これを使うことで、並列で動いていたメソッドが完了した順に、後続の処理を行うことができます。
上の例では、future.result() でメソッドの戻り値を受け取って、順次printしています。
使い所は慎重に選ばないと高速化等の効果は得られないですし、書き方にクセがあるので、慣れないと少々戸惑うのですが、ハードウェアアクセスの待ち時間が長い時や外部リソースへのアクセスを伴う処理の高速化では非常に役に立つものなので機会があったら使ってみてください。