Beam YAML API
Beam YAMLは、YAMLファイルを使用してApache Beamパイプラインを記述するための宣言的な構文です。Beam YAMLを使用すると、コードを記述せずにBeamパイプラインを作成および実行できます。
概要
Beamは、高度なデータ処理パイプラインを作成するための強力なモデルを提供します。ただし、Beamプログラミングを開始するには、サポートされているBeam SDK言語のいずれかでコードを記述する必要があるため、難しい場合があります。APIを理解し、プロジェクトを設定し、依存関係を管理し、その他のプログラミングタスクを実行する必要があります。
Beam YAMLを使用すると、Beamパイプラインの作成を簡単に開始できます。コードを記述する代わりに、テキストエディターを使用してYAMLファイルを作成します。次に、YAMLファイルをランナーによって実行されるように送信します。
Beam YAML構文は、人間が読みやすいように設計されていますが、ツールのための中間表現としても適しています。たとえば、パイプライン作成GUIはYAMLを出力でき、リネージ分析ツールはYAMLパイプライン仕様を使用できます。
Beam YAMLはまだ開発中ですが、すでに含まれている機能はすべて安定していると見なされます。フィードバックはdev@apache.beam.orgまでお寄せください。
前提条件
Beam YAMLパーサーは現在、Apache Beam Python SDKの一部として含まれています。Beam YAMLを使用するためにPythonコードを記述する必要はありませんが、パイプラインをローカルで実行するにはSDKが必要です。
すべてのパッケージが隔離された自己完結型の環境にインストールされるように、仮想環境を作成することをお勧めします。Python環境を設定したら、次のようにSDKをインストールします。
pip install apache_beam[yaml,gcp]
さらに、SQL変換など、提供される変換のいくつかはJavaで実装されており、動作するJavaインタープリターが必要です。これらの変換を使用してパイプラインを実行すると、必要なアーティファクトがApache Mavenリポジトリから自動的にダウンロードされます。
始めましょう
テキストエディターを使用して、pipeline.yaml
という名前のファイルを作成します。次のテキストをファイルに貼り付けて保存します。
pipeline:
transforms:
- type: Create
config:
elements: [1, 2, 3]
- type: LogForTesting
input: Create
このファイルは、2つの変換を含む単純なパイプラインを定義します。
Create
変換はコレクションを作成します。config
の値は、構成設定の辞書です。この場合、elements
はコレクションのメンバーを指定します。他の変換タイプには、他の構成設定があります。LogForTesting
変換は、各入力要素をログに記録します。この変換には、config
設定は必要ありません。input
キーは、LogForTesting
がCreate
変換から入力を受け取ることを指定します。
パイプラインを実行する
パイプラインを実行するには、次のPythonコマンドを実行します。
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml
出力には、次のようなログステートメントが含まれている必要があります。
INFO:root:{"element": 1}
INFO:root:{"element": 2}
INFO:root:{"element": 3}
Dataflowでパイプラインを実行する
gcloud CLIを使用して、YAMLパイプラインをDataflowに送信できます。YAMLファイルからDataflowジョブを作成するには、gcloud dataflow yaml run
コマンドを使用します。
gcloud dataflow yaml run $JOB_NAME \
--yaml-pipeline-file=pipeline.yaml \
--region=$REGION
gcloud
CLIを使用する場合、Beam SDKをローカルにインストールする必要はありません。
パイプラインを可視化する
apache_beam.runners.render
モジュールを使用して、パイプライン実行グラフをPNGファイルとしてレンダリングできます。以下を参照してください。
Graphvizをインストールします。
次のコマンドを実行します。
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml \ --runner=apache_beam.runners.render.RenderRunner \ --render_output=out.png
例:CSVデータの読み取り
次のパイプラインは、一連のCSVファイルからデータを読み取り、データをJSON形式で書き込みます。このパイプラインは、CSVファイルにヘッダー行があることを前提としています。列名はJSONフィールド名になります。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: WriteToJson
config:
path: /path/to/output.json
input: ReadFromCsv
フィルターを追加する
Filter
変換はレコードをフィルタリングします。ブール述語を満たす入力レコードを保持し、述語を満たさないレコードを破棄します。次の例では、col3
の値が100より大きいレコードを保持します。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: WriteToJson
config:
path: /path/to/output.json
input: Filter
マッピング関数を追加する
Beam YAMLは、さまざまなマッピング関数をサポートしています。次の例では、Sql
変換を使用してcol1
でグループ化し、各キーのカウントを出力します。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: Sql
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: Filter
- type: WriteToJson
config:
path: /path/to/output.json
input: Sql
パターン
このセクションでは、Beam YAMLの一般的なパターンについて説明します。
名前付き変換
パイプライン内の変換に名前を付けると、監視とデバッグに役立ちます。名前は、パイプラインに同じタイプの変換が複数含まれている場合に、変換を区別するためにも使用されます。
pipeline:
transforms:
- type: ReadFromCsv
name: ReadMyData
config:
path: /path/to/input*.csv
- type: Filter
name: KeepBigRecords
config:
language: python
keep: "col3 > 100"
input: ReadMyData
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: KeepBigRecords
- type: WriteToJson
name: WriteTheOutput
config:
path: /path/to/output.json
input: MySqlTransform
変換の連鎖
パイプラインが線形(分岐またはマージなし)の場合、パイプラインをchain
タイプとして指定できます。chain
タイプのパイプラインでは、入力を指定する必要はありません。入力は、YAMLファイルに表示される順序から暗黙的に指定されます。
pipeline:
type: chain
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
- type: WriteToJson
config:
path: /path/to/output.json
ソースおよびシンク変換
構文糖として、パイプラインの最初と最後の変換にsource
とsink
という名前を付けることができます。この規則は、結果のパイプラインを変更しませんが、ソースおよびシンク変換の目的を示します。
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
sink:
type: WriteToJson
config:
path: /path/to/output.json
非線形パイプライン
Beam YAMLは、任意の非線形パイプラインをサポートしています。次のパイプラインは、2つのソースを読み取り、それらを結合し、2つの出力を書き込みます。
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: Filter
name: FilterToBig
input: Sql
config:
language: python
keep: "col2 > 100"
- type: WriteToCsv
name: WriteBig
input: FilterToBig
config:
path: /path/to/big.csv
パイプラインが線形ではないため、各変換の入力を明示的に宣言する必要があります。ただし、非線形パイプライン内にchain
をネストすることができます。チェーンは、パイプライン内の線形サブパスです。
次の例では、ExtraProcessingForBigRows
という名前のチェーンを作成します。チェーンは、Sql
変換から入力を受け取り、いくつかの追加のフィルターとシンクを適用します。チェーン内では、入力を指定する必要がないことに注意してください。
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: chain
name: ExtraProcessingForBigRows
input: Sql
transforms:
- type: Filter
config:
language: python
keep: "col2 > 100"
- type: Filter
config:
language: python
keep: "len(col1) > 10"
- type: Filter
config:
language: python
keep: "col1 > 'z'"
sink:
type: WriteToCsv
config:
path: /path/to/big.csv
ウィンドウ処理
このAPIは、ストリーミングパイプラインとバッチパイプラインの両方を定義するために使用できます。ストリーミングパイプラインで要素を意味のある方法で集計するには、通常、何らかのウィンドウ処理が必要です。Beamのウィンドウ処理とトリガーは、他のすべてのBeam SDKで使用可能な同じWindowInto
変換を使用して宣言できます。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: JSON
schema:
type: object
properties:
col1: {type: string}
col2: {type: integer}
col3: {type: number}
- type: WindowInto
windowing:
type: fixed
size: 60s
- type: SomeGroupingTransform
config:
arg: ...
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
明示的なWindowInto
操作を使用するのではなく、変換に指定されたウィンドウ処理でタグ付けすることができます。これにより、その入力(したがって変換自体)がそのウィンドウ処理で適用されます。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: SomeGroupingTransform
config:
arg: ...
windowing:
type: sliding
size: 60s
period: 10s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
Sql
操作自体がしばしば集計の形式であり、ウィンドウ処理を適用する(またはすでにウィンドウ処理された入力を消費する)と、すべてのグループ化がウィンドウごとに行われることに注意してください。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
windowing:
type: sessions
gap: 60s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
指定されたウィンドウ処理はすべての入力に適用され、この場合はウィンドウごとに結合が行われます。
pipeline:
transforms:
- type: ReadFromPubSub
name: ReadLeft
config:
topic: leftTopic
format: ...
schema: ...
- type: ReadFromPubSub
name: ReadRight
config:
topic: rightTopic
format: ...
schema: ...
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
windowing:
type: fixed
size: 60s
options:
streaming: true
入力を持たない変換の場合、指定されたウィンドウ処理は代わりにその出力に適用されます。Beamモデルに従い、ウィンドウ処理はその後、すべての消費オペレーションに継承されます。これは、Readのようなルートオペレーションで特に役立ちます。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
パイプライン(またはコンポジット)のトップレベルでウィンドウ処理を指定することもできます。これは、自身のウィンドウ処理を指定していないすべてのルートオペレーションに同じウィンドウ処理を適用するためのショートハンドです。このアプローチは、パイプライン全体にウィンドウ処理を適用する効果的な方法です。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
windowing:
type: fixed
size: 60
options:
streaming: true
これらのウィンドウ処理の指定はすべて、source
およびsink
構文とも互換性があることに注意してください。
pipeline:
type: chain
source:
type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 10s
transforms:
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
sink:
type: WriteToCsv
config:
path: /path/to/output.json
windowing:
type: fixed
size: 5m
options:
streaming: true
プロバイダー
私たちは組み込み変換の豊富なスイートを提供することを目指していますが、人々が独自の変換を作成したいと思うのは避けられません。これは、拡張サービスとスキーマ変換を活用するプロバイダーの概念によって可能になります。
たとえば、クロス言語変換またはスキーマ変換を提供するjarを構築し、次のように変換で使用できます。
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: MyCustomTransform
config:
arg: whatever
sink:
type: WriteToJson
config:
path: /path/to/output.json
providers:
- type: javaJar
config:
jar: /path/or/url/to/myExpansionService.jar
transforms:
MyCustomTransform: "urn:registered:in:expansion:service"
任意のPython変換も、次の構文を使用して提供できます。
providers:
- type: pythonPackage
config:
packages:
- my_pypi_package>=version
- /path/to/local/package.zip
transforms:
MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
パイプラインオプション
パイプラインオプションは、パイプラインを実行するパイプラインランナーや、選択したランナーに必要なランナー固有の構成など、パイプラインのさまざまな側面を構成するために使用されます。パイプラインオプションを設定するには、yamlファイルの最後にオプションブロックを追加します。例:
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true