Apache Beam WordCount例

WordCountの例は、テキストを読み取り、テキスト行を個々の単語にトークン化し、それらの単語ごとに頻度カウントを実行できる処理パイプラインを設定する方法を示しています。Beam SDKには、互いに基づいて構築されるこれらの4つの段階的に詳細なWordCount例が用意されています。すべての例の入力テキストは、シェイクスピアのテキストのセットです。

各WordCountの例では、Beamプログラミングモデルにおけるさまざまな概念が紹介されています。最も単純な例であるMinimalWordCountを理解することから始めましょう。パイプライン構築の基本原則に慣れてきたら、他の例でより多くの概念を学びましょう。

MinimalWordCount例

MinimalWordCountは、Direct Runnerを使用してテキストファイルから読み取り、単語のトークン化とカウントを行う変換を適用し、データを出力テキストファイルに書き込む単純なパイプラインを示しています。

この例では、入力ファイルと出力ファイルの場所をハードコーディングしており、エラーチェックを実行していません。これは、Beamパイプライン作成の「骨組み」だけを示すことを目的としています。このパラメーター化の欠如により、この特定のパイプラインは、標準的なBeamパイプラインよりもさまざまなランナー間での移植性が低くなります。後の例では、パイプラインの入力と出力ソースをパラメーター化し、他のベストプラクティスを示します。

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount
python -m apache_beam.examples.wordcount_minimal --input YOUR_INPUT_FILE --output counts
$ go install github.com/apache/beam/sdks/v2/go/examples/minimal_wordcount
$ minimal_wordcount

Javaの完全なコードを表示するには、**MinimalWordCount**を参照してください。

Pythonの完全なコードを表示するには、**wordcount_minimal.py**を参照してください。

Goの完全なコードを表示するには、**minimal_wordcount.go**を参照してください。

主要な概念

次のセクションでは、MinimalWordCountパイプラインからの関連するコード抜粋を使用して、これらの概念を詳しく説明します。

パイプラインの作成

この例では、最初に`PipelineOptions`オブジェクトを作成します。このオブジェクトを使用すると、パイプラインを実行するパイプラインランナーや、選択したランナーに必要なランナースペシフィックな構成など、パイプラインのさまざまなオプションを設定できます。この例では、これらのオプションをプログラムで設定していますが、多くの場合、コマンドライン引数を使用して`PipelineOptions`を設定します。

`DataflowRunner`や`SparkRunner`など、パイプラインを実行するためのランナーを指定できます。この例のようにランナーの指定を省略すると、パイプラインは`DirectRunner`を使用してローカルで実行されます。次のセクションでは、パイプラインのランナーを指定します。

 // Create a PipelineOptions object. This object lets us set various execution
 // options for our pipeline, such as the runner you wish to use. This example
 // will run with the DirectRunner by default, based on the class path configured
 // in its dependencies.
 PipelineOptions options = PipelineOptionsFactory.create();
from apache_beam.options.pipeline_options import PipelineOptions

input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://my-bucket/counts.txt'

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
)

次のステップは、作成したばかりのオプションを使用して`Pipeline`オブジェクトを作成することです。Pipelineオブジェクトは、実行される変換のグラフを構築し、その特定のパイプラインに関連付けます。

最初のステップは、`Pipeline`オブジェクトを作成することです。これは、実行される変換のグラフを構築し、その特定のパイプラインに関連付けます。スコープにより、複合変換へのグループ化が可能です。

Pipeline p = Pipeline.create(options);
pipeline = beam.Pipeline(options=beam_options)
p := beam.NewPipeline()
s := p.Root()

パイプライン変換の適用

MinimalWordCountパイプラインには、データをパイプラインに読み込み、データを操作または変換し、結果を出力するいくつかの変換が含まれています。変換は個々の操作で構成される場合も、複数のネストされた変換(複合変換)を含む場合もあります。

各変換は、何らかの入力データを受け取り、何らかの出力データを作成します。入力データと出力データは、多くの場合、SDKクラス`PCollection`によって表されます。`PCollection`は、Beam SDKによって提供される特別なクラスで、無制限のデータセットを含む、事実上あらゆるサイズのデータセットを表すために使用できます。

