マルチモデルパイプライン

Apache Beamを使用すると、マルチモデルパイプラインを開発できます。この例では、入力データを取り込み変換し、モデルを介して実行し、最初のモデルの結果を2番目のモデルに渡す方法を示します。このページでは、マルチモデルパイプラインがどのように機能するかを説明し、構築するために知っておくべきことの概要を示します。

このセクションを読む前に、「パイプライン開発ライフサイクル」の情報に精通しておくことをお勧めします。

Beamでマルチモデルパイプラインを構築する方法

一般的な機械学習ワークフローには、データ取り込み、データ処理タスク、推論、後処理など、一連のデータ変換ステップが含まれます。Apache Beamを使用すると、それらのステップすべてを単一のApache Beam有向非巡回グラフ(DAG)にカプセル化することで、それらを一緒に調整できます。これにより、回復力がありスケーラブルなエンドツーエンドの機械学習システムを構築できます。

Apache Beamパイプラインに機械学習モデルをデプロイするには、DAGのPTransformステップとしてのモデルの統合を容易にするRunInferenceAPIを使用します。単一のDAG内で複数のRunInference変換を構成すると、複数のMLモデルで構成されるパイプラインを構築できます。このようにして、Apache Beamは複雑なMLシステムの開発をサポートします。

Apache Beamでマルチモデルパイプラインを構築するために、さまざまなパターンを使用できます。このページでは、A/Bパターンとカスケードパターンについて説明します。

A/Bパターン

A/Bパターンは、複数のMLモデルが並行して実行されているフレームワークを表します。このパターンの1つのアプリケーションは、さまざまな機械学習モデルのパフォーマンスをテストし、新しいモデルが既存のモデルよりも改善されているかどうかを判断することです。これは「チャンピオン/チャレンジャー」メソッドとしても知られています。通常、制御モデルのパフォーマンスと現在のモデルのパフォーマンスを比較するためのビジネスメトリックを定義します。

例としては、ユーザーの好みやアクティビティ履歴に基づいて広告を推奨する既存のモデルを持つレコメンデーションエンジンモデルが考えられます。新しいモデルをデプロイする場合、受信ユーザーのトラフィックを2つのブランチに分割し、半分のユーザーは新しいモデルに、残りの半分は現在のモデルに接触させることができます。

その後、定義された期間にわたって両方のユーザーセットの広告の平均クリックスルー率(CTR)を測定して、新しいモデルが既存のモデルよりもパフォーマンスが高いかどうかを判断できます。

import apache_beam as beam

with beam.Pipeline() as pipeline:
   userset_a_traffic, userset_b_traffic =
     (pipeline | 'ReadFromStream' >> beam.ReadFromStream('stream_source')
               | ‘Partition’ >> beam.partition(split_dataset, 2, ratio=[5, 5])
     )

model_a_predictions = userset_a_traffic | RunInference(<model_handler_A>)
model_b_predictions = userset_b_traffic | RunInference(<model_handler_B>)

ここで、beam.partitionは、データソースを50/50分割のパーティションに分割するために使用されます。データパーティショニングの詳細については、「Partition」を参照してください。

カスケードパターン

カスケードパターンは、問題の解決策に一連のMLモデルが含まれる場合に使用されます。このシナリオでは、モデルの出力は通常、別のモデルに渡す前に、PTransformを使用して適切な形式に変換されます。

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = model_a_predictions | beam.ParDo(post_processing()) | RunInference(<model_handler_B>)

イメージキャプション生成とランキングの例を使用したアンサンブルモデル」のノートブックでは、イメージキャプションを生成およびランク付けするために使用されるカスケードパイプラインのエンドツーエンドの例を示しています。このソリューションは、2つのオープンソースモデルで構成されています。

  1. 入力画像から候補画像キャプションを生成するキャプション生成モデル(BLIP
  2. 画像と候補キャプションを使用して、画像の説明に最適な順序でキャプションをランク付けするキャプションランキングモデル(CLIP

複数の異なるトレーニング済みのモデルを使用する

KeyedModelHandlerを使用して、いくつかの異なるモデルをRunInference変換にロードできます。関連付けられたキーを使用して、どのモデルをどのデータで使用するかを決定します。次の例では、config1を使用してモデルをロードします。そのモデルは、key1に関連付けられたすべての例の推論に使用されます。config2を使用して2番目のモデルをロードします。そのモデルは、key2key3に関連付けられたすべての例に使用されます。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
   data = p | beam.Create([
      ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

詳細な例については、「複数の異なるトレーニング済みモデルを使用したML推論の実行」のノートブックを参照してください。

同時に複数のモデルをロードすると、メモリ不足エラー(OOM)のリスクが高まります。デフォルトでは、KeyedModelHandlerは、同時にメモリにロードされるモデルの数を制限しません。モデルがすべてメモリに収まらない場合、パイプラインはメモリ不足エラーで失敗する可能性があります。この問題を回避するには、max_models_per_worker_hintパラメーターを使用して、同時にメモリにロードできるモデルの最大数を設定します。

次の例では、SDKワーカープロセスごとに最大2つのモデルを一度にロードします。現在使用されていないモデルはアンロードします。

mhs = [
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

特定のマシン上に複数のSDKワーカープロセスがあるランナーは、マシンに最大でmax_models_per_worker_hint*<ワーカープロセス数>モデルをロードします。

モデルと他の変換からの追加のメモリニーズのために十分なスペースを確保します。モデルがオフロードされた後、メモリがすぐに解放されない可能性があるため、追加のバッファを残すことをお勧めします。

:モデルが多くてもmax_models_per_worker_hintが小さいと、メモリのスラッシングが発生する可能性があります。これは、メモリとの間でモデルをスワップするために大量の実行時間が使用されます。メモリのスラッシングの可能性と影響を減らすために、分散ランナーを使用している場合は、推論ステップの前にGroupByKey変換を挿入します。GroupByKey変換は、同じキーとモデルを持つ要素が同じワーカー上に配置されるようにすることで、スラッシングを削減します。

詳細については、KeyedModelHanderを参照してください。