実行モデル

Beam モデルでは、ランナーがパイプラインをさまざまな方法で実行できます。ランナーの選択の結果として、さまざまな影響が見られる場合があります。このページでは、Beam パイプラインの実行方法をよりよく理解できるように、これらの影響について説明します。

要素の処理

マシン間の要素のシリアル化と通信は、パイプラインの分散実行において最もコストのかかる操作の 1 つです。このシリアル化を回避するには、障害後に要素を再処理するか、別のマシンへの出力の分散を制限する必要がある場合があります。

シリアル化と通信

ランナーは、通信目的や、永続化などの他の理由でマシン間で要素をシリアル化する場合があります。

ランナーは、次のようなさまざまな方法で Transform 間で要素を転送することを決定する場合があります。

ランナーが要素をシリアル化して永続化する可能性のある状況には、次のようなものがあります。

  1. ステートフル DoFn の一部として使用する場合、ランナーは値を何らかの状態メカニズムに永続化する場合があります。
  2. 処理の結果をコミットする場合、ランナーは出力をチェックポイントとして永続化する場合があります。

バンドリングと永続化

Beam パイプラインは、「容易に並列化できる」問題に焦点を当てていることがよくあります。このため、API は要素を並行して処理することを強調しており、「PCollection 内の各要素にシーケンス番号を割り当てる」などのアクションを表現することが困難になっています。このようなアルゴリズムはスケーラビリティの問題に苦しむ可能性が高いため、これは意図的です。

すべての要素を並行して処理することにも欠点があります。具体的には、シンクに要素を書き込んだり、処理中に進捗状況をチェックポイントしたりするなど、操作をバッチ処理することが不可能になります。

すべての要素を同時に処理するのではなく、PCollection 内の要素は *バンドル* で処理されます。コレクションのバンドルへの分割は任意であり、ランナーによって選択されます。これにより、ランナーは、すべての要素の後に結果を永続化することと、障害が発生した場合にすべてを再試行する必要があることとの間の適切な中間点を選択できます。たとえば、ストリーミングランナーは小さなバンドルを処理してコミットすることを好み、バッチランナーは大きなバンドルを処理することを好む場合があります。

データパーティショニングとステージ間の実行

Beam パイプライン内の要素処理のパーティショニングと並列化は、次の 2 つのことに依存します。

Beam パイプラインは、ソース (たとえば、KafkaIOBigQueryIOJdbcIO、または独自のソース実装) からデータを読み取ります。Beam で Source を実装するには、それを分割可能な DoFn として実装する必要があります。分割可能な DoFn は、ランナーに作業の分割を容易にするためのインターフェイスを提供します。

Beam でキーベースの操作 (たとえば、GroupByKeyCombineReshuffle.perKey、およびステートフル DoFn) を実行する場合、Beam ランナーは *シャッフル*1 と呼ばれるデータのシリアル化と転送を実行します。シャッフルにより、同じキーのデータ要素を一緒に処理できます。

ランナーがデータを *シャッフル* する方法は、バッチとストリーミングの実行モードでわずかに異なる場合があります。

1一部のランナーの shuffle 操作と混同しないでください。

パイプライン実行におけるデータの順序付け

Beam モデルは、ランナーが要素を処理したり、PTransform 間で要素を転送したりする順序に関する厳密なガイドラインを定義していません。ランナーは、さまざまな形式でデータ転送セマンティクスを自由に実装できます。

ユーザーパイプラインがパイプライン実行で特定の順序付けセマンティクスに依存する必要があるユースケースがいくつか存在します。機能マトリックスドキュメントには、**キー順序付き配信** のランナーの動作が記載されています。

同じ Beam Transform から一連のバンドルを処理する単一の Beam ワーカーを考え、このステージからダウンストリームの PCollection にデータを出力する PTransform を考えます。最後に、このワーカーによって (同じバンドル内または異なるバンドルの一部として) 特定の順序で出力された *同じキーを持つ* 2 つのイベントを考えます。

Beam ランナーが、データ伝送方法の種類に関係なく、直ちにダウンストリームにある PTransform によってこれら 2 つのイベントが同じ順序で観測されることを保証する場合、Beam ランナーは **キー順序付き配信** をサポートすると言います。

この特性は、キーによって並列処理が制限されているランナーと操作で当てはまります。

Transform 内および Transform 間の障害と並列処理

このセクションでは、入力コレクションの要素が並行して処理される方法と、障害が発生した場合に Transform がどのように再試行されるかについて説明します。

1 つの Transform 内のデータ並列処理

単一の ParDo を実行する場合、ランナーは、図 1 に示すように、9 つの要素の入力コレクションの例を 2 つのバンドルに分割する可能性があります。

Bundle A contains five elements. Bundle B contains four elements.

図 1: ランナーが入力コレクションを 2 つのバンドルに分割します。

