Java 用 I/O コネクタの開発

重要: 新しい I/O を開発するには、Splittable DoFn を使用してください。詳細については、新しい I/O コネクタの概要をご覧ください。

Beam の既存の I/O コネクタでサポートされていないデータストアに接続するには、通常、ソースとシンクで構成されるカスタム I/O コネクタを作成する必要があります。すべての Beam ソースとシンクは複合変換です。ただし、カスタム I/O の実装はユースケースによって異なります。開始する前に、新しい I/O コネクタの概要を読んで、新しい I/O コネクタの開発、利用可能な実装オプション、およびユースケースに適したオプションの選択方法の概要を理解してください。

このガイドでは、Java を使用した Source および FileBasedSink インターフェースの使用について説明します。Python SDK は同じ機能を提供しますが、わずかに異なる API を使用します。Python SDK 固有の情報については、Python 用 I/O コネクタの開発をご覧ください。

基本的なコード要件

Beam ランナーは、複数のワーカーインスタンスを並行して使用してデータの読み取りや書き込みを行うために、提供されたクラスを使用します。そのため、Source および FileBasedSink サブクラスに提供するコードは、いくつかの基本的な要件を満たす必要があります。

  1. シリアライズ可能性: Source または FileBasedSink サブクラスは、バインドされているかアンバウンドであるかに関係なく、シリアライズ可能である必要があります。ランナーは、並行して読み取りまたは書き込みを容易にするために、複数のリモートワーカーに送信される Source または FileBasedSink サブクラスの複数のインスタンスを作成する場合があります。

  2. 不変性: Source または FileBasedSink サブクラスは、事実上不変である必要があります。すべてのプライベートフィールドは final と宣言する必要があり、コレクション型のすべてのプライベート変数は事実上不変である必要があります。クラスに setter メソッドがある場合、それらのメソッドは、関連するフィールドが変更されたオブジェクトの独立したコピーを返す必要があります。

    Source または FileBasedSink サブクラスで可変状態を使用する必要があるのは、ソースまたはシンクを実装するために必要な高コストな計算の遅延評価を使用している場合のみです。その場合、すべての可変インスタンス変数を transient と宣言する必要があります。

  3. スレッドセーフ: コードはスレッドセーフである必要があります。動的な作業再調整で動作するようにソースを構築する場合、コードをスレッドセーフにすることが重要です。Beam SDK には、これを簡単にするためのヘルパークラスが用意されています。詳細については、動的な作業再調整による BoundedSource の使用を参照してください。

  4. テスト可能性: 特に動的な作業再調整などの高度な機能で動作するようにクラスを構築する場合は、すべての Source および FileBasedSink サブクラスを徹底的にユニットテストすることが重要です。わずかな実装エラーは、検出が難しいデータ破損やデータ損失 (レコードのスキップや重複など) につながる可能性があります。

    BoundedSource 実装のテストを支援するために、SourceTestUtils クラスを使用できます。SourceTestUtils には、BoundedSource 実装のいくつかのプロパティを自動的に検証するためのユーティリティが含まれています。SourceTestUtils を使用すると、比較的少ないコード行で広範囲の入力を使用して、実装のテスト範囲を広げることができます。SourceTestUtils を使用する例については、AvroSourceTest および TextIOReadTest ソースコードを参照してください。

さらに、Beam の変換スタイルガイダンスについては、PTransform スタイルガイドを参照してください。

Source インターフェースの実装

パイプラインのデータソースを作成するには、ランナーに入力ソースからデータを読み取る方法と、複数のワーカーインスタンスが並行してデータを読み取ることができるようにデータソースを複数の部分に分割する方法を指示する、形式固有のロジックを提供する必要があります。無限データを読み取るデータソースを作成している場合は、ソースのウォーターマークとオプションのチェックポイントを管理するための追加のロジックを提供する必要があります。

次のクラスを作成して、ソースのロジックを提供します。

Source サブクラスの実装

データが有限のバッチであるか、無限のストリームであるかに応じて、BoundedSourceまたはUnboundedSourceのいずれかのサブクラスを作成する必要があります。いずれの場合も、Sourceのサブクラスは、スーパークラスの抽象メソッドをオーバーライドする必要があります。ランナーは、データソースを使用する際にこれらのメソッドを呼び出す可能性があります。たとえば、有限ソースから読み取る場合、ランナーはこれらのメソッドを使用してデータセットのサイズを見積もり、並列読み取りのために分割します。

Sourceのサブクラスは、場所などのデータソースに関する基本情報も管理する必要があります。たとえば、BeamのDatastoreIOクラスのサンプルSource実装では、ホスト、datasetID、およびクエリが引数として使用されます。コネクタはこれらの値を使用して、Cloud Datastoreからデータを取得します。

BoundedSource

BoundedSourceは、Beamランナーが並列に読み取ることができる有限のデータセットを表します。BoundedSourceには、複数のワーカーによる読み取りのためにデータセットを分割するためにランナーが使用する一連の抽象メソッドが含まれています。

BoundedSourceを実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。

BoundedSourceと必須の抽象メソッドを実装する方法のモデルは、Cloud BigTable(BigtableIO.java)およびBigQuery(BigQuerySourceBase.java)のBeam実装で確認できます。

UnboundedSource

UnboundedSourceは、ランナーが並列に読み取ることができる無限のデータストリームを表します。UnboundedSourceには、ランナーが並列ストリーミング読み取りをサポートするために使用する一連の抽象メソッドが含まれています。これらには、障害回復のためのチェックポイント、データ重複を防止するためのレコードID、およびパイプラインの下流部分におけるデータ完了度を見積もるためのウォーターマークが含まれます。

