実行モデル
Beam モデルでは、ランナーがパイプラインをさまざまな方法で実行できます。ランナーの選択の結果として、さまざまな影響が見られる場合があります。このページでは、Beam パイプラインの実行方法をよりよく理解できるように、これらの影響について説明します。
要素の処理
マシン間の要素のシリアル化と通信は、パイプラインの分散実行において最もコストのかかる操作の 1 つです。このシリアル化を回避するには、障害後に要素を再処理するか、別のマシンへの出力の分散を制限する必要がある場合があります。
シリアル化と通信
ランナーは、通信目的や、永続化などの他の理由でマシン間で要素をシリアル化する場合があります。
ランナーは、次のようなさまざまな方法で Transform 間で要素を転送することを決定する場合があります。
- グループ化操作の一部として、処理のために要素をワーカーにルーティングします。これには、要素をシリアル化し、キーでグループ化またはソートすることが含まれる場合があります。
- 並列処理を調整するために、ワーカー間で要素を再配布します。これには、要素をシリアル化し、他のワーカーに通信することが含まれる場合があります。
ParDo
への Side Input で要素を使用します。これには、要素をシリアル化し、ParDo
を実行するすべてのワーカーにブロードキャストすることが必要な場合があります。- 同じワーカーで実行されている Transform 間で要素を渡します。これにより、ランナーは要素をシリアル化する必要がなくなり、代わりにランナーはメモリ内で要素を渡すだけです。これは、融合として知られる最適化の一部として行われます。
ランナーが要素をシリアル化して永続化する可能性のある状況には、次のようなものがあります。
- ステートフル
DoFn
の一部として使用する場合、ランナーは値を何らかの状態メカニズムに永続化する場合があります。 - 処理の結果をコミットする場合、ランナーは出力をチェックポイントとして永続化する場合があります。
バンドリングと永続化
Beam パイプラインは、「容易に並列化できる」問題に焦点を当てていることがよくあります。このため、API は要素を並行して処理することを強調しており、「PCollection 内の各要素にシーケンス番号を割り当てる」などのアクションを表現することが困難になっています。このようなアルゴリズムはスケーラビリティの問題に苦しむ可能性が高いため、これは意図的です。
すべての要素を並行して処理することにも欠点があります。具体的には、シンクに要素を書き込んだり、処理中に進捗状況をチェックポイントしたりするなど、操作をバッチ処理することが不可能になります。
すべての要素を同時に処理するのではなく、PCollection
内の要素は *バンドル* で処理されます。コレクションのバンドルへの分割は任意であり、ランナーによって選択されます。これにより、ランナーは、すべての要素の後に結果を永続化することと、障害が発生した場合にすべてを再試行する必要があることとの間の適切な中間点を選択できます。たとえば、ストリーミングランナーは小さなバンドルを処理してコミットすることを好み、バッチランナーは大きなバンドルを処理することを好む場合があります。
データパーティショニングとステージ間の実行
Beam パイプライン内の要素処理のパーティショニングと並列化は、次の 2 つのことに依存します。
- データソースの実装
- ステージ間のキー並列処理
Beam パイプラインは、ソース (たとえば、KafkaIO
、BigQueryIO
、JdbcIO
、または独自のソース実装) からデータを読み取ります。Beam で Source を実装するには、それを分割可能な DoFn
として実装する必要があります。分割可能な DoFn
は、ランナーに作業の分割を容易にするためのインターフェイスを提供します。
Beam でキーベースの操作 (たとえば、GroupByKey
、Combine
、Reshuffle.perKey
、およびステートフル DoFn
) を実行する場合、Beam ランナーは *シャッフル*1 と呼ばれるデータのシリアル化と転送を実行します。シャッフルにより、同じキーのデータ要素を一緒に処理できます。
ランナーがデータを *シャッフル* する方法は、バッチとストリーミングの実行モードでわずかに異なる場合があります。
1一部のランナーの shuffle
操作と混同しないでください。
パイプライン実行におけるデータの順序付け
Beam モデルは、ランナーが要素を処理したり、PTransform
間で要素を転送したりする順序に関する厳密なガイドラインを定義していません。ランナーは、さまざまな形式でデータ転送セマンティクスを自由に実装できます。
ユーザーパイプラインがパイプライン実行で特定の順序付けセマンティクスに依存する必要があるユースケースがいくつか存在します。機能マトリックスドキュメントには、**キー順序付き配信** のランナーの動作が記載されています。
同じ Beam Transform から一連のバンドルを処理する単一の Beam ワーカーを考え、このステージからダウンストリームの PCollection
にデータを出力する PTransform
を考えます。最後に、このワーカーによって (同じバンドル内または異なるバンドルの一部として) 特定の順序で出力された *同じキーを持つ* 2 つのイベントを考えます。
Beam ランナーが、データ伝送方法の種類に関係なく、直ちにダウンストリームにある PTransform によってこれら 2 つのイベントが同じ順序で観測されることを保証する場合、Beam ランナーは **キー順序付き配信** をサポートすると言います。
この特性は、キーによって並列処理が制限されているランナーと操作で当てはまります。
Transform 内および Transform 間の障害と並列処理
このセクションでは、入力コレクションの要素が並行して処理される方法と、障害が発生した場合に Transform がどのように再試行されるかについて説明します。
1 つの Transform 内のデータ並列処理
単一の ParDo
を実行する場合、ランナーは、図 1 に示すように、9 つの要素の入力コレクションの例を 2 つのバンドルに分割する可能性があります。
図 1: ランナーが入力コレクションを 2 つのバンドルに分割します。
ParDo
が実行されると、ワーカーは図 2 に示すように 2 つのバンドルを並行して処理できます。
図 2: 2 つのワーカーが 2 つのバンドルを並行して処理します。
要素は分割できないため、Transform の最大並列処理はコレクション内の要素の数によって異なります。図 3 では、入力コレクションに 9 つの要素があるため、最大並列処理は 9 つです。
図3:9人のワーカーが、9つの要素を持つ入力コレクションを並行処理します。
注:分割可能なParDoを使用すると、単一の入力の処理を複数のバンドルに分割できます。この機能は開発中です。
Transform 間の依存並列処理
シーケンス内のParDo
変換は、ランナーがバンドルを変更せずに、生成側の変換の出力要素を消費側の変換で実行するように選択した場合、依存並列になる可能性があります。図4では、特定の要素に対するParDo1
の出力が同じワーカーで処理される必要がある場合、ParDo1
とParDo2
は依存並列です。
図4:シーケンス内の2つの変換と、対応する入力コレクション。
図5は、これらの依存並列変換がどのように実行されるかを示しています。最初のワーカーはバンドルA内の要素に対してParDo1
を実行し(結果としてバンドルCになります)、次にバンドルC内の要素に対してParDo2
を実行します。同様に、2番目のワーカーはバンドルB内の要素に対してParDo1
を実行し(結果としてバンドルDになります)、次にバンドルD内の要素に対してParDo2
を実行します。
図5:2つのワーカーが依存並列のParDo変換を実行します。
このように変換を実行することで、ランナーはワーカー間で要素を再配布する必要がなくなり、通信コストを節約できます。ただし、最大の並列処理は、依存並列ステップの最初のステップの最大の並列処理に依存するようになりました。
1 つの Transform 内での障害
バンドル内の要素の処理が失敗した場合、バンドル全体が失敗します。バンドル内の要素は(パイプライン全体が失敗しないように)再試行する必要がありますが、同じバンドルで再試行する必要はありません。
この例では、9つの要素を持つ入力コレクションを持ち、2つのバンドルに分割された図1のParDo
を使用します。
図6では、最初のワーカーはバンドルA内の5つの要素すべてを正常に処理します。2番目のワーカーはバンドルB内の4つの要素を処理します。最初の2つの要素は正常に処理され、3番目の要素の処理は失敗し、まだ処理を待機している要素が1つあります。
ランナーはバンドルB内のすべての要素を再試行し、2回目の試行で処理が正常に完了することがわかります。図に示すように、再試行は必ずしも最初の処理試行と同じワーカーで発生するとは限りません。
図6:バンドルB内の要素の処理が失敗し、別のワーカーがバンドル全体を再試行します。
入力バンドル内の要素の処理中にエラーが発生したため、入力バンドル内のすべての要素を再処理する必要がありました。つまり、ランナーはバンドル全体の出力を破棄する必要があります(すべての結果が再計算されるため、状態の変更や設定されたタイマーなどを含みます)。
失敗した変換がParDo
の場合、DoFn
インスタンスは破棄され、放棄されることに注意してください。
連鎖障害: Transform 間の障害
ParDo2
の要素の処理の失敗がParDo1
の再実行を引き起こす場合、これらの2つのステップは同時失敗すると言われます。
この例では、図4の2つのParDo
を使用します。
図7では、ワーカー2はバンドルB内のすべての要素に対してParDo1
を正常に実行します。ただし、ワーカーはバンドルD内の要素の処理に失敗したため、ParDo2
が失敗します(赤いXで示されています)。その結果、ランナーはParDo2
の出力を破棄して再計算する必要があります。ランナーはParDo1
とParDo2
を一緒に実行していたため、ParDo1
からの出力バンドルも破棄する必要があり、入力バンドル内のすべての要素を再試行する必要があります。これら2つのParDo
は同時失敗です。
図7:バンドルD内の要素の処理が失敗したため、入力バンドル内のすべての要素が再試行されます。
図に示すように、再試行は必ずしも最初の試行と同じ処理時間を持つとは限りません。
結合された失敗を経験したすべてのDoFn
は、通常のDoFn
ライフサイクルに従っていないため、終了および破棄する必要があります。
このように変換を実行することで、ランナーは変換間で要素を永続化する必要がなくなり、永続化コストを節約できます。
最終更新日:2024/10/31
お探しの情報はすべて見つかりましたか?
すべてが役立ち、明確でしたか?変更したい点はありますか?ぜひお知らせください!