I/O 標準
概要
このApache Beam I/O標準ドキュメントでは、Apache Beam I/Oコネクタを開発するファーストパーティ/サードパーティ開発者向けの規範的なガイダンスを示します。これらのガイドラインは、ドキュメント、開発、テストをシンプルかつ簡潔な方法で網羅するベストプラクティスを作成することを目的としています。
ビルトインI/Oコネクタとは何ですか?
Apache Beam GithubリポジトリにあるI/Oコネクタ(I/O)は、**ビルトインI/Oコネクタ**と呼ばれます。ビルトインI/Oは、Google Cloud DataflowチームがDataflowランナーを使用して定期的に統合テストとパフォーマンステストを実行し、参照用にメトリクスを公開しています。それ以外の場合、明示的に記載がない限り、以下のガイドラインは両方に適用されます。
ガイダンス
ドキュメント
このセクションでは、I/Oで利用可能になることが期待されるすべてのドキュメントのスーパーセットについて説明します。このセクション全体で参照されるApache Beamドキュメントは、こちらにあります。また、一般的に良い例は、ビルトインI/OであるSnowflake I/Oです。
ビルトインI/O
I/Oの関連言語のコードドキュメントを提供します。これには、Apache Beamサイト内または外部ドキュメントの場所にある情報源へのリンクも含まれている必要があります。 例 |
**I/Oコネクタガイド**に、特定のヒントと設定を説明する新しいページを追加します。以下は、Parquet、Hadoopなどの例です。 例 |
Javadoc/Pythondocのセクションヘッダーのフォーマットは、他のページのプログラムによる情報抽出を将来有効にできるよう、全体を通して一貫している必要があります。 ページに含めるセクションの**サブセット**の例(順序どおり)
例 KafkaIO JavaDoc |
I/Oコネクタには、**サポートされている機能**サブヘッダーの下に、使用されているリレーショナル機能を示す表を含める必要があります。 リレーショナル機能は、効率を向上させるのに役立つ概念であり、I/Oコネクタによってオプションで実装できます。エンドユーザーが提供するパイプライン設定(SchemaIO)とユーザー問合せ(FieldAccessDescriptor)データを使用して、リレーショナル理論を適用して、パイプライン実行の高速化、運用コストの削減、データの読み取り/書き込みの削減などの改善を導き出します。 テーブルの例
実装例 BigQueryIO 列のプルーニング(ProjectionPushdownによる)により、エンドユーザーの問合せで示された必要な列のみを返します。これは、BigQuery DirectRead APIを使用して実現されます。 |
必要に応じて、**一般的なパイプラインパターン**の下に、I/Oに関連する一般的な使用パターンを概説するページを追加します。 https://beam.dokyumento.jp/documentation/patterns/bigqueryio/ |
**I/Oコネクタ**をI/Oの情報で更新します。 例 https://beam.dokyumento.jp/documentation/io/connectors/#built-in-io-connectors |
**始める前に**ヘッダーの下に、I/Oを使用するためのセットアップ手順を提供します。 例 https://beam.dokyumento.jp/documentation/io/built-in/parquet/#before-you-start |
サポートされている各言語の最初の説明の後に、標準的な読み取り/書き込みコードスニペットを含めます。以下の例は、Javaの例を含むHadoopを示しています。 例 https://beam.dokyumento.jp/documentation/io/built-in/hadoop/#reading-using-hadoopformation |
要素のタイムスタンプがどのように割り当てられるかを示します。これには、`current_time()`よりも有用な情報を提供できる将来のI/Oを許可するためのバッチソースが含まれます。 例 |
タイムスタンプがどのように進むかを示します。バッチソースの場合、ほとんどの場合、これはn/aとマークされます。 |
コネクタが作成する一時リソース(ファイルなど)の概要を示します。 例 BigQueryバッチロードは、最初に一時GCSロケーションを作成します。 |
**認証**サブヘッダーの下に、ソース/シンクに安全にアクセスするためのパートナー認証マテリアルを取得する方法を提供します。 例 https://beam.dokyumento.jp/documentation/io/built-in/snowflake/#authentication ここでは、BigQueryはそれを権限と呼んでいますが、トピックは類似点をカバーしています。 |
I/Oは、**始める前に**ヘッダー内にソース/シンクドキュメントへのリンクを提供する必要があります。 例 https://beam.dokyumento.jp/documentation/io/built-in/snowflake/ |
各言語におけるネイティブまたはX言語サポートの有無を、ドキュメントへのリンクと共に示してください。 例 Kinesis I/Oは、Javaのネイティブ実装とPythonのX言語サポートを備えていますが、Golangのサポートはありません。 |
既知の制限事項は、制限事項ヘッダーの下に示してください。制限事項に追跡 issue がある場合は、インラインでリンクしてください。 例 https://beam.dokyumento.jp/documentation/io/built-in/snowflake/#limitations |
I/O(ビルトインではない)
カスタムI/Oは、Apache Beam Githubリポジトリには含まれていません。例としては、SolaceIOなどがあります。
Apache Beamのその他のI/Oコネクタの表を、あなたの情報で更新してください。 |
このセクションでは、新規および既存のApache Beam I/Oコネクタで採用すべき機能のAPI構文、セマンティクス、および推奨事項について概説します。
I/Oコネクタ開発ガイドラインは、以下の原則を念頭に置いて作成されています。
- 一貫性により、APIは学習しやすくなります。
- 何かを行う方法が複数ある場合は、まず一貫性を保つように努めるべきです。
- ドキュメントを数分間調べれば、ユーザーはほとんどのI/Oコネクタを理解できるはずです。
- 新しいI/Oの設計では、進化の可能性を考慮する必要があります。
- 変換は、他のBeamユーティリティと適切に統合する必要があります。
すべてのSDK
パイプライン設定/実行/ストリーミング/ウィンドウイングセマンティクスガイドライン
トピック | セマンティクス |
---|---|
パイプラインオプション | I/Oは、内部パラメータを調整するためにPipelineOptionsサブクラスに依存することはほとんどありません。 必要な場合は、コネクタ関連のパイプラインオプションクラスは、
|
ソースウィンドウイング | ソースは、APIでユーザーが明示的にパラメータ化しない限り、GlobalWindowで要素を返す必要があります。 許可される非グローバルウィンドウパターン
|
シンクウィンドウイング | シンクは、明示的にパラメータ化またはAPIで表現されていない限り、Windowに依存せず、任意のWindowingメソッドで送信された要素を処理する必要があります。 シンクは、内部的にPCollectionのウィンドウイングを任意の方法で変更できます。ただし、結果オブジェクトの一部として返すメタデータは、
許可される非グローバルウィンドウパターン
|
スロットリング | ストリーミングシンク(または外部サービスにアクセスする変換)は、外部サービスの過負荷を防ぐために、リクエストのスロットリングを実装する場合があります。 TODO: Beamはスロットリングユーティリティを公開する必要があります (追跡 issue)
|
エラー処理 | TODO: 追跡 issue |
Java
全般
コネクタの操作に使用されるプライマリクラスの名前は、{connector}IOにする必要があります。 例 BigQuery I/Oは、org.apache.beam.sdk.io.bigquery.BigQueryIOです。 |
クラスは、パッケージorg.apache.beam.sdk.io.{connector}に配置する必要があります。 例 BigQueryIOは、Javaパッケージorg.apache.beam.sdk.io.bigqueryに属します。 |
単体/統合/パフォーマンステストは、パッケージorg.apache.beam.sdk.io.{connector}.testingの下に配置する必要があります。これにより、さまざまなテストがコネクタの標準のユーザー向けインターフェースで機能します。 単体テストは、コネクタの内部をテストすることが多いため、同じパッケージ(つまり、org.apache.beam.sdk.io.{connector})に存在する必要があります。 BigQueryIOは、Javaパッケージorg.apache.beam.sdk.io.bigqueryに属します。 |
I/O変換は、ユーザ型からコネクタ固有型に要素をマッピングするために、ユーザーラムダを受信することを避ける必要があります。代わりに、コネクタ固有のデータ型(可能な場合はスキーマ情報を含む)とインターフェースする必要があります。 必要な場合、I/O変換は、変換の入力型(シンクの場合)または出力型(ソースの場合)を指定する型パラメータを受け取る必要があります。 I/O変換は、出力型が変更されないことが確実な場合にのみ、型パラメータを持たない場合があります(例:FileIO.MatchAllおよびその他のFileIO変換)。 |
以下の理由により、I/OコネクタのパブリックAPI部分でサードパーティライブラリを直接公開することは、強くお勧めしません。
代わりに、Beamネイティブインターフェースと、マッピングロジックを保持するアダプターを公開することを強くお勧めします。 問題のライブラリが非常に静的な性質であると考える場合は、I/O自体にその旨を記してください。 |
ソースとシンクは、PTransformラッパーで抽象化し、内部クラスはprotectedまたはprivateとして宣言する必要があります。これにより、依存関係による実装を壊すことなく、実装の詳細を追加/変更/修正できます。 |
クラス/メソッド/プロパティ
Java構文 | セマンティクス |
---|---|
class IO.Read | I/O内の読み取りを表すクラスへのアクセスを提供します。 ユーザーはこのクラスを直接作成するべきではありません。トップレベルのユーティリティメソッドによって作成する必要があります。 |
class IO.ReadAll | いくつかの異なるソースは、データソースからの読み取りのためのランタイム構成を実装しています。これは、純粋なバッチソースをより高度なストリーミングソースにすることができるため、貴重なパターンです。 可能な限り、このタイプの変換は、構築時設定変換のタイプの豊富さを備えている必要があります。
例 |
class IO.Write | I/O内の書き込みを表すクラスへのアクセスを提供します。 ユーザーはこのクラスを直接作成するべきではありません。トップレベルのユーティリティメソッドによって作成する必要があります。 |
その他の変換クラス | 一部のデータストレージおよび外部システムは、読み取りまたは書き込みのセマンティクスに簡単に調整できないAPIを実装しています(例:FhirIOは、Fhirにデータを取得または送信するいくつかの異なる変換を実装しています)。 これらのクラスは、読み取り、書き込み、およびReadAll変換の追加構成の一部として機能をカプセル化することが不可能または非常に困難な場合にのみ追加する必要があります。これは、ユーザーの認知負荷を増大させることを避けるためです。 ユーザーはこのクラスを直接作成するべきではありません。トップレベルの静的メソッドによって作成する必要があります。 |
ユーティリティクラス | 一部のコネクタは、構成パラメータを設定するために、他のユーザー向けクラスに依存しています。 (例:JdbcIO.DataSourceConfiguration)。これらのクラスは、{Connector}IOクラス内にネストする必要があります。 この形式により、メインのJavadocで表示され、ユーザーが簡単に見つけることができます。 |
メソッドIO<T>.write() | トップレベルのI/Oクラスは、I/O.Write変換の構築を開始するための**静的メソッド**を提供します。これは、単一の入力PCollectionとWrite.Result出力を持つPTransformを返します。 このメソッドは、その名前で以下を指定するべきではありません。
上記は、可能であれば構成パラメータを介して指定する必要があります。**不可能な場合**は、**新しい静的メソッド**を導入できますが、これは**例外的な場合でなければなりません**。 |
メソッドIO<T>.read() | I/O.Read変換の構築を開始するためのメソッド。これは、単一の出力PCollectionを持つPTransformを返します。 このメソッドは、その名前で以下を指定するべきではありません。
上記は、可能であれば構成パラメータを介して指定する必要があります。**不可能な場合**は、**新しい静的メソッド**を導入できますが、これは**例外的な場合でなければならず、APIの一部としてI/Oヘッダーに文書化する必要があります**。 初期静的コンストラクタメソッドは、これらが少なく一般的である場合、または変換の構成に必要な場合(例:FhirIO.exportResourcesToGcs、JdbcIO.ReadWithPartitionsは初期構成にTypeDescriptorが必要です)、パラメータを受け取ることができます。 |
IO.Read.from(source) | Read変換は、ユーザーが読み取り元を指定できる**from**メソッドを提供する必要があります。変換が異なる*種類の*ソース(例:テーブル、クエリ、トピック、パーティション)から読み取ることができる場合、これを収容するために、このfromメソッドの複数の実装を提供できます。
これらのメソッドの入力型は、外部ソースのAPIを反映できます(例:Kafka TopicPartitionは、**Beam実装の**TopicPartitionオブジェクトを使用する必要があります)。 場合によっては、同じ入力型を使用する複数の**from**ロケーションが存在する可能性があり、メソッドのオーバーロードを利用できないことを意味します。これを念頭に置いて、この状況を有効にするには、新しいメソッドを使用してください。
|
IO.Read.fromABC(String abc) | |
IO.Write.to(destination) | Write変換は、ユーザーがデータの書き込み先を指定できる**to**メソッドを提供する必要があります。変換が同じ入力要素型(例:テーブル、クエリ、トピック、パーティション)を使用しながら異なる*種類の*ソースに書き込むことができる場合、これを収容するために、このfromメソッドの複数の実装を提供できます。
これらのメソッドの入力型は、外部シンクのAPIを反映できます(例:Kafka TopicPartitionは、**Beam実装の**TopicPartitionオブジェクトを使用する必要があります)。 異なる種類の宛先で異なる種類の入力オブジェクト型が必要な場合は、これらを別々のI/Oコネクタで行う必要があります。 場合によっては、同じ入力型を使用する複数の**from**ロケーションが存在する可能性があり、メソッドのオーバーロードを利用できないことを意味します。これを念頭に置いて、この状況を有効にするには、新しいメソッドを使用してください。
|
IO.Write.to(DynamicDestination destination) | 書き込み変換により、複数の宛先に書き込むことができます。これは、慎重に実装する必要がある複雑なパターンになる可能性があります(単一のパイプラインで複数の宛先を持つ可能性が高いコネクタに推奨されるパターンです)。 このための推奨されるパターンは、ユーザーが宛先の構成に必要なすべてのパラメータを定義できるようにする DynamicDestinations インターフェース(例:BigQueryIO.DynamicDestinations)を定義することです。 DynamicDestinations インターフェースを使用すると、メンテナーは時間の経過とともに新しいメソッドを追加できます(既存のユーザーに影響を与えないように**デフォルト実装**を使用)。必要に応じて、追加の構成パラメータを定義します。 |
IO.Write.toABC(destination) | |
class IO.Read.withX IO.Write.withX | withX は、Read メソッドに構成を渡すためのメソッドを提供します。ここで、X は作成される構成を表します。汎用 with ステートメント(以下に定義)を除いて、I/O は、構成オプションの名前をソースのオプション名と一致させるように試みる必要があります。 これらのメソッドは、既存のインスタンスを変更するのではなく、I/O の新しいインスタンスを返す必要があります。 例 |
IO.Read.withConfigObject IO.Write.withConfigObject | Java の一部のコネクタは、構成の一部として構成オブジェクトを受け取ります。**このパターンは、特定のケースのみに推奨されます**。ほとんどの場合、コネクタは必要なすべての構成パラメータをトップレベルで保持できます。 複数パラメータの構成オブジェクトが上位変換の適切なパラメータであるかどうかを判断するには、構成オブジェクトは次の条件を満たす必要があります。
例 JdbcIO.DataSourceConfiguration, SpannerConfig, KafkaIO.Read.withConsumerConfigUpdates |
class IO.Write.withFormatFunction | 推奨されません - 動的宛先を除く Beam Row 型の PCollection を受信できるソースの場合、フォーマット関数は必要ありません。Beam はスキーマに基づいて入力データをフォーマットできる必要があるためです。 動的宛先機能を提供するシンクの場合、要素は宛先の決定に役立つデータを保持している場合があります。これらのデータは、最終宛先に書き込む前に削除する必要がある場合があります。 このメソッドを含めるには、コネクタは次のことを行う必要があります。
|
IO.Read.withCoder IO.Write.withCoder | 強く推奨されません このコネクタの出力/入力 PCollection の要素タイプのエンコード/デコードに使用するコーダーを設定します。一般に、ソースは次のことをお勧めします。
#1 と #2 のどちらも不可能な場合は、`withCoder(...)` メソッドを追加できます。 |
IO.ABC.withEndpoint / with{IO}Client / withClient | コネクタ変換は、自身と通信する外部システムとの間のインターフェースをオーバーライドするメソッドを提供する必要があります。これにより、さまざまな用途が可能になります。 このコネクタの出力/入力 PCollection の要素タイプのエンコード/デコードに使用するコーダーを設定します。一般に、ソースは次のことをお勧めします。
例 |
型
Java構文 | セマンティクス |
---|---|
メソッド IO.Read.expand | Read 変換の expand メソッドは、型を持つ PCollection オブジェクトを返す必要があります。型はパラメータ化されるか、クラスに固定される場合があります。 ユーザーはこのクラスを直接作成するべきではありません。トップレベルのユーティリティメソッドによって作成する必要があります。 |
メソッド IO.Read.expand の PCollection タイプ | PCollection のタイプは、通常、次の 4 つのオプションのいずれかになります。これらの各オプションについて、エンコーディング/データは次のようにすることをお勧めします。
いずれの場合も、ユーザーにコーダーを渡すように求めること (例:`withCoder(...)`) は**推奨されません**。 |
メソッド IO.Write.expand | 書き込み変換の expand メソッドは、PCollectionTuple を拡張する IO.Write.Result 型のオブジェクトを返す必要があります。このオブジェクトを使用すると、変換はその書き込み結果に関するメタデータを返し、この書き込みに他の PTransform を続けることができます。 Write 変換がメタデータを返す必要がない場合でも、Write.Result オブジェクトは**依然として推奨されます**。これは、変換が時間の経過とともにメタデータを進化させることができるためです。 メタデータの例
例 BigQueryIO の WriteResult |
進化
時間の経過とともに、I/O は新しいユースケースに対応したり、内部で新しい API を使用したりするために進化する必要があります。I/O の必要な進化の例を次に示します。
- 新しいデータ型をサポートする必要がある(例:JdbcIO.ReadWithPartitions での任意タイプのパーティショニング)
- 新しいバックエンド API をサポートする必要がある
Java構文 | セマンティクス |
---|---|
トップレベルの静的メソッド | 一般に、既存のメソッド内の構成としてキャプチャできる機能のために、完全に新しい静的メソッドを追加することは避ける必要があります。 構成でサポートできるトップレベルメソッドが多すぎる例は、PubsubIO です。 新しいトップレベルの静的メソッドは、次の場合にのみ追加する必要があります。
|
Python
全般
I/O が Apache Beam に存在する場合、パッケージ **apache_beam.io.{connector}** または **apache_beam.io.{namespace}.{connector}** に配置する必要があります。 例 apache_beam.io.fileio および apache_beam.io.gcp.bigquery |
パイプラインでコネクタを操作する際に使用されるプライマリエントリポイントである {connector}.py という名前のモジュールがあります。**apache_beam.io.{connector}** または **apache_beam.io.{namespace}.{connector}** 例 apache_beam.io.gcp.bigquery / apache_beam/io/gcp/bigquery.py 別の可能なレイアウト:apache_beam/io/gcp/bigquery/bigquery.py (bigquery/__init__.py のパブリッククラスを自動的にインポート) |
コネクタは、メインファイルに `__all__` 属性を定義し、ユーザーがアクセスすることを意図したクラスとメソッドのみをエクスポートする必要があります。 |
I/O 実装が単一のモジュール(単一のファイル)に存在する場合、ファイル {connector}.py がそれを保持できます。 それ以外の場合、コネクタコードは、パブリック API を文書化する __init__.py ファイルを含むディレクトリ(コネクタパッケージ)内に定義する必要があります。 コネクタが実装用のユーティリティを含む他のファイルを定義する場合、これらのファイルはパブリックインターフェースではないことを明確に文書化する必要があります。 |
クラス/メソッド/プロパティ
Python 構文 | セマンティクス |
---|---|
呼び出し可能 ReadFrom{Connector} | これにより、特定のデータソースから読み取るための PTransform にアクセスできます。受信した引数を使用して構成できます。オプションパラメータの長いリストについては、デフォルト値を持つパラメータとして定義できます。 Q. Java はビルダーパターンを使用します。Python ではなぜそれができないのですか?オプションパラメータは Python で同じ役割を果たすことができます。 例 |
呼び出し可能 ReadAllFrom{Connector} | いくつかの異なるソースは、データソースからの読み取りのためのランタイム構成を実装しています。これは、純粋なバッチソースをより高度なストリーミングソースにすることができるため、貴重なパターンです。 可能な限り、このタイプの変換は、構築時設定変換のタイプ richness と安全性を備えている必要があります。
例 |
呼び出し可能 WriteTo{Connector} | これにより、特定のデータシンクに書き込むための PTransform にアクセスできます。受信した引数を使用して構成できます。オプションパラメータの長いリストについては、デフォルト値を持つパラメータとして定義できます。 Q. Java はビルダーパターンを使用します。Python ではなぜそれができないのですか?オプションパラメータは同じ役割を果たすことができます 役割 Python です。 例 |
呼び出し可能 Read/Write | トップレベルの変換イニシャライザ (ReadFromIO/ReadAllFromIO/WriteToIO) は、使用を簡素化し、ユーザーがすぐに使用できるように、可能な限り少ないパラメータを必要とすることを目指す必要があります。 |
パラメータ ReadFrom{Connector}({source}) パラメータ WriteTo{Connector}({sink}) | Read または Write I/O コネクタの最初のパラメータは、リーダーのソースまたはライターの宛先を指定する必要があります。 変換が異なる種類のソース(例:テーブル、クエリ、トピック、パーティション)から読み取ることができる場合、推奨されるアプローチは、優先順位の高い順に次のとおりです。
|
パラメータ WriteToIO(destination={multiple_destinations}) | 書き込み変換により、複数の宛先に書き込むことができます。これは、慎重に実装する必要がある複雑なパターンになる可能性があります(単一のパイプラインで複数の宛先を持つ可能性が高いコネクタに推奨されるパターンです)。 Pythonの推奨APIパターンは、設定が必要なすべてのパラメータに呼び出し可能オブジェクト(例:WriteToBigQuery)を渡すことです。一般的に、呼び出し可能パラメータの例としては、以下が挙げられます。
これらの呼び出し可能オブジェクトを使用すると、メンテナーは、必要に応じて追加の設定パラメータを定義する、新しいパラメータ化可能な呼び出し可能オブジェクトを時間の経過とともに追加できます(既存のユーザーに影響を与えないようにデフォルト値を設定します)。 コーナーケース:これらの呼び出し可能オブジェクトの一部にサイド入力値を渡す必要があることがよくあります。推奨されるパターンは、コンストラクターにこれらのサイド入力値を含めるための追加パラメータを用意することです(例:WriteToBigQueryのtable_side_inputsパラメータ) |
パラメータ ReadFromIO(param={param_val}) パラメータ WriteToIO(param={param_val}) | 追加の設定は、I/Oのコンストラクターにオプションパラメータとして追加できます。可能な限り、必須の追加パラメータは避けるべきです。オプションパラメータには妥当なデフォルト値を設定し、新しいコネクタの選択をできるだけ簡単にする必要があります。 |
パラメータ ReadFromIO(config={config_object}) | 非推奨 Pythonの一部のコネクタは、設定の一部として複雑な設定オブジェクトを受け取る場合があります。コネクタは必要なすべての設定パラメータをトップレベルで保持できるため、このパターンは推奨されません。 複数パラメータの構成オブジェクトが上位変換の適切なパラメータであるかどうかを判断するには、構成オブジェクトは次の条件を満たす必要があります。
|
型
Python 構文 | セマンティクス |
---|---|
ReadFromIO.expandメソッドの出力 | Read変換のexpandメソッドは、型を持つPCollectionオブジェクトを返し、型でアノテーションを付ける必要があります。Pythonで推奨されるPCollection型は、優先順位の高い順に次のとおりです。 単純なPython型(bytes、str、数値) 複雑な型の場合
|
WriteToIO.expandメソッドの出力 | 書き込み変換のexpandメソッドは、固定クラスタイプを持つPythonオブジェクトを返す必要があります。推奨されるクラス名はWriteTo{IO}Resultです。このオブジェクトを使用すると、変換はその書き込み結果に関するメタデータを返すことができます。 Write変換がメタデータを返す必要がない場合でも、クラスタイプを持つPythonオブジェクトの方が依然として望ましいです。これは、変換が時間の経過とともにメタデータを進化させることができるためです。 メタデータの例
例 BigQueryIO の WriteResult |
WriteToIO.expandメソッドの入力 | Write変換のexpandメソッドは、型を持つPCollectionオブジェクトを返し、型でアノテーションを付ける必要があります。Pythonで推奨されるPCollection型は、T1で参照されるReadFromIOの出力型と同じです。 |
GoLang
全般
I/OがApache Beamにある場合、パッケージに配置する必要があります {connector}io 例 avroio および bigqueryio |
統合テストとパフォーマンステストは、I/O自体と同じパッケージに配置する必要があります {connector}io |
Typescript
クラス/メソッド/プロパティ
TypeScript構文 | セマンティクス |
---|---|
function readFromXXX | I/O.Read変換の構築を開始するメソッド。 |
function writeToXXX | I/O.Write変換の構築を開始するメソッド。 |
テスト
I/Oには、単体テスト、統合テスト、およびパフォーマンステストが必要です。以下のガイダンスでは、各テストの種類の目的と、テストカバレッジの基準スタンダードについて説明します。実際のテストケースと実際のテストのビジネスロジックは、各ソース/シンクの specifics によって異なりますが、基準としていくつかの推奨テストケースを含めました。
このガイドは、具体的なテストケースとシナリオを追加することで、Apache Beam I/O変換テストガイドを補完します。Beam I/Oコネクタのテストに関する一般的な情報については、そのガイドを参照してください。
統合テストとパフォーマンステストは、org.apache.beam.sdk.io.{connector}.testingパッケージに配置する必要があります。これにより、さまざまなテストがコネクタの標準のユーザー向けインターフェースで機能します。 単体テストは、コネクタの内部をテストすることが多いため、同じパッケージ(つまり、org.apache.beam.sdk.io.{connector})に配置する必要があります。 |
単体テスト
I/O単体テストでは、コードの機能を効率的にテストする必要があります。単体テストは複数のテストスイート(たとえば、Pythonの各バージョン)で何度も実行されることが想定されているため、これらのテストは比較的速く実行され、副作用があってはなりません。単体テストを通じて100%のコードカバレッジを達成することをお勧めします。
可能な場合は、実行時間が短くリソース使用量が少ないため、統合テストよりも単体テストが優先されます。さらに、単体テストはプリコミットテストスイート(たとえば、Jenkinsのbeam_PreCommit_*テストスイート)に簡単に含めることができるため、早期に回帰を発見できる可能性が高くなります。また、エラー状態には単体テストが推奨されます。
単体テストクラスは、IOと同じパッケージの一部であり、{connector}IOTestという名前である必要があります。 例 sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java |
推奨テストケース
テストする機能 | 説明 | 例 |
---|---|---|
デフォルトオプションでの読み取り | DirectRunnerとデータストアの偽物を使用して、パイプラインをローカルで実行することが望ましいです。ただし、モックを使用してソース変換の単体テストを行うこともできます。 | |
デフォルトオプションでの書き込み | DirectRunnerとデータストアの偽物を使用して、パイプラインをローカルで実行することが望ましいです。ただし、モックを使用してシンク変換の単体テストを行うこともできます。 | |
追加オプションを使用した読み取り | ユーザーが使用できるすべてのオプションについて。 | |
追加オプションを使用した書き込み | ユーザーが使用できるすべてのオプションについて。たとえば、動的デスティネーションへの書き込み。 | |
追加の要素タイプの読み取り | データストアの読み取りスキーマが異なるデータ型をサポートしている場合。 | |
追加の要素タイプの書き込み | データストアの書き込みスキーマが異なるデータ型をサポートしている場合。 | |
データの表示 | ソース/シンクがディスプレイデータを正しく設定するかどうかをテストします。 | AvroIOTest.testReadDisplayData DatastoreV1Test.testReadDisplayData bigquery_test.TestBigQuerySourcetest_table_reference_display_data |
初期分割 | これらのテストには多くのバリエーションがあります。詳細については、例を参照してください。 | |
動的な作業の再調整 | これらのテストには多くのバリエーションがあります。詳細については、例を参照してください。 | BigTableIOTest.testReadingSplitAtFractionExhaustive avroio_test.AvroBase.test_dynamic_work_rebalancing_exhaustive |
スキーマのサポート | PCollection<Row>の読み取りまたはPCollection<Row>の書き込み ソースからのスキーマの取得、およびシンクへのスキーマのプッシュ/検証を確認する必要があります。 | |
検証テスト | ソース/シンク変換が正しく検証されているかどうか、つまり、正しくない/互換性のない構成が実行可能なエラーで拒否されているかどうかをテストします。 | |
メトリクス | さまざまな読み取り/書き込みメトリックが設定されていることを確認します | |
すべてを読む | テストのすべてを読む(PCollection<Read Config>)バージョンが機能するかどうかをテストします | |
シンクバッチングテスト | シンクがパフォーマンス上の理由からバッチングを実行する場合、書き込む前にシンクがデータをバッチ処理することを確認します。 | |
エラー処理 | データストアからのさまざまなエラー(たとえば、HTTPエラーコード)が正しく処理されることを確認します | |
再試行ポリシー | ソース/シンクが期待どおりにリクエストを再試行することを確認します | |
シンクからの出力PCollection | シンクは、後続のステップが依存できるPCollectionを生成する必要があります。 | |
バックログバイトレポート | 無制限のソース変換がバックログバイトを正しく報告することを確認するためのテスト。 | KinesisReaderTest.getSplitBacklogBytesShouldReturnBacklogUnknown |
透かしレポート | 無制限のソース変換が透かしを正しく報告することを確認するためのテスト。 | WatermarkPolicyTest.shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords |
統合テスト
統合テストは、Beamランナーと、特定のI/Oが接続するデータストアとの間のエンドツーエンドのインタラクションをテストします。これらには通常、リモートRPC呼び出しが伴うため、統合テストの実行には時間がかかります。さらに、Beamランナーは、統合テストを実行するときに複数のワーカーを使用する場合があります。これらのコストのため、統合テストは、特定のシナリオを単体テストでカバーできない場合にのみ実装する必要があります。
Beamと外部ストレージシステムとの間のインタラクションを含む統合テストを少なくとも1つ実装することは、提出に必須です。 |
ソースとシンクの両方を含むI/Oコネクタの場合、Beamガイドでは、読み取りと書き込みの両方を同じテストパイプラインでカバーできるように、書き込みと読み取りの形式でテストを実装することを推奨しています。 |
統合テストクラスは、I/Oと同じパッケージの一部であり、**{connector}IOIT**という名前である必要があります。 例えば sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java |
推奨テストケース
テストの種類 | 説明 | 例 |
---|---|---|
Dataflowを使用した「書き込みと読み取り」テスト | 生成されたデータをデータストアに書き込み、Dataflowを使用してデータストアから同じデータを読み取ります。 | |
Dataflowを使用した「すべて書き込んでから読む」テスト | 「書き込み後読み取り」と同じですが、ソース設定のPCollectionの読み取りをサポートするソースの場合です。将来の(SDF)ソースはすべてこれをサポートすることが期待されています。 同じ変換が「読み取り」と「すべて読み取り」の形式で使用される場合、または2つの変換が本質的に同じである場合(たとえば、読み取り変換がすべて読み取りの単純なラッパーである場合、またはその逆の場合)、単一の「すべて読み取り」テストを追加するだけで十分です。 | |
Dataflowを使用した非有界書き込み後読み取り | データを継続的に書き込み、読み取るパイプライン。このようなパイプラインは、結果を検証するためにキャンセルする必要があります。これは、非有界読み取りをサポートするコネクタの場合のみです。 |
パフォーマンステスト
パフォーマンステストフレームワークはまだ流動的であるため、パフォーマンステストは実際のI/Oコードの後に提出することができます。
パフォーマンステストフレームワークは、まだGoLangまたはTypescriptをサポートしていません。
パフォーマンスベンチマークは、I/Oのベストプラクティスの重要な部分であり、いくつかの領域に効果的に対処します
- 特定のI/Oまたはデータフローテンプレートのコストとパフォーマンスが顧客のビジネス要件を満たしているかどうかを評価するため。
- コード変更間のI/Oまたはデータフローテンプレートのパフォーマンスの低下と改善を説明するため。
- エンドユーザーがSLOを満たすためのコストを見積もり、容量を計画できるようにするため。
ダッシュボード
Googleは、組み込みI/Oのパフォーマンステストを定期的に実行し、Java と Python の外部から閲覧可能なダッシュボードに公開しています。
ガイダンス
可能な場合は、統合テストとパフォーマンステストに同じテストを使用してください。パフォーマンステストは通常、統合テストと同じですが、より大量のデータが関係します。テストフレームワーク(内部および外部)は、これらのテストに関連するパフォーマンスベンチマークを追跡し、異常を検出するためのダッシュボード/ツールを提供する機能を提供します。 |
ページの組み込みI/Oコネクタガイド ドキュメントにリソーススケーラビリティセクションを含め、IOが統合テストを行っている上限を示します。 例えば KafkaIOがxxxxトピックで統合テストを実施していることを示します。ドキュメントでは、コネクタの作成者がコネクタが統合テストの数を超えて拡張できると考えているかどうかを記載できますが、これにより、テスト済みのパスの制限がユーザーに明確になります。 ドキュメントには、制限に使用された構成が明確に示されている必要があります。たとえば、ランナーxと構成オプションaを使用します。 |
I/Oが収集するパフォーマンス/内部メトリックを、それらが何を意味するのか、どのように使用できるのかを含めて文書化します(一部のコネクタは、レイテンシ/バンドルサイズ/etcなどのパフォーマンスメトリックを収集して公開します)。 |
コネクタに実装されているパフォーマンステストに基づいて、I/Oの予想されるパフォーマンス特性を含めます。 |
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!