Beam YAMLマッピング

Beam YAMLは、データを正しい形状にするために使用できる単純な変換を実行する機能があります。最も単純なものは`MapToFields`で、入力フィールドに基づいて新しいフィールドを持つレコードを作成します。

フィールド名の変更

フィールド名を変更するには、次のように記述します。

- type: MapToFields
  config:
    fields:
      new_col1: col1
      new_col2: col2

これにより、それぞれ`col1`と`col2`の値(入力スキーマの2つのフィールド名)を持つ`new_col1`と`new_col2`の2つのフィールドを持つ出力レコードが生成されます。

SQLの`SELECT`文の`*`と同様に、元のフィールドを保持する必要があることを示す`append`パラメータを指定できます。たとえば

- type: MapToFields
  config:
    append: true
    fields:
      new_col1: col1
      new_col2: col2

は、`new_col1`と`new_col2`を*追加*フィールドとして持つレコードを出力します。`append`フィールドが指定されている場合、フィールドを削除することもできます。例:

- type: MapToFields
  config:
    append: true
    drop:
      - col3
    fields:
      new_col1: col1
      new_col2: col2

は、2つの新しいフィールドに加えて、`col3`を除くすべての元のフィールドを含みます。

マッピング関数

もちろん、フィールドの削除と名前変更以上の変換を行いたい場合があります。Beam YAMLは、単純なUDFをインライン化する機能を備えています。これには、言語の指定が必要です。たとえば、入力フィールドを参照するPython式を提供できます。

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "col1.upper()"
      another_col: "col2 + col3"

さらに、行を引数として取る完全なPython呼び出し可能関数を提供して、より複雑なマッピングを行うことができます(許容される形式についてはPythonCallableSourceを参照)。したがって、次のように記述できます。

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: |
          import re
          def my_mapping(row):
            if re.match("[0-9]+", row.col1) and row.col2 > 0:
              return "good"
            else:
              return "bad"

ある程度の複雑さに達したら、これを依存関係としてパッケージ化し、完全修飾名で参照する方が適切な場合があります。例:

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: pkg.module.fn

関数ロジックをファイルに保存し、関数名を指定することもできます。例:

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        path: /path/to/some/udf.py
        name: my_mapping

現在、Pythonに加えて、Java、SQL、JavaScript(実験的)式もサポートされています。

Java

Javaマッピングを使用する場合、単純な式の場合でも、UDF型を宣言する必要があります。例:

- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        expression: col1.toUpperCase()

呼び出し可能UDFの場合、Javaでは、関数を`java.util.function.Function`を実装するクラスとして宣言する必要があります。例:

- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        callable: |
          import org.apache.beam.sdk.values.Row;
          import java.util.function.Function;
          public class MyFunction implements Function<Row, String> {
            public String apply(Row row) {
              return row.getString("col1").toUpperCase();
            }
          }

SQL

SQLが`MapToFields` UDFに使用される場合、それは本質的にSQLの`SELECT`文です。

たとえば、クエリ`SELECT UPPER(col1) AS new_col, "col2 + col3" AS another_col FROM PCOLLECTION`は次のようになります。

- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "UPPER(col1)"
      another_col: "col2 + col3"

出力に含めたくないフィールドは、`drop`フィールドに追加する必要があることに注意してください。

予約済みのSQLキーワードと衝突するフィールドを選択する場合は、フィールドをバッククォートで囲む必要があります。たとえば、入力PCollectionに「timestamp」フィールドがある場合、次のように記述する必要があります。

- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "`timestamp`"

**注記**: `drop`で定義されたフィールドマッピングタグとフィールドをエスケープする必要はありません。UDF自体が有効なSQL文である必要があります。

汎用

言語が指定されていない場合、式のセットは既存のフィールドと整数、浮動小数点数、または文字列リテラルに限定されます。例:

- type: MapToFields
  config:
    fields:
      new_col: col1
      int_literal: 389
      float_litera: 1.90216
      str_literal: '"example"'  # note the double quoting

FlatMap

入力レコードごとに複数のレコードを出力する場合があります。これは、反復可能な型にマッピングし、`Explode`操作を続けて行うことで実現できます。例:

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "col2 + col3"
- type: Explode
  config:
    fields: new_col

は、入力レコードごとに3つの出力レコードを生成します。

複数のレコードを展開する場合は、すべてのフィールドの直積をとる必要があるかどうかを指定する必要があります。たとえば

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: true

は9つのレコードを出力しますが

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: false

は3つだけを出力します。

対象フィールドが既に反復可能な型である場合、`Explode`操作を単独で使用できます。

- type: Explode
  config:
    fields: [col1]

フィルタリング

