Beam YAML API

Beam YAMLは、YAMLファイルを使用してApache Beamパイプラインを記述するための宣言的な構文です。Beam YAMLを使用すると、コードを記述せずにBeamパイプラインを作成および実行できます。

概要

Beamは、高度なデータ処理パイプラインを作成するための強力なモデルを提供します。ただし、Beamプログラミングを開始するには、サポートされているBeam SDK言語のいずれかでコードを記述する必要があるため、難しい場合があります。APIを理解し、プロジェクトを設定し、依存関係を管理し、その他のプログラミングタスクを実行する必要があります。

Beam YAMLを使用すると、Beamパイプラインの作成を簡単に開始できます。コードを記述する代わりに、テキストエディターを使用してYAMLファイルを作成します。次に、YAMLファイルをランナーによって実行されるように送信します。

Beam YAML構文は、人間が読みやすいように設計されていますが、ツールのための中間表現としても適しています。たとえば、パイプライン作成GUIはYAMLを出力でき、リネージ分析ツールはYAMLパイプライン仕様を使用できます。

Beam YAMLはまだ開発中ですが、すでに含まれている機能はすべて安定していると見なされます。フィードバックはdev@apache.beam.orgまでお寄せください。

前提条件

Beam YAMLパーサーは現在、Apache Beam Python SDKの一部として含まれています。Beam YAMLを使用するためにPythonコードを記述する必要はありませんが、パイプラインをローカルで実行するにはSDKが必要です。

すべてのパッケージが隔離された自己完結型の環境にインストールされるように、仮想環境を作成することをお勧めします。Python環境を設定したら、次のようにSDKをインストールします。

pip install apache_beam[yaml,gcp]

さらに、SQL変換など、提供される変換のいくつかはJavaで実装されており、動作するJavaインタープリターが必要です。これらの変換を使用してパイプラインを実行すると、必要なアーティファクトがApache Mavenリポジトリから自動的にダウンロードされます。

始めましょう

テキストエディターを使用して、pipeline.yamlという名前のファイルを作成します。次のテキストをファイルに貼り付けて保存します。

pipeline:
  transforms:
    - type: Create
      config:
        elements: [1, 2, 3]
    - type: LogForTesting
      input: Create

このファイルは、2つの変換を含む単純なパイプラインを定義します。

パイプラインを実行する

パイプラインを実行するには、次のPythonコマンドを実行します。

python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml

出力には、次のようなログステートメントが含まれている必要があります。

INFO:root:{"element": 1}
INFO:root:{"element": 2}
INFO:root:{"element": 3}

Dataflowでパイプラインを実行する

gcloud CLIを使用して、YAMLパイプラインをDataflowに送信できます。YAMLファイルからDataflowジョブを作成するには、gcloud dataflow yaml runコマンドを使用します。

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file=pipeline.yaml \
  --region=$REGION

gcloudCLIを使用する場合、Beam SDKをローカルにインストールする必要はありません。

パイプラインを可視化する

apache_beam.runners.renderモジュールを使用して、パイプライン実行グラフをPNGファイルとしてレンダリングできます。以下を参照してください。

  1. Graphvizをインストールします。

  2. 次のコマンドを実行します。

    python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml \
      --runner=apache_beam.runners.render.RenderRunner \
      --render_output=out.png
    

例:CSVデータの読み取り

次のパイプラインは、一連のCSVファイルからデータを読み取り、データをJSON形式で書き込みます。このパイプラインは、CSVファイルにヘッダー行があることを前提としています。列名はJSONフィールド名になります。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: ReadFromCsv

フィルターを追加する

Filter変換はレコードをフィルタリングします。ブール述語を満たす入力レコードを保持し、述語を満たさないレコードを破棄します。次の例では、col3の値が100より大きいレコードを保持します。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Filter

マッピング関数を追加する

Beam YAMLは、さまざまなマッピング関数をサポートしています。次の例では、Sql変換を使用してcol1でグループ化し、各キーのカウントを出力します。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: Sql
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: Filter
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Sql

パターン

このセクションでは、Beam YAMLの一般的なパターンについて説明します。

名前付き変換

パイプライン内の変換に名前を付けると、監視とデバッグに役立ちます。名前は、パイプラインに同じタイプの変換が複数含まれている場合に、変換を区別するためにも使用されます。

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadMyData
      config:
        path: /path/to/input*.csv
    - type: Filter
      name: KeepBigRecords
      config:
        language: python
        keep: "col3 > 100"
      input: ReadMyData
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: KeepBigRecords
    - type: WriteToJson
      name: WriteTheOutput
      config:
        path: /path/to/output.json
      input: MySqlTransform

変換の連鎖

パイプラインが線形(分岐またはマージなし)の場合、パイプラインをchainタイプとして指定できます。chainタイプのパイプラインでは、入力を指定する必要はありません。入力は、YAMLファイルに表示される順序から暗黙的に指定されます。

pipeline:
  type: chain

  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
    - type: WriteToJson
      config:
        path: /path/to/output.json

ソースおよびシンク変換

構文糖として、パイプラインの最初と最後の変換にsourcesinkという名前を付けることができます。この規則は、結果のパイプラインを変更しませんが、ソースおよびシンク変換の目的を示します。

pipeline:
  type: chain

  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv

  transforms:
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"

    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"

  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

非線形パイプライン

