Python の型安全性の確保

Python は静的な型チェックのない動的型付け言語です。Python の型チェックの仕組みと、ランナーの実行が遅延する性質のために、開発者の生産性は、型関連のエラーの調査に費やす時間によって簡単にボトルネックになる可能性があります。

Python 用の Apache Beam SDK は、パイプラインの構築と実行時に型ヒントを使用して、真の静的型付けによって達成される正当性の保証をエミュレートしようとします。さらに、型ヒントを使用すると、バックエンドサービスが効率的な型推論と Coder オブジェクトの登録を実行できるようになります。

Python バージョン 3.5 では、言語の型バリデーターにヒントを提供するためのtypingというモジュールが導入されました。Python 用の Beam SDK は、PEP 484 のサブセットを実装し、独自の typehints モジュールで可能な限りそれに従うことを目指しています。

これらのフラグは Beam の型安全性を制御します。

型ヒントの利点

型ヒントを使用すると、Beam は実行時ではなく、パイプライン構築時に例外を発生させます。たとえば、パイプラインが一致しない PTransform を適用している(ある変換の予期される出力が、後続の変換の予期される入力と一致しない)ことを検出した場合、Beam は例外を生成します。これらの例外は、パイプラインがどこで実行されるかにかかわらず、パイプライン構築時に発生します。定義する PTransform に型ヒントを導入すると、深い複雑なパイプラインの実行が数分経過した後ではなく、ローカルランナーで潜在的なバグを事前にキャッチできます。

numbersstr 値の PCollection である次の例を考えてみましょう。

p = TestPipeline()

numbers = p | beam.Create(['1', '2', '3'])

次に、コードは、偶数を取得する呼び出し可能オブジェクトを使用して、numbers コレクションに Filter 変換を適用します。

evens = numbers | beam.Filter(lambda x: x % 2 == 0)

p.run() を呼び出すと、このコードは Filter が整数の PCollection を予期しているが、代わりに文字列の PCollection が渡されているため、この変換を実行しようとするとエラーを生成します。型ヒントを使用すると、このエラーはパイプラインが実行を開始する前のパイプライン構築時にキャッチできた可能性があります。

Python 用の Beam SDK には、いくつかの自動型ヒントが含まれています。たとえば、Create や単純な ParDo 変換などの一部の PTransform は、入力に基づいて出力型を推測しようとします。ただし、Beam はすべての場合に型を推測できるわけではありません。したがって、独自の型チェックを実行する際に役立つように型ヒントを宣言することをお勧めします。

型ヒントの宣言

呼び出し可能オブジェクト、DoFn、または PTransform 全体で型ヒントを宣言できます。型ヒントを宣言する方法は 3 つあります。パイプライン構築中にインラインで宣言する方法、デコレータを使用して DoFn または PTransform のプロパティとして宣言する方法、または特定の関数で Python 3 の型アノテーションとして宣言する方法です。

型ヒントは常にインラインで宣言できますが、再利用されるコードに型ヒントが必要な場合は、アノテーションまたはデコレータとして宣言します。たとえば、DoFnint 入力を必要とする場合、入力の型ヒントをインラインではなく、process への引数のアノテーション(または DoFn のプロパティ)として宣言する方が理にかなっています。

アノテーションを使用すると、静的型チェッカー(mypy など)を使用してコードの型をさらにチェックできるという利点もあります。すでに型チェッカーを使用している場合は、デコレータの代わりにアノテーションを使用すると、コードの重複が減ります。ただし、アノテーションは、デコレータとインライン宣言が行うすべてのユースケースをカバーしていません。たとえば、ラムダ関数では機能しません。

型アノテーションを使用した型ヒントの宣言

バージョン 2.21.0 で新しく追加。

特定の関数のアノテーションとして型ヒントを指定するには、通常どおりに使用し、デコレータヒントまたはインラインヒントを省略します。

アノテーションは現在、次でサポートされています。

次のコードでは、my_fn のアノテーションを使用して、to_id 変換に int 入力と str 出力の型ヒントを宣言しています。

def my_fn(element: int) -> str:
  return 'id_' + str(element)

ids = numbers | 'to_id' >> beam.Map(my_fn)

次のコードは、PTransform サブクラスでアノテーションを使用する方法を示しています。有効なアノテーションは、内部(ネストされた)型、PBeginPDone、または None をラップする PCollection です。次のコードでは、アノテーションを使用して、PCollection[int] 入力を受け取り、PCollection[str] を出力するカスタム PTransform で型ヒントを宣言しています。

from apache_beam.pvalue import PCollection

class IntToStr(beam.PTransform):
  def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
    return pcoll | beam.Map(lambda elem: str(elem))

ids = numbers | 'convert to str' >> IntToStr()

次のコードでは、FilterEvensDoFn.process のアノテーションを使用して、filter_evensint 入力と出力の型ヒントを宣言しています。process はジェネレーターを返すため、PCollection[int] を生成する DoFn の出力型は Iterable[int] としてアノテーションされます (Generator[int, None, None] でもここで機能します)。Beam は、結果の PCollection の要素型を推測するために、DoFn.process メソッドと FlatMap に渡される関数の戻り型の外側の反復可能型を削除します。これらの関数に反復可能型以外の戻り型のアノテーションを設定するとエラーになります。サポートされている他の反復可能型には、IteratorGeneratorTupleList があります。

