Beam DataFrameの概要

Colabで実行 Colabで実行





Apache Beam Python SDKは、pandasライクなDataFrameオブジェクトを操作するためのDataFrame APIを提供します。この機能を使用すると、PCollectionをDataFrameに変換し、pandas DataFrame APIで利用可能な標準メソッドを使用してDataFrameと対話できます。DataFrame APIはpandas実装の上に構築されており、pandas DataFrameメソッドはデータセットの部分集合に対して並列に呼び出されます。Beam DataFrameとpandas DataFrameの大きな違いは、Beamの並列処理モデルをサポートするために、Beam APIによって操作が遅延されることです。(DataFrame実装の違いの詳細については、pandasとの違いを参照してください。)

Beam DataFrameは、Beamパイプラインのためのドメイン固有言語(DSL)と考えることができます。Beam SQLと同様に、DataFrameはBeam Python SDKに組み込まれたDSLです。このDSLを使用すると、ParDoCombinePerKeyなどの標準的なBeam構成要素を参照せずにパイプラインを作成できます。

Beam DataFrame APIは、Beamパイプライン内で使い慣れたプログラミングインターフェースへのアクセスを提供することを目的としています。場合によっては、DataFrame APIは、非常に効率的なベクトル化されたpandas実装に委譲することで、パイプラインの効率を向上させることもできます。

DataFrameとは?

pandas DataFrameを初めて使用する場合は、pandasを10分で学ぶを読んで始めることができます。これは、pandasパッケージのインポート方法と使用方法を示しています。pandasは、データ操作と分析のためのオープンソースのPythonライブラリです。関係データまたはラベル付きデータを操作することを簡素化するデータ構造を提供します。これらのデータ構造の1つがDataFrameで、2次元表形式のデータを含み、データに対してラベル付きの行と列を提供します。

前提条件

Beam DataFrameを使用するには、Beam Pythonバージョン2.26.0以降(完全なセットアップ手順については、Apache Beam Python SDKクイックスタートを参照)とサポートされているpandasバージョンをインストールする必要があります。Beam 2.34.0以降では、「dataframe」エクストラを使用するのが最も簡単な方法です。

pip install apache_beam[dataframe]

分散ランナーでDataFrame APIパイプラインを実行する際には、ワーカーにも同じpandasバージョンがインストールされている必要があることに注意してください。使用しているPythonバージョンとBeamリリースで使用するpandasのバージョンを確認するには、base_image_requirements.txtを参照してください。

DataFrameの使用

次の例のようにDataFrameを使用できます。この例では、CSVファイルからニューヨーク市のタクシーデータを読み取り、グループ化された集計を実行し、出力をCSVに書き戻します。

from apache_beam.dataframe.io import read_csv

with pipeline as p:
  rides = p | read_csv(input_path)

  # Count the number of passengers dropped off per LocationID
  agg = rides.groupby('DOLocationID').passenger_count.sum()
  agg.to_csv(output_path)

pandasはCSVデータの最初の行から列名を推測できます。これはpassenger_countDOLocationIDの由来です。

この例では、従来のBeam型はPipelineインスタンスのみです。それ以外は、DataFrame APIを使用して完全に記述されています。これは、Beam DataFrame APIに独自のIO操作(たとえば、read_csvto_csv)が含まれているためです。これらはpandasネイティブ実装に基づいています。read_*to_*操作は、ファイルパターンとBeam互換のファイルシステムをサポートします。グループ化はグループ化キーで行われ、任意のpandas操作(この場合はsum)を最終的な書き込みの前に適用できます。最終的な書き込みはto_csvで行われます。

Beam DataFrame APIは、ネイティブのpandas実装との互換性を目指していますが、以下に示すpandasとの違いに記載されているいくつかの注意点があります。

パイプラインへのDataFrameの埋め込み

より大きなパイプラインでDataFrames APIを使用するには、PCollectionをDataFrameに変換し、DataFrameを処理してから、DataFrameをPCollectionに戻すことができます。PCollectionをDataFrameに変換して戻すには、スキーマが添付されたPCollectionを使用する必要があります。スキーマが添付されたPCollectionは、スキーマ対応PCollectionとも呼ばれます。PCollectionにスキーマを添付する方法の詳細については、スキーマの作成を参照してください。

次に、スキーマ対応PCollectionを作成し、to_dataframeを使用してDataFrameに変換し、DataFrameを処理してから、to_pcollectionを使用してDataFrameをPCollectionに戻す例を示します。

from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
...


    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(known_args.input)

    words = (
        lines
        | 'Split' >> beam.FlatMap(
            lambda line: re.findall(r'[\w]+', line)).with_output_types(str)
        # Map to Row objects to generate a schema suitable for conversion
        # to a dataframe.
        | 'ToRows' >> beam.Map(lambda word: beam.Row(word=word)))

    df = to_dataframe(words)
    df['count'] = 1
    counted = df.groupby('word').sum()
    counted.to_csv(known_args.output)

    # Deferred DataFrames can also be converted back to schema'd PCollections
    counted_pc = to_pcollection(counted, include_indexes=True)


GitHubで完全な単語数カウントの例を確認できます。その他サンプルDataFrameパイプラインもあります。

DataframeTransformに関数を渡すことによっても、DataFrame APIを使用できます。

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  ...
  | beam.Select(DOLocationID=lambda line: int(..),
                passenger_count=lambda line: int(..))
  | DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
  | beam.Map(lambda row: f"{row.DOLocationID},{row.passenger_count}")
  ...

DataframeTransformは、Beam SQL DSLのSqlTransformに似ています。SqlTransformがSQLクエリをPTransformに変換するのに対し、DataframeTransformは、DataFrameを受け取りDataFrameを返す関数を適用するPTransformです。DataframeTransformは、Beamと通常のpandas DataFrameの両方で呼び出せるスタンドアロン関数がある場合に特に便利です。

DataframeTransformは、次の例のように、名前とキーワードで複数のPCollectionを受け取り、返すことができます。

output = (pc1, pc2) | DataframeTransform(lambda df1, df2: ...)

output = {'a': pc, ...} | DataframeTransform(lambda a, ...: ...)

pc1, pc2 = {'a': pc} | DataframeTransform(lambda a: expr1, expr2)

{...} = {a: pc} | DataframeTransform(lambda a: {...})