概要: 新しいI/Oコネクタの開発
組み込みI/Oコネクタでサポートされていないデータストアに接続する必要があるユーザー向けのガイド
Beamの既存のI/Oコネクタでサポートされていないデータストアに接続するには、カスタムI/Oコネクタを作成する必要があります。コネクタは通常、ソースとシンクで構成されます。すべてのBeamソースとシンクは複合変換ですが、カスタムI/Oの実装はユースケースによって異なります。開始するための推奨手順を以下に示します。
この概要を読み、実装を選択してください。質問がある場合は、Beam開発メーリングリストにメールで問い合わせることができます。また、他の誰かが同じI/Oコネクタに取り組んでいないかを確認できます。
I/OコネクタをBeamコミュニティに貢献する予定がある場合は、Apache Beam貢献ガイドを参照してください。
追加のスタイルガイドの推奨事項については、PTransformスタイルガイドを参照してください。
ソース
境界付き(バッチ)ソースの場合、現在、Beamソースを作成するための2つのオプションがあります。
Splittable DoFn
を使用します。ParDo
とGroupByKey
を使用します。
Splittable DoFn
は、境界付きソースと境界なしソースの両方にとって最新のソースフレームワークであるため、推奨されるオプションです。これは、新しいシステムでSource
API(BoundedSourceおよびUnboundedSource)を置き換えることを目的としています。Splittable DoFnの書き方については、分割可能なDoFnプログラミングガイドをお読みください。詳細については、マルチSDKコネクタの取り組みに関するロードマップを参照してください。
JavaおよびPythonの境界なし(ストリーミング)ソースの場合、チェックポイント、ウォーターマークの制御、バックログの追跡など、ストリーミングパイプラインに役立つ機能をサポートするSplittable DoFn
を使用する必要があります。
分割可能なDoFnインターフェースを使用するタイミング
Splittable DoFn
を使用するかどうか不明な場合は、お気軽にBeam開発メーリングリストにメールで問い合わせください。具体的なケースの長所と短所について話し合うことができます。
場合によっては、Splittable DoFn
を実装することが必要になったり、パフォーマンスが向上したりする可能性があります。
境界なしソース:
ParDo
は境界なしソースからの読み取りには機能しません。ParDo
は、ストリーミングデータソースに役立つチェックポイントや重複排除などのメカニズムをサポートしていません。進捗状況とサイズの推定:
ParDo
は、読み取っているデータの進捗状況やサイズに関するヒントをランナーに提供できません。データのサイズや読み取りの進捗状況の見積もりがないと、ランナーは読み取りがどのくらいの大きさになるかを推測する方法がありません。したがって、ランナーがワーカーを動的に割り当てようとしても、パイプラインに必要なワーカーの数に関する手がかりはありません。動的な作業のリバランス:
ParDo
は、一部のリーダーがジョブの処理速度を向上させるために使用する動的な作業のリバランスをサポートしていません。データソースによっては、動的な作業のリバランスができない場合があります。並列処理を増やすための最初の分割:
ParDo
には、最初の分割を実行する機能がありません。
たとえば、ファイルごとに多数のレコードを含む新しいファイル形式から読み取る場合や、ソートされたキー順での読み取り操作をサポートするキー値ストアから読み取る場合などです。
SDFを使用したI/Oの例
Javaの例
- Kafka: Apache Kafka(オープンソースの分散イベントストリーミングプラットフォーム)用のI/Oコネクタ。
- Watch: 入力ごとに、出力の成長するセットを生成するポーリング関数を使用し、入力ごとの終了条件が満たされるまで処理を継続します。
- Parquet: Apache Parquet(オープンソースの列指向ストレージ形式)用のI/Oコネクタです。
- HL7v2: HL7v2メッセージ(組織内で発生するイベントに関するデータを提供する臨床メッセージ形式)用のI/Oコネクタで、Google Cloud Healthcare APIの一部です。
- BoundedSource ラッパー: 既存のBoundedSource実装を分割可能なDoFnに変換するラッパーです。
- UnboundedSource ラッパー: 既存のUnboundedSource実装を分割可能なDoFnに変換するラッパーです。
Pythonの例
- BoundedSourceWrapper: 既存のBoundedSource実装を分割可能なDoFnに変換するラッパーです。
ParDoとGroupByKeyの使用
データを並行して読み取ることができるデータストアまたはファイル形式の場合、プロセスをミニパイプラインとして考えることができます。これは多くの場合、次の2つのステップで構成されます。
並列で読み取るためにデータを分割する
それらの各部分から読み取る
これらの各ステップはParDo
になり、その間にGroupByKey
が挟まります。GroupByKey
は実装の詳細ですが、ほとんどのランナーでは、GroupByKey
を使用すると、状況によってはランナーが異なる数のワーカーを使用できるようになります。
読み取るデータをチャンクに分割する方法を決定する
データの読み取り(多くの場合、より多くのワーカーがいると有利になります)
さらに、GroupByKey
を使用すると、その機能をサポートするランナーで動的なワークリバランスも可能になります。
以下は、データを並列に読み取れる場合に、「ミニパイプラインとして読み取る」モデルを使用する読み取り変換の実装例です。
ファイルglobからの読み取り:たとえば、「~/data/**」内のすべてのファイルを読み取ります。
- ファイルパス取得
ParDo
:入力として、ファイルglobを受け取ります。それぞれがファイルパスである文字列のPCollection
を生成します。 - 読み取り
ParDo
:ファイルパスのPCollection
が与えられたら、それぞれを読み取り、レコードのPCollection
を生成します。
- ファイルパス取得
NoSQLデータベースからの読み取り(Apache HBaseなど):これらのデータベースでは、多くの場合、範囲を並列に読み取ることができます。
- キー範囲決定
ParDo
:入力として、データベースの接続情報と読み取るキー範囲を受け取ります。効率的に並列に読み取ることができるキー範囲のPCollection
を生成します。 - キー範囲読み取り
ParDo
:キー範囲のPCollection
が与えられたら、キー範囲を読み取り、レコードのPCollection
を生成します。
- キー範囲決定
並列読み取りができないデータストアまたはファイルの場合、読み取りは単一のParDo
+GroupByKey
で完了できる単純なタスクです。例えば
データベースクエリからの読み取り:従来のSQLデータベースクエリは、多くの場合、シーケンスでしか読み取ることができません。この場合、
ParDo
はデータベースへの接続を確立し、レコードのバッチを読み取り、それらのレコードのPCollection
を生成します。gzipファイルからの読み取り:gzipファイルは順番に読み取る必要があり、読み取りを並列化することはできません。この場合、
ParDo
はファイルを開いて順番に読み取り、ファイルからレコードのPCollection
を生成します。
シンク
Beamシンクを作成するには、受信したレコードをデータストアに書き込むParDo
を使用することをお勧めします。より複雑なシンク(たとえば、ランナーが再試行したときにデータの重複排除をサポートするなど)を開発するには、ParDo
、GroupByKey
、およびその他の利用可能なBeam変換を使用します。多くのデータサービスは、要素のバッチを一度に書き込むように最適化されているため、書き込み前に要素をバッチにグループ化することが理にかなっている場合があります。永続的な接続は、すべての要素を受信するたびにではなく、DoFnのsetUp
またはstartBundle
メソッドで初期化することもできます。大規模な分散システムでは、作業が失敗したり、再試行されたりする可能性があるため、外部とのやり取りは可能な限りべき等にすることが望ましいことにも注意する必要があります。
ファイルベースのシンクの場合、JavaおよびPython SDKの両方で提供されるFileBasedSink
抽象化を使用できます。BeamのFileSystems
ユーティリティクラスは、ファイルの読み書きにも役立ちます。詳細については、言語固有の実装ガイドを参照してください。
最終更新日:2024/10/31
探していたものはすべて見つかりましたか?
すべてが役立ち、明確でしたか?何か変更したいことはありますか?ぜひ教えてください!