Java向けWordCountクイックスタート

このクイックスタートでは、Java開発環境の設定方法と、サンプルパイプラインApache Beam Java SDKで記述)を任意のランナーを使用して実行する方法を示します。

Apache Beam Javaコードベースへのコントリビュートにご関心がある場合は、コントリビューションガイドを参照してください。

このページの内容

開発環境の設定

  1. Java Development Kit (JDK) バージョン8、11、または17をダウンロードしてインストールします。JAVA_HOME環境変数が設定されており、JDKのインストール先を指していることを確認します。
  2. Apache Mavenをダウンロードしてインストールします。お使いのオペレーティングシステムのインストールガイドに従ってください。
  3. オプション: MavenプロジェクトをGradleに変換する場合は、Gradleをインストールします。

サンプルコードの取得

  1. 最新のBeamリリースに対してビルドされるMavenサンプルプロジェクトを生成します。

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.60.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
       
    mvn archetype:generate `
      -D archetypeGroupId=org.apache.beam `
      -D archetypeArtifactId=beam-sdks-java-maven-archetypes-examples `
      -D archetypeVersion=2.60.0 `
      -D groupId=org.example `
      -D artifactId=word-count-beam `
      -D version="0.1" `
      -D package=org.apache.beam.examples `
      -D interactiveMode=false
       

    Mavenは**word-count-beam**ディレクトリに新しいプロジェクトを作成します。

  2. **word-count-beam**ディレクトリに移動します。

    cd word-count-beam/
       
    cd .\word-count-beam
       
    このディレクトリには、**pom.xml**と、サンプルパイプラインを含む**src**ディレクトリが含まれています。

  3. サンプルパイプラインをリストします。

    ls src/main/java/org/apache/beam/examples/
       
    dir .\src\main\java\org\apache\beam\examples
       
    次のサンプルが表示されます。

    このチュートリアルで使用されているサンプルである**WordCount.java**は、入力ファイル(デフォルトではシェイクスピアの「リア王」を含む**.txt**ファイル)から単語をカウントするBeamパイプラインを定義しています。サンプルの詳細については、WordCountサンプルの解説を参照してください。

オプション: MavenからGradleへの変換

以下の手順では、次のランナーについて、MavenからGradleへのビルド変換方法を説明します。

他のランナーの変換プロセスは同様です。詳細については、Apache Mavenからのビルドの移行を参照してください。

  1. **pom.xml**ファイルがあるディレクトリで、自動化されたMavenからGradleへの変換を実行します。
    gradle init
       
    Gradleビルドを生成するかどうかを尋ねられます。「yes」と入力します。DSL(GroovyまたはKotlin)を選択するように求められます。このチュートリアルでは、Kotlinを選択して「2」と入力します。
  2. 生成された**build.gradle.kts**ファイルを開き、次の変更を加えます。
    1. repositoriesで、mavenLocal()mavenCentral()に置き換えます。
    2. repositoriesで、Confluent Kafka依存関係のリポジトリを宣言します。
      maven {
          url = uri("https://packages.confluent.io/maven/")
      }
            
    3. ビルドスクリプトの最後に、次の条件付き依存関係を追加します。
      if (project.hasProperty("dataflow-runner")) {
          dependencies {
              runtimeOnly("org.apache.beam:beam-runners-google-cloud-dataflow-java:2.60.0")
          }
      }
            
    4. ビルドスクリプトの最後に、次のタスクを追加します。
      tasks.register<JavaExec>("execute") {
        mainClass.set(System.getProperty("mainClass"))
        classpath = sourceSets.main.get().runtimeClasspath
      }
            
  3. プロジェクトをビルドします。
    gradle build
       

サンプルテキストの取得

DataflowRunnerを使用する予定の場合は、この手順をスキップできます。ランナーはGoogle Cloud Storageから直接テキストを取得します。

  1. **word-count-beam**ディレクトリに、**sample.txt**というファイルを作成します。
  2. ファイルにテキストを追加します。この例では、シェイクスピアのリア王のテキストを使用します。

パイプラインの実行

単一のBeamパイプラインは、複数のBeam ランナーで実行できます。DirectRunnerは、マシン上で実行され、特別な設定を必要としないため、はじめに便利です。Beamを試していて、何をすべきか分からない場合は、DirectRunnerを使用してください。

