ランナー作成ガイド

このガイドでは、新しいランナーの実装方法について説明します。データ処理システムがあり、それを使用してBeamパイプラインを実行したい人を対象としています。ガイドは、今後の作業を評価するのに役立つ基本から始まります。その後、セクションはますます詳細になり、ランナーの開発全体を通してリソースとなります。

対象となるトピック

Beamプリミティブの実装

データのエンコードと永続化(おそらくあなたのエンジンはすでに何らかの方法で行っている)は別として、あなたがする必要があることのほとんどは、Beamプリミティブを実装することです。このセクションでは、各プリミティブについて詳しく説明し、明らかではないかもしれない知っておくべきことと、提供されているサポートコードについて説明します。

プリミティブは、ランナーの作成者ではなく、パイプラインの作成者のために設計されています。それぞれは、特定の実装の決定ではなく、異なる概念的な操作モード(外部IO、要素単位、グループ化、ウィンドウ化、ユニオン)を表します。同じプリミティブでも、ユーザーがどのようにインスタンス化するかによって、まったく異なる実装が必要になる場合があります。たとえば、状態またはタイマーを使用するParDoではキーのパーティション分割が必要になる場合があり、投機的トリガーを使用するGroupByKeyでは、よりコストがかかり複雑な実装が必要になる場合があります。

これらの機能の一部を実装していない場合はどうなりますか?

それで問題ありません!すべてを一度に行う必要はなく、ランナーがサポートする意味のない機能さえあるかもしれません。Beamサイトでは機能マトリックスを管理しているので、ユーザーに何をサポートしているかを伝えることができます。Pipelineを受け取ったら、それをトラバースして、見つかった各DoFnを実行できるかどうかを判断する必要があります。パイプライン内の一部のDoFnを実行できない場合(またはランナーに不足している他の要件がある場合)、パイプラインを拒否する必要があります。ネイティブ環境では、これはUnsupportedOperationExceptionをスローするのと同じように見える場合があります。Runner API RPCは、言語間の移植性のために、これを明示的にします。

Impulseプリミティブの実装

Impulseは、入力を取らず、パイプラインのライフタイム中に正確に1つの出力を生成するPTransformです。これは、最小タイムスタンプを持つグローバルウィンドウ内の空のバイトである必要があります。標準のウィンドウ値コーダーでエンコードすると、エンコード値は7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00になります。

Impulseは一般的にユーザーによって呼び出されませんが、唯一のルートプリミティブ操作であり、他のルート操作(ReadCreateなど)は、Impulseとそれに続く一連の(場合によっては分割可能な)ParDoで構成される複合操作です。

ParDoプリミティブの実装

ParDoプリミティブは、PCollectionの要素ごとの変換を記述します。ParDoは、要素ごとの処理が記述されている場所であるため、最も複雑なプリミティブです。関数型プログラミングの標準的なmapflatMapなどの非常に単純な操作に加えて、ParDoは複数の出力、サイド入力、初期化、フラッシュ、ティアダウン、およびステートフル処理もサポートしています。

各要素に適用されるUDFは、DoFnと呼ばれます。DoFnの正確なAPIは言語/ SDKによって異なる場合がありますが、一般的には同じパターンに従うため、擬似コードで説明できます。また、現在および将来のランナーのほとんどはJavaベースであるため、Javaサポートコードをよく参照します。

一般に、入力データセット全体に一連のParDoを一度に1つずつ適用するのではなく、一連のマッピング操作(一般的にはDAG)で構成される単一の実行可能ステージで複数のParDoを融合する方が効率的です。ParDoに加えて、ウィンドウ操作、ローカル(GBK前またはGBK後)の結合操作、および他のマッピング操作もこれらのステージに融合される場合があります。

DoFnはランナー自体とは異なる言語でコードを実行したり、異なる環境を必要とする場合があるため、Beamはこれらをプロセス間で呼び出す機能を提供します。これは、Beam Fn APIの核心であり、詳細は以下で説明します。ただし、環境に互換性がある場合、ランナーがプロセス内でこのユーザーコードを呼び出す(簡潔さまたは効率のために)ことは完全に許容されます。

バンドル

正確性を期すために、DoFnは要素ごとの関数を表す*べき*ですが、ほとんどのSDKでは、これはバンドルと呼ばれる小さなグループで要素を処理する長寿命のオブジェクトです。

