Beam YAMLマッピング
Beam YAMLは、データを正しい形状にするために使用できる単純な変換を実行する機能があります。最も単純なものは`MapToFields`で、入力フィールドに基づいて新しいフィールドを持つレコードを作成します。
フィールド名の変更
フィールド名を変更するには、次のように記述します。
- type: MapToFields
config:
fields:
new_col1: col1
new_col2: col2
これにより、それぞれ`col1`と`col2`の値(入力スキーマの2つのフィールド名)を持つ`new_col1`と`new_col2`の2つのフィールドを持つ出力レコードが生成されます。
SQLの`SELECT`文の`*`と同様に、元のフィールドを保持する必要があることを示す`append`パラメータを指定できます。たとえば
- type: MapToFields
config:
append: true
fields:
new_col1: col1
new_col2: col2
は、`new_col1`と`new_col2`を*追加*フィールドとして持つレコードを出力します。`append`フィールドが指定されている場合、フィールドを削除することもできます。例:
- type: MapToFields
config:
append: true
drop:
- col3
fields:
new_col1: col1
new_col2: col2
は、2つの新しいフィールドに加えて、`col3`を除くすべての元のフィールドを含みます。
マッピング関数
もちろん、フィールドの削除と名前変更以上の変換を行いたい場合があります。Beam YAMLは、単純なUDFをインライン化する機能を備えています。これには、言語の指定が必要です。たとえば、入力フィールドを参照するPython式を提供できます。
- type: MapToFields
config:
language: python
fields:
new_col: "col1.upper()"
another_col: "col2 + col3"
さらに、行を引数として取る完全なPython呼び出し可能関数を提供して、より複雑なマッピングを行うことができます(許容される形式についてはPythonCallableSourceを参照)。したがって、次のように記述できます。
- type: MapToFields
config:
language: python
fields:
new_col:
callable: |
import re
def my_mapping(row):
if re.match("[0-9]+", row.col1) and row.col2 > 0:
return "good"
else:
return "bad"
ある程度の複雑さに達したら、これを依存関係としてパッケージ化し、完全修飾名で参照する方が適切な場合があります。例:
- type: MapToFields
config:
language: python
fields:
new_col:
callable: pkg.module.fn
関数ロジックをファイルに保存し、関数名を指定することもできます。例:
- type: MapToFields
config:
language: python
fields:
new_col:
path: /path/to/some/udf.py
name: my_mapping
現在、Pythonに加えて、Java、SQL、JavaScript(実験的)式もサポートされています。
Java
Javaマッピングを使用する場合、単純な式の場合でも、UDF型を宣言する必要があります。例:
- type: MapToFields
config:
language: java
fields:
new_col:
expression: col1.toUpperCase()
呼び出し可能UDFの場合、Javaでは、関数を`java.util.function.Function`を実装するクラスとして宣言する必要があります。例:
- type: MapToFields
config:
language: java
fields:
new_col:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
public class MyFunction implements Function<Row, String> {
public String apply(Row row) {
return row.getString("col1").toUpperCase();
}
}
SQL
SQLが`MapToFields` UDFに使用される場合、それは本質的にSQLの`SELECT`文です。
たとえば、クエリ`SELECT UPPER(col1) AS new_col, "col2 + col3" AS another_col FROM PCOLLECTION`は次のようになります。
- type: MapToFields
config:
language: sql
fields:
new_col: "UPPER(col1)"
another_col: "col2 + col3"
出力に含めたくないフィールドは、`drop`フィールドに追加する必要があることに注意してください。
予約済みのSQLキーワードと衝突するフィールドを選択する場合は、フィールドをバッククォートで囲む必要があります。たとえば、入力PCollectionに「timestamp」フィールドがある場合、次のように記述する必要があります。
- type: MapToFields
config:
language: sql
fields:
new_col: "`timestamp`"
**注記**: `drop`で定義されたフィールドマッピングタグとフィールドをエスケープする必要はありません。UDF自体が有効なSQL文である必要があります。
汎用
言語が指定されていない場合、式のセットは既存のフィールドと整数、浮動小数点数、または文字列リテラルに限定されます。例:
- type: MapToFields
config:
fields:
new_col: col1
int_literal: 389
float_litera: 1.90216
str_literal: '"example"' # note the double quoting
FlatMap
入力レコードごとに複数のレコードを出力する場合があります。これは、反復可能な型にマッピングし、`Explode`操作を続けて行うことで実現できます。例:
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "col2 + col3"
- type: Explode
config:
fields: new_col
は、入力レコードごとに3つの出力レコードを生成します。
複数のレコードを展開する場合は、すべてのフィールドの直積をとる必要があるかどうかを指定する必要があります。たとえば
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
config:
fields: [new_col, another_col]
cross_product: true
は9つのレコードを出力しますが
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
config:
fields: [new_col, another_col]
cross_product: false
は3つだけを出力します。
対象フィールドが既に反復可能な型である場合、`Explode`操作を単独で使用できます。
- type: Explode
config:
fields: [col1]
フィルタリング
特定の条件を満たすレコードだけを保持することが望ましい場合があります。これは`Filter`変換を使用して行うことができます。例:
- type: Filter
config:
keep: "col2 > 0"
既存のフィールドと数値リテラル間の単純な比較よりも複雑な場合は、`language`パラメータを提供する必要があります。例:
- type: Filter
config:
language: python
keep: "col2 + col3 > 0"
より複雑なフィルタリング関数の場合、行を引数として取る完全なPython呼び出し可能関数を提供して、より複雑なマッピングを行うことができます(許容される形式についてはPythonCallableSourceを参照)。したがって、次のように記述できます。
- type: Filter
config:
language: python
keep:
callable: |
import re
def my_filter(row):
return re.match("[0-9]+", row.col1) and row.col2 > 0
ある程度の複雑さに達したら、これを依存関係としてパッケージ化し、完全修飾名で参照する方が適切な場合があります。例:
- type: Filter
config:
language: python
keep:
callable: pkg.module.fn
関数ロジックをファイルに保存し、関数名を指定することもできます。例:
- type: Filter
config:
language: python
keep:
path: /path/to/some/udf.py
name: my_filter
現在、Pythonに加えて、Java、SQL、JavaScript(実験的)式もサポートされています。
Java
Javaフィルタリングを使用する場合、単純な式の場合でも、UDF型を宣言する必要があります。例:
- type: Filter
config:
language: java
keep:
expression: col2 > 0
呼び出し可能UDFの場合、Javaでは、関数を`java.util.function.Function`を実装するクラスとして宣言する必要があります。例:
- type: Filter
config:
language: java
keep:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
import java.util.regex.Pattern;
public class MyFunction implements Function<Row, Boolean> {
public Boolean apply(Row row) {
Pattern pattern = Pattern.compile("[0-9]+");
return pattern.matcher(row.getString("col1")).matches() && row.getInt64("col2") > 0;
}
}
SQL
マッピング関数と同様に、SQLが`MapToFields` UDFに使用される場合、それは本質的にSQLの`WHERE`文です。
たとえば、クエリ`SELECT * FROM PCOLLECTION WHERE col2 > 0`は次のようになります。
- type: Filter
config:
language: sql
keep: "col2 > 0"
予約済みのSQLキーワードと衝突するフィールドでフィルタリングする場合は、フィールドをバッククォートで囲む必要があります。たとえば、入力PCollectionに「timestamp」フィールドがある場合、次のように記述する必要があります。
- type: Filter
config:
language: sql
keep: "`timestamp` > 0"
パーティショニング
他のSDKのサイド出力で行われていることと同様に、異なる要素を異なる場所に送信することが役立つ場合があります。これは`Filter`操作のセットで行うことができますが、すべての要素に単一の宛先がある場合、代わりに`Partition`変換を使用する方が自然です。これは、`col1`が`"a"`であるすべての要素を`Partition.a`出力に送信します。
- type: Partition
input: input
config:
by: col1
outputs: ['a', 'b', 'c']
- type: SomeTransform
input: Partition.a
config:
param: ...
- type: AnotherTransform
input: Partition.b
config:
param: ...
宛先を関数として指定することもできます。例:
- type: Partition
input: input
config:
by: "'even' if col2 % 2 == 0 else 'odd'"
language: python
outputs: ['even', 'odd']
名前付き出力にないすべての要素を捕捉する包括的な出力をオプションで提供できます(そうでなければエラーになります)。
- type: Partition
input: input
config:
by: col1
outputs: ['a', 'b', 'c']
unknown_output: 'other'
PCollectionを必ずしも重複しない複数のPCollectionに分割したい場合があります。要素を複数の(またはゼロの)出力に送信するには、反復可能な列を使用し、`Partition`の前に`Explode`を使用できます。
- type: Explode
input: input
config:
fields: col1
- type: Partition
input: Explode
config:
by: col1
outputs: ['a', 'b', 'c']
型
Beamはマッピングに関与する型を推論しようとしますが、不可能な場合もあります。このような場合は、期待される出力型を明示的に指定できます。例:
- type: MapToFields
config:
language: python
fields:
new_col:
expression: "col1.upper()"
output_type: string
期待される型はJSONスキーマ表記で指定されます。ただし、最上位の基本型は`{type: 'basic_type_name'}`のネストを必要とせずにリテラル文字列として指定できます。
- type: MapToFields
config:
language: python
fields:
new_col:
expression: "col1.upper()"
output_type: string
another_col:
expression: "beam.Row(a=col1, b=[col2])"
output_type:
type: 'object'
properties:
a:
type: 'string'
b:
type: 'array'
items:
type: 'number'
これは、`beam:logical:pythonsdk_any:v1`型を処理できないことに関連するエラーを解決するのに特に役立ちます。