パイプラインを実行する一般的なプロセスは以下のとおりです。

  1. ランナー固有の設定を完了します。
  2. コマンドラインを構築します。
    1. --runner=<runner>を使用してランナーを指定します(デフォルトはDirectRunner)。
    2. ランナー固有の必要なオプションを追加します。
    3. ランナーからアクセスできる入力ファイルと出力場所を選択します。(たとえば、外部クラスタでパイプラインを実行している場合、ローカルファイルにはアクセスできません。)
  3. コマンドを実行します。

WordCountパイプラインを実行するには

  1. ランナーの設定手順に従います。

    DirectRunnerは追加の設定なしで動作します。

  2. 下記の対応するMavenまたはGradleコマンドを実行します。

Mavenを使用したWordCountの実行

Unixシェル用

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=sample.txt --output=counts" -Pdirect-runner
mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                 --inputFile=sample.txt --output=/tmp/counts" -Pflink-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=SparkRunner --inputFile=sample.txt --output=counts" -Pspark-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
                 --region=<your-gcp-region> \
                 --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                 --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
    -Pdataflow-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=sample.txt --output=/tmp/counts --runner=SamzaRunner" -Psamza-runner
mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
    --runner=NemoRunner --inputFile=`pwd`/sample.txt --output=counts
mvn package -Pjet-runner
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
    --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/sample.txt --output=counts

Windows PowerShell用

mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--inputFile=sample.txt --output=counts" -P direct-runner
mvn package exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=.\target\word-count-beam-bundled-0.1.jar `
               --inputFile=C:\path\to\quickstart\sample.txt --output=C:\tmp\counts" -P flink-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=SparkRunner --inputFile=sample.txt --output=counts" -P spark-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=DataflowRunner --project=<your-gcp-project> `
               --region=<your-gcp-region> \
               --gcpTempLocation=gs://<your-gcs-bucket>/tmp `
               --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" `
 -P dataflow-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
    -D exec.args="--inputFile=sample.txt --output=/tmp/counts --runner=SamzaRunner" -P samza-runner
mvn package -P nemo-runner -DskipTests
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount `
    --runner=NemoRunner --inputFile=`pwd`/sample.txt --output=counts
mvn package -P jet-runner
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount `
    --runner=JetRunner --jetLocalMode=3 --inputFile=$pwd/sample.txt --output=counts

Gradleを使用したWordCountの実行

Unixシェル用

gradle clean execute -DmainClass=org.apache.beam.examples.WordCount \
    --args="--inputFile=sample.txt --output=counts"
TODO: document FlinkCluster on Gradle: https://github.com/apache/beam/issues/21499
TODO: document Spark on Gradle: https://github.com/apache/beam/issues/21502
gradle clean execute -DmainClass=org.apache.beam.examples.WordCount \
    --args="--project=<your-gcp-project> --inputFile=gs://apache-beam-samples/shakespeare/* \
    --output=gs://<your-gcs-bucket>/counts --runner=DataflowRunner" -Pdataflow-runner
TODO: document Samza on Gradle: https://github.com/apache/beam/issues/21500
TODO: document Nemo on Gradle: https://github.com/apache/beam/issues/21503
TODO: document Jet on Gradle: https://github.com/apache/beam/issues/21501

結果の確認

パイプラインが完了したら、出力を確認できます。countというプレフィックスが付いた出力ファイルが複数ある場合があります。出力ファイルの数はランナーによって決定され、効率的な分散実行を行う柔軟性が与えられます。

  1. Unixシェルで出力ファイルを表示します。
    ls counts*
       
    ls /tmp/counts*
       
    ls counts*
       
    gsutil ls gs://<your-gcs-bucket>/counts*
       
    ls /tmp/counts*
       
    ls counts*
       
    ls counts*
       
    出力ファイルには、一意の単語とその出現回数が入っています。
  2. Unixシェルで出力内容を表示します。
    more counts*
       
    more /tmp/counts*
       
    more counts*
       
    gsutil cat gs://<your-gcs-bucket>/counts*
       
    more /tmp/counts*
       
    more counts*
       
    more counts*
       
    要素の順序は、ランナーが効率性を最適化できるように保証されていません。しかし、出力は次のようになります。
    ...
    Think: 3
    slower: 1
    Having: 1
    revives: 1
    these: 33
    wipe: 1
    arrives: 1
    concluded: 1
    begins: 3
    ...
    

次のステップ

問題が発生した場合は、お気軽にお問い合わせください