Beam YAML 変換インデックス

AssertEqual

入力に指定された要素が正確に含まれていることをアサートします。

これは主にテストに使用されます。この変換への入力が、configパラメータで指定されたelementsのセットと正確に一致しない場合、パイプライン全体が失敗します。

Createと同様に、YAML/JSON形式のマッピングはBeam行として解釈されます。例:

type: AssertEqual
input: SomeTransform
config:
  elements:
     - {a: 0, b: "foo"}
     - {a: 1, b: "bar"}

は、SomeTransformが正確に2つの要素(値はそれぞれ(a=0, b="foo")(a=1, b="bar"))を生成することを保証します。

設定

使用方法

type: AssertEqual
input: ...
config:
  elements:
  - element
  - element
  - ...

AssignTimestamps

入力の各要素に新しいタイムスタンプを割り当てます。

これは、タイムスタンプが埋め込まれたレコードを読み取る場合(たとえば、さまざまなファイルタイプや、デフォルトですべてのタイムスタンプを無限の過去に設定するその他のソースなど)に役立ちます。

タイムスタンプは前方へのみ設定する必要があります。後方に設定すると、既に進んだウォーターマークを保持できなくなり、データがドロップ可能に遅れる可能性があります。

サポートされている言語:汎用、JavaScript、Python

設定

使用方法

type: AssignTimestamps
input: ...
config:
  timestamp: timestamp
  language: "language"
  error_handling:
    output: "output"

Combine

共通のフィールドを共有するレコードをグループ化して結合します。

組み込みの結合関数は、summaxminallanymeancountgroupconcatですが、カスタム集計関数も使用できます。

YAML集計に関するドキュメントも参照してください。YAML集計

サポートされている言語:Calcite、汎用、JavaScript、Python、SQL

設定

使用方法

type: Combine
input: ...
config:
  group_by:
  - "group_by"
  - "group_by"
  - ...
  combine:
    a:
      a: combine_value_a_value_a
      b: combine_value_a_value_b
      c: ...
    b:
      a: combine_value_b_value_a
      b: combine_value_b_value_b
      c: ...
    c: ...
  language: "language"

Create

指定された要素セットを含むコレクションを作成します。

この変換は常にスキーマ付きデータを出力します。たとえば

type: Create
config:
  elements: [1, 2, 3]

は、Row(element=int)のスキーマを持つ3つの要素を出力しますが、YAML/JSON形式のマッピングはBeam行として直接解釈されます。例:

type: Create
config:
  elements:
     - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
     - {first: 1, second: {str: "bar", values: [4, 5, 6]}}

は(int, Row(string, List[int]))形式のスキーマになります。

これはYAMLとして表現することもできます。

type: Create
config:
  elements:
    - first: 0
      second:
        str: "foo"
         values: [1, 2, 3]
    - first: 1
      second:
        str: "bar"
         values: [4, 5, 6]

設定

使用方法

type: Create
config:
  elements:
  - element
  - element
  - ...
  reshuffle: true|false

Explode

1つ以上のフィールドを展開(別名アンネスト/フラット化)して複数の行を生成します。

