mplfinanceの株価チャートに指標を追加する

前回紹介した、mplfinanceの使い方の続編です。前回はただ単純に四本値でローソクチャートを書きましたが、今回はそれに各種テクニカル指標等を追加する方法を紹介します。
参考: mplfinanceで株価チャートを描く

免責事項
本記事では株式や為替などの金融商品の価格に関するデータをサンプルとして利用しますが、効果や正当性を保証するものではありません。本ブログを利用して損失を被った場合でも一切の責任を負いません。そもそも、この記事ではライブラリを使って図に線や点を追加する方法を紹介しているだけであり、著者自身がこの記事で登場している指標の投資における有用性を検証していませんし、投資にも利用していません。あくまでもこういうコードを書いたらこう動くという例の紹介です。

おきまりの文章が終わったのでやってきましょう。前回同様、こんな感じのデータがあるとします。

print("データ件数:", len(price_df))
# データ件数: 182

print(price_df.head(5))
"""
              open    high     low   close   volume
date                                               
2022-01-04  3330.0  3340.0  3295.0  3335.0  75300.0
2022-01-05  3355.0  3370.0  3320.0  3360.0  62200.0
2022-01-06  3345.0  3370.0  3305.0  3305.0  67200.0
2022-01-07  3315.0  3330.0  3265.0  3295.0  84500.0
2022-01-11  3300.0  3300.0  3215.0  3220.0  87400.0
"""

前回はこれをただそのまま表示しましたが、今回はそれに追加して以下の情報を追加で書いていこうと思います。追記する場所として、四本値のローソク足のパネル、出来高のグラフのパネル、新しいパネルを用意してそこに書き込む、の3種類ができるということ、何かしらの指標を線で表示することや特定の点にマーカーをプロットできる、ということを例示するために以下のような例を考えました。

  • 20日間の高値、安値の線 (四本値のパネル)
  • 高値、安値を更新した点のプロット(四本値のパネル)
  • 過去3日間の出来高の合計(出来高のパネル)
  • 前日の終値と当日の終値の差分(新規のパネル)

まずプロットするデータを作ります。データは四本値のデータと同じ長さで、インデックスの日付が共通のDataFrameかSeriesである必要があります。高値線や安値線などのテクニカル指標の場合は大丈夫雨だと思うのですが、1~2ヶ所点をプロットしたいだけ、といった場合であっても、点をプロットない場所は全部NaN値を入れる形で、同じ長さのデータを作らなければなりません。それだけ気をつければ特に詰まるところはないと思います。
まぁ、元のデータのDataFrameに列を追加する形で作っていけば確実でしょう。

高値安値とその更新した点は次のように作りました。更新点はちょっと上下にずれた位置にプロットしたかったので、それぞれ1.02倍/0.98倍しています。

import pandas as pd
import numpy as np


# 高値線、安値線
price_df["h_line"] = price_df.rolling(20, min_periods=1).max()["high"]
price_df["l_line"] = price_df.rolling(20, min_periods=1).min()["low"]

# 高値、安値を更新した日付を算出
h_breakout_flg = price_df["h_line"].shift(1) < price_df["high"]
l_breakout_flg = price_df["l_line"].shift(1) > price_df["low"]

# 更新日にプロットする値を用意する。
price_df["h_breakout"] = np.nan
price_df["l_breakout"] = np.nan
price_df.loc[h_breakout_flg, "h_breakout"] = price_df.loc[h_breakout_flg, "high"] * 1.02
price_df.loc[l_breakout_flg, "l_breakout"] = price_df.loc[l_breakout_flg, "low"] * 0.98

出来高の3日の和と終値の前日との差分はそれぞれ次のように作れます。

price_df["volume_sum"] = price_df.rolling(3)["volume"].sum()
price_df["close_diff"] = price_df["close"].diff()

これで、サンプルデータが出揃ったので可視化していきます。ドキュメントは前回同様Githubのサンプルコードを参照します。今回見るのはこれです。
参考: mplfinance/addplot.ipynb at master · matplotlib/mplfinance

ドキュメントは頼りないので必要に応じてソースコードもみましょう。

チャートに指標を追加するには、plotメソッドを呼び出すときに、addplot 引数に追加したい指標の情報をdictで渡します。複数追加したい時は追加したい指標の数だけのdictをlistにまとめて渡します。(今回やるのはこちら)。

addplot に渡す辞書というのは以下の例のような結構大掛かりな辞書です。

{'data': date
 2022-01-04       NaN
 2022-01-05    3437.4
 2022-01-06       NaN
 2022-01-07       NaN
 2022-01-11       NaN
                ...  
 2022-09-26       NaN
 2022-09-27       NaN
 2022-09-28       NaN
 2022-09-29       NaN
 2022-09-30       NaN
 Name: h_breakout, Length: 182, dtype: float64,
 'scatter': False,
 'type': 'scatter',
 'mav': None,
 'panel': 0,
 'marker': '^',
 'markersize': 18,
 'color': None,
 'linestyle': None,
 'linewidths': None,
 'edgecolors': None,
 'width': None,
 'bottom': 0,
 'alpha': 1,
 'secondary_y': 'auto',
 'y_on_right': None,
 'ylabel': None,
 'ylim': None,
 'title': None,
 'ax': None,
 'yscale': None,
 'stepwhere': 'pre',
 'marketcolors': None,
 'fill_between': None}

このdictデータを自分で作るのは大変です。そこで、mplfinance が専用のメソッド、make_addplot というのを持っているのでこれを使います。これを使って、追加する指標のデータと、どのグラフに書き込むのか(panel)可視化の方法(type, ‘line’, ‘bar’, ‘scatter’, ‘step’から選択)、マーカーや線のスタイル、大きや色、などの情報と合わせて渡すことでデータを作ってくれます。 (メインの四本値のデータよりそこに加筆する指標を先にライブラリに渡すのって微妙に直感的で無くて使いにくいですね。)

例えば、以下のようにすることで加筆データを生成できます。make_addplotに指定できる引数は上のサンプル辞書のkeyを見るのが早いと思います。だいたいイメージ通り動きます。

import mplfinance as mpf


adp = [
    # 高値安値線。 panel=0 と指定し、四本値と同じパネルを指定
    mpf.make_addplot(price_df[["h_line", "l_line"]], type='step',panel=0),
    # 高値更新位置。 scatter を指定し、markerで上向三角形を指定
    mpf.make_addplot(price_df["h_breakout"], type='scatter',panel=0, marker="^"),
    # 安値更新位置。 scatter を指定し、markerで下向三角形を指定
    mpf.make_addplot(price_df["l_breakout"], type='scatter',panel=0, marker="v"),
    # 出来高の和。 panel=1 とすると 出来高のパネルを指定したことになる。 line を指定して折れ線グラフに。
    mpf.make_addplot(price_df["volume_sum"], type='line',panel=1, linestyle="--", color="g"),
    # 終値の前日差分。棒グラフのサンプルも欲しかったのでbarを指定。panel=2とすると新規のパネルが追加される
    mpf.make_addplot(price_df.close.diff(), type='bar',panel=2),
]

