パイプラインの設計
このページは、Apache Beamパイプラインの設計に役立ちます。パイプラインの構造の決定方法、データに適用するトランスフォームの選択方法、入力と出力の方法の決定方法に関する情報が含まれています。
このセクションを読む前に、Beamプログラミングガイドの情報に精通しておくことをお勧めします。
パイプライン設計時の考慮事項
Beamパイプラインを設計する際には、いくつかの基本的な質問を検討してください。
- 入力データはどこに保存されていますか? 入力データセットはいくつありますか? これにより、パイプラインの先頭で適用する必要のある
Read
トランスフォームの種類が決まります。 - データの形式は? プレーンテキスト、フォーマットされたログファイル、またはデータベーステーブルの行である可能性があります。 一部のBeamトランスフォームは、キー/値ペアの
PCollection
でのみ動作します。データにキーが設定されているかどうか、そしてパイプラインのPCollection
でどのように最適に表現するかを判断する必要があります。 - データに対してどのような処理を行いたいですか? Beam SDKのコアトランスフォームは汎用です。 データの変更または操作が必要な方法を把握することで、ParDoなどのコアトランスフォームの構築方法、またはBeam SDKに含まれる事前に記述されたトランスフォームを使用するタイミングが決まります。
- 出力データの形式は?どこに保存しますか? これにより、パイプラインの最後に適用する必要のある
Write
トランスフォームの種類が決まります。
基本的なパイプライン
最も単純なパイプラインは、図1に示すように、線形な操作の流れを表しています。
図1:線形パイプライン。
ただし、パイプラインははるかに複雑になる可能性があります。 パイプラインは、ステップの有向非巡回グラフを表します。 複数の入力ソース、複数の出力シンクを持つことができ、その操作(PTransform
)は、複数のPCollection
の読み取りと出力の両方を行うことができます。 以下の例は、パイプラインがとることができるさまざまな形状の一部を示しています。
分岐PCollection
トランスフォームはPCollection
を消費しないことを理解することが重要です。 代わりに、PCollection
の個々の要素をそれぞれ考慮し、出力として新しいPCollection
を作成します。このようにして、同じPCollection
内の異なる要素に対して異なる処理を行うことができます。
同じPCollectionを処理する複数のトランスフォーム
入力として同じPCollection
を複数のトランスフォームに使用できます。入力を消費したり、変更したりすることはありません。
図2のパイプラインは、分岐パイプラインです。 パイプラインは、(文字列として表現された)名を入力としてデータベーステーブルから読み取り、テーブル行のPCollection
を作成します。 次に、パイプラインは**同じ**PCollection
に複数のトランスフォームを適用します。 トランスフォームAは、そのPCollection
内の「A」で始まるすべての名前を抽出し、トランスフォームBはそのPCollection
内の「B」で始まるすべての名前を抽出します。 トランスフォームAとトランスフォームBの両方には、同じ入力PCollection
があります。
図2:分岐パイプライン。 2つのトランスフォームが、データベーステーブル行の単一のPCollectionに適用されます。
次のコード例では、単一の入力コレクションに2つのトランスフォームを適用します。
PCollection<String> dbRowCollection = ...;
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){
c.output(c.element());
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){
c.output(c.element());
}
}
}));
複数の出力を生成する単一のトランスフォーム
パイプラインを分岐させるもう1つの方法は、タグ付き出力を使用して、**単一**のトランスフォームが複数のPCollection
に出力することです。 複数の出力を生成するトランスフォームは、入力の各要素を一度処理し、0個以上のPCollection
に出力します。
図3は、上記と同じ例を、複数の出力を生成する1つのトランスフォームを使用して示しています。「A」で始まる名前はメイン出力PCollection
に追加され、「B」で始まる名前は追加出力PCollection
に追加されます。
図3:複数のPCollectionを出力するトランスフォームを持つパイプライン。
図2と図3のパイプラインを比較すると、異なる方法で同じ操作を実行していることがわかります。 図2のパイプラインには、同じ入力PCollection
内の要素を処理する2つのトランスフォームが含まれています。 1つのトランスフォームは次のロジックを使用します。
if (starts with 'A') { outputToPCollectionA }
一方、もう1つのトランスフォームは次のロジックを使用します。
if (starts with 'B') { outputToPCollectionB }
各トランスフォームは入力PCollection
全体を読み取るため、入力PCollection
の各要素は2回処理されます。
図3のパイプラインは、次のロジックを使用する1つのトランスフォームのみを使用して、異なる方法で同じ操作を実行します。
if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }
ここで、入力PCollection
の各要素は1回処理されます。
次のコード例では、各要素を1回処理して2つのコレクションを出力する1つのトランスフォームを適用します。
// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// Emit to main output, which is the output with tag startsWithATag.
c.output(c.element());
} else if(c.element().startsWith("B")) {
// Emit to output with tag startsWithBTag.
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
いずれかのメカニズムを使用して、複数の出力PCollection
を生成できます。 最初のオプションは、処理ロジックを1つのParDo
に結合することが論理的に意味をなさない場合にお勧めします。 ただし、2番目のオプション(複数の出力を生成する単一のトランスフォーム)は、要素ごとのトランスフォームの計算に時間がかかる場合、より意味があります。また、将来より多くの出力タイプを追加する予定がある場合は、よりスケーラブルになります。
PCollectionの結合
多くの場合、複数のトランスフォームを介してPCollection
を複数のPCollection
に分岐した後、それらの結果のPCollection
の一部またはすべてを再びマージしたい場合があります。 次のいずれかを使用して実行できます。
- Flatten - Beam SDKの
Flatten
トランスフォームを使用して、**同じ型**の複数のPCollection
をマージできます。 - Join - Beam SDKの
CoGroupByKey
トランスフォームを使用して、2つのPCollection
間のリレーショナル結合を実行できます。PCollection
にはキーを設定する必要があり(つまり、キー/値ペアのコレクションである必要があります)、同じキータイプを使用する必要があります。
図4の例は、上記のセクションの図2の例の続きです。2つのPCollection
(1つは名前が「A」で始まるもの、もう1つは名前が「B」で始まるもの)に分岐した後、パイプラインはこれらを1つのPCollection
にマージします。このPCollection
には、「A」または「B」で始まるすべての名前が含まれます。ここでFlatten
を使用するのは理にかなっています。なぜなら、マージされるPCollection
はどちらも同じ型を含むためです。
図4:Flatten変換を使用して2つのコレクションを1つのコレクションにマージするパイプライン。
次のコード例では、Flatten
を使用して2つのコレクションをマージしています。
//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);
複数のソース
パイプラインは、1つ以上のソースから入力を読み取ることができます。複数のソースから読み取り、それらのソースからのデータが関連している場合、入力を結合すると役立つことがあります。以下の図5に示す例では、パイプラインはデータベーステーブルから名前と住所を読み取り、Kafkaトピックから名前と注文番号を読み取ります。次に、パイプラインはCoGroupByKey
を使用してこの情報を結合します。ここでキーは名前であり、結果のPCollection
には、名前、住所、注文のすべての組み合わせが含まれます。
図5:2つの入力コレクションのリレーショナル結合を行うパイプライン。
次のコード例では、Join
を使用して2つの入力コレクションを結合しています。
PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);
次のステップ
最終更新日:2024年10月31日
お探しのものはお見つけいただけましたか?
すべて役に立ち、明確でしたか?変更したい点があればお知らせください!