pyMCで線形回帰分析

pyMCの記事4記事目です。これまでの記事では観測値だけ与えてそれを生成する確率分布を考えてきましたが、今回は観測値だけでなく何か特徴量を持つデータを考えます。
その最も単純な例として1変数の線形回帰をやってみましょう。特徴量が増えて重回帰分析になってもほとんど同じように対応できるので汎用性は高いと思います。

データの準備

何のデータを使ってもいいのですが、今回はscikit-learnのiris使います。3種類のアヤメのうち、virginicaに絞って、petal length (cm)からsepal length (cm)を予測するモデルを考えてみましょう。(相関係数が0.86くらいあって予測が簡単なのです。)

次のようにしてデータを取得します。

%%pycodestyle
import pandas as pd
from sklearn.datasets import load_iris


# irisデータ取得
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df["label"] = iris.target
# virginica に絞る
df = df[df.label == 2].reset_index(drop=True)

# 回帰分析に使う特徴量xと目的変数yを取得
x = df["petal length (cm)"].values
y = df["sepal length (cm)"].values

モデルの実装

データが揃ったらpyMCでモデルを作っていきます。

回帰係数を a, 定数項をb、誤差をε とするとモデルの式はこのようになりますね。
$$y= ax + c + \epsilon$$

ベイズで行いますので、それぞれに事前分布が必要です。
a, c の事前分布は期待値が0、標準偏差が10の正規分布としましょう。
そして、誤差項εは、期待値が0で、標準偏差がσの正規分布に従うとし、このσは標準偏差が10の半正規分布に従うとします。

これを実装してきますが、今回新たに使うのは、特徴料等の定数を格納する pm.ConstantData と 数式を定義できるpm.Deterministic です。

参考:
pymc.ConstantData — PyMC dev documentation
pymc.Deterministic — PyMC v5.6.0 documentation

正確には、 ConstantData の方は使わなくてもいいのですが、明示的に書いておくとモデルを可視化した時に定数部分も表示されるので便利です。

実際にコードを見ていただくと使い方がわかると思うのでやっていきましょう。例によって、Graphvizで可視化してJpyterで表示しています。

import numpy as np
import matplotlib.pyplot as plt
import pymc as pm
import arviz as az


model = pm.Model()

with model:
    x_data = pm.ConstantData("x_data", x)
    y_data = pm.ConstantData("y_data", y)

    # 回帰係数と定数項
    a = pm.Normal("a", mu=0, sigma=10)
    c = pm.Normal("c", mu=0, sigma=10)

    # yの期待値
    mu = pm.Deterministic("mu", a*x_data + c)
    # 誤差
    sigma = pm.HalfNormal("sigma", sigma=10)

    # 観測値
    obs = pm.Normal("obs", mu=mu, sigma=sigma, observed=y_data)

# モデルの可視化
g = pm.model_to_graphviz(model)
display(g)

参考ですが、ConstantDataを使わない場合はこうなります。

model2 = pm.Model()

with model2:

    # 回帰係数と定数項
    a = pm.Normal("a", mu=0, sigma=10)
    c = pm.Normal("c", mu=0, sigma=10)

    # yの期待値
    mu = pm.Deterministic("mu", a*x + c)
    # 誤差
    sigma = pm.HalfNormal("sigma", sigma=10)

    # 観測値
    obs = pm.Normal("obs", mu=mu, sigma=sigma, observed=y)

g = pm.model_to_graphviz(model2)
display(g)

モデルができたのでサンプリングして結果を見ていきましょう。

with model:
    trace = pm.sample(random_seed=42, chains=2)


display(az.summary(trace, var_names=["a", "c", "sigma"]))
"""
	mean	sd	hdi_3%	hdi_97%	mcse_mean	mcse_sd	ess_bulk	ess_tail	r_hat
a	0.995	0.084	0.828	1.143	0.003	0.002	641.0	636.0	1.0
c	1.066	0.470	0.216	1.988	0.019	0.013	648.0	679.0	1.0
sigma	0.331	0.035	0.259	0.391	0.001	0.001	554.0	349.0	1.0
"""

az.plot_trace(trace, var_names=["a", "c", "sigma"], compact=False)
plt.tight_layout()

いい感じに推定できていますね。

回帰直線の可視化

せっかく単回帰したので、回帰直線を可視化してみたいと思います。上記のsummaryのa,cで可視化してもいいのですがせっかくなのでサンプリングの各ステップの値で可視化してみましょう。

for y_pred in y_preds:
    plt.plot(x_values, y_pred, lw=1, alpha=0.01, c="c")
plt.scatter(x, y)
plt.show()

なかなか妥当な結果が得られましたね。

まとめ

今回は線形回帰を題材として取り上げましたが、線形回帰に限らず特徴量を使うモデリングは同じようにして実装していくことができます。pm.Deterministicを使うと一気に実装の幅が広がりますのでぜひ試してみてください。

pyMCを使用した事前分布からのサンプリング方法

pyMCの3つ目の記事です。

このブログではまだ超単純なものしか扱っていませんが、pyMCではかなり柔軟にモデルを構築できます。そうなってくると、自分が実装したモデルが妥当なものなのか、思うような事象を表現できているのかといったことをサンプリング前に確認しておきたくなるものだと思います。

そのような場合に備えて、pyMCでは、ベイズ推論を行う前に事前分布からそのままサンプリングを行い結果を確認するメソッドとして、 pm.sample_prior_predictive() というものが用意されています。
参考: pymc.sample_prior_predictive — PyMC dev documentation

