パイプラインの設計

このページは、Apache Beamパイプラインの設計に役立ちます。パイプラインの構造の決定方法、データに適用するトランスフォームの選択方法、入力と出力の方法の決定方法に関する情報が含まれています。

このセクションを読む前に、Beamプログラミングガイドの情報に精通しておくことをお勧めします。

パイプライン設計時の考慮事項

Beamパイプラインを設計する際には、いくつかの基本的な質問を検討してください。

基本的なパイプライン

最も単純なパイプラインは、図1に示すように、線形な操作の流れを表しています。

A linear pipeline starts with one input collection, sequentially appliesthree transforms, and ends with one output collection.

図1:線形パイプライン。

ただし、パイプラインははるかに複雑になる可能性があります。 パイプラインは、ステップの有向非巡回グラフを表します。 複数の入力ソース、複数の出力シンクを持つことができ、その操作(PTransform)は、複数のPCollectionの読み取りと出力の両方を行うことができます。 以下の例は、パイプラインがとることができるさまざまな形状の一部を示しています。

分岐PCollection

トランスフォームはPCollectionを消費しないことを理解することが重要です。 代わりに、PCollectionの個々の要素をそれぞれ考慮し、出力として新しいPCollectionを作成します。このようにして、同じPCollection内の異なる要素に対して異なる処理を行うことができます。

同じPCollectionを処理する複数のトランスフォーム

入力として同じPCollectionを複数のトランスフォームに使用できます。入力を消費したり、変更したりすることはありません。

図2のパイプラインは、分岐パイプラインです。 パイプラインは、(文字列として表現された)名を入力としてデータベーステーブルから読み取り、テーブル行のPCollectionを作成します。 次に、パイプラインは**同じ**PCollectionに複数のトランスフォームを適用します。 トランスフォームAは、そのPCollection内の「A」で始まるすべての名前を抽出し、トランスフォームBはそのPCollection内の「B」で始まるすべての名前を抽出します。 トランスフォームAとトランスフォームBの両方には、同じ入力PCollectionがあります。

The pipeline applies two transforms to a single input collection. Eachtransform produces an output collection.

図2:分岐パイプライン。 2つのトランスフォームが、データベーステーブル行の単一のPCollectionに適用されます。

次のコード例では、単一の入力コレクションに2つのトランスフォームを適用します。

PCollection<String> dbRowCollection = ...;

PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("A")){
      c.output(c.element());
    }
  }
}));

PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("B")){
      c.output(c.element());
    }
  }
}));

複数の出力を生成する単一のトランスフォーム

パイプラインを分岐させるもう1つの方法は、タグ付き出力を使用して、**単一**のトランスフォームが複数のPCollectionに出力することです。 複数の出力を生成するトランスフォームは、入力の各要素を一度処理し、0個以上のPCollectionに出力します。

図3は、上記と同じ例を、複数の出力を生成する1つのトランスフォームを使用して示しています。「A」で始まる名前はメイン出力PCollectionに追加され、「B」で始まる名前は追加出力PCollectionに追加されます。

The pipeline applies one transform that produces multiple output collections.

図3:複数のPCollectionを出力するトランスフォームを持つパイプライン。

図2と図3のパイプラインを比較すると、異なる方法で同じ操作を実行していることがわかります。 図2のパイプラインには、同じ入力PCollection内の要素を処理する2つのトランスフォームが含まれています。 1つのトランスフォームは次のロジックを使用します。

if (starts with 'A') { outputToPCollectionA }

一方、もう1つのトランスフォームは次のロジックを使用します。

if (starts with 'B') { outputToPCollectionB }

各トランスフォームは入力PCollection全体を読み取るため、入力PCollectionの各要素は2回処理されます。

図3のパイプラインは、次のロジックを使用する1つのトランスフォームのみを使用して、異なる方法で同じ操作を実行します。

if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }

ここで、入力PCollectionの各要素は1回処理されます。

次のコード例では、各要素を1回処理して2つのコレクションを出力する1つのトランスフォームを適用します。

// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};

PCollectionTuple mixedCollection =
    dbRowCollection.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().startsWith("A")) {
              // Emit to main output, which is the output with tag startsWithATag.
              c.output(c.element());
            } else if(c.element().startsWith("B")) {
              // Emit to output with tag startsWithBTag.
              c.output(startsWithBTag, c.element());
            }
          }
        })
        // Specify main output. In this example, it is the output
        // with tag startsWithATag.
        .withOutputTags(startsWithATag,
        // Specify the output with tag startsWithBTag, as a TupleTagList.
                        TupleTagList.of(startsWithBTag)));

// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);

// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

いずれかのメカニズムを使用して、複数の出力PCollectionを生成できます。 最初のオプションは、処理ロジックを1つのParDoに結合することが論理的に意味をなさない場合にお勧めします。 ただし、2番目のオプション(複数の出力を生成する単一のトランスフォーム)は、要素ごとのトランスフォームの計算に時間がかかる場合、より意味があります。また、将来より多くの出力タイプを追加する予定がある場合は、よりスケーラブルになります。

PCollectionの結合

多くの場合、複数のトランスフォームを介してPCollectionを複数のPCollectionに分岐した後、それらの結果のPCollectionの一部またはすべてを再びマージしたい場合があります。 次のいずれかを使用して実行できます。

図4の例は、上記のセクションの図2の例の続きです。2つのPCollection(1つは名前が「A」で始まるもの、もう1つは名前が「B」で始まるもの)に分岐した後、パイプラインはこれらを1つのPCollectionにマージします。このPCollectionには、「A」または「B」で始まるすべての名前が含まれます。ここでFlattenを使用するのは理にかなっています。なぜなら、マージされるPCollectionはどちらも同じ型を含むためです。

The pipeline merges two collections into one collection with the Flatten transform.

図4:Flatten変換を使用して2つのコレクションを1つのコレクションにマージするパイプライン。

次のコード例では、Flattenを使用して2つのコレクションをマージしています。

//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

複数のソース

パイプラインは、1つ以上のソースから入力を読み取ることができます。複数のソースから読み取り、それらのソースからのデータが関連している場合、入力を結合すると役立つことがあります。以下の図5に示す例では、パイプラインはデータベーステーブルから名前と住所を読み取り、Kafkaトピックから名前と注文番号を読み取ります。次に、パイプラインはCoGroupByKeyを使用してこの情報を結合します。ここでキーは名前であり、結果のPCollectionには、名前、住所、注文のすべての組み合わせが含まれます。

The pipeline joins two input collections into one collection with the Join transform.

図5:2つの入力コレクションのリレーショナル結合を行うパイプライン。

次のコード例では、Joinを使用して2つの入力コレクションを結合しています。

PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);

PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);

final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();

// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
  KeyedPCollectionTuple.of(addressTag, userAddress)
                       .and(orderTag, userOrder)
                       .apply(CoGroupByKey.<String>create());

joinedCollection.apply(...);

次のステップ