2022年のご挨拶と今年の方針

新年明けましておめでとうございます。本年もよろしくお願いします。

さて、今年のこのブログの更新方針について決めたのでまとめておきます。
昨年末の記事でも少し頭出ししていましたが、ブログに限らず今年の計画や目標について考え、今年1年はアウトプットよりもインプットを重視した年にしようと決めました。また、その内容もデータサイエンス関連に限らず幅広く吸収していく年にしたいです。

アウトプットの時間は減らしたいのとインプット内容にこのブログ記事につながるようなテーマの物が減るということで、このブログの更新ペースは落とします。昨年の半分くらいにして週1回更新、年間50記事程度を目標にゆっくりやっていこうと思います。もし書きたいことがありすぎて困るようなことになったらまたその時にペースを見直すかもしれませんが。

僕はもともと読書が好きで色々なジャンルの本を幅広く読んでいました。その後、2017年に転職してデータサイエンティストになってからこの5年ほどの期間、まずは仕事で使うデータ分析のスキルを優先しようということで読む本がほとんど広い意味でのデータサイエンス関連や、ドメイン知識としての人材業界関連の本ばかりになっていました。特にそれが不満というわけでもなく、どんどん新しい知識が身に付き、できることが増えていくことにやりがいも感じていました。この分野は本当に学ぶことが多く、この先も興味が尽きることはなさそうです。ただその一方で、趣味に関する本とか書店でたまたま見かけて興味を持った本とか話題のベストセラー等々の他の読みたい本を読むのが完全に後回しになってきたのも事実です。

今年もデータ分析の勉強は継続はしますし、今の時点で絶対読みたいと思ってる本はそこそこあるのですがが、それらを読むのは月に1〜2冊程度に抑えようと思ってます。そして浮いた時間はまた昔みたいに、仕事や実用性を無視して興味を持ったものを何でも読んでいく時間にします。

その他、流石に3年も運用しているとこのブログにも色々改善したい点あったり、内容が古くなってしまった記事などもあります。新規の記事を書く時間を減らした範囲内で、過去記事の見直しなどを含めたメンテナンスにも細々と着手しようと思います。例えば「プログラミング」っていう非常に雑なカテゴリに多くの記事が集中してしまっているのでこの辺の見直しもしたいです。

以上のような方針のためこのブログの更新は昨年に比べてゆっくりになりますが、本年もよろしくお願いいたします。

2021年のまとめ

2021年の最後の投稿になります。

本年も訪問者の皆様には大変お世話になりました。書いた記事が多くの方に読んでいただけたということはもちろんですが、土日祝日なども平日より少ないとはいえ多くのアクセスがあり、休日も技術的な調べ物をしている熱心な人たちがいると実感できることは自分が学習を続けていく上でも大きな励みになりました。

今年も1年間の振り返りをやっていきたいと思います。本年までの累積の記事数および、年間のアクセス数は次のようになりました。
– 累計記事数 514記事 (この記事含む。昨年時点 409記事)
– 訪問ユーザー数 200,661人 (昨年実績 146,674人)
– ページビュー 348,595回 (昨年実績 258,698回)

年間100記事更新の目標を無事に達成でき、それに伴って訪問者の数も増えているので達成感を感じています。

とはいえ、多くのpvを集めているのは古い記事が多く、今年特に力を入れて書いたMeCabのアルゴリズムの話や、AWSのAI関連サービスの話、トレジャーデータの小ネタなどはあまり読まれていないようです。テーマ選びなのか僕の文章力なのか、なんらかの課題はあるように感じています。

さて、恒例のよく読まれた記事ランキングを見ていきましょう。
今回は2021年1年間でのPV数によるランキングです。

  1. matplotlibのグラフを高解像度で保存する (昨年1位)
  2. ネットワークグラフの中心性 (New)
  3. Pythonで連続した日付のリストを作る (New)
  4. pyenvで作成した環境を消す (New)
  5. TensorflowやKerasでJupyterカーネルが落ちるようになってしまった場合の対応 (New)
  6. numpyのpercentile関数の仕様を確認する (昨年4位)
  7. INSERT文でWITH句を使う (昨年7位)
  8. matplotlibでグラフ枠から見た指定の位置にテキストを挿入する (New)
  9. kerasのto_categoricalを使ってみる (昨年3位)
  10. Pythonで多変量正規分布に従う乱数を生成する (昨年10位)

Googleアナリティクスで確認した時、1位と10位が昨年と同じなので今年もあまり変わり映えしないなという印象を持っていました。しかし、改めて昨年のランキングと比較してみると昨年ランクインしなかった記事が5記事も入っており意外と顔ぶれ変わってましたね。

このブログもこれで開設から丸三年になります。流石にネタ切れを感じる日もあるので来年の運用をどうしようかと考えています。(とはいえ、ブログネタのストックは今時点で40個程度はあるので本当の意味ではネタ切れしてないのですが、書きたいけどなかなか筆が進まないものやタイミングを逃した感があるのも多く難しいところです。)

来年も技術的なスキルアップを目指した学習はもちろん続けていきますし、仕事の中での疑問や課題感からネタが出てくることもあると思うので、ブログの更新自体は続けていきます。ただ、技術関連以外のインプットにももっと力を入れていきたいですし、休日の時間を今以上に読書や講座受講などに使いたいので、更新頻度は見直したほうがいいかもとは思っています。

この年末年始で来年をどう過ごすかを考えて、その中でブログの運用方針も決めたいと思います。

それではみなさん、今年も1年間ありがとうございました。良いお年を。

pipでライブラリをインストールする前に依存ライブラリを確認する

僕はAnacondaで環境を構築してcondaで運用しているのですが、どうしてもcondaでは入れられないライブラリがある時など、やむを得ずpipを使うことがあります。その場合、condaで入れられる限りの依存ライブラリを入れた後に必要最小限のライブラリをpipで入れるようにしているのですが、依存ライブラリの確認漏れ等があり、想定外のライブラリがpipで入ってしまうことがありました。(この運用もそろそろ限界を感じていて、次に環境を作り直す機会があったらpipで統一したいと思っています。)

問題の一つはpipでインストールする前に依存ライブラリを調べる方法が分かりにくかったことだと思っていたのですが、ようやく事前に調べかたがわかったのでそれを紹介します。

どうやら、PyPI の特定のURLでアクセスできるJSONファイルに、必要な情報が載っているようです。ここに書いてありました。
参考: PyPIJSON – Python Wiki

バージョンを指定しない場合は、
https://pypi.python.org/pypi/<package_name>/json
バージョンを指定する場合は、
https://pypi.python.org/pypi/<package_name>/<version>/json
というURLにアクセスすると、そのパッケージ(ライブラリ)の情報が取得できます。

