パイプラインの作成
Beamプログラムは、開始から終了までのデータ処理パイプラインを表現します。このセクションでは、Beam SDKのクラスを使用してパイプラインを構築するメカニズムについて説明します。Beam SDKのクラスを使用してパイプラインを構築するには、プログラムで次の一般的な手順を実行する必要があります。
Pipelineオブジェクトを作成します。- ReadまたはCreate変換を使用して、パイプラインデータ用に1つ以上の
PCollectionを作成します。 - 各
PCollectionに変換を適用します。変換は、PCollection内の要素を変更、フィルタリング、グループ化、分析、またはその他の方法で処理できます。各変換は新しい出力PCollectionを作成し、処理が完了するまで、追加の変換を適用できます。 - 最終的な変換された
PCollectionを書き込むか、または出力します。 - パイプラインを実行します。
パイプラインオブジェクトの作成
Beamプログラムは、多くの場合、Pipelineオブジェクトを作成することから始まります。
Beam SDKでは、各パイプラインは型がPipelineの明示的なオブジェクトで表されます。各Pipelineオブジェクトは独立したエンティティであり、パイプラインが操作するデータと、そのデータに適用される変換の両方をカプセル化します。
パイプラインを作成するには、Pipelineオブジェクトを宣言し、構成オプションを渡します。
パイプラインへのデータの読み込み
パイプラインの最初のPCollectionを作成するには、ルート変換をパイプラインオブジェクトに適用します。ルート変換は、外部データソースまたは指定したローカルデータからPCollectionを作成します。
Beam SDKには、ReadとCreateの2種類のルート変換があります。Read変換は、テキストファイルやデータベーステーブルなどの外部ソースからデータを読み取ります。Create変換は、インメモリのjava.util.CollectionからPCollectionを作成します。
次のコード例は、テキストファイルからデータを読み取るためにTextIO.Readルート変換をapplyする方法を示しています。変換はPipelineオブジェクトpに適用され、PCollection<String>の形式でパイプラインデータセットを返します。
パイプラインデータを処理するための変換の適用
Beam SDKで提供されているさまざまな変換を使用して、データを操作できます。これを行うには、処理する各PCollectionでapplyメソッドを呼び出し、必要な変換オブジェクトを引数として渡すことによって、パイプラインのPCollectionに変換を適用します。
次のコードは、文字列のPCollectionに変換をapplyする方法を示しています。変換は、各文字列の内容を反転させ、反転された文字列を含む新しいPCollectionを出力するユーザー定義のカスタム変換です。
入力はwordsという名前のPCollection<String>です。コードは、ReverseWordsという名前のPTransformオブジェクトのインスタンスをapplyに渡し、戻り値をreversedWordsという名前のPCollection<String>として保存します。
最終的なパイプラインデータの書き込みまたは出力
パイプラインがすべての変換を適用したら、通常は結果を出力する必要があります。パイプラインの最終的なPCollectionを出力するには、そのPCollectionにWrite変換を適用します。Write変換は、PCollectionの要素をデータベーステーブルなどの外部データシンクに出力できます。Writeを使用すると、パイプラインの任意の時点でPCollectionを出力できますが、通常はパイプラインの最後にデータを出力します。
次のコード例は、TextIO.Write変換をapplyして、StringのPCollectionをテキストファイルに書き込む方法を示しています。
パイプラインの実行
パイプラインを構築したら、runメソッドを使用してパイプラインを実行します。パイプラインは非同期で実行されます。作成したプログラムは、パイプラインの仕様をパイプラインランナーに送信し、パイプラインランナーは実際のパイプライン操作のシーケンスを構築して実行します。
runメソッドは非同期です。代わりにブロッキング実行が必要な場合は、waitUntilFinishメソッドを追加してパイプラインを実行します。
次のステップ
- プログラミングガイド - パイプラインの作成、パイプラインオプションの構成、変換の適用について詳しく説明します。
- パイプラインをテストする.
最終更新日:2024/10/31
探していたものはすべて見つかりましたか?
すべて有用で明確でしたか?変更したいことはありますか?お知らせください!

