2標本コルモゴロフ–スミルノフ検定を試す

前回の記事の続きです。
参考: 1標本コルモゴロフ–スミルノフ検定について理解する

コルモゴロフ-スミルノフ検定(K-S検定)は、一つの標本が何かしらの確率分布に従っているかどうかだけではなく、二つの標本について、それらが同一の確率分布から生成されたものかどうかの判定にも利用可能です。

Wikipediaを見ると、KS検定は1標本より2標本の時の利用の方が推されてますね。2標本それぞれの確率分布が不明となるとノンパラメトリックな手法が有効になるのでしょう。

前の記事みたいに数式をつらつらと書いていこうかと思ったのですが、1標本との違いは、統計量を計算するときに、経験分布と検定対象の累積分布関数の差分の上限(sup)を取るのではなく、その二つの標本それぞれの経験分布に対して、差分の上限の上限(sup)を取るというだけです。KS検定について調べるような人でこの説明でわからないという人もいないと思うので、数式等は省略します。(前回の記事を参照してください。)

便利な特徴としては、2標本のそれぞれのサイズは等しくなくても使える点ですね。
ただ、色々試した結果、それなりにサンプルサイズが大きくないとあまり使い物にならなさそうです。

この記事では、Pythonで実際に検定をやってみます。
前回のt分布からの標本と正規分布は、標準偏差が違うとはいえ流石に似すぎだったので、今回はちょっと変えて、カイ2乗分布と、正規分布を使います。
カイ2乗分布の自由度は4とし、すると期待値4、分散8になるので、正規分布の方もそれに揃えます。期待値や分散が違うならt検定やF検定で十分ですからね。

とりあえず分布関数を作り、可視化して見比べておきます。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import ks_2samp
from scipy.stats import norm
from scipy.stats import chi2


chi2_frozen = chi2.freeze(df=4)  # 自由度4のカイ2乗分布 (期待値4, 分散8)
norm_frozen = norm.freeze(loc=4, scale=8**0.5)  # 正規分布 (期待値4, 分散8)

# 確率密度関数を可視化
xticks = np.linspace(-10, 20, 301)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, chi2_frozen.pdf(xticks), label="カイ2乗分布")
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.legend()
plt.show()

出力はこちら。これはちゃんと検定で見分けて欲しいですね。

続いて、それぞれの分布から標本を取ります。サンプルサイズが違っても使える手法なので、意図的にサンプルサイズ変えました。

# それぞれの分布から標本を生成
chi2_samples = chi2_frozen.rvs(size=200, random_state=0)  # カイ2乗分布からの標本
norm_samples = norm_frozen.rvs(size=150, random_state=0)  # 正規分布からの標本

さて、これでデータが取れたので、2標本KS検定やっていきます。
非常に簡単で、ks_2samp ってメソッドに渡すだけです。
ドキュメント: scipy.stats.ks_2samp — SciPy v1.9.0 Manual

例によって、alternative 引数で両側検定/片側検定などを指定できますが、今回両側でやるので何も指定せずにデフォルトのまま使います。

帰無仮説は「二つの標本が従う確率分布は等しい」です。有意水準は0.05とします。

結果がこちら。

print(ks_2samp(chi2_samples, norm_samples))
# KstestResult(statistic=0.17, pvalue=0.012637307799274388)

統計量とp値が表示されました。p値が0.05を下回りましたので、帰無仮説を棄却し、二つの標本が従う確率分布は異なっていると判断します。

サンプルサイズさえそこそこ確保できれば、非常に手軽に使えて便利な手法だと思いました。

あとは個人的には、KS検定って5段階や10段階評価のような離散分布でも使えるのかどうか気になっています。まぁ、5段階の場合は単純にカイ2乗検定しても良いのですが、10段階以上になってくると自由度が高くてカイ2乗検定があまり使いやすくないので。何か情報がないか追加で探したり、自分でシミュレーションしたりしてこの辺をもう少し調べようと思います。

1標本コルモゴロフ–スミルノフ検定について理解する

何らかのデータ(標本)が、特定の確率分布に従ってるかどうかを検定したいことって頻繁にあると思います。そのような場合に使えるコルモゴロフ–スミルノフ検定(Kolmogorov–Smirnov test, KS検定)という手法があるのでそれを紹介します。

取り上げられている書籍を探したのですが、手元に見当たらなかったので説明はWikipediaを参照しました。
参考: コルモゴロフ–スミルノフ検定 – Wikipedia

1標本と、特定の確率分布についてその標本が対象の確率分布に従っていることを帰無仮説として検定を行います。

この記事では、scipyを使った実装と、それを理解するための検証をやっていきたいと思います。

とりあえずこの記事で使うライブラリをインポートし、データを準備します。
標本は、自由度5のt分布からサンプリングし、それが正規分布に従ってないことを検定で見ていきましょう。scipyのパラメータはどちらの分布もloc=10, scale=10 としました。

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import kstest
from scipy.stats import kstwo
from scipy.stats import t
from scipy.stats import norm


norm_frozen = norm.freeze(loc=10, scale=10)  # 検定する分布
t_frozen = t.freeze(df=5, loc=10, scale=10)  # 標本をサンプリングする分布
t_samples = t_frozen.rvs(size=1000, random_state=0)  # 標本を作成

# 各データを可視化
xticks = np.linspace(-40, 60, 1001)
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks, norm_frozen.pdf(xticks), label="正規分布")
ax.plot(xticks, t_frozen.pdf(xticks), label="t分布")
ax.hist(t_samples, density=True, bins=50, alpha=0.5, label="t分布からの標本")
ax.legend()
plt.show()

作成された図がこちらです。流石にt分布と正規分布は似ていますね。ぱっと見でこの標本が正規分布に従ってないと言い切るのは難しそうです。

それでは、早速scipyで(1標本)KS検定をやっていきましょう。これは専用の関数が用意されているので非常に簡単です。
参考: scipy.stats.kstest — SciPy v1.9.0 Manual

rvs引数に標本データ、cdf引数に検定したい分布の累積分布関数を指定してあげれば大丈夫です。alternative で両側検定、片側検定などを指定できます。今回は両側で行きます。有意水準は0.05としましょう。

ks_result = kstest(rvs=t_sample, cdf=norm_frozen.cdf, alternative="two-sided")

print(ks_result)
# KstestResult(statistic=0.04361195130340301, pvalue=0.04325168699194537)

# 統計量と引数を変数に入れておく
ks_value = ks_result.statistic
p_value = ks_result.pvalue

はい、p値が約0.043 で0.05を下回ったので、この標本が正規分布にし違うという帰無仮説は棄却されましたね。

ちなみに、標本をサンプリングした元のt分布でやると棄却されません。これも想定通りの挙動ですね。

print( kstest(rvs=t_sample, cdf=t_frozen.cdf, alternative="two-sided"))
# KstestResult(statistic=0.019824350810698554, pvalue=0.8191867386190135)

一点注意ですが、どのような仮説検定でもそうである通り、帰無仮説が棄却されなかったからと言って正しいことが証明されたわけではありません。(帰無仮説は採択されない。)
特にKS検定については、標本サイズを変えて何度も試してみたのですが、標本サイズが小さい時は誤った帰無仮説を棄却できないことがかなりあるようです。

さて、scipyで実行してみて、この検定が使えるようになったのですが、より理解を深めるために、自分で統計量を計算してみようと思います。

KS検定の統計量の計算には、経験分布というものを使います。要するに標本から作成した累積分布関数ですね。数式として書くと標本$y_1, y_2,\dots, y_n$に対する経験分布$F_n$は次のようになります。

