PythonのライブラリでBar Chart Raceを作ってみた

皆さんもどこかでご覧になったことがあると思うのですが、項目ごとに増加し続けるデータの面白い可視化方法として、Bar Chart Race というものがあります。
棒グラフがグイグイ伸びて順位を争っているようなアニメーションですね。

その、Bar Chart Race をPythonで手軽に作れるライブラリを見つけたので今回の記事ではそれを紹介します。
その名も、 bar_chart_race です。 そのままですね。
公式ドキュメントはこちらになります。
Bar Chart Race

インストールはpipでもcondaでも可能です。(僕はcondaで入れました)
次の2行のコードのどちらかを実行してください。

pip install bar_chart_race
conda install -c conda-forge bar_chart_race

使い方は非常に簡単で、ライブラリをインポートしたら、bar_chart_raceというメソッドにデータと保存するファイル名を渡すだけです。

実際にデータを渡す前に、どんなデータを渡せばいいのか確認しておきましょう。
このライブラリ自体にサンプルデータとして国別のコロナウィルス感染者数のデータが同梱されているので、それをみてみます。

import bar_chart_race as bcr


# サンプルデータ読み込み
df = bcr.load_dataset('covid19')

# データのサイズ
print(df.shape)
# (57, 20)

print(df.index[: 5])
"""
DatetimeIndex(['2020-02-26', '2020-02-27', '2020-02-28', '2020-02-29',
               '2020-03-01'],
              dtype='datetime64[ns]', name='date', freq=None)
"""
print(df.columns[: 5])
"""
Index(['Belgium', 'Brazil', 'Canada', 'China', 'France'], dtype='object')
"""

上記の通り、インデックスに日付、カラムに国名(比較するアイテム)を持ったデータフレームがこのライブラリが想定しているデータ形式のようです。

早速、Bar Chart Raceを作ってみましょう。まずはメソッドに単純に渡して初期設定の出来を見てみます。

bcr.bar_chart_race(
    df=df,
    filename='covid19_default.mp4',
)

初期設定でも十分な見栄えのBar Chart Race ができましたね。

さらにこのライブラリは、見た目を変えたい場合に備えて、非常に多くのオプションが用意されています。
公式ドキュメントに色々設定を加えたバージョンも出てるのでそれも見ておきましょう。

bcr.bar_chart_race(
    df=df,
    filename='covid19_horiz.mp4',
    orientation='h',
    sort='desc',
    n_bars=6,
    fixed_order=False,
    fixed_max=True,
    steps_per_period=10,
    interpolate_period=False,
    label_bars=True,
    bar_size=.95,
    period_label={'x': .99, 'y': .25, 'ha': 'right', 'va': 'center'},
    period_fmt='%B %d, %Y',
    period_summary_func=lambda v, r: {'x': .99, 'y': .18,
                                      's': f'Total deaths: {v.nlargest(6).sum():,.0f}',
                                      'ha': 'right', 'size': 8, 'family': 'Courier New'},
    perpendicular_bar_func='median',
    period_length=500,
    figsize=(5, 3),
    dpi=144,
    cmap='dark12',
    title='COVID-19 Deaths by Country',
    title_size='',
    bar_label_size=7,
    tick_label_size=7,
    shared_fontdict={'family' : 'Helvetica', 'color' : '.1'},
    scale='linear',
    writer=None,
    fig=None,
    bar_kwargs={'alpha': .7},
    filter_column_colors=False)  

これの出力がこちらです。

引数名を見れば各引数が何を設定しているのかは大まかにわかるのではないかと思います。
また、公式ドキュメントのAPIリファレンスにも解説が充実しています。

Matplotlibで多角形や円などの図形を描写する

Matplotlibのグラフに図形を入れる方法の紹介です。
(強調したい部分に丸や四角で目印をつけたり矢印を引いたりできます。)

Matplotlibのグラフに図形を入れるには、 matplotlib.patches の下に定義されている各種クラスを使います。
長方形は、 matplotlib.patches.Rectangle,
円は、 matplotlib.patches.Circle, など、図形に応じたクラスが用意されているので、
それぞれインスタンスを作成し、
ax.add_patch() で作ったインスタンスをグラフに挿入します。

