Python向けI/Oコネクタの開発

**重要:** 新しいI/Oの開発には`Splittable DoFn`を使用してください。詳細は、新しいI/Oコネクタの概要をご覧ください。

Beamの既存のI/Oコネクタでサポートされていないデータストアに接続するには、通常、ソースとシンクで構成されるカスタムI/Oコネクタを作成する必要があります。すべてのBeamソースとシンクは複合変換です。ただし、カスタムI/Oの実装はユースケースによって異なります。開始する前に、新しいI/Oコネクタの概要を読んで、新しいI/Oコネクタの開発の概要、利用可能な実装オプション、ユースケースに適したオプションの選択方法を確認してください。

このガイドでは、Python向けSourceおよびFileBasedSinkインターフェースの使用方法について説明します。Java SDKは同じ機能を提供しますが、APIがわずかに異なります。Java SDK固有の情報については、Java向けI/Oコネクタの開発を参照してください。

基本的なコード要件

Beamランナーは、提供されたクラスを使用して、複数のワーカーインスタンスで並列にデータを読み書きします。そのため、`Source`および`FileBasedSink`サブクラスに提供するコードは、いくつかの基本的な要件を満たす必要があります。

  1. **シリアライズ可能性:** `Source`または`FileBasedSink`サブクラスはシリアライズ可能である必要があります。サービスは、並列読み取りまたは書き込みを容易にするために、複数のリモートワーカーに送信される`Source`または`FileBasedSink`サブクラスの複数のインスタンスを作成する場合があります。ソースおよびシンクオブジェクトのシリアライズ方法は、ランナーによって異なります。

  2. **不変性:** `Source`または`FileBasedSink`サブクラスは事実上不変である必要があります。ソースを実装するために必要な、高コストな計算の遅延評価を使用している場合にのみ、`Source`または`FileBasedSink`サブクラスで可変状態を使用する必要があります。

  3. **スレッドセーフ:** コードはスレッドセーフである必要があります。Python向けBeam SDKは、これを容易にするために`RangeTracker`クラスを提供します。

  4. **テスト容易性:** すべての`Source`および`FileBasedSink`サブクラスを徹底的に単体テストすることが重要です。実装の軽微なエラーが、検出が困難なデータの破損またはデータの損失(レコードのスキップや重複など)につながる可能性があります。 source_test_utilsモジュールで利用できるテストハーネスとユーティリティメソッドを使用して、ソースのテストを開発できます。

さらに、Beamの変換スタイルのガイダンスについては、PTransformスタイルガイドを参照してください。

Sourceインターフェースの実装

パイプラインの新しいデータソースを作成するには、サービスに入力ソースからデータを読み取る方法と、複数のワーカーインスタンスがデータを並列に読み取ることができるようにデータソースを複数の部分に分割する方法を指示する、形式固有のロジックを提供する必要があります。

次のクラスを作成することで、新しいソースのロジックを提供します。

これらのクラスは、apache_beam.io.iobaseモジュールにあります。

BoundedSourceサブクラスの実装

`BoundedSource`は、サービスが、場合によっては並列に読み取る有限のデータセットを表します。 `BoundedSource`には、サービスが複数のリモートワーカーによる読み取りのためにデータセットを分割するために使用する一連のメソッドが含まれています。

`BoundedSource`を実装するには、サブクラスで次のメソッドをオーバーライドする必要があります。

RangeTrackerサブクラスの実装

`RangeTracker`は、`BoundedSource`のリーダーの現在の範囲と現在位置を管理し、それらへの同時アクセスを保護するために使用されるスレッドセーフなオブジェクトです。

`RangeTracker`を実装するには、まず次の定義をよく理解する必要があります。

RangeTrackerのメソッド

RangeTracker を実装するには、サブクラスで次のメソッドをオーバーライドする必要があります

このメソッドは、split_position がまだ消費されていないと仮定して、現在の範囲 [self.start_position, self.stop_position) を「プライマリ」部分 [self.start_position, split_position) と「残差」部分 [split_position, self.stop_position) に分割します。

split_position がすでに消費されている場合、メソッドは None を返します。そうでない場合、現在の範囲をプライマリに更新し、タプル (split_position, split_fraction) を返します。split_fraction は、元の(分割前)範囲 [self.start_position, self.stop_position) と比較した範囲 [self.start_position, split_position) のサイズの割合である必要があります。

**注:** クラス iobase.RangeTracker のメソッドは複数のスレッドによって呼び出される可能性があるため、このクラスは、たとえば単一のロックオブジェクトを使用してスレッドセーフにする必要があります。

便利なSource基本クラス

Python 用 Beam SDK には、新しいソースを簡単に作成するのに役立つ便利な抽象基底クラスがいくつか含まれています。

FileBasedSource

FileBasedSource は、新しいファイルタイプのソースを開発するためのフレームワークです。FileBasedSource クラスから BoundedSource クラスを派生させることができます。

