Pythonの標準ライブラリで手軽にWebサーバーを立てる方法

掲題の通りの記事です。Macには標準でhttpdが入ってるのでどこまでニーズがあるのか不明ですが、Python環境があると標準ライブラリで簡易的なWebサーバーを立てることができます。

手元のメモ等をテキストファイルで保存している場合、そのディレクトリをドキュメントルートとしてサーバーを起動すると、ブラウザから参照できるようになるので便利なことがあります。

もちろん、Web開発をやる人であればローカルでWebサービスのテストを行うと言った普通のWebサービスとしての活用もできますね。

利用するのは http.server モジュールです。
参考: http.server — HTTP servers — Python 3.12.3 ドキュメント

ドキュメントを開くといきなり、
警告 http.server is not recommended for production. It only implements basic security checks.
と出てくる通り、本番環境での活用は推奨されていない、あくまでも簡易的なものです。

本当に一番シンプルな使い方は、 `python -m http.server` というコマンドを打つだけです。

% python -m http.server
Serving HTTP on :: port 8000 (http://[::]:8000/) ...

上記の通り、デフォルトでは8000番ポートを使ってWebサーバーが起動します。
ドキュメントルートはコマンドを実行したディレクトリなので、 index.html ファイルを置いておくと、ブラウザで、 http://localhost:8000/ にアクセスするとそのindex.htmlファイルの中身が表示されます。

index.html ファイルがないと、 そのディレクトリに置かれているファイルリストが、
Directory listing for /
として表示されます。

これは通常の世界に公開するWebサーバーではディレクトリ構造が明らかになってしまうセキュリティ上欠陥のある仕様ですが、手元のメモへのアクセスのために使っている僕にとっては大変便利な仕様です。

下記のように、数値を渡すことでポート番号を変更することもできます。

% python -m http.server 9000

また、ドキュメントルートをカレントディレクトリではなく指定した場所にしたい場合は、-d か –directory 引数を使って次のように書きます。相対パスと絶対パスのどちらもサポートされています。

% python -m http.server --directory /tmp/

(自分の慣れだけの問題なのですが)個人的にはhttpdのサービスの起動をするよりも手軽だと感じているので、ちょいとWebサーバー建てたいな、という場面があればぜひ試してみてください。

最高密度区間 HDI (Highest Density Interval) について

これはpyMCに限った話ではないのですが、pyMC + ArviZ を使っていると頻繁に目にする指標なのでほぼpyMC関連の記事6記事目と見ていただいて良いと思います。

pyMCの結果を見ているとhdiという指標を頻繁に目にします。僕はこれについてあまり馴染みがなかったので説明を残しておこうと思います。

各記事のpyMCのサンプリング結果のsummaryをDataFrameにしたときも登場していますし、可視化したらグラフ中にも登場しますね。

参考: ArviZを使ってpyMCの推論結果を可視化する

このHDIは、確率分布の区間を切り取ったものです。指定の確率、(一般的には95%だそうですが、) ArviZでは94%がデフォルトで指定されており、その区間に含まれる確率が94%となるような区間を示します。

似たような概念で信頼区間や予測区間というのもありますが、違いはその区間の切り取り方です。信頼区間や予測区間では大抵、区間を切り取るとき、確率分布の上側と下側から同じ確率を切り抜きます。94%区間を取りたいのであれば、下から3%と上から3%を外して残りを取り出します。

それに対して、最高密度区間(HDI)では、その名の通り、確率密度が高い部分から優先的に確保して、指定の確率、今の例であれば94%となるように区間を切り出します。

山が一つあるような形の確率分布$f(x)$であれば、次を満たす区間$[a, b]$だと言えるでしょうか。$f(x)$が連続とか滑らかとか適切な過程が他にも必要かもしれません。

$$\begin{align}
&\int_{a}^{b} f(x)dx = 0.94\\
&f(a) = f(b)\\
&a < c < b \Rightarrow f(a) = f(b) < f(c)
\end{align}$$

注意として、確率分布が複数の山を持つような形だった場合、HDIは単一の区間ではなく、ふくすの区間に分かれて定義されることがあります。pyMCやArviZがそのような例に対応しているかどうかはちょっと確認しきれていません。(まだそういう例に出くわしたことがないので)

HDIの良い性質として、その指定の確率を含む区間の切り取り方としては一番狭い幅の区間になるというものがあります。ベイズ推論に限った話ではなく、何か数値を予想するのであればできるだけ狭いレンジに絞り込んで推論したいのでこれはありがたいですね。

pyMCで変化点検出

pyMC5の記事の5記事目です。
今回はpyMCをつかって変更点の検出をやってみます。といっても、題材が変更点の検出というだけでメインで取り上げたいのはデータ列の途中で期待値が変わるモデルを扱いたいってのと、事後分布からのサンプリングをやりたいっていう点の2点です。

題材の紹介

題材としては、少し古い本なのですが「Pythonで体験するベイズ推論」という、キャメロン・デビッドソン=ピロンさんの本から持ってきます。これの、受信するメッセージ数の変化がテーマです。

なぜこんな古い本を題材にしたかというと、新しい本を題材にするとそのままコードを書き写す感じになるので勉強にならないからです。この本のコードはpyMC3なのでpyMC5に焼き直すとコードは大幅に変わります。

サンプルデータはこちらのリポジトリにあります。

ただ、ここからダウンロドしてくるのも手間ですし、ただの整数列なのでそのままこの記事にも書いておきます。ちなみに、全部で74日分です。

data = [13, 24, 8, 24, 7, 35, 14, 11, 15, 11, 22, 22, 11,
 57, 11, 19, 29, 6, 19, 12, 22, 12, 18, 72, 32, 9, 7, 13,
 19, 23, 27, 20, 6, 17, 13, 10, 14, 6, 16, 15, 7, 2, 15,
 15, 19, 70, 49, 7, 53, 22, 21, 31, 19, 11, 18, 20, 12, 35,
 17, 23, 17, 4, 2, 31, 30, 13, 27, 0, 39, 37, 5, 14, 13, 22
]

結果は最後に図示しますが、これ途中から件数が増えているように見えるのですよ。
それが、何日目から増えていて、その前後は期待値何件だったのか、というのをpyMCで推論していきます。

pyMCによるモデリング

ではやっていきましょう。

データが整数値なので、メッセージの件数はポアソン分布に従うとし、前半と後半で期待値$\lambda$が違うものとします。それぞれの$\lambda$はガンマ分布から持ってきましょう。
そして、変化する日はtauという変数名で離散一様分布からサンプリングします。

tauより前の日と以降の日で期待値が変わる点については、pm.math.switchを使って実装しました。この辺は本のコードと違います。

import matplotlib.pyplot as plt
import pymc as pm
import arviz as az


model = pm.Model()

with model:
    # 変化前の期待値
    lambda_1 = pm.Gamma('lambda_1', alpha=2, beta=1)
    # 変化後の期待値
    lambda_2 = pm.Gamma('lambda_2', alpha=2, beta=1)
    # 変化日
    tau = pm.DiscreteUniform('tau', lower=0, upper=len(data)-1)

    # idxは各データ点のインデックス
    idx = np.arange(len(data))
    # lambda_の値がtauの値によって決定される
    lambda_ = pm.math.switch(tau >= idx, lambda_1, lambda_2)
    
    # データの尤度
    obs = pm.Poisson('obs', mu=lambda_, observed=data)
    # サンプリング
    trace = pm.sample(draws=10000, tune=10000, chains=2)

サンプリングは少し多めに行っています。
結果を見ておきましょう。

print(az.summary(trace))
"""
	mean	sd	hdi_3%	hdi_97%	mcse_mean	mcse_sd	ess_bulk	ess_tail	r_hat
tau	43.221	0.875	42.000	44.000	0.016	0.011	3054.0	4512.0	1.0
lambda_1	17.406	0.617	16.275	18.576	0.004	0.003	18857.0	14591.0	1.0
lambda_2	22.035	0.859	20.354	23.597	0.007	0.005	15301.0	14084.0	1.0
"""

az.plot_trace(trace)
plt.tight_layout()

トレースを可視化したものがこちらです。

一瞬怪しいところがありますがtau = 44 あたりで変更しているのがわかりますね。

そこを閾にlambdaが増えていそうです。(17.4くらいから22くらいに。)

0日目から40日目くらいはでは、期待値17.4くらいで、45日目以降は22くらいと見て良さそうです。さて、その途中の41〜44日目はどう考えましょう?となった時に便利なのが事後分布からのサンプリングです。tauの分布を考慮していちいち計算しなくても、さっとサンプリングしてしまうことでそれぞれの日のメッセージ受信数の期待値の概算がわかります。

事後分布からのサンプリングに使うのが、 pm.sample_posterior_predictive です。これはモデルではなく、traceの方を渡してサンプリングを行います。サンプリングされるサイズが、最初に推論した時のステップ数に依存してしまうので、実はさっきサンプリングする時にdrawを大きめの値にしていたのです。draws=10000, chains=2 だったので、20000サンプルが得られます。

参考: pymc.sample_posterior_predictive — PyMC dev documentation

さて、やってみます。

# 既存のモデルとトレースを使用して、事後予測サンプルを生成
with model:
    # traceから事後予測サンプルを生成する際の修正
    posterior_predictive = pm.sample_posterior_predictive(trace, var_names=['obs'])

# 'obs'キーを使用して事後予測サンプルのデータを取得
posterior_obs = posterior_predictive.posterior_predictive['obs'].values

# 事後予測サンプルの期待値(平均)を計算
expected_values = np.mean(posterior_obs, axis=(0, 1))

axis=(0, 1) としているのは元の posterior_obs.shape が (2, 10000, 74) だからです。

これで、obsのサンプリング結果の期待値が得られました。元のデータと合わせて可視化してみましょう。

fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1)
ax.bar(range(len(data)),data)
ax.plot(expected_values, c="orange")
plt.show()

