Java SDKからのRunInferenceの使用

この例のパイプラインはJavaで記述されており、Google Cloud Storageから入力データを読み取ります。 PythonExternalTransformを利用して、複合Python変換が呼び出され、前処理、後処理、および推論を行います。最後に、データはJavaパイプラインでGoogle Cloud Storageに書き戻されます。

この例で使用されているコードは、Beamリポジトリにあります。

NLPモデルとデータセット

bert-base-uncased自然言語処理(NLP)モデルを使用して推論を行います。このモデルはオープンソースで、HuggingFaceで入手できます。このBERTモデルは、文脈に基づいて文の最後の単語を予測するために使用されます。

IMDB映画レビューデータセットも使用します。これは、Kaggleで入手できるオープンソースのデータセットです。

以下は、前処理後のデータのサンプルです

テキスト最後の単語
他のレビュアーの1人は、Ozエピソードを1つ見ただけで、[MASK]になると言っています。ハマる
素晴らしい小さな[MASK]作品
それで、私はボールの作品の大ファンではありませんが、それほど多くの[MASK]ではありません。いる
これは、[MASK]になった3人の囚人の素晴らしい映画です。有名
一部の映画は、単純に[MASK]にすべきではありません。リメイク
カレン・カーペンターの物語は、歌手カレン・カーペンターの複雑な[MASK]についてもう少し詳しく示しています。人生

多言語推論パイプライン

多言語パイプラインを使用すると、はるかに多くの変換にアクセスできます。詳細については、Apache Beamプログラミングガイドの多言語パイプラインを参照してください。

カスタムPython変換

推論の実行に加えて、データに対して前処理と後処理も実行する必要があります。データの後処理により、出力を解釈することができます。これらの3つのタスクを実行するために、次のスニペットに示すように、タスクごとにユニットDoFnまたはPTransformを使用して、1つの複合カスタムPTransformが作成されます。

def expand(self, pcoll):
    return (
    pcoll
    | 'Preprocess' >> beam.ParDo(self.Preprocess(self._tokenizer))
    | 'Inference' >> RunInference(KeyedModelHandler(self._model_handler))
    | 'Postprocess' >> beam.ParDo(self.Postprocess(
        self._tokenizer))
    )

まず、データの前処理です。この場合、生のテキストデータはクリーンアップされ、BERTモデル用にトークン化されます。これらのステップはすべて、Preprocess DoFnで実行されます。Preprocess DoFnは、単一の要素を入力として受け取り、元のテキストとトークン化されたテキストの両方を含むリストを返します。

前処理されたデータは、推論を行うために使用されます。これは、Apache Beam SDKですでに入手可能なRunInference PTransformで行われます。RunInference PTransformには、モデルハンドラーという1つのパラメーターが必要です。この例では、Preprocess DoFnが元の文も出力するため、KeyedModelHandlerが使用されます。要件に基づいて、前処理の方法を変更できます。このモデルハンドラーは、複合PTransformの次の初期化関数で定義されています。

def __init__(self, model, model_path):
    self._model = model
    logging.info(f"Downloading {self._model} model from GCS.")
    self._model_config = BertConfig.from_pretrained(self._model)
    self._tokenizer = BertTokenizer.from_pretrained(self._model)
    self._model_handler = self.PytorchModelHandlerKeyedTensorWrapper(
        state_dict_path=(model_path),
        model_class=BertForMaskedLM,
        model_params={'config': self._model_config},
        device='cuda:0')

PytorchModelHandlerKeyedTensorモデルハンドラーのラッパーであるPytorchModelHandlerKeyedTensorWrapperが使用されます。PytorchModelHandlerKeyedTensorモデルハンドラーは、PyTorchモデルで推論を行います。BertTokenizerから生成されたトークン化された文字列は長さが異なる可能性があり、stack()ではテンソルが同じサイズである必要があるため、PytorchModelHandlerKeyedTensorWrapperはバッチサイズを1に制限します。max_batch_sizeを1に制限すると、run_inference()呼び出しにはバッチごとに1つの例が含まれることになります。次のコードは、ラッパーの定義を示しています。