使い方は簡単で、モデルを組んだ後にsample()の代わりにこれを呼び出すだけです。引数としてはサンプリングしたいデータ数を渡します。

では具体的にやってみましょう。今回は二項分布あたりを扱ってみましょうかね。$n=10$くらいで、$p$は0から1の一様分布からサンプリングしてみましょう。通常二項分布って$np$を期待値として山形になるのですが、$p$が固定されていないのでそうはならない例をお見せできると思います。

import pymc as pm
import arviz as az
import matplotlib.pyplot as plt


with pm.Model() as model:
    # 事前分布を設定
    p = pm.Uniform('p', lower=0, upper=1)
    y = pm.Binomial('y', n=10, p=p)

    # 事前分布からサンプリング
    prior_predictive = pm.sample_prior_predictive(samples=1000)

このサンプリングはかなり短時間で終わります。複雑なモデルの通常のサンプリングは結構時間がかかるので、その意味でも事前確認をしておくメリットはありますね。

サンプリングしたら結果を確認します。prior_predictive って変数に結果が確認されていますが、`prior_predictive.prior[“p”]`や`prior_predictive.prior[“y”]`のような形式でサンプリング結果を取り出せます。

arviz使って確認しましょう。

fig = plt.figure(figsize=(12, 5), facecolor="w")
ax = fig.add_subplot(1, 2, 1, title="p")
az.plot_dist(prior_predictive.prior["p"], ax=ax)
ax = fig.add_subplot(1, 2, 2, title="y")
az.plot_dist(prior_predictive.prior["y"], ax=ax)
plt.show()

結果がこちらです。

サンプルサイズ(1000と設定しました。デフォルトは500です。)が小さいのか、ちょっとpの分布がボコボコしていますが概ね想定通りの結果が得られましたね。

ArviZを使ってpyMCの推論結果を可視化する

pyMCの記事2記事目です。
前回非常にシンプルなモデルで推論をやりましたが、今回はその結果を可視化する便利なライブラリである、ArviZの紹介です。

ArviZはpyMCに限らず、ベイズモデルの分析や可視化、比較等を行うライブラリです。ベイズ推論の結果の分析に特化しているだけあって非常に多くの機能を持っています。
参考: ArviZ: Exploratory analysis of Bayesian models — ArviZ 0.18.0 documentation

今回は特に利用頻度が高いと思われるメソッドに絞って紹介していきます。

例としては前回の記事で作った単純なモデルを使います。前回のコードを走らせてサンプリングした結果が trace という変数に入ってるという前提で見ていってください。

参考: PyMC version 5 超入門

推論結果のサマリーをまとめる

最初に紹介するのは推論結果を統計値で返してくれる、az.summary()です。これだけは可視化ではない(グラフ等での表示ではない)のですがよく使うのでこの記事で紹介します。
参考: arviz.summary — ArviZ 0.18.0 documentation

pm.summary()とほとんど同じ挙動なのですが、トレース結果の統計値をDataFrame形式で返してくれます。

import pymc as pm
import arviz as az


az.summary(trace)
# 以下結果
"""
mean	sd	hdi_3%	hdi_97%	mcse_mean	mcse_sd	ess_bulk	ess_tail	r_hat
mu	3.157	0.192	2.774	3.501	0.003	0.002	4182.0	2738.0	1.0
sigma	1.933	0.133	1.687	2.184	0.002	0.002	3934.0	3009.0	1.0
"""

各変数の平均値等が確認できるので便利ですね。

サンプルの系列(トレース)を可視化する

次に紹介するのは az.plot_trace()です。MCMCの可視化としては一番ポピュラーなやつではないでしょうか。
参考: arviz.plot_trace — ArviZ 0.18.0 documentation

今回用意している例では2変数を4系列で1000ステップサンプリングしていますので、その分布とトレースを一気に可視化してくれます。
手元で試したところ、ちょっとラベルが重なっていたので、matplotlibのメソッドを一つ呼び出して調整しています。

import matplotlib.pyplot as plt


az.plot_trace(trace)
plt.tight_layout()
plt.show()

出力結果がこちらです。

いい感じですね。

事後分布を可視化する

さっきのトレースの左半分にも表示されてはいるのですが、本当に欲しい結果は事後分布です。それを表示することに特化しているのが az.plot_posterior() です。
参考: arviz.plot_posterior — ArviZ 0.18.0 documentation

やってみます。

az.plot_posterior(trace)
plt.show()

出力がこちら。事後分布と変数名、期待値等やhdiなどを可視化してくれましたね。

フォレストプロットで可視化する

フォレストプロットとは何か?というのは実物を見ていただいた方が早いと思うのでやってみます。2変数なのでいまいちありが分かりにくいと思いますが、変数の数が多いとこれは非常に便利です。
参考: arviz.plot_forest — ArviZ 0.18.0 documentation

az.plot_forest(trace)
plt.show()

結果がこちら。

サンプリングの系列ごとに可視化してくれていますね。引数で、 combined=True を一緒に渡すと、系列をまとめて変数ごとに集計してくれますよ。

kind等の引数で見た目を変えていくこともできるのでドキュメントを参照していろいろ試してみてください。

分布を可視化する

最後は、ちょっと特殊です。AzviZにはnumpyの配列などを受け取って単純に分布を表示するメソッドなども用意されています。それが、az.plot_dist()です。
参考: arviz.plot_dist — ArviZ 0.18.0 documentation