いい感じですね。

pyMCで線形回帰分析

pyMCの記事4記事目です。これまでの記事では観測値だけ与えてそれを生成する確率分布を考えてきましたが、今回は観測値だけでなく何か特徴量を持つデータを考えます。
その最も単純な例として1変数の線形回帰をやってみましょう。特徴量が増えて重回帰分析になってもほとんど同じように対応できるので汎用性は高いと思います。

データの準備

何のデータを使ってもいいのですが、今回はscikit-learnのiris使います。3種類のアヤメのうち、virginicaに絞って、petal length (cm)からsepal length (cm)を予測するモデルを考えてみましょう。(相関係数が0.86くらいあって予測が簡単なのです。)

次のようにしてデータを取得します。

%%pycodestyle
import pandas as pd
from sklearn.datasets import load_iris


# irisデータ取得
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df["label"] = iris.target
# virginica に絞る
df = df[df.label == 2].reset_index(drop=True)

# 回帰分析に使う特徴量xと目的変数yを取得
x = df["petal length (cm)"].values
y = df["sepal length (cm)"].values

モデルの実装

データが揃ったらpyMCでモデルを作っていきます。

回帰係数を a, 定数項をb、誤差をε とするとモデルの式はこのようになりますね。
$$y= ax + c + \epsilon$$

