I/O 標準

概要

このApache Beam I/O標準ドキュメントでは、Apache Beam I/Oコネクタを開発するファーストパーティ/サードパーティ開発者向けの規範的なガイダンスを示します。これらのガイドラインは、ドキュメント、開発、テストをシンプルかつ簡潔な方法で網羅するベストプラクティスを作成することを目的としています。

ビルトインI/Oコネクタとは何ですか?

Apache Beam GithubリポジトリにあるI/Oコネクタ(I/O)は、**ビルトインI/Oコネクタ**と呼ばれます。ビルトインI/Oは、Google Cloud DataflowチームがDataflowランナーを使用して定期的に統合テストとパフォーマンステストを実行し、参照用にメトリクスを公開しています。それ以外の場合、明示的に記載がない限り、以下のガイドラインは両方に適用されます。

ガイダンス

ドキュメント

このセクションでは、I/Oで利用可能になることが期待されるすべてのドキュメントのスーパーセットについて説明します。このセクション全体で参照されるApache Beamドキュメントは、こちらにあります。また、一般的に良い例は、ビルトインI/OであるSnowflake I/Oです。

ビルトインI/O

I/Oの関連言語のコードドキュメントを提供します。これには、Apache Beamサイト内または外部ドキュメントの場所にある情報源へのリンクも含まれている必要があります。

**I/Oコネクタガイド**に、特定のヒントと設定を説明する新しいページを追加します。以下は、ParquetHadoopなどの例です。

I/O connector guides screenshot

Javadoc/Pythondocのセクションヘッダーのフォーマットは、他のページのプログラムによる情報抽出を将来有効にできるよう、全体を通して一貫している必要があります。

ページに含めるセクションの**サブセット**の例(順序どおり)

  1. 始める前に
  2. {コネクタ}IOの基本
  3. サポートされている機能
    1. リレーショナル
  4. 認証
  5. {コネクタ}からの読み取り
  6. {コネクタ}への書き込み
  7. リソースのスケーラビリティ
  8. 制限事項
  9. 問題の報告

KafkaIO JavaDoc

I/Oコネクタには、**サポートされている機能**サブヘッダーの下に、使用されているリレーショナル機能を示す表を含める必要があります。

リレーショナル機能は、効率を向上させるのに役立つ概念であり、I/Oコネクタによってオプションで実装できます。エンドユーザーが提供するパイプライン設定(SchemaIO)とユーザー問合せ(FieldAccessDescriptor)データを使用して、リレーショナル理論を適用して、パイプライン実行の高速化、運用コストの削減、データの読み取り/書き込みの削減などの改善を導き出します。

テーブルの例

I/O connector guides screenshot

<div class="table-container-wrapper">
<table class="table table-bordered table-io-standards-relational-features">
   <tr>
      <th>
         <p><strong>Relational Feature</strong>
      </th>
      <th>
         <p><strong>Supported</strong>
      </th>
      <th>
         <p><strong>Notes</strong>
      </th>
   </tr>
   <tr>
      <td>
         <p>Column Pruning
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Filter Pushdown
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Table Statistics
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Partition Metadata
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Metastore
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
</table>
</div>

実装例

BigQueryIO 列のプルーニング(ProjectionPushdownによる)により、エンドユーザーの問合せで示された必要な列のみを返します。これは、BigQuery DirectRead APIを使用して実現されます。

必要に応じて、**一般的なパイプラインパターン**の下に、I/Oに関連する一般的な使用パターンを概説するページを追加します。

https://beam.dokyumento.jp/documentation/patterns/bigqueryio/

**I/Oコネクタ**をI/Oの情報で更新します。

https://beam.dokyumento.jp/documentation/io/connectors/#built-in-io-connectors

alt_text

**始める前に**ヘッダーの下に、I/Oを使用するためのセットアップ手順を提供します。

https://beam.dokyumento.jp/documentation/io/built-in/parquet/#before-you-start

サポートされている各言語の最初の説明の後に、標準的な読み取り/書き込みコードスニペットを含めます。以下の例は、Javaの例を含むHadoopを示しています。

https://beam.dokyumento.jp/documentation/io/built-in/hadoop/#reading-using-hadoopformation

要素のタイムスタンプがどのように割り当てられるかを示します。これには、`current_time()`よりも有用な情報を提供できる将来のI/Oを許可するためのバッチソースが含まれます。

タイムスタンプがどのように進むかを示します。バッチソースの場合、ほとんどの場合、これはn/aとマークされます。

コネクタが作成する一時リソース(ファイルなど)の概要を示します。

BigQueryバッチロードは、最初に一時GCSロケーションを作成します。

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L455

**認証**サブヘッダーの下に、ソース/シンクに安全にアクセスするためのパートナー認証マテリアルを取得する方法を提供します。

https://beam.dokyumento.jp/documentation/io/built-in/snowflake/#authentication

ここでは、BigQueryはそれを権限と呼んでいますが、トピックは類似点をカバーしています。

https://beam.dokyumento.jp/releases/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html

I/Oは、**始める前に**ヘッダー内にソース/シンクドキュメントへのリンクを提供する必要があります。

https://beam.dokyumento.jp/documentation/io/built-in/snowflake/

各言語におけるネイティブまたはX言語サポートの有無を、ドキュメントへのリンクと共に示してください。

Kinesis I/Oは、Javaのネイティブ実装とPythonのX言語サポートを備えていますが、Golangのサポートはありません。