class PytorchModelHandlerKeyedTensorWrapper(PytorchModelHandlerKeyedTensor):

    def batch_elements_kwargs(self):
      return {'max_batch_size': 1}

別の方法は、すべてのテンソルを同じ長さにすることです。このは、その方法を示しています。

ModelConfigModelTokenizerは、初期化関数でロードされます。ModelConfigはモデルアーキテクチャを定義するために使用され、ModelTokenizerは入力データをトークン化するために使用されます。次の2つのパラメーターは、これらのタスクに使用されます。

これらのパラメーターは両方とも、Java PipelineOptionsで指定されています。

最後に、Postprocess DoFnでモデル予測を後処理します。Postprocess DoFnは、元のテキスト、文の最後の単語、および予測された単語を返します。

Pythonコードをパッケージにコンパイル

カスタムPythonコードは、ローカルパッケージで記述し、tarballとしてコンパイルする必要があります。このパッケージは、Javaパイプラインで使用できます。次の例は、Pythonパッケージをtarballにコンパイルする方法を示しています。

 pip install --upgrade build && python -m build --sdist

これを実行するには、setup.pyが必要です。tarballへのパスは、Javaパイプラインのパイプラインオプションの引数として使用されます。

Javaパイプラインの実行

Javaパイプラインは、MultiLangRunInferenceクラスで定義されています。このパイプラインでは、データがGoogle Cloud Storageから読み取られ、クロスランゲージPython変換が適用され、出力がGoogle Cloud Storageに書き戻されます。

PythonExternalTransformは、クロスランゲージPython変換をJavaパイプラインに挿入するために使用されます。PythonExternalTransformは、Python変換の完全修飾名である文字列パラメーターを受け取ります。

withKwargメソッドは、Python変換に必要なパラメータを指定するために使用されます。この例では、modelmodel_pathパラメータが指定されています。これらのパラメータは、最初のセクションで示されているように、複合Python PTransformの初期化関数で使用されます。最後に、withExtraPackagesメソッドは、Python変換に必要な追加のPython依存関係を指定するために使用されます。この例では、local_packagesリストが使用されており、これにはPythonの要件とコンパイル済みのtarballへのパスが含まれています。

パイプラインを実行するには、次のコマンドを使用します。

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MultiLangRunInference \
    -Dexec.args="--runner=DataflowRunner \
                 --project=$GCP_PROJECT\
                 --region=$GCP_REGION \
                 --gcpTempLocation=gs://$GCP_BUCKET/temp/ \
                 --inputFile=gs://$GCP_BUCKET/input/imdb_reviews.csv \
                 --outputFile=gs://$GCP_BUCKET/output/ouput.txt \
                 --modelPath=gs://$GCP_BUCKET/input/bert-model/bert-base-uncased.pth \
                 --modelName=$MODEL_NAME \
                 --localPackage=$LOCAL_PACKAGE" \
    -Pdataflow-runner

標準のGoogle CloudとRunnerのパラメータが指定されています。inputFileoutputFileパラメータは、入力ファイルと出力ファイルを指定するために使用されます。modelPathmodelNameのカスタムパラメータは、PythonExternalTransformに渡されます。最後に、localPackageパラメータは、カスタムPython変換を含むコンパイル済みのPythonパッケージへのパスを指定するために使用されます。

終わりに

この例をベースとして、他のカスタム多言語推論パイプラインを作成してください。他のSDKを使用することもできます。たとえば、Goにもクロスランゲージ変換を行うためのラッパーがあります。詳細については、Apache BeamプログラミングガイドのGoパイプラインでのクロスランゲージ変換の使用を参照してください。

この例で使用されている完全なコードは、GitHubで確認できます。