WithTimestamps

コレクションのすべての要素にタイムスタンプを割り当てます。

次の例では、PCollectionを含むパイプラインを作成し、その各要素にタイムスタンプ値をアタッチします。ウィンドウ処理と遅延データがストリーミングパイプラインで重要な役割を果たす場合、タイムスタンプは特に役立ちます。

例1:イベント時間によるタイムスタンプ

要素自体には、多くの場合、すでにタイムスタンプフィールドが含まれています。beam.window.TimestampedValueは、値とUnixタイムスタンプを秒単位で受け取ります。

time.struct_timeからunix_timeに変換するには、time.mktimeを使用できます。時間の形式設定オプションの詳細については、time.strftimeを参照してください。

import time

time_tuple = time.strptime('2020-03-19 20:50:00', '%Y-%m-%d %H:%M:%S')
unix_time = time.mktime(time_tuple)

datetime.datetimeからunix_timeに変換するには、まずdatetime.timetupleを使用してtime.struct_timeに変換できます。

import time
import datetime

now = datetime.datetime.now()
time_tuple = now.timetuple()
unix_time = time.mktime(time_tuple)

例2:論理クロックによるタイムスタンプ

各要素に時系列番号がある場合、これらの番号を論理クロックとして使用できます。これらの番号は、特にウィンドウ処理と遅延データルールに応じて、「秒」相当に変換する必要があります。

例3:処理時間によるタイムスタンプ

要素に時間データがない場合は、各要素の現在の処理時間を使用することもできます。これは、各要素を処理しているワーカーのローカル時間を取得することに注意してください。ワーカーは時間デルタを持つ可能性があるため、この方法を使用することは正確な順序付けを行うための信頼できる方法ではありません。

処理時間を使用すると、要素がパイプラインに入るときにタイムスタンプがアタッチされるため、データが遅れて到着しているかどうかを知る方法はありません。