pandasのSeriesのlocには関数も渡せる

何となくpandasのドキュメントを眺めていたら見つけた小ネタの紹介です。
この記事を読むと、pandasのSeriesをもっと手軽に値で絞り込める様になります。
参考: pandas.Series.loc — pandas 2.0.3 documentation

pandasのlocといえば、自分としてはDataFrameで使うことが多いプロパティですが、もちろんSeriesにも実装されています。そして、これを使うとindexの値に従って要素を絞り込むことができます。

今回見つけたのは、このlocにcallable、要するにメソッドが渡せるってことです。渡したメソッドにSeriesの値が渡され、その結果がTrueのものに絞り込まれます。

こんな感じで使えます。例えば値が3以上の要素だけに絞り込む例です。

import pandas as pd

# Seriesを作成
s = pd.Series([1, 2, 3, 4, 5], index=['a', 'b', 'c', 'd', 'e'])

# 値が3以上の要素だけを絞り込む
s[lambda x: x>=3]
"""
c    3
d    4
e    5
dtype: int64
"""

上記の例だとメリットがわかりにくいかと思います。と言うのもpandasに慣れてる人だったら次の様に書けることがわかってると思いますし、タイプ数も少なく可読性も高いからです。

s[s>=3]
"""
c    3
d    4
e    5
dtype: int64
"""

単純な大小比較や一致不一致ではなく、もっと複雑な判定を行うメソッドを適用するときなどは、このlocにメソッドを渡すやり方が便利に使えますね。

僕が個人的に気に入ったのは、この絞り込みが特定の変数に格納されていないSeriesについても使えると言うことです。
「特定の変数に格納されていないSeries」ってのは、例えばDataFrameのvalues_counts()メソッドなどを実行した結果として得られる値などの形で取得されます。

例えば、dfというDataFrameがあるとして、そのcolumn_name列の値の出現回数を数え、そのうち出現回数が10以上のものだけを取り出すとしましょう。

通常であれば、value_counts()の結果を何かの変数に格納して実行するか、もしくは2回value_counts()を実行する非効率に我慢するかして次の様に実装します。

# カウント結果を一度変数に格納する場合
count_sr = df["column_name"].value_counts()
count_sr[count_sr >= 10]

# value_countsを2回実行する場合
df["column_name"].value_counts()[df["column_name"].value_counts() >= 10]

これが、locにメソッドが渡せることを知っていると次の様に書けます。

# 無駄な変数も定義しないし、value_counts()の実行も1回でいい書き方
df["column_name"].value_counts()[lambda x: x>=10]

個人的に、「ある列の要素ごとに数を数えて、一定件数以上データがあったものだけ残す処理」ってのをやることが頻繁にあり、Seriesのfiterメソッドが値ではなくindexにしか使えないことを日々残念に思っていた自分にとってはめっちゃ嬉しいテクニックだったので紹介しました。

ちなみに、DataFrameのlocも同じ様にcallableを渡せます。こっちはあまり使い道が思いつかないですね。
参考: pandas.DataFrame.loc — pandas 2.0.3 documentation

Jupyter notebook ファイルをモジュールとして import する

タイトルの通りで、Jupyter notebookファイル (.ipynb ファイル)をモジュールとしてインポートする方法を紹介します。

僕は普段のコーディングをJupyterでやっているので、自分で使う汎用的なモジュールを作るときも一度Jupyterで作って.pyファイルに移行するという手順で作っていました。一回完成させて仕舞えばそれでいいのですが、作りかけのものを別のプログラムで使いたいとか、それに限らず、あるnotebookで定義した関数を別のnotebookで使いたい、ってときにこれまでセルの中身をコピペしてたのですが、実はnotebookファイルのままimportできるという噂を聞き試しました。

機械学習の前処理とか何度も同じコードを書いてるので使いまわせる様になると便利そうです。

使うのはこちらの importnb というライブラリです。
参考: importnb · PyPI

とりあえず試してみましょう。
importされる側のnotebookファイルを以下の内容で作ります。importできることの確認だけなので、適当なメソッドと定数が定義されているだけのファイルです。

ファイル名は sub_file.ipynb としました。

def foo():
    return "bar"


hoge = "hoge"

では、このファイルをimportしてみましょう。

importnb sub_file とかで済むと簡単なのですが、やや独特な記法でimportします。先ほどのsub_file.ipynb は閉じて新しいnotebookファイルで以下の様に書きます。

from importnb import imports


with imports("ipynb"):
    import sub_file

これでimportできました。
メソッド foo や 変数 hoge が使えます。

print(sub_file.foo())
# bar

print(sub_file.hoge)
# hoge

少し検証してみたのですが、このライブラリはimportされるnotebookの中身によっては注意して使う必要があります。というのも、notebookを import するときにimport されるnotebookの全てのセルが実行されるのです。なので、何かファイルを書き出す処理があればimportした時点でファイルを書き出ししますし、重い処理があれば時間かかりますし、外部APIを叩く処理が入っていたら外部APIを叩きます。

宣言されているメソッドやクラス、変数だけを持ってきて使えると言うわけでは無いのでimportするnotebookは慎重に選びましょう。

というか、その確認作業をするのであれば .py ファイルに書き出すとか必要なセルだけコピペして持ってくるといった対応をする方が早いことが多く、この importnb を使う場面ってかなり限られるなぁと言うのが自分の所感です。

頻繁に使っうMeCabを使った前処理とかワードクラウドクラウド作成とか汎用的なSQLとか自分が頻繁に使うメソッドや定数をまとめた神notebook集を用意したりするとまた活用の幅も出てくるかもしれませんね。

もう一点、検証時に気付いた注意点があります。これ、notebookのセルを全て実行するので、その中に一つでもエラーになるセルがあったらimportを失敗します。

そのエラーになるセルより先に実行されたセルの中身だけimportされるのではなく、何もimportされない結果になります。これも注意しましょう。