さて、これで加筆データができました。 注意する点としてはpanel ですね。panel=0は四本値のグラフで固定ですが、panel=1は次のチャート描写のメソッドで、出来高の表示をするかどうか指定するvolume引数の値で挙動が変わります。Trueなら、出来高のグラフがpanel=1で、panel=2以降が新規のグラフです。一方Falseなら、panel=1から新規のグラフです。番号を飛ばすことができず、panel=1がないとpanel=2は指定できないので注意してください。

では、前回のグラフに追加して、上記のadpの値も渡してみます。今回、パネルが3個になりますが、panel_ratios でそれぞれの幅の調整ができます。メインの四本値のパネルを大きめにしておきました。

mpf.plot(
    price_df,
    volume=True,  # 出来高も表示
    mav=[10, 20],  # 移動平均線
    addplot=adp,  # 追加指標
    figratio=(3, 2),  # 図全体の縦横比
    panel_ratios=(2, 1, 1),  # パネルの縦幅の比率
)

出来上がった図形がこちらです。

いい感じですね。細かい話ですが、高値安値線はtypeをstepにしてカクカクした線にしていて、高値の方に引いた線はtypeをlineにして斜めにつながる線にしています。好みの問題ですがこういう微調整ができるのが良いですね。

最初はデータの準備等に戸惑ったり、思うような微調整に苦戦したりするかもしれませんが、一回作ってしまうと、あとは銘柄を入れ替えたり期間を変えたりしながらパラパラといろんな検証ができます。scatterで自分の仕掛けと手仕舞いのポイント等を入れて検証したりってことも可能ですね。

mplfinanceで株価チャートを描く

以前、Pythonでローソク足のチャートを描く方法を紹介しました。
参考: pythonでローソク足を描く

この時は、mpl_financeという既にメンテナンスされていないライブラリを使っていました。まぁ、個人的にちょっと使う分には問題ないのですがやはりちょっと不安になりますよね。

その一方で、実は株価チャートを描く別のライブラリがあることがわかりました。それが、
mplfinance です。 名前が非常に似ていますが、これはアンダーバー(_)がありません。
さっき見たらGithubに15時間前のコミットがあり、しっかりメンテナンス継続中のようです。

ドキュメントとなるのは以下の二つでしょうか。Githubのチュートリアルや、サンプルコードが比較的充実しているのでこちらが使いやすそうですね。
参考1 (Github): matplotlib/mplfinance: Financial Markets Data Visualization using Matplotlib
参考2(PyPI): mplfinance · PyPI

今回はこれのチュートリアルを見ながら一番基本的なローソク足と、出来高、あとついでに移動平均線でも書いてみましょう。

まずはデータの準備です。例えば次のような4本値データがあったとしましょう。データ件数とサンプルとして最初の10行表示しています。チャートを描くので、日付と、始値、高値、安値、終値が必須で、出来高がオプションです。

print("データ件数:", len(price_df))
# データ件数: 81
print(price_df.head(10))
"""
         date    open    high     low   close    volume
0  2022-04-01  3710.0  3725.0  3670.0  3715.0  113600.0
1  2022-04-04  3685.0  3730.0  3675.0  3725.0   94200.0
2  2022-04-05  3705.0  3725.0  3675.0  3675.0  113900.0
3  2022-04-06  3650.0  3680.0  3625.0  3635.0  163800.0
4  2022-04-07  3620.0  3660.0  3615.0  3630.0   98900.0
5  2022-04-08  3700.0  3730.0  3660.0  3700.0  135100.0
6  2022-04-11  3840.0  4060.0  3810.0  4045.0  494900.0
7  2022-04-12  4030.0  4210.0  4015.0  4120.0  403000.0
8  2022-04-13  4115.0  4165.0  4080.0  4155.0  184600.0
9  2022-04-14  4085.0  4120.0  4020.0  4065.0  240900.0
"""

これをライブラリが要求する形にする必要があります。まず、日付(date)がデータフレームのインデックスに指定されていないといけません。また、データ型も文字列ではダメで、DatetimeIndexである必要があります。その変形をします。

price_df["date"] = pd.to_datetime(price_df["date"])
price_df.set_index("date", inplace=True)

print(price_df.head(5))
"""
              open    high     low   close    volume
date                                                
2022-04-01  3710.0  3725.0  3670.0  3715.0  113600.0
2022-04-04  3685.0  3730.0  3675.0  3725.0   94200.0
2022-04-05  3705.0  3725.0  3675.0  3675.0  113900.0
2022-04-06  3650.0  3680.0  3625.0  3635.0  163800.0
2022-04-07  3620.0  3660.0  3615.0  3630.0   98900.0
"""

また、データの各列の名前は[“Open”, “High”, “Low”, “Close”, “Volume”] (“Volume”は無くても可)にするよう指定されています。日本語で”始値”とか入っている場合は変換しましょう。
先頭を大文字にしないといけないのかな?と思ったのですが、全部小文字でも動くことが確認できているので、今回はこのまま使います。

なんか、ソースコードを見ると多くの人が要求したから小文字にも対応してくれたそうです。
ここ

    # We will not be fully case-insensitive (since Pandas columns as NOT case-insensitive)
    # but because so many people have requested it, for the default column names we will
    # try both Capitalized and lower case:
    columns = config['columns']
    if columns is None:
        columns =  ('Open', 'High', 'Low', 'Close', 'Volume')
        if all([c.lower() in data for c in columns[0:4]]):
            columns =  ('open', 'high', 'low', 'close', 'volume')

さて、これでデータが揃いました。

ここからの使い方はとても簡単で、こだわりがなければライブラリをimportして、plotってメソッドに渡すだけです。

import mplfinance as mpf

mpf.plot(price_df)

いわゆるOHLCチャートが出力されました。

ここから少しカスタマイズしていきます。

まず、チャートをローソク足にするのは、type=”candle”です。”candle”の他にはデフォルトの”ohlc”や、”line”, “renko”, “pnf” が指定できます。それぞれイメージ通りの出力が得られるので興味がある方は試してみてください。

出来高の追加は volume=True を指定します。

また、移動平均線は、mav 引数で指定します。整数を一つ指定すればその日数の移動平均が1本、配列等で複数の整数を渡して数本引くこともできます。

チャート全体をもう少し横長にしたいなど、縦横比率を変えたい場合は、figratioで指定します。デフォルトは(8.00,5.75) です。どうも名前の通り、数値の比だけが重要のようです。(4, 2)と(2, 1)の結果が一緒でした。

以上、やってみます。

mpf.plot(
    price_df,
    type="candle",
    volume=True,
    mav=[5, 12],
    figratio=(2, 1),
)

出力がこちらです。

いい感じですね。

この他にも見栄えを整えるオプションは多数用意さてれいるのですが、いくつかの組み合わせがstyleとして用意されています。style=”yahoo”なんてのもあります。
こちらのページにサンプルがまとまっているので、気に入ったのがあったら使ってみるのも良いと思います。(僕は一旦上の、style=”default”でいいかな。)

今回はデータを渡すだけでサクッとチャートを描いてくれるmplfinanceの基本的な使い方を紹介しました。近いうちの記事で、これに別のテクニカル指標を表示したり線や点を追加するなどのカスタマイズ方法を紹介していきたいと思います。

