既存のディレクトリをGit管理するようにし、別ディレクトリのリポジトリへPushする

Gitの操作メモです。

2記事に分けて書こうかと思ったのですが、ほとんどの人にとってあまり有益な情報でない気がしたし、おそらく自分も今後やらないと思うのでまとめて書きます。やることは次の二つです。
1. 既存のディレクトリをGit管理するようにする。
2. Githubなどではなく、別のディレクトリにbareリポジトリを置いてそこにプッシュする。

要するに自分が、git remote とか git init –bare とかのコマンドをこれまで使ったことなくて、今回初めてやる機会があったからメモを残そうとしています。

これまで、自分がGitを使うときは、何かのプロジェクトに参画してリモートからCloneしてきて作業を始めたり、新規のプロジェクトを立ち上げる時はGithubに空っぽのリポジトリを作ってそれをローカルにCloneして作業を始めたりしていました。

ただ、今回は特に何かのプロジェクトに属してるわけではない雑多な作業や調査のファイル群たちをバックアップ取るようにしたくなり、ついでにGit管理するようにしたくなったのです。

それで、普通はGithubにプライベートリポジトリを作ればいいのですが、今回のはローカル端末から外に出す予定がなかったファイル群(主にjupyter notebook)なので、内容にAPIキーなどの認証情報等も含まれていてプライベートリポジトリであってもGithubに上げるの嫌だなってことで別の方法を探しました。その結果、NASのファイルサーバーの自分しか見れない領域にリポジトリを作ってそっちで管理しようってのが今回の発端です。

ではさっそくやっていきます。

1. 既存のディレクトリをGit管理するようにする

こちらは簡単ですね。基本的には、git init するだけです。ただ、最近の潮流にも考慮して、デフォルトブランチをmasterではなくmainにします。また、最初のコミットは空コミットにしておけというアドバイスも見かけたのでそれにも従います。ブランチ名の変更は初回コミットが無いとできないようだったので、次の順番で実行してください。

# Gitで管理したいディレクトリの内側に移動する
$ cd {Gitで管理したいディレクトリ}
# リポジトリを作成する
$ git init
# 出力
> Initialized empty Git repository in /{ディレクトリパス}/.git/
# 空コミットを許可するオプションをつけて最初のコミットを実行
$ git commit --allow-empty -m "first commit"
# ブランチ名を変更する
$ git branch -M main

これでリポジトリができました。

2. Pushするリポジトリを作成する

次にPush先のリポジトリを作成します。いわゆる bareリポジトリというやつです。

ローカルに作ると端末破損時等のバックアップにならないので、/Volumes/ の配下にマウントしているNASに作ります。(僕の端末はMacです。別OSでは別のパスになると思います。)

bareリポジトリは初めて作ったのですが、通常のリポジトリみたいに .git ディレクトリができてその中に各種ファイルが作成されると思っていたら、コマンドを実行したカレントディレクトリにgit関連のディレクトリが複数発生してしまって焦りました。

git init –bare する時はディレクトリ名を指定して作成するのが作法のようです。そして、慣習としてそのディレクトリ名(リポジトリ名)はhogehoge.git とするのが作法とのこと。そのようにします。(ただ、ディレクトリ名に拡張子っぽく.が入ってるのが少し慣れません)

# リポジトリを作成したいディレクトリに移動する
$ cd /Volumes/{パス}
$ git init --bare {リポジトリ名}.git

こうして出来上がる、 /Volumes/{パス}/{リポジトリ名}.git/ がプッシュ先のリポジトリです。
ちなみにその配下には以下のようなファイルやディレクトリができています。

$ ls {リポジトリ名}.git/
HEAD        config      description hooks       info        objects     refs

3. リモートリポジトリを設定する

プッシュ先のリポジトリができたので、元のリポジトリがここにPushできるように設定します。Githubでいつも使っている、originって名前でPushできるようにします。名前自体はbentouでもhottomottoでも何でもいいらしいのですが、こだわった名前使うメリットもないと思います。

git remote は初めて使いました。ドキュメントはこちらです。
参考: git-remote

# 元のリポジトリに移動
$ cd {最初にリポジトリを作ったディレクトリ}
# リモートディレクトリを設定する
$ git remote add origin /Volumes/{パス}/{リポジトリ名}.git/
# 設定されたことを確認する
$ git remote -v
origin	/Volumes/{パス}/{リポジトリ名}.git/ (fetch)
origin	/Volumes/{パス}/{リポジトリ名}.git/ (push)
# Push
$ git push origin main

これで設定が完了したので、いつもGithubでやっているのと同じようにコードを管理できるようになりました。

Pythonでファイルの更新時刻やファイルサイズの情報を取得する

パソコン(ここではMacを想定)内のファイルを整理していて、古いファイルなどをリストアップしようとしたときのメモです。
更新時刻を取得するのはBashコマンドでもできますしファインダーでも見れて、僕も普段はそうしているのですが、一旦気合入れて整理しようと思ったときにこれらの方法がやや使いにくかったのでPythonでやることを検討しました。

結論から言うと、Pythonのosモジュールを使うと実装できます。
os.stat ってのがファイルの情報を取得する関数で、結果はstat_result というオブジェクトで帰ってきます。

ドキュメントはこちら。
参考: os — 雑多なオペレーティングシステムインターフェース — Python 3.10.6 ドキュメント

サンプルとしてこんなファイルがあったとしましょう。

$ ls -la sample.txt
-rw-r--r--  1 {user} {group}  7  9  5 01:01 sample.txt

これの情報を取得するには次のようにします。

import os


file_path = "./sample.txt"
file_info = os.stat(file_path)
print(file_info)
"""
os.stat_result(st_mode=33188, st_ino=10433402, st_dev=16777220, st_nlink=1,
st_uid=501, st_gid=20, st_size=7,
st_atime=1662307286, st_mtime=1662307285, st_ctime=1662307285)
"""

st_atimeが最終アクセス時刻、st_mtimeが最終更新時刻です。
printすると出てきませんが、st_birthtimeなんてのもあってこれがファイルの作成時刻です。

これらの値は普通に属性なので、.(ドット)で繋いでアクセスできます。