Excel VBA で J-Quants APIを実行する

自分はもう10年以上使っているのですが意外と知られていない技術として、Excel VBAではHTTPアクセスを用いてWebサイトの情報取得やWeb APIの実行ができます。もしかしたらニーズがあるかもしれない技術なので紹介します。

APIの例として J-Quants API を利用しますが他のAPIでも同様に使えると思います。J-Quants APIを選定したのは利用手順の中でPOSTやGETやヘッダーの設定等いろいろ技術が必要で網羅的な紹介ができるからです。

このブログでは普段は暗黙のうちにOSがMacであることを前提としていますが、この記事に限ってはWindows前提です。MacのExcelでは動作しないと思います。

APIの利用方法自体はPython版の記事があるのでこちらをご参照ください。照らし合わせながら見ると、Excel VBA の XMLHTTP60 オブジェクトの使い方が分かってくると思います。

参考: J-Quants API の基本的な使い方

参照設定

以下の二つを参照設定しておいてください。XMLのほうがhttpアクセスに必要です。正規表現のほうは返ってきたJSONから必要な部分を取得するのに使います。VBAはJSONの扱いが不便なので、何か事情が無ければPython等の他の言語をお勧めします。

  • Microsoft XML, v6.0
  • Microsoft VBScript Regular Expressions 5.5

リフレッシュトークンの取得関数

リフレッシュトークンを取得する関数のコードは以下のようになります。

Public Function get_refresh_token(email As String, passoword As String) As String
    Dim objXMLHTTP As New XMLHTTP60
    Dim re As New RegExp
    Dim mc As MatchCollection
    Dim account_data As String
    Dim auth_user_url As String
    
    account_data = "{""mailaddress"": """ & email & """,""password"": """ & passoword & """}"
    auth_user_url = "https://api.jquants.com/v1/token/auth_user"
    
    Call objXMLHTTP.Open("POST", auth_user_url, False)
    Call objXMLHTTP.send(account_data)
    Do While objXMLHTTP.readyState <> 4
                DoEvents
    Loop
    
    ' Rehresh Tokenを取り出す正規表現
    re.Pattern = "refreshToken"": ""([^""]+)"""
    
    Set mc = re.Execute(objXMLHTTP.responseText)
    get_refresh_token = mc.Item(0).SubMatches(0)
    
End Function

12行目から15行目までが、APIにデータをPOSTして結果を待っている部分です。メソッド(POST)、URLをopenで指定して、sendするときにPOSTするデータを渡しています。この構文を覚えておくと大抵のAPIは使えます。GETメソッドの時はPOSTするデータはないのでsendの引数は空でよいです。

戻ってくるデータはJSONの文字列なので、正規表現で取り出してます。

idトークンの取得関数

リフレッシュトークンが取得出来たら次はidトークンです。これはリフレッシュトークンを組クエリパラメーターでPOSTします。

Public Function get_id_token(refresh_token As String) As String
    Dim objXMLHTTP As New XMLHTTP60
    Dim re As New RegExp
    Dim mc As MatchCollection
    Dim auth_refresh_url  As String

    auth_refresh_url = "https://api.jquants.com/v1/token/auth_refresh?refreshtoken=" & refresh_token
    
    Call objXMLHTTP.Open("POST", auth_refresh_url, False)
    Call objXMLHTTP.send
    Do While objXMLHTTP.readyState <> 4
                DoEvents
    Loop
    
    
    ' id Tokenを取り出す正規表現
    re.Pattern = "idToken"": ""([^""]+)"""
    
    Set mc = re.Execute(objXMLHTTP.responseText)
    get_id_token = mc.Item(0).SubMatches(0)
    

ほとんど同じですね。

メインのAPIを実行する関数

idトークンが取得出来たら目当てのAPIを取得する関数を実行します。とりあえず時系列データを取ってみましょうか。

JSONで各日の4本値データが返ってくるので、1日分ずつ取得して Sheet1 のセルに張り付ける処理にしました。この時点では、まだ1日分のデータがJSON形式になっているので、Excel や VBAで利用するにはもう一段階パースする必要がありますが、ここまでできればあとは手間だけの問題でしょう。

先ほどまでのTokenの取得と違って、リクエストのヘッダーを設定しないといけないのでその処理が入っています。

Public Sub get_price(id_token As String, code As String)
    Dim objXMLHTTP As New XMLHTTP60
    Dim re As New RegExp
    Dim mc As MatchCollection
    Dim daily_quotes_url As String
    Dim i As Integer

    ' daily_quotes_urlを構築
    daily_quotes_url = "https://api.jquants.com/v1/prices/daily_quotes?code=" & code

    Dim from_ As String
    Dim to_ As String
    Dim headers As Object
    Dim daily_quotes_result As Object
    Dim daily_quotes_df As Object
    
    Call objXMLHTTP.Open("GET", daily_quotes_url, False)
    Call objXMLHTTP.setRequestHeader("Authorization", "Bearer " & id_token)
    Call objXMLHTTP.send
    Do While objXMLHTTP.readyState <> 4
                DoEvents
    Loop
    
    ' 1日分のデータにマッチする正規表現
    re.Pattern = "{[^{}]*Date[^{}]*}"
    re.Global = True
    
    Set mc = re.Execute(objXMLHTTP.responseText)
    ' セルに出力
    For i = 0 To mc.Count - 1
        Sheet1.Cells(i + 1, 1) = mc.Item(i)
    Next i
End Sub

各関数と処理を実行する

一通り関数を作りましたので、次のプロシージャを使って呼び出しましょう。

Sub main()
    Dim refresh_token As String
    Dim id_token As String
    
    refresh_token = get_refresh_token("{メールアドレス}", "{パスワード}")
    id_token = get_id_token(refresh_token)
    Call get_price(id_token, "{証券コード}")
    
End Sub

これで動作するはずです。