Jupyter notebookのファイルをコマンドラインで実行する

Jupyter notebookのファイル (.ipynbファイル)をそのまま実行したい、って場面は結構あります。notebookファイルから通常のPythonファイル(.pyファイル)に変換しておけばいいじゃないか、という意見もあると思いますし、それはそれでごもっともです。ただ、僕個人の事例で言うと、個人的に開発してるツールの中に土日に触る時はちょっとずつ編集して改良して実行し、平日はそのまま全セルを実行するだけってnotebookファイルなどもあります。そのようなファイルについて、逐一上から順番にnotebookのセルを実行していくのはやや面倒です。

と言うことで、.ipynbファイルをコマンドラインからバッチのように実行できると便利、ってことでその方法を紹介していきます。

Google等で検索するとよく出てくる方法と、もう一つ、ドキュメントを読んでいて見つけた方法があるのでそれぞれ紹介します。後者の方法の方が手軽なので、まずそちらを書きます。

jupyter execute コマンドを使う方法

一つ目に紹介する方法は、jupyter execute コマンドです。
ドキュメントはこちら。
参考: Executing notebooks — nbclient – Using a command-line interface

これはすごく簡単で、以下のコマンドで実行するだけです。

$ jupyter execute {ファイル名}.ipynb
# 以下出力
[NbClientApp] Executing {ファイル名}.ipynb
[NbClientApp] Executing notebook with kernel: python3

コマンド名は直感的でわかりやすくて記述量も少なくて僕は気に入っています。

ただし、注意点があってこの方法でnotebookを実行しても元のnotebookファイルは更新されません。つまりどう言うことかと言うと、notebook内の出力領域に表示されるはずの情報は残らないと言うことです。printしたテキストとか、matplotlib等で表示した画像などは見れず、ただプログラムが走るだけと言う状態になります。

そのため、この方法でnotebookを実行する場合は必要な出力はnotebookの外部に保存するように作っておく必要があります。必要な結果はファイルに書き出すとかDBに保存するような実装にしておきましょう。

次に紹介する方法(ググるとよく出てくる方法)では、実行結果の出力を残せるので、このexexuteコマンドでも何かオプションを指定したら実行結果を残せるだろうと思って探してたんですが、どうも今日時点ではそのような機能は実装されていなさそうです。今後に期待したいところです。

全体的にオプションも少なく、その中でも実際使えるものというと実質的に次の二つだけかなと思います。

# $ jupyter execute --help の出力結果から抜粋
--allow-errors
    Errors are ignored and execution is continued until the end of the notebook.
    Equivalent to: [--NbClientApp.allow_errors=True]
--timeout=<Int>
    The time to wait (in seconds) for output from executions. If a cell
    execution takes longer, a TimeoutError is raised. ``-1`` will disable the
    timeout.
    Default: None
    Equivalent to: [--NbClientApp.timeout]

–allow-errors をつけると、エラーが発生してもそれ以降のセルも実行されるようになります。これをつけてない場合は、エラーになったセルがあればそれ以降のセルは実行されません。
試してみたのですが、–allow-errorsをつけていると、エラーになったセルがあってもそのエラー文等は表示されないので、リスクを伴うオプションだと思います。エラーになったらその旨を外部のログに残す実装になっていないと自分で気づく手段がありません。なお、–allow-errorsをつけてない場合、エラーになるセルがあったらそこで標準エラー出力にエラーを表示して止まるので気付けます。

–timeout の方はデフォルトでタイムアウト無しになっているのであまり気にしなくても良いかと思うのですが、異常に長く時間がかかるリスクがある場合などは設定しても良いでしょう。

jupyter nbconvert コマンドを使う方法

次に紹介するのは、 jupyter nbconvert コマンドを使う方法です。jupyter notebookをコマンドライン(CUI)で使う方法として検索するとよく出てくるのはこちらの方法です。

nbconvert 自体は、notebookを実行するコマンドじゃなくて、別の形式に変換するコマンドなので、正直これをnotebookの実行に使うのって抵抗あるのですが、どういうわけかこちらの方がいろんなオプションが充実していて、実行専用と思われる先ほどの jupyter execute コマンドよりも柔軟な設定が必要です。詳細は不明ですが歴史的な経緯か何かによるものでしょうか。

ドキュメントはこちら
参考: Executing notebooks — nbconvert 7.1.0.dev0 documentation

基本的な使い方は次のようになります。–to でファイルの変換先のタイプを指定するのですが、そこでnotebookを指定して、さらに–execute をつけると実行されます。

$ jupyter nbconvert --to notebook --execute {ファイル名}.ipynb
# 以下出力
[NbConvertApp] Converting notebook {ファイル名}.ipynb to notebook
[NbConvertApp] Writing {ファイルサイズ} bytes to {ファイル名}.nbconvert.ipynb

上記の出力をみていただくと分かる通り、実行した結果を、{ファイル名}.nbconvert.ipynb という新しいファイルに書き出してくれています。これの内容がセルを(空のセルを飛ばしながら)上から順番に実行した結果になっていて、こちらの方法であればnotebookの出力領域にprintした文字列やmatplotlibの画像なども残すことができます。

細かいオプションについては、 jupyter nbconvert –help で確認可能ですが、 先ほども書きましたがexecuteよりもたくさんあります。

–allow-errors は同じように指定できますし、 –output {ファイル名} で、書き込み先のファイル名を変更することも可能です。
ちなみにデフォルトだと、上記の実行例の通り{ファイル名}.nbconvert.ipynbに書き込みますが、既に同名のファイルが存在した場合は上書きしてしまいます。そのため、毎回の実行履歴を残しておきたいならば出来上がったファイルを退避しておくか、–outputオプションで別の名前をつける必要があるでしょう。
–inplace をつけて、別ファイルに書き出すのではなくて、元のファイルを置き換えるなども可能です。この辺の細かい調整を行えるのがnbconvertの方を使える利点ですね。executeの方にも実装していただきたいものです。

まとめ

以上で、jupyter notebookファイルをコマンドラインで実行する方法を二つ紹介してきました。それぞれメリットデメリットあるので用途に応じて便利な方を使っていただけたらと思います。

Pythonで線形和割り当て問題を解く

昔、あるアルゴリズムを実装する中で使ったことがある、 linear_sum_assignment っていうscipyのメソッドを久々に使うと思ったら使い方を忘れていたのでその復習を兼ねた記事です。

これは、2部グラフの最小重みマッチングとも呼ばれている問題で、要するに、二つのグループの要素からそれぞれ1個ずつ選んだペアにコストが定義されていて、どのように組み合わせてペアを選んでいったらコストの和を最小にできるかという問題です。

この説明はわかりにくいですね。もう少し具体的なのがいいと思うので、Scipyのドキュメントで使われている例を使いましょう。

Scipyのドキュメントではworker(作業者)とjob(仕事)を例に解説されています。
参考: scipy.optimize.linear_sum_assignment — SciPy v1.9.1 マニュアル