ランナーは、バンドルに含める要素の数とどの要素を含めるかを決定し、処理の途中で現在のバンドルが「終了」したことを動的に決定することさえできます。バンドルの処理方法は、DoFnの残りのライフサイクルと結びついています。

一般に、初期化と終了のコストが多くの要素に償却されるため、可能な限り最大のバンドルを作成することでスループットが向上します。ただし、データがストリームとして到着する場合、適切なレイテンシを実現するためにバンドルを終了する必要があるため、バンドルはわずか数個の要素になる場合があります。

バンドルは、Beamにおけるコミットメントの単位です。バンドルの処理中にエラーが発生した場合、そのバンドルの以前のすべての出力(状態またはタイマーへの変更を含む)はランナーによって破棄され、バンドル全体が再試行される必要があります。バンドルが正常に完了すると、その出力は、状態/タイマーの変更とウォーターマークの更新とともに、アトミックにコミットされる必要があります。

DoFnライフサイクル

多くのSDKのDoFnsには、標準の要素ごとのprocess呼び出しに加えて、setupstart_bundlefinish_bundleteardownなどのメソッドがいくつかあります。このライフサイクルの適切な呼び出しは、標準バンドルプロセッサから1つ以上のDoFnを呼び出すときに(FnAPIを介して、またはBundleProcessor(javapython))を使用して直接)自動的に処理される必要があります。SDKに依存しないランナーは、これらの詳細を直接気にする必要はありません。

サイド入力

主な設計ドキュメント:https://s.apache.org/beam-side-inputs-1-pager

サイド入力は、PCollectionのウィンドウのグローバルビューです。これは、一度に1つの要素が処理されるメイン入力とは異なります。SDK/ユーザーはPCollectionを適切に準備し、ランナーはそれを具体化し、ランナーはそれをDoFnにフィードします。

ランナーによってParDoにプッシュされるメイン入力データ(一般的にはFnApiデータチャネルを介して)とは異なり、サイド入力データはParDoによってランナーからプルされます(一般的にはFnAPI状態チャネルを介して)。

サイド入力には、特定のaccess_patternを介してアクセスします。現在、StandardSideInputTypesプロトコルで列挙されているアクセス パターンは2つあります。beam:side_input:iterable:v1は、ランナーが特定のウィンドウに対応するPCollectionのすべての値を返す必要があることを示し、beam:side_input:multimap:v1は、ランナーが特定のキーとウィンドウに対応するすべての値を返す必要があることを示します。これらのアクセス パターンを効率的に提供できるかどうかは、ランナーがこのPCollectionをどのように具体化するか influenced how a runner materializes this PCollection.

SideInputは、ParDo変換のParDoPayloadにあるside_inputsマップを調べることで検出できます。 ParDo操作自体は、window_mapping_fn(ランナーを呼び出す前)とview_fn(ランナーから返された値に対して)を呼び出す役割を担っているため、ランナーはこれらのフィールドを気にする必要はありません。

サイド入力が必要なのに、特定のウィンドウに対応するデータがサイド入力にない場合、そのウィンドウの要素は、サイド入力にデータが入るか、ウォーターマークが十分に進み、そのウィンドウにデータがないことが確実になるまで遅延させる必要があります。 PushBackSideInputDoFnRunner は、これを実装した例です。

状態とタイマー

主要な設計ドキュメント: https://s.apache.org/beam-state

ParDoに状態とタイマーが含まれている場合、ランナーでの実行は通常大きく異なります。具体的には、バンドルが完了したときに状態を永続化し、将来のバンドルのために取得する必要があります。設定されたタイマーは、ウォーターマークが十分に進んだときに、将来のバンドルに挿入する必要もあります。

状態とタイマーは、キーとウィンドウごとに分割されます。つまり、特定のキーを処理するDoFnは、このキーを共有するすべての要素にわたって、状態とタイマーの一貫したビューを持つ必要があります。これをサポートするために、明示的にデータをシャッフルする必要がある場合や、そうした方が良い場合があります。ウォーターマークがウィンドウの終わり(および許可されている遅延時間がある場合はそれを加えた時間)を過ぎると、このウィンドウに関連付けられた状態は破棄できます。

状態の設定と取得はFnAPI Stateチャネルで実行され、タイマーの設定と起動はFnAPI Dataチャネルで実行されます。

分割可能なDoFn

主要な設計ドキュメント: https://s.apache.org/splittable-do-fn

