Direct Runnerの使用
- Java SDK
- Python SDK
Direct Runnerは、パイプラインをローカルマシンで実行し、Apache Beamモデルにできるだけ厳密に準拠していることを検証するように設計されています。効率的なパイプライン実行に焦点を当てるのではなく、Direct Runnerは、ユーザーがモデルで保証されていないセマンティクスに依存しないように追加のチェックを実行します。これらのチェックには、以下が含まれます。
- 要素の不変性の強制
- 要素のエンコード可能性の強制
- すべてのポイントで要素が任意の順序で処理される
- ユーザー関数のシリアル化 (
DoFn
、CombineFn
など) 詳細はDoFnsのシリアライズ可能性を参照してください。
Direct Runnerを使用してテストと開発を行うと、さまざまなBeam Runner間でパイプラインが堅牢であることが保証されます。さらに、パイプラインがリモートクラスタで実行されると、失敗した実行のデバッグは非自明なタスクになる可能性があります。代わりに、パイプラインコードでローカル単体テストを実行する方が、多くの場合、迅速かつ簡単です。ローカルでパイプラインの単体テストを行うと、好みのローカルデバッグツールを使用することもできます。
パイプラインのテスト方法に関する情報を提供するリソースをいくつか紹介します。
- パイプラインのテスト
- Apache Beamにおける無界パイプラインのテストでは、パイプラインのテストにJavaクラスPAssertとTestStreamを使用する方法について説明しています。
- Apache Beam WordCountチュートリアルには、PAssertを使用したパイプラインのログ記録とテストの例が含まれています。
- Apache Beam WordCountチュートリアルには、
assert_that
を使用したパイプラインのログ記録とテストの例が含まれています。
Direct Runnerは、パフォーマンスよりも正確性を重視しているため、本番パイプラインには設計されていません。Direct Runnerは、すべてのユーザーデータをメモリに収容する必要がありますが、FlinkおよびSpark Runnerは、メモリに収容できない場合はデータをディスクにスプールできます。その結果、FlinkおよびSpark Runnerはより大規模なパイプラインを実行でき、本番ワークロードに適しています。
Direct Runnerの前提条件と設定
依存関係の指定
Javaを使用する場合は、`pom.xml`でDirect Runnerへの依存関係を指定する必要があります。
このセクションは、Python用のBeam SDKには適用されません。
Direct Runnerのパイプラインオプション
パイプラインオプションの設定方法に関する一般的な手順については、プログラミングガイドを参照してください。
コマンドラインからパイプラインを実行する場合は、`runner`を`direct`または`DirectRunner`に設定します。他のパイプラインオプションのデフォルト値は、一般的に十分です。
デフォルト値と追加のパイプライン構成オプションについては、DirectOptions
DirectOptions
インターフェースのリファレンスドキュメントを参照してください。
追加情報と注意点
メモリに関する考慮事項
ローカル実行は、ローカル環境で使用可能なメモリによって制限されます。ローカルメモリに収まる程度の小さなデータセットを使用してパイプラインを実行することを強くお勧めします。Create
Create
変換を使用して小さなインメモリデータセットを作成するか、Read
Read
変換を使用して、小さなローカルファイルまたはリモートファイルを使用できます。
ストリーミング実行
Python DirectRunnerのストリーミングサポートは限定的です。既知の問題については、https://github.com/apache/beam/issues/24528を参照してください。
パイプラインが無界のデータソースまたはシンクを使用する場合は、`streaming`オプションを`true`に設定する必要があります。
並列実行
Python FnApiRunnerは、マルチスレッドとマルチプロセッシングモードをサポートしています。
並列度の設定
ワーカスレッドの数は、`targetParallelism`パイプラインオプションで定義されます。デフォルトでは、`targetParallelism`は、使用可能なプロセッサ数と3のうち大きい方の値になります。
スレッドまたはサブプロセスの数は、`direct_num_workers`パイプラインオプションを設定することで定義されます。2.22.0以降、`direct_num_workers = 0`がサポートされています。`direct_num_workers`を0に設定すると、スレッド/サブプロセスの数が、パイプラインを実行しているマシンのコア数に設定されます。
実行モードの設定
Beam 2.19.0以降では、`direct_running_mode`パイプラインオプションを使用して実行モードを設定できます。`direct_running_mode`には、[ `'in_memory'`、`'multi_threading'`、`'multi_processing'` ]のいずれかを指定できます。
in_memory: Runnerとワーカー間の通信はメモリ内で行われます(gRPC経由ではありません)。これはデフォルトモードです。
multi_threading: RunnerとワーカーはgRPCを介して通信し、各ワーカーはスレッドで実行されます。
multi_processing: RunnerとワーカーはgRPCを介して通信し、各ワーカーはサブプロセスで実行されます。
パイプラインをリモートRunnerにデプロイする前に
Direct Runnerでのテストは便利ですが、特にランタイム環境関連の問題については、Beamモデルのセマンティクスを超えて、リモートRunnerとは異なる動作をする可能性があります。一般的に、本番環境に完全にデプロイする前に、ターゲットとするリモートRunnerで小規模にパイプラインをテストすることをお勧めします。
最終更新日: 2024/10/31
必要な情報は見つかりましたか?
すべて役立ち、分かりやすかったですか?変更してほしい点があれば教えてください!