Pythonを覚えて以来、この種の処理はほとんど全部Pythonでやるようになりましたが、まだまだデータ加工でExcelの出番が発生することはあり、Excel VBA でデータ取得から一貫して行えると便利な場面もあると思います。

とはいえ、通常はVBAはJSONの扱いが不便すぎるので、Pythonでデータ取得スクリプト書いた方が早かったりもするのですがPythonが使えない環境では重宝するでしょう。

Jupyterのnotebookファイルをコマンドラインでクリアする

JupyterのノートブックをGitで管理する場合、出力をクリアしてコミットすることが多いと思います。もちろん、exampleファイルなどの場合は出力結果の図などが付いた状況で保存したいということもあるとは思いますが。

それ以外にも、notebookファイルの数が大量になってくるとグラフやワードクラウドなどの出力を含む一つ一つのファイルサイズの大きさがディスク容量を圧迫するということもあるでしょう。そういった場合、最初は出力結果も残しておきたいけど1~2年経ったら中身クリアしてディスク節約したいなってこともあります。

この様な場合に、一回一回notebookを起動して出力をクリアして保存し直すというのはかなり手間です。

そこで、コマンドラインで実行する方法を紹介します。

利用するのは jupyter nbconvert です。
以前、notebookをコマンドラインで実行する記事でも使いましたね。
参考: Jupyter notebookのファイルをコマンドラインで実行する

ドキュメントを見ても、該当の記述が見つからないのですが、jupyter nbvonvertには –clear-output というオプションがあり、これを使うと出力をクリアできます。

ヘルプを見るとその中には記載があります。

$ jupyter nbconvert --help
# 該当部分を抜粋
--clear-output
    Clear output of current file and save in place,
            overwriting the existing notebook.
    Equivalent to: [--NbConvertApp.use_output_suffix=False --NbConvertApp.export_format=notebook --FilesWriter.build_directory= --ClearOutputPreprocessor.enabled=True]

使い方は簡単で、あとはnotebookファイル名を指定するだけです。

$ jupyter nbconvert --clear-output sample.ipynb

これで、notebookがクリアされ、未実行の状態になって上書き保存されます。

もし、ディスク容量の節約が目的であれば、この時点では思ったほど容量が節約できていないということもあるかもしれません。それは大抵、隠しディレクトリの .ipynb_checkpoints というのが生成されているせいなのでこれを丸ごと消しておきましょう。(実行時のバックアップなので、このディレクトリは消しても実害ありません。)

NetworkXのグラフをpyvisのグラフに変換して可視化する

pyvisはグラフを手軽に可視化できますし、pyvis自体のメソッドでノードやエッジを追加してグラフを構築することもできるので基本的な処理はこれだけで完結させることももちろんできます。しかし、グラフデータの分析をしていると、各種アルゴリズムが充実しているNetworkXでいろんな分析を行って、それを最後にpyvisで可視化したい、ということはよくある話です。

こういう場合、pyvisのネットワークオブジェクトが持っている、from_nxってメソッドが使えます。
参考: Documentation — pyvis 0.1.3.1 documentation

この記事の主題は「from_nxが便利だよ」で終わりなのですが、それだけではあんまりなので細かい話をいろいろ書いていきます。

まず、基本的な使い方ですが、from_nxは pyvisモジュールから直接呼び出せるメソッドではなく、pyvisのpyvis.network.Network オブジェクトに実装されているメソッドなので、まずそのインスタンスを生成します。以前の記事でも書いていますが、可視化するときのキャンパスサイズとか背景色などを指定して生成するやつですね。

具体的に適当なネットワークでやってみると次の様になります。

import networkx as nx
from pyvis.network import Network


# サンプルとして、NetworkXのグラフを生成
nx_graph = nx.Graph()
nx_graph.add_node("a")
nx_graph.add_node("b")
nx_graph.add_node("c")
nx_graph.add_edge("a", "b")
nx_graph.add_edge("b", "c")
nx_graph.add_edge("c", "a")


# ネットワークのインスタンス生成
network = Network(
    height="500px",
    width="500px",
    notebook=True,
    bgcolor='#ffffff',
    directed=False, 
)

# pyvisのネットワークに、NetworkXのグラフのノードやエッジ情報を取り込む。
network.from_nx(nx_graph)
# 可視化
network.show("sample.html")

結果は省略しますが、これで、a,b,cの3個のノードを持ったネットワークが可視化されます。

NetworkXで設定されていなかったがpyvisで可視化するときに必要な種類の情報はデフォルト値で補完されています。

# NetworkXのノードの情報
print(dict(nx_graph.nodes))
# {'a': {'size': 10}, 'b': {'size': 10}, 'c': {'size': 10}}

# pyvisに取り込んだときに、colorやshape、sizeのデフォルト値が設定されている。
print(network.nodes)
"""
[{'color': '#97c2fc', 'size': 10, 'id': 'a', 'label': 'a', 'shape': 'dot'},
 {'color': '#97c2fc', 'size': 10, 'id': 'b', 'label': 'b', 'shape': 'dot'},
 {'color': '#97c2fc', 'size': 10, 'id': 'c', 'label': 'c', 'shape': 'dot'}]
"""

# NetworkXのエッジの情報
print(dict(nx_graph.edges))
# {('a', 'b'): {'width': 1}, ('a', 'c'): {'width': 1}, ('b', 'c'): {'width': 1}}

# エッジはほぼそのまま取り込まれている。
print(network.edges)
# [{'width': 1, 'from': 'a', 'to': 'b'}, {'width': 1, 'from': 'a', 'to': 'c'}, {'width': 1, 'from': 'b', 'to': 'c'}]

ノードのサイズやエッジの太さは、デフォルト値をそれぞれ、default_node_size, default_edge_weight という引数で指定することもできるので、有効に使っていきましょう。例えばWebサイトのページ遷移データのネットワーク等で、NetworkX時点ではPVなどの大きい値をsizeとして計算していたら、それを可視化時のサイズとして使ってしまうとえらいことになります。