既知の制限事項は、制限事項ヘッダーの下に示してください。制限事項に追跡 issue がある場合は、インラインでリンクしてください。

https://beam.dokyumento.jp/documentation/io/built-in/snowflake/#limitations

I/O(ビルトインではない)

カスタムI/Oは、Apache Beam Githubリポジトリには含まれていません。例としては、SolaceIOなどがあります。

Apache Beamのその他のI/Oコネクタの表を、あなたの情報で更新してください。

前述の表

## 開発

このセクションでは、新規および既存のApache Beam I/Oコネクタで採用すべき機能のAPI構文、セマンティクス、および推奨事項について概説します。

I/Oコネクタ開発ガイドラインは、以下の原則を念頭に置いて作成されています。

すべてのSDK

パイプライン設定/実行/ストリーミング/ウィンドウイングセマンティクスガイドライン

トピック

セマンティクス

パイプラインオプション

I/Oは、内部パラメータを調整するためにPipelineOptionsサブクラスに依存することはほとんどありません。

必要な場合は、コネクタ関連のパイプラインオプションクラスは、

  • 各オプションについて、その効果と変更する理由を明確に文書化する必要があります。
  • 競合を避けるために、オプション名には名前空間が必要です。
  • クラス名: {Connector}Options
  • メソッド名: set{Connector}{Option}get{Connector}{Option}

ソースウィンドウイング

ソースは、APIでユーザーが明示的にパラメータ化しない限り、GlobalWindowで要素を返す必要があります。

許可される非グローバルウィンドウパターン

  • ReadFromIO(window_by=...)
  • ReadFromIO.IntoFixedWindows(...)
  • ReadFromIO(apply_windowing=True/False) (例: PeriodicImpulse)
  • IO.read().withWindowing(...)
  • IO.read().windowBy(...)
  • IO.read().withFixedWindows(...)

シンクウィンドウイング

シンクは、明示的にパラメータ化またはAPIで表現されていない限り、Windowに依存せず、任意のWindowingメソッドで送信された要素を処理する必要があります。

シンクは、内部的にPCollectionのウィンドウイングを任意の方法で変更できます。ただし、結果オブジェクトの一部として返すメタデータは、

  • APIで明示的に宣言されていない限り、入力と同じウィンドウにある必要があります。
  • 正確なタイムスタンプが必要です。
  • ウィンドウイングに関する追加情報を含めることができます(例:BigQueryジョブはタイムスタンプを持つ場合がありますが、関連付けられたウィンドウも持つ場合があります)。

許可される非グローバルウィンドウパターン

  • WriteToIO(triggering_frequency=...) - 例: WriteToBigQuery (これは変換内のウィンドウイングのみを設定します - 入力データは引き続きグローバルウィンドウにあります)。
  • WriteBatchesToIO(...)
  • WriteWindowsToIO(...)

スロットリング

ストリーミングシンク(または外部サービスにアクセスする変換)は、外部サービスの過負荷を防ぐために、リクエストのスロットリングを実装する場合があります。

TODO: Beamはスロットリングユーティリティを公開する必要があります (追跡 issue)

  • キーごとの固定スロットリング
  • シンク報告のバックプレッシャーによる適応スロットリング
  • 開始点からのランプアップスロットリング

エラー処理

TODO: 追跡 issue

Java

全般

コネクタの操作に使用されるプライマリクラスの名前は、{connector}IOにする必要があります。

BigQuery I/Oは、org.apache.beam.sdk.io.bigquery.BigQueryIOです。

クラスは、パッケージorg.apache.beam.sdk.io.{connector}に配置する必要があります。

BigQueryIOは、Javaパッケージorg.apache.beam.sdk.io.bigqueryに属します。

単体/統合/パフォーマンステストは、パッケージorg.apache.beam.sdk.io.{connector}.testingの下に配置する必要があります。これにより、さまざまなテストがコネクタの標準のユーザー向けインターフェースで機能します。

単体テストは、コネクタの内部をテストすることが多いため、同じパッケージ(つまり、org.apache.beam.sdk.io.{connector})に存在する必要があります。

BigQueryIOは、Javaパッケージorg.apache.beam.sdk.io.bigqueryに属します。

I/O変換は、ユーザ型からコネクタ固有型に要素をマッピングするために、ユーザーラムダを受信することを避ける必要があります。代わりに、コネクタ固有のデータ型(可能な場合はスキーマ情報を含む)とインターフェースする必要があります。

必要な場合、I/O変換は、変換の入力型(シンクの場合)または出力型(ソースの場合)を指定する型パラメータを受け取る必要があります。

I/O変換は、出力型が変更されないことが確実な場合にのみ、型パラメータを持たない場合があります(例:FileIO.MatchAllおよびその他のFileIO変換)。

以下の理由により、I/OコネクタのパブリックAPI部分でサードパーティライブラリを直接公開することは、強くお勧めしません。

  • Apache Beamの互換性保証が低下します - サードパーティライブラリの変更により、既存のユーザーのパイプラインが直接破損する可能性があります。
  • コードの保守が困難になります - ライブラリがAPIレベルで直接公開されている場合、依存関係の変更には、I/O実装コード全体で複数の変更が必要になります。
  • エンドユーザーにサードパーティの依存関係を強制します。

代わりに、Beamネイティブインターフェースと、マッピングロジックを保持するアダプターを公開することを強くお勧めします。