これは、numpy配列(要するにarray)を受け取るので、先ほどまでの例のようにtraceをそのまま渡せません。pyMCの事後分布を可視化したいのであれば、traceからサンプリングした結果の部分を自分で取り出して渡す必要があります。

例えば、muの方であればこのようになります。

az.plot_dist(trace.posterior['mu'].values.ravel())
plt.show()

結果がこちら。

これはpyMCの結果以外でも汎用的に使えるやつなので一緒に紹介しました。

その他の補足

今回の記事では、例としたモデルが非常に単純だったので使いませんでしたが、大規模なものになると全変数表示すると潰れてしまって読み取れないということが起きます。
そのような場合、それぞれのメソッドが var_names という引数で出力する変数を絞り込めるようになっているで使ってみてください。

また、多くのメソッドは ax 引数などを受け取れるようになっているで、出力先のaxを指定してmatplotlibの機能で出力を加工することなどもできます。

それ以外にも各メソッドさまざまなオプションを持っているのでぜひドキュメントを参照しながら使いこなしてみてください。

PyMC version 5 超入門

半年ほど前から、PyMCを使うようになりました。だいぶ慣れてきたのでこれから数回の記事でPyMCの入門的な内容をまとめていこうと思います。(記事執筆時間の制約等の要因で途中で違うテーマの記事を挟むかもしれませんができるだけ連続させたいです。)

PyMCとは

PyMCはPythonで書かれたオープンソースの確率的プログラミングライブラリです。ベイズ統計モデルを構築、分析し、複雑な統計的問題を解くことができます。PyMCのversion4の開発ではいろいろゴタゴタがありバージョン番号がスキップされたようですが、現在ではverison5がリリースされています。

公式ドキュメントはこちらです。
参考: Home — PyMC project website

特徴としては、線形モデルから複雑な階層モデルまで、幅広いモデルを柔軟に構築できることや、最新のMCMCアルゴリズムを利用して、効率的にサンプリングが行えることが挙げられます。

ただし、柔軟にモデルを構築できる反面で、自分でモデルの内容を実装しないといけないのでscikit-learnのような、既存のモデルをimportしてfit-predictさせたら完結するような単純なAPIにはなっていません。それでも、かなり直感的なAPIにはなっていると感じています。

サンプルコード

一番最初の記事なので、今回は本当に一番単純なサンプルコードを紹介します。これは正規分布に従う標本を生成して、そのパラメーターを推定するというものです。

ダミーデータは平均3, 標準偏差2の正規分布から取りました。

ベイズ推定するので、パラメーターに事前分布が必要です。これは平均の方は平均0、標準偏差10の正規分布、標準偏差の方はsigma=10の半正規分布を設定しました。

ダミーデータ生成からモデルの作成までが以下のコードです。

import pymc as pm
import numpy as np

# ダミーデータを生成
true_mu = 3
true_sigma = 2
np.random.seed(seed=10)
data = np.random.normal(true_mu, true_sigma, size=100)

with pm.Model() as model:
    # 事前分布を設定
    mu = pm.Normal('mu', mu=0, sigma=10)
    sigma = pm.HalfNormal('sigma', sigma=1)

    # 尤度関数を設定
    likelihood = pm.Normal('likelihood', mu=mu, sigma=sigma, observed=data)

ダミーデータの生成分は単純なのでいいですね。
その後が本番です。
PyMCでは、withを使ってコンテキストを生成し、その中に実際のコードを書いていきます。
pm.Normal や pm.HalfNormal など、さまざまな確率分布が用意されていますが、それを使って変数の事前分布を定義しています。

そして、そこで事前分布を設定された変数、mu, sigmaを使って最後の正規分布を定義し、観測値(observed)として用意したダミーデータを渡しています。

Graphvizを導入している環境の場合、次のようにしてモデルを可視化できます。
これはコンテキストの外で行えるので注意してください。(displayしていますが、これはjupyter上に表示することを想定しています。)

g = pm.model_to_graphviz(model)
display(g)

出力結果がこちらです。

モデルが出来上がったらサンプリングを行います。

サンプリングはコンテキスト内で、pm.sample()メソッドを呼び出すことで行います。
引数としては初期の捨てるサンプル数(tune)と、分析に利用するサンプル数(draws)、さらにサンプル値系列を幾つ生成するかを示すchainsを渡します。

with model:
    trace = pm.sample(
        draws=1000,
        tune=1000,
        chains=4,
    )

# 以下出力。時間がかかる処理の場合プログレスバーも見れるので助かります。
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [mu, sigma]

 100.00% [8000/8000 00:00<00:00 Sampling 4 chains, 0 divergences]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.

サンプルが終わったら要約を表示します。

# サンプルの要約を表示
summary = pm.summary(trace)
print(summary)

# 結果
	mean	sd	hdi_3%	hdi_97%	mcse_mean	mcse_sd	ess_bulk	ess_tail	r_hat
mu	3.155	0.191	2.792	3.506	0.003	0.002	3867.0	2867.0	1.0
sigma	1.933	0.135	1.687	2.191	0.002	0.002	3927.0	3150.0	1.0

meanの値を見ると、それぞれ真の値に結構近い値が得られていますね。

以上が、本当に一番シンプルなPyMCの使い方の記事でした。

