Apache Beamにおける無制限パイプラインのテスト

Beamプログラミングモデルは、バッチとストリーミングパイプラインの両方に対応するパイプライン記述を統一します。最近、無制限データセット上で実行され、順序外れおよび遅延データに対応する必要があるパイプラインのテストを記述するための新しいPTransformが導入されました。

ウォーターマーク、ウィンドウ、トリガーはBeamプログラミングモデルの中核を成しています。これらはそれぞれ、データのグループ化方法、入力の完了時期、結果の生成時期を決定します。これは、バウンドされた入力か無制限の入力かを問わず、すべてのパイプラインに当てはまります。Beamモデルにおけるウォーターマーク、ウィンドウ、トリガーに慣れていない場合は、ストリーミング101ストリーミング102が最適な出発点です。これらの記事の重要なポイント:断続的な障害や接続解除されたユーザーがいる現実的なストリーミングシナリオでは、データが順序外れに到着したり、遅延したりすることがあります。Beamのプリミティブは、ユーザーがこれらの課題にもかかわらず、有用で強力で正確な計算を実行する方法を提供します。

Beamパイプラインの開発者として、重要な障害シナリオとコーナーケースを網羅する包括的なテストが必要であり、パイプラインが本番環境に準備できているという確信を得る必要があります。Beam SDK内の既存のテストインフラストラクチャでは、実行時にパイプラインの内容を検査するテストを作成できます。しかし、遅延データを受信したり、複数回トリガーされたりするパイプラインの単体テストを作成することは、これまで複雑なものから不可能なものまでありました。これは、無制限のソースから読み取るパイプラインは外部からの介入なしにシャットダウンしない一方、制限されたソースからのみ読み取るパイプラインは、遅延データやほとんどの推測的トリガーでの動作をテストできないためです。追加のツールがなければ、カスタムトリガーを使用し、順序外れデータに対応するパイプラインは簡単にテストできませんでした。

このブログ投稿では、モバイルゲームの例シリーズのLeaderBoardパイプラインのコンテキストで、遅延データと順序外れデータに対応するパイプラインのテストを記述するための新しいフレームワークを紹介します。

LeaderBoardとモバイルゲームの例

LeaderBoardは、Beamモバイルゲームの例(およびチュートリアル)の一部であり、ユーザーとチームのスコアの継続的な集計を作成します。ユーザーのスコアはプログラムのライフタイムにわたって計算され、チームのスコアはデフォルトで1時間の長さの固定ウィンドウ内で計算されます。LeaderBoardパイプラインは、パイプラインの構成されたトリガーと許容される遅延に基づいて、必要に応じて推測的ペインと遅延ペインを生成します。LeaderBoardパイプラインの期待される出力は、ウォーターマークと処理時間の進行状況に関連して要素が到着したタイミングによって異なります。これは以前はテスト内で制御できませんでした。

非決定性をエミュレートする決定論的テストの記述

Beamのテストインフラストラクチャは、PAssertメソッドを提供します。これは、パイプライン内からPCollectionの内容に関するプロパティをアサートします。このインフラストラクチャは、TestStreamを含めるように拡張されました。これは、パイプラインに追加の要素を追加すること、TestStreamのウォーターマークを進めること、およびパイプラインの処理時間クロックを進めることからなる一連のイベントを実行するPTransformです。TestStreamは、トリガーがパイプラインが生成する出力に及ぼす影響を観察するテストを可能にします。

TestStreamから読み取るパイプラインを実行すると、読み取りは各イベントの結果がすべて完了するまで待機してから次のイベントに進みます。これにより、処理時間が進むと、処理時間に基づくトリガーが適切に発生します。このトランスフォームを使用すると、推測的ペインや遅延ペインへの反応、およびドロップされたデータを含め、トリガーと許容される遅延の影響をパイプラインで観察できます。

要素のタイミング

