Apache Beamにおけるループタイマー

Apache Beam のプリミティブを使用すると、さまざまなユースケースに適した表現力豊かなデータパイプラインを構築できます。特定のユースケースの 1 つは時系列データの分析であり、ウィンドウ境界を越えた連続シーケンスが重要になります。このタイプのデータに取り組む際には、いくつかの興味深い課題が生じます。このブログでは、それらの 1 つを詳しく見て、「ループタイマー」パターンを使用してタイマー API を活用します(ブログ投稿)。

Beam のストリーミングモードでは、データストリームを取得し、分析変換を構築してデータの結果を生成できます。しかし、時系列データの場合、データの欠如も有用な情報です。データがない場合、どのように結果を生成できるのでしょうか?

要件を説明するために、より具体的な例を使用しましょう。IoT デバイスから毎分到着するイベント数を合計する単純なパイプラインがあると想像してください。特定の時間間隔内にデータが見られなかった場合に値 0 を生成したいと思います。なぜこれが難しいのでしょうか?到着するイベントをカウントする単純なパイプラインを構築するのは簡単ですが、イベントがない場合、カウントするものがありません!

一緒に処理する単純なパイプラインを構築しましょう。

  // We will start our timer at 1 sec from the fixed upper boundary of our
  // minute window
  Instant now = Instant.parse("2000-01-01T00:00:59Z");

  // ----- Create some dummy data

  // Create 3 elements, incrementing by 1 minute and leaving a time gap between
  // element 2 and element 3
  TimestampedValue<KV<String, Integer>> time_1 =
    TimestampedValue.of(KV.of("Key_A", 1), now);

  TimestampedValue<KV<String, Integer>> time_2 =
    TimestampedValue.of(KV.of("Key_A", 2),
    now.plus(Duration.standardMinutes(1)));

  // No Value for start time + 2 mins
  TimestampedValue<KV<String, Integer>> time_3 =
    TimestampedValue.of(KV.of("Key_A", 3),
    now.plus(Duration.standardMinutes(3)));

  // Create pipeline
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(PipelineOptions.class);

  Pipeline p = Pipeline.create(options);

  // Apply a fixed window of duration 1 min and Sum the results
  p.apply(Create.timestamped(time_1, time_2, time_3))
   .apply(
      Window.<KV<String,Integer>>into(
FixedWindows.<Integer>of(Duration.standardMinutes(1))))
        .apply(Sum.integersPerKey())
        .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

          @ProcessElement public void process(ProcessContext c) {
            LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
          }
       }));

  p.run();

そのパイプラインを実行すると、次の出力が生成されます。

INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z

注:出力の順序は期待どおりではありませんが、キーとウィンドウのタプルは正しく計算されています。

期待どおり、ウィンドウの最小値と最大値の間のタイムスタンプを持つデータポイントがあった各インターバルウィンドウに出力が表示されます。タイムスタンプ 00:00:59、00:01:59、および 00:03:59 にデータポイントがあり、次のインターバルウィンドウに分類されました。

  • [00:00:00, 00:00:59.999)
  • [00:01:00, 00:01:59.999)
  • [00:03:00, 00:03:59.999)

しかし、00:02:00 と 00:02:59 の間にデータがなかったため、インターバルウィンドウ [00:02:00,00:02:59.999) には値が生成されません。

Beam にその欠落しているウィンドウの値を出力させるにはどうすればよいでしょうか?まず、タイマー API を使用しないいくつかのオプションを見ていきましょう。

オプション 1:外部ハートビート

外部システムを使用して、各時間間隔の値を発行し、Beam が消費するデータストリームに挿入できます。この単純なオプションにより、Beam パイプラインから複雑さを排除できます。しかし、外部システムを使用するということは、このシステムを監視し、Beam パイプラインと同時に他のメンテナンスタスクを実行する必要があることを意味します。

オプション 2:Beam パイプラインで生成されたソースを使用する

このコードスニペットを使用して値を発行する生成ソースを使用できます。

pipeline.apply(GenerateSequence.
            from(0).withRate(1,Duration.standardSeconds(1L)))

その後、

  1. DoFn を使用して値をゼロに変換します。
  2. この値を実際のソースとフラット化します。
  3. 各時間間隔にティックを持つ PCollection を生成します。