ベイズで行いますので、それぞれに事前分布が必要です。
a, c の事前分布は期待値が0、標準偏差が10の正規分布としましょう。
そして、誤差項εは、期待値が0で、標準偏差がσの正規分布に従うとし、このσは標準偏差が10の半正規分布に従うとします。

これを実装してきますが、今回新たに使うのは、特徴料等の定数を格納する pm.ConstantData と 数式を定義できるpm.Deterministic です。

参考:
pymc.ConstantData — PyMC dev documentation
pymc.Deterministic — PyMC v5.6.0 documentation

正確には、 ConstantData の方は使わなくてもいいのですが、明示的に書いておくとモデルを可視化した時に定数部分も表示されるので便利です。

実際にコードを見ていただくと使い方がわかると思うのでやっていきましょう。例によって、Graphvizで可視化してJpyterで表示しています。

import numpy as np
import matplotlib.pyplot as plt
import pymc as pm
import arviz as az


model = pm.Model()

with model:
    x_data = pm.ConstantData("x_data", x)
    y_data = pm.ConstantData("y_data", y)

    # 回帰係数と定数項
    a = pm.Normal("a", mu=0, sigma=10)
    c = pm.Normal("c", mu=0, sigma=10)

    # yの期待値
    mu = pm.Deterministic("mu", a*x_data + c)
    # 誤差
    sigma = pm.HalfNormal("sigma", sigma=10)

    # 観測値
    obs = pm.Normal("obs", mu=mu, sigma=sigma, observed=y_data)

# モデルの可視化
g = pm.model_to_graphviz(model)
display(g)

参考ですが、ConstantDataを使わない場合はこうなります。

model2 = pm.Model()

with model2:

    # 回帰係数と定数項
    a = pm.Normal("a", mu=0, sigma=10)
    c = pm.Normal("c", mu=0, sigma=10)

    # yの期待値
    mu = pm.Deterministic("mu", a*x + c)
    # 誤差
    sigma = pm.HalfNormal("sigma", sigma=10)

    # 観測値
    obs = pm.Normal("obs", mu=mu, sigma=sigma, observed=y)

g = pm.model_to_graphviz(model2)
display(g)

モデルができたのでサンプリングして結果を見ていきましょう。

with model:
    trace = pm.sample(random_seed=42, chains=2)


display(az.summary(trace, var_names=["a", "c", "sigma"]))
"""
	mean	sd	hdi_3%	hdi_97%	mcse_mean	mcse_sd	ess_bulk	ess_tail	r_hat
a	0.995	0.084	0.828	1.143	0.003	0.002	641.0	636.0	1.0
c	1.066	0.470	0.216	1.988	0.019	0.013	648.0	679.0	1.0
sigma	0.331	0.035	0.259	0.391	0.001	0.001	554.0	349.0	1.0
"""

az.plot_trace(trace, var_names=["a", "c", "sigma"], compact=False)
plt.tight_layout()

いい感じに推定できていますね。

回帰直線の可視化

せっかく単回帰したので、回帰直線を可視化してみたいと思います。上記のsummaryのa,cで可視化してもいいのですがせっかくなのでサンプリングの各ステップの値で可視化してみましょう。


# a, cの各ステップの値を取得
a_list = trace.posterior.a.values.ravel().reshape(-1, 1)
c_list = trace.posterior.a.values.ravel().reshape(-1, 1)

# xの範囲
x_values = np.array([4.4, 7.0])

# a, c の各値からyの値を計算
y_preds = x_values * a_list + c_list

# 回帰直線と元のデータを可視化
for y_pred in y_preds:
    plt.plot(x_values, y_pred, lw=1, alpha=0.01, c="c")