要素は、ウォーターマークの前、ウォーターマークと同時に、またはウォーターマークの後に到着します。これにより、「早期」、「時間通り」、「遅延」の3つのカテゴリに分類されます。ウィンドウに割り当てられるウィンドウと、ウィンドウ戦略で指定された最大許容遅延に応じて、「遅延」要素はさらに、「観測不能」、「観測可能」、「ドロップ可能」の遅延に細分化できます。これらのタイミングで到着する要素は、ペインに送出されます。ペインは、ペインが送出されたときのウォーターマークの位置に応じて、「EARLY」、「ON-TIME」、「LATE」になります。

TestStreamを使用すると、推測的ペインがトリガー条件を満たした後に出力されること、ウォーターマークの進展が時間通りのペインの生成を引き起こすこと、遅延データが最大許容遅延前に到着したときに結果を改良し、その後ドロップされることを示すテストを作成できます。

次の例では、TestStreamを使用して、要素の到着がウォーターマークの更新と処理時間の進展と混在する、パイプラインに一連のイベントを提供する方法を示しています。これらの各イベントは、追加のイベントが発生する前に完了します。

図では、「リアル」(イベント)時間でのイベント発生時刻はグラフが右に移動するにつれて進み、パイプラインがイベントを受信する時刻はグラフが上に移動するにつれて進みます。ウォーターマークは赤の波線で表され、各スターバーストはトリガーの発火とその関連ペインです。

Elements on the Event and Processing time axes, with the Watermark and produced panes

すべてが時間通りに到着

たとえば、すべてのデータがウォーターマークの前に到着するTestStreamを作成し、結果のPCollectionをCalculateTeamScores PTransformへの入力として提供する場合

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 12, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
    // Move the watermark past the end the end of the window
    .advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
                                       .plus(Duration.standardMinutes(1)))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));

結果のPCollectionに、到着した要素が含まれていることをアサートできます。

Elements all arrive before the watermark, and are produced in the on-time pane
// Only one value is emitted for the blue team
PAssert.that(teamScores)
       .inWindow(window)
       .containsInAnyOrder(KV.of("blue", 18));
p.run();

一部の要素は遅延していますが、ウィンドウの終了前に到着します

ウォーターマークの後、ウィンドウの終了前(赤のウォーターマークの左側)にTestStreamにデータを追加することもできます。これは「観測不能な遅延」データを示しています。つまり、遅延して到着しますが、ウォーターマークがウィンドウの終了時刻を通過する前に到着するため、システムによって時間通りに昇格されるデータです。

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 3, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
    // Move the watermark up to "near" the end of the window
    .advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
                                       .minus(Duration.standardMinutes(1)))
    .addElements(new GameActionInfo("sky", "blue", 12, Duration.ZERO))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
An element arrives late, but before the watermark passes the end of the window, and is produced in the on-time pane
// Only one value is emitted for the blue team
PAssert.that(teamScores)
       .inWindow(window)
       .containsInAnyOrder(KV.of("blue", 18));
p.run();

要素は遅延しており、ウィンドウの終了時刻後に到着します

遅延データを追加する前にウォーターマークをさらに時間的に進めることで、システムが時間通りのペインを送出し、遅延データが到着した後に結果を改良するペインを送出するトリガー動作を示すことができます。

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 3, new Instant(0L)),
                 new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
    // Move the watermark up to "near" the end of the window
    .advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
                                       .minus(Duration.standardMinutes(1)))
    .addElements(new GameActionInfo("sky", "blue", 12, Duration.ZERO))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
Elements all arrive before the watermark, and are produced in the on-time pane
// An on-time pane is emitted with the events that arrived before the window closed
PAssert.that(teamScores)
       .inOnTimePane(window)
       .containsInAnyOrder(KV.of("blue", 6));
// The final pane contains the late refinement
PAssert.that(teamScores)
       .inFinalPane(window)
       .containsInAnyOrder(KV.of("blue", 18));
p.run();

要素は遅延しており、ウィンドウの終了時刻と許容される遅延時刻の後です

