2標本コルモゴロフ–スミルノフ検定を試す

前回の記事の続きです。
参考: 1標本コルモゴロフ–スミルノフ検定について理解する

コルモゴロフ-スミルノフ検定(K-S検定)は、一つの標本が何かしらの確率分布に従っているかどうかだけではなく、二つの標本について、それらが同一の確率分布から生成されたものかどうかの判定にも利用可能です。

Wikipediaを見ると、KS検定は1標本より2標本の時の利用の方が推されてますね。2標本それぞれの確率分布が不明となるとノンパラメトリックな手法が有効になるのでしょう。

前の記事みたいに数式をつらつらと書いていこうかと思ったのですが、1標本との違いは、統計量を計算するときに、経験分布と検定対象の累積分布関数の差分の上限(sup)を取るのではなく、その二つの標本それぞれの経験分布に対して、差分の上限の上限(sup)を取るというだけです。KS検定について調べるような人でこの説明でわからないという人もいないと思うので、数式等は省略します。(前回の記事を参照してください。)

便利な特徴としては、2標本のそれぞれのサイズは等しくなくても使える点ですね。
ただ、色々試した結果、それなりにサンプルサイズが大きくないとあまり使い物にならなさそうです。

この記事では、Pythonで実際に検定をやってみます。
前回のt分布からの標本と正規分布は、標準偏差が違うとはいえ流石に似すぎだったので、今回はちょっと変えて、カイ2乗分布と、正規分布を使います。
カイ2乗分布の自由度は4とし、すると期待値4、分散8になるので、正規分布の方もそれに揃えます。期待値や分散が違うならt検定やF検定で十分ですからね。

とりあえず分布関数を作り、可視化して見比べておきます。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import ks_2samp
from scipy.stats import norm
from scipy.stats import chi2


chi2_frozen = chi2.freeze(df=4)  # 自由度4のカイ2乗分布 (期待値4, 分散8)
norm_frozen = norm.freeze(loc=4, scale=8**0.5)  # 正規分布 (期待値4, 分散8)

# 確率密度関数を可視化
xticks = np.linspace(-10, 20, 301)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, chi2_frozen.pdf(xticks), label="カイ2乗分布")
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.legend()
plt.show()

出力はこちら。これはちゃんと検定で見分けて欲しいですね。

続いて、それぞれの分布から標本を取ります。サンプルサイズが違っても使える手法なので、意図的にサンプルサイズ変えました。

# それぞれの分布から標本を生成
chi2_samples = chi2_frozen.rvs(size=200, random_state=0)  # カイ2乗分布からの標本
norm_samples = norm_frozen.rvs(size=150, random_state=0)  # 正規分布からの標本

さて、これでデータが取れたので、2標本KS検定やっていきます。
非常に簡単で、ks_2samp ってメソッドに渡すだけです。
ドキュメント: scipy.stats.ks_2samp — SciPy v1.9.0 Manual

例によって、alternative 引数で両側検定/片側検定などを指定できますが、今回両側でやるので何も指定せずにデフォルトのまま使います。

帰無仮説は「二つの標本が従う確率分布は等しい」です。有意水準は0.05とします。

結果がこちら。

print(ks_2samp(chi2_samples, norm_samples))
# KstestResult(statistic=0.17, pvalue=0.012637307799274388)

統計量とp値が表示されました。p値が0.05を下回りましたので、帰無仮説を棄却し、二つの標本が従う確率分布は異なっていると判断します。

サンプルサイズさえそこそこ確保できれば、非常に手軽に使えて便利な手法だと思いました。

あとは個人的には、KS検定って5段階や10段階評価のような離散分布でも使えるのかどうか気になっています。まぁ、5段階の場合は単純にカイ2乗検定しても良いのですが、10段階以上になってくると自由度が高くてカイ2乗検定があまり使いやすくないので。何か情報がないか追加で探したり、自分でシミュレーションしたりしてこの辺をもう少し調べようと思います。

1標本コルモゴロフ–スミルノフ検定について理解する

何らかのデータ(標本)が、特定の確率分布に従ってるかどうかを検定したいことって頻繁にあると思います。そのような場合に使えるコルモゴロフ–スミルノフ検定(Kolmogorov–Smirnov test, KS検定)という手法があるのでそれを紹介します。

