Pythonマルチ言語パイプライン クイックスタート
このページでは、Apache Beam SDK for Pythonを使用したマルチ言語パイプラインの作成の概要を説明します。このトピックの詳細については、マルチ言語パイプラインを参照してください。
このクイックスタートで示されているコードは、実行可能な例のコレクションで入手できます。
マルチ言語Pythonパイプラインをビルドして実行するには、Beam SDKがインストールされたPython環境が必要です。環境が設定されていない場合は、まずApache Beam Python SDKクイックスタートを完了してください。
マルチ言語パイプラインとは、1つのBeam SDK言語で構築され、別のBeam SDK言語からの1つ以上の変換を使用するパイプラインです。これらの「他の言語」の変換は、クロス言語変換と呼ばれます。これは、パイプラインコンポーネントをBeam SDK間で簡単に共有し、すべてのSDKで使用可能な変換のプールを増やすためのものです。以下の例では、マルチ言語パイプラインはBeam Python SDKで構築され、クロス言語変換はBeam Java SDKで構築されています。
クロス言語変換の作成
入力文字列にプレフィックスを追加する簡単なJava変換、JavaPrefixを次に示します。
public class JavaPrefix extends PTransform<PCollection<String>, PCollection<String>> {
final String prefix;
public JavaPrefix(String prefix) {
this.prefix = prefix;
}
class AddPrefixDoFn extends DoFn<String, String> {
@ProcessElement
public void process(@Element String input, OutputReceiver<String> o) {
o.output(prefix + input);
}
}
@Override
public PCollection<String> expand(PCollection<String> input) {
return input
.apply(
"AddPrefix",
ParDo.of(new AddPrefixDoFn()));
}
}
これをクロス言語変換として利用可能にするには、構成オブジェクトとビルダーを追加する必要があります。
注:Beam 2.34.0以降、Python SDKユーザーは、追加のJavaコードを記述せずにいくつかのJava変換を使用できます。詳細については、クロス言語Java変換の作成を参照してください。
構成オブジェクトは、変換に必要なフィールドを持つ単純なJavaオブジェクト(POJO)です。例として、JavaPrefixConfigurationがあります。
public class JavaPrefixConfiguration {
String prefix;
public void setPrefix(String prefix) {
this.prefix = prefix;
}
}
次に示すJavaPrefixBuilderのように実装されたビルダークラスは、ExternalTransformBuilderを実装し、構成オブジェクトを使用するbuildExternal
をオーバーライドする必要があります。
public class JavaPrefixBuilder implements
ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, PCollection<String>> {
@Override
public PTransform<PCollection<String>, PCollection<String>> buildExternal(
JavaPrefixConfiguration configuration) {
return new JavaPrefix(configuration.prefix);
}
}
また、拡張サービスにあなたの変換を登録するためのレジストラークラスを追加する必要があります。
@AutoService(ExternalTransformRegistrar.class)
public class JavaPrefixRegistrar implements ExternalTransformRegistrar {
final String URN = "beam:transform:my.beam.test:javaprefix:v1";
@Override
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
return ImmutableMap.of(URN,new JavaPrefixBuilder());
}
}
JavaPrefixRegistrarで示すように、レジストラーはExternalTransformRegistrarを実装する必要があり、1つのメソッドknownBuilderInstances
があります。これは、一意のURNをビルダーのインスタンスにマッピングするマップを返します。このクラスを拡張サービスに登録するには、AutoServiceアノテーションを使用できます。
拡張サービスの選択
マルチ言語パイプラインのジョブを構築する場合、Beamは拡張サービスを使用して複合変換を展開します。リモートSDKごとに少なくとも1つの拡張サービスが必要です。
ほとんどの場合、デフォルトのJava ExpansionServiceを使用できます。このサービスは、拡張サービスのポートを指定する単一の引数を取ります。その後、アドレスはPythonパイプラインによって提供されます。
マルチ言語パイプラインを実行する前に、Javaクロス言語変換をビルドし、拡張サービスを起動する必要があります。拡張サービスを起動する際には、クラスパスに依存関係を追加する必要があります。複数のJARを使用できますが、単一のシェーディングされたJARを作成する方が簡単な場合があります。PythonとJavaの両方の依存関係は、Python SDKによってランナーにステージングされます。
拡張サービスを実行する手順は、使用するビルドツールによって異なります。**java-prefix-bundled-0.1.jar**という名前のJARをビルドしたと仮定すると、拡張サービスが実行されるポートが12345
の場合、次のコマンドでサービスを起動できます。
java -jar java-prefix-bundled-0.1.jar 12345
サンプル拡張サービスの実行方法については、このREADMEを参照してください。
Pythonパイプラインの作成
これで、PythonパイプラインはExternalTransform APIを使用してクロス言語変換を構成できます。addprefix.pyからの例を次に示します。
with beam.Pipeline(options=pipeline_options) as p:
input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)
java_output = (
input
| 'JavaPrefix' >> beam.ExternalTransform(
'beam:transform:my.beam.test:javaprefix:v1',
ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
"localhost:12345"))
def python_prefix(record):
return 'python:%s' % record
output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
output | 'Write' >> WriteToText(output_path)
ExternalTransform
は3つのパラメーターを取ります。
- クロス言語変換のURN
- バイト文字列またはPayloadBuilderとしてのペイロード
- 拡張サービス
URNは変換の一意のBeam識別子であり、拡張サービスについては既に説明しました。PayloadBuilderは新しい概念であり、次に説明します。
注:URNが他の変換のURNと競合しないようにするには、クロス言語変換のURNの選択で説明されているURN規則に従ってください。
ペイロードビルダーの提供
上記のPythonパイプラインの例では、ExternalTransform
の2番目の引数としてImplicitSchemaPayloadBuilderを提供しています。ImplicitSchemaPayloadBuilder
は、提供された値からスキーマを生成するペイロードを構築します。この場合、提供された値は次のキーと値のペアに含まれています:{'prefix': 'java:'}
。JavaPrefix
変換はprefix
引数を期待しており、ペイロードビルダーは文字列java:
を渡します。これは、各入力要素の前に追加されます。
ペイロードビルダーは、拡張リクエストで変換のペイロードを構築するのに役立ちます。ImplicitSchemaPayloadBuilder
の代わりに、名前付きタプルスキーマに基づいてペイロードを構築するNamedTupleBasedPayloadBuilder、または型アノテーションに基づいてスキーマを構築するAnnotationBasedPayloadBuilderを使用できます。使用可能なペイロードビルダーの完全なリストについては、transforms.external APIリファレンスを参照してください。
標準の要素型の使用
マルチ言語境界では、すべてのBeam SDKが理解できる要素型を使用する必要があります。これらは、Beam標準コーダーによって表される型です。
BYTES
STRING_UTF8
KV
BOOL
VARINT
DOUBLE
ITERABLE
TIMER
WINDOWED_VALUE
ROW
任意の構造化型(たとえば、任意のJavaオブジェクト)には、ROW
(PCollection<Row>
)を使用します。PCollection<Row>
を生成する新しいJava複合変換を開発する必要がある場合があります。SDK固有のコーダーは、他のSDKによって消費されるPCollectionで使用されていない限り、複合クロス言語変換内で使用できます。
パイプラインの実行
Pythonパイプラインを実行する正確なコマンドは、環境によって異なります。パイプラインがaddprefix.pyという名前のファイルに記述されていると仮定すると、手順は下記と同様になります。詳細については、addprefix.pyのコメントを参照してください。
Direct Runnerを使用した実行
次のコマンドでは、input1
はテキストの行を含むファイルです。
python addprefix.py --runner DirectRunner --environment_type=DOCKER --input input1 --output output
Dataflow Runnerを使用した実行
次のスクリプトは、Cloud Storageバケットのサンプルテキストを使用して、Dataflowで多言語パイプラインを実行します。スクリプトはご自身の環境に合わせて調整する必要があります。
#!/bin/bash
export GCP_PROJECT=<project>
export GCS_BUCKET=<bucket>
export TEMP_LOCATION=gs://$GCS_BUCKET/tmp
export GCP_REGION=<region>
export JOB_NAME="javaprefix-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"
# other commands, e.g. changing into the appropriate directory
gsutil rm gs://$GCS_BUCKET/javaprefix/*
python addprefix.py \
--runner DataflowRunner \
--temp_location $TEMP_LOCATION \
--project $GCP_PROJECT \
--region $GCP_REGION \
--job_name $JOB_NAME \
--num_workers $NUM_WORKERS \
--input "gs://dataflow-samples/shakespeare/kinglear.txt" \
--output "gs://$GCS_BUCKET/javaprefix/output"