今後の記事ではもう少し細かい仕様の話や発展的な使い方、ArviZという専用の可視化ライブラリの話などを紹介していきたいと思います。

J-Quants APIのページング処理に対応する

久々にJ-Quants API の記事です。もう結構前の話(2023/06/16)の話ですが、J-Quants APIはデータ量の増加位に対応するためにページング処理というものが導入されました。
参考: お知らせ – J-Quants API の 過去のお知らせ部分見てください。

要するにAPIから取得できるデータの量が多い時に、全部のデータを一度では取得できず、一部分だけ取得できるって話ですね。

こちらについて利用方法を記事にしておきます。

ページング処理対応方法

詳しくはこちらをご参照ください。
参考: API共通の留意事項 – J-Quants API

レスポンスが帰ってきた時、結果にpagination_key が含まれていたらページング(ページネーション)が発生しており、そこで得られた結果は取得したかったデータの全量ではありません。
得られたpagination_keyの値を付与して再度リクエストすることで以降のデータを得ることができます。

サンプルコード参照してやってみましょう。
ちなみに、認証にidトークンが必要ですがその取得方法は僕の過去記事参照してください。
参考: J-Quants API の基本的な使い方
以下の記事では、 id_token って変数にすでにトークンが取得できているものとします。

import json
import requests
import pandas as pd


print(len(id_token))  # id_tokenは過去記事の方法ですでに取得してるとします。(文字数確認)
# 1107

# 特定の日付の4本値を取得する
date = "2024-03-15"
daily_quotes_url = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}"
headers = {"Authorization": f"Bearer {id_token}"}
daily_quotes_result = requests.get(daily_quotes_url, headers=headers)

# レスポンスに、pagination_key が含まれていることが確認できる。
print(daily_quotes_result.json().keys())
# dict_keys(['daily_quotes', 'pagination_key'])

pagination_key = daily_quotes_result.json()["pagination_key"]

# pagination_key も付与してもう一度リクエストする。
daily_quotes_url_2 = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}&pagination_key={pagination_key}"
daily_quotes_result_2 = requests.get(daily_quotes_url_2, headers=headers)

# 今度は、pagination_keyは含まれていない。
print(daily_quotes_result_2.json().keys())
# dict_keys(['daily_quotes'])

# それぞれデータが得られている。
len(daily_quotes_result.json()["daily_quotes"]),  len(daily_quotes_result_2.json()["daily_quotes"])
# (4030, 312)

# それぞれ配列型のデータなので + で連結できる。
# DataFrame化までついでに行った。
df = pd.DataFrame(daily_quotes_result.json()["daily_quotes"]
                  + daily_quotes_result_2.json()["daily_quotes"])

print(len(df))
# 4342

1回目のリクエストでは、本当は4342件得られるはずだったデータのうち、4030件しか取得できてなかったことがわかりますね。そして、pagination_keyを合わせて送信することで、続きを取得できています。

上記のサンプルコードはわかりやすさ優先のため、2回で全部取得できると決め打ちしていますが、実際は2回目のリクエストでもpagination_keyが戻ってくる可能性があります。

そのため、実際の運用ではドキュメントのコードのようにpagination_keyがなくなるまでループするような実装にすると良いでしょう。

# 特定の日付の4本値を取得する
date = "2024-03-15"
daily_quotes_url = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}"
headers = {"Authorization": f"Bearer {id_token}"}
daily_quotes_result = requests.get(daily_quotes_url, headers=headers)

# 1回目のレスポンスで得られたdata
data = daily_quotes_result.json()["daily_quotes"]

# pagination_keyが含まれている限りはループする。
while "pagination_key" in daily_quotes_result.json():
    pagination_key = daily_quotes_result.json()["pagination_key"]
    daily_quotes_url = f"https://api.jquants.com/v1/prices/daily_quotes?date={date}&pagination_key={pagination_key}"
    daily_quotes_result = requests.get(daily_quotes_url, headers=headers)
    # 得られたデータを連結する。
    data += daily_quotes_result.json()["daily_quotes"]


# データが揃っている。
print(len(data))
# 4342

これで、J-Quants APIのページング処理にも対応できました。

pandas.qcutでデータを分位数で離散化する

今回の記事ではpandasのqcutという関数を紹介します。
参考: pandas.qcut — pandas 2.2.1 ドキュメント

記事タイトルに書いていますが、これは分位数に基づいてデータを離散化する関数です。
実は以前、数値の区間で区切って離散化するpandas.cutというのを紹介したことがあるのですが、その仲間みたいなものですね。僕はつい最近までqcutを知りませんでしたが。
参考: pandasで数値データを区間ごとに区切って数える

cutでは数字の絶対値を基準に、0以上100未満、100以上200未満、みたいにデータを切り分けることができましたが、qcutでは分位数(パーセンタイル)を基準にデータを分けることができます。要するに、4つに分けるのであれば、25%以下、50%以下、75%以下、それより上、みたいにデータを区切り、各区切りには大体同じ件数のデータが分類されます。

cutだったら区間の幅が揃い、qcutだったら各区間に含まれるデータの件数が揃うというのが一番簡潔な説明ですね。

適当に乱数を使ってやってみましょう。ポアソン分布で200個ほどデータを作って、q_cutで5つのグループに分けてみます。

import pandas as pd
from scipy.stats import poisson  # テストデータ生成用


# λ=100のポアソン分布に従う乱数を200個生成
data = poisson(mu=100).rvs(size=200, random_state=0)
print(data[:10])  # 最初の10項表示
# [101 103  98  98 127 109 102  82  99  86]