また、node_size_transf , edge_weight_transf という引数で、関数を渡しておくことで、それぞれの値を変換することもできます。元々の値が非常に大きい、または小さい場合にこれを使って補正することができますね。
例えば、
node_size_transf = (lambda x: x/10) とすると、ノードのサイズを1/10にできます。

ここで注意というか、version 3.2時点の pyvisにはバグがあって、エッジを持たない孤立ノードには node_size_transf が適用されません。

該当部分のソースコードがこちらです。

        if len(edges) > 0:
            for e in edges:
                if 'size' not in nodes[e[0]].keys():
                    nodes[e[0]]['size']=default_node_size
                nodes[e[0]]['size']=int(node_size_transf(nodes[e[0]]['size']))
                if 'size' not in nodes[e[1]].keys():
                    nodes[e[1]]['size']=default_node_size
                nodes[e[1]]['size']=int(node_size_transf(nodes[e[1]]['size']))
                self.add_node(e[0], **nodes[e[0]])
                self.add_node(e[1], **nodes[e[1]])

                # if user does not pass a 'weight' argument
                if "value" not in e[2] or "width" not in e[2]:
                    if edge_scaling:
                        width_type = 'value'
                    else:
                        width_type = 'width'
                    if "weight" not in e[2].keys():
                        e[2]["weight"] = default_edge_weight
                    e[2][width_type] = edge_weight_transf(e[2]["weight"])
                    # replace provided weight value and pass to 'value' or 'width'
                    e[2][width_type] = e[2].pop("weight")
                self.add_edge(e[0], e[1], **e[2])

        for node in nx.isolates(nx_graph):
            if 'size' not in nodes[node].keys():
                nodes[node]['size'] = default_node_size
            self.add_node(node, **nodes[node])

エッジが存在するノードの情報を取り込むときは、node_size_transfしてるのに、その後の孤立ノードの取り込みでは元のsizeとデフォルトノードサイズしか考慮してませんね。

これは将来のバージョンで修正されると思うのですが、こういうバグもあるので、サイズを変換したい場合はnode_size_transfではなく、自分で元のデータを修正してform_nxに渡した方が良いでしょう。

さらに、便利な機能なのですがノードのサイズやエッジの太さ以外の属性については、一通り全部コピーしてくれます。これを使って、可視化するときに設定したい情報などをNetowrkXのグラフオブジェクトの時点で設定しておくことも可能です。これはもちろん、pyvisのネットワークに変換してから付与してももちろん大丈夫なのですが。

ただ、例えばノードのクラスタリング結果を可視化時の色に反映させたい等の、何かしらのアルゴリズムの結果を可視化に使いたい場合は、NetworkXの時点で設定する方がやりやすいことが多いです。ただ、可視化時点でしか必要のない情報をNetworkXのオブジェクトに付与していくことに抵抗がある人もいるかもしれないので好みの問題だと思います。

基本的な話なのですが、以下の様にして属性を付与していけます。ノードを1つ、赤い三角形にしてみたり、エッジの一つを黒く塗ってラベルつけたりしています。

# 事前にグラフにいろいろ属性を設定できる。
nx_graph.nodes["a"]["color"] = "red"
nx_graph.nodes["a"]["shape"] = "triangle"

nx_graph.edges[("b", "c")]["color"] = "black"
nx_graph.edges[("b", "c")]["label"] = "hogehoge"

# 以降の Networkオブジェクトを作って from_nxするところは同じ。

以上が from_nx の説明です。とても便利なのでぜひ使ってみてください。

逆に、pyvisのNetworkをNetworkXのグラフに変換するメソッドは無いのかな、とも思ったのですが、専用のものはなさそうですね。まぁ、それぞれのライブラリの用途を考えれば必要もなさそうですし、どうしてもやりたければノードとエッジの情報をそれぞれ取り出してNetworkXのグラフを構築すればいいだけの話なのでそんなに難しくもなさそうです。

NetworkXでグラフが平面グラフかどうか判定する

諸事情ありグラフが平面グラフかどうか手軽に判定する方法が必要になったので、その方法を調べました。(正確には、僕が調べたかったのは平面的グラフかどうかですが。)

平面グラフの定義についてはWikipediaがわかりやすいです。
参考: 平面グラフ – Wikipedia

簡単にいうと、グラフを2次元の図に可視化したときに、辺が交差しない様に書いたものが平面グラフです。そして、グラフによっては描き方によって辺が交差したりしなかったりするのですが、平面グラフと同型なグラフを平面的グラフと呼びます。
どうにか頑張れば辺が交差しない様に頂点を配置できるグラフが平面的グラフ、どうやってもどこかで交差してしまうグラフが平面的で無いグラフ、と考えた方がわかりやすいでしょう。

これをどうやって判定しようかなと考えていたのですが、networkxに専用のメソッドが用意されていました。
check_planarity
is_planar

is_planar の方がシンプルで、NetworkXのグラフオブジェクトを渡すとそれが平面的だったかどうかでTrue/Falseを返してくれます。
check_planarity の方は、平面的だったかどうかの情報だけでなく、埋め込みの情報も取得できます。これは、ノードをどの様に配置したら平面的になるかを示すものです。

is_planar の方が結果がシンプルなので速度面で早いとか何かメリットあるのかなと思っていたのですが、NetworkXのソースコードを見ると、内部でcheck_planarityを呼び出して、一個目の戻り値だけを返すという作りだったのでそういうメリットはなさそうです。

とりあえず、結果がシンプルなis_planarの方を使ってみます。

import networkx as nx


# 頂点4個の完全グラフは平面的
K4 = nx.complete_graph(4)
print(nx.is_planar(K4))
# True

# 頂点5個の完全グラフは平面的では無い
K5 = nx.complete_graph(5)
print(nx.is_planar(K5))
# False

簡単ですね。

check_planarityの方を使う場合は、戻り値として判定結果と埋め込み情報が返ってくるので、それぞれ個別に受け取ります。

