Direct Runnerの使用

Direct Runnerは、パイプラインをローカルマシンで実行し、Apache Beamモデルにできるだけ厳密に準拠していることを検証するように設計されています。効率的なパイプライン実行に焦点を当てるのではなく、Direct Runnerは、ユーザーがモデルで保証されていないセマンティクスに依存しないように追加のチェックを実行します。これらのチェックには、以下が含まれます。

Direct Runnerを使用してテストと開発を行うと、さまざまなBeam Runner間でパイプラインが堅牢であることが保証されます。さらに、パイプラインがリモートクラスタで実行されると、失敗した実行のデバッグは非自明なタスクになる可能性があります。代わりに、パイプラインコードでローカル単体テストを実行する方が、多くの場合、迅速かつ簡単です。ローカルでパイプラインの単体テストを行うと、好みのローカルデバッグツールを使用することもできます。

パイプラインのテスト方法に関する情報を提供するリソースをいくつか紹介します。

Direct Runnerは、パフォーマンスよりも正確性を重視しているため、本番パイプラインには設計されていません。Direct Runnerは、すべてのユーザーデータをメモリに収容する必要がありますが、FlinkおよびSpark Runnerは、メモリに収容できない場合はデータをディスクにスプールできます。その結果、FlinkおよびSpark Runnerはより大規模なパイプラインを実行でき、本番ワークロードに適しています。

Direct Runnerの前提条件と設定

依存関係の指定

Javaを使用する場合は、`pom.xml`でDirect Runnerへの依存関係を指定する必要があります。

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.60.0</version>
   <scope>runtime</scope>
</dependency>

このセクションは、Python用のBeam SDKには適用されません。

Direct Runnerのパイプラインオプション

パイプラインオプションの設定方法に関する一般的な手順については、プログラミングガイドを参照してください。

コマンドラインからパイプラインを実行する場合は、`runner`を`direct`または`DirectRunner`に設定します。他のパイプラインオプションのデフォルト値は、一般的に十分です。

デフォルト値と追加のパイプライン構成オプションについては、DirectOptions DirectOptionsインターフェースのリファレンスドキュメントを参照してください。

追加情報と注意点

メモリに関する考慮事項

ローカル実行は、ローカル環境で使用可能なメモリによって制限されます。ローカルメモリに収まる程度の小さなデータセットを使用してパイプラインを実行することを強くお勧めします。CreateCreate変換を使用して小さなインメモリデータセットを作成するか、ReadRead変換を使用して、小さなローカルファイルまたはリモートファイルを使用できます。

ストリーミング実行

Python DirectRunnerのストリーミングサポートは限定的です。既知の問題については、https://github.com/apache/beam/issues/24528を参照してください。

パイプラインが無界のデータソースまたはシンクを使用する場合は、`streaming`オプションを`true`に設定する必要があります。

並列実行

Python FnApiRunnerは、マルチスレッドとマルチプロセッシングモードをサポートしています。

並列度の設定

ワーカスレッドの数は、`targetParallelism`パイプラインオプションで定義されます。デフォルトでは、`targetParallelism`は、使用可能なプロセッサ数と3のうち大きい方の値になります。

スレッドまたはサブプロセスの数は、`direct_num_workers`パイプラインオプションを設定することで定義されます。2.22.0以降、`direct_num_workers = 0`がサポートされています。`direct_num_workers`を0に設定すると、スレッド/サブプロセスの数が、パイプラインを実行しているマシンのコア数に設定されます。

実行モードの設定

Beam 2.19.0以降では、`direct_running_mode`パイプラインオプションを使用して実行モードを設定できます。`direct_running_mode`には、[ `'in_memory'`、`'multi_threading'`、`'multi_processing'` ]のいずれかを指定できます。

in_memory: Runnerとワーカー間の通信はメモリ内で行われます(gRPC経由ではありません)。これはデフォルトモードです。

multi_threading: RunnerとワーカーはgRPCを介して通信し、各ワーカーはスレッドで実行されます。

multi_processing: RunnerとワーカーはgRPCを介して通信し、各ワーカーはサブプロセスで実行されます。

パイプラインをリモートRunnerにデプロイする前に

Direct Runnerでのテストは便利ですが、特にランタイム環境関連の問題については、Beamモデルのセマンティクスを超えて、リモートRunnerとは異なる動作をする可能性があります。一般的に、本番環境に完全にデプロイする前に、ターゲットとするリモートRunnerで小規模にパイプラインをテストすることをお勧めします。