Python の型安全性の確保
Python は静的な型チェックのない動的型付け言語です。Python の型チェックの仕組みと、ランナーの実行が遅延する性質のために、開発者の生産性は、型関連のエラーの調査に費やす時間によって簡単にボトルネックになる可能性があります。
Python 用の Apache Beam SDK は、パイプラインの構築と実行時に型ヒントを使用して、真の静的型付けによって達成される正当性の保証をエミュレートしようとします。さらに、型ヒントを使用すると、バックエンドサービスが効率的な型推論と Coder
オブジェクトの登録を実行できるようになります。
Python バージョン 3.5 では、言語の型バリデーターにヒントを提供するためのtypingというモジュールが導入されました。Python 用の Beam SDK は、PEP 484 のサブセットを実装し、独自の typehints モジュールで可能な限りそれに従うことを目指しています。
これらのフラグは Beam の型安全性を制御します。
--no_pipeline_type_check
パイプライン構築中の型チェックを無効にします。デフォルトではこれらのチェックを実行します。
--runtime_type_check
すべての要素のランタイム型チェックを有効にします。これによりパイプラインのパフォーマンスに影響を与える可能性があるため、デフォルトではこれらのチェックはスキップされます。
--type_check_additional
追加の型チェックを有効にします。これらは、下位互換性を維持するために、デフォルトでは有効になっていません。このフラグは、コンマ区切りのオプションのリストを受け入れます。
all
: すべての追加チェックを有効にします。ptransform_fn
:@ptransform_fn
デコレーターで使用する場合に型ヒントデコレータを有効にします。
型ヒントの利点
型ヒントを使用すると、Beam は実行時ではなく、パイプライン構築時に例外を発生させます。たとえば、パイプラインが一致しない PTransform
を適用している(ある変換の予期される出力が、後続の変換の予期される入力と一致しない)ことを検出した場合、Beam は例外を生成します。これらの例外は、パイプラインがどこで実行されるかにかかわらず、パイプライン構築時に発生します。定義する PTransform
に型ヒントを導入すると、深い複雑なパイプラインの実行が数分経過した後ではなく、ローカルランナーで潜在的なバグを事前にキャッチできます。
numbers
が str
値の PCollection
である次の例を考えてみましょう。
次に、コードは、偶数を取得する呼び出し可能オブジェクトを使用して、numbers
コレクションに Filter
変換を適用します。
p.run()
を呼び出すと、このコードは Filter
が整数の PCollection
を予期しているが、代わりに文字列の PCollection
が渡されているため、この変換を実行しようとするとエラーを生成します。型ヒントを使用すると、このエラーはパイプラインが実行を開始する前のパイプライン構築時にキャッチできた可能性があります。
Python 用の Beam SDK には、いくつかの自動型ヒントが含まれています。たとえば、Create
や単純な ParDo
変換などの一部の PTransform
は、入力に基づいて出力型を推測しようとします。ただし、Beam はすべての場合に型を推測できるわけではありません。したがって、独自の型チェックを実行する際に役立つように型ヒントを宣言することをお勧めします。
型ヒントの宣言
呼び出し可能オブジェクト、DoFn
、または PTransform
全体で型ヒントを宣言できます。型ヒントを宣言する方法は 3 つあります。パイプライン構築中にインラインで宣言する方法、デコレータを使用して DoFn
または PTransform
のプロパティとして宣言する方法、または特定の関数で Python 3 の型アノテーションとして宣言する方法です。
型ヒントは常にインラインで宣言できますが、再利用されるコードに型ヒントが必要な場合は、アノテーションまたはデコレータとして宣言します。たとえば、DoFn
が int
入力を必要とする場合、入力の型ヒントをインラインではなく、process
への引数のアノテーション(または DoFn
のプロパティ)として宣言する方が理にかなっています。
アノテーションを使用すると、静的型チェッカー(mypy など)を使用してコードの型をさらにチェックできるという利点もあります。すでに型チェッカーを使用している場合は、デコレータの代わりにアノテーションを使用すると、コードの重複が減ります。ただし、アノテーションは、デコレータとインライン宣言が行うすべてのユースケースをカバーしていません。たとえば、ラムダ関数では機能しません。
型アノテーションを使用した型ヒントの宣言
バージョン 2.21.0 で新しく追加。
特定の関数のアノテーションとして型ヒントを指定するには、通常どおりに使用し、デコレータヒントまたはインラインヒントを省略します。
アノテーションは現在、次でサポートされています。
DoFn
サブクラスのprocess()
メソッド。PTransform
サブクラスのexpand()
メソッド。ParDo
、Map
、FlatMap
、Filter
に渡される関数。
次のコードでは、my_fn
のアノテーションを使用して、to_id
変換に int
入力と str
出力の型ヒントを宣言しています。
次のコードは、PTransform
サブクラスでアノテーションを使用する方法を示しています。有効なアノテーションは、内部(ネストされた)型、PBegin
、PDone
、または None
をラップする PCollection
です。次のコードでは、アノテーションを使用して、PCollection[int]
入力を受け取り、PCollection[str]
を出力するカスタム PTransform で型ヒントを宣言しています。
次のコードでは、FilterEvensDoFn.process
のアノテーションを使用して、filter_evens
に int
入力と出力の型ヒントを宣言しています。process
はジェネレーターを返すため、PCollection[int]
を生成する DoFn の出力型は Iterable[int]
としてアノテーションされます (Generator[int, None, None]
でもここで機能します)。Beam は、結果の PCollection の要素型を推測するために、DoFn.process
メソッドと FlatMap
に渡される関数の戻り型の外側の反復可能型を削除します。これらの関数に反復可能型以外の戻り型のアノテーションを設定するとエラーになります。サポートされている他の反復可能型には、Iterator
、Generator
、Tuple
、List
があります。
次のコードでは、FilterEvensDoubleDoFn.process
のアノテーションを使用して、double_evens
に int
入力と出力の型ヒントを宣言しています。process
は list
または None
を返すため、出力型は Optional[List[int]]
としてアノテーションされます。Beam は、DoFn.process
メソッドと FlatMap
に渡される関数のみで、外側の Optional
と(上記のように)戻り型の外側の反復可能型も削除します。
インラインでの型ヒントの宣言
型ヒントをインラインで指定するには、with_input_types
メソッドと with_output_types
メソッドを使用します。次のコード例では、インラインで入力型ヒントを宣言しています。
上記の例の numbers コレクションに Filter 変換を適用すると、パイプラインの構築中にエラーをキャッチできます。
デコレータを使用した型ヒントの宣言
DoFn
または PTransform
のプロパティとして型ヒントを指定するには、デコレーター @with_input_types()
と @with_output_types()
を使用します。
次のコードでは、デコレーター @with_input_types()
を使用して、FilterEvensDoFn
に int
型ヒントを宣言しています。
デコレーターは、位置引数やキーワード引数を任意に受け取ります。通常、それらはラップしている関数のコンテキストで解釈されます。一般的に、最初の引数はメイン入力の型ヒントであり、追加の引数はサイド入力の型ヒントです。
アノテーションの使用の無効化
このスタイルの型ヒント宣言はデフォルトで有効になっているため、無効にする方法をいくつか示します。
- Beam に注釈を無視させたい特定の関数に対して、
@beam.typehints.no_annotations
デコレーターを使用します。 - 上記のデコレーターまたはインラインメソッドを使用して型ヒントを宣言します。これらは、アノテーションよりも優先されます。
- パイプラインを作成する前に
beam.typehints.disable_type_annotations()
を呼び出します。これにより、Beam はすべての関数のアノテーションを参照しなくなります。
ジェネリック型の定義
型ヒントのアノテーションを使用して、ジェネリック型を定義できます。次のコードは、ジェネリック型 T
をアサートする入力型ヒントと、型 Tuple[int, T]
をアサートする出力型ヒントを指定します。MyTransform
への入力が str
型の場合、Beam は出力型が Tuple[int, str]
であると推論します。
型ヒントの種類
型ヒントは、Python のプリミティブ型、コンテナクラス、およびユーザー定義クラスを含む、任意のクラスで使用できます。int
、float
、ユーザー定義クラスなど、すべてのクラスを型ヒントを定義するために使用でき、これらは **単純型ヒント** と呼ばれます。リスト、タプル、イテラブルなどのコンテナ型も型ヒントを定義するために使用でき、これらは **パラメータ化された型ヒント** と呼ばれます。最後に、Any
、Optional
、Union
など、具体的な Python クラスに対応しない特別な型もあり、これらも型ヒントとして許可されています。
Beam は独自の内部型ヒント型を定義しており、後方互換性のために引き続き使用できます。また、Python の typing モジュールの型もサポートしており、内部的に Beam 内部型に変換されます。
新しいコードでは、typing モジュールの型を使用することをお勧めします。
シンプルな型ヒント
型ヒントは、int
や str
からユーザー定義クラスまで、任意のクラスを使用できます。クラスを型ヒントとして使用する場合は、そのクラスのコーダーを定義することが望ましい場合があります。
パラメータ化された型ヒント
パラメータ化された型ヒントは、list
などのコンテナのような Python オブジェクトの型をヒントするのに役立ちます。これらの型ヒントは、コンテナオブジェクト内の要素をさらに絞り込みます。
パラメータ化された型ヒントのパラメータには、単純型、パラメータ化された型、または型変数を使用できます。型変数である要素型 (例: T
) は、操作の入力と出力の間にリレーションシップを課します (例: List[T]
-> T
)。型ヒントはネストできるため、複雑な型の型ヒントを定義できます。たとえば、List[Tuple[int, int, str]]
のように。
組み込みのコンテナ型の名前空間と競合しないように、最初の文字は大文字で記述します。
以下のパラメータ化された型ヒントが許可されています
Tuple[T, U]
Tuple[T, ...]
List[T]
KV[T, U]
Dict[T, U]
Set[T]
FrozenSet[T]
Iterable[T]
Iterator[T]
Generator[T]
PCollection[T]
注: Tuple[T, U]
型ヒントは、異なる型を持つ固定数の要素を持つタプルであり、Tuple[T, ...]
型ヒントは、同じ型を持つ可変数の要素を持つタプルです。
特別な型ヒント
以下は、クラスに対応するのではなく、PEP 484 で導入された特別な型に対応する特別な型ヒントです。
Any
Union[T, U, V]
Optional[T]
ランタイム型チェック
パイプライン構築時の型チェックに型ヒントを使用することに加えて、実行時に型チェックを有効にして、実際の要素がパイプライン実行中に宣言された型制約を満たしているかを確認できます。
たとえば、次のパイプラインは間違った型の要素を出力します。ランナーの実装によっては、実行時に失敗する場合と失敗しない場合があります。
ただし、ランタイム型チェックを有効にすると、コードは実行時に必ず失敗します。ランタイム型チェックを有効にするには、パイプラインオプション runtime_type_check
を True
に設定します。
ランタイム型チェックは PCollection
の各要素に対して行われるため、この機能を有効にすると、パフォーマンスが大幅に低下する可能性があることに注意してください。したがって、本番パイプラインではランタイム型チェックを無効にすることをお勧めします。より高速で、本番環境に適した代替案については、次のセクションを参照してください。
より高速なランタイム型チェック
パイプラインオプション performance_runtime_type_check
を True
に設定すると、より高速なサンプリングベースのランタイム型チェックを有効にできます。
これは Python 3 のみの機能であり、最適化された Cython コードを使用して、サンプルと呼ばれる値の小さなサブセットをランタイム型チェックすることで機能します。
現在、この機能はサイド入力または結合操作のランタイム型チェックをサポートしていません。これらは、Beam の将来のリリースでサポートされる予定です。
コーダーでの型ヒントの使用
パイプラインがデータを読み取り、書き込み、またはその他の方法で具体化する場合、PCollection
内の要素は、バイト文字列との間でエンコードおよびデコードする必要があります。バイト文字列は、中間ストレージ、GroupByKey
操作でのキーの比較、ソースからの読み取り、およびシンクへの書き込みに使用されます。
Beam SDK for Python は、不明な型のオブジェクトをシリアル化するための Python のネイティブサポートを使用します。これは **ピクル** と呼ばれるプロセスです。ただし、PickleCoder
を使用すると、時間と空間の効率が低下し、使用されるエンコーディングが非決定的であるという欠点があります。これは、分散パーティショニング、グループ化、および状態のルックアップを妨げます。
これらの欠点を回避するために、より効率的な方法で型をエンコードおよびデコードするための Coder
クラスを定義できます。Coder
を指定して、指定された PCollection
の要素をエンコードおよびデコードする方法を記述できます。
正しく効率的に動作するには、Coder
に型情報が必要であり、PCollection
を特定の型に関連付ける必要があります。この型情報を使用可能にするのが型ヒントです。Beam SDK for Python は、int
、float
、str
、bytes
、unicode
などの標準的な Python 型の組み込みコーダーを提供します。
決定論的コーダー
Coder
を定義しない場合、デフォルトは不明な型に対してピクルにフォールバックするコーダーになります。場合によっては、決定的な Coder
を指定する必要があります。そうしないと、ランタイムエラーが発生します。
たとえば、キーが Player
オブジェクトであるキーと値のペアの PCollection
があるとします。このようなコレクションに GroupByKey
変換を適用すると、デフォルトのピクルコーダーなどの非決定的なコーダーを使用した場合、キーオブジェクトが異なるマシンで異なるシリアル化される可能性があります。GroupByKey
はこのシリアル化された表現を使用してキーを比較するため、これにより誤った動作が発生する可能性があります。要素が常に同じ方法でエンコードおよびデコードされるようにするには、Player
クラスに対して決定的な Coder
を定義する必要があります。
次のコードは、Player
クラスの例と、その Coder
を定義する方法を示しています。型ヒントを使用すると、Beam は beam.coders.registry
を使用して、使用する Coder
を推論します。次のコードは、PlayerCoder
を Player
クラスのコーダーとして登録します。この例では、CombinePerKey
に対して宣言された入力型は Tuple[Player, int]
です。この場合、Beam は使用する Coder
オブジェクトが TupleCoder
、PlayerCoder
、および IntCoder
であると推論します。
from typing import Tuple
class Player(object):
def __init__(self, team, name):
self.team = team
self.name = name
class PlayerCoder(beam.coders.Coder):
def encode(self, player):
return ('%s:%s' % (player.team, player.name)).encode('utf-8')
def decode(self, s):
return Player(*s.decode('utf-8').split(':'))
def is_deterministic(self):
return True
beam.coders.registry.register_coder(Player, PlayerCoder)
def parse_player_and_score(csv):
name, team, score = csv.split(',')
return Player(team, name), int(score)
totals = (
lines
| beam.Map(parse_player_and_score)
| beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))