パイプラインの作成
Beamプログラムは、開始から終了までのデータ処理パイプラインを表現します。このセクションでは、Beam SDKのクラスを使用してパイプラインを構築するメカニズムについて説明します。Beam SDKのクラスを使用してパイプラインを構築するには、プログラムで次の一般的な手順を実行する必要があります。
Pipeline
オブジェクトを作成します。- ReadまたはCreate変換を使用して、パイプラインデータ用に1つ以上の
PCollection
を作成します。 - 各
PCollection
に変換を適用します。変換は、PCollection
内の要素を変更、フィルタリング、グループ化、分析、またはその他の方法で処理できます。各変換は新しい出力PCollection
を作成し、処理が完了するまで、追加の変換を適用できます。 - 最終的な変換された
PCollection
を書き込むか、または出力します。 - パイプラインを実行します。
パイプラインオブジェクトの作成
Beamプログラムは、多くの場合、Pipeline
オブジェクトを作成することから始まります。
Beam SDKでは、各パイプラインは型がPipeline
の明示的なオブジェクトで表されます。各Pipeline
オブジェクトは独立したエンティティであり、パイプラインが操作するデータと、そのデータに適用される変換の両方をカプセル化します。
パイプラインを作成するには、Pipeline
オブジェクトを宣言し、構成オプションを渡します。
パイプラインへのデータの読み込み
パイプラインの最初のPCollection
を作成するには、ルート変換をパイプラインオブジェクトに適用します。ルート変換は、外部データソースまたは指定したローカルデータからPCollection
を作成します。
Beam SDKには、Read
とCreate
の2種類のルート変換があります。Read
変換は、テキストファイルやデータベーステーブルなどの外部ソースからデータを読み取ります。Create
変換は、インメモリのjava.util.Collection
からPCollection
を作成します。
次のコード例は、テキストファイルからデータを読み取るためにTextIO.Read
ルート変換をapply
する方法を示しています。変換はPipeline
オブジェクトp
に適用され、PCollection<String>
の形式でパイプラインデータセットを返します。
パイプラインデータを処理するための変換の適用
Beam SDKで提供されているさまざまな変換を使用して、データを操作できます。これを行うには、処理する各PCollection
でapply
メソッドを呼び出し、必要な変換オブジェクトを引数として渡すことによって、パイプラインのPCollection
に変換を適用します。
次のコードは、文字列のPCollection
に変換をapply
する方法を示しています。変換は、各文字列の内容を反転させ、反転された文字列を含む新しいPCollection
を出力するユーザー定義のカスタム変換です。
入力はwords
という名前のPCollection<String>
です。コードは、ReverseWords
という名前のPTransform
オブジェクトのインスタンスをapply
に渡し、戻り値をreversedWords
という名前のPCollection<String>
として保存します。
最終的なパイプラインデータの書き込みまたは出力
パイプラインがすべての変換を適用したら、通常は結果を出力する必要があります。パイプラインの最終的なPCollection
を出力するには、そのPCollection
にWrite
変換を適用します。Write
変換は、PCollection
の要素をデータベーステーブルなどの外部データシンクに出力できます。Write
を使用すると、パイプラインの任意の時点でPCollection
を出力できますが、通常はパイプラインの最後にデータを出力します。
次のコード例は、TextIO.Write
変換をapply
して、String
のPCollection
をテキストファイルに書き込む方法を示しています。
パイプラインの実行
パイプラインを構築したら、run
メソッドを使用してパイプラインを実行します。パイプラインは非同期で実行されます。作成したプログラムは、パイプラインの仕様をパイプラインランナーに送信し、パイプラインランナーは実際のパイプライン操作のシーケンスを構築して実行します。
run
メソッドは非同期です。代わりにブロッキング実行が必要な場合は、waitUntilFinish
メソッドを追加してパイプラインを実行します。
次のステップ
- プログラミングガイド - パイプラインの作成、パイプラインオプションの構成、変換の適用について詳しく説明します。
- パイプラインをテストする.
最終更新日:2024/10/31
探していたものはすべて見つかりましたか?
すべて有用で明確でしたか?変更したいことはありますか?お知らせください!