is_planar, P = nx.check_planarity(K4)

print(is_planar)
# True
print(P)
# PlanarEmbedding with 4 nodes and 12 edges
# これは、ノードをキーにして辞書の様にして使うと中身を見れる。
print(P[0])
# {1: {'cw': 3, 'ccw': 2}, 2: {'cw': 1, 'ccw': 3}, 3: {'cw': 2, 'ccw': 1}}
# 上記の結果で、ノード0に、1, 2, 3と繋がるエッジがあることがわかる。
print(P[0][1])
# {'cw': 3, 'ccw': 2}
# これは、ノード0に他のノードがどの様な順番で繋がっているかを示し、
# ノード1の時計回りの隣が3,反時計回りの隣が2であることを示す。

注意しないといけないのは、平面的グラフだからといって描写したら平面グラフで描かれるとは限らないという点です。というか、平面グラフで描かれる方が稀です。さっきの例で挙げた頂点4個の完全フラフでさえ、普通に力学モデルで配置したらエッジが交差します。

そのため、NetworkXでは平面グラフ描写用のメソッドも持っています。
参考: planar_layout — NetworkX 3.1 documentation

spring_layoutと比較してみましょう。なぜか、with_labels引数のデフォルト値が違うのでTrueつけないとノードのidを表示してくれません。

import matplotlib.pyplot as plt


fig = plt.figure(figsize=(12, 6), facecolor="w")
ax = fig.add_subplot(121, title="spring_layout")
ax.axis("off")
nx.draw_networkx(K4, ax=ax)

ax = fig.add_subplot(122, title="planar_layout")
nx.draw_planar(K4, ax=ax, with_labels=True)

出てくる図がこちら。

draw_planar を使わなくても、他のアルゴリズム同様にlayout系のメソッドで座標を取得してそれで描写することもできます。
参考: planar_layout — NetworkX 3.1 documentation

pos = nx.layout.planar_layout(K4)
print(pos)
"""
{0: array([-1.   , -0.375]),
 1: array([ 1.   , -0.375]),
 2: array([0.   , 0.125]),
 3: array([0.   , 0.625])}
"""

nx.draw_networkx(K4, pos=pos)
# 結果略

Pythonですでに宣言されている変数名や関数名などを取得、確認する

ある変数がすでに宣言されているかどうかで処理を分けるにはどうしたら良いか調べたので記事にしておきます。また、変数以外にも組み込み関数名の一覧の確認方法とか、あるオブジェクトが持ってるプロパティの取得方法とか似てる話題をまとめておきます。

コードを雑に書いてると、if文の制御の中で変数を定義することがあります。そうなるとその以降の処理でその変数が宣言されているかどうかが不明になりますね。例えば、どこからかデータの取得を試みて、成功したらpandasのDataFrameにして、dfとかそれらしい名前の変数に格納するけど、取得失敗したらdf変数が宣言されていない状態になるみたいなケースです。

ここから紹介する内容を否定する様なことを先に書いておきますが、この様な状況では、df = pd.DataFrame() みたいに空っぽのデータフレームか何か作って確実に変数が宣言されている状態にして、その後データが取れたらdfを置き換えて、以降はlen(df)が0か1以上かで処理を分けるみたいな回避策をとった方が確実にバグが少なく可読性の高いコードが書けそうです。

せっかく調べたので記事は書きますが、豆知識くらいに思って実際は別の回避策を検討するのがおすすめです。

それではやっていきましょう。結論として、Pythonの組み込み関数の中に、宣言されている変数の一覧(シンボルテーブル)を取得するメソッドがあります。グローバルレベルで結果を返してくれるものと、関数内等で使えばローカル変数だけ返してくれるもの(モジュールレベルで使うとグローバルと同じ結果)の二つがあります。

参考: 組み込み関数 — Python 3.10.11 ドキュメント の globals()とlocals()を参照。

イメージを掴むには使ってみると早いです。辞書型で結果を返してくれます。

$ python
>>> print(globals())
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <class '_frozen_importlib.BuiltinImporter'>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>}
# 変数を一つ宣言してみる。
>>> foo = 4
# 最後に含まれている。
>>> print(globals())
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <class '_frozen_importlib.BuiltinImporter'>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, 'foo': 4}

# この様にして 変数 foo がすでに宣言されていることがわかる。
>>> "foo" in globals()
True

# locals() はスコープ内のローカル変数だけを持っている。
>>> def dummy_func():
...     func_var = 1
...     print(locals())
...
>>> dummy_func()
{'func_var': 1}

# スコープの外では参照できないので、globals()の結果には含まれていない。
>>> "func_var" in globals()
False

Jupyter等で長い長い作業を行ってたら、新しい変数を使うときにその時点で使ってないかの確認に使ったりとかできるかなぁとも思ったのですが、正直これを使わずにすむ様な回避策を探った方が良いと思います。僕自身、6年以上Python書いてますが、これまでglobals()やlocals()が必須な状況にはなった事ありませんし。

さて、以上で自分で宣言した変数やメソッド名の一覧(元々存在している__name__など含む)の取得方法がわかりました。

ここから先は元の主題から外れますが興味があったので調べた内容のメモです。

Pythonに限らずプログラミング言語では、あらかじめ予約語として抑えられていて使えない単語が複数あります。ifとかforとかfromとかですね。これらの名前は先ほどのglobals()の結果では出てきませんでした。

これらの予約語については、確認するための専用ライブラリが標準で用意されています。
参考: keyword — Python キーワードチェック — Python 3.11.3 ドキュメント

キーワードと、3個だけですがソフトキーワドの2種類あります。

import keyword


print(keyword.kwlist)
"""
['False', 'None', 'True', 'and', 'as', 'assert', 'async', 'await',
 'break', 'class', 'continue', 'def', 'del', 'elif', 'else', 'except',
 'finally', 'for', 'from', 'global', 'if', 'import', 'in', 'is',
 'lambda', 'nonlocal', 'not', 'or', 'pass', 'raise', 'return', 'try',
 'while', 'with', 'yield']
"""
print(keyword.softkwlist)
"""
['_', 'case', 'match']
"""

