Beam YAML 変換インデックス
AssertEqual
入力に指定された要素が正確に含まれていることをアサートします。
これは主にテストに使用されます。この変換への入力が、configパラメータで指定されたelementsのセットと正確に一致しない場合、パイプライン全体が失敗します。
Createと同様に、YAML/JSON形式のマッピングはBeam行として解釈されます。例:
type: AssertEqual
input: SomeTransform
config:
elements:
- {a: 0, b: "foo"}
- {a: 1, b: "bar"}
は、SomeTransformが正確に2つの要素(値はそれぞれ(a=0, b="foo")と(a=1, b="bar"))を生成することを保証します。
設定
- elements
Array[?]: PCollectionに属するべき要素のセット。YAML/JSON形式のマッピングはBeam行として解釈されます。
使用方法
type: AssertEqual
input: ...
config:
elements:
- element
- element
- ...
AssignTimestamps
入力の各要素に新しいタイムスタンプを割り当てます。
これは、タイムスタンプが埋め込まれたレコードを読み取る場合(たとえば、さまざまなファイルタイプや、デフォルトですべてのタイムスタンプを無限の過去に設定するその他のソースなど)に役立ちます。
タイムスタンプは前方へのみ設定する必要があります。後方に設定すると、既に進んだウォーターマークを保持できなくなり、データがドロップ可能に遅れる可能性があります。
サポートされている言語:汎用、JavaScript、Python
設定
-
timestamp
?(オプション) : 新しいタイムスタンプを与えるフィールド、呼び出し可能オブジェクト、または式。 -
language
string(オプション) : タイムスタンプ式の言語。 -
error_handling
Row: タイムスタンプ評価中のエラーの処理方法。Rowフィールド
- output
string
- output
使用方法
type: AssignTimestamps
input: ...
config:
timestamp: timestamp
language: "language"
error_handling:
output: "output"
Combine
共通のフィールドを共有するレコードをグループ化して結合します。
組み込みの結合関数は、sum、max、min、all、any、mean、count、group、concatですが、カスタム集計関数も使用できます。
YAML集計に関するドキュメントも参照してください。YAML集計
サポートされている言語:Calcite、汎用、JavaScript、Python、SQL
設定
-
group_by
Array[string] -
combine
Map[string, Map[string, ?]] -
language
string(オプション)
使用方法
type: Combine
input: ...
config:
group_by:
- "group_by"
- "group_by"
- ...
combine:
a:
a: combine_value_a_value_a
b: combine_value_a_value_b
c: ...
b:
a: combine_value_b_value_a
b: combine_value_b_value_b
c: ...
c: ...
language: "language"
Create
指定された要素セットを含むコレクションを作成します。
この変換は常にスキーマ付きデータを出力します。たとえば
type: Create
config:
elements: [1, 2, 3]
は、Row(element=int)のスキーマを持つ3つの要素を出力しますが、YAML/JSON形式のマッピングはBeam行として直接解釈されます。例:
type: Create
config:
elements:
- {first: 0, second: {str: "foo", values: [1, 2, 3]}}
- {first: 1, second: {str: "bar", values: [4, 5, 6]}}
は(int, Row(string, List[int]))形式のスキーマになります。
これはYAMLとして表現することもできます。
type: Create
config:
elements:
- first: 0
second:
str: "foo"
values: [1, 2, 3]
- first: 1
second:
str: "bar"
values: [4, 5, 6]
設定
-
elements
Array[?]: PCollectionに属するべき要素のセット。YAML/JSON形式のマッピングはBeam行として解釈されます。プリミティブは、単一の"element"フィールドを持つ行にマッピングされます。 -
reshuffle
boolean(オプション) : コレクションに複数の要素がある場合に、シャッフル(作業の再分散)を行うかどうか(オプション)。デフォルトはTrue。
使用方法
type: Create
config:
elements:
- element
- element
- ...
reshuffle: true|false
Explode
1つ以上のフィールドを展開(別名アンネスト/フラット化)して複数の行を生成します。
反復可能な型の1つ以上のフィールドが与えられると、そのフィールドの各値に対して1つの行を生成します。たとえば、('a', [1, 2, 3])形式の行は、2番目のフィールドで展開されると、('a', 1)、('a', 2')、('a', 3)に展開されます。
これは、MapToFields変換と組み合わせたFlatMapに似ています。
YAMLマッピング関数に関するより完全なドキュメントを参照してください。YAMLマッピング関数
設定
-
fields
?(オプション) : 展開するフィールドのリスト。 -
cross_product
boolean(オプション) : 複数のフィールドが指定されている場合、完全な組み合わせのクロス積を生成するかどうか、または最初のフィールドの最初の要素が2番目のフィールドの最初の要素に対応するかどうかなどを示します。たとえば、(['a', 'b'], [1, 2])行は、cross_productがtrueに設定されている場合は4つの行('a', 1)、('a', 2)、('b', 1)、('b', 2)に展開されますが、falseに設定されている場合は2つの行('a', 1)と('b', 2)にのみ展開されます。複数の行が指定されている場合にのみ意味があり(かつ必要です)。 -
error_handling
Map[string, ?](オプション) : 反復処理中のエラーの処理方法。
使用方法
type: Explode
input: ...
config:
fields: fields
cross_product: true|false
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
Filter
指定された条件を満たすレコードのみを保持します。
YAMLフィルタリングに関するより完全なドキュメントを参照してください。YAMLフィルタリング
サポートされている言語:Calcite、汎用、Java、JavaScript、Python、SQL
設定
-
keep
?(オプション) -
language
string(オプション) -
error_handling
RowRowフィールド
- output
string
- output
使用方法
type: Filter
input: ...
config:
keep: keep
language: "language"
error_handling:
output: "output"
Flatten
複数のPCollectionを単一のPCollectionにフラット化します。
結果のPCollectionの要素は、すべての入力のすべての要素の(非重複の)和集合になります。
YAML変換では、常に暗黙的にフラット化される入力のリストを取ることができます。
設定
設定パラメータはありません。
使用方法
type: Flatten
input: ...
config: ...
Join
指定された条件を使用して、2つ以上の入力を結合します。
例えば
type: Join
input:
input1: SomeTransform
input2: AnotherTransform
input3: YetAnotherTransform
config:
type: inner
equalities:
- input1: colA
input2: colB
- input2: colX
input3: colY
fields:
input1: [colA, colB, colC]
input2: {new_name: colB}
は、input1.colA = input2.colBおよびinput2.colX = input3.colYという制約を満たす3つの入力に対して内部結合を実行し、input1からのcolA、colB、colC、new_nameという名前のフィールドとしてのinput2.colBの値、およびinput3からのすべてのフィールドを含む行を出力します。
設定
-
equalities
?(オプション) : 結合条件。結合条件を満たすために等しくする必要がある列のセットのリスト。列名が同じであるすべての入力で同じ列に対して結合する単純なシナリオでは、すべての入力に対して列名をリストするのではなく、列名を等価として指定できます。 -
type
?(オプション) : 結合の種類。"inner"、"left"、"right"、"outer"という文字列値で、実行する結合の種類を指定できます。異なる結合タイプが望まれる複数の入力を結合するシナリオでは、外部結合する入力を指定します。たとえば、{outer: [input1, input2]}は、指定された条件を使用してinput1とinput2が外部結合されることを意味し、他の入力は内部結合されます。 -
fields
Map[string, ?](オプション) : 出力するフィールド。キーとして入力エイリアス、値として出力する入力内のフィールドリストを持つマッピング。マップの値は、新しいフィールド名をキー、元のフィールド名を値とする辞書(例:new_field_name: field_name)、または元の名前を持つ出力するフィールドのリスト(例:[col1, col2, col3])、または入力のすべてのフィールドが出力されることを示す'*'のいずれかになります。指定されていない場合、すべての入力からのすべてのフィールドが出力されます。
使用方法
type: Join
input: ...
config:
equalities: equalities
type: type
fields:
a: fields_value_a
b: fields_value_b
c: ...
LogForTesting
入力PCollectionの各要素をログに記録します。
この変換の出力は、チェーンスタイルのパイプラインで使いやすくするために、入力のコピーです。
設定
-
level
string(オプション) : ERROR、INFO、またはDEBUGのいずれか。対応する言語固有のログレベルにマッピングされます。 -
prefix
string(オプション) : ログに記録される要素の前に追加されるオプションの識別子。
使用方法
type: LogForTesting
input: ...
config:
level: "level"
prefix: "prefix"
MLTransform
設定
-
write_artifact_location
string(オプション) -
read_artifact_location
string(オプション) -
transforms
Array[?](オプション)
使用方法
type: MLTransform
input: ...
config:
write_artifact_location: "write_artifact_location"
read_artifact_location: "read_artifact_location"
transforms:
- transforms
- transforms
- ...
MapToFields
入力フィールドに基づいて定義された新しいフィールドを持つレコードを作成します。
YAMLマッピング関数に関するより完全なドキュメントを参照してください。YAMLマッピング関数
サポートされている言語:Calcite、汎用、Java、JavaScript、Python、SQL
設定
-
language
?(オプション) -
error_handling
RowRowフィールド
- output
string
- output
-
mapping_args
?(オプション)
使用方法
type: MapToFields
input: ...
config:
language: language
error_handling:
output: "output"
mapping_args: mapping_args
Partition
入力をいくつかの異なる出力に分割します。
各入力要素は、by設定パラメータで指定されたフィールドまたは関数に基づいて、異なる出力に移動します。
サポートされている言語:汎用、JavaScript、Python
設定
-
by
?(オプション) : この要素の宛先出力を指定するフィールド、呼び出し可能オブジェクト、または式。outputsパラメータのメンバーである文字列を返す必要があります。unknown_outputも設定されている場合、その他の戻り値も許容されます。それ以外の場合はエラーが発生します。 -
outputs
Array[string]: この入力がパーティション分割される出力のセット。 -
unknown_output
string(オプション) : (オプション) 設定されている場合、outputsパラメータにリストされていない出力に割り当てられていない要素の宛先出力を示します。 -
error_handling
Map[string, ?](オプション) : (オプション) パーティション分割中のエラーの処理方法。 -
language
string(オプション) : (オプション)by式の言語。
使用方法
type: Partition
input: ...
config:
by: by
outputs:
- "outputs"
- "outputs"
- ...
unknown_output: "unknown_output"
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
language: "language"
PyTransform
完全修飾名で識別されるPython PTransform。
これにより、任意のBeam Python変換をインポート、構築、および適用できます。これは、YAMLインターフェースを介してまだ公開されていない変換を使用する場合に役立ちます。ただし、この変換がBeam行を受け入れないか生成しない場合は、変換が必要になる場合があります。
例えば
type: PyTransform
config:
constructor: apache_beam.pkg.mod.SomeClass
args: [1, 'foo']
kwargs:
baz: 3
を使用して、変換apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)にアクセスできます。
Pythonのインライン化に関するドキュメントも参照してください。Pythonのインライン化
設定
-
constructor
string: 変換の構築に使用される呼び出し可能オブジェクトの完全修飾名。多くの場合、apache_beam.pkg.mod.SomeClassなどのクラスですが、関数やPTransformを返すその他の呼び出し可能オブジェクトでもかまいません。 -
args
Array[?](オプション) : 位置引数として呼び出し可能オブジェクトに渡すパラメータのリスト。 -
kwargs
Map[string, ?](オプション) : キーワード引数として呼び出し可能オブジェクトに渡すパラメータのリスト。
使用方法
type: PyTransform
input: ...
config:
constructor: "constructor"
args:
- arg
- arg
- ...
kwargs:
a: kwargs_value_a
b: kwargs_value_b
c: ...
Sql
設定
使用方法
type: Sql
input: ...
config: ...
WindowInto
PCollectionの各要素にウィンドウを割り当てるウィンドウ変換。
割り当てられたウィンドウは、すべてのダウンストリームの集計操作に影響し、キーだけでなくウィンドウによっても集計されます。
詳細については、ウィンドウ処理に関するBeamのドキュメントを参照してください。
サイズ、オフセット、期間、ギャップ(該当する場合)は、ミリ秒、秒、分、時、日の単位を表す接尾辞'ms'、's'、'm'、'h'、または'd'を使用して定義する必要があります。時間単位が指定されていない場合、デフォルトで's'になります。
例えば
windowing:
type: fixed
size: 30s
任意のYaml変換には、ウィンドウ処理パラメータを含めることができ、これは入力(存在する場合)または出力(入力がない場合)に適用されます。つまり、明示的なWindowInto操作は通常必要ありません。
設定
- windowing
?(オプション) : 実行するウィンドウ処理の種類とパラメータ。
使用方法
type: WindowInto
input: ...
config:
windowing: windowing
ReadFromAvro
Avroファイルからレコードを読み取るためのPTransform。
結果のPCollectionの各レコードには、ソースから読み取られた単一のレコードが含まれます。単純な型のレコードは、レコードの値を含む単一のrecordフィールドを持つBeam行にマッピングされます。Avro型RECORDのレコードは、それらのレコードを含むAvroファイルに含まれるスキーマに準拠したBeam行にマッピングされます。
設定
- path
?(オプション)
使用方法
type: ReadFromAvro
config:
path: path
WriteToAvro
Avroファイルを作成するためのPTransform。
入力にスキーマがある場合、対応するAvroスキーマが自動的に生成され、出力レコードの書き込みに使用されます。
設定
- path
?(オプション)
使用方法
type: WriteToAvro
input: ...
config:
path: path
ReadFromBigQuery
BigQueryからデータを読み込みます。
tableまたはqueryのいずれか1つのみを設定する必要があります。queryが設定されている場合、row_restrictionとfieldsは設定しないでください。
設定
-
table
string(オプション): 読み込むテーブル。DATASET.TABLEまたはPROJECT:DATASET.TABLEとして指定します。 -
query
string(オプション): table引数の代わりに使用するクエリ。 -
row_restriction
string(オプション): クエリ内のWHERE句と同様の、オプションのSQLテキストフィルタリングステートメント。集計はサポートされていません。最大長は1MBに制限されています。 -
fields
Array[string](オプション)
使用方法
type: ReadFromBigQuery
config:
table: "table"
query: "query"
row_restriction: "row_restriction"
fields:
- "field"
- "field"
- ...
WriteToBigQuery
設定
使用方法
type: WriteToBigQuery
input: ...
config: ...
ReadFromCsv
コンマ区切り値(CSV)ファイルをPCollectionに読み込むためのPTransformです。
設定
-
path
string: 読み込むファイルパス。パスには*や?などのglob文字を含めることができます。 -
delimiter
?(オプション) -
comment
?(オプション)
使用方法
type: ReadFromCsv
config:
path: "path"
delimiter: delimiter
comment: comment
WriteToCsv
スキーマのあるPCollectionをコンマ区切り値(CSV)ファイル(の集合)として書き込むためのPTransformです。
設定
-
path
string: 書き込むファイルパス。書き込まれるファイルは、このプレフィックスにシャード識別子(num_shardsを参照)がfile_namingパラメータに従って追加されたものになります。 -
delimiter
?(オプション)
使用方法
type: WriteToCsv
input: ...
config:
path: "path"
delimiter: delimiter
ReadFromJdbc
設定
使用方法
type: ReadFromJdbc
config: ...
WriteToJdbc
設定
使用方法
type: WriteToJdbc
input: ...
config: ...
ReadFromJson
ファイルからJSON値をPCollectionに読み込むためのPTransformです。
設定
- path
string: 読み込むファイルパス。パスには*や?などのglob文字を含めることができます。
使用方法
type: ReadFromJson
config:
path: "path"
WriteToJson
PCollectionをファイルにJSON値として書き込むためのPTransformです。
設定
- path
string: 書き込むファイルパス。書き込まれるファイルは、このプレフィックスにシャード識別子(num_shardsを参照)がfile_namingパラメータに従って追加されたものになります。
使用方法
type: WriteToJson
input: ...
config:
path: "path"
ReadFromKafka
設定
使用方法
type: ReadFromKafka
config: ...
WriteToKafka
設定
使用方法
type: WriteToKafka
input: ...
config: ...
ReadFromMySql
設定
使用方法
type: ReadFromMySql
config: ...
WriteToMySql
設定
使用方法
type: WriteToMySql
input: ...
config: ...
ReadFromOracle
設定
使用方法
type: ReadFromOracle
config: ...
WriteToOracle
設定
使用方法
type: WriteToOracle
input: ...
config: ...
ReadFromParquet
Parquetファイルを読み込むためのPTransformです。
設定
- path
?(オプション)
使用方法
type: ReadFromParquet
config:
path: path
WriteToParquet
Parquetファイル書き込むためのPTransformです。
設定
- path
?(オプション)
使用方法
type: WriteToParquet
input: ...
config:
path: path
ReadFromPostgres
設定
使用方法
type: ReadFromPostgres
config: ...
WriteToPostgres
設定
使用方法
type: WriteToPostgres
input: ...
config: ...
ReadFromPubSub
Cloud Pub/Subからメッセージを読み込みます。
設定
-
topic
string(オプション): "projects/"/topics/" の形式のCloud Pub/Subトピック。 指定されている場合、subscriptionはNoneにする必要があります。 -
subscription
string(オプション): "projects/"/subscriptions/" の形式で使用される既存のCloud Pub/Subサブスクリプション。 指定されていない場合、指定されたトピックから一時的なサブスクリプションが作成されます。指定されている場合、topicはNoneにする必要があります。 -
format
string: メッセージペイロードの期待される形式。現在サポートされている形式は- RAW: 内容がPubSubメッセージの生のバイトである単一の
payloadフィールドを持つレコードを生成します。 - AVRO: 指定されたAvroスキーマを使用してレコードを解析します。
- JSON: 指定されたJSONスキーマを使用してレコードを解析します。
- RAW: 内容がPubSubメッセージの生のバイトである単一の
-
schema
?(オプション): 指定された形式のスキーマ仕様。 -
attributes
Array[string](オプション): 値が出力メッセージに追加フィールドとしてフラット化される属性キーのリスト。たとえば、形式がrawで属性が["a", "b"]の場合、この読み込みはRow(payload=..., a=..., b=...)形式の要素を生成します。 -
attributes_map
string(オプション) -
id_attribute
string(オプション): 一意のレコード識別子として使用する、受信Pub/Subメッセージの属性。指定されている場合、この属性の値(レコードを一意に識別できる任意の文字列)が、メッセージの重複排除に使用されます。提供されていない場合、Pub/Subストリームで重複データが配信されないことを保証できません。この場合、ストリームの重複排除は厳密にベストエフォートになります。 -
timestamp_attribute
string(オプション): 要素のタイムスタンプとして使用するメッセージ値。Noneの場合、メッセージの公開時間をタイムスタンプとして使用します。タイムスタンプ値は、次の2つの形式のいずれかである必要があります。
- Unixエポックからのミリ秒数を表す数値。
- RFC 3339形式の文字列(UTCタイムゾーン)。例:
2015-10-29T23:41:41.123Z。タイムスタンプのサブ秒コンポーネントはオプションであり、最初の3桁を超える桁(つまり、ミリ秒より小さい時間単位)は無視される場合があります。
-
error_handling
RowRowフィールド
- output
string
- output
使用方法
type: ReadFromPubSub
config:
topic: "topic"
subscription: "subscription"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
WriteToPubSub
Cloud Pub/Subにメッセージを書き込みます。
設定
-
topic
string: "/topics/" の形式のCloud Pub/Subトピック。/ ". -
format
string: メッセージペイロードのフォーマット方法。現在サポートされている形式は- RAW: 属性関連のフィールドを除く、単一フィールド(内容がPubSubメッセージの生のバイトとして使用される)を持つメッセージを期待します。
- AVRO: 入力PCollectionスキーマから推論できる、指定されたAvroスキーマを使用してレコードをエンコードします。
- JSON: 入力PCollectionスキーマから推論できる、指定されたJSONスキーマを使用してレコードをフォーマットします。
-
schema
?(オプション): 指定された形式のスキーマ仕様。 -
attributes
Array[string](オプション): PubSubメッセージ属性として取り出される属性キーのリスト。たとえば、形式がrawで属性が["a", "b"]の場合、Row(any_field=..., a=..., b=...)形式の要素は、ペイロードにany_fieldの内容があり、属性がaとbの値で設定されたPubSubメッセージになります。 -
attributes_map
string(オプション) -
id_attribute
string(オプション): 設定されている場合、指定された名前と一意の値を持つ属性を各Cloud Pub/Subメッセージに設定します。この属性は、ReadFromPubSub PTransformでメッセージの重複排除に使用できます。 -
timestamp_attribute
string(オプション): 設定されている場合、指定された名前とメッセージの公開時間を値として持つ属性を各Cloud Pub/Subメッセージに設定します。 -
error_handling
RowRowフィールド
- output
string
- output
使用方法
type: WriteToPubSub
input: ...
config:
topic: "topic"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
ReadFromPubSubLite
設定
使用方法
type: ReadFromPubSubLite
config: ...
WriteToPubSubLite
設定
使用方法
type: WriteToPubSubLite
input: ...
config: ...
ReadFromSpanner
設定
使用方法
type: ReadFromSpanner
config: ...
WriteToSpanner
設定
使用方法
type: WriteToSpanner
input: ...
config: ...
ReadFromSqlServer
設定
使用方法
type: ReadFromSqlServer
config: ...
WriteToSqlServer
設定
使用方法
type: WriteToSqlServer
input: ...
config: ...
ReadFromText
テキストファイルから行を読み込みます。
結果のPCollectionは、「line」という名前の単一の文字列フィールドを持つ行で構成されます。
設定
- path
string: 読み込むファイルパス。パスには*や?などのglob文字を含めることができます。
使用方法
type: ReadFromText
config:
path: "path"
WriteToText
PCollectionをテキストファイル(の集合)に書き込みます。
入力は、スキーマに正確に1つのフィールドを持つPCollectionである必要があります。
設定
- path
string: 書き込むファイルパス。書き込まれるファイルは、このプレフィックスにシャード識別子が追加されたものになります。
使用方法
type: WriteToText
input: ...
config:
path: "path"