Apache BeamでのI/O変換のテスト

Apache Beam I/O変換をテストするための例とデザインパターン

注:このガイドはまだ作成中です。ガイドを完成させるためのオープンな課題があります:BEAM-1025

はじめに

このドキュメントでは、Beamコミュニティが過去のI/O変換の作成経験に基づいて推奨するテストのセットについて説明します。I/O変換をBeamコミュニティに貢献したい場合は、これらのテストを実装する必要があります。

ユニットテストと統合テストを作成するのは標準的ですが、多くの可能な定義があります。私たちの定義は次のとおりです

パフォーマンスベンチマークに関する注意

パフォーマンスベンチマーク専用の個別のテストを作成することはお勧めしません。代わりに、さまざまなテストシナリオに対応するために必要なパラメータを受け入れることができる統合テストを設定することをお勧めします。

たとえば、以下のガイドラインに従って統合テストを作成すると、統合テストは異なるランナー(ローカルまたはクラスタ構成)で実行でき、小さなデータセットを持つ小さなインスタンスであるデータストア、または大規模なデータセットを持つ大規模な本番対応クラスタに対して実行できます。これにより、さまざまなシナリオをカバーできます。その1つがパフォーマンスベンチマークです。

テストバランス - ユニット vs インテグレーション

統合テストで大量のコードをカバーするのは簡単ですが、テストの失敗の原因を見つけるのが難しくなり、テストが不安定になります。

ただし、複数のワーカーが複数のノードを持つデータストアインスタンスに対して読み取り/書き込みを行うことで見つかる貴重なバグのセットがあります(例:読み取りレプリカなど)。これらのシナリオはユニットテストでは見つけにくく、I/O変換でバグを引き起こすことがよくあります。

私たちのテスト戦略は、これら2つの矛盾するニーズのバランスです。ユニットテストで可能な限り多くのテストを行い、さまざまな構成で実行できる単一の小さな統合テストを作成することをお勧めします。

Java

Python

ユニットテスト

目標

非目標

ユニットテストの実装

すべての変換のユニットテストを作成するための一般的なガイドは、PTransformスタイルガイドにあります。以下に、いくつかの重要なポイントを詳しく説明しました。

Source APIを使用している場合は、コードを徹底的にユニットテストしてください。軽微な実装エラーにより、ユーザーが検出するのが難しいデータの破損やデータ損失(レコードのスキップや重複など)が発生する可能性があります。また、SourceTestUtilssource_test_utilsを使用することも検討してください。これは、Source実装をテストするための重要な要素です。

Source APIを使用していない場合は、PAssertassert_thatTestPipelineを使用してテストを支援できます。

書き込みを実装している場合は、TestPipelineを使用してテストデータを書き込み、Beamクライアントではないクライアントを使用して読み取りと検証を行うことができます。

フェイクの使用

ユニットテストでモックを使用する代わりに(各テストの各呼び出しに対して正確な応答を事前にプログラミングする)、フェイクを使用してください。I/O変換テストにフェイクを使用する推奨方法は、テスト対象のサービスの既存のインメモリ/埋め込み可能なバージョンを使用することですが、存在しない場合は、独自の実装を検討してください。フェイクは、「必要なテスト条件を取得できる」と「正確なモック関数呼び出しを何百万も記述する必要がない」という点で適切な組み合わせであることが証明されています。

ネットワーク障害

テストと関心の分離を容易にするために、ネットワークを介してやり取りするコードは、I/O変換とは別のクラスで処理する必要があります。推奨される設計パターンは、I/O変換が読み取りまたは書き込みが不可能になったと判断したら、例外をスローすることです。

これにより、I/O変換のユニットテストは、完璧なネットワーク接続を持っているかのように動作でき、ネットワーク接続の問題を再試行/その他の方法で処理する必要がなくなります。

バッチ処理

I/O変換が読み取り/書き込みのバッチ処理を許可する場合は、テストでバッチ処理を強制的に発生させる必要があります。I/O変換で構成可能なバッチサイズオプションを使用すると、これを簡単に実現できます。これらはテスト専用としてマークする必要があります。

I/O変換の統合テスト

現在、Python I/O統合テストや、境界のないデータストアまたは最終的に整合性のあるデータストアの統合テストの例はありません。これらの分野での貢献を歓迎します。詳細については、Beam dev@メーリングリストにお問い合わせください。

目標

統合テスト、データストア、およびKubernetes

現実世界の条件でI/O変換をテストするには、データストアインスタンスに接続する必要があります。

Beamコミュニティは、統合テストに使用されるデータストアをKubernetesでホストしています。統合テストをBeamの継続的インテグレーション環境で実行するには、データストアのインスタンスをセットアップするKubernetesスクリプトが必要です。

ただし、ローカルで作業する場合、Kubernetesを使用する必要はありません。すべてのテストインフラストラクチャで接続情報を渡すことができるため、開発者はローカル開発にお好みのホスティングインフラストラクチャを使用できます。

