Beam MLについて

Pydoc Pydoc




Javadoc Javadoc

Apache Beamを使用して以下のことができます

AI/MLワークロード

Apache Beamは、データ検証、データ前処理、モデル検証、モデルのデプロイと推論に使用できます。

Overview of AI/ML building blocks and where Apache Beam can be used

  1. データの取り込み:受信した新しいデータは、ファイルシステムまたはデータベースに保存されるか、メッセージキューに公開されます。
  2. **データ検証**:データを受信した後、データの品質を確認します。たとえば、外れ値を検出し、標準偏差とクラス分布を計算することができます。
  3. **データの前処理**:データの検証後、モデルのトレーニングに使用できるようにデータを変換します。
  4. モデルのトレーニング:データの準備ができたら、AI/MLモデルをトレーニングします。この手順は、トレーニングされたモデルの品質に応じて、通常複数回繰り返されます。
  5. モデル検証:モデルをデプロイする前に、そのパフォーマンスと精度を検証します。
  6. **モデルのデプロイ**:モデルをデプロイし、それを使用して新しいデータまたは既存のデータに対して推論を実行します。

データの増加と進化に合わせてモデルを最新の状態に保ち、良好なパフォーマンスを維持するには、これらの手順を複数回実行します。さらに、ML opsをプロジェクトに適用して、モデルとデータのライフサイクル全体でAI/MLワークフローを自動化できます。オーケストレーターを使用してこのフローを自動化し、プロジェクト内のさまざまなビルディングブロック間の遷移を処理します。

RunInferenceの使用

RunInference APIは、機械学習推論用に最適化された`PTransform`であり、パイプラインでMLモデルを効率的に使用できます。APIには次の機能が含まれています

サポートと制限事項

BatchElements PTransform

多くのモデルが実装しているベクトル化推論の最適化を利用するために、`BatchElements`変換は、モデルの予測を行う前の中間ステップとして使用されます。この変換は、要素をまとめてバッチ処理します。次に、バッチ処理された要素にRunInferenceの特定のフレームワークの変換が適用されます。たとえば、numpy `ndarrays`の場合は`numpy.stack()`を呼び出し、torch `Tensor`要素の場合は`torch.stack()`を呼び出します。

`beam.BatchElements`の設定をカスタマイズするには、`ModelHandler`で`batch_elements_kwargs`関数をオーバーライドします。たとえば、`min_batch_size`を使用してバッチあたりの最小要素数を設定するか、`max_batch_size`を使用してバッチあたりの最大要素数を設定します。

詳細については、`BatchElements`変換のドキュメントをご覧ください。

共有ヘルパークラス

RunInference実装内で`Shared`クラスを使用すると、プロセスごとにモデルを1回だけロードし、そのプロセスで作成されたすべてのDoFnインスタンスと共有することができます。この機能により、メモリ消費量とモデルのロード時間が削減されます。詳細については、`Shared`クラスのドキュメントをご覧ください。

MLモデルを使用するようにPythonパイプラインを変更する

RunInference変換を使用するには、次のコードをパイプラインに追加します

