DataFrame APIプレビューが利用可能になりました!

Beam Python SDKの新しいDataFrame APIのプレビュー版がBeam 2.26.0で利用可能になったことを発表します。SqlTransformJava, Python)と同様に、DataFrame APIを使用すると、Beamユーザーはこれまでよりもはるかに簡潔に関係ロジックを表現できます。

より表現力豊かなAPI

Beamの新しいDataFrame APIは、よく知られたPandas DataFrame APIとの互換性を目指していますが、以下に詳しく説明するいくつかの注意点があります。この新しいAPIを使用すると、ニューヨーク市のタクシー乗車データをCSVから読み込み、グループ化された集計を実行し、出力をCSVに書き込む単純なパイプラインを非常に簡潔に表現できます。

from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as p:
  df = p | read_csv("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
                    usecols=['passenger_count' , 'DOLocationID'])
  # Count the number of passengers dropped off per LocationID
  agg = df.groupby('DOLocationID').sum()
  agg.to_csv(output)

これを、CombinePerKeyを使用した従来のBeam Pythonパイプラインとして実装された同じロジックと比較してください。

with beam.Pipeline() as p:
  (p | beam.io.ReadFromText("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
                            skip_header_lines=1)
     | beam.Map(lambda line: line.split(','))
     # Parse CSV, create key - value pairs
     | beam.Map(lambda splits: (int(splits[8] or 0),  # DOLocationID
                                int(splits[3] or 0))) # passenger_count
     # Sum values per key
     | beam.CombinePerKey(sum)
     | beam.MapTuple(lambda loc_id, pc: f'{loc_id},{pc}')
     | beam.io.WriteToText(known_args.output))

DataFrameの例は、低レベルのCombinePerKeyを使用せずにグループ化された集計を簡潔に表現できるため、すばやく検査して理解するのがはるかに簡単です。

より表現力豊かであることに加えて、DataFrame APIで記述されたパイプラインは、従来のBeamパイプラインよりも効率的になることがよくあります。これは、DataFrame APIが可能な限り非常に効率的な列指向のPandas実装に依存しているためです。

DSLとしてのDataFrame

Beam SQLをご存知かもしれません。これは、BeamのJava SDKで構築されたドメイン固有言語(DSL)です。SQLは、IOや複雑な操作を含む完全なパイプラインをすべてSQLで表現できるため、DSLと見なされます。

同様に、DataFrame APIはPython SDKで構築されたDSLです。上記の例は、IO、ParDo、CombinePerKeyなどの従来のBeam構造を使用せずに記述されていることがわかります。実際、唯一の従来のBeam型はPipelineインスタンスです!それ以外の場合、このパイプラインは完全にDataFrame APIを使用して記述されています。これは、DataFrame APIがPandasの計算操作を実装するだけでなく、Pandasのネイティブ実装(pd.read_{csv,parquet,...}pd.DataFrame.to_{csv,parquet,...})に基づいたIOも含まれているためです。

SQLと同様に、スキーマを使用することで、DataFrame APIをより大きなパイプラインに埋め込むこともできます。スキーマ対応のPCollectionはDataFrameに変換され、処理され、結果は別のスキーマ対応のPCollectionに変換し直すことができます。たとえば、DataFrame IOのいずれかではなく、従来のBeam IOを使用する場合は、上記のパイプラインを次のように書き換えることができます。

from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

with beam.Pipeline() as p:
  ...
  schema_pc = (p | beam.ReadFromText(..)
                 # Use beam.Select to assign a schema
                 | beam.Select(DOLocationID=lambda line: int(...),
                               passenger_count=lambda line: int(...)))
  df = to_dataframe(schema_pc)
  agg = df.groupby('DOLocationID').sum()
  agg_pc = to_pcollection(pc)

  # agg_pc has a schema based on the structure of agg
  (agg_pc | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
          | beam.WriteToText(..))

DataframeTransformに関数を渡すことで、DataFrame APIを使用することもできます。

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  ...
  | beam.Select(DOLocationID=lambda line: int(..),
                passenger_count=lambda line: int(..))
  | DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
  | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
  ...

注意点

上記で示唆したように、BeamのDataFrame APIとPandas APIにはいくつかの違いがあります。最も重要な違いは、Beam DataFrame APIが、他のBeam APIと同様に、遅延されることです。これは、データを検査するためにDataFrameインスタンスをprint()できないことを意味します。これは、まだデータを計算していないためです!計算はパイプラインがrun()されるまで行われません。それまでは、結果の形状/スキーマ(つまり、列の名前と型)のみがわかり、結果自体はわかりません。

特定のPandas操作を使用しようとすると、いくつかの一般的な例外が発生する可能性があります。

  • NotImplementedError: これは、まだ検討する時間がない操作または引数を示します。この新しいAPIのプレビュー版では、できるだけ多くのPandas操作を利用できるようにしようとしましたが、まだ長いテールがあります。
  • WontImplementError: これは、Beamモデルと互換性がないため、近い将来にサポートする予定がない操作または引数を示します。このエラーを発生させる操作の最大のクラスは、順序に依存するものです(例:shift、cummax、cummin、head、tailなど)。分散データセットを表すPCollectionは順序付けられていないため、これらはBeamに簡単にマップすることはできません。これらの操作の一部でさえ、将来実装される可能性があることに注意してください。実際、順序に依存する操作をサポートする方法についていくつかのアイデアがありますが、まだしばらく先の話です。

最後に、これは次のいくつかのBeamリリースで強化される新機能のプレビューであることに注意することが重要です。今すぐ試してみて、フィードバックをお寄せいただければ幸いですが、まだ本番環境での使用はお勧めしません。

参加方法

この取り組みに参加する最も簡単な方法は、DataFrameを試してみて、感想をお知らせいただくことです!user@beam.apache.orgに質問を送信するか、jiraにバグレポートと機能リクエストを提出できます。特に、まだ実装されていない操作で役立つものがあれば、優先順位を付けることができるため、非常に役立ちます。

DataFrame APIが舞台裏でどのように機能するか、開発に参加する方法について詳しく知りたい場合は、設計ドキュメントBeamサミットのプレゼンテーションをご覧ください。そこから、最も役立つ方法は、実装されていない操作の一部を完了することです。この作業はBEAM-9547で調整しています。