パイプラインのテスト
パイプラインのテストは、効果的なデータ処理ソリューションを開発する上で特に重要なステップです。ユーザーコードがリモートで実行されるパイプライングラフを構築するという、Beamモデルの間接的な性質により、失敗した実行のデバッグは簡単な作業ではありません。多くの場合、パイプラインのリモート実行をデバッグするよりも、パイプラインコードでローカルユニットテストを実行する方が高速で簡単です。
選択したランナーでパイプラインを実行する前に、パイプラインコードをローカルでユニットテストすることが、パイプラインコードのバグを特定して修正する最良の方法であることがよくあります。パイプラインをローカルでユニットテストすることで、使い慣れた/お気に入りのローカルデバッグツールを使用することもできます。
DirectRunner、またはPrismRunnerを使用できます。どちらもテストとローカル開発に役立つローカルランナーです。
パイプラインをローカルでテストした後、選択したランナーを使用して小規模でテストできます。たとえば、ローカルまたはリモートのFlinkクラスターでFlinkランナーを使用します。
Beam SDKには、パイプラインコードをユニットテストするためのさまざまな方法が、最も低いレベルから最も高いレベルまで用意されています。最も低いレベルから最も高いレベルまで、これらは次のとおりです。
- パイプラインで使用される個々の関数をテストできます。
- ユニットとして変換全体をテストできます。
- パイプライン全体のエンドツーエンドテストを実行できます。
ユニットテストをサポートするために、Java用Beam SDKは、testingパッケージに多数のテストクラスを提供しています。これらのテストを参考にガイドとして使用できます。
変換のテスト
作成した変換をテストするには、次のパターンを使用できます。
TestPipeline
を作成します。- いくつかの静的で既知のテスト入力データを作成します。
Create
変換を使用して、入力データのPCollection
を作成します。- 入力
PCollection
に変換を適用
し、結果の出力PCollection
を保存します。 PAssert
とそのサブクラスを使用して、出力PCollection
に期待する要素が含まれていることを確認します。
TestPipeline
TestPipelineは、変換をテストするために特別にBeam Java SDKに含まれているクラスです。
TestPipelineは、変換をテストするために特別にBeam Python SDKに含まれているクラスです。
テストでは、パイプラインオブジェクトを作成するときに、Pipeline
の代わりにTestPipeline
を使用します。Pipeline.create
とは異なり、TestPipeline.create
は内部的にPipelineOptions
の設定を処理します。次のようにTestPipeline
を作成します。
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
// Override TestMain with ptest.Main,
// once per package.
func TestMain(m *testing.M) {
ptest.Main(m)
}
func TestPipeline(t *testing.T) {
...
// The Go SDK doesn't use a TestPipeline concept,
// and recommends using the ptest harness
// to wrap pipeline construction.
pr := ptest.BuildAndRun(t, func(s beam.Scope) {
...
})
...
}
注: Beamでの非境界パイプラインのテストについては、このブログ投稿で説明しています。
Create変換の使用
Create
変換を使用すると、JavaやPythonのList
などの標準のインメモリコレクションクラスからPCollection
を作成できます。詳細については、PCollectionの作成を参照してください。
PAssert
PAssertは、Beam Java SDKに含まれているクラスで、PCollection
の内容に関するアサーションです。PAssert
を使用して、PCollection
に特定の期待される要素のセットが含まれていることを確認できます。
指定されたPCollection
について、次のようにPAssert
を使用して内容を確認できます。
PAssert
を使用するJavaコードは、JUnit
とHamcrest
にリンクする必要があります。Mavenを使用している場合は、プロジェクトのpom.xml
ファイルに次の依存関係を追加することで、Hamcrest
にリンクできます。
これらのクラスの動作の詳細については、org.apache.beam.sdk.testingパッケージのドキュメントを参照してください。
複合変換のテスト例
以下のコードは、複合変換の完全なテストを示しています。このテストでは、String
要素の入力PCollection
にCount
変換を適用しています。このテストでは、List<String>
から入力PCollection
を作成するために、Create
変換を使用しています。
public class CountTest {
// Our static input data, which will make up the initial PCollection.
static final String[] WORDS_ARRAY = new String[] {
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
public void testCount() {
// Create a test pipeline.
Pipeline p = TestPipeline.create();
// Create an input PCollection.
PCollection<String> input = p.apply(Create.of(WORDS));
// Apply the Count transform under test.
PCollection<KV<String, Long>> output =
input.apply(Count.<String>perElement());
// Assert on the results.
PAssert.that(output)
.containsInAnyOrder(
KV.of("hi", 4L),
KV.of("there", 1L),
KV.of("sue", 2L),
KV.of("bob", 2L),
KV.of("", 3L),
KV.of("ZOW", 1L));
// Run the pipeline.
p.run();
}
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
class CountTest(unittest.TestCase):
def test_count(self):
# Our static input data, which will make up the initial PCollection.
WORDS = [
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""
]
# Create a test pipeline.
with TestPipeline() as p:
# Create an input PCollection.
input = p | beam.Create(WORDS)
# Apply the Count transform under test.
output = input | beam.combiners.Count.PerElement()
# Assert on the results.
assert_that(
output,
equal_to([
("hi", 4),
("there", 1),
("sue", 2),
("bob", 2),
("", 3),
("ZOW", 1)]))
# The pipeline will run and verify the results.
import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)
// formatFn takes a key value pair and puts them
// into a single string for comparison.
func formatFn(w string, c int) string {
return fmt.Sprintf("%s: %d", w, c)
}
// Register the functional DoFn to ensure execution on workers.
func init() {
register.Function2x1(formatFn)
}
func TestCountWords(t *testing.T) {
// The pipeline will run and verify the results.
ptest.BuildAndRun(t, func(s beam.Scope) {
words := []string{"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""}
wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}
// Create a PCollection from the words static input data.
input := beam.CreateList(s, words)
// Apply the Count transform under test.
output := stats.Count(s, col)
formatted := beam.ParDo(s, formatFn, output)
// Assert that the output PCollection matches the wantCounts data.
passert.Equals(s, formatted, wantCounts...)
})
}
パイプラインのエンドツーエンドテスト
Beam SDKのテストクラス(Java用Beam SDKのTestPipeline
やPAssert
など)を使用して、パイプライン全体をエンドツーエンドでテストできます。通常、パイプライン全体をテストするには、次の手順を実行します。
- パイプラインへのすべての入力データソースに対して、既知の静的テスト入力データを作成します。
- パイプラインの最終出力
PCollection
に期待される内容と一致する、静的なテスト出力データを作成します。 - 標準の
Pipeline.create
の代わりに、TestPipeline
を作成します。 - パイプラインの
Read
変換の代わりに、Create
変換を使用して、静的な入力データから1つ以上のPCollection
を作成します。 - パイプラインの変換を適用します。
- パイプラインの
Write
変換の代わりに、PAssert
を使用して、パイプラインが生成する最終的なPCollection
の内容が、静的な出力データ内の期待値と一致することを確認します。
WordCountパイプラインのテスト
次のコード例は、WordCountのパイプライン例をどのようにテストできるかを示しています。WordCount
は通常、入力データとしてテキストファイルから行を読み取ります。代わりに、このテストではいくつかのテキスト行を含むList<String>
を作成し、Create
変換を使用して最初のPCollection
を作成します。
WordCount
の最後の変換(複合変換CountWords
から)は、印刷に適した書式設定された単語カウントのPCollection<String>
を生成します。このPCollection
を出力テキストファイルに書き込むのではなく、テストパイプラインではPAssert
を使用して、PCollection
の要素が、期待される出力データを含む静的なString
配列の要素と一致することを確認します。
public class WordCountTest {
// Our static input data, which will comprise the initial PCollection.
static final String[] WORDS_ARRAY = new String[] {
"hi there", "hi", "hi sue bob",
"hi sue", "", "bob hi"};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
// Our static output data, which is the expected data that the final PCollection must match.
static final String[] COUNTS_ARRAY = new String[] {
"hi: 5", "there: 1", "sue: 2", "bob: 2"};
// Example test that tests the pipeline's transforms.
public void testCountWords() throws Exception {
Pipeline p = TestPipeline.create();
// Create a PCollection from the WORDS static input data.
PCollection<String> input = p.apply(Create.of(WORDS));
// Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
PCollection<String> output = input.apply(new CountWords());
// Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
// Run the pipeline.
p.run();
}
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
class CountWords(beam.PTransform):
# CountWords transform omitted for conciseness.
# Full transform can be found here - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py
class WordCountTest(unittest.TestCase):
# Our input data, which will make up the initial PCollection.
WORDS = [
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""
]
# Our output data, which is the expected data that the final PCollection must match.
EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]
# Example test that tests the pipeline's transforms.
def test_count_words(self):
with TestPipeline() as p:
# Create a PCollection from the WORDS static input data.
input = p | beam.Create(WORDS)
# Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
output = input | CountWords()
# Assert that the output PCollection matches the EXPECTED_COUNTS data.
assert_that(output, equal_to(EXPECTED_COUNTS), label='CheckOutput')
# The pipeline will run and verify the results.
package wordcount
import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
// CountWords and formatFn are omitted for conciseness.
// Code for the Full transforms can be found here:
// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... }
func formatFn(w string, c int) string { ... }
func TestCountWords(t *testing.T) {
// The pipeline will run and verify the results.
ptest.BuildAndRun(t, func(s beam.Scope) {
words := []string{"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""}
wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}
// Create a PCollection from the words static input data.
input := beam.CreateList(s, words)
// Run ALL the pipeline's transforms
// (in this case, the CountWords composite transform).
output := CountWords(s, input)
formatted := beam.ParDo(s, formatFn, output)
// Assert that the output PCollection matches
// the wantCounts data.
passert.Equals(s, formatted, wantCounts...)
})
}
最終更新日: 2024/10/31
お探しの情報はすべて見つかりましたか?
すべて有益で分かりやすかったですか?変更したいことはありますか?ぜひお知らせください!