SciPyの optimize.minimizeを使って関数が最小値を取る点を探す

最近よく使っている、scipyの最適化関数の一つであるminimizeについて、まだ記事を書いてなかったので紹介します。

公式ドキュメントはこちらです。
参考: minimize — SciPy v1.14.1 Manual

これはタイトルの通りで、数値を返す関数を渡すとその関数が最小値をとる引数を探してくれるものです。ちなみに、最大値になる引数を探すメソッドはないので最大値を探したかったら、その関数に-1をかけて符号を反転させた関数を用意してください。

サクッと一つやってみましょう。正解がわかりやすいよう、2次関数でも例にとりましょうか。次の関数を使います。

$$x^2-6x+5 = (x-3)^2 -4 $$

平方完成から分かる通り、$x=3$で最小ですね。

import numpy as np
from scipy.optimize import minimize

# 目的関数の定義
def sample_function(x):
    return x**2 - 6*x + 5

# 初期値
x0 = 0

# 最適化の実行
result = minimize(sample_function, x0, method='L-BFGS-B')

# 結果の表示
print("最適化の結果:", result)
print("最小値をとる点 x:", result.x)
print("最小値 f(x):", result.fun)
"""
最適化の結果:   message: CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL
  success: True
   status: 0
      fun: -3.9999999999999982
        x: [ 3.000e+00]
      nit: 2
      jac: [ 1.776e-07]
     nfev: 6
     njev: 3
 hess_inv: <1x1 LbfgsInvHessProduct with dtype=float64>
最小値をとる点 x: [3.00000004]
最小値 f(x): -3.9999999999999982
"""

上のサンプルコードのように、最初の引数で最適化したい関数、2個目の引数で初期値、そしてオプションで利用するアルゴリズムなどの各種設定を行います。

ここで注意しないといけないのは、渡した関数の第1引数だけが最適化の対象ということです。なので、元の関数が複数の引数をとる多変数関数の場合、最適化したい引数等を一個の配列にまとめて渡す関数でラップしてあげる必要があります。また、最適化対象外の値を固定する引数については、argsで固定します。

例えば、$f(x, y, a, b) = x^2 + ax + y^2 + by$ みたいな関数があって、a, bは固定したとき、これを最小にする $x, y$を求めたい場合次のようにします。

# 元の関数(オリジナルの目的関数)
def original_function(x, y, a, b):
    return x**2 + a*x + y**2 + b*y


# 最適化用にラップする関数
# 元の関数のx, y を xという配列で渡してx[0], x[1]として内部で使っている
def optimization_target_function(x, *params):
    return original_function(x[0], x[1], *params)


# 最適化の実行
x0 = [0, 0]
result = minimize(optimization_target_function, x0, args=(4, -6), method='L-BFGS-B')

# 結果の表示
print("最適化の結果:", result)
print("最小値をとる点 x:", result.x[0])
print("最小値をとる点 y:", result.x[1])
print("最小値 f(x, y):", result.fun)

"""
最適化の結果:   message: CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL
  success: True
   status: 0
      fun: -12.999999999999925
        x: [-2.000e+00  3.000e+00]
      nit: 2
      jac: [-1.776e-07 -7.105e-07]
     nfev: 9
     njev: 3
 hess_inv: <2x2 LbfgsInvHessProduct with dtype=float64>
最小値をとる点 x: -2.0000000804663682
最小値をとる点 y: 2.9999997404723753
最小値 f(x, y): -12.999999999999925
"""

最適化したい引数が何変数なのか?ってのを一見どこでも指定してないように見えて不安になるのですが、これは初期値の配列の要素数から自動的に判断してくれています。

結果は result.x に入っているのでここから自分で取得します。

本当に基本的な使い方は以上になります。

少し発展的な内容なのですが、最適化のアルゴリズムは何種類もある中から選べますが、中には導関数を利用するものもあります。このminimizeに導関数を渡さない場合は、数値微分でいい感じにやってくれるのですが、明示的に導関数を指定することも可能です。

やり方は簡単で、元の関数と同じ形で引数を受け取るように導関数を定義して、jac引数に渡すだけです。1変数の場合は簡単すぎるので2変数の例を出しますが、2変数の場合は第1引数による微分と第2引数による微分の2つがあるので、その結果を配列で返す関数を定義してそれを渡します。