from apache_beam.ml.inference.base import RunInference
with pipeline as p:
   predictions = ( p |  'Read' >> beam.ReadFromSource('a_source')
                     | 'RunInference' >> RunInference(<model_handler>)

`model_handler`をモデルハンドラーのセットアップコードに置き換えます。

モデルをインポートするには、基になるモデルをラップする`ModelHandler`オブジェクトを設定する必要があります。インポートするモデルハンドラーは、フレームワークと、入力が含まれているデータ構造体の種類によって異なります。 `ModelHandler`オブジェクトでは、`env_vars`キーワード引数を使用して、推論に必要な環境変数を設定することもできます。次の例は、インポートする可能性のあるモデルハンドラーを示しています。

from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from tfx_bsl.public.beam.run_inference import CreateModelHandler

事前トレーニング済みモデルの使用

このセクションでは、PyTorch、Scikit-learn、およびTensorflowで事前学習済みモデルを使用するための要件について説明します。

PyTorch

モデルの保存された重みが含まれるファイルへのパスを提供する必要があります。このパスはパイプラインからアクセスできる必要があります。 RunInference APIとPyTorchフレームワークで事前学習済みモデルを使用するには、次の手順を実行します。

  1. 事前学習済みの重みをダウンロードし、パイプラインがアクセスできる場所にホストします。
  2. 次のコードを使用して、モデルの重みのパスをPyTorchのModelHandlerに渡します。state_dict_path=<重みへのパス>

Apache BeamでPyTorchモデルを実行する方法を示すこのノートブックを参照してください。

Scikit-learn

pickle化されたScikit-learnモデルを含むファイルへのパスを提供する必要があります。このパスはパイプラインからアクセスできる必要があります。 RunInference APIとScikit-learnフレームワークで事前学習済みモデルを使用するには、次の手順を実行します。

  1. pickle化されたモデルクラスをダウンロードし、パイプラインがアクセスできる場所にホストします。
  2. 次のコードを使用して、モデルのパスをSklearnのModelHandlerに渡します。model_uri=<pickle化されたファイルへのパス> および model_file_type: <ModelFileType>。モデルのシリアル化方法に応じて、ModelFileType.PICKLE または ModelFileType.JOBLIB を指定できます。

Apache BeamでScikit-learnモデルを実行する方法を示すこのノートブックを参照してください。

TensorFlow

RunInference APIでTensorFlowを使用するには、2つのオプションがあります。

  1. Apache Beam SDKに組み込まれているTensorFlow Model Handler(TFModelHandlerNumpyTFModelHandlerTensor)を使用します。
    • モデルの入力タイプに応じて、numpy入力にはTFModelHandlerNumpyを、tf.Tensor入力にはTFModelHandlerTensorをそれぞれ使用します。
    • tensorflow 2.7以降を使用します。
    • model_uri=<学習済みモデルへのパス>を使用して、モデルのパスをTensorFlowのModelHandlerに渡します。
    • または、学習済みモデルの保存された重みへのパス、create_model_fn=<関数>を使用してモデルを構築する関数、およびmodel_type=ModelType.SAVED_WEIGHTSを設定することもできます。組み込みのモデルハンドラーでTensorflowモデルを実行する方法を示すこのノートブックを参照してください。
  2. tfx_bslを使用する。
    • モデル入力がtf.Exampleタイプの場合は、このアプローチを使用します。
    • tfx_bslバージョン1.10.0以降を使用します。
    • tfx_bsl.public.beam.run_inference.CreateModelHandler()を使用してモデルハンドラーを作成します。
    • apache_beam.ml.inference.base.RunInference変換でモデルハンドラーを使用します。Apache Beamとtfx-bslでTensorFlowモデルを実行する方法を示すこのノートブックを参照してください。

カスタムモデルの使用

サポートされているフレームワークのいずれかで指定されていないモデルを使用する場合、RunInference APIは、任意のカスタム機械学習モデルを使用できるように柔軟に設計されています。モデルを読み込んで推論を実行するために使用するロジックを使用して、独自のModelHandlerまたはKeyedModelHandlerを作成するだけで済みます。

簡単な例はこのノートブックにあります。 load_modelメソッドは、一般的なspaCyパッケージを使用してモデルを読み込む方法を示し、run_inferenceは、 examplesのバッチで推論を実行する方法を示しています。.

RunInferenceパターン

このセクションでは、推論パイプラインをよりシンプルに、より堅牢に、そしてより効率的にするために使用できるパターンとベストプラクティスを提案します。

キー付きModelHandlerオブジェクトの使用

examplesにキーが添付されている場合は、KeyedModelHandlerModelHandlerオブジェクトでラップします。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...))
with pipeline as p:
   data = p | beam.Create([
      ('img1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('img2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('img3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

データにキーが付けられているかどうかわからない場合は、MaybeKeyedModelHandlerを使用できます。

また、KeyedModelHandlerを使用して、関連付けられたキーに基づいていくつかの異なるモデルを読み込むこともできます。次の例では、config1を使用してモデルを読み込みます。そのモデルは、key1に関連付けられたすべての examplesの推論に使用されます。 config2を使用して2番目のモデルを読み込みます。そのモデルは、key2およびkey3に関連付けられたすべての examplesに使用されます。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
   data = p | beam.Create([
      ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

より詳細な例については、ノートブック複数の異なるトレーニング済みモデルでML推論を実行するを参照してください。

複数のモデルを同時にロードすると、メモリ不足エラー(OOM)のリスクが高まります。デフォルトでは、KeyedModelHandlerはメモリにロードされるモデルの数を制限しません同時に。モデルがすべてメモリに収まらない場合、パイプラインはメモリ不足エラーで失敗する可能性があります。この問題を回避するには、max_models_per_worker_hintパラメーターを使用して、メモリに同時にロードできるモデルの最大数を設定します。

次の例では、SDKワーカープロセスごとに最大2つのモデルを同時にロードします。現在使用されていないモデルはアンロードされます。

mhs = [
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

特定のマシンに複数のSDKワーカープロセスを持つランナーは、マシンに最大max_models_per_worker_hint * <ワーカープロセス数>モデルをロードします。

モデルと他の変換からの追加のメモリニーズのために十分なスペースを残してください。モデルがオフロードされた後、メモリがすぐに解放されない可能性があるため、追加のバッファを残すことをお勧めします。

**注**: 多くのモデルがありますが、max_models_per_worker_hintが小さいと、*メモリのスラッシング*が発生する可能性があります。これは、モデルをメモリに出し入れするために大量の実行時間が使用される場合です。メモリのスラッシングの可能性と影響を軽減するために、分散ランナーを使用している場合は、推論ステップの前にGroupByKey変換を挿入します。 GroupByKey変換は、同じキーとモデルを持つ要素が同じワーカーに配置されるようにすることで、スラッシングを削減します。

詳細については、KeyedModelHanderを参照してください。

PredictionResultオブジェクトの使用

Apache Beamで予測を行う場合、出力PCollectionには、入力 examplesのキーと推論の両方が含まれます。これらの両方の項目を出力に含めると、予測を決定した入力を 찾을 수 있습니다。

PredictionResultオブジェクトは、それぞれexampleinferenceという名前の入力と推論の両方を含むNamedTupleです。キーが入力データとともにRunInference変換に渡されると、出力PCollectionはキーとPredictionResultオブジェクトであるTuple[str, PredictionResult]を返します。パイプラインは、RunInference変換後の手順でPredictionResultオブジェクトと対話します。

class PostProcessor(beam.DoFn):
    def process(self, element: Tuple[str, PredictionResult]):
       key, prediction_result = element
       inputs = prediction_result.example
       predictions = prediction_result.inference

       # Post-processing logic
       result = ...

       yield (key, result)

with pipeline as p:
    output = (
        p | 'Read' >> beam.ReadFromSource('a_source')
                | 'PyTorchRunInference' >> RunInference(<keyed_model_handler>)
                | 'ProcessOutput' >> beam.ParDo(PostProcessor()))

このオブジェクトを明示的に使用する必要がある場合は、パイプラインに次の行を含めてオブジェクトをインポートします。

from apache_beam.ml.inference.base import PredictionResult

詳細については、PredictionResultドキュメントを参照してください。

自動モデル更新

パイプラインを停止せずにRunInference PTransformで使用されているモデルを自動的に更新するには、ModelMetadataサイド入力PCollectionをRunInference入力パラメーターmodel_metadata_pcollに渡します。

ModelMetdataは、次のものを含むNamedTupleです。

ユースケース

サイド入力PCollectionは、エラーを回避するためにAsSingletonビューに従う必要があります。

**注**: メインPCollectionが入力を出力し、サイド入力がまだ入力を受信していない場合、サイド入力が更新されるまでメインPCollectionはバッファされます。これは、AfterCountAfterProcessingTimeなどのデータ駆動型トリガーを持つグローバルウィンドウ化されたサイド入力で発生する可能性があります。サイド入力が更新されるまで、それぞれのModelHandlerを渡すために使用されるデフォルトまたは初期モデルIDをサイド入力として出力します。

レコードの前処理と後処理

RunInferenceを使用すると、変換に前処理と後処理の操作を追加できます。前処理操作を適用するには、モデルハンドラーでwith_preprocess_fnを使用します。

inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))

後処理操作を適用するには、モデルハンドラーでwith_postprocess_fnを使用します。

inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))

複数の前処理および後処理操作を連鎖させることもできます。

inference = pcoll | RunInference(
    model_handler.with_preprocess_fn(
      lambda x : do_something(x)
    ).with_preprocess_fn(
      lambda x : do_something_else(x)
    ).with_postprocess_fn(
      lambda x : do_something_after_inference(x)
    ).with_postprocess_fn(
      lambda x : do_something_else_after_inference(x)
    ))

前処理関数は、バッチ処理と推論の前に実行されます。この関数は、入力PCollectionをモデルハンドラーの基本入力タイプにマップします。複数の前処理関数を適用する場合、それらは、最後に適用されたものから最初に適用されたものの順序で元のPCollectionで実行されます。

後処理関数は、推論後に実行されます。この関数は、基本モデルハンドラーの出力タイプを目的の出力タイプにマップします。複数の後処理関数を適用する場合、それらは、最初に適用されたものから最後に適用されたものの順序で元の推論結果で実行されます。

エラー処理

RunInferenceの使用中にエラーを確実に処理するには、*デッドレターキュー*を使用できます。デッドレターキューは、失敗したレコードを別のPCollectionに出力して、さらに処理します。このPCollectionは、分析してストレージシステムに送信し、そこでレビューしてパイプラインに再送信したり、破棄したりできます。 RunInferenceには、デッドレターキューの組み込みサポートがあります。 RunInference変換にwith_exception_handlingを適用することにより、デッドレターキューを使用できます。

main, other = pcoll | RunInference(model_handler).with_exception_handling()
other.failed_inferences | beam.Map(print) # insert logic to handle failed records here

このパターンは、関連する前処理および後処理操作を伴う RunInference 変換にも適用できます。

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
other.failed_preprocessing[0] | beam.Map(print) # handles failed preprocess operations, indexed in the order in which they were applied
other.failed_inferences | beam.Map(print) # handles failed inferences
other.failed_postprocessing[0] | beam.Map(print) # handles failed postprocess operations, indexed in the order in which they were applied

Javaパイプラインからの推論の実行

RunInference API は、Apache Beam の複数言語パイプラインフレームワークを通じて、Beam Java SDK バージョン 2.41.0 以降で使用できます。Java ラッパー変換については、RunInference.java を参照してください。試してみるには、Java Sklearn Mnist 分類の例を参照してください。さらに、Beam Java SDK パイプラインからの前処理と後処理と共に RunInference API を使用する複合 Python 変換の例については、Java SDK からの RunInference の使用を参照してください。

カスタム推論

RunInference API は、現在、Natural Language API や Cloud Vision API などを使用したリモート推論呼び出しをサポートしていません。したがって、これらのリモート API を Apache Beam で使用するには、カスタム推論呼び出しを作成する必要があります。Apache Beam ノートブックでのリモート推論は、beam.DoFn を使用してカスタム リモート推論呼び出しを実装する方法を示しています。実際のプロジェクトでリモート推論を実装する場合は、次の要素を考慮してください。

マルチモデルパイプライン

RunInference 変換を使用して、パイプラインに複数の推論モデルを追加します。マルチモデルパイプラインは、A/B テストや、トークン化、文分割、品詞タグ付け、固有表現抽出、言語検出、相互参照解決などを実行するモデルで構成されるカスケードモデルの構築に役立ちます。詳細については、マルチモデルパイプラインを参照してください。

A/Bパターン

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = data | RunInference(<model_handler_B>)

ここで、model_handler_Amodel_handler_B はモデルハンドラーのセットアップコードです。

カスケードパターン

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>)

ここで、model_handler_Amodel_handler_B はモデルハンドラーのセットアップコードです。

異なるモデル要件にリソースヒントを使用する

単一のパイプラインで複数のモデルを使用する場合、モデルによってメモリまたはワーカー SKU の要件が異なる場合があります。リソースヒントを使用すると、パイプラインの各ステップの計算リソース要件に関する情報をランナーに提供できます。

たとえば、次のスニペットは、各 RunInference 呼び出しに RAM とハードウェアアクセラレータの要件を指定するためのヒントを追加することで、前のカスケードパターンを拡張しています。

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>).with_resource_hints(min_ram="20GB")
   model_b_predictions = model_a_predictions
      | beam.Map(some_post_processing)
      | RunInference(<model_handler_B>).with_resource_hints(
         min_ram="4GB",
         accelerator="type:nvidia-tesla-k80;count:1;install-nvidia-driver")

リソースヒントの詳細については、リソースヒントを参照してください。

モデル検証

モデル検証を使用すると、以前に確認されていないデータセットに対してモデルのパフォーマンスをベンチマークできます。選択したメトリクスを抽出し、視覚化を作成し、メタデータをログに記録し、さまざまなモデルのパフォーマンスを比較して、モデルがデプロイの準備ができているかどうかを検証することを最終目標とします。Beam は、パイプライン内で直接 TensorFlow モデルでモデル評価を実行するためのサポートを提供します。

ML モデル評価ページでは、TensorFlow Model Analysis (TFMA) を使用して、モデル評価をパイプラインの一部として統合する方法を示しています。

トラブルシューティング

パイプラインまたはジョブで問題が発生した場合、このセクションでは発生する可能性のある問題と、それらを修正するための提案を示します。

テンソル要素をバッチ処理できない

RunInference は動的バッチ処理を使用します。ただし、RunInference API は異なるサイズのテンソル要素をバッチ処理できないため、RunInference 変換に渡されるサンプルは同じ次元または長さである必要があります。サイズが異なる画像または長さが異なる単語埋め込みを提供すると、次のエラーが発生する可能性があります。

ファイル "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py"、232 行目、run_inference で batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack は各テンソルのサイズが等しいことを想定していますが、エントリ 0 で [12]、エントリ 1 で [10] が得られました [ 'PyTorchRunInference/ParDo(_RunInferenceDoFn)' の実行中]

この問題を回避するには、同じサイズの要素を使用するか、バッチ処理を無効にします。

オプション 1: 同じサイズの要素を使用する

同じサイズの要素を使用するか、入力のサイズを変更します。コンピュータビジョンアプリケーションの場合は、画像入力をサイズ変更して、同じ次元になるようにします。長さが異なるテキストを持つ自然言語処理 (NLP) アプリケーションの場合は、テキストまたは単語埋め込みのサイズを変更して、同じ長さになるようにします。長さが異なるテキストを扱う場合、サイズ変更は不可能な場合があります。このシナリオでは、バッチ処理を無効にすることができます(オプション 2 を参照)。

オプション 2: バッチ処理を無効にする

ModelHandler で batch_elements_kwargs 関数をオーバーライドし、最大バッチサイズ (max_batch_size) を 1 に設定することにより、バッチ処理を無効にします: max_batch_size=1。詳細については、BatchElements PTransform を参照してください。例については、言語モデリングの例を参照してください。

Pydoc Pydoc




Javadoc Javadoc