YAML からの PyTransform の使用
Beam YAML は、`PyTransform` タイプを介して Python 変換を簡単に呼び出す機能を提供し、完全修飾名で参照するだけで済みます。例えば、
- type: PyTransform
config:
constructor: apache_beam.pkg.module.SomeTransform
args: [1, 'foo']
kwargs:
baz: 3
は、`apache_beam.pkg.mod.SomeTransform(1, 'foo', baz=3)` 変換を呼び出します。この完全修飾名は、任意の PTransform クラス、または PTransform を返す他の呼び出し可能オブジェクトにすることができます。ただし、スキーマ化されたデータを受け入れない、または返さない PTransform は、YAML から使用するのには適さない場合があります。スキーマを返さない変換の後にスキーマ性を復元するには、要素全体を入力として受け取る `MapToFields` の `callable` オプションを使用します。例えば、
- type: PyTransform
config:
constructor: apache_beam.pkg.module.SomeTransform
args: [1, 'foo']
kwargs:
baz: 3
- type: MapToFields
config:
language: python
fields:
col1:
callable: 'lambda element: element.col1'
output_type: string
col2:
callable: 'lambda element: element.col2'
output_type: integer
これは、Beam SDK で任意の変換を呼び出すために使用できます。例えば、
pipeline:
transforms:
- type: PyTransform
name: ReadFromTsv
input: {}
config:
constructor: apache_beam.io.ReadFromCsv
kwargs:
path: '/path/to/*.tsv'
sep: '\t'
skip_blank_lines: True
true_values: ['yes']
false_values: ['no']
comment: '#'
on_bad_lines: 'skip'
binary: False
splittable: False
__constructor__
を使用した変換のインライン定義
目的の変換が存在しない場合は、インラインで定義することもできます。これは、言語間変換を行う方法と同様に、特別な `__constructor__` キーワードを使用して行われます。
`__constuctor__` キーワードを使用すると、呼び出し時に目的の変換を *返す* Python 呼び出し可能オブジェクトを定義します。最初の引数(または位置引数がない場合は `source` キーワード引数)は Python コードとして解釈されます。例えば、
- type: PyTransform
config:
constructor: __constructor__
kwargs:
source: |
def create_my_transform(inc):
return beam.Map(lambda x: beam.Row(a=x.col2 + inc))
inc: 10
は、受信 PCollection に `beam.Map(lambda x: beam.Row(a=x.col2 + 10))` を適用します。
クラスオブジェクトは独自のコンストラクターとして呼び出すことができるため、`beam.PTransform` をインラインで定義できます。例えば、
- type: PyTransform
config:
constructor: __constructor__
kwargs:
source: |
class MyPTransform(beam.PTransform):
def __init__(self, inc):
self._inc = inc
def expand(self, pcoll):
return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc))
inc: 10
これは期待どおりに動作します。
__callable__
を使用した変換のインライン定義
`__callable__` キーワードも同様に機能しますが、適用可能な `PTransform` を返す呼び出し可能オブジェクトを定義する代わりに、実行される展開を呼び出し可能オブジェクトとして定義するだけです。これは、BeamPython の `ptransform.ptransform_fn` デコレーターに類似しています。
この場合、単純に次のように記述できます。
- type: PyTransform
config:
constructor: __callable__
kwargs:
source: |
def my_ptransform(pcoll, inc):
return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc))
inc: 10
外部変換
`python` プロバイダーを介して他の場所で定義された PTransform を呼び出すこともできます。例えば、
pipeline:
transforms:
- ...
- type: MyTransform
config:
kwarg: whatever
providers:
- ...
- type: python
input: ...
config:
packages:
- 'some_pypi_package>=version'
transforms:
MyTransform: 'pkg.module.MyTransform'
これらは、依存関係の有無にかかわらず、インラインで定義することもできます。例えば、
pipeline:
transforms:
- ...
- type: ToCase
input: ...
config:
upper: True
providers:
- type: python
config: {}
transforms:
'ToCase': |
@beam.ptransform_fn
def ToCase(pcoll, upper):
if upper:
return pcoll | beam.Map(lambda x: str(x).upper())
else:
return pcoll | beam.Map(lambda x: str(x).lower())