Apache Beam WordCount例
- Java SDK
- Python SDK
- Go SDK
WordCountの例は、テキストを読み取り、テキスト行を個々の単語にトークン化し、それらの単語ごとに頻度カウントを実行できる処理パイプラインを設定する方法を示しています。Beam SDKには、互いに基づいて構築されるこれらの4つの段階的に詳細なWordCount例が用意されています。すべての例の入力テキストは、シェイクスピアのテキストのセットです。
各WordCountの例では、Beamプログラミングモデルにおけるさまざまな概念が紹介されています。最も単純な例であるMinimalWordCountを理解することから始めましょう。パイプライン構築の基本原則に慣れてきたら、他の例でより多くの概念を学びましょう。
- **MinimalWordCount**は、パイプライン構築に含まれる基本原則を示しています。
- **WordCount**は、再利用可能で保守しやすいパイプラインを作成する際に一般的に推奨されるベストプラクティスの一部を紹介します。
- **DebuggingWordCount**は、ロギングとデバッグのプラクティスを紹介します。
- **WindowedWordCount**は、Beamのプログラミングモデルを使用して制限付きと無制限の両方のデータセットを処理する方法を示しています。
MinimalWordCount例
MinimalWordCountは、Direct Runnerを使用してテキストファイルから読み取り、単語のトークン化とカウントを行う変換を適用し、データを出力テキストファイルに書き込む単純なパイプラインを示しています。
この例では、入力ファイルと出力ファイルの場所をハードコーディングしており、エラーチェックを実行していません。これは、Beamパイプライン作成の「骨組み」だけを示すことを目的としています。このパラメーター化の欠如により、この特定のパイプラインは、標準的なBeamパイプラインよりもさまざまなランナー間での移植性が低くなります。後の例では、パイプラインの入力と出力ソースをパラメーター化し、他のベストプラクティスを示します。
Javaの完全なコードを表示するには、**MinimalWordCount**を参照してください。
Pythonの完全なコードを表示するには、**wordcount_minimal.py**を参照してください。
Goの完全なコードを表示するには、**minimal_wordcount.go**を参照してください。
主要な概念
- パイプラインの作成
- パイプラインへの変換の適用
- 入力の読み取り(この例:テキストファイルの読み取り)
- ParDo変換の適用
- SDK提供の変換の適用(この例:Count)
- 出力の書き込み(この例:テキストファイルへの書き込み)
- パイプラインの実行
次のセクションでは、MinimalWordCountパイプラインからの関連するコード抜粋を使用して、これらの概念を詳しく説明します。
パイプラインの作成
この例では、最初に`PipelineOptions`オブジェクトを作成します。このオブジェクトを使用すると、パイプラインを実行するパイプラインランナーや、選択したランナーに必要なランナースペシフィックな構成など、パイプラインのさまざまなオプションを設定できます。この例では、これらのオプションをプログラムで設定していますが、多くの場合、コマンドライン引数を使用して`PipelineOptions`を設定します。
`DataflowRunner`や`SparkRunner`など、パイプラインを実行するためのランナーを指定できます。この例のようにランナーの指定を省略すると、パイプラインは`DirectRunner`を使用してローカルで実行されます。次のセクションでは、パイプラインのランナーを指定します。
// Create a PipelineOptions object. This object lets us set various execution
// options for our pipeline, such as the runner you wish to use. This example
// will run with the DirectRunner by default, based on the class path configured
// in its dependencies.
PipelineOptions options = PipelineOptionsFactory.create();
from apache_beam.options.pipeline_options import PipelineOptions
input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://my-bucket/counts.txt'
beam_options = PipelineOptions(
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
)
次のステップは、作成したばかりのオプションを使用して`Pipeline`オブジェクトを作成することです。Pipelineオブジェクトは、実行される変換のグラフを構築し、その特定のパイプラインに関連付けます。
最初のステップは、`Pipeline`オブジェクトを作成することです。これは、実行される変換のグラフを構築し、その特定のパイプラインに関連付けます。スコープにより、複合変換へのグループ化が可能です。
パイプライン変換の適用
MinimalWordCountパイプラインには、データをパイプラインに読み込み、データを操作または変換し、結果を出力するいくつかの変換が含まれています。変換は個々の操作で構成される場合も、複数のネストされた変換(複合変換)を含む場合もあります。
各変換は、何らかの入力データを受け取り、何らかの出力データを作成します。入力データと出力データは、多くの場合、SDKクラス`PCollection`によって表されます。`PCollection`は、Beam SDKによって提供される特別なクラスで、無制限のデータセットを含む、事実上あらゆるサイズのデータセットを表すために使用できます。
図1:MinimalWordCountパイプラインのデータフロー。
MinimalWordCountパイプラインには5つの変換が含まれています。
- `Pipeline`オブジェクト自体にテキストファイル`Read`変換が適用され、`PCollection`が出力として生成されます。出力`PCollection`の各要素は、入力ファイルの1行を表します。この例では、公開アクセス可能なGoogle Cloud Storageバケット(「gs://」)に保存されている入力データを使用しています。
- この変換は、`PCollection
`内の行を分割します。各要素は、シェイクスピアの収集されたテキスト内の個々の単語です。代わりに、テキスト行を個々の単語にトークン化する各要素に対して`DoFn`(匿名クラスとしてインラインで定義)を呼び出すParDo変換を使用することもできました。この変換の入力は、前の`TextIO.Read`変換によって生成されたテキスト行の`PCollection`です。`ParDo`変換は、各要素がテキスト内の個々の単語を表す新しい`PCollection`を出力します。
SDK提供の`Count`変換は、任意のタイプの`PCollection`を受け取り、キー/値ペアの`PCollection`を返す汎用変換です。各キーは入力コレクションからの固有の要素を表し、各値はそのキーが入力コレクションに表示された回数を表します。
このパイプラインでは、`Count`の入力は前の`ParDo`によって生成された個々の単語の`PCollection`であり、出力は、各キーがテキスト内の固有の単語を表し、関連付けられた値が各単語の出現回数を表すキー/値ペアの`PCollection`です。
次の変換は、固有の単語と出現回数のキー/値ペアをそれぞれ、出力ファイルに書き込むのに適した印刷可能な文字列にフォーマットします。
マップ変換は、単純な`ParDo`をカプセル化するより高度な複合変換です。入力`PCollection`の各要素に対して、マップ変換は正確に1つの出力要素を作成する関数を適用します。
- テキストファイル書き込み変換。この変換は、フォーマットされた文字列の最終的な`PCollection`を入力として受け取り、各要素を出力テキストファイルに書き込みます。入力`PCollection`の各要素は、結果の出力ファイル内の1行を表します。
`Write`変換は、この場合無視される`PDone`型の取るに足りない結果値を生成することに注意してください。
`Write`変換は、PCollectionを返さないことに注意してください。
パイプラインの実行
`run`メソッドを呼び出すことでパイプラインを実行します。これにより、`PipelineOptions`で指定したパイプラインランナーによってパイプラインが実行されます。
ランナーにパイプラインを渡して実行します。
`run`メソッドは非同期であることに注意してください。ブロッキング実行の場合は、`run`への呼び出しによって返される結果オブジェクトで`waitUntilFinish` `wait_until_finish`メソッドを呼び出します。
Playgroundで完全な例を試す
WordCount例
このWordCountの例では、パイプラインの読み取り、書き込み、保守を容易にするために推奨されるいくつかのプログラミングプラクティスを紹介します。明示的に必要ではありませんが、これによりパイプラインの実行の柔軟性が高まり、パイプラインのテストが容易になり、パイプラインのコードの再利用性が向上します。
このセクションでは、パイプライン構築の基本概念をよく理解していることを前提としています。まだその段階に達していないと感じた場合は、上記のセクションMinimalWordCountをお読みください。
Javaでこの例を実行するには
Java WordCountクイックスタートで説明されているように、開発環境をセットアップし、Mavenアーキタイプを生成します。次に、ランナーのいずれかを使用してパイプラインを実行します。
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
-Pdataflow-runner
Javaの完全なコードを表示するには、**WordCount**を参照してください。
Pythonでこの例を実行するには
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://YOUR_GCS_BUCKET/counts \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
Pythonの完全なコードを表示するには、**wordcount.py**を参照してください。
Goでこの例を実行するには
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest
Goの完全なコードを表示するには、**wordcount.go**を参照してください。
新しい概念
- 明示的な`DoFn`を使用した`ParDo`の適用
- 複合変換の作成
- パラメーター化可能な`PipelineOptions`の使用
次のセクションでは、これらの重要な概念を詳しく説明し、パイプラインコードをより小さなセクションに分割します。
明示的なDoFnsの指定
`ParDo`変換を使用する場合、入力`PCollection`の各要素に適用される処理操作を指定する必要があります。この処理操作は、SDKクラス`DoFn`のサブクラスです。前の例(MinimalWordCount)で行われているように、各`ParDo`の`DoFn`サブクラスを匿名の内部クラスインスタンスとしてインラインで作成できます。ただし、`DoFn`をグローバルレベルで定義することをお勧めします。これにより、単体テストが容易になり、`ParDo`コードの可読性が向上します。
ParDo
トランスフォームを使用する際は、入力PCollection
内の各要素に適用される処理操作を指定する必要があります。この処理操作は、名前付き関数または特別に名前が付けられたメソッドを持つ構造体のいずれかです。無名関数(クロージャを除く)を使用できます。ただし、DoFn
をグローバルレベルで定義することをお勧めします。これにより、単体テストが容易になり、ParDo
コードの可読性も向上します。
複合変換の作成
複数のトランスフォームまたはParDo
ステップからなる処理操作がある場合は、PTransform
のサブクラスとして作成できます。PTransform
サブクラスを作成すると、複雑なトランスフォームをカプセル化し、パイプラインの構造をより明確でモジュール化し、単体テストを容易にすることができます。
複数のトランスフォームまたはParDo
ステップからなる処理操作がある場合は、通常のGo関数を使用してそれらをカプセル化できます。さらに、名前付きサブスコープを使用して、それらを監視可能な複合トランスフォームとしてグループ化できます。
この例では、2つのトランスフォームがPTransform
サブクラスCountWords
としてカプセル化されています。CountWords
には、ExtractWordsFn
を実行するParDo
と、SDK提供のCount
トランスフォームが含まれています。
この例では、2つのトランスフォームがCountWords
関数としてカプセル化されています。
CountWords
が定義されている場合、その最終的な入力と出力を指定します。入力は抽出操作のためのPCollection<String>
であり、出力はカウント操作によって生成されるPCollection<KV<String, Long>>
です。
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
public static void main(String[] args) throws IOException {
Pipeline p = ...
p.apply(...)
.apply(new CountWords())
...
}
パラメーター化可能なPipelineOptionsの使用
パイプラインを実行する際に、さまざまな実行オプションをハードコードできます。ただし、より一般的な方法は、コマンドライン引数の解析を介して独自の構成オプションを定義することです。コマンドラインを介して構成オプションを定義すると、コードをさまざまなランナー間でより簡単に移植できます。
コマンドラインパーサーによって処理される引数を追加し、それらのデフォルト値を指定します。その後、パイプラインコードでオプションの値にアクセスできます。
この目的には、標準のflag
パッケージを使用できます。
public static interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
...
}
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
...
}
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = pipeline | beam.io.ReadFromText(args.input_file)
Playgroundで完全な例を試す
DebuggingWordCount例
DebuggingWordCountの例は、パイプラインコードをインストルメントするためのベストプラクティスを示しています。
Javaでこの例を実行するには
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner
Javaの完全なコードを表示するには、DebuggingWordCountを参照してください。
Pythonでこの例を実行するには
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://YOUR_GCS_BUCKET/counts \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
Pythonの完全なコードを表示するには、wordcount_debugging.pyを参照してください。
Goでこの例を実行するには
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
Goの完全なコードを表示するには、debugging_wordcount.goを参照してください。
新しい概念
- ロギング
PAssert
によるパイプラインのテスト
次のセクションでは、これらの重要な概念を詳しく説明し、パイプラインコードをより小さなセクションに分割します。
ロギング
各ランナーは、ログを独自のやり方で処理することを選択できます。
// This example uses .trace and .debug:
public class DebuggingWordCount {
public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
...
@ProcessElement
public void processElement(ProcessContext c) {
if (...) {
...
LOG.debug("Matched: " + c.element().getKey());
} else {
...
LOG.trace("Did not match: " + c.element().getKey());
}
}
}
}
# [START example_wordcount_debugging_aggregators]
import logging
class FilterTextFn(beam.DoFn):
"""A DoFn that filters for a specific key based on a regular expression."""
def __init__(self, pattern):
self.pattern = pattern
# A custom metric can track values in your pipeline as it runs. Create
# custom metrics matched_word and unmatched_words.
self.matched_words = Metrics.counter(self.__class__, 'matched_words')
self.umatched_words = Metrics.counter(self.__class__, 'umatched_words')
def process(self, element):
word, _ = element
if re.match(self.pattern, word):
# Log at INFO level each element we match. When executing this pipeline
# using the Dataflow service, these log lines will appear in the Cloud
# Logging UI.
logging.info('Matched %s', word)
# Add 1 to the custom metric counter matched_words
self.matched_words.inc()
yield element
else:
# Log at the "DEBUG" level each element that is not matched. Different
# log levels can be used to control the verbosity of logging providing
# an effective mechanism to filter less important information. Note
# currently only "INFO" and higher level logs are emitted to the Cloud
# Logger. This log message will not be visible in the Cloud Logger.
logging.debug('Did not match %s', word)
# Add 1 to the custom metric counter umatched_words
self.umatched_words.inc()
type filterFn struct {
...
}
func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
if f.re.MatchString(word) {
// Log at the "INFO" level each element that we match.
log.Infof(ctx, "Matched: %v", word)
emit(word, count)
} else {
// Log at the "DEBUG" level each element that is not matched.
log.Debugf(ctx, "Did not match: %v", word)
}
}
Direct Runner
DirectRunner
でパイプラインを実行する場合は、ログメッセージをローカルコンソールに直接出力できます。Beam SDK for Javaを使用する場合は、クラスパスにSlf4j
を追加する必要があります。
Cloud Dataflow Runner
DataflowRunner
でパイプラインを実行する場合は、Stackdriver Loggingを使用できます。Stackdriver Loggingは、すべてのCloud Dataflowジョブのワーカーからのログを、Google Cloud Platformコンソールの単一の位置に集約します。Stackdriver Loggingを使用して、Cloud Dataflowがジョブの完了のために起動したすべてのワーカーからのログを検索およびアクセスできます。パイプラインのDoFn
インスタンスのログステートメントは、パイプラインの実行中にStackdriver Loggingに表示されます。
ワーカーのログレベルも制御できます。ユーザーコードを実行するCloud Dataflowワーカーは、デフォルトで「INFO」ログレベル以上のStackdriver Loggingへのログ記録が構成されています。次のように指定することで、特定のログ名前空間のログレベルをオーバーライドできます。--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
。たとえば、Cloud Dataflowサービスを使用してパイプラインを実行する際に--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
を指定すると、Stackdriver Loggingには、デフォルトの「INFO」以上のログに加えて、パッケージの「DEBUG」以上のログのみが含まれます。
デフォルトのCloud Dataflowワーカーのログ構成は、--defaultWorkerLogLevel=<TRACE、DEBUG、INFO、WARN、ERRORのいずれか>
を指定してオーバーライドできます。たとえば、Cloud Dataflowサービスでパイプラインを実行する際に--defaultWorkerLogLevel=DEBUG
を指定すると、Cloud Loggingにはすべての「DEBUG」以上のログが含まれます。デフォルトのワーカーログレベルをTRACEまたはDEBUGに変更すると、出力されるログの量が大幅に増加することに注意してください。
Apache Spark Runner
注記: このセクションはまだ追加されていません。Issue 18076を参照してください。
Apache Flink Runner
注記: このセクションはまだ追加されていません。Issue 18075を参照してください。
Apache Nemo Runner
NemoRunner
でパイプラインを実行する場合は、ほとんどのログメッセージがローカルコンソールに直接出力されます。ログを最大限に活用するには、クラスパスにSlf4j
を追加する必要があります。ドライバ側と実行側の両方でログを観察するには、Apache REEFによって作成されたフォルダを観察する必要があります。たとえば、ローカルランタイムでパイプラインを実行する場合、作業ディレクトリにREEF_LOCAL_RUNTIME
というフォルダが作成され、ログとメトリック情報はすべてそのディレクトリにあります。
アサーションによるパイプラインのテスト
PAssert
assert_that
は、Hamcrestのコレクションマッチャーのスタイルで、パイプラインレベルのテストを作成してPCollectionの内容を検証するために使用できる便利なPTransformのセットです。アサートは、小さなデータセットを使用した単体テストで最も効果的に使用されます。
passert
パッケージには、パイプラインレベルのテストを作成してPCollectionの内容を検証するために使用できる便利なPTransformが含まれています。アサートは、小さなデータセットを使用した単体テストで最も効果的に使用されます。
次の例では、フィルタリングされた単語のセットが予想されるカウントと一致することを確認します。アサートは出力せず、すべての期待値が満たされた場合にのみパイプラインは成功します。
次の例では、2つのコレクションに同じ値が含まれていることを確認します。アサートは出力せず、すべての期待値が満たされた場合にのみパイプラインは成功します。
単体テストの例については、DebuggingWordCountTestを参照してください。
Playgroundで完全な例を試す
WindowedWordCount例
WindowedWordCountの例は、前の例と同様にテキスト内の単語をカウントしますが、いくつかの高度な概念を紹介しています。
新しい概念
- 無制限と制限付きデータセット
- データへのタイムスタンプの追加
- ウィンドウ化
- ウィンドウ化されたPCollectionでのPTransformsの再利用
次のセクションでは、これらの重要な概念を詳しく説明し、パイプラインコードをより小さなセクションに分割します。
Javaでこの例を実行するには
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
-Pdataflow-runner
Javaの完全なコードを表示するには、WindowedWordCountを参照してください。
Pythonでこの例を実行するには
このパイプラインは、--output_table
パラメーターを使用して、PROJECT:DATASET.TABLE
またはDATASET.TABLE
形式でBigQueryテーブルに結果を出力します。
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
--output_table PROJECT:DATASET.TABLE \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
Pythonの完全なコードを表示するには、windowed_wordcount.pyを参照してください。
Goでこの例を実行するには
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
Goの完全なコードを表示するには、windowed_wordcount.goを参照してください。
無制限と制限付きデータセット
Beamを使用すると、バウンドされたデータセットとバウンドされていないデータセットの両方を処理できる単一のパイプラインを作成できます。データセットに固定数の要素がある場合、それはバウンドされたデータセットであり、すべてのデータはまとめて処理できます。バウンドされたデータセットの場合、考慮すべき質問は「すべてのデータを持っていますか?」です。データが継続的に到着する場合(モバイルゲームの例の無限のゲームスコアの流れなど)、それはバウンドされていないデータセットです。バウンドされていないデータセットは、一度に処理できる状態になることはありません。そのため、データは継続的に実行されるストリーミングパイプラインを使用して処理する必要があります。データセットは特定の時点までしか完了しないため、考慮すべき質問は「どの時点まですべてのデータを持っていますか?」です。Beamはウィンドウ処理を使用して、継続的に更新されるデータセットを有限サイズの論理ウィンドウに分割します。入力がバウンドされていない場合は、ストリーミングをサポートするランナーを使用する必要があります。
パイプラインの入力がバウンドされている場合、すべてのダウンストリームPCollectionもバウンドされます。同様に、入力がバウンドされていない場合、パイプラインのすべてのダウンストリームPCollectionはバウンドされませんが、個別のブランチは独立してバウンドされる可能性があります。
この例の入力はシェイクスピアのテキストのセットであり、これは有限のデータセットであることを思い出してください。したがって、この例では、テキストファイルからバウンドされたデータを読み取ります。
def main(arvg=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input-file',
dest='input_file',
default='/Users/home/words-example.txt')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
lines = p | 'read' >> ReadFromText(known_args.input_file)
データへのタイムスタンプの追加
PCollection
内の各要素には、関連付けられたタイムスタンプがあります。各要素のタイムスタンプは、最初にPCollection
を作成するソースによって割り当てられます。バウンドされていないPCollectionを作成するソースの中には、各新しい要素に、要素が読み取られたり追加されたりする時点に対応するタイムスタンプを割り当てるものがあります。DoFn
を使用してタイムスタンプを手動で割り当てたり調整したりできますが、タイムスタンプを前方に移動することしかできません。
この例では、入力はバウンドされています。例のために、AddTimestampsFn
という名前のDoFn
メソッド(ParDo
によって呼び出される)は、要素自体を指定して、各要素のタイムスタンプを設定します。たとえば、要素がログ行の場合、このParDo
はログ文字列から時間を解析して、要素のタイムスタンプとして設定できます。シェイクスピアの著作にはタイムスタンプが固有に存在しないため、ここでは概念を示すためにランダムなタイムスタンプを作成しました。入力テキストの各行には、2時間の間のランダムな関連付けられたタイムスタンプが設定されます。
以下は、ParDo
によって呼び出されるDoFn
であるAddTimestampFn
のコードです。これは、要素自体を指定してタイムスタンプのデータ要素を設定します。たとえば、要素がログ行の場合、このParDo
はログ文字列から時間を解析して、要素のタイムスタンプとして設定できます。シェイクスピアの著作にはタイムスタンプが固有に存在しないため、ここでは概念を示すためにランダムなタイムスタンプを作成しました。入力テキストの各行には、2時間の間のランダムな関連付けられたタイムスタンプが設定されます。
static class AddTimestampFn extends DoFn<String, String> {
private final Instant minTimestamp;
private final Instant maxTimestamp;
AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
}
@ProcessElement
public void processElement(ProcessContext c) {
Instant randomTimestamp =
new Instant(
ThreadLocalRandom.current()
.nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));
/**
* Concept #2: Set the data element with that timestamp.
*/
c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
}
}
beam.X
「型変数」の使用により、トランスフォームを任意の型で使用できます。
ウィンドウ化
Beamは、PCollection
をバウンドされた要素のセットに細分化するウィンドウ処理という概念を使用します。複数の要素を集約するPTransformは、コレクション全体が無限のサイズ(バウンドされていない)であっても、複数の有限ウィンドウの連続として各PCollection
を処理します。
WindowedWordCountの例では、固定時間ウィンドウ処理が適用されます。ここで、各ウィンドウは固定時間間隔を表します。この例の固定ウィンドウサイズはデフォルトで1分です(コマンドラインオプションで変更できます)。
ウィンドウ化されたPCollectionでのPTransformsの再利用
単純なPCollectionに対して作成された既存のPTransformを、ウィンドウ化されたPCollectionに対しても再利用できます。
Playgroundで完全な例を試す
StreamingWordCount例
StreamingWordCountの例は、Pub/SubサブスクリプションまたはトピックからPub/Subメッセージを読み取り、各メッセージ内の単語の頻度カウントを実行するストリーミングパイプラインです。WindowedWordCountと同様に、この例では固定時間ウィンドウ処理が適用されます。ここで、各ウィンドウは固定時間間隔を表します。この例の固定ウィンドウサイズは15秒です。パイプラインは、各15秒のウィンドウで検出された単語の頻度カウントを出力します。
新しい概念
- 無制限データセットの読み込み
- 無制限結果の書き込み
Javaでこの例を実行するには
注記: StreamingWordCountはまだJava SDKでは利用できません。
Pythonでこの例を実行するには
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.streaming_wordcount \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/ \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
Pythonの完全なコードを見るには、streaming_wordcount.pyを参照してください。
Goでこの例を実行するには
注記: StreamingWordCountはまだGo SDKでは利用できません。これに関する未解決の問題があります(Issue 18879)。
無制限データセットの読み込み
この例では、入力として無制限のデータセットを使用しています。beam.io.ReadFromPubSub
を使用して、Pub/SubサブスクリプションまたはトピックからPub/Subメッセージを読み取ります。
無制限結果の書き込み
入力が無制限の場合、出力PCollection
も同様に無制限になります。そのため、結果に対して適切なI/Oを選択する必要があります。一部のI/Oは制限付き出力のみをサポートしますが、他のI/Oは制限付きと無制限の両方の出力をサポートしています。
この例では、無制限のPCollection
を使用し、結果をGoogle Pub/Subにストリーミングします。beam.io.WriteToPubSub
を使用して、結果をフォーマットし、Pub/Subトピックに書き込みます。
次のステップ
- モバイルゲームの例の手順で、モバイルゲームの例を順を追って説明します。
- 自己学習ペースでラーニングリソースをご覧ください。
- お気に入りのビデオとポッドキャストをご覧ください。
- Beamのusers@メーリングリストに参加してください。
問題が発生した場合は、お気軽にお問い合わせください!
最終更新日:2024年10月31日
お探しのものが見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!