取り上げられている書籍を探したのですが、手元に見当たらなかったので説明はWikipediaを参照しました。
参考: コルモゴロフ–スミルノフ検定 – Wikipedia

1標本と、特定の確率分布についてその標本が対象の確率分布に従っていることを帰無仮説として検定を行います。

この記事では、scipyを使った実装と、それを理解するための検証をやっていきたいと思います。

とりあえずこの記事で使うライブラリをインポートし、データを準備します。
標本は、自由度5のt分布からサンプリングし、それが正規分布に従ってないことを検定で見ていきましょう。scipyのパラメータはどちらの分布もloc=10, scale=10 としました。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import kstest
from scipy.stats import kstwo
from scipy.stats import t
from scipy.stats import norm


norm_frozen = norm.freeze(loc=10, scale=10)  # 検定する分布
t_frozen = t.freeze(df=5, loc=10, scale=10)  # 標本をサンプリングする分布
t_samples = t_frozen.rvs(size=1000, random_state=0)  # 標本を作成

# 各データを可視化
xticks = np.linspace(-40, 60, 1001)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.plot(xticks, t_frozen.pdf(xticks), label="t分布")
ax.hist(t_samples, density=True, bins=50, alpha=0.5, label="t分布からの標本")
ax.legend()
plt.show()

作成された図がこちらです。流石にt分布と正規分布は似ていますね。ぱっと見でこの標本が正規分布に従ってないと言い切るのは難しそうです。

それでは、早速scipyで(1標本)KS検定をやっていきましょう。これは専用の関数が用意されているので非常に簡単です。
参考: scipy.stats.kstest — SciPy v1.9.0 Manual

rvs引数に標本データ、cdf引数に検定したい分布の累積分布関数を指定してあげれば大丈夫です。alternative で両側検定、片側検定などを指定できます。今回は両側で行きます。有意水準は0.05としましょう。

ks_result = kstest(rvs=t_sample, cdf=norm_frozen.cdf, alternative="two-sided")

print(ks_result)
# KstestResult(statistic=0.04361195130340301, pvalue=0.04325168699194537)

# 統計量と引数を変数に入れておく
ks_value = ks_result.statistic
p_value = ks_result.pvalue

はい、p値が約0.043 で0.05を下回ったので、この標本が正規分布にし違うという帰無仮説は棄却されましたね。

ちなみに、標本をサンプリングした元のt分布でやると棄却されません。これも想定通りの挙動ですね。

print( kstest(rvs=t_sample, cdf=t_frozen.cdf, alternative="two-sided"))
# KstestResult(statistic=0.019824350810698554, pvalue=0.8191867386190135)

一点注意ですが、どのような仮説検定でもそうである通り、帰無仮説が棄却されなかったからと言って正しいことが証明されたわけではありません。(帰無仮説は採択されない。)
特にKS検定については、標本サイズを変えて何度も試してみたのですが、標本サイズが小さい時は誤った帰無仮説を棄却できないことがかなりあるようです。

さて、scipyで実行してみて、この検定が使えるようになったのですが、より理解を深めるために、自分で統計量を計算してみようと思います。

KS検定の統計量の計算には、経験分布というものを使います。要するに標本から作成した累積分布関数ですね。数式として書くと標本$y_1, y_2,\dots, y_n$に対する経験分布$F_n$は次のようになります。

$$F_n(x) = \frac{\#\{1\le i\le n | y_i \le n\}}{n}$$

要するに$x$以下の標本を数えて標本のサイズ$n$で割ってるだけです。

そして、検定したい分布の分布関数$F$とこの$F_n$に対して、KS検定の統計量は次のように計算されます。

$$\begin{align}
D^{+}_n &= \sup_x\left(F_n(x)-F(x)\right)\\
D^{-}_n &= \sup_x\left(F(x)-F_n(x)\right)\\
D_n &= \max(D^{+}_n, D^{-}_n)
\end{align}$$

max(最大値)ではなく、sup(上限)で定義されているのがポイントですね。
とはいえ、分布関数の方が一般的な十分滑らかな関数であれば、$D^{+}_n$の方はmaxでもsupでも同じですが,$D^{-}_n$の方はsupが効いてきます。

この$D_n$の直感的なイメージを説明すると、$F$と$F_n$の差が一番大きくなるところの値を取ってくることになるでしょうか。

経験分布の方を実装して可視化してみます。

# 経験分布関数
def empirical_distribution(x, samples):
    return len([y for y in samples if y <= x])/len(samples)


xticks_2 = np.linspace(-40, 60, 100001)  # メモリを細かめに取る
# 経験分布関数の値
empirical_cdf = np.array([empirical_distribution(x, t_samples) for x in xticks_2])

# 可視化
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks_2, norm_frozen.cdf(xticks_2), label="正規分布")
ax.plot(xticks_2, empirical_cdf, label="経験分布")
ax.legend()
plt.show()

