Beamでの大規模言語モデル推論
Apache Beam 2.40.0では、Beamパイプラインに機械学習モデルをデプロイできるRunInference APIが導入されました。 RunInference
変換は、機械学習(ML)モデルを使用して、例のPCollection
に対して推論を実行します。 この変換は、入力の例と出力の予測を含むPCollectionを出力します。 詳細については、こちらのRunInferenceを参照してください。 また、GitHubで推論の例を見つけることもできます。
非常に大規模なモデルでRunInferenceを使用する
RunInferenceは、ハードウェアに収まる限り、任意の大規模モデルで適切に機能します。
メモリ管理
RunInferenceには、メモリ使用量を削減するためのいくつかのメカニズムがあります。 たとえば、デフォルトでは、RunInferenceはプロセスごとに各モデルのコピーを最大1つロードします(スレッドごとに1つではなく)。
ただし、多くのBeamランナーは、マシンごとに複数のBeamプロセスを同時に実行します。 これにより、LLMのような大規模モデルを複数回ロードするメモリフットプリントが大きすぎて単一のマシンに収まらない可能性があるため、問題が発生する可能性があります。 メモリを大量に消費するモデルの場合、RunInferenceは複数のプロセス間でメモリをよりインテリジェントに共有して、全体的なメモリフットプリントを削減するメカニズムを提供します。 このモードを有効にするには、ユーザーはモデル構成でパラメーターlarge_model
をTrueに設定するだけで済みます(以下の例を参照)。Beamがメモリ管理を行います。 カスタムモデルハンドラーを使用する場合、同様の効果を得るために、share_model_across_processes
関数またはmodel_copies
関数をオーバーライドできます。
T5を使用したパイプライン例の実行
この例では、パイプラインでRunInference
を使用して、T5
言語モデルで推論を実行する方法を示します。 T5
は、教師なしタスクと教師ありタスクのマルチタスク混合で事前トレーニングされたエンコーダーデコーダーモデルです。 各タスクはテキストからテキストへの形式に変換されます。 この例では、110億のパラメーターを含み、サイズが45 GBのT5-11B
を使用します。 さまざまなタスクで適切に機能するために、T5
は各タスクに対応する入力に異なるプレフィックスを付加します。 たとえば、翻訳の場合、入力はtranslate English to German: …
となり、要約の場合はsummarize: …
となります。 T5
の詳細については、HuggingFaceドキュメントのT5の概要を参照してください。
このモデルで推論を実行するには、まずapache-beam
2.40以降をインストールします
pip install apache-beam -U
次に、requirements.txtにリストされている必要なパッケージをインストールし、必要な引数を渡します。 次の手順で、Hugging Face HubからT5-11b
モデルをダウンロードできます
- こちらの手順に従ってGit LFSをインストールします
git lfs install
を実行しますgit clone https://huggingface.co/t5-11b
を実行します(これには時間がかかる場合があります)。 これにより、チェックポイントがダウンロードされます。次に、こちらの説明に従って、チェックポイントをモデルの状態辞書に変換する必要があります
import torch
from transformers import T5ForConditionalGeneration
model = T5ForConditionalGeneration.from_pretrained("path/to/cloned/t5-11b")
torch.save(model.state_dict(), "path/to/save/state_dict.pth")
GitHubでコードを表示できます
- マシン上でローカルに
python main.py --runner DirectRunner \
--model_state_dict_path <local or remote path to state_dict> \
--model_name t5-11b
この例を実行するには、45 GBのディスク容量が必要です。
- Dataflowを使用したGoogle Cloud
python main.py --runner DataflowRunner \
--model_state_dict_path <gs://path/to/saved/state_dict.pth> \
--model_name t5-11b \
--project <PROJECT_ID> \
--region <REGION> \
--requirements_file requirements.txt \
--staging_location <gs://path/to/staging/location>
--temp_location <gs://path/to/temp/location> \
--experiments "use_runner_v2,no_use_multiple_sdk_containers" \
--machine_type=n1-highmem-16 \
--disk_size_gb=200
こちらの説明に従って、他の構成パラメーターを渡すこともできます。
パイプラインのステップ
パイプラインには、次の手順が含まれています
- 入力を出力します。
- トークナイザーを使用して、テキストをトランスフォーマーが読み取れるトークンID整数にエンコードします。
- RunInferenceを使用して出力を取得します。
- RunInferenceの出力をデコードして出力します。
次のコードスニペットには、4つの手順が含まれています
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| "CreateInputs" >> beam.Create(task_sentences)
| "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
| "RunInference" >> RunInference(model_handler=model_handler)
| "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
)
パイプラインの3番目のステップでは、RunInference
を使用します。 使用するには、最初にModelHandler
を定義する必要があります。 RunInferenceは、PyTorch
、TensorFlow
、Scikit-Learn
のモデルハンドラーを提供します。 この例ではPyTorch
モデルを使用しているため、PyTorchModelHandlerTensor
モデルハンドラーを使用します。
ModelHandler
には、次のようなパラメーターが必要です
state_dict_path
– モデルの状態の保存された辞書へのパス。model_class
– モデル構造を定義するPytorchモデルのクラス。model_params
– モデルクラスをインスタンス化するのに必要な引数の辞書。device
– モデルを実行したいデバイス。device = GPU の場合、GPUデバイスが利用可能であれば使用されます。そうでない場合は、CPUになります。inference_fn
- RunInference中に使用する推論関数。large_model
- (上記の「メモリ管理」を参照)。メモリ最小化技術を使用してモデルのメモリフットプリントを削減するかどうか。
大規模モデルのトラブルシューティング
ピク漬けエラー
large_model=True
でプロセス間でモデルを共有する場合、またはカスタムモデルハンドラーを使用する場合、Beamはプロセス境界を越えて入力データと出力データを送信します。これを行うために、picklingと呼ばれるシリアライゼーション方法を使用します。たとえば、output=model.my_inference_fn(input_1, input_2)
を呼び出すと、input_1
、input_2
、および output
はすべてpickle化する必要があります。モデル自体はプロセス境界を越えて渡されないため、pickle化する必要はありません。
ほとんどのオブジェクトは問題なくpickle化できますが、これらのオブジェクトのいずれかがpickle化できない場合、error: can't pickle fasttext_pybind.fasttext objects
のようなエラーが発生する可能性があります。これを回避するには、いくつかのオプションがあります。
まず、可能であれば、プロセス間でモデルを共有しないことを選択できます。これにより、追加のメモリ負荷が発生しますが、場合によっては許容できる場合があります。
次に、カスタムモデルハンドラーを使用して、モデルをラップしてシリアライズ可能な型を受け取り、返すことができます。たとえば、モデルハンドラーが次のようになっている場合
class MyModelHandler():
def load_model(self):
return model_loading_logic()
def run_inference(self, batch: Sequence[str], model, inference_args):
unpickleable_object = Unpickleable(batch)
unpickleable_returned = model.predict(unpickleable_object)
my_output = int(unpickleable_returned[0])
return my_output
代わりに、pickle化できない部分をモデルラッパーでラップすることができます。モデルラッパーは推論プロセスに存在するため、pickle化可能なオブジェクトのみを受け取り/返す限り、これは機能します。
class MyWrapper():
def __init__(self, model):
self._model = model
def predict(self, batch: Sequence[str]):
unpickleable_object = Unpickleable(batch)
unpickleable_returned = model.predict(unpickleable_object)
return int(prediction[0])
class MyModelHandler():
def load_model(self):
return MyWrapper(model_loading_logic())
def run_inference(self, batch: Sequence[str], model: MyWrapper, inference_args):
return model.predict(unpickleable_object)
BeamにおけるRAGとプロンプトエンジニアリング
Beamは、Retrieval Augmented Generation (RAG) を使用してLLMプロンプトの品質を向上させるための優れたツールでもあります。Retrieval Augmented Generationは、大規模言語モデル (LLM) を外部知識ソースに接続することで、LLMを強化する手法です。これにより、LLMはリアルタイム情報にアクセスして処理できるようになり、応答の精度、関連性、および事実性が向上します。
Beamには、このプロセスを簡素化するためのいくつかのメカニズムがあります。
- BeamのMLTransformは、RAGに使用される埋め込みを生成するための埋め込みパッケージを提供します。埋め込みハンドラーのないモデルがある場合は、RunInferenceを使用して埋め込みを生成することもできます。
- BeamのEnrichment変換を使用すると、ベクトルデータベースなどの外部ストレージシステムで埋め込みやその他の情報を簡単に検索できます。
まとめて、これらを使用して次の手順でRAGを実行できます。
パイプライン1 - 知識ベースを生成する
- BeamのIOコネクタのいずれかを使用して、外部ソースからデータを取り込む
- MLTransformを使用して、そのデータに埋め込みを生成する
- ParDoを使用して、それらの埋め込みをベクトルDBに書き込む
パイプライン2 - 知識ベースを使用してRAGを実行する
- BeamのIOコネクタのいずれかを使用して、外部ソースからデータを取り込む
- MLTransformを使用して、そのデータに埋め込みを生成する
- Enrichmentを使用して、ベクトルDBからの追加の埋め込みでそのデータを強化する
- RunInferenceを使用して、その強化されたデータでLLMをプロンプトする
- BeamのIOコネクタのいずれかを使用して、そのデータを目的のシンクに書き込む
RAGを実行するパイプラインの例については、https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/rag_usecase/beam_rag_notebook.ipynbを参照してください。
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!