カスタムウィンドウパターン
このページのサンプルは、一般的なカスタムウィンドウパターンを示しています。 WindowFn
関数 を使用してカスタムウィンドウを作成できます。詳細については、ウィンドウイングに関するプログラミングガイドのセクションを参照してください。
注記:カスタムマージウィンドウはPython(fnapiを使用)ではサポートされていません。
データを使用してセッションウィンドウのギャップを動的に設定する
assignWindows
関数を変更してデータ駆動型ギャップを使用し、受信データをセッションにウィンドウ化できます。
WindowFn.AssignContext.element()
を介してassignWindows
関数にアクセスします。元の固定期間のassignWindows
関数は次のとおりです。
public Collection<IntervalWindow> assignWindows(WindowFn.AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
データ駆動型ギャップの作成
データ駆動型ギャップを作成するには、次のスニペットをassignWindows
関数に追加します。
- カスタムギャップがデータに存在しない場合のデフォルト値
- カスタムウィンドウのメソッドとして、メインパイプラインから属性を設定する方法
たとえば、次の関数は、タイムスタンプとgapDuration
の間のウィンドウに各要素を割り当てます。
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
Duration dataDrivenGap;
TableRow message = c.element();
try {
dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.get("gap").toString()));
} catch (Exception e) {
dataDrivenGap = gapDuration;
}
return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}
次に、ウィンドウイング関数でgapDuration
フィールドを設定します。
public static class DynamicSessions extends WindowFn<TableRow, IntervalWindow> {
/** Duration of the gaps between sessions. */
private final Duration gapDuration;
/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
private DynamicSessions(Duration gapDuration) {
this.gapDuration = gapDuration;
}
セッションへのウィンドウイングメッセージ
データ駆動型ギャップを作成した後、新しいカスタムセッションに受信データをウィンドウ化できます。
まず、セッションの長さをギャップ時間 duration に設定します。
最後に、パイプラインでデータをセッションにウィンドウ化します。
サンプルデータとウィンドウ
次のテストデータは、gap
属性の有無にかかわらず、2人のユーザーのスコアを集計します。
.apply("Create data", Create.timestamped(
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"4\"}", new Instant()),
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
TimestampedValue.of("{\"user\":\"user-1\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
TimestampedValue.of("{\"user\":\"user-2\",\"score\":\"10\"}", new Instant().plus(12000)))
.withCoder(StringUtf8Coder.of()))
下の図は、テストデータを視覚化したものです。
標準セッション
標準セッションは、次のウィンドウとスコアを使用します。
user=user-2, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=user-1, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=user-2, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)
ユーザー#1は、12秒間隔で分離された2つのイベントを確認します。標準セッションでは、ギャップはデフォルトで10秒です。両方のスコアは異なるセッションにあるため、スコアは追加されません。
ユーザー#2は、それぞれ2秒、7秒、3秒間隔で分離された4つのイベントを確認します。ギャップのいずれもデフォルトを超えていないため、4つのイベントは同じ標準セッションに含まれ、合計されます(18ポイント)。
動的セッション
動的セッションは5秒間のギャップを指定するため、次のウィンドウとスコアを使用します。
user=user-2, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=user-1, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=user-1, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=user-2, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)
動的セッションでは、ユーザー#2は異なるスコアを取得します。3番目のメッセージは2番目のメッセージの7秒後に到着するため、異なるセッションにグループ化されます。18ポイントの大きなセッションは、2つの9ポイントのセッションに分割されます。
最終更新日:2024年10月31日
必要な情報は見つかりましたか?
すべて役に立ち、分かりやすかったですか?変更したい点があれば教えてください!