Pythonストリーミングパイプライン

Pythonストリーミングパイプラインの実行は、Beam SDKバージョン2.5.0以降で利用可能になりました(一部の制限付き)。

ストリーミング実行を使用する理由

パイプラインがストリーミングまたは継続的に更新されるデータソース(Cloud Pub/Subなど)から読み取る場合、Beamは非境界PCollectionを作成します。ランナーは、コレクション全体を一度に処理できないため、継続的に実行されるストリーミングジョブを使用して非境界PCollectionを処理する必要があります。サイズと境界には、境界付きコレクションと非境界コレクションに関する詳細情報があります。

ストリーム処理を使用するためのパイプラインの変更

バッチパイプラインをストリーミングをサポートするように変更するには、次のコード変更を行う必要があります

Python用Beam SDKには、非境界PCollectionをサポートする2つのI/Oコネクタが含まれています。Google Cloud Pub/Sub(読み取りと書き込み)とGoogle BigQuery(書き込み)です。

次のスニペットは、バッチWordCountの例をストリーミングをサポートするように変更するために必要なコード変更を示しています

これらのバッチWordCountスニペットは、wordcount.pyからのものです。このコードは、TextIO I/Oコネクタを使用して、境界付きコレクションの読み取りと書き込みを行います。

  lines = p | 'read' >> ReadFromText(known_args.input)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))
  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write the output using a "Write" transform that has side effects.
  output | 'write' >> WriteToText(known_args.output)

これらのストリーミングWordCountスニペットは、streaming_wordcount.pyからのものです。このコードは、非境界ソース(Cloud Pub/Sub)から読み取り、書き込みを行うI/Oコネクタを使用し、固定ウィンドウ処理戦略を指定します。

  lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | beam.WindowInto(window.FixedWindows(15, 0))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write to Pub/Sub
  output | beam.io.WriteStringsToPubSub(known_args.output_topic)

ストリーミングパイプラインの実行

ストリーミングWordCountパイプラインの例を実行するには、Cloud Pub/Sub入力トピックと出力トピックが必要です。テスト目的でトピックを作成、サブスクライブ、およびプルするには、Cloud Pub/Subクイックスタートのコマンドを使用できます。

次の簡単なbashスクリプトは、入力テキストファイルの行を入力トピックにフィードします

cat <YOUR_LOCAL_TEXT_FILE> | while read line; do gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"; done

または、projects/pubsub-public-data/topics/taxirides-realtimeなどの一般公開されているCloud Pub/Subストリームから読み取ることもできます。ただし、書き込みをテストするには、独自の出力トピックを作成する必要があります。

次のコマンドは、streaming_wordcount.pyのストリーミングパイプラインの例を実行します。Cloud Pub/Subプロジェクトと入力トピック(--input_topic)、出力Cloud Pub/Subプロジェクトとトピック(--output_topic)を指定します。

# DirectRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
See /documentation/runners/spark/ for more information.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]

# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming

ストリーミングパイプラインの実行に関するランナー固有の追加情報については、ランナーのドキュメントを確認してください

サポートされていない機能

Pythonストリーミング実行は現在、次の機能をサポートしていません