意外と少ないですね。

今の時点で、 sum や len など普段よく使っている組み込み関数たちが登場してないので、これらの情報がどこかで取得できないかも調べました。結果わかったのは、__builtins__ ってモジュールの配下に定義されているってことです。dirメソッドで確認できます。組み込みエラーとかもここにあるんですね。

dir(__builtins__)
['ArithmeticError',
 'AssertionError',
 'AttributeError',
 'BaseException',
 'BaseExceptionGroup',
# 中略
 'abs',
 'aiter',
 'all',
 'anext',
 'any',
 'ascii',
 'bin',
 'bool',
# 中略
 'enumerate',
 'eval',
 'exec',
 'execfile',
 'filter',
 'float',
 'format',
# 中略
 'str',
 'sum',
 'super',
 'tuple',
 'type',
 'vars',
 'zip']

sumとかabsとかstrとかお馴染みさんたちがいましたね。

ここで使いましたが、dir()は引数で渡したオブジェクトやモジュールが持っている属性やメソッドを羅列してくれる組み込み関数です。これもメソッド探しで使うことがあります。
参考: dir([object])

__builtins__ は省略してもメソッドにアクセスできるので通常は使うことはないし、これを使わなきゃいけない様な状況も作るべきでは無いと思いますが、無理やり活用するとしたら組み込み変数名を上書きしちゃったときでも元の機能が呼び出せます。

# 元々は足し算
sum([1, 2, 3])
# 6

# 予約後と違って組み込み変数は上書き出てきてしまう。
sum = sum([1, 2, 3])
# これでsumの中身が数値6になったので、ただのsumは関数として使えなくなった。
print(sum)
# 6

# __builtins__.sum は元通り足し算として機能する
__builtins__.sum([1, 2, 3])
# 6

まぁ、常識的に考えてこんな使い方をしなくてすむ様にコードを書くべきだと思いますね。和の変数名をsumにしたり、最大値の変数名をmaxにしたりして組み込み関数を上書きしてしまった経験は初心者時代にありますが。

boto3のAPIエラー発生時のリトライ回数の設定を変更する

先日、boto3でAmazon Comprehendを使っていたら、大量のデータを処理する中で次のエラーが頻発する様になってしまいました。

ClientError: An error occurred (ThrottlingException) when calling the BatchDetectSentiment operation (reached max retries: 4): Rate exceeded

短時間に実行しすぎてレートの上限に引っかかり、リトライも規定回数失敗してしまった様です。

Comprehend の使い方自体は過去の記事で書いてます。
参考: Amazon Comprehend でテキストのセンチメント分析

軽く紹介しておくとこんな感じですね。

import boto3


comprehend = boto3.client("comprehend")
comprehend_result = comprehend.batch_detect_sentiment(
        TextList=text_list,  # ここで判定したいテキストを渡す。
        LanguageCode="ja"
    )

応急処置的な対応としては、エラーをキャッチして自分でリトライを実装するという手もあります。
参考: 失敗しやすい処理にリトライをスクラッチで実装する

ただ、boto3はどうやら標準機能でリトライ設定の回数を設定できる様なのです。batch_detect_sentiment メソッドのドキュメントをいくら読んでもそれらしい引数がないのでできないと思ってました。
この設定は、その一歩手前、クライアントインスタンスを生成する時点で設定しておく必要があります。

ドキュメントはこちら。
参考: Retries – Boto3 1.26.142 documentation

ドキュメントでは ec2 のAPIに適用していますが、comprehendでも同じ様に使えます。以下の様にするとリトライ回数の上限を10回に上げられる様です。

import boto3
from botocore.config import Config

config = Config(
   retries = {
      'max_attempts': 10,
      'mode': 'standard'
   }
)

comprehend = boto3.client('comprehend', config=config)

mode は、 legacy, standard, adaptive の3種類があります。とりあえず、standardを選んだら良いのではないでしょうか。対応しているエラーがlegacyより多く、以下のエラーに対してリトライしてくれます。

# Transient errors/exceptions
RequestTimeout
RequestTimeoutException
PriorRequestNotComplete
ConnectionError
HTTPClientError

# Service-side throttling/limit errors and exceptions
Throttling
ThrottlingException
ThrottledException
RequestThrottledException
TooManyRequestsException
ProvisionedThroughputExceededException
TransactionInProgressException
RequestLimitExceeded
BandwidthLimitExceeded
LimitExceededException
RequestThrottled
SlowDown
EC2ThrottledException

max_attempts の方が、リトライ回数の設定ですが、正直、何回に設定したら成功する様になるのかは状況による部分が多く、僕もまだ良い策定方法がわかっていません。(というより、これまでこんなに失敗することがなかった)。一応、responseのメタデータの中に、RetryAttemptsって項目があって、何回目の再実行で成功したかとか見れるのですが、ほとんど成功するので大体これ0なんですよね。ここを監視してたら参考指標になるのかもしれませんが、これを監視できる実装にするのは面倒です。基本的には成功するものですから。

もし、上に列挙したエラーのどれかが発生してboto3の実行が止まってしまうよ、という状況が発生したら、リトライ回数の設定変更を検討に入れてみてください。

ただ、リトライというのはあくまでも同じ処理を繰り返すだけなので、百発百中で失敗する様な処理をリトライしても無意味です。ほとんど確実に成功するのに超低確率で失敗する事象が起きてしまう場合に、その失敗確率をもっと下げるという用途でのみ有効です。

Louvain法のmodularityの式について検証してみる

前回に引き続き、Louvain法の話です。前回はPythonで使う方法を紹介しましたが、今回はこの手法が利用しているモジュラリティ(modularity)という指標について検証していきます。そもそもなぜこの式でグラフのノードのクラスタを評価できるのかってのを確認しましょう。
(ちなみに、モジュラリティを最大化するアルゴリズムについては今回の記事では取り上げません。あくまでもモジュラリティの定義について見ていきます。)