# データと区切りたいグループの個数を指定して実行
out = pd.qcut(data, q=5)
# 各データがそれが含まれる区間お
print(out)
"""
[(97.0, 102.0], (102.0, 107.0], (97.0, 102.0], (97.0, 102.0], (107.0, 127.0], ..., (102.0, 107.0], (102.0, 107.0], (92.0, 97.0], (92.0, 97.0], (78.999, 92.0]]
Length: 200
Categories (5, interval[float64, right]): [(78.999, 92.0] < (92.0, 97.0] < (97.0, 102.0] < (102.0, 107.0] < (107.0, 127.0]]
"""

データの先頭の方と、あと、結果をprintして表示されたやつを上のコードに出しました。Categories として5つの区間が表示されていますが、「それぞれのデータがどの区間に含まれているのか」に変換されたものが得られていますね。例えば最初のデータは101ですが、これは区間(97, 102] に含まれます。

区間にラベルをつけることもできます。低い方からL1, L2, L3 みたいにつけていく場合はlabel引数にqで指定した数と同じ要素数の配列を渡して実現します。(今回文字列でサンプル作っていますが、数値をラベルにすることもできます。)

# ラベルを指定する
out = pd.qcut(data, q=5, labels=["L1", "L2", "L3", "L4", "L5"])
print(out)
"""
['L3', 'L4', 'L3', 'L3', 'L5', ..., 'L4', 'L4', 'L2', 'L2', 'L1']
Length: 200
Categories (5, object): ['L1' < 'L2' < 'L3' < 'L4' < 'L5']
"""

変換後のデータとして扱いやすそうな形で結果が得られました。

ただ、それぞれのラベルの区間がこれだとわからないですね。区間の情報を別途得る必要があるのでその場合はretbins引数にTrueを渡して、結果を受け取るときにもう一個変数を用意して受け取ることで、区切り位置の譲歩を得ることもできます。もちろん、labelsは使わずに、retbinsだけ指定することもできますよ。

# ラベルを指定する
out, bins = pd.qcut(data, q=5, labels=["L1", "L2", "L3", "L4", "L5"], retbins=True)
print(out)
"""
['L3', 'L4', 'L3', 'L3', 'L5', ..., 'L4', 'L4', 'L2', 'L2', 'L1']
Length: 200
Categories (5, object): ['L1' < 'L2' < 'L3' < 'L4' < 'L5']
"""
# 区切り位置の情報
print(bins)
# [ 79.  92.  97. 102. 107. 127.]

最後に注意です。qcutを使うと連続値のデータは大体同じ個数ずつに分けてくれることが多くそれが目的で使うことが多くなるのですが、今回の例のように整数値など離散な値しか取らない場合はそうでもなくなってきます。今回乱数で発生したデータはちょうど区切り位置の107が10個も混ざってた等々の事情で、ちょっとだけ偏りが出ています。実際に使う場合はこのあたりの結果もよく注意してみてください。

print(out.value_counts())
"""
L1    44
L2    39
L3    39
L4    41
L5    37
dtype: int64
"""

np.vectorizeで関数をベクトル化する

NumPyやScyPyの関数って非常に便利で、NumPy配列(要するにArray)を渡すと空気を読んでその渡したデータの各要素に関数を適用してNumPy配列で結果を返してくれたりします。

自分で定義した関数でもNumPyやSciPyの関数の組み合わせで作った関数であれば結構そのように動いてくれるのですが、文字列操作が入ったりif文による分岐等があると必ずしもそうはならず、スカラー値を受け取ってスカラー値を返すだけの関数になることがあります。

そのような関数を、手軽にベクトルか対応することができる方法があるのでこの記事で紹介します。

それが、記事タイトルのnp.vectorizeです。

ドキュメント: numpy.vectorize — NumPy v1.26 Manual

関数を渡すと戻り値で新しい関数オブジェクトが帰ってきてそれがベクトル対応(配列対応)しています。

基本的な使い方

数学関数だと特にArrayを渡すと元々期待通り動いたりするので、少々無理矢理な例ですが文字列操作の関数を作ってお見せします。これは数値を1個受けとって、その数値に、「回目」っていいう単位をつけて返すだけの関数です。普通に実験、そのまま配列渡してみる、ベクとライズして配列を渡してみる、の3パターンやってみました。

import numpy as np


# 数値に単位をつける関数を実装
def number_format(n):
    return f"{n}回目"


# 数値を渡すと想定通り動く
print(number_format(5))
# 5回目

# 配列を渡すと配列を一個の値とみなして文字列化して単位をつけてしまう。
print(number_format([1, 2, 3]))
# [1, 2, 3]回目

# ベクトル化した関数を作る
number_format_vec = np.vectorize(number_format)

# それに配列を渡すと配列の各要素に元の関数を適用してくれる。
print(number_format_vec([1, 2, 3]))
# ['1回目' '2回目' '3回目']

# Array型もタプルもいける
print(number_format_vec(np.array([1, 2, 3])))
# ['1回目' '2回目' '3回目']
print(number_format_vec((1, 2, 3)))
# ['1回目' '2回目' '3回目']

# もちろん、内包表記で同じことをすることは可能(ただし、この結果はlist)
print([number_format(n) for n in [1, 2, 3]])
# ['1回目', '2回目', '3回目']