The MinimalWordCount pipeline data flow.

図1:MinimalWordCountパイプラインのデータフロー。

MinimalWordCountパイプラインには5つの変換が含まれています。

  1. `Pipeline`オブジェクト自体にテキストファイル`Read`変換が適用され、`PCollection`が出力として生成されます。出力`PCollection`の各要素は、入力ファイルの1行を表します。この例では、公開アクセス可能なGoogle Cloud Storageバケット(「gs://」)に保存されている入力データを使用しています。
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
pipeline
| beam.io.ReadFromText(input_file)
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/*")
  1. この変換は、`PCollection`内の行を分割します。各要素は、シェイクスピアの収集されたテキスト内の個々の単語です。代わりに、テキスト行を個々の単語にトークン化する各要素に対して`DoFn`(匿名クラスとしてインラインで定義)を呼び出すParDo変換を使用することもできました。この変換の入力は、前の`TextIO.Read`変換によって生成されたテキスト行の`PCollection`です。`ParDo`変換は、各要素がテキスト内の個々の単語を表す新しい`PCollection`を出力します。
    .apply("ExtractWords", FlatMapElements
        .into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
# The Flatmap transform is a simplified version of ParDo.

| 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)
  1. SDK提供の`Count`変換は、任意のタイプの`PCollection`を受け取り、キー/値ペアの`PCollection`を返す汎用変換です。各キーは入力コレクションからの固有の要素を表し、各値はそのキーが入力コレクションに表示された回数を表します。

    このパイプラインでは、`Count`の入力は前の`ParDo`によって生成された個々の単語の`PCollection`であり、出力は、各キーがテキスト内の固有の単語を表し、関連付けられた値が各単語の出現回数を表すキー/値ペアの`PCollection`です。

.apply(Count.<String>perElement())
| beam.combiners.Count.PerElement()
counted := stats.Count(s, words)
  1. 次の変換は、固有の単語と出現回数のキー/値ペアをそれぞれ、出力ファイルに書き込むのに適した印刷可能な文字列にフォーマットします。

    マップ変換は、単純な`ParDo`をカプセル化するより高度な複合変換です。入力`PCollection`の各要素に対して、マップ変換は正確に1つの出力要素を作成する関数を適用します。

.apply("FormatResults", MapElements
    .into(TypeDescriptors.strings())
    .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
| beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)
  1. テキストファイル書き込み変換。この変換は、フォーマットされた文字列の最終的な`PCollection`を入力として受け取り、各要素を出力テキストファイルに書き込みます。入力`PCollection`の各要素は、結果の出力ファイル内の1行を表します。
.apply(TextIO.write().to("wordcounts"));
| beam.io.WriteToText(output_path)
textio.Write(s, "wordcounts.txt", formatted)

`Write`変換は、この場合無視される`PDone`型の取るに足りない結果値を生成することに注意してください。

`Write`変換は、PCollectionを返さないことに注意してください。

パイプラインの実行

`run`メソッドを呼び出すことでパイプラインを実行します。これにより、`PipelineOptions`で指定したパイプラインランナーによってパイプラインが実行されます。

ランナーにパイプラインを渡して実行します。

p.run().waitUntilFinish();
with beam.Pipeline(...) as p:
  [construction]
# p.run() automatically called
direct.Execute(context.Background(), p)

`run`メソッドは非同期であることに注意してください。ブロッキング実行の場合は、`run`への呼び出しによって返される結果オブジェクトで`waitUntilFinish` `wait_until_finish`メソッドを呼び出します。

Playgroundで完全な例を試す

WordCount例

このWordCountの例では、パイプラインの読み取り、書き込み、保守を容易にするために推奨されるいくつかのプログラミングプラクティスを紹介します。明示的に必要ではありませんが、これによりパイプラインの実行の柔軟性が高まり、パイプラインのテストが容易になり、パイプラインのコードの再利用性が向上します。

このセクションでは、パイプライン構築の基本概念をよく理解していることを前提としています。まだその段階に達していないと感じた場合は、上記のセクションMinimalWordCountをお読みください。

Javaでこの例を実行するには

Java WordCountクイックスタートで説明されているように、開発環境をセットアップし、Mavenアーキタイプを生成します。次に、ランナーのいずれかを使用してパイプラインを実行します。

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                  --project=YOUR_PROJECT --region=GCE_REGION \
                  --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

Javaの完全なコードを表示するには、**WordCount**を参照してください。

Pythonでこの例を実行するには

python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts
# Running Beam Python on a distributed Flink cluster requires additional configuration.
# See /documentation/runners/flink/ for more information.
python -m apache_beam.examples.wordcount --input /path/to/inputfile \
                                         --output /path/to/write/counts \
                                         --runner SparkRunner
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --region YOUR_GCP_REGION \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

Pythonの完全なコードを表示するには、**wordcount.py**を参照してください。

Goでこの例を実行するには

$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
$ wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --region your-gcp-region \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache/beam_go_sdk:latest
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

Goの完全なコードを表示するには、**wordcount.go**を参照してください。

新しい概念

次のセクションでは、これらの重要な概念を詳しく説明し、パイプラインコードをより小さなセクションに分割します。

明示的なDoFnsの指定

`ParDo`変換を使用する場合、入力`PCollection`の各要素に適用される処理操作を指定する必要があります。この処理操作は、SDKクラス`DoFn`のサブクラスです。前の例(MinimalWordCount)で行われているように、各`ParDo`の`DoFn`サブクラスを匿名の内部クラスインスタンスとしてインラインで作成できます。ただし、`DoFn`をグローバルレベルで定義することをお勧めします。これにより、単体テストが容易になり、`ParDo`コードの可読性が向上します。

ParDo トランスフォームを使用する際は、入力PCollection内の各要素に適用される処理操作を指定する必要があります。この処理操作は、名前付き関数または特別に名前が付けられたメソッドを持つ構造体のいずれかです。無名関数(クロージャを除く)を使用できます。ただし、DoFnをグローバルレベルで定義することをお勧めします。これにより、単体テストが容易になり、ParDoコードの可読性も向上します。

// In this example, ExtractWordsFn is a DoFn that is defined as a static class:

static class ExtractWordsFn extends DoFn<String, String> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
        ...
    }
}
# In this example, the DoFns are defined as classes:


class FormatAsTextFn(beam.DoFn):
  def process(self, element):
    word, count = element
    yield '%s: %s' % (word, count)

formatted = counts | beam.ParDo(FormatAsTextFn())
// In this example, extractFn is a DoFn that is defined as a function:

func extractFn(ctx context.Context, line string, emit func(string)) {
   ...
}

複合変換の作成

複数のトランスフォームまたはParDoステップからなる処理操作がある場合は、PTransformのサブクラスとして作成できます。PTransformサブクラスを作成すると、複雑なトランスフォームをカプセル化し、パイプラインの構造をより明確でモジュール化し、単体テストを容易にすることができます。

複数のトランスフォームまたはParDoステップからなる処理操作がある場合は、通常のGo関数を使用してそれらをカプセル化できます。さらに、名前付きサブスコープを使用して、それらを監視可能な複合トランスフォームとしてグループ化できます。

この例では、2つのトランスフォームがPTransformサブクラスCountWordsとしてカプセル化されています。CountWordsには、ExtractWordsFnを実行するParDoと、SDK提供のCountトランスフォームが含まれています。

この例では、2つのトランスフォームがCountWords関数としてカプセル化されています。

CountWordsが定義されている場合、その最終的な入力と出力を指定します。入力は抽出操作のためのPCollection<String>であり、出力はカウント操作によって生成されるPCollection<KV<String, Long>>です。

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >>
      beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))

      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement())

counts = lines | CountWords()
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, extractFn, lines)

	// Count the number of times each word occurs.
	return stats.Count(s, col)
}

パラメーター化可能なPipelineOptionsの使用

パイプラインを実行する際に、さまざまな実行オプションをハードコードできます。ただし、より一般的な方法は、コマンドライン引数の解析を介して独自の構成オプションを定義することです。コマンドラインを介して構成オプションを定義すると、コードをさまざまなランナー間でより簡単に移植できます。

コマンドラインパーサーによって処理される引数を追加し、それらのデフォルト値を指定します。その後、パイプラインコードでオプションの値にアクセスできます。

この目的には、標準のflagパッケージを使用できます。

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = pipeline | beam.io.ReadFromText(args.input_file)
var input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

func main() {
    ...
    p := beam.NewPipeline()
    s := p.Root()

    lines := textio.Read(s, *input)
    ...

Playgroundで完全な例を試す

DebuggingWordCount例

DebuggingWordCountの例は、パイプラインコードをインストルメントするためのベストプラクティスを示しています。

Javaでこの例を実行するには

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SparkRunner --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SamzaRunner --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=JetRunner --jetLocalMode=3 --output=counts

Javaの完全なコードを表示するには、DebuggingWordCountを参照してください。

Pythonでこの例を実行するには

python -m apache_beam.examples.wordcount_debugging --input YOUR_INPUT_FILE --output counts
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

Pythonの完全なコードを表示するには、wordcount_debugging.pyを参照してください。

Goでこの例を実行するには

$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
$ debugging_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                      --output gs://<your-gcs-bucket>/counts \
                      --runner dataflow \
                      --project your-gcp-project \
                      --region your-gcp-region \
                      --temp_location gs://<your-gcs-bucket>/tmp/ \
                      --staging_location gs://<your-gcs-bucket>/binaries/ \
                      --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

Goの完全なコードを表示するには、debugging_wordcount.goを参照してください。

新しい概念

次のセクションでは、これらの重要な概念を詳しく説明し、パイプラインコードをより小さなセクションに分割します。

ロギング

各ランナーは、ログを独自のやり方で処理することを選択できます。

// This example uses .trace and .debug:

public class DebuggingWordCount {

  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (...) {
        ...
        LOG.debug("Matched: " + c.element().getKey());
      } else {
        ...
        LOG.trace("Did not match: " + c.element().getKey());
      }
    }
  }
}
# [START example_wordcount_debugging_aggregators]
import logging

class FilterTextFn(beam.DoFn):
  """A DoFn that filters for a specific key based on a regular expression."""
  def __init__(self, pattern):
    self.pattern = pattern
    # A custom metric can track values in your pipeline as it runs. Create
    # custom metrics matched_word and unmatched_words.
    self.matched_words = Metrics.counter(self.__class__, 'matched_words')
    self.umatched_words = Metrics.counter(self.__class__, 'umatched_words')

  def process(self, element):
    word, _ = element
    if re.match(self.pattern, word):
      # Log at INFO level each element we match. When executing this pipeline
      # using the Dataflow service, these log lines will appear in the Cloud
      # Logging UI.
      logging.info('Matched %s', word)

      # Add 1 to the custom metric counter matched_words
      self.matched_words.inc()
      yield element
    else:
      # Log at the "DEBUG" level each element that is not matched. Different
      # log levels can be used to control the verbosity of logging providing
      # an effective mechanism to filter less important information. Note
      # currently only "INFO" and higher level logs are emitted to the Cloud
      # Logger. This log message will not be visible in the Cloud Logger.
      logging.debug('Did not match %s', word)

      # Add 1 to the custom metric counter umatched_words
      self.umatched_words.inc()
type filterFn struct {
    ...
}

func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
    if f.re.MatchString(word) {
         // Log at the "INFO" level each element that we match.
         log.Infof(ctx, "Matched: %v", word)
         emit(word, count)
    } else {
        // Log at the "DEBUG" level each element that is not matched.
        log.Debugf(ctx, "Did not match: %v", word)
    }
}

Direct Runner

DirectRunnerでパイプラインを実行する場合は、ログメッセージをローカルコンソールに直接出力できます。Beam SDK for Javaを使用する場合は、クラスパスにSlf4jを追加する必要があります。

Cloud Dataflow Runner

DataflowRunnerでパイプラインを実行する場合は、Stackdriver Loggingを使用できます。Stackdriver Loggingは、すべてのCloud Dataflowジョブのワーカーからのログを、Google Cloud Platformコンソールの単一の位置に集約します。Stackdriver Loggingを使用して、Cloud Dataflowがジョブの完了のために起動したすべてのワーカーからのログを検索およびアクセスできます。パイプラインのDoFnインスタンスのログステートメントは、パイプラインの実行中にStackdriver Loggingに表示されます。

ワーカーのログレベルも制御できます。ユーザーコードを実行するCloud Dataflowワーカーは、デフォルトで「INFO」ログレベル以上のStackdriver Loggingへのログ記録が構成されています。次のように指定することで、特定のログ名前空間のログレベルをオーバーライドできます。--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}。たとえば、Cloud Dataflowサービスを使用してパイプラインを実行する際に--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}を指定すると、Stackdriver Loggingには、デフォルトの「INFO」以上のログに加えて、パッケージの「DEBUG」以上のログのみが含まれます。

デフォルトのCloud Dataflowワーカーのログ構成は、--defaultWorkerLogLevel=<TRACE、DEBUG、INFO、WARN、ERRORのいずれか>を指定してオーバーライドできます。たとえば、Cloud Dataflowサービスでパイプラインを実行する際に--defaultWorkerLogLevel=DEBUGを指定すると、Cloud Loggingにはすべての「DEBUG」以上のログが含まれます。デフォルトのワーカーログレベルをTRACEまたはDEBUGに変更すると、出力されるログの量が大幅に増加することに注意してください。

Apache Spark Runner

注記: このセクションはまだ追加されていません。Issue 18076を参照してください。

注記: このセクションはまだ追加されていません。Issue 18075を参照してください。

Apache Nemo Runner

NemoRunnerでパイプラインを実行する場合は、ほとんどのログメッセージがローカルコンソールに直接出力されます。ログを最大限に活用するには、クラスパスにSlf4jを追加する必要があります。ドライバ側と実行側の両方でログを観察するには、Apache REEFによって作成されたフォルダを観察する必要があります。たとえば、ローカルランタイムでパイプラインを実行する場合、作業ディレクトリにREEF_LOCAL_RUNTIMEというフォルダが作成され、ログとメトリック情報はすべてそのディレクトリにあります。

アサーションによるパイプラインのテスト

PAssertassert_thatは、Hamcrestのコレクションマッチャーのスタイルで、パイプラインレベルのテストを作成してPCollectionの内容を検証するために使用できる便利なPTransformのセットです。アサートは、小さなデータセットを使用した単体テストで最も効果的に使用されます。

passertパッケージには、パイプラインレベルのテストを作成してPCollectionの内容を検証するために使用できる便利なPTransformが含まれています。アサートは、小さなデータセットを使用した単体テストで最も効果的に使用されます。

次の例では、フィルタリングされた単語のセットが予想されるカウントと一致することを確認します。アサートは出力せず、すべての期待値が満たされた場合にのみパイプラインは成功します。

次の例では、2つのコレクションに同じ値が含まれていることを確認します。アサートは出力せず、すべての期待値が満たされた場合にのみパイプラインは成功します。

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

with TestPipeline() as p:
  assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
...
passert.Equals(s, formatted, "Flourish: 3", "stomach: 1")

単体テストの例については、DebuggingWordCountTestを参照してください。

Playgroundで完全な例を試す

WindowedWordCount例

WindowedWordCountの例は、前の例と同様にテキスト内の単語をカウントしますが、いくつかの高度な概念を紹介しています。

新しい概念

次のセクションでは、これらの重要な概念を詳しく説明し、パイプラインコードをより小さなセクションに分割します。

Javaでこの例を実行するには

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SamzaRunner --inputFile=pom.xml --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

Javaの完全なコードを表示するには、WindowedWordCountを参照してください。

Pythonでこの例を実行するには

このパイプラインは、--output_tableパラメーターを使用して、PROJECT:DATASET.TABLEまたはDATASET.TABLE形式でBigQueryテーブルに結果を出力します。

python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
                                         --output_table PROJECT:DATASET.TABLE \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

Pythonの完全なコードを表示するには、windowed_wordcount.pyを参照してください。

Goでこの例を実行するには

$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
$ windowed_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

Goの完全なコードを表示するには、windowed_wordcount.goを参照してください。

無制限と制限付きデータセット

Beamを使用すると、バウンドされたデータセットとバウンドされていないデータセットの両方を処理できる単一のパイプラインを作成できます。データセットに固定数の要素がある場合、それはバウンドされたデータセットであり、すべてのデータはまとめて処理できます。バウンドされたデータセットの場合、考慮すべき質問は「すべてのデータを持っていますか?」です。データが継続的に到着する場合(モバイルゲームの例の無限のゲームスコアの流れなど)、それはバウンドされていないデータセットです。バウンドされていないデータセットは、一度に処理できる状態になることはありません。そのため、データは継続的に実行されるストリーミングパイプラインを使用して処理する必要があります。データセットは特定の時点までしか完了しないため、考慮すべき質問は「どの時点まですべてのデータを持っていますか?」です。Beamはウィンドウ処理を使用して、継続的に更新されるデータセットを有限サイズの論理ウィンドウに分割します。入力がバウンドされていない場合は、ストリーミングをサポートするランナーを使用する必要があります。

パイプラインの入力がバウンドされている場合、すべてのダウンストリームPCollectionもバウンドされます。同様に、入力がバウンドされていない場合、パイプラインのすべてのダウンストリームPCollectionはバウンドされませんが、個別のブランチは独立してバウンドされる可能性があります。

この例の入力はシェイクスピアのテキストのセットであり、これは有限のデータセットであることを思い出してください。したがって、この例では、テキストファイルからバウンドされたデータを読み取ります。

public static void main(String[] args) throws IOException {
    Options options = ...
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline
      .apply(TextIO.read().from(options.getInputFile()))
def main(arvg=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input-file',
                      dest='input_file',
                      default='/Users/home/words-example.txt')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)
  lines  = p | 'read' >> ReadFromText(known_args.input_file)
func main() {
   ...
   p := beam.NewPipeline()
   s := p.Root()

   lines := textio.Read(s, *input)
   ...
}

データへのタイムスタンプの追加

PCollection内の各要素には、関連付けられたタイムスタンプがあります。各要素のタイムスタンプは、最初にPCollectionを作成するソースによって割り当てられます。バウンドされていないPCollectionを作成するソースの中には、各新しい要素に、要素が読み取られたり追加されたりする時点に対応するタイムスタンプを割り当てるものがあります。DoFnを使用してタイムスタンプを手動で割り当てたり調整したりできますが、タイムスタンプを前方に移動することしかできません。

この例では、入力はバウンドされています。例のために、AddTimestampsFnという名前のDoFnメソッド(ParDoによって呼び出される)は、要素自体を指定して、各要素のタイムスタンプを設定します。たとえば、要素がログ行の場合、このParDoはログ文字列から時間を解析して、要素のタイムスタンプとして設定できます。シェイクスピアの著作にはタイムスタンプが固有に存在しないため、ここでは概念を示すためにランダムなタイムスタンプを作成しました。入力テキストの各行には、2時間の間のランダムな関連付けられたタイムスタンプが設定されます。

.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
beam.Map(AddTimestampFn(min_timestamp, max_timestamp))
timestampedLines := beam.ParDo(s, &addTimestampFn{Min: mtime.Now()}, lines)

以下は、ParDoによって呼び出されるDoFnであるAddTimestampFnのコードです。これは、要素自体を指定してタイムスタンプのデータ要素を設定します。たとえば、要素がログ行の場合、このParDoはログ文字列から時間を解析して、要素のタイムスタンプとして設定できます。シェイクスピアの著作にはタイムスタンプが固有に存在しないため、ここでは概念を示すためにランダムなタイムスタンプを作成しました。入力テキストの各行には、2時間の間のランダムな関連付けられたタイムスタンプが設定されます。

static class AddTimestampFn extends DoFn<String, String> {
  private final Instant minTimestamp;
  private final Instant maxTimestamp;

  AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
    this.minTimestamp = minTimestamp;
    this.maxTimestamp = maxTimestamp;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Instant randomTimestamp =
      new Instant(
          ThreadLocalRandom.current()
          .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));

    /**
     * Concept #2: Set the data element with that timestamp.
     */
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}
class AddTimestampFn(beam.DoFn):

  def __init__(self, min_timestamp, max_timestamp):
     self.min_timestamp = min_timestamp
     self.max_timestamp = max_timestamp

  def process(self, element):
    return window.TimestampedValue(
       element,
       random.randint(self.min_timestamp, self.max_timestamp))