Beam YAMLは、任意の非線形パイプラインをサポートしています。次のパイプラインは、2つのソースを読み取り、それらを結合し、2つの出力を書き込みます。

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv

    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight

    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json

    - type: Filter
      name: FilterToBig
      input: Sql
      config:
        language: python
        keep: "col2 > 100"

    - type: WriteToCsv
      name: WriteBig
      input: FilterToBig
      config:
        path: /path/to/big.csv

パイプラインが線形ではないため、各変換の入力を明示的に宣言する必要があります。ただし、非線形パイプライン内にchainをネストすることができます。チェーンは、パイプライン内の線形サブパスです。

次の例では、ExtraProcessingForBigRowsという名前のチェーンを作成します。チェーンは、Sql変換から入力を受け取り、いくつかの追加のフィルターとシンクを適用します。チェーン内では、入力を指定する必要がないことに注意してください。

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv

    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight

    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json

    - type: chain
      name: ExtraProcessingForBigRows
      input: Sql
      transforms:
        - type: Filter
          config:
            language: python
            keep: "col2 > 100"
        - type: Filter
          config:
            language: python
            keep: "len(col1) > 10"
        - type: Filter
          config:
            language: python
            keep: "col1 > 'z'"
      sink:
        type: WriteToCsv
        config:
          path: /path/to/big.csv

ウィンドウ処理

このAPIは、ストリーミングパイプラインとバッチパイプラインの両方を定義するために使用できます。ストリーミングパイプラインで要素を意味のある方法で集計するには、通常、何らかのウィンドウ処理が必要です。Beamのウィンドウ処理トリガーは、他のすべてのBeam SDKで使用可能な同じWindowInto変換を使用して宣言できます。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: JSON
        schema:
          type: object
          properties:
            col1: {type: string}
            col2: {type: integer}
            col3: {type: number}
    - type: WindowInto
      windowing:
        type: fixed
        size: 60s
    - type: SomeGroupingTransform
      config:
        arg: ...
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

明示的なWindowInto操作を使用するのではなく、変換に指定されたウィンドウ処理でタグ付けすることができます。これにより、その入力(したがって変換自体)がそのウィンドウ処理で適用されます。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: SomeGroupingTransform
      config:
        arg: ...
      windowing:
        type: sliding
        size: 60s
        period: 10s
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

Sql操作自体がしばしば集計の形式であり、ウィンドウ処理を適用する(またはすでにウィンドウ処理された入力を消費する)と、すべてのグループ化がウィンドウごとに行われることに注意してください。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
      windowing:
        type: sessions
        gap: 60s
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

指定されたウィンドウ処理はすべての入力に適用され、この場合はウィンドウごとに結合が行われます。

pipeline:
  transforms:
    - type: ReadFromPubSub
      name: ReadLeft
      config:
        topic: leftTopic
        format: ...
        schema: ...

    - type: ReadFromPubSub
      name: ReadRight
      config:
        topic: rightTopic
        format: ...
        schema: ...

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight
      windowing:
        type: fixed
        size: 60s
options:
  streaming: true

入力を持たない変換の場合、指定されたウィンドウ処理は代わりにその出力に適用されます。Beamモデルに従い、ウィンドウ処理はその後、すべての消費オペレーションに継承されます。これは、Readのようなルートオペレーションで特に役立ちます。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60s
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

パイプライン(またはコンポジット)のトップレベルでウィンドウ処理を指定することもできます。これは、自身のウィンドウ処理を指定していないすべてのルートオペレーションに同じウィンドウ処理を適用するためのショートハンドです。このアプローチは、パイプライン全体にウィンドウ処理を適用する効果的な方法です。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
  windowing:
    type: fixed
    size: 60
options:
  streaming: true

これらのウィンドウ処理の指定はすべて、sourceおよびsink構文とも互換性があることに注意してください。

pipeline:
  type: chain

  source:
    type: ReadFromPubSub
    config:
      topic: myPubSubTopic
      format: ...
      schema: ...
    windowing:
      type: fixed
      size: 10s

  transforms:
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"

  sink:
    type: WriteToCsv
    config:
      path: /path/to/output.json
    windowing:
      type: fixed
      size: 5m

options:
  streaming: true

プロバイダー

私たちは組み込み変換の豊富なスイートを提供することを目指していますが、人々が独自の変換を作成したいと思うのは避けられません。これは、拡張サービスとスキーマ変換を活用するプロバイダーの概念によって可能になります。

たとえば、クロス言語変換またはスキーマ変換を提供するjarを構築し、次のように変換で使用できます。

pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv

  transforms:
    - type: MyCustomTransform
      config:
        arg: whatever

  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

providers:
  - type: javaJar
    config:
       jar: /path/or/url/to/myExpansionService.jar
    transforms:
       MyCustomTransform: "urn:registered:in:expansion:service"

任意のPython変換も、次の構文を使用して提供できます。

providers:
  - type: pythonPackage
    config:
       packages:
           - my_pypi_package>=version
           - /path/to/local/package.zip
    transforms:
       MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"

パイプラインオプション

パイプラインオプションは、パイプラインを実行するパイプラインランナーや、選択したランナーに必要なランナー固有の構成など、パイプラインのさまざまな側面を構成するために使用されます。パイプラインオプションを設定するには、yamlファイルの最後にオプションブロックを追加します。例:

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60s
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

その他のリソース