分割可能なDoFnは、並列実行できる高ファンアウトマッピングに役立つParDoの一般化です。このような操作の典型的な例は、ファイルからの読み取りです。単一のファイル名(入力要素として)を、そのファイルに含まれるすべての要素にマッピングできます。 DoFnは、たとえば単一のファイルを表現する要素を、異なるワーカーによって処理(たとえば、読み取り)されるように分割(たとえば、そのファイルの範囲に分割)できるという意味で、分割可能と見なされます。このプリミティブの真価は、これらの分割が静的に(つまり事前に)行われるだけでなく、動的に行われることで、過剰分割または分割不足の問題を回避できることにあります。

分割可能なDoFnの完全な説明はこのドキュメントの範囲外ですが、その実行に関連する簡単な概要を以下に示します。

分割可能なDoFnは、要素内と要素間の両方で分割することにより、動的分割プロトコルに参加できます。動的分割は、ランナーが制御チャネルでProcessBundleSplitRequestメッセージを発行することによってトリガーされます。 SDKは、指定された要素の一部のみを処理することをコミットし、残りの部分(つまり、未処理の部分)の説明をProcessBundleSplitResponseでランナーに返して、ランナーによってスケジュールされるようにします(たとえば、別のワーカーで、または別のバンドルの一部として)。

分割可能なDoFnは、独自の分割を開始することもできます。これは、要素を可能な限り処理したが(たとえば、ファイルの末尾を読み取っている場合)、まだ処理すべきものが残っていることを示します。これらは、非有界ソースを読み取るときに最も頻繁に発生します。この場合、延期された作業を表す要素のセットは、ProcessBundleResponseresidual_rootsフィールドに返されます。ランナーは、将来のある時点で、residual_rootsで指定された要素を使用して、これらの同じ操作を再起動する必要があります。

GroupByKey(およびウィンドウ)プリミティブの実装

GroupByKey操作(略してGBKと呼ばれることもあります)は、キーとウィンドウごとにキーと値のペアのPCollectionをグループ化し、PCollectionのトリガー設定に従って結果を出力します。

これは、同じキーを持つ要素を単に同じ場所に配置するよりもはるかに複雑であり、PCollectionのウィンドウ戦略の多くのフィールドを使用します。

エンコードされたバイトによるグループ化

キーとウィンドウの両方について、ランナーはそれらを「単なるバイト」として認識します。そのため、関係する型の特別な知識がある場合でも、それらのバイトによるグループ化と一致する方法でグループ化する必要があります。

処理する要素はキーと値のペアであり、キーを抽出する必要があります。このため、キーと値のペアの形式は、すべてのSDKで標準化および共有されています。バイナリ形式のドキュメントについては、JavaのKvCoderまたはPythonのTupleCoderを参照してください。

ウィンドウのマージ

ランナーは、キー別にグループ化するだけでなく、ウィンドウ別に要素をグループ化する必要があります。 WindowFnには、キーごとにウィンドウをマージすることを宣言するオプションがあります。たとえば、同じキーのセッションウィンドウは、重複する場合にマージされます。そのため、ランナーはグループ化中にWindowFnのマージメソッドを呼び出す必要があります。

GroupByKeyOnly + GroupAlsoByWindowによる実装

JavaおよびPythonのコードベースには、完全なGroupByKey操作を実装するための特に一般的な方法のサポートコードが含まれています。最初にキーをグループ化し、次にウィンドウ別にグループ化します。マージウィンドウの場合、マージはキーごとに行われるため、これは基本的に必須です。

多くの場合、タイムスタンプ順に値のセットを提示することで、これらの値を最終的なウィンドウにグループ化することをより効率的に行うことができます。

遅延データの破棄

主要な設計ドキュメント: https://s.apache.org/beam-lateness

入力PCollectionのウォーターマークが、ウィンドウの終わりを超えて、入力PCollectionの許容遅延時間以上になった場合、PCollectionのウィンドウは期限切れになります。

期限切れのウィンドウのデータはいつでも破棄でき、GroupByKeyで破棄する必要があります。 GroupAlsoByWindowを使用している場合は、この変換を実行する直前に破棄します。 GroupByKeyOnlyの前にデータを破棄すると、シャッフルされるデータが少なくなりますが、期限切れに見えるウィンドウがマージされて期限切れでなくなる可能性があるため、マージしないウィンドウの場合にのみ安全に行う必要があります。

トリガー