例えば、$f(x,y)=x^2+y^2+4x+6y+13$ みたいなのを考えてみましょう。$x, y$での微分はそれぞれ、$2x+4$, $2y+6$ですね。

# 目的関数
def objective_function(vars):
    """
    2変数関数 f(x, y) = x^2 + y^2 + 4x + 6y + 13
    vars: [x, y]
    """
    x, y = vars
    return x**2 + y**2 + 4*x + 6*y + 13

# 勾配(偏微分値)
def gradient_function(vars):
    """
    勾配 ∇f(x, y) = [∂f/∂x, ∂f/∂y]
    vars: [x, y]
    """
    x, y = vars
    grad_x = 2 * x + 4
    grad_y = 2 * y + 6
    return np.array([grad_x, grad_y])  # 勾配ベクトルを返す

# 初期値
x0 = [0, 0]

# 最適化の実行 (勾配を指定)
result = minimize(objective_function, x0, jac=gradient_function, method='L-BFGS-B')

# 結果の表示
print("最適化の結果:", result)
print("最小値をとる点 [x, y]:", result.x)
print("最小値 f(x, y):", result.fun)

"""
最適化の結果:   message: CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL
  success: True
   status: 0
      fun: 0.0
        x: [-2.000e+00 -3.000e+00]
      nit: 2
      jac: [ 0.000e+00  0.000e+00]
     nfev: 3
     njev: 3
 hess_inv: <2x2 LbfgsInvHessProduct with dtype=float64>
最小値をとる点 [x, y]: [-2. -3.]
最小値 f(x, y): 0.0
"""

関数が簡単で計算量も小さい場合は数値微分でも特に問題ないのですが、そうでない場合は明示的に導関数を渡すことでリソースを節約することもできます。

長くなってきたので今回の記事はここまでにしようと思います。境界条件とうの制約をつける方法とかを今後紹介したいですね。

最後にminimizeを使う時の注意点ですが、これ、極小値が複数あるような関数の場合、最小値ではなく極小値の一つを返してくることがあります。結果が初期値に依存してしまうのです。複雑な形の関数を最適化する場合は注意してください。

boto3を使ってEC2のCPUクレジットを取得する

EC2を使っていて、動作が重いなーって感じた時にCPUクレジットの状況を確認することがあります。AWSのコンソールにログインしたら見れはするのですが2要素認証も使っていてやや面倒なので、boto3で確認する方法を調べました。
(※ 個人利用しているAWSの話です。普段jupyterlabで触っていて何かしらの作業をインタラクティブに行っています。業務におけるWebサービスの運用等であればそのインスタンスにログインするのもコンソールに入るのも手間は変わらないと思います。)

僕がクラウドウォッチに詳しくなくて、あまり詳細な説明ができないというのもあるのですが、いろいろ試した結果、以下のコードで取得できることが確認できたのでそのまま掲載します。

import boto3
import requests
import datetime


# EC2メタデータからインスタンスIDを取得
def get_instance_id():
    url = "http://169.254.169.254/latest/meta-data/instance-id"
    response = requests.get(url)
    return response.text

# インスタンスIDを取得
instance_id = get_instance_id()

# CloudWatch クライアントを作成
cloudwatch = boto3.client('cloudwatch')

# 現在の UTC 時刻を取得
end_time = datetime.datetime.utcnow()
# 過去 5 分間のデータを取得
start_time = end_time - datetime.timedelta(minutes=5)

# CPUCreditBalance メトリクスを取得
response = cloudwatch.get_metric_statistics(
    Namespace='AWS/EC2',
    MetricName='CPUCreditBalance',
    Dimensions=[
        {
            'Name': 'InstanceId',
            'Value': instance_id
        },
    ],
    StartTime=start_time,
    EndTime=end_time,
    Period=300,  # 5 分間隔
    Statistics=['Average']
)

# データポイントを取得
data_points = response['Datapoints']
if data_points:
    # 最新のデータポイントを取得
    cpu_credit_balance = data_points[0]['Average']
    print(f'インスタンスID: {instance_id}')
    print(f'CPUクレジット残高: {cpu_credit_balance}')
else:
    print('CPUクレジット残高のデータが見つかりませんでした。')

これで現在のCPUクレジット残高が取得できます。
CPUクレジッド残高はインスタンスサイズによって初期値や上限値が異なるので、別途最大値を確認しておくと参考になると思います。

