Java 多言語パイプラインクイックスタート
このページでは、Apache Beam SDK for Javaを使用して多言語パイプラインを作成する方法の概要について説明します。このトピックの詳細については、多言語パイプラインを参照してください。
多言語パイプラインとは、1つのBeam SDK言語で構築され、別のBeam SDK言語からの1つ以上の変換を使用するパイプラインです。別のSDKからのこれらの変換は、クロスランゲージ変換と呼ばれます。多言語サポートにより、パイプラインコンポーネントをBeam SDK間で簡単に共有できるようになり、すべてのSDKで使用可能な変換のプールが拡大します。
以下の例では、多言語パイプラインはBeam Java SDKで構築され、クロスランゲージ変換はBeam Python SDKで構築されています。
前提条件
このクイックスタートは、シェイクスピアテキストの単語数をカウントするJavaのサンプルパイプライン、PythonDataframeWordCountに基づいています。パイプラインを実行する場合は、Beamリポジトリをクローンまたはダウンロードし、ソースコードからサンプルをビルドできます。
サンプルをビルドして実行するには、Beam Java SDKバージョン2.41.0以降がインストールされたJava環境とPython環境が必要です。これらの環境がまだ設定されていない場合は、最初にApache Beam Java SDKクイックスタートとApache Beam Python SDKクイックスタートを完了してください。
ポータブルDirectRunnerで実行するには、Dockerをローカルにインストールし、Dockerデーモンを実行している必要があります。これはDataflowには必要ありません。
Dataflowで実行するには、課金が有効になっているGoogle CloudプロジェクトとGoogle Cloud Storageバケットが必要です。
この例は、Pythonバージョン3.8より前では利用できないPython pandasパッケージ1.4.0以降に依存しています。したがって、システムにインストールされているデフォルトのPythonバージョンが3.8以降であることを確認してください。
クロスランゲージ変換を指定する
Javaのサンプルパイプラインは、PythonのDataframeTransformをクロスランゲージ変換として使用します。この変換は、pandasのようなDataFrameオブジェクトを操作するためのBeamデータフレームAPIの一部です。
クロスランゲージ変換を適用するには、パイプラインでそれを指定する必要があります。Python変換は、完全修飾名で識別されます。たとえば、`DataframeTransform`は`apache_beam.dataframe.transforms`パッケージにあるため、完全修飾名は`apache_beam.dataframe.transforms.DataframeTransform`です。サンプルパイプライン、PythonDataframeWordCountは、この完全修飾名をPythonExternalTransformに渡します。
**注:**サンプルパイプラインは、任意のPythonクロスランゲージ変換を使用するJava多言語パイプラインの開発を示すことを目的としています。JavaでのデータフレームAPIの本番ユースケースでは、代わりに上位レベルのDataframeTransformを使用する必要があります。
サンプルからの完全なパイプライン定義は次のとおりです。
static void runWordCount(WordCountOptions options) {
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWordsFn()))
.setRowSchema(ExtractWordsFn.SCHEMA)
.apply(
PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
"apache_beam.dataframe.transforms.DataframeTransform",
options.getExpansionService())
.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
.withKwarg("include_indexes", true))
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
`PythonExternalTransform`は、外部Python変換を呼び出すためのラッパーです。`from`メソッドは、2つの文字列を受け入れます。1)完全修飾変換名。2)拡張サービスのオプションのアドレスとポート番号。このメソッドは、Javaパイプラインで直接使用できるPythonクロスランゲージ変換のスタブを返します。`withKwarg`は、Pythonクロスランゲージ変換をインスタンス化するためのキーワード引数を指定します。この場合、`withKwarg`は2回呼び出され、`func`引数と`include_indexes`引数を指定し、これらの引数は`DataframeTransform`に渡されます。`PythonExternalTransform`は、Pythonクロスランゲージ変換の引数とキーワード引数を指定する他の方法も提供します。
このパイプラインの仕組みを理解するには、最初の`withKwarg`呼び出しを詳しく見ると役立ちます。
.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
`PythonCallableSource.of`への引数は、Pythonラムダ関数の文字列表現です。`DataframeTransform`は、`PCollection`にデータフレームであるかのように適用するPython呼び出し可能オブジェクトを引数として取ります。`withKwarg`メソッドを使用すると、JavaパイプラインでPython呼び出し可能オブジェクトを指定できます。`DataframeTransform`に関数を渡す方法の詳細については、パイプラインへのデータフレームの埋め込みを参照してください。
Javaパイプラインを実行する
環境をカスタマイズしたり、デフォルトのBeam SDKでは使用できない変換を使用したりする場合は、独自の拡張サービスを実行する必要がある場合があります。そのような場合は、パイプラインを実行する前に拡張サービスを開始してください。
パイプラインを実行する前に、選択したBeamランナーのランナー固有の設定を実行してください。
Maven Archetypeを使用してDataflowランナーで実行する(Beam 2.43.0以降)
- 関連するBeamバージョンのBeam examples Maven archetypeを確認してください。
export BEAM_VERSION=<Beam version>
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=$BEAM_VERSION \
-DgroupId=org.example \
-DartifactId=multi-language-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
- パイプラインを実行します。
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.PythonDataframeWordCount \
-Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
--region=$GCP_REGION \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output" \
-Pdataflow-runner
HEADでDataflowランナーで実行する
次のスクリプトは、Cloud Storageバケットのサンプルテキストを使用して、Dataflowでサンプルの多言語パイプラインを実行します。スクリプトを環境に適応させる必要があります。
export GCP_PROJECT=<project>
export OUTPUT_BUCKET=<bucket>
export GCP_REGION=<region>
export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--output=gs://${OUTPUT_BUCKET}/count \
--region=${GCP_REGION}"
パイプラインは、結果を含むファイルを**gs://$OUTPUT_BUCKET/count-00000-of-00001**に出力します。
DirectRunnerで実行する
**注:**多言語パイプラインは、ポータブルランナーを使用する必要があります。ポータブルDirectRunnerはまだ実験段階であり、すべてのBeam機能をサポートしているわけではありません。
- 最新バージョンのBeam Python SDKがインストールされたPython仮想環境を作成します。手順については、こちらを参照してください。
- ポータブルDirectRunnerのジョブサーバーを実行します(Pythonで実装)。
export JOB_SERVER_PORT=<port>
python -m apache_beam.runners.portability.local_job_service_main -p $JOB_SERVER_PORT
別のシェルで、Beam HEAD Gitクローンに移動します。
ローカルパイプライン実行用のBeam Java SDKコンテナをビルドします(このガイドでは、JAVA_HOMEがJava 11に設定されている必要があります)。
./gradlew :sdks:java:container:java11:docker -Pjava11Home=$JAVA_HOME
- パイプラインを実行します。
export JOB_SERVER_PORT=<port> # Same port as before
export OUTPUT_FILE=<local relative path>
./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=PortableRunner \
--jobEndpoint=localhost:$JOB_SERVER_PORT \
--output=$OUTPUT_FILE"
**注**この出力は、Python Dockerコンテナのローカルファイルシステムに書き込まれます。GCSに書き込むことによって出力を検証するには、ポータブルDirectRunnerは現在、GCSにアクセスするためのローカル認証情報を正しく転送できないため、`output`オプションに公開アクセス可能なGCSパスを指定する必要があります。
詳細:拡張サービスを開始する
多言語パイプラインのジョブをビルドする場合、Beamは拡張サービスを使用して複合変換を拡張します。リモートSDKごとに少なくとも1つの拡張サービスが必要です。
一般的なケースでは、システムにサポートされているバージョンのPythonがインストールされている場合、`PythonExternalTransform`に拡張サービスの作成と起動の詳細を処理させることができます。ただし、環境をカスタマイズしたり、デフォルトのBeam SDKでは使用できない変換を使用したりする場合は、独自の拡張サービスを実行する必要がある場合があります。
たとえば、Python 変換の標準拡張サービスである ExpansionServiceServicer を開始するには、次の手順に従います。
こちらの手順に従って、新しい仮想環境をアクティブ化します。
gcp
およびdataframe
パッケージを使用して Apache Beam をインストールします。
pip install 'apache-beam[gcp,dataframe]'
- 次のコマンドを実行します。
python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"
このコマンドは、標準拡張サービスを開始する expansion_service_main.py を実行します。Gradle を使用して Java パイプラインを実行する場合、expansionService
オプションを使用して拡張サービスを指定できます。例: --expansionService=localhost:<PORT>
。
次のステップ
Beam の複数言語パイプラインのサポートについて詳しくは、複数言語パイプラインをご覧ください。Beam DataFrame API について詳しくは、Beam DataFrames の概要をご覧ください。