Apache Beam Java SDK クイックスタート

このクイックスタートでは、サンプルパイプラインApache Beam Java SDK で記述し、Direct Runner を使用して実行する方法を示します。Direct Runner は、ローカルマシン上でパイプラインを実行します。

Apache Beam Java コードベースへの貢献に興味がある場合は、貢献ガイドを参照してください。

このページの内容

開発環境のセットアップ

sdkman を使用して Java Development Kit(JDK)をインストールします。

# Install sdkman
curl -s "https://get.sdkman.io" | bash

# Install Java 17
sdk install java 17.0.5-tem

このクイックスタートを実行するには、Gradle または Apache Maven のいずれかを使用できます。

# Install Gradle
sdk install gradle

# Install Maven
sdk install maven

GitHub リポジトリのクローン

apache/beam-starter-java GitHub リポジトリをクローンまたはダウンロードし、beam-starter-java ディレクトリに移動します。

git clone https://github.com/apache/beam-starter-java.git
cd beam-starter-java

クイックスタートの実行

Gradle: Gradle でクイックスタートを実行するには、次のコマンドを実行します。

gradle run --args='--inputText=Greetings'

Maven: Maven でクイックスタートを実行するには、次のコマンドを実行します。

mvn compile exec:java -Dexec.args=--inputText='Greetings'

出力は次のようになります。

Hello
World!
Greetings

行の順序は異なる場合があります。

コードの探索

このクイックスタートのメインコードファイルは App.java (GitHub) です。このコードは次の手順を実行します。

  1. Beam パイプラインを作成します。
  2. 初期 PCollection を作成します。
  3. PCollection に変換を適用します。
  4. Direct Runner を使用してパイプラインを実行します。

パイプラインの作成

コードはまず Pipeline オブジェクトを作成します。Pipeline オブジェクトは、実行する変換のグラフを構築します。

var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
var pipeline = Pipeline.create(options);

PipelineOptions オブジェクトを使用すると、パイプラインのさまざまなオプションを設定できます。この例に示す fromArgs メソッドは、コマンドライン引数を解析し、コマンドラインからパイプラインオプションを設定できるようにします。

初期 PCollection の作成

PCollection 抽象化は、潜在的に分散された複数要素のデータセットを表します。Beam パイプラインには、初期 PCollection を設定するためのデータソースが必要です。ソースは、バインドされている(既知の固定サイズ)か、アンバインドされている(無制限のサイズ)のいずれかです。

この例では、Create.of メソッドを使用して、メモリ内の文字列配列から PCollection を作成します。結果の PCollection には、文字列「Hello」、「World!」、およびユーザーが指定した入力文字列が含まれます。

return pipeline
  .apply("Create elements", Create.of(Arrays.asList("Hello", "World!", inputText)))

PCollection への変換の適用

変換は、PCollection 内の要素を変更、フィルタリング、グループ化、分析、またはその他の処理を行うことができます。この例では、コレクションの要素を新しいコレクションにマッピングする MapElements 変換を使用します。

.apply("Print elements",
    MapElements.into(TypeDescriptors.strings()).via(x -> {
      System.out.println(x);
      return x;
    }));

ここで

この例では、マッピング関数は、元の値を返すラムダです。また、副作用として System.out に値を出力します。

パイプラインの実行

前のセクションで示したコードはパイプラインを定義しますが、まだデータを処理しません。データを処理するには、パイプラインを実行します。

pipeline.run().waitUntilFinish();

Beam ランナーは、特定のプラットフォームで Beam パイプラインを実行します。この例では、指定しない場合のデフォルトランナーである Direct Runner を使用します。Direct Runner は、ローカルマシンでパイプラインを実行します。効率のために最適化されているのではなく、テストと開発を目的としています。詳細については、Direct Runner の使用を参照してください。

実稼働環境のワークロードでは、通常、Apache Flink、Apache Spark、Google Cloud Dataflow などのビッグデータ処理システムでパイプラインを実行する分散ランナーを使用します。これらのシステムは、大規模な並列処理をサポートしています。

次のステップ

何か問題が発生した場合は、お気軽にご連絡ください