以前、PyMySQLを使って、Amazon RDSを利用する方法を記事にしました。
参考: PythonでAuroraを操作する(PyMySQLを使う方法)
DBに直接接続できる場合はこれで良いのですが、場合によっては踏み台となるサーバーを経由して接続しなければならないことがあります。
僕の場合は職場ではセキュリティ上の理由から分析用のDBの一つがローカルから直接接続できないようになっていますし、プライベートではAurora Serverless v1使っているので、これはAWS内のリソース経由でしか接続できません。
ということで、Pythonで踏み台経由してAWSに接続する方法を書いていきます。
実はこれまで人からもらったコードをそのまま使っていたのですが、この記事書くために改めてsshtunnel のドキュメントを読んで仕組みを理解しました。
参考: Welcome to sshtunnel’s documentation! — sshtunnel 0.4.0 documentation
さて、さっそくやっていきましょう。セキュリティ的に接続情報はブログに書くわけにいかないので、以下の変数に入ってるものとします。
あと、サンプルなので実行したいSQL文も sql って変数に入ってるものとします。
サーバーのネットワーク設定ですが、踏み台はSSHのポート(通常は22番)、RDSはDBの接続ポート(通常は3306番)を開けておいてください。以降のコードで出てくる9999番ポートは、ローカル端末のポートなので踏み台やDBのサーバーでは開けておかなくて良いです。
# DBの接続情報 (RDSを想定)
db_host = "{DBのエンドポイント}" # xxxx.rds.amazon.com みたいなの
db_port = 3306 # DBのポート(デフォルトから変更している場合は要修正)
db_name = "{データベース名}"
db_user = "{DBに接続するユーザー名}"
db_pass = "{DBに接続するユーザーのパスワード}"
# 踏み台サーバーの接続情報 (EC2を想定)
ssh_ip = "{サーバーのIPアドレス}"
ssh_user = "{SSH接続するユーザー名}" # EC2のデフォルトであれば ec2-user
ssh_port = 22 # SS接続するポート(デオフォルトから変更している場合は要修正)
ssh_pkey = "{秘密鍵ファイルの配置パス}" # .pem ファイルのパス
sql = "{実行したいSQL文}"
さて、さっそく行ってみましょう。単発で1個だけSQLを打って結果を取得したい、という場合、以下のコードで実行できます。
ローカル(手元のPCやMac)の 9999 番ポート (これは他で使ってなければ何番でもいい)への通信が、踏み台サーバーを経由してRDSに届くようになります。
from sshtunnel import SSHTunnelForwarder
from pymysql import cursors
from pymysql import connect
with SSHTunnelForwarder(
ssh_address_or_host=(ssh_ip, ssh_port), # 踏み台にするサーバーのIP/SSHポート
ssh_username=ssh_user, # SSHでログインするユーザー
ssh_pkey=ssh_pkey, # SSHの認証に使う秘密鍵
remote_bind_address=(db_host, db_port), # 踏み台を経由して接続したいDBのホスト名とポート
local_bind_address=("localhost", 9999), # バインドするローカル端末のホスト名とポート
) as tunnel:
with connect(
host="localhost", # DBのエンドポイントではなく、ローカルの端末を指定する
port=9999, # これもDBのポートでは無く、バインドしたポート番号を指定する
user=db_user, # これ以下は普通にDB接続する場合と同じ引数
password=db_pass,
database=db_name,
charset="utf8mb4",
autocommit=True,
cursorclass=cursors.DictCursor,
).cursor() as cursor:
cursor.execute(sql) # これでSQL実行
rows = cursor.fetchall() # 結果の取り出し
これで、通常はローカルからはアクセスできないDBへSQLを発行し、結果を変数rowsに取得することができました。SELECT文を打ったのであればpandasのDataFrame等に変換して使いましょう。
with文で変数をたくさん呼び出すインスタンスを使うのはコードの見栄えが非常に悪くなりますが、以下のように変数を事前に辞書にまとめておくと少しマシになります。
ssh_args = {
"ssh_address_or_host": (ssh_ip, ssh_port),
"ssh_username": ssh_user,
"ssh_pkey": ssh_pkey,
"remote_bind_address": (db_host, db_port),
"local_bind_address": ("localhost", 9999),
}
db_args = {
"host": "localhost",
"port": 9999,
"user": db_user,
"password": db_pass,
"database": db_name,
"charset": "utf8mb4",
"autocommit": True,
"cursorclass": cursors.DictCursor,
}
with SSHTunnelForwarder(**ssh_args) as tunnel:
with connect(**db_args).cursor() as cursor:
cursor.execute(sql)
rows = cursor.fetchall()
以上で、一応やりたいことはできると思いますが、発行したいSQLが複数ある場合かつ途中に別の処理も含むような場合一回ごとにポートフォワードとDBの接続をやり直していたらリソースの無駄です。(といっても、最近のコンピューター環境ならこれがストレスになる程時間かかるってことはないと思いますが。)
DBヘ接続しっぱなしにしておく場合は、withを使わずに次のように書きます。引数は上のコード例で作った、ssh_args, db_args をそのまま使います。
server = SSHTunnelForwarder(**ssh_args)
server.start() # ポートフォワード開始
connection = connect(**db_args) # DB接続
# 以上の3行で DBに接続した状態になる。
# 以下のようにして接続を使ってSQLを実行する。
with connection.cursor() as cursor:
cursor.execute(sql)
rows = cursor.fetchall()
# サンプルコードなのでSQLを1回しかやってないけど、続けて複数実行できる。
# 終わったらDB接続とポートフォワードをそれぞれ閉じる
connection.close()
server.stop()
これで、一つの接続を使い回すこともできるようになりました。
ちなみにですが、このsshtunnelで作ったポートフォワードの設定は端末単位で有効です。どういうことかというと、複数のPythonプロセス(例えば別々のJupyter notebook)間で、共有することができます。というより、Pythonに限らず他のプログラムからも使えます。
コンソールで、以下のコマンド使ってポートの動きを見ながら試すとよくわかります。
# 9999番ポートの利用状況を確認する
$ sudo lsof -i:9999
普段は何も結果が返ってこないか、ここまでのプログラムを実行してたらいろんな情報と共に(CLOSED)が返ってくると思いますが、ポートフォワードしている最中はESTABLISHEDになっていて、pythonが使っていることが確認できます。
特にPythonでDB操作したいという場合に限って言えば、別々のnotebookで操作するメリットなんて無いのですが、全く別の用途でポートフォワードだけPythonでやっておきたい、ということはあるかもしれないので、覚えておくと使う機会があるかもしれません。