出てきた図がこちらです。標本サイズが大きいのでよくみないとわかりませんが、オレンジの方は階段状の関数になっています。

この、青線(検定する正規分布)とオレンジの線(標本からの経験分布)が一番離れたところの差を統計量にしましょう、というのが考え方です。

では$D^+_n$から計算しましょう。実は(今回の例では)青線が滑らかなのに対して、オレンジの線が階段状になっているので、この段が上がるポイント、つまり標本が存在した点での値だけ調べると$D^+_n$が計算できます。

dplus = max([empirical_distribution(x, t_samples) - norm_frozen.cdf(x) for x in t_samples])
print(dplus)
# 0.031236203108694176

次に$D^-_n$の方ですが、これは少し厄介です。サンプルが存在する点ではなく、その近傍を調べて階段の根元の値と累積分布関数の差を取りたいんですよね(ここでmaxではくsupが採用されていた意味が出てくる)。1e-10くらいの極小の定数を使って計算することも考えましたが、経験分布関数の方を少しいじって計算してみることにしました。どういうことかというと、x以下の要素の割合ではなくx未満の要素の割合を返すようにします。これを使うとサンプルが存在した点については階段の根元の値が取れるようになるので、これを使って$D^+_n$と同様に計算してみます。

def empirical_distribution_2(x, samples):
    return len([y for y in samples if y < x])/len(samples)


dminus = max([norm_frozen.cdf(x) - empirical_distribution_2(x, t_samples) for x in t_samples])
print(dminus)
# 0.04361195130340301

さて、必要だった統計量は$D^+_n$,$D^-_n$の最大値です。これをライブラリが計算してくれた統計量と比較してみましょう。一致しますね。

print("自分で計算したKS値:", max([dplus, dminus]))
# 自分で計算したKS値: 0.04361195130340301
print("scipyのKS値:", ks_vakue)
# scipyのKS値: 0.04361195130340301

さて、統計量がわかったら次はp値を計算します。ウィキペディアを見ると、この統計量は無限和を含むそこそこ厄介な確率分布に従うことが知られているそうです。
これからp値を求めるのは流石に手間なのと、幸い、scipyに実装があるので(scipyの検証にscipyを使うのは不本意ですが)ここは甘えようと思います。

非常にありがたいことに、kstwoksoneという両側検定、片側検定に対応してそれぞれ分布関数を用意してくれています。

統計量は求まっていますし、分布関数もあるのでp値はすぐ出せます。

print(kstwo.sf(max([dplus, dminus]), n=1000))
# 0.04325168699194537

# 参考 kstestから取得したp値
print(p_value)
# 0.04325168699194537

最後、少し妥協しましたが、追々kstwo分布についても自分でスクラッチ実装して検証しておこうと思います。

今回の検証でコルモゴロフ–スミルノフ検定についてだいぶ理解が深まったので、これからバシバシ使っていこうと思います。

1標本だけでなく2標本でそれぞれの分布が等しいかどうかを検定するって使い方もできるので、次の記事はそれを取り上げようかなと思っています。

boto3で Amazon S3 のファイルを直接読み書きする

以前の記事で、S3にディスク上のファイルをアップロードしたり逆にS3からディスクにダウンロードしたりする方法を紹介しました。
参考: boto3でS3のファイル操作

ただ、実際に使っているとディスクを経由せずにS3のファイルを読み込んでそのままプログラムで処理したかったり、結果を直接S3に書き込みたい場面もあります。その方法をまだ書いてなかったのでまとめておきます。

正直に言うと、本当にやりたかったのはS3上のテキストファイルに直接どんどんテキストを追記していく操作だったのですが、どうやらS3はオブジェクトの出し入れにしか対応しておらず、追記などの編集はできないそうです。結局やりたかったことはできなかったのですが、この方法を探すためにドキュメントを読み込んだんのでその時間を無駄にしないようにしようと言う記事です。