問題のライブラリが非常に静的な性質であると考える場合は、I/O自体にその旨を記してください。

ソースとシンクは、PTransformラッパーで抽象化し、内部クラスはprotectedまたはprivateとして宣言する必要があります。これにより、依存関係による実装を壊すことなく、実装の詳細を追加/変更/修正できます。

クラス/メソッド/プロパティ

Java構文

セマンティクス

class IO.Read

I/O内の読み取りを表すクラスへのアクセスを提供します。Readクラスは、fluentbuilderパターン(例:withX(...).withY(...))に似たfluentインターフェースを実装する必要があります。デフォルト値と併せて、builderパターンよりも少し冗長性の低い、fail-fast(各.withX()後に即時検証フィードバックが得られる)を提供します。

ユーザーはこのクラスを直接作成するべきではありません。トップレベルのユーティリティメソッドによって作成する必要があります。

class IO.ReadAll

いくつかの異なるソースは、データソースからの読み取りのためのランタイム構成を実装しています。これは、純粋なバッチソースをより高度なストリーミングソースにすることができるため、貴重なパターンです。

可能な限り、このタイプの変換は、構築時設定変換のタイプの豊富さを備えている必要があります。

  • 構築時に既知のスキーマを持つBeam Row出力をサポートします。
  • この場合、追加の構成が必要になる(そして許容される)場合があります(例:SchemaProviderパラメータ、Schemaパラメータ、Schema Catalog、またはその種のユーティリティ)。
  • 入力PCollectionは、スキーマを持つ固定型である必要があるため、ユーザーが簡単に操作できます。

JdbcIO.ReadAllParquetIO.ReadFiles

class IO.Write

I/O内の書き込みを表すクラスへのアクセスを提供します。Writeクラスは、IO.Readについて上記で説明したように、fluentインターフェースパターン(例:withX(...).withY(...))を実装する必要があります。

ユーザーはこのクラスを直接作成するべきではありません。トップレベルのユーティリティメソッドによって作成する必要があります。

その他の変換クラス

一部のデータストレージおよび外部システムは、読み取りまたは書き込みのセマンティクスに簡単に調整できないAPIを実装しています(例:FhirIOは、Fhirにデータを取得または送信するいくつかの異なる変換を実装しています)。

これらのクラスは、読み取り、書き込み、およびReadAll変換の追加構成の一部として機能をカプセル化することが不可能または非常に困難な場合にのみ追加する必要があります。これは、ユーザーの認知負荷を増大させることを避けるためです。

ユーザーはこのクラスを直接作成するべきではありません。トップレベルの静的メソッドによって作成する必要があります。

ユーティリティクラス

一部のコネクタは、構成パラメータを設定するために、他のユーザー向けクラスに依存しています。

(例:JdbcIO.DataSourceConfiguration)。これらのクラスは、{Connector}IOクラス内にネストする必要があります

この形式により、メインのJavadocで表示され、ユーザーが簡単に見つけることができます。

メソッドIO<T>.write()

トップレベルのI/Oクラスは、I/O.Write変換の構築を開始するための**静的メソッド**を提供します。これは、単一の入力PCollectionとWrite.Result出力を持つPTransformを返します。

このメソッドは、その名前で以下を指定するべきではありません。

  • 内部データ形式
  • データの書き込みに使用される戦略
  • 入力または出力データ型

上記は、可能であれば構成パラメータを介して指定する必要があります。**不可能な場合**は、**新しい静的メソッド**を導入できますが、これは**例外的な場合でなければなりません**。

メソッドIO<T>.read()

I/O.Read変換の構築を開始するためのメソッド。これは、単一の出力PCollectionを持つPTransformを返します。

このメソッドは、その名前で以下を指定するべきではありません。

  • 内部データ形式
  • データの読み取りに使用される戦略
  • 出力データ型

上記は、可能であれば構成パラメータを介して指定する必要があります。**不可能な場合**は、**新しい静的メソッド**を導入できますが、これは**例外的な場合でなければならず、APIの一部としてI/Oヘッダーに文書化する必要があります**。

初期静的コンストラクタメソッドは、これらが少なく一般的である場合、または変換の構成に必要な場合(例:FhirIO.exportResourcesToGcsJdbcIO.ReadWithPartitionsは初期構成にTypeDescriptorが必要です)、パラメータを受け取ることができます。

IO.Read.from(source)

Read変換は、ユーザーが読み取り元を指定できる**from**メソッドを提供する必要があります。変換が異なる*種類の*ソース(例:テーブル、クエリ、トピック、パーティション)から読み取ることができる場合、これを収容するために、このfromメソッドの複数の実装を提供できます。

  • IO.Read from(Query query)
  • IO.Read from(Table table) / from(String table)
  • IO.Read from (Topic topic)
  • IO.Read from(Partition partition)

これらのメソッドの入力型は、外部ソースのAPIを反映できます(例:Kafka TopicPartitionは、**Beam実装の**TopicPartitionオブジェクトを使用する必要があります)。

場合によっては、同じ入力型を使用する複数の**from**ロケーションが存在する可能性があり、メソッドのオーバーロードを利用できないことを意味します。これを念頭に置いて、この状況を有効にするには、新しいメソッドを使用してください。

  • IO.Read from(String table)
  • IO.Read fromQuery(String query)

IO.Read.fromABC(String abc)

メソッドのオーバーロードが可能な場合は、このパターンはお勧めしません。**Read.from(source)**のガイダンスに従ってください。

