ブログ
2024/09/20
Beam YAMLとProtobufを用いた効率的なストリーミングデータ処理
Beam YAMLとProtobufを用いた効率的なストリーミングデータ処理
ストリーミングデータ処理の規模が拡大するにつれて、その保守、複雑性、コストも増加します。この記事では、パイプラインの再利用性と迅速な展開を保証するProtobufを使用することで、パイプラインを効率的にスケールする方法について説明します。Beam YAMLを使用することで、エンジニアが簡単に実装できるようにすることを目指しています。
Beam YAMLによるパイプラインの簡素化
Apache Beamの初心者にとって、Beamでのパイプラインの作成はやや難しい場合があります。プロジェクトの設定、依存関係の管理など、困難な場合があります。Beam YAMLは、ほとんどの定型コードを排除するため、作業の最も重要な部分であるデータ変換に集中できます。
Beam YAMLの主な利点の一部を以下に示します。
- 可読性:宣言型言語(YAML)を使用することで、パイプラインの設定はより人間が読みやすくなります。
- 再利用性:異なるパイプライン間で同じコンポーネントを再利用することが簡素化されます。
- 保守性:パイプラインの保守と更新が容易になります。
Kafkaトピックからイベントを読み取り、BigQueryに書き込む例を示すテンプレートを以下に示します。
pipeline:
transforms:
- type: ReadFromKafka
name: ReadProtoMovieEvents
config:
topic: 'TOPIC_NAME'
format: RAW/AVRO/JSON/PROTO
bootstrap_servers: 'BOOTSTRAP_SERVERS'
schema: 'SCHEMA'
- type: WriteToBigQuery
name: WriteMovieEvents
input: ReadProtoMovieEvents
config:
table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
useAtLeastOnceSemantics: true
options:
streaming: true
dataflow_service_options: [streaming_mode_at_least_once]
完全なワークフロー
このセクションでは、このパイプラインの完全なワークフローを示します。
シンプルなプロトイベントの作成
次のコードは、シンプルなムービーイベントを作成します。
// events/v1/movie_event.proto
syntax = "proto3";
package event.v1;
import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";
message MovieEvent {
option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
// validates the average rating is at least 0
gte: 0,
// validates the average rating is at most 100
lte: 100
}, (gen_bq_schema.bigquery).description = "Movie rating"];
string event_dt = 5 [
(gen_bq_schema.bigquery).type_override = "DATETIME",
(gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
(buf.validate.field) = {
string: {
pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
},
ignore_empty: false,
}
];
}
これらのイベントはBigQueryに書き込まれるため、bq_field
プロトとbq_table
プロトがインポートされます。これらのプロトファイルは、BigQuery JSONスキーマの生成に役立ちます。この例では、テスト、品質、パフォーマンスを開発プロセスのできるだけ早い段階で実施するシフトレフトアプローチも示しています。たとえば、ソースから有効なイベントのみが生成されるようにするために、buf.validate
要素が含まれています。
events/v1
フォルダにmovie_event.proto
プロトを作成したら、必要なファイル記述子を生成できます。ファイル記述子は、スキーマのコンパイル済み表現であり、さまざまなツールやシステムがProtobufデータを動的に理解して処理することを可能にします。このプロセスの簡素化のために、この例ではBufを使用しており、次の構成ファイルが必要です。
Buf構成
# buf.yaml
version: v2
deps:
- buf.build/googlecloudplatform/bq-schema-api
- buf.build/bufbuild/protovalidate
breaking:
use:
- FILE
lint:
use:
- DEFAULT
# buf.gen.yaml
version: v2
managed:
enabled: true
plugins:
# Python Plugins
- remote: buf.build/protocolbuffers/python
out: gen/python
- remote: buf.build/grpc/python
out: gen/python
# Java Plugins
- remote: buf.build/protocolbuffers/java:v25.2
out: gen/maven/src/main/java
- remote: buf.build/grpc/java
out: gen/maven/src/main/java
# BQ Schemas
- remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
out: protoc-gen/bq_schema
必要なJava、Python、BigQueryスキーマ、および記述子ファイルを生成するには、次の2つのコマンドを実行します。
// Generate the buf.lock file
buf deps update
// It generates the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports
// It generates the Java, Python and BigQuery schema as described in buf.gen.yaml
buf generate --include-imports
Beam YAMLによるプロトの読み取り
YAMLファイルに次の変更を加えます。
# movie_events_pipeline.yml
pipeline:
transforms:
- type: ReadFromKafka
name: ReadProtoMovieEvents
config:
topic: 'movie_proto'
format: PROTO
bootstrap_servers: '<BOOTSTRAP_SERVERS>'
file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
message_name: 'event.v1.MovieEvent'
- type: WriteToBigQuery
name: WriteMovieEvents
input: ReadProtoMovieEvents
config:
table: '<PROJECT_ID>.raw.movie_table'
useAtLeastOnceSemantics: true
options:
streaming: true
dataflow_service_options: [streaming_mode_at_least_once]
この手順では、形式をPROTO
に変更し、file_descriptor_path
とmessage_name
を追加します。
Terraformを使用したパイプラインのデプロイ
Terraformを使用して、Dataflowをランナーとして、Beam YAMLパイプラインをデプロイできます。次のTerraformコード例は、これを実現する方法を示しています。
// Enable Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
project = var.gcp_project_id
service = "dataflow.googleapis.com"
}
// DF Beam YAML
resource "google_dataflow_flex_template_job" "data_movie_job" {
provider = google-beta
project = var.gcp_project_id
name = "movie-proto-events"
container_spec_gcs_path = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
region = var.gcp_region
on_delete = "drain"
machine_type = "n2d-standard-4"
enable_streaming_engine = true
subnetwork = var.subnetwork
skip_wait_on_job_termination = true
parameters = {
yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
max_num_workers = 40
worker_zone = var.gcp_zone
}
depends_on = [google_project_service.enable_dataflow_api]
}
TerraformとProtoを使用して作成できるBigQueryテーブルが存在すると仮定すると、このコードは、KafkaからProtoイベントを読み取り、BigQueryに書き込むBeam YAMLコードを使用してDataflowジョブを作成します。
改善点と結論
コミュニティからの次の貢献により、この例でのBeam YAMLコードを改善できます。
スキーマレジストリのサポート:Buf RegistryまたはApicurioなどのスキーマレジストリと統合して、スキーマ管理を改善します。現在のワークフローでは、Bufを使用して記述子を生成し、Google Cloud Storageに格納しています。代わりに、記述子をスキーマレジストリに格納できます。
監視の強化:高度な監視とアラートメカニズムを実装して、データパイプラインの問題を迅速に特定し、対処します。
Beam YAMLとProtobufを活用することで、データ処理パイプラインの作成と保守を合理化し、複雑さを大幅に軽減できます。このアプローチにより、エンジニアは、Beamコードを手動で記述する必要なく、堅牢で再利用可能なパイプラインを効率的に実装およびスケーリングできます。
貢献
機能の構築と追加に協力したい開発者は、Beam YAMLモジュールへの貢献を開始できます。
GitHubリポジトリには、yaml
タグでマークされた、公開されているバグの一覧もあります。
Beam YAMLはBeam 2.52から安定版としてマークされていますが、まだ開発中であり、新しい機能が各リリースで追加されています。フレームワークの設計決定に参加し、フレームワークの使用方法に関する洞察を提供したい方は、これらの議論が行われている開発メーリングリストへの参加を強くお勧めします。