自分のマシンでの統合テストの実行

IO統合テストはいつでも自分のマシンで実行できます。統合テストを実行するための大まかな手順は次のとおりです。

  1. 実行されるテストに対応するデータストアをセットアップします。
  2. テストを実行し、作成したばかりのデータストアから接続情報を渡します。
  3. データストアをクリーンアップします。

データストアのセットアップ/クリーンアップ

Kubernetesスクリプトを使用してデータストアをホストする場合は、kubectlを使用してローカルでクラスターに接続できることを確認してください。独自のデータストアが既にセットアップされている場合は、以下のリストからステップ3を実行するだけで済みます。

  1. 実行するテストに対応するデータストアをセットアップします。現在サポートされているすべてのデータストアのKubernetesスクリプトは、.test-infra/kubernetesにあります。
    1. 場合によっては、専用のセットアップスクリプト(*.sh)があります。それ以外の場合は、kubectl create -f [scriptname]を実行してデータストアを作成できます。kubernetes.shスクリプトにいくつかの標準的な手順を実行させることもできます。
    2. 慣例により、次のものが存在します。
      1. データストア自体とNodePortサービスのymlスクリプト。NodePortサービスは、同じサブネットワーク内からKubernetesクラスターのマシンに接続するすべての人のために、データストアへのポートを開きます。このようなスクリプトは、Minikube Kubernetes Engineでスクリプトを実行する場合に特に役立ちます。
      2. LoadBalancerサービスを備えた別のスクリプト。このようなサービスは、データストアの外部IPを公開します。このようなスクリプトは、外部アクセスが必要な場合(例:Jenkins上)に必要です。
      1. JDBCの場合は、Postgresをセットアップできます。kubectl create -f .test-infra/kubernetes/postgres/postgres.yml
      2. Elasticsearchの場合は、セットアップスクリプトを実行できます。bash .test-infra/kubernetes/elasticsearch/setup.sh
  2. サービスIPアドレスを特定します
    1. NodePortサービス: kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}
    2. LoadBalancerサービス: kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
  3. integrationTest gradleタスクとテストクラスの指示(例:JdbcIOIT.javaの指示を参照)を使用してテストを実行します。
  4. Kubernetesスクリプトで指定されたリソースを削除するようにKubernetesに指示します。
    1. JDBC: kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml
    2. Elasticsearch: bash .test-infra/kubernetes/elasticsearch/teardown.sh

特定のテストの実行

integrationTestは、IO統合テストを実行するための専用のgradleタスクです。

Cloud Dataflowランナーでの使用例

./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT

HDFSファイルシステムおよびDirectランナーでの使用例

注:以下のセットアップは、/etc/hostsファイルにhadoopネームノードとhadoopデータノードの外部IPのエントリが含まれている場合にのみ機能します。詳細については、Small Cluster構成ファイルおよびLarge Cluster構成ファイルの説明を参照してください。

export HADOOP_USER_NAME=root

./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT

パラメータの説明

オプション機能
-p sdks/java/io/file-based-io-tests/テストするI/Oのプロジェクトサブモジュールを指定します。
-DintegrationTestPipelineOptions実行中のテストにパイプラインオプションを直接渡します。
-DintegrationTestRunnerテストの実行に使用するランナー。現在可能なオプションは、direct、dataflowです。
-Dfilesystem(該当する場合、オプション)テストを実行するために使用するファイルシステム。現在可能なオプションは、gcs、hdfs、s3です。指定しない場合は、ローカルファイルシステムが使用されます。
--tests実行するテスト(クラス/テストメソッドへの完全修飾参照)を指定します。

プルリクエストでの統合テストの実行

ほとんどのIO統合テストには、メトリックを収集し、回帰を回避するために定期的に実行される専用のJenkinsジョブがあります。ghprbプラグインのおかげで、特定のフレーズがGithubプルリクエストのコメントに入力されると、これらのジョブをオンデマンドでトリガーすることもできます。このようにして、特定のIOへの貢献が改善であるか、状況を悪化させているか(そうでないことを願っています!)を確認できます。

IO統合テストを実行するには、プルリクエストに次のコメントを入力します。

テストフレーズ
JdbcIOITJava JdbcIOパフォーマンステストを実行
MongoDBIOITJava MongoDBIOパフォーマンステストを実行
HadoopFormatIOITJava HadoopFormatIOパフォーマンステストを実行
TextIO - ローカルファイルシステムJava TextIOパフォーマンステストを実行
TextIO - HDFSJava TextIOパフォーマンステストHDFSを実行
圧縮されたTextIO - ローカルファイルシステムJava CompressedTextIOパフォーマンステストを実行
圧縮されたTextIO - HDFSJava CompressedTextIOパフォーマンステストHDFSを実行
AvroIO - ローカルファイルシステムJava AvroIOパフォーマンステストを実行
AvroIO - HDFSJava AvroIOパフォーマンステストHDFSを実行
TFRecordIO - ローカルファイルシステムJava TFRecordIOパフォーマンステストを実行
ParquetIO - ローカルファイルシステムJava ParquetIOパフォーマンステストを実行
XmlIO - ローカルファイルシステムJava XmlIOパフォーマンステストを実行
XmlIO - HDFSHDFSでJava XmlIOパフォーマンステストを実行

