os/exec で外部プロセスを正しく実行する

これは Go 4 Advent Calendar 2020 7日目の記事です。

この記事では Go の os/exec パッケージで外部プロセスを実行する際にやりがちな間違いをクイズ形式で紹介します。

外部プロセスを実行するなんていうのはとてもありふれた作業ですが、複数プロセスが走る以上、並行性を扱う必要がどうしても生じます。そして並行処理は人類には早すぎることがよく知られており、外部プロセスの実行も油断していると罠にはまってしまうことがあるので気をつけましょう、という話がメインです。

なお、対象 OS は Linux とします。

問題 1/4

以下はシェルスクリプト scan1.sh を実行してその標準出力を返すコードですが、バグがあります。どのようなバグでしょうか?

func Scan() ([]byte, error) {
    cmd := exec.Command("./scan1.sh")
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return nil, err
    }
    if err := cmd.Start(); err != nil {
        return nil, err
    }
    b, err := ioutil.ReadAll(stdout)
    if err != nil {
        return nil, err
    }
    return b, nil
}

答え:

exec.Cmd.Wait を呼び忘れているのでリソースリークします。具体的には、実行元プロセスが終了するまでゾンビプロセスが残り続けます。

これを避けるには、defer を使って exec.Cmd.Wait を必ず呼び出すようにします。Wait もエラーを返しうるのでこれを正しくハンドルするのはそこそこ面倒で、たとえば次のように名前付き戻り値を使って defer 内でエラーを書き換える方法などがあります。

func Scan() (b []byte, err error) {
    cmd := exec.Command("./scan1.sh")
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return nil, err
    }
    if err := cmd.Start(); err != nil {
        return nil, err
    }
    defer func() {
        if werr := cmd.Wait(); werr != nil && err == nil {
            err = werr
        }
    }()
    b, err = ioutil.ReadAll(stdout)
    if err != nil {
        return nil, err
    }
    return b, nil
}

面倒すぎますね! 今回のように標準出力結果だけに興味がある場合はexec.Cmd.Start ではなく exec.Cmd.Output を使ったほうがリソースリークの危険もなく圧倒的に短く記述できます。

func Scan() ([]byte, error) {
    return exec.Command("./scan1.sh").Output()
}

一般論として、exec.Cmd.Run, exec.Cmd.Output, exec.Cmd.CombinedOutput で済む場合はそちらを優先して使い、exec.Cmd.Start は本当に必要な場合だけ使うようにしましょう。

問題 2/4

シェルスクリプト scan2.sh は実行中、一般のログを標準出力に、エラーログを標準エラー出力に書き出します。エラーメッセージはすべて "ERROR" という文字列を含み、一般のログに "ERROR" という文字列は含まれないとします。

以下は scan2.sh を実行してエラーが発生したかどうかをチェックするプログラムですが、バグがあります。どのようなバグでしょうか?

func Scan() error {
    cmd := exec.Command("./scan2.sh")
    b, err := cmd.CombinedOutput()
    if err != nil {
        return err
    }
    if strings.Contains(string(b), "ERROR") {
        return errors.New("output contains ERROR")
    }
    return nil
}

答え:

exec.Cmd.CombinedOutput は標準出力と標準エラー出力を一つにまとめたものを返す関数です。2つの出力がどう混ぜられるかは定義されていないので、標準エラー出力に "ERROR" と書かれてもそれが検出できない可能性があります。

たとえばシェルスクリプトが標準出力に "ok", 標準エラー出力に "ERROR" という文字列をそれぞれ書き出した時に、CombinedOutput が "ERRoOkR" という文字列を返したとしても文句は言えないということです。そうなると上記のコードはエラーを検出することに失敗します。

もちろんこれは極端な例で、現実的な CombinedOutput の実装ではこのような短い出力の場合には変な混ぜ方をしない可能性が高いですが、特に出力サイズが大きい時に標準出力と標準エラー出力を行ごとにマージすることを期待することはできません。

よって、一般論として CombinedOutput の結果をパースしてはいけません。人間が見るデバッグログを取る用途なら良いでしょう。

