Apache Samza Runnerの使用
Apache Samza Runnerは、Apache Samzaを使用してBeamパイプラインを実行するために使用できます。 Samza Runnerは、SamzaアプリケーションでBeamパイプラインを実行し、ローカルで実行できます。 アプリケーションはさらに.tgzファイルにビルドされ、YARNクラスターまたはZookeeperを使用したSamzaスタンドアロンクラスターにデプロイできます。
Samza RunnerとSamzaは大規模なステートフルストリーミングジョブに適しており、以下を提供します。
- ローカル状態のファーストクラスサポート(RocksDBストアを使用)。 これにより、高頻度のストリーミングジョブで高速な状態アクセスが可能になります。
- フルスナップショットではなく、状態の増分チェックポイントをサポートするフォールトトレランス。 これにより、Samzaは非常に大きな状態のアプリケーションにスケールできます。
- リモートコールを効率化する完全に非同期の処理エンジン。
- Zookeeperを使用して任意のホスティング環境でアプリケーションを実行するための柔軟なデプロイメントモデル。
- ダウンタイムを最小限に抑えて非常に大規模なデプロイメントをサポートするカナリア、アップグレード、ロールバックなどの機能。
Beam機能マトリクスには、Samza Runnerの現在サポートされている機能が記載されています。
Samza Runnerの前提条件とセットアップ
Samza Runnerは、1.0より大きいバージョンのSamza上に構築されています。
依存関係の指定
pom.xml
に以下を追加することで、Samza Runnerへの依存関係を指定できます。
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza</artifactId>
<version>2.60.0</version>
<scope>runtime</scope>
</dependency>
<!-- Samza dependencies -->
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
Samza Runnerを使用したパイプラインの実行
パイプラインをローカルで実行するか、すべてのjarファイルとリソースファイルを使用してスタンドアロンクラスターにデプロイする場合、パッケージ化は必要ありません。 たとえば、次のコマンドはWordCountの例を実行します。
$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Psamza-runner \
-Dexec.args="--runner=SamzaRunner \
--inputFile=/path/to/input \
--output=/path/to/counts"
パイプラインをYARNクラスターにデプロイするには、サンプルのSamzaジョブをデプロイするための手順があります。 まず、アプリケーションjarとリソースファイルを.tgz
アーカイブファイルにパッケージ化し、Yarnコンテナがダウンロードできるようにする必要があります。 設定では、このTGZファイルの場所のURIを指定する必要があります。
yarn.package.path=${your_job_tgz_URI}
job.name=${your_job_name}
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.coordinator.system=${job_coordinator_system}
job.default.system=${job_default_system}
設定の詳細については、Samza設定リファレンスを参照してください。
設定ファイルは、コマンドライン引数--configFilePath=/path/to/config.properties
を設定することで渡されます。 これにより、YarnリソースマネージャーでBeamパイプラインのメインクラスを実行でき、Samza Runnerは内部でYarnジョブを送信します。
GithubのSamza Beamの例をご覧ください。
Samza Runnerのパイプラインオプション
Samza Runnerでパイプラインを実行する場合、次のパイプラインオプションを使用できます。
フィールド | 説明 | デフォルト値 |
---|---|---|
runner | 使用するパイプラインランナー。 このオプションを使用すると、実行時にパイプラインランナーを決定できます。 | Samzaを使用して実行するには、SamzaRunner に設定します。 |
configFilePath | プロパティファイルを使用したSamzaの設定。 | empty 、つまりローカル実行を使用します。 |
configFactory | 設定ファイルパスから設定ファイルを読み取るためのファクトリ。 | PropertiesConfigFactory 、設定をプロパティファイルとして読み取ります。 |
configOverride | プログラムで設定する設定のオーバーライド。 | empty 、つまり設定ファイルまたはローカル実行を使用します。 |
jobInstance | ジョブのインスタンス名。 | 1 |
samzaExecutionEnvironment | Samzaアプリケーションの実行環境。 詳細については、SamzaExecutionEnvironment を参照してください。 | LOCAL |
watermarkInterval | ウォーターマークをチェックする間隔(ミリ秒)。 | 1000 |
systemBufferSize | 特定のシステムのバッファリングするメッセージの最大数。 | 5000 |
eventTimerBufferSize | PTransformのメモリにバッファリングするイベント時間タイマーの最大数。 | 5000 |
maxSourceParallelism | データソースに許可される最大並列度。 | 1 |
storeBatchGetSize | 状態ストアのバッチ取得サイズ制限。 | 10000 |
enableMetrics | Samza RunnerでBeamメトリックを有効/無効にします。 | true |
stateDurable | 状態が永続的であるための設定。 | false |
maxBundleSize | バンドル内の要素の最大数。 | 1 (デフォルトでは自動バンドルは無効になっています) |
maxBundleTimeMs | バンドルをファイナライズする前に待機する最大時間(ミリ秒)。 | 1000 |
ジョブの監視
BeamとSamzaの両方から出力されるメトリックを使用して、パイプラインジョブを監視できます。たとえば、elements_read
やbacklog_elements
などのBeamソースメトリック、job-healthy
やprocess-envelopes
などのSamzaジョブメトリックなどです。 Samzaメトリックの完全なリストは、Samzaメトリックスリファレンスにあります。 開発中はJMXを介してジョブのメトリックを表示し、Graphiteなどのグラフシステムにメトリックを送信できます。 詳細については、Samzaメトリクスを参照してください。
実行中のSamza YARNジョブの場合、YARN Web UIを使用してジョブの状態を監視し、ログを確認できます。
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか? 変更したいことはありますか? 教えてください!