Pythonマルチ言語パイプライン クイックスタート

このページでは、Apache Beam SDK for Pythonを使用したマルチ言語パイプラインの作成の概要を説明します。このトピックの詳細については、マルチ言語パイプラインを参照してください。

このクイックスタートで示されているコードは、実行可能な例のコレクションで入手できます。

マルチ言語Pythonパイプラインをビルドして実行するには、Beam SDKがインストールされたPython環境が必要です。環境が設定されていない場合は、まずApache Beam Python SDKクイックスタートを完了してください。

マルチ言語パイプラインとは、1つのBeam SDK言語で構築され、別のBeam SDK言語からの1つ以上の変換を使用するパイプラインです。これらの「他の言語」の変換は、クロス言語変換と呼ばれます。これは、パイプラインコンポーネントをBeam SDK間で簡単に共有し、すべてのSDKで使用可能な変換のプールを増やすためのものです。以下の例では、マルチ言語パイプラインはBeam Python SDKで構築され、クロス言語変換はBeam Java SDKで構築されています。

クロス言語変換の作成

入力文字列にプレフィックスを追加する簡単なJava変換、JavaPrefixを次に示します。

public class JavaPrefix extends PTransform<PCollection<String>, PCollection<String>> {

  final String prefix;

  public JavaPrefix(String prefix) {
    this.prefix = prefix;
  }

  class AddPrefixDoFn extends DoFn<String, String> {

    @ProcessElement
    public void process(@Element String input, OutputReceiver<String> o) {
      o.output(prefix + input);
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        .apply(
            "AddPrefix",
            ParDo.of(new AddPrefixDoFn()));
  }
}

これをクロス言語変換として利用可能にするには、構成オブジェクトとビルダーを追加する必要があります。

注:Beam 2.34.0以降、Python SDKユーザーは、追加のJavaコードを記述せずにいくつかのJava変換を使用できます。詳細については、クロス言語Java変換の作成を参照してください。

構成オブジェクトは、変換に必要なフィールドを持つ単純なJavaオブジェクト(POJO)です。例として、JavaPrefixConfigurationがあります。

public class JavaPrefixConfiguration {

  String prefix;

  public void setPrefix(String prefix) {
    this.prefix = prefix;
  }
}

次に示すJavaPrefixBuilderのように実装されたビルダークラスは、ExternalTransformBuilderを実装し、構成オブジェクトを使用するbuildExternalをオーバーライドする必要があります。

public class JavaPrefixBuilder implements
    ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, PCollection<String>> {

    @Override
    public PTransform<PCollection<String>, PCollection<String>> buildExternal(
        JavaPrefixConfiguration configuration) {
      return new JavaPrefix(configuration.prefix);
    }
}

また、拡張サービスにあなたの変換を登録するためのレジストラークラスを追加する必要があります。

@AutoService(ExternalTransformRegistrar.class)
public class JavaPrefixRegistrar implements ExternalTransformRegistrar {

  final String URN = "beam:transform:my.beam.test:javaprefix:v1";

  @Override
  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
    return ImmutableMap.of(URN,new JavaPrefixBuilder());
  }
}

JavaPrefixRegistrarで示すように、レジストラーはExternalTransformRegistrarを実装する必要があり、1つのメソッドknownBuilderInstancesがあります。これは、一意のURNをビルダーのインスタンスにマッピングするマップを返します。このクラスを拡張サービスに登録するには、AutoServiceアノテーションを使用できます。

拡張サービスの選択

マルチ言語パイプラインのジョブを構築する場合、Beamは拡張サービスを使用して複合変換を展開します。リモートSDKごとに少なくとも1つの拡張サービスが必要です。

ほとんどの場合、デフォルトのJava ExpansionServiceを使用できます。このサービスは、拡張サービスのポートを指定する単一の引数を取ります。その後、アドレスはPythonパイプラインによって提供されます。

マルチ言語パイプラインを実行する前に、Javaクロス言語変換をビルドし、拡張サービスを起動する必要があります。拡張サービスを起動する際には、クラスパスに依存関係を追加する必要があります。複数のJARを使用できますが、単一のシェーディングされたJARを作成する方が簡単な場合があります。PythonとJavaの両方の依存関係は、Python SDKによってランナーにステージングされます。

