Beamにおける単体テスト:意見に基づくガイド

テストは、ソフトウェアエンジニアリングにおける最も基本的な構成要素の1つです。このブログ記事では、Apache Beamがテストのために提供するいくつかの構成要素について説明します。データパイプラインの単体テストを作成するための、意見に基づいたベストプラクティスについて解説します。この記事には統合テストは含まれておらず、それらは別途作成する必要があります。この記事のすべてのコードスニペットは、このノートブックに含まれています。さらに、ベストプラクティスを示すテストを見るには、ベストプラクティスを示すテストを含むBeamスタータープロジェクトをご覧ください。

ベストプラクティス

Beamパイプラインをテストする際には、次のベストプラクティスをお勧めします。

  1. ReadFromBigQueryWriteToTextなど、Beamライブラリですでにサポートされているコネクタについては、単体テストを作成しないでください。これらのコネクタは、Beamのテストスイートですでにテストされており、正しい機能が保証されています。単体テストに不要なコストと依存関係を追加するだけです。

  2. MapFlatMap、またはFilterと共に使用する場合は、関数が十分にテストされていることを確認してください。Map(your_function)を使用する場合、関数は意図したとおりに動作すると仮定できます。

  3. ParDo、サイド入力、タイムスタンプ検査など、より複雑な変換については、変換全体を単体として扱い、テストしてください。

  4. 必要に応じて、モッキングを使用して、DoFnに存在する可能性のあるAPI呼び出しをモックしてください。モッキングの目的は、API呼び出しからの特定のレスポンスが必要であっても、機能を徹底的にテストすることです。

    1. API呼び出しをDoFn内で直接行うのではなく、別々の関数でモジュール化してください。この手順により、外部API呼び出しをモックする際のよりクリーンなエクスペリエンスが提供されます。

例1

次のパイプラインを例として使用します。関数median_house_value_per_bedroomはコードの他の場所で単体テストされていると仮定すると、このパイプラインのコンテキストでこの関数をテストするために、個別の単体テストを作成する必要はありません。Mapプリミティブは期待通りに動作すると信頼できます(これは、前に述べたポイント#2を示しています)。

# The following code computes the median house value per bedroom.

with beam.Pipeline() as p1:
   result = (
       p1
       | ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
       | beam.Map(median_house_value_per_bedroom)
       | WriteToText("/content/example2")
   )

例2

次の関数を例として使用します。関数median_house_value_per_bedroommultiply_by_factorは他の場所でテストされていますが、複合変換で構成されるパイプライン全体はテストされていません。

with beam.Pipeline() as p2:
    result = (
        p2
        | ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
        | beam.Map(median_house_value_per_bedroom)
        | beam.Map(multiply_by_factor)
        | beam.CombinePerKey(sum)
        | WriteToText("/content/example3")
    )

前のコードのベストプラクティスは、ReadFromTextWriteToTextの間のすべての関数を含む変換を作成することです。この手順により、変換ロジックをI/Oから分離し、変換ロジックを単体テストできます。次の例は、前のコードのリファクタリングです。

def transform_data_set(pcoll):
  return (pcoll
          | beam.Map(median_house_value_per_bedroom)
          | beam.Map(multiply_by_factor)
          | beam.CombinePerKey(sum))

# Define a new class that inherits from beam.PTransform.
class MapAndCombineTransform(beam.PTransform):
  def expand(self, pcoll):
    return transform_data_set(pcoll)

with beam.Pipeline() as p2:
   result = (
       p2
       | ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
       | MapAndCombineTransform()
       | WriteToText("/content/example3")
   )

このコードは、前の例の対応する単体テストを示しています。

import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestBeam(unittest.TestCase):

# This test corresponds to example 3, and is written to confirm the pipeline works as intended.
  def test_transform_data_set(self):
    expected=[(1, 10570.185786231425), (2, 13.375337533753376), (3, 13.315649867374006)]
    input_elements = [
      '-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000',
      '121.05,99.99,23.30,39.5,55.55,41.01,10,34,74.30,91.91',
      '122.05,100.99,24.30,40.5,56.55,42.01,11,35,75.30,92.91',
      '-120.05,39.37,29.00,4085.00,681.00,1557.00,626.00,6.8085,364700.00'
    ]
    with beam.Pipeline() as p2:
      result = (
                p2
                | beam.Create(input_elements)
                | beam.Map(MapAndCombineTransform())
        )
      assert_that(result,equal_to(expected))

例3

JSONファイルからデータを読み取り、外部API呼び出しを行うカスタム関数を通してデータを渡し、カスタム宛先に書き込むパイプラインを作成するとします(たとえば、ダウンストリームアプリケーション用にデータを準備するために、カスタムデータフォーマットを行う必要がある場合)。

パイプラインは次の構造になっています。

# The following packages are used to run the example pipelines.

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

class MyDoFn(beam.DoFn):
  def process(self,element):
          returned_record = MyApiCall.get_data("http://my-api-call.com")
          if len(returned_record)!=10:
            raise ValueError("Length of record does not match expected length")
          yield returned_record

with beam.Pipeline() as p3:
  result = (
          p3
          | ReadFromText("/content/sample_data/anscombe.json")
          | beam.ParDo(MyDoFn())
          | WriteToText("/content/example1")
  )

このテストは、APIレスポンスが間違った長さのレコードかどうかをチェックし、テストが失敗した場合に予想されるエラーをスローします。

!pip install mock  # Install the 'mock' module.
# Import the mock package for mocking functionality.
from unittest.mock import Mock,patch
# from MyApiCall import get_data
import mock


# MyApiCall is a function that calls get_data to fetch some data by using an API call.
@patch('MyApiCall.get_data')
def test_error_message_wrong_length(self, mock_get_data):
 response = ['field1','field2']
 mock_get_data.return_value = Mock()
 mock_get_data.return_value.json.return_value=response

 input_elements = ['-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000'] #input length 9
 with self.assertRaisesRegex(ValueError,
                             "Length of record does not match expected length'"):
     p3 = beam.Pipeline()
     result = p3 | beam.create(input_elements) | beam.ParDo(MyDoFn())
     result

その他のテストのベストプラクティス

  1. 発生させるすべてのエラーメッセージをテストします。
  2. データに存在する可能性のあるエッジケースをカバーします。
  3. 例1では、beam.Map(median_house_value_per_bedroom)ではなくラムダ関数を使用してbeam.Mapステップを記述できた可能性があります。
beam.Map(lambda x: x.strip().split(',')) | beam.Map(lambda x: float(x[8])/float(x[4])

beam.Map(median_house_value_per_bedroom)を使用してラムダ関数をヘルパー関数に分離することは、関数の変更がモジュール化されるため、よりテスト可能なコードのための推奨アプローチです。

  1. 前の例のように、assert_thatステートメントを使用して、PCollection値が正しく一致することを確認します。

BeamとDataflowでのテストに関する詳細なガイダンスについては、Google Cloudドキュメントを参照してください。Beamでの単体テストのさらなる例については、base_test.pyコードを参照してください。

この投稿のアイデアを洗練するのに役立ったRobert Bradshaw、Danny McCormick、XQ Hu、Surjit Singh、Rebecca Spzerに特別な感謝を述べます。