Python SDKのための性能重視ランタイム型チェック

このブログ投稿では、開発環境と本番環境の両方でパフォーマンスが最適化された、BeamのPython SDK向けの新しいオプトインランタイム型チェックシステムの今後のリリースを発表します。

しかし、一歩引いて考えてみましょう - なぜそもそもランタイム型チェックを気にするのでしょうか?例を見てみましょう。

class MultiplyNumberByTwo(beam.DoFn):
    def process(self, element: int):
        return element * 2

p = Pipeline()
p | beam.Create(['1', '2'] | beam.ParDo(MultiplyNumberByTwo())

このコードでは、明らかに整数用に意図されたDoFnに文字列のリストを渡しました。幸いにも、このコードはパイプライン構築中にエラーをスローします。なぜなら、`beam.Create(['1', '2'])`の推論された出力型は`str`であり、`MultiplyNumberByTwo.process`の宣言された入力型`int`と互換性がないからです。

しかし、`no_pipeline_type_check`フラグを使用してパイプラインの型チェックをオフにした場合はどうでしょうか?あるいは、より現実的には、`MultiplyNumberByTwo`への入力PCollectionがデータベースから到着した場合、出力データ型はランタイムでしか知ることができません。

いずれの場合も、パイプライン構築中にエラーはスローされません。そして、ランタイムにおいても、このコードは機能します。各文字列は2倍になり、`['11', '22']`という結果が得られますが、それは確かに私たちが望む結果ではありません。

では、この種の「隠れた」エラーをどのようにデバッグするのでしょうか?より広義には、Beamにおける型付けエラーやシリアライゼーションエラーをどのようにデバッグするのでしょうか?

答えは、ランタイム型チェックを使用することです。

ランタイム型チェック(RTC)

この機能は、パイプラインの実行中に実際の入力値と出力値が宣言された型制約を満たしているかどうかをチェックすることによって機能します。`runtime_type_check`をオンにして前のコードを実行すると、次のエラーメッセージが表示されます。

Type hint violation for 'ParDo(MultiplyByTwo)': requires <class 'int'> but got <class 'str'> for element

これは実行可能なエラーメッセージです。コードにバグがあるか、宣言された型ヒントが間違っているかのどちらかを教えてくれます。十分に簡単そうに聞こえますが、落とし穴は何でしょうか?

非常に遅いです。自分で確かめてください。

要素サイズ通常のパイプラインランタイム型チェックパイプライン
15.3秒5.6秒
2,0019.4秒57.2秒
10,00124.5秒259.8秒
18,00138.7秒450.5秒

このマイクロベンチマークでは、ランタイム型チェックを使用したパイプラインは10倍以上遅く、入力PCollectionのサイズが大きくなるにつれてその差は大きくなりました。

では、本番環境で使用できる代替案はあるのでしょうか?

パフォーマンスランタイム型チェック

あります!私たちは、以下の組み合わせを使用してパイプラインの時間計算量への影響を最小限に抑える`performance_runtime_type_check`という新しいフラグを開発しました。

  • 効率的なCythonコード、
  • スマートサンプリングテクニック、および
  • 最適化されたメガ型ヒント。

では、新しい数値はどうでしょうか?

要素サイズ通常RTCパフォーマンスRTC
15.3秒5.6秒5.4秒
2,0019.4秒57.2秒11.2秒
10,00124.5秒259.8秒25.5秒
18,00138.7秒450.5秒39.4秒

平均して、新しいパフォーマンスRTCは通常のパイプラインよりも4.4%遅く、古いRTCは900%以上遅いです!さらに、入力PCollectionのサイズが大きくなるにつれて、パフォーマンスRTCシステムの設定の固定コストは各要素に分散され、全体的なパイプラインへの相対的な影響は減少します。18,001個の要素では、差は1秒未満です。

どのように動作するのでしょうか?

このパフォーマンスの向上には、3つの重要な要因が関係しています。

  1. すべての値の型チェックを行う代わりに、統計におけるサンプルとして知られる値のサブセットのみを型チェックします。最初はかなりの数の要素をサンプリングしますが、時間の経過とともに要素の型が変化しないという確信度が高まると、サンプリングレートを下げます(固定最小値まで)。

  2. 古いRTCシステムは型チェックを実行するために重いラッパーを使用していましたが、新しいRTCシステムは型チェックをCythonで最適化された、デコレーションされていないコードベースの部分に移動します。参考までに、CythonはPythonコードにCのようなパフォーマンスを与えるプログラミング言語です。

  3. 最後に、変換の出力値のみを型チェックするために、単一のメガ型ヒントを使用します。入力値と出力値を個別に型チェックする必要はありません。このメガ型ヒントは、元の変換の出力型制約とすべてのコンシューマー変換の入力型制約で構成されています。このメガ型ヒントを使用することで、オーバーヘッドを削減しながら、より実行可能なエラーをスローすることができます。たとえば、次のエラー(古いRTCシステムから生成されたもの)を考えてみましょう。

Runtime type violation detected within ParDo(DownstreamDoFn): Type-hint for argument: 'element' violated. Expected an instance of <class ‘str’>, instead found 9, an instance of <class ‘int’>.

このエラーは、`DownstreamDoFn`が`str`を期待していたのに`int`を受け取ったことを教えてくれますが、そもそもその`int`を作成したのは誰なのかを教えてくれません。この`int`の原因となっている、問題のあるアップストリーム変換はどこにあるのでしょうか?おそらく、その変換の出力型ヒントは広すぎたり(例:`Any`)、存在しなかったりしたため、その出力のランタイム型チェック中にエラーはスローされませんでした。

ここでの問題は、コンテキストの不足に帰着します。型チェックを行う際にコンシューマーが誰であるかを知っていれば、出力値を出力型制約とすべてのコンシューマーの入力型制約に対して同時に型チェックして、ミスマッチの可能性が全くないかどうかを確認できます。これがまさにメガ型ヒントが行うことであり、例外発生時ではなく宣言時にエラーをスローできるため、貴重な時間を節約し、より高品質のエラーメッセージを提供できます。

では、パフォーマンスRTCを使用した同じエラーはどう見えるでしょうか?まったく同じ文字列ですが、1行追加されています。

[while running 'ParDo(UpstreamDoFn)']

そして、それは調査にとってはるかに実行可能です :)

次のステップ

新しい`performance_runtime_type_check`機能を試してみてください!

実験段階なので、問題が発生した場合はお知らせください