Beam YAML エラー処理

パイプラインが大規模になるほど、形式が正しくない、適切な前提条件を満たしていない、あるいは処理中に何らかのエラーが発生する「例外的な」データに遭遇する可能性が高くなります。一般的に、このようなレコードはパイプラインの永続的な障害を引き起こしますが、多くの場合、パイプラインを継続し、不正なレコードを別のパスにリダイレクトして特別な処理を行ったり、後でオフライン分析するために記録したりすることが望ましいです。これは、しばしば「デッドレターキュー」パターンと呼ばれます。

Beam YAML は、変換が `output` フィールドを持つ `error_handling` 設定パラメータをサポートしている場合、このパターンを特別にサポートします。 `output` パラメータは、エラーを処理する別の変換(例:書き出し)への入力として参照される必要がある名前です。たとえば、次のコードは、すべての「正常な」処理済みレコードを1つのファイルに書き込み、エラーが発生した「不正な」レコードと、発生したエラーに関するメタデータを別のファイルに書き込みます。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

`error_handling` が宣言されている場合、`MapToFields.my_error_output` は**必ず**消費されなければなりません。これを無視するとエラーになります。どのような使用方法でも問題ありません。たとえば、不正なレコードを標準出力にログ出力するだけでも十分です(ただし、堅牢なパイプラインにはお勧めしません)。

また、エラー出力の正確な形式はまだ最終決定されていないことに注意してください。安全に印刷して出力に書き込むことはできますが、正確なスキーマは Beam の将来のバージョンで変更される可能性があり、まだ依存すべきではありません。現在、少なくともエラーの原因となった要素を保持する `element` フィールドがあります。

一部の変換では、`error_handling` 設定で追加の引数が許可されています。たとえば、Python関数の場合、パイプライン全体を障害とみなす前に不正なレコードの相対数を制限する `threshold` を指定できます。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output
          # If more than 10% of records throw an error, stop the pipeline.
          threshold: 0.1

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

必要に応じて、これらの失敗したレコードに対して任意の追加処理を実行できます。例:

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: ComputeRatio
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: MapToFields
      name: ComputeRatioForBadRecords
      input: ComputeRatio.my_error_output
      config:
        language: python
        fields:
          col1: col1
          ratio: col2 / (col3 + 1)
        error_handling:
          output: still_bad

    - type: WriteToJson
      # Takes as input everything from the "success" path of both transforms.
      input: [ComputeRatio, ComputeRatioForBadRecords]
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      # These failed the first and the second transform.
      input: ComputeRatioForBadRecords.still_bad
      config:
        path: /path/to/errors.json

`chain` 構文を使用する場合、必要なエラー消費は `extra_transforms` ブロックで行うことができます。

pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: SomeStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: errors

    - type: MapToFields
      name: AnotherStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          inverse_ratio: 1 / ratio
        error_handling:
          output: errors

    - type: WriteToJson
      config:
        path: /path/to/output.json

  extra_transforms:
    - type: WriteToJson
      name: WriteErrors
      input: [SomeStep.errors, AnotherStep.errors]
      config:
        path: /path/to/errors.json