拡張サービスを実行する手順は、使用するビルドツールによって異なります。**java-prefix-bundled-0.1.jar**という名前のJARをビルドしたと仮定すると、拡張サービスが実行されるポートが12345の場合、次のコマンドでサービスを起動できます。

java -jar java-prefix-bundled-0.1.jar 12345

サンプル拡張サービスの実行方法については、このREADMEを参照してください。

Pythonパイプラインの作成

これで、PythonパイプラインはExternalTransform APIを使用してクロス言語変換を構成できます。addprefix.pyからの例を次に示します。

with beam.Pipeline(options=pipeline_options) as p:
  input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)

  java_output = (
      input
      | 'JavaPrefix' >> beam.ExternalTransform(
            'beam:transform:my.beam.test:javaprefix:v1',
            ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
            "localhost:12345"))

  def python_prefix(record):
    return 'python:%s' % record

  output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
  output | 'Write' >> WriteToText(output_path)

ExternalTransformは3つのパラメーターを取ります。

URNは変換の一意のBeam識別子であり、拡張サービスについては既に説明しました。PayloadBuilderは新しい概念であり、次に説明します。

注:URNが他の変換のURNと競合しないようにするには、クロス言語変換のURNの選択で説明されているURN規則に従ってください。

ペイロードビルダーの提供

上記のPythonパイプラインの例では、ExternalTransformの2番目の引数としてImplicitSchemaPayloadBuilderを提供しています。ImplicitSchemaPayloadBuilderは、提供された値からスキーマを生成するペイロードを構築します。この場合、提供された値は次のキーと値のペアに含まれています:{'prefix': 'java:'}JavaPrefix変換はprefix引数を期待しており、ペイロードビルダーは文字列java:を渡します。これは、各入力要素の前に追加されます。

ペイロードビルダーは、拡張リクエストで変換のペイロードを構築するのに役立ちます。ImplicitSchemaPayloadBuilderの代わりに、名前付きタプルスキーマに基づいてペイロードを構築するNamedTupleBasedPayloadBuilder、または型アノテーションに基づいてスキーマを構築するAnnotationBasedPayloadBuilderを使用できます。使用可能なペイロードビルダーの完全なリストについては、transforms.external APIリファレンスを参照してください。

標準の要素型の使用

マルチ言語境界では、すべてのBeam SDKが理解できる要素型を使用する必要があります。これらは、Beam標準コーダーによって表される型です。

任意の構造化型(たとえば、任意のJavaオブジェクト)には、ROWPCollection<Row>)を使用します。PCollection<Row>を生成する新しいJava複合変換を開発する必要がある場合があります。SDK固有のコーダーは、他のSDKによって消費されるPCollectionで使用されていない限り、複合クロス言語変換内で使用できます。

パイプラインの実行

Pythonパイプラインを実行する正確なコマンドは、環境によって異なります。パイプラインがaddprefix.pyという名前のファイルに記述されていると仮定すると、手順は下記と同様になります。詳細については、addprefix.pyのコメントを参照してください。

Direct Runnerを使用した実行

次のコマンドでは、input1はテキストの行を含むファイルです。

python addprefix.py --runner DirectRunner --environment_type=DOCKER --input input1 --output output

Dataflow Runnerを使用した実行

次のスクリプトは、Cloud Storageバケットのサンプルテキストを使用して、Dataflowで多言語パイプラインを実行します。スクリプトはご自身の環境に合わせて調整する必要があります。

#!/bin/bash
export GCP_PROJECT=<project>
export GCS_BUCKET=<bucket>
export TEMP_LOCATION=gs://$GCS_BUCKET/tmp
export GCP_REGION=<region>
export JOB_NAME="javaprefix-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

# other commands, e.g. changing into the appropriate directory

gsutil rm gs://$GCS_BUCKET/javaprefix/*

python addprefix.py \
    --runner DataflowRunner \
    --temp_location $TEMP_LOCATION \
    --project $GCP_PROJECT \
    --region $GCP_REGION \
    --job_name $JOB_NAME \
    --num_workers $NUM_WORKERS \
    --input "gs://dataflow-samples/shakespeare/kinglear.txt" \
    --output "gs://$GCS_BUCKET/javaprefix/output"