注意しないといけないのは、実行しているOSによって取得できる値に違いがあり、取得できなかったり取得できるけど意味が違ったりするものがあることです。

詳しくはドキュメントに書いてあります。
class os.stat_result

st_ctime はUNIXではメタデータの最終更新時刻で、Windows では作成時刻、単位は秒など色々違いますね。
なんとなく使わずにきちんと動作を確認して使うことが重要でしょう。

また、元々の目的が更新時刻の取得だったのですが、ついでにst_size でファイルサイズも取得できています。
上の例で見ていただくと、 st_size=7 となっていて、その上のlsの結果と一致します。

さて、以上でファイルの更新時刻やサイズが取得できたのですが、更新時刻(を含む事故国関係の情報一式)はUNIX時間で得られます。
人間にとって使いにくいので、以前紹介した方法で変換しましょう。
参考: Pythonで時刻をUNIX時間に変換する方法やPandasのデータを使う時の注意点

from datetime import datetime


# ファイル作成時刻
print(datetime.fromtimestamp(file_info.st_birthtime))
# 2022-09-05 01:01:13.879805

# 最終内容更新時刻
print(datetime.fromtimestamp(file_info.st_mtime))
# 2022-09-05 01:01:25.663676

# 最終アクセス時刻
print(datetime.fromtimestamp(file_info.st_atime))
# 2022-09-05 01:01:26.286258

非常に簡単ですね。
あとは globか何かでファイルパスの一覧を作成してDataFrame化して、applyでさっと処理して仕舞えば少々ファイルが多くてもすぐリスト化できそうです。

Pythonのdataclassを使ってみた

Pythonの標準ライブラリにdataclassというのがあるの見つけたので使ってみました。
参考: dataclasses — データクラス — Python 3.10.6 ドキュメント

名前から、オリジナルのデータ型を定義するためのモジュールなのかなとも思ったのですが実際は少し違いそうです。もちろん、オリジナルのデータ型を定義するためにも使えるのですが、その実態は、クラスに対して__init__()__repr__()といった特殊メソッドを自動的に生成してくれるデコレーターという解釈が正確のようです。

お試しに、証券コードと会社名と説明を属性として持ったCompanyクラスを作ってみましょう。

import dataclasses


@dataclasses.dataclass
class Company:
    code: int
    name: str
    description: str = None


# __init__() メソッドが自動的に定義されているためこれでインスタンスを作成できる
toyota = Company(code=7203, name="トヨタ自動車", description="自動車メーカー")
suzuki = Company(code=7269, name="スズキ", description="自動車メーカー")


# __eq__() メソッドが自動的に定義されているため、比較ができる
toyota == suzuki
# False

これは属性がたった3個だけで、メソッドも持ってないようなクラスなのですが、__init__()が入らないってだけでものすごくシンプルに描けるようになりましたね。

また、比較用のメソッドを自分で作らなくても、各属性が全て一致しているかどうかを基準に一致不一致を判定してくれるのも便利です。属性が3個だけだとそこまでありがたみがないですが、もっと大規模なクラスで、全属性の一致を判定するのは無駄にコードが長くなりますから。

int とか str と書いて型ヒントをつけられたりするのも今風な感じがします。ただ、この型ヒントはどうやらただのアノテーションで、代入する値に対する強制力などはないようです。
codeを整数、nameを文字列としていますがそうでない値も入ります。

dummy_company = Company(code="文字列", name=1234)
print(dummy_company)
# Company(code='文字列', name=1234, description=None)

この記事の冒頭で書いていますが、このdataclassはclassに対するデコレーターなので、ただのオリジナルデータ型ではなく、普通にメソッド等を持っているクラスを作成することもできます。その場合__init__()などを自分で書かなくて良くなるので、特に凝った__init__()が不要な場合はバンバン使って良さそうです。例えば、二つの値を持ち、合計値を返せるクラスは次のようになります。

@dataclasses.dataclass
class two_number:
    a: int
    b: int

    def sum(self):
        return self.a + self.b


tn = two_number(5, 8)
print(tn.sum())
# 13

自分が実装したいメソッドの部分に専念できるのはいいですね。

このdataclassのデコレーターですが、デコレーター自体も引数を取って、色々設定することができます。詳細はドキュメントに譲りますが、例えばinit=Falseやrepr=False, eq=Falseを指定すると、デフォルトで生成されると言っていた__init__()や__repr__()、__eq__()などが生成されなくなります。自分で実装したいものがあったらそれだけ自分で実装するようにしましょう。

frozen (デフォルトはFalse) を Trueに指定すると、値への代入が禁止されます。これのメリットしては辞書のキーとして使えるようになることでしょうか。ちょっとやってみます。
1個目の例は上で作ったCompanyなので、frozenはFalseです。その次がfrozen=True。

# frozen = False だと属性に値を代入できる
toyota.description  = "日本の自動車メーカー"
print(toyota)


# frozen=Trueを指定してみる
@dataclasses.dataclass(frozen=True)
class Frozen_Company:
    code: int
    name: str
    description: str = None


f_toyota = Frozen_Company(code=7203, name="トヨタ自動車", description="自動車メーカー")

# frozen = True だと属性に値を代入できないため、例外が上がる
try:
    f_toyota.description  = "日本の自動車メーカー"
except Exception as e:
    print(type(e), ":", e)
#  <class 'dataclasses.FrozenInstanceError'> : cannot assign to field 'description'

frozenにするメリットとしては、タプルと同様に辞書のキーにできる、という点があります。

# frozenではない、つまりハッシュ化不可能なので辞書のキーにできない
try:
    {toyota: 1}
except Exception as e:
    print(type(e), ":", e)
# <class 'TypeError'> : unhashable type: 'Company'

# frozen=Trueだとハッシュ化可能なので辞書のキーにできる
{f_toyota: 1}
# {Frozen_Company(code=7203, name='トヨタ自動車', description='自動車メーカー'): 1}

また、order という引数(デフォルトFalse)にTrueを渡すと、__le__()等々の不等号を実装する特殊メソッドたち4種も自動的に生成してくれるようになります。どうも要素を順番に比較して最初に上下がついたもので決まるようです。これも属性が多い時は便利なのではないでしょうか。

