Apache Spark Runnerの使用
Apache Spark Runnerは、Apache Sparkを使用してBeamパイプラインを実行するために使用できます。 Spark Runnerは、ネイティブSparkアプリケーションと同様にSparkパイプラインを実行できます。ローカルモード用の自己完結型アプリケーションをデプロイしたり、SparkのスタンドアロンRMで実行したり、YARNまたはMesosを使用したりできます。
Spark Runnerは、Apache Spark上でBeamパイプラインを実行し、以下を提供します。
- バッチおよびストリーミング(および組み合わせ)パイプライン。
- RDDおよびDStreamによって提供されるのと同じフォールトトレランス保証。
- Sparkが提供するのと同じセキュリティ機能。
- Beamアグリゲーターも報告するSparkのメトリックシステムを使用した組み込みメトリックレポート。
- Sparkのブロードキャスト変数を介したBeamサイド入力のネイティブサポート。
Beam機能マトリックスには、Spark Runnerの現在サポートされている機能が記載されています。
Sparkランナーの3つの種類
Spark Runnerには3つの種類があります
- Java(およびその他のJVMベースの言語)のみをサポートし、Spark RDD / DStreamに基づく*レガシーランナー*
- Java(およびその他のJVMベースの言語)のみをサポートし、SparkデータセットとApache Spark Structured Streamingフレームワークに基づく* Structured Streaming Spark Runner *。
**注:**これはまだ実験段階であり、Beamモデルのカバレッジは部分的です。現時点ではバッチモードのみをサポートしています。
- Java、Python、およびGoをサポートする*ポータブルランナー*
このガイドは、Spark Runnerの非ポータブル機能とポータブル機能を説明するために2つのパートに分かれています。以下のスイッチャーを使用して、適切なランナーを選択してください
どのランナーを使用するか:ポータブルランナーまたは非ポータブルランナー?
Beamとそのランナーは、当初はJVMベースの言語(例:Java / Scala / Kotlin)のみをサポートしていました。 PythonとGo SDKは後で追加されました。他の言語で記述されたパイプラインの実行をサポートするために、ランナーのアーキテクチャを大幅に変更する必要がありました。
アプリケーションでJavaのみを使用する場合は、現在、Javaベースのランナーのいずれかを使用する必要があります。 SparkでBeamを使用してPythonまたはGoパイプラインを実行する場合は、ポータブルランナーを使用する必要があります。移植性について詳しくは、移植性ページをご覧ください。
- 非ポータブル(Java)
- ポータブル(Java / Python / Go)
Sparkランナーの前提条件とセットアップ
Spark Runnerは現在、Sparkの3.2.xブランチをサポートしています。
**注:** Spark 2.4.xのサポートは、Beam 2.46.0で廃止されました。
pom.xmlに以下を追加することで、Spark Runnerの最新バージョンの依存関係を追加できます
アプリケーションを使用したSparkのデプロイ
ローカルモード/スタンドアロンで実行する場合など、場合によっては、(自己完結型の)アプリケーションがpom.xmlに次の依存関係を明示的に追加することによってSparkをパックする必要があります
そして、Mavenシェードプラグインを使用してアプリケーションjarをシェーディングします
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
`mvn package`を実行した後、 `ls target`を実行すると、(artifactIdが `beam-examples`でバージョンが `1.0.0`であると仮定して)が表示されます
スタンドアロンクラスタに対して実行するには、単に実行します
RDD / DStreamベースのランナーの場合
Structured Streamingベースのランナーの場合
実行環境にDockerがインストールされている必要があります。 PythonでApache Beamを開発するには、Apache Beam Python SDKをインストールする必要があります。 `pip install apache_beam`。 Pythonパイプラインの作成方法については、Pythonドキュメントを参照してください。
Beam 2.20.0以降、事前に構築されたSpark Job Service DockerイメージはDocker Hubで入手できます。
古いバージョンのBeamの場合は、Apache Beamのソースコードのコピーが必要です。ダウンロードページからダウンロードできます。
- JobServiceエンドポイントを開始します
- Dockerを使用(推奨): `docker run --net=host apache/beam_spark_job_server:latest`
- またはBeamソースコードから: `./gradlew :runners:spark:3:job-server:runShadow`
JobServiceは、Beamパイプラインを送信する中央インスタンスです。 JobServiceは、パイプラインのSparkジョブを作成し、ジョブを実行します。 Sparkクラスタでジョブを実行するには、Beam JobServiceにSparkマスターアドレスを提供する必要があります。
- `PortableRunner`、 `job_endpoint`を `localhost:8099`(これはJobServiceのデフォルトアドレスです)、および `environment_type`を `LOOPBACK`に設定して、Pythonパイプラインを上記のエンドポイントに送信します。例えば
事前にデプロイされたSparkクラスタでの実行
Sparkデプロイメントがすでに存在するクラスタ(Sparkクラスはコンテナクラスパスで使用可能)にBeamパイプラインをデプロイする場合、追加の依存関係は必要ありません。さまざまなデプロイメントモードの詳細については、スタンドアロン、YARN、またはMesosを参照してください。
- デフォルトでポート7077でマスターを公開するSparkクラスタを起動します。
- Sparkマスターに接続するJobServiceを開始します
- Dockerを使用(推奨): `docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077`
- またはBeamソースコードから: `./gradlew :runners:spark:3:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077`
- 上記のようにパイプラインを送信します。ただし、 `environment_type = LOOPBACK`はローカルテストのみを対象としています。詳細はこちらをご覧ください。
(クラスタのセットアップによっては、 `environment_type`オプションを変更する必要がある場合があります。詳細はこちらをご覧ください。)
Dataprocクラスタ(YARNバックエンド)での実行
Python、Go、およびその他のサポートされている言語で記述されたBeamジョブを実行するには、BeamのSpark Runnerページ(移植性フレームワークロードマップも参照)で説明されているように、 `SparkRunner`と `PortableRunner`を使用できます。
次の例では、YarnがバックエンドになっているDataprocクラスタのマスターノードから、PythonでポータブルBeamジョブを実行します。
注:この例は、Dataproc 2.0、Spark 3.1.2、およびBeam 2.37.0で正常に実行されます。
- Dockerコンポーネントが有効になっているDataprocクラスタを作成します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform \ --properties spark:spark.master.rest.enabled=true
- `--optional-components`:Docker。
- `--image-version`:クラスタのイメージバージョン。これは、クラスタにインストールされているSparkバージョンを決定します(たとえば、最新および以前の4つの2.0.xイメージリリースバージョンにリストされているApache Sparkコンポーネントバージョンを参照)。
- `--region`:サポートされているDataproc リージョン。
- `--enable-component-gateway`:Webインターフェースへのアクセスを有効にします。
- `--scopes`:同じプロジェクト内のGCPサービスへのAPIアクセスを有効にします。
- `--properties`:一部のコンポーネントの特定の構成を追加します。ここでは、spark.master.restが有効になっており、クラスタへのジョブ送信に使用できます。
- Cloud Storageバケットを作成します。
gsutil mb BUCKET_NAME
- ローカル環境にジョブに必要なPythonライブラリをインストールします。
python -m pip install apache-beam[gcp]==BEAM_VERSION
- ワードカウントのサンプルパイプラインと、パイプラインを実行するために必要なすべての依存関係、アーティファクトなどを、後で実行できるjarにバンドルします。
python -m apache_beam.examples.wordcount \ --runner=SparkRunner \ --output_executable_path=OUTPUT_JAR_PATH \ --output=gs://BUCKET_NAME/python-wordcount-out \ --spark_version=3
- `--runner`(必須): `SparkRunner`。
- `--output_executable_path`(必須):作成されるバンドルjarのパス。
- `--output`(必須):出力が書き込まれる場所。
- `--spark_version`(オプション):sparkバージョン3(デフォルト)または2(非推奨!)を選択します。
- Dataprocクラスタのマスターノードにsparkジョブを送信します。
gcloud dataproc jobs submit spark \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=org.apache.beam.runners.spark.SparkPipelineRunner \ --jars=OUTPUT_JAR_PATH
- `--cluster`:作成されたDataprocクラスタの名前。
- `--region`:サポートされているDataproc リージョン。
- `--class`:アプリケーションのエントリポイント。
- `--jars`:アプリケーションとすべての依存関係を含むバンドルされたjarへのパス。
- 結果がバケットに書き込まれたことを確認します。
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Sparkランナーのパイプラインオプション
Spark Runnerでパイプラインを実行する場合は、次のパイプラインオプションを検討する必要があります。
RDD / DStreamベースのランナーの場合
フィールド | 説明 | デフォルト値 |
---|---|---|
runner | 使用するパイプラインランナー。このオプションを使用すると、実行時にパイプラインランナーを決定できます。 | Sparkを使用して実行するには、 `SparkRunner`に設定します。 |
sparkMaster | Spark MasterのURL。これは `SparkConf#setMaster(String)`の設定と同等であり、xコアでローカルに実行するには `local [x]`、Sparkスタンドアロンクラスタに接続するには `spark:// host:port`、Mesosクラスタに接続するには `mesos:// host:port`、yarnクラスタに接続するには `yarn`のいずれかになります。 | local[4] |
storageLevel | バッチパイプラインでRDDをキャッシュするときに使用する `StorageLevel`。 Spark Runnerは、繰り返し評価されるRDDを自動的にキャッシュします。 Beamのストリーミングパイプラインはステートフルであるため、これはバッチのみのプロパティです。これには、Spark DStreamの `StorageLevel`が `MEMORY_ONLY`である必要があります。 | MEMORY_ONLY |
batchIntervalMillis | `StreamingContext`の `batchDuration`-Sparkのバッチ間隔を設定します。 | 1000 |
enableSparkMetricSinks | Sparkのメトリックシンクへのメトリックのレポートを有効にします。 | true |
cacheDisabled | パイプライン全体で再利用されるPCollectionのキャッシュを無効にします。 RDDを保存するよりも再計算する方が速い場合に便利です。 | false |
Structured Streamingベースのランナーの場合
フィールド | 説明 | デフォルト値 |
---|---|---|
runner | 使用するパイプラインランナー。このオプションを使用すると、実行時にパイプラインランナーを決定できます。 | Spark Structured Streamingを使用して実行するには、 `SparkStructuredStreamingRunner`に設定します。 |
sparkMaster | Spark MasterのURL。これは `SparkConf#setMaster(String)`の設定と同等であり、xコアでローカルに実行するには `local [x]`、Sparkスタンドアロンクラスタに接続するには `spark:// host:port`、Mesosクラスタに接続するには `mesos:// host:port`、yarnクラスタに接続するには `yarn`のいずれかになります。 | local[4] |
testMode | 便利なデバッグ情報(触媒実行プランとBeam DAGの印刷)を提供するテストモードを有効にします | false |
enableSparkMetricSinks | Sparkのメトリックシンクへのメトリックのレポートを有効にします。 | true |
checkpointDir | ストリーミングの回復力のためのチェックポイントディレクトリ。バッチでは無視されます。耐久性のために、HDFS / S3 / GSなどの信頼性の高いファイルシステムが必要です。 | / tmpのローカルディレクトリ |
filesToStage | すべてのワーカーに送信してクラスパスに配置するJarファイル。 | クラスパスからのすべてのファイル |
EnableSparkMetricSinks | アグリゲーター値をSparkのメトリックシンクに送信するかどうかを有効/無効にします | true |
フィールド | 説明 | 値 |
---|---|---|
--runner | 使用するパイプラインランナー。このオプションを使用すると、実行時にパイプラインランナーを決定できます。 | Sparkを使用して実行するには、PortableRunner に設定します。 |
--job_endpoint | 使用するジョブサービスエンドポイント。ホスト名:ポートの形式(例:localhost:3000)である必要があります。 | ジョブサービスエンドポイント(デフォルトはlocalhost:8099)に合わせて設定します。 |
追加の注意事項
spark-submitの使用
Sparkアプリケーションをクラスタに送信する場合、Sparkインストールに付属のspark-submit
スクリプトを使用するのが一般的です(推奨されます)。上記のPipelineOptions
は、spark-submit
を置き換えるものではなく、補完するものです。上記のオプションは、application-arguments
の1つとして渡すことができ、–master
の設定が優先されます。 spark-submit
の一般的な使用方法の詳細については、Sparkのドキュメントをご覧ください。
ジョブの監視
実行中のSparkジョブは、SparkのWebインターフェースを使用して監視できます。デフォルトでは、ドライバノードのポート4040
で利用できます。ローカルマシンでSparkを実行する場合、これはhttp://localhost:4040
になります。Sparkには、事後確認するための履歴サーバーもあります。
メトリクスは、REST APIからも入手できます。 Sparkは、Sparkメトリクスをさまざまなシンクに報告できるメトリクスシステムを提供しています。 Sparkランナーは、この同じメトリクスシステムを使用してユーザー定義のBeamアグリゲーターを報告し、現在GraphiteSinkとCSVSinkをサポートしています。 Sparkでサポートされている追加のシンクのサポートを提供することは、簡単かつ直接的です。
Sparkメトリクスは、ポータブルランナーではまだサポートされていません。
ストリーミング実行
RDD / DStreamベースのランナーの場合
パイプラインでUnboundedSource
を使用する場合、Spark Runnerは自動的にストリーミングモードを設定します。ストリーミングモードの強制は、主にテストに使用され、推奨されません。
Structured Streamingベースのランナーの場合
ストリーミングモードは、Spark Structured Streamingランナーではまだ実装されていません。
ストリーミングは、Sparkポータブルランナーではまだサポートされていません。
提供されたSparkContextとStreamingListenersの使用
RDD / DStreamベースのランナーの場合
spark-jobserverを使用する場合など、提供されたSparkContext
を使用してSparkジョブを実行する場合、またはStreamingListeners
を使用する場合は、SparkPipelineOptions
を使用できません(コンテキストまたはリスナーはコマンドライン引数として渡すことができません)。代わりに、プログラムでのみ使用できるSparkContextOptions
を使用する必要があります。これは、一般的なPipelineOptions
実装ではありません。
Structured Streamingベースのランナーの場合
提供されたSparkSessionとStreamingListenersは、Spark Structured Streamingランナーではサポートされていません。
提供されたSparkContextとStreamingListenersは、Sparkポータブルランナーではサポートされていません。
Kubernetes
ジョブサーバーなしでBeamジョブを送信する
追加のジョブサーバーを起動せずに、Spark Kubernetesクラスターで直接ビームジョブを送信するには、次の手順を実行します。
spark-submit --master MASTER_URL \
--conf spark.kubernetes.driver.podTemplateFile=driver_pod_template.yaml \
--conf spark.kubernetes.executor.podTemplateFile=executor_pod_template.yaml \
--class org.apache.beam.runners.spark.SparkPipelineRunner \
--conf spark.kubernetes.container.image=apache/spark:v3.3.2 \
./wc_job.jar
Dataprocでビームジョブを実行する場合と同様に、以下のようにジョブjarをバンドルできます。この例では、SDK harnessのPROCESS
タイプを使用して、プロセスによってジョブを実行します。
python -m beam_example_wc \
--runner=SparkRunner \
--output_executable_path=./wc_job.jar \
--environment_type=PROCESS \
--environment_config='{\"command\": \"/opt/apache/beam/boot\"}' \
--spark_version=3
以下は、Kubernetesエグゼキューターポッドテンプレートの例です。initContainer
は、ビームパイプラインを実行するためにビームSDKハーネスをダウンロードするために必要です。
spec:
containers:
- name: spark-kubernetes-executor
volumeMounts:
- name: beam-data
mountPath: /opt/apache/beam/
initContainers:
- name: init-beam
image: apache/beam_python3.7_sdk
command:
- cp
- /opt/apache/beam/boot
- /init-container/data/boot
volumeMounts:
- name: beam-data
mountPath: /init-container/data
volumes:
- name: beam-data
emptyDir: {}
ジョブサーバーを使用してBeamジョブを送信する
ジョブサーバーを使用してApacheビームジョブを実行するようにSparkを構成する例。
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!