Rectangle は左下の座標と幅と高さ、Circleは中心の座標と半径など、それぞれ固有のオプションがあり、それを指定することで思い通りの位置とサイズの図形を作れます。
詳しくはドキュメントの各クラスの説明をご参照ください。
参考: matplotlib.patches — Matplotlib 3.4.2 documentation

これらのクラスは全て、
matplotlib.patches.Patch
を継承して実装されています。
塗りつぶしの色や線のスタイルの指定、模様を入れるなど、汎用的な引数の説明は、matplotlib.patches.Patchのページに説明があるのでこちらも合わせて参照すると良いでしょう。

全てを紹介はしませんが、いくつかの図形を実際に書いてみたコードが以下です。


import matplotlib.pyplot as plt
from matplotlib import patches


fig = plt.figure(facecolor="w")
ax = fig.add_subplot(1, 1, 1, aspect="equal")

# 長方形
patch = patches.Rectangle(
    xy=(20, 10),  # 左下の頂点の座標
    width=30,  #  長方形の幅
    height=50,  # 長方形の高さ
    angle=10,  # 傾き
    facecolor="b",  # 塗りつぶしの色
    edgecolor="c",  # 辺の色
    linewidth=3,  # 辺の線幅
    hatch=".",  # 塗りつぶしの模様 {'/', '\', '|', '-', '+', 'x', 'o', 'O', '.', '*'}
    # 辺のスタイル  {'-', '--', '-.', ':', '', (offset, on-off-seq), ...}
    linestyle="-.",
    fill=True,  # 塗りつぶしあり。Falseにすると塗りつぶし無し。
)
ax.add_patch(patch)

# 円
patch = patches.Circle(
    xy=(80, 40),  #  中心
    radius=20,  # 半径
    fill=False,  # 塗りつぶし無し
)
ax.add_patch(patch)

# 矢印
patch = patches.Arrow(
    x=110,  # 始点のx座標
    y=20,  # 始点のy座標
    dx=60,  # x軸方向の長さ。 x+dx が終点のx座標。
    dy=20,  # y軸方向の長さ。 y+dy が終点のy座標。
    width=30,  # 矢印の幅
)
ax.add_patch(patch)

# 楕円
patch = patches.Ellipse(
    xy=(40, 90),  # 中心
    width=50,  # 横幅
    height=20,  # 高さ
    angle=-30,  # 傾き
)
ax.add_patch(patch)

# 多角形
patch = patches.Polygon(
    # 頂点の座標を n*2 次元配列で指定
    xy=[
        [80, 100],
        [80, 80],
        [110, 70],
        [120, 90],
        [100, 110]
    ]
)
ax.add_patch(patch)

# 正多角形
patch = patches.RegularPolygon(
    xy=(140, 70),  # 中心の座標
    numVertices=7,  # 頂点の数
    radius=20,  # 半径
    orientation=10,  # 角度
)
ax.add_patch(patch)

ax.autoscale()
plt.show()

patchたちを追加した最後に、 ax.autoscale() して、挿入した図形たちがグラフの描写範囲に収まるように調整しています。
これをしないと、グラフの描写範囲がデフォルトの x座標の区間[0, 1]、y座標の区間[0, 1] のままになってしまい、せっかく描いた図形が見えなくなってしまいます。

上記のコードで、以下の図が出力されます。

参考として長方形だけ、facecolor、edgecolorなど、色々指定しましたが、
これは継承元のPatchで定義されているので、もちろん他の図形でも指定できます。

PythonでBase64エンコードとデコード

Base64というのは、データ(バイト列)を、64種類の文字(と、パディング用の”=”を含めた65文字)で表現するエンコード方法です。
64種類の文字の内訳はアルファベット小文字(a-z)26種類、大文字(A-Z)26種類、数値(0-9)10種類、記号(+,/)2種類です。
文字データしかやり取りのできないプロトコルで一般のデータを送受したりするために使われます。

詳しくはWikipediaをご参照ください。
参考: Base64 – Wikipedia

今回の記事は、このBase64のエンコードをPythonで実装する方法の紹介です。

Pythonには標準ライブラリに専用のモジュールが用意されています。
参考: base64 — Base16, Base32, Base64, Base85 データの符号化 — Python 3.9.4 ドキュメント