反復可能な型の1つ以上のフィールドが与えられると、そのフィールドの各値に対して1つの行を生成します。たとえば、('a', [1, 2, 3])形式の行は、2番目のフィールドで展開されると、('a', 1)('a', 2')('a', 3)に展開されます。

これは、MapToFields変換と組み合わせたFlatMapに似ています。

YAMLマッピング関数に関するより完全なドキュメントを参照してください。YAMLマッピング関数

設定

使用方法

type: Explode
input: ...
config:
  fields: fields
  cross_product: true|false
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...

Filter

指定された条件を満たすレコードのみを保持します。

YAMLフィルタリングに関するより完全なドキュメントを参照してください。YAMLフィルタリング

サポートされている言語:Calcite、汎用、Java、JavaScript、Python、SQL

設定

使用方法

type: Filter
input: ...
config:
  keep: keep
  language: "language"
  error_handling:
    output: "output"

Flatten

複数のPCollectionを単一のPCollectionにフラット化します。

結果のPCollectionの要素は、すべての入力のすべての要素の(非重複の)和集合になります。

YAML変換では、常に暗黙的にフラット化される入力のリストを取ることができます。

設定

設定パラメータはありません。

使用方法

type: Flatten
input: ...
config: ...

Join

指定された条件を使用して、2つ以上の入力を結合します。

例えば

type: Join
input:
  input1: SomeTransform
  input2: AnotherTransform
  input3: YetAnotherTransform
config:
  type: inner
  equalities:
    - input1: colA
      input2: colB
    - input2: colX
      input3: colY
  fields:
    input1: [colA, colB, colC]
    input2: {new_name: colB}

は、input1.colA = input2.colBおよびinput2.colX = input3.colYという制約を満たす3つの入力に対して内部結合を実行し、input1からのcolAcolBcolCnew_nameという名前のフィールドとしてのinput2.colBの値、およびinput3からのすべてのフィールドを含む行を出力します。

設定

使用方法

type: Join
input: ...
config:
  equalities: equalities
  type: type
  fields:
    a: fields_value_a
    b: fields_value_b
    c: ...

LogForTesting

入力PCollectionの各要素をログに記録します。

この変換の出力は、チェーンスタイルのパイプラインで使いやすくするために、入力のコピーです。

設定

使用方法

type: LogForTesting
input: ...
config:
  level: "level"
  prefix: "prefix"

MLTransform

設定

使用方法

type: MLTransform
input: ...
config:
  write_artifact_location: "write_artifact_location"
  read_artifact_location: "read_artifact_location"
  transforms:
  - transforms
  - transforms
  - ...

MapToFields

入力フィールドに基づいて定義された新しいフィールドを持つレコードを作成します。

YAMLマッピング関数に関するより完全なドキュメントを参照してください。YAMLマッピング関数

サポートされている言語:Calcite、汎用、Java、JavaScript、Python、SQL

設定

使用方法

type: MapToFields
input: ...
config:
  language: language
  error_handling:
    output: "output"
  mapping_args: mapping_args

Partition

入力をいくつかの異なる出力に分割します。

各入力要素は、by設定パラメータで指定されたフィールドまたは関数に基づいて、異なる出力に移動します。

サポートされている言語:汎用、JavaScript、Python

設定

使用方法

type: Partition
input: ...
config:
  by: by
  outputs:
  - "outputs"
  - "outputs"
  - ...
  unknown_output: "unknown_output"
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...
  language: "language"

PyTransform

完全修飾名で識別されるPython PTransform。

これにより、任意のBeam Python変換をインポート、構築、および適用できます。これは、YAMLインターフェースを介してまだ公開されていない変換を使用する場合に役立ちます。ただし、この変換がBeam行を受け入れないか生成しない場合は、変換が必要になる場合があります。

例えば

type: PyTransform
config:
   constructor: apache_beam.pkg.mod.SomeClass
   args: [1, 'foo']
   kwargs:
     baz: 3

を使用して、変換apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)にアクセスできます。

Pythonのインライン化に関するドキュメントも参照してください。Pythonのインライン化

設定

使用方法

type: PyTransform
input: ...
config:
  constructor: "constructor"
  args:
  - arg
  - arg
  - ...
  kwargs:
    a: kwargs_value_a
    b: kwargs_value_b
    c: ...

Sql

設定

使用方法

type: Sql
input: ...
config: ...

WindowInto

PCollectionの各要素にウィンドウを割り当てるウィンドウ変換。

割り当てられたウィンドウは、すべてのダウンストリームの集計操作に影響し、キーだけでなくウィンドウによっても集計されます。

詳細については、ウィンドウ処理に関するBeamのドキュメントを参照してください。

サイズ、オフセット、期間、ギャップ(該当する場合)は、ミリ秒、秒、分、時、日の単位を表す接尾辞'ms'、's'、'm'、'h'、または'd'を使用して定義する必要があります。時間単位が指定されていない場合、デフォルトで's'になります。

例えば

windowing:
   type: fixed
   size: 30s

任意のYaml変換には、ウィンドウ処理パラメータを含めることができ、これは入力(存在する場合)または出力(入力がない場合)に適用されます。つまり、明示的なWindowInto操作は通常必要ありません。

設定

使用方法

type: WindowInto
input: ...
config:
  windowing: windowing

ReadFromAvro

Avroファイルからレコードを読み取るためのPTransform