前回の記事でも書いてるのですが今回の記事では主役なので定義式を再掲します。

$$Q=\frac{1}{2m}\sum_{ij}\left[A_{ij}\ -\ \frac{k_ik_j}{2m} \right]\delta(c_i, c_j).$$

$A_{ij}$: ノード$i$, ノード$j$間のエッジの重み。
$k_i, k_j$: ノード$i$, ノード$j$それぞれに接続されたエッジの重みの合計。
$m$: グラフの全てのエッジの重みの合計。
$c_i, c_j$: ノード$i$, ノード$j$それぞれが所属しているコミュニティ。
$\delta$: クロネッカーのデルタ。二つの引数が等しければ1でそれ以外は0を返す関数。

一見しただけだと、これでクラスタを評価できるってわかりにくいですよね。

順番に見ていきましょう。

まず、$\delta(c_i, c_j)$の部分です。これ、$c_i$と$c_j$が別のクラスタだったら値が$0$なので、$\sum$のそれらの項は消滅し、さらに同じクラスタだったら値が$1$になるので、それらの項はそのままの値で残ります。

なのでこの式はこうなります。

$$Q=\frac{1}{2m}\sum_{c_iとc_jが同じクラスタとなるi,j}\left[A_{ij}\ -\ \frac{k_ik_j}{2m} \right].$$

そして、$A_{ij}$はエッジの重みなので、和の中身の前半部分だけに着目すると、次の様に言えます。

$$\frac{1}{2m}\sum_{c_iとc_jが同じクラスタとなるi,j}A_{ij} = \frac{1}{2m}\{クラスタ内部のエッジの重みの総和\}$$

$m$はクラスタに関係なく、グラフによって定まってる定数なので無視して良いですから、この部分は確かに同じクラスタの中にたくさんエッジがあると値が大きくなり評価指標としての意味を持っている様に見えます。

ただし、この項しかないと、全部のノードを1個のクラスタとした場合に$Q$が最大となっしまうので、それに対応する必要があります。そこで登場するのが残りの項です。

もし、ノード$i$とノード$j$の間にエッジがなかったりあってもウェイトが非常に小さかったりすると、負の数を足すことによってモジュラリティが下がる様になってるのです。たとえば、エッジがない場合、$A_{ij}$が0ですから、$A_{ij}\ -\ \frac{k_ik_j}{2m} = -\frac{k_ik_j}{2m} $です。

そして、$k_i$、$k_j$はそれぞれのノードに接続されたエッジの重みの和ですから、重みの大きなエッジをたくさん持っているノード間についてはこの値の絶対値は大きくなります。

つまり、エッジがたくさんあるノード同士にも関わらずそれらのノード間にエッジがなかったら強くペナルティを課すよ、ってのがこの項の意味なのです。
分母の$2m$とかにもちゃんと意味があるのですが細かい話になるので割愛します。これらの細かい工夫により、-0.5から1の値を取る指標となっています。計算量とのバランスもとりながら非常によく考えられた指標だと思いました。

最後に、前回の記事でも少しだけ触れているのですが、この$\sum_{ij}$が取る和の$ij$部分について確認したのでその話書いておきます。

これは$i$と$j$がそれぞれ独立に全てのパターンを取ります。$(i, j)$と$(j, i)$は個別に足されるし、$(i, i)$も考慮されるってことですね。ライブラリの結果と、自分で実装した結果を並べてみて確認しました。とりあえず復習兼ねて前回のサンプルのグラフでやります。まずライブラリ利用。

import numpy as np
import networkx as nx
import community

# 前回の記事で生成したのと同じグラフを再現する。
# 30個のノード
node_list = list(range(30))

# 同じクラスタ内は0.5, 別クラスタは0.02の確率でエッジを生成する
edge_list = []
np.random.seed(1)  # シード固定
for i in range(30):
    for j in range(i+1, 30):
        if i // 10 == j // 10:
            if np.random.rand() < 0.5:
                edge_list.append((i, j))
        else:
            if np.random.rand() < 0.02:
                edge_list.append((i, j))

# グラフ生成
G = nx.Graph()
G.add_nodes_from(node_list)
G.add_edges_from(edge_list)

# コミュニティーの検出
partition = community.best_partition(G)

# ライブラリによるモジュラリティの計算
print(community.modularity(partition, G))
# 0.5316751700680272

同じ値を自分で計算して出してみましょう。エッジのウェイトが全部1なので、$m$はただのエッジの本数になるし、$k_i$, $k_j$はそれぞれのノードの次数に等しいことに注意してください。

Q = 0
m = G.size()
# iと jは全部の組み合わせをとる。
for i in range(len(G)):
    for j in range(len(G)):
        if partition[i] == partition[j]:
            # ノードi と ノードjの間にエッジがあったら1(ウェイト)を足す
            if (i, j) in G.edges:
                Q += 1 
            
            # Σの後半部分
            Q -= G.degree(i) * G.degree(j) / (2*m)
# 全体を2mで割る
Q/=2*m
print(Q)
# 0.5316751700680273

一致してますね。

ここから余談ですが、Qは -0.5から 1の値を取りますが、どんなグラフについてもQが-0.5を取ったり1を取ったりする分割が存在するわけではなさそうです。上記のサンプルでも0.53程度に留まっていますしね。

-0.5 の方は、完全2部グラフを構築して、さらにその2部をそれぞれコミュニティとすることで、同じクラスタ内部には1本もエッジがないのに別クラスタのノード間には必ずエッジが存在するという状態にすると発生します。

また、逆にQが大きくなるのは非連結な多数の完全グラフからなるグラフをそれぞれの完全グラフを一つのクラスタとすると、非連結なグラフが増えるにつれて1に近づいていくのを確認できています。ただ、近づくだけで1になることはないんじゃないでしょうか。(証明はできていませんが。)

