Beam YAML エラー処理
パイプラインが大規模になるほど、形式が正しくない、適切な前提条件を満たしていない、あるいは処理中に何らかのエラーが発生する「例外的な」データに遭遇する可能性が高くなります。一般的に、このようなレコードはパイプラインの永続的な障害を引き起こしますが、多くの場合、パイプラインを継続し、不正なレコードを別のパスにリダイレクトして特別な処理を行ったり、後でオフライン分析するために記録したりすることが望ましいです。これは、しばしば「デッドレターキュー」パターンと呼ばれます。
Beam YAML は、変換が `output` フィールドを持つ `error_handling` 設定パラメータをサポートしている場合、このパターンを特別にサポートします。 `output` パラメータは、エラーを処理する別の変換(例:書き出し)への入力として参照される必要がある名前です。たとえば、次のコードは、すべての「正常な」処理済みレコードを1つのファイルに書き込み、エラーが発生した「不正な」レコードと、発生したエラーに関するメタデータを別のファイルに書き込みます。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
- type: WriteToJson
input: MapToFields
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
input: MapToFields.my_error_output
config:
path: /path/to/errors.json
`error_handling` が宣言されている場合、`MapToFields.my_error_output` は**必ず**消費されなければなりません。これを無視するとエラーになります。どのような使用方法でも問題ありません。たとえば、不正なレコードを標準出力にログ出力するだけでも十分です(ただし、堅牢なパイプラインにはお勧めしません)。
また、エラー出力の正確な形式はまだ最終決定されていないことに注意してください。安全に印刷して出力に書き込むことはできますが、正確なスキーマは Beam の将来のバージョンで変更される可能性があり、まだ依存すべきではありません。現在、少なくともエラーの原因となった要素を保持する `element` フィールドがあります。
一部の変換では、`error_handling` 設定で追加の引数が許可されています。たとえば、Python関数の場合、パイプライン全体を障害とみなす前に不正なレコードの相対数を制限する `threshold` を指定できます。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
# If more than 10% of records throw an error, stop the pipeline.
threshold: 0.1
- type: WriteToJson
input: MapToFields
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
input: MapToFields.my_error_output
config:
path: /path/to/errors.json
必要に応じて、これらの失敗したレコードに対して任意の追加処理を実行できます。例:
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
name: ComputeRatio
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
- type: MapToFields
name: ComputeRatioForBadRecords
input: ComputeRatio.my_error_output
config:
language: python
fields:
col1: col1
ratio: col2 / (col3 + 1)
error_handling:
output: still_bad
- type: WriteToJson
# Takes as input everything from the "success" path of both transforms.
input: [ComputeRatio, ComputeRatioForBadRecords]
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
# These failed the first and the second transform.
input: ComputeRatioForBadRecords.still_bad
config:
path: /path/to/errors.json
`chain` 構文を使用する場合、必要なエラー消費は `extra_transforms` ブロックで行うことができます。
pipeline:
type: chain
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
name: SomeStep
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: errors
- type: MapToFields
name: AnotherStep
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
inverse_ratio: 1 / ratio
error_handling:
output: errors
- type: WriteToJson
config:
path: /path/to/output.json
extra_transforms:
- type: WriteToJson
name: WriteErrors
input: [SomeStep.errors, AnotherStep.errors]
config:
path: /path/to/errors.json