Beam YAMLとProtobufを用いた効率的なストリーミングデータ処理

Beam YAMLとProtobufを用いた効率的なストリーミングデータ処理

ストリーミングデータ処理の規模が拡大するにつれて、その保守、複雑性、コストも増加します。この記事では、パイプラインの再利用性と迅速な展開を保証するProtobufを使用することで、パイプラインを効率的にスケールする方法について説明します。Beam YAMLを使用することで、エンジニアが簡単に実装できるようにすることを目指しています。

Beam YAMLによるパイプラインの簡素化

Apache Beamの初心者にとって、Beamでのパイプラインの作成はやや難しい場合があります。プロジェクトの設定、依存関係の管理など、困難な場合があります。Beam YAMLは、ほとんどの定型コードを排除するため、作業の最も重要な部分であるデータ変換に集中できます。

Beam YAMLの主な利点の一部を以下に示します。

  • 可読性:宣言型言語(YAML)を使用することで、パイプラインの設定はより人間が読みやすくなります。
  • 再利用性:異なるパイプライン間で同じコンポーネントを再利用することが簡素化されます。
  • 保守性:パイプラインの保守と更新が容易になります。

Kafkaトピックからイベントを読み取り、BigQueryに書き込む例を示すテンプレートを以下に示します。

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'TOPIC_NAME'
        format: RAW/AVRO/JSON/PROTO
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'SCHEMA'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
        useAtLeastOnceSemantics: true

options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

完全なワークフロー

このセクションでは、このパイプラインの完全なワークフローを示します。

シンプルなプロトイベントの作成

次のコードは、シンプルなムービーイベントを作成します。

// events/v1/movie_event.proto

syntax = "proto3";

package event.v1;

import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";

message MovieEvent {
  option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
  google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
  google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
  google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
  google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
    // validates the average rating is at least 0
    gte: 0,
    // validates the average rating is at most 100
    lte: 100
  }, (gen_bq_schema.bigquery).description = "Movie rating"];
  string event_dt = 5 [
    (gen_bq_schema.bigquery).type_override = "DATETIME",
    (gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
    (buf.validate.field) = {
      string: {
        pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
      },
      ignore_empty: false,
    }
  ];
}

これらのイベントはBigQueryに書き込まれるため、bq_fieldプロトとbq_tableプロトがインポートされます。これらのプロトファイルは、BigQuery JSONスキーマの生成に役立ちます。この例では、テスト、品質、パフォーマンスを開発プロセスのできるだけ早い段階で実施するシフトレフトアプローチも示しています。たとえば、ソースから有効なイベントのみが生成されるようにするために、buf.validate要素が含まれています。

events/v1フォルダにmovie_event.protoプロトを作成したら、必要なファイル記述子を生成できます。ファイル記述子は、スキーマのコンパイル済み表現であり、さまざまなツールやシステムがProtobufデータを動的に理解して処理することを可能にします。このプロセスの簡素化のために、この例ではBufを使用しており、次の構成ファイルが必要です。

Buf構成

# buf.yaml

version: v2
deps:
  - buf.build/googlecloudplatform/bq-schema-api
  - buf.build/bufbuild/protovalidate
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
# buf.gen.yaml

version: v2
managed:
  enabled: true
plugins:
  # Python Plugins
  - remote: buf.build/protocolbuffers/python
    out: gen/python
  - remote: buf.build/grpc/python
    out: gen/python

  # Java Plugins
  - remote: buf.build/protocolbuffers/java:v25.2
    out: gen/maven/src/main/java
  - remote: buf.build/grpc/java
    out: gen/maven/src/main/java

  # BQ Schemas
  - remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
    out: protoc-gen/bq_schema

必要なJava、Python、BigQueryスキーマ、および記述子ファイルを生成するには、次の2つのコマンドを実行します。

// Generate the buf.lock file
buf deps update

// It generates the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports

// It generates the Java, Python and BigQuery schema as described in buf.gen.yaml
buf generate --include-imports

Beam YAMLによるプロトの読み取り

YAMLファイルに次の変更を加えます。

# movie_events_pipeline.yml

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'movie_proto'
        format: PROTO
        bootstrap_servers: '<BOOTSTRAP_SERVERS>'
        file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
        message_name: 'event.v1.MovieEvent'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: '<PROJECT_ID>.raw.movie_table'
        useAtLeastOnceSemantics: true
options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

この手順では、形式をPROTOに変更し、file_descriptor_pathmessage_nameを追加します。

Terraformを使用したパイプラインのデプロイ

Terraformを使用して、Dataflowをランナーとして、Beam YAMLパイプラインをデプロイできます。次のTerraformコード例は、これを実現する方法を示しています。

// Enable Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
  project = var.gcp_project_id
  service = "dataflow.googleapis.com"
}

// DF Beam YAML
resource "google_dataflow_flex_template_job" "data_movie_job" {
  provider                     = google-beta
  project                      = var.gcp_project_id
  name                         = "movie-proto-events"
  container_spec_gcs_path      = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
  region                       = var.gcp_region
  on_delete                    = "drain"
  machine_type                 = "n2d-standard-4"
  enable_streaming_engine      = true
  subnetwork                   = var.subnetwork
  skip_wait_on_job_termination = true
  parameters = {
    yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
    max_num_workers    = 40
    worker_zone        = var.gcp_zone
  }
  depends_on = [google_project_service.enable_dataflow_api]
}

TerraformとProtoを使用して作成できるBigQueryテーブルが存在すると仮定すると、このコードは、KafkaからProtoイベントを読み取り、BigQueryに書き込むBeam YAMLコードを使用してDataflowジョブを作成します。

改善点と結論

コミュニティからの次の貢献により、この例でのBeam YAMLコードを改善できます。

  • スキーマレジストリのサポート:Buf RegistryまたはApicurioなどのスキーマレジストリと統合して、スキーマ管理を改善します。現在のワークフローでは、Bufを使用して記述子を生成し、Google Cloud Storageに格納しています。代わりに、記述子をスキーマレジストリに格納できます。

  • 監視の強化:高度な監視とアラートメカニズムを実装して、データパイプラインの問題を迅速に特定し、対処します。

Beam YAMLとProtobufを活用することで、データ処理パイプラインの作成と保守を合理化し、複雑さを大幅に軽減できます。このアプローチにより、エンジニアは、Beamコードを手動で記述する必要なく、堅牢で再利用可能なパイプラインを効率的に実装およびスケーリングできます。

貢献

機能の構築と追加に協力したい開発者は、Beam YAMLモジュールへの貢献を開始できます。

GitHubリポジトリには、yamlタグでマークされた、公開されているバグの一覧もあります。

Beam YAMLはBeam 2.52から安定版としてマークされていますが、まだ開発中であり、新しい機能が各リリースで追加されています。フレームワークの設計決定に参加し、フレームワークの使用方法に関する洞察を提供したい方は、これらの議論が行われている開発メーリングリストへの参加を強くお勧めします。