便利なのでなんとなく使っていた手法でしたが、改めて指標を検証し複数のグラフや分割について自分で計算してみたことで理解を深めることができました。

モジュラリティは定義中に記号多いですしぱっと見難しそうに見えるのですが、落ち着いて見れば四則演算だけで構成されている非常にシンプルな指標なので、興味があれば皆さんもいろいろ試して遊んでみてください。

Louvain法を用いてグラフのコミュニティーを検出する

昔、グラフ関係の記事を書いてた頃に書いてたと思ってたら、まだ書いてなかったので、グラフのコミュニティーを検出する方法を書きます。

グラフのコミュニティ検出というのは、グラフのノードを複数のグループに分け、同じグループ内のノード同士ではエッジが密で、異なるグループのノード間ではエッジが疎になる様にすることを言います。人の集まりを仲良しグループでいくつかの組に分けるのに似ていますね。この記事の最後にも結果の例の画像をつけていますのでそれをみていただくとイメージがつきやすいと思います。

複数のアルゴリズムが存在するのですが、僕が一番気に入っていてよく使っているのはLouvain法による方法なのでこれを紹介していきます。
参考: Louvain method – Wikipedia

このLouvain法では、グラフとコミュニティに対してモジュラリティ(modularity)という指標を定義し、このモジュラリティが最大になる様なコミュニティの分け方を探すことでグラフのノードを分割します。モジュラリティの定義式はWikipediaにもありますが以下の通りです。

$$Q=\frac{1}{2m}\sum_{ij}\left[A_{ij}\ -\ \frac{k_ik_j}{2m} \right]\delta(c_i, c_j).$$

ここで、各記号の意味は以下の通りです。
$A_{ij}$: ノード$i$, ノード$j$間のエッジの重み。
$k_i, k_j$: ノード$i$, ノード$j$それぞれに接続されたエッジの重みの合計。
$m$: グラフの全てのエッジの重みの合計。
$c_i, c_j$: ノード$i$, ノード$j$それぞれが所属しているコミュニティ。
$\delta$: クロネッカーのデルタ。二つの引数が等しければ1でそれ以外は0を返す関数。

元論文を読めていないのですが、ライブラリの挙動を確認した結果によると、和の$ij$は$i$, $j$の全てのペアを渡る和で$i,j$の組み合わせとそれを入れ替えた$j, i$の組み合わせを両方取り、さらに$i=j$となる自己ループなエッジも考慮している様です。

さて、実際にやってみましょう。Pythonでは、communityというライブラリがあり、Louvain法が実装されています。(networkx自体にも他のコミュニティ検出のアルゴリズムが実装されているのですが、Louvain法は別ライブラリになっています。)

pipyでの登録名がpython-louvainなので、インストール時のコマンドが以下であることに注意してください。Pythonでimportするときと名前が違います。

$ pip install python-louvain

ドキュメントはこちら
参考: Community detection for NetworkX’s documentation — Community detection for NetworkX 2 documentation

早速使ってみましょう。例としてコミュニティがわかりやすいグラフを乱数で生成します。
0~29のノードを持ち、0~9, 10~19, 20〜29が同じコミュニティで、同コミュニティー内では50%の確率でエッジが貼られていて異なるミュにティだと2%の確率でしかエッジがないという設置です。

import numpy as np
import networkx as nx
import community

# 30個のノード
node_list = list(range(30))

# 同じクラスタ内は0.5, 別クラスタは0.02の確率でエッジを生成する
edge_list = []
np.random.seed(1)  # シード固定
for i in range(30):
    for j in range(i+1, 30):
        if i // 10 == j // 10:
            if np.random.rand() < 0.5:
                edge_list.append((i, j))
        else:
            if np.random.rand() < 0.02:
                edge_list.append((i, j))

# グラフ生成
G = nx.Graph()
G.add_nodes_from(node_list)
G.add_edges_from(edge_list)

さて、グラフができたのでコミュニティ検出をやっていきます。これものすごく簡単で、best_partitionというメソッドにnetworkxのグラフオブジェクトを渡すとノードと所属するグループの辞書として結果が帰ってきます。

注意点ですが、アルゴリズムの高速化のための工夫の弊害により絶対にベストな分割が得られるというわけではなく少々確率的な振る舞いをします。今回のサンプルコードでは非常にわかりやすい例を用意したので一発で成功していますが、通常の利用では何度か試して結果を比較してみるのが良いでしょう。

ではやっていきます。

partition = community.best_partition(G)

print(partition)
"""
{0: 2, 1: 2, 2: 2, 3: 2, 4: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2,
10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, 16: 0, 17: 0, 18: 0, 19: 0,
20: 1, 21: 1, 22: 1, 23: 1, 24: 1, 25: 1, 26: 1, 27: 1, 28: 1, 29: 1}
"""

0〜9は2で、10〜19は0で、20〜29は1と、綺麗に分類できてますね。

画像に表示してみましょう。ドキュメントのサンプルコードだとlist(partition.values())と辞書の値をそのまま可視化メソッドに渡していますが、辞書型のデータなので順序は保証されておらずこれはリスキーな気がします。実はそれでもうまくいくのですが念のため、ノードと順番が揃う様に配列として取り出してそれを使う様にしました。

# ノードの並びと同じ順でグループのリストを配列として取得する。
partition_list = [partition[n] for n in G.nodes]
# 配置決め
pos = nx.spring_layout(G, seed=3)
# 描写
nx.draw_networkx(G, node_color=partition_list, cmap="tab10", pos=pos)

結果がこちらです。

綺麗に3グループに分かれていますね。

ちなみに、このグループ分けのモジュラリティを取得するメソッドもあります。グループ分けとグラフ本体のデータをそれぞれ渡すことで使えます。

print(community.modularity(partition, G))
# 0.5316751700680272

結構高いですね。