参考: バーストパフォーマンスインスタンスに関する主要な概念 – Amazon Elastic Compute Cloud

EC2の内部からそのインスタンスのidなどのメタデータを取得する

EC2で作業をしていて今使っているそのインスタンスのidをコードで取得したい、った場面があったのでその方法を紹介します。

最初はてっきり、環境変数か何かに入っててそこから使えるだろう、とか思っていたのですが予想に反する場所にメタデータの格納場所がありました。

インスタンスidなどのメタデータにはインスタンス内部から特定のURLにアクセスするとで取得できます。

この辺りが参考になります。
EC2 インスタンスのインスタンスメタデータにアクセスする – Amazon Elastic Compute Cloud
インスタンスメタデータを使用して EC2 インスタンスを管理する – Amazon Elastic Compute Cloud

アクセスするURLは以下です。
http://169.254.169.254/latest/meta-data/
これは情報を取得したいインスタンス内部からしかアクセスできないので気をつけてください。同じAWSアカウントであっても別のインスタンスの情報は取れないので取得したい場合はboto3などの別の手段を使う必要があります。

実際にcurlでアクセスすると、次のような情報が返ってきます。

!curl http://169.254.169.254/latest/meta-data/
ami-id
ami-launch-index
ami-manifest-path
block-device-mapping/
events/
hostname
iam/
identity-credentials/
instance-action
instance-id
instance-life-cycle
instance-type
local-hostname
local-ipv4
mac
metrics/
network/
placement/
profile
public-hostname
public-ipv4
public-keys/
reservation-id
security-groups
services/
system

これらは取得可能な情報の一覧ですね。
/で終わってるやつはまだ配下に階層が続くのでそれを手がかりに深掘りしていくことになりますが、instance-id は直下にあるようです。先ほどのURLに取得したい情報のpathを追加して実行すると欲しい情報が取れます。

!curl http://169.254.169.254/latest/meta-data/instance-id
i-*****************

Pythonでやる場合は次のようにして簡単に取得できます。

import requests


metadata_url = "http://169.254.169.254/latest/meta-data/instance-id"
print(requests.get(metadata_url).text)

これでそのコードが動いているインスタンスのidが取得できました。
boto3等を使ったコードで、実行しているインスタンス自身に対して何かやりたいのでそのインスタンスのidの指定が必要、って場合に同一のコードを使いまわせるので便利になります。

Pythonコードでimportに失敗するライブラリのバージョンを確認する

とある特殊な環境でPythonを書いていて、いくつかのライブラリがimportに失敗するという事態に遭遇しました。自分のローカルPC上であれば、pip freeze とかしてライブラリのバージョンを調べて原因を調査するのですが、その環境ではOSコマンドが打てず、同様の調査が不可能でした。importさえできれば {ライブラリ名}.__version__ みたいなプロパティから取得することもできたのですがimport自体が失敗するとあって調査に苦戦していました。

ところがどうやらimportを行わずにライブラリのメタデータにアクセスする方法がちゃんとあるようだったのでこの記事にまとめておきます。(正直、普通の環境であればpipで調べれれば済む話なので、ほとんどの人にとっては不要な知識だと思います。)

pkg_resources を使う方法

importせずにライブラリのバージョンを取得する方法の1個目は pkg_resources を使うものです。

次の例は、 arviz というライブラリのバージョンを調べたものです。

from pkg_resources import get_distribution


try:
    version = get_distribution("arviz").version
    print(f"version: {version}")
except pkg_resources.DistributionNotFound:
    print("ライブラリが見つかりません")

# version: 0.16.1

僕は上記の方法で一旦解決しました。

ただ、これはsetuptoolsに依存したライブラリなのですが、その公式ドキュメントを見ると、もう廃止されたからimportlib.metadataを使えと書いてあるのですよね。

ということで合わせてそちらを紹介します。

importlib を使う方法

importlib はPython3.8から標準ライブラリに含まれたライブラリです。

これはインストール済みのライブラリのメタデータにアクセスする機能を持っています。標準ライブラリになったわけなので、最近のPythonであればおそらくこちらを使うのが適切なんだと思います。

参考: importlib.metadata — パッケージメタデータへのアクセス — Python 3.13.0 ドキュメント

サンプルコードでもバージョンを取得していますね。それにならってやってみましょう。