IO.Write.to(destination)

Write変換は、ユーザーがデータの書き込み先を指定できる**to**メソッドを提供する必要があります。変換が同じ入力要素型(例:テーブル、クエリ、トピック、パーティション)を使用しながら異なる*種類の*ソースに書き込むことができる場合、これを収容するために、このfromメソッドの複数の実装を提供できます。

  • IO.Write to(Query query)
  • IO.Write to(Table table) / from(String table)
  • IO.Write to(Topic topic)
  • IO.Write to(Partition partition)

これらのメソッドの入力型は、外部シンクのAPIを反映できます(例:Kafka TopicPartitionは、**Beam実装の**TopicPartitionオブジェクトを使用する必要があります)。

異なる種類の宛先で異なる種類の入力オブジェクト型が必要な場合は、これらを別々のI/Oコネクタで行う必要があります。

場合によっては、同じ入力型を使用する複数の**from**ロケーションが存在する可能性があり、メソッドのオーバーロードを利用できないことを意味します。これを念頭に置いて、この状況を有効にするには、新しいメソッドを使用してください。

  • IO.Write to(String table)
  • IO.Write toTable(String table)

IO.Write.to(DynamicDestination destination)

書き込み変換により、複数の宛先に書き込むことができます。これは、慎重に実装する必要がある複雑なパターンになる可能性があります(単一のパイプラインで複数の宛先を持つ可能性が高いコネクタに推奨されるパターンです)。

このための推奨されるパターンは、ユーザーが宛先の構成に必要なすべてのパラメータを定義できるようにする DynamicDestinations インターフェース(例:BigQueryIO.DynamicDestinations)を定義することです。

DynamicDestinations インターフェースを使用すると、メンテナーは時間の経過とともに新しいメソッドを追加できます(既存のユーザーに影響を与えないように**デフォルト実装**を使用)。必要に応じて、追加の構成パラメータを定義します。

IO.Write.toABC(destination)

メソッドのオーバーロードが可能な場合は、このパターンは推奨されません。**Write.to(destination)** のガイダンスに従ってください。

class IO.Read.withX

IO.Write.withX

withX は、Read メソッドに構成を渡すためのメソッドを提供します。ここで、X は作成される構成を表します。汎用 with ステートメント(以下に定義)を除いて、I/O は、構成オプションの名前をソースのオプション名と一致させるように試みる必要があります。

これらのメソッドは、既存のインスタンスを変更するのではなく、I/O の新しいインスタンスを返す必要があります。

TextIO.Read.withCompression

IO.Read.withConfigObject

IO.Write.withConfigObject

Java の一部のコネクタは、構成の一部として構成オブジェクトを受け取ります。**このパターンは、特定のケースのみに推奨されます**。ほとんどの場合、コネクタは必要なすべての構成パラメータをトップレベルで保持できます。

複数パラメータの構成オブジェクトが上位変換の適切なパラメータであるかどうかを判断するには、構成オブジェクトは次の条件を満たす必要があります。

  • 外部データストアの接続/認証パラメータに関連するプロパティのみを保持します(例:JdbcIO.DataSourceConfiguration)。
    • 一般的に、**シークレットはパラメータとして渡すべきではありません**。ただし、代替手段がない場合は除きます。シークレット管理には、シークレット管理サービスまたは KMS を使用することをお勧めします。
  • **または**、Beam API で外部 API を公開することなく、外部データソースの API 特性を反映します(例:KafkaIO.Read.withConsumerConfigUpdates)。
    • メソッドは、API オブジェクトの名前を反映する必要があります(例:SubscriptionStatConfig オブジェクトが指定されている場合、メソッドは withSubscriptionStatConfig になります)。
  • **または**、コネクタが異なる構成「パス」をサポートできる場合、特定のプロパティを指定するには他のプロパティを指定する必要があります(例:BigQueryIO のメソッドにはさまざまなプロパティが含まれます)。(最後の例を参照)。

JdbcIO.DataSourceConfiguration, SpannerConfig, KafkaIO.Read.withConsumerConfigUpdates

BigQueryIO.write()
  .withWriteConfig(FileLoadsConfig.withAvro()
                                 .withTriggeringFrequency()...)

BigQueryIO.write()
  .withWriteConfig(StreamingInsertsConfig.withDetailedError()
                                  .withExactlyOnce().etc..)

class IO.Write.withFormatFunction

推奨されません - 動的宛先を除く

Beam Row 型の PCollection を受信できるソースの場合、フォーマット関数は必要ありません。Beam はスキーマに基づいて入力データをフォーマットできる必要があるためです。

動的宛先機能を提供するシンクの場合、要素は宛先の決定に役立つデータを保持している場合があります。これらのデータは、最終宛先に書き込む前に削除する必要がある場合があります。

このメソッドを含めるには、コネクタは次のことを行う必要があります。

  • データマッチングを自動的に実行できないことを示します。
  • 動的宛先をサポートし、その理由により入力データの変更が必要になります。

IO.Read.withCoder

IO.Write.withCoder

強く推奨されません

このコネクタの出力/入力 PCollection の要素タイプのエンコード/デコードに使用するコーダーを設定します。一般に、ソースは次のことをお勧めします。

  1. 自動的に推測されるスキーマを持つ Row オブジェクトを返します。
  2. 固定の出力/入力タイプを持つことで、または出力/入力タイプを推測することで、必要なコーダーを自動的に設定します。