例えば、4人の作業者がいて4つの仕事があったとします。そして、その4人がそれぞれの仕事をした場合に、かかる時間(=コスト)が次のように与えられていたとします。行列形式ですが、i行j列の値が、作業者iが仕事jを実行した場合にかかるコストです。(例を乱数で作りました。)

import numpy as np


np.random.seed(0)
cost = np.random.randint(1, 10, size=(4, 4))
print(cost)

"""
[[6 1 4 4]
 [8 4 6 3]
 [5 8 7 9]
 [9 2 7 8]]
"""

cost[1, 2] = 6 ですが、これは作業者1が仕事2を行った場合のコストが6ということです。
(インデックスが0始まりであることに注意してください。cost[1, 2]は2行3列目の要素です。)

さて、上の図を見ての通り、作業者ごとに仕事の得手不得手があり、コストが違うようです。そこで、これらの仕事をそれぞれ誰が担当したらコストの総和を最小にできるでしょうか、というのが線形和割り当て問題です。

これが、先ほどの linear_sum_assignment を使うと一発で解けます。

ドキュメントにある通り、戻り値が行のインデックス、列のインデックスと帰ってくるので注意してください。

from scipy.optimize import linear_sum_assignment


row_ind, col_ind = linear_sum_assignment(cost)
print("行:", row_ind)
print("列:", col_ind)
"""
行: [0 1 2 3]
列: [2 3 0 1]
"""

二つのarray(プリントしてるのでlistに見えますがnumpyのArrayです)が戻ってきます。
これが、worker0がjob2を担当し、worker1がjob3を担当し、、、と読んでいきます。
これがコストを最小にする組み合わせです。簡単でしたね。

さて、値の戻ってき方がちょっと独特だったのでプログラムでこれを使うにはコツが要ります。こう使うと便利だよ、ってところまでドキュメントに書いてあると嬉しいのですが、書いてないので自分で考えないといけません。

インデックスとして返ってきているので、次のようにコスト行列のインデックスにこの値を入れると、最適化された組み合わせのコストが得られます。そして、sum()すると合計が得られます。以下の通り、14が最小ということがわかります。

print(cost[row_ind, col_ind].sum())
# 14

Scipyの実装を疑うわけではないのですが、念の為、本当にこの組み合わせが最適で14が最小なのか、全組み合わせ見ておきましょう。itertools.permutationsを使います。

from itertools import permutations


for perm in permutations(range(4)):
    print(list(perm), "=>", cost[range(4), perm].sum())
"""
[0, 1, 2, 3] => 25
[0, 1, 3, 2] => 26
[0, 2, 1, 3] => 28
[0, 2, 3, 1] => 23
[0, 3, 1, 2] => 24
[0, 3, 2, 1] => 18
[1, 0, 2, 3] => 24
[1, 0, 3, 2] => 25
[1, 2, 0, 3] => 20
[1, 2, 3, 0] => 25
[1, 3, 0, 2] => 16
[1, 3, 2, 0] => 20
[2, 0, 1, 3] => 28
[2, 0, 3, 1] => 23
[2, 1, 0, 3] => 21
[2, 1, 3, 0] => 26
[2, 3, 0, 1] => 14
[2, 3, 1, 0] => 24
[3, 0, 1, 2] => 27
[3, 0, 2, 1] => 21
[3, 1, 0, 2] => 20
[3, 1, 2, 0] => 24
[3, 2, 0, 1] => 17
[3, 2, 1, 0] => 27
"""

どうやらあってそうですね。

col_ind の方を使って、行列を並び替えることもできます。i行目のworkerがi列目のjobを担当する直感的に見やすい行列が次のようにして得られます。

print(cost[:, col_ind])
"""
[[4 4 6 1]
 [6 3 8 4]
 [7 9 5 8]
 [7 8 9 2]]
"""

また、解きたい問題や実装によっては、この行と列の対応を辞書にしたほうが使いやすいこともあるでしょう。そのような時はdictとzipで変換します。

print(dict(zip(row_ind, col_ind)))
# {0: 2, 1: 3, 2: 0, 3: 1}

ここまでの例では、与えられた行列はコストの行列でこれを最小化したい、という問題設定でやってきました。ただ、場合によっては利益やスコアの行列が与えられて、最大化する組み合わせを探したいという場合もあると思います。行列にマイナス掛けて同じことすればいいのですが、linear_sum_assignment自体にもそれに対応した引数があります。

それが、maximize で、 デフォルトはFalseですが、Trueにすると最大化を目指すようになります。同じ行列でやってみます。さっき全パターン列挙しているので正解はわかっていて、[0, 2, 1, 3]か[2, 0, 1, 3]のどちらかが得られるはずです。

print(linear_sum_assignment(cost, maximize=True))
# (array([0, 1, 2, 3]), array([0, 2, 1, 3]))

[0, 2, 1, 3]の方が出ててきましたね。

ここまで、正方行列を取り上げてきましたが、linear_sum_assignment は、一般行列についても実行できます。行と列の数が違う場合は、行と列のうち数が小さい方に揃えて、実行されます。

まず、行が多い(workerが多い)場合をやってみましょう。7行4列で、7人のworkerがいて、jobが4つあって、コストがそれぞれ定義されていた場合に、どの4人を選抜してそれぞれにどの4つのタスクをやってもらうのが最適か、という問題を解くのと対応します。

np.random.seed(0)
cost = np.random.randint(1, 10, size=(7, 4))
print(cost)
"""
[[6 1 4 4]
 [8 4 6 3]
 [5 8 7 9]
 [9 2 7 8]
 [8 9 2 6]
 [9 5 4 1]
 [4 6 1 3]]
"""

row_ind, col_ind = linear_sum_assignment(cost)
print("行:", row_ind)
print("列:", col_ind)
"""
行: [0 2 5 6]
列: [1 0 3 2]
"""

次に同様に横長の行列の場合です。例えば4人のworkerがいて7つのjobがあったときに、どの4つのjobを選んで実行したら利益を最大化できるか、って問題がこれに相当します。(最小化でいい例が思いつかなかったのでこれは最大化でやります。)

np.random.seed(0)
score = np.random.randint(1, 10, size=(4, 7))
print(score)
"""
[[6 1 4 4 8 4 6]
 [3 5 8 7 9 9 2]
 [7 8 8 9 2 6 9]
 [5 4 1 4 6 1 3]]
"""

row_ind, col_ind = linear_sum_assignment(score, maximize=True)
print("行:", row_ind)
print("列:", col_ind)
"""
行: [0 1 2 3]
列: [4 5 3 0]
"""

以上が linear_sum_assignment の基本的な使い方になります。

既存のディレクトリをGit管理するようにし、別ディレクトリのリポジトリへPushする

Gitの操作メモです。

2記事に分けて書こうかと思ったのですが、ほとんどの人にとってあまり有益な情報でない気がしたし、おそらく自分も今後やらないと思うのでまとめて書きます。やることは次の二つです。
1. 既存のディレクトリをGit管理するようにする。
2. Githubなどではなく、別のディレクトリにbareリポジトリを置いてそこにプッシュする。

要するに自分が、git remote とか git init –bare とかのコマンドをこれまで使ったことなくて、今回初めてやる機会があったからメモを残そうとしています。