@dataclasses.dataclass(order=True)
class two_number:
    a: int
    b: int

tn1 = two_number(12, 5)
tn2 = two_number(6, 20)
tn1 > tn2
# True

さて、以上でdataclass自体の基本的な説明はおしまいです。

あとは偶然気づいた豆知識なのですが、Pandasとの連携について紹介します。
dataclassの配列は、簡単にPandasのDataFrameに変換できます。実質的にdictみたいに振る舞ってくれるようです。

import pandas as pd


df = pd.DataFrame([toyota, suzuki])
print(df)
"""
   code    name description
0  7203  トヨタ自動車  日本の自動車メーカー
1  7269     スズキ     自動車メーカー
"""

便利ですね。

逆に、DataFrameに入った値たちをdataclassで定義したクラスのインスタンスに変換したいな、と思って方法探しました。専用のメソッドなどは見つかってないのですが、lamda関数を使ってこのようにするのが良いでしょう。

df.apply(lambda row: Company(**row), axis=1)
"""
0    Company(code=7203, name='トヨタ自動車', description=...
1    Company(code=7269, name='スズキ', description='自動...
dtype: object
"""

事前にDataFrameの列名と、dataclassの属性名を揃えておく必要はあるのでそこは注意してください。

PythonでUUIDを生成する

ある作業をやっているときに、データの塊ごとにユニークなidを振りたいことがありました。
通常は0から順番に番号振っていけば十分なのですが、今回は分散処理していて個別の処理でバッティングしないようにidを振りたかったのです。これでも番号のレンジを分けておけば十分なのですが、世の中にはUUIDって仕組みがあるのでこれを試すことにしました。

UUIDの詳細はWikipediaをご参照ください。要するにユニークなIDを発行する仕組みです。
参考: UUID – Wikipedia

UUIDってバージョン1〜5があって仕組みが違っていたんですね。自分はMACアドレスと時刻を使ってIDを生成する方法だと認識していたのですが、それはバージョン1のことだったようです。

PythonでUUIDを利用したい場合は、uuidという標準ライブラリが使えます。
参考: uuid — UUID objects according to RFC 4122

バージョン1, 3, 4, 5 の4種類のUUIDが実装されているようです。
とりあえず動かしてみますか。バージョン3と5は名前空間と名前に対してIDを振り分けるので、とりあえずこのブログのドメイン名を渡しています。

import uuid


uuid.uuid1()
# UUID('9f15c3a4-2173-11ed-bb9d-dca9048ad673')

uuid.uuid3(uuid.NAMESPACE_DNS, "analytics-note.xyz")
# UUID('30b2104c-d522-3f77-9fe8-6863bd4a6cda')

uuid.uuid4()
# UUID('8e489abb-a18a-4f0c-80d1-30ae0ea83d81')

uuid.uuid5(uuid.NAMESPACE_DNS, "analytics-note.xyz")
# UUID('feeaf7d1-3987-5451-9a28-afd72801a03b')

それぞれ、UUIDが生成できましたね。ハイフンで区切られた3つめの塊の1文字目の数字がバージョン番号なのですが、ちゃんと1,3,4,5となっています。

uuid3とuuid5 は渡した名前に対してIDを割り当てているので、引数が同じなら結果はずっと同じです。
uuid1は時刻を、uuid4は乱数を用いているので実行するたびに生成されるIDが変わります。

この記事冒頭の目的にではuuid4を使えば良いでしょう。

上記の結果を見ても分かる通り、結果はUUIDというクラスのオブジェクトとして返ってきます。一応typeを見ておきましょう。

sample_id = uuid.uuid4()
print(type(sample_id))
# <class 'uuid.UUID'>

str で文字型にキャストすることもできますし、プロパティのhexやintで16進法表示や整数表示も得られます。(なぜstrはプロパティではないのか不思議です)

print(str(sample_id))
# 61d67c09-4a78-4d03-a6f0-8f1843673083

print(sample_id.hex)
# 61d67c094a784d03a6f08f1843673083

print(sample_id.int)
# 130048782873754933734408394572189610115

文字列からUUIDオブジェクトを作ることもできます。ただ、これはいつ使うものなのか不明です。

uuid.UUID("61d67c09-4a78-4d03-a6f0-8f1843673083")
# UUID('61d67c09-4a78-4d03-a6f0-8f1843673083')

あとは気になるのは処理時間ですね。ID作成に時間がかかるようだと困るので。
ただ、ちょっと実験したところそこまで大きな問題もなさそうでした。
100万回実行するのに3秒未満で完了しています。

%%time
for i in range(1000000):
    uuid.uuid4()

"""
CPU times: user 2.37 s, sys: 407 ms, total: 2.77 s
Wall time: 2.79 s
"""

2標本コルモゴロフ–スミルノフ検定を試す

前回の記事の続きです。
参考: 1標本コルモゴロフ–スミルノフ検定について理解する

コルモゴロフ-スミルノフ検定(K-S検定)は、一つの標本が何かしらの確率分布に従っているかどうかだけではなく、二つの標本について、それらが同一の確率分布から生成されたものかどうかの判定にも利用可能です。

Wikipediaを見ると、KS検定は1標本より2標本の時の利用の方が推されてますね。2標本それぞれの確率分布が不明となるとノンパラメトリックな手法が有効になるのでしょう。

前の記事みたいに数式をつらつらと書いていこうかと思ったのですが、1標本との違いは、統計量を計算するときに、経験分布と検定対象の累積分布関数の差分の上限(sup)を取るのではなく、その二つの標本それぞれの経験分布に対して、差分の上限の上限(sup)を取るというだけです。KS検定について調べるような人でこの説明でわからないという人もいないと思うので、数式等は省略します。(前回の記事を参照してください。)

便利な特徴としては、2標本のそれぞれのサイズは等しくなくても使える点ですね。
ただ、色々試した結果、それなりにサンプルサイズが大きくないとあまり使い物にならなさそうです。