試しに jupyter notebook (pip install notebook でインストールするので、ライブラリ名はnotebook)の情報ページである
https://pypi.python.org/pypi/notebook/json
にアクセスしていただくと分かりますが、かなりでかいJSONが得られます。

ここからテキストエディターで必要な情報を得るのは骨が折れるので、Python使って欲しい情報を探しましょう。

偶然見つけたのですが、 pprint というメソッドのドキュメントでの使用例がこのJSONの表示だったりします。そこでは urllibを使っていますがこれは若干使いにくいので僕はrequestsを使います。
参考: requestsを使って、Webサイトのソースコードを取得する

では、試しに notebook の 情報をとってみましょう。

import requests


package_name = "notebook"
url = f"https://pypi.org/pypi/{package_name}/json"
json = requests.get(url).json()

# このJSONはかなりでかい
print(len(str(json)))
# 113699

# JSONのkeys。 この中の info が必要な情報を含んでいる。
print(json.keys())
# dict_keys(['info', 'last_serial', 'releases', 'urls', 'vulnerabilities'])

# infoの下に、多くの情報がある。
print(json["info"].keys())
"""
dict_keys(['author', 'author_email', 'bugtrack_url', 'classifiers',
        'description', 'description_content_type', 'docs_url', 'download_url',
        'downloads', 'home_page', 'keywords', 'license', 'maintainer',
        'maintainer_email', 'name', 'package_url', 'platform', 'project_url',
        'project_urls', 'release_url', 'requires_dist', 'requires_python',
        'summary', 'version', 'yanked', 'yanked_reason'])
"""

# requires_dist が依存ライブラリの情報。リスト形式なので、順番に表示する
for requires_dist_text in json["info"]["requires_dist"]:
    print(requires_dist_text)

"""
jinja2
tornado (>=6.1)
pyzmq (>=17)
argon2-cffi
ipython-genutils
traitlets (>=4.2.1)
jupyter-core (>=4.6.1)
jupyter-client (>=5.3.4)
nbformat
nbconvert
nest-asyncio (>=1.5)
ipykernel
Send2Trash (>=1.8.0)
terminado (>=0.8.3)
prometheus-client
sphinx ; extra == 'docs'
nbsphinx ; extra == 'docs'
sphinxcontrib-github-alt ; extra == 'docs'
sphinx-rtd-theme ; extra == 'docs'
myst-parser ; extra == 'docs'
json-logging ; extra == 'json-logging'
pytest ; extra == 'test'
coverage ; extra == 'test'
requests ; extra == 'test'
nbval ; extra == 'test'
selenium ; extra == 'test'
pytest-cov ; extra == 'test'
requests-unixsocket ; (sys_platform != "win32") and extra == 'test'
"""

# requires_python で Pythonのバージョンの指定も見れる
print(json["info"]["requires_python"])
# >=3.6

extra がついているのはオプション付きでインストールする時に必要になる物なので、基本的に、次のライブラリが必要であることがわかりますね。
jinja2
tornado (>=6.1)
pyzmq (>=17)
argon2-cffi
ipython-genutils
traitlets (>=4.2.1)
jupyter-core (>=4.6.1)
jupyter-client (>=5.3.4)
nbformat
nbconvert
nest-asyncio (>=1.5)
ipykernel
Send2Trash (>=1.8.0)
terminado (>=0.8.3)
prometheus-client