これまで、自分がGitを使うときは、何かのプロジェクトに参画してリモートからCloneしてきて作業を始めたり、新規のプロジェクトを立ち上げる時はGithubに空っぽのリポジトリを作ってそれをローカルにCloneして作業を始めたりしていました。

ただ、今回は特に何かのプロジェクトに属してるわけではない雑多な作業や調査のファイル群たちをバックアップ取るようにしたくなり、ついでにGit管理するようにしたくなったのです。

それで、普通はGithubにプライベートリポジトリを作ればいいのですが、今回のはローカル端末から外に出す予定がなかったファイル群(主にjupyter notebook)なので、内容にAPIキーなどの認証情報等も含まれていてプライベートリポジトリであってもGithubに上げるの嫌だなってことで別の方法を探しました。その結果、NASのファイルサーバーの自分しか見れない領域にリポジトリを作ってそっちで管理しようってのが今回の発端です。

ではさっそくやっていきます。

1. 既存のディレクトリをGit管理するようにする

こちらは簡単ですね。基本的には、git init するだけです。ただ、最近の潮流にも考慮して、デフォルトブランチをmasterではなくmainにします。また、最初のコミットは空コミットにしておけというアドバイスも見かけたのでそれにも従います。ブランチ名の変更は初回コミットが無いとできないようだったので、次の順番で実行してください。

# Gitで管理したいディレクトリの内側に移動する
$ cd {Gitで管理したいディレクトリ}
# リポジトリを作成する
$ git init
# 出力
> Initialized empty Git repository in /{ディレクトリパス}/.git/
# 空コミットを許可するオプションをつけて最初のコミットを実行
$ git commit --allow-empty -m "first commit"
# ブランチ名を変更する
$ git branch -M main

これでリポジトリができました。

2. Pushするリポジトリを作成する

次にPush先のリポジトリを作成します。いわゆる bareリポジトリというやつです。

ローカルに作ると端末破損時等のバックアップにならないので、/Volumes/ の配下にマウントしているNASに作ります。(僕の端末はMacです。別OSでは別のパスになると思います。)

bareリポジトリは初めて作ったのですが、通常のリポジトリみたいに .git ディレクトリができてその中に各種ファイルが作成されると思っていたら、コマンドを実行したカレントディレクトリにgit関連のディレクトリが複数発生してしまって焦りました。

git init –bare する時はディレクトリ名を指定して作成するのが作法のようです。そして、慣習としてそのディレクトリ名(リポジトリ名)はhogehoge.git とするのが作法とのこと。そのようにします。(ただ、ディレクトリ名に拡張子っぽく.が入ってるのが少し慣れません)

# リポジトリを作成したいディレクトリに移動する
$ cd /Volumes/{パス}
$ git init --bare {リポジトリ名}.git

こうして出来上がる、 /Volumes/{パス}/{リポジトリ名}.git/ がプッシュ先のリポジトリです。
ちなみにその配下には以下のようなファイルやディレクトリができています。

$ ls {リポジトリ名}.git/
HEAD        config      description hooks       info        objects     refs

3. リモートリポジトリを設定する

プッシュ先のリポジトリができたので、元のリポジトリがここにPushできるように設定します。Githubでいつも使っている、originって名前でPushできるようにします。名前自体はbentouでもhottomottoでも何でもいいらしいのですが、こだわった名前使うメリットもないと思います。

git remote は初めて使いました。ドキュメントはこちらです。
参考: git-remote

# 元のリポジトリに移動
$ cd {最初にリポジトリを作ったディレクトリ}
# リモートディレクトリを設定する
$ git remote add origin /Volumes/{パス}/{リポジトリ名}.git/
# 設定されたことを確認する
$ git remote -v
origin	/Volumes/{パス}/{リポジトリ名}.git/ (fetch)
origin	/Volumes/{パス}/{リポジトリ名}.git/ (push)
# Push
$ git push origin main

これで設定が完了したので、いつもGithubでやっているのと同じようにコードを管理できるようになりました。

Pythonでファイルの更新時刻やファイルサイズの情報を取得する

パソコン(ここではMacを想定)内のファイルを整理していて、古いファイルなどをリストアップしようとしたときのメモです。
更新時刻を取得するのはBashコマンドでもできますしファインダーでも見れて、僕も普段はそうしているのですが、一旦気合入れて整理しようと思ったときにこれらの方法がやや使いにくかったのでPythonでやることを検討しました。

結論から言うと、Pythonのosモジュールを使うと実装できます。
os.stat ってのがファイルの情報を取得する関数で、結果はstat_result というオブジェクトで帰ってきます。

ドキュメントはこちら。
参考: os — 雑多なオペレーティングシステムインターフェース — Python 3.10.6 ドキュメント

サンプルとしてこんなファイルがあったとしましょう。

$ ls -la sample.txt
-rw-r--r--  1 {user} {group}  7  9  5 01:01 sample.txt

これの情報を取得するには次のようにします。

import os


file_path = "./sample.txt"
file_info = os.stat(file_path)
print(file_info)
"""
os.stat_result(st_mode=33188, st_ino=10433402, st_dev=16777220, st_nlink=1,
st_uid=501, st_gid=20, st_size=7,
st_atime=1662307286, st_mtime=1662307285, st_ctime=1662307285)
"""

st_atimeが最終アクセス時刻、st_mtimeが最終更新時刻です。
printすると出てきませんが、st_birthtimeなんてのもあってこれがファイルの作成時刻です。

これらの値は普通に属性なので、.(ドット)で繋いでアクセスできます。

注意しないといけないのは、実行しているOSによって取得できる値に違いがあり、取得できなかったり取得できるけど意味が違ったりするものがあることです。

詳しくはドキュメントに書いてあります。
class os.stat_result

st_ctime はUNIXではメタデータの最終更新時刻で、Windows では作成時刻、単位は秒など色々違いますね。
なんとなく使わずにきちんと動作を確認して使うことが重要でしょう。

また、元々の目的が更新時刻の取得だったのですが、ついでにst_size でファイルサイズも取得できています。
上の例で見ていただくと、 st_size=7 となっていて、その上のlsの結果と一致します。

さて、以上でファイルの更新時刻やサイズが取得できたのですが、更新時刻(を含む事故国関係の情報一式)はUNIX時間で得られます。
人間にとって使いにくいので、以前紹介した方法で変換しましょう。
参考: Pythonで時刻をUNIX時間に変換する方法やPandasのデータを使う時の注意点

from datetime import datetime


# ファイル作成時刻
print(datetime.fromtimestamp(file_info.st_birthtime))
# 2022-09-05 01:01:13.879805

# 最終内容更新時刻
print(datetime.fromtimestamp(file_info.st_mtime))
# 2022-09-05 01:01:25.663676

# 最終アクセス時刻
print(datetime.fromtimestamp(file_info.st_atime))
# 2022-09-05 01:01:26.286258

非常に簡単ですね。
あとは globか何かでファイルパスの一覧を作成してDataFrame化して、applyでさっと処理して仕舞えば少々ファイルが多くてもすぐリスト化できそうです。