今回の場合、標準エラー出力だけが欲しいので、exec.Cmd.Stderrbytes.Buffer を設定した上で exec.Cmd.Run を呼んでやるのが良いです。

func Scan() error {
    cmd := exec.Command("./scan2.sh")
    var buf bytes.Buffer
    cmd.Stderr = &buf
    if err := cmd.Run(); err != nil {
        return err
    }
    if strings.Contains(buf.String(), "ERROR") {
        return errors.New("output contains ERROR")
    }
    return nil
}

しかしそもそもの話でいうと、エラーを検出するためにログをスクレイプするのは脆弱です。スクリプトに変更が加えられるなら、エラーが起こった際には別の終了コードを返すようにしたほうが安全でしょう。

問題 3/4

シェルスクリプト scan3.sh は実行中にログを標準出力と標準エラー出力に書き出しますが、よく分からない理由により、出力はすべて各バイトを 0x55 で XOR することにより暗号化(?) されています。

以下は scan3.sh を実行して XOR 暗号化を解除しつつ標準出力と標準エラー出力をまとめたものを返すコードですが、バグがあります。どのようなバグでしょうか?

type xorWriter struct {
    w io.Writer
}

func (x *xorWriter) Write(p []byte) (int, error) {
    q := make([]byte, len(p))
    for i, b := range p {
        q[i] = b ^ 0x55
    }
    return x.w.Write(q)
}