import importlib.metadata


try:
    version = importlib.metadata.version("arviz")
    print(f"version: {version}")
except importlib.metadata.PackageNotFoundError:
    print("ライブラリが見つかりません")

# version: 0.16.1

同じような結果が得られました。

余談ですが、公式ドキュメントのサンプルコードでは、
from importlib.metadata import version
として versionという関数をインポートしています。僕も最初それに倣ってやったのですが、versionを結果の変数名で使いたいな、と思ったので上記のインポート方法にしました。
ただ、これはこれでイマイチな気もします。

pandasのwhereとmask

前回の記事で、np.whereという関数の紹介をしたのですが、pandasにも同名のwhereっていうメソッドがあるので紹介します。また、非常に似た挙動のmaskというメソッドもありますので合わせて書きます。

pandasのwhereとmaskはDataFrameやSeriesが持っているメソッドです。
参考:
pandas.DataFrame.where — pandas 2.2.3 documentation
pandas.DataFrame.mask — pandas 2.2.3 documentation

挙動はnumpyのwhereと似ている部分があり、条件に応じて要素を置き換えます。ただし、使い方が少しだけ異なっており、np.where()のようにnumpy自体が持っていた関数ではなくDataFrameやSeriesなどのメソッドなので元の値が利用される分、np.whereより引数が一つ少なくなります。

1個目の引数に条件、2個目の引数に置き換える値(省略すればNoneになります。また関数を指定することもできます。)を入れて使用します。

そしてこの条件の扱いがwhereとmaskで異なります。
whereは条件がFalseの場合に値を置き換えmaskは条件がTrueの場合に値を置き換えます。

例えば、0~9の値を並べたデータフレームで、3の倍数かどうかという条件で置き換え対象を負の数にするような書き方をすると、whereは3の倍数以外の数がマイナスになり、maskの方は3の倍数がマイナスになります。

import pandas as pd
import numpy as np


df = pd.DataFrame(np.array(range(10)).reshape(5, 2))
print(df)
"""
   0  1
0  0  1
1  2  3
2  4  5
3  6  7
4  8  9
"""

# 3の倍数が条件を満たすのでそのまま残り、それ以外がマイナスになる。
print(df.where(df%3==0, -df))
"""
   0  1
0  0 -1
1 -2  3
2 -4 -5
3  6 -7
4 -8  9
"""

# 3の倍数が条件を満たすのでマイナスになる。
print(df.mask(df%3==0, -df))
"""
   0  1
0  0  1
1  2 -3
2  4  5
3 -6  7
4  8 -9
"""

上記の例は、2個目の引数に元のデータと同じ形のデータフレームが渡されていますが、2個目の引数は定数を渡すこともできるし、関数を渡すこともできます。

例えば奇数を定数-1に置き換えたり、奇数を3倍して1足すようなメソッドは次のようになるでしょう。

print(df.mask(df%2==0, -1))
"""
   0  1
0 -1  1
1 -1  3
2 -1  5
3 -1  7
4 -1  9
"""

print(df.mask(df%2==1, lambda x: 3*x+1))
"""
   0   1
0  0   4
1  2  10
2  4  16
3  6  22
4  8  28
"""

「こういう値に対してこうしたい」っていう日本語の説明に対して直感的に書けるのはmaskの方ですね。fillna()の汎用版みたいなイメージで使いやすいです。

whereの方は、「こういう条件を満たす値はそのままでいいんだ、そうでは無いのを置き換えたい」っていうイメージでしょうか。

np.whereで効率的に値を出し分ける

今回もnumpyのテクニックの紹介です。np.whereというメソッドを解説します。

参考: numpy.where — NumPy v2.0 Manual

これは何かというと、第1引数にTrue/Falseで評価できるデータの配列を渡すとその評価に応じてTrueなら第2引数、Falseなら第3引数の値を返す、というものです。

第2, 第3引数に渡すのは第1引数に渡した配列と同じ長さ(多次元なら全て同じ)でも良いし、定数であったり、ブロードキャストすれば同じ形にできるものなら何でも良いです。

一番シンプルな例としては、条件を満たすかどうかでそれぞれ異なる定数を返すようなものでしょうか。

import numpy as np


scores = np.array([45, 85, 72, 50, 90])
results = np.where(scores >= 60, '合格', '不合格')
print(results)
# ['不合格' '合格' '合格' '不合格' '合格']

