Apache Samza Runnerの使用

Apache Samza Runnerは、Apache Samzaを使用してBeamパイプラインを実行するために使用できます。 Samza Runnerは、SamzaアプリケーションでBeamパイプラインを実行し、ローカルで実行できます。 アプリケーションはさらに.tgzファイルにビルドされ、YARNクラスターまたはZookeeperを使用したSamzaスタンドアロンクラスターにデプロイできます。

Samza RunnerとSamzaは大規模なステートフルストリーミングジョブに適しており、以下を提供します。

Beam機能マトリクスには、Samza Runnerの現在サポートされている機能が記載されています。

Samza Runnerの前提条件とセットアップ

Samza Runnerは、1.0より大きいバージョンのSamza上に構築されています。

依存関係の指定

pom.xmlに以下を追加することで、Samza Runnerへの依存関係を指定できます。

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-samza</artifactId>
  <version>2.60.0</version>
  <scope>runtime</scope>
</dependency>

<!-- Samza dependencies -->
<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-api</artifactId>
  <version>${samza.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-core_2.11</artifactId>
  <version>${samza.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kafka_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kv_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kv-rocksdb_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

Samza Runnerを使用したパイプラインの実行

パイプラインをローカルで実行するか、すべてのjarファイルとリソースファイルを使用してスタンドアロンクラスターにデプロイする場合、パッケージ化は必要ありません。 たとえば、次のコマンドはWordCountの例を実行します。

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Psamza-runner \
    -Dexec.args="--runner=SamzaRunner \
      --inputFile=/path/to/input \
      --output=/path/to/counts"

パイプラインをYARNクラスターにデプロイするには、サンプルのSamzaジョブをデプロイするための手順があります。 まず、アプリケーションjarとリソースファイルを.tgzアーカイブファイルにパッケージ化し、Yarnコンテナがダウンロードできるようにする必要があります。 設定では、このTGZファイルの場所のURIを指定する必要があります。

yarn.package.path=${your_job_tgz_URI}

job.name=${your_job_name}
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.coordinator.system=${job_coordinator_system}
job.default.system=${job_default_system}

設定の詳細については、Samza設定リファレンスを参照してください。

設定ファイルは、コマンドライン引数--configFilePath=/path/to/config.propertiesを設定することで渡されます。 これにより、YarnリソースマネージャーでBeamパイプラインのメインクラスを実行でき、Samza Runnerは内部でYarnジョブを送信します。

GithubのSamza Beamの例をご覧ください。

Samza Runnerのパイプラインオプション

Samza Runnerでパイプラインを実行する場合、次のパイプラインオプションを使用できます。

フィールド説明デフォルト値
runner使用するパイプラインランナー。 このオプションを使用すると、実行時にパイプラインランナーを決定できます。Samzaを使用して実行するには、SamzaRunnerに設定します。
configFilePathプロパティファイルを使用したSamzaの設定。empty、つまりローカル実行を使用します。
configFactory設定ファイルパスから設定ファイルを読み取るためのファクトリ。PropertiesConfigFactory、設定をプロパティファイルとして読み取ります。
configOverrideプログラムで設定する設定のオーバーライド。empty、つまり設定ファイルまたはローカル実行を使用します。
jobInstanceジョブのインスタンス名。1
samzaExecutionEnvironmentSamzaアプリケーションの実行環境。 詳細については、SamzaExecutionEnvironmentを参照してください。LOCAL
watermarkIntervalウォーターマークをチェックする間隔(ミリ秒)。1000
systemBufferSize特定のシステムのバッファリングするメッセージの最大数。5000
eventTimerBufferSizePTransformのメモリにバッファリングするイベント時間タイマーの最大数。5000
maxSourceParallelismデータソースに許可される最大並列度。1
storeBatchGetSize状態ストアのバッチ取得サイズ制限。10000
enableMetricsSamza RunnerでBeamメトリックを有効/無効にします。true
stateDurable状態が永続的であるための設定。false
maxBundleSizeバンドル内の要素の最大数。1(デフォルトでは自動バンドルは無効になっています)
maxBundleTimeMsバンドルをファイナライズする前に待機する最大時間(ミリ秒)。1000

ジョブの監視

BeamとSamzaの両方から出力されるメトリックを使用して、パイプラインジョブを監視できます。たとえば、elements_readbacklog_elementsなどのBeamソースメトリック、job-healthyprocess-envelopesなどのSamzaジョブメトリックなどです。 Samzaメトリックの完全なリストは、Samzaメトリックスリファレンスにあります。 開発中はJMXを介してジョブのメトリックを表示し、Graphiteなどのグラフシステムにメトリックを送信できます。 詳細については、Samzaメトリクスを参照してください。

実行中のSamza YARNジョブの場合、YARN Web UIを使用してジョブの状態を監視し、ログを確認できます。