plt.scatter(x, y)
plt.show()

なかなか妥当な結果が得られましたね。

まとめ

今回は線形回帰を題材として取り上げましたが、線形回帰に限らず特徴量を使うモデリングは同じようにして実装していくことができます。pm.Deterministicを使うと一気に実装の幅が広がりますのでぜひ試してみてください。

pyMCを使用した事前分布からのサンプリング方法

pyMCの3つ目の記事です。

このブログではまだ超単純なものしか扱っていませんが、pyMCではかなり柔軟にモデルを構築できます。そうなってくると、自分が実装したモデルが妥当なものなのか、思うような事象を表現できているのかといったことをサンプリング前に確認しておきたくなるものだと思います。

そのような場合に備えて、pyMCでは、ベイズ推論を行う前に事前分布からそのままサンプリングを行い結果を確認するメソッドとして、 pm.sample_prior_predictive() というものが用意されています。
参考: pymc.sample_prior_predictive — PyMC dev documentation

使い方は簡単で、モデルを組んだ後にsample()の代わりにこれを呼び出すだけです。引数としてはサンプリングしたいデータ数を渡します。

では具体的にやってみましょう。今回は二項分布あたりを扱ってみましょうかね。$n=10$くらいで、$p$は0から1の一様分布からサンプリングしてみましょう。通常二項分布って$np$を期待値として山形になるのですが、$p$が固定されていないのでそうはならない例をお見せできると思います。

import pymc as pm
import arviz as az
import matplotlib.pyplot as plt


with pm.Model() as model:
    # 事前分布を設定
    p = pm.Uniform('p', lower=0, upper=1)
    y = pm.Binomial('y', n=10, p=p)

    # 事前分布からサンプリング
    prior_predictive = pm.sample_prior_predictive(samples=1000)

このサンプリングはかなり短時間で終わります。複雑なモデルの通常のサンプリングは結構時間がかかるので、その意味でも事前確認をしておくメリットはありますね。

サンプリングしたら結果を確認します。prior_predictive って変数に結果が確認されていますが、`prior_predictive.prior[“p”]`や`prior_predictive.prior[“y”]`のような形式でサンプリング結果を取り出せます。

arviz使って確認しましょう。

fig = plt.figure(figsize=(12, 5), facecolor="w")
ax = fig.add_subplot(1, 2, 1, title="p")
az.plot_dist(prior_predictive.prior["p"], ax=ax)
ax = fig.add_subplot(1, 2, 2, title="y")
az.plot_dist(prior_predictive.prior["y"], ax=ax)
plt.show()

結果がこちらです。

サンプルサイズ(1000と設定しました。デフォルトは500です。)が小さいのか、ちょっとpの分布がボコボコしていますが概ね想定通りの結果が得られましたね。

ArviZを使ってpyMCの推論結果を可視化する

pyMCの記事2記事目です。
前回非常にシンプルなモデルで推論をやりましたが、今回はその結果を可視化する便利なライブラリである、ArviZの紹介です。

ArviZはpyMCに限らず、ベイズモデルの分析や可視化、比較等を行うライブラリです。ベイズ推論の結果の分析に特化しているだけあって非常に多くの機能を持っています。
参考: ArviZ: Exploratory analysis of Bayesian models — ArviZ 0.18.0 documentation

今回は特に利用頻度が高いと思われるメソッドに絞って紹介していきます。

例としては前回の記事で作った単純なモデルを使います。前回のコードを走らせてサンプリングした結果が trace という変数に入ってるという前提で見ていってください。

参考: PyMC version 5 超入門

推論結果のサマリーをまとめる

最初に紹介するのは推論結果を統計値で返してくれる、az.summary()です。これだけは可視化ではない(グラフ等での表示ではない)のですがよく使うのでこの記事で紹介します。
参考: arviz.summary — ArviZ 0.18.0 documentation

pm.summary()とほとんど同じ挙動なのですが、トレース結果の統計値をDataFrame形式で返してくれます。

import pymc as pm
import arviz as az


az.summary(trace)
# 以下結果
"""
mean	sd	hdi_3%	hdi_97%	mcse_mean	mcse_sd	ess_bulk	ess_tail	r_hat
mu	3.157	0.192	2.774	3.501	0.003	0.002	4182.0	2738.0	1.0
sigma	1.933	0.133	1.687	2.184	0.002	0.002	3934.0	3009.0	1.0
"""

各変数の平均値等が確認できるので便利ですね。

サンプルの系列(トレース)を可視化する

次に紹介するのは az.plot_trace()です。MCMCの可視化としては一番ポピュラーなやつではないでしょうか。
参考: arviz.plot_trace — ArviZ 0.18.0 documentation

今回用意している例では2変数を4系列で1000ステップサンプリングしていますので、その分布とトレースを一気に可視化してくれます。
手元で試したところ、ちょっとラベルが重なっていたので、matplotlibのメソッドを一つ呼び出して調整しています。

import matplotlib.pyplot as plt


az.plot_trace(trace)
plt.tight_layout()
plt.show()