#1 と #2 のどちらも不可能な場合は、`withCoder(...)` メソッドを追加できます。

IO.ABC.withEndpoint / with{IO}Client / withClient

コネクタ変換は、自身と通信する外部システムとの間のインターフェースをオーバーライドするメソッドを提供する必要があります。これにより、さまざまな用途が可能になります。

このコネクタの出力/入力 PCollection の要素タイプのエンコード/デコードに使用するコーダーを設定します。一般に、ソースは次のことをお勧めします。

  • 宛先サービスをモックすることによるローカルテスト
  • ユーザーが有効にしたメトリクス、監視、およびクライアントでのセキュリティ処理。
  • エミュレータに基づく統合テスト

BigQueryIO.Write.withTestServices(BigQueryServices)

Java構文

セマンティクス

メソッド IO.Read.expand

Read 変換の expand メソッドは、型を持つ PCollection オブジェクトを返す必要があります。型はパラメータ化されるか、クラスに固定される場合があります。

ユーザーはこのクラスを直接作成するべきではありません。トップレベルのユーティリティメソッドによって作成する必要があります。

メソッド IO.Read.expand の PCollection タイプ

PCollection のタイプは、通常、次の 4 つのオプションのいずれかになります。これらの各オプションについて、エンコーディング/データは次のようにすることをお勧めします。

  • 定義済みの基本的な Java タイプ (例:String)
    • このエンコーディングはシンプルで、シンプルな Beam コーダー (例:Utf8StringCoder) を使用する必要があります。
  • スキーマを持つプリセット POJO タイプ (例:Metadata)
    • これらに推奨される戦略は、出力タイプを `@AutoValue`、`@DefaultSchema`、および `@SchemaCreate` アノテーションを付けたものとして定義することです。これにより、RowCoder を使用したコンパクトで高速なエンコーディングが保証されます。
  • 特定のスキーマを持つ Beam Row
  • 構築時にスキーマがわからないタイプ

いずれの場合も、ユーザーにコーダーを渡すように求めること (例:`withCoder(...)`) は**推奨されません**。

メソッド IO.Write.expand

書き込み変換の expand メソッドは、PCollectionTuple を拡張する IO.Write.Result 型のオブジェクトを返す必要があります。このオブジェクトを使用すると、変換はその書き込み結果に関するメタデータを返し、この書き込みに他の PTransform を続けることができます。

Write 変換がメタデータを返す必要がない場合でも、Write.Result オブジェクトは**依然として推奨されます**。これは、変換が時間の経過とともにメタデータを進化させることができるためです。

メタデータの例

  • 失敗した要素とエラー
  • 正常に書き込まれた要素
  • 変換によって発行された呼び出しからの API トークン

BigQueryIO の WriteResult

進化

時間の経過とともに、I/O は新しいユースケースに対応したり、内部で新しい API を使用したりするために進化する必要があります。I/O の必要な進化の例を次に示します。

Java構文

セマンティクス

トップレベルの静的メソッド

一般に、既存のメソッド内の構成としてキャプチャできる機能のために、完全に新しい静的メソッドを追加することは避ける必要があります。

構成でサポートできるトップレベルメソッドが多すぎる例は、PubsubIO です。

新しいトップレベルの静的メソッドは、次の場合にのみ追加する必要があります。

Python

全般

I/O が Apache Beam に存在する場合、パッケージ **apache_beam.io.{connector}** または **apache_beam.io.{namespace}.{connector}** に配置する必要があります。

apache_beam.io.fileio および apache_beam.io.gcp.bigquery

パイプラインでコネクタを操作する際に使用されるプライマリエントリポイントである {connector}.py という名前のモジュールがあります。**apache_beam.io.{connector}** または **apache_beam.io.{namespace}.{connector}**

apache_beam.io.gcp.bigquery / apache_beam/io/gcp/bigquery.py

別の可能なレイアウト:apache_beam/io/gcp/bigquery/bigquery.py (bigquery/__init__.py のパブリッククラスを自動的にインポート)

コネクタは、メインファイルに `__all__` 属性を定義し、ユーザーがアクセスすることを意図したクラスとメソッドのみをエクスポートする必要があります。

I/O 実装が単一のモジュール(単一のファイル)に存在する場合、ファイル {connector}.py がそれを保持できます。

それ以外の場合、コネクタコードは、パブリック API を文書化する __init__.py ファイルを含むディレクトリ(コネクタパッケージ)内に定義する必要があります。

コネクタが実装用のユーティリティを含む他のファイルを定義する場合、これらのファイルはパブリックインターフェースではないことを明確に文書化する必要があります。

クラス/メソッド/プロパティ

Python 構文

セマンティクス

呼び出し可能 ReadFrom{Connector}

これにより、特定のデータソースから読み取るための PTransform にアクセスできます。受信した引数を使用して構成できます。オプションパラメータの長いリストについては、デフォルト値を持つパラメータとして定義できます。

Q. Java はビルダーパターンを使用します。Python ではなぜそれができないのですか?オプションパラメータは Python で同じ役割を果たすことができます

apache_beam.io.gcp.bigquery.ReadFromBigQuery

呼び出し可能 ReadAllFrom{Connector}

いくつかの異なるソースは、データソースからの読み取りのためのランタイム構成を実装しています。これは、純粋なバッチソースをより高度なストリーミングソースにすることができるため、貴重なパターンです。