ベクトル化した関数を1回しか使わないなら内包表記で済ましちゃっていいんじゃないかな、と思うのですが、何度も利用したい関数であればnp.vectorizeを使うと言う選択肢もあるのかな、と思います。

注意点

NumPyやSciPyで実装されている関数群って並列処理できる部分は並列処理するような賢い実装になっていることがありますが、この np.vectorize はそこまで気が利いたものではありません。どうやら単純にfor文で順次処理するようになるだけらしいので処理の高速化等の効果はありません。ドキュメントにも利便性のためのもので、パフォーマンスのため使うようなものではなく、for loop回してるだけだって書いてありますね。

そのため、本当に頻繁に大規模なベクトルを処理する関数なのであれば別の方法で対応させる必要があるでしょう。

もう一点、細かいですが戻り値がNumPyのarrayであることも注意が必要ですね。と言ってもこれは便利に感じることが多いですが。内包表記であればlistで結果が得られますがvectorizeするとlist渡してもlistではなくarrayで帰ってきます。

引数を複数受け取る関数の場合

この np.vectorize は引数を複数受け取る関数にも対応しています。ドキュメントのサンプルもa, b の2変数受け取っていますしね。一応その例も見ておきましょう。年と月の数値を受け取って何年何月、という文字列返す関数でやってみます。

def month_str(year, month):
    return (f"{year}年{month}月")


month_str_vec = np.vectorize(month_str)

# 元の関数はyear, monthは1個ずつしか値を受け取れない
print(month_str([2020, 2023, 2026], [1, 4, 7]))
# [2020, 2023, 2026]年[1, 4, 7]月

# ベクトル化すると複数ペアをまとめて処理できる。
print(month_str_vec([2020, 2023, 2026], [1, 4, 7]))
# ['2020年1月' '2023年4月' '2026年7月']

# 片方は配列で、片方はスカラーというパターンにも対応する
print(month_str_vec([2020, 2023, 2026], 1))
# ['2020年1月' '2023年1月' '2026年1月']

さいごに

以上が手軽に関数をベクトル化する方法でした。まぁ、内包表記もあればmapを使うやり方もあるのでこれが必須というわけではないのですがいい感じに動く関数を手軽に作る方法として頭の片隅に置いておくと使う場面はあるんじゃないかなと思います。

ちなみに、関数を定義した直後にベクトル化した関数で元の関数名を上書きしておくと、最初っからベクトル化した関数を宣言したのと同じように使えますよ。

def func(x):
    # 何かの処理


func = np.vectorize(func)
# 以降に呼び出されるfuncはベクトル対応した関数。

SciPyで重積分

もう結構古い記事なのですが、以前SciPyで定積分をやる方法を記事にしたことがあります。
参考: scipyで定積分

最近、2変数関数の積分をやる機会があったのでこの機会に重積分をSciPyで行う方法を紹介します。SciPyのintegrateモジュールには、重積分用の関数が複数あります。
dblquad (2変数関数の定積分)
tplquad (3変数関数の定積分)
nquad (一般のn変数の定積分)

dblquadの使い方

順番に説明していきます。まずは2重積分のdblquadです。関数の定義は次のようになっています。
scipy.integrate.dblquad(func, a, b, gfun, hfun, args=(), epsabs=1.49e-08, epsrel=1.49e-08)

必須なのは、積分対象のfunc, 外側の積分区間のa, b、そして内側の積分区間を示す、gfun, hfunです。

gfunとhfunは名前からわかる通り、定数ではなく関数です。これにより内側の積分の積分区間を変数にすることができます。つまり以下のような積分区間の積分ができます。
$$\int_0^1\int_0^y xy \,dxdy$$

例に挙げたのでこれを実装してみましょう。ちなみに解は$1/8=0.125$です。funcの定義は、内側の関数を第1引数にする必要があるので注意してください。

from scipy.integrate import dblquad


def f1(x, y):
    return x*y


def x0(y):
    return 0


def x1(y):
    return y


print(dblquad(f1, 0, 1, x0, x1))
# (0.125, 5.515032205777789e-15)

想定通りですね。積分結果と推定誤差が返ってくるのは1変数の積分と同様です。

内側の積分区間も定数から定数までだよ、要するに長方形領域で積分したいよ、って場合はgfun, hfun に定数を返す関数を返してください。

tplquadの使い方

続いて、3変数向けのtplquadです。これもdblquadとかなり似てる感じで使えます。積分変数が一個増えているので上限加減の指定がもう一個ある感じです。
scipy.integrate.tplquad(func, a, b, gfun, hfun, qfun, rfun, args=(), epsabs=1.49e-08, epsrel=1.49e-08)

たとえば次の積分をやってみましょう。

$$\int_0^1\int_0^z\int_0^{y+z} xyz \,dxdydz.$$

ちなみに答えは$17/144=0.11805555…$となるはずです。

引数の順番に注意が必要なので慎重にコーディングしてください。

from scipy.integrate import tplquad


def f2(x, y, z):
    return x*y*z


def x0(y, z):
    return 0


def x1(y, z):
    return y + z


def y0(z):
    return 0


def y1(z):
    return z


print(tplquad(f2, 0, 1, y0, y1,  x0, x1))
# (0.11805555555555557, 2.1916761217856673e-14)

バッチリですね。

nquadの使い方

最後に一般のn変数を積分できるnquadの使い方を紹介します。

