Java SDKからのRunInferenceの使用
この例のパイプラインはJavaで記述されており、Google Cloud Storageから入力データを読み取ります。 PythonExternalTransformを利用して、複合Python変換が呼び出され、前処理、後処理、および推論を行います。最後に、データはJavaパイプラインでGoogle Cloud Storageに書き戻されます。
この例で使用されているコードは、Beamリポジトリにあります。
NLPモデルとデータセット
bert-base-uncased
自然言語処理(NLP)モデルを使用して推論を行います。このモデルはオープンソースで、HuggingFaceで入手できます。このBERTモデルは、文脈に基づいて文の最後の単語を予測するために使用されます。
IMDB映画レビューデータセットも使用します。これは、Kaggleで入手できるオープンソースのデータセットです。
以下は、前処理後のデータのサンプルです
テキスト | 最後の単語 |
---|---|
他のレビュアーの1人は、Ozエピソードを1つ見ただけで、[MASK]になると言っています。 | ハマる |
素晴らしい小さな[MASK] | 作品 |
それで、私はボールの作品の大ファンではありませんが、それほど多くの[MASK]ではありません。 | いる |
これは、[MASK]になった3人の囚人の素晴らしい映画です。 | 有名 |
一部の映画は、単純に[MASK]にすべきではありません。 | リメイク |
カレン・カーペンターの物語は、歌手カレン・カーペンターの複雑な[MASK]についてもう少し詳しく示しています。 | 人生 |
多言語推論パイプライン
多言語パイプラインを使用すると、はるかに多くの変換にアクセスできます。詳細については、Apache Beamプログラミングガイドの多言語パイプラインを参照してください。
カスタムPython変換
推論の実行に加えて、データに対して前処理と後処理も実行する必要があります。データの後処理により、出力を解釈することができます。これらの3つのタスクを実行するために、次のスニペットに示すように、タスクごとにユニットDoFnまたはPTransformを使用して、1つの複合カスタムPTransformが作成されます。
def expand(self, pcoll):
return (
pcoll
| 'Preprocess' >> beam.ParDo(self.Preprocess(self._tokenizer))
| 'Inference' >> RunInference(KeyedModelHandler(self._model_handler))
| 'Postprocess' >> beam.ParDo(self.Postprocess(
self._tokenizer))
)
まず、データの前処理です。この場合、生のテキストデータはクリーンアップされ、BERTモデル用にトークン化されます。これらのステップはすべて、Preprocess
DoFnで実行されます。Preprocess
DoFnは、単一の要素を入力として受け取り、元のテキストとトークン化されたテキストの両方を含むリストを返します。
前処理されたデータは、推論を行うために使用されます。これは、Apache Beam SDKですでに入手可能なRunInference
PTransformで行われます。RunInference
PTransformには、モデルハンドラーという1つのパラメーターが必要です。この例では、Preprocess
DoFnが元の文も出力するため、KeyedModelHandler
が使用されます。要件に基づいて、前処理の方法を変更できます。このモデルハンドラーは、複合PTransformの次の初期化関数で定義されています。
def __init__(self, model, model_path):
self._model = model
logging.info(f"Downloading {self._model} model from GCS.")
self._model_config = BertConfig.from_pretrained(self._model)
self._tokenizer = BertTokenizer.from_pretrained(self._model)
self._model_handler = self.PytorchModelHandlerKeyedTensorWrapper(
state_dict_path=(model_path),
model_class=BertForMaskedLM,
model_params={'config': self._model_config},
device='cuda:0')
PytorchModelHandlerKeyedTensor
モデルハンドラーのラッパーであるPytorchModelHandlerKeyedTensorWrapper
が使用されます。PytorchModelHandlerKeyedTensor
モデルハンドラーは、PyTorchモデルで推論を行います。BertTokenizer
から生成されたトークン化された文字列は長さが異なる可能性があり、stack()ではテンソルが同じサイズである必要があるため、PytorchModelHandlerKeyedTensorWrapper
はバッチサイズを1に制限します。max_batch_size
を1に制限すると、run_inference()呼び出しにはバッチごとに1つの例が含まれることになります。次のコードは、ラッパーの定義を示しています。
class PytorchModelHandlerKeyedTensorWrapper(PytorchModelHandlerKeyedTensor):
def batch_elements_kwargs(self):
return {'max_batch_size': 1}
別の方法は、すべてのテンソルを同じ長さにすることです。この例は、その方法を示しています。
ModelConfig
とModelTokenizer
は、初期化関数でロードされます。ModelConfig
はモデルアーキテクチャを定義するために使用され、ModelTokenizer
は入力データをトークン化するために使用されます。次の2つのパラメーターは、これらのタスクに使用されます。
model
:推論に使用されるモデルの名前。この例では、bert-base-uncased
です。model_path
:推論に使用されるモデルのstate_dict
へのパス。この例では、state_dict
が格納されているGoogle Cloud Storageバケットへのパスです。
これらのパラメーターは両方とも、Java PipelineOptions
で指定されています。
最後に、Postprocess
DoFnでモデル予測を後処理します。Postprocess
DoFnは、元のテキスト、文の最後の単語、および予測された単語を返します。
Pythonコードをパッケージにコンパイル
カスタムPythonコードは、ローカルパッケージで記述し、tarballとしてコンパイルする必要があります。このパッケージは、Javaパイプラインで使用できます。次の例は、Pythonパッケージをtarballにコンパイルする方法を示しています。
pip install --upgrade build && python -m build --sdist
これを実行するには、setup.py
が必要です。tarballへのパスは、Javaパイプラインのパイプラインオプションの引数として使用されます。
Javaパイプラインの実行
Javaパイプラインは、MultiLangRunInference
クラスで定義されています。このパイプラインでは、データがGoogle Cloud Storageから読み取られ、クロスランゲージPython変換が適用され、出力がGoogle Cloud Storageに書き戻されます。
PythonExternalTransform
は、クロスランゲージPython変換をJavaパイプラインに挿入するために使用されます。PythonExternalTransform
は、Python変換の完全修飾名である文字列パラメーターを受け取ります。
withKwarg
メソッドは、Python変換に必要なパラメータを指定するために使用されます。この例では、model
とmodel_path
パラメータが指定されています。これらのパラメータは、最初のセクションで示されているように、複合Python PTransformの初期化関数で使用されます。最後に、withExtraPackages
メソッドは、Python変換に必要な追加のPython依存関係を指定するために使用されます。この例では、local_packages
リストが使用されており、これにはPythonの要件とコンパイル済みのtarballへのパスが含まれています。
パイプラインを実行するには、次のコマンドを使用します。
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MultiLangRunInference \
-Dexec.args="--runner=DataflowRunner \
--project=$GCP_PROJECT\
--region=$GCP_REGION \
--gcpTempLocation=gs://$GCP_BUCKET/temp/ \
--inputFile=gs://$GCP_BUCKET/input/imdb_reviews.csv \
--outputFile=gs://$GCP_BUCKET/output/ouput.txt \
--modelPath=gs://$GCP_BUCKET/input/bert-model/bert-base-uncased.pth \
--modelName=$MODEL_NAME \
--localPackage=$LOCAL_PACKAGE" \
-Pdataflow-runner
標準のGoogle CloudとRunnerのパラメータが指定されています。inputFile
とoutputFile
パラメータは、入力ファイルと出力ファイルを指定するために使用されます。modelPath
とmodelName
のカスタムパラメータは、PythonExternalTransform
に渡されます。最後に、localPackage
パラメータは、カスタムPython変換を含むコンパイル済みのPythonパッケージへのパスを指定するために使用されます。
終わりに
この例をベースとして、他のカスタム多言語推論パイプラインを作成してください。他のSDKを使用することもできます。たとえば、Goにもクロスランゲージ変換を行うためのラッパーがあります。詳細については、Apache BeamプログラミングガイドのGoパイプラインでのクロスランゲージ変換の使用を参照してください。
この例で使用されている完全なコードは、GitHubで確認できます。
最終更新日:2024/10/31
お探しの情報はすべて見つかりましたか?
すべて役立ち、明確でしたか?何か変更したい点はありますか?ぜひお知らせください!