特定の条件を満たすレコードだけを保持することが望ましい場合があります。これは`Filter`変換を使用して行うことができます。例:

- type: Filter
  config:
    keep: "col2 > 0"

既存のフィールドと数値リテラル間の単純な比較よりも複雑な場合は、`language`パラメータを提供する必要があります。例:

- type: Filter
  config:
    language: python
    keep: "col2 + col3 > 0"

より複雑なフィルタリング関数の場合、行を引数として取る完全なPython呼び出し可能関数を提供して、より複雑なマッピングを行うことができます(許容される形式についてはPythonCallableSourceを参照)。したがって、次のように記述できます。

- type: Filter
  config:
    language: python
    keep:
      callable: |
        import re
        def my_filter(row):
          return re.match("[0-9]+", row.col1) and row.col2 > 0

ある程度の複雑さに達したら、これを依存関係としてパッケージ化し、完全修飾名で参照する方が適切な場合があります。例:

- type: Filter
  config:
    language: python
    keep:
      callable: pkg.module.fn

関数ロジックをファイルに保存し、関数名を指定することもできます。例:

- type: Filter
  config:
    language: python
    keep:
      path: /path/to/some/udf.py
      name: my_filter

現在、Pythonに加えて、Java、SQL、JavaScript(実験的)式もサポートされています。

Java

Javaフィルタリングを使用する場合、単純な式の場合でも、UDF型を宣言する必要があります。例:

- type: Filter
  config:
    language: java
    keep:
      expression: col2 > 0

呼び出し可能UDFの場合、Javaでは、関数を`java.util.function.Function`を実装するクラスとして宣言する必要があります。例:

- type: Filter
  config:
    language: java
    keep:
      callable: |
        import org.apache.beam.sdk.values.Row;
        import java.util.function.Function;
        import java.util.regex.Pattern;
        public class MyFunction implements Function<Row, Boolean> {
          public Boolean apply(Row row) {
            Pattern pattern = Pattern.compile("[0-9]+");
            return pattern.matcher(row.getString("col1")).matches() && row.getInt64("col2") > 0;
          }
        }

SQL

マッピング関数と同様に、SQLが`MapToFields` UDFに使用される場合、それは本質的にSQLの`WHERE`文です。

たとえば、クエリ`SELECT * FROM PCOLLECTION WHERE col2 > 0`は次のようになります。

- type: Filter
  config:
    language: sql
    keep: "col2 > 0"

予約済みのSQLキーワードと衝突するフィールドでフィルタリングする場合は、フィールドをバッククォートで囲む必要があります。たとえば、入力PCollectionに「timestamp」フィールドがある場合、次のように記述する必要があります。

- type: Filter
  config:
    language: sql
    keep: "`timestamp` > 0"

パーティショニング

他のSDKのサイド出力で行われていることと同様に、異なる要素を異なる場所に送信することが役立つ場合があります。これは`Filter`操作のセットで行うことができますが、すべての要素に単一の宛先がある場合、代わりに`Partition`変換を使用する方が自然です。これは、`col1`が`"a"`であるすべての要素を`Partition.a`出力に送信します。

- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']

- type: SomeTransform
  input: Partition.a
  config:
    param: ...

- type: AnotherTransform
  input: Partition.b
  config:
    param: ...

宛先を関数として指定することもできます。例:

- type: Partition
  input: input
  config:
    by: "'even' if col2 % 2 == 0 else 'odd'"
    language: python
    outputs: ['even', 'odd']

名前付き出力にないすべての要素を捕捉する包括的な出力をオプションで提供できます(そうでなければエラーになります)。

- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']
    unknown_output: 'other'

PCollectionを必ずしも重複しない複数のPCollectionに分割したい場合があります。要素を複数の(またはゼロの)出力に送信するには、反復可能な列を使用し、`Partition`の前に`Explode`を使用できます。

- type: Explode
  input: input
  config:
    fields: col1

- type: Partition
  input: Explode
  config:
    by: col1
    outputs: ['a', 'b', 'c']

Beamはマッピングに関与する型を推論しようとしますが、不可能な場合もあります。このような場合は、期待される出力型を明示的に指定できます。例:

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string

期待される型はJSONスキーマ表記で指定されます。ただし、最上位の基本型は`{type: 'basic_type_name'}`のネストを必要とせずにリテラル文字列として指定できます。

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string
      another_col:
        expression: "beam.Row(a=col1, b=[col2])"
        output_type:
          type: 'object'
          properties:
            a:
              type: 'string'
            b:
              type: 'array'
              items:
                type: 'number'

これは、`beam:logical:pythonsdk_any:v1`型を処理できないことに関連するエラーを解決するのに特に役立ちます。