可能な限り、このタイプの変換は、構築時設定変換のタイプ richness と安全性を備えている必要があります。

  • 構築時に既知のスキーマを持つ出力をサポートします。
    • この場合、追加の構成が必要になる(そして許容される)場合があります(例:SchemaProviderパラメータ、Schemaパラメータ、Schema Catalog、またはその種のユーティリティ)。
  • 入力PCollectionは、スキーマを持つ固定型である必要があるため、ユーザーが簡単に操作できます。

ReadAllFromBigQuery

呼び出し可能 WriteTo{Connector}

これにより、特定のデータシンクに書き込むための PTransform にアクセスできます。受信した引数を使用して構成できます。オプションパラメータの長いリストについては、デフォルト値を持つパラメータとして定義できます。

Q. Java はビルダーパターンを使用します。Python ではなぜそれができないのですか?オプションパラメータは同じ役割を果たすことができます

役割 Python です。

apache_beam.io.gcp.bigquery.WriteToBigQuery

呼び出し可能 Read/Write

トップレベルの変換イニシャライザ (ReadFromIO/ReadAllFromIO/WriteToIO) は、使用を簡素化し、ユーザーがすぐに使用できるように、可能な限り少ないパラメータを必要とすることを目指す必要があります。

パラメータ ReadFrom{Connector}({source})

パラメータ WriteTo{Connector}({sink})

Read または Write I/O コネクタの最初のパラメータは、リーダーのソースまたはライターの宛先を指定する必要があります。