出力結果がこちらです。

いい感じですね。

事後分布を可視化する

さっきのトレースの左半分にも表示されてはいるのですが、本当に欲しい結果は事後分布です。それを表示することに特化しているのが az.plot_posterior() です。
参考: arviz.plot_posterior — ArviZ 0.18.0 documentation

やってみます。

az.plot_posterior(trace)
plt.show()

出力がこちら。事後分布と変数名、期待値等やhdiなどを可視化してくれましたね。

フォレストプロットで可視化する

フォレストプロットとは何か?というのは実物を見ていただいた方が早いと思うのでやってみます。2変数なのでいまいちありが分かりにくいと思いますが、変数の数が多いとこれは非常に便利です。
参考: arviz.plot_forest — ArviZ 0.18.0 documentation

az.plot_forest(trace)
plt.show()

結果がこちら。

サンプリングの系列ごとに可視化してくれていますね。引数で、 combined=True を一緒に渡すと、系列をまとめて変数ごとに集計してくれますよ。

kind等の引数で見た目を変えていくこともできるのでドキュメントを参照していろいろ試してみてください。

分布を可視化する

最後は、ちょっと特殊です。AzviZにはnumpyの配列などを受け取って単純に分布を表示するメソッドなども用意されています。それが、az.plot_dist()です。
参考: arviz.plot_dist — ArviZ 0.18.0 documentation

これは、numpy配列(要するにarray)を受け取るので、先ほどまでの例のようにtraceをそのまま渡せません。pyMCの事後分布を可視化したいのであれば、traceからサンプリングした結果の部分を自分で取り出して渡す必要があります。

例えば、muの方であればこのようになります。

az.plot_dist(trace.posterior['mu'].values.ravel())
plt.show()

結果がこちら。

これはpyMCの結果以外でも汎用的に使えるやつなので一緒に紹介しました。

その他の補足

今回の記事では、例としたモデルが非常に単純だったので使いませんでしたが、大規模なものになると全変数表示すると潰れてしまって読み取れないということが起きます。
そのような場合、それぞれのメソッドが var_names という引数で出力する変数を絞り込めるようになっているで使ってみてください。

また、多くのメソッドは ax 引数などを受け取れるようになっているで、出力先のaxを指定してmatplotlibの機能で出力を加工することなどもできます。

それ以外にも各メソッドさまざまなオプションを持っているのでぜひドキュメントを参照しながら使いこなしてみてください。

PyMC version 5 超入門

半年ほど前から、PyMCを使うようになりました。だいぶ慣れてきたのでこれから数回の記事でPyMCの入門的な内容をまとめていこうと思います。(記事執筆時間の制約等の要因で途中で違うテーマの記事を挟むかもしれませんができるだけ連続させたいです。)

PyMCとは

PyMCはPythonで書かれたオープンソースの確率的プログラミングライブラリです。ベイズ統計モデルを構築、分析し、複雑な統計的問題を解くことができます。PyMCのversion4の開発ではいろいろゴタゴタがありバージョン番号がスキップされたようですが、現在ではverison5がリリースされています。

公式ドキュメントはこちらです。
参考: Home — PyMC project website

特徴としては、線形モデルから複雑な階層モデルまで、幅広いモデルを柔軟に構築できることや、最新のMCMCアルゴリズムを利用して、効率的にサンプリングが行えることが挙げられます。

ただし、柔軟にモデルを構築できる反面で、自分でモデルの内容を実装しないといけないのでscikit-learnのような、既存のモデルをimportしてfit-predictさせたら完結するような単純なAPIにはなっていません。それでも、かなり直感的なAPIにはなっていると感じています。

サンプルコード

一番最初の記事なので、今回は本当に一番単純なサンプルコードを紹介します。これは正規分布に従う標本を生成して、そのパラメーターを推定するというものです。

ダミーデータは平均3, 標準偏差2の正規分布から取りました。

ベイズ推定するので、パラメーターに事前分布が必要です。これは平均の方は平均0、標準偏差10の正規分布、標準偏差の方はsigma=10の半正規分布を設定しました。

ダミーデータ生成からモデルの作成までが以下のコードです。

import pymc as pm
import numpy as np

# ダミーデータを生成
true_mu = 3
true_sigma = 2
np.random.seed(seed=10)
data = np.random.normal(true_mu, true_sigma, size=100)

with pm.Model() as model:
    # 事前分布を設定
    mu = pm.Normal('mu', mu=0, sigma=10)
    sigma = pm.HalfNormal('sigma', sigma=1)

    # 尤度関数を設定
    likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma, observed=data)

ダミーデータの生成分は単純なのでいいですね。
その後が本番です。
PyMCでは、withを使ってコンテキストを生成し、その中に実際のコードを書いていきます。
pm.Normal や pm.HalfNormal など、さまざまな確率分布が用意されていますが、それを使って変数の事前分布を定義しています。

そして、そこで事前分布を設定された変数、mu, sigmaを使って最後の正規分布を定義し、観測値(observed)として用意したダミーデータを渡しています。

