Apache Beam Python SDKクイックスタート

このクイックスタートでは、サンプルパイプラインApache Beam Python SDKを使用して記述し、Direct Runnerを使用して実行する方法を示します。 Direct Runnerは、パイプラインをローカルマシン上で実行します。

Apache Beam Pythonコードベースへの貢献に興味がある場合は、貢献ガイドを参照してください。

このページの内容

開発環境をセットアップする

Apache Beamは、まだEOLに達していないリリース済みのPythonバージョンで動作することを目指していますが、Apache Beamが最新のPythonマイナーバージョンを完全にサポートするまでには、いくつかのリリースが必要になる場合があります。

必要な最小のPythonバージョンは、apache-beamプロジェクトページの[メタ]セクションの[必須]にリストされています。サポートされているすべてのPythonバージョンのリストは、ページの最下部にある[分類子]セクションの[プログラミング言語]にリストされています。

次のコマンドを実行して、Pythonのバージョンを確認してください

python3 --version

Pythonインタープリターがない場合は、Pythonダウンロードページからダウンロードしてインストールできます。

既にお持ちのバージョンに加えて、別のバージョンのPythonをインストールする必要がある場合は、開発者Wikiでいくつかの推奨事項を見つけることができます。

GitHubリポジトリをクローンする

apache/beam-starter-python GitHubリポジトリをクローンまたはダウンロードして、beam-starter-pythonディレクトリに移動します。

git clone https://github.com/apache/beam-starter-python.git
cd beam-starter-python

仮想環境を作成してアクティブ化する

仮想環境は、独自のPythonディストリビューションを含むディレクトリツリーです。プロジェクトのすべての依存関係が独立した自己完結型の環境にインストールされるように、仮想環境を使用することをお勧めします。仮想環境を設定するには、次のコマンドを実行します。

# Create a new Python virtual environment.
python3 -m venv env

# Activate the virtual environment.
source env/bin/activate

これらのコマンドがプラットフォームで機能しない場合は、venvドキュメントを参照してください。

プロジェクトの依存関係をインストールする

次のコマンドを実行して、requirements.txtファイルからプロジェクトの依存関係をインストールします

pip install -e .

クイックスタートを実行する

次のコマンドを実行します

python main.py --input-text="Greetings"

出力は次のようになります

Hello
World!
Greetings

行は異なる順序で表示される場合があります。

次のコマンドを実行して、仮想環境を非アクティブ化します

deactivate

コードを探索する

このクイックスタートのメインコードファイルは、app.pyです(GitHub)。コードは次の手順を実行します。

  1. Beamパイプラインを作成します。
  2. 初期のPCollectionを作成します。
  3. PCollectionに変換を適用します。
  4. Direct Runnerを使用して、パイプラインを実行します。

パイプラインを作成する

コードは最初にPipelineオブジェクトを作成します。 Pipelineオブジェクトは、実行される変換のグラフを構築します。

with beam.Pipeline(options=beam_options) as pipeline:

ここに示されているbeam_option変数は、パイプラインのオプションを設定するために使用されるPipelineOptionsオブジェクトです。詳細については、パイプラインオプションの設定を参照してください。

初期PCollectionを作成する

PCollection抽象化は、潜在的に分散された複数要素のデータセットを表します。 Beamパイプラインには、初期のPCollectionを生成するためのデータソースが必要です。ソースは、バインドされている(既知の固定サイズ)か、アンバインドされている(無制限のサイズ)場合があります。

この例では、Createメソッドを使用して、インメモリの文字列配列からPCollectionを作成します。結果のPCollectionには、「Hello」、「World!」という文字列と、ユーザーが提供する入力文字列が含まれます。

pipeline
| "Create elements" >> beam.Create(["Hello", "World!", input_text])

注:パイプ演算子|は、連鎖変換に使用されます。

PCollectionに変換を適用する

変換は、PCollectionの要素を変更、フィルタリング、グループ化、分析、またはその他の処理を実行できます。この例では、コレクションの要素を新しいコレクションにマップするMap変換を使用します

| "Print elements" >> beam.Map(print)

パイプラインを実行する

パイプラインを実行するには、Pipeline.runメソッドを呼び出すことができます

pipeline.run.wait_until_finish()

ただし、Pipelineオブジェクトをwithステートメントで囲むと、runメソッドが自動的に呼び出されます。

with beam.Pipeline(options=beam_options) as pipeline:
    # ...
    # run() is called automatically

Beam ランナーは、特定のプラットフォーム上でBeamパイプラインを実行します。ランナーを指定しない場合、Direct Runnerがデフォルトになります。 Direct Runnerは、パイプラインをローカルマシン上で実行します。効率を最適化するのではなく、テストと開発を目的としています。詳細については、Direct Runnerの使用を参照してください。

本番環境のワークロードでは、通常、Apache Flink、Apache Spark、Google Cloud Dataflowなどのビッグデータ処理システムでパイプラインを実行する分散ランナーを使用します。これらのシステムは、大規模な並列処理をサポートしています。

次のステップ

問題が発生した場合は、お気軽にご連絡ください!