Pythonのdataclassを使ってみた

Pythonの標準ライブラリにdataclassというのがあるの見つけたので使ってみました。
参考: dataclasses — データクラス — Python 3.10.6 ドキュメント

名前から、オリジナルのデータ型を定義するためのモジュールなのかなとも思ったのですが実際は少し違いそうです。もちろん、オリジナルのデータ型を定義するためにも使えるのですが、その実態は、クラスに対して__init__()__repr__()といった特殊メソッドを自動的に生成してくれるデコレーターという解釈が正確のようです。

お試しに、証券コードと会社名と説明を属性として持ったCompanyクラスを作ってみましょう。

import dataclasses


@dataclasses.dataclass
class Company:
    code: int
    name: str
    description: str = None


# __init__() メソッドが自動的に定義されているためこれでインスタンスを作成できる
toyota = Company(code=7203, name="トヨタ自動車", description="自動車メーカー")
suzuki = Company(code=7269, name="スズキ", description="自動車メーカー")


# __eq__() メソッドが自動的に定義されているため、比較ができる
toyota == suzuki
# False

これは属性がたった3個だけで、メソッドも持ってないようなクラスなのですが、__init__()が入らないってだけでものすごくシンプルに描けるようになりましたね。

また、比較用のメソッドを自分で作らなくても、各属性が全て一致しているかどうかを基準に一致不一致を判定してくれるのも便利です。属性が3個だけだとそこまでありがたみがないですが、もっと大規模なクラスで、全属性の一致を判定するのは無駄にコードが長くなりますから。

int とか str と書いて型ヒントをつけられたりするのも今風な感じがします。ただ、この型ヒントはどうやらただのアノテーションで、代入する値に対する強制力などはないようです。
codeを整数、nameを文字列としていますがそうでない値も入ります。

dummy_company = Company(code="文字列", name=1234)
print(dummy_company)
# Company(code='文字列', name=1234, description=None)

この記事の冒頭で書いていますが、このdataclassはclassに対するデコレーターなので、ただのオリジナルデータ型ではなく、普通にメソッド等を持っているクラスを作成することもできます。その場合__init__()などを自分で書かなくて良くなるので、特に凝った__init__()が不要な場合はバンバン使って良さそうです。例えば、二つの値を持ち、合計値を返せるクラスは次のようになります。

@dataclasses.dataclass
class two_number:
    a: int
    b: int

    def sum(self):
        return self.a + self.b


tn = two_number(5, 8)
print(tn.sum())
# 13

自分が実装したいメソッドの部分に専念できるのはいいですね。

このdataclassのデコレーターですが、デコレーター自体も引数を取って、色々設定することができます。詳細はドキュメントに譲りますが、例えばinit=Falseやrepr=False, eq=Falseを指定すると、デフォルトで生成されると言っていた__init__()や__repr__()、__eq__()などが生成されなくなります。自分で実装したいものがあったらそれだけ自分で実装するようにしましょう。

frozen (デフォルトはFalse) を Trueに指定すると、値への代入が禁止されます。これのメリットしては辞書のキーとして使えるようになることでしょうか。ちょっとやってみます。
1個目の例は上で作ったCompanyなので、frozenはFalseです。その次がfrozen=True。

# frozen = False だと属性に値を代入できる
toyota.description  = "日本の自動車メーカー"
print(toyota)


# frozen=Trueを指定してみる
@dataclasses.dataclass(frozen=True)
class Frozen_Company:
    code: int
    name: str
    description: str = None


f_toyota = Frozen_Company(code=7203, name="トヨタ自動車", description="自動車メーカー")

# frozen = True だと属性に値を代入できないため、例外が上がる
try:
    f_toyota.description  = "日本の自動車メーカー"
except Exception as e:
    print(type(e), ":", e)
#  <class 'dataclasses.FrozenInstanceError'> : cannot assign to field 'description'

frozenにするメリットとしては、タプルと同様に辞書のキーにできる、という点があります。

# frozenではない、つまりハッシュ化不可能なので辞書のキーにできない
try:
    {toyota: 1}
except Exception as e:
    print(type(e), ":", e)
# <class 'TypeError'> : unhashable type: 'Company'

# frozen=Trueだとハッシュ化可能なので辞書のキーにできる
{f_toyota: 1}
# {Frozen_Company(code=7203, name='トヨタ自動車', description='自動車メーカー'): 1}

また、order という引数(デフォルトFalse)にTrueを渡すと、__le__()等々の不等号を実装する特殊メソッドたち4種も自動的に生成してくれるようになります。どうも要素を順番に比較して最初に上下がついたもので決まるようです。これも属性が多い時は便利なのではないでしょうか。

@dataclasses.dataclass(order=True)
class two_number:
    a: int
    b: int

tn1 = two_number(12, 5)
tn2 = two_number(6, 20)
tn1 > tn2
# True

さて、以上でdataclass自体の基本的な説明はおしまいです。

あとは偶然気づいた豆知識なのですが、Pandasとの連携について紹介します。
dataclassの配列は、簡単にPandasのDataFrameに変換できます。実質的にdictみたいに振る舞ってくれるようです。

import pandas as pd


df = pd.DataFrame([toyota, suzuki])
print(df)
"""
   code    name description
0  7203  トヨタ自動車  日本の自動車メーカー
1  7269     スズキ     自動車メーカー
"""

便利ですね。

逆に、DataFrameに入った値たちをdataclassで定義したクラスのインスタンスに変換したいな、と思って方法探しました。専用のメソッドなどは見つかってないのですが、lamda関数を使ってこのようにするのが良いでしょう。

df.apply(lambda row: Company(**row), axis=1)
"""
0    Company(code=7203, name='トヨタ自動車', description=...
1    Company(code=7269, name='スズキ', description='自動...
dtype: object
"""

事前にDataFrameの列名と、dataclassの属性名を揃えておく必要はあるのでそこは注意してください。

PythonでUUIDを生成する

ある作業をやっているときに、データの塊ごとにユニークなidを振りたいことがありました。
通常は0から順番に番号振っていけば十分なのですが、今回は分散処理していて個別の処理でバッティングしないようにidを振りたかったのです。これでも番号のレンジを分けておけば十分なのですが、世の中にはUUIDって仕組みがあるのでこれを試すことにしました。

UUIDの詳細はWikipediaをご参照ください。要するにユニークなIDを発行する仕組みです。
参考: UUID – Wikipedia

UUIDってバージョン1〜5があって仕組みが違っていたんですね。自分はMACアドレスと時刻を使ってIDを生成する方法だと認識していたのですが、それはバージョン1のことだったようです。

PythonでUUIDを利用したい場合は、uuidという標準ライブラリが使えます。
参考: uuid — UUID objects according to RFC 4122

バージョン1, 3, 4, 5 の4種類のUUIDが実装されているようです。
とりあえず動かしてみますか。バージョン3と5は名前空間と名前に対してIDを振り分けるので、とりあえずこのブログのドメイン名を渡しています。

import uuid


uuid.uuid1()
# UUID('9f15c3a4-2173-11ed-bb9d-dca9048ad673')