func Scan() ([]byte, error) {
    cmd := exec.Command("./scan3.sh")
    var buf bytes.Buffer
    cmd.Stdout = &xorWriter{&buf}
    cmd.Stderr = &xorWriter{&buf}
    if err := cmd.Run(); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

答え:

exec.Cmd.Stdout と exec.Cmd.Stderr は別々のゴルーチンから同時に書き込みがされる可能性があります。xorWriter はスレッドセーフですが bytes.Buffer はそうではないので、データレースが発生します。

この問題は exec.Cmd.Stdout と exec.Cmd.Stderr にまったく同一の xorWriter を設定することで解決できます。

func Scan() ([]byte, error) {
    cmd := exec.Command("./scan3.sh")
    var buf bytes.Buffer
    xor := &xorWriter{&buf}
    cmd.Stdout = xor
    cmd.Stderr = xor
    if err := cmd.Run(); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

これは、exec.Cmd.Stdout と exec.Cmd.Stderr が比較可能で同一の値の場合、同時に2つ以上のゴルーチンから書き込みがされないことが保証されているからです。

If Stdout and Stderr are the same writer, and have a type that can be compared with ==, at most one goroutine at a time will call Write. https://godoc.org/os/exec#Cmd

問題 4/4

シェルスクリプト scan4.sh は標準入力から読み込んだ行ごとに何らかの処理を行い標準出力に結果を書き出します。

以下は scan4.sh を実行してその結果を返すコードですが、バグがあります。どのようなバグでしょうか?

func Scan(input []byte) (b []byte, err error) {
    cmd := exec.Command("./scan4.sh")

    stdin, err := cmd.StdinPipe()
    if err != nil {
        return nil, err
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return nil, err
    }

    if err := cmd.Start(); err != nil {
        return nil, err
    }
    defer func() {
        if werr := cmd.Wait(); werr != nil && err == nil {
            err = werr
        }
    }()

    if _, err := stdin.Write(input); err != nil {
        return nil, err
    }
    if err := stdin.Close(); err != nil {
        return nil, err
    }

    b, err = ioutil.ReadAll(stdout)
    return b, err
}

答え:

標準入力に書き込む間、標準出力から読み出していません。そのためパイプにデータが詰まりデッドロックする可能性があります。

説明のために scan4.sh は標準入力から読み取ったデータをそのまま標準出力に書き出すプログラムだったとしましょう。Go プログラムが標準入力に書き込んだデータは scan4.sh により読み出され、そのまま標準出力に書き出されていくことになりますが、Go プログラムは標準出力からデータを読み出していないため、データが標準出力パイプに溜まっていくことになります。

Linux のパイプは固定長のサイズを持っており、多くの場合デフォルトは 64KB です(pipe(7))。標準出力パイプのキャパシティに達すると scan4.sh の標準出力への書き込みはブロックし、scan4.sh の動作が停止します。

標準入力からの読み出しがストップすると、次は標準入力パイプにデータが溜まり始めます。これがキャパシティに達した時、Go プログラムの標準入力への書き込みがブロックします。デッドロックの完成です。

この問題を回避するには、exec.Cmd.StdinPipeexec.Cmd.StdoutPipe でパイプを明示的に扱う代わりに exec.Cmd.Stdin や exec.Cmd.Stdout を設定して使う、あるいは各パイプを別々のゴルーチンで処理します。標準入力を動的に生成する必要があったりするのでない限り、明示的にパイプを使わない前者の方法が良いでしょう。

まとめ

  • Start/Wait および StdinPipe/StdoutPipe/StderrPipe はできるだけ回避する
    • Run/Output および Stdin/Stdout/Stderr の方が圧倒的にハマりどころが少ない
    • プロセスとインタラクティブに通信したい場合などは例外
    • パイプを明示的に扱うなら、パイプごとにゴルーチンを作ること
  • CombinedOutput の結果をパースしてはいけない
  • やっぱり並行処理は人類には早い

Advent of Code 2019 が楽しかったので布教したい

今年のゴールデンウィークは暇だなぁと思いながらネットを巡回していた時、ふと見つけた Advent of Code 2019 が思いのほか面白く、2日間ほどかけて一気に解いてしまいました。

終わったあとふと TwitterGoogle で検索をかけてみたところ、日本語圏では言及がほとんどなかったので、この面白さをもっとたくさんの人々に共有したいと思い記事を書きました。

Advent of Code とは

まずは簡単に形式的な説明を。

Advent of CodeEric Wastl 氏が2015年から毎年12月に開催している、プログラミングコンテストアドベントカレンダーを融合したようなイベントです。

開催期間中の12月1日から25日にかけて、プログラミングで解く問題が1日あたり2問ずつ、合計50問投稿されます。

リアルタイムで参加する場合は、解く速さに応じてポイントが割り振られ、ランキングがつけられるようです。しかしウェブサイトは常時動いているので、イベント後もいつでも問題を解くことが出来ます。

問題には入力データファイルが添付されており、参加者は自分で書いた解答プログラムをそのデータファイルに対してローカル環境で走らせます。出力をウェブサイトのフォームから送信し、一致していれば問題クリアです。サーバ上でプログラムを評価する形式ではないので、好きなプログラミング言語やライブラリを使うことが出来ますし、デバッグも比較的容易です。

何が面白いのか

私が Advent of Code をやってみて面白いと感じたのは、大きく分けて次の3点です。

  1. 作り込まれたゲーム性
  2. 1日2問ずつ出題される問題
  3. バラエティに富んだ問題セット

作り込まれたゲーム性

f:id:nya3jp:20200506131133p:plain
2019年のカレンダー(初期状態)

前述の通り、Advent of Code はプログラミングコンテストであると同時にアドベントカレンダーです。

アドベントカレンダー (Advent calendar) は、クリスマスまでの期間に日数を数えるために使用されるカレンダーである。待降節の期間(イエス・キリストの降誕を待ち望む期間)に窓を毎日ひとつずつ開けていくカレンダーである。すべての窓を開け終わるとクリスマスを迎えたことになる。

アドベントカレンダー - Wikipedia

ウェブサイトの問題一覧 は ASCII アートで書かれたアドベントカレンダーになっています。最初は地味ですが、問題を1問解くごとに星⭐が1個カレンダーに追加され、デザインも少しずつカラフルに変化していきます。最終的にすべての問題(50問)を解いてクリアするとカレンダーが完成するというわけです。この少しずつ出来上がっていくカレンダーを眺めるのが一つの楽しみになっています。

また、すべての問題がクリスマスにちなんだ一貫したストーリーに沿っており、問題文自体が楽しい読み物になっています。Advent of Code 2019 第1問 は次のように始まります。

他の惑星にプレゼントを届ける途中、太陽系の辺境でサンタが遭難してしまった! ワープ装置を使って地球に戻るためには、50個の星を観測して現在位置を正しく計算しなければならない。クリスマスを無事に迎えるために、50個の星のデータを集めてサンタに届けてほしい。

Santa has become stranded at the edge of the Solar System while delivering presents to other planets! To accurately calculate his position in space, safely align his warp drive, and return to Earth in time to save Christmas, he needs you to bring him measurements from fifty stars.

このあと主人公は地球をロケットで飛び立ち、太陽系の様々な星に立ち寄りながらサンタを救出しに行くこととなります。果たしてサンタの宇宙船で何があったのか? それは全ての問題を解くと明らかになります。

1日2問ずつ出題される問題

前述した通り、問題は1日あたり2問ずつ出題されます。

最初は各日あたり1問目のみ見ることが出来るようになっています。1問目を正解することで初めて2問目が見られるようになります。

2問は密接に繋がっており、入力データはまったく同じものを使います。実際、ほとんどの場合、1問目の解答プログラムの大部分を2問目の解答プログラムに再利用することができます。

2問の関係は様々です。1問目では入力データを正しく解釈できていることを確認するのみで、2問目で本来のタスクを解く、ということもあります。問題の仕様は同じだが入力データを10,000倍に膨らませて解けと言われることもあります。時には入力の一部を書き換える指示が出て、問題の性質が大きく変わってしまうこともあります。

1問目の解答プログラムを書いている間は2問目の問題が分からないので、2問目では問題がどのように変わるだろう、と考えながら解答プログラムを設計するのも楽しみの一つです。

バラエティに富んだ問題セット

問題のジャンルの広さも大きな魅力です。特に2019年の問題セットは特徴的になっています。

2019年の問題セットの半分ほどはクラシックなアルゴリズム問題からなっています。シミュレーション・グラフ・計算幾何・数論などからの出題があります。一般的なプログラミングコンテストに比べると難易度は高くないように感じますが、前述した2問制により面白さが増しています。

残りの半分は Intcode という独自に定義されたプログラムを使った問題です。問題セットの序盤で数問に渡って Intcode の仕様が与えられ、処理系を実装させられます。その後の問題では入力データファイルとして Intcode のプログラムが与えられ、それを自分で実装した処理系で実行してタスクをこなすこととなります。

たとえば 13日目の問題 Care Package ではブロック崩しのプログラムが Intcode で与えられます。このプログラムを実行しているところを動画にしてツイートしてくださった方がいました。

このゲーム、人力でプレイすることも出来るのですが、すべてのブロックを消してクリアするにはとても長い時間がかかります。ゲームプレイを自動化するプログラムを書く必要があるというわけです。

まとめ

というわけで、Advent of Code の魅力を紹介しました。

プログラミングの問題を楽しく解けるようにとても上手くデザインされたイベントだと思いますので、時間のあるときに是非トライしてみてください。

私もまだ 2019 年の問題セットしかやっていないので、過去の問題セットもやってみようと思います。

参考

Python に end を導入してみる

この記事は Python Advent Calendar 2016 9日目の記事です。

モチベーション

みなさん御存知の通り、Python は文法上のブロックがインデントによって表現されています。たとえば次のような 3 重ループを含む Python プログラムを考えてみましょう。

def warshall_floyd(g):
    n = len(g)
    for j in range(n):
        for i in range(n):
            for k in range(n):
                g[i][k] = min(g[i][k], g[i][j] + g[j][k])
    return g[0][n - 1]

for ループの終了はインデントが減ることで表現されています。これによりブロックが簡潔に表現できるなどの利点はある一方で、複数レベルのブロックが一度に終わった時いくつのレベルが終わったか分かりづらいのが難点の一つでもあります。

ところで Ruby ではブロックの終了を end というキーワードで表現します。

n = g.length
n.times do |j|
  n.times do |i|
    n.times do |k|
      g[i][k] = [g[i][k], g[i][j] + g[j][k]].min
    end
  end
end

(注: 私は Ruby にまったく詳しくないのでこの素人っぽいコードにはあまり深く突っ込まないでください……) これなら3つのブロックが終了したことが明確ですね。

では Python にも end を導入すれば Ruby みたいにブロックの終了が見やすくなるのでは? という小学生並みの発想により、そのようなモジュールをメタプログラミング的に作ってみました、というのが今回の記事の内容になります。

使用例

何はともあれ、使用例です。ちなみにこのモジュールは PyPI に登録してあるので、pip install end で簡単にインストールできます。

import end

def func():
    for i in range(3):
        print(i)
    end
end

with open('a.txt') as f:
    print(f.read())
end

try:
    time.sleep(3)
except KeyboardInterrupt:
    print('Interrupted')
finally:
    print('Slept')
end

if 28 > 3:
    print('nya-n')
# end がここに無いので SyntaxError が投げられる!

スクリプト内で import end と書くことにより、そのスクリプト内でのみチェックが有効になり、ブロックの終了を end で示すことが強制されますend をブロックの末尾に書き忘れたり、ブロックの末尾でないところに end を置くと SyntaxError が発生します

end モジュールのソースコードhttps://github.com/nya3jp/end に置いてあります。ちなみに end モジュール自身とそのユニットテストも end を利用して書かれていますので、もっと実用的(?)なコードでの使用例はそちらで見ることが出来ます。

実装方法

さて、以下では end モジュールをどのように実装したかを解説していきます。解説で用いる Python 処理系は CPython 3.3+ を仮定します。CPython 2.7 でも動作しますが、標準モジュールの名前などに多少変更があります。

基本的なアイデア

end なるキーワードは Python の文法に存在しないので、何も考えずに end と書くと NameError が発生します。まずはこれを回避したいわけですが、これは簡単で、単にグローバル変数として end という名前のものを定義してあげればいいだけです。

end = object()

def main():
    for i in range(3):
        print(i)
    end  # OK
end  # OK

変数 end の中身はなんでも構いません。そこで end.py という空のファイルを作成し、import end としてやることにします。すると end はモジュールを指すグローバル変数となり、先ほどの例と同様に end と書くことができるようになります。

# end.py
# ここにはまだ何も書いていない
# sample.py
import end

def main():
    for i in range(3):
        print(i)
    end  # OK
end  # OK

とりあえずこれだけでも見た目はそれっぽいのですが、このままでは end と書き忘れても何も怒られません。エラーが出ないと誰も end なんて書かないのであまり意味がないですね。

そこで、end.py に色々書くことにより、end モジュールが import された時に import 元のモジュール(上記の例の場合は sample.py) に対して文法チェックを走らせることにします。これは大まかに言って次の2ステップからなります。

  • end.py が import されるたびに import 元のソースコードを読み出す
  • 読み出したソースコードを文法解析して end が正しく使われているかチェックする

それぞれ解説していきます。

import 元のソースコードを読む

まずは import 元のソースコードを読むことを考えましょう。

import のフック

Python の import 文はいろいろな方法でフックすることができます。大まかに言って 2 種類くらい方法があります。

  1. ビルトイン関数 __import__() を置き換える
  2. sys.meta_path に meta path loader を登録する

実は import 文は __import__() というビルトイン関数の呼び出しの単なるラッパーです。なので、この関数を自分で定義したものに置き換えることで import をフックすることができます。これが 1 番目の方法になります。

また、デフォルトの __import__() ビルトイン関数はユーザがカスタマイズ可能なフックの手段を用意しており、それが 2 番目の sys.meta_path を使う方法になります。こちらについてはわたしが Python Advent Calendar 2015 で書いた Hacking import mechanism という記事で解説しているので、気になる方はそちらもご覧ください。

今回は 1 番目の __import__() を置き換える方法を利用します。これは、デフォルトの __import__() は meta path loader を呼び出す前に sys.modules に既にモジュールがロードされているかチェックして既にロードされていればそれを返すようなキャッシュ機構を備えているため、2 番目の sys.meta_path による方法では end モジュールの 2 回目以降の import をフックするのが難しいからです。

というわけで肩慣らしに __import__() ビルトイン関数を置き換えてみましょう。

# first_hack.py
import builtins
import functools

original_import = builtins.__import__

@functools.wraps(original_import)
def my_import(name, *args, **kwargs):
    print('import is hacked! name=%r' % name)
    return original_import(name, *args, **kwargs)

builtins.__import__ = my_import

import os  # => import is hacked! name="os"

Python 3 ではビルトイン関数は builtins モジュールに入っています(Python 2 では __builtin__)。この中にある __import__() を自分で定義した関数で置き換えることで、あっさりと import 文を乗っ取ることができました。

スタックフレームの走査

import のフックが出来たので、次に import 元がどこかを調べます。これは import が関数呼び出しに帰着される以上、スタックフレームを辿れば分かるはずです。

スタックフレーム(あるいは単にフレーム)は Python インタプリタの実行中、各関数呼び出しの各レベルにおいて今どこが実行されておりどのような環境を持っているかを示しているものです。現在のスタックフレームは inspect.currentframe() 関数で取得することが出来て、バックリンクにより関数の呼び出し元を辿っていくことができます。

これを使って、我々の __import__() を呼び出したコードのファイル名と呼び出し元の行数を出力してみましょう。

# second_hack.py
import builtins
import functools
import inspect

original_import = builtins.__import__

@functools.wraps(original_import)
def my_import(name, *args, **kwargs):
    caller = inspect.currentframe().f_back
    print('called from %s:%d' % (caller.f_globals['__name__'], caller.f_lineno))
    return original_import(name, *args, **kwargs)

builtins.__import__ = my_import

import os  # => called from second_hack.py:15

inspect.currentframe() で手に入るフレームは my_import() 内を表すフレームなので、f_back を一度辿って呼び出し元のフレームを取得します。フレームからはそこにおけるグローバル変数テーブルが frame.f_globals で参照できるので、これを使って __name__ を読んでソースコードのパスを取得します。ついでに実行中の行番号も frame.f_lineno で取得可能です。フレームオブジェクトに入っている情報の一覧は inspect モジュールのドキュメント に書いてあります。

CPython バイトコードの解析

もう import 元のファイル名が手に入ったように見えますが、実はこれでは不完全です。というのも、__import__() をフックしているのが end.py だけであればこのままで十分なのですが、他にもフックしているコードがあった場合、我々の定義した __import__() の呼び出し元は import 文を実行したユーザーコードではなく、他のフックコードになってしまう可能性があるからです。実際、Python 3 でいろいろテストをしていたところ、__import__() を置き換えるコードが標準ライブラリ内にあり、単純に親フレームを見るだけでは正しく import 元を特定できない現象に遭遇しました。

ではスタックフレームの列から import を行っているユーザーコードを正しく特定するにはどうしたらいいでしょうか。これは、各スタックフレームで実行中のバイトコードを解析することで解決します。

スタックフレームには、そのフレームで実行中のバイトコードとインストラクションポインタ(いまどの命令を実行しようとしているかを示すカウンタ)が入っています。これを使って、今まさに import 文を実行しているフレームを探し出すのです。__import__() をフックするようなコードはその性質上 import 文ではなく関数呼び出しを実行しているはずなので、そのようなフレームはスキップすることができます。

CPython には dis というモジュールが用意されており、これを使ってバイトコードを逆アセンブルすることができます。試しに import 文だけを含むような関数 f() を逆アセンブルしてみましょう。

def f():
    import os

これは次のようなバイトコードになります。

 4           0 LOAD_CONST               1 (0)
             3 LOAD_CONST               0 (None)
             6 IMPORT_NAME              0 (os)
             9 STORE_FAST               0 (os)
            12 LOAD_CONST               0 (None)
            15 RETURN_VALUE

CPython のバイトコードに馴染みのない方も多いかと思いますが、IMPORT_NAME が import 文に対応する命令であり、そのパラメータ(オペランド)として “os” という文字列が渡されている、ということだけ分かれば十分です。

import 文に対応する命令が分かったので、あとはスタックフレームを走査してバイトコードを見つつ import end を実行しているフレームを探します。そのコードは以下のような感じになります(注: いろいろ端折っています)。

import dis
import inspect

def find_importer_frame():
    frame = inspect.currentframe()
    while frame:
        code = frame.f_code
        lasti = frame.f_lasti
        if code.co_code[lasti] == dis.opmap['IMPORT_NAME']:
            arg = code.co_code[lasti + 1] + code.co_code[lasti + 2] * 256
            name = code.co_names[arg]
            if name == 'end':
                break
        frame = frame.f_back
    return frame

スタックフレームが特定できれば、ソースコードを取得するのは inspect.getsource(frame) を呼び出すだけで終わりです。

ソースコードの解析

ソースコードが手に入ったので、あとはこれを解析して、end が正しく置かれているかをチェックします。

Python には標準で Python ソースコードを解析して AST (Abstract Syntax Tree) を出力する ast モジュールバンドルされています。これを使うことでコードの解析を手軽に行なうことができます。

たとえばこのようなシンプルなソースコードを考えましょう。

def main():
    for i in range(3):
        print(i)
    print('done')

これは ast.parse() により次のような AST に変換されます。

Module(
  body=[
    FunctionDef(
      name='main',
      args=arguments(args=[], vararg=None, kwonlyargs=[], kw_defaults=[], kwarg=None, defaults=[]),
      body=[
        For(
          target=Name(id='i', ctx=Store()),
          iter=Call(func=Name(id='range', ctx=Load()), args=[Num(n=3)], keywords=[]),
          body=[
            Expr(value=Call(func=Name(id='print', ctx=Load()), args=[Name(id='i', ctx=Load())], keywords=[])),
          ],
          orelse=[]),
        Expr(value=Call(func=Name(id='print', ctx=Load()), args=[Str(s='done')], keywords=[])),
       ],
       decorator_list=[],
       returns=None),
  ],
)

Module.body や FunctionDef.body がブロックの中身に対応しています。また、単に end と書くと AST では Expr(value=Name(id='end', ctx=Load())) と表現されます。よって、ブロックを見つけたらその次に end が来ること、またそれ以外のところで end が単体で現れないことを確認すれば終わりです。Python の文法要素すべてに対して場合分けをしたりしないといけないので地味に面倒な作業ではありますが、やるだけなので頑張ります。チェックに失敗した場合は SyntaxError を raise します。

以上で import 元の文法チェックが出来るようになりました!

だいたい完成

end.py が初めて import された時は、まず __import__() フックをインストールし、その場で呼び出し元の文法チェックを実行します。以降は import end が実行される度に __import__() フック内から文法チェックを呼び出します。

これでほとんど完成です。細かなところは端折っているので、詳しくは実際のソースコードをご覧ください: https://github.com/nya3jp/end/blob/master/end.py

落ち穂拾い

この end チェックは実は不完全で、正しく解析できないケースがあることが分かっています。一番の問題は一行にブロックを押し込めた時 (if a > 28: a = 3 など) の扱いで、Python物理行・論理行の仕様などを考えると字句解析レベルの情報が必要になり、ast モジュールの出力だけではどうやっても正しく解析できません。

あと、そもそもソースコードがない場合はどうにもなりません。コンパイル済みのバイトコードを実行している時とか、REPL を実行している時とかですね。REPL で使えないのはちょっと悲しい。

まとめ

以上、Python に end を導入する話でした。end がソースコードの見やすさに与える影響はさておいて、Python における(変な)メタプログラミングのサンプルとしては多少は役に立つかな? と思います。

ちなみに実際に end.py やそのユニットテストを end を使って書いてみたわけですが、感想としては、まぁ確かにブロックの終了は分かりやすくなるものの、すごく野暮ったくなるなという印象でした。別に Ruby を dis っているわけではなくて、見慣れていないせいで Python のコードっぽく見えないというか。あとエディタのシンタックスハイライトはことごとく end を無視するのでそれもマイナスですね。

そして念のため最後にひとこと: end を実用してはいけません。これを仕事のコードに導入したりすると同僚との関係が悪化すること請け合いです :)

参考文献