引数の形式が先ほどの二つと少し違います。
scipy.integrate.nquad(func, ranges, args=None, opts=None, full_output=False)

funcにn変数関数を渡して、rangesに積分区間を渡すことになります。rangesは配列で、1変数目から順番に区間の下限上限の2値の配列を格納しておけば良いです。また、ここにも一応関数を使うことはできます。

これはシンプルな例で、定数関数1を超立方体区間で積分してみました。

from scipy.integrate import nquad


def f3(w, x, y, z):
    return 1

print(nquad(f3, [[-2, 2], [-2, 2], [-2, 2], [-2, 2]]))
# (256.0, 2.8421709430404007e-12)

$4^4=256$になりましたね。

ここで急にシンプルな例を出したのには事情がありまして、変数の数が多くなるとやはり積分は困難なようで、ちょっと複雑な例になると規定の反復回数をこなしても必要な精度に届かずWarningが出たりするケースが多々あります。

どうしても計算したい場合は limit パラメーター等をいじっての対応になりますのでドキュメントを参照しながら調整してみてください。
(僕も実運用で必要になったら改めて調査して紹介しようと思います。)

Pythonの関数から一部の引数を固定して新しい関数を作る

Pythonの多くのライブラリの様々な関数が非常に汎用的に使えるように作られているので多くの引数を受け取れるようになっています。しかし、そのほとんどの引数を固定して1変数関数として使いたいなぁと思うようなことがあります。PandasのDataFrameのapplyなど関数を引数として受け取る関数に渡す場合等ですね。
また、大量にある引数のほとんどを固定して一部だけ変えながら何度も実行する、といった場面も考えられます。

