ParDo
![]() |
汎用並列処理のための変換です。ParDo
変換は、入力PCollection
内の各要素を考慮し、その要素に対して何らかの処理関数(ユーザーコード)を実行し、ゼロ個以上の要素を出力PCollection
に出力します。
Beamプログラミングガイドで詳細情報をご覧ください。
例
以下の例では、カスタムDoFn
を作成し、タイムスタンプとウィンドウ情報をアクセスする方法を説明します。
例1:単純なDoFnを使用したParDo
次の例では、delimiter
をオブジェクトフィールドとして格納するSplitWords
という単純なDoFn
クラスを定義します。process
メソッドは要素ごとに1回呼び出され、ゼロ個以上の出力要素を生成できます。
例2:タイムスタンプとウィンドウ情報を使用したParDo
この例では、実行時にパラメータ値をバインドするために、process
メソッドに新しいパラメータを追加します。
beam.DoFn.TimestampParam
は、タイムスタンプ情報をapache_beam.utils.timestamp.Timestamp
オブジェクトとしてバインドします。beam.DoFn.WindowParam
は、ウィンドウ情報を適切なapache_beam.transforms.window.*Window
オブジェクトとしてバインドします。
例3:DoFnメソッドを使用したParDo
DoFn
は、より複雑な動作を作成するのに役立ついくつかのメソッドでカスタマイズできます。ワーカーが開始時とシャットダウン時に実行する動作を、setup
とteardown
でカスタマイズできます。要素のバンドルの開始と終了時に実行する動作を、start_bundle
とfinish_bundle
でカスタマイズすることもできます。
DoFn.setup()
:DoFn
インスタンスがワーカーで逆シリアル化されるたびに呼び出されます。これは、特定のDoFn
サブクラスの複数のインスタンスが作成される可能性があるため(例:並列化による、または一定期間の使用がない場合のガベージコレクションによる)、ワーカーごとに複数回呼び出される可能性があります。これは、データベースインスタンスへの接続、ネットワーク接続のオープン、その他のリソースの取得に適した場所です。コンテキストマネージャーを介してこれを実現する方法については、DoFn.SetupContextParam
も参照してください。DoFn.start_bundle()
:バンドルの最初の要素でprocess
を呼び出す前に、要素のバンドルごとに1回呼び出されます。これは、バンドルの要素の追跡を開始するのに適した場所です。コンテキストマネージャーを介してこれを実現する方法については、DoFn.BundleContextParam
も参照してください。DoFn.process(element, *args, **kwargs)
:要素ごとに1回呼び出され、ゼロ個以上の要素を生成できます。追加の*args
または**kwargs
は、beam.ParDo()
を介して渡すことができます。[必須]DoFn.finish_bundle()
:バンドルの最後の要素の後でprocess
を呼び出した後、要素のバンドルごとに1回呼び出され、ゼロ個以上の要素を生成できます。これは、バンドルの要素に対してバッチ呼び出しを行うのに適した場所です(データベースクエリの実行など)。たとえば、
start_bundle
でバッチを初期化し、process
で要素をバッチに追加し(生成する代わりに)、finish_bundle
でそれらの要素に対してバッチクエリを実行し、すべての結果を生成できます。finish_bundle
から生成された要素は、apache_beam.utils.windowed_value.WindowedValue
型である必要があります。タイムスタンプをUnixタイムスタンプとして提供する必要があります。これは、関連する処理済み要素から取得できます。ウィンドウも提供する必要があります。これは、以下の例のように、関連する処理済み要素から取得できます。DoFn.teardown()
:DoFn
インスタンスがシャットダウンしているときに、DoFn
インスタンスごとに1回(ベストエフォートとして)呼び出されます。これは、データベースインスタンスのクローズ、ネットワーク接続のクローズ、その他のリソースの解放に適した場所です。teardown
はベストエフォートとして呼び出され、保証されていません。たとえば、ワーカーがクラッシュした場合、teardown
は呼び出されない可能性があります。
既知の問題
- [Issue 19394]
DoFn.teardown()
のメトリクスが失われます。
関連する変換
- マップは同じ動作をしますが、各入力に対して正確に1つの出力を生成します。
- FlatMapは
Map
と同じ動作をしますが、各入力に対してゼロ個以上の出力を生成できます。 - フィルタは、関数が要素を出力するかどうかを決定する場合に役立ちます。
![]() |
最終更新日:2024/10/31
探していたものはすべて見つかりましたか?
すべて役立ち、分かりやすかったですか?変更したいことはありますか?お知らせください!