Graphvizを導入している環境の場合、次のようにしてモデルを可視化できます。
これはコンテキストの外で行えるので注意してください。(displayしていますが、これはjupyter上に表示することを想定しています。)

g = pm.model_to_graphviz(model)
display(g)

出力結果がこちらです。

モデルが出来上がったらサンプリングを行います。

サンプリングはコンテキスト内で、pm.sample()メソッドを呼び出すことで行います。
引数としては初期の捨てるサンプル数(tune)と、分析に利用するサンプル数(draws)、さらにサンプル値系列を幾つ生成するかを示すchainsを渡します。

with model:
    trace = pm.sample(
        draws=1000,
        tune=1000,
        chains=4,
    )

# 以下出力。時間がかかる処理の場合プログレスバーも見れるので助かります。
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [mu, sigma]

 100.00% [8000/8000 00:00<00:00 Sampling 4 chains, 0 divergences]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.

サンプルが終わったら要約を表示します。

# サンプルの要約を表示
summary = pm.summary(trace)
print(summary)

# 結果
	mean	sd	hdi_3%	hdi_97%	mcse_mean	mcse_sd	ess_bulk	ess_tail	r_hat
mu	3.155	0.191	2.792	3.506	0.003	0.002	3867.0	2867.0	1.0
sigma	1.933	0.135	1.687	2.191	0.002	0.002	3927.0	3150.0	1.0

meanの値を見ると、それぞれ真の値に結構近い値が得られていますね。

以上が、本当に一番シンプルなPyMCの使い方の記事でした。

今後の記事ではもう少し細かい仕様の話や発展的な使い方、ArviZという専用の可視化ライブラリの話などを紹介していきたいと思います。

J-Quants APIのページング処理に対応する

久々にJ-Quants API の記事です。もう結構前の話(2023/06/16)の話ですが、J-Quants APIはデータ量の増加位に対応するためにページング処理というものが導入されました。
参考: お知らせ – J-Quants API の 過去のお知らせ部分見てください。

要するにAPIから取得できるデータの量が多い時に、全部のデータを一度では取得できず、一部分だけ取得できるって話ですね。

こちらについて利用方法を記事にしておきます。

ページング処理対応方法

詳しくはこちらをご参照ください。
参考: API共通の留意事項 – J-Quants API

レスポンスが帰ってきた時、結果にpagination_key が含まれていたらページング(ページネーション)が発生しており、そこで得られた結果は取得したかったデータの全量ではありません。
得られたpagination_keyの値を付与して再度リクエストすることで以降のデータを得ることができます。

サンプルコード参照してやってみましょう。
ちなみに、認証にidトークンが必要ですがその取得方法は僕の過去記事参照してください。
参考: J-Quants API の基本的な使い方
以下の記事では、 id_token って変数にすでにトークンが取得できているものとします。

import json
import requests
import pandas as pd


print(len(id_token))  # id_tokenは過去記事の方法ですでに取得してるとします。(文字数確認)
# 1107

# 特定の日付の4本値を取得する
date = "2024-03-15"
daily_quotes_url = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}"
headers = {"Authorization": f"Bearer {id_token}"}
daily_quotes_result = requests.get(daily_quotes_url, headers=headers)

# レスポンスに、pagination_key が含まれていることが確認できる。
print(daily_quotes_result.json().keys())
# dict_keys(['daily_quotes', 'pagination_key'])

pagination_key = daily_quotes_result.json()["pagination_key"]

# pagination_key も付与してもう一度リクエストする。
daily_quotes_url_2 = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}&pagination_key={pagination_key}"
daily_quotes_result_2 = requests.get(daily_quotes_url_2, headers=headers)

# 今度は、pagination_keyは含まれていない。
print(daily_quotes_result_2.json().keys())
# dict_keys(['daily_quotes'])

# それぞれデータが得られている。
len(daily_quotes_result.json()["daily_quotes"]),  len(daily_quotes_result_2.json()["daily_quotes"])
# (4030, 312)

# それぞれ配列型のデータなので + で連結できる。
# DataFrame化までついでに行った。
df = pd.DataFrame(daily_quotes_result.json()["daily_quotes"]
                  + daily_quotes_result_2.json()["daily_quotes"])

print(len(df))
# 4342

1回目のリクエストでは、本当は4342件得られるはずだったデータのうち、4030件しか取得できてなかったことがわかりますね。そして、pagination_keyを合わせて送信することで、続きを取得できています。

上記のサンプルコードはわかりやすさ優先のため、2回で全部取得できると決め打ちしていますが、実際は2回目のリクエストでもpagination_keyが戻ってくる可能性があります。

そのため、実際の運用ではドキュメントのコードのようにpagination_keyがなくなるまでループするような実装にすると良いでしょう。

# 特定の日付の4本値を取得する
date = "2024-03-15"
daily_quotes_url = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}"
headers = {"Authorization": f"Bearer {id_token}"}
daily_quotes_result = requests.get(daily_quotes_url, headers=headers)