読んだドキュメントはこれです。putが書き込み、getが読み込み
S3 — Boto3 Docs 1.24.42 documentation の S3.Object.put
S3 — Boto3 Docs 1.24.42 documentation の S3.Object.get

さて、見ていきましょう。boto3はS3に関してはresource APIも用意されているのでこちらを使います。

まず、S3への書き込みです。これは、S3上のオブジェクトを取得し、オブジェクトputメソッドを使います。オブジェクトが存在しない状態でもkeyを取得できる、ってことと、putするときにデータはバイナリにエンコードしておかないといけないと言う2点がつまずきポイントでしょうか。また、上で追記はできないって書いてますが、既存のオブジェクトのキーを指定しまうとファイルが丸ごと上書きされていまいます。その点も注意しましょう。

import boto3


text = "書き込みたいテキストの内容"
data = text.encode("utf-8")  # エンコードしてバイナリデータにする
# もちろん、 data = "書き込みたいテキスト".encode("utf-8")でも良い。

# 書き込み先のバケットと、オブジェクトキーを設定。
bucket_name = "{バケット名}"
key = "{書き込み先ファイル名}"  # hogehoge.text など。

s3 = boto3.resource("s3")
# オブジェクトを取得
s3_object = s3.Object(bucket_name, key)
result = s3_object.put(Body=data)  # resultに書き込み結果のステータスコードなどの辞書が戻る

これで、バケットにデータが作成されます。「s3.Object(bucket_name, key)」としてオブジェクトを一発で取ってますが、これはまずバケットを指定して、その後、そのバケット内のオブジェクトを指定しても良いです。一つのバケットに何ファイルも書き込む場合などに使えるかもしれません。以降に紹介するgetでも状況は同様です。

# 以下の二行で、 s3_object = s3.Object(bucket_name, key) と同じ
s3_bucket = s3.Bucket(bucket_name)
s3_object = s3_bucket.Object(key)

PandasのデータフレームをS3に保存したい場合などは、to_csvと組み合わせると良いでしょう。これファイル名のところにNoneを渡せばファイルに書き込まずにCSVのテキストを返してくれます。それをエンコードして書き込んだらOKです。

df = pd.DataFrame({
    "id": [0, 1, 2],
    "text": ["あいうえお", "かきくけこ", "さしすせそ"],
})
print(df)
"""
   id   text
0   0  あいうえお
1   1  かきくけこ
2   2  さしすせそ
"""

bucket_name = "{バケット名}"
key = "df.csv"  # 保存先ファイル名

s3_object = s3.Object(bucket_name, key)
result = s3_object.put(Body=df.to_csv(None, index=None).encode("utf-8"))

次は読み込みです。これはオブジェクトのメソッド、getを利用します。
さっき書き込んだCSVファイル読み込んでみましょうかね。結果は辞書で返ってきますが、key=”Body”に対応しているのが欲しいデータです。

bucket_name = "{バケット名}"
key = "df.csv"  # 保存先ファイル名

s3_object = s3.Object(bucket_name, key)
s3_object.get()["Body"]
# <botocore.response.StreamingBody at 0x1179bc910>

StreamingBody なる型で返ってきました。これ、with openして 通常のファイルを読み込むときと同じように read()のメソッドを持っているので、それを使えます。また、読み込んだデータはバイナリ型なので文字列に戻すならdecodeが必要です。

csv_text = s3_object.get()["Body"].read().decode("utf-8")
print(csv_text)
"""
id,text
0,あいうえお
1,かきくけこ
2,さしすせそ
"""

元のファイルがテキストファイルであればこれで読み込み完了ですね。

ちなみに、元々DataFrameを書き込んだものなので、DataFrameに読み込みたいと言うケースもあると思います。その場合、テキストに変換を終えたデータではなく、StreamingBodyを使います。つまりこうです。

df = pd.read_csv(s3_object.get()["Body"])
print(df)
"""
   id   text
0   0  あいうえお
1   1  かきくけこ
2   2  さしすせそ
"""

これで、ディスクを経由することなくメモリ上のテキストやデータフレームをS3に書き込んだり逆にS3からメモリに読み込んだりできるようになりました。

最初にやりたいと言ってた追記なんですが、これはもう一度読み込んでテキストなりデータフレームなりに新しいデータを追加して改めて書き込むしか無さそうです。