これは、各時間間隔に値を生成する簡単な方法でもあります。

オプション 1 と 2:複数のキーの問題

オプション 1 と 2 はどちらも、パイプラインが単一のキーを処理する場合にうまく機能します。今度は、1 つの IoT デバイスではなく、それぞれが固有のキーを持つ数千または数十万のデバイスがある場合を処理しましょう。このシナリオでオプション 1 またはオプション 2 を機能させるには、追加のステップを実行する必要があります。FanOut DoFn を作成します。各ティックはすべての潜在的なキーに配布される必要があるため、ダミー値を取得して、利用可能なすべてのキーのキーと値のペアを生成する FanOut DoFn を作成する必要があります。

たとえば、3 つの IoT デバイスの 3 つのキー {key1、key2、key3} があると仮定しましょう。GenerateSequence から最初の要素を取得したときにオプション 2 で概説した方法を使用するには、DoFn にループを追加して 3 つのキーと値のペアを生成する必要があります。これらのペアは、各 IoT デバイスのハートビート値になります。

そして、動的に変化するキーのリストを持つ多くの IoT デバイスを処理する必要がある場合、状況はさらに面白くなります。Distinct 操作を実行して生成されたデータをサイド入力として FanOut DoFn にフィードする変換を追加する必要があります。

オプション 3:Beam タイマーを使用してハートビートを実装する

では、タイマーはどのように役立つのでしょうか?新しい変換を見てみましょう。

編集:最小値チェックを許可するために、ループタイマー状態がブール値から Long に変更されました。

public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {

    Instant stopTimerTime;

    LoopingStatefulTimer(Instant stopTime){
      this.stopTimerTime = stopTime;
    }

    @StateId("loopingTimerTime")
    private final StateSpec<ValueState<Long>> loopingTimerTime =
        StateSpecs.value(BigEndianLongCoder.of());

    @StateId("key")
    private final StateSpec<ValueState<String>> key =
        StateSpecs.value(StringUtf8Coder.of());

    @TimerId("loopingTimer")
    private final TimerSpec loopingTimer =
        TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement public void process(ProcessContext c, @StateId("key") ValueState<String> key,
        @StateId("loopingTimerTime") ValueState<Long> loopingTimerTime,
        @TimerId("loopingTimer") Timer loopingTimer) {

      // If the timer has been set already, or if the value is smaller than
      // the current element + window duration, do not set
      Long currentTimerValue = loopingTimerTime.read();
      Instant nextTimerTimeBasedOnCurrentElement = c.timestamp().plus(Duration.standardMinutes(1));

      if (currentTimerValue == null || currentTimerValue >
          nextTimerTimeBasedOnCurrentElement.getMillis()) {
        loopingTimer.set(nextTimerTimeBasedOnCurrentElement);
        loopingTimerTime.write(nextTimerTimeBasedOnCurrentElement.getMillis());
      }

      // We need this value so that we can output a value for the correct key in OnTimer
      if (key.read() == null) {
        key.write(c.element().getKey());
      }

      c.output(c.element());
    }

    @OnTimer("loopingTimer")
    public void onTimer(
        OnTimerContext c,
        @StateId("key") ValueState<String> key,
        @TimerId("loopingTimer") Timer loopingTimer) {

      LOG.info("Timer @ {} fired", c.timestamp());
      c.output(KV.of(key.read(), 0));

      // If we do not put in a “time to live” value, then the timer would loop forever
      Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
      if (nextTimer.isBefore(stopTimerTime)) {
        loopingTimer.set(nextTimer);
      } else {
        LOG.info(
            "Timer not being set as exceeded Stop Timer value {} ",
            stopTimerTime);
      }
    }
  }

状態 API が保持する必要があるデータ値は 2 つあります。

  1. タイマーが既に実行中の場合はリセットされないようにするブール値 `timeRunning`。
  2. 後で `OnTimer` イベントで必要になるキーを保存できる「キー」状態オブジェクト値。

また、ID `**loopingTimer**` を持つタイマーがあり、インターバルアラームクロックとして機能します。タイマーは *イベントタイマー* です。パイプラインの実行時の経過時間ではなく、ウォーターマークに基づいて発火します。

次に、@ProcessElement ブロックで何が起こっているのかを解明しましょう。