from typing import Iterable

class TypedFilterEvensDoFn(beam.DoFn):
  def process(self, element: int) -> Iterable[int]:
    if element % 2 == 0:
      yield element

evens = numbers | 'filter_evens' >> beam.ParDo(TypedFilterEvensDoFn())

次のコードでは、FilterEvensDoubleDoFn.process のアノテーションを使用して、double_evensint 入力と出力の型ヒントを宣言しています。processlist または None を返すため、出力型は Optional[List[int]] としてアノテーションされます。Beam は、DoFn.process メソッドと FlatMap に渡される関数のみで、外側の Optional と(上記のように)戻り型の外側の反復可能型も削除します。

from typing import List, Optional

class FilterEvensDoubleDoFn(beam.DoFn):
  def process(self, element: int) -> Optional[List[int]]:
    if element % 2 == 0:
      return [element, element]
    return None

evens = numbers | 'double_evens' >> beam.ParDo(FilterEvensDoubleDoFn())

インラインでの型ヒントの宣言

型ヒントをインラインで指定するには、with_input_types メソッドと with_output_types メソッドを使用します。次のコード例では、インラインで入力型ヒントを宣言しています。

evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)

上記の例の numbers コレクションに Filter 変換を適用すると、パイプラインの構築中にエラーをキャッチできます。

デコレータを使用した型ヒントの宣言

DoFn または PTransform のプロパティとして型ヒントを指定するには、デコレーター @with_input_types()@with_output_types() を使用します。

次のコードでは、デコレーター @with_input_types() を使用して、FilterEvensDoFnint 型ヒントを宣言しています。

@beam.typehints.with_input_types(int)
class FilterEvensDoFn(beam.DoFn):
  def process(self, element):
    if element % 2 == 0:
      yield element

evens = numbers | beam.ParDo(FilterEvensDoFn())

デコレーターは、位置引数やキーワード引数を任意に受け取ります。通常、それらはラップしている関数のコンテキストで解釈されます。一般的に、最初の引数はメイン入力の型ヒントであり、追加の引数はサイド入力の型ヒントです。

アノテーションの使用の無効化

このスタイルの型ヒント宣言はデフォルトで有効になっているため、無効にする方法をいくつか示します。

  1. Beam に注釈を無視させたい特定の関数に対して、@beam.typehints.no_annotations デコレーターを使用します。
  2. 上記のデコレーターまたはインラインメソッドを使用して型ヒントを宣言します。これらは、アノテーションよりも優先されます。
  3. パイプラインを作成する前に beam.typehints.disable_type_annotations() を呼び出します。これにより、Beam はすべての関数のアノテーションを参照しなくなります。

ジェネリック型の定義

型ヒントのアノテーションを使用して、ジェネリック型を定義できます。次のコードは、ジェネリック型 T をアサートする入力型ヒントと、型 Tuple[int, T] をアサートする出力型ヒントを指定します。MyTransform への入力が str 型の場合、Beam は出力型が Tuple[int, str] であると推論します。

from typing import Tuple, TypeVar

T = TypeVar('T')

@beam.typehints.with_input_types(T)
@beam.typehints.with_output_types(Tuple[int, T])
class MyTransform(beam.PTransform):
  def expand(self, pcoll):
    return pcoll | beam.Map(lambda x: (len(x), x))

words_with_lens = words | MyTransform()

型ヒントの種類

型ヒントは、Python のプリミティブ型、コンテナクラス、およびユーザー定義クラスを含む、任意のクラスで使用できます。intfloat、ユーザー定義クラスなど、すべてのクラスを型ヒントを定義するために使用でき、これらは **単純型ヒント** と呼ばれます。リスト、タプル、イテラブルなどのコンテナ型も型ヒントを定義するために使用でき、これらは **パラメータ化された型ヒント** と呼ばれます。最後に、AnyOptionalUnion など、具体的な Python クラスに対応しない特別な型もあり、これらも型ヒントとして許可されています。

Beam は独自の内部型ヒント型を定義しており、後方互換性のために引き続き使用できます。また、Python の typing モジュールの型もサポートしており、内部的に Beam 内部型に変換されます。

新しいコードでは、typing モジュールの型を使用することをお勧めします。

シンプルな型ヒント

型ヒントは、intstr からユーザー定義クラスまで、任意のクラスを使用できます。クラスを型ヒントとして使用する場合は、そのクラスのコーダーを定義することが望ましい場合があります。

パラメータ化された型ヒント

パラメータ化された型ヒントは、list などのコンテナのような Python オブジェクトの型をヒントするのに役立ちます。これらの型ヒントは、コンテナオブジェクト内の要素をさらに絞り込みます。

パラメータ化された型ヒントのパラメータには、単純型、パラメータ化された型、または型変数を使用できます。型変数である要素型 (例: T) は、操作の入力と出力の間にリレーションシップを課します (例: List[T] -> T)。型ヒントはネストできるため、複雑な型の型ヒントを定義できます。たとえば、List[Tuple[int, int, str]] のように。

