Beamでの大規模言語モデル推論

Apache Beam 2.40.0では、Beamパイプラインに機械学習モデルをデプロイできるRunInference APIが導入されました。 RunInference変換は、機械学習(ML)モデルを使用して、例のPCollectionに対して推論を実行します。 この変換は、入力の例と出力の予測を含むPCollectionを出力します。 詳細については、こちらのRunInferenceを参照してください。 また、GitHubで推論の例を見つけることもできます。

非常に大規模なモデルでRunInferenceを使用する

RunInferenceは、ハードウェアに収まる限り、任意の大規模モデルで適切に機能します。

メモリ管理

RunInferenceには、メモリ使用量を削減するためのいくつかのメカニズムがあります。 たとえば、デフォルトでは、RunInferenceはプロセスごとに各モデルのコピーを最大1つロードします(スレッドごとに1つではなく)。

ただし、多くのBeamランナーは、マシンごとに複数のBeamプロセスを同時に実行します。 これにより、LLMのような大規模モデルを複数回ロードするメモリフットプリントが大きすぎて単一のマシンに収まらない可能性があるため、問題が発生する可能性があります。 メモリを大量に消費するモデルの場合、RunInferenceは複数のプロセス間でメモリをよりインテリジェントに共有して、全体的なメモリフットプリントを削減するメカニズムを提供します。 このモードを有効にするには、ユーザーはモデル構成でパラメーターlarge_modelをTrueに設定するだけで済みます(以下の例を参照)。Beamがメモリ管理を行います。 カスタムモデルハンドラーを使用する場合、同様の効果を得るために、share_model_across_processes関数またはmodel_copies関数をオーバーライドできます。

T5を使用したパイプライン例の実行

この例では、パイプラインでRunInferenceを使用して、T5言語モデルで推論を実行する方法を示します。 T5は、教師なしタスクと教師ありタスクのマルチタスク混合で事前トレーニングされたエンコーダーデコーダーモデルです。 各タスクはテキストからテキストへの形式に変換されます。 この例では、110億のパラメーターを含み、サイズが45 GBのT5-11Bを使用します。 さまざまなタスクで適切に機能するために、T5は各タスクに対応する入力に異なるプレフィックスを付加します。 たとえば、翻訳の場合、入力はtranslate English to German: …となり、要約の場合はsummarize: …となります。 T5の詳細については、HuggingFaceドキュメントのT5の概要を参照してください。

このモデルで推論を実行するには、まずapache-beam 2.40以降をインストールします

pip install apache-beam -U

次に、requirements.txtにリストされている必要なパッケージをインストールし、必要な引数を渡します。 次の手順で、Hugging Face HubからT5-11bモデルをダウンロードできます

import torch
from transformers import T5ForConditionalGeneration

model = T5ForConditionalGeneration.from_pretrained("path/to/cloned/t5-11b")
torch.save(model.state_dict(), "path/to/save/state_dict.pth")

GitHubでコードを表示できます

  1. マシン上でローカルに
python main.py --runner DirectRunner \
               --model_state_dict_path <local or remote path to state_dict> \
               --model_name t5-11b

この例を実行するには、45 GBのディスク容量が必要です。

  1. Dataflowを使用したGoogle Cloud
python main.py --runner DataflowRunner \
                --model_state_dict_path <gs://path/to/saved/state_dict.pth> \
                --model_name t5-11b \
                --project <PROJECT_ID> \
                --region <REGION> \
                --requirements_file requirements.txt \
                --staging_location <gs://path/to/staging/location>
                --temp_location <gs://path/to/temp/location> \
                --experiments "use_runner_v2,no_use_multiple_sdk_containers" \
                --machine_type=n1-highmem-16 \
                --disk_size_gb=200

こちらの説明に従って、他の構成パラメーターを渡すこともできます。

パイプラインのステップ

パイプラインには、次の手順が含まれています

  1. 入力を出力します。
  2. トークナイザーを使用して、テキストをトランスフォーマーが読み取れるトークンID整数にエンコードします。
  3. RunInferenceを使用して出力を取得します。
  4. RunInferenceの出力をデコードして出力します。

次のコードスニペットには、4つの手順が含まれています

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | "CreateInputs" >> beam.Create(task_sentences)
            | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
            | "RunInference" >> RunInference(model_handler=model_handler)
            | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
        )

