Apache Spark Runnerの使用

Apache Spark Runnerは、Apache Sparkを使用してBeamパイプラインを実行するために使用できます。 Spark Runnerは、ネイティブSparkアプリケーションと同様にSparkパイプラインを実行できます。ローカルモード用の自己完結型アプリケーションをデプロイしたり、SparkのスタンドアロンRMで実行したり、YARNまたはMesosを使用したりできます。

Spark Runnerは、Apache Spark上でBeamパイプラインを実行し、以下を提供します。

Beam機能マトリックスには、Spark Runnerの現在サポートされている機能が記載されています。

Sparkランナーの3つの種類

Spark Runnerには3つの種類があります

  1. Java(およびその他のJVMベースの言語)のみをサポートし、Spark RDD / DStreamに基づく*レガシーランナー*
  2. Java(およびその他のJVMベースの言語)のみをサポートし、SparkデータセットとApache Spark Structured Streamingフレームワークに基づく* Structured Streaming Spark Runner *。

**注:**これはまだ実験段階であり、Beamモデルのカバレッジは部分的です。現時点ではバッチモードのみをサポートしています。

  1. Java、Python、およびGoをサポートする*ポータブルランナー*

このガイドは、Spark Runnerの非ポータブル機能とポータブル機能を説明するために2つのパートに分かれています。以下のスイッチャーを使用して、適切なランナーを選択してください

どのランナーを使用するか:ポータブルランナーまたは非ポータブルランナー?

Beamとそのランナーは、当初はJVMベースの言語(例:Java / Scala / Kotlin)のみをサポートしていました。 PythonとGo SDKは後で追加されました。他の言語で記述されたパイプラインの実行をサポートするために、ランナーのアーキテクチャを大幅に変更する必要がありました。

アプリケーションでJavaのみを使用する場合は、現在、Javaベースのランナーのいずれかを使用する必要があります。 SparkでBeamを使用してPythonまたはGoパイプラインを実行する場合は、ポータブルランナーを使用する必要があります。移植性について詳しくは、移植性ページをご覧ください。

Sparkランナーの前提条件とセットアップ

Spark Runnerは現在、Sparkの3.2.xブランチをサポートしています。

**注:** Spark 2.4.xのサポートは、Beam 2.46.0で廃止されました。

pom.xmlに以下を追加することで、Spark Runnerの最新バージョンの依存関係を追加できます

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark-3</artifactId>
  <version>2.60.0</version>
</dependency>

アプリケーションを使用したSparkのデプロイ

ローカルモード/スタンドアロンで実行する場合など、場合によっては、(自己完結型の)アプリケーションがpom.xmlに次の依存関係を明示的に追加することによってSparkをパックする必要があります

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

そして、Mavenシェードプラグインを使用してアプリケーションjarをシェーディングします

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

`mvn package`を実行した後、 `ls target`を実行すると、(artifactIdが `beam-examples`でバージョンが `1.0.0`であると仮定して)が表示されます

beam-examples-1.0.0-shaded.jar

スタンドアロンクラスタに対して実行するには、単に実行します


RDD / DStreamベースのランナーの場合

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner


Structured Streamingベースのランナーの場合

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner

実行環境にDockerがインストールされている必要があります。 PythonでApache Beamを開発するには、Apache Beam Python SDKをインストールする必要があります。 `pip install apache_beam`。 Pythonパイプラインの作成方法については、Pythonドキュメントを参照してください。

pip install apache_beam

Beam 2.20.0以降、事前に構築されたSpark Job Service DockerイメージはDocker Hubで入手できます。

古いバージョンのBeamの場合は、Apache Beamのソースコードのコピーが必要です。ダウンロードページからダウンロードできます。

  1. JobServiceエンドポイントを開始します
    • Dockerを使用(推奨): `docker run --net=host apache/beam_spark_job_server:latest`
    • またはBeamソースコードから: `./gradlew :runners:spark:3:job-server:runShadow`

JobServiceは、Beamパイプラインを送信する中央インスタンスです。 JobServiceは、パイプラインのSparkジョブを作成し、ジョブを実行します。 Sparkクラスタでジョブを実行するには、Beam JobServiceにSparkマスターアドレスを提供する必要があります。

  1. `PortableRunner`、 `job_endpoint`を `localhost:8099`(これはJobServiceのデフォルトアドレスです)、および `environment_type`を `LOOPBACK`に設定して、Pythonパイプラインを上記のエンドポイントに送信します。例えば

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

事前にデプロイされたSparkクラスタでの実行

Sparkデプロイメントがすでに存在するクラスタ(Sparkクラスはコンテナクラスパスで使用可能)にBeamパイプラインをデプロイする場合、追加の依存関係は必要ありません。さまざまなデプロイメントモードの詳細については、スタンドアロンYARN、またはMesosを参照してください。

  1. デフォルトでポート7077でマスターを公開するSparkクラスタを起動します。

  1. Sparkマスターに接続するJobServiceを開始します
    • Dockerを使用(推奨): `docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077`
    • またはBeamソースコードから: `./gradlew :runners:spark:3:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077`

  1. 上記のようにパイプラインを送信します。ただし、 `environment_type = LOOPBACK`はローカルテストのみを対象としています。詳細はこちらをご覧ください。

