データ探索
Apache Beamの様々なデータ処理は、AI/MLプロジェクトに適用できます。
- データ探索:プロジェクトの開始時またはデータの変更時に、データ(プロパティ、分布、統計)について学びます。
- データ前処理:モデルのトレーニングに使用できるよう、データを加工します。
- データ後処理:推論の実行後、モデルの出力を意味のあるものにするために変換が必要になる場合があります。
- データ検証:外れ値を検出し、標準偏差やクラス分布を計算して、データの品質を確認します。
データ処理は、主に2つのトピックに分類できます。この例では、まずデータ探索、次にデータ前処理と検証の両方を使用するMLのデータパイプラインを調べます。データ後処理は、前処理と似ているため、扱いません。後処理は、パイプラインの順序と種類だけが異なります。
初期データ探索
Pandas は、データ探索を実行するための一般的なツールです。Pandasは、Python向けのデータ分析および操作ツールです。2次元表形式のデータを含むデータ構造であるDataFrameを使用し、データに対してラベル付きの行と列を提供します。Apache Beam Python SDKは、PandasライクなDataFrameオブジェクトを操作するためのDataFrame APIを提供します。
Beam DataFrame APIは、Apache Beamパイプライン内で使い慣れたプログラミングインターフェースにアクセスすることを目的としています。このAPIを使用すると、データ探索を実行できます。データ前処理パイプラインのコードを再利用できます。DataFrame APIを使用することで、標準的なPandasコマンドを呼び出すことで、複雑なデータ処理パイプラインを構築できます。
Beamインタラクティブランナーと組み合わせてJupyterLabノートブックでDataFrame APIを使用できます。ノートブックを使用して、パイプラインを反復的に開発し、個々のパイプラインステップの結果を表示します。
以下は、ノートブックでのApache Beamにおけるデータ探索の例です。
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
p = beam.Pipeline(InteractiveRunner())
beam_df = p | beam.dataframe.io.read_csv(input_path)
# Investigate columns and data types
beam_df.dtypes
# Generate descriptive statistics
ib.collect(beam_df.describe())
# Investigate missing values
ib.collect(beam_df.isnull())
AI/MLプロジェクトでApache BeamとDataFrame APIを使用してデータ探索とデータ前処理を実装する完全なエンドツーエンドの例については、Beam DataFrame API AI/MLチュートリアルを参照してください。
ML用データパイプライン
一般的なデータ前処理パイプラインは、次の手順で構成されています。
- データの読み取りと書き込み:ファイルシステム、データベース、またはメッセージキューからデータを読み取り、書き込みます。Apache Beamには、データの取り込みと書き込みのための豊富なIOコネクタがあります。
- データクレンジング:MLモデルで使用前にデータをフィルタリングおよびクレンジングします。重複データや無関係なデータを削除したり、データセットの誤りを修正したり、不要な外れ値をフィルタリングしたり、欠損データに対処したりする必要がある場合があります。
- データ変換:データは、モデルのトレーニングに必要な期待される入力に適合する必要があります。データを正規化、one-hotエンコーディング、スケーリング、またはベクトル化する必要がある場合があります。
- データエンリッチメント:データを外部データソースで拡張して、データの意味を明確にしたり、MLモデルが解釈しやすくしたりすることができます。たとえば、都市名や住所を座標のセットに変換することができます。
- データ検証とメトリクス:データが、パイプラインで検証できる特定の要件に準拠していることを確認します。クラス分布など、データからのメトリクスを報告します。
Apache Beamパイプラインを使用して、これらの手順すべてを実装できます。この例では、前述のすべてのステップを示すパイプラインを示します。
import apache_beam as beam
from apache_beam.metrics import Metrics
with beam.Pipeline() as pipeline:
# Create data
input_data = (
pipeline
| beam.Create([
{'age': 25, 'height': 176, 'weight': 60, 'city': 'London'},
{'age': 61, 'height': 192, 'weight': 95, 'city': 'Brussels'},
{'age': 48, 'height': 163, 'weight': None, 'city': 'Berlin'}]))
# Clean data
def filter_missing_data(row):
return row['weight'] is not None
cleaned_data = input_data | beam.Filter(filter_missing_data)
# Transform data
def scale_min_max_data(row):
row['age'] = (row['age']/100)
row['height'] = (row['height']-150)/50
row['weight'] = (row['weight']-50)/50
yield row
transformed_data = cleaned_data | beam.FlatMap(scale_min_max_data)
# Enrich data
side_input = pipeline | beam.io.ReadFromText('coordinates.csv')
def coordinates_lookup(row, coordinates):
row['coordinates'] = coordinates.get(row['city'], (0, 0))
del row['city']
yield row
enriched_data = (
transformed_data
| beam.FlatMap(coordinates_lookup, coordinates=beam.pvalue.AsDict(side_input)))
# Metrics
counter = Metrics.counter('main', 'counter')
def count_data(row):
counter.inc()
yield row
output_data = enriched_data | beam.FlatMap(count_data)
# Write data
output_data | beam.io.WriteToText('output.csv')
最終更新日:2024/10/31
探していたものが見つかりましたか?
すべて役に立ち、分かりやすかったですか?変更したいことはありますか?お知らせください!