Apache Beam TypeScript SDK クイックスタート

このクイックスタートでは、サンプルパイプラインApache Beam TypeScript SDK を使用して記述)を Direct Runner を使用して実行する方法を示します。Direct Runner は、ローカルマシンでパイプラインを実行します。

Apache Beam TypeScript コードベースへのコントリビュートにご関心がある場合は、コントリビュートガイド を参照してください。

このページの内容

開発環境のセットアップ

Node.js の開発環境がインストールされていることを確認してください。インストールされていない場合は、ダウンロードページ からダウンロードしてインストールできます。

複数言語の変換を多用しているため、Python 3 と Java もシステムで使用できることをお勧めします。

GitHub リポジトリのクローン作成

apache/beam-starter-typescript GitHub リポジトリをクローンまたはダウンロードし、beam-starter-typescript ディレクトリに移動します。

git clone https://github.com/apache/beam-starter-typescript.git
cd beam-starter-typescript

プロジェクト依存関係のインストール

次のコマンドを実行して、プロジェクトの依存関係をインストールします。

npm install

パイプラインのコンパイル

パイプラインは次のようにビルドされます。

npm run build

クイックスタートの実行

次のコマンドを実行します。

node dist/src/main.js --input_text="Greetings"

出力は次のようになります。

Hello
World!
Greetings

行の順序は異なる場合があります。

コードの調査

このクイックスタートのメインコードファイルは **app.ts** です (GitHub)。コードは次の手順を実行します。

  1. Beam パイプラインを定義します。
  1. Direct Runner を使用してパイプラインを実行します。

パイプラインの作成

Pipelineは、単一のrootオブジェクトを受け取る呼び出し可能な関数です。Pipeline関数は、実行される変換のグラフを構築します。

初期 PCollection の作成

PCollection抽象化は、潜在的に分散された複数要素のデータセットを表します。Beam パイプラインは、初期PCollectionにデータを入力するためのデータソースが必要です。ソースは、境界付き(既知の固定サイズ)または境界なし(無制限のサイズ)にすることができます。

この例では、Create メソッドを使用して、文字列のインメモリ配列からPCollectionを作成します。結果のPCollectionには、「Hello」、「World!」、およびユーザーが提供した入力文字列が含まれます。

root.apply(beam.create(["Hello", "World!", input_text]))

PCollection への変換の適用

変換は、PCollectionの要素を変更、フィルタリング、グループ化、分析、またはその他の処理を行うことができます。この例では、コレクションの要素を新しいコレクションにマッピングするMap変換を使用します。

.map(printAndReturn);

便宜上、PColletionにはmapメソッドがありますが、より一般的には、.apply(someTransform())を使用して変換が適用されます。

パイプラインの実行

パイプラインを実行するには、ランナーが作成され(オプションがある場合もあります)、

createRunner(options)

その後、そのrunメソッドが上記で作成されたパイプライン呼び出し可能オブジェクトに対して呼び出されます。

.run(createPipeline(...));

Beam ランナー は、特定のプラットフォームで Beam パイプラインを実行します。ランナーを指定しない場合、Direct Runner がデフォルトになります。Direct Runner は、ローカルマシンでパイプラインを実行します。これは、効率を最適化するためではなく、テストと開発を目的としています。詳細については、Direct Runner の使用を参照してください。

本番ワークロードでは、通常、Apache Flink、Apache Spark、または Google Cloud Dataflow などのビッグデータ処理システムでパイプラインを実行する分散ランナーを使用します。これらのシステムは、大規模な並列処理をサポートしています。オプションの runner プロパティを介して異なるランナーを要求できます。たとえば、createRunner({runner: "dataflow"})またはcreateRunner({runner: "flink"})です。この例では、この値はコマンドラインで--runner=...として渡すことができます。たとえば、Dataflowで実行するには、次のように記述します。

node dist/src/main.js \
    --runner=dataflow \
    --project=${PROJECT_ID} \
    --tempLocation=gs://${GCS_BUCKET}/wordcount-js/temp --region=${REGION}

次のステップ

問題が発生した場合は、お気軽にお問い合わせください