説明いらないと思いますが、60点以上なら合格、と判定するメソッドですね。

上記の例のように、事前にTrue/False の配列を作っておくのではなく、何かしらの条件式を代1引数に渡すような使い方になると思います。条件に応じて何かしらの演算を行いたい場合は、第2, 第3引数に計算式を入れて結果を渡すような形になります。例えば、偶数なら1/2, 奇数なら 3倍して1を足す、みたいな処理をするならこうです。

np.where(scores%2 == 0, scores/2, 3*scores+1)
# array([136., 256.,  36.,  25.,  45.])

1次元配列の場合は、内包表記でもほぼ同じことができるのでありがたみが薄いですが、np.whereは多次元配列で便利なことがあります。(単純に、内包表記の方が不便になるだけという見方もできますが。)

自分が最近使った例としては、欠損値がある行列Aと別の行列Bがあった時に、欠損値以外は元の行列Aの値、欠損してる部分はBの値、で埋めたいというものでした。

これが次のようにして簡単に行えます。

A = np.array(
        [[1, 2, 3,], [np.nan, 5, 6], [7, np.nan, 9]]
    )
B = np.array(
        [[11, 12, 13,], [14, 15, 16], [17, 18, 19]]
    )

print(np.where(np.isnan(A), B, A))
"""
[[ 1.  2.  3.]
 [14.  5.  6.]
 [ 7. 18.  9.]]
"""

コードがすっきり書けること以外にもベクトル処理が行えることによるパフォーマンス面のメリットなど、利点があるので機会があれば使ってみてください。

Nanを含むnumpy配列のデータを専用メソッドで手軽に集計する

numpyのちょっとしたテクニックの話です。僕は最近まで知らなかったのですが、numpyには np.nansum など nan + 集計関数名 という命名規則のメソッド群が用意されています。これの紹介をします。

前提として、 numpy配列の値を合計したり平均を取ったりする時、データ中にnanがあると結果もnanになります。pandasのSeriesの場合と挙動が違うのですね。例えば以下のような感じです。(Seriesと挙動違うよという話は以前どこかの記事で書いた覚えがあります)

import numpy as np
import pandas as pd


# nanを含むデータを作る
ary = np.array([1, 1, 2, 3, np.nan, 8])
print(ary)
# [ 1.  1.  2.  3. nan  8.]

# 合計するとnanになる
print(ary.sum())
# nan

# 平均も同様
print(ary.mean())
# nan

# Series はnanを無視してくれる
print(pd.Series(ary).sum())
# 15.0
print(pd.Series(ary).mean())
# 3.0

欠損値の存在に気づくきっかけになったりしてありがたいこともありますし、仕様としてどうあるべきかを考えたらnullの伝播が実装されているこの作りが正しいと思えるのですが、この挙動が不便なことが多いのも事実です。

僕はこういう時大体Seriesに変換してしまって集計していました。

ただ、実は numpyにもNanに対応したメソッドがちゃんとあり、それが冒頭に書いたnansumです。maxにはnanmax, stdにはnanstd のように多くのメソッドに対して実装されています。

dir()で探すと一覧額作成できます。

for m in dir(np):
    if m.startswith("nan"):  # メソッド名がnanで始まるか
        if m.replace("nan", "") in dir(np):  # nanの部分を除外した場合に同じ名前のメソッドがあるか
            print(m)
"""
nanargmax
nanargmin
nancumprod
nancumsum
nanmax
nanmean
nanmedian
nanmin
nanpercentile
nanprod
nanquantile
nanstd
nansum
nanvar
"""

これらを使うと、エラーが起きずにnanを無視して無視して残りの要素について集計してくれます。

print(np.nansum(ary))
# 15.0
print(np.nanmean(ary))
# 3

1次元配列の場合は内包表記での対応とか色々やり方もあるのですが多次元になってくると面倒だし集計のために補完するのも面倒なのでありがたいですね。使い方がnp.nansum(ary)であって、ary.nansum() では無いので注意してください。

もう一点、 np.nan ではなく、Noneを含めてるとこれは数値の欠損値では無いので相わらずエラーになります。ここも注意です。

ary2 = np.array([1, 1, 2, 3, None, 8])

try:
    np.nansum(ary2)

except Exception as e:
    print(e)