この記事では、Pythonで実際に検定をやってみます。
前回のt分布からの標本と正規分布は、標準偏差が違うとはいえ流石に似すぎだったので、今回はちょっと変えて、カイ2乗分布と、正規分布を使います。
カイ2乗分布の自由度は4とし、すると期待値4、分散8になるので、正規分布の方もそれに揃えます。期待値や分散が違うならt検定やF検定で十分ですからね。

とりあえず分布関数を作り、可視化して見比べておきます。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import ks_2samp
from scipy.stats import norm
from scipy.stats import chi2


chi2_frozen = chi2.freeze(df=4)  # 自由度4のカイ2乗分布 (期待値4, 分散8)
norm_frozen = norm.freeze(loc=4, scale=8**0.5)  # 正規分布 (期待値4, 分散8)

# 確率密度関数を可視化
xticks = np.linspace(-10, 20, 301)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, chi2_frozen.pdf(xticks), label="カイ2乗分布")
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.legend()
plt.show()

出力はこちら。これはちゃんと検定で見分けて欲しいですね。

続いて、それぞれの分布から標本を取ります。サンプルサイズが違っても使える手法なので、意図的にサンプルサイズ変えました。

# それぞれの分布から標本を生成
chi2_samples = chi2_frozen.rvs(size=200, random_state=0)  # カイ2乗分布からの標本
norm_samples = norm_frozen.rvs(size=150, random_state=0)  # 正規分布からの標本

さて、これでデータが取れたので、2標本KS検定やっていきます。
非常に簡単で、ks_2samp ってメソッドに渡すだけです。
ドキュメント: scipy.stats.ks_2samp — SciPy v1.9.0 Manual

例によって、alternative 引数で両側検定/片側検定などを指定できますが、今回両側でやるので何も指定せずにデフォルトのまま使います。

帰無仮説は「二つの標本が従う確率分布は等しい」です。有意水準は0.05とします。

結果がこちら。

print(ks_2samp(chi2_samples, norm_samples))
# KstestResult(statistic=0.17, pvalue=0.012637307799274388)

統計量とp値が表示されました。p値が0.05を下回りましたので、帰無仮説を棄却し、二つの標本が従う確率分布は異なっていると判断します。

サンプルサイズさえそこそこ確保できれば、非常に手軽に使えて便利な手法だと思いました。

あとは個人的には、KS検定って5段階や10段階評価のような離散分布でも使えるのかどうか気になっています。まぁ、5段階の場合は単純にカイ2乗検定しても良いのですが、10段階以上になってくると自由度が高くてカイ2乗検定があまり使いやすくないので。何か情報がないか追加で探したり、自分でシミュレーションしたりしてこの辺をもう少し調べようと思います。

1標本コルモゴロフ–スミルノフ検定について理解する

何らかのデータ(標本)が、特定の確率分布に従ってるかどうかを検定したいことって頻繁にあると思います。そのような場合に使えるコルモゴロフ–スミルノフ検定(Kolmogorov–Smirnov test, KS検定)という手法があるのでそれを紹介します。

取り上げられている書籍を探したのですが、手元に見当たらなかったので説明はWikipediaを参照しました。
参考: コルモゴロフ–スミルノフ検定 – Wikipedia

1標本と、特定の確率分布についてその標本が対象の確率分布に従っていることを帰無仮説として検定を行います。

この記事では、scipyを使った実装と、それを理解するための検証をやっていきたいと思います。

とりあえずこの記事で使うライブラリをインポートし、データを準備します。
標本は、自由度5のt分布からサンプリングし、それが正規分布に従ってないことを検定で見ていきましょう。scipyのパラメータはどちらの分布もloc=10, scale=10 としました。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import kstest
from scipy.stats import kstwo
from scipy.stats import t
from scipy.stats import norm


norm_frozen = norm.freeze(loc=10, scale=10)  # 検定する分布
t_frozen = t.freeze(df=5, loc=10, scale=10)  # 標本をサンプリングする分布
t_samples = t_frozen.rvs(size=1000, random_state=0)  # 標本を作成

# 各データを可視化
xticks = np.linspace(-40, 60, 1001)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.plot(xticks, t_frozen.pdf(xticks), label="t分布")
ax.hist(t_samples, density=True, bins=50, alpha=0.5, label="t分布からの標本")
ax.legend()
plt.show()

作成された図がこちらです。流石にt分布と正規分布は似ていますね。ぱっと見でこの標本が正規分布に従ってないと言い切るのは難しそうです。

それでは、早速scipyで(1標本)KS検定をやっていきましょう。これは専用の関数が用意されているので非常に簡単です。
参考: scipy.stats.kstest — SciPy v1.9.0 Manual

rvs引数に標本データ、cdf引数に検定したい分布の累積分布関数を指定してあげれば大丈夫です。alternative で両側検定、片側検定などを指定できます。今回は両側で行きます。有意水準は0.05としましょう。

ks_result = kstest(rvs=t_sample, cdf=norm_frozen.cdf, alternative="two-sided")

print(ks_result)
# KstestResult(statistic=0.04361195130340301, pvalue=0.04325168699194537)

# 統計量と引数を変数に入れておく
ks_value = ks_result.statistic
p_value = ks_result.pvalue

はい、p値が約0.043 で0.05を下回ったので、この標本が正規分布にし違うという帰無仮説は棄却されましたね。

ちなみに、標本をサンプリングした元のt分布でやると棄却されません。これも想定通りの挙動ですね。

print( kstest(rvs=t_sample, cdf=t_frozen.cdf, alternative="two-sided"))
# KstestResult(statistic=0.019824350810698554, pvalue=0.8191867386190135)

一点注意ですが、どのような仮説検定でもそうである通り、帰無仮説が棄却されなかったからと言って正しいことが証明されたわけではありません。(帰無仮説は採択されない。)
特にKS検定については、標本サイズを変えて何度も試してみたのですが、標本サイズが小さい時は誤った帰無仮説を棄却できないことがかなりあるようです。

さて、scipyで実行してみて、この検定が使えるようになったのですが、より理解を深めるために、自分で統計量を計算してみようと思います。

KS検定の統計量の計算には、経験分布というものを使います。要するに標本から作成した累積分布関数ですね。数式として書くと標本$y_1, y_2,\dots, y_n$に対する経験分布$F_n$は次のようになります。

