Apache Beam Go SDKクイックスタート

このクイックスタートでは、サンプルパイプラインApache Beam Go SDKを使用して記述)をDirect Runnerを使用して実行する方法を示します。Direct Runnerは、ローカルマシン上でパイプラインを実行します。

Apache Beam Goコードベースへのコントリビュートに関心のある方は、コントリビューションガイドを参照してください。

このページの内容

開発環境のセットアップ

Goの開発環境が準備されていることを確認してください。準備ができていない場合は、ダウンロードとインストールページの手順に従ってください。

GitHubリポジトリのクローン

apache/beam-starter-go GitHubリポジトリをクローンまたはダウンロードし、beam-starter-goディレクトリに移動します。

git clone https://github.com/apache/beam-starter-go.git
cd beam-starter-go

クイックスタートの実行

以下のコマンドを実行します

go run main.go --input-text="Greetings"

出力は次のようになります。

Hello
World!
Greetings

行の順序は異なる場合があります。

コードの調査

このクイックスタートのメインコードファイルは**main.go**です(GitHub)。このコードは次の手順を実行します。

  1. Beamパイプラインを作成します。
  2. 最初のPCollectionを作成します。
  3. 変換を適用します。
  4. Direct Runnerを使用してパイプラインを実行します。

パイプラインの作成

パイプラインを作成する前に、Init関数を呼び出します。

beam.Init()

次に、パイプラインを作成します。

pipeline, scope := beam.NewPipelineWithRoot()

NewPipelineWithRoot関数は、新しいPipelineオブジェクトとパイプラインのルートスコープを返します。*スコープ*は、複合変換のための階層的なグループ化です。

最初のPCollectionの作成

PCollection抽象化は、潜在的に分散された複数要素のデータセットを表します。Beamパイプラインは、最初のPCollectionを満たすためのデータソースが必要です。ソースは、バウンド型(既知の固定サイズ)またはアンバウンド型(無制限のサイズ)にすることができます。

この例では、Create関数を使用して、文字列のインメモリ配列からPCollectionを作成します。結果のPCollectionには、「hello」、「world!」、およびユーザーが提供した入力文字列が含まれます。

elements := beam.Create(scope, "hello", "world!", input_text)

PCollectionへの変換の適用

変換は、PCollection内の要素を変更、フィルタリング、グループ化、分析、またはその他の処理を行うことができます。

この例では、ParDo変換を追加して、入力文字列をタイトルケースに変換します。

elements = beam.ParDo(scope, strings.Title, elements)

ParDo関数は、親スコープ、データに適用される変換関数、および入力PCollectionを取ります。出力PCollectionを返します。

前の例では、変換に組み込みのstrings.Title関数を使用しています。ParDoにアプリケーション定義の関数を提供することもできます。たとえば、

func logAndEmit(ctx context.Context, element string, emit func(string)) {
    beamLog.Infoln(ctx, element)
    emit(element)
}

この関数は入力要素をログに記録し、変更されていない同じ要素を返します。この関数に対するParDoは次のように作成します。

beam.ParDo(scope, logAndEmit, elements)

実行時に、ParDoは入力コレクションの各要素に対してlogAndEmit関数を呼び出します。

パイプラインの実行

前述のセクションで示したコードはパイプラインを定義しますが、まだデータは処理しません。データ処理するには、パイプラインを実行します。

beamx.Run(ctx, pipeline)

Beam ランナーは、特定のプラットフォームでBeamパイプラインを実行します。この例では、Direct Runnerを使用しています。これは、ランナーを指定しない場合のデフォルトのランナーです。Direct Runnerは、ローカルマシン上でパイプラインを実行します。これは効率を最適化するためではなく、テストと開発を目的としています。詳細については、Direct Runnerの使用を参照してください。

本番ワークロードの場合、通常は、Apache Flink、Apache Spark、またはGoogle Cloud Dataflowなどのビッグデータ処理システムでパイプラインを実行する分散ランナーを使用します。これらのシステムは、大規模な並列処理をサポートしています。

次のステップ

問題が発生した場合は、お気軽にお問い合わせください!