(クラスタのセットアップによっては、 `environment_type`オプションを変更する必要がある場合があります。詳細はこちらをご覧ください。)

Dataprocクラスタ(YARNバックエンド)での実行

Python、Go、およびその他のサポートされている言語で記述されたBeamジョブを実行するには、BeamのSpark Runnerページ(移植性フレームワークロードマップも参照)で説明されているように、 `SparkRunner`と `PortableRunner`を使用できます。

次の例では、YarnがバックエンドになっているDataprocクラスタのマスターノードから、PythonでポータブルBeamジョブを実行します。

注:この例は、Dataproc 2.0、Spark 3.1.2、およびBeam 2.37.0で正常に実行されます。

  1. Dockerコンポーネントが有効になっているDataprocクラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DOCKER \
    --image-version=DATAPROC_IMAGE_VERSION \
    --region=REGION \
    --enable-component-gateway \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --properties spark:spark.master.rest.enabled=true
  1. Cloud Storageバケットを作成します。
gsutil mb BUCKET_NAME
  1. ローカル環境にジョブに必要なPythonライブラリをインストールします。
python -m pip install apache-beam[gcp]==BEAM_VERSION
  1. ワードカウントのサンプルパイプラインと、パイプラインを実行するために必要なすべての依存関係、アーティファクトなどを、後で実行できるjarにバンドルします。
python -m apache_beam.examples.wordcount \
    --runner=SparkRunner \
    --output_executable_path=OUTPUT_JAR_PATH \
    --output=gs://BUCKET_NAME/python-wordcount-out \
    --spark_version=3
  1. Dataprocクラスタのマスターノードにsparkジョブを送信します。
gcloud dataproc jobs submit spark \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --class=org.apache.beam.runners.spark.SparkPipelineRunner \
        --jars=OUTPUT_JAR_PATH
  1. 結果がバケットに書き込まれたことを確認します。
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID

Sparkランナーのパイプラインオプション

Spark Runnerでパイプラインを実行する場合は、次のパイプラインオプションを検討する必要があります。


RDD / DStreamベースのランナーの場合

フィールド説明デフォルト値
runner使用するパイプラインランナー。このオプションを使用すると、実行時にパイプラインランナーを決定できます。Sparkを使用して実行するには、 `SparkRunner`に設定します。
sparkMasterSpark MasterのURL。これは `SparkConf#setMaster(String)`の設定と同等であり、xコアでローカルに実行するには `local [x]`、Sparkスタンドアロンクラスタに接続するには `spark:// host:port`、Mesosクラスタに接続するには `mesos:// host:port`、yarnクラスタに接続するには `yarn`のいずれかになります。local[4]
storageLevelバッチパイプラインでRDDをキャッシュするときに使用する `StorageLevel`。 Spark Runnerは、繰り返し評価されるRDDを自動的にキャッシュします。 Beamのストリーミングパイプラインはステートフルであるため、これはバッチのみのプロパティです。これには、Spark DStreamの `StorageLevel`が `MEMORY_ONLY`である必要があります。MEMORY_ONLY
batchIntervalMillis`StreamingContext`の `batchDuration`-Sparkのバッチ間隔を設定します。1000
enableSparkMetricSinksSparkのメトリックシンクへのメトリックのレポートを有効にします。true
cacheDisabledパイプライン全体で再利用されるPCollectionのキャッシュを無効にします。 RDDを保存するよりも再計算する方が速い場合に便利です。false


Structured Streamingベースのランナーの場合

フィールド説明デフォルト値
runner使用するパイプラインランナー。このオプションを使用すると、実行時にパイプラインランナーを決定できます。Spark Structured Streamingを使用して実行するには、 `SparkStructuredStreamingRunner`に設定します。
sparkMasterSpark MasterのURL。これは `SparkConf#setMaster(String)`の設定と同等であり、xコアでローカルに実行するには `local [x]`、Sparkスタンドアロンクラスタに接続するには `spark:// host:port`、Mesosクラスタに接続するには `mesos:// host:port`、yarnクラスタに接続するには `yarn`のいずれかになります。local[4]
testMode便利なデバッグ情報(触媒実行プランとBeam DAGの印刷)を提供するテストモードを有効にしますfalse
enableSparkMetricSinksSparkのメトリックシンクへのメトリックのレポートを有効にします。true
checkpointDirストリーミングの回復力のためのチェックポイントディレクトリ。バッチでは無視されます。耐久性のために、HDFS / S3 / GSなどの信頼性の高いファイルシステムが必要です。/ tmpのローカルディレクトリ
filesToStageすべてのワーカーに送信してクラスパスに配置するJarファイル。クラスパスからのすべてのファイル
EnableSparkMetricSinksアグリゲーター値をSparkのメトリックシンクに送信するかどうかを有効/無効にしますtrue
フィールド説明
--runner使用するパイプラインランナー。このオプションを使用すると、実行時にパイプラインランナーを決定できます。Sparkを使用して実行するには、PortableRunnerに設定します。
--job_endpoint使用するジョブサービスエンドポイント。ホスト名:ポートの形式(例:localhost:3000)である必要があります。ジョブサービスエンドポイント(デフォルトはlocalhost:8099)に合わせて設定します。

