パイプラインの作成

Beamプログラムは、開始から終了までのデータ処理パイプラインを表現します。このセクションでは、Beam SDKのクラスを使用してパイプラインを構築するメカニズムについて説明します。Beam SDKのクラスを使用してパイプラインを構築するには、プログラムで次の一般的な手順を実行する必要があります。

パイプラインオブジェクトの作成

Beamプログラムは、多くの場合、Pipelineオブジェクトを作成することから始まります。

Beam SDKでは、各パイプラインは型がPipelineの明示的なオブジェクトで表されます。各Pipelineオブジェクトは独立したエンティティであり、パイプラインが操作するデータと、そのデータに適用される変換の両方をカプセル化します。

パイプラインを作成するには、Pipelineオブジェクトを宣言し、構成オプションを渡します。

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

パイプラインへのデータの読み込み

パイプラインの最初のPCollectionを作成するには、ルート変換をパイプラインオブジェクトに適用します。ルート変換は、外部データソースまたは指定したローカルデータからPCollectionを作成します。

Beam SDKには、ReadCreateの2種類のルート変換があります。Read変換は、テキストファイルやデータベーステーブルなどの外部ソースからデータを読み取ります。Create変換は、インメモリのjava.util.CollectionからPCollectionを作成します。

次のコード例は、テキストファイルからデータを読み取るためにTextIO.Readルート変換をapplyする方法を示しています。変換はPipelineオブジェクトpに適用され、PCollection<String>の形式でパイプラインデータセットを返します。

PCollection<String> lines = p.apply(
  "ReadLines", TextIO.read().from("gs://some/inputData.txt"));

パイプラインデータを処理するための変換の適用

Beam SDKで提供されているさまざまな変換を使用して、データを操作できます。これを行うには、処理する各PCollectionapplyメソッドを呼び出し、必要な変換オブジェクトを引数として渡すことによって、パイプラインのPCollectionに変換を適用します。

次のコードは、文字列のPCollectionに変換をapplyする方法を示しています。変換は、各文字列の内容を反転させ、反転された文字列を含む新しいPCollectionを出力するユーザー定義のカスタム変換です。

入力はwordsという名前のPCollection<String>です。コードは、ReverseWordsという名前のPTransformオブジェクトのインスタンスをapplyに渡し、戻り値をreversedWordsという名前のPCollection<String>として保存します。

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

最終的なパイプラインデータの書き込みまたは出力

パイプラインがすべての変換を適用したら、通常は結果を出力する必要があります。パイプラインの最終的なPCollectionを出力するには、そのPCollectionWrite変換を適用します。Write変換は、PCollectionの要素をデータベーステーブルなどの外部データシンクに出力できます。Writeを使用すると、パイプラインの任意の時点でPCollectionを出力できますが、通常はパイプラインの最後にデータを出力します。

次のコード例は、TextIO.Write変換をapplyして、StringPCollectionをテキストファイルに書き込む方法を示しています。

PCollection<String> filteredWords = ...;

filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));

パイプラインの実行

パイプラインを構築したら、runメソッドを使用してパイプラインを実行します。パイプラインは非同期で実行されます。作成したプログラムは、パイプラインの仕様をパイプラインランナーに送信し、パイプラインランナーは実際のパイプライン操作のシーケンスを構築して実行します。

p.run();

runメソッドは非同期です。代わりにブロッキング実行が必要な場合は、waitUntilFinishメソッドを追加してパイプラインを実行します。

p.run().waitUntilFinish();

次のステップ