# unsupported operand type(s) for +: 'int' and 'NoneType'

SQLでNULL同士を等しいとみなして効率的に比較を行う方法

今回はSQLの小ネタです。

初心者がミスりがちな話なのですが、SQLでは通常 NULLとNULLは等しいとは見做されません。

例えば、`select null = null ` など実行すると、Trueではなくnull が返ってきます。

しかし、場合によってはnull同士は等しいものとして判定したいことがあり、その場合は何かしら一工夫する必要があります。両方nullという場合だけTrue返せばいいということもなく、当然値が入っているなら値が入ってるもの同士は通常の比較処理を行い、0や空文字も含めてnull以外の値とnullは違うものとみなし、その上でnull同士は等しいという判定をやるケースですね。

null以外の値が全部0以上の数値であることがわかっているなら coalesce でnullを-1に変換してから比較するとか、文字列方の列で、かつ値が入っている部分にnullって文字列がないことが確認できているなら null っていう文字列で埋めて比較するといった手段が取れます。

しかし、この列に絶対無いと言い切れる値が存在しない場合、補完して比較する方法は使えません。こういった場合に、スマートにnullを考慮した比較を行える方法をMySQLとSnowflakeの両環境について紹介します。

MySQLの場合

MySQLの場合、 <=> という演算子がサポートされています。これは、「NULLセーフイコール演算子」といいます。

これを使うと、 `selct null <=> null` がTrueになります。

ドキュメント: MySQL :: MySQL 8.0 Reference Manual :: 14.4.2 Comparison Functions and Operators

Snowflakeの場合

Snowflakeの場合、上記の<=>演算子はサポートされていませんが、`is disticnt from` という演算子が実装されています。

ドキュメント: IS [ NOT ] DISTINCT FROM | Snowflake Documentation

ちょっと長いので、<=>のほうが便利だよなぁとは思うのですが、標準SQLに準拠した書き方はこちらの方です。(最初、Snwoflake専用の方言かと勘違いしていました)

そこそこの頻度で使う機会がある構文だと思うので、頭の片隅にでも置いといてください。

Pythonで非負値行列分解

これもずっと前に記事にしたような気がしていたのですが、最近ちょっと仕事で使おうと思って自分のブログで検索したら書いてなかったことがわかったのでこの機会に記事にします。

行列を複数の行列の積に分解する方法は複数ありますが、その中でも非負値行列分解というsy方があります。

これは、元の行列の全ての要素が0以上(0を許すので正値ではなく非負値といいます)の場合に使える分解方法です。一般的には低rankな二つの行列で、それぞれの行列の全ての要素も0以上の行列の積に分解します。

数式で言うと、$V \approx W \times H$ ですね。 $V$が$M\times N$行列だとした場合、分解後のrankを$K$とすると、$W$は$M\times K$行列、$H$は$K\times N$行列になります。

ここで$V$は元の行列(データ行列)で、行数はサンプル数、列数は特徴数です。
$W$は基底行列になり各行は元データの異なる要素の「基底」や「パターン」を示します。
そして、$H$は係数行列で、各列は基底行列の要素がどの程度元のデータに寄与しているかを示します。

この分解によって、元のデータの背後に潜在する低次元のパターンや構造を捉えることができます。

分解後の要素が全て非負なので、分解結果を加法的に扱えるのが利点です。負の値が混ざってるとある値が大きかった時にそれに掛け算される係数が正なのか負なのか考慮して解釈しないといけないですがここが絶対0以上と保証されていると評価されやすいですね。

また、次元削減やデータ量の削減にも有宇高です。この用途で使われるため、$K$は小さな値が採用されやすいです。

この非負値行列分解はが画像処理とか音声解析、推薦システムの中で活用されていますね。

さて、scikit-learnを使って実際にやってみましょう。乱数で生成した行列でやってみますね。

ドキュメントはこちらです。
参考: NMF — scikit-learn 1.5.2 documentation

import numpy as np
from sklearn.decomposition import NMF

# サンプルデータ生成
# 乱数で5x4の非負行列を作成
np.random.seed(0)
V = np.random.randint(0, 6, size=(5, 4))

# NMFを適用、ランク(分解する際の次元)を2に設定
model = NMF(n_components=2, init='random', random_state=0)
W = model.fit_transform(V)  # 基底行列W
H = model.components_       # 係数行列H