ウォーターマークを最大構成された許容遅延を超えてさらに将来に進めると、遅延要素がシステムによってドロップされることを示すことができます。

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("sky", "blue", 3, Duration.ZERO),
                 new GameActionInfo("navy", "blue", 3, Duration.standardMinutes(3)))
    // Move the watermark up to "near" the end of the window
    .advanceWatermarkTo(new Instant(0).plus(TEAM_WINDOW_DURATION)
                                         .plus(ALLOWED_LATENESS)
                                         .plus(Duration.standardMinutes(1)))
    .addElements(new GameActionInfo(
                     "sky",
                     "blue",
                     12,
                     new Instant(0).plus(TEAM_WINDOW_DURATION).minus(Duration.standardMinutes(1))))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
    .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
Elements all arrive before the watermark, and are produced in the on-time pane
// An on-time pane is emitted with the events that arrived before the window closed
PAssert.that(teamScores)
       .inWindow(window)
       .containsInAnyOrder(KV.of("blue", 6));

p.run();

要素はウィンドウの終了時刻前に到着し、処理時間が経過します

追加のメソッドを使用すると、TestStreamの処理時間を進めることで、推測的トリガーの動作を示すことができます。入力PCollectionに要素を追加し、処理時間クロックを時々進めて、`CalculateUserScores`を適用すると

TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
    .addElements(new GameActionInfo("scarlet", "red", 3, new Instant(0L)),
                 new GameActionInfo("scarlet", "red", 2, new Instant(0L).plus(Duration.standardMinutes(1))))
    .advanceProcessingTime(Duration.standardMinutes(12))
    .addElements(new GameActionInfo("oxblood", "red", 2, new Instant(0L)).plus(Duration.standardSeconds(22)),
                 new GameActionInfo("scarlet", "red", 4, new Instant(0L).plus(Duration.standardMinutes(2))))
    .advanceProcessingTime(Duration.standardMinutes(15))
    .advanceWatermarkToInfinity();

PCollection<KV<String, Integer>> userScores =
    p.apply(createEvents).apply(new CalculateUserScores(ALLOWED_LATENESS));
Elements all arrive before the watermark, and are produced in the on-time pane
PAssert.that(userScores)
       .inEarlyGlobalWindowPanes()
       .containsInAnyOrder(KV.of("scarlet", 5),
                           KV.of("scarlet", 9),
                           KV.of("oxblood", 2));

p.run();

TestStream - 内部動作

TestStreamは、既存のランナーインフラストラクチャを利用しながら、ルートトランスフォームがランナーによっていつ呼び出されるかについて保証を提供するために、導入したパイプラインの概念であるクワイエセンスに依存します。これは、保留中の要素とトリガーに関するプロパティで構成されます。

  • トリガーの発火は許可されていませんが、発火していません。
  • すべての要素は、状態にバッファリングされているか、サイド入力を使用できるようになるまで進行できません。

簡単に言うと、入力ウォーターマークや処理時間の進展がない場合、またはパイプラインに追加の要素が追加されていない場合、パイプラインは進捗しません。TestStream PTransformがアクションを実行するたびに、ランナーはパイプラインがクワイエセンスになるまで同じインスタンスを再呼び出ししてはなりません。これにより、TestStreamで指定されたイベントが「順序どおり」に発生することが保証され、入力ウォーターマークとシステムクロックが、保持しようとした要素よりも先に進むことが防止されます。

DirectRunnerは、さらに作業をパイプラインに追加する信号としてクワイエセンスを使用するように変更され、そのランナーでのTestStreamの実装では、この事実を使用してイベントごとに単一の出力を実行します。DirectRunnerの実装は、ランナーのシステムクロックも直接制御するため、パイプライン内に数分間の処理時間トリガーがある場合でも、テストはすぐに完了します。

TestStreamトランスフォームはDirectRunnerでサポートされています。ほとんどのユーザーにとって、TestPipelineとPAssertsを使用して記述されたテストは、TestStreamを使用しても自動的に機能します。

まとめ

PAssertのウィンドウとペイン固有のマッチャーと併せてTestStreamを追加することで、推測的ペインと遅延ペインを生成するパイプラインのテストが可能になりました。これにより、すべてのスタイルのパイプラインのテストをJava SDK内で直接表現できます。ご質問やご意見がありましたら、メーリングリストでお知らせください。