Apache BeamでのI/O変換のテスト
Apache Beam I/O変換をテストするための例とデザインパターン
- Java SDK
- Python SDK
注:このガイドはまだ作成中です。ガイドを完成させるためのオープンな課題があります:BEAM-1025。
はじめに
このドキュメントでは、Beamコミュニティが過去のI/O変換の作成経験に基づいて推奨するテストのセットについて説明します。I/O変換をBeamコミュニティに貢献したい場合は、これらのテストを実装する必要があります。
ユニットテストと統合テストを作成するのは標準的ですが、多くの可能な定義があります。私たちの定義は次のとおりです
- ユニットテスト
- 目標:変換の正確性のみを検証すること - コアな動作、コーナーケースなど。
- 使用するデータストア:データストアのインメモリバージョン(利用可能な場合)、それ以外の場合は、フェイクを作成する必要があります。
- データセットサイズ:非常に小さい(10行から100行程度)。
- 統合テスト
- 目標:ランナー/データストアの実際のバージョンと対話する際に発生する問題をキャッチすること
- 使用するデータストア:テスト前に事前構成された実際のインスタンス
- データセットサイズ:小規模から中規模(1000行から10GB程度)。
パフォーマンスベンチマークに関する注意
パフォーマンスベンチマーク専用の個別のテストを作成することはお勧めしません。代わりに、さまざまなテストシナリオに対応するために必要なパラメータを受け入れることができる統合テストを設定することをお勧めします。
たとえば、以下のガイドラインに従って統合テストを作成すると、統合テストは異なるランナー(ローカルまたはクラスタ構成)で実行でき、小さなデータセットを持つ小さなインスタンスであるデータストア、または大規模なデータセットを持つ大規模な本番対応クラスタに対して実行できます。これにより、さまざまなシナリオをカバーできます。その1つがパフォーマンスベンチマークです。
テストバランス - ユニット vs インテグレーション
統合テストで大量のコードをカバーするのは簡単ですが、テストの失敗の原因を見つけるのが難しくなり、テストが不安定になります。
ただし、複数のワーカーが複数のノードを持つデータストアインスタンスに対して読み取り/書き込みを行うことで見つかる貴重なバグのセットがあります(例:読み取りレプリカなど)。これらのシナリオはユニットテストでは見つけにくく、I/O変換でバグを引き起こすことがよくあります。
私たちのテスト戦略は、これら2つの矛盾するニーズのバランスです。ユニットテストで可能な限り多くのテストを行い、さまざまな構成で実行できる単一の小さな統合テストを作成することをお勧めします。
例
Java
- BigtableIOのテスト実装は、
Source
のユニットテストに関する現在のベストプラクティスの最良の例と見なされています - JdbcIOには、統合テストを作成するための現在のベストプラクティスの例があります。
- ElasticsearchIOは、バウンドされた読み取り/書き込みのテストを示しています
- MqttIOとAmpqpIOは、アンバウンドされた読み取り/書き込みを示しています
Python
- avroio_testは、液体シャーディング、
source_test_utils
、assert_that
、およびequal_to
のテストの例です
ユニットテスト
目標
- I/O変換のコードの正確性を検証します。
- I/O変換が、接続先のデータストアのリファレンス実装(「リファレンス実装」とは、フェイクまたはインメモリバージョンを意味します)と連携して正しく動作することを検証します。
- 高速に実行でき、1台のマシンのみが必要で、メモリ/ディスクフットプリントが比較的小さく、非ローカルネットワークアクセスがない(できればまったくない)必要があります。数秒以内に実行されるテストを目指してください。20秒を超える場合は、beam devメーリングリストで議論する必要があります。
- I/O変換がネットワーク障害を処理できることを検証します。
非目標
- 外部データストアの問題をテストします。これにより、非常に複雑なテストになる可能性があります。
ユニットテストの実装
すべての変換のユニットテストを作成するための一般的なガイドは、PTransformスタイルガイドにあります。以下に、いくつかの重要なポイントを詳しく説明しました。
Source
APIを使用している場合は、コードを徹底的にユニットテストしてください。軽微な実装エラーにより、ユーザーが検出するのが難しいデータの破損やデータ損失(レコードのスキップや重複など)が発生する可能性があります。また、SourceTestUtils
source_test_utils
を使用することも検討してください。これは、Source
実装をテストするための重要な要素です。
Source
APIを使用していない場合は、PAssert
assert_that
でTestPipeline
を使用してテストを支援できます。
書き込みを実装している場合は、TestPipeline
を使用してテストデータを書き込み、Beamクライアントではないクライアントを使用して読み取りと検証を行うことができます。
フェイクの使用
ユニットテストでモックを使用する代わりに(各テストの各呼び出しに対して正確な応答を事前にプログラミングする)、フェイクを使用してください。I/O変換テストにフェイクを使用する推奨方法は、テスト対象のサービスの既存のインメモリ/埋め込み可能なバージョンを使用することですが、存在しない場合は、独自の実装を検討してください。フェイクは、「必要なテスト条件を取得できる」と「正確なモック関数呼び出しを何百万も記述する必要がない」という点で適切な組み合わせであることが証明されています。
ネットワーク障害
テストと関心の分離を容易にするために、ネットワークを介してやり取りするコードは、I/O変換とは別のクラスで処理する必要があります。推奨される設計パターンは、I/O変換が読み取りまたは書き込みが不可能になったと判断したら、例外をスローすることです。
これにより、I/O変換のユニットテストは、完璧なネットワーク接続を持っているかのように動作でき、ネットワーク接続の問題を再試行/その他の方法で処理する必要がなくなります。
バッチ処理
I/O変換が読み取り/書き込みのバッチ処理を許可する場合は、テストでバッチ処理を強制的に発生させる必要があります。I/O変換で構成可能なバッチサイズオプションを使用すると、これを簡単に実現できます。これらはテスト専用としてマークする必要があります。
I/O変換の統合テスト
現在、Python I/O統合テストや、境界のないデータストアまたは最終的に整合性のあるデータストアの統合テストの例はありません。これらの分野での貢献を歓迎します。詳細については、Beam dev@メーリングリストにお問い合わせください。
目標
- データストア、I/O変換、ランナー間の相互作用のエンドツーエンドテストを可能にし、現実世界の条件をシミュレートします。
- 小規模および大規模の両方のテストを可能にします。
- 自己完結型:テストが変更できるデータストアの存在を除き、可能な限り最小限の初期セットアップまたは既存の外部状態を必要とします。
- 誰でも、Beamが継続的インテグレーションサーバーで実行するのと同じI/O変換統合テストを実行できます。
統合テスト、データストア、およびKubernetes
現実世界の条件でI/O変換をテストするには、データストアインスタンスに接続する必要があります。
Beamコミュニティは、統合テストに使用されるデータストアをKubernetesでホストしています。統合テストをBeamの継続的インテグレーション環境で実行するには、データストアのインスタンスをセットアップするKubernetesスクリプトが必要です。
ただし、ローカルで作業する場合、Kubernetesを使用する必要はありません。すべてのテストインフラストラクチャで接続情報を渡すことができるため、開発者はローカル開発にお好みのホスティングインフラストラクチャを使用できます。
自分のマシンでの統合テストの実行
IO統合テストはいつでも自分のマシンで実行できます。統合テストを実行するための大まかな手順は次のとおりです。
- 実行されるテストに対応するデータストアをセットアップします。
- テストを実行し、作成したばかりのデータストアから接続情報を渡します。
- データストアをクリーンアップします。
データストアのセットアップ/クリーンアップ
Kubernetesスクリプトを使用してデータストアをホストする場合は、kubectlを使用してローカルでクラスターに接続できることを確認してください。独自のデータストアが既にセットアップされている場合は、以下のリストからステップ3を実行するだけで済みます。
- 実行するテストに対応するデータストアをセットアップします。現在サポートされているすべてのデータストアのKubernetesスクリプトは、.test-infra/kubernetesにあります。
- 場合によっては、専用のセットアップスクリプト(*.sh)があります。それ以外の場合は、
kubectl create -f [scriptname]
を実行してデータストアを作成できます。kubernetes.shスクリプトにいくつかの標準的な手順を実行させることもできます。 - 慣例により、次のものが存在します。
- データストア自体と
NodePort
サービスのymlスクリプト。NodePort
サービスは、同じサブネットワーク内からKubernetesクラスターのマシンに接続するすべての人のために、データストアへのポートを開きます。このようなスクリプトは、Minikube Kubernetes Engineでスクリプトを実行する場合に特に役立ちます。 - LoadBalancerサービスを備えた別のスクリプト。このようなサービスは、データストアの外部IPを公開します。このようなスクリプトは、外部アクセスが必要な場合(例:Jenkins上)に必要です。
- データストア自体と
- 例
- JDBCの場合は、Postgresをセットアップできます。
kubectl create -f .test-infra/kubernetes/postgres/postgres.yml
- Elasticsearchの場合は、セットアップスクリプトを実行できます。
bash .test-infra/kubernetes/elasticsearch/setup.sh
- JDBCの場合は、Postgresをセットアップできます。
- 場合によっては、専用のセットアップスクリプト(*.sh)があります。それ以外の場合は、
- サービスIPアドレスを特定します
- NodePortサービス:
kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}
- LoadBalancerサービス:
kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
- NodePortサービス:
integrationTest
gradleタスクとテストクラスの指示(例:JdbcIOIT.javaの指示を参照)を使用してテストを実行します。- Kubernetesスクリプトで指定されたリソースを削除するようにKubernetesに指示します。
- JDBC:
kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml
- Elasticsearch:
bash .test-infra/kubernetes/elasticsearch/teardown.sh
- JDBC:
特定のテストの実行
integrationTest
は、IO統合テストを実行するための専用のgradleタスクです。
Cloud Dataflowランナーでの使用例
./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT
HDFSファイルシステムおよびDirectランナーでの使用例
注:以下のセットアップは、/etc/hostsファイルにhadoopネームノードとhadoopデータノードの外部IPのエントリが含まれている場合にのみ機能します。詳細については、Small Cluster構成ファイルおよびLarge Cluster構成ファイルの説明を参照してください。
export HADOOP_USER_NAME=root
./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT
パラメータの説明
オプション | 機能 |
-p sdks/java/io/file-based-io-tests/ | テストするI/Oのプロジェクトサブモジュールを指定します。 |
-DintegrationTestPipelineOptions | 実行中のテストにパイプラインオプションを直接渡します。 |
-DintegrationTestRunner | テストの実行に使用するランナー。現在可能なオプションは、direct、dataflowです。 |
-Dfilesystem | (該当する場合、オプション)テストを実行するために使用するファイルシステム。現在可能なオプションは、gcs、hdfs、s3です。指定しない場合は、ローカルファイルシステムが使用されます。 |
--tests | 実行するテスト(クラス/テストメソッドへの完全修飾参照)を指定します。 |
プルリクエストでの統合テストの実行
ほとんどのIO統合テストには、メトリックを収集し、回帰を回避するために定期的に実行される専用のJenkinsジョブがあります。ghprbプラグインのおかげで、特定のフレーズがGithubプルリクエストのコメントに入力されると、これらのジョブをオンデマンドでトリガーすることもできます。このようにして、特定のIOへの貢献が改善であるか、状況を悪化させているか(そうでないことを願っています!)を確認できます。
IO統合テストを実行するには、プルリクエストに次のコメントを入力します。
テスト | フレーズ |
JdbcIOIT | Java JdbcIOパフォーマンステストを実行 |
MongoDBIOIT | Java MongoDBIOパフォーマンステストを実行 |
HadoopFormatIOIT | Java HadoopFormatIOパフォーマンステストを実行 |
TextIO - ローカルファイルシステム | Java TextIOパフォーマンステストを実行 |
TextIO - HDFS | Java TextIOパフォーマンステストHDFSを実行 |
圧縮されたTextIO - ローカルファイルシステム | Java CompressedTextIOパフォーマンステストを実行 |
圧縮されたTextIO - HDFS | Java CompressedTextIOパフォーマンステストHDFSを実行 |
AvroIO - ローカルファイルシステム | Java AvroIOパフォーマンステストを実行 |
AvroIO - HDFS | Java AvroIOパフォーマンステストHDFSを実行 |
TFRecordIO - ローカルファイルシステム | Java TFRecordIOパフォーマンステストを実行 |
ParquetIO - ローカルファイルシステム | Java ParquetIOパフォーマンステストを実行 |
XmlIO - ローカルファイルシステム | Java XmlIOパフォーマンステストを実行 |
XmlIO - HDFS | HDFSでJava XmlIOパフォーマンステストを実行 |
すべてのジョブ定義は、.test-infra/jenkinsにあります。プルリクエストで新しいJenkinsジョブ定義を変更/追加した場合は、統合テストを実行する前にシードジョブを実行してください(コメント:「シードジョブを実行」)。
パフォーマンステストダッシュボード
前述したように、定期的に実行されるJenkinsジョブからテスト実行時間を収集することにより、IOITのパフォーマンスを測定します。その結果、データベース(BigQuery)に保存されるため、プロットの形式で表示できます。
すべての結果を収集するダッシュボードは、こちらで入手できます。パフォーマンステストダッシュボード
統合テストの実装
統合テストを実装するために必要な3つのコンポーネントがあります
- テストコード:実際のテストを行うコード。I/O変換とのやり取り、データの読み取りと書き込み、データの検証を行います。
- Kubernetesスクリプト:テストコードで使用されるデータストアをセットアップするKubernetesスクリプト。
- Jenkinsジョブ:データソースのセットアップ、テストの実行、テスト後のクリーンアップに必要なすべての手順を実行するJenkins Job DSLスクリプト。
これら2つの要素については、以下で詳しく説明します。
テストコード
これらは、統合テストコードで使用される規則です
- テストでは、パイプラインオプションを使用して接続情報を受け取る必要があります。
- Javaの場合、io/commonディレクトリに共有パイプラインオプションオブジェクトがあります。これは、同じデータストア(例:
Elasticsearch
およびHadoopFormatIO
テスト)に2つのテストがある場合、それらのテストは同じパイプラインオプションを共有することを意味します。
- Javaの場合、io/commonディレクトリに共有パイプラインオプションオブジェクトがあります。これは、同じデータストア(例:
- テストデータをプログラムで生成し、テストに使用するデータ量をパラメータ化します。
- Javaの場合、
CountingInput
+TestRow
を組み合わせて、任意のスケーリングで決定論的なテストデータを生成できます。
- Javaの場合、
- テストには、書き込み、読み取りのスタイルを使用します。
- 1つの
Test
で、パイプラインを実行してI/O変換を使用して書き込みを行い、別のパイプラインを実行してI/O変換を使用して読み取りを行います。 - データの検証は、読み取りの結果のみである必要があります。データベースに書き込まれたデータを他の方法で検証しないでください。
- 効率的な方法ですべての行の実際のコンテンツを検証します。これを行う簡単な方法は、行のハッシュを取得して結合することです。
HashingFn
はこれを簡単にするのに役立ち、TestRow
には事前に計算されたハッシュがあります。 - デバッグを容易にするために、
PAssert
のcontainsInAnyOrder
を使用して、すべての行のサブセットの内容を検証します。
- 1つの
- テストは、同じデータベースインスタンスで複数回および/または同時に実行される可能性があると想定する必要があります。
- テストデータをクリーンアップします。これを
@AfterClass
で実行して、確実に実行されるようにします。 - 実行ごとに(タイムスタンプはこれを行う簡単な方法です)、および必要に応じてメソッドごとに一意のテーブル名を使用します。
- テストデータをクリーンアップします。これを
これらの原則のエンドツーエンドの例は、JdbcIOITにあります。
Kubernetesスクリプト
統合テスト、データストア、Kubernetesで説明したように、テストをBeamの継続的インテグレーションサーバーで実行するには、データストアのインスタンスを作成するKubernetesスクリプトを実装する必要があります。
これについてサポートが必要な場合や、その他の質問がある場合は、Beam dev@メーリングリストに連絡してください。コミュニティが支援できる場合があります。
BeamデータストアKubernetesスクリプトを作成するためのガイドライン
- 2つのKubernetesスクリプトを定義する必要があります。
- これは、項目#1を実装する最良の方法です。
- 最初のスクリプトには、メインのデータストアインスタンススクリプト(
StatefulSet
)と、データストアを公開するNodePort
サービスが含まれます。これは、Beam Jenkins継続的インテグレーションサーバーによって実行されるスクリプトになります。 - 2番目のスクリプトでは、Kubernetesクラスターが別のネットワーク上にある場合に、データストアに外部IPアドレスを公開するために使用される追加の
LoadBalancer
サービスを定義します。このファイルの名前には通常、'-for-local-dev'という接尾辞が付きます。
- ポッドがクラッシュ後に再作成されるようにする必要があります。
pod
を直接使用すると、ポッドがクラッシュした場合や、クラスターがポッドのコンテナを移動させるようなことがあった場合に、ポッドは再作成されません。- ほとんどの場合、
StatefulSet
を使用することをお勧めします。これは、再起動後も持続する永続ディスクをサポートしており、特定の永続ディスクを使用するポッドに関連付けられた安定したネットワーク識別子を持っているためです。Deployment
とReplicaSet
も有用な可能性がありますが、これらの機能がないため、シナリオは少なくなります。
- データストアの小規模インスタンスと大規模インスタンス用に個別のスクリプトを作成する必要があります。
- これは、小規模および大規模統合テストで説明されているように、統合テストで利用できる小規模および大規模なデータストアの両方を持つための最良の方法と思われます。
- 信頼できるソースのDockerイメージを使用し、Dockerイメージのバージョンを固定する必要があります。
- 次の順序でイメージを選択する必要があります。
- データソース/シンクの作成者によって提供されるイメージ(公式に保守している場合)。Apacheプロジェクトの場合、これは公式のApacheリポジトリになります。
- セキュリティ修正と保証されたメンテナンスがあるため、公式のDockerイメージ。
- 優れたメンテナーがいる非公式のDockerイメージ、または他のプロバイダーからのイメージ(例:quay.io)。
- 次の順序でイメージを選択する必要があります。
Jenkinsジョブ
.test-infra/jenkinsディレクトリに、既存のIOIT Jenkinsジョブ定義の例があります。job_PerformanceTest_*.groovyというファイルを探してください。最も顕著な例は次のとおりです。
重要なステップを忘れたり、コードを繰り返したりすることなく、ジョブを簡単に作成するのに役立つユーティリティクラスがあることに注意してください。詳細については、Kubernetes.groovyを参照してください。
小規模および大規模な統合テスト
Apache Beamは、複数の構成で統合テストを実行できることを期待しています。
- 小規模
- ランナーの単一のワーカーで実行します(可能である必要がありますが、必須ではありません)。
- データストアは、単一ノードを使用するように構成する必要があります。
- データセットは非常に小さくてもかまいません(1000行)。
- 大規模
- ランナーの複数のワーカーで実行します。
- データストアは、複数のノードを使用するように構成する必要があります。
- この場合に使用されるデータセットはより大きくなります(数十GB)。
これを行うには、次の手順に従います。
- 2つのKubernetesスクリプトを作成します。1つはデータストアの小規模インスタンス用、もう1つは大規模インスタンス用です。
- テストで、小規模または大規模のテストデータを生成するかどうかを決定するパイプラインオプションを使用します(小規模と大規模は、データストアに適したサイズです)。
この例として、HadoopFormatIOのテストがあります。
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて有用で明確でしたか?何か変更したいことはありますか?ぜひお知らせください!