$$F_n(x) = \frac{\#\{1\le i\le n | y_i \le n\}}{n}$$

要するに$x$以下の標本を数えて標本のサイズ$n$で割ってるだけです。

そして、検定したい分布の分布関数$F$とこの$F_n$に対して、KS検定の統計量は次のように計算されます。

$$\begin{align}
D^{+}_n &= \sup_x\left(F_n(x)-F(x)\right)\\
D^{-}_n &= \sup_x\left(F(x)-F_n(x)\right)\\
D_n &= \max(D^{+}_n, D^{-}_n)
\end{align}$$

max(最大値)ではなく、sup(上限)で定義されているのがポイントですね。
とはいえ、分布関数の方が一般的な十分滑らかな関数であれば、$D^{+}_n$の方はmaxでもsupでも同じですが,$D^{-}_n$の方はsupが効いてきます。

この$D_n$の直感的なイメージを説明すると、$F$と$F_n$の差が一番大きくなるところの値を取ってくることになるでしょうか。

経験分布の方を実装して可視化してみます。

# 経験分布関数
def empirical_distribution(x, samples):
    return len([y for y in samples if y <= x])/len(samples)


xticks_2 = np.linspace(-40, 60, 100001)  # メモリを細かめに取る
# 経験分布関数の値
empirical_cdf = np.array([empirical_distribution(x, t_samples) for x in xticks_2])

# 可視化
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks_2, norm_frozen.cdf(xticks_2), label="正規分布")
ax.plot(xticks_2, empirical_cdf, label="経験分布")
ax.legend()
plt.show()

出てきた図がこちらです。標本サイズが大きいのでよくみないとわかりませんが、オレンジの方は階段状の関数になっています。

この、青線(検定する正規分布)とオレンジの線(標本からの経験分布)が一番離れたところの差を統計量にしましょう、というのが考え方です。

では$D^+_n$から計算しましょう。実は(今回の例では)青線が滑らかなのに対して、オレンジの線が階段状になっているので、この段が上がるポイント、つまり標本が存在した点での値だけ調べると$D^+_n$が計算できます。

dplus = max([empirical_distribution(x, t_samples) - norm_frozen.cdf(x) for x in t_samples])
print(dplus)
# 0.031236203108694176

次に$D^-_n$の方ですが、これは少し厄介です。サンプルが存在する点ではなく、その近傍を調べて階段の根元の値と累積分布関数の差を取りたいんですよね(ここでmaxではくsupが採用されていた意味が出てくる)。1e-10くらいの極小の定数を使って計算することも考えましたが、経験分布関数の方を少しいじって計算してみることにしました。どういうことかというと、x以下の要素の割合ではなくx未満の要素の割合を返すようにします。これを使うとサンプルが存在した点については階段の根元の値が取れるようになるので、これを使って$D^+_n$と同様に計算してみます。

def empirical_distribution_2(x, samples):
    return len([y for y in samples if y < x])/len(samples)


dminus = max([norm_frozen.cdf(x) - empirical_distribution_2(x, t_samples) for x in t_samples])
print(dminus)
# 0.04361195130340301

さて、必要だった統計量は$D^+_n$,$D^-_n$の最大値です。これをライブラリが計算してくれた統計量と比較してみましょう。一致しますね。

print("自分で計算したKS値:", max([dplus, dminus]))
# 自分で計算したKS値: 0.04361195130340301
print("scipyのKS値:", ks_vakue)
# scipyのKS値: 0.04361195130340301

さて、統計量がわかったら次はp値を計算します。ウィキペディアを見ると、この統計量は無限和を含むそこそこ厄介な確率分布に従うことが知られているそうです。
これからp値を求めるのは流石に手間なのと、幸い、scipyに実装があるので(scipyの検証にscipyを使うのは不本意ですが)ここは甘えようと思います。

非常にありがたいことに、kstwoksoneという両側検定、片側検定に対応してそれぞれ分布関数を用意してくれています。

統計量は求まっていますし、分布関数もあるのでp値はすぐ出せます。

print(kstwo.sf(max([dplus, dminus]), n=1000))
# 0.04325168699194537

# 参考 kstestから取得したp値
print(p_value)
# 0.04325168699194537

最後、少し妥協しましたが、追々kstwo分布についても自分でスクラッチ実装して検証しておこうと思います。

今回の検証でコルモゴロフ–スミルノフ検定についてだいぶ理解が深まったので、これからバシバシ使っていこうと思います。

1標本だけでなく2標本でそれぞれの分布が等しいかどうかを検定するって使い方もできるので、次の記事はそれを取り上げようかなと思っています。

boto3で Amazon S3 のファイルを直接読み書きする

以前の記事で、S3にディスク上のファイルをアップロードしたり逆にS3からディスクにダウンロードしたりする方法を紹介しました。
参考: boto3でS3のファイル操作

ただ、実際に使っているとディスクを経由せずにS3のファイルを読み込んでそのままプログラムで処理したかったり、結果を直接S3に書き込みたい場面もあります。その方法をまだ書いてなかったのでまとめておきます。

正直に言うと、本当にやりたかったのはS3上のテキストファイルに直接どんどんテキストを追記していく操作だったのですが、どうやらS3はオブジェクトの出し入れにしか対応しておらず、追記などの編集はできないそうです。結局やりたかったことはできなかったのですが、この方法を探すためにドキュメントを読み込んだんのでその時間を無駄にしないようにしようと言う記事です。

読んだドキュメントはこれです。putが書き込み、getが読み込み
S3 — Boto3 Docs 1.24.42 documentation の S3.Object.put
S3 — Boto3 Docs 1.24.42 documentation の S3.Object.get

さて、見ていきましょう。boto3はS3に関してはresource APIも用意されているのでこちらを使います。

まず、S3への書き込みです。これは、S3上のオブジェクトを取得し、オブジェクトputメソッドを使います。オブジェクトが存在しない状態でもkeyを取得できる、ってことと、putするときにデータはバイナリにエンコードしておかないといけないと言う2点がつまずきポイントでしょうか。また、上で追記はできないって書いてますが、既存のオブジェクトのキーを指定しまうとファイルが丸ごと上書きされていまいます。その点も注意しましょう。

import boto3


text = "書き込みたいテキストの内容"
data = text.encode("utf-8")  # エンコードしてバイナリデータにする
# もちろん、 data = "書き込みたいテキスト".encode("utf-8")でも良い。

# 書き込み先のバケットと、オブジェクトキーを設定。
bucket_name = "{バケット名}"
key = "{書き込み先ファイル名}"  # hogehoge.text など。

s3 = boto3.resource("s3")
# オブジェクトを取得
s3_object = s3.Object(bucket_name, key)
result = s3_object.put(Body=data)  # resultに書き込み結果のステータスコードなどの辞書が戻る

これで、バケットにデータが作成されます。「s3.Object(bucket_name, key)」としてオブジェクトを一発で取ってますが、これはまずバケットを指定して、その後、そのバケット内のオブジェクトを指定しても良いです。一つのバケットに何ファイルも書き込む場合などに使えるかもしれません。以降に紹介するgetでも状況は同様です。

# 以下の二行で、 s3_object = s3.Object(bucket_name, key) と同じ
s3_bucket = s3.Bucket(bucket_name)
s3_object = s3_bucket.Object(key)

PandasのデータフレームをS3に保存したい場合などは、to_csvと組み合わせると良いでしょう。これファイル名のところにNoneを渡せばファイルに書き込まずにCSVのテキストを返してくれます。それをエンコードして書き込んだらOKです。

df = pd.DataFrame({
    "id": [0, 1, 2],
    "text": ["あいうえお", "かきくけこ", "さしすせそ"],
})
print(df)
"""
   id   text
0   0  あいうえお
1   1  かきくけこ
2   2  さしすせそ
"""

bucket_name = "{バケット名}"
key = "df.csv"  # 保存先ファイル名

s3_object = s3.Object(bucket_name, key)
result = s3_object.put(Body=df.to_csv(None, index=None).encode("utf-8"))

次は読み込みです。これはオブジェクトのメソッド、getを利用します。
さっき書き込んだCSVファイル読み込んでみましょうかね。結果は辞書で返ってきますが、key=”Body”に対応しているのが欲しいデータです。

bucket_name = "{バケット名}"
key = "df.csv"  # 保存先ファイル名

s3_object = s3.Object(bucket_name, key)
s3_object.get()["Body"]
# <botocore.response.StreamingBody at 0x1179bc910>

StreamingBody なる型で返ってきました。これ、with openして 通常のファイルを読み込むときと同じように read()のメソッドを持っているので、それを使えます。また、読み込んだデータはバイナリ型なので文字列に戻すならdecodeが必要です。

csv_text = s3_object.get()["Body"].read().decode("utf-8")
print(csv_text)
"""
id,text
0,あいうえお
1,かきくけこ
2,さしすせそ
"""

元のファイルがテキストファイルであればこれで読み込み完了ですね。

ちなみに、元々DataFrameを書き込んだものなので、DataFrameに読み込みたいと言うケースもあると思います。その場合、テキストに変換を終えたデータではなく、StreamingBodyを使います。つまりこうです。

df = pd.read_csv(s3_object.get()["Body"])
print(df)
"""
   id   text
0   0  あいうえお
1   1  かきくけこ
2   2  さしすせそ
"""

これで、ディスクを経由することなくメモリ上のテキストやデータフレームをS3に書き込んだり逆にS3からメモリに読み込んだりできるようになりました。

最初にやりたいと言ってた追記なんですが、これはもう一度読み込んでテキストなりデータフレームなりに新しいデータを追加して改めて書き込むしか無さそうです。

ipysankeywidgetでサンキーダイアグラム

詳細はWikipediaに譲りますが、サンキーダイアグラムっていうデータの可視化手法があります。
参考: サンキー ダイアグラム – Wikipedia

Webサービスにおけるユーザーの動線可視化とか、コンバージョンに至るまでの途中の離脱状況とか分析するのに便利なのですが、適したツールがなかなか見つからずあまり利用することがありませんでした。Tableauでやるには非常に面倒な手順を踏む必要がありましたし、matplotlibで作ると見た目がカッコ悪いものになりがちです。

何か良いツールはないかと思っていて、JavaScriptのライブラリなども含めて調べていたのですが、jupyterで使える ipysankeywidget というのが良さそうだったので紹介します。

まず、導入方法はこちらです。pip だけではなく jupyter notebook / jupyter lab それぞれ対応したコマンドが必要なので注意してください。
参考: https://github.com/ricklupton/ipysankeywidget

# pip インストールの場合、以下のコマンドでイストールした後に以降の設定コマンド実行
$ pip install ipysankeywidget

# notebookの場合は次の2つ
$ jupyter nbextension enable --py --sys-prefix ipysankeywidget
$ jupyter nbextension enable --py --sys-prefix widgetsnbextension

# lab の場合は次の1つ
$ jupyter labextension install jupyter-sankey-widget @jupyter-widgets/jupyterlab-manager

# condaの場合はインストールだけで設定まで完了する。
$ conda install -c conda-forge ipysankeywidget 

ドキュメントは、d3-sankey-diagram のドキュメントを見ろって書いてありますね。これはJavaScriptのライブラリです。 ipysankeywidget はそれをラップしたもののようです。

このライブラリ自体のドキュメントがかなり貧弱ですが、サンプルのnoteboookが4つ用意されています。こちらを一通り試しておけば細かい設定のを除いて問題なく使えるようになるでしょう。より詳しくはd3のドキュメントを見に行ったほうがよさそうです。
参考: ipysankeywidget/examples/README.md

基本的な使い方は、 どこから(source)、どこへ(target)、どのくらいの量の(value)流れを描写するかのdictの配列を用意し、それを渡すだけです。auto_save_png というメソッドを使うと画像に書き出すこともできます。

from ipysankeywidget import SankeyWidget


links = [
    {'source': 'A', 'target': 'B', 'value': 2},
    {'source': 'B', 'target': 'C', 'value': 2},
    {'source': 'D', 'target': 'B', 'value': 2},
    {'source': 'B', 'target': 'D', 'value': 2},
]

sanky = SankeyWidget(links=links)
sanky  # jupyter notebook上に表示される
# sanky.auto_save_png("simple-sanky.png")  # ファイルに保存する場合

出力がこちらです。

簡単でしたね。

あとは order 引数でnodeの順番を指定したり、groupでまとめたり、 linkの各dictにtype属性を付与して色を塗り分けたりと、細かい補正が色々できます。

結構面白いので是非試してみてください。

tqdmを使ってプログレスバーを表示する

以前、こんな記事を書きました。
参考: printでお手軽プログレスバー

そして、自分ではどこかでライブラリを使ってプログレスバーを表示する普通の方法も紹介済みだったつもりだったのですが、探すとまだ書いてなかったので書いておきます。

ipywidgets にも実はプログレスバー用のウィジェット(IntProgress/ FloatProgress)が存在し、これを使うこともできますが、プログレスバーには専用のライブラリも存在し、そちらの方が手軽に使えるので今回はそれを紹介します。

それがタイトルに書いてるtqdmです。
ドキュメント: tqdm documentation

使い方は簡単で、for文等で逐次処理するリストやイテレーターを tqdm.tqdm でラップするだけです。 tqdmを2回書くの嫌なので「from tqdm import tqdm」とインポートすると良いでしょう。
time.sleepをダミー処理としてやってみましょう。

from tqdm import tqdm
import time


for i in tqdm(range(1000)):
    time.sleep(0.01)

# 以下が出力されるプログレスバー。
100%|██████████| 1000/1000 [00:10<00:00, 97.60it/s]

内包表記でも使えます。

square_numbers = [i**2 for i in tqdm(range(100))]
# 以下出力
100%|██████████| 100/100 [00:00<00:00, 242445.32it/s]

単純にリストを周回させるだけでなく、enumerate や zip を使うこともあると思います。
この場合、これらの関数の外側にtqdmをつけると、一応進捗を数字で出してはくれるのですが、進捗のバーが出ません。

list_a = list("abcdefghij")
for i, s in tqdm(enumerate(list_a)):
    print(i, s)
​
# 以下出力
10it [00:00, 16757.11it/s]
0 a
1 b
2 c
3 d
4 e
5 f
6 g
7 h
8 i
9 j

この場合、少しコツがあって、enumerateやzipの内側にtqdmを使うと良いです。

for i, s in enumerate(tqdm(list_a)):
    print(i, s)
# 以下出力
100%|██████████| 10/10 [00:00<00:00, 20301.57it/s]
0 a
1 b
2 c
3 d
4 e
5 f
6 g
7 h
8 i
9 j

# zipの場合は内側のリストの一方を囲む。
for a, b in zip(tqdm(range(10)), range(10, 20)):
    print(a, b)
​
# 以下出力
100%|██████████| 10/10 [00:00<00:00, 8607.23it/s]
0 10
1 11
2 12
3 13
4 14
5 15
6 16
7 17
8 18
9 19

どうやら、enumerate や zipの外側にtqdm を置くと、事前にイテレーションの回数が取得できず、進捗率が計算できないのが原因みたいです。

僕はPandasのデータフレームを扱うことが多いので、進捗を表示したくなるのももっぱらPandasのデータを処理しているときです。データフレームの iterrows() メソッドもこの enumerate や zipと同様に事前にデータ件数を取得できないらしく、普通に使うとバーや進捗率が出ません。

import numpy as np
import pandas as pd


df = pd.DataFrame(
    {
        "col1": np.random.randint(10, size=10000),
        "col2": np.random.randn(10000),
    }
)

for i, row in tqdm(df.iterrows()):
    pass

# 以下出力
10000it [00:00, 28429.70it/s]

この場合は、total引数で明示的にデータ件数を渡すのが有効です。ちなみにこれはenumerateなどでも使えるテクニックです。

for i, row in tqdm(df.iterrows(), total=len(df)):
    pass

# 以下出力
100%|██████████| 10000/10000 [00:00<00:00, 26819.34it/s]

さらに、pandasの applyでもプログレスバーを使うことができます。
テキストの前処理など地味に時間のかかる処理をapplyでやることが多いのでこれはありがたいですね。使い方は少しトリッキーで、まず、 「tqdm.pandas()」を実行します。

すると、pandasのデータが、progress_apply や、 progress_applymap などのメソッドを持つようになるので、これを実行します。

tqdm.pandas()
df["col2"].progress_apply(np.sin)
# 以下出力
100%|██████████| 10000/10000 [00:00<00:00, 241031.18it/s]
# 結果略

groupbyに対応したprogress_aggregateもありますよ。

df.groupby("col1").progress_aggregate(sum)
# 以下出力
100%|██████████| 10/10 [00:00<00:00, 803.51it/s]
# 結果略

あとは滅多に使わないのですが、for文ではなく、事前にループ回数が決まっていないループで使う方法を書いておきます。
下のように、 with でインスタンスを作って、明示的にupdateとしていきます。あらかじめのループ回数を渡していないのでバーや進捗率は出ませんが現在の実行回数が観れるので一応進捗が確認できます。

i = 1
with tqdm() as pbar:
    while True:
        pbar.update(1)
        i += 1
        # 無限ループ防止
        if i>100:
            break

# 以下出力
100it [00:00, 226229.99it/s]

これ以外にも、 leave=False を指定して、処理が終わったらプログレスバーを消すとか、
desc/ postfix で前後に説明文を書くとか、数字の単位や進捗の更新頻度など細かい設定がたくさんできます。
必要に応じてドキュメントを参照して使ってみてください。

sshtunnel を使って踏み台サーバー経由でDB接続

以前、PyMySQLを使って、Amazon RDSを利用する方法を記事にしました。
参考: PythonでAuroraを操作する(PyMySQLを使う方法)

DBに直接接続できる場合はこれで良いのですが、場合によっては踏み台となるサーバーを経由して接続しなければならないことがあります。

僕の場合は職場ではセキュリティ上の理由から分析用のDBの一つがローカルから直接接続できないようになっていますし、プライベートではAurora Serverless v1使っているので、これはAWS内のリソース経由でしか接続できません。

ということで、Pythonで踏み台経由してAWSに接続する方法を書いていきます。
実はこれまで人からもらったコードをそのまま使っていたのですが、この記事書くために改めてsshtunnel のドキュメントを読んで仕組みを理解しました。

参考: Welcome to sshtunnel’s documentation! — sshtunnel 0.4.0 documentation

さて、さっそくやっていきましょう。セキュリティ的に接続情報はブログに書くわけにいかないので、以下の変数に入ってるものとします。

あと、サンプルなので実行したいSQL文も sql って変数に入ってるものとします。

サーバーのネットワーク設定ですが、踏み台はSSHのポート(通常は22番)、RDSはDBの接続ポート(通常は3306番)を開けておいてください。以降のコードで出てくる9999番ポートは、ローカル端末のポートなので踏み台やDBのサーバーでは開けておかなくて良いです。

# DBの接続情報 (RDSを想定)
db_host = "{DBのエンドポイント}"  # xxxx.rds.amazon.com みたいなの
db_port = 3306  # DBのポート(デフォルトから変更している場合は要修正)
db_name = "{データベース名}"
db_user = "{DBに接続するユーザー名}"
db_pass = "{DBに接続するユーザーのパスワード}"

# 踏み台サーバーの接続情報 (EC2を想定)
ssh_ip = "{サーバーのIPアドレス}"
ssh_user = "{SSH接続するユーザー名}"  # EC2のデフォルトであれば ec2-user
ssh_port = 22  # SS接続するポート(デオフォルトから変更している場合は要修正)
ssh_pkey = "{秘密鍵ファイルの配置パス}"  # .pem ファイルのパス

sql = "{実行したいSQL文}"

さて、さっそく行ってみましょう。単発で1個だけSQLを打って結果を取得したい、という場合、以下のコードで実行できます。
ローカル(手元のPCやMac)の 9999 番ポート (これは他で使ってなければ何番でもいい)への通信が、踏み台サーバーを経由してRDSに届くようになります。

from sshtunnel import SSHTunnelForwarder
from pymysql import cursors
from pymysql import connect


with SSHTunnelForwarder(
    ssh_address_or_host=(ssh_ip, ssh_port),  # 踏み台にするサーバーのIP/SSHポート
    ssh_username=ssh_user,  # SSHでログインするユーザー
    ssh_pkey=ssh_pkey,  # SSHの認証に使う秘密鍵
    remote_bind_address=(db_host, db_port),  # 踏み台を経由して接続したいDBのホスト名とポート
    local_bind_address=("localhost", 9999),  # バインドするローカル端末のホスト名とポート
) as tunnel:
    with connect(
            host="localhost",  # DBのエンドポイントではなく、ローカルの端末を指定する
            port=9999,  # これもDBのポートでは無く、バインドしたポート番号を指定する
            user=db_user,  # これ以下は普通にDB接続する場合と同じ引数
            password=db_pass,
            database=db_name,
            charset="utf8mb4",
            autocommit=True,
            cursorclass=cursors.DictCursor,
    ).cursor() as cursor:
        cursor.execute(sql)  # これでSQL実行
        rows = cursor.fetchall()  # 結果の取り出し

これで、通常はローカルからはアクセスできないDBへSQLを発行し、結果を変数rowsに取得することができました。SELECT文を打ったのであればpandasのDataFrame等に変換して使いましょう。

with文で変数をたくさん呼び出すインスタンスを使うのはコードの見栄えが非常に悪くなりますが、以下のように変数を事前に辞書にまとめておくと少しマシになります。

ssh_args = {
    "ssh_address_or_host": (ssh_ip, ssh_port),
    "ssh_username": ssh_user,
    "ssh_pkey": ssh_pkey,
    "remote_bind_address": (db_host, db_port),
    "local_bind_address": ("localhost", 9999),
}

db_args = {
    "host": "localhost",
    "port":  9999,
    "user":  db_user,
    "password":  db_pass,
    "database":  db_name,
    "charset":  "utf8mb4",
    "autocommit":  True,
    "cursorclass":  cursors.DictCursor,
}

with SSHTunnelForwarder(**ssh_args) as tunnel:
    with connect(**db_args).cursor() as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()

以上で、一応やりたいことはできると思いますが、発行したいSQLが複数ある場合かつ途中に別の処理も含むような場合一回ごとにポートフォワードとDBの接続をやり直していたらリソースの無駄です。(といっても、最近のコンピューター環境ならこれがストレスになる程時間かかるってことはないと思いますが。)

DBヘ接続しっぱなしにしておく場合は、withを使わずに次のように書きます。引数は上のコード例で作った、ssh_args, db_args をそのまま使います。

server = SSHTunnelForwarder(**ssh_args)
server.start()  # ポートフォワード開始

connection = connect(**db_args)  # DB接続
# 以上の3行で DBに接続した状態になる。

# 以下のようにして接続を使ってSQLを実行する。
with connection.cursor() as cursor:
    cursor.execute(sql)
    rows = cursor.fetchall()

# サンプルコードなのでSQLを1回しかやってないけど、続けて複数実行できる。

# 終わったらDB接続とポートフォワードをそれぞれ閉じる
connection.close()
server.stop()

これで、一つの接続を使い回すこともできるようになりました。

ちなみにですが、このsshtunnelで作ったポートフォワードの設定は端末単位で有効です。どういうことかというと、複数のPythonプロセス(例えば別々のJupyter notebook)間で、共有することができます。というより、Pythonに限らず他のプログラムからも使えます。
コンソールで、以下のコマンド使ってポートの動きを見ながら試すとよくわかります。

# 9999番ポートの利用状況を確認する
$ sudo lsof -i:9999

普段は何も結果が返ってこないか、ここまでのプログラムを実行してたらいろんな情報と共に(CLOSED)が返ってくると思いますが、ポートフォワードしている最中はESTABLISHEDになっていて、pythonが使っていることが確認できます。

特にPythonでDB操作したいという場合に限って言えば、別々のnotebookで操作するメリットなんて無いのですが、全く別の用途でポートフォワードだけPythonでやっておきたい、ということはあるかもしれないので、覚えておくと使う機会があるかもしれません。