Google Cloud Dataflow Runner の使用
- Java SDK
- Python SDK
Google Cloud Dataflow Runner は、Cloud Dataflow マネージドサービスを使用します。Cloud Dataflow サービスでパイプラインを実行すると、ランナーは実行可能なコードと依存関係を Google Cloud Storage バケットにアップロードし、Google Cloud Platform のマネージドリソースでパイプラインを実行する Cloud Dataflow ジョブを作成します。
Cloud Dataflow Runner とサービスは大規模な連続ジョブに適しており、以下の機能を提供します。
Beam 機能マトリックスには、Cloud Dataflow Runner でサポートされている機能が記載されています。
Cloud Dataflow Runner の前提条件と設定
Cloud Dataflow Runner を使用するには、選択した言語のCloud Dataflow クイックスタートの「始める前に」セクションの設定を完了する必要があります。
- Google Cloud Platform Console プロジェクトを選択または作成します。
- プロジェクトの課金を有効にします。
- 必要な Google Cloud API(Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、Cloud Storage JSON、Cloud Resource Manager)を有効にします。パイプラインコードで BigQuery、Cloud Pub/Sub、Cloud Datastore などの API を使用する場合は、追加で有効にする必要がある場合があります。
- Google Cloud Platform で認証します。
- Google Cloud SDK をインストールします。
- Cloud Storage バケットを作成します。
依存関係の指定
Java を使用する場合、`pom.xml` で Cloud Dataflow Runner への依存関係を指定する必要があります。
このセクションは、Python 用 Beam SDK には適用されません。
自己実行JAR
このセクションは、Python 用 Beam SDK には適用されません。
Apache AirFlowなどのスケジューラを使用してパイプラインを開始する場合など、自己完結型アプリケーションが必要になる場合があります。前のセクションに示されている既存の依存関係を追加するだけでなく、`pom.xml` のプロジェクトセクションに次の依存関係を明示的に追加することで、自己実行 JAR をパックできます。
次に、Maven JAR プラグインに `mainClass` 名を追加します。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>YOUR_MAIN_CLASS_NAME</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
`mvn package -Pdataflow-runner` を実行した後、`ls target` を実行すると、(アーティファクトID が `beam-examples`、バージョンが 1.0.0 であると仮定すると)次の出力が表示されます。
Cloud Dataflow で自己実行 JAR を実行するには、次のコマンドを使用します。
Cloud Dataflow Runner のパイプラインオプション
Cloud Dataflow Runner(Java)でパイプラインを実行する場合は、以下の一般的なパイプラインオプションを検討してください。 Cloud Dataflow Runner(Python)でパイプラインを実行する場合は、以下の一般的なパイプラインオプションを検討してください。
フィールド | 説明 | デフォルト値 |
---|---|---|
runner | 使用するパイプラインランナー。このオプションを使用すると、実行時にパイプラインランナーを決定できます。 | Cloud Dataflow サービスで実行するには、`dataflow` または `DataflowRunner` に設定します。 |
project | Google Cloud プロジェクトのプロジェクトID。 | 設定されていない場合、現在の環境のデフォルトプロジェクトがデフォルトになります。デフォルトプロジェクトは `gcloud` で設定されます。 |
region | ジョブを作成する Google Compute Engine リージョン。 | 設定されていない場合、現在の環境のデフォルトリージョンがデフォルトになります。デフォルトリージョンは `gcloud` で設定されます。 |
streaming | ストリーミングモードが有効か無効か。有効な場合は `true`。非有界 `PCollection` を使用してパイプラインを実行する場合は `true` に設定します。 | false |
`tempLocation` `temp_location` | オプション。 必須。一時ファイルのパス。`gs://` で始まる有効な Google Cloud Storage URL である必要があります。設定されている場合、`tempLocation` は `gcpTempLocation` のデフォルト値として使用されます。 | デフォルト値なし。 |
gcpTempLocation | 一時ファイルの Cloud Storage バケットパス。`gs://` で始まる有効な Cloud Storage URL である必要があります。 | 設定されていない場合、`tempLocation` が有効な Cloud Storage URL であることを条件に、`tempLocation` の値がデフォルトになります。`tempLocation` が有効な Cloud Storage URL でない場合は、`gcpTempLocation` を設定する必要があります。 |
`stagingLocation` `staging_location` | オプション。バイナリと一時ファイルをステージングするための Cloud Storage バケットパス。`gs://` で始まる有効な Cloud Storage URL である必要があります。 | 設定されていない場合、`gcpTempLocation` 内のステージングディレクトリがデフォルトになります。 設定されていない場合、`temp_location` 内のステージングディレクトリがデフォルトになります。 |
save_main_session | `__main__`(例:インタラクティブセッション)で定義された pickle 化された関数とクラスを unpickle できるように、メインセッションの状態を保存します。たとえば、すべての関数/クラスが適切なモジュール(`__main__` ではない)で定義されており、モジュールがワーカーでインポート可能な場合、一部のワークフローはセッション状態を必要としません。 | false |
sdk_location | Beam SDK のダウンロード元のデフォルトの場所をオーバーライドします。この値は、URL、Cloud Storage パス、または SDK tarball へのローカルパスにすることができます。ワークフローの送信は、この場所から SDK tarball をダウンロードまたはコピーします。文字列 `default` に設定すると、標準の SDK の場所が使用されます。空の場合、SDK はコピーされません。 | default |
その他のパイプライン構成オプションについては、DataflowPipelineOptions `PipelineOptions` インターフェース(およびすべてのサブインターフェース)のリファレンスドキュメントを参照してください。
追加情報と注意事項
ジョブのモニタリング
パイプラインの実行中は、Dataflow モニタリングインターフェースまたはDataflow コマンドラインインターフェースを使用して、ジョブの進行状況を監視し、実行の詳細を表示し、パイプラインの結果に関する更新を受信できます。
ブロッキング実行
ジョブが完了するまでブロックするには、`pipeline.run()` から返された `PipelineResult` で `waitToFinish``wait_until_finish` を呼び出します。Cloud Dataflow Runner は、待機中にジョブステータスの更新とコンソールメッセージを出力します。結果がアクティブなジョブに接続されている間は、コマンドラインから **Ctrl + C** を押してもジョブはキャンセルされないことに注意してください。ジョブをキャンセルするには、Dataflow モニタリングインターフェースまたはDataflow コマンドラインインターフェースを使用します。
ストリーミング実行
パイプラインで非有界データソースまたはシンクを使用する場合は、`streaming` オプションを `true` に設定する必要があります。
ストリーミング実行を使用する場合は、以下の考慮事項に留意してください。
ストリーミングパイプラインは、ユーザーが明示的にキャンセルしない限り終了しません。ストリーミングジョブは、Dataflow モニタリングインターフェースから、またはDataflow コマンドラインインターフェース(gcloud dataflow jobs cancel コマンド)を使用してキャンセルできます。
ストリーミングジョブは、デフォルトで Google Compute Engine マシンプラス `n1-standard-2` 以上を使用します。`n1-standard-2` はストリーミングジョブを実行するために必要な最小マシンプラスであるため、これをオーバーライドしないでください。
ストリーミング実行の料金はバッチ実行とは異なります。
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!