すべてのジョブ定義は、.test-infra/jenkinsにあります。プルリクエストで新しいJenkinsジョブ定義を変更/追加した場合は、統合テストを実行する前にシードジョブを実行してください(コメント:「シードジョブを実行」)。

パフォーマンステストダッシュボード

前述したように、定期的に実行されるJenkinsジョブからテスト実行時間を収集することにより、IOITのパフォーマンスを測定します。その結果、データベース(BigQuery)に保存されるため、プロットの形式で表示できます。

すべての結果を収集するダッシュボードは、こちらで入手できます。パフォーマンステストダッシュボード

統合テストの実装

統合テストを実装するために必要な3つのコンポーネントがあります

これら2つの要素については、以下で詳しく説明します。

テストコード

これらは、統合テストコードで使用される規則です

これらの原則のエンドツーエンドの例は、JdbcIOITにあります。

Kubernetesスクリプト

統合テスト、データストア、Kubernetesで説明したように、テストをBeamの継続的インテグレーションサーバーで実行するには、データストアのインスタンスを作成するKubernetesスクリプトを実装する必要があります。

これについてサポートが必要な場合や、その他の質問がある場合は、Beam dev@メーリングリストに連絡してください。コミュニティが支援できる場合があります。

BeamデータストアKubernetesスクリプトを作成するためのガイドライン

  1. 2つのKubernetesスクリプトを定義する必要があります。
    • これは、項目#1を実装する最良の方法です。
    • 最初のスクリプトには、メインのデータストアインスタンススクリプト(StatefulSet)と、データストアを公開するNodePortサービスが含まれます。これは、Beam Jenkins継続的インテグレーションサーバーによって実行されるスクリプトになります。
    • 2番目のスクリプトでは、Kubernetesクラスターが別のネットワーク上にある場合に、データストアに外部IPアドレスを公開するために使用される追加のLoadBalancerサービスを定義します。このファイルの名前には通常、'-for-local-dev'という接尾辞が付きます。
  2. ポッドがクラッシュ後に再作成されるようにする必要があります。
    • podを直接使用すると、ポッドがクラッシュした場合や、クラスターがポッドのコンテナを移動させるようなことがあった場合に、ポッドは再作成されません。
    • ほとんどの場合、StatefulSetを使用することをお勧めします。これは、再起動後も持続する永続ディスクをサポートしており、特定の永続ディスクを使用するポッドに関連付けられた安定したネットワーク識別子を持っているためです。DeploymentReplicaSetも有用な可能性がありますが、これらの機能がないため、シナリオは少なくなります。
  3. データストアの小規模インスタンスと大規模インスタンス用に個別のスクリプトを作成する必要があります。
    • これは、小規模および大規模統合テストで説明されているように、統合テストで利用できる小規模および大規模なデータストアの両方を持つための最良の方法と思われます。
  4. 信頼できるソースのDockerイメージを使用し、Dockerイメージのバージョンを固定する必要があります。
    • 次の順序でイメージを選択する必要があります。
      1. データソース/シンクの作成者によって提供されるイメージ(公式に保守している場合)。Apacheプロジェクトの場合、これは公式のApacheリポジトリになります。
      2. セキュリティ修正と保証されたメンテナンスがあるため、公式のDockerイメージ。
      3. 優れたメンテナーがいる非公式のDockerイメージ、または他のプロバイダーからのイメージ(例:quay.io)。

Jenkinsジョブ

.test-infra/jenkinsディレクトリに、既存のIOIT Jenkinsジョブ定義の例があります。job_PerformanceTest_*.groovyというファイルを探してください。最も顕著な例は次のとおりです。

重要なステップを忘れたり、コードを繰り返したりすることなく、ジョブを簡単に作成するのに役立つユーティリティクラスがあることに注意してください。詳細については、Kubernetes.groovyを参照してください。

小規模および大規模な統合テスト

Apache Beamは、複数の構成で統合テストを実行できることを期待しています。

これを行うには、次の手順に従います。

  1. 2つのKubernetesスクリプトを作成します。1つはデータストアの小規模インスタンス用、もう1つは大規模インスタンス用です。
  2. テストで、小規模または大規模のテストデータを生成するかどうかを決定するパイプラインオプションを使用します(小規模と大規模は、データストアに適したサイズです)。

この例として、HadoopFormatIOのテストがあります。