変換が異なる種類のソース(例:テーブル、クエリ、トピック、パーティション)から読み取ることができる場合、推奨されるアプローチは、優先順位の高い順に次のとおりです。

  1. 引数を1つだけ保持し、ソース/シンクの種類を自動推論します(例:テーブルとクエリをサポートするpandas.read_sql(...)
  2. 考えられるソース/シンクの種類ごとに新しい引数を追加します(例:ReadFromBigQueryにクエリ/テーブルパラメータを持たせる)

パラメータ WriteToIO(destination={multiple_destinations})

書き込み変換により、複数の宛先に書き込むことができます。これは、慎重に実装する必要がある複雑なパターンになる可能性があります(単一のパイプラインで複数の宛先を持つ可能性が高いコネクタに推奨されるパターンです)。

Pythonの推奨APIパターンは、設定が必要なすべてのパラメータに呼び出し可能オブジェクト(例:WriteToBigQuery)を渡すことです。一般的に、呼び出し可能パラメータの例としては、以下が挙げられます。

  • デスティネーション呼び出し可能オブジェクト → 要素を受け取り、その要素のデスティネーションを返します
  • その他の例
    • スキーマ呼び出し可能オブジェクト → デスティネーションを受け取り、そのデスティネーションのスキーマを返します
    • フォーマット関数 → レコード(および場合によってはデスティネーション)を受け取り、挿入されるレコードをフォーマットします。

これらの呼び出し可能オブジェクトを使用すると、メンテナーは、必要に応じて追加の設定パラメータを定義する、新しいパラメータ化可能な呼び出し可能オブジェクトを時間の経過とともに追加できます(既存のユーザーに影響を与えないようにデフォルト値を設定します)。

コーナーケース:これらの呼び出し可能オブジェクトの一部にサイド入力値を渡す必要があることがよくあります。推奨されるパターンは、コンストラクターにこれらのサイド入力値を含めるための追加パラメータを用意することです(例:WriteToBigQueryのtable_side_inputsパラメータ

パラメータ ReadFromIO(param={param_val})

パラメータ WriteToIO(param={param_val})

追加の設定は、I/Oのコンストラクターにオプションパラメータとして追加できます。可能な限り、必須の追加パラメータは避けるべきです。オプションパラメータには妥当なデフォルト値を設定し、新しいコネクタの選択をできるだけ簡単にする必要があります。

パラメータ ReadFromIO(config={config_object})

非推奨

Pythonの一部のコネクタは、設定の一部として複雑な設定オブジェクトを受け取る場合があります。コネクタは必要なすべての設定パラメータをトップレベルで保持できるため、このパターンは推奨されません

複数パラメータの構成オブジェクトが上位変換の適切なパラメータであるかどうかを判断するには、構成オブジェクトは次の条件を満たす必要があります。

Python 構文

セマンティクス

ReadFromIO.expandメソッドの出力

Read変換のexpandメソッドは、型を持つPCollectionオブジェクトを返し、型でアノテーションを付ける必要があります。Pythonで推奨されるPCollection型は、優先順位の高い順に次のとおりです。

単純なPython型(bytes、str、数値)

複雑な型の場合

  1. スキーマが設定されたNamedTupleまたはDataClassで、RowCoderでエンコードされます
  2. Python辞書
    1. 辞書は、可能な場合はRowCoderでエンコードする必要があります。
  3. スキーマが不可能な場合は、プリセットPythonクラス

WriteToIO.expandメソッドの出力

書き込み変換のexpandメソッドは、固定クラスタイプを持つPythonオブジェクトを返す必要があります。推奨されるクラス名はWriteTo{IO}Resultです。このオブジェクトを使用すると、変換はその書き込み結果に関するメタデータを返すことができます。

Write変換がメタデータを返す必要がない場合でも、クラスタイプを持つPythonオブジェクトの方が依然として望ましいです。これは、変換が時間の経過とともにメタデータを進化させることができるためです。

メタデータの例

  • 失敗した要素とエラー
  • 正常に書き込まれた要素
  • 変換によって発行された呼び出しからの API トークン

BigQueryIO の WriteResult

動機付けとなる例(悪いパターン):WriteToBigQueryの一貫性のない辞書結果[1][2]

WriteToIO.expandメソッドの入力

Write変換のexpandメソッドは、型を持つPCollectionオブジェクトを返し、型でアノテーションを付ける必要があります。Pythonで推奨されるPCollection型は、T1で参照されるReadFromIOの出力型と同じです。

GoLang

全般

I/OがApache Beamにある場合、パッケージに配置する必要があります

{connector}io

avroio および bigqueryio

統合テストとパフォーマンステストは、I/O自体と同じパッケージに配置する必要があります

{connector}io

Typescript

クラス/メソッド/プロパティ

TypeScript構文

セマンティクス

function readFromXXX

I/O.Read変換の構築を開始するメソッド。

function writeToXXX

I/O.Write変換の構築を開始するメソッド。

テスト

I/Oには、単体テスト、統合テスト、およびパフォーマンステストが必要です。以下のガイダンスでは、各テストの種類の目的と、テストカバレッジの基準スタンダードについて説明します。実際のテストケースと実際のテストのビジネスロジックは、各ソース/シンクの specifics によって異なりますが、基準としていくつかの推奨テストケースを含めました。

このガイドは、具体的なテストケースとシナリオを追加することで、Apache Beam I/O変換テストガイドを補完します。Beam I/Oコネクタのテストに関する一般的な情報については、そのガイドを参照してください。

統合テストとパフォーマンステストは、org.apache.beam.sdk.io.{connector}.testingパッケージに配置する必要があります。これにより、さまざまなテストがコネクタの標準のユーザー向けインターフェースで機能します。

単体テストは、コネクタの内部をテストすることが多いため、同じパッケージ(つまり、org.apache.beam.sdk.io.{connector})に配置する必要があります。

単体テスト

I/O単体テストでは、コードの機能を効率的にテストする必要があります。単体テストは複数のテストスイート(たとえば、Pythonの各バージョン)で何度も実行されることが想定されているため、これらのテストは比較的速く実行され、副作用があってはなりません。単体テストを通じて100%のコードカバレッジを達成することをお勧めします。

可能な場合は、実行時間が短くリソース使用量が少ないため、統合テストよりも単体テストが優先されます。さらに、単体テストはプリコミットテストスイート(たとえば、Jenkinsのbeam_PreCommit_*テストスイート)に簡単に含めることができるため、早期に回帰を発見できる可能性が高くなります。また、エラー状態には単体テストが推奨されます。

単体テストクラスは、IOと同じパッケージの一部であり、{connector}IOTestという名前である必要があります。

sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java

推奨テストケース

テストする機能

説明

デフォルトオプションでの読み取り

DirectRunnerとデータストアの偽物を使用して、パイプラインをローカルで実行することが望ましいです。ただし、モックを使用してソース変換の単体テストを行うこともできます。

BigtableIOTest.testReading

pubsub_test.TestReadFromPubSub.test_read_messages_success

CassandraIOTest.testRead

デフォルトオプションでの書き込み

DirectRunnerとデータストアの偽物を使用して、パイプラインをローカルで実行することが望ましいです。ただし、モックを使用してシンク変換の単体テストを行うこともできます。

BigtableIOTest.testWriting

pubsub_test.TestWriteToPubSub.test_write_messages_success

追加オプションを使用した読み取り

ユーザーが使用できるすべてのオプションについて。

BigtableIOTest.testReadingWithFilter

追加オプションを使用した書き込み

ユーザーが使用できるすべてのオプションについて。たとえば、動的デスティネーションへの書き込み。

BigTableIOTest.testReadWithBigTableOptionsSetsRetryOptions

BigQueryIOWriteTest.testWriteDynamicDestinations

追加の要素タイプの読み取り

データストアの読み取りスキーマが異なるデータ型をサポートしている場合。

BigQueryIOReadTest.testReadTableWithSchema

追加の要素タイプの書き込み

データストアの書き込みスキーマが異なるデータ型をサポートしている場合。

データの表示

ソース/シンクがディスプレイデータを正しく設定するかどうかをテストします。

AvroIOTest.testReadDisplayData

DatastoreV1Test.testReadDisplayData

bigquery_test.TestBigQuerySourcetest_table_reference_display_data

初期分割

これらのテストには多くのバリエーションがあります。詳細については、例を参照してください。

BigqueryIOReadTest.estBigQueryQuerySourceInitSplit

avroio_test.AvroBase.test_read_with_splitting

動的な作業の再調整

これらのテストには多くのバリエーションがあります。詳細については、例を参照してください。

BigTableIOTest.testReadingSplitAtFractionExhaustive

avroio_test.AvroBase.test_dynamic_work_rebalancing_exhaustive

スキーマのサポート

PCollection<Row>の読み取りまたはPCollection<Row>の書き込み

ソースからのスキーマの取得、およびシンクへのスキーマのプッシュ/検証を確認する必要があります。

BigQueryIOReadTest.testReadTableWithSchema

BigQueryIOWriteTest.testSchemaWriteLoads

検証テスト

ソース/シンク変換が正しく検証されているかどうか、つまり、正しくない/互換性のない構成が実行可能なエラーで拒否されているかどうかをテストします。

BigQueryIOWriteTest.testWriteValidatesDataset

PubsubIOTest.testTopicValidationSuccess

メトリクス

さまざまな読み取り/書き込みメトリックが設定されていることを確認します

SpannerIOReadTest.testReadMetrics

bigtableio_test.TestWriteBigTable.test_write_metrics

すべてを読む

テストのすべてを読む(PCollection<Read Config>)バージョンが機能するかどうかをテストします

SpannerIOReadTest.readAllPipeline

CassandraIOTest.readAllQuery

シンクバッチングテスト

シンクがパフォーマンス上の理由からバッチングを実行する場合、書き込む前にシンクがデータをバッチ処理することを確認します。

SpannerIOWriteTest.testBatchFn_cells

エラー処理

データストアからのさまざまなエラー(たとえば、HTTPエラーコード)が正しく処理されることを確認します

BigQueryIOWriteTest.testExtendedErrorRetrieval

再試行ポリシー

ソース/シンクが期待どおりにリクエストを再試行することを確認します

BigQueryIOWriteTest.testRetryPolicy

シンクからの出力PCollection

シンクは、後続のステップが依存できるPCollectionを生成する必要があります。

BigQueryIOWriteTest.testWriteTables

バックログバイトレポート

無制限のソース変換がバックログバイトを正しく報告することを確認するためのテスト。

KinesisReaderTest.getSplitBacklogBytesShouldReturnBacklogUnknown

透かしレポート

無制限のソース変換が透かしを正しく報告することを確認するためのテスト。

WatermarkPolicyTest.shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords

統合テスト

統合テストは、Beamランナーと、特定のI/Oが接続するデータストアとの間のエンドツーエンドのインタラクションをテストします。これらには通常、リモートRPC呼び出しが伴うため、統合テストの実行には時間がかかります。さらに、Beamランナーは、統合テストを実行するときに複数のワーカーを使用する場合があります。これらのコストのため、統合テストは、特定のシナリオを単体テストでカバーできない場合にのみ実装する必要があります。

Beamと外部ストレージシステムとの間のインタラクションを含む統合テストを少なくとも1つ実装することは、提出に必須です。

ソースとシンクの両方を含むI/Oコネクタの場合、Beamガイドでは、読み取りと書き込みの両方を同じテストパイプラインでカバーできるように、書き込みと読み取りの形式でテストを実装することを推奨しています。

統合テストクラスは、I/Oと同じパッケージの一部であり、**{connector}IOIT**という名前である必要があります。

例えば

sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java

推奨テストケース

テストの種類

説明

Dataflowを使用した「書き込みと読み取り」テスト

生成されたデータをデータストアに書き込み、Dataflowを使用してデータストアから同じデータを読み取ります。

JdbcIOIT.testWriteThenRead

Dataflowを使用した「すべて書き込んでから読む」テスト

「書き込み後読み取り」と同じですが、ソース設定のPCollectionの読み取りをサポートするソースの場合です。将来の(SDF)ソースはすべてこれをサポートすることが期待されています。

同じ変換が「読み取り」と「すべて読み取り」の形式で使用される場合、または2つの変換が本質的に同じである場合(たとえば、読み取り変換がすべて読み取りの単純なラッパーである場合、またはその逆の場合)、単一の「すべて読み取り」テストを追加するだけで十分です。

SpannerReadIT.testReadAllRecordsInDb

Dataflowを使用した非有界書き込み後読み取り

データを継続的に書き込み、読み取るパイプライン。このようなパイプラインは、結果を検証するためにキャンセルする必要があります。これは、非有界読み取りをサポートするコネクタの場合のみです。

KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming

パフォーマンステスト

パフォーマンステストフレームワークはまだ流動的であるため、パフォーマンステストは実際のI/Oコードの後に提出することができます。

パフォーマンステストフレームワークは、まだGoLangまたはTypescriptをサポートしていません。

パフォーマンスベンチマークは、I/Oのベストプラクティスの重要な部分であり、いくつかの領域に効果的に対処します

ダッシュボード

Googleは、組み込みI/Oのパフォーマンステストを定期的に実行し、JavaPython の外部から閲覧可能なダッシュボードに公開しています。

Dataflow performance test dashboard

ガイダンス

可能な場合は、統合テストとパフォーマンステストに同じテストを使用してください。パフォーマンステストは通常、統合テストと同じですが、より大量のデータが関係します。テストフレームワーク(内部および外部)は、これらのテストに関連するパフォーマンスベンチマークを追跡し、異常を検出するためのダッシュボード/ツールを提供する機能を提供します。

ページの組み込みI/Oコネクタガイド ドキュメントにリソーススケーラビリティセクションを含め、IOが統合テストを行っている上限を示します。

例えば

KafkaIOがxxxxトピックで統合テストを実施していることを示します。ドキュメントでは、コネクタの作成者がコネクタが統合テストの数を超えて拡張できると考えているかどうかを記載できますが、これにより、テスト済みのパスの制限がユーザーに明確になります。

ドキュメントには、制限に使用された構成が明確に示されている必要があります。たとえば、ランナーxと構成オプションaを使用します。

I/Oが収集するパフォーマンス/内部メトリックを、それらが何を意味するのか、どのように使用できるのかを含めて文書化します(一部のコネクタは、レイテンシ/バンドルサイズ/etcなどのパフォーマンスメトリックを収集して公開します)。

コネクタに実装されているパフォーマンステストに基づいて、I/Oの予想されるパフォーマンス特性を含めます。