ブログ
2020/12/16
DataFrame APIプレビューが利用可能になりました!
Beam Python SDKの新しいDataFrame APIのプレビュー版がBeam 2.26.0で利用可能になったことを発表します。SqlTransform
(Java, Python)と同様に、DataFrame APIを使用すると、Beamユーザーはこれまでよりもはるかに簡潔に関係ロジックを表現できます。
より表現力豊かなAPI
Beamの新しいDataFrame APIは、よく知られたPandas DataFrame APIとの互換性を目指していますが、以下に詳しく説明するいくつかの注意点があります。この新しいAPIを使用すると、ニューヨーク市のタクシー乗車データをCSVから読み込み、グループ化された集計を実行し、出力をCSVに書き込む単純なパイプラインを非常に簡潔に表現できます。
from apache_beam.dataframe.io import read_csv
with beam.Pipeline() as p:
df = p | read_csv("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
usecols=['passenger_count' , 'DOLocationID'])
# Count the number of passengers dropped off per LocationID
agg = df.groupby('DOLocationID').sum()
agg.to_csv(output)
これを、CombinePerKey
を使用した従来のBeam Pythonパイプラインとして実装された同じロジックと比較してください。
with beam.Pipeline() as p:
(p | beam.io.ReadFromText("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
skip_header_lines=1)
| beam.Map(lambda line: line.split(','))
# Parse CSV, create key - value pairs
| beam.Map(lambda splits: (int(splits[8] or 0), # DOLocationID
int(splits[3] or 0))) # passenger_count
# Sum values per key
| beam.CombinePerKey(sum)
| beam.MapTuple(lambda loc_id, pc: f'{loc_id},{pc}')
| beam.io.WriteToText(known_args.output))
DataFrameの例は、低レベルのCombinePerKey
を使用せずにグループ化された集計を簡潔に表現できるため、すばやく検査して理解するのがはるかに簡単です。
より表現力豊かであることに加えて、DataFrame APIで記述されたパイプラインは、従来のBeamパイプラインよりも効率的になることがよくあります。これは、DataFrame APIが可能な限り非常に効率的な列指向のPandas実装に依存しているためです。
DSLとしてのDataFrame
Beam SQLをご存知かもしれません。これは、BeamのJava SDKで構築されたドメイン固有言語(DSL)です。SQLは、IOや複雑な操作を含む完全なパイプラインをすべてSQLで表現できるため、DSLと見なされます。
同様に、DataFrame APIはPython SDKで構築されたDSLです。上記の例は、IO、ParDo、CombinePerKeyなどの従来のBeam構造を使用せずに記述されていることがわかります。実際、唯一の従来のBeam型はPipelineインスタンスです!それ以外の場合、このパイプラインは完全にDataFrame APIを使用して記述されています。これは、DataFrame APIがPandasの計算操作を実装するだけでなく、Pandasのネイティブ実装(pd.read_{csv,parquet,...}
とpd.DataFrame.to_{csv,parquet,...}
)に基づいたIOも含まれているためです。
SQLと同様に、スキーマを使用することで、DataFrame APIをより大きなパイプラインに埋め込むこともできます。スキーマ対応のPCollectionはDataFrameに変換され、処理され、結果は別のスキーマ対応のPCollectionに変換し直すことができます。たとえば、DataFrame IOのいずれかではなく、従来のBeam IOを使用する場合は、上記のパイプラインを次のように書き換えることができます。
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
with beam.Pipeline() as p:
...
schema_pc = (p | beam.ReadFromText(..)
# Use beam.Select to assign a schema
| beam.Select(DOLocationID=lambda line: int(...),
passenger_count=lambda line: int(...)))
df = to_dataframe(schema_pc)
agg = df.groupby('DOLocationID').sum()
agg_pc = to_pcollection(pc)
# agg_pc has a schema based on the structure of agg
(agg_pc | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
| beam.WriteToText(..))
DataframeTransform
に関数を渡すことで、DataFrame APIを使用することもできます。
from apache_beam.dataframe.transforms import DataframeTransform
with beam.Pipeline() as p:
...
| beam.Select(DOLocationID=lambda line: int(..),
passenger_count=lambda line: int(..))
| DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
| beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
...
注意点
上記で示唆したように、BeamのDataFrame APIとPandas APIにはいくつかの違いがあります。最も重要な違いは、Beam DataFrame APIが、他のBeam APIと同様に、遅延されることです。これは、データを検査するためにDataFrameインスタンスをprint()
できないことを意味します。これは、まだデータを計算していないためです!計算はパイプラインがrun()
されるまで行われません。それまでは、結果の形状/スキーマ(つまり、列の名前と型)のみがわかり、結果自体はわかりません。
特定のPandas操作を使用しようとすると、いくつかの一般的な例外が発生する可能性があります。
- NotImplementedError: これは、まだ検討する時間がない操作または引数を示します。この新しいAPIのプレビュー版では、できるだけ多くのPandas操作を利用できるようにしようとしましたが、まだ長いテールがあります。
- WontImplementError: これは、Beamモデルと互換性がないため、近い将来にサポートする予定がない操作または引数を示します。このエラーを発生させる操作の最大のクラスは、順序に依存するものです(例:shift、cummax、cummin、head、tailなど)。分散データセットを表すPCollectionは順序付けられていないため、これらはBeamに簡単にマップすることはできません。これらの操作の一部でさえ、将来実装される可能性があることに注意してください。実際、順序に依存する操作をサポートする方法についていくつかのアイデアがありますが、まだしばらく先の話です。
最後に、これは次のいくつかのBeamリリースで強化される新機能のプレビューであることに注意することが重要です。今すぐ試してみて、フィードバックをお寄せいただければ幸いですが、まだ本番環境での使用はお勧めしません。
参加方法
この取り組みに参加する最も簡単な方法は、DataFrameを試してみて、感想をお知らせいただくことです!user@beam.apache.orgに質問を送信するか、jiraにバグレポートと機能リクエストを提出できます。特に、まだ実装されていない操作で役立つものがあれば、優先順位を付けることができるため、非常に役立ちます。
DataFrame APIが舞台裏でどのように機能するか、開発に参加する方法について詳しく知りたい場合は、設計ドキュメントとBeamサミットのプレゼンテーションをご覧ください。そこから、最も役立つ方法は、実装されていない操作の一部を完了することです。この作業はBEAM-9547で調整しています。