# 1回目のレスポンスで得られたdata
data = daily_quotes_result.json()["daily_quotes"]

# pagination_keyが含まれている限りはループする。
while "pagination_key" in daily_quotes_result.json():
    pagination_key = daily_quotes_result.json()["pagination_key"]
    daily_quotes_url = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}&pagination_key={pagination_key}"
    daily_quotes_result = requests.get(daily_quotes_url, headers=headers)
    # 得られたデータを連結する。
    data += daily_quotes_result.json()["daily_quotes"]


# データが揃っている。
print(len(data))
# 4342

これで、J-Quants APIのページング処理にも対応できました。

pandas.qcutでデータを分位数で離散化する

今回の記事ではpandasのqcutという関数を紹介します。
参考: pandas.qcut — pandas 2.2.1 ドキュメント

記事タイトルに書いていますが、これは分位数に基づいてデータを離散化する関数です。
実は以前、数値の区間で区切って離散化するpandas.cutというのを紹介したことがあるのですが、その仲間みたいなものですね。僕はつい最近までqcutを知りませんでしたが。
参考: pandasで数値データを区間ごとに区切って数える

cutでは数字の絶対値を基準に、0以上100未満、100以上200未満、みたいにデータを切り分けることができましたが、qcutでは分位数(パーセンタイル)を基準にデータを分けることができます。要するに、4つに分けるのであれば、25%以下、50%以下、75%以下、それより上、みたいにデータを区切り、各区切りには大体同じ件数のデータが分類されます。

cutだったら区間の幅が揃い、qcutだったら各区間に含まれるデータの件数が揃うというのが一番簡潔な説明ですね。

適当に乱数を使ってやってみましょう。ポアソン分布で200個ほどデータを作って、q_cutで5つのグループに分けてみます。

import pandas as pd
from scipy.stats import poisson  # テストデータ生成用


# λ=100のポアソン分布に従う乱数を200個生成
data = poisson(mu=100).rvs(size=200, random_state=0)
print(data[:10])  # 最初の10項表示
# [101 103  98  98 127 109 102  82  99  86]

# データと区切りたいグループの個数を指定して実行
out = pd.qcut(data, q=5)
# 各データがそれが含まれる区間お
print(out)
"""
[(97.0, 102.0], (102.0, 107.0], (97.0, 102.0], (97.0, 102.0], (107.0, 127.0], ..., (102.0, 107.0], (102.0, 107.0], (92.0, 97.0], (92.0, 97.0], (78.999, 92.0]]
Length: 200
Categories (5, interval[float64, right]): [(78.999, 92.0] < (92.0, 97.0] < (97.0, 102.0] < (102.0, 107.0] < (107.0, 127.0]]
"""

データの先頭の方と、あと、結果をprintして表示されたやつを上のコードに出しました。Categories として5つの区間が表示されていますが、「それぞれのデータがどの区間に含まれているのか」に変換されたものが得られていますね。例えば最初のデータは101ですが、これは区間(97, 102] に含まれます。

区間にラベルをつけることもできます。低い方からL1, L2, L3 みたいにつけていく場合はlabel引数にqで指定した数と同じ要素数の配列を渡して実現します。(今回文字列でサンプル作っていますが、数値をラベルにすることもできます。)

# ラベルを指定する
out = pd.qcut(data, q=5, labels=["L1", "L2", "L3", "L4", "L5"])
print(out)
"""
['L3', 'L4', 'L3', 'L3', 'L5', ..., 'L4', 'L4', 'L2', 'L2', 'L1']
Length: 200
Categories (5, object): ['L1' < 'L2' < 'L3' < 'L4' < 'L5']
"""

変換後のデータとして扱いやすそうな形で結果が得られました。

ただ、それぞれのラベルの区間がこれだとわからないですね。区間の情報を別途得る必要があるのでその場合はretbins引数にTrueを渡して、結果を受け取るときにもう一個変数を用意して受け取ることで、区切り位置の譲歩を得ることもできます。もちろん、labelsは使わずに、retbinsだけ指定することもできますよ。

# ラベルを指定する
out, bins = pd.qcut(data, q=5, labels=["L1", "L2", "L3", "L4", "L5"], retbins=True)
print(out)
"""
['L3', 'L4', 'L3', 'L3', 'L5', ..., 'L4', 'L4', 'L2', 'L2', 'L1']
Length: 200
Categories (5, object): ['L1' < 'L2' < 'L3' < 'L4' < 'L5']
"""
# 区切り位置の情報
print(bins)
# [ 79.  92.  97. 102. 107. 127.]

最後に注意です。qcutを使うと連続値のデータは大体同じ個数ずつに分けてくれることが多くそれが目的で使うことが多くなるのですが、今回の例のように整数値など離散な値しか取らない場合はそうでもなくなってきます。今回乱数で発生したデータはちょうど区切り位置の107が10個も混ざってた等々の事情で、ちょっとだけ偏りが出ています。実際に使う場合はこのあたりの結果もよく注意してみてください。