主要な設計ドキュメント: https://s.apache.org/beam-triggers

入力PCollectionのトリガーと累積モードは、GroupByKey操作から出力をいつどのように出力するかを指定します。

Javaでは、GroupAlsoByWindowの実装、ReduceFnRunner(レガシー名)、およびTriggerStateMachineに、トリガーを実行するための多くのサポートコードがあります。 TriggerStateMachineは、すべてのトリガーを要素とタイマーに対するイベント駆動型マシンとして実装する明白な方法です。 Pythonでは、これはTriggerDriverクラスによってサポートされています。

TimestampCombiner

集約された出力が複数の入力から生成される場合、GroupByKey操作は、組み合わせのタイムスタンプを選択する必要があります。そのため、最初にWindowFnはタイムスタンプをシフトする機会があります。これは、ウォーターマークがスライドウィンドウなどのウィンドウの進行を妨げないようにするために必要です(詳細は、このドキュメントの範囲外です)。次に、シフトされたタイムスタンプを組み合わせる必要があります。これはTimestampCombinerによって指定されます。 TimestampCombinerは、入力の最小値または最大値を選択するか、入力を無視してウィンドウの終わりを選択することができます。

Windowプリミティブの実装

ウィンドウプリミティブは、WindowFn UDFを適用して、各入力要素を出力PCollectionの1つ以上のウィンドウに配置します。プリミティブは、一般にPCollectionのウィンドウ戦略の他の側面も構成しますが、ランナーが受信する完全に構築されたグラフには、各PCollectionの完全なウィンドウ戦略が既に含まれています。

このプリミティブを実装するには、提供されたWindowFnを各要素で呼び出す必要があります。これにより、その要素が出力PCollectionの一部となるウィンドウのセットが返されます。

ほとんどのランナーは、これらのウィンドウ変更マッピングをDoFnsと融合することによってこれを実装します。

実装に関する考慮事項

「ウィンドウ」は、「最大タイムスタンプ」を持つ2番目のグループ化キーにすぎません。任意のユーザー定義型にすることができます。 WindowFnは、ウィンドウ型のコーダーを提供します。

Beamのサポートコードは、複数のウィンドウの要素の圧縮表現であるWindowedValueを提供します。これを使用するか、独自の圧縮表現を使用することができます。それは単に複数の要素を同時に表していることを忘れないでください。要素が「複数のウィンドウにある」ということはありません。

グローバルウィンドウの値については、ウィンドウをまったく含まない、さらに圧縮された表現を使用することをお勧めします。

シリアル化されたデータのサイズを削減するために使用できるPARAM_WINDOWED_VALUEなどの最適化を備えたコーダーを提供しています。

将来的には、このプリミティブは、ParDoの機能が新しいウィンドウへの出力を許可するように拡張された場合、ParDoとして実装できるため、廃止される可能性があります。

Flattenプリミティブの実装

これは簡単です。有限のPCollectionセットを入力として受け取り、ウィンドウをそのままにして、それらのバッグユニオンを出力します。

この操作を意味のあるものにするために、ウィンドウ戦略に互換性があることを確認するのはSDKの責任です。

また、すべてのPCollectionのコーダーが同じである必要はないことに注意してください。ランナーがそれを要求する場合(面倒な再エンコードを避けるため)、自分でそれを強制する必要があります。または、高速パスを最適化として実装することもできます。

特記事項:Combineコンポジット

ランナーによってほぼ常に特別に扱われる複合変換は、CombinePerKeyです。これは、PCollectionの要素に結合的で可換な演算子を適用します。この複合はプリミティブではありません。 ParDoGroupByKeyで実装されているため、ランナーはそれを処理せずに動作しますが、最適化のために使用したい追加情報、つまりCombineFnと呼ばれる結合的-可換演算子が含まれています。

一般に、ランナーは、いわゆるコンバイナーリフティングによってこれを実装することを望みます。コンバイナーリフティングでは、GroupByKeyの前に新しい操作が配置され、部分的な(バンドル内)結合が行われます。これには、多くの場合、GroupByKeyの後にあるものもわずかに変更する必要があります。この変換の例は、この最適化のPythonまたはGo実装にあります。結果として得られるGroupByKeyの前後の操作は、一般にParDoと融合され、上記のように実行されます。

パイプラインの操作

ユーザーからパイプラインを受信すると、それを変換する必要があります。 Beamパイプラインの表現方法の説明は、こちらにあります。これは、公式のプロト宣言を補完するものです。