ちょっとテストしてみましょう。 pyenv で新しい環境作って、notebook入れてみます。
(version 3.8.7と微妙に古いバージョン入れていますがこれは適当です。

# 新しい仮想環境を構築
$ pyenv install 3.8.7
# 環境切り替え
$ pyenv global 3.8.7
# ライブラリが何も入ってないことを確認(出力がない)
$ pip freeze
# notebook インストール
$ pip install notebook
# 依存ライブラリと共にインストールされたことを確認
$ pip freeze
appnope==0.1.2
argon2-cffi==21.3.0
argon2-cffi-bindings==21.2.0
attrs==21.2.0
backcall==0.2.0
bleach==4.1.0
cffi==1.15.0
debugpy==1.5.1
decorator==5.1.0
defusedxml==0.7.1
entrypoints==0.3
importlib-resources==5.4.0
ipykernel==6.6.0
ipython==7.30.1
ipython-genutils==0.2.0
jedi==0.18.1
Jinja2==3.0.3
jsonschema==4.3.2
jupyter-client==7.1.0
jupyter-core==4.9.1
jupyterlab-pygments==0.1.2
MarkupSafe==2.0.1
matplotlib-inline==0.1.3
mistune==0.8.4
nbclient==0.5.9
nbconvert==6.3.0
nbformat==5.1.3
nest-asyncio==1.5.4
notebook==6.4.6
packaging==21.3
pandocfilters==1.5.0
parso==0.8.3
pexpect==4.8.0
pickleshare==0.7.5
prometheus-client==0.12.0
prompt-toolkit==3.0.24
ptyprocess==0.7.0
pycparser==2.21
Pygments==2.10.0
pyparsing==3.0.6
pyrsistent==0.18.0
python-dateutil==2.8.2
pyzmq==22.3.0
Send2Trash==1.8.0
six==1.16.0
terminado==0.12.1
testpath==0.5.0
tornado==6.1
traitlets==5.1.1
wcwidth==0.2.5
webencodings==0.5.1
zipp==3.6.0

予想してたよりずっと多くのライブラリがインストールされましたね。どうやら依存ライブラリたちの依存ライブラリ、もちろんそれらの依存ライブラリも順次インストールされたようです。ただ、一つずつ確認したところ、JSONから取得した依存ライブラリは全て入ったことがわかります。

これは実験しておいてよかったです。必ずしも、JSONから得られたライブラリだけが入るわけではないことがわかりました。

もう一点補足しておくと、requires_dist には必ず値が入っているわけではありません。当然ですが依存ライブラリがないライブラリもあります。その場合は空配列になっているのかな、と思ったのですが、 null になるようですね。 NumPyなどがそうです。

package_name = "numpy"
url = f"https://pypi.org/pypi/{package_name}/json"
json = requests.get(url).json()
print(json["info"]["requires_dist"])
# None

以上で、pipインストール前にライブラリの依存ライブラリを調べられるようになりました。

ここで取得したJSONは他にも様々な情報を持っているようなので、それらも調べておこうと思います。

pandas.DataFrameのgroupby関数で計算した結果を各行に展開する

なんとなくドキュメントを眺めていたら、groupby().transform()っていう便利そうな関数を見つけたのでその紹介です。

DataFrameのgroupbyといえば、指定した列をキーとしてグループごとの合計や平均、分散、個数などの集計を行うことができる関数です。

通常は、集計したキーの数=グループの数の行数のDataFrameを戻り値として返してきます。

import pandas as pd


df = pd.DataFrame(
    {
        "category": ["A", "A", "A", "B", "B"],
        "amount": [100, 300, 100, 200, 200],
    }
)
print(df)
"""
  category amount
0        A    100
1        A    300
2        A    100
3        B    200
4        B    200
"""

print(df.groupby("category").sum())
"""
category        
A            500
B            400
"""

ここで、この groupby して得られた集計値を、元のDataFrameの各業に展開したいことがあります。
そのような場合、僕はpd.mergeでデータフレームを結合するか、辞書形式に変換して結合することが多かったです。
例えば以下のようなコードになります。

# mergeで結合する場合
group_df = df.groupby("category").sum()
group_df.reset_index(inplace=True)
group_df.rename(columns={"amount": "category_amount"}, inplace=True)
print(pd.merge(df, group_df, on="category", how="left"))
"""
  category  amount  category_amount
0        A     100              500
1        A     300              500
2        A     100              500
3        B     200              400
4        B     200              400
"""

# 辞書を作ってマッピングする場合
group_df = df.groupby("category").sum()
sum_dict = group_df.to_dict()["amount"]
print(sum_dict)
# {'A': 500, 'B': 400}
df["category_amount"] = df["category"].apply(sum_dict.get)
print(df)
"""
  category  amount  category_amount
0        A     100              500
1        A     300              500
2        A     100              500
3        B     200              400
4        B     200              400
"""

書いてみるとこれらの手順を踏んでもそんなに複雑ではないのですが、やっぱり一発でできるともっと便利です。

そこで使えるのが、冒頭で紹介した、transformです。
参考: pandas.core.groupby.DataFrameGroupBy.transform

これは元のデータフレームと同じインデックスを持つデータフレームとして、GroupByの結果を返してくれます。ちょっとやってみます。

df = pd.DataFrame(
    {
        "category": ["A", "A", "B", "B", "B"],
        "amount": [100, 300, 100, 200, 200],
    }
)

# 元のDataFrameと同じ行数で、対応する行の"category"列の値が含まれるグループの合計を返す
print(df.groupby("category").transform("sum"))
"""
   amount
0     400
1     400
2     500
3     500
4     500
"""

# 元のDataFrameに合計値を付与したい場合は次のようにできる
df["category_amount"] = df.groupby("category").transform("sum")["amount"]
print(df)
"""
  category  amount  category_amount
0        A     100              400
1        A     300              400
2        B     100              500
3        B     200              500
4        B     200              500
"""

1行で済みましたね。

この新しく作った列を使えば、一定件数以下しか存在しないカテゴリの行を削除するとか、カテゴリごとにそれぞれの要素のカテゴリ内で占めてる割合を計算するとか、それぞれの要素のカテゴリごとの平均との差異を求めるとかそういった計算が非常に容易にできるようになります。

そしてさらに、このtransform とlambda関数を組み合わせて使うと、カテゴリの平均との差を一発で出す、といったこともできます。

df = pd.DataFrame(
    {
        "category": ["A", "A", "B", "B", "B"],
        "amount": [100, 300, 100, 200, 200],
    }
)
print(df.groupby("category").transform(lambda x: x-x.mean()))
"""
       amount
0 -100.000000
1  100.000000
2  -66.666667
3   33.333333
4   33.333333
"""

lambda 関数に渡されている x はそれぞれの行の値のように振る舞ってくれるにもかかわらず、同時に x.mean() でグループごとの平均を出すこともでき、その差分を元のDataFrameとインデックスを揃えて返してくれています。

これは使いこなせば相当便利なメソッドになりそうです。

MeCabで分かち書き済みの単語に対して品詞を判定する

MeCabで形態素解析してテキストを単語に分解するとき、分かち書きしたテキストと、品詞情報が得られます。その単語の出現頻度等を集計した後で、この単語はこの品詞、という情報を付与して絞り込み等をやりたくなったのでその方法をメモしておきます。

実は以前ワードクラウドを作った時に品詞別に色を塗るために似たようなコードを作っています。今回の記事はその改良版です。
参考: WordCloudの文字の色を明示的に指定する

この記事では次のようなコードを使いました。(参照した記事は先行するコードでMeCabのTaggerインスタンスを作ってる前提なのでその辺ちょっと補って書きます。)

import MeCab


tagger = MeCab.Tagger()
def get_pos(word):
    parsed_lines = tagger.parse(word).split("\n")[:-2]
    features = [l.split('\t')[1] for l in parsed_lines]
    pos = [f.split(',')[0] for f in features]
    pos1 = [f.split(',')[1] for f in features]

    # 名詞の場合は、 品詞細分類1まで返す
    if pos[0] == "名詞":
        return f"{pos[0]}-{pos1[0]}"

    # 名詞以外の場合は 品詞のみ返す
    else:
        return pos[0]

参照した記事で補足説明書いてますとおり、このコードは単語をもう一回MeCabにかけて品詞を取得しています。その時に万が一単語がさらに複数の形態素に分割されてしまった場合、1つ目の形態素の品詞を返すようになっています。

このコードを書いた時、単語がさらに分解されるってことは理論上はありうるけど、滅多にないだろう、と楽観的に考えていました。ところが、色々検証していると実はそんな例が山ほどあることがわかってきました。

例えば、「中国語」という単語がありますが、これ単体でMeCabに食わせると「中国」と「語」に分かれます。以下が実行例です。

# 形態素解析結果に「中国語」が出る例
$ echo "彼は中国語を話す" | mecab
彼	名詞,代名詞,一般,*,*,*,彼,カレ,カレ
は	助詞,係助詞,*,*,*,*,は,ハ,ワ
中国語	名詞,一般,*,*,*,*,中国語,チュウゴクゴ,チューゴクゴ
を	助詞,格助詞,一般,*,*,*,を,ヲ,ヲ
話す	動詞,自立,*,*,五段・サ行,基本形,話す,ハナス,ハナス
EOS

# 「中国語」がさらに「中国」 と「語」に分かれる
$ echo "中国語" | mecab
中国	名詞,固有名詞,地域,国,*,*,中国,チュウゴク,チューゴク
語	名詞,接尾,一般,*,*,*,語,ゴ,ゴ
EOS

「中国語」が固有名詞、地域、国と判定されるとちょっと厄介ですね。

他にも、「サバサバ」は「サバ」「サバ」に割れます。

$ echo "ワタシってサバサバしてるから" | mecab
ワタシ	名詞,固有名詞,組織,*,*,*,*
って	助詞,格助詞,連語,*,*,*,って,ッテ,ッテ
サバサバ	名詞,サ変接続,*,*,*,*,サバサバ,サバサバ,サバサバ
し	動詞,自立,*,*,サ変・スル,連用形,する,シ,シ
てる	動詞,非自立,*,*,一段,基本形,てる,テル,テル
から	助詞,接続助詞,*,*,*,*,から,カラ,カラ
EOS
$ echo "サバサバ" | mecab
サバ	名詞,一般,*,*,*,*,サバ,サバ,サバ
サバ	名詞,一般,*,*,*,*,サバ,サバ,サバ
EOS

他にも「ありえる」が「あり」「える」とか、「無責任」が「無」「責任」とか「ビュッフェ」が「ビュッ」「フェ」など、かなりの種類の単語が再度分解されます。

ということで、冒頭にあげた get_pos メソッドは思っていたよりもずっと誤判定しやすいということがわかってきました。

前置きが長くなってきましたが、このことを踏まえて、単語を再度分割することのないようにその単語としての品詞情報を取得できないかを考えました。

結局、制約付き解析機能を使って実現できそうだということがわかりました。
参考: MeCabの制約付き解析機能を試す

要するに、MeCabに渡された単語はそれで1単語だ、という制約を課せば良いわけです。

そのためには、-pオプション付きでTaggerを生成し、「{単語}{タブ}*(アスタリスク)」という形式のテキストに変換してTaggerでparseすれば大丈夫です。

Pythonのコードで書くと次のようになりますね。

import MeCab
tagger = MeCab.Tagger("-p")


def get_pos(word):
    # 制約付き解析の形態素断片形式にする
    p_token = f"{word}\t*"
    # 出力のEOS部分を捨てる
    parsed_line = tagger.parse(p_token).splitlines()[0]
    feature = parsed_line.split("\t")[1]
    # ,(カンマ)で区切り、品詞,品詞細分類1,品詞細分類2,品詞細分類3 の4項目残す
    pos_list = feature.split(",")[:4]
    # もう一度 ,(カンマ) で結合して返す
    return ",".join(pos_list)


# 利用例
print(get_pos("中国語"))
# 名詞,一般,*,*

上のコードは、品詞を再分類3まで取得するようにしましたが、最初の品詞だけ取得するとか、*(アスタリスク)の部分は省略するといった改修はお好みに合わせて容易にできると思います。

これで一旦今回の記事の目的は果たされました。

ただ、元の文中でその単語が登場したときの品詞が取得されているか、という観点で見るとこのコードも完璧ではありません。

表層系や原型が等しいが品詞が異なる単語が複数存在する場合、通常のMeCabの最小コスト法に則って品詞の一つが選ばれることになります。BOS/EOSへの連接コストとその品詞の単語の生起コストが考慮されて最小になるものが選ばれる感じですね。

分かち書き前のテキストで使われていたときの品詞が欲しいんだ、となると後からそれを付与するのは困難というより不可能なので、分かち書きした時点でちゃんと保存してどこかに取っておくようにしましょう。

あとおまけで、このコードを書いてる時に気づいたMeCabの制約付き解析機能の注意点を書いておきます。MeCabを制約付き解析モードで使っている時に、「表層\t素性パターン」”ではない”テキスト、つまり文断片と呼ばれている文字列を改行コード付けずに渡すとクラッシュするようです。
-p 付きで起動したときは、「表層\t素性パターン」形式の形態素断片か改行コードを必ず含むテキストで使うようにしましょう。

jupyter notebookでやると カーネルごとお亡くなりになりますので特に要注意です。

ちょっとコンソールでやってみますね。

$ python
>>> import MeCab
>>> tagger = MeCab.Tagger("-p")
>>> tagger.parse("中国語")
Segmentation fault: 11
# これでPythonが強制終了になる
$

改行コードつければ大丈夫であることは以下のようにして確認できます。

$ python
>>> import MeCab
>>> tagger = MeCab.Tagger("-p")
>>> tagger.parse("中国語\n")
'中国\t名詞,固有名詞,地域,国,*,*,中国,チュウゴク,チューゴク\n語\t名詞,接尾,一般,*,*,*,語,ゴ,ゴ\nEOS\n'

-p をつけてないときは別に改行コードなしのテキストも読み込んでくれるのでこれはちょっと意外でした。

制約付き解析(-p付き)でMeCabを使っている時に、「Segmentation fault: 11」が出たらこのことを思い出してください。

jupyter notebookのセルの出力をコードでクリアする

諸事情ありまして、jupyter notebookのセルの出力をクリアする方法を知りたくなったので調べました。
通常、jupyterではテキストを複数回にわたってprintしたり、matplotlibの図をいくつも出力するコードを1つのセルに書くと、出力したテキストなり図なりがダーっと続けて出てきます。
ちょっとこれを逐一クリアして新しいものだけ残すようにしたかったのです。
(こんなことする必要があることは滅多にないのですが。)

実は、クリアしたい対象がprintした1行以内のテキストの場合、それを実装する方法は過去に紹介したことがあります。それはprintメソッドのend引数を使ってprint後に改行コードを出力しないようにし、キャリッジリターン(“\r”)で出力位置を行頭に戻して空白で上書きしてしまうというものです。
これ使ってプログレスバーを作った記事が過去にありますね。
参考: printでお手軽プログレスバー

例えば、jupyterで次のコードを動かすと0~49まで数字がカウントアップします。
\r でカーソルを先頭にもどして、空白で埋めて、最後に次のprintのためにもう一回カーソルを先頭に戻しています。 end=”” はprint後に改行させない設定です。
sleep() は入れておかないと一瞬すぎて何も見えないのでウェイトとして入れています。

import time

for i in range(50):
    print("\r          \r", end="")
    print(i, end="")
    time.sleep(0.5)

ただ、さっきも書きましたがこの方法だと1行のテキストしか消せません。

複数行の出力だったらどうやって消すのかなと思って調べた結果見つかったのが、IPython モジュールにあった、 clear_output というメソッドです。
正確には、IPython.display.clear_output として実装されています。
ドキュメントはこちらです。
参考: Module: display — IPython 7.30.1 documentation

Clear the output of the current cell receiving output. とある通り、これが実行されるとそのステップが含まれたセルの出力だけを消してくれます。他のセルの出力は残してくれるので安心ですね。

wait (デフォルトはFalse)という便利な引数も持っています。これは、Falseにしておくと即座に出力を消すのに対して、Trueを渡すと、次の出力がくるのを待って消してくれます。連続して何かを出力するようなコードの場合、Trueにしておくと出力をスムーズに入れ替えるような動きになるのです。 Falseだと一瞬何も出力がない状態になるので次のセルとの間が詰まって 以降のセルがガクガク動きます。

以下のようにして、1秒ごとに現在時刻を表示する時計のような出力も出せます。

from IPython.display import clear_output
from datetime import datetime
import time


for i in range(10):
    print("現在時刻\n", datetime.now())
    clear_output(True)
    time.sleep(1)

"""
現在時刻
 2021-12-14 23:58:34.942141

上のような出力が1秒ごとに更新されて書き換えられる
"""

clear_outputはテキストだけではなく、図もクリアしてくれます。これを応用すると、パラパラ漫画のようにして手軽にアニメーションを作ることができます。

徐々にデータが増えて延びる折れ線グラフを描いてみたのが次のコードです。

import matplotlib.pyplot as plt
import numpy as np

# プロットする点を格納する配列
X = []
Y = []

for i in range(100):
    # 新しい点を追加する
    X.append(i)
    Y.append(np.random.randn())  # y座標には乱数入れる

    clear_output(True)  # それまでの出力をクリアする

    # グラフ作図
    fig = plt.figure(facecolor="w")  # 出力をクリアしたら改めてfigreオブジェクトが必要らしい
    ax = fig.add_subplot(111)
    ax.plot(X, Y)
    # グラフ表示
    plt.show()
    time.sleep(0.1)

このコードで jupyter 上にはアニメーションが表示できます。

実質的には clear_output(True) を差し込んでるだけなので、かなり手軽ですね。
ただ、これには一つ欠点もあって、jpyter上で簡易的に図を書いたり消したりしてアニメーションっぽく見せているだけなのでこのまま動画として保存することはできません。
(そのためこの記事にも結果の画像を貼っていません)

もし、gif形式などで保存したい場合は、少々面倒になるのですが、 ArtistAnimation などを使いましょう。過去の記事で取り上げています。
参考: matplotlibでgif動画生成

subprocessでパイプラインの実装

前回に続いてsubprocessの話です。予告していた通り、PythonでOSコマンドをパイプラインで繋いで実行する方法を紹介します。

まず前提ですが、subprocess.run にパイプラインを含むOSコマンドを渡してもそのままでは動きません。例えば実行中のプロセスから jupyter の文字を含む次のようなコマンドを考えます。

$ ps aux | grep jupyter
yutaro             762   0.0  0.8  4315736  67452 s000  S    11:55PM   0:03.71 {Pythonのパス} {pyenvのパス}/versions/anaconda3-2019.10/bin/jupyter-notebook
yutaro             910   0.0  0.0  4278648    712 s000  S+   12:04AM   0:00.00 grep jupyter

このコマンドをそのまま subprocess に渡しても動かないわけです。

import subprocess


cp = subprocess.run(
    ["ps", "aux", "|", "grep", "jupyter"],
    capture_output=True,
    text=True
)
# リターンコードが0ではない
print(cp.returncode)
# 1

# 標準出力は空っぽ
print(cp.stdout)
# 
# 標準エラー出力にはエラーが出ている
print(cp.stderr)
"""
ps: illegal argument: |
usage: ps [-AaCcEefhjlMmrSTvwXx] [-O fmt | -o fmt] [-G gid[,gid...]]
          [-u]
          [-p pid[,pid...]] [-t tty[,tty...]] [-U user[,user...]]
       ps [-L]
"""

実は、パイプラインを含むコマンドを簡単に動かす方法はあります。それがshell引数にTrueを渡すことです。これは渡されたコマンドをシェルによって実行するオプションです。この場合、コマンドは空白で区切った配列ではなく一つの文字列で渡します。

cp = subprocess.run(
    "ps aux | grep jupyter",
    capture_output=True,
    text=True,
    shell=True
)

# リターンコードは0
print(cp.returncode)
# 0
# 標準出力に結果が入る
print(cp.stdout)
# 結果略。
# 標準エラー出力は空
print(cp.stderr)
# 

ただし、ドキュメントに「注釈 shell=True を使う前に セキュリティで考慮すべき点 を読んでください。」という注釈がついてるように、これはセキュリティ面で問題がある方法のようです。
参考: セキュリティで考慮すべき点
シェルインジェクションを避けるのはアプリ側の責任だって書いてありますね。この点気をつけて使いましょう。

さて、色々検証してみたのですが、 shell=True を使わなくてもパイプラインを実装する方法はあるようです。それは単純に標準入力を使う方法で、1個目のコマンドの標準出力を2個目のコマンドの標準入力に渡してあげます。

とりあえず、パイプラインではなく単一のコマンドで標準入力を使ってみましょう。macabコマンドに、いつもの「すもももももももものうち」を渡してみます。

runメソッドに標準入力を渡すには、 input という引数を使います。これで注意しないといけないのは、inputには”バイト列”でデータを渡す必要があることです。str型だとエラーになるので、encode() してから渡します。ただ、text=True も指定するときは逆にstrで渡さないといけないようですね。

text = "すもももももももものうち"  # 入力するテキスト
text_byte = text.encode()  # byte型にエンコード

cp = subprocess.run(
    "mecab",
    capture_output=True,
    input=text_byte  # 通常はbyte型で標準入力を渡す
)

# byte型でデータが返ってきているので、decode()して表示
print(cp.stdout.decode())
"""
すもも	名詞,一般,*,*,*,*,すもも,スモモ,スモモ
も	助詞,係助詞,*,*,*,*,も,モ,モ
もも	名詞,一般,*,*,*,*,もも,モモ,モモ
も	助詞,係助詞,*,*,*,*,も,モ,モ
もも	名詞,一般,*,*,*,*,もも,モモ,モモ
の	助詞,連体化,*,*,*,*,の,ノ,ノ
うち	名詞,非自立,副詞可能,*,*,*,うち,ウチ,ウチ
EOS
"""

# text=True を指定するときは str型で標準入力を渡す
cp = subprocess.run(
    "mecab",
    capture_output=True,
    text=True,
    input=text  # text=True を指定するときは str型で標準入力を渡す
)
# str型で格納されているのでそのままprintできる
print(cp.stdout)
"""
結果は同じなので略
"""

さて、標準入力の渡し方がわかったら、あとは先行するコマンドの標準出力を次のコマンドの標準入力に渡すだけです。

最初の ps aux | grep jupyter でやってみましょう。

cp1 = subprocess.run(
    ["ps", "aux"],
    capture_output=True,
    text=True,
)

cp2 = subprocess.run(
    ["grep", "jupyter"],
    capture_output=True,
    text=True,
    input=cp1.stdout  # 一つ目のコマンドの標準出力を渡す
)
print(cp2.stdout)
"""
yutaro             762   0.0  0.8  4315736  67720 s000  S    11:55PM   0:05.04 {Pythonのパス} {pyenvのパス} /versions/anaconda3-2019.10/bin/jupyter-notebook
"""

この記事の先頭のコマンドの結果と微妙に異なりますね。 grep jupyter のプロセスが出てきません。これは、ps aux だけ先行して動かし、その結果をもとにgrepしているので、厳密にはシェルでパイプラインしたのとは異なるからそうなっているのでしょう。

ただ、通常の用途であればほぼ同じ結果が得られると思います。
どうしても差分が気になるのであれば shell=Trueの方の方法を使うことも検討が必要でしょうね。

サンプルとして選んだコマンドがイマイチだったので、厳密にいうと再現できてないサンプルを提示してしまったのですが、このようにして、PythonでOSコマンドのパイプラインが再現できます。

subprocessでPythonからOSのコマンドを実行する

このブログの過去記事でもすでに使ったことがあるのですが、改めてsubprocessの使い方をまとめておきます。
ドキュメントはこちら。
参考: subprocess — サブプロセス管理 — Python 3.10.0b2 ドキュメント

subprocessは os.system を置き換えるために作られた新し目のモジュールらしいので、僕も新しい方法としてこれを使っていたのですが、Python 3.5 から subprocess に run() というメソッドが実装され、僕が書いていた方法はいつの間にか古い方法になってしまっていたようです。ドキュメントを少し引用します。

サブプロセスを起動するために推奨される方法は、すべての用法を扱える run() 関数を使用することです。より高度な用法では下層の Popen インターフェースを直接使用することもできます。
run() 関数は Python 3.5 で追加されました; 過去のバージョンとの互換性の維持が必要な場合は、古い高水準 API 節をご覧ください。

subprocess — サブプロセス管理 — Python 3.10.0b2 ドキュメント

ちなみに、古い方法では、コマンドを実行したいだけなら call 、出力を得たかったら getoutput を使っていました。

import subprocess


# mkdir sample_dir を実行。 空白を含むコマンドは空白で区切って配列で渡す
subprocess.call(["mkdir", "sample_dir"])  # 成功すれば戻り値 として 0が帰ってくる

# 標準出力の結果が欲しい場合は getoutput メソッドを使う
output_str = subprocess.getoutput("ls -la")
print(output_str)

さて、本題の新しい方法の run の説明に入りましょう。
このメソッドはどうやら非常に多くの種類の引数をとるそうで、ドキュメントでも、「上記の引数は、もっともよく使われるものだけ示しており、後述の よく使われる引数 で説明されています」とある通り一部の引数しか掲載されていません。それでもこれだけ書かれています。

subprocess.run(
    args, *, stdin=None, input=None, stdout=None,
    stderr=None, capture_output=False, shell=False, cwd=None,
    timeout=None, check=False, encoding=None, errors=None,
    text=None, env=None, universal_newlines=None,
    **other_popen_kwargs)

基本的には、コマンドをスペースで区切って配列にし、callの時と同じように渡せば良いようです。touchでファイルを作ってみます。

subprocess.run(["touch", "sample_dir/sample1.txt"])
# CompletedProcess(args=['touch', 'sample_dir/sample1.txt'], returncode=0)

上のコード例は jupyter notebookで動かした時のイメージなので、勝手に最後のメソッドの戻り値がnotebookに表示されたのですが、これでわかる通り、 CompletedProcess というクラスのインスタンスを返してくれます。lsなどの標準出力を取りたい場合は、 capture_output を Trueにしておきます。

cp = subprocess.run(["ls", "-la", "sample_dir"])
print(cp.stdout)  # capture_output を指定しないと、stdoutに結果が入ってない
# None

cp = subprocess.run(["ls", "-la", "sample_dir"], capture_output=True)
print(type(cp.stdout))  # 結果はバイト型で入ってくる
# <class 'bytes'>
print(cp.stdout.decode())  # 文字列に変換したい場合はdecodeする
"""
total 0
drwxr-xr-x  3 {ユーザー名}  {グループ名}   96 12  8 00:41 .
drwxr-xr-x  7 {ユーザー名}  {グループ名}  224 12  8 00:52 ..
-rw-r--r--  1 {ユーザー名}  {グループ名}    0 12  8 00:41 sample1.txt
"""

cp = subprocess.run(["ls", "-la", "sample_dir"], capture_output=True, text=True)
print(type(cp.stdout))  # text=True も指定しておくと、str型で得られるのでdecodeがいらない。
# <class 'str'>
print(cp.stdout)
# (上のと同じなので) 出力略 

この、capture_output は 3.7 で追加されたそうで runメソッド本体より新しいオプションになります。 capture_output を使わない場合、 stdout と stderr にそれぞれ標準出力と標準エラー出力を指定することになります。ドキュメントでは PIPE とか STDOUT とかを指定するよう書かれていますがこれらは、 subprocess.PIPE, subprocess.STDOUT のことです。
両引数にそれぞれsubprocess.PIPE を指定すると、capture_output=Trueにしたのと同じ動きになります。stdout=subprocess.PIPE と stderr=subprocess.STDOUT の組み合わせで指定すると、標準出力と標準エラー出力を両方ともstdoutに格納してくれます。

ちょっと tarコマンドあたりでやってみます。出力先ファイルを – (ハイフン) にしておくと tar は結果のアーカイブをファイルを作らずに結果を標準出力に出力します。
また、 v をつけておくと標準エラー出力に処理したファイル情報を出すので subprocess の挙動確認にちょうど良さそうです。

# capture_output=True, と stdout=subprocess.PIPE, stderr=subprocess.PIPE は同じ動き
cp = subprocess.run(["tar", "cvf", "-", "sample_dir"],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print(cp.stdout)
"""
{tarファイルの中身}
"""
print(cp.stderr)
"""
a sample_dir
a sample_dir/sample1.txt
"""

# stderr=subprocess.STDOUT とすると、標準エラー出力も標準出力に追記される
cp = subprocess.run(["tar", "cvf", "-", "sample_dir"],
                    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
# 標準エラー出力に出るはずだったアーカイブ対象情報もこちらに出る
print(cp.stdout)
"""
a sample_dir
a sample_dir/sample1.txt
{tarファイルの中身}
"""
# stderrは空
print(cp.stderr)
# None

最後に、コマンドがエラーになった時の処理です。
基本的には、 CompletedProcess が returncode という要素を持っているので、これで判定すれば良いと思います。 たとえば、 sample_dir というディレクトリは上のサンプルコードで作ったのが既にあるので、もう一度作ろうとすると失敗し、returncode が1になります。

cp = subprocess.run(["mkdir", "sample_dir"])
print(cp.returncode)
# 1

逆にいうと、コマンドが失敗してもPythonとしては特にエラーにならず、それ以降もコードがあるのであればプログラムは走り続けるということです。コマンドを実行したらreturncodeを確認して失敗してたら止めるような処理を明示的に作っておかないと予期せぬバグに繋がることもあるので気をつけましょう。

returncodeを確認するのではなく、コマンドが失敗したら例外を上げて欲しい、という場合は check=Trueを指定しておきましょう。

try:
    cp = subprocess.run(["mkdir", "sample_dir"], check=True)
except Exception as e:
    print(e)
    # Command '['mkdir', 'sample_dir']' returned non-zero exit status 1.

ちなみにですが、存在しないコマンドを渡すと check=True を指定していなくても例外が上がります。コマンドが存在しないのと、コマンドの結果がエラーになったのは明確に違う扱いになっているようですね。

try:
    cp = subprocess.run(["abcdefg", "aaaa"])
except Exception as e:
    print(e)
    # [Errno 2] No such file or directory: 'abcdefg': 'abcdefg'

これで簡単なコマンドであれば subprocess.run を使って実行できると思います。

あと、パイプラインを使うようなやり方について現在調べて検証しているので次の記事で紹介したいと思っています。

scikit-learnのSimpleImputerで欠損値の補完

とある講演を聞いていて、SimpleImputerという機能の存在を知ったのでその紹介です。(その講演自体は別のテーマで話されていたのですが、その中でSimpleImputerは常識みたいに扱われていたのに、僕は使ったことがなかったので試そうと思いました。)

これは何かというと、pandasのDataFrameやNumPyのArray中にある欠損値を補完してくれるものです。目的はpandasのDataFrameの機能でいうところのfillna()に近いですね。
fillna()で十分だという意見もあると思いますし、実際僕もfillna()で済ませてきたのでこれの存在を知らなかったのですが、ちゃんとSimpleImputerのメリットもあるのでその辺説明していきたいと思います。

ドキュメントはこちらです。
sklearn.impute.SimpleImputer — scikit-learn 1.0.1 documentation
6.4. Imputation of missing values — scikit-learn 1.0.1 documentation

version 0.20 から登場したモデルで、その前まで存在した、 sklearn.preprocessing.Imputer の置き換えとして実装されたようですね。

とりあえず補完対象となる欠損値を含むデータがないと始まらないので、適当に準備します。

import pandas as pd


# 欠損値を含むDataFrameを生成
df = pd.DataFrame(
    {
        "col1": [8, None, 6, 3, None],
        "col2": [None, 8, 2, 2, 10],
        "col3": [3, 10, None, 0, 3],
    }
)
print(df)
"""
   col1  col2  col3
0   8.0   NaN   3.0
1   NaN   8.0  10.0
2   6.0   2.0   NaN
3   3.0   2.0   0.0
4   NaN  10.0   3.0
"""

これで一部のデータが欠損しているDataFrameができましたね。それでは、SimpleImputer を使ってきましょう。SimpleImputer を使うときには、まず欠損値を埋める方法を決める必要があります。 その列の欠損してない値の 平均値、中央値、最頻値を用いて欠損値を埋めるか、もしくは定数を使って埋めることになります。

埋め方を決めたらそれは strategy 引数で指定します。対応は以下の通りです。
– mean ・・・ 平均値
– median ・・・ 中央値
– most_frequent ・・・ 最頻値
– constant ・・・ 定数 (別途、fill_value 引数で定数を指定する)

今回はお試しなので、 mean (平均値) でやってみます。

# インスタンスを生成
imp_mean = SimpleImputer(strategy='mean')
# 学習
imp_mean.fit(df)
# 学習したパラメーター(補完に使う平均値を表示)
print(imp_mean.statistics_)
# [5.66666667 5.5        4.        ]

# 欠損値を補完
imp_ary = imp_mean.transform(df)
# 結果を表示
print(imp_ary)
"""
[5.66666667 5.5        4.        ]
[[ 8.          5.5         3.        ]
 [ 5.66666667  8.         10.        ]
 [ 6.          2.          4.        ]
 [ 3.          2.          0.        ]
 [ 5.66666667 10.          3.        ]]
"""

# 補完後の型はNumpyのArraryになる
print(type(imp_ary))
# <class 'numpy.ndarray'>

fit で各列の平均値を学習し、その値を使ってNaNだったところを埋めてくれていますね。
注意しないといけないのは transform して戻ってくるデータはNumPyのArrayになっていることです。(上のサンプルコードで型を見ている通り。)

欠損値補完後のデータもDataFrameで欲しいんだという場合は再度DataFrameに変換する必要があるようです。モデルの引数でそういうオプションがあるといいのですが、今の時点のバージョン(1.0.1)ではなさそうなので自分でやりましょう。

imp_df = pd.DataFrame(imp_mean.transform(df), columns=df.columns)
print(imp_df)
"""
       col1  col2  col3
0  8.000000   5.5   3.0
1  5.666667   8.0  10.0
2  6.000000   2.0   4.0
3  3.000000   2.0   0.0
4  5.666667  10.0   3.0
"""

正直、このように単一のDataFrameにたいしてそのDataFrameの統計量を使って補完するのであれば、fillnaの方が使いやすい気がします。次のようにして同じ結果が得られるので。

print(df.fillna(value=df.mean()))
"""
       col1  col2  col3
0  8.000000   5.5   3.0
1  5.666667   8.0  10.0
2  6.000000   2.0   4.0
3  3.000000   2.0   0.0
4  5.666667  10.0   3.0
"""

これは、fillnaが列ごとに個別の値を設定できることを利用し、valueにそれぞれの列の平均値(df.mean())を渡すことによって実現しているものです。

ちなみに、SimpleImputer で strategy=’constant’ を指定する場合、fillnaのように列ごとに違う値を指定することはできません。次のように定数を一つだけ指定してそれで補完します。
この点はfillnaと比較したときに明確なデメリットですね。

imp_cons = SimpleImputer(strategy='constant', fill_value=-1)
imp_cons.fit(df)
print(imp_cons.transform(df))
"""
[[ 8. -1.  3.]
 [-1.  8. 10.]
 [ 6.  2. -1.]
 [ 3.  2.  0.]
 [-1. 10.  3.]]
"""

strategy=’constant’ の場合、補完する数値を計算する必要がないので、fit()したときに何を学習しているのか不明だったのですが、どうやらここで渡した列数を記憶しているようです。(fit に3列のDataFrameを渡すと、transformメソッドも3列のDataFrameしか受け付けなくなる。)

さて、これまでの説明だと、どうもfillna()の方が便利に思えてきますね。(Numpyにはfillnaがないので、元のデータがDataFrameではなく2次元のArrayの場合は使えるかもしれませんが。)

ここから SimpleImputer のメリットの紹介になります。

一つの目のメリットは、一度学習させた補完値を他のデータにも適用することができるということです。例えば機械学習の前処理でデータを補完する場合、訓練データの欠損値をある値で補完したのであれば、検証データの欠損値もその値で補完したいですし、本番環境にリリースして実運用が始まった後も欠損値があるデータに対して補完をかけるのであれば訓練時と同じデータで補完したいです。

SimpleImputer であれば、 fitした時点で補完する値を学習しているので、別のデータに対してtransformしてもその値を使って補完してくれます。
fillna でも、補完に使うデータをどこかに退避しておいてそれを使えばいいじゃないか、という声も聞こえてきそうですし、実際そうなのですが、fitしたモデルを保存しておいてそれを使う方が、補完に使うデータ(辞書型か配列か)をどこかに退避しておくより気楽に感じます。(個人の感想です。なぜそう感じるのかは自分でもよくわかりません。)

# 元と別のデータ
df2 = pd.DataFrame({
    "col1": [7, None, 2],
    "col2": [2, 2, None],
    "col3": [6, 8, None],
})

# 学習済みの値を使って補完される
print(imp_mean.transform(df2))
"""
[[7.         2.         6.        ]
 [5.66666667 2.         8.        ]
 [2.         5.5        4.        ]]
"""

もう一つの利点は、これがscikit-learnのモデルなので、Pipelineに組み込めるということです。完全に適当な例なのですが、
平均値で欠損値の補完 -> データの標準化 -> ロジスティック回帰
と処理するパイプラインを構築してみます。
ロジスティック回帰をやるので正解ラベルも適当に作ります。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression


# ロジスティック回帰のサンプルを動かすために架空の正解ラベルを作る
y = [0, 0, 0, 1, 1]

# 欠損値を補完した後、StandardScalerで標準化してロジスティック回帰で予測するパイプライン
clf = Pipeline([
    ("si", SimpleImputer(strategy="mean")),
    ("ss", StandardScaler()),
    ("lr", LogisticRegression()),
])
# 学習
clf.fit(df, y)

このようにPipelineに組み込めるというのは fillna の方にはないメリットだと思います。

繰り返しますが、上の例は完全に適当に作ったパイプラインです。
一般的な話として、前処理次に欠損値を補完した後に標準化するという前処理を推奨しているわけではないのでご注意ください。あくまでも、SimpleImputerを組み込んだパイプラインは動きますということだけを示した例です。

Pythonで月初や月末、週初めの日付を求める方法

今回もPythonの日付操作に関する話です。稀に必要になる月初や月末日付、週の開始日の日付を得る方法を紹介します。

まず、月初からです。まず、ある日付を含む月の月初の日付(要するに1日)を取得したい場合、必要なのは月初の日付を指し示すdateやdatetimeオブジェクトなのか、その日付を表す文字列なのかを考える必要があります。もし、文字列で必要なのであれば、一番簡単な方法は strftimeで’%Y-%m-01’などのフォーマットに指定することです。普通なら%dとする部分を、01にして決め打ちしてるだけですね。

strftimeについては、こちらの記事でも取り扱っています。
参考: pythonで今日の日付を表す文字列をつくる

具体的には次のようになります。

from datetime import datetime


dt1 = datetime(2021, 11, 15)
print(dt1)
# 2021-11-15 00:00:00

# strftime でその日を含む月の1日を示す文字列を得る
print(dt1.strftime("%Y-%m-01"))
# 2021-11-01

いや、日付の文字列ではなく、その月の初日を示すdatetimeオブジェクトが欲しいんだ!という場合、 dateやdatetime オブジェクトが replace というメソッドを持っているので、day=1 と渡してあげると日付部分を書き換えることができます。
参考: datetime.replace

from datetime import datetime
from datetime import date


dt1 = datetime(2021, 11, 15, 11, 0, 0)
print(dt1)
# 2021-11-15 11:00:00

date1 = date(2021, 11, 15)
print(date1)
# 2021-11-15

# replaceで 日付部分を1に書き換える
print(dt1.replace(day=1))
# 2021-11-01 11:00:00

print(date1.replace(day=1))
# 2021-11-01

日付の加算の記事の最後の方で紹介した、relativedelta において、 day=1 (days=1ではないので注意)と引数を渡すと、日付を1に置き換える動きになる、という仕様がありましたが、これを使って実現することもできます。
ただし、datetimeモジュールだけで実現できる捜査を行うのにわざわざ別のライブラリをインポートするメリットはないのでここではコードの実行例は省略します。
参考: Pythonで日付の加算、特にnヶ月後やn年後の日付を求める方法

さて、これで月初の日付を得る方法は得られました。次は月末の日付を得る方法です。

月初の日付は常に1日でしたが、月末の日付は月によって違うので、月初のように書式設定やreplaceで得るのはちょっと面倒です。そこで、月末の日付が欲しい場合は、まずその月の月初の日付を求め、それに1ヶ月足し、さらにその前日を求めるという手順を踏んでいきます。

3ステップもあって面倒だ、と思われるかもしれませんが、relativedelta が実は非常に便利な仕様を持っています。
参考: relativedelta — dateutil 2.8.2 documentation

一部引用します。
There are relative and absolute forms of the keyword arguments. The plural is relative, and the singular is absolute. For each argument in the order below, the absolute form is applied first (by setting each attribute to that value) and then the relative form (by adding the value to the attribute).

要するに、absolute(複数形のつかない置換の引数)が優先されるってことですね。
また、この引用文の直下に書かれていますが、演算は年から始まり、マイクロ秒に向かって大きい順に行われます。

これにより、relativedelta(day=1, months=1, days=-1) とすると、日付を1に置き換えて、1ヶ月足して、1日引くという望んでた処理を行ってくれることになります。

from dateutil.relativedelta import relativedelta

date2 = date(2021, 11, 15)
print(date2)
# 2021-11-15

print(date2 + relativedelta(day=1, months=1, days=-1))
# 2021-11-30

これで月末日付が得られました。

最後にこれはついでになってしまうのですが、週初め(その日付を含む週の月曜日)の日付を計算する方法も書いておきます。これがPandasのDataFrameに入ってるデータだったら以前紹介した方法が使えるのでこちらを見てください。
参考: pandasの日付データを週単位で丸める(to_periodを使う方法)

今回は単体のdateオブジェクトに対する方法です。

これはやり方はいろいろあると思うのですが、個人的にはdateオブジェクトが持っている、weekday()メソッド(月〜日の曜日を0から6の数値で得るメソッド)を使って、これで返ってきた数値分の日数を引くのがいいと思っています。
要するに次のコードのような感じです。

from datetime import timedelta


date2 = date(2021, 11, 17)  # 2021/11/17 は水曜日
print(date2)
# 2021-11-17

print(date2.weekday())  # 水曜なので2が返る
# 2

# weekday() 分の日数を引くと月曜の日付が得られる
print(date2 - timedelta(days=date2.weekday()))
# 2021-11-15

relativedelta にも weekday っていう引数を渡すことができ、これに0を渡すと月曜の日付を返してはくれるのですが、元の日付が月曜日ならその日のまま、月曜以外なら「次の」月曜の日付が帰ってくるんですよね。この仕様が使いやすい場面もあるのかもしれませんが個人的にはいまいちです。

print(date2)
# 2021-11-17

# 月曜以外の日付に対しては、次の月曜が返ってくる
print(date2 + relativedelta(weekday=0))
# 2021-11-22

date3 = date(2021, 11, 8)  # 2021/11/8は月曜
# 月曜の日付に対しては、その日のまま
print(date3 + relativedelta(weekday=0))
# 2021-11-08