結果のPCollectionの各レコードには、ソースから読み取られた単一のレコードが含まれます。単純な型のレコードは、レコードの値を含む単一のrecordフィールドを持つBeam行にマッピングされます。Avro型RECORDのレコードは、それらのレコードを含むAvroファイルに含まれるスキーマに準拠したBeam行にマッピングされます。

設定

使用方法

type: ReadFromAvro
config:
  path: path

WriteToAvro

Avroファイルを作成するためのPTransform

入力にスキーマがある場合、対応するAvroスキーマが自動的に生成され、出力レコードの書き込みに使用されます。

設定

使用方法

type: WriteToAvro
input: ...
config:
  path: path

ReadFromBigQuery

BigQueryからデータを読み込みます。

tableまたはqueryのいずれか1つのみを設定する必要があります。queryが設定されている場合、row_restrictionfieldsは設定しないでください。

設定

使用方法

type: ReadFromBigQuery
config:
  table: "table"
  query: "query"
  row_restriction: "row_restriction"
  fields:
  - "field"
  - "field"
  - ...

WriteToBigQuery

設定

使用方法

type: WriteToBigQuery
input: ...
config: ...

ReadFromCsv

コンマ区切り値(CSV)ファイルをPCollectionに読み込むためのPTransformです。

設定

使用方法

type: ReadFromCsv
config:
  path: "path"
  delimiter: delimiter
  comment: comment

WriteToCsv

スキーマのあるPCollectionをコンマ区切り値(CSV)ファイル(の集合)として書き込むためのPTransformです。

設定

使用方法

type: WriteToCsv
input: ...
config:
  path: "path"
  delimiter: delimiter

ReadFromJdbc

設定

使用方法

type: ReadFromJdbc
config: ...

WriteToJdbc

設定

使用方法

type: WriteToJdbc
input: ...
config: ...

ReadFromJson

ファイルからJSON値をPCollectionに読み込むためのPTransformです。

設定

使用方法

type: ReadFromJson
config:
  path: "path"

WriteToJson

PCollectionをファイルにJSON値として書き込むためのPTransformです。

設定

使用方法

type: WriteToJson
input: ...
config:
  path: "path"

ReadFromKafka

設定

使用方法

type: ReadFromKafka
config: ...

WriteToKafka

設定

使用方法

type: WriteToKafka
input: ...
config: ...

ReadFromMySql

設定

使用方法

type: ReadFromMySql
config: ...

WriteToMySql

設定

使用方法

type: WriteToMySql
input: ...
config: ...

ReadFromOracle

設定

使用方法

type: ReadFromOracle
config: ...

WriteToOracle

設定

使用方法

type: WriteToOracle
input: ...
config: ...

ReadFromParquet

Parquetファイルを読み込むためのPTransformです。

設定

使用方法

type: ReadFromParquet
config:
  path: path

WriteToParquet

Parquetファイル書き込むためのPTransformです。

設定

使用方法

type: WriteToParquet
input: ...
config:
  path: path

ReadFromPostgres

設定

使用方法

type: ReadFromPostgres
config: ...

WriteToPostgres

設定

使用方法

type: WriteToPostgres
input: ...
config: ...

ReadFromPubSub

Cloud Pub/Subからメッセージを読み込みます。

設定

使用方法

type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

WriteToPubSub

Cloud Pub/Subにメッセージを書き込みます。

設定

使用方法

type: WriteToPubSub
input: ...
config:
  topic: "topic"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

ReadFromPubSubLite

設定

使用方法

type: ReadFromPubSubLite
config: ...

WriteToPubSubLite

設定

使用方法

type: WriteToPubSubLite
input: ...
config: ...

ReadFromSpanner

設定

使用方法

type: ReadFromSpanner
config: ...

WriteToSpanner

設定

使用方法

type: WriteToSpanner
input: ...
config: ...

ReadFromSqlServer

設定

使用方法

type: ReadFromSqlServer
config: ...

WriteToSqlServer

設定

使用方法

type: WriteToSqlServer
input: ...
config: ...

ReadFromText

テキストファイルから行を読み込みます。

結果のPCollectionは、「line」という名前の単一の文字列フィールドを持つ行で構成されます。

設定

使用方法

type: ReadFromText
config:
  path: "path"

WriteToText

PCollectionをテキストファイル(の集合)に書き込みます。

入力は、スキーマに正確に1つのフィールドを持つPCollectionである必要があります。

設定

使用方法

type: WriteToText
input: ...
config:
  path: "path"