Apache Beam Python SDKクイックスタート
このクイックスタートでは、サンプルパイプラインをApache Beam Python SDKを使用して記述し、Direct Runnerを使用して実行する方法を示します。 Direct Runnerは、パイプラインをローカルマシン上で実行します。
Apache Beam Pythonコードベースへの貢献に興味がある場合は、貢献ガイドを参照してください。
このページの内容
開発環境をセットアップする
Apache Beamは、まだEOLに達していないリリース済みのPythonバージョンで動作することを目指していますが、Apache Beamが最新のPythonマイナーバージョンを完全にサポートするまでには、いくつかのリリースが必要になる場合があります。
必要な最小のPythonバージョンは、apache-beamプロジェクトページの[メタ]セクションの[必須]にリストされています。サポートされているすべてのPythonバージョンのリストは、ページの最下部にある[分類子]セクションの[プログラミング言語]にリストされています。
次のコマンドを実行して、Pythonのバージョンを確認してください
Pythonインタープリターがない場合は、Pythonダウンロードページからダウンロードしてインストールできます。
既にお持ちのバージョンに加えて、別のバージョンのPythonをインストールする必要がある場合は、開発者Wikiでいくつかの推奨事項を見つけることができます。
GitHubリポジトリをクローンする
apache/beam-starter-python GitHubリポジトリをクローンまたはダウンロードして、beam-starter-python
ディレクトリに移動します。
仮想環境を作成してアクティブ化する
仮想環境は、独自のPythonディストリビューションを含むディレクトリツリーです。プロジェクトのすべての依存関係が独立した自己完結型の環境にインストールされるように、仮想環境を使用することをお勧めします。仮想環境を設定するには、次のコマンドを実行します。
これらのコマンドがプラットフォームで機能しない場合は、venv
ドキュメントを参照してください。
プロジェクトの依存関係をインストールする
次のコマンドを実行して、requirements.txt
ファイルからプロジェクトの依存関係をインストールします
クイックスタートを実行する
次のコマンドを実行します
出力は次のようになります
行は異なる順序で表示される場合があります。
次のコマンドを実行して、仮想環境を非アクティブ化します
コードを探索する
このクイックスタートのメインコードファイルは、app.pyです(GitHub)。コードは次の手順を実行します。
- Beamパイプラインを作成します。
- 初期の
PCollection
を作成します。 PCollection
に変換を適用します。- Direct Runnerを使用して、パイプラインを実行します。
パイプラインを作成する
コードは最初にPipeline
オブジェクトを作成します。 Pipeline
オブジェクトは、実行される変換のグラフを構築します。
with beam.Pipeline(options=beam_options) as pipeline:
ここに示されているbeam_option
変数は、パイプラインのオプションを設定するために使用されるPipelineOptions
オブジェクトです。詳細については、パイプラインオプションの設定を参照してください。
初期PCollectionを作成する
PCollection
抽象化は、潜在的に分散された複数要素のデータセットを表します。 Beamパイプラインには、初期のPCollection
を生成するためのデータソースが必要です。ソースは、バインドされている(既知の固定サイズ)か、アンバインドされている(無制限のサイズ)場合があります。
この例では、Create
メソッドを使用して、インメモリの文字列配列からPCollection
を作成します。結果のPCollection
には、「Hello」、「World!」という文字列と、ユーザーが提供する入力文字列が含まれます。
pipeline
| "Create elements" >> beam.Create(["Hello", "World!", input_text])
注:パイプ演算子|
は、連鎖変換に使用されます。
PCollectionに変換を適用する
変換は、PCollection
の要素を変更、フィルタリング、グループ化、分析、またはその他の処理を実行できます。この例では、コレクションの要素を新しいコレクションにマップするMap
変換を使用します
| "Print elements" >> beam.Map(print)
パイプラインを実行する
パイプラインを実行するには、Pipeline.run
メソッドを呼び出すことができます
pipeline.run.wait_until_finish()
ただし、Pipeline
オブジェクトをwith
ステートメントで囲むと、run
メソッドが自動的に呼び出されます。
with beam.Pipeline(options=beam_options) as pipeline:
# ...
# run() is called automatically
Beam ランナーは、特定のプラットフォーム上でBeamパイプラインを実行します。ランナーを指定しない場合、Direct Runnerがデフォルトになります。 Direct Runnerは、パイプラインをローカルマシン上で実行します。効率を最適化するのではなく、テストと開発を目的としています。詳細については、Direct Runnerの使用を参照してください。
本番環境のワークロードでは、通常、Apache Flink、Apache Spark、Google Cloud Dataflowなどのビッグデータ処理システムでパイプラインを実行する分散ランナーを使用します。これらのシステムは、大規模な並列処理をサポートしています。
次のステップ
- Python用Beam SDKの詳細と、Python SDK APIリファレンスを参照してください。
- 当社の学習リソースで、自分のペースでツアーに参加してください。
- お気に入りのビデオとポッドキャストをいくつかご覧ください。
- Beam users@メーリングリストに参加してください。
問題が発生した場合は、お気軽にご連絡ください!
最終更新日: 2024/10/31
お探しのものはすべて見つかりましたか?
すべてが有用で明確でしたか?変更したいことはありますか?お知らせください!