Nexmark ベンチマークスイート
概要
Nexmarkは、Nexmark研究論文の「連続データストリーム」クエリに触発されたパイプラインスイートです。
これらは、オンラインオークションシステムを表す3つのエンティティモデルに対する複数のクエリです。
- Personは、オークションに出品する人、オークションに入札する人、またはその両方を表します。
- Auctionは、オークション中のアイテムを表します。
- Bidは、オークション中のアイテムへの入札を表します。
クエリ
これらのクエリは、Beamモデルの多くの側面を実行します。
- Query1またはCURRENCY_CONVERSION: 入札額をユーロで表示します。単純なマップを示しています。
- Query2またはSELECTION: 特定のオークション番号のオークションは何ですか?単純なフィルタを示しています。
- Query3またはLOCAL_ITEM_SUGGESTION: 特定の米国の州で誰が販売していますか?増分結合 (キーごとの状態とタイマーを使用) とフィルタを示しています。
- Query4またはAVERAGE_PRICE_FOR_CATEGORY: 各オークションカテゴリの平均販売価格は何ですか?複雑な結合 (カスタムウィンドウ関数を使用) と集計を示しています。
- Query5またはHOT_ITEMS: 最後の期間に最も入札されたオークションはどれですか?スライドウィンドウとコンバイナを示しています。
- Query6またはAVERAGE_SELLING_PRICE_BY_SELLER: 最後の10件の終了したオークションの出品者ごとの平均販売価格は何ですか?Query4と同じ「落札入札」コアを共有し、特殊なコンバイナを示しています。
- Query7またはHIGHEST_BID: 期間ごとの最高入札額は何ですか?ファンアウトを説明するために、意図的にサイド入力を使用して実装されています。
- Query8またはMONITOR_NEW_USERS: 最後の期間にシステムに参入してオークションを作成したのは誰ですか?単純な結合を示しています。
元のクエリに5つ追加しました。
- Query0またはPASSTHROUGH: パススルー。監視のオーバーヘッドを測定できます。
- Query9またはWINNING_BIDS: 落札入札。Query4とQuery6で共有される共通のサブクエリ。
- Query10またはLOG_TO_SHARDED_FILES: すべてのイベントをGCSファイルに記録します。起動時に大きな副作用のあるウィンドウを示しています。
- Query11またはUSER_SESSIONS: ユーザーはアクティブだった各セッションでいくつの入札を行いましたか?セッションウィンドウを示しています。
- Query12またはPROCESSING_TIME_WINDOWS: ユーザーは一定の処理時間制限内でいくつの入札を行いますか?他のすべてのクエリの非グローバルウィンドウでのイベント時間と比較して、グローバルウィンドウでの処理時間での作業を示しています。
- **BOUNDED_SIDE_INPUT_JOIN**: ストリームを境界のあるサイド入力に結合し、基本的なストリームエンリッチメントをモデル化します。
ベンチマークワークロードの設定
ベンチマークワークロードの調整項目を以下に示します(NexmarkConfiguration.javaを参照)。
これらの設定項目は、起動コマンドラインに渡すことができます。
イベント生成 (デフォルト)
- 生成されるイベント数:100,000
- ジェネレータスレッド数:100
- イベントレート:SIN曲線
- 初期イベントレート:10,000
- イベントレートステップ:10,000
- 同時オークション数:100
- 同時入札/オークション作成者数:1,000
ウィンドウ (デフォルト)
- サイズ:10秒
- スライド期間:5秒
- ウォーターマーク保持:0秒
イベント比率 (デフォルト)
- 人気オークション:½
- 人気入札者:¼
- 人気出品者:¼
技術情報
- 人工CPU負荷
- 人工IO負荷
Nexmark出力
(ローカル) Direct RunnerでSMOKEスイートを使用してストリーミングモードでNexmarkベンチマークを実行した出力例を次に示します。
Performance: Conf Runtime(sec) Events(/sec) Results 0000 5,5 18138,9 100000 0001 4,2 23657,4 92000 0002 2,2 45683,0 351 0003 3,9 25348,5 444 0004 1,6 6207,3 40 0005 5,0 20173,5 12 0006 0,9 11376,6 401 0007 121,4 823,5 1 0008 2,5 40273,9 6000 0009 0,9 10695,2 298 0010 4,0 25025,0 1 0011 4,4 22655,2 1919 0012 3,5 28208,7 1919
ベンチマーク起動設定
Nexmarkランチャーは、コマンドライン引数の管理にBeam PipelineOptionsを使用するプログラムと同様に、--runner
引数を受け入れます。これに加えて、必要な依存関係を設定する必要があります。
Gradle経由で実行する場合、次の2つのパラメータが実行を制御します。
-P nexmark.args
The command line to pass to the Nexmark main program.
-P nexmark.runner
The Gradle project name of the runner, such as ":runners:direct-java" or
":runners:flink:1.13. The project names can be found in the root
`settings.gradle.kts`.
テストデータは、オンデマンドで決定論的に合成されます。テストデータは、クエリ自体と同じパイプラインで合成されるか、Pub/SubまたはKafkaに公開される場合があります。
クエリ結果は次のようになります。
- Pub/SubまたはKafkaに公開されます。
- プレーンテキストとしてテキストファイルに書き込まれます。
- Avroエンコーディングを使用してテキストファイルに書き込まれます。
- BigQueryに送信されます。
- 破棄されます。
共通設定パラメータ
バッチまたはストリーミングを決定します。
--streaming=true
イベントジェネレータの数
--numEventGenerators=4
クエリは、名前または番号で実行できます(番号は下位互換性のためにまだ存在し、クエリ0〜12のみに番号が付いています)。
クエリNを実行します。
--query=N
PASSTHROUGHという名前のクエリを実行します。
--query=PASSTHROUGH
利用可能なスイート
実行するスイートは、この設定パラメータを使用して選択できます。
--suite=SUITE
利用可能なスイートは次のとおりです。
- DEFAULT: クエリ0でデフォルト設定をテストします。
- SMOKE: デフォルト設定ですべてのクエリを実行します。
- STRESS: SMOKEと同様ですが、100万イベントです。
- FULL_THROTTLE: SMOKEと同様ですが、1億イベントです。
Google Cloud Dataflowランナー固有の設定
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=gs://<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=gs://<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.60.0.jar
Direct Runner固有の設定
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
Flink Runner固有の設定
--manageResources=false --monitorJobs=true \
--flinkMaster=[local] --parallelism=#numcores
Spark Runner固有の設定
--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
Kafkaソース/シンク設定パラメータ
Kafkaのホスト/ IPを設定します(例:「localhost:9092」)。
--bootstrapServers=<kafka host/ip>
結果をKafkaトピックに書き込みます。
--sinkType=KAFKA
ベンチマーク結果に使用されるトピック名を設定します。
--kafkaResultsTopic=<topic name>
イベントをKafkaトピックに書き込むか、Kafkaトピックから読み取るか、またはその両方を実行します。
--sourceType=KAFKA
ベンチマークイベントに使用されるトピック名を設定します。
--kafkaTopic=<topic name>
現在のステータス
これらの表には、さまざまなランナーでのクエリ実行のステータスが含まれています。Google Cloud Dataflowのステータスはまだありません。
バッチ / 合成 / ローカル
クエリ | Direct | Spark | Flink |
---|---|---|---|
0 | OK | OK | OK |
1 | OK | OK | OK |
2 | OK | OK | OK |
3 | OK | OK | OK |
4 | OK | OK | OK |
5 | OK | OK | OK |
6 | OK | OK | OK |
7 | OK | OK | OK |
8 | OK | OK | OK |
9 | OK | OK | OK |
10 | OK | OK | OK |
11 | OK | OK | OK |
12 | OK | OK | OK |
BOUNDED_SIDE_INPUT_JOIN | OK | OK | OK |
ストリーミング / 合成 / ローカル
クエリ | Direct | Spark 課題18416 | Flink |
---|---|---|---|
0 | OK | OK | OK |
1 | OK | OK | OK |
2 | OK | OK | OK |
3 | OK | 課題18074、BEAM-3961 | OK |
4 | OK | OK | OK |
5 | OK | OK | OK |
6 | OK | OK | OK |
7 | OK | BEAM-2112 | OK |
8 | OK | OK | OK |
9 | OK | OK | OK |
10 | OK | OK | OK |
11 | OK | OK | OK |
12 | OK | OK | OK |
BOUNDED_SIDE_INPUT_JOIN | OK | BEAM-2112 | OK |
バッチ / 合成 / クラスタ
今後予定
ストリーミング / 合成 / クラスタ
今後予定
Nexmarkの実行
DirectRunner (ローカル) でのSMOKEスイートの実行
DirectRunner はデフォルトなので、-Pnexmark.runner
を渡す必要はありません。ここでは、分かりやすくするために渡しています。
Direct Runner にはバッチモードとストリーミングモードの区別はありませんが、Nexmark の起動にはあります。
これらのパラメータは、DirectRunner の追加の安全チェックの多くを有効にしたままにするため、SMOKE スイートで Nexmark スイートに問題がないことを確認できます。
バッチモード
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:direct-java" \
-Pnexmark.args="
--runner=DirectRunner
--streaming=false
--suite=SMOKE
--manageResources=false
--monitorJobs=true
--enforceEncodability=true
--enforceImmutability=true"
ストリーミングモード
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:direct-java" \
-Pnexmark.args="
--runner=DirectRunner
--streaming=true
--suite=SMOKE
--manageResources=false
--monitorJobs=true
--enforceEncodability=true
--enforceImmutability=true"
SparkRunner (ローカル) でのSMOKEスイートの実行
SparkRunner は Nexmark Gradle の起動で特別な扱いを受けます。タスクは、SparkRunner がビルドされた Spark のバージョンを提供し、ロギングを設定します。
バッチモード
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:spark:3" \
-Pnexmark.args="
--runner=SparkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=false
--manageResources=false
--monitorJobs=true"
ストリーミングモード
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:spark:3" \
-Pnexmark.args="
--runner=SparkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true"
FlinkRunner (ローカル) でのSMOKEスイートの実行
バッチモード
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:flink:1.13" \
-Pnexmark.args="
--runner=FlinkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=false
--manageResources=false
--monitorJobs=true
--flinkMaster=[local]"
ストリーミングモード
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:flink:1.13" \
-Pnexmark.args="
--runner=FlinkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true
--flinkMaster=[local]"
Google Cloud DataflowでのSMOKEスイートの実行
以下のコマンドが有効になるように、最初にこれらを設定してください
PROJECT=<your project>
ZONE=<your zone>
STAGING_LOCATION=gs://<a GCS path for staging>
PUBSUB_TOPCI=<existing pubsub topic>
起動
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:google-cloud-dataflow-java" \
-Pnexmark.args="
--runner=DataflowRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true
--project=${PROJECT}
--zone=${ZONE}
--workerMachineType=n1-highmem-8
--stagingLocation=${STAGING_LOCATION}
--sourceType=PUBSUB
--pubSubMode=PUBLISH_ONLY
--pubsubTopic=${PUBSUB_TOPIC}
--resourceNameMode=VERBATIM
--manageResources=false
--numEventGenerators=64
--numWorkers=16
--maxNumWorkers=16
--firstEventRate=100000
--nextEventRate=100000
--ratePeriodSec=3600
--isRateLimited=true
--avgPersonByteSize=500
--avgAuctionByteSize=500
--avgBidByteSize=500
--probDelayedEvent=0.000001
--occasionalDelaySec=3600
--numEvents=0
--useWallclockEventTime=true
--usePubsubPublishTime=true
--experiments=enable_custom_pubsub_sink"
Apache Hadoop YARNを使用したSparkクラスタでのクエリ0の実行
パッケージのビルド
./gradlew :sdks:java:testing:nexmark:assemble
クラスタへの送信
spark-submit \
--class org.apache.beam.sdk.nexmark.Main \
--master yarn-client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-2.60.0-spark.jar \
--runner=SparkRunner \
--query=0 \
--streamTimeout=60 \
--streaming=false \
--manageResources=false \
--monitorJobs=true"
Nexmarkダッシュボード
以下のダッシュボードは、Beam コンポーネントの非回帰を検出するための CI メカニズムとして使用されます。ランナーやエンジンのベンチマーク比較を目的としたものではありません。特に以下の理由からです。
- ランナーのパラメータが同じではない
- Nexmark は、ランナーをローカル(ほとんどの場合埋め込み)モードで実行します
- Nexmark は、すべての CI とビルドも実行する共有マシン上で実行されます。
- ランナーは Beam モデルのサポートが異なります
- ランナーにはそれぞれ異なる強みがあり、比較が困難です
- バッチ指向に設計されたランナーもあれば、ストリーミング指向に設計されたランナーもあります
- 1 秒未満のレイテンシを目標に設計されたものもあれば、自動スケーリングをサポートするものもあります
ダッシュボードの内容
master へのコミットごとに、Nexmark スイートが実行され、グラフにプロットが作成されます。すべてのメトリクスダッシュボードは metrics.beam.apache.org でホストされています。
ダッシュボードには 2 種類あります
- パフォーマンス用(クエリのランタイム)
- 出力 PCollection のサイズ用(一定である必要があります)
これらのランナーのダッシュボードがあります(その他は今後追加されます)
- Spark
- Flink
- Direct Runner
- Dataflow
各ダッシュボードには以下が含まれています
- バッチモードのグラフ
- ストリーミングモードのグラフ
- すべてのクエリのグラフ