パイプラインの3番目のステップでは、RunInferenceを使用します。 使用するには、最初にModelHandlerを定義する必要があります。 RunInferenceは、PyTorchTensorFlowScikit-Learnのモデルハンドラーを提供します。 この例ではPyTorchモデルを使用しているため、PyTorchModelHandlerTensorモデルハンドラーを使用します。

  gen_fn = make_tensor_model_fn('generate')

  model_handler = PytorchModelHandlerTensor(
      state_dict_path=args.model_state_dict_path,
      model_class=T5ForConditionalGeneration,
      model_params={"config": AutoConfig.from_pretrained(args.model_name)},
      device="cpu",
      inference_fn=gen_fn,
      large_model=True)

ModelHandlerには、次のようなパラメーターが必要です

大規模モデルのトラブルシューティング

ピク漬けエラー

large_model=True でプロセス間でモデルを共有する場合、またはカスタムモデルハンドラーを使用する場合、Beamはプロセス境界を越えて入力データと出力データを送信します。これを行うために、picklingと呼ばれるシリアライゼーション方法を使用します。たとえば、output=model.my_inference_fn(input_1, input_2) を呼び出すと、input_1input_2、および output はすべてpickle化する必要があります。モデル自体はプロセス境界を越えて渡されないため、pickle化する必要はありません。

ほとんどのオブジェクトは問題なくpickle化できますが、これらのオブジェクトのいずれかがpickle化できない場合、error: can't pickle fasttext_pybind.fasttext objects のようなエラーが発生する可能性があります。これを回避するには、いくつかのオプションがあります。

まず、可能であれば、プロセス間でモデルを共有しないことを選択できます。これにより、追加のメモリ負荷が発生しますが、場合によっては許容できる場合があります。

次に、カスタムモデルハンドラーを使用して、モデルをラップしてシリアライズ可能な型を受け取り、返すことができます。たとえば、モデルハンドラーが次のようになっている場合

class MyModelHandler():
   def load_model(self):
      return model_loading_logic()

   def run_inference(self, batch: Sequence[str], model, inference_args):
      unpickleable_object = Unpickleable(batch)
      unpickleable_returned = model.predict(unpickleable_object)
      my_output = int(unpickleable_returned[0])
      return my_output

代わりに、pickle化できない部分をモデルラッパーでラップすることができます。モデルラッパーは推論プロセスに存在するため、pickle化可能なオブジェクトのみを受け取り/返す限り、これは機能します。

class MyWrapper():
   def __init__(self, model):
      self._model = model

   def predict(self, batch: Sequence[str]):
      unpickleable_object = Unpickleable(batch)
      unpickleable_returned = model.predict(unpickleable_object)
      return int(prediction[0])

class MyModelHandler():
   def load_model(self):
      return MyWrapper(model_loading_logic())

   def run_inference(self, batch: Sequence[str], model: MyWrapper, inference_args):
      return model.predict(unpickleable_object)

BeamにおけるRAGとプロンプトエンジニアリング

Beamは、Retrieval Augmented Generation (RAG) を使用してLLMプロンプトの品質を向上させるための優れたツールでもあります。Retrieval Augmented Generationは、大規模言語モデル (LLM) を外部知識ソースに接続することで、LLMを強化する手法です。これにより、LLMはリアルタイム情報にアクセスして処理できるようになり、応答の精度、関連性、および事実性が向上します。

Beamには、このプロセスを簡素化するためのいくつかのメカニズムがあります。

  1. BeamのMLTransformは、RAGに使用される埋め込みを生成するための埋め込みパッケージを提供します。埋め込みハンドラーのないモデルがある場合は、RunInferenceを使用して埋め込みを生成することもできます。
  2. BeamのEnrichment変換を使用すると、ベクトルデータベースなどの外部ストレージシステムで埋め込みやその他の情報を簡単に検索できます。

まとめて、これらを使用して次の手順でRAGを実行できます。

パイプライン1 - 知識ベースを生成する

  1. BeamのIOコネクタのいずれかを使用して、外部ソースからデータを取り込む
  2. MLTransformを使用して、そのデータに埋め込みを生成する
  3. ParDoを使用して、それらの埋め込みをベクトルDBに書き込む

パイプライン2 - 知識ベースを使用してRAGを実行する

  1. BeamのIOコネクタのいずれかを使用して、外部ソースからデータを取り込む
  2. MLTransformを使用して、そのデータに埋め込みを生成する
  3. Enrichmentを使用して、ベクトルDBからの追加の埋め込みでそのデータを強化する
  4. RunInferenceを使用して、その強化されたデータでLLMをプロンプトする
  5. BeamのIOコネクタのいずれかを使用して、そのデータを目的のシンクに書き込む

RAGを実行するパイプラインの例については、https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/rag_usecase/beam_rag_notebook.ipynbを参照してください。