ParDo が実行されると、ワーカーは図 2 に示すように 2 つのバンドルを並行して処理できます。

Two workers process the two bundles in parallel. Worker one processes bundle A. Worker two processes bundle B.

図 2: 2 つのワーカーが 2 つのバンドルを並行して処理します。

要素は分割できないため、Transform の最大並列処理はコレクション内の要素の数によって異なります。図 3 では、入力コレクションに 9 つの要素があるため、最大並列処理は 9 つです。

Nine workers process a nine element input collection in parallel.

図3:9人のワーカーが、9つの要素を持つ入力コレクションを並行処理します。

注:分割可能なParDoを使用すると、単一の入力の処理を複数のバンドルに分割できます。この機能は開発中です。

Transform 間の依存並列処理

シーケンス内のParDo変換は、ランナーがバンドルを変更せずに、生成側の変換の出力要素を消費側の変換で実行するように選択した場合、依存並列になる可能性があります。図4では、特定の要素に対するParDo1の出力が同じワーカーで処理される必要がある場合、ParDo1ParDo2依存並列です。

ParDo1 processes an input collection that contains bundles A and B. ParDo2 then processes the output collection from ParDo1, which contains bundles C and D.

図4:シーケンス内の2つの変換と、対応する入力コレクション。

図5は、これらの依存並列変換がどのように実行されるかを示しています。最初のワーカーはバンドルA内の要素に対してParDo1を実行し(結果としてバンドルCになります)、次にバンドルC内の要素に対してParDo2を実行します。同様に、2番目のワーカーはバンドルB内の要素に対してParDo1を実行し(結果としてバンドルDになります)、次にバンドルD内の要素に対してParDo2を実行します。

Worker one executes ParDo1 on bundle A and Pardo2 on bundle C. Worker two executes ParDo1 on bundle B and ParDo2 on bundle D.

図5:2つのワーカーが依存並列のParDo変換を実行します。

このように変換を実行することで、ランナーはワーカー間で要素を再配布する必要がなくなり、通信コストを節約できます。ただし、最大の並列処理は、依存並列ステップの最初のステップの最大の並列処理に依存するようになりました。

1 つの Transform 内での障害

バンドル内の要素の処理が失敗した場合、バンドル全体が失敗します。バンドル内の要素は(パイプライン全体が失敗しないように)再試行する必要がありますが、同じバンドルで再試行する必要はありません。

この例では、9つの要素を持つ入力コレクションを持ち、2つのバンドルに分割された図1のParDoを使用します。

図6では、最初のワーカーはバンドルA内の5つの要素すべてを正常に処理します。2番目のワーカーはバンドルB内の4つの要素を処理します。最初の2つの要素は正常に処理され、3番目の要素の処理は失敗し、まだ処理を待機している要素が1つあります。

ランナーはバンドルB内のすべての要素を再試行し、2回目の試行で処理が正常に完了することがわかります。図に示すように、再試行は必ずしも最初の処理試行と同じワーカーで発生するとは限りません。

Worker two fails to process an element in bundle B. Worker one finishes processing bundle A and then successfully retries to execute bundle B.

図6:バンドルB内の要素の処理が失敗し、別のワーカーがバンドル全体を再試行します。

入力バンドル内の要素の処理中にエラーが発生したため、入力バンドル内のすべての要素を再処理する必要がありました。つまり、ランナーはバンドル全体の出力を破棄する必要があります(すべての結果が再計算されるため、状態の変更や設定されたタイマーなどを含みます)。

失敗した変換がParDoの場合、DoFnインスタンスは破棄され、放棄されることに注意してください。

連鎖障害: Transform 間の障害

ParDo2の要素の処理の失敗がParDo1の再実行を引き起こす場合、これらの2つのステップは同時失敗すると言われます。

この例では、図4の2つのParDoを使用します。

図7では、ワーカー2はバンドルB内のすべての要素に対してParDo1を正常に実行します。ただし、ワーカーはバンドルD内の要素の処理に失敗したため、ParDo2が失敗します(赤いXで示されています)。その結果、ランナーはParDo2の出力を破棄して再計算する必要があります。ランナーはParDo1ParDo2を一緒に実行していたため、ParDo1からの出力バンドルも破棄する必要があり、入力バンドル内のすべての要素を再試行する必要があります。これら2つのParDoは同時失敗です。

Worker two fails to process en element in bundle D, so all elements in both bundle B and bundle D must be retried.

図7:バンドルD内の要素の処理が失敗したため、入力バンドル内のすべての要素が再試行されます。

図に示すように、再試行は必ずしも最初の試行と同じ処理時間を持つとは限りません。

結合された失敗を経験したすべてのDoFnは、通常のDoFnライフサイクルに従っていないため、終了および破棄する必要があります。

このように変換を実行することで、ランナーは変換間で要素を永続化する必要がなくなり、永続化コストを節約できます。