新しいファイルタイプのソースを作成するには、FileBasedSource のサブクラスを作成する必要があります。FileBasedSource のサブクラスは、メソッド FileBasedSource.read_records() を実装する必要があります。

FileBasedSource の実装例については、AvroSource を参照してください。

新しいSourceからの読み取り

次の例 CountingSource は、BoundedSource の実装を示し、SDK 提供の OffsetRangeTracker と呼ばれる RangeTracker を使用しています。

class CountingSource(iobase.BoundedSource):
  def __init__(self, count):
    self.records_read = Metrics.counter(self.__class__, 'recordsRead')
    self._count = count

  def estimate_size(self):
    return self._count

  def get_range_tracker(self, start_position, stop_position):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    return OffsetRangeTracker(start_position, stop_position)

  def read(self, range_tracker):
    for i in range(range_tracker.start_position(),
                   range_tracker.stop_position()):
      if not range_tracker.try_claim(i):
        return
      self.records_read.inc()
      yield i

  def split(self, desired_bundle_size, start_position=None, stop_position=None):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    bundle_start = start_position
    while bundle_start < stop_position:
      bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
      yield iobase.SourceBundle(
          weight=(bundle_stop - bundle_start),
          source=self,
          start_position=bundle_start,
          stop_position=bundle_stop)
      bundle_start = bundle_stop

パイプラインでソースからデータを読み取るには、Read 変換を使用します

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))

**注:** エンドユーザーが使用するソースを作成する場合、上記の例に示すようにソース自体のコードを公開しないことをお勧めします。代わりに、ラッパー PTransform を使用してください。PTransform ラッパー では、ソースを公開しない理由と、ラッパーの作成方法について説明しています。

FileBasedSink抽象化の使用

データソースでファイルを使用する場合、FileBasedSink 抽象化を実装して、ファイルベースのシンクを作成できます。他のシンクの場合は、ParDoGroupByKey、および Python 用 Beam SDK によって提供される他の変換を使用します。詳細については、I/O コネクタの開発の概要 を参照してください。

FileBasedSink インターフェースを使用する場合、パイプラインの PCollection から出力シンクに境界のあるデータを書き込む方法をランナーに指示する、形式固有のロジックを提供する必要があります。ランナーは、複数のワーカーを使用してデータのバンドルを並列に書き込みます。

次のクラスを実装することにより、ファイルベースのシンクのロジックを提供します

FileBasedSink 抽象基底クラスは、次のようなファイルと対話する Beam シンクに共通のコードを実装します。

FileBasedSink とそのサブクラスは、Beam でサポートされている FileSystem 実装へのファイルの書き込みをサポートしています。例として、次の Beam 提供の FileBasedSink 実装を参照してください。

PTransformラッパー

エンドユーザーが使用するソースまたはシンクを作成する場合、ソースまたはシンクのコードを公開しないでください。ソースとシンクをエンドユーザーに公開しないようにするには、新しいクラスで `_` プレフィックスを使用する必要があります。次に、ユーザー向けのラッパー PTransform を実装します。ソースまたはシンクを変換として公開することにより、実装は非表示になり、任意に複雑または単純にすることができます。実装の詳細を公開しないことの最大の利点は、後で既存のユーザー向けの実装を壊すことなく、追加の機能を追加できることです。

たとえば、ユーザーのパイプラインが beam.io.Read を使用してソースから読み取り、パイプラインに再シャードを挿入する場合、すべてのユーザーは再シャードを自分で追加する必要があります(GroupByKey 変換を使用)。これを解決するために、読み取り操作と再シャードの両方を実行する複合 PTransform としてソースを公開することをお勧めします。

PTransform でのラッピングの詳細については、Beam の PTransform スタイルガイド を参照してください。

次の例では、上記のセクションのソースとシンクを変更して、エンドユーザーに公開されないようにします。ソースの場合は、CountingSource の名前を _CountingSource に変更します。次に、ReadFromCountingSource と呼ばれるラッパー PTransform を作成します

class ReadFromCountingSource(PTransform):
  def __init__(self, count):
    super().__init__()
    self._count = count

  def expand(self, pcoll):
    return pcoll | iobase.Read(_CountingSource(self._count))

最後に、ソースから読み取ります

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> ReadFromCountingSource(count)

シンクの場合は、SimpleKVSink の名前を _SimpleKVSink に変更します。次に、WriteToKVSink と呼ばれるラッパー PTransform を作成します

class WriteToKVSink(PTransform):
  def __init__(self, simplekv, url, final_table_name):
    self._simplekv = simplekv
    super().__init__()
    self._url = url
    self._final_table_name = final_table_name

  def expand(self, pcoll):
    return pcoll | iobase.Write(
        _SimpleKVSink(self._simplekv, self._url, self._final_table_name))

最後に、シンクに書き込みます

with beam.Pipeline(options=PipelineOptions()) as pipeline:
  kvs = pipeline | 'CreateKVs' >> beam.core.Create(KVs)
  kvs | 'WriteToSimpleKV' >> WriteToKVSink(
      simplekv, 'http://url_to_simple_kv/', final_table_name)