Pythonでマルチスレッド処理

とっくの昔に、threadingを使ったマルチスレッド処理について記事を書いていたつもりだったのに、まだ書いてないことに気づきました。(そして、マルチプロセスの処理についてもまだ書いてませんでした。)

それでは気づいたこのタイミングで記事にしようと思ったのですが、改めてドキュメントを見てみると、concurrent.futures というより高レベルなモジュールがあるとのことでしたので、こちらを利用したマルチスレッド処理について紹介します。

先に言っておきますが、PythonにはGIL (Global Interpreter Lock) という制約があって、マルチスレッドにしたとしても、Pythonインタープリタは一度に1つのスレッドしか実行できません。なので、Pythonで完結するプログラムはマルチスレッドしても高速化の恩恵はありません。では、いつマルチスレッドは使うのかというと、Python外部のリソース(ストレージとかOSの処理とかWebアクセスとか)の待ち時間が発生する場合になります。

前置きが長くなってきましたが、実際に、concurrent.futuresを使ったマルチスレッドの並列処理のサンプルコードを紹介します。concurrent.futures.ThreadPoolExecutor というのを使います。
参考: concurrent.futures.ThreadPoolExecutor

5つのサイトへのアクセスを並列でやってみましょう。

import concurrent.futures
import requests
import time


# 取得するURLのリスト
URLS = [
    'http://www.example.com',
    'http://www.python.org',
    'http://www.openai.com',
    'http://www.wikipedia.org',
    'http://www.github.com'
]


# URLからコンテンツを取得する関数
def fetch_url(url):
    print(f"実行開始: {url}")
    response = requests.get(url)
    print(f"実行完了: {url}")
    return url, response.status_code, len(response.content)


# マルチスレッドでURLを並列取得する
start_time = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # 各URLに対してfetch_url関数を並列実行
    futures = {executor.submit(fetch_url, url): url for url in URLS}

    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        try:
            url, status, content_length = future.result()
            print(f"URL: {url}, Status: {status}, length: {content_length}")
        except Exception as e:
            print(f"{url}でエラーが発生しました: {e}")

print(f"処理時間: {time.time() - start_time}秒")

# 以下結果
"""
実行開始: http://www.example.com
実行開始: http://www.python.org
実行開始: http://www.openai.com
実行開始: http://www.wikipedia.org
実行開始: http://www.github.com
実行完了: http://www.python.org
URL: http://www.python.org, Status: 200, length: 50928
実行完了: http://www.github.com
URL: http://www.github.com, Status: 200, length: 254186
実行完了: http://www.openai.com
URL: http://www.openai.com, Status: 403, length: 14186
実行完了: http://www.example.com
URL: http://www.example.com, Status: 200, length: 1256
実行完了: http://www.wikipedia.org
URL: http://www.wikipedia.org, Status: 200, length: 78458
処理時間: 0.49734020233154297秒
"""

ドキュメントのコードをもとにしていますが、fetch_url()メソッドの最初と最後にprit文を差し込んで5つのURLについて同時に処理が進んでいるのが分かるようにしました。開始と終了が異なる順番で結果がprintされていて、並列で動いてた感がありますね。

さて、上記コードの fetch_url() がマルチスレッドで実行されていた関数本体ですが、 肝心のThreadPoolExecutorはかなり使い方にクセがあります。

oncurrent.futures.ThreadPoolExecutor(max_workers=5) でエグゼキューターを作って、submit()や、as_completed()というメソッドを使っていますね。

submit() は実行キューへタスクを送信するメソッドです。

そして、もう一つ、oncurrent.futures.as_completed() というのを使っています。
こちらは、送信された非同期タスクが完了した順にFutureオブジェクトを返すジェネレータ関数です。これを使うことで、並列で動いていたメソッドが完了した順に、後続の処理を行うことができます。
上の例では、future.result() でメソッドの戻り値を受け取って、順次printしています。

使い所は慎重に選ばないと高速化等の効果は得られないですし、書き方にクセがあるので、慣れないと少々戸惑うのですが、ハードウェアアクセスの待ち時間が長い時や外部リソースへのアクセスを伴う処理の高速化では非常に役に立つものなので機会があったら使ってみてください。

Streamlitでアニメーション

今回はStreamlitでアニメーションを作成します。

といっても、やることは以前紹介したプレースホルダーの中身を順次更新し続けるだけ、という実装です。
参考: Streamlitのコンテナを使って動的にページを表示する

アニメーションさせるためには一つの枠を連続的に書き換えて画像を表示するので、st.empty() を使います。

とりあえず一個やってみましょう。画像の描写はmatplotlibを使ってみました。お試しなのでアニメーションの内容は線分をぐるぐる回すだけです。(両端を三角関数で実装します。)

import streamlit as st
import matplotlib.pyplot as plt
import numpy as np


# 描画エリアを設定
fig = plt.figure()
ax = fig.add_subplot(111)

ax.set_xlim(-1.2, 1.2)
ax.set_ylim(-1.2, 1.2)

# アニメを描写するプレースホルダーを作成
placeholder = st.empty()

# Streamlitのアニメーション表示
for i in range(100):
    ax.clear()
    ax.set_xlim(-1.2, 1.2)
    ax.set_ylim(-1.2, 1.2)
    ax.plot(
        [np.cos(i*0.1), -np.cos(i*0.1)],
        [np.sin(i*0.1), -np.sin(i*0.1)]
    )
    
    # プレースホルダーを更新
    placeholder.pyplot(fig)

これで線がぐるぐる回るアニメーションが表示できます。

あれ、time.sleep(0.01)とかウェイトを入れておかないとこのfor文が一瞬で終わってしまうんじゃないの?と思われるかもしれませんが、実験してみたところちょうど良い感じにアニメーションになりました。

どうもstreamlitの仕様として一枚一枚の画像の表示(pyplot)にウェイトがかかっているようです。

これは結構大きなメリットで、あまり表示時間とか気にせずにいい感じのアニメーションが作れます。

一方で、time.sleep(0.01) で0.01秒間隔の表示で1000フレーム使ってピッタリ10秒の動画を作ろう!みたいな調整は困難です。まぁ、これはstreamlitは動画作成を念頭に置いたものではないと思うので仕方ないですね。

ただし、デフォルトだと動作が早すぎるという場合はtime.sleep()を使ってウェイトを増やしましょう。