uuid.uuid3(uuid.NAMESPACE_DNS, "analytics-note.xyz")
# UUID('30b2104c-d522-3f77-9fe8-6863bd4a6cda')

uuid.uuid4()
# UUID('8e489abb-a18a-4f0c-80d1-30ae0ea83d81')

uuid.uuid5(uuid.NAMESPACE_DNS, "analytics-note.xyz")
# UUID('feeaf7d1-3987-5451-9a28-afd72801a03b')

それぞれ、UUIDが生成できましたね。ハイフンで区切られた3つめの塊の1文字目の数字がバージョン番号なのですが、ちゃんと1,3,4,5となっています。

uuid3とuuid5 は渡した名前に対してIDを割り当てているので、引数が同じなら結果はずっと同じです。
uuid1は時刻を、uuid4は乱数を用いているので実行するたびに生成されるIDが変わります。

この記事冒頭の目的にではuuid4を使えば良いでしょう。

上記の結果を見ても分かる通り、結果はUUIDというクラスのオブジェクトとして返ってきます。一応typeを見ておきましょう。

sample_id = uuid.uuid4()
print(type(sample_id))
# <class 'uuid.UUID'>

str で文字型にキャストすることもできますし、プロパティのhexやintで16進法表示や整数表示も得られます。(なぜstrはプロパティではないのか不思議です)

print(str(sample_id))
# 61d67c09-4a78-4d03-a6f0-8f1843673083

print(sample_id.hex)
# 61d67c094a784d03a6f08f1843673083

print(sample_id.int)
# 130048782873754933734408394572189610115

文字列からUUIDオブジェクトを作ることもできます。ただ、これはいつ使うものなのか不明です。

uuid.UUID("61d67c09-4a78-4d03-a6f0-8f1843673083")
# UUID('61d67c09-4a78-4d03-a6f0-8f1843673083')

あとは気になるのは処理時間ですね。ID作成に時間がかかるようだと困るので。
ただ、ちょっと実験したところそこまで大きな問題もなさそうでした。
100万回実行するのに3秒未満で完了しています。

%%time
for i in range(1000000):
    uuid.uuid4()

"""
CPU times: user 2.37 s, sys: 407 ms, total: 2.77 s
Wall time: 2.79 s
"""

2標本コルモゴロフ–スミルノフ検定を試す

前回の記事の続きです。
参考: 1標本コルモゴロフ–スミルノフ検定について理解する

コルモゴロフ-スミルノフ検定(K-S検定)は、一つの標本が何かしらの確率分布に従っているかどうかだけではなく、二つの標本について、それらが同一の確率分布から生成されたものかどうかの判定にも利用可能です。

Wikipediaを見ると、KS検定は1標本より2標本の時の利用の方が推されてますね。2標本それぞれの確率分布が不明となるとノンパラメトリックな手法が有効になるのでしょう。

前の記事みたいに数式をつらつらと書いていこうかと思ったのですが、1標本との違いは、統計量を計算するときに、経験分布と検定対象の累積分布関数の差分の上限(sup)を取るのではなく、その二つの標本それぞれの経験分布に対して、差分の上限の上限(sup)を取るというだけです。KS検定について調べるような人でこの説明でわからないという人もいないと思うので、数式等は省略します。(前回の記事を参照してください。)

便利な特徴としては、2標本のそれぞれのサイズは等しくなくても使える点ですね。
ただ、色々試した結果、それなりにサンプルサイズが大きくないとあまり使い物にならなさそうです。

この記事では、Pythonで実際に検定をやってみます。
前回のt分布からの標本と正規分布は、標準偏差が違うとはいえ流石に似すぎだったので、今回はちょっと変えて、カイ2乗分布と、正規分布を使います。
カイ2乗分布の自由度は4とし、すると期待値4、分散8になるので、正規分布の方もそれに揃えます。期待値や分散が違うならt検定やF検定で十分ですからね。

とりあえず分布関数を作り、可視化して見比べておきます。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import ks_2samp
from scipy.stats import norm
from scipy.stats import chi2


chi2_frozen = chi2.freeze(df=4)  # 自由度4のカイ2乗分布 (期待値4, 分散8)
norm_frozen = norm.freeze(loc=4, scale=8**0.5)  # 正規分布 (期待値4, 分散8)

# 確率密度関数を可視化
xticks = np.linspace(-10, 20, 301)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, chi2_frozen.pdf(xticks), label="カイ2乗分布")
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.legend()
plt.show()

出力はこちら。これはちゃんと検定で見分けて欲しいですね。

続いて、それぞれの分布から標本を取ります。サンプルサイズが違っても使える手法なので、意図的にサンプルサイズ変えました。

# それぞれの分布から標本を生成
chi2_samples = chi2_frozen.rvs(size=200, random_state=0)  # カイ2乗分布からの標本
norm_samples = norm_frozen.rvs(size=150, random_state=0)  # 正規分布からの標本

さて、これでデータが取れたので、2標本KS検定やっていきます。
非常に簡単で、ks_2samp ってメソッドに渡すだけです。
ドキュメント: scipy.stats.ks_2samp — SciPy v1.9.0 Manual

例によって、alternative 引数で両側検定/片側検定などを指定できますが、今回両側でやるので何も指定せずにデフォルトのまま使います。

帰無仮説は「二つの標本が従う確率分布は等しい」です。有意水準は0.05とします。

結果がこちら。

print(ks_2samp(chi2_samples, norm_samples))
# KstestResult(statistic=0.17, pvalue=0.012637307799274388)

統計量とp値が表示されました。p値が0.05を下回りましたので、帰無仮説を棄却し、二つの標本が従う確率分布は異なっていると判断します。

サンプルサイズさえそこそこ確保できれば、非常に手軽に使えて便利な手法だと思いました。

あとは個人的には、KS検定って5段階や10段階評価のような離散分布でも使えるのかどうか気になっています。まぁ、5段階の場合は単純にカイ2乗検定しても良いのですが、10段階以上になってくると自由度が高くてカイ2乗検定があまり使いやすくないので。何か情報がないか追加で探したり、自分でシミュレーションしたりしてこの辺をもう少し調べようと思います。

1標本コルモゴロフ–スミルノフ検定について理解する

何らかのデータ(標本)が、特定の確率分布に従ってるかどうかを検定したいことって頻繁にあると思います。そのような場合に使えるコルモゴロフ–スミルノフ検定(Kolmogorov–Smirnov test, KS検定)という手法があるのでそれを紹介します。

取り上げられている書籍を探したのですが、手元に見当たらなかったので説明はWikipediaを参照しました。
参考: コルモゴロフ–スミルノフ検定 – Wikipedia

1標本と、特定の確率分布についてその標本が対象の確率分布に従っていることを帰無仮説として検定を行います。

この記事では、scipyを使った実装と、それを理解するための検証をやっていきたいと思います。

とりあえずこの記事で使うライブラリをインポートし、データを準備します。
標本は、自由度5のt分布からサンプリングし、それが正規分布に従ってないことを検定で見ていきましょう。scipyのパラメータはどちらの分布もloc=10, scale=10 としました。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import kstest
from scipy.stats import kstwo
from scipy.stats import t
from scipy.stats import norm