このブロックに最初に来る要素は、

  1. `timerRunner` の状態を True に設定します。
  2. キーと値のペアからキーの値をキー StateValue に書き込みます。
  3. このコードは、要素のタイムスタンプの 1 分後にタイマーが発火するように設定します。このタイムスタンプに許可される最大値は XX:XX:59.999 です。これにより、最大アラーム値が次の時間間隔の上限に配置されます。
  4. 最後に、`c.output` を使用して `@ProcessElement` ブロックからのデータを出力します。

@OnTimer ブロックでは、次のことが発生します。

  1. このコードは、キー StateValue から取得したキーと値 0 を持つ値を出力します。イベントのタイムスタンプは、タイマー発火のイベント時間に相当します。
  2. `stopTimerTime` 値を超えている場合を除き、1 分後に新しいタイマーを設定します。ユースケースでは通常、より複雑な停止条件がありますが、ここでは単純な条件を使用して、説明するコードをシンプルに保っています。停止条件のトピックについては、後で詳しく説明します。

これで完了です。変換をパイプラインに追加しましょう。

  // Apply a fixed window of duration 1 min and Sum the results
  p.apply(Create.timestamped(time_1, time_2, time_3)).apply(
    Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(1))))
    // We use a combiner to reduce the number of calls in keyed state
    // from all elements to 1 per FixedWindow
    .apply(Sum.integersPerKey())
    .apply(Window.into(new GlobalWindows()))
    .apply(ParDo.of(new LoopingStatefulTimer(Instant.parse("2000-01-01T00:04:00Z"))))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

      @ProcessElement public void process(ProcessContext c) {

        LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());

     }
  }));
  1. パイプラインの最初の部分では、FixedWindows を作成し、キーごとの値を単一の Sum に減らします。
  2. 次に、出力を GlobalWindow に再ウィンドウします。状態とタイマーはウィンドウごとであるため、ウィンドウ境界内で設定する必要があります。ループタイマーはすべての固定ウィンドウにまたがるようにしたいので、グローバルウィンドウで設定します。
  3. 次に、LoopingStatefulTimer DoFn を追加します。
  4. 最後に、FixedWindows を再適用し、値を合計します。

このパイプラインは、パイプラインのソースがインターバルウィンドウの最小境界と最大境界に値を出力した場合でも、各インターバルウィンドウに値 0 が存在することを保証します。つまり、データの欠如をマークできます。

複数の `Sum.integersPerKey` を持つ 2 つの reducer を使用する理由について疑問に思うかもしれません。なぜ 1 つだけを使用しないのでしょうか?機能的には、1 つを使用しても正しい結果が生成されます。しかし、2 つの `Sum.integersPerKey` を配置すると、パフォーマンス上の利点が得られます。要素の数を時間間隔ごとに 1 つに減らすことができます。これにより、`@ProcessElement` 呼び出し中に状態 API を読み取る回数を減らすことができます。

変更されたパイプラインを実行したときのログ出力は次のとおりです。

INFO  LoopingTimer  - Timer @ 2000-01-01T00:01:59.999Z fired
INFO  LoopingTimer  - Timer @ 2000-01-01T00:02:59.999Z fired
INFO  LoopingTimer  - Timer @ 2000-01-01T00:03:59.999Z fired
INFO  LoopingTimer  - Timer not being set as exceeded Stop Timer value 2000-01-01T00:04:00.000Z
INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 0} timestamp is 2000-01-01T00:02:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z

やった!ソースデータセットにそのインターバルに要素がない場合でも、時間間隔 [00:01:00, 00:01:59.999) の出力が得られました。

このブログでは、時系列ユースケースに関する興味深い領域の 1 つを網羅し、タイマー API の高度なユースケースを含むいくつかのオプションを説明しました。楽しいループを皆さんに!

**注:**ループタイマーはタイマー API の興味深い新しいユースケースであり、ランナーは高度な機能セットをすべて備えてそれをサポートする必要があります。DirectRunner を使用して、このパターンを今日から実験できます。他のランナーについては、本番環境でこのユースケースを処理するためのサポートに関するリリースノートを確認してください。

(機能マトリックス)

ランナー固有の注記:Google Cloud Dataflow ランナーのドレイン機能は、ループタイマーをサポートしていません(マトリックスへのリンク)