まず、文字列をbase64エンコーディングしてみましょう。
使い方は簡単で、base64.b64encodeに、バイトデータを渡してあげるだけです。
string型のデータはそのままでは受け取れない(エラー:a bytes-like object is required, not ‘str’ が発生する)ので、
元のテキストをencode()メソッドを使ってbyteデータに変換するのがポイントです。


import base64


text = "ハローワールド!"
print(base64.b64encode(text.encode()))
# b'44OP44Ot44O844Ov44O844Or44OJ77yB'

出力結果が b’〜’ となっているのでわかる通り、結果はbyte型で得られます。
base64の結果を文字列で欲しい時は、decode()する必要があります。


print(base64.b64encode(text.encode()).decode())
# 44OP44Ot44O844Ov44O844Or44OJ77yB

逆に、Base64のデータを、元のデータに戻したい場合は、base64.b64decode()を使います。
不思議なことに、このメソッドは、string型のデータもbyte型のデータも両方受け取ってくれます。
結果はbyte型で受け取ることになるので、元の文字列型のデータとして結果を得たい場合は、改めてdecode()する必要があります。


# byteを渡した場合
print(base64.b64decode(b'44OP44Ot44O844Ov44O844Or44OJ77yB'))
# b'\xe3\x83\x8f\xe3\x83\xad\xe3\x83\xbc\xe3\x83\xaf\xe3\x83\xbc\xe3\x83\xab\xe3\x83\x89\xef\xbc\x81'
print(base64.b64decode(b'44OP44Ot44O844Ov44O844Or44OJ77yB').decode())
# ハローワールド!

# stringを渡した場合
print(base64.b64decode('44OP44Ot44O844Ov44O844Or44OJ77yB'))
# b'\xe3\x83\x8f\xe3\x83\xad\xe3\x83\xbc\xe3\x83\xaf\xe3\x83\xbc\xe3\x83\xab\xe3\x83\x89\xef\xbc\x81'
print(base64.b64decode('44OP44Ot44O844Ov44O844Or44OJ77yB').decode())
# ハローワールド!

以上で、文字列データをBase64エンコード/デコードできました。
ただ、Base64の本領は、文字列ではない一般のデータを文字列で表現できることにあります。
なので、サンプルとして画像データをBase64エンコードするコードも紹介しておきます。
といっても、やることは単純で、画像をバイナリとして読み込んで、文字列のときと同じメソッドに渡すだけです。

結果はものすごく長いデータになるので出力しませんが、以下のようなコードで、img_base64変数に、
sample.jpg ファイルをBase64した結果が格納されます。


with open("./sample.jpg", "rb") as f:
    img = f.read()

img_base64 = base64.b64encode(img)

元々がbyte型なので文字列の時より単純ですね。
逆変換も文字列の時と同様にbase64.b64decodeでできます。
逆変換した結果を別のファイル名で保存して、元の画像と同じものであることを確認しておきましょう。(結果省略)


with open("./sample2.jpg", "wb") as f:
    f.write(base64.b64decode(img_base64))

失敗しやすい処理にリトライをスクラッチで実装する

とあるSDKを使って実装している処理で、利用しているAPIのエラーが頻発するようになり、
エラーが起きたらリトライする処理をスクラッチで作る必要があったのでその時の実装をメモしておきます。

今回は利用しているSDKのメソッドにリトライ処理が内包されておらず、
気軽にライブラリを追加できる環境でもなかったのでスクラッチで実装しましたが、
通常は他の手段がないか探すことをお勧めします。

例えば、requestsのようなライブラリは引数でリトライ回数を指定できますし、
世の中にはリトライを行う専用のライブラリなども転がっています。

この記事のコードはそれらが使えなかった場合の最後の手段です。

実際の処理はお見せできないので、サンプルとして一定確率(80%)エラーになる関数を作っておきます。


import numpy as np


def main_function():
    num = int(np.random.choice([0, 1], p=[0.8, 0.2]))
    return 1/num

さて、これを成功するまでリトライする関数を作ります。
実装するにあたって定めた要件は以下の通りです。

– 規定回数リトライする。(今回は5回と定めました。)
– 初回の実行と合わせて、実際に実行を試みるのは、{1+リトライ回数}回。
– 発生したエラーは都度表示する。
– 規定回数全てエラーになったら、最後に発生したエラーを呼び出し元に返す。
– エラーから次のリトライまでは、1秒,2秒,4秒,8秒,…と間隔をあける。