追加の注意事項

spark-submitの使用

Sparkアプリケーションをクラスタに送信する場合、Sparkインストールに付属のspark-submitスクリプトを使用するのが一般的です(推奨されます)。上記のPipelineOptionsは、spark-submitを置き換えるものではなく、補完するものです。上記のオプションは、application-argumentsの1つとして渡すことができ、–masterの設定が優先されます。 spark-submitの一般的な使用方法の詳細については、Sparkのドキュメントをご覧ください。

ジョブの監視

実行中のSparkジョブは、SparkのWebインターフェースを使用して監視できます。デフォルトでは、ドライバノードのポート4040で利用できます。ローカルマシンでSparkを実行する場合、これはhttp://localhost:4040になります。Sparkには、事後確認するための履歴サーバーもあります。

メトリクスは、REST APIからも入手できます。 Sparkは、Sparkメトリクスをさまざまなシンクに報告できるメトリクスシステムを提供しています。 Sparkランナーは、この同じメトリクスシステムを使用してユーザー定義のBeamアグリゲーターを報告し、現在GraphiteSinkCSVSinkをサポートしています。 Sparkでサポートされている追加のシンクのサポートを提供することは、簡単かつ直接的です。

Sparkメトリクスは、ポータブルランナーではまだサポートされていません。

ストリーミング実行


RDD / DStreamベースのランナーの場合
パイプラインでUnboundedSourceを使用する場合、Spark Runnerは自動的にストリーミングモードを設定します。ストリーミングモードの強制は、主にテストに使用され、推奨されません。

Structured Streamingベースのランナーの場合
ストリーミングモードは、Spark Structured Streamingランナーではまだ実装されていません。

ストリーミングは、Sparkポータブルランナーではまだサポートされていません。

提供されたSparkContextとStreamingListenersの使用


RDD / DStreamベースのランナーの場合
spark-jobserverを使用する場合など、提供されたSparkContextを使用してSparkジョブを実行する場合、またはStreamingListenersを使用する場合は、SparkPipelineOptionsを使用できません(コンテキストまたはリスナーはコマンドライン引数として渡すことができません)。代わりに、プログラムでのみ使用できるSparkContextOptionsを使用する必要があります。これは、一般的なPipelineOptions実装ではありません。

Structured Streamingベースのランナーの場合
提供されたSparkSessionとStreamingListenersは、Spark Structured Streamingランナーではサポートされていません。

提供されたSparkContextとStreamingListenersは、Sparkポータブルランナーではサポートされていません。

Kubernetes

ジョブサーバーなしでBeamジョブを送信する

追加のジョブサーバーを起動せずに、Spark Kubernetesクラスターで直接ビームジョブを送信するには、次の手順を実行します。

spark-submit --master MASTER_URL \
  --conf spark.kubernetes.driver.podTemplateFile=driver_pod_template.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=executor_pod_template.yaml \
  --class org.apache.beam.runners.spark.SparkPipelineRunner \
  --conf spark.kubernetes.container.image=apache/spark:v3.3.2 \
  ./wc_job.jar

Dataprocでビームジョブを実行する場合と同様に、以下のようにジョブjarをバンドルできます。この例では、SDK harnessPROCESSタイプを使用して、プロセスによってジョブを実行します。

python -m beam_example_wc \
    --runner=SparkRunner \
    --output_executable_path=./wc_job.jar \
    --environment_type=PROCESS \
    --environment_config='{\"command\": \"/opt/apache/beam/boot\"}' \
    --spark_version=3

以下は、Kubernetesエグゼキューターポッドテンプレートの例です。initContainerは、ビームパイプラインを実行するためにビームSDKハーネスをダウンロードするために必要です。

spec:
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
      - name: beam-data
        mountPath: /opt/apache/beam/
  initContainers:
  - name: init-beam
    image: apache/beam_python3.7_sdk
    command:
    - cp
    - /opt/apache/beam/boot
    - /init-container/data/boot
    volumeMounts:
    - name: beam-data
      mountPath: /init-container/data
  volumes:
  - name: beam-data
    emptyDir: {}

ジョブサーバーを使用してBeamジョブを送信する

ジョブサーバーを使用してApacheビームジョブを実行するようにSparkを構成する