パイプラインのテスト

パイプラインのテストは、効果的なデータ処理ソリューションを開発する上で特に重要なステップです。ユーザーコードがリモートで実行されるパイプライングラフを構築するという、Beamモデルの間接的な性質により、失敗した実行のデバッグは簡単な作業ではありません。多くの場合、パイプラインのリモート実行をデバッグするよりも、パイプラインコードでローカルユニットテストを実行する方が高速で簡単です。

選択したランナーでパイプラインを実行する前に、パイプラインコードをローカルでユニットテストすることが、パイプラインコードのバグを特定して修正する最良の方法であることがよくあります。パイプラインをローカルでユニットテストすることで、使い慣れた/お気に入りのローカルデバッグツールを使用することもできます。

DirectRunner、またはPrismRunnerを使用できます。どちらもテストとローカル開発に役立つローカルランナーです。

パイプラインをローカルでテストした後、選択したランナーを使用して小規模でテストできます。たとえば、ローカルまたはリモートのFlinkクラスターでFlinkランナーを使用します。

Beam SDKには、パイプラインコードをユニットテストするためのさまざまな方法が、最も低いレベルから最も高いレベルまで用意されています。最も低いレベルから最も高いレベルまで、これらは次のとおりです。

ユニットテストをサポートするために、Java用Beam SDKは、testingパッケージに多数のテストクラスを提供しています。これらのテストを参考にガイドとして使用できます。

変換のテスト

作成した変換をテストするには、次のパターンを使用できます。

TestPipeline

TestPipelineは、変換をテストするために特別にBeam Java SDKに含まれているクラスです。

TestPipelineは、変換をテストするために特別にBeam Python SDKに含まれているクラスです。

テストでは、パイプラインオブジェクトを作成するときに、Pipelineの代わりにTestPipelineを使用します。Pipeline.createとは異なり、TestPipeline.createは内部的にPipelineOptionsの設定を処理します。

次のようにTestPipelineを作成します。

Pipeline p = TestPipeline.create();
with TestPipeline as p:
    ...
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"

// Override TestMain with ptest.Main,
// once per package.
func TestMain(m *testing.M) {
	ptest.Main(m)
}

func TestPipeline(t *testing.T) {
	...
	// The Go SDK doesn't use a TestPipeline concept,
	// and recommends using the ptest harness
	// to wrap pipeline construction.
	pr := ptest.BuildAndRun(t, func(s beam.Scope) {
		...
	})
	...
}

注: Beamでの非境界パイプラインのテストについては、このブログ投稿で説明しています。

Create変換の使用

Create変換を使用すると、JavaやPythonのListなどの標準のインメモリコレクションクラスからPCollectionを作成できます。詳細については、PCollectionの作成を参照してください。

PAssert

PAssertは、Beam Java SDKに含まれているクラスで、PCollectionの内容に関するアサーションです。PAssertを使用して、PCollectionに特定の期待される要素のセットが含まれていることを確認できます。

指定されたPCollectionについて、次のようにPAssertを使用して内容を確認できます。

PCollection<String> output = ...;

// Check whether a PCollection contains some elements in any order.
PAssert.that(output)
.containsInAnyOrder(
  "elem1",
  "elem3",
  "elem2");
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

output = ...

# Check whether a PCollection contains some elements in any order.
assert_that(
    output,
    equal_to(["elem1", "elem3", "elem2"]))
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"

output := ... // beam.PCollection

// Check whether a PCollection contains some elements in any order.
passert.EqualsList(s, output, ["elem1", "elem3", "elem2"])

PAssertを使用するJavaコードは、JUnitHamcrestにリンクする必要があります。Mavenを使用している場合は、プロジェクトのpom.xmlファイルに次の依存関係を追加することで、Hamcrestにリンクできます。

<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest</artifactId>
    <version>2.2</version>
    <scope>test</scope>
</dependency>

これらのクラスの動作の詳細については、org.apache.beam.sdk.testingパッケージのドキュメントを参照してください。

複合変換のテスト例

以下のコードは、複合変換の完全なテストを示しています。このテストでは、String要素の入力PCollectionCount変換を適用しています。このテストでは、List<String>から入力PCollectionを作成するために、Create変換を使用しています。

