カスタムウィンドウパターン

このページのサンプルは、一般的なカスタムウィンドウパターンを示しています。 WindowFn関数 を使用してカスタムウィンドウを作成できます。詳細については、ウィンドウイングに関するプログラミングガイドのセクションを参照してください。

注記:カスタムマージウィンドウはPython(fnapiを使用)ではサポートされていません。

データを使用してセッションウィンドウのギャップを動的に設定する

assignWindows関数を変更してデータ駆動型ギャップを使用し、受信データをセッションにウィンドウ化できます。

WindowFn.AssignContext.element()を介してassignWindows関数にアクセスします。元の固定期間のassignWindows関数は次のとおりです。

  public Collection<IntervalWindow> assignWindows(WindowFn.AssignContext c) {

    // Assign each element into a window from its timestamp until gapDuration in the
    // future.  Overlapping windows (representing elements within gapDuration of
    // each other) will be merged.
    return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
  }

データ駆動型ギャップの作成

データ駆動型ギャップを作成するには、次のスニペットをassignWindows関数に追加します。

たとえば、次の関数は、タイムスタンプとgapDurationの間のウィンドウに各要素を割り当てます。

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
  // Assign each element into a window from its timestamp until gapDuration in the
  // future.  Overlapping windows (representing elements within gapDuration of
  // each other) will be merged.
  Duration dataDrivenGap;
  TableRow message = c.element();

  try {
    dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.get("gap").toString()));
  } catch (Exception e) {
    dataDrivenGap = gapDuration;
  }
  return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}

次に、ウィンドウイング関数でgapDurationフィールドを設定します。

public static class DynamicSessions extends WindowFn<TableRow, IntervalWindow> {
  /** Duration of the gaps between sessions. */
  private final Duration gapDuration;

  /** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
  private DynamicSessions(Duration gapDuration) {
    this.gapDuration = gapDuration;
  }

セッションへのウィンドウイングメッセージ

データ駆動型ギャップを作成した後、新しいカスタムセッションに受信データをウィンドウ化できます。

まず、セッションの長さをギャップ時間 duration に設定します。

/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
public static DynamicSessions withDefaultGapDuration(Duration gapDuration) {
  return new DynamicSessions(gapDuration);
}

最後に、パイプラインでデータをセッションにウィンドウ化します。

p.apply(
    "Window into sessions",
    Window.<TableRow>into(
        DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10))));

サンプルデータとウィンドウ

次のテストデータは、gap属性の有無にかかわらず、2人のユーザーのスコアを集計します。

.apply("Create data", Create.timestamped(
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"4\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
            TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
            TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"10\"}", new Instant().plus(12000)))
        .withCoder(StringUtf8Coder.of()))

下の図は、テストデータを視覚化したものです。

Two sets of data and the standard and dynamic sessions with which the data is windowed.

標準セッション

標準セッションは、次のウィンドウとスコアを使用します。

user=user-2, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=user-1, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=user-2, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)

ユーザー#1は、12秒間隔で分離された2つのイベントを確認します。標準セッションでは、ギャップはデフォルトで10秒です。両方のスコアは異なるセッションにあるため、スコアは追加されません。

ユーザー#2は、それぞれ2秒、7秒、3秒間隔で分離された4つのイベントを確認します。ギャップのいずれもデフォルトを超えていないため、4つのイベントは同じ標準セッションに含まれ、合計されます(18ポイント)。

動的セッション

動的セッションは5秒間のギャップを指定するため、次のウィンドウとスコアを使用します。

user=user-2, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=user-1, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=user-1, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=user-2, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)

動的セッションでは、ユーザー#2は異なるスコアを取得します。3番目のメッセージは2番目のメッセージの7秒後に到着するため、異なるセッションにグループ化されます。18ポイントの大きなセッションは、2つの9ポイントのセッションに分割されます。