共有オブジェクトを使用したキャッシュデータ

キャッシュとは、データの今後のリクエストをより迅速に処理できるようにデータを格納するソフトウェアコンポーネントです。キャッシュにアクセスするには、サイド入力、ステートフルなDoFn、外部サービスの呼び出しを使用できます。Python SDKは、共有モジュールに別のオプションを提供します。このオプションは、サイド入力よりもメモリ効率が高く、ステートフルなDoFnよりもシンプルで、要素または要素のバンドルごとに外部サービスにアクセスする必要がないため、外部サービスの呼び出しよりもパフォーマンスが高くなります。Beam SDKを使用したデータのキャッシュ戦略の詳細については、2022 Beam SummitのセッションDataflowでBeam SDKを使用してデータをキャッシュするための戦略を参照してください。

このページの例では、共有モジュールSharedクラスを使用して、境界付きおよび境界なしのPCollectionオブジェクトの要素をエンリッチする方法を示しています。サンプルでは、*注文*と*顧客*の2つのデータセットが使用されます。注文レコードには、顧客属性が顧客レコードのマッピングによって追加される顧客IDが含まれています。

バッチパイプラインにキャッシュを作成する

この例では、顧客キャッシュはEnrichOrderFnsetupメソッドで辞書としてロードされます。キャッシュは、注文レコードに顧客属性を追加するために使用されます。Pythonの辞書は弱参照をサポートしておらず、Sharedオブジェクトは共有リソースのシングルトンインスタンスへの弱参照をカプセル化しているため、ラッパークラスを作成します。

# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass

class EnrichOrderFn(beam.DoFn):
    def __init__(self):
        self._customers = {}
        self._shared_handle = shared.Shared()

    def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        self._customer_lookup = self._shared_handle.acquire(self.load_customers)

    def load_customers(self):
        self._customers = expensive_remote_call_to_load_customers()
        return WeakRefDict(self._customers)

    def process(self, element):
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}

ストリーミングパイプラインにキャッシュを作成し、定期的に更新する

顧客キャッシュは時間の経過とともに変化すると想定されるため、定期的に更新する必要があります。共有オブジェクトをリロードするには、acquireメソッドのtag引数を変更します。この例では、更新はstart_bundleメソッドに実装されており、現在のタグ値を既存の共有オブジェクトに関連付けられている値と比較します。 set_tagメソッドは、最大経過時間秒以内で同じタグ値を返します。したがって、タグ値が既存のタグ値よりも大きい場合、顧客キャッシュの更新がトリガーされます。

# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass

class EnrichOrderFn(beam.DoFn):
    def __init__(self):
        self._max_stale_sec = 60
        self._customers = {}
        self._shared_handle = shared.Shared()

    def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        self._customer_lookup = self._shared_handle.acquire(
            self.load_customers, self.set_tag()
        )

    def set_tag(self):
        # A single tag value is returned within a period, which is upper-limited by the max stale second.
        current_ts = datetime.now().timestamp()
        return current_ts - (current_ts % self._max_stale_sec)

    def load_customers(self):
        # Assign the tag value of the current period for comparison.
        self._customers = expensive_remote_call_to_load_customers(tag=self.set_tag())
        return WeakRefDict(self._customers)

    def start_bundle(self):
        # Update the shared object when the current tag value exceeds the existing value.
        if self.set_tag() > self._customers["tag"]:
            self._customer_lookup = self._shared_handle.acquire(
                self.load_customers, self.set_tag()
            )

    def process(self, element):
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}