Java 用 I/O コネクタの開発
重要: 新しい I/O を開発するには、Splittable DoFn
を使用してください。詳細については、新しい I/O コネクタの概要をご覧ください。
Beam の既存の I/O コネクタでサポートされていないデータストアに接続するには、通常、ソースとシンクで構成されるカスタム I/O コネクタを作成する必要があります。すべての Beam ソースとシンクは複合変換です。ただし、カスタム I/O の実装はユースケースによって異なります。開始する前に、新しい I/O コネクタの概要を読んで、新しい I/O コネクタの開発、利用可能な実装オプション、およびユースケースに適したオプションの選択方法の概要を理解してください。
このガイドでは、Java を使用した Source
および FileBasedSink
インターフェースの使用について説明します。Python SDK は同じ機能を提供しますが、わずかに異なる API を使用します。Python SDK 固有の情報については、Python 用 I/O コネクタの開発をご覧ください。
基本的なコード要件
Beam ランナーは、複数のワーカーインスタンスを並行して使用してデータの読み取りや書き込みを行うために、提供されたクラスを使用します。そのため、Source
および FileBasedSink
サブクラスに提供するコードは、いくつかの基本的な要件を満たす必要があります。
シリアライズ可能性:
Source
またはFileBasedSink
サブクラスは、バインドされているかアンバウンドであるかに関係なく、シリアライズ可能である必要があります。ランナーは、並行して読み取りまたは書き込みを容易にするために、複数のリモートワーカーに送信されるSource
またはFileBasedSink
サブクラスの複数のインスタンスを作成する場合があります。不変性:
Source
またはFileBasedSink
サブクラスは、事実上不変である必要があります。すべてのプライベートフィールドは final と宣言する必要があり、コレクション型のすべてのプライベート変数は事実上不変である必要があります。クラスに setter メソッドがある場合、それらのメソッドは、関連するフィールドが変更されたオブジェクトの独立したコピーを返す必要があります。Source
またはFileBasedSink
サブクラスで可変状態を使用する必要があるのは、ソースまたはシンクを実装するために必要な高コストな計算の遅延評価を使用している場合のみです。その場合、すべての可変インスタンス変数を transient と宣言する必要があります。スレッドセーフ: コードはスレッドセーフである必要があります。動的な作業再調整で動作するようにソースを構築する場合、コードをスレッドセーフにすることが重要です。Beam SDK には、これを簡単にするためのヘルパークラスが用意されています。詳細については、動的な作業再調整による BoundedSource の使用を参照してください。
テスト可能性: 特に動的な作業再調整などの高度な機能で動作するようにクラスを構築する場合は、すべての
Source
およびFileBasedSink
サブクラスを徹底的にユニットテストすることが重要です。わずかな実装エラーは、検出が難しいデータ破損やデータ損失 (レコードのスキップや重複など) につながる可能性があります。BoundedSource
実装のテストを支援するために、SourceTestUtils クラスを使用できます。SourceTestUtils
には、BoundedSource
実装のいくつかのプロパティを自動的に検証するためのユーティリティが含まれています。SourceTestUtils
を使用すると、比較的少ないコード行で広範囲の入力を使用して、実装のテスト範囲を広げることができます。SourceTestUtils
を使用する例については、AvroSourceTest および TextIOReadTest ソースコードを参照してください。
さらに、Beam の変換スタイルガイダンスについては、PTransform スタイルガイドを参照してください。
Source インターフェースの実装
パイプラインのデータソースを作成するには、ランナーに入力ソースからデータを読み取る方法と、複数のワーカーインスタンスが並行してデータを読み取ることができるようにデータソースを複数の部分に分割する方法を指示する、形式固有のロジックを提供する必要があります。無限データを読み取るデータソースを作成している場合は、ソースのウォーターマークとオプションのチェックポイントを管理するための追加のロジックを提供する必要があります。
次のクラスを作成して、ソースのロジックを提供します。
有限 (バッチ) データセットを読み取る場合は
BoundedSource
のサブクラス、無限 (ストリーミング) データセットを読み取る場合はUnboundedSource
のサブクラス。これらのサブクラスは、読み取るデータの場所とパラメーター (読み取るデータの量など) を含む、読み取るデータを記述します。Source.Reader
のサブクラス。各 Source には、そのSource
からの読み取りに関与するすべての状態をキャプチャする関連付けられた Reader が必要です。これには、ファイルハンドル、RPC 接続、および読み取るデータ形式の特定の要件に依存するその他のパラメーターが含まれます。Reader
クラス階層は、Source 階層を反映しています。BoundedSource
を拡張する場合は、関連付けられたBoundedReader
を提供する必要があります。UnboundedSource
を拡張する場合は、関連付けられたUnboundedReader
を提供する必要があります。読み取り操作をラップする 1 つ以上のユーザー向けラッパー複合変換 (
PTransform
)。PTransform ラッパーでは、ソースを公開することを避けるべき理由について説明します。
Source サブクラスの実装
データが有限のバッチであるか、無限のストリームであるかに応じて、BoundedSource
またはUnboundedSource
のいずれかのサブクラスを作成する必要があります。いずれの場合も、Source
のサブクラスは、スーパークラスの抽象メソッドをオーバーライドする必要があります。ランナーは、データソースを使用する際にこれらのメソッドを呼び出す可能性があります。たとえば、有限ソースから読み取る場合、ランナーはこれらのメソッドを使用してデータセットのサイズを見積もり、並列読み取りのために分割します。
Source
のサブクラスは、場所などのデータソースに関する基本情報も管理する必要があります。たとえば、BeamのDatastoreIOクラスのサンプルSource
実装では、ホスト、datasetID、およびクエリが引数として使用されます。コネクタはこれらの値を使用して、Cloud Datastoreからデータを取得します。
BoundedSource
BoundedSource
は、Beamランナーが並列に読み取ることができる有限のデータセットを表します。BoundedSource
には、複数のワーカーによる読み取りのためにデータセットを分割するためにランナーが使用する一連の抽象メソッドが含まれています。
BoundedSource
を実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。
split
: ランナーは、このメソッドを使用して、有限データを指定されたサイズのバンドルに分割します。getEstimatedSizeBytes
: ランナーは、このメソッドを使用して、データの合計サイズをバイト単位で見積もります。createReader
: このBoundedSource
に関連付けられたBoundedReader
を作成します。
BoundedSource
と必須の抽象メソッドを実装する方法のモデルは、Cloud BigTable(BigtableIO.java)およびBigQuery(BigQuerySourceBase.java)のBeam実装で確認できます。
UnboundedSource
UnboundedSource
は、ランナーが並列に読み取ることができる無限のデータストリームを表します。UnboundedSource
には、ランナーが並列ストリーミング読み取りをサポートするために使用する一連の抽象メソッドが含まれています。これらには、障害回復のためのチェックポイント、データ重複を防止するためのレコードID、およびパイプラインの下流部分におけるデータ完了度を見積もるためのウォーターマークが含まれます。
UnboundedSource
を実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。
split
: ランナーは、このメソッドを使用して、サービスが並列に読み取る必要があるサブストリームインスタンスの数を表すUnboundedSource
オブジェクトのリストを生成します。getCheckpointMarkCoder
: ランナーは、このメソッドを使用して、ソースのチェックポイント(存在する場合)のCoderを取得します。requiresDeduping
: ランナーは、このメソッドを使用して、データが重複レコードの明示的な削除を必要とするかどうかを判断します。このメソッドがtrueを返す場合、ランナーはソースの出力から重複を削除するステップを自動的に挿入します。このメソッドは、ソースが各レコードのレコードIDを提供する場合にのみ、trueを返す必要があります。これがいつ行うべきかについては、UnboundedReader.getCurrentRecordId
を参照してください。createReader
: このUnboundedSource
に関連付けられたUnboundedReader
を作成します。
Reader サブクラスの実装
ソースのサブクラスのcreateReader
メソッドによって返されるBoundedReader
またはUnboundedReader
のいずれかのサブクラスを作成する必要があります。ランナーは、データセットの実際の読み取りを行うために、Reader
(有限または無限)のメソッドを使用します。
BoundedReader
とUnboundedReader
には、定義する必要がある同様の基本インターフェイスがあります。さらに、無制限のデータを操作するために実装する必要があるUnboundedReader
に固有の追加メソッドがいくつかあり、BoundedReader
が動的なワークリバランスを利用したい場合に実装できるオプションのメソッドがあります。また、UnboundedReader
を使用する場合、start()
およびadvance()
メソッドのセマンティクスにはわずかな違いがあります。
BoundedReader と UnboundedReader の両方に共通する Reader メソッド
ランナーは、次のメソッドを使用して、BoundedReader
またはUnboundedReader
を使用してデータを読み取ります。
start
:Reader
を初期化し、読み取る最初のレコードに進みます。このメソッドは、ランナーがデータの読み取りを開始するときに正確に1回呼び出され、初期化に必要な高コストの操作を配置するのに適した場所です。advance
: リーダーを次の有効なレコードに進めます。利用可能な入力がない場合、このメソッドはfalseを返す必要があります。BoundedReader
はadvanceがfalseを返したら読み取りを停止する必要がありますが、UnboundedReader
はストリームからさらにデータが利用可能になったら、将来の呼び出しでtrueを返すことができます。getCurrent
: 現在の位置にあるデータレコードを返します。これは、startまたはadvanceによって最後に読み取られたものです。getCurrentTimestamp
: 現在のデータレコードのタイムスタンプを返します。ソースが固有のタイムスタンプを持つデータを読み取る場合にのみ、getCurrentTimestamp
をオーバーライドする必要があります。ランナーはこの値を使用して、結果の出力PCollection
の各要素に固有のタイムスタンプを設定します。
UnboundedReader 固有の Reader メソッド
基本的なReader
インターフェイスに加えて、UnboundedReader
には、無制限のデータソースからの読み取りを管理するための追加メソッドがいくつかあります。
getCurrentRecordId
: 現在のレコードの一意の識別子を返します。ランナーはこれらのレコードIDを使用して、重複レコードをフィルタリングします。データに各レコードに存在する論理IDがある場合は、このメソッドでそれらを返すことができます。それ以外の場合は、少なくとも128ビットのハッシュを使用して、レコードコンテンツのハッシュを返すことができます。JavaのObject.hashCode()
を使用するのは誤りです。32ビットハッシュは通常、衝突を防ぐのに不十分であり、hasCode()
はプロセス間で安定することが保証されていないためです。ソースが各レコードを一意に識別するチェックポイントスキームを使用している場合、
getCurrentRecordId
の実装はオプションです。たとえば、分割がファイルであり、チェックポイントがすべてのデータが読み取られたファイルの場所である場合、レコードIDは必要ありません。ただし、アップストリームシステムがソースにデータを書き込む際に、ソースが読み取る可能性のある重複レコードを時折生成する場合、レコードIDは依然として役立ちます。getWatermark
:Reader
が提供するウォーターマークを返します。ウォーターマークは、Reader
によって読み取られる将来の要素のタイムスタンプの概算下限です。ランナーは、ウォーターマークをデータ完了度の推定値として使用します。ウォーターマークは、ウィンドウ処理とトリガーで使用されます。getCheckpointMark
: ランナーは、このメソッドを使用してデータストリームにチェックポイントを作成します。チェックポイントは、障害回復に使用できるUnboundedReader
の進捗状況を表します。データストリームが異なれば、異なるチェックポイント方式を使用する場合があります。一部のソースでは、受信したレコードの確認応答が必要になる場合があります。一方、位置チェックポイントを使用するソースもあります。このメソッドを、最も適切なチェックポイント方式に合わせる必要があります。たとえば、このメソッドで最後に確認応答されたレコードを返すことができます。getCheckpointMark
はオプションです。データに意味のあるチェックポイントがない場合は、実装する必要はありません。ただし、ソースにチェックポイントを実装しないことを選択した場合、データソースがエラー発生時にレコードの再送信を試みるかどうかによっては、パイプラインでデータが重複したり、データが失われたりする可能性があります。
ソースから読み取る際に.withMaxNumRecords
または.withMaxReadTime
のいずれかを指定することにより、UnboundedSource
から制限付きPCollection
を読み取ることができます。.withMaxNumRecords
は、無制限のソースから固定の最大レコード数を読み取り、.withMaxReadTime
は、無制限のソースから固定の最大時間の間読み取ります。
動的な作業再調整による BoundedSource の使用
ソースが制限付きデータを提供する場合、BoundedReader
はsplitAtFraction
メソッドを実装することにより、動的なワークリバランスを使用できます。ランナーは、Source
の残りのデータを分割して他のワーカーに再配布できるように、特定のリーダーでstartまたはadvanceと同時にsplitAtFraction
を呼び出す場合があります。
splitAtFraction
を実装するとき、コードは、それらの分割の和がデータセット全体と一致する、相互に排他的な分割のセットを生成する必要があります。
splitAtFraction
を実装する場合は、splitAtFraction
とgetFractionConsumed
の両方をスレッドセーフな方法で実装する必要があります。そうしないと、データが失われる可能性があります。また、データの重複やデータ損失を回避するために、実装を徹底的に単体テストする必要があります。
コードがスレッドセーフであることを保証するには、splitAtFraction
およびgetFractionConsumed
を実装する際に、RangeTracker
スレッドセーフヘルパーオブジェクトを使用してデータソースの位置を管理します。
splitAtFraction
の実装をテストするには、SourceTestUtils
クラスを使用することを強くお勧めします。SourceTestUtils
には、徹底的な自動テストを含む、splitAtFraction
の実装をテストするための多数のメソッドが含まれています。
便利な Source および Reader の基本クラス
Beam SDKには、ファイルなどの一般的なデータストレージ形式を処理するSource
およびReader
クラスの作成に役立つ便利な抽象基本クラスがいくつか含まれています。
FileBasedSource
データソースがファイルを使用する場合、Source
およびReader
クラスをFileBasedSource
およびFileBasedReader
抽象基本クラスから派生させることができます。FileBasedSource
は、ファイルと対話するBeamソースに共通のコードを実装する制限付きソースサブクラスです。これには、次のものが含まれます。
- ファイルパターン展開
- 順次レコード読み取り
- 分割点
FileBasedSink 抽象化の使用
データソースがファイルを使用する場合、FileBasedSink
抽象化を実装して、ファイルベースのシンクを作成できます。その他のシンクについては、Java用Beam SDKで提供されるParDo
、GroupByKey
、およびその他の変換を使用します。詳細については、I/Oコネクタの開発の概要を参照してください。
FileBasedSink
インターフェイスを使用する場合、パイプラインのPCollection
sから出力シンクに制限付きデータを書き込む方法をランナーに指示する形式固有のロジックを提供する必要があります。ランナーは、複数のワーカーを使用して並列にデータのバンドルを書き込みます。
次のクラスを実装して、ファイルベースのシンクのロジックを提供します。
抽象基本クラス
FileBasedSink
のサブクラス。FileBasedSink
は、パイプラインが並列に書き込むことができる場所またはリソースを記述します。シンクをエンドユーザーに公開しないように、FileBasedSink
サブクラスは保護またはプライベートにする必要があります。ロジックの一部としてWriteFilesを呼び出し、パラメーターとして
FileBasedSink
を渡す、ユーザー向けのラッパーPTransform
。ユーザーはWriteFiles
を直接呼び出す必要はありません。
FileBasedSink
抽象基本クラスは、ファイルと対話するBeamシンクに共通のコードを実装します。これには、次のものが含まれます。
- ファイルヘッダーとフッターの設定
- 順次レコード書き込み
- 出力MIMEタイプの設定
FileBasedSink
とそのサブクラスは、BeamがサポートするFileSystem
実装へのファイルの書き込みをサポートします。例については、Beamが提供する次のFileBasedSink
実装を参照してください。
PTransform ラッパー
エンドユーザーが使用するソースまたはシンクを作成する場合は、ソースまたはシンクコードを公開しないようにしてください。ソースとシンクをエンドユーザーに公開しないように、新しいクラスは保護またはプライベートにする必要があります。次に、ユーザー向けのラッパーPTransform
を実装します。ソースまたはシンクを変換として公開することにより、実装は非表示になり、任意に複雑または単純にすることができます。実装の詳細を公開しないことの最大の利点は、後でユーザーの既存の実装を壊すことなく、機能を追加できることです。
たとえば、ユーザーのパイプラインがread
を使用してソースから読み取り、パイプラインにリシャーディングを挿入する場合、すべてのユーザーが自分でリシャーディング(GroupByKey
変換を使用)を追加する必要があります。これを解決するために、読み取り操作とリシャーディングの両方を実行する複合PTransform
としてソースを公開することをお勧めします。
PTransform
でのラップの詳細については、BeamのPTransformスタイルガイドを参照してください。
最終更新日:2024/10/31
探していたものはすべて見つかりましたか?
すべてが役立ち、明確でしたか?何か変更したいことはありますか?お知らせください!