YAML からの PyTransform の使用

Beam YAML は、`PyTransform` タイプを介して Python 変換を簡単に呼び出す機能を提供し、完全修飾名で参照するだけで済みます。例えば、

- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
       baz: 3

は、`apache_beam.pkg.mod.SomeTransform(1, 'foo', baz=3)` 変換を呼び出します。この完全修飾名は、任意の PTransform クラス、または PTransform を返す他の呼び出し可能オブジェクトにすることができます。ただし、スキーマ化されたデータを受け入れない、または返さない PTransform は、YAML から使用するのには適さない場合があります。スキーマを返さない変換の後にスキーマ性を復元するには、要素全体を入力として受け取る `MapToFields` の `callable` オプションを使用します。例えば、

- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
       baz: 3
- type: MapToFields
  config:
    language: python
    fields:
      col1:
        callable: 'lambda element: element.col1'
        output_type: string
      col2:
        callable: 'lambda element: element.col2'
        output_type: integer

これは、Beam SDK で任意の変換を呼び出すために使用できます。例えば、

pipeline:
  transforms:
    - type: PyTransform
      name: ReadFromTsv
      input: {}
      config:
        constructor: apache_beam.io.ReadFromCsv
        kwargs:
           path: '/path/to/*.tsv'
           sep: '\t'
           skip_blank_lines: True
           true_values: ['yes']
           false_values: ['no']
           comment: '#'
           on_bad_lines: 'skip'
           binary: False
           splittable: False

__constructor__ を使用した変換のインライン定義

目的の変換が存在しない場合は、インラインで定義することもできます。これは、言語間変換を行う方法と同様に、特別な `__constructor__` キーワードを使用して行われます。

`__constuctor__` キーワードを使用すると、呼び出し時に目的の変換を *返す* Python 呼び出し可能オブジェクトを定義します。最初の引数(または位置引数がない場合は `source` キーワード引数)は Python コードとして解釈されます。例えば、

- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        def create_my_transform(inc):
          return beam.Map(lambda x: beam.Row(a=x.col2 + inc))

      inc: 10

は、受信 PCollection に `beam.Map(lambda x: beam.Row(a=x.col2 + 10))` を適用します。

クラスオブジェクトは独自のコンストラクターとして呼び出すことができるため、`beam.PTransform` をインラインで定義できます。例えば、

- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        class MyPTransform(beam.PTransform):
          def __init__(self, inc):
            self._inc = inc
          def expand(self, pcoll):
            return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc))

      inc: 10

これは期待どおりに動作します。

__callable__ を使用した変換のインライン定義

`__callable__` キーワードも同様に機能しますが、適用可能な `PTransform` を返す呼び出し可能オブジェクトを定義する代わりに、実行される展開を呼び出し可能オブジェクトとして定義するだけです。これは、BeamPython の `ptransform.ptransform_fn` デコレーターに類似しています。

この場合、単純に次のように記述できます。

- type: PyTransform
  config:
    constructor: __callable__
    kwargs:
      source: |
        def my_ptransform(pcoll, inc):
          return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc))

      inc: 10

外部変換

`python` プロバイダーを介して他の場所で定義された PTransform を呼び出すこともできます。例えば、

pipeline:
  transforms:
    - ...
    - type: MyTransform
      config:
        kwarg: whatever

providers:
  - ...
  - type: python
    input: ...
    config:
      packages:
        - 'some_pypi_package>=version'
    transforms:
      MyTransform: 'pkg.module.MyTransform'

これらは、依存関係の有無にかかわらず、インラインで定義することもできます。例えば、

pipeline:
  transforms:
    - ...
    - type: ToCase
      input: ...
      config:
        upper: True

providers:
  - type: python
    config: {}
    transforms:
      'ToCase': |
        @beam.ptransform_fn
        def ToCase(pcoll, upper):
          if upper:
            return pcoll | beam.Map(lambda x: str(x).upper())
          else:
            return pcoll | beam.Map(lambda x: str(x).lower())