boto3のclient API とresource APIについて

boto3を使ったソースコードを読んでいると、
boto3.client(“サービス名”) と使っているものと、boto3.resource(“サービス名”) と使っているものがあり、
自分でも無意識に使い分けていたことに気づいたのでこれらの違いについて調べてみました。

ドキュメントを探すと、次のページに書いてありました。
参照: AWS SDK for Python | AWS

Boto3 には、2 つの異なるレベルの API があります。クライアント(「低レベル」)API では、下層の HTTP API 操作との 1 対 1 のマッピングが提供されます。 リソース API では、明示的なネットワーク呼び出しが表示されず、属性にアクセスしアクションを実行するためのリソースオブジェクトとコレクションが提供されます。

ちょっとわかりにくいですね。実際に動かしてみた違いから考えると、
クライアント API(boto3.clientの方)は、AWSの各リソースが持っているREST APIと1対1に対応した単純なPythonラッパーのようです。
それに対して、リソースAPI(boto3.resourceの方)はAWSのリソースをオブジェクト指向のプログラムで操作できるようにしたもののようです。

実際に動かしてみましょう。
EC2のインスタンスのインスタンスIDとインスタンスタイプ、現在の動作状況を一覧取得するプログラムをそれぞれ書いてみます。

まず、クライアントAPIの方です。
インスタンスの一覧は、describe_instances()で取得できます。
まず、単純に結果を表示すると、結果が辞書型で得られていることが確認できます。 (すごく長いので省略しています)


import boto3


ec2_client = boto3.client("ec2")
result = ec2_client.describe_instances()
print(result)