print(out.value_counts())
"""
L1    44
L2    39
L3    39
L4    41
L5    37
dtype: int64
"""

np.vectorizeで関数をベクトル化する

NumPyやScyPyの関数って非常に便利で、NumPy配列(要するにArray)を渡すと空気を読んでその渡したデータの各要素に関数を適用してNumPy配列で結果を返してくれたりします。

自分で定義した関数でもNumPyやSciPyの関数の組み合わせで作った関数であれば結構そのように動いてくれるのですが、文字列操作が入ったりif文による分岐等があると必ずしもそうはならず、スカラー値を受け取ってスカラー値を返すだけの関数になることがあります。

そのような関数を、手軽にベクトルか対応することができる方法があるのでこの記事で紹介します。

それが、記事タイトルのnp.vectorizeです。

ドキュメント: numpy.vectorize — NumPy v1.26 Manual

関数を渡すと戻り値で新しい関数オブジェクトが帰ってきてそれがベクトル対応(配列対応)しています。

基本的な使い方

数学関数だと特にArrayを渡すと元々期待通り動いたりするので、少々無理矢理な例ですが文字列操作の関数を作ってお見せします。これは数値を1個受けとって、その数値に、「回目」っていいう単位をつけて返すだけの関数です。普通に実験、そのまま配列渡してみる、ベクとライズして配列を渡してみる、の3パターンやってみました。

import numpy as np


# 数値に単位をつける関数を実装
def number_format(n):
    return f"{n}回目"


# 数値を渡すと想定通り動く
print(number_format(5))
# 5回目

# 配列を渡すと配列を一個の値とみなして文字列化して単位をつけてしまう。
print(number_format([1, 2, 3]))
# [1, 2, 3]回目

# ベクトル化した関数を作る
number_format_vec = np.vectorize(number_format)

# それに配列を渡すと配列の各要素に元の関数を適用してくれる。
print(number_format_vec([1, 2, 3]))
# ['1回目' '2回目' '3回目']

# Array型もタプルもいける
print(number_format_vec(np.array([1, 2, 3])))
# ['1回目' '2回目' '3回目']
print(number_format_vec((1, 2, 3)))
# ['1回目' '2回目' '3回目']

# もちろん、内包表記で同じことをすることは可能(ただし、この結果はlist)
print([number_format(n) for n in [1, 2, 3]])
# ['1回目', '2回目', '3回目']

ベクトル化した関数を1回しか使わないなら内包表記で済ましちゃっていいんじゃないかな、と思うのですが、何度も利用したい関数であればnp.vectorizeを使うと言う選択肢もあるのかな、と思います。

注意点

NumPyやSciPyで実装されている関数群って並列処理できる部分は並列処理するような賢い実装になっていることがありますが、この np.vectorize はそこまで気が利いたものではありません。どうやら単純にfor文で順次処理するようになるだけらしいので処理の高速化等の効果はありません。ドキュメントにも利便性のためのもので、パフォーマンスのため使うようなものではなく、for loop回してるだけだって書いてありますね。

そのため、本当に頻繁に大規模なベクトルを処理する関数なのであれば別の方法で対応させる必要があるでしょう。

もう一点、細かいですが戻り値がNumPyのarrayであることも注意が必要ですね。と言ってもこれは便利に感じることが多いですが。内包表記であればlistで結果が得られますがvectorizeするとlist渡してもlistではなくarrayで帰ってきます。

引数を複数受け取る関数の場合

この np.vectorize は引数を複数受け取る関数にも対応しています。ドキュメントのサンプルもa, b の2変数受け取っていますしね。一応その例も見ておきましょう。年と月の数値を受け取って何年何月、という文字列返す関数でやってみます。

def month_str(year, month):
    return (f"{year}年{month}月")


month_str_vec = np.vectorize(month_str)

# 元の関数はyear, monthは1個ずつしか値を受け取れない
print(month_str([2020, 2023, 2026], [1, 4, 7]))
# [2020, 2023, 2026]年[1, 4, 7]月

# ベクトル化すると複数ペアをまとめて処理できる。
print(month_str_vec([2020, 2023, 2026], [1, 4, 7]))
# ['2020年1月' '2023年4月' '2026年7月']

# 片方は配列で、片方はスカラーというパターンにも対応する
print(month_str_vec([2020, 2023, 2026], 1))
# ['2020年1月' '2023年1月' '2026年1月']

さいごに

以上が手軽に関数をベクトル化する方法でした。まぁ、内包表記もあればmapを使うやり方もあるのでこれが必須というわけではないのですがいい感じに動く関数を手軽に作る方法として頭の片隅に置いておくと使う場面はあるんじゃないかなと思います。

ちなみに、関数を定義した直後にベクトル化した関数で元の関数名を上書きしておくと、最初っからベクトル化した関数を宣言したのと同じように使えますよ。

def func(x):
    # 何かの処理


func = np.vectorize(func)
# 以降に呼び出されるfuncはベクトル対応した関数。