ランナーのテスト

Beam Java SDKとPython SDKには、ランナー検証テストスイートがあります。設定はこのドキュメントよりも早く進化する可能性があるため、他のBeamランナーの設定を確認してください。しかし、テストがあり、非常に簡単に使用できることに注意してください! Gradleを使用してJavaベースのランナーでこれらのテストを有効にするには、JUnitカテゴリValidatesRunnerを使用してSDKの依存関係でテストをスキャンします。

task validatesRunner(type: Test) {
  group = "Verification"
  description = "Validates the runner"
  def pipelineOptions = JsonOutput.toJson(["--runner=MyRunner", ... misc test options ...])
  systemProperty "beamTestPipelineOptions", pipelineOptions
  classpath = configurations.validatesRunner
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
  useJUnit {
    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
  }
}

他の言語でこれらのテストを有効にすることは未調査です。

SDKとのランナーの適切な統合

ランナーがSDK(Javaなど)と同じ言語で記述されているかどうかに関わらず、他のSDK(Pythonなど)のユーザーが使用できるようにするには、そのSDKから呼び出すためのshimを提供する必要があります。

Java SDKとの統合

ユーザーがランナーにオプションを渡せるようにする

設定のメカニズムは、通常のJavaオブジェクトとは全く異なる動作をするインターフェースであるPipelineOptionsです。これまでの知識を忘れ、ルールに従えば、PipelineOptionsは正しく動作します。

以下のように、一致する名前のゲッターとセッターを持つランナーのサブインターフェースを実装する必要があります。

public interface MyRunnerOptions extends PipelineOptions {
  @Description("The Foo to use with MyRunner")
  @Required
  public Foo getMyRequiredFoo();
  public void setMyRequiredFoo(Foo newValue);

  @Description("Enable Baz; on by default")
  @Default.Boolean(true)
  public Boolean isBazEnabled();
  public void setBazEnabled(Boolean newValue);
}

デフォルトなどを設定できます。詳細はjavadocを参照してください。ランナーがPipelineOptionsオブジェクトでインスタンス化されると、options.as(MyRunnerOptions.class)によってインターフェースにアクセスできます。

これらのオプションをコマンドラインで使用できるようにするには、PipelineOptionsRegistrarにオプションを登録します。@AutoServiceを使用すると簡単です。

@AutoService(PipelineOptionsRegistrar.class)
public static class MyOptionsRegistrar implements PipelineOptionsRegistrar {
  @Override
  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
    return ImmutableList.<Class<? extends PipelineOptions>>of(MyRunnerOptions.class);
  }
}

コマンドラインで使用するためにSDKにランナーを登録する

ランナーをコマンドラインで使用できるようにするには、PipelineRunnerRegistrarにオプションを登録します。@AutoServiceを使用すると簡単です。

@AutoService(PipelineRunnerRegistrar.class)
public static class MyRunnerRegistrar implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner>> getPipelineRunners() {
    return ImmutableList.<Class<? extends PipelineRunner>>of(MyRunner.class);
  }
}

Python SDKとの統合

Python SDKでは、コードの登録は自動ではありません。そのため、新しいランナーを作成する際に留意すべき点がいくつかあります。

新しいランナーのパッケージへの依存関係はオプションにする必要があるため、setup.pyextra_requiresに新しいランナーに必要な新しいターゲットを作成します。

すべてのランナーコードは、apache_beam/runnersディレクトリにある独自のパッケージに配置する必要があります。

runner.pycreate_runner関数に新しいランナーを登録して、部分名と使用する正しいクラスが一致するようにします。

Pythonランナーは、Beamリポジトリに存在するかどうかに関わらず、完全修飾名で識別することもできます(ランナーパラメータを渡す場合など)。

SDKに依存しないランナーの作成

ランナーをSDKに依存せず、他の言語で記述されたパイプラインを実行できるようにするには、Fn APIとRunner APIの2つの側面があります。

Fn API

設計ドキュメント

ユーザーのパイプラインを実行するには、ユーザーのUDFを呼び出すことができる必要があります。Fn APIは、gRPCを介したプロトコルバッファを使用して実装された、Beamの標準UDFのRPCインターフェースです。

Fn APIには以下が含まれます。

ユーティリティコードのために、言語のSDKを*併用*することも、同じ言語のUDF用にバンドル処理の最適化された実装を提供することもできます。