# 以下出力
{
    'Reservations': [
        {
            'Groups': [],
            'Instances': [
                {
                    'AmiLaunchIndex': 0,
                    'ImageId': 'ami-da9e2cbc',
                    'InstanceId': '{1つ目のインスタンスID}',
                    'InstanceType': 't1.micro',
                    'KeyName': '{キーファイルの名前}',
                    'LaunchTime': datetime.datetime(2021, 3, 20, 5, 35, 47, tzinfo=tzutc()),
                    'Monitoring': {'State': 'enabled'},
                    'Placement': {'AvailabilityZone': 'ap-northeast-1a',
                    'GroupName': '',
                    'Tenancy': 'default'
                },
    # 中略
    'ResponseMetadata': {
        'RequestId': '1b6c171d-d199-46c0-b2a6-7037fcfda28b',
        'HTTPStatusCode': 200,
        'HTTPHeaders': {
            'x-amzn-requestid': '1b6c171d-d199-46c0-b2a6-7037fcfda28b',
            'cache-control': 'no-cache, no-store',
            'strict-transport-security': 'max-age=31536000; includeSubDomains',
            'content-type': 'text/xml;charset=UTF-8',
            'transfer-encoding': 'chunked',
            'vary': 'accept-encoding',
            'date': 'Sun, 23 May 2021 08:21:51 GMT',
            'server': 'AmazonEC2'
        },
        'RetryAttempts': 0
    }
}

この巨大な辞書ファイルの中から必要な情報を取り出します。


for r in result["Reservations"]:
    print(r["Instances"][0]["InstanceId"])
    print(r["Instances"][0]["InstanceType"])
    print(r["Instances"][0]["State"])
    print(" - ・"*10)
"""
{1つ目のインスタンスID}
t1.micro
{'Code': 80, 'Name': 'stopped'}
 - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・
{2つ目のインスタンスID}
t2.nano
{'Code': 16, 'Name': 'running'}
 - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・
"""

上のコードを見ていただければわかる通り、単純な辞書の巨大な塊なのでちょっと情報が取り出しにくいのがわかると思います。

一方で、リソースAPIではどうでしょうか。
こちらはインスタンスの一覧は、ec2_resource.instances.all()でそれらのイテレーターが取得できます。
そして、インスタンスIDやインスタンスタイプ、状態などは属性として取得できます。
実際に先程のクライアントAPIと同様の情報を取得してみましょう。


ec2_resource = boto3.resource("ec2")
for instance in ec2_resource.instances.all():
    print(instance.instance_id)
    print(instance.instance_type)
    print(instance.public_ip_address)
    print(instance.state)
    print(" - ・"*10)
"""
{1つ目のインスタンスID}
t1.micro
None
{'Code': 80, 'Name': 'stopped'}
 - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・
{2つ目のインスタンスID}
t2.nano
54.199.43.209
{'Code': 16, 'Name': 'running'}
 - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・ - ・
"""

得られる結果は同じですが、コードがずいぶんわかりやすいのが実感していただけると思います。

さて、結論としてどっちを使えばいいのか、という話ですが、
実現したい操作がリソースAPIでできるのであればリソースAPIを使えばいいのではないかなと思いました。

ただ、提供されているREST APIと1対1に対応しているクライアントAPIと違って、
リソースAPIは全ての操作が実現できると保証されているものではありません。

たとえば、翻訳サービスである、Translateなどは、クライアントAPIしか存在せず、リソースAPIで使おうとするとエラーになります。


try:
    boto3.resource("translate")
except Exception as e:
    print(e)
"""
The 'translate' resource does not exist.
The available resources are:
   - cloudformation
   - cloudwatch
   - dynamodb
   - ec2
   - glacier
   - iam
   - opsworks
   - s3
   - sns
   - sqs

Consider using a boto3.client('translate') instead of a resource for 'translate'
"""

boto3.client(‘translate’) 使えって言われてますね。
このように、リソースAPIが未対応の時は、諦めてクライアントAPIを使いましょう。

LightsailのWordPressにads.txtを設置する

Googleアドセンスの管理画面に入ると、

要注意 – 収益に重大な影響が出ないよう、ads.txt ファイルの問題を修正してください。

という警告が出続けているので、対応することにしました。

ads.txtについての説明は以下のページなどをご参照ください。
広告枠の管理 ads.txt に関するガイド
Ads.Txt – Authorized Digital Sellers

さて、早速作業していきます。
まず、配置するads.txtファイルを入手します。

これは、Googleアドセンスの警告の右側に表示されている「今すぐ修正」をクリックすると、
「ダウンロード」できるようになります。

ダウンロードしたファイルをサイトのルートディレクトリ(トップレベル ドメイン直下のディレクトリ)に配置します。

LightsailのWordpressの場合、
/home/bitnami/apps/wordpress/htdocs/ads.txt
に配置すればOKです。
scpか何かでアップロードしても良いでしょうし、たった1行なので内容をコピーして貼り付けても良いでしょう。

htts://{サイトのドメイン}/ads.txt
にアクセスして、ads.txtファイルの内容が表示されたら成功です。

クローラーが検知してくれるのを気長に待ちましょう。
クローリングしてくれたらGoogleアドセンス管理画面の警告も消えるはずです。

サービスとして動かしているjupyter notebookに環境変数を設定する

jupyterをサービスとして動かしていると、 .bash_profile で設定した環境変数を読み込んでくれなかったので、その対応のメモです。

ちなみに、今回設定したい環境変数はAWSのデフォルトリージョンで、値としては、
AWS_DEFAULT_REGION=ap-northeast-1
です。
方法としては、Unit ファイルに直接書き込んで設定する方法と、
環境変数をまとめた設定ファイルを作成し、そのファイルパスをUnitファイルに指定する方法があります。

直接書き込む場合は、 Unitファイルの [Service] のセクションに、
Environment=”環境変数名=値”
で指定します。
複数指定したい場合は、
Environment=”環境変数名1=値1″
Environment=”環境変数名2=値2″
と2行に分けて書くか、
Environment=”環境変数名1=値1″ “環境変数名2=値2”
のように空白で区切って指定すれば良いようです (値にスペースがない場合はダブルクオーテーションは省略可能)
ドキュメントはここ。
systemd.exec Environment=

ちょっと試してみましょう。


sudo vim /etc/systemd/system/jupyter.service
ファイル中にEnvironment= の2行を追加

[Unit]
Description=Jupyter Notebook

[Service]
ExecStart=/home/ec2-user/.pyenv/shims/jupyter notebook
Restart=always
User=ec2-user
Group=ec2-user
Environment="VAR1=word1 word2"
Environment=VAR2=word3 "VAR3=$word 5 6"

[Install]
WantedBy=multi-user.target

# 上記ファイルを保存
# サービスを再起動
sudo systemctl daemon-reload
sudo systemctl restart jupyter

jupyter notebookで認識できているか確認します。
!をつけるとOSコマンドが実行できるのでそれを使います。


!set | grep VAR
# 以下出力
BASH_EXECUTION_STRING='set | grep VAR'
VAR1='word1 word2'
VAR2=word3
VAR3='$word 5 6'

設定されていますね。

ということで、今回の要件だけ考えれば
Environment=AWS_DEFAULT_REGION=ap-northeast-1
と直書きしてしまって良さそうです。

ただ、将来的に設定したい環境変数が増えていくことも考えられますので、その時Unitファイルが煩雑にならないように、
今の段階でもう一個の方法の、環境変数の設定ファイルを作る方法を使うことにしました。

ドキュメントはこちらです。
systemd.exec EnvironmentFile=

環境変数が入力されたファイル自体は、
/etc/sysconfig/サービス名
に作成するのがお作法らしいです。
そして、作成したファイルのパスを EnvironmentFile= に指定します。


$ sudo vim /etc/sysconfig/jupyter

# 以下の内容を記入して保存
AWS_DEFAULT_REGION=ap-northeast-1

$ sudo vim /etc/systemd/system/jupyter.service
# EnvironmentFile= の行を追加

[Unit]
Description=Jupyter Notebook

[Service]
EnvironmentFile=/etc/sysconfig/jupyter
ExecStart=/home/ec2-user/.pyenv/shims/jupyter notebook
Restart=always
User=ec2-user
Group=ec2-user

[Install]
WantedBy=multi-user.target

# 上記ファイルを保存
# サービスを再起動
sudo systemctl daemon-reload
sudo systemctl restart jupyter

設定されたことを確認してみましょう。


!set | grep VAR
# 以下出力
AWS_DEFAULT_REGION=ap-northeast-1
BASH_EXECUTION_STRING='set | grep AWS'

ちゃんと設定されましたね。
これで boto3 を使うときに、
region_name=”ap-northeast-1
をいちいち指定しなくて良くなりました。

DBのエンドポイントや接続情報など環境変数に入れておきたい内容はこの調子で、
/etc/sysconfig/jupyter
に突っ込んでいきましょう。

失敗しやすい処理にリトライをスクラッチで実装する

とあるSDKを使って実装している処理で、利用しているAPIのエラーが頻発するようになり、
エラーが起きたらリトライする処理をスクラッチで作る必要があったのでその時の実装をメモしておきます。

今回は利用しているSDKのメソッドにリトライ処理が内包されておらず、
気軽にライブラリを追加できる環境でもなかったのでスクラッチで実装しましたが、
通常は他の手段がないか探すことをお勧めします。

例えば、requestsのようなライブラリは引数でリトライ回数を指定できますし、
世の中にはリトライを行う専用のライブラリなども転がっています。

この記事のコードはそれらが使えなかった場合の最後の手段です。

実際の処理はお見せできないので、サンプルとして一定確率(80%)エラーになる関数を作っておきます。


import numpy as np


def main_function():
    num = int(np.random.choice([0, 1], p=[0.8, 0.2]))
    return 1/num

さて、これを成功するまでリトライする関数を作ります。
実装するにあたって定めた要件は以下の通りです。

– 規定回数リトライする。(今回は5回と定めました。)
– 初回の実行と合わせて、実際に実行を試みるのは、{1+リトライ回数}回。
– 発生したエラーは都度表示する。
– 規定回数全てエラーになったら、最後に発生したエラーを呼び出し元に返す。
– エラーから次のリトライまでは、1秒,2秒,4秒,8秒,…と間隔をあける。

書いてみたのが次のコードです。


from time import sleep


max_retry_count = 5  # リトライ回数

for retry_count in range(max_retry_count+1):
    try:
        print(main_function())  # 実行したい処理
        break  # 成功したらループを抜ける
    except Exception as e:
        print(e)
        if retry_count == max_retry_count:
            print(f"規定回数({max_retry_count}回)のリトライに失敗しました。")
            raise
        # {2^リトライ回数}秒待ちを入れる
        dely = 2**retry_count
        print(f"{dely}秒後にリトライします。{retry_count+1}/{max_retry_count}回目。")
        sleep(dely)

2回のリトライ(3回目の試行)で成功すると出力は以下のようになります。


division by zero
1秒後にリトライします。1/5回目。
division by zero
2秒後にリトライします。2/5回目。
1.0

最後まで失敗すると以下のようになります。


division by zero
1秒後にリトライします。1/5回目。
division by zero
2秒後にリトライします。2/5回目。
division by zero
4秒後にリトライします。3/5回目。
division by zero
8秒後にリトライします。4/5回目。
division by zero
16秒後にリトライします。5/5回目。
division by zero
規定回数(5回)のリトライに失敗しました。
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
 in 
      7 for retry_count in range(max_retry_count+1):
      8     try:
----> 9         print(main_function())  # 実行したい処理
     10         break  # 成功したらループを抜ける
     11     except Exception as e:

 in main_function()
      3 def main_function():
      4     num = int(np.random.choice([0, 1], p=[0.8, 0.2]))
----> 5     return 1/num

ZeroDivisionError: division by zero

ちゃんとエラーになりましたね。

EC2にIAMロールを設定する

EC2に構築したJupyterサーバーでboto3を使うとき、ローカルのMacbookと同じようにIAMユーザーのアクセスキーを、
環境変数 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY に設定して使うものだと思っていたのですが、
実はEC2にメタデータとしてIAMロールを設定できることを知ったのでそのメモです。

boto3のドキュメントを見ると、認証情報として以下の8種類が使える(上のものほど優先される)ことがわかります。
参考: Configuring credentials

1. boto.client()メソッドにパラメーターとして渡された認証情報
2. セッションオブジェクトを作成するときににパラメータとして渡された認証情報
3. 環境変数
4. 共有認証情報ファイル(~/.aws/credentials)
5. AWS設定ファイル(~/.aws /config)
6. ロールの引き受けの提供
7. Boto2設定ファイル(/etc/boto.cfg と 〜/.boto)
8. IAMロールが設定されているAmazonEC2インスタンスでは、インスタンスメタデータサービス

(6. とかよく意味がわからない。。。)

この中の、8. のものを使ってみようという話です。

設定したら設定できたことを確認したいので、とりあえず以下の記事で紹介した翻訳サービスでも使ってみましょう。
参考: Amazon Translate を試してみた

まず、何も設定していないと、認証情報がないって趣旨のエラーが出ることを確認しておきます。


import boto3

text = """
メロスは激怒した。
必ず、かの邪智暴虐の王を除かなければならぬと決意した。
"""

client = boto3.client("translate", region_name="ap-northeast-1")

result = client.translate_text(
    Text=text,
    SourceLanguageCode="ja",
    TargetLanguageCode="en",
)

# NoCredentialsError: Unable to locate credentials

ちなみに、 region_name=”ap-northeast-1″ を指定しないと、
NoRegionError: You must specify a region.
が出ます。

さて、本題の IAMの設定に移ります。
まずはEC2に付与するIAMロールを作成します。IAMユーザーではないので注意が必要です。
この辺、正確に理解できてるわけではないのですが、人に権限を付与するのがIAMユーザーで、AWSのリソースに権限を付与するのがIAMロールのようです。(超雑な説明)

1. AWSの管理コンソールのIAMのページの、左ペインでロールを選択します。
https://console.aws.amazon.com/iam/home#/roles
2. ロールの作成、をクリックします。
3. 信頼されたエンティティの種類を選択で、AWSサービスを選択します。 (他にも選択肢があるってことは、AWSサービス以外にもロールを付与できるはずなのですが使ったことがありません。)
4. ユースケースの選択から EC2 を選択します。
5. 次のステップ:アクセス権限 ボタンをクリックします。
6. Attach アクセス権限ポリシー で必要な権限を選択します。 (今回の例では、 TranslateFullAccess を選びます。)
7. 次のステップ:タグ をクリック。
8. 次のステップ:確認 をクリック。
9. ロール名と説明を入力 (例: ec2-jupyter)。
10. ロールの作成 をクリック。

これでロールができるので、 EC2に付与します。
1. EC2の管理画面に移動し、付与したいインスタンスを選択。
2. アクションのセキュリティにある、IAMロールを変更を選択。
3. IAM ロール に先ほど作ったIAMロールを選択。
4. 保存をクリック。

これで、 EC2にIAMロールが付与され、boto3が動くようになりました。
試します。


import boto3

text = """
メロスは激怒した。
必ず、かの邪智暴虐の王を除かなければならぬと決意した。
"""

client = boto3.client("translate", region_name="ap-northeast-1")

result = client.translate_text(
    Text=text,
    SourceLanguageCode="ja",
    TargetLanguageCode="en",
)

print(result["TranslatedText"])
"""
Melos got furious.
He determined that he must exclude the king of wicked violence.
"""

将来的に、 Translate 以外のサービスも使いたくなったら、
ロールの管理画面から今回作ったIAMロールにポリシーを追加でアタッチしていけば使うことができます。

PyMySQLのcursorclassについて

PyMySQLの公式ドキュメントのExamplesで使われている、
cursorclass=pymysql.cursors.DictCursor
の話です。


import pymysql.cursors

# Connect to the database
connection = pymysql.connect(
    host='localhost',
    user='user',
    password='passwd',
    database='db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor # これの話。
)

ドキュメントにも詳しい説明はないし、pep249でも言及されていないようなのでPyMySQLのソースコードも含めて調べてみました。

結論から言えば、実用上は黙って cursorclass=pymysql.cursors.DictCursor を指定しておけばよく、この記事は無駄知識の類のものになります。

まず、cursorclass に指定できる値は pymysql.cursors.DictCursor 以外に何があるのかですが、
これは、こちらのファイルで定義されている4種類のクラスが指定できます。
Github: pymysql/cursors.py

– pymysql.cursors.Cursor
– pymysql.cursors.DictCursor
– pymysql.cursors.SSCursor
– pymysql.cursors.SSDictCursor

https://github.com/PyMySQL/PyMySQL/blob/master/pymysql/connections.py#L179
に、


class Connection:
    # 中略
    def __init__(
        # 中略
        cursorclass=Cursor,

とある通り、デフォルトは、 pymysql.cursors.Cursor です。

この4種類のカーソルの違いですが、
Cursor と SSCursor は結果をタプルで返し、
DictCursor と SSDictCursor は結果を辞書で返してくれます。
結果の形に、CursorとSSCursor、DictCursorとSSDictCursorの間にはそれぞれ違いはありません。

SSとつく二つの方ですが、これらは主にデータが非常に大きいときや、ネットワークが遅いときなどに使います。
SSCursorのコメントがわかりやすいですね。

Unbuffered Cursor, mainly useful for queries that return a lot of data,
or for connections to remote servers over a slow network.
Instead of copying every row of data into a buffer, this will fetch
rows as needed. The upside of this is the client uses much less memory,
and rows are returned much faster when traveling over a slow network
or if the result set is very big.
There are limitations, though. The MySQL protocol doesn’t support
returning the total number of rows, so the only way to tell how many rows
there are is to iterate over every row returned. Also, it currently isn’t
possible to scroll backwards, as only the current row is held in memory.

個人的な感想としては最近の端末には十分なメモリが搭載されていて、数百万行単位のレコードを扱うときも、SS無しの方で十分さばけているので、
とりあえず DictCursor を使って、本当にメモリ不足で困った時だけ SSDictCursor を検討したらいいのかなと思っています。

Cursor(タプル) と DictCursor(辞書)についてはそれぞれ実行して結果を比較しておきましょう。

まず、cursorclassにpymysql.cursors.Cursorを指定した(もしくは何も指定しなかった場合)の結果です。
テーブルは以前の記事で作ったやつをそのまま使います。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users"
        cursor.execute(sql)
        result = cursor.fetchall()
print(result)
"""
(
    (1, 'webmaster@python.org', 'very-secret'),
    (2, 'sato@python.org', 'very-secret'),
    (3, 'suzuki@python.org', 'very-secret'),
    (4, 'takahashi@python.org', 'very-secret'),
    (5, 'tanaka@python.org', 'very-secret')
)
"""

1レコードごとに結果がタプルで戻っているだけでなく、fetchallすると戻り値はタプルのタプルになっていますね。
戻ってきた結果の各値が、SELECT句のどの列の値なのかが明示されていないので、自分でマッピングする必要があります。
正直これは少し使いにくいです。

続いて、pymysql.cursors.DictCursor を指定した場合の結果です。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users"
        cursor.execute(sql)
        result = cursor.fetchall()
print(result)
"""
[
    {'id': 1, 'email': 'webmaster@python.org', 'password': 'very-secret'},
    {'id': 2, 'email': 'sato@python.org', 'password': 'very-secret'},
    {'id': 3, 'email': 'suzuki@python.org', 'password': 'very-secret'},
    {'id': 4, 'email': 'takahashi@python.org', 'password': 'very-secret'},
    {'id': 5, 'email': 'tanaka@python.org', 'password': 'very-secret'}
]
"""

ご覧の通り、1レコードごとに「列名:値」の辞書として値が得られ、それらの配列として結果が返されます。
各値がSELECT句のどの列のものなのかはっきりしているのでとても便利です。
また、このままpandasのデータフレームに変換することもできます。
通常はこれを使えば良いでしょう。

PyMySQLでまとめてデータをインサートする(executemanyを使う方法)

※この記事ではDBはAWSのAuroraを想定しています。

Auroraに多くのデータをまとめて登録したいとき、1レコードごとにINSERT文を書いて実行するのは少しイケてない思っていたので、
良い方法を探していたところ、 executemany というメソッドが用意されているのを知りました。
pep249 にも記載があるので、PyMySQL以外のライブラリでも用意されていると思います。

サンプルコード載せるに状況の説明です。
INSERTしたいデータは例えば次のようなものだとします。


print(df)
"""
           date      open      high       low     close
0    2020-07-08  22481.61  22667.95  22438.65  22438.65
1    2020-07-09  22442.30  22679.08  22434.38  22529.29
2    2020-07-10  22534.97  22563.68  22285.07  22290.81
3    2020-07-13  22591.81  22784.74  22561.47  22784.74
4    2020-07-14  22631.87  22677.02  22538.78  22587.01
..          ...       ...       ...       ...       ...
195  2021-04-23  28939.12  29035.34  28770.62  29020.63
196  2021-04-26  29095.49  29241.28  28896.37  29126.23
197  2021-04-27  29174.53  29187.11  28990.19  28991.89
198  2021-04-28  28935.51  29139.70  28875.91  29053.97
199  2021-04-30  28996.66  29046.49  28760.27  28812.63

[200 rows x 5 columns]
"""

そして、INSERT先のテーブルは以下です。
(connection は既にRDSへの接続が完了しているものとします。)


with connection.cursor() as cursor:
    cursor.execute("SHOW CREATE TABLE nikkei")
    result = cursor.fetchone()
    
print(result["Create Table"])
"""
CREATE TABLE `nikkei` (
  `date` date NOT NULL,
  `open` double NOT NULL,
  `high` double NOT NULL,
  `low` double NOT NULL,
  `close` double NOT NULL,
  PRIMARY KEY (`date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
"""

通常であれば、この200あるレコード一つ一つについて、for文で回して、それぞれINSERT文を発行するところなのですが、
cursor.executemany を使うと次のように(表向きは)一発でINSERTできます。
(おそらく内部では1レコードずつINSERT文が発行されているのですが。)


insert_sql = """
    INSERT INTO
        nikkei (
            date,
            open,
            high,
            low,
            close
        )
    VALUES
        (
            %s, %s, %s, %s, %s
        )
"""

with connection.cursor() as cursor:
    cursor.executemany(insert_sql, df.values.tolist())
    connection.commit()

PyMySQLではプレースホルダにformat(%s)とpyformat(%{name}s)を使える

タイトルを逆にいうとPyMySQLのプレースホルダには qmark(?), numeric(:1), named(:name) は実装されてないようです。

※ この記事は、 version 1.0.2 の時点の PyMySQLについて書いています。将来的に他のプレースホルダーも実装されるかもしれません。


!pip freeze | grep PyMySQL
PyMySQL==1.0.2

pep249では、paramstyle として次の5つが定められています。

名前 説明
qmark Question mark style WHERE name=?
numeric Numeric, positional style WHERE name=:1
named Named style WHERE name=:name
format ANSI C printf format codes WHERE name=%s
pyformat Python extended format codes WHERE name=%(name)s

PyMySQLはpep249に従って実装さているので、この5つが全部使えるのかなと思って試したら、そうなってないことに気づきました。
タイトルにも書きました通り、実装されているのは format と pyformat だけです。
とはいえ、実用的なことを考えると、各値に名前をつけないときは、format、名前をつけたいときはpyformatを使えばいいので、この二つで十分だと思います。

実際にやってみます。
テーブルは前の記事で作ったものをそのまま使います。

まずDBに接続します。接続情報は各自の環境の値をご利用ください。


import pymysql.cursors

con_args = {
    "host": "{RDSのエンドポイント/同サーバーのDB場合はlocalhost}",
    "user": "{ユーザー名}",
    "password": "{パスワード}",
    "database": "{DB名}",
    "cursorclass": pymysql.cursors.DictCursor
}

connection = pymysql.connect(**con_args)

まず、 format スタイルから。サンプルコードなどで頻繁に見かけるのはこれです。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=%s"
        cursor.execute(sql, ('webmaster@python.org',))
        result = cursor.fetchone()
        print(result)

# {'id': 1, 'email': 'webmaster@python.org', 'password': 'very-secret'}

続いて、 pyformat。 このサンプルコードではありがたみがないですが、多くのプレースホルダーを使うクエリでは可読性向上に期待できます。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=%(email)s"
        cursor.execute(sql, {"email": "webmaster@python.org"})
        result = cursor.fetchone()
        print(result)

# {'id': 1, 'email': 'webmaster@python.org', 'password': 'very-secret'}

qmark だと cursor.execute でエラーになります。 (エラーになるので例外処理入れてます。)


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=?"
        try:
            cursor.execute(sql, ('webmaster@python.org',))
            result = cursor.fetchone()
            print(result)
        except Exception as e:
            print(e)
            # not all arguments converted during string formatting

numeric も同様。


with connection.cursor() as cursor:
        sql = "SELECT id, email, password FROM users WHERE email=:1"
        try:
            cursor.execute(sql, ('webmaster@python.org',))
            result = cursor.fetchone()
            print(result)
        except Exception as e:
            print(e)
            # not all arguments converted during string formatting

named もエラーになります。これだけエラー文が違います。


with connection.cursor() as cursor:
        try:
            sql = "SELECT id, email, password FROM users WHERE email=:email"
            cursor.execute(sql, {"email": "webmaster@python.org"})
            result = cursor.fetchone()
            print(result)
        except Exception as e:
            print(e)
            # (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ':email' at line 1")

PythonでAuroraを操作する(PyMySQLを使う方法)

今回の記事では、PythonのコードでSQLを実行しAuroraを操作する方法を紹介します。
AuroraはMySQL互換のインスタンスを使っているので、PythonのライブラリはMySQL用のものを利用します。
複数種類あるのですが、僕はPyMySQLを利用することが多いです。
(職場で先輩が使ってたというだけの理由で使い続けており、他のライブラリと比較検証したわけじゃないので、
いつかちゃんとメリットデメリット比較したいです。)

PyMySQL のドキュメントはこちら
PyMySQL · PyPI
前回の記事でも書きましたが、サンプルコードが軽く記載してあるだけで、あまり詳しい説明などはありません。
pep249を必要に応じて参照する必要があります。

といっても、使い方は簡単なので、ドキュメントのサンプルコードだけあれば実用上はなんとか使っていけます。
(DBヘの接続、SQLの実行と結果の取り出しができれば十分。)

この記事では、サンプルコードを参考に、最低限の操作をやってみようと思います。

まず、DBにサンプルのTableを作っておきます。


> CREATE TABLE users (
    id int(11) NOT NULL AUTO_INCREMENT,
    email varchar(255) NOT NULL,
    password varchar(255) NOT NULL,
    PRIMARY KEY (`id`)
);

Query OK, 0 rows affected (0.05 sec)

まず、DBへの接続です。
DBのエンドポイント/ユーザー名/パスワード/DB名 をそれぞれ設定し、接続をします。
cursorclass=pymysql.cursors.DictCursor を設定するのは、結果をDict形で受け取るためです。
(これは必須ではないのですが、Dictで受け取った方が後々扱いやすいです。)
{}で書いてる部分は各自の環境の値を入れてください。


import pymysql.cursors


con_args = {
    "host": "{RDSのエンドポイント/同サーバーのDB場合はlocalhost}",
    "user": "{ユーザー名}",
    "password": "{パスワード}",
    "database": "{DB名}",
    "cursorclass": pymysql.cursors.DictCursor
}

connection = pymysql.connect(**con_args)

新規レコードを挿入してみます。
SQLインジェクション対策にプレースホルダ(%s)を使います。
自動コミットではないので、 connection.commit() を忘れないようにするのがポイントです。


with connection.cursor() as cursor:
    sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
    cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
    connection.commit()

次はSELECT文です。 1レコード取り出すサンプルコードは次のようになります。


with connection.cursor() as cursor:
    sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
    cursor.execute(sql, ('webmaster@python.org',))
    result = cursor.fetchone()
    print(result)

# 結果
# {'id': 1, 'password': 'very-secret'}

ドキュメントのサンプルコードはこれだけですね。

ほとんど代わり映えしないのですが、結果が複数行になるSELECT文も試しておきましょう。
まずレコード自体が1行しかないとどうしようもないのでデータを増やします。


with connection.cursor() as cursor:
    sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
    cursor.execute(sql, ('sato@python.org', 'very-secret'))
    cursor.execute(sql, ('suzuki@python.org', 'very-secret'))
    cursor.execute(sql, ('takahashi@python.org', 'very-secret'))
    cursor.execute(sql, ('tanaka@python.org', 'very-secret'))
    connection.commit()

結果が複数行になる場合は、fetchone()の代わりに、fetchall()を使います。


with connection.cursor() as cursor:
        sql = "SELECT id, email FROM users"
        cursor.execute(sql)
        result = cursor.fetchall()
        print(result)

# [{'id': 1, 'email': 'webmaster@python.org'}, {'id': 2, 'email': 'sato@python.org'}, 
# {'id': 3, 'email': 'suzuki@python.org'}, {'id': 4, 'email': 'takahashi@python.org'},
# {'id': 5, 'email': 'tanaka@python.org'}]

ちなみにこの結果の型ですが、pandasのDataFrameに簡単に変換できるので便利です。


import pandas as pd


df = pd.DataFrame(result)
print(df)
"""
   id                 email
0   1  webmaster@python.org
1   2       sato@python.org
2   3     suzuki@python.org
3   4  takahashi@python.org
4   5     tanaka@python.org
"""

もちろんfetchoneを繰り返して1行ずつ取り出すこともできるのですが、個人的にはfetchallでさっさと取り出して
データフレームにしてしまった方が扱いやすいと思います。
fetchoneで順番に取り出すとしたら次のようなコードでしょうか。


with connection.cursor() as cursor:
        sql = "SELECT id, email FROM users"
        cursor.execute(sql)

row = cursor.fetchone()
while row is not None:
    print(row)
    row = cursor.fetchone()
"""
{'id': 1, 'email': 'webmaster@python.org'}
{'id': 2, 'email': 'sato@python.org'}
{'id': 3, 'email': 'suzuki@python.org'}
{'id': 4, 'email': 'takahashi@python.org'}
{'id': 5, 'email': 'tanaka@python.org'}
"""

次のような書き方でも動きます。


with connection.cursor() as cursor:
    sql = "SELECT id, email FROM users"
    cursor.execute(sql)

for row in cursor:
    print(row)
    
"""
{'id': 1, 'email': 'webmaster@python.org'}
{'id': 2, 'email': 'sato@python.org'}
{'id': 3, 'email': 'suzuki@python.org'}
{'id': 4, 'email': 'takahashi@python.org'}
{'id': 5, 'email': 'tanaka@python.org'}
"""