Python向けI/Oコネクタの開発
**重要:** 新しいI/Oの開発には`Splittable DoFn`を使用してください。詳細は、新しいI/Oコネクタの概要をご覧ください。
Beamの既存のI/Oコネクタでサポートされていないデータストアに接続するには、通常、ソースとシンクで構成されるカスタムI/Oコネクタを作成する必要があります。すべてのBeamソースとシンクは複合変換です。ただし、カスタムI/Oの実装はユースケースによって異なります。開始する前に、新しいI/Oコネクタの概要を読んで、新しいI/Oコネクタの開発の概要、利用可能な実装オプション、ユースケースに適したオプションの選択方法を確認してください。
このガイドでは、Python向けSourceおよびFileBasedSinkインターフェースの使用方法について説明します。Java SDKは同じ機能を提供しますが、APIがわずかに異なります。Java SDK固有の情報については、Java向けI/Oコネクタの開発を参照してください。
基本的なコード要件
Beamランナーは、提供されたクラスを使用して、複数のワーカーインスタンスで並列にデータを読み書きします。そのため、`Source`および`FileBasedSink`サブクラスに提供するコードは、いくつかの基本的な要件を満たす必要があります。
**シリアライズ可能性:** `Source`または`FileBasedSink`サブクラスはシリアライズ可能である必要があります。サービスは、並列読み取りまたは書き込みを容易にするために、複数のリモートワーカーに送信される`Source`または`FileBasedSink`サブクラスの複数のインスタンスを作成する場合があります。ソースおよびシンクオブジェクトのシリアライズ方法は、ランナーによって異なります。
**不変性:** `Source`または`FileBasedSink`サブクラスは事実上不変である必要があります。ソースを実装するために必要な、高コストな計算の遅延評価を使用している場合にのみ、`Source`または`FileBasedSink`サブクラスで可変状態を使用する必要があります。
**スレッドセーフ:** コードはスレッドセーフである必要があります。Python向けBeam SDKは、これを容易にするために`RangeTracker`クラスを提供します。
**テスト容易性:** すべての`Source`および`FileBasedSink`サブクラスを徹底的に単体テストすることが重要です。実装の軽微なエラーが、検出が困難なデータの破損またはデータの損失(レコードのスキップや重複など)につながる可能性があります。 source_test_utilsモジュールで利用できるテストハーネスとユーティリティメソッドを使用して、ソースのテストを開発できます。
さらに、Beamの変換スタイルのガイダンスについては、PTransformスタイルガイドを参照してください。
Sourceインターフェースの実装
パイプラインの新しいデータソースを作成するには、サービスに入力ソースからデータを読み取る方法と、複数のワーカーインスタンスがデータを並列に読み取ることができるようにデータソースを複数の部分に分割する方法を指示する、形式固有のロジックを提供する必要があります。
次のクラスを作成することで、新しいソースのロジックを提供します。
- `BoundedSource`のサブクラス。 `BoundedSource`は、有限量の入力レコードを読み取るソースです。クラスは、データの場所とパラメータ(読み取るデータ量など)を含め、読み取るデータを記述します。
- `RangeTracker`のサブクラス。 `RangeTracker`は、特定の位置タイプの範囲を管理するために使用されるスレッドセーフなオブジェクトです。
- 読み取り操作をラップする1つ以上のユーザー向けラッパー複合変換(`PTransform`)。 PTransformラッパーでは、ソースを公開しないようにする理由と、ラッパーの作成方法について説明します。
これらのクラスは、apache_beam.io.iobaseモジュールにあります。
BoundedSourceサブクラスの実装
`BoundedSource`は、サービスが、場合によっては並列に読み取る有限のデータセットを表します。 `BoundedSource`には、サービスが複数のリモートワーカーによる読み取りのためにデータセットを分割するために使用する一連のメソッドが含まれています。
`BoundedSource`を実装するには、サブクラスで次のメソッドをオーバーライドする必要があります。
`estimate_size`:サービスはこのメソッドを使用して、データの*合計サイズ*をバイト単位で見積もります。この見積もりは、解凍やその他の処理を実行する前の外部ストレージサイズに基づいています。
`split`:サービスはこのメソッドを使用して、有限のデータを特定のサイズのバンドルに分割します。
`get_range_tracker`:サービスはこのメソッドを使用して、特定の位置範囲の`RangeTracker`を取得し、その情報を使用して進捗を報告し、ソースの動的分割を実行します。
`read`:このメソッドは、指定された`RangeTracker`オブジェクトで定義された境界に従って、ソースからデータを読み取るイテレータを返します。
RangeTrackerサブクラスの実装
`RangeTracker`は、`BoundedSource`のリーダーの現在の範囲と現在位置を管理し、それらへの同時アクセスを保護するために使用されるスレッドセーフなオブジェクトです。
`RangeTracker`を実装するには、まず次の定義をよく理解する必要があります。
**位置ベースのソース** - 位置ベースのソースは、順序付けられたタイプの位置の範囲で記述でき、ソースによって読み取られるレコードは、そのタイプの位置で記述できます。たとえば、ファイル内のレコードの場合、位置はレコードの開始バイトオフセットにすることができます。この場合のレコードの位置タイプは`long`です。
位置ベースのソースの主な要件は、**結合性**です。位置範囲 '[A、B)'のレコードと位置範囲 '[B、C)'のレコードを読み取ると、位置範囲 '[A、C)'のレコードを読み取るのと同じレコードが得られるはずです。ここで、'A' <= 'B' <= 'C'です。このプロパティにより、位置の範囲がいくつの任意のサブ範囲に分割されても、それらが記述するレコードの合計セットは同じままになります。
もう一つの重要な特性は、ソースの範囲がソース内のレコードの位置とどのように関連しているかです。多くのソースでは、各レコードは一意の開始位置によって識別できます。この場合
- ソース '[A, B)' によって返されるすべてのレコードは、この範囲内に開始位置を持つ必要があります。
- 最後のレコードを除くすべてのレコードはこの範囲内で終了する必要があります。最後のレコードは、範囲の終わりを超えて拡張される場合とされない場合があります。
- レコードは重複してはなりません。
このようなソースは、「'[A, B)' を読み取る」を「'A' 以降で開始する最初のレコードから、'B' 以降で開始する最初のレコードを含まないまで読み取る」と定義する必要があります。
このようなソースの例としては、テキストファイルから行または CSV を読み取る、データベースからキーと値を読み取るなどがあります。
分割点の概念により、一意の開始位置で識別できないレコードがいくつかあるソースを扱うための定義を拡張できます。
分割点 - 分割点は、位置 **A** 以上(つまり、[A、無限大))から無限大までの範囲を読み取るときに返される最初のレコードを表します。
一部のソースには、直接アドレス指定できないレコードがある場合があります。たとえば、一連の圧縮ブロックで構成されるファイル形式を想像してみてください。各ブロックにはオフセットを割り当てることができますが、ブロック内のレコードはブロックを解凍せずに直接アドレス指定することはできません。この仮説的な形式を *CBF(Compressed Blocks Format)* と呼ぶことにしましょう。
このような形式の多くは、依然として結合性プロパティを満たすことができます。たとえば、CBF では、[A, B) を読み取ることは、「開始オフセットが [A, B) にあるすべてのブロック内のすべてのレコードを読み取る」ことを意味する場合があります。
このような複雑な形式をサポートするために、Beam は *分割点* の概念を導入しています。レコードは、範囲 [A、無限大) を読み取るときに返される最初のレコードとなるような位置 **A** が存在する場合、分割点となります。CBF では、分割点は各ブロックの最初のレコードのみになります。
分割点を使用すると、次の場合にレコードの位置とソースの範囲の意味を定義できます
- 分割点にあるレコードの場合、その位置は、範囲 [A、無限大) でソースを読み取るとこのレコードが返されるような最大の **A** として定義されます。
- 他のレコードの位置は、非減少であることのみが要求されます。
- ソース [A, B) を読み取ると、**A** 以降の最初の分割点から、**B** 以降の最初の分割点を含まないまでのレコードが返される必要があります。特に、これは、ソースによって返される最初のレコードは常に分割点でなければならないことを意味します。
- 分割点の位置は一意である必要があります。
その結果、ソースの全範囲を位置範囲に分解すると、レコードの合計セットはソース内のレコードの完全なセットになり、各レコードは正確に1回読み取られます。
消費された位置 - 消費された位置は、読み取られたレコードを指します。
ソースが読み取られ、そこから読み取られたレコードがパイプラインの下流の変換に渡されると、ソース内の位置が *消費* されていると言います。リーダーがレコードを読み取った(またはレコードが返されることを呼び出し元に約束した)場合、レコードの開始位置までの位置は *消費された* と見なされます。
動的分割は、*消費されていない* 位置でのみ発生する可能性があります。リーダーがファイル内のオフセット42のレコードを返した場合、動的分割はオフセット43以降でのみ発生する可能性があります。そうでない場合、そのレコードは(現在のリーダーと新しいタスクのリーダーによって)2回読み取られる可能性があります。
RangeTrackerのメソッド
RangeTracker
を実装するには、サブクラスで次のメソッドをオーバーライドする必要があります
start_position
:現在の範囲の開始位置を返します(包括的)。stop_position
:現在の範囲の終了位置を返します(排他的)。try_claim
:このメソッドは、分割点にあるレコードが範囲内にあるかどうかを判断するために使用されます。このメソッドは、最後に消費された位置を、ソースによって読み取られているレコードの指定された開始position
に更新することにより、RangeTracker
の内部状態を変更する必要があります。指定された位置が現在の範囲内にある場合、メソッドは true を返します。set_current_position
:このメソッドは、最後に消費された位置を、ソースによって読み取られているレコードの指定された開始位置に更新します。分割点で開始されないレコードに対してこのメソッドを呼び出すことができ、これはRangeTracker
の内部状態を変更する必要があります。レコードが分割点で開始する場合、このメソッドの代わりにtry_claim
を呼び出す必要があります。position_at_fraction
:範囲 [0.0, 1.0) 内の fraction が与えられると、このメソッドは、位置範囲 [self.start_position
,self.stop_position
) と比較して、指定された fraction での位置を返します。try_split
:このメソッドは、現在の範囲を、提案された位置を中心に2つの部分に分割しようとします。異なる位置で分割することは許可されていますが、ほとんどの場合、提案された位置で分割されます。
このメソッドは、split_position
がまだ消費されていないと仮定して、現在の範囲 [self.start_position
, self.stop_position
) を「プライマリ」部分 [self.start_position
, split_position
) と「残差」部分 [split_position
, self.stop_position
) に分割します。
split_position
がすでに消費されている場合、メソッドは None
を返します。そうでない場合、現在の範囲をプライマリに更新し、タプル (split_position
, split_fraction
) を返します。split_fraction
は、元の(分割前)範囲 [self.start_position
, self.stop_position
) と比較した範囲 [self.start_position
, split_position
) のサイズの割合である必要があります。
fraction_consumed
:ソース内で消費された位置のおおよその割合を返します。
**注:** クラス iobase.RangeTracker
のメソッドは複数のスレッドによって呼び出される可能性があるため、このクラスは、たとえば単一のロックオブジェクトを使用してスレッドセーフにする必要があります。
便利なSource基本クラス
Python 用 Beam SDK には、新しいソースを簡単に作成するのに役立つ便利な抽象基底クラスがいくつか含まれています。
FileBasedSource
FileBasedSource
は、新しいファイルタイプのソースを開発するためのフレームワークです。FileBasedSource クラスから BoundedSource
クラスを派生させることができます。
新しいファイルタイプのソースを作成するには、FileBasedSource
のサブクラスを作成する必要があります。FileBasedSource
のサブクラスは、メソッド FileBasedSource.read_records()
を実装する必要があります。
FileBasedSource
の実装例については、AvroSource を参照してください。
新しいSourceからの読み取り
次の例 CountingSource
は、BoundedSource
の実装を示し、SDK 提供の OffsetRangeTracker
と呼ばれる RangeTracker
を使用しています。
class CountingSource(iobase.BoundedSource):
def __init__(self, count):
self.records_read = Metrics.counter(self.__class__, 'recordsRead')
self._count = count
def estimate_size(self):
return self._count
def get_range_tracker(self, start_position, stop_position):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
return OffsetRangeTracker(start_position, stop_position)
def read(self, range_tracker):
for i in range(range_tracker.start_position(),
range_tracker.stop_position()):
if not range_tracker.try_claim(i):
return
self.records_read.inc()
yield i
def split(self, desired_bundle_size, start_position=None, stop_position=None):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
bundle_start = start_position
while bundle_start < stop_position:
bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
yield iobase.SourceBundle(
weight=(bundle_stop - bundle_start),
source=self,
start_position=bundle_start,
stop_position=bundle_stop)
bundle_start = bundle_stop
パイプラインでソースからデータを読み取るには、Read
変換を使用します
**注:** エンドユーザーが使用するソースを作成する場合、上記の例に示すようにソース自体のコードを公開しないことをお勧めします。代わりに、ラッパー PTransform
を使用してください。PTransform ラッパー では、ソースを公開しない理由と、ラッパーの作成方法について説明しています。
FileBasedSink抽象化の使用
データソースでファイルを使用する場合、FileBasedSink 抽象化を実装して、ファイルベースのシンクを作成できます。他のシンクの場合は、ParDo
、GroupByKey
、および Python 用 Beam SDK によって提供される他の変換を使用します。詳細については、I/O コネクタの開発の概要 を参照してください。
FileBasedSink
インターフェースを使用する場合、パイプラインの PCollection
から出力シンクに境界のあるデータを書き込む方法をランナーに指示する、形式固有のロジックを提供する必要があります。ランナーは、複数のワーカーを使用してデータのバンドルを並列に書き込みます。
次のクラスを実装することにより、ファイルベースのシンクのロジックを提供します
抽象基底クラス
FileBasedSink
のサブクラス。FileBasedSink
は、パイプラインが並列に書き込むことができる場所またはリソースを記述します。シンクをエンドユーザーに公開しないようにするには、FileBasedSink
サブクラスを作成するときに `_` プレフィックスを使用します。ユーザー向けのラッパー
PTransform
。ロジックの一部としてWrite
を呼び出し、FileBasedSink
をパラメーターとして渡します。ユーザーはWrite
を直接呼び出す必要はありません。
FileBasedSink
抽象基底クラスは、次のようなファイルと対話する Beam シンクに共通のコードを実装します。
- ファイルヘッダーとフッターの設定
- 順次レコード書き込み
- 出力 MIME タイプの設定
FileBasedSink
とそのサブクラスは、Beam でサポートされている FileSystem
実装へのファイルの書き込みをサポートしています。例として、次の Beam 提供の FileBasedSink
実装を参照してください。
PTransformラッパー
エンドユーザーが使用するソースまたはシンクを作成する場合、ソースまたはシンクのコードを公開しないでください。ソースとシンクをエンドユーザーに公開しないようにするには、新しいクラスで `_` プレフィックスを使用する必要があります。次に、ユーザー向けのラッパー PTransform
を実装します。ソースまたはシンクを変換として公開することにより、実装は非表示になり、任意に複雑または単純にすることができます。実装の詳細を公開しないことの最大の利点は、後で既存のユーザー向けの実装を壊すことなく、追加の機能を追加できることです。
たとえば、ユーザーのパイプラインが beam.io.Read
を使用してソースから読み取り、パイプラインに再シャードを挿入する場合、すべてのユーザーは再シャードを自分で追加する必要があります(GroupByKey
変換を使用)。これを解決するために、読み取り操作と再シャードの両方を実行する複合 PTransform
としてソースを公開することをお勧めします。
PTransform
でのラッピングの詳細については、Beam の PTransform スタイルガイド を参照してください。
次の例では、上記のセクションのソースとシンクを変更して、エンドユーザーに公開されないようにします。ソースの場合は、CountingSource
の名前を _CountingSource
に変更します。次に、ReadFromCountingSource
と呼ばれるラッパー PTransform
を作成します
最後に、ソースから読み取ります
シンクの場合は、SimpleKVSink
の名前を _SimpleKVSink
に変更します。次に、WriteToKVSink
と呼ばれるラッパー PTransform
を作成します
class WriteToKVSink(PTransform):
def __init__(self, simplekv, url, final_table_name):
self._simplekv = simplekv
super().__init__()
self._url = url
self._final_table_name = final_table_name
def expand(self, pcoll):
return pcoll | iobase.Write(
_SimpleKVSink(self._simplekv, self._url, self._final_table_name))
最後に、シンクに書き込みます
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!