lambda式などを作ってラップした新しい関数を実装してもいいのですが、 functoolsという標準ライブラリにその専用のpartial というメソッドが用意されています。
参考: functools.partial(func/*args**keywords)

このpartialを使うと、引数の一部を固定した引数の少ない新しい関数を作ってくれます。

一個目の引数に元になる関数を渡し、2個目以降の引数に渡したものが、元の関数の固定引数として使われます。keyword引数で渡せばそのkeyword引数が固定されます。

一引数の固定の方は先頭から順番に固定されるので注意してください。つまり2番目以降の引数を固定したい場合はそれらはキーワード引数として指定する必要があります。

サンプル

引数を順番に表示するだけの単純な関数を作ってやってみましょう。

from functools import partial


# 3つの引数を表示するだけの関数
def sample_func(a, b, c):
    print("a=", a)
    print("b=", b)
    print("c=", c)


# テスト実行
sample_func(1, 2, 3)
"""
a= 1
b= 2
c= 3
"""

# a = 10, b = 20 を固定した新しい関数が作られる。
partial_f = partial(sample_func, 10, 20)


# 3個目の引数 c = 50だけ渡して実行できる。
partial_f(50)
"""
a= 10
b= 20
c= 50
"""

# キーワード引数で固定することもできる。
partial_f2 = partial(sample_func, a=100, c=200)

# b の値だけ渡して実行できる
partial_f2(b=-5)
"""
a= 100
b= -5
c= 200
"""

キーワード引数を固定した関数を、位置引数で使う場合は注意が必要です。
たとえば、次のようにaを固定して生成した関数に、残り2個の引数を位置引数で渡すと、aを2回渡した扱いになってエラーが起きます。

# aを固定
partial_f3 = partial(sample_func, a=1)

# bとcのつもりで残り2個の引数を渡すとエラー
try:
    partial_f3(2, 3)
except Exception as e:
    print(e)
# sample_func() got multiple values for argument 'a'

# bとcもキーワード引数で渡す。
partial_f3(b=2, c=3)
"""
a= 1
b= 2
c= 3
"""

まとめ

ほぼ小ネタのような内容でしたが、自作関数をベースに一部の振る舞いを固定した簡易的な関数を作るとか、apply等の1変数関数を受け取るメソッドに渡したいとかそういう場面で役に立つことがあるテクニックとしてpartialを紹介しました。

scipyのstats配下の各種メソッドであれば、それぞれがパラメーターを固定するfrozenメソッドを持ってるとか、引数が多いなら引数を辞書にまとめて**(アスタリスク2個)で展開すればいいとか、ラップした関数を自分で実装したらいいとか、代用手段も多いのですが、partialを使うとその辺の記述がシンプルになるので機会があれば使ってみてください。

SciPyでニュートン法を利用する

前回の記事の二分法に続いて、もう一つ求根アルゴリズムを紹介します。
参考: 二分法を用いて関数の根を求める

今回紹介するのはニュートン法です。これは微分可能な関数$f(x)$の根を求めることができるアルゴリズムです。
参考: ニュートン法 – Wikipedia

詳しい説明は上記のWikipediaにあるので、ざっくりと概要を説明します。

この方法の背景にあるのは、滑らかな関数をある点の近くだけ着目してみるとほぼ直線になり、接線で近似できるということをベースのアイデアにしています。

つまり、微分可能な関数$f$があって$f(x)=0$だとします。その根$x$の近くに点$x_0$を取ると、$x_0$の近くでは、$f$と$f$の接線ってかなり近いよね、それなら$f$の根と$f$の$x_0$における接線の根って近いよね、っていうのが基本的なアイデアです。

関数$f$の$f(x_0)$における接戦は次の式で書けます。

$$y=f'(x_0)(x-x_0)+f(x_0).$$

$f(x)=0$は解けない場合でも、この接線の根は容易に算出することができ、

$$x_0-\frac{f(x_0)}{f'(x_0)}$$

と求まります。

この値は元の$x_0$よりも真の根に近いことが期待され、これをもう一回$x_0$とおいて同じ操作を繰り返せば真の根にたどり着く、というのがニュートン法です。

Wikipediaから画像拝借しますが、図で見るとイメージしやすいですね。

注意しないといけないのは、初期値$x_0$は真の根$x$の十分近くに取らないといけない点です。十分近くを見れば関数をその接線で近侍できるよね、というのがアイデアの前提なので、根が近くになかったらその前提が崩れてしまいこのアルゴリズムは真の根に収束しなくなってしまいます。

ニュートン法のメリットとデメリット

先に紹介した二分法と比べて、ニュートン法のメリットデメリットを説明していきます。

1番のメリットは収束の速さです。二分法に比べてより少ない計算回数で効率的に会を探索することができます。

また、初期値として与える点が1点だけで良いというのもメリットです。二分法の場合は初期値は区間で設定する必要がありましたからね。

その一方で複数のデメリットもあります。実装していて一番不便に感じるのはその関数だけでなく微分も必要ということでしょうか。もちろん微分不可能な関数ではニュートン法は使えません。

また、初期値が真の解に十分近くない場合や、微分した値が$0$に近い場合、うまく収束せずにアルゴリズムが失敗してしまう、という点も大きなデメリットです。

SciPyによる実装

SciPyではscipy.optimizeというモジュールで実装されています。newtonという専用メソッドを使うか、root_scalarという汎用的なメソッドで(method=’newton’)を指定して使うことになります。二分法と同じですね。

参考:
scipy.optimize.newton — SciPy v1.12.0 Manual
root_scalar(method=’newton’) — SciPy v1.12.0 Manual

二分法の時と同じように、$\sin$関数の根$\pi$を探索させてみましょう。微分は$\cos$なのでこれを使います。

from scipy import optimize
import numpy as np


root1 = optimize.newton(np.sin, x0=3, fprime=np.cos)
print(root1)
# 3.141592653589793

root_result = optimize.root_scalar(np.sin, method="newton", x0=3, fprime=np.cos)
print(root_result)
"""
      converged: True
           flag: 'converged'
 function_calls: 6
     iterations: 3
           root: 3.141592653589793
"""

print(root_result.root)
# 3.141592653589793

簡単ですね。

注目するのは、iterationsの部分です。たった3回のイテレーションで収束していて、関数が実行されたのは、fとfの微分合わせて6回だけです。
二分法の時は39回もイテレーションが必要だったのと大違いです。そして実はこの例では解の精度もニュートン法の方が高くなっています。

ニュートン法が失敗する例

初期値が真の解の近くにないと失敗するという話がありましたのでそちらも見ておきます。

例えば、タンジェントの逆関数、$\arctan$で試してみましょう。(sin, cosは根が無限にあって、根から遠い実数を用意できないので関数を変えます。)

$\arctan(x)$の微分は$\frac{1}{1+x^2}$です。

やってみました。

def f(x):
    return np.arctan(x)


def fprime(x):
    return 1/(1+x**2)


# 初期値が1なら収束する
root_result_1 = optimize.root_scalar(np.sin, method="newton", x0=1, fprime=fprime)
print(root_result_1)
"""
      converged: True
           flag: 'converged'
 function_calls: 12
     iterations: 6
           root: 0.0
"""

# 初期値が2だと失敗し、結果のflagが'convergence error'になる。
root_result_2 = optimize.root_scalar(np.sin, method="newton", x0=2, fprime=fprime)
print(root_result_2)
"""
      converged: False
           flag: 'convergence error'
 function_calls: 100
     iterations: 50
           root: 1.854706857103781
"""


# optimize.newton の方だと例外が上がる。
try:
    optimize.newton(f, fprime=fprime, x0=2)
except Exception as e:
    print(e)
# Derivative was zero. Failed to converge after 10 iterations, value is -6.999943395317963e+168.

失敗した時の振る舞いがそれぞれ違うので、どちらのコードを使うかで注意深く扱う必要がありますね。optimize.root_scalarはコード自体は正常に終了しますがフラグが立ち、optimize.newtonの方は例外があがります。

ちなみに、エラーの中で出てくるvalue の値、 -6.999943395317963e+168 は次のように自分でニュートン法を実装しても同じ値が出て来ます。

x0 = 2  # 初期値
for i in range(12):
    x0 = x0 - f(x0)/fprime(x0)
    print(i+1, "回目: x0=", x0)

"""
1 回目: x0= -3.535743588970453
2 回目: x0= 13.950959086927496
3 回目: x0= -279.34406653361754
4 回目: x0= 122016.9989179547
5 回目: x0= -23386004197.933937
6 回目: x0= 8.590766671950415e+20
7 回目: x0= -1.1592676698907411e+42
8 回目: x0= 2.110995587611039e+84
9 回目: x0= -6.999943395317963e+168
10 回目: x0= inf
11 回目: x0= nan
12 回目: x0= nan
"""

絶対値が大きくなり続けていて全く収束に向かっていないのがわかりますね。

まとめ

元の関数だけではなく導関数も必要だったり、初期値の設定段階である程度解の目星をつけておかないといけないなどのデメリットはありますが、速度や精度の面で優秀でしかもロジックもわかりやすい手法なので、何か機会があればニュートン法の活用を検討してみてください。