Nexmark ベンチマークスイート

概要

Nexmarkは、Nexmark研究論文の「連続データストリーム」クエリに触発されたパイプラインスイートです。

これらは、オンラインオークションシステムを表す3つのエンティティモデルに対する複数のクエリです。

クエリ

これらのクエリは、Beamモデルの多くの側面を実行します。

元のクエリに5つ追加しました。

ベンチマークワークロードの設定

ベンチマークワークロードの調整項目を以下に示します(NexmarkConfiguration.javaを参照)。

これらの設定項目は、起動コマンドラインに渡すことができます。

イベント生成 (デフォルト)

ウィンドウ (デフォルト)

イベント比率 (デフォルト)

技術情報

Nexmark出力

(ローカル) Direct RunnerでSMOKEスイートを使用してストリーミングモードでNexmarkベンチマークを実行した出力例を次に示します。

Performance:
  Conf       Runtime(sec)         Events(/sec)         Results
  0000                5,5              18138,9          100000
  0001                4,2              23657,4           92000
  0002                2,2              45683,0             351
  0003                3,9              25348,5             444
  0004                1,6               6207,3              40
  0005                5,0              20173,5              12
  0006                0,9              11376,6             401
  0007              121,4                823,5               1
  0008                2,5              40273,9            6000
  0009                0,9              10695,2             298
  0010                4,0              25025,0               1
  0011                4,4              22655,2            1919
  0012                3,5              28208,7            1919

ベンチマーク起動設定

Nexmarkランチャーは、コマンドライン引数の管理にBeam PipelineOptionsを使用するプログラムと同様に、--runner引数を受け入れます。これに加えて、必要な依存関係を設定する必要があります。

Gradle経由で実行する場合、次の2つのパラメータが実行を制御します。

-P nexmark.args
    The command line to pass to the Nexmark main program.

-P nexmark.runner
The Gradle project name of the runner, such as ":runners:direct-java" or
":runners:flink:1.13. The project names can be found in the root
    `settings.gradle.kts`.

テストデータは、オンデマンドで決定論的に合成されます。テストデータは、クエリ自体と同じパイプラインで合成されるか、Pub/SubまたはKafkaに公開される場合があります。

クエリ結果は次のようになります。

共通設定パラメータ

バッチまたはストリーミングを決定します。

--streaming=true

イベントジェネレータの数

--numEventGenerators=4

クエリは、名前または番号で実行できます(番号は下位互換性のためにまだ存在し、クエリ0〜12のみに番号が付いています)。

クエリNを実行します。

--query=N

PASSTHROUGHという名前のクエリを実行します。

--query=PASSTHROUGH

利用可能なスイート

実行するスイートは、この設定パラメータを使用して選択できます。

--suite=SUITE

利用可能なスイートは次のとおりです。

Google Cloud Dataflowランナー固有の設定

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=gs://<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=gs://<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.60.0.jar

Direct Runner固有の設定

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--manageResources=false --monitorJobs=true \
--flinkMaster=[local] --parallelism=#numcores

Spark Runner固有の設定

--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true

Kafkaソース/シンク設定パラメータ

Kafkaのホスト/ IPを設定します(例:「localhost:9092」)。

--bootstrapServers=<kafka host/ip>

結果をKafkaトピックに書き込みます。

--sinkType=KAFKA

ベンチマーク結果に使用されるトピック名を設定します。

--kafkaResultsTopic=<topic name>

イベントをKafkaトピックに書き込むか、Kafkaトピックから読み取るか、またはその両方を実行します。

--sourceType=KAFKA

ベンチマークイベントに使用されるトピック名を設定します。

--kafkaTopic=<topic name>

現在のステータス

これらの表には、さまざまなランナーでのクエリ実行のステータスが含まれています。Google Cloud Dataflowのステータスはまだありません。

バッチ / 合成 / ローカル

クエリDirectSparkFlink
0OKOKOK
1OKOKOK
2OKOKOK
3OKOKOK
4OKOKOK
5OKOKOK
6OKOKOK
7OKOKOK
8OKOKOK
9OKOKOK
10OKOKOK
11OKOKOK
12OKOKOK
BOUNDED_SIDE_INPUT_JOINOKOKOK

