PTransform スタイルガイド
新しい再利用可能なPTransformを作成する際のスタイルガイド。
言語に依存しない考慮事項
一貫性
既存の慣習と整合性を保つ
- コントリビューションガイドをお読みください。
- 既に同様の変換が何らかのSDKに存在する場合は、ユーザーの経験が他のものに移行するように、変換のAPIを似せてください。これは、同じ言語のSDKと異なる言語のSDKの変換に適用されます。 *例外*: このガイドが承認される前に開発されたという唯一の理由で、現在のスタイルガイドに明らかに違反している既存の変換。この場合、スタイルガイドは既存の変換との一貫性よりも優先されます。
- 同様の変換が存在しない場合は、選択した言語(例:JavaまたはPython)の慣習に従ってください。
- 新しい変換を提案する場合は、可能な限りこのデザインドキュメントテンプレートを使用してください。
PTransformの公開 vs その他
Beamパイプラインで人々が使用するライブラリ(サードパーティシステムへのコネクタ、機械学習アルゴリズムなど)を開発したいと考えています。どのように公開する必要がありますか?
実施事項
- ライブラリによって実行される主要なデータ並列タスクをすべて複合 `PTransform` として公開します。これにより、変換の構造をそれを使用するコードに対して透過的に進化させることができます。たとえば、 `ParDo` として開始したものが、時間とともにより複雑な変換になる可能性があります。
- 変換のコードの大きく、重要で、再利用可能なシーケンシャルな部分を公開します。これは、予期しない方法で再利用したいと考えている可能性のある他のユーザー向けです。通常の関数またはクラスライブラリとして公開します。変換は単にこのロジックを接続する必要があります。副次的利点として、これらの関数とクラスを個別に単体テストできます。 *例*: カスタムデータ形式のファイルを解析する変換を開発する場合、形式パーサーをライブラリとして公開します。同様に、複雑な機械学習アルゴリズムを実装する変換など。
- 場合によっては、 `CombineFn` などのBeam固有のクラス、または重要な `DoFn`(単一の `@ProcessElement` メソッド以上のもの)が含まれる場合があります。経験則として:完全なパッケージ化された `PTransform` がユーザーのニーズには不十分であり、ユーザーがより低レベルのプリミティブを再利用したいと考えている可能性があると予想される場合は、これらを公開します。
実施しないこと
- 変換が内部的にどのように構造化されているかを正確に公開しないでください。例:ライブラリの公開APIサーフェスは、通常(上記の最後の箇条書きを除く)、 `DoFn` 、具体的な `Source` または `Sink` クラスなどを公開しないようにする必要があります。ユーザーに `PTransform` の適用と `DoFn` / `Source` / `Sink` の使用という混乱を招く選択肢を与えないためです。
命名
実施事項
- 言語固有の命名規則を尊重します。たとえば、JavaとPythonではクラス名を `PascalCase` で、Javaでは関数を `camelCase` で、Pythonでは `snake_case` で命名するなど。
- ファクトリ関数の名前を、関数名が動詞であるか、変換を参照すると動詞のように読めるようにします。例: `MongoDbIO.read()` 、 `Flatten.iterables()` 。
- 型付き言語では、 `PTransform` クラスにも動詞のような名前を付けます(例: `MongoDbIO.Read` 、 `Flatten.Iterables`)。
- ストレージシステムとの対話のための変換群の名前を「IO」という単語を使用して付けます。 `MongoDbIO` 、 `JdbcIO` 。
実施しないこと
- `PTransform` クラス名に「transform」、「source」、「sink」、「reader」、「writer」、「bound」、「unbound」という単語を使用しないでください(注: `bounded` と `unbounded` は、 `PCollection` がバウンドされているかアンバウンドされているかを指す場合は問題ありません)。これらの単語は冗長で、混乱を招き、時代遅れであるか、SDKで異なる既存の概念を指しています。
設定
設定と入力コレクションの区別
- **入力 `PCollection` へ:**インスタンス数が非常に多くなる可能性のあるもの(1000を超える可能性がある場合は、 `PCollection` に入れる必要があります)、またはパイプラインの構築時に不明な可能性のあるもの。例:処理またはサードパーティシステムに書き込むレコード。読み取るファイル名。例外:Beam APIでは、パイプラインの構築時に物事がわかっている必要がある場合があります。たとえば、 `Bounded` / `UnboundedSource` API。このようなAPIを絶対に使用しなければならない場合は、その入力は当然、変換設定にのみ入る可能性があります。
- **変換設定へ:**変換全体( `ValueProvider` を含む)で一定であり、変換の入力 `PCollection` の内容に依存しないもの。例:データベースクエリまたは接続文字列。資格情報。ユーザー指定のコールバック。調整パラメータ。パラメータを変換設定に入れることの利点の1つは、パイプラインの構築時に検証できることです。
公開するパラメータ
実施事項
- 出力を計算するために必要なパラメータを **公開** します。
実施しないこと
- バッチサイズ、接続プールサイズなどの調整ノブは、自動的に十分な値を供給または計算できない場合(つまり、このノブがないことを理由にバグを報告する妥当な人を想像できる場合を除く)を除いて、 **公開しない** でください。
- 多くのパラメータを持つライブラリへのコネクタを開発する際には、基になるライブラリの各パラメータを **ミラーリングしないでください** 。必要に応じて、基になるライブラリの構成クラスを再利用し、ユーザーに全体のインスタンスを供給させます。例: `JdbcIO` 。 *例外1*: 基になるライブラリのパラメータの一部がBeamセマンティクスと非自明に相互作用する場合、それらを公開します。たとえば、発行者に対して「配信保証」パラメータを持つパブリッシュ/サブスクライブシステムへのコネクタを開発する場合、パラメータを公開しますが、Beamモデルと互換性のない値(多くても一度と正確に一度)を禁止します。 *例外2*: 基になるライブラリの構成クラスが使いにくい場合(安定したAPIを宣言していない、問題のある推移的な依存関係を公開している、またはセマンティックバージョニングに従っていないなど)。この場合、それをラップして、変換のユーザーにクリーンでより安定したAPIを公開する方が良いでしょう。
エラー処理
変換設定エラー
早期にエラーを検出します。エラーは次の段階で検出できます。
- (コンパイル言語で)ユーザーのパイプラインのソースコードのコンパイル
- 変換の構築または設定
- パイプラインへの変換の適用
- パイプラインの実行
例:
- 型付き言語では、変換のAPIを厳密に型付けすることで、コンパイル時のエラーチェックを利用します。
- **厳密に型付けされた設定:**例:Javaでは、URLであるパラメータは、 `String` クラスではなく `URL` クラスを使用する必要があります。
- **厳密に型付けされた入力と出力:**例:MongoDBに書き込む変換は、 `PCollection<String>` ではなく `PCollection<Document>` を取る必要があります( `Document` の `Coder` を提供できることを前提としています)。
- セッターメソッドで個々のパラメータの無効な値を検出します。
- 変換のvalidateメソッドでパラメータの無効な組み合わせを検出します。
ランタイムエラーとデータ整合性
何よりもデータ整合性を優先します。データの損失または破損を隠蔽しないでください。データの損失を防ぐことができない場合は、失敗します。
実施事項
- `DoFn` では、操作が再試行で成功する可能性が高い場合は、一時的な失敗を再試行します。再試行される作業の量を最小限に抑えるために、可能な限り最も狭い範囲でこのような再試行を実行します(つまり、理想的にはRPCライブラリ自体レベル、または失敗したRPCをサードパーティシステムに直接送信するレベル)。それ以外の場合は、ランナーが適切な粒度レベルで作業を再試行させます(ランナーによって再試行動作は異なりますが、ほとんどのランナーは *ある程度の* 再試行を行います)。
- 変換に副作用がある場合は、それらをべき等(つまり、複数回適用しても安全)にするように努めます。再試行により、副作用が複数回、並行して実行される可能性があります。
- 変換に処理できない(永続的に失敗する)レコードがあり、それにもかかわらずパイプラインを続行したい場合
- 不良レコードを無視しても安全な場合は、不良レコードをメトリックでカウントします。変換のドキュメントにこの集計器について記載されていることを確認してください。実行中にパイプライン内から集計器の値を読み取るプログラムによるアクセスはないことに注意してください。
- 不良レコードを手動でユーザーが検査する必要がある場合は、それらのレコードのみを含む出力に出力します。
- あるいは、要素の失敗がバンドルの失敗になるしきい値(デフォルトはゼロ)を採用します(変換を構造化して要素の総数と失敗した要素の数をカウントし、それらを比較して、失敗がしきい値を超えている場合に失敗します)。
- ユーザーが提供できるよりも高いデータ整合性保証を要求する場合は、失敗します。例:ユーザーがMQTTコネクタからQoS 2(正確に一度の配信)を要求する場合、Beamランナーはコネクタへの書き込みを再試行する可能性があるため、正確に一度の配信は行えず、コネクタは失敗する必要があります。
実施しないこと
- 失敗を処理できない場合は、キャッチしないでください。*例外*: 元のエラーにない貴重なコンテキストを提供できる場合は、エラーをキャッチしてメッセージをログに記録し、再度スローすることが価値がある場合があります。
- 絶対にこれをしてはいけません: `catch(...) { log an error; return null or false or otherwise ignore; }` **経験則:バンドルが失敗しなければ、その出力は正しく完全でなければなりません。** ユーザーにとって、エラーをログ出力したが成功した変換は、サイレントデータロスです。
パフォーマンス
多くのランナーは、`ParDo`の連鎖を、各入力要素につき少量から中程度の要素を出力する、または要素ごとの処理が比較的安価な`ParDo`の場合(例:Dataflowの「融合」)、パフォーマンスを向上させる方法で最適化しますが、これらの仮定が満たされない場合は並列化を制限します。その場合、「融合ブレーク」(`Reshuffle.of()`)が必要になる可能性があり、`ParDo`の出力`PCollection`の処理の並列化が向上します。
- 変換に、入力要素あたり大量の要素を出力する可能性のある`ParDo`が含まれている場合は、この`ParDo`の後に融合ブレークを適用して、下流の変換がその出力を並列に処理できるようにします。
- 変換に、要素の処理に非常に長い時間がかかる`ParDo`が含まれている場合は、この`ParDo`の前に融合ブレークを挿入して、入力`PCollection`がどのように生成されたかに関係なく、すべての要素またはほとんどの要素を並列に処理できるようにします。
ドキュメント
変換の構成方法(コード例を示す)、および入力に関する期待する保証または出力に関する提供する保証を、Beamモデルを考慮して文書化します。例:
- この変換の入力コレクションと出力コレクションは、バウンドされているか、アンバウンドされているか、どちらにも対応できますか?
- 変換がサードパーティシステムにデータ書き込む場合、少なくとも一度はデータが書き込まれることが保証されますか?最大で一度ですか?正確に一度ですか?(ランナーが再試行または推測実行(別名バックアップ)のためにバンドルを複数回実行する場合、正確に一度を実現するにはどうすればよいですか?)
- 変換がサードパーティシステムからデータを読み取る場合、読み取りの最大潜在的な並列度はいくつですか?たとえば、変換がデータを逐次的に読み取る場合(たとえば、単一のSQLクエリを実行する場合)、ドキュメントにそれを記載する必要があります。
- 処理中に外部システムを照会する変換(例:外部キーバリューストアの情報を使用して`PCollection`を結合する)の場合、照会されたデータの新しさに関する保証は何ですか?たとえば、変換の開始時にすべてロードされるか、要素ごとに照会されますか(その場合、変換の実行中に単一要素のデータが変更された場合はどうなりますか?)
- 入力`PCollection`へのアイテムの到着と出力`PCollection`への出力の送信の間に重要な関係がある場合、その関係は何ですか?(例:変換が内部的にウィンドウ化、トリガー、グルーピングを実行するか、状態またはタイマーAPIを使用する場合)
ロギング
変換のユーザーが遭遇する可能性のある異常な状況を予測します。デバッグに十分な情報をログに記録しますが、ログのボリュームを制限します。これはすべてのプログラムに適用されるアドバイスですが、データボリュームが膨大で実行が分散されている場合は特に重要です。
実施事項
- サードパーティシステムからのエラーを処理する場合は、サードパーティシステムが提供するエラーの詳細を含む完全なエラーをログに記録し、変換が認識する追加のコンテキストを含めます。これにより、ユーザーはメッセージに提供されている情報に基づいてアクションを実行できます。例外を処理して独自の例外をスローする場合は、元の例外をラップします(一部の言語では、より高度な機能を提供しています。たとえば、Javaの「抑制された例外」)。利用可能なエラーに関する情報を黙って破棄しないでください。
- まれな(要素ごとではない)低速な操作(例:大規模なファイルパターンの展開、インポート/エクスポートジョブの開始)を実行する場合は、操作の開始時と終了時にログに記録します。操作に識別子がある場合は、識別子をログに記録して、ユーザーが後でデバッグのために操作を検索できるようにします。
- 後続の処理の正確性またはパフォーマンスにとって非常に重要な、低ボリュームのものを計算する場合は、入力と出力をログに記録して、デバッグ中のユーザーがそれらを健全性チェックしたり、異常な結果を手動で再現したりできるようにします。例:ファイルパターンをファイルに展開する場合は、ファイルパターンとその分割された部分数をログに記録します。クエリを実行する場合は、クエリとその生成した結果数をログに記録します。
実施しないこと
- 要素ごとまたはバンドルごとに`INFO`レベルでログに記録しないでください。`DEBUG`/`TRACE`は、これらのレベルはデフォルトで無効になっているため、問題ない場合があります。
- 機密情報が含まれる可能性のあるデータペイロードをログに記録したり、ログに記録する前にサニタイズしないでください(例:ユーザーデータ、資格情報など)。
テスト
データ処理は複雑で、多くの特殊なケースがあり、デバッグが困難です。これは、パイプラインの実行に時間がかかるため、出力が正しいかどうかを確認するのが難しく、デバッガーを接続できず、多くの場合、データの大量のため、希望するほど多くログに記録できないためです。そのため、テストは特に重要です。
変換のランタイム動作のテスト
- `TestPipeline`と`PAssert`を使用して、変換の全体的なセマンティクスを単体テストします。ダイレクトランナーに対するテストから始めます。`PCollection`の内容に関するアサーションは厳密である必要があります。例:データベースからの読み取りで1〜10の数字が読み取られることが予想される場合、出力`PCollection`に10個の要素があること、または各要素が範囲[1, 10]内にあることだけをアサートするのではなく、1〜10の各数字が正確に一度出現することをアサートします。
- `TestPipeline`を使用して確実にシミュレートするのが難しい特殊なケースになりやすい、変換における重要な逐次ロジックを特定し、このロジックを単体テスト可能な関数に抽出し、それらを単体テストします。一般的な特殊なケースは次のとおりです。
- 空のバンドルを処理する`DoFn`
- 非常に大きなバンドルを処理する`DoFn`(内容がメモリに収まらない、「ホットキー」を含む、非常に多くの値を持つもの)
- サードパーティAPIの失敗
- サードパーティAPIが非常に不正確な情報を提供する
- 失敗した場合の`Closeable`/`AutoCloseable`リソースのリーク
- ソースを開発する場合の一般的な特殊なケース:`BoundedSource.split`における複雑な算術(例:キーまたはオフセット範囲の分割)、空のデータソースまたはいくつかの空のコンポーネントを持つ複合データソースの反復。
- サードパーティシステムとのやり取りをモックアウトするか、または利用可能な場合は「フェイク」実装を使用します。モックアウトされたやり取りが、これらのシステムの実際の動作のすべての興味深いケースを表していることを確認します。
- `DoFn`、`CombineFn`、`BoundedSource`を単体テストするには、それぞれ`DoFnTester`、`CombineFnTester`、`SourceTestUtils`を使用することを検討してください。これにより、潜在的なバグを洗い出すためにコードを非自明な方法で実行できます。
- アンバウンドコレクションで動作する変換の場合、`TestStream`を使用して、遅延データまたは順序外れのデータの存在下での動作をテストします。
- テストは、敵対的な環境、CPUまたはネットワークの制約のある環境(継続的インテグレーションサーバーを含む)でも常に100%合格する必要があります。タイミング依存のコード(例:スリープ)をテストに入れないでください。経験上、妥当な量の睡眠では十分ではありません。コードは数秒以上中断される可能性があります。
- テストコードの構成に関する詳細な手順については、Beam Testing Guideを参照してください。
変換の構築と検証のテスト
変換の構築と検証のためのコードは通常自明であり、ほとんどが定型的なものです。ただし、その中の小さな間違いやタイプミスは、重大な結果(例:ユーザーが設定したプロパティを無視する)をもたらす可能性があるため、テストする必要があります。しかし、過剰な量の自明なテストは、保守が難しく、変換が十分にテストされているという誤った印象を与える可能性があります。
実施事項
- 重要な検証コードをテストします。欠落している/不正確な/情報量の少ない検証は、重大な問題(データ損失、直感に反する動作、プロパティの値が黙って無視される、またはその他のデバッグが困難なエラー)につながる可能性があります。重要な検証エラーのクラスごとに1つのテストを作成します。テストする必要がある検証の例をいくつか示します。
- プロパティ`withFoo()`と`withBar()`を同時に指定できない場合、両方のプロパティを指定した変換が拒否されることをテストします。ランタイムでプロパティのいずれかが黙って無視されるのではなく。
- 特定の構成で変換が誤って動作するか、直感に反して動作することがわかっている場合、その構成が拒否されることをテストします。ランタイムで誤った結果が生成されるのではなく。たとえば、変換はバウンドコレクションに対してのみ、またはグローバルにウィンドウ化されたコレクションに対してのみ適切に機能する場合があります。または、ストリーミングシステムがいくつかのレベルの「サービス品質」をサポートしているとします。その1つは「正確に一度配信」です。ただし、このシステムに書き込む変換は、障害が発生した場合の再試行のために、正確に一度配信を提供できない可能性があります。その場合、変換が正確に一度QoSを指定することを許可しないことをテストします。ランタイムで期待されるエンドツーエンドのセマンティクスを提供できないのではなく。
- `TestPipeline`と`PAssert`を使用して、期待されるテスト結果が`withFoo()`の値に依存するテストを作成し、各`withFoo()`メソッド(オーバーロードを含む)が効果があること(無視されないこと)をテストします。
実施しないこと
- 成功した検証をテストしないでください(例:「変換が正しく構成されている場合、検証は失敗しません」)。
- 自明な検証エラーをテストしないでください(例:「プロパティが設定されていない/null/空/負の/…の場合、検証は失敗します」)。
互換性
実施事項
- 一般的に、セマンティックバージョニングのルールに従ってください。
- 変換のAPIがまだ安定していない場合は、`@Experimental`(Java)または`@experimental`(Python)としてアノテーションを付けます。
- APIが非推奨の場合は、`@Deprecated`(Java)または`@deprecated`(Python)としてアノテーションを付けます。
- 変換のAPIによって公開されるサードパーティクラスの安定性とバージョン管理に注意してください。それらが不安定であるか、不適切なバージョン管理されている場合(セマンティックバージョニングに従わない場合)、それらを独自のクラスにラップする方が良いでしょう。
実施しないこと
- コードがコンパイルされ続けるが、以前のドキュメントに記載されている動作とは異なる動作をする(例:異なる出力を生成するか、異なる入力を期待する、もちろん以前の出力が間違っている場合を除く)変換の動作を黙って変更しないでください。そのような互換性のない動作の変更がコンパイルエラーを引き起こすように努めてください(例:新しい動作に対して新しい変換を導入し、古いものを非推奨にしてから削除する(新しいメジャーバージョンで)方が、既存の変換の動作を黙って変更するよりも優れています)、または少なくともランタイムエラー。
- 変換の動作が同じままで、実装またはAPIを単に変更している場合は、ユーザーのコードのコンパイルが停止するような方法で変換のAPIを変更しないでください。
Java固有の考慮事項
以下のほとんどのプラクティスの良い例は、`JdbcIO`と`MongoDbIO`です。
API
入力と出力PCollectionの型の選択
可能な限り、変換の性質に固有の型を使用してください。必要に応じて、独自の型から変換`DoFn`を使用してラップできます。例:DatastoreコネクタはDatastore`Entity`型を使用する必要があります。MongoDbコネクタは、JSONの文字列表現ではなく、Mongo`Document`型を使用する必要があります。
それが不可能な場合があります(例:JDBCはBeamと互換性のある(Coderでエンコード可能な)「JDBCレコード」データ型を提供しません)。その場合、変換固有の型とBeamと互換性のある型間の変換を行う関数をユーザーに提供させます(例:`JdbcIO`と`MongoDbGridFSIO`を参照)。
変換が、まだJavaクラスが存在しない複合型を論理的に返す必要がある場合、適切な名前のフィールドを持つ新しいPOJOクラスを作成します。汎用タプルクラスまたはKV
は、フィールドが正当にキーと値である場合を除いて使用しないでください。
複数の出力コレクションを持つ変換
変換が複数のコレクションを返す必要がある場合、それはPTransform<..., PCollectionTuple>
でなければならず、各コレクションに対してgetBlahTag()
メソッドを公開する必要があります。
例えば、PCollection<Foo>
とPCollection<Bar>
を返す場合は、TupleTag<Foo> getFooTag()
とTupleTag<Bar> getBarTag()
を公開します。
例:
public class MyTransform extends PTransform<..., PCollectionTuple> {
private final TupleTag<Moo> mooTag = new TupleTag<Moo>() {};
private final TupleTag<Blah> blahTag = new TupleTag<Blah>() {};
...
PCollectionTuple expand(... input) {
...
PCollection<Moo> moo = ...;
PCollection<Blah> blah = ...;
return PCollectionTuple.of(mooTag, moo)
.and(blahTag, blah);
}
public TupleTag<Moo> getMooTag() {
return mooTag;
}
public TupleTag<Blah> getBlahTag() {
return blahTag;
}
...
}
設定のためのFluentビルダー
変換クラスを不変にし、変更された不変オブジェクトを生成するメソッドを使用します。AutoValueを使用します。AutoValueは、ビルダーヘルパークラスを提供できます。プリミティブ型(例:int)を除き、デフォルト値がないか、デフォルト値がnullであるクラス型の引数をマークするには、@Nullable
を使用します。
@AutoValue
public abstract static class MyTransform extends PTransform<...> {
int getMoo();
@Nullable abstract String getBlah();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setMoo(int moo);
abstract Builder setBlah(String blah);
abstract MyTransform build();
}
...
}
ファクトリメソッド
引数のない単一の静的ファクトリメソッドを、包含クラス内(「変換ファミリの梱包」を参照)または変換クラス自体に用意します。
public class Thumbs {
public static Twiddle twiddle() {
return new AutoValue_Thumbs_Twiddle.Builder().build();
}
public abstract static class Twiddle extends PTransform<...> { ... }
}
// or:
public abstract static class TwiddleThumbs extends PTransform<...> {
public static TwiddleThumbs create() {
return new AutoValue_Thumbs_Twiddle.Builder().build();
}
...
}
例外:変換に非常に重要な単一の引数がある場合、ファクトリメソッドof
を呼び出し、引数をファクトリメソッドの引数に入れます。例:ParDo.of(DoFn).withAllowedLateness()
。
パラメータを設定するためのFluentビルダーメソッド
それらをwithBlah()
と呼びます。すべてのビルダーメソッドは、まったく同じ型を返す必要があります。それがパラメータ化された(ジェネリックな)型である場合、型パラメータの値は同じです。
withBlah()
メソッドをキーワード引数の順序付けられていない集合として扱います。結果は、withFoo()
とwithBar()
を呼び出す順序に依存してはなりません(例:withBar()
はfooの現在の値を読み取ってはなりません)。
各withBlah
メソッドの影響を文書化します。このメソッドをいつ使用するべきか、どのような値が許可されているか、デフォルト値は何であるか、値を変更することの影響は何であるか。
/**
* Returns a new {@link TwiddleThumbs} transform with moo set
* to the given value.
*
* <p>Valid values are 0 (inclusive) to 100 (exclusive). The default is 42.
*
* <p>Higher values generally improve throughput, but increase chance
* of spontaneous combustion.
*/
public Twiddle withMoo(int moo) {
checkArgument(moo >= 0 && moo < 100,
"Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
+ "Valid values are 0 (inclusive) to 100 (exclusive)",
moo);
return toBuilder().setMoo(moo).build();
}
パラメータのデフォルト値
ファクトリメソッドで指定します(ファクトリメソッドはデフォルト値を持つオブジェクトを返します)。
複数のパラメータを再利用可能なオブジェクトにパッケージ化
変換のいくつかのパラメータが非常に密接に論理的に結合されている場合、それらをコンテナオブジェクトにカプセル化することが理にかなう場合があります。このコンテナオブジェクトにも同じガイドラインを使用します(不変にする、ビルダーを含むAutoValueを使用する、withBlah()
メソッドを文書化するなど)。例として、JdbcIO.DataSourceConfigurationを参照してください。
型パラメータを持つ変換
すべての型パラメータは、ファクトリメソッドで明示的に指定する必要があります。ビルダーメソッド(withBlah()
)は型を変更してはなりません。
public class Thumbs {
public static Twiddle<T> twiddle() {
return new AutoValue_Thumbs_Twiddle.Builder<T>().build();
}
@AutoValue
public abstract static class Twiddle<T>
extends PTransform<PCollection<Foo>, PCollection<Bar<T>>> {
…
@Nullable abstract Bar<T> getBar();
abstract Builder<T> toBuilder();
@AutoValue.Builder
abstract static class Builder<T> {
…
abstract Builder<T> setBar(Bar<T> bar);
abstract Twiddle<T> build();
}
…
}
}
// User code:
Thumbs.Twiddle<String> twiddle = Thumbs.<String>twiddle();
// Or:
PCollection<Bar<String>> bars = foos.apply(Thumbs.<String>twiddle() … );
例外:変換に最も重要なパラメータが1つあり、そのパラメータが型Tに依存する場合、それをファクトリメソッドに直接入れる方が優先されます。例:Combine.globally(SerializableFunction<Iterable<V>,V>
)。これにより、Javaの型推論が向上し、ユーザーは型パラメータを明示的に指定する必要がなくなります。
変換に2つ以上の型パラメータがある場合、またはパラメータの意味があまり明確でない場合、型パラメータをSomethingT
のように命名します。例:分類アルゴリズムを実装し、各入力要素にラベルを割り当てるPTransformは、Classify<InputT, LabelT>
として型指定される場合があります。
ユーザー指定の動作の注入
変換に、ユーザーコードによってカスタマイズされる動作の側面がある場合、次のように決定します。
実施事項
- 可能であれば、PTransformの合成を拡張機能として使用します。つまり、ユーザーがパイプラインで変換を適用し、別の
PTransform
と合成することで同じ効果が得られる場合、変換自体は拡張可能である必要はありません。例:サードパーティシステムにJSONオブジェクトを書き込む変換は、JsonObject
のCoder
を提供できる場合(想定)、ジェネリックなPCollection<T>
とProcessFunction<T, JsonObject>
を受け取るのではなく、PCollection<JsonObject>
を受け取る必要があります(修正する必要がある反例:TextIO
)。 - 変換内でユーザーコードによる拡張が必要な場合、ユーザーコードを
ProcessFunction
として渡すか、独自のシリアライズ可能な関数のような型(理想的には単一メソッド、Java 8ラムダとの相互運用性のため)を定義します。Javaはラムダの型を消去するため、ユーザーによってraw型のProcessFunction
が提供された場合でも、十分な型情報を持っていることを確認する必要があります。ラムダと型情報を持つ具体的なサブクラスの両方を適切にサポートする方法については、MapElements
とFlatMapElements
の例を参照してください。
実施しないこと
- 拡張のために継承を使用しないでください。ユーザーは
PTransform
クラスをサブクラス化してはなりません。
変換群のパッケージ化
密接に関連する変換のファミリー(例:異なる方法で同じシステムと対話する、または同じ上位タスクの異なる実装を提供する)を開発する場合は、最上位クラスを名前空間として使用し、個々のユースケースに対応する変換を返す複数のファクトリメソッドを使用します。
コンテナクラスはprivateコンストラクタを持たなければならないため、直接インスタンス化することはできません。
FooIO
レベルで共通のものを文書化し、各ファクトリメソッドを個別に文書化します。
/** Transforms for clustering data. */
public class Cluster {
// Force use of static factory methods.
private Cluster() {}
/** Returns a new {@link UsingKMeans} transform. */
public static UsingKMeans usingKMeans() { ... }
public static Hierarchically hierarchically() { ... }
/** Clusters data using the K-Means algorithm. */
public static class UsingKMeans extends PTransform<...> { ... }
public static class Hierarchically extends PTransform<...> { ... }
}
public class FooIO {
// Force use of static factory methods.
private FooIO() {}
public static Read read() { ... }
...
public static class Read extends PTransform<...> { ... }
public static class Write extends PTransform<...> { ... }
public static class Delete extends PTransform<...> { ... }
public static class Mutate extends PTransform<...> { ... }
}
互換性のないAPIを持つ複数のバージョンをサポートする場合は、バージョンも名前空間のようなクラスとして使用し、異なるAPIバージョンの実装を異なるファイルに配置します。
// FooIO.java
public class FooIO {
// Force use of static factory methods.
private FooIO() {}
public static FooV1 v1() { return new FooV1(); }
public static FooV2 v2() { return new FooV2(); }
}
// FooV1.java
public class FooV1 {
// Force use of static factory methods outside the package.
FooV1() {}
public static Read read() { ... }
public static class Read extends PTransform<...> { ... }
}
// FooV2.java
public static class FooV2 {
// Force use of static factory methods outside the package.
FooV2() {}
public static Read read() { ... }
public static class Read extends PTransform<...> { ... }
}
動作
不変性
- 変換クラスは不変でなければなりません。すべての変数はprivate finalで、それ自体が不変でなければなりません(例:リストの場合、
ImmutableList
でなければなりません)。 - すべての
PCollection
の要素は不変でなければなりません。
シリアライゼーション
DoFn
、PTransform
、CombineFn
などのインスタンスはシリアライズされます。シリアライズされたデータの量を最小限に抑えます。シリアライズしたくないフィールドをtransient
としてマークします。可能な限りクラスをstatic
にします(インスタンスが包含クラスインスタンスをキャプチャしてシリアライズしないようにするため)。注:場合によっては、匿名クラスを使用できないことを意味します。
検証
checkArgument()
を使用して、.withBlah()
メソッドで個々のパラメータを検証します。エラーメッセージには、パラメータの名前、実際の値、有効な値の範囲を記載する必要があります。PTransform
の.expand()
メソッドで、パラメータの組み合わせと不足している必須パラメータを検証します。PTransform
の.validate(PipelineOptions)
メソッドで、PipelineOptions
からPTransform
が取得するパラメータを検証します。これらの検証は、パイプラインが完全に構築/展開され、特定のPipelineOptions
で実行されようとしているときに実行されます。ほとんどのPTransform
はPipelineOptions
を使用しないため、validate()
メソッドは必要ありません。代わりに、上記の他の2つのメソッドを使用して検証を実行する必要があります。
@AutoValue
public abstract class TwiddleThumbs
extends PTransform<PCollection<Foo>, PCollection<Bar>> {
abstract int getMoo();
abstract String getBoo();
...
// Validating individual parameters
public TwiddleThumbs withMoo(int moo) {
checkArgument(
moo >= 0 && moo < 100,
"Moo must be between 0 (inclusive) and 100 (exclusive), but was: %s",
moo);
return toBuilder().setMoo(moo).build();
}
public TwiddleThumbs withBoo(String boo) {
checkArgument(boo != null, "Boo can not be null");
checkArgument(!boo.isEmpty(), "Boo can not be empty");
return toBuilder().setBoo(boo).build();
}
@Override
public void validate(PipelineOptions options) {
int woo = options.as(TwiddleThumbsOptions.class).getWoo();
checkArgument(
woo > getMoo(),
"Woo (%s) must be smaller than moo (%s)",
woo, getMoo());
}
@Override
public PCollection<Bar> expand(PCollection<Foo> input) {
// Validating that a required parameter is present
checkArgument(getBoo() != null, "Must specify boo");
// Validating a combination of parameters
checkArgument(
getMoo() == 0 || getBoo() == null,
"Must specify at most one of moo or boo, but was: moo = %s, boo = %s",
getMoo(), getBoo());
...
}
}
コーダー
Coder
は、Beamランナーが中間データを具体化するか、必要に応じてワーカー間で送信する方法です。Coder
の特定のバイナリエンコーディングは、そのプライベートな実装の詳細となることを意図しているため、Coder
を汎用APIとしてバイナリ形式の解析や書き込みに使用しないでください。
型のデフォルトコーダーを提供する
すべての新しいデータ型に対してデフォルトのCoder
を提供します。@DefaultCoder
アノテーションまたは@AutoService
でアノテーションされたCoderProviderRegistrar
クラスを使用します。例については、SDKでのこれらのクラスの使用例を参照してください。パフォーマンスが重要でない場合は、SerializableCoder
またはAvroCoder
を使用できます。そうでない場合は、効率的なカスタムコーダーを開発します(具体的な型にはAtomicCoder
を、ジェネリック型にはStructuredCoder
をサブクラス化します)。
出力コレクションにコーダーを設定する
PTransform
によって作成されたすべてのPCollection
(出力コレクションと中間コレクションの両方)には、Coder
が設定されている必要があります。ユーザーが.setCoder()
を呼び出して、PTransform
によって生成されたPCollection
のコーダーを「修正」する必要はありません(実際、Beamは最終的にsetCoder
を非推奨にする予定です)。場合によっては、コーダーの推論でこれを実現できます。他の場合では、変換はコレクションに対してsetCoder
を明示的に呼び出す必要があります。
コレクションが具体的な型である場合、その型には通常、対応するコーダーがあります。SerializableCoder
などの汎用コーダーではなく、特定の最も効率的なコーダー(例:文字列にはStringUtf8Coder.of()
、バイト配列にはByteArrayCoder.of()
など)を使用します。
コレクションの型にジェネリック型変数が含まれる場合、状況はより複雑になります。
- それが変換の入力型と一致するか、それを単純にラップしている場合、
input.getCoder()
から入手できる入力PCollection
のコーダーを再利用できます。 input.getPipeline().getCoderRegistry().getCoder(TypeDescriptor)
を使用して、コーダーを推論しようとします。ジェネリック型のTypeDescriptor
を取得するには、TypeDescriptors
のユーティリティを使用します。このアプローチの例として、AvroIO.parseGenericRecords()
の実装を参照してください。ただし、ジェネリック型のコーダー推論は最善の努力であり、場合によってはJavaの型消去のために失敗する可能性があります。- ユーザーが関連する型変数に
Coder
をPTransform
の構成パラメータとして明示的に指定できるようにする必要があります(例:AvroIO.<T>parseGenericRecords().withCoder(Coder<T>)
)。コーダーが明示的に指定されていない場合は、推論にフォールバックします。
最終更新日:2024年10月31日
探していたものすべてが見つかりましたか?
すべて役に立ち、分かりやすかったですか?変更したいことはありますか?お知らせください!