type addTimestampFn struct {
	Min beam.EventTime `json:"min"`
}

func (f *addTimestampFn) ProcessElement(x beam.X) (beam.EventTime, beam.X) {
	timestamp := f.Min.Add(time.Duration(rand.Int63n(2 * time.Hour.Nanoseconds())))
	return timestamp, x
}

beam.X「型変数」の使用により、トランスフォームを任意の型で使用できます。

ウィンドウ化

Beamは、PCollectionをバウンドされた要素のセットに細分化するウィンドウ処理という概念を使用します。複数の要素を集約するPTransformは、コレクション全体が無限のサイズ(バウンドされていない)であっても、複数の有限ウィンドウの連続として各PCollectionを処理します。

WindowedWordCountの例では、固定時間ウィンドウ処理が適用されます。ここで、各ウィンドウは固定時間間隔を表します。この例の固定ウィンドウサイズはデフォルトで1分です(コマンドラインオプションで変更できます)。

PCollection<String> windowedWords = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
windowed_words = input | beam.WindowInto(window.FixedWindows(60 * window_size_minutes))
windowedLines := beam.WindowInto(s, window.NewFixedWindows(time.Minute), timestampedLines)

ウィンドウ化されたPCollectionでのPTransformsの再利用

単純なPCollectionに対して作成された既存のPTransformを、ウィンドウ化されたPCollectionに対しても再利用できます。

PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
word_counts = windowed_words | CountWords()
counted := wordcount.CountWords(s, windowedLines)

Playgroundで完全な例を試す

StreamingWordCount例

StreamingWordCountの例は、Pub/SubサブスクリプションまたはトピックからPub/Subメッセージを読み取り、各メッセージ内の単語の頻度カウントを実行するストリーミングパイプラインです。WindowedWordCountと同様に、この例では固定時間ウィンドウ処理が適用されます。ここで、各ウィンドウは固定時間間隔を表します。この例の固定ウィンドウサイズは15秒です。パイプラインは、各15秒のウィンドウで検出された単語の頻度カウントを出力します。

新しい概念

Javaでこの例を実行するには

注記: StreamingWordCountはまだJava SDKでは利用できません。

Pythonでこの例を実行するには

python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

Pythonの完全なコードを見るには、streaming_wordcount.pyを参照してください。

Goでこの例を実行するには

注記: StreamingWordCountはまだGo SDKでは利用できません。これに関する未解決の問題があります(Issue 18879)。

無制限データセットの読み込み

この例では、入力として無制限のデータセットを使用しています。beam.io.ReadFromPubSubを使用して、Pub/SubサブスクリプションまたはトピックからPub/Subメッセージを読み取ります。

  // This example is not currently available for the Beam SDK for Java.
  # Read from Pub/Sub into a PCollection.
  if known_args.input_subscription:
    data = p | beam.io.ReadFromPubSub(
        subscription=known_args.input_subscription)
  else:
    data = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  lines = data | 'DecodeString' >> beam.Map(lambda d: d.decode('utf-8'))
  // This example is not currently available for the Beam SDK for Go.

無制限結果の書き込み

入力が無制限の場合、出力PCollectionも同様に無制限になります。そのため、結果に対して適切なI/Oを選択する必要があります。一部のI/Oは制限付き出力のみをサポートしますが、他のI/Oは制限付きと無制限の両方の出力をサポートしています。

この例では、無制限のPCollectionを使用し、結果をGoogle Pub/Subにストリーミングします。beam.io.WriteToPubSubを使用して、結果をフォーマットし、Pub/Subトピックに書き込みます。

  // This example is not currently available for the Beam SDK for Java.
  # Write to Pub/Sub
  _ = (output
    | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
    | beam.io.WriteToPubSub(known_args.output_topic))
  // This example is not currently available for the Beam SDK for Go.

次のステップ

問題が発生した場合は、お気軽にお問い合わせください