norm_frozen = norm.freeze(loc=10, scale=10)  # 検定する分布
t_frozen = t.freeze(df=5, loc=10, scale=10)  # 標本をサンプリングする分布
t_samples = t_frozen.rvs(size=1000, random_state=0)  # 標本を作成

# 各データを可視化
xticks = np.linspace(-40, 60, 1001)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.plot(xticks, t_frozen.pdf(xticks), label="t分布")
ax.hist(t_samples, density=True, bins=50, alpha=0.5, label="t分布からの標本")
ax.legend()
plt.show()

作成された図がこちらです。流石にt分布と正規分布は似ていますね。ぱっと見でこの標本が正規分布に従ってないと言い切るのは難しそうです。

それでは、早速scipyで(1標本)KS検定をやっていきましょう。これは専用の関数が用意されているので非常に簡単です。
参考: scipy.stats.kstest — SciPy v1.9.0 Manual

rvs引数に標本データ、cdf引数に検定したい分布の累積分布関数を指定してあげれば大丈夫です。alternative で両側検定、片側検定などを指定できます。今回は両側で行きます。有意水準は0.05としましょう。

ks_result = kstest(rvs=t_sample, cdf=norm_frozen.cdf, alternative="two-sided")

print(ks_result)
# KstestResult(statistic=0.04361195130340301, pvalue=0.04325168699194537)

# 統計量と引数を変数に入れておく
ks_value = ks_result.statistic
p_value = ks_result.pvalue

はい、p値が約0.043 で0.05を下回ったので、この標本が正規分布にし違うという帰無仮説は棄却されましたね。

ちなみに、標本をサンプリングした元のt分布でやると棄却されません。これも想定通りの挙動ですね。

print( kstest(rvs=t_sample, cdf=t_frozen.cdf, alternative="two-sided"))
# KstestResult(statistic=0.019824350810698554, pvalue=0.8191867386190135)

一点注意ですが、どのような仮説検定でもそうである通り、帰無仮説が棄却されなかったからと言って正しいことが証明されたわけではありません。(帰無仮説は採択されない。)
特にKS検定については、標本サイズを変えて何度も試してみたのですが、標本サイズが小さい時は誤った帰無仮説を棄却できないことがかなりあるようです。

さて、scipyで実行してみて、この検定が使えるようになったのですが、より理解を深めるために、自分で統計量を計算してみようと思います。

KS検定の統計量の計算には、経験分布というものを使います。要するに標本から作成した累積分布関数ですね。数式として書くと標本$y_1, y_2,\dots, y_n$に対する経験分布$F_n$は次のようになります。

$$F_n(x) = \frac{\#\{1\le i\le n | y_i \le n\}}{n}$$

要するに$x$以下の標本を数えて標本のサイズ$n$で割ってるだけです。

そして、検定したい分布の分布関数$F$とこの$F_n$に対して、KS検定の統計量は次のように計算されます。

$$\begin{align}
D^{+}_n &= \sup_x\left(F_n(x)-F(x)\right)\\
D^{-}_n &= \sup_x\left(F(x)-F_n(x)\right)\\
D_n &= \max(D^{+}_n, D^{-}_n)
\end{align}$$

max(最大値)ではなく、sup(上限)で定義されているのがポイントですね。
とはいえ、分布関数の方が一般的な十分滑らかな関数であれば、$D^{+}_n$の方はmaxでもsupでも同じですが,$D^{-}_n$の方はsupが効いてきます。

この$D_n$の直感的なイメージを説明すると、$F$と$F_n$の差が一番大きくなるところの値を取ってくることになるでしょうか。

経験分布の方を実装して可視化してみます。

# 経験分布関数
def empirical_distribution(x, samples):
    return len([y for y in samples if y <= x])/len(samples)


xticks_2 = np.linspace(-40, 60, 100001)  # メモリを細かめに取る
# 経験分布関数の値
empirical_cdf = np.array([empirical_distribution(x, t_samples) for x in xticks_2])

# 可視化
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks_2, norm_frozen.cdf(xticks_2), label="正規分布")
ax.plot(xticks_2, empirical_cdf, label="経験分布")
ax.legend()
plt.show()

出てきた図がこちらです。標本サイズが大きいのでよくみないとわかりませんが、オレンジの方は階段状の関数になっています。

この、青線(検定する正規分布)とオレンジの線(標本からの経験分布)が一番離れたところの差を統計量にしましょう、というのが考え方です。

では$D^+_n$から計算しましょう。実は(今回の例では)青線が滑らかなのに対して、オレンジの線が階段状になっているので、この段が上がるポイント、つまり標本が存在した点での値だけ調べると$D^+_n$が計算できます。

dplus = max([empirical_distribution(x, t_samples) - norm_frozen.cdf(x) for x in t_samples])
print(dplus)
# 0.031236203108694176

次に$D^-_n$の方ですが、これは少し厄介です。サンプルが存在する点ではなく、その近傍を調べて階段の根元の値と累積分布関数の差を取りたいんですよね(ここでmaxではくsupが採用されていた意味が出てくる)。1e-10くらいの極小の定数を使って計算することも考えましたが、経験分布関数の方を少しいじって計算してみることにしました。どういうことかというと、x以下の要素の割合ではなくx未満の要素の割合を返すようにします。これを使うとサンプルが存在した点については階段の根元の値が取れるようになるので、これを使って$D^+_n$と同様に計算してみます。

def empirical_distribution_2(x, samples):
    return len([y for y in samples if y < x])/len(samples)


dminus = max([norm_frozen.cdf(x) - empirical_distribution_2(x, t_samples) for x in t_samples])
print(dminus)
# 0.04361195130340301

さて、必要だった統計量は$D^+_n$,$D^-_n$の最大値です。これをライブラリが計算してくれた統計量と比較してみましょう。一致しますね。

print("自分で計算したKS値:", max([dplus, dminus]))
# 自分で計算したKS値: 0.04361195130340301
print("scipyのKS値:", ks_vakue)
# scipyのKS値: 0.04361195130340301

さて、統計量がわかったら次はp値を計算します。ウィキペディアを見ると、この統計量は無限和を含むそこそこ厄介な確率分布に従うことが知られているそうです。
これからp値を求めるのは流石に手間なのと、幸い、scipyに実装があるので(scipyの検証にscipyを使うのは不本意ですが)ここは甘えようと思います。

非常にありがたいことに、kstwoksoneという両側検定、片側検定に対応してそれぞれ分布関数を用意してくれています。

統計量は求まっていますし、分布関数もあるのでp値はすぐ出せます。

print(kstwo.sf(max([dplus, dminus]), n=1000))
# 0.04325168699194537

# 参考 kstestから取得したp値
print(p_value)
# 0.04325168699194537

最後、少し妥協しましたが、追々kstwo分布についても自分でスクラッチ実装して検証しておこうと思います。

今回の検証でコルモゴロフ–スミルノフ検定についてだいぶ理解が深まったので、これからバシバシ使っていこうと思います。

1標本だけでなく2標本でそれぞれの分布が等しいかどうかを検定するって使い方もできるので、次の記事はそれを取り上げようかなと思っています。