Runner API

Runner APIは、パイプラインを起動し、ジョブのステータスを確認するためのRPCインターフェースとともに、パイプラインのSDKに依存しないスキーマです。Runner APIインターフェースのみを介してパイプラインを検査することにより、ランナーのパイプライン分析とジョブ変換における言語のSDKへの依存関係を排除します。

このようなSDKに依存しないパイプラインを実行するには、Fn APIをサポートする必要があります。UDFは、関数の仕様(多くの場合、特定の言語の不透明なシリアル化されたバイト)と、それを実行できる環境の仕様(基本的に特定のSDK)としてパイプラインに埋め込まれています。これまでのところ、この仕様は、SDKのFn APIハーネスをホストするDockerコンテナのURIであると予想されています。

言語のSDKを*併用*することもできます。SDKは便利なユーティリティコードを提供する場合があります。

パイプラインの言語に依存しない定義は、プロトコルバッファスキーマを介して記述されます。これは、以下で参照用に説明します。ただし、ランナーはprotobufメッセージを直接操作する*必要はありません*。代わりに、Beamコードベースはパイプラインを操作するためのユーティリティを提供するため、パイプラインがシリアル化または送信されたことがあるかどうか、またはどの言語で記述されたのかを認識する必要はありません。

Java

ランナーがJavaベースの場合、SDKに依存しない方法でパイプラインと対話するためのツールは、beam-runners-core-construction-javaアーティファクトのorg.apache.beam.sdk.util.construction名前空間にあります。ユーティリティの名前は、以下のように一貫して付けられています。

これらのクラスのみを介して変換を検査することにより、ランナーはJava SDKの詳細に依存しません。

Runner APIプロトコル

Runner APIは、プロトコルバッファスキーマとして、Beamモデルの概念の具体的な表現を指します。これらのメッセージを直接操作するべきではありませんが、パイプラインを構成する標準データを知ることは役立ちます。

APIのほとんどは、高レベルの説明とまったく同じです。すべての低レベルの詳細を理解していなくても、ランナーの実装を開始できます。

Runner APIの最も重要なポイントは、Beamパイプラインの言語に依存しない定義であるということです。特定のSDKのサポートコードを介して、これらの定義を適切な慣用APIでラップして対話することがほとんどですが、これは仕様であり、他のデータは必ずしもパイプラインに固有のものではなく、SDK固有の拡張機能(またはバグ!)である可能性があることに常に注意してください。

パイプライン内のUDFは、任意のBeam SDK用に記述できます。同じパイプライン内で複数のSDK用に記述することもできます。そのため、ここから始め、高レベルの、ほとんど明白なレコード定義に戻る前に、UDFのプロトコルバッファ定義を理解するためのボトムアップアプローチを採用します。

FunctionSpecプロトコル

言語間の移植性の核心は、FunctionSpecです。これは、副作用などを含む通常のプログラミングの意味での、言語に依存しない関数の仕様です。

message FunctionSpec {
  string urn;
  bytes payload;
}

FunctionSpecには、関数を識別するURNと、任意の固定パラメータが含まれています。たとえば、(仮説の)「max」CombineFnには、URN beam:combinefn:max:0.1と、最大値を取得するための比較方法を示すパラメータがある場合があります。

特定の言語のSDKを使用して構築されたパイプライン内のほとんどのUDFの場合、URNはSDKがそれを解釈する必要があることを示します。たとえば、beam:dofn:javasdk:0.1またはbeam:dofn:pythonsdk:0.1です。パラメータには、Javaでシリアル化されたDoFnやPythonでpickle化されたDoFnなどのシリアル化されたコードが含まれます。

FunctionSpecはUDFだけのものではありません。任意の関数を命名/指定するための一般的な方法です。また、PTransformの仕様としても使用されます。ただし、PTransformで使用される場合、PCollectionからPCollectionへの関数を記述し、SDKに固有のものであることはできません。これは、ランナーが変換を評価し、PCollectionを生成する役割を担っているためです。

すべての環境がすべての関数仕様をデシリアライズできるわけではないことは言うまでもありません。このため、PTransformには、含まれているURNを解釈できる少なくとも1つの環境を示すenvironment_idパラメータがあります。これは、Pipelineプロトの環境マップ内の環境への参照であり、通常はdockerイメージ(場合によっては追加の依存関係を含む)によって定義されます。これを行うことができる他の環境もあるかもしれず、ランナーはこの知識があればそれらを使用できます。