書いてみたのが次のコードです。


from time import sleep


max_retry_count = 5  # リトライ回数

for retry_count in range(max_retry_count+1):
    try:
        print(main_function())  # 実行したい処理
        break  # 成功したらループを抜ける
    except Exception as e:
        print(e)
        if retry_count == max_retry_count:
            print(f"規定回数({max_retry_count}回)のリトライに失敗しました。")
            raise
        # {2^リトライ回数}秒待ちを入れる
        dely = 2**retry_count
        print(f"{dely}秒後にリトライします。{retry_count+1}/{max_retry_count}回目。")
        sleep(dely)

2回のリトライ(3回目の試行)で成功すると出力は以下のようになります。


division by zero
1秒後にリトライします。1/5回目。
division by zero
2秒後にリトライします。2/5回目。
1.0

最後まで失敗すると以下のようになります。


division by zero
1秒後にリトライします。1/5回目。
division by zero
2秒後にリトライします。2/5回目。
division by zero
4秒後にリトライします。3/5回目。
division by zero
8秒後にリトライします。4/5回目。
division by zero
16秒後にリトライします。5/5回目。
division by zero
規定回数(5回)のリトライに失敗しました。
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
 in 
      7 for retry_count in range(max_retry_count+1):
      8     try:
----> 9         print(main_function())  # 実行したい処理
     10         break  # 成功したらループを抜ける
     11     except Exception as e:

 in main_function()
      3 def main_function():
      4     num = int(np.random.choice([0, 1], p=[0.8, 0.2]))
----> 5     return 1/num

ZeroDivisionError: division by zero

ちゃんとエラーになりましたね。

PyMySQLのcursorclassについて

PyMySQLの公式ドキュメントのExamplesで使われている、
cursorclass=pymysql.cursors.DictCursor
の話です。


import pymysql.cursors

# Connect to the database
connection = pymysql.connect(
    host='localhost',
    user='user',
    password='passwd',
    database='db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor # これの話。
)

ドキュメントにも詳しい説明はないし、pep249でも言及されていないようなのでPyMySQLのソースコードも含めて調べてみました。

結論から言えば、実用上は黙って cursorclass=pymysql.cursors.DictCursor を指定しておけばよく、この記事は無駄知識の類のものになります。

まず、cursorclass に指定できる値は pymysql.cursors.DictCursor 以外に何があるのかですが、
これは、こちらのファイルで定義されている4種類のクラスが指定できます。
Github: pymysql/cursors.py

– pymysql.cursors.Cursor
– pymysql.cursors.DictCursor
– pymysql.cursors.SSCursor
– pymysql.cursors.SSDictCursor

https://github.com/PyMySQL/PyMySQL/blob/master/pymysql/connections.py#L179
に、


