ランナー作成ガイド
このガイドでは、新しいランナーの実装方法について説明します。データ処理システムがあり、それを使用してBeamパイプラインを実行したい人を対象としています。ガイドは、今後の作業を評価するのに役立つ基本から始まります。その後、セクションはますます詳細になり、ランナーの開発全体を通してリソースとなります。
対象となるトピック
Beamプリミティブの実装
データのエンコードと永続化(おそらくあなたのエンジンはすでに何らかの方法で行っている)は別として、あなたがする必要があることのほとんどは、Beamプリミティブを実装することです。このセクションでは、各プリミティブについて詳しく説明し、明らかではないかもしれない知っておくべきことと、提供されているサポートコードについて説明します。
プリミティブは、ランナーの作成者ではなく、パイプラインの作成者のために設計されています。それぞれは、特定の実装の決定ではなく、異なる概念的な操作モード(外部IO、要素単位、グループ化、ウィンドウ化、ユニオン)を表します。同じプリミティブでも、ユーザーがどのようにインスタンス化するかによって、まったく異なる実装が必要になる場合があります。たとえば、状態またはタイマーを使用するParDo
ではキーのパーティション分割が必要になる場合があり、投機的トリガーを使用するGroupByKey
では、よりコストがかかり複雑な実装が必要になる場合があります。
これらの機能の一部を実装していない場合はどうなりますか?
それで問題ありません!すべてを一度に行う必要はなく、ランナーがサポートする意味のない機能さえあるかもしれません。Beamサイトでは機能マトリックスを管理しているので、ユーザーに何をサポートしているかを伝えることができます。Pipeline
を受け取ったら、それをトラバースして、見つかった各DoFn
を実行できるかどうかを判断する必要があります。パイプライン内の一部のDoFn
を実行できない場合(またはランナーに不足している他の要件がある場合)、パイプラインを拒否する必要があります。ネイティブ環境では、これはUnsupportedOperationException
をスローするのと同じように見える場合があります。Runner API RPCは、言語間の移植性のために、これを明示的にします。
Impulseプリミティブの実装
Impulse
は、入力を取らず、パイプラインのライフタイム中に正確に1つの出力を生成するPTransformです。これは、最小タイムスタンプを持つグローバルウィンドウ内の空のバイトである必要があります。標準のウィンドウ値コーダーでエンコードすると、エンコード値は7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00
になります。
Impulse
は一般的にユーザーによって呼び出されませんが、唯一のルートプリミティブ操作であり、他のルート操作(Read
やCreate
など)は、Impulse
とそれに続く一連の(場合によっては分割可能な)ParDo
で構成される複合操作です。
ParDoプリミティブの実装
ParDo
プリミティブは、PCollection
の要素ごとの変換を記述します。ParDo
は、要素ごとの処理が記述されている場所であるため、最も複雑なプリミティブです。関数型プログラミングの標準的なmap
やflatMap
などの非常に単純な操作に加えて、ParDo
は複数の出力、サイド入力、初期化、フラッシュ、ティアダウン、およびステートフル処理もサポートしています。
各要素に適用されるUDFは、DoFn
と呼ばれます。DoFn
の正確なAPIは言語/ SDKによって異なる場合がありますが、一般的には同じパターンに従うため、擬似コードで説明できます。また、現在および将来のランナーのほとんどはJavaベースであるため、Javaサポートコードをよく参照します。
一般に、入力データセット全体に一連のParDo
を一度に1つずつ適用するのではなく、一連のマッピング操作(一般的にはDAG)で構成される単一の実行可能ステージで複数のParDo
を融合する方が効率的です。ParDo
に加えて、ウィンドウ操作、ローカル(GBK前またはGBK後)の結合操作、および他のマッピング操作もこれらのステージに融合される場合があります。
DoFnはランナー自体とは異なる言語でコードを実行したり、異なる環境を必要とする場合があるため、Beamはこれらをプロセス間で呼び出す機能を提供します。これは、Beam Fn APIの核心であり、詳細は以下で説明します。ただし、環境に互換性がある場合、ランナーがプロセス内でこのユーザーコードを呼び出す(簡潔さまたは効率のために)ことは完全に許容されます。
バンドル
正確性を期すために、DoFn
は要素ごとの関数を表す*べき*ですが、ほとんどのSDKでは、これはバンドルと呼ばれる小さなグループで要素を処理する長寿命のオブジェクトです。
ランナーは、バンドルに含める要素の数とどの要素を含めるかを決定し、処理の途中で現在のバンドルが「終了」したことを動的に決定することさえできます。バンドルの処理方法は、DoFnの残りのライフサイクルと結びついています。
一般に、初期化と終了のコストが多くの要素に償却されるため、可能な限り最大のバンドルを作成することでスループットが向上します。ただし、データがストリームとして到着する場合、適切なレイテンシを実現するためにバンドルを終了する必要があるため、バンドルはわずか数個の要素になる場合があります。
バンドルは、Beamにおけるコミットメントの単位です。バンドルの処理中にエラーが発生した場合、そのバンドルの以前のすべての出力(状態またはタイマーへの変更を含む)はランナーによって破棄され、バンドル全体が再試行される必要があります。バンドルが正常に完了すると、その出力は、状態/タイマーの変更とウォーターマークの更新とともに、アトミックにコミットされる必要があります。
DoFnライフサイクル
多くのSDKのDoFns
には、標準の要素ごとのprocess
呼び出しに加えて、setup
、start_bundle
、finish_bundle
、teardown
などのメソッドがいくつかあります。このライフサイクルの適切な呼び出しは、標準バンドルプロセッサから1つ以上のDoFn
を呼び出すときに(FnAPIを介して、またはBundleProcessor(java(python))を使用して直接)自動的に処理される必要があります。SDKに依存しないランナーは、これらの詳細を直接気にする必要はありません。
サイド入力
主な設計ドキュメント:https://s.apache.org/beam-side-inputs-1-pager
サイド入力は、PCollection
のウィンドウのグローバルビューです。これは、一度に1つの要素が処理されるメイン入力とは異なります。SDK/ユーザーはPCollection
を適切に準備し、ランナーはそれを具体化し、ランナーはそれをDoFn
にフィードします。
ランナーによってParDo
にプッシュされるメイン入力データ(一般的にはFnApiデータチャネルを介して)とは異なり、サイド入力データはParDo
によってランナーからプルされます(一般的にはFnAPI状態チャネルを介して)。
サイド入力には、特定のaccess_pattern
を介してアクセスします。現在、StandardSideInputTypes
プロトコルで列挙されているアクセス パターンは2つあります。beam:side_input:iterable:v1
は、ランナーが特定のウィンドウに対応するPCollectionのすべての値を返す必要があることを示し、beam:side_input:multimap:v1
は、ランナーが特定のキーとウィンドウに対応するすべての値を返す必要があることを示します。これらのアクセス パターンを効率的に提供できるかどうかは、ランナーがこのPCollectionをどのように具体化するか influenced how a runner materializes this PCollection.
SideInputは、ParDo
変換のParDoPayload
にあるside_inputs
マップを調べることで検出できます。 ParDo
操作自体は、window_mapping_fn
(ランナーを呼び出す前)とview_fn
(ランナーから返された値に対して)を呼び出す役割を担っているため、ランナーはこれらのフィールドを気にする必要はありません。
サイド入力が必要なのに、特定のウィンドウに対応するデータがサイド入力にない場合、そのウィンドウの要素は、サイド入力にデータが入るか、ウォーターマークが十分に進み、そのウィンドウにデータがないことが確実になるまで遅延させる必要があります。 PushBackSideInputDoFnRunner
は、これを実装した例です。
状態とタイマー
主要な設計ドキュメント: https://s.apache.org/beam-state
ParDo
に状態とタイマーが含まれている場合、ランナーでの実行は通常大きく異なります。具体的には、バンドルが完了したときに状態を永続化し、将来のバンドルのために取得する必要があります。設定されたタイマーは、ウォーターマークが十分に進んだときに、将来のバンドルに挿入する必要もあります。
状態とタイマーは、キーとウィンドウごとに分割されます。つまり、特定のキーを処理するDoFn
は、このキーを共有するすべての要素にわたって、状態とタイマーの一貫したビューを持つ必要があります。これをサポートするために、明示的にデータをシャッフルする必要がある場合や、そうした方が良い場合があります。ウォーターマークがウィンドウの終わり(および許可されている遅延時間がある場合はそれを加えた時間)を過ぎると、このウィンドウに関連付けられた状態は破棄できます。
状態の設定と取得はFnAPI Stateチャネルで実行され、タイマーの設定と起動はFnAPI Dataチャネルで実行されます。
分割可能なDoFn
主要な設計ドキュメント: https://s.apache.org/splittable-do-fn
分割可能なDoFn
は、並列実行できる高ファンアウトマッピングに役立つParDo
の一般化です。このような操作の典型的な例は、ファイルからの読み取りです。単一のファイル名(入力要素として)を、そのファイルに含まれるすべての要素にマッピングできます。 DoFn
は、たとえば単一のファイルを表現する要素を、異なるワーカーによって処理(たとえば、読み取り)されるように分割(たとえば、そのファイルの範囲に分割)できるという意味で、分割可能と見なされます。このプリミティブの真価は、これらの分割が静的に(つまり事前に)行われるだけでなく、動的に行われることで、過剰分割または分割不足の問題を回避できることにあります。
分割可能なDoFn
の完全な説明はこのドキュメントの範囲外ですが、その実行に関連する簡単な概要を以下に示します。
分割可能なDoFn
は、要素内と要素間の両方で分割することにより、動的分割プロトコルに参加できます。動的分割は、ランナーが制御チャネルでProcessBundleSplitRequest
メッセージを発行することによってトリガーされます。 SDKは、指定された要素の一部のみを処理することをコミットし、残りの部分(つまり、未処理の部分)の説明をProcessBundleSplitResponse
でランナーに返して、ランナーによってスケジュールされるようにします(たとえば、別のワーカーで、または別のバンドルの一部として)。
分割可能なDoFn
は、独自の分割を開始することもできます。これは、要素を可能な限り処理したが(たとえば、ファイルの末尾を読み取っている場合)、まだ処理すべきものが残っていることを示します。これらは、非有界ソースを読み取るときに最も頻繁に発生します。この場合、延期された作業を表す要素のセットは、ProcessBundleResponse
のresidual_roots
フィールドに返されます。ランナーは、将来のある時点で、residual_roots
で指定された要素を使用して、これらの同じ操作を再起動する必要があります。
GroupByKey(およびウィンドウ)プリミティブの実装
GroupByKey
操作(略してGBKと呼ばれることもあります)は、キーとウィンドウごとにキーと値のペアのPCollection
をグループ化し、PCollection
のトリガー設定に従って結果を出力します。
これは、同じキーを持つ要素を単に同じ場所に配置するよりもはるかに複雑であり、PCollection
のウィンドウ戦略の多くのフィールドを使用します。
エンコードされたバイトによるグループ化
キーとウィンドウの両方について、ランナーはそれらを「単なるバイト」として認識します。そのため、関係する型の特別な知識がある場合でも、それらのバイトによるグループ化と一致する方法でグループ化する必要があります。
処理する要素はキーと値のペアであり、キーを抽出する必要があります。このため、キーと値のペアの形式は、すべてのSDKで標準化および共有されています。バイナリ形式のドキュメントについては、JavaのKvCoder
またはPythonのTupleCoder
を参照してください。
ウィンドウのマージ
ランナーは、キー別にグループ化するだけでなく、ウィンドウ別に要素をグループ化する必要があります。 WindowFn
には、キーごとにウィンドウをマージすることを宣言するオプションがあります。たとえば、同じキーのセッションウィンドウは、重複する場合にマージされます。そのため、ランナーはグループ化中にWindowFn
のマージメソッドを呼び出す必要があります。
GroupByKeyOnly + GroupAlsoByWindowによる実装
JavaおよびPythonのコードベースには、完全なGroupByKey
操作を実装するための特に一般的な方法のサポートコードが含まれています。最初にキーをグループ化し、次にウィンドウ別にグループ化します。マージウィンドウの場合、マージはキーごとに行われるため、これは基本的に必須です。
多くの場合、タイムスタンプ順に値のセットを提示することで、これらの値を最終的なウィンドウにグループ化することをより効率的に行うことができます。
遅延データの破棄
主要な設計ドキュメント: https://s.apache.org/beam-lateness
入力PCollectionのウォーターマークが、ウィンドウの終わりを超えて、入力PCollection
の許容遅延時間以上になった場合、PCollection
のウィンドウは期限切れになります。
期限切れのウィンドウのデータはいつでも破棄でき、GroupByKey
で破棄する必要があります。 GroupAlsoByWindow
を使用している場合は、この変換を実行する直前に破棄します。 GroupByKeyOnly
の前にデータを破棄すると、シャッフルされるデータが少なくなりますが、期限切れに見えるウィンドウがマージされて期限切れでなくなる可能性があるため、マージしないウィンドウの場合にのみ安全に行う必要があります。
トリガー
主要な設計ドキュメント: https://s.apache.org/beam-triggers
入力PCollection
のトリガーと累積モードは、GroupByKey
操作から出力をいつどのように出力するかを指定します。
Javaでは、GroupAlsoByWindow
の実装、ReduceFnRunner
(レガシー名)、およびTriggerStateMachine
に、トリガーを実行するための多くのサポートコードがあります。 TriggerStateMachine
は、すべてのトリガーを要素とタイマーに対するイベント駆動型マシンとして実装する明白な方法です。 Pythonでは、これはTriggerDriverクラスによってサポートされています。
TimestampCombiner
集約された出力が複数の入力から生成される場合、GroupByKey
操作は、組み合わせのタイムスタンプを選択する必要があります。そのため、最初にWindowFnはタイムスタンプをシフトする機会があります。これは、ウォーターマークがスライドウィンドウなどのウィンドウの進行を妨げないようにするために必要です(詳細は、このドキュメントの範囲外です)。次に、シフトされたタイムスタンプを組み合わせる必要があります。これはTimestampCombiner
によって指定されます。 TimestampCombiner
は、入力の最小値または最大値を選択するか、入力を無視してウィンドウの終わりを選択することができます。
Windowプリミティブの実装
ウィンドウプリミティブは、WindowFn
UDFを適用して、各入力要素を出力PCollectionの1つ以上のウィンドウに配置します。プリミティブは、一般にPCollection
のウィンドウ戦略の他の側面も構成しますが、ランナーが受信する完全に構築されたグラフには、各PCollection
の完全なウィンドウ戦略が既に含まれています。
このプリミティブを実装するには、提供されたWindowFnを各要素で呼び出す必要があります。これにより、その要素が出力PCollection
の一部となるウィンドウのセットが返されます。
ほとんどのランナーは、これらのウィンドウ変更マッピングをDoFns
と融合することによってこれを実装します。
実装に関する考慮事項
「ウィンドウ」は、「最大タイムスタンプ」を持つ2番目のグループ化キーにすぎません。任意のユーザー定義型にすることができます。 WindowFn
は、ウィンドウ型のコーダーを提供します。
Beamのサポートコードは、複数のウィンドウの要素の圧縮表現であるWindowedValue
を提供します。これを使用するか、独自の圧縮表現を使用することができます。それは単に複数の要素を同時に表していることを忘れないでください。要素が「複数のウィンドウにある」ということはありません。
グローバルウィンドウの値については、ウィンドウをまったく含まない、さらに圧縮された表現を使用することをお勧めします。
シリアル化されたデータのサイズを削減するために使用できるPARAM_WINDOWED_VALUE
などの最適化を備えたコーダーを提供しています。
将来的には、このプリミティブは、ParDoの機能が新しいウィンドウへの出力を許可するように拡張された場合、ParDoとして実装できるため、廃止される可能性があります。
Flattenプリミティブの実装
これは簡単です。有限のPCollection
セットを入力として受け取り、ウィンドウをそのままにして、それらのバッグユニオンを出力します。
この操作を意味のあるものにするために、ウィンドウ戦略に互換性があることを確認するのはSDKの責任です。
また、すべてのPCollection
のコーダーが同じである必要はないことに注意してください。ランナーがそれを要求する場合(面倒な再エンコードを避けるため)、自分でそれを強制する必要があります。または、高速パスを最適化として実装することもできます。
特記事項:Combineコンポジット
ランナーによってほぼ常に特別に扱われる複合変換は、CombinePerKey
です。これは、PCollection
の要素に結合的で可換な演算子を適用します。この複合はプリミティブではありません。 ParDo
とGroupByKey
で実装されているため、ランナーはそれを処理せずに動作しますが、最適化のために使用したい追加情報、つまりCombineFn
と呼ばれる結合的-可換演算子が含まれています。
一般に、ランナーは、いわゆるコンバイナーリフティングによってこれを実装することを望みます。コンバイナーリフティングでは、GroupByKey
の前に新しい操作が配置され、部分的な(バンドル内)結合が行われます。これには、多くの場合、GroupByKey
の後にあるものもわずかに変更する必要があります。この変換の例は、この最適化のPythonまたはGo実装にあります。結果として得られるGroupByKey
の前後の操作は、一般にParDo
と融合され、上記のように実行されます。
パイプラインの操作
ユーザーからパイプラインを受信すると、それを変換する必要があります。 Beamパイプラインの表現方法の説明は、こちらにあります。これは、公式のプロト宣言を補完するものです。
ランナーのテスト
Beam Java SDKとPython SDKには、ランナー検証テストスイートがあります。設定はこのドキュメントよりも早く進化する可能性があるため、他のBeamランナーの設定を確認してください。しかし、テストがあり、非常に簡単に使用できることに注意してください! Gradleを使用してJavaベースのランナーでこれらのテストを有効にするには、JUnitカテゴリValidatesRunner
を使用してSDKの依存関係でテストをスキャンします。
task validatesRunner(type: Test) {
group = "Verification"
description = "Validates the runner"
def pipelineOptions = JsonOutput.toJson(["--runner=MyRunner", ... misc test options ...])
systemProperty "beamTestPipelineOptions", pipelineOptions
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
}
}
他の言語でこれらのテストを有効にすることは未調査です。
SDKとのランナーの適切な統合
ランナーがSDK(Javaなど)と同じ言語で記述されているかどうかに関わらず、他のSDK(Pythonなど)のユーザーが使用できるようにするには、そのSDKから呼び出すためのshimを提供する必要があります。
Java SDKとの統合
ユーザーがランナーにオプションを渡せるようにする
設定のメカニズムは、通常のJavaオブジェクトとは全く異なる動作をするインターフェースであるPipelineOptions
です。これまでの知識を忘れ、ルールに従えば、PipelineOptions
は正しく動作します。
以下のように、一致する名前のゲッターとセッターを持つランナーのサブインターフェースを実装する必要があります。
public interface MyRunnerOptions extends PipelineOptions {
@Description("The Foo to use with MyRunner")
@Required
public Foo getMyRequiredFoo();
public void setMyRequiredFoo(Foo newValue);
@Description("Enable Baz; on by default")
@Default.Boolean(true)
public Boolean isBazEnabled();
public void setBazEnabled(Boolean newValue);
}
デフォルトなどを設定できます。詳細はjavadocを参照してください。ランナーがPipelineOptions
オブジェクトでインスタンス化されると、options.as(MyRunnerOptions.class)
によってインターフェースにアクセスできます。
これらのオプションをコマンドラインで使用できるようにするには、PipelineOptionsRegistrar
にオプションを登録します。@AutoService
を使用すると簡単です。
コマンドラインで使用するためにSDKにランナーを登録する
ランナーをコマンドラインで使用できるようにするには、PipelineRunnerRegistrar
にオプションを登録します。@AutoService
を使用すると簡単です。
Python SDKとの統合
Python SDKでは、コードの登録は自動ではありません。そのため、新しいランナーを作成する際に留意すべき点がいくつかあります。
新しいランナーのパッケージへの依存関係はオプションにする必要があるため、setup.py
のextra_requires
に新しいランナーに必要な新しいターゲットを作成します。
すべてのランナーコードは、apache_beam/runners
ディレクトリにある独自のパッケージに配置する必要があります。
runner.py
のcreate_runner
関数に新しいランナーを登録して、部分名と使用する正しいクラスが一致するようにします。
Pythonランナーは、Beamリポジトリに存在するかどうかに関わらず、完全修飾名で識別することもできます(ランナーパラメータを渡す場合など)。
SDKに依存しないランナーの作成
ランナーをSDKに依存せず、他の言語で記述されたパイプラインを実行できるようにするには、Fn APIとRunner APIの2つの側面があります。
Fn API
設計ドキュメント
ユーザーのパイプラインを実行するには、ユーザーのUDFを呼び出すことができる必要があります。Fn APIは、gRPCを介したプロトコルバッファを使用して実装された、Beamの標準UDFのRPCインターフェースです。
Fn APIには以下が含まれます。
- UDFのサブグラフを登録するためのAPI
- バンドルの要素をストリーミングするためのAPI
- 共有データ形式(キーと値のペア、タイムスタンプ、反復可能オブジェクトなど)
ユーティリティコードのために、言語のSDKを*併用*することも、同じ言語のUDF用にバンドル処理の最適化された実装を提供することもできます。
Runner API
Runner APIは、パイプラインを起動し、ジョブのステータスを確認するためのRPCインターフェースとともに、パイプラインのSDKに依存しないスキーマです。Runner APIインターフェースのみを介してパイプラインを検査することにより、ランナーのパイプライン分析とジョブ変換における言語のSDKへの依存関係を排除します。
このようなSDKに依存しないパイプラインを実行するには、Fn APIをサポートする必要があります。UDFは、関数の仕様(多くの場合、特定の言語の不透明なシリアル化されたバイト)と、それを実行できる環境の仕様(基本的に特定のSDK)としてパイプラインに埋め込まれています。これまでのところ、この仕様は、SDKのFn APIハーネスをホストするDockerコンテナのURIであると予想されています。
言語のSDKを*併用*することもできます。SDKは便利なユーティリティコードを提供する場合があります。
パイプラインの言語に依存しない定義は、プロトコルバッファスキーマを介して記述されます。これは、以下で参照用に説明します。ただし、ランナーはprotobufメッセージを直接操作する*必要はありません*。代わりに、Beamコードベースはパイプラインを操作するためのユーティリティを提供するため、パイプラインがシリアル化または送信されたことがあるかどうか、またはどの言語で記述されたのかを認識する必要はありません。
Java
ランナーがJavaベースの場合、SDKに依存しない方法でパイプラインと対話するためのツールは、beam-runners-core-construction-java
アーティファクトのorg.apache.beam.sdk.util.construction
名前空間にあります。ユーティリティの名前は、以下のように一貫して付けられています。
PTransformTranslation
- 既知の変換と標準URNのレジストリParDoTranslation
- 言語に依存しない方法でParDo
を操作するためのユーティリティWindowIntoTranslation
-Window
の場合も同じFlattenTranslation
-Flatten
の場合も同じWindowingStrategyTranslation
- ウィンドウイング戦略の場合も同じCoderTranslation
- コーダーの場合も同じ- … などなど …
これらのクラスのみを介して変換を検査することにより、ランナーはJava SDKの詳細に依存しません。
Runner APIプロトコル
Runner APIは、プロトコルバッファスキーマとして、Beamモデルの概念の具体的な表現を指します。これらのメッセージを直接操作するべきではありませんが、パイプラインを構成する標準データを知ることは役立ちます。
APIのほとんどは、高レベルの説明とまったく同じです。すべての低レベルの詳細を理解していなくても、ランナーの実装を開始できます。
Runner APIの最も重要なポイントは、Beamパイプラインの言語に依存しない定義であるということです。特定のSDKのサポートコードを介して、これらの定義を適切な慣用APIでラップして対話することがほとんどですが、これは仕様であり、他のデータは必ずしもパイプラインに固有のものではなく、SDK固有の拡張機能(またはバグ!)である可能性があることに常に注意してください。
パイプライン内のUDFは、任意のBeam SDK用に記述できます。同じパイプライン内で複数のSDK用に記述することもできます。そのため、ここから始め、高レベルの、ほとんど明白なレコード定義に戻る前に、UDFのプロトコルバッファ定義を理解するためのボトムアップアプローチを採用します。
FunctionSpec
プロトコル
言語間の移植性の核心は、FunctionSpec
です。これは、副作用などを含む通常のプログラミングの意味での、言語に依存しない関数の仕様です。
FunctionSpec
には、関数を識別するURNと、任意の固定パラメータが含まれています。たとえば、(仮説の)「max」CombineFnには、URN beam:combinefn:max:0.1
と、最大値を取得するための比較方法を示すパラメータがある場合があります。
特定の言語のSDKを使用して構築されたパイプライン内のほとんどのUDFの場合、URNはSDKがそれを解釈する必要があることを示します。たとえば、beam:dofn:javasdk:0.1
またはbeam:dofn:pythonsdk:0.1
です。パラメータには、Javaでシリアル化されたDoFn
やPythonでpickle化されたDoFn
などのシリアル化されたコードが含まれます。
FunctionSpec
はUDFだけのものではありません。任意の関数を命名/指定するための一般的な方法です。また、PTransform
の仕様としても使用されます。ただし、PTransform
で使用される場合、PCollection
からPCollection
への関数を記述し、SDKに固有のものであることはできません。これは、ランナーが変換を評価し、PCollection
を生成する役割を担っているためです。
すべての環境がすべての関数仕様をデシリアライズできるわけではないことは言うまでもありません。このため、PTransform
には、含まれているURNを解釈できる少なくとも1つの環境を示すenvironment_id
パラメータがあります。これは、Pipelineプロトの環境マップ内の環境への参照であり、通常はdockerイメージ(場合によっては追加の依存関係を含む)によって定義されます。これを行うことができる他の環境もあるかもしれず、ランナーはこの知識があればそれらを使用できます。
プリミティブ変換ペイロードプロトコル
プリミティブ変換のペイロードは、仕様のプロトシリアル化にすぎません。ここで完全なコードを再現するのではなく、重要な部分を取り上げて、それらがどのように組み合わされているかを示します。
これらのペイロードと直接対話することはおそらくないでしょうが、変換に固有のデータはこれらだけであることをもう一度強調しておく価値があります。
ParDoPayload
プロトコル
ParDo
変換は、SdkFunctionSpec
にDoFn
を保持し、他の機能(サイド入力、状態宣言、タイマー宣言など)の言語に依存しない仕様を提供します。
CombinePayload
プロトコル
Combine
はプリミティブではありません。ただし、非プリミティブは、最適化を改善するための追加情報を完全に保持できます。Combine
変換が保持する最も重要なものは、SdkFunctionSpec
レコード内のCombineFn
です。目的の最適化を効果的に実行するには、中間累積のコーダーを知る必要もあるため、このコーダーへの参照も保持します。
PTransform
プロトコル
PTransform
は、PCollection
からPCollection
への関数です。これは、プロトではFunctionSpecを使用して表されます。
PTransform
が複合である場合は、サブ変換を持つ場合があります。この場合、サブ変換がその動作を定義するため、FunctionSpec
は省略できます。
入力と出力のPCollection
は順序付けされておらず、ローカル名で参照されます。SDKは、シリアル化されたUDFに埋め込まれる可能性が高いため、この名前を決定します。
FunctionSpec
で定義されている、特定のPTransform
(プリミティブか複合かを問わず)の仕様を理解するランナーは、セマンティクスが同一である別のPTransform
(またはそのセット)に置き換えることができます。これは通常、CombinePerKey
の処理方法ですが、他にも多くの置換を行うことができます。
PCollection
プロトコル
PCollection
は、コーダー、ウィンドウイング戦略、および境界があるかどうかのみを格納します。
Coder
プロトコル
これは非常に興味深いプロトです。コーダーは、特定のSDKによってのみ理解される可能性のあるパラメータ化された関数であるため、FunctionSpec
ですが、それを完全に定義するコンポーネントコーダーも持つ場合があります。たとえば、ListCoder
はメタフォーマットにすぎませんが、ListCoder(VarIntCoder)
は完全に指定されたフォーマットです。
ほとんどのSDK、そうでないとしてもすべてのSDKで理解される標準コーダーが多数あります。これらを使用すると、言語に依存しない変換が可能になります。
Jobs API RPC
言語のSDKはRunner APIプロトを直接操作することから保護しますが、ランナーを別の言語に公開するために、ランナーのアダプターを実装する必要がある場合があります。これにより、Python SDKはJavaランナーを呼び出すことができ、その逆も可能です。この典型的な実装は、local_job_service.pyにあります。これは、いくつかのPython実装のランナーを直接フロントエンドするために使用されます。
RPC自体は、必然的にPipelineRunnerとPipelineResultの既存のAPIに従いますが、豊富で便利なAPIではなく、最小限のバックエンドチャネルに変更されます。
この重要な部分は、Artifacts APIです。これにより、Runnerは、さまざまな環境で依存関係としてリストされており、さまざまな表現を持つ可能性のあるバイナリ成果物(jar、pypiパッケージなど)を取得してデプロイできます。これは、パイプラインが送信された後、実行される前に呼び出されます。パイプラインを送信するSDKは、リクエストを受信するランナーの成果物サーバーとして機能し、次にランナーは、ユーザーのUDFをホストするワーカー(環境)の成果物サーバーとして機能します。
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!