UnboundedSourceを実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。

Reader サブクラスの実装

ソースのサブクラスのcreateReaderメソッドによって返されるBoundedReaderまたはUnboundedReaderのいずれかのサブクラスを作成する必要があります。ランナーは、データセットの実際の読み取りを行うために、Reader(有限または無限)のメソッドを使用します。

BoundedReaderUnboundedReaderには、定義する必要がある同様の基本インターフェイスがあります。さらに、無制限のデータを操作するために実装する必要があるUnboundedReaderに固有の追加メソッドがいくつかあり、BoundedReaderが動的なワークリバランスを利用したい場合に実装できるオプションのメソッドがあります。また、UnboundedReaderを使用する場合、start()およびadvance()メソッドのセマンティクスにはわずかな違いがあります。

BoundedReader と UnboundedReader の両方に共通する Reader メソッド

ランナーは、次のメソッドを使用して、BoundedReaderまたはUnboundedReaderを使用してデータを読み取ります。

UnboundedReader 固有の Reader メソッド

基本的なReaderインターフェイスに加えて、UnboundedReaderには、無制限のデータソースからの読み取りを管理するための追加メソッドがいくつかあります。

ソースから読み取る際に.withMaxNumRecordsまたは.withMaxReadTimeのいずれかを指定することにより、UnboundedSourceから制限付きPCollectionを読み取ることができます。.withMaxNumRecordsは、無制限のソースから固定の最大レコード数を読み取り、.withMaxReadTimeは、無制限のソースから固定の最大時間の間読み取ります。

動的な作業再調整による BoundedSource の使用

ソースが制限付きデータを提供する場合、BoundedReadersplitAtFractionメソッドを実装することにより、動的なワークリバランスを使用できます。ランナーは、Sourceの残りのデータを分割して他のワーカーに再配布できるように、特定のリーダーでstartまたはadvanceと同時にsplitAtFractionを呼び出す場合があります。

splitAtFractionを実装するとき、コードは、それらの分割の和がデータセット全体と一致する、相互に排他的な分割のセットを生成する必要があります。

splitAtFractionを実装する場合は、splitAtFractiongetFractionConsumedの両方をスレッドセーフな方法で実装する必要があります。そうしないと、データが失われる可能性があります。また、データの重複やデータ損失を回避するために、実装を徹底的に単体テストする必要があります。

コードがスレッドセーフであることを保証するには、splitAtFractionおよびgetFractionConsumedを実装する際に、RangeTrackerスレッドセーフヘルパーオブジェクトを使用してデータソースの位置を管理します。

splitAtFractionの実装をテストするには、SourceTestUtilsクラスを使用することを強くお勧めします。SourceTestUtilsには、徹底的な自動テストを含む、splitAtFractionの実装をテストするための多数のメソッドが含まれています。

便利な Source および Reader の基本クラス

Beam SDKには、ファイルなどの一般的なデータストレージ形式を処理するSourceおよびReaderクラスの作成に役立つ便利な抽象基本クラスがいくつか含まれています。

FileBasedSource

データソースがファイルを使用する場合、SourceおよびReaderクラスをFileBasedSourceおよびFileBasedReader抽象基本クラスから派生させることができます。FileBasedSourceは、ファイルと対話するBeamソースに共通のコードを実装する制限付きソースサブクラスです。これには、次のものが含まれます。

FileBasedSink 抽象化の使用

データソースがファイルを使用する場合、FileBasedSink抽象化を実装して、ファイルベースのシンクを作成できます。その他のシンクについては、Java用Beam SDKで提供されるParDoGroupByKey、およびその他の変換を使用します。詳細については、I/Oコネクタの開発の概要を参照してください。

FileBasedSinkインターフェイスを使用する場合、パイプラインのPCollectionsから出力シンクに制限付きデータを書き込む方法をランナーに指示する形式固有のロジックを提供する必要があります。ランナーは、複数のワーカーを使用して並列にデータのバンドルを書き込みます。

次のクラスを実装して、ファイルベースのシンクのロジックを提供します。

FileBasedSink抽象基本クラスは、ファイルと対話するBeamシンクに共通のコードを実装します。これには、次のものが含まれます。

FileBasedSinkとそのサブクラスは、BeamがサポートするFileSystem実装へのファイルの書き込みをサポートします。例については、Beamが提供する次のFileBasedSink実装を参照してください。

PTransform ラッパー

エンドユーザーが使用するソースまたはシンクを作成する場合は、ソースまたはシンクコードを公開しないようにしてください。ソースとシンクをエンドユーザーに公開しないように、新しいクラスは保護またはプライベートにする必要があります。次に、ユーザー向けのラッパーPTransformを実装します。ソースまたはシンクを変換として公開することにより、実装は非表示になり、任意に複雑または単純にすることができます。実装の詳細を公開しないことの最大の利点は、後でユーザーの既存の実装を壊すことなく、機能を追加できることです。

たとえば、ユーザーのパイプラインがreadを使用してソースから読み取り、パイプラインにリシャーディングを挿入する場合、すべてのユーザーが自分でリシャーディング(GroupByKey変換を使用)を追加する必要があります。これを解決するために、読み取り操作とリシャーディングの両方を実行する複合PTransformとしてソースを公開することをお勧めします。

PTransformでのラップの詳細については、BeamのPTransformスタイルガイドを参照してください。