class Connection:
    # 中略
    def __init__(
        # 中略
        cursorclass=Cursor,

とある通り、デフォルトは、 pymysql.cursors.Cursor です。

この4種類のカーソルの違いですが、
Cursor と SSCursor は結果をタプルで返し、
DictCursor と SSDictCursor は結果を辞書で返してくれます。
結果の形に、CursorとSSCursor、DictCursorとSSDictCursorの間にはそれぞれ違いはありません。

SSとつく二つの方ですが、これらは主にデータが非常に大きいときや、ネットワークが遅いときなどに使います。
SSCursorのコメントがわかりやすいですね。

Unbuffered Cursor, mainly useful for queries that return a lot of data,
or for connections to remote servers over a slow network.
Instead of copying every row of data into a buffer, this will fetch
rows as needed. The upside of this is the client uses much less memory,
and rows are returned much faster when traveling over a slow network
or if the result set is very big.
There are limitations, though. The MySQL protocol doesn’t support
returning the total number of rows, so the only way to tell how many rows
there are is to iterate over every row returned. Also, it currently isn’t
possible to scroll backwards, as only the current row is held in memory.

個人的な感想としては最近の端末には十分なメモリが搭載されていて、数百万行単位のレコードを扱うときも、SS無しの方で十分さばけているので、
とりあえず DictCursor を使って、本当にメモリ不足で困った時だけ SSDictCursor を検討したらいいのかなと思っています。

Cursor(タプル) と DictCursor(辞書)についてはそれぞれ実行して結果を比較しておきましょう。

まず、cursorclassにpymysql.cursors.Cursorを指定した(もしくは何も指定しなかった場合)の結果です。
テーブルは以前の記事で作ったやつをそのまま使います。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users"
        cursor.execute(sql)
        result = cursor.fetchall()
print(result)
"""
(
    (1, 'webmaster@python.org', 'very-secret'),
    (2, 'sato@python.org', 'very-secret'),
    (3, 'suzuki@python.org', 'very-secret'),
    (4, 'takahashi@python.org', 'very-secret'),
    (5, 'tanaka@python.org', 'very-secret')
)
"""

1レコードごとに結果がタプルで戻っているだけでなく、fetchallすると戻り値はタプルのタプルになっていますね。
戻ってきた結果の各値が、SELECT句のどの列の値なのかが明示されていないので、自分でマッピングする必要があります。
正直これは少し使いにくいです。

続いて、pymysql.cursors.DictCursor を指定した場合の結果です。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users"
        cursor.execute(sql)
        result = cursor.fetchall()
print(result)
"""
[
    {'id': 1, 'email': 'webmaster@python.org', 'password': 'very-secret'},
    {'id': 2, 'email': 'sato@python.org', 'password': 'very-secret'},
    {'id': 3, 'email': 'suzuki@python.org', 'password': 'very-secret'},
    {'id': 4, 'email': 'takahashi@python.org', 'password': 'very-secret'},
    {'id': 5, 'email': 'tanaka@python.org', 'password': 'very-secret'}
]
"""

ご覧の通り、1レコードごとに「列名:値」の辞書として値が得られ、それらの配列として結果が返されます。
各値がSELECT句のどの列のものなのかはっきりしているのでとても便利です。
また、このままpandasのデータフレームに変換することもできます。
通常はこれを使えば良いでしょう。

PyMySQLでまとめてデータをインサートする(executemanyを使う方法)

※この記事ではDBはAWSのAuroraを想定しています。

Auroraに多くのデータをまとめて登録したいとき、1レコードごとにINSERT文を書いて実行するのは少しイケてない思っていたので、
良い方法を探していたところ、 executemany というメソッドが用意されているのを知りました。
pep249 にも記載があるので、PyMySQL以外のライブラリでも用意されていると思います。

サンプルコード載せるに状況の説明です。
INSERTしたいデータは例えば次のようなものだとします。


print(df)
"""
           date      open      high       low     close
0    2020-07-08  22481.61  22667.95  22438.65  22438.65
1    2020-07-09  22442.30  22679.08  22434.38  22529.29
2    2020-07-10  22534.97  22563.68  22285.07  22290.81
3    2020-07-13  22591.81  22784.74  22561.47  22784.74
4    2020-07-14  22631.87  22677.02  22538.78  22587.01
..          ...       ...       ...       ...       ...
195  2021-04-23  28939.12  29035.34  28770.62  29020.63
196  2021-04-26  29095.49  29241.28  28896.37  29126.23
197  2021-04-27  29174.53  29187.11  28990.19  28991.89
198  2021-04-28  28935.51  29139.70  28875.91  29053.97
199  2021-04-30  28996.66  29046.49  28760.27  28812.63

[200 rows x 5 columns]
"""

そして、INSERT先のテーブルは以下です。
(connection は既にRDSへの接続が完了しているものとします。)


with connection.cursor() as cursor:
    cursor.execute("SHOW CREATE TABLE nikkei")
    result = cursor.fetchone()
    
print(result["Create Table"])
"""
CREATE TABLE `nikkei` (
  `date` date NOT NULL,
  `open` double NOT NULL,
  `high` double NOT NULL,
  `low` double NOT NULL,
  `close` double NOT NULL,
  PRIMARY KEY (`date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
"""

通常であれば、この200あるレコード一つ一つについて、for文で回して、それぞれINSERT文を発行するところなのですが、
cursor.executemany を使うと次のように(表向きは)一発でINSERTできます。
(おそらく内部では1レコードずつINSERT文が発行されているのですが。)


insert_sql = """
    INSERT INTO
        nikkei (
            date,
            open,
            high,
            low,
            close
        )
    VALUES
        (
            %s, %s, %s, %s, %s
        )
"""

with connection.cursor() as cursor:
    cursor.executemany(insert_sql, df.values.tolist())
    connection.commit()

PyMySQLではプレースホルダにformat(%s)とpyformat(%{name}s)を使える

タイトルを逆にいうとPyMySQLのプレースホルダには qmark(?), numeric(:1), named(:name) は実装されてないようです。

※ この記事は、 version 1.0.2 の時点の PyMySQLについて書いています。将来的に他のプレースホルダーも実装されるかもしれません。


!pip freeze | grep PyMySQL
PyMySQL==1.0.2

pep249では、paramstyle として次の5つが定められています。

名前 説明
qmark Question mark style WHERE name=?
numeric Numeric, positional style WHERE name=:1
named Named style WHERE name=:name
format ANSI C printf format codes WHERE name=%s
pyformat Python extended format codes WHERE name=%(name)s

PyMySQLはpep249に従って実装さているので、この5つが全部使えるのかなと思って試したら、そうなってないことに気づきました。
タイトルにも書きました通り、実装されているのは format と pyformat だけです。
とはいえ、実用的なことを考えると、各値に名前をつけないときは、format、名前をつけたいときはpyformatを使えばいいので、この二つで十分だと思います。

実際にやってみます。
テーブルは前の記事で作ったものをそのまま使います。

まずDBに接続します。接続情報は各自の環境の値をご利用ください。


import pymysql.cursors

con_args = {
    "host": "{RDSのエンドポイント/同サーバーのDB場合はlocalhost}",
    "user": "{ユーザー名}",
    "password": "{パスワード}",
    "database": "{DB名}",
    "cursorclass": pymysql.cursors.DictCursor
}

connection = pymysql.connect(**con_args)

まず、 format スタイルから。サンプルコードなどで頻繁に見かけるのはこれです。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=%s"
        cursor.execute(sql, ('webmaster@python.org',))
        result = cursor.fetchone()
        print(result)

# {'id': 1, 'email': 'webmaster@python.org', 'password': 'very-secret'}

続いて、 pyformat。 このサンプルコードではありがたみがないですが、多くのプレースホルダーを使うクエリでは可読性向上に期待できます。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=%(email)s"
        cursor.execute(sql, {"email": "webmaster@python.org"})
        result = cursor.fetchone()
        print(result)

# {'id': 1, 'email': 'webmaster@python.org', 'password': 'very-secret'}

qmark だと cursor.execute でエラーになります。 (エラーになるので例外処理入れてます。)


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=?"
        try:
            cursor.execute(sql, ('webmaster@python.org',))
            result = cursor.fetchone()
            print(result)
        except Exception as e:
            print(e)
            # not all arguments converted during string formatting

numeric も同様。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=:1"
        try:
            cursor.execute(sql, ('webmaster@python.org',))
            result = cursor.fetchone()
            print(result)
        except Exception as e:
            print(e)
            # not all arguments converted during string formatting

named もエラーになります。これだけエラー文が違います。


with connection.cursor() as cursor:
        try:
            sql = "SELECT id, email, password FROM users WHERE email=:email"
            cursor.execute(sql, {"email": "webmaster@python.org"})
            result = cursor.fetchone()
            print(result)
        except Exception as e:
            print(e)
            # (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ':email' at line 1")

PythonでAuroraを操作する(PyMySQLを使う方法)

今回の記事では、PythonのコードでSQLを実行しAuroraを操作する方法を紹介します。
AuroraはMySQL互換のインスタンスを使っているので、PythonのライブラリはMySQL用のものを利用します。
複数種類あるのですが、僕はPyMySQLを利用することが多いです。
(職場で先輩が使ってたというだけの理由で使い続けており、他のライブラリと比較検証したわけじゃないので、
いつかちゃんとメリットデメリット比較したいです。)

PyMySQL のドキュメントはこちら
PyMySQL · PyPI
前回の記事でも書きましたが、サンプルコードが軽く記載してあるだけで、あまり詳しい説明などはありません。
pep249を必要に応じて参照する必要があります。

といっても、使い方は簡単なので、ドキュメントのサンプルコードだけあれば実用上はなんとか使っていけます。
(DBヘの接続、SQLの実行と結果の取り出しができれば十分。)

この記事では、サンプルコードを参考に、最低限の操作をやってみようと思います。

まず、DBにサンプルのTableを作っておきます。


> CREATE TABLE users (
    id int(11) NOT NULL AUTO_INCREMENT,
    email varchar(255) NOT NULL,
    password varchar(255) NOT NULL,
    PRIMARY KEY (`id`)
);

Query OK, 0 rows affected (0.05 sec)

まず、DBへの接続です。
DBのエンドポイント/ユーザー名/パスワード/DB名 をそれぞれ設定し、接続をします。
cursorclass=pymysql.cursors.DictCursor を設定するのは、結果をDict形で受け取るためです。
(これは必須ではないのですが、Dictで受け取った方が後々扱いやすいです。)
{}で書いてる部分は各自の環境の値を入れてください。


import pymysql.cursors


con_args = {
    "host": "{RDSのエンドポイント/同サーバーのDB場合はlocalhost}",
    "user": "{ユーザー名}",
    "password": "{パスワード}",
    "database": "{DB名}",
    "cursorclass": pymysql.cursors.DictCursor
}

connection = pymysql.connect(**con_args)

新規レコードを挿入してみます。
SQLインジェクション対策にプレースホルダ(%s)を使います。
自動コミットではないので、 connection.commit() を忘れないようにするのがポイントです。


with connection.cursor() as cursor:
    sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
    cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
    connection.commit()

次はSELECT文です。 1レコード取り出すサンプルコードは次のようになります。


with connection.cursor() as cursor:
    sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
    cursor.execute(sql, ('webmaster@python.org',))
    result = cursor.fetchone()
    print(result)

# 結果
# {'id': 1, 'password': 'very-secret'}

ドキュメントのサンプルコードはこれだけですね。

ほとんど代わり映えしないのですが、結果が複数行になるSELECT文も試しておきましょう。
まずレコード自体が1行しかないとどうしようもないのでデータを増やします。


with connection.cursor() as cursor:
    sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
    cursor.execute(sql, ('sato@python.org', 'very-secret'))
    cursor.execute(sql, ('suzuki@python.org', 'very-secret'))
    cursor.execute(sql, ('takahashi@python.org', 'very-secret'))
    cursor.execute(sql, ('tanaka@python.org', 'very-secret'))
    connection.commit()

結果が複数行になる場合は、fetchone()の代わりに、fetchall()を使います。


with connection.cursor() as cursor:
        sql = "SELECT id, email FROM users"
        cursor.execute(sql)
        result = cursor.fetchall()
        print(result)

# [{'id': 1, 'email': 'webmaster@python.org'}, {'id': 2, 'email': 'sato@python.org'}, 
# {'id': 3, 'email': 'suzuki@python.org'}, {'id': 4, 'email': 'takahashi@python.org'},
# {'id': 5, 'email': 'tanaka@python.org'}]

ちなみにこの結果の型ですが、pandasのDataFrameに簡単に変換できるので便利です。


import pandas as pd


df = pd.DataFrame(result)
print(df)
"""
   id                 email
0   1  webmaster@python.org
1   2       sato@python.org
2   3     suzuki@python.org
3   4  takahashi@python.org
4   5     tanaka@python.org
"""

もちろんfetchoneを繰り返して1行ずつ取り出すこともできるのですが、個人的にはfetchallでさっさと取り出して
データフレームにしてしまった方が扱いやすいと思います。
fetchoneで順番に取り出すとしたら次のようなコードでしょうか。


with connection.cursor() as cursor:
        sql = "SELECT id, email FROM users"
        cursor.execute(sql)

row = cursor.fetchone()
while row is not None:
    print(row)
    row = cursor.fetchone()
"""
{'id': 1, 'email': 'webmaster@python.org'}
{'id': 2, 'email': 'sato@python.org'}
{'id': 3, 'email': 'suzuki@python.org'}
{'id': 4, 'email': 'takahashi@python.org'}
{'id': 5, 'email': 'tanaka@python.org'}
"""

次のような書き方でも動きます。


with connection.cursor() as cursor:
    sql = "SELECT id, email FROM users"
    cursor.execute(sql)

for row in cursor:
    print(row)
    
"""
{'id': 1, 'email': 'webmaster@python.org'}
{'id': 2, 'email': 'sato@python.org'}
{'id': 3, 'email': 'suzuki@python.org'}
{'id': 4, 'email': 'takahashi@python.org'}
{'id': 5, 'email': 'tanaka@python.org'}
"""

Pythonの DB-API 2.0 (PEP 249)について

PEP 249 — Python Database API Specification v2.0と言うものがあることを最近知ったのでその紹介です。

お恥ずかしい話ですが正直に言うと、これまでPython経由でデータベースを操作するコードを書くときは、
誰かが既に書いたコードや本のサンプルなどを参照して、DBヘの接続やSQLの実行部分はほとんどコピーして使っていました。

頻繁に実施する処理ではあるので、このブログにもまとめておこうと思い、
ブログ記事にするからには仕様を正しく理解してから書きたいと考えて調べてたら行きあたったのがこのPEP249です。

これは何かというと、リレーショナルデータベースにアクセスする機能を提供する
Pythonモジュールが似たような作りになるようにAPI仕様を定義したものです。

PEP249のページでも
This API has been defined to encourage similarity between the Python modules that are used to access databases.
と書いてありますね。

なぜこんなものが必要になるかというと、RDBには多くの種類があり、そしてそれぞれのRDBごとにそれを利用するモジュールが作られたとして、
それらが全部バラバラに仕様を決めて実装されていたら、使いにくいし、もし別のDBやモジュールに移行したくなったときに使い勝手が違うと大変だからです。

そこで、PEP249 (DB-API 2.0)として、DB接続や、SQLの実行、結果の取り出しなどのAPIの仕様を定めてくれています。
実際、多くのモジュールがこれに沿って実装されているようです。

PEP249 のページを読んでいくと、DBの接続に使う connect メソッドや、 Connection Object、
SQLを実行するexecuteメソッドや、結果を取り出す、fetchone/ fetchall など見慣れた(コピペで使ってた)メソッド群の説明が書いてあります。

PyMySQL などのドキュメントを見てると、
サンプルコードが少し書いてあるだけで、各メソッドの説明がほとんどなく非常に不親切だなと前々から思っていたのですが、
PEP249に沿って実装されているのであれば話は別です。
これらのモジュールはPEP249を読んで使うものだったのですね。

PythonでURL文字列を構築する

前回の記事がURL文字列の各要素への分解だったので、今回は逆に構築です。

まずはクエリパラメーター(クエリストリング)部分についてです。
今更説明するまではないのですが、クエリパラメーターはキーと値を=(イコール)で結び、さらにそれらを&で繋いでいきます。
自分で実装すると地味に手間なのですが、これもurllibに実装があります。

それが urllib.parse.urlencode です。

試しにやってみます。


from urllib import parse

qs = parse.urlencode(
    {
        "key1": "キー1",
        "value1": "値1",
        "id": "123"
    }
)
print(qs)
# key1=%E3%82%AD%E3%83%BC1&value1=%E5%80%A41&id=123

この例で気づかれたと思いますが、このメソッド、とても気の利いてることにパーセントエンコーディングもやってくれます。
辞書の他に、(キー, 値)のタプルの配列も引数として受け取ってくれます。


qs = parse.urlencode(
    [
        ("key1", "キー1"),
        ("value1", "値1"),
        ("id", "123")
    ]
)
print(qs)
# key1=%E3%82%AD%E3%83%BC1&value1=%E5%80%A41&id=123

クエリパラメーターの文字列を構築するのに、非常に便利ですね。

もう一つ、プロトコルやホスト名、パスなどからURL全体を構築するメソッドもあります。
それが、 urllib.parse.urlunparse です。

scheme, netloc, path, parameters, query, fragment の 6要素のタプルを受け取り、
scheme://netloc/path;parameters?query#fragment の形に結合してくれます。
タプルないの要素は省略は不可で、それを含まないようなURLを作るときは空白文字列を入れないといけません。

正直、そこまで便利には感じませんでした。

一応やってみた例です。


print(
    parse.urlunparse(
        (
            "https",
            "analytics-note.xyz",
            "",
            "",
            "s=統計学",
            "",
        )
    )
)
# https://analytics-note.xyz?s=統計学

上の例で分かる通り、クエリストリングはパーセントエンコーディングやってくれません。
そのため、クエリパラメーターに日本語を含む場合は本当は事前に何らかの手段でパーセントエンコーディングしておく必要があります。

urlunparse はそこまで使うメリットがなさそうなので、単純に文字列の結合を実装しても良さそうです。