プリミティブ変換ペイロードプロトコル

プリミティブ変換のペイロードは、仕様のプロトシリアル化にすぎません。ここで完全なコードを再現するのではなく、重要な部分を取り上げて、それらがどのように組み合わされているかを示します。

これらのペイロードと直接対話することはおそらくないでしょうが、変換に固有のデータはこれらだけであることをもう一度強調しておく価値があります。

ParDoPayloadプロトコル

ParDo変換は、SdkFunctionSpecDoFnを保持し、他の機能(サイド入力、状態宣言、タイマー宣言など)の言語に依存しない仕様を提供します。

message ParDoPayload {
  FunctionSpec do_fn;
  map<string, SideInput> side_inputs;
  map<string, StateSpec> state_specs;
  map<string, TimerSpec> timer_specs;
  ...
}

CombinePayloadプロトコル

Combineはプリミティブではありません。ただし、非プリミティブは、最適化を改善するための追加情報を完全に保持できます。Combine変換が保持する最も重要なものは、SdkFunctionSpecレコード内のCombineFnです。目的の最適化を効果的に実行するには、中間累積のコーダーを知る必要もあるため、このコーダーへの参照も保持します。

message CombinePayload {
  FunctionSpec combine_fn;
  string accumulator_coder_id;
  ...
}

PTransformプロトコル

PTransformは、PCollectionからPCollectionへの関数です。これは、プロトではFunctionSpecを使用して表されます。

message PTransform {
  FunctionSpec spec;
  repeated string subtransforms;

  // Maps from local string names to PCollection ids
  map<string, bytes> inputs;
  map<string, bytes> outputs;
  ...
}

PTransformが複合である場合は、サブ変換を持つ場合があります。この場合、サブ変換がその動作を定義するため、FunctionSpecは省略できます。

入力と出力のPCollectionは順序付けされておらず、ローカル名で参照されます。SDKは、シリアル化されたUDFに埋め込まれる可能性が高いため、この名前を決定します。

FunctionSpecで定義されている、特定のPTransform(プリミティブか複合かを問わず)の仕様を理解するランナーは、セマンティクスが同一である別のPTransform(またはそのセット)に置き換えることができます。これは通常、CombinePerKeyの処理方法ですが、他にも多くの置換を行うことができます。

PCollectionプロトコル

PCollectionは、コーダー、ウィンドウイング戦略、および境界があるかどうかのみを格納します。

message PCollection {
  string coder_id;
  IsBounded is_bounded;
  string windowing_strategy_id;
  ...
}

Coderプロトコル

これは非常に興味深いプロトです。コーダーは、特定のSDKによってのみ理解される可能性のあるパラメータ化された関数であるため、FunctionSpecですが、それを完全に定義するコンポーネントコーダーも持つ場合があります。たとえば、ListCoderはメタフォーマットにすぎませんが、ListCoder(VarIntCoder)は完全に指定されたフォーマットです。

message Coder {
  FunctionSpec spec;
  repeated string component_coder_ids;
}

ほとんどのSDK、そうでないとしてもすべてのSDKで理解される標準コーダーが多数あります。これらを使用すると、言語に依存しない変換が可能になります。

Jobs API RPC

概要 仕様

言語のSDKはRunner APIプロトを直接操作することから保護しますが、ランナーを別の言語に公開するために、ランナーのアダプターを実装する必要がある場合があります。これにより、Python SDKはJavaランナーを呼び出すことができ、その逆も可能です。この典型的な実装は、local_job_service.pyにあります。これは、いくつかのPython実装のランナーを直接フロントエンドするために使用されます。

RPC自体は、必然的にPipelineRunnerとPipelineResultの既存のAPIに従いますが、豊富で便利なAPIではなく、最小限のバックエンドチャネルに変更されます。

この重要な部分は、Artifacts APIです。これにより、Runnerは、さまざまな環境で依存関係としてリストされており、さまざまな表現を持つ可能性のあるバイナリ成果物(jar、pypiパッケージなど)を取得してデプロイできます。これは、パイプラインが送信された後、実行される前に呼び出されます。パイプラインを送信するSDKは、リクエストを受信するランナーの成果物サーバーとして機能し、次にランナーは、ユーザーのUDFをホストするワーカー(環境)の成果物サーバーとして機能します。