WithTimestamps
コレクションのすべての要素にタイムスタンプを割り当てます。
例
次の例では、PCollection
を含むパイプラインを作成し、その各要素にタイムスタンプ値をアタッチします。ウィンドウ処理と遅延データがストリーミングパイプラインで重要な役割を果たす場合、タイムスタンプは特に役立ちます。
例1:イベント時間によるタイムスタンプ
要素自体には、多くの場合、すでにタイムスタンプフィールドが含まれています。beam.window.TimestampedValue
は、値とUnixタイムスタンプを秒単位で受け取ります。
time.struct_time
からunix_time
に変換するには、time.mktime
を使用できます。時間の形式設定オプションの詳細については、time.strftime
を参照してください。
datetime.datetime
からunix_time
に変換するには、まずdatetime.timetuple
を使用してtime.struct_time
に変換できます。
例2:論理クロックによるタイムスタンプ
各要素に時系列番号がある場合、これらの番号を論理クロックとして使用できます。これらの番号は、特にウィンドウ処理と遅延データルールに応じて、「秒」相当に変換する必要があります。
例3:処理時間によるタイムスタンプ
要素に時間データがない場合は、各要素の現在の処理時間を使用することもできます。これは、各要素を処理しているワーカーのローカル時間を取得することに注意してください。ワーカーは時間デルタを持つ可能性があるため、この方法を使用することは正確な順序付けを行うための信頼できる方法ではありません。
処理時間を使用すると、要素がパイプラインに入るときにタイムスタンプがアタッチされるため、データが遅れて到着しているかどうかを知る方法はありません。
関連する変換
- Reifyは、Beam値の明示的な形式と暗黙的な形式を変換します。
最終更新日:2024/10/31
お探しの情報はすべて見つかりましたか?
すべて有用で明確でしたか?変更したい点はありますか?ぜひお知らせください!