# 分解結果を表示
print("元の行列 V:")
print(V)
"""
元の行列 V:
[[4 5 0 3]
 [3 3 1 3]
 [5 2 4 0]
 [0 4 2 1]
 [0 1 5 1]] 
 """

print("\n基底行列 W:")
print(W)
"""
基底行列 W:
[[1.97084006 0.        ]
 [1.35899323 0.33537722]
 [0.90516396 1.61014842]
 [0.76977917 0.6670942 ]
 [0.         1.7698575 ]]
"""

print("\n係数行列 H:")
print(H)
"""
係数行列 H:
[[2.09286365 2.49350723 0.         1.50627845]
 [0.63319723 0.41601049 2.69948881 0.        ]]
"""

# 元の行列の近似値を計算
V_approx = np.dot(W, H)

print("\n近似された行列 V_approx:")
print(V_approx)
"""
近似された行列 V_approx:
[[4.12469952 4.91430393 0.         2.96863391]
 [3.05654746 3.52817989 0.90534704 2.04702222]
 [2.91392627 2.92687151 4.34657765 1.36342897]
 [2.03344504 2.19696811 1.80081332 1.15950177]
 [1.12066887 0.73627928 4.7777105  0.        ]]
"""

見ての通りでちょっとクセがありますね。
基底行列の方はtransformで元のデータを変換して取得し、係数行列の方がcomponents_に入っています。

さて、近似した行列ですが、元の行列が純粋にただの乱数で生成されたもので、通常のデータであれば背景にあるはずの隠れた構造とかを一切持たないものだった割に結構近い値で近似できてるのではないでしょうか。

久しぶりに使うと、どっちの行列がどっちだっけとか、転地必要だったっけ、とか色々迷うのですが慣れれば手軽に扱えるので機会があれば試してみてください。

Pythonでマルチプロセス処理

前回の記事がマルチスレッドだったので今回はマルチプロセスを紹介します。

Pythonにおけるマルチプロセスの1番のメリットはGILの制約を回避できることでしょうね。

ただ、先に書いておきますが、この記事で書いている方法はJupyter notebookのセルに直接書くと正常に動作せずエラーになることがあります。.pyファイルを作成してそこに記入して使うようにしましょう。

マルチプロセスを実装するには、最近はconcurrent.futuresのProcessPoolExecutorを使います。
参考: concurrent.futures — 並列タスク実行 — Python 3.12.6 ドキュメント

ドキュメントのサンプルコードを参考に動かしてみましょう!
例として取り上げられているのは素数判定ですね。Pythonで処理が完結するのですが、GIL制約のためマルチスレッドだと高速化の恩恵が受けられないものです。

from concurrent.futures import ProcessPoolExecutor
import math


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419
    ]

def is_prime(n):
    print(f"整数 {n} を素数判定します")
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d は素数か: %s' % (number, prime))

if __name__ == '__main__':
    main()

# 以下実行結果
"""
整数 112272535095293 を素数判定します
整数 112582705942171 を素数判定します
整数 112272535095293 を素数判定します
整数 115280095190773 を素数判定します
整数 115797848077099 を素数判定します
整数 1099726899285419 を素数判定します
112272535095293 は素数か: True
112582705942171 は素数か: True
112272535095293 は素数か: True
115280095190773 は素数か: True
115797848077099 は素数か: True
1099726899285419 は素数か: False
"""

最初にそれぞれの値の素数判定が始まってる旨のメッセージが出てその後に結果が順番に出てきたので、並行して処理されているのが確認できました。

is_prime(n)が並行して実行している処理です。

ProcessPoolExecutor() でエクゼキューターを作成して、今回は submit()ではなく、mapで適用していますね。map()には第一引数で並列実行したい関数を渡し、次の引数でその関数に渡す引数のリストを渡します。

submit と map はどちらもProcessPoolExecutor や ThreadPoolExecutor の継承元の抽象クラスのExecutor に実装されているメソッドなので、実はマルチプロセスとマルチスレッドのどちらでも両方使うことができます。お好みの方で書いたらよさそうです。

細かい挙動は異なっていて、前回のsubmit()ではas_completed()を使って終わった順番に処理を取り出していましたが、map()を使う場合は、処理自体は並列して同時に行われて順不同で完了しますが、結果の取り出しは渡した引数の順番になります。