概要

Hazelcast Jet Runner は、Hazelcast Jet を使用して Beam パイプラインを実行するために使用できます。

Jet Runner と Jet は、大規模な継続的なジョブに適しており、以下を提供します。

Jet Runner は現在実験段階であることに注意することが重要です。Jet に存在する多くの機能を使用できません。

Jet Runner のサポートされている機能については、Beam 機能マトリックスを参照してください。

Hazelcast Jet Runner を使用した WordCount の実行

Beam examples プロジェクトの生成

Java クイックスタートページの手順に従ってください。

ローカル Jet クラスタでの WordCount の実行

新しい Jet クラスタを開始し、その上で WordCount の例を実行するには、Beam examples プロジェクトで次のコマンドを実行します。

    $ mvn package exec:java \
        -DskipTests \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="\
            --runner=JetRunner \
            --jetLocalMode=3 \
            --inputFile=pom.xml \
            --output=counts" \
        -Pjet-runner

リモート Jet クラスタでの WordCount の実行

アーキタイプから生成された Beam examples プロジェクトは、特定のリリースされた Beam バージョン(`archetypeVersion` プロパティに関するものです)から提供されます。Jet Runner を含む各 Beam バージョン(2.14.0 以降)は、特定のバージョンの Jet を使用します。このため、スタンドアロンの Jet クラスタを開始して Beam の例を実行しようとすると、両方が互換性があることを確認する必要があります。さまざまな Beam バージョンに推奨される Jet バージョンについては、次の表を参照してください。

Beam バージョン互換性のある Jet バージョン
2.20.0 以降4.x
2.14.0 - 2.19.03.x
2.13.0 以前N/A

使用している Beam と互換性のある最新の Hazelcast Jet バージョンをHazelcast Jet ウェブサイトからダウンロードしてください。

ダウンロードが完了したら、Jet クラスタを開始する必要があります。最も簡単な方法は、ダウンロードした Jet ディストリビューションに付属の `jet-start` スクリプトを使用して Jet クラスタメンバーを開始することです。メンバーは自動検出機能 自動検出機能を使用してクラスタを形成します。2 つのメンバーで構成されるクラスタを起動してみましょう。

$ cd hazelcast-jet
$ bin/jet-start.sh &
$ bin/jet-start.sh &
$ cd hazelcast-jet
$ bin/jet-start &
$ bin/jet-start &

クラスタが稼働していることを確認します。

$ bin/jet.sh cluster
$ bin/jet cluster

次のような内容が表示されます。

State: ACTIVE
Version: 3.0
Size: 2

ADDRESS                  UUID
[192.168.0.117]:5701     76bea7ba-f032-4c25-ad04-bdef6782f481
[192.168.0.117]:5702     03ecfaa2-be16-41b6-b5cf-eea584d7fb86
State: ACTIVE
Version: 4.0
Size: 2

ADDRESS                  UUID
[192.168.0.117]:5701     b9937bba-32aa-48ba-8e32-423aafed763b
[192.168.0.117]:5702     dfeadfb2-3ba5-4d1c-95e7-71a1a3ca4937

Beam Examples プロジェクトのディレクトリに変更し、次のコマンドを実行して、リモート Jet クラスタにパイプラインを送信して実行します。入力ファイル(カウントする単語を含むファイル)をクラスタが実行されているすべてのマシンに配布してください。そうでなければ、単語カウントジョブはデータを読み取ることができません。

    $ mvn package exec:java \
        -DskipTests \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="\
            --runner=JetRunner \
            --jetServers=192.168.0.117:5701,192.168.0.117:5702 \
            --codeJarPathname=target/word-count-beam-bundled-0.1.jar \
            --inputFile=<INPUT_FILE_AVAILABLE_ON_ALL_CLUSTER_MEMBERS> \
            --output=/tmp/counts" \
        -Pjet-runner

Jet Runner のパイプラインオプション

項目説明デフォルト値
runner使用するパイプラインランナー。このオプションを使用すると、ランタイム時にパイプラインランナーを決定できます。Jet を使用して実行するには `JetRunner` に設定します。
jetGroupNamejetClusterName参加する Hazelcast グループの名前。本質的には、Runner が使用する Jet クラスタの ID です。グループを使用すると、各クラスタが独自のグループを持ち、他のクラスタと干渉しない複数のクラスタを作成できます。 Runner が使用する Hazelcast クラスタの名前。jet
jetServersJet クラスタメンバーのアドレスのリスト。Runner が独自の Jet クラスタを開始せず、外部で独立して開始されたクラスタを使用する場合に必要です。ip/ホスト名-ポートのペアのカンマ区切りリストの形式で指定します。例:`192.168.0.117:5701,192.168.0.117:5702`127.0.0.1:5701
codeJarPathname外部 Jet クラスタを使用する場合にのみ必要なプロパティ。クラスタで実行する必要があるすべてのコード(つまり、少なくともパイプラインコードとランナーコード)を含む fat jar の場所を指定します。値は、`new java.io.File()` にパラメータとして受け入れられる任意の文字列です。デフォルト値はありません。
jetLocalModeRunner によってローカルで開始される Jet クラスタメンバーの数。`0` の場合、Runner は外部クラスタを使用します。それより大きい場合、Runner はそれ自体で開始されたクラスタを使用します。0
jetDefaultParallelismJet メンバーのローカル並列度。各 Jet クラスタメンバーに作成される DAG の各頂点のプロセッサ数。2
jetProcessorsCooperativeDoFns の Jet プロセッサが協調的になることを許可するかどうか(つまり、専用の OS スレッドではなくグリーン スレッドを使用するかどうか)を指定するブール値フラグ。true に設定されている場合、出力がない(同期であるとみなされる)場合を除き、そのようなすべてのプロセッサが協調的になります。false