ストリーミング / 合成 / ローカル

クエリDirectSpark 課題18416Flink
0OKOKOK
1OKOKOK
2OKOKOK
3OK課題18074BEAM-3961OK
4OKOKOK
5OKOKOK
6OKOKOK
7OKBEAM-2112OK
8OKOKOK
9OKOKOK
10OKOKOK
11OKOKOK
12OKOKOK
BOUNDED_SIDE_INPUT_JOINOKBEAM-2112OK

バッチ / 合成 / クラスタ

今後予定

ストリーミング / 合成 / クラスタ

今後予定

Nexmarkの実行

DirectRunner (ローカル) でのSMOKEスイートの実行

DirectRunner はデフォルトなので、-Pnexmark.runner を渡す必要はありません。ここでは、分かりやすくするために渡しています。

Direct Runner にはバッチモードとストリーミングモードの区別はありませんが、Nexmark の起動にはあります。

これらのパラメータは、DirectRunner の追加の安全チェックの多くを有効にしたままにするため、SMOKE スイートで Nexmark スイートに問題がないことを確認できます。

バッチモード

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=false
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

ストリーミングモード

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=true
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

SparkRunner (ローカル) でのSMOKEスイートの実行

SparkRunner は Nexmark Gradle の起動で特別な扱いを受けます。タスクは、SparkRunner がビルドされた Spark のバージョンを提供し、ロギングを設定します。

バッチモード

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true"

ストリーミングモード

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true"

FlinkRunner (ローカル) でのSMOKEスイートの実行

バッチモード

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

ストリーミングモード

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

Google Cloud DataflowでのSMOKEスイートの実行

以下のコマンドが有効になるように、最初にこれらを設定してください

PROJECT=<your project>
ZONE=<your zone>
STAGING_LOCATION=gs://<a GCS path for staging>
PUBSUB_TOPCI=<existing pubsub topic>

起動

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:google-cloud-dataflow-java" \
    -Pnexmark.args="
        --runner=DataflowRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --project=${PROJECT}
        --zone=${ZONE}
        --workerMachineType=n1-highmem-8
        --stagingLocation=${STAGING_LOCATION}
        --sourceType=PUBSUB
        --pubSubMode=PUBLISH_ONLY
        --pubsubTopic=${PUBSUB_TOPIC}
        --resourceNameMode=VERBATIM
        --manageResources=false
        --numEventGenerators=64
        --numWorkers=16
        --maxNumWorkers=16
        --firstEventRate=100000
        --nextEventRate=100000
        --ratePeriodSec=3600
        --isRateLimited=true
        --avgPersonByteSize=500
        --avgAuctionByteSize=500
        --avgBidByteSize=500
        --probDelayedEvent=0.000001
        --occasionalDelaySec=3600
        --numEvents=0
        --useWallclockEventTime=true
        --usePubsubPublishTime=true
        --experiments=enable_custom_pubsub_sink"

Apache Hadoop YARNを使用したSparkクラスタでのクエリ0の実行

パッケージのビルド

./gradlew :sdks:java:testing:nexmark:assemble

クラスタへの送信

spark-submit \
    --class org.apache.beam.sdk.nexmark.Main \
    --master yarn-client \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-2.60.0-spark.jar \
        --runner=SparkRunner \
        --query=0 \
        --streamTimeout=60 \
        --streaming=false \
        --manageResources=false \
        --monitorJobs=true"

Nexmarkダッシュボード

以下のダッシュボードは、Beam コンポーネントの非回帰を検出するための CI メカニズムとして使用されます。ランナーやエンジンのベンチマーク比較を目的としたものではありません。特に以下の理由からです。

ダッシュボードの内容

master へのコミットごとに、Nexmark スイートが実行され、グラフにプロットが作成されます。すべてのメトリクスダッシュボードは metrics.beam.apache.org でホストされています。

ダッシュボードには 2 種類あります

これらのランナーのダッシュボードがあります(その他は今後追加されます)

各ダッシュボードには以下が含まれています