組み込みのコンテナ型の名前空間と競合しないように、最初の文字は大文字で記述します。

以下のパラメータ化された型ヒントが許可されています

注: Tuple[T, U] 型ヒントは、異なる型を持つ固定数の要素を持つタプルであり、Tuple[T, ...] 型ヒントは、同じ型を持つ可変数の要素を持つタプルです。

特別な型ヒント

以下は、クラスに対応するのではなく、PEP 484 で導入された特別な型に対応する特別な型ヒントです。

ランタイム型チェック

パイプライン構築時の型チェックに型ヒントを使用することに加えて、実行時に型チェックを有効にして、実際の要素がパイプライン実行中に宣言された型制約を満たしているかを確認できます。

たとえば、次のパイプラインは間違った型の要素を出力します。ランナーの実装によっては、実行時に失敗する場合と失敗しない場合があります。

p = TestPipeline()
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)

ただし、ランタイム型チェックを有効にすると、コードは実行時に必ず失敗します。ランタイム型チェックを有効にするには、パイプラインオプション runtime_type_checkTrue に設定します。

p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
p.run()

ランタイム型チェックは PCollection の各要素に対して行われるため、この機能を有効にすると、パフォーマンスが大幅に低下する可能性があることに注意してください。したがって、本番パイプラインではランタイム型チェックを無効にすることをお勧めします。より高速で、本番環境に適した代替案については、次のセクションを参照してください。

より高速なランタイム型チェック

パイプラインオプション performance_runtime_type_checkTrue に設定すると、より高速なサンプリングベースのランタイム型チェックを有効にできます。

これは Python 3 のみの機能であり、最適化された Cython コードを使用して、サンプルと呼ばれる値の小さなサブセットをランタイム型チェックすることで機能します。

現在、この機能はサイド入力または結合操作のランタイム型チェックをサポートしていません。これらは、Beam の将来のリリースでサポートされる予定です。

コーダーでの型ヒントの使用

パイプラインがデータを読み取り、書き込み、またはその他の方法で具体化する場合、PCollection 内の要素は、バイト文字列との間でエンコードおよびデコードする必要があります。バイト文字列は、中間ストレージ、GroupByKey 操作でのキーの比較、ソースからの読み取り、およびシンクへの書き込みに使用されます。

Beam SDK for Python は、不明な型のオブジェクトをシリアル化するための Python のネイティブサポートを使用します。これは **ピクル** と呼ばれるプロセスです。ただし、PickleCoder を使用すると、時間と空間の効率が低下し、使用されるエンコーディングが非決定的であるという欠点があります。これは、分散パーティショニング、グループ化、および状態のルックアップを妨げます。

これらの欠点を回避するために、より効率的な方法で型をエンコードおよびデコードするための Coder クラスを定義できます。Coder を指定して、指定された PCollection の要素をエンコードおよびデコードする方法を記述できます。

正しく効率的に動作するには、Coder に型情報が必要であり、PCollection を特定の型に関連付ける必要があります。この型情報を使用可能にするのが型ヒントです。Beam SDK for Python は、intfloatstrbytesunicode などの標準的な Python 型の組み込みコーダーを提供します。

決定論的コーダー

Coder を定義しない場合、デフォルトは不明な型に対してピクルにフォールバックするコーダーになります。場合によっては、決定的な Coder を指定する必要があります。そうしないと、ランタイムエラーが発生します。

たとえば、キーが Player オブジェクトであるキーと値のペアの PCollection があるとします。このようなコレクションに GroupByKey 変換を適用すると、デフォルトのピクルコーダーなどの非決定的なコーダーを使用した場合、キーオブジェクトが異なるマシンで異なるシリアル化される可能性があります。GroupByKey はこのシリアル化された表現を使用してキーを比較するため、これにより誤った動作が発生する可能性があります。要素が常に同じ方法でエンコードおよびデコードされるようにするには、Player クラスに対して決定的な Coder を定義する必要があります。

次のコードは、Player クラスの例と、その Coder を定義する方法を示しています。型ヒントを使用すると、Beam は beam.coders.registry を使用して、使用する Coder を推論します。次のコードは、PlayerCoderPlayer クラスのコーダーとして登録します。この例では、CombinePerKey に対して宣言された入力型は Tuple[Player, int] です。この場合、Beam は使用する Coder オブジェクトが TupleCoderPlayerCoder、および IntCoder であると推論します。

from typing import Tuple

class Player(object):
  def __init__(self, team, name):
    self.team = team
    self.name = name

class PlayerCoder(beam.coders.Coder):
  def encode(self, player):
    return ('%s:%s' % (player.team, player.name)).encode('utf-8')

  def decode(self, s):
    return Player(*s.decode('utf-8').split(':'))

  def is_deterministic(self):
    return True

beam.coders.registry.register_coder(Player, PlayerCoder)

def parse_player_and_score(csv):
  name, team, score = csv.split(',')
  return Player(team, name), int(score)

totals = (
    lines
    | beam.Map(parse_player_and_score)
    | beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))