public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
  "hi", "there", "hi", "hi", "sue", "bob",
  "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  public void testCount() {
    // Create a test pipeline.
    Pipeline p = TestPipeline.create();

    // Create an input PCollection.
    PCollection<String> input = p.apply(Create.of(WORDS));

    // Apply the Count transform under test.
    PCollection<KV<String, Long>> output =
      input.apply(Count.<String>perElement());

    // Assert on the results.
    PAssert.that(output)
      .containsInAnyOrder(
          KV.of("hi", 4L),
          KV.of("there", 1L),
          KV.of("sue", 2L),
          KV.of("bob", 2L),
          KV.of("", 3L),
          KV.of("ZOW", 1L));

    // Run the pipeline.
    p.run();
  }
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountTest(unittest.TestCase):

  def test_count(self):
    # Our static input data, which will make up the initial PCollection.
    WORDS = [
      "hi", "there", "hi", "hi", "sue", "bob",
      "hi", "sue", "", "", "ZOW", "bob", ""
    ]
    # Create a test pipeline.
    with TestPipeline() as p:

      # Create an input PCollection.
      input = p | beam.Create(WORDS)

      # Apply the Count transform under test.
      output = input | beam.combiners.Count.PerElement()

      # Assert on the results.
      assert_that(
        output,
        equal_to([
            ("hi", 4),
            ("there", 1),
            ("sue", 2),
            ("bob", 2),
            ("", 3),
            ("ZOW", 1)]))

      # The pipeline will run and verify the results.
import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

// formatFn takes a key value pair and puts them
// into a single string for comparison.
func formatFn(w string, c int) string {
	return fmt.Sprintf("%s: %d", w, c)
}

// Register the functional DoFn to ensure execution on workers.
func init() {
	register.Function2x1(formatFn)
}

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}

		wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Apply the Count transform under test.
		output := stats.Count(s, col)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches the wantCounts data.
		passert.Equals(s, formatted, wantCounts...)
	})
}

パイプラインのエンドツーエンドテスト

Beam SDKのテストクラス(Java用Beam SDKのTestPipelinePAssertなど)を使用して、パイプライン全体をエンドツーエンドでテストできます。通常、パイプライン全体をテストするには、次の手順を実行します。

WordCountパイプラインのテスト

次のコード例は、WordCountのパイプライン例をどのようにテストできるかを示しています。WordCountは通常、入力データとしてテキストファイルから行を読み取ります。代わりに、このテストではいくつかのテキスト行を含むList<String>を作成し、Create変換を使用して最初のPCollectionを作成します。

WordCountの最後の変換(複合変換CountWordsから)は、印刷に適した書式設定された単語カウントのPCollection<String>を生成します。このPCollectionを出力テキストファイルに書き込むのではなく、テストパイプラインではPAssertを使用して、PCollectionの要素が、期待される出力データを含む静的なString配列の要素と一致することを確認します。

public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.

    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS));

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountWords(beam.PTransform):
    # CountWords transform omitted for conciseness.
    # Full transform can be found here - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py

class WordCountTest(unittest.TestCase):

  # Our input data, which will make up the initial PCollection.
  WORDS = [
      "hi", "there", "hi", "hi", "sue", "bob",
      "hi", "sue", "", "", "ZOW", "bob", ""
  ]

  # Our output data, which is the expected data that the final PCollection must match.
  EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]

  # Example test that tests the pipeline's transforms.

  def test_count_words(self):
    with TestPipeline() as p:

      # Create a PCollection from the WORDS static input data.
      input = p | beam.Create(WORDS)

      # Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      output = input | CountWords()

      # Assert that the output PCollection matches the EXPECTED_COUNTS data.
      assert_that(output, equal_to(EXPECTED_COUNTS), label='CheckOutput')

    # The pipeline will run and verify the results.
package wordcount

import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

// CountWords and formatFn are omitted for conciseness.
// Code for the Full transforms can be found here:
// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go

func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... }

func formatFn(w string, c int) string { ... }

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}

		wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Run ALL the pipeline's transforms
		// (in this case, the CountWords composite transform).
		output := CountWords(s, input)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches
		// the wantCounts data.
		passert.Equals(s, formatted, wantCounts...)
	})
}