$$F_n(x) = \frac{\#\{1\le i\le n | y_i \le n\}}{n}$$

要するに$x$以下の標本を数えて標本のサイズ$n$で割ってるだけです。

そして、検定したい分布の分布関数$F$とこの$F_n$に対して、KS検定の統計量は次のように計算されます。

$$\begin{align}
D^{+}_n &= \sup_x\left(F_n(x)-F(x)\right)\\
D^{-}_n &= \sup_x\left(F(x)-F_n(x)\right)\\
D_n &= \max(D^{+}_n, D^{-}_n)
\end{align}$$

max(最大値)ではなく、sup(上限)で定義されているのがポイントですね。
とはいえ、分布関数の方が一般的な十分滑らかな関数であれば、$D^{+}_n$の方はmaxでもsupでも同じですが,$D^{-}_n$の方はsupが効いてきます。

この$D_n$の直感的なイメージを説明すると、$F$と$F_n$の差が一番大きくなるところの値を取ってくることになるでしょうか。

経験分布の方を実装して可視化してみます。

# 経験分布関数
def empirical_distribution(x, samples):
    return len([y for y in samples if y <= x])/len(samples)


xticks_2 = np.linspace(-40, 60, 100001)  # メモリを細かめに取る
# 経験分布関数の値
empirical_cdf = np.array([empirical_distribution(x, t_samples) for x in xticks_2])

# 可視化
fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.plot(xticks_2, norm_frozen.cdf(xticks_2), label="正規分布")
ax.plot(xticks_2, empirical_cdf, label="経験分布")
ax.legend()
plt.show()

出てきた図がこちらです。標本サイズが大きいのでよくみないとわかりませんが、オレンジの方は階段状の関数になっています。

この、青線(検定する正規分布)とオレンジの線(標本からの経験分布)が一番離れたところの差を統計量にしましょう、というのが考え方です。

では$D^+_n$から計算しましょう。実は(今回の例では)青線が滑らかなのに対して、オレンジの線が階段状になっているので、この段が上がるポイント、つまり標本が存在した点での値だけ調べると$D^+_n$が計算できます。

dplus = max([empirical_distribution(x, t_samples) - norm_frozen.cdf(x) for x in t_samples])
print(dplus)
# 0.031236203108694176

次に$D^-_n$の方ですが、これは少し厄介です。サンプルが存在する点ではなく、その近傍を調べて階段の根元の値と累積分布関数の差を取りたいんですよね(ここでmaxではくsupが採用されていた意味が出てくる)。1e-10くらいの極小の定数を使って計算することも考えましたが、経験分布関数の方を少しいじって計算してみることにしました。どういうことかというと、x以下の要素の割合ではなくx未満の要素の割合を返すようにします。これを使うとサンプルが存在した点については階段の根元の値が取れるようになるので、これを使って$D^+_n$と同様に計算してみます。

def empirical_distribution_2(x, samples):
    return len([y for y in samples if y < x])/len(samples)


dminus = max([norm_frozen.cdf(x) - empirical_distribution_2(x, t_samples) for x in t_samples])
print(dminus)
# 0.04361195130340301

さて、必要だった統計量は$D^+_n$,$D^-_n$の最大値です。これをライブラリが計算してくれた統計量と比較してみましょう。一致しますね。

print("自分で計算したKS値:", max([dplus, dminus]))
# 自分で計算したKS値: 0.04361195130340301
print("scipyのKS値:", ks_vakue)
# scipyのKS値: 0.04361195130340301

さて、統計量がわかったら次はp値を計算します。ウィキペディアを見ると、この統計量は無限和を含むそこそこ厄介な確率分布に従うことが知られているそうです。
これからp値を求めるのは流石に手間なのと、幸い、scipyに実装があるので(scipyの検証にscipyを使うのは不本意ですが)ここは甘えようと思います。

非常にありがたいことに、kstwoksoneという両側検定、片側検定に対応してそれぞれ分布関数を用意してくれています。

統計量は求まっていますし、分布関数もあるのでp値はすぐ出せます。

print(kstwo.sf(max([dplus, dminus]), n=1000))
# 0.04325168699194537

# 参考 kstestから取得したp値
print(p_value)
# 0.04325168699194537

最後、少し妥協しましたが、追々kstwo分布についても自分でスクラッチ実装して検証しておこうと思います。

今回の検証でコルモゴロフ–スミルノフ検定についてだいぶ理解が深まったので、これからバシバシ使っていこうと思います。

1標本だけでなく2標本でそれぞれの分布が等しいかどうかを検定するって使い方もできるので、次の記事はそれを取り上げようかなと思っています。

boto3で Amazon S3 のファイルを直接読み書きする

以前の記事で、S3にディスク上のファイルをアップロードしたり逆にS3からディスクにダウンロードしたりする方法を紹介しました。
参考: boto3でS3のファイル操作

ただ、実際に使っているとディスクを経由せずにS3のファイルを読み込んでそのままプログラムで処理したかったり、結果を直接S3に書き込みたい場面もあります。その方法をまだ書いてなかったのでまとめておきます。

正直に言うと、本当にやりたかったのはS3上のテキストファイルに直接どんどんテキストを追記していく操作だったのですが、どうやらS3はオブジェクトの出し入れにしか対応しておらず、追記などの編集はできないそうです。結局やりたかったことはできなかったのですが、この方法を探すためにドキュメントを読み込んだんのでその時間を無駄にしないようにしようと言う記事です。

読んだドキュメントはこれです。putが書き込み、getが読み込み
S3 — Boto3 Docs 1.24.42 documentation の S3.Object.put
S3 — Boto3 Docs 1.24.42 documentation の S3.Object.get

さて、見ていきましょう。boto3はS3に関してはresource APIも用意されているのでこちらを使います。

まず、S3への書き込みです。これは、S3上のオブジェクトを取得し、オブジェクトputメソッドを使います。オブジェクトが存在しない状態でもkeyを取得できる、ってことと、putするときにデータはバイナリにエンコードしておかないといけないと言う2点がつまずきポイントでしょうか。また、上で追記はできないって書いてますが、既存のオブジェクトのキーを指定しまうとファイルが丸ごと上書きされていまいます。その点も注意しましょう。

import boto3


text = "書き込みたいテキストの内容"
data = text.encode("utf-8")  # エンコードしてバイナリデータにする
# もちろん、 data = "書き込みたいテキスト".encode("utf-8")でも良い。

# 書き込み先のバケットと、オブジェクトキーを設定。
bucket_name = "{バケット名}"
key = "{書き込み先ファイル名}"  # hogehoge.text など。

s3 = boto3.resource("s3")
# オブジェクトを取得
s3_object = s3.Object(bucket_name, key)
result = s3_object.put(Body=data)  # resultに書き込み結果のステータスコードなどの辞書が戻る

これで、バケットにデータが作成されます。「s3.Object(bucket_name, key)」としてオブジェクトを一発で取ってますが、これはまずバケットを指定して、その後、そのバケット内のオブジェクトを指定しても良いです。一つのバケットに何ファイルも書き込む場合などに使えるかもしれません。以降に紹介するgetでも状況は同様です。

# 以下の二行で、 s3_object = s3.Object(bucket_name, key) と同じ
s3_bucket = s3.Bucket(bucket_name)
s3_object = s3_bucket.Object(key)

PandasのデータフレームをS3に保存したい場合などは、to_csvと組み合わせると良いでしょう。これファイル名のところにNoneを渡せばファイルに書き込まずにCSVのテキストを返してくれます。それをエンコードして書き込んだらOKです。

df = pd.DataFrame({
    "id": [0, 1, 2],
    "text": ["あいうえお", "かきくけこ", "さしすせそ"],
})
print(df)
"""
   id   text
0   0  あいうえお
1   1  かきくけこ
2   2  さしすせそ
"""

bucket_name = "{バケット名}"
key = "df.csv"  # 保存先ファイル名

s3_object = s3.Object(bucket_name, key)
result = s3_object.put(Body=df.to_csv(None, index=None).encode("utf-8"))

次は読み込みです。これはオブジェクトのメソッド、getを利用します。
さっき書き込んだCSVファイル読み込んでみましょうかね。結果は辞書で返ってきますが、key=”Body”に対応しているのが欲しいデータです。

bucket_name = "{バケット名}"
key = "df.csv"  # 保存先ファイル名

s3_object = s3.Object(bucket_name, key)
s3_object.get()["Body"]
# <botocore.response.StreamingBody at 0x1179bc910>

StreamingBody なる型で返ってきました。これ、with openして 通常のファイルを読み込むときと同じように read()のメソッドを持っているので、それを使えます。また、読み込んだデータはバイナリ型なので文字列に戻すならdecodeが必要です。

csv_text = s3_object.get()["Body"].read().decode("utf-8")
print(csv_text)
"""
id,text
0,あいうえお
1,かきくけこ
2,さしすせそ
"""

元のファイルがテキストファイルであればこれで読み込み完了ですね。

ちなみに、元々DataFrameを書き込んだものなので、DataFrameに読み込みたいと言うケースもあると思います。その場合、テキストに変換を終えたデータではなく、StreamingBodyを使います。つまりこうです。

df = pd.read_csv(s3_object.get()["Body"])
print(df)
"""
   id   text
0   0  あいうえお
1   1  かきくけこ
2   2  さしすせそ
"""

これで、ディスクを経由することなくメモリ上のテキストやデータフレームをS3に書き込んだり逆にS3からメモリに読み込んだりできるようになりました。

最初にやりたいと言ってた追記なんですが、これはもう一度読み込んでテキストなりデータフレームなりに新しいデータを追加して改めて書き込むしか無さそうです。

ipysankeywidgetでサンキーダイアグラム

詳細はWikipediaに譲りますが、サンキーダイアグラムっていうデータの可視化手法があります。
参考: サンキー ダイアグラム – Wikipedia

Webサービスにおけるユーザーの動線可視化とか、コンバージョンに至るまでの途中の離脱状況とか分析するのに便利なのですが、適したツールがなかなか見つからずあまり利用することがありませんでした。Tableauでやるには非常に面倒な手順を踏む必要がありましたし、matplotlibで作ると見た目がカッコ悪いものになりがちです。

何か良いツールはないかと思っていて、JavaScriptのライブラリなども含めて調べていたのですが、jupyterで使える ipysankeywidget というのが良さそうだったので紹介します。

まず、導入方法はこちらです。pip だけではなく jupyter notebook / jupyter lab それぞれ対応したコマンドが必要なので注意してください。
参考: https://github.com/ricklupton/ipysankeywidget

# pip インストールの場合、以下のコマンドでイストールした後に以降の設定コマンド実行
$ pip install ipysankeywidget

# notebookの場合は次の2つ
$ jupyter nbextension enable --py --sys-prefix ipysankeywidget
$ jupyter nbextension enable --py --sys-prefix widgetsnbextension

# lab の場合は次の1つ
$ jupyter labextension install jupyter-sankey-widget @jupyter-widgets/jupyterlab-manager

# condaの場合はインストールだけで設定まで完了する。
$ conda install -c conda-forge ipysankeywidget 

ドキュメントは、d3-sankey-diagram のドキュメントを見ろって書いてありますね。これはJavaScriptのライブラリです。 ipysankeywidget はそれをラップしたもののようです。

このライブラリ自体のドキュメントがかなり貧弱ですが、サンプルのnoteboookが4つ用意されています。こちらを一通り試しておけば細かい設定のを除いて問題なく使えるようになるでしょう。より詳しくはd3のドキュメントを見に行ったほうがよさそうです。
参考: ipysankeywidget/examples/README.md

基本的な使い方は、 どこから(source)、どこへ(target)、どのくらいの量の(value)流れを描写するかのdictの配列を用意し、それを渡すだけです。auto_save_png というメソッドを使うと画像に書き出すこともできます。

from ipysankeywidget import SankeyWidget


links = [
    {'source': 'A', 'target': 'B', 'value': 2},
    {'source': 'B', 'target': 'C', 'value': 2},
    {'source': 'D', 'target': 'B', 'value': 2},
    {'source': 'B', 'target': 'D', 'value': 2},
]

sanky = SankeyWidget(links=links)
sanky  # jupyter notebook上に表示される
# sanky.auto_save_png("simple-sanky.png")  # ファイルに保存する場合

出力がこちらです。

簡単でしたね。

あとは order 引数でnodeの順番を指定したり、groupでまとめたり、 linkの各dictにtype属性を付与して色を塗り分けたりと、細かい補正が色々できます。

結構面白いので是非試してみてください。

tqdmを使ってプログレスバーを表示する

以前、こんな記事を書きました。
参考: printでお手軽プログレスバー

そして、自分ではどこかでライブラリを使ってプログレスバーを表示する普通の方法も紹介済みだったつもりだったのですが、探すとまだ書いてなかったので書いておきます。

ipywidgets にも実はプログレスバー用のウィジェット(IntProgress/ FloatProgress)が存在し、これを使うこともできますが、プログレスバーには専用のライブラリも存在し、そちらの方が手軽に使えるので今回はそれを紹介します。

それがタイトルに書いてるtqdmです。
ドキュメント: tqdm documentation

使い方は簡単で、for文等で逐次処理するリストやイテレーターを tqdm.tqdm でラップするだけです。 tqdmを2回書くの嫌なので「from tqdm import tqdm」とインポートすると良いでしょう。
time.sleepをダミー処理としてやってみましょう。

from tqdm import tqdm
import time


for i in tqdm(range(1000)):
    time.sleep(0.01)

# 以下が出力されるプログレスバー。
100%|██████████| 1000/1000 [00:10<00:00, 97.60it/s]

内包表記でも使えます。

square_numbers = [i**2 for i in tqdm(range(100))]
# 以下出力
100%|██████████| 100/100 [00:00<00:00, 242445.32it/s]

単純にリストを周回させるだけでなく、enumerate や zip を使うこともあると思います。
この場合、これらの関数の外側にtqdmをつけると、一応進捗を数字で出してはくれるのですが、進捗のバーが出ません。

list_a = list("abcdefghij")
for i, s in tqdm(enumerate(list_a)):
    print(i, s)
​
# 以下出力
10it [00:00, 16757.11it/s]
0 a
1 b
2 c
3 d
4 e
5 f
6 g
7 h
8 i
9 j

この場合、少しコツがあって、enumerateやzipの内側にtqdmを使うと良いです。

for i, s in enumerate(tqdm(list_a)):
    print(i, s)
# 以下出力
100%|██████████| 10/10 [00:00<00:00, 20301.57it/s]
0 a
1 b
2 c
3 d
4 e
5 f
6 g
7 h
8 i
9 j

# zipの場合は内側のリストの一方を囲む。
for a, b in zip(tqdm(range(10)), range(10, 20)):
    print(a, b)
​
# 以下出力
100%|██████████| 10/10 [00:00<00:00, 8607.23it/s]
0 10
1 11
2 12
3 13
4 14
5 15
6 16
7 17
8 18
9 19

どうやら、enumerate や zipの外側にtqdm を置くと、事前にイテレーションの回数が取得できず、進捗率が計算できないのが原因みたいです。

僕はPandasのデータフレームを扱うことが多いので、進捗を表示したくなるのももっぱらPandasのデータを処理しているときです。データフレームの iterrows() メソッドもこの enumerate や zipと同様に事前にデータ件数を取得できないらしく、普通に使うとバーや進捗率が出ません。

import numpy as np
import pandas as pd


df = pd.DataFrame(
    {
        "col1": np.random.randint(10, size=10000),
        "col2": np.random.randn(10000),
    }
)

for i, row in tqdm(df.iterrows()):
    pass

# 以下出力
10000it [00:00, 28429.70it/s]

この場合は、total引数で明示的にデータ件数を渡すのが有効です。ちなみにこれはenumerateなどでも使えるテクニックです。

for i, row in tqdm(df.iterrows(), total=len(df)):
    pass

# 以下出力
100%|██████████| 10000/10000 [00:00<00:00, 26819.34it/s]

さらに、pandasの applyでもプログレスバーを使うことができます。
テキストの前処理など地味に時間のかかる処理をapplyでやることが多いのでこれはありがたいですね。使い方は少しトリッキーで、まず、 「tqdm.pandas()」を実行します。

すると、pandasのデータが、progress_apply や、 progress_applymap などのメソッドを持つようになるので、これを実行します。

tqdm.pandas()
df["col2"].progress_apply(np.sin)
# 以下出力
100%|██████████| 10000/10000 [00:00<00:00, 241031.18it/s]
# 結果略

groupbyに対応したprogress_aggregateもありますよ。

df.groupby("col1").progress_aggregate(sum)
# 以下出力
100%|██████████| 10/10 [00:00<00:00, 803.51it/s]
# 結果略

あとは滅多に使わないのですが、for文ではなく、事前にループ回数が決まっていないループで使う方法を書いておきます。
下のように、 with でインスタンスを作って、明示的にupdateとしていきます。あらかじめのループ回数を渡していないのでバーや進捗率は出ませんが現在の実行回数が観れるので一応進捗が確認できます。

i = 1
with tqdm() as pbar:
    while True:
        pbar.update(1)
        i += 1
        # 無限ループ防止
        if i>100:
            break

# 以下出力
100it [00:00, 226229.99it/s]

これ以外にも、 leave=False を指定して、処理が終わったらプログレスバーを消すとか、
desc/ postfix で前後に説明文を書くとか、数字の単位や進捗の更新頻度など細かい設定がたくさんできます。
必要に応じてドキュメントを参照して使ってみてください。

sshtunnel を使って踏み台サーバー経由でDB接続

以前、PyMySQLを使って、Amazon RDSを利用する方法を記事にしました。
参考: PythonでAuroraを操作する(PyMySQLを使う方法)

DBに直接接続できる場合はこれで良いのですが、場合によっては踏み台となるサーバーを経由して接続しなければならないことがあります。

僕の場合は職場ではセキュリティ上の理由から分析用のDBの一つがローカルから直接接続できないようになっていますし、プライベートではAurora Serverless v1使っているので、これはAWS内のリソース経由でしか接続できません。

ということで、Pythonで踏み台経由してAWSに接続する方法を書いていきます。
実はこれまで人からもらったコードをそのまま使っていたのですが、この記事書くために改めてsshtunnel のドキュメントを読んで仕組みを理解しました。

参考: Welcome to sshtunnel’s documentation! — sshtunnel 0.4.0 documentation

さて、さっそくやっていきましょう。セキュリティ的に接続情報はブログに書くわけにいかないので、以下の変数に入ってるものとします。

あと、サンプルなので実行したいSQL文も sql って変数に入ってるものとします。

サーバーのネットワーク設定ですが、踏み台はSSHのポート(通常は22番)、RDSはDBの接続ポート(通常は3306番)を開けておいてください。以降のコードで出てくる9999番ポートは、ローカル端末のポートなので踏み台やDBのサーバーでは開けておかなくて良いです。

# DBの接続情報 (RDSを想定)
db_host = "{DBのエンドポイント}"  # xxxx.rds.amazon.com みたいなの
db_port = 3306  # DBのポート(デフォルトから変更している場合は要修正)
db_name = "{データベース名}"
db_user = "{DBに接続するユーザー名}"
db_pass = "{DBに接続するユーザーのパスワード}"

# 踏み台サーバーの接続情報 (EC2を想定)
ssh_ip = "{サーバーのIPアドレス}"
ssh_user = "{SSH接続するユーザー名}"  # EC2のデフォルトであれば ec2-user
ssh_port = 22  # SS接続するポート(デオフォルトから変更している場合は要修正)
ssh_pkey = "{秘密鍵ファイルの配置パス}"  # .pem ファイルのパス

sql = "{実行したいSQL文}"

さて、さっそく行ってみましょう。単発で1個だけSQLを打って結果を取得したい、という場合、以下のコードで実行できます。
ローカル(手元のPCやMac)の 9999 番ポート (これは他で使ってなければ何番でもいい)への通信が、踏み台サーバーを経由してRDSに届くようになります。

from sshtunnel import SSHTunnelForwarder
from pymysql import cursors
from pymysql import connect


with SSHTunnelForwarder(
    ssh_address_or_host=(ssh_ip, ssh_port),  # 踏み台にするサーバーのIP/SSHポート
    ssh_username=ssh_user,  # SSHでログインするユーザー
    ssh_pkey=ssh_pkey,  # SSHの認証に使う秘密鍵
    remote_bind_address=(db_host, db_port),  # 踏み台を経由して接続したいDBのホスト名とポート
    local_bind_address=("localhost", 9999),  # バインドするローカル端末のホスト名とポート
) as tunnel:
    with connect(
            host="localhost",  # DBのエンドポイントではなく、ローカルの端末を指定する
            port=9999,  # これもDBのポートでは無く、バインドしたポート番号を指定する
            user=db_user,  # これ以下は普通にDB接続する場合と同じ引数
            password=db_pass,
            database=db_name,
            charset="utf8mb4",
            autocommit=True,
            cursorclass=cursors.DictCursor,
    ).cursor() as cursor:
        cursor.execute(sql)  # これでSQL実行
        rows = cursor.fetchall()  # 結果の取り出し

これで、通常はローカルからはアクセスできないDBへSQLを発行し、結果を変数rowsに取得することができました。SELECT文を打ったのであればpandasのDataFrame等に変換して使いましょう。

with文で変数をたくさん呼び出すインスタンスを使うのはコードの見栄えが非常に悪くなりますが、以下のように変数を事前に辞書にまとめておくと少しマシになります。

ssh_args = {
    "ssh_address_or_host": (ssh_ip, ssh_port),
    "ssh_username": ssh_user,
    "ssh_pkey": ssh_pkey,
    "remote_bind_address": (db_host, db_port),
    "local_bind_address": ("localhost", 9999),
}

db_args = {
    "host": "localhost",
    "port":  9999,
    "user":  db_user,
    "password":  db_pass,
    "database":  db_name,
    "charset":  "utf8mb4",
    "autocommit":  True,
    "cursorclass":  cursors.DictCursor,
}

with SSHTunnelForwarder(**ssh_args) as tunnel:
    with connect(**db_args).cursor() as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()

以上で、一応やりたいことはできると思いますが、発行したいSQLが複数ある場合かつ途中に別の処理も含むような場合一回ごとにポートフォワードとDBの接続をやり直していたらリソースの無駄です。(といっても、最近のコンピューター環境ならこれがストレスになる程時間かかるってことはないと思いますが。)

DBヘ接続しっぱなしにしておく場合は、withを使わずに次のように書きます。引数は上のコード例で作った、ssh_args, db_args をそのまま使います。

server = SSHTunnelForwarder(**ssh_args)
server.start()  # ポートフォワード開始

connection = connect(**db_args)  # DB接続
# 以上の3行で DBに接続した状態になる。

# 以下のようにして接続を使ってSQLを実行する。
with connection.cursor() as cursor:
    cursor.execute(sql)
    rows = cursor.fetchall()

# サンプルコードなのでSQLを1回しかやってないけど、続けて複数実行できる。

# 終わったらDB接続とポートフォワードをそれぞれ閉じる
connection.close()
server.stop()

これで、一つの接続を使い回すこともできるようになりました。

ちなみにですが、このsshtunnelで作ったポートフォワードの設定は端末単位で有効です。どういうことかというと、複数のPythonプロセス(例えば別々のJupyter notebook)間で、共有することができます。というより、Pythonに限らず他のプログラムからも使えます。
コンソールで、以下のコマンド使ってポートの動きを見ながら試すとよくわかります。

# 9999番ポートの利用状況を確認する
$ sudo lsof -i:9999

普段は何も結果が返ってこないか、ここまでのプログラムを実行してたらいろんな情報と共に(CLOSED)が返ってくると思いますが、ポートフォワードしている最中はESTABLISHEDになっていて、pythonが使っていることが確認できます。

特にPythonでDB操作したいという場合に限って言えば、別々のnotebookで操作するメリットなんて無いのですが、全く別の用途でポートフォワードだけPythonでやっておきたい、ということはあるかもしれないので、覚えておくと使う機会があるかもしれません。

コマンドやライブラリでHTTPアクセスした場合のUser Agentの初期値を調べた

curl コマンドでも Pythonの requests でもそうなのですが、使用するときにUser Agentを指定することができます。

では、もし何も指定しなかったらどんな値が設定されているんだろうかと気になったのですが、意外に情報が出てこなかったものが多いのであれこれ調べてみました。ついでに他の手段についても調べています。

調査方法について

やり方としては、AWS Lambda の関数URLで、User Agentを表示するHTMLを作ってそこにアクセスします。
参考: Lambda の関数URLで送信されたデータを扱う

Pythonの場合、リクエスト元のUser Agentは、以下のコードで取れます。

event["requestContext"]["http"]["userAgent"]

Pandasのread_htmlも試したかったので、こんな感じの Lambda 関数を作ってTableで返すようにしてみました。

def lambda_handler(event, context):
    html = """<!DOCTYPE html>
<table>
    <tr><th>User Agent
    <tr><td>{user_agent}
</table>""".format(user_agent=event["requestContext"]["http"]["userAgent"])
    
    return {
        "headers": {
            "Content-Type": "text/html;charset=utf-8",
        },
        'statusCode': 200,
        "body": html,
    }

ブラウザでアクセスすると、以下の結果が得られます。
まぁ、普通にMac Book Proの Chrome ブラウザですね。なぜか Safari の文字列も入っています。

<!DOCTYPE html>
<table>
    <tr><th>User Agent
    <tr><td>Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36
</table>

ここから、色々試していきます。物によってはHTMLタグ部分省略してるのもありますが、LambdaはHTMLで結果返してきてます。

Curlコマンド編

まず、Curlコマンドです。 普通にさっきの lambda の 関数URLを叩きます。

$ curl https://{url-id}.lambda-url.{region}.on.aws/
<!DOCTYPE html>
<table>
    <tr><th>User Agent
    <tr><td>curl/7.79.1
</table>

# 参考 curl のバージョン情報
$ curl --version
curl 7.79.1 (x86_64-apple-darwin21.0) libcurl/7.79.1 (SecureTransport) LibreSSL/3.3.6 zlib/1.2.11 nghttp2/1.45.1
Release-Date: 2021-09-22
Protocols: dict file ftp ftps gopher gophers http https imap imaps ldap ldaps mqtt pop3 pop3s rtsp smb smbs smtp smtps telnet tftp
Features: alt-svc AsynchDNS GSS-API HSTS HTTP2 HTTPS-proxy IPv6 Kerberos Largefile libz MultiSSL NTLM NTLM_WB SPNEGO SSL UnixSockets

「curl/7.79.1」だけでしたね。超シンプルです。OSの情報とかゴテゴテついてくるかと思ってました。

この記事の本題からは外れますが、Curlは-A か -H オプションでUser Agentを指定できます。書き方が、違うので注意してください。-Aの方が簡単ですが、-Hの方が User Agent以外の設定とも統一的な書き方ができます。

$ curl https://{url-id}.lambda-url.{region}.on.aws/ -A "Chrome"
<!DOCTYPE html>
<table>
    <tr><th>User Agent
    <tr><td>Chrome
</table>

$ curl ≈ -H "User-Agent: Chrome"
<!DOCTYPE html>
<table>
    <tr><th>User Agent
    <tr><td>Chrome
</table>

設定できましたね。

ここからPythonのライブラリを見ていきます。

requests 編

ここからずっと同じURLを使うので、url って変数に入れておきます。

url = "https://{url-id}.lambda-url.{region}.on.aws/"

では、requestsでgetしてみましょう。

import requests


r = requests.get(url)
print(r.text)
"""
<!DOCTYPE html>
<table>
    <tr><th>User Agent
    <tr><td>python-requests/2.27.1
</table>
"""

これもまた随分シンプルなUser Agentでした。python-ってつくんですね。
2.27.1 は ライブラリのバージョンです。

$ pip freeze | grep requests
requests==2.27.1

別の値を設定したい場合は、headers 引数に curl の -H オプションのイメージで設定します。

headers = {'User-Agent': "Chrome"}
r = requests.get(url, headers=headers)
print(r.text)
"""
    # 必要な行だけ抜粋
    <tr><td>Chrome
"""

urllib 編

次はPython標準ライブラリのurllibです。

正直、requests に比べてはるかに使用頻度低いので軽く試すだけにします。

import urllib.request
 
urllib_response = urllib.request.urlopen(url)
urllib_html = urllib_response.read().decode("utf-8")  # バイナリなのでデコードが必要
print(urllib_html)
"""
    # 必要な行だけ抜粋
    <tr><td>Python-urllib/3.9
"""

Python-urllib/3.9 と、これもまたかなりシンプルでした。特徴的なのは、 3.9 とPythonのバージョン番号がついてきましたね。さすが標準ライブラリです。

Pandasのread_html 編

最後に、PandasでHTMLからテーブルを抽出できるread_htmlメソッドについて実験します。これはかなり便利なメソッドなのでいずれ専用記事で紹介したいですね。HTMLのテキストだけでなくURLを渡すこともでき、そのURLにアクセスしてHTML中のテーブルをデータフレームの配列で返してくれます。データフレームの配列で返してくるので、以下のコードでは[0]で最初の要素を取り出しています。

import pandas as pd


df = pd.read_html(url)[0]
print(df)
"""
          User Agent
0  Python-urllib/3.9
"""

結果はご覧の通り、Python-urllib/3.9 でした。先ほどの標準ライブラリの urllibが内部で動いているようです。Pandasっぽい値が設定されることなく、そのままurllibの情報が出てきました。

Pandasのドキュメントを見る限りでは、read_htmlで情報を取るときにUser Agentを指定する方法は用意されてないようです。

以上で、 curl / requests / urllib / pandas.read_html のデフォルトの User Agent がわかりました。分かったからといって特に使い道が思いついているわけではないですが、気になったことが調べられてよかったです。

Pythonのrequestsでログインが必要なページにアクセスする

タイトルにはログインと書きましたが要するにPythonのrequestsライブラリでセッションを扱う方法を書きます。

PythonでWebアクセスする場合、普通にURLにGETでアクセスして必要な情報が取れる場合だけでなく、事前にログインが必要なページの情報を取りたい場合があります。
requestsライブラリを使えばそれも比較的容易に実現できました。

基本的に、どのサイトでも同じような方法で対応可能ですが、例として今回はSBI証券のバックアップサイトで保有証券の一覧あたりを取って見ましょう。
バックアップサイトを使うのはHTMLがシンプルで見やすいからです。
ログイン画面のURLは https://k.sbisec.co.jp/bsite/visitor/top.do で、
ログイン後に取得したい保有証券一覧ページは、https://k.sbisec.co.jp/bsite/member/acc/holdStockList.do です。

通常、ログイン等のセッションはCookieを使って維持されます。
そのため、素直に考えれば以下の手順でログイン済みのページにアクセスすることができます。

– ログインフォームのPOST先URL(htmlのformタグでaction属性で指定されているURL)に、フォームの入力内容のデータ(ユーザーIDやパスワード)をPOSTする。
– レスポンスのCookieを取得する。
– そのCookie情報をヘッダーに付与した状態でアクセスしたかったページにアクセスする。

あとは、アクセスするたびに、Cookieを付与してGETなりPOSTなりのリクエストを送れば、ブラウザで操作するのと同じように、ログイン後の画面にアクセスできます。

ただ、非常にありがたいことに、requestsライブラリには、Sessionっていう専用のクラスがあって、これを使うと逐一レスポンスからCookieを取り出したり、ヘッダーに付与したりするすることを意識せずにセッションを維持できます。

参考: Advanced Usage – Session Objects

使ってみる前に、POST先の情報や、フォームの各要素のnameを把握しておく必要があるので、まずはログインしたいページのフォームのHTMLを見ておきます。

import requests
from bs4 import BeautifulSoup


response_0 = requests.get("https://k.sbisec.co.jp/bsite/visitor/top.do")
soup_login = BeautifulSoup(response_0.text)
print(soup_login.find("form"))
"""
<form action="https://k.sbisec.co.jp/bsite/visitor/loginUserCheck.do" method="POST" name="form1">
<b>ユーザネーム</b>:
                <div style="margin-top:5px;">
<input istyle="3" maxlength="32" name="username" size="15" style="width:115px" type="text" value=""/>
</div>
<br/>
<b>パスワード</b>:
                <div style="margin-top:5px;">
<input class="bsite_pass" istyle="3" maxlength="30" name="password" size="15" type="password" value=""/>
</div>
<br/>
<div style="margin-top:5px;">
<a href="/bsite/info/policyList.do?list=attention">お取引注意事項</a>に同意の上、ログインして下さい
                </div>
<br/>
<div align="center">
<input name="login" type="submit" value=" ログイン "/>   
                <input name="cancel" onclick="dataclear()" type="button" value="キャンセル"/>
</div>
<br/>
</form>
"""

input タグのnameを見ると、username と password で良いようですね。

Sessionクラスを使う方法

さて、POST先ですが、actionに https://k.sbisec.co.jp/bsite/visitor/loginUserCheck.do と絶対URLで入ってるので、これをそのまま使えば良いでしょう。サイトやフォームによっては相対URLなので注意してください。
ではいよいよやってみましょう。

username と passwordは自分のを使います。

username = "{ユーザーネーム}"
password = "{パスワード}"
login_url = "https://k.sbisec.co.jp/bsite/visitor/loginUserCheck.do"

# セッションのインスタンスを作成する。
session = requests.Session()
response_1 = session.post(
        url=login_url,
        data={
            "username": username,
            "password": password,
        }
    )

print(response_1.text)
# 長いので出力略
# ログイン後に表示される機能メニューのHTMLが得られている。

はい、この段階でログインができました。そして、このsessionオブジェクトを使うと、目当ての保有証券一覧ページにアクセスできます。

response_2 = session.get(
        url="https://k.sbisec.co.jp/bsite/member/acc/holdStockList.do"
    )
print(response_2.text)
# 長いのでこれも出力略

自分が持ってる株を公開するつもりもないので、出力完全に省略させていただきましたが、このページのHTMLをパースすると保有中の証券の一覧が得られます。

やっていることといえば、requests.get や requests.post の代わりに、最初にSessionオブジェクトを作って、それのgetやpostメソッドを使うというだけです。非常に手軽ですね。

おまけ: Sessionクラスを使わない場合どうするか

冒頭でCookieを付与して云々という手順を書いていますが、もちろんそのやり方でログインすることもできます。Sessionクラスを使うデメリットとか得なくて、自分でCookieの操作を実装する必要性無いと思うのですが、実は実験したのでそれ書いておきます。
次のようにすると、Session使わずにログイン後のページにアクセスできます。

# ログインフォームにデータをポストしログインする。
response_3 = requests.post(
        url=login_url,
        data={
            "username": username,
            "password": password,
        }
    )
# cookieを取り出す。
cookies = response_3.cookies

# cookieをヘッダーに付与してgetする。
response_4 = requests.get(
        url="https://k.sbisec.co.jp/bsite/member/acc/holdStockList.do",
        cookies=cookies,
    )

# 保有証券一覧ページのHTMLが得られる
print(response_4.text)

うまくいかなかった場合に検証すること

サイトによっては、ここまでの方法をそのまま適用してもうまくいかないことがあります。
だいたいその原因は何かしらのセキュリティ対策がなされているためです。
ユーザーエージェントの問題の場合もあると思いますし、サイトによっては、ユーザーIDやパスワードを入力する画面の表示時点でCookieを発行していて、それと整合性が取れない場合、不正な画面遷移としてログインさせてくれないこともあります。その場合は、Sessionのインスタンスを作って、そのインスタンスで一度ログイン画面をgetしてCookieを得ておく必要があるでしょう。

また、ログインフォームにhidden属性で非表示のinputタグを作っておき、そこにトークンのようなものを埋めておいてそれが無いとログインできないというケースもあります。
その場合は、HTMLから情報取り出して、それらのトークン情報をデータに加えてPOSTするなどの対応が必要になります。

ハードリンクとシンボリックリンク

新卒で就職したばかりの頃、UNIX/Linuxの研修で習ったような気がするけど忘れてしまっていた話を改めて調べたのでまとめておきます。
ちなみに、この記事はMacで検証しています。

概要説明

Windowsでは特定のファイルやフォルダーを別のフォルダーから開けるようにするショートカットという仕組みがあります。Macを含むUNIX系OSにも同様の仕組みがあり、それがリンクです。(正確には、ディスク上のファイルの実体と、ファイルパスを繋げる仕組みをリンクと言います。)

ここで、Windowsと違うのは、UNIX系のリンクにはハードリンクとシンボリックリンクの2種類がある点です。(実はWindowsでもハードリンクができるという話も聞きますが、あまり一般的ではないと思います。)

この後、実際にファイルを操作しながら具体的な挙動の違いを見ていきますが、ざくっと二つのリンクの違いを説明すると次のようになります。

ハードリンクは、一つの実体を持つファイルを表現するパスを複数作る方法です。
マニュアルにも、ハードリンクでは、リンクと元のファイルの区別はつかないと書かれています。
man ln から抜粋。
> A hard link to a file is indistinguishable from the original directory entry

それに対してシンボリックリンクは、リンク先のファイルへの参照を含みます。Windowsのショートカットに近いのはこちらです。

わかりにくいですね。

例えば、 file01.txt というファイルがあったとします。これは当然、ディスク上にあるデータが存在し、それに対して、file01.txt という名前でアクセスできることを意味します。

これに対して、file01.txt を file02.txtに「ハードリンク」したとします。すると、そのディスク上の同じデータが file02.txt というファイル名でもアクセスできるようになります。

一方で、file01.txt を file03.txt に「シンボリックリンク」したとします。すると、file03.txt は、file01.txt というパスへのリンクになります。その結果、file03.txtにアクセスしようとすると、file01.txtにアクセスすることになり、結局同じ実体のファイルにアクセスできることになります。

2種類のリンクの作成

まだわかりにくいのでやってみましょう。実際にファイルとリンクを作ってみます。
まず、サンプルのデータから。

# 検証用ディレクトリを作って移動
$ mkdir linktest
$ cd linktest
# サンプルデータ作成
$ echo "サンプルファイル1行目" > file01.txt
# 作成したファイルの情報を確認
$ ls -li
total 8
8795955 -rw-r--r--  1 {owner} {group}  34  6 12 17:51 file01.txt

ls コマンドで作ったデータを見るときに、-iオプションをつけてiノード番号も表示させました(先頭の8795955がそれ)。iノードというのは、ファイルの属性が記録されているデータのことで、iノード番号というのはそのデータについている番号のことです。このデータは実体のファイルと対応して存在しているので、実質的にディスク上のファイルの実体と対応している番号だと考えて大丈夫です。iノード番号が同じなら同じファイルを表しています。

さて、ここからリンクを作っていきましょう。リンクは ln コマンドで作ります。文法は次のとおりです。

# ハードリンクを作成する
$ ln リンク先 リンク名
# シンボリックリンクを作成する
$ ln -s リンク先 リンク名

やってみます。

# ハード/シンボリックでそれぞれリンクを作成する
$ ln file01.txt file02_hard.txt
$ ln -s file01.txt file03_symbolic.txt
# 確認
$ ls -li
total 16
8795955 -rw-r--r--  2 {owner} {group}  34  6 12 17:51 file01.txt
8795955 -rw-r--r--  2 {owner} {group}  34  6 12 17:51 file02_hard.txt
8796478 lrwxr-xr-x  1 {owner} {group}  10  6 12 18:03 file03_symbolic.txt -> file01.txt

はい、できました。

ls の結果を見ていきましょう。まずハードリンクの方(file02_hard.txt)です。
着目するべきは、最初のiノード番号で、元のファイルと全く同じになっています。これはfile02_hard.txtがfile01.txtと全く同じ実体ファイルにリンクしていること意味しており、同じデータに対して2個ファイル名があるような状態にになっています。ファイルサイズも同じ34バイトですね。
権限(-rw-r–r–) の後ろに 2 という数字がありますが、実はこれ、そのファイルへのリンク数です。元々1だったのがハードリンクを作成したことで2になっています。そして、ここからわかるのですが、シンボリックリンクの方はノーカウントです。(カウントされるなら3になるはず。) ちなみに、この数字がリンク数だっていう情報は、lsのマニュアル(man ls)のThe Long Format ってセクションに書かれています。

次は、シンボリックリンクの方(file03_symbolic.txt)を見ていきましょう。 -> で、元ファイルへのパスが書かれており、いかにもリンクって感じの表示になっていますね。
権限の最初にlがついて、lrwxr-xr-x となっていますが、このlはシンボリックリンクであることを示しています。そして、iノード番号は元のファイルと違うものが振られています。また、ファイルサイズも異なっていますね。file01.txtへの参照だけなのでサイズが小さいです。

当然ですが、全部同じデータを見てるので、中身を表示したら一致します。

$ cat file01.txt
サンプルファイル1行目
$ cat file02_hard.txt
サンプルファイル1行目
$ cat file03_symbolic.txt
サンプルファイル1行目

リンク元ファイルに変更が発生した場合の挙動

ここから、元ファイルに変更が発生した時のハードリンクとシンボリックリンクの挙動の違いを見ていきます。

まず、元ファイルに修正が入った場合、これはどちらのリンク方法でもそれぞれ反映されます。同じファイル見てるだけだからですね。1行追記してみてみましょう。

$ echo "サンプルファイル2行目" >> file01.txt
$ ls -li
total 16
8795955 -rw-r--r--  2 {owner} {group}  64  6 12 18:23 file01.txt
8795955 -rw-r--r--  2 {owner} {group}  64  6 12 18:23 file02_hard.txt
8796478 lrwxr-xr-x  1 {owner} {group}  10  6 12 18:03 file03_symbolic.txt -> file01.txt

# 念のため中身も見る
$ cat file01.txt
サンプルファイル1行目
サンプルファイル2行目
$ cat file02_hard.txt
サンプルファイル1行目
サンプルファイル2行目
$ cat file03_symbolic.txt
サンプルファイル1行目
サンプルファイル2行目

はい、 echo で追記したのはfile01.txtだけですが、3ファイルとも2行目が増えましたね。ファイルサイズが増えたのはfile01.txt/ file02_hard.txt だけで、file03_symbolic.txtはそのままなので想定通りです。ここまでは、ハードリンク、シンボリックリンク共に違いはありません。

ここからが差が発生することです。もし、このfile01.txtがリネームされたり削除されたりして、そのパスに存在しなくなったらどうなるでしょうか。
試しに消してみます。

$ rm file01.txt
# 1ファイル消えて2ファイル残っている。
$ ls -li
total 8
8795955 -rw-r--r--  1 {owner} {group}  64  6 12 18:23 file02_hard.txt
8796478 lrwxr-xr-x  1 {owner} {group}  10  6 12 18:03 file03_symbolic.txt -> file01.txt
# ハードリンクの方は元ファイルがなくなっていても開ける
$ cat file02_hard.txt
サンプルファイル1行目
サンプルファイル2行目
# シンボリックリンクの方はリンク先ファイルがなくなると開けない
$ cat file03_symbolic.txt
cat: file03_symbolic.txt: No such file or directory

はい、ここで大きな差が生じました。元ファイルが消えてしまったのですが、ハードリンクのファイルは、元ファイルのパスと関係なくディスク上のファイルの実体にリンクされていたので元ファイルのパスが消えてしまっても問題なく開くことができます。(消したと思ったファイルが残ってる、というのがデメリットになるケースもありそうですが。)
ちなみに、ls -li の結果のリンク数は1個になっていますね。

一方で、シンボリックリンクの方は、元ファイルへの参照しか情報を持っていなかったので、元ファイルのパスがなくなってしまうとデータにアクセスができなくなってしまっています。

さて、次はその逆の操作です。元々存在してたファイルと同じパスで、再度ファイルが作成されたらどうなるでしょうか。この時の挙動もそれぞれ異なります。やってみましょう。

# あたらめてリンク先ファイル作成
$ echo "新規作成したファイル1行目" > file01.txt
$ ls -li
total 16
8798109 -rw-r--r--  1 {owner} {group}  38  6 12 18:37 file01.txt
8795955 -rw-r--r--  1 {owner} {group}  64  6 12 18:23 file02_hard.txt
8796478 lrwxr-xr-x  1 {owner} {group}  10  6 12 18:03 file03_symbolic.txt -> file01.txt

新しい file01.txt は 元のと異なるiノード番号で作成されましたね。これは要するに、ファイル名は同じだけど実データとしては元々存在してたfile01.txtとは異なるファイルであることを意味します。そいて、よく見ていただきたいのは、file01.txt と file02_hard.txt のファイルサイズが違うことです。もはや同じファイルは指し示しておらず、file02_hard.txtは元々のファイルにリンクされていますね。それぞれ開くと明らかです。

$ cat file01.txt
新規作成したファイル1行目
$ cat file02_hard.txt
サンプルファイル1行目
サンプルファイル2行目
$ cat file03_symbolic.txt
新規作成したファイル1行目

はい、ハードリンクしていたfile02_hard.txtはもう完全にfile01.txtとは別ファイルになってしまいました。
一方で、リンク先がなくなって開けなくなっていたシンボリックリンクの方(file03_symbolic.txt)は、同じパスのファイルができたら自動的にそこにリンクされてfile01.txtと同じデータが参照できるようになりました。

ハードリンク、シンボリックリンクのどちらを選ぶかは、これらの挙動を踏まえて決めるのが良いと思います。

その他の違い

以上で元ファイルの編集に関するリンクごとの挙動の違いを書いて来ましたが、他にも少し違いがありますのでまとめておきます。

まず、ファイルではなくディレクトリに対しては、ハードリンクは作成できず、シンボリックリンクのみ作成できます。

# 実験用ディレクトリ作成
$ mkdir subfolder01

# ハードリンクは作れない
$ ln subfolder01 subfolder02
ln: subfolder01: Is a directory

# シンボリックリンクは作れる
$ ln -s subfolder01 subfolder02
$ ls -li
8799037 drwxr-xr-x  2 {owner} {group}  64  6 12 18:46 subfolder01
8799038 lrwxr-xr-x  1 {owner} {group}  11  6 12 18:46 subfolder02 -> subfolder01

また、別の違いとして、ハードリンクはパーティションなどファイルシステムを跨いで作ることはできないというものもあります。ハードリンクを作れるのは同じパーティション内のみだけです。理由はiノード番号の管理が違うからだそうです。
一方でシンボリックリンクはどこでも作れます。

あとは、ハードリンク(元のファイルパスも含む)を全部消すと、ファイル自体が削除されてしまいますが、シンボリックリンクは消しても元のファイルに影響がないとか細々した違いがあります。

ハードリンクしているファイルを探す方法

ls コマンドでそのファイルへのリンク数が表示されますが、 どこからリンクされているか探したくなることはあると思います。そのファイルを確実に消したい時などは一通り洗い出す必要ありますし。

良い探し方を調べていたのですが、今のところ、find コマンドで iノード番号を調べて検索する以外になさそうです。-inum 引数で指定できます。

今やっているサンプルは同じディレクトリ配下なので楽勝ですが、遠いパスに作っていたらかなり広範囲をfindで探さないといけないですね。

# 実験のためハードリンクを作成する 
$ ln file01.txt file04.txt
# iノード番号を調べる
$ ls -i file01.txt
8798109 file01.txt
# find コマンドで該当のiノード番号を持つファイルを探す
$ find . -inum 8798109
./file04.txt
./file01.txt

余談: 何が発端でこれを調べていたのか

なんで今になってこんなのを調べているのかというと、実は個人的に開発してるプロジェクトがあって、そのコード管理に使いたかったからです。

ほとんどのソースコードはプロジェクトのディレクトリ配下に格納されていてそこでgit管理されているのですが、ごく一部/etc/の配下とか、ホームディレクトリのドット付き隠しフォルダの下とかで作成する必要上がります。

これらをどうやって管理しようかなと思ったときに、プロジェクトのディレクトリ内に実体ファイルを作って、それらの本来の配置場所にリンクを貼れば単一のリポジトリで管理できるじゃないかと思いつきました。で、その時のリンク方法が2種類あったのでどっちがいいのかというのが発端になります。

この用途だと、git管理してるファイルと、稼働してるファイルを確実に同期させたいのでシンボリックリンクの方が良さそうですね。

まぁ、その他、brewで入れたソフトウェアとかMeCabの辞書のファイルたちとか自分の環境内でシンボリックリンクで稼働しているファイルはいろいろ存在し、これらについても理解を深められたのは良かったです。

Lambda の関数URLで送信されたデータを扱う

前回に引き続き、Lambdaの関数URLの話です。URLでLambdaを起動できるのは便利ですが、せっかくならアクセスするときに何かしらの情報を渡して処理を変えたいということは多くあると思います。その場合に、あり得るパターンの数だけLambda関数と個別のURLを用意しておくというのは現実的ではありません。
普通は、世の中のたいていのAPIがやっているように、クエリパラメーターや、POSTされたデータに応じて挙動を変える作りにします。

受け取った値に応じて挙動を変えるっていうのは、ただのPythonのコーディングの話なので、今回の記事では、どうやってクエリパラメーターとかPOSTされたデータを受け取るかって部分を扱います。これまでAPI GatewayでAPI作ってたような人にとっては常識的なことしか書いてないと思いますがご了承ください。(僕はWebエンジニアではなく、これまでLambdaは手動かスケジュール実行で使って来たのでこの辺の挙動に詳しくないのです。)

さて、前置きが長くなって来ましたが、実はタネは非常に簡単で、Lamda関数の定義にデフォルトで入っている引数「event」、これが今回の主役です。

def lambda_handler(event, context):  # この第一引数eventに欲しいデータが渡される
    # 処理の中身

関数URLからLambdaが呼び出されたとき、lambda_handlerの第一引数には、以下のドキュメントのリクエストペイロードと呼ばれているデータが渡され、関数中ではeventという変数名で扱えます。
参考: Lambda 関数 URL の呼び出し – AWS Lambda の リクエストペイロードの形式 の部分

以下のような関数を作って実際に動かしてみるのが一番わかりやすいと思います。
テキストエリア2個と送信ボタンを持つフォームをGET/POSTでそれぞれ作成し、event変数の中身を表示します。

import pprint


def lambda_handler(event, context):
    html = """
<!DOCTYPE html>
<h2>GETのフォーム</h2>
<form action="/" method="get">
    <input type="text" name="text1"><br>
    <input type="text" name="text2"><br>
    <input type="submit">
</form>
<h2>POSTのフォーム</h2>
<form action="/" method="post">
    <input type="text" name="text3"><br>
    <input type="text" name="text4"><br>
    <input type="submit">
</form>
    """

    html +="<h2>eventの内容</h2>\n"
    html += "<pre>" + pprint.pformat(event, compact=True) + "</pre>"

    return {
        "headers": {
            "Content-Type": "text/html;charset=utf-8",
        },
        'statusCode': 200,
        "body": html,
    }

上記のコードで関数を作成し、関数URLからアクセスすると、フォーム二つとpprintで成形されたeventの辞書データが表示されます。また、それぞのフォームに値を入れて送信すると、eventのどこに結果が入るかが確認できます。
(キャプチャとか貼るとわかりやすいと思うのですが、idっぽいものが大量にあるので省略させていただきます。上のコードを試してただくのが一番良いです。)

まず、メソッド(GET/POSTなど)の区別ですが、次の部分にあります。
Pythonで言えば、 event[“requestContext”][“http”][“method”] で取得できますね。

{
    # 略
    'requestContext': {
        # 略
        'http': {
            # 略
            'method': 'GET',
        }
    }
}

それでは、 GET/ POST 順番にフォームを実行して、どのように値が取れるか見て行きましょう。
フォームには以下の値を入れて送信します。
1つ目のテキスト: あ&い=う%え,お
2つ目のテキスト: abc?123

フォームからGETで送信する場合、URLの末尾に入力内容が添付され、次のURLにアクセスされます。特殊文字はパーセントエンコーディングされていますね。

https://{url-id}.lambda-url.{region}.on.aws/?text1=あ%26い%3Dう%25え%2Cお&text2=abc%3F123

eventの中ではevent[“rawQueryString”] と event[“queryStringParameters”]の部分に現れます。
クエリ文字列がないときは、rawQueryStringは空文字””ですが、queryStringParametersの方はキー自体が存在しないので、コードで使うときは注意してください。

{
    # 略
    'queryStringParameters': {'text1': 'あ&い=う%え,お', 'text2': 'abc?123'},
    # 略
    'rawQueryString': 'text1=%E3%81%82%26%E3%81%84%3D%E3%81%86%25%E3%81%88%2C%E3%81%8A&text2=abc%3F123',
    # 略
}

上記の内容で分かる通り、rawQueryStringの方はURIエンコーディングされていますが、queryStringParametersの方は使いやすいように辞書型にパースしてURIエンコードも元に戻してくれています。こちらを使って行きましょう。
フォームに入れた値がURLに載ってしまうというのは大きなデメリットですが、ぶっちゃけると単純にテキスト等を送るフォームならPOSTよりGETの方が渡した値使いやすいな、と感じています。(Web系開発素人の発想ですが。)

続いて、POSTの場合です。個人的にはこの種のフォームは普通はPOSTで使うものだと思っています。
POSTの場合は、GETと違って少しわかりにくく、eventを表示しても送ったデータがそのままでは見つかりません。ではどこにあるのかというと、実は event[“body”]に、base64エンコードされて入ってます。これはフォームからPOSTした場合の挙動です。curl等でテキストのままポストしてあげればそのまま表示されます。
base64の判定は、event[“isBase64Encoded”] で行います。

{
    # 略
    'body': 'dGV4dDM9JUUzJTgxJTgyJTI2JUUzJTgxJTg0JTNEJUUzJTgxJTg2JTI1JUUzJTgxJTg4JTJDJUUzJTgxJThBJnRleHQ0PWFiYyUzRjEyMw==',
    # 略
    'isBase64Encoded': True,
    # 略
}

base64なので自分でこれをデコードする必要があります。
参考: PythonでBase64エンコードとデコード

該当部分のコードだけ書くとこんな感じです。

import base64

base64.b64decode(event["body"]).decode()
# text3=%E3%81%82%26%E3%81%84%3D%E3%81%86%25%E3%81%88%2C%E3%81%8A&text4=abc%3F123

パーセントエンコードされていますね。これを以下の手順で処理する必要があります。
&で区切って、フォームの各要素ごとの値に分割する。
=で区切ってキーと値のペアに変える。
パーセントエンコードをデコードする。

自分でやるのは面倒なので、ライブラリ使いましょう。
参考: PythonでURL文字列を要素に分解する

次のようになります。

import base64
from urllib.parse import parse_qs


parse_qs(base64.b64decode(event["body"]).decode())
# {'text3': ['あ&い=う%え,お'], 'text4': ['abc?123']}

キー対値 の辞書ではなく、 キー対値の配列 の辞書が結果として得られるので注意が必要です。特殊文字たちも元の形に戻っていますね。

さて、以上で関数URLにクエリストリングに付加されたデータや、POSTされて来たデータをLambdaの関数で取り出せるようになりました。

